Back to index

nordugrid-arc-nox  1.1.0~rc6
transdbstore.py
Go to the documentation of this file.
00001 import arc
00002 
00003 import sys, exceptions, errno, copy, time
00004 import thread
00005 import cPickle
00006 from arcom.store.basestore import BaseStore
00007 
00008 from arcom.logger import Logger
00009 log = Logger(arc.Logger(arc.Logger_getRootLogger(), 'arcom.TransDBStore'))
00010 
00011 try:
00012     from bsddb3 import db
00013 except:
00014     try:
00015         from bsddb import db
00016     except:
00017         log.msg(arc.FATAL, "Could not import module bsddb. This is required for TransDBStore.")
00018         raise Exception, "Could not import module bsddb. This is required for TransDBStore."
00019 
00020 class TransConfigInfo:
00021     """
00022     Class containing info about the TransDBStore
00023     """
00024     def __init__(self):
00025         self.home = "TESTDIR"
00026 
00027 
00028 class TransDBStore(BaseStore):
00029     
00030     def __init__(self, storecfg, non_existent_object = {}):
00031         """ Constructor of TransDBStore.
00032 
00033         TransDBStore(storecfg)
00034 
00035         'storecfg' is an XMLNode with a 'DataDir'
00036         'non_existent_object' will be returned if an object not found
00037         """
00038 
00039         BaseStore.__init__(self, storecfg, non_existent_object)
00040         log.msg(arc.VERBOSE, "TransDBStore constructor called")
00041         log.msg(arc.VERBOSE, "datadir:", self.datadir)
00042 
00043         # db and transaction pointers
00044         self.dbp  = None
00045         
00046         self.database  = "store.db"
00047         self.dbenv = db.DBEnv(0)
00048         
00049         self.__configureDB(storecfg)
00050         self.dbenv.set_cachesize(0, self.cachesize, 0)
00051         # if key not found, raise DBNotFoundError:
00052         self.dbenv.set_get_returns_none(0) 
00053 
00054         self.__opendbenv()
00055         log.msg(arc.INFO, "db environment opened")
00056     
00057     def __del__(self):
00058         """
00059         exit gracefully, i.e., close dbp and dbenv
00060         """
00061         self.__err()
00062         self.terminate()
00063 
00064     def __configureDB(self, storecfg):
00065         try:
00066             self.dbenv_flags = int(str(storecfg.Get("DBEnvFlags")))
00067         except:
00068             self.dbenv_flags = db.DB_CREATE | \
00069                                db.DB_RECOVER | \
00070                                db.DB_INIT_LOCK | \
00071                                db.DB_INIT_LOG | \
00072                                db.DB_INIT_MPOOL | \
00073                                db.DB_INIT_TXN
00074         try:
00075             self.deadlock_retries = int(str(storecfg.Get('DeadlockRetries')))
00076         except:
00077             log.msg(arc.WARNING, "couldn't find DeadlockRetries, using 5 as default")
00078             self.deadlock_retries = 5
00079         try:
00080             self.cachesize = int(str(storecfg.Get('CacheSize')))
00081         except:
00082             self.cachesize = 10 * 1024 * 1024
00083             log.msg(arc.WARNING, "couldn't find CacheSize, using %d as default"%self.cachesize)
00084         try:
00085             self.cachesize = int(str(storecfg.Get('SleepTime')))
00086         except:
00087             self.sleeptime = 2
00088             log.msg(arc.WARNING, "couldn't find SleepTime, using %d as default"%self.cachesize)
00089 
00090     def __err(self):
00091         """
00092         close the db pointer dpb if not closed already
00093         """
00094         if self.dbp != None:
00095             self.dbp.close(db.DB_NOSYNC)
00096             log.msg(arc.INFO, "database closed")
00097             #log.msg()
00098 
00099     def __opendbenv(self):
00100         """
00101         open the db env
00102         """
00103         try:
00104             self.dbenv.open(self.datadir, self.dbenv_flags, 0)
00105         except db.DBError, msg:
00106             log.msg()
00107             log.msg(arc.ERROR, "Caught exception during DB environment open.")
00108             self.terminate()
00109             #raise db.DBError, msg
00110         
00111     def getDBFlags(self):
00112         # DB_AUTO_COMMIT ensures that all db modifications will
00113         # automatically be enclosed in transactions
00114         return db.DB_CREATE | db.DB_AUTO_COMMIT
00115 
00116     def getDBReady(self):
00117         """
00118         Dummy method always assuming db is ready
00119         """
00120         return True
00121 
00122     def __opendb(self, dbp):
00123         """
00124         Open the db using dbp as db handle
00125         """
00126         while dbp == None:
00127             dbp = db.DB(self.dbenv,0)
00128                         
00129             # todo: db performance can be fine-tuned with
00130             # proper pagesize, berkeley-db/db/ref/am_conf/pagesize.html
00131 #            try:
00132 #                dbp.set_pagesize(512)
00133 #            except db.DBError, msg:
00134 #                self.__err()
00135 #                log.msg()
00136 #                log.msg(arc.ERROR, "Error in dbp.set_pagesize")
00137 #                self.terminate()
00138             try:
00139                 # using linear hashing for db
00140                 dbp.open(self.database, dbtype = db.DB_HASH, 
00141                          flags = self.getDBFlags())
00142             except db.DBError, (errnum, strerror): 
00143                 # It is expected that this condition will be triggered when
00144                 # client sites starts up.
00145                 # It can take a while for the master site to be found and
00146                 # synced, and no DB will be available until then
00147                 if errnum == errno.ENOENT:
00148                     log.msg(arc.VERBOSE, "No stock db available yet - retrying")
00149                     log.msg()
00150                     try:
00151                         dbp.close(0)
00152                     except db.DBError, (errnum2, strerror2):
00153                         self.__del__()
00154                         
00155                         log.msg()
00156                         log.msg(arc.ERROR, "unexpected error closing after failed open")
00157                         sys.exit(1)
00158                         raise db.DBError, strerror2
00159                     dbp = None
00160                     time.sleep(self.sleeptime)
00161                     continue
00162                 elif errnum == db.DB_LOCK_DEADLOCK:
00163                     log.msg(arc.VERBOSE, "got deadlock - retrying")
00164                     try:
00165                         dbp.close(0)
00166                     except db.DBError, (errnum2, strerror2):
00167                         log.msg()
00168                         log.msg(arc.ERROR, "unexpected error closing after failed open")
00169                         #self.__del__()
00170                         
00171                         raise db.DBError, strerror2
00172                     dbp = None
00173                     time.sleep(self.sleeptime)
00174                     continue
00175                 else:
00176                     log.msg()
00177                     return None
00178                     #self.__del__()                    
00179             except:
00180                 log.msg()
00181                 return None
00182                 #self.__del__()
00183                 
00184         return dbp
00185 
00186     
00187     def list(self):
00188         """ List the IDs of the existing entries.
00189         
00190         list()
00191         
00192         Returns a list of IDs.
00193         """
00194 
00195         if not self.getDBReady():
00196             raise db.DBError, "db not ready"
00197         self.dbp = self.__opendb(self.dbp)
00198         try:
00199             object = self.dbp.keys()
00200             return object
00201         except db.DBLockDeadlockError:
00202             log.msg(arc.INFO, "Got deadlock error")            
00203             log.msg()
00204             raise db.DBError, "db deadlock"
00205         except db.DBError:
00206             self.__del__()
00207             log.msg()
00208             log.msg(arc.ERROR, "Error listing db")
00209             sys.exit(1)
00210             raise db.DBError, "Error listing db"
00211 
00212     def get(self, ID):
00213         """ Returns the object with the given ID.
00214 
00215         get(ID)
00216 
00217         'ID' is the ID of the requested object.
00218         If there is no object with this ID, returns the given non_existent_object value.
00219         """
00220 
00221         if not self.getDBReady():
00222             raise db.DBError, "db not ready"
00223 
00224         self.dbp = self.__opendb(self.dbp)
00225 
00226         try:
00227             object = self.dbp.get(ID)
00228             try:
00229                 # using cPickle.loads for loading
00230                 object = cPickle.loads(object)
00231             except:
00232                 log.msg()
00233             if not object:
00234                 return copy.deepcopy(self.non_existent_object)
00235             return object
00236         #todo: handle all errors, e.g., deadlock and dead handle
00237         except db.DBNotFoundError:
00238             return copy.deepcopy(self.non_existent_object)
00239         except db.DBKeyEmptyError:
00240             return copy.deepcopy(self.non_existent_object)
00241         except db.DBLockDeadlockError:
00242             log.msg(arc.INFO, "Got deadlock error")            
00243             log.msg()
00244             raise db.DBError, "db deadlock"
00245         except db.DBRepHandleDeadError:
00246             log.msg(arc.INFO, "Got rep_dead_handle error")
00247             self.__err()
00248             self.dbp = None
00249             raise
00250         except db.DBError, msg:
00251             self.__del__()
00252             log.msg()
00253             log.msg(arc.ERROR, "Error getting %s"%ID)
00254             sys.exit(1)
00255             raise db.DBError, "Error listing db"
00256         
00257     def set(self, ID, object):
00258         """ Stores an object with the given ID..
00259 
00260         set(ID, object)
00261 
00262         'ID' is the ID of the object
00263         'object' is the object itself
00264         If there is already an object with this ID it will be overwritten completely.
00265         If deadlock is caught, retry DeadlockRetries times
00266         """
00267         if not ID:
00268             raise Exception, 'ID is empty'
00269 
00270         if not self.getDBReady():
00271             return
00272         self.dbp = self.__opendb(self.dbp)
00273         retry = True
00274         retry_count = 0
00275 
00276         # try deadlock_retries times if receiving catching DBLockDeadlockError
00277         while retry:
00278             try:
00279                 if object == None:
00280                     self.dbp.delete(ID)
00281                 else:
00282                     # note that object needs to be converted to string
00283                     # using cPickle.dumps for converting
00284                     self.dbp.put(ID, cPickle.dumps(object, cPickle.HIGHEST_PROTOCOL))
00285                 retry = False
00286             except db.DBLockDeadlockError:
00287                 log.msg(arc.INFO, "Got deadlock error")
00288                 # need to close transaction handle as well
00289                 time.sleep(0.2)
00290                 if retry_count < self.deadlock_retries:
00291                     log.msg(arc.VERBOSE, "got DBLockDeadlockError")
00292                     retry_count += 1
00293                     log.msg(arc.VERBOSE, "retrying transaction", retry_count)
00294                     retry = True
00295                 else:
00296                     log.msg(arc.VERBOSE, "Deadlock exception, giving up...")
00297                     retry = False
00298             except db.DBRepHandleDeadError:
00299                 log.msg(arc.INFO, "Got rep_dead_handle error")            
00300                 #log.msg()
00301                 self.__err()
00302                 self.dbp = None
00303                 raise
00304             except db.DBAccessError:
00305                 log.msg(arc.WARNING,"Read-only db. I'm not a master.")
00306                 raise
00307             except db.DBNotFoundError:
00308                 log.msg(arc.WARNING, "cannot delete non-existing entries")
00309                 retry = False
00310             except:
00311                 self.__del__()
00312                 log.msg()
00313                 log.msg(arc.ERROR, "Error setting %s"%ID)
00314                 retry = False
00315                 sys.exit(1)
00316 
00317     def restart(self):
00318         try:
00319             self.dbenv.close(0)
00320             log.msg(arc.INFO, "db environment closed")
00321             self.dbenv = db.DBEnv(0)
00322             self.__opendbenv()
00323         except db.DBError, msg:
00324             log.msg()
00325             log.msg(arc.ERROR, "error closing environment")
00326             #raise db.DBError, msg
00327 
00328     def terminate(self):
00329         try:
00330             self.dbenv.close(0)
00331             log.msg(arc.INFO, "db environment closed")
00332         except db.DBError, msg:
00333             log.msg()
00334             log.msg(arc.ERROR, "error closing environment")
00335             #raise db.DBError, msg