Back to index

enigmail  1.4.3
Public Member Functions | Static Public Member Functions | Public Attributes | Private Member Functions | Static Private Member Functions | Static Private Attributes
pymake.process.ParallelContext Class Reference

List of all members.

Public Member Functions

def __init__
def finish
def run
def defer
def call
def call_native

Static Public Member Functions

def spin

Public Attributes

 jcount
 exit
 processpool
 threadpool
 pending
 running

Private Member Functions

def _docall_generic

Static Private Member Functions

def _waitany

Static Private Attributes

set _allcontexts = set()
multiprocessing.Condition _condition = multiprocessing.Condition()

Detailed Description

Manages the parallel execution of processes.

Definition at line 235 of file process.py.


Constructor & Destructor Documentation

def pymake.process.ParallelContext.__init__ (   self,
  jcount 
)

Definition at line 243 of file process.py.

00243 
00244     def __init__(self, jcount):
00245         self.jcount = jcount
00246         self.exit = False
00247 
00248         self.processpool = multiprocessing.Pool(processes=jcount)
00249         self.threadpool = multiprocessing.dummy.Pool(processes=jcount)
00250         self.pending = [] # list of (cb, args, kwargs)
00251         self.running = [] # list of (subprocess, cb)
00252 
00253         self._allcontexts.add(self)


Member Function Documentation

def pymake.process.ParallelContext._docall_generic (   self,
  pool,
  job,
  cb,
  echo,
  justprint 
) [private]

Definition at line 271 of file process.py.

00271 
00272     def _docall_generic(self, pool, job, cb, echo, justprint):
00273         if echo is not None:
00274             print echo
00275         processcb = job.get_callback(ParallelContext._condition)
00276         if justprint:
00277             processcb(0)
00278         else:
00279             pool.apply_async(job_runner, args=(job,), callback=processcb)
00280         self.running.append((job, cb))

Here is the caller graph for this function:

def pymake.process.ParallelContext._waitany (   condition) [static, private]

Definition at line 299 of file process.py.

00299 
00300     def _waitany(condition):
00301         def _checkdone():
00302             jobs = []
00303             for c in ParallelContext._allcontexts:
00304                 for i in xrange(0, len(c.running)):
00305                     if c.running[i][0].done:
00306                         jobs.append(c.running[i])
00307                 for j in jobs:
00308                     if j in c.running:
00309                         c.running.remove(j)
00310             return jobs
00311 
00312         # We must acquire the lock, and then check to see if any jobs have
00313         # finished.  If we don't check after acquiring the lock it's possible
00314         # that all outstanding jobs will have completed before we wait and we'll
00315         # wait for notifications that have already occurred.
00316         condition.acquire()
00317         jobs = _checkdone()
00318 
00319         if jobs == []:
00320             condition.wait()
00321             jobs = _checkdone()
00322 
00323         condition.release()
00324 
00325         return jobs
        
def pymake.process.ParallelContext.call (   self,
  argv,
  shell,
  env,
  cwd,
  cb,
  echo,
  justprint = False,
  executable = None 
)
Asynchronously call the process

Definition at line 281 of file process.py.

00281 
00282     def call(self, argv, shell, env, cwd, cb, echo, justprint=False, executable=None):
00283         """
00284         Asynchronously call the process
00285         """
00286 
00287         job = PopenJob(argv, executable=executable, shell=shell, env=env, cwd=cwd)
00288         self.defer(self._docall_generic, self.threadpool, job, cb, echo, justprint)

Here is the call graph for this function:

def pymake.process.ParallelContext.call_native (   self,
  module,
  method,
  argv,
  env,
  cwd,
  cb,
  echo,
  justprint = False,
  pycommandpath = None 
)
Asynchronously call the native function

Definition at line 290 of file process.py.

00290     def call_native(self, module, method, argv, env, cwd, cb,
00291                     echo, justprint=False, pycommandpath=None):
00292         """
00293         Asynchronously call the native function
00294         """
00295 
00296         job = PythonJob(module, method, argv, env, cwd, pycommandpath)
00297         self.defer(self._docall_generic, self.processpool, job, cb, echo, justprint)

Here is the call graph for this function:

Here is the caller graph for this function:

def pymake.process.ParallelContext.defer (   self,
  cb,
  *args,
  **kwargs 
)

Definition at line 267 of file process.py.

00267 
00268     def defer(self, cb, *args, **kwargs):
00269         assert self.jcount > 1 or not len(self.pending), "Serial execution error defering %r %r %r: currently pending %r" % (cb, args, kwargs, self.pending)
00270         self.pending.append((cb, args, kwargs))

Here is the caller graph for this function:

def pymake.process.ParallelContext.finish (   self)

Definition at line 254 of file process.py.

00254 
00255     def finish(self):
00256         assert len(self.pending) == 0 and len(self.running) == 0, "pending: %i running: %i" % (len(self.pending), len(self.running))
00257         self.processpool.close()
00258         self.threadpool.close()
00259         self.processpool.join()
00260         self.threadpool.join()
00261         self._allcontexts.remove(self)

def pymake.process.ParallelContext.run (   self)

Definition at line 262 of file process.py.

00262 
00263     def run(self):
00264         while len(self.pending) and len(self.running) < self.jcount:
00265             cb, args, kwargs = self.pending.pop(0)
00266             cb(*args, **kwargs)

def pymake.process.ParallelContext.spin ( ) [static]
Spin the 'event loop', and never return.

Definition at line 327 of file process.py.

00327 
00328     def spin():
00329         """
00330         Spin the 'event loop', and never return.
00331         """
00332 
00333         while True:
00334             clist = list(ParallelContext._allcontexts)
00335             for c in clist:
00336                 c.run()
00337 
00338             dowait = util.any((len(c.running) for c in ParallelContext._allcontexts))
00339             if dowait:
00340                 # Wait on local jobs first for perf
00341                 for job, cb in ParallelContext._waitany(ParallelContext._condition):
00342                     cb(job.exitcode)
00343             else:
00344                 assert any(len(c.pending) for c in ParallelContext._allcontexts)

Here is the call graph for this function:


Member Data Documentation

set pymake.process.ParallelContext._allcontexts = set() [static, private]

Definition at line 240 of file process.py.

multiprocessing.Condition pymake.process.ParallelContext._condition = multiprocessing.Condition() [static, private]

Definition at line 241 of file process.py.

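pymake.process.ParallelContext.exit

Definition at line 245 of file process.py.

pymake.process.ParallelContext.jcount

Definition at line 244 of file process.py.

pymake.process.ParallelContext.pending

Definition at line 249 of file process.py.

pymake.process.ParallelContext.processpool

Definition at line 247 of file process.py.

pymake.process.ParallelContext.running

Definition at line 250 of file process.py.

pymake.process.ParallelContext.threadpool

Definition at line 248 of file process.py.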


The documentation for this class was generated from the following file: