Back to index

tor  0.2.3.18-rc
relay.c
Go to the documentation of this file.
00001 /* Copyright (c) 2001 Matej Pfajfar.
00002  * Copyright (c) 2001-2004, Roger Dingledine.
00003  * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
00004  * Copyright (c) 2007-2012, The Tor Project, Inc. */
00005 /* See LICENSE for licensing information */
00006 
00013 #include <math.h>
00014 #define RELAY_PRIVATE
00015 #include "or.h"
00016 #include "buffers.h"
00017 #include "circuitbuild.h"
00018 #include "circuitlist.h"
00019 #include "config.h"
00020 #include "connection.h"
00021 #include "connection_edge.h"
00022 #include "connection_or.h"
00023 #include "control.h"
00024 #include "geoip.h"
00025 #include "main.h"
00026 #include "mempool.h"
00027 #include "networkstatus.h"
00028 #include "nodelist.h"
00029 #include "policies.h"
00030 #include "reasons.h"
00031 #include "relay.h"
00032 #include "rendcommon.h"
00033 #include "router.h"
00034 #include "routerlist.h"
00035 #include "routerparse.h"
00036 
/* Forward declarations of file-local (static) helpers, so that they can be
 * called before the point in this file where they are defined. */
00037 static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
00038                                             cell_direction_t cell_direction,
00039                                             crypt_path_t *layer_hint);
00040 
00041 static int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
00042                                               edge_connection_t *conn,
00043                                               crypt_path_t *layer_hint);
00044 static void circuit_consider_sending_sendme(circuit_t *circ,
00045                                             crypt_path_t *layer_hint);
00046 static void circuit_resume_edge_reading(circuit_t *circ,
00047                                         crypt_path_t *layer_hint);
00048 static int circuit_resume_edge_reading_helper(edge_connection_t *conn,
00049                                               circuit_t *circ,
00050                                               crypt_path_t *layer_hint);
00051 static int circuit_consider_stop_edge_reading(circuit_t *circ,
00052                                               crypt_path_t *layer_hint);
00053 static int circuit_queue_streams_are_blocked(circuit_t *circ);
00054 
/* NOTE(review): neither constant is used in the part of the file visible
 * here. Presumably they are the cell-queue lengths at which stream reading
 * is stopped (high water) and resumed (low water) -- confirm against the
 * cell queue code. */
00057 #define CELL_QUEUE_HIGHWATER_SIZE 256
00058 
00060 #define CELL_QUEUE_LOWWATER_SIZE 64
00061 
/* Running count of relay cells handed to append_cell_to_circuit_queue() by
 * circuit_receive_relay_cell() (unrecognized cells passed along) and by
 * circuit_package_relay_cell(). */
00065 uint64_t stats_n_relay_cells_relayed = 0;
/* Running count of relay cells that circuit_receive_relay_cell() recognized
 * and passed to connection_edge_process_relay_cell(). */
00069 uint64_t stats_n_relay_cells_delivered = 0;
00070 
00074 static void
00075 relay_set_digest(crypto_digest_t *digest, cell_t *cell)
00076 {
00077   char integrity[4];
00078   relay_header_t rh;
00079 
00080   crypto_digest_add_bytes(digest, (char*)cell->payload, CELL_PAYLOAD_SIZE);
00081   crypto_digest_get_digest(digest, integrity, 4);
00082 //  log_fn(LOG_DEBUG,"Putting digest of %u %u %u %u into relay cell.",
00083 //    integrity[0], integrity[1], integrity[2], integrity[3]);
00084   relay_header_unpack(&rh, cell->payload);
00085   memcpy(rh.integrity, integrity, 4);
00086   relay_header_pack(cell->payload, &rh);
00087 }
00088 
00095 static int
00096 relay_digest_matches(crypto_digest_t *digest, cell_t *cell)
00097 {
00098   char received_integrity[4], calculated_integrity[4];
00099   relay_header_t rh;
00100   crypto_digest_t *backup_digest=NULL;
00101 
00102   backup_digest = crypto_digest_dup(digest);
00103 
00104   relay_header_unpack(&rh, cell->payload);
00105   memcpy(received_integrity, rh.integrity, 4);
00106   memset(rh.integrity, 0, 4);
00107   relay_header_pack(cell->payload, &rh);
00108 
00109 //  log_fn(LOG_DEBUG,"Reading digest of %u %u %u %u from relay cell.",
00110 //    received_integrity[0], received_integrity[1],
00111 //    received_integrity[2], received_integrity[3]);
00112 
00113   crypto_digest_add_bytes(digest, (char*) cell->payload, CELL_PAYLOAD_SIZE);
00114   crypto_digest_get_digest(digest, calculated_integrity, 4);
00115 
00116   if (tor_memneq(received_integrity, calculated_integrity, 4)) {
00117 //    log_fn(LOG_INFO,"Recognized=0 but bad digest. Not recognizing.");
00118 // (%d vs %d).", received_integrity, calculated_integrity);
00119     /* restore digest to its old form */
00120     crypto_digest_assign(digest, backup_digest);
00121     /* restore the relay header */
00122     memcpy(rh.integrity, received_integrity, 4);
00123     relay_header_pack(cell->payload, &rh);
00124     crypto_digest_free(backup_digest);
00125     return 0;
00126   }
00127   crypto_digest_free(backup_digest);
00128   return 1;
00129 }
00130 
00138 static int
00139 relay_crypt_one_payload(crypto_cipher_t *cipher, uint8_t *in,
00140                         int encrypt_mode)
00141 {
00142   int r;
00143   (void)encrypt_mode;
00144   r = crypto_cipher_crypt_inplace(cipher, (char*) in, CELL_PAYLOAD_SIZE);
00145 
00146   if (r) {
00147     log_warn(LD_BUG,"Error during relay encryption");
00148     return -1;
00149   }
00150   return 0;
00151 }
00152 
00165 int
00166 circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
00167                            cell_direction_t cell_direction)
00168 {
00169   or_connection_t *or_conn=NULL;
00170   crypt_path_t *layer_hint=NULL;
00171   char recognized=0;
00172   int reason;
00173 
00174   tor_assert(cell);
00175   tor_assert(circ);
00176   tor_assert(cell_direction == CELL_DIRECTION_OUT ||
00177              cell_direction == CELL_DIRECTION_IN);
00178   if (circ->marked_for_close)
00179     return 0;
00180 
00181   if (relay_crypt(circ, cell, cell_direction, &layer_hint, &recognized) < 0) {
00182     log_warn(LD_BUG,"relay crypt failed. Dropping connection.");
00183     return -END_CIRC_REASON_INTERNAL;
00184   }
00185 
00186   if (recognized) {
00187     edge_connection_t *conn = relay_lookup_conn(circ, cell, cell_direction,
00188                                                 layer_hint);
00189     if (cell_direction == CELL_DIRECTION_OUT) {
00190       ++stats_n_relay_cells_delivered;
00191       log_debug(LD_OR,"Sending away from origin.");
00192       if ((reason=connection_edge_process_relay_cell(cell, circ, conn, NULL))
00193           < 0) {
00194         log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
00195                "connection_edge_process_relay_cell (away from origin) "
00196                "failed.");
00197         return reason;
00198       }
00199     }
00200     if (cell_direction == CELL_DIRECTION_IN) {
00201       ++stats_n_relay_cells_delivered;
00202       log_debug(LD_OR,"Sending to origin.");
00203       if ((reason = connection_edge_process_relay_cell(cell, circ, conn,
00204                                                        layer_hint)) < 0) {
00205         log_warn(LD_OR,
00206                  "connection_edge_process_relay_cell (at origin) failed.");
00207         return reason;
00208       }
00209     }
00210     return 0;
00211   }
00212 
00213   /* not recognized. pass it on. */
00214   if (cell_direction == CELL_DIRECTION_OUT) {
00215     cell->circ_id = circ->n_circ_id; /* switch it */
00216     or_conn = circ->n_conn;
00217   } else if (! CIRCUIT_IS_ORIGIN(circ)) {
00218     cell->circ_id = TO_OR_CIRCUIT(circ)->p_circ_id; /* switch it */
00219     or_conn = TO_OR_CIRCUIT(circ)->p_conn;
00220   } else {
00221     log_fn(LOG_PROTOCOL_WARN, LD_OR,
00222            "Dropping unrecognized inbound cell on origin circuit.");
00223     return 0;
00224   }
00225 
00226   if (!or_conn) {
00227     // XXXX Can this splice stuff be done more cleanly?
00228     if (! CIRCUIT_IS_ORIGIN(circ) &&
00229         TO_OR_CIRCUIT(circ)->rend_splice &&
00230         cell_direction == CELL_DIRECTION_OUT) {
00231       or_circuit_t *splice = TO_OR_CIRCUIT(circ)->rend_splice;
00232       tor_assert(circ->purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
00233       tor_assert(splice->_base.purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
00234       cell->circ_id = splice->p_circ_id;
00235       cell->command = CELL_RELAY; /* can't be relay_early anyway */
00236       if ((reason = circuit_receive_relay_cell(cell, TO_CIRCUIT(splice),
00237                                                CELL_DIRECTION_IN)) < 0) {
00238         log_warn(LD_REND, "Error relaying cell across rendezvous; closing "
00239                  "circuits");
00240         /* XXXX Do this here, or just return -1? */
00241         circuit_mark_for_close(circ, -reason);
00242         return reason;
00243       }
00244       return 0;
00245     }
00246     log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
00247            "Didn't recognize cell, but circ stops here! Closing circ.");
00248     return -END_CIRC_REASON_TORPROTOCOL;
00249   }
00250 
00251   log_debug(LD_OR,"Passing on unrecognized cell.");
00252 
00253   ++stats_n_relay_cells_relayed; /* XXXX no longer quite accurate {cells}
00254                                   * we might kill the circ before we relay
00255                                   * the cells. */
00256 
00257   append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
00258   return 0;
00259 }
00260 
00278 int
00279 relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
00280             crypt_path_t **layer_hint, char *recognized)
00281 {
00282   relay_header_t rh;
00283 
00284   tor_assert(circ);
00285   tor_assert(cell);
00286   tor_assert(recognized);
00287   tor_assert(cell_direction == CELL_DIRECTION_IN ||
00288              cell_direction == CELL_DIRECTION_OUT);
00289 
00290   if (cell_direction == CELL_DIRECTION_IN) {
00291     if (CIRCUIT_IS_ORIGIN(circ)) { /* We're at the beginning of the circuit.
00292                                     * We'll want to do layered decrypts. */
00293       crypt_path_t *thishop, *cpath = TO_ORIGIN_CIRCUIT(circ)->cpath;
00294       thishop = cpath;
00295       if (thishop->state != CPATH_STATE_OPEN) {
00296         log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
00297                "Relay cell before first created cell? Closing.");
00298         return -1;
00299       }
00300       do { /* Remember: cpath is in forward order, that is, first hop first. */
00301         tor_assert(thishop);
00302 
00303         if (relay_crypt_one_payload(thishop->b_crypto, cell->payload, 0) < 0)
00304           return -1;
00305 
00306         relay_header_unpack(&rh, cell->payload);
00307         if (rh.recognized == 0) {
00308           /* it's possibly recognized. have to check digest to be sure. */
00309           if (relay_digest_matches(thishop->b_digest, cell)) {
00310             *recognized = 1;
00311             *layer_hint = thishop;
00312             return 0;
00313           }
00314         }
00315 
00316         thishop = thishop->next;
00317       } while (thishop != cpath && thishop->state == CPATH_STATE_OPEN);
00318       log_fn(LOG_PROTOCOL_WARN, LD_OR,
00319              "Incoming cell at client not recognized. Closing.");
00320       return -1;
00321     } else { /* we're in the middle. Just one crypt. */
00322       if (relay_crypt_one_payload(TO_OR_CIRCUIT(circ)->p_crypto,
00323                                   cell->payload, 1) < 0)
00324         return -1;
00325 //      log_fn(LOG_DEBUG,"Skipping recognized check, because we're not "
00326 //             "the client.");
00327     }
00328   } else /* cell_direction == CELL_DIRECTION_OUT */ {
00329     /* we're in the middle. Just one crypt. */
00330 
00331     if (relay_crypt_one_payload(TO_OR_CIRCUIT(circ)->n_crypto,
00332                                 cell->payload, 0) < 0)
00333       return -1;
00334 
00335     relay_header_unpack(&rh, cell->payload);
00336     if (rh.recognized == 0) {
00337       /* it's possibly recognized. have to check digest to be sure. */
00338       if (relay_digest_matches(TO_OR_CIRCUIT(circ)->n_digest, cell)) {
00339         *recognized = 1;
00340         return 0;
00341       }
00342     }
00343   }
00344   return 0;
00345 }
00346 
00351 static int
00352 circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
00353                            cell_direction_t cell_direction,
00354                            crypt_path_t *layer_hint, streamid_t on_stream)
00355 {
00356   or_connection_t *conn; /* where to send the cell */
00357 
00358   if (cell_direction == CELL_DIRECTION_OUT) {
00359     crypt_path_t *thishop; /* counter for repeated crypts */
00360     conn = circ->n_conn;
00361     if (!CIRCUIT_IS_ORIGIN(circ) || !conn) {
00362       log_warn(LD_BUG,"outgoing relay cell has n_conn==NULL. Dropping.");
00363       return 0; /* just drop it */
00364     }
00365 
00366     relay_set_digest(layer_hint->f_digest, cell);
00367 
00368     thishop = layer_hint;
00369     /* moving from farthest to nearest hop */
00370     do {
00371       tor_assert(thishop);
00372       /* XXXX RD This is a bug, right? */
00373       log_debug(LD_OR,"crypting a layer of the relay cell.");
00374       if (relay_crypt_one_payload(thishop->f_crypto, cell->payload, 1) < 0) {
00375         return -1;
00376       }
00377 
00378       thishop = thishop->prev;
00379     } while (thishop != TO_ORIGIN_CIRCUIT(circ)->cpath->prev);
00380 
00381   } else { /* incoming cell */
00382     or_circuit_t *or_circ;
00383     if (CIRCUIT_IS_ORIGIN(circ)) {
00384       /* We should never package an _incoming_ cell from the circuit
00385        * origin; that means we messed up somewhere. */
00386       log_warn(LD_BUG,"incoming relay cell at origin circuit. Dropping.");
00387       assert_circuit_ok(circ);
00388       return 0; /* just drop it */
00389     }
00390     or_circ = TO_OR_CIRCUIT(circ);
00391     conn = or_circ->p_conn;
00392     relay_set_digest(or_circ->p_digest, cell);
00393     if (relay_crypt_one_payload(or_circ->p_crypto, cell->payload, 1) < 0)
00394       return -1;
00395   }
00396   ++stats_n_relay_cells_relayed;
00397 
00398   append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
00399   return 0;
00400 }
00401 
00405 static edge_connection_t *
00406 relay_lookup_conn(circuit_t *circ, cell_t *cell,
00407                   cell_direction_t cell_direction, crypt_path_t *layer_hint)
00408 {
00409   edge_connection_t *tmpconn;
00410   relay_header_t rh;
00411 
00412   relay_header_unpack(&rh, cell->payload);
00413 
00414   if (!rh.stream_id)
00415     return NULL;
00416 
00417   /* IN or OUT cells could have come from either direction, now
00418    * that we allow rendezvous *to* an OP.
00419    */
00420 
00421   if (CIRCUIT_IS_ORIGIN(circ)) {
00422     for (tmpconn = TO_ORIGIN_CIRCUIT(circ)->p_streams; tmpconn;
00423          tmpconn=tmpconn->next_stream) {
00424       if (rh.stream_id == tmpconn->stream_id &&
00425           !tmpconn->_base.marked_for_close &&
00426           tmpconn->cpath_layer == layer_hint) {
00427         log_debug(LD_APP,"found conn for stream %d.", rh.stream_id);
00428         return tmpconn;
00429       }
00430     }
00431   } else {
00432     for (tmpconn = TO_OR_CIRCUIT(circ)->n_streams; tmpconn;
00433          tmpconn=tmpconn->next_stream) {
00434       if (rh.stream_id == tmpconn->stream_id &&
00435           !tmpconn->_base.marked_for_close) {
00436         log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id);
00437         if (cell_direction == CELL_DIRECTION_OUT ||
00438             connection_edge_is_rendezvous_stream(tmpconn))
00439           return tmpconn;
00440       }
00441     }
00442     for (tmpconn = TO_OR_CIRCUIT(circ)->resolving_streams; tmpconn;
00443          tmpconn=tmpconn->next_stream) {
00444       if (rh.stream_id == tmpconn->stream_id &&
00445           !tmpconn->_base.marked_for_close) {
00446         log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id);
00447         return tmpconn;
00448       }
00449     }
00450   }
00451   return NULL; /* probably a begin relay cell */
00452 }
00453 
00458 void
00459 relay_header_pack(uint8_t *dest, const relay_header_t *src)
00460 {
00461   set_uint8(dest, src->command);
00462   set_uint16(dest+1, htons(src->recognized));
00463   set_uint16(dest+3, htons(src->stream_id));
00464   memcpy(dest+5, src->integrity, 4);
00465   set_uint16(dest+9, htons(src->length));
00466 }
00467 
00471 void
00472 relay_header_unpack(relay_header_t *dest, const uint8_t *src)
00473 {
00474   dest->command = get_uint8(src);
00475   dest->recognized = ntohs(get_uint16(src+1));
00476   dest->stream_id = ntohs(get_uint16(src+3));
00477   memcpy(dest->integrity, src+5, 4);
00478   dest->length = ntohs(get_uint16(src+9));
00479 }
00480 
00482 static const char *
00483 relay_command_to_string(uint8_t command)
00484 {
00485   switch (command) {
00486     case RELAY_COMMAND_BEGIN: return "BEGIN";
00487     case RELAY_COMMAND_DATA: return "DATA";
00488     case RELAY_COMMAND_END: return "END";
00489     case RELAY_COMMAND_CONNECTED: return "CONNECTED";
00490     case RELAY_COMMAND_SENDME: return "SENDME";
00491     case RELAY_COMMAND_EXTEND: return "EXTEND";
00492     case RELAY_COMMAND_EXTENDED: return "EXTENDED";
00493     case RELAY_COMMAND_TRUNCATE: return "TRUNCATE";
00494     case RELAY_COMMAND_TRUNCATED: return "TRUNCATED";
00495     case RELAY_COMMAND_DROP: return "DROP";
00496     case RELAY_COMMAND_RESOLVE: return "RESOLVE";
00497     case RELAY_COMMAND_RESOLVED: return "RESOLVED";
00498     case RELAY_COMMAND_BEGIN_DIR: return "BEGIN_DIR";
00499     case RELAY_COMMAND_ESTABLISH_INTRO: return "ESTABLISH_INTRO";
00500     case RELAY_COMMAND_ESTABLISH_RENDEZVOUS: return "ESTABLISH_RENDEZVOUS";
00501     case RELAY_COMMAND_INTRODUCE1: return "INTRODUCE1";
00502     case RELAY_COMMAND_INTRODUCE2: return "INTRODUCE2";
00503     case RELAY_COMMAND_RENDEZVOUS1: return "RENDEZVOUS1";
00504     case RELAY_COMMAND_RENDEZVOUS2: return "RENDEZVOUS2";
00505     case RELAY_COMMAND_INTRO_ESTABLISHED: return "INTRO_ESTABLISHED";
00506     case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
00507       return "RENDEZVOUS_ESTABLISHED";
00508     case RELAY_COMMAND_INTRODUCE_ACK: return "INTRODUCE_ACK";
00509     default: return "(unrecognized)";
00510   }
00511 }
00512 
00522 int
00523 relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ,
00524                              uint8_t relay_command, const char *payload,
00525                              size_t payload_len, crypt_path_t *cpath_layer)
00526 {
00527   cell_t cell;
00528   relay_header_t rh;
00529   cell_direction_t cell_direction;
00530   /* XXXX NM Split this function into a separate versions per circuit type? */
00531 
00532   tor_assert(circ);
00533   tor_assert(payload_len <= RELAY_PAYLOAD_SIZE);
00534 
00535   memset(&cell, 0, sizeof(cell_t));
00536   cell.command = CELL_RELAY;
00537   if (cpath_layer) {
00538     cell.circ_id = circ->n_circ_id;
00539     cell_direction = CELL_DIRECTION_OUT;
00540   } else if (! CIRCUIT_IS_ORIGIN(circ)) {
00541     cell.circ_id = TO_OR_CIRCUIT(circ)->p_circ_id;
00542     cell_direction = CELL_DIRECTION_IN;
00543   } else {
00544     return -1;
00545   }
00546 
00547   memset(&rh, 0, sizeof(rh));
00548   rh.command = relay_command;
00549   rh.stream_id = stream_id;
00550   rh.length = payload_len;
00551   relay_header_pack(cell.payload, &rh);
00552   if (payload_len)
00553     memcpy(cell.payload+RELAY_HEADER_SIZE, payload, payload_len);
00554 
00555   log_debug(LD_OR,"delivering %d cell %s.", relay_command,
00556             cell_direction == CELL_DIRECTION_OUT ? "forward" : "backward");
00557 
00558   /* If we are sending an END cell and this circuit is used for a tunneled
00559    * directory request, advance its state. */
00560   if (relay_command == RELAY_COMMAND_END && circ->dirreq_id)
00561     geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED,
00562                               DIRREQ_END_CELL_SENT);
00563 
00564   if (cell_direction == CELL_DIRECTION_OUT && circ->n_conn) {
00565     /* if we're using relaybandwidthrate, this conn wants priority */
00566     circ->n_conn->client_used = approx_time();
00567   }
00568 
00569   if (cell_direction == CELL_DIRECTION_OUT) {
00570     origin_circuit_t *origin_circ = TO_ORIGIN_CIRCUIT(circ);
00571     if (origin_circ->remaining_relay_early_cells > 0 &&
00572         (relay_command == RELAY_COMMAND_EXTEND ||
00573          cpath_layer != origin_circ->cpath)) {
00574       /* If we've got any relay_early cells left and (we're sending
00575        * an extend cell or we're not talking to the first hop), use
00576        * one of them.  Don't worry about the conn protocol version:
00577        * append_cell_to_circuit_queue will fix it up. */
00578       cell.command = CELL_RELAY_EARLY;
00579       --origin_circ->remaining_relay_early_cells;
00580       log_debug(LD_OR, "Sending a RELAY_EARLY cell; %d remaining.",
00581                 (int)origin_circ->remaining_relay_early_cells);
00582       /* Memorize the command that is sent as RELAY_EARLY cell; helps debug
00583        * task 878. */
00584       origin_circ->relay_early_commands[
00585           origin_circ->relay_early_cells_sent++] = relay_command;
00586     } else if (relay_command == RELAY_COMMAND_EXTEND) {
00587       /* If no RELAY_EARLY cells can be sent over this circuit, log which
00588        * commands have been sent as RELAY_EARLY cells before; helps debug
00589        * task 878. */
00590       smartlist_t *commands_list = smartlist_new();
00591       int i = 0;
00592       char *commands = NULL;
00593       for (; i < origin_circ->relay_early_cells_sent; i++)
00594         smartlist_add(commands_list, (char *)
00595             relay_command_to_string(origin_circ->relay_early_commands[i]));
00596       commands = smartlist_join_strings(commands_list, ",", 0, NULL);
00597       log_warn(LD_BUG, "Uh-oh.  We're sending a RELAY_COMMAND_EXTEND cell, "
00598                "but we have run out of RELAY_EARLY cells on that circuit. "
00599                "Commands sent before: %s", commands);
00600       tor_free(commands);
00601       smartlist_free(commands_list);
00602     }
00603   }
00604 
00605   if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer,
00606                                  stream_id) < 0) {
00607     log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing.");
00608     circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
00609     return -1;
00610   }
00611   return 0;
00612 }
00613 
00623 int
00624 connection_edge_send_command(edge_connection_t *fromconn,
00625                              uint8_t relay_command, const char *payload,
00626                              size_t payload_len)
00627 {
00628   /* XXXX NM Split this function into a separate versions per circuit type? */
00629   circuit_t *circ;
00630   crypt_path_t *cpath_layer = fromconn->cpath_layer;
00631   tor_assert(fromconn);
00632   circ = fromconn->on_circuit;
00633 
00634   if (fromconn->_base.marked_for_close) {
00635     log_warn(LD_BUG,
00636              "called on conn that's already marked for close at %s:%d.",
00637              fromconn->_base.marked_for_close_file,
00638              fromconn->_base.marked_for_close);
00639     return 0;
00640   }
00641 
00642   if (!circ) {
00643     if (fromconn->_base.type == CONN_TYPE_AP) {
00644       log_info(LD_APP,"no circ. Closing conn.");
00645       connection_mark_unattached_ap(EDGE_TO_ENTRY_CONN(fromconn),
00646                                     END_STREAM_REASON_INTERNAL);
00647     } else {
00648       log_info(LD_EXIT,"no circ. Closing conn.");
00649       fromconn->edge_has_sent_end = 1; /* no circ to send to */
00650       fromconn->end_reason = END_STREAM_REASON_INTERNAL;
00651       connection_mark_for_close(TO_CONN(fromconn));
00652     }
00653     return -1;
00654   }
00655 
00656   return relay_send_command_from_edge(fromconn->stream_id, circ,
00657                                       relay_command, payload,
00658                                       payload_len, cpath_layer);
00659 }
00660 
/* Upper bound on the per-address failure count returned by
 * client_dns_incr_failures(): below it a failed stream is re-attached to
 * another circuit, at or above it we give up on the address. */
00664 #define MAX_RESOLVE_FAILURES 3
00665 
00668 static int
00669 edge_reason_is_retriable(int reason)
00670 {
00671   return reason == END_STREAM_REASON_HIBERNATING ||
00672          reason == END_STREAM_REASON_RESOURCELIMIT ||
00673          reason == END_STREAM_REASON_EXITPOLICY ||
00674          reason == END_STREAM_REASON_RESOLVEFAILED ||
00675          reason == END_STREAM_REASON_MISC ||
00676          reason == END_STREAM_REASON_NOROUTE;
00677 }
00678 
/* Handle a RELAY_END cell (header <b>rh</b>, full cell <b>cell</b>) that
 * arrived on origin circuit <b>circ</b> for the client stream <b>conn</b>.
 * (The visible caller is the not-open relay-cell handler.)
 *
 * The end reason is the first payload byte after the relay header.  If the
 * cell carries a reason, that reason is retriable according to
 * edge_reason_is_retriable(), and the stream is not a rendezvous stream,
 * we try to detach the stream so it can be retried; what else happens
 * depends on the reason (see the switch below).  If the retry is not
 * attempted or fails, the stream is marked for close with the remote-flagged
 * reason.
 *
 * <b>layer_hint</b> is unused.  Always returns 0.
 */
