Back to index

nordugrid-arc-nox  1.1.0~rc6
PayloadTCPSocket.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <ctime>
00006 #include <unistd.h>
00007 #include <sys/types.h>
00008 #include <sys/stat.h>
00009 #include <errno.h>
00010 #ifdef WIN32
00011 #include <arc/win32.h>
00012 #else // UNIX
00013 #include <sys/types.h>
00014 #include <sys/socket.h>
00015 #include <netdb.h>
00016 #include <netinet/in.h>
00017 #include <sys/poll.h>
00018 #include <netinet/tcp.h>
00019 #include <fcntl.h>
00020 #endif
00021 
00022 #include <glibmm.h>
00023 
00024 #include <arc/StringConv.h>
00025 #include <arc/Utils.h>
00026 #include "PayloadTCPSocket.h"
00027 
00028 namespace Arc {
00029 
00030 int PayloadTCPSocket::connect_socket(const char* hostname,int port) 
00031 {
00032   struct addrinfo hint;
00033   memset(&hint, 0, sizeof(hint));
00034   hint.ai_family = AF_UNSPEC;
00035   hint.ai_socktype = SOCK_STREAM;
00036   hint.ai_protocol = IPPROTO_TCP;
00037   std::string port_str = tostring(port);
00038   struct addrinfo *info = NULL;
00039   int ret = getaddrinfo(hostname, port_str.c_str(), &hint, &info);
00040   if (ret != 0) {
00041     std::string err_str = gai_strerror(ret); 
00042     logger.msg(VERBOSE, "Failed to resolve %s (%s)", hostname, err_str);
00043        return -1;
00044   }
00045   int s = -1;
00046   for(struct addrinfo *info_ = info;info_;info_=info_->ai_next) {
00047     logger.msg(VERBOSE,"Trying to connect %s(%s):%d",
00048                      hostname,info_->ai_family==AF_INET6?"IPv6":"IPv4",port);
00049     s = ::socket(info_->ai_family, info_->ai_socktype, info_->ai_protocol);
00050     if(s == -1) {
00051       logger.msg(VERBOSE, "Failed to create socket to %s(%s):%d - %s",
00052                         hostname,info_->ai_family==AF_INET6?"IPv6":"IPv4",port,
00053                         Arc::StrError(errno));
00054       continue;
00055     }
00056 #ifndef WIN32
00057     // In *NIX we can use non-blocking socket because poll() will 
00058     // be used for waiting.
00059     int s_flags = ::fcntl(s, F_GETFL, 0);
00060     if(s_flags != -1) {
00061       ::fcntl(s, F_SETFL, s_flags | O_NONBLOCK);
00062     } else {
00063       logger.msg(VERBOSE, "Failed to get TCP socket options for connection"
00064                         " to %s(%s):%d - timeout won't work - %s",
00065                         hostname,info_->ai_family==AF_INET6?"IPv6":"IPv4",port,
00066                         Arc::StrError(errno));
00067     }
00068     if(::connect(s, info_->ai_addr, info_->ai_addrlen) == -1) {
00069       if(errno != EINPROGRESS) {
00070         logger.msg(VERBOSE, "Failed to connect to %s(%s):%i - %s",
00071                         hostname,info_->ai_family==AF_INET6?"IPv6":"IPv4",port,
00072                         Arc::StrError(errno));
00073         close(s); s = -1;
00074         continue;
00075       }
00076       int pres;
00077       // Second resolution is enough
00078       time_t c_time = time(NULL);
00079       time_t f_time = c_time + timeout_;
00080       struct pollfd fd;
00081       for(;;) {
00082         fd.fd=s; fd.events=POLLOUT | POLLPRI; fd.revents=0;
00083         pres = ::poll(&fd,1,(f_time-c_time)*1000);
00084         // Checking for operation interrupted by signal
00085         if((pres == -1) && (errno == EINTR)) {
00086           c_time = time(NULL);
00087           // TODO: protection against time jumping backward.
00088           if(((int)(f_time - c_time)) < 0) c_time = f_time; 
00089         }
00090         break;
00091       }
00092       if(pres == 0) {
00093         logger.msg(VERBOSE, "Timeout connecting to %s(%s):%i - %i s",
00094                         hostname,info_->ai_family==AF_INET6?"IPv6":"IPv4",port,
00095                         timeout_);
00096         close(s); s = -1;
00097         continue;
00098       }
00099       if(pres != 1) {
00100         logger.msg(VERBOSE, "Failed waiting connection to %s(%s):%i - %s",
00101                         hostname,info_->ai_family==AF_INET6?"IPv6":"IPv4",port,
00102                         Arc::StrError(errno));
00103         close(s); s = -1;
00104         continue;
00105       }
00106       // man connect says one has to check SO_ERROR, but poll() returns
00107       // POLLERR and POLLHUP so we can use them directly. 
00108       if(fd.revents & (POLLERR | POLLHUP)) {
00109         logger.msg(VERBOSE, "Failed to connect to %s(%s):%i",
00110                         hostname,info_->ai_family==AF_INET6?"IPv6":"IPv4",port);
00111         close(s); s = -1;
00112         continue;
00113       }
00114     }
00115 #else
00116     if(::connect(s, info_->ai_addr, info_->ai_addrlen) == -1) {
00117       logger.msg(VERBOSE, "Failed to connect to %s(%s):%i",
00118                         hostname,info_->ai_family==AF_INET6?"IPv6":"IPv4",port);
00119       close(s); s = -1;
00120       continue;
00121     };
00122 #endif
00123     break;
00124   };
00125   if(s == -1) {
00126     logger.msg(VERBOSE, "Failed to establish connection to %s:%i", hostname, port);
00127   };
00128   freeaddrinfo(info);
00129   return s;
00130 }
00131 
00132 PayloadTCPSocket::PayloadTCPSocket(const char* hostname,
00133                                    int port,
00134                                    int timeout,
00135                                    Logger& logger) :
00136   logger(logger)
00137 {
00138   timeout_=timeout;
00139   handle_=connect_socket(hostname,port);
00140   acquired_=true;
00141 }
00142 
00143 PayloadTCPSocket::PayloadTCPSocket(const std::string endpoint, int timeout,
00144                                                Logger& logger) :
00145   logger(logger)
00146 {
00147   std::string hostname = endpoint;
00148   std::string::size_type p = hostname.find(':');
00149   if(p == std::string::npos) return;
00150   int port = atoi(hostname.c_str()+p+1);
00151   hostname.resize(p);
00152   timeout_=timeout;
00153   handle_=connect_socket(hostname.c_str(),port);
00154   acquired_=true;
00155 }
00156 
00157 PayloadTCPSocket::~PayloadTCPSocket(void) {
00158   if(acquired_) { shutdown(handle_,2); close(handle_); };
00159 }
00160 
00161 bool PayloadTCPSocket::Get(char* buf,int& size) {
00162   if(handle_ == -1) return false;
00163   ssize_t l = size;
00164   size=0;
00165 #ifndef WIN32
00166   struct pollfd fd;
00167   fd.fd=handle_; fd.events=POLLIN | POLLPRI | POLLERR; fd.revents=0;
00168   if(::poll(&fd,1,timeout_*1000) != 1) return false;
00169   if(!(fd.revents & (POLLIN | POLLPRI))) return false;
00170 #endif
00171   l=::recv(handle_,buf,l,0);
00172   if(l == -1) return false;
00173   size=l;
00174 #ifndef WIN32
00175   if((l == 0) && (fd.revents && POLLERR)) return false;
00176 #else
00177   if(l == 0) return false;
00178 #endif
00179   return true;
00180 }
00181 
00182 bool PayloadTCPSocket::Get(std::string& buf) {
00183   char tbuf[1024];
00184   int l = sizeof(tbuf);
00185   bool result = Get(tbuf,l);
00186   buf.assign(tbuf,l);
00187   return result;
00188 }
00189 
00190 bool PayloadTCPSocket::Put(const char* buf,Size_t size) {
00191   ssize_t l;
00192   if(handle_ == -1) return false;
00193   time_t start = time(NULL);
00194   for(;size;) {
00195 #ifndef WIN32
00196     struct pollfd fd;
00197     fd.fd=handle_; fd.events=POLLOUT | POLLERR; fd.revents=0;
00198     int to = timeout_-(unsigned int)(time(NULL)-start);
00199     if(to < 0) to=0;
00200     if(::poll(&fd,1,to*1000) != 1) return false;
00201     if(!(fd.revents & POLLOUT)) return false;
00202 #endif
00203     l=::send(handle_, buf, size, 0);
00204     if(l == -1) return false;
00205     buf+=l; size-=l;
00206 #ifdef WIN32
00207     int to = timeout_-(unsigned int)(time(NULL)-start);
00208     if(to < 0) return false;
00209 #endif
00210   };  
00211   return true;
00212 }
00213 
00214 void PayloadTCPSocket::NoDelay(bool val) {
00215   if(handle_ == -1) return;
00216   int flag = val?1:0;
00217 
00218 #ifdef WIN32
00219  ::setsockopt(handle_,IPPROTO_TCP,TCP_NODELAY,(const char*)(&flag),sizeof(flag));
00220 #else
00221  ::setsockopt(handle_,IPPROTO_TCP,TCP_NODELAY,&flag,sizeof(flag));
00222 #endif
00223 
00224 }
00225 
00226 } // namespace Arc