Initial commit of OpenSPARC T2 architecture model.
[OpenSPARC-T2-SAM] / sam-t2 / devtools / v8plus / lib / python2.4 / test / test_subprocess.py
CommitLineData
920dae64
AT
1import unittest
2from test import test_support
3import subprocess
4import sys
5import signal
6import os
7import tempfile
8import time
9import re
10
# True when running on native Windows; selects the platform-specific
# snippet below and the POSIX-only / Windows-only test sections.
mswindows = (sys.platform == "win32")

#
# Depends on the following external programs: Python
#

# SETBINARY is a fragment of Python source that gets concatenated into
# the "-c" script of child interpreters.  On Windows it switches the
# child's stdout to binary mode via msvcrt.setmode(); elsewhere it is
# empty, so the concatenation is a no-op.
if mswindows:
    SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
                 'os.O_BINARY);')
else:
    SETBINARY = ''
22
def remove_stderr_debug_decorations(stderr):
    """Strip a trailing "[NNNN refs]" line from captured stderr text.

    In a debug build, stuff like "[6580 refs]" is printed to stderr at
    shutdown time.  That frustrates tests trying to check stderr produced
    from a spawned Python process, so the marker (and an optional CR/LF
    after it) is removed when it appears at the very end of the text.

    stderr -- the text captured from the child's stderr.
    Returns the text without the trailing marker; text that does not end
    with the marker is returned unchanged.
    """
    return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
28
29class ProcessTestCase(unittest.TestCase):
30 def mkstemp(self):
31 """wrapper for mkstemp, calling mktemp if mkstemp is not available"""
32 if hasattr(tempfile, "mkstemp"):
33 return tempfile.mkstemp()
34 else:
35 fname = tempfile.mktemp()
36 return os.open(fname, os.O_RDWR|os.O_CREAT), fname
37
38 #
39 # Generic tests
40 #
41 def test_call_seq(self):
42 # call() function with sequence argument
43 rc = subprocess.call([sys.executable, "-c",
44 "import sys; sys.exit(47)"])
45 self.assertEqual(rc, 47)
46
47 def test_call_kwargs(self):
48 # call() function with keyword args
49 newenv = os.environ.copy()
50 newenv["FRUIT"] = "banana"
51 rc = subprocess.call([sys.executable, "-c",
52 'import sys, os;' \
53 'sys.exit(os.getenv("FRUIT")=="banana")'],
54 env=newenv)
55 self.assertEqual(rc, 1)
56
57 def test_stdin_none(self):
58 # .stdin is None when not redirected
59 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
60 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
61 p.wait()
62 self.assertEqual(p.stdin, None)
63
64 def test_stdout_none(self):
65 # .stdout is None when not redirected
66 p = subprocess.Popen([sys.executable, "-c",
67 'print " this bit of output is from a '
68 'test of stdout in a different '
69 'process ..."'],
70 stdin=subprocess.PIPE, stderr=subprocess.PIPE)
71 p.wait()
72 self.assertEqual(p.stdout, None)
73
74 def test_stderr_none(self):
75 # .stderr is None when not redirected
76 p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
77 stdin=subprocess.PIPE, stdout=subprocess.PIPE)
78 p.wait()
79 self.assertEqual(p.stderr, None)
80
81 def test_executable(self):
82 p = subprocess.Popen(["somethingyoudonthave",
83 "-c", "import sys; sys.exit(47)"],
84 executable=sys.executable)
85 p.wait()
86 self.assertEqual(p.returncode, 47)
87
88 def test_stdin_pipe(self):
89 # stdin redirection
90 p = subprocess.Popen([sys.executable, "-c",
91 'import sys; sys.exit(sys.stdin.read() == "pear")'],
92 stdin=subprocess.PIPE)
93 p.stdin.write("pear")
94 p.stdin.close()
95 p.wait()
96 self.assertEqual(p.returncode, 1)
97
98 def test_stdin_filedes(self):
99 # stdin is set to open file descriptor
100 tf = tempfile.TemporaryFile()
101 d = tf.fileno()
102 os.write(d, "pear")
103 os.lseek(d, 0, 0)
104 p = subprocess.Popen([sys.executable, "-c",
105 'import sys; sys.exit(sys.stdin.read() == "pear")'],
106 stdin=d)
107 p.wait()
108 self.assertEqual(p.returncode, 1)
109
110 def test_stdin_fileobj(self):
111 # stdin is set to open file object
112 tf = tempfile.TemporaryFile()
113 tf.write("pear")
114 tf.seek(0)
115 p = subprocess.Popen([sys.executable, "-c",
116 'import sys; sys.exit(sys.stdin.read() == "pear")'],
117 stdin=tf)
118 p.wait()
119 self.assertEqual(p.returncode, 1)
120
121 def test_stdout_pipe(self):
122 # stdout redirection
123 p = subprocess.Popen([sys.executable, "-c",
124 'import sys; sys.stdout.write("orange")'],
125 stdout=subprocess.PIPE)
126 self.assertEqual(p.stdout.read(), "orange")
127
128 def test_stdout_filedes(self):
129 # stdout is set to open file descriptor
130 tf = tempfile.TemporaryFile()
131 d = tf.fileno()
132 p = subprocess.Popen([sys.executable, "-c",
133 'import sys; sys.stdout.write("orange")'],
134 stdout=d)
135 p.wait()
136 os.lseek(d, 0, 0)
137 self.assertEqual(os.read(d, 1024), "orange")
138
139 def test_stdout_fileobj(self):
140 # stdout is set to open file object
141 tf = tempfile.TemporaryFile()
142 p = subprocess.Popen([sys.executable, "-c",
143 'import sys; sys.stdout.write("orange")'],
144 stdout=tf)
145 p.wait()
146 tf.seek(0)
147 self.assertEqual(tf.read(), "orange")
148
149 def test_stderr_pipe(self):
150 # stderr redirection
151 p = subprocess.Popen([sys.executable, "-c",
152 'import sys; sys.stderr.write("strawberry")'],
153 stderr=subprocess.PIPE)
154 self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
155 "strawberry")
156
157 def test_stderr_filedes(self):
158 # stderr is set to open file descriptor
159 tf = tempfile.TemporaryFile()
160 d = tf.fileno()
161 p = subprocess.Popen([sys.executable, "-c",
162 'import sys; sys.stderr.write("strawberry")'],
163 stderr=d)
164 p.wait()
165 os.lseek(d, 0, 0)
166 self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
167 "strawberry")
168
169 def test_stderr_fileobj(self):
170 # stderr is set to open file object
171 tf = tempfile.TemporaryFile()
172 p = subprocess.Popen([sys.executable, "-c",
173 'import sys; sys.stderr.write("strawberry")'],
174 stderr=tf)
175 p.wait()
176 tf.seek(0)
177 self.assertEqual(remove_stderr_debug_decorations(tf.read()),
178 "strawberry")
179
180 def test_stdout_stderr_pipe(self):
181 # capture stdout and stderr to the same pipe
182 p = subprocess.Popen([sys.executable, "-c",
183 'import sys;' \
184 'sys.stdout.write("apple");' \
185 'sys.stdout.flush();' \
186 'sys.stderr.write("orange")'],
187 stdout=subprocess.PIPE,
188 stderr=subprocess.STDOUT)
189 output = p.stdout.read()
190 stripped = remove_stderr_debug_decorations(output)
191 self.assertEqual(stripped, "appleorange")
192
193 def test_stdout_stderr_file(self):
194 # capture stdout and stderr to the same open file
195 tf = tempfile.TemporaryFile()
196 p = subprocess.Popen([sys.executable, "-c",
197 'import sys;' \
198 'sys.stdout.write("apple");' \
199 'sys.stdout.flush();' \
200 'sys.stderr.write("orange")'],
201 stdout=tf,
202 stderr=tf)
203 p.wait()
204 tf.seek(0)
205 output = tf.read()
206 stripped = remove_stderr_debug_decorations(output)
207 self.assertEqual(stripped, "appleorange")
208
209 def test_cwd(self):
210 tmpdir = os.getenv("TEMP", "/tmp")
211 # We cannot use os.path.realpath to canonicalize the path,
212 # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
213 cwd = os.getcwd()
214 os.chdir(tmpdir)
215 tmpdir = os.getcwd()
216 os.chdir(cwd)
217 p = subprocess.Popen([sys.executable, "-c",
218 'import sys,os;' \
219 'sys.stdout.write(os.getcwd())'],
220 stdout=subprocess.PIPE,
221 cwd=tmpdir)
222 normcase = os.path.normcase
223 self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
224
225 def test_env(self):
226 newenv = os.environ.copy()
227 newenv["FRUIT"] = "orange"
228 p = subprocess.Popen([sys.executable, "-c",
229 'import sys,os;' \
230 'sys.stdout.write(os.getenv("FRUIT"))'],
231 stdout=subprocess.PIPE,
232 env=newenv)
233 self.assertEqual(p.stdout.read(), "orange")
234
235 def test_communicate(self):
236 p = subprocess.Popen([sys.executable, "-c",
237 'import sys,os;' \
238 'sys.stderr.write("pineapple");' \
239 'sys.stdout.write(sys.stdin.read())'],
240 stdin=subprocess.PIPE,
241 stdout=subprocess.PIPE,
242 stderr=subprocess.PIPE)
243 (stdout, stderr) = p.communicate("banana")
244 self.assertEqual(stdout, "banana")
245 self.assertEqual(remove_stderr_debug_decorations(stderr),
246 "pineapple")
247
248 def test_communicate_returns(self):
249 # communicate() should return None if no redirection is active
250 p = subprocess.Popen([sys.executable, "-c",
251 "import sys; sys.exit(47)"])
252 (stdout, stderr) = p.communicate()
253 self.assertEqual(stdout, None)
254 self.assertEqual(stderr, None)
255
256 def test_communicate_pipe_buf(self):
257 # communicate() with writes larger than pipe_buf
258 # This test will probably deadlock rather than fail, if
259 # communicate() does not work properly.
260 x, y = os.pipe()
261 if mswindows:
262 pipe_buf = 512
263 else:
264 pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
265 os.close(x)
266 os.close(y)
267 p = subprocess.Popen([sys.executable, "-c",
268 'import sys,os;'
269 'sys.stdout.write(sys.stdin.read(47));' \
270 'sys.stderr.write("xyz"*%d);' \
271 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
272 stdin=subprocess.PIPE,
273 stdout=subprocess.PIPE,
274 stderr=subprocess.PIPE)
275 string_to_write = "abc"*pipe_buf
276 (stdout, stderr) = p.communicate(string_to_write)
277 self.assertEqual(stdout, string_to_write)
278
279 def test_writes_before_communicate(self):
280 # stdin.write before communicate()
281 p = subprocess.Popen([sys.executable, "-c",
282 'import sys,os;' \
283 'sys.stdout.write(sys.stdin.read())'],
284 stdin=subprocess.PIPE,
285 stdout=subprocess.PIPE,
286 stderr=subprocess.PIPE)
287 p.stdin.write("banana")
288 (stdout, stderr) = p.communicate("split")
289 self.assertEqual(stdout, "bananasplit")
290 self.assertEqual(remove_stderr_debug_decorations(stderr), "")
291
292 def test_universal_newlines(self):
293 p = subprocess.Popen([sys.executable, "-c",
294 'import sys,os;' + SETBINARY +
295 'sys.stdout.write("line1\\n");'
296 'sys.stdout.flush();'
297 'sys.stdout.write("line2\\r");'
298 'sys.stdout.flush();'
299 'sys.stdout.write("line3\\r\\n");'
300 'sys.stdout.flush();'
301 'sys.stdout.write("line4\\r");'
302 'sys.stdout.flush();'
303 'sys.stdout.write("\\nline5");'
304 'sys.stdout.flush();'
305 'sys.stdout.write("\\nline6");'],
306 stdout=subprocess.PIPE,
307 universal_newlines=1)
308 stdout = p.stdout.read()
309 if hasattr(open, 'newlines'):
310 # Interpreter with universal newline support
311 self.assertEqual(stdout,
312 "line1\nline2\nline3\nline4\nline5\nline6")
313 else:
314 # Interpreter without universal newline support
315 self.assertEqual(stdout,
316 "line1\nline2\rline3\r\nline4\r\nline5\nline6")
317
318 def test_universal_newlines_communicate(self):
319 # universal newlines through communicate()
320 p = subprocess.Popen([sys.executable, "-c",
321 'import sys,os;' + SETBINARY +
322 'sys.stdout.write("line1\\n");'
323 'sys.stdout.flush();'
324 'sys.stdout.write("line2\\r");'
325 'sys.stdout.flush();'
326 'sys.stdout.write("line3\\r\\n");'
327 'sys.stdout.flush();'
328 'sys.stdout.write("line4\\r");'
329 'sys.stdout.flush();'
330 'sys.stdout.write("\\nline5");'
331 'sys.stdout.flush();'
332 'sys.stdout.write("\\nline6");'],
333 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
334 universal_newlines=1)
335 (stdout, stderr) = p.communicate()
336 if hasattr(open, 'newlines'):
337 # Interpreter with universal newline support
338 self.assertEqual(stdout,
339 "line1\nline2\nline3\nline4\nline5\nline6")
340 else:
341 # Interpreter without universal newline support
342 self.assertEqual(stdout, "line1\nline2\rline3\r\nline4\r\nline5\nline6")
343
344 def test_no_leaking(self):
345 # Make sure we leak no resources
346 if test_support.is_resource_enabled("subprocess") and not mswindows:
347 max_handles = 1026 # too much for most UNIX systems
348 else:
349 max_handles = 65
350 for i in range(max_handles):
351 p = subprocess.Popen([sys.executable, "-c",
352 "import sys;sys.stdout.write(sys.stdin.read())"],
353 stdin=subprocess.PIPE,
354 stdout=subprocess.PIPE,
355 stderr=subprocess.PIPE)
356 data = p.communicate("lime")[0]
357 self.assertEqual(data, "lime")
358
359
360 def test_list2cmdline(self):
361 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
362 '"a b c" d e')
363 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
364 'ab\\"c \\ d')
365 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
366 'a\\\\\\b "de fg" h')
367 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
368 'a\\\\\\"b c d')
369 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
370 '"a\\\\b c" d e')
371 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
372 '"a\\\\b\\ c" d e')
373
374
375 def test_poll(self):
376 p = subprocess.Popen([sys.executable,
377 "-c", "import time; time.sleep(1)"])
378 count = 0
379 while p.poll() is None:
380 time.sleep(0.1)
381 count += 1
382 # We expect that the poll loop probably went around about 10 times,
383 # but, based on system scheduling we can't control, it's possible
384 # poll() never returned None. It "should be" very rare that it
385 # didn't go around at least twice.
386 self.assert_(count >= 2)
387 # Subsequent invocations should just return the returncode
388 self.assertEqual(p.poll(), 0)
389
390
391 def test_wait(self):
392 p = subprocess.Popen([sys.executable,
393 "-c", "import time; time.sleep(2)"])
394 self.assertEqual(p.wait(), 0)
395 # Subsequent invocations should just return the returncode
396 self.assertEqual(p.wait(), 0)
397
398
399 def test_invalid_bufsize(self):
400 # an invalid type of the bufsize argument should raise
401 # TypeError.
402 try:
403 subprocess.Popen([sys.executable, "-c", "pass"], "orange")
404 except TypeError:
405 pass
406 else:
407 self.fail("Expected TypeError")
408
409 #
410 # POSIX tests
411 #
412 if not mswindows:
413 def test_exceptions(self):
414 # catched & re-raised exceptions
415 try:
416 p = subprocess.Popen([sys.executable, "-c", ""],
417 cwd="/this/path/does/not/exist")
418 except OSError, e:
419 # The attribute child_traceback should contain "os.chdir"
420 # somewhere.
421 self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
422 else:
423 self.fail("Expected OSError")
424
425 def test_run_abort(self):
426 # returncode handles signal termination
427 p = subprocess.Popen([sys.executable,
428 "-c", "import os; os.abort()"])
429 p.wait()
430 self.assertEqual(-p.returncode, signal.SIGABRT)
431
432 def test_preexec(self):
433 # preexec function
434 p = subprocess.Popen([sys.executable, "-c",
435 'import sys,os;' \
436 'sys.stdout.write(os.getenv("FRUIT"))'],
437 stdout=subprocess.PIPE,
438 preexec_fn=lambda: os.putenv("FRUIT", "apple"))
439 self.assertEqual(p.stdout.read(), "apple")
440
441 def test_args_string(self):
442 # args is a string
443 f, fname = self.mkstemp()
444 os.write(f, "#!/bin/sh\n")
445 os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
446 sys.executable)
447 os.close(f)
448 os.chmod(fname, 0700)
449 p = subprocess.Popen(fname)
450 p.wait()
451 os.remove(fname)
452 self.assertEqual(p.returncode, 47)
453
454 def test_invalid_args(self):
455 # invalid arguments should raise ValueError
456 self.assertRaises(ValueError, subprocess.call,
457 [sys.executable,
458 "-c", "import sys; sys.exit(47)"],
459 startupinfo=47)
460 self.assertRaises(ValueError, subprocess.call,
461 [sys.executable,
462 "-c", "import sys; sys.exit(47)"],
463 creationflags=47)
464
465 def test_shell_sequence(self):
466 # Run command through the shell (sequence)
467 newenv = os.environ.copy()
468 newenv["FRUIT"] = "apple"
469 p = subprocess.Popen(["echo $FRUIT"], shell=1,
470 stdout=subprocess.PIPE,
471 env=newenv)
472 self.assertEqual(p.stdout.read().strip(), "apple")
473
474 def test_shell_string(self):
475 # Run command through the shell (string)
476 newenv = os.environ.copy()
477 newenv["FRUIT"] = "apple"
478 p = subprocess.Popen("echo $FRUIT", shell=1,
479 stdout=subprocess.PIPE,
480 env=newenv)
481 self.assertEqual(p.stdout.read().strip(), "apple")
482
483 def test_call_string(self):
484 # call() function with string argument on UNIX
485 f, fname = self.mkstemp()
486 os.write(f, "#!/bin/sh\n")
487 os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" %
488 sys.executable)
489 os.close(f)
490 os.chmod(fname, 0700)
491 rc = subprocess.call(fname)
492 os.remove(fname)
493 self.assertEqual(rc, 47)
494
495
496 #
497 # Windows tests
498 #
499 if mswindows:
500 def test_startupinfo(self):
501 # startupinfo argument
502 # We uses hardcoded constants, because we do not want to
503 # depend on win32all.
504 STARTF_USESHOWWINDOW = 1
505 SW_MAXIMIZE = 3
506 startupinfo = subprocess.STARTUPINFO()
507 startupinfo.dwFlags = STARTF_USESHOWWINDOW
508 startupinfo.wShowWindow = SW_MAXIMIZE
509 # Since Python is a console process, it won't be affected
510 # by wShowWindow, but the argument should be silently
511 # ignored
512 subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
513 startupinfo=startupinfo)
514
515 def test_creationflags(self):
516 # creationflags argument
517 CREATE_NEW_CONSOLE = 16
518 sys.stderr.write(" a DOS box should flash briefly ...\n")
519 subprocess.call(sys.executable +
520 ' -c "import time; time.sleep(0.25)"',
521 creationflags=CREATE_NEW_CONSOLE)
522
523 def test_invalid_args(self):
524 # invalid arguments should raise ValueError
525 self.assertRaises(ValueError, subprocess.call,
526 [sys.executable,
527 "-c", "import sys; sys.exit(47)"],
528 preexec_fn=lambda: 1)
529 self.assertRaises(ValueError, subprocess.call,
530 [sys.executable,
531 "-c", "import sys; sys.exit(47)"],
532 close_fds=True)
533
534 def test_shell_sequence(self):
535 # Run command through the shell (sequence)
536 newenv = os.environ.copy()
537 newenv["FRUIT"] = "physalis"
538 p = subprocess.Popen(["set"], shell=1,
539 stdout=subprocess.PIPE,
540 env=newenv)
541 self.assertNotEqual(p.stdout.read().find("physalis"), -1)
542
543 def test_shell_string(self):
544 # Run command through the shell (string)
545 newenv = os.environ.copy()
546 newenv["FRUIT"] = "physalis"
547 p = subprocess.Popen("set", shell=1,
548 stdout=subprocess.PIPE,
549 env=newenv)
550 self.assertNotEqual(p.stdout.read().find("physalis"), -1)
551
552 def test_call_string(self):
553 # call() function with string argument on Windows
554 rc = subprocess.call(sys.executable +
555 ' -c "import sys; sys.exit(47)"')
556 self.assertEqual(rc, 47)
557
558
559def test_main():
560 test_support.run_unittest(ProcessTestCase)
561
562if __name__ == "__main__":
563 test_main()