Back to index

lightning-sunbird  0.9+nobinonly
Classes | Defines | Typedefs | Enumerations | Functions
prtpool.c File Reference
#include "nspr.h"

Go to the source code of this file.

Classes

struct  wthread
struct  timer_jobq
struct  tp_jobq
struct  io_jobq
struct  PRThreadPool
struct  PRJob

Defines

#define JOB_LINKS_PTR(_qp)   ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))
#define WTHREAD_LINKS_PTR(_qp)   ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))
#define JOINABLE_JOB(_jobp)   (NULL != (_jobp)->join_cv)
#define JOIN_NOTIFY(_jobp)
#define CANCEL_IO_JOB(jobp)

Typedefs

typedef struct wthread wthread
typedef struct timer_jobq timer_jobq
typedef struct tp_jobq tp_jobq
typedef struct io_jobq io_jobq
typedef enum io_op_type io_op_type

Enumerations

enum  io_op_type { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT }

Functions

static void delete_job (PRJob *jobp)
static PRThreadPoolalloc_threadpool (void)
static PRJoballoc_job (PRBool joinable, PRThreadPool *tp)
static void notify_ioq (PRThreadPool *tp)
static void notify_timerq (PRThreadPool *tp)
static void wstart (void *arg)
static void add_to_jobq (PRThreadPool *tp, PRJob *jobp)
static void io_wstart (void *arg)
static void timer_wstart (void *arg)
static void delete_threadpool (PRThreadPool *tp)
 PR_CreateThreadPool (PRInt32 initial_threads, PRInt32 max_threads, PRUint32 stacksize)
 PR_QueueJob (PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
static PRJobqueue_io_job (PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void *arg, PRBool joinable, io_op_type op)
 PR_QueueJob_Read (PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Write (PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Accept (PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Connect (PRThreadPool *tpool, PRJobIoDesc *iod, const PRNetAddr *addr, PRJobFn fn, void *arg, PRBool joinable)
 PR_QueueJob_Timer (PRThreadPool *tpool, PRIntervalTime timeout, PRJobFn fn, void *arg, PRBool joinable)
 PR_CancelJob (PRJob *jobp)
 PR_JoinJob (PRJob *jobp)
 PR_ShutdownThreadPool (PRThreadPool *tpool)
 PR_JoinThreadPool (PRThreadPool *tpool)

Class Documentation

struct wthread

Definition at line 53 of file prtpool.c.

Collaboration diagram for wthread:
Class Members
PRCList links
PRThread * thread
struct timer_jobq

Definition at line 61 of file prtpool.c.

Collaboration diagram for timer_jobq:
Class Members
PRInt32 cnt
PRCondVar * cv
PRCList list
PRLock * lock
PRCList wthreads
struct tp_jobq

Definition at line 72 of file prtpool.c.

Collaboration diagram for tp_jobq:
Class Members
PRInt32 cnt
PRCondVar * cv
PRCList list
PRLock * lock
PRCList wthreads
struct io_jobq

Definition at line 86 of file prtpool.c.

Collaboration diagram for io_jobq:
Class Members
PRInt32 cnt
PRCList list
PRLock * lock
PRFileDesc * notify_fd
PRInt32 npollfds
PRPollDesc * pollfds
PRJob ** polljobs
PRCList wthreads
struct PRThreadPool

Definition at line 100 of file prtpool.c.

Collaboration diagram for PRThreadPool:
Class Members
PRInt32 current_threads
PRInt32 idle_threads
PRInt32 init_threads
io_jobq ioq
tp_jobq jobq
PRLock * join_lock
PRInt32 max_threads
PRBool shutdown
PRCondVar * shutdown_cv
PRUint32 stacksize
timer_jobq timerq
struct PRJob

Definition at line 124 of file prtpool.c.

Collaboration diagram for PRJob:
Class Members
PRIntervalTime absolute
PRCondVar * cancel_cv
PRBool cancel_io
io_op_type io_op
PRInt16 io_poll_flags
PRJobIoDesc * iod
void * job_arg
PRJobFn job_func
PRCondVar * join_cv
PRBool join_wait
PRCList links
PRNetAddr * netaddr
PRBool on_ioq
PRBool on_timerq
PRIntervalTime timeout
PRThreadPool * tpool

Define Documentation

#define CANCEL_IO_JOB (   jobp)
Value:
PR_BEGIN_MACRO                                                 \
                            jobp->cancel_io = PR_FALSE;                      \
                            jobp->on_ioq = PR_FALSE;                         \
                            PR_REMOVE_AND_INIT_LINK(&jobp->links);    \
                            tp->ioq.cnt--;                                                 \
                            PR_NotifyCondVar(jobp->cancel_cv);        \
                            PR_END_MACRO

Definition at line 162 of file prtpool.c.

#define JOB_LINKS_PTR (   _qp)    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

Definition at line 146 of file prtpool.c.

#define JOIN_NOTIFY (   _jobp)
Value:
PR_BEGIN_MACRO							\
				PR_Lock(_jobp->tpool->join_lock);            \
                            _jobp->join_wait = PR_FALSE;                     \
                            PR_NotifyCondVar(_jobp->join_cv);         \
                            PR_Unlock(_jobp->tpool->join_lock);              \
                            PR_END_MACRO

Definition at line 154 of file prtpool.c.

#define JOINABLE_JOB (   _jobp)    (NULL != (_jobp)->join_cv)

Definition at line 152 of file prtpool.c.

#define WTHREAD_LINKS_PTR (   _qp)    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

Definition at line 149 of file prtpool.c.


Typedef Documentation

typedef struct io_jobq io_jobq
typedef enum io_op_type io_op_type
typedef struct timer_jobq timer_jobq
typedef struct tp_jobq tp_jobq
typedef struct wthread wthread

Enumeration Type Documentation

enum io_op_type
Enumerator:
JOB_IO_READ 
JOB_IO_WRITE 
JOB_IO_CONNECT 
JOB_IO_ACCEPT 

Definition at line 114 of file prtpool.c.


Function Documentation

static void add_to_jobq ( PRThreadPool tp,
PRJob jobp 
) [static]

Definition at line 256 of file prtpool.c.

{
       /*
        * add to jobq
        */
#ifdef OPT_WINNT
       PR_Lock(tp->jobq.lock);
       tp->jobq.cnt++;
       PR_Unlock(tp->jobq.lock);
       /*
        * notify worker thread(s)
        */
       PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
            FALSE, &jobp->nt_notifier.overlapped);
#else
       PR_Lock(tp->jobq.lock);
       PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
       tp->jobq.cnt++;
       if ((tp->idle_threads < tp->jobq.cnt) &&
                                   (tp->current_threads < tp->max_threads)) {
              wthread *wthrp;
              /*
               * increment thread count and unlock the jobq lock
               */
              tp->current_threads++;
              PR_Unlock(tp->jobq.lock);
              /* create new worker thread */
              wthrp = PR_NEWZAP(wthread);
              if (wthrp) {
                     wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
                                          tp, PR_PRIORITY_NORMAL,
                                          PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
                     if (NULL == wthrp->thread) {
                            PR_DELETE(wthrp);  /* this sets wthrp to NULL */
                     }
              }
              PR_Lock(tp->jobq.lock);
              if (NULL == wthrp) {
                     tp->current_threads--;
              } else {
                     PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
              }
       }
       /*
        * wakeup a worker thread
        */
       PR_NotifyCondVar(tp->jobq.cv);
       PR_Unlock(tp->jobq.lock);
#endif
}

Here is the call graph for this function:

Here is the caller graph for this function:

static PRJob * alloc_job ( PRBool  joinable,
PRThreadPool tp 
) [static]

Definition at line 734 of file prtpool.c.

{
       PRJob *jobp;

       jobp = PR_NEWZAP(PRJob);
       if (NULL == jobp) 
              goto failed;
       if (joinable) {
              jobp->join_cv = PR_NewCondVar(tp->join_lock);
              jobp->join_wait = PR_TRUE;
              if (NULL == jobp->join_cv)
                     goto failed;
       } else {
              jobp->join_cv = NULL;
       }
#ifdef OPT_WINNT
       jobp->nt_notifier.jobp = jobp;
#endif
       return jobp;
failed:
       delete_job(jobp);
       PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
       return NULL;
}

Here is the call graph for this function:

Here is the caller graph for this function:

static PRThreadPool * alloc_threadpool ( void  ) [static]

Definition at line 607 of file prtpool.c.

{
PRThreadPool *tp;

       tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
       if (NULL == tp)
              goto failed;
       tp->jobq.lock = PR_NewLock();
       if (NULL == tp->jobq.lock)
              goto failed;
       tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
       if (NULL == tp->jobq.cv)
              goto failed;
       tp->join_lock = PR_NewLock();
       if (NULL == tp->join_lock)
              goto failed;
#ifdef OPT_WINNT
       tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
                                                               NULL, 0, 0);
       if (NULL == tp->jobq.nt_completion_port)
              goto failed;
#endif

       tp->ioq.lock = PR_NewLock();
       if (NULL == tp->ioq.lock)
              goto failed;

       /* Timer queue */

       tp->timerq.lock = PR_NewLock();
       if (NULL == tp->timerq.lock)
              goto failed;
       tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
       if (NULL == tp->timerq.cv)
              goto failed;

       tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
       if (NULL == tp->shutdown_cv)
              goto failed;
       tp->ioq.notify_fd = PR_NewPollableEvent();
       if (NULL == tp->ioq.notify_fd)
              goto failed;
       return tp;
failed:
       delete_threadpool(tp);
       PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
       return NULL;
}

Here is the call graph for this function:

Here is the caller graph for this function:

static void delete_job ( PRJob jobp) [static]

Definition at line 718 of file prtpool.c.

{
       if (NULL != jobp) {
              if (NULL != jobp->join_cv) {
                     PR_DestroyCondVar(jobp->join_cv);
                     jobp->join_cv = NULL;
              }
              if (NULL != jobp->cancel_cv) {
                     PR_DestroyCondVar(jobp->cancel_cv);
                     jobp->cancel_cv = NULL;
              }
              PR_DELETE(jobp);
       }
}

Here is the call graph for this function:

Here is the caller graph for this function:

static void delete_threadpool ( PRThreadPool tp) [static]

Definition at line 574 of file prtpool.c.

{
       if (NULL != tp) {
              if (NULL != tp->shutdown_cv)
                     PR_DestroyCondVar(tp->shutdown_cv);
              if (NULL != tp->jobq.cv)
                     PR_DestroyCondVar(tp->jobq.cv);
              if (NULL != tp->jobq.lock)
                     PR_DestroyLock(tp->jobq.lock);
              if (NULL != tp->join_lock)
                     PR_DestroyLock(tp->join_lock);
#ifdef OPT_WINNT
              if (NULL != tp->jobq.nt_completion_port)
                     CloseHandle(tp->jobq.nt_completion_port);
#endif
              /* Timer queue */
              if (NULL != tp->timerq.cv)
                     PR_DestroyCondVar(tp->timerq.cv);
              if (NULL != tp->timerq.lock)
                     PR_DestroyLock(tp->timerq.lock);

              if (NULL != tp->ioq.lock)
                     PR_DestroyLock(tp->ioq.lock);
              if (NULL != tp->ioq.pollfds)
                     PR_Free(tp->ioq.pollfds);
              if (NULL != tp->ioq.notify_fd)
                     PR_DestroyPollableEvent(tp->ioq.notify_fd);
              PR_Free(tp);
       }
       return;
}

Here is the call graph for this function:

Here is the caller graph for this function:

static void io_wstart ( void arg) [static]

Definition at line 310 of file prtpool.c.

{
PRThreadPool *tp = (PRThreadPool *) arg;
int pollfd_cnt, pollfds_used;
int rv;
PRCList *qp, *nextqp;
PRPollDesc *pollfds;
PRJob **polljobs;
int poll_timeout;
PRIntervalTime now;

       /*
        * scan io_jobq
        * construct poll list
        * call PR_Poll
        * for all fds, for which poll returns true, move the job to
        * jobq and wakeup worker thread.
        */
       while (!tp->shutdown) {
              PRJob *jobp;

              pollfd_cnt = tp->ioq.cnt + 10;
              if (pollfd_cnt > tp->ioq.npollfds) {

                     /*
                      * re-allocate pollfd array if the current one is not large
                      * enough
                      */
                     if (NULL != tp->ioq.pollfds)
                            PR_Free(tp->ioq.pollfds);
                     tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
                                          (sizeof(PRPollDesc) + sizeof(PRJob *)));
                     PR_ASSERT(NULL != tp->ioq.pollfds);
                     /*
                      * array of pollfds
                      */
                     pollfds = tp->ioq.pollfds;
                     tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
                     /*
                      * parallel array of jobs
                      */
                     polljobs = tp->ioq.polljobs;
                     tp->ioq.npollfds = pollfd_cnt;
              }

              pollfds_used = 0;
              /*
               * add the notify fd; used for unblocking io thread(s)
               */
              pollfds[pollfds_used].fd = tp->ioq.notify_fd;
              pollfds[pollfds_used].in_flags = PR_POLL_READ;
              pollfds[pollfds_used].out_flags = 0;
              polljobs[pollfds_used] = NULL;
              pollfds_used++;
              /*
               * fill in the pollfd array
               */
              PR_Lock(tp->ioq.lock);
              for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
                     nextqp = qp->next;
                     jobp = JOB_LINKS_PTR(qp);
                     if (jobp->cancel_io) {
                            CANCEL_IO_JOB(jobp);
                            continue;
                     }
                     if (pollfds_used == (pollfd_cnt))
                            break;
                     pollfds[pollfds_used].fd = jobp->iod->socket;
                     pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
                     pollfds[pollfds_used].out_flags = 0;
                     polljobs[pollfds_used] = jobp;

                     pollfds_used++;
              }
              if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
                     qp = tp->ioq.list.next;
                     jobp = JOB_LINKS_PTR(qp);
                     if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
                            poll_timeout = PR_INTERVAL_NO_TIMEOUT;
                     else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
                            poll_timeout = PR_INTERVAL_NO_WAIT;
                     else {
                            poll_timeout = jobp->absolute - PR_IntervalNow();
                            if (poll_timeout <= 0) /* already timed out */
                                   poll_timeout = PR_INTERVAL_NO_WAIT;
                     }
              } else {
                     poll_timeout = PR_INTERVAL_NO_TIMEOUT;
              }
              PR_Unlock(tp->ioq.lock);

              /*
               * XXXX
               * should retry if more jobs have been added to the queue?
               *
               */
              PR_ASSERT(pollfds_used <= pollfd_cnt);
              rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);

              if (tp->shutdown) {
                     break;
              }

              if (rv > 0) {
                     /*
                      * at least one io event is set
                      */
                     PRStatus rval_status;
                     PRInt32 index;

                     PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
                     /*
                      * reset the pollable event, if notified
                      */
                     if (pollfds[0].out_flags & PR_POLL_READ) {
                            rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
                            PR_ASSERT(PR_SUCCESS == rval_status);
                     }

                     for(index = 1; index < (pollfds_used); index++) {
                PRInt16 events = pollfds[index].in_flags;
                PRInt16 revents = pollfds[index].out_flags;    
                            jobp = polljobs[index];     

                if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
                     (revents & PR_POLL_ERR) ||
                                   ((events & PR_POLL_WRITE) &&
                                                 (revents & PR_POLL_HUP))) { /* write op & hup */
                                   PR_Lock(tp->ioq.lock);
                                   if (jobp->cancel_io) {
                                          CANCEL_IO_JOB(jobp);
                                          PR_Unlock(tp->ioq.lock);
                                          continue;
                                   }
                                   PR_REMOVE_AND_INIT_LINK(&jobp->links);
                                   tp->ioq.cnt--;
                                   jobp->on_ioq = PR_FALSE;
                                   PR_Unlock(tp->ioq.lock);

                                   /* set error */
                    if (PR_POLL_NVAL & revents)
                                          jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
                    else if (PR_POLL_HUP & revents)
                                          jobp->iod->error = PR_CONNECT_RESET_ERROR;
                    else 
                                          jobp->iod->error = PR_IO_ERROR;

                                   /*
                                    * add to jobq
                                    */
                                   add_to_jobq(tp, jobp);
                            } else if (revents) {
                                   /*
                                    * add to jobq
                                    */
                                   PR_Lock(tp->ioq.lock);
                                   if (jobp->cancel_io) {
                                          CANCEL_IO_JOB(jobp);
                                          PR_Unlock(tp->ioq.lock);
                                          continue;
                                   }
                                   PR_REMOVE_AND_INIT_LINK(&jobp->links);
                                   tp->ioq.cnt--;
                                   jobp->on_ioq = PR_FALSE;
                                   PR_Unlock(tp->ioq.lock);

                                   if (jobp->io_op == JOB_IO_CONNECT) {
                                          if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
                                                 jobp->iod->error = 0;
                                          else
                                                 jobp->iod->error = PR_GetError();
                                   } else
                                          jobp->iod->error = 0;

                                   add_to_jobq(tp, jobp);
                            }
                     }
              }
              /*
               * timeout processing
               */
              now = PR_IntervalNow();
              PR_Lock(tp->ioq.lock);
              for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
                     nextqp = qp->next;
                     jobp = JOB_LINKS_PTR(qp);
                     if (jobp->cancel_io) {
                            CANCEL_IO_JOB(jobp);
                            continue;
                     }
                     if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
                            break;
                     if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
                                                        ((PRInt32)(jobp->absolute - now) > 0))
                            break;
                     PR_REMOVE_AND_INIT_LINK(&jobp->links);
                     tp->ioq.cnt--;
                     jobp->on_ioq = PR_FALSE;
                     jobp->iod->error = PR_IO_TIMEOUT_ERROR;
                     add_to_jobq(tp, jobp);
              }
              PR_Unlock(tp->ioq.lock);
       }
}