00683 static int
00684 connection_ap_process_end_not_open(
00685     relay_header_t *rh, cell_t *cell, origin_circuit_t *circ,
00686     entry_connection_t *conn, crypt_path_t *layer_hint)
00687 {
00688   struct in_addr in;
00689   node_t *exitrouter;
/* First byte of the relay payload; only meaningful when rh->length > 0,
 * which is checked before it is acted on. */
00690   int reason = *(cell->payload+RELAY_HEADER_SIZE);
00691   int control_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
00692   edge_connection_t *edge_conn = ENTRY_TO_EDGE_CONN(conn);
00693   (void) layer_hint; /* unused */
00694 
00695   if (rh->length > 0 && edge_reason_is_retriable(reason) &&
00696       /* avoid retry if rend */
00697       !connection_edge_is_rendezvous_stream(edge_conn)) {
00698     const char *chosen_exit_digest =
00699       circ->build_state->chosen_exit->identity_digest;
00700     log_info(LD_APP,"Address '%s' refused due to '%s'. Considering retrying.",
00701              safe_str(conn->socks_request->address),
00702              stream_end_reason_to_string(reason));
/* May be NULL; every use below is guarded by a NULL check. */
00703     exitrouter = node_get_mutable_by_id(chosen_exit_digest);
00704     switch (reason) {
00705       case END_STREAM_REASON_EXITPOLICY:
/* Payload layout read here: reason (1 byte), then a 4-byte address if
 * length >= 5, then a 4-byte TTL if length >= 9; both in network order. */
00706         if (rh->length >= 5) {
00707           uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+1));
00708           int ttl;
00709           if (!addr) {
00710             log_info(LD_APP,"Address '%s' resolved to 0.0.0.0. Closing,",
00711                      safe_str(conn->socks_request->address));
00712             connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
00713             return 0;
00714           }
00715           if (rh->length >= 9)
00716             ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+5));
00717           else
00718             ttl = -1;
00719 
00720           if (get_options()->ClientDNSRejectInternalAddresses &&
00721               is_internal_IP(addr, 0)) {
00722             log_info(LD_APP,"Address '%s' resolved to internal. Closing,",
00723                      safe_str(conn->socks_request->address));
00724             connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
00725             return 0;
00726           }
/* Remember the address the exit reported for this hostname. */
00727           client_dns_set_addressmap(conn->socks_request->address, addr,
00728                                     conn->chosen_exit_name, ttl);
00729         }
00730         /* check if he *ought* to have allowed it */
00731         if (exitrouter &&
00732             (rh->length < 5 ||
00733              (tor_inet_aton(conn->socks_request->address, &in) &&
00734               !conn->chosen_exit_name))) {
00735           log_info(LD_APP,
00736                  "Exitrouter %s seems to be more restrictive than its exit "
00737                  "policy. Not using this router as exit for now.",
00738                  node_describe(exitrouter));
00739           policies_set_node_exitpolicy_to_reject_all(exitrouter);
00740         }
00741         /* rewrite it to an IP if we learned one. */
00742         if (addressmap_rewrite(conn->socks_request->address,
00743                                sizeof(conn->socks_request->address),
00744                                NULL, NULL)) {
00745           control_event_stream_status(conn, STREAM_EVENT_REMAP, 0);
00746         }
00747         if (conn->chosen_exit_optional ||
00748             conn->chosen_exit_retries) {
00749           /* stop wanting a specific exit */
00750           conn->chosen_exit_optional = 0;
00751           /* A non-zero chosen_exit_retries can happen if we set a
00752            * TrackHostExits for this address under a port that the exit
00753            * relay allows, but then try the same address with a different
00754            * port that it doesn't allow to exit. We shouldn't unregister
00755            * the mapping, since it is probably still wanted on the
00756            * original port. But now we give away to the exit relay that
00757            * we probably have a TrackHostExits on it. So be it. */
00758           conn->chosen_exit_retries = 0;
00759           tor_free(conn->chosen_exit_name); /* clears it */
00760         }
00761         if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
00762           return 0;
00763         /* else, conn will get closed below */
00764         break;
/* NOTE(review): CONNECTREFUSED and TIMEOUT are not accepted by
 * edge_reason_is_retriable(), so with the guard above these two case
 * labels look unreachable -- confirm whether that is intended. */
00765       case END_STREAM_REASON_CONNECTREFUSED:
00766         if (!conn->chosen_exit_optional)
00767           break; /* break means it'll close, below */
00768         /* Else fall through: expire this circuit, clear the
00769          * chosen_exit_name field, and try again. */
00770       case END_STREAM_REASON_RESOLVEFAILED:
00771       case END_STREAM_REASON_TIMEOUT:
00772       case END_STREAM_REASON_MISC:
00773       case END_STREAM_REASON_NOROUTE:
00774         if (client_dns_incr_failures(conn->socks_request->address)
00775             < MAX_RESOLVE_FAILURES) {
00776           /* We haven't retried too many times; reattach the connection. */
00777           circuit_log_path(LOG_INFO,LD_APP,circ);
00778           /* Mark this circuit "unusable for new streams". */
00779           /* XXXX024 this is a kludgy way to do this. */
00780           tor_assert(circ->_base.timestamp_dirty);
/* Back-date the dirty timestamp by MaxCircuitDirtiness. */
00781           circ->_base.timestamp_dirty -= get_options()->MaxCircuitDirtiness;
00782 
00783           if (conn->chosen_exit_optional) {
00784             /* stop wanting a specific exit */
00785             conn->chosen_exit_optional = 0;
00786             tor_free(conn->chosen_exit_name); /* clears it */
00787           }
00788           if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
00789             return 0;
00790           /* else, conn will get closed below */
00791         } else {
00792           log_notice(LD_APP,
00793                      "Have tried resolving or connecting to address '%s' "
00794                      "at %d different places. Giving up.",
00795                      safe_str(conn->socks_request->address),
00796                      MAX_RESOLVE_FAILURES);
00797           /* clear the failures, so it will have a full try next time */
00798           client_dns_clear_failures(conn->socks_request->address);
00799         }
00800         break;
00801       case END_STREAM_REASON_HIBERNATING:
00802       case END_STREAM_REASON_RESOURCELIMIT:
/* The exit cannot serve us right now: treat it as rejecting everything. */
00803         if (exitrouter) {
00804           policies_set_node_exitpolicy_to_reject_all(exitrouter);
00805         }
00806         if (conn->chosen_exit_optional) {
00807           /* stop wanting a specific exit */
00808           conn->chosen_exit_optional = 0;
00809           tor_free(conn->chosen_exit_name); /* clears it */
00810         }
00811         if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
00812           return 0;
00813         /* else, will close below */
00814         break;
00815     } /* end switch */
00816     log_info(LD_APP,"Giving up on retrying; conn can't be handled.");
00817   }
00818 
/* Reached when no retry was attempted or the retry could not be set up. */
00819   log_info(LD_APP,
00820            "Edge got end (%s) before we're connected. Marking for close.",
00821        stream_end_reason_to_string(rh->length > 0 ? reason : -1));
00822   circuit_log_path(LOG_INFO,LD_APP,circ);
00823   /* need to test because of detach_retriable */
00824   if (!ENTRY_TO_CONN(conn)->marked_for_close)
00825     connection_mark_unattached_ap(conn, control_reason);
00826   return 0;
00827 }
00828 
00832 static void
00833 remap_event_helper(entry_connection_t *conn, uint32_t new_addr)
00834 {
00835   struct in_addr in;
00836 
00837   in.s_addr = htonl(new_addr);
00838   tor_inet_ntoa(&in, conn->socks_request->address,
00839                 sizeof(conn->socks_request->address));
00840   control_event_stream_status(conn, STREAM_EVENT_REMAP,
00841                               REMAP_STREAM_SOURCE_EXIT);
00842 }
00843 
00851 static int
00852 connection_edge_process_relay_cell_not_open(
00853     relay_header_t *rh, cell_t *cell, circuit_t *circ,
00854     edge_connection_t *conn, crypt_path_t *layer_hint)
00855 {
00856   if (rh->command == RELAY_COMMAND_END) {
00857     if (CIRCUIT_IS_ORIGIN(circ) && conn->_base.type == CONN_TYPE_AP) {
00858       return connection_ap_process_end_not_open(rh, cell,
00859                                                 TO_ORIGIN_CIRCUIT(circ),
00860                                                 EDGE_TO_ENTRY_CONN(conn),
00861                                                 layer_hint);
00862     } else {
00863       /* we just got an 'end', don't need to send one */
00864       conn->edge_has_sent_end = 1;
00865       conn->end_reason = *(cell->payload+RELAY_HEADER_SIZE) |
00866                          END_STREAM_REASON_FLAG_REMOTE;
00867       connection_mark_for_close(TO_CONN(conn));
00868       return 0;
00869     }
00870   }
00871 
00872   if (conn->_base.type == CONN_TYPE_AP &&
00873       rh->command == RELAY_COMMAND_CONNECTED) {
00874     entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
00875     tor_assert(CIRCUIT_IS_ORIGIN(circ));
00876     if (conn->_base.state != AP_CONN_STATE_CONNECT_WAIT) {
00877       log_fn(LOG_PROTOCOL_WARN, LD_APP,
00878              "Got 'connected' while not in state connect_wait. Dropping.");
00879       return 0;
00880     }
00881     conn->_base.state = AP_CONN_STATE_OPEN;
00882     log_info(LD_APP,"'connected' received after %d seconds.",
00883              (int)(time(NULL) - conn->_base.timestamp_lastread));
00884     if (rh->length >= 4) {
00885       uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE));
00886       int ttl;
00887       if (!addr || (get_options()->ClientDNSRejectInternalAddresses &&
00888                     is_internal_IP(addr, 0))) {
00889         log_info(LD_APP, "...but it claims the IP address was %s. Closing.",
00890                  fmt_addr32(addr));
00891         connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
00892         connection_mark_unattached_ap(entry_conn,
00893                                       END_STREAM_REASON_TORPROTOCOL);
00894         return 0;
00895       }
00896       if (rh->length >= 8)
00897         ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+4));
00898       else
00899         ttl = -1;
00900       client_dns_set_addressmap(entry_conn->socks_request->address, addr,
00901                                 entry_conn->chosen_exit_name, ttl);
00902 
00903       remap_event_helper(entry_conn, addr);
00904     }
00905     circuit_log_path(LOG_INFO,LD_APP,TO_ORIGIN_CIRCUIT(circ));
00906     /* don't send a socks reply to transparent conns */
00907     tor_assert(entry_conn->socks_request != NULL);
00908     if (!entry_conn->socks_request->has_finished)
00909       connection_ap_handshake_socks_reply(entry_conn, NULL, 0, 0);
00910 
00911     /* Was it a linked dir conn? If so, a dir request just started to
00912      * fetch something; this could be a bootstrap status milestone. */
00913     log_debug(LD_APP, "considering");
00914     if (TO_CONN(conn)->linked_conn &&
00915         TO_CONN(conn)->linked_conn->type == CONN_TYPE_DIR) {
00916       connection_t *dirconn = TO_CONN(conn)->linked_conn;
00917       log_debug(LD_APP, "it is! %d", dirconn->purpose);
00918       switch (dirconn->purpose) {
00919         case DIR_PURPOSE_FETCH_CERTIFICATE:
00920           if (consensus_is_waiting_for_certs())
00921             control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_KEYS, 0);
00922           break;
00923         case DIR_PURPOSE_FETCH_CONSENSUS:
00924           control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_STATUS, 0);
00925           break;
00926         case DIR_PURPOSE_FETCH_SERVERDESC:
00927           control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_DESCRIPTORS,
00928                                   count_loading_descriptors_progress());
00929           break;
00930       }
00931     }
00932     /* This is definitely a success, so forget about any pending data we
00933      * had sent. */
00934     if (entry_conn->pending_optimistic_data) {
00935       generic_buffer_free(entry_conn->pending_optimistic_data);
00936       entry_conn->pending_optimistic_data = NULL;
00937     }
00938 
00939     /* handle anything that might have queued */
00940     if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) {
00941       /* (We already sent an end cell if possible) */
00942       connection_mark_for_close(TO_CONN(conn));
00943       return 0;
00944     }
00945     return 0;
00946   }
00947   if (conn->_base.type == CONN_TYPE_AP &&
00948       rh->command == RELAY_COMMAND_RESOLVED) {
00949     int ttl;
00950     int answer_len;
00951     uint8_t answer_type;
00952     entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
00953     if (conn->_base.state != AP_CONN_STATE_RESOLVE_WAIT) {
00954       log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while "
00955              "not in state resolve_wait. Dropping.");
00956       return 0;
00957     }
00958     tor_assert(SOCKS_COMMAND_IS_RESOLVE(entry_conn->socks_request->command));
00959     answer_len = cell->payload[RELAY_HEADER_SIZE+1];
00960     if (rh->length < 2 || answer_len+2>rh->length) {
00961       log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
00962              "Dropping malformed 'resolved' cell");
00963       connection_mark_unattached_ap(entry_conn, END_STREAM_REASON_TORPROTOCOL);
00964       return 0;
00965     }
00966     answer_type = cell->payload[RELAY_HEADER_SIZE];
00967     if (rh->length >= answer_len+6)
00968       ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+
00969                                   2+answer_len));
00970     else
00971       ttl = -1;
00972     if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
00973       uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
00974       if (get_options()->ClientDNSRejectInternalAddresses &&
00975           is_internal_IP(addr, 0)) {
00976         log_info(LD_APP,"Got a resolve with answer %s. Rejecting.",
00977                  fmt_addr32(addr));
00978         connection_ap_handshake_socks_resolved(entry_conn,
00979                                                RESOLVED_TYPE_ERROR_TRANSIENT,
00980                                                0, NULL, 0, TIME_MAX);
00981         connection_mark_unattached_ap(entry_conn,
00982                                       END_STREAM_REASON_TORPROTOCOL);
00983         return 0;
00984       }
00985     }
00986     connection_ap_handshake_socks_resolved(entry_conn,
00987                    answer_type,
00988                    cell->payload[RELAY_HEADER_SIZE+1], /*answer_len*/
00989                    cell->payload+RELAY_HEADER_SIZE+2, /*answer*/
00990                    ttl,
00991                    -1);
00992     if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
00993       uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
00994       remap_event_helper(entry_conn, addr);
00995     }
00996     connection_mark_unattached_ap(entry_conn,
00997                               END_STREAM_REASON_DONE |
00998                               END_STREAM_REASON_FLAG_ALREADY_SOCKS_REPLIED);
00999     return 0;
01000   }
01001 
01002   log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
01003          "Got an unexpected relay command %d, in state %d (%s). Dropping.",
01004          rh->command, conn->_base.state,
01005          conn_state_to_string(conn->_base.type, conn->_base.state));
01006   return 0; /* for forward compatibility, don't kill the circuit */
01007 //  connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
01008 //  connection_mark_for_close(conn);
01009 //  return -1;
01010 }
01011 
/** Process the relay cell <b>cell</b> that arrived on <b>circ</b>.
 *
 * <b>conn</b> is the edge stream the cell is addressed to, or NULL when
 * there is none (a control cell, or a stream we do not know about).
 * <b>layer_hint</b> is non-NULL when the cell is being handled at the
 * origin side: the log domain is LD_APP in that case and LD_EXIT otherwise,
 * and several commands below are refused depending on whether it is set.
 *
 * The relay header is unpacked from the cell payload and its length field
 * is validated.  If <b>conn</b> exists but is not open, the cell is handed
 * to connection_edge_process_relay_cell_not_open(), except for DATA cells
 * on an exit stream that is still resolving or connecting, which are
 * accepted as optimistic data.  Everything else is dispatched on the relay
 * command.
 *
 * Return 0 when the cell was handled or dropped.  Return a negative value
 * (-END_CIRC_REASON_TORPROTOCOL, or the negative result of a helper such as
 * circuit_finish_handshake()) for the cases whose log messages say the
 * circuit is to be closed.  Unknown commands are dropped with return 0.
 */
