Back to index

nordugrid-arc-nox  1.1.0~rc6
Public Member Functions | Public Attributes
storage.ahash.replicatedahash.ReplicatedAHash Class Reference
Inheritance diagram for storage.ahash.replicatedahash.ReplicatedAHash:
Inheritance graph
[legend]
Collaboration diagram for storage.ahash.replicatedahash.ReplicatedAHash:
Collaboration graph
[legend]

List of all members.

Public Member Functions

def __init__
def sendMessage
def newSOAPPayload
def processMessage
def get
def change

Public Attributes

 ssl_config
 store
 ahashes
 public_request_names

Detailed Description

A replicated implementation of the A-Hash service. 

Definition at line 59 of file replicatedahash.py.


Constructor & Destructor Documentation

The constructor of the ReplicatedAHash class.

ReplicatedAHash(cfg)

Reimplemented from storage.ahash.ahash.CentralAHash.

Definition at line 62 of file replicatedahash.py.

00062 
00063     def __init__(self, cfg):
00064         """ The constructor of the ReplicatedAHash class.
00065 
00066         ReplicatedAHash(cfg)
00067 
00068         """
00069         log.msg(arc.VERBOSE, "ReplicatedAHash constructor called")
00070         # ssame ssl_config will be used for any ahash
00071         self.ssl_config = parse_ssl_config(cfg)
00072         self.store = None
00073         CentralAHash.__init__(self, cfg)
00074         self.ahashes = {}
00075         self.store = ReplicationStore(cfg, self.sendMessage)
00076         self.public_request_names = ['processMessage']
00077         # notify replication manager that communication is ready
00078         self.store.repmgr.comm_ready = True


Member Function Documentation

def storage.ahash.ahash.CentralAHash.change (   self,
  changes 
) [inherited]
Change the '(section, property) : value' entries of an object, if the given conditions are met.

change(changes)

'changes' is a dictionary of {'changeID' : 'change'}, where
    'changeID' is a reference ID used in the response list
    'change' is a (ID, changeType, section, property, value, conditions) tuple, where:
'ID' is the ID of the object which we want to change
'changeType' could be 'set', 'unset' or 'delete'
    'set' sets '(section, property)' to 'value'
    'unset' removes '(section, property)', the value does not matter
    'delete' removes the whole objects
'conditions' is a dictionary of {'conditionID' : 'condition'}, where
    'conditionID' is an ID of this condition
    'condition' is a (type, section, property, value) tuple:
        'type' could be 'is', 'isnot', 'isset', 'unset'
            'is': '(section, property)' is set to 'value'
            'isnot': '(section, property)' is not set to 'value'
            'isset': '(section, property)' is set to any value
            'unset': '(section, property)' is not set at all

This method returns a dictionary of {'changeID' : (success, conditionID)},
    where success could be:
- 'set'
- 'unset'
- 'deleted'
- 'failed'
- 'condition not met' (in this case, 'conditionID' gives the ID of the failed condition)
- 'invalid change type'
- 'unknown'

Definition at line 87 of file ahash.py.

