Back to index

citadel  8.12
Defines | Functions | Variables
serv_rssclient.c File Reference
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <time.h>
#include <ctype.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <expat.h>
#include <curl/curl.h>
#include <libcitadel.h>
#include "citadel.h"
#include "server.h"
#include "citserver.h"
#include "support.h"
#include "config.h"
#include "threads.h"
#include "ctdl_module.h"
#include "msgbase.h"
#include "parsedate.h"
#include "database.h"
#include "citadel_dirs.h"
#include "md5.h"
#include "context.h"
#include "event_client.h"
#include "rss_atom_parser.h"

Go to the source code of this file.

Defines

#define TMP_MSGDATA   0xFF
#define TMP_SHORTER_URL_OFFSET   0xFE
#define TMP_SHORTER_URLS   0xFD
#define N   ((rss_aggregator*)IO->Data)->QRnumber
#define DBGLOG(LEVEL)   if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))
#define EVRSSC_syslog(LEVEL, FORMAT,...)
#define EVRSSCM_syslog(LEVEL, FORMAT)
#define EVRSSQ_syslog(LEVEL, FORMAT,...)
#define EVRSSQM_syslog(LEVEL, FORMAT)   DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)
#define EVRSSCSM_syslog(LEVEL, FORMAT)

Functions

eNextState RSSAggregator_Terminate (AsyncIO *IO)
eNextState RSSAggregator_TerminateDB (AsyncIO *IO)
eNextState RSSAggregator_ShutdownAbort (AsyncIO *IO)
void DeleteRoomReference (long QRnumber)
void UnlinkRooms (rss_aggregator *RSSAggr)
void UnlinkRSSAggregator (rss_aggregator *RSSAggr)
void DeleteRssCfg (void *vptr)
eNextState RSSSaveMessage (AsyncIO *IO)
eNextState RSS_FetchNetworkUsetableEntry (AsyncIO *IO)
int rss_do_fetching (rss_aggregator *RSSAggr)
void rssclient_scan_room (struct ctdlroom *qrbuf, void *data)
void rssclient_scan (void)
void rss_cleanup (void)
void LogDebugEnableRSSClient (const int n)
 CTDL_MODULE_INIT (rssclient)

Variables

time_t last_run = 0L
pthread_mutex_t RSSQueueMutex
HashList * RSSQueueRooms = NULL
HashList * RSSFetchUrls = NULL
struct rssnetcfg * rnclist = NULL
int RSSClientDebugEnabled = 0

Define Documentation

#define DBGLOG (   LEVEL)    if ((LEVEL != LOG_DEBUG) || (RSSClientDebugEnabled != 0))

Definition at line 74 of file serv_rssclient.c.

#define EVRSSC_syslog (   LEVEL,
  FORMAT,
  ... 
)
Value:
DBGLOG(LEVEL) syslog(LEVEL,                             \
                          "IO[%ld]CC[%d][%ld]RSS" FORMAT,             \
                          IO->ID, CCID, N, __VA_ARGS__)

Definition at line 76 of file serv_rssclient.c.

#define EVRSSCM_syslog (   LEVEL,
  FORMAT 
)
Value:
DBGLOG(LEVEL) syslog(LEVEL,                             \
                          "IO[%ld]CC[%d][%ld]RSS" FORMAT,             \
                          IO->ID, CCID, N)

Definition at line 81 of file serv_rssclient.c.

#define EVRSSCSM_syslog (   LEVEL,
  FORMAT 
)
Value:
DBGLOG(LEVEL) syslog(LEVEL, "IO[%ld][%ld]RSS" FORMAT,          \
                          IO->ID, N)

Definition at line 92 of file serv_rssclient.c.

#define EVRSSQ_syslog (   LEVEL,
  FORMAT,
  ... 
)
Value:
DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT,               \
                          __VA_ARGS__)

Definition at line 86 of file serv_rssclient.c.

