Back to index

plone3  3.1.7
workspace.py
Go to the documentation of this file.
00001 from itertools import izip, repeat
00002 from Globals import InitializeClass
00003 from Acquisition import aq_inner, aq_parent, aq_get
00004 from AccessControl import ClassSecurityInfo
00005 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
00006 from zope.component import getAdapters
00007 from zope.annotation.interfaces import IAnnotations
00008 from plone.memoize.volatile import cache, DontCache
00009 
00010 from Products.PluggableAuthService.utils import classImplements
00011 from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
00012 from Products.PlonePAS.interfaces.plugins import ILocalRolesPlugin
00013 
00014 from borg.localrole.interfaces import ILocalRoleProvider
00015 
00016 # XXX: BBB interfaces, to be removed
00017 from borg.localrole.bbb.interfaces import IWorkspace
00018 from borg.localrole.bbb.interfaces import IGroupAwareWorkspace
00019 
00020 manage_addWorkspaceLocalRoleManagerForm = PageTemplateFile(
00021         "zmi/WorkspaceLocalRoleManagerForm.pt", globals(),
00022         __name__="manage_addWorkspaceRoleManagerForm")
00023 
00024 def manage_addWorkspaceLocalRoleManager(dispatcher, id, title=None, REQUEST=None):
00025     """Add a WorkspaceLocalRoleManager to a Pluggable Authentication Services."""
00026     wlrm = WorkspaceLocalRoleManager(id, title)
00027     dispatcher._setObject(wlrm.getId(), wlrm)
00028 
00029     if REQUEST is not None:
00030         REQUEST.RESPONSE.redirect(
00031                 '%s/manage_workspace?manage_tabs_message=WorkspaceLocalRoleManager+added.'
00032                 % dispatcher.absolute_url())
00033 
00034 
00035 # memoize support for `checkLocalRolesAllowed`
00036 def clra_cache_key(method, self, user, obj, object_roles):
00037     """ The cache key needs to include all arguments when caching allowed
00038         local roles, but the key function also needs to decide whether
00039         `volatile.cache` can cache or not by checking if it's possible to
00040         get a request instance from the object.
00041 
00042         To test we'll nee an adaptable object, a user and the method which
00043         results' we'd like to cache:
00044 
00045           >>> from zope.interface import implements, Interface
00046           >>> class DummyObject(object):
00047           ...     implements(Interface)
00048           >>> obj = DummyObject()
00049 
00050           >>> from borg.localrole.tests import DummyUser
00051           >>> john = DummyUser('john')
00052 
00053           >>> rm = WorkspaceLocalRoleManager('rm', 'A Role Manager')
00054           >>> fun = rm.__class__.checkLocalRolesAllowed
00055 
00056         The dummy object doesn't have an acquired request, so no caching
00057         can be done:
00058 
00059           >>> clra_cache_key(fun, 'me', john, obj, ['foo', 'bar'])
00060           Traceback (most recent call last):
00061           ...
00062           DontCache
00063 
00064         So let's add one and try again.  Before we also need to mark it as
00065         being annotatable, which normally happens elsewhere:
00066 
00067           >>> from ZPublisher.HTTPRequest import HTTPRequest
00068           >>> request = HTTPRequest('', dict(HTTP_HOST='nohost:8080'), {})
00069 
00070           >>> from Products.Five.zcml import load_config
00071           >>> import zope.component
00072           >>> import zope.annotation
00073           >>> load_config('meta.zcml', zope.component)
00074           >>> load_config('configure.zcml', zope.annotation)
00075           >>> from zope.interface import classImplements
00076           >>> from zope.annotation.interfaces import IAttributeAnnotatable
00077           >>> classImplements(HTTPRequest, IAttributeAnnotatable)
00078 
00079           >>> obj.REQUEST = request
00080           >>> clra_cache_key(fun, 'hmm', john, obj, ['foo', 'bar'])
00081           ('john', ..., ('foo', 'bar'))
00082 
00083         If the objects happens to have a `getPhysicalPath` method, that should
00084         be used instead of the hash:
00085 
00086           >>> class DummyObjectWithPath(DummyObject):
00087           ...     def getPhysicalPath(self):
00088           ...         return '42!'
00089           >>> obj = DummyObjectWithPath()
00090           >>> obj.REQUEST = request
00091           >>> clra_cache_key(fun, 'hmm', john, obj, ['foo', 'bar'])
00092           ('john', '42!', ('foo', 'bar'))
00093 
00094         Now let's check if the results of a call to `checkLocalRolesAllowed`
00095         is indeed cached, i.e. is the request was annotated correctly.  First
00096         try to log the method invocation, though.  As monkey patching in
00097         something between the original method and the already applied cache
00098         decorator is tricky, we abuse `_get_userfolder`, which is called
00099         first thing in `checkLocalRolesAllowed`:
00100 
00101           >>> original = rm._get_userfolder
00102           >>> def logger(self, *args, **kw):
00103           ...     print 'checkLocalRolesAllowed called...'
00104           ...     return original(self, *args, **kw)
00105           >>> rm._get_userfolder = logger
00106 
00107           >>> print rm.checkLocalRolesAllowed(john, obj, ['foo', 'bar'])
00108           checkLocalRolesAllowed called...
00109           None
00110           >>> IAnnotations(request)
00111           {"borg.localrole.workspace.checkLocalRolesAllowed:('john', '42!', ('foo', 'bar'))": None}
00112 
00113         Calling the method a second time should directly return the cached
00114         value, i.e. the logger shouldn't print anything:
00115 
00116           >>> print rm.checkLocalRolesAllowed(john, obj, ['foo', 'bar'])
00117           None
00118 
00119     """
00120     request = aq_get(obj, 'REQUEST', None)
00121     if IAnnotations(request, None) is None:
00122         raise DontCache
00123     try:
00124         oid = obj.getPhysicalPath()
00125     except AttributeError:
00126         oid = id(obj)
00127     return (user.getId(), oid, tuple(object_roles))
00128 
00129 def store_on_request(method, self, user, obj, object_roles):
00130     """ helper for caching local roles on the request """
00131     return IAnnotations(aq_get(obj, 'REQUEST'))
00132 
00133 
00134 class WorkspaceLocalRoleManager(BasePlugin):
00135     """This is the actual plug-in. It takes care of looking up
00136     ILocalRolesProvider adapters (when available) and granting local roles
00137     appropriately.
00138 
00139     First we need to make and register an adapter to provide some roles::
00140 
00141         >>> from zope.interface import implements, Interface
00142         >>> from zope.component import adapts
00143         >>> from borg.localrole.tests import SimpleLocalRoleProvider
00144         >>> from borg.localrole.tests import DummyUser
00145         >>> from zope.component import provideAdapter
00146         >>> provideAdapter(SimpleLocalRoleProvider, adapts=(Interface,))
00147 
00148 
00149     We need an object to adapt, we require nothing of this object,
00150     except it must be adaptable (e.g. have an interface)::
00151 
00152         >>> class DummyObject(object):
00153         ...     implements(Interface)
00154         >>> ob = DummyObject()
00155 
00156     And we need some users that we'll check the permissions of::
00157 
00158         >>> user1 = DummyUser('bogus_user')
00159         >>> user2 = DummyUser('bogus_user2')
00160 
00161     Now we're ready to make one of our RoleManagers and try it out.
00162     First we'll verify that our users have the 'Foo' role, then we'll
00163     make sure they can access objects which require that role, but not
00164     others::
00165 
00166         >>> rm = WorkspaceLocalRoleManager('rm', 'A Role Manager')
00167         >>> rm.getRolesInContext(user1, ob)
00168         ['Foo']
00169         >>> rm.checkLocalRolesAllowed(user1, ob, ['Bar', 'Foo', 'Baz'])
00170         1
00171         >>> rm.checkLocalRolesAllowed(user1, ob, ['Bar', 'Baz']) is None
00172         True
00173         >>> rm.getAllLocalRolesInContext(ob)
00174         {'bogus_user': set(['Foo'])}
00175 
00176 
00177     Multiple Role Providers
00178     -----------------------
00179 
00180     It is a bit more interesting when we have more than one adapter
00181     registered.  We register it with a name so that it supplements,
00182     rather than conflict with or override the existing adapter::
00183 
00184         >>> class LessSimpleLocalRoleProvider(SimpleLocalRoleProvider):
00185         ...     userid = 'bogus_user2'
00186         ...     roles = ('Foo', 'Baz')
00187         ...     def getRoles(self, userid):
00188         ...         '''Grant bogus_user2 the 'Foo' and 'Baz' roles'''
00189         ...         if userid == self.userid:
00190         ...             return self.roles
00191         ...         return ()
00192         ...
00193         ...     def getAllRoles(self):
00194         ...         yield (self.userid, self.roles)
00195 
00196         >>> provideAdapter(LessSimpleLocalRoleProvider, adapts=(Interface,),
00197         ...                name='adapter2')
00198 
00199    This should have no effect on our first user::
00200 
00201         >>> rm.getRolesInContext(user1, ob)
00202         ['Foo']
00203         >>> rm.checkLocalRolesAllowed(user1, ob, ['Bar', 'Foo', 'Baz'])
00204         1
00205         >>> rm.checkLocalRolesAllowed(user1, ob, ['Bar', 'Baz']) is None
00206         True
00207         >>> rm.getAllLocalRolesInContext(ob)
00208         {'bogus_user2': set(['Foo', 'Baz']), 'bogus_user': set(['Foo'])}
00209 
00210     But our second user notices the change, note that even though two
00211     of our local role providers grant the role 'Foo', it is not duplicated::
00212 
00213         >>> rm.getRolesInContext(user2, ob)
00214         ['Foo', 'Baz']
00215         >>> rm.checkLocalRolesAllowed(user2, ob, ['Bar', 'Foo', 'Baz'])
00216         1
00217         >>> rm.checkLocalRolesAllowed(user2, ob, ['Bar', 'Baz'])
00218         1
00219         >>> rm.checkLocalRolesAllowed(user2, ob, ['Bar']) is None
00220         True
00221 
00222 
00223     Role Acquisition and Blocking
00224     -----------------------------
00225 
00226     This plugin will acquire role definitions from parent objects,
00227     unless explicitly blocked.  To test this, we need some objects
00228     which support acquisition::
00229 
00230         >>> from Acquisition import Implicit
00231         >>> class DummyImplicit(DummyObject, Implicit):
00232         ...     def stupid_method(self):
00233         ...         return 1
00234         >>> root = DummyImplicit()
00235         >>> next = DummyImplicit().__of__(root)
00236         >>> last = DummyImplicit().__of__(next)
00237         >>> other = DummyImplicit().__of__(root)
00238 
00239     So we now have /root/next/last and /root/other, we'll create and
00240     register special adapters for our next and other objects.
00241 
00242         >>> class ISpecial1(Interface):
00243         ...     pass
00244         >>> class ISpecial2(Interface):
00245         ...     pass
00246         >>> from zope.interface import directlyProvides
00247         >>> directlyProvides(next, ISpecial1)
00248         >>> directlyProvides(other, ISpecial2)
00249         >>> class Adapter1(LessSimpleLocalRoleProvider):
00250         ...     adapts(ISpecial1)
00251         ...     userid = 'bogus_user'
00252         ...     roles = ('Bar',)
00253         >>> class Adapter2(LessSimpleLocalRoleProvider):
00254         ...     adapts(ISpecial2)
00255         ...     userid = 'bogus_user3'
00256         ...     roles = ('Foobar',)
00257         >>> user3 = DummyUser('bogus_user3')
00258 
00259     We'll register these to override the existing unnamed adapter:
00260 
00261         >>> provideAdapter(Adapter1)
00262         >>> provideAdapter(Adapter2)
00263 
00264     Now we can show how acquisition of roles works, first we look at the
00265     'last' item, which should have roles provided by
00266     SimpleLocalRoleProvider, and LessSimpleLocalRoleProvider, as well
00267     as acquired from Adapter1 on 'next':
00268 
00269         >>> rm.getRolesInContext(user1, last)
00270         ['Foo', 'Bar']
00271 
00272         >>> rm.getRolesInContext(user2, last)
00273         ['Foo', 'Baz']
00274 
00275     If we look at the parent, we get the same results, because the
00276     SimpleLocalRoleProvider adapter also applies to the 'root'
00277     object. However, if we enable local role blocking on 'next' we
00278     won't see the roles from the 'root'::
00279 
00280         >>> rm.getRolesInContext(user1, next)
00281         ['Foo', 'Bar']
00282         >>> next.__ac_local_roles_block__ = True
00283         >>> rm.getRolesInContext(user1, next)
00284         ['Bar']
00285 
00286     The checkLocalRolesAllowed and getAllLocalRolesInContext methods
00287     take acquisition and blocking into account as well::
00288 
00289         >>> rm.checkLocalRolesAllowed(user1, last,  ['Bar'])
00290         1
00291         >>> rm.checkLocalRolesAllowed(user1, next,  ['Foo', 'Baz']) is None
00292         True
00293         >>> rm.getAllLocalRolesInContext(last)
00294         {'bogus_user2': set(['Foo', 'Baz']), 'bogus_user': set(['Foo', 'Bar'])}
00295 
00296     It's important to note, that roles are acquired only by
00297     containment.  Additional wrapping cannot change the security on an
00298     object.  For example if we were to wrap 'last' in the context of
00299     other, which provides a special role for 'user3', we should see no
00300     effect::
00301 
00302         >>> rm.getRolesInContext(user3, last)
00303         ['Foo']
00304         >>> rm.getRolesInContext(user3, other)
00305         ['Foobar', 'Foo']
00306         >>> rm.getRolesInContext(user3, last.__of__(other))
00307         ['Foo']
00308 
00309     It's also important that methods of objects yield the same local
00310     roles as the objects would
00311 
00312         >>> rm.getRolesInContext(user3, other.stupid_method)
00313         ['Foobar', 'Foo']
00314 
00315     Group Support
00316     -------------
00317 
00318     This plugin also handles roles granted to user groups, calling up
00319     the adapters to get roles for any groups the user might belong
00320     to::
00321 
00322         >>> user4 = DummyUser('bogus_user4', ('Group1', 'Group2'))
00323         >>> user4.getGroups()
00324         ('Group1', 'Group2')
00325         >>> rm.getRolesInContext(user4, last)
00326         ['Foo']
00327         >>> class Adapter3(LessSimpleLocalRoleProvider):
00328         ...     userid = 'Group2'
00329         ...     roles = ('Foobar',)
00330 
00331         >>> provideAdapter(Adapter3, adapts=(Interface,), name='group_adapter')
00332         >>> rm.getRolesInContext(user4, last)
00333         ['Foobar', 'Foo']
00334 
00335 
00336     Wrong User Folder
00337     -----------------
00338 
00339     Finally, to ensure full test coverage, we provide a user object
00340     which pretends to be wrapped in such a way that the user folder
00341     does not recognize it.  We check that it always gets an empty set
00342     of roles and a special 0 value when checking access::
00343 
00344         >>> class BadUser(DummyUser):
00345         ...     def _check_context(self, obj):
00346         ...         return False
00347         >>> bad_user = BadUser('bad_user')
00348         >>> rm.getRolesInContext(bad_user, ob)
00349         []
00350         >>> rm.checkLocalRolesAllowed(bad_user, ob, ['Bar', 'Foo', 'Baz'])
00351         0
00352 
00353     """
00354     meta_type = "Workspace Roles Manager"
00355     security  = ClassSecurityInfo()
00356 
00357     def __init__(self, id, title=""):
00358         self.id = id
00359         self.title = title
00360 
00361     def _get_userfolder(user, obj):
00362         """Gets the unwrapped user folder for the user, because we may
00363         need to rewrap"""
00364         context = user
00365         while context is not None:
00366             if hasattr(context, 'getId'):
00367                 if context.getId() == 'acl_users':
00368                     break
00369             context = aq_parent(aq_inner(context))
00370         else:
00371             return None
00372         return aq_inner(context)
00373     #
00374     # ILocalRolesPlugin implementation
00375     #
00376 
00377     def _getAdapters(self, obj):
00378         adapters = getAdapters((obj,), ILocalRoleProvider)
00379         # this is sequence of tuples of the form (name, adapter),
00380         # we don't really care about the names
00381         return (a[1] for a in adapters)
00382 
00383     def _parent_chain(self, obj):
00384         """Iterate over the containment chain, stopping if we hit a
00385         local role blocker"""
00386         while obj is not None:
00387             yield obj
00388             if getattr(obj, '__ac_local_roles_block__', None):
00389                 raise StopIteration
00390             new = aq_parent(aq_inner(obj))
00391             # if the obj is a method we get the class
00392             obj = getattr(obj, 'im_self', new)
00393 
00394     def _get_principal_ids(self, user):
00395         """Returns a list of the ids of all involved security
00396         principals: the user and all groups that they belong
00397         to. (Note: recursive groups are not yet supported"""
00398         principal_ids = list(user.getGroups())
00399         principal_ids.insert(0, user.getId())
00400         return principal_ids
00401 
00402     security.declarePrivate("getRolesInContext")
00403     def getRolesInContext(self, user, object):
00404         # we combine the permission of the user with those of the
00405         # groups she belongs to
00406         uf = self._get_userfolder(user)
00407         if uf is not None:
00408             # rewrap user with an unwrapped user folder, so
00409             # _check_context works appropriately
00410             user = aq_inner(user)
00411             user = user.__of__(uf)
00412         principal_ids = self._get_principal_ids(user)
00413         roles = set()
00414         for obj in self._parent_chain(object):
00415             if user._check_context(obj):
00416                 count = -1
00417                 for count, a in enumerate(self._getAdapters(obj)):
00418                     for pid in principal_ids:
00419                         roles.update(a.getRoles(pid))
00420                 # XXX: BBB code, kicks in only if there's no proper adapter
00421                 if count == -1:
00422                     workspace = IGroupAwareWorkspace(obj, IWorkspace(obj, None))
00423                     if workspace is not None:
00424                         roles.update(workspace.getLocalRolesForPrincipal(user))
00425                         for group in self._groups(obj, user, workspace):
00426                             roles.update(workspace.getLocalRolesForPrincipal(group))
00427         return list(roles)
00428 
00429     security.declarePrivate("checkLocalRolesAllowed")
00430     @cache(get_key=clra_cache_key, get_cache=store_on_request)
00431     def checkLocalRolesAllowed(self, user, object, object_roles):
00432         """Checks if the user has one of the specified roles in the
00433         given context, short circuits when the first provider granting
00434         one of the roles is found."""
00435         uf = self._get_userfolder(user)
00436         if uf is not None:
00437             # rewrap user with an unwrapped user folder, so
00438             # _check_context works appropriately
00439             user = aq_inner(user)
00440             user = user.__of__(uf)
00441         check_roles = dict(izip(object_roles, repeat(True)))
00442         principal_ids = self._get_principal_ids(user)
00443         for obj in self._parent_chain(object):
00444             count = -1
00445             for count, a in enumerate(self._getAdapters(obj)):
00446                 for pid in principal_ids:
00447                     roles = a.getRoles(pid)
00448                     for role in check_roles:
00449                         if role in roles:
00450                             if user._check_context(obj):
00451                                 return 1
00452                             else:
00453                                 return 0
00454             # XXX: BBB code, kicks in only if there's no proper adapter
00455             if count == -1:
00456                 workspace = IGroupAwareWorkspace(obj, IWorkspace(obj, None))
00457                 if workspace is not None:
00458                     roles = workspace.getLocalRolesForPrincipal(user)
00459                     for role in check_roles:
00460                         if role in roles:
00461                             if user._check_context(obj):
00462                                 return 1
00463                             else:
00464                                 return 0
00465                     for group in self._groups(obj, user, workspace):
00466                         roles = workspace.getLocalRolesForPrincipal(group)
00467                         for role in check_roles:
00468                             if role in roles:
00469                                 if user._check_context(obj):
00470                                     return 1
00471                                 else:
00472                                     return 0
00473 
00474         return None
00475 
00476     security.declarePrivate("getAllLocalRolesInContext")
00477     def getAllLocalRolesInContext(self, object):
00478         rolemap = {}
00479         for obj in self._parent_chain(object):
00480             for a in self._getAdapters(obj):
00481                 iter_roles = a.getAllRoles()
00482                 for principal, roles in iter_roles:
00483                     rolemap.setdefault(principal, set()).update(roles)
00484             else: # XXX: BBB code, kicks in only if there's no proper ddapter
00485                 workspace = IGroupAwareWorkspace(obj, IWorkspace(obj, None))
00486                 if workspace is not None:
00487                     rolemap.update(workspace.getLocalRoles())
00488 
00489         return rolemap
00490                
00491     # XXX: for BBB only
00492 
00493     security.declarePrivate("_groups")
00494     def _groups(self, obj, user, workspace):
00495         """If workspace provides IGroupAwareWorkspace and the user has
00496         a getGroups() method, yield each group_id returned by that method.
00497         """
00498         if IGroupAwareWorkspace.providedBy(workspace):
00499             getGroups = getattr(user, 'getGroups', None)
00500             if getGroups is not None:
00501                 acl_users = aq_parent(aq_inner(self))
00502                 for group_id in getGroups():
00503                     yield acl_users.getGroupById(group_id)
00504 
00505 classImplements(WorkspaceLocalRoleManager, ILocalRolesPlugin)
00506 InitializeClass(WorkspaceLocalRoleManager)