Back to index

avfs  1.0.1
filtprog.c
Go to the documentation of this file.
00001 /*  
00002     AVFS: A Virtual File System Library
00003     Copyright (C) 1998  Miklos Szeredi <miklos@szeredi.hu>
00004     
00005     This program can be distributed under the terms of the GNU GPL.
00006     See the file COPYING.
00007 */
00008 
00009 #include "filtprog.h"
00010 #include "filebuf.h"
00011 #include "prog.h"
00012 #include "oper.h"
00013 
00014 #include <unistd.h>
00015 #include <sys/time.h>
00016 
00017 struct filtprog {
00018     vfile *vf;
00019     struct filtdata *filtdat;
00020 };
00021 
00022 #define cbufsize 16384
00023 
00024 struct filtconn {
00025     struct filtprog *fp;
00026     struct filebuf *fbs[3];
00027     struct proginfo pri;
00028     int cbufat;
00029     int cbuflen;
00030     char cbuf[cbufsize];
00031 };
00032 
00033 static int filtprog_fill_cbuf(struct filtconn *fc)
00034 {
00035     avssize_t res;
00036 
00037     res = av_read(fc->fp->vf, fc->cbuf, cbufsize);
00038     if(res < 0)
00039         return res;
00040 
00041     if(res == 0) {
00042         av_unref_obj(fc->fbs[0]);
00043         fc->fbs[0] = NULL;
00044     }
00045     else {
00046         fc->cbuflen = res;
00047         fc->cbufat = 0;
00048     }
00049 
00050     return 0;
00051 }
00052 
00053 static int filtprog_check_error(struct filtconn *fc)
00054 {
00055     char *line;
00056     int res;
00057     int gotsome = 0;
00058 
00059     do {
00060         res = av_filebuf_readline(fc->fbs[2], &line);
00061         if(res < 0)
00062             return res;
00063 
00064         if(res == 1) {
00065             av_log(AVLOG_ERROR, "%s stderr: %s", fc->fp->filtdat->prog[0],
00066                      line);
00067             av_free(line);
00068             gotsome = 1;
00069         }
00070     } while(res == 1);
00071 
00072     return gotsome;
00073 }
00074 
00075 static int filtprog_write_input(struct filtconn *fc)
00076 {
00077     int res;
00078 
00079     if(fc->cbuflen == 0) {
00080         res = filtprog_fill_cbuf(fc);
00081         if(res < 0)
00082             return res;
00083 
00084         if(fc->fbs[0] == NULL)
00085             return 0;
00086     }
00087     
00088     res = av_filebuf_write(fc->fbs[0], fc->cbuf + fc->cbufat,
00089                              fc->cbuflen);
00090     if(res < 0)
00091         return res;
00092     
00093     fc->cbufat += res;
00094     fc->cbuflen -= res;
00095 
00096     return 0;
00097 }
00098 
00099 static avssize_t filtprog_read(void *data, char *buf, avsize_t nbyte)
00100 {
00101     avssize_t res;
00102     struct filtconn *fc = (struct filtconn *) data;
00103 
00104     while(1) {
00105         res = filtprog_check_error(fc);
00106         if(res < 0)
00107             return res;
00108 
00109         if(res == 0) {
00110             res = av_filebuf_read(fc->fbs[1], buf, nbyte);
00111             if(res != 0)
00112                 return res;
00113             
00114             if(av_filebuf_eof(fc->fbs[1])) {
00115                 res = av_wait_prog(&fc->pri, 0, 0);
00116                 if(res < 0)
00117                     return res;
00118 
00119                 return 0;
00120             }
00121             
00122             if(fc->fbs[0] != NULL) {
00123                 res = filtprog_write_input(fc);
00124                 if(res < 0)
00125                     return res;
00126             }
00127         }
00128 
00129         res = av_filebuf_check(fc->fbs, 3, -1);
00130         if(res < 0)
00131             return res;
00132     }
00133 }
00134 
00135 static int filtprog_read_input(struct filtconn *fc)
00136 {
00137     avssize_t res;
00138 
00139     res  = av_filebuf_read(fc->fbs[1], fc->cbuf + fc->cbufat,
00140                              cbufsize - fc->cbufat);
00141     
00142     if(res > 0) {
00143         fc->cbufat += res;
00144         if(fc->cbufat == cbufsize) {
00145             res = av_write(fc->fp->vf, fc->cbuf, fc->cbufat);
00146             fc->cbufat = 0;
00147         }
00148     }
00149 
00150     return res;
00151 }
00152 
00153 static avssize_t filtprog_write(void *data, const char *buf, avsize_t nbyte)
00154 {
00155     avssize_t res;
00156     struct filtconn *fc = (struct filtconn *) data;
00157 
00158     while(1) {
00159         res = filtprog_check_error(fc);
00160         if(res < 0)
00161             return res;
00162 
00163         if(res == 0) {
00164             res = av_filebuf_write(fc->fbs[0], buf, nbyte);
00165             if(res != 0)
00166                 return res;
00167             
00168             res = filtprog_read_input(fc);
00169             if(res < 0)
00170                 return res;
00171         }
00172 
00173         res = av_filebuf_check(fc->fbs, 3, -1);
00174         if(res < 0)
00175             return res;
00176     }
00177 }
00178 
00179 static int filtprog_endput(void *data)
00180 {
00181     int res;
00182     struct filtconn *fc = (struct filtconn *) data;
00183     
00184     av_unref_obj(fc->fbs[0]);
00185     fc->fbs[0] = NULL;
00186 
00187     while(1) {
00188         res = filtprog_check_error(fc);
00189         if(res < 0)
00190             return res;
00191 
00192         if(res == 0) {
00193             res = filtprog_read_input(fc);
00194             if(res < 0)
00195                 return res;
00196 
00197             if(av_filebuf_eof(fc->fbs[1]))
00198                 break;
00199         }
00200 
00201         res = av_filebuf_check(fc->fbs, 3, -1);
00202         if(res < 0)
00203             return res;        
00204     }
00205 
00206     res = av_write(fc->fp->vf, fc->cbuf, fc->cbufat);
00207     if(res < 0)
00208         return res;
00209 
00210     res = av_wait_prog(&fc->pri, 0, 0);
00211     if(res < 0)
00212         return res;
00213 
00214     return 0;
00215 }
00216 
00217 static void filtprog_stop(struct filtconn *fc)
00218 {
00219     av_unref_obj(fc->fbs[0]);
00220     av_unref_obj(fc->fbs[1]);   
00221     av_unref_obj(fc->fbs[2]);
00222     av_wait_prog(&fc->pri, 1, 0);
00223     av_lseek(fc->fp->vf, 0, AVSEEK_SET);
00224 }
00225 
/*
 * Create the three pipes used to talk to a filter program.
 *
 * pipein, pipeout and pipeerr each receive a descriptor pair from
 * pipe(); they are created in that order. On success the ends kept by
 * this process (pipein[1], pipeout[0], pipeerr[0]) are passed to
 * av_registerfd() and 0 is returned.
 *
 * On failure -errno of the failing pipe() call is returned and every
 * pipe that had already been created is closed again. Only descriptors
 * that were really opened are closed, so close() is never called on -1
 * and the cleanup does not depend on pipe() leaving its argument
 * untouched when it fails.
 */
