Back to index

nordugrid-arc-nox  1.1.0~rc6
Public Member Functions | Public Attributes | Private Member Functions
storage.ahash.replicatedahash.ReplicationStore Class Reference
Inheritance diagram for storage.ahash.replicatedahash.ReplicationStore:
Inheritance graph
[legend]
Collaboration diagram for storage.ahash.replicatedahash.ReplicationStore:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def __init__
def lock
def unlock
def getDBFlags
def checkingThread
def dbReady
def getDBReady
def list
def get
def set
def restart
def terminate

Public Attributes

 eid
 my_replica
 repmgr
 my_url
 other_urls
 priority
 check_period
 cachesize
 dbenv_ready
 dbp
 database
 dbenv
 dbenv_flags
 deadlock_retries
 sleeptime
 datadir
 non_existent_object
 llock

Private Member Functions

def __configureReplication

Detailed Description

Wrapper class for enabling replication in TransDBStore

Definition at line 133 of file replicatedahash.py.


Constructor & Destructor Documentation

def storage.ahash.replicatedahash.ReplicationStore.__init__ (   self,
  cfg,
  ahash_send,
  non_existent_object = {} 
)
Constructor of ReplicationStore.

RepDBStore(cfg, sendMessage, processMessage)

Definition at line 139 of file replicatedahash.py.

00139 
00140                  non_existent_object = {}):
00141         """ Constructor of ReplicationStore.
00142 
00143         RepDBStore(cfg, sendMessage, processMessage)
00144 
00145         """
00146         
00147         dbenv_flags = db.DB_CREATE | \
00148                       db.DB_RECOVER | \
00149                       db.DB_THREAD | \
00150                       db.DB_INIT_REP | \
00151                       db.DB_INIT_LOCK | \
00152                       db.DB_INIT_LOG | \
00153                       db.DB_INIT_MPOOL | \
00154                       db.DB_INIT_TXN    
00155 
00156         storecfg = XMLTree(from_tree = 
00157                            ['StoreCfg', 
00158                             [('DataDir', 
00159                               (str(cfg.Get('LocalDir')))),
00160                               ('Priority', (int(str(cfg.Get('Priority'))))),
00161                                ('CheckPeriod',(5)),
00162                               ('DBEnvFlags',
00163                                (str(dbenv_flags)))
00164                             ]])
00165         storecfg = arc.XMLNode(storecfg.pretty_xml()) 
00166 
00167         # db environment is opened in TransDBStore.__init__
00168         TransDBStore.__init__(self, storecfg, non_existent_object)
00169         self.dbReady(False)
00170         # configure replication prior to initializing TransDBStore
00171         self.__configureReplication(cfg)
00172 
00173 
00174         log.msg(arc.VERBOSE, "Initialized replication environment")
00175 
00176         # eid is a local environment id, so I will be number 1
00177         self.eid = 1
00178         self.my_replica = {'url'      : self.my_url,
00179                            'id'       : self.eid,
00180                            'status'   : 'online'}
00181         other_replicas = []
00182         other_eid = self.eid
00183         for url in self.other_urls:
00184             other_eid += 1
00185             other_replicas +=  [{'url'      : url,
00186                                  'id'       : other_eid,
00187                                  'status'   : 'offline'}]
00188 
00189         # start replication manager
00190         self.repmgr = ReplicationManager(ahash_send, self.dbenv, 
00191                                          self.my_replica, other_replicas, self.dbReady)
00192         try:
00193             # start repmgr with 256 threads 
00194             # always do election of master
00195             threading.Thread(target = self.repmgr.start, args=[256, db.DB_REP_ELECTION]).start()
00196         except:
00197             log.msg(arc.ERROR, "Couldn't start replication manager.")
00198             log.msg()
00199             self.terminate()
00200 
00201         # thread for maintaining host list
00202         threading.Thread(target = self.checkingThread, args=[self.check_period]).start()
00203 

Here is the call graph for this function:


Member Function Documentation

def storage.ahash.replicatedahash.ReplicationStore.__configureReplication (   self,
  cfg 
) [private]

Definition at line 204 of file replicatedahash.py.

00204 
00205     def __configureReplication(self, cfg):
00206         self.my_url = str(cfg.Get('MyURL'))
00207         self.other_urls = get_child_values_by_name(cfg, 'OtherURL')
00208         # make sure my_url is not in other_urls
00209         self.other_urls = [url for url in self.other_urls if url != self.my_url]
00210         try:
00211             # Priority used in elections: Higher priority means
00212             # higher chance of winning the master election
00213             # (i.e., becoming master)
00214             self.priority = int(str(cfg.Get('Priority')))
00215         except:
00216             log.msg(arc.WARNING, "Bad Priority value, using default 10")
00217             self.priority = 10
00218         try:
00219             # In seconds, how often should we check for connected nodes
00220             self.check_period = float(str(cfg.Get('CheckPeriod')))
00221         except:
00222             log.msg(arc.WARNING, "Could not find checking period, using default 10s")
00223             self.check_period = 10.0
00224         try:
00225             # amount of db cached in memory
00226             # Must be integer. Optionally in order of kB, MB, GB, TB or PB,
00227             # otherwise bytes is used
00228             cachesize = str(cfg.Get('CacheSize'))
00229             cachemultiplier = {}
00230             cachemultiplier['kB']=1024            
00231             cachemultiplier['MB']=1024**2      
00232             cachemultiplier['GB']=1024**3         
00233             cachemultiplier['TB']=1024**4         
00234             cachemultiplier['PB']=1024**5
00235             tmp_cachesize = 0
00236             for m in cachemultiplier:
00237                 if cachesize.endswith(m):
00238                     tmp_cachesize = int(cachesize.split(m)[0])*cachemultiplier[m]
00239             if tmp_cachesize == 0:
00240                 tmp_cachesize = int(cachesize)
00241             self.cachesize = tmp_cachesize
00242         except:
00243             log.msg(arc.WARNING, "Bad cache size or no cache size configured, using 10MB")
00244             self.cachesize = 10*(1024**2)
00245         self.dbenv.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
00246         self.dbenv.rep_set_timeout(db.DB_REP_ACK_TIMEOUT, 5000000)
00247         self.dbenv.rep_set_priority(self.priority)
00248         self.dbenv.rep_set_config(db.DB_REP_CONF_BULK, True)
00249 

Here is the caller graph for this function:

def storage.ahash.replicatedahash.ReplicationStore.checkingThread (   self,
  period 
)

Definition at line 294 of file replicatedahash.py.

00294 
00295     def checkingThread(self, period):
00296         time.sleep(10)
00297 
00298         while True:
00299             log.msg(arc.VERBOSE, "checkingThread slept %d s"%period)
00300             # self.site_list is set in event_callback when elected to master
00301             # and deleted if not elected in the event of election
00302             try:
00303                 if self.repmgr.isMaster():
00304                     # get list of all registered client sites
00305                     site_list = self.repmgr.getSiteList()
00306                     new_obj = {}
00307                     url = self.my_replica['url']
00308                     status = self.my_replica['status']
00309                     new_obj[('master', "%s:%s"%(url,status))] = url
00310                     # we are only interested in connected clients here
00311                     client_list = [(client['url'], client['status'])
00312                                    for id, client in site_list.items()
00313                                    if client['url'] != self.my_replica['url']
00314                                    ]
00315                     for (url, status) in client_list:
00316                         new_obj[('client', "%s:%s"%(url, status))] = url
00317                     # store the site list
00318                     while not self.lock(False):
00319                         time.sleep(0.2)
00320                     self.set(ahash_list_guid, new_obj)
00321                     self.unlock()
00322                     log.msg(arc.VERBOSE, "wrote ahash list %s"%str(new_obj))
00323                     if not self.dbenv_ready:
00324                         log.msg(arc.VERBOSE, "but dbenv wasn't ready.")
00325             except:
00326                 log.msg()
00327             time.sleep(period)
            

Here is the call graph for this function:

Here is the caller graph for this function:

def storage.ahash.replicatedahash.ReplicationStore.dbReady (   self,
  db_is_ready = True 
)
Callback function used by repmgr to notify me that I can accept requests

Definition at line 328 of file replicatedahash.py.

00328 
00329     def dbReady(self, db_is_ready=True):
00330         """
00331         Callback function used by repmgr to notify me that I can accept requests
00332         """
00333         self.dbenv_ready = db_is_ready
        

Here is the caller graph for this function:

def arcom.store.transdbstore.TransDBStore.get (   self,
  ID 
) [inherited]
Returns the object with the given ID.

get(ID)

'ID' is the ID of the requested object.
If there is no object with this ID, returns the given non_existent_object value.

Definition at line 212 of file transdbstore.py.

