diff options
author | nkozlovskiy <nmk@ydb.tech> | 2023-10-02 18:57:38 +0300 |
---|---|---|
committer | nkozlovskiy <nmk@ydb.tech> | 2023-10-02 19:39:06 +0300 |
commit | 6295ef4d23465c11296e898b9dc4524ad9592b5d (patch) | |
tree | fc0c852877b2c52f365a1f6ed0710955844338c2 /contrib/deprecated/python/subprocess32/test_subprocess32.py | |
parent | de63c80b75948ecc13894854514d147840ff8430 (diff) | |
download | ydb-6295ef4d23465c11296e898b9dc4524ad9592b5d.tar.gz |
oss ydb: fix dstool building and test run
Diffstat (limited to 'contrib/deprecated/python/subprocess32/test_subprocess32.py')
-rw-r--r-- | contrib/deprecated/python/subprocess32/test_subprocess32.py | 2485 |
1 files changed, 2485 insertions, 0 deletions
diff --git a/contrib/deprecated/python/subprocess32/test_subprocess32.py b/contrib/deprecated/python/subprocess32/test_subprocess32.py new file mode 100644 index 0000000000..bd4276a936 --- /dev/null +++ b/contrib/deprecated/python/subprocess32/test_subprocess32.py @@ -0,0 +1,2485 @@ +import unittest +from test import test_support +import subprocess32 +subprocess = subprocess32 +import sys +try: + import ctypes +except ImportError: + ctypes = None +else: + import ctypes.util +import signal +import os +import errno +import tempfile +import textwrap +import time +try: + import threading +except ImportError: + threading = None +import re +#import sysconfig +import select +import shutil +try: + import gc +except ImportError: + gc = None +import pickle + +mswindows = (sys.platform == "win32") +yenv = ''' +import os +os.environ['Y_PYTHON_ENTRY_POINT'] = ':main' +''' + +# +# Depends on the following external programs: Python +# + +if mswindows: + SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' + 'os.O_BINARY);') +else: + SETBINARY = '' + + +try: + mkstemp = tempfile.mkstemp +except AttributeError: + # tempfile.mkstemp is not available + def mkstemp(): + """Replacement for mkstemp, calling mktemp.""" + fname = tempfile.mktemp() + return os.open(fname, os.O_RDWR|os.O_CREAT), fname + +try: + strip_python_stderr = test_support.strip_python_stderr +except AttributeError: + # Copied from the test.test_support module in 2.7. + def strip_python_stderr(stderr): + """Strip the stderr of a Python process from potential debug output + emitted by the interpreter. + + This will typically be run on the result of the communicate() method + of a subprocess.Popen object. + """ + stderr = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr).strip() + return stderr + +class BaseTestCase(unittest.TestCase): + def setUp(self): + os.environ['Y_PYTHON_ENTRY_POINT'] = ':main' + # Try to minimize the number of children we have so this test + # doesn't crash on some buildbots (Alphas in particular). + reap_children() + if not hasattr(unittest.TestCase, 'addCleanup'): + self._cleanups = [] + + def tearDown(self): + try: + for inst in subprocess._active: + inst.wait() + subprocess._cleanup() + self.assertFalse(subprocess._active, "subprocess._active not empty") + finally: + if self._use_our_own_cleanup_implementation: + self._doCleanups() + + if not hasattr(unittest.TestCase, 'assertIn'): + def assertIn(self, a, b, msg=None): + self.assert_((a in b), msg or ('%r not in %r' % (a, b))) + def assertNotIn(self, a, b, msg=None): + self.assert_((a not in b), msg or ('%r in %r' % (a, b))) + + if not hasattr(unittest.TestCase, 'skipTest'): + def skipTest(self, message): + """These will still fail but it'll be clear that it is okay.""" + self.fail('SKIPPED - %s\n' % (message,)) + + def _addCleanup(self, function, *args, **kwargs): + """Add a function, with arguments, to be called when the test is + completed. Functions added are called on a LIFO basis and are + called after tearDown on test failure or success. + + Unlike unittest2 or python 2.7, cleanups are not if setUp fails. + That is easier to implement in this subclass and is all we need. + """ + self._cleanups.append((function, args, kwargs)) + + def _doCleanups(self): + """Execute all cleanup functions. Normally called for you after + tearDown.""" + while self._cleanups: + function, args, kwargs = self._cleanups.pop() + try: + function(*args, **kwargs) + except KeyboardInterrupt: + raise + except: + pass + + _use_our_own_cleanup_implementation = False + if not hasattr(unittest.TestCase, 'addCleanup'): + _use_our_own_cleanup_implementation = True + addCleanup = _addCleanup + + def assertStderrEqual(self, stderr, expected, msg=None): + # In a debug build, stuff like "[6580 refs]" is printed to stderr at + # shutdown time. That frustrates tests trying to check stderr produced + # from a spawned Python process. + actual = strip_python_stderr(stderr) + # strip_python_stderr also strips whitespace, so we do too. + expected = expected.strip() + self.assertEqual(actual, expected, msg) + + +class PopenTestException(Exception): + pass + + +class PopenExecuteChildRaises(subprocess32.Popen): + """Popen subclass for testing cleanup of subprocess.PIPE filehandles when + _execute_child fails. + """ + def _execute_child(self, *args, **kwargs): + raise PopenTestException("Forced Exception for Test") + + +class ProcessTestCase(BaseTestCase): + + def test_call_seq(self): + # call() function with sequence argument + rc = subprocess.call([sys.executable, "-c", yenv + + "import sys; sys.exit(47)"]) + self.assertEqual(rc, 47) + + def test_call_timeout(self): + # call() function with timeout argument; we want to test that the child + # process gets killed when the timeout expires. If the child isn't + # killed, this call will deadlock since subprocess.call waits for the + # child. + self.assertRaises(subprocess.TimeoutExpired, subprocess.call, + [sys.executable, "-c", yenv + "while True: pass"], + timeout=0.1) + + def test_check_call_zero(self): + # check_call() function with zero return code + rc = subprocess.check_call([sys.executable, "-c", yenv + + "import sys; sys.exit(0)"]) + self.assertEqual(rc, 0) + + def test_check_call_nonzero(self): + # check_call() function with non-zero return code + try: + subprocess.check_call([sys.executable, "-c", yenv + + "import sys; sys.exit(47)"]) + except subprocess.CalledProcessError, c: + self.assertEqual(c.returncode, 47) + + def test_check_output(self): + # check_output() function with zero return code + output = subprocess.check_output( + [sys.executable, "-c", yenv + "print 'BDFL'"]) + self.assertIn('BDFL', output) + + def test_check_output_nonzero(self): + # check_call() function with non-zero return code + try: + subprocess.check_output( + [sys.executable, "-c", yenv + "import sys; sys.exit(5)"]) + except subprocess.CalledProcessError, c: + self.assertEqual(c.returncode, 5) + + def test_check_output_stderr(self): + # check_output() function stderr redirected to stdout + output = subprocess.check_output( + [sys.executable, "-c", yenv + "import sys; sys.stderr.write('BDFL')"], + stderr=subprocess.STDOUT) + self.assertIn('BDFL', output) + + def test_check_output_stdout_arg(self): + # check_output() function stderr redirected to stdout + try: + output = subprocess.check_output( + [sys.executable, "-c", yenv + "print 'will not be run'"], + stdout=sys.stdout) + self.fail("Expected ValueError when stdout arg supplied.") + except ValueError, c: + self.assertIn('stdout', c.args[0]) + + def test_check_output_timeout(self): + # check_output() function with timeout arg + try: + output = subprocess.check_output( + [sys.executable, "-c", yenv + + "import sys; sys.stdout.write('BDFL')\n" + "sys.stdout.flush()\n" + "while True: pass"], + timeout=0.5) + except subprocess.TimeoutExpired, exception: + self.assertEqual(exception.output, 'BDFL') + else: + self.fail("Expected TimeoutExpired.") + + def test_call_kwargs(self): + # call() function with keyword args + newenv = os.environ.copy() + newenv["FRUIT"] = "banana" + rc = subprocess.call([sys.executable, "-c", yenv + + 'import sys, os;' + 'sys.exit(os.getenv("FRUIT")=="banana")'], + env=newenv) + self.assertEqual(rc, 1) + + def test_stdin_none(self): + # .stdin is None when not redirected + p = subprocess.Popen([sys.executable, "-c", yenv + 'print "banana"'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p.wait() + self.assertEqual(p.stdin, None) + + def test_stdout_none(self): + # .stdout is None when not redirected, and the child's stdout will + # be inherited from the parent. In order to test this we run a + # subprocess in a subprocess: + # this_test + # \-- subprocess created by this test (parent) + # \-- subprocess created by the parent subprocess (child) + # The parent doesn't specify stdout, so the child will use the + # parent's stdout. This test checks that the message printed by the + # child goes to the parent stdout. The parent also checks that the + # child's stdout is None. See #11963. + code = ('import sys; from subprocess32 import Popen, PIPE;' + 'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],' + ' stdin=PIPE, stderr=PIPE);' + 'p.wait(); assert p.stdout is None;') + p = subprocess.Popen([sys.executable, "-c", yenv + code], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + out, err = p.communicate() + self.assertEqual(p.returncode, 0, err) + self.assertEqual(out.rstrip(), 'test_stdout_none') + + def test_stderr_none(self): + # .stderr is None when not redirected + p = subprocess.Popen([sys.executable, "-c", yenv + 'print "banana"'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + p.wait() + self.assertEqual(p.stderr, None) + + # For use in the test_cwd* tests below. + def _normalize_cwd(self, cwd): + # Normalize an expected cwd (for Tru64 support). + # We can't use os.path.realpath since it doesn't expand Tru64 {memb} + # strings. See bug #1063571. + original_cwd = os.getcwd() + os.chdir(cwd) + cwd = os.getcwd() + os.chdir(original_cwd) + return cwd + + # For use in the test_cwd* tests below. + def _split_python_path(self): + # Return normalized (python_dir, python_base). + python_path = os.path.realpath(sys.executable) + return os.path.split(python_path) + + # For use in the test_cwd* tests below. + def _assert_cwd(self, expected_cwd, python_arg, **kwargs): + # Invoke Python via Popen, and assert that (1) the call succeeds, + # and that (2) the current working directory of the child process + # matches *expected_cwd*. + p = subprocess.Popen([python_arg, "-c", yenv + + "import os, sys; " + "sys.stdout.write(os.getcwd()); " + "sys.exit(47)"], + stdout=subprocess.PIPE, + **kwargs) + self.addCleanup(p.stdout.close) + p.wait() + self.assertEqual(47, p.returncode) + normcase = os.path.normcase + self.assertEqual(normcase(expected_cwd), + normcase(p.stdout.read().decode("utf-8"))) + + def test_cwd(self): + # Check that cwd changes the cwd for the child process. + temp_dir = tempfile.gettempdir() + temp_dir = self._normalize_cwd(temp_dir) + self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir) + + if not mswindows: # pending resolution of issue #15533 + def test_cwd_with_relative_arg(self): + # Check that Popen looks for args[0] relative to cwd if args[0] + # is relative. + python_dir, python_base = self._split_python_path() + rel_python = os.path.join(os.curdir, python_base) + + path = 'tempcwd' + saved_dir = os.getcwd() + os.mkdir(path) + try: + os.chdir(path) + wrong_dir = os.getcwd() + # Before calling with the correct cwd, confirm that the call fails + # without cwd and with the wrong cwd. + self.assertRaises(OSError, subprocess.Popen, + [rel_python]) + self.assertRaises(OSError, subprocess.Popen, + [rel_python], cwd=wrong_dir) + python_dir = self._normalize_cwd(python_dir) + self._assert_cwd(python_dir, rel_python, cwd=python_dir) + finally: + os.chdir(saved_dir) + shutil.rmtree(path) + + def test_cwd_with_relative_executable(self): + # Check that Popen looks for executable relative to cwd if executable + # is relative (and that executable takes precedence over args[0]). + python_dir, python_base = self._split_python_path() + rel_python = os.path.join(os.curdir, python_base) + doesntexist = "somethingyoudonthave" + + path = 'tempcwd' + saved_dir = os.getcwd() + os.mkdir(path) + try: + os.chdir(path) + wrong_dir = os.getcwd() + # Before calling with the correct cwd, confirm that the call fails + # without cwd and with the wrong cwd. + self.assertRaises(OSError, subprocess.Popen, + [doesntexist], executable=rel_python) + self.assertRaises(OSError, subprocess.Popen, + [doesntexist], executable=rel_python, + cwd=wrong_dir) + python_dir = self._normalize_cwd(python_dir) + self._assert_cwd(python_dir, doesntexist, executable=rel_python, + cwd=python_dir) + finally: + os.chdir(saved_dir) + shutil.rmtree(path) + + def test_cwd_with_absolute_arg(self): + # Check that Popen can find the executable when the cwd is wrong + # if args[0] is an absolute path. + python_dir, python_base = self._split_python_path() + abs_python = os.path.join(python_dir, python_base) + rel_python = os.path.join(os.curdir, python_base) + wrong_dir = tempfile.mkdtemp() + wrong_dir = os.path.realpath(wrong_dir) + try: + # Before calling with an absolute path, confirm that using a + # relative path fails. + self.assertRaises(OSError, subprocess.Popen, + [rel_python], cwd=wrong_dir) + wrong_dir = self._normalize_cwd(wrong_dir) + self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir) + finally: + shutil.rmtree(wrong_dir) + + def test_executable_with_cwd(self): + python_dir, python_base = self._split_python_path() + python_dir = self._normalize_cwd(python_dir) + self._assert_cwd(python_dir, "somethingyoudonthave", + executable=sys.executable, cwd=python_dir) + + #@unittest.skipIf(sysconfig.is_python_build(), + # "need an installed Python. See #7774") + #def test_executable_without_cwd(self): + # # For a normal installation, it should work without 'cwd' + # # argument. For test runs in the build directory, see #7774. + # self._assert_cwd('', "somethingyoudonthave", executable=sys.executable) + + def test_stdin_pipe(self): + # stdin redirection + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.exit(sys.stdin.read() == "pear")'], + stdin=subprocess.PIPE) + p.stdin.write("pear") + p.stdin.close() + p.wait() + self.assertEqual(p.returncode, 1) + + def test_stdin_filedes(self): + # stdin is set to open file descriptor + tf = tempfile.TemporaryFile() + d = tf.fileno() + os.write(d, "pear") + os.lseek(d, 0, 0) + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.exit(sys.stdin.read() == "pear")'], + stdin=d) + p.wait() + self.assertEqual(p.returncode, 1) + + def test_stdin_fileobj(self): + # stdin is set to open file object + tf = tempfile.TemporaryFile() + tf.write("pear") + tf.seek(0) + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.exit(sys.stdin.read() == "pear")'], + stdin=tf) + p.wait() + self.assertEqual(p.returncode, 1) + + def test_stdout_pipe(self): + # stdout redirection + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.stdout.write("orange")'], + stdout=subprocess.PIPE) + self.assertEqual(p.stdout.read(), "orange") + + def test_stdout_filedes(self): + # stdout is set to open file descriptor + tf = tempfile.TemporaryFile() + d = tf.fileno() + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.stdout.write("orange")'], + stdout=d) + p.wait() + os.lseek(d, 0, 0) + self.assertEqual(os.read(d, 1024), "orange") + + def test_stdout_fileobj(self): + # stdout is set to open file object + tf = tempfile.TemporaryFile() + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.stdout.write("orange")'], + stdout=tf) + p.wait() + tf.seek(0) + self.assertEqual(tf.read(), "orange") + + def test_stderr_pipe(self): + # stderr redirection + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.stderr.write("strawberry")'], + stderr=subprocess.PIPE) + self.assertStderrEqual(p.stderr.read(), "strawberry") + + def test_stderr_filedes(self): + # stderr is set to open file descriptor + tf = tempfile.TemporaryFile() + d = tf.fileno() + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.stderr.write("strawberry")'], + stderr=d) + p.wait() + os.lseek(d, 0, 0) + self.assertStderrEqual(os.read(d, 1024), "strawberry") + + def test_stderr_fileobj(self): + # stderr is set to open file object + tf = tempfile.TemporaryFile() + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.stderr.write("strawberry")'], + stderr=tf) + p.wait() + tf.seek(0) + self.assertStderrEqual(tf.read(), "strawberry") + + def test_stderr_redirect_with_no_stdout_redirect(self): + # test stderr=STDOUT while stdout=None (not set) + + # - grandchild prints to stderr + # - child redirects grandchild's stderr to its stdout + # - the parent should get grandchild's stderr in child's stdout + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys, subprocess32 as subprocess;' + 'rc = subprocess.call([sys.executable, "-c",' + ' "import sys;"' + ' "sys.stderr.write(\'42\')"],' + ' stderr=subprocess.STDOUT);' + 'sys.exit(rc)'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + #NOTE: stdout should get stderr from grandchild + self.assertStderrEqual(stdout, '42') + self.assertStderrEqual(stderr, '') # should be empty + self.assertEqual(p.returncode, 0) + + def test_stdout_stderr_pipe(self): + # capture stdout and stderr to the same pipe + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys;' + 'sys.stdout.write("apple");' + 'sys.stdout.flush();' + 'sys.stderr.write("orange")'], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + self.assertStderrEqual(p.stdout.read(), "appleorange") + + def test_stdout_stderr_file(self): + # capture stdout and stderr to the same open file + tf = tempfile.TemporaryFile() + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys;' + 'sys.stdout.write("apple");' + 'sys.stdout.flush();' + 'sys.stderr.write("orange")'], + stdout=tf, + stderr=tf) + p.wait() + tf.seek(0) + self.assertStderrEqual(tf.read(), "appleorange") + + def test_stdout_filedes_of_stdout(self): + # stdout is set to 1 (#1531862). + # To avoid printing the text on stdout, we do something similar to + # test_stdout_none (see above). The parent subprocess calls the child + # subprocess passing stdout=1, and this test uses stdout=PIPE in + # order to capture and check the output of the parent. See #11963. + code = ('import sys, subprocess32; ' + 'rc = subprocess32.call([sys.executable, "-c", ' + ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' + '\'test with stdout=1\'))"], stdout=1); ' + 'assert rc == 18') + p = subprocess.Popen([sys.executable, "-c", yenv + code], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + out, err = p.communicate() + self.assertEqual(p.returncode, 0, err) + self.assertEqual(out.rstrip(), 'test with stdout=1') + + def test_stdout_devnull(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'for i in range(10240):' + 'print("x" * 1024)'], + stdout=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stdout, None) + + def test_stderr_devnull(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys\n' + 'for i in range(10240):' + 'sys.stderr.write("x" * 1024)'], + stderr=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stderr, None) + + def test_stdin_devnull(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys;' + 'sys.stdin.read(1)'], + stdin=subprocess.DEVNULL) + p.wait() + self.assertEqual(p.stdin, None) + + def test_env(self): + newenv = os.environ.copy() + newenv["FRUIT"] = "orange" + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os;' + 'sys.stdout.write(os.getenv("FRUIT"))'], + stdout=subprocess.PIPE, + env=newenv) + try: + stdout, stderr = p.communicate() + self.assertEqual(stdout, "orange") + finally: + p.__exit__(None, None, None) + + def test_empty_env(self): + """test_empty_env() - verify that env={} is as empty as possible.""" + + def is_env_var_to_ignore(n): + """Determine if an environment variable is under our control.""" + # This excludes some __CF_* and VERSIONER_* keys MacOS insists + # on adding even when the environment in exec is empty. + # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist. + return ('VERSIONER' in n or '__CF' in n or # MacOS + n == 'LD_PRELOAD' or n.startswith('SANDBOX')) # Gentoo + + p = subprocess.Popen( + [sys.executable, '-c', + 'import os; print(list(os.environ.keys()))'], + stdout=subprocess.PIPE, env={'Y_PYTHON_ENTRY_POINT': ':main'}) + try: + stdout, stderr = p.communicate() + child_env_names = eval(stdout.strip()) + self.assertTrue(isinstance(child_env_names, list), + msg=repr(child_env_names)) + child_env_names = [k for k in child_env_names + if not is_env_var_to_ignore(k)] + self.assertEqual(child_env_names, []) + finally: + p.__exit__(None, None, None) + + def test_communicate_stdin(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys;' + 'sys.exit(sys.stdin.read() == "pear")'], + stdin=subprocess.PIPE) + p.communicate("pear") + self.assertEqual(p.returncode, 1) + + def test_communicate_stdout(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.stdout.write("pineapple")'], + stdout=subprocess.PIPE) + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "pineapple") + self.assertEqual(stderr, None) + + def test_communicate_stderr(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; sys.stderr.write("pineapple")'], + stderr=subprocess.PIPE) + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, None) + self.assertStderrEqual(stderr, "pineapple") + + def test_communicate(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os;' + 'sys.stderr.write("pineapple");' + 'sys.stdout.write(sys.stdin.read())'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + (stdout, stderr) = p.communicate("banana") + self.assertEqual(stdout, "banana") + self.assertStderrEqual(stderr, "pineapple") + + def test_communicate_timeout(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os,time;' + 'sys.stderr.write("pineapple\\n");' + 'time.sleep(1);' + 'sys.stderr.write("pear\\n");' + 'sys.stdout.write(sys.stdin.read())'], + universal_newlines=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, u"banana", + timeout=0.3) + # Make sure we can keep waiting for it, and that we get the whole output + # after it completes. + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "banana") + self.assertStderrEqual(stderr.encode(), "pineapple\npear\n") + + def test_communicate_timeout_large_ouput(self): + # Test a expring timeout while the child is outputting lots of data. + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os,time;' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));'], + stdout=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) + (stdout, _) = p.communicate() + self.assertEqual(len(stdout), 4 * 64 * 1024) + + # Test for the fd leak reported in http://bugs.python.org/issue2791. + def test_communicate_pipe_fd_leak(self): + for stdin_pipe in (False, True): + for stdout_pipe in (False, True): + for stderr_pipe in (False, True): + options = {} + if stdin_pipe: + options['stdin'] = subprocess.PIPE + if stdout_pipe: + options['stdout'] = subprocess.PIPE + if stderr_pipe: + options['stderr'] = subprocess.PIPE + if not options: + continue + p = subprocess.Popen((sys.executable, "-c", yenv + "pass"), **options) + p.communicate() + if p.stdin is not None: + self.assertTrue(p.stdin.closed) + if p.stdout is not None: + self.assertTrue(p.stdout.closed) + if p.stderr is not None: + self.assertTrue(p.stderr.closed) + + def test_communicate_returns(self): + # communicate() should return None if no redirection is active + p = subprocess.Popen([sys.executable, "-c", yenv + + "import sys; sys.exit(47)"]) + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, None) + self.assertEqual(stderr, None) + + def test_communicate_pipe_buf(self): + # communicate() with writes larger than pipe_buf + # This test will probably deadlock rather than fail, if + # communicate() does not work properly. + x, y = os.pipe() + if mswindows: + pipe_buf = 512 + else: + pipe_buf = os.fpathconf(x, "PC_PIPE_BUF") + os.close(x) + os.close(y) + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os;' + 'sys.stdout.write(sys.stdin.read(47));' + 'sys.stderr.write("xyz"*%d);' + 'sys.stdout.write(sys.stdin.read())' % pipe_buf], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + string_to_write = "abc"*pipe_buf + (stdout, stderr) = p.communicate(string_to_write) + self.assertEqual(stdout, string_to_write) + + def test_writes_before_communicate(self): + # stdin.write before communicate() + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os;' + 'sys.stdout.write(sys.stdin.read())'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + p.stdin.write("banana") + (stdout, stderr) = p.communicate("split") + self.assertEqual(stdout, "bananasplit") + self.assertStderrEqual(stderr, "") + + def test_universal_newlines(self): + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os;' + SETBINARY + + 'sys.stdout.write("line1\\n");' + 'sys.stdout.flush();' + 'sys.stdout.write("line2\\r");' + 'sys.stdout.flush();' + 'sys.stdout.write("line3\\r\\n");' + 'sys.stdout.flush();' + 'sys.stdout.write("line4\\r");' + 'sys.stdout.flush();' + 'sys.stdout.write("\\nline5");' + 'sys.stdout.flush();' + 'sys.stdout.write("\\nline6");'], + stdout=subprocess.PIPE, + universal_newlines=1) + stdout = p.stdout.read() + if hasattr(file, 'newlines'): + # Interpreter with universal newline support + self.assertEqual(stdout, + "line1\nline2\nline3\nline4\nline5\nline6") + else: + # Interpreter without universal newline support + self.assertEqual(stdout, + "line1\nline2\rline3\r\nline4\r\nline5\nline6") + + def test_universal_newlines_communicate(self): + # universal newlines through communicate() + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os;' + SETBINARY + + 'sys.stdout.write("line1\\n");' + 'sys.stdout.flush();' + 'sys.stdout.write("line2\\r");' + 'sys.stdout.flush();' + 'sys.stdout.write("line3\\r\\n");' + 'sys.stdout.flush();' + 'sys.stdout.write("line4\\r");' + 'sys.stdout.flush();' + 'sys.stdout.write("\\nline5");' + 'sys.stdout.flush();' + 'sys.stdout.write("\\nline6");'], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=1) + (stdout, stderr) = p.communicate() + if hasattr(file, 'newlines'): + # Interpreter with universal newline support + self.assertEqual(stdout, + "line1\nline2\nline3\nline4\nline5\nline6") + else: + # Interpreter without universal newline support + self.assertEqual(stdout, + "line1\nline2\rline3\r\nline4\r\nline5\nline6") + + def test_universal_newlines_communicate_input_none(self): + # Test communicate(input=None) with universal newlines. + # + # We set stdout to PIPE because, as of this writing, a different + # code path is tested when the number of pipes is zero or one. + p = subprocess.Popen([sys.executable, "-c", yenv + "pass"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=True) + p.communicate() + self.assertEqual(p.returncode, 0) + + def test_no_leaking(self): + # Make sure we leak no resources + if not hasattr(test_support, "is_resource_enabled") \ + or test_support.is_resource_enabled("subprocess") and not mswindows: + max_handles = 1026 # too much for most UNIX systems + else: + max_handles = 65 + for i in range(max_handles): + p = subprocess.Popen([sys.executable, "-c", yenv + + "import sys;sys.stdout.write(sys.stdin.read())"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + data = p.communicate("lime")[0] + self.assertEqual(data, "lime") + + def test_universal_newlines_communicate_stdin_stdout_stderr(self): + # universal newlines through communicate(), with stdin, stdout, stderr + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys,os;' + SETBINARY + '''\nif True: + s = sys.stdin.readline() + sys.stdout.write(s) + sys.stdout.write("line2\\r") + sys.stderr.write("eline2\\n") + s = sys.stdin.read() + sys.stdout.write(s) + sys.stdout.write("line4\\n") + sys.stdout.write("line5\\r\\n") + sys.stderr.write("eline6\\r") + sys.stderr.write("eline7\\r\\nz") + '''], + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + universal_newlines=True) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + (stdout, stderr) = p.communicate(u"line1\nline3\n") + self.assertEqual(p.returncode, 0) + self.assertEqual(u"line1\nline2\nline3\nline4\nline5\n", stdout) + # Python debug build push something like "[42442 refs]\n" + # to stderr at exit of subprocess. + # Don't use assertStderrEqual because it strips CR and LF from output. + self.assertTrue(stderr.startswith(u"eline2\neline6\neline7\n")) + + def test_list2cmdline(self): + self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), + '"a b c" d e') + self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), + 'ab\\"c \\ d') + self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), + 'ab\\"c " \\\\" d') + self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), + 'a\\\\\\b "de fg" h') + self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), + 'a\\\\\\"b c d') + self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), + '"a\\\\b c" d e') + self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), + '"a\\\\b\\ c" d e') + self.assertEqual(subprocess.list2cmdline(['ab', '']), + 'ab ""') + + + def test_poll(self): + p = subprocess.Popen([sys.executable, + "-c", yenv + "import time; time.sleep(1)"]) + count = 0 + while p.poll() is None: + time.sleep(0.1) + count += 1 + # We expect that the poll loop probably went around about 10 times, + # but, based on system scheduling we can't control, it's possible + # poll() never returned None. It "should be" very rare that it + # didn't go around at least twice. + self.assert_(count >= 2) + # Subsequent invocations should just return the returncode + self.assertEqual(p.poll(), 0) + + + def test_wait(self): + p = subprocess.Popen([sys.executable, + "-c", yenv + "import time; time.sleep(2)"]) + self.assertEqual(p.wait(), 0) + # Subsequent invocations should just return the returncode + self.assertEqual(p.wait(), 0) + + + def test_wait_timeout(self): + p = subprocess.Popen([sys.executable, + "-c", yenv + "import time; time.sleep(0.1)"]) + try: + p.wait(timeout=0.01) + except subprocess.TimeoutExpired, e: + self.assertIn("0.01", str(e)) # For coverage of __str__. + else: + self.fail("subprocess.TimeoutExpired expected but not raised.") + self.assertEqual(p.wait(timeout=2), 0) + + + def test_invalid_bufsize(self): + # an invalid type of the bufsize argument should raise + # TypeError. + try: + subprocess.Popen([sys.executable, "-c", yenv + "pass"], "orange") + except TypeError: + pass + + def test_leaking_fds_on_error(self): + # see bug #5179: Popen leaks file descriptors to PIPEs if + # the child fails to execute; this will eventually exhaust + # the maximum number of open fds. 1024 seems a very common + # value for that limit, but Windows has 2048, so we loop + # 1024 times (each call leaked two fds). + for i in range(1024): + # Windows raises IOError. Others raise OSError. + try: + subprocess.Popen(['nonexisting_i_hope'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except EnvironmentError, c: + if c.errno != 2: # ignore "no such file" + raise + + #@unittest.skipIf(threading is None, "threading required") + def test_threadsafe_wait(self): + """Issue21291: Popen.wait() needs to be threadsafe for returncode.""" + proc = subprocess.Popen([sys.executable, '-c', yenv + + 'import time; time.sleep(12)']) + self.assertEqual(proc.returncode, None) + results = [] + + def kill_proc_timer_thread(): + results.append(('thread-start-poll-result', proc.poll())) + # terminate it from the thread and wait for the result. + proc.kill() + proc.wait() + results.append(('thread-after-kill-and-wait', proc.returncode)) + # this wait should be a no-op given the above. + proc.wait() + results.append(('thread-after-second-wait', proc.returncode)) + + # This is a timing sensitive test, the failure mode is + # triggered when both the main thread and this thread are in + # the wait() call at once. The delay here is to allow the + # main thread to most likely be blocked in its wait() call. + t = threading.Timer(0.2, kill_proc_timer_thread) + t.start() + + if mswindows: + expected_errorcode = 1 + else: + # Should be -9 because of the proc.kill() from the thread. + expected_errorcode = -9 + + # Wait for the process to finish; the thread should kill it + # long before it finishes on its own. Supplying a timeout + # triggers a different code path for better coverage. + proc.wait(timeout=20) + self.assertEqual(proc.returncode, expected_errorcode, + msg="unexpected result in wait from main thread") + + # This should be a no-op with no change in returncode. + proc.wait() + self.assertEqual(proc.returncode, expected_errorcode, + msg="unexpected result in second main wait.") + + t.join() + # Ensure that all of the thread results are as expected. + # When a race condition occurs in wait(), the returncode could + # be set by the wrong thread that doesn't actually have it + # leading to an incorrect value. + self.assertEqual([('thread-start-poll-result', None), + ('thread-after-kill-and-wait', expected_errorcode), + ('thread-after-second-wait', expected_errorcode)], + results) + + def test_issue8780(self): + # Ensure that stdout is inherited from the parent + # if stdout=PIPE is not used + code = ';'.join(( + 'import subprocess32, sys', + 'retcode = subprocess32.call(' + "[sys.executable, '-c', 'print(\"Hello World!\")'])", + 'assert retcode == 0')) + output = subprocess.check_output([sys.executable, '-c', yenv + code]) + self.assert_(output.startswith('Hello World!'), output) + + def test_communicate_epipe(self): + # Issue 10963: communicate() should hide EPIPE + p = subprocess.Popen([sys.executable, "-c", yenv + 'pass'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + self.addCleanup(p.stdin.close) + p.communicate(b"x" * 2**20) + + def test_communicate_epipe_only_stdin(self): + # Issue 10963: communicate() should hide EPIPE + p = subprocess.Popen([sys.executable, "-c", yenv + 'pass'], + stdin=subprocess.PIPE) + self.addCleanup(p.stdin.close) + p.wait() + p.communicate(b"x" * 2**20) + + if not mswindows: # Signal tests are POSIX specific. + def test_communicate_eintr(self): + # Issue #12493: communicate() should handle EINTR + def handler(signum, frame): + pass + old_handler = signal.signal(signal.SIGALRM, handler) + self.addCleanup(signal.signal, signal.SIGALRM, old_handler) + + # the process is running for 2 seconds + args = [sys.executable, "-c", yenv + 'import time; time.sleep(2)'] + for stream in ('stdout', 'stderr'): + kw = {stream: subprocess.PIPE} + process = subprocess.Popen(args, **kw) + try: + signal.alarm(1) + # communicate() will be interrupted by SIGALRM + process.communicate() + finally: + process.__exit__(None, None, None) + + + # This test is Linux-ish specific for simplicity to at least have + # some coverage. It is not a platform specific bug. + #@unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), + # "Linux specific") + def test_failed_child_execute_fd_leak(self): + """Test for the fork() failure fd leak reported in issue16327.""" + if not os.path.isdir('/proc/%d/fd' % os.getpid()): + self.skipTest("Linux specific") + fd_directory = '/proc/%d/fd' % os.getpid() + fds_before_popen = os.listdir(fd_directory) + try: + PopenExecuteChildRaises( + [sys.executable, '-c', yenv + 'pass'], stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except PopenTestException: + pass # Yay! Because 2.4 doesn't support with statements. + else: + self.fail("PopenTestException expected but not raised.") + + # NOTE: This test doesn't verify that the real _execute_child + # does not close the file descriptors itself on the way out + # during an exception. Code inspection has confirmed that. + + fds_after_exception = os.listdir(fd_directory) + self.assertEqual(fds_before_popen, fds_after_exception) + + +class RunFuncTestCase(BaseTestCase): + def run_python(self, code, **kwargs): + """Run Python code in a subprocess using subprocess.run""" + argv = [sys.executable, "-c", yenv + code] + return subprocess.run(argv, **kwargs) + + def test_returncode(self): + # call() function with sequence argument + cp = self.run_python("import sys; sys.exit(47)") + self.assertEqual(cp.returncode, 47) + try: + cp.check_returncode() + except subprocess.CalledProcessError: + pass + else: + self.fail("CalledProcessError not raised") + + def test_check(self): + try: + self.run_python("import sys; sys.exit(47)", check=True) + except subprocess.CalledProcessError, exception: + self.assertEqual(exception.returncode, 47) + else: + self.fail("CalledProcessError not raised") + + def test_check_zero(self): + # check_returncode shouldn't raise when returncode is zero + cp = self.run_python("import sys; sys.exit(0)", check=True) + self.assertEqual(cp.returncode, 0) + + def test_timeout(self): + # run() function with timeout argument; we want to test that the child + # process gets killed when the timeout expires. If the child isn't + # killed, this call will deadlock since subprocess.run waits for the + # child. + try: + self.run_python("while True: pass", timeout=0.0001) + except subprocess.TimeoutExpired: + pass + else: + self.fail("TimeoutExpired not raised") + + def test_capture_stdout(self): + # capture stdout with zero return code + cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) + self.assertIn('BDFL', cp.stdout) + + def test_capture_stderr(self): + cp = self.run_python("import sys; sys.stderr.write('BDFL')", + stderr=subprocess.PIPE) + self.assertIn('BDFL', cp.stderr) + + def test_check_output_stdin_arg(self): + # run() can be called with stdin set to a file + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write('pear') + tf.seek(0) + cp = self.run_python( + "import sys; sys.stdout.write(sys.stdin.read().upper())", + stdin=tf, stdout=subprocess.PIPE) + self.assertIn('PEAR', cp.stdout) + + def test_check_output_input_arg(self): + # check_output() can be called with input set to a string + cp = self.run_python( + "import sys; sys.stdout.write(sys.stdin.read().upper())", + input='pear', stdout=subprocess.PIPE) + self.assertIn('PEAR', cp.stdout) + + def test_check_output_stdin_with_input_arg(self): + # run() refuses to accept 'stdin' with 'input' + tf = tempfile.TemporaryFile() + self.addCleanup(tf.close) + tf.write('pear') + tf.seek(0) + try: + output = self.run_python("print('will not be run')", + stdin=tf, input='hare') + except ValueError, exception: + self.assertIn('stdin', exception.args[0]) + self.assertIn('input', exception.args[0]) + else: + self.fail("Expected ValueError when stdin and input args supplied.") + + def test_check_output_timeout(self): + try: + cp = self.run_python(( + "import sys, time\n" + "sys.stdout.write('BDFL')\n" + "sys.stdout.flush()\n" + "time.sleep(3600)"), + # Some heavily loaded buildbots (sparc Debian 3.x) require + # this much time to start and print. + timeout=3, stdout=subprocess.PIPE) + except subprocess.TimeoutExpired, exception: + self.assertEqual(exception.output, 'BDFL') + # output is aliased to stdout + self.assertEqual(exception.stdout, 'BDFL') + else: + self.fail("TimeoutExpired not raised") + + def test_run_kwargs(self): + newenv = os.environ.copy() + newenv["FRUIT"] = "banana" + cp = self.run_python(('import sys, os;' + 'os.getenv("FRUIT")=="banana" and sys.exit(33) or sys.exit(31)'), + env=newenv) + self.assertEqual(cp.returncode, 33) + + +# context manager +class _SuppressCoreFiles(object): + """Try to prevent core files from being created.""" + old_limit = None + + def __enter__(self): + """Try to save previous ulimit, then set it to (0, 0).""" + try: + import resource + self.old_limit = resource.getrlimit(resource.RLIMIT_CORE) + resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) + except (ImportError, ValueError, resource.error): + pass + + def __exit__(self, *args): + """Return core file behavior to default.""" + if self.old_limit is None: + return + try: + import resource + resource.setrlimit(resource.RLIMIT_CORE, self.old_limit) + except (ImportError, ValueError, resource.error): + pass + + +#@unittest.skipIf(mswindows, "POSIX specific tests") +class POSIXProcessTestCase(BaseTestCase): + + def setUp(self): + BaseTestCase.setUp(self) + self._nonexistent_dir = "/_this/pa.th/does/not/exist" + + def _get_chdir_exception(self): + try: + os.chdir(self._nonexistent_dir) + except OSError, e: + # This avoids hard coding the errno value or the OS perror() + # string and instead capture the exception that we want to see + # below for comparison. + desired_exception = e + desired_exception.strerror += ': ' + repr(self._nonexistent_dir) + else: + self.fail("chdir to nonexistant directory %s succeeded." % + self._nonexistent_dir) + return desired_exception + + def test_exception_cwd(self): + """Test error in the child raised in the parent for a bad cwd.""" + desired_exception = self._get_chdir_exception() + try: + p = subprocess.Popen([sys.executable, "-c", yenv + ""], + cwd=self._nonexistent_dir) + except OSError, e: + # Test that the child process chdir failure actually makes + # it up to the parent process as the correct exception. + self.assertEqual(desired_exception.errno, e.errno) + self.assertEqual(desired_exception.strerror, e.strerror) + else: + self.fail("Expected OSError: %s" % desired_exception) + + def test_exception_bad_executable(self): + """Test error in the child raised in the parent for a bad executable.""" + desired_exception = self._get_chdir_exception() + try: + p = subprocess.Popen([sys.executable, "-c", yenv + ""], + executable=self._nonexistent_dir) + except OSError, e: + # Test that the child process exec failure actually makes + # it up to the parent process as the correct exception. + self.assertEqual(desired_exception.errno, e.errno) + self.assertEqual(desired_exception.strerror, e.strerror) + else: + self.fail("Expected OSError: %s" % desired_exception) + + def test_exception_bad_args_0(self): + """Test error in the child raised in the parent for a bad args[0].""" + desired_exception = self._get_chdir_exception() + try: + p = subprocess.Popen([self._nonexistent_dir, "-c", yenv + ""]) + except OSError, e: + # Test that the child process exec failure actually makes + # it up to the parent process as the correct exception. + self.assertEqual(desired_exception.errno, e.errno) + self.assertEqual(desired_exception.strerror, e.strerror) + else: + self.fail("Expected OSError: %s" % desired_exception) + + #@unittest.skipIf(not os.path.exists('/proc/self/status')) + def test_restore_signals(self): + if not os.path.exists('/proc/self/status'): + print("SKIP - Functional test requires /proc/self/status.") + return + # Blindly assume that cat exists on systems with /proc/self/status... + default_proc_status = subprocess.check_output( + ['cat', '/proc/self/status'], + restore_signals=False) + for line in default_proc_status.splitlines(): + if line.startswith(b'SigIgn'): + default_sig_ign_mask = line + break + else: + self.skipTest("SigIgn not found in /proc/self/status.") + restored_proc_status = subprocess.check_output( + ['cat', '/proc/self/status'], + restore_signals=True) + for line in restored_proc_status.splitlines(): + if line.startswith(b'SigIgn'): + restored_sig_ign_mask = line + break + # restore_signals=True should've unblocked SIGPIPE and friends. + self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask) + + def test_start_new_session(self): + # For code coverage of calling setsid(). We don't care if we get an + # EPERM error from it depending on the test execution environment, that + # still indicates that it was called. + try: + output = subprocess.check_output( + [sys.executable, "-c", yenv + + "import os; print(os.getpgid(os.getpid()))"], + start_new_session=True) + except OSError, e: + if e.errno != errno.EPERM: + raise + else: + parent_pgid = os.getpgid(os.getpid()) + child_pgid = int(output) + self.assertNotEqual(parent_pgid, child_pgid) + + def test_run_abort(self): + # returncode handles signal termination + scf = _SuppressCoreFiles() + scf.__enter__() + try: + p = subprocess.Popen([sys.executable, "-c", yenv + + "import os; os.abort()"]) + p.wait() + finally: + scf.__exit__() + self.assertEqual(-p.returncode, signal.SIGABRT) + + def test_preexec(self): + # DISCLAIMER: Setting environment variables is *not* a good use + # of a preexec_fn. This is merely a test. + p = subprocess.Popen([sys.executable, "-c", yenv + + "import sys, os;" + "sys.stdout.write(os.getenv('FRUIT'))"], + stdout=subprocess.PIPE, + preexec_fn=lambda: os.putenv("FRUIT", "apple")) + self.assertEqual(p.stdout.read(), "apple") + + def test_preexec_exception(self): + def raise_it(): + raise ValueError("What if two swallows carried a coconut?") + try: + p = subprocess.Popen([sys.executable, "-c", yenv + ""], + preexec_fn=raise_it) + except RuntimeError, e: + self.assertTrue( + subprocess._posixsubprocess, + "Expected a ValueError from the preexec_fn") + except ValueError, e: + self.assertIn("coconut", e.args[0]) + else: + self.fail("Exception raised by preexec_fn did not make it " + "to the parent process.") + + class _TestExecuteChildPopen(subprocess.Popen): + """Used to test behavior at the end of _execute_child.""" + def __init__(self, testcase, *args, **kwargs): + self._testcase = testcase + subprocess.Popen.__init__(self, *args, **kwargs) + + def _execute_child(self, *args, **kwargs): + try: + subprocess.Popen._execute_child(self, *args, **kwargs) + finally: + # Open a bunch of file descriptors and verify that + # none of them are the same as the ones the Popen + # instance is using for stdin/stdout/stderr. + devzero_fds = [os.open("/dev/zero", os.O_RDONLY) + for _ in range(8)] + try: + for fd in devzero_fds: + self._testcase.assertNotIn( + fd, (self.stdin.fileno(), self.stdout.fileno(), + self.stderr.fileno()), + msg="At least one fd was closed early.") + finally: + map(os.close, devzero_fds) + + #@unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") + def test_preexec_errpipe_does_not_double_close_pipes(self): + """Issue16140: Don't double close pipes on preexec error.""" + + def raise_it(): + raise RuntimeError("force the _execute_child() errpipe_data path.") + + try: + self._TestExecuteChildPopen( + self, [sys.executable, "-c", yenv + "pass"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, preexec_fn=raise_it) + except RuntimeError: + pass # Yay! Because 2.4 doesn't support with statements. + else: + self.fail("RuntimeError expected but not raised.") + + #@unittest.skipUnless(gc, "Requires a gc module.") + def test_preexec_gc_module_failure(self): + # This tests the code that disables garbage collection if the child + # process will execute any Python. + def raise_runtime_error(): + raise RuntimeError("this shouldn't escape") + enabled = gc.isenabled() + orig_gc_disable = gc.disable + orig_gc_isenabled = gc.isenabled + try: + gc.disable() + self.assertFalse(gc.isenabled()) + subprocess.call([sys.executable, '-c', yenv + ''], + preexec_fn=lambda: None) + self.assertFalse(gc.isenabled(), + "Popen enabled gc when it shouldn't.") + + gc.enable() + self.assertTrue(gc.isenabled()) + subprocess.call([sys.executable, '-c', yenv + ''], + preexec_fn=lambda: None) + self.assertTrue(gc.isenabled(), "Popen left gc disabled.") + + gc.disable = raise_runtime_error + self.assertRaises(RuntimeError, subprocess.Popen, + [sys.executable, '-c', yenv + ''], + preexec_fn=lambda: None) + + del gc.isenabled # force an AttributeError + self.assertRaises(AttributeError, subprocess.Popen, + [sys.executable, '-c', yenv + ''], + preexec_fn=lambda: None) + finally: + gc.disable = orig_gc_disable + gc.isenabled = orig_gc_isenabled + if not enabled: + gc.disable() + + def test_args_string(self): + # args is a string + f, fname = mkstemp() + os.write(f, "#!/bin/sh\n") + os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % + sys.executable) + os.close(f) + os.chmod(fname, 0700) + p = subprocess.Popen(fname) + p.wait() + os.remove(fname) + self.assertEqual(p.returncode, 47) + + def test_invalid_args(self): + # invalid arguments should raise ValueError + self.assertRaises(ValueError, subprocess.call, + [sys.executable, "-c", yenv + + "import sys; sys.exit(47)"], + startupinfo=47) + self.assertRaises(ValueError, subprocess.call, + [sys.executable, "-c", yenv + + "import sys; sys.exit(47)"], + creationflags=47) + + def test_shell_sequence(self): + # Run command through the shell (sequence) + newenv = os.environ.copy() + newenv["FRUIT"] = "apple" + p = subprocess.Popen(["echo $FRUIT"], shell=1, + stdout=subprocess.PIPE, + env=newenv) + self.assertEqual(p.stdout.read().strip(), "apple") + + def test_shell_string(self): + # Run command through the shell (string) + newenv = os.environ.copy() + newenv["FRUIT"] = "apple" + p = subprocess.Popen("echo $FRUIT", shell=1, + stdout=subprocess.PIPE, + env=newenv) + self.assertEqual(p.stdout.read().strip(), "apple") + + def test_call_string(self): + # call() function with string argument on UNIX + f, fname = mkstemp() + os.write(f, "#!/bin/sh\n") + os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" % + sys.executable) + os.close(f) + os.chmod(fname, 0700) + rc = subprocess.call(fname) + os.remove(fname) + self.assertEqual(rc, 47) + + def test_specific_shell(self): + # Issue #9265: Incorrect name passed as arg[0]. + shells = [] + for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: + for name in ['bash', 'ksh']: + sh = os.path.join(prefix, name) + if os.path.isfile(sh): + shells.append(sh) + if not shells: # Will probably work for any shell but csh. + self.skipTest("bash or ksh required for this test") + sh = '/bin/sh' + if os.path.isfile(sh) and not os.path.islink(sh): + # Test will fail if /bin/sh is a symlink to csh. + shells.append(sh) + for sh in shells: + p = subprocess.Popen("echo $0", executable=sh, shell=True, + stdout=subprocess.PIPE) + self.assertEqual(p.stdout.read().strip(), sh) + + def _kill_process(self, method, *args): + # Do not inherit file handles from the parent. + # It should fix failures on some platforms. + p = subprocess.Popen([sys.executable, "-c", yenv + """if 1: + import sys, time + sys.stdout.write('x\\n') + sys.stdout.flush() + time.sleep(30) + """], + close_fds=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + # Wait for the interpreter to be completely initialized before + # sending any signal. + p.stdout.read(1) + getattr(p, method)(*args) + return p + + def test_send_signal(self): + p = self._kill_process('send_signal', signal.SIGINT) + _, stderr = p.communicate() + self.assertIn('KeyboardInterrupt', stderr) + self.assertNotEqual(p.wait(), 0) + + def test_kill(self): + p = self._kill_process('kill') + _, stderr = p.communicate() + self.assertStderrEqual(stderr, '') + self.assertEqual(p.wait(), -signal.SIGKILL) + + def test_terminate(self): + p = self._kill_process('terminate') + _, stderr = p.communicate() + self.assertStderrEqual(stderr, '') + self.assertEqual(p.wait(), -signal.SIGTERM) + + def check_close_std_fds(self, fds): + # Issue #9905: test that subprocess pipes still work properly with + # some standard fds closed + stdin = 0 + newfds = [] + for a in fds: + b = os.dup(a) + newfds.append(b) + if a == 0: + stdin = b + try: + for fd in fds: + os.close(fd) + out, err = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys;' + 'sys.stdout.write("apple");' + 'sys.stdout.flush();' + 'sys.stderr.write("orange")'], + stdin=stdin, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE).communicate() + err = strip_python_stderr(err) + self.assertEqual((out, err), ('apple', 'orange')) + finally: + for b, a in zip(newfds, fds): + os.dup2(b, a) + for b in newfds: + os.close(b) + + def test_close_fd_0(self): + self.check_close_std_fds([0]) + + def test_close_fd_1(self): + self.check_close_std_fds([1]) + + def test_close_fd_2(self): + self.check_close_std_fds([2]) + + def test_close_fds_0_1(self): + self.check_close_std_fds([0, 1]) + + def test_close_fds_0_2(self): + self.check_close_std_fds([0, 2]) + + def test_close_fds_1_2(self): + self.check_close_std_fds([1, 2]) + + def test_close_fds_0_1_2(self): + # Issue #10806: test that subprocess pipes still work properly with + # all standard fds closed. + self.check_close_std_fds([0, 1, 2]) + + def check_swap_fds(self, stdin_no, stdout_no, stderr_no): + # open up some temporary files + temps = [mkstemp() for i in range(3)] + temp_fds = [fd for fd, fname in temps] + try: + # unlink the files -- we won't need to reopen them + for fd, fname in temps: + os.unlink(fname) + + # save a copy of the standard file descriptors + saved_fds = [os.dup(fd) for fd in range(3)] + try: + # duplicate the temp files over the standard fd's 0, 1, 2 + for fd, temp_fd in enumerate(temp_fds): + os.dup2(temp_fd, fd) + + # write some data to what will become stdin, and rewind + os.write(stdin_no, "STDIN") + os.lseek(stdin_no, 0, 0) + + # now use those files in the given order, so that subprocess + # has to rearrange them in the child + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; got = sys.stdin.read();' + 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], + stdin=stdin_no, + stdout=stdout_no, + stderr=stderr_no) + p.wait() + + for fd in temp_fds: + os.lseek(fd, 0, 0) + + out = os.read(stdout_no, 1024) + err = os.read(stderr_no, 1024) + finally: + for std, saved in enumerate(saved_fds): + os.dup2(saved, std) + os.close(saved) + + self.assertEqual(out, "got STDIN") + self.assertStderrEqual(err, "err") + + finally: + for fd in temp_fds: + os.close(fd) + + # When duping fds, if there arises a situation where one of the fds is + # either 0, 1 or 2, it is possible that it is overwritten (#12607). + # This tests all combinations of this. + def test_swap_fds(self): + self.check_swap_fds(0, 1, 2) + self.check_swap_fds(0, 2, 1) + self.check_swap_fds(1, 0, 2) + self.check_swap_fds(1, 2, 0) + self.check_swap_fds(2, 0, 1) + self.check_swap_fds(2, 1, 0) + + def test_small_errpipe_write_fd(self): + """Issue #15798: Popen should work when stdio fds are available.""" + new_stdin = os.dup(0) + new_stdout = os.dup(1) + try: + os.close(0) + os.close(1) + + subprocess.Popen([ + sys.executable, "-c", yenv + "pass"]).wait() + finally: + # Restore original stdin and stdout + os.dup2(new_stdin, 0) + os.dup2(new_stdout, 1) + os.close(new_stdin) + os.close(new_stdout) + + def test_remapping_std_fds(self): + # open up some temporary files + temps = [mkstemp() for i in range(3)] + try: + temp_fds = [fd for fd, fname in temps] + + # unlink the files -- we won't need to reopen them + for fd, fname in temps: + os.unlink(fname) + + # write some data to what will become stdin, and rewind + os.write(temp_fds[1], "STDIN") + os.lseek(temp_fds[1], 0, 0) + + # move the standard file descriptors out of the way + saved_fds = [os.dup(fd) for fd in range(3)] + try: + # duplicate the file objects over the standard fd's + for fd, temp_fd in enumerate(temp_fds): + os.dup2(temp_fd, fd) + + # now use those files in the "wrong" order, so that subprocess + # has to rearrange them in the child + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys; got = sys.stdin.read();' + 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], + stdin=temp_fds[1], + stdout=temp_fds[2], + stderr=temp_fds[0]) + p.wait() + finally: + # restore the original fd's underneath sys.stdin, etc. + for std, saved in enumerate(saved_fds): + os.dup2(saved, std) + os.close(saved) + + for fd in temp_fds: + os.lseek(fd, 0, 0) + + out = os.read(temp_fds[2], 1024) + err = os.read(temp_fds[0], 1024) + self.assertEqual(out, "got STDIN") + self.assertStderrEqual(err, "err") + + finally: + for fd in temp_fds: + os.close(fd) + + # NOTE: test_surrogates_error_message makes no sense on python 2.x. omitted. + # NOTE: test_undecodable_env makes no sense on python 2.x. omitted. + # NOTE: test_bytes_program makes no sense on python 2.x. omitted. + + if sys.version_info[:2] >= (2,7): + # Disabling this test on 2.6 and earlier as it fails on Travis CI regardless + # of LANG=C being set and is not worth the time to figure out why in such a + # legacy environment.. + # https://travis-ci.org/google/python-subprocess32/jobs/290065729 + def test_fs_encode_unicode_error(self): + fs_encoding = sys.getfilesystemencoding() + if fs_encoding.upper() not in ("ANSI_X3.4-1968", "ASCII"): + self.skipTest( + "Requires a restictive sys.filesystemencoding(), " + "not %s. Run python with LANG=C" % fs_encoding) + highbit_executable_name = os.path.join( + test_support.findfile("testdata"), u"Does\\Not\uDCff\\Exist") + try: + subprocess.call([highbit_executable_name]) + except UnicodeEncodeError: + return + except RuntimeError, e: + # The ProcessTestCasePOSIXPurePython version ends up here. It + # can't re-construct the unicode error from the child because it + # doesn't have all the arguments. BFD. One doesn't use + # subprocess32 for the old pure python implementation... + if "UnicodeEncodeError" not in str(e): + self.fail("Expected a RuntimeError whining about how a " + "UnicodeEncodeError from the child could not " + "be reraised. Not: %s" % e) + return + self.fail("Expected a UnicodeEncodeError to be raised.") + + def test_pipe_cloexec(self): + sleeper = test_support.findfile("testdata/input_reader.py") + fd_status = test_support.findfile("testdata/fd_status.py") + + p1 = subprocess.Popen([sys.executable, sleeper], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, close_fds=False) + + self.addCleanup(p1.communicate, '') + + p2 = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=False) + + output, error = p2.communicate() + result_fds = set(map(int, output.split(','))) + unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(), + p1.stderr.fileno()]) + + self.assertFalse(result_fds & unwanted_fds, + "Expected no fds from %r to be open in child, " + "found %r" % + (unwanted_fds, result_fds & unwanted_fds)) + + def test_pipe_cloexec_real_tools(self): + qcat = test_support.findfile("testdata/qcat.py") + qgrep = test_support.findfile("testdata/qgrep.py") + + subdata = 'zxcvbn' + data = subdata * 4 + '\n' + + p1 = subprocess.Popen([sys.executable, qcat], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + close_fds=False) + + p2 = subprocess.Popen([sys.executable, qgrep, subdata], + stdin=p1.stdout, stdout=subprocess.PIPE, + close_fds=False) + + self.addCleanup(p1.wait) + self.addCleanup(p2.wait) + def kill_p1(): + try: + p1.terminate() + except ProcessLookupError: + pass + def kill_p2(): + try: + p2.terminate() + except ProcessLookupError: + pass + self.addCleanup(kill_p1) + self.addCleanup(kill_p2) + + p1.stdin.write(data) + p1.stdin.close() + + readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10) + + self.assertTrue(readfiles, "The child hung") + self.assertEqual(p2.stdout.read(), data) + + p1.stdout.close() + p2.stdout.close() + + def test_close_fds(self): + fd_status = test_support.findfile("testdata/fd_status.py") + + fds = os.pipe() + self.addCleanup(os.close, fds[0]) + self.addCleanup(os.close, fds[1]) + + open_fds = set(fds) + # add a bunch more fds + for _ in range(9): + fd = os.open("/dev/null", os.O_RDONLY) + self.addCleanup(os.close, fd) + open_fds.add(fd) + + p = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=False) + output, ignored = p.communicate() + remaining_fds = set(map(int, output.split(','))) + + self.assertEqual(remaining_fds & open_fds, open_fds, + "Some fds were closed") + + p = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=True) + output, ignored = p.communicate() + remaining_fds = set(map(int, output.split(','))) + + self.assertFalse(remaining_fds & open_fds, + "Some fds were left open") + self.assertIn(1, remaining_fds, "Subprocess failed") + + # Keep some of the fd's we opened open in the subprocess. + # This tests _posixsubprocess.c's proper handling of fds_to_keep. + fds_to_keep = set(open_fds.pop() for _ in range(8)) + p = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=True, + pass_fds=()) + output, ignored = p.communicate() + remaining_fds = set(map(int, output.split(','))) + + self.assertFalse(remaining_fds & fds_to_keep & open_fds, + "Some fds not in pass_fds were left open") + self.assertIn(1, remaining_fds, "Subprocess failed") + + + def test_close_fds_when_max_fd_is_lowered(self): + """Confirm that issue21618 is fixed (may fail under valgrind).""" + fd_status = test_support.findfile("testdata/fd_status.py") + + open_fds = set() + # Add a bunch more fds to pass down. + for _ in range(40): + fd = os.open("/dev/null", os.O_RDONLY) + open_fds.add(fd) + + # Leave a two pairs of low ones available for use by the + # internal child error pipe and the stdout pipe. + # We also leave 10 more open for use by the Python 2 startup + # import machinery which tends to consume several at once. + for fd in sorted(open_fds)[:14]: + os.close(fd) + open_fds.remove(fd) + + for fd in open_fds: + self.addCleanup(os.close, fd) + + max_fd_open = max(open_fds) + + import resource + rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE) + try: + # 29 is lower than the highest fds we are leaving open. + resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max)) + # Launch a new Python interpreter with our low fd rlim_cur that + # inherits open fds above that limit. It then uses subprocess + # with close_fds=True to get a report of open fds in the child. + # An explicit list of fds to check is passed to fd_status.py as + # letting fd_status rely on its default logic would miss the + # fds above rlim_cur as it normally only checks up to that limit. + p = subprocess.Popen( + [sys.executable, '-c', yenv + + textwrap.dedent(""" + import subprocess32, sys + subprocess32.Popen([sys.executable, %(fd_status)r] + + [str(x) for x in range(%(max_fd)d)], + close_fds=True).wait() + """ % dict(fd_status=fd_status, max_fd=max_fd_open+1))], + stdout=subprocess.PIPE, close_fds=False) + finally: + resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max)) + + output, unused_stderr = p.communicate() + remaining_fds = set(map(int, output.strip().split(','))) + + self.assertFalse(remaining_fds & open_fds, + msg="Some fds were left open.") + + + def test_pass_fds(self): + fd_status = test_support.findfile("testdata/fd_status.py") + + open_fds = set() + + for x in range(5): + fds = os.pipe() + self.addCleanup(os.close, fds[0]) + self.addCleanup(os.close, fds[1]) + open_fds.update(fds) + + for fd in open_fds: + p = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=True, + pass_fds=(fd, )) + output, ignored = p.communicate() + + remaining_fds = set(map(int, output.split(','))) + to_be_closed = open_fds - set((fd,)) + + self.assertIn(fd, remaining_fds, "fd to be passed not passed") + self.assertFalse(remaining_fds & to_be_closed, + "fd to be closed passed") + + # Syntax requires Python 2.5, assertWarns requires Python 2.7. + #with self.assertWarns(RuntimeWarning) as context: + # self.assertFalse(subprocess.call( + # [sys.executable, "-c", yenv + "import sys; sys.exit(0)"], + # close_fds=False, pass_fds=(fd, ))) + #self.assertIn('overriding close_fds', str(context.warning)) + + def test_stdout_stdin_are_single_inout_fd(self): + inout = open(os.devnull, "r+") + try: + p = subprocess.Popen([sys.executable, "-c", yenv + "import sys; sys.exit(0)"], + stdout=inout, stdin=inout) + p.wait() + finally: + inout.close() + + def test_stdout_stderr_are_single_inout_fd(self): + inout = open(os.devnull, "r+") + try: + p = subprocess.Popen([sys.executable, "-c", yenv + "import sys; sys.exit(0)"], + stdout=inout, stderr=inout) + p.wait() + finally: + inout.close() + + def test_stderr_stdin_are_single_inout_fd(self): + inout = open(os.devnull, "r+") + try: + p = subprocess.Popen([sys.executable, "-c", yenv + "import sys; sys.exit(0)"], + stderr=inout, stdin=inout) + p.wait() + finally: + inout.close() + + def test_wait_when_sigchild_ignored(self): + # NOTE: sigchild_ignore.py may not be an effective test on all OSes. + sigchild_ignore = test_support.findfile("testdata/sigchild_ignore.py") + p = subprocess.Popen([sys.executable, sigchild_ignore], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = p.communicate() + self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" + " non-zero with this error:\n%s" % stderr) + + def test_select_unbuffered(self): + # Issue #11459: bufsize=0 should really set the pipes as + # unbuffered (and therefore let select() work properly). + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys;' + 'sys.stdout.write("apple")'], + stdout=subprocess.PIPE, + bufsize=0) + f = p.stdout + self.addCleanup(f.close) + try: + self.assertEqual(f.read(4), "appl") + self.assertIn(f, select.select([f], [], [], 0.0)[0]) + finally: + p.wait() + + def test_zombie_fast_process_del(self): + # Issue #12650: on Unix, if Popen.__del__() was called before the + # process exited, it wouldn't be added to subprocess._active, and would + # remain a zombie. + # spawn a Popen, and delete its reference before it exits + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import sys, time;' + 'time.sleep(0.2)'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + ident = id(p) + pid = p.pid + del p + # check that p is in the active processes list + self.assertIn(ident, [id(o) for o in subprocess._active]) + + def test_leak_fast_process_del_killed(self): + # Issue #12650: on Unix, if Popen.__del__() was called before the + # process exited, and the process got killed by a signal, it would never + # be removed from subprocess._active, which triggered a FD and memory + # leak. + # spawn a Popen, delete its reference and kill it + p = subprocess.Popen([sys.executable, "-c", yenv + + 'import time;' + 'time.sleep(3)'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + ident = id(p) + pid = p.pid + del p + os.kill(pid, signal.SIGKILL) + # check that p is in the active processes list + self.assertIn(ident, [id(o) for o in subprocess._active]) + + # let some time for the process to exit, and create a new Popen: this + # should trigger the wait() of p + time.sleep(0.2) + try: + proc = subprocess.Popen(['nonexisting_i_hope'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc.__exit__(None, None, None) + except EnvironmentError: + pass + else: + self.fail("EnvironmentError not raised.") + # p should have been wait()ed on, and removed from the _active list + self.assertRaises(OSError, os.waitpid, pid, 0) + self.assertNotIn(ident, [id(o) for o in subprocess._active]) + + def test_close_fds_after_preexec(self): + fd_status = test_support.findfile("testdata/fd_status.py") + + # this FD is used as dup2() target by preexec_fn, and should be closed + # in the child process + fd = os.dup(1) + self.addCleanup(os.close, fd) + + p = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=True, + preexec_fn=lambda: os.dup2(1, fd)) + output, ignored = p.communicate() + + remaining_fds = set(map(int, output.split(','))) + + self.assertNotIn(fd, remaining_fds) + + def test_child_terminated_in_stopped_state(self): + """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335.""" + if not ctypes: + sys.stderr.write('ctypes module required.\n') + return + if not sys.executable: + self.stderr.write('Test requires sys.executable.\n') + return + PTRACE_TRACEME = 0 # From glibc and MacOS (PT_TRACE_ME). + libc_name = ctypes.util.find_library('c') + libc = ctypes.CDLL(libc_name) + if not hasattr(libc, 'ptrace'): + self.stderr.write('ptrace() required.\n') + return + test_ptrace = subprocess.Popen( + [sys.executable, '-c', yenv + """if True: + import ctypes + libc = ctypes.CDLL({libc_name!r}) + libc.ptrace({PTRACE_TRACEME}, 0, 0) + """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME) + ]) + if test_ptrace.wait() != 0: + sys.stderr.write('ptrace() failed - unable to test.\n') + return + child = subprocess.Popen( + [sys.executable, '-c', yenv + """if True: + import ctypes + libc = ctypes.CDLL({libc_name!r}) + libc.ptrace({PTRACE_TRACEME}, 0, 0) + libc.printf(ctypes.c_char_p(0xdeadbeef)) # Crash the process. + """.format(libc_name=libc_name, PTRACE_TRACEME=PTRACE_TRACEME) + ]) + try: + returncode = child.wait() + except Exception, e: + child.kill() # Clean up the hung stopped process. + raise e + self.assertNotEqual(0, returncode) + self.assert_(returncode < 0, msg=repr(returncode)) # signal death, likely SIGSEGV. + + +if mswindows: + class POSIXProcessTestCase(unittest.TestCase): pass + + +#@unittest.skipUnless(mswindows, "Windows specific tests") +class Win32ProcessTestCase(BaseTestCase): + + def test_startupinfo(self): + # startupinfo argument + # We uses hardcoded constants, because we do not want to + # depend on win32all. + STARTF_USESHOWWINDOW = 1 + SW_MAXIMIZE = 3 + startupinfo = subprocess.STARTUPINFO() + startupinfo.dwFlags = STARTF_USESHOWWINDOW + startupinfo.wShowWindow = SW_MAXIMIZE + # Since Python is a console process, it won't be affected + # by wShowWindow, but the argument should be silently + # ignored + subprocess.call([sys.executable, "-c", yenv + "import sys; sys.exit(0)"], + startupinfo=startupinfo) + + def test_creationflags(self): + # creationflags argument + CREATE_NEW_CONSOLE = 16 + sys.stderr.write(" a DOS box should flash briefly ...\n") + subprocess.call(sys.executable + + ' -c "import time; time.sleep(0.25)"', + creationflags=CREATE_NEW_CONSOLE) + + def test_invalid_args(self): + # invalid arguments should raise ValueError + self.assertRaises(ValueError, subprocess.call, + [sys.executable, "-c", yenv + + "import sys; sys.exit(47)"], + preexec_fn=lambda: 1) + self.assertRaises(ValueError, subprocess.call, + [sys.executable, "-c", yenv + + "import sys; sys.exit(47)"], + stdout=subprocess.PIPE, + close_fds=True) + + def test_close_fds(self): + # close file descriptors + rc = subprocess.call([sys.executable, "-c", yenv + + "import sys; sys.exit(47)"], + close_fds=True) + self.assertEqual(rc, 47) + + def test_shell_sequence(self): + # Run command through the shell (sequence) + newenv = os.environ.copy() + newenv["FRUIT"] = "physalis" + p = subprocess.Popen(["set"], shell=1, + stdout=subprocess.PIPE, + env=newenv) + self.assertIn("physalis", p.stdout.read()) + + def test_shell_string(self): + # Run command through the shell (string) + newenv = os.environ.copy() + newenv["FRUIT"] = "physalis" + p = subprocess.Popen("set", shell=1, + stdout=subprocess.PIPE, + env=newenv) + self.assertIn("physalis", p.stdout.read()) + + def test_call_string(self): + # call() function with string argument on Windows + rc = subprocess.call(sys.executable + + ' -c "import sys; sys.exit(47)"') + self.assertEqual(rc, 47) + + def _kill_process(self, method, *args): + # Some win32 buildbot raises EOFError if stdin is inherited + p = subprocess.Popen([sys.executable, "-c", yenv + "input()"], + stdin=subprocess.PIPE, stderr=subprocess.PIPE) + + # Let the process initialize (Issue #3137) + time.sleep(0.1) + # The process should not terminate prematurely + self.assert_(p.poll() is None) + # Retry if the process do not receive the signal. + count, maxcount = 0, 3 + while count < maxcount and p.poll() is None: + getattr(p, method)(*args) + time.sleep(0.1) + count += 1 + + returncode = p.poll() + self.assert_(returncode is not None, "the subprocess did not terminate") + if count > 1: + print >>sys.stderr, ("p.{}{} succeeded after " + "{} attempts".format(method, args, count)) + _, stderr = p.communicate() + self.assertStderrEqual(stderr, '') + self.assertEqual(p.wait(), returncode) + self.assertNotEqual(returncode, 0) + + def test_send_signal(self): + self._kill_process('send_signal', signal.SIGTERM) + + def test_kill(self): + self._kill_process('kill') + + def test_terminate(self): + self._kill_process('terminate') + + +if not mswindows: + class Win32ProcessTestCase(unittest.TestCase): pass + + +#@unittest.skipUnless(getattr(subprocess, '_has_poll', False), +# "poll system call not supported") +class ProcessTestCaseNoPoll(ProcessTestCase): + def setUp(self): + subprocess._has_poll = False + ProcessTestCase.setUp(self) + + def tearDown(self): + subprocess._has_poll = True + ProcessTestCase.tearDown(self) + + +if not getattr(subprocess, '_has_poll', False): + class ProcessTestCaseNoPoll(unittest.TestCase): pass + + +#@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False), +# "_posixsubprocess extension module not found.") +class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase): + def setUp(self): + subprocess._posixsubprocess = None + ProcessTestCase.setUp(self) + POSIXProcessTestCase.setUp(self) + + def tearDown(self): + subprocess._posixsubprocess = sys.modules['_posixsubprocess32'] + POSIXProcessTestCase.tearDown(self) + ProcessTestCase.tearDown(self) + + +class POSIXSubprocessModuleTestCase(unittest.TestCase): + def test_fork_exec_sorted_fd_sanity_check(self): + # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check. + _posixsubprocess = subprocess._posixsubprocess + gc_enabled = gc.isenabled() + try: + gc.enable() + + for fds_to_keep in ( + (-1, 2, 3, 4, 5), # Negative number. + ('str', 4), # Not an int. + (18, 23, 42, 2**63), # Out of range. + (5, 4), # Not sorted. + (6, 7, 7, 8), # Duplicate. + ): + try: + _posixsubprocess.fork_exec( + ["false"], ["false"], + True, fds_to_keep, None, ["env"], + -1, -1, -1, -1, + 1, 2, 3, 4, + True, True, None) + except ValueError, exception: + self.assertTrue('fds_to_keep' in str(exception), + msg=str(exception)) + else: + self.fail("ValueError not raised, fds_to_keep=%s" % + (fds_to_keep,)) + finally: + if not gc_enabled: + gc.disable() + + def test_cloexec_pass_fds(self): + if not os.path.exists('/dev/null') or not os.path.isdir('/dev/fd'): + print("Skipped - This test requires /dev/null and /dev/fd/*.") + return + null_reader_proc = subprocess.Popen( + ["cat"], + stdin=open('/dev/null', 'rb'), + stdout=subprocess.PIPE) + try: + data = null_reader_proc.stdout + fd_name = '/dev/fd/%d' % data.fileno() + fd_reader_proc = subprocess.Popen( + ["cat", fd_name], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # Capture any error from cat. + pass_fds=(data.fileno(),)) + try: + fddata = fd_reader_proc.stdout + self.assertEqual('', fddata.read()) + finally: + fd_reader_proc.wait() + finally: + null_reader_proc.wait() + + +if not getattr(subprocess, '_posixsubprocess', False): + print >>sys.stderr, "_posixsubprocess extension module not found." + class ProcessTestCasePOSIXPurePython(unittest.TestCase): pass + class POSIXSubprocessModuleTestCase(unittest.TestCase): pass + + +class HelperFunctionTests(unittest.TestCase): + #@unittest.skipIf(mswindows, "errno and EINTR make no sense on windows") + def test_eintr_retry_call(self): + record_calls = [] + def fake_os_func(*args): + record_calls.append(args) + if len(record_calls) == 2: + raise OSError(errno.EINTR, "fake interrupted system call") + return tuple(reversed(args)) + + self.assertEqual((999, 256), + subprocess._eintr_retry_call(fake_os_func, 256, 999)) + self.assertEqual([(256, 999)], record_calls) + # This time there will be an EINTR so it will loop once. + self.assertEqual((666,), + subprocess._eintr_retry_call(fake_os_func, 666)) + self.assertEqual([(256, 999), (666,), (666,)], record_calls) + + if mswindows: + del test_eintr_retry_call + + if not hasattr(unittest.TestCase, 'assertSequenceEqual'): + def assertSequenceEqual(self, seq1, seq2): + self.assertEqual(list(seq1), list(seq2)) + + def test_get_exec_path(self): + defpath_list = os.defpath.split(os.pathsep) + test_path = ['/monty', '/python', '', '/flying/circus'] + test_env = {'PATH': os.pathsep.join(test_path)} + + get_exec_path = subprocess._get_exec_path + saved_environ = os.environ + try: + os.environ = dict(test_env) + # Test that defaulting to os.environ works. + self.assertSequenceEqual(test_path, get_exec_path()) + self.assertSequenceEqual(test_path, get_exec_path(env=None)) + finally: + os.environ = saved_environ + + # No PATH environment variable + self.assertSequenceEqual(defpath_list, get_exec_path({})) + # Empty PATH environment variable + self.assertSequenceEqual(('',), get_exec_path({'PATH':''})) + # Supplied PATH environment variable + self.assertSequenceEqual(test_path, get_exec_path(test_env)) + + def test_args_from_interpreter_flags(self): + if sys.version_info[:2] < (2,6): + print "Skipped - only useful on 2.6 and higher." + return + # Mostly just to call it for code coverage. + args_list = subprocess32._args_from_interpreter_flags() + self.assertTrue(isinstance(args_list, list), msg=repr(args_list)) + + def test_timeout_expired_unpickling(self): + """https://github.com/google/python-subprocess32/issues/57""" + t = subprocess32.TimeoutExpired(['command', 'arg1'], 5, + output='stdout!', stderr='err') + t_pickled = pickle.dumps(t) + t2 = pickle.loads(t_pickled) + self.assertEqual(t.cmd, t2.cmd) + self.assertEqual(t.timeout, t2.timeout) + self.assertEqual(t.output, t2.output) + self.assertEqual(t.stderr, t2.stderr) + + def test_called_process_error_unpickling(self): + """https://github.com/google/python-subprocess32/issues/57""" + e = subprocess32.CalledProcessError( + 2, ['command', 'arg1'], output='stdout!', stderr='err') + e_pickled = pickle.dumps(e) + e2 = pickle.loads(e_pickled) + self.assertEqual(e.returncode, e2.returncode) + self.assertEqual(e.cmd, e2.cmd) + self.assertEqual(e.output, e2.output) + self.assertEqual(e.stderr, e2.stderr) + + +def reap_children(): + """Use this function at the end of test_main() whenever sub-processes + are started. This will help ensure that no extra children (zombies) + stick around to hog resources and create problems when looking + for refleaks. + """ + + # Reap all our dead child processes so we don't leave zombies around. + # These hog resources and might be causing some of the buildbots to die. + if hasattr(os, 'waitpid'): + any_process = -1 + while True: + try: + # This will raise an exception on Windows. That's ok. + pid, status = os.waitpid(any_process, os.WNOHANG) + if pid == 0: + break + except: + break + + + +class ContextManagerTests(BaseTestCase): + + def test_pipe(self): + proc = subprocess.Popen([sys.executable, "-c", yenv + + "import sys;" + "sys.stdout.write('stdout');" + "sys.stderr.write('stderr');"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + try: + self.assertEqual(proc.stdout.read(), "stdout") + self.assertStderrEqual(proc.stderr.read(), "stderr") + finally: + proc.__exit__(None, None, None) + + self.assertTrue(proc.stdout.closed) + self.assertTrue(proc.stderr.closed) + + def test_returncode(self): + proc = subprocess.Popen([sys.executable, "-c", yenv + + "import sys; sys.exit(100)"]) + proc.__exit__(None, None, None) + # __exit__ calls wait(), so the returncode should be set + self.assertEqual(proc.returncode, 100) + + def test_communicate_stdin(self): + proc = subprocess.Popen([sys.executable, "-c", yenv + + "import sys;" + "sys.exit(sys.stdin.read() == 'context')"], + stdin=subprocess.PIPE) + try: + proc.communicate("context") + self.assertEqual(proc.returncode, 1) + finally: + proc.__exit__(None, None, None) + + def test_invalid_args(self): + try: + proc = subprocess.Popen(['nonexisting_i_hope'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + proc.__exit__(None, None, None) + except EnvironmentError, exception: + # ignore errors that indicate the command was not found + if exception.errno not in (errno.ENOENT, errno.EACCES): + raise + else: + self.fail("Expected an EnvironmentError exception.") + + +if sys.version_info[:2] <= (2,4): + # The test suite hangs during the pure python test on 2.4. No idea why. + # That is not the implementation anyone is using this module for anyways. + class ProcessTestCasePOSIXPurePython(unittest.TestCase): pass + + +def main(): + unit_tests = (ProcessTestCase, + POSIXProcessTestCase, + POSIXSubprocessModuleTestCase, + Win32ProcessTestCase, + ProcessTestCasePOSIXPurePython, + ProcessTestCaseNoPoll, + HelperFunctionTests, + ContextManagerTests, + RunFuncTestCase, + ) + + test_support.run_unittest(*unit_tests) + reap_children() + +if __name__ == "__main__": + main() |