Back to index

plone3  3.1.7
test_CMFCatalogAware.py
Go to the documentation of this file.
00001 ##############################################################################
00002 #
00003 # Copyright (c) 2005 Zope Corporation and Contributors. All Rights Reserved.
00004 #
00005 # This software is subject to the provisions of the Zope Public License,
00006 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
00007 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
00008 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00009 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
00010 # FOR A PARTICULAR PURPOSE.
00011 #
00012 ##############################################################################
00013 """Unit tests for CMFCatalogAware.
00014 
00015 $Id: test_CMFCatalogAware.py 82471 2007-12-27 18:03:03Z yuppie $
00016 """
00017 
00018 import unittest
00019 import Testing
00020 
00021 import transaction
00022 from AccessControl.SecurityManagement import newSecurityManager
00023 from OFS.Folder import Folder
00024 from OFS.SimpleItem import SimpleItem
00025 
00026 from zope.interface import implements
00027 
00028 from Products.CMFCore.CMFCatalogAware import CMFCatalogAware
00029 from Products.CMFCore.exceptions import NotFound
00030 from Products.CMFCore.interfaces import IContentish
00031 from Products.CMFCore.testing import EventZCMLLayer
00032 from Products.CMFCore.testing import TraversingZCMLLayer
00033 from Products.CMFCore.tests.test_PortalFolder import _AllowedUser
00034 from Products.CMFCore.tests.test_PortalFolder import _SensitiveSecurityPolicy
00035 from Products.CMFCore.tests.base.testcase import LogInterceptor
00036 from Products.CMFCore.tests.base.testcase import SecurityRequestTest
00037 
# Alias for CMFCatalogAware._cmf_security_indexes; the reindexObjectSecurity
# tests below format it into the "reindex ..." log entries they expect.
CMF_SECURITY_INDEXES = CMFCatalogAware._cmf_security_indexes
00039 
def physicalpath(ob):
    """Return the physical path of ``ob`` joined into one '/'-separated string."""
    segments = ob.getPhysicalPath()
    return '/'.join(segments)
00042 
00043 
# Folder subclass whose id is passed straight to the constructor.
class SimpleFolder(Folder):
    def __init__(self, id):
        # Only the id is set here; no base-class __init__ is called.
        self._setId(id)
00047 
# SimpleFolder that reports itself as the physical root.
class DummyRoot(SimpleFolder):
    def getPhysicalRoot(self):
        # This object is the top of the tree used by the tests.
        return self
00051 
class DummyOldBrain:
    """Stand-in for a catalog brain wrapping an object and its path.

    ``getObject`` treats two ids specially: 'missing' and 'hop'.
    """

    def __init__(self, ob, path):
        self.ob = ob
        self.id = ob.getId()
        self.path = path

    def getPath(self):
        return self.path

    def getObject(self):
        ident = self.id
        if ident == 'hop':
            # Simulated failure while fetching this particular object.
            raise ValueError("security problem for this object")
        if ident != 'missing':
            return self.ob
        # For 'missing', the GETOBJECT_RAISES flag on the wrapped object
        # selects between raising NotFound and returning None.
        if self.ob.GETOBJECT_RAISES:
            raise NotFound("missing")
        return None
00068 
class DummyBrain(DummyOldBrain):
    """Brain that additionally offers ``_unrestrictedGetObject``."""

    def _unrestrictedGetObject(self):
        # Only the 'missing' id is routed through getObject(); every other
        # id (including 'hop') gets the wrapped object back directly.
        if self.id != 'missing':
            return self.ob
        return self.getObject()
00074 
class DummyCatalog(SimpleItem):
    # Fake catalog tool: each index/reindex/unindex call is recorded as a
    # string in ``log``; path searches are answered from the (object, path)
    # pairs stored by setObs().
    brain_class = DummyBrain

    def __init__(self):
        self.log = []
        self.obs = []

    def indexObject(self, ob):
        entry = 'index %s' % physicalpath(ob)
        self.log.append(entry)

    def reindexObject(self, ob, idxs=[], update_metadata=0, uid=None):
        # idxs is only formatted into the log entry, never mutated;
        # update_metadata and uid are accepted but ignored.
        entry = 'reindex %s %s' % (physicalpath(ob), idxs)
        self.log.append(entry)

    def unindexObject(self, ob):
        entry = 'unindex %s' % physicalpath(ob)
        self.log.append(entry)

    def setObs(self, obs):
        # Remember each object together with its path at the time of the call.
        pairs = []
        for ob in obs:
            pairs.append((ob, physicalpath(ob)))
        self.obs = pairs

    def unrestrictedSearchResults(self, path):
        # Comparing with a trailing '/' on both sides keeps '/site/foo'
        # from matching '/site/foobar'.
        return [self.brain_class(ob, obpath)
                for ob, obpath in self.obs
                if (obpath + '/').startswith(path + '/')]
00095 
00096 
class DummyWorkflowTool(SimpleItem):
    # Fake workflow tool that records notifyCreated() calls in ``log``.

    def __init__(self):
        self.log = []

    def notifyCreated(self, obj):
        entry = 'created %s' % physicalpath(obj)
        self.log.append(entry)
