Back to index

nordugrid-arc-nox  1.1.0~rc6
replicatedahash.py
Go to the documentation of this file.
00001 """
00002 Replicated A-Hash
00003 ----
00004 
00005 Replicated prototype implementation of the A-Hash service.
00006 This service builds on the centralized A-Hash and 
00007 Berkeley DB High Availability.
00008 
00009 Methods:
00010     - get
00011     - change
00012     - sendMessage
00013     - processMessage
00014 Sample configuration:
00015 
00016         <Service name="pythonservice" id="ahash">
00017             <ClassName>storage.ahash.ahash.AHashService</ClassName>
00018             <AHashClass>storage.ahash.replicatedahash.ReplicatedAHash</AHashClass>
00019             <LocalDir>ahash_data</LocalDir>
00020             <MyURL>http://localhost:60000/RepAHash</MyURL>
00021             <OtherURL>http://otherhost:60000/RepAHash</OtherURL>
00022         </Service>
00023 """
00024 import arc
00025 import traceback
00026 import time, random
00027 import threading, thread
00028 import copy
00029 import base64
00030 import sys
00031 from arcom import get_child_values_by_name
00032 from arcom.threadpool import ThreadPool, ReadWriteLock
00033 from arcom.service import ahash_uri, node_to_data, get_child_nodes, parse_node, get_data_node
00034 from storage.common import create_metadata, ahash_list_guid
00035 from arcom.xmltree import XMLTree 
00036 from storage.ahash.ahash import CentralAHash
00037 from storage.client import AHashClient
00038 from arcom.store.transdbstore import TransDBStore
00039 from arcom.logger import Logger
00040 from arcom.security import parse_ssl_config
00041 log = Logger(arc.Logger(arc.Logger_getRootLogger(), 'Storage.ReplicatedAHash'))
# Locate the Berkeley DB bindings: try the bsddb3 package first, then bsddb.
# Only a failed import triggers the fallback; any other error propagates.
try:
    from bsddb3 import db
except ImportError:
    try:
        from bsddb import db
    except ImportError:
        log.msg(arc.FATAL, "Could not import module bsddb. This is required for replicated A-Hash.")
        raise Exception("Could not import module bsddb. This is required for replicated A-Hash.")
# Refuse to load against a Berkeley DB library older than 4.6.
if (db.DB_VERSION_MAJOR, db.DB_VERSION_MINOR) < (4, 6):
    log.msg(arc.FATAL, "Berkeley DB is older than 4.6. Replicated A-Hash requires db4.6 or higher.")
    raise Exception("Berkeley DB is older than 4.6. Replicated A-Hash requires db4.6 or higher.")
# Probe for DB_EID_BROADCAST; its absence is treated as a too-old Python binding.
try:
    eid = db.DB_EID_BROADCAST
except AttributeError:
    log.msg(arc.FATAL, "Python module bsddb is older than 4.7.5. Replicated A-Hash requires bsddb version 4.7.5 or higher.")
    raise Exception("Python module bsddb is older than 4.7.5. Replicated A-Hash requires bsddb version 4.7.5 or higher.")
