Back to index

moin  1.9.0~rc2
indexing.py
Go to the documentation of this file.
00001 # -*- coding: iso-8859-1 -*-
00002 """
00003     MoinMoin - xapian search engine indexing
00004 
00005     @copyright: 2006-2009 MoinMoin:ThomasWaldmann,
00006                 2006 MoinMoin:FranzPletz,
00007                 2009 MoinMoin:DmitrijsMilajevs
00008     @license: GNU GPL, see COPYING for details.
00009 """
00010 
00011 import os, re
00012 import xapian
00013 
00014 from MoinMoin import log
00015 logging = log.getLogger(__name__)
00016 
00017 from MoinMoin.support import xappy
00018 from MoinMoin.search.builtin import BaseIndex
00019 from MoinMoin.search.Xapian.tokenizer import WikiAnalyzer
00020 from MoinMoin.util import filesys
00021 
00022 from MoinMoin.Page import Page
00023 from MoinMoin import config, wikiutil
00024 
00025 
class Query(xapian.Query):
    """ Plain xapian query; adds nothing to xapian.Query. """
00028 
00029 
class UnicodeQuery(xapian.Query):
    """ Xapian query object which automatically encodes unicode strings """

    def __init__(self, *args, **kwargs):
        """
        Encode unicode terms, then initialize the underlying xapian query.

        Unicode positional arguments are encoded; for list/tuple arguments
        every unicode item is encoded. Anything else is handed on unchanged.

        @keyword encoding: specify the encoding manually (default: value of config.charset)
        """
        # 'encoding' is our own keyword, so take it out of kwargs before
        # they are handed on to the xapian constructor
        self.encoding = kwargs.pop('encoding', config.charset)

        nargs = []
        for term in args:
            if isinstance(term, unicode):
                term = term.encode(self.encoding)
            elif isinstance(term, (list, tuple)):
                encoded = []
                for item in term:
                    # only unicode needs encoding, everything else is kept as is
                    if isinstance(item, unicode):
                        item = item.encode(self.encoding)
                    encoded.append(item)
                term = encoded
            nargs.append(term)

        # call our real base class: Query (defined in this module) is a
        # sibling class, its unbound __init__ would reject this instance
        xapian.Query.__init__(self, *nargs, **kwargs)
00048 
00049 
class MoinSearchConnection(xappy.SearchConnection):
    """ Search connection with helpers that return every matching document at once. """

    def get_all_documents(self, query=None):
        """
        Return all the documents in the index (that match query, if given).

        @param query: the query to run; a false value selects all documents
        """
        # ask for as many hits as there are documents in the index
        limit = self.get_doccount()
        if not query:
            query = self.query_all()
        return self.search(query, 0, limit)

    def get_all_documents_with_fields(self, **fields):
        """
        Return all the documents in the index (that match the field=value kwargs given).

        The per-field queries are combined with OP_AND.
        """
        per_field = []
        for name, wanted in fields.iteritems():
            per_field.append(self.query_field(name, wanted))
        combined = self.query_composite(self.OP_AND, per_field)
        return self.get_all_documents(combined)
