Back to index

citadel  8.12
event_client.c
Go to the documentation of this file.
00001 /*
00002  *
00003  * Copyright (c) 1998-2012 by the citadel.org team
00004  *
00005  *  This program is open source software; you can redistribute it and/or modify
00006  *  it under the terms of the GNU General Public License as published by
00007  *  the Free Software Foundation; either version 3 of the License, or
00008  *  (at your option) any later version.
00009  *
00010  *  This program is distributed in the hope that it will be useful,
00011  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00013  *  GNU General Public License for more details.
00014  *
00015  *  You should have received a copy of the GNU General Public License
00016  *  along with this program; if not, write to the Free Software
00017  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00018  */
00019 
00020 #include "sysdep.h"
00021 #include <stdlib.h>
00022 #include <unistd.h>
00023 #include <stdio.h>
00024 #include <termios.h>
00025 #include <fcntl.h>
00026 #include <signal.h>
00027 #include <pwd.h>
00028 #include <errno.h>
00029 #include <sys/types.h>
00030 #include <syslog.h>
00031 
00032 #if TIME_WITH_SYS_TIME
00033 # include <sys/time.h>
00034 # include <time.h>
00035 #else
00036 # if HAVE_SYS_TIME_H
00037 #  include <sys/time.h>
00038 # else
00039 #  include <time.h>
00040 # endif
00041 #endif
00042 #include <sys/wait.h>
00043 #include <ctype.h>
00044 #include <string.h>
00045 #include <limits.h>
00046 #include <sys/socket.h>
00047 #include <netinet/in.h>
00048 #include <arpa/inet.h>
00049 #include <assert.h>
00050 #if HAVE_BACKTRACE
00051 #include <execinfo.h>
00052 #endif
00053 
00054 #include <libcitadel.h>
00055 #include "citadel.h"
00056 #include "server.h"
00057 #include "citserver.h"
00058 #include "support.h"
00059 #include "config.h"
00060 #include "control.h"
00061 #include "user_ops.h"
00062 #include "database.h"
00063 #include "msgbase.h"
00064 #include "internet_addressing.h"
00065 #include "genstamp.h"
00066 #include "domain.h"
00067 #include "clientsocket.h"
00068 #include "locate_host.h"
00069 #include "citadel_dirs.h"
00070 
00071 #include "event_client.h"
00072 #include "ctdl_module.h"
00073 
00074 static void IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents);
00075 static void IO_abort_shutdown_callback(struct ev_loop *loop,
00076                                    ev_cleanup *watcher,
00077                                    int revents);
00078 
00079 
00080 /*------------------------------------------------------------------------------
00081  *                          Server DB IO
00082  *----------------------------------------------------------------------------*/
00083 extern int evdb_count;
00084 extern pthread_mutex_t DBEventQueueMutex;
00085 extern pthread_mutex_t DBEventExitQueueMutex;
00086 extern HashList *DBInboundEventQueue;
00087 extern struct ev_loop *event_db;
00088 extern ev_async DBAddJob;
00089 extern ev_async DBExitEventLoop;
00090 
00091 eNextState QueueDBOperation(AsyncIO *IO, IO_CallBack CB)
00092 {
00093        IOAddHandler *h;
00094        int i;
00095 
00096        h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
00097        h->IO = IO;
00098        h->EvAttch = CB;
00099        ev_cleanup_init(&IO->db_abort_by_shutdown,
00100                      IO_abort_shutdown_callback);
00101        IO->db_abort_by_shutdown.data = IO;
00102 
00103        pthread_mutex_lock(&DBEventQueueMutex);
00104        if (DBInboundEventQueue == NULL)
00105        {
00106               /* shutting down... */
00107               free(h);
00108               EVM_syslog(LOG_DEBUG, "DBEVENT Q exiting.\n");
00109               pthread_mutex_unlock(&DBEventQueueMutex);
00110               return eAbort;
00111        }
00112        EVM_syslog(LOG_DEBUG, "DBEVENT Q\n");
00113        i = ++evdb_count ;
00114        Put(DBInboundEventQueue, IKEY(i), h, NULL);
00115        pthread_mutex_unlock(&DBEventQueueMutex);
00116 
00117        pthread_mutex_lock(&DBEventExitQueueMutex);
00118        if (event_db == NULL)
00119        {
00120               pthread_mutex_unlock(&DBEventExitQueueMutex);
00121               return eAbort;
00122        }
00123        ev_async_send (event_db, &DBAddJob);
00124        pthread_mutex_unlock(&DBEventExitQueueMutex);
00125 
00126        EVM_syslog(LOG_DEBUG, "DBEVENT Q Done.\n");
00127        return eDBQuery;
00128 }
00129 
00130 void StopDBWatchers(AsyncIO *IO)
00131 {
00132        ev_cleanup_stop(event_db, &IO->db_abort_by_shutdown);
00133        ev_idle_stop(event_db, &IO->db_unwind_stack);
00134 }
00135 
00136 void ShutDownDBCLient(AsyncIO *IO)
00137 {
00138        CitContext *Ctx =IO->CitContext;
00139        become_session(Ctx);
00140 
00141        EVM_syslog(LOG_DEBUG, "DBEVENT Terminating.\n");
00142        StopDBWatchers(IO);
00143 
00144        assert(IO->DBTerminate);
00145        IO->DBTerminate(IO);
00146 }
00147 
00148 void
00149 DB_PerformNext(struct ev_loop *loop, ev_idle *watcher, int revents)
00150 {
00151        AsyncIO *IO = watcher->data;
00152        IO->Now = ev_now(event_db);
00153        EV_syslog(LOG_DEBUG, "%s()", __FUNCTION__);
00154        become_session(IO->CitContext);
00155 
00156        ev_idle_stop(event_db, &IO->db_unwind_stack);
00157 
00158        assert(IO->NextDBOperation);
00159        switch (IO->NextDBOperation(IO))
00160        {
00161        case eDBQuery:
00162               break;
00163        case eSendDNSQuery:
00164        case eReadDNSReply:
00165        case eConnect:
00166        case eSendReply:
00167        case eSendMore:
00168        case eSendFile:
00169        case eReadMessage:
00170        case eReadMore:
00171        case eReadPayload:
00172        case eReadFile:
00173               ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
00174               break;
00175        case eTerminateConnection:
00176        case eAbort:
00177               ev_idle_stop(event_db, &IO->db_unwind_stack);
00178               ev_cleanup_stop(loop, &IO->db_abort_by_shutdown);
00179               ShutDownDBCLient(IO);
00180        }
00181 }
00182 
00183 eNextState NextDBOperation(AsyncIO *IO, IO_CallBack CB)
00184 {
00185        IO->NextDBOperation = CB;
00186        ev_idle_init(&IO->db_unwind_stack,
00187                    DB_PerformNext);
00188        IO->db_unwind_stack.data = IO;
00189        ev_idle_start(event_db, &IO->db_unwind_stack);
00190        return eDBQuery;
00191 }
00192 
00193 /*------------------------------------------------------------------------------
00194  *                   Client IO
00195  *----------------------------------------------------------------------------*/
00196 extern int evbase_count;
00197 extern pthread_mutex_t EventQueueMutex;
00198 extern pthread_mutex_t EventExitQueueMutex; 
00199 extern HashList *InboundEventQueue;
00200 extern struct ev_loop *event_base;
00201 extern ev_async AddJob;
00202 extern ev_async ExitEventLoop;
00203 
00204 static void IO_abort_shutdown_callback(struct ev_loop *loop,
00205                                    ev_cleanup *watcher,
00206                                    int revents)
00207 {
00208        AsyncIO *IO = watcher->data;
00209        EV_syslog(LOG_DEBUG, "EVENT Q: %s\n", __FUNCTION__);
00210        IO->Now = ev_now(event_base);
00211        assert(IO->ShutdownAbort);
00212        IO->ShutdownAbort(IO);
00213 }
00214 
00215 
00216 eNextState QueueEventContext(AsyncIO *IO, IO_CallBack CB)
00217 {
00218        IOAddHandler *h;
00219        int i;
00220 
00221        h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
00222        h->IO = IO;
00223        h->EvAttch = CB;
00224        ev_cleanup_init(&IO->abort_by_shutdown,
00225                      IO_abort_shutdown_callback);
00226        IO->abort_by_shutdown.data = IO;
00227 
00228        pthread_mutex_lock(&EventQueueMutex);
00229        if (InboundEventQueue == NULL)
00230        {
00231               free(h);
00232               /* shutting down... */
00233               EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
00234               pthread_mutex_unlock(&EventQueueMutex);
00235               return eAbort;
00236        }
00237        EVM_syslog(LOG_DEBUG, "EVENT Q\n");
00238        i = ++evbase_count;
00239        Put(InboundEventQueue, IKEY(i), h, NULL);
00240        pthread_mutex_unlock(&EventQueueMutex);
00241 
00242        pthread_mutex_lock(&EventExitQueueMutex);
00243        if (event_base == NULL) {
00244               pthread_mutex_unlock(&EventExitQueueMutex);
00245               return eAbort;
00246        }
00247        ev_async_send (event_base, &AddJob);
00248        pthread_mutex_unlock(&EventExitQueueMutex);
00249        EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
00250        return eSendReply;
00251 }
00252 
00253 extern eNextState evcurl_handle_start(AsyncIO *IO);
00254 
00255 eNextState QueueCurlContext(AsyncIO *IO)
00256 {
00257        IOAddHandler *h;
00258        int i;
00259 
00260        h = (IOAddHandler*)malloc(sizeof(IOAddHandler));
00261        h->IO = IO;
00262        h->EvAttch = evcurl_handle_start;
00263 
00264        pthread_mutex_lock(&EventQueueMutex);
00265        if (InboundEventQueue == NULL)
00266        {
00267               /* shutting down... */
00268               free(h);
00269               EVM_syslog(LOG_DEBUG, "EVENT Q exiting.\n");
00270               pthread_mutex_unlock(&EventQueueMutex);
00271               return eAbort;
00272        }
00273 
00274        EVM_syslog(LOG_DEBUG, "EVENT Q\n");
00275        i = ++evbase_count;
00276        Put(InboundEventQueue, IKEY(i), h, NULL);
00277        pthread_mutex_unlock(&EventQueueMutex);
00278 
00279        pthread_mutex_lock(&EventExitQueueMutex);
00280        if (event_base == NULL) {
00281               pthread_mutex_unlock(&EventExitQueueMutex);
00282               return eAbort;
00283        }
00284        ev_async_send (event_base, &AddJob);
00285        pthread_mutex_unlock(&EventExitQueueMutex);
00286 
00287        EVM_syslog(LOG_DEBUG, "EVENT Q Done.\n");
00288        return eSendReply;
00289 }
00290 
00291 void DestructCAres(AsyncIO *IO);
00292 void FreeAsyncIOContents(AsyncIO *IO)
00293 {
00294        CitContext *Ctx = IO->CitContext;
00295 
00296        FreeStrBuf(&IO->IOBuf);
00297        FreeStrBuf(&IO->SendBuf.Buf);
00298        FreeStrBuf(&IO->RecvBuf.Buf);
00299 
00300        DestructCAres(IO);
00301 
00302        FreeURL(&IO->ConnectMe);
00303        FreeStrBuf(&IO->HttpReq.ReplyData);
00304 
00305        if (Ctx) {
00306               Ctx->state = CON_IDLE;
00307               Ctx->kill_me = 1;
00308        }
00309 }
00310 
00311 
00312 void StopClientWatchers(AsyncIO *IO, int CloseFD)
00313 {
00314        ev_timer_stop (event_base, &IO->rw_timeout);
00315        ev_timer_stop(event_base, &IO->conn_fail);
00316        ev_idle_stop(event_base, &IO->unwind_stack);
00317        ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
00318 
00319        ev_io_stop(event_base, &IO->conn_event);
00320        ev_io_stop(event_base, &IO->send_event);
00321        ev_io_stop(event_base, &IO->recv_event);
00322 
00323        if (CloseFD && (IO->SendBuf.fd > 0)) {
00324               close(IO->SendBuf.fd);
00325               IO->SendBuf.fd = -1;
00326               IO->RecvBuf.fd = -1;
00327        }
00328 }
00329 
00330 void StopCurlWatchers(AsyncIO *IO)
00331 {
00332        ev_timer_stop (event_base, &IO->rw_timeout);
00333        ev_timer_stop(event_base, &IO->conn_fail);
00334        ev_idle_stop(event_base, &IO->unwind_stack);
00335        ev_cleanup_stop(event_base, &IO->abort_by_shutdown);
00336 
00337        ev_io_stop(event_base, &IO->conn_event);
00338        ev_io_stop(event_base, &IO->send_event);
00339        ev_io_stop(event_base, &IO->recv_event);
00340 
00341        if (IO->SendBuf.fd != 0) {
00342               close(IO->SendBuf.fd);
00343        }
00344        IO->SendBuf.fd = 0;
00345        IO->RecvBuf.fd = 0;
00346 }
00347 
00348 void ShutDownCLient(AsyncIO *IO)
00349 {
00350        CitContext *Ctx =IO->CitContext;
00351        become_session(Ctx);
00352 
00353        EVM_syslog(LOG_DEBUG, "EVENT Terminating \n");
00354 
00355        StopClientWatchers(IO, 1);
00356 
00357        if (IO->DNS.Channel != NULL) {
00358               ares_destroy(IO->DNS.Channel);
00359               EV_DNS_LOG_STOP(DNS.recv_event);
00360               EV_DNS_LOG_STOP(DNS.send_event);
00361               ev_io_stop(event_base, &IO->DNS.recv_event);
00362               ev_io_stop(event_base, &IO->DNS.send_event);
00363               IO->DNS.Channel = NULL;
00364        }
00365        assert(IO->Terminate);
00366        IO->Terminate(IO);
00367 }
00368 
00369 void PostInbound(AsyncIO *IO)
00370 {
00371        switch (IO->NextState) {
00372        case eSendFile:
00373               ev_io_start(event_base, &IO->send_event);
00374               break;
00375        case eSendReply:
00376        case eSendMore:
00377               assert(IO->SendDone);
00378               IO->NextState = IO->SendDone(IO);
00379               ev_io_start(event_base, &IO->send_event);
00380               break;
00381        case eReadPayload:
00382        case eReadMore:
00383        case eReadFile:
00384               ev_io_start(event_base, &IO->recv_event);
00385               break;
00386        case eTerminateConnection:
00387               ShutDownCLient(IO);
00388               break;
00389        case eAbort:
00390               ShutDownCLient(IO);
00391               break;
00392        case eSendDNSQuery:
00393        case eReadDNSReply:
00394        case eDBQuery:
00395        case eConnect:
00396        case eReadMessage:
00397               break;
00398        }
00399 }
00400 eReadState HandleInbound(AsyncIO *IO)
00401 {
00402        const char *Err = NULL;
00403        eReadState Finished = eBufferNotEmpty;
00404 
00405        become_session(IO->CitContext);
00406 
00407        while ((Finished == eBufferNotEmpty) &&
00408               ((IO->NextState == eReadMessage)||
00409               (IO->NextState == eReadMore)||
00410               (IO->NextState == eReadFile)||
00411               (IO->NextState == eReadPayload)))
00412        {
00413               /* Reading lines...
00414                * lex line reply in callback,
00415                * or do it ourselves.
00416                * i.e. as nnn-blabla means continue reading in SMTP
00417                */
00418               if ((IO->NextState == eReadFile) &&
00419                   (Finished == eBufferNotEmpty))
00420               {
00421                      Finished = WriteIOBAlreadyRead(&IO->IOB, &Err);
00422                      if (Finished == eReadSuccess)
00423                      {
00424                             IO->NextState = eSendReply;
00425                      }
00426               }
00427               else if (IO->LineReader)
00428                      Finished = IO->LineReader(IO);
00429               else
00430                      Finished = StrBufChunkSipLine(IO->IOBuf,
00431                                                 &IO->RecvBuf);
00432 
00433               switch (Finished) {
00434               case eMustReadMore: 
00435                      break;
00436               case eBufferNotEmpty: /* shouldn't happen... */
00437               case eReadSuccess: 
00438                      break;
00439               case eReadFail: 
00440 
00441                      break;
00442               }
00443 
00444               if (Finished != eMustReadMore) {
00445                      assert(IO->ReadDone);
00446                      ev_io_stop(event_base, &IO->recv_event);
00447                      IO->NextState = IO->ReadDone(IO);
00448                      Finished = StrBufCheckBuffer(&IO->RecvBuf);
00449               }
00450        }
00451 
00452        PostInbound(IO);
00453 
00454        return Finished;
00455 }
00456 
00457 
00458 static void
00459 IO_send_callback(struct ev_loop *loop, ev_io *watcher, int revents)
00460 {
00461        int rc;
00462        AsyncIO *IO = watcher->data;
00463        const char *errmsg = NULL;
00464 
00465        IO->Now = ev_now(event_base);
00466        become_session(IO->CitContext);
00467 #ifdef BIGBAD_IODBG
00468        {
00469               int rv = 0;
00470               char fn [SIZ];
00471               FILE *fd;
00472               const char *pch = ChrPtr(IO->SendBuf.Buf);
00473               const char *pchh = IO->SendBuf.ReadWritePointer;
00474               long nbytes;
00475 
00476               if (pchh == NULL)
00477                      pchh = pch;
00478 
00479               nbytes = StrLength(IO->SendBuf.Buf) - (pchh - pch);
00480               snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
00481                       ((CitContext*)(IO->CitContext))->ServiceName,
00482                       IO->SendBuf.fd);
00483 
00484               fd = fopen(fn, "a+");
00485               fprintf(fd, "Send: BufSize: %ld BufContent: [",
00486                      nbytes);
00487               rv = fwrite(pchh, nbytes, 1, fd);
00488               if (!rv) printf("failed to write debug to %s!\n", fn);
00489               fprintf(fd, "]\n");
00490 #endif
00491               switch (IO->NextState) {
00492               case eSendFile:
00493                      rc = FileSendChunked(&IO->IOB, &errmsg);
00494                      if (rc < 0)
00495                             StrBufPlain(IO->ErrMsg, errmsg, -1);
00496                      break;
00497               default:
00498                      rc = StrBuf_write_one_chunk_callback(IO->SendBuf.fd,
00499                                                       0,
00500                                                       &IO->SendBuf);
00501               }
00502 
00503 #ifdef BIGBAD_IODBG
00504               fprintf(fd, "Sent: BufSize: %d bytes.\n", rc);
00505               fclose(fd);
00506        }
00507 #endif
00508        if (rc == 0)
00509        {
00510               ev_io_stop(event_base, &IO->send_event);
00511               switch (IO->NextState) {
00512               case eSendMore:
00513                      assert(IO->SendDone);
00514                      IO->NextState = IO->SendDone(IO);
00515 
00516                      if ((IO->NextState == eTerminateConnection) ||
00517                          (IO->NextState == eAbort) )
00518                             ShutDownCLient(IO);
00519                      else {
00520                             ev_io_start(event_base, &IO->send_event);
00521                      }
00522                      break;
00523               case eSendFile:
00524                      if (IO->IOB.ChunkSendRemain > 0) {
00525                             ev_io_start(event_base, &IO->recv_event);
00526                             SetNextTimeout(IO, 100.0);
00527 
00528                      } else {
00529                             assert(IO->ReadDone);
00530                             IO->NextState = IO->ReadDone(IO);
00531                             switch(IO->NextState) {
00532                             case eSendDNSQuery:
00533                             case eReadDNSReply:
00534                             case eDBQuery:
00535                             case eConnect:
00536                                    break;
00537                             case eSendReply:
00538                             case eSendMore:
00539                             case eSendFile:
00540                                    ev_io_start(event_base,
00541                                               &IO->send_event);
00542                                    break;
00543                             case eReadMessage:
00544                             case eReadMore:
00545                             case eReadPayload:
00546                             case eReadFile:
00547                                    break;
00548                             case eTerminateConnection:
00549                             case eAbort:
00550                                    break;
00551                             }
00552                      }
00553                      break;
00554               case eSendReply:
00555                   if (StrBufCheckBuffer(&IO->SendBuf) != eReadSuccess)
00556                      break;
00557                   IO->NextState = eReadMore;
00558               case eReadMore:
00559               case eReadMessage:
00560               case eReadPayload:
00561               case eReadFile:
00562                      if (StrBufCheckBuffer(&IO->RecvBuf) == eBufferNotEmpty)
00563                      {
00564                             HandleInbound(IO);
00565                      }
00566                      else {
00567                             ev_io_start(event_base, &IO->recv_event);
00568                      }
00569 
00570                      break;
00571               case eDBQuery:
00572                      /*
00573                       * we now live in another queue,
00574                       * so we have to unregister.
00575                       */
00576                      ev_cleanup_stop(loop, &IO->abort_by_shutdown);
00577                      break;
00578               case eSendDNSQuery:
00579               case eReadDNSReply:
00580               case eConnect:
00581               case eTerminateConnection:
00582               case eAbort:
00583                      break;
00584               }
00585        }
00586        else if (rc < 0) {
00587               if (errno != EAGAIN) {
00588                      StopClientWatchers(IO, 1);
00589                      EV_syslog(LOG_DEBUG,
00590                               "EVENT: Socket Invalid! [%d] [%s] [%d]\n",
00591                               errno, strerror(errno), IO->SendBuf.fd);
00592                      StrBufPrintf(IO->ErrMsg,
00593                                  "Socket Invalid! [%s]",
00594                                  strerror(errno));
00595                      SetNextTimeout(IO, 0.0);
00596               }
00597        }
00598        /* else : must write more. */
00599 }
00600 static void
00601 set_start_callback(struct ev_loop *loop, AsyncIO *IO, int revents)
00602 {
00603        ev_timer_stop(event_base, &IO->conn_fail);
00604        ev_timer_start(event_base, &IO->rw_timeout);
00605 
00606        switch(IO->NextState) {
00607        case eReadMore:
00608        case eReadMessage:
00609        case eReadFile:
00610               StrBufAppendBufPlain(IO->ErrMsg, HKEY("[while waiting for greeting]"), 0);
00611               ev_io_start(event_base, &IO->recv_event);
00612               break;
00613        case eSendReply:
00614        case eSendMore:
00615        case eReadPayload:
00616        case eSendFile:
00617               become_session(IO->CitContext);
00618               IO_send_callback(loop, &IO->send_event, revents);
00619               break;
00620        case eDBQuery:
00621        case eSendDNSQuery:
00622        case eReadDNSReply:
00623        case eConnect:
00624        case eTerminateConnection:
00625        case eAbort:
00627               break;
00628        }
00629 }
00630 
00631 static void
00632 IO_Timeout_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
00633 {
00634        AsyncIO *IO = watcher->data;
00635 
00636        IO->Now = ev_now(event_base);
00637        ev_timer_stop (event_base, &IO->rw_timeout);
00638        become_session(IO->CitContext);
00639 
00640        if (IO->SendBuf.fd != 0)
00641        {
00642               ev_io_stop(event_base, &IO->send_event);
00643               ev_io_stop(event_base, &IO->recv_event);
00644               ev_timer_stop (event_base, &IO->rw_timeout);
00645               close(IO->SendBuf.fd);
00646               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
00647        }
00648 
00649        assert(IO->Timeout);
00650        switch (IO->Timeout(IO))
00651        {
00652        case eAbort:
00653               ShutDownCLient(IO);
00654        default:
00655               break;
00656        }
00657 }
00658 
00659 static void
00660 IO_connfail_callback(struct ev_loop *loop, ev_timer *watcher, int revents)
00661 {
00662        AsyncIO *IO = watcher->data;
00663 
00664        IO->Now = ev_now(event_base);
00665        ev_timer_stop (event_base, &IO->conn_fail);
00666 
00667        if (IO->SendBuf.fd != 0)
00668        {
00669               ev_io_stop(loop, &IO->conn_event);
00670               ev_io_stop(event_base, &IO->send_event);
00671               ev_io_stop(event_base, &IO->recv_event);
00672               ev_timer_stop (event_base, &IO->rw_timeout);
00673               close(IO->SendBuf.fd);
00674               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
00675        }
00676        become_session(IO->CitContext);
00677 
00678        assert(IO->ConnFail);
00679        switch (IO->ConnFail(IO))
00680        {
00681        case eAbort:
00682               ShutDownCLient(IO);
00683        default:
00684               break;
00685 
00686        }
00687 }
00688 
00689 static void
00690 IO_connfailimmediate_callback(struct ev_loop *loop,
00691                            ev_idle *watcher,
00692                            int revents)
00693 {
00694        AsyncIO *IO = watcher->data;
00695 
00696        IO->Now = ev_now(event_base);
00697        ev_idle_stop (event_base, &IO->conn_fail_immediate);
00698 
00699        if (IO->SendBuf.fd != 0)
00700        {
00701               close(IO->SendBuf.fd);
00702               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
00703        }
00704        become_session(IO->CitContext);
00705 
00706        assert(IO->ConnFail);
00707        switch (IO->ConnFail(IO))
00708        {
00709        case eAbort:
00710               ShutDownCLient(IO);
00711        default:
00712               break;
00713 
00714        }
00715 }
00716 
00717 static void
00718 IO_connestd_callback(struct ev_loop *loop, ev_io *watcher, int revents)
00719 {
00720         AsyncIO *IO = watcher->data;
00721         int             so_err = 0;
00722         socklen_t       lon = sizeof(so_err);
00723         int             err;
00724 
00725         IO->Now = ev_now(event_base);
00726         EVM_syslog(LOG_DEBUG, "connect() succeeded.\n");
00727 
00728         ev_io_stop(loop, &IO->conn_event);
00729         ev_timer_stop(event_base, &IO->conn_fail);
00730 
00731         err = getsockopt(IO->SendBuf.fd,
00732                          SOL_SOCKET,
00733                          SO_ERROR,
00734                          (void*)&so_err,
00735                          &lon);
00736 
00737         if ((err == 0) && (so_err != 0))
00738         {
00739                 EV_syslog(LOG_DEBUG, "connect() failed [%d][%s]\n",
00740                           so_err,
00741                           strerror(so_err));
00742                 IO_connfail_callback(loop, &IO->conn_fail, revents);
00743 
00744         }
00745         else
00746         {
00747                 EVM_syslog(LOG_DEBUG, "connect() succeeded\n");
00748                 set_start_callback(loop, IO, revents);
00749         }
00750 }
00751 
00752 static void
00753 IO_recv_callback(struct ev_loop *loop, ev_io *watcher, int revents)
00754 {
00755        const char *errmsg;
00756        ssize_t nbytes;
00757        AsyncIO *IO = watcher->data;
00758 
00759        IO->Now = ev_now(event_base);
00760        switch (IO->NextState) {
00761        case eReadFile:
00762               nbytes = FileRecvChunked(&IO->IOB, &errmsg);
00763               if (nbytes < 0)
00764                      StrBufPlain(IO->ErrMsg, errmsg, -1);
00765               else
00766               {
00767                      if (IO->IOB.ChunkSendRemain == 0)
00768                      {
00769                             IO->NextState = eSendReply;
00770                             assert(IO->ReadDone);
00771                             ev_io_stop(event_base, &IO->recv_event);
00772                             PostInbound(IO);
00773                             return;
00774                      }
00775                      else
00776                             return;
00777               }
00778               break;
00779        default:
00780               nbytes = StrBuf_read_one_chunk_callback(IO->RecvBuf.fd,
00781                                                  0,
00782                                                  &IO->RecvBuf);
00783               break;
00784        }
00785 
00786 #ifdef BIGBAD_IODBG
00787        {
00788               long nbytes;
00789               int rv = 0;
00790               char fn [SIZ];
00791               FILE *fd;
00792               const char *pch = ChrPtr(IO->RecvBuf.Buf);
00793               const char *pchh = IO->RecvBuf.ReadWritePointer;
00794 
00795               if (pchh == NULL)
00796                      pchh = pch;
00797 
00798               nbytes = StrLength(IO->RecvBuf.Buf) - (pchh - pch);
00799               snprintf(fn, SIZ, "/tmp/foolog_ev_%s.%d",
00800                       ((CitContext*)(IO->CitContext))->ServiceName,
00801                       IO->SendBuf.fd);
00802 
00803               fd = fopen(fn, "a+");
00804               fprintf(fd, "Read: BufSize: %ld BufContent: [",
00805                      nbytes);
00806               rv = fwrite(pchh, nbytes, 1, fd);
00807               if (!rv) printf("failed to write debug to %s!\n", fn);
00808               fprintf(fd, "]\n");
00809               fclose(fd);
00810        }
00811 #endif
00812        if (nbytes > 0) {
00813               HandleInbound(IO);
00814        } else if (nbytes == 0) {
00815               SetNextTimeout(IO, 0.0);
00816               return;
00817        } else if (nbytes == -1) {
00818               if (errno != EAGAIN) {
00819                      // FD is gone. kick it. 
00820                      StopClientWatchers(IO, 1);
00821                      EV_syslog(LOG_DEBUG,
00822                               "EVENT: Socket Invalid! [%d] [%s] [%d]\n",
00823                               errno, strerror(errno), IO->SendBuf.fd);
00824                      StrBufPrintf(IO->ErrMsg,
00825                                  "Socket Invalid! [%s]",
00826                                  strerror(errno));
00827                      SetNextTimeout(IO, 0.0);
00828               }
00829               return;
00830        }
00831 }
00832 
00833 void
00834 IO_postdns_callback(struct ev_loop *loop, ev_idle *watcher, int revents)
00835 {
00836        AsyncIO *IO = watcher->data;
00837        IO->Now = ev_now(event_base);
00838        EV_syslog(LOG_DEBUG, "event: %s\n", __FUNCTION__);
00839        become_session(IO->CitContext);
00840        assert(IO->DNS.Query->PostDNS);
00841        switch (IO->DNS.Query->PostDNS(IO))
00842        {
00843        case eAbort:
00844               assert(IO->DNS.Fail);
00845               switch (IO->DNS.Fail(IO)) {
00846               case eAbort:
00848                      ShutDownCLient(IO);
00849               default:
00850                      break;
00851               }
00852        default:
00853               break;
00854        }
00855 }
00856 
00857 
00858 eNextState EvConnectSock(AsyncIO *IO,
00859                       double conn_timeout,
00860                       double first_rw_timeout,
00861                       int ReadFirst)
00862 {
00863        struct sockaddr_in egress_sin;
00864        int fdflags;
00865        int rc = -1;
00866 
00867        become_session(IO->CitContext);
00868 
00869        if (ReadFirst) {
00870               IO->NextState = eReadMessage;
00871        }
00872        else {
00873               IO->NextState = eSendReply;
00874        }
00875 
00876        IO->SendBuf.fd = IO->RecvBuf.fd =
00877               socket(
00878                      (IO->ConnectMe->IPv6)?PF_INET6:PF_INET,
00879                      SOCK_STREAM,
00880                      IPPROTO_TCP);
00881 
00882        if (IO->SendBuf.fd < 0) {
00883               EV_syslog(LOG_ERR,
00884                        "EVENT: socket() failed: %s\n",
00885                        strerror(errno));
00886 
00887               StrBufPrintf(IO->ErrMsg,
00888                           "Failed to create socket: %s",
00889                           strerror(errno));
00890               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
00891               return eAbort;
00892        }
00893        fdflags = fcntl(IO->SendBuf.fd, F_GETFL);
00894        if (fdflags < 0) {
00895               EV_syslog(LOG_ERR,
00896                        "EVENT: unable to get socket flags! %s \n",
00897                        strerror(errno));
00898               StrBufPrintf(IO->ErrMsg,
00899                           "Failed to get socket flags: %s",
00900                           strerror(errno));
00901               close(IO->SendBuf.fd);
00902               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
00903               return eAbort;
00904        }
00905        fdflags = fdflags | O_NONBLOCK;
00906        if (fcntl(IO->SendBuf.fd, F_SETFL, fdflags) < 0) {
00907               EV_syslog(
00908                      LOG_ERR,
00909                      "EVENT: unable to set socket nonblocking flags! %s \n",
00910                      strerror(errno));
00911               StrBufPrintf(IO->ErrMsg,
00912                           "Failed to set socket flags: %s",
00913                           strerror(errno));
00914               close(IO->SendBuf.fd);
00915               IO->SendBuf.fd = IO->RecvBuf.fd = 0;
00916               return eAbort;
00917        }
00918 /* TODO: maye we could use offsetof() to calc the position of data...
00919  * http://doc.dvgu.ru/devel/ev.html#associating_custom_data_with_a_watcher
00920  */
00921        ev_io_init(&IO->recv_event, IO_recv_callback, IO->RecvBuf.fd, EV_READ);
00922        IO->recv_event.data = IO;
00923        ev_io_init(&IO->send_event, IO_send_callback, IO->SendBuf.fd, EV_WRITE);
00924        IO->send_event.data = IO;
00925 
00926        ev_timer_init(&IO->conn_fail, IO_connfail_callback, conn_timeout, 0);
00927        IO->conn_fail.data = IO;
00928        ev_timer_init(&IO->rw_timeout, IO_Timeout_callback, first_rw_timeout,0);
00929        IO->rw_timeout.data = IO;
00930 
00931 
00932 
00933 
00934        /* for debugging you may bypass it like this:
00935         * IO->Addr.sin_addr.s_addr = inet_addr("127.0.0.1");
00936         * ((struct sockaddr_in)IO->ConnectMe->Addr).sin_addr.s_addr =
00937         *   inet_addr("127.0.0.1");
00938         */
00939        if (IO->ConnectMe->IPv6) {
00940               rc = connect(IO->SendBuf.fd,
00941                           &IO->ConnectMe->Addr,
00942                           sizeof(struct sockaddr_in6));
00943        }
00944        else {
00945               /* If citserver is bound to a specific IP address on the host, make
00946                * sure we use that address for outbound connections.
00947                */
00948        
00949               memset(&egress_sin, 0, sizeof(egress_sin));
00950               egress_sin.sin_family = AF_INET;
00951               if (!IsEmptyStr(config.c_ip_addr)) {
00952                      egress_sin.sin_addr.s_addr = inet_addr(config.c_ip_addr);
00953                      if (egress_sin.sin_addr.s_addr == !INADDR_ANY) {
00954                             egress_sin.sin_addr.s_addr = INADDR_ANY;
00955                      }
00956 
00957                      /* If this bind fails, no problem; we can still use INADDR_ANY */
00958                      bind(IO->SendBuf.fd, (struct sockaddr *)&egress_sin, sizeof(egress_sin));
00959               }
00960               rc = connect(IO->SendBuf.fd,
00961                           (struct sockaddr_in *)&IO->ConnectMe->Addr,
00962                           sizeof(struct sockaddr_in));
00963        }
00964 
00965        if (rc >= 0){
00966               EVM_syslog(LOG_DEBUG, "connect() immediate success.\n");
00967               set_start_callback(event_base, IO, 0);
00968               return IO->NextState;
00969        }
00970        else if (errno == EINPROGRESS) {
00971               EVM_syslog(LOG_DEBUG, "connect() have to wait now.\n");
00972 
00973               ev_io_init(&IO->conn_event,
00974                         IO_connestd_callback,
00975                         IO->SendBuf.fd,
00976                         EV_READ|EV_WRITE);
00977 
00978               IO->conn_event.data = IO;
00979 
00980               ev_io_start(event_base, &IO->conn_event);
00981               ev_timer_start(event_base, &IO->conn_fail);
00982               return IO->NextState;
00983        }
00984        else {
00985               ev_idle_init(&IO->conn_fail_immediate,
00986                           IO_connfailimmediate_callback);
00987               IO->conn_fail_immediate.data = IO;
00988               ev_idle_start(event_base, &IO->conn_fail_immediate);
00989 
00990               EV_syslog(LOG_ERR, "connect() failed: %s\n", strerror(errno));
00991               StrBufPrintf(IO->ErrMsg,
00992                           "Failed to connect: %s",
00993                           strerror(errno));
00994               return IO->NextState;
00995        }
00996        return IO->NextState;
00997 }
00998 
00999 void SetNextTimeout(AsyncIO *IO, double timeout)
01000 {
01001        IO->rw_timeout.repeat = timeout;
01002        ev_timer_again (event_base,  &IO->rw_timeout);
01003 }
01004 
01005 
01006 eNextState ReAttachIO(AsyncIO *IO,
01007                     void *pData,
01008                     int ReadFirst)
01009 {
01010        IO->Data = pData;
01011        become_session(IO->CitContext);
01012        ev_cleanup_start(event_base, &IO->abort_by_shutdown);
01013        if (ReadFirst) {
01014               IO->NextState = eReadMessage;
01015        }
01016        else {
01017               IO->NextState = eSendReply;
01018        }
01019        set_start_callback(event_base, IO, 0);
01020 
01021        return IO->NextState;
01022 }
01023 
01024 void InitIOStruct(AsyncIO *IO,
01025                 void *Data,
01026                 eNextState NextState,
01027                 IO_LineReaderCallback LineReader,
01028                 IO_CallBack DNS_Fail,
01029                 IO_CallBack SendDone,
01030                 IO_CallBack ReadDone,
01031                 IO_CallBack Terminate,
01032                 IO_CallBack DBTerminate,
01033                 IO_CallBack ConnFail,
01034                 IO_CallBack Timeout,
01035                 IO_CallBack ShutdownAbort)
01036 {
01037        IO->Data          = Data;
01038 
01039        IO->CitContext    = CloneContext(CC);
01040        ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
01041 
01042        IO->NextState     = NextState;
01043 
01044        IO->SendDone      = SendDone;
01045        IO->ReadDone      = ReadDone;
01046        IO->Terminate     = Terminate;
01047        IO->DBTerminate   = DBTerminate;
01048        IO->LineReader    = LineReader;
01049        IO->ConnFail      = ConnFail;
01050        IO->Timeout       = Timeout;
01051        IO->ShutdownAbort = ShutdownAbort;
01052 
01053        IO->DNS.Fail      = DNS_Fail;
01054 
01055        IO->SendBuf.Buf   = NewStrBufPlain(NULL, 1024);
01056        IO->RecvBuf.Buf   = NewStrBufPlain(NULL, 1024);
01057        IO->IOBuf         = NewStrBuf();
01058        EV_syslog(LOG_DEBUG,
01059                 "EVENT: Session lives at %p IO at %p \n",
01060                 Data, IO);
01061 
01062 }
01063 
01064 extern int evcurl_init(AsyncIO *IO);
01065 
01066 int InitcURLIOStruct(AsyncIO *IO,
01067                    void *Data,
01068                    const char* Desc,
01069                    IO_CallBack SendDone,
01070                    IO_CallBack Terminate,
01071                    IO_CallBack DBTerminate,
01072                    IO_CallBack ShutdownAbort)
01073 {
01074        IO->Data          = Data;
01075 
01076        IO->CitContext    = CloneContext(CC);
01077        ((CitContext *)IO->CitContext)->session_specific_data = (char*) Data;
01078 
01079        IO->SendDone      = SendDone;
01080        IO->Terminate     = Terminate;
01081        IO->DBTerminate   = DBTerminate;
01082        IO->ShutdownAbort = ShutdownAbort;
01083 
01084        strcpy(IO->HttpReq.errdesc, Desc);
01085 
01086 
01087        return  evcurl_init(IO);
01088 
01089 }
01090 
01091 extern int DebugEventLoopBacktrace;
01092 void EV_backtrace(AsyncIO *IO)
01093 {
01094 #ifdef HAVE_BACKTRACE
01095        void *stack_frames[50];
01096        size_t size, i;
01097        char **strings;
01098 
01099        if ((IO == NULL) || (DebugEventLoopBacktrace == 0))
01100               return;
01101        size = backtrace(stack_frames, sizeof(stack_frames) / sizeof(void*));
01102        strings = backtrace_symbols(stack_frames, size);
01103        for (i = 0; i < size; i++) {
01104               if (strings != NULL) {
01105                      EV_syslog(LOG_ALERT, " BT %s\n", strings[i]);
01106               }
01107               else {
01108                      EV_syslog(LOG_ALERT, " BT %p\n", stack_frames[i]);
01109               }
01110        }
01111        free(strings);
01112 #endif
01113 }
01114 
01115 
01116 ev_tstamp ctdl_ev_now (void)
01117 {
01118        return ev_now(event_base);
01119 }