
plone3  3.1.7
AggregatedStorage.py
00001 """
00002 AggregatedStorage for Archetypes
00003 
00004 (C) 2004, Andreas Jung & Econtec GmbH, D-90492 Nuernberg, Germany
00005 
00006 Released as open-source under the current Archetypes license
00007 
00008 $Id: AggregatedStorage.py 7768 2007-03-18 23:38:08Z nouri $
00009 """
00010 
00011 from time import time
00012 from types import StringType, DictType
00013 from threading import Lock
00014 
00015 from Storage import Storage
00016 from Registry import Registry
00017 
00018 CACHE_TIMEOUT = 5  # timeout in seconds for cache entries to expire
00019 
00020 class AggregatedStorage(Storage):
00021     """ Implementation of the AggregatedStorage proposal as described in http://plone.org/development/teams/developer/AggregatedStorage """
00022 
00023     def __init__(self, caching=0):
00024         self._reg_ag = Registry(StringType)  # registry for aggregators
00025         self._reg_dag = Registry(StringType) # registry for disaggregators
00026         self.cache = {}                      # map (objId, aggregator) -> (timestamp, result_dict)
00027         self._caching = caching
00028         self._lock = Lock()
00029         
    def __getstate__(self):
        """Override __getstate__ used for copy operations

        Required to fix the copy problem with the lock
        """
        # work on a copy of the instance dict so that the live
        # instance keeps its lock
        state = self.__dict__.copy()
        state['_lock'] = None
        return state

    def __setstate__(self, state):
        """Override __setstate__ used for copy operations

        Required to fix the copy problem with the lock
        """
        state['_lock'] = Lock()
        self.__dict__.update(state)

    def registerAggregator(self, fieldname, methodname):
        if self._reg_ag.get(fieldname):
            raise KeyError('Aggregator for field "%s" already registered' % fieldname)
        self._reg_ag.register(fieldname, methodname)

    def registerDisaggregator(self, fieldname, methodname):
        if self._reg_dag.get(fieldname):
            raise KeyError('Disaggregator for field "%s" already registered' % fieldname)
        self._reg_dag.register(fieldname, methodname)

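    # Illustrative sketch of what registered aggregators/disaggregators are
    # expected to look like.  The registered method names are looked up on
    # the content instance, so, judging from how get() and set() below
    # invoke them, the methods would have roughly these signatures (all
    # names here are hypothetical):
    #
    #     def myAggregator(self, name, instance, **kwargs):
    #         # must return a dict containing at least the requested field name
    #         return {'myfield': self._compute_myfield()}
    #
    #     def myDisaggregator(self, name, instance, value, **kwargs):
    #         # store the value wherever the aggregated data actually lives
    #         self._store_myfield(value)
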
    def get(self, name, instance, **kwargs):
        methodname = self._reg_ag.get(name)
        if not methodname:
            raise KeyError('No aggregator registered for field "%s"' % name)

        if self._caching:
            cache_entry = self._cache_get(instance.getId(), methodname)
            if cache_entry is not None:
                return cache_entry[name]

        method = getattr(instance, methodname, None)
        if not method:
            raise KeyError('Aggregator "%s" for field "%s" not found' % (methodname, name))
        result = method(name, instance, **kwargs)
        if not isinstance(result, DictType):
            raise TypeError('Result returned from an aggregator must be DictType')
        if not result.has_key(name):
            raise KeyError('result dictionary returned from "%s"'
                           ' does not contain a key for "%s"' %
                           (methodname, name))

        if self._caching:
            self._cache_put(instance.getId(), methodname, result)

        return result[name]

    def set(self, name, instance, value, **kwargs):
        methodname = self._reg_dag.get(name)
        if not methodname:
            raise KeyError('No disaggregator registered for field "%s"' % name)

        method = getattr(instance, methodname, None)
        if not method:
            raise KeyError('Disaggregator "%s" for field "%s" not found' % (methodname, name))
        if self._caching:
            # invalidate the cached aggregator result for this field,
            # otherwise get() could return stale data until CACHE_TIMEOUT expires
            ag_methodname = self._reg_ag.get(name)
            if ag_methodname:
                self._cache_remove(instance.getId(), ag_methodname)
        method(name, instance, value, **kwargs)

    def unset(self, name, instance, **kwargs):
        # nothing to do: the storage holds no data of its own, the
        # disaggregators are responsible for the actual field data
        pass

    ######################################################################
    # A very basic cache implementation to cache the result dictionaries
    # returned by the aggregators
    ######################################################################

    def _cache_get(self, objId, methodname):
        """ retrieve the result dictionary for (objId, methodname) """
        self._lock.acquire()
        try:
            entry = self.cache.get((objId, methodname))
            if entry is None:
                return None
            if time() - entry[0] > CACHE_TIMEOUT:
                # the entry has expired, drop it
                del self.cache[(objId, methodname)]
                return None
            return entry[1]
        finally:
            self._lock.release()

    def _cache_put(self, objId, methodname, result):
        """ store (objId, methodname) : (current_time, result) in cache """
        self._lock.acquire()
        try:
            self.cache[(objId, methodname)] = (time(), result)
        finally:
            self._lock.release()

    def _cache_remove(self, objId, methodname):
        """ remove (objId, methodname) from cache """
        self._lock.acquire()
        try:
            key = (objId, methodname)
            if self.cache.has_key(key):
                del self.cache[key]
        finally:
            self._lock.release()

from Registry import registerStorage
registerStorage(AggregatedStorage)
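
# Illustrative usage sketch, assuming a typical Archetypes content type:
# the storage instance is created once, the aggregator/disaggregator method
# names are registered on it, and it is passed to a field via the
# ``storage`` argument.  The class, field and method names are
# hypothetical; the methods themselves would be defined on the content
# class with the signatures sketched above.
#
#     from Products.Archetypes.public import BaseContent, Schema, StringField
#     from Products.Archetypes.AggregatedStorage import AggregatedStorage
#
#     storage = AggregatedStorage(caching=1)  # caching=1 enables the
#                                             # CACHE_TIMEOUT-bounded cache
#     storage.registerAggregator('myfield', 'myAggregator')
#     storage.registerDisaggregator('myfield', 'myDisaggregator')
#
#     class MyContent(BaseContent):
#         schema = Schema((
#             StringField('myfield', storage=storage),
#         ))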