6dcb8e440f016bf9084c5bc34b29f26c3aadae36
from test
import test_support
mswindows
= (sys
.platform
== "win32")
# Depends on the following external programs: Python
SETBINARY
= ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
# In a debug build, stuff like "[6580 refs]" is printed to stderr at
# shutdown time. That frustrates tests trying to check stderr produced
# from a spawned Python process.
def remove_stderr_debug_decorations(stderr
):
return re
.sub(r
"\[\d+ refs\]\r?\n?$", "", stderr
)
class ProcessTestCase(unittest
.TestCase
):
"""wrapper for mkstemp, calling mktemp if mkstemp is not available"""
if hasattr(tempfile
, "mkstemp"):
return tempfile
.mkstemp()
fname
= tempfile
.mktemp()
return os
.open(fname
, os
.O_RDWR|os
.O_CREAT
), fname
# call() function with sequence argument
rc
= subprocess
.call([sys
.executable
, "-c",
"import sys; sys.exit(47)"])
def test_call_kwargs(self
):
# call() function with keyword args
newenv
= os
.environ
.copy()
newenv
["FRUIT"] = "banana"
rc
= subprocess
.call([sys
.executable
, "-c",
'sys.exit(os.getenv("FRUIT")=="banana")'],
def test_stdin_none(self
):
# .stdin is None when not redirected
p
= subprocess
.Popen([sys
.executable
, "-c", 'print "banana"'],
stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
)
self
.assertEqual(p
.stdin
, None)
def test_stdout_none(self
):
# .stdout is None when not redirected
p
= subprocess
.Popen([sys
.executable
, "-c",
'print " this bit of output is from a '
'test of stdout in a different '
stdin
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
)
self
.assertEqual(p
.stdout
, None)
def test_stderr_none(self
):
# .stderr is None when not redirected
p
= subprocess
.Popen([sys
.executable
, "-c", 'print "banana"'],
stdin
=subprocess
.PIPE
, stdout
=subprocess
.PIPE
)
self
.assertEqual(p
.stderr
, None)
def test_executable(self
):
p
= subprocess
.Popen(["somethingyoudonthave",
"-c", "import sys; sys.exit(47)"],
executable
=sys
.executable
)
self
.assertEqual(p
.returncode
, 47)
def test_stdin_pipe(self
):
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
self
.assertEqual(p
.returncode
, 1)
def test_stdin_filedes(self
):
# stdin is set to open file descriptor
tf
= tempfile
.TemporaryFile()
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
self
.assertEqual(p
.returncode
, 1)
def test_stdin_fileobj(self
):
# stdin is set to open file object
tf
= tempfile
.TemporaryFile()
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.exit(sys.stdin.read() == "pear")'],
self
.assertEqual(p
.returncode
, 1)
def test_stdout_pipe(self
):
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.stdout.write("orange")'],
self
.assertEqual(p
.stdout
.read(), "orange")
def test_stdout_filedes(self
):
# stdout is set to open file descriptor
tf
= tempfile
.TemporaryFile()
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.stdout.write("orange")'],
self
.assertEqual(os
.read(d
, 1024), "orange")
def test_stdout_fileobj(self
):
# stdout is set to open file object
tf
= tempfile
.TemporaryFile()
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.stdout.write("orange")'],
self
.assertEqual(tf
.read(), "orange")
def test_stderr_pipe(self
):
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.stderr.write("strawberry")'],
self
.assertEqual(remove_stderr_debug_decorations(p
.stderr
.read()),
def test_stderr_filedes(self
):
# stderr is set to open file descriptor
tf
= tempfile
.TemporaryFile()
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.stderr.write("strawberry")'],
self
.assertEqual(remove_stderr_debug_decorations(os
.read(d
, 1024)),
def test_stderr_fileobj(self
):
# stderr is set to open file object
tf
= tempfile
.TemporaryFile()
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys; sys.stderr.write("strawberry")'],
self
.assertEqual(remove_stderr_debug_decorations(tf
.read()),
def test_stdout_stderr_pipe(self
):
# capture stdout and stderr to the same pipe
p
= subprocess
.Popen([sys
.executable
, "-c",
'sys.stdout.write("apple");' \
'sys.stderr.write("orange")'],
stderr
=subprocess
.STDOUT
)
stripped
= remove_stderr_debug_decorations(output
)
self
.assertEqual(stripped
, "appleorange")
def test_stdout_stderr_file(self
):
# capture stdout and stderr to the same open file
tf
= tempfile
.TemporaryFile()
p
= subprocess
.Popen([sys
.executable
, "-c",
'sys.stdout.write("apple");' \
'sys.stderr.write("orange")'],
stripped
= remove_stderr_debug_decorations(output
)
self
.assertEqual(stripped
, "appleorange")
tmpdir
= os
.getenv("TEMP", "/tmp")
# We cannot use os.path.realpath to canonicalize the path,
# since it doesn't expand Tru64 {memb} strings. See bug 1063571.
p
= subprocess
.Popen([sys
.executable
, "-c",
'sys.stdout.write(os.getcwd())'],
normcase
= os
.path
.normcase
self
.assertEqual(normcase(p
.stdout
.read()), normcase(tmpdir
))
newenv
= os
.environ
.copy()
newenv
["FRUIT"] = "orange"
p
= subprocess
.Popen([sys
.executable
, "-c",
'sys.stdout.write(os.getenv("FRUIT"))'],
self
.assertEqual(p
.stdout
.read(), "orange")
def test_communicate(self
):
p
= subprocess
.Popen([sys
.executable
, "-c",
'sys.stderr.write("pineapple");' \
'sys.stdout.write(sys.stdin.read())'],
(stdout
, stderr
) = p
.communicate("banana")
self
.assertEqual(stdout
, "banana")
self
.assertEqual(remove_stderr_debug_decorations(stderr
),
def test_communicate_returns(self
):
# communicate() should return None if no redirection is active
p
= subprocess
.Popen([sys
.executable
, "-c",
"import sys; sys.exit(47)"])
(stdout
, stderr
) = p
.communicate()
self
.assertEqual(stdout
, None)
self
.assertEqual(stderr
, None)
def test_communicate_pipe_buf(self
):
# communicate() with writes larger than pipe_buf
# This test will probably deadlock rather than fail, if
# communicate() does not work properly.
pipe_buf
= os
.fpathconf(x
, "PC_PIPE_BUF")
p
= subprocess
.Popen([sys
.executable
, "-c",
'sys.stdout.write(sys.stdin.read(47));' \
'sys.stderr.write("xyz"*%d);' \
'sys.stdout.write(sys.stdin.read())' % pipe_buf
],
string_to_write
= "abc"*pipe_buf
(stdout
, stderr
) = p
.communicate(string_to_write
)
self
.assertEqual(stdout
, string_to_write
)
def test_writes_before_communicate(self
):
# stdin.write before communicate()
p
= subprocess
.Popen([sys
.executable
, "-c",
'sys.stdout.write(sys.stdin.read())'],
(stdout
, stderr
) = p
.communicate("split")
self
.assertEqual(stdout
, "bananasplit")
self
.assertEqual(remove_stderr_debug_decorations(stderr
), "")
def test_universal_newlines(self
):
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys,os;' + SETBINARY
+
'sys.stdout.write("line1\\n");'
'sys.stdout.write("line2\\r");'
'sys.stdout.write("line3\\r\\n");'
'sys.stdout.write("line4\\r");'
'sys.stdout.write("\\nline5");'
'sys.stdout.write("\\nline6");'],
if hasattr(open, 'newlines'):
# Interpreter with universal newline support
"line1\nline2\nline3\nline4\nline5\nline6")
# Interpreter without universal newline support
"line1\nline2\rline3\r\nline4\r\nline5\nline6")
def test_universal_newlines_communicate(self
):
# universal newlines through communicate()
p
= subprocess
.Popen([sys
.executable
, "-c",
'import sys,os;' + SETBINARY
+
'sys.stdout.write("line1\\n");'
'sys.stdout.write("line2\\r");'
'sys.stdout.write("line3\\r\\n");'
'sys.stdout.write("line4\\r");'
'sys.stdout.write("\\nline5");'
'sys.stdout.write("\\nline6");'],
stdout
=subprocess
.PIPE
, stderr
=subprocess
.PIPE
,
(stdout
, stderr
) = p
.communicate()
if hasattr(open, 'newlines'):
# Interpreter with universal newline support
"line1\nline2\nline3\nline4\nline5\nline6")
# Interpreter without universal newline support
self
.assertEqual(stdout
, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
def test_no_leaking(self
):
# Make sure we leak no resources
if test_support
.is_resource_enabled("subprocess") and not mswindows
:
max_handles
= 1026 # too much for most UNIX systems
for i
in range(max_handles
):
p
= subprocess
.Popen([sys
.executable
, "-c",
"import sys;sys.stdout.write(sys.stdin.read())"],
data
= p
.communicate("lime")[0]
self
.assertEqual(data
, "lime")
def test_list2cmdline(self
):
self
.assertEqual(subprocess
.list2cmdline(['a b c', 'd', 'e']),
self
.assertEqual(subprocess
.list2cmdline(['ab"c', '\\', 'd']),
self
.assertEqual(subprocess
.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
self
.assertEqual(subprocess
.list2cmdline(['a\\"b', 'c', 'd']),
self
.assertEqual(subprocess
.list2cmdline(['a\\\\b c', 'd', 'e']),
self
.assertEqual(subprocess
.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
p
= subprocess
.Popen([sys
.executable
,
"-c", "import time; time.sleep(1)"])
# We expect that the poll loop probably went around about 10 times,
# but, based on system scheduling we can't control, it's possible
# poll() never returned None. It "should be" very rare that it
# didn't go around at least twice.
# Subsequent invocations should just return the returncode
self
.assertEqual(p
.poll(), 0)
p
= subprocess
.Popen([sys
.executable
,
"-c", "import time; time.sleep(2)"])
self
.assertEqual(p
.wait(), 0)
# Subsequent invocations should just return the returncode
self
.assertEqual(p
.wait(), 0)
def test_invalid_bufsize(self
):
# an invalid type of the bufsize argument should raise
subprocess
.Popen([sys
.executable
, "-c", "pass"], "orange")
self
.fail("Expected TypeError")
def test_exceptions(self
):
# catched & re-raised exceptions
p
= subprocess
.Popen([sys
.executable
, "-c", ""],
cwd
="/this/path/does/not/exist")
# The attribute child_traceback should contain "os.chdir"
self
.assertNotEqual(e
.child_traceback
.find("os.chdir"), -1)
self
.fail("Expected OSError")
def test_run_abort(self
):
# returncode handles signal termination
p
= subprocess
.Popen([sys
.executable
,
"-c", "import os; os.abort()"])
self
.assertEqual(-p
.returncode
, signal
.SIGABRT
)
p
= subprocess
.Popen([sys
.executable
, "-c",
'sys.stdout.write(os.getenv("FRUIT"))'],
preexec_fn
=lambda: os
.putenv("FRUIT", "apple"))
self
.assertEqual(p
.stdout
.read(), "apple")
def test_args_string(self
):
f
, fname
= self
.mkstemp()
os
.write(f
, "#!/bin/sh\n")
os
.write(f
, "exec %s -c 'import sys; sys.exit(47)'\n" %
p
= subprocess
.Popen(fname
)
self
.assertEqual(p
.returncode
, 47)
def test_invalid_args(self
):
# invalid arguments should raise ValueError
self
.assertRaises(ValueError, subprocess
.call
,
"-c", "import sys; sys.exit(47)"],
self
.assertRaises(ValueError, subprocess
.call
,
"-c", "import sys; sys.exit(47)"],
def test_shell_sequence(self
):
# Run command through the shell (sequence)
newenv
= os
.environ
.copy()
newenv
["FRUIT"] = "apple"
p
= subprocess
.Popen(["echo $FRUIT"], shell
=1,
self
.assertEqual(p
.stdout
.read().strip(), "apple")
def test_shell_string(self
):
# Run command through the shell (string)
newenv
= os
.environ
.copy()
newenv
["FRUIT"] = "apple"
p
= subprocess
.Popen("echo $FRUIT", shell
=1,
self
.assertEqual(p
.stdout
.read().strip(), "apple")
def test_call_string(self
):
# call() function with string argument on UNIX
f
, fname
= self
.mkstemp()
os
.write(f
, "#!/bin/sh\n")
os
.write(f
, "exec %s -c 'import sys; sys.exit(47)'\n" %
rc
= subprocess
.call(fname
)
def test_startupinfo(self
):
# We uses hardcoded constants, because we do not want to
startupinfo
= subprocess
.STARTUPINFO()
startupinfo
.dwFlags
= STARTF_USESHOWWINDOW
startupinfo
.wShowWindow
= SW_MAXIMIZE
# Since Python is a console process, it won't be affected
# by wShowWindow, but the argument should be silently
subprocess
.call([sys
.executable
, "-c", "import sys; sys.exit(0)"],
def test_creationflags(self
):
sys
.stderr
.write(" a DOS box should flash briefly ...\n")
subprocess
.call(sys
.executable
+
' -c "import time; time.sleep(0.25)"',
creationflags
=CREATE_NEW_CONSOLE
)
def test_invalid_args(self
):
# invalid arguments should raise ValueError
self
.assertRaises(ValueError, subprocess
.call
,
"-c", "import sys; sys.exit(47)"],
self
.assertRaises(ValueError, subprocess
.call
,
"-c", "import sys; sys.exit(47)"],
def test_shell_sequence(self
):
# Run command through the shell (sequence)
newenv
= os
.environ
.copy()
newenv
["FRUIT"] = "physalis"
p
= subprocess
.Popen(["set"], shell
=1,
self
.assertNotEqual(p
.stdout
.read().find("physalis"), -1)
def test_shell_string(self
):
# Run command through the shell (string)
newenv
= os
.environ
.copy()
newenv
["FRUIT"] = "physalis"
p
= subprocess
.Popen("set", shell
=1,
self
.assertNotEqual(p
.stdout
.read().find("physalis"), -1)
def test_call_string(self
):
# call() function with string argument on Windows
rc
= subprocess
.call(sys
.executable
+
' -c "import sys; sys.exit(47)"')
test_support
.run_unittest(ProcessTestCase
)
if __name__
== "__main__":