00104 
00105 
class TheClass(CMFCatalogAware, Folder):
    # Content class used by the tests: CMFCatalogAware mixed into a Folder.

    implements(IContentish)

    def __init__(self, id):
        self._setId(id)
        # Set to True by notifyModified(); the tests inspect this flag.
        self.notified = False

    def notifyModified(self):
        # Just record that the call happened.
        self.notified = True
00116 
00117 
class CMFCatalogAwareTests(unittest.TestCase, LogInterceptor):
    # Exercises the CMFCatalogAware indexing methods against a DummyCatalog
    # that only records the calls it receives.
    #
    # Comments rather than docstrings are used on the test methods so that
    # the test runner keeps reporting tests by method name.

    layer = TraversingZCMLLayer

    def setUp(self):
        # Build root -> site, holding the fake catalog and workflow tools
        # plus a single content object 'foo'.
        self.root = DummyRoot('')
        self.root.site = SimpleFolder('site')
        self.site = self.root.site
        self.site._setObject('portal_catalog', DummyCatalog())
        self.site._setObject('portal_workflow', DummyWorkflowTool())
        self.site.foo = TheClass('foo')

    def tearDown(self):
        # Undo log interception for both subsystems the tests may have used.
        self._ignore_log_errors()
        self._ignore_log_errors(subsystem='CMFCore.CMFCatalogAware')

    def test_indexObject(self):
        foo = self.site.foo
        cat = self.site.portal_catalog
        foo.indexObject()
        self.assertEqual(cat.log, ["index /site/foo"])

    def test_unindexObject(self):
        foo = self.site.foo
        cat = self.site.portal_catalog
        foo.unindexObject()
        self.assertEqual(cat.log, ["unindex /site/foo"])

    def test_reindexObject(self):
        # A full reindex (no idxs) is expected to call notifyModified().
        foo = self.site.foo
        cat = self.site.portal_catalog
        foo.reindexObject()
        self.assertEqual(cat.log, ["reindex /site/foo []"])
        self.assertTrue(foo.notified)

    def test_reindexObject_idxs(self):
        # A partial reindex must not call notifyModified().
        foo = self.site.foo
        cat = self.site.portal_catalog
        foo.reindexObject(idxs=['bar'])
        self.assertEqual(cat.log, ["reindex /site/foo ['bar']"])
        self.assertFalse(foo.notified)

    def test_reindexObjectSecurity(self):
        foo = self.site.foo
        self.site.foo.bar = TheClass('bar')
        bar = self.site.foo.bar
        self.site.foo.hop = TheClass('hop')
        hop = self.site.foo.hop
        cat = self.site.portal_catalog
        cat.setObs([foo, bar, hop])
        foo.reindexObjectSecurity()
        # The order of the catalog calls is not asserted, only their set.
        logged = sorted(cat.log)
        self.assertEqual(logged, [
            "reindex /site/foo %s" % str(CMF_SECURITY_INDEXES),
            "reindex /site/foo/bar %s" % str(CMF_SECURITY_INDEXES),
            "reindex /site/foo/hop %s" % str(CMF_SECURITY_INDEXES),
            ])
        self.assertFalse(foo.notified)
        self.assertFalse(bar.notified)
        self.assertFalse(hop.notified)

    def test_reindexObjectSecurity_missing_raise(self):
        # Exception raised for missing object (Zope 2.8 brains)
        foo = self.site.foo
        missing = TheClass('missing').__of__(foo)
        missing.GETOBJECT_RAISES = True
        cat = self.site.portal_catalog
        # NOTE(review): interception is switched off again in the finally
        # clause, i.e. before reindexObjectSecurity() runs below -- confirm
        # that checking self.logged afterwards is still meaningful.
        try:
            self._catch_log_errors()
            cat.setObs([foo, missing])
        finally:
            self._ignore_log_errors()
        self.assertRaises(NotFound, foo.reindexObjectSecurity)
        self.assertFalse(self.logged)  # no logging due to raise

    def test_reindexObjectSecurity_missing_noraise(self):
        # Raising disabled
        self._catch_log_errors(subsystem='CMFCore.CMFCatalogAware')
        foo = self.site.foo
        missing = TheClass('missing').__of__(foo)
        missing.GETOBJECT_RAISES = False
        cat = self.site.portal_catalog
        cat.setObs([foo, missing])
        foo.reindexObjectSecurity()
        self.assertEqual(cat.log,
                         ["reindex /site/foo %s" % str(CMF_SECURITY_INDEXES)])
        self.assertFalse(foo.notified)
        self.assertFalse(missing.notified)
        self.assertEqual(len(self.logged), 1)  # logging because no raise

    def test_catalog_tool(self):
        foo = self.site.foo
        self.assertEqual(foo._getCatalogTool(), self.site.portal_catalog)

    def test_workflow_tool(self):
        foo = self.site.foo
        self.assertEqual(foo._getWorkflowTool(), self.site.portal_workflow)

    # FIXME: more tests needed
00218 
00219 
class CMFCatalogAware_CopySupport_Tests(SecurityRequestTest):
    # Checks which catalog calls are logged when a CMFCatalogAware object
    # is added, removed, copied, cut/pasted or renamed.
    #
    # Comments rather than docstrings are used on the test methods so that
    # the test runner keeps reporting tests by method name.

    layer = EventZCMLLayer

    def _makeSite(self):
        # Create 'site' under self.app with the fake catalog and workflow
        # tools, and return it.
        self.app._setObject('site', SimpleFolder('site'))
        site = self.app._getOb('site')
        site._setObject('portal_catalog', DummyCatalog())
        site._setObject('portal_workflow', DummyWorkflowTool())
        # Hack, we need a _p_mtime for the file, so we make sure that it
        # has one. We use a savepoint, which means we can rollback
        # later and pretend we didn't touch the ZODB.
        transaction.savepoint(optimistic=True)
        return site

    def _initPolicyAndUser( self
                          , a_lambda=None
                          , v_lambda=None
                          , c_lambda=None
                          ):
        # Install a _SensitiveSecurityPolicy built from v_lambda/c_lambda and
        # make an _AllowedUser built from a_lambda the current user.
        # Any callable left as None is replaced by one that always returns 1.
        from AccessControl import SecurityManager

        def _promiscuous( *args, **kw ):
            return 1

        if a_lambda is None:
            a_lambda = _promiscuous

        if v_lambda is None:
            v_lambda = _promiscuous

        if c_lambda is None:
            c_lambda = _promiscuous

        scp = _SensitiveSecurityPolicy( v_lambda, c_lambda )
        SecurityManager.setSecurityPolicy( scp )
        newSecurityManager( None
                          , _AllowedUser(a_lambda).__of__(self.app.acl_users))

    def test_object_indexed_after_adding(self):

        site = self._makeSite()
        bar = TheClass('bar')
        site._setObject('bar', bar)
        cat = site.portal_catalog
        self.assertEqual(cat.log, ["index /site/bar"])

    def test_object_unindexed_after_removing(self):

        site = self._makeSite()
        bar = TheClass('bar')
        site._setObject('bar', bar)
        cat = site.portal_catalog
        cat.log = []  # discard the entry logged while adding
        site._delObject('bar')
        self.assertEqual(cat.log, ["unindex /site/bar"])

    def test_object_indexed_after_copy_and_pasting(self):

        self._initPolicyAndUser() # allow copy/paste operations
        site = self._makeSite()
        site.folder1 = SimpleFolder('folder1')
        folder1 = site.folder1
        site.folder2 = SimpleFolder('folder2')
        folder2 = site.folder2

        bar = TheClass('bar')
        folder1._setObject('bar', bar)
        cat = site.portal_catalog
        cat.log = []  # discard the entry logged while adding

        transaction.savepoint(optimistic=True)

        cookie = folder1.manage_copyObjects(ids=['bar'])
        folder2.manage_pasteObjects(cookie)

        # Only the pasted copy is indexed; the original is left alone.
        self.assertEqual(cat.log, ["index /site/folder2/bar"])

    def test_object_reindexed_after_cut_and_paste(self):

        self._initPolicyAndUser() # allow copy/paste operations
        site = self._makeSite()
        site.folder1 = SimpleFolder('folder1')
        folder1 = site.folder1
        site.folder2 = SimpleFolder('folder2')
        folder2 = site.folder2

        bar = TheClass('bar')
        folder1._setObject('bar', bar)
        cat = site.portal_catalog
        cat.log = []  # discard the entry logged while adding

        transaction.savepoint(optimistic=True)

        cookie = folder1.manage_cutObjects(ids=['bar'])
        folder2.manage_pasteObjects(cookie)

        self.assertEqual(cat.log, ["unindex /site/folder1/bar",
                                   "index /site/folder2/bar"])

    def test_object_reindexed_after_moving(self):

        self._initPolicyAndUser() # allow copy/paste operations
        site = self._makeSite()

        bar = TheClass('bar')
        site._setObject('bar', bar)
        cat = site.portal_catalog
        cat.log = []  # discard the entry logged while adding

        transaction.savepoint(optimistic=True)

        site.manage_renameObject(id='bar', new_id='baz')
        self.assertEqual(cat.log, ["unindex /site/bar",
                                   "index /site/baz"])
00335 
00336 
def test_suite():
    # Build one suite holding every test of the two TestCase classes in
    # this module.  loadTestsFromTestCase replaces unittest.makeSuite,
    # which is deprecated and no longer present in recent Python releases.
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    return unittest.TestSuite((
        load(CMFCatalogAwareTests),
        load(CMFCatalogAware_CopySupport_Tests),
        ))
00342 
# Run the suite when this module is executed directly as a script.
if __name__ == '__main__':
    # Imported here so the runner is only needed for direct execution.
    from Products.CMFCore.testing import run
    run(test_suite())