Back to index

nordugrid-arc-nox  1.1.0~rc6
Public Member Functions | Public Attributes | Private Member Functions
storage.shepherd.shepherd.Shepherd Class Reference

List of all members.

Public Member Functions

def __init__
def __del__
def getSpaceInformation
def isisLibrarianThread
def isisBartenderThread
def reportingThread
def toggleReport
def checkingThread
def changeState
def get
def put
def delete
def stat

Public Attributes

 service_is_running
 ssl_config
 backend
 store
 serviceID
 period
 min_interval
 creating_timeout
 checksum_lifetime
 librarian
 bartender
 changed_states
 doReporting

Private Member Functions

def _checking_checksum
def _file_arrived
def _refresh_checksum

Detailed Description

Definition at line 23 of file shepherd.py.


Constructor & Destructor Documentation

Definition at line 25 of file shepherd.py.

00025 
00026     def __init__(self, cfg):  # build the backend, store and service clients from cfg, then start the worker threads
00027         self.service_is_running = True  # the background thread loops run while this stays True
00028         self.ssl_config = parse_ssl_config(cfg)
00029         
00030         try:
00031             backendclass = str(cfg.Get('BackendClass'))
00032             backendcfg = cfg.Get('BackendCfg')
00033             self.backend = import_class_from_string(backendclass)(backendcfg, shepherd_uri, self._file_arrived, self.ssl_config)
00034         except Exception, e:  # NOTE(review): backendclass is unbound here if cfg.Get('BackendClass') itself failed
00035             log.msg(arc.ERROR, 'Cannot import backend class %(c)s (reason: %(r)s)' % {'c':backendclass, 'r':e})
00036             raise
00037         
00038         try:
00039             storeclass = str(cfg.Get('StoreClass'))
00040             storecfg = cfg.Get('StoreCfg')
00041             self.store = import_class_from_string(storeclass)(storecfg)
00042         except:  # NOTE(review): storeclass is unbound here if cfg.Get('StoreClass') itself failed
00043             log.msg(arc.ERROR, 'Cannot import store class', storeclass)
00044             raise
00045             
00046         try:
00047             self.serviceID = str(cfg.Get('ServiceID'))
00048         except:
00049             log.msg(arc.ERROR, 'Cannot get serviceID')
00050             raise
00051             
00052         try:
00053             self.period = float(str(cfg.Get('CheckPeriod')))
00054             self.min_interval = float(str(cfg.Get('MinCheckInterval')))
00055         except:
00056             log.msg(arc.ERROR, 'Cannot set CheckPeriod, MinCheckInterval')
00057             raise
00058                     
00059         try:
00060             self.creating_timeout = float(str(cfg.Get('CreatingTimeout')))
00061         except:
00062             self.creating_timeout = 0  # 0 is falsy, so _checking_checksum never marks CREATING files as STALLED
00063             
00064         try:
00065             self.checksum_lifetime = float(str(cfg.Get('ChecksumLifetime')))
00066         except:
00067             self.checksum_lifetime = 3600  # fallback used when reading ChecksumLifetime fails
00068 
00069         librarian_urls =  get_child_values_by_name(cfg, 'LibrarianURL')
00070         self.librarian = LibrarianClient(librarian_urls, ssl_config = self.ssl_config)  # created even with an empty URL list; isisLibrarianThread may replace it later
00071         if librarian_urls:
00072             log.msg(arc.INFO,'Got Librarian URLs from the config:', librarian_urls)
00073         else:
00074             isis_urls = get_child_values_by_name(cfg, 'ISISURL')
00075             if not isis_urls:
00076                 log.msg(arc.ERROR, "No Librarian URLs and no ISIS URLs found in the configuration: no self-healing!")
00077             else:
00078                 log.msg(arc.INFO,'Got ISIS URL, starting isisLibrarianThread')
00079                 threading.Thread(target = self.isisLibrarianThread, args = [isis_urls]).start()
00080 
00081         bartender_urls =  get_child_values_by_name(cfg, 'BartenderURL')
00082         self.bartender = BartenderClient(bartender_urls, ssl_config = self.ssl_config)  # created even with an empty URL list; isisBartenderThread may replace it later
00083         if bartender_urls:
00084             log.msg(arc.INFO,'Got Bartender URLs from the config:', bartender_urls)
00085         else:
00086             isis_urls = get_child_values_by_name(cfg, 'ISISURL')
00087             if not isis_urls:
00088                 log.msg(arc.ERROR, "No Bartender URLs and no ISIS URLs found in the configuration: no self-healing!")
00089             else:
00090                 log.msg(arc.INFO,'Got ISIS URL, starting isisBartenderThread')
00091                 threading.Thread(target = self.isisBartenderThread, args = [isis_urls]).start()
00092 
00093         self.changed_states = self.store.list()  # every referenceID already in the store starts out in the changed list
00094         threading.Thread(target = self.checkingThread, args = [self.period]).start()
00095         self.doReporting = True
00096         threading.Thread(target = self.reportingThread, args = []).start()

Definition at line 97 of file shepherd.py.

00097 
00098     def __del__(self):  # clear the running flag that the background thread loops test
00099         print "Shepherd's destructor called"
00100         self.service_is_running = False
        

Member Function Documentation

def storage.shepherd.shepherd.Shepherd._checking_checksum (   self,
  referenceID,
  localData 
) [private]

Definition at line 213 of file shepherd.py.

00213 
00214     def _checking_checksum(self, referenceID, localData):  # compare the stored checksum with the current one, change the state if needed, return the resulting state
00215         # get the current state (which is stored locally) or an empty string if it somehow has no state
00216         state = localData.get('state','')
00217         # hack: if the file's state is ALIVE then only check the checksum if the last check was a long time ago
00218         current_time = time.time()
00219         if state != ALIVE or current_time - localData.get('lastChecksumTime',-1) > self.checksum_lifetime:
00220             # ask the backend to create the checksum of the file 
00221             try:
00222                 current_checksum = self.backend.checksum(localData['localID'], localData['checksumType'])
00223                 log.msg(arc.DEBUG, 'self.backend.checksum was called on %(rID)s, the calculated checksum is %(cs)s' % {'rID':referenceID, 'cs':current_checksum})
00224                 self.store.lock()
00225                 try:
00226                     current_local_data = self.store.get(referenceID)
00227                     current_local_data['lastChecksumTime'] = current_time
00228                     current_local_data['lastChecksum'] = current_checksum
00229                     self.store.set(referenceID, current_local_data)
00230                     self.store.unlock()
00231                 except:
00232                     log.msg()  # NOTE(review): presumably logs the current exception when called without arguments - confirm
00233                     self.store.unlock()
00234                     current_checksum = None
00235             except:  # any failure while computing the checksum is treated as having no current checksum
00236                 current_checksum = None
00237         else:
00238             current_checksum = localData.get('lastChecksum', None)
00239         # get the original checksum
00240         checksum = localData['checksum']
00241         #print '-=-', referenceID, state, checksum, current_checksum
00242         if checksum == current_checksum:
00243             # if the original and the current checksum is the same, then the replica is valid
00244             if state in [INVALID, CREATING, STALLED]:
00245                 # if it is currently INVALID or CREATING or STALLED, its state should be changed
00246                 log.msg(arc.VERBOSE, '\nCHECKSUM OK', referenceID)
00247                 self.changeState(referenceID, ALIVE)
00248                 state = ALIVE
00249             return state
00250         else:
00251             # or if the checksum is not the same - we have a corrupt file, or a not-fully-uploaded one
00252             if state == CREATING: 
00253                 # if the file's local state is CREATING, that's OK, the file is still being uploaded
00254                 if self.creating_timeout:
00255                     # but if the file has been in CREATING state for longer than creating_timeout, something probably went wrong: mark it STALLED
00256                     now = time.time()
00257                     if now - float(localData.get('created', now)) > self.creating_timeout:
00258                         self.changeState(referenceID, STALLED)
00259                         return STALLED
00260                 return CREATING
00261             if state in [DELETED, STALLED]:
00262                 # if the file is DELETED or STALLED we don't care if the checksum is wrong
00263                 return state
00264             if state != INVALID:
00265                 # any other state that is not already INVALID (e.g. ALIVE) is changed to INVALID now
00266                 log.msg(arc.VERBOSE, '\nCHECKSUM MISMATCH', referenceID, 'original:', checksum, 'current:', current_checksum)
00267                 self.changeState(referenceID, INVALID)
00268             return INVALID
        

Here is the call graph for this function:

Here is the caller graph for this function:

def storage.shepherd.shepherd.Shepherd._file_arrived (   self,
  referenceID 
) [private]