00087 
00088     def change(self, changes):
00089         """ Change the '(section, property) : value' entries of an object, if the given conditions are met.
00090 
00091         change(changes)
00092 
00093         'changes' is a dictionary of {'changeID' : 'change'}, where
00094             'changeID' is a reference ID used in the response list
00095             'change' is a (ID, changeType, section, property, value, conditions) tuple, where:
00096                 'ID' is the ID of the object which we want to change
00097                 'changeType' could be 'set', 'unset' or 'delete'
00098                     'set' sets '(section, property)' to 'value'
00099                     'unset' removes '(section, property)', the value does not matter
00100                     'delete' removes the whole objects
00101                 'conditions' is a dictionary of {'conditionID' : 'condition'}, where
00102                     'conditionID' is an ID of this condition
00103                     'condition' is a (type, section, property, value) tuple:
00104                         'type' could be 'is', 'isnot', 'isset', 'unset'
00105                             'is': '(section, property)' is set to 'value'
00106                             'isnot': '(section, property)' is not set to 'value'
00107                             'isset': '(section, property)' is set to any value
00108                             'unset': '(section, property)' is not set at all
00109         
00110         This method returns a dictionary of {'changeID' : (success, conditionID)},
00111             where success could be:
00112                 - 'set'
00113                 - 'unset'
00114                 - 'deleted'
00115                 - 'failed'
00116                 - 'condition not met' (in this case, 'conditionID' gives the ID of the failed condition)
00117                 - 'invalid change type'
00118                 - 'unknown'
00119         """
00120         # prepare the dictionary which will hold the response
00121         response = {}
00122         for (changeID, (ID, changeType, section, property, value, conditionList)) in changes.items():
00123             # for each change in the changes list
00124             # lock the store to avoid inconsistency
00125             while not self.store.lock(blocking = False):
00126                 #print 'A-Hash cannot acquire lock, waiting...'
00127                 time.sleep(0.2)
00128             # prepare the 'success' of this change
00129             success = 'unknown'
00130             # prepare the 'conditionID' for an unmet condition
00131             unmetConditionID = ''
00132             try:
00133                 # get the current content of the object
00134                 obj = self.store.get(ID)
00135                 # now check all the conditions if there is any
00136                 ok = True
00137                 for (conditionID, (conditionType, conditionSection,
00138                         conditionProperty, conditionValue)) in conditionList.items():
00139                     # get the current value of the conditional (section, property), or None if it is not set
00140                     currentValue = obj.get((conditionSection, conditionProperty), None)
00141                     if conditionType == 'is':
00142                         # if the (section, property) is not set to value
00143                         if currentValue != conditionValue:
00144                             ok = False
00145                             unmetConditionID = conditionID
00146                             # jump out of the for statement
00147                             break
00148                     elif conditionType == 'isnot':
00149                         # if the (section, property) is set to value
00150                         if currentValue == conditionValue:
00151                             ok = False
00152                             unmetConditionID = conditionID
00153                             break
00154                     elif conditionType == 'isset':
00155                         # if the (section, property) is not set:
00156                         if currentValue is None:
00157                             ok = False
00158                             unmetConditionID = conditionID
00159                             break
00160                     elif conditionType == 'unset':
00161                         # if the (section, property) is set:
00162                         if currentValue is not None:
00163                             ok = False
00164                             unmetConditionID = conditionID
00165                             break
00166                 # if 'ok' is true then all conditions are met
00167                 if ok:
00168                     if changeType == 'set':
00169                         # sets the new value (overwrites old value of this (section, property))
00170                         obj[(section, property)] = value
00171                         # store the changed object
00172                         self.store.set(ID, obj)
00173                         # set the result of this change
00174                         success = 'set'
00175                     elif changeType == 'unset':
00176                         # removes the entry of (section, property)
00177                         del obj[(section, property)]
00178                         # store the changed object
00179                         self.store.set(ID, obj)
00180                         # set the result of this change
00181                         success = 'unset'
00182                     elif changeType == 'delete':
00183                         # remove the whole object
00184                         self.store.set(ID, None)
00185                         success = 'deleted'
00186                     else:
00187                         # there is no other changeType
00188                         success = 'invalid change type'
00189                 else:
00190                     success = 'condition not met'
00191             except:
00192                 # if there was an exception, set this to failed
00193                 success = 'failed'
00194                 log.msg()
00195             # we are done, release the lock
00196             self.store.unlock()
00197             # append the result of the change to the response list
00198             response[changeID] = (success, unmetConditionID)
00199         return response

def storage.ahash.ahash.CentralAHash.get (   self,
  ids,
  neededMetadata = [] 
) [inherited]
Gets all data of the given IDs.

get(ids)

'ids' is a list of strings: the IDs of the requested object
Returns a dictionary of { ID : metadata }
    where 'metadata' is a dictionary where the keys are ('section', 'property') tuples

Definition at line 63 of file ahash.py.

00063 
00064     def get(self, ids, neededMetadata = []):
00065         """ Gets all data of the given IDs.
00066 
00067         get(ids)
00068 
00069         'ids' is a list of strings: the IDs of the requested object
00070         Returns a dictionary of { ID : metadata }
00071             where 'metadata' is a dictionary where the keys are ('section', 'property') tuples
00072         """
00073         if neededMetadata: # if the needed metadata is given
00074             # if a property is empty in a (section, property) pair
00075             # it means we need all properties from that section
00076             allpropsections = [section for (section, property) in neededMetadata if property == '']
00077             # gets the metadata for each ID, filters it and creates an {ID : metadata} dictionary
00078             return dict([(
00079                 ID,
00080                 dict([((section, property), value) # for all metadata entry of this object
00081                     for (section, property), value in self.store.get(ID).items()
00082                         # if this (section, property) is needed or if this section needs all the properties
00083                         if (section, property) in neededMetadata or section in allpropsections])
00084             ) for ID in ids])
00085         else: # gets the metadata for each ID, and creates an {ID : metadata} dictionary
00086             return dict([(ID, self.store.get(ID)) for ID in ids])

Definition at line 102 of file replicatedahash.py.

00102 
00103     def newSOAPPayload(self):
00104         return arc.PayloadSOAP(self.ns)

Here is the caller graph for this function:

processing ahash replication message

Definition at line 105 of file replicatedahash.py.

00105 
00106     def processMessage(self, inpayload):
00107         """
00108         processing ahash replication message
00109         """
00110         log.msg(arc.VERBOSE, "processing message...")
00111         # get the grandchild of the root node, which is the 'changeRequestList'
00112         request_node = inpayload.Child()
00113         msg = eval(base64.decodestring(str(request_node.Get('msg'))))
00114         control = msg['control']
00115         record = msg['record']
00116         eid = msg['eid']
00117         retlsn = msg['lsn']
00118         sender = msg['sender']
00119         msgID = msg['msgID']
00120 
00121         resp = self.store.repmgr.processMessage(control, record, eid, retlsn, 
00122                                             sender, msgID)
00123         # prepare the response payload
00124         out = self.newSOAPPayload()
00125         # create the 'changeResponse' node
00126         response_node = out.NewChild('ahash:processResponse')
00127         # create an XMLTree for the response
00128         tree = XMLTree(from_tree = ('ahash:success', resp))
00129         # add the XMLTree to the XMLNode
00130         tree.add_to_node(response_node)
00131         log.msg(arc.VERBOSE, "processing message... Finished")
00132         return out

Here is the call graph for this function:

def storage.ahash.replicatedahash.ReplicatedAHash.sendMessage (   self,
  url,
  repmsg 
)
Function used for callbacks from the communication framework
of the replication manager
Sends repmsg to url using HED

Definition at line 79 of file replicatedahash.py.

00079 
00080     def sendMessage(self, url, repmsg):
00081         """
00082         Function used for callbacks from the communication framework
00083         of the replication manager
00084         Sends repmsg to url using HED
00085         """
00086         if not url in self.ahashes.keys():
00087             self.ahashes[url] = AHashClient(url, ssl_config=self.ssl_config,
00088                                             print_xml = False)
00089         ahash = self.ahashes[url]
00090         repmsg = base64.encodestring(str(repmsg))
00091         tree = XMLTree(from_tree = 
00092                        ('ahash:processMessage', [
00093                            ('ahash:msg', repmsg)
00094                            ]))
00095         log.msg(arc.VERBOSE, "sending message of length", len(repmsg), "to", url)
00096         msg = ahash.call(tree)
00097         ahash.reset()
00098         xml = ahash.xmlnode_class(msg)
00099         success = str(get_data_node(xml))
00100         log.msg(arc.VERBOSE, "sendt message, success=%s"%success)
00101         return success

Here is the call graph for this function:


Member Data Documentation

Definition at line 73 of file replicatedahash.py.

Reimplemented from storage.ahash.ahash.CentralAHash.

Definition at line 75 of file replicatedahash.py.

Definition at line 70 of file replicatedahash.py.

Reimplemented from storage.ahash.ahash.CentralAHash.

Definition at line 71 of file replicatedahash.py.


The documentation for this class was generated from the following file: