Back to index

nordugrid-arc-nox  1.1.0~rc6
shepherd.py
Go to the documentation of this file.
00001 import urlparse
00002 import httplib
00003 import time
00004 import traceback
00005 import threading
00006 import random
00007 
00008 import arc
00009 from arcom import get_child_nodes, get_child_values_by_name
00010 from arcom import import_class_from_string
00011 from arcom.security import parse_ssl_config
00012 from arcom.service import shepherd_uri, librarian_servicetype, bartender_servicetype, true, parse_node, create_response, shepherd_servicetype
00013 
00014 from arcom.xmltree import XMLTree
00015 from storage.common import common_supported_protocols, serialize_ids
00016 from storage.client import LibrarianClient, BartenderClient, ISISClient
00017 
00018 from arcom.logger import Logger
00019 log = Logger(arc.Logger(arc.Logger_getRootLogger(), 'Storage.Shepherd'))
00020 
00021 from storage.common import ALIVE, CREATING, STALLED, INVALID, DELETED, THIRDWHEEL, OFFLINE
00022 
00023 class Shepherd:
00024 
00025     def __init__(self, cfg):
00026         self.service_is_running = True
00027         self.ssl_config = parse_ssl_config(cfg)
00028         
00029         try:
00030             backendclass = str(cfg.Get('BackendClass'))
00031             backendcfg = cfg.Get('BackendCfg')
00032             self.backend = import_class_from_string(backendclass)(backendcfg, shepherd_uri, self._file_arrived, self.ssl_config)
00033         except Exception, e:
00034             log.msg(arc.ERROR, 'Cannot import backend class %(c)s (reason: %(r)s)' % {'c':backendclass, 'r':e})
00035             raise
00036         
00037         try:
00038             storeclass = str(cfg.Get('StoreClass'))
00039             storecfg = cfg.Get('StoreCfg')
00040             self.store = import_class_from_string(storeclass)(storecfg)
00041         except:
00042             log.msg(arc.ERROR, 'Cannot import store class', storeclass)
00043             raise
00044             
00045         try:
00046             self.serviceID = str(cfg.Get('ServiceID'))
00047         except:
00048             log.msg(arc.ERROR, 'Cannot get serviceID')
00049             raise
00050             
00051         try:
00052             self.period = float(str(cfg.Get('CheckPeriod')))
00053             self.min_interval = float(str(cfg.Get('MinCheckInterval')))
00054         except:
00055             log.msg(arc.ERROR, 'Cannot set CheckPeriod, MinCheckInterval')
00056             raise
00057                     
00058         try:
00059             self.creating_timeout = float(str(cfg.Get('CreatingTimeout')))
00060         except:
00061             self.creating_timeout = 0
00062             
00063         try:
00064             self.checksum_lifetime = float(str(cfg.Get('ChecksumLifetime')))
00065         except:
00066             self.checksum_lifetime = 3600
00067 
00068         librarian_urls =  get_child_values_by_name(cfg, 'LibrarianURL')
00069         self.librarian = LibrarianClient(librarian_urls, ssl_config = self.ssl_config)
00070         if librarian_urls:
00071             log.msg(arc.INFO,'Got Librarian URLs from the config:', librarian_urls)
00072         else:
00073             isis_urls = get_child_values_by_name(cfg, 'ISISURL')
00074             if not isis_urls:
00075                 log.msg(arc.ERROR, "No Librarian URLs and no ISIS URLs found in the configuration: no self-healing!")
00076             else:
00077                 log.msg(arc.INFO,'Got ISIS URL, starting isisLibrarianThread')
00078                 threading.Thread(target = self.isisLibrarianThread, args = [isis_urls]).start()
00079 
00080         bartender_urls =  get_child_values_by_name(cfg, 'BartenderURL')
00081         self.bartender = BartenderClient(bartender_urls, ssl_config = self.ssl_config)
00082         if bartender_urls:
00083             log.msg(arc.INFO,'Got Bartender URLs from the config:', bartender_urls)
00084         else:
00085             isis_urls = get_child_values_by_name(cfg, 'ISISURL')
00086             if not isis_urls:
00087                 log.msg(arc.ERROR, "No Bartender URLs and no ISIS URLs found in the configuration: no self-healing!")
00088             else:
00089                 log.msg(arc.INFO,'Got ISIS URL, starting isisBartenderThread')
00090                 threading.Thread(target = self.isisBartenderThread, args = [isis_urls]).start()
00091 
00092         self.changed_states = self.store.list()
00093         threading.Thread(target = self.checkingThread, args = [self.period]).start()
00094         self.doReporting = True
00095         threading.Thread(target = self.reportingThread, args = []).start()
00096 
00097     def __del__(self):
00098         print "Shepherd's destructor called"
00099         self.service_is_running = False
00100         
00101     def getSpaceInformation(self):
00102         free_size = self.backend.getAvailableSpace()
00103         used_size = 0
00104         referenceIDs = self.store.list()
00105         for referenceID in referenceIDs:
00106             try:
00107                 localData = self.store.get(referenceID)
00108                 size = int(localData['size'])
00109                 used_size += size
00110             except:
00111                 pass
00112         total_size = free_size + used_size
00113         return free_size, used_size, total_size
00114     
00115     def isisLibrarianThread(self, isis_urls):
00116         while self.service_is_running:
00117             try:
00118                 if self.librarian.urls:
00119                     time.sleep(30)
00120                 else:
00121                     time.sleep(3)
00122                 log.msg(arc.INFO,'Getting Librarians from ISISes')
00123                 for isis_url in isis_urls:
00124                     if not self.service_is_running:
00125                         return
00126                     log.msg(arc.INFO,'Trying to get Librarian from', isis_url)
00127                     isis = ISISClient(isis_url, ssl_config = self.ssl_config)        
00128                     librarian_urls = isis.getServiceURLs(librarian_servicetype)
00129                     log.msg(arc.INFO, 'Got Librarian from ISIS:', librarian_urls)
00130                     if librarian_urls:
00131                         self.librarian = LibrarianClient(librarian_urls, ssl_config = self.ssl_config)
00132                         break
00133             except Exception, e:
00134                 log.msg(arc.WARNING, 'Error in isisLibrarianThread: %s' % e)
00135                 
00136     
00137     def isisBartenderThread(self, isis_urls):
00138         while self.service_is_running:
00139             try:
00140                 if self.bartender.urls:
00141                     time.sleep(30)
00142                 else:
00143                     time.sleep(3)
00144                 log.msg(arc.INFO,'Getting Bartenders from ISISes')
00145                 for isis_url in isis_urls:
00146                     if not self.service_is_running:
00147                         return
00148                     log.msg(arc.INFO,'Trying to get Bartender from', isis_url)
00149                     isis = ISISClient(isis_url, ssl_config = self.ssl_config)        
00150                     bartender_urls = isis.getServiceURLs(bartender_servicetype)
00151                     log.msg(arc.INFO, 'Got Bartender from ISIS:', bartender_urls)
00152                     if bartender_urls:
00153                         self.bartender = BartenderClient(bartender_urls, ssl_config = self.ssl_config)
00154                         break
00155             except Exception, e:
00156                 log.msg(arc.WARNING, 'Error in isisBartenderThread: %s' % e)
00157                 
00158 
00159     def reportingThread(self):
00160         # at the first start just wait for a few seconds
00161         time.sleep(5)
00162         while self.service_is_running:
00163             # do this forever
00164             try:
00165                 # if reporting is on
00166                 if self.doReporting:
00167                     # this list will hold the list of changed files we want to report
00168                     filelist = []
00169                     # when the state of a file changed somewhere the file is appended to the global 'changed_states' list
00170                     # so this list contains the files which state is changed between the last and the current cycle
00171                     while len(self.changed_states) > 0: # while there is changed file
00172                         # get the first one. the changed_states list contains referenceIDs
00173                         changed = self.changed_states.pop()
00174                         # get its local data (GUID, size, state, etc.)
00175                         localData = self.store.get(changed)
00176                         if not localData.has_key('GUID'):
00177                             log.msg(arc.VERBOSE, 'Error in shepherd.reportingThread()\n\treferenceID is in changed_states, but not in store')
00178                         else:
00179                             # add to the filelist the GUID, the referenceID and the state of the file
00180                             filelist.append((localData.get('GUID'), changed, localData.get('state')))
00181                     #print 'reporting', self.serviceID, filelist
00182                     # call the report method of the librarian with the collected filelist and with our serviceID
00183                     try:
00184                         next_report = self.librarian.report(self.serviceID, filelist)
00185                     except:
00186                         log.msg(arc.VERBOSE, 'Error sending report message to the Librarian, reason:', traceback.format_exc())
00187                         # if next_report is below zero, then we will send everything again
00188                         next_report = -1
00189                     # we should get the time of the next report
00190                     # if we don't get any time, do the next report 10 seconds later
00191                     if not next_report:
00192                         next_report = 10
00193                     last_report = time.time()
00194                     # if the next report time is below zero it means:
00195                     if next_report < 0: # 'please send all'
00196                         log.msg(arc.VERBOSE, 'reporting - asked to send all file data again')
00197                         # add the full list of stored files to the changed_state list - all the files will be reported next time (which is immediately, see below)
00198                         self.changed_states.extend(self.store.list())
00199                     # let's wait until there is any changed file or the reporting time is up - we need to do report even if no file changed (as a heartbeat)
00200                     time.sleep(10)
00201                     while len(self.changed_states) == 0 and last_report + next_report * 0.5 > time.time():
00202                         time.sleep(10)
00203                 else:
00204                     time.sleep(10)
00205             except:
00206                 log.msg()
00207                 time.sleep(10)
00208         
00209     def toggleReport(self, doReporting):
00210         self.doReporting = doReporting
00211         return str(self.doReporting)
00212     
00213     def _checking_checksum(self, referenceID, localData):
00214         # get the current state (which is stored locally) or an empty string if it somehow has no state
00215         state = localData.get('state','')
00216         # hack: if the file's state is ALIVE then only check the checksum if the last check was a long time ago
00217         current_time = time.time()
00218         if state != ALIVE or current_time - localData.get('lastChecksumTime',-1) > self.checksum_lifetime:
00219             # ask the backend to create the checksum of the file 
00220             try:
00221                 current_checksum = self.backend.checksum(localData['localID'], localData['checksumType'])
00222                 log.msg(arc.DEBUG, 'self.backend.checksum was called on %(rID)s, the calculated checksum is %(cs)s' % {'rID':referenceID, 'cs':current_checksum})
00223                 self.store.lock()
00224                 try:
00225                     current_local_data = self.store.get(referenceID)
00226                     current_local_data['lastChecksumTime'] = current_time
00227                     current_local_data['lastChecksum'] = current_checksum
00228                     self.store.set(referenceID, current_local_data)
00229                     self.store.unlock()
00230                 except:
00231                     log.msg()
00232                     self.store.unlock()
00233                     current_checksum = None
00234             except:
00235                 current_checksum = None
00236         else:
00237             current_checksum = localData.get('lastChecksum', None)
00238         # get the original checksum
00239         checksum = localData['checksum']
00240         #print '-=-', referenceID, state, checksum, current_checksum
00241         if checksum == current_checksum:
00242             # if the original and the current checksum is the same, then the replica is valid
00243             if state in [INVALID, CREATING, STALLED]:
00244                 # if it is currently INVALID or CREATING or STALLED, its state should be changed
00245                 log.msg(arc.VERBOSE, '\nCHECKSUM OK', referenceID)
00246                 self.changeState(referenceID, ALIVE)
00247                 state = ALIVE
00248             return state
00249         else:
00250             # or if the checksum is not the same - we have a corrupt file, or a not-fully-uploaded one
00251             if state == CREATING: 
00252                 # if the file's local state is CREATING, that's OK, the file is still being uploaded
00253                 if self.creating_timeout:
00254                     # but if the file is in CREATED state for a long time, then maybe something was wrong, we should change its state
00255                     now = time.time()
00256                     if now - float(localData.get('created', now)) > self.creating_timeout:
00257                         self.changeState(referenceID, STALLED)
00258                         return STALLED
00259                 return CREATING
00260             if state in [DELETED, STALLED]:
00261                 # if the file is DELETED or STALLED we don't care if the checksum is wrong
00262                 return state
00263             if state != INVALID:
00264                 # but if it is not CREATING, not DELETED or STALLED and not INVALID - then it was ALIVE: now its state should be changed to INVALID
00265                 log.msg(arc.VERBOSE, '\nCHECKSUM MISMATCH', referenceID, 'original:', checksum, 'current:', current_checksum)
00266                 self.changeState(referenceID, INVALID)
00267             return INVALID
00268         
00269     def _file_arrived(self, referenceID):
00270         # this is called usually by the backend when a file arrived (gets fully uploaded)
00271         # call the checksum checker which will change the state to ALIVE if its checksum is OK, but leave it as CREATING if the checksum is wrong
00272         # TODO: either this checking should be in seperate thread, or the backend's should call this in a seperate thread?
00273         localData = self.store.get(referenceID)
00274         GUID = localData['GUID']
00275         trials = 3 # if arccp hasn't set the right checksum yet, try to wait for it
00276         while localData['checksum'] == '' and trials > 0:
00277             trials = trials - 1
00278             # first sleep 1 sec, then 2, then 3
00279             time.sleep(3 - trials)
00280             # check if the cheksum changed in the Librarian
00281             metadata = self.librarian.get([GUID])[GUID]
00282             localData = self._refresh_checksum(referenceID, localData, metadata)
00283         state = self._checking_checksum(referenceID, localData)
00284         # if _checking_checksum haven't change the state to ALIVE: the file is corrupt
00285         # except if the checksum is '', then the arccp hasn't set the right checksum yet
00286         if state == CREATING and localData['checksum'] != '':
00287             self.changeState(referenceID, INVALID)
00288 
00289     def _refresh_checksum(self, referenceID, localData, metadata):
00290         checksum = localData['checksum']
00291         checksumType = localData['checksumType']
00292         librarian_checksum = metadata.get(('states','checksum'), checksum)
00293         librarian_checksumType = metadata.get(('states','checksumType'), checksumType)
00294         if checksum != librarian_checksum or checksumType != librarian_checksumType:
00295             # refresh the checksum
00296             self.store.lock()
00297             current_local_data = self.store.get(referenceID)
00298             try:
00299                 if not current_local_data: # what?
00300                     self.store.unlock()
00301                 else:
00302                     current_local_data['checksum'] = librarian_checksum
00303                     current_local_data['checksumType'] = librarian_checksumType
00304                     log.msg(arc.VERBOSE, 'checksum refreshed', current_local_data)
00305                     self.store.set(referenceID, current_local_data)
00306                     self.store.unlock()
00307                     return current_local_data
00308             except:
00309                 log.msg()
00310                 self.store.unlock()
00311                 return current_local_data
00312         return localData
00313 
00314     def checkingThread(self, period):
00315         # first just wait a few seconds
00316         time.sleep(10)
00317         while self.service_is_running:
00318             # do this forever
00319             try:
00320                 # get the referenceIDs of all the stored files
00321                 referenceIDs = self.store.list()
00322                 # count them
00323                 number = len(referenceIDs)
00324                 alive_GUIDs = []
00325                 # if there are stored files at all
00326                 if number > 0:
00327                     # we should check all the files periodically, with a given period, which determines the checking interval for one file
00328                     interval = period / number
00329                     # but we don't want to run constantly, after a file is checked we should wait at least a specified amount of time
00330                     if interval < self.min_interval:
00331                         interval = self.min_interval
00332                     log.msg(arc.VERBOSE,'\n', self.serviceID, 'is checking', number, 'files with interval', interval)
00333                     # randomize the list of files to be checked
00334                     random.shuffle(referenceIDs)
00335                     # start checking the first one
00336                     for referenceID in referenceIDs:
00337                         try:
00338                             localData = self.store.get(referenceID)
00339                             #print localData
00340                             GUID, localID = localData['GUID'], localData['localID']
00341                             # first we get the file's metadata from the librarian
00342                             metadata = self.librarian.get([GUID])[GUID]
00343                             # check if the cheksum changed in the Librarian
00344                             localData = self._refresh_checksum(referenceID, localData, metadata)
00345                             # check the real checksum of the file: if the checksum is OK or not, it changes the state of the replica as well
00346                             # and it returns the state and the GUID and the localID of the file
00347                             #   if _checking_checksum changed the state then the new state is returned here:
00348                             state = self._checking_checksum(referenceID, localData)
00349                             # now the file's state is according to its checksum
00350                             # checksum takes time, so refreshing metadata...
00351                             metadata = self.librarian.get([GUID])[GUID]
00352                             # if it is CREATING or ALIVE:
00353                             if state == CREATING or state == ALIVE:
00354                                 # if this metadata is not a valid file then the file must be already removed
00355                                 if metadata.get(('entry', 'type'), '') != 'file':
00356                                     # it seems this is not a real file anymore
00357                                     # we should remove it
00358                                     bsuccess = self.backend.remove(localID)
00359                                     self.store.set(referenceID, None)
00360                                 # if the file is ALIVE (which means it is not CREATING or DELETED)
00361                                 if state == ALIVE:
00362                                     if GUID in alive_GUIDs:
00363                                         # this means that we already have an other alive replica of this file
00364                                         log.msg(arc.VERBOSE, '\n\nFile', GUID, 'has more than one replicas on this storage element.')
00365                                         self.changeState(referenceID, DELETED)
00366                                     else:    
00367                                         # check the number of needed replicas
00368                                         needed_replicas = int(metadata.get(('states','neededReplicas'),1))
00369                                         #print metadata.items()
00370                                         # find myself among the locations
00371                                         mylocation = serialize_ids([self.serviceID, referenceID])
00372                                         myself = [v for (s, p), v in metadata.items() if s == 'locations' and p == mylocation]
00373                                         #print myself
00374                                         if not myself or myself[0] != ALIVE:
00375                                             # if the state of this replica is not proper in the Librarian, fix it
00376                                             metadata[('locations', mylocation)] = ALIVE
00377                                             self.changeState(referenceID, ALIVE)
00378                                         # get the number of shepherds with alive (or creating) replicas
00379                                         alive_replicas = len(dict([(property.split(' ')[0], value) 
00380                                                                    for (section, property), value in metadata.items()
00381                                                                    if section == 'locations' and value in [ALIVE, CREATING]]))
00382                                         if alive_replicas < needed_replicas:
00383                                             # if the file has fewer replicas than needed
00384                                             log.msg(arc.VERBOSE, '\n\nFile', GUID, 'has fewer replicas than needed.')
00385                                             # we offer our copy to replication
00386                                             try:
00387                                                 response = self.bartender.addReplica({'checkingThread' : GUID}, common_supported_protocols)
00388                                                 success, turl, protocol = response['checkingThread']
00389                                             except:
00390                                                 success = ''
00391                                             #print 'addReplica response', success, turl, protocol
00392                                             if success == 'done':
00393                                                 # if it's OK, we asks the backend to upload our copy to the TURL we got from the bartender
00394                                                 self.backend.copyTo(localID, turl, protocol)
00395                                                 # TODO: this should be done in some other thread
00396                                             else:
00397                                                 log.msg(arc.VERBOSE, 'checkingThread error, bartender responded', success)
00398                                             # so this GUID has an alive replica here
00399                                             alive_GUIDs.append(GUID)
00400                                         elif alive_replicas > needed_replicas:
00401                                             log.msg(arc.VERBOSE, '\n\nFile', GUID, 'has %d more replicas than needed.' % (alive_replicas-needed_replicas))
00402                                             thirdwheels = len([property for (section, property), value in metadata.items()
00403                                                                if section == 'locations' and value == THIRDWHEEL])
00404                                             if thirdwheels == 0:
00405                                                 self.changeState(referenceID, THIRDWHEEL)
00406                                         else:
00407                                             # so this GUID has an alive replica here
00408                                             alive_GUIDs.append(GUID)
00409                             # or if this replica is not needed
00410                             elif state == THIRDWHEEL:
00411                                 # check the number of needed replicasa
00412                                 needed_replicas = int(metadata.get(('states','neededReplicas'),1))
00413                                 # and the number of alive replicas
00414                                 alive_replicas = len([property for (section, property), value in metadata.items()
00415                                                           if section == 'locations' and value == ALIVE])
00416                                 # get the number of THIRDWHEELS not on this Shepherd (self.serviceID)
00417                                 thirdwheels = len([property for (section, property), value in metadata.items()
00418                                                    if section == 'locations' and value == THIRDWHEEL and not property.startswith(self.serviceID)])
00419                                 my_replicas = len([property for (section, property), value in metadata.items()
00420                                                    if section == 'locations' and value == ALIVE 
00421                                                    and property.startswith(self.serviceID)])
00422                                 # if shephered has other alive replicas or no-one else have a thirdwheel replica, 
00423                                 # and the file still has enough replicas, we delete this replica
00424                                 if my_replicas != 0 or (thirdwheels == 0 and alive_replicas >= needed_replicas):
00425                                     #bsuccess = self.backend.remove(localID)
00426                                     #self.store.set(referenceID, None)
00427                                     self.changeState(referenceID, DELETED)
00428                                 # else we sheepishly set the state back to ALIVE
00429                                 else:
00430                                     self.changeState(referenceID, ALIVE)
00431                                     state = ALIVE
00432                             # or if this replica is INVALID
00433                             elif state == INVALID:
00434                                 log.msg(arc.VERBOSE, '\n\nI have an invalid replica of file', GUID, '- now I remove it.')
00435                                 self.changeState(referenceID, DELETED)      
00436                                 #
00437                                 # # disabling pull-method of self-healing # #
00438                                 #                          
00439                                 #my_replicas = len([property for (section, property), value in metadata.items()
00440                                 #                   if section == 'locations' and value in [ALIVE,CREATING] 
00441                                 #                   and property.startswith(self.serviceID)])
00442                                 # we try to get a valid one by simply downloading this file
00443                                 #try:
00444                                 #    response = self.bartender.getFile({'checkingThread' : (GUID, common_supported_protocols)})
00445                                 #    success, turl, protocol = response['checkingThread']
00446                                 #except:
00447                                 #    success = traceback.format_exc()
00448                                 #if success == 'done':
00449                                 #    # if it's OK, then we change the state of our replica to CREATING
00450                                 #    self.changeState(referenceID, CREATING)
00451                                 #    # then asks the backend to get the file from the TURL we got from the bartender
00452                                 #    self.backend.copyFrom(localID, turl, protocol)
00453                                 #    # and after this copying is done, we indicate that it's arrived
00454                                 #    self._file_arrived(referenceID)
00455                                 #    # TODO: this should be done in some other thread
00456                                 #else:
00457                                 #    log.msg(arc.VERBOSE, 'checkingThread error, bartender responded', success)
00458                             elif state == OFFLINE:
00459                                 # online now
00460                                 state = ALIVE
00461                                 self.changeState(referenceID, ALIVE)
00462                             if state == DELETED:
00463                                 # remove replica if marked it as deleted
00464                                 bsuccess = self.backend.remove(localID)
00465                                 self.store.set(referenceID, None)
00466                         except:
00467                             log.msg(arc.VERBOSE, 'ERROR checking checksum of %(rID)s, reason: %(r)s' % {'rID':referenceID, 'r':traceback.format_exc()})
00468                         # sleep for interval +/- 0.5*interval seconds to avoid race condition
00469                         time.sleep(interval+((random.random()-0.5)*interval))
00470                 else:
00471                     time.sleep(period)
00472             except:
00473                 log.msg()
00474 
00475     def changeState(self, referenceID, newState, onlyIf = None):
00476         # change the file's local state and add it to the list of changed files
00477         self.store.lock()
00478         try:
00479             localData = self.store.get(referenceID)
00480             if not localData: # this file is already removed
00481                 self.store.unlock()
00482                 return False
00483             oldState = localData['state']
00484             log.msg(arc.VERBOSE, 'changeState', referenceID, oldState, '->', newState)
00485             # if a previous state is given, change only if the current state is the given state
00486             if onlyIf and oldState != onlyIf:
00487                 self.store.unlock()
00488                 return False
00489             localData['state'] = newState
00490             self.store.set(referenceID, localData)
00491             self.store.unlock()
00492             # append it to the list of changed files (these will be reported)
00493             self.changed_states.append(referenceID)
00494         except:
00495             log.msg()
00496             self.store.unlock()
00497             return False
00498 
00499     def get(self, request):
00500         response = {}
00501         for requestID, getRequestData in request.items():
00502             log.msg(arc.VERBOSE, '\n\n', getRequestData)
00503             referenceID = dict(getRequestData)['referenceID']
00504             protocols = [value for property, value in getRequestData if property == 'protocol']
00505             #print 'Shepherd.get:', referenceID, protocols
00506             localData = self.store.get(referenceID)
00507             #print 'localData:', localData
00508             if localData.get('state', INVALID) == ALIVE:
00509                 if localData.has_key('localID'):
00510                     localID = localData['localID']
00511                     checksum = localData['checksum']
00512                     checksumType = localData['checksumType']
00513                     protocol_match = self.backend.matchProtocols(protocols)
00514                     if protocol_match:
00515                         protocol = protocol_match[0]
00516                         try:
00517                             turl = self.backend.prepareToGet(referenceID, localID, protocol)
00518                             if turl:
00519                                 response[requestID] = [('TURL', turl), ('protocol', protocol),
00520                                     ('checksum', localData['checksum']), ('checksumType', localData['checksumType'])]
00521                             else:
00522                                 response[requestID] = [('error', 'internal error (empty TURL)')]
00523                         except:
00524                             log.msg()
00525                             response[requestID] = [('error', 'internal error (prepareToGet exception)')]
00526                     else:
00527                         response[requestID] = [('error', 'no supported protocol found')]
00528                 else:
00529                     response[requestID] = [('error', 'no such referenceID')]
00530             else:
00531                 response[requestID] = [('error', 'file is not alive')]
00532         return response
00533 
00534     def put(self, request):
00535         #print request
00536         response = {}
00537         for requestID, putRequestData in request.items():
00538             protocols = [value for property, value in putRequestData if property == 'protocol']
00539             protocol_match = self.backend.matchProtocols(protocols)
00540             if protocol_match:
00541                 # just the first protocol
00542                 protocol = protocol_match[0]
00543                 acl = [value for property, value in putRequestData if property == 'acl']
00544                 # create a dictionary from the putRequestData which contains e.g. 'size', 'GUID', 'checksum', 'checksumType'
00545                 requestData = dict(putRequestData)
00546                 size = int(requestData.get('size'))
00547                 # ask the backend if there is enough space 
00548                 availableSpace = self.backend.getAvailableSpace()
00549                 if availableSpace and availableSpace < size:
00550                     response[requestID] = [('error', 'not enough space')]
00551                 else:
00552                     GUID = requestData.get('GUID', None)
00553                     already_have_this_file = False
00554                     if GUID:
00555                         referenceIDs = self.store.list()
00556                         for referenceID in referenceIDs:
00557                             try:
00558                                 localData = self.store.get(referenceID)
00559                                 if localData['GUID'] == GUID and localData['state'] == ALIVE:
00560                                     already_have_this_file = True;
00561                                     break
00562                             except:
00563                                 log.msg()
00564                                 pass
00565                     if already_have_this_file:
00566                         response[requestID] = [('error', 'already have this file')]
00567                     else:
00568                         # create a new referenceIDs
00569                         referenceID = arc.UUID()
00570                         # ask the backend to create a local ID
00571                         localID = self.backend.generateLocalID()
00572                         # create the local data of the new file
00573                         file_data = {'localID' : localID,
00574                             'GUID' : requestData.get('GUID', None),
00575                             'checksum' : requestData.get('checksum', None),
00576                             'checksumType' : requestData.get('checksumType', None),
00577                             'lastChecksumTime' : -1,
00578                             'lastChecksum' : '',
00579                             'size' : size,
00580                             'acl': acl,
00581                             'created': str(time.time()),
00582                             'state' : CREATING} # first it has the state: CREATING
00583                         try:
00584                             # ask the backend to initiate the transfer
00585                             turl = self.backend.prepareToPut(referenceID, localID, protocol)
00586                             if turl:
00587                                 # add the returnable data to the response dict
00588                                 response[requestID] = [('TURL', turl), ('protocol', protocol), ('referenceID', referenceID)]
00589                                 # store the local data
00590                                 self.store.set(referenceID, file_data)
00591                                 # indicate that this file is 'changed': it should be reported in the next reporting cycle (in reportingThread)
00592                                 self.changed_states.append(referenceID)
00593                             else:
00594                                 response[requestID] = [('error', 'internal error (empty TURL)')]
00595                         except Exception, e:
00596                             log.msg()
00597                             response[requestID] = [('error', 'internal error (prepareToPut exception: %s)' % e)]
00598             else:
00599                 response[requestID] = [('error', 'no supported protocol found')]
00600         return response
00601 
00602     def delete(self,request):
00603         response = {}
00604         for requestID, referenceID in request.items():
00605             localData = self.store.get(referenceID)
00606             try:
00607                 # note that actual deletion is done in self.reportingThread
00608                 self.changeState(referenceID, DELETED)
00609                 response[requestID] = 'deleted'
00610             except:
00611                 response[requestID] = 'nosuchfile'
00612         return response
00613 
00614 
00615     def stat(self, request):
00616         properties = ['state', 'checksumType', 'checksum', 'acl', 'size', 'GUID', 'localID']
00617         response = {}
00618         for requestID, referenceID in request.items():
00619             localData = self.store.get(referenceID)
00620             response[requestID] = [referenceID]
00621             for p in properties:
00622                 response[requestID].append(localData.get(p, None))
00623         return response
00624 
00625 from arcom.service import Service
00626 
class ShepherdService(Service):
    """SOAP front-end of the Shepherd.

    Wraps a Shepherd instance (the business logic) and converts SOAP payloads
    to and from the plain Python structures its methods take and return.
    """

    def __init__(self, cfg):
        """Create the Shepherd and register the SOAP methods this service serves.

        cfg -- the service configuration node. The last '/'-separated part of
               its 'ServiceID' becomes the service name ("Shepherd" if it
               cannot be read).

        Besides get/put/stat/delete/toggleReport, every name listed in the
        backend's `public_request_names` is exposed as a request method, unless
        this object already has an attribute with that name.
        """
        try:
            serviceID = str(cfg.Get('ServiceID')).split('/')[-1]
        except Exception:
            serviceID = "Shepherd"
        self.service_name = serviceID
        # names of provided methods
        request_names = ['get', 'put', 'stat', 'delete', 'toggleReport']
        # create the business-logic class
        self.shepherd = Shepherd(cfg)
        backend = self.shepherd.backend
        # bring the additional request methods of the backend into the namespace of this object
        for name in backend.public_request_names:
            if not hasattr(self, name):
                setattr(self, name, getattr(backend, name))
                request_names.append(name)
        # call the Service's constructor
        Service.__init__(self, [{'request_names' : request_names, 'namespace_prefix': 'she', 'namespace_uri': shepherd_uri}], cfg)

    def stat(self, inpayload):
        """SOAP handler for 'stat': report the local metadata of replicas.

        Parses (requestID, referenceID) pairs from the payload, asks the
        Shepherd, and returns a new SOAP payload built by create_response.
        """
        request = parse_node(inpayload.Child().Child(), ['requestID', 'referenceID'], single = True)
        response = self.shepherd.stat(request)
        columns = ['she:requestID', 'she:referenceID', 'she:state', 'she:checksumType',
            'she:checksum', 'she:acl', 'she:size', 'she:GUID', 'she:localID']
        return create_response('she:stat', columns, response, self._new_soap_payload())

    def delete(self, inpayload):
        """SOAP handler for 'delete': returns a 'deleteresponse' payload with
        one (requestID, status) element per request."""
        request = parse_node(inpayload.Child().Child(), ['requestID', 'referenceID'], single = True)
        response = self.shepherd.delete(request)
        elements = [
            ('she:deleteResponseElement', [
                ('she:requestID', requestID),
                ('she:status', status)
            ]) for requestID, status in response.items()
        ]
        tree = XMLTree(from_tree = ('she:deleteResponseList', elements))
        out = self._new_soap_payload()
        response_node = out.NewChild('deleteresponse')
        tree.add_to_node(response_node)
        return out

    def _putget_in(self, putget, inpayload):
        """Parse a get/put request payload into {requestID: [(property, value), ...]}.

        putget -- 'get' or 'put'; selects the '<putget>RequestDataList' child
                  of each request node
        """
        list_name = putget + 'RequestDataList'
        request = {}
        for node in get_child_nodes(inpayload.Child().Child()):
            request_data = []
            for data_node in get_child_nodes(node.Get(list_name)):
                request_data.append((str(data_node.Get('property')), str(data_node.Get('value'))))
            request[str(node.Get('requestID'))] = request_data
        return request

    def _putget_out(self, putget, response):
        """Serialize a get/put result dict into a new '<putget>response' SOAP payload.

        putget -- 'get' or 'put'; used as the element-name prefix
        response -- dict mapping requestID to a list of (property, value) pairs
        """
        prefix = 'she:' + putget
        elements = []
        for requestID, responseData in response.items():
            # 'name'/'val' instead of 'property' avoids shadowing the builtin in Python 2
            data_elements = [
                (prefix + 'ResponseDataElement', [
                    ('she:property', name),
                    ('she:value', val)
                ]) for name, val in responseData
            ]
            elements.append((prefix + 'ResponseElement', [
                ('she:requestID', requestID),
                (prefix + 'ResponseDataList', data_elements)
            ]))
        tree = XMLTree(from_tree = (prefix + 'ResponseList', elements))
        out = self._new_soap_payload()
        response_node = out.NewChild(putget + 'response')
        tree.add_to_node(response_node)
        return out

    def get(self, inpayload):
        """SOAP handler for 'get': prepare download TURLs for the requested replicas."""
        request = self._putget_in('get', inpayload)
        response = self.shepherd.get(request)
        return self._putget_out('get', response)

    def put(self, inpayload):
        """SOAP handler for 'put': register new replicas and prepare upload TURLs."""
        request = self._putget_in('put', inpayload)
        response = self.shepherd.put(request)
        return self._putget_out('put', response)

    def toggleReport(self, inpayload):
        """SOAP handler for 'toggleReport'.

        Compares the request's 'doReporting' child (as a string) with the `true`
        constant, passes the resulting boolean to the Shepherd's toggleReport,
        and sets its answer as the text of the response element.
        """
        doReporting = str(inpayload.Child().Get('doReporting'))
        response = self.shepherd.toggleReport(doReporting == true)
        out = self._new_soap_payload()
        # NOTE(review): the 'lbr:' prefix looks copied from the Librarian service
        # (this service registers the 'she' prefix); confirm what clients expect before changing it
        response_node = out.NewChild('lbr:toggleReportResponse')
        response_node.Set(response)
        return out

    def RegistrationCollector(self, doc):
        """Replace `doc` with a RegEntry advertising the Shepherd service type; returns True."""
        regentry = arc.XMLNode('<RegEntry />')
        regentry.NewChild('SrcAdv').NewChild('Type').Set(shepherd_servicetype)
        # place the document into the doc attribute
        doc.Replace(regentry)
        return True

    def GetAdditionalLocalInformation(self, service_node):
        """Describe this Shepherd as a 'StorageService' node with its capacity.

        FreeSize, UsedSize and TotalSize are written in units of 2**30 of
        whatever getSpaceInformation reports (presumably bytes, giving GiB —
        confirm).
        """
        service_node.Name('StorageService')
        service_node.NewChild('Type').Set(shepherd_servicetype)
        capacity_node = service_node.NewChild('StorageServiceCapacity')
        free_size, used_size, total_size = self.shepherd.getSpaceInformation()
        # diagnostic output goes to the service log instead of the process's stdout
        log.msg(arc.VERBOSE, 'Shepherd space information (free, used, total):', free_size, used_size, total_size)
        # float divisor so Python 2 does not truncate the quotient
        gigabyte = 1073741824.0
        capacity_node.NewChild('FreeSize').Set(str(free_size/gigabyte))
        capacity_node.NewChild('UsedSize').Set(str(used_size/gigabyte))
        capacity_node.NewChild('TotalSize').Set(str(total_size/gigabyte))