Definition at line 269 of file shepherd.py.

00269 
00270     def _file_arrived(self, referenceID):
00271         # this is usually called by the backend when a file has arrived (has been fully uploaded)
00272         # call the checksum checker which will change the state to ALIVE if its checksum is OK, but leave it as CREATING if the checksum is wrong
00273         # TODO: either this checking should run in a separate thread, or the backend should call this in a separate thread?
00274         localData = self.store.get(referenceID)
00275         GUID = localData['GUID']
00276         trials = 3 # if arccp hasn't set the right checksum yet, try to wait for it
00277         while localData['checksum'] == '' and trials > 0:
00278             trials = trials - 1
00279             # first sleep 1 sec, then 2, then 3
00280             time.sleep(3 - trials)
00281             # check if the checksum changed in the Librarian
00282             metadata = self.librarian.get([GUID])[GUID]
00283             localData = self._refresh_checksum(referenceID, localData, metadata)
00284         state = self._checking_checksum(referenceID, localData)
00285         # if _checking_checksum hasn't changed the state to ALIVE: the file is corrupt
00286         # except if the checksum is '', then the arccp hasn't set the right checksum yet
00287         if state == CREATING and localData['checksum'] != '':
00288             self.changeState(referenceID, INVALID)

Here is the call graph for this function:

def storage.shepherd.shepherd.Shepherd._refresh_checksum (   self,
  referenceID,
  localData,
  metadata 
) [private]

Definition at line 289 of file shepherd.py.

00289 
00290     def _refresh_checksum(self, referenceID, localData, metadata):  # copy checksum/checksumType from the Librarian metadata into the store if they differ; return the local data to use
00291         checksum = localData['checksum']
00292         checksumType = localData['checksumType']
00293         librarian_checksum = metadata.get(('states','checksum'), checksum)  # falls back to the local value when the metadata has no such entry
00294         librarian_checksumType = metadata.get(('states','checksumType'), checksumType)
00295         if checksum != librarian_checksum or checksumType != librarian_checksumType:
00296             # refresh the checksum
00297             self.store.lock()
00298             current_local_data = self.store.get(referenceID)  # NOTE(review): runs outside the try, so the lock is not released if this raises
00299             try:
00300                 if not current_local_data: # nothing stored for this referenceID: just unlock and return the caller's localData below
00301                     self.store.unlock()
00302                 else:
00303                     current_local_data['checksum'] = librarian_checksum
00304                     current_local_data['checksumType'] = librarian_checksumType
00305                     log.msg(arc.VERBOSE, 'checksum refreshed', current_local_data)
00306                     self.store.set(referenceID, current_local_data)
00307                     self.store.unlock()
00308                     return current_local_data
00309             except:
00310                 log.msg()
00311                 self.store.unlock()
00312                 return current_local_data
00313         return localData

Here is the caller graph for this function:

def storage.shepherd.shepherd.Shepherd.changeState (   self,
  referenceID,
  newState,
  onlyIf = None 
)

Definition at line 475 of file shepherd.py.

00475 
00476     def changeState(self, referenceID, newState, onlyIf = None):
00477         # change the file's local state and add it to the list of changed files
00478         self.store.lock()
00479         try:
00480             localData = self.store.get(referenceID)
00481             if not localData: # this file is already removed
00482                 self.store.unlock()
00483                 return False
00484             oldState = localData['state']
00485             log.msg(arc.VERBOSE, 'changeState', referenceID, oldState, '->', newState)
00486             # if a previous state is given, change only if the current state is the given state
00487             if onlyIf and oldState != onlyIf:
00488                 self.store.unlock()
00489                 return False
00490             localData['state'] = newState
00491             self.store.set(referenceID, localData)
00492             self.store.unlock()
00493             # append it to the list of changed files (these will be reported)
00494             self.changed_states.append(referenceID)  # NOTE(review): no return follows, so a successful change yields None rather than True
00495         except:
00496             log.msg()
00497             self.store.unlock()
00498             return False

Here is the caller graph for this function:

Definition at line 314 of file shepherd.py.

00314 
00315     def checkingThread(self, period):  # loop while the service runs: check every stored replica and adjust its state
00316         # first just wait a few seconds
00317         time.sleep(10)
00318         while self.service_is_running:
00319             # do this forever
00320             try:
00321                 # get the referenceIDs of all the stored files
00322                 referenceIDs = self.store.list()
00323                 # count them
00324                 number = len(referenceIDs)
00325                 alive_GUIDs = []
00326                 # if there are stored files at all
00327                 if number > 0:
00328                     # we should check all the files periodically, with a given period, which determines the checking interval for one file
00329                     interval = period / number
00330                     # but we don't want to run constantly, after a file is checked we should wait at least a specified amount of time
00331                     if interval < self.min_interval:
00332                         interval = self.min_interval
00333                     log.msg(arc.VERBOSE,'\n', self.serviceID, 'is checking', number, 'files with interval', interval)
00334                     # randomize the list of files to be checked
00335                     random.shuffle(referenceIDs)
00336                     # start checking the first one
00337                     for referenceID in referenceIDs:
00338                         try:
00339                             localData = self.store.get(referenceID)
00340                             #print localData
00341                             GUID, localID = localData['GUID'], localData['localID']
00342                             # first we get the file's metadata from the librarian
00343                             metadata = self.librarian.get([GUID])[GUID]
00344                             # check if the checksum changed in the Librarian
00345                             localData = self._refresh_checksum(referenceID, localData, metadata)
00346                             # check the real checksum of the file: if the checksum is OK or not, it changes the state of the replica as well
00347                             # and it returns the state and the GUID and the localID of the file
00348                             #   if _checking_checksum changed the state then the new state is returned here:
00349                             state = self._checking_checksum(referenceID, localData)
00350                             # now the file's state is according to its checksum
00351                             # checksum takes time, so refreshing metadata...
00352                             metadata = self.librarian.get([GUID])[GUID]
00353                             # if it is CREATING or ALIVE:
00354                             if state == CREATING or state == ALIVE:
00355                                 # if this metadata is not a valid file then the file must be already removed
00356                                 if metadata.get(('entry', 'type'), '') != 'file':
00357                                     # it seems this is not a real file anymore
00358                                     # we should remove it
00359                                     bsuccess = self.backend.remove(localID)
00360                                     self.store.set(referenceID, None)  # NOTE(review): control still falls through to the ALIVE handling below after this removal
00361                                 # if the file is ALIVE (which means it is not CREATING or DELETED)
00362                                 if state == ALIVE:
00363                                     if GUID in alive_GUIDs:
00364                                         # this means that we already have another alive replica of this file
00365                                         log.msg(arc.VERBOSE, '\n\nFile', GUID, 'has more than one replicas on this storage element.')
00366                                         self.changeState(referenceID, DELETED)
00367                                     else:    
00368                                         # check the number of needed replicas
00369                                         needed_replicas = int(metadata.get(('states','neededReplicas'),1))
00370                                         #print metadata.items()
00371                                         # find myself among the locations
00372                                         mylocation = serialize_ids([self.serviceID, referenceID])
00373                                         myself = [v for (s, p), v in metadata.items() if s == 'locations' and p == mylocation]
00374                                         #print myself
00375                                         if not myself or myself[0] != ALIVE:
00376                                             # if the state of this replica is not proper in the Librarian, fix it
00377                                             metadata[('locations', mylocation)] = ALIVE
00378                                             self.changeState(referenceID, ALIVE)
00379                                         # get the number of shepherds with alive (or creating) replicas
00380                                         alive_replicas = len(dict([(property.split(' ')[0], value) 
00381                                                                    for (section, property), value in metadata.items()
00382                                                                    if section == 'locations' and value in [ALIVE, CREATING]]))
00383                                         if alive_replicas < needed_replicas:
00384                                             # if the file has fewer replicas than needed
00385                                             log.msg(arc.VERBOSE, '\n\nFile', GUID, 'has fewer replicas than needed.')
00386                                             # we offer our copy to replication
00387                                             try:
00388                                                 response = self.bartender.addReplica({'checkingThread' : GUID}, common_supported_protocols)
00389                                                 success, turl, protocol = response['checkingThread']
00390                                             except:
00391                                                 success = ''
00392                                             #print 'addReplica response', success, turl, protocol
00393                                             if success == 'done':
00394                                                 # if it's OK, we ask the backend to upload our copy to the TURL we got from the bartender
00395                                                 self.backend.copyTo(localID, turl, protocol)
00396                                                 # TODO: this should be done in some other thread
00397                                             else:
00398                                                 log.msg(arc.VERBOSE, 'checkingThread error, bartender responded', success)
00399                                             # so this GUID has an alive replica here
00400                                             alive_GUIDs.append(GUID)
00401                                         elif alive_replicas > needed_replicas:
00402                                             log.msg(arc.VERBOSE, '\n\nFile', GUID, 'has %d more replicas than needed.' % (alive_replicas-needed_replicas))
00403                                             thirdwheels = len([property for (section, property), value in metadata.items()
00404                                                                if section == 'locations' and value == THIRDWHEEL])
00405                                             if thirdwheels == 0:
00406                                                 self.changeState(referenceID, THIRDWHEEL)
00407                                         else:
00408                                             # so this GUID has an alive replica here
00409                                             alive_GUIDs.append(GUID)
00410                             # or if this replica is not needed
00411                             elif state == THIRDWHEEL:
00412                                 # check the number of needed replicas
00413                                 needed_replicas = int(metadata.get(('states','neededReplicas'),1))
00414                                 # and the number of alive replicas
00415                                 alive_replicas = len([property for (section, property), value in metadata.items()
00416                                                           if section == 'locations' and value == ALIVE])
00417                                 # get the number of THIRDWHEELS not on this Shepherd (self.serviceID)
00418                                 thirdwheels = len([property for (section, property), value in metadata.items()
00419                                                    if section == 'locations' and value == THIRDWHEEL and not property.startswith(self.serviceID)])
00420                                 my_replicas = len([property for (section, property), value in metadata.items()
00421                                                    if section == 'locations' and value == ALIVE 
00422                                                    and property.startswith(self.serviceID)])
00423                                 # if this shepherd has other alive replicas, or no one else has a thirdwheel replica
00424                                 # and the file still has enough replicas, we delete this replica
00425                                 if my_replicas != 0 or (thirdwheels == 0 and alive_replicas >= needed_replicas):
00426                                     #bsuccess = self.backend.remove(localID)
00427                                     #self.store.set(referenceID, None)
00428                                     self.changeState(referenceID, DELETED)
00429                                 # else we sheepishly set the state back to ALIVE
00430                                 else:
00431                                     self.changeState(referenceID, ALIVE)
00432                                     state = ALIVE
00433                             # or if this replica is INVALID
00434                             elif state == INVALID:
00435                                 log.msg(arc.VERBOSE, '\n\nI have an invalid replica of file', GUID, '- now I remove it.')
00436                                 self.changeState(referenceID, DELETED)      
00437                                 #
00438                                 # # disabling pull-method of self-healing # #
00439                                 #                          
00440                                 #my_replicas = len([property for (section, property), value in metadata.items()
00441                                 #                   if section == 'locations' and value in [ALIVE,CREATING] 
00442                                 #                   and property.startswith(self.serviceID)])
00443                                 # we try to get a valid one by simply downloading this file
00444                                 #try:
00445                                 #    response = self.bartender.getFile({'checkingThread' : (GUID, common_supported_protocols)})
00446                                 #    success, turl, protocol = response['checkingThread']
00447                                 #except:
00448                                 #    success = traceback.format_exc()
00449                                 #if success == 'done':
00450                                 #    # if it's OK, then we change the state of our replica to CREATING
00451                                 #    self.changeState(referenceID, CREATING)
00452                                 #    # then asks the backend to get the file from the TURL we got from the bartender
00453                                 #    self.backend.copyFrom(localID, turl, protocol)
00454                                 #    # and after this copying is done, we indicate that it's arrived
00455                                 #    self._file_arrived(referenceID)
00456                                 #    # TODO: this should be done in some other thread
00457                                 #else:
00458                                 #    log.msg(arc.VERBOSE, 'checkingThread error, bartender responded', success)
00459                             elif state == OFFLINE:
00460                                 # online now
00461                                 state = ALIVE
00462                                 self.changeState(referenceID, ALIVE)
00463                             if state == DELETED:
00464                                 # remove replica if marked it as deleted
00465                                 bsuccess = self.backend.remove(localID)
00466                                 self.store.set(referenceID, None)
00467                         except:
00468                             log.msg(arc.VERBOSE, 'ERROR checking checksum of %(rID)s, reason: %(r)s' % {'rID':referenceID, 'r':traceback.format_exc()})
00469                         # sleep for interval +/- 0.5*interval seconds to avoid race condition
00470                         time.sleep(interval+((random.random()-0.5)*interval))
00471                 else:
00472                     time.sleep(period)
00473             except:
00474                 log.msg()

