Back to index

lightning-sunbird  0.9+nobinonly
multiwait.c
Go to the documentation of this file.
00001 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
00002 /* ***** BEGIN LICENSE BLOCK *****
00003  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00004  *
00005  * The contents of this file are subject to the Mozilla Public License Version
00006  * 1.1 (the "License"); you may not use this file except in compliance with
00007  * the License. You may obtain a copy of the License at
00008  * http://www.mozilla.org/MPL/
00009  *
00010  * Software distributed under the License is distributed on an "AS IS" basis,
00011  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00012  * for the specific language governing rights and limitations under the
00013  * License.
00014  *
00015  * The Original Code is the Netscape Portable Runtime (NSPR).
00016  *
00017  * The Initial Developer of the Original Code is
00018  * Netscape Communications Corporation.
00019  * Portions created by the Initial Developer are Copyright (C) 1998-2000
00020  * the Initial Developer. All Rights Reserved.
00021  *
00022  * Contributor(s):
00023  *
00024  * Alternatively, the contents of this file may be used under the terms of
00025  * either the GNU General Public License Version 2 or later (the "GPL"), or
00026  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027  * in which case the provisions of the GPL or the LGPL are applicable instead
00028  * of those above. If you wish to allow use of your version of this file only
00029  * under the terms of either the GPL or the LGPL, and not to allow others to
00030  * use your version of this file under the terms of the MPL, indicate your
00031  * decision by deleting the provisions above and replace them with the notice
00032  * and other provisions required by the GPL or the LGPL. If you do not delete
00033  * the provisions above, a recipient may use your version of this file under
00034  * the terms of any one of the MPL, the GPL or the LGPL.
00035  *
00036  * ***** END LICENSE BLOCK ***** */
00037 
#include "prio.h"
#include "prprf.h"
#include "prlog.h"
#include "prmem.h"
#include "pratom.h"
#include "prlock.h"
#include "prmwait.h"
#include "prclist.h"
#include "prerror.h"
#include "prinrval.h"
#include "prnetdb.h"
#include "prthread.h"

#include "plstr.h"
#include "plerror.h"
#include "plgetopt.h"

