Back to index

nordugrid-arc-nox  1.1.0~rc6
librarian.py
Go to the documentation of this file.
00001 import urlparse
00002 import httplib
00003 import arc
00004 import random
00005 import threading
00006 import time
00007 
00008 from arcom import get_child_nodes, get_child_values_by_name
00009 from arcom.security import parse_ssl_config
00010 from arcom.service import librarian_uri, ahash_servicetype, true, false, parse_node, create_response, node_to_data, librarian_servicetype
00011 
00012 from arcom.xmltree import XMLTree
00013 from storage.client import AHashClient, ISISClient
00014 from storage.common import global_root_guid, sestore_guid, ahash_list_guid, parse_metadata, create_metadata, serialize_ids
00015 from storage.common import OFFLINE, DELETED
00016 import traceback
00017 import copy
00018 
00019 from arcom.logger import Logger
# Module-level logger shared by everything in this file; it is created on top
# of the ARC root logger under the name 'Storage.Librarian'.
log = Logger(arc.Logger(arc.Logger_getRootLogger(), 'Storage.Librarian'))
00021 
class Librarian:
    """Business logic of the storage Librarian.

    All state is kept in the A-Hash service, which is reached through two
    AHashClient objects: ``ahash_reader`` is used for lookups and
    ``ahash_writer`` for changes.  A background thread (``checkingThread``)
    periodically marks the replicas of shepherds whose heartbeat expired as
    OFFLINE and refreshes the list of A-Hash URLs.
    """

    def __init__(self, cfg, ssl_config, service_state):
        """Read the configuration and start the background thread.

        cfg           -- config node; 'CheckPeriod' and 'HeartbeatTimeout' are
                         mandatory, and at least one 'AHashURL' or 'ISISURL'
                         must be present
        ssl_config    -- passed on unchanged to the A-Hash and ISIS clients
        service_state -- object whose ``running`` attribute is set to True
                         once an A-Hash is known

        Raises Exception if the mandatory values cannot be read or if neither
        an A-Hash URL nor an ISIS URL is configured.
        """
        self.service_state = service_state
        self.ssl_config = ssl_config
        try:
            self.period = float(str(cfg.Get('CheckPeriod')))
            self.hbtimeout = float(str(cfg.Get('HeartbeatTimeout')))
        except Exception:
            log.msg(arc.ERROR, 'Cannot find CheckPeriod, HeartbeatTimeout in the config')
            raise Exception('Librarian cannot run without CheckPeriod and HeartbeatTimeout')
        log.msg(arc.INFO,'Getting AHash URL from the config')
        ahash_urls = get_child_values_by_name(cfg, 'AHashURL')
        if ahash_urls:
            log.msg(arc.INFO,'Got AHash URLs:', ahash_urls)
            log.msg(arc.VERBOSE, 'AHash URL found in the configuration.')
            self._connect_ahash(ahash_urls)
            self.thread_is_running = True
            log.msg(arc.INFO,'Starting checking thread')
            threading.Thread(target = self.checkingThread).start()
            log.msg(arc.INFO,'Setting running state to True')
            self.service_state.running = True
        else:
            # no A-Hash configured: ask the ISISes for one in the background
            log.msg(arc.INFO,'No AHash from the config')
            isis_urls = get_child_values_by_name(cfg, 'ISISURL')
            if not isis_urls:
                log.msg(arc.ERROR, "AHash URL and ISIS URL not found in the configuration.")
                raise Exception('Librarian cannot run with no A-Hash')
            log.msg(arc.INFO,'Got ISIS URL, starting initThread')
            self.thread_is_running = True
            threading.Thread(target = self.initThread, args = [isis_urls]).start()

    def _connect_ahash(self, ahash_urls):
        """Remember the first URL as master and create the reader and writer clients."""
        self.master_ahash = arc.URL(ahash_urls[0])
        self.ahash_reader = AHashClient(ahash_urls, ssl_config = self.ssl_config)
        self.ahash_writer = AHashClient(ahash_urls, ssl_config = self.ssl_config)

    def initThread(self, isis_urls):
        """Poll the given ISISes every 3 seconds until one returns A-Hash URLs.

        When an A-Hash is found the checking thread is started and the service
        state is set to running.  Returns without doing either if
        ``thread_is_running`` is cleared meanwhile.
        """
        found_ahash = False
        while not found_ahash:
            try:
                time.sleep(3)
                log.msg(arc.INFO,'Trying to get A-Hash from ISISes')
                for isis_url in isis_urls:
                    if not self.thread_is_running:
                        return
                    log.msg(arc.INFO,'Trying to get A-Hash from', isis_url)
                    isis = ISISClient(isis_url, ssl_config = self.ssl_config)
                    try:
                        ahash_urls = isis.getServiceURLs(ahash_servicetype)
                        log.msg(arc.INFO,'Got A-Hash from ISIS:', ahash_urls)
                        if ahash_urls:
                            self._connect_ahash(ahash_urls)
                            found_ahash = True
                            # one answer is enough, no need to ask the other ISISes
                            break
                    except Exception:
                        # NOTE(review): log.msg() without arguments presumably
                        # logs the current traceback - confirm in arcom.logger
                        log.msg()
            except Exception as e:
                log.msg(arc.WARNING, 'Error in initThread: %s' % e)
        log.msg(arc.INFO,'initThread finishes, starting checkingThread')
        threading.Thread(target = self.checkingThread).start()
        self.service_state.running = True

    def _update_ahash_urls(self):
        """Refresh the reader/writer URL lists from the A-Hash's own list.

        The entry stored under ``ahash_list_guid`` is read; URLs whose key
        contains 'master' (resp. 'client') and whose second key element ends
        with 'online' become the writer (resp. reader) URLs.  This is a
        best-effort operation: any failure is logged and otherwise ignored.
        """
        try:
            ahash_list = self.ahash_reader.get(ahash_list_guid)[ahash_list_guid]
            if ahash_list:
                master = [url for key, url in ahash_list.items()
                          if 'master' in key and key[1].endswith('online')]
                clients = [url for key, url in ahash_list.items()
                           if 'client' in key and key[1].endswith('online')]
                if master:
                    self.master_ahash = arc.URL(master[0])
                    self.ahash_writer.urls = [arc.URL(url) for url in master]
                    self.ahash_writer.reset()
                if clients:
                    self.ahash_reader.urls = [arc.URL(url) for url in clients]
                    self.ahash_reader.reset()
        except Exception as e:
            log.msg(arc.VERBOSE, 'Could not update the list of A-Hash URLs: %s' % e)

    def ahash_get_tree(self, IDs, neededMetadata = None):
        """
        Wrapper for ahash_reader.get_tree; errors are propagated to the caller.
        """
        if neededMetadata is None:
            neededMetadata = []
        return self.ahash_reader.get_tree(IDs, neededMetadata)

    def ahash_get(self, IDs, neededMetadata = None):
        """
        Wrapper for ahash_reader.get; errors are propagated to the caller.
        """
        if neededMetadata is None:
            neededMetadata = []
        return self.ahash_reader.get(IDs, neededMetadata)

    def ahash_change(self, changes):
        """
        Wrapper for ahash_writer.change with one retry.

        If the first attempt fails, the A-Hash URLs are refreshed (the master
        may be outdated) and the change is tried once more.  If the retry
        fails as well, the URLs of both clients are put back to what they
        were before the refresh and the error is re-raised.
        """
        try:
            return self.ahash_writer.change(changes)
        except Exception:
            # remember the current URLs so they can be restored
            reader_urls = self.ahash_reader.urls
            writer_urls = self.ahash_writer.urls
            self._update_ahash_urls()
            try:
                return self.ahash_writer.change(changes)
            except Exception:
                # make sure the urls are restored even if change failed
                self.ahash_reader.urls = reader_urls
                self.ahash_reader.reset()
                self.ahash_writer.urls = writer_urls
                self.ahash_writer.reset()
                raise

    def checkingThread(self):
        """Background loop: every ``self.period`` seconds look for late
        shepherds and refresh the A-Hash URL list, until ``thread_is_running``
        is cleared.
        """
        time.sleep(10)
        while self.thread_is_running:
            try:
                self._mark_late_shepherds_offline()
            except Exception as e:
                log.msg(arc.ERROR, "Error in Librarian's checking thread: %s" % e)
            # update list of ahashes
            self._update_ahash_urls()
            # sleep exactly once per round, whether or not the check succeeded
            time.sleep(self.period)

    def _mark_late_shepherds_offline(self):
        """Set every replica of the shepherds with an expired heartbeat to OFFLINE."""
        SEs = self.ahash_get([sestore_guid])[sestore_guid]
        now = time.time()
        late_SEs = []
        for (serviceID, prop), value in SEs.items():
            if prop != 'nextHeartbeat':
                continue
            next_heartbeat = float(value)
            # -1 means the shepherd was already handled; compare numerically so
            # that '-1' and -1 are both recognised
            if next_heartbeat != -1 and next_heartbeat < now:
                late_SEs.append(serviceID)
        if not late_SEs:
            return
        serviceGUIDs = dict([(value, serviceID)
            for (serviceID, prop), value in SEs.items()
                if prop == 'serviceGUID' and serviceID in late_SEs])
        filelists = self.ahash_get(list(serviceGUIDs))
        changes = []
        for serviceGUID, serviceID in serviceGUIDs.items():
            for (_, referenceID), GUID in filelists[serviceGUID].items():
                changes.append((GUID, serviceID, referenceID, OFFLINE))
        self._change_states(changes)
        for serviceID in serviceGUIDs.values():
            self._set_next_heartbeat(serviceID, -1)

    def _change_states(self, changes):
        """Store replica states in the 'locations' section of the files.

        changes -- list of (GUID, serviceID, referenceID, state) where GUID is
                   of the file, serviceID is of the shepherd, referenceID is of
                   the replica within the shepherd and state is of this replica

        Returns the response of the A-Hash.
        """
        change_request = {}
        for GUID, serviceID, referenceID, state in changes:
            # the serviceID, referenceID pair is put into one string (a location)
            location = serialize_ids([serviceID, referenceID])
            # a DELETED replica is removed from the locations, any other state
            # is stored (adding the location if it was not there yet)
            if state == DELETED:
                change_type = 'unset'
            else:
                change_type = 'set'
            # the change is only done if the file itself exists
            change_request[location] = (GUID, change_type, 'locations', location, state,
                {'only if file exists' : ('is', 'entry', 'type', 'file')})
        return self.ahash_change(change_request)

    def _set_next_heartbeat(self, serviceID, next_heartbeat):
        """Store the next heartbeat time of a shepherd in the sestore entry."""
        ahash_request = {'report' : (sestore_guid, 'set', serviceID, 'nextHeartbeat', next_heartbeat, {})}
        ahash_response = self.ahash_change(ahash_request)
        if ahash_response['report'][0] != 'set':
            log.msg(arc.VERBOSE, 'ERROR setting next heartbeat time!')

    def report(self, serviceID, filelist):
        """Process the report (heartbeat) of a shepherd.

        serviceID -- ID of the reporting shepherd
        filelist  -- list of (GUID, referenceID, state) tuples

        Returns -1 if the shepherd should send the state of all its files next
        time (also when processing the report failed), otherwise the heartbeat
        timeout as an int.
        """
        try:
            # get the list of registered services (stored with the GUID 'sestore_guid')
            ses = self.ahash_get([sestore_guid])[sestore_guid]
            # we get the GUID of the shepherd
            serviceGUID = ses.get((serviceID,'serviceGUID'), None)
            if not serviceGUID:
                # this shepherd was not registered yet, let's create a new GUID
                serviceGUID = arc.UUID()
                # add the new GUID but only if someone else has not done that just now
                ahash_request = {'report' : (sestore_guid, 'set', serviceID, 'serviceGUID', serviceGUID,
                    {'onlyif' : ('unset', serviceID, 'serviceGUID', '')})}
                ahash_response = self.ahash_change(ahash_request)
                success, unmetConditionID = ahash_response['report']
                if unmetConditionID:
                    # there was already a GUID for this service (created between
                    # our check and our change), so get it once again
                    ses = self.ahash_get([sestore_guid])[sestore_guid]
                    serviceGUID = ses.get((serviceID, 'serviceGUID'))
            # if the next heartbeat time of this shepherd is -1 or nonexistent,
            # we will ask it to send all the file states, not just the changed
            please_send_all = int(ses.get((serviceID, 'nextHeartbeat'), -1)) == -1
            # calculate and register the next heartbeat time for this shepherd
            next_heartbeat = str(int(time.time() + self.hbtimeout))
            self._set_next_heartbeat(serviceID, next_heartbeat)
            # change the states of replicas in the metadata of the files to the reported values
            self._change_states([(GUID, serviceID, referenceID, state)
                for GUID, referenceID, state in filelist])
            # remember which files this shepherd stores: the referenceID and
            # GUID of each reported replica go to the 'file' section of the
            # shepherd's own entry, DELETED replicas are removed from it
            change_request = {}
            for GUID, referenceID, state in filelist:
                if state == DELETED:
                    change_type = 'unset'
                else:
                    change_type = 'set'
                change_request[referenceID] = (serviceGUID, change_type, 'file', referenceID, GUID, {})
            # TODO: check the response and do something if something is wrong
            self.ahash_change(change_request)
        except Exception:
            log.msg(arc.ERROR, 'Error processing report message')
            please_send_all = True
        if please_send_all:
            return -1
        return int(self.hbtimeout)

    def new(self, requests):
        """Create new entries.

        requests -- {requestID: metadata} where metadata is a dictionary keyed
                    by (section, property); ('entry', 'type') is mandatory,
                    ('entry', 'GUID') is generated if missing.  The metadata
                    dictionaries are modified in place.

        Returns {requestID: (GUID, success)}; success is 'success' (possibly
        followed by the list of properties which could not be set) or a
        message starting with 'failed: '.  For a request without a type the
        GUID is the one given in the metadata or '' if there was none.
        """
        response = {}
        for rID, metadata in requests.items():
            entry_type = metadata.pop(('entry', 'type'), None)
            if entry_type is None:
                response[rID] = (metadata.get(('entry', 'GUID'), ''), 'failed: no type given')
                continue
            if ('entry', 'GUID') in metadata:
                GUID = metadata[('entry', 'GUID')]
            else:
                GUID = arc.UUID()
                metadata[('entry', 'GUID')] = GUID
            # set the type first, but only if there is no entry with this GUID yet
            check = self.ahash_change(
                {'new': (GUID, 'set', 'entry', 'type', entry_type, {'0' : ('unset','entry','type','')})}
            )
            status, failedCondition = check['new']
            if status == 'set':
                success = 'success'
                # string IDs, so the IDs in the response can be matched even
                # if they do not come back as integers
                changes = {}
                for changeID, ((section, prop), value) in enumerate(metadata.items()):
                    changes[str(changeID)] = (GUID, 'set', section, prop, value, {})
                resp = self.ahash_change(changes)
                for r in resp.keys():
                    if resp[r][0] != 'set':
                        success += ' (failed: %s - %s)' % (resp[r][0], str(changes.get(r)))
            elif failedCondition == '0':
                success = 'failed: entry exists'
            else:
                success = 'failed: ' + status
            response[rID] = (GUID, success)
        return response

    def get(self, requests, neededMetadata = None):
        """Return what ahash_get_tree gives for the requested GUIDs."""
        if neededMetadata is None:
            neededMetadata = []
        return self.ahash_get_tree(requests, neededMetadata)

    def _parse_LN(self, LN):
        """Split a Logical Name at '/' into (guid, path): the first part and
        the list of the remaining parts.

        Raises Exception if LN cannot be split.
        """
        try:
            splitted = LN.split('/')
        except Exception:
            raise Exception('Invalid Logical Name: `%s`' % LN)
        return splitted[0], splitted[1:]

    def _traverse(self, guid, metadata, path, traversed, GUIDs):
        """Walk down ``path`` starting from the entry ``guid``.

        The three lists are modified in place: each resolved part is moved
        from ``path`` to ``traversed`` and its GUID is appended to ``GUIDs``.
        Empty and '.' parts stay at the current entry.

        Returns the metadata of the last entry reached (walking stops at the
        first part which is not found), or {} on any other error.
        """
        try:
            while path:
                if path[0] in ['', '.']:
                    child_guid = guid
                    child_metadata = metadata
                else:
                    child_guid = metadata[('entries',path[0])]
                    child_metadata = self.ahash_get([child_guid])[child_guid]
                traversed.append(path.pop(0))
                GUIDs.append(child_guid)
                guid = child_guid
                metadata = child_metadata
            return metadata
        except KeyError:
            # this part does not exist: report how far we got
            return metadata
        except Exception as e:
            log.msg(arc.ERROR, 'Error traversing: %s' % e)
            return {}

    def traverseLN(self, requests):
        """Resolve Logical Names.

        requests -- {requestID: LN}; an LN with an empty first part starts at
                    ``global_root_guid``

        Returns {requestID: (traversedList, wasComplete, traversedLN, GUID,
        metadata, restLN)}, where traversedList is a list of (LN part, GUID)
        pairs.  If the starting entry has no type, or on an error, the value
        is ([], False, '', <first part of the LN>, None, <rest of the path>).
        """
        response = {}
        for rID, LN in requests.items():
            guid0, path0 = self._parse_LN(LN)
            if not guid0:
                guid = global_root_guid
            else:
                guid = guid0
            traversed = [guid0]
            GUIDs = [guid]
            # work on a copy, _traverse consumes the path
            path = list(path0)
            # TODO: figure out what to do if the A-Hash cannot be reached here;
            # for now the error is propagated to the caller
            metadata0 = self.ahash_get([guid])[guid]
            if ('entry','type') not in metadata0:
                response[rID] = ([], False, '', guid0, None, '/'.join(path))
                continue
            try:
                metadata = self._traverse(guid, metadata0, path, traversed, GUIDs)
                traversedList = list(zip(traversed, GUIDs))
                wasComplete = len(path) == 0
                traversedLN = guid0 + '/' + '/'.join(traversed[1:])
                restLN = '/'.join(path)
                response[rID] = (traversedList, wasComplete, traversedLN, GUIDs[-1], metadata, restLN)
            except Exception as e:
                log.msg(arc.ERROR, "Error in traverseLN method: %s" % e)
                response[rID] = ([], False, '', guid0, None, '/'.join(path))
        return response

    def modifyMetadata(self, requests):
        """Modify the metadata of entries.

        requests -- {changeID: (GUID, changeType, section, property, value)};
                    changeType is 'set', 'unset', 'add' (set only if not set
                    yet) or 'setifvalue=<value>' (set only if the current
                    value is <value>).  Requests with any other changeType
                    are left out of the response.

        Returns {changeID: 'set' | 'unset' | 'condition failed' | 'failed: ...'}.
        """
        changes = {}
        for changeID, (GUID, changeType, section, prop, value) in requests.items():
            if changeType in ['set', 'unset']:
                conditions = {}
            elif changeType == 'add':
                changeType = 'set'
                conditions = {'0': ('unset', section, prop, '')}
            elif changeType.startswith('setifvalue='):
                ifvalue = changeType[len('setifvalue='):]
                changeType = 'set'
                conditions = {'0': ('is', section, prop, ifvalue)}
            else:
                continue
            changes[changeID] = (GUID, changeType, section, prop, value, conditions)
        ahash_response = self.ahash_change(changes)
        response = {}
        for changeID, (success, conditionID) in ahash_response.items():
            if success in ['set', 'unset']:
                response[changeID] = success
            elif conditionID == '0':
                response[changeID] = 'condition failed'
            else:
                response[changeID] = 'failed: ' + success
        return response

    def remove(self, requests):
        """Delete entries.

        requests -- {requestID: GUID}

        Returns {requestID: 'removed' | 'failed: ...'}.
        """
        ahash_request = dict([(requestID, (GUID, 'delete', '', '', '', {}))
            for requestID, GUID in requests.items()])
        ahash_response = self.ahash_change(ahash_request)
        response = {}
        for requestID, (success, _) in ahash_response.items():
            if success == 'deleted':
                response[requestID] = 'removed'
            else:
                response[requestID] = 'failed: ' + success
        return response
