Back to index

lightning-sunbird  0.9+nobinonly
thrpool_server.c
Go to the documentation of this file.
00001 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
00002 /* ***** BEGIN LICENSE BLOCK *****
00003  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00004  *
00005  * The contents of this file are subject to the Mozilla Public License Version
00006  * 1.1 (the "License"); you may not use this file except in compliance with
00007  * the License. You may obtain a copy of the License at
00008  * http://www.mozilla.org/MPL/
00009  *
00010  * Software distributed under the License is distributed on an "AS IS" basis,
00011  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00012  * for the specific language governing rights and limitations under the
00013  * License.
00014  *
00015  * The Original Code is the Netscape Portable Runtime (NSPR).
00016  *
00017  * The Initial Developer of the Original Code is
00018  * Netscape Communications Corporation.
00019  * Portions created by the Initial Developer are Copyright (C) 1999-2000
00020  * the Initial Developer. All Rights Reserved.
00021  *
00022  * Contributor(s):
00023  *
00024  * Alternatively, the contents of this file may be used under the terms of
00025  * either the GNU General Public License Version 2 or later (the "GPL"), or
00026  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027  * in which case the provisions of the GPL or the LGPL are applicable instead
00028  * of those above. If you wish to allow use of your version of this file only
00029  * under the terms of either the GPL or the LGPL, and not to allow others to
00030  * use your version of this file under the terms of the MPL, indicate your
00031  * decision by deleting the provisions above and replace them with the notice
00032  * and other provisions required by the GPL or the LGPL. If you do not delete
00033  * the provisions above, a recipient may use your version of this file under
00034  * the terms of any one of the MPL, the GPL or the LGPL.
00035  *
00036  * ***** END LICENSE BLOCK ***** */
00037 
00038 /***********************************************************************
00039 **
00040 ** Name: thrpool.c
00041 **
00042 ** Description: Test threadpool functionality.
00043 **
00044 ** Modification History:
00045 */
00046 #include "primpl.h"
00047 
00048 #include "plgetopt.h"
00049 
00050 #include <stdio.h>
00051 #include <string.h>
00052 #include <errno.h>
00053 #ifdef XP_UNIX
00054 #include <sys/mman.h>
00055 #endif
00056 #if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
00057 #include <pthread.h>
00058 #endif
00059 
00060 /* for getcwd */
00061 #if defined(XP_UNIX) || defined (XP_OS2_EMX) || defined(XP_BEOS)
00062 #include <unistd.h>
00063 #elif defined(XP_PC)
00064 #include <direct.h>
00065 #endif
00066 
00067 #ifdef WIN32
00068 #include <process.h>
00069 #endif
00070 
00071 static int _debug_on = 0;
00072 static char *program_name = NULL;
00073 static void serve_client_write(void *arg);
00074 
00075 #ifdef XP_MAC
00076 #include "prlog.h"
00077 #include "prsem.h"
00078 int fprintf(FILE *stream, const char *fmt, ...)
00079 {
00080     PR_LogPrint(fmt);
00081     return 0;
00082 }
00083 #define printf PR_LogPrint
00084 extern void SetupMacPrintfLog(char *logFile);
00085 #else
00086 #include "obsolete/prsem.h"
00087 #endif
00088 
00089 #ifdef XP_PC
00090 #define mode_t int
00091 #endif
00092 
00093 #define DPRINTF(arg) if (_debug_on) printf arg
00094 
00095 
00096 #define BUF_DATA_SIZE    (2 * 1024)
00097 #define TCP_MESG_SIZE    1024
00098 #define NUM_TCP_CLIENTS  10 /* for a listen queue depth of 5 */
00099 
00100 
00101 #define NUM_TCP_CONNECTIONS_PER_CLIENT  10
00102 #define NUM_TCP_MESGS_PER_CONNECTION    10
00103 #define TCP_SERVER_PORT                          10000
00104 #define SERVER_MAX_BIND_COUNT             100
00105 
00106 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
00107 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
00108 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
00109 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
00110 static void TCP_Server_Accept(void *arg);
00111 
00112 
00113 int failed_already=0;
00114 typedef struct buffer {
00115     char    data[BUF_DATA_SIZE];
00116 } buffer;
00117 
00118 
00119 typedef struct Server_Param {
00120     PRJobIoDesc iod;    /* socket to read from/write to    */
00121     PRInt32          datalen;    /* bytes of data transfered in each read/write */
00122     PRNetAddr netaddr;
00123     PRMonitor *exit_mon;    /* monitor to signal on exit            */
00124     PRInt32          *job_counterp;    /* counter to decrement, before exit        */
00125     PRInt32          conn_counter;    /* counter to decrement, before exit        */
00126        PRThreadPool *tp;
00127 } Server_Param;
00128 
00129 typedef struct Serve_Client_Param {
00130     PRJobIoDesc iod;    /* socket to read from/write to    */
00131     PRInt32    datalen;    /* bytes of data transfered in each read/write */
00132     PRMonitor *exit_mon;    /* monitor to signal on exit            */
00133     PRInt32 *job_counterp;    /* counter to decrement, before exit        */
00134        PRThreadPool *tp;
00135 } Serve_Client_Param;
00136 
00137 typedef struct Session {
00138     PRJobIoDesc iod;    /* socket to read from/write to    */
00139        buffer        *in_buf;
00140        PRInt32 bytes;
00141        PRInt32 msg_num;
00142        PRInt32 bytes_read;
00143     PRMonitor *exit_mon;    /* monitor to signal on exit            */
00144     PRInt32 *job_counterp;    /* counter to decrement, before exit        */
00145        PRThreadPool *tp;
00146 } Session;
00147 
00148 static void
00149 serve_client_read(void *arg)
00150 {
00151        Session *sp = (Session *) arg;
00152     int rem;
00153     int bytes;
00154     int offset;
00155        PRFileDesc *sockfd;
00156        char *buf;
00157        PRJob *jobp;
00158 
00159        PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
00160 
00161        sockfd = sp->iod.socket;
00162        buf = sp->in_buf->data;
00163 
00164     PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
00165        PR_ASSERT(sp->bytes_read < sp->bytes);
00166 
00167        offset = sp->bytes_read;
00168        rem = sp->bytes - offset;
00169        bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout);
00170        if (bytes < 0) {
00171               return;
00172        }
00173        sp->bytes_read += bytes;
00174        sp->iod.timeout = PR_SecondsToInterval(60);
00175        if (sp->bytes_read <  sp->bytes) {
00176               jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
00177                                                  PR_FALSE);
00178               PR_ASSERT(NULL != jobp);
00179               return;
00180        }
00181        PR_ASSERT(sp->bytes_read == sp->bytes);
00182        DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num));
00183 
00184        sp->iod.timeout = PR_SecondsToInterval(60);
00185        jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp,
00186                                                  PR_FALSE);
00187        PR_ASSERT(NULL != jobp);
00188 
00189     return;
00190 }
00191 
00192 static void
00193 serve_client_write(void *arg)
00194 {
00195        Session *sp = (Session *) arg;
00196     int bytes;
00197        PRFileDesc *sockfd;
00198        char *buf;
00199        PRJob *jobp;
00200 
00201        sockfd = sp->iod.socket;
00202        buf = sp->in_buf->data;
00203 
00204     PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
00205 
00206        bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT);
00207        PR_ASSERT(bytes == sp->bytes);
00208 
00209        if (bytes < 0) {
00210               return;
00211        }
00212        DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num));
00213     sp->msg_num++;
00214     if (sp->msg_num < num_tcp_mesgs_per_connection) {
00215               sp->bytes_read = 0;
00216               sp->iod.timeout = PR_SecondsToInterval(60);
00217               jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
00218                                                  PR_FALSE);
00219               PR_ASSERT(NULL != jobp);
00220               return;
00221        }
00222 
00223        DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num));
00224     if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
00225         fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
00226     }
00227 
00228     PR_Close(sockfd);
00229     PR_EnterMonitor(sp->exit_mon);
00230     --(*sp->job_counterp);
00231     PR_Notify(sp->exit_mon);
00232     PR_ExitMonitor(sp->exit_mon);
00233 
00234     PR_DELETE(sp->in_buf);
00235     PR_DELETE(sp);
00236 
00237     return;
00238 }
00239 
00240 /*
00241  * Serve_Client
00242  *    Thread, started by the server, for serving a client connection.
00243  *    Reads data from socket and writes it back, unmodified, and
00244  *    closes the socket
00245  */
00246 static void PR_CALLBACK
00247 Serve_Client(void *arg)
00248 {
00249     Serve_Client_Param *scp = (Serve_Client_Param *) arg;
00250     buffer *in_buf;
00251        Session *sp;
00252        PRJob *jobp;
00253 
00254        sp = PR_NEW(Session);
00255        sp->iod = scp->iod;
00256 
00257     in_buf = PR_NEW(buffer);
00258     if (in_buf == NULL) {
00259         fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name);
00260         failed_already=1;
00261         return;
00262     }
00263 
00264        sp->in_buf = in_buf;
00265        sp->bytes = scp->datalen;
00266        sp->msg_num = 0;
00267        sp->bytes_read = 0;
00268        sp->tp = scp->tp;
00269        sp->exit_mon = scp->exit_mon;
00270     sp->job_counterp = scp->job_counterp;
00271 
00272        sp->iod.timeout = PR_SecondsToInterval(60);
00273        jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
00274                                                  PR_FALSE);
00275        PR_ASSERT(NULL != jobp);
00276        PR_DELETE(scp);
00277 }
00278 
00279 static void
00280 print_stats(void *arg)
00281 {
00282     Server_Param *sp = (Server_Param *) arg;
00283     PRThreadPool *tp = sp->tp;
00284     PRInt32 counter;
00285        PRJob *jobp;
00286 
00287        PR_EnterMonitor(sp->exit_mon);
00288        counter = (*sp->job_counterp);
00289        PR_ExitMonitor(sp->exit_mon);
00290 
00291        printf("PRINT_STATS: #client connections = %d\n",counter);
00292 
00293 
00294        jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
00295                                           print_stats, sp, PR_FALSE);
00296 
00297        PR_ASSERT(NULL != jobp);
00298 }
00299 
00300 static int job_counter = 0;
00301 /*
00302  * TCP Server
00303  *    Server binds an address to a socket, starts a client process and
00304  *       listens for incoming connections.
00305  *    Each client connects to the server and sends a chunk of data
00306  *    Starts a Serve_Client job for each incoming connection, to read
00307  *    the data from the client and send it back to the client, unmodified.
00308  *    Each client checks that data received from server is same as the
00309  *    data it sent to the server.
00310  *       Finally, the threadpool is shutdown
00311  */
00312 static void PR_CALLBACK
00313 TCP_Server(void *arg)
00314 {
00315     PRThreadPool *tp = (PRThreadPool *) arg;
00316     Server_Param *sp;
00317     PRFileDesc *sockfd;
00318     PRNetAddr netaddr;
00319        PRMonitor *sc_mon;
00320        PRJob *jobp;
00321        int i;
00322        PRStatus rval;
00323 
00324     /*
00325      * Create a tcp socket
00326      */
00327     if ((sockfd = PR_NewTCPSocket()) == NULL) {
00328         fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name);
00329         return;
00330     }
00331     memset(&netaddr, 0 , sizeof(netaddr));
00332     netaddr.inet.family = PR_AF_INET;
00333     netaddr.inet.port = PR_htons(TCP_SERVER_PORT);
00334     netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);
00335     /*
00336      * try a few times to bind server's address, if addresses are in
00337      * use
00338      */
00339        i = 0;
00340     while (PR_Bind(sockfd, &netaddr) < 0) {
00341         if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {
00342             netaddr.inet.port += 2;
00343             if (i++ < SERVER_MAX_BIND_COUNT)
00344                 continue;
00345         }
00346         fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name);
00347         perror("PR_Bind");
00348         failed_already=1;
00349         return;
00350     }
00351 
00352     if (PR_Listen(sockfd, 32) < 0) {
00353         fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name);
00354         failed_already=1;
00355         return;
00356     }
00357 
00358     if (PR_GetSockName(sockfd, &netaddr) < 0) {
00359         fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name);
00360         failed_already=1;
00361         return;
00362     }
00363 
00364     DPRINTF((
00365        "TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
00366         netaddr.inet.ip, netaddr.inet.port));
00367 
00368        sp = PR_NEW(Server_Param);
00369        if (sp == NULL) {
00370               fprintf(stderr,"%s: PR_NEW failed\n", program_name);
00371               failed_already=1;
00372               return;
00373        }
00374        sp->iod.socket = sockfd;
00375        sp->iod.timeout = PR_SecondsToInterval(60);
00376        sp->datalen = tcp_mesg_size;
00377        sp->exit_mon = sc_mon;
00378        sp->job_counterp = &job_counter;
00379        sp->conn_counter = 0;
00380        sp->tp = tp;
00381        sp->netaddr = netaddr;
00382 
00383        /* create and cancel an io job */
00384        jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
00385                                                  PR_FALSE);
00386        PR_ASSERT(NULL != jobp);
00387        rval = PR_CancelJob(jobp);
00388        PR_ASSERT(PR_SUCCESS == rval);
00389 
00390        /*
00391         * create the client process
00392         */
00393        {
00394 #define MAX_ARGS 4
00395               char *argv[MAX_ARGS + 1];
00396               int index = 0;
00397               char port[32];
00398         char path[1024 + sizeof("/thrpool_client")];
00399         (void)getcwd(path, sizeof(path));
00400         (void)strcat(path, "/thrpool_client");
00401 #ifdef XP_PC
00402         (void)strcat(path, ".exe");
00403 #endif
00404         argv[index++] = path;
00405               sprintf(port,"%d",PR_ntohs(netaddr.inet.port));
00406         if (_debug_on)
00407         {
00408             argv[index++] = "-d";
00409             argv[index++] = "-p";
00410             argv[index++] = port;
00411             argv[index++] = NULL;
00412         } else {
00413             argv[index++] = "-p";
00414             argv[index++] = port;
00415                      argv[index++] = NULL;
00416               }
00417               PR_ASSERT(MAX_ARGS >= (index - 1));
00418         
00419         DPRINTF(("creating client process %s ...\n", path));
00420         if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {
00421               fprintf(stderr,
00422                             "thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
00423               failed_already=1;
00424               return;
00425               }
00426        }
00427 
00428     sc_mon = PR_NewMonitor();
00429     if (sc_mon == NULL) {
00430         fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
00431         failed_already=1;
00432         return;
00433     }
00434 
00435        sp->iod.socket = sockfd;
00436        sp->iod.timeout = PR_SecondsToInterval(60);
00437        sp->datalen = tcp_mesg_size;
00438        sp->exit_mon = sc_mon;
00439        sp->job_counterp = &job_counter;
00440        sp->conn_counter = 0;
00441        sp->tp = tp;
00442        sp->netaddr = netaddr;
00443 
00444        /* create and cancel a timer job */
00445        jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),
00446                                           print_stats, sp, PR_FALSE);
00447        PR_ASSERT(NULL != jobp);
00448        rval = PR_CancelJob(jobp);
00449        PR_ASSERT(PR_SUCCESS == rval);
00450 
00451     DPRINTF(("TCP_Server: Accepting connections \n"));
00452 
00453        jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
00454                                                  PR_FALSE);
00455        PR_ASSERT(NULL != jobp);
00456        return;
00457 }
00458 
00459 static void
00460 TCP_Server_Accept(void *arg)
00461 {
00462     Server_Param *sp = (Server_Param *) arg;
00463     PRThreadPool *tp = sp->tp;
00464     Serve_Client_Param *scp;
00465        PRFileDesc *newsockfd;
00466        PRJob *jobp;
00467 
00468        if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,
00469               PR_INTERVAL_NO_TIMEOUT)) == NULL) {
00470               fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name);
00471               failed_already=1;
00472               goto exit;
00473        }
00474        scp = PR_NEW(Serve_Client_Param);
00475        if (scp == NULL) {
00476               fprintf(stderr,"%s: PR_NEW failed\n", program_name);
00477               failed_already=1;
00478               goto exit;
00479        }
00480 
00481        /*
00482         * Start a Serve_Client job for each incoming connection
00483         */
00484        scp->iod.socket = newsockfd;
00485        scp->iod.timeout = PR_SecondsToInterval(60);
00486        scp->datalen = tcp_mesg_size;
00487        scp->exit_mon = sp->exit_mon;
00488        scp->job_counterp = sp->job_counterp;
00489        scp->tp = sp->tp;
00490 
00491        PR_EnterMonitor(sp->exit_mon);
00492        (*sp->job_counterp)++;
00493        PR_ExitMonitor(sp->exit_mon);
00494        jobp = PR_QueueJob(tp, Serve_Client, scp,
00495                                           PR_FALSE);
00496 
00497        PR_ASSERT(NULL != jobp);
00498        DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));
00499 
00500        /*
00501         * single-threaded update; no lock needed
00502         */
00503     sp->conn_counter++;
00504     if (sp->conn_counter <
00505                      (num_tcp_clients * num_tcp_connections_per_client)) {
00506               jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
00507                                                         PR_FALSE);
00508               PR_ASSERT(NULL != jobp);
00509               return;
00510        }
00511        jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
00512                                           print_stats, sp, PR_FALSE);
00513 
00514        PR_ASSERT(NULL != jobp);
00515        DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));
00516 
00517 exit:
00518        PR_EnterMonitor(sp->exit_mon);
00519     /* Wait for server jobs to finish */
00520     while (0 != *sp->job_counterp) {
00521         PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);
00522         DPRINTF(("TCP_Server: conn_counter = %d\n",
00523                                                                                     *sp->job_counterp));
00524     }
00525 
00526     PR_ExitMonitor(sp->exit_mon);
00527     if (sp->iod.socket) {
00528         PR_Close(sp->iod.socket);
00529     }
00530        PR_DestroyMonitor(sp->exit_mon);
00531     printf("%30s","TCP_Socket_Client_Server_Test:");
00532     printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
00533         num_tcp_clients, num_tcp_connections_per_client);
00534     printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
00535         num_tcp_mesgs_per_connection, tcp_mesg_size);
00536 
00537        DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));
00538        PR_ShutdownThreadPool(sp->tp);
00539        PR_DELETE(sp);
00540 }
00541 
00542 /************************************************************************/
00543 
00544 #define DEFAULT_INITIAL_THREADS           4
00545 #define DEFAULT_MAX_THREADS               100
00546 #define DEFAULT_STACKSIZE                 (512 * 1024)
00547 
00548 int
00549 main(int argc, char **argv)
00550 {
00551        PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;
00552        PRInt32 max_threads = DEFAULT_MAX_THREADS;
00553        PRInt32 stacksize = DEFAULT_STACKSIZE;
00554        PRThreadPool *tp = NULL;
00555        PRStatus rv;
00556        PRJob *jobp;
00557 
00558     /*
00559      * -d           debug mode
00560      */
00561     PLOptStatus os;
00562     PLOptState *opt;
00563 
00564        program_name = argv[0];
00565     opt = PL_CreateOptState(argc, argv, "d");
00566     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
00567     {
00568         if (PL_OPT_BAD == os) continue;
00569         switch (opt->option)
00570         {
00571         case 'd':  /* debug mode */
00572             _debug_on = 1;
00573             break;
00574         default:
00575             break;
00576         }
00577     }
00578     PL_DestroyOptState(opt);
00579 
00580     PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
00581     PR_STDIO_INIT();
00582 
00583 #ifdef XP_MAC
00584     SetupMacPrintfLog("socket.log");
00585 #endif
00586     PR_SetConcurrency(4);
00587 
00588        tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);
00589     if (NULL == tp) {
00590         printf("PR_CreateThreadPool failed\n");
00591         failed_already=1;
00592         goto done;
00593        }
00594        jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);
00595        rv = PR_JoinJob(jobp);             
00596        PR_ASSERT(PR_SUCCESS == rv);
00597 
00598        DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));
00599        rv = PR_JoinThreadPool(tp);
00600        PR_ASSERT(PR_SUCCESS == rv);
00601        DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));
00602 
00603 done:
00604     PR_Cleanup();
00605     if (failed_already) return 1;
00606     else return 0;
00607 }