Back to index

python-biopython  1.60
Async.py
Go to the documentation of this file.
00001 # Copyright 2007 by Tiago Antao <tiagoantao@gmail.com>.  All rights reserved.
00002 # This code is part of the Biopython distribution and governed by its
00003 # license.  Please see the LICENSE file that should have been included
00004 # as part of this package.
00005 
00006 
00007 """
00008 This module allows for asynchronous execution of Fdist and
00009   splitting of loads.
00010 
00011 FDistAsync Allows for the execution of FDist.
00012 
00013 SplitFDist splits a single Fdist execution in several, taking advantage
00014     of multi-core architectures.
00015 
00016 """
00017 
00018 import os
00019 import shutil
00020 import thread
00021 from time import sleep
00022 from Bio.PopGen.Async import Local
00023 from Bio.PopGen.FDist.Controller import FDistController
00024 
class FDistAsync(FDistController):
    """FDist controller that can be driven as an asynchronous job.
    """

    def __init__(self, fdist_dir = "", ext = None):
        """Constructor.

        Parameters:
        fdist_dir - Directory holding the fdist binaries; "" means
            they are expected to be on the path.
        ext - Extension of binary names (e.g. nothing on Unix,
              ".exe" on Windows).
        """
        FDistController.__init__(self, fdist_dir, ext)

    def run_job(self, parameters, input_files):
        """Runs one FDist job described by a parameter dictionary.

           The keys 'npops', 'nsamples', 'fst' and 'sample_size' are
           mandatory; 'mut', 'num_sims', 'data_dir', 'is_dominant',
           'theta', 'beta' and 'max_freq' are optional and fall back
           to the defaults below. input_files is not used here.

           Returns a tuple: the value returned by run_fdist and a
           dictionary mapping 'out.dat' to that file (inside data_dir)
           opened for reading. The caller is responsible for closing it.

           This is run, normally, inside a separate thread.
        """
        optional = parameters.get
        # data_dir is needed again after the run to locate out.dat.
        work_dir = optional('data_dir', '.')
        # Arguments are evaluated left to right, so a missing mandatory
        # key is reported in the order npops, nsamples, fst, sample_size.
        result = self.run_fdist(
            parameters['npops'],
            parameters['nsamples'],
            parameters['fst'],
            parameters['sample_size'],
            optional('mut', 0),
            optional('num_sims', 20000),
            work_dir,
            optional('is_dominant', False),
            optional('theta', 0.06),
            optional('beta', (0.25, 0.25)),
            optional('max_freq', 0.99))
        out_dat_path = os.sep.join((work_dir, 'out.dat'))
        return result, {'out.dat': open(out_dat_path, 'r')}
00065 
00066 class SplitFDist(object):
00067     """Splits a FDist run.
00068 
00069        The idea is to split a certain number of simulations in smaller
00070        numbers (e.g. 30.000 sims split in 30 packets of 1.000). This
00071        allows to run simulations in parallel, thus taking advantage
00072        of multi-core CPUs.
00073 
00074        Each SplitFDist object can only be used to run a single FDist
00075        simulation.
00076     """
00077     def __init__(self, report_fun = None,
00078         num_thr = 2, split_size = 1000, fdist_dir = '', ext = None):
00079         """Constructor.
00080 
00081            Parameters:
00082            report_fun - Function that is called when a single packet is
00083                run, it should have a single parameter: Fst.
00084            num_thr - Number of desired threads, typically the number
00085                of cores.
00086            split_size - Size that a full simulation will be split in.
00087            ext - Binary extension name (e.g. nothing on Unix, '.exe' on
00088                Windows).
00089         """
00090         self.async = Local.Local(num_thr)
00091         self.async.hooks['fdist'] = FDistAsync(fdist_dir, ext)
00092         self.report_fun = report_fun
00093         self.split_size = split_size
00094 
00095     #There might be races when reporting...
00096     def monitor(self):
00097         """Monitors and reports (using report_fun) execution.
00098 
00099            Every time a partial simulation ends, calls report_fun.
00100            IMPORTANT: monitor calls can be concurrent with other
00101            events, ie, a tasks might end while report_fun is being
00102            called. This means that report_fun should be consider that
00103            other events might be happening while it is running (it
00104            can call acquire/release if necessary).
00105         """
00106         while(True):
00107             sleep(1)
00108             self.async.access_ds.acquire()
00109             keys =  self.async.done.keys()[:]
00110             self.async.access_ds.release()
00111             for done in keys:
00112                 self.async.access_ds.acquire()
00113                 fst, files = self.async.done[done]
00114                 del self.async.done[done]
00115                 out_dat = files['out.dat']
00116                 f = open(self.data_dir + os.sep + 'out.dat','a')
00117                 f.writelines(out_dat.readlines())
00118                 f.close()
00119                 out_dat.close()
00120                 self.async.access_ds.release()
00121                 for file in os.listdir(self.parts[done]):
00122                     os.remove (self.parts[done] + os.sep + file)
00123                 os.rmdir(self.parts[done])
00124                 #print fst, out_dat
00125                 if self.report_fun:
00126                     self.report_fun(fst)
00127             self.async.access_ds.acquire()
00128             if len(self.async.waiting) == 0 and len(self.async.running) == 0 \
00129                and len(self.async.done) == 0:
00130                 break
00131             self.async.access_ds.release()
00132             #print 'R', self.async.running
00133             #print 'W', self.async.waiting
00134             #print 'R', self.async.running
00135 
00136     def acquire(self):
00137         """Allows the external acquisition of the lock.
00138         """
00139         self.async.access_ds.acquire()
00140 
00141     def release(self):
00142         """Allows the external release of the lock.
00143         """
00144         self.async.access_ds.release()
00145 
00146     #You can only run a fdist case at a time
00147     def run_fdist(self, npops, nsamples, fst, sample_size,
00148         mut = 0, num_sims = 20000, data_dir='.',
00149         is_dominant = False, theta = 0.06, beta = (0.25, 0.25),
00150         max_freq = 0.99):
00151         """Runs FDist.
00152 
00153            Parameters can be seen on FDistController.run_fdist.
00154 
00155            It will split a single execution in several parts and
00156            create separated data directories.
00157         """
00158         num_parts = num_sims/self.split_size
00159         self.parts = {}
00160         self.data_dir = data_dir
00161         for directory in range(num_parts):
00162            full_path = data_dir + os.sep + str(directory)
00163            try:
00164                os.mkdir(full_path)
00165            except OSError:
00166                pass #Its ok, if it is already there
00167            if "ss_file" in os.listdir(data_dir):
00168                shutil.copy(data_dir + os.sep + "ss_file", full_path)
00169            id = self.async.run_program('fdist', {
00170                'npops'       : npops,
00171                'nsamples'    : nsamples,
00172                'fst'         : fst,
00173                'sample_size' : sample_size,
00174                'mut'         : mut,
00175                'num_sims'    : self.split_size,
00176                'data_dir'    : full_path,
00177                'is_dominant' : is_dominant,
00178                'theta'       : theta,
00179                'beta'        : beta,
00180                'max_freq'    : max_freq 
00181            }, {})
00182            self.parts[id] = full_path
00183         thread.start_new_thread(self.monitor, ())