Back to index

nordugrid-arc-nox  1.1.0~rc6
hardlinkingbackend.py
Go to the documentation of this file.
00001 from storage.common import create_checksum, upload_to_turl, download_from_turl
00002 import traceback
00003 import arc
00004 import base64
00005 import os
00006 import threading
00007 import time
00008 import stat
00009 
00010 from arcom.logger import Logger
# Module-level logger shared by every backend class in this file; it wraps an
# arc.Logger created under the root logger with the name
# 'Storage.HardlinkingBackend'.
log = Logger(arc.Logger(arc.Logger_getRootLogger(), 'Storage.HardlinkingBackend'))
00012 
class HardlinkingBackend:
    """
    Superclass for http protocols using hardlinking scheme.
    Don't use this class directly. Use subclasses instead.

    Files are stored in `datadir` under their localID.  For a transfer a
    hard link to the stored file is created in `transferdir` under a fresh
    UUID, and the transfer URL (TURL) handed out is `turlprefix` + UUID.
    For uploads a background thread polls the link count of the stored
    file; once it drops back to 1 (the link in the transfer dir is gone)
    the `file_arrived` callback is called with the referenceID.
    NOTE(review): presumably the web server serving `transferdir` removes
    the link when the transfer is done - confirm against the server side.
    """
    public_request_names = []
    supported_protocols = ['http']

    def __init__(self, backendcfg, ns_uri, file_arrived, ssl_config):
        """
        Read the backend configuration, prepare the directories and start
        the polling thread.

        backendcfg   -- config node; 'StoreDir', 'TransferDir' and
                        'TURLPrefix' are read from it with Get()
        ns_uri       -- not used by this class
        file_arrived -- callable, called with a referenceID when an upload
                        prepared by prepareToPut has finished
        ssl_config   -- passed on to upload_to_turl / download_from_turl

        Raises Exception if StoreDir or TURLPrefix is empty.
        """
        self.ssl_config = ssl_config
        self.file_arrived = file_arrived
        self.datadir = str(backendcfg.Get('StoreDir'))
        if not self.datadir:
            raise Exception('No StoreDir given in the Shepherd backend config!')
        self.transferdir = str(backendcfg.Get('TransferDir'))
        self.turlprefix = str(backendcfg.Get('TURLPrefix'))
        if not self.turlprefix:
            raise Exception('No TURLPrefix is given in the Shepherd backend config!')
        # TURLs are built by plain concatenation, so the prefix must end with '/'
        if not self.turlprefix.endswith('/'):
            self.turlprefix = self.turlprefix + '/'
        if not os.path.exists(self.datadir):
            os.mkdir(self.datadir)
        if not os.path.exists(self.transferdir):
            os.mkdir(self.transferdir)
        else:
            # drop the leftover links of earlier runs, but keep everything
            # with '.py' in its name
            for filename in os.listdir(self.transferdir):
                if ".py" not in filename: # need this for apache backend
                    os.remove(os.path.join(self.transferdir, filename))
        # localID -> referenceID of the uploads which are still in progress
        self.idstore = {}
        threading.Thread(target = self.checkingThread, args = [5]).start()

    def checkingThread(self, period):
        """
        Endless loop: every `period` seconds check the link count of each
        file registered in `idstore` and report the finished uploads
        through `file_arrived`.  Never returns.
        """
        while True:
            try:
                time.sleep(period)
                # entries are removed inside the loop, so walk a snapshot
                for localID, referenceID in tuple(self.idstore.items()):
                    filename = os.path.join(self.datadir, localID)
                    try:
                        nlink = os.stat(filename)[stat.ST_NLINK]
                    except OSError:
                        # if the file does not exist, maybe it's already removed
                        self.idstore.pop(localID, None)
                        nlink = 0
                    log.msg(arc.VERBOSE, 'checking', localID, referenceID, nlink)
                    if nlink == 1:
                        # if there is just one link for this file, it is already removed from the transfer dir
                        self.file_arrived(referenceID)
                        # only forget the entry after the callback succeeded,
                        # a failed callback is retried in the next round
                        self.idstore.pop(localID, None)
            except Exception:
                # called without arguments, as the original did in this
                # handler, so the failure is recorded and the loop goes on
                log.msg()

    def copyTo(self, localID, turl, protocol):
        """
        Pass the stored file `localID` to upload_to_turl() for `turl`.
        The file is closed even if the upload raises.
        """
        f = open(os.path.join(self.datadir, localID), 'rb')
        try:
            log.msg(arc.VERBOSE, self.turlprefix, 'Uploading file to', turl)
            upload_to_turl(turl, protocol, f, ssl_config = self.ssl_config)
        finally:
            f.close()

    def copyFrom(self, localID, turl, protocol):
        """
        Pass the (truncated) stored file `localID` to download_from_turl()
        for `turl`.  The file is closed even if the download raises.
        """
        # TODO: download to a separate file, and if checksum OK, then copy the file
        f = open(os.path.join(self.datadir, localID), 'wb')
        try:
            log.msg(arc.VERBOSE, self.turlprefix, 'Downloading file from', turl)
            download_from_turl(turl, protocol, f, ssl_config = self.ssl_config)
        finally:
            f.close()

    def prepareToGet(self, referenceID, localID, protocol):
        """
        Make the stored file `localID` available for download.

        Returns the TURL, or None if the file could not be linked into the
        transfer dir.  Raises Exception for an unsupported protocol.
        """
        if protocol not in self.supported_protocols:
            raise Exception('Unsupported protocol: ' + protocol)
        turl_id = arc.UUID()
        try:
            filepath = os.path.join(self.datadir, localID)
            # set it to readonly (owner read only, 0400)
            os.chmod(filepath, stat.S_IRUSR)
            os.link(filepath, os.path.join(self.transferdir, turl_id))
            log.msg(arc.VERBOSE, self.turlprefix, '++', self.idstore)
            return self.turlprefix + turl_id
        except Exception:
            # callers rely on the None return, but do not lose the reason
            log.msg()
            return None

    def prepareToPut(self, referenceID, localID, protocol):
        """
        Create an empty file for `localID`, link it into the transfer dir
        and register it in `idstore` for the polling thread.

        Returns the TURL.  Raises Exception for an unsupported protocol.
        """
        if protocol not in self.supported_protocols:
            raise Exception('Unsupported protocol: ' + protocol)
        turl_id = arc.UUID()
        datapath = os.path.join(self.datadir, localID)
        # create (or truncate) the file the upload will be written into
        open(datapath, 'wb').close()
        # owner read/write only (0600)
        os.chmod(datapath, stat.S_IRUSR | stat.S_IWUSR)
        os.link(datapath, os.path.join(self.transferdir, turl_id))
        self.idstore[localID] = referenceID
        log.msg(arc.VERBOSE, self.turlprefix, '++', self.idstore)
        return self.turlprefix + turl_id

    def remove(self, localID):
        """
        Delete the stored file `localID`.

        Returns 'removed' on success, otherwise 'failed: ' followed by the
        traceback of the error.
        """
        try:
            os.remove(os.path.join(self.datadir, localID))
        except Exception:
            return 'failed: ' + traceback.format_exc()
        return 'removed'

    def list(self):
        """Return the names of the entries in the data dir."""
        return os.listdir(self.datadir)

    def getAvailableSpace(self):
        """
        Return the number of bytes available on the filesystem of the data
        dir (f_frsize * f_bavail), or None if it cannot be determined.
        """
        try:
            f = os.statvfs(self.datadir)
        except (OSError, AttributeError):
            # AttributeError: os.statvfs is missing on this platform
            return None
        return f.f_frsize * f.f_bavail

    def generateLocalID(self):
        """Return a fresh arc.UUID() to be used as a localID."""
        return arc.UUID()

    def matchProtocols(self, protocols):
        """Return the items of `protocols` which this backend supports, in the given order."""
        return [protocol for protocol in protocols if protocol in self.supported_protocols]

    def checksum(self, localID, checksumType):
        """
        Return create_checksum() of the stored file `localID` for the given
        `checksumType`.  The file is closed afterwards.
        """
        f = open(os.path.join(self.datadir, localID), 'rb')
        try:
            return create_checksum(f, checksumType)
        finally:
            f.close()
00137 
00138 
class HopiBackend( HardlinkingBackend ):
    """
    Hardlinking backend to be used together with Hopi.

    Adds nothing to the base class except logging the directories in use.
    """

    def __init__(self, *args):
        """Pass every argument on to HardlinkingBackend and log the configured directories."""
        HardlinkingBackend.__init__(self, *args)
        directories = (
            ("HopiBackend datadir:", self.datadir),
            ("HopiBackend transferdir:", self.transferdir),
        )
        for label, path in directories:
            log.msg(arc.VERBOSE, label, path)
00148 
00149 
00150 import pwd
00151 
class ApacheBackend( HardlinkingBackend ):
    """
    Backend for Apache with hardlinking

    On top of the base class behaviour the stored files get the group of
    the configured Apache user, with group read (downloads) or group
    read/write (uploads) permission.
    """

    def __init__(self, backendcfg, ns_uri, file_arrived, ssl_config):
        """
        Initialise the base class, then look up the group of the user named
        by the 'ApacheUser' config entry.

        pwd.getpwnam() raises KeyError if that user does not exist.
        """
        HardlinkingBackend.__init__(self, backendcfg, ns_uri, file_arrived, ssl_config)
        self.apacheuser = str(backendcfg.Get('ApacheUser'))
        self.uid = os.getuid()
        # primary group id of the Apache user
        self.apachegid = pwd.getpwnam(self.apacheuser).pw_gid
        log.msg(arc.VERBOSE, "ApacheBackend datadir:", self.datadir)
        log.msg(arc.VERBOSE, "ApacheBackend transferdir:", self.transferdir)

    def prepareToGet(self, referenceID, localID, protocol):
        """
        Same as HardlinkingBackend.prepareToGet, and on success the stored
        file is made readable by the Apache group.

        Returns the TURL, or None if the base class could not prepare it.
        """
        turl = HardlinkingBackend.prepareToGet(self, referenceID, localID, protocol)
        if turl is not None:
            filepath = os.path.join(self.datadir, localID)
            # set group to apache to make it readable to Apache
            os.chown(filepath, self.uid, self.apachegid)
            # set it to readonly for owner and group (0440)
            os.chmod(filepath, stat.S_IRUSR | stat.S_IRGRP)
        return turl

    def prepareToPut(self, referenceID, localID, protocol):
        """
        Same as HardlinkingBackend.prepareToPut, and the new file is made
        writable by the Apache group.

        Returns the TURL.
        """
        turl = HardlinkingBackend.prepareToPut(self, referenceID, localID, protocol)
        datapath = os.path.join(self.datadir, localID)
        # set group to apache to make it writable by Apache
        os.chown(datapath, self.uid, self.apachegid)
        # owner and group may read and write (0660)
        os.chmod(datapath, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP)
        return turl
00179         # set group to apache to make it readable to Apache
00180         os.chown(datapath, self.uid, self.apachegid)
00181         # set file group writable
00182         os.chmod(datapath, 0660)
00183         return turl