Back to index

courier  0.68.2
pthread.c
Go to the documentation of this file.
00001 /*
00002 ** Copyright 2000-2007 Double Precision, Inc.  See COPYING for
00003 ** distribution information.
00004 */
00005 
00006 
00007 #include      "threadlib.h"
00008 
00009 #include      <pthread.h>
00010 #include      <stdio.h>
00011 #include      <errno.h>
00012 #include      <stdlib.h>
00013 #include      <unistd.h>
00014 
00015 typedef struct {
00016        unsigned      me;
00017        pthread_t     pt;
00018 
00019        pthread_cond_t       gocond;
00020        pthread_mutex_t      gomutex;
00021        int           goflag;
00022 
00023        struct        cthreadinfo *myinfo;
00024 
00025        void          *metadata;
00026        } cthread_t;
00027 
00028 struct cthreadinfo {
00029        unsigned nthreads;
00030        void (*workfunc)(void *);
00031        void (*cleanupfunc)(void *);
00032        cthread_t *threads;
00033        char *metadata_buf;
00034        int *newbuf;
00035        int newcnt;
00036 
00037        pthread_cond_t       newtask_cond;
00038        pthread_mutex_t      newtask_mutex;
00039 
00040        pthread_mutex_t      cleanup_mutex;
00041        } ;
00042 
00043 typedef struct cthreadinfo cthreadinfo_t;
00044 
00045 static void mutexcleanup(void *vp)
00046 {
00047 pthread_mutex_t *mp=(pthread_mutex_t *)vp;
00048 
00049        pthread_mutex_unlock( mp );
00050 }
00051 
00052 #define PTHREAD_CHK(x) (errno=(x))
00053 
00054 /*
00055 **  This is the thread function wrapper.  It waits for its conditional
00056 **  signal, runs the workhorse function, then puts its task id onto the ready
00057 **  queue, then goes back to waiting for another go signal.
00058 */
00059 
00060 static void *threadfunc(void *p)
00061 {
00062 cthread_t     *c= (cthread_t *)p;
00063 cthreadinfo_t *i= c->myinfo;
00064 
00065        for (;;)
00066        {
00067               while (PTHREAD_CHK(pthread_mutex_lock(&c->gomutex)))
00068               {
00069                      perror("pthread_mutex_lock");
00070                      sleep(3);
00071               }
00072 
00073               pthread_cleanup_push(&mutexcleanup, (void *)&c->gomutex);
00074 
00075               while (!c->goflag)
00076               {
00077                      if (PTHREAD_CHK(pthread_cond_wait(&c->gocond,
00078                                                    &c->gomutex)))
00079                      {
00080                             perror("pthread_cond_wait");
00081                             sleep(3);
00082                      }
00083               }
00084               c->goflag=0;
00085               if (PTHREAD_CHK(pthread_mutex_unlock( &c->gomutex )))
00086                      perror("pthread_mutex_unlock");
00087 
00088               pthread_cleanup_pop(0);
00089 
00090               (*i->workfunc)(c->metadata);
00091 
00092               while (PTHREAD_CHK(pthread_mutex_lock(&i->cleanup_mutex)))
00093               {
00094                      perror("pthread_mutex_lock");
00095                      sleep(3);
00096               }
00097 
00098               pthread_cleanup_push(&mutexcleanup, (void *)&i->cleanup_mutex);
00099 
00100               (*i->cleanupfunc)(c->metadata);
00101 
00102               if (PTHREAD_CHK(pthread_mutex_unlock( &i->cleanup_mutex )))
00103                      perror("pthread_mutex_unlock");
00104 
00105               pthread_cleanup_pop(0);
00106 
00107               while (PTHREAD_CHK(pthread_mutex_lock(&i->newtask_mutex)))
00108               {
00109                      perror("pthread_mutex_lock");
00110                      sleep(3);
00111               }
00112 
00113               i->newbuf[i->newcnt++] = c->me;
00114 
00115               if (PTHREAD_CHK(pthread_mutex_unlock( &i->newtask_mutex )))
00116                      perror("pthread_mutex_unlock");
00117 
00118               if (PTHREAD_CHK(pthread_cond_signal( &i->newtask_cond )))
00119                      perror("pthread_cond_signal");
00120        }
00121 }
00122 
00123 static int initcondmutex(pthread_cond_t *c, pthread_mutex_t *m)
00124 {
00125 pthread_condattr_t cattr;
00126 pthread_mutexattr_t mattr;
00127 
00128        if (c)
00129        {
00130               if ( PTHREAD_CHK(pthread_condattr_init(&cattr))) return (-1);
00131               if ( PTHREAD_CHK(pthread_cond_init(c, &cattr)))
00132               {
00133                      pthread_condattr_destroy(&cattr);
00134                      return (-1);
00135               }
00136               pthread_condattr_destroy(&cattr);
00137        }
00138 
00139        if ( PTHREAD_CHK(pthread_mutexattr_init(&mattr)))
00140        {
00141               if (c) pthread_cond_destroy(c);
00142               return (-1);
00143        }
00144 
00145        if ( PTHREAD_CHK(pthread_mutex_init(m, &mattr)))
00146        {
00147               pthread_mutexattr_destroy(&mattr);
00148 
00149               if (c) pthread_cond_destroy(c);
00150               return (-1);
00151        }
00152        pthread_mutexattr_destroy(&mattr);
00153        return (0);
00154 }
00155 
00156 cthreadinfo_t *cthread_init(unsigned nthreads_, unsigned metasize,
00157        void (*workfunc_)(void *),
00158        void (*cleanupfunc_)(void *))
00159 {
00160 unsigned i;
00161 pthread_attr_t       pat;
00162 cthreadinfo_t *cit;
00163 
00164        if ((cit=(cthreadinfo_t *)malloc(sizeof(cthreadinfo_t))) == 0)
00165               return (0);
00166 
00167        cit->nthreads=nthreads_;
00168        cit->cleanupfunc=cleanupfunc_;
00169        cit->workfunc=workfunc_;
00170        if ( (cit->threads=(cthread_t *)malloc(cit->nthreads * sizeof(cthread_t))) == 0)
00171        {
00172               free((char *)cit);
00173               return (0);
00174        }
00175        if ( (cit->metadata_buf=malloc(metasize * cit->nthreads)) == 0)
00176        {
00177               free( (char *)cit->threads);
00178               free( (char *)cit );
00179               return (0);
00180        }
00181 
00182        cit->newcnt=cit->nthreads;
00183        if ( (cit->newbuf=(int *)malloc(cit->nthreads * sizeof(int))) == 0)
00184        {
00185               free(cit->metadata_buf);
00186               free( (char *)cit->threads);
00187               free( (char *)cit );
00188               return (0);
00189        }
00190        for (i=0; i<cit->nthreads; i++)
00191        {
00192               cit->newbuf[i]=i;
00193        }
00194 
00195        if (initcondmutex(&cit->newtask_cond, &cit->newtask_mutex))
00196        {
00197               free(cit->newbuf);
00198               free(cit->metadata_buf);
00199               free( (char *)cit->threads);
00200               free( (char *)cit );
00201               return (0);
00202        }
00203 
00204        if (initcondmutex(0, &cit->cleanup_mutex))
00205        {
00206               pthread_cond_destroy(&cit->newtask_cond);
00207               pthread_mutex_destroy(&cit->newtask_mutex);
00208 
00209               free(cit->newbuf);
00210               free(cit->metadata_buf);
00211               free( (char *)cit->threads);
00212               free( (char *)cit );
00213               return (0);
00214        }
00215 
00216        if (PTHREAD_CHK(pthread_attr_init(&pat)))
00217        {
00218               pthread_mutex_destroy(&cit->cleanup_mutex);
00219               pthread_cond_destroy(&cit->newtask_cond);
00220               pthread_mutex_destroy(&cit->newtask_mutex);
00221               free(cit->newbuf);
00222               free(cit->metadata_buf);
00223               free( (char *)cit->threads);
00224               free( (char *)cit );
00225               return (0);
00226        }
00227 
00228        for (i=0; i<cit->nthreads; i++)
00229        {
00230               cit->threads[i].me=i;
00231               cit->threads[i].metadata=(void *) (cit->metadata_buf+i*metasize);
00232               cit->threads[i].myinfo=cit;
00233 
00234               if (initcondmutex(&cit->threads[i].gocond,
00235                      &cit->threads[i].gomutex))
00236                      break;
00237 
00238               cit->threads[i].goflag=0;
00239 
00240               if (PTHREAD_CHK(pthread_create(&cit->threads[i].pt, &pat,
00241                                           &threadfunc,
00242                                           (void *)&cit->threads[i])))
00243               {
00244                      pthread_cond_destroy(&cit->threads[i].gocond);
00245                      pthread_mutex_destroy(&cit->threads[i].gomutex);
00246                      break;
00247               }
00248        }
00249 
00250        if ( i >= cit->nthreads)
00251        {
00252               pthread_attr_destroy(&pat);
00253               return (cit);
00254        }
00255 
00256        while (i)
00257        {
00258               --i;
00259               if (PTHREAD_CHK(pthread_cancel(cit->threads[i].pt)))
00260                      perror("pthread_cancel");
00261               
00262               if (PTHREAD_CHK(pthread_join(cit->threads[i].pt, NULL)))
00263                      perror("pthread_join");
00264 
00265               pthread_cond_destroy(&cit->threads[i].gocond);
00266               pthread_mutex_destroy(&cit->threads[i].gomutex);
00267        }
00268 
00269        pthread_attr_destroy(&pat);
00270        pthread_mutex_destroy(&cit->cleanup_mutex);
00271        pthread_cond_destroy(&cit->newtask_cond);
00272        pthread_mutex_destroy(&cit->newtask_mutex);
00273        free(cit->newbuf);
00274        free(cit->metadata_buf);
00275        free( (char *)cit->threads);
00276        free( (char *)cit );
00277        return (0);
00278 }
00279 
00280 void cthread_wait(cthreadinfo_t *cit)
00281 {
00282        unsigned i;
00283 
00284        if (PTHREAD_CHK(pthread_mutex_lock(&cit->newtask_mutex)))
00285        {
00286               perror("pthread_mutex_lock");
00287               return;
00288        }
00289 
00290        pthread_cleanup_push(&mutexcleanup, (void *)&cit->newtask_mutex);
00291 
00292        while (cit->newcnt < cit->nthreads)
00293        {
00294               if (PTHREAD_CHK(pthread_cond_wait(&cit->newtask_cond,
00295                                             &cit->newtask_mutex)))
00296               {
00297                      perror("pthread_cond_wait");
00298                      sleep(3);
00299               }
00300        }
00301 
00302        if (PTHREAD_CHK(pthread_mutex_unlock(&cit->newtask_mutex)))
00303               perror("pthread_mutex_unlock");
00304 
00305        pthread_cleanup_pop(0);
00306 
00307        for (i=0; i<cit->nthreads; i++)
00308        {
00309               if (PTHREAD_CHK(pthread_cancel(cit->threads[i].pt) ))
00310                      perror("pthread_cancel");
00311 
00312               if (PTHREAD_CHK(pthread_join(cit->threads[i].pt, NULL)))
00313                      perror("pthread_join");
00314 
00315               if (PTHREAD_CHK(pthread_cond_destroy(&cit->threads[i].gocond)))
00316                      perror("pthread_cond_destroy(gocond)");
00317 
00318               if (PTHREAD_CHK(pthread_mutex_destroy(&cit->threads[i]
00319                                                 .gomutex)))
00320                      perror("pthread_mutex_destroy");
00321        }
00322 
00323        if (PTHREAD_CHK(pthread_mutex_destroy(&cit->cleanup_mutex)))
00324               perror("pthread_mutex_destroy");
00325 
00326        if (PTHREAD_CHK(pthread_cond_destroy(&cit->newtask_cond)))
00327               perror("pthread_cond_destroy(newtask_cond)");
00328 
00329        if (PTHREAD_CHK(pthread_mutex_destroy(&cit->newtask_mutex)))
00330               perror("pthread_mutex_destroy");
00331        free(cit->newbuf);
00332        free(cit->metadata_buf);
00333        free( (char *)cit->threads);
00334        free( (char *)cit);
00335 }
00336 
00337 int cthread_go( cthreadinfo_t *cit, void (*gofunc)(void *, void *), void *arg)
00338 {
00339        int    n=0;
00340        int    err=0;
00341 
00342        if (PTHREAD_CHK(pthread_mutex_lock(&cit->newtask_mutex)))
00343               return (-1);
00344 
00345        pthread_cleanup_push(&mutexcleanup, (void *)&cit->newtask_mutex);
00346 
00347        while (cit->newcnt == 0)
00348        {
00349               if (PTHREAD_CHK(pthread_cond_wait(&cit->newtask_cond,
00350                                             &cit->newtask_mutex)))
00351               {
00352                      err=1;
00353                      break;
00354               }
00355        }
00356 
00357        if (!err)
00358               n=cit->newbuf[ --cit->newcnt ];
00359 
00360        if (PTHREAD_CHK(pthread_mutex_unlock(&cit->newtask_mutex)))
00361               err=1;
00362 
00363        pthread_cleanup_pop(0);
00364        if (err)      return (-1);
00365 
00366        (*gofunc)( cit->threads[n].metadata, arg);
00367 
00368        if (PTHREAD_CHK(pthread_mutex_lock(&cit->threads[n].gomutex)))
00369               return (-1);
00370        cit->threads[n].goflag=1;
00371        if (PTHREAD_CHK(pthread_mutex_unlock(&cit->threads[n].gomutex)))
00372               return (-1);
00373        if (PTHREAD_CHK(pthread_cond_signal( &cit->threads[n].gocond )))
00374               return (-1);
00375        return (0);
00376 }
00377 
00378 struct cthreadlock {
00379        pthread_mutex_t      mutex;
00380        } ;
00381 
00382 struct cthreadlock *cthread_lockcreate(void)
00383 {
00384 struct cthreadlock *p= (struct cthreadlock *)malloc(sizeof(struct cthreadlock));
00385 pthread_mutexattr_t mattr;
00386 
00387        if (!p)       return (0);
00388 
00389        if (PTHREAD_CHK(pthread_mutexattr_init(&mattr)))
00390        {
00391               errno=EIO;
00392               free( (char *)p );
00393               return (0);
00394        }
00395 
00396        if (PTHREAD_CHK(pthread_mutex_init(&p->mutex, &mattr)))
00397        {
00398               pthread_mutexattr_destroy(&mattr);
00399               errno=EIO;
00400               free( (char *)p );
00401               return (0);
00402        }
00403        pthread_mutexattr_destroy(&mattr);
00404        return (p);
00405 }
00406 
00407 void cthread_lockdestroy(struct cthreadlock *p)
00408 {
00409        pthread_mutex_destroy(&p->mutex);
00410        free( (char *)p );
00411 }
00412 
00413 int cthread_lock(struct cthreadlock *p, int (*func)(void *), void *arg)
00414 {
00415 int    rc;
00416 
00417        while (pthread_mutex_lock(&p->mutex))
00418        {
00419               perror("pthread_mutex_lock");
00420               sleep(3);
00421        }
00422 
00423        pthread_cleanup_push(&mutexcleanup, (void *)&p->mutex);
00424 
00425        rc= (*func)(arg);
00426 
00427        if (PTHREAD_CHK(pthread_mutex_unlock( &p->mutex )))
00428               perror("pthread_mutex_unlock");
00429        pthread_cleanup_pop(0);
00430        return (rc);
00431 }