Back to index

plone3  3.1.7
testcase.py
Go to the documentation of this file.
00001 import unittest
00002 from Testing import ZopeTestCase
00003 
00004 import sys
00005 import time
00006 import logging
00007 from os import chmod, curdir, mkdir, remove, stat, walk
00008 from os.path import join, abspath, dirname
00009 from shutil import copytree, rmtree
00010 from stat import S_IREAD, S_IWRITE
00011 from tempfile import mktemp
00012 
00013 import transaction
00014 from AccessControl.SecurityManagement import newSecurityManager
00015 from AccessControl.SecurityManagement import noSecurityManager
00016 from AccessControl.SecurityManager import setSecurityPolicy
00017 
00018 from dummy import DummyFolder
00019 from security import AnonymousUser
00020 from security import PermissiveSecurityPolicy
00021 from Products.CMFCore.utils import getPackageLocation
00022 
00023 
class LogInterceptor:
    """Mixin that traps low-severity log records instead of emitting them.

    The instance installs itself as a filter on a named logger.  Records
    whose level is at or below ``level`` are appended to ``logged`` and
    dropped; records above it pass through untouched.
    """

    logged = None     # list of trapped records; created lazily by filter()
    installed = ()    # logger names this object is currently filtering
    level = 0         # records with levelno <= level are trapped

    def _catch_log_errors(self, ignored_level=logging.WARNING, subsystem=''):
        """Start trapping records on the logger named 'subsystem'.

        'ignored_level' is the highest level that gets trapped.  It is
        stored in the single 'level' attribute, so the most recent call
        decides the threshold for every installed subsystem.

        Raises ValueError if a filter is already installed for 'subsystem'.
        """
        if subsystem in self.installed:
            # Call form of raise: valid on Python 2 and Python 3 alike.
            raise ValueError('Already installed filter!')

        root_logger = logging.getLogger(subsystem)
        self.installed += (subsystem,)
        self.level = ignored_level
        root_logger.addFilter(self)

    def filter(self, record):
        """Filter hook called by the logging module for each record.

        Returns True to let the record through, False after trapping it.
        """
        if record.levelno > self.level:
            return True
        if self.logged is None:
            # Assign on the instance so the class-level default stays None.
            self.logged = []
        self.logged.append(record)
        return False

    def _ignore_log_errors(self, subsystem=''):
        """Stop trapping records on 'subsystem'; no-op if not installed."""
        if subsystem not in self.installed:
            return

        root_logger = logging.getLogger(subsystem)
        root_logger.removeFilter(self)
        self.installed = tuple([s for s in self.installed if s != subsystem])
00056 
00057 
class WarningInterceptor:
    """Mixin that redirects sys.stderr into an in-memory stream.

    After _trap_warning_output(), anything written to sys.stderr ends up
    in '_our_stderr_stream'.  _free_warning_output() restores the stream
    that was active when trapping started.
    """

    _old_stderr = None          # stream to restore; None while not trapping
    _our_stderr_stream = None   # most recent capture stream

    def _trap_warning_output( self ):
        """Replace sys.stderr with a fresh StringIO; no-op if trapping."""
        if self._old_stderr is not None:
            return

        # The StringIO module only exists on Python 2; fall back to io.
        try:
            from StringIO import StringIO
        except ImportError:
            from io import StringIO

        self._old_stderr = sys.stderr
        self._our_stderr_stream = sys.stderr = StringIO()

    def _free_warning_output( self ):
        """Restore the original sys.stderr; no-op if not trapping.

        '_our_stderr_stream' is left in place so captured output can
        still be inspected afterwards.
        """
        if self._old_stderr is None:
            return

        sys.stderr = self._old_stderr
        # Forget the saved stream so a later _trap_warning_output() call
        # installs a new capture instead of returning early.
        self._old_stderr = None
00079 
00080 
class TransactionalTest(unittest.TestCase):
    """Test case whose work happens in a transaction that is aborted.

    setUp begins a transaction and obtains an application object from
    ZopeTestCase.app(); tearDown aborts the transaction and hands the
    application back to ZopeTestCase.close().
    """

    # Use the ZopeLite layer when this Zope version ships one.
    try:
        from Testing.ZopeTestCase.layer import ZopeLite
        layer = ZopeLite
    except ImportError:
        pass # Zope < 2.11

    def setUp(self):
        """Begin a transaction and expose app, root, REQUEST and RESPONSE."""
        transaction.begin()
        application = ZopeTestCase.app()
        self.app = application
        self.root = application
        request = application.REQUEST
        self.REQUEST = request
        self.RESPONSE = request.RESPONSE

    def tearDown(self):
        """Abort the transaction, then close the application."""
        transaction.abort()
        ZopeTestCase.close(self.app)

# Alternative name for the same class.
RequestTest = TransactionalTest
00101 
00102 
class SecurityTest(TransactionalTest):
    """TransactionalTest run under a PermissiveSecurityPolicy.

    setUp installs the policy and a security manager for an
    AnonymousUser wrapped in the application's acl_users; tearDown
    removes the security manager and puts the previous policy back.
    """

    def setUp(self):
        """Set up the transaction, then swap in the test security setup."""
        TransactionalTest.setUp(self)
        policy = PermissiveSecurityPolicy()
        self._policy = policy
        # setSecurityPolicy returns the policy it replaced; keep it so
        # tearDown can restore it.
        self._oldPolicy = setSecurityPolicy(policy)
        anonymous = AnonymousUser().__of__(self.app.acl_users)
        newSecurityManager(None, anonymous)

    def tearDown(self):
        """Undo the security setup before aborting the transaction."""
        noSecurityManager()
        setSecurityPolicy(self._oldPolicy)
        TransactionalTest.tearDown(self)

