Back to index

citadel  8.12
serv_rssclient.c
Go to the documentation of this file.
00001 /*
00002  * Bring external RSS feeds into rooms.
00003  *
00004  * Copyright (c) 2007-2012 by the citadel.org team
00005  *
00006  * This program is open source software; you can redistribute it and/or modify
00007  * it under the terms of the GNU General Public License version 3.
00008  *
00009  * This program is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  * GNU General Public License for more details.
00013  */
00014 
00015 #include <stdlib.h>
00016 #include <unistd.h>
00017 #include <stdio.h>
00018 
00019 #if TIME_WITH_SYS_TIME
00020 # include <sys/time.h>
00021 # include <time.h>
00022 #else
00023 # if HAVE_SYS_TIME_H
00024 #include <sys/time.h>
00025 # else
00026 #include <time.h>
00027 # endif
00028 #endif
00029 
00030 #include <ctype.h>
00031 #include <string.h>
00032 #include <errno.h>
00033 #include <sys/types.h>
00034 #include <sys/stat.h>
00035 #include <expat.h>
00036 #include <curl/curl.h>
00037 #include <libcitadel.h>
00038 #include "citadel.h"
00039 #include "server.h"
00040 #include "citserver.h"
00041 #include "support.h"
00042 #include "config.h"
00043 #include "threads.h"
00044 #include "ctdl_module.h"
00045 #include "msgbase.h"
00046 #include "parsedate.h"
00047 #include "database.h"
00048 #include "citadel_dirs.h"
00049 #include "md5.h"
00050 #include "context.h"
00051 #include "event_client.h"
00052 #include "rss_atom_parser.h"
00053 
00054 
00055 #define TMP_MSGDATA 0xFF
00056 #define TMP_SHORTER_URL_OFFSET 0xFE
00057 #define TMP_SHORTER_URLS 0xFD
00058 
00059 time_t last_run = 0L;
00060 
00061 pthread_mutex_t RSSQueueMutex; /* locks the access to the following vars: */
00062 HashList *RSSQueueRooms = NULL; /* rss_room_counter */
00063 HashList *RSSFetchUrls = NULL; /*->rss_aggregator;->RefCount access locked*/
00064 
00065 eNextState RSSAggregator_Terminate(AsyncIO *IO);
00066 eNextState RSSAggregator_TerminateDB(AsyncIO *IO);
00067 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO);
00068 struct CitContext rss_CC;
00069 
00070 struct rssnetcfg *rnclist = NULL;
00071 int RSSClientDebugEnabled = 0;
00072 #define N ((rss_aggregator*)IO->Data)->QRnumber
00073 
00074 #define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
00075 
00076 #define EVRSSC_syslog(LEVEL, FORMAT, ...)                      \
00077        DBGLOG(LEVEL) syslog(LEVEL,                             \
00078                           "IO[%ld]CC[%d][%ld]RSS" FORMAT,             \
00079                           IO->ID, CCID, N, __VA_ARGS__)
00080 
00081 #define EVRSSCM_syslog(LEVEL, FORMAT)                                 \
00082        DBGLOG(LEVEL) syslog(LEVEL,                             \
00083                           "IO[%ld]CC[%d][%ld]RSS" FORMAT,             \
00084                           IO->ID, CCID, N)
00085 
00086 #define EVRSSQ_syslog(LEVEL, FORMAT, ...)                      \
00087        DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT,               \
00088                           __VA_ARGS__)
00089 #define EVRSSQM_syslog(LEVEL, FORMAT)                   \
00090        DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
00091 
00092 #define EVRSSCSM_syslog(LEVEL, FORMAT)                                \
00093        DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT,          \
00094                           IO->ID, N)
00095 
00096 void DeleteRoomReference(long QRnumber)
00097 {
00098        HashPos *At;
00099        long HKLen;
00100        const char *HK;
00101        void *vData = NULL;
00102        rss_room_counter *pRoomC;
00103 
00104        At = GetNewHashPos(RSSQueueRooms, 0);
00105 
00106        if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), At))
00107        {
00108               GetHashPos(RSSQueueRooms, At, &HKLen, &HK, &vData);
00109               if (vData != NULL)
00110               {
00111                      pRoomC = (rss_room_counter *) vData;
00112                      pRoomC->count --;
00113                      if (pRoomC->count == 0)
00114                             DeleteEntryFromHash(RSSQueueRooms, At);
00115               }
00116        }
00117        DeleteHashPos(&At);
00118 }
00119 
00120 void UnlinkRooms(rss_aggregator *RSSAggr)
00121 {
00122        DeleteRoomReference(RSSAggr->QRnumber);
00123        if (RSSAggr->OtherQRnumbers != NULL)
00124        {
00125               long HKLen;
00126               const char *HK;
00127               HashPos *At;
00128               void *vData;
00129 
00130               At = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
00131               while (! server_shutting_down &&
00132                      GetNextHashPos(RSSAggr->OtherQRnumbers,
00133                                   At,
00134                                   &HKLen, &HK,
00135                                   &vData) &&
00136                      (vData != NULL))
00137               {
00138                      long *lData = (long*) vData;
00139                      DeleteRoomReference(*lData);
00140               }
00141 
00142               DeleteHashPos(&At);
00143        }
00144 }
00145 
00146 void UnlinkRSSAggregator(rss_aggregator *RSSAggr)
00147 {
00148        HashPos *At;
00149 
00150        pthread_mutex_lock(&RSSQueueMutex);
00151        UnlinkRooms(RSSAggr);
00152 
00153        At = GetNewHashPos(RSSFetchUrls, 0);
00154        if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), At))
00155        {
00156               DeleteEntryFromHash(RSSFetchUrls, At);
00157        }
00158        DeleteHashPos(&At);
00159        last_run = time(NULL);
00160        pthread_mutex_unlock(&RSSQueueMutex);
00161 }
00162 
00163 void DeleteRssCfg(void *vptr)
00164 {
00165        rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
00166        AsyncIO *IO = &RSSAggr->IO;
00167        EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");
00168 
00169        FreeStrBuf(&RSSAggr->Url);
00170        FreeStrBuf(&RSSAggr->rooms);
00171        FreeStrBuf(&RSSAggr->CData);
00172        FreeStrBuf(&RSSAggr->Key);
00173        DeleteHash(&RSSAggr->OtherQRnumbers);
00174 
00175        DeleteHashPos (&RSSAggr->Pos);
00176        DeleteHash (&RSSAggr->Messages);
00177        if (RSSAggr->recp.recp_room != NULL)
00178               free(RSSAggr->recp.recp_room);
00179 
00180 
00181        if (RSSAggr->Item != NULL)
00182        {
00183               flush_rss_item(RSSAggr->Item);
00184 
00185               free(RSSAggr->Item);
00186        }
00187 
00188        FreeAsyncIOContents(&RSSAggr->IO);
00189        memset(RSSAggr, 0, sizeof(rss_aggregator));
00190        free(RSSAggr);
00191 }
00192 
00193 eNextState RSSAggregator_Terminate(AsyncIO *IO)
00194 {
00195        rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
00196 
00197        EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
00198 
00199        StopCurlWatchers(IO);
00200        UnlinkRSSAggregator(RSSAggr);
00201        return eAbort;
00202 }
00203 
00204 eNextState RSSAggregator_TerminateDB(AsyncIO *IO)
00205 {
00206        rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
00207 
00208        EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");
00209 
00210 
00211        StopDBWatchers(&RSSAggr->IO);
00212        UnlinkRSSAggregator(RSSAggr);
00213        return eAbort;
00214 }
00215 
00216 eNextState RSSAggregator_ShutdownAbort(AsyncIO *IO)
00217 {
00218        const char *pUrl;
00219        rss_aggregator *RSSAggr = (rss_aggregator *)IO->Data;
00220 
00221        pUrl = IO->ConnectMe->PlainUrl;
00222        if (pUrl == NULL)
00223               pUrl = "";
00224 
00225        EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", pUrl);
00226 
00227        StopCurlWatchers(IO);
00228        UnlinkRSSAggregator(RSSAggr);
00229        return eAbort;
00230 }
00231 
00232 eNextState RSSSaveMessage(AsyncIO *IO)
00233 {
00234        long len;
00235        const char *Key;
00236        rss_aggregator *RSSAggr = (rss_aggregator *) IO->Data;
00237 
00238        RSSAggr->ThisMsg->Msg.cm_fields['M'] =
00239               SmashStrBuf(&RSSAggr->ThisMsg->Message);
00240 
00241        CtdlSubmitMsg(&RSSAggr->ThisMsg->Msg, &RSSAggr->recp, NULL, 0);
00242 
00243        /* write the uidl to the use table so we don't store this item again */
00244        cdb_store(CDB_USETABLE,
00245                 SKEY(RSSAggr->ThisMsg->MsgGUID),
00246                 &RSSAggr->ThisMsg->ut,
00247                 sizeof(struct UseTable) );
00248 
00249        if (GetNextHashPos(RSSAggr->Messages,
00250                         RSSAggr->Pos,
00251                         &len, &Key,
00252                         (void**) &RSSAggr->ThisMsg))
00253               return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
00254        else
00255               return eAbort;
00256 }
00257 
00258 eNextState RSS_FetchNetworkUsetableEntry(AsyncIO *IO)
00259 {
00260        const char *Key;
00261        long len;
00262        struct cdbdata *cdbut;
00263        rss_aggregator *Ctx = (rss_aggregator *) IO->Data;
00264 
00265        /* Find out if we've already seen this item */
00266        strcpy(Ctx->ThisMsg->ut.ut_msgid,
00267               ChrPtr(Ctx->ThisMsg->MsgGUID)); 
00268        Ctx->ThisMsg->ut.ut_timestamp = time(NULL);
00269 
00270        cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
00271 #ifndef DEBUG_RSS
00272        if (cdbut != NULL) {
00273               /* Item has already been seen */
00274               EVRSSC_syslog(LOG_DEBUG,
00275                        "%s has already been seen\n",
00276                        ChrPtr(Ctx->ThisMsg->MsgGUID));
00277               cdb_free(cdbut);
00278 
00279               /* rewrite the record anyway, to update the timestamp */
00280               cdb_store(CDB_USETABLE,
00281                        SKEY(Ctx->ThisMsg->MsgGUID),
00282                        &Ctx->ThisMsg->ut, sizeof(struct UseTable) );
00283 
00284               if (GetNextHashPos(Ctx->Messages,
00285                                Ctx->Pos,
00286                                &len, &Key,
00287                                (void**) &Ctx->ThisMsg))
00288                      return NextDBOperation(
00289                             IO,
00290                             RSS_FetchNetworkUsetableEntry);
00291               else
00292                      return eAbort;
00293        }
00294        else
00295 #endif
00296        {
00297               NextDBOperation(IO, RSSSaveMessage);
00298               return eSendMore;
00299        }
00300 }
00301 
00302 /*
00303  * Begin a feed parse
00304  */
00305 int rss_do_fetching(rss_aggregator *RSSAggr)
00306 {
00307        AsyncIO              *IO = &RSSAggr->IO;
00308        rss_item *ri;
00309        time_t now;
00310 
00311        now = time(NULL);
00312 
00313        if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
00314               return 0;
00315 
00316        ri = (rss_item*) malloc(sizeof(rss_item));
00317        memset(ri, 0, sizeof(rss_item));
00318        RSSAggr->Item = ri;
00319 
00320        if (! InitcURLIOStruct(&RSSAggr->IO,
00321                             RSSAggr,
00322                             "Citadel RSS Client",
00323                             RSSAggregator_ParseReply,
00324                             RSSAggregator_Terminate,
00325                             RSSAggregator_TerminateDB,
00326                             RSSAggregator_ShutdownAbort))
00327        {
00328               EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
00329               return 0;
00330        }
00331 
00332        safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
00333                   ChrPtr(RSSAggr->Url),
00334                   sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));
00335 
00336        EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
00337        ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
00338        CurlPrepareURL(RSSAggr->IO.ConnectMe);
00339 
00340        QueueCurlContext(&RSSAggr->IO);
00341        return 1;
00342 }
00343 
00344 /*
00345  * Scan a room's netconfig to determine whether it is requesting any RSS feeds
00346  */
00347 void rssclient_scan_room(struct ctdlroom *qrbuf, void *data)
00348 {
00349        StrBuf *CfgData=NULL;
00350        StrBuf *CfgType;
00351        StrBuf *Line;
00352        rss_room_counter *Count = NULL;
00353        struct stat statbuf;
00354        char filename[PATH_MAX];
00355        int fd;
00356        int Done;
00357        rss_aggregator *RSSAggr = NULL;
00358        rss_aggregator *use_this_RSSAggr = NULL;
00359        void *vptr;
00360        const char *CfgPtr, *lPtr;
00361        const char *Err;
00362 
00363        pthread_mutex_lock(&RSSQueueMutex);
00364        if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
00365        {
00366               EVRSSQ_syslog(LOG_DEBUG,
00367                            "rssclient: [%ld] %s already in progress.\n",
00368                            qrbuf->QRnumber,
00369                            qrbuf->QRname);
00370               pthread_mutex_unlock(&RSSQueueMutex);
00371               return;
00372        }
00373        pthread_mutex_unlock(&RSSQueueMutex);
00374 
00375        assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);
00376 
00377        if (server_shutting_down)
00378               return;
00379 
00380        /* Only do net processing for rooms that have netconfigs */
00381        fd = open(filename, 0);
00382        if (fd <= 0) {
00383               /* syslog(LOG_DEBUG,
00384                  "rssclient: %s no config.\n",
00385                  qrbuf->QRname); */
00386               return;
00387        }
00388 
00389        if (server_shutting_down)
00390               return;
00391 
00392        if (fstat(fd, &statbuf) == -1) {
00393               EVRSSQ_syslog(LOG_DEBUG,
00394                            "ERROR: could not stat configfile '%s' - %s\n",
00395                            filename,
00396                            strerror(errno));
00397               return;
00398        }
00399 
00400        if (server_shutting_down)
00401               return;
00402 
00403        CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);
00404 
00405        if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
00406               close(fd);
00407               FreeStrBuf(&CfgData);
00408               EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s<br>\n",
00409                            filename, strerror(errno));
00410               return;
00411        }
00412        close(fd);
00413        if (server_shutting_down)
00414               return;
00415 
00416        CfgPtr = NULL;
00417        CfgType = NewStrBuf();
00418        Line = NewStrBufPlain(NULL, StrLength(CfgData));
00419        Done = 0;
00420        while (!Done)
00421        {
00422               Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
00423               if (StrLength(Line) > 0)
00424               {
00425                      lPtr = NULL;
00426                      StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
00427                      if (!strcasecmp("rssclient", ChrPtr(CfgType)))
00428                      {
00429                             if (Count == NULL)
00430                             {
00431                                    Count = malloc(
00432                                           sizeof(rss_room_counter));
00433                                    Count->count = 0;
00434                             }
00435                             Count->count ++;
00436                             RSSAggr = (rss_aggregator *) malloc(
00437                                    sizeof(rss_aggregator));
00438 
00439                             memset (RSSAggr, 0, sizeof(rss_aggregator));
00440                             RSSAggr->QRnumber = qrbuf->QRnumber;
00441                             RSSAggr->roomlist_parts = 1;
00442                             RSSAggr->Url = NewStrBuf();
00443 
00444                             StrBufExtract_NextToken(RSSAggr->Url,
00445                                                  Line,
00446                                                  &lPtr,
00447                                                  '|');
00448 
00449                             pthread_mutex_lock(&RSSQueueMutex);
00450                             GetHash(RSSFetchUrls,
00451                                    SKEY(RSSAggr->Url),
00452                                    &vptr);
00453 
00454                             use_this_RSSAggr = (rss_aggregator *)vptr;
00455                             if (use_this_RSSAggr != NULL)
00456                             {
00457                                    long *QRnumber;
00458                                    StrBufAppendBufPlain(
00459                                           use_this_RSSAggr->rooms,
00460                                           qrbuf->QRname,
00461                                           -1, 0);
00462                                    if (use_this_RSSAggr->roomlist_parts==1)
00463                                    {
00464                                           use_this_RSSAggr->OtherQRnumbers
00465                                                  = NewHash(1, lFlathash);
00466                                    }
00467                                    QRnumber = (long*)malloc(sizeof(long));
00468                                    *QRnumber = qrbuf->QRnumber;
00469                                    Put(use_this_RSSAggr->OtherQRnumbers,
00470                                        LKEY(qrbuf->QRnumber),
00471                                        QRnumber,
00472                                        NULL);
00473                                    use_this_RSSAggr->roomlist_parts++;
00474 
00475                                    pthread_mutex_unlock(&RSSQueueMutex);
00476 
00477                                    FreeStrBuf(&RSSAggr->Url);
00478                                    free(RSSAggr);
00479                                    RSSAggr = NULL;
00480                                    continue;
00481                             }
00482                             pthread_mutex_unlock(&RSSQueueMutex);
00483 
00484                             RSSAggr->ItemType = RSS_UNSET;
00485 
00486                             RSSAggr->rooms = NewStrBufPlain(
00487                                    qrbuf->QRname, -1);
00488 
00489                             pthread_mutex_lock(&RSSQueueMutex);
00490 
00491                             Put(RSSFetchUrls,
00492                                 SKEY(RSSAggr->Url),
00493                                 RSSAggr,
00494                                 DeleteRssCfg);
00495 
00496                             pthread_mutex_unlock(&RSSQueueMutex);
00497                      }
00498               }
00499        }
00500        if (Count != NULL)
00501        {
00502               Count->QRnumber = qrbuf->QRnumber;
00503               pthread_mutex_lock(&RSSQueueMutex);
00504               EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n",
00505                            qrbuf->QRnumber, qrbuf->QRname);
00506               Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
00507               pthread_mutex_unlock(&RSSQueueMutex);
00508        }
00509        FreeStrBuf(&CfgData);
00510        FreeStrBuf(&CfgType);
00511        FreeStrBuf(&Line);
00512 }
00513 
00514 /*
00515  * Scan for rooms that have RSS client requests configured
00516  */
00517 void rssclient_scan(void) {
00518        int RSSRoomCount, RSSCount;
00519        rss_aggregator *rptr = NULL;
00520        void *vrptr = NULL;
00521        HashPos *it;
00522        long len;
00523        const char *Key;
00524        time_t now = time(NULL);
00525 
00526        /* Run no more than once every 15 minutes. */
00527        if ((now - last_run) < 900) {
00528               EVRSSQ_syslog(LOG_DEBUG,
00529                            "Client: polling interval not yet reached; last run was %ldm%lds ago",
00530                            ((now - last_run) / 60),
00531                            ((now - last_run) % 60)
00532               );
00533               return;
00534        }
00535 
00536        /*
00537         * This is a simple concurrency check to make sure only one rssclient
00538         * run is done at a time.
00539         */
00540        pthread_mutex_lock(&RSSQueueMutex);
00541        RSSCount = GetCount(RSSFetchUrls);
00542        RSSRoomCount = GetCount(RSSQueueRooms);
00543        pthread_mutex_unlock(&RSSQueueMutex);
00544 
00545        if ((RSSRoomCount > 0) || (RSSCount > 0)) {
00546               EVRSSQ_syslog(LOG_DEBUG,
00547                            "rssclient: concurrency check failed; %d rooms and %d url's are queued",
00548                            RSSRoomCount, RSSCount
00549                      );
00550               return;
00551        }
00552 
00553        become_session(&rss_CC);
00554        EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
00555        CtdlForEachRoom(rssclient_scan_room, NULL);
00556 
00557        pthread_mutex_lock(&RSSQueueMutex);
00558 
00559        it = GetNewHashPos(RSSFetchUrls, 0);
00560        while (!server_shutting_down &&
00561               GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
00562               (vrptr != NULL)) {
00563               rptr = (rss_aggregator *)vrptr;
00564               if (!rss_do_fetching(rptr))
00565                      UnlinkRSSAggregator(rptr);
00566        }
00567        DeleteHashPos(&it);
00568        pthread_mutex_unlock(&RSSQueueMutex);
00569 
00570        EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
00571        return;
00572 }
00573 
00574 void rss_cleanup(void)
00575 {
00576        /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
00577        DeleteHash(&RSSFetchUrls);
00578        DeleteHash(&RSSQueueRooms);
00579 }
00580 
00581 void LogDebugEnableRSSClient(const int n)
00582 {
00583        RSSClientDebugEnabled = n;
00584 }
00585 
00586 CTDL_MODULE_INIT(rssclient)
00587 {
00588        if (threading)
00589        {
00590               CtdlFillSystemContext(&rss_CC, "rssclient");
00591               pthread_mutex_init(&RSSQueueMutex, NULL);
00592               RSSQueueRooms = NewHash(1, lFlathash);
00593               RSSFetchUrls = NewHash(1, NULL);
00594               syslog(LOG_INFO, "%s\n", curl_version());
00595               CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
00596               CtdlRegisterEVCleanupHook(rss_cleanup);
00597               CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);
00598        }
00599        return "rssclient";
00600 }