Back to index

plone3  3.1.7
test_MoreCaching.py
Go to the documentation of this file.
00001 ##############################################################################
00002 #
00003 # Copyright (c) 2006 Zope Corporation and Contributors. All Rights
00004 # Reserved.
00005 #
00006 # This software is subject to the provisions of the Zope Public License,
00007 # Version 2.1 (ZPL).  A copy of the ZPL should accompany this
00008 # distribution.
00009 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
00010 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00011 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
00012 # FOR A PARTICULAR PURPOSE.
00013 #
00014 ##############################################################################
00015 
00016 from AccessControl.Permissions import view as View
00017 
00018 from Products.PluggableAuthService.tests import pastc
00019 from Products.PluggableAuthService.interfaces.plugins import IExtractionPlugin
00020 
00021 
class CachingTests(pastc.PASTestCase):

    # Runs PAS operations with a RAM cache manager attached to the user
    # folder and checks the cache report (entries / misses / hits) after
    # each call.

    def afterSetUp(self):
        self.pas = self.folder.acl_users
        # Add a RAM cache
        factory = self.pas.manage_addProduct['StandardCacheManagers']
        factory.manage_addRAMCacheManager('ram_cache')
        self.cache = self.pas.ram_cache
        # Activate the cache
        self.pas.ZCacheable_setManagerId('ram_cache')
        # Create a protected document
        self.folder.manage_addDTMLMethod('doc', file='the document')
        self.doc = self.folder.doc
        self.doc.manage_permission(View, [pastc.user_role], acquire=False)

    def assertCacheStats(self, entries, misses, hits):
        # Check cache statistics against expected values.
        # An empty cache report counts as all-zero statistics; otherwise
        # only the first report item is inspected.
        report_item = {'entries': 0, 'misses': 0, 'hits': 0}
        report = self.cache.getCacheReport()
        if report:
            report_item = report[0]
        self.assertEqual(report_item.get('entries'), entries)
        self.assertEqual(report_item.get('misses'), misses)
        self.assertEqual(report_item.get('hits'), hits)

    def _rigRequest(self, auth=None):
        # Rig the request so it looks like we traversed to doc.
        # If 'auth' is given it is stored as the request's _auth value;
        # otherwise _auth is left untouched.  Returns the rigged request.
        request = self.app.REQUEST
        request['PUBLISHED'] = self.doc
        request['PARENTS'] = [self.app, self.folder]
        request.steps = list(self.doc.getPhysicalPath())
        if auth is not None:
            request._auth = auth
        return request

    def _rigExtractor(self, extra):
        # Rig the extractor so it returns the test user's login and
        # password plus the given 'extra' value.  A fresh dict is built
        # on every call so no state is shared between extractions.
        self.pas.http_auth.extractCredentials = \
            lambda req: { 'login': pastc.user_name
                        , 'password': pastc.user_password
                        , 'extra': extra
                        }

    def _assertValidateIsCached(self, request):
        # Validate three times: the first call must authenticate the test
        # user and produce two cache misses, each repeat two more hits.
        user = self.pas.validate(request)
        self.failIf(user is None)
        self.assertEqual(user.getId(), pastc.user_name)
        self.assertEqual(user.getRoles(), ['Authenticated', pastc.user_role])

        self.assertCacheStats(2, 2, 0)

        self.pas.validate(request)
        self.assertCacheStats(2, 2, 2)

        self.pas.validate(request)
        self.assertCacheStats(2, 2, 4)

    def test__extractUserIds(self):
        request = self.app.REQUEST
        request._auth = 'Basic %s' % pastc.user_auth

        # Extract, we should see a cache miss
        self.pas._extractUserIds(request, self.pas.plugins)
        self.assertCacheStats(1, 1, 0)

        # Extract again, we should see a cache hit
        self.pas._extractUserIds(request, self.pas.plugins)
        self.assertCacheStats(1, 1, 1)

        # Extract yet again, we should see another hit
        self.pas._extractUserIds(request, self.pas.plugins)
        self.assertCacheStats(1, 1, 2)

    def test__extractUserIds_two_extractors(self):
        # Two extractors should result in two cache entries
        request = self.app.REQUEST
        request._auth = 'Basic %s' % pastc.user_auth

        factory = self.pas.manage_addProduct['PluggableAuthService']
        factory.addHTTPBasicAuthHelper('http_auth_2')
        self.pas.plugins.activatePlugin(IExtractionPlugin, 'http_auth_2')

        # Extract, we should see cache misses
        self.pas._extractUserIds(request, self.pas.plugins)
        self.assertCacheStats(2, 2, 0)

        # Extract again, we should see cache hits
        self.pas._extractUserIds(request, self.pas.plugins)
        self.assertCacheStats(2, 2, 2)

        # Extract yet again, we should see more hits
        self.pas._extractUserIds(request, self.pas.plugins)
        self.assertCacheStats(2, 2, 4)

    def test__findUser(self):
        # Find, we should see a cache miss
        self.pas._findUser(self.pas.plugins, pastc.user_name)
        self.assertCacheStats(1, 1, 0)

        # Find again, we should see a cache hit
        self.pas._findUser(self.pas.plugins, pastc.user_name)
        self.assertCacheStats(1, 1, 1)

        # Find yet again, we should see another hit
        self.pas._findUser(self.pas.plugins, pastc.user_name)
        self.assertCacheStats(1, 1, 2)

    def test__verifyUser(self):
        # Verify, we should see a cache miss
        self.pas._verifyUser(self.pas.plugins, pastc.user_name)
        self.assertCacheStats(1, 1, 0)

        # Verify again, we should see a cache hit
        self.pas._verifyUser(self.pas.plugins, pastc.user_name)
        self.assertCacheStats(1, 1, 1)

        # Verify yet again, we should see another hit
        self.pas._verifyUser(self.pas.plugins, pastc.user_name)
        self.assertCacheStats(1, 1, 2)

    def test_getUser(self):
        # First lookup, we should see two cache misses
        self.pas.getUser(pastc.user_name)
        self.assertCacheStats(2, 2, 0)

        # Each further lookup should add two hits
        self.pas.getUser(pastc.user_name)
        self.assertCacheStats(2, 2, 2)

        self.pas.getUser(pastc.user_name)
        self.assertCacheStats(2, 2, 4)

    def test_getUserById(self):
        # First lookup, we should see two cache misses
        self.pas.getUserById(pastc.user_name)
        self.assertCacheStats(2, 2, 0)

        # Each further lookup should add two hits
        self.pas.getUserById(pastc.user_name)
        self.assertCacheStats(2, 2, 2)

        self.pas.getUserById(pastc.user_name)
        self.assertCacheStats(2, 2, 4)

    def test_validate(self):
        # Validate with basic auth credentials on the request
        request = self._rigRequest(auth='Basic %s' % pastc.user_auth)
        self._assertValidateIsCached(request)

    def test_validate_anonymous(self):
        # Without credentials nobody is validated and nothing is cached
        request = self._rigRequest()

        user = self.pas.validate(request)
        self.failUnless(user is None)

        self.assertCacheStats(0, 0, 0)

    def test_validate_utf8_credentials(self):
        request = self._rigRequest()

        # Rig the extractor so it returns UTF-8 credentials
        self._rigExtractor('M\303\244dchen')

        self._assertValidateIsCached(request)

    def test_validate_unicode_credentials(self):
        request = self._rigRequest()

        # Rig the extractor so it returns Unicode credentials
        self._rigExtractor(u'M\344dchen')

        self._assertValidateIsCached(request)

    def test_validate_utf16_credentials(self):
        request = self._rigRequest()

        # Rig the extractor so it returns UTF-16 credentials
        self._rigExtractor(u'M\344dchen'.encode('utf-16'))

        self._assertValidateIsCached(request)

    def test__doAddUser(self):
        user_id = 'test_user_2_'
        password = 'secret'

        self.assertCacheStats(0, 0, 0)

        self.pas._doAddUser(user_id, password, [pastc.user_role], [])

        # XXX: _doAddUser calls getUser
        self.assertCacheStats(2, 2, 0)

        # XXX: As a result the user is now cached, but without roles
        user = self.pas.getUserById(user_id)
        self.failIf(user is None)
        self.assertEqual(user.getId(), user_id)
        self.assertEqual(user.getRoles(), ['Authenticated'])

        # XXX: Must clear cache to get roles
        self.pas.ZCacheable_invalidate()

        user = self.pas.getUserById(user_id)
        self.failIf(user is None)
        self.assertEqual(user.getId(), user_id)
        self.assertEqual(user.getRoles(), ['Authenticated', pastc.user_role])
00268 
00269 
def test_suite():
    # Return a TestSuite holding every test collected from CachingTests.
    from unittest import TestSuite, makeSuite
    caching_tests = makeSuite(CachingTests)
    suite = TestSuite()
    suite.addTest(caching_tests)
    return suite
00275 
if __name__ == '__main__':
    # unittest is not imported at module level (test_suite only imports
    # TestSuite and makeSuite locally), so import it here before use.
    import unittest
    unittest.main(defaultTest='test_suite')