Here is the call graph for this function:

def storage.shepherd.shepherd.Shepherd.delete (   self,
  request 
)

Definition at line 602 of file shepherd.py.

00602 
00603     def delete(self,request):  # mark each requested referenceID as DELETED; returns {requestID: 'deleted' or 'nosuchfile'}
00604         response = {}
00605         for requestID, referenceID in request.items():
00606             localData = self.store.get(referenceID)  # NOTE(review): localData is not used below
00607             try:
00608                 # note that actual deletion is done in self.reportingThread
00609                 self.changeState(referenceID, DELETED)  # NOTE(review): changeState signals a missing file by returning False, which is ignored here
00610                 response[requestID] = 'deleted'
00611             except:
00612                 response[requestID] = 'nosuchfile'
00613         return response
00614 

Here is the call graph for this function:

def storage.shepherd.shepherd.Shepherd.get (   self,
  request 
)

Definition at line 499 of file shepherd.py.

00499 
00500     def get(self, request):  # answer each request with TURL/protocol/checksum data for an ALIVE replica, or with an ('error', ...) entry
00501         response = {}
00502         for requestID, getRequestData in request.items():
00503             log.msg(arc.VERBOSE, '\n\n', getRequestData)
00504             referenceID = dict(getRequestData)['referenceID']
00505             protocols = [value for property, value in getRequestData if property == 'protocol']
00506             #print 'Shepherd.get:', referenceID, protocols
00507             localData = self.store.get(referenceID)
00508             #print 'localData:', localData
00509             if localData.get('state', INVALID) == ALIVE:
00510                 if localData.has_key('localID'):
00511                     localID = localData['localID']
00512                     checksum = localData['checksum']  # NOTE(review): unused; the response below reads localData['checksum'] directly
00513                     checksumType = localData['checksumType']  # NOTE(review): unused, same as checksum above
00514                     protocol_match = self.backend.matchProtocols(protocols)
00515                     if protocol_match:
00516                         protocol = protocol_match[0]  # only the first matching protocol is used
00517                         try:
00518                             turl = self.backend.prepareToGet(referenceID, localID, protocol)
00519                             if turl:
00520                                 response[requestID] = [('TURL', turl), ('protocol', protocol),
00521                                     ('checksum', localData['checksum']), ('checksumType', localData['checksumType'])]
00522                             else:
00523                                 response[requestID] = [('error', 'internal error (empty TURL)')]
00524                         except:
00525                             log.msg()
00526                             response[requestID] = [('error', 'internal error (prepareToGet exception)')]
00527                     else:
00528                         response[requestID] = [('error', 'no supported protocol found')]
00529                 else:
00530                     response[requestID] = [('error', 'no such referenceID')]
00531             else:
00532                 response[requestID] = [('error', 'file is not alive')]
00533         return response

Definition at line 101 of file shepherd.py.

00101 
00102     def getSpaceInformation(self):  # return (free_size, used_size, total_size); used_size is the sum of the stored 'size' values
00103         free_size = self.backend.getAvailableSpace()
00104         used_size = 0
00105         referenceIDs = self.store.list()
00106         for referenceID in referenceIDs:
00107             try:
00108                 localData = self.store.get(referenceID)
00109                 size = int(localData['size'])
00110                 used_size += size
00111             except:  # entries whose 'size' cannot be read or converted are left out of the sum
00112                 pass
00113         total_size = free_size + used_size
00114         return free_size, used_size, total_size
    

Definition at line 137 of file shepherd.py.

00137 
00138     def isisBartenderThread(self, isis_urls):  # loop while the service runs: ask the ISISes for Bartender URLs and replace self.bartender
00139         while self.service_is_running:
00140             try:
00141                 if self.bartender.urls:
00142                     time.sleep(30)  # Bartender URLs are already known: poll less often
00143                 else:
00144                     time.sleep(3)
00145                 log.msg(arc.INFO,'Getting Bartenders from ISISes')
00146                 for isis_url in isis_urls:
00147                     if not self.service_is_running:
00148                         return
00149                     log.msg(arc.INFO,'Trying to get Bartender from', isis_url)
00150                     isis = ISISClient(isis_url, ssl_config = self.ssl_config)        
00151                     bartender_urls = isis.getServiceURLs(bartender_servicetype)
00152                     log.msg(arc.INFO, 'Got Bartender from ISIS:', bartender_urls)
00153                     if bartender_urls:
00154                         self.bartender = BartenderClient(bartender_urls, ssl_config = self.ssl_config)
00155                         break  # the first ISIS that returns URLs wins for this round
00156             except Exception, e:
00157                 log.msg(arc.WARNING, 'Error in isisBartenderThread: %s' % e)
00158                 

