Back to index

plone3  3.1.7
test_sqlstorage.py
Go to the documentation of this file.
00001 ################################################################################
00002 #
00003 # Copyright (c) 2002-2005, Benjamin Saller <bcsaller@ideasuite.com>, and
00004 #                              the respective authors. All rights reserved.
00005 # For a list of Archetypes contributors see docs/CREDITS.txt.
00006 #
00007 # Redistribution and use in source and binary forms, with or without
00008 # modification, are permitted provided that the following conditions are met:
00009 #
00010 # * Redistributions of source code must retain the above copyright notice, this
00011 #   list of conditions and the following disclaimer.
00012 # * Redistributions in binary form must reproduce the above copyright notice,
00013 #   this list of conditions and the following disclaimer in the documentation
00014 #   and/or other materials provided with the distribution.
00015 # * Neither the name of the author nor the names of its contributors may be used
00016 #   to endorse or promote products derived from this software without specific
00017 #   prior written permission.
00018 #
00019 # THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
00020 # WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00021 # WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
00022 # FOR A PARTICULAR PURPOSE.
00023 #
00024 ################################################################################
00025 """
00026 """
00027 
00028 import os, sys
00029 from Testing import ZopeTestCase
00030 
00031 from Products.Archetypes.tests.atsitetestcase import ATSiteTestCase
00032 from Products.Archetypes.tests.utils import PACKAGE_HOME
00033 from Products.Archetypes.tests.utils import makeContent
00034 
00035 import transaction
00036 from zExceptions.ExceptionFormatter import format_exception
00037 
00038 def pretty_exc(self, exc, *args, **kw):
00039     t, e, tb = exc
00040     try:
00041         return ''.join(format_exception(t, e, tb, format_src=1))
00042     except:
00043         return ''.join(format_exception(t, e, tb))
00044 
00045 import unittest
00046 unittest.TestResult._exc_info_to_string = pretty_exc
00047 
00048 from Products.Archetypes.atapi import *
00049 from Products.Archetypes.config import PKG_NAME
00050 from Products.Archetypes.config import TOOL_NAME
00051 from Products.Archetypes import SQLStorage
00052 from Products.Archetypes import SQLMethod
00053 from Products.CMFCore.utils import getToolByName
00054 from Products.CMFCore.TypesTool import FactoryTypeInformation
00055 from Products.Archetypes.tests.utils import makeContent
00056 from Products.Archetypes.tests.utils import Dummy
00057 
00058 from DateTime import DateTime
00059 
00060 # the id to use in the connection objects
00061 connection_id = 'sql_connection'
00062 
00063 # the db names and Connection strings
00064 connectors = {}
00065 
00066 # aditional cleanup
00067 cleanup = {}
00068 
00069 
00070 # Gadfly
00071 
00072 # XXX: Gadfly no longer works in Zope 2.7.8/2.8.2
00073 # See http://www.zope.org/Collectors/Zope/556
00074 # We may get around this by avoiding NULL in SQLStorage,
00075 # but for now I am disabling the tests. [shh]
00076 #
00077 #try:
00078 #    from Products.ZGadflyDA.DA import Connection
00079 #except ImportError:
00080 #    print >>sys.stderr, 'Failed to import ZGadflyDA'
00081 #else:
00082 #    ZopeTestCase.installProduct('ZGadflyDA', 0)
00083 #    connectors['Gadfly'] = 'demo'
00084 
00085 # Postgresql
00086 
00087 try:
00088     from Products.ZPsycopgDA.DA import Connection
00089 except ImportError:
00090     print >>sys.stderr, 'Failed to import ZPsycopgDA'
00091 else:
00092     ZopeTestCase.installProduct('ZPsycopgDA', 0)
00093     connectors['Postgre'] = 'dbname=demo user=demo host=gandalf'
00094 
00095 # MySQL
00096 
00097 try:
00098     from Products.ZMySQLDA.DA import Connection
00099 except ImportError:
00100     print >>sys.stderr, 'Failed to import ZMySQLDA'
00101 else:
00102     ZopeTestCase.installProduct('ZMySQLDA', 0)
00103     transactional = 1 # needs INNODB!
00104     if transactional:
00105         connectors['MySQL'] = '+demo@gandalf demo'
00106     else:
00107         connectors['MySQL'] = '-demo@gandalf demo'
00108         def cleanupMySQL(self):
00109             instance = self._dummy
00110             args = {}
00111             args['table'] = 'Dummy'
00112             storage = self._storage_class
00113             method = SQLMethod(instance)
00114             method.edit(connection_id, ' '.join(args.keys()),
00115                         storage.query_drop)
00116             query, result = method(test__=1, **args)
00117         cleanup['MySQL'] = cleanupMySQL
00118 
00119 
00120 def gen_dummy(storage_class):
00121 
00122     Dummy.schema = BaseSchema + Schema((
00123 
00124         ObjectField(
00125         'aobjectfield',
00126         storage = storage_class()
00127         ),
00128 
00129         StringField(
00130         'astringfield',
00131         storage = storage_class()
00132         ),
00133 
00134         TextField(
00135         'atextfield',
00136         storage = storage_class()
00137         ),
00138 
00139         DateTimeField(
00140         'adatetimefield',
00141         storage = storage_class()
00142         ),
00143 
00144         LinesField(
00145         'alinesfield',
00146         storage = storage_class()
00147         ),
00148 
00149         IntegerField(
00150         'aintegerfield',
00151         storage = storage_class()
00152         ),
00153 
00154         FloatField(
00155         'afloatfield',
00156         storage = storage_class()
00157         ),
00158 
00159         FixedPointField(
00160         'afixedpointfield',
00161         storage = storage_class()
00162         ),
00163 
00164 ## Xiru: The new reference engine is not SQLStorage aware!
00165 
00166 ##         ReferenceField(
00167 ##         'areferencefield',
00168 ##         storage = storage_class()
00169 ##         ),
00170 
00171         BooleanField(
00172         'abooleanfield',
00173         storage = storage_class()
00174         ),
00175 
00176 ## Xiru: SQLStorage does not support the field types below. For
00177 ## FileField, use ObjectManagedStorage or AttributeStorage and for
00178 ## ImageField and PhotoField use AttributeStorage. They are complex
00179 ## object and persist their content in a RDBMS is not a trivial task
00180 ## (at lest, not without break a lot of things).
00181 
00182 ##         FileField(
00183 ##         'afilefield',
00184 ##         storage = storage_class()
00185 ##         ),
00186 
00187 ##         ImageField(
00188 ##         'aimagefield',
00189 ##         storage = storage_class()
00190 ##         ),
00191 
00192 ##         PhotoField(
00193 ##         'aphotofield',
00194 ##         storage = storage_class()
00195 ##         ),
00196 
00197     ))
00198 
00199     registerType(Dummy, PKG_NAME)
00200 
00201     content_types, constructors, ftis = process_types(listTypes(), PKG_NAME)
00202 
00203 
00204 def commonAfterSetUp(self):
00205 
00206     portal = self.portal
00207 
00208     # create the Database Adaptor (DA)
00209     if self.db_name == 'Postgre':
00210         portal.manage_addProduct['ZPsycopgDA'].manage_addZPsycopgConnection(
00211             id = connection_id, title = 'PostgreSQL Archetypes Storage',
00212             connection_string = connectors[self.db_name], zdatetime = 1, check = 1)
00213     elif self.db_name == 'MySQL':
00214         portal.manage_addProduct['ZMySQLDA'].manage_addZMySQLConnection(
00215             id = connection_id, title = 'MySQL Archetypes Storage',
00216             connection_string = connectors[self.db_name], check = 1)
00217     elif self.db_name == 'Gadfly':
00218         portal.manage_addProduct['ZGadflyDA'].manage_addZGadflyConnection(
00219             id = connection_id, title = 'Gadfly Archetypes Storage',
00220             connection = connectors[self.db_name], check = 1)
00221 
00222     # add type information for Dummy
00223     tt = portal.portal_types
00224     tt.manage_addTypeInformation(
00225         FactoryTypeInformation.meta_type,
00226         id = 'Dummy',
00227         typeinfo_name = 'CMFDefault: Document')
00228 
00229     # set archetype_tool default connection
00230     at = getToolByName(portal, TOOL_NAME)
00231     at.setDefaultConn(connection_id)
00232 
00233     # create storage instance and schema
00234     storage_class = getattr(SQLStorage, self.db_name + 'SQLStorage')
00235     gen_dummy(storage_class)
00236     self._storage_class = storage_class
00237 
00238     # create a object instance
00239     obj = Dummy(oid = 'dummy')
00240     portal._setObject('dummy', obj)
00241     obj = getattr(portal, 'dummy')
00242     self._dummy = obj
00243 
00244     # set meta_type for renaming
00245     obj.__factory_meta_type__ = 'Archetypes Content'   # Is It really needed?
00246     obj.meta_type = 'Archetypes Content'
00247 
00248 
00249 class GadflyMagic:
00250     """ Make Gadfly tests work out of the box """
00251 
00252     gadfly_dir = os.path.join(PACKAGE_HOME, 'gadfly')
00253     demo_dir = os.path.join(gadfly_dir, 'demo')
00254 
00255     def afterSetUp(self):
00256         # Patch Gadfly to work off the temp dir
00257         from Products.ZGadflyDA import db
00258         self._data_dir = db.data_dir
00259         db.data_dir = self.gadfly_dir
00260         # Create Gadfly database
00261         os.mkdir(self.gadfly_dir)
00262         os.mkdir(self.demo_dir)
00263 
00264     def afterClear(self):
00265         # Restore Gadfly config
00266         if hasattr(self, '_data_dir'):
00267             from Products.ZGadflyDA import db
00268             db.data_dir = self._data_dir
00269         # Remove Gadfly database
00270         if os.path.isdir(self.gadfly_dir):
00271             import shutil
00272             shutil.rmtree(self.gadfly_dir, 1)
00273 
00274 
00275 class SQLStorageTestBase(GadflyMagic, ATSiteTestCase):
00276     """ Abstract base class for the tests """
00277 
00278     db_name = ''
00279     cleanup = cleanup
00280 
00281     def afterSetUp(self):
00282         GadflyMagic.afterSetUp(self)
00283         commonAfterSetUp(self)
00284 
00285     def beforeTearDown(self):
00286         clean = self.cleanup.get(self.db_name, None)
00287         if clean is not None:
00288             clean(self)
00289 
00290 
00291 class SQLStorageTest(SQLStorageTestBase):
00292 
00293     def test_objectfield(self):
00294         dummy = self._dummy
00295         value = dummy.getAobjectfield()
00296         __traceback_info__ = (self.db_name, repr(value), None)
00297         # Gadfly represents None as an empty string
00298         if self.db_name == 'Gadfly':
00299             self.failUnless(value == '')
00300         else:
00301             self.failUnless(value is None)
00302         dummy.setAobjectfield('Bla')
00303         value = dummy.getAobjectfield()
00304         __traceback_info__ = (self.db_name, repr(value), 'Bla')
00305         self.failUnless(value == 'Bla')
00306 
00307     def test_stringfield(self):
00308         dummy = self._dummy
00309         value = dummy.getAstringfield()
00310         __traceback_info__ = (self.db_name, repr(value), None)
00311         # Gadfly represents None as an empty string
00312         if self.db_name == 'Gadfly':
00313             self.failUnless(value == '')
00314         else:
00315             self.failUnless(value is None)
00316         dummy.setAstringfield('Bla')
00317         value = dummy.getAstringfield()
00318         __traceback_info__ = (self.db_name, repr(value), 'Bla')
00319         self.failUnless(value == 'Bla')
00320 
00321     def test_stringfield_bug1003868(self):
00322         s = unicode('a?!', 'latin1')
00323         sp = self.portal.portal_properties.site_properties
00324         dummy = self._dummy
00325 
00326         sp.default_charset = 'latin1'
00327         dummy.setAstringfield(s)
00328         value = dummy.getAstringfield()
00329         __traceback_info__ = (self.db_name, repr(value), s)
00330         self.failUnlessEqual(value, s.encode(sp.default_charset))
00331 
00332         sp.default_charset = 'utf8'
00333         dummy.setAstringfield(s)
00334         value = dummy.getAstringfield()
00335         __traceback_info__ = (self.db_name, repr(value), s)
00336         self.failUnlessEqual(value, s.encode(sp.default_charset))
00337 
00338     def test_textfield(self):
00339         dummy = self._dummy
00340         value = dummy.getAtextfield()
00341         __traceback_info__ = (self.db_name, repr(value), None)
00342         # Gadfly represents None as an empty string
00343         if self.db_name == 'Gadfly':
00344             self.failUnless(str(value) == '', (value, ''))
00345         else:
00346             self.failUnless(value is None, (value, None))
00347         dummy.setAtextfield('Bla')
00348         value = dummy.getAtextfield()
00349         __traceback_info__ = (self.db_name, repr(value), 'Bla')
00350         self.failUnless(str(value) == 'Bla', (value, 'Bla'))
00351 
00352     def test_datetimefield(self):
00353         dummy = self._dummy
00354         value = dummy.getAdatetimefield()
00355         __traceback_info__ = (self.db_name, repr(value), None)
00356         self.failUnless(value is None)
00357         now = DateTime()
00358         dummy.setAdatetimefield(now)
00359         value = dummy.getAdatetimefield()
00360         __traceback_info__ = (self.db_name, value, now)
00361         self.failUnless(value.Time() == now.Time())
00362 
00363     def test_linesfield(self):
00364         dummy = self._dummy
00365         value = dummy.getAlinesfield()
00366         __traceback_info__ = (self.db_name, repr(value), ())
00367         self.failUnless(value is ())
00368         dummy.setAlinesfield(('bla', 'blo'))
00369         value = dummy.getAlinesfield()
00370         __traceback_info__ = (self.db_name, repr(value), ('bla', 'blo'))
00371         self.failUnless(value == ('bla', 'blo'))
00372 
00373     def test_integerfield(self):
00374         dummy = self._dummy
00375         value = dummy.getAintegerfield()
00376         __traceback_info__ = (self.db_name, repr(value), None)
00377         self.failUnless(value is None)
00378         dummy.setAintegerfield(23)
00379         value = dummy.getAintegerfield()
00380         __traceback_info__ = (self.db_name, repr(value), 23)
00381         self.failUnless(value == 23)
00382 
00383     def test_floatfield(self):
00384         dummy = self._dummy
00385         value = dummy.getAfloatfield()
00386         __traceback_info__ = (self.db_name, repr(value), None)
00387         self.failUnless(value is None)
00388         dummy.setAfloatfield(12.34)
00389         value = dummy.getAfloatfield()
00390         __traceback_info__ = (self.db_name, repr(value), 12.34)
00391         self.failUnless(value == 12.34)
00392 
00393     def test_fixedpointfield(self):
00394         dummy = self._dummy
00395         value = dummy.getAfixedpointfield()
00396         __traceback_info__ = (self.db_name, repr(value), '0.00')
00397         self.failUnless(value == '0.00')
00398         dummy.setAfixedpointfield('2.3')
00399         value = dummy.getAfixedpointfield()
00400         __traceback_info__ = (self.db_name, repr(value), '2.3')
00401         self.failUnless(value == '2.30')
00402 
00403 ## Xiru: This test is done, but it is not testing the storage "in
00404 ## practice" because reference field is not SQLStorage aware.
00405 
00406 ##     def test_referencefield(self):
00407 ##         dummy = self._dummy
00408 ##         value = dummy.getAreferencefield()
00409 ##         __traceback_info__ = (self.db_name, repr(value), [])
00410 ##         self.failUnless(value == [])
00411 
00412 ##         portal = self.portal
00413 
00414 ##         # create another object instance (dummy2) and test the
00415 ##         # reference creation from dummy to dummy2
00416 ##         obj = Dummy(oid = 'dummy2')
00417 ##         portal._setObject('dummy2', obj)
00418 ##         obj = getattr(portal, 'dummy2')
00419 ##         dummy2 = obj
00420 
00421 ##         dummy.setAreferencefield([dummy2])
00422 ##         value = dummy.getAreferencefield()
00423 ##         __traceback_info__ = (self.db_name, repr(value), [dummy2])
00424 ##         self.failUnless(value == [dummy2])
00425 
00426 ##         # one more object instance (dummy3) and test the reference
00427 ##         # creation from dummy3 to dummy and dummy2
00428 ##         obj = Dummy(oid = 'dummy3')
00429 ##         portal._setObject('dummy3', obj)
00430 ##         obj = getattr(portal, 'dummy3')
00431 ##         dummy3 = obj
00432 
00433 ##         dummy3.setAreferencefield([dummy, dummy2])
00434 ##         value = dummy3.getAreferencefield()
00435 ##         __traceback_info__ = (self.db_name, repr(value), [dummy, dummy2])
00436 ##         self.failUnless(value == [dummy, dummy2])
00437 
00438     def test_booleanfield(self):
00439         dummy = self._dummy
00440         value = dummy.getAbooleanfield()
00441         __traceback_info__ = (self.db_name, repr(value), None)
00442         self.failUnless(not value)
00443 
00444         dummy.setAbooleanfield(1)
00445         value = dummy.getAbooleanfield()
00446         __traceback_info__ = (self.db_name, repr(value), 1)
00447         self.failUnless(value == 1)
00448 
00449         dummy.setAbooleanfield(0)
00450         value = dummy.getAbooleanfield()
00451         __traceback_info__ = (self.db_name, repr(value), 0)
00452         self.failUnless(value == 0)
00453 
00454     def test_rename(self):
00455         self.loginAsPortalOwner()
00456         dummy = self._dummy
00457         content = 'The book is on the table!'
00458         dummy.setAtextfield(content)
00459         got = dummy.getAtextfield()
00460         self.failUnless(str(got) == content, (got, content))
00461         portal = self.portal
00462         obj_id = 'dummy'
00463         new_id = 'new_dummy'
00464         # make sure we have _p_jar
00465         transaction.savepoint(optimistic=True)
00466         portal.manage_renameObject(obj_id, new_id)
00467         dummy = getattr(portal, new_id)
00468         got = dummy.getAtextfield()
00469         self.failUnless(str(got) == content, (got, content))
00470 
00471 ## Xiru: These 3 tests below need some refactory!
00472 
00473 ##         def test_parentUID(self):
00474 ##             portal = self.portal
00475 ##             makeContent(portal, portal_type='SimpleFolder', id='folder1')
00476 ##             folder1 = getattr(portal, 'folder1')
00477 ##             makeContent(portal, portal_type='SimpleFolder', id='folder2')
00478 ##             folder2 = getattr(portal, 'folder2')
00479 ##             obj_id = 'dummy'
00480 ##             # make sure we have _p_jar
00481 ##             transaction.savepoint(optimistic=True)
00482 ##             cb = portal.manage_cutObjects([obj_id])
00483 ##             folder1.manage_pasteObjects(cb)
00484 ##             # shit, why this does not work anymore?
00485 ##             doc = getattr(folder1, obj_id)
00486 ##             PUID1 = folder1.UID()
00487 ##             f = StringField('PARENTUID', storage=doc.Schema()['atextfield'].storage)
00488 ##             PUID = f.get(doc)
00489 ##             __traceback_info__ = (self.db_name, str(PUID), str(PUID1))
00490 ##             self.failUnless(PUID == PUID1)
00491 ##             # make sure we have _p_jar
00492 ##             transaction.savepoint(optimistic=True)
00493 ##             cb = folder1.manage_cutObjects([obj_id])
00494 ##             folder2.manage_pasteObjects(cb)
00495 ##             PUID2 = folder2.UID()
00496 ##             doc = getattr(folder2, obj_id)
00497 ##             PUID = f.get(doc)
00498 ##             __traceback_info__ = (self.db_name, str(PUID2), str(PUID))
00499 ##             self.failUnless(str(PUID2) == str(PUID))
00500 
00501 ##         def test_emptyPUID(self):
00502 ##             portal = self.portal
00503 ##             obj_id = 'dummy'
00504 ##             portal._setObject(obj_id, self._nwdummy)
00505 ##             doc = getattr(portal, obj_id)
00506 ##             doc.initializeArchetype()
00507 ##             f = StringField('PARENTUID',
00508 ##                             storage=doc.Schema()['atextfield'].storage)
00509 ##             PUID = f.get(doc)
00510 ##             __traceback_info__ = (self.db_name, str(PUID), 'None')
00511 ##             self.failUnless(PUID == 'None')
00512 
00513 ##         def test_nomoreparentUID(self):
00514 ##             portal = self.portal
00515 ##             makeContent(portal, portal_type='SimpleFolder', id='folder1')
00516 ##             folder1 = getattr(portal, 'folder1')
00517 ##             obj_id = 'dummy'
00518 ##             folder1._setObject(obj_id, self._nwdummy)
00519 ##             doc = getattr(folder1, obj_id)
00520 ##             doc.initializeArchetype()
00521 ##             PUID1 = folder1.UID()
00522 ##             f = StringField('PARENTUID',
00523 ##                             storage=doc.Schema()['atextfield'].storage)
00524 ##             PUID = f.get(doc)
00525 ##             __traceback_info__ = (self.db_name, str(PUID), str(PUID1))
00526 ##             self.failUnless(str(PUID) == str(PUID1))
00527 ##             # make sure we have _p_jar
00528 ##             transaction.savepoint(optimistic=True)
00529 ##             cb = folder1.manage_cutObjects(ids=(obj_id,))
00530 ##             portal.manage_pasteObjects(cb)
00531 ##             doc = getattr(portal, obj_id)
00532 ##             PUID = f.get(doc)
00533 ##             __traceback_info__ = (self.db_name, str(PUID), 'None')
00534 ##             self.failUnless(PUID == 'None')
00535 
00536 
00537 # test each db
00538 
00539 tests = []
00540 
00541 for db_name in connectors.keys():
00542 
00543     class StorageTest(SQLStorageTest):
00544         db_name = db_name
00545 
00546     tests.append(StorageTest)
00547 
00548 
00549 # run tests
00550 
00551 def test_suite():
00552     from unittest import TestSuite, makeSuite
00553     suite = TestSuite()
00554     for test in tests:
00555         suite.addTest(makeSuite(test))
00556     return suite