Back to index

plone3  3.1.7
lockable.py
Go to the documentation of this file.
00001 from zope.interface import implements
00002 from zope.component import adapts
00003 
00004 from persistent.dict import PersistentDict
00005 
00006 from zope.annotation.interfaces import IAnnotations
00007 
00008 from AccessControl import getSecurityManager
00009 from webdav.LockItem import LockItem, MAXTIMEOUT
00010 
00011 from plone.locking.interfaces import ILockable
00012 from plone.locking.interfaces import INonStealableLock
00013 from plone.locking.interfaces import ITTWLockable
00014 from plone.locking.interfaces import STEALABLE_LOCK
00015 
# Annotation key under which TTWLockable keeps its per-lock-type bookkeeping
# (a mapping of lock type name -> {'type': ..., 'token': ...}).
ANNOTATION_KEY = 'plone.locking'
00017 
class TTWLockable(object):
    """An object that is being locked through-the-web.

    Adapts an ``ITTWLockable`` context to ``ILockable``.  The locks
    themselves are managed through the context's ``wl_*`` methods; in
    addition, the token and lock type of every lock created by this adapter
    are recorded in an annotation on the context (under ``ANNOTATION_KEY``),
    keyed by the lock type's ``__name__``.
    """

    implements(ILockable)
    adapts(ITTWLockable)

    def __init__(self, context):
        """Remember the adapted ``context``; bookkeeping is loaded lazily."""
        self.context = context
        # Cache for the annotation mapping, filled in by _locks().
        self.__locks = None

    def lock(self, lock_type=STEALABLE_LOCK, children=False):
        """Lock the context for the current user.

        Does nothing if the context is already locked.

        ``lock_type`` is recorded alongside the lock token so that the lock
        can later be identified by type.  If ``children`` is true the lock
        is created with depth ``'infinity'``, otherwise with depth ``0``.
        """
        if self.locked():
            return

        user = getSecurityManager().getUser()
        if children:
            depth = 'infinity'
        else:
            depth = 0
        lock = LockItem(user, depth=depth, timeout=MAXTIMEOUT)
        token = lock.getLockToken()
        self.context.wl_setLock(token, lock)

        self._locks()[lock_type.__name__] = dict(type=lock_type, token=token)

    def unlock(self, lock_type=STEALABLE_LOCK, stealable_only=True):
        """Remove the lock of the given ``lock_type``, if one is recorded.

        Does nothing if the context is not locked.  When ``lock_type`` is a
        stealable type and ``stealable_only`` is true, the lock is only
        removed if ``stealable(lock_type)`` allows it.
        """
        if not self.locked():
            return

        if lock_type.stealable and stealable_only \
           and not self.stealable(lock_type):
            return

        # Look the entry up without creating the annotation: there is
        # nothing to delete if no bookkeeping has been stored yet.
        locks = self._locks(False)
        entry = locks.get(lock_type.__name__, None)
        if entry:
            self.context.wl_delLock(entry['token'])
            del locks[lock_type.__name__]

    def clear_locks(self):
        """Remove every lock from the context and forget all bookkeeping."""
        self.context.wl_clearLocks()
        # No need to create the annotation just to empty it.
        self._locks(False).clear()

    def locked(self):
        """Return True if the context reports itself as locked."""
        return bool(self.context.wl_isLocked())

    def can_safely_unlock(self, lock_type=STEALABLE_LOCK):
        """Return True if the current user may unlock without stealing.

        This is the case when ``lock_type`` is user-unlockable and either
        there is no valid lock at all, or the locks seen so far are all of
        ``lock_type`` and one of them was created by the current user.
        """
        if not lock_type.user_unlockable:
            return False

        info = self.lock_info()
        # There is no lock, so return True
        if not info:
            return True

        userid = getSecurityManager().getUser().getId() or None
        for entry in info:
            # There is another lock of a different type
            if not self._is_lock_of_type(entry, lock_type):
                return False
            # The lock is in fact held by the current user
            if entry['creator'] == userid:
                return True
        return False

    def stealable(self, lock_type=STEALABLE_LOCK):
        """Return True if the lock of ``lock_type`` may be stolen."""
        # If the lock type is not stealable ever, return False
        if not lock_type.stealable:
            return False
        # Can't steal locks of a different type
        for entry in self.lock_info():
            if not self._is_lock_of_type(entry, lock_type):
                return False
        # The lock type is stealable, and the object is not marked as
        # non-stealable, so return True
        if not INonStealableLock.providedBy(self.context):
            return True
        # Lock type is stealable, object is not stealable, but return True
        # anyway if we can safely unlock this object (e.g. we are the owner)
        return self.can_safely_unlock(lock_type)

    def lock_info(self):
        """Return a list describing each valid lock on the context.

        Every item is a dict with the keys ``'creator'``, ``'time'``,
        ``'token'`` and ``'type'``.  ``'type'`` is the lock type recorded
        for that token by ``lock()``, or None if the token is unknown to
        this adapter's bookkeeping.
        """
        types_by_token = dict(
            (entry['token'], entry['type'])
            for entry in self._locks(False).values())

        info = []
        # NOTE(review): the positional 1 is presumably the ``killinvalids``
        # flag of wl_lockValues -- confirm against webdav.Lockable.
        for lock in self.context.wl_lockValues(1):
            if not lock.isValid():
                continue  # Skip invalid/expired locks
            token = lock.getLockToken()
            creator = lock.getCreator()
            # creator can be None when locked by an anonymous user
            if creator is not None:
                creator = creator[1]
            info.append({
                'creator': creator,
                'time': lock.getModifiedTime(),
                'token': token,
                'type': types_by_token.get(token, None),
            })
        return info

    def _is_lock_of_type(self, entry, lock_type):
        """Return True if the ``lock_info()`` entry is of ``lock_type``.

        Lock types are compared by ``__name__``; an entry whose type has no
        ``__name__`` (e.g. None for an unrecorded token) never matches.
        """
        entry_type = entry['type']
        return (hasattr(entry_type, '__name__')
                and entry_type.__name__ == lock_type.__name__)

    def _locks(self, create=True):
        """Return the mapping of lock type name to recorded lock data.

        The mapping lives in the context's annotations under
        ``ANNOTATION_KEY`` and is cached on this adapter once found.  If it
        does not exist yet it is created when ``create`` is true; otherwise
        an empty throwaway dict is returned and nothing is stored.
        """
        if self.__locks is not None:
            return self.__locks

        annotations = IAnnotations(self.context)
        locks = annotations.get(ANNOTATION_KEY, None)
        if locks is None and create:
            locks = annotations.setdefault(ANNOTATION_KEY, PersistentDict())
        if locks is None:
            return {}

        self.__locks = locks
        return locks