#define EVRSSQM_syslog (   LEVEL,
  FORMAT 
)    DBGLOG(LEVEL) syslog(LEVEL, "RSS" FORMAT)

Definition at line 89 of file serv_rssclient.c.

#define N   ((rss_aggregator*)IO->Data)->QRnumber

Definition at line 72 of file serv_rssclient.c.

#define TMP_MSGDATA   0xFF

Definition at line 55 of file serv_rssclient.c.

#define TMP_SHORTER_URL_OFFSET   0xFE

Definition at line 56 of file serv_rssclient.c.

#define TMP_SHORTER_URLS   0xFD

Definition at line 57 of file serv_rssclient.c.


Function Documentation

CTDL_MODULE_INIT ( rssclient  )

Definition at line 586 of file serv_rssclient.c.

{
       /*
        * Module entry point.  The module name is returned in every case;
        * the one-time setup below is only performed when `threading` is set.
        */
       if (!threading)
              return "rssclient";

       /* Private session context, queue lock and the two bookkeeping hashes. */
       CtdlFillSystemContext(&rss_CC, "rssclient");
       pthread_mutex_init(&RSSQueueMutex, NULL);
       RSSQueueRooms = NewHash(1, lFlathash);
       RSSFetchUrls = NewHash(1, NULL);

       /* Record which libcurl build we are running against. */
       syslog(LOG_INFO, "%s\n", curl_version());

       /* Timer-driven scan, cleanup on shutdown, runtime debug switch. */
       CtdlRegisterSessionHook(rssclient_scan, EVT_TIMER, PRIO_AGGR + 300);
       CtdlRegisterEVCleanupHook(rss_cleanup);
       CtdlRegisterDebugFlagHook(HKEY("rssclient"), LogDebugEnableRSSClient, &RSSClientDebugEnabled);

       return "rssclient";
}

Here is the call graph for this function:

void DeleteRoomReference ( long  QRnumber)

Definition at line 96 of file serv_rssclient.c.

{
       /*
        * Drop one reference to room QRnumber in RSSQueueRooms; when the
        * per-room counter reaches zero the entry itself is removed.
        * Locking of RSSQueueMutex is left to the caller.
        */
       HashPos *Pos = GetNewHashPos(RSSQueueRooms, 0);

       if (GetHashPosFromKey(RSSQueueRooms, LKEY(QRnumber), Pos))
       {
              long KeyLen;
              const char *KeyStr;
              void *vEntry = NULL;
              rss_room_counter *Counter;

              GetHashPos(RSSQueueRooms, Pos, &KeyLen, &KeyStr, &vEntry);
              Counter = (rss_room_counter *) vEntry;
              if (Counter != NULL)
              {
                     Counter->count--;
                     if (Counter->count == 0)
                            DeleteEntryFromHash(RSSQueueRooms, Pos);
              }
       }

       /* The iterator is released on every path. */
       DeleteHashPos(&Pos);
}

Here is the caller graph for this function:

void DeleteRssCfg ( void *  vptr)

Definition at line 163 of file serv_rssclient.c.

{
       /*
        * Destructor for an rss_aggregator (registered as the delete callback
        * when the aggregator is Put() into RSSFetchUrls).  Releases every
        * member the aggregator owns and finally the aggregator itself.
        */
       rss_aggregator *RSSAggr = (rss_aggregator *)vptr;
       AsyncIO *IO = &RSSAggr->IO;   /* the logging macro refers to `IO` by name */

       EVRSSCM_syslog(LOG_DEBUG, "RSS: destroying\n");

       /* String buffers. */
       FreeStrBuf(&RSSAggr->Url);
       FreeStrBuf(&RSSAggr->rooms);
       FreeStrBuf(&RSSAggr->CData);
       FreeStrBuf(&RSSAggr->Key);

       /* Hash containers; the iterator goes before the hash it walks. */
       DeleteHash(&RSSAggr->OtherQRnumbers);
       DeleteHashPos(&RSSAggr->Pos);
       DeleteHash(&RSSAggr->Messages);

       /* free(NULL) is a no-op, so no guard is needed here. */
       free(RSSAggr->recp.recp_room);

       if (RSSAggr->Item != NULL)
       {
              flush_rss_item(RSSAggr->Item);
              free(RSSAggr->Item);
       }

       FreeAsyncIOContents(IO);

       /* Scrub the structure before handing the memory back. */
       memset(RSSAggr, 0, sizeof(*RSSAggr));
       free(RSSAggr);
}

Here is the call graph for this function:

Here is the caller graph for this function:

void LogDebugEnableRSSClient ( const int  n)

Definition at line 581 of file serv_rssclient.c.

Here is the caller graph for this function:

void rss_cleanup ( void  )

Definition at line 574 of file serv_rssclient.c.

{
       /*
        * Cleanup hook (registered through CtdlRegisterEVCleanupHook in the
        * module init): releases the two module-level hash lists.
        */
       /* citthread_mutex_destroy(&RSSQueueMutex); TODO */
       /* NOTE(review): RSSQueueMutex is never destroyed; see the TODO above. */
       DeleteHash(&RSSFetchUrls);
       DeleteHash(&RSSQueueRooms);
}

Here is the caller graph for this function:

int rss_do_fetching ( rss_aggregator *  RSSAggr)

Definition at line 305 of file serv_rssclient.c.

{
       /*
        * Start an asynchronous fetch of the feed described by RSSAggr.
        *
        * Returns 1 when the request has been queued with the curl event
        * loop, 0 when nothing was started (poll time not yet reached, out
        * of memory, or libcurl could not be initialised).  On a 0 return
        * the caller is expected to unlink the aggregator.
        */
       AsyncIO              *IO = &RSSAggr->IO;
       rss_item *ri;
       time_t now;

       now = time(NULL);

       if ((RSSAggr->next_poll != 0) && (now < RSSAggr->next_poll))
              return 0;

       /*
        * Zero-initialised work item.  The allocation used to be passed to
        * memset() unchecked, which dereferences NULL when memory is short.
        */
       ri = calloc(1, sizeof *ri);
       if (ri == NULL)
       {
              /* Queue-level macro: it does not touch the not-yet-initialised IO. */
              EVRSSQM_syslog(LOG_ALERT, "client: out of memory, feed not fetched.\n");
              return 0;
       }
       RSSAggr->Item = ri;

       if (! InitcURLIOStruct(&RSSAggr->IO,
                            RSSAggr,
                            "Citadel RSS Client",
                            RSSAggregator_ParseReply,
                            RSSAggregator_Terminate,
                            RSSAggregator_TerminateDB,
                            RSSAggregator_ShutdownAbort))
       {
              /* RSSAggr->Item stays attached; DeleteRssCfg() releases it. */
              EVRSSCM_syslog(LOG_ALERT, "Unable to initialize libcurl.\n");
              return 0;
       }

       /* Show the feed URL as the "host" of this pseudo session. */
       safestrncpy(((CitContext*)RSSAggr->IO.CitContext)->cs_host,
                  ChrPtr(RSSAggr->Url),
                  sizeof(((CitContext*)RSSAggr->IO.CitContext)->cs_host));

       EVRSSC_syslog(LOG_DEBUG, "Fetching RSS feed <%s>\n", ChrPtr(RSSAggr->Url));
       ParseURL(&RSSAggr->IO.ConnectMe, RSSAggr->Url, 80);
       CurlPrepareURL(RSSAggr->IO.ConnectMe);

       QueueCurlContext(&RSSAggr->IO);
       return 1;
}

Here is the call graph for this function:

Here is the caller graph for this function:

eNextState RSS_FetchNetworkUsetableEntry ( AsyncIO *  IO)

Definition at line 258 of file serv_rssclient.c.

{
       /*
        * DB-thread step: look up the current item's GUID in the use table.
        * A GUID that is already recorded only gets its timestamp refreshed
        * and the iteration moves on to the next message; an unknown GUID is
        * handed to RSSSaveMessage.
        *
        * Returns the next state for the event loop: the result of
        * NextDBOperation() when another message is pending, eAbort when the
        * message list is exhausted, eSendMore after scheduling the save.
        */
       const char *Key;
       long len;
       struct cdbdata *cdbut;
       rss_aggregator *Ctx = (rss_aggregator *) IO->Data;

       /*
        * Find out if we've already seen this item.
        * The GUID comes from the remote feed, so the copy into the fixed
        * use-table record must be bounded (this was an unchecked strcpy).
        */
       safestrncpy(Ctx->ThisMsg->ut.ut_msgid,
                   ChrPtr(Ctx->ThisMsg->MsgGUID),
                   sizeof(Ctx->ThisMsg->ut.ut_msgid));
       Ctx->ThisMsg->ut.ut_timestamp = time(NULL);

       cdbut = cdb_fetch(CDB_USETABLE, SKEY(Ctx->ThisMsg->MsgGUID));
#ifndef DEBUG_RSS
       if (cdbut != NULL) {
              /* Item has already been seen */
              EVRSSC_syslog(LOG_DEBUG,
                       "%s has already been seen\n",
                       ChrPtr(Ctx->ThisMsg->MsgGUID));
              cdb_free(cdbut);

              /* rewrite the record anyway, to update the timestamp */
              cdb_store(CDB_USETABLE,
                       SKEY(Ctx->ThisMsg->MsgGUID),
                       &Ctx->ThisMsg->ut, sizeof(struct UseTable) );

              if (GetNextHashPos(Ctx->Messages,
                               Ctx->Pos,
                               &len, &Key,
                               (void**) &Ctx->ThisMsg))
                     return NextDBOperation(
                            IO,
                            RSS_FetchNetworkUsetableEntry);
              else
                     return eAbort;
       }
       else
#endif
       {
              /*
               * Only reachable with a non-NULL record when DEBUG_RSS is
               * defined; release it so debug builds do not leak it.
               */
              if (cdbut != NULL)
                     cdb_free(cdbut);

              NextDBOperation(IO, RSSSaveMessage);
              return eSendMore;
       }
}

Here is the call graph for this function:

Here is the caller graph for this function:

eNextState RSSAggregator_ShutdownAbort ( AsyncIO *  IO)

Definition at line 216 of file serv_rssclient.c.

{
       /*
        * Shutdown handler for a running fetch: logs the URL being dropped,
        * stops the curl watchers and removes the aggregator from the queue.
        * Always returns eAbort.
        */
       rss_aggregator *Aggr = (rss_aggregator *)IO->Data;
       const char *Url = IO->ConnectMe->PlainUrl;

       /* Log an empty string rather than passing NULL to %s. */
       EVRSSC_syslog(LOG_DEBUG, "RSS: Aborting by shutdown: %s.\n", (Url != NULL) ? Url : "");

       StopCurlWatchers(IO);
       UnlinkRSSAggregator(Aggr);
       return eAbort;
}

Here is the call graph for this function:

Here is the caller graph for this function:

eNextState RSSAggregator_Terminate ( AsyncIO *  IO)

Definition at line 193 of file serv_rssclient.c.

{
       /*
        * Termination handler on the curl side: stop the curl watchers for
        * this IO, then unlink its aggregator.  Always returns eAbort.
        */
       rss_aggregator *Aggr;

       Aggr = (rss_aggregator *)IO->Data;
       EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");

       StopCurlWatchers(IO);
       UnlinkRSSAggregator(Aggr);

       return eAbort;
}

Here is the call graph for this function:

Here is the caller graph for this function:

eNextState RSSAggregator_TerminateDB ( AsyncIO *  IO)

Definition at line 204 of file serv_rssclient.c.

{
       /*
        * Termination handler on the database side: stop the DB watchers of
        * the aggregator's own IO, then unlink the aggregator.
        * Always returns eAbort.
        */
       rss_aggregator *Aggr;

       Aggr = (rss_aggregator *)IO->Data;
       EVRSSCM_syslog(LOG_DEBUG, "RSS: Terminating.\n");

       StopDBWatchers(&Aggr->IO);
       UnlinkRSSAggregator(Aggr);

       return eAbort;
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rssclient_scan ( void  )

Definition at line 517 of file serv_rssclient.c.

                          {
       /*
        * Timer hook: at most once every 15 minutes, and only while no
        * earlier run is still in flight, collect the RSS feeds configured
        * for all rooms and start fetching each of them.
        */
       int RSSRoomCount, RSSCount;
       rss_aggregator *rptr = NULL;
       rss_aggregator **Failed = NULL;   /* aggregators whose fetch did not start */
       long nFailed = 0;
       long nSlots;
       long i;
       void *vrptr = NULL;
       HashPos *it;
       long len;
       const char *Key;
       time_t now = time(NULL);

       /* Run no more than once every 15 minutes. */
       if ((now - last_run) < 900) {
              /* time_t is not necessarily long; cast to match %ld. */
              EVRSSQ_syslog(LOG_DEBUG,
                           "Client: polling interval not yet reached; last run was %ldm%lds ago",
                           (long)((now - last_run) / 60),
                           (long)((now - last_run) % 60)
              );
              return;
       }

       /*
        * This is a simple concurrency check to make sure only one rssclient
        * run is done at a time.
        */
       pthread_mutex_lock(&RSSQueueMutex);
       RSSCount = GetCount(RSSFetchUrls);
       RSSRoomCount = GetCount(RSSQueueRooms);
       pthread_mutex_unlock(&RSSQueueMutex);

       if ((RSSRoomCount > 0) || (RSSCount > 0)) {
              EVRSSQ_syslog(LOG_DEBUG,
                           "rssclient: concurrency check failed; %d rooms and %d url's are queued",
                           RSSRoomCount, RSSCount
                     );
              return;
       }

       become_session(&rss_CC);
       EVRSSQM_syslog(LOG_DEBUG, "rssclient started\n");
       CtdlForEachRoom(rssclient_scan_room, NULL);

       pthread_mutex_lock(&RSSQueueMutex);

       /*
        * UnlinkRSSAggregator() takes RSSQueueMutex itself and deletes from
        * RSSFetchUrls.  Calling it from inside this loop would re-lock a
        * mutex this thread already holds (it is created with default
        * attributes, so that deadlocks) and would modify the hash under the
        * iterator.  Failed aggregators are therefore only remembered here
        * and unlinked once the lock has been dropped.
        */
       nSlots = GetCount(RSSFetchUrls);
       if (nSlots > 0) {
              Failed = malloc((size_t)nSlots * sizeof *Failed);
              if (Failed == NULL)
                     EVRSSQM_syslog(LOG_ERR,
                                    "client: out of memory; failed feeds stay queued\n");
       }

       it = GetNewHashPos(RSSFetchUrls, 0);
       while (!server_shutting_down &&
              GetNextHashPos(RSSFetchUrls, it, &len, &Key, &vrptr) &&
              (vrptr != NULL)) {
              rptr = (rss_aggregator *)vrptr;
              if (!rss_do_fetching(rptr) &&
                  (Failed != NULL) &&
                  (nFailed < nSlots))
                     Failed[nFailed++] = rptr;
       }
       DeleteHashPos(&it);
       pthread_mutex_unlock(&RSSQueueMutex);

       /* Lock released: now it is safe to drop what could not be started. */
       for (i = 0; i < nFailed; i++)
              UnlinkRSSAggregator(Failed[i]);
       free(Failed);

       EVRSSQM_syslog(LOG_DEBUG, "rssclient ended\n");
       return;
}

Here is the call graph for this function:

Here is the caller graph for this function:

void rssclient_scan_room ( struct ctdlroom *  qrbuf,
void *  data 
)

Definition at line 347 of file serv_rssclient.c.

{
       /*
        * Per-room callback of the scan: reads the room's netconfig file and
        * queues one rss_aggregator per distinct feed URL found on its
        * "rssclient|<url>" lines.  A URL that is already queued for another
        * room only gets this room attached to the existing aggregator.
        *
        * qrbuf - room being scanned
        * data  - unused
        *
        * Every exit taken after the config file has been opened goes
        * through the cleanup label, so neither the descriptor nor the
        * string buffers can be left behind.
        */
       StrBuf *CfgData = NULL;
       StrBuf *CfgType = NULL;
       StrBuf *Line = NULL;
       rss_room_counter *Count = NULL;
       struct stat statbuf;
       char filename[PATH_MAX];
       int fd = -1;
       int Done;
       rss_aggregator *RSSAggr = NULL;
       rss_aggregator *use_this_RSSAggr = NULL;
       void *vptr = NULL;
       const char *CfgPtr, *lPtr;
       const char *Err;

       pthread_mutex_lock(&RSSQueueMutex);
       if (GetHash(RSSQueueRooms, LKEY(qrbuf->QRnumber), &vptr))
       {
              EVRSSQ_syslog(LOG_DEBUG,
                           "rssclient: [%ld] %s already in progress.\n",
                           qrbuf->QRnumber,
                           qrbuf->QRname);
              pthread_mutex_unlock(&RSSQueueMutex);
              return;
       }
       pthread_mutex_unlock(&RSSQueueMutex);

       assoc_file_name(filename, sizeof filename, qrbuf, ctdl_netcfg_dir);

       if (server_shutting_down)
              return;

       /* Only do net processing for rooms that have netconfigs */
       fd = open(filename, 0);
       if (fd < 0) {
              /* 0 is a valid descriptor; only a negative value means failure. */
              return;
       }

       if (server_shutting_down)
              goto cleanup;

       if (fstat(fd, &statbuf) == -1) {
              EVRSSQ_syslog(LOG_DEBUG,
                           "ERROR: could not stat configfile '%s' - %s\n",
                           filename,
                           strerror(errno));
              goto cleanup;
       }

       if (server_shutting_down)
              goto cleanup;

       CfgData = NewStrBufPlain(NULL, statbuf.st_size + 1);

       if (StrBufReadBLOB(CfgData, &fd, 1, statbuf.st_size, &Err) < 0) {
              /* Log before cleaning up so close() cannot clobber errno. */
              EVRSSQ_syslog(LOG_ERR, "ERROR: reading config '%s' - %s<br>\n",
                           filename, strerror(errno));
              goto cleanup;
       }
       close(fd);
       fd = -1;
       if (server_shutting_down)
              goto cleanup;

       CfgPtr = NULL;
       CfgType = NewStrBuf();
       Line = NewStrBufPlain(NULL, StrLength(CfgData));
       Done = 0;
       while (!Done)
       {
              Done = StrBufSipLine(Line, CfgData, &CfgPtr) == 0;
              if (StrLength(Line) <= 0)
                     continue;

              lPtr = NULL;
              StrBufExtract_NextToken(CfgType, Line, &lPtr, '|');
              if (strcasecmp("rssclient", ChrPtr(CfgType)) != 0)
                     continue;

              if (Count == NULL)
              {
                     Count = malloc(sizeof *Count);
                     if (Count == NULL)
                     {
                            EVRSSQM_syslog(LOG_ERR,
                                           "client: out of memory while scanning room\n");
                            break;
                     }
                     Count->count = 0;
              }

              /* calloc() gives the zeroed aggregator malloc()+memset() did. */
              RSSAggr = calloc(1, sizeof *RSSAggr);
              if (RSSAggr == NULL)
              {
                     EVRSSQM_syslog(LOG_ERR,
                                    "client: out of memory while scanning room\n");
                     break;
              }
              Count->count ++;

              RSSAggr->QRnumber = qrbuf->QRnumber;
              RSSAggr->roomlist_parts = 1;
              RSSAggr->Url = NewStrBuf();

              StrBufExtract_NextToken(RSSAggr->Url,
                                   Line,
                                   &lPtr,
                                   '|');

              pthread_mutex_lock(&RSSQueueMutex);

              /* Reset first: the result must not depend on a stale value. */
              vptr = NULL;
              GetHash(RSSFetchUrls,
                     SKEY(RSSAggr->Url),
                     &vptr);

              use_this_RSSAggr = (rss_aggregator *)vptr;
              if (use_this_RSSAggr != NULL)
              {
                     /*
                      * The URL is already queued: attach this room to the
                      * existing aggregator and discard the fresh one.
                      */
                     long *QRnumber = malloc(sizeof *QRnumber);

                     if (QRnumber == NULL)
                     {
                            /* Undo the count taken for this line. */
                            Count->count --;
                            pthread_mutex_unlock(&RSSQueueMutex);
                            FreeStrBuf(&RSSAggr->Url);
                            free(RSSAggr);
                            RSSAggr = NULL;
                            EVRSSQM_syslog(LOG_ERR,
                                           "client: out of memory while scanning room\n");
                            break;
                     }

                     StrBufAppendBufPlain(
                            use_this_RSSAggr->rooms,
                            qrbuf->QRname,
                            -1, 0);
                     if (use_this_RSSAggr->roomlist_parts==1)
                     {
                            use_this_RSSAggr->OtherQRnumbers
                                   = NewHash(1, lFlathash);
                     }
                     *QRnumber = qrbuf->QRnumber;
                     Put(use_this_RSSAggr->OtherQRnumbers,
                         LKEY(qrbuf->QRnumber),
                         QRnumber,
                         NULL);
                     use_this_RSSAggr->roomlist_parts++;

                     pthread_mutex_unlock(&RSSQueueMutex);

                     FreeStrBuf(&RSSAggr->Url);
                     free(RSSAggr);
                     RSSAggr = NULL;
                     continue;
              }
              pthread_mutex_unlock(&RSSQueueMutex);

              RSSAggr->ItemType = RSS_UNSET;

              RSSAggr->rooms = NewStrBufPlain(
                     qrbuf->QRname, -1);

              pthread_mutex_lock(&RSSQueueMutex);

              /* From here on the hash owns RSSAggr (DeleteRssCfg frees it). */
              Put(RSSFetchUrls,
                  SKEY(RSSAggr->Url),
                  RSSAggr,
                  DeleteRssCfg);

              pthread_mutex_unlock(&RSSQueueMutex);
       }

       if ((Count != NULL) && (Count->count > 0))
       {
              Count->QRnumber = qrbuf->QRnumber;
              pthread_mutex_lock(&RSSQueueMutex);
              EVRSSQ_syslog(LOG_DEBUG, "client: [%ld] %s now starting.\n",
                           qrbuf->QRnumber, qrbuf->QRname);
              Put(RSSQueueRooms, LKEY(qrbuf->QRnumber), Count, NULL);
              pthread_mutex_unlock(&RSSQueueMutex);
       }
       else
       {
              /* Nothing was queued for this room (allocation failure path). */
              free(Count);
       }

cleanup:
       if (fd >= 0)
              close(fd);
       FreeStrBuf(&CfgData);
       FreeStrBuf(&CfgType);
       FreeStrBuf(&Line);
}

Here is the call graph for this function:

Here is the caller graph for this function:

eNextState RSSSaveMessage ( AsyncIO *  IO)

Definition at line 232 of file serv_rssclient.c.

{
       /*
        * DB-thread step: submit the current item as a message, record its
        * GUID in the use table, then advance to the next queued message.
        * Returns the result of NextDBOperation() when another message is
        * pending, eAbort when the list is exhausted.
        */
       rss_aggregator *Aggr = (rss_aggregator *) IO->Data;
       const char *KeyStr;
       long KeyLen;
       int HaveNext;

       /* Move the assembled body text into the message's 'M' field. */
       Aggr->ThisMsg->Msg.cm_fields['M'] =
              SmashStrBuf(&Aggr->ThisMsg->Message);

       CtdlSubmitMsg(&Aggr->ThisMsg->Msg, &Aggr->recp, NULL, 0);

       /* write the uidl to the use table so we don't store this item again */
       cdb_store(CDB_USETABLE,
                SKEY(Aggr->ThisMsg->MsgGUID),
                &Aggr->ThisMsg->ut,
                sizeof(struct UseTable) );

       HaveNext = GetNextHashPos(Aggr->Messages,
                                 Aggr->Pos,
                                 &KeyLen, &KeyStr,
                                 (void**) &Aggr->ThisMsg);
       if (!HaveNext)
              return eAbort;

       return NextDBOperation(IO, RSS_FetchNetworkUsetableEntry);
}

Here is the call graph for this function:

Here is the caller graph for this function:

void UnlinkRooms ( rss_aggregator *  RSSAggr)

Definition at line 120 of file serv_rssclient.c.

{
       /*
        * Release the room references held by RSSAggr: first its primary
        * room, then every additional room recorded in OtherQRnumbers.
        * The walk stops early when the server is shutting down.
        */
       HashPos *Pos;
       const char *KeyStr;
       long KeyLen;
       void *vEntry;

       DeleteRoomReference(RSSAggr->QRnumber);

       if (RSSAggr->OtherQRnumbers == NULL)
              return;

       Pos = GetNewHashPos(RSSAggr->OtherQRnumbers, 0);
       for (;;)
       {
              if (server_shutting_down)
                     break;
              if (!GetNextHashPos(RSSAggr->OtherQRnumbers,
                                  Pos,
                                  &KeyLen, &KeyStr,
                                  &vEntry))
                     break;
              if (vEntry == NULL)
                     break;

              /* Each entry stores a heap-allocated room number. */
              DeleteRoomReference(*(long *) vEntry);
       }
       DeleteHashPos(&Pos);
}

Here is the call graph for this function:

Here is the caller graph for this function:

void UnlinkRSSAggregator ( rss_aggregator *  RSSAggr)

Definition at line 146 of file serv_rssclient.c.

{
       /*
        * Remove RSSAggr from the fetch queue: release its room references,
        * delete its entry from RSSFetchUrls and stamp last_run, all under
        * RSSQueueMutex.  Callers must not already hold that mutex.
        */
       HashPos *Pos;

       pthread_mutex_lock(&RSSQueueMutex);

       UnlinkRooms(RSSAggr);

       Pos = GetNewHashPos(RSSFetchUrls, 0);
       if (GetHashPosFromKey(RSSFetchUrls, SKEY(RSSAggr->Url), Pos))
              DeleteEntryFromHash(RSSFetchUrls, Pos);
       DeleteHashPos(&Pos);

       /* The polling interval in rssclient_scan is measured from here. */
       last_run = time(NULL);

       pthread_mutex_unlock(&RSSQueueMutex);
}

Here is the call graph for this function:

Here is the caller graph for this function:


Variable Documentation

time_t last_run = 0L

Definition at line 59 of file serv_rssclient.c.

struct rssnetcfg* rnclist = NULL

Definition at line 70 of file serv_rssclient.c.

int RSSClientDebugEnabled = 0

Definition at line 71 of file serv_rssclient.c.

HashList* RSSFetchUrls = NULL

Definition at line 63 of file serv_rssclient.c.

pthread_mutex_t RSSQueueMutex

Definition at line 61 of file serv_rssclient.c.

HashList* RSSQueueRooms = NULL

Definition at line 62 of file serv_rssclient.c.