Back to index

lightning-sunbird  0.9+nobinonly
prmwait.c
Go to the documentation of this file.
00001 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
00002 /* ***** BEGIN LICENSE BLOCK *****
00003  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00004  *
00005  * The contents of this file are subject to the Mozilla Public License Version
00006  * 1.1 (the "License"); you may not use this file except in compliance with
00007  * the License. You may obtain a copy of the License at
00008  * http://www.mozilla.org/MPL/
00009  *
00010  * Software distributed under the License is distributed on an "AS IS" basis,
00011  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00012  * for the specific language governing rights and limitations under the
00013  * License.
00014  *
00015  * The Original Code is the Netscape Portable Runtime (NSPR).
00016  *
00017  * The Initial Developer of the Original Code is
00018  * Netscape Communications Corporation.
00019  * Portions created by the Initial Developer are Copyright (C) 1998-2000
00020  * the Initial Developer. All Rights Reserved.
00021  *
00022  * Contributor(s):
00023  *
00024  * Alternatively, the contents of this file may be used under the terms of
00025  * either the GNU General Public License Version 2 or later (the "GPL"), or
00026  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027  * in which case the provisions of the GPL or the LGPL are applicable instead
00028  * of those above. If you wish to allow use of your version of this file only
00029  * under the terms of either the GPL or the LGPL, and not to allow others to
00030  * use your version of this file under the terms of the MPL, indicate your
00031  * decision by deleting the provisions above and replace them with the notice
00032  * and other provisions required by the GPL or the LGPL. If you do not delete
00033  * the provisions above, a recipient may use your version of this file under
00034  * the terms of any one of the MPL, the GPL or the LGPL.
00035  *
00036  * ***** END LICENSE BLOCK ***** */
00037 
00038 #include "primpl.h"
00039 #include "pprmwait.h"
00040 
/* Maximum number of slots probed by MW_AddHashInternal() and
 * _MW_LookupInternal() before they give up on the current table. */
00041 #define _MW_REHASH_MAX 11
00042 
/* Held by MW_Init2() while it installs the default group in mw_state. */
00043 static PRLock *mw_lock = NULL;
/* Global multiwait state; allocated (zeroed) by _PR_InitMW(). */
00044 static _PRGlobalState *mw_state = NULL;
00045 
/* MAX_POLLING_INTERVAL (milliseconds) converted to interval ticks by
 * _PR_InitMW(); _MW_PollInternal() starts from it when choosing the
 * timeout it passes to PR_Poll(). */
00046 static PRIntervalTime max_polling_interval;
00047 
00048 #ifdef WINNT
00049 
00050 typedef struct TimerEvent {
00051     PRIntervalTime absolute;
00052     void (*func)(void *);
00053     void *arg;
00054     LONG ref_count;
00055     PRCList links;
00056 } TimerEvent;
00057 
00058 #define TIMER_EVENT_PTR(_qp) \
00059     ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
00060 
00061 struct {
00062     PRLock *ml;
00063     PRCondVar *new_timer;
00064     PRCondVar *cancel_timer;
00065     PRThread *manager_thread;
00066     PRCList timer_queue;
00067 } tm_vars;
00068 
00069 static PRStatus TimerInit(void);
00070 static void TimerManager(void *arg);
00071 static TimerEvent *CreateTimer(PRIntervalTime timeout,
00072     void (*func)(void *), void *arg);
00073 static PRBool CancelTimer(TimerEvent *timer);
00074 
00075 static void TimerManager(void *arg)
00076 {
00077     PRIntervalTime now;
00078     PRIntervalTime timeout;
00079     PRCList *head;
00080     TimerEvent *timer;
00081 
00082     PR_Lock(tm_vars.ml);
00083     while (1)
00084     {
00085         if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
00086         {
00087             PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
00088         }
00089         else
00090         {
00091             now = PR_IntervalNow();
00092             head = PR_LIST_HEAD(&tm_vars.timer_queue);
00093             timer = TIMER_EVENT_PTR(head);
00094             if ((PRInt32) (now - timer->absolute) >= 0)
00095             {
00096                 PR_REMOVE_LINK(head);
00097                 /*
00098                  * make its prev and next point to itself so that
00099                  * it's obvious that it's not on the timer_queue.
00100                  */
00101                 PR_INIT_CLIST(head);
00102                 PR_ASSERT(2 == timer->ref_count);
00103                 PR_Unlock(tm_vars.ml);
00104                 timer->func(timer->arg);
00105                 PR_Lock(tm_vars.ml);
00106                 timer->ref_count -= 1;
00107                 if (0 == timer->ref_count)
00108                 {
00109                     PR_NotifyAllCondVar(tm_vars.cancel_timer);
00110                 }
00111             }
00112             else
00113             {
00114                 timeout = (PRIntervalTime)(timer->absolute - now);
00115                 PR_WaitCondVar(tm_vars.new_timer, timeout);
00116             } 
00117         }
00118     }
00119     PR_Unlock(tm_vars.ml);
00120 }
00121 
00122 static TimerEvent *CreateTimer(
00123     PRIntervalTime timeout,
00124     void (*func)(void *),
00125     void *arg)
00126 {
00127     TimerEvent *timer;
00128     PRCList *links, *tail;
00129     TimerEvent *elem;
00130 
00131     timer = PR_NEW(TimerEvent);
00132     if (NULL == timer)
00133     {
00134         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
00135         return timer;
00136     }
00137     timer->absolute = PR_IntervalNow() + timeout;
00138     timer->func = func;
00139     timer->arg = arg;
00140     timer->ref_count = 2;
00141     PR_Lock(tm_vars.ml);
00142     tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
00143     while (links->prev != tail)
00144     {
00145         elem = TIMER_EVENT_PTR(links);
00146         if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
00147         {
00148             break;
00149         }
00150         links = links->prev;
00151     }
00152     PR_INSERT_AFTER(&timer->links, links);
00153     PR_NotifyCondVar(tm_vars.new_timer);
00154     PR_Unlock(tm_vars.ml);
00155     return timer;
00156 }
00157 
00158 static PRBool CancelTimer(TimerEvent *timer)
00159 {
00160     PRBool canceled = PR_FALSE;
00161 
00162     PR_Lock(tm_vars.ml);
00163     timer->ref_count -= 1;
00164     if (timer->links.prev == &timer->links)
00165     {
00166         while (timer->ref_count == 1)
00167         {
00168             PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
00169         }
00170     }
00171     else
00172     {
00173         PR_REMOVE_LINK(&timer->links);
00174         canceled = PR_TRUE;
00175     }
00176     PR_Unlock(tm_vars.ml);
00177     PR_DELETE(timer);
00178     return canceled; 
00179 }
00180 
00181 static PRStatus TimerInit(void)
00182 {
00183     tm_vars.ml = PR_NewLock();
00184     if (NULL == tm_vars.ml)
00185     {
00186         goto failed;
00187     }
00188     tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
00189     if (NULL == tm_vars.new_timer)
00190     {
00191         goto failed;
00192     }
00193     tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
00194     if (NULL == tm_vars.cancel_timer)
00195     {
00196         goto failed;
00197     }
00198     PR_INIT_CLIST(&tm_vars.timer_queue);
00199     tm_vars.manager_thread = PR_CreateThread(
00200         PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
00201         PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
00202     if (NULL == tm_vars.manager_thread)
00203     {
00204         goto failed;
00205     }
00206     return PR_SUCCESS;
00207 
00208 failed:
00209     if (NULL != tm_vars.cancel_timer)
00210     {
00211         PR_DestroyCondVar(tm_vars.cancel_timer);
00212     }
00213     if (NULL != tm_vars.new_timer)
00214     {
00215         PR_DestroyCondVar(tm_vars.new_timer);
00216     }
00217     if (NULL != tm_vars.ml)
00218     {
00219         PR_DestroyLock(tm_vars.ml);
00220     }
00221     return PR_FAILURE;
00222 }
00223 
00224 #endif /* WINNT */
00225 
00226 /******************************************************************/
00227 /******************************************************************/
00228 /************************ The private portion *********************/
00229 /******************************************************************/
00230 /******************************************************************/
00231 void _PR_InitMW(void)
00232 {
00233 #ifdef WINNT
00234     /*
00235      * We use NT 4's InterlockedCompareExchange() to operate
00236      * on PRMWStatus variables.
00237      */
00238     PR_ASSERT(sizeof(PVOID) == sizeof(PRMWStatus));
00239     TimerInit();
00240 #endif
00241     mw_lock = PR_NewLock();
00242     PR_ASSERT(NULL != mw_lock);
00243     mw_state = PR_NEWZAP(_PRGlobalState);
00244     PR_ASSERT(NULL != mw_state);
00245     PR_INIT_CLIST(&mw_state->group_list);
00246     max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
00247 }  /* _PR_InitMW */
00248 
00249 void _PR_CleanupMW(void)
00250 {
00251     PR_DestroyLock(mw_lock);
00252     mw_lock = NULL;
00253     if (mw_state->group) {
00254         PR_DestroyWaitGroup(mw_state->group);
00255         /* mw_state->group is set to NULL as a side effect. */
00256     }
00257     PR_DELETE(mw_state);
00258 }  /* _PR_CleanupMW */
00259 
00260 static PRWaitGroup *MW_Init2(void)
00261 {
00262     PRWaitGroup *group = mw_state->group;  /* it's the null group */
00263     if (NULL == group)  /* there is this special case */
00264     {
00265         group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
00266         if (NULL == group) goto failed_alloc;
00267         PR_Lock(mw_lock);
00268         if (NULL == mw_state->group)
00269         {
00270             mw_state->group = group;
00271             group = NULL;
00272         }
00273         PR_Unlock(mw_lock);
00274         if (group != NULL) (void)PR_DestroyWaitGroup(group);
00275         group = mw_state->group;  /* somebody beat us to it */
00276     }
00277 failed_alloc:
00278     return group;  /* whatever */
00279 }  /* MW_Init2 */
00280 
00281 static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
00282 {
00283     /*
00284     ** The entries are put in the table using the fd (PRFileDesc*) of
00285     ** the receive descriptor as the key. This allows us to locate
00286     ** the appropriate entry aqain when the poll operation finishes.
00287     **
00288     ** The pointer to the file descriptor object is first divided by
00289     ** the natural alignment of a pointer in the belief that object
00290     ** will have at least that many zeros in the low order bits.
00291     ** This may not be a good assuption.
00292     **
00293     ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
00294     ** that we declare defeat and force the table to be reconstructed.
00295     ** Since some fds might be added more than once, won't that cause
00296     ** collisions even in an empty table?
00297     */
00298     PRIntn rehash = _MW_REHASH_MAX;
00299     PRRecvWait **waiter;
00300     PRUintn hidx = _MW_HASH(desc->fd, hash->length);
00301     PRUintn hoffset = 0;
00302 
00303     while (rehash-- > 0)
00304     {
00305         waiter = &hash->recv_wait;
00306         if (NULL == waiter[hidx])
00307         {
00308             waiter[hidx] = desc;
00309             hash->count += 1;
00310 #if 0
00311             printf("Adding 0x%x->0x%x ", desc, desc->fd);
00312             printf(
00313                 "table[%u:%u:*%u]: 0x%x->0x%x\n",
00314                 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
00315 #endif
00316             return _prmw_success;
00317         }
00318         if (desc == waiter[hidx])
00319         {
00320             PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);  /* desc already in table */
00321             return _prmw_error;
00322         }
00323 #if 0
00324         printf("Failing 0x%x->0x%x ", desc, desc->fd);
00325         printf(
00326             "table[*%u:%u:%u]: 0x%x->0x%x\n",
00327             hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
00328 #endif
00329         if (0 == hoffset)
00330         {
00331             hoffset = _MW_HASH2(desc->fd, hash->length);
00332             PR_ASSERT(0 != hoffset);
00333         }
00334         hidx = (hidx + hoffset) % (hash->length);
00335     }
00336     return _prmw_rehash;    
00337 }  /* MW_AddHashInternal */
00338 
00339 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
00340 {
00341     PRRecvWait **desc;
00342     PRUint32 pidx, length;
00343     _PRWaiterHash *newHash, *oldHash = group->waiter;
00344     PRBool retry;
00345     _PR_HashStory hrv;
00346 
00347     static const PRInt32 prime_number[] = {
00348         _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
00349         2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
00350     PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
00351 
00352     /* look up the next size we'd like to use for the hash table */
00353     for (pidx = 0; pidx < primes; ++pidx)
00354     {
00355         if (prime_number[pidx] == oldHash->length)
00356         {
00357             break;
00358         }
00359     }
00360     /* table size must be one of the prime numbers */
00361     PR_ASSERT(pidx < primes);
00362 
00363     /* if pidx == primes - 1, we can't expand the table any more */
00364     while (pidx < primes - 1)
00365     {
00366         /* next size */
00367         ++pidx;
00368         length = prime_number[pidx];
00369 
00370         /* allocate the new hash table and fill it in with the old */
00371         newHash = (_PRWaiterHash*)PR_CALLOC(
00372             sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
00373         if (NULL == newHash)
00374         {
00375             PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
00376             return _prmw_error;
00377         }
00378 
00379         newHash->length = length;
00380         retry = PR_FALSE;
00381         for (desc = &oldHash->recv_wait;
00382             newHash->count < oldHash->count; ++desc)
00383         {
00384             PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
00385             if (NULL != *desc)
00386             {
00387                 hrv = MW_AddHashInternal(*desc, newHash);
00388                 PR_ASSERT(_prmw_error != hrv);
00389                 if (_prmw_success != hrv)
00390                 {
00391                     PR_DELETE(newHash);
00392                     retry = PR_TRUE;
00393                     break;
00394                 }
00395             }
00396         }
00397         if (retry) continue;
00398 
00399         PR_DELETE(group->waiter);
00400         group->waiter = newHash;
00401         group->p_timestamp += 1;
00402         return _prmw_success;
00403     }
00404 
00405     PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
00406     return _prmw_error;  /* we're hosed */
00407 }  /* MW_ExpandHashInternal */
00408 
00409 #ifndef WINNT
00410 static void _MW_DoneInternal(
00411     PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
00412 {
00413     /*
00414     ** Add this receive wait object to the list of finished I/O
00415     ** operations for this particular group. If there are other
00416     ** threads waiting on the group, notify one. If not, arrange
00417     ** for this thread to return.
00418     */
00419 
00420 #if 0
00421     printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
00422 #endif
00423     (*waiter)->outcome = outcome;
00424     PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
00425     PR_NotifyCondVar(group->io_complete);
00426     PR_ASSERT(0 != group->waiter->count);
00427     group->waiter->count -= 1;
00428     *waiter = NULL;
00429 }  /* _MW_DoneInternal */
00430 #endif /* WINNT */
00431 
00432 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
00433 {
00434     /*
00435     ** Find the receive wait object corresponding to the file descriptor.
00436     ** Only search the wait group specified.
00437     */
00438     PRRecvWait **desc;
00439     PRIntn rehash = _MW_REHASH_MAX;
00440     _PRWaiterHash *hash = group->waiter;
00441     PRUintn hidx = _MW_HASH(fd, hash->length);
00442     PRUintn hoffset = 0;
00443     
00444     while (rehash-- > 0)
00445     {
00446         desc = (&hash->recv_wait) + hidx;
00447         if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
00448         if (0 == hoffset)
00449         {
00450             hoffset = _MW_HASH2(fd, hash->length);
00451             PR_ASSERT(0 != hoffset);
00452         }
00453         hidx = (hidx + hoffset) % (hash->length);
00454     }
00455     return NULL;
00456 }  /* _MW_LookupInternal */
00457 
00458 #ifndef WINNT
00459 static PRStatus _MW_PollInternal(PRWaitGroup *group)
00460 {
00461     PRRecvWait **waiter;
00462     PRStatus rv = PR_FAILURE;
00463     PRInt32 count, count_ready;
00464     PRIntervalTime polling_interval;
00465 
00466     group->poller = PR_GetCurrentThread();
00467 
00468     while (PR_TRUE)
00469     {
00470         PRIntervalTime now, since_last_poll;
00471         PRPollDesc *poll_list;
00472 
00473         while (0 == group->waiter->count)
00474         {
00475             PRStatus st;
00476             st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
00477             if (_prmw_running != group->state)
00478             {
00479                 PR_SetError(PR_INVALID_STATE_ERROR, 0);
00480                 goto aborted;
00481             }
00482             if (_MW_ABORTED(st)) goto aborted;
00483         }
00484 
00485         /*
00486         ** There's something to do. See if our existing polling list
00487         ** is large enough for what we have to do?
00488         */
00489 
00490         while (group->polling_count < group->waiter->count)
00491         {
00492             PRUint32 old_count = group->waiter->count;
00493             PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
00494             PRSize new_size = sizeof(PRPollDesc) * new_count;
00495             PRPollDesc *old_polling_list = group->polling_list;
00496 
00497             PR_Unlock(group->ml);
00498             poll_list = (PRPollDesc*)PR_CALLOC(new_size);
00499             if (NULL == poll_list)
00500             {
00501                 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
00502                 PR_Lock(group->ml);
00503                 goto failed_alloc;
00504             }
00505             if (NULL != old_polling_list)
00506                 PR_DELETE(old_polling_list);
00507             PR_Lock(group->ml);
00508             if (_prmw_running != group->state)
00509             {
00510                 PR_SetError(PR_INVALID_STATE_ERROR, 0);
00511                 goto aborted;
00512             }
00513             group->polling_list = poll_list;
00514             group->polling_count = new_count;
00515         }
00516 
00517         now = PR_IntervalNow();
00518         polling_interval = max_polling_interval;
00519         since_last_poll = now - group->last_poll;
00520 
00521         waiter = &group->waiter->recv_wait;
00522         poll_list = group->polling_list;
00523         for (count = 0; count < group->waiter->count; ++waiter)
00524         {
00525             PR_ASSERT(waiter < &group->waiter->recv_wait
00526                 + group->waiter->length);
00527             if (NULL != *waiter)  /* a live one! */
00528             {
00529                 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
00530                 && (since_last_poll >= (*waiter)->timeout))
00531                     _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
00532                 else
00533                 {
00534                     if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
00535                     {
00536                         (*waiter)->timeout -= since_last_poll;
00537                         if ((*waiter)->timeout < polling_interval)
00538                             polling_interval = (*waiter)->timeout;
00539                     }
00540                     PR_ASSERT(poll_list < group->polling_list
00541                         + group->polling_count);
00542                     poll_list->fd = (*waiter)->fd;
00543                     poll_list->in_flags = PR_POLL_READ;
00544                     poll_list->out_flags = 0;
00545 #if 0
00546                     printf(
00547                         "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
00548                         poll_list, count, poll_list->fd, (*waiter)->timeout);
00549 #endif
00550                     poll_list += 1;
00551                     count += 1;
00552                 }
00553             }
00554         } 
00555 
00556         PR_ASSERT(count == group->waiter->count);
00557 
00558         /*
00559         ** If there are no more threads waiting for completion,
00560         ** we need to return.
00561         */
00562         if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
00563         && (1 == group->waiting_threads)) break;
00564 
00565         if (0 == count) continue;  /* wait for new business */
00566 
00567         group->last_poll = now;
00568 
00569         PR_Unlock(group->ml);
00570 
00571         count_ready = PR_Poll(group->polling_list, count, polling_interval);
00572 
00573         PR_Lock(group->ml);
00574 
00575         if (_prmw_running != group->state)
00576         {
00577             PR_SetError(PR_INVALID_STATE_ERROR, 0);
00578             goto aborted;
00579         }
00580         if (-1 == count_ready)
00581         {
00582             goto failed_poll;  /* that's a shame */
00583         }
00584         else if (0 < count_ready)
00585         {
00586             for (poll_list = group->polling_list; count > 0;
00587             poll_list++, count--)
00588             {
00589                 PR_ASSERT(
00590                     poll_list < group->polling_list + group->polling_count);
00591                 if (poll_list->out_flags != 0)
00592                 {
00593                     waiter = _MW_LookupInternal(group, poll_list->fd);
00594                     /*
00595                     ** If 'waiter' is NULL, that means the wait receive
00596                     ** descriptor has been canceled.
00597                     */
00598                     if (NULL != waiter)
00599                         _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
00600                 }
00601             }
00602         }
00603         /*
00604         ** If there are no more threads waiting for completion,
00605         ** we need to return.
00606         ** This thread was "borrowed" to do the polling, but it really
00607         ** belongs to the client.
00608         */
00609         if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
00610         && (1 == group->waiting_threads)) break;
00611     }
00612 
00613     rv = PR_SUCCESS;
00614 
00615 aborted:
00616 failed_poll:
00617 failed_alloc:
00618     group->poller = NULL;  /* we were that, not we ain't */
00619     if ((_prmw_running == group->state) && (group->waiting_threads > 1))
00620     {
00621         /* Wake up one thread to become the new poller. */
00622         PR_NotifyCondVar(group->io_complete);
00623     }
00624     return rv;  /* we return with the lock held */
00625 }  /* _MW_PollInternal */
00626 #endif /* !WINNT */
00627 
00628 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
00629 {
00630     PRMWGroupState rv = group->state;
00631     /*
00632     ** Looking at the group's fields is safe because
00633     ** once the group's state is no longer running, it
00634     ** cannot revert and there is a safe check on entry
00635     ** to make sure no more threads are made to wait.
00636     */
00637     if ((_prmw_stopping == rv)
00638     && (0 == group->waiting_threads))
00639     {
00640         rv = group->state = _prmw_stopped;
00641         PR_NotifyCondVar(group->mw_manage);
00642     }
00643     return rv;
00644 }  /* MW_TestForShutdownInternal */
00645 
00646 #ifndef WINNT
00647 static void _MW_InitialRecv(PRCList *io_ready)
00648 {
00649     PRRecvWait *desc = (PRRecvWait*)io_ready;
00650     if ((NULL == desc->buffer.start)
00651     || (0 == desc->buffer.length))
00652         desc->bytesRecv = 0;
00653     else
00654     {
00655         desc->bytesRecv = (desc->fd->methods->recv)(
00656             desc->fd, desc->buffer.start,
00657             desc->buffer.length, 0, desc->timeout);
00658         if (desc->bytesRecv < 0)  /* SetError should already be there */
00659             desc->outcome = PR_MW_FAILURE;
00660     }
00661 }  /* _MW_InitialRecv */
00662 #endif
00663 
00664 #ifdef WINNT
00665 static void NT_TimeProc(void *arg)
00666 {
00667     _MDOverlapped *overlapped = (_MDOverlapped *)arg;
00668     PRRecvWait *desc =  overlapped->data.mw.desc;
00669     PRFileDesc *bottom;
00670     
00671     if (InterlockedCompareExchange((PVOID *)&desc->outcome,
00672         (PVOID)PR_MW_TIMEOUT, (PVOID)PR_MW_PENDING) != (PVOID)PR_MW_PENDING)
00673     {
00674         /* This wait recv descriptor has already completed. */
00675         return;
00676     }
00677 
00678     /* close the osfd to abort the outstanding async io request */
00679     /* $$$$
00680     ** Little late to be checking if NSPR's on the bottom of stack,
00681     ** but if we don't check, we can't assert that the private data
00682     ** is what we think it is.
00683     ** $$$$
00684     */
00685     bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
00686     PR_ASSERT(NULL != bottom);
00687     if (NULL != bottom)  /* now what!?!?! */
00688     {
00689         bottom->secret->state = _PR_FILEDESC_CLOSED;
00690         if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
00691         {
00692             fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
00693             PR_ASSERT(!"What shall I do?");
00694         }
00695     }
00696     return;
00697 }  /* NT_TimeProc */
00698 
00699 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
00700 {
00701     PRRecvWait **waiter;
00702 
00703     _PR_MD_LOCK(&group->mdlock);
00704     waiter = _MW_LookupInternal(group, fd);
00705     if (NULL != waiter)
00706     {
00707         group->waiter->count -= 1;
00708         *waiter = NULL;
00709     }
00710     _PR_MD_UNLOCK(&group->mdlock);
00711     return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
00712 }
00713 
00714 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
00715 {
00716     PRRecvWait **waiter;
00717 
00718     waiter = _MW_LookupInternal(group, fd);
00719     if (NULL != waiter)
00720     {
00721         group->waiter->count -= 1;
00722         *waiter = NULL;
00723     }
00724     return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
00725 }
00726 #endif /* WINNT */
00727 
00728 /******************************************************************/
00729 /******************************************************************/
00730 /********************** The public API portion ********************/
00731 /******************************************************************/
00732 /******************************************************************/
00733 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
00734     PRWaitGroup *group, PRRecvWait *desc)
00735 {
00736     _PR_HashStory hrv;
00737     PRStatus rv = PR_FAILURE;
00738 #ifdef WINNT
00739     _MDOverlapped *overlapped;
00740     HANDLE hFile;
00741     BOOL bResult;
00742     DWORD dwError;
00743     PRFileDesc *bottom;
00744 #endif
00745 
00746     if (!_pr_initialized) _PR_ImplicitInitialization();
00747     if ((NULL == group) && (NULL == (group = MW_Init2())))
00748     {
00749         return rv;
00750     }
00751 
00752     PR_ASSERT(NULL != desc->fd);
00753 
00754     desc->outcome = PR_MW_PENDING;  /* nice, well known value */
00755     desc->bytesRecv = 0;  /* likewise, though this value is ambiguious */
00756 
00757     PR_Lock(group->ml);
00758 
00759     if (_prmw_running != group->state)
00760     {
00761         /* Not allowed to add after cancelling the group */
00762         desc->outcome = PR_MW_INTERRUPT;
00763         PR_SetError(PR_INVALID_STATE_ERROR, 0);
00764         PR_Unlock(group->ml);
00765         return rv;
00766     }
00767 
00768 #ifdef WINNT
00769     _PR_MD_LOCK(&group->mdlock);
00770 #endif
00771 
00772     /*
00773     ** If the waiter count is zero at this point, there's no telling
00774     ** how long we've been idle. Therefore, initialize the beginning
00775     ** of the timing interval. As long as the list doesn't go empty,
00776     ** it will maintain itself.
00777     */
00778     if (0 == group->waiter->count)
00779         group->last_poll = PR_IntervalNow();
00780 
00781     do
00782     {
00783         hrv = MW_AddHashInternal(desc, group->waiter);
00784         if (_prmw_rehash != hrv) break;
00785         hrv = MW_ExpandHashInternal(group);  /* gruesome */
00786         if (_prmw_success != hrv) break;
00787     } while (PR_TRUE);
00788 
00789 #ifdef WINNT
00790     _PR_MD_UNLOCK(&group->mdlock);
00791 #endif
00792 
00793     PR_NotifyCondVar(group->new_business);  /* tell the world */
00794     rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
00795     PR_Unlock(group->ml);
00796 
00797 #ifdef WINNT
00798     overlapped = PR_NEWZAP(_MDOverlapped);
00799     if (NULL == overlapped)
00800     {
00801         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
00802         NT_HashRemove(group, desc->fd);
00803         return rv;
00804     }
00805     overlapped->ioModel = _MD_MultiWaitIO;
00806     overlapped->data.mw.desc = desc;
00807     overlapped->data.mw.group = group;
00808     if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
00809     {
00810         overlapped->data.mw.timer = CreateTimer(
00811             desc->timeout,
00812             NT_TimeProc,
00813             overlapped);
00814         if (0 == overlapped->data.mw.timer)
00815         {
00816             NT_HashRemove(group, desc->fd);
00817             PR_DELETE(overlapped);
00818             /*
00819              * XXX It appears that a maximum of 16 timer events can
00820              * be outstanding. GetLastError() returns 0 when I try it.
00821              */
00822             PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
00823             return PR_FAILURE;
00824         }
00825     }
00826 
00827     /* Reach to the bottom layer to get the OS fd */
00828     bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
00829     PR_ASSERT(NULL != bottom);
00830     if (NULL == bottom)
00831     {
00832         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
00833         return PR_FAILURE;
00834     }
00835     hFile = (HANDLE)bottom->secret->md.osfd; 
00836     if (!bottom->secret->md.io_model_committed)
00837     {
00838         PRInt32 st;
00839         st = _md_Associate(hFile);
00840         PR_ASSERT(0 != st);
00841         bottom->secret->md.io_model_committed = PR_TRUE;
00842     }
00843     bResult = ReadFile(hFile,
00844         desc->buffer.start,
00845         (DWORD)desc->buffer.length,
00846         NULL,
00847         &overlapped->overlapped);
00848     if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
00849     {
00850         if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
00851         {
00852             if (InterlockedCompareExchange((PVOID *)&desc->outcome,
00853                 (PVOID)PR_MW_FAILURE, (PVOID)PR_MW_PENDING)
00854                 == (PVOID)PR_MW_PENDING)
00855             {
00856                 CancelTimer(overlapped->data.mw.timer);
00857             }
00858             NT_HashRemove(group, desc->fd);
00859             PR_DELETE(overlapped);
00860         }
00861         _PR_MD_MAP_READ_ERROR(dwError);
00862         rv = PR_FAILURE;
00863     }
00864 #endif
00865 
00866     return rv;
00867 }  /* PR_AddWaitFileDesc */
00868 
/*
 * PR_WaitRecvReady
 *
 * Block the calling thread until a receive-wait object belonging to
 * 'group' shows up on the group's io_ready list, unlink it from that
 * list and hand it back to the caller.
 *
 * group   the wait group to service.  When NULL, the group returned by
 *         MW_Init2() is used instead.
 *
 * Returns the ready PRRecvWait, or NULL when nothing could be obtained.
 * The NULL paths visible in this function are:
 *   - MW_Init2() returned NULL;
 *   - the group is not in the _prmw_running state
 *     (PR_INVALID_STATE_ERROR);
 *   - WINNT: the thread has a pending interrupt
 *     (PR_PENDING_INTERRUPT_ERROR);
 *   - non-WINNT: the condition wait was aborted, or _MW_PollInternal()
 *     failed.
 *
 * A non-NULL result does not by itself mean the receive succeeded.  The
 * switch near the end inspects the object's 'outcome' and records
 * PR_IO_TIMEOUT_ERROR, PR_PENDING_INTERRUPT_ERROR or (WINNT only) the
 * mapped read error before returning the object.
 *
 * Locking: group->ml is taken right after the group is resolved and is
 * released before the outcome is examined.  On WINNT group->mdlock is
 * additionally held around the io_ready / wait_list manipulation; both
 * locks are dropped while the thread sleeps in _PR_MD_WAIT().
 */
00869 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
00870 {
00871     PRCList *io_ready = NULL;
00872 #ifdef WINNT
00873     PRThread *me = _PR_MD_CURRENT_THREAD();
00874     _MDOverlapped *overlapped;    
00875 #endif
00876 
00877     if (!_pr_initialized) _PR_ImplicitInitialization();
00878     if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
00879 
00880     PR_Lock(group->ml);
00881 
00882     if (_prmw_running != group->state)
00883     {
00884         PR_SetError(PR_INVALID_STATE_ERROR, 0);
00885         goto invalid_state;
00886     }
00887 
00888     group->waiting_threads += 1;  /* the polling thread is counted */
00889 
00890 #ifdef WINNT
00891     _PR_MD_LOCK(&group->mdlock);
00892     while (PR_CLIST_IS_EMPTY(&group->io_ready))
00893     {
00894         _PR_THREAD_LOCK(me);
00895         me->state = _PR_IO_WAIT;
00896         PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
00897         if (!_PR_IS_NATIVE_THREAD(me))
00898         {
00899             _PR_SLEEPQ_LOCK(me->cpu);
00900             _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
00901             _PR_SLEEPQ_UNLOCK(me->cpu);
00902         }
00903         _PR_THREAD_UNLOCK(me);
00904         _PR_MD_UNLOCK(&group->mdlock);
00905         PR_Unlock(group->ml);
00906         _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
00907         me->state = _PR_RUNNING;
00908         PR_Lock(group->ml);
00909         _PR_MD_LOCK(&group->mdlock);
00910         if (_PR_PENDING_INTERRUPT(me)) {
00911             PR_REMOVE_LINK(&me->waitQLinks);
00912             _PR_MD_UNLOCK(&group->mdlock);
00913             me->flags &= ~_PR_INTERRUPT;
00914             me->io_suspended = PR_FALSE;
00915             PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
00916             goto aborted;
00917         }
00918     }
00919     io_ready = PR_LIST_HEAD(&group->io_ready);
00920     PR_ASSERT(io_ready != NULL);
00921     PR_REMOVE_LINK(io_ready);
00922     _PR_MD_UNLOCK(&group->mdlock);
          /* step back from the embedded 'data' member to its enclosing _MDOverlapped */
00923     overlapped = (_MDOverlapped *)
00924         ((char *)io_ready - offsetof(_MDOverlapped, data));
00925     io_ready = &overlapped->data.mw.desc->internal;
00926 #else
00927     do
00928     {
00929         /*
00930         ** If the I/O ready list isn't empty, have this thread
00931         ** return with the first receive wait object that's available.
00932         */
00933         if (PR_CLIST_IS_EMPTY(&group->io_ready))
00934         {
00935             /*
00936             ** Is there a polling thread yet? If not, grab this thread
00937             ** and use it.
00938             */
00939             if (NULL == group->poller)
00940             {
00941                 /*
00942                 ** This thread will keep polling until it becomes the only one
00943                 ** left to service a completion. Then it will return and there will
00944                 ** be none left to actually poll or to run completions.
00945                 **
00946                 ** The polling function should only return w/ failure or
00947                 ** with some I/O ready.
00948                 */
00949                 if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
00950             }
00951             else
00952             {
00953                 /*
00954                 ** There are four reasons a thread can be awakened from
00955                 ** a wait on the io_complete condition variable.
00956                 ** 1. Some I/O has completed, i.e., the io_ready list
00957                 **    is nonempty.
00958                 ** 2. The wait group is canceled.
00959                 ** 3. The thread is interrupted.
00960                 ** 4. The current polling thread has to leave and needs
00961                 **    a replacement.
00962                 ** The logic to find a new polling thread is made more
00963                 ** complicated by all the other possible events.
00964                 ** I tried my best to write the logic clearly, but
00965                 ** it is still full of if's with continue and goto.
00966                 */
00967                 PRStatus st;
00968                 do 
00969                 {
00970                     st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
00971                     if (_prmw_running != group->state)
00972                     {
00973                         PR_SetError(PR_INVALID_STATE_ERROR, 0);
00974                         goto aborted;
00975                     }
00976                     if (_MW_ABORTED(st) || (NULL == group->poller)) break;
00977                 } while (PR_CLIST_IS_EMPTY(&group->io_ready));
00978 
00979                 /*
00980                 ** The thread is interrupted and has to leave.  It might
00981                 ** have also been awakened to process ready i/o or be the
00982                 ** new poller.  To be safe, if either condition is true,
00983                 ** we awaken another thread to take its place.
00984                 */
00985                 if (_MW_ABORTED(st))
00986                 {
00987                     if ((NULL == group->poller
00988                     || !PR_CLIST_IS_EMPTY(&group->io_ready))
00989                     && group->waiting_threads > 1)
00990                         PR_NotifyCondVar(group->io_complete);
00991                     goto aborted;
00992                 }
00993 
00994                 /*
00995                 ** A new poller is needed, but can I be the new poller?
00996                 ** If there is no i/o ready, sure.  But if there is any
00997                 ** i/o ready, it has a higher priority.  I want to
00998                 ** process the ready i/o first and wake up another
00999                 ** thread to be the new poller.
01000                 */ 
01001                 if (NULL == group->poller)
01002                 {
01003                     if (PR_CLIST_IS_EMPTY(&group->io_ready))
01004                         continue;
01005                     if (group->waiting_threads > 1)
01006                         PR_NotifyCondVar(group->io_complete);
01007                 }
01008             }
01009             PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
01010         }
01011         io_ready = PR_LIST_HEAD(&group->io_ready);
01012         PR_NotifyCondVar(group->io_taken);
01013         PR_ASSERT(io_ready != NULL);
01014         PR_REMOVE_LINK(io_ready);
01015     } while (NULL == io_ready);  /* the 'continue' above re-enters here with io_ready still NULL */
01016 
01017 failed_poll:
01018 
01019 #endif
01020 
01021 aborted:
01022 
01023     group->waiting_threads -= 1;
01024 invalid_state:
01025     (void)MW_TestForShutdownInternal(group);
01026     PR_Unlock(group->ml);
01027 
01028 failed_init:
01029     if (NULL != io_ready)
01030     {
01031         /* If the operation failed, record the reason why */
01032         switch (((PRRecvWait*)io_ready)->outcome)
01033         {
01034             case PR_MW_PENDING:
01035                 PR_ASSERT(0);
01036                 break;
01037             case PR_MW_SUCCESS:
01038 #ifndef WINNT
01039                 _MW_InitialRecv(io_ready);
01040 #endif
01041                 break;
01042 #ifdef WINNT
01043             case PR_MW_FAILURE:
01044                 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
01045                 break;
01046 #endif
01047             case PR_MW_TIMEOUT:
01048                 PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
01049                 break;
01050             case PR_MW_INTERRUPT:
01051                 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
01052                 break;
01053             default: break;
01054         }
01055 #ifdef WINNT
01056         if (NULL != overlapped->data.mw.timer)
01057         {
01058             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
01059                 != overlapped->data.mw.desc->timeout);
01060             CancelTimer(overlapped->data.mw.timer);
01061         }
01062         else
01063         {
01064             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
01065                 == overlapped->data.mw.desc->timeout);
01066         }
01067         PR_DELETE(overlapped);
01068 #endif
01069     }
01070     return (PRRecvWait*)io_ready;
01071 }  /* PR_WaitRecvReady */
01072 
01073 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
01074 {
01075 #if !defined(WINNT)
01076     PRRecvWait **recv_wait;
01077 #endif
01078     PRStatus rv = PR_SUCCESS;
01079     if (NULL == group) group = mw_state->group;
01080     PR_ASSERT(NULL != group);
01081     if (NULL == group)
01082     {
01083         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01084         return PR_FAILURE;
01085     }
01086 
01087     PR_Lock(group->ml);
01088 
01089     if (_prmw_running != group->state)
01090     {
01091         PR_SetError(PR_INVALID_STATE_ERROR, 0);
01092         rv = PR_FAILURE;
01093         goto unlock;
01094     }
01095 
01096 #ifdef WINNT
01097     if (InterlockedCompareExchange((PVOID *)&desc->outcome,
01098         (PVOID)PR_MW_INTERRUPT, (PVOID)PR_MW_PENDING) == (PVOID)PR_MW_PENDING)
01099     {
01100         PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
01101         PR_ASSERT(NULL != bottom);
01102         if (NULL == bottom)
01103         {
01104             PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01105             goto unlock;
01106         }
01107         bottom->secret->state = _PR_FILEDESC_CLOSED;
01108 #if 0
01109         fprintf(stderr, "cancel wait recv: closing socket\n");
01110 #endif
01111         if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
01112         {
01113             fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
01114             exit(1);
01115         }
01116     }
01117 #else
01118     if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
01119     {
01120         /* it was in the wait table */
01121         _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
01122         goto unlock;
01123     }
01124     if (!PR_CLIST_IS_EMPTY(&group->io_ready))
01125     {
01126         /* is it already complete? */
01127         PRCList *head = PR_LIST_HEAD(&group->io_ready);
01128         do
01129         {
01130             PRRecvWait *done = (PRRecvWait*)head;
01131             if (done == desc) goto unlock;
01132             head = PR_NEXT_LINK(head);
01133         } while (head != &group->io_ready);
01134     }
01135     PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01136     rv = PR_FAILURE;
01137 
01138 #endif
01139 unlock:
01140     PR_Unlock(group->ml);
01141     return rv;
01142 }  /* PR_CancelWaitFileDesc */
01143 
/*
 * PR_CancelWaitGroup
 *
 * Shut a wait group down and drain it one receive-wait object at a time.
 *
 * group   the wait group to cancel; NULL selects mw_state->group.  If no
 *         group is available PR_INVALID_ARGUMENT_ERROR is set and NULL is
 *         returned.
 *
 * Steps performed under group->ml:
 *   1. Unless the group is already _prmw_stopped: a running group is moved
 *      to _prmw_stopping; with no waiting threads it goes straight to
 *      _prmw_stopped, otherwise new_business and io_complete are broadcast
 *      and the caller waits on mw_manage until the state becomes
 *      _prmw_stopped.
 *   2. Every descriptor still in the waiter table is made to look
 *      interrupted.
 *      WINNT: each pending entry has its outcome swapped to
 *      PR_MW_INTERRUPT, its bottom layer marked _PR_FILEDESC_CLOSED and
 *      its socket closed (a closesocket failure prints to stderr and calls
 *      exit(1)); the thread then sleeps on wait_list until waiter->count
 *      reaches zero.
 *      Non-WINNT: _MW_DoneInternal(..., PR_MW_INTERRUPT) is applied to each
 *      non-NULL slot until waiter->count reaches zero.
 *   3. The head of io_ready, if any, is unlinked and returned.
 *
 * Returns the unlinked PRRecvWait, or NULL with PR_GROUP_EMPTY_ERROR when
 * io_ready is empty.  On WINNT, NULL is also returned (with
 * PR_INVALID_ARGUMENT_ERROR) when a descriptor has no PR_NSPR_IO_LAYER
 * layer.
 */
01144 PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
01145 {
01146     PRRecvWait **desc;
01147     PRRecvWait *recv_wait = NULL;
01148 #ifdef WINNT
01149     _MDOverlapped *overlapped;
01150     PRRecvWait **end;
01151     PRThread *me = _PR_MD_CURRENT_THREAD();
01152 #endif
01153 
01154     if (NULL == group) group = mw_state->group;
01155     PR_ASSERT(NULL != group);
01156     if (NULL == group)
01157     {
01158         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01159         return NULL;
01160     }
01161 
01162     PR_Lock(group->ml);
01163     if (_prmw_stopped != group->state)
01164     {
01165         if (_prmw_running == group->state)
01166             group->state = _prmw_stopping;  /* so nothing new comes in */
01167         if (0 == group->waiting_threads)  /* is there anybody else? */
01168             group->state = _prmw_stopped;  /* we can stop right now */
01169         else
01170         {
01171             PR_NotifyAllCondVar(group->new_business);
01172             PR_NotifyAllCondVar(group->io_complete);
01173         }
01174         while (_prmw_stopped != group->state)
01175             (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
01176     }
01177 
01178 #ifdef WINNT
01179     _PR_MD_LOCK(&group->mdlock);
01180 #endif
01181     /* make all the existing descriptors look done/interrupted */
01182 #ifdef WINNT
01183     end = &group->waiter->recv_wait + group->waiter->length;
01184     for (desc = &group->waiter->recv_wait; desc < end; ++desc)
01185     {
01186         if (NULL != *desc)
01187         {
01188             if (InterlockedCompareExchange((PVOID *)&(*desc)->outcome,
01189                 (PVOID)PR_MW_INTERRUPT, (PVOID)PR_MW_PENDING)
01190                 == (PVOID)PR_MW_PENDING)
01191             {
01192                 PRFileDesc *bottom = PR_GetIdentitiesLayer(
01193                     (*desc)->fd, PR_NSPR_IO_LAYER);
01194                 PR_ASSERT(NULL != bottom);
01195                 if (NULL == bottom)
01196                 {
01197                     PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01198                     goto invalid_arg;
01199                 }
01200                 bottom->secret->state = _PR_FILEDESC_CLOSED;
01201 #if 0
01202                 fprintf(stderr, "cancel wait group: closing socket\n");
01203 #endif
01204                 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
01205                 {
01206                     fprintf(stderr, "closesocket failed: %d\n",
01207                         WSAGetLastError());
01208                     exit(1);
01209                 }
01210             }
01211         }
01212     }
          /* sleep, with both locks released, until the waiter table has emptied */
01213     while (group->waiter->count > 0)
01214     {
01215         _PR_THREAD_LOCK(me);
01216         me->state = _PR_IO_WAIT;
01217         PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
01218         if (!_PR_IS_NATIVE_THREAD(me))
01219         {
01220             _PR_SLEEPQ_LOCK(me->cpu);
01221             _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
01222             _PR_SLEEPQ_UNLOCK(me->cpu);
01223         }
01224         _PR_THREAD_UNLOCK(me);
01225         _PR_MD_UNLOCK(&group->mdlock);
01226         PR_Unlock(group->ml);
01227         _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
01228         me->state = _PR_RUNNING;
01229         PR_Lock(group->ml);
01230         _PR_MD_LOCK(&group->mdlock);
01231     }
01232 #else
          /* NOTE(review): the loop ends on count, so it presumably relies on _MW_DoneInternal lowering waiter->count */
01233     for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
01234     {
01235         PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
01236         if (NULL != *desc)
01237             _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
01238     }
01239 #endif
01240 
01241     /* take first element of finished list and return it or NULL */
01242     if (PR_CLIST_IS_EMPTY(&group->io_ready))
01243         PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
01244     else
01245     {
01246         PRCList *head = PR_LIST_HEAD(&group->io_ready);
01247         PR_REMOVE_AND_INIT_LINK(head);
01248 #ifdef WINNT
01249         overlapped = (_MDOverlapped *)
01250             ((char *)head - offsetof(_MDOverlapped, data));
01251         head = &overlapped->data.mw.desc->internal;
01252         if (NULL != overlapped->data.mw.timer)
01253         {
01254             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
01255                 != overlapped->data.mw.desc->timeout);
01256             CancelTimer(overlapped->data.mw.timer);
01257         }
01258         else
01259         {
01260             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
01261                 == overlapped->data.mw.desc->timeout);
01262         }
01263         PR_DELETE(overlapped);
01264 #endif
01265         recv_wait = (PRRecvWait*)head;
01266     }
01267 #ifdef WINNT
01268 invalid_arg:
01269     _PR_MD_UNLOCK(&group->mdlock);
01270 #endif
01271     PR_Unlock(group->ml);
01272 
01273     return recv_wait;
01274 }  /* PR_CancelWaitGroup */
01275 
01276 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
01277 {
01278 #ifdef XP_MAC
01279 #pragma unused (size)
01280 #endif
01281     PRWaitGroup *wg;
01282 
01283     if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
01284     {
01285         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
01286         goto failed;
01287     }
01288     /* the wait group itself */
01289     wg->ml = PR_NewLock();
01290     if (NULL == wg->ml) goto failed_lock;
01291     wg->io_taken = PR_NewCondVar(wg->ml);
01292     if (NULL == wg->io_taken) goto failed_cvar0;
01293     wg->io_complete = PR_NewCondVar(wg->ml);
01294     if (NULL == wg->io_complete) goto failed_cvar1;
01295     wg->new_business = PR_NewCondVar(wg->ml);
01296     if (NULL == wg->new_business) goto failed_cvar2;
01297     wg->mw_manage = PR_NewCondVar(wg->ml);
01298     if (NULL == wg->mw_manage) goto failed_cvar3;
01299 
01300     PR_INIT_CLIST(&wg->group_link);
01301     PR_INIT_CLIST(&wg->io_ready);
01302 
01303     /* the waiters sequence */
01304     wg->waiter = (_PRWaiterHash*)PR_CALLOC(
01305         sizeof(_PRWaiterHash) +
01306         (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
01307     if (NULL == wg->waiter)
01308     {
01309         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
01310         goto failed_waiter;
01311     }
01312     wg->waiter->count = 0;
01313     wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
01314 
01315 #ifdef WINNT
01316     _PR_MD_NEW_LOCK(&wg->mdlock);
01317     PR_INIT_CLIST(&wg->wait_list);
01318 #endif /* WINNT */
01319 
01320     PR_Lock(mw_lock);
01321     PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
01322     PR_Unlock(mw_lock);
01323     return wg;
01324 
01325 failed_waiter:
01326     PR_DestroyCondVar(wg->mw_manage);
01327 failed_cvar3:
01328     PR_DestroyCondVar(wg->new_business);
01329 failed_cvar2:
01330     PR_DestroyCondVar(wg->io_complete);
01331 failed_cvar1:
01332     PR_DestroyCondVar(wg->io_taken);
01333 failed_cvar0:
01334     PR_DestroyLock(wg->ml);
01335 failed_lock:
01336     PR_DELETE(wg);
01337     wg = NULL;
01338 
01339 failed:
01340     return wg;
01341 }  /* MW_CreateWaitGroup */
01342 
01343 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
01344 {
01345     PRStatus rv = PR_SUCCESS;
01346     if (NULL == group) group = mw_state->group;
01347     PR_ASSERT(NULL != group);
01348     if (NULL != group)
01349     {
01350         PR_Lock(group->ml);
01351         if ((group->waiting_threads == 0)
01352         && (group->waiter->count == 0)
01353         && PR_CLIST_IS_EMPTY(&group->io_ready))
01354         {
01355             group->state = _prmw_stopped;
01356         }
01357         else
01358         {
01359             PR_SetError(PR_INVALID_STATE_ERROR, 0);
01360             rv = PR_FAILURE;
01361         }
01362         PR_Unlock(group->ml);
01363         if (PR_FAILURE == rv) return rv;
01364 
01365         PR_Lock(mw_lock);
01366         PR_REMOVE_LINK(&group->group_link);
01367         PR_Unlock(mw_lock);
01368 
01369 #ifdef WINNT
01370         /*
01371          * XXX make sure wait_list is empty and waiter is empty.
01372          * These must be checked while holding mdlock.
01373          */
01374         _PR_MD_FREE_LOCK(&group->mdlock);
01375 #endif
01376 
01377         PR_DELETE(group->waiter);
01378         PR_DELETE(group->polling_list);
01379         PR_DestroyCondVar(group->mw_manage);
01380         PR_DestroyCondVar(group->new_business);
01381         PR_DestroyCondVar(group->io_complete);
01382         PR_DestroyCondVar(group->io_taken);
01383         PR_DestroyLock(group->ml);
01384         if (group == mw_state->group) mw_state->group = NULL;
01385         PR_DELETE(group);
01386     }
01387     else
01388     {
01389         /* The default wait group is not created yet. */
01390         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01391         rv = PR_FAILURE;
01392     }
01393     return rv;
01394 }  /* PR_DestroyWaitGroup */
01395 
01396 /**********************************************************************
01397 ***********************************************************************
01398 ******************** Wait group enumerations **************************
01399 ***********************************************************************
01400 **********************************************************************/
01401 
01402 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
01403 {
01404     PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
01405     if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
01406     else
01407     {
01408         enumerator->group = group;
01409         enumerator->seal = _PR_ENUM_SEALED;
01410     }
01411     return enumerator;
01412 }  /* PR_CreateMWaitEnumerator */
01413 
01414 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
01415 {
01416     PR_ASSERT(NULL != enumerator);
01417     PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
01418     if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
01419     {
01420         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01421         return PR_FAILURE;
01422     }
01423     enumerator->seal = _PR_ENUM_UNSEALED;
01424     PR_Free(enumerator);
01425     return PR_SUCCESS;
01426 }  /* PR_DestroyMWaitEnumerator */
01427 
01428 PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
01429     PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
01430 {
01431     PRRecvWait *result = NULL;
01432     
01433     /* entry point sanity checking */
01434     PR_ASSERT(NULL != enumerator);
01435     PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
01436     if ((NULL == enumerator)
01437     || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
01438 
01439     /* beginning of enumeration */
01440     if (NULL == previous)
01441     {
01442         if (NULL == enumerator->group)
01443         {
01444             enumerator->group = mw_state->group;
01445             if (NULL == enumerator->group)
01446             {
01447                 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
01448                 return NULL;
01449             }
01450         }
01451         enumerator->waiter = &enumerator->group->waiter->recv_wait;
01452         enumerator->p_timestamp = enumerator->group->p_timestamp;
01453         enumerator->thread = PR_GetCurrentThread();
01454         enumerator->index = 0;
01455     }
01456     /* continuing an enumeration */
01457     else
01458     {
01459         PRThread *me = PR_GetCurrentThread();
01460         PR_ASSERT(me == enumerator->thread);
01461         if (me != enumerator->thread) goto bad_argument;
01462 
01463         /* need to restart the enumeration */
01464         if (enumerator->p_timestamp != enumerator->group->p_timestamp)
01465             return PR_EnumerateWaitGroup(enumerator, NULL);
01466     }
01467 
01468     /* actually progress the enumeration */
01469 #if defined(WINNT)
01470     _PR_MD_LOCK(&enumerator->group->mdlock);
01471 #else
01472     PR_Lock(enumerator->group->ml);
01473 #endif
01474     while (enumerator->index++ < enumerator->group->waiter->length)
01475     {
01476         if (NULL != (result = *(enumerator->waiter)++)) break;
01477     }
01478 #if defined(WINNT)
01479     _PR_MD_UNLOCK(&enumerator->group->mdlock);
01480 #else
01481     PR_Unlock(enumerator->group->ml);
01482 #endif
01483 
01484     return result;  /* what we live for */
01485 
01486 bad_argument:
01487     PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
01488     return NULL;  /* probably ambiguous */
01489 }  /* PR_EnumerateWaitGroup */
01490 
01491 /* prmwait.c */