Back to index

lightning-sunbird  0.9+nobinonly
prtpool.c
Go to the documentation of this file.
00001 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
00002 /* ***** BEGIN LICENSE BLOCK *****
00003  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00004  *
00005  * The contents of this file are subject to the Mozilla Public License Version
00006  * 1.1 (the "License"); you may not use this file except in compliance with
00007  * the License. You may obtain a copy of the License at
00008  * http://www.mozilla.org/MPL/
00009  *
00010  * Software distributed under the License is distributed on an "AS IS" basis,
00011  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00012  * for the specific language governing rights and limitations under the
00013  * License.
00014  *
00015  * The Original Code is the Netscape Portable Runtime (NSPR).
00016  *
00017  * The Initial Developer of the Original Code is
00018  * Netscape Communications Corporation.
00019  * Portions created by the Initial Developer are Copyright (C) 1999-2000
00020  * the Initial Developer. All Rights Reserved.
00021  *
00022  * Contributor(s):
00023  *
00024  * Alternatively, the contents of this file may be used under the terms of
00025  * either the GNU General Public License Version 2 or later (the "GPL"), or
00026  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027  * in which case the provisions of the GPL or the LGPL are applicable instead
00028  * of those above. If you wish to allow use of your version of this file only
00029  * under the terms of either the GPL or the LGPL, and not to allow others to
00030  * use your version of this file under the terms of the MPL, indicate your
00031  * decision by deleting the provisions above and replace them with the notice
00032  * and other provisions required by the GPL or the LGPL. If you do not delete
00033  * the provisions above, a recipient may use your version of this file under
00034  * the terms of any one of the MPL, the GPL or the LGPL.
00035  *
00036  * ***** END LICENSE BLOCK ***** */
00037 
00038 #include "nspr.h"
00039 
00040 /*
00041  * Thread pools
00042  *     Thread pools create and manage threads to provide support for
00043  *     scheduling jobs onto one or more threads.
00044  *
00045  */
00046 #ifdef OPT_WINNT
00047 #include <windows.h>
00048 #endif
00049 
00050 /*
00051  * worker thread
00052  */
00053 typedef struct wthread {
00054        PRCList              links;
00055        PRThread      *thread;
00056 } wthread;
00057 
00058 /*
00059  * queue of timer jobs
00060  */
00061 typedef struct timer_jobq {
00062        PRCList              list;
00063        PRLock        *lock;
00064        PRCondVar     *cv;
00065        PRInt32              cnt;
00066        PRCList       wthreads;
00067 } timer_jobq;
00068 
00069 /*
00070  * queue of jobs
00071  */
00072 typedef struct tp_jobq {
00073        PRCList              list;
00074        PRInt32              cnt;
00075        PRLock        *lock;
00076        PRCondVar     *cv;
00077        PRCList       wthreads;
00078 #ifdef OPT_WINNT
00079        HANDLE        nt_completion_port;
00080 #endif
00081 } tp_jobq;
00082 
00083 /*
00084  * queue of IO jobs
00085  */
00086 typedef struct io_jobq {
00087        PRCList              list;
00088        PRPollDesc  *pollfds;
00089        PRInt32       npollfds;
00090        PRJob         **polljobs;
00091        PRLock        *lock;
00092        PRInt32              cnt;
00093        PRFileDesc    *notify_fd;
00094        PRCList       wthreads;
00095 } io_jobq;
00096 
00097 /*
00098  * Threadpool
00099  */
00100 struct PRThreadPool {
00101        PRInt32              init_threads;
00102        PRInt32              max_threads;
00103        PRInt32              current_threads;
00104        PRInt32              idle_threads;
00105        PRUint32      stacksize;
00106        tp_jobq              jobq;
00107        io_jobq              ioq;
00108        timer_jobq    timerq;
00109        PRLock        *join_lock;          /* used with jobp->join_cv */
00110        PRCondVar     *shutdown_cv;
00111        PRBool        shutdown;
00112 };
00113 
/* Kind of socket operation an I/O job is waiting for. */
typedef enum io_op_type {
    JOB_IO_READ,
    JOB_IO_WRITE,
    JOB_IO_CONNECT,
    JOB_IO_ACCEPT
} io_op_type;
00116 
#ifdef OPT_WINNT
/*
 * NT_notifier: the OVERLAPPED posted to the completion port for a job.
 * The worker casts the LPOVERLAPPED it receives back to NT_notifier to
 * find the job, which is why `overlapped` has to be the first member.
 */
typedef struct NT_notifier {
    OVERLAPPED  overlapped;   /* must be first */
    PRJob      *jobp;         /* job this notifier belongs to */
} NT_notifier;
#endif
00123 
00124 struct PRJob {
00125        PRCList                     links;        /*     for linking jobs */
00126        PRBool               on_ioq;              /* job on ioq */
00127        PRBool               on_timerq;    /* job on timerq */
00128        PRJobFn                     job_func;
00129        void                 *job_arg;
00130        PRCondVar            *join_cv;
00131        PRBool               join_wait;    /* == PR_TRUE, when waiting to join */
00132        PRCondVar            *cancel_cv;   /* for cancelling IO jobs */
00133        PRBool               cancel_io;    /* for cancelling IO jobs */
00134        PRThreadPool  *tpool;              /* back pointer to thread pool */
00135        PRJobIoDesc          *iod;
00136        io_op_type           io_op;
00137        PRInt16                     io_poll_flags;
00138        PRNetAddr            *netaddr;
00139        PRIntervalTime       timeout;      /* relative value */
00140        PRIntervalTime       absolute;
00141 #ifdef OPT_WINNT
00142        NT_notifier          nt_notifier;  
00143 #endif
00144 };
00145 
/* Recover the PRJob that embeds the given `links` node. */
#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

/* Recover the wthread that embeds the given `links` node. */
#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

/* A job is joinable exactly when it owns a join condition variable. */
#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)

/*
 * Mark a joinable job as finished and wake the thread waiting to join it.
 * The argument is parenthesized at every use so that any pointer
 * expression may be passed; it is evaluated more than once, so it must
 * not have side effects.
 */
#define JOIN_NOTIFY(_jobp)                              \
    PR_BEGIN_MACRO                                      \
    PR_Lock((_jobp)->tpool->join_lock);                 \
    (_jobp)->join_wait = PR_FALSE;                      \
    PR_NotifyCondVar((_jobp)->join_cv);                 \
    PR_Unlock((_jobp)->tpool->join_lock);               \
    PR_END_MACRO

/*
 * Take a cancelled I/O job off the ioq and notify its cancel_cv.
 * NOTE: this macro also uses a variable named `tp` (the owning
 * PRThreadPool) that must be in scope at the point of use.  The argument
 * is evaluated more than once and must not have side effects.
 */
#define CANCEL_IO_JOB(jobp)                             \
    PR_BEGIN_MACRO                                      \
    (jobp)->cancel_io = PR_FALSE;                       \
    (jobp)->on_ioq = PR_FALSE;                          \
    PR_REMOVE_AND_INIT_LINK(&(jobp)->links);            \
    tp->ioq.cnt--;                                      \
    PR_NotifyCondVar((jobp)->cancel_cv);                \
    PR_END_MACRO
00170 
00171 static void delete_job(PRJob *jobp);
00172 static PRThreadPool * alloc_threadpool(void);
00173 static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
00174 static void notify_ioq(PRThreadPool *tp);
00175 static void notify_timerq(PRThreadPool *tp);
00176 
/*
 * Locks are acquired in the following order:
 *
 *     tp->ioq.lock, tp->timerq.lock
 *                   |
 *                   V
 *            tp->jobq.lock
 */
