Back to index

citadel  8.12
serv_eventclient.c
Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 1998-2012 by the citadel.org team
00003  *
00004  *  This program is open source software; you can redistribute it and/or modify
00005  *  it under the terms of the GNU General Public License version 3.
00006  *  
00007  *  
00008  *
00009  *  This program is distributed in the hope that it will be useful,
00010  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  *  GNU General Public License for more details.
00013  *
00014  *  
00015  *  
00016  *  
00017  */
00018 
00019 #include "sysdep.h"
00020 #include <stdlib.h>
00021 #include <unistd.h>
00022 #include <stdio.h>
00023 #include <termios.h>
00024 #include <fcntl.h>
00025 #include <signal.h>
00026 #include <pwd.h>
00027 #include <errno.h>
00028 #include <sys/types.h>
00029 #include <syslog.h>
00030 
00031 #if TIME_WITH_SYS_TIME
00032 # include <sys/time.h>
00033 # include <time.h>
00034 #else
00035 # if HAVE_SYS_TIME_H
00036 #  include <sys/time.h>
00037 # else
00038 #  include <time.h>
00039 # endif
00040 #endif
00041 #include <sys/wait.h>
00042 #include <ctype.h>
00043 #include <string.h>
00044 #include <limits.h>
00045 #include <sys/socket.h>
00046 #include <netinet/in.h>
00047 #include <assert.h>
00048 #include <arpa/inet.h>
00049 #include <libcitadel.h>
00050 #include <curl/curl.h>
00051 #include <curl/multi.h>
00052 #include "citadel.h"
00053 #include "server.h"
00054 #include "citserver.h"
00055 #include "support.h"
00056 
00057 #include "ctdl_module.h"
00058 
00059 #include "event_client.h"
00060 #include "serv_curl.h"
00061 
00062 ev_loop *event_base;             /* libev loop for client I/O; assigned in client_event_thread() */
00063 int DebugEventLoop = 0;          /* written by DebugEventloopEnable() */
00064 int DebugEventLoopBacktrace = 0; /* written by DebugEventloopBacktraceEnable() */
00065 int DebugCurl = 0;               /* written by DebugCurlEnable(); non-zero lets the curl LOG_DEBUG messages through */
00066 
00067 long EvIDSource = 1;             /* next ID handed to an AsyncIO whose ID is still 0 */
00068 /*****************************************************************************
00069  *                   libevent / curl integration                             *
00070  *****************************************************************************/
/*
 * Logging helpers for the curl glue.
 *
 * DBGLOG() filters by level: LOG_DEBUG messages are only emitted while
 * DebugCurl is non-zero, every other level is always passed to syslog().
 *
 * DBGLOG() expands to a bare `if`, so each wrapper encloses it in
 * do { } while (0).  Without that, `if (x) CURL_syslog(...); else ...`
 * would silently attach the `else` to the hidden `if`.
 *
 * EVCURL_syslog / EVCURLM_syslog need an `IO` variable in scope (and whatever
 * CCID expands to); the ...M_ variants are for format strings without
 * arguments.
 */
#define DBGLOG(LEVEL) if ((LEVEL != LOG_DEBUG) || (DebugCurl != 0))

#define EVCURL_syslog(LEVEL, FORMAT, ...)				\
	do {								\
		DBGLOG (LEVEL) syslog(LEVEL,				\
				      "EVCURL:IO[%ld]CC[%d] " FORMAT,	\
				      IO->ID, CCID, __VA_ARGS__);	\
	} while (0)

#define EVCURLM_syslog(LEVEL, FORMAT)					\
	do {								\
		DBGLOG (LEVEL) syslog(LEVEL,				\
				      "EVCURL:IO[%ld]CC[%d] " FORMAT,	\
				      IO->ID, CCID);			\
	} while (0)

#define CURL_syslog(LEVEL, FORMAT, ...)					\
	do {								\
		DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT,		\
				      __VA_ARGS__);			\
	} while (0)

#define CURLM_syslog(LEVEL, FORMAT)					\
	do {								\
		DBGLOG (LEVEL) syslog(LEVEL, "CURL: " FORMAT);		\
	} while (0)

/*
 * Set option CURLMOPT_<s> to v on the multi handle `mhnd` (which must be in
 * scope at the call site); log and exit(1) if libcurl rejects it.
 *
 * curl_multi_setopt() returns a CURLMcode, so the result is kept in a local
 * of that type and rendered with curl_multi_strerror() rather than the
 * easy-interface curl_easy_strerror().
 */
#define MOPT(s, v)							\
	do {								\
		CURLMcode mopt_rc_ =					\
			curl_multi_setopt(mhnd, (CURLMOPT_##s), (v));	\
		if (mopt_rc_ != CURLM_OK) {				\
			EVQ_syslog(LOG_ERR, "error setting option "	\
				   #s " on curl multi handle: %s\n",	\
				   curl_multi_strerror(mopt_rc_));	\
			exit (1);					\
		}							\
	} while (0)
00097 
00098 
/* Bookkeeping for the single curl multi handle used by this file. */
00099 typedef struct _evcurl_global_data {
00100        int magic;          /* not referenced in the code visible here */
00101        CURLM *mhnd;        /* multi handle, created in curl_init_connectionpool() */
00102        ev_timer timeev;    /* libev timer whose callback is gottime() */
00103        int nrun;           /* running-handle count stored by gotstatus() */
00104 } evcurl_global_data;
00105 
00106 ev_async WakeupCurl;        /* async watcher; client_event_thread() binds it to WakeupCurlCallback() */
00107 evcurl_global_data global;  /* the one instance of the curl glue state used throughout this file */
00108 
00109 static void
00110 gotstatus(int nnrun)
00111 {
00112        CURLMsg *msg;
00113        int nmsg;
00114 
00115        global.nrun = nnrun;
00116 
00117        CURLM_syslog(LOG_DEBUG,
00118                    "gotstatus(): about to call curl_multi_info_read\n");
00119        while ((msg = curl_multi_info_read(global.mhnd, &nmsg))) {
00120               CURL_syslog(LOG_DEBUG,
00121                          "got curl multi_info message msg=%d\n",
00122                          msg->msg);
00123 
00124               if (CURLMSG_DONE == msg->msg) {
00125                      CURL *chnd;
00126                      char *chandle = NULL;
00127                      CURLcode sta;
00128                      CURLMcode msta;
00129                      AsyncIO*IO;
00130 
00131                      chandle = NULL;;
00132                      chnd = msg->easy_handle;
00133                      sta = curl_easy_getinfo(chnd,
00134                                           CURLINFO_PRIVATE,
00135                                           &chandle);
00136                      if (sta) {
00137                             syslog(LOG_ERR,
00138                                    "error asking curl for private"
00139                                    " cookie of curl handle: %s\n",
00140                                    curl_easy_strerror(sta));
00141                             continue;
00142                      }
00143                      IO = (AsyncIO *)chandle;
00144                      if (IO->ID == 0) {
00145                             EVCURL_syslog(LOG_ERR,
00146                                          "Error, invalid IO context %p\n",
00147                                          IO);
00148                             continue;
00149                      }
00150 
00151                      EVCURLM_syslog(LOG_DEBUG, "request complete\n");
00152 
00153                      IO->Now = ev_now(event_base);
00154 
00155                      ev_io_stop(event_base, &IO->recv_event);
00156                      ev_io_stop(event_base, &IO->send_event);
00157 
00158                      sta = msg->data.result;
00159                      if (sta) {
00160                             EVCURL_syslog(LOG_ERR,
00161                                          "error description: %s\n",
00162                                          IO->HttpReq.errdesc);
00163                             EVCURL_syslog(LOG_ERR,
00164                                          "error performing request: %s\n",
00165                                          curl_easy_strerror(sta));
00166                      }
00167                      sta = curl_easy_getinfo(chnd,
00168                                           CURLINFO_RESPONSE_CODE,
00169                                           &IO->HttpReq.httpcode);
00170                      if (sta)
00171                             EVCURL_syslog(LOG_ERR,
00172                                          "error asking curl for "
00173                                          "response code from request: %s\n",
00174                                          curl_easy_strerror(sta));
00175                      EVCURL_syslog(LOG_DEBUG,
00176                                   "http response code was %ld\n",
00177                                   (long)IO->HttpReq.httpcode);
00178 
00179 
00180                      curl_slist_free_all(IO->HttpReq.headers);
00181                      msta = curl_multi_remove_handle(global.mhnd, chnd);
00182                      if (msta)
00183                             EVCURL_syslog(LOG_ERR,
00184                                          "warning problem detaching "
00185                                          "completed handle from curl multi: "
00186                                          "%s\n",
00187                                          curl_multi_strerror(msta));
00188 
00189                      ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
00190 
00191                      IO->HttpReq.attached = 0;
00192                      switch(IO->SendDone(IO))
00193                      {
00194                      case eDBQuery:
00195                             curl_easy_cleanup(IO->HttpReq.chnd);
00196                             IO->HttpReq.chnd = NULL;
00197                             break;
00198                      case eSendDNSQuery:
00199                      case eReadDNSReply:
00200                      case eConnect:
00201                      case eSendReply:
00202                      case eSendMore:
00203                      case eSendFile:
00204                      case eReadMessage:
00205                      case eReadMore:
00206                      case eReadPayload:
00207                      case eReadFile:
00208                             curl_easy_cleanup(IO->HttpReq.chnd);
00209                             IO->HttpReq.chnd = NULL;
00210                             break;
00211                      case eTerminateConnection:
00212                      case eAbort:
00213                             curl_easy_cleanup(IO->HttpReq.chnd);
00214                             IO->HttpReq.chnd = NULL;
00215                             FreeStrBuf(&IO->HttpReq.ReplyData);
00216                             FreeURL(&IO->ConnectMe);
00217                             RemoveContext(IO->CitContext);
00218                             IO->Terminate(IO);
00219                      }
00220               }
00221        }
00222 }
00223 
00224 static void
00225 stepmulti(void *data, curl_socket_t fd, int which)
00226 {
00227        int running_handles = 0;
00228        CURLMcode msta;
00229 
00230        msta = curl_multi_socket_action(global.mhnd,
00231                                    fd,
00232                                    which,
00233                                    &running_handles);
00234 
00235        CURLM_syslog(LOG_DEBUG, "stepmulti(): calling gotstatus()\n");
00236        if (msta)
00237               CURL_syslog(LOG_ERR,
00238                          "error in curl processing events"
00239                          "on multi handle, fd %d: %s\n",
00240                          (int)fd,
00241                          curl_multi_strerror(msta));
00242 
00243        if (global.nrun != running_handles)
00244               gotstatus(running_handles);
00245 }
00246 
00247 static void
00248 gottime(struct ev_loop *loop, ev_timer *timeev, int events)
00249 {
00250        CURLM_syslog(LOG_DEBUG, "EVCURL: waking up curl for timeout\n");
00251        stepmulti(NULL, CURL_SOCKET_TIMEOUT, 0);
00252 }
00253 
00254 static void
00255 got_in(struct ev_loop *loop, ev_io *ioev, int events)
00256 {
00257        CURL_syslog(LOG_DEBUG,
00258                   "EVCURL: waking up curl for io on fd %d\n",
00259                   (int)ioev->fd);
00260 
00261        stepmulti(ioev->data, ioev->fd, CURL_CSELECT_IN);
00262 }
00263 
00264 static void
00265 got_out(struct ev_loop *loop, ev_io *ioev, int events)
00266 {
00267        CURL_syslog(LOG_DEBUG,
00268                   "waking up curl for io on fd %d\n",
00269                   (int)ioev->fd);
00270 
00271        stepmulti(ioev->data, ioev->fd, CURL_CSELECT_OUT);
00272 }
00273 
00274 static size_t
00275 gotdata(void *data, size_t size, size_t nmemb, void *cglobal) {
00276        AsyncIO *IO = (AsyncIO*) cglobal;
00277 
00278        if (IO->HttpReq.ReplyData == NULL)
00279        {
00280               IO->HttpReq.ReplyData = NewStrBufPlain(NULL, SIZ);
00281        }
00282        IO->Now = ev_now(event_base);
00283        return CurlFillStrBuf_callback(data,
00284                                    size,
00285                                    nmemb,
00286                                    IO->HttpReq.ReplyData);
00287 }
00288 
00289 static int
00290 gotwatchtime(CURLM *multi, long tblock_ms, void *cglobal) {
00291        CURL_syslog(LOG_DEBUG, "EVCURL: gotwatchtime called %ld ms\n", tblock_ms);
00292        evcurl_global_data *global = cglobal;
00293        ev_timer_stop(EV_DEFAULT, &global->timeev);
00294        if (tblock_ms < 0 || 14000 < tblock_ms)
00295               tblock_ms = 14000;
00296        ev_timer_set(&global->timeev, 0.5e-3 + 1.0e-3 * tblock_ms, 14.0);
00297        ev_timer_start(EV_DEFAULT_UC, &global->timeev);
00298        curl_multi_perform(global, &global->nrun);
00299        return 0;
00300 }
00301 
00302 static int
00303 gotwatchsock(CURL *easy,
00304             curl_socket_t fd,
00305             int action,
00306             void *cglobal,
00307             void *vIO)
00308 {
00309        evcurl_global_data *global = cglobal;
00310        CURLM *mhnd = global->mhnd;
00311        char *f;
00312        AsyncIO *IO = (AsyncIO*) vIO;
00313        CURLcode sta;
00314        const char *Action;
00315 
00316        if (IO == NULL) {
00317               sta = curl_easy_getinfo(easy, CURLINFO_PRIVATE, &f);
00318               if (sta) {
00319                      CURL_syslog(LOG_ERR,
00320                                 "EVCURL: error asking curl for private "
00321                                 "cookie of curl handle: %s\n",
00322                                 curl_easy_strerror(sta));
00323                      return -1;
00324               }
00325               IO = (AsyncIO *) f;
00326               EVCURL_syslog(LOG_DEBUG,
00327                            "EVCURL: got socket for URL: %s\n",
00328                            IO->ConnectMe->PlainUrl);
00329 
00330               if (IO->SendBuf.fd != 0)
00331               {
00332                      ev_io_stop(event_base, &IO->recv_event);
00333                      ev_io_stop(event_base, &IO->send_event);
00334               }
00335               IO->SendBuf.fd = fd;
00336               ev_io_init(&IO->recv_event, &got_in, fd, EV_READ);
00337               ev_io_init(&IO->send_event, &got_out, fd, EV_WRITE);
00338               curl_multi_assign(mhnd, fd, IO);
00339        }
00340 
00341        IO->Now = ev_now(event_base);
00342 
00343        Action = "";
00344        switch (action)
00345        {
00346        case CURL_POLL_NONE:
00347               Action = "CURL_POLL_NONE";
00348               break;
00349        case CURL_POLL_REMOVE:
00350               Action = "CURL_POLL_REMOVE";
00351               break;
00352        case CURL_POLL_IN:
00353               Action = "CURL_POLL_IN";
00354               break;
00355        case CURL_POLL_OUT:
00356               Action = "CURL_POLL_OUT";
00357               break;
00358        case CURL_POLL_INOUT:
00359               Action = "CURL_POLL_INOUT";
00360               break;
00361        }
00362 
00363 
00364        EVCURL_syslog(LOG_DEBUG,
00365                     "EVCURL: gotwatchsock called fd=%d action=%s[%d]\n",
00366                     (int)fd, Action, action);
00367 
00368        switch (action)
00369        {
00370        case CURL_POLL_NONE:
00371               EVCURLM_syslog(LOG_DEBUG,
00372                             "called first time "
00373                             "to register this sockwatcker\n");
00374               break;
00375        case CURL_POLL_REMOVE:
00376               EVCURLM_syslog(LOG_DEBUG,
00377                             "called last time to unregister "
00378                             "this sockwatcher\n");
00379               ev_io_stop(event_base, &IO->recv_event);
00380               ev_io_stop(event_base, &IO->send_event);
00381               break;
00382        case CURL_POLL_IN:
00383               ev_io_start(event_base, &IO->recv_event);
00384               ev_io_stop(event_base, &IO->send_event);
00385               break;
00386        case CURL_POLL_OUT:
00387               ev_io_start(event_base, &IO->send_event);
00388               ev_io_stop(event_base, &IO->recv_event);
00389               break;
00390        case CURL_POLL_INOUT:
00391               ev_io_start(event_base, &IO->send_event);
00392               ev_io_start(event_base, &IO->recv_event);
00393               break;
00394        }
00395        return 0;
00396 }
00397 
00398 void curl_init_connectionpool(void)
00399 {
00400        CURLM *mhnd ;
00401 
00402        ev_timer_init(&global.timeev, &gottime, 14.0, 14.0);
00403        global.timeev.data = (void *)&global;
00404        global.nrun = -1;
00405        CURLcode sta = curl_global_init(CURL_GLOBAL_ALL);
00406 
00407        if (sta)
00408        {
00409               CURL_syslog(LOG_ERR,
00410                          "error initializing curl library: %s\n",
00411                          curl_easy_strerror(sta));
00412 
00413               exit(1);
00414        }
00415        mhnd = global.mhnd = curl_multi_init();
00416        if (!mhnd)
00417        {
00418               CURLM_syslog(LOG_ERR,
00419                           "error initializing curl multi handle\n");
00420               exit(3);
00421        }
00422 
00423        MOPT(SOCKETFUNCTION, &gotwatchsock);
00424        MOPT(SOCKETDATA, (void *)&global);
00425        MOPT(TIMERFUNCTION, &gotwatchtime);
00426        MOPT(TIMERDATA, (void *)&global);
00427 
00428        return;
00429 }
00430 
00431 int evcurl_init(AsyncIO *IO)
00432 {
00433        CURLcode sta;
00434        CURL *chnd;
00435 
00436        EVCURLM_syslog(LOG_DEBUG, "EVCURL: evcurl_init called ms\n");
00437        IO->HttpReq.attached = 0;
00438        chnd = IO->HttpReq.chnd = curl_easy_init();
00439        if (!chnd)
00440        {
00441               EVCURLM_syslog(LOG_ERR, "EVCURL: error initializing curl handle\n");
00442               return 0;
00443        }
00444 
00445 #if DEBUG
00446        OPT(VERBOSE, (long)1);
00447 #endif
00448        OPT(NOPROGRESS, 1L);
00449 
00450        OPT(NOSIGNAL, 1L);
00451        OPT(FAILONERROR, (long)1);
00452        OPT(ENCODING, "");
00453        OPT(FOLLOWLOCATION, (long)0);
00454        OPT(MAXREDIRS, (long)0);
00455        OPT(USERAGENT, CITADEL);
00456 
00457        OPT(TIMEOUT, (long)1800);
00458        OPT(LOW_SPEED_LIMIT, (long)64);
00459        OPT(LOW_SPEED_TIME, (long)600);
00460        OPT(CONNECTTIMEOUT, (long)600);
00461        OPT(PRIVATE, (void *)IO);
00462 
00463        OPT(FORBID_REUSE, 1);
00464        OPT(WRITEFUNCTION, &gotdata);
00465        OPT(WRITEDATA, (void *)IO);
00466        OPT(ERRORBUFFER, IO->HttpReq.errdesc);
00467 
00468        if ((!IsEmptyStr(config.c_ip_addr))
00469               && (strcmp(config.c_ip_addr, "*"))
00470               && (strcmp(config.c_ip_addr, "::"))
00471               && (strcmp(config.c_ip_addr, "0.0.0.0"))
00472               )
00473        {
00474               OPT(INTERFACE, config.c_ip_addr);
00475        }
00476 
00477 #ifdef CURLOPT_HTTP_CONTENT_DECODING
00478        OPT(HTTP_CONTENT_DECODING, 1);
00479        OPT(ENCODING, "");
00480 #endif
00481 
00482        IO->HttpReq.headers = curl_slist_append(IO->HttpReq.headers,
00483                                           "Connection: close");
00484 
00485        return 1;
00486 }
00487 
00488 
00489 static void IOcurl_abort_shutdown_callback(struct ev_loop *loop,
00490                                       ev_cleanup *watcher,
00491                                       int revents)
00492 {
00493        CURLMcode msta;
00494        AsyncIO *IO = watcher->data;
00495 
00496        if (IO == NULL)
00497               return;
00498        IO->Now = ev_now(event_base);
00499        EVCURL_syslog(LOG_DEBUG, "EVENT Curl: %s\n", __FUNCTION__);
00500 
00501        curl_slist_free_all(IO->HttpReq.headers);
00502        msta = curl_multi_remove_handle(global.mhnd, IO->HttpReq.chnd);
00503        if (msta)
00504        {
00505               EVCURL_syslog(LOG_ERR,
00506                            "EVCURL: warning problem detaching completed handle "
00507                            "from curl multi: %s\n",
00508                            curl_multi_strerror(msta));
00509        }
00510 
00511        curl_easy_cleanup(IO->HttpReq.chnd);
00512        IO->HttpReq.chnd = NULL;
00513        ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
00514        ev_io_stop(event_base, &IO->recv_event);
00515        ev_io_stop(event_base, &IO->send_event);
00516        assert(IO->ShutdownAbort);
00517        IO->ShutdownAbort(IO);
00518 }
00519 eNextState
00520 evcurl_handle_start(AsyncIO *IO)
00521 {
00522        CURLMcode msta;
00523        CURLcode sta;
00524        CURL *chnd;
00525 
00526        chnd = IO->HttpReq.chnd;
00527        EVCURL_syslog(LOG_DEBUG,
00528                 "EVCURL: Loading URL: %s\n", IO->ConnectMe->PlainUrl);
00529        OPT(URL, IO->ConnectMe->PlainUrl);
00530        if (StrLength(IO->ConnectMe->CurlCreds))
00531        {
00532               OPT(HTTPAUTH, (long)CURLAUTH_BASIC);
00533               OPT(USERPWD, ChrPtr(IO->ConnectMe->CurlCreds));
00534        }
00535        if (StrLength(IO->HttpReq.PostData) > 0)
00536        {
00537               OPT(POSTFIELDS, ChrPtr(IO->HttpReq.PostData));
00538               OPT(POSTFIELDSIZE, StrLength(IO->HttpReq.PostData));
00539 
00540        }
00541        else if ((IO->HttpReq.PlainPostDataLen != 0) &&
00542                (IO->HttpReq.PlainPostData != NULL))
00543        {
00544               OPT(POSTFIELDS, IO->HttpReq.PlainPostData);
00545               OPT(POSTFIELDSIZE, IO->HttpReq.PlainPostDataLen);
00546        }
00547        OPT(HTTPHEADER, IO->HttpReq.headers);
00548 
00549        IO->NextState = eConnect;
00550        EVCURLM_syslog(LOG_DEBUG, "EVCURL: attaching to curl multi handle\n");
00551        msta = curl_multi_add_handle(global.mhnd, IO->HttpReq.chnd);
00552        if (msta)
00553        {
00554               EVCURL_syslog(LOG_ERR,
00555                        "EVCURL: error attaching to curl multi handle: %s\n",
00556                        curl_multi_strerror(msta));
00557        }
00558 
00559        IO->HttpReq.attached = 1;
00560        ev_async_send (event_base, &WakeupCurl);
00561 
00562        ev_cleanup_init(&IO->abort_by_shutdown,
00563                      IOcurl_abort_shutdown_callback);
00564 
00565        ev_cleanup_start(event_base, &IO->abort_by_shutdown);
00566 
00567        return eReadMessage;
00568 }
00569 
00570 static void WakeupCurlCallback(EV_P_ ev_async *w, int revents)
00571 {
00572        CURLM_syslog(LOG_DEBUG, "waking up curl multi handle\n");
00573 
00574        curl_multi_perform(&global, CURL_POLL_NONE);
00575 }
00576 
00577 static void evcurl_shutdown (void)
00578 {
00579        curl_global_cleanup();
00580        curl_multi_cleanup(global.mhnd);
00581        CURLM_syslog(LOG_DEBUG, "exiting\n");
00582 }
00583 /*****************************************************************************
00584  *                       libevent integration                                *
00585  *****************************************************************************/
00586 /*
00587  * client event queue plus its methods.
00588  * this currently is the main loop (which may change in some future?)
00589  */
00590 int evbase_count = 0;                 /* not referenced in the code visible here */
00591 pthread_mutex_t EventQueueMutex; /* locks the access to the following vars: */
00592 pthread_mutex_t EventExitQueueMutex; /* locks the access to the event queue pointer on exit. */
00593 HashList *QueueEvents = NULL;         /* created in InitEventQueue(), deleted when client_event_thread() exits */
00594 HashList *InboundEventQueue = NULL;   /* whichever of InboundEventQueues[] is currently selected; QueueEventAddCallback() swaps it */
00595 HashList *InboundEventQueues[2] = { NULL, NULL }; /* the two queues: one selected as inbound, the other processed by QueueEventAddCallback() */
00596 
00597 ev_async AddJob;                      /* async watcher bound to QueueEventAddCallback() */
00598 ev_async ExitEventLoop;               /* async watcher bound to EventExitCallback() */
00599 
00600 static void QueueEventAddCallback(EV_P_ ev_async *w, int revents)
00601 {
00602        CitContext *Ctx;
00603        ev_tstamp Now;
00604        HashList *q;
00605        void *v;
00606        HashPos*It;
00607        long len;
00608        const char *Key;
00609 
00610        /* get the control command... */
00611        pthread_mutex_lock(&EventQueueMutex);
00612 
00613        if (InboundEventQueues[0] == InboundEventQueue) {
00614               InboundEventQueue = InboundEventQueues[1];
00615               q = InboundEventQueues[0];
00616        }
00617        else {
00618               InboundEventQueue = InboundEventQueues[0];
00619               q = InboundEventQueues[1];
00620        }
00621        pthread_mutex_unlock(&EventQueueMutex);
00622        Now = ev_now (event_base);
00623        It = GetNewHashPos(q, 0);
00624        while (GetNextHashPos(q, It, &len, &Key, &v))
00625        {
00626               IOAddHandler *h = v;
00627               if (h->IO->ID == 0) {
00628                      h->IO->ID = EvIDSource++;
00629               }
00630               if (h->IO->StartIO == 0.0)
00631                      h->IO->StartIO = Now;
00632 
00633               Ctx = h->IO->CitContext;
00634               become_session(Ctx);
00635 
00636               h->IO->Now = Now;
00637               h->EvAttch(h->IO);
00638        }
00639        DeleteHashPos(&It);
00640        DeleteHashContent(&q);
00641        EVQM_syslog(LOG_DEBUG, "EVENT Q Add done.\n");
00642 }
00643 
00644 
00645 static void EventExitCallback(EV_P_ ev_async *w, int revents)
00646 {
00647        ev_break(event_base, EVBREAK_ALL);
00648 
00649        EVQM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
00650 }
00651 
00652 
00653 
00654 void InitEventQueue(void)
00655 {
00656        pthread_mutex_init(&EventQueueMutex, NULL);
00657        pthread_mutex_init(&EventExitQueueMutex, NULL);
00658 
00659        QueueEvents = NewHash(1, Flathash);
00660        InboundEventQueues[0] = NewHash(1, Flathash);
00661        InboundEventQueues[1] = NewHash(1, Flathash);
00662        InboundEventQueue = InboundEventQueues[0];
00663 }
00664 extern void CtdlDestroyEVCleanupHooks(void); /* defined outside this file; called while client_event_thread() shuts down */
00665 
00666 extern int EVQShutDown; /* defined outside this file; set to 1 at the very end of client_event_thread() */
00667 /*
00668  * this thread operates the select() etc. via libev.
00669  */
00670 void *client_event_thread(void *arg) 
00671 {
00672        struct CitContext libev_client_CC;
00673 
00674        CtdlFillSystemContext(&libev_client_CC, "LibEv Thread");
00675 //     citthread_setspecific(MyConKey, (void *)&smtp_queue_CC);
00676        EVQM_syslog(LOG_DEBUG, "client_event_thread() initializing\n");
00677 
00678        event_base = ev_default_loop (EVFLAG_AUTO);
00679        ev_async_init(&AddJob, QueueEventAddCallback);
00680        ev_async_start(event_base, &AddJob);
00681        ev_async_init(&ExitEventLoop, EventExitCallback);
00682        ev_async_start(event_base, &ExitEventLoop);
00683        ev_async_init(&WakeupCurl, WakeupCurlCallback);
00684        ev_async_start(event_base, &WakeupCurl);
00685 
00686        curl_init_connectionpool();
00687 
00688        ev_run (event_base, 0);
00689 
00690        EVQM_syslog(LOG_DEBUG, "client_event_thread() exiting\n");
00691 
00693        pthread_mutex_lock(&EventExitQueueMutex);
00694        ev_loop_destroy (EV_DEFAULT_UC);
00695        event_base = NULL;
00696        DeleteHash(&QueueEvents);
00697        InboundEventQueue = NULL;
00698        DeleteHash(&InboundEventQueues[0]);
00699        DeleteHash(&InboundEventQueues[1]);
00700 /*     citthread_mutex_destroy(&EventQueueMutex); TODO */
00701        evcurl_shutdown();
00702 
00703        CtdlDestroyEVCleanupHooks();
00704 
00705        pthread_mutex_unlock(&EventExitQueueMutex);
00706        EVQShutDown = 1;
00707        return(NULL);
00708 }
00709 
00710 /*----------------------------------------------------------------------------*/
00711 /*
00712  * DB-Queue; does async bdb operations.
00713  * has its own set of handlers.
00714  */
00715 ev_loop *event_db;                    /* libev loop of the DB thread; created in db_event_thread(), NULL again once it exits */
00716 int evdb_count = 0;                   /* not referenced in the code visible here */
00717 pthread_mutex_t DBEventQueueMutex; /* locks the access to the following vars: */
00718 pthread_mutex_t DBEventExitQueueMutex; /* locks the access to the db-event queue pointer on exit. */
00719 HashList *DBQueueEvents = NULL;       /* created in DBInitEventQueue(), deleted when db_event_thread() exits */
00720 HashList *DBInboundEventQueue = NULL; /* whichever of DBInboundEventQueues[] is currently selected; DBQueueEventAddCallback() swaps it */
00721 HashList *DBInboundEventQueues[2] = { NULL, NULL }; /* the two queues: one selected as inbound, the other processed by DBQueueEventAddCallback() */
00722 
00723 ev_async DBAddJob;                    /* async watcher bound to DBQueueEventAddCallback() */
00724 ev_async DBExitEventLoop;             /* async watcher bound to DBEventExitCallback() */
00725 
00726 extern void ShutDownDBCLient(AsyncIO *IO); /* defined outside this file; called when a job's EvAttch() returns eAbort */
00727 
00728 static void DBQueueEventAddCallback(EV_P_ ev_async *w, int revents)
00729 {
00730        CitContext *Ctx;
00731        ev_tstamp Now;
00732        HashList *q;
00733        void *v;
00734        HashPos *It;
00735        long len;
00736        const char *Key;
00737 
00738        /* get the control command... */
00739        pthread_mutex_lock(&DBEventQueueMutex);
00740 
00741        if (DBInboundEventQueues[0] == DBInboundEventQueue) {
00742               DBInboundEventQueue = DBInboundEventQueues[1];
00743               q = DBInboundEventQueues[0];
00744        }
00745        else {
00746               DBInboundEventQueue = DBInboundEventQueues[0];
00747               q = DBInboundEventQueues[1];
00748        }
00749        pthread_mutex_unlock(&DBEventQueueMutex);
00750 
00751        Now = ev_now (event_db);
00752        It = GetNewHashPos(q, 0);
00753        while (GetNextHashPos(q, It, &len, &Key, &v))
00754        {
00755               IOAddHandler *h = v;
00756               eNextState rc;
00757               if (h->IO->ID == 0)
00758                      h->IO->ID = EvIDSource++;
00759               if (h->IO->StartDB == 0.0)
00760                      h->IO->StartDB = Now;
00761               h->IO->Now = Now;
00762 
00763               Ctx = h->IO->CitContext;
00764               become_session(Ctx);
00765               ev_cleanup_start(event_db, &h->IO->db_abort_by_shutdown);
00766               rc = h->EvAttch(h->IO);
00767               switch (rc)
00768               {
00769               case eAbort:
00770                      ShutDownDBCLient(h->IO);
00771               default:
00772                      break;
00773               }
00774        }
00775        DeleteHashPos(&It);
00776        DeleteHashContent(&q);
00777        EVQM_syslog(LOG_DEBUG, "DBEVENT Q Add done.\n");
00778 }
00779 
00780 
00781 static void DBEventExitCallback(EV_P_ ev_async *w, int revents)
00782 {
00783        EVQM_syslog(LOG_DEBUG, "DB EVENT Q exiting.\n");
00784        ev_break(event_db, EVBREAK_ALL);
00785 }
00786 
00787 
00788 
00789 void DBInitEventQueue(void)
00790 {
00791        pthread_mutex_init(&DBEventQueueMutex, NULL);
00792        pthread_mutex_init(&DBEventExitQueueMutex, NULL);
00793 
00794        DBQueueEvents = NewHash(1, Flathash);
00795        DBInboundEventQueues[0] = NewHash(1, Flathash);
00796        DBInboundEventQueues[1] = NewHash(1, Flathash);
00797        DBInboundEventQueue = DBInboundEventQueues[0];
00798 }
00799 
00800 /*
00801  * this thread operates writing to the message database via libev.
00802  */
00803 void *db_event_thread(void *arg)
00804 {
00805        ev_loop *tmp;
00806        struct CitContext libev_msg_CC;
00807 
00808        CtdlFillSystemContext(&libev_msg_CC, "LibEv DB IO Thread");
00809 
00810        EVQM_syslog(LOG_DEBUG, "dbevent_thread() initializing\n");
00811 
00812        tmp = event_db = ev_loop_new (EVFLAG_AUTO);
00813 
00814        ev_async_init(&DBAddJob, DBQueueEventAddCallback);
00815        ev_async_start(event_db, &DBAddJob);
00816        ev_async_init(&DBExitEventLoop, DBEventExitCallback);
00817        ev_async_start(event_db, &DBExitEventLoop);
00818 
00819        ev_run (event_db, 0);
00820 
00821        pthread_mutex_lock(&DBEventExitQueueMutex);
00822 
00823        event_db = NULL;
00824        EVQM_syslog(LOG_INFO, "dbevent_thread() exiting\n");
00825 
00826        DeleteHash(&DBQueueEvents);
00827        DBInboundEventQueue = NULL;
00828        DeleteHash(&DBInboundEventQueues[0]);
00829        DeleteHash(&DBInboundEventQueues[1]);
00830 
00831 /*     citthread_mutex_destroy(&DBEventQueueMutex); TODO */
00832 
00833        ev_loop_destroy (tmp);
00834        pthread_mutex_unlock(&DBEventExitQueueMutex);
00835        return(NULL);
00836 }
00837 
00838 void ShutDownEventQueues(void)
00839 {
00840        EVQM_syslog(LOG_DEBUG, "EVENT Qs triggering exits.\n");
00841 
00842        pthread_mutex_lock(&DBEventQueueMutex);
00843        ev_async_send (event_db, &DBExitEventLoop);
00844        pthread_mutex_unlock(&DBEventQueueMutex);
00845 
00846        pthread_mutex_lock(&EventQueueMutex);
00847        ev_async_send (EV_DEFAULT_ &ExitEventLoop);
00848        pthread_mutex_unlock(&EventQueueMutex);
00849 }
00850 
00851 void DebugEventloopEnable(const int n)
00852 {
00853        DebugEventLoop = n;
00854 }
00855 void DebugEventloopBacktraceEnable(const int n)
00856 {
00857        DebugEventLoopBacktrace = n;
00858 }
00859 
00860 void DebugCurlEnable(const int n)
00861 {
00862        DebugCurl = n;
00863 }
00864 
00865 CTDL_MODULE_INIT(event_client)
00866 {
00867        if (!threading)
00868        {
00869               CtdlRegisterDebugFlagHook(HKEY("eventloop"), DebugEventloopEnable, &DebugEventLoop);
00870               CtdlRegisterDebugFlagHook(HKEY("eventloopbacktrace"), DebugEventloopBacktraceEnable, &DebugEventLoopBacktrace);
00871               CtdlRegisterDebugFlagHook(HKEY("curl"), DebugCurlEnable, &DebugCurl);
00872               InitEventQueue();
00873               DBInitEventQueue();
00874               CtdlThreadCreate(client_event_thread);
00875               CtdlThreadCreate(db_event_thread);
00876        }
00877        return "event";
00878 }