00068 
00069 
# module level alias for xappy's lock error, used by the except clauses below
XapianDatabaseLockError = xappy.XapianDatabaseLockError
00071 
class MoinIndexerConnection(xappy.IndexerConnection):
    """ Indexer connection that declares the field actions of the moin index. """

    def __init__(self, *args, **kwargs):
        """ Set up the xappy connection (all arguments are handed on), then declare the fields. """
        super(MoinIndexerConnection, self).__init__(*args, **kwargs)
        self._define_fields_actions()

    def _define_fields_actions(self):
        """ Register every (field, action) pair of the index, in a fixed order. """
        sortable = xappy.FieldActions.SORTABLE
        exact = xappy.FieldActions.INDEX_EXACT
        freetext = xappy.FieldActions.INDEX_FREETEXT
        store = xappy.FieldActions.STORE_CONTENT

        # (field name, action, extra keyword arguments for add_field_action)
        layout = [
            ('wikiname', exact, {}),
            ('wikiname', store, {}),
            ('pagename', exact, {}),
            ('pagename', store, {}),
            ('pagename', sortable, {}),
            ('attachment', exact, {}),
            ('attachment', store, {}),
            ('mtime', exact, {}),
            ('mtime', store, {}),
            ('revision', store, {}),
            ('revision', exact, {}),
            ('mimetype', exact, {}),
            ('mimetype', store, {}),
            ('title', freetext, {'weight': 100}),
            ('title', store, {}),
            ('content', freetext, {'spell': True}),
            ('domain', exact, {}),
            ('domain', store, {}),
            ('lang', exact, {}),
            ('lang', store, {}),
            ('stem_lang', exact, {}),
            ('author', exact, {}),
            ('linkto', exact, {}),
            ('linkto', store, {}),
            ('category', exact, {}),
            ('category', store, {}),
        ]
        for field, action, options in layout:
            self.add_field_action(field, action, **options)
00110 
00111 
class StemmedField(xappy.Field):
    """ Field whose value is rebuilt from the token pairs WikiAnalyzer yields for it. """

    def __init__(self, name, value, request):
        """
        @param name: the field name
        @param value: the text to tokenize
        @param request: current request (its cfg.language_default selects the analyzer language)
        """
        analyzer = WikiAnalyzer(request=request, language=request.cfg.language_default)
        pieces = []
        for word, stemmed in analyzer.tokenize(value):
            # strip() removes the leftover blank if one half of the pair is empty
            pieces.append(unicode('%s %s' % (word, stemmed)).strip())
        super(StemmedField, self).__init__(name, ' '.join(pieces))