00058 
class ReplicatedAHash(CentralAHash):
    """ A replicated implementation of the A-Hash service.

    Builds on CentralAHash, replaces its store with a ReplicationStore and
    registers 'processMessage' in public_request_names.
    """

    def __init__(self, cfg):
        """ The constructor of the ReplicatedAHash class.

        ReplicatedAHash(cfg)

        'cfg' is the service configuration node; it is handed to
        parse_ssl_config, CentralAHash.__init__ and ReplicationStore.
        """
        log.msg(arc.VERBOSE, "ReplicatedAHash constructor called")
        # the same ssl_config will be used for every A-Hash client
        self.ssl_config = parse_ssl_config(cfg)
        self.store = None
        CentralAHash.__init__(self, cfg)
        # AHashClient objects created so far, keyed by URL
        self.ahashes = {}
        self.store = ReplicationStore(cfg, self.sendMessage)
        self.public_request_names = ['processMessage']
        # notify replication manager that communication is ready
        self.store.repmgr.comm_ready = True

    def sendMessage(self, url, repmsg):
        """
        Function used for callbacks from the communication framework
        of the replication manager
        Sends repmsg to url using HED

        'url': address of the receiving A-Hash.
        'repmsg': the message; it is converted with str() and base64
        encoded before being sent as the 'ahash:msg' element.

        Returns the string content of the data node of the reply.
        """
        # plain dict membership; no need to build the key list first
        if url not in self.ahashes:
            self.ahashes[url] = AHashClient(url, ssl_config=self.ssl_config,
                                            print_xml = False)
        ahash = self.ahashes[url]
        repmsg = base64.encodestring(str(repmsg))
        tree = XMLTree(from_tree =
                       ('ahash:processMessage', [
                           ('ahash:msg', repmsg)
                           ]))
        log.msg(arc.VERBOSE, "sending message of length", len(repmsg), "to", url)
        msg = ahash.call(tree)
        ahash.reset()
        xml = ahash.xmlnode_class(msg)
        success = str(get_data_node(xml))
        log.msg(arc.VERBOSE, "sendt message, success=%s"%success)
        return success

    def newSOAPPayload(self):
        """ Return a new arc.PayloadSOAP created with self.ns. """
        return arc.PayloadSOAP(self.ns)

    def processMessage(self, inpayload):
        """
        processing ahash replication message

        'inpayload': incoming payload whose first child carries a 'msg'
        element holding the base64 encoded message written by sendMessage.

        Returns a SOAP payload with an 'ahash:processResponse' node that
        contains the result as 'ahash:success'.
        """
        log.msg(arc.VERBOSE, "processing message...")
        # the first child of the payload is the 'processMessage' request node
        request_node = inpayload.Child()
        # SECURITY NOTE(review): eval() executes whatever expression the remote
        # side sent. This is only acceptable if every peer allowed to call
        # processMessage is fully trusted; consider a non-executing parser.
        msg = eval(base64.decodestring(str(request_node.Get('msg'))))
        control = msg['control']
        record = msg['record']
        eid = msg['eid']
        retlsn = msg['lsn']
        sender = msg['sender']
        msgID = msg['msgID']

        resp = self.store.repmgr.processMessage(control, record, eid, retlsn,
                                                sender, msgID)
        # prepare the response payload
        out = self.newSOAPPayload()
        # create the 'processResponse' node
        response_node = out.NewChild('ahash:processResponse')
        # create an XMLTree for the response
        tree = XMLTree(from_tree = ('ahash:success', resp))
        # add the XMLTree to the XMLNode
        tree.add_to_node(response_node)
        log.msg(arc.VERBOSE, "processing message... Finished")
        return out
