Back to index

citadel  8.12
serv_fulltext.c
Go to the documentation of this file.
00001 /*
00002  * This module handles fulltext indexing of the message base.
00003  * Copyright (c) 2005-2011 by the citadel.org team
00004  *
00005  * This program is open source software; you can redistribute it and/or
00006  * modify it under the terms of the GNU General Public License as published
00007  * by the Free Software Foundation; either version 3 of the License, or
00008  * (at your option) any later version.
00009  *
00010  * This program is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  * GNU General Public License for more details.
00014  *
00015  * You should have received a copy of the GNU General Public License
00016  * along with this program; if not, write to the Free Software
00017  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
00018  */
00019 
00020 #include "sysdep.h"
00021 #include <stdlib.h>
00022 #include <unistd.h>
00023 #include <stdio.h>
00024 #include <fcntl.h>
00025 #include <signal.h>
00026 #include <pwd.h>
00027 #include <errno.h>
00028 #include <sys/types.h>
00029 
00030 #if TIME_WITH_SYS_TIME
00031 # include <sys/time.h>
00032 # include <time.h>
00033 #else
00034 # if HAVE_SYS_TIME_H
00035 #  include <sys/time.h>
00036 # else
00037 #  include <time.h>
00038 # endif
00039 #endif
00040 
00041 #include <sys/wait.h>
00042 #include <string.h>
00043 #include <limits.h>
00044 #include <libcitadel.h>
00045 #include "citadel.h"
00046 #include "server.h"
00047 #include "citserver.h"
00048 #include "support.h"
00049 #include "config.h"
00050 #include "database.h"
00051 #include "msgbase.h"
00052 #include "control.h"
00053 #include "serv_fulltext.h"
00054 #include "ft_wordbreaker.h"
00055 #include "threads.h"
00056 #include "context.h"
00057 
00058 #include "ctdl_module.h"
00059 
/*
 * Work list for one indexing pass: ft_newmsgs is a heap array holding
 * ft_num_msgs message numbers, with room for ft_num_alloc entries.
 * ft_newhighest is the upper bound on message numbers accepted into the
 * list, and is what gets recorded as the indexed high-water mark.
 */
long *ft_newmsgs = NULL;
int ft_num_msgs = 0;
int ft_num_alloc = 0;
long ft_newhighest = 0L;

/*
 * In-memory bucket cache, one slot per token value (0..65535).
 * ftc_msgs[t] is a heap array of message numbers for token t (NULL when
 * the bucket is not loaded) and ftc_num_msgs[t] is its element count.
 */
long *ftc_msgs[65536];
int ftc_num_msgs[65536];
00067 
00068 
/*
 * qsort()-style comparator for two long integers.
 *
 * rec1, rec2	pointers to the long values being compared
 *
 * Returns 1 if *rec1 is greater, -1 if it is smaller, 0 if they are equal.
 */