Definition at line 115 of file shepherd.py.

00115 
00116     def isisLibrarianThread(self, isis_urls):  # loop while the service runs: ask the ISISes for Librarian URLs and replace self.librarian
00117         while self.service_is_running:
00118             try:
00119                 if self.librarian.urls:
00120                     time.sleep(30)  # Librarian URLs are already known: poll less often
00121                 else:
00122                     time.sleep(3)
00123                 log.msg(arc.INFO,'Getting Librarians from ISISes')
00124                 for isis_url in isis_urls:
00125                     if not self.service_is_running:
00126                         return
00127                     log.msg(arc.INFO,'Trying to get Librarian from', isis_url)
00128                     isis = ISISClient(isis_url, ssl_config = self.ssl_config)        
00129                     librarian_urls = isis.getServiceURLs(librarian_servicetype)
00130                     log.msg(arc.INFO, 'Got Librarian from ISIS:', librarian_urls)
00131                     if librarian_urls:
00132                         self.librarian = LibrarianClient(librarian_urls, ssl_config = self.ssl_config)
00133                         break  # the first ISIS that returns URLs wins for this round
00134             except Exception, e:
00135                 log.msg(arc.WARNING, 'Error in isisLibrarianThread: %s' % e)
00136                 
    
def storage.shepherd.shepherd.Shepherd.put (   self,
  request 
)

Definition at line 534 of file shepherd.py.

00534 
00535     def put(self, request):
00536         #print request
00537         response = {}
00538         for requestID, putRequestData in request.items():
00539             protocols = [value for property, value in putRequestData if property == 'protocol']
00540             protocol_match = self.backend.matchProtocols(protocols)
00541             if protocol_match:
00542                 # just the first protocol
00543                 protocol = protocol_match[0]
00544                 acl = [value for property, value in putRequestData if property == 'acl']
00545                 # create a dictionary from the putRequestData which contains e.g. 'size', 'GUID', 'checksum', 'checksumType'
00546                 requestData = dict(putRequestData)
00547                 size = int(requestData.get('size'))
00548                 # ask the backend if there is enough space 
00549                 availableSpace = self.backend.getAvailableSpace()
00550                 if availableSpace and availableSpace < size:
00551                     response[requestID] = [('error', 'not enough space')]
00552                 else:
00553                     GUID = requestData.get('GUID', None)
00554                     already_have_this_file = False
00555                     if GUID:
00556                         referenceIDs = self.store.list()
00557                         for referenceID in referenceIDs:
00558                             try:
00559                                 localData = self.store.get(referenceID)
00560                                 if localData['GUID'] == GUID and localData['state'] == ALIVE:
00561                                     already_have_this_file = True;
00562                                     break
00563                             except:
00564                                 log.msg()
00565                                 pass
00566                     if already_have_this_file:
00567                         response[requestID] = [('error', 'already have this file')]
00568                     else:
00569                         # create a new referenceIDs
00570                         referenceID = arc.UUID()
00571                         # ask the backend to create a local ID
00572                         localID = self.backend.generateLocalID()
00573                         # create the local data of the new file
00574                         file_data = {'localID' : localID,
00575                             'GUID' : requestData.get('GUID', None),
00576                             'checksum' : requestData.get('checksum', None),
00577                             'checksumType' : requestData.get('checksumType', None),
00578                             'lastChecksumTime' : -1,
00579                             'lastChecksum' : '',
00580                             'size' : size,
00581                             'acl': acl,
00582                             'created': str(time.time()),
00583                             'state' : CREATING} # first it has the state: CREATING
00584                         try:
00585                             # ask the backend to initiate the transfer
00586                             turl = self.backend.prepareToPut(referenceID, localID, protocol)
00587                             if turl:
00588                                 # add the returnable data to the response dict
00589                                 response[requestID] = [('TURL', turl), ('protocol', protocol), ('referenceID', referenceID)]
00590                                 # store the local data
00591                                 self.store.set(referenceID, file_data)
00592                                 # indicate that this file is 'changed': it should be reported in the next reporting cycle (in reportingThread)
00593                                 self.changed_states.append(referenceID)
00594                             else:
00595                                 response[requestID] = [('error', 'internal error (empty TURL)')]
00596                         except Exception, e:
00597                             log.msg()
00598                             response[requestID] = [('error', 'internal error (prepareToPut exception: %s)' % e)]
00599             else:
00600                 response[requestID] = [('error', 'no supported protocol found')]
00601         return response