Here is the call graph for this function:

Here is the caller graph for this function:

static void notify_ioq ( PRThreadPool tp) [static]

Definition at line 984 of file prtpool.c.

{
PRStatus rval_status;

       /*
        * wakeup the io thread(s)
        */
       rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
       PR_ASSERT(PR_SUCCESS == rval_status);
}

Here is the caller graph for this function:

static void notify_timerq ( PRThreadPool tp) [static]

Definition at line 975 of file prtpool.c.

{
       /*
        * wakeup the timer thread(s)
        */
       PR_NotifyCondVar(tp->timerq.cv);
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_CancelJob ( PRJob jobp)

Definition at line 1001 of file prtpool.c.

                          {

       PRStatus rval = PR_FAILURE;
       PRThreadPool *tp;

       if (jobp->on_timerq) {
              /*
               * now, check again while holding the timerq lock
               */
              tp = jobp->tpool;
              PR_Lock(tp->timerq.lock);
              if (jobp->on_timerq) {
                     jobp->on_timerq = PR_FALSE;
                     PR_REMOVE_AND_INIT_LINK(&jobp->links);
                     tp->timerq.cnt--;
                     PR_Unlock(tp->timerq.lock);
                     if (!JOINABLE_JOB(jobp)) {
                            delete_job(jobp);
                     } else {
                            JOIN_NOTIFY(jobp);
                     }
                     rval = PR_SUCCESS;
              } else
                     PR_Unlock(tp->timerq.lock);
       } else if (jobp->on_ioq) {
              /*
               * now, check again while holding the ioq lock
               */
              tp = jobp->tpool;
              PR_Lock(tp->ioq.lock);
              if (jobp->on_ioq) {
                     jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
                     if (NULL == jobp->cancel_cv) {
                            PR_Unlock(tp->ioq.lock);
                            PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
                            return PR_FAILURE;
                     }
                     /*
                      * mark job 'cancelled' and notify io thread(s)
                      * XXXX:
                      *            this assumes there is only one io thread; when there
                      *            are multiple threads, the io thread processing this job
                      *            must be notified.
                      */
                     jobp->cancel_io = PR_TRUE;
                     PR_Unlock(tp->ioq.lock);    /* release, reacquire ioq lock */
                     notify_ioq(tp);
                     PR_Lock(tp->ioq.lock);
                     while (jobp->cancel_io)
                            PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
                     PR_Unlock(tp->ioq.lock);
                     PR_ASSERT(!jobp->on_ioq);
                     if (!JOINABLE_JOB(jobp)) {
                            delete_job(jobp);
                     } else {
                            JOIN_NOTIFY(jobp);
                     }
                     rval = PR_SUCCESS;
              } else
                     PR_Unlock(tp->ioq.lock);
       }
       if (PR_FAILURE == rval)
              PR_SetError(PR_INVALID_STATE_ERROR, 0);
       return rval;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_CreateThreadPool ( PRInt32  initial_threads,
PRInt32  max_threads,
PRUint32  stacksize 
)

Definition at line 658 of file prtpool.c.

{
PRThreadPool *tp;
PRThread *thr;
int i;
wthread *wthrp;

       tp = alloc_threadpool();
       if (NULL == tp)
              return NULL;

       tp->init_threads = initial_threads;
       tp->max_threads = max_threads;
       tp->stacksize = stacksize;
       PR_INIT_CLIST(&tp->jobq.list);
       PR_INIT_CLIST(&tp->ioq.list);
       PR_INIT_CLIST(&tp->timerq.list);
       PR_INIT_CLIST(&tp->jobq.wthreads);
       PR_INIT_CLIST(&tp->ioq.wthreads);
       PR_INIT_CLIST(&tp->timerq.wthreads);
       tp->shutdown = PR_FALSE;

       PR_Lock(tp->jobq.lock);
       for(i=0; i < initial_threads; ++i) {

              thr = PR_CreateThread(PR_USER_THREAD, wstart,
                                          tp, PR_PRIORITY_NORMAL,
                                          PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
              PR_ASSERT(thr);
              wthrp = PR_NEWZAP(wthread);
              PR_ASSERT(wthrp);
              wthrp->thread = thr;
              PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
       }
       tp->current_threads = initial_threads;

       thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
                                   tp, PR_PRIORITY_NORMAL,
                                   PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
       PR_ASSERT(thr);
       wthrp = PR_NEWZAP(wthread);
       PR_ASSERT(wthrp);
       wthrp->thread = thr;
       PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);

       thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
                                   tp, PR_PRIORITY_NORMAL,
                                   PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
       PR_ASSERT(thr);
       wthrp = PR_NEWZAP(wthread);
       PR_ASSERT(wthrp);
       wthrp->thread = thr;
       PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);

       PR_Unlock(tp->jobq.lock);
       return tp;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_JoinJob ( PRJob jobp)

Definition at line 1069 of file prtpool.c.

{
       if (!JOINABLE_JOB(jobp)) {
              PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
              return PR_FAILURE;
       }
       PR_Lock(jobp->tpool->join_lock);
       while(jobp->join_wait)
              PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
       PR_Unlock(jobp->tpool->join_lock);
       delete_job(jobp);
       return PR_SUCCESS;
}

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 1103 of file prtpool.c.

{
PRStatus rval = PR_SUCCESS;
PRCList *head;
PRStatus rval_status;

       PR_Lock(tpool->jobq.lock);
       while (!tpool->shutdown)
              PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);

       /*
        * wakeup worker threads
        */
#ifdef OPT_WINNT
       /*
        * post shutdown notification for all threads
        */
       {
              int i;
              for(i=0; i < tpool->current_threads; i++) {
                     PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
                                                                                    TRUE, NULL);
              }
       }
#else
       PR_NotifyAllCondVar(tpool->jobq.cv);
#endif

       /*
        * wakeup io thread(s)
        */
       notify_ioq(tpool);

       /*
        * wakeup timer thread(s)
        */
       PR_Lock(tpool->timerq.lock);
       notify_timerq(tpool);
       PR_Unlock(tpool->timerq.lock);

       while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
              wthread *wthrp;

              head = PR_LIST_HEAD(&tpool->jobq.wthreads);
              PR_REMOVE_AND_INIT_LINK(head);
              PR_Unlock(tpool->jobq.lock);
              wthrp = WTHREAD_LINKS_PTR(head);
              rval_status = PR_JoinThread(wthrp->thread);
              PR_ASSERT(PR_SUCCESS == rval_status);
              PR_DELETE(wthrp);
              PR_Lock(tpool->jobq.lock);
       }
       PR_Unlock(tpool->jobq.lock);
       while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
              wthread *wthrp;

              head = PR_LIST_HEAD(&tpool->ioq.wthreads);
              PR_REMOVE_AND_INIT_LINK(head);
              wthrp = WTHREAD_LINKS_PTR(head);
              rval_status = PR_JoinThread(wthrp->thread);
              PR_ASSERT(PR_SUCCESS == rval_status);
              PR_DELETE(wthrp);
       }

       while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
              wthread *wthrp;

              head = PR_LIST_HEAD(&tpool->timerq.wthreads);
              PR_REMOVE_AND_INIT_LINK(head);
              wthrp = WTHREAD_LINKS_PTR(head);
              rval_status = PR_JoinThread(wthrp->thread);
              PR_ASSERT(PR_SUCCESS == rval_status);
              PR_DELETE(wthrp);
       }

       /*
        * Delete queued jobs
        */
       while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
              PRJob *jobp;

              head = PR_LIST_HEAD(&tpool->jobq.list);
              PR_REMOVE_AND_INIT_LINK(head);
              jobp = JOB_LINKS_PTR(head);
              tpool->jobq.cnt--;
              delete_job(jobp);
       }

       /* delete io jobs */
       while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
              PRJob *jobp;

              head = PR_LIST_HEAD(&tpool->ioq.list);
              PR_REMOVE_AND_INIT_LINK(head);
              tpool->ioq.cnt--;
              jobp = JOB_LINKS_PTR(head);
              delete_job(jobp);
       }

       /* delete timer jobs */
       while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
              PRJob *jobp;

              head = PR_LIST_HEAD(&tpool->timerq.list);
              PR_REMOVE_AND_INIT_LINK(head);
              tpool->timerq.cnt--;
              jobp = JOB_LINKS_PTR(head);
              delete_job(jobp);
       }

       PR_ASSERT(0 == tpool->jobq.cnt);
       PR_ASSERT(0 == tpool->ioq.cnt);
       PR_ASSERT(0 == tpool->timerq.cnt);

       delete_threadpool(tpool);
       return rval;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob ( PRThreadPool tpool,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 761 of file prtpool.c.

{
       PRJob *jobp;

       jobp = alloc_job(joinable, tpool);
       if (NULL == jobp)
              return NULL;

       jobp->job_func = fn;
       jobp->job_arg = arg;
       jobp->tpool = tpool;

       add_to_jobq(tpool, jobp);
       return jobp;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob_Accept ( PRThreadPool tpool,
PRJobIoDesc iod,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 879 of file prtpool.c.

{
       return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob_Connect ( PRThreadPool tpool,
PRJobIoDesc iod,
const PRNetAddr addr,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 887 of file prtpool.c.

{
       PRStatus rv;
       PRErrorCode err;

       rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
       if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
              /* connection pending */
              return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
       } else {
              /*
               * connection succeeded or failed; add to jobq right away
               */
              if (rv == PR_FAILURE)
                     iod->error = err;
              else
                     iod->error = 0;
              return(PR_QueueJob(tpool, fn, arg, joinable));
       }
}

Here is the call graph for this function:

PR_QueueJob_Read ( PRThreadPool tpool,
PRJobIoDesc iod,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 862 of file prtpool.c.

{
       return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob_Timer ( PRThreadPool tpool,
PRIntervalTime  timeout,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 911 of file prtpool.c.

{
       PRIntervalTime now;
       PRJob *jobp;

       if (PR_INTERVAL_NO_TIMEOUT == timeout) {
              PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
              return NULL;
       }
       if (PR_INTERVAL_NO_WAIT == timeout) {
              /*
               * no waiting; add to jobq right away
               */
              return(PR_QueueJob(tpool, fn, arg, joinable));
       }
       jobp = alloc_job(joinable, tpool);
       if (NULL == jobp) {
              return NULL;
       }

       /*
        * Add a new job to timer_jobq
        * wakeup timer worker thread
        */

       jobp->job_func = fn;
       jobp->job_arg = arg;
       jobp->tpool = tpool;
       jobp->timeout = timeout;

       now = PR_IntervalNow();
       jobp->absolute = now + timeout;


       PR_Lock(tpool->timerq.lock);
       jobp->on_timerq = PR_TRUE;
       if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
              PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
       else {
              PRCList *qp;
              PRJob *tmp_jobp;
              /*
               * insert into the sorted timer jobq
               */
              for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
                                                 qp = qp->prev) {
                     tmp_jobp = JOB_LINKS_PTR(qp);
                     if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
                            break;
                     }
              }
              PR_INSERT_AFTER(&jobp->links,qp);
       }
       tpool->timerq.cnt++;
       /*
        * notify timer worker thread(s)
        */
       notify_timerq(tpool);
       PR_Unlock(tpool->timerq.lock);
       return jobp;
}

Here is the call graph for this function:

Here is the caller graph for this function:

PR_QueueJob_Write ( PRThreadPool tpool,
PRJobIoDesc iod,
PRJobFn  fn,
void arg,
PRBool  joinable 
)

Definition at line 870 of file prtpool.c.

{
       return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
}

Here is the call graph for this function:

Here is the caller graph for this function:

Definition at line 1085 of file prtpool.c.

Here is the call graph for this function:

Here is the caller graph for this function:

static PRJob* queue_io_job ( PRThreadPool tpool,
PRJobIoDesc iod,
PRJobFn  fn,
void arg,
PRBool  joinable,
io_op_type  op 
) [static]

Definition at line 779 of file prtpool.c.

{
       PRJob *jobp;
       PRIntervalTime now;

       jobp = alloc_job(joinable, tpool);
       if (NULL == jobp) {
              return NULL;
       }

       /*
        * Add a new job to io_jobq
        * wakeup io worker thread
        */

       jobp->job_func = fn;
       jobp->job_arg = arg;
       jobp->tpool = tpool;
       jobp->iod = iod;
       if (JOB_IO_READ == op) {
              jobp->io_op = JOB_IO_READ;
              jobp->io_poll_flags = PR_POLL_READ;
       } else if (JOB_IO_WRITE == op) {
              jobp->io_op = JOB_IO_WRITE;
              jobp->io_poll_flags = PR_POLL_WRITE;
       } else if (JOB_IO_ACCEPT == op) {
              jobp->io_op = JOB_IO_ACCEPT;
              jobp->io_poll_flags = PR_POLL_READ;
       } else if (JOB_IO_CONNECT == op) {
              jobp->io_op = JOB_IO_CONNECT;
              jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
       } else {
              delete_job(jobp);
              PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
              return NULL;
       }

       jobp->timeout = iod->timeout;
       if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
                     (PR_INTERVAL_NO_WAIT == iod->timeout)) {
              jobp->absolute = iod->timeout;
       } else {
              now = PR_IntervalNow();
              jobp->absolute = now + iod->timeout;
       }


       PR_Lock(tpool->ioq.lock);

       if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
                     (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
              PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
       } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
              PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
       } else {
              PRCList *qp;
              PRJob *tmp_jobp;
              /*
               * insert into the timeout-sorted ioq
               */
              for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
                                                 qp = qp->prev) {
                     tmp_jobp = JOB_LINKS_PTR(qp);
                     if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
                            break;
                     }
              }
              PR_INSERT_AFTER(&jobp->links,qp);
       }

       jobp->on_ioq = PR_TRUE;
       tpool->ioq.cnt++;
       /*
        * notify io worker thread(s)
        */
       PR_Unlock(tpool->ioq.lock);
       notify_ioq(tpool);
       return jobp;
}

Here is the call graph for this function:

Here is the caller graph for this function:

static void timer_wstart ( void arg) [static]

Definition at line 518 of file prtpool.c.

{
PRThreadPool *tp = (PRThreadPool *) arg;
PRCList *qp;
PRIntervalTime timeout;
PRIntervalTime now;

       /*
        * call PR_WaitCondVar with minimum value of all timeouts
        */
       while (!tp->shutdown) {
              PRJob *jobp;

              PR_Lock(tp->timerq.lock);
              if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
                     timeout = PR_INTERVAL_NO_TIMEOUT;
              } else {
                     PRCList *qp;

                     qp = tp->timerq.list.next;
                     jobp = JOB_LINKS_PTR(qp);

                     timeout = jobp->absolute - PR_IntervalNow();
            if (timeout <= 0)
                            timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */
              }
              if (PR_INTERVAL_NO_WAIT != timeout)
                     PR_WaitCondVar(tp->timerq.cv, timeout);
              if (tp->shutdown) {
                     PR_Unlock(tp->timerq.lock);
                     break;
              }
              /*
               * move expired-timer jobs to jobq
               */
              now = PR_IntervalNow();     
              while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
                     qp = tp->timerq.list.next;
                     jobp = JOB_LINKS_PTR(qp);

                     if ((PRInt32)(jobp->absolute - now) > 0) {
                            break;
                     }
                     /*
                      * job timed out
                      */
                     PR_REMOVE_AND_INIT_LINK(&jobp->links);
                     tp->timerq.cnt--;
                     jobp->on_timerq = PR_FALSE;
                     add_to_jobq(tp, jobp);
              }
              PR_Unlock(tp->timerq.lock);
       }
}