int longcmp(const void *rec1, const void *rec2) {
	const long lhs = *(const long *)rec1;
	const long rhs = *(const long *)rec2;

	/* Each comparison yields 0 or 1, so the difference is -1, 0 or 1 */
	return (lhs > rhs) - (lhs < rhs);
}
00082 
00083 /*
00084  * Flush our index cache out to disk.
00085  */
00086 void ft_flush_cache(void) {
00087        int i;
00088        time_t last_update = 0;
00089 
00090        for (i=0; i<65536; ++i) {
00091               if ((time(NULL) - last_update) >= 10) {
00092                      syslog(LOG_INFO,
00093                             "Flushing index cache to disk (%d%% complete)",
00094                             (i * 100 / 65536)
00095                      );
00096                      last_update = time(NULL);
00097               }
00098               if (ftc_msgs[i] != NULL) {
00099                      cdb_store(CDB_FULLTEXT, &i, sizeof(int), ftc_msgs[i],
00100                             (ftc_num_msgs[i] * sizeof(long)));
00101                      ftc_num_msgs[i] = 0;
00102                      free(ftc_msgs[i]);
00103                      ftc_msgs[i] = NULL;
00104               }
00105        }
00106        syslog(LOG_INFO, "Flushed index cache to disk (100%% complete)");
00107 }
00108 
00109 
00110 /*
00111  * Index or de-index a message.  (op == 1 to index, 0 to de-index)
00112  */
00113 void ft_index_message(long msgnum, int op) {
00114        int num_tokens = 0;
00115        int *tokens = NULL;
00116        int i, j;
00117        struct cdbdata *cdb_bucket;
00118        StrBuf *msgtext;
00119        char *txt;
00120        int tok;
00121        struct CtdlMessage *msg = NULL;
00122 
00123        msg = CtdlFetchMessage(msgnum, 1);
00124        if (msg == NULL) {
00125               syslog(LOG_ERR, "ft_index_message() could not load msg %ld", msgnum);
00126               return;
00127        }
00128 
00129        if (msg->cm_fields['1'] != NULL) {
00130               syslog(LOG_DEBUG, "ft_index_message() excluded msg %ld", msgnum);
00131               CtdlFreeMessage(msg);
00132               return;
00133        }
00134 
00135        syslog(LOG_DEBUG, "ft_index_message() %s msg %ld", (op ? "adding" : "removing") , msgnum);
00136 
00137        /* Output the message as text before indexing it, so we don't end up
00138         * indexing a bunch of encoded base64, etc.
00139         */
00140        CC->redirect_buffer = NewStrBufPlain(NULL, SIZ);
00141        CtdlOutputPreLoadedMsg(msg, MT_CITADEL, HEADERS_ALL, 0, 1, 0);
00142        CtdlFreeMessage(msg);
00143        msgtext = CC->redirect_buffer;
00144        CC->redirect_buffer = NULL;
00145        syslog(LOG_DEBUG, "Wordbreaking message %ld...", msgnum);
00146        if ((msgtext == NULL) || (StrLength(msgtext) == 0)) {
00147               syslog(LOG_ALERT, "This message has a zero length.  Probable data corruption.");
00148        }
00149        txt = SmashStrBuf(&msgtext);
00150        wordbreaker(txt, &num_tokens, &tokens);
00151        free(txt);
00152 
00153        syslog(LOG_DEBUG, "Indexing message %ld [%d tokens]", msgnum, num_tokens);
00154        if (num_tokens > 0) {
00155               for (i=0; i<num_tokens; ++i) {
00156 
00157                      /* Add the message to the relevant token bucket */
00158 
00159                      /* search for tokens[i] */
00160                      tok = tokens[i];
00161 
00162                      if ( (tok >= 0) && (tok <= 65535) ) {
00163                             /* fetch the bucket, Liza */
00164                             if (ftc_msgs[tok] == NULL) {
00165                                    cdb_bucket = cdb_fetch(CDB_FULLTEXT, &tok, sizeof(int));
00166                                    if (cdb_bucket != NULL) {
00167                                           ftc_num_msgs[tok] = cdb_bucket->len / sizeof(long);
00168                                           ftc_msgs[tok] = (long *)cdb_bucket->ptr;
00169                                           cdb_bucket->ptr = NULL;
00170                                           cdb_free(cdb_bucket);
00171                                    }
00172                                    else {
00173                                           ftc_num_msgs[tok] = 0;
00174                                           ftc_msgs[tok] = malloc(sizeof(long));
00175                                    }
00176                             }
00177        
00178        
00179                             if (op == 1) {       /* add to index */
00180                                    ++ftc_num_msgs[tok];
00181                                    ftc_msgs[tok] = realloc(ftc_msgs[tok],
00182                                                         ftc_num_msgs[tok]*sizeof(long));
00183                                    ftc_msgs[tok][ftc_num_msgs[tok] - 1] = msgnum;
00184                             }
00185        
00186                             if (op == 0) {       /* remove from index */
00187                                    if (ftc_num_msgs[tok] >= 1) {
00188                                           for (j=0; j<ftc_num_msgs[tok]; ++j) {
00189                                                  if (ftc_msgs[tok][j] == msgnum) {
00190                                                         memmove(&ftc_msgs[tok][j], &ftc_msgs[tok][j+1], ((ftc_num_msgs[tok] - j - 1)*sizeof(long)));
00191                                                         --ftc_num_msgs[tok];
00192                                                         --j;
00193                                                  }
00194                                           }
00195                                    }
00196                             }
00197                      }
00198                      else {
00199                             syslog(LOG_ALERT, "Invalid token %d !!", tok);
00200                      }
00201               }
00202 
00203               free(tokens);
00204        }
00205 }
00206 
00207 
00208 
00209 /*
00210  * Add a message to the list of those to be indexed.
00211  */
00212 void ft_index_msg(long msgnum, void *userdata) {
00213 
00214        if ((msgnum > CitControl.MMfulltext) && (msgnum <= ft_newhighest)) {
00215               ++ft_num_msgs;
00216               if (ft_num_msgs > ft_num_alloc) {
00217                      ft_num_alloc += 1024;
00218                      ft_newmsgs = realloc(ft_newmsgs,
00219                             (ft_num_alloc * sizeof(long)));
00220               }
00221               ft_newmsgs[ft_num_msgs - 1] = msgnum;
00222        }
00223 
00224 }
00225 
00226 /*
00227  * Scan a room for messages to index.
00228  */
00229 void ft_index_room(struct ctdlroom *qrbuf, void *data)
00230 {
00231        if (server_shutting_down)
00232               return;
00233               
00234        CtdlGetRoom(&CC->room, qrbuf->QRname);
00235        CtdlForEachMessage(MSGS_ALL, 0L, NULL, NULL, NULL, ft_index_msg, NULL);
00236 }
00237 
00238 
00239 /*
00240  * Begin the fulltext indexing process.
00241  */
00242 void do_fulltext_indexing(void) {
00243        int i;
00244        static time_t last_index = 0L;
00245        static time_t last_progress = 0L;
00246        time_t run_time = 0L;
00247        time_t end_time = 0L;
00248        static int is_running = 0;
00249        if (is_running) return;         /* Concurrency check - only one can run */
00250        is_running = 1;
00251        
00252        /*
00253         * Don't do this if the site doesn't have it enabled.
00254         */
00255        if (!config.c_enable_fulltext) {
00256               return;
00257        }
00258 
00259        /*
00260         * Make sure we don't run the indexer too frequently.
00261         * FIXME move the setting into config
00262         */
00263  
00264        if ( (time(NULL) - last_index) < 300L) {
00265               return;
00266        }
00267 
00268        /*
00269         * Check to see whether the fulltext index is up to date; if there
00270         * are no messages to index, don't waste any more time trying.
00271         */
00272        if ((CitControl.MMfulltext >= CitControl.MMhighest) && (CitControl.fulltext_wordbreaker == FT_WORDBREAKER_ID)) {
00273               return;              /* nothing to do! */
00274        }
00275        
00276        run_time = time(NULL);
00277        syslog(LOG_DEBUG, "do_fulltext_indexing() started (%ld)", run_time);
00278        
00279        /*
00280         * If we've switched wordbreaker modules, burn the index and start
00281         * over.
00282         */
00283        begin_critical_section(S_CONTROL);
00284        if (CitControl.fulltext_wordbreaker != FT_WORDBREAKER_ID) {
00285               syslog(LOG_DEBUG, "wb ver on disk = %d, code ver = %d",
00286                      CitControl.fulltext_wordbreaker, FT_WORDBREAKER_ID
00287               );
00288               syslog(LOG_INFO, "(re)initializing full text index");
00289               cdb_trunc(CDB_FULLTEXT);
00290               CitControl.MMfulltext = 0L;
00291               put_control();
00292        }
00293        end_critical_section(S_CONTROL);
00294 
00295        /*
00296         * Now go through each room and find messages to index.
00297         */
00298        ft_newhighest = CitControl.MMhighest;
00299        CtdlForEachRoom(ft_index_room, NULL);     /* load all msg pointers */
00300 
00301        if (ft_num_msgs > 0) {
00302               qsort(ft_newmsgs, ft_num_msgs, sizeof(long), longcmp);
00303               for (i=0; i<(ft_num_msgs-1); ++i) { /* purge dups */
00304                      if (ft_newmsgs[i] == ft_newmsgs[i+1]) {
00305                             memmove(&ft_newmsgs[i], &ft_newmsgs[i+1],
00306                                    ((ft_num_msgs - i - 1)*sizeof(long)));
00307                             --ft_num_msgs;
00308                             --i;
00309                      }
00310               }
00311 
00312               /* Here it is ... do each message! */
00313               for (i=0; i<ft_num_msgs; ++i) {
00314                      if (time(NULL) != last_progress) {
00315                             syslog(LOG_DEBUG,
00316                                    "Indexed %d of %d messages (%d%%)",
00317                                           i, ft_num_msgs,
00318                                           ((i*100) / ft_num_msgs)
00319                             );
00320                             last_progress = time(NULL);
00321                      }
00322                      ft_index_message(ft_newmsgs[i], 1);
00323 
00324                      /* Check to see if we need to quit early */
00325                      if (server_shutting_down) {
00326                             syslog(LOG_DEBUG, "Indexer quitting early");
00327                             ft_newhighest = ft_newmsgs[i];
00328                             break;
00329                      }
00330 
00331                      /* Check to see if we have to maybe flush to disk */
00332                      if (i >= FT_MAX_CACHE) {
00333                             syslog(LOG_DEBUG, "Time to flush.");
00334                             ft_newhighest = ft_newmsgs[i];
00335                             break;
00336                      }
00337 
00338               }
00339 
00340               free(ft_newmsgs);
00341               ft_num_msgs = 0;
00342               ft_num_alloc = 0;
00343               ft_newmsgs = NULL;
00344        }
00345        end_time = time(NULL);
00346 
00347        if (server_shutting_down) {
00348               is_running = 0;
00349               return;
00350        }
00351        
00352        syslog(LOG_DEBUG, "do_fulltext_indexing() duration (%ld)", end_time - run_time);
00353               
00354        /* Save our place so we don't have to do this again */
00355        ft_flush_cache();
00356        begin_critical_section(S_CONTROL);
00357        CitControl.MMfulltext = ft_newhighest;
00358        CitControl.fulltext_wordbreaker = FT_WORDBREAKER_ID;
00359        put_control();
00360        end_critical_section(S_CONTROL);
00361        last_index = time(NULL);
00362 
00363        syslog(LOG_DEBUG, "do_fulltext_indexing() finished");
00364        is_running = 0;
00365        return;
00366 }
00367 
00368 
00369 
00370 /*
00371  * API call to perform searches.
00372  * (This one does the "all of these words" search.)
00373  * Caller is responsible for freeing the message list.
00374  */
00375 void ft_search(int *fts_num_msgs, long **fts_msgs, const char *search_string) {
00376        int num_tokens = 0;
00377        int *tokens = NULL;
00378        int i, j;
00379        struct cdbdata *cdb_bucket;
00380        int num_all_msgs = 0;
00381        long *all_msgs = NULL;
00382        int num_ret_msgs = 0;
00383        int num_ret_alloc = 0;
00384        long *ret_msgs = NULL;
00385        int tok;
00386 
00387        wordbreaker(search_string, &num_tokens, &tokens);
00388        if (num_tokens > 0) {
00389               for (i=0; i<num_tokens; ++i) {
00390 
00391                      /* search for tokens[i] */
00392                      tok = tokens[i];
00393 
00394                      /* fetch the bucket, Liza */
00395                      if (ftc_msgs[tok] == NULL) {
00396                             cdb_bucket = cdb_fetch(CDB_FULLTEXT, &tok, sizeof(int));
00397                             if (cdb_bucket != NULL) {
00398                                    ftc_num_msgs[tok] = cdb_bucket->len / sizeof(long);
00399                                    ftc_msgs[tok] = (long *)cdb_bucket->ptr;
00400                                    cdb_bucket->ptr = NULL;
00401                                    cdb_free(cdb_bucket);
00402                             }
00403                             else {
00404                                    ftc_num_msgs[tok] = 0;
00405                                    ftc_msgs[tok] = malloc(sizeof(long));
00406                             }
00407                      }
00408 
00409                      num_all_msgs += ftc_num_msgs[tok];
00410                      if (num_all_msgs > 0) {
00411                             all_msgs = realloc(all_msgs, num_all_msgs*sizeof(long) );
00412                             memcpy(&all_msgs[num_all_msgs-ftc_num_msgs[tok]],
00413                                    ftc_msgs[tok], ftc_num_msgs[tok]*sizeof(long) );
00414                      }
00415 
00416               }
00417               free(tokens);
00418               if (all_msgs != NULL) {
00419                      qsort(all_msgs, num_all_msgs, sizeof(long), longcmp);
00420 
00421                      /*
00422                       * At this point, if a message appears num_tokens times in the
00423                       * list, then it contains all of the search tokens.
00424                       */
00425                      if (num_all_msgs >= num_tokens)
00426                             for (j=0; j<(num_all_msgs-num_tokens+1); ++j) {
00427                                    if (all_msgs[j] == all_msgs[j+num_tokens-1]) {
00428                                           
00429                                           ++num_ret_msgs;
00430                                           if (num_ret_msgs > num_ret_alloc) {
00431                                                  num_ret_alloc += 64;
00432                                                  ret_msgs = realloc(ret_msgs,
00433                                                                   (num_ret_alloc*sizeof(long)) );
00434                                           }
00435                                           ret_msgs[num_ret_msgs - 1] = all_msgs[j];
00436                                           
00437                                    }
00438                             }
00439                      free(all_msgs);
00440               }
00441        }
00442 
00443        *fts_num_msgs = num_ret_msgs;
00444        *fts_msgs = ret_msgs;
00445 }
00446 
00447 
00448 /*
00449  * This search command is for diagnostic purposes and may be removed or replaced.
00450  */
00451 void cmd_srch(char *argbuf) {
00452        int num_msgs = 0;
00453        long *msgs = NULL;
00454        int i;
00455        char search_string[256];
00456 
00457        if (CtdlAccessCheck(ac_logged_in)) return;
00458 
00459        if (!config.c_enable_fulltext) {
00460               cprintf("%d Full text index is not enabled on this server.\n",
00461                      ERROR + CMD_NOT_SUPPORTED);
00462               return;
00463        }
00464 
00465        extract_token(search_string, argbuf, 0, '|', sizeof search_string);
00466        ft_search(&num_msgs, &msgs, search_string);
00467 
00468        cprintf("%d %d msgs match all search words:\n",
00469               LISTING_FOLLOWS, num_msgs);
00470        if (num_msgs > 0) {
00471               for (i=0; i<num_msgs; ++i) {
00472                      cprintf("%ld\n", msgs[i]);
00473               }
00474        }
00475        if (msgs != NULL) free(msgs);
00476        cprintf("000\n");
00477 }
00478 
00479 /*
00480  * Zero out our index cache.
00481  */
00482 void initialize_ft_cache(void) {
00483        memset(ftc_num_msgs, 0, (65536 * sizeof(int)));
00484        memset(ftc_msgs, 0, (65536 * sizeof(long *)));
00485 }
00486 
00487 
00488 void ft_delete_remove(char *room, long msgnum)
00489 {
00490        if (room) return;
00491        
00492        /* Remove from fulltext index */
00493        if (config.c_enable_fulltext) {
00494               ft_index_message(msgnum, 0);
00495        }
00496 }
00497 
00498 /*****************************************************************************/
00499 
00500 CTDL_MODULE_INIT(fulltext)
00501 {
00502        if (!threading)
00503        {
00504               initialize_ft_cache();
00505               initialize_noise_words();
00506               CtdlRegisterProtoHook(cmd_srch, "SRCH", "Full text search");
00507               CtdlRegisterDeleteHook(ft_delete_remove);
00508               CtdlRegisterSearchFuncHook(ft_search, "fulltext");
00509               CtdlRegisterCleanupHook(noise_word_cleanup);
00510               CtdlRegisterSessionHook(do_fulltext_indexing, EVT_TIMER, PRIO_CLEANUP + 300);
00511        }
00512        /* return our module name for the log */
00513        return "fulltext";
00514 }