Definition at line 159 of file shepherd.py.

00159 
00160     def reportingThread(self):
00161         # at the first start just wait for a few seconds
00162         time.sleep(5)
00163         while self.service_is_running:
00164             # do this forever
00165             try:
00166                 # if reporting is on
00167                 if self.doReporting:
00168                     # this list will hold the list of changed files we want to report
00169                     filelist = []
00170                     # when the state of a file changed somewhere the file is appended to the global 'changed_states' list
00171                     # so this list contains the files which state is changed between the last and the current cycle
00172                     while len(self.changed_states) > 0: # while there is changed file
00173                         # get the first one. the changed_states list contains referenceIDs
00174                         changed = self.changed_states.pop()
00175                         # get its local data (GUID, size, state, etc.)
00176                         localData = self.store.get(changed)
00177                         if not localData.has_key('GUID'):
00178                             log.msg(arc.VERBOSE, 'Error in shepherd.reportingThread()\n\treferenceID is in changed_states, but not in store')
00179                         else:
00180                             # add to the filelist the GUID, the referenceID and the state of the file
00181                             filelist.append((localData.get('GUID'), changed, localData.get('state')))
00182                     #print 'reporting', self.serviceID, filelist
00183                     # call the report method of the librarian with the collected filelist and with our serviceID
00184                     try:
00185                         next_report = self.librarian.report(self.serviceID, filelist)
00186                     except:
00187                         log.msg(arc.VERBOSE, 'Error sending report message to the Librarian, reason:', traceback.format_exc())
00188                         # if next_report is below zero, then we will send everything again
00189                         next_report = -1
00190                     # we should get the time of the next report
00191                     # if we don't get any time, do the next report 10 seconds later
00192                     if not next_report:
00193                         next_report = 10
00194                     last_report = time.time()
00195                     # if the next report time is below zero it means:
00196                     if next_report < 0: # 'please send all'
00197                         log.msg(arc.VERBOSE, 'reporting - asked to send all file data again')
00198                         # add the full list of stored files to the changed_state list - all the files will be reported next time (which is immediately, see below)
00199                         self.changed_states.extend(self.store.list())
00200                     # let's wait until there is any changed file or the reporting time is up - we need to do report even if no file changed (as a heartbeat)
00201                     time.sleep(10)
00202                     while len(self.changed_states) == 0 and last_report + next_report * 0.5 > time.time():
00203                         time.sleep(10)
00204                 else:
00205                     time.sleep(10)
00206             except:
00207                 log.msg()
00208                 time.sleep(10)
        
def storage.shepherd.shepherd.Shepherd.stat (   self,
  request 
)

Definition at line 615 of file shepherd.py.

00615 
00616     def stat(self, request):
00617         properties = ['state', 'checksumType', 'checksum', 'acl', 'size', 'GUID', 'localID']
00618         response = {}
00619         for requestID, referenceID in request.items():
00620             localData = self.store.get(referenceID)
00621             response[requestID] = [referenceID]
00622             for p in properties:
00623                 response[requestID].append(localData.get(p, None))
00624         return response

def storage.shepherd.shepherd.Shepherd.toggleReport (   self,
  doReporting 
)

Definition at line 209 of file shepherd.py.

00209 
00210     def toggleReport(self, doReporting):
00211         self.doReporting = doReporting
00212         return str(self.doReporting)
    

Member Data Documentation

Definition at line 32 of file shepherd.py.

Definition at line 81 of file shepherd.py.

Definition at line 92 of file shepherd.py.

Definition at line 64 of file shepherd.py.

Definition at line 59 of file shepherd.py.

Definition at line 94 of file shepherd.py.

Definition at line 69 of file shepherd.py.

Definition at line 53 of file shepherd.py.

Definition at line 52 of file shepherd.py.

Definition at line 26 of file shepherd.py.

Definition at line 46 of file shepherd.py.

Definition at line 27 of file shepherd.py.

Definition at line 40 of file shepherd.py.


The documentation for this class was generated from the following file: