Back to index

citadel  8.12
database.c
Go to the documentation of this file.
00001 /*
00002  * This is a data store backend for the Citadel server which uses Berkeley DB.
00003  *
00004  * Copyright (c) 1987-2012 by the citadel.org team
00005  *
00006  * This program is open source software; you can redistribute it and/or
00007  * modify it under the terms of the GNU General Public License version 3.
00008  *
00009  * This program is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  * GNU General Public License for more details.
00013  *
00014  */
00015 
00016 /*****************************************************************************
00017        Tunable configuration parameters for the Berkeley DB back end
00018  *****************************************************************************/
00019 
00020 /* Citadel will checkpoint the db at the end of every session, but only if
00021  * the specified number of kilobytes has been written, or if the specified
00022  * number of minutes has passed, since the last checkpoint.
00023  */
00024 #define MAX_CHECKPOINT_KBYTES      256
00025 #define MAX_CHECKPOINT_MINUTES     15
00026 
00027 /*****************************************************************************/
00028 
00029 #include "sysdep.h"
00030 #include <stdlib.h>
00031 #include <unistd.h>
00032 #include <stdio.h>
00033 #include <ctype.h>
00034 #include <string.h>
00035 #include <errno.h>
00036 #include <sys/types.h>
00037 #include <sys/stat.h>
00038 #include <dirent.h>
00039 #include <syslog.h>
00040 #include <zlib.h>
00041 
00042 #ifdef HAVE_DB_H
00043 #include <db.h>
00044 #elif defined(HAVE_DB4_DB_H)
00045 #include <db4/db.h>
00046 #else
00047 #error Neither <db.h> nor <db4/db.h> was found by configure. Install db4-devel.
00048 #endif
00049 
00050 
00051 #if DB_VERSION_MAJOR < 4 || DB_VERSION_MINOR < 1
00052 #error Citadel requires Berkeley DB v4.1 or newer.  Please upgrade.
00053 #endif
00054 
00055 
00056 #include <libcitadel.h>
00057 #include "citadel.h"
00058 #include "server.h"
00059 #include "citserver.h"
00060 #include "database.h"
00061 #include "msgbase.h"
00062 #include "sysdep_decls.h"
00063 #include "threads.h"
00064 #include "config.h"
00065 #include "control.h"
00066 
00067 #include "ctdl_module.h"
00068 
00069 
/* Handles for the individual Citadel databases.  open_databases() fills in
 * one entry per table number (0 .. MAXCDB-1); close_databases() closes them. */
static DB *dbp[MAXCDB];
/* The single Berkeley DB environment in which every handle in dbp[] is created. */
static DB_ENV *dbenv;
00072 
00073 
00074 void cdb_abort(void) {
00075        syslog(LOG_DEBUG,
00076               "citserver is stopping in order to prevent data loss. uid=%d gid=%d euid=%d egid=%d",
00077               getuid(),
00078               getgid(),
00079               geteuid(),
00080               getegid()
00081        );
00082        cit_backtrace();
00083        exit(CTDLEXIT_DB);
00084 }
00085 
00086 
00087 /* Verbose logging callback */
00088 void cdb_verbose_log(const DB_ENV *dbenv, const char *msg)
00089 {
00090        if (!IsEmptyStr(msg)) {
00091               syslog(LOG_DEBUG, "DB: %s", msg);
00092        }
00093 }
00094 
00095 
00096 /* Verbose logging callback */
00097 void cdb_verbose_err(const DB_ENV *dbenv, const char *errpfx, const char *msg)
00098 {
00099        syslog(LOG_ALERT, "DB: %s", msg);
00100 }
00101 
00102 
00103 /* just a little helper function */
00104 static void txabort(DB_TXN * tid)
00105 {
00106        int ret;
00107 
00108        ret = tid->abort(tid);
00109 
00110        if (ret) {
00111               syslog(LOG_EMERG, "bdb(): txn_abort: %s", db_strerror(ret));
00112               cdb_abort();
00113        }
00114 }
00115 
00116 /* this one is even more helpful than the last. */
00117 static void txcommit(DB_TXN * tid)
00118 {
00119        int ret;
00120 
00121        ret = tid->commit(tid, 0);
00122 
00123        if (ret) {
00124               syslog(LOG_EMERG, "bdb(): txn_commit: %s", db_strerror(ret));
00125               cdb_abort();
00126        }
00127 }
00128 
00129 /* are you sensing a pattern yet? */
00130 static void txbegin(DB_TXN ** tid)
00131 {
00132        int ret;
00133 
00134        ret = dbenv->txn_begin(dbenv, NULL, tid, 0);
00135 
00136        if (ret) {
00137               syslog(LOG_EMERG, "bdb(): txn_begin: %s", db_strerror(ret));
00138               cdb_abort();
00139        }
00140 }
00141 
00142 static void dbpanic(DB_ENV * env, int errval)
00143 {
00144        syslog(LOG_EMERG, "bdb(): PANIC: %s", db_strerror(errval));
00145 }
00146 
00147 static void cclose(DBC * cursor)
00148 {
00149        int ret;
00150 
00151        if ((ret = cursor->c_close(cursor))) {
00152               syslog(LOG_EMERG, "bdb(): c_close: %s", db_strerror(ret));
00153               cdb_abort();
00154        }
00155 }
00156 
00157 static void bailIfCursor(DBC ** cursors, const char *msg)
00158 {
00159        int i;
00160 
00161        for (i = 0; i < MAXCDB; i++)
00162               if (cursors[i] != NULL) {
00163                      syslog(LOG_EMERG, "bdb(): cursor still in progress on cdb %02x: %s", i, msg);
00164                      cdb_abort();
00165               }
00166 }
00167 
00168 
00169 void cdb_check_handles(void)
00170 {
00171        bailIfCursor(TSD->cursors, "in check_handles");
00172 
00173        if (TSD->tid != NULL) {
00174               syslog(LOG_EMERG, "bdb(): transaction still in progress!");
00175               cdb_abort();
00176        }
00177 }
00178 
00179 
00180 /*
00181  * Cull the database logs
00182  */
00183 static void cdb_cull_logs(void)
00184 {
00185        u_int32_t flags;
00186        int ret;
00187        char **file, **list;
00188        char errmsg[SIZ];
00189 
00190        flags = DB_ARCH_ABS;
00191 
00192        /* Get the list of names. */
00193        if ((ret = dbenv->log_archive(dbenv, &list, flags)) != 0) {
00194               syslog(LOG_ERR, "cdb_cull_logs: %s", db_strerror(ret));
00195               return;
00196        }
00197 
00198        /* Print the list of names. */
00199        if (list != NULL) {
00200               for (file = list; *file != NULL; ++file) {
00201                      syslog(LOG_DEBUG, "Deleting log: %s", *file);
00202                      ret = unlink(*file);
00203                      if (ret != 0) {
00204                             snprintf(errmsg, sizeof(errmsg),
00205                                     " ** ERROR **\n \n \n "
00206                                     "Citadel was unable to delete the "
00207                                     "database log file '%s' because of the "
00208                                     "following error:\n \n %s\n \n"
00209                                     " This log file is no longer in use "
00210                                     "and may be safely deleted.\n",
00211                                     *file, strerror(errno));
00212                             CtdlAideMessage(errmsg, "Database Warning Message");
00213                      }
00214               }
00215               free(list);
00216        }
00217 }
00218 
00219 /*
00220  * Manually initiate log file cull.
00221  */
00222 void cmd_cull(char *argbuf) {
00223        if (CtdlAccessCheck(ac_internal)) return;
00224        cdb_cull_logs();
00225        cprintf("%d Database log file cull completed.\n", CIT_OK);
00226 }
00227 
00228 
00229 /*
00230  * Request a checkpoint of the database.  Called once per minute by the thread manager.
00231  */
00232 void cdb_checkpoint(void)
00233 {
00234        int ret;
00235 
00236        syslog(LOG_DEBUG, "-- db checkpoint --");
00237        ret = dbenv->txn_checkpoint(dbenv, MAX_CHECKPOINT_KBYTES, MAX_CHECKPOINT_MINUTES, 0);
00238 
00239        if (ret != 0) {
00240               syslog(LOG_EMERG, "cdb_checkpoint: txn_checkpoint: %s", db_strerror(ret));
00241               cdb_abort();
00242        }
00243 
00244        /* After a successful checkpoint, we can cull the unused logs */
00245        if (config.c_auto_cull) {
00246               cdb_cull_logs();
00247        }
00248 }
00249 
00250 
00251 
00252 /*
00253  * Open the various databases we'll be using.  Any database which
00254  * does not exist should be created.  Note that we don't need a
00255  * critical section here, because there aren't any active threads
00256  * manipulating the database yet.
00257  */
00258 void open_databases(void)
00259 {
00260        int ret;
00261        int i;
00262        char dbfilename[32];
00263        u_int32_t flags = 0;
00264        int dbversion_major, dbversion_minor, dbversion_patch;
00265        int current_dbversion = 0;
00266 
00267        syslog(LOG_DEBUG, "bdb(): open_databases() starting");
00268        syslog(LOG_DEBUG, "Compiled db: %s", DB_VERSION_STRING);
00269        syslog(LOG_INFO, "  Linked db: %s",
00270               db_version(&dbversion_major, &dbversion_minor, &dbversion_patch));
00271 
00272        current_dbversion = (dbversion_major * 1000000) + (dbversion_minor * 1000) + dbversion_patch;
00273 
00274        syslog(LOG_DEBUG, "Calculated dbversion: %d", current_dbversion);
00275        syslog(LOG_DEBUG, "  Previous dbversion: %d", CitControl.MMdbversion);
00276 
00277        if ( (getenv("SUPPRESS_DBVERSION_CHECK") == NULL)
00278           && (CitControl.MMdbversion > current_dbversion) ) {
00279               syslog(LOG_EMERG, "You are attempting to run the Citadel server using a version");
00280               syslog(LOG_EMERG, "of Berkeley DB that is older than that which last created or");
00281               syslog(LOG_EMERG, "updated the database.  Because this would probably cause data");
00282               syslog(LOG_EMERG, "corruption or loss, the server is aborting execution now.");
00283               exit(CTDLEXIT_DB);
00284        }
00285 
00286        CitControl.MMdbversion = current_dbversion;
00287        put_control();
00288 
00289        syslog(LOG_INFO, "Linked zlib: %s\n", zlibVersion());
00290 
00291        /*
00292         * Silently try to create the database subdirectory.  If it's
00293         * already there, no problem.
00294         */
00295        if ((mkdir(ctdl_data_dir, 0700) != 0) && (errno != EEXIST)){
00296               syslog(LOG_EMERG, 
00297                            "unable to create database directory [%s]: %s", 
00298                            ctdl_data_dir, strerror(errno));
00299        }
00300        if (chmod(ctdl_data_dir, 0700) != 0){
00301               syslog(LOG_EMERG, 
00302                            "unable to set database directory accessrights [%s]: %s", 
00303                            ctdl_data_dir, strerror(errno));
00304        }
00305        if (chown(ctdl_data_dir, CTDLUID, (-1)) != 0){
00306               syslog(LOG_EMERG, 
00307                            "unable to set the owner for [%s]: %s", 
00308                            ctdl_data_dir, strerror(errno));
00309        }
00310        syslog(LOG_DEBUG, "bdb(): Setting up DB environment\n");
00311        /* db_env_set_func_yield((int (*)(u_long,  u_long))sched_yield); */
00312        ret = db_env_create(&dbenv, 0);
00313        if (ret) {
00314               syslog(LOG_EMERG, "bdb(): db_env_create: %s\n", db_strerror(ret));
00315               syslog(LOG_EMERG, "exit code %d\n", ret);
00316               exit(CTDLEXIT_DB);
00317        }
00318        dbenv->set_errpfx(dbenv, "citserver");
00319        dbenv->set_paniccall(dbenv, dbpanic);
00320        dbenv->set_errcall(dbenv, cdb_verbose_err);
00321        dbenv->set_errpfx(dbenv, "ctdl");
00322 #if (DB_VERSION_MAJOR == 4) && (DB_VERSION_MINOR >= 3)
00323        dbenv->set_msgcall(dbenv, cdb_verbose_log);
00324 #endif
00325        dbenv->set_verbose(dbenv, DB_VERB_DEADLOCK, 1);
00326        dbenv->set_verbose(dbenv, DB_VERB_RECOVERY, 1);
00327 
00328        /*
00329         * We want to specify the shared memory buffer pool cachesize,
00330         * but everything else is the default.
00331         */
00332        ret = dbenv->set_cachesize(dbenv, 0, 64 * 1024, 0);
00333        if (ret) {
00334               syslog(LOG_EMERG, "bdb(): set_cachesize: %s\n", db_strerror(ret));
00335               dbenv->close(dbenv, 0);
00336               syslog(LOG_EMERG, "exit code %d\n", ret);
00337               exit(CTDLEXIT_DB);
00338        }
00339 
00340        if ((ret = dbenv->set_lk_detect(dbenv, DB_LOCK_DEFAULT))) {
00341               syslog(LOG_EMERG, "bdb(): set_lk_detect: %s\n", db_strerror(ret));
00342               dbenv->close(dbenv, 0);
00343               syslog(LOG_EMERG, "exit code %d\n", ret);
00344               exit(CTDLEXIT_DB);
00345        }
00346 
00347        flags = DB_CREATE | DB_INIT_MPOOL | DB_PRIVATE | DB_INIT_TXN | DB_INIT_LOCK | DB_THREAD | DB_RECOVER;
00348        syslog(LOG_DEBUG, "dbenv->open(dbenv, %s, %d, 0)\n", ctdl_data_dir, flags);
00349        ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
00350        if (ret == DB_RUNRECOVERY) {
00351               syslog(LOG_ALERT, "dbenv->open: %s\n", db_strerror(ret));
00352               syslog(LOG_ALERT, "Attempting recovery...\n");
00353               flags |= DB_RECOVER;
00354               ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
00355        }
00356        if (ret == DB_RUNRECOVERY) {
00357               syslog(LOG_ALERT, "dbenv->open: %s\n", db_strerror(ret));
00358               syslog(LOG_ALERT, "Attempting catastrophic recovery...\n");
00359               flags &= ~DB_RECOVER;
00360               flags |= DB_RECOVER_FATAL;
00361               ret = dbenv->open(dbenv, ctdl_data_dir, flags, 0);
00362        }
00363        if (ret) {
00364               syslog(LOG_EMERG, "dbenv->open: %s\n", db_strerror(ret));
00365               dbenv->close(dbenv, 0);
00366               syslog(LOG_EMERG, "exit code %d\n", ret);
00367               exit(CTDLEXIT_DB);
00368        }
00369 
00370        syslog(LOG_INFO, "Starting up DB\n");
00371 
00372        for (i = 0; i < MAXCDB; ++i) {
00373 
00374               /* Create a database handle */
00375               ret = db_create(&dbp[i], dbenv, 0);
00376               if (ret) {
00377                      syslog(LOG_EMERG, "db_create: %s\n", db_strerror(ret));
00378                      syslog(LOG_EMERG, "exit code %d\n", ret);
00379                      exit(CTDLEXIT_DB);
00380               }
00381 
00382 
00383               /* Arbitrary names for our tables -- we reference them by
00384                * number, so we don't have string names for them.
00385                */
00386               snprintf(dbfilename, sizeof dbfilename, "cdb.%02x", i);
00387 
00388               ret = dbp[i]->open(dbp[i],
00389                                NULL,
00390                                dbfilename,
00391                                NULL,
00392                                DB_BTREE,
00393                                DB_CREATE | DB_AUTO_COMMIT | DB_THREAD,
00394                                0600
00395               );
00396               if (ret) {
00397                      syslog(LOG_EMERG, "db_open[%02x]: %s\n", i, db_strerror(ret));
00398                      if (ret == ENOMEM) {
00399                             syslog(LOG_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php?id=faq:troubleshooting:out_of_lock_entries for more information.");
00400                      }
00401                      syslog(LOG_EMERG, "exit code %d\n", ret);
00402                      exit(CTDLEXIT_DB);
00403               }
00404        }
00405 
00406 }
00407 
00408 
00409 /* Make sure we own all the files, because in a few milliseconds
00410  * we're going to drop root privs.
00411  */
00412 void cdb_chmod_data(void) {
00413        DIR *dp;
00414        struct dirent *d;
00415        char filename[PATH_MAX];
00416 
00417        dp = opendir(ctdl_data_dir);
00418        if (dp != NULL) {
00419               while (d = readdir(dp), d != NULL) {
00420                      if (d->d_name[0] != '.') {
00421                             snprintf(filename, sizeof filename,
00422                                     "%s/%s", ctdl_data_dir, d->d_name);
00423                             syslog(LOG_DEBUG, "chmod(%s, 0600) returned %d\n",
00424                                    filename, chmod(filename, 0600)
00425                             );
00426                             syslog(LOG_DEBUG, "chown(%s, CTDLUID, -1) returned %d\n",
00427                                    filename, chown(filename, CTDLUID, (-1))
00428                             );
00429                      }
00430               }
00431               closedir(dp);
00432        }
00433 
00434        syslog(LOG_DEBUG, "open_databases() finished\n");
00435        CtdlRegisterProtoHook(cmd_cull, "CULL", "Cull database logs");
00436 }
00437 
00438 
00439 /*
00440  * Close all of the db database files we've opened.  This can be done
00441  * in a loop, since it's just a bunch of closes.
00442  */
00443 void close_databases(void)
00444 {
00445        int a;
00446        int ret;
00447 
00448        if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0))) {
00449               syslog(LOG_EMERG,
00450                      "txn_checkpoint: %s\n", db_strerror(ret));
00451        }
00452 
00453        /* print some statistics... */
00454 #ifdef DB_STAT_ALL
00455        dbenv->lock_stat_print(dbenv, DB_STAT_ALL);
00456 #endif
00457 
00458        /* close the tables */
00459        for (a = 0; a < MAXCDB; ++a) {
00460               syslog(LOG_INFO, "Closing database %02x\n", a);
00461               ret = dbp[a]->close(dbp[a], 0);
00462               if (ret) {
00463                      syslog(LOG_EMERG, "db_close: %s\n", db_strerror(ret));
00464               }
00465 
00466        }
00467 
00468        /* Close the handle. */
00469        ret = dbenv->close(dbenv, 0);
00470        if (ret) {
00471               syslog(LOG_EMERG, "DBENV->close: %s\n", db_strerror(ret));
00472        }
00473 }
00474 
00475 
00476 /*
00477  * Decompress a database item if it was compressed on disk
00478  */
00479 void cdb_decompress_if_necessary(struct cdbdata *cdb)
00480 {
00481        static int magic = COMPRESS_MAGIC;
00482 
00483        if ((cdb == NULL) || 
00484            (cdb->ptr == NULL) || 
00485            (cdb->len < sizeof(magic)) ||
00486            (memcmp(cdb->ptr, &magic, sizeof(magic))))
00487            return;
00488 
00489        /* At this point we know we're looking at a compressed item. */
00490 
00491        struct CtdlCompressHeader zheader;
00492        char *uncompressed_data;
00493        char *compressed_data;
00494        uLongf destLen, sourceLen;
00495        size_t cplen;
00496 
00497        memset(&zheader, 0, sizeof(struct CtdlCompressHeader));
00498        cplen = sizeof(struct CtdlCompressHeader);
00499        if (sizeof(struct CtdlCompressHeader) > cdb->len)
00500               cplen = cdb->len;
00501        memcpy(&zheader, cdb->ptr, cplen);
00502 
00503        compressed_data = cdb->ptr;
00504        compressed_data += sizeof(struct CtdlCompressHeader);
00505 
00506        sourceLen = (uLongf) zheader.compressed_len;
00507        destLen = (uLongf) zheader.uncompressed_len;
00508        uncompressed_data = malloc(zheader.uncompressed_len);
00509 
00510        if (uncompress((Bytef *) uncompressed_data,
00511                      (uLongf *) & destLen,
00512                      (const Bytef *) compressed_data,
00513                      (uLong) sourceLen) != Z_OK) {
00514               syslog(LOG_EMERG, "uncompress() error\n");
00515               cdb_abort();
00516        }
00517 
00518        free(cdb->ptr);
00519        cdb->len = (size_t) destLen;
00520        cdb->ptr = uncompressed_data;
00521 }
00522 
00523 
00524 
00525 /*
00526  * Store a piece of data.  Returns 0 if the operation was successful.  If a
00527  * key already exists it should be overwritten.
00528  */
00529 int cdb_store(int cdb, const void *ckey, int ckeylen, void *cdata, int cdatalen)
00530 {
00531 
00532        DBT dkey, ddata;
00533        DB_TXN *tid;
00534        int ret = 0;
00535 
00536        struct CtdlCompressHeader zheader;
00537        char *compressed_data = NULL;
00538        int compressing = 0;
00539        size_t buffer_len = 0;
00540        uLongf destLen = 0;
00541 
00542        memset(&dkey, 0, sizeof(DBT));
00543        memset(&ddata, 0, sizeof(DBT));
00544        dkey.size = ckeylen;
00545        /* no, we don't care for this error. */
00546        dkey.data = ckey;
00547 
00548        ddata.size = cdatalen;
00549        ddata.data = cdata;
00550 
00551        /* Only compress Visit records.  Everything else is uncompressed. */
00552        if (cdb == CDB_VISIT) {
00553               compressing = 1;
00554               zheader.magic = COMPRESS_MAGIC;
00555               zheader.uncompressed_len = cdatalen;
00556               buffer_len = ((cdatalen * 101) / 100) + 100
00557                   + sizeof(struct CtdlCompressHeader);
00558               destLen = (uLongf) buffer_len;
00559               compressed_data = malloc(buffer_len);
00560               if (compress2((Bytef *) (compressed_data + sizeof(struct CtdlCompressHeader)),
00561                      &destLen, (Bytef *) cdata, (uLongf) cdatalen, 1) != Z_OK)
00562               {
00563                      syslog(LOG_EMERG, "compress2() error\n");
00564                      cdb_abort();
00565               }
00566               zheader.compressed_len = (size_t) destLen;
00567               memcpy(compressed_data, &zheader, sizeof(struct CtdlCompressHeader));
00568               ddata.size = (size_t) (sizeof(struct CtdlCompressHeader) + zheader.compressed_len);
00569               ddata.data = compressed_data;
00570        }
00571 
00572        if (TSD->tid != NULL) {
00573               ret = dbp[cdb]->put(dbp[cdb],      /* db */
00574                                 TSD->tid, /* transaction ID */
00575                                 &dkey,    /* key */
00576                                 &ddata,   /* data */
00577                                 0);       /* flags */
00578               if (ret) {
00579                      syslog(LOG_EMERG, "cdb_store(%d): %s", cdb, db_strerror(ret));
00580                      cdb_abort();
00581               }
00582               if (compressing)
00583                      free(compressed_data);
00584               return ret;
00585 
00586        } else {
00587               bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
00588 
00589              retry:
00590               txbegin(&tid);
00591 
00592               if ((ret = dbp[cdb]->put(dbp[cdb], /* db */
00593                                     tid,  /* transaction ID */
00594                                     &dkey,       /* key */
00595                                     &ddata,      /* data */
00596                                     0))) {       /* flags */
00597                      if (ret == DB_LOCK_DEADLOCK) {
00598                             txabort(tid);
00599                             goto retry;
00600                      } else {
00601                             syslog(LOG_EMERG, "cdb_store(%d): %s", cdb, db_strerror(ret));
00602                             cdb_abort();
00603                      }
00604               } else {
00605                      txcommit(tid);
00606                      if (compressing) {
00607                             free(compressed_data);
00608                      }
00609                      return ret;
00610               }
00611        }
00612        return ret;
00613 }
00614 
00615 
00616 /*
00617  * Delete a piece of data.  Returns 0 if the operation was successful.
00618  */
00619 int cdb_delete(int cdb, void *key, int keylen)
00620 {
00621 
00622        DBT dkey;
00623        DB_TXN *tid;
00624        int ret;
00625 
00626        memset(&dkey, 0, sizeof dkey);
00627        dkey.size = keylen;
00628        dkey.data = key;
00629 
00630        if (TSD->tid != NULL) {
00631               ret = dbp[cdb]->del(dbp[cdb], TSD->tid, &dkey, 0);
00632               if (ret) {
00633                      syslog(LOG_EMERG, "cdb_delete(%d): %s\n", cdb, db_strerror(ret));
00634                      if (ret != DB_NOTFOUND) {
00635                             cdb_abort();
00636                      }
00637               }
00638        } else {
00639               bailIfCursor(TSD->cursors, "attempt to delete during r/o cursor");
00640 
00641              retry:
00642               txbegin(&tid);
00643 
00644               if ((ret = dbp[cdb]->del(dbp[cdb], tid, &dkey, 0))
00645                   && ret != DB_NOTFOUND) {
00646                      if (ret == DB_LOCK_DEADLOCK) {
00647                             txabort(tid);
00648                             goto retry;
00649                      } else {
00650                             syslog(LOG_EMERG, "cdb_delete(%d): %s\n",
00651                                    cdb, db_strerror(ret));
00652                             cdb_abort();
00653                      }
00654               } else {
00655                      txcommit(tid);
00656               }
00657        }
00658        return ret;
00659 }
00660 
00661 static DBC *localcursor(int cdb)
00662 {
00663        int ret;
00664        DBC *curs;
00665 
00666        if (TSD->cursors[cdb] == NULL)
00667               ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &curs, 0);
00668        else
00669               ret = TSD->cursors[cdb]->c_dup(TSD->cursors[cdb], &curs, DB_POSITION);
00670 
00671        if (ret) {
00672               syslog(LOG_EMERG, "localcursor: %s\n", db_strerror(ret));
00673               cdb_abort();
00674        }
00675 
00676        return curs;
00677 }
00678 
00679 
00680 /*
00681  * Fetch a piece of data.  If not found, returns NULL.  Otherwise, it returns
00682  * a struct cdbdata which it is the caller's responsibility to free later on
00683  * using the cdb_free() routine.
00684  */
00685 struct cdbdata *cdb_fetch(int cdb, const void *key, int keylen)
00686 {
00687        struct cdbdata *tempcdb;
00688        DBT dkey, dret;
00689        int ret;
00690 
00691        memset(&dkey, 0, sizeof(DBT));
00692        dkey.size = keylen;
00693        /* no we don't care about this error. */
00694        dkey.data = key;
00695 
00696        if (TSD->tid != NULL) {
00697               memset(&dret, 0, sizeof(DBT));
00698               dret.flags = DB_DBT_MALLOC;
00699               ret = dbp[cdb]->get(dbp[cdb], TSD->tid, &dkey, &dret, 0);
00700        } else {
00701               DBC *curs;
00702 
00703               do {
00704                      memset(&dret, 0, sizeof(DBT));
00705                      dret.flags = DB_DBT_MALLOC;
00706 
00707                      curs = localcursor(cdb);
00708 
00709                      ret = curs->c_get(curs, &dkey, &dret, DB_SET);
00710                      cclose(curs);
00711               }
00712               while (ret == DB_LOCK_DEADLOCK);
00713 
00714        }
00715 
00716        if ((ret != 0) && (ret != DB_NOTFOUND)) {
00717               syslog(LOG_EMERG, "cdb_fetch(%d): %s\n", cdb, db_strerror(ret));
00718               cdb_abort();
00719        }
00720 
00721        if (ret != 0)
00722               return NULL;
00723        tempcdb = (struct cdbdata *) malloc(sizeof(struct cdbdata));
00724 
00725        if (tempcdb == NULL) {
00726               syslog(LOG_EMERG, "cdb_fetch: Cannot allocate memory for tempcdb\n");
00727               cdb_abort();
00728               return NULL; /* make it easier for static analysis... */
00729        }
00730        else
00731        {
00732               tempcdb->len = dret.size;
00733               tempcdb->ptr = dret.data;
00734               cdb_decompress_if_necessary(tempcdb);
00735               return (tempcdb);
00736        }
00737 }
00738 
00739 
00740 /*
00741  * Free a cdbdata item.
00742  *
00743  * Note that we only free the 'ptr' portion if it is not NULL.  This allows
00744  * other code to assume ownership of that memory simply by storing the
00745  * pointer elsewhere and then setting 'ptr' to NULL.  cdb_free() will then
00746  * avoid freeing it.
00747  */
00748 void cdb_free(struct cdbdata *cdb)
00749 {
00750        if (cdb->ptr) {
00751               free(cdb->ptr);
00752        }
00753        free(cdb);
00754 }
00755 
00756 void cdb_close_cursor(int cdb)
00757 {
00758        if (TSD->cursors[cdb] != NULL) {
00759               cclose(TSD->cursors[cdb]);
00760        }
00761 
00762        TSD->cursors[cdb] = NULL;
00763 }
00764 
00765 /* 
00766  * Prepare for a sequential search of an entire database.
00767  * (There is guaranteed to be no more than one traversal in
00768  * progress per thread at any given time.)
00769  */
00770 void cdb_rewind(int cdb)
00771 {
00772        int ret = 0;
00773 
00774        if (TSD->cursors[cdb] != NULL) {
00775               syslog(LOG_EMERG,
00776                      "cdb_rewind: must close cursor on database %d before reopening.\n", cdb);
00777               cdb_abort();
00778               /* cclose(TSD->cursors[cdb]); */
00779        }
00780 
00781        /*
00782         * Now initialize the cursor
00783         */
00784        ret = dbp[cdb]->cursor(dbp[cdb], TSD->tid, &TSD->cursors[cdb], 0);
00785        if (ret) {
00786               syslog(LOG_EMERG, "cdb_rewind: db_cursor: %s\n", db_strerror(ret));
00787               cdb_abort();
00788        }
00789 }
00790 
00791 
00792 /*
00793  * Fetch the next item in a sequential search.  Returns a pointer to a 
00794  * cdbdata structure, or NULL if we've hit the end.
00795  */
00796 struct cdbdata *cdb_next_item(int cdb)
00797 {
00798        DBT key, data;
00799        struct cdbdata *cdbret;
00800        int ret = 0;
00801 
00802        /* Initialize the key/data pair so the flags aren't set. */
00803        memset(&key, 0, sizeof(key));
00804        memset(&data, 0, sizeof(data));
00805        data.flags = DB_DBT_MALLOC;
00806 
00807        ret = TSD->cursors[cdb]->c_get(TSD->cursors[cdb], &key, &data, DB_NEXT);
00808 
00809        if (ret) {
00810               if (ret != DB_NOTFOUND) {
00811                      syslog(LOG_EMERG, "cdb_next_item(%d): %s\n", cdb, db_strerror(ret));
00812                      cdb_abort();
00813               }
00814               cdb_close_cursor(cdb);
00815               return NULL;  /* presumably, end of file */
00816        }
00817 
00818        cdbret = (struct cdbdata *) malloc(sizeof(struct cdbdata));
00819        cdbret->len = data.size;
00820        cdbret->ptr = data.data;
00821        cdb_decompress_if_necessary(cdbret);
00822 
00823        return (cdbret);
00824 }
00825 
00826 
00827 
00828 /*
00829  * Transaction-based stuff.  I'm writing this as I bake cookies...
00830  */
00831 
00832 void cdb_begin_transaction(void)
00833 {
00834 
00835        bailIfCursor(TSD->cursors, "can't begin transaction during r/o cursor");
00836 
00837        if (TSD->tid != NULL) {
00838               syslog(LOG_EMERG, "cdb_begin_transaction: ERROR: nested transaction\n");
00839               cdb_abort();
00840        }
00841 
00842        txbegin(&TSD->tid);
00843 }
00844 
00845 void cdb_end_transaction(void)
00846 {
00847        int i;
00848 
00849        for (i = 0; i < MAXCDB; i++)
00850               if (TSD->cursors[i] != NULL) {
00851                      syslog(LOG_WARNING,
00852                             "cdb_end_transaction: WARNING: cursor %d still open at transaction end\n",
00853                             i);
00854                      cclose(TSD->cursors[i]);
00855                      TSD->cursors[i] = NULL;
00856               }
00857 
00858        if (TSD->tid == NULL) {
00859               syslog(LOG_EMERG,
00860                      "cdb_end_transaction: ERROR: txcommit(NULL) !!\n");
00861               cdb_abort();
00862        } else {
00863               txcommit(TSD->tid);
00864        }
00865 
00866        TSD->tid = NULL;
00867 }
00868 
00869 /*
00870  * Truncate (delete every record)
00871  */
00872 void cdb_trunc(int cdb)
00873 {
00874        /* DB_TXN *tid; */
00875        int ret;
00876        u_int32_t count;
00877 
00878        if (TSD->tid != NULL) {
00879               syslog(LOG_EMERG, "cdb_trunc must not be called in a transaction.");
00880               cdb_abort();
00881        } else {
00882               bailIfCursor(TSD->cursors, "attempt to write during r/o cursor");
00883 
00884              retry:
00885               /* txbegin(&tid); */
00886 
00887               if ((ret = dbp[cdb]->truncate(dbp[cdb],   /* db */
00888                                          NULL,   /* transaction ID */
00889                                          &count, /* #rows deleted */
00890                                          0))) {  /* flags */
00891                      if (ret == DB_LOCK_DEADLOCK) {
00892                             /* txabort(tid); */
00893                             goto retry;
00894                      } else {
00895                             syslog(LOG_EMERG, "cdb_truncate(%d): %s\n", cdb, db_strerror(ret));
00896                             if (ret == ENOMEM) {
00897                                    syslog(LOG_EMERG, "You may need to tune your database; please read http://www.citadel.org/doku.php?id=faq:troubleshooting:out_of_lock_entries for more information.");
00898                             }
00899                             exit(CTDLEXIT_DB);
00900                      }
00901               } else {
00902                      /* txcommit(tid); */
00903               }
00904        }
00905 }