Back to index

moin  1.9.0~rc2
SubProcess.py
Go to the documentation of this file.
00001 """
00002 Enhanced subprocess.Popen subclass, supporting:
00003     * .communicate() with timeout
00004     * kill/terminate/send_signal (like in Py 2.6) for Py 2.4 / 2.5
00005 
00006 Sample usage:
00007     out, err = Popen(...).communicate(input, timeout=300)
00008 """
00009 
00010 import os
00011 import time
00012 import subprocess
00013 import threading
00014 import signal
00015 
00016 if subprocess.mswindows:
00017     try:
00018         # Python >= 2.6 should have this:
00019         from _subprocess import TerminateProcess
00020     except ImportError:
00021         # otherwise you need win32 extensions:
00022         from win32process import TerminateProcess
00023 else:
00024     import select
00025 
00026 class Popen(subprocess.Popen):
00027     # send_signal, terminate, kill copied from Python 2.6
00028     # (we want to support Python >= 2.4)
00029     if subprocess.mswindows:
00030         def send_signal(self, sig):
00031             """Send a signal to the process
00032             """
00033             if sig == signal.SIGTERM:
00034                 self.terminate()
00035             else:
00036                 raise ValueError("Only SIGTERM is supported on Windows")
00037 
00038         def terminate(self):
00039             """Terminates the process
00040             """
00041             TerminateProcess(self._handle, 1)
00042 
00043         kill = terminate
00044 
00045     else: # POSIX
00046         def send_signal(self, sig):
00047             """Send a signal to the process
00048             """
00049             os.kill(self.pid, sig)
00050 
00051         def terminate(self):
00052             """Terminate the process with SIGTERM
00053             """
00054             self.send_signal(signal.SIGTERM)
00055 
00056         def kill(self):
00057             """Kill the process with SIGKILL
00058             """
00059             self.send_signal(signal.SIGKILL)
00060 
00061     def communicate(self, input=None, timeout=None):
00062         """Interact with process: Send data to stdin.  Read data from
00063         stdout and stderr, until end-of-file is reached.  Wait for
00064         process to terminate.  The optional input argument should be a
00065         string to be sent to the child process, or None, if no data
00066         should be sent to the child.
00067 
00068         communicate() returns a tuple (stdout, stderr)."""
00069 
00070         self.timeout = timeout
00071 
00072         # Optimization: If we are only using one pipe, or no pipe at
00073         # all, using select() or threads is unnecessary.
00074         if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
00075             stdout = None
00076             stderr = None
00077             if self.stdin:
00078                 if input:
00079                     self._fo_write_no_intr(self.stdin, input)
00080                 self.stdin.close()
00081             elif self.stdout:
00082                 stdout = self._fo_read_no_intr(self.stdout)
00083                 self.stdout.close()
00084             elif self.stderr:
00085                 stderr = self._fo_read_no_intr(self.stderr)
00086                 self.stderr.close()
00087             self.wait()
00088             return (stdout, stderr)
00089 
00090         return self._communicate(input)
00091 
00092     if subprocess.mswindows:
00093         def _communicate(self, input):
00094             stdout = None # Return
00095             stderr = None # Return
00096 
00097             if self.stdout:
00098                 stdout = []
00099                 stdout_thread = threading.Thread(target=self._readerthread,
00100                                                  args=(self.stdout, stdout))
00101                 stdout_thread.setDaemon(True)
00102                 stdout_thread.start()
00103             if self.stderr:
00104                 stderr = []
00105                 stderr_thread = threading.Thread(target=self._readerthread,
00106                                                  args=(self.stderr, stderr))
00107                 stderr_thread.setDaemon(True)
00108                 stderr_thread.start()
00109 
00110             if self.stdin:
00111                 if input is not None:
00112                     self.stdin.write(input)
00113                 self.stdin.close()
00114 
00115             if self.stdout:
00116                 stdout_thread.join(self.timeout)
00117             if self.stderr:
00118                 stderr_thread.join(self.timeout)
00119 
00120             # if the threads are still alive, that means the thread join timed out
00121             timed_out = (self.stdout and self.stdout_thread.isAlive() or
00122                          self.stderr and self.stderr_thread.isAlive())
00123             if timed_out:
00124                 self.kill()
00125             else:
00126                 self.wait()
00127 
00128             # All data exchanged.  Translate lists into strings.
00129             if stdout is not None:
00130                 stdout = stdout[0]
00131             if stderr is not None:
00132                 stderr = stderr[0]
00133 
00134             # Translate newlines, if requested.  We cannot let the file
00135             # object do the translation: It is based on stdio, which is
00136             # impossible to combine with select (unless forcing no
00137             # buffering).
00138             if self.universal_newlines and hasattr(file, 'newlines'):
00139                 if stdout:
00140                     stdout = self._translate_newlines(stdout)
00141                 if stderr:
00142                     stderr = self._translate_newlines(stderr)
00143 
00144             return (stdout, stderr)
00145 
00146     else: # POSIX
00147         def _communicate(self, input):
00148             timed_out = False
00149             read_set = []
00150             write_set = []
00151             stdout = None # Return
00152             stderr = None # Return
00153 
00154             if self.stdin:
00155                 # Flush stdio buffer.  This might block, if the user has
00156                 # been writing to .stdin in an uncontrolled fashion.
00157                 self.stdin.flush()
00158                 if input:
00159                     write_set.append(self.stdin)
00160                 else:
00161                     self.stdin.close()
00162             if self.stdout:
00163                 read_set.append(self.stdout)
00164                 stdout = []
00165             if self.stderr:
00166                 read_set.append(self.stderr)
00167                 stderr = []
00168 
00169             input_offset = 0
00170             while read_set or write_set:
00171                 try:
00172                     rlist, wlist, xlist = select.select(read_set, write_set, [], self.timeout)
00173                 except select.error, e:
00174                     if e.args[0] == errno.EINTR:
00175                         continue
00176                     raise
00177 
00178                 timed_out = not (rlist or wlist or xlist)
00179                 if timed_out:
00180                     break
00181 
00182                 if self.stdin in wlist:
00183                     # When select has indicated that the file is writable,
00184                     # we can write up to PIPE_BUF bytes without risk
00185                     # blocking.  POSIX defines PIPE_BUF >= 512
00186                     chunk = input[input_offset:input_offset + 512]
00187                     bytes_written = os.write(self.stdin.fileno(), chunk)
00188                     input_offset += bytes_written
00189                     if input_offset >= len(input):
00190                         self.stdin.close()
00191                         write_set.remove(self.stdin)
00192 
00193                 if self.stdout in rlist:
00194                     data = os.read(self.stdout.fileno(), 1024)
00195                     if data == "":
00196                         self.stdout.close()
00197                         read_set.remove(self.stdout)
00198                     stdout.append(data)
00199 
00200                 if self.stderr in rlist:
00201                     data = os.read(self.stderr.fileno(), 1024)
00202                     if data == "":
00203                         self.stderr.close()
00204                         read_set.remove(self.stderr)
00205                     stderr.append(data)
00206 
00207             # All data exchanged.  Translate lists into strings.
00208             if stdout is not None:
00209                 stdout = ''.join(stdout)
00210             if stderr is not None:
00211                 stderr = ''.join(stderr)
00212 
00213             # Translate newlines, if requested.  We cannot let the file
00214             # object do the translation: It is based on stdio, which is
00215             # impossible to combine with select (unless forcing no
00216             # buffering).
00217             if self.universal_newlines and hasattr(file, 'newlines'):
00218                 if stdout:
00219                     stdout = self._translate_newlines(stdout)
00220                 if stderr:
00221                     stderr = self._translate_newlines(stderr)
00222 
00223             if timed_out:
00224                 self.kill()
00225             else:
00226                 self.wait()
00227             return (stdout, stderr)
00228 
00229 
00230 def exec_cmd(cmd, input=None, timeout=None):
00231     p = Popen(cmd, shell=True,
00232               close_fds=not subprocess.mswindows,
00233               bufsize=1024,
00234               stdin=subprocess.PIPE,
00235               stdout=subprocess.PIPE,
00236               stderr=subprocess.PIPE)
00237     data, errors = p.communicate(input, timeout=timeout)
00238     return data, errors, p.returncode
00239 
00240 
00241 if __name__ == '__main__':
00242     print exec_cmd("python", "import time ; time.sleep(20) ; print 'never!' ;", timeout=10)
00243     print exec_cmd("python", "import time ; time.sleep(20) ; print '20s gone' ;")
00244