01021 static int
01022 connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
01023                                    edge_connection_t *conn,
01024                                    crypt_path_t *layer_hint)
01025 {
        /* Function-lifetime counter, used only for the debug log below. */
01026   static int num_seen=0;
01027   relay_header_t rh;
01028   unsigned domain = layer_hint?LD_APP:LD_EXIT;
01029   int reason;
01030   int optimistic_data = 0; /* Set to 1 if we receive data on a stream
01031                             * that's in the EXIT_CONN_STATE_RESOLVING
01032                             * or EXIT_CONN_STATE_CONNECTING states. */
01033 
01034   tor_assert(cell);
01035   tor_assert(circ);
01036 
01037   relay_header_unpack(&rh, cell->payload);
01038 //  log_fn(LOG_DEBUG,"command %d stream %d", rh.command, rh.stream_id);
01039   num_seen++;
01040   log_debug(domain, "Now seen %d relay cells here (command %d, stream %d).",
01041             num_seen, rh.command, rh.stream_id);
01042 
01043   if (rh.length > RELAY_PAYLOAD_SIZE) {
01044     log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
01045            "Relay cell length field too long. Closing circuit.");
01046     return - END_CIRC_REASON_TORPROTOCOL;
01047   }
01048 
01049   /* either conn is NULL, in which case we've got a control cell, or else
01050    * conn points to the recognized stream. */
01051 
01052   if (conn && !connection_state_is_open(TO_CONN(conn))) {
01053     if (conn->_base.type == CONN_TYPE_EXIT &&
01054         (conn->_base.state == EXIT_CONN_STATE_CONNECTING ||
01055          conn->_base.state == EXIT_CONN_STATE_RESOLVING) &&
01056         rh.command == RELAY_COMMAND_DATA) {
01057       /* Allow DATA cells to be delivered to an exit node in state
01058        * EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING.
01059        * This speeds up HTTP, for example. */
01060       optimistic_data = 1;
01061     } else {
01062       return connection_edge_process_relay_cell_not_open(
01063                &rh, cell, circ, conn, layer_hint);
01064     }
01065   }
01066 
        /* From here on, a non-NULL conn is either open or an exit stream
         * receiving optimistic data. */
01067   switch (rh.command) {
01068     case RELAY_COMMAND_DROP:
01069 //      log_info(domain,"Got a relay-level padding cell. Dropping.");
01070       return 0;
01071     case RELAY_COMMAND_BEGIN:
01072     case RELAY_COMMAND_BEGIN_DIR:
          /* With a layer hint, a BEGIN is only accepted on a circuit whose
           * purpose is CIRCUIT_PURPOSE_S_REND_JOINED. */
01073       if (layer_hint &&
01074           circ->purpose != CIRCUIT_PURPOSE_S_REND_JOINED) {
01075         log_fn(LOG_PROTOCOL_WARN, LD_APP,
01076                "Relay begin request unsupported at AP. Dropping.");
01077         return 0;
01078       }
          /* On such a circuit the BEGIN must come from the layer stored at
           * cpath->prev; anything else is dropped. */
01079       if (circ->purpose == CIRCUIT_PURPOSE_S_REND_JOINED &&
01080           layer_hint != TO_ORIGIN_CIRCUIT(circ)->cpath->prev) {
01081         log_fn(LOG_PROTOCOL_WARN, LD_APP,
01082                "Relay begin request to Hidden Service "
01083                "from intermediary node. Dropping.");
01084         return 0;
01085       }
01086       if (conn) {
01087         log_fn(LOG_PROTOCOL_WARN, domain,
01088                "Begin cell for known stream. Dropping.");
01089         return 0;
01090       }
01091       if (rh.command == RELAY_COMMAND_BEGIN_DIR) {
01092         /* Assign this circuit and its app-ward OR connection a unique ID,
01093          * so that we can measure download times. The local edge and dir
01094          * connection will be assigned the same ID when they are created
01095          * and linked. */
01096         static uint64_t next_id = 0;
01097         circ->dirreq_id = ++next_id;
01098         TO_CONN(TO_OR_CIRCUIT(circ)->p_conn)->dirreq_id = circ->dirreq_id;
01099       }
01100 
01101       return connection_exit_begin_conn(cell, circ);
01102     case RELAY_COMMAND_DATA:
01103       ++stats_n_data_cells_received;
          /* Charge the deliver window first: the per-layer one when a layer
           * hint is present, the circuit's own otherwise.  This happens even
           * if the stream turns out to be unknown. */
01104       if (( layer_hint && --layer_hint->deliver_window < 0) ||
01105           (!layer_hint && --circ->deliver_window < 0)) {
01106         log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
01107                "(relay data) circ deliver_window below 0. Killing.");
01108         if (conn) {
01109           /* XXXX Do we actually need to do this?  Will killing the circuit
01110            * not send an END and mark the stream for close as appropriate? */
01111           connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
01112           connection_mark_for_close(TO_CONN(conn));
01113         }
01114         return -END_CIRC_REASON_TORPROTOCOL;
01115       }
01116       log_debug(domain,"circ deliver_window now %d.", layer_hint ?
01117                 layer_hint->deliver_window : circ->deliver_window);
01118 
01119       circuit_consider_sending_sendme(circ, layer_hint);
01120 
01121       if (!conn) {
01122         log_info(domain,"data cell dropped, unknown stream (streamid %d).",
01123                  rh.stream_id);
01124         return 0;
01125       }
01126 
01127       if (--conn->deliver_window < 0) { /* is it below 0 after decrement? */
01128         log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
01129                "(relay data) conn deliver_window below 0. Killing.");
01130         return -END_CIRC_REASON_TORPROTOCOL;
01131       }
01132 
          /* Hand the payload bytes to the stream's outbuf. */
01133       stats_n_data_bytes_received += rh.length;
01134       connection_write_to_buf((char*)(cell->payload + RELAY_HEADER_SIZE),
01135                               rh.length, TO_CONN(conn));
01136 
01137       if (!optimistic_data) {
01138         /* Only send a SENDME if we're not getting optimistic data; otherwise
01139          * a SENDME could arrive before the CONNECTED.
01140          */
01141         connection_edge_consider_sending_sendme(conn);
01142       }
01143 
01144       return 0;
01145     case RELAY_COMMAND_END:
          /* An END with an empty payload is treated as reason MISC. */
01146       reason = rh.length > 0 ?
01147         get_uint8(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC;
01148       if (!conn) {
01149         log_info(domain,"end cell (%s) dropped, unknown stream.",
01150                  stream_end_reason_to_string(reason));
01151         return 0;
01152       }
01153 /* XXX add to this log_fn the exit node's nickname? */
01154       log_info(domain,"%d: end cell (%s) for stream %d. Removing stream.",
01155                conn->_base.s,
01156                stream_end_reason_to_string(reason),
01157                conn->stream_id);
01158       if (conn->_base.type == CONN_TYPE_AP) {
01159         entry_connection_t *entry_conn = EDGE_TO_ENTRY_CONN(conn);
01160         if (entry_conn->socks_request &&
01161             !entry_conn->socks_request->has_finished)
01162           log_warn(LD_BUG,
01163                    "open stream hasn't sent socks answer yet? Closing.");
01164       }
01165       /* We just *got* an end; no reason to send one. */
01166       conn->edge_has_sent_end = 1;
          /* Keep an end reason that was already recorded. */
01167       if (!conn->end_reason)
01168         conn->end_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
01169       if (!conn->_base.marked_for_close) {
01170         /* only mark it if not already marked. it's possible to
01171          * get the 'end' right around when the client hangs up on us. */
01172         connection_mark_and_flush(TO_CONN(conn));
01173       }
01174       return 0;
01175     case RELAY_COMMAND_EXTEND: {
          /* Lifetime totals, used only for the rate-limited warning below. */
01176       static uint64_t total_n_extend=0, total_nonearly=0;
01177       total_n_extend++;
01178       if (conn) {
01179         log_fn(LOG_PROTOCOL_WARN, domain,
01180                "'extend' cell received for non-zero stream. Dropping.");
01181         return 0;
01182       }
          /* Unless the "AllowNonearlyExtend" consensus parameter is set, an
           * EXTEND is only honoured when it arrives in a RELAY_EARLY cell. */
01183       if (cell->command != CELL_RELAY_EARLY &&
01184           !networkstatus_get_param(NULL,"AllowNonearlyExtend",0,0,1)) {
01185 #define EARLY_WARNING_INTERVAL 3600
01186         static ratelim_t early_warning_limit =
01187           RATELIM_INIT(EARLY_WARNING_INTERVAL);
01188         char *m;
01189         if (cell->command == CELL_RELAY) {
01190           ++total_nonearly;
01191           if ((m = rate_limit_log(&early_warning_limit, approx_time()))) {
01192             double percentage = ((double)total_nonearly)/total_n_extend;
01193             percentage *= 100;
01194             log_fn(LOG_PROTOCOL_WARN, domain, "EXTEND cell received, "
01195                    "but not via RELAY_EARLY. Dropping.%s", m);
01196             log_fn(LOG_PROTOCOL_WARN, domain, "  (We have dropped %.02f%% of "
01197                    "all EXTEND cells for this reason)", percentage);
01198             tor_free(m);
01199           }
01200         } else {
01201           log_fn(LOG_WARN, domain,
01202                  "EXTEND cell received, in a cell with type %d! Dropping.",
01203                  cell->command);
01204         }
01205         return 0;
01206       }
01207       return circuit_extend(cell, circ);
01208     }
01209     case RELAY_COMMAND_EXTENDED:
01210       if (!layer_hint) {
01211         log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
01212                "'extended' unsupported at non-origin. Dropping.");
01213         return 0;
01214       }
01215       log_debug(domain,"Got an extended cell! Yay.");
          /* Finish the handshake with the payload, then send the next onion
           * skin; a negative result from either step is returned as-is. */
01216       if ((reason = circuit_finish_handshake(TO_ORIGIN_CIRCUIT(circ),
01217                                        CELL_CREATED,
01218                                        cell->payload+RELAY_HEADER_SIZE)) < 0) {
01219         log_warn(domain,"circuit_finish_handshake failed.");
01220         return reason;
01221       }
01222       if ((reason=circuit_send_next_onion_skin(TO_ORIGIN_CIRCUIT(circ)))<0) {
01223         log_info(domain,"circuit_send_next_onion_skin() failed.");
01224         return reason;
01225       }
01226       return 0;
01227     case RELAY_COMMAND_TRUNCATE:
01228       if (layer_hint) {
01229         log_fn(LOG_PROTOCOL_WARN, LD_APP,
01230                "'truncate' unsupported at origin. Dropping.");
01231         return 0;
01232       }
01233       if (circ->n_conn) {
            /* NOTE(review): the reason byte is read without consulting
             * rh.length, unlike the END case above -- confirm that a
             * zero-length TRUNCATE payload is acceptable here. */
01234         uint8_t trunc_reason = *(uint8_t*)(cell->payload + RELAY_HEADER_SIZE);
01235         circuit_clear_cell_queue(circ, circ->n_conn);
01236         connection_or_send_destroy(circ->n_circ_id, circ->n_conn,
01237                                    trunc_reason);
01238         circuit_set_n_circid_orconn(circ, 0, NULL);
01239       }
01240       log_debug(LD_EXIT, "Processed 'truncate', replying.");
01241       {
            /* Reply with a TRUNCATED cell carrying a one-byte reason. */
01242         char payload[1];
01243         payload[0] = (char)END_CIRC_REASON_REQUESTED;
01244         relay_send_command_from_edge(0, circ, RELAY_COMMAND_TRUNCATED,
01245                                      payload, sizeof(payload), NULL);
01246       }
01247       return 0;
01248     case RELAY_COMMAND_TRUNCATED:
01249       if (!layer_hint) {
01250         log_fn(LOG_PROTOCOL_WARN, LD_EXIT,
01251                "'truncated' unsupported at non-origin. Dropping.");
01252         return 0;
01253       }
01254       circuit_truncated(TO_ORIGIN_CIRCUIT(circ), layer_hint);
01255       return 0;
01256     case RELAY_COMMAND_CONNECTED:
          /* Streams that are not open were dispatched before the switch, so
           * a non-NULL conn here is one that should not be receiving this. */
01257       if (conn) {
01258         log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
01259                "'connected' unsupported while open. Closing circ.");
01260         return -END_CIRC_REASON_TORPROTOCOL;
01261       }
01262       log_info(domain,
01263                "'connected' received, no conn attached anymore. Ignoring.");
01264       return 0;
01265     case RELAY_COMMAND_SENDME:
          /* A SENDME without a stream is circuit-level: widen the package
           * window of the layer (with a hint) or of the circuit (without),
           * then try to resume reading from the edge streams. */
01266       if (!conn) {
01267         if (layer_hint) {
01268           layer_hint->package_window += CIRCWINDOW_INCREMENT;
01269           log_debug(LD_APP,"circ-level sendme at origin, packagewindow %d.",
01270                     layer_hint->package_window);
01271           circuit_resume_edge_reading(circ, layer_hint);
01272         } else {
01273           circ->package_window += CIRCWINDOW_INCREMENT;
01274           log_debug(LD_APP,
01275                     "circ-level sendme at non-origin, packagewindow %d.",
01276                     circ->package_window);
01277           circuit_resume_edge_reading(circ, layer_hint);
01278         }
01279         return 0;
01280       }
          /* Stream-level SENDME: widen this stream's package window. */
01281       conn->package_window += STREAMWINDOW_INCREMENT;
01282       log_debug(domain,"stream-level sendme, packagewindow now %d.",
01283                 conn->package_window);
01284       if (circuit_queue_streams_are_blocked(circ)) {
01285         /* Still waiting for queue to flush; don't touch conn */
01286         return 0;
01287       }
01288       connection_start_reading(TO_CONN(conn));
01289       /* handle whatever might still be on the inbuf */
01290       if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) {
01291         /* (We already sent an end cell if possible) */
01292         connection_mark_for_close(TO_CONN(conn));
01293         return 0;
01294       }
01295       return 0;
01296     case RELAY_COMMAND_RESOLVE:
          /* Only accepted without a layer hint, without an existing stream,
           * and on a circuit whose purpose is CIRCUIT_PURPOSE_OR. */
01297       if (layer_hint) {
01298         log_fn(LOG_PROTOCOL_WARN, LD_APP,
01299                "resolve request unsupported at AP; dropping.");
01300         return 0;
01301       } else if (conn) {
01302         log_fn(LOG_PROTOCOL_WARN, domain,
01303                "resolve request for known stream; dropping.");
01304         return 0;
01305       } else if (circ->purpose != CIRCUIT_PURPOSE_OR) {
01306         log_fn(LOG_PROTOCOL_WARN, domain,
01307                "resolve request on circ with purpose %d; dropping",
01308                circ->purpose);
01309         return 0;
01310       }
01311       connection_exit_begin_resolve(cell, TO_OR_CIRCUIT(circ));
01312       return 0;
01313     case RELAY_COMMAND_RESOLVED:
          /* Same reasoning as for CONNECTED above. */
01314       if (conn) {
01315         log_fn(LOG_PROTOCOL_WARN, domain,
01316                "'resolved' unsupported while open. Closing circ.");
01317         return -END_CIRC_REASON_TORPROTOCOL;
01318       }
01319       log_info(domain,
01320                "'resolved' received, no conn attached anymore. Ignoring.");
01321       return 0;
        /* All of the following commands are forwarded, together with their
         * payload, to rend_process_relay_cell(); its result is not used. */
01322     case RELAY_COMMAND_ESTABLISH_INTRO:
01323     case RELAY_COMMAND_ESTABLISH_RENDEZVOUS:
01324     case RELAY_COMMAND_INTRODUCE1:
01325     case RELAY_COMMAND_INTRODUCE2:
01326     case RELAY_COMMAND_INTRODUCE_ACK:
01327     case RELAY_COMMAND_RENDEZVOUS1:
01328     case RELAY_COMMAND_RENDEZVOUS2:
01329     case RELAY_COMMAND_INTRO_ESTABLISHED:
01330     case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
01331       rend_process_relay_cell(circ, layer_hint,
01332                               rh.command, rh.length,
01333                               cell->payload+RELAY_HEADER_SIZE);
01334       return 0;
01335   }
        /* Reached only for commands with no case above. */
01336   log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
01337          "Received unknown relay command %d. Perhaps the other side is using "
01338          "a newer version of Tor? Dropping.",
01339          rh.command);
01340   return 0; /* for forward compatibility, don't kill the circuit */
01341 }
01342 
/** Number of DATA cells built by connection_edge_package_raw_inbuf().  It
 * is incremented once per cell, before the cell is handed to
 * connection_edge_send_command(). */
01344 uint64_t stats_n_data_cells_packaged = 0;
/** Total number of payload bytes placed into the cells counted by
 * stats_n_data_cells_packaged. */
01348 uint64_t stats_n_data_bytes_packaged = 0;
/** Number of RELAY_COMMAND_DATA cells seen by
 * connection_edge_process_relay_cell().  It is incremented before any
 * window check, so cells that are later dropped are counted too. */
01350 uint64_t stats_n_data_cells_received = 0;
/** Total of the relay-header length fields of the DATA cells whose payload
 * was actually written to a stream's outbuf. */