00118 
00119 
00120 class XapianIndex(BaseIndex):
00121 
00122     def __init__(self, request, name='index'):
00123         super(XapianIndex, self).__init__(request)
00124         self.db = os.path.join(self.main_dir, name)
00125 
00126     def _main_dir(self):
00127         """ Get the directory of the xapian index """
00128         if self.request.cfg.xapian_index_dir:
00129             return os.path.join(self.request.cfg.xapian_index_dir,
00130                     self.request.cfg.siteid)
00131         else:
00132             return os.path.join(self.request.cfg.cache_dir, 'xapian')
00133 
00134     def exists(self):
00135         """ Check if index exists """
00136         return os.path.exists(self.db)
00137 
00138     def mtime(self):
00139         """ Modification time of the index """
00140         return os.path.getmtime(self.db)
00141 
00142     def touch(self):
00143         """ Touch the index """
00144         filesys.touch(self.db)
00145 
00146     def get_search_connection(self):
00147         return MoinSearchConnection(self.db)
00148 
00149     def get_indexer_connection(self):
00150         return MoinIndexerConnection(self.db)
00151 
00152     def _search(self, query, sort='weight', historysearch=0):
00153         """
00154         Perform the search using xapian
00155 
00156         @param query: the search query objects
00157         @param sort: the sorting of the results (default: 'weight')
00158         @param historysearch: whether to search in all page revisions (default: 0) TODO: use/implement this
00159         """
00160         while True:
00161             try:
00162                 searcher, timestamp = self.request.cfg.xapian_searchers.pop()
00163                 if timestamp != self.mtime():
00164                     searcher.close()
00165                 else:
00166                     break
00167             except IndexError:
00168                 searcher = self.get_search_connection()
00169                 timestamp = self.mtime()
00170                 break
00171 
00172         # Refresh connection, since it may be outdated.
00173         searcher.reopen()
00174         query = query.xapian_term(self.request, searcher)
00175 
00176         # Get maximum possible amount of hits from xappy, which is number of documents in the index.
00177         document_count = searcher.get_doccount()
00178 
00179         kw = {}
00180         if sort == 'page_name':
00181             kw['sortby'] = 'pagename'
00182 
00183         hits = searcher.search(query, 0, document_count, **kw)
00184 
00185         self.request.cfg.xapian_searchers.append((searcher, timestamp))
00186         return hits
00187 
00188     def do_queued_updates(self, amount=-1):
00189         """ Index <amount> entries from the indexer queue.
00190 
00191             @param amount: amount of queue entries to process (default: -1 == all)
00192         """
00193         try:
00194             request = self._indexingRequest(self.request)
00195             connection = self.get_indexer_connection()
00196             self.touch()
00197             try:
00198                 done_count = 0
00199                 while amount:
00200                     # trick: if amount starts from -1, it will never get 0
00201                     amount -= 1
00202                     try:
00203                         pagename, attachmentname, revno = self.update_queue.get()
00204                     except IndexError:
00205                         # queue empty
00206                         break
00207                     else:
00208                         logging.debug("got from indexer queue: %r %r %r" % (pagename, attachmentname, revno))
00209                         if not attachmentname:
00210                             if revno is None:
00211                                 # generic "index this page completely, with attachments" request
00212                                 self._index_page(request, connection, pagename, mode='update')
00213                             else:
00214                                 # "index this page revision" request
00215                                 self._index_page_rev(request, connection, pagename, revno, mode='update')
00216                         else:
00217                             # "index this attachment" request
00218                             self._index_attachment(request, connection, pagename, attachmentname, mode='update')
00219                         done_count += 1
00220             finally:
00221                 logging.debug("updated xapian index with %d queued updates" % done_count)
00222                 connection.close()
00223         except XapianDatabaseLockError:
00224             # another indexer has locked the index, we can retry it later...
00225             logging.debug("can't lock xapian index, not doing queued updates now")
00226 
00227     def _get_document(self, connection, doc_id, mtime, mode):
00228         do_index = False
00229 
00230         if mode == 'update':
00231             try:
00232                 doc = connection.get_document(doc_id)
00233                 docmtime = long(doc.data['mtime'][0])
00234             except KeyError:
00235                 do_index = True
00236             else:
00237                 do_index = mtime > docmtime
00238         elif mode == 'add':
00239             do_index = True
00240         else:
00241             raise ValueError("mode must be 'update' or 'add'")
00242 
00243         if do_index:
00244             document = xappy.UnprocessedDocument()
00245             document.id = doc_id
00246         else:
00247             document = None
00248         return document
00249 
00250     def _add_fields_to_document(self, request, document, fields=None, multivalued_fields=None):
00251 
00252         fields_to_stem = ['title', 'content']
00253 
00254         if fields is None:
00255             fields = {}
00256         if multivalued_fields is None:
00257             multivalued_fields = {}
00258 
00259         for field, value in fields.iteritems():
00260             document.fields.append(xappy.Field(field, value))
00261             if field in fields_to_stem:
00262                 document.fields.append(StemmedField(field, value, request))
00263 
00264         for field, values in multivalued_fields.iteritems():
00265             for value in values:
00266                 document.fields.append(xappy.Field(field, value))
00267 
00268     def _get_languages(self, page):
00269         """ Get language of a page and the language to stem it in
00270 
00271         @param page: the page instance
00272         """
00273         lang = None
00274         default_lang = page.request.cfg.language_default
00275 
00276         # if we should stem, we check if we have a stemmer for the language available
00277         if page.request.cfg.xapian_stemming:
00278             lang = page.pi['language']
00279             try:
00280                 xapian.Stem(lang)
00281                 # if there is no exception, lang is stemmable
00282                 return (lang, lang)
00283             except xapian.InvalidArgumentError:
00284                 # lang is not stemmable
00285                 pass
00286 
00287         if not lang:
00288             # no lang found at all.. fallback to default language
00289             lang = default_lang
00290 
00291         # return actual lang and lang to stem in
00292         return (lang, default_lang)
00293 
00294     def _get_categories(self, page):
00295         """ Get all categories the page belongs to through the old regular expression
00296 
00297         @param page: the page instance
00298         """
00299         body = page.get_raw_body()
00300 
00301         prev, next = (0, 1)
00302         pos = 0
00303         while next:
00304             if next != 1:
00305                 pos += next.end()
00306             prev, next = next, re.search(r'-----*\s*\r?\n', body[pos:])
00307 
00308         if not prev or prev == 1:
00309             return []
00310         # for CategoryFoo, group 'all' matched CategoryFoo, group 'key' matched just Foo
00311         return [m.group('all') for m in self.request.cfg.cache.page_category_regex.finditer(body[pos:])]
00312 
00313     def _get_domains(self, page):
00314         """ Returns a generator with all the domains the page belongs to
00315 
00316         @param page: page
00317         """
00318         if page.isUnderlayPage():
00319             yield 'underlay'
00320         if page.isStandardPage():
00321             yield 'standard'
00322         if wikiutil.isSystemPage(self.request, page.page_name):
00323             yield 'system'
00324 
00325     def _index_page(self, request, connection, pagename, mode='update'):
00326         """ Index a page.
00327 
00328         Index all revisions (if wanted by configuration) and all attachments.
00329 
00330         @param request: request suitable for indexing
00331         @param connection: the Indexer connection object
00332         @param pagename: a page name
00333         @param mode: 'add' = just add, no checks
00334                      'update' = check if already in index and update if needed (mtime)
00335         """
00336         page = Page(request, pagename)
00337         revlist = page.getRevList() # recent revs first, does not include deleted revs
00338         logging.debug("indexing page %r, %d revs found" % (pagename, len(revlist)))
00339 
00340         if not revlist:
00341             # we have an empty revision list, that means the page is not there any more,
00342             # likely it (== all of its revisions, all of its attachments) got either renamed or nuked
00343             wikiname = request.cfg.interwikiname or u'Self'
00344 
00345             sc = self.get_search_connection()
00346             docs_to_delete = sc.get_all_documents_with_fields(wikiname=wikiname, pagename=pagename)
00347                                                               # any page rev, any attachment
00348             sc.close()
00349 
00350             for doc in docs_to_delete:
00351                 connection.delete(doc.id)
00352             logging.debug('page %s (all revs, all attachments) removed from xapian index' % pagename)
00353 
00354         else:
00355             if request.cfg.xapian_index_history:
00356                 index_revs, remove_revs = revlist, []
00357             else:
00358                 if page.exists(): # is current rev not deleted?
00359                     index_revs, remove_revs = revlist[:1], revlist[1:]
00360                 else:
00361                     index_revs, remove_revs = [], revlist
00362 
00363             for revno in index_revs:
00364                 updated = self._index_page_rev(request, connection, pagename, revno, mode=mode)
00365                 logging.debug("updated page %r rev %d (updated==%r)" % (pagename, revno, updated))
00366                 if not updated:
00367                     # we reached the revisions that are already present in the index
00368                     break
00369 
00370             for revno in remove_revs:
00371                 # XXX remove_revs can be rather long for pages with many revs and
00372                 # XXX most page revs usually will be already deleted. optimize?
00373                 self._remove_page_rev(request, connection, pagename, revno)
00374                 logging.debug("removed page %r rev %d" % (pagename, revno))
00375 
00376             from MoinMoin.action import AttachFile
00377             for attachmentname in AttachFile._get_files(request, pagename):
00378                 self._index_attachment(request, connection, pagename, attachmentname, mode)
00379 
00380     def _index_page_rev(self, request, connection, pagename, revno, mode='update'):
00381         """ Index a page revision.
00382 
00383         @param request: request suitable for indexing
00384         @param connection: the Indexer connection object
00385         @param pagename: the page name
00386         @param revno: page revision number (int)
00387         @param mode: 'add' = just add, no checks
00388                      'update' = check if already in index and update if needed (mtime)
00389         """
00390         page = Page(request, pagename, rev=revno)
00391         request.page = page # XXX for what is this needed?
00392 
00393         wikiname = request.cfg.interwikiname or u"Self"
00394         revision = str(page.get_real_rev())
00395         itemid = "%s:%s:%s" % (wikiname, pagename, revision)
00396         mtime = page.mtime_usecs()
00397 
00398         doc = self._get_document(connection, itemid, mtime, mode)
00399         logging.debug("%s %s %r" % (pagename, revision, doc))
00400         if doc:
00401             mimetype = 'text/%s' % page.pi['format']  # XXX improve this
00402 
00403             fields = {}
00404             fields['wikiname'] = wikiname
00405             fields['pagename'] = pagename
00406             fields['attachment'] = '' # this is a real page, not an attachment
00407             fields['mtime'] = str(mtime)
00408             fields['revision'] = revision
00409             fields['title'] = pagename
00410             fields['content'] = page.get_raw_body()
00411             fields['lang'], fields['stem_lang'] = self._get_languages(page)
00412             fields['author'] = page.edit_info().get('editor', '?')
00413 
00414             multivalued_fields = {}
00415             multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
00416             multivalued_fields['domain'] = self._get_domains(page)
00417             multivalued_fields['linkto'] = page.getPageLinks(request)
00418             multivalued_fields['category'] = self._get_categories(page)
00419 
00420             self._add_fields_to_document(request, doc, fields, multivalued_fields)
00421 
00422             try:
00423                 connection.replace(doc)
00424             except xappy.IndexerError, err:
00425                 logging.warning("IndexerError at %r %r %r (%s)" % (
00426                     wikiname, pagename, revision, str(err)))
00427 
00428         return bool(doc)
00429 
00430     def _remove_page_rev(self, request, connection, pagename, revno):
00431         """ Remove a page revision from the index.
00432 
00433         @param request: request suitable for indexing
00434         @param connection: the Indexer connection object
00435         @param pagename: the page name
00436         @param revno: a real revision number (int), > 0
00437         """
00438         wikiname = request.cfg.interwikiname or u"Self"
00439         revision = str(revno)
00440         itemid = "%s:%s:%s" % (wikiname, pagename, revision)
00441         connection.delete(itemid)
00442         logging.debug('page %s, revision %d removed from index' % (pagename, revno))
00443 
00444     def _index_attachment(self, request, connection, pagename, attachmentname, mode='update'):
00445         """ Index an attachment
00446 
00447         @param request: request suitable for indexing
00448         @param connection: the Indexer connection object
00449         @param pagename: the page name
00450         @param attachmentname: the attachment's name
00451         @param mode: 'add' = just add, no checks
00452                      'update' = check if already in index and update if needed (mtime)
00453         """
00454         from MoinMoin.action import AttachFile
00455         wikiname = request.cfg.interwikiname or u"Self"
00456         itemid = "%s:%s//%s" % (wikiname, pagename, attachmentname)
00457 
00458         filename = AttachFile.getFilename(request, pagename, attachmentname)
00459         # check if the file is still there. as we might be doing queued index updates,
00460         # the file could be gone meanwhile...
00461         if os.path.exists(filename):
00462             mtime = wikiutil.timestamp2version(os.path.getmtime(filename))
00463             doc = self._get_document(connection, itemid, mtime, mode)
00464             logging.debug("%s %s %r" % (pagename, attachmentname, doc))
00465             if doc:
00466                 page = Page(request, pagename)
00467                 mimetype, att_content = self.contentfilter(filename)
00468 
00469                 fields = {}
00470                 fields['wikiname'] = wikiname
00471                 fields['pagename'] = pagename
00472                 fields['attachment'] = attachmentname
00473                 fields['mtime'] = str(mtime)
00474                 fields['revision'] = '0'
00475                 fields['title'] = '%s/%s' % (pagename, attachmentname)
00476                 fields['content'] = att_content
00477                 fields['lang'], fields['stem_lang'] = self._get_languages(page)
00478 
00479                 multivalued_fields = {}
00480                 multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
00481                 multivalued_fields['domain'] = self._get_domains(page)
00482 
00483                 self._add_fields_to_document(request, doc, fields, multivalued_fields)
00484 
00485                 connection.replace(doc)
00486                 logging.debug('attachment %s (page %s) updated in index' % (attachmentname, pagename))
00487         else:
00488             # attachment file was deleted, remove it from index also
00489             connection.delete(itemid)
00490             logging.debug('attachment %s (page %s) removed from index' % (attachmentname, pagename))
00491 
00492     def _index_file(self, request, connection, filename, mode='update'):
00493         """ index files (that are NOT attachments, just arbitrary files)
00494 
00495         @param request: request suitable for indexing
00496         @param connection: the Indexer connection object
00497         @param filename: a filesystem file name
00498         @param mode: 'add' = just add, no checks
00499                      'update' = check if already in index and update if needed (mtime)
00500         """
00501         wikiname = request.cfg.interwikiname or u"Self"
00502         fs_rootpage = 'FS' # XXX FS hardcoded
00503 
00504         try:
00505             itemid = "%s:%s" % (wikiname, os.path.join(fs_rootpage, filename))
00506             mtime = wikiutil.timestamp2version(os.path.getmtime(filename))
00507 
00508             doc = self._get_document(connection, itemid, mtime, mode)
00509             logging.debug("%s %r" % (filename, doc))
00510             if doc:
00511                 mimetype, file_content = self.contentfilter(filename)
00512 
00513                 fields = {}
00514                 fields['wikiname'] = wikiname
00515                 fields['pagename'] = fs_rootpage
00516                 fields['attachment'] = filename # XXX we should treat files like real pages, not attachments
00517                 fields['mtime'] = str(mtime)
00518                 fields['revision'] = '0'
00519                 fields['title'] = " ".join(os.path.join(fs_rootpage, filename).split("/"))
00520                 fields['content'] = file_content
00521 
00522                 multivalued_fields = {}
00523                 multivalued_fields['mimetype'] = [mt for mt in [mimetype] + mimetype.split('/')]
00524 
00525                 self._add_fields_to_document(request, doc, fields, multivalued_fields)
00526 
00527                 connection.replace(doc)
00528 
00529         except (OSError, IOError, UnicodeError):
00530             logging.exception("_index_file crashed:")
00531 
00532     def _index_pages(self, request, files=None, mode='update', pages=None):
00533         """ Index all (given) pages (and all given files)
00534 
00535         This should be called from indexPages only!
00536 
00537         @param request: request suitable for indexing
00538         @param files: an optional list of files to index
00539         @param mode: 'add' = just add, no checks
00540                      'update' = check if already in index and update if needed (mtime)
00541         @param pages: list of pages to index, if not given, all pages are indexed
00542         """
00543         if pages is None:
00544             # Index all pages
00545             pages = request.rootpage.getPageList(user='', exists=1)
00546 
00547         try:
00548             connection = self.get_indexer_connection()
00549             self.touch()
00550             try:
00551                 logging.info("indexing %d pages..." % len(pages))
00552                 for pagename in pages:
00553                     self._index_page(request, connection, pagename, mode=mode)
00554                 if files:
00555                     logging.info("indexing all files...")
00556                     for fname in files:
00557                         fname = fname.strip()
00558                         self._index_file(request, connection, fname, mode)
00559             finally:
00560                 connection.close()
00561         except XapianDatabaseLockError:
00562             logging.warning("xapian index is locked, can't index.")
00563