00212 
00213     def get(self, ID):
00214         """ Returns the object with the given ID.
00215 
00216         get(ID)
00217 
00218         'ID' is the ID of the requested object.
00219         If there is no object with this ID, returns the given non_existent_object value.
00220         """
00221 
00222         if not self.getDBReady():
00223             raise db.DBError, "db not ready"
00224 
00225         self.dbp = self.__opendb(self.dbp)
00226 
00227         try:
00228             object = self.dbp.get(ID)
00229             try:
00230                 # using cPickle.loads for loading
00231                 object = cPickle.loads(object)
00232             except:
00233                 log.msg()
00234             if not object:
00235                 return copy.deepcopy(self.non_existent_object)
00236             return object
00237         #todo: handle all errors, e.g., deadlock and dead handle
00238         except db.DBNotFoundError:
00239             return copy.deepcopy(self.non_existent_object)
00240         except db.DBKeyEmptyError:
00241             return copy.deepcopy(self.non_existent_object)
00242         except db.DBLockDeadlockError:
00243             log.msg(arc.INFO, "Got deadlock error")            
00244             log.msg()
00245             raise db.DBError, "db deadlock"
00246         except db.DBRepHandleDeadError:
00247             log.msg(arc.INFO, "Got rep_dead_handle error")
00248             self.__err()
00249             self.dbp = None
00250             raise
00251         except db.DBError, msg:
00252             self.__del__()
00253             log.msg()
00254             log.msg(arc.ERROR, "Error getting %s"%ID)
00255             sys.exit(1)
00256             raise db.DBError, "Error listing db"
        

Here is the call graph for this function:

Here is the caller graph for this function:

def storage.ahash.replicatedahash.ReplicationStore.getDBFlags (   self)

Reimplemented from arcom.store.transdbstore.TransDBStore.

Definition at line 285 of file replicatedahash.py.

00285 
00286     def getDBFlags(self):
00287             # only master can create db
00288             # DB_AUTO_COMMIT ensures that all db modifications will
00289             # automatically be enclosed in transactions
00290             return (self.repmgr.isMaster() and 
00291                     db.DB_CREATE | db.DB_AUTO_COMMIT or 
00292                     db.DB_AUTO_COMMIT)
00293 

def storage.ahash.replicatedahash.ReplicationStore.getDBReady (   self)

Overload of TransDBStore.getDBReady

Reimplemented from arcom.store.transdbstore.TransDBStore.

Definition at line 334 of file replicatedahash.py.

00334 
00335     def getDBReady(self):
00336         """
00337         Overload of TransDBStore.getDBReady
00338         """
00339         return self.dbenv_ready

def arcom.store.transdbstore.TransDBStore.list (   self) [inherited]
List the IDs of the existing entries.

list()

Returns a list of IDs.

Definition at line 187 of file transdbstore.py.

00187 
00188     def list(self):
00189         """ List the IDs of the existing entries.
00190         
00191         list()
00192         
00193         Returns a list of IDs.
00194         """
00195 
00196         if not self.getDBReady():
00197             raise db.DBError, "db not ready"
00198         self.dbp = self.__opendb(self.dbp)
00199         try:
00200             object = self.dbp.keys()
00201             return object
00202         except db.DBLockDeadlockError:
00203             log.msg(arc.INFO, "Got deadlock error")            
00204             log.msg()
00205             raise db.DBError, "db deadlock"
00206         except db.DBError:
00207             self.__del__()
00208             log.msg()
00209             log.msg(arc.ERROR, "Error listing db")
00210             sys.exit(1)
00211             raise db.DBError, "Error listing db"

Here is the call graph for this function:

def storage.ahash.replicatedahash.ReplicationStore.lock (   self,
  blocking = True 
)
Acquire the lock.

lock(blocking = True)

'blocking': if blocking is True, then this only returns when the lock is acquired.
If it is False, then it returns immediately with False if the lock is not available,
or with True if it could be acquired.

Reimplemented from arcom.store.basestore.BaseStore.

Definition at line 250 of file replicatedahash.py.

00250 
00251     def lock(self, blocking = True):
00252         """ Acquire the lock.
00253 
00254         lock(blocking = True)
00255 
00256         'blocking': if blocking is True, then this only returns when the lock is acquired.
00257         If it is False, then it returns immediately with False if the lock is not available,
00258         or with True if it could be acquired.
00259         """
00260         # sleep a little bit to avoid same thread getting lock all the time
00261         time.sleep(random.random()*0.05)
00262         if not self.getDBReady():
00263             time.sleep(0.2)
00264             return False
00265         if self.repmgr.isMaster():
00266             # only lock if master
00267             locked = self.llock.acquire(blocking)
00268             log.msg(arc.VERBOSE, "master locking", locked)
00269             return locked
00270         else:
00271             return True

Here is the call graph for this function:

def arcom.store.transdbstore.TransDBStore.restart (   self) [inherited]

Definition at line 317 of file transdbstore.py.

00317 
00318     def restart(self):
00319         try:
00320             self.dbenv.close(0)
00321             log.msg(arc.INFO, "db environment closed")
00322             self.dbenv = db.DBEnv(0)
00323             self.__opendbenv()
00324         except db.DBError, msg:
00325             log.msg()
00326             log.msg(arc.ERROR, "error closing environment")
00327             #raise db.DBError, msg

Here is the call graph for this function:

def arcom.store.transdbstore.TransDBStore.set (   self,
  ID,
  object 
) [inherited]
Stores an object with the given ID.

set(ID, object)

'ID' is the ID of the object
'object' is the object itself
If there is already an object with this ID it will be overwritten completely.
If deadlock is caught, retry DeadlockRetries times

Definition at line 257 of file transdbstore.py.

00257 
00258     def set(self, ID, object):
00259         """ Stores an object with the given ID..
00260 
00261         set(ID, object)
00262 
00263         'ID' is the ID of the object
00264         'object' is the object itself
00265         If there is already an object with this ID it will be overwritten completely.
00266         If deadlock is caught, retry DeadlockRetries times
00267         """
00268         if not ID:
00269             raise Exception, 'ID is empty'
00270 
00271         if not self.getDBReady():
00272             return
00273         self.dbp = self.__opendb(self.dbp)
00274         retry = True
00275         retry_count = 0
00276 
00277         # try deadlock_retries times if receiving catching DBLockDeadlockError
00278         while retry:
00279             try:
00280                 if object == None:
00281                     self.dbp.delete(ID)
00282                 else:
00283                     # note that object needs to be converted to string
00284                     # using cPickle.dumps for converting
00285                     self.dbp.put(ID, cPickle.dumps(object, cPickle.HIGHEST_PROTOCOL))
00286                 retry = False
00287             except db.DBLockDeadlockError:
00288                 log.msg(arc.INFO, "Got deadlock error")
00289                 # need to close transaction handle as well
00290                 time.sleep(0.2)
00291                 if retry_count < self.deadlock_retries:
00292                     log.msg(arc.VERBOSE, "got DBLockDeadlockError")
00293                     retry_count += 1
00294                     log.msg(arc.VERBOSE, "retrying transaction", retry_count)
00295                     retry = True
00296                 else:
00297                     log.msg(arc.VERBOSE, "Deadlock exception, giving up...")
00298                     retry = False
00299             except db.DBRepHandleDeadError:
00300                 log.msg(arc.INFO, "Got rep_dead_handle error")            
00301                 #log.msg()
00302                 self.__err()
00303                 self.dbp = None
00304                 raise
00305             except db.DBAccessError:
00306                 log.msg(arc.WARNING,"Read-only db. I'm not a master.")
00307                 raise
00308             except db.DBNotFoundError:
00309                 log.msg(arc.WARNING, "cannot delete non-existing entries")
00310                 retry = False
00311             except:
00312                 self.__del__()
00313                 log.msg()
00314                 log.msg(arc.ERROR, "Error setting %s"%ID)
00315                 retry = False
00316                 sys.exit(1)

Here is the call graph for this function:

Here is the caller graph for this function:

def arcom.store.transdbstore.TransDBStore.terminate (   self) [inherited]

Definition at line 328 of file transdbstore.py.

00328 
00329     def terminate(self):
00330         try:
00331             self.dbenv.close(0)
00332             log.msg(arc.INFO, "db environment closed")
00333         except db.DBError, msg:
00334             log.msg()
00335             log.msg(arc.ERROR, "error closing environment")
00336             #raise db.DBError, msg

Here is the caller graph for this function:

def storage.ahash.replicatedahash.ReplicationStore.unlock (   self)

Release the lock.

unlock()

Reimplemented from arcom.store.basestore.BaseStore.

Definition at line 272 of file replicatedahash.py.

00272 
00273     def unlock(self):
00274         """ Release the lock.
00275 
00276         unlock()
00277         """
00278         log.msg(arc.VERBOSE, "unlocking", self.repmgr.isMaster())
00279         try:
00280             self.llock.release()
00281             log.msg(arc.VERBOSE, "unlocked")
00282         except:
00283             log.msg(arc.VERBOSE, "couldn't unlock")
00284             pass


Member Data Documentation

Reimplemented from arcom.store.transdbstore.TransDBStore.

Definition at line 240 of file replicatedahash.py.

Definition at line 219 of file replicatedahash.py.

Definition at line 46 of file transdbstore.py.

Definition at line 18 of file basestore.py.

Definition at line 47 of file transdbstore.py.

Definition at line 66 of file transdbstore.py.

Definition at line 332 of file replicatedahash.py.

Definition at line 44 of file transdbstore.py.

Definition at line 75 of file transdbstore.py.

Definition at line 176 of file replicatedahash.py.

Definition at line 25 of file basestore.py.

Definition at line 177 of file replicatedahash.py.

Definition at line 205 of file replicatedahash.py.

Definition at line 20 of file basestore.py.

Definition at line 206 of file replicatedahash.py.

Definition at line 213 of file replicatedahash.py.

Definition at line 189 of file replicatedahash.py.

Definition at line 87 of file transdbstore.py.


The documentation for this class was generated from the following file: