Back to index

lightning-sunbird  0.9+nobinonly
thrpool_client.c
Go to the documentation of this file.
00001 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
00002 /* ***** BEGIN LICENSE BLOCK *****
00003  * Version: MPL 1.1/GPL 2.0/LGPL 2.1
00004  *
00005  * The contents of this file are subject to the Mozilla Public License Version
00006  * 1.1 (the "License"); you may not use this file except in compliance with
00007  * the License. You may obtain a copy of the License at
00008  * http://www.mozilla.org/MPL/
00009  *
00010  * Software distributed under the License is distributed on an "AS IS" basis,
00011  * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
00012  * for the specific language governing rights and limitations under the
00013  * License.
00014  *
00015  * The Original Code is the Netscape Portable Runtime (NSPR).
00016  *
00017  * The Initial Developer of the Original Code is
00018  * Netscape Communications Corporation.
00019  * Portions created by the Initial Developer are Copyright (C) 1999-2000
00020  * the Initial Developer. All Rights Reserved.
00021  *
00022  * Contributor(s):
00023  *
00024  * Alternatively, the contents of this file may be used under the terms of
00025  * either the GNU General Public License Version 2 or later (the "GPL"), or
00026  * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
00027  * in which case the provisions of the GPL or the LGPL are applicable instead
00028  * of those above. If you wish to allow use of your version of this file only
00029  * under the terms of either the GPL or the LGPL, and not to allow others to
00030  * use your version of this file under the terms of the MPL, indicate your
00031  * decision by deleting the provisions above and replace them with the notice
00032  * and other provisions required by the GPL or the LGPL. If you do not delete
00033  * the provisions above, a recipient may use your version of this file under
00034  * the terms of any one of the MPL, the GPL or the LGPL.
00035  *
00036  * ***** END LICENSE BLOCK ***** */
00037 
00038 /***********************************************************************
00039 **
00040 ** Name: thrpool_client.c
00041 **
00042 ** Description: Test threadpool functionality.
00043 **
00044 ** Modification History:
00045 */
#include "primpl.h"

#include "plgetopt.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#ifdef XP_UNIX
#include <sys/mman.h>
#endif
#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
#include <pthread.h>
#endif

#ifdef WIN32
#include <process.h>
#endif
00063 
00064 static int _debug_on = 0;
00065 static int server_port = -1;
00066 static char *program_name = NULL;
00067 
00068 #ifdef XP_MAC
00069 #include "prlog.h"
00070 #include "prsem.h"
/*
 * fprintf (XP_MAC builds only)
 *    Replacement for the C library fprintf: the stream argument is ignored
 *    and only the format string is handed to PR_LogPrint. Always returns 0.
 *
 *    NOTE(review): the variadic arguments are not forwarded, so conversion
 *    specifiers in fmt appear to be processed without matching arguments --
 *    confirm against PR_LogPrint before relying on this path.
 */
int fprintf(FILE *stream, const char *fmt, ...)
{
    PR_LogPrint(fmt);
    return 0;
}
00076 #define printf PR_LogPrint
00077 extern void SetupMacPrintfLog(char *logFile);
00078 #else
00079 #include "obsolete/prsem.h"
00080 #endif
00081 
00082 #ifdef XP_PC
00083 #define mode_t int
00084 #endif
00085 
/*
 * DPRINTF
 *    Print a debug message when _debug_on is non-zero. The argument is a
 *    complete, parenthesized printf argument list:
 *        DPRINTF(("value = %d\n", v));
 *    The do/while(0) wrapper makes the expansion a single statement, so the
 *    macro is safe inside an unbraced if/else (a bare `if` expansion would
 *    capture the caller's `else`).
 */
#define DPRINTF(arg) do { if (_debug_on) printf arg; } while (0)
00087 
00088 #define    BUF_DATA_SIZE    (2 * 1024)
00089 #define TCP_MESG_SIZE    1024
00090 #define NUM_TCP_CLIENTS            10     /* for a listen queue depth of 5 */
00091 
00092 #define NUM_TCP_CONNECTIONS_PER_CLIENT    10
00093 #define NUM_TCP_MESGS_PER_CONNECTION    10
00094 #define TCP_SERVER_PORT            10000
00095 
00096 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
00097 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
00098 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
00099 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
00100 
00101 int failed_already=0;
00102 
/*
 * buffer
 *    Fixed-size message buffer of BUF_DATA_SIZE bytes. TCP_Client allocates
 *    one for the data it sends and one for the data it reads back.
 */
typedef struct buffer {
    char    data[BUF_DATA_SIZE];
} buffer;
00106 
00107 PRNetAddr tcp_server_addr, udp_server_addr;
00108 
/*
 * Client_Param
 *    Argument block passed to each TCP_Client thread. A single instance is
 *    shared by all client threads started by TCP_Socket_Client_Server_Test.
 */
