Back to index

nordugrid-arc-nox  1.1.0~rc6
Public Member Functions | Public Attributes | Private Member Functions
storage.librarian.librarian.Librarian Class Reference

List of all members.

Public Member Functions

def __init__
def initThread
def ahash_get_tree
def ahash_get
def ahash_change
def checkingThread
def report
def new
def get
def traverseLN
def modifyMetadata
def remove

Public Attributes

 service_state
 ssl_config
 period
 hbtimeout
 master_ahash
 ahash_reader
 ahash_writer
 thread_is_running

Private Member Functions

def _update_ahash_urls
def _change_states
def _set_next_heartbeat
def _parse_LN
def _traverse

Detailed Description

Definition at line 22 of file librarian.py.


Constructor & Destructor Documentation

def storage.librarian.librarian.Librarian.__init__ (   self,
  cfg,
  ssl_config,
  service_state 
)

Definition at line 24 of file librarian.py.

00024 
def __init__(self, cfg, ssl_config, service_state):
    """Configure the Librarian and start its background thread.

    'CheckPeriod' and 'HeartbeatTimeout' are read from cfg and stored as
    floats in self.period and self.hbtimeout; both are mandatory.

    If cfg contains at least one 'AHashURL', the reader and writer A-Hash
    clients are created immediately, checkingThread is started and
    service_state.running is set to True.  Otherwise the 'ISISURL'
    values are passed to initThread, which is started in its own thread.

    Raises Exception if the two timing options cannot be read, or if
    neither an A-Hash URL nor an ISIS URL is configured.
    """
    self.service_state = service_state
    self.ssl_config = ssl_config
    try:
        self.period = float(str(cfg.Get('CheckPeriod')))
        self.hbtimeout = float(str(cfg.Get('HeartbeatTimeout')))
    except:
        log.msg(arc.ERROR, 'Cannot find CheckPeriod, HeartbeatTimeout in the config')
        raise Exception('Librarian cannot run without CheckPeriod and HeartbeatTimeout')
    log.msg(arc.INFO,'Getting AHash URL from the config')
    ahash_urls = get_child_values_by_name(cfg, 'AHashURL')
    if not ahash_urls:
        # no A-Hash configured directly: it has to be discovered via ISIS
        log.msg(arc.INFO,'No AHash from the config')
        isis_urls = get_child_values_by_name(cfg, 'ISISURL')
        if not isis_urls:
            log.msg(arc.ERROR, "AHash URL and ISIS URL not found in the configuration.")
            raise Exception('Librarian cannot run with no A-Hash')
        log.msg(arc.INFO,'Got ISIS URL, starting initThread')
        # the flag is raised before the thread starts because initThread polls it
        self.thread_is_running = True
        threading.Thread(target = self.initThread, args = [isis_urls]).start()
        return
    log.msg(arc.INFO,'Got AHash URLs:', ahash_urls)
    log.msg(arc.VERBOSE, 'AHash URL found in the configuration.')
    # the first configured URL is taken as the master A-Hash
    self.master_ahash = arc.URL(ahash_urls[0])
    self.ahash_reader = AHashClient(ahash_urls, ssl_config = self.ssl_config)
    self.ahash_writer = AHashClient(ahash_urls, ssl_config = self.ssl_config)
    # the flag is raised before the thread starts because checkingThread polls it
    self.thread_is_running = True
    log.msg(arc.INFO,'Starting checking thread')
    threading.Thread(target = self.checkingThread).start()
    log.msg(arc.INFO,'Setting running state to True')
    self.service_state.running = True
        

Member Function Documentation

def storage.librarian.librarian.Librarian._change_states (   self,
  changes 
) [private]

Definition at line 185 of file librarian.py.

00185 
def _change_states(self, changes):
    """Record replica state changes in the 'locations' section of files.

    changes is a list of (GUID, serviceID, referenceID, state) tuples:
    GUID is of the file, serviceID is of the shepherd, referenceID is of
    the replica within the shepherd and state is of this replica.

    For every tuple the (serviceID, referenceID) pair is serialized into
    one location string with serialize_ids.  The state of that location
    is set in the file's entry, or the location is unset when the state
    is DELETED.  Each change carries the condition that the entry is of
    type 'file'.  The request is keyed by location, so a later change
    for the same location replaces an earlier one.

    Returns whatever ahash_change returns for the request.
    """
    change_request = {}
    for file_guid, service_id, reference_id, replica_state in changes:
        location = serialize_ids([service_id, reference_id])
        if replica_state != DELETED:
            action = 'set'
        else:
            # a deleted replica is dropped from the file's locations
            action = 'unset'
        change_request[location] = (
            file_guid, action, 'locations', location, replica_state,
            {'only if file exists' : ('is', 'entry', 'type', 'file')})
    return self.ahash_change(change_request)
        

Here is the call graph for this function:

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian._parse_LN (   self,
  LN 
) [private]

Definition at line 311 of file librarian.py.

00311 
00312     def _parse_LN(self, LN):
00313         try:
00314             splitted = LN.split('/')
00315         except:
00316             raise Exception, 'Invalid Logical Name: `%s`' % LN
00317         guid = splitted[0]
00318         path = splitted[1:]
00319         return guid, path

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian._set_next_heartbeat (   self,
  serviceID,
  next_heartbeat 
) [private]

Definition at line 204 of file librarian.py.

00204 
def _set_next_heartbeat(self, serviceID, next_heartbeat):
    """Store the next heartbeat time of a shepherd in the A-Hash.

    The value is written as property 'nextHeartbeat' in the section
    named after serviceID of the entry sestore_guid.  If the A-Hash does
    not answer 'set', a message is logged; nothing is returned or raised
    for that case.
    """
    response = self.ahash_change(
        {'report' : (sestore_guid, 'set', serviceID, 'nextHeartbeat', next_heartbeat, {})})
    outcome = response['report'][0]
    if outcome != 'set':
        log.msg(arc.VERBOSE, 'ERROR setting next heartbeat time!')
    

Here is the call graph for this function:

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian._traverse (   self,
  guid,
  metadata,
  path,
  traversed,
  GUIDs 
) [private]

Definition at line 320 of file librarian.py.

00320 
00321     def _traverse(self, guid, metadata, path, traversed, GUIDs):
00322             try:
00323                 while path:
00324                     if path[0] in ['', '.']:
00325                         child_guid = guid
00326                         child_metadata = metadata
00327                     else:
00328                         child_guid = metadata[('entries',path[0])]
00329                         child_metadata = self.ahash_get([child_guid])[child_guid]
00330                     traversed.append(path.pop(0))
00331                     GUIDs.append(child_guid)
00332                     guid = child_guid
00333                     metadata = child_metadata
00334                 return metadata
00335             except KeyError:
00336                 return metadata
00337             except Exception, e:
00338                 log.msg(arc.ERROR, 'Error traversing: %s' % e)
00339                 #log.msg()
00340                 return {}
00341 

Here is the call graph for this function:

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian._update_ahash_urls (   self  )  [private]

Definition at line 83 of file librarian.py.

00083 
def _update_ahash_urls(self):
    """Refresh the URLs used by the reader and writer A-Hash clients.

    The entry stored under ahash_list_guid is fetched through the reader
    client.  Its values are URLs; its keys are indexable (presumably
    (section, property) tuples like other A-Hash entries - TODO confirm).
    A URL counts as a master or a client when its key contains 'master'
    or 'client' and key[1] ends with 'online'.

    If masters are found, master_ahash becomes the first one and the
    writer client is pointed at all of them and reset.  If clients are
    found, the reader client is pointed at them and reset.

    This is best effort and never raises.  A failure is logged at
    VERBOSE level and whatever was updated up to that point stays.
    """
    try:
        ahash_list = self.ahash_reader.get(ahash_list_guid)[ahash_list_guid]
        if not ahash_list:
            return
        master = [url for key, url in ahash_list.items()
                  if 'master' in key and key[1].endswith('online')]
        clients = [url for key, url in ahash_list.items()
                   if 'client' in key and key[1].endswith('online')]
        if master:
            self.master_ahash = arc.URL(master[0])
            self.ahash_writer.urls = [arc.URL(url) for url in master]
            self.ahash_writer.reset()
        if clients:
            self.ahash_reader.urls = [arc.URL(url) for url in clients]
            self.ahash_reader.reset()
    except:
        # callers rely on this never raising (it runs inside an except
        # handler and in the checking loop), so report instead of re-raising
        log.msg(arc.VERBOSE, 'Could not update the list of A-Hash URLs')
        

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian.ahash_change (   self,
  changes 
)
Wrapper for ahash.change replacing ahash.urls with master ahash, calling
ahash.change, putting back ahash.urls
ahash.change can only call master ahash

Definition at line 127 of file librarian.py.

00127 
def ahash_change(self, changes):
    """Send a change request through the writer A-Hash client, retrying once.

    If the first attempt raises, the A-Hash URLs are refreshed with
    _update_ahash_urls (the master may be outdated) and the request is
    sent again.  If the retry raises as well, the reader and writer URLs
    that were in use before the refresh are restored, both clients are
    reset and the exception of the retry propagates.

    Returns the response of ahash_writer.change.
    """
    try:
        return self.ahash_writer.change(changes)
    except:
        # remember the URLs in use, so a failed retry can put them back
        saved = (
            (self.ahash_reader, self.ahash_reader.urls),
            (self.ahash_writer, self.ahash_writer.urls),
        )
        self._update_ahash_urls()
        try:
            return self.ahash_writer.change(changes)
        except:
            for client, urls in saved:
                client.urls = urls
                client.reset()
            raise
    

Here is the call graph for this function:

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian.ahash_get (   self,
  IDs,
  neededMetadata = [] 
)
Wrapper for ahash.get updating ahashes and calling ahash.get

Definition at line 114 of file librarian.py.

00114 
def ahash_get(self, IDs, neededMetadata = []):
    """Fetch entries from the A-Hash through the reader client.

    Thin wrapper around ahash_reader.get(IDs, neededMetadata); its
    result is returned as is and any exception it raises propagates.
    """
    # if get fails, there are no clients available, so there is nothing
    # to retry here: the error simply goes up to the caller
    return self.ahash_reader.get(IDs, neededMetadata)

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian.ahash_get_tree (   self,
  IDs,
  neededMetadata = [] 
)
Wrapper for ahash.get_tree updating ahashes and calling ahash.get_tree

Definition at line 101 of file librarian.py.

00101 
def ahash_get_tree(self, IDs, neededMetadata = []):
    """Fetch entries from the A-Hash with the reader client's get_tree.

    Thin wrapper around ahash_reader.get_tree(IDs, neededMetadata); its
    result is returned as is and any exception it raises propagates.
    """
    # if get_tree fails, there are no clients available, so there is
    # nothing to retry here: the error simply goes up to the caller
    return self.ahash_reader.get_tree(IDs, neededMetadata)

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian.checkingThread (   self  ) 

Definition at line 155 of file librarian.py.

00155 
00156     def checkingThread(self):
00157         time.sleep(10)
00158         while self.thread_is_running:
00159             try:
00160                 SEs = self.ahash_get([sestore_guid])[sestore_guid]
00161                 #print 'registered shepherds:', SEs
00162                 now = time.time()
00163                 late_SEs = [serviceID for (serviceID, property), nextHeartbeat in SEs.items() if property == 'nextHeartbeat' and float(nextHeartbeat) < now and nextHeartbeat != '-1']
00164                 #print 'late shepherds (it is %s now)' % now, late_SEs
00165                 if late_SEs:
00166                     serviceGUIDs = dict([(serviceGUID, serviceID) for (serviceID, property), serviceGUID in SEs.items() if property == 'serviceGUID' and serviceID in late_SEs])
00167                     #print 'late shepherds serviceGUIDs', serviceGUIDs
00168                     filelists = self.ahash_get(serviceGUIDs.keys())
00169                     changes = []
00170                     for serviceGUID, serviceID in serviceGUIDs.items():
00171                         filelist = filelists[serviceGUID]
00172                         #print 'filelist of late shepherd', serviceID, filelist
00173                         changes.extend([(GUID, serviceID, referenceID, OFFLINE)
00174                             for (_, referenceID), GUID in filelist.items()])
00175                     change_response = self._change_states(changes)
00176                     for _, serviceID in serviceGUIDs.items():
00177                         self._set_next_heartbeat(serviceID, -1)
00178                 time.sleep(self.period)
00179             except Exception, e:
00180                 log.msg(arc.ERROR, "Error in Librarian's checking thread: %s" % e)
00181                 #log.msg()
00182             # update list of ahashes
00183             self._update_ahash_urls()
00184             time.sleep(self.period)

Here is the call graph for this function:

Here is the caller graph for this function:

def storage.librarian.librarian.Librarian.get (   self,
  requests,
  neededMetadata = [] 
)

Definition at line 308 of file librarian.py.

00308 
def get(self, requests, neededMetadata = []):
    """Return the A-Hash entries for requests, as given by ahash_get_tree."""
    entries = self.ahash_get_tree(requests, neededMetadata)
    return entries

Here is the call graph for this function:

def storage.librarian.librarian.Librarian.initThread (   self,
  isis_urls 
)

Definition at line 56 of file librarian.py.

00056 
00057     def initThread(self, isis_urls):
00058         found_ahash = False
00059         while not found_ahash:
00060             try:
00061                 time.sleep(3)
00062                 log.msg(arc.INFO,'Trying to get A-Hash from ISISes')
00063                 for isis_url in isis_urls:
00064                     if not self.thread_is_running:
00065                         return
00066                     log.msg(arc.INFO,'Trying to get A-Hash from', isis_url)
00067                     isis = ISISClient(isis_url, ssl_config = self.ssl_config)
00068                     try:
00069                         ahash_urls = isis.getServiceURLs(ahash_servicetype)
00070                         log.msg(arc.INFO,'Got A-Hash from ISIS:', ahash_urls)
00071                         if ahash_urls:
00072                             self.master_ahash = arc.URL(ahash_urls[0])
00073                             self.ahash_reader = AHashClient(ahash_urls, ssl_config = self.ssl_config)
00074                             self.ahash_writer = AHashClient(ahash_urls, ssl_config = self.ssl_config)
00075                             found_ahash = True
00076                     except:
00077                         log.msg()
00078             except Exception, e:
00079                 log.msg(arc.WARNING, 'Error in initThread: %s' % e)
00080         log.msg(arc.INFO,'initThread finishes, starting checkingThread')
00081         threading.Thread(target = self.checkingThread).start()
00082         self.service_state.running = True

Here is the call graph for this function:

def storage.librarian.librarian.Librarian.modifyMetadata (   self,
  requests 
)

Definition at line 377 of file librarian.py.

00377 
def modifyMetadata(self, requests):
    """Translate metadata modification requests into A-Hash changes.

    requests maps a change ID to a
    (GUID, changeType, section, property, value) tuple.  The supported
    change types are:

    - 'set' and 'unset': passed on unconditionally,
    - 'add': a 'set' that only happens if the property is unset,
    - 'setifvalue=X': a 'set' that only happens if the property is X.

    Requests with any other change type are dropped without an answer.

    Returns a dictionary mapping each change ID answered by the A-Hash
    to 'set' or 'unset' on success, 'condition failed' when the
    condition '0' was not met, or 'failed: ' followed by the A-Hash
    status otherwise.
    """
    prefix = 'setifvalue='
    changes = {}
    for changeID, (GUID, changeType, section, prop, value) in requests.items():
        if changeType in ('set', 'unset'):
            action = changeType
            conditions = {}
        elif changeType == 'add':
            action = 'set'
            conditions = {'0': ('unset', section, prop, '')}
        elif changeType.startswith(prefix):
            action = 'set'
            conditions = {'0': ('is', section, prop, changeType[len(prefix):])}
        else:
            continue
        changes[changeID] = (GUID, action, section, prop, value, conditions)
    response = {}
    for changeID, (success, conditionID) in self.ahash_change(changes).items():
        if success in ('set', 'unset'):
            verdict = success
        elif conditionID == '0':
            verdict = 'condition failed'
        else:
            verdict = 'failed: ' + success
        response[changeID] = verdict
    return response

Here is the call graph for this function:

def storage.librarian.librarian.Librarian.new (   self,
  requests 
)

Definition at line 267 of file librarian.py.

00267 
def new(self, requests):
    """Create new entries in the A-Hash.

    requests maps a request ID to a metadata dictionary keyed by
    (section, property) pairs.  ('entry', 'type') is mandatory.
    ('entry', 'GUID') is optional: when it is missing a new arc.UUID()
    is generated.  The metadata dictionaries are modified in place: the
    type is removed and a generated GUID is added.

    The entry is claimed by setting its type under the condition that
    no type is set yet.  Only then are the remaining metadata written.

    Returns a dictionary mapping each request ID to (GUID, status).
    status is 'success' (followed by one ' (failed: ...)' note per
    metadata item that could not be set), 'failed: no type given',
    'failed: entry exists' or 'failed: ' plus the A-Hash status.  For a
    request without a type the GUID in the answer is ''.
    """
    response = {}
    for rID, metadata in requests.items():
        # reset for every request, so a rejected request can never report
        # the GUID of the previous one (or hit an unbound name)
        GUID = ''
        try:
            entry_type = metadata[('entry', 'type')]
            del metadata[('entry', 'type')]
        except Exception:
            entry_type = None
        if entry_type is None:
            success = 'failed: no type given'
        else:
            try:
                GUID = metadata[('entry', 'GUID')]
            except KeyError:
                GUID = arc.UUID()
                metadata[('entry', 'GUID')] = GUID
            # claim the GUID: the type is only set if the entry has none yet
            check = self.ahash_change(
                {'new': (GUID, 'set', 'entry', 'type', entry_type,
                         {'0' : ('unset', 'entry', 'type', '')})}
            )
            status, failedCondition = check['new']
            if status == 'set':
                success = 'success'
                changes = {}
                for changeID, ((section, prop), value) in enumerate(metadata.items()):
                    changes[changeID] = (GUID, 'set', section, prop, value, {})
                resp = self.ahash_change(changes)
                for changeID, result in resp.items():
                    if result[0] != 'set':
                        # two placeholders need a two-element tuple
                        success += ' (failed: %s - %s)' % (result[0], str(changes[changeID]))
            elif failedCondition == '0':
                success = 'failed: entry exists'
            else:
                success = 'failed: ' + status
        response[rID] = (GUID, success)
    return response

Here is the call graph for this function:

def storage.librarian.librarian.Librarian.remove (   self,
  requests 
)

Definition at line 403 of file librarian.py.

00403 
def remove(self, requests):
    """Delete entries from the A-Hash.

    requests maps a request ID to the GUID of the entry to delete.
    Returns a dictionary mapping each request ID answered by the A-Hash
    to 'removed' when the A-Hash status is 'deleted', or to 'failed: '
    followed by that status otherwise.
    """
    ahash_request = {}
    for requestID, GUID in requests.items():
        ahash_request[requestID] = (GUID, 'delete', '', '', '', {})
    response = {}
    for requestID, (status, _) in self.ahash_change(ahash_request).items():
        if status != 'deleted':
            response[requestID] = 'failed: ' + status
        else:
            response[requestID] = 'removed'
    return response

Here is the call graph for this function:

def storage.librarian.librarian.Librarian.report (   self,
  serviceID,
  filelist 
)

Definition at line 212 of file librarian.py.

00212 
def report(self, serviceID, filelist):
    """Process the report of a shepherd and schedule its next heartbeat.

    serviceID identifies the reporting shepherd.  filelist contains
    (GUID, referenceID, state) tuples describing its replicas; it is
    iterated twice.

    The shepherd is registered under a new GUID if it is not known yet.
    Its next heartbeat time is stored, the reported states are written
    to the files' metadata, and the shepherd's own entry is updated with
    the replicas it holds (replicas reported as DELETED are removed).

    Returns -1 to ask the shepherd to send the state of all its files
    next time.  That happens when its stored next heartbeat was -1 or
    missing, or when anything went wrong while processing the report.
    Otherwise int(self.hbtimeout) is returned.
    """
    try:
        # the registered shepherds are stored in the A-Hash under sestore_guid
        ses = self.ahash_get([sestore_guid])[sestore_guid]
        serviceGUID = ses.get((serviceID, 'serviceGUID'), None)
        if not serviceGUID:
            # unknown shepherd: register it under a fresh GUID, but only if
            # nobody else has registered it in the meantime
            serviceGUID = arc.UUID()
            registration = (sestore_guid, 'set', serviceID, 'serviceGUID', serviceGUID,
                {'onlyif' : ('unset', serviceID, 'serviceGUID', '')})
            _status, unmetConditionID = self.ahash_change({'report' : registration})['report']
            if unmetConditionID:
                # somebody else was faster: use the GUID that is stored now
                ses = self.ahash_get([sestore_guid])[sestore_guid]
                serviceGUID = ses.get((serviceID, 'serviceGUID'))
        # a next heartbeat of -1 (or none at all) means that all the file
        # states are wanted, not just the changed ones
        stored_heartbeat = ses.get((serviceID, 'nextHeartbeat'), -1)
        please_send_all = int(stored_heartbeat) == -1
        next_heartbeat = str(int(time.time() + self.hbtimeout))
        self._set_next_heartbeat(serviceID, next_heartbeat)
        # write the just reported states into the metadata of the files
        reported_states = []
        for GUID, referenceID, state in filelist:
            reported_states.append((GUID, serviceID, referenceID, state))
        self._change_states(reported_states)
        # the entry itself is not used, but a failing lookup must still
        # end up in the except branch below
        self.ahash_get([serviceGUID])[serviceGUID]
        # the entry of the shepherd lists its replicas in the section
        # 'file' as referenceID -> GUID of the file
        change_request = {}
        for GUID, referenceID, state in filelist:
            if state == DELETED:
                action = 'unset'
            else:
                action = 'set'
            change_request[referenceID] = (serviceGUID, action, 'file', referenceID, GUID, {})
        # TODO: check the response and do something if something is wrong
        self.ahash_change(change_request)
    except:
        log.msg(arc.ERROR, 'Error processing report message')
        please_send_all = True
    if please_send_all:
        return -1
    return int(self.hbtimeout)

Here is the call graph for this function:

def storage.librarian.librarian.Librarian.traverseLN (   self,
  requests 
)

Definition at line 342 of file librarian.py.

00342 
00343     def traverseLN(self, requests):
00344         response = {}
00345         for rID, LN in requests.items():
00346             guid0, path0 = self._parse_LN(LN)
00347             if not guid0:
00348                 guid = global_root_guid
00349             else:
00350                 guid = guid0
00351             traversed = [guid0]
00352             GUIDs = [guid]
00353             path = copy.deepcopy(path0)
00354             try:
00355                 metadata0 = self.ahash_get([guid])[guid]
00356             except:
00357                 # TODO: figure out this part
00358                 raise
00359                 metadata0 = {}
00360             if not metadata0.has_key(('entry','type')):
00361                 response[rID] = ([], False, '', guid0, None, '/'.join(path))
00362             else:
00363                 try:
00364                     metadata = self._traverse(guid, metadata0, path, traversed, GUIDs)
00365                     traversedList = [(traversed[i], GUIDs[i]) for i in range(len(traversed))]
00366                     wasComplete = len(path) == 0
00367                     traversedLN = guid0 + '/' + '/'.join(traversed[1:])
00368                     GUID = GUIDs[-1]
00369                     restLN = '/'.join(path)
00370                     response[rID] = (traversedList, wasComplete, traversedLN, GUID, metadata, restLN)
00371                 except Exception, e:
00372                     log.msg(arc.ERROR, "Error in traverseLN method: %s" % s)
00373                     #log.msg()
00374                     response[rID] = ([], False, '', guid0, None, '/'.join(path))
00375             #print '?\n? traversedList, wasComplete, traversedLN, GUID, metadata, restLN\n? ', response
00376         return response

Here is the call graph for this function:


Member Data Documentation

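storage.librarian.librarian.Librarian.ahash_reader

Definition at line 39 of file librarian.py.

storage.librarian.librarian.Librarian.ahash_writer

Definition at line 40 of file librarian.py.

storage.librarian.librarian.Librarian.hbtimeout

Definition at line 29 of file librarian.py.

storage.librarian.librarian.Librarian.master_ahash

Definition at line 38 of file librarian.py.

storage.librarian.librarian.Librarian.period

Definition at line 28 of file librarian.py.

storage.librarian.librarian.Librarian.service_state

Definition at line 25 of file librarian.py.

storage.librarian.librarian.Librarian.ssl_config

Definition at line 26 of file librarian.py.

storage.librarian.librarian.Librarian.thread_is_running

Definition at line 41 of file librarian.py.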


The documentation for this class was generated from the following file: