Back to index

lightning-sunbird  0.9+nobinonly
Classes | Typedefs | Functions
prtpool.h File Reference
#include "prtypes.h"
#include "prthread.h"
#include "prio.h"
#include "prerror.h"
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Classes

struct  PRJobIoDesc

Typedefs

typedef PR_BEGIN_EXTERN_C
struct PRJobIoDesc 
PRJobIoDesc
typedef struct PRThreadPool
typedef struct PRJob

Functions

typedef void (PR_CALLBACK *PRJobFn)(void *arg)
 PR_CreateThreadPool (PRInt32 initial_threads, PRInt32 max_threads, PRUint32 stacksize)
 PR_QueueJob (PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Read (PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Write (PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Accept (PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Connect (PRThreadPool *tpool, PRJobIoDesc *iod, const PRNetAddr *addr, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Timer (PRThreadPool *tpool, PRIntervalTime timeout, PRJobFn fn, void *arg, PRBool joinable)
 PR_CancelJob (PRJob *job)
 PR_JoinJob (PRJob *job)
 PR_ShutdownThreadPool (PRThreadPool *tpool)
 PR_JoinThreadPool (PRThreadPool *tpool)

Class Documentation

struct PRJobIoDesc

Definition at line 54 of file prtpool.h.

Collaboration diagram for PRJobIoDesc:
Class Members
PRErrorCode error
PRFileDesc * socket
PRIntervalTime timeout

Typedef Documentation

typedef struct PRJob

Definition at line 61 of file prtpool.h.

typedef struct PRThreadPool

Definition at line 60 of file prtpool.h.


Function Documentation

PR_CancelJob ( PRJob job)

Definition at line 1001 of file prtpool.c.

                          {

       PRStatus rval = PR_FAILURE;
       PRThreadPool *tp;

       if (jobp->on_timerq) {
              /*
               * now, check again while holding the timerq lock
               */
              tp = jobp->tpool;
              PR_Lock(tp->timerq.lock);
              if (jobp->on_timerq) {
                     jobp->on_timerq = PR_FALSE;
                     PR_REMOVE_AND_INIT_LINK(&jobp->links);
                     tp->timerq.cnt--;
                     PR_Unlock(tp->timerq.lock);
                     if (!JOINABLE_JOB(jobp)) {
                            delete_job(jobp);
                     } else {
                            JOIN_NOTIFY(jobp);
                     }
                     rval = PR_SUCCESS;
              } else
                     PR_Unlock(tp->timerq.lock);
       } else if (jobp->on_ioq) {
              /*
               * now, check again while holding the ioq lock
               */
              tp = jobp->tpool;
              PR_Lock(tp->ioq.lock);
              if (jobp->on_ioq) {
                     jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
                     if (NULL == jobp->cancel_cv) {
                            PR_Unlock(tp->ioq.lock);
                            PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
                            return PR_FAILURE;
                     }
                     /*
                      * mark job 'cancelled' and notify io thread(s)
                      * XXXX:
                      *            this assumes there is only one io thread; when there
                      *            are multiple threads, the io thread processing this job
                      *            must be notified.
                      */
                     jobp->cancel_io = PR_TRUE;
                     PR_Unlock(tp->ioq.lock);    /* release, reacquire ioq lock */
                     notify_ioq(tp);
                     PR_Lock(tp->ioq.lock);
                     while (jobp->cancel_io)
                            PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
                     PR_Unlock(tp->ioq.lock);
                     PR_ASSERT(!jobp->on_ioq);
                     if (!JOINABLE_JOB(jobp)) {
                            delete_job(jobp);
                     } else {
                            JOIN_NOTIFY(jobp);
                     }
                     rval = PR_SUCCESS;
              } else
                     PR_Unlock(tp->ioq.lock);
       }
       if (PR_FAILURE == rval)
              PR_SetError(PR_INVALID_STATE_ERROR, 0);
       return rval;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_CreateThreadPool ( PRInt32  initial_threads,
PRInt32  max_threads,
PRUint32  stacksize 
)

Definition at line 658 of file prtpool.c.

{
PRThreadPool *tp;
PRThread *thr;
int i;
wthread *wthrp;

       tp = alloc_threadpool();
       if (NULL == tp)
              return NULL;

       tp->init_threads = initial_threads;
       tp->max_threads = max_threads;
       tp->stacksize = stacksize;
       PR_INIT_CLIST(&tp->jobq.list);
       PR_INIT_CLIST(&tp->ioq.list);
       PR_INIT_CLIST(&tp->timerq.list);
       PR_INIT_CLIST(&tp->jobq.wthreads);
       PR_INIT_CLIST(&tp->ioq.wthreads);
       PR_INIT_CLIST(&tp->timerq.wthreads);
       tp->shutdown = PR_FALSE;

       PR_Lock(tp->jobq.lock);
       for(i=0; i < initial_threads; ++i) {

              thr = PR_CreateThread(PR_USER_THREAD, wstart,
                                          tp, PR_PRIORITY_NORMAL,
                                          PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
              PR_ASSERT(thr);
              wthrp = PR_NEWZAP(wthread);
              PR_ASSERT(wthrp);
              wthrp->thread = thr;
              PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
       }
       tp->current_threads = initial_threads;

       thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
                                   tp, PR_PRIORITY_NORMAL,
                                   PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
       PR_ASSERT(thr);
       wthrp = PR_NEWZAP(wthread);
       PR_ASSERT(wthrp);
       wthrp->thread = thr;
       PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);

       thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
                                   tp, PR_PRIORITY_NORMAL,
                                   PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
       PR_ASSERT(thr);
       wthrp = PR_NEWZAP(wthread);
       PR_ASSERT(wthrp);
       wthrp->thread = thr;
       PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);

       PR_Unlock(tp->jobq.lock);
       return tp;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_JoinJob ( PRJob job)

Definition at line 1069 of file prtpool.c.

{
       if (!JOINABLE_JOB(jobp)) {
              PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
              return PR_FAILURE;
       }
       PR_Lock(jobp->tpool->join_lock);
       while(jobp->join_wait)
              PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
       PR_Unlock(jobp->tpool->join_lock);
       delete_job(jobp);
       return PR_SUCCESS;
}

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 1103 of file prtpool.c.

{
PRStatus rval = PR_SUCCESS;
PRCList *head;
PRStatus rval_status;

       PR_Lock(tpool->jobq.lock);
       while (!tpool->shutdown)
              PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);

       /*
        * wakeup worker threads
        */
#ifdef OPT_WINNT
       /*
        * post shutdown notification for all threads
        */
       {
              int i;
              for(i=0; i < tpool->current_threads; i++) {
                     PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
                                                                                    TRUE, NULL);
              }
       }
#else
       PR_NotifyAllCondVar(tpool->jobq.cv);
#endif

       /*
        * wakeup io thread(s)
        */
       notify_ioq(tpool);

       /*
        * wakeup timer thread(s)
        */
       PR_Lock(tpool->timerq.lock);
       notify_timerq(tpool);
       PR_Unlock(tpool->timerq.lock);

       while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
              wthread *wthrp;

              head = PR_LIST_HEAD(&tpool->jobq.wthreads);
              PR_REMOVE_AND_INIT_LINK(head);
              PR_Unlock(tpool->jobq.lock);
              wthrp = WTHREAD_LINKS_PTR(head);
              rval_status = PR_JoinThread(wthrp->thread);
              PR_ASSERT(PR_SUCCESS == rval_status);
              PR_DELETE(wthrp);
              PR_Lock(tpool->jobq.lock);
       }
       PR_Unlock(tpool->jobq.lock);
       while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
              wthread *wthrp;

              head = PR_LIST_HEAD(&tpool->ioq.wthreads);
              PR_REMOVE_AND_INIT_LINK(head);
              wthrp = WTHREAD_LINKS_PTR(head);
              rval_status = PR_JoinThread(wthrp->thread);
              PR_ASSERT(PR_SUCCESS == rval_status);
              PR_DELETE(wthrp);
       }

       while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
              wthread *wthrp;

              head = PR_LIST_HEAD(&tpool->timerq.wthreads);
              PR_REMOVE_AND_INIT_LINK(head);
              wthrp = WTHREAD_LINKS_PTR(head);
              rval_status = PR_JoinThread(wthrp->thread);
              PR_ASSERT(PR_SUCCESS == rval_status);
              PR_DELETE(wthrp);
       }

       /*
        * Delete queued jobs
        */
       while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
              PRJob *jobp;

              head = PR_LIST_HEAD(&tpool->jobq.list);
              PR_REMOVE_AND_INIT_LINK(head);
              jobp = JOB_LINKS_PTR(head);
              tpool->jobq.cnt--;
              delete_job(jobp);
       }

       /* delete io jobs */
       while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
              PRJob *jobp;

              head = PR_LIST_HEAD(&tpool->ioq.list);
              PR_REMOVE_AND_INIT_LINK(head);
              tpool->ioq.cnt--;
              jobp = JOB_LINKS_PTR(head);
              delete_job(jobp);
       }

       /* delete timer jobs */
       while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
              PRJob *jobp;

              head = PR_LIST_HEAD(&tpool->timerq.list);
              PR_REMOVE_AND_INIT_LINK(head);
              tpool->timerq.cnt--;
              jobp = JOB_LINKS_PTR(head);
              delete_job(jobp);
       }

       PR_ASSERT(0 == tpool->jobq.cnt);
       PR_ASSERT(0 == tpool->ioq.cnt);
       PR_ASSERT(0 == tpool->timerq.cnt);

       delete_threadpool(tpool);
       return rval;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob ( PRThreadPool tpool,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 761 of file prtpool.c.

{
       PRJob *jobp;

       jobp = alloc_job(joinable, tpool);
       if (NULL == jobp)
              return NULL;

       jobp->job_func = fn;
       jobp->job_arg = arg;
       jobp->tpool = tpool;

       add_to_jobq(tpool, jobp);
       return jobp;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob_Accept ( PRThreadPool tpool,
PRJobIoDesc iod,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 879 of file prtpool.c.

{
       return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob_Connect ( PRThreadPool tpool,
PRJobIoDesc iod,
const PRNetAddr addr,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 887 of file prtpool.c.

{
       PRStatus rv;
       PRErrorCode err;

       rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
       if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
              /* connection pending */
              return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
       } else {
              /*
               * connection succeeded or failed; add to jobq right away
               */
              if (rv == PR_FAILURE)
                     iod->error = err;
              else
                     iod->error = 0;
              return(PR_QueueJob(tpool, fn, arg, joinable));
       }
}

Here is the call graph for this function:

PR_QueueJob_Read ( PRThreadPool tpool,
PRJobIoDesc iod,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 862 of file prtpool.c.

{
       return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob_Timer ( PRThreadPool tpool,
PRIntervalTime  timeout,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 911 of file prtpool.c.

{
       PRIntervalTime now;
       PRJob *jobp;

       if (PR_INTERVAL_NO_TIMEOUT == timeout) {
              PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
              return NULL;
       }
       if (PR_INTERVAL_NO_WAIT == timeout) {
              /*
               * no waiting; add to jobq right away
               */
              return(PR_QueueJob(tpool, fn, arg, joinable));
       }
       jobp = alloc_job(joinable, tpool);
       if (NULL == jobp) {
              return NULL;
       }

       /*
        * Add a new job to timer_jobq
        * wakeup timer worker thread
        */

       jobp->job_func = fn;
       jobp->job_arg = arg;
       jobp->tpool = tpool;
       jobp->timeout = timeout;

       now = PR_IntervalNow();
       jobp->absolute = now + timeout;


       PR_Lock(tpool->timerq.lock);
       jobp->on_timerq = PR_TRUE;
       if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
              PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
       else {
              PRCList *qp;
              PRJob *tmp_jobp;
              /*
               * insert into the sorted timer jobq
               */
              for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
                                                 qp = qp->prev) {
                     tmp_jobp = JOB_LINKS_PTR(qp);
                     if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
                            break;
                     }
              }
              PR_INSERT_AFTER(&jobp->links,qp);
       }
       tpool->timerq.cnt++;
       /*
        * notify timer worker thread(s)
        */
       notify_timerq(tpool);
       PR_Unlock(tpool->timerq.lock);
       return jobp;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob_Write ( PRThreadPool tpool,
PRJobIoDesc iod,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 870 of file prtpool.c.

{
       return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
}

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 1085 of file prtpool.c.

Here is the call graph for this function:

Here is the caller graph for this function:

typedef void ( PR_CALLBACK PRJobFn)