00132 
class ReplicationStore(TransDBStore):
    """
    Wrapper class for enabling replication in TransDBStore
    """

    def __init__(self, cfg, ahash_send,
                 non_existent_object = None):
        """ Constructor of ReplicationStore.

        ReplicationStore(cfg, ahash_send, non_existent_object = None)

        'cfg': configuration node; LocalDir, Priority, MyURL, OtherURL,
        CheckPeriod and CacheSize are read from it.
        'ahash_send': callable handed to the ReplicationManager, which calls
        it as ahash_send(url, message) to deliver messages to other replicas.
        'non_existent_object': passed on to TransDBStore.__init__; a new
        empty dict is used when it is omitted.
        """
        # a literal {} default would be a single dict shared by all instances
        if non_existent_object is None:
            non_existent_object = {}

        dbenv_flags = db.DB_CREATE | \
                      db.DB_RECOVER | \
                      db.DB_THREAD | \
                      db.DB_INIT_REP | \
                      db.DB_INIT_LOCK | \
                      db.DB_INIT_LOG | \
                      db.DB_INIT_MPOOL | \
                      db.DB_INIT_TXN

        # read Priority once with the default-10 fallback, so that a missing
        # or malformed value cannot abort construction
        priority = self.__readPriority(cfg)

        storecfg = XMLTree(from_tree =
                           ['StoreCfg',
                            [('DataDir', str(cfg.Get('LocalDir'))),
                             ('Priority', priority),
                             ('CheckPeriod', 5),
                             ('DBEnvFlags', str(dbenv_flags))
                            ]])
        storecfg = arc.XMLNode(storecfg.pretty_xml())

        # db environment is opened in TransDBStore.__init__
        TransDBStore.__init__(self, storecfg, non_existent_object)
        self.dbReady(False)
        # replication settings are applied to the environment created above
        self.__configureReplication(cfg, priority)

        log.msg(arc.VERBOSE, "Initialized replication environment")

        # eid is a local environment id, so I will be number 1
        self.eid = 1
        self.my_replica = {'url'      : self.my_url,
                           'id'       : self.eid,
                           'status'   : 'online'}
        # the other replicas get consecutive local ids and start as offline
        other_replicas = []
        other_eid = self.eid
        for url in self.other_urls:
            other_eid += 1
            other_replicas.append({'url'      : url,
                                   'id'       : other_eid,
                                   'status'   : 'offline'})

        # start replication manager
        self.repmgr = ReplicationManager(ahash_send, self.dbenv,
                                         self.my_replica, other_replicas, self.dbReady)
        try:
            # start repmgr with 256 threads
            # always do election of master
            threading.Thread(target = self.repmgr.start, args=[256, db.DB_REP_ELECTION]).start()
        except Exception:
            log.msg(arc.ERROR, "Couldn't start replication manager.")
            log.msg()
            self.terminate()

        # thread for maintaining host list
        threading.Thread(target = self.checkingThread, args=[self.check_period]).start()

    def __readPriority(self, cfg):
        """ Return the configured election priority, or 10 if it is unusable. """
        try:
            # Priority used in elections: Higher priority means
            # higher chance of winning the master election
            # (i.e., becoming master)
            return int(str(cfg.Get('Priority')))
        except Exception:
            log.msg(arc.WARNING, "Bad Priority value, using default 10")
            return 10

    def __configureReplication(self, cfg, priority = None):
        """ Read the replication settings from cfg and apply them to self.dbenv.

        Sets self.my_url, self.other_urls, self.priority, self.check_period
        and self.cachesize. 'priority' may carry an already parsed Priority
        value; when it is None the value is read from cfg.
        """
        self.my_url = str(cfg.Get('MyURL'))
        configured_urls = get_child_values_by_name(cfg, 'OtherURL')
        # make sure my_url is not in other_urls
        self.other_urls = [url for url in configured_urls if url != self.my_url]
        if priority is None:
            priority = self.__readPriority(cfg)
        self.priority = priority
        try:
            # In seconds, how often should we check for connected nodes
            self.check_period = float(str(cfg.Get('CheckPeriod')))
        except Exception:
            log.msg(arc.WARNING, "Could not find checking period, using default 10s")
            self.check_period = 10.0
        try:
            # amount of db cached in memory
            # Must be integer. Optionally in order of kB, MB, GB, TB or PB,
            # otherwise bytes is used
            cachesize = str(cfg.Get('CacheSize'))
            cachemultiplier = {'kB': 1024,
                               'MB': 1024**2,
                               'GB': 1024**3,
                               'TB': 1024**4,
                               'PB': 1024**5}
            tmp_cachesize = 0
            for suffix in cachemultiplier:
                if cachesize.endswith(suffix):
                    tmp_cachesize = int(cachesize.split(suffix)[0])*cachemultiplier[suffix]
            # no (non-zero) suffixed value found: treat the text as plain bytes
            if tmp_cachesize == 0:
                tmp_cachesize = int(cachesize)
            self.cachesize = tmp_cachesize
        except Exception:
            log.msg(arc.WARNING, "Bad cache size or no cache size configured, using 10MB")
            self.cachesize = 10*(1024**2)
        # NOTE(review): self.cachesize is only stored here; nothing in this
        # class passes it to self.dbenv - confirm whether that is intended.
        self.dbenv.repmgr_set_ack_policy(db.DB_REPMGR_ACKS_ALL)
        self.dbenv.rep_set_timeout(db.DB_REP_ACK_TIMEOUT, 5000000)
        self.dbenv.rep_set_priority(self.priority)
        self.dbenv.rep_set_config(db.DB_REP_CONF_BULK, True)

    def lock(self, blocking = True):
        """ Acquire the lock.

        lock(blocking = True)

        'blocking': if blocking is True, then this only returns when the lock is acquired.
        If it is False, then it returns immediately with False if the lock is not available,
        or with True if it could be acquired.

        While the database is not ready this returns False after a short
        sleep. On a replica that is not master no lock is taken and True is
        returned.
        """
        # sleep a little bit to avoid same thread getting lock all the time
        time.sleep(random.random()*0.05)
        if not self.getDBReady():
            time.sleep(0.2)
            return False
        if not self.repmgr.isMaster():
            return True
        # only lock if master
        locked = self.llock.acquire(blocking)
        log.msg(arc.VERBOSE, "master locking", locked)
        return locked

    def unlock(self):
        """ Release the lock.

        unlock()

        A failing release is only logged: lock() does not take the lock on a
        non-master replica, so there may be nothing to release.
        """
        log.msg(arc.VERBOSE, "unlocking", self.repmgr.isMaster())
        try:
            self.llock.release()
            log.msg(arc.VERBOSE, "unlocked")
        except Exception:
            log.msg(arc.VERBOSE, "couldn't unlock")

    def getDBFlags(self):
        """ Return the flags to open the db with, depending on the current role. """
        # only master can create db
        # DB_AUTO_COMMIT ensures that all db modifications will
        # automatically be enclosed in transactions
        if self.repmgr.isMaster():
            return db.DB_CREATE | db.DB_AUTO_COMMIT
        return db.DB_AUTO_COMMIT

    def checkingThread(self, period):
        """ Periodically store the list of replicas in the A-Hash.

        Runs forever. After an initial 10 s sleep, every 'period' seconds the
        master writes an object under ahash_list_guid that describes itself
        and all other known sites together with their status.
        """
        time.sleep(10)

        while True:
            log.msg(arc.VERBOSE, "checkingThread slept %d s"%period)
            try:
                if self.repmgr.isMaster():
                    # get list of all registered client sites
                    site_list = self.repmgr.getSiteList()
                    new_obj = {}
                    url = self.my_replica['url']
                    status = self.my_replica['status']
                    new_obj[('master', "%s:%s"%(url,status))] = url
                    # every known site except this one is listed as a client
                    client_list = [(client['url'], client['status'])
                                   for client in site_list.values()
                                   if client['url'] != self.my_replica['url']
                                   ]
                    for (url, status) in client_list:
                        new_obj[('client', "%s:%s"%(url, status))] = url
                    # store the site list
                    while not self.lock(False):
                        time.sleep(0.2)
                    try:
                        self.set(ahash_list_guid, new_obj)
                    finally:
                        # release even if set() fails, otherwise the lock
                        # would stay held and never be obtainable again
                        self.unlock()
                    log.msg(arc.VERBOSE, "wrote ahash list %s"%str(new_obj))
                    if not self.dbenv_ready:
                        log.msg(arc.VERBOSE, "but dbenv wasn't ready.")
            except Exception:
                log.msg()
            time.sleep(period)

    def dbReady(self, db_is_ready=True):
        """
        Callback function used by repmgr to notify me that I can accept requests
        """
        self.dbenv_ready = db_is_ready

    def getDBReady(self):
        """
        Overload of TransDBStore.getDBReady
        """
        return self.dbenv_ready
