Back to index

enigmail  1.4.3
process.py
Go to the documentation of this file.
00001 """
00002 Skipping shell invocations is good, when possible. This wrapper around subprocess does dirty work of
00003 parsing command lines into argv and making sure that no shell magic is being used.
00004 """
00005 
00006 #TODO: ship pyprocessing?
00007 import multiprocessing, multiprocessing.dummy
00008 import subprocess, shlex, re, logging, sys, traceback, os, imp, glob
00009 # XXXkhuey Work around http://bugs.python.org/issue1731717
00010 subprocess._cleanup = lambda: None
00011 import command, util
00012 if sys.platform=='win32':
00013     import win32process
00014 
00015 _log = logging.getLogger('pymake.process')
00016 
00017 _escapednewlines = re.compile(r'\\\n')
00018 _blacklist = re.compile(r'[$><;[{~`|&()]')
00019 _needsglob = re.compile(r'[\*\?]')
00020 def clinetoargv(cline):
00021     """
00022     If this command line can safely skip the shell, return an argv array.
00023     @returns argv, badchar
00024     """
00025 
00026     str = _escapednewlines.sub('', cline)
00027     m = _blacklist.search(str)
00028     if m is not None:
00029         return None, m.group(0)
00030 
00031     args = shlex.split(str, comments=True)
00032 
00033     if len(args) and args[0].find('=') != -1:
00034         return None, '='
00035 
00036     return args, None
00037 
00038 def doglobbing(args, cwd):
00039     """
00040     Perform any needed globbing on the argument list passed in
00041     """
00042     globbedargs = []
00043     for arg in args:
00044         if _needsglob.search(arg):
00045             globbedargs.extend(glob.glob(os.path.join(cwd, arg)))
00046         else:
00047             globbedargs.append(arg)
00048 
00049     return globbedargs
00050 
00051 shellwords = (':', '.', 'break', 'cd', 'continue', 'exec', 'exit', 'export',
00052               'getopts', 'hash', 'pwd', 'readonly', 'return', 'shift', 
00053               'test', 'times', 'trap', 'umask', 'unset', 'alias',
00054               'set', 'bind', 'builtin', 'caller', 'command', 'declare',
00055               'echo', 'enable', 'help', 'let', 'local', 'logout', 
00056               'printf', 'read', 'shopt', 'source', 'type', 'typeset',
00057               'ulimit', 'unalias', 'set')
00058 
00059 def call(cline, env, cwd, loc, cb, context, echo, justprint=False):
00060     #TODO: call this once up-front somewhere and save the result?
00061     shell, msys = util.checkmsyscompat()
00062 
00063     shellreason = None
00064     if msys and cline.startswith('/'):
00065         shellreason = "command starts with /"
00066     else:
00067         argv, badchar = clinetoargv(cline)
00068         if argv is None:
00069             shellreason = "command contains shell-special character '%s'" % (badchar,)
00070         elif len(argv) and argv[0] in shellwords:
00071             shellreason = "command starts with shell primitive '%s'" % (argv[0],)
00072         else:
00073             argv = doglobbing(argv, cwd)
00074 
00075     if shellreason is not None:
00076         _log.debug("%s: using shell: %s: '%s'", loc, shellreason, cline)
00077         if msys:
00078             if len(cline) > 3 and cline[1] == ':' and cline[2] == '/':
00079                 cline = '/' + cline[0] + cline[2:]
00080             cline = [shell, "-c", cline]
00081         context.call(cline, shell=not msys, env=env, cwd=cwd, cb=cb, echo=echo,
00082                      justprint=justprint)
00083         return
00084 
00085     if not len(argv):
00086         cb(res=0)
00087         return
00088 
00089     if argv[0] == command.makepypath:
00090         command.main(argv[1:], env, cwd, cb)
00091         return
00092 
00093     if argv[0:2] == [sys.executable.replace('\\', '/'),
00094                      command.makepypath.replace('\\', '/')]:
00095         command.main(argv[2:], env, cwd, cb)
00096         return
00097 
00098     if argv[0].find('/') != -1:
00099         executable = util.normaljoin(cwd, argv[0])
00100     else:
00101         executable = None
00102 
00103     context.call(argv, executable=executable, shell=False, env=env, cwd=cwd, cb=cb,
00104                  echo=echo, justprint=justprint)
00105 
00106 def call_native(module, method, argv, env, cwd, loc, cb, context, echo, justprint=False,
00107                 pycommandpath=None):
00108     argv = doglobbing(argv, cwd)
00109     context.call_native(module, method, argv, env=env, cwd=cwd, cb=cb,
00110                         echo=echo, justprint=justprint, pycommandpath=pycommandpath)
00111 
00112 def statustoresult(status):
00113     """
00114     Convert the status returned from waitpid into a prettier numeric result.
00115     """
00116     sig = status & 0xFF
00117     if sig:
00118         return -sig
00119 
00120     return status >>8
00121 
00122 class Job(object):
00123     """
00124     A single job to be executed on the process pool.
00125     """
00126     done = False # set to true when the job completes
00127 
00128     def __init__(self):
00129         self.exitcode = -127
00130 
00131     def notify(self, condition, result):
00132         condition.acquire()
00133         self.done = True
00134         self.exitcode = result
00135         condition.notify()
00136         condition.release()
00137 
00138     def get_callback(self, condition):
00139         return lambda result: self.notify(condition, result)
00140 
00141 class PopenJob(Job):
00142     """
00143     A job that executes a command using subprocess.Popen.
00144     """
00145     def __init__(self, argv, executable, shell, env, cwd):
00146         Job.__init__(self)
00147         self.argv = argv
00148         self.executable = executable
00149         self.shell = shell
00150         self.env = env
00151         self.cwd = cwd
00152 
00153     def run(self):
00154         try:
00155             p = subprocess.Popen(self.argv, executable=self.executable, shell=self.shell, env=self.env, cwd=self.cwd)
00156             return p.wait()
00157         except OSError, e:
00158             print >>sys.stderr, e
00159             return -127
00160 
00161 class PythonException(Exception):
00162     def __init__(self, message, exitcode):
00163         Exception.__init__(self)
00164         self.message = message
00165         self.exitcode = exitcode
00166 
00167     def __str__(self):
00168         return self.message
00169 
00170 def load_module_recursive(module, path):
00171     """
00172     Emulate the behavior of __import__, but allow
00173     passing a custom path to search for modules.
00174     """
00175     bits = module.split('.')
00176     for i, bit in enumerate(bits):
00177         dotname = '.'.join(bits[:i+1])
00178         try:
00179           f, path, desc = imp.find_module(bit, path)
00180           m = imp.load_module(dotname, f, path, desc)
00181           if f is None:
00182               path = m.__path__
00183         except ImportError:
00184             return
00185 
00186 class PythonJob(Job):
00187     """
00188     A job that calls a Python method.
00189     """
00190     def __init__(self, module, method, argv, env, cwd, pycommandpath=None):
00191         self.module = module
00192         self.method = method
00193         self.argv = argv
00194         self.env = env
00195         self.cwd = cwd
00196         self.pycommandpath = pycommandpath or []
00197 
00198     def run(self):
00199         oldenv = os.environ
00200         try:
00201             os.chdir(self.cwd)
00202             os.environ = self.env
00203             if self.module not in sys.modules:
00204                 load_module_recursive(self.module,
00205                                       sys.path + self.pycommandpath)
00206             if self.module not in sys.modules:
00207                 print >>sys.stderr, "No module named '%s'" % self.module
00208                 return -127                
00209             m = sys.modules[self.module]
00210             if self.method not in m.__dict__:
00211                 print >>sys.stderr, "No method named '%s' in module %s" % (method, module)
00212                 return -127
00213             m.__dict__[self.method](self.argv)
00214         except PythonException, e:
00215             print >>sys.stderr, e
00216             return e.exitcode
00217         except:
00218             e = sys.exc_info()[1]
00219             if isinstance(e, SystemExit) and (e.code == 0 or e.code == '0'):
00220                 pass # sys.exit(0) is not a failure
00221             else:
00222                 print >>sys.stderr, e
00223                 print >>sys.stderr, traceback.print_exc()
00224                 return -127
00225         finally:
00226             os.environ = oldenv
00227         return 0
00228 
00229 def job_runner(job):
00230     """
00231     Run a job. Called in a Process pool.
00232     """
00233     return job.run()
00234 
00235 class ParallelContext(object):
00236     """
00237     Manages the parallel execution of processes.
00238     """
00239 
00240     _allcontexts = set()
00241     _condition = multiprocessing.Condition()
00242 
00243     def __init__(self, jcount):
00244         self.jcount = jcount
00245         self.exit = False
00246 
00247         self.processpool = multiprocessing.Pool(processes=jcount)
00248         self.threadpool = multiprocessing.dummy.Pool(processes=jcount)
00249         self.pending = [] # list of (cb, args, kwargs)
00250         self.running = [] # list of (subprocess, cb)
00251 
00252         self._allcontexts.add(self)
00253 
00254     def finish(self):
00255         assert len(self.pending) == 0 and len(self.running) == 0, "pending: %i running: %i" % (len(self.pending), len(self.running))
00256         self.processpool.close()
00257         self.threadpool.close()
00258         self.processpool.join()
00259         self.threadpool.join()
00260         self._allcontexts.remove(self)
00261 
00262     def run(self):
00263         while len(self.pending) and len(self.running) < self.jcount:
00264             cb, args, kwargs = self.pending.pop(0)
00265             cb(*args, **kwargs)
00266 
00267     def defer(self, cb, *args, **kwargs):
00268         assert self.jcount > 1 or not len(self.pending), "Serial execution error defering %r %r %r: currently pending %r" % (cb, args, kwargs, self.pending)
00269         self.pending.append((cb, args, kwargs))
00270 
00271     def _docall_generic(self, pool, job, cb, echo, justprint):
00272         if echo is not None:
00273             print echo
00274         processcb = job.get_callback(ParallelContext._condition)
00275         if justprint:
00276             processcb(0)
00277         else:
00278             pool.apply_async(job_runner, args=(job,), callback=processcb)
00279         self.running.append((job, cb))
00280 
00281     def call(self, argv, shell, env, cwd, cb, echo, justprint=False, executable=None):
00282         """
00283         Asynchronously call the process
00284         """
00285 
00286         job = PopenJob(argv, executable=executable, shell=shell, env=env, cwd=cwd)
00287         self.defer(self._docall_generic, self.threadpool, job, cb, echo, justprint)
00288 
00289     def call_native(self, module, method, argv, env, cwd, cb,
00290                     echo, justprint=False, pycommandpath=None):
00291         """
00292         Asynchronously call the native function
00293         """
00294 
00295         job = PythonJob(module, method, argv, env, cwd, pycommandpath)
00296         self.defer(self._docall_generic, self.processpool, job, cb, echo, justprint)
00297 
00298     @staticmethod
00299     def _waitany(condition):
00300         def _checkdone():
00301             jobs = []
00302             for c in ParallelContext._allcontexts:
00303                 for i in xrange(0, len(c.running)):
00304                     if c.running[i][0].done:
00305                         jobs.append(c.running[i])
00306                 for j in jobs:
00307                     if j in c.running:
00308                         c.running.remove(j)
00309             return jobs
00310 
00311         # We must acquire the lock, and then check to see if any jobs have
00312         # finished.  If we don't check after acquiring the lock it's possible
00313         # that all outstanding jobs will have completed before we wait and we'll
00314         # wait for notifications that have already occurred.
00315         condition.acquire()
00316         jobs = _checkdone()
00317 
00318         if jobs == []:
00319             condition.wait()
00320             jobs = _checkdone()
00321 
00322         condition.release()
00323 
00324         return jobs
00325         
00326     @staticmethod
00327     def spin():
00328         """
00329         Spin the 'event loop', and never return.
00330         """
00331 
00332         while True:
00333             clist = list(ParallelContext._allcontexts)
00334             for c in clist:
00335                 c.run()
00336 
00337             dowait = util.any((len(c.running) for c in ParallelContext._allcontexts))
00338             if dowait:
00339                 # Wait on local jobs first for perf
00340                 for job, cb in ParallelContext._waitany(ParallelContext._condition):
00341                     cb(job.exitcode)
00342             else:
00343                 assert any(len(c.pending) for c in ParallelContext._allcontexts)
00344 
00345 def makedeferrable(usercb, **userkwargs):
00346     def cb(*args, **kwargs):
00347         kwargs.update(userkwargs)
00348         return usercb(*args, **kwargs)
00349 
00350     return cb
00351 
00352 _serialContext = None
00353 _parallelContext = None
00354 
00355 def getcontext(jcount):
00356     global _serialContext, _parallelContext
00357     if jcount == 1:
00358         if _serialContext is None:
00359             _serialContext = ParallelContext(1)
00360         return _serialContext
00361     else:
00362         if _parallelContext is None:
00363             _parallelContext = ParallelContext(jcount)
00364         return _parallelContext
00365