00414     
00415 from arcom.service import Service
00416     
class LibrarianService(Service):
    """ XML front end of the storage Librarian: unpacks the incoming payloads,
    calls the matching method of the Librarian class and packs the answer. """

    def __init__(self, cfg):
        """ Constructor of the LibrarianService

        LibrarianService(cfg)

        'cfg' is an XMLNode which contains the config of this service.
        """
        self.service_name = 'Librarian'
        # the methods this service provides, all in the 'lbr' namespace
        provided_methods = ['new', 'get', 'traverseLN', 'modifyMetadata', 'remove', 'report']
        interface = {
            'request_names' : provided_methods,
            'namespace_prefix': 'lbr',
            'namespace_uri': librarian_uri,
        }
        Service.__init__(self, [interface], cfg, start_service = False)
        # Do not hand a bound method of this object to the Librarian: it would
        # hold a reference back to the service, so the destructor would not be
        # called on shutdown.
        #ssl_config['get_trusted_dns_method'] = self._get_trusted_dns
        self.librarian = Librarian(cfg, self.ssl_config, self.state)

    def __del__(self):
        """ Ask the Librarian's background thread to stop, then let the base class clean up. """
        try:
            self.librarian.thread_is_running = False
        except:
            # there may be no librarian if the constructor failed
            pass
        Service.__del__(self)

    def new(self, inpayload):
        """ Create new entries from (requestID, metadataList) elements. """
        parsed = parse_node(inpayload.Child().Child(),
            ['requestID', 'metadataList'], single = True, string = False)
        requests = {}
        for requestID, metadataList in parsed.items():
            requests[str(requestID)] = parse_metadata(metadataList)
        created = self.librarian.new(requests)
        fields = ['lbr:requestID', 'lbr:GUID', 'lbr:success']
        return create_response('lbr:new', fields, created, self._new_soap_payload())

    def get(self, inpayload):
        """ Return the metadata of the GUIDs listed in 'getRequestList',
        restricted by 'neededMetadataList'. """
        request_node = inpayload.Child()
        requests = []
        for node in get_child_nodes(request_node.Get('getRequestList')):
            requests.append(str(node.Get('GUID')))
        neededMetadata = []
        for node in get_child_nodes(inpayload.Child().Get('neededMetadataList')):
            neededMetadata.append(node_to_data(node, ['section', 'property'], single = True))
        tree = self.librarian.get(requests, neededMetadata)
        out = arc.PayloadSOAP(self._new_soap_payload())
        tree.add_to_node(out.NewChild('lbr:getResponse'))
        return out

    def traverseLN(self, inpayload):
        """ Resolve the Logical Names given as (requestID, LN) elements. """
        requests = parse_node(inpayload.Child().Child(), ['requestID', 'LN'], single = True)
        response = self.librarian.traverseLN(requests)
        # replace each answer in place with its tree form
        for rID, answer in list(response.items()):
            traversedList, wasComplete, traversedLN, GUID, metadata, restLN = answer
            traversedListTree = []
            for LNpart, partGUID in traversedList:
                element = [('lbr:LNPart', LNpart), ('lbr:GUID', partGUID)]
                traversedListTree.append(('lbr:traversedListElement', element))
            metadataTree = create_metadata(metadata, 'lbr')
            response[rID] = (traversedListTree, wasComplete and true or false,
                traversedLN, GUID, metadataTree, restLN)
        fields = ['lbr:requestID', 'lbr:traversedList', 'lbr:wasComplete',
            'lbr:traversedLN', 'lbr:GUID', 'lbr:metadataList', 'lbr:restLN']
        return create_response('lbr:traverseLN', fields, response, self._new_soap_payload())

    def modifyMetadata(self, inpayload):
        """ Apply the metadata changes given in the request. """
        fields = ['lbr:changeID', 'lbr:GUID', 'lbr:changeType',
            'lbr:section', 'lbr:property', 'lbr:value']
        requests = parse_node(inpayload.Child().Child(), fields)
        response = self.librarian.modifyMetadata(requests)
        return create_response('lbr:modifyMetadata', ['lbr:changeID', 'lbr:success'],
            response, self._new_soap_payload(), single = True)

    def remove(self, inpayload):
        """ Remove the entries given as (requestID, GUID) elements. """
        fields = ['lbr:requestID', 'lbr:GUID']
        requests = parse_node(inpayload.Child().Child(), fields, single = True)
        response = self.librarian.remove(requests)
        return create_response('lbr:remove', ['lbr:requestID', 'lbr:success'],
            response, self._new_soap_payload(), single = True)

    def report(self, inpayload):
        """ Pass the report of a shepherd to the Librarian and answer with the
        next report time. """
        request_node = inpayload.Child()
        serviceID = str(request_node.Get('serviceID'))
        filelist = []
        for node in get_child_nodes(request_node.Get('filelist')):
            entry = (str(node.Get('GUID')), str(node.Get('referenceID')), str(node.Get('state')))
            filelist.append(entry)
        nextReportTime = self.librarian.report(serviceID, filelist)
        out = self._new_soap_payload()
        response_node = out.NewChild('lbr:registerResponse')
        time_node = response_node.NewChild('lbr:nextReportTime')
        time_node.Set(str(nextReportTime))
        return out

    def RegistrationCollector(self, doc):
        """ Replace 'doc' with a RegEntry whose SrcAdv/Type is the Librarian service type. """
        regentry = arc.XMLNode('<RegEntry />')
        type_node = regentry.NewChild('SrcAdv').NewChild('Type')
        type_node.Set(librarian_servicetype)
        # Place the document into the doc attribute
        doc.Replace(regentry)
        return True

    def GetAdditionalLocalInformation(self, service_node):
        """ Add a 'Type' child holding the Librarian service type to 'service_node'. """
        type_node = service_node.NewChild('Type')
        type_node.Set(librarian_servicetype)
00518