Here is the call graph for this function:

Here is the caller graph for this function:

static void wstart ( void arg) [static]

Definition at line 189 of file prtpool.c.

{
PRThreadPool *tp = (PRThreadPool *) arg;
PRCList *head;

       /*
        * execute jobs until shutdown
        */
       while (!tp->shutdown) {
              PRJob *jobp;
#ifdef OPT_WINNT
              BOOL rv;
              DWORD unused, shutdown;
              LPOVERLAPPED olp;

              PR_Lock(tp->jobq.lock);
              tp->idle_threads++;
              PR_Unlock(tp->jobq.lock);
              rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
                                   &unused, &shutdown, &olp, INFINITE);
              
              PR_ASSERT(rv);
              if (shutdown)
                     break;
              jobp = ((NT_notifier *) olp)->jobp;
              PR_Lock(tp->jobq.lock);
              tp->idle_threads--;
              tp->jobq.cnt--;
              PR_Unlock(tp->jobq.lock);
#else

              PR_Lock(tp->jobq.lock);
              while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
                     tp->idle_threads++;
                     PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
                     tp->idle_threads--;
              }      
              if (tp->shutdown) {
                     PR_Unlock(tp->jobq.lock);
                     break;
              }
              head = PR_LIST_HEAD(&tp->jobq.list);
              /*
               * remove job from queue
               */
              PR_REMOVE_AND_INIT_LINK(head);
              tp->jobq.cnt--;
              jobp = JOB_LINKS_PTR(head);
              PR_Unlock(tp->jobq.lock);
#endif

              jobp->job_func(jobp->job_arg);
              if (!JOINABLE_JOB(jobp)) {
                     delete_job(jobp);
              } else {
                     JOIN_NOTIFY(jobp);
              }
       }
       PR_Lock(tp->jobq.lock);
       tp->current_threads--;
       PR_Unlock(tp->jobq.lock);
}

Here is the call graph for this function:

Here is the caller graph for this function: