Changeset 497


Ignore:
Timestamp:
03/26/09 17:36:44 (3 years ago)
Author:
eagle
Message:

SOCKS connector completely rewritten. Works now without dynamic thread pool. This is more memory efficient.

Location:
trunk
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • trunk/ChangeLog

    r491 r497  
    11* version 0.1.12 
     2 - socks connector completely rewritten. Works now 
     3   without dynamic thread pool. This is more memory efficient. 
    24 - Windows branch merges into trunk 
    35 - syslogging implemented 
  • trunk/configure

    r496 r497  
    11#! /bin/sh 
    22# Guess values for system-dependent variables and create Makefiles. 
    3 # Generated by GNU Autoconf 2.63 for onioncat 0.1.12.r493. 
     3# Generated by GNU Autoconf 2.63 for onioncat 0.1.12.r496. 
    44# 
    55# Report bugs to <rahra@cypherpunk.at>. 
     
    597597PACKAGE_NAME='onioncat' 
    598598PACKAGE_TARNAME='onioncat' 
    599 PACKAGE_VERSION='0.1.12.r493' 
    600 PACKAGE_STRING='onioncat 0.1.12.r493' 
     599PACKAGE_VERSION='0.1.12.r496' 
     600PACKAGE_STRING='onioncat 0.1.12.r496' 
    601601PACKAGE_BUGREPORT='rahra@cypherpunk.at' 
    602602 
     
    12501250  # This message is too long to be a string in the A/UX 3.1 sh. 
    12511251  cat <<_ACEOF 
    1252 \`configure' configures onioncat 0.1.12.r493 to adapt to many kinds of systems. 
     1252\`configure' configures onioncat 0.1.12.r496 to adapt to many kinds of systems. 
    12531253 
    12541254Usage: $0 [OPTION]... [VAR=VALUE]... 
     
    13161316if test -n "$ac_init_help"; then 
    13171317  case $ac_init_help in 
    1318      short | recursive ) echo "Configuration of onioncat 0.1.12.r493:";; 
     1318     short | recursive ) echo "Configuration of onioncat 0.1.12.r496:";; 
    13191319   esac 
    13201320  cat <<\_ACEOF 
     
    14081408if $ac_init_version; then 
    14091409  cat <<\_ACEOF 
    1410 onioncat configure 0.1.12.r493 
     1410onioncat configure 0.1.12.r496 
    14111411generated by GNU Autoconf 2.63 
    14121412 
     
    14221422running configure, to aid debugging if configure makes a mistake. 
    14231423 
    1424 It was created by onioncat $as_me 0.1.12.r493, which was 
     1424It was created by onioncat $as_me 0.1.12.r496, which was 
    14251425generated by GNU Autoconf 2.63.  Invocation command line was 
    14261426 
     
    21382138# Define the identity of the package. 
    21392139 PACKAGE='onioncat' 
    2140  VERSION='0.1.12.r493' 
     2140 VERSION='0.1.12.r496' 
    21412141 
    21422142 
     
    22852285 
    22862286cat >>confdefs.h <<\_ACEOF 
    2287 #define SVN_REVISION "493" 
     2287#define SVN_REVISION "496" 
    22882288_ACEOF 
    22892289 
     
    48044804# values after options handling. 
    48054805ac_log=" 
    4806 This file was extended by onioncat $as_me 0.1.12.r493, which was 
     4806This file was extended by onioncat $as_me 0.1.12.r496, which was 
    48074807generated by GNU Autoconf 2.63.  Invocation command line was 
    48084808 
     
    48674867cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1 
    48684868ac_cs_version="\\ 
    4869 onioncat config.status 0.1.12.r493 
     4869onioncat config.status 0.1.12.r496 
    48704870configured by $0, generated by GNU Autoconf 2.63, 
    48714871  with options \\"`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`\\" 
  • trunk/glob_id.txt

    r313 r497  
    88=> IPv6: FD87:D87E:EB43::/48 
    99 
     10/*  
     11   DO NOT USE THIS! 
     12   IT IS JUST A PERSONAL REMARK... 
     13generation for I2P 
     14NTP timestamp: 0xcd6cd83f947d805e 
     15EUI-64: 21b:24ff:fece:d108 
     16=> key: cd6cd83f947d805e21b24fffeced108 
     17=> SHA1: 366168b7aa3fd7326df894b907991b60db4dddb5 
     18=> global ID: 60db4dddb5 
     19=> IPv6: FD60:DB4D:DDB5::/48 
     20*/ 
     21 
  • trunk/src/ocat.c

    r486 r497  
    468468   } 
    469469 
    470    // create socks connector thread 
    471    run_ocat_thread("connector", socks_connector, NULL); 
     470   // create socks connector thread and communication queue 
     471   if (pipe(CNF(socksfd)) == -1) 
     472      log_msg(LOG_EMERG, "couldn't create socks connector pipe: \"%s\"", strerror(errno)), exit(1); 
     473   run_ocat_thread("connector", socks_connector_sel, NULL); 
     474 
    472475#ifdef PACKET_QUEUE 
    473476   // start packet dequeuer 
  • trunk/src/ocat.h

    r492 r497  
    187187#define THREAD_STACK_SIZE 262144 
    188188 
     189#define SOCKS_NEW 0 
    189190#define SOCKS_CONNECTING 1 
     191#define SOCKS_4AREQ_SENT 2 
     192#define SOCKS_4ARESPONSE 3 
     193#define SOCKS_DELETE 127 
     194 
    190195#define SOCKS_MAX_RETRY 3 
    191196 
     
    217222//! RECONN_ATTEMPTS must not be faster than MIN_RECONNECT_TIME 
    218223#define MIN_RECONNECT_TIME 30 
     224 
     225#define MFD_SET(f,s,m) {FD_SET(f, s); m = f > m ? f : m;} 
    219226 
    220227//! copy an IPv6 address from b to a 
     
    297304   int *ctrl_listen_fd; 
    298305   int ctrl_listen_cnt; 
     306   //! communication pipe for socks "selected" connector 
     307   int socksfd[2]; 
    299308}; 
    300309 
     
    369378   int state; 
    370379   int perm; 
     380   int fd; 
     381   time_t restart_time; 
     382   time_t connect_time; 
     383   int retry; 
    371384} SocksQueue_t; 
    372385 
     
    507520#endif 
    508521void *socket_acceptor(void *); 
    509 void *socks_connector(void *); 
    510522void *socket_cleaner(void *); 
    511523void *ocat_controller(void *); 
     
    578590void print_socks_queue(FILE *); 
    579591void sig_socks_connector(void); 
     592void *socks_connector_sel(void *); 
    580593 
    581594/* ocatlibe.c */ 
  • trunk/src/ocatctrl.c

    r433 r497  
    3535void *ctrl_handler(void *p) 
    3636{ 
    37    int fd, c; 
     37   int fd, c, i; 
    3838   FILE *ff, *fo; 
    3939   char buf[FRAME_SIZE], addrstr[INET6_ADDRSTRLEN], onionstr[ONION_NAME_SIZE], timestr[32], *s, *tokbuf; 
     
    4242   OcatPeer_t *peer; 
    4343   struct in6_addr in6; 
     44   int pfd[2]; 
    4445 
    4546   detach_thread(); 
     47 
     48   if (pipe(pfd) == -1) 
     49      log_msg(LOG_EMERG, "couldn't create pipe: \"%s\"", strerror(errno)), exit(1); 
    4650 
    4751   fd = (int) p; 
     
    240244      else if (!strcmp(buf, "queue")) 
    241245      { 
    242          print_socks_queue(ff); 
     246         print_socks_queue((FILE*) pfd[1]); 
     247         for (;;) 
     248         { 
     249            read(pfd[0], buf, 1); 
     250            if (!buf[0]) 
     251               break; 
     252            fprintf(ff, "%c", buf[0]); 
     253         } 
    243254      } 
    244255      else if (!strcmp(buf, "setup")) 
  • trunk/src/ocatsetup.c

    r486 r497  
    9898   2 
    9999#endif 
     100   , 
     101   // socksfd 
     102   {-1, -1} 
    100103}; 
    101104 
  • trunk/src/ocatsocks.c

    r435 r497  
    3030// SOCKS connector queue vars 
    3131static SocksQueue_t *socks_queue_ = NULL; 
    32 static int socks_connect_cnt_ = 0; 
    33 static int socks_thread_cnt_ = 0; 
    34 static pthread_mutex_t socks_queue_mutex_ = PTHREAD_MUTEX_INITIALIZER; 
    35 static pthread_cond_t socks_queue_cond_ = PTHREAD_COND_INITIALIZER; 
    36  
    37  
    38 int socks_srv_con(void) 
    39 { 
    40    int fd, t, maxfd, so_err; 
    41    socklen_t err_len; 
    42    char addr[INET6_ADDRSTRLEN]; 
    43    fd_set cset; 
    44    struct timeval tv; 
    45  
    46    if ((fd = socket(CNF(socks_dst)->sin_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0)) < 0) 
    47       return E_SOCKS_SOCK; 
    48    set_nonblock(fd); 
    49  
    50    t = time(NULL); 
    51    if (connect(fd, (struct sockaddr*) CNF(socks_dst), SOCKADDR_SIZE(CNF(socks_dst))) == -1) 
    52    { 
    53       if (errno != EINPROGRESS) 
    54       { 
    55          log_msg(LOG_ERR, "connect() to SOCKS port %s:%d failed: \"%s\". Sleeping for %d seconds.",  
    56             inet_ntop(CNF(socks_dst)->sin_family,  
    57                CNF(socks_dst)->sin_family == AF_INET ? (char*) &CNF(socks_dst)->sin_addr : (char*) &CNF(socks_dst6)->sin6_addr, addr, sizeof(addr)),  
    58             ntohs(CNF(socks_dst)->sin_port), strerror(errno), TOR_SOCKS_CONN_TIMEOUT); 
    59          oe_close(fd); 
    60          sleep(TOR_SOCKS_CONN_TIMEOUT); 
    61          return E_SOCKS_CONN; 
    62       } 
    63       log_debug("connection in progress"); 
    64    } 
    65    else 
    66    { 
    67       log_debug("connected"); 
    68       return fd; 
    69    } 
    70  
    71    for (;;) 
    72    { 
    73       if (term_req()) 
    74       { 
    75          oe_close(fd); 
    76          return -1; 
    77       } 
    78  
    79       FD_ZERO(&cset); 
    80       FD_SET(fd, &cset); 
    81       maxfd = fd; 
    82  
    83       set_select_timeout(&tv); 
    84       log_debug("selecting (maxfd = %d)", maxfd); 
    85       if ((maxfd = select(maxfd + 1, NULL, &cset, NULL, &tv)) == -1) 
    86       { 
    87          log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno)); 
    88          continue; 
    89       } 
    90       log_debug("select returned %d", maxfd); 
    91  
    92       if (!maxfd) 
    93       { 
    94          log_debug("select timed out, restarting"); 
    95          continue; 
    96       } 
    97  
    98       if (!FD_ISSET(fd, &cset)) 
    99       { 
    100          log_msg(LOG_ERR, "fd %d not in fd_set! Restarting.", fd); 
    101          continue; 
    102       } 
    103  
    104       // test if connect() worked 
    105       log_debug("check socket error"); 
    106       err_len = sizeof(so_err); 
    107       if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_err, &err_len) == -1) 
    108       { 
    109          log_msg(LOG_ERR, "getsockopt failed: \"%s\"", strerror(errno)); 
    110          oe_close(fd); 
    111          return -1; 
    112       } 
    113       if (so_err) 
    114       { 
    115          log_msg(LOG_ERR, "getsockopt returned %d (\"%s\")", so_err, strerror(so_err)); 
    116          oe_close(fd); 
    117          return -1; 
    118       } 
    119       // everything seems to be ok, break loop 
    120       break; 
    121    } // for (;;) 
    122  
    123    log_debug("connected"); 
    124    return fd; 
    125 } 
    126  
    127  
    128 int socks_connect(const SocksQueue_t *sq) 
    129 { 
    130    int fd, t, len; 
    131    char buf[FRAME_SIZE], onion[ONION_NAME_SIZE]; 
     32 
     33#define SOCKS_BUFLEN (sizeof(SocksHdr_t) + ONION_NAME_SIZE + strlen(CNF(usrname)) + 2) 
     34 
     35 
     36int socks_send_request(const SocksQueue_t *sq) 
     37{ 
     38   int len, ret; 
     39   char buf[SOCKS_BUFLEN], onion[ONION_NAME_SIZE]; 
    13240   SocksHdr_t *shdr = (SocksHdr_t*) buf; 
    133    OcatPeer_t *peer; 
    134    int maxfd; 
    135    fd_set rset; 
    136    struct timeval tv; 
    13741 
    13842   ipv6tonion(&sq->addr, onion); 
    13943   strlcat(onion, ".onion", sizeof(onion)); 
    140  
    141    log_msg(LOG_INFO, "trying to connect to \"%s\" [%s]", onion, inet_ntop(AF_INET6, &sq->addr, buf, FRAME_SIZE)); 
    142    t = time(NULL); 
    143    if ((fd = socks_srv_con()) < 0) 
    144       return fd; 
     44   log_msg(LOG_INFO, "trying to connect to \"%s\" [%s]", onion, inet_ntop(AF_INET6, &sq->addr, buf, SOCKS_BUFLEN)); 
    14545 
    14646   log_debug("doing SOCKS4a handshake"); 
    147  
    14847   shdr->ver = 4; 
    14948   shdr->cmd = 1; 
    15049   shdr->port = htons(CNF(ocat_dest_port)); 
    15150   shdr->addr.s_addr = htonl(0x00000001); 
    152    /* 
    153    strlcpy(buf + sizeof(SocksHdr_t), usrname_, strlen(usrname_) + 1); 
    154    strlcpy(buf + sizeof(SocksHdr_t) + strlen(usrname_) + 1, onion, sizeof(onion)); 
    155    */ 
    15651   memcpy(buf + sizeof(SocksHdr_t), CNF(usrname), strlen(CNF(usrname)) + 1); 
    15752   memcpy(buf + sizeof(SocksHdr_t) + strlen(CNF(usrname)) + 1, onion, strlen(onion) + 1); 
    15853   len = sizeof(SocksHdr_t) + strlen(CNF(usrname)) + strlen(onion) + 2; 
    159    if (write(fd, shdr, len) != len) 
    160       // FIXME: there should be some additional error handling 
    161       log_msg(LOG_ERR, "couldn't write %d bytes to SOCKS connection %d", len, fd); 
    162    log_debug("connect request sent"); 
    163  
    164    for (;;) 
    165    { 
    166       if (term_req()) 
    167       { 
    168          oe_close(fd); 
    169          return E_SOCKS_TERMREQ; 
    170       } 
    171  
    172       FD_ZERO(&rset); 
    173       FD_SET(fd, &rset); 
    174       maxfd = fd; 
    175  
    176       set_select_timeout(&tv); 
    177       log_debug("selecting (maxfd = %d)", maxfd); 
    178       if ((maxfd = select(maxfd + 1, &rset, NULL, NULL, &tv)) == -1) 
    179       { 
    180          log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno)); 
    181          continue; 
    182       } 
    183  
    184       log_debug("select returned %d", maxfd); 
    185       if (!maxfd) 
    186       { 
    187          log_debug("select timed out, restarting"); 
    188          continue; 
    189       } 
    190  
    191       if (FD_ISSET(fd, &rset)) 
    192          break; 
    193  
    194       log_msg(LOG_ERR, "fd %d not in fd_set! Restarting.", fd); 
    195       continue; 
    196    } // for (;;) 
    197  
    198    if (read(fd, shdr, sizeof(SocksHdr_t)) < sizeof(SocksHdr_t)) 
    199    { 
    200       log_msg(LOG_ERR | LOG_FCONN, "short read, closing."); 
    201       oe_close(fd); 
    202       return E_SOCKS_REQ; 
    203    } 
     54   if ((ret = write(sq->fd, shdr, len)) == -1) 
     55   { 
     56      log_msg(LOG_ERR, "error writing %d bytes to fd %d: \"%s\"", len, sq->fd, strerror(errno)); 
     57      return -1; 
     58   } 
     59   if (ret < len) 
     60   { 
     61      log_msg(LOG_ERR, "SOCKS request truncated to %d of %d bytes", ret, len); 
     62      return -1; 
     63   } 
     64   log_debug("SOCKS request sent successfully"); 
     65   return 0; 
     66} 
     67 
     68 
     69int socks_rec_response(SocksQueue_t *sq) 
     70{ 
     71   SocksHdr_t shdr; 
     72   int ret, len; 
     73 
     74   len = sizeof(SocksHdr_t); 
     75   if ((ret = read(sq->fd, &shdr, len)) == -1) 
     76   { 
     77      log_msg(LOG_ERR, "reading SOCKS response on fd %d failed: \"%s\"", sq->fd, strerror(errno)); 
     78      return -1; 
     79   } 
     80   if (ret < len) 
     81   { 
     82      log_msg(LOG_ERR, "SOCKS response truncated to %d of %d bytes", ret, len); 
     83      return -1; 
     84   } 
     85 
    20486   log_debug("SOCKS response received"); 
    205  
    206    if (shdr->ver || (shdr->cmd != 90)) 
    207    { 
    208       log_msg(LOG_ERR, "request failed, reason = %d", shdr->cmd); 
    209       oe_close(fd); 
    210       return E_SOCKS_RQFAIL; 
    211    } 
    212    log_msg(LOG_INFO | LOG_FCONN, "connection to %s successfully opened on fd %d", onion, fd); 
    213  
    214    insert_peer(fd, sq, time(NULL) - t); 
     87   if (shdr.ver || (shdr.cmd != 90)) 
     88   { 
     89      log_msg(LOG_ERR, "SOCKS request failed, reason = %d", shdr.cmd); 
     90      return -1; 
     91   } 
     92 
     93   log_msg(LOG_INFO | LOG_FCONN, "SOCKS connection successfully opened on fd %d", sq->fd); 
     94   return 0; 
     95} 
     96 
     97 
     98int socks_activate_peer(SocksQueue_t *sq) 
     99{ 
     100   OcatPeer_t *peer; 
     101 
     102   insert_peer(sq->fd, sq, time(NULL) - sq->connect_time); 
    215103 
    216104   // Send first keepalive immediately 
     
    219107      lock_peer(peer); 
    220108   else 
    221       log_msg(LOG_EMERG, "newly inserted peer not found, fd = %d", fd); 
     109      log_msg(LOG_EMERG, "newly inserted peer not found, fd = %d", sq->fd); 
    222110   unlock_peers(); 
    223111   if (peer) 
     
    227115   } 
    228116 
    229    // return new file descriptor 
    230    return fd; 
     117   return 0; 
     118} 
     119 
     120 
     121void socks_pipe_request(const SocksQueue_t *sq) 
     122{ 
     123   fd_set wset; 
     124   int maxfd; 
     125   int len = sizeof(*sq), ret; 
     126 
     127   FD_ZERO(&wset); 
     128   FD_SET(CNF(socksfd[1]), &wset); 
     129   maxfd = CNF(socksfd[1]); 
     130 
     131   log_debug("selecting until socks request pipe gets ready"); 
     132 
     133      log_debug("selecting (maxfd = %d)", maxfd); 
     134      if ((maxfd = select(maxfd + 1, NULL, &wset, NULL, NULL)) == -1) 
     135      { 
     136         log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno)); 
     137         return; 
     138      } 
     139      log_debug("select returned %d", maxfd); 
     140 
     141 
     142      if (maxfd && FD_ISSET(CNF(socksfd[1]), &wset)) 
     143      { 
     144 
     145   log_debug("writing %d bytes to fd %d", len, CNF(socksfd[1])); 
     146   if ((ret = write(CNF(socksfd[1]), sq, len) == -1)) 
     147   { 
     148      log_msg(LOG_WARNING, "error writing to SOCKS request pipe fd %d: \"%s\"", CNF(socksfd[1]), strerror(errno)); 
     149   } 
     150   else if (ret < len) 
     151   { 
     152      log_msg(LOG_WARNING, "write to SOCKS request pipe fd %d truncated to %d bytes of %d", CNF(socksfd[1]), ret, len); 
     153   } 
     154   else 
     155   { 
     156      log_debug("wrote %d bytes to SOCKS request pipe fd %d", len, CNF(socksfd[1])); 
     157   } 
     158      } 
     159      else 
     160         log_msg(LOG_WARNING, "fd %d not in write set", CNF(socksfd[1])); 
    231161} 
    232162 
     
    234164void sig_socks_connector(void) 
    235165{ 
    236    pthread_cond_signal(&socks_queue_cond_); 
    237 } 
    238  
    239  
     166   SocksQueue_t sq; 
     167 
     168   memset(&sq, 0, sizeof(sq)); 
     169   socks_pipe_request(&sq); 
     170} 
     171 
     172 
     173/*! Add and link a SOCKS request to the SOCKS queue. 
     174 *  @param sq Request structure to add. 
     175 */ 
     176void socks_enqueue(const SocksQueue_t *sq) 
     177{ 
     178   SocksQueue_t *squeue; 
     179 
     180   log_debug("queueing new SOCKS connection request"); 
     181   if (!(squeue = malloc(sizeof(SocksQueue_t)))) 
     182      log_msg(LOG_EMERG, "could not get memory for SocksQueue entry: \"%s\"", strerror(errno)), exit(1); 
     183   memcpy(squeue, sq, sizeof(*squeue)); 
     184 
     185   squeue->next = socks_queue_; 
     186   socks_queue_ = squeue; 
     187} 
     188 
     189 
     190/*! Send a SOCKS request to the request pipe in order to get 
     191 *  added to the SOCKS queue with socks_enqueue() 
     192 *  @param addr IPv6 address to be requested 
     193 *  @param perm 1 if connection should kept opened inifitely after successful request, 0 else. 
     194 */ 
    240195void socks_queue(struct in6_addr addr, int perm) 
    241196{ 
    242    SocksQueue_t *squeue; 
    243  
    244    pthread_mutex_lock(&socks_queue_mutex_); 
     197   SocksQueue_t *squeue, sq; 
     198 
    245199   for (squeue = socks_queue_; squeue; squeue = squeue->next) 
    246200      if (IN6_ARE_ADDR_EQUAL(&squeue->addr, &addr)) 
    247201         break; 
     202 
    248203   if (!squeue) 
    249204   { 
    250205      log_debug("queueing new SOCKS connection request"); 
    251       if (!(squeue = calloc(1, sizeof(SocksQueue_t)))) 
    252          log_msg(LOG_EMERG, "could not get memory for SocksQueue entry: \"%s\"", strerror(errno)), exit(1); 
    253       IN6_ADDR_COPY(&squeue->addr, &addr); 
    254       squeue->perm = perm; 
    255       squeue->next = socks_queue_; 
    256       socks_queue_ = squeue; 
     206      memset(&sq, 0, sizeof(sq)); 
     207      IN6_ADDR_COPY(&sq.addr, &addr); 
     208      sq.perm = perm; 
    257209      log_debug("signalling connector"); 
    258       sig_socks_connector(); 
     210      socks_pipe_request(&sq); 
    259211   } 
    260212   else 
    261213      log_debug("connection already exists, not queueing SOCKS connection"); 
    262    pthread_mutex_unlock(&socks_queue_mutex_); 
    263214} 
    264215 
     
    282233 
    283234 
    284 void *socks_connector(void *p) 
    285 { 
    286    OcatPeer_t *peer; 
     235void print_socks_queue(FILE *f) 
     236{ 
     237   SocksQueue_t sq; 
     238 
     239   memset(&sq, 0, sizeof(sq)); 
     240   sq.next = (SocksQueue_t*) f; 
     241   socks_pipe_request(&sq); 
     242} 
     243 
     244 
     245void socks_output_queue(FILE *f) 
     246{ 
     247   int i; 
     248   char addrstr[INET6_ADDRSTRLEN], onstr[ONION_NAME_LEN], buf[SIZE_1K]; 
    287249   SocksQueue_t *squeue; 
    288    int i, ps, run = 1, t_abs, t_diff; 
    289    char thn[THREAD_NAME_LEN] = "cn:", on[ONION_NAME_LEN]; 
    290  
    291    detach_thread(); 
    292  
    293    pthread_mutex_lock(&socks_queue_mutex_); 
    294    socks_thread_cnt_++; 
    295    log_debug("%d connector threads running", socks_thread_cnt_); 
    296    pthread_mutex_unlock(&socks_queue_mutex_); 
    297  
    298    while (run) 
    299    { 
    300       pthread_mutex_lock(&socks_queue_mutex_); 
    301       for (;;) 
    302       { 
    303          for (squeue = socks_queue_; squeue; squeue = squeue->next) 
    304             if (!squeue->state) 
    305             { 
    306                log_debug("unhandled queue entry found"); 
    307                break; 
    308             } 
    309  
    310          if (squeue) 
    311             break; 
    312  
    313          log_debug("waiting for new queue entry"); 
    314          pthread_cond_wait(&socks_queue_cond_, &socks_queue_mutex_); 
    315          // check termination request 
    316          if (term_req()) 
    317          { 
    318             pthread_mutex_unlock(&socks_queue_mutex_); 
    319             return NULL; 
    320          } 
    321       } 
    322  
    323       // spawn spare thread if there is no one left 
    324       log_debug("change queue element state to CONNECTING"); 
    325       squeue->state = SOCKS_CONNECTING; 
    326       socks_connect_cnt_++; 
    327       if (socks_thread_cnt_ <= socks_connect_cnt_) 
    328       { 
    329          log_debug("spawning new connector threads"); 
    330          run_ocat_thread("connector", socks_connector, NULL); 
    331       } 
    332       pthread_mutex_unlock(&socks_queue_mutex_); 
    333  
    334       // changing thread name 
    335       log_debug("changing thread name"); 
    336       ipv6tonion(&squeue->addr, on); 
    337       strlcat(thn, on, THREAD_NAME_LEN); 
    338       set_thread_name(thn); 
    339  
    340       // search for existing peer 
    341       lock_peers(); 
    342       peer = search_peer(&squeue->addr); 
    343       unlock_peers(); 
    344  
    345       // connect via SOCKS if no peer exists 
    346       if (!peer) 
    347          for (i = 0, ps = -1, t_abs = 0; ((i < SOCKS_MAX_RETRY) || squeue->perm) && ps < 0; i++) 
    348          { 
    349             // FIXME: term_req should be checked here 
    350  
    351             // every third connection attempt 
    352             if (!(i % RECONN_ATTEMPTS)) 
    353             { 
    354                // check that it does not reconnect too fast 
    355                t_diff = time(NULL) - t_abs; 
    356                if (t_diff < MIN_RECONNECT_TIME) 
    357                { 
    358                   // and sleep if necessary 
    359                   log_msg(LOG_WARNING, "reconnecting too fast. sleeping %d seconds", MIN_RECONNECT_TIME - t_diff); 
    360                   sleep(MIN_RECONNECT_TIME - t_diff); 
    361                } 
    362                t_abs = time(NULL); 
    363             } 
    364             log_debug("%d. SOCKS connection attempt", i + 1); 
    365             ps = socks_connect(squeue); 
    366          } 
    367       else 
    368          log_msg(LOG_INFO, "peer already exists, ignoring"); 
    369  
    370       // remove request from queue after connect 
    371       log_debug("removing destination from SOCKS queue"); 
    372       pthread_mutex_lock(&socks_queue_mutex_); 
    373  
    374       socks_unqueue(squeue); 
    375       socks_connect_cnt_--; 
    376  
    377       // if there are more threads then pending connections 
    378       // terminate thread 
    379       if (socks_connect_cnt_ < socks_thread_cnt_ - 1) 
    380       { 
    381          log_debug("going to terminate connector thread"); 
    382          socks_thread_cnt_--; 
    383          run = 0; 
    384       } 
    385       pthread_mutex_unlock(&socks_queue_mutex_); 
    386    } 
    387    return NULL; 
    388 } 
    389  
    390  
    391 void print_socks_queue(FILE *f) 
    392 { 
    393    int i; 
    394    char addrstr[INET6_ADDRSTRLEN], onstr[ONION_NAME_LEN]; 
    395    SocksQueue_t *squeue; 
    396  
    397    pthread_mutex_lock(&socks_queue_mutex_); 
    398250 
    399251   for (squeue = socks_queue_, i = 0; squeue; squeue = squeue->next, i++) 
     
    405257      } 
    406258 
    407       fprintf(f, "%d %39s %s.onion %s(%d) %s(%d)\n", 
     259      snprintf(buf, SIZE_1K, "%d: %39s, %s.onion, state = %d, %s(%d), retry = %d, connect_time = %ld, restart_time = %ld", 
    408260            i,  
    409261            addrstr,  
    410262            ipv6tonion(&squeue->addr, onstr), 
    411             squeue->state == SOCKS_CONNECTING ? "CONNECTING" : "QUEUED",  
    412263            squeue->state, 
    413264            squeue->perm ? "PERMANENT" : "TEMPORARY", 
    414             squeue->perm); 
    415    } 
    416  
    417    pthread_mutex_unlock(&socks_queue_mutex_); 
    418 } 
    419  
    420  
     265            squeue->perm, 
     266            squeue->retry, 
     267            squeue->connect_time, 
     268            squeue->restart_time 
     269            ); 
     270//      log_debug("%s", buf); 
     271      write((int) f, buf, strlen(buf)); 
     272      write((int) f, "\n", 1); 
     273   } 
     274   write((int) f, "\0", 1); 
     275   log_debug("socks_output_queue() finished"); 
     276} 
     277 
     278 
     279#if 0 
    421280int socks5_connect(const SocksQueue_t *sq) 
    422281{ 
     
    442301   return fd; 
    443302} 
    444  
     303#endif 
     304 
     305 
     306int socks_tcp_connect(int fd, struct sockaddr *addr, int len) 
     307{ 
     308   char astr[INET6_ADDRSTRLEN]; 
     309   if (connect(fd, addr, len) == -1) 
     310   { 
     311      if (errno != EINPROGRESS) 
     312      { 
     313         log_msg(LOG_ERR, "connect() to SOCKS port %s:%d failed: \"%s\". Sleeping for %d seconds.",  
     314            inet_ntop(CNF(socks_dst)->sin_family,  
     315               CNF(socks_dst)->sin_family == AF_INET ? (char*) &CNF(socks_dst)->sin_addr : (char*) &CNF(socks_dst6)->sin6_addr, astr, sizeof(astr)),  
     316            ntohs(CNF(socks_dst)->sin_port), strerror(errno), TOR_SOCKS_CONN_TIMEOUT); 
     317         return -1; 
     318      } 
     319      log_debug("connection in progress"); 
     320   } 
     321   else 
     322      log_debug("connected"); 
     323 
     324   return 0; 
     325} 
     326 
     327 
     328void socks_reschedule(SocksQueue_t *squeue) 
     329{ 
     330   log_msg(LOG_ERR, "rescheduling SOCKS request"); 
     331   if (squeue->fd > 0) 
     332   { 
     333      oe_close(squeue->fd); 
     334      squeue->fd = 0; 
     335   } 
     336   squeue->restart_time = time(NULL) + TOR_SOCKS_CONN_TIMEOUT; 
     337   squeue->state = SOCKS_NEW; 
     338} 
     339 
     340  
     341void *socks_connector_sel(void *p) 
     342{ 
     343   fd_set rset, wset; 
     344   int maxfd = 0, len, so_err; 
     345   SocksQueue_t *squeue, sq; 
     346   time_t t; 
     347   struct timeval tv; 
     348   socklen_t err_len; 
     349 
     350   for (;;) 
     351   { 
     352      if (term_req()) 
     353         return NULL; 
     354 
     355      FD_ZERO(&rset); 
     356      FD_ZERO(&wset); 
     357      MFD_SET(CNF(socksfd[0]), &rset, maxfd); 
     358      t = time(NULL); 
     359 
     360      for (squeue = socks_queue_; squeue; squeue = squeue->next) 
     361      { 
     362         switch (squeue->state) 
     363         { 
     364            case SOCKS_NEW: 
     365               /*if (!squeue->fd) 
     366               { 
     367                  log_msg(LOG_CRIT, "SOCKS_NEW and fd = %d, but should be 0", squeue->fd); 
     368                  squeue->state = SOCKS_DELETE; 
     369                  continue; 
     370               }*/ 
     371 
     372               if (t < squeue->restart_time) 
     373               { 
     374                  log_debug("SOCKS request is scheduled for connection not before %ds", squeue->restart_time - t); 
     375                  continue; 
     376               } 
     377 
     378               // check and increase retry counter 
     379               squeue->retry++; 
     380               if (!squeue->perm && (squeue->retry > SOCKS_MAX_RETRY)) 
     381               { 
     382                  log_msg(LOG_NOTICE, "temporary request failed %d times and will be removed", squeue->retry - 1); 
     383                  squeue->state = SOCKS_DELETE; 
     384                  continue; 
     385               } 
     386 
     387               log_debug("creating socket for unconnected SOCKS request"); 
     388               if ((squeue->fd = socket(CNF(socks_dst)->sin_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0)) == -1) 
     389               { 
     390                  log_msg(LOG_ERR, "cannot create socket for new SOCKS request: \"%s\"", strerror(errno)); 
     391                  continue; 
     392               } 
     393 
     394               set_nonblock(squeue->fd); 
     395               log_debug("queueing fd %d for connect", squeue->fd); 
     396               squeue->connect_time = t; 
     397               if (socks_tcp_connect(squeue->fd, (struct sockaddr*) CNF(socks_dst), SOCKADDR_SIZE(CNF(socks_dst))) == -1) 
     398               { 
     399                  socks_reschedule(squeue); 
     400                  continue; 
     401               } 
     402 
     403               squeue->state = SOCKS_CONNECTING; 
     404               MFD_SET(squeue->fd, &wset, maxfd); 
     405 
     406               break; 
     407 
     408            case SOCKS_4AREQ_SENT: 
     409               MFD_SET(squeue->fd, &rset, maxfd); 
     410               break; 
     411         } 
     412      } 
     413 
     414      // select all file descriptors 
     415      set_select_timeout(&tv); 
     416      log_debug("selecting (maxfd = %d)", maxfd); 
     417      if ((maxfd = select(maxfd + 1, &rset, &wset, NULL, &tv)) == -1) 
     418      { 
     419         log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno)); 
     420         continue; 
     421      } 
     422      log_debug("select returned %d", maxfd); 
     423 
     424      // check socks request pipe 
     425      if (FD_ISSET(CNF(socksfd[0]), &rset)) 
     426      { 
     427         maxfd--; 
     428         if ((len = read(CNF(socksfd[0]), &sq, sizeof(sq))) == -1) 
     429            log_msg(LOG_ERR, "failed to read from SOCKS request pipe, fd = %d: \"%s\"",  
     430                  CNF(socksfd[0]), strerror(errno)); 
     431         if (len < sizeof(sq)) 
     432            log_msg(LOG_ERR, "read from SOCKS request pipe truncated to %d of %d bytes, ignoring.",  
     433                  len, sizeof(sq)); 
     434         else 
     435         { 
     436            log_debug("received %d bytes on SOCKS request pipe fd %d", len, CNF(socksfd[0])); 
     437            if (sq.next) 
     438            { 
     439               log_debug("output of SOCKS request queue triggered"); 
     440               socks_output_queue((FILE*) sq.next); 
     441            } 
     442            else if (IN6_IS_ADDR_UNSPECIFIED(&sq.addr)) 
     443            { 
     444               log_debug("termination request on SOCKS request queue received"); 
     445            } 
     446            else 
     447            { 
     448               log_debug("SOCKS queuing request received"); 
     449               socks_enqueue(&sq); 
     450            } 
     451         } 
     452      } 
     453 
     454      // handle all other file descriptors 
     455      t = time(NULL); 
     456      for (squeue = socks_queue_; maxfd && squeue; squeue = squeue->next) 
     457      { 
     458         // check write set, this is valid after connect() 
     459         if (FD_ISSET(squeue->fd, &wset)) 
     460         { 
     461            maxfd--; 
     462            if (squeue->state == SOCKS_CONNECTING) 
     463            { 
     464               // test if connect() worked 
     465               log_debug("check socket error"); 
     466               err_len = sizeof(so_err); 
     467               if (getsockopt(squeue->fd, SOL_SOCKET, SO_ERROR, &so_err, &err_len) == -1) 
     468               { 
     469                  log_msg(LOG_ERR, "getsockopt failed: \"%s\", rescheduling request", strerror(errno)); 
     470                  socks_reschedule(squeue); 
     471                  continue; 
     472               } 
     473               if (so_err) 
     474               { 
     475                  log_msg(LOG_ERR, "getsockopt returned %d (\"%s\")", so_err, strerror(so_err)); 
     476                  socks_reschedule(squeue); 
     477                  continue; 
     478               } 
     479               // everything seems to be ok, now check request status 
     480               if (socks_send_request(squeue) == -1) 
     481               { 
     482                  log_msg(LOG_ERR, "SOCKS request failed"); 
     483                  socks_reschedule(squeue); 
     484                  continue; 
     485               } 
     486               // request successfully sent, advance state machine 
     487               squeue->state = SOCKS_4AREQ_SENT; 
     488            } 
     489            else 
     490               log_debug("unknown state %d in write set", squeue->state); 
     491         } 
     492 
     493         // check read set, this is valid after write, i.e. receiving SOCKS response 
     494         if (FD_ISSET(squeue->fd, &rset)) 
     495         { 
     496            maxfd--; 
     497            if (squeue->state == SOCKS_4AREQ_SENT) 
     498            { 
     499               if (socks_rec_response(squeue) == -1) 
     500               { 
     501                  socks_reschedule(squeue); 
     502                  continue; 
     503               } 
     504               // success 
     505               log_debug("activating peer fd %d", squeue->fd); 
     506               socks_activate_peer(squeue); 
     507               squeue->state = SOCKS_DELETE; 
     508            } 
     509            else 
     510               log_debug("unknown state %d in read set", squeue->state); 
     511         } 
     512      } 
     513 
     514      // delete requests from queue which are marked for deletion 
     515      for (squeue = socks_queue_; squeue; squeue = squeue->next) 
     516         if (squeue->state == SOCKS_DELETE) 
     517         { 
     518            socks_unqueue(squeue); 
     519            // restart loop 
     520            squeue = socks_queue_; 
     521            if (!squeue) 
     522            { 
     523               log_debug("last entry deleted, breaking loop"); 
     524               break; 
     525            } 
     526         } 
     527   } 
     528} 
     529 
Note: See TracChangeset for help on using the changeset viewer.