#include <stdlib.h>
#include <string.h>
00056 
00057 typedef struct Shared
00058 {
00059     const char *title;
00060     PRLock *list_lock;
00061     PRWaitGroup *group;
00062     PRIntervalTime timeout;
00063 } Shared;
00064 
/* Diagnostic output levels, listed from least to most output. */
typedef enum Verbosity
{
    silent,
    quiet,
    chatty,
    noisy
} Verbosity;
00066 
00067 static PRFileDesc *debug = NULL;
00068 static PRInt32 desc_allocated = 0;
00069 static PRUint16 default_port = 12273;
00070 static enum Verbosity verbosity = quiet;
00071 static PRInt32 ops_required = 1000, ops_done = 0;
00072 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
00073 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
00074 
/*
** MW_ASSERT(expr)
**
** DEBUG builds: when expr is false, _MW_Assert() first calls PL_FPrintError()
** on the debug stream (if one is open) and then hands the stringified
** expression, file and line to PR_Assert().
** Other builds: the macro does not evaluate its argument.
*/
#if !defined(DEBUG)
#define MW_ASSERT(_expr) ((void)0)
#else
static void _MW_Assert(const char *s, const char *file, PRIntn ln)
{
    if (NULL != debug)
    {
        PL_FPrintError(debug, NULL);
    }
    PR_Assert(s, file, ln);
}  /* _MW_Assert */
#define MW_ASSERT(_expr) \
    ((_expr) ? ((void)0) : _MW_Assert(#_expr, __FILE__, __LINE__))
#endif
00086 
00087 static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
00088 {
00089     const char *tag[] = {
00090         "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
00091         "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"};
00092     PR_fprintf(
00093         debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
00094         msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
00095 }  /* PrintRecvDesc */
00096 
00097 static Shared *MakeShared(const char *title)
00098 {
00099     Shared *shared = PR_NEWZAP(Shared);
00100     shared->group = PR_CreateWaitGroup(1);
00101     shared->timeout = PR_SecondsToInterval(1);
00102     shared->list_lock = PR_NewLock();
00103     shared->title = title;
00104     return shared;
00105 }  /* MakeShared */
00106 
00107 static void DestroyShared(Shared *shared)
00108 {
00109     PRStatus rv;
00110     if (verbosity > quiet)
00111         PR_fprintf(debug, "%s: destroying group\n", shared->title);
00112     rv = PR_DestroyWaitGroup(shared->group);
00113     MW_ASSERT(PR_SUCCESS == rv);
00114     PR_DestroyLock(shared->list_lock);
00115     PR_DELETE(shared);
00116 }  /* DestroyShared */
00117 
00118 static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
00119 {
00120     PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
00121     MW_ASSERT(NULL != desc_out);
00122 
00123     MW_ASSERT(NULL != fd);
00124     desc_out->fd = fd;
00125     desc_out->timeout = timeout;
00126     desc_out->buffer.length = 120;
00127     desc_out->buffer.start = PR_CALLOC(120);
00128 
00129     PR_AtomicIncrement(&desc_allocated);
00130 
00131     if (verbosity > chatty)
00132         PrintRecvDesc(desc_out, "Allocated");
00133     return desc_out;
00134 }  /* CreateRecvWait */
00135 
00136 static void DestroyRecvWait(PRRecvWait *desc_out)
00137 {
00138     if (verbosity > chatty)
00139         PrintRecvDesc(desc_out, "Destroying");
00140     PR_Close(desc_out->fd);
00141     if (NULL != desc_out->buffer.start)
00142         PR_DELETE(desc_out->buffer.start);
00143     PR_Free(desc_out);
00144     (void)PR_AtomicDecrement(&desc_allocated);
00145 }  /* DestroyRecvWait */
00146 
00147 static void CancelGroup(Shared *shared)
00148 {
00149     PRRecvWait *desc_out;
00150 
00151     if (verbosity > quiet)
00152         PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
00153 
00154     do
00155     {
00156         desc_out = PR_CancelWaitGroup(shared->group);
00157         if (NULL != desc_out) DestroyRecvWait(desc_out);
00158     } while (NULL != desc_out);
00159 
00160     MW_ASSERT(0 == desc_allocated);
00161     MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
00162 }  /* CancelGroup */
00163 
00164 static void PR_CALLBACK ClientThread(void* arg)
00165 {
00166     PRStatus rv;
00167     PRInt32 bytes;
00168     PRIntn empty_flags = 0;
00169     PRNetAddr server_address;
00170     unsigned char buffer[100];
00171     Shared *shared = (Shared*)arg;
00172     PRFileDesc *server = PR_NewTCPSocket();
00173     if ((NULL == server)
00174     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
00175     MW_ASSERT(NULL != server);
00176 
00177     if (verbosity > chatty)
00178         PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
00179 
00180     /* Initialize the buffer so that Purify won't complain */
00181     memset(buffer, 0, sizeof(buffer));
00182 
00183     rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
00184     MW_ASSERT(PR_SUCCESS == rv);
00185 
00186     if (verbosity > quiet)
00187         PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
00188     rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
00189 
00190     if (PR_FAILURE == rv)
00191     {
00192         if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
00193         return;
00194     }
00195 
00196     while (ops_done < ops_required)
00197     {
00198         bytes = PR_Send(
00199             server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
00200         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
00201         MW_ASSERT(sizeof(buffer) == bytes);
00202         if (verbosity > chatty)
00203             PR_fprintf(
00204                 debug, "%s: Client sent %d bytes\n",
00205                 shared->title, sizeof(buffer));
00206         bytes = PR_Recv(
00207             server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
00208         if (verbosity > chatty)
00209             PR_fprintf(
00210                 debug, "%s: Client received %d bytes\n",
00211                 shared->title, sizeof(buffer));
00212         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
00213         MW_ASSERT(sizeof(buffer) == bytes);
00214         PR_Sleep(shared->timeout);
00215     }
00216     rv = PR_Close(server);
00217     MW_ASSERT(PR_SUCCESS == rv);
00218 
00219 }  /* ClientThread */
00220 
00221 static void OneInThenCancelled(Shared *shared)
00222 {
00223     PRStatus rv;
00224     PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
00225 
00226     shared->timeout = PR_INTERVAL_NO_TIMEOUT;
00227 
00228     desc_in->fd = PR_NewTCPSocket();
00229     desc_in->timeout = shared->timeout;
00230 
00231     if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
00232 
00233     rv = PR_AddWaitFileDesc(shared->group, desc_in);
00234     MW_ASSERT(PR_SUCCESS == rv);
00235 
00236     if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling");
00237     rv = PR_CancelWaitFileDesc(shared->group, desc_in);
00238     MW_ASSERT(PR_SUCCESS == rv);
00239 
00240     desc_out = PR_WaitRecvReady(shared->group);
00241     MW_ASSERT(desc_out == desc_in);
00242     MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
00243     MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
00244     if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
00245 
00246     rv = PR_Close(desc_in->fd);
00247     MW_ASSERT(PR_SUCCESS == rv);
00248 
00249     if (verbosity > quiet)
00250         PR_fprintf(debug, "%s: destroying group\n", shared->title);
00251 
00252     PR_DELETE(desc_in);
00253 }  /* OneInThenCancelled */
00254 
00255 static void OneOpOneThread(Shared *shared)
00256 {
00257     PRStatus rv;
00258     PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
00259 
00260     desc_in->fd = PR_NewTCPSocket();
00261     desc_in->timeout = shared->timeout;
00262 
00263     if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
00264 
00265     rv = PR_AddWaitFileDesc(shared->group, desc_in);
00266     MW_ASSERT(PR_SUCCESS == rv);
00267     desc_out = PR_WaitRecvReady(shared->group);
00268     MW_ASSERT(desc_out == desc_in);
00269     MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
00270     MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
00271     if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
00272 
00273     rv = PR_Close(desc_in->fd);
00274     MW_ASSERT(PR_SUCCESS == rv);
00275 
00276     PR_DELETE(desc_in);
00277 }  /* OneOpOneThread */
00278 
00279 static void ManyOpOneThread(Shared *shared)
00280 {
00281     PRStatus rv;
00282     PRIntn index;
00283     PRRecvWait *desc_in;
00284     PRRecvWait *desc_out;
00285 
00286     if (verbosity > quiet)
00287         PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
00288 
00289     for (index = 0; index < wait_objects; ++index)
00290     {
00291         desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
00292 
00293         rv = PR_AddWaitFileDesc(shared->group, desc_in);
00294         MW_ASSERT(PR_SUCCESS == rv);
00295     }
00296 
00297     while (ops_done < ops_required)
00298     {
00299         desc_out = PR_WaitRecvReady(shared->group);
00300         MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
00301         MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
00302         if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding");
00303         rv = PR_AddWaitFileDesc(shared->group, desc_out);
00304         MW_ASSERT(PR_SUCCESS == rv);
00305         (void)PR_AtomicIncrement(&ops_done);
00306     }
00307 
00308     CancelGroup(shared);
00309 }  /* ManyOpOneThread */
00310 
00311 static void PR_CALLBACK SomeOpsThread(void *arg)
00312 {
00313     PRRecvWait *desc_out;
00314     PRStatus rv = PR_SUCCESS;
00315     Shared *shared = (Shared*)arg;
00316     do  /* until interrupted */
00317     {
00318         desc_out = PR_WaitRecvReady(shared->group);
00319         if (NULL == desc_out)
00320         {
00321             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
00322             if (verbosity > quiet) PR_fprintf(debug, "Aborted\n");
00323             break;
00324         }
00325         MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
00326         MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
00327         if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
00328 
00329         if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
00330         desc_out->timeout = shared->timeout;
00331         rv = PR_AddWaitFileDesc(shared->group, desc_out);
00332         PR_AtomicIncrement(&ops_done);
00333         if (ops_done > ops_required) break;
00334     } while (PR_SUCCESS == rv);
00335     MW_ASSERT(PR_SUCCESS == rv);
00336 }  /* SomeOpsThread */
00337 
00338 static void SomeOpsSomeThreads(Shared *shared)
00339 {
00340     PRStatus rv;
00341     PRThread **thread;
00342     PRIntn index;
00343     PRRecvWait *desc_in;
00344 
00345     thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
00346 
00347     /* Create some threads */
00348 
00349     if (verbosity > quiet)
00350         PR_fprintf(debug, "%s: creating threads\n", shared->title);
00351     for (index = 0; index < worker_threads; ++index)
00352     {
00353         thread[index] = PR_CreateThread(
00354             PR_USER_THREAD, SomeOpsThread, shared,
00355             PR_PRIORITY_HIGH, thread_scope,
00356             PR_JOINABLE_THREAD, 16 * 1024);
00357     }
00358 
00359     /* then create some operations */
00360     if (verbosity > quiet)
00361         PR_fprintf(debug, "%s: creating desc\n", shared->title);
00362     for (index = 0; index < wait_objects; ++index)
00363     {
00364         desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
00365         rv = PR_AddWaitFileDesc(shared->group, desc_in);
00366         MW_ASSERT(PR_SUCCESS == rv);
00367     }
00368 
00369     if (verbosity > quiet)
00370         PR_fprintf(debug, "%s: sleeping\n", shared->title);
00371     while (ops_done < ops_required) PR_Sleep(shared->timeout);
00372 
00373     if (verbosity > quiet)
00374         PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
00375     for (index = 0; index < worker_threads; ++index)
00376     {
00377         rv = PR_Interrupt(thread[index]);
00378         MW_ASSERT(PR_SUCCESS == rv);
00379         rv = PR_JoinThread(thread[index]);
00380         MW_ASSERT(PR_SUCCESS == rv);
00381     }
00382     PR_DELETE(thread);
00383 
00384     CancelGroup(shared);
00385 }  /* SomeOpsSomeThreads */
00386 
00387 static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
00388 {
00389     PRInt32 bytes_out;
00390 
00391     if (verbosity > chatty)
00392         PR_fprintf(
00393             debug, "%s: Service received %d bytes\n",
00394             shared->title, desc->bytesRecv);
00395 
00396     if (0 == desc->bytesRecv) goto quitting;
00397     if ((-1 == desc->bytesRecv)
00398     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
00399 
00400     bytes_out = PR_Send(
00401         desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
00402     if (verbosity > chatty)
00403         PR_fprintf(
00404             debug, "%s: Service sent %d bytes\n",
00405             shared->title, bytes_out);
00406 
00407     if ((-1 == bytes_out)
00408     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
00409     MW_ASSERT(bytes_out == desc->bytesRecv);
00410 
00411     return PR_SUCCESS;
00412 
00413 aborted:
00414 quitting:
00415     return PR_FAILURE;
00416 }  /* ServiceRequest */
00417 
00418 static void PR_CALLBACK ServiceThread(void *arg)
00419 {
00420     PRStatus rv = PR_SUCCESS;
00421     PRRecvWait *desc_out = NULL;
00422     Shared *shared = (Shared*)arg;
00423     do  /* until interrupted */
00424     {
00425         if (NULL != desc_out)
00426         {
00427             desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
00428             if (verbosity > chatty)
00429                 PrintRecvDesc(desc_out, "Service re-adding");
00430             rv = PR_AddWaitFileDesc(shared->group, desc_out);
00431             MW_ASSERT(PR_SUCCESS == rv);
00432         }
00433 
00434         desc_out = PR_WaitRecvReady(shared->group);
00435         if (NULL == desc_out)
00436         {
00437             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
00438             break;
00439         }
00440 
00441         switch (desc_out->outcome)
00442         {
00443             case PR_MW_SUCCESS:
00444             {
00445                 PR_AtomicIncrement(&ops_done);
00446                 if (verbosity > chatty)
00447                     PrintRecvDesc(desc_out, "Service ready");
00448                 rv = ServiceRequest(shared, desc_out);
00449                 break;
00450             }
00451             case PR_MW_INTERRUPT:
00452                 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
00453                 rv = PR_FAILURE;  /* if interrupted, then exit */
00454                 break;
00455             case PR_MW_TIMEOUT:
00456                 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
00457             case PR_MW_FAILURE:
00458                 if (verbosity > silent)
00459                     PL_FPrintError(debug, "RecvReady failure");
00460                 break;
00461             default:
00462                 break;
00463         }
00464     } while (PR_SUCCESS == rv);
00465 
00466     if (NULL != desc_out) DestroyRecvWait(desc_out);
00467 
00468 }  /* ServiceThread */
00469 
00470 static void PR_CALLBACK EnumerationThread(void *arg)
00471 {
00472     PRStatus rv;
00473     PRIntn count;
00474     PRRecvWait *desc;
00475     Shared *shared = (Shared*)arg;
00476     PRIntervalTime five_seconds = PR_SecondsToInterval(5);
00477     PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
00478     MW_ASSERT(NULL != enumerator);
00479 
00480     while (PR_SUCCESS == PR_Sleep(five_seconds))
00481     {
00482         count = 0;
00483         desc = NULL;
00484         while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
00485         {
00486             if (verbosity > chatty) PrintRecvDesc(desc, shared->title);
00487             count += 1;
00488         }
00489         if (verbosity > silent)
00490             PR_fprintf(debug,
00491                 "%s Enumerated %d objects\n", shared->title, count);
00492     }
00493 
00494     MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
00495 
00496 
00497     rv = PR_DestroyMWaitEnumerator(enumerator);
00498     MW_ASSERT(PR_SUCCESS == rv);
00499 }  /* EnumerationThread */
00500 
00501 static void PR_CALLBACK ServerThread(void *arg)
00502 {
00503     PRStatus rv;
00504     PRIntn index;
00505     PRRecvWait *desc_in;
00506     PRThread **worker_thread;
00507     Shared *shared = (Shared*)arg;
00508     PRFileDesc *listener, *service;
00509     PRNetAddr server_address, client_address;
00510 
00511     worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
00512     if (verbosity > quiet)
00513         PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
00514     for (index = 0; index < worker_threads; ++index)
00515     {
00516         worker_thread[index] = PR_CreateThread(
00517             PR_USER_THREAD, ServiceThread, shared,
00518             PR_PRIORITY_HIGH, thread_scope,
00519             PR_JOINABLE_THREAD, 16 * 1024);
00520     }
00521 
00522     rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
00523     MW_ASSERT(PR_SUCCESS == rv);
00524 
00525     listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
00526     if (verbosity > chatty)
00527         PR_fprintf(
00528             debug, "%s: Server listener socket @0x%x\n",
00529             shared->title, listener);
00530     rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
00531     rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
00532     while (ops_done < ops_required)
00533     {
00534         if (verbosity > quiet)
00535             PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
00536         service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
00537         if (NULL == service)
00538         {
00539             if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;
00540             PL_PrintError("Accept failed");
00541             MW_ASSERT(!"Accept failed");
00542         }
00543         else
00544         {
00545             desc_in = CreateRecvWait(service, shared->timeout);
00546             desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
00547             if (verbosity > chatty)
00548                 PrintRecvDesc(desc_in, "Service adding");
00549             rv = PR_AddWaitFileDesc(shared->group, desc_in);
00550             MW_ASSERT(PR_SUCCESS == rv);
00551         }
00552     }
00553 
00554     if (verbosity > quiet)
00555         PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
00556     for (index = 0; index < worker_threads; ++index)
00557     {
00558         rv = PR_Interrupt(worker_thread[index]);
00559         MW_ASSERT(PR_SUCCESS == rv);
00560         rv = PR_JoinThread(worker_thread[index]);
00561         MW_ASSERT(PR_SUCCESS == rv);
00562     }
00563     PR_DELETE(worker_thread);
00564 
00565     PR_Close(listener);
00566 
00567     CancelGroup(shared);
00568 
00569 }  /* ServerThread */
00570 
00571 static void RealOneGroupIO(Shared *shared)
00572 {
00573     /*
00574     ** Create a server that listens for connections and then services
00575     ** requests that come in over those connections. The server never
00576     ** deletes a connection and assumes a basic RPC model of operation.
00577     **
00578     ** Use worker_threads threads to service how every many open ports
00579     ** there might be.
00580     **
00581     ** Oh, ya. Almost forget. Create (some) clients as well.
00582     */
00583     PRStatus rv;
00584     PRIntn index;
00585     PRThread *server_thread, *enumeration_thread, **client_thread;
00586 
00587     if (verbosity > quiet)
00588         PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
00589 
00590     server_thread = PR_CreateThread(
00591         PR_USER_THREAD, ServerThread, shared,
00592         PR_PRIORITY_HIGH, thread_scope,
00593         PR_JOINABLE_THREAD, 16 * 1024);
00594 
00595     if (verbosity > quiet)
00596         PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
00597 
00598     enumeration_thread = PR_CreateThread(
00599         PR_USER_THREAD, EnumerationThread, shared,
00600         PR_PRIORITY_HIGH, thread_scope,
00601         PR_JOINABLE_THREAD, 16 * 1024);
00602 
00603     if (verbosity > quiet)
00604         PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
00605     PR_Sleep(5 * shared->timeout);
00606 
00607     if (verbosity > quiet)
00608         PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
00609     client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
00610     for (index = 0; index < client_threads; ++index)
00611     {
00612         client_thread[index] = PR_CreateThread(
00613             PR_USER_THREAD, ClientThread, shared,
00614             PR_PRIORITY_NORMAL, thread_scope,
00615             PR_JOINABLE_THREAD, 16 * 1024);
00616     }
00617 
00618     while (ops_done < ops_required) PR_Sleep(shared->timeout);
00619 
00620     if (verbosity > quiet)
00621         PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
00622     for (index = 0; index < client_threads; ++index)
00623     {
00624         rv = PR_Interrupt(client_thread[index]);
00625         MW_ASSERT(PR_SUCCESS == rv);
00626         rv = PR_JoinThread(client_thread[index]);
00627         MW_ASSERT(PR_SUCCESS == rv);
00628     }
00629     PR_DELETE(client_thread);
00630 
00631     if (verbosity > quiet)
00632         PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
00633     rv = PR_Interrupt(enumeration_thread);
00634     MW_ASSERT(PR_SUCCESS == rv);
00635     rv = PR_JoinThread(enumeration_thread);
00636     MW_ASSERT(PR_SUCCESS == rv);
00637 
00638     if (verbosity > quiet)
00639         PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
00640     rv = PR_Interrupt(server_thread);
00641     MW_ASSERT(PR_SUCCESS == rv);
00642     rv = PR_JoinThread(server_thread);
00643     MW_ASSERT(PR_SUCCESS == rv);
00644 }  /* RealOneGroupIO */
00645 
00646 static void RunThisOne(
00647     void (*func)(Shared*), const char *name, const char *test_name)
00648 {
00649     Shared *shared;
00650     if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))
00651     {
00652         if (verbosity > silent)
00653             PR_fprintf(debug, "%s()\n", name);
00654         shared = MakeShared(name);
00655         ops_done = 0;
00656         func(shared);  /* run the test */
00657         MW_ASSERT(0 == desc_allocated);
00658         DestroyShared(shared);
00659     }
00660 }  /* RunThisOne */
00661 
00662 static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
00663 {
00664     PRIntn verbage = (PRIntn)verbosity;
00665     return (Verbosity)(verbage += delta);
00666 }  /* ChangeVerbosity */
00667 
00668 PRIntn main(PRIntn argc, char **argv)
00669 {
00670     PLOptStatus os;
00671     const char *test_name = NULL;
00672     PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
00673 
00674     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
00675     {
00676         if (PL_OPT_BAD == os) continue;
00677         switch (opt->option)
00678         {
00679         case 0:
00680             test_name = opt->value;
00681             break;
00682         case 'd':  /* debug mode */
00683             if (verbosity < noisy)
00684                 verbosity = ChangeVerbosity(verbosity, 1);
00685             break;
00686         case 'q':  /* debug mode */
00687             if (verbosity > silent)
00688                 verbosity = ChangeVerbosity(verbosity, -1);
00689             break;
00690         case 'G':  /* use global threads */
00691             thread_scope = PR_GLOBAL_THREAD;
00692             break;
00693         case 'c':  /* number of client threads */
00694             client_threads = atoi(opt->value);
00695             break;
00696         case 'o':  /* operations to compelete */
00697             ops_required = atoi(opt->value);
00698             break;
00699         case 'p':  /* default port */
00700             default_port = atoi(opt->value);
00701             break;
00702         case 't':  /* number of threads waiting */
00703             worker_threads = atoi(opt->value);
00704             break;
00705         case 'w':  /* number of wait objects */
00706             wait_objects = atoi(opt->value);
00707             break;
00708         default:
00709             break;
00710         }
00711     }
00712     PL_DestroyOptState(opt);
00713 
00714     if (verbosity > 0)
00715         debug = PR_GetSpecialFD(PR_StandardError);
00716 
00717     RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
00718     RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
00719     RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
00720     RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
00721     RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
00722     return 0;
00723 }  /* main */
00724 
00725 /* multiwait.c */