Back to index

radiance  4R0+20100331
netproc.c
Go to the documentation of this file.
00001 #ifndef lint
00002 static const char    RCSid[] = "$Id: netproc.c,v 2.17 2010/03/15 21:31:50 greg Exp $";
00003 #endif
00004 /*
00005  * Parallel network process handling routines
00006  */
00007 
00008 #include <stdlib.h>
00009 #include <stdio.h>
00010 #include <string.h>
00011 #include <signal.h>
00012 #include <fcntl.h>
00013 #include <unistd.h>
00014 #include <sys/wait.h>
00015 
00016 #include "rtmisc.h"
00017 #include "selcall.h"
00018 #include "netproc.h"
00019 #include "paths.h"
00020 
00021 PSERVER       *pslist = NULL;             /* global process server list */
00022 
00023 static NETPROC       *pindex[FD_SETSIZE]; /* process index table */
00024 
00025 static char   ourhost[64];  /* this host name */
00026 static char   ourdir[PATH_MAX];    /* our working directory */
00027 static char   ouruser[32];  /* our user name */
00028 static char   *ourshell;    /* our user's shell */
00029 
00030 static fd_set errdesc;      /* error file descriptors */
00031 static int    maxfd;        /* maximum assigned descriptor */
00032 
00033 extern char   *remsh;              /* externally defined remote shell program */
00034 
00035 static int readerrs(int     fd);
00036 static void wait4end(void);
00037 static int finishjob(PSERVER       *ps, int      pn, int       status);
00038 
00039 
00040 extern PSERVER *
00041 addpserver(          /* add a new process server */
00042        char   *host,
00043        char   *dir,
00044        char   *usr,
00045        int    np
00046 )
00047 {
00048        register PSERVER     *ps;
00049                                    /* allocate the struct */
00050        if (np < 1)
00051               return(NULL);
00052        ps = (PSERVER *)malloc(sizeof(PSERVER)+(np-1)*sizeof(NETPROC));
00053        if (ps == NULL)
00054               return(NULL);
00055        if (!ourhost[0]) {          /* initialize */
00056               char   dirtmp[PATH_MAX];
00057               register char *cp;
00058               register int  len;
00059 
00060               strcpy(ourhost, myhostname());
00061               getcwd(dirtmp, sizeof(dirtmp));
00062               if ((cp = getenv("HOME")) != NULL) {
00063                      if (!strcmp(cp, dirtmp))
00064                             ourdir[0] = '\0';
00065                      else if (!strncmp(cp, dirtmp, len=strlen(cp)) &&
00066                                    dirtmp[len] == '/')
00067                             strcpy(ourdir, dirtmp+len+1);
00068                      else
00069                             strcpy(ourdir, dirtmp);
00070               } else
00071                      strcpy(ourdir, dirtmp);
00072               if ((cp = getenv("USER")) != NULL)
00073                      strcpy(ouruser, cp);
00074               if ((ourshell = getenv("SHELL")) == NULL)
00075                      ourshell = "/bin/sh";
00076               FD_ZERO(&errdesc);
00077               maxfd = -1;
00078        }
00079                                    /* assign host, directory, user */
00080        if (host == NULL || !strcmp(host, ourhost) ||
00081                      !strcmp(host, LHOSTNAME))
00082               ps->hostname[0] = '\0';
00083        else
00084               strcpy(ps->hostname, host);
00085        if (dir == NULL)
00086               strcpy(ps->directory, ourdir);
00087        else
00088               strcpy(ps->directory, dir);
00089        if (usr == NULL || !strcmp(usr, ouruser))
00090               ps->username[0] = '\0';
00091        else
00092               strcpy(ps->username, usr);
00093                                    /* clear process slots */
00094        ps->nprocs = np;
00095        while (np--) {
00096               ps->proc[np].com = NULL;
00097               ps->proc[np].pid = -1;
00098               ps->proc[np].efd = -1;
00099               ps->proc[np].errs = NULL;
00100               ps->proc[np].elen = 0;
00101               ps->proc[np].cf = NULL;
00102        }
00103                                    /* insert in our list */
00104        ps->next = pslist;
00105        pslist = ps;
00106                                    /* check for signs of life */
00107        if (!pserverOK(ps)) {
00108               delpserver(ps);                    /* failure -- abort */
00109               return(NULL);
00110        }
00111        return(ps);
00112 }
00113 
00114 
00115 extern void
00116 delpserver(                        /* delete a process server */
00117        PSERVER       *ps
00118 )
00119 {
00120        PSERVER       pstart;
00121        register PSERVER     *psp;
00122        register int  i;
00123                                    /* find server in our list */
00124        pstart.next = pslist;
00125        for (psp = &pstart; ps != psp->next; psp = psp->next)
00126               if (psp->next == NULL)
00127                      return;                     /* not in our list! */
00128                                    /* kill any running jobs */
00129        for (i = 0; i < ps->nprocs; i++)
00130               if (ps->proc[i].com != NULL) {
00131                      kill(SIGTERM, ps->proc[i].pid);
00132                      wait4job(ps, ps->proc[i].pid);
00133               }
00134                                    /* remove server from list */
00135        psp->next = ps->next;
00136        pslist = pstart.next;
00137        free((void *)ps);           /* free associated memory */
00138 }
00139 
00140 
00141 extern PSERVER *
00142 findjob(                    /* find out where process is running */
00143        register int  *pnp   /* modified */
00144 )
00145 {
00146        register PSERVER     *ps;
00147        register int  i;
00148 
00149        for (ps = pslist; ps != NULL; ps = ps->next)
00150               for (i = 0; i < ps->nprocs; i++)
00151                      if (ps->proc[i].pid == *pnp) {
00152                             *pnp = i;
00153                             return(ps);
00154                      }
00155        return(NULL);        /* not found */
00156 }
00157 
00158 
00159 extern int
00160 startjob(     /* start a job on a process server */
00161        register PSERVER     *ps,
00162        char   *command,
00163        pscompfunc *compf
00164 )
00165 {
00166        char   udirt[PATH_MAX];
00167        char   *av[16];
00168        int    pfd[2], pid;
00169        register int  i;
00170 
00171        if (ps == NULL) {           /* find a server */
00172               for (ps = pslist; ps != NULL; ps = ps->next)
00173                      if ((i = startjob(ps, command, compf)) != -1)
00174                             return(i);           /* got one */
00175               return(-1);   /* no slots anywhere */
00176        }
00177        for (i = 0; i < ps->nprocs; i++)
00178               if (ps->proc[i].com == NULL)
00179                      break;
00180        if (i >= ps->nprocs)
00181               return(-1);          /* out of process slots */
00182                                    /* open pipe */
00183        if (pipe(pfd) < 0) {
00184               perror("cannot open pipe");
00185               exit(1);
00186        }
00187                                    /* start child process */
00188        if ((pid = fork()) == 0) {
00189               close(pfd[0]);                     /* connect stderr to pipe */
00190               if (pfd[1] != 2) {
00191                      dup2(pfd[1], 2);
00192                      close(pfd[1]);
00193               }
00194               if (ps->hostname[0]) {             /* rsh command */
00195                      av[i=0] = remsh;
00196                      av[++i] = ps->hostname;
00197                      av[++i] = "-n";                    /* no stdin */
00198                      if (ps->username[0]) {             /* different user */
00199                             av[++i] = "-l";
00200                             av[++i] = ps->username;
00201                             if (ps->directory[0] != '/') {
00202                                    av[++i] = "cd";
00203                                    udirt[0] = '~';
00204                                    strcpy(udirt+1, ouruser);
00205                                    av[++i] = udirt;
00206                                    av[++i] = ";";
00207                             }
00208                      }
00209                      if (ps->directory[0]) {            /* change directory */
00210                             av[++i] = "cd";
00211                             av[++i] = ps->directory;
00212                             av[++i] = ";";
00213                      }
00214                      av[++i] = command;
00215                      av[++i] = NULL;
00216               } else {                    /* shell command */
00217                      av[0] = ourshell;
00218                      av[1] = "-c";
00219                      av[2] = command;
00220                      av[3] = NULL;
00221               }
00222               execv(av[0], av);
00223               _exit(1);
00224        }
00225        if (pid == -1) {
00226               perror("fork failed");
00227               exit(1);
00228        }
00229        ps->proc[i].com = command;  /* assign process slot */
00230        ps->proc[i].cf = compf;
00231        ps->proc[i].pid = pid;
00232        close(pfd[1]);                     /* get piped stderr file descriptor */
00233        ps->proc[i].efd = pfd[0];
00234        fcntl(pfd[0], F_SETFD, FD_CLOEXEC);       /* set close on exec flag */
00235        pindex[pfd[0]] = ps->proc + i;     /* assign error fd index */
00236        FD_SET(pfd[0], &errdesc);   /* add to select call parameter */
00237        if (pfd[0] > maxfd)
00238               maxfd = pfd[0];
00239        return(pid);                /* return to parent process */
00240 }
00241 
00242 
00243 static int
00244 readerrs(                   /* read error output from fd */
00245        int    fd
00246 )
00247 {
00248        char   errbuf[BUFSIZ];
00249        int    nr;
00250        register NETPROC     *pp;
00251                             /* look up associated process */
00252        if ((pp = pindex[fd]) == NULL)
00253               abort();             /* serious consistency error */
00254        nr = read(fd, errbuf, BUFSIZ-1);
00255        if (nr < 0) {
00256               perror("read error");
00257               exit(1);
00258        }
00259        if (nr == 0)         /* stream closed (process finished) */
00260               return(0);
00261        errbuf[nr] = '\0';   /* add to error buffer */
00262        if (pp->elen == 0)
00263               pp->errs = (char *)malloc(nr+1);
00264        else
00265               pp->errs = (char *)realloc((void *)pp->errs, pp->elen+nr+1);
00266        if (pp->errs == NULL) {
00267               perror("malloc failed");
00268               exit(1);
00269        }
00270        strcpy(pp->errs+pp->elen, errbuf);
00271        pp->elen += nr;
00272        return(nr);
00273 }
00274 
00275 
00276 static void
00277 wait4end(void)                     /* read error streams until someone is done */
00278 {
00279        fd_set readfds, excepfds;
00280        register int  i;
00281                             /* find end of descriptor set */
00282        for ( ; maxfd >= 0; maxfd--)
00283               if (FD_ISSET(maxfd, &errdesc))
00284                      break;
00285        if (maxfd < 0)
00286               return;              /* nothing to read */
00287        readfds = excepfds = errdesc;
00288        while (select(maxfd+1, &readfds, NULL, &excepfds, NULL) > 0)
00289               for (i = 0; i <= maxfd; i++)              /* get pending i/o */
00290                      if (FD_ISSET(i, &readfds) || FD_ISSET(i, &excepfds))
00291                             if (readerrs(i) == 0)
00292                                    return;              /* finished process */
00293        perror("select call failed");
00294        exit(1);
00295 }
00296 
00297 
00298 static int
00299 finishjob(    /* clean up finished process */
00300        PSERVER       *ps,
00301        int    pn,
00302        int    status
00303 )
00304 {
00305        register NETPROC     *pp;
00306 
00307        pp = ps->proc + pn;
00308        if (pp->cf != NULL)                /* client cleanup */
00309               status = (*pp->cf)(ps, pn, status);
00310        close(pp->efd);                           /* close error stream */
00311        pindex[pp->efd] = NULL;
00312        FD_CLR(pp->efd, &errdesc);
00313        free((void *)pp->errs);
00314        pp->com = NULL;                           /* clear settings */
00315        pp->pid = -1;
00316        pp->efd = -1;
00317        pp->errs = NULL;
00318        pp->elen = 0;
00319        pp->cf = NULL;
00320        return(status);
00321 }
00322 
00323 
00324 extern int
00325 wait4job(            /* wait for process to finish */
00326        PSERVER       *ps,
00327        int    pid
00328 )
00329 {
00330        int    status, psn, psn2;
00331        PSERVER       *ps2;
00332 
00333        if (pid == -1) {                   /* wait for first job */
00334               if (ps != NULL) {
00335                      for (psn = ps->nprocs; psn--; )
00336                             if (ps->proc[psn].com != NULL)
00337                                    break;
00338                      if (psn < 0)
00339                             return(-1);   /* no processes this server */
00340               }
00341               do {
00342                      wait4end();          /* wait for something to end */
00343                      if ((psn2 = wait(&status)) == -1)
00344                             return(-1);   /* none left */
00345                      ps2 = findjob(&psn2);
00346                      if (ps2 != NULL)     /* clean up job if ours */
00347                             status = finishjob(ps2, psn2, status);
00348               } while (ps2 == NULL || (ps != NULL && ps2 != ps));
00349               return(status);                    /* return job status */
00350        }
00351        psn = pid;                         /* else find specific job */
00352        ps2 = findjob(&psn);               /* find process slot */
00353        if (ps2 == NULL || (ps != NULL && ps2 != ps))
00354               return(-1);          /* inconsistent target */
00355        ps = ps2;
00356        do {
00357               wait4end();                 /* wait for something to end */
00358               if ((psn2 = wait(&status)) == -1)
00359                      return(-1);          /* none left */
00360               ps2 = findjob(&psn2);
00361               if (ps2 != NULL)            /* clean up job if ours */
00362                      status = finishjob(ps2, psn2, status);
00363        } while (ps2 != ps || psn2 != psn);
00364        return(status);                           /* return job status */
00365 }