static int filtprog_init_pipes(int pipein[2], int pipeout[2], int pipeerr[2])
{
    int *pipes[3];
    int res;
    int i;

    pipes[0] = pipein;
    pipes[1] = pipeout;
    pipes[2] = pipeerr;

    for(i = 0; i < 3; i++) {
        pipes[i][0] = -1;
        pipes[i][1] = -1;
    }

    for(i = 0; i < 3; i++) {
        if(pipe(pipes[i]) == -1) {
            /* Save errno before close() gets a chance to change it */
            res = -errno;

            /* Pipes with an index below i are the only open ones */
            while(i-- > 0) {
                close(pipes[i][0]);
                close(pipes[i][1]);
            }
            return res;
        }
    }

    av_registerfd(pipein[1]);
    av_registerfd(pipeout[0]);
    av_registerfd(pipeerr[0]);

    return 0;
}
00247 
00248 static int filtprog_start(struct filtprog *fp, char **prog,
00249                           struct filtconn **resp)
00250 {
00251     struct filtconn *fc;
00252     int res;
00253     int pipein[2];
00254     int pipeout[2];
00255     int pipeerr[2];
00256     struct proginfo pri;
00257 
00258     res = filtprog_init_pipes(pipein, pipeout, pipeerr);
00259     if(res < 0)
00260         return res;
00261 
00262     av_init_proginfo(&pri);
00263     
00264     pri.prog = (const char **) prog;
00265     pri.ifd = pipein[0];
00266     pri.ofd = pipeout[1];
00267     pri.efd = pipeerr[1];
00268 
00269     res = av_start_prog(&pri);
00270     close(pri.ifd);
00271     close(pri.ofd);
00272     close(pri.efd);
00273 
00274     if(res < 0) {
00275        close(pipein[1]);
00276        close(pipeout[0]);
00277        close(pipeerr[0]);
00278        return res;
00279     }
00280 
00281     AV_NEW_OBJ(fc, filtprog_stop);
00282     
00283     fc->fp = fp;
00284     fc->fbs[0] = av_filebuf_new(pipein[1], FILEBUF_NONBLOCK | FILEBUF_WRITE);
00285     fc->fbs[1] = av_filebuf_new(pipeout[0], FILEBUF_NONBLOCK);
00286     fc->fbs[2] = av_filebuf_new(pipeerr[0], FILEBUF_NONBLOCK);
00287     fc->pri = pri;
00288     fc->cbufat = 0;
00289     fc->cbuflen = 0;
00290 
00291     *resp = fc;
00292     return 0;
00293 }
00294 
00295 static int filtprog_startget(void *data, void **resp)
00296 {
00297     int res;
00298     struct filtprog *fp = (struct filtprog *) data;
00299     struct filtconn *fc;
00300 
00301     res = filtprog_start(fp, fp->filtdat->prog, &fc);
00302     if(res < 0)
00303         return res;
00304 
00305     *resp = fc;
00306 
00307     return 0;
00308 }
00309 
00310 static int filtprog_startput(void *data, void **resp)
00311 {
00312     int res;
00313     struct filtprog *fp = (struct filtprog *) data;
00314     struct filtconn *fc;
00315 
00316     res = av_ftruncate(fp->vf, 0);
00317     if(res < 0)
00318         return res;
00319 
00320     res = filtprog_start(fp, fp->filtdat->revprog, &fc);
00321     if(res < 0)
00322         return res;
00323 
00324     *resp = fc;
00325 
00326     return 0;
00327 }
00328 
00329 struct sfile *av_filtprog_new(vfile *vf, struct filtdata *filtdat)
00330 {
00331     struct filtprog *fp;
00332     struct sfile *sf;
00333     static const struct sfilefuncs func = {
00334         filtprog_startget,
00335         filtprog_read,
00336         filtprog_startput,
00337         filtprog_write,
00338         filtprog_endput
00339     };
00340 
00341     AV_NEW_OBJ(fp, NULL);
00342     fp->vf = vf;
00343     fp->filtdat = filtdat;
00344 
00345     sf = av_sfile_new(&func, fp, 0);
00346 
00347     return sf;
00348 }
00349 
00350 void av_filtprog_change(struct sfile *sf, vfile *newvf)
00351 {
00352     struct filtprog *fp = (struct filtprog *) av_sfile_getdata(sf);
00353     
00354     fp->vf = newvf;
00355 }