01354 uint64_t stats_n_data_bytes_received = 0;
01355 
01366 int
01367 connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
01368                                   int *max_cells)
01369 {
01370   size_t bytes_to_process, length;
01371   char payload[CELL_PAYLOAD_SIZE];
01372   circuit_t *circ;
01373   const unsigned domain = conn->_base.type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
01374   int sending_from_optimistic = 0;
01375   const int sending_optimistically =
01376     conn->_base.type == CONN_TYPE_AP &&
01377     conn->_base.state != AP_CONN_STATE_OPEN;
01378   entry_connection_t *entry_conn =
01379     conn->_base.type == CONN_TYPE_AP ? EDGE_TO_ENTRY_CONN(conn) : NULL;
01380   crypt_path_t *cpath_layer = conn->cpath_layer;
01381 
01382   tor_assert(conn);
01383 
01384   if (conn->_base.marked_for_close) {
01385     log_warn(LD_BUG,
01386              "called on conn that's already marked for close at %s:%d.",
01387              conn->_base.marked_for_close_file, conn->_base.marked_for_close);
01388     return 0;
01389   }
01390 
01391   if (max_cells && *max_cells <= 0)
01392     return 0;
01393 
01394  repeat_connection_edge_package_raw_inbuf:
01395 
01396   circ = circuit_get_by_edge_conn(conn);
01397   if (!circ) {
01398     log_info(domain,"conn has no circuit! Closing.");
01399     conn->end_reason = END_STREAM_REASON_CANT_ATTACH;
01400     return -1;
01401   }
01402 
01403   if (circuit_consider_stop_edge_reading(circ, cpath_layer))
01404     return 0;
01405 
01406   if (conn->package_window <= 0) {
01407     log_info(domain,"called with package_window %d. Skipping.",
01408              conn->package_window);
01409     connection_stop_reading(TO_CONN(conn));
01410     return 0;
01411   }
01412 
01413   sending_from_optimistic = entry_conn &&
01414     entry_conn->sending_optimistic_data != NULL;
01415 
01416   if (PREDICT_UNLIKELY(sending_from_optimistic)) {
01417     bytes_to_process = generic_buffer_len(entry_conn->sending_optimistic_data);
01418     if (PREDICT_UNLIKELY(!bytes_to_process)) {
01419       log_warn(LD_BUG, "sending_optimistic_data was non-NULL but empty");
01420       bytes_to_process = connection_get_inbuf_len(TO_CONN(conn));
01421       sending_from_optimistic = 0;
01422     }
01423   } else {
01424     bytes_to_process = connection_get_inbuf_len(TO_CONN(conn));
01425   }
01426 
01427   if (!bytes_to_process)
01428     return 0;
01429 
01430   if (!package_partial && bytes_to_process < RELAY_PAYLOAD_SIZE)
01431     return 0;
01432 
01433   if (bytes_to_process > RELAY_PAYLOAD_SIZE) {
01434     length = RELAY_PAYLOAD_SIZE;
01435   } else {
01436     length = bytes_to_process;
01437   }
01438   stats_n_data_bytes_packaged += length;
01439   stats_n_data_cells_packaged += 1;
01440 
01441   if (PREDICT_UNLIKELY(sending_from_optimistic)) {
01442     /* XXXX We could be more efficient here by sometimes packing
01443      * previously-sent optimistic data in the same cell with data
01444      * from the inbuf. */
01445     generic_buffer_get(entry_conn->sending_optimistic_data, payload, length);
01446     if (!generic_buffer_len(entry_conn->sending_optimistic_data)) {
01447         generic_buffer_free(entry_conn->sending_optimistic_data);
01448         entry_conn->sending_optimistic_data = NULL;
01449     }
01450   } else {
01451     connection_fetch_from_buf(payload, length, TO_CONN(conn));
01452   }
01453 
01454   log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s,
01455             (int)length, (int)connection_get_inbuf_len(TO_CONN(conn)));
01456 
01457   if (sending_optimistically && !sending_from_optimistic) {
01458     /* This is new optimistic data; remember it in case we need to detach and
01459        retry */
01460     if (!entry_conn->pending_optimistic_data)
01461       entry_conn->pending_optimistic_data = generic_buffer_new();
01462     generic_buffer_add(entry_conn->pending_optimistic_data, payload, length);
01463   }
01464 
01465   if (connection_edge_send_command(conn, RELAY_COMMAND_DATA,
01466                                    payload, length) < 0 )
01467     /* circuit got marked for close, don't continue, don't need to mark conn */
01468     return 0;
01469 
01470   if (!cpath_layer) { /* non-rendezvous exit */
01471     tor_assert(circ->package_window > 0);
01472     circ->package_window--;
01473   } else { /* we're an AP, or an exit on a rendezvous circ */
01474     tor_assert(cpath_layer->package_window > 0);
01475     cpath_layer->package_window--;
01476   }
01477 
01478   if (--conn->package_window <= 0) { /* is it 0 after decrement? */
01479     connection_stop_reading(TO_CONN(conn));
01480     log_debug(domain,"conn->package_window reached 0.");
01481     circuit_consider_stop_edge_reading(circ, cpath_layer);
01482     return 0; /* don't process the inbuf any more */
01483   }
01484   log_debug(domain,"conn->package_window is now %d",conn->package_window);
01485 
01486   if (max_cells) {
01487     *max_cells -= 1;
01488     if (*max_cells <= 0)
01489       return 0;
01490   }
01491 
01492   /* handle more if there's more, or return 0 if there isn't */
01493   goto repeat_connection_edge_package_raw_inbuf;
01494 }
01495 
01503 void
01504 connection_edge_consider_sending_sendme(edge_connection_t *conn)
01505 {
01506   circuit_t *circ;
01507 
01508   if (connection_outbuf_too_full(TO_CONN(conn)))
01509     return;
01510 
01511   circ = circuit_get_by_edge_conn(conn);
01512   if (!circ) {
01513     /* this can legitimately happen if the destroy has already
01514      * arrived and torn down the circuit */
01515     log_info(LD_APP,"No circuit associated with conn. Skipping.");
01516     return;
01517   }
01518 
01519   while (conn->deliver_window <= STREAMWINDOW_START - STREAMWINDOW_INCREMENT) {
01520     log_debug(conn->_base.type == CONN_TYPE_AP ?LD_APP:LD_EXIT,
01521               "Outbuf %d, Queuing stream sendme.",
01522               (int)conn->_base.outbuf_flushlen);
01523     conn->deliver_window += STREAMWINDOW_INCREMENT;
01524     if (connection_edge_send_command(conn, RELAY_COMMAND_SENDME,
01525                                      NULL, 0) < 0) {
01526       log_warn(LD_APP,"connection_edge_send_command failed. Skipping.");
01527       return; /* the circuit's closed, don't continue */
01528     }
01529   }
01530 }
01531 
01537 static void
01538 circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
01539 {
01540   if (circuit_queue_streams_are_blocked(circ)) {
01541     log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
01542     return;
01543   }
01544   log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
01545 
01546   if (CIRCUIT_IS_ORIGIN(circ))
01547     circuit_resume_edge_reading_helper(TO_ORIGIN_CIRCUIT(circ)->p_streams,
01548                                        circ, layer_hint);
01549   else
01550     circuit_resume_edge_reading_helper(TO_OR_CIRCUIT(circ)->n_streams,
01551                                        circ, layer_hint);
01552 }
01553 
01558 static int
01559 circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
01560                                    circuit_t *circ,
01561                                    crypt_path_t *layer_hint)
01562 {
01563   edge_connection_t *conn;
01564   int n_packaging_streams, n_streams_left;
01565   int packaged_this_round;
01566   int cells_on_queue;
01567   int cells_per_conn;
01568   edge_connection_t *chosen_stream = NULL;
01569 
01570   /* How many cells do we have space for?  It will be the minimum of
01571    * the number needed to exhaust the package window, and the minimum
01572    * needed to fill the cell queue. */
01573   int max_to_package = circ->package_window;
01574   if (CIRCUIT_IS_ORIGIN(circ)) {
01575     cells_on_queue = circ->n_conn_cells.n;
01576   } else {
01577     or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
01578     cells_on_queue = or_circ->p_conn_cells.n;
01579   }
01580   if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
01581     max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
01582 
01583   /* Once we used to start listening on the streams in the order they
01584    * appeared in the linked list.  That leads to starvation on the
01585    * streams that appeared later on the list, since the first streams
01586    * would always get to read first.  Instead, we just pick a random
01587    * stream on the list, and enable reading for streams starting at that
01588    * point (and wrapping around as if the list were circular).  It would
01589    * probably be better to actually remember which streams we've
01590    * serviced in the past, but this is simple and effective. */
01591 
01592   /* Select a stream uniformly at random from the linked list.  We
01593    * don't need cryptographic randomness here. */
01594   {
01595     int num_streams = 0;
01596     for (conn = first_conn; conn; conn = conn->next_stream) {
01597       num_streams++;
01598       if ((tor_weak_random() % num_streams)==0)
01599         chosen_stream = conn;
01600       /* Invariant: chosen_stream has been chosen uniformly at random from
01601        * among the first num_streams streams on first_conn. */
01602     }
01603   }
01604 
01605   /* Count how many non-marked streams there are that have anything on
01606    * their inbuf, and enable reading on all of the connections. */
01607   n_packaging_streams = 0;
01608   /* Activate reading starting from the chosen stream */
01609   for (conn=chosen_stream; conn; conn = conn->next_stream) {
01610     /* Start reading for the streams starting from here */
01611     if (conn->_base.marked_for_close || conn->package_window <= 0)
01612       continue;
01613     if (!layer_hint || conn->cpath_layer == layer_hint) {
01614       connection_start_reading(TO_CONN(conn));
01615 
01616       if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
01617         ++n_packaging_streams;
01618     }
01619   }
01620   /* Go back and do the ones we skipped, circular-style */
01621   for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
01622     if (conn->_base.marked_for_close || conn->package_window <= 0)
01623       continue;
01624     if (!layer_hint || conn->cpath_layer == layer_hint) {
01625       connection_start_reading(TO_CONN(conn));
01626 
01627       if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
01628         ++n_packaging_streams;
01629     }
01630   }
01631 
01632   if (n_packaging_streams == 0) /* avoid divide-by-zero */
01633     return 0;
01634 
01635  again:
01636 
01637   cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams);
01638 
01639   packaged_this_round = 0;
01640   n_streams_left = 0;
01641 
01642   /* Iterate over all connections.  Package up to cells_per_conn cells on
01643    * each.  Update packaged_this_round with the total number of cells
01644    * packaged, and n_streams_left with the number that still have data to
01645    * package.
01646    */
01647   for (conn=first_conn; conn; conn=conn->next_stream) {
01648     if (conn->_base.marked_for_close || conn->package_window <= 0)
01649       continue;
01650     if (!layer_hint || conn->cpath_layer == layer_hint) {
01651       int n = cells_per_conn, r;
01652       /* handle whatever might still be on the inbuf */
01653       r = connection_edge_package_raw_inbuf(conn, 1, &n);
01654 
01655       /* Note how many we packaged */
01656       packaged_this_round += (cells_per_conn-n);
01657 
01658       if (r<0) {
01659         /* Problem while packaging. (We already sent an end cell if
01660          * possible) */
01661         connection_mark_for_close(TO_CONN(conn));
01662         continue;
01663       }
01664 
01665       /* If there's still data to read, we'll be coming back to this stream. */
01666       if (connection_get_inbuf_len(TO_CONN(conn)))
01667           ++n_streams_left;
01668 
01669       /* If the circuit won't accept any more data, return without looking
01670        * at any more of the streams. Any connections that should be stopped
01671        * have already been stopped by connection_edge_package_raw_inbuf. */
01672       if (circuit_consider_stop_edge_reading(circ, layer_hint))
01673         return -1;
01674       /* XXXX should we also stop immediately if we fill up the cell queue?
01675        * Probably. */
01676     }
01677   }
01678 
01679   /* If we made progress, and we are willing to package more, and there are
01680    * any streams left that want to package stuff... try again!
01681    */
01682   if (packaged_this_round && packaged_this_round < max_to_package &&
01683       n_streams_left) {
01684     max_to_package -= packaged_this_round;
01685     n_packaging_streams = n_streams_left;
01686     goto again;
01687   }
01688 
01689   return 0;
01690 }
01691 
01698 static int
01699 circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
01700 {
01701   edge_connection_t *conn = NULL;
01702   unsigned domain = layer_hint ? LD_APP : LD_EXIT;
01703 
01704   if (!layer_hint) {
01705     or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
01706     log_debug(domain,"considering circ->package_window %d",
01707               circ->package_window);
01708     if (circ->package_window <= 0) {
01709       log_debug(domain,"yes, not-at-origin. stopped.");
01710       for (conn = or_circ->n_streams; conn; conn=conn->next_stream)
01711         connection_stop_reading(TO_CONN(conn));
01712       return 1;
01713     }
01714     return 0;
01715   }
01716   /* else, layer hint is defined, use it */
01717   log_debug(domain,"considering layer_hint->package_window %d",
01718             layer_hint->package_window);
01719   if (layer_hint->package_window <= 0) {
01720     log_debug(domain,"yes, at-origin. stopped.");
01721     for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
01722          conn=conn->next_stream) {
01723       if (conn->cpath_layer == layer_hint)
01724         connection_stop_reading(TO_CONN(conn));
01725     }
01726     return 1;
01727   }
01728   return 0;
01729 }
01730 
01737 static void
01738 circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
01739 {
01740 //  log_fn(LOG_INFO,"Considering: layer_hint is %s",
01741 //         layer_hint ? "defined" : "null");
01742   while ((layer_hint ? layer_hint->deliver_window : circ->deliver_window) <=
01743           CIRCWINDOW_START - CIRCWINDOW_INCREMENT) {
01744     log_debug(LD_CIRC,"Queuing circuit sendme.");
01745     if (layer_hint)
01746       layer_hint->deliver_window += CIRCWINDOW_INCREMENT;
01747     else
01748       circ->deliver_window += CIRCWINDOW_INCREMENT;
01749     if (relay_send_command_from_edge(0, circ, RELAY_COMMAND_SENDME,
01750                                      NULL, 0, layer_hint) < 0) {
01751       log_warn(LD_CIRC,
01752                "relay_send_command_from_edge failed. Circuit's closed.");
01753       return; /* the circuit's closed, don't continue */
01754     }
01755   }
01756 }
01757 
/* Optional consistency checking for a connection's active-circuit list:
 * when ACTIVE_CIRCUITS_PARANOIA is defined,
 * assert_active_circuits_ok_paranoid(conn) expands to
 * assert_active_circuits_ok(conn); otherwise it expands to nothing. */
01758 #ifdef ACTIVE_CIRCUITS_PARANOIA
01759 #define assert_active_circuits_ok_paranoid(conn) \
01760      assert_active_circuits_ok(conn)
01761 #else
01762 #define assert_active_circuits_ok_paranoid(conn)
01763 #endif
01764 
/* Running count of packed cells currently handed out: incremented by
 * packed_cell_alloc() and decremented by packed_cell_free_unchecked(). */
01766 static int total_cells_allocated = 0;
01767 
/* Memory pool for packed_cell_t objects.  Created by init_cell_pool() and
 * destroyed by free_cell_pool(). */
01769 static mp_pool_t *cell_pool = NULL;
01770 
/* Memory pool for insertion_time_elem_t objects.  Created lazily by
 * cell_queue_append_packed_copy() the first time cell statistics are
 * recorded, and destroyed by free_cell_pool(). */
01773 static mp_pool_t *it_pool = NULL;
01774 
01776 void
01777 init_cell_pool(void)
01778 {
01779   tor_assert(!cell_pool);
01780   cell_pool = mp_pool_new(sizeof(packed_cell_t), 128*1024);
01781 }
01782 
01785 void
01786 free_cell_pool(void)
01787 {
01788   /* Maybe we haven't called init_cell_pool yet; need to check for it. */
01789   if (cell_pool) {
01790     mp_pool_destroy(cell_pool);
01791     cell_pool = NULL;
01792   }
01793   if (it_pool) {
01794     mp_pool_destroy(it_pool);
01795     it_pool = NULL;
01796   }
01797 }
01798 
01800 void
01801 clean_cell_pool(void)
01802 {
01803   tor_assert(cell_pool);
01804   mp_pool_clean(cell_pool, 0, 1);
01805 }
01806 
01808 static INLINE void
01809 packed_cell_free_unchecked(packed_cell_t *cell)
01810 {
01811   --total_cells_allocated;
01812   mp_pool_release(cell);
01813 }
01814 
01816 static INLINE packed_cell_t *
01817 packed_cell_alloc(void)
01818 {
01819   ++total_cells_allocated;
01820   return mp_pool_get(cell_pool);
01821 }
01822 
01825 void
01826 dump_cell_pool_usage(int severity)
01827 {
01828   circuit_t *c;
01829   int n_circs = 0;
01830   int n_cells = 0;
01831   for (c = _circuit_get_global_list(); c; c = c->next) {
01832     n_cells += c->n_conn_cells.n;
01833     if (!CIRCUIT_IS_ORIGIN(c))
01834       n_cells += TO_OR_CIRCUIT(c)->p_conn_cells.n;
01835     ++n_circs;
01836   }
01837   log(severity, LD_MM, "%d cells allocated on %d circuits. %d cells leaked.",
01838       n_cells, n_circs, total_cells_allocated - n_cells);
01839   mp_pool_log_status(cell_pool, severity);
01840 }
01841 
01843 static INLINE packed_cell_t *
01844 packed_cell_copy(const cell_t *cell)
01845 {
01846   packed_cell_t *c = packed_cell_alloc();
01847   cell_pack(c, cell);
01848   c->next = NULL;
01849   return c;
01850 }
01851 
01853 void
01854 cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
01855 {
01856   if (queue->tail) {
01857     tor_assert(!queue->tail->next);
01858     queue->tail->next = cell;
01859   } else {
01860     queue->head = cell;
01861   }
01862   queue->tail = cell;
01863   cell->next = NULL;
01864   ++queue->n;
01865 }
01866 
01868 void
01869 cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell)
01870 {
01871   packed_cell_t *copy = packed_cell_copy(cell);
01872   /* Remember the time when this cell was put in the queue. */
01873   if (get_options()->CellStatistics) {
01874     struct timeval now;
01875     uint32_t added;
01876     insertion_time_queue_t *it_queue = queue->insertion_times;
01877     if (!it_pool)
01878       it_pool = mp_pool_new(sizeof(insertion_time_elem_t), 1024);
01879     tor_gettimeofday_cached(&now);
01880 #define SECONDS_IN_A_DAY 86400L
01881     added = (uint32_t)(((now.tv_sec % SECONDS_IN_A_DAY) * 100L)
01882             + ((uint32_t)now.tv_usec / (uint32_t)10000L));
01883     if (!it_queue) {
01884       it_queue = tor_malloc_zero(sizeof(insertion_time_queue_t));
01885       queue->insertion_times = it_queue;
01886     }
01887     if (it_queue->last && it_queue->last->insertion_time == added) {
01888       it_queue->last->counter++;
01889     } else {
01890       insertion_time_elem_t *elem = mp_pool_get(it_pool);
01891       elem->next = NULL;
01892       elem->insertion_time = added;
01893       elem->counter = 1;
01894       if (it_queue->last) {
01895         it_queue->last->next = elem;
01896         it_queue->last = elem;
01897       } else {
01898         it_queue->first = it_queue->last = elem;
01899       }
01900     }
01901   }
01902   cell_queue_append(queue, copy);
01903 }
01904 
01906 void
01907 cell_queue_clear(cell_queue_t *queue)
01908 {
01909   packed_cell_t *cell, *next;
01910   cell = queue->head;
01911   while (cell) {
01912     next = cell->next;
01913     packed_cell_free_unchecked(cell);
01914     cell = next;
01915   }
01916   queue->head = queue->tail = NULL;
01917   queue->n = 0;
01918   if (queue->insertion_times) {
01919     while (queue->insertion_times->first) {
01920       insertion_time_elem_t *elem = queue->insertion_times->first;
01921       queue->insertion_times->first = elem->next;
01922       mp_pool_release(elem);
01923     }
01924     tor_free(queue->insertion_times);
01925   }
01926 }
01927 
01930 static INLINE packed_cell_t *
01931 cell_queue_pop(cell_queue_t *queue)
01932 {
01933   packed_cell_t *cell = queue->head;
01934   if (!cell)
01935     return NULL;
01936   queue->head = cell->next;
01937   if (cell == queue->tail) {
01938     tor_assert(!queue->head);
01939     queue->tail = NULL;
01940   }
01941   --queue->n;
01942   return cell;
01943 }
01944 
01947 static INLINE circuit_t **
01948 next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
01949 {
01950   tor_assert(circ);
01951   tor_assert(conn);
01952   if (conn == circ->n_conn) {
01953     return &circ->next_active_on_n_conn;
01954   } else {
01955     or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
01956     tor_assert(conn == orcirc->p_conn);
01957     return &orcirc->next_active_on_p_conn;
01958   }
01959 }
01960 
01963 static INLINE circuit_t **
01964 prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
01965 {
01966   tor_assert(circ);
01967   tor_assert(conn);
01968   if (conn == circ->n_conn) {
01969     return &circ->prev_active_on_n_conn;
01970   } else {
01971     or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
01972     tor_assert(conn == orcirc->p_conn);
01973     return &orcirc->prev_active_on_p_conn;
01974   }
01975 }
01976 
01978 static int
01979 compare_cell_ewma_counts(const void *p1, const void *p2)
01980 {
01981   const cell_ewma_t *e1=p1, *e2=p2;
01982   if (e1->cell_count < e2->cell_count)
01983     return -1;
01984   else if (e1->cell_count > e2->cell_count)
01985     return 1;
01986   else
01987     return 0;
01988 }
01989 
01991 static circuit_t *
01992 cell_ewma_to_circuit(cell_ewma_t *ewma)
01993 {
01994   if (ewma->is_for_p_conn) {
01995     /* This is an or_circuit_t's p_cell_ewma. */
01996     or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma);
01997     return TO_CIRCUIT(orcirc);
01998   } else {
01999     /* This is some circuit's n_cell_ewma. */
02000     return SUBTYPE_P(ewma, circuit_t, n_cell_ewma);
02001   }
02002 }
02003 
02004 /* ==== Functions for scaling cell_ewma_t ====
02005 
02006    When choosing which cells to relay first, we favor circuits that have been
02007    quiet recently.  This gives better latency on connections that aren't
02008    pushing lots of data, and makes the network feel more interactive.
02009 
02010    Conceptually, we take an exponentially weighted moving average of the
02011    number of cells a circuit has sent, and allow active circuits (those with
02012    cells to relay) to send cells in ascending order of their exponentially
02013    weighted moving average (EWMA) cell count.  [That is, a cell sent N seconds
02014    ago 'counts' F^N times as much as a cell sent now, for 0<F<1.0, and we
02015    favor the circuit that has sent the fewest cells]
02016 
02017    If 'double' had infinite precision, we could do this simply by counting a
02018    cell sent at startup as having weight 1.0, and a cell sent N seconds later
02019    as having weight F^-N.  This way, we would never need to re-scale
02020    any already-sent cells.
02021 
02022    To prevent double from overflowing, we could count a cell sent now as
02023    having weight 1.0 and a cell sent N seconds ago as having weight F^N.
02024    This, however, would mean we'd need to re-scale *ALL* old circuits every
02025    time we wanted to send a cell.
02026 
02027    So as a compromise, we divide time into 'ticks' (currently, 10-second
02028    increments) and say that a cell sent at the start of a current tick is
02029    worth 1.0, a cell sent N seconds before the start of the current tick is
02030    worth F^N, and a cell sent N seconds after the start of the current tick is
02031    worth F^-N.  This way we don't overflow, and we don't need to constantly
02032    rescale.
02033  */
02034 
/** How long does a tick last, in seconds? */
#define EWMA_TICK_LEN 10

/** Half-life used when neither the configuration nor the consensus
 * supplies one.  cell_ewma_set_scale_factor() treats any value at or
 * below EPSILON as "EWMA disabled". */
#define EWMA_DEFAULT_HALFLIFE 0.0

/** Split the time in <b>now</b> into a tick number and a position within
 * that tick.  Return the tick number (seconds divided by EWMA_TICK_LEN),
 * and store in *<b>remainder_out</b> the fraction of the tick that has
 * already elapsed, including microseconds. */
static unsigned
cell_ewma_tick_from_timeval(const struct timeval *now,
                            double *remainder_out)
{
  /* Seconds (with fractional part) since the current tick began. */
  const double secs_into_tick = (now->tv_sec % EWMA_TICK_LEN) +
    ((double)(now->tv_usec)) / 1.0e6;

  *remainder_out = secs_into_tick / EWMA_TICK_LEN;
  return (unsigned) (now->tv_sec / EWMA_TICK_LEN);
}
02060 
02062 unsigned
02063 cell_ewma_get_tick(void)
02064 {
02065   return ((unsigned)approx_time() / EWMA_TICK_LEN);
02066 }
02067 
/* Per-tick scale factor applied to EWMA cell counts.  Recomputed by
 * cell_ewma_set_scale_factor(); reset to 0.1 whenever the algorithm is
 * disabled. */
02072 static double ewma_scale_factor = 0.1;
02073 /* Nonzero iff the cell EWMA algorithm is enabled; set by
 * cell_ewma_set_scale_factor() and read when choosing which circuit to
 * flush. */
02074 static int ewma_enabled = 0;
02075 
02076 /* Tolerance used when comparing a half-life against zero: half-lives at or
 * below EPSILON disable the EWMA algorithm. */
02077 #define EPSILON 0.00001
02078 /* The natural logarithm of 0.5; used to turn a half-life into a per-tick
 * scale factor. */
02079 #define LOG_ONEHALF -0.69314718055994529
02080 
02082 void
02083 cell_ewma_set_scale_factor(const or_options_t *options,
02084                            const networkstatus_t *consensus)
02085 {
02086   int32_t halflife_ms;
02087   double halflife;
02088   const char *source;
02089   if (options && options->CircuitPriorityHalflife >= -EPSILON) {
02090     halflife = options->CircuitPriorityHalflife;
02091     source = "CircuitPriorityHalflife in configuration";
02092   } else if (consensus && (halflife_ms = networkstatus_get_param(
02093                  consensus, "CircuitPriorityHalflifeMsec",
02094                  -1, -1, INT32_MAX)) >= 0) {
02095     halflife = ((double)halflife_ms)/1000.0;
02096     source = "CircuitPriorityHalflifeMsec in consensus";
02097   } else {
02098     halflife = EWMA_DEFAULT_HALFLIFE;
02099     source = "Default value";
02100   }
02101 
02102   if (halflife <= EPSILON) {
02103     /* The cell EWMA algorithm is disabled. */
02104     ewma_scale_factor = 0.1;
02105     ewma_enabled = 0;
02106     log_info(LD_OR,
02107              "Disabled cell_ewma algorithm because of value in %s",
02108              source);
02109   } else {
02110     /* convert halflife into halflife-per-tick. */
02111     halflife /= EWMA_TICK_LEN;
02112     /* compute per-tick scale factor. */
02113     ewma_scale_factor = exp( LOG_ONEHALF / halflife );
02114     ewma_enabled = 1;
02115     log_info(LD_OR,
02116              "Enabled cell_ewma algorithm because of value in %s; "
02117              "scale factor is %f per %d seconds",
02118              source, ewma_scale_factor, EWMA_TICK_LEN);
02119   }
02120 }
02121 
02124 static INLINE double
02125 get_scale_factor(unsigned from_tick, unsigned to_tick)
02126 {
02127   /* This math can wrap around, but that's okay: unsigned overflow is
02128      well-defined */
02129   int diff = (int)(to_tick - from_tick);
02130   return pow(ewma_scale_factor, diff);
02131 }
02132 
02135 static void
02136 scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick)
02137 {
02138   double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick);
02139   ewma->cell_count *= factor;
02140   ewma->last_adjusted_tick = cur_tick;
02141 }
02142 
02145 static void
02146 scale_active_circuits(or_connection_t *conn, unsigned cur_tick)
02147 {
02148 
02149   double factor = get_scale_factor(
02150               conn->active_circuit_pqueue_last_recalibrated,
02151               cur_tick);
02154   SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, {
02155       tor_assert(e->last_adjusted_tick ==
02156                  conn->active_circuit_pqueue_last_recalibrated);
02157       e->cell_count *= factor;
02158       e->last_adjusted_tick = cur_tick;
02159   });
02160   conn->active_circuit_pqueue_last_recalibrated = cur_tick;
02161 }
02162 
02165 static void
02166 add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma)
02167 {
02168   tor_assert(ewma->heap_index == -1);
02169   scale_single_cell_ewma(ewma,
02170                          conn->active_circuit_pqueue_last_recalibrated);
02171 
02172   smartlist_pqueue_add(conn->active_circuit_pqueue,
02173                        compare_cell_ewma_counts,
02174                        STRUCT_OFFSET(cell_ewma_t, heap_index),
02175                        ewma);
02176 }
02177 
02179 static void
02180 remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma)
02181 {
02182   tor_assert(ewma->heap_index != -1);
02183   smartlist_pqueue_remove(conn->active_circuit_pqueue,
02184                           compare_cell_ewma_counts,
02185                           STRUCT_OFFSET(cell_ewma_t, heap_index),
02186                           ewma);
02187 }
02188 
02191 static cell_ewma_t *
02192 pop_first_cell_ewma_from_conn(or_connection_t *conn)
02193 {
02194   return smartlist_pqueue_pop(conn->active_circuit_pqueue,
02195                               compare_cell_ewma_counts,
02196                               STRUCT_OFFSET(cell_ewma_t, heap_index));
02197 }
02198 
02201 void
02202 make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
02203 {
02204   circuit_t **nextp = next_circ_on_conn_p(circ, conn);
02205   circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
02206 
02207   if (*nextp && *prevp) {
02208     /* Already active. */
02209     return;
02210   }
02211 
02212   assert_active_circuits_ok_paranoid(conn);
02213 
02214   if (! conn->active_circuits) {
02215     conn->active_circuits = circ;
02216     *prevp = *nextp = circ;
02217   } else {
02218     circuit_t *head = conn->active_circuits;
02219     circuit_t *old_tail = *prev_circ_on_conn_p(head, conn);
02220     *next_circ_on_conn_p(old_tail, conn) = circ;
02221     *nextp = head;
02222     *prev_circ_on_conn_p(head, conn) = circ;
02223     *prevp = old_tail;
02224   }
02225 
02226   if (circ->n_conn == conn) {
02227     add_cell_ewma_to_conn(conn, &circ->n_cell_ewma);
02228   } else {
02229     or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
02230     tor_assert(conn == orcirc->p_conn);
02231     add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma);
02232   }
02233 
02234   assert_active_circuits_ok_paranoid(conn);
02235 }
02236 
02239 void
02240 make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
02241 {
02242   circuit_t **nextp = next_circ_on_conn_p(circ, conn);
02243   circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
02244   circuit_t *next = *nextp, *prev = *prevp;
02245 
02246   if (!next && !prev) {
02247     /* Already inactive. */
02248     return;
02249   }
02250 
02251   assert_active_circuits_ok_paranoid(conn);
02252 
02253   tor_assert(next && prev);
02254   tor_assert(*prev_circ_on_conn_p(next, conn) == circ);
02255   tor_assert(*next_circ_on_conn_p(prev, conn) == circ);
02256 
02257   if (next == circ) {
02258     conn->active_circuits = NULL;
02259   } else {
02260     *prev_circ_on_conn_p(next, conn) = prev;
02261     *next_circ_on_conn_p(prev, conn) = next;
02262     if (conn->active_circuits == circ)
02263       conn->active_circuits = next;
02264   }
02265   *prevp = *nextp = NULL;
02266 
02267   if (circ->n_conn == conn) {
02268     remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma);
02269   } else {
02270     or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
02271     tor_assert(conn == orcirc->p_conn);
02272     remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma);
02273   }
02274 
02275   assert_active_circuits_ok_paranoid(conn);
02276 }
02277 
02280 void
02281 connection_or_unlink_all_active_circs(or_connection_t *orconn)
02282 {
02283   circuit_t *head = orconn->active_circuits;
02284   circuit_t *cur = head;
02285   if (! head)
02286     return;
02287   do {
02288     circuit_t *next = *next_circ_on_conn_p(cur, orconn);
02289     *prev_circ_on_conn_p(cur, orconn) = NULL;
02290     *next_circ_on_conn_p(cur, orconn) = NULL;
02291     cur = next;
02292   } while (cur != head);
02293   orconn->active_circuits = NULL;
02294 
02295   SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e,
02296                     e->heap_index = -1);
02297   smartlist_clear(orconn->active_circuit_pqueue);
02298 }
02299 
02309 static int
02310 set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
02311                             int block, streamid_t stream_id)
02312 {
02313   edge_connection_t *edge = NULL;
02314   int n = 0;
02315   if (circ->n_conn == orconn) {
02316     circ->streams_blocked_on_n_conn = block;
02317     if (CIRCUIT_IS_ORIGIN(circ))
02318       edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
02319   } else {
02320     circ->streams_blocked_on_p_conn = block;
02321     tor_assert(!CIRCUIT_IS_ORIGIN(circ));
02322     edge = TO_OR_CIRCUIT(circ)->n_streams;
02323   }
02324 
02325   for (; edge; edge = edge->next_stream) {
02326     connection_t *conn = TO_CONN(edge);
02327     if (stream_id && edge->stream_id != stream_id)
02328       continue;
02329 
02330     if (edge->edge_blocked_on_circ != block) {
02331       ++n;
02332       edge->edge_blocked_on_circ = block;
02333     }
02334 
02335     if (!conn->read_event && !HAS_BUFFEREVENT(conn)) {
02336       /* This connection is a placeholder for something; probably a DNS
02337        * request.  It can't actually stop or start reading.*/
02338       continue;
02339     }
02340 
02341     if (block) {
02342       if (connection_is_reading(conn))
02343         connection_stop_reading(conn);
02344     } else {
02345       /* Is this right? */
02346       if (!connection_is_reading(conn))
02347         connection_start_reading(conn);
02348     }
02349   }
02350 
02351   return n;
02352 }
02353 
/* Pull up to max cells off the cell queue of one active circuit on conn
 * and write them to conn's outbuf.  Return the number of cells written.
 *
 * The circuit used is conn->active_circuits, unless the EWMA algorithm is
 * enabled, in which case it is the circuit owning the first entry of
 * conn->active_circuit_pqueue.  Returns 0 immediately if conn has no
 * active circuits.
 *
 * After flushing, conn->active_circuits is advanced to the next circuit in
 * the ring, the circuit's streams are unblocked if they were blocked and
 * its queue is at or below CELL_QUEUE_LOWWATER_SIZE, and the circuit is made
 * inactive if its queue is empty.  If at least one cell was written,
 * conn->timestamp_last_added_nonpadding is set to now.
 */
02358 int
02359 connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
02360                                               time_t now)
02361 {
02362   int n_flushed;
02363   cell_queue_t *queue;
02364   circuit_t *circ;
02365   int streams_blocked;
02366 
02367   /* The current (hi-res) time */
02368   struct timeval now_hires;
02369 
02370   /* The EWMA cell counter for the circuit we're flushing. */
02371   cell_ewma_t *cell_ewma = NULL;
02372   double ewma_increment = -1;
02373 
02374   circ = conn->active_circuits;
02375   if (!circ) return 0;
02376   assert_active_circuits_ok_paranoid(conn);
02377 
02378   /* See if we're doing the ewma circuit selection algorithm. */
02379   if (ewma_enabled) {
02380     unsigned tick;
02381     double fractional_tick;
02382     tor_gettimeofday_cached(&now_hires);
02383     tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
02384 
    /* Bring every queued counter up to the current tick before comparing. */
02385     if (tick != conn->active_circuit_pqueue_last_recalibrated) {
02386       scale_active_circuits(conn, tick);
02387     }
02388 
    /* Weight of one cell sent now, relative to a cell sent at the start of
     * the current tick. */
02389     ewma_increment = pow(ewma_scale_factor, -fractional_tick);
02390 
    /* Override the round-robin choice with the head of the priority
     * queue. */
02391     cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0);
02392     circ = cell_ewma_to_circuit(cell_ewma);
02393   }
02394 
  /* Pick the queue and blocked flag for the side of circ that faces conn. */
02395   if (circ->n_conn == conn) {
02396     queue = &circ->n_conn_cells;
02397     streams_blocked = circ->streams_blocked_on_n_conn;
02398   } else {
02399     queue = &TO_OR_CIRCUIT(circ)->p_conn_cells;
02400     streams_blocked = circ->streams_blocked_on_p_conn;
02401   }
02402   tor_assert(*next_circ_on_conn_p(circ,conn));
02403 
02404   for (n_flushed = 0; n_flushed < max && queue->head; ) {
02405     packed_cell_t *cell = cell_queue_pop(queue);
02406     tor_assert(*next_circ_on_conn_p(circ,conn));
02407 
02408     /* Calculate the exact time that this cell has spent in the queue. */
02409     if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
02410       struct timeval tvnow;
02411       uint32_t flushed;
02412       uint32_t cell_waiting_time;
02413       insertion_time_queue_t *it_queue = queue->insertion_times;
02414       tor_gettimeofday_cached(&tvnow);
      /* Same encoding as the stored insertion times: units of 10 ms since
       * the start of the day. */
02415       flushed = (uint32_t)((tvnow.tv_sec % SECONDS_IN_A_DAY) * 100L +
02416                  (uint32_t)tvnow.tv_usec / (uint32_t)10000L);
02417       if (!it_queue || !it_queue->first) {
02418         log_info(LD_GENERAL, "Cannot determine insertion time of cell. "
02419                              "Looks like the CellStatistics option was "
02420                              "recently enabled.");
02421       } else {
02422         or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
02423         insertion_time_elem_t *elem = it_queue->first;
        /* Difference in milliseconds.  A full day is added before taking
         * the modulus so that a wrap past the end of the day still yields
         * a non-negative result. */
02424         cell_waiting_time =
02425             (uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L -
02426                         elem->insertion_time * 10L) %
02427                        (SECONDS_IN_A_DAY * 1000L));
02428 #undef SECONDS_IN_A_DAY
        /* One element may stand for several cells queued in the same 10 ms
         * slot; release it once all of them have been flushed. */
02429         elem->counter--;
02430         if (elem->counter < 1) {
02431           it_queue->first = elem->next;
02432           if (elem == it_queue->last)
02433             it_queue->last = NULL;
02434           mp_pool_release(elem);
02435         }
02436         orcirc->total_cell_waiting_time += cell_waiting_time;
02437         orcirc->processed_cells++;
02438       }
02439     }
02440 
02441     /* If we just flushed our queue and this circuit is used for a
02442      * tunneled directory request, possibly advance its state. */
02443     if (queue->n == 0 && TO_CONN(conn)->dirreq_id)
02444       geoip_change_dirreq_state(TO_CONN(conn)->dirreq_id,
02445                                 DIRREQ_TUNNELED,
02446                                 DIRREQ_CIRC_QUEUE_FLUSHED);
02447 
02448     connection_write_to_buf(cell->body, CELL_NETWORK_SIZE, TO_CONN(conn));
02449 
02450     packed_cell_free_unchecked(cell);
02451     ++n_flushed;
02452     if (cell_ewma) {
02453       cell_ewma_t *tmp;
02454       cell_ewma->cell_count += ewma_increment;
02455       /* We pop and re-add the cell_ewma_t here, not above, since we need to
02456        * re-add it immediately to keep the priority queue consistent with
02457        * the linked-list implementation */
02458       tmp = pop_first_cell_ewma_from_conn(conn);
02459       tor_assert(tmp == cell_ewma);
02460       add_cell_ewma_to_conn(conn, cell_ewma);
02461     }
02462     if (circ != conn->active_circuits) {
02463       /* If this happens, the current circuit just got made inactive by
02464        * a call in connection_write_to_buf().  That's nothing to worry about:
02465        * make_circuit_inactive_on_conn() already advanced conn->active_circuits
02466        * for us.
02467        */
02468       assert_active_circuits_ok_paranoid(conn);
02469       goto done;
02470     }
02471   }
02472   tor_assert(*next_circ_on_conn_p(circ,conn));
02473   assert_active_circuits_ok_paranoid(conn);
  /* Move the list head on, so the next call starts from the following
   * circuit in the ring. */
02474   conn->active_circuits = *next_circ_on_conn_p(circ, conn);
02475 
02476   /* Is the cell queue low enough to unblock all the streams that are waiting
02477    * to write to this circuit? */
02478   if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
02479     set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
02480 
02481   /* Did we just run out of cells on this circuit's queue? */
02482   if (queue->n == 0) {
02483     log_debug(LD_GENERAL, "Made a circuit inactive.");
02484     make_circuit_inactive_on_conn(circ, conn);
02485   }
02486  done:
02487   if (n_flushed)
02488     conn->timestamp_last_added_nonpadding = now;
02489   return n_flushed;
02490 }
02491 
02494 void
02495 append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
02496                              cell_t *cell, cell_direction_t direction,
02497                              streamid_t fromstream)
02498 {
02499   cell_queue_t *queue;
02500   int streams_blocked;
02501   if (circ->marked_for_close)
02502     return;
02503 
02504   if (direction == CELL_DIRECTION_OUT) {
02505     queue = &circ->n_conn_cells;
02506     streams_blocked = circ->streams_blocked_on_n_conn;
02507   } else {
02508     or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
02509     queue = &orcirc->p_conn_cells;
02510     streams_blocked = circ->streams_blocked_on_p_conn;
02511   }
02512 
02513   cell_queue_append_packed_copy(queue, cell);
02514 
02515   /* If we have too many cells on the circuit, we should stop reading from
02516    * the edge streams for a while. */
02517   if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
02518     set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
02519 
02520   if (streams_blocked && fromstream) {
02521     /* This edge connection is apparently not blocked; block it. */
02522     set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
02523   }
02524 
02525   if (queue->n == 1) {
02526     /* This was the first cell added to the queue.  We need to make this
02527      * circuit active. */
02528     log_debug(LD_GENERAL, "Made a circuit active.");
02529     make_circuit_active_on_conn(circ, orconn);
02530   }
02531 
02532   if (! connection_get_outbuf_len(TO_CONN(orconn))) {
02533     /* There is no data at all waiting to be sent on the outbuf.  Add a
02534      * cell, so that we can notice when it gets flushed, flushed_some can
02535      * get called, and we can start putting more data onto the buffer then.
02536      */
02537     log_debug(LD_GENERAL, "Primed a buffer.");
02538     connection_or_flush_from_first_active_circuit(orconn, 1, approx_time());
02539   }
02540 }
02541 
02549 int
02550 append_address_to_payload(uint8_t *payload_out, const tor_addr_t *addr)
02551 {
02552   uint32_t a;
02553   switch (tor_addr_family(addr)) {
02554   case AF_INET:
02555     payload_out[0] = RESOLVED_TYPE_IPV4;
02556     payload_out[1] = 4;
02557     a = tor_addr_to_ipv4n(addr);
02558     memcpy(payload_out+2, &a, 4);
02559     return 6;
02560   case AF_INET6:
02561     payload_out[0] = RESOLVED_TYPE_IPV6;
02562     payload_out[1] = 16;
02563     memcpy(payload_out+2, tor_addr_to_in6_addr8(addr), 16);
02564     return 18;
02565   case AF_UNSPEC:
02566   default:
02567     return -1;
02568   }
02569 }
02570 
02575 const uint8_t *
02576 decode_address_from_payload(tor_addr_t *addr_out, const uint8_t *payload,
02577                             int payload_len)
02578 {
02579   if (payload_len < 2)
02580     return NULL;
02581   if (payload_len < 2+payload[1])
02582     return NULL;
02583 
02584   switch (payload[0]) {
02585   case RESOLVED_TYPE_IPV4:
02586     if (payload[1] != 4)
02587       return NULL;
02588     tor_addr_from_ipv4n(addr_out, get_uint32(payload+2));
02589     break;
02590   case RESOLVED_TYPE_IPV6:
02591     if (payload[1] != 16)
02592       return NULL;
02593     tor_addr_from_ipv6_bytes(addr_out, (char*)(payload+2));
02594     break;
02595   default:
02596     tor_addr_make_unspec(addr_out);
02597     break;
02598   }
02599   return payload + 2 + payload[1];
02600 }
02601 
02603 void
02604 circuit_clear_cell_queue(circuit_t *circ, or_connection_t *orconn)
02605 {
02606   cell_queue_t *queue;
02607   if (circ->n_conn == orconn) {
02608     queue = &circ->n_conn_cells;
02609   } else {
02610     or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
02611     tor_assert(orcirc->p_conn == orconn);
02612     queue = &orcirc->p_conn_cells;
02613   }
02614 
02615   if (queue->n)
02616     make_circuit_inactive_on_conn(circ,orconn);
02617 
02618   cell_queue_clear(queue);
02619 }
02620 
02623 void
02624 assert_active_circuits_ok(or_connection_t *orconn)
02625 {
02626   circuit_t *head = orconn->active_circuits;
02627   circuit_t *cur = head;
02628   int n = 0;
02629   if (! head)
02630     return;
02631   do {
02632     circuit_t *next = *next_circ_on_conn_p(cur, orconn);
02633     circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
02634     cell_ewma_t *ewma;
02635     tor_assert(next);
02636     tor_assert(prev);
02637     tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
02638     tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
02639     if (orconn == cur->n_conn) {
02640       ewma = &cur->n_cell_ewma;
02641       tor_assert(!ewma->is_for_p_conn);
02642     } else {
02643       ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma;
02644       tor_assert(ewma->is_for_p_conn);
02645     }
02646     tor_assert(ewma->heap_index != -1);
02647     tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue,
02648                                      ewma->heap_index));
02649     n++;
02650     cur = next;
02651   } while (cur != head);
02652 
02653   tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
02654 }
02655 
02659 static int
02660 circuit_queue_streams_are_blocked(circuit_t *circ)
02661 {
02662   if (CIRCUIT_IS_ORIGIN(circ)) {
02663     return circ->streams_blocked_on_n_conn;
02664   } else {
02665     return circ->streams_blocked_on_p_conn;
02666   }
02667 }
02668