# Alternative name for the same class.
SecurityRequestTest = SecurityTest
00117 
00118 
# Start from the directory containing this module.  When no __file__
# global exists (the test was called directly), use the current
# directory instead.
try:
    _prefix = dirname(__file__)
except NameError:
    _prefix = curdir

# _prefix ends up as the absolute path of that directory's parent.
_prefix = abspath(join(abspath(_prefix), '..'))
00129 
00130 
class FSDVTest(unittest.TestCase, WarningInterceptor):
    """Base class for tests that register the fake skin directory.

    'tempname' is the directory the skin is registered from and
    '_sourceprefix' the directory the original skin lives in; both
    default to the module-level _prefix.
    """

    _skinname = 'fake_skins'
    _layername = 'fake_skin'
    _sourceprefix = _prefix
    tempname = _prefix

    def _registerDirectory(self, object=None, ignore=None):
        """Register the skin directory, trapping stderr output first.

        When 'ignore' is None the default 'ignore' from DirectoryView is
        used.  When 'object' is not None, a DummyFolder is stored as
        self.ob and directory views are added to it.
        """
        self._trap_warning_output()
        from Products.CMFCore.DirectoryView import addDirectoryViews
        from Products.CMFCore.DirectoryView import registerDirectory
        if ignore is None:
            from Products.CMFCore.DirectoryView import ignore
        registerDirectory(self._skinname, self.tempname, ignore=ignore)
        if object is None:
            return
        folder = DummyFolder()
        self.ob = folder
        addDirectoryViews(folder, self._skinname, self.tempname)

    def setUp(self):
        """Remember the path of the skin layer directory."""
        layer_path = join(self.tempname, self._skinname, self._layername)
        self.skin_path_name = layer_path

    def tearDown(self):
        """Restore sys.stderr if _registerDirectory trapped it."""
        self._free_warning_output()
00154 
00155 
class WritableFSDVTest(FSDVTest):
    """Base class for FSDV tests; creates a fake skin copy that can be edited.

    setUp copies the source skin into a fresh temporary directory and
    makes the copied files writable; tearDown removes that directory.
    """

    def _mtimeOrZero(self, path):
        # Modification time of 'path' as reported by stat()[8], or 0
        # when the path cannot be stat'ed (e.g. it does not exist yet).
        try:
            return stat(path)[8]
        except OSError:
            return 0

    def _writeFile(self, filename, stuff):
        """Write 'stuff' to 'filename' inside the skin layer directory.

        Makes sure the file's modification time has changed, and also
        that the skin folder's modification time has changed.
        """
        dir_mtime = self._mtimeOrZero(self.skin_path_name)
        thePath = join(self.skin_path_name, filename)
        mtime1 = self._mtimeOrZero(thePath)
        mtime2 = mtime1
        # Rewrite the file until its reported mtime differs from the
        # one seen before writing.
        while mtime2 == mtime1:
            f = open(thePath, 'w')
            try:
                f.write(stuff)
            finally:
                # Close the handle even when the write fails.
                f.close()
            mtime2 = stat(thePath)[8]
        self._addedOrRemoved(dir_mtime)

    def _deleteFile(self, filename):
        """Remove 'filename' from the skin layer directory.

        Also makes sure the skin folder's modification time has changed.
        """
        dir_mtime = self._mtimeOrZero(self.skin_path_name)
        remove(join(self.skin_path_name, filename))
        self._addedOrRemoved(dir_mtime)

    def _addedOrRemoved(self, old_mtime):
        """Wait until the skin folder's mod time differs from 'old_mtime'.

        Called after adding/removing a file from self.skin_path_name.
        Raises RuntimeError if the mod time has not changed after about
        60 seconds.
        """
        if sys.platform == 'win32':
            # Windows doesn't reliably update directory mod times, so
            # DirectoryView has an expensive workaround.  The
            # workaround does not rely on directory mod times.
            return
        limit = time.time() + 60  # If it takes 60 seconds, give up.
        new_mtime = old_mtime
        while new_mtime == old_mtime:
            # Many systems have a granularity of 1 second.
            # Add/remove a file until it actually changes the
            # directory mod time.
            if time.time() > limit:
                raise RuntimeError(
                    "This platform (%s) does not update directory mod times "
                    "reliably." % sys.platform)
            time.sleep(0.1)
            fn = join(self.skin_path_name, '.touch')
            f = open(fn, 'w')
            try:
                f.write('Temporary file')
            finally:
                f.close()
            remove(fn)
            new_mtime = stat(self.skin_path_name)[8]

    def setUp(self):
        """Create an editable copy of the fake skin in a temp directory."""
        # Imported here because the module only imports mktemp.
        from tempfile import mkdtemp

        # mkdtemp picks the name and creates the directory in one step,
        # unlike mktemp followed by mkdir.
        self.tempname = mkdtemp(dir=getPackageLocation('Products.CMFCore.tests'))
        # copy the source fake skin to the new location
        copytree(join(self._sourceprefix, self._skinname),
                 join(self.tempname, self._skinname))
        # make sure we have a writable copy
        for root, dirs, files in walk(self.tempname):
            for name in files:
                chmod(join(root, name), S_IREAD | S_IWRITE)
        FSDVTest.setUp(self)

    def tearDown(self):
        """Restore stderr and remove the temporary skin copy."""
        FSDVTest.tearDown(self)
        # kill the copy
        try:
            rmtree(self.tempname)
        except OSError:
            # try again (some files might be locked temporarily)
            time.sleep(0.1)
            rmtree(self.tempname)