Back to index

nordugrid-arc-nox  1.1.0~rc6
byteio.py
Go to the documentation of this file.
00001 from arcom.service import rbyteio_uri, byteio_simple_uri
00002 
00003 from storage.common import create_checksum, upload_to_turl, download_from_turl
00004 from storage.client import NotifyClient, ByteIOClient
00005 import traceback
00006 import arc
00007 import base64
00008 import os
00009 
00010 from arcom.logger import Logger
00011 log = Logger(arc.Logger(arc.Logger_getRootLogger(), 'Storage.ByteIO'))
00012 
class ByteIOBackend:
    """Storage backend that hands out files through the 'byteio' protocol.

    Stored files live in ``datadir``.  For every transfer a hard link to the
    stored file is created in ``transferdir`` under a freshly generated ID;
    the transfer URL (TURL) returned to the caller is ``turlprefix`` followed
    by that ID.  ``idstore`` maps each outstanding ID to the referenceID it
    was issued for.
    """

    # request names this backend answers itself (see notify())
    public_request_names = ['notify']
    # transfer protocols accepted by prepareToGet()/prepareToPut()
    supported_protocols = ['byteio']

    def __init__(self, backendcfg, ns_uri, file_arrived, ssl_config):
        """Read the configuration and prepare the data and transfer directories.

        backendcfg   -- configuration node; its 'StoreDir', 'TransferDir' and
                        'TURLPrefix' children are read
        ns_uri       -- namespace URI bound to the 'she' prefix in responses
        file_arrived -- callable invoked with a referenceID by notify()
        ssl_config   -- passed through to upload_to_turl/download_from_turl
        """
        self.file_arrived = file_arrived
        self.ns = arc.NS('she', ns_uri)
        self.ssl_config = ssl_config
        self.datadir = str(backendcfg.Get('StoreDir'))
        self.transferdir = str(backendcfg.Get('TransferDir'))
        self.turlprefix = str(backendcfg.Get('TURLPrefix'))
        if not os.path.exists(self.datadir):
            os.mkdir(self.datadir)
        log.msg(arc.VERBOSE, "ByteIOBackend datadir:", self.datadir)
        if not os.path.exists(self.transferdir):
            os.mkdir(self.transferdir)
        else:
            # idstore starts out empty below, so entries left in the transfer
            # directory can no longer be matched to a referenceID: drop them.
            for filename in os.listdir(self.transferdir):
                os.remove(os.path.join(self.transferdir, filename))
        log.msg(arc.VERBOSE, "ByteIOBackend transferdir:", self.transferdir)
        self.idstore = {}

    def copyTo(self, localID, turl, protocol):
        """Upload the stored file `localID` to `turl` using `protocol`."""
        f = open(os.path.join(self.datadir, localID), 'rb')
        try:
            log.msg(arc.VERBOSE, self.turlprefix, 'Uploading file to', turl)
            upload_to_turl(turl, protocol, f, ssl_config = self.ssl_config)
        finally:
            # close the handle even if the upload raises
            f.close()

    def copyFrom(self, localID, turl, protocol):
        """Download from `turl` using `protocol` into the stored file `localID`."""
        # TODO: download to a separate file, and if checksum OK, then copy the file
        f = open(os.path.join(self.datadir, localID), 'wb')
        try:
            log.msg(arc.VERBOSE, self.turlprefix, 'Downloading file from', turl)
            download_from_turl(turl, protocol, f, ssl_config = self.ssl_config)
        finally:
            # close the handle even if the download raises
            f.close()

    def prepareToGet(self, referenceID, localID, protocol):
        """Create a transfer link for reading `localID` and return its TURL.

        Raises Exception if `protocol` is not supported.  Returns None when
        the link cannot be created (for example the stored file is missing).
        """
        if protocol not in self.supported_protocols:
            raise Exception('Unsupported protocol: ' + protocol)
        turl_id = arc.UUID()
        try:
            os.link(os.path.join(self.datadir, localID), os.path.join(self.transferdir, turl_id))
        except Exception:
            # best effort as before (the caller gets None), but leave a trace
            # in the log instead of hiding the failure completely
            log.msg()
            return None
        self.idstore[turl_id] = referenceID
        log.msg(arc.VERBOSE, self.turlprefix, '++', self.idstore)
        return self.turlprefix + turl_id

    def prepareToPut(self, referenceID, localID, protocol):
        """Create an empty stored file `localID`, link it for upload, return the TURL.

        Raises Exception if `protocol` is not supported.
        """
        if protocol not in self.supported_protocols:
            raise Exception('Unsupported protocol: ' + protocol)
        turl_id = arc.UUID()
        datapath = os.path.join(self.datadir, localID)
        # create (or truncate) the data file so that there is something to link
        open(datapath, 'wb').close()
        os.link(datapath, os.path.join(self.transferdir, turl_id))
        self.idstore[turl_id] = referenceID
        log.msg(arc.VERBOSE, self.turlprefix, '++', self.idstore)
        return self.turlprefix + turl_id

    def remove(self, localID):
        """Delete the stored file `localID`.

        Returns 'removed' on success, otherwise 'failed: ' followed by the
        formatted traceback.
        """
        try:
            os.remove(os.path.join(self.datadir, localID))
        except Exception:
            return 'failed: ' + traceback.format_exc()
        return 'removed'

    def list(self):
        """Return the names of the entries in the data directory."""
        # was os.listdir(os.datadir): the os module has no 'datadir' attribute
        return os.listdir(self.datadir)

    def getAvailableSpace(self):
        """Return the available space; this backend always reports None."""
        return None

    def generateLocalID(self):
        """Return a newly generated ID (arc.UUID()) for a stored file."""
        return arc.UUID()

    def matchProtocols(self, protocols):
        """Return the members of `protocols` this backend supports, in order."""
        return [protocol for protocol in protocols if protocol in self.supported_protocols]

    def notify(self, inpayload):
        """Handle a 'notify' request: the transfer named by its subject is done.

        Removes the transfer link, forgets the transfer ID, calls
        file_arrived() with the referenceID registered for it (None if the ID
        is unknown) and returns a SOAP payload with 'she:notifyResponse' set
        to 'OK'.  Raises Exception if the subject is not a bare file name.
        """
        request_node = inpayload.Get('notify')
        subject = str(request_node.Get('subject'))
        # The subject comes from the request and is joined onto transferdir
        # before deletion, so only accept a plain file name.
        if subject in ('', os.curdir, os.pardir) or os.path.basename(subject) != subject:
            raise Exception('Invalid subject: ' + subject)
        referenceID = self.idstore.get(subject, None)
        path = os.path.join(self.transferdir, subject)
        log.msg(arc.VERBOSE, self.turlprefix, 'Removing', path)
        os.remove(path)
        # the link is gone, so the ID can never be used again: do not keep it
        self.idstore.pop(subject, None)
        self.file_arrived(referenceID)
        out = arc.PayloadSOAP(self.ns)
        out.NewChild('she:notifyResponse').Set('OK')
        return out

    def checksum(self, localID, checksumType):
        """Return create_checksum() of the stored file `localID` for `checksumType`."""
        f = open(os.path.join(self.datadir, localID), 'rb')
        try:
            return create_checksum(f, checksumType)
        finally:
            f.close()
00110 
00111 from arcom.security import parse_ssl_config
00112 from arcom.service import Service
00113 
class ByteIOService(Service):
    """Service answering ByteIO 'read' and 'write' requests.

    The file a request refers to (its "subject") is the last path component
    of the request's endpoint URL, looked up inside the transfer directory.
    After each request the configured notify client is told about it.
    """

    def __init__(self, cfg):
        """Register the request names and read 'TransferDir'/'NotifyURL' from `cfg`."""
        self.service_name = 'ByteIO'
        # names of provided methods
        request_names = ['read', 'write']
        # call the Service's constructor
        Service.__init__(self, [{'request_names' : request_names, 'namespace_prefix': 'rb', 'namespace_uri': rbyteio_uri}], cfg)
        self.transferdir = str(cfg.Get('TransferDir'))
        log.msg(arc.VERBOSE, "ByteIOService transfer dir:", self.transferdir)
        ssl_config = parse_ssl_config(cfg)
        self.notify = NotifyClient(str(cfg.Get('NotifyURL')), ssl_config = ssl_config)

    def _filename(self, subject):
        """Return the path of `subject` inside the transfer directory."""
        return os.path.join(self.transferdir, subject)

    def write(self, inpayload, subject):
        """Store the base64 payload of a write request in the file `subject`.

        Raises Exception('transfer-mechanism not supported') unless the first
        attribute of the transfer-information node equals byteio_simple_uri,
        Exception('denied') if the target file does not already exist or
        cannot be opened, and Exception('write failed') if writing or closing
        fails.  On success notifies 'received' and returns a payload with
        'rb:writeResponse' set to 'OK'.
        """
        request_node = inpayload.Child()
        transfer_node = request_node.Get('transfer-information')
        if str(transfer_node.Attribute(0)) != byteio_simple_uri:
            raise Exception('transfer-mechanism not supported')
        try:
            fn = self._filename(subject)
            # only files that already exist in the transfer directory may be
            # written; open for reading as an existence check, then release
            open(fn, 'rb').close()
            f = open(fn, 'wb') # open for overwriting
        except Exception:
            log.msg()
            raise Exception('denied')
        try:
            data = base64.b64decode(str(transfer_node))
        except Exception:
            # decoding errors propagate unchanged, but do not leak the handle
            f.close()
            raise
        try:
            try:
                f.write(data)
            finally:
                f.close()
        except Exception:
            log.msg()
            raise Exception('write failed')
        self.notify.notify(subject, 'received')
        out = self._new_soap_payload()
        out.NewChild('rb:writeResponse').Set('OK')
        return out

    def read(self, inpayload, subject):
        """Return the content of the file `subject`, base64 encoded, in a read response.

        If the file cannot be read the failure is logged and empty data is
        returned.  The notify client is told 'sent' in either case.
        """
        try:
            f = open(self._filename(subject), 'rb')
            try:
                data = f.read()
            finally:
                f.close()
        except Exception:
            log.msg()
            data = ''
        self.notify.notify(subject, 'sent')
        out = self._new_soap_payload()
        response_node = out.NewChild('rb:readResponse')
        transfer_node = response_node.NewChild('rb:transfer-information')
        transfer_node.NewAttribute('transfer-mechanism').Set(byteio_simple_uri)
        transfer_node.Set(base64.b64encode(data))
        return out

    def _call_request(self, request_name, inmsg):
        """Call the method named `request_name` with the payload and subject of `inmsg`."""
        # gets the last part of the request url
        # TODO: somehow detect if this is just the path of the service which means: no subject
        subject = inmsg.Attributes().get('ENDPOINT').split('/')[-1]
        # the subject of the byteio request: reference to the file
        log.msg(arc.VERBOSE, 'Subject:', subject)
        inpayload = inmsg.Payload()
        return getattr(self, request_name)(inpayload, subject)
00179