Back to index

plone3  3.1.7
test_ZVCStorageTool.py
Go to the documentation of this file.
00001 #########################################################################
00002 # Copyright (c) 2004, 2005 Alberto Berti, Gregoire Weber,
00003 # Reflab (Vincenzo Di Somma, Francesco Ciriaci, Riccardo Lemmi)
00004 # All Rights Reserved.
00005 # 
00006 # This file is part of CMFEditions.
00007 # 
00008 # CMFEditions is free software; you can redistribute it and/or modify
00009 # it under the terms of the GNU General Public License as published by
00010 # the Free Software Foundation; either version 2 of the License, or
00011 # (at your option) any later version.
00012 # 
00013 # CMFEditions is distributed in the hope that it will be useful,
00014 # but WITHOUT ANY WARRANTY; without even the implied warranty of
00015 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016 # GNU General Public License for more details.
00017 # 
00018 # You should have received a copy of the GNU General Public License
00019 # along with CMFEditions; if not, write to the Free Software
00020 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
00021 #########################################################################
00022 """Test the standard archivist
00023 
00024 $Id: test_ZVCStorageTool.py,v 1.12 2005/02/24 21:53:44 tomek1024 Exp $
00025 """
00026 
00027 from Products.PloneTestCase import PloneTestCase
00028 PloneTestCase.setupPloneSite()
00029 
00030 from Interface.Verify import verifyObject
00031 from OFS.ObjectManager import ObjectManager
00032 
00033 from Products.CMFEditions.ArchivistTool import ObjectData
00034 from Products.CMFEditions.interfaces.IStorage import IStorage
00035 from Products.CMFEditions.interfaces.IStorage import IPurgeSupport
00036 from Products.CMFEditions.interfaces.IStorage import StorageUnregisteredError
00037 from Products.CMFEditions.interfaces.IStorage import StorageRetrieveError
00038 
00039 from DummyTools import Dummy as Dummy
00040 from DummyTools import DummyPurgePolicy
00041 from DummyTools import MemoryStorage
00042 from DummyTools import notifyModified
00043 
00044 class DummyOM(ObjectManager):
00045     pass
00046 
00047 class TestZVCStorageTool(PloneTestCase.PloneTestCase):
00048 
00049     def afterSetUp(self):
00050         # we need to have the Manager role to be able to add things
00051         # to the portal root
00052         self.setRoles(['Manager',])
00053 
00054         # add an additional user
00055         self.portal.acl_users.userFolderAddUser('reviewer', 'reviewer',
00056                                                 ['Manager'], '')
00057         
00058         # eventually install another storage 
00059         self.installStorageTool()
00060 
00061         # delete purge policy if there is one installed
00062         try:
00063             del self.portal.portal_purgepolicy
00064         except AttributeError:
00065             pass
00066 
00067     def installStorageTool(self):
00068         # No op: the storage tool is already installed by installProduct
00069         pass
00070 
00071     def installPurgePolicyTool(self):
00072         self._setDummyTool(DummyPurgePolicy())
00073 
00074     def _setDummyTool(self, tool):
00075         setattr(self.portal, tool.getId(), tool)
00076 
00077     def buildMetadata(self, comment):
00078         return {'sys_metadata': {'comment': comment}}
00079 
00080     def getComment(self, vdata):
00081         return vdata.metadata["sys_metadata"]["comment"]
00082 
00083     def test00_interface(self):
00084         portal_storage = self.portal.portal_historiesstorage
00085 
00086         # test interface conformance
00087         verifyObject(IStorage, portal_storage)
00088         verifyObject(IPurgeSupport, portal_storage)
00089 
00090     def test01_saveAfterRegisteringDoesNotRaiseException(self):
00091         portal_storage = self.portal.portal_historiesstorage
00092         obj = Dummy()
00093         
00094         sel = portal_storage.register(1, ObjectData(obj), 
00095                                       metadata=self.buildMetadata('saved'))
00096         self.assertEqual(sel, 0)
00097         sel = portal_storage.save(1, ObjectData(obj), 
00098                                   metadata=self.buildMetadata('saved'))
00099         self.assertEqual(sel, 1)
00100 
00101     def test02_saveUnregisteredObjectRaisesException(self):
00102         portal_storage = self.portal.portal_historiesstorage
00103         obj = Dummy()
00104 
00105         self.assertRaises(StorageUnregisteredError,
00106                           portal_storage.save,
00107                           1, ObjectData(obj), metadata=self.buildMetadata('saved'))
00108 
00109     def test03_saveAndRetrieve(self):
00110         portal_storage = self.portal.portal_historiesstorage
00111 
00112         obj1 = Dummy()
00113         obj1.text = 'v1 of text'
00114         portal_storage.register(1, ObjectData(obj1), metadata=self.buildMetadata('saved v1'))
00115 
00116         obj2 = Dummy()
00117         obj2.text = 'v2 of text'
00118         portal_storage.save(1, ObjectData(obj2), metadata=self.buildMetadata('saved v2'))
00119 
00120         # retrieve the state at registration time
00121         retrieved_obj = portal_storage.retrieve(history_id=1, selector=0)
00122         self.assertEqual(retrieved_obj.object.object.text, 'v1 of text')
00123         self.assertEqual(self.getComment(retrieved_obj), 'saved v1')
00124 
00125         # just check if first save wasn't a double save
00126         retrieved_obj = portal_storage.retrieve(history_id=1, selector=1)
00127         self.assertEqual(retrieved_obj.object.object.text, 'v2 of text')
00128         self.assertEqual(self.getComment(retrieved_obj), 'saved v2')
00129 
00130     def test05_getHistory(self):
00131         portal_storage = self.portal.portal_historiesstorage
00132         
00133         obj1 = Dummy()
00134         obj1.text = 'v1 of text'
00135         portal_storage.register(1, ObjectData(obj1), 
00136                                 metadata=self.buildMetadata('saved v1'))
00137         
00138         obj2 = Dummy()
00139         obj2.text = 'v2 of text'
00140         portal_storage.save(1, ObjectData(obj2), 
00141                             metadata=self.buildMetadata('saved v2'))
00142         
00143         obj3 = Dummy()
00144         obj3.text = 'v3 of text'
00145         portal_storage.save(1, ObjectData(obj3), 
00146                             metadata=self.buildMetadata('saved v3'))
00147         
00148         history = portal_storage.getHistory(history_id=1)
00149         length = len(history)
00150         
00151         # check length
00152         self.assertEquals(length, 3)
00153         
00154         # iterating over the history
00155         for i, vdata in enumerate(history):
00156             expected_test = 'v%s of text' % (i+1)
00157             self.assertEquals(vdata.object.object.text, expected_test)
00158             self.assertEquals(history[i].object.object.text, expected_test)
00159             
00160             expected_comment = 'saved v%s' % (i+1)
00161             self.assertEqual(self.getComment(vdata), expected_comment)
00162             self.assertEqual(self.getComment(history[i]), expected_comment)
00163             
00164         # accessing the versions
00165         self.assertEquals(history[0].object.object.text, "v1 of text")
00166         self.assertEqual(self.getComment(history[0]), "saved v1")
00167         self.assertEquals(history[1].object.object.text, "v2 of text")
00168         self.assertEqual(self.getComment(history[1]), "saved v2")
00169         self.assertEquals(history[2].object.object.text, "v3 of text")
00170         self.assertEqual(self.getComment(history[2]), "saved v3")
00171 
00172     def test06_checkObjectManagerIntegrity(self):
00173         portal_storage = self.portal.portal_historiesstorage
00174         
00175         om = DummyOM()
00176         sub1 = Dummy()
00177         sub2 = Dummy()
00178         om._setObject('sub1', sub1)
00179         om._setObject('sub2', sub2)
00180         self.assertEqual(len(om.objectIds()), 2)
00181         portal_storage.register(1, ObjectData(om), metadata=self.buildMetadata('saved v1'))
00182         vdata = portal_storage.retrieve(history_id=1, selector=0)
00183         retrieved_om = vdata.object
00184         self.assertEqual(len(retrieved_om.object.objectIds()), 2)
00185 
00186     def test07_getModificationDate(self):
00187         portal_storage = self.portal.portal_historiesstorage
00188         obj = Dummy()
00189         v1_modified = obj.modified()
00190         v1 = portal_storage.register(history_id=1, object=ObjectData(obj), metadata=self.buildMetadata('saved v1'))
00191         
00192         self.assertEqual(v1_modified, portal_storage.getModificationDate(history_id=1))
00193         self.assertEqual(v1_modified, portal_storage.getModificationDate(history_id=1, selector=v1))
00194 
00195         #storage never gets the same object twice, because the archivist always generates another copy on save,
00196         #which then have a diffrent python id.
00197 
00198         #simulate object copy
00199         notifyModified(obj)
00200         obj = Dummy()
00201         v2_modified = obj.modified()
00202         v2 = portal_storage.save(history_id=1, object=ObjectData(obj), metadata=self.buildMetadata('saved v2'))
00203         self.assertNotEquals(v1, v2)
00204         self.assertEqual(v2_modified, portal_storage.getModificationDate(history_id=1))
00205         self.assertEqual(v2_modified, portal_storage.getModificationDate(history_id=1, selector=v2))
00206         self.assertEqual(v1_modified, portal_storage.getModificationDate(history_id=1, selector=v1))
00207 
00208     def _setupMinimalHistory(self):
00209         portal_storage = self.portal.portal_historiesstorage
00210         
00211         obj1 = Dummy()
00212         obj1.text = 'v1 of text'
00213         portal_storage.register(1, ObjectData(obj1), metadata=self.buildMetadata('saved v1'))
00214         
00215         obj2 = Dummy()
00216         obj2.text = 'v2 of text'
00217         portal_storage.save(1, ObjectData(obj2), metadata=self.buildMetadata('saved v2'))
00218         
00219         obj3 = Dummy()
00220         obj3.text = 'v3 of text'
00221         portal_storage.save(1, ObjectData(obj3), metadata=self.buildMetadata('saved v3'))
00222 
00223         obj4 = Dummy()
00224         obj4.text = 'v4 of text'
00225         portal_storage.save(1, ObjectData(obj4), metadata=self.buildMetadata('saved v4'))
00226 
00227     def test08_lengthAfterHavingPurgedAVersion(self):
00228         self._setupMinimalHistory()
00229         portal_storage = self.portal.portal_historiesstorage
00230         
00231         # purge a version
00232         portal_storage.purge(1, 1, metadata=self.buildMetadata("purged v2"))
00233         
00234         # check length
00235         lenWith = len(portal_storage.getHistory(1, countPurged=True))
00236         self.assertEqual(lenWith, 4)
00237         lenWithout = len(portal_storage.getHistory(1, countPurged=False))
00238         self.assertEqual(lenWithout, 3)
00239         
00240         # purge again the same version (should not change the purge count)
00241         portal_storage.purge(1, 1, metadata=self.buildMetadata("purged v2"))
00242         
00243         # check again getLength (unchanged behaviour)
00244         lenWith = len(portal_storage.getHistory(1, countPurged=True))
00245         self.assertEqual(lenWith, 4)
00246         lenWithout = len(portal_storage.getHistory(1, countPurged=False))
00247         self.assertEqual(lenWithout, 3)
00248 
00249     def test09_retrievePurgedVersionsNoPolicyInstalled(self):
00250         self._setupMinimalHistory()
00251         portal_storage = self.portal.portal_historiesstorage
00252         
00253         # purge a version
00254         portal_storage.purge(1, 2, metadata=self.buildMetadata("purged v3"))
00255         
00256         # ``retrieve`` returns the removed info because there is no purge 
00257         # policy installed
00258         retrieved_obj = portal_storage.retrieve(history_id=1, selector=2)
00259         self.failIf(retrieved_obj.isValid())
00260         self.assertEqual(retrieved_obj.object.reason, "purged")
00261         self.assertEqual(self.getComment(retrieved_obj), "purged v3")
00262         
00263         retrieved_obj = portal_storage.retrieve(history_id=1, selector=2, 
00264                                                 substitute=False)
00265         self.failIf(retrieved_obj.isValid())
00266         self.assertEqual(retrieved_obj.object.reason, "purged")
00267         self.assertEqual(self.getComment(retrieved_obj), "purged v3")
00268 
00269         retrieved_obj = portal_storage.retrieve(history_id=1, selector=2, 
00270                                                 countPurged=False)
00271         self.failUnless(retrieved_obj.isValid())
00272         self.assertEqual(retrieved_obj.object.object.text, 'v4 of text')
00273         self.assertEqual(self.getComment(retrieved_obj), 'saved v4')
00274         
00275     def test10_retrievePurgedVersionsWithPolicyInstalled(self):
00276         self._setupMinimalHistory()
00277         portal_storage = self.portal.portal_historiesstorage
00278         
00279         # install the purge policy that returns the next older not removed
00280         self.installPurgePolicyTool()
00281         
00282         # purge
00283         portal_storage.purge(1, 1, metadata=self.buildMetadata("purged v2"))
00284         lenAll = len(portal_storage.getHistory(1))
00285         lenEff = len(portal_storage.getHistory(1, countPurged=False))
00286         self.assertEqual(lenAll, 4)
00287         self.assertEqual(lenEff, 3)
00288         
00289         # purge
00290         portal_storage.purge(1, 2, metadata=self.buildMetadata("purged v3"))
00291         lenAll = len(portal_storage.getHistory(1))
00292         lenEff = len(portal_storage.getHistory(1, countPurged=False))
00293         self.assertEqual(lenAll, 4)
00294         self.assertEqual(lenEff, 2)
00295         
00296         # ``retrieve`` returns the next older object 
00297         retrieved_obj = portal_storage.retrieve(history_id=1, selector=1)
00298         self.failUnless(retrieved_obj.isValid())
00299         self.assertEqual(retrieved_obj.object.object.text, 'v1 of text')
00300         self.assertEqual(self.getComment(retrieved_obj), 'saved v1')
00301         
00302         retrieved_obj = portal_storage.retrieve(history_id=1, selector=2)
00303         self.failUnless(retrieved_obj.isValid())
00304         self.assertEqual(retrieved_obj.object.object.text, 'v1 of text')
00305         self.assertEqual(self.getComment(retrieved_obj), 'saved v1')
00306         
00307         # ``retrieve`` returns existing object
00308         retrieved_obj = portal_storage.retrieve(history_id=1, selector=3)
00309         self.failUnless(retrieved_obj.isValid())
00310         self.assertEqual(retrieved_obj.object.object.text, 'v4 of text')
00311         self.assertEqual(self.getComment(retrieved_obj), 'saved v4')
00312         
00313         # check with substitute=False: should return the removed info
00314         retrieved_obj = portal_storage.retrieve(history_id=1, selector=1, 
00315                                                 substitute=False)
00316         self.failIf(retrieved_obj.isValid())
00317         self.assertEqual(retrieved_obj.object.reason, "purged")
00318         self.assertEqual(self.getComment(retrieved_obj), "purged v2")
00319         retrieved_obj = portal_storage.retrieve(history_id=1, selector=2,
00320                                                 substitute=False)
00321         self.failIf(retrieved_obj.isValid())
00322         self.assertEqual(retrieved_obj.object.reason, "purged")
00323         self.assertEqual(self.getComment(retrieved_obj), "purged v3")
00324 
00325     def test11_purgeOnSave(self):
00326         # install the purge policy that removes all except the current and 
00327         # previous objects
00328         self.installPurgePolicyTool()
00329         portal_storage = self.portal.portal_historiesstorage
00330         
00331         # save no 1
00332         obj1 = Dummy()
00333         obj1.text = 'v1 of text'
00334         
00335         sel = portal_storage.register(1, ObjectData(obj1), 
00336                                       metadata=self.buildMetadata('saved v1'))
00337         history = portal_storage.getHistory(1, countPurged=False)
00338         
00339         self.assertEquals(sel, 0)
00340         self.assertEquals(len(history), 1)
00341         self.assertEqual(history[0].object.object.text, 'v1 of text')
00342         self.assertEqual(self.getComment(history[0]), 'saved v1')
00343         
00344         # save no 2
00345         obj2 = Dummy()
00346         obj2.text = 'v2 of text'
00347         sel = portal_storage.save(1, ObjectData(obj2), 
00348                                   metadata=self.buildMetadata('saved v2'))
00349         history = portal_storage.getHistory(1, countPurged=False)
00350         
00351         self.assertEquals(sel, 1)
00352         self.assertEquals(len(history), 2)
00353         self.assertEqual(history[0].object.object.text, 'v1 of text')
00354         self.assertEqual(self.getComment(history[0]), 'saved v1')
00355         self.assertEqual(history[1].object.object.text, 'v2 of text')
00356         self.assertEqual(self.getComment(history[1]), 'saved v2')
00357         
00358         # save no 3: purged oldest version
00359         obj3 = Dummy()
00360         obj3.text = 'v3 of text'
00361         sel = portal_storage.save(1, ObjectData(obj3), 
00362                                   metadata=self.buildMetadata('saved v3'))
00363         history = portal_storage.getHistory(1, countPurged=False)
00364         length = len(history)
00365         
00366         # iterating over the history
00367         for i, vdata in enumerate(history):
00368             self.assertEquals(vdata.object.object.text, 
00369                               'v%s of text' % (i+2))
00370             self.assertEqual(self.getComment(vdata), 
00371                              'saved v%s' % (i+2))
00372             
00373         self.assertEquals(sel, 2)
00374         self.assertEquals(length, 2)
00375         self.assertEqual(history[0].object.object.text, 'v2 of text')
00376         self.assertEqual(self.getComment(history[0]), 'saved v2')
00377         self.assertEqual(history[1].object.object.text, 'v3 of text')
00378         self.assertEqual(self.getComment(history[1]), 'saved v3')
00379         
00380         # save no 4: purged oldest version
00381         obj4 = Dummy()
00382         obj4.text = 'v4 of text'
00383         sel = portal_storage.save(1, ObjectData(obj4), 
00384                                   metadata=self.buildMetadata('saved v4'))
00385         history = portal_storage.getHistory(1, countPurged=False)
00386         length = len(history)
00387         
00388         # iterating over the history
00389         for i, vdata in enumerate(history):
00390             self.assertEquals(vdata.object.object.text, 
00391                               'v%s of text' % (i+3))
00392             self.assertEqual(self.getComment(vdata), 
00393                              'saved v%s' % (i+3))
00394             
00395         self.assertEquals(sel, 3)
00396         self.assertEquals(length, 2)
00397         self.assertEqual(history[0].object.object.text, 'v3 of text')
00398         self.assertEqual(self.getComment(history[0]), 'saved v3')
00399         self.assertEqual(history[1].object.object.text, 'v4 of text')
00400         self.assertEqual(self.getComment(history[1]), 'saved v4')
00401 
00402     def test12_retrieveNonExistentVersion(self):
00403         portal_storage = self.portal.portal_historiesstorage
00404 
00405         obj1 = Dummy()
00406         obj1.text = 'v1 of text'
00407         portal_storage.register(1, ObjectData(obj1), metadata=self.buildMetadata('saved v1'))
00408 
00409         obj2 = Dummy()
00410         obj2.text = 'v2 of text'
00411         portal_storage.save(1, ObjectData(obj2), metadata=self.buildMetadata('saved v2'))
00412 
00413         # purge
00414         portal_storage.purge(1, 0, metadata=self.buildMetadata("purged v1"))
00415         
00416         # retrieve non existing version
00417         self.assertRaises(StorageRetrieveError,
00418                           portal_storage.retrieve, history_id=1, selector=2, 
00419                           countPurged=True, substitute=True)
00420 
00421         self.assertRaises(StorageRetrieveError,
00422                           portal_storage.retrieve, history_id=1, selector=1, 
00423                           countPurged=False)
00424 
00425     def test13_saveWithUnicodeComment(self):
00426         portal_storage = self.portal.portal_historiesstorage
00427         obj1 = Dummy()
00428         obj1.text = 'v1 of text'
00429         portal_storage.register(1, ObjectData(obj1),
00430                                 metadata=self.buildMetadata('saved v1'))
00431         portal_storage.save(1, ObjectData(obj1),
00432                             metadata=self.buildMetadata(u'saved v1\xc3\xa1'))
00433 
00434 class TestMemoryStorage(TestZVCStorageTool):
00435 
00436     def installStorageTool(self):
00437         # install the memory storage
00438         tool = MemoryStorage()
00439         setattr(self.portal, tool.getId(), tool)
00440 
00441 from unittest import TestSuite, makeSuite
00442 def test_suite():
00443     suite = TestSuite()
00444     suite.addTest(makeSuite(TestZVCStorageTool))
00445     suite.addTest(makeSuite(TestMemoryStorage))
00446     return suite