Back to index

nordugrid-arc-nox  1.1.0~rc6
MCCTCP.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <cstdlib>
00006 #include <unistd.h>
00007 
00008 #ifdef WIN32
00009 #include <arc/win32.h>
00010 #include <winsock2.h>
00011 typedef int socklen_t;
00012 #define ErrNo WSAGetLastError()
00013 
00014 #include <stdio.h>
00015 // There is no inet_ntop on WIN32
00016 inline const char *inet_ntop(int af, const void *__restrict src, char *__restrict dest, socklen_t size)
00017 {
00018   // IPV6 not supported (yet?)
00019   if(AF_INET!=af)
00020   {
00021     printf("inet_ntop is only implemented for AF_INET address family on win32/msvc8");
00022     abort();
00023   }
00024 
00025   // Format address
00026   char *s=inet_ntoa(*reinterpret_cast<const in_addr*>(src));
00027   if(!s)
00028     return 0;
00029 
00030   // Copy to given buffer
00031   socklen_t len=(socklen_t)strlen(s);
00032   if(len>=size)
00033     return 0;
00034   return strncpy(dest, s, len);
00035 }
00036 
00037 #else // UNIX
00038 // NOTE: On Solaris errno is not working properly if cerrno is included first
00039 #include <cerrno>
00040 #include <sys/types.h>
00041 #include <sys/socket.h>
00042 #include <arpa/inet.h>
00043 #include <netinet/in.h>
00044 #include <netinet/tcp.h>
00045 #include <netdb.h>
00046 #define ErrNo errno
00047 #endif
00048 
00049 #include <arc/message/PayloadStream.h>
00050 #include <arc/message/PayloadRaw.h>
00051 #include <arc/XMLNode.h>
00052 #include <arc/Thread.h>
00053 #include <arc/Logger.h>
00054 #include <arc/StringConv.h>
00055 #include <arc/Utils.h>
00056 
00057 #include "MCCTCP.h"
00058 
// Human-readable IP protocol name ("IPv6" or "IPv4") for a pointer to
// struct addrinfo. The argument is parenthesized so that any pointer
// expression (for example &local_info) can be passed safely.
#define PROTO_NAME(ADDR) (((ADDR)->ai_family==AF_INET6)?"IPv6":"IPv4")
00060 Arc::Logger Arc::MCC_TCP::logger(Arc::MCC::logger,"TCP");
00061 
00062 Arc::MCC_TCP::MCC_TCP(Arc::Config *cfg) : Arc::MCC(cfg) {
00063 }
00064 
00065 static Arc::Plugin* get_mcc_service(Arc::PluginArgument* arg) {
00066     Arc::MCCPluginArgument* mccarg =
00067             arg?dynamic_cast<Arc::MCCPluginArgument*>(arg):NULL;
00068     if(!mccarg) return NULL;
00069     return new Arc::MCC_TCP_Service((Arc::Config*)(*mccarg));
00070 }
00071 
00072 static Arc::Plugin* get_mcc_client(Arc::PluginArgument* arg) {
00073     Arc::MCCPluginArgument* mccarg =
00074             arg?dynamic_cast<Arc::MCCPluginArgument*>(arg):NULL;
00075     if(!mccarg) return NULL;
00076     return new Arc::MCC_TCP_Client((Arc::Config*)(*mccarg));
00077 }
00078 
00079 Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
00080     { "tcp.service", "HED:MCC", 0, &get_mcc_service },
00081     { "tcp.client",  "HED:MCC", 0, &get_mcc_client  },
00082     { NULL, NULL, 0, NULL }
00083 };
00084 
00085 
00086 namespace Arc {
00087 
00088 
00089 MCC_TCP_Service::MCC_TCP_Service(Config *cfg):MCC_TCP(cfg),max_executers_(-1),max_executers_drop_(false) {
00090 #ifdef WIN32
00091     WSADATA wsadata;
00092     if (WSAStartup(MAKEWORD(2,2), &wsadata) != 0) {
00093       logger.msg(ERROR, "Cannot initialize winsock library");
00094         return;
00095     }
00096 #endif
00097     for(int i = 0;;++i) {
00098         struct addrinfo hint;
00099         struct addrinfo *info = NULL;
00100         memset(&hint, 0, sizeof(hint));
00101         hint.ai_socktype = SOCK_STREAM;
00102         hint.ai_protocol = IPPROTO_TCP; // ?
00103         hint.ai_flags = AI_PASSIVE;
00104         XMLNode l = (*cfg)["Listen"][i];
00105         if(!l) break;
00106         std::string port_s = l["Port"];
00107         if(port_s.empty()) {
00108             logger.msg(ERROR, "Missing Port in Listen element");
00109             continue;
00110         };
00111         std::string interface_s = l["Interface"];
00112         std::string version_s = l["Version"];
00113         if(!version_s.empty()) {
00114             if(version_s == "4") { hint.ai_family = AF_INET; }
00115             else if(version_s == "6") { hint.ai_family = AF_INET6; }
00116             else {
00117                 logger.msg(ERROR, "Version in Listen element can't be recognized");
00118                 continue;
00119             };
00120         };
00121         int ret = getaddrinfo(interface_s.empty()?NULL:interface_s.c_str(),
00122                               port_s.c_str(), &hint, &info);
00123         if (ret != 0) {
00124             std::string err_str = gai_strerror(ret);
00125             if(interface_s.empty()) {
00126               logger.msg(ERROR, "Failed to obtain local address for port %s - %s", port_s, err_str);
00127             } else {
00128               logger.msg(ERROR, "Failed to obtain local address for %s:%s - %s", interface_s, port_s, err_str);
00129             };
00130             continue;
00131         };
00132         for(struct addrinfo *info_ = info;info_;info_=info_->ai_next) {
00133             if(interface_s.empty()) {
00134               logger.msg(VERBOSE, "Trying to listen on TCP port %s(%s)", port_s, PROTO_NAME(info_));
00135             } else {
00136               logger.msg(VERBOSE, "Trying to listen on %s:%s(%s)", interface_s, port_s, PROTO_NAME(info_));
00137             };
00138             int s = ::socket(info_->ai_family,info_->ai_socktype,info_->ai_protocol);
00139             if(s == -1) {
00140                 std::string e = StrError(errno);
00141                 if(interface_s.empty()) {
00142                   logger.msg(ERROR, "Failed to create socket for for listening at TCP port %s(%s): %s", port_s, PROTO_NAME(info_),e);
00143                 } else {
00144                   logger.msg(ERROR, "Failed to create socket for for listening at %s:%s(%s): %s", interface_s, port_s, PROTO_NAME(info_),e);
00145                 };
00146                 continue;
00147             };
00148 #ifdef IPV6_V6ONLY
00149             if(info_->ai_family == AF_INET6) {
00150               int v = 1;
00151               // Some systems (Linux for example) make v6 support v4 too
00152               // by default. Some don't. Make it same for everyone - 
00153               // separate sockets for v4 and v6.
00154               if(setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,&v,sizeof(v)) != 0) {
00155                 if(interface_s.empty()) {
00156                   logger.msg(ERROR, "Failed to limit socket to IPv6 at TCP port %s - may cause errors for IPv4 at same port", port_s);
00157                 } else {
00158                   logger.msg(ERROR, "Failed to limit socket to IPv6 at %s:%s - may cause errors for IPv4 at same port", interface_s, port_s);
00159                 };
00160               };
00161             };
00162 #endif
00163             if(::bind(s,info_->ai_addr,info_->ai_addrlen) == -1) {
00164                 std::string e = StrError(errno);
00165                 if(interface_s.empty()) {
00166                   logger.msg(ERROR, "Failed to bind socket for TCP port %s(%s): %s", port_s, PROTO_NAME(info_),e);
00167                 } else {
00168                   logger.msg(ERROR, "Failed to bind socket for %s:%s(%s): %s", interface_s, port_s, PROTO_NAME(info_),e);
00169                 };
00170                 close(s);
00171                 continue;
00172             };
00173             if(::listen(s,-1) == -1) {
00174                 std::string e = StrError(errno);
00175                 if(interface_s.empty()) {
00176                   logger.msg(WARNING, "Failed to listen at TCP port %s(%s): %s", port_s, PROTO_NAME(info_),e);
00177                 } else {
00178                   logger.msg(WARNING, "Failed to listen at %s:%s(%s): %s", interface_s, port_s, PROTO_NAME(info_),e);
00179                 };
00180                 close(s);
00181                 continue;
00182             };
00183             bool no_delay = false;
00184             if(l["NoDelay"]) {
00185                 std::string v = l["NoDelay"];
00186                 if((v == "true") || (v == "1")) no_delay=true;
00187             }
00188             int timeout = 60;
00189             if (l["Timeout"]) {
00190                 std::string v = l["Timeout"];
00191                 timeout = atoi(v.c_str());
00192             }
00193             handles_.push_back(mcc_tcp_handle_t(s,timeout,no_delay));
00194             if(interface_s.empty()) {
00195               logger.msg(INFO, "Listening on TCP port %s(%s)", port_s, PROTO_NAME(info_));
00196             } else {
00197               logger.msg(INFO, "Listening on %s:%s(%s)", interface_s, port_s, PROTO_NAME(info_));
00198             };
00199         };
00200         freeaddrinfo(info);
00201     };
00202     if(handles_.size() == 0) {
00203         logger.msg(ERROR, "No listening ports initiated");
00204         return;
00205     };
00206     if((*cfg)["Limit"]) {
00207       std::string v = (*cfg)["Limit"];
00208       max_executers_ = atoi(v.c_str());
00209       v=(std::string)((*cfg)["Limit"].Attribute("drop"));
00210       if((v == "yes") || (v == "true") || (v == "1")) {
00211         max_executers_drop_=true;
00212       };
00213       if(max_executers_ > 0) {
00214         logger.msg(INFO, "Setting connections limit to %i, connections over limit will be %s",max_executers_,max_executers_drop_?"dropped":"put on hold");
00215       };
00216     };
00217     if(!CreateThreadFunction(&listener,this)) {
00218         logger.msg(ERROR, "Failed to start thread for listening");
00219         for(std::list<mcc_tcp_handle_t>::iterator i = handles_.begin();i!=handles_.end();i=handles_.erase(i)) ::close(i->handle);
00220     };
00221 }
00222 
00223 MCC_TCP_Service::~MCC_TCP_Service(void) {
00224     //logger.msg(VERBOSE, "TCP_Service destroy");
00225     lock_.lock();
00226     for(std::list<mcc_tcp_handle_t>::iterator i = handles_.begin();i!=handles_.end();++i) {
00227         ::close(i->handle); i->handle=-1;
00228     };
00229     for(std::list<mcc_tcp_exec_t>::iterator e = executers_.begin();e != executers_.end();++e) {
00230         ::close(e->handle); e->handle=-1;
00231     };
00232     // Wait for threads to exit
00233     while(executers_.size() > 0) {
00234         lock_.unlock(); sleep(1); lock_.lock();
00235     };
00236     while(handles_.size() > 0) {
00237         lock_.unlock(); sleep(1); lock_.lock();
00238     };
00239     lock_.unlock();
00240 #ifdef WIN32
00241     WSACleanup();
00242 #endif
00243 }
00244 
00245 MCC_TCP_Service::mcc_tcp_exec_t::mcc_tcp_exec_t(MCC_TCP_Service* o,int h,int t,bool nd):obj(o),handle(h),no_delay(nd),timeout(t) {
00246     static int local_id = 0;
00247     if(handle == -1) return;
00248     id=local_id++;
00249     // list is locked externally
00250     std::list<mcc_tcp_exec_t>::iterator e = o->executers_.insert(o->executers_.end(),*this);
00251     if(!CreateThreadFunction(&MCC_TCP_Service::executer,&(*e))) {
00252         logger.msg(ERROR, "Failed to start thread for communication");
00253         ::shutdown(handle,2);
00254 #ifdef WIN32
00255         ::closesocket(handle); handle=-1; o->executers_.erase(e);
00256 #else
00257         ::close(handle);  handle=-1; o->executers_.erase(e);
00258 #endif
00259     };
00260 }
00261 
00262 void MCC_TCP_Service::listener(void* arg) {
00263     MCC_TCP_Service& it = *((MCC_TCP_Service*)arg);
00264     for(;;) {
00265         int max_s = -1;
00266         fd_set readfds;
00267         FD_ZERO(&readfds);
00268         it.lock_.lock();
00269         for(std::list<mcc_tcp_handle_t>::iterator i = it.handles_.begin();i!=it.handles_.end();) {
00270             int s = i->handle;
00271             if(s == -1) { i=it.handles_.erase(i); continue; };
00272             FD_SET(s,&readfds);
00273             if(s > max_s) max_s = s;
00274             ++i;
00275         };
00276         it.lock_.unlock();
00277         if(max_s == -1) break;
00278         struct timeval tv; tv.tv_sec = 2; tv.tv_usec = 0;
00279         int n = select(max_s+1,&readfds,NULL,NULL,&tv);
00280         if(n < 0) {
00281             if(ErrNo != EINTR) {
00282                 logger.msg(ERROR, "Failed while waiting for connection request");
00283                 it.lock_.lock();
00284                 for(std::list<mcc_tcp_handle_t>::iterator i = it.handles_.begin();i!=it.handles_.end();) {
00285                     int s = i->handle;
00286                     ::close(s);
00287                     i=it.handles_.erase(i);
00288                 };
00289                 it.lock_.unlock();
00290                 return;
00291             };
00292             continue;
00293         } else if(n == 0) continue;
00294         it.lock_.lock();
00295         for(std::list<mcc_tcp_handle_t>::iterator i = it.handles_.begin();i!=it.handles_.end();++i) {
00296             int s = i->handle;
00297             if(s == -1) continue;
00298             if(FD_ISSET(s,&readfds)) {
00299                 it.lock_.unlock();
00300                 struct sockaddr addr;
00301                 socklen_t addrlen = sizeof(addr);
00302                 int h = ::accept(s,&addr,&addrlen);
00303                 if(h == -1) {
00304                     logger.msg(ERROR, "Failed to accept connection request");
00305                     it.lock_.lock();
00306                 } else {
00307                     it.lock_.lock();
00308                     bool rejected = false;
00309                     bool first_time = true;
00310                     while((it.max_executers_ > 0) &&
00311                           (it.executers_.size() >= it.max_executers_)) {
00312                       if(it.max_executers_drop_) {
00313                         logger.msg(WARNING, "Too many connections - dropping new one");
00314                         ::shutdown(s,2);
00315                         ::close(s);
00316                         rejected = true;
00317                         break;
00318                       } else {
00319                         if(first_time)
00320                           logger.msg(WARNING, "Too many connections - waiting for old to close");
00321                         Glib::TimeVal etime;
00322                         etime.assign_current_time();
00323                         etime.add_milliseconds(10000); // 10 s
00324                         it.cond_.timed_wait(it.lock_,etime);
00325                       };
00326                     };
00327                     if(!rejected) {
00328                       mcc_tcp_exec_t t(&it,h,i->timeout,i->no_delay);
00329                     };
00330                 };
00331             };
00332         };
00333         it.lock_.unlock();
00334     };
00335     return;
00336 }
00337 
00338 class TCPSecAttr: public SecAttr {
00339  friend class MCC_TCP_Service;
00340  public:
00341   TCPSecAttr(const std::string& remote_ip, const std::string &remote_port, const std::string& local_ip, const std::string& local_port);
00342   virtual ~TCPSecAttr(void);
00343   virtual operator bool(void);
00344   virtual bool Export(SecAttrFormat format,XMLNode &val) const;
00345  protected:
00346   std::string local_ip_;
00347   std::string local_port_;
00348   std::string remote_ip_;
00349   std::string remote_port_;
00350   virtual bool equal(const SecAttr &b) const;
00351 };
00352 
00353 TCPSecAttr::TCPSecAttr(const std::string& remote_ip, const std::string &remote_port, const std::string& local_ip, const std::string& local_port) :
00354  remote_ip_(remote_ip), remote_port_(remote_port), local_ip_(local_ip), local_port_(local_port) {
00355 }
00356 
00357 TCPSecAttr::~TCPSecAttr(void) {
00358 }
00359 
00360 TCPSecAttr::operator bool(void) {
00361   return true;
00362 }
00363 
00364 bool TCPSecAttr::equal(const SecAttr &b) const {
00365   try {
00366     const TCPSecAttr& a = (const TCPSecAttr&)b;
00367     if((!local_ip_.empty()) && (!a.local_ip_.empty()) && (local_ip_ != a.local_ip_)) return false;
00368     if((!local_port_.empty()) && (!a.local_port_.empty()) && (local_port_ != a.local_port_)) return false;
00369     if((!remote_ip_.empty()) && (!a.remote_ip_.empty()) && (remote_ip_ != a.remote_ip_)) return false;
00370     if((!remote_port_.empty()) && (!a.remote_port_.empty()) && (remote_port_ != a.remote_port_)) return false;
00371     return true;
00372   } catch(std::exception&) { };
00373   return false;
00374 }
00375 
00376 static void fill_arc_string_attribute(XMLNode object,std::string value,const char* id) {
00377   object=value;
00378   object.NewAttribute("Type")="string";
00379   object.NewAttribute("AttributeId")=id;
00380 }
00381 
00382 static void fill_xacml_string_attribute(XMLNode object,std::string value,const char* id) {
00383   object.NewChild("ra:AttributeValue")=value;
00384   object.NewAttribute("DataType")="xs:string";
00385   object.NewAttribute("AttributeId")=id;
00386 }
00387 
00388 bool TCPSecAttr::Export(SecAttrFormat format,XMLNode &val) const {
00389   if(format == UNDEFINED) {
00390   } else if(format == ARCAuth) {
00391     NS ns;
00392     ns["ra"]="http://www.nordugrid.org/schemas/request-arc";
00393     val.Namespaces(ns); val.Name("ra:Request");
00394     XMLNode item = val.NewChild("ra:RequestItem");
00395     if(!local_port_.empty()) {
00396       fill_arc_string_attribute(item.NewChild("ra:Resource"),local_ip_+":"+local_port_,"http://www.nordugrid.org/schemas/policy-arc/types/tcp/localendpoint");
00397     } else if(!local_ip_.empty()) {
00398       fill_arc_string_attribute(item.NewChild("ra:Resource"),local_ip_,"http://www.nordugrid.org/schemas/policy-arc/types/tcp/localendpoint");
00399     };
00400     if(!remote_port_.empty()) {
00401       fill_arc_string_attribute(item.NewChild("ra:Subject").NewChild("ra:SubjectAttribute"),remote_ip_+":"+remote_port_,"http://www.nordugrid.org/schemas/policy-arc/types/tcp/remoteendpoint");
00402     } else if(!remote_ip_.empty()) {
00403       fill_arc_string_attribute(item.NewChild("ra:Subject").NewChild("ra:SubjectAttribute"),remote_ip_,"http://www.nordugrid.org/schemas/policy-arc/types/tcp/remoteiendpoint");
00404     };
00405     return true;
00406   } else if(format == XACML) {
00407     NS ns;
00408     ns["ra"]="urn:oasis:names:tc:xacml:2.0:context:schema:os";
00409     val.Namespaces(ns); val.Name("ra:Request");
00410     if(!local_port_.empty()) {
00411       fill_xacml_string_attribute(val.NewChild("ra:Resource").NewChild("ra:Attribute"),local_ip_+":"+local_port_,"http://www.nordugrid.org/schemas/policy-arc/types/tcp/localendpoint");
00412     } else if(!local_ip_.empty()) {
00413       fill_xacml_string_attribute(val.NewChild("ra:Resource").NewChild("ra:Attribute"),local_ip_,"http://www.nordugrid.org/schemas/policy-arc/types/tcp/localendpoint");
00414     };
00415     if(!remote_port_.empty()) {
00416       fill_xacml_string_attribute(val.NewChild("ra:Subject").NewChild("ra:Attribute"),remote_ip_+":"+remote_port_,"http://www.nordugrid.org/schemas/policy-arc/types/tcp/remoteendpoint");
00417     } else if(!remote_ip_.empty()) {
00418       fill_xacml_string_attribute(val.NewChild("ra:Subject").NewChild("ra:Attribute"),remote_ip_,"http://www.nordugrid.org/schemas/policy-arc/types/tcp/remoteiendpoint");
00419     };
00420     return true;
00421   } else {
00422   };
00423   return false;
00424 }
00425 
00426 static bool get_host_port(struct sockaddr_storage *addr, std::string &host, std::string &port)
00427 {
00428     char buf[INET6_ADDRSTRLEN];
00429     memset(buf,0,sizeof(buf));
00430     const char *ret = NULL;
00431     switch (addr->ss_family) {
00432         case AF_INET: {
00433             struct sockaddr_in *sin = (struct sockaddr_in *)addr;
00434             ret = inet_ntop(AF_INET, &(sin->sin_addr), buf, sizeof(buf)-1);
00435             if (ret != NULL) {
00436                 port = tostring(ntohs(sin->sin_port));
00437             }
00438             break;
00439         }
00440         case AF_INET6: {
00441             struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)addr;
00442             if (!IN6_IS_ADDR_V4MAPPED(&(sin6->sin6_addr))) {
00443                 ret = inet_ntop(AF_INET6, &(sin6->sin6_addr), buf, sizeof(buf)-1);
00444             } else {
00445                 // ipv4 address mapped to ipv6 so resolve as ipv4 address
00446                 struct sockaddr_in sin;
00447                 memset(&sin, 0, sizeof(struct sockaddr_in));
00448                 sin.sin_family = AF_INET;
00449                 sin.sin_port = sin6->sin6_port;
00450                 sin.sin_addr.s_addr = ((uint32_t *)&sin6->sin6_addr)[3];
00451                 memcpy(addr, &sin, sizeof(struct sockaddr_in));
00452                 ret = inet_ntop(AF_INET, &(sin.sin_addr), buf, sizeof(buf)-1);
00453             }
00454             if (ret != NULL) {
00455                 port = tostring(ntohs(sin6->sin6_port));
00456             }
00457             break;
00458         }
00459         default:
00460             return false;
00461             break;
00462     }
00463     if (ret != NULL) {
00464         buf[sizeof(buf)-1] = 0;
00465         host = buf;
00466     } else {
00467         return false;
00468     }
00469     return true;
00470 }
00471 
00472 void MCC_TCP_Service::executer(void* arg) {
00473     MCC_TCP_Service& it = *(((mcc_tcp_exec_t*)arg)->obj);
00474     int s = ((mcc_tcp_exec_t*)arg)->handle;
00475     int id = ((mcc_tcp_exec_t*)arg)->id;
00476     int no_delay = ((mcc_tcp_exec_t*)arg)->no_delay;
00477     int timeout = ((mcc_tcp_exec_t*)arg)->timeout;
00478     std::string host_attr,port_attr;
00479     std::string remotehost_attr,remoteport_attr;
00480     std::string endpoint_attr;
00481     // Extract useful attributes
00482     {
00483         struct sockaddr_storage addr;
00484         socklen_t addrlen;
00485         addrlen=sizeof(addr);
00486         if(getsockname(s, (struct sockaddr*)(&addr), &addrlen) == 0) {
00487             if (get_host_port(&addr, host_attr, port_attr) == true) {
00488                 endpoint_attr = "://"+host_attr+":"+port_attr;
00489             }
00490         }
00491         if(getpeername(s, (struct sockaddr*)&addr, &addrlen) == 0) {
00492             get_host_port(&addr, remotehost_attr, remoteport_attr);
00493         }
00494         // SESSIONID
00495     };
00496 
00497     // Creating stream payload
00498     PayloadTCPSocket stream(s, timeout, logger);
00499     stream.NoDelay(no_delay);
00500     MessageContext context;
00501     MessageAuthContext auth_context;
00502     for(;;) {
00503         // TODO: Check state of socket here and leave immediately if not connected anymore.
00504         // Preparing Message objects for chain
00505         MessageAttributes attributes_in;
00506         MessageAttributes attributes_out;
00507         MessageAuth auth_in;
00508         MessageAuth auth_out;
00509         Message nextinmsg;
00510         Message nextoutmsg;
00511         nextinmsg.Payload(&stream);
00512         nextinmsg.Attributes(&attributes_in);
00513         nextinmsg.Attributes()->set("TCP:HOST",host_attr);
00514         nextinmsg.Attributes()->set("TCP:PORT",port_attr);
00515         nextinmsg.Attributes()->set("TCP:REMOTEHOST",remotehost_attr);
00516         nextinmsg.Attributes()->set("TCP:REMOTEPORT",remoteport_attr);
00517         nextinmsg.Attributes()->set("TCP:ENDPOINT",endpoint_attr);
00518         nextinmsg.Attributes()->set("ENDPOINT",endpoint_attr);
00519         nextinmsg.Context(&context);
00520         nextoutmsg.Attributes(&attributes_out);
00521         nextinmsg.Auth(&auth_in);
00522         TCPSecAttr* tattr = new TCPSecAttr(remotehost_attr, remoteport_attr, host_attr, port_attr);
00523         nextinmsg.Auth()->set("TCP",tattr);
00524         nextoutmsg.Auth(&auth_out);
00525         nextoutmsg.Context(&context);
00526         nextoutmsg.AuthContext(&auth_context);
00527         if(!it.ProcessSecHandlers(nextinmsg,"incoming")) break;
00528         // Call next MCC
00529         MCCInterface* next = it.Next();
00530         if(!next) break;
00531         logger.msg(VERBOSE, "next chain element called");
00532         MCC_Status ret = next->process(nextinmsg,nextoutmsg);
00533         if(!it.ProcessSecHandlers(nextoutmsg,"outgoing")) {
00534           if(nextoutmsg.Payload()) delete nextoutmsg.Payload();
00535           break;
00536         };
00537         // If nextoutmsg contains some useful payload send it here.
00538         // So far only buffer payload is supported
00539         // Extracting payload
00540         if(nextoutmsg.Payload()) {
00541             PayloadRawInterface* outpayload = NULL;
00542             try {
00543                 outpayload = dynamic_cast<PayloadRawInterface*>(nextoutmsg.Payload());
00544             } catch(std::exception& e) { };
00545             if(!outpayload) {
00546                 logger.msg(WARNING, "Only Raw Buffer payload is supported for output");
00547             } else {
00548                 // Sending payload
00549                 for(int n=0;;++n) {
00550                     char* buf = outpayload->Buffer(n);
00551                     if(!buf) break;
00552                     int bufsize = outpayload->BufferSize(n);
00553                     if(!(stream.Put(buf,bufsize))) {
00554                         logger.msg(ERROR, "Failed to send content of buffer");
00555                         break;
00556                     };
00557                 };
00558             };
00559             delete nextoutmsg.Payload();
00560         };
00561         if(!ret) break;
00562     };
00563     it.lock_.lock();
00564     for(std::list<mcc_tcp_exec_t>::iterator e = it.executers_.begin();e != it.executers_.end();++e) {
00565         if(id == e->id) {
00566             s=e->handle;
00567             it.executers_.erase(e);
00568             break;
00569         };
00570     };
00571     ::shutdown(s,2);
00572     ::close(s);
00573     it.cond_.signal();
00574     it.lock_.unlock();
00575     return;
00576 }
00577 
00578 MCC_Status MCC_TCP_Service::process(Message&,Message&) {
00579   // Service is not really processing messages because there
00580   // are no lower lelel MCCs in chain.
00581   return MCC_Status();
00582 }
00583 
00584 MCC_TCP_Client::MCC_TCP_Client(Config *cfg):MCC_TCP(cfg),s_(NULL) {
00585 #ifdef WIN32
00586     WSADATA wsadata;
00587     if (WSAStartup(MAKEWORD(2,2), &wsadata) != 0) {
00588       logger.msg(ERROR, "Cannot initialize winsock library");
00589         return;
00590     }
00591 #endif
00592     XMLNode c = (*cfg)["Connect"][0];
00593     if(!c) {
00594         logger.msg(ERROR,"No Connect element specified");
00595         return;
00596     };
00597 
00598     std::string port_s = c["Port"];
00599     if(port_s.empty()) {
00600         logger.msg(ERROR,"Missing Port in Connect element");
00601         return;
00602     };
00603 
00604     std::string host_s = c["Host"];
00605     if(host_s.empty()) {
00606         logger.msg(ERROR,"Missing Host in Connect element");
00607         return;
00608     };
00609 
00610     int port = atoi(port_s.c_str());
00611 
00612     std::string timeout_s = c["Timeout"];
00613     int timeout = 60;
00614     if (!timeout_s.empty()) {
00615         timeout = atoi(timeout_s.c_str());
00616     }
00617     s_ = new PayloadTCPSocket(host_s.c_str(),port,timeout,logger);
00618     if(!(*s_)) {
00619         delete s_; s_ = NULL;
00620     } else {
00621        std::string v = c["NoDelay"];
00622        s_->NoDelay((v == "true") || (v == "1"));
00623     };
00624 }
00625 
00626 MCC_TCP_Client::~MCC_TCP_Client(void) {
00627     if(s_) delete(s_);
00628 #ifdef WIN32
00629     WSACleanup();
00630 #endif
00631 }
00632 
00633 MCC_Status MCC_TCP_Client::process(Message& inmsg,Message& outmsg) {
00634     // Accepted payload is Raw
00635     // Returned payload is Stream
00636 
00637     logger.msg(VERBOSE, "client process called");
00638     //outmsg.Attributes(inmsg.Attributes());
00639     //outmsg.Context(inmsg.Context());
00640     if(!s_) return MCC_Status(GENERIC_ERROR);
00641     // Extracting payload
00642     if(!inmsg.Payload()) return MCC_Status(GENERIC_ERROR);
00643     PayloadRawInterface* inpayload = NULL;
00644     try {
00645         inpayload = dynamic_cast<PayloadRawInterface*>(inmsg.Payload());
00646     } catch(std::exception& e) { };
00647     if(!inpayload) return MCC_Status(GENERIC_ERROR);
00648     if(!ProcessSecHandlers(inmsg,"outgoing")) return MCC_Status(GENERIC_ERROR);
00649     // Sending payload
00650     for(int n=0;;++n) {
00651         char* buf = inpayload->Buffer(n);
00652         if(!buf) break;
00653         int bufsize = inpayload->BufferSize(n);
00654         if(!(s_->Put(buf,bufsize))) {
00655             logger.msg(ERROR, "Failed to send content of buffer");
00656             return MCC_Status();
00657         };
00658     };
00659     std::string host_attr,port_attr;
00660     std::string remotehost_attr,remoteport_attr;
00661     std::string endpoint_attr;
00662     // Extract useful attributes
00663     {
00664       struct sockaddr_storage addr;
00665       socklen_t addrlen;
00666       addrlen=sizeof(addr);
00667       if (getsockname(s_->GetHandle(), (struct sockaddr*)&addr, &addrlen) == 0)
00668         get_host_port(&addr, host_attr, port_attr);
00669       addrlen=sizeof(addr);
00670       if (getpeername(s_->GetHandle(), (struct sockaddr*)&addr, &addrlen) == 0)
00671         if (get_host_port(&addr, remotehost_attr, remoteport_attr))
00672           endpoint_attr = "://"+remotehost_attr+":"+remoteport_attr;
00673     }
00674     outmsg.Payload(new PayloadTCPSocket(*s_));
00675     outmsg.Attributes()->set("TCP:HOST",host_attr);
00676     outmsg.Attributes()->set("TCP:PORT",port_attr);
00677     outmsg.Attributes()->set("TCP:REMOTEHOST",remotehost_attr);
00678     outmsg.Attributes()->set("TCP:REMOTEPORT",remoteport_attr);
00679     outmsg.Attributes()->set("TCP:ENDPOINT",endpoint_attr);
00680     outmsg.Attributes()->set("ENDPOINT",endpoint_attr);
00681     if(!ProcessSecHandlers(outmsg,"incoming")) return MCC_Status(GENERIC_ERROR);
00682     return MCC_Status(STATUS_OK);
00683 }
00684 } // namespace ARC