Back to index

citadel  8.12
serv_netspool.c
Go to the documentation of this file.
00001 /*
00002  * This module handles shared rooms, inter-Citadel mail, and outbound
00003  * mailing list processing.
00004  *
00005  * Copyright (c) 2000-2012 by the citadel.org team
00006  *
00007  *  This program is open source software; you can redistribute it and/or modify
00008  *  it under the terms of the GNU General Public License as published by
00009  *  the Free Software Foundation; either version 3 of the License, or
00010  *  (at your option) any later version.
00011  *
00012  *  This program is distributed in the hope that it will be useful,
00013  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  *  GNU General Public License for more details.
00016  *
00017  *  You should have received a copy of the GNU General Public License
00018  *  along with this program; if not, write to the Free Software
00019  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  *
00021  * ** NOTE **   A word on the S_NETCONFIGS semaphore:
00022  * This is a fairly high-level type of critical section.  It ensures that no
00023  * two threads work on the netconfigs files at the same time.  Since we do
00024  * so many things inside these, here are the rules:
00025  *  1. begin_critical_section(S_NETCONFIGS) *before* begin_ any others.
00026  *  2. Do *not* perform any I/O with the client during these sections.
00027  *
00028  */
00029 
00030 /*
00031  * Duration of time (in seconds) after which pending list subscribe/unsubscribe
00032  * requests that have not been confirmed will be deleted.
00033  */
00034 #define EXP   259200 /* three days */
00035 
00036 #include "sysdep.h"
00037 #include <stdlib.h>
00038 #include <unistd.h>
00039 #include <stdio.h>
00040 #include <fcntl.h>
00041 #include <ctype.h>
00042 #include <signal.h>
00043 #include <pwd.h>
00044 #include <errno.h>
00045 #include <sys/stat.h>
00046 #include <sys/types.h>
00047 #include <dirent.h>
00048 #if TIME_WITH_SYS_TIME
00049 # include <sys/time.h>
00050 # include <time.h>
00051 #else
00052 # if HAVE_SYS_TIME_H
00053 #  include <sys/time.h>
00054 # else
00055 #  include <time.h>
00056 # endif
00057 #endif
00058 #ifdef HAVE_SYSCALL_H
00059 # include <syscall.h>
00060 #else 
00061 # if HAVE_SYS_SYSCALL_H
00062 #  include <sys/syscall.h>
00063 # endif
00064 #endif
00065 
00066 #include <sys/wait.h>
00067 #include <string.h>
00068 #include <limits.h>
00069 #include <libcitadel.h>
00070 #include "citadel.h"
00071 #include "server.h"
00072 #include "citserver.h"
00073 #include "support.h"
00074 #include "config.h"
00075 #include "user_ops.h"
00076 #include "database.h"
00077 #include "msgbase.h"
00078 #include "internet_addressing.h"
00079 #include "serv_network.h"
00080 #include "clientsocket.h"
00081 #include "file_ops.h"
00082 #include "citadel_dirs.h"
00083 #include "threads.h"
00084 
00085 #ifndef HAVE_SNPRINTF
00086 #include "snprintf.h"
00087 #endif
00088 
00089 #include "context.h"
00090 #include "netconfig.h"
00091 #include "netspool.h"
00092 #include "netmail.h"
00093 #include "ctdl_module.h"
00094 
00095 
00096        
00097 
00098 int read_spoolcontrol_file(SpoolControl **scc, char *filename)
00099 {
00100        FILE *fp;
00101        char instr[SIZ];
00102        char buf[SIZ];
00103        char nodename[256];
00104        char roomname[ROOMNAMELEN];
00105        size_t miscsize = 0;
00106        size_t linesize = 0;
00107        int skipthisline = 0;
00108        namelist *nptr = NULL;
00109        maplist *mptr = NULL;
00110        SpoolControl *sc;
00111 
00112        fp = fopen(filename, "r");
00113        if (fp == NULL) {
00114               return 0;
00115        }
00116        sc = malloc(sizeof(SpoolControl));
00117        memset(sc, 0, sizeof(SpoolControl));
00118        *scc = sc;
00119 
00120        while (fgets(buf, sizeof buf, fp) != NULL) {
00121               buf[strlen(buf)-1] = 0;
00122 
00123               extract_token(instr, buf, 0, '|', sizeof instr);
00124               if (!strcasecmp(instr, strof(lastsent))) {
00125                      sc->lastsent = extract_long(buf, 1);
00126               }
00127               else if (!strcasecmp(instr, strof(listrecp))) {
00128                      nptr = (namelist *)
00129                             malloc(sizeof(namelist));
00130                      nptr->next = sc->listrecps;
00131                      extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
00132                      sc->listrecps = nptr;
00133               }
00134               else if (!strcasecmp(instr, strof(participate))) {
00135                      nptr = (namelist *)
00136                             malloc(sizeof(namelist));
00137                      nptr->next = sc->participates;
00138                      extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
00139                      sc->participates = nptr;
00140               }
00141               else if (!strcasecmp(instr, strof(digestrecp))) {
00142                      nptr = (namelist *)
00143                             malloc(sizeof(namelist));
00144                      nptr->next = sc->digestrecps;
00145                      extract_token(nptr->name, buf, 1, '|', sizeof nptr->name);
00146                      sc->digestrecps = nptr;
00147               }
00148               else if (!strcasecmp(instr, strof(ignet_push_share))) {
00149                      extract_token(nodename, buf, 1, '|', sizeof nodename);
00150                      extract_token(roomname, buf, 2, '|', sizeof roomname);
00151                      mptr = (maplist *) malloc(sizeof(maplist));
00152                      mptr->next = sc->ignet_push_shares;
00153                      strcpy(mptr->remote_nodename, nodename);
00154                      strcpy(mptr->remote_roomname, roomname);
00155                      sc->ignet_push_shares = mptr;
00156               }
00157               else {
00158                      /* Preserve 'other' lines ... *unless* they happen to
00159                       * be subscribe/unsubscribe pendings with expired
00160                       * timestamps.
00161                       */
00162                      skipthisline = 0;
00163                      if (!strncasecmp(buf, strof(subpending)"|", 11)) {
00164                             if (time(NULL) - extract_long(buf, 4) > EXP) {
00165                                    skipthisline = 1;
00166                             }
00167                      }
00168                      if (!strncasecmp(buf, strof(unsubpending)"|", 13)) {
00169                             if (time(NULL) - extract_long(buf, 3) > EXP) {
00170                                    skipthisline = 1;
00171                             }
00172                      }
00173 
00174                      if (skipthisline == 0) {
00175                             linesize = strlen(buf);
00176                             sc->misc = realloc(sc->misc,
00177                                    (miscsize + linesize + 2) );
00178                             sprintf(&sc->misc[miscsize], "%s\n", buf);
00179                             miscsize = miscsize + linesize + 1;
00180                      }
00181               }
00182 
00183 
00184        }
00185        fclose(fp);
00186        return 1;
00187 }
00188 
00189 void free_spoolcontrol_struct(SpoolControl **scc)
00190 {
00191        SpoolControl *sc;
00192        namelist *nptr = NULL;
00193        maplist *mptr = NULL;
00194 
00195        sc = *scc;
00196        while (sc->listrecps != NULL) {
00197               nptr = sc->listrecps->next;
00198               free(sc->listrecps);
00199               sc->listrecps = nptr;
00200        }
00201        /* Do the same for digestrecps */
00202        while (sc->digestrecps != NULL) {
00203               nptr = sc->digestrecps->next;
00204               free(sc->digestrecps);
00205               sc->digestrecps = nptr;
00206        }
00207        /* Do the same for participates */
00208        while (sc->participates != NULL) {
00209               nptr = sc->participates->next;
00210               free(sc->participates);
00211               sc->participates = nptr;
00212        }
00213        while (sc->ignet_push_shares != NULL) {
00214               mptr = sc->ignet_push_shares->next;
00215               free(sc->ignet_push_shares);
00216               sc->ignet_push_shares = mptr;
00217        }
00218        free(sc->misc);
00219        free(sc);
00220        *scc=NULL;
00221 }
00222 
00223 int writenfree_spoolcontrol_file(SpoolControl **scc, char *filename)
00224 {
00225        char tempfilename[PATH_MAX];
00226        int TmpFD;
00227        SpoolControl *sc;
00228        namelist *nptr = NULL;
00229        maplist *mptr = NULL;
00230        long len;
00231        time_t unixtime;
00232        struct timeval tv;
00233        long reltid; /* if we don't have SYS_gettid, use "random" value */
00234        StrBuf *Cfg;
00235        int rc;
00236 
00237        len = strlen(filename);
00238        memcpy(tempfilename, filename, len + 1);
00239 
00240 
00241 #if defined(HAVE_SYSCALL_H) && defined (SYS_gettid)
00242        reltid = syscall(SYS_gettid);
00243 #endif
00244        gettimeofday(&tv, NULL);
00245        /* Promote to time_t; types differ on some OSes (like darwin) */
00246        unixtime = tv.tv_sec;
00247 
00248        sprintf(tempfilename + len, ".%ld-%ld", reltid, unixtime);
00249        sc = *scc;
00250        errno = 0;
00251        TmpFD = open(tempfilename, O_CREAT|O_EXCL|O_RDWR, S_IRUSR|S_IWUSR);
00252        Cfg = NewStrBuf();
00253        if ((TmpFD < 0) || (errno != 0)) {
00254               syslog(LOG_CRIT, "ERROR: cannot open %s: %s\n",
00255                      filename, strerror(errno));
00256               free_spoolcontrol_struct(scc);
00257               unlink(tempfilename);
00258        }
00259        else {
00260               fchown(TmpFD, config.c_ctdluid, 0);
00261               StrBufAppendPrintf(Cfg, "lastsent|%ld\n", sc->lastsent);
00262               
00263               /* Write out the listrecps while freeing from memory at the
00264                * same time.  Am I clever or what?  :)
00265                */
00266               while (sc->listrecps != NULL) {
00267                   StrBufAppendPrintf(Cfg, "listrecp|%s\n", sc->listrecps->name);
00268                      nptr = sc->listrecps->next;
00269                      free(sc->listrecps);
00270                      sc->listrecps = nptr;
00271               }
00272               /* Do the same for digestrecps */
00273               while (sc->digestrecps != NULL) {
00274                      StrBufAppendPrintf(Cfg, "digestrecp|%s\n", sc->digestrecps->name);
00275                      nptr = sc->digestrecps->next;
00276                      free(sc->digestrecps);
00277                      sc->digestrecps = nptr;
00278               }
00279               /* Do the same for participates */
00280               while (sc->participates != NULL) {
00281                      StrBufAppendPrintf(Cfg, "participate|%s\n", sc->participates->name);
00282                      nptr = sc->participates->next;
00283                      free(sc->participates);
00284                      sc->participates = nptr;
00285               }
00286               while (sc->ignet_push_shares != NULL) {
00287                      StrBufAppendPrintf(Cfg, "ignet_push_share|%s", sc->ignet_push_shares->remote_nodename);
00288                      if (!IsEmptyStr(sc->ignet_push_shares->remote_roomname)) {
00289                             StrBufAppendPrintf(Cfg, "|%s", sc->ignet_push_shares->remote_roomname);
00290                      }
00291                      StrBufAppendPrintf(Cfg, "\n");
00292                      mptr = sc->ignet_push_shares->next;
00293                      free(sc->ignet_push_shares);
00294                      sc->ignet_push_shares = mptr;
00295               }
00296               if (sc->misc != NULL) {
00297                      StrBufAppendBufPlain(Cfg, sc->misc, -1, 0);
00298               }
00299               free(sc->misc);
00300 
00301               rc = write(TmpFD, ChrPtr(Cfg), StrLength(Cfg));
00302               if ((rc >=0 ) && (rc == StrLength(Cfg))) 
00303               {
00304                      close(TmpFD);
00305                      rename(tempfilename, filename);
00306               }
00307               else {
00308                      syslog(LOG_EMERG, 
00309                                   "unable to write %s; [%s]; not enough space on the disk?\n", 
00310                                   tempfilename, 
00311                                   strerror(errno));
00312                      close(TmpFD);
00313                      unlink(tempfilename);
00314               }
00315               FreeStrBuf(&Cfg);
00316               free(sc);
00317               *scc=NULL;
00318        }
00319        return 1;
00320 }
00321 int is_recipient(SpoolControl *sc, const char *Name)
00322 {
00323        namelist *nptr;
00324        size_t len;
00325 
00326        len = strlen(Name);
00327        nptr = sc->listrecps;
00328        while (nptr != NULL) {
00329               if (strncmp(Name, nptr->name, len)==0)
00330                      return 1;
00331               nptr = nptr->next;
00332        }
00333        /* Do the same for digestrecps */
00334        nptr = sc->digestrecps;
00335        while (nptr != NULL) {
00336               if (strncmp(Name, nptr->name, len)==0)
00337                      return 1;
00338               nptr = nptr->next;
00339        }
00340        /* Do the same for participates */
00341        nptr = sc->participates;
00342        while (nptr != NULL) {
00343               if (strncmp(Name, nptr->name, len)==0)
00344                      return 1;
00345               nptr = nptr->next;
00346        }
00347        return 0;
00348 }
00349 
00350 
00351 /*
00352  * Batch up and send all outbound traffic from the current room
00353  */
00354 void network_spoolout_room(RoomProcList *room_to_spool,                      
00355                         HashList *working_ignetcfg,
00356                         HashList *the_netmap)
00357 {
00358        char buf[SIZ];
00359        char filename[PATH_MAX];
00360        SpoolControl *sc;
00361        int i;
00362 
00363        /*
00364         * If the room doesn't exist, don't try to perform its networking tasks.
00365         * Normally this should never happen, but once in a while maybe a room gets
00366         * queued for networking and then deleted before it can happen.
00367         */
00368        if (CtdlGetRoom(&CC->room, room_to_spool->name) != 0) {
00369               syslog(LOG_CRIT, "ERROR: cannot load <%s>\n", room_to_spool->name);
00370               return;
00371        }
00372 
00373        assoc_file_name(filename, sizeof filename, &CC->room, ctdl_netcfg_dir);
00374        begin_critical_section(S_NETCONFIGS);
00375 
00376        /* Only do net processing for rooms that have netconfigs */
00377        if (!read_spoolcontrol_file(&sc, filename))
00378        {
00379               end_critical_section(S_NETCONFIGS);
00380               return;
00381        }
00382        syslog(LOG_INFO, "Networking started for <%s>\n", CC->room.QRname);
00383 
00384        sc->working_ignetcfg = working_ignetcfg;
00385        sc->the_netmap = the_netmap;
00386 
00387        /* If there are digest recipients, we have to build a digest */
00388        if (sc->digestrecps != NULL) {
00389               sc->digestfp = tmpfile();
00390               fprintf(sc->digestfp, "Content-type: text/plain\n\n");
00391        }
00392 
00393        /* Do something useful */
00394        CtdlForEachMessage(MSGS_GT, sc->lastsent, NULL, NULL, NULL,
00395               network_spool_msg, sc);
00396 
00397        /* If we wrote a digest, deliver it and then close it */
00398        snprintf(buf, sizeof buf, "room_%s@%s",
00399               CC->room.QRname, config.c_fqdn);
00400        for (i=0; buf[i]; ++i) {
00401               buf[i] = tolower(buf[i]);
00402               if (isspace(buf[i])) buf[i] = '_';
00403        }
00404        if (sc->digestfp != NULL) {
00405               fprintf(sc->digestfp,       " -----------------------------------"
00406                                    "------------------------------------"
00407                                    "-------\n"
00408                                    "You are subscribed to the '%s' "
00409                                    "list.\n"
00410                                    "To post to the list: %s\n",
00411                                    CC->room.QRname, buf
00412               );
00413               network_deliver_digest(sc); /* deliver and close */
00414        }
00415 
00416        /* Now rewrite the config file */
00417        writenfree_spoolcontrol_file(&sc, filename);
00418        end_critical_section(S_NETCONFIGS);
00419 }
00420 
00421 /*
00422  * Process a buffer containing a single message from a single file
00423  * from the inbound queue 
00424  */
00425 void network_process_buffer(char *buffer, long size, HashList *working_ignetcfg, HashList *the_netmap, int *netmap_changed)
00426 {
00427        struct CitContext *CCC = CC;
00428        StrBuf *Buf = NULL;
00429        struct CtdlMessage *msg = NULL;
00430        long pos;
00431        int field;
00432        struct recptypes *recp = NULL;
00433        char target_room[ROOMNAMELEN];
00434        struct ser_ret sermsg;
00435        char *oldpath = NULL;
00436        char filename[PATH_MAX];
00437        FILE *fp;
00438        const StrBuf *nexthop = NULL;
00439        unsigned char firstbyte;
00440        unsigned char lastbyte;
00441 
00442        QN_syslog(LOG_DEBUG, "network_process_buffer() processing %ld bytes\n", size);
00443 
00444        /* Validate just a little bit.  First byte should be FF and * last byte should be 00. */
00445        firstbyte = buffer[0];
00446        lastbyte = buffer[size-1];
00447        if ( (firstbyte != 255) || (lastbyte != 0) ) {
00448               QN_syslog(LOG_ERR, "Corrupt message ignored.  Length=%ld, firstbyte = %d, lastbyte = %d\n",
00449                        size, firstbyte, lastbyte);
00450               return;
00451        }
00452 
00453        /* Set default target room to trash */
00454        strcpy(target_room, TWITROOM);
00455 
00456        /* Load the message into memory */
00457        msg = (struct CtdlMessage *) malloc(sizeof(struct CtdlMessage));
00458        memset(msg, 0, sizeof(struct CtdlMessage));
00459        msg->cm_magic = CTDLMESSAGE_MAGIC;
00460        msg->cm_anon_type = buffer[1];
00461        msg->cm_format_type = buffer[2];
00462 
00463        for (pos = 3; pos < size; ++pos) {
00464               field = buffer[pos];
00465               msg->cm_fields[field] = strdup(&buffer[pos+1]);
00466               pos = pos + strlen(&buffer[(int)pos]);
00467        }
00468 
00469        /* Check for message routing */
00470        if (msg->cm_fields['D'] != NULL) {
00471               if (strcasecmp(msg->cm_fields['D'], config.c_nodename)) {
00472 
00473                      /* route the message */
00474                      Buf = NewStrBufPlain(msg->cm_fields['D'], -1);
00475                      if (is_valid_node(&nexthop, 
00476                                      NULL, 
00477                                      Buf, 
00478                                      working_ignetcfg, 
00479                                      the_netmap) == 0) 
00480                      {
00481                             /* prepend our node to the path */
00482                             if (msg->cm_fields['P'] != NULL) {
00483                                    oldpath = msg->cm_fields['P'];
00484                                    msg->cm_fields['P'] = NULL;
00485                             }
00486                             else {
00487                                    oldpath = strdup("unknown_user");
00488                             }
00489                             size = strlen(oldpath) + SIZ;
00490                             msg->cm_fields['P'] = malloc(size);
00491                             snprintf(msg->cm_fields['P'], size, "%s!%s",
00492                                    config.c_nodename, oldpath);
00493                             free(oldpath);
00494 
00495                             /* serialize the message */
00496                             serialize_message(&sermsg, msg);
00497 
00498                             /* now send it */
00499                             if (StrLength(nexthop) == 0) {
00500                                    nexthop = Buf;
00501                             }
00502                             snprintf(filename,
00503                                     sizeof filename,
00504                                     "%s/%s@%lx%x",
00505                                     ctdl_netout_dir,
00506                                     ChrPtr(nexthop),
00507                                     time(NULL),
00508                                     rand()
00509                             );
00510                             QN_syslog(LOG_DEBUG, "Appending to %s\n", filename);
00511                             fp = fopen(filename, "ab");
00512                             if (fp != NULL) {
00513                                    fwrite(sermsg.ser, sermsg.len, 1, fp);
00514                                    fclose(fp);
00515                             }
00516                             else {
00517                                    QN_syslog(LOG_ERR, "%s: %s\n", filename, strerror(errno));
00518                             }
00519                             free(sermsg.ser);
00520                             CtdlFreeMessage(msg);
00521                             FreeStrBuf(&Buf);
00522                             return;
00523                      }
00524                      
00525                      else { /* invalid destination node name */
00526                             FreeStrBuf(&Buf);
00527 
00528                             network_bounce(msg,
00529 "A message you sent could not be delivered due to an invalid destination node"
00530 " name.  Please check the address and try sending the message again.\n");
00531                             msg = NULL;
00532                             return;
00533 
00534                      }
00535               }
00536        }
00537 
00538        /*
00539         * Check to see if we already have a copy of this message, and
00540         * abort its processing if so.  (We used to post a warning to Aide>
00541         * every time this happened, but the network is now so densely
00542         * connected that it's inevitable.)
00543         */
00544        if (network_usetable(msg) != 0) {
00545               CtdlFreeMessage(msg);
00546               return;
00547        }
00548 
00549        /* Learn network topology from the path */
00550        if ((msg->cm_fields['N'] != NULL) && (msg->cm_fields['P'] != NULL)) {
00551               network_learn_topology(msg->cm_fields['N'], 
00552                                    msg->cm_fields['P'], 
00553                                    the_netmap, 
00554                                    netmap_changed);
00555        }
00556 
00557        /* Is the sending node giving us a very persuasive suggestion about
00558         * which room this message should be saved in?  If so, go with that.
00559         */
00560        if (msg->cm_fields['C'] != NULL) {
00561               safestrncpy(target_room, msg->cm_fields['C'], sizeof target_room);
00562        }
00563 
00564        /* Otherwise, does it have a recipient?  If so, validate it... */
00565        else if (msg->cm_fields['R'] != NULL) {
00566               recp = validate_recipients(msg->cm_fields['R'], NULL, 0);
00567               if (recp != NULL) if (recp->num_error != 0) {
00568                      network_bounce(msg,
00569                             "A message you sent could not be delivered due to an invalid address.\n"
00570                             "Please check the address and try sending the message again.\n");
00571                      msg = NULL;
00572                      free_recipients(recp);
00573                      QNM_syslog(LOG_DEBUG, "Bouncing message due to invalid recipient address.\n");
00574                      return;
00575               }
00576               strcpy(target_room, "");    /* no target room if mail */
00577        }
00578 
00579        /* Our last shot at finding a home for this message is to see if
00580         * it has the O field (Originating room) set.
00581         */
00582        else if (msg->cm_fields['O'] != NULL) {
00583               safestrncpy(target_room, msg->cm_fields['O'], sizeof target_room);
00584        }
00585 
00586        /* Strip out fields that are only relevant during transit */
00587        if (msg->cm_fields['D'] != NULL) {
00588               free(msg->cm_fields['D']);
00589               msg->cm_fields['D'] = NULL;
00590        }
00591        if (msg->cm_fields['C'] != NULL) {
00592               free(msg->cm_fields['C']);
00593               msg->cm_fields['C'] = NULL;
00594        }
00595 
00596        /* save the message into a room */
00597        if (PerformNetprocHooks(msg, target_room) == 0) {
00598               msg->cm_flags = CM_SKIP_HOOKS;
00599               CtdlSubmitMsg(msg, recp, target_room, 0);
00600        }
00601        CtdlFreeMessage(msg);
00602        free_recipients(recp);
00603 }
00604 
00605 
00606 /*
00607  * Process a single message from a single file from the inbound queue 
00608  */
00609 void network_process_message(FILE *fp, 
00610                           long msgstart, 
00611                           long msgend,
00612                           HashList *working_ignetcfg,
00613                           HashList *the_netmap, 
00614                           int *netmap_changed)
00615 {
00616        long hold_pos;
00617        long size;
00618        char *buffer;
00619 
00620        hold_pos = ftell(fp);
00621        size = msgend - msgstart + 1;
00622        buffer = malloc(size);
00623        if (buffer != NULL) {
00624               fseek(fp, msgstart, SEEK_SET);
00625               if (fread(buffer, size, 1, fp) > 0) {
00626                      network_process_buffer(buffer, 
00627                                           size, 
00628                                           working_ignetcfg, 
00629                                           the_netmap, 
00630                                           netmap_changed);
00631               }
00632               free(buffer);
00633        }
00634 
00635        fseek(fp, hold_pos, SEEK_SET);
00636 }
00637 
00638 
00639 /*
00640  * Process a single file from the inbound queue 
00641  */
00642 void network_process_file(char *filename,
00643                        HashList *working_ignetcfg,
00644                        HashList *the_netmap, 
00645                        int *netmap_changed)
00646 {
00647        struct CitContext *CCC = CC;
00648        FILE *fp;
00649        long msgstart = (-1L);
00650        long msgend = (-1L);
00651        long msgcur = 0L;
00652        int ch;
00653 
00654 
00655        fp = fopen(filename, "rb");
00656        if (fp == NULL) {
00657               QN_syslog(LOG_CRIT, "Error opening %s: %s\n", filename, strerror(errno));
00658               return;
00659        }
00660 
00661        fseek(fp, 0L, SEEK_END);
00662        QN_syslog(LOG_INFO, "network: processing %ld bytes from %s\n", ftell(fp), filename);
00663        rewind(fp);
00664 
00665        /* Look for messages in the data stream and break them out */
00666        while (ch = getc(fp), ch >= 0) {
00667        
00668               if (ch == 255) {
00669                      if (msgstart >= 0L) {
00670                             msgend = msgcur - 1;
00671                             network_process_message(fp,
00672                                                  msgstart,
00673                                                  msgend,
00674                                                  working_ignetcfg,
00675                                                  the_netmap,
00676                                                  netmap_changed);
00677                      }
00678                      msgstart = msgcur;
00679               }
00680 
00681               ++msgcur;
00682        }
00683 
00684        msgend = msgcur - 1;
00685        if (msgstart >= 0L) {
00686               network_process_message(fp,
00687                                    msgstart,
00688                                    msgend,
00689                                    working_ignetcfg,
00690                                    the_netmap,
00691                                    netmap_changed);
00692        }
00693 
00694        fclose(fp);
00695        unlink(filename);
00696 }
00697 
00698 
00699 /*
00700  * Process anything in the inbound queue
00701  */
00702 void network_do_spoolin(HashList *working_ignetcfg, HashList *the_netmap, int *netmap_changed)
00703 {
00704        struct CitContext *CCC = CC;
00705        DIR *dp;
00706        struct dirent *d;
00707        struct stat statbuf;
00708        char filename[PATH_MAX];
00709        static time_t last_spoolin_mtime = 0L;
00710 
00711        /*
00712         * Check the spoolin directory's modification time.  If it hasn't
00713         * been touched, we don't need to scan it.
00714         */
00715        if (stat(ctdl_netin_dir, &statbuf)) return;
00716        if (statbuf.st_mtime == last_spoolin_mtime) {
00717               QNM_syslog(LOG_DEBUG, "network: nothing in inbound queue\n");
00718               return;
00719        }
00720        last_spoolin_mtime = statbuf.st_mtime;
00721        QNM_syslog(LOG_DEBUG, "network: processing inbound queue\n");
00722 
00723        /*
00724         * Ok, there's something interesting in there, so scan it.
00725         */
00726        dp = opendir(ctdl_netin_dir);
00727        if (dp == NULL) return;
00728 
00729        while (d = readdir(dp), d != NULL) {
00730               if ((strcmp(d->d_name, ".")) && (strcmp(d->d_name, ".."))) {
00731                      snprintf(filename, 
00732                             sizeof filename,
00733                             "%s/%s",
00734                             ctdl_netin_dir,
00735                             d->d_name
00736                      );
00737                      network_process_file(filename,
00738                                         working_ignetcfg,
00739                                         the_netmap,
00740                                         netmap_changed);
00741               }
00742        }
00743 
00744        closedir(dp);
00745 }
00746 
00747 /*
00748  * Step 1: consolidate files in the outbound queue into one file per neighbor node
00749  * Step 2: delete any files in the outbound queue that were for neighbors who no longer exist.
00750  */
00751 void network_consolidate_spoolout(HashList *working_ignetcfg, HashList *the_netmap)
00752 {
00753        struct CitContext *CCC = CC;
00754        IOBuffer IOB;
00755        FDIOBuffer FDIO;
00756         int d_namelen;
00757        DIR *dp;
00758        struct dirent *d;
00759        struct dirent *filedir_entry;
00760        const char *pch;
00761        char spooloutfilename[PATH_MAX];
00762        char filename[PATH_MAX];
00763        const StrBuf *nexthop;
00764        StrBuf *NextHop;
00765        int i;
00766        int nFailed = 0;
00767 
00768        /* Step 1: consolidate files in the outbound queue into one file per neighbor node */
00769        d = (struct dirent *)malloc(offsetof(struct dirent, d_name) + PATH_MAX + 1);
00770        if (d == NULL)       return;
00771 
00772        dp = opendir(ctdl_netout_dir);
00773        if (dp == NULL) {
00774               free(d);
00775               return;
00776        }
00777 
00778        NextHop = NewStrBuf();
00779        memset(&IOB, 0, sizeof(IOBuffer));
00780        memset(&FDIO, 0, sizeof(FDIOBuffer));
00781        FDIO.IOB = &IOB;
00782 
00783        while ((readdir_r(dp, d, &filedir_entry) == 0) &&
00784               (filedir_entry != NULL))
00785        {
00786 #ifdef _DIRENT_HAVE_D_NAMELEN
00787               d_namelen = filedir_entry->d_namelen;
00788 #else
00789 
00790 #ifndef DT_UNKNOWN
00791 #define DT_UNKNOWN     0
00792 #define DT_DIR         4
00793 #define DT_REG         8
00794 #define DT_LNK         10
00795 
00796 #define IFTODT(mode)   (((mode) & 0170000) >> 12)
00797 #define DTTOIF(dirtype)        ((dirtype) << 12)
00798 #endif
00799               d_namelen = strlen(filedir_entry->d_name);
00800 #endif
00801               if ((d_namelen > 1) && filedir_entry->d_name[d_namelen - 1] == '~')
00802                      continue; /* Ignore backup files... */
00803 
00804               if ((d_namelen == 1) && 
00805                   (filedir_entry->d_name[0] == '.'))
00806                      continue;
00807 
00808               if ((d_namelen == 2) && 
00809                   (filedir_entry->d_name[0] == '.') &&
00810                   (filedir_entry->d_name[1] == '.'))
00811                      continue;
00812 
00813               pch = strchr(filedir_entry->d_name, '@');
00814               if (pch == NULL)
00815                      continue;
00816 
00817               snprintf(filename, 
00818                       sizeof filename,
00819                       "%s/%s",
00820                       ctdl_netout_dir,
00821                       filedir_entry->d_name);
00822 
00823               StrBufPlain(NextHop,
00824                          filedir_entry->d_name,
00825                          pch - filedir_entry->d_name);
00826 
00827               snprintf(spooloutfilename,
00828                       sizeof spooloutfilename,
00829                       "%s/%s",
00830                       ctdl_netout_dir,
00831                       ChrPtr(NextHop));
00832 
00833               QN_syslog(LOG_DEBUG, "Consolidate %s to %s\n", filename, ChrPtr(NextHop));
00834               if (network_talking_to(SKEY(NextHop), NTT_CHECK)) {
00835                      nFailed++;
00836                      QN_syslog(LOG_DEBUG,
00837                               "Currently online with %s - skipping for now\n",
00838                               ChrPtr(NextHop)
00839                             );
00840               }
00841               else {
00842                      size_t dsize;
00843                      size_t fsize;
00844                      int fd;
00845                      const char *err = NULL;
00846                      network_talking_to(SKEY(NextHop), NTT_ADD);
00847 
00848                      IOB.fd = open(filename, O_RDONLY);
00849                      if (IOB.fd == -1) {
00850                             nFailed++;
00851                             QN_syslog(LOG_ERR,
00852                                      "failed to open %s for reading due to %s; skipping.\n",
00853                                      filename, strerror(errno)
00854                                    );
00855                             network_talking_to(SKEY(NextHop), NTT_REMOVE);
00856                             continue;                          
00857                      }
00858                      
00859                      fd = open(spooloutfilename,
00860                               O_EXCL|O_CREAT|O_NONBLOCK|O_WRONLY, 
00861                               S_IRUSR|S_IWUSR);
00862                      if (fd == -1)
00863                      {
00864                             fd = open(spooloutfilename,
00865                                      O_EXCL|O_NONBLOCK|O_WRONLY, 
00866                                      S_IRUSR | S_IWUSR);
00867                      }
00868                      if (fd == -1) {
00869                             nFailed++;
00870                             QN_syslog(LOG_ERR,
00871                                      "failed to open %s for reading due to %s; skipping.\n",
00872                                      spooloutfilename, strerror(errno)
00873                                    );
00874                             close(IOB.fd);
00875                             network_talking_to(SKEY(NextHop), NTT_REMOVE);
00876                             continue;
00877                      }
00878                      dsize = lseek(fd, 0, SEEK_END);
00879                      fsize = lseek(IOB.fd, 0, SEEK_END);
00880                      
00881                      FDIOBufferInit(&FDIO, &IOB, fd, fsize + dsize);
00882                      FDIO.ChunkSendRemain = fsize;
00883                      FDIO.TotalSentAlready = dsize;
00884                      err = NULL;
00885                      do {} while ((FileMoveChunked(&FDIO, &err) > 0) && (err == NULL));
00886                      if (err == NULL) {
00887                             unlink(filename);
00888                      }
00889                      else {
00890                             nFailed++;
00891                             QN_syslog(LOG_ERR,
00892                                      "failed to append to %s [%s]; rolling back..\n",
00893                                      spooloutfilename, strerror(errno)
00894                                    );
00895                             /* whoops partial append?? truncate spooloutfilename again! */
00896                             ftruncate(fd, dsize);
00897                      }
00898                      FDIOBufferDelete(&FDIO);
00899                      close(IOB.fd);
00900                      close(fd);
00901                      network_talking_to(SKEY(NextHop), NTT_REMOVE);
00902               }
00903        }
00904        closedir(dp);
00905 
00906        if (nFailed > 0) {
00907               FreeStrBuf(&NextHop);
00908               QN_syslog(LOG_INFO,
00909                        "skipping Spoolcleanup because of %d files unprocessed.\n",
00910                        nFailed
00911                      );
00912 
00913               return;
00914        }
00915 
00916        /* Step 2: delete any files in the outbound queue that were for neighbors who no longer exist */
00917        dp = opendir(ctdl_netout_dir);
00918        if (dp == NULL) {
00919               FreeStrBuf(&NextHop);
00920               free(d);
00921               return;
00922        }
00923 
00924        while ((readdir_r(dp, d, &filedir_entry) == 0) &&
00925               (filedir_entry != NULL))
00926        {
00927 #ifdef _DIRENT_HAVE_D_NAMELEN
00928               d_namelen = filedir_entry->d_namelen;
00929               d_type = filedir_entry->d_type;
00930 #else
00931 
00932 #ifndef DT_UNKNOWN
00933 #define DT_UNKNOWN     0
00934 #define DT_DIR         4
00935 #define DT_REG         8
00936 #define DT_LNK         10
00937 
00938 #define IFTODT(mode)   (((mode) & 0170000) >> 12)
00939 #define DTTOIF(dirtype)        ((dirtype) << 12)
00940 #endif
00941               d_namelen = strlen(filedir_entry->d_name);
00942 #endif
00943               if ((d_namelen == 1) && 
00944                   (filedir_entry->d_name[0] == '.'))
00945                      continue;
00946 
00947               if ((d_namelen == 2) && 
00948                   (filedir_entry->d_name[0] == '.') &&
00949                   (filedir_entry->d_name[1] == '.'))
00950                      continue;
00951 
00952               pch = strchr(filedir_entry->d_name, '@');
00953               if (pch == NULL) /* no @ in name? consolidated file. */
00954                      continue;
00955 
00956               StrBufPlain(NextHop,
00957                          filedir_entry->d_name,
00958                          pch - filedir_entry->d_name);
00959 
00960               snprintf(filename, 
00961                      sizeof filename,
00962                      "%s/%s",
00963                      ctdl_netout_dir,
00964                      filedir_entry->d_name
00965               );
00966 
00967               i = is_valid_node(&nexthop,
00968                               NULL,
00969                               NextHop,
00970                               working_ignetcfg,
00971                               the_netmap);
00972        
00973               if ( (i != 0) || (StrLength(nexthop) > 0) ) {
00974                      unlink(filename);
00975               }
00976        }
00977        FreeStrBuf(&NextHop);
00978        free(d);
00979        closedir(dp);
00980 }
00981 
00982 
00983 
00984 
00985 /*
00986  * It's ok if these directories already exist.  Just fail silently.
00987  */
00988 void create_spool_dirs(void) {
00989        if ((mkdir(ctdl_spool_dir, 0700) != 0) && (errno != EEXIST))
00990               syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_spool_dir, strerror(errno));
00991        if (chown(ctdl_spool_dir, CTDLUID, (-1)) != 0)
00992               syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_spool_dir, strerror(errno));
00993        if ((mkdir(ctdl_netin_dir, 0700) != 0) && (errno != EEXIST))
00994               syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_netin_dir, strerror(errno));
00995        if (chown(ctdl_netin_dir, CTDLUID, (-1)) != 0)
00996               syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_netin_dir, strerror(errno));
00997        if ((mkdir(ctdl_nettmp_dir, 0700) != 0) && (errno != EEXIST))
00998               syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_nettmp_dir, strerror(errno));
00999        if (chown(ctdl_nettmp_dir, CTDLUID, (-1)) != 0)
01000               syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_nettmp_dir, strerror(errno));
01001        if ((mkdir(ctdl_netout_dir, 0700) != 0) && (errno != EEXIST))
01002               syslog(LOG_EMERG, "unable to create directory [%s]: %s", ctdl_netout_dir, strerror(errno));
01003        if (chown(ctdl_netout_dir, CTDLUID, (-1)) != 0)
01004               syslog(LOG_EMERG, "unable to set the access rights for [%s]: %s", ctdl_netout_dir, strerror(errno));
01005 }
01006 
01007 /*
01008  * Module entry point
01009  */
01010 CTDL_MODULE_INIT(network_spool)
01011 {
01012        if (!threading)
01013        {
01014               create_spool_dirs();
01016        }
01017        return "network_spool";
01018 }