00339 
# Module-level table of replicas; ReplicationManager binds this same dict as
# its self.hostMap and fills it with replica descriptions keyed by their id.
hostMap = dict()

# Values of the 'msgID' field that tells the receiver what kind of message
# it has been sent.
(NEWSITE_MESSAGE,
 REP_MESSAGE,
 HEARTBEAT_MESSAGE,
 MASTER_MESSAGE,
 ELECTION_MESSAGE) = range(1, 6)
00347 
00348 class ReplicationManager:
00349     """
00350     class managing replicas, elections and message handling
00351     """
00352     
00353     def __init__(self, ahash_send, dbenv, my_replica, other_replicas, dbReady):
00354         
00355         # no master is found yet
00356         self.role = db.DB_EID_INVALID
00357         self.elected = False
00358         self.stop_electing = False
00359         self.ahash_send = ahash_send
00360         self.locker = ReadWriteLock()
00361         global hostMap
00362         self.hostMap = hostMap
00363         self.locker.acquire_write()
00364         self.hostMap[my_replica['id']] = my_replica
00365         for replica in other_replicas:
00366             self.hostMap[replica['id']] = replica
00367         self.locker.release_write()
00368         self.my_replica = my_replica
00369         self.eid = my_replica['id']
00370         self.url = my_replica['url']
00371         # assume no master yet
00372         self.masterID = db.DB_EID_INVALID
00373         self.dbenv = dbenv
00374         self.dbReady = dbReady
00375         #self.dbenv.set_verbose(db.DB_VERB_REPLICATION, True)
00376         # tell dbenv to uset our event callback function
00377         # to handle various events
00378         self.dbenv.set_event_notify(self.event_callback)
00379         self.pool = None
00380         self.semapool = threading.BoundedSemaphore(128)
00381         self.election_thread = threading.Thread(target=self.electionThread, args=[])
00382         self.comm_ready = False
00383         self.heartbeat_period = 10
00384         threading.Thread(target=self.heartbeatThread, 
00385                          args=[self.heartbeat_period]).start()
00386         self.check_heartbeat = False
00387         self.master_timestamp = 0
00388         
00389     def isMaster(self):
00390         is_master = self.role == db.DB_REP_MASTER
00391         return is_master
00392 
00393     def heartbeatThread(self, period):
00394         """
00395         Thread for sending heartbeat messages
00396         Heartbeats are only sendt when master is elected and 
00397         running (i.e., when check_heartbeat is true)
00398         Only clients sends heartbeats
00399         If heartbeat is not answered, re-election will be initiated in send method
00400         """
00401         time.sleep(10)
00402         
00403         # todo: implement list of time_since_heartbeat so the master can mark clients 
00404         # as offline when low write activity
00405         while True:
00406             if self.role == db.DB_REP_CLIENT:
00407                 if self.masterID != db.DB_EID_INVALID:
00408                     # use threaded send to avoid blocking
00409                     #threading.Thread(target=self.sendHeartbeatMsg, args=[]).start()
00410                     self.pool.queueTask(self.sendHeartbeatMsg)
00411             # add some randomness to avoid bugging the master too much
00412             log.msg(arc.VERBOSE, ("heartbeat", self.masterID, self.role))
00413             time.sleep(period)
00414 
00415     def getRole(self):
00416         role = self.role
00417         return role
00418 
00419     def setRole(self, role):
00420         self.role = role
00421     
00422     def getSiteList(self):
00423         self.locker.acquire_read()
00424         site_list = copy.deepcopy(self.hostMap)
00425         self.locker.release_read()
00426         return site_list
00427     
00428     def start(self, nthreads, flags):
00429         log.msg(arc.VERBOSE, "entering start")
00430         self.pool = ThreadPool(nthreads)
00431         while not self.comm_ready:
00432             time.sleep(2)
00433         try:
00434             self.dbenv.rep_set_transport(self.eid, self.repSend)
00435             self.dbenv.rep_start(db.DB_REP_CLIENT)
00436             log.msg(arc.VERBOSE, ("rep_start called with REP_CLIENT", self.hostMap))
00437             self.startElection()
00438         except:
00439             log.msg(arc.ERROR, "Couldn't start replication framework")
00440             log.msg()
00441 
00442     def electionThread(self):
00443         role = db.DB_EID_INVALID
00444         try:
00445             log.msg(arc.VERBOSE, "entered election thread")
00446             # send a message to discover if clients are offline
00447             self.sendElectionMsg()
00448             self.locker.acquire_read()
00449             num_reps = len([id for id,rep in self.hostMap.items() if rep["status"] != "offline"])
00450             self.locker.release_read()
00451             votes = num_reps/2 + 1
00452             if votes < 2:
00453                 votes = 2
00454             log.msg(arc.VERBOSE, "%s: my role is" % self.url, role)
00455             self.dbenv.rep_elect(num_reps, votes)
00456             # wait one second for election results
00457             time.sleep(1)
00458             role = self.getRole()
00459             log.msg(arc.VERBOSE, "%s: my role is now" % self.url, role)
00460             if self.elected:
00461                 self.elected = False
00462                 self.dbenv.rep_start(db.DB_REP_MASTER)                    
00463         except:
00464             log.msg(arc.ERROR, "Couldn't run election")
00465             log.msg(arc.VERBOSE, "num_reps is %(nr)d, votes is %(v)d, hostMap is %(hm)s" % {'nr':num_reps, 'v':votes, 'hm':str(self.hostMap)})
00466             time.sleep(2)
00467         log.msg(arc.VERBOSE, self.url, "tried election with %d replicas" % num_reps)
00468             
00469     def startElection(self):
00470         log.msg(arc.VERBOSE, "entering startElection")
00471         self.stop_electing = False
00472         self.dbReady(False)
00473         try:
00474             # try to join election_thread first to make sure only one election thread is running
00475             self.election_thread.join()
00476             self.election_thread = threading.Thread(target=self.electionThread, args=[])
00477         except:
00478             pass    
00479         self.election_thread.start()
00480 
00481     def beginRole(self, role):
00482         try:
00483             # check if role has changed, to avoid too much write blocking
00484             if self.getRole() != role:
00485                 log.msg(arc.INFO, "new role")
00486                 self.setRole(role)
00487             self.dbenv.rep_start(role==db.DB_REP_MASTER and 
00488                                  db.DB_REP_MASTER or db.DB_REP_CLIENT)
00489         except:
00490             log.msg(arc.ERROR, "Couldn't begin role")
00491             log.msg()
00492         return
00493 
00494     def send(self, env, control, record, lsn, eid, flags, msgID):
00495         """
00496         callback function for dbenv transport
00497         If no reply within 10 seconds, return 1
00498         """
00499         # wrap control, record, lsn, eid, flags and sender into dict
00500         # note: could be inefficient to send sender info for every message
00501         # if bandwidth and latency is low
00502         log.msg(arc.VERBOSE, "entering send")
00503         sender = self.my_replica
00504         msg = {'control':control,
00505                'record':record,
00506                'lsn':lsn,
00507                'eid':eid,
00508                'msgID':msgID,
00509                'sender':sender}
00510         
00511         
00512         retval = 1
00513         if msgID == NEWSITE_MESSAGE:
00514             eids = [control['id']]
00515             msg['control'] = None
00516         elif msgID == HEARTBEAT_MESSAGE:
00517             eids = [self.masterID]
00518         elif eid == db.DB_EID_BROADCAST:
00519             # send to all
00520             eids = [id for id,rep in self.hostMap.items()]    
00521         else:
00522             eids = [eid]
00523             if not self.hostMap.has_key(eid):
00524                 return retval
00525         for id in eids:
00526             if id == self.eid:
00527                 continue
00528             try:
00529                 msg['eid'] = id
00530                 resp = ["waiting"]
00531                 url = self.hostMap[id]['url']
00532                 time_since_send = time.time()
00533                 resp[0] = self.ahash_send(url, msg)
00534                 if str(resp[0]) == "processed":
00535                     # if at least one msg is sent, we're happy
00536                     retval = 0
00537                     # received message so sender cannot be offline
00538                     if self.hostMap[id]['status'] == 'offline':
00539                         self.locker.acquire_write()
00540                         self.hostMap[id]['status'] = 'online'
00541                         self.locker.release_write()
00542             except:
00543                 # assume url is disconnected
00544                 log.msg(arc.WARNING, "failed to send to", id, "of", str(eids))
00545                 self.locker.acquire_write()
00546                 
00547                 if id == self.masterID:
00548                     # timeout if I've heard nothing from the master
00549                     # or master doesn't reply
00550                     timeout = time.time() - self.master_timestamp > self.heartbeat_period*2\
00551                               or time.time()-time_since_send < 55
00552                     if timeout:
00553                         log.msg(arc.INFO, "Master is offline, starting re-election")
00554                         # in case more threads misses the master
00555                         if self.masterID != db.DB_EID_INVALID:
00556                             self.locker.release_write()
00557                             self.beginRole(db.DB_EID_INVALID)
00558                             self.masterID = db.DB_EID_INVALID
00559                             self.locker.acquire_write()
00560                             self.startElection()
00561                         # only set master to offline if it has really timed out
00562                         self.hostMap[id]['status'] = "offline"
00563                         if msgID == NEWSITE_MESSAGE:
00564                             record['status'] = "offline"
00565                 # cannot have less than 2 online sites
00566                 elif len([id2 for id2,rep in self.hostMap.items() if rep['status']=="online"]) > 2:
00567                     self.hostMap[id]['status'] = "offline"
00568                     if msgID == NEWSITE_MESSAGE:
00569                         record['status'] = "offline"
00570                 self.locker.release_write()
00571         return retval
00572     
00573     def repSend(self, env, control, record, lsn, eid, flags):
00574         """
00575         callback function for dbenv transport
00576         """
00577         log.msg(arc.VERBOSE, "entering repSend")
00578         if flags & db.DB_REP_PERMANENT and lsn != None:
00579             self.semapool.acquire()
00580             res = self.send(env, control, record, lsn, eid, flags, REP_MESSAGE)
00581             self.semapool.release()
00582             return res
00583         else:
00584             #threading.Thread(target=self.send, args=[env, control, record, lsn, eid, flags, REP_MESSAGE]).start()
00585             self.pool.queueTask(self.send, args=[env, control, record, lsn, eid, flags, REP_MESSAGE])
00586             return 0
00587 
00588     def sendNewSiteMsg(self, new_replica):
00589         """
00590         if new site is discovered sendNewSiteMsg will send 
00591         the hostMap to the new site
00592         """
00593         log.msg(arc.VERBOSE, "entering sendNewSiteMsg")
00594         self.locker.acquire_read()
00595         site_list = copy.deepcopy(self.hostMap)
00596         self.locker.release_read()
00597         ret = 1
00598         while ret:
00599             self.semapool.acquire()
00600             ret = self.send(None, new_replica, site_list, None, None, None, NEWSITE_MESSAGE)
00601             self.semapool.release()
00602             if new_replica['status'] == 'offline':
00603                 break
00604             time.sleep(30)
00605         return ret
00606     
00607     def sendHeartbeatMsg(self):
00608         """
00609         if new site is discovered sendHeartbeatMsg will send 
00610         the hostMap to the new site
00611         """
00612         log.msg(arc.VERBOSE, "entering sendHeartbeatMsg")
00613         self.semapool.acquire()
00614         ret = self.send(None, None, None, None, None, None, HEARTBEAT_MESSAGE)
00615         self.semapool.release()
00616         return ret
00617 
00618     def sendElectionMsg(self, eid=db.DB_EID_BROADCAST):
00619         """
00620         If elected, broadcast master id to all clients
00621         """
00622         log.msg(arc.VERBOSE, "entering sendNewMasterMsg")
00623         self.semapool.acquire()
00624         ret = self.send(None, None, None, None, eid, None, ELECTION_MESSAGE)
00625         self.semapool.release()
00626         return ret
00627     
00628     def sendNewMasterMsg(self, eid=db.DB_EID_BROADCAST):
00629         """
00630         If elected, broadcast master id to all clients
00631         """
00632         log.msg(arc.VERBOSE, "entering sendNewMasterMsg")
00633         self.semapool.acquire()
00634         ret = self.send(None, None, None, None, eid, None, MASTER_MESSAGE)
00635         self.semapool.release()
00636         return ret
00637     
00638     def processMessage(self, control, record, eid, retlsn, sender, msgID):
00639         """
00640         Function to process incoming messages, forwarding 
00641         them to self.dbenv.rep_process_message()
00642         """
00643         log.msg(arc.VERBOSE, "entering processMessage from ", sender)
00644 
00645         self.locker.acquire_read()
00646         urls = [rep['url'] for id,rep in self.hostMap.items() if rep['status']=='online']
00647         self.locker.release_read()
00648         if sender['url'] == self.url:
00649             log.msg(arc.ERROR, "received message from myself!")
00650             return "failed" 
00651         if not sender['url'] in urls:
00652             log.msg(arc.VERBOSE, "received from new sender or sender back online")
00653             really_new = False
00654             try:
00655                 # check if we know this one
00656                 newid = [id for id,rep in self.hostMap.items() if rep['url']==sender['url']][0]
00657             except:
00658                 # nope, never heard about it
00659                 newid = len(self.hostMap)+1
00660                 really_new = True
00661             sender['id'] = newid
00662             self.locker.acquire_write()
00663             self.hostMap[newid] = sender
00664             self.locker.release_write()
00665             # return hostMap to sender
00666             if really_new:
00667                 # use threaded send to avoid blocking
00668                 #threading.Thread(target=self.sendNewSiteMsg, args=[sender]).start()
00669                 self.pool.queueTask(self.sendNewSiteMsg, args=sender)
00670         if msgID == MASTER_MESSAGE:
00671             # sender is master, find local id for sender and set as masterID
00672             log.msg(arc.VERBOSE, "received master id")
00673             self.masterID = [id for id,rep in self.hostMap.items() if rep['url']==sender['url']][0]
00674             self.master_timestamp = time.time()
00675             return "processed"
00676         if msgID == HEARTBEAT_MESSAGE:
00677             log.msg(arc.VERBOSE, "received HEARTBEAT_MESSAGE")
00678             return "processed"
00679         if msgID == ELECTION_MESSAGE:
00680             log.msg(arc.VERBOSE, "received ELECTION_MESSAGE")
00681             return "processed"
00682         if msgID == NEWSITE_MESSAGE:
00683             # if unknown changes in record, update hostMap
00684             log.msg(arc.VERBOSE, "received NEWSITE_MESSAGE")
00685             for replica in record.values():
00686                 if  not replica['url'] in urls:
00687                     really_new = False
00688                     try:
00689                         # check if we know this one
00690                         newid = [id for id,rep in self.hostMap.items() if rep['url']==replica['url']][0]
00691                     except:
00692                         # nope, never heard about it
00693                         newid = len(self.hostMap)+1
00694                         really_new = True
00695                     self.locker.acquire_write()
00696                     # really new sender, appending to host map
00697                     replica['id'] = newid
00698                     self.hostMap[newid] = replica
00699                     self.locker.release_write()
00700                     # say hello to my new friend
00701                     if really_new:
00702                         # use threaded send to avoid blocking
00703                         #threading.Thread(target=self.sendNewSiteMsg, args=[replica]).start()
00704                         self.pool.queueTask(self.sendNewSiteMsg, args=replica)
00705             return "processed"
00706         try:
00707             eid = [id for id,rep in self.hostMap.items() if rep['url']==sender['url']][0]
00708         except:
00709             return "notfound"
00710         if eid == self.masterID:
00711             self.master_timestamp = time.time()
00712         try:
00713             log.msg(arc.VERBOSE, "processing message from %d"%eid)
00714             res, retlsn = self.dbenv.rep_process_message(control, record, eid)
00715         except db.DBNotFoundError:
00716             log.msg(arc.ERROR, "Got dbnotfound")
00717             log.msg(arc.ERROR, (control, record, eid, retlsn, sender, msgID))
00718             #log.msg()
00719             return "failed"
00720         except:
00721             log.msg(arc.ERROR, "couldn't process message")
00722             log.msg(arc.ERROR, (control, record, eid, retlsn, sender, msgID))
00723             log.msg()
00724             return "failed"
00725         
00726         if res == db.DB_REP_NEWSITE:
00727             log.msg(arc.VERBOSE, "received DB_REP_NEWSITE from %s"%str(sender))
00728             if self.isMaster():
00729                 # use threaded send to avoid blocking
00730                 #threading.Thread(target=self.sendNewMasterMsg, args=[eid]).start()
00731                 self.pool.queueTask(self.sendNewMasterMsg, args=eid)
00732         elif res == db.DB_REP_HOLDELECTION:
00733             log.msg(arc.VERBOSE, "received DB_REP_HOLDELECTION")
00734             self.beginRole(db.DB_EID_INVALID)
00735             self.masterID = db.DB_EID_INVALID
00736             self.startElection()
00737         elif res == db.DB_REP_ISPERM:
00738             log.msg(arc.VERBOSE, "REP_ISPERM returned for LSN %s"%str(retlsn))
00739             self.dbReady(True)
00740             self.setRole(db.DB_REP_CLIENT)
00741         elif res == db.DB_REP_NOTPERM:
00742             log.msg(arc.VERBOSE, "REP_NOTPERM returned for LSN %s"%str(retlsn))
00743         elif res == db.DB_REP_DUPMASTER:
00744             log.msg(arc.VERBOSE, "REP_DUPMASTER received, starting new election")
00745             # yield to revolution, switch to client
00746             self.beginRole(db.DB_EID_INVALID)
00747             self.masterID = db.DB_EID_INVALID
00748             self.startElection()
00749         elif res == db.DB_REP_IGNORE:
00750             log.msg(arc.VERBOSE, "REP_IGNORE received")
00751             log.msg(arc.VERBOSE, (control, record, eid, retlsn, sender, msgID))
00752         elif res == db.DB_REP_JOIN_FAILURE:
00753             log.msg(arc.ERROR, "JOIN_FAILURE received")
00754         else:
00755             log.msg(arc.VERBOSE, "unknown return code %s"%str(res))        
00756 
00757         return "processed"
00758 
00759     def event_callback(self, dbenv, which, info):
00760         """
00761         Callback function used to determine whether the local environment is a 
00762         replica or a master. This is called by the replication framework
00763         when the local replication environment changes state
00764         app = dbenv.get_private()
00765         """
00766 
00767         log.msg("entering event_callback")
00768         info = None
00769         try:
00770             if which == db.DB_EVENT_REP_MASTER:
00771                 log.msg(arc.VERBOSE, "I am now a master")
00772                 log.msg(arc.VERBOSE, "received DB_EVENT_REP_MASTER")
00773                 self.setRole(db.DB_REP_MASTER)
00774                 self.dbReady(True)
00775                 # use threaded send to avoid blocking
00776                 #threading.Thread(target=self.sendNewMasterMsg, args=[]).start()
00777                 self.pool.queueTask(self.sendNewMasterMsg)
00778             elif which == db.DB_EVENT_REP_CLIENT:
00779                 log.msg(arc.VERBOSE, "I am now a client")
00780                 self.setRole(db.DB_REP_CLIENT)
00781                 self.dbReady(True)
00782             elif which == db.DB_EVENT_REP_STARTUPDONE:
00783                 log.msg(arc.VERBOSE, ("Replication startup done",which,info))
00784             elif which == db.DB_EVENT_REP_PERM_FAILED:
00785                 log.msg(arc.VERBOSE, "Getting permission failed")
00786             elif which == db.DB_EVENT_WRITE_FAILED:
00787                 log.msg(arc.VERBOSE, "Write failed")
00788             elif which == db.DB_EVENT_REP_NEWMASTER:
00789                 log.msg(arc.VERBOSE, "New master elected")
00790                 # give master 5 seconds to celebrate victory
00791                 time.sleep(5)
00792                 self.check_heartbeat = True
00793             elif which == db.DB_EVENT_REP_ELECTED:
00794                 log.msg(arc.VERBOSE, "I won the election: I am the MASTER")
00795                 self.elected = True
00796                 # use threaded send to avoid blocking
00797                 #threading.Thread(target=self.sendNewMasterMsg, args=[]).start()
00798                 self.pool.queueTask(self.sendNewMasterMsg)
00799             elif which == db.DB_EVENT_PANIC:
00800                 log.msg(arc.ERROR, "Oops! Internal DB panic!")
00801                 raise db.DBRunRecoveryError, "Please run recovery."
00802         except db.DBRunRecoveryError:
00803             sys.exit(1)      
00804         except:
00805             log.msg()