00185 
00186 /*
00187  * worker thread function
00188  */
00189 static void wstart(void *arg)
00190 {
00191 PRThreadPool *tp = (PRThreadPool *) arg;
00192 PRCList *head;
00193 
00194        /*
00195         * execute jobs until shutdown
00196         */
00197        while (!tp->shutdown) {
00198               PRJob *jobp;
00199 #ifdef OPT_WINNT
00200               BOOL rv;
00201               DWORD unused, shutdown;
00202               LPOVERLAPPED olp;
00203 
00204               PR_Lock(tp->jobq.lock);
00205               tp->idle_threads++;
00206               PR_Unlock(tp->jobq.lock);
00207               rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
00208                                    &unused, &shutdown, &olp, INFINITE);
00209               
00210               PR_ASSERT(rv);
00211               if (shutdown)
00212                      break;
00213               jobp = ((NT_notifier *) olp)->jobp;
00214               PR_Lock(tp->jobq.lock);
00215               tp->idle_threads--;
00216               tp->jobq.cnt--;
00217               PR_Unlock(tp->jobq.lock);
00218 #else
00219 
00220               PR_Lock(tp->jobq.lock);
00221               while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
00222                      tp->idle_threads++;
00223                      PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
00224                      tp->idle_threads--;
00225               }      
00226               if (tp->shutdown) {
00227                      PR_Unlock(tp->jobq.lock);
00228                      break;
00229               }
00230               head = PR_LIST_HEAD(&tp->jobq.list);
00231               /*
00232                * remove job from queue
00233                */
00234               PR_REMOVE_AND_INIT_LINK(head);
00235               tp->jobq.cnt--;
00236               jobp = JOB_LINKS_PTR(head);
00237               PR_Unlock(tp->jobq.lock);
00238 #endif
00239 
00240               jobp->job_func(jobp->job_arg);
00241               if (!JOINABLE_JOB(jobp)) {
00242                      delete_job(jobp);
00243               } else {
00244                      JOIN_NOTIFY(jobp);
00245               }
00246        }
00247        PR_Lock(tp->jobq.lock);
00248        tp->current_threads--;
00249        PR_Unlock(tp->jobq.lock);
00250 }
00251 
00252 /*
00253  * add a job to the work queue
00254  */
00255 static void
00256 add_to_jobq(PRThreadPool *tp, PRJob *jobp)
00257 {
00258        /*
00259         * add to jobq
00260         */
00261 #ifdef OPT_WINNT
00262        PR_Lock(tp->jobq.lock);
00263        tp->jobq.cnt++;
00264        PR_Unlock(tp->jobq.lock);
00265        /*
00266         * notify worker thread(s)
00267         */
00268        PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
00269             FALSE, &jobp->nt_notifier.overlapped);
00270 #else
00271        PR_Lock(tp->jobq.lock);
00272        PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
00273        tp->jobq.cnt++;
00274        if ((tp->idle_threads < tp->jobq.cnt) &&
00275                                    (tp->current_threads < tp->max_threads)) {
00276               wthread *wthrp;
00277               /*
00278                * increment thread count and unlock the jobq lock
00279                */
00280               tp->current_threads++;
00281               PR_Unlock(tp->jobq.lock);
00282               /* create new worker thread */
00283               wthrp = PR_NEWZAP(wthread);
00284               if (wthrp) {
00285                      wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
00286                                           tp, PR_PRIORITY_NORMAL,
00287                                           PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
00288                      if (NULL == wthrp->thread) {
00289                             PR_DELETE(wthrp);  /* this sets wthrp to NULL */
00290                      }
00291               }
00292               PR_Lock(tp->jobq.lock);
00293               if (NULL == wthrp) {
00294                      tp->current_threads--;
00295               } else {
00296                      PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
00297               }
00298        }
00299        /*
00300         * wakeup a worker thread
00301         */
00302        PR_NotifyCondVar(tp->jobq.cv);
00303        PR_Unlock(tp->jobq.lock);
00304 #endif
00305 }
00306 
00307 /*
00308  * io worker thread function
00309  */
00310 static void io_wstart(void *arg)
00311 {
00312 PRThreadPool *tp = (PRThreadPool *) arg;
00313 int pollfd_cnt, pollfds_used;
00314 int rv;
00315 PRCList *qp, *nextqp;
00316 PRPollDesc *pollfds;
00317 PRJob **polljobs;
00318 int poll_timeout;
00319 PRIntervalTime now;
00320 
00321        /*
00322         * scan io_jobq
00323         * construct poll list
00324         * call PR_Poll
00325         * for all fds, for which poll returns true, move the job to
00326         * jobq and wakeup worker thread.
00327         */
00328        while (!tp->shutdown) {
00329               PRJob *jobp;
00330 
00331               pollfd_cnt = tp->ioq.cnt + 10;
00332               if (pollfd_cnt > tp->ioq.npollfds) {
00333 
00334                      /*
00335                       * re-allocate pollfd array if the current one is not large
00336                       * enough
00337                       */
00338                      if (NULL != tp->ioq.pollfds)
00339                             PR_Free(tp->ioq.pollfds);
00340                      tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
00341                                           (sizeof(PRPollDesc) + sizeof(PRJob *)));
00342                      PR_ASSERT(NULL != tp->ioq.pollfds);
00343                      /*
00344                       * array of pollfds
00345                       */
00346                      pollfds = tp->ioq.pollfds;
00347                      tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
00348                      /*
00349                       * parallel array of jobs
00350                       */
00351                      polljobs = tp->ioq.polljobs;
00352                      tp->ioq.npollfds = pollfd_cnt;
00353               }
00354 
00355               pollfds_used = 0;
00356               /*
00357                * add the notify fd; used for unblocking io thread(s)
00358                */
00359               pollfds[pollfds_used].fd = tp->ioq.notify_fd;
00360               pollfds[pollfds_used].in_flags = PR_POLL_READ;
00361               pollfds[pollfds_used].out_flags = 0;
00362               polljobs[pollfds_used] = NULL;
00363               pollfds_used++;
00364               /*
00365                * fill in the pollfd array
00366                */
00367               PR_Lock(tp->ioq.lock);
00368               for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
00369                      nextqp = qp->next;
00370                      jobp = JOB_LINKS_PTR(qp);
00371                      if (jobp->cancel_io) {
00372                             CANCEL_IO_JOB(jobp);
00373                             continue;
00374                      }
00375                      if (pollfds_used == (pollfd_cnt))
00376                             break;
00377                      pollfds[pollfds_used].fd = jobp->iod->socket;
00378                      pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
00379                      pollfds[pollfds_used].out_flags = 0;
00380                      polljobs[pollfds_used] = jobp;
00381 
00382                      pollfds_used++;
00383               }
00384               if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
00385                      qp = tp->ioq.list.next;
00386                      jobp = JOB_LINKS_PTR(qp);
00387                      if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
00388                             poll_timeout = PR_INTERVAL_NO_TIMEOUT;
00389                      else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
00390                             poll_timeout = PR_INTERVAL_NO_WAIT;
00391                      else {
00392                             poll_timeout = jobp->absolute - PR_IntervalNow();
00393                             if (poll_timeout <= 0) /* already timed out */
00394                                    poll_timeout = PR_INTERVAL_NO_WAIT;
00395                      }
00396               } else {
00397                      poll_timeout = PR_INTERVAL_NO_TIMEOUT;
00398               }
00399               PR_Unlock(tp->ioq.lock);
00400 
00401               /*
00402                * XXXX
00403                * should retry if more jobs have been added to the queue?
00404                *
00405                */
00406               PR_ASSERT(pollfds_used <= pollfd_cnt);
00407               rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
00408 
00409               if (tp->shutdown) {
00410                      break;
00411               }
00412 
00413               if (rv > 0) {
00414                      /*
00415                       * at least one io event is set
00416                       */
00417                      PRStatus rval_status;
00418                      PRInt32 index;
00419 
00420                      PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
00421                      /*
00422                       * reset the pollable event, if notified
00423                       */
00424                      if (pollfds[0].out_flags & PR_POLL_READ) {
00425                             rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
00426                             PR_ASSERT(PR_SUCCESS == rval_status);
00427                      }
00428 
00429                      for(index = 1; index < (pollfds_used); index++) {
00430                 PRInt16 events = pollfds[index].in_flags;
00431                 PRInt16 revents = pollfds[index].out_flags;    
00432                             jobp = polljobs[index];     
00433 
00434                 if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
00435                      (revents & PR_POLL_ERR) ||
00436                                    ((events & PR_POLL_WRITE) &&
00437                                                  (revents & PR_POLL_HUP))) { /* write op & hup */
00438                                    PR_Lock(tp->ioq.lock);
00439                                    if (jobp->cancel_io) {
00440                                           CANCEL_IO_JOB(jobp);
00441                                           PR_Unlock(tp->ioq.lock);
00442                                           continue;
00443                                    }
00444                                    PR_REMOVE_AND_INIT_LINK(&jobp->links);
00445                                    tp->ioq.cnt--;
00446                                    jobp->on_ioq = PR_FALSE;
00447                                    PR_Unlock(tp->ioq.lock);
00448 
00449                                    /* set error */
00450                     if (PR_POLL_NVAL & revents)
00451                                           jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
00452                     else if (PR_POLL_HUP & revents)
00453                                           jobp->iod->error = PR_CONNECT_RESET_ERROR;
00454                     else 
00455                                           jobp->iod->error = PR_IO_ERROR;
00456 
00457                                    /*
00458                                     * add to jobq
00459                                     */
00460                                    add_to_jobq(tp, jobp);
00461                             } else if (revents) {
00462                                    /*
00463                                     * add to jobq
00464                                     */
00465                                    PR_Lock(tp->ioq.lock);
00466                                    if (jobp->cancel_io) {
00467                                           CANCEL_IO_JOB(jobp);
00468                                           PR_Unlock(tp->ioq.lock);
00469                                           continue;
00470                                    }
00471                                    PR_REMOVE_AND_INIT_LINK(&jobp->links);
00472                                    tp->ioq.cnt--;
00473                                    jobp->on_ioq = PR_FALSE;
00474                                    PR_Unlock(tp->ioq.lock);
00475 
00476                                    if (jobp->io_op == JOB_IO_CONNECT) {
00477                                           if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
00478                                                  jobp->iod->error = 0;
00479                                           else
00480                                                  jobp->iod->error = PR_GetError();
00481                                    } else
00482                                           jobp->iod->error = 0;
00483 
00484                                    add_to_jobq(tp, jobp);
00485                             }
00486                      }
00487               }
00488               /*
00489                * timeout processing
00490                */
00491               now = PR_IntervalNow();
00492               PR_Lock(tp->ioq.lock);
00493               for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
00494                      nextqp = qp->next;
00495                      jobp = JOB_LINKS_PTR(qp);
00496                      if (jobp->cancel_io) {
00497                             CANCEL_IO_JOB(jobp);
00498                             continue;
00499                      }
00500                      if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
00501                             break;
00502                      if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
00503                                                         ((PRInt32)(jobp->absolute - now) > 0))
00504                             break;
00505                      PR_REMOVE_AND_INIT_LINK(&jobp->links);
00506                      tp->ioq.cnt--;
00507                      jobp->on_ioq = PR_FALSE;
00508                      jobp->iod->error = PR_IO_TIMEOUT_ERROR;
00509                      add_to_jobq(tp, jobp);
00510               }
00511               PR_Unlock(tp->ioq.lock);
00512        }
00513 }
00514 
00515 /*
00516  * timer worker thread function
00517  */
00518 static void timer_wstart(void *arg)
00519 {
00520 PRThreadPool *tp = (PRThreadPool *) arg;
00521 PRCList *qp;
00522 PRIntervalTime timeout;
00523 PRIntervalTime now;
00524 
00525        /*
00526         * call PR_WaitCondVar with minimum value of all timeouts
00527         */
00528        while (!tp->shutdown) {
00529               PRJob *jobp;
00530 
00531               PR_Lock(tp->timerq.lock);
00532               if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
00533                      timeout = PR_INTERVAL_NO_TIMEOUT;
00534               } else {
00535                      PRCList *qp;
00536 
00537                      qp = tp->timerq.list.next;
00538                      jobp = JOB_LINKS_PTR(qp);
00539 
00540                      timeout = jobp->absolute - PR_IntervalNow();
00541             if (timeout <= 0)
00542                             timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */
00543               }
00544               if (PR_INTERVAL_NO_WAIT != timeout)
00545                      PR_WaitCondVar(tp->timerq.cv, timeout);
00546               if (tp->shutdown) {
00547                      PR_Unlock(tp->timerq.lock);
00548                      break;
00549               }
00550               /*
00551                * move expired-timer jobs to jobq
00552                */
00553               now = PR_IntervalNow();     
00554               while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
00555                      qp = tp->timerq.list.next;
00556                      jobp = JOB_LINKS_PTR(qp);
00557 
00558                      if ((PRInt32)(jobp->absolute - now) > 0) {
00559                             break;
00560                      }
00561                      /*
00562                       * job timed out
00563                       */
00564                      PR_REMOVE_AND_INIT_LINK(&jobp->links);
00565                      tp->timerq.cnt--;
00566                      jobp->on_timerq = PR_FALSE;
00567                      add_to_jobq(tp, jobp);
00568               }
00569               PR_Unlock(tp->timerq.lock);
00570        }
00571 }
00572 
00573 static void
00574 delete_threadpool(PRThreadPool *tp)
00575 {
00576        if (NULL != tp) {
00577               if (NULL != tp->shutdown_cv)
00578                      PR_DestroyCondVar(tp->shutdown_cv);
00579               if (NULL != tp->jobq.cv)
00580                      PR_DestroyCondVar(tp->jobq.cv);
00581               if (NULL != tp->jobq.lock)
00582                      PR_DestroyLock(tp->jobq.lock);
00583               if (NULL != tp->join_lock)
00584                      PR_DestroyLock(tp->join_lock);
00585 #ifdef OPT_WINNT
00586               if (NULL != tp->jobq.nt_completion_port)
00587                      CloseHandle(tp->jobq.nt_completion_port);
00588 #endif
00589               /* Timer queue */
00590               if (NULL != tp->timerq.cv)
00591                      PR_DestroyCondVar(tp->timerq.cv);
00592               if (NULL != tp->timerq.lock)
00593                      PR_DestroyLock(tp->timerq.lock);
00594 
00595               if (NULL != tp->ioq.lock)
00596                      PR_DestroyLock(tp->ioq.lock);
00597               if (NULL != tp->ioq.pollfds)
00598                      PR_Free(tp->ioq.pollfds);
00599               if (NULL != tp->ioq.notify_fd)
00600                      PR_DestroyPollableEvent(tp->ioq.notify_fd);
00601               PR_Free(tp);
00602        }
00603        return;
00604 }
00605 
00606 static PRThreadPool *
00607 alloc_threadpool(void)
00608 {
00609 PRThreadPool *tp;
00610 
00611        tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
00612        if (NULL == tp)
00613               goto failed;
00614        tp->jobq.lock = PR_NewLock();
00615        if (NULL == tp->jobq.lock)
00616               goto failed;
00617        tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
00618        if (NULL == tp->jobq.cv)
00619               goto failed;
00620        tp->join_lock = PR_NewLock();
00621        if (NULL == tp->join_lock)
00622               goto failed;
00623 #ifdef OPT_WINNT
00624        tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
00625                                                                NULL, 0, 0);
00626        if (NULL == tp->jobq.nt_completion_port)
00627               goto failed;
00628 #endif
00629 
00630        tp->ioq.lock = PR_NewLock();
00631        if (NULL == tp->ioq.lock)
00632               goto failed;
00633 
00634        /* Timer queue */
00635 
00636        tp->timerq.lock = PR_NewLock();
00637        if (NULL == tp->timerq.lock)
00638               goto failed;
00639        tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
00640        if (NULL == tp->timerq.cv)
00641               goto failed;
00642 
00643        tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
00644        if (NULL == tp->shutdown_cv)
00645               goto failed;
00646        tp->ioq.notify_fd = PR_NewPollableEvent();
00647        if (NULL == tp->ioq.notify_fd)
00648               goto failed;
00649        return tp;
00650 failed:
00651        delete_threadpool(tp);
00652        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
00653        return NULL;
00654 }
00655 
00656 /* Create thread pool */
00657 PR_IMPLEMENT(PRThreadPool *)
00658 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
00659                                 PRUint32 stacksize)
00660 {
00661 PRThreadPool *tp;
00662 PRThread *thr;
00663 int i;
00664 wthread *wthrp;
00665 
00666        tp = alloc_threadpool();
00667        if (NULL == tp)
00668               return NULL;
00669 
00670        tp->init_threads = initial_threads;
00671        tp->max_threads = max_threads;
00672        tp->stacksize = stacksize;
00673        PR_INIT_CLIST(&tp->jobq.list);
00674        PR_INIT_CLIST(&tp->ioq.list);
00675        PR_INIT_CLIST(&tp->timerq.list);
00676        PR_INIT_CLIST(&tp->jobq.wthreads);
00677        PR_INIT_CLIST(&tp->ioq.wthreads);
00678        PR_INIT_CLIST(&tp->timerq.wthreads);
00679        tp->shutdown = PR_FALSE;
00680 
00681        PR_Lock(tp->jobq.lock);
00682        for(i=0; i < initial_threads; ++i) {
00683 
00684               thr = PR_CreateThread(PR_USER_THREAD, wstart,
00685                                           tp, PR_PRIORITY_NORMAL,
00686                                           PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
00687               PR_ASSERT(thr);
00688               wthrp = PR_NEWZAP(wthread);
00689               PR_ASSERT(wthrp);
00690               wthrp->thread = thr;
00691               PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
00692        }
00693        tp->current_threads = initial_threads;
00694 
00695        thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
00696                                    tp, PR_PRIORITY_NORMAL,
00697                                    PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
00698        PR_ASSERT(thr);
00699        wthrp = PR_NEWZAP(wthread);
00700        PR_ASSERT(wthrp);
00701        wthrp->thread = thr;
00702        PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
00703 
00704        thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
00705                                    tp, PR_PRIORITY_NORMAL,
00706                                    PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
00707        PR_ASSERT(thr);
00708        wthrp = PR_NEWZAP(wthread);
00709        PR_ASSERT(wthrp);
00710        wthrp->thread = thr;
00711        PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
00712 
00713        PR_Unlock(tp->jobq.lock);
00714        return tp;
00715 }
00716 
00717 static void
00718 delete_job(PRJob *jobp)
00719 {
00720        if (NULL != jobp) {
00721               if (NULL != jobp->join_cv) {
00722                      PR_DestroyCondVar(jobp->join_cv);
00723                      jobp->join_cv = NULL;
00724               }
00725               if (NULL != jobp->cancel_cv) {
00726                      PR_DestroyCondVar(jobp->cancel_cv);
00727                      jobp->cancel_cv = NULL;
00728               }
00729               PR_DELETE(jobp);
00730        }
00731 }
00732 
00733 static PRJob *
00734 alloc_job(PRBool joinable, PRThreadPool *tp)
00735 {
00736        PRJob *jobp;
00737 
00738        jobp = PR_NEWZAP(PRJob);
00739        if (NULL == jobp) 
00740               goto failed;
00741        if (joinable) {
00742               jobp->join_cv = PR_NewCondVar(tp->join_lock);
00743               jobp->join_wait = PR_TRUE;
00744               if (NULL == jobp->join_cv)
00745                      goto failed;
00746        } else {
00747               jobp->join_cv = NULL;
00748        }
00749 #ifdef OPT_WINNT
00750        jobp->nt_notifier.jobp = jobp;
00751 #endif
00752        return jobp;
00753 failed:
00754        delete_job(jobp);
00755        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
00756        return NULL;
00757 }
00758 
00759 /* queue a job */
00760 PR_IMPLEMENT(PRJob *)
00761 PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
00762 {
00763        PRJob *jobp;
00764 
00765        jobp = alloc_job(joinable, tpool);
00766        if (NULL == jobp)
00767               return NULL;
00768 
00769        jobp->job_func = fn;
00770        jobp->job_arg = arg;
00771        jobp->tpool = tpool;
00772 
00773        add_to_jobq(tpool, jobp);
00774        return jobp;
00775 }
00776 
00777 /* queue a job, when a socket is readable or writeable */
00778 static PRJob *
00779 queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
00780                             PRBool joinable, io_op_type op)
00781 {
00782        PRJob *jobp;
00783        PRIntervalTime now;
00784 
00785        jobp = alloc_job(joinable, tpool);
00786        if (NULL == jobp) {
00787               return NULL;
00788        }
00789 
00790        /*
00791         * Add a new job to io_jobq
00792         * wakeup io worker thread
00793         */
00794 
00795        jobp->job_func = fn;
00796        jobp->job_arg = arg;
00797        jobp->tpool = tpool;
00798        jobp->iod = iod;
00799        if (JOB_IO_READ == op) {
00800               jobp->io_op = JOB_IO_READ;
00801               jobp->io_poll_flags = PR_POLL_READ;
00802        } else if (JOB_IO_WRITE == op) {
00803               jobp->io_op = JOB_IO_WRITE;
00804               jobp->io_poll_flags = PR_POLL_WRITE;
00805        } else if (JOB_IO_ACCEPT == op) {
00806               jobp->io_op = JOB_IO_ACCEPT;
00807               jobp->io_poll_flags = PR_POLL_READ;
00808        } else if (JOB_IO_CONNECT == op) {
00809               jobp->io_op = JOB_IO_CONNECT;
00810               jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
00811        } else {
00812               delete_job(jobp);
00813               PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
00814               return NULL;
00815        }
00816 
00817        jobp->timeout = iod->timeout;
00818        if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
00819                      (PR_INTERVAL_NO_WAIT == iod->timeout)) {
00820               jobp->absolute = iod->timeout;
00821        } else {
00822               now = PR_IntervalNow();
00823               jobp->absolute = now + iod->timeout;
00824        }
00825 
00826 
00827        PR_Lock(tpool->ioq.lock);
00828 
00829        if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
00830                      (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
00831               PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
00832        } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
00833               PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
00834        } else {
00835               PRCList *qp;
00836               PRJob *tmp_jobp;
00837               /*
00838                * insert into the timeout-sorted ioq
00839                */
00840               for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
00841                                                  qp = qp->prev) {
00842                      tmp_jobp = JOB_LINKS_PTR(qp);
00843                      if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
00844                             break;
00845                      }
00846               }
00847               PR_INSERT_AFTER(&jobp->links,qp);
00848        }
00849 
00850        jobp->on_ioq = PR_TRUE;
00851        tpool->ioq.cnt++;
00852        /*
00853         * notify io worker thread(s)
00854         */
00855        PR_Unlock(tpool->ioq.lock);
00856        notify_ioq(tpool);
00857        return jobp;
00858 }
00859 
00860 /* queue a job, when a socket is readable */
00861 PR_IMPLEMENT(PRJob *)
00862 PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
00863                                                                              PRBool joinable)
00864 {
00865        return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
00866 }
00867 
00868 /* queue a job, when a socket is writeable */
00869 PR_IMPLEMENT(PRJob *)
00870 PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
00871                                                                       PRBool joinable)
00872 {
00873        return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
00874 }
00875 
00876 
00877 /* queue a job, when a socket has a pending connection */
00878 PR_IMPLEMENT(PRJob *)
00879 PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
00880                                                         void * arg, PRBool joinable)
00881 {
00882        return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
00883 }
00884 
00885 /* queue a job, when a socket can be connected */
00886 PR_IMPLEMENT(PRJob *)
00887 PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
00888                      const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
00889 {
00890        PRStatus rv;
00891        PRErrorCode err;
00892 
00893        rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
00894        if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
00895               /* connection pending */
00896               return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
00897        } else {
00898               /*
00899                * connection succeeded or failed; add to jobq right away
00900                */
00901               if (rv == PR_FAILURE)
00902                      iod->error = err;
00903               else
00904                      iod->error = 0;
00905               return(PR_QueueJob(tpool, fn, arg, joinable));
00906        }
00907 }
00908 
00909 /* queue a job, when a timer expires */
00910 PR_IMPLEMENT(PRJob *)
00911 PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
00912                                                  PRJobFn fn, void * arg, PRBool joinable)
00913 {
00914        PRIntervalTime now;
00915        PRJob *jobp;
00916 
00917        if (PR_INTERVAL_NO_TIMEOUT == timeout) {
00918               PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
00919               return NULL;
00920        }
00921        if (PR_INTERVAL_NO_WAIT == timeout) {
00922               /*
00923                * no waiting; add to jobq right away
00924                */
00925               return(PR_QueueJob(tpool, fn, arg, joinable));
00926        }
00927        jobp = alloc_job(joinable, tpool);
00928        if (NULL == jobp) {
00929               return NULL;
00930        }
00931 
00932        /*
00933         * Add a new job to timer_jobq
00934         * wakeup timer worker thread
00935         */
00936 
00937        jobp->job_func = fn;
00938        jobp->job_arg = arg;
00939        jobp->tpool = tpool;
00940        jobp->timeout = timeout;
00941 
00942        now = PR_IntervalNow();
00943        jobp->absolute = now + timeout;
00944 
00945 
00946        PR_Lock(tpool->timerq.lock);
00947        jobp->on_timerq = PR_TRUE;
00948        if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
00949               PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
00950        else {
00951               PRCList *qp;
00952               PRJob *tmp_jobp;
00953               /*
00954                * insert into the sorted timer jobq
00955                */
00956               for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
00957                                                  qp = qp->prev) {
00958                      tmp_jobp = JOB_LINKS_PTR(qp);
00959                      if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
00960                             break;
00961                      }
00962               }
00963               PR_INSERT_AFTER(&jobp->links,qp);
00964        }
00965        tpool->timerq.cnt++;
00966        /*
00967         * notify timer worker thread(s)
00968         */
00969        notify_timerq(tpool);
00970        PR_Unlock(tpool->timerq.lock);
00971        return jobp;
00972 }
00973 
00974 static void
00975 notify_timerq(PRThreadPool *tp)
00976 {
00977        /*
00978         * wakeup the timer thread(s)
00979         */
00980        PR_NotifyCondVar(tp->timerq.cv);
00981 }
00982 
00983 static void
00984 notify_ioq(PRThreadPool *tp)
00985 {
00986 PRStatus rval_status;
00987 
00988        /*
00989         * wakeup the io thread(s)
00990         */
00991        rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
00992        PR_ASSERT(PR_SUCCESS == rval_status);
00993 }
00994 
00995 /*
00996  * cancel a job
00997  *
00998  *     XXXX: is this needed? likely to be removed
00999  */
01000 PR_IMPLEMENT(PRStatus)
01001 PR_CancelJob(PRJob *jobp) {
01002 
01003        PRStatus rval = PR_FAILURE;
01004        PRThreadPool *tp;
01005 
01006        if (jobp->on_timerq) {
01007               /*
01008                * now, check again while holding the timerq lock
01009                */
01010               tp = jobp->tpool;
01011               PR_Lock(tp->timerq.lock);
01012               if (jobp->on_timerq) {
01013                      jobp->on_timerq = PR_FALSE;
01014                      PR_REMOVE_AND_INIT_LINK(&jobp->links);
01015                      tp->timerq.cnt--;
01016                      PR_Unlock(tp->timerq.lock);
01017                      if (!JOINABLE_JOB(jobp)) {
01018                             delete_job(jobp);
01019                      } else {
01020                             JOIN_NOTIFY(jobp);
01021                      }
01022                      rval = PR_SUCCESS;
01023               } else
01024                      PR_Unlock(tp->timerq.lock);
01025        } else if (jobp->on_ioq) {
01026               /*
01027                * now, check again while holding the ioq lock
01028                */
01029               tp = jobp->tpool;
01030               PR_Lock(tp->ioq.lock);
01031               if (jobp->on_ioq) {
01032                      jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
01033                      if (NULL == jobp->cancel_cv) {
01034                             PR_Unlock(tp->ioq.lock);
01035                             PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
01036                             return PR_FAILURE;
01037                      }
01038                      /*
01039                       * mark job 'cancelled' and notify io thread(s)
01040                       * XXXX:
01041                       *            this assumes there is only one io thread; when there
01042                       *            are multiple threads, the io thread processing this job
01043                       *            must be notified.
01044                       */
01045                      jobp->cancel_io = PR_TRUE;
01046                      PR_Unlock(tp->ioq.lock);    /* release, reacquire ioq lock */
01047                      notify_ioq(tp);
01048                      PR_Lock(tp->ioq.lock);
01049                      while (jobp->cancel_io)
01050                             PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
01051                      PR_Unlock(tp->ioq.lock);
01052                      PR_ASSERT(!jobp->on_ioq);
01053                      if (!JOINABLE_JOB(jobp)) {
01054                             delete_job(jobp);
01055                      } else {
01056                             JOIN_NOTIFY(jobp);
01057                      }
01058                      rval = PR_SUCCESS;
01059               } else
01060                      PR_Unlock(tp->ioq.lock);
01061        }
01062        if (PR_FAILURE == rval)
01063               PR_SetError(PR_INVALID_STATE_ERROR, 0);
01064        return rval;
01065 }
01066 
01067 /* join a job, wait until completion */
01068 PR_IMPLEMENT(PRStatus)
01069 PR_JoinJob(PRJob *jobp)
01070 {
01071        if (!JOINABLE_JOB(jobp)) {
01072               PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01073               return PR_FAILURE;
01074        }
01075        PR_Lock(jobp->tpool->join_lock);
01076        while(jobp->join_wait)
01077               PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
01078        PR_Unlock(jobp->tpool->join_lock);
01079        delete_job(jobp);
01080        return PR_SUCCESS;
01081 }
01082 
01083 /* shutdown threadpool */
01084 PR_IMPLEMENT(PRStatus)
01085 PR_ShutdownThreadPool(PRThreadPool *tpool)
01086 {
01087 PRStatus rval = PR_SUCCESS;
01088 
01089        PR_Lock(tpool->jobq.lock);
01090        tpool->shutdown = PR_TRUE;
01091        PR_NotifyAllCondVar(tpool->shutdown_cv);
01092        PR_Unlock(tpool->jobq.lock);
01093 
01094        return rval;
01095 }
01096 
01097 /*
01098  * join thread pool
01099  *     wait for termination of worker threads
01100  *     reclaim threadpool resources
01101  */
01102 PR_IMPLEMENT(PRStatus)
01103 PR_JoinThreadPool(PRThreadPool *tpool)
01104 {
01105 PRStatus rval = PR_SUCCESS;
01106 PRCList *head;
01107 PRStatus rval_status;
01108 
01109        PR_Lock(tpool->jobq.lock);
01110        while (!tpool->shutdown)
01111               PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
01112 
01113        /*
01114         * wakeup worker threads
01115         */
01116 #ifdef OPT_WINNT
01117        /*
01118         * post shutdown notification for all threads
01119         */
01120        {
01121               int i;
01122               for(i=0; i < tpool->current_threads; i++) {
01123                      PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
01124                                                                                     TRUE, NULL);
01125               }
01126        }
01127 #else
01128        PR_NotifyAllCondVar(tpool->jobq.cv);
01129 #endif
01130 
01131        /*
01132         * wakeup io thread(s)
01133         */
01134        notify_ioq(tpool);
01135 
01136        /*
01137         * wakeup timer thread(s)
01138         */
01139        PR_Lock(tpool->timerq.lock);
01140        notify_timerq(tpool);
01141        PR_Unlock(tpool->timerq.lock);
01142 
01143        while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
01144               wthread *wthrp;
01145 
01146               head = PR_LIST_HEAD(&tpool->jobq.wthreads);
01147               PR_REMOVE_AND_INIT_LINK(head);
01148               PR_Unlock(tpool->jobq.lock);
01149               wthrp = WTHREAD_LINKS_PTR(head);
01150               rval_status = PR_JoinThread(wthrp->thread);
01151               PR_ASSERT(PR_SUCCESS == rval_status);
01152               PR_DELETE(wthrp);
01153               PR_Lock(tpool->jobq.lock);
01154        }
01155        PR_Unlock(tpool->jobq.lock);
01156        while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
01157               wthread *wthrp;
01158 
01159               head = PR_LIST_HEAD(&tpool->ioq.wthreads);
01160               PR_REMOVE_AND_INIT_LINK(head);
01161               wthrp = WTHREAD_LINKS_PTR(head);
01162               rval_status = PR_JoinThread(wthrp->thread);
01163               PR_ASSERT(PR_SUCCESS == rval_status);
01164               PR_DELETE(wthrp);
01165        }
01166 
01167        while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
01168               wthread *wthrp;
01169 
01170               head = PR_LIST_HEAD(&tpool->timerq.wthreads);
01171               PR_REMOVE_AND_INIT_LINK(head);
01172               wthrp = WTHREAD_LINKS_PTR(head);
01173               rval_status = PR_JoinThread(wthrp->thread);
01174               PR_ASSERT(PR_SUCCESS == rval_status);
01175               PR_DELETE(wthrp);
01176        }
01177 
01178        /*
01179         * Delete queued jobs
01180         */
01181        while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
01182               PRJob *jobp;
01183 
01184               head = PR_LIST_HEAD(&tpool->jobq.list);
01185               PR_REMOVE_AND_INIT_LINK(head);
01186               jobp = JOB_LINKS_PTR(head);
01187               tpool->jobq.cnt--;
01188               delete_job(jobp);
01189        }
01190 
01191        /* delete io jobs */
01192        while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
01193               PRJob *jobp;
01194 
01195               head = PR_LIST_HEAD(&tpool->ioq.list);
01196               PR_REMOVE_AND_INIT_LINK(head);
01197               tpool->ioq.cnt--;
01198               jobp = JOB_LINKS_PTR(head);
01199               delete_job(jobp);
01200        }
01201 
01202        /* delete timer jobs */
01203        while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
01204               PRJob *jobp;
01205 
01206               head = PR_LIST_HEAD(&tpool->timerq.list);
01207               PR_REMOVE_AND_INIT_LINK(head);
01208               tpool->timerq.cnt--;
01209               jobp = JOB_LINKS_PTR(head);
01210               delete_job(jobp);
01211        }
01212 
01213        PR_ASSERT(0 == tpool->jobq.cnt);
01214        PR_ASSERT(0 == tpool->ioq.cnt);
01215        PR_ASSERT(0 == tpool->timerq.cnt);
01216 
01217        delete_threadpool(tpool);
01218        return rval;
01219 }