Back to index

nordugrid-arc-nox  1.1.0~rc6
arex_client.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 // arex_client.cpp
00006 
00007 #include <arc/delegation/DelegationInterface.h>
00008 
00009 #include "arex_client.h"
00010 
00011 namespace Arc {
00012 
00013   // TODO: probably worth moving it to common library
00014   // Of course xpath can be used too. But such solution is probably an overkill.
00015   static Arc::XMLNode find_xml_node(Arc::XMLNode node,const std::string& el_name,
00016                                     const std::string& attr_name,const std::string& attr_value) {
00017     if(MatchXMLName(node,el_name) && 
00018        (((std::string)node.Attribute(attr_name)) == attr_value)) return node;
00019     XMLNode cn = node[el_name];
00020     while(cn) {
00021       XMLNode fn = find_xml_node(cn,el_name,attr_name,attr_value);
00022       if(fn) return fn;
00023       cn=cn[1];
00024     };
00025     return XMLNode();
00026   }
00027 
00028   Compiler_AREXClientError::Compiler_AREXClientError(const std::string& what) :
00029     std::runtime_error(what)
00030   {
00031   }
00032 
00033   Arc::Logger Compiler_AREXClient::logger(Arc::Logger::rootLogger, "A-REX-Client");
00034 
00035   static void set_arex_namespaces(Arc::NS& ns) {
00036     ns["a-rex"]="http://www.nordugrid.org/schemas/a-rex";
00037     ns["bes-factory"]="http://schemas.ggf.org/bes/2006/08/bes-factory";
00038     ns["wsa"]="http://www.w3.org/2005/08/addressing";
00039     ns["jsdl"]="http://schemas.ggf.org/jsdl/2005/11/jsdl";    
00040     ns["jsdl-posix"]="http://schemas.ggf.org/jsdl/2005/11/jsdl-posix";
00041     ns["jsdl-arc"]="http://www.nordugrid.org/ws/schemas/jsdl-arc";
00042     ns["jsdl-hpcpa"]="http://schemas.ggf.org/jsdl/2006/07/jsdl-hpcpa";
00043   }
00044 
00045   Compiler_AREXClient::Compiler_AREXClient(std::string configFile) throw(Compiler_AREXClientError)
00046     :client_config(NULL),client_loader(NULL),client(NULL),client_entry(NULL)
00047   {
00048     logger.msg(Arc::INFO, "Creating an A-REX client");
00049 
00050     if (configFile=="" && getenv("ARC_Compiler_AREX_CONFIG"))
00051       configFile = getenv("ARC_Compiler_AREX_CONFIG");
00052     if (configFile=="")
00053       configFile = "./arex_client.xml";
00054 
00055     client_config = new Arc::Config(configFile.c_str());
00056     if(!*client_config) {
00057       logger.msg(Arc::ERROR, "Failed to load client configuration");
00058       throw Compiler_AREXClientError("Failed to load client configuration");
00059     }
00060 
00061     client_loader = new Arc::MCCLoader(*client_config);
00062     logger.msg(Arc::INFO, "Client side MCCs are loaded");
00063     client_entry = (*client_loader)["soap"];
00064     if(!client_entry) {
00065       logger.msg(Arc::ERROR, "Client chain does not have entry point");
00066       throw Compiler_AREXClientError("Client chain does not have entry point");
00067     }
00068 
00069     set_arex_namespaces(arex_ns);
00070   }
00071   
00072   Compiler_AREXClient::Compiler_AREXClient(const Arc::URL& url,
00073                          const Arc::MCCConfig& cfg) throw(Compiler_AREXClientError)
00074     :client_config(NULL),client_loader(NULL),client(NULL),client_entry(NULL) {
00075 
00076     logger.msg(Arc::INFO, "Creating an A-REX client");
00077     client = new Arc::ClientSOAP(cfg,url, 60);
00078     set_arex_namespaces(arex_ns);
00079   }
00080   
00081   Compiler_AREXClient::~Compiler_AREXClient()
00082   {
00083     if(client_loader) delete client_loader;
00084     if(client_config) delete client_config;
00085     if(client) delete client;
00086   }
00087   
00088   std::string Compiler_AREXClient::submit(std::istream& jsdl_file,Compiler_AREXFileList& file_list,bool delegate)
00089     throw(Compiler_AREXClientError)
00090   {
00091     std::string jobid, faultstring;
00092     file_list.resize(0);
00093 
00094     logger.msg(Arc::INFO, "Creating and sending request");
00095 
00096     // Create job request
00097     /*
00098       bes-factory:CreateActivity
00099         bes-factory:ActivityDocument
00100           jsdl:JobDefinition
00101     */
00102     Arc::PayloadSOAP req(arex_ns);
00103     Arc::XMLNode op = req.NewChild("bes-factory:CreateActivity");
00104     Arc::XMLNode act_doc = op.NewChild("bes-factory:ActivityDocument");
00105     std::string jsdl_str; 
00106     std::getline<char>(jsdl_file,jsdl_str,0);
00107     act_doc.NewChild(Arc::XMLNode(jsdl_str));
00108     act_doc.Child(0).Namespaces(arex_ns); // Unify namespaces
00109     Arc::PayloadSOAP* resp = NULL;
00110 
00111     XMLNode ds = act_doc["jsdl:JobDefinition"]["jsdl:JobDescription"]["jsdl:DataStaging"];
00112     for(;(bool)ds;ds=ds[1]) {
00113       // FilesystemName - ignore
00114       // CreationFlag - ignore
00115       // DeleteOnTermination - ignore
00116       XMLNode source = ds["jsdl:Source"];
00117       XMLNode target = ds["jsdl:Target"];
00118       if((bool)source) {
00119         std::string s_name = ds["jsdl:FileName"];
00120         if(!s_name.empty()) {
00121           XMLNode x_url = source["jsdl:URI"];
00122           std::string s_url = x_url;
00123           if(s_url.empty()) {
00124             s_url="./"+s_name;
00125           } else {
00126             URL u_url(s_url);
00127             if(!u_url) {
00128               if(s_url[0] != '/') s_url="./"+s_url;
00129             } else {
00130               if(u_url.Protocol() == "file") {
00131                 s_url=u_url.Path();
00132                 if(s_url[0] != '/') s_url="./"+s_url;
00133               } else {
00134                 s_url.resize(0);
00135               };
00136             };
00137           };
00138           if(!s_url.empty()) {
00139             x_url.Destroy();
00140             Compiler_AREXFile file(s_name,s_url);
00141             file_list.push_back(file);
00142           };
00143         };
00144       };
00145     }; 
00146     act_doc.GetXML(jsdl_str);
00147     logger.msg(Arc::DEBUG, "Job description to be sent: %s",jsdl_str);
00148 
00149     // Try to figure out which credentials are used
00150     // TODO: Method used is unstable beacuse it assumes some predefined 
00151     // structure of configuration file. Maybe there should be some 
00152     // special methods of ClientTCP class introduced.
00153     std::string deleg_cert;
00154     std::string deleg_key;
00155     if(delegate) {
00156       client->Load(); // Make sure chain is ready
00157       Arc::XMLNode tls_cfg = find_xml_node((client->GetConfig())["Chain"],"Component","name","tls.client");
00158       if(tls_cfg) {
00159         deleg_cert=(std::string)(tls_cfg["ProxyPath"]);
00160         if(deleg_cert.empty()) {
00161           deleg_cert=(std::string)(tls_cfg["CertificatePath"]);
00162           deleg_key=(std::string)(tls_cfg["KeyPath"]);
00163         } else {
00164           deleg_key=deleg_cert;
00165         };
00166       };
00167       if(deleg_cert.empty() || deleg_key.empty()) {
00168 std::string s;
00169 client->GetConfig().GetXML(s);
00170 std::cerr<<s<<std::endl;
00171         logger.msg(Arc::ERROR,"Failed to find delegation credentials in client configuration");
00172         throw Compiler_AREXClientError("Failed to find delegation credentials in client configuration");
00173       };
00174     };
00175     // Send job request + delegation
00176     if(client) {
00177       {
00178         if(delegate) {
00179           Arc::DelegationProviderSOAP deleg(deleg_cert,deleg_key);
00180           logger.msg(Arc::INFO, "Initiating delegation procedure");
00181           if(!deleg.DelegateCredentialsInit(*(client->GetEntry()),&(client->GetContext()))) {
00182             logger.msg(Arc::ERROR,"Failed to initiate delegation");
00183             throw Compiler_AREXClientError("Failed to initiate delegation");
00184           };
00185           deleg.DelegatedToken(op);
00186         };
00187       };
00188       Arc::MCC_Status status = client->process(
00189          "http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/CreateActivity",
00190          &req,&resp);
00191       if(!status) {
00192         logger.msg(Arc::ERROR, "Submission request failed");
00193         throw Compiler_AREXClientError("Submission request failed");
00194       }
00195       if(resp == NULL) {
00196         logger.msg(Arc::ERROR,"There was no SOAP response");
00197         throw Compiler_AREXClientError("There was no SOAP response");
00198       };
00199     } else if (client_entry) {
00200       Arc::Message reqmsg;
00201       Arc::Message repmsg;
00202       Arc::MessageAttributes attributes_req;
00203       attributes_req.set("SOAP:ACTION","http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/CreateActivity");
00204       Arc::MessageAttributes attributes_rep;
00205       Arc::MessageContext context;
00206       {
00207         if(delegate) {
00208           Arc::DelegationProviderSOAP deleg(deleg_cert,deleg_key);
00209           logger.msg(Arc::INFO, "Initiating delegation procedure");
00210           if(!deleg.DelegateCredentialsInit(*client_entry,&context)) {
00211             logger.msg(Arc::ERROR,"Failed to initiate delegation");
00212             throw Compiler_AREXClientError("Failed to initiate delegation");
00213           };
00214           deleg.DelegatedToken(op);
00215         };
00216       };
00217       reqmsg.Payload(&req);
00218       reqmsg.Attributes(&attributes_req);
00219       reqmsg.Context(&context);
00220       repmsg.Attributes(&attributes_rep);
00221       repmsg.Context(&context);
00222       Arc::MCC_Status status = client_entry->process(reqmsg,repmsg);
00223       if(!status) {
00224         logger.msg(Arc::ERROR, "Submission request failed");
00225         throw Compiler_AREXClientError("Submission request failed");
00226       }
00227       logger.msg(Arc::INFO, "Submission request succeed");
00228       if(repmsg.Payload() == NULL) {
00229         logger.msg(Arc::ERROR, "There was no response to a submission request");
00230         throw Compiler_AREXClientError("There was no response to the submission request");
00231       }
00232       try {
00233         resp = dynamic_cast<Arc::PayloadSOAP*>(repmsg.Payload());
00234       } catch(std::exception&) { };
00235       if(resp == NULL) {
00236         logger.msg(Arc::ERROR,"A response to a submission request was not a SOAP message");
00237         delete repmsg.Payload();
00238         throw Compiler_AREXClientError("The response to the submission request was not a SOAP message");
00239       };
00240     } else {
00241       throw Compiler_AREXClientError("There is no connection chain configured");
00242     };
00243     Arc::XMLNode id, fs;
00244     (*resp)["CreateActivityResponse"]["ActivityIdentifier"].New(id);
00245     (*resp)["Fault"]["faultstring"].New(fs);
00246     id.GetDoc(jobid);
00247     faultstring=(std::string)fs;
00248     delete resp;
00249     if (faultstring=="")
00250       return jobid;
00251     else
00252       throw Compiler_AREXClientError(faultstring);
00253   }
00254   
00255   std::string Compiler_AREXClient::stat(const std::string& jobid)
00256     throw(Compiler_AREXClientError)
00257   {
00258     std::string state, substate, faultstring;
00259     logger.msg(Arc::INFO, "Creating and sending a status request");
00260     
00261     Arc::PayloadSOAP req(arex_ns);
00262     Arc::XMLNode jobref =
00263       req.NewChild("bes-factory:GetActivityStatuses").
00264       NewChild(Arc::XMLNode(jobid));
00265     
00266     // Send status request
00267     Arc::PayloadSOAP* resp = NULL;
00268 
00269     if(client) {
00270       Arc::MCC_Status status = client->process(
00271           "http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/GetActivityStatuses",
00272           &req,&resp);
00273       if(resp == NULL) {
00274         logger.msg(Arc::ERROR,"There was no SOAP response");
00275         throw Compiler_AREXClientError("There was no SOAP response");
00276       }
00277     } else if(client_entry) {
00278       Arc::Message reqmsg;
00279       Arc::Message repmsg;
00280       Arc::MessageAttributes attributes_req;
00281       attributes_req.set("SOAP:ACTION","http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/GetActivityStatuses");
00282       Arc::MessageAttributes attributes_rep;
00283       Arc::MessageContext context;
00284       reqmsg.Payload(&req);
00285       reqmsg.Attributes(&attributes_req);
00286       reqmsg.Context(&context);
00287       repmsg.Attributes(&attributes_rep);
00288       repmsg.Context(&context);
00289       Arc::MCC_Status status = client_entry->process(reqmsg,repmsg);
00290       if(!status) {
00291         logger.msg(Arc::ERROR, "Status request failed");
00292         throw Compiler_AREXClientError("Status request failed");
00293       }
00294       logger.msg(Arc::INFO, "Status request succeed");
00295       if(repmsg.Payload() == NULL) {
00296         logger.msg(Arc::ERROR, "There was no response to a status request");
00297         throw Compiler_AREXClientError("There was no response");
00298       }
00299       try {
00300         resp = dynamic_cast<Arc::PayloadSOAP*>(repmsg.Payload());
00301       } catch(std::exception&) { };
00302       if(resp == NULL) {
00303         logger.msg(Arc::ERROR,
00304                "The response to a status request was not a SOAP message");
00305         delete repmsg.Payload();
00306         throw Compiler_AREXClientError("The response is not a SOAP message");
00307       }
00308     } else {
00309       throw Compiler_AREXClientError("There is no connection chain configured");
00310     };
00311     Arc::XMLNode st, fs;
00312     (*resp)["GetActivityStatusesResponse"]["Response"]
00313            ["ActivityStatus"].New(st);
00314     state = (std::string)st.Attribute("state");
00315     Arc::XMLNode sst;
00316     (*resp)["GetActivityStatusesResponse"]["Response"]
00317            ["ActivityStatus"]["state"].New(sst);
00318     substate = (std::string)sst;
00319     (*resp)["Fault"]["faultstring"].New(fs);
00320     faultstring=(std::string)fs;
00321     delete resp;
00322     if (faultstring!="")
00323       throw Compiler_AREXClientError(faultstring);
00324     else if (state=="")
00325       throw Compiler_AREXClientError("The job status could not be retrieved");
00326     else
00327       return state+"/"+substate;
00328   }
00329   
00330   std::string Compiler_AREXClient::sstat(void)
00331     throw(Compiler_AREXClientError)
00332   {
00333     std::string state, faultstring;
00334     logger.msg(Arc::INFO, "Creating and sending a service status request");
00335     
00336     Arc::PayloadSOAP req(arex_ns);
00337     Arc::XMLNode jobref =
00338       req.NewChild("bes-factory:GetFactoryAttributesDocument");
00339     
00340     // Send status request
00341     Arc::PayloadSOAP* resp = NULL;
00342     if(client) {
00343       Arc::MCC_Status status = client->process(
00344          "http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/GetFactoryAttributesDocument",
00345          &req,&resp);
00346       if(resp == NULL) {
00347         logger.msg(Arc::ERROR,"There was no SOAP response");
00348         throw Compiler_AREXClientError("There was no SOAP response");
00349       }
00350     } else if(client_entry) {
00351       Arc::Message reqmsg;
00352       Arc::Message repmsg;
00353       Arc::MessageAttributes attributes_req;
00354       attributes_req.set("SOAP:ACTION","http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/GetFactoryAttributesDocument");
00355       Arc::MessageAttributes attributes_rep;
00356       Arc::MessageContext context;
00357       reqmsg.Payload(&req);
00358       reqmsg.Attributes(&attributes_req);
00359       reqmsg.Context(&context);
00360       repmsg.Attributes(&attributes_rep);
00361       repmsg.Context(&context);
00362       Arc::MCC_Status status = client_entry->process(reqmsg,repmsg);
00363       if(!status) {
00364         logger.msg(Arc::ERROR, "Service status request failed");
00365         throw Compiler_AREXClientError("Service status request failed");
00366       }
00367       logger.msg(Arc::INFO, "Service status request succeed");
00368       if(repmsg.Payload() == NULL) {
00369         logger.msg(Arc::ERROR, "There was no response to a service status request");
00370         throw Compiler_AREXClientError("There was no response");
00371       }
00372       try {
00373         resp = dynamic_cast<Arc::PayloadSOAP*>(repmsg.Payload());
00374       } catch(std::exception&) { };
00375       if(resp == NULL) {
00376         logger.msg(Arc::ERROR,
00377                "The response of a service status request was not a SOAP message");
00378         delete repmsg.Payload();
00379         throw Compiler_AREXClientError("The response is not a SOAP message");
00380       }
00381     } else {
00382       throw Compiler_AREXClientError("There is no connection chain configured");
00383     };
00384     Arc::XMLNode st;
00385     (*resp)["GetFactoryAttributesDocumentResponse"]
00386            ["FactoryResourceAttributesDocument"].New(st);
00387     st.GetDoc(state);
00388     delete resp;
00389     if (state=="")
00390       throw Compiler_AREXClientError("The service status could not be retrieved");
00391     else
00392       return state;
00393   }
00394 
00395   void Compiler_AREXClient::kill(const std::string& jobid)
00396     throw(Compiler_AREXClientError)
00397   {
00398     std::string result, faultstring;
00399     logger.msg(Arc::INFO, "Creating and sending request to terminate a job");
00400     
00401     Arc::PayloadSOAP req(arex_ns);
00402     Arc::XMLNode jobref =
00403       req.NewChild("bes-factory:TerminateActivities").
00404       NewChild(Arc::XMLNode(jobid));
00405     
00406     // Send kill request
00407     Arc::PayloadSOAP* resp = NULL;
00408     if(client) {
00409       Arc::MCC_Status status = client->process(
00410          "http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/TerminateActivities",
00411          &req,&resp);
00412       if(resp == NULL) {
00413         logger.msg(Arc::ERROR,"There was no SOAP response");
00414         throw Compiler_AREXClientError("There was no SOAP response");
00415       }
00416     } else if(client_entry) {
00417       Arc::Message reqmsg;
00418       Arc::Message repmsg;
00419       Arc::MessageAttributes attributes_req;
00420       attributes_req.set("SOAP:ACTION","http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/TerminateActivities");
00421       Arc::MessageAttributes attributes_rep;
00422       Arc::MessageContext context;
00423       reqmsg.Payload(&req);
00424       reqmsg.Attributes(&attributes_req);
00425       reqmsg.Context(&context);
00426       repmsg.Attributes(&attributes_rep);
00427       repmsg.Context(&context);
00428       Arc::MCC_Status status = client_entry->process(reqmsg,repmsg);
00429       if(!status) {
00430         logger.msg(Arc::ERROR, "Job termination request failed");
00431         throw Compiler_AREXClientError("Job termination request failed");
00432       }
00433       logger.msg(Arc::INFO, "Job termination request succeed");
00434       if(repmsg.Payload() == NULL) {
00435         logger.msg(Arc::ERROR,
00436                "There was no response to a job termination request");
00437         throw Compiler_AREXClientError
00438        ("There was no response to the job termination request");
00439       }
00440       try {
00441         resp = dynamic_cast<Arc::PayloadSOAP*>(repmsg.Payload());
00442       } catch(std::exception&) { };
00443       if(resp == NULL) {
00444         logger.msg(Arc::ERROR,
00445        "The response of a job termination request was not a SOAP message");
00446         delete repmsg.Payload();
00447         throw Compiler_AREXClientError("The response is not a SOAP message");
00448       }
00449     } else {
00450       throw Compiler_AREXClientError("There is no connection chain configured");
00451     };
00452 
00453     Arc::XMLNode cancelled, fs;
00454     (*resp)["TerminateActivitiesResponse"]
00455            ["Response"]["Cancelled"].New(cancelled);
00456     result = (std::string)cancelled;
00457     (*resp)["Fault"]["faultstring"].New(fs);
00458     faultstring=(std::string)fs;
00459     delete resp;
00460     if (faultstring!="")
00461       throw Compiler_AREXClientError(faultstring);
00462     if (result!="true")
00463       throw Compiler_AREXClientError("Job termination failed");
00464   }
00465   
00466   void Compiler_AREXClient::clean(const std::string& jobid)
00467     throw(Compiler_AREXClientError)
00468   {
00469     std::string result, faultstring;
00470     logger.msg(Arc::INFO, "Creating and sending request to terminate a job");
00471     
00472     Arc::PayloadSOAP req(arex_ns);
00473     Arc::XMLNode op = req.NewChild("a-rex:ChangeActivityStatus");
00474     Arc::XMLNode jobref = op.NewChild(Arc::XMLNode(jobid));
00475     Arc::XMLNode jobstate = op.NewChild("a-rex:NewStatus");
00476     jobstate.NewAttribute("bes-factory:state")="Finished";
00477     jobstate.NewChild("a-rex:state")="Deleted";
00478     // Send clean request
00479     Arc::PayloadSOAP* resp = NULL;
00480     if(client) {
00481       Arc::MCC_Status status = client->process("",&req,&resp);
00482       if(resp == NULL) {
00483         logger.msg(Arc::ERROR,"There was no SOAP response");
00484         throw Compiler_AREXClientError("There was no SOAP response");
00485       }
00486     } else if(client_entry) {
00487       Arc::Message reqmsg;
00488       Arc::Message repmsg;
00489       Arc::MessageAttributes attributes_req;
00490       Arc::MessageAttributes attributes_rep;
00491       Arc::MessageContext context;
00492       reqmsg.Payload(&req);
00493       reqmsg.Attributes(&attributes_req);
00494       reqmsg.Context(&context);
00495       repmsg.Attributes(&attributes_rep);
00496       repmsg.Context(&context);
00497       Arc::MCC_Status status = client_entry->process(reqmsg,repmsg);
00498       if(!status) {
00499         logger.msg(Arc::ERROR, "Job cleaning request failed");
00500         throw Compiler_AREXClientError("Job cleaning request failed");
00501       }
00502       logger.msg(Arc::INFO, "Job cleaning request succeed");
00503       if(repmsg.Payload() == NULL) {
00504         logger.msg(Arc::ERROR,
00505                "There was no response to a job cleaning request");
00506         throw Compiler_AREXClientError
00507        ("There was no response to the job cleaning request");
00508       }
00509       try {
00510         resp = dynamic_cast<Arc::PayloadSOAP*>(repmsg.Payload());
00511       } catch(std::exception&) { };
00512       if(resp == NULL) {
00513         logger.msg(Arc::ERROR,
00514         "The response of a job cleaning request was not a SOAP message");
00515         delete repmsg.Payload();
00516         throw Compiler_AREXClientError("The response is not a SOAP message");
00517       }
00518     } else {
00519       throw Compiler_AREXClientError("There is no connection chain configured");
00520     };
00521 
00522     if(!((*resp)["ChangeActivityStatusResponse"])) {
00523       delete resp;
00524       Arc::XMLNode fs;
00525       (*resp)["Fault"]["faultstring"].New(fs);
00526       faultstring=(std::string)fs;
00527       if (faultstring!="")
00528         throw Compiler_AREXClientError(faultstring);
00529       if (result!="true")
00530         throw Compiler_AREXClientError("Job termination failed");
00531     };
00532     delete resp;
00533   }
00534 
00535 }