
nordugrid-arc-nox  1.1.0~rc6
Arc::MCC_TCP_Service Class Reference

This class is an MCC implementing a TCP server.

#include <MCCTCP.h>

Classes

class  mcc_tcp_exec_t
class  mcc_tcp_handle_t

Public Member Functions

 MCC_TCP_Service (Config *cfg)
 Binds to the configured TCP ports and starts the listening thread.
virtual ~MCC_TCP_Service (void)
virtual MCC_Status process (Message &, Message &)
 Dummy Message processing method.
virtual void Next (MCCInterface *next, const std::string &label="")
 Add reference to next MCC in chain.
virtual void AddSecHandler (Config *cfg, ArcSec::SecHandler *sechandler, const std::string &label="")
 Add security components/handlers to this MCC.
virtual void Unlink ()
 Removing all links.

Protected Member Functions

MCCInterface * Next (const std::string &label="")
bool ProcessSecHandlers (Message &message, const std::string &label="") const
 Executes security handlers of specified queue.

Protected Attributes

std::map< std::string,
MCCInterface * > 
next_
 Set of labeled "next" components.
std::map< std::string,
std::list< ArcSec::SecHandler * > > 
sechandlers_
 Set of labeled authentication and authorization handlers.

Static Protected Attributes

static Logger logger
 A logger for MCCs.

Static Private Member Functions

static void listener (void *)
 executing function for listening thread
static void executer (void *)
 executing function for connection thread

Private Attributes

std::list< mcc_tcp_handle_t > handles_
 listening sockets
std::list< mcc_tcp_exec_t > executers_
 active connections and associated threads
int max_executers_
bool max_executers_drop_
Glib::Mutex lock_
 lock for safe operations in internal lists
Glib::Cond cond_

Friends

class mcc_tcp_exec_t
class PayloadTCPSocket

Detailed Description

This class is an MCC implementing a TCP server.

Upon creation this object binds to the specified TCP ports and listens for incoming TCP connections on a dedicated thread. Each connection is accepted and a dedicated thread is created for it. That thread is then used to call the process() method of the next MCC in the chain. That method is passed a payload implementing PayloadStreamInterface. In response, a payload implementing PayloadRawInterface is expected. Alternatively, the called MCC may use the provided PayloadStreamInterface to send its response back directly.

During processing of a request this MCC generates the following attributes:

TCP:HOST - IP address of the interface to which the local TCP socket is bound
TCP:PORT - port number to which the local TCP socket is bound
TCP:REMOTEHOST - IP address from which the connection is accepted
TCP:REMOTEPORT - TCP port from which the connection is accepted
TCP:ENDPOINT - URL-like representation of remote connection - ://HOST:PORT
ENDPOINT - global attribute equal to TCP:ENDPOINT
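The attribute set described above can be illustrated with a minimal sketch. It is not the ARC API: `AttributeMap` and `make_tcp_attributes` are hypothetical names, and a plain `std::map` stands in for the message attribute container. The endpoint string is built the way the executer() listing on this page builds it, from the local host and port with an empty scheme.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for the per-message attribute container.
typedef std::map<std::string, std::string> AttributeMap;

// Builds the six attributes listed above from the four address strings.
// Assumption: all four strings are already known; the listing leaves the
// endpoint empty when the local address cannot be determined.
AttributeMap make_tcp_attributes(const std::string& host,
                                 const std::string& port,
                                 const std::string& remote_host,
                                 const std::string& remote_port) {
    AttributeMap attrs;
    attrs["TCP:HOST"] = host;
    attrs["TCP:PORT"] = port;
    attrs["TCP:REMOTEHOST"] = remote_host;
    attrs["TCP:REMOTEPORT"] = remote_port;
    // Same format as in executer(): "://" + host + ":" + port
    const std::string endpoint = "://" + host + ":" + port;
    attrs["TCP:ENDPOINT"] = endpoint;
    attrs["ENDPOINT"] = endpoint;  // global alias of TCP:ENDPOINT
    return attrs;
}
```

A component further down the chain would read these values by key, for example `attrs["TCP:REMOTEHOST"]`, to learn the address of the connecting peer.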

Definition at line 40 of file MCCTCP.h.


Constructor & Destructor Documentation

Arc::MCC_TCP_Service::MCC_TCP_Service ( Config *  cfg)

Binds to the configured TCP ports and starts the listening thread.

Definition at line 89 of file MCCTCP.cpp.

                                           :MCC_TCP(cfg),max_executers_(-1),max_executers_drop_(false) {
#ifdef WIN32
    WSADATA wsadata;
    if (WSAStartup(MAKEWORD(2,2), &wsadata) != 0) {
      logger.msg(ERROR, "Cannot initialize winsock library");
        return;
    }
#endif
    for(int i = 0;;++i) {
        struct addrinfo hint;
        struct addrinfo *info = NULL;
        memset(&hint, 0, sizeof(hint));
        hint.ai_socktype = SOCK_STREAM;
        hint.ai_protocol = IPPROTO_TCP; // ?
        hint.ai_flags = AI_PASSIVE;
        XMLNode l = (*cfg)["Listen"][i];
        if(!l) break;
        std::string port_s = l["Port"];
        if(port_s.empty()) {
            logger.msg(ERROR, "Missing Port in Listen element");
            continue;
        };
        std::string interface_s = l["Interface"];
        std::string version_s = l["Version"];
        if(!version_s.empty()) {
            if(version_s == "4") { hint.ai_family = AF_INET; }
            else if(version_s == "6") { hint.ai_family = AF_INET6; }
            else {
                logger.msg(ERROR, "Version in Listen element can't be recognized");
                continue;
            };
        };
        int ret = getaddrinfo(interface_s.empty()?NULL:interface_s.c_str(),
                              port_s.c_str(), &hint, &info);
        if (ret != 0) {
            std::string err_str = gai_strerror(ret);
            if(interface_s.empty()) {
              logger.msg(ERROR, "Failed to obtain local address for port %s - %s", port_s, err_str);
            } else {
              logger.msg(ERROR, "Failed to obtain local address for %s:%s - %s", interface_s, port_s, err_str);
            };
            continue;
        };
        for(struct addrinfo *info_ = info;info_;info_=info_->ai_next) {
            if(interface_s.empty()) {
              logger.msg(VERBOSE, "Trying to listen on TCP port %s(%s)", port_s, PROTO_NAME(info_));
            } else {
              logger.msg(VERBOSE, "Trying to listen on %s:%s(%s)", interface_s, port_s, PROTO_NAME(info_));
            };
            int s = ::socket(info_->ai_family,info_->ai_socktype,info_->ai_protocol);
            if(s == -1) {
                std::string e = StrError(errno);
                if(interface_s.empty()) {
                  logger.msg(ERROR, "Failed to create socket for for listening at TCP port %s(%s): %s", port_s, PROTO_NAME(info_),e);
                } else {
                  logger.msg(ERROR, "Failed to create socket for for listening at %s:%s(%s): %s", interface_s, port_s, PROTO_NAME(info_),e);
                };
                continue;
            };
#ifdef IPV6_V6ONLY
            if(info_->ai_family == AF_INET6) {
              int v = 1;
              // Some systems (Linux for example) make v6 support v4 too
              // by default. Some don't. Make it same for everyone - 
              // separate sockets for v4 and v6.
              if(setsockopt(s,IPPROTO_IPV6,IPV6_V6ONLY,&v,sizeof(v)) != 0) {
                if(interface_s.empty()) {
                  logger.msg(ERROR, "Failed to limit socket to IPv6 at TCP port %s - may cause errors for IPv4 at same port", port_s);
                } else {
                  logger.msg(ERROR, "Failed to limit socket to IPv6 at %s:%s - may cause errors for IPv4 at same port", interface_s, port_s);
                };
              };
            };
#endif
            if(::bind(s,info_->ai_addr,info_->ai_addrlen) == -1) {
                std::string e = StrError(errno);
                if(interface_s.empty()) {
                  logger.msg(ERROR, "Failed to bind socket for TCP port %s(%s): %s", port_s, PROTO_NAME(info_),e);
                } else {
                  logger.msg(ERROR, "Failed to bind socket for %s:%s(%s): %s", interface_s, port_s, PROTO_NAME(info_),e);
                };
                close(s);
                continue;
            };
            if(::listen(s,-1) == -1) {
                std::string e = StrError(errno);
                if(interface_s.empty()) {
                  logger.msg(WARNING, "Failed to listen at TCP port %s(%s): %s", port_s, PROTO_NAME(info_),e);
                } else {
                  logger.msg(WARNING, "Failed to listen at %s:%s(%s): %s", interface_s, port_s, PROTO_NAME(info_),e);
                };
                close(s);
                continue;
            };
            bool no_delay = false;
            if(l["NoDelay"]) {
                std::string v = l["NoDelay"];
                if((v == "true") || (v == "1")) no_delay=true;
            }
            int timeout = 60;
            if (l["Timeout"]) {
                std::string v = l["Timeout"];
                timeout = atoi(v.c_str());
            }
            handles_.push_back(mcc_tcp_handle_t(s,timeout,no_delay));
            if(interface_s.empty()) {
              logger.msg(INFO, "Listening on TCP port %s(%s)", port_s, PROTO_NAME(info_));
            } else {
              logger.msg(INFO, "Listening on %s:%s(%s)", interface_s, port_s, PROTO_NAME(info_));
            };
        };
        freeaddrinfo(info);
    };
    if(handles_.size() == 0) {
        logger.msg(ERROR, "No listening ports initiated");
        return;
    };
    if((*cfg)["Limit"]) {
      std::string v = (*cfg)["Limit"];
      max_executers_ = atoi(v.c_str());
      v=(std::string)((*cfg)["Limit"].Attribute("drop"));
      if((v == "yes") || (v == "true") || (v == "1")) {
        max_executers_drop_=true;
      };
      if(max_executers_ > 0) {
        logger.msg(INFO, "Setting connections limit to %i, connections over limit will be %s",max_executers_,max_executers_drop_?"dropped":"put on hold");
      };
    };
    if(!CreateThreadFunction(&listener,this)) {
        logger.msg(ERROR, "Failed to start thread for listening");
        for(std::list<mcc_tcp_handle_t>::iterator i = handles_.begin();i!=handles_.end();i=handles_.erase(i)) ::close(i->handle);
    };
}
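The socket setup performed by the constructor can be reduced to the following sketch, which uses only POSIX calls and omits logging and configuration parsing. `family_from_version` and `open_listeners` are hypothetical helper names, not part of ARC. The sketch also passes `SOMAXCONN` as the backlog, whereas the listing passes -1; that is a substitution made here for portability.

```cpp
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstring>
#include <string>
#include <vector>

// Maps the "Version" setting to an address family; empty means "any".
// Returns false for values other than "4" and "6".
bool family_from_version(const std::string& version, int& family) {
    if (version.empty()) { family = AF_UNSPEC; return true; }
    if (version == "4") { family = AF_INET; return true; }
    if (version == "6") { family = AF_INET6; return true; }
    return false;
}

// Opens one listening socket per address returned by getaddrinfo().
// An empty interface selects the wildcard address (AI_PASSIVE).
std::vector<int> open_listeners(const std::string& iface,
                                const std::string& port, int family) {
    std::vector<int> sockets;
    addrinfo hint;
    std::memset(&hint, 0, sizeof(hint));
    hint.ai_family = family;
    hint.ai_socktype = SOCK_STREAM;
    hint.ai_protocol = IPPROTO_TCP;
    hint.ai_flags = AI_PASSIVE;
    addrinfo* info = NULL;
    if (getaddrinfo(iface.empty() ? NULL : iface.c_str(), port.c_str(),
                    &hint, &info) != 0) {
        return sockets;  // address lookup failed, nothing opened
    }
    for (addrinfo* ai = info; ai != NULL; ai = ai->ai_next) {
        int s = ::socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
        if (s == -1) continue;
#ifdef IPV6_V6ONLY
        if (ai->ai_family == AF_INET6) {
            // Keep IPv6 sockets IPv6-only so that IPv4 gets its own socket.
            int on = 1;
            (void)setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on));
        }
#endif
        if (::bind(s, ai->ai_addr, ai->ai_addrlen) == -1 ||
            ::listen(s, SOMAXCONN) == -1) {
            ::close(s);  // skip this address, try the next one
            continue;
        }
        sockets.push_back(s);
    }
    freeaddrinfo(info);
    return sockets;
}
```

Iterating over every `addrinfo` entry is what lets a single configuration entry produce separate IPv4 and IPv6 listening sockets on the same port.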

Arc::MCC_TCP_Service::~MCC_TCP_Service ( void ) [virtual]

Definition at line 223 of file MCCTCP.cpp.

                                      {
    //logger.msg(VERBOSE, "TCP_Service destroy");
    lock_.lock();
    for(std::list<mcc_tcp_handle_t>::iterator i = handles_.begin();i!=handles_.end();++i) {
        ::close(i->handle); i->handle=-1;
    };
    for(std::list<mcc_tcp_exec_t>::iterator e = executers_.begin();e != executers_.end();++e) {
        ::close(e->handle); e->handle=-1;
    };
    // Wait for threads to exit
    while(executers_.size() > 0) {
        lock_.unlock(); sleep(1); lock_.lock();
    };
    while(handles_.size() > 0) {
        lock_.unlock(); sleep(1); lock_.lock();
    };
    lock_.unlock();
#ifdef WIN32
    WSACleanup();
#endif
}

Member Function Documentation

void Arc::MCC::AddSecHandler ( Config *  cfg,
ArcSec::SecHandler *  sechandler,
const std::string &  label = "" 
) [virtual, inherited]

Add security components/handlers to this MCC.

Security handlers are stacked into a few queues with each queue identified by its label. The queue labelled 'incoming' is executed for every 'request' message after the message is processed by the MCC on the service side and before processing on the client side. The queue labelled 'outgoing' is run for response message before it is processed by MCC algorithms on the service side and after processing on the client side. Those labels are just a matter of agreement and some MCCs may implement different queues executed at various message processing steps.

Definition at line 35 of file MCC.cpp.

                                {
    if (sechandler) {
      sechandlers_[label].push_back(sechandler);
      // need polishing to put the SecHandlerFactory->getinstance here
      XMLNode cn = (*cfg)["SecHandler"];
      Config cfg_(cn);
    }
  }

void Arc::MCC_TCP_Service::executer ( void *  arg) [static, private]

executing function for connection thread

Definition at line 472 of file MCCTCP.cpp.

                                        {
    MCC_TCP_Service& it = *(((mcc_tcp_exec_t*)arg)->obj);
    int s = ((mcc_tcp_exec_t*)arg)->handle;
    int id = ((mcc_tcp_exec_t*)arg)->id;
    int no_delay = ((mcc_tcp_exec_t*)arg)->no_delay;
    int timeout = ((mcc_tcp_exec_t*)arg)->timeout;
    std::string host_attr,port_attr;
    std::string remotehost_attr,remoteport_attr;
    std::string endpoint_attr;
    // Extract useful attributes
    {
        struct sockaddr_storage addr;
        socklen_t addrlen;
        addrlen=sizeof(addr);
        if(getsockname(s, (struct sockaddr*)(&addr), &addrlen) == 0) {
            if (get_host_port(&addr, host_attr, port_attr) == true) {
                endpoint_attr = "://"+host_attr+":"+port_attr;
            }
        }
        if(getpeername(s, (struct sockaddr*)&addr, &addrlen) == 0) {
            get_host_port(&addr, remotehost_attr, remoteport_attr);
        }
        // SESSIONID
    };

    // Creating stream payload
    PayloadTCPSocket stream(s, timeout, logger);
    stream.NoDelay(no_delay);
    MessageContext context;
    MessageAuthContext auth_context;
    for(;;) {
        // TODO: Check state of socket here and leave immediately if not connected anymore.
        // Preparing Message objects for chain
        MessageAttributes attributes_in;
        MessageAttributes attributes_out;
        MessageAuth auth_in;
        MessageAuth auth_out;
        Message nextinmsg;
        Message nextoutmsg;
        nextinmsg.Payload(&stream);
        nextinmsg.Attributes(&attributes_in);
        nextinmsg.Attributes()->set("TCP:HOST",host_attr);
        nextinmsg.Attributes()->set("TCP:PORT",port_attr);
        nextinmsg.Attributes()->set("TCP:REMOTEHOST",remotehost_attr);
        nextinmsg.Attributes()->set("TCP:REMOTEPORT",remoteport_attr);
        nextinmsg.Attributes()->set("TCP:ENDPOINT",endpoint_attr);
        nextinmsg.Attributes()->set("ENDPOINT",endpoint_attr);
        nextinmsg.Context(&context);
        nextoutmsg.Attributes(&attributes_out);
        nextinmsg.Auth(&auth_in);
        TCPSecAttr* tattr = new TCPSecAttr(remotehost_attr, remoteport_attr, host_attr, port_attr);
        nextinmsg.Auth()->set("TCP",tattr);
        nextoutmsg.Auth(&auth_out);
        nextoutmsg.Context(&context);
        nextoutmsg.AuthContext(&auth_context);
        if(!it.ProcessSecHandlers(nextinmsg,"incoming")) break;
        // Call next MCC
        MCCInterface* next = it.Next();
        if(!next) break;
        logger.msg(VERBOSE, "next chain element called");
        MCC_Status ret = next->process(nextinmsg,nextoutmsg);
        if(!it.ProcessSecHandlers(nextoutmsg,"outgoing")) {
          if(nextoutmsg.Payload()) delete nextoutmsg.Payload();
          break;
        };
        // If nextoutmsg contains some useful payload send it here.
        // So far only buffer payload is supported
        // Extracting payload
        if(nextoutmsg.Payload()) {
            PayloadRawInterface* outpayload = NULL;
            try {
                outpayload = dynamic_cast<PayloadRawInterface*>(nextoutmsg.Payload());
            } catch(std::exception& e) { };
            if(!outpayload) {
                logger.msg(WARNING, "Only Raw Buffer payload is supported for output");
            } else {
                // Sending payload
                for(int n=0;;++n) {
                    char* buf = outpayload->Buffer(n);
                    if(!buf) break;
                    int bufsize = outpayload->BufferSize(n);
                    if(!(stream.Put(buf,bufsize))) {
                        logger.msg(ERROR, "Failed to send content of buffer");
                        break;
                    };
                };
            };
            delete nextoutmsg.Payload();
        };
        if(!ret) break;
    };
    it.lock_.lock();
    for(std::list<mcc_tcp_exec_t>::iterator e = it.executers_.begin();e != it.executers_.end();++e) {
        if(id == e->id) {
            s=e->handle;
            it.executers_.erase(e);
            break;
        };
    };
    ::shutdown(s,2);
    ::close(s);
    it.cond_.signal();
    it.lock_.unlock();
    return;
}

void Arc::MCC_TCP_Service::listener ( void *  arg) [static, private]

executing function for listening thread

Definition at line 262 of file MCCTCP.cpp.

                                        {
    MCC_TCP_Service& it = *((MCC_TCP_Service*)arg);
    for(;;) {
        int max_s = -1;
        fd_set readfds;
        FD_ZERO(&readfds);
        it.lock_.lock();
        for(std::list<mcc_tcp_handle_t>::iterator i = it.handles_.begin();i!=it.handles_.end();) {
            int s = i->handle;
            if(s == -1) { i=it.handles_.erase(i); continue; };
            FD_SET(s,&readfds);
            if(s > max_s) max_s = s;
            ++i;
        };
        it.lock_.unlock();
        if(max_s == -1) break;
        struct timeval tv; tv.tv_sec = 2; tv.tv_usec = 0;
        int n = select(max_s+1,&readfds,NULL,NULL,&tv);
        if(n < 0) {
            if(ErrNo != EINTR) {
                logger.msg(ERROR, "Failed while waiting for connection request");
                it.lock_.lock();
                for(std::list<mcc_tcp_handle_t>::iterator i = it.handles_.begin();i!=it.handles_.end();) {
                    int s = i->handle;
                    ::close(s);
                    i=it.handles_.erase(i);
                };
                it.lock_.unlock();
                return;
            };
            continue;
        } else if(n == 0) continue;
        it.lock_.lock();
        for(std::list<mcc_tcp_handle_t>::iterator i = it.handles_.begin();i!=it.handles_.end();++i) {
            int s = i->handle;
            if(s == -1) continue;
            if(FD_ISSET(s,&readfds)) {
                it.lock_.unlock();
                struct sockaddr addr;
                socklen_t addrlen = sizeof(addr);
                int h = ::accept(s,&addr,&addrlen);
                if(h == -1) {
                    logger.msg(ERROR, "Failed to accept connection request");
                    it.lock_.lock();
                } else {
                    it.lock_.lock();
                    bool rejected = false;
                    bool first_time = true;
                    while((it.max_executers_ > 0) &&
                          (it.executers_.size() >= it.max_executers_)) {
                      if(it.max_executers_drop_) {
                        logger.msg(WARNING, "Too many connections - dropping new one");
                        // Drop the newly accepted connection, not the listening socket
                        ::shutdown(h,2);
                        ::close(h);
                        rejected = true;
                        break;
                      } else {
                        if(first_time) {
                          logger.msg(WARNING, "Too many connections - waiting for old to close");
                          first_time = false;
                        };
                        Glib::TimeVal etime;
                        etime.assign_current_time();
                        etime.add_milliseconds(10000); // 10 s
                        it.cond_.timed_wait(it.lock_,etime);
                      };
                    };
                    if(!rejected) {
                      mcc_tcp_exec_t t(&it,h,i->timeout,i->no_delay);
                    };
                };
            };
        };
        it.lock_.unlock();
    };
    return;
}
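The connection limit applied in listener() follows a simple admission policy: when the limit is reached, a new connection is either dropped or held until a slot frees up. The sketch below isolates that policy. It is an illustration using the C++ standard library in place of the Glib primitives in the listing, and `ConnectionLimiter` is a hypothetical class name.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

class ConnectionLimiter {
public:
    // max_active <= 0 disables the limit, as with max_executers_ above.
    ConnectionLimiter(int max_active, bool drop_over_limit)
        : max_(max_active), drop_(drop_over_limit), active_(0) {}

    // Returns true once a slot is taken, false if the caller should drop
    // the connection. In hold mode this blocks until a slot is released.
    bool admit() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (max_ > 0 && active_ >= static_cast<std::size_t>(max_)) {
            if (drop_) return false;
            // Wake on release() or after 10 s, then re-check the limit.
            cond_.wait_for(lock, std::chrono::seconds(10));
        }
        ++active_;
        return true;
    }

    // Called when a connection finishes; frees a slot and wakes one waiter.
    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (active_ > 0) --active_;
        }
        cond_.notify_one();
    }

    // Number of connections currently holding a slot.
    std::size_t active() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return active_;
    }

private:
    const int max_;
    const bool drop_;
    std::size_t active_;
    mutable std::mutex mutex_;
    std::condition_variable cond_;
};
```

In this arrangement the accepting thread would call `admit()` after each successful accept(), and each connection thread would call `release()` on exit, which mirrors the `cond_.signal()` call at the end of executer().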

MCCInterface * Arc::MCC::Next ( const std::string &  label = "") [protected, inherited]

Definition at line 22 of file MCC.cpp.

                                                {
    std::map<std::string, MCCInterface *>::iterator n = next_.find(label);
    if (n == next_.end())
      return NULL;
    return n->second;
  }

void Arc::MCC::Next ( MCCInterface *  next,
const std::string &  label = "" 
) [virtual, inherited]

Add reference to next MCC in chain.

This method is called by the Loader for every link, labeled or not, to a next component that implements MCCInterface. If next is NULL, the corresponding link is removed.

Reimplemented in Arc::MCC_TLS_Client, Arc::Plexer, and Arc::MCC_GSI_Client.

Definition at line 15 of file MCC.cpp.

                                                           {
    if (next == NULL)
      next_.erase(label);
    else
      next_[label] = next;
  }
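The two Next() overloads amount to a small labeled link table: setting a link with a NULL pointer removes it, and looking up a missing label yields NULL. A self-contained sketch of that behavior follows, with the hypothetical names `Component` (standing in for MCCInterface) and `LinkTable`.

```cpp
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-in for MCCInterface.
struct Component {
    std::string name;
};

class LinkTable {
public:
    // Stores a link under the label; a NULL pointer removes the link.
    void set(Component* next, const std::string& label = "") {
        if (next == NULL) {
            links_.erase(label);
        } else {
            links_[label] = next;
        }
    }

    // Returns the component linked under the label, or NULL if none.
    Component* get(const std::string& label = "") const {
        std::map<std::string, Component*>::const_iterator n = links_.find(label);
        if (n == links_.end()) return NULL;
        return n->second;
    }

private:
    std::map<std::string, Component*> links_;
};
```

The table does not own the components it points to, so removing a link never deletes the component itself.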

MCC_Status Arc::MCC_TCP_Service::process ( Message & , Message & ) [virtual]

Dummy Message processing method.

Just a placeholder.

Reimplemented from Arc::MCC.

Definition at line 578 of file MCCTCP.cpp.

                                                     {
  // Service is not really processing messages because there
  // are no lower level MCCs in chain.
  return MCC_Status();
}

bool Arc::MCC::ProcessSecHandlers ( Message &  message,
const std::string &  label = "" 
) const [protected, inherited]

Executes security handlers of specified queue.

Returns true if the message is authorized for further processing or if there are no security handlers which implement authorization functionality. This is a convenience method and has to be called by the implementation of the MCC.

Definition at line 45 of file MCC.cpp.

                                           {
    // Each MCC/Service can define security handler queues in the configuration
    // file, the queues have labels specified in handlers configuration 'event'
    // attribute.
    // Security handlers in one queue are called sequentially.
    // Each one should be configured carefully, because there can be some
    // relationship between them (e.g. authentication should be put in front
    // of authorization).
    // The SecHandler::Handle() only returns true/false with true meaning that
    // handler processed message successfully. If SecHandler implements
    // authorization functionality, it returns false if message is disallowed
    // and true otherwise.
    // If any SecHandler in the handler chain produces some information which
    // will be used by some following handler, the information should be
    // stored in the attributes of message (e.g. the Identity extracted from
    // authentication will be used by authorization to make access control
    // decision).
    std::map<std::string, std::list<ArcSec::SecHandler *> >::const_iterator q =
      sechandlers_.find(label);
    if (q == sechandlers_.end()) {
      logger.msg(DEBUG,
     "No security processing/check requested for '%s'", label);
      return true;
    }
    for (std::list<ArcSec::SecHandler *>::const_iterator h = q->second.begin();
         h != q->second.end(); ++h) {
      const ArcSec::SecHandler *handler = *h;
      if (!handler)
        continue; // Shouldn't happen. Just a sanity check.
      if (!(handler->Handle(&message))) {
        logger.msg(INFO, "Security processing/check failed");
        return false;
      }
    }
    logger.msg(DEBUG, "Security processing/check passed");
    return true;
  }
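The queue logic above can be shown in isolation: handlers are grouped by label, run in insertion order, and the first failure stops processing. This is a sketch under stated assumptions rather than the ARC implementation. `SketchMessage`, `Handler`, and `HandlerQueues` are hypothetical names, and `std::function` replaces the SecHandler pointers used in the listing.

```cpp
#include <functional>
#include <list>
#include <map>
#include <string>

// Hypothetical minimal message carrying only string attributes.
struct SketchMessage {
    std::map<std::string, std::string> attributes;
};

// A handler returns true if the message may proceed.
typedef std::function<bool(SketchMessage&)> Handler;

class HandlerQueues {
public:
    // Appends a handler to the queue with the given label.
    void add(const std::string& label, Handler handler) {
        if (handler) queues_[label].push_back(handler);
    }

    // Runs the labeled queue in order; an absent queue means "no checks".
    bool process(SketchMessage& msg, const std::string& label) const {
        std::map<std::string, std::list<Handler> >::const_iterator q =
            queues_.find(label);
        if (q == queues_.end()) return true;
        for (std::list<Handler>::const_iterator h = q->second.begin();
             h != q->second.end(); ++h) {
            if (!(*h)(msg)) return false;  // first failure stops the queue
        }
        return true;
    }

private:
    std::map<std::string, std::list<Handler> > queues_;
};
```

Because handlers run in order and share the message, an authentication handler placed first can store an identity in the attributes for an authorization handler placed after it, as the comments in the listing recommend.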

void Arc::MCC::Unlink ( ) [virtual, inherited]

Removing all links.

Useful for destroying chains.

Definition at line 29 of file MCC.cpp.

                   {
    for (std::map<std::string, MCCInterface *>::iterator n = next_.begin();
         n != next_.end(); n = next_.begin())
      next_.erase(n);
  }


Friends And Related Function Documentation

friend class mcc_tcp_exec_t [friend]

Definition at line 42 of file MCCTCP.h.

friend class PayloadTCPSocket [friend, inherited]

Definition at line 20 of file MCCTCP.h.


Member Data Documentation

Glib::Cond Arc::MCC_TCP_Service::cond_ [private]

Definition at line 69 of file MCCTCP.h.

std::list<mcc_tcp_exec_t> Arc::MCC_TCP_Service::executers_ [private]

active connections and associated threads

Definition at line 64 of file MCCTCP.h.

std::list<mcc_tcp_handle_t> Arc::MCC_TCP_Service::handles_ [private]

listening sockets

Definition at line 63 of file MCCTCP.h.

Glib::Mutex Arc::MCC_TCP_Service::lock_ [private]

lock for safe operations in internal lists

Definition at line 68 of file MCCTCP.h.

Arc::Logger Arc::MCC_TCP::logger [static, protected, inherited]

A logger for MCCs.

A logger intended to be the parent of loggers in the different MCCs.

Reimplemented from Arc::MCC.

Definition at line 19 of file MCCTCP.h.

int Arc::MCC_TCP_Service::max_executers_ [private]

Definition at line 65 of file MCCTCP.h.

bool Arc::MCC_TCP_Service::max_executers_drop_ [private]

Definition at line 66 of file MCCTCP.h.

std::map<std::string, MCCInterface *> Arc::MCC::next_ [protected, inherited]

Set of labeled "next" components.

Each MCC implementation must call the process() method of the corresponding MCCInterface from this set in its own process() method.

Definition at line 50 of file MCC.h.

std::map<std::string, std::list<ArcSec::SecHandler *> > Arc::MCC::sechandlers_ [protected, inherited]

Set of labeled authentication and authorization handlers.

The MCC calls a sequence of handlers at a specific point depending on the associated identifier. In most cases those are "in" and "out" for incoming and outgoing messages respectively.

Definition at line 57 of file MCC.h.


The documentation for this class was generated from the following files: