Back to index

nordugrid-arc-nox  1.1.0~rc6
LutsDestination.cpp
Go to the documentation of this file.
00001 #include "LutsDestination.h"
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include "jura.h"
00008 #include "arc/Utils.h"
00009 #include <sstream>
00010 
00011 namespace Arc
00012 {
00013   LutsDestination::LutsDestination(JobLogFile& joblog):
00014     logger(Arc::Logger::rootLogger, "JURA.LutsDestination"),
00015     urn(0),
00016     usagerecordset(Arc::NS("","http://schema.ogf.org/urf/2003/09/urf"),
00017                    "UsageRecords")
00018   {
00019     std::string dump;
00020 
00021     //Set up the Client Message Chain:
00022     Arc::NS ns;
00023     ns[""]="http://www.nordugrid.org/schemas/ArcConfig/2007";
00024     ns["tcp"]="http://www.nordugrid.org/schemas/ArcMCCTCP/2007";
00025     clientchain.Namespaces(ns);
00026 
00027     //Get service URL, cert, key, CA path from job log file
00028     std::string serviceurl=joblog["loggerurl"];
00029     std::string certfile=joblog["certificate_path"];
00030     std::string keyfile=joblog["key_path"];
00031     std::string cadir=joblog["ca_certificates_dir"];
00032     // ...or get them from environment
00033     if (certfile.empty())
00034       certfile=Arc::GetEnv("X509_USER_CERT");
00035     if (keyfile.empty())
00036       keyfile=Arc::GetEnv("X509_USER_KEY");
00037     if (cadir.empty())
00038       cadir=Arc::GetEnv("X509_CERT_DIR");
00039     // ...or by default, use host cert, key, CA path
00040     if (certfile.empty())
00041       certfile=JURA_DEFAULT_CERT_FILE;
00042     if (keyfile.empty())
00043       keyfile=JURA_DEFAULT_KEY_FILE;
00044     if (cadir.empty())
00045       cadir=JURA_DEFAULT_CA_DIR;
00046 
00047     //  Tokenize service URL
00048     std::string host, port, endpoint;
00049     if (serviceurl.empty())
00050       {
00051         logger.msg(Arc::ERROR, "ServiceURL missing");
00052       }
00053     else
00054       {
00055         Arc::URL url(serviceurl);
00056         if (url.Protocol()!="https")
00057           {
00058             logger.msg(Arc::ERROR, "Protocol is %s, should be https",
00059                        url.Protocol());
00060           }
00061         host=url.Host();
00062         std::ostringstream os;
00063         os<<url.Port();
00064         port=os.str();
00065         endpoint=url.Path();
00066       }
00067 
00068 #ifdef HAVE_CONFIG_H
00069     //MCC module path(s):
00070     clientchain.NewChild("ModuleManager").NewChild("Path")=
00071       INSTPREFIX "/" LIBSUBDIR;
00072     clientchain["ModuleManager"].NewChild("Path")=
00073       INSTPREFIX "/" PKGLIBSUBDIR;
00074 #endif
00075 
00076     //The protocol stack: SOAP over HTTP over SSL over TCP
00077     clientchain.NewChild("Plugins").NewChild("Name")="mcctcp";
00078     clientchain.NewChild("Plugins").NewChild("Name")="mcctls";
00079     clientchain.NewChild("Plugins").NewChild("Name")="mcchttp";
00080     clientchain.NewChild("Plugins").NewChild("Name")="mccsoap";
00081 
00082 
00083     //The chain
00084     Arc::XMLNode chain=clientchain.NewChild("Chain");
00085     Arc::XMLNode component;
00086   
00087     //  TCP
00088     component=chain.NewChild("Component");
00089     component.NewAttribute("name")="tcp.client";
00090     component.NewAttribute("id")="tcp";
00091     Arc::XMLNode connect=component.NewChild("tcp:Connect");
00092     connect.NewChild("tcp:Host")=host;
00093     connect.NewChild("tcp:Port")=port;
00094 
00095     //  TLS (SSL)
00096     component=chain.NewChild("Component");
00097     component.NewAttribute("name")="tls.client";
00098     component.NewAttribute("id")="tls";
00099     component.NewChild("next").NewAttribute("id")="tcp";
00100     if (!certfile.empty())
00101       component.NewChild("CertificatePath")=certfile;
00102     if (!keyfile.empty())
00103       component.NewChild("KeyPath")=keyfile;
00104     if (!cadir.empty())
00105       component.NewChild("CACertificatesDir")=cadir;
00106   
00107     //  HTTP
00108     component=chain.NewChild("Component");
00109     component.NewAttribute("name")="http.client";
00110     component.NewAttribute("id")="http";
00111     component.NewChild("next").NewAttribute("id")="tls";
00112     component.NewChild("Method")="POST";
00113     component.NewChild("Endpoint")=endpoint;
00114   
00115     //  SOAP
00116     component=chain.NewChild("Component");
00117     component.NewAttribute("name")="soap.client";
00118     component.NewAttribute("id")="soap";
00119     component.NewAttribute("entry")="soap";
00120     component.NewChild("next").NewAttribute("id")="http";
00121 
00122     clientchain.GetDoc(dump,true);
00123     logger.msg(Arc::VERBOSE, "Client chain configuration: %s",
00124                dump.c_str() );
00125 
00126     //Get Batch Size:
00127     //Default value:
00128     max_ur_set_size=JURA_DEFAULT_MAX_UR_SET_SIZE;
00129     //From jobreport_options:
00130     std::string urbatch=joblog["jobreport_option_urbatch"];
00131     if (!urbatch.empty())
00132       {
00133        std::istringstream is(urbatch);
00134        is>>max_ur_set_size;
00135       }
00136 
00137   }
00138 
00139   void LutsDestination::report(Arc::JobLogFile &joblog)
00140   {
00141     //if (joblog.exists())
00142       {
00143         //Store copy of job log
00144         joblogs.push_back(joblog);
00145         //Create UR if can
00146         Arc::XMLNode usagerecord(Arc::NS(), "");
00147        joblog.createUsageRecord(usagerecord);
00148        if (usagerecord)
00149          {
00150            usagerecordset.NewChild(usagerecord);
00151            ++urn;
00152          }
00153        else
00154          {
00155            logger.msg(Arc::INFO,"Ignoring incomplete log file \"%s\"",
00156                      joblog.getFilename().c_str());
00157            joblog.remove();
00158          }
00159       }
00160     
00161     if (urn==max_ur_set_size)
00162       // Batch is full. Submit and delete job log files.
00163       submit_batch();
00164   }
00165 
00166   void LutsDestination::finish()
00167   {
00168     if (urn>0)
00169       // Send the remaining URs and delete job log files.
00170       submit_batch();
00171   }
00172 
00173   int LutsDestination::submit_batch()
00174   {
00175     std::string urstr;
00176     usagerecordset.GetDoc(urstr,false);
00177 
00178     logger.msg(Arc::INFO, 
00179                "Logging UR set of %d URs.",
00180                urn);
00181     logger.msg(Arc::DEBUG, 
00182                "UR set dump: %s",
00183                urstr.c_str());
00184   
00185     // Communication with LUTS server
00186     Arc::MCC_Status status=send_request(urstr);
00187 
00188     if (status.isOk())
00189       {
00190         // Delete log files
00191         for (std::list<JobLogFile>::iterator jp=joblogs.begin();
00192              jp!=joblogs.end();
00193              jp++
00194              )
00195           {
00196             (*jp).remove();
00197           }
00198         clear();
00199         return 0;
00200       }
00201     else // status.isnotOk
00202       {
00203         logger.msg(Arc::ERROR, 
00204                    "%s: %s",
00205                    status.getOrigin().c_str(),
00206                    status.getExplanation().c_str()
00207                    );
00208         clear();
00209         return -1;
00210       }
00211   }
00212 
00213   Arc::MCC_Status LutsDestination::send_request(const std::string &urset)
00214   {
00215     Arc::NS _empty_ns;
00216     Arc::PayloadSOAP req(_empty_ns);
00217     Arc::PayloadSOAP *resp=NULL;
00218     Arc::Message inmsg,outmsg;
00219 
00220     //Create MCC loader. This also sets up TCP connection!
00221     mccloader=new Arc::MCCLoader(clientchain);
00222     //(and we also get a load of log entries)
00223 
00224     soapmcc=(*mccloader)["soap"];
00225     if (!soapmcc)
00226     {
00227       delete mccloader;
00228       return Arc::MCC_Status(Arc::GENERIC_ERROR,
00229                              "lutsclient",
00230                              "No SOAP entry point in chain");
00231     }
00232 
00233     //TODO ws-addressing!
00234 
00235     //Build request structure:
00236     Arc::NS ns_wsrp("",
00237      "http://docs.oasis-open.org/wsrf/2004/06/wsrf-WS-ResourceProperties-1.2-draft-01.xsd"
00238                     );
00239     Arc::XMLNode query=req.NewChild("QueryResourceProperties",ns_wsrp).
00240                            NewChild("QueryExpression");
00241     query.NewAttribute("Dialect")=
00242       "http://www.sgas.se/namespaces/2005/06/publish/query";
00243     query=urset;
00244     //put into message:
00245     inmsg.Payload(&req);
00246 
00247     //Send
00248     Arc::MCC_Status status;
00249 
00250     status=soapmcc->process(inmsg, outmsg);
00251 
00252     //extract response:
00253     try
00254     {
00255       resp=dynamic_cast<Arc::PayloadSOAP*>(outmsg.Payload());
00256     }
00257     catch (std::exception&) {}
00258 
00259     if (resp==NULL)
00260       {
00261         //Unintelligible non-SOAP response
00262         delete mccloader;
00263         return Arc::MCC_Status(Arc::PROTOCOL_RECOGNIZED_ERROR,
00264                                "lutsclient",
00265                                "Response not SOAP");
00266       }
00267 
00268     if (status && ! ((*resp)["QueryResourcePropertiesResponse"]))
00269       {
00270         // Status OK, but wrong response
00271         std::string soapfault;
00272         resp->GetDoc(soapfault,false);
00273 
00274         delete mccloader;
00275         return Arc::MCC_Status(Arc::PARSING_ERROR,
00276                "lutsclient",
00277                std::string(
00278                  "No QueryResourcePropertiesResponse element in response: "
00279                            )+ 
00280                soapfault
00281                );
00282       }
00283     
00284     delete mccloader;
00285     return status;
00286   }
00287 
00288   void LutsDestination::clear()
00289   {
00290     urn=0;
00291     joblogs.clear();
00292     usagerecordset.Replace(
00293         Arc::XMLNode(Arc::NS("",
00294                              "http://schema.ogf.org/urf/2003/09/urf"
00295                             ),
00296                      "UsageRecords")
00297                     );
00298   }
00299 
00300   LutsDestination::~LutsDestination()
00301   {
00302     finish();
00303   }
00304 }