typedef struct Client_Param {
    PRNetAddr server_addr;  /* address the client connects to */
    PRMonitor *exit_mon;    /* monitor to signal on exit */
    PRInt32 *exit_counter;    /* counter to decrement, before exit */
    PRInt32    datalen;     /* bytes written and read back per message */
} Client_Param;
00115 
00116 /*
00117  * readn
00118  *    read data from sockfd into buf
00119  */
00120 static PRInt32
00121 readn(PRFileDesc *sockfd, char *buf, int len)
00122 {
00123     int rem;
00124     int bytes;
00125     int offset = 0;
00126        PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
00127 
00128     for (rem=len; rem; offset += bytes, rem -= bytes) {
00129         DPRINTF(("thread = 0x%lx: calling PR_Recv, bytes = %d\n",
00130             PR_GetCurrentThread(), rem));
00131         bytes = PR_Recv(sockfd, buf + offset, rem, 0,
00132               timeout);
00133         DPRINTF(("thread = 0x%lx: returning from PR_Recv, bytes = %d\n",
00134             PR_GetCurrentThread(), bytes));
00135         if (bytes < 0) {
00136                      return -1;
00137               }      
00138     }
00139     return len;
00140 }
00141 
00142 /*
00143  * writen
00144  *    write data from buf to sockfd
00145  */
00146 static PRInt32
00147 writen(PRFileDesc *sockfd, char *buf, int len)
00148 {
00149     int rem;
00150     int bytes;
00151     int offset = 0;
00152 
00153     for (rem=len; rem; offset += bytes, rem -= bytes) {
00154         DPRINTF(("thread = 0x%lx: calling PR_Send, bytes = %d\n",
00155             PR_GetCurrentThread(), rem));
00156         bytes = PR_Send(sockfd, buf + offset, rem, 0,
00157             PR_INTERVAL_NO_TIMEOUT);
00158         DPRINTF(("thread = 0x%lx: returning from PR_Send, bytes = %d\n",
00159             PR_GetCurrentThread(), bytes));
00160         if (bytes <= 0)
00161             return -1;
00162     }
00163     return len;
00164 }
00165 
00166 /*
00167  * TCP_Client
00168  *    Client job
00169  *    Connect to the server at the address specified in the argument.
00170  *    Fill in a buffer, write data to server, read it back and check
00171  *    for data corruption.
00172  *    Close the socket for server connection
00173  */
00174 static void PR_CALLBACK
00175 TCP_Client(void *arg)
00176 {
00177     Client_Param *cp = (Client_Param *) arg;
00178     PRFileDesc *sockfd;
00179     buffer *in_buf, *out_buf;
00180     union PRNetAddr netaddr;
00181     PRInt32 bytes, i, j;
00182 
00183 
00184     DPRINTF(("TCP client started\n"));
00185     bytes = cp->datalen;
00186     out_buf = PR_NEW(buffer);
00187     if (out_buf == NULL) {
00188         fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name);
00189         failed_already=1;
00190         return;
00191     }
00192     in_buf = PR_NEW(buffer);
00193     if (in_buf == NULL) {
00194         fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name);
00195         failed_already=1;
00196         return;
00197     }
00198     netaddr.inet.family = cp->server_addr.inet.family;
00199     netaddr.inet.port = cp->server_addr.inet.port;
00200     netaddr.inet.ip = cp->server_addr.inet.ip;
00201 
00202     for (i = 0; i < num_tcp_connections_per_client; i++) {
00203         if ((sockfd = PR_OpenTCPSocket(PR_AF_INET)) == NULL) {
00204             fprintf(stderr,"%s: PR_OpenTCPSocket failed\n", program_name);
00205             failed_already=1;
00206             return;
00207         }
00208 
00209         DPRINTF(("TCP client connecting to server:%d\n", server_port));
00210         if (PR_Connect(sockfd, &netaddr,PR_INTERVAL_NO_TIMEOUT) < 0){
00211               fprintf(stderr, "PR_Connect failed: (%ld, %ld)\n",
00212                      PR_GetError(), PR_GetOSError());
00213             failed_already=1;
00214             return;
00215         }
00216         for (j = 0; j < num_tcp_mesgs_per_connection; j++) {
00217             /*
00218              * fill in random data
00219              */
00220             memset(out_buf->data, ((PRInt32) (&netaddr)) + i + j, bytes);
00221             /*
00222              * write to server
00223              */
00224             if (writen(sockfd, out_buf->data, bytes) < bytes) {
00225                 fprintf(stderr,"%s: ERROR - TCP_Client:writen\n", program_name);
00226                 failed_already=1;
00227                 return;
00228             }
00229                      /*
00230             DPRINTF(("TCP Client [0x%lx]: out_buf = 0x%lx out_buf[0] = 0x%lx\n",
00231                 PR_GetCurrentThread(), out_buf, (*((int *) out_buf->data))));
00232                      */
00233             if (readn(sockfd, in_buf->data, bytes) < bytes) {
00234                 fprintf(stderr,"%s: ERROR - TCP_Client:readn\n", program_name);
00235                 failed_already=1;
00236                 return;
00237             }
00238             /*
00239              * verify the data read
00240              */
00241             if (memcmp(in_buf->data, out_buf->data, bytes) != 0) {
00242                 fprintf(stderr,"%s: ERROR - data corruption\n", program_name);
00243                 failed_already=1;
00244                 return;
00245             }
00246         }
00247         /*
00248          * shutdown reads and writes
00249          */
00250         if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
00251             fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
00252             failed_already=1;
00253         }
00254         PR_Close(sockfd);
00255     }
00256 
00257     PR_DELETE(out_buf);
00258     PR_DELETE(in_buf);
00259 
00260     /*
00261      * Decrement exit_counter and notify parent thread
00262      */
00263 
00264     PR_EnterMonitor(cp->exit_mon);
00265     --(*cp->exit_counter);
00266     PR_Notify(cp->exit_mon);
00267     PR_ExitMonitor(cp->exit_mon);
00268     DPRINTF(("TCP_Client exiting\n"));
00269 }
00270 
00271 /*
00272  * TCP_Socket_Client_Server_Test    - concurrent server test
00273  *    
00274  *    Each client connects to the server and sends a chunk of data
00275  *    For each connection, server reads the data
00276  *    from the client and sends it back to the client, unmodified.
00277  *    Each client checks that data received from server is same as the
00278  *    data it sent to the server.
00279  *
00280  */
00281 
00282 static PRInt32
00283 TCP_Socket_Client_Server_Test(void)
00284 {
00285     int i;
00286     Client_Param *cparamp;
00287     PRMonitor *mon2;
00288     PRInt32    datalen;
00289     PRInt32    connections = 0;
00290        PRThread *thr;
00291 
00292     datalen = tcp_mesg_size;
00293     connections = 0;
00294 
00295     mon2 = PR_NewMonitor();
00296     if (mon2 == NULL) {
00297         fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
00298         failed_already=1;
00299         return -1;
00300     }
00301 
00302     /*
00303      * Start client jobs
00304      */
00305     cparamp = PR_NEW(Client_Param);
00306     if (cparamp == NULL) {
00307         fprintf(stderr,"%s: PR_NEW failed\n", program_name);
00308         failed_already=1;
00309         return -1;
00310     }
00311     cparamp->server_addr.inet.family = PR_AF_INET;
00312     cparamp->server_addr.inet.port = PR_htons(server_port);
00313     cparamp->server_addr.inet.ip = PR_htonl(PR_INADDR_LOOPBACK);
00314     cparamp->exit_mon = mon2;
00315     cparamp->exit_counter = &connections;
00316     cparamp->datalen = datalen;
00317     for (i = 0; i < num_tcp_clients; i++) {
00318               thr = PR_CreateThread(PR_USER_THREAD, TCP_Client, (void *)cparamp,
00319                      PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD, 0);
00320         if (NULL == thr) {
00321             fprintf(stderr,"%s: PR_CreateThread failed\n", program_name);
00322             failed_already=1;
00323             return -1;
00324         }
00325        PR_EnterMonitor(mon2);
00326         connections++;
00327        PR_ExitMonitor(mon2);
00328         DPRINTF(("Created TCP client = 0x%lx\n", thr));
00329     }
00330     /* Wait for client jobs to exit */
00331     PR_EnterMonitor(mon2);
00332     while (0 != connections) {
00333         PR_Wait(mon2, PR_INTERVAL_NO_TIMEOUT);
00334         DPRINTF(("Client job count = %d\n", connections));
00335     }
00336     PR_ExitMonitor(mon2);
00337     printf("%30s","TCP_Socket_Client_Server_Test:");
00338     printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
00339         num_tcp_clients, num_tcp_connections_per_client);
00340     printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
00341         num_tcp_mesgs_per_connection, tcp_mesg_size);
00342 
00343     PR_DELETE(cparamp);
00344     return 0;
00345 }
00346 
00347 /************************************************************************/
00348 
00349 int
00350 main(int argc, char **argv)
00351 {
00352     /*
00353      * -d           debug mode
00354      */
00355     PLOptStatus os;
00356     PLOptState *opt;
00357        program_name = argv[0];
00358 
00359     opt = PL_CreateOptState(argc, argv, "dp:");
00360     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
00361     {
00362         if (PL_OPT_BAD == os) continue;
00363         switch (opt->option)
00364         {
00365         case 'd':  /* debug mode */
00366             _debug_on = 1;
00367             break;
00368         case 'p':
00369             server_port = atoi(opt->value);
00370             break;
00371         default:
00372             break;
00373         }
00374     }
00375     PL_DestroyOptState(opt);
00376 
00377     PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
00378     PR_STDIO_INIT();
00379 
00380 #ifdef XP_MAC
00381     SetupMacPrintfLog("socket.log");
00382 #endif
00383     PR_SetConcurrency(4);
00384 
00385        TCP_Socket_Client_Server_Test();
00386 
00387     PR_Cleanup();
00388     if (failed_already)
00389               return 1;
00390     else
00391               return 0;
00392 }