Back to index

moin  1.9.0~rc2
ldap_testbase.py
Go to the documentation of this file.
00001 # -*- coding: utf-8 -*-
00002 """
00003     LDAPTestBase: LDAP testing support for py.test based unit tests
00004 
00005     Features
00006     --------
00007 
00008     * setup_class
00009       * automatic creation of a temporary LDAP server environment
00010       * automatic creation of a LDAP server process (slapd)
00011 
00012     * teardown_class
00013       * LDAP server process will be killed and termination will be waited for
00014       * temporary LDAP environment will be removed
00015 
00016     Usage
00017     -----
00018 
00019     Write your own test class and derive from LDAPTstBase:
00020 
00021     class TestLdap(LDAPTstBase):
00022         def testFunction(self):
00023             server_url = self.ldap_env.slapd.url
00024             lo = ldap.initialize(server_url)
00025             lo.simple_bind_s('', '')
00026 
00027     Notes
00028     -----
00029 
00030     On Ubuntu 8.04 there is apparmor imposing some restrictions on /usr/sbin/slapd,
00031     so you need to disable apparmor by invoking this as root:
00032 
00033     # /etc/init.d/apparmor stop
00034 
00035     @copyright: 2008 by Thomas Waldmann
00036     @license: GNU GPL, see COPYING for details.
00037 """
00038 
# Filename of the LDAP server executable. If it is not in your PATH, you have
# to give the full path/filename here. It is what check_environ() tries to run
# and the default executable of Slapd.
SLAPD_EXECUTABLE = 'slapd'
00041 
00042 import os, shutil, tempfile, time, base64, md5
00043 from StringIO import StringIO
00044 import signal
00045 import subprocess
00046 
00047 try:
00048     import ldap, ldif, ldap.modlist  # needs python-ldap
00049 except ImportError:
00050     ldap = None
00051 
00052 
def check_environ():
    """ Check the system environment whether we are able to run.

        Returns None if everything looks OK, otherwise a str with the failure
        reason: either python-ldap could not be imported or running
        "SLAPD_EXECUTABLE -V" did not work (executable not found or it did not
        exit with the expected exit code 1).

        An OSError other than "not found" (ENOENT, or errno 3 on Windows) is
        re-raised.
    """
    if ldap is None:
        return "You need python-ldap installed to use ldap_testbase."
    slapd = False
    try:
        p = subprocess.Popen([SLAPD_EXECUTABLE, '-V'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() reads both pipes until EOF and then waits for the
        # process. Using wait() alone could block forever if the child wrote
        # more output than fits into a pipe buffer, as nobody reads the pipes.
        p.communicate()
        if p.pid and p.returncode == 1:
            slapd = True  # it works
    except OSError:
        import errno
        import sys
        # fetch the exception this way to avoid the python version specific
        # "except X, e" / "except X as e" syntax
        err = sys.exc_info()[1]
        # errno 3 on nt: presumably "path not found" - treated like ENOENT
        if not (err.errno == errno.ENOENT or
                (err.errno == 3 and os.name == 'nt')):
            raise
    if not slapd:
        return "Can't start %s (see SLAPD_EXECUTABLE)." % SLAPD_EXECUTABLE
    return None
00075 
00076 
class Slapd(object):
    """ Manage a slapd process for testing purposes

        start() spawns the process, stop() terminates it. self.url is the URL
        the server is told to listen on and can be given to ldap.initialize().
    """
    def __init__(self,
                 config=None,  # config filename for -f
                 executable=SLAPD_EXECUTABLE,
                 debug_flags='', # None,  # for -d stats,acl,args,trace,sync,config
                 proto='ldap', ip='127.0.0.1', port=3890,  # use -h proto://ip:port
                 service_name=''  # defaults to -n executable:port, use None to not use -n
                ):
        self.executable = executable
        self.config = config
        self.debug_flags = debug_flags
        self.proto = proto
        self.ip = ip
        self.port = port
        self.url = '%s://%s:%d' % (proto, ip, port) # can be used for ldap.initialize() call
        if service_name == '':
            self.service_name = '%s:%d' % (executable, port)
        else:
            self.service_name = service_name
        self.process = None  # the subprocess.Popen object, set by start()

    def start(self, timeout=0):
        """ start a slapd process and optionally wait up to timeout seconds until it responds

            @param timeout: how long to wait [s]; 0 means: do not wait at all
            @return: None if we did not wait (timeout == 0),
                     True if slapd accepted an anonymous bind within timeout,
                     False if it did not.
        """
        args = [self.executable, '-h', self.url, ]
        if self.config is not None:
            args.extend(['-f', self.config])
        if self.debug_flags is not None:
            # note: the default debug_flags == '' also gets passed on as -d ''
            args.extend(['-d', self.debug_flags])
        if self.service_name:
            args.extend(['-n', self.service_name])
        self.process = subprocess.Popen(args)
        started = None
        if timeout:
            lo = ldap.initialize(self.url)
            ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
            started = False
            wait_until = time.time() + timeout
            try:
                # poll the server with anonymous binds until it answers or we run out of time
                while time.time() < wait_until:
                    try:
                        lo.simple_bind_s('', '')
                    except ldap.SERVER_DOWN:
                        time.sleep(0.1)
                    else:
                        started = True
                        break
            finally:
                lo.unbind_s()  # the connection was only needed for probing
        return started

    def stop(self):
        """ stop this slapd process and wait until it has terminated

            Does nothing if no process was started by start() or if the process
            is known to have terminated already.
        """
        process = self.process
        if process is None or process.poll() is not None:
            return
        os.kill(process.pid, signal.SIGTERM)
        # wait via the Popen object (not via os.waitpid), so that it collects
        # the exit status itself and knows the process is gone
        process.wait()
00129 
00130 
class LdapEnvironment(object):
    """ Manage a (temporary) environment for running a slapd in it

        Intended call order: create_env(), start_slapd(), load_directory(),
        (use the server), stop_slapd(), destroy_env().
    """

    # default DB_CONFIG bdb configuration file contents
    DB_CONFIG = """\
# STRANGE: if i use those settings, after the test slapd goes to 100% and doesn't terminate on SIGTERM
# Set the database in memory cache size.
#set_cachesize 0 10000000 1

# Set log values.
#set_lg_regionmax 262144
#set_lg_bsize 262144
#set_lg_max 10485760

#set_tas_spins 0
"""

    def __init__(self,
                 basedn,
                 rootdn, rootpw,
                 instance=0,  # use different values when running multiple LdapEnvironments
                 schema_dir='/etc/ldap/schema',  # directory with schemas
                 coding='utf-8',  # coding used for config files
                 timeout=10,  # how long to wait for slapd starting [s]
                ):
        self.basedn = basedn
        self.rootdn = rootdn
        self.rootpw = rootpw
        self.instance = instance
        self.schema_dir = schema_dir
        self.coding = coding
        self.ldap_dir = None  # temp. directory, set by create_env()
        self.ldap_db_dir = None  # db directory below ldap_dir, set by create_env()
        self.slapd_conf = None  # filename of slapd.conf, set by create_env()
        self.slapd = None  # Slapd object, set by start_slapd()
        self.timeout = timeout

    def _write_file(self, fname, content):
        """ write content (str) to file fname, closing the file even if writing fails """
        f = open(fname, 'w')
        try:
            f.write(content)
        finally:
            f.close()

    def create_env(self, slapd_config, db_config=DB_CONFIG):
        """ create a temporary LDAP server environment in a temp. directory,
            including writing a slapd.conf (see configure_slapd) and a
            DB_CONFIG there.

            @param slapd_config: slapd.conf content template, it gets
                                 %-formatted with a dict having these keys:
                                 ldap_dir, ldap_db_dir, schema_dir, basedn,
                                 rootdn, rootpw
            @param db_config: content of the DB_CONFIG file
        """
        # create directories
        self.ldap_dir = tempfile.mkdtemp(prefix='LdapEnvironment-%d.' % self.instance)
        self.ldap_db_dir = os.path.join(self.ldap_dir, 'db')
        os.mkdir(self.ldap_db_dir)

        # create DB_CONFIG for bdb backend
        self._write_file(os.path.join(self.ldap_db_dir, 'DB_CONFIG'), db_config)

        # the config file gets the base64 encoded md5 digest of the root password
        rootpw = '{MD5}' + base64.b64encode(md5.new(self.rootpw).digest())

        # create slapd.conf from content template in slapd_config
        slapd_config = slapd_config % {
            'ldap_dir': self.ldap_dir,
            'ldap_db_dir': self.ldap_db_dir,
            'schema_dir': self.schema_dir,
            'basedn': self.basedn,
            'rootdn': self.rootdn,
            'rootpw': rootpw,
        }
        if isinstance(slapd_config, unicode):
            slapd_config = slapd_config.encode(self.coding)
        self.slapd_conf = os.path.join(self.ldap_dir, "slapd.conf")
        self._write_file(self.slapd_conf, slapd_config)

    def start_slapd(self):
        """ start a slapd and optionally wait until it talks with us

            The port used is 3890 + self.instance.

            @return: what Slapd.start() returned for timeout=self.timeout
        """
        self.slapd = Slapd(config=self.slapd_conf, port=3890+self.instance)
        started = self.slapd.start(timeout=self.timeout)
        return started

    def load_directory(self, ldif_content):
        """ load the directory with the ldif_content (str)

            Binds as rootdn and adds every entry found in ldif_content.
        """
        lo = ldap.initialize(self.slapd.url)
        ldap.set_option(ldap.OPT_PROTOCOL_VERSION, ldap.VERSION3) # ldap v2 is outdated
        try:
            lo.simple_bind_s(self.rootdn, self.rootpw)

            class LDIFLoader(ldif.LDIFParser):
                def handle(self, dn, entry):
                    lo.add_s(dn, ldap.modlist.addModlist(entry))

            loader = LDIFLoader(StringIO(ldif_content))
            loader.parse()
        finally:
            lo.unbind_s()  # do not leave the connection open, whether loading worked or not

    def stop_slapd(self):
        """ stop a slapd (does nothing if start_slapd was not called) """
        if self.slapd is not None:
            self.slapd.stop()

    def destroy_env(self):
        """ remove the temporary LDAP server environment

            Does nothing if there is no environment (create_env was not called
            or the environment was destroyed already).
        """
        if self.ldap_dir is None:
            return
        shutil.rmtree(self.ldap_dir)
        # forget about the pathes that do not exist any more
        self.ldap_dir = None
        self.ldap_db_dir = None
        self.slapd_conf = None
00226 
try:
    import py.test

    class LDAPTstBase:
        """ Test base class for py.test based tests which need a LDAP server to talk to.

            Inherit your test class from this base class to test LDAP stuff.

            setup_class creates a LdapEnvironment, starts a slapd in it and
            loads ldif_content into the directory; the environment is stored
            as ldap_env. teardown_class stops slapd and removes the environment.
        """

        # You MUST define these in your derived class:
        slapd_config = None  # a string with your slapd.conf template
        ldif_content = None  # a string with your ldif contents
        basedn = None  # your base DN
        rootdn = None  # root DN
        rootpw = None  # root password

        def setup_class(self):
            """ Create LDAP server environment, start slapd, load the directory

                If slapd does not respond, the tests get skipped. Whenever we
                do not finish successfully, slapd and the environment get
                cleaned up right here - presumably py.test does not call
                teardown_class after setup_class did not succeed.
            """
            env = self.ldap_env = LdapEnvironment(self.basedn, self.rootdn, self.rootpw)
            env.create_env(slapd_config=self.slapd_config)

            def cleanup():
                try:
                    env.stop_slapd()
                finally:
                    env.destroy_env()

            try:
                started = env.start_slapd()
            except:
                # we do not know whether a process exists, so only remove the directory
                env.destroy_env()
                raise
            if not started:
                cleanup()  # the process was spawned nevertheless, do not leave it behind
                py.test.skip("Failed to start %s process, please see your syslog / log files"
                             " (and check if stopping apparmor helps, in case you use it)." % SLAPD_EXECUTABLE)
            try:
                env.load_directory(ldif_content=self.ldif_content)
            except:
                cleanup()
                raise

        def teardown_class(self):
            """ Stop slapd, remove LDAP server environment """
            self.ldap_env.stop_slapd()
            self.ldap_env.destroy_env()

except ImportError:
    pass  # obviously py.test not in use
00260