Back to index

tor  0.2.3.18-rc
cpuworker.c
Go to the documentation of this file.
00001 /* Copyright (c) 2003-2004, Roger Dingledine.
00002  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
00003  * Copyright (c) 2007-2012, The Tor Project, Inc. */
00004 /* See LICENSE for licensing information */
00005 
00015 #include "or.h"
00016 #include "buffers.h"
00017 #include "circuitbuild.h"
00018 #include "circuitlist.h"
00019 #include "config.h"
00020 #include "connection.h"
00021 #include "cpuworker.h"
00022 #include "main.h"
00023 #include "onion.h"
00024 #include "router.h"
00025 
/** Upper limit applied to the number of cpuworkers we try to keep
 * running (see spawn_enough_cpuworkers()). */
#define MAX_CPUWORKERS 16

/** Lower limit applied to the number of cpuworkers we try to keep
 * running (see spawn_enough_cpuworkers()). */
#define MIN_CPUWORKERS 1

/** Length in bytes of the tag that travels with every request and
 * response: an 8-byte connection identifier followed by a 2-byte
 * circuit id (see tag_pack()). */
#define TAG_LEN 10

/** Length in bytes of a worker's response to an onionskin request:
 * one status byte, the tag, the handshake reply, and the key material. */
#define LEN_ONION_RESPONSE                                       \
  (1 + TAG_LEN + ONIONSKIN_REPLY_LEN + CPATH_KEY_MATERIAL_LEN)
00036 
00038 static int num_cpuworkers=0;
00040 static int num_cpuworkers_busy=0;
00044 static time_t last_rotation_time=0;
00045 
00046 static void cpuworker_main(void *data) ATTR_NORETURN;
00047 static int spawn_cpuworker(void);
00048 static void spawn_enough_cpuworkers(void);
00049 static void process_pending_task(connection_t *cpuworker);
00050 
/** Initialize the cpuworker subsystem. All the work is delegated to
 * cpuworkers_rotate(), which retires idle workers and, when we are
 * running as a server, launches new ones. */
void
cpu_init(void)
{
  cpuworkers_rotate();
}
00058 
00060 int
00061 connection_cpu_finished_flushing(connection_t *conn)
00062 {
00063   tor_assert(conn);
00064   tor_assert(conn->type == CONN_TYPE_CPUWORKER);
00065   return 0;
00066 }
00067 
00070 static void
00071 tag_pack(char *tag, uint64_t conn_id, circid_t circ_id)
00072 {
00073   /*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/
00074   set_uint64(tag, conn_id);
00075   set_uint16(tag+8, circ_id);
00076 }
00077 
00080 static void
00081 tag_unpack(const char *tag, uint64_t *conn_id, circid_t *circ_id)
00082 {
00083   *conn_id = get_uint64(tag);
00084   *circ_id = get_uint16(tag+8);
00085 }
00086 
00091 void
00092 cpuworkers_rotate(void)
00093 {
00094   connection_t *cpuworker;
00095   while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
00096                                                    CPUWORKER_STATE_IDLE))) {
00097     connection_mark_for_close(cpuworker);
00098     --num_cpuworkers;
00099   }
00100   last_rotation_time = time(NULL);
00101   if (server_mode(get_options()))
00102     spawn_enough_cpuworkers();
00103 }
00104 
00107 int
00108 connection_cpu_reached_eof(connection_t *conn)
00109 {
00110   log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly.");
00111   if (conn->state != CPUWORKER_STATE_IDLE) {
00112     /* the circ associated with this cpuworker will have to wait until
00113      * it gets culled in run_connection_housekeeping(), since we have
00114      * no way to find out which circ it was. */
00115     log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ.");
00116     num_cpuworkers_busy--;
00117   }
00118   num_cpuworkers--;
00119   spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up
00120                                 spinning. */
00121   connection_mark_for_close(conn);
00122   return 0;
00123 }
00124 
00129 int
00130 connection_cpu_process_inbuf(connection_t *conn)
00131 {
00132   char success;
00133   char buf[LEN_ONION_RESPONSE];
00134   uint64_t conn_id;
00135   circid_t circ_id;
00136   connection_t *tmp_conn;
00137   or_connection_t *p_conn = NULL;
00138   circuit_t *circ;
00139 
00140   tor_assert(conn);
00141   tor_assert(conn->type == CONN_TYPE_CPUWORKER);
00142 
00143   if (!connection_get_inbuf_len(conn))
00144     return 0;
00145 
00146   if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
00147     if (connection_get_inbuf_len(conn) < LEN_ONION_RESPONSE)
00148       return 0; /* not yet */
00149     tor_assert(connection_get_inbuf_len(conn) == LEN_ONION_RESPONSE);
00150 
00151     connection_fetch_from_buf(&success,1,conn);
00152     connection_fetch_from_buf(buf,LEN_ONION_RESPONSE-1,conn);
00153 
00154     /* parse out the circ it was talking about */
00155     tag_unpack(buf, &conn_id, &circ_id);
00156     circ = NULL;
00157     tmp_conn = connection_get_by_global_id(conn_id);
00158     if (tmp_conn && !tmp_conn->marked_for_close &&
00159         tmp_conn->type == CONN_TYPE_OR)
00160       p_conn = TO_OR_CONN(tmp_conn);
00161 
00162     if (p_conn)
00163       circ = circuit_get_by_circid_orconn(circ_id, p_conn);
00164 
00165     if (success == 0) {
00166       log_debug(LD_OR,
00167                 "decoding onionskin failed. "
00168                 "(Old key or bad software.) Closing.");
00169       if (circ)
00170         circuit_mark_for_close(circ, END_CIRC_REASON_TORPROTOCOL);
00171       goto done_processing;
00172     }
00173     if (!circ) {
00174       /* This happens because somebody sends us a destroy cell and the
00175        * circuit goes away, while the cpuworker is working. This is also
00176        * why our tag doesn't include a pointer to the circ, because we'd
00177        * never know if it's still valid.
00178        */
00179       log_debug(LD_OR,"processed onion for a circ that's gone. Dropping.");
00180       goto done_processing;
00181     }
00182     tor_assert(! CIRCUIT_IS_ORIGIN(circ));
00183     if (onionskin_answer(TO_OR_CIRCUIT(circ), CELL_CREATED, buf+TAG_LEN,
00184                          buf+TAG_LEN+ONIONSKIN_REPLY_LEN) < 0) {
00185       log_warn(LD_OR,"onionskin_answer failed. Closing.");
00186       circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
00187       goto done_processing;
00188     }
00189     log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
00190   } else {
00191     tor_assert(0); /* don't ask me to do handshakes yet */
00192   }
00193 
00194  done_processing:
00195   conn->state = CPUWORKER_STATE_IDLE;
00196   num_cpuworkers_busy--;
00197   if (conn->timestamp_created < last_rotation_time) {
00198     connection_mark_for_close(conn);
00199     num_cpuworkers--;
00200     spawn_enough_cpuworkers();
00201   } else {
00202     process_pending_task(conn);
00203   }
00204   return 0;
00205 }
00206 
00223 static void
00224 cpuworker_main(void *data)
00225 {
00226   char question[ONIONSKIN_CHALLENGE_LEN];
00227   uint8_t question_type;
00228   tor_socket_t *fdarray = data;
00229   tor_socket_t fd;
00230 
00231   /* variables for onion processing */
00232   char keys[CPATH_KEY_MATERIAL_LEN];
00233   char reply_to_proxy[ONIONSKIN_REPLY_LEN];
00234   char buf[LEN_ONION_RESPONSE];
00235   char tag[TAG_LEN];
00236   crypto_pk_t *onion_key = NULL, *last_onion_key = NULL;
00237 
00238   fd = fdarray[1]; /* this side is ours */
00239 #ifndef TOR_IS_MULTITHREADED
00240   tor_close_socket(fdarray[0]); /* this is the side of the socketpair the
00241                                  * parent uses */
00242   tor_free_all(1); /* so the child doesn't hold the parent's fd's open */
00243   handle_signals(0); /* ignore interrupts from the keyboard, etc */
00244 #endif
00245   tor_free(data);
00246 
00247   dup_onion_keys(&onion_key, &last_onion_key);
00248 
00249   for (;;) {
00250     ssize_t r;
00251 
00252     if ((r = recv(fd, (void *)&question_type, 1, 0)) != 1) {
00253 //      log_fn(LOG_ERR,"read type failed. Exiting.");
00254       if (r == 0) {
00255         log_info(LD_OR,
00256                  "CPU worker exiting because Tor process closed connection "
00257                  "(either rotated keys or died).");
00258       } else {
00259         log_info(LD_OR,
00260                  "CPU worker exiting because of error on connection to Tor "
00261                  "process.");
00262         log_info(LD_OR,"(Error on %d was %s)",
00263                  fd, tor_socket_strerror(tor_socket_errno(fd)));
00264       }
00265       goto end;
00266     }
00267     tor_assert(question_type == CPUWORKER_TASK_ONION);
00268 
00269     if (read_all(fd, tag, TAG_LEN, 1) != TAG_LEN) {
00270       log_err(LD_BUG,"read tag failed. Exiting.");
00271       goto end;
00272     }
00273 
00274     if (read_all(fd, question, ONIONSKIN_CHALLENGE_LEN, 1) !=
00275         ONIONSKIN_CHALLENGE_LEN) {
00276       log_err(LD_BUG,"read question failed. Exiting.");
00277       goto end;
00278     }
00279 
00280     if (question_type == CPUWORKER_TASK_ONION) {
00281       if (onion_skin_server_handshake(question, onion_key, last_onion_key,
00282           reply_to_proxy, keys, CPATH_KEY_MATERIAL_LEN) < 0) {
00283         /* failure */
00284         log_debug(LD_OR,"onion_skin_server_handshake failed.");
00285         *buf = 0; /* indicate failure in first byte */
00286         memcpy(buf+1,tag,TAG_LEN);
00287         /* send all zeros as answer */
00288         memset(buf+1+TAG_LEN, 0, LEN_ONION_RESPONSE-(1+TAG_LEN));
00289       } else {
00290         /* success */
00291         log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
00292         buf[0] = 1; /* 1 means success */
00293         memcpy(buf+1,tag,TAG_LEN);
00294         memcpy(buf+1+TAG_LEN,reply_to_proxy,ONIONSKIN_REPLY_LEN);
00295         memcpy(buf+1+TAG_LEN+ONIONSKIN_REPLY_LEN,keys,CPATH_KEY_MATERIAL_LEN);
00296       }
00297       if (write_all(fd, buf, LEN_ONION_RESPONSE, 1) != LEN_ONION_RESPONSE) {
00298         log_err(LD_BUG,"writing response buf failed. Exiting.");
00299         goto end;
00300       }
00301       log_debug(LD_OR,"finished writing response.");
00302     }
00303   }
00304  end:
00305   if (onion_key)
00306     crypto_pk_free(onion_key);
00307   if (last_onion_key)
00308     crypto_pk_free(last_onion_key);
00309   tor_close_socket(fd);
00310   crypto_thread_cleanup();
00311   spawn_exit();
00312 }
00313 
00316 static int
00317 spawn_cpuworker(void)
00318 {
00319   tor_socket_t *fdarray;
00320   tor_socket_t fd;
00321   connection_t *conn;
00322   int err;
00323 
00324   fdarray = tor_malloc(sizeof(tor_socket_t)*2);
00325   if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) {
00326     log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s",
00327              tor_socket_strerror(-err));
00328     tor_free(fdarray);
00329     return -1;
00330   }
00331 
00332   tor_assert(SOCKET_OK(fdarray[0]));
00333   tor_assert(SOCKET_OK(fdarray[1]));
00334 
00335   fd = fdarray[0];
00336   spawn_func(cpuworker_main, (void*)fdarray);
00337   log_debug(LD_OR,"just spawned a cpu worker.");
00338 #ifndef TOR_IS_MULTITHREADED
00339   tor_close_socket(fdarray[1]); /* don't need the worker's side of the pipe */
00340   tor_free(fdarray);
00341 #endif
00342 
00343   conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX);
00344 
00345   set_socket_nonblocking(fd);
00346 
00347   /* set up conn so it's got all the data we need to remember */
00348   conn->s = fd;
00349   conn->address = tor_strdup("localhost");
00350   tor_addr_make_unspec(&conn->addr);
00351 
00352   if (connection_add(conn) < 0) { /* no space, forget it */
00353     log_warn(LD_NET,"connection_add for cpuworker failed. Giving up.");
00354     connection_free(conn); /* this closes fd */
00355     return -1;
00356   }
00357 
00358   conn->state = CPUWORKER_STATE_IDLE;
00359   connection_start_reading(conn);
00360 
00361   return 0; /* success */
00362 }
00363 
00367 static void
00368 spawn_enough_cpuworkers(void)
00369 {
00370   int num_cpuworkers_needed = get_num_cpus(get_options());
00371 
00372   if (num_cpuworkers_needed < MIN_CPUWORKERS)
00373     num_cpuworkers_needed = MIN_CPUWORKERS;
00374   if (num_cpuworkers_needed > MAX_CPUWORKERS)
00375     num_cpuworkers_needed = MAX_CPUWORKERS;
00376 
00377   while (num_cpuworkers < num_cpuworkers_needed) {
00378     if (spawn_cpuworker() < 0) {
00379       log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later.");
00380       return;
00381     }
00382     num_cpuworkers++;
00383   }
00384 }
00385 
00387 static void
00388 process_pending_task(connection_t *cpuworker)
00389 {
00390   or_circuit_t *circ;
00391   char *onionskin = NULL;
00392 
00393   tor_assert(cpuworker);
00394 
00395   /* for now only process onion tasks */
00396 
00397   circ = onion_next_task(&onionskin);
00398   if (!circ)
00399     return;
00400   if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin))
00401     log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
00402 }
00403 
00408 #define CPUWORKER_BUSY_TIMEOUT (60*60*12)
00409 
00415 static void
00416 cull_wedged_cpuworkers(void)
00417 {
00418   time_t now = time(NULL);
00419   smartlist_t *conns = get_connection_array();
00420   SMARTLIST_FOREACH(conns, connection_t *, conn,
00421   {
00422     if (!conn->marked_for_close &&
00423         conn->type == CONN_TYPE_CPUWORKER &&
00424         conn->state == CPUWORKER_STATE_BUSY_ONION &&
00425         conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) {
00426       log_notice(LD_BUG,
00427                  "closing wedged cpuworker. Can somebody find the bug?");
00428       num_cpuworkers_busy--;
00429       num_cpuworkers--;
00430       connection_mark_for_close(conn);
00431     }
00432   });
00433 }
00434 
00443 int
00444 assign_onionskin_to_cpuworker(connection_t *cpuworker,
00445                               or_circuit_t *circ, char *onionskin)
00446 {
00447   char qbuf[1];
00448   char tag[TAG_LEN];
00449   time_t now = approx_time();
00450   static time_t last_culled_cpuworkers = 0;
00451 
00452   /* Checking for wedged cpuworkers requires a linear search over all
00453    * connections, so let's do it only once a minute.
00454    */
00455 #define CULL_CPUWORKERS_INTERVAL 60
00456 
00457   if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) {
00458     cull_wedged_cpuworkers();
00459     spawn_enough_cpuworkers();
00460     last_culled_cpuworkers = now;
00461   }
00462 
00463   if (1) {
00464     if (num_cpuworkers_busy == num_cpuworkers) {
00465       log_debug(LD_OR,"No idle cpuworkers. Queuing.");
00466       if (onion_pending_add(circ, onionskin) < 0) {
00467         tor_free(onionskin);
00468         return -1;
00469       }
00470       return 0;
00471     }
00472 
00473     if (!cpuworker)
00474       cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
00475                                                CPUWORKER_STATE_IDLE);
00476 
00477     tor_assert(cpuworker);
00478 
00479     if (!circ->p_conn) {
00480       log_info(LD_OR,"circ->p_conn gone. Failing circ.");
00481       tor_free(onionskin);
00482       return -1;
00483     }
00484     tag_pack(tag, circ->p_conn->_base.global_identifier,
00485              circ->p_circ_id);
00486 
00487     cpuworker->state = CPUWORKER_STATE_BUSY_ONION;
00488     /* touch the lastwritten timestamp, since that's how we check to
00489      * see how long it's been since we asked the question, and sometimes
00490      * we check before the first call to connection_handle_write(). */
00491     cpuworker->timestamp_lastwritten = time(NULL);
00492     num_cpuworkers_busy++;
00493 
00494     qbuf[0] = CPUWORKER_TASK_ONION;
00495     connection_write_to_buf(qbuf, 1, cpuworker);
00496     connection_write_to_buf(tag, sizeof(tag), cpuworker);
00497     connection_write_to_buf(onionskin, ONIONSKIN_CHALLENGE_LEN, cpuworker);
00498     tor_free(onionskin);
00499   }
00500   return 0;
00501 }
00502