Back to index

nordugrid-arc-nox  1.1.0~rc6
UNICOREClient.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <arc/UserConfig.h>
00008 #include <arc/client/ClientInterface.h>
00009 #include <arc/delegation/DelegationInterface.h>
00010 #include <arc/loader/Loader.h>
00011 #include <arc/ws-addressing/WSA.h>
00012 
00013 #include "UNICOREClient.h"
00014 
00015 namespace Arc {
00016 
00017   static const std::string BES_FACTORY_ACTIONS_BASE_URL("http://schemas.ggf.org/bes/2006/08/bes-factory/BESFactoryPortType/");
00018   static const std::string BES_MANAGEMENT_ACTIONS_BASE_URL("http://schemas.ggf.org/bes/2006/08/bes-management/BESManagementPortType/");
00019 
00020   static XMLNode find_xml_node(XMLNode node,
00021                                const std::string& el_name,
00022                                const std::string& attr_name,
00023                                const std::string& attr_value) {
00024     if (MatchXMLName(node, el_name) &&
00025         (((std::string)node.Attribute(attr_name)) == attr_value))
00026       return node;
00027     XMLNode cn = node[el_name];
00028     while (cn) {
00029       XMLNode fn = find_xml_node(cn, el_name, attr_name, attr_value);
00030       if (fn)
00031         return fn;
00032       cn = cn[1];
00033     }
00034     return XMLNode();
00035   }
00036 
  // Logger shared by all UNICOREClient methods; it is attached to the root
  // logger under the name "UNICORE-Client".
  Logger UNICOREClient::logger(Logger::rootLogger, "UNICORE-Client");
00038 
00039   static void set_UNICORE_namespaces(NS& ns) {
00040     ns["bes-factory"] = "http://schemas.ggf.org/bes/2006/08/bes-factory";
00041     ns["wsa"] = "http://www.w3.org/2005/08/addressing";
00042     ns["jsdl"] = "http://schemas.ggf.org/jsdl/2005/11/jsdl";
00043     ns["jsdl-posix"] = "http://schemas.ggf.org/jsdl/2005/11/jsdl-posix";
00044     ns["jsdl-hpcpa"] = "http://schemas.ggf.org/jsdl/2006/07/jsdl-hpcpa";
00045     ns["ns0"] = "urn:oasis:names:tc:SAML:2.0:assertion";
00046     ns["rp"] = "http://docs.oasis-open.org/wsrf/rp-2";
00047     ns["u6"] = "http://www.unicore.eu/unicore6";
00048     ns["jms"] = "http://unigrids.org/2006/04/services/jms";
00049   }
00050 
00051   static void set_bes_factory_action(SOAPEnvelope& soap, const char *op) {
00052     WSAHeader(soap).Action(BES_FACTORY_ACTIONS_BASE_URL + op);
00053   }
00054 
00055   // static void set_bes_management_action(SOAPEnvelope& soap,const char* op) {
00056   //   WSAHeader(soap).Action(BES_MANAGEMENT_ACTIONS_BASE_URL+op);
00057   // }
00058 
00059   UNICOREClient::UNICOREClient(const URL& url,
00060                                const MCCConfig& cfg,
00061                                int timeout)
00062     : client_config(cfg),
00063       client_loader(NULL),
00064       client(NULL),
00065       client_entry(NULL) {
00066 
00067     logger.msg(INFO, "Creating a UNICORE client");
00068     MCCConfig client_cfg(cfg);
00069     proxyPath = cfg.proxy;
00070     if (false) { //future test if proxy should be used or not
00071       client_cfg.AddProxy("");
00072     }
00073     client = new ClientSOAP(client_cfg, url, timeout);
00074     rurl = url;
00075     set_UNICORE_namespaces(unicore_ns);
00076   }
00077 
00078   UNICOREClient::~UNICOREClient() {
00079     if (client_loader)
00080       delete client_loader;
00081     if (client)
00082       delete client;
00083   }
00084 
00085   bool UNICOREClient::submit(const JobDescription& jobdesc, XMLNode& id,
00086                              bool delegate) {
00087 
00088     std::string faultstring;
00089 
00090     logger.msg(INFO, "Creating and sending request");
00091 
00092     // Create job request
00093     /*
00094        bes-factory:CreateActivity
00095          bes-factory:ActivityDocument
00096            jsdl:JobDefinition
00097      */
00098     PayloadSOAP req(unicore_ns);
00099     XMLNode op = req.NewChild("bes-factory:CreateActivity");
00100     XMLNode act_doc = op.NewChild("bes-factory:ActivityDocument");
00101     set_bes_factory_action(req, "CreateActivity");
00102     WSAHeader(req).To(rurl.str());
00103     //XMLNode proxyHeader = req.Header().NewChild("u6:Proxy");
00104     if (true) {
00105       std::string pem_str;
00106       std::ifstream proxy_file(proxyPath.c_str()/*, ifstream::in*/);
00107       std::getline<char>(proxy_file, pem_str, 0);
00108       req.Header().NewChild("u6:Proxy") = pem_str;
00109       //std::cout << "\n----\n" << "pem_str = " << pem_str << "\n----\n"; //debug code, remove!
00110     }
00111     //std::string jsdl_str;
00112     //std::getline<char>(jsdl_file, jsdl_str, 0);
00113     std::string jsdl_str = jobdesc.UnParse("ARCJSDL");
00114     XMLNode jsdl_doc = act_doc.NewChild(XMLNode(jsdl_str));
00115     //std::cout << "\n----\n" << jsdl_str << "\n----\n"; //Debug line to verify the activity document
00116     jsdl_doc.Namespaces(unicore_ns); // Unify namespaces
00117     PayloadSOAP *resp = NULL;
00118 
00119     XMLNode ds =
00120       act_doc["jsdl:JobDefinition"]["jsdl:JobDescription"]["jsdl:DataStaging"];
00121     for (; (bool)ds; ds = ds[1]) {
00122       // FilesystemName - ignore
00123       // CreationFlag - ignore
00124       // DeleteOnTermination - ignore
00125       XMLNode source = ds["jsdl:Source"];
00126       XMLNode target = ds["jsdl:Target"];
00127       if ((bool)source) {
00128         std::string s_name = ds["jsdl:FileName"];
00129         if (!s_name.empty()) {
00130           XMLNode x_url = source["jsdl:URI"];
00131           std::string s_url = x_url;
00132           if (s_url.empty())
00133             s_url = "./" + s_name;
00134           else {
00135             URL u_url(s_url);
00136             if (!u_url) {
00137               if (s_url[0] != '/')
00138                 s_url = "./" + s_url;
00139             }
00140             else {
00141               if (u_url.Protocol() == "file") {
00142                 s_url = u_url.Path();
00143                 if (s_url[0] != '/')
00144                   s_url = "./" + s_url;
00145               }
00146               else
00147                 s_url.resize(0);
00148             }
00149           }
00150           if (!s_url.empty())
00151             x_url.Destroy();
00152         }
00153       }
00154     }
00155     act_doc.GetXML(jsdl_str);
00156     logger.msg(DEBUG, "Job description to be sent: %s", jsdl_str);
00157 
00158     // Try to figure out which credentials are used
00159     // TODO: Method used is unstable beacuse it assumes some predefined
00160     // structure of configuration file. Maybe there should be some
00161     // special methods of ClientTCP class introduced.
00162     std::string deleg_cert;
00163     std::string deleg_key;
00164     if (delegate) {
00165       client->Load(); // Make sure chain is ready
00166       XMLNode tls_cfg = find_xml_node((client->GetConfig())["Chain"],
00167                                       "Component", "name", "tls.client");
00168       if (tls_cfg) {
00169         deleg_cert = (std::string)(tls_cfg["ProxyPath"]);
00170         if (deleg_cert.empty()) {
00171           deleg_cert = (std::string)(tls_cfg["CertificatePath"]);
00172           deleg_key = (std::string)(tls_cfg["KeyPath"]);
00173         }
00174         else
00175           deleg_key = deleg_cert;
00176       }
00177       if (deleg_cert.empty() || deleg_key.empty()) {
00178         logger.msg(ERROR, "Failed to find delegation credentials in "
00179                    "client configuration");
00180         return false;
00181       }
00182     }
00183     // Send job request + delegation
00184     if (client) {
00185       if (delegate) {
00186         DelegationProviderSOAP deleg(deleg_cert, deleg_key);
00187         logger.msg(INFO, "Initiating delegation procedure");
00188         if (!deleg.DelegateCredentialsInit(*(client->GetEntry()),
00189                                            &(client->GetContext()))) {
00190           logger.msg(ERROR, "Failed to initiate delegation");
00191           return false;
00192         }
00193         deleg.DelegatedToken(op);
00194       }
00195       MCC_Status status =
00196         client->process("http://schemas.ggf.org/bes/2006/08/bes-factory/"
00197                         "BESFactoryPortType/CreateActivity", &req, &resp);
00198       if (!status) {
00199         logger.msg(ERROR, "Submission request failed");
00200         return false;
00201       }
00202       if (resp == NULL) {
00203         logger.msg(ERROR, "There was no SOAP response");
00204         return false;
00205       }
00206     }
00207     else if (client_entry) {
00208       Message reqmsg;
00209       Message repmsg;
00210       MessageAttributes attributes_req;
00211       attributes_req.set("SOAP:ACTION", "http://schemas.ggf.org/bes/2006/08/"
00212                          "bes-factory/BESFactoryPortType/CreateActivity");
00213       MessageAttributes attributes_rep;
00214       MessageContext context;
00215 
00216       if (delegate) {
00217         DelegationProviderSOAP deleg(deleg_cert, deleg_key);
00218         logger.msg(INFO, "Initiating delegation procedure");
00219         if (!deleg.DelegateCredentialsInit(*client_entry, &context)) {
00220           logger.msg(ERROR, "Failed to initiate delegation");
00221           return false;
00222         }
00223         deleg.DelegatedToken(op);
00224       }
00225       reqmsg.Payload(&req);
00226       reqmsg.Attributes(&attributes_req);
00227       reqmsg.Context(&context);
00228       repmsg.Attributes(&attributes_rep);
00229       repmsg.Context(&context);
00230       MCC_Status status = client_entry->process(reqmsg, repmsg);
00231       if (!status) {
00232         logger.msg(ERROR, "Submission request failed");
00233         return false;
00234       }
00235       logger.msg(INFO, "Submission request succeed");
00236       if (repmsg.Payload() == NULL) {
00237         logger.msg(ERROR, "There was no response to a submission request");
00238         return false;
00239       }
00240       try {
00241         resp = dynamic_cast<PayloadSOAP*>(repmsg.Payload());
00242       } catch (std::exception&) {}
00243       if (resp == NULL) {
00244         logger.msg(ERROR, "A response to a submission request was not "
00245                    "a SOAP message");
00246         delete repmsg.Payload();
00247         return false;
00248       }
00249     }
00250     else {
00251       logger.msg(ERROR, "There is no connection chain configured");
00252       return false;
00253     }
00254     //XMLNode id;
00255     SOAPFault fs(*resp);
00256     if (!fs) {
00257       (*resp)["CreateActivityResponse"]["ActivityIdentifier"].New(id);
00258       //id.GetDoc(jobid);
00259       //std::cout << "\n---\nActivityIdentifier:\n" << (std::string)((*resp)["CreateActivityResponse"]["ActivityIdentifier"]) << "\n---\n";//debug code
00260       delete resp;
00261 
00262       UNICOREClient luc((std::string)id["Address"], client_config); //local unicore client
00263       //std::cout << "\n---\nid element containing (?) Job Address:\n" << (std::string)id << "\n---\n";//debug code
00264       return luc.uasStartJob();
00265       //return true;
00266     }
00267     else {
00268       faultstring = fs.Reason();
00269       std::string s;
00270       resp->GetXML(s);
00271       delete resp;
00272       logger.msg(DEBUG, "Submission returned failure: %s", s);
00273       logger.msg(ERROR, "Submission failed, service returned: %s", faultstring);
00274       return false;
00275     }
00276   }
00277 
00278   bool UNICOREClient::uasStartJob(){
00279         std::string state, faultstring;
00280     logger.msg(INFO, "Creating and sending a start job request");
00281 
00282     PayloadSOAP req(unicore_ns);
00283     XMLNode SOAPMethod =
00284       req.NewChild("jms:Start");
00285     WSAHeader(req).To(rurl.str());
00286     WSAHeader(req).Action("http://schemas.ggf.org/bes/2006/08/bes-activity/BESActivityPortType/StartRequest");
00287 
00288     // Send status request
00289     PayloadSOAP *resp = NULL;
00290     if (client) {
00291       MCC_Status status =
00292         client->process("http://schemas.ggf.org/bes/2006/08/bes-activity/BESActivityPortType/StartRequest",
00293                         &req, &resp);
00294       if (resp == NULL) {
00295         logger.msg(ERROR, "There was no SOAP response");
00296         return false;
00297       }
00298     }
00299     else if (client_entry) {
00300       Message reqmsg;
00301       Message repmsg;
00302       MessageAttributes attributes_req;
00303       attributes_req.set("SOAP:ACTION", "http://schemas.ggf.org/bes/2006/08/bes-activity/BESActivityPortType/StartRequest");
00304       MessageAttributes attributes_rep;
00305       MessageContext context;
00306       reqmsg.Payload(&req);
00307       reqmsg.Attributes(&attributes_req);
00308       reqmsg.Context(&context);
00309       repmsg.Attributes(&attributes_rep);
00310       repmsg.Context(&context);
00311       MCC_Status status = client_entry->process(reqmsg, repmsg);
00312       if (!status) {
00313         logger.msg(ERROR, "A start job request failed");
00314         return false;
00315       }
00316       logger.msg(INFO, "A start job request succeeded");
00317       if (repmsg.Payload() == NULL) {
00318         logger.msg(ERROR,
00319                    "There was no response to a start job request");
00320         return false;
00321       }
00322       try {
00323         resp = dynamic_cast<PayloadSOAP*>(repmsg.Payload());
00324       } catch (std::exception&) {}
00325       if (resp == NULL) {
00326         logger.msg(ERROR, "The response of a start job request was "
00327                    "not a SOAP message");
00328         delete repmsg.Payload();
00329         return false;
00330       }
00331     }
00332     else {
00333       logger.msg(ERROR, "There is no connection chain configured");
00334       return false;
00335     }
00336     SOAPFault fs(*resp);
00337     if (!fs) {
00338       return true;
00339     }
00340     else {
00341       faultstring = fs.Reason();
00342       std::string s;
00343       resp->GetXML(s);
00344       delete resp;
00345       logger.msg(DEBUG, "Submission returned failure: %s", s);
00346       logger.msg(ERROR, "Submission failed, service returned: %s", faultstring);
00347       return false;
00348     }
00349 
00350 
00351   }
00352 
00353   bool UNICOREClient::stat(const std::string& jobid, std::string& status) {
00354 
00355     std::string state, substate, faultstring;
00356     logger.msg(INFO, "Creating and sending a status request");
00357 
00358     PayloadSOAP req(unicore_ns);
00359     XMLNode jobref =
00360       req.NewChild("bes-factory:GetActivityStatuses").
00361       NewChild(XMLNode(jobid));
00362     set_bes_factory_action(req, "GetActivityStatuses");
00363     WSAHeader(req).To(rurl.str());
00364 
00365     // Send status request
00366     PayloadSOAP *resp = NULL;
00367 
00368     if (client) {
00369       MCC_Status status =
00370         client->process("http://schemas.ggf.org/bes/2006/08/bes-factory/"
00371                         "BESFactoryPortType/GetActivityStatuses", &req, &resp);
00372       if (resp == NULL) {
00373         logger.msg(ERROR, "There was no SOAP response");
00374         return false;
00375       }
00376     }
00377     else if (client_entry) {
00378       Message reqmsg;
00379       Message repmsg;
00380       MessageAttributes attributes_req;
00381       attributes_req.set("SOAP:ACTION", "http://schemas.ggf.org/bes/2006/08/"
00382                          "bes-factory/BESFactoryPortType/GetActivityStatuses");
00383       MessageAttributes attributes_rep;
00384       MessageContext context;
00385       reqmsg.Payload(&req);
00386       reqmsg.Attributes(&attributes_req);
00387       reqmsg.Context(&context);
00388       repmsg.Attributes(&attributes_rep);
00389       repmsg.Context(&context);
00390       MCC_Status status = client_entry->process(reqmsg, repmsg);
00391       if (!status) {
00392         logger.msg(ERROR, "A status request failed");
00393         return false;
00394       }
00395       logger.msg(INFO, "A status request succeed");
00396       if (repmsg.Payload() == NULL) {
00397         logger.msg(ERROR, "There was no response to a status request");
00398         return false;
00399       }
00400       try {
00401         resp = dynamic_cast<PayloadSOAP*>(repmsg.Payload());
00402       } catch (std::exception&) {}
00403       if (resp == NULL) {
00404         logger.msg(ERROR,
00405                    "The response of a status request was not a SOAP message");
00406         delete repmsg.Payload();
00407         return false;
00408       }
00409     }
00410     else {
00411       logger.msg(ERROR, "There is no connection chain configured");
00412       return false;
00413     }
00414     XMLNode st, fs;
00415     (*resp)["GetActivityStatusesResponse"]["Response"]
00416     ["ActivityStatus"].New(st);
00417     state = (std::string)st.Attribute("state");
00418     XMLNode sst;
00419     (*resp)["GetActivityStatusesResponse"]["Response"]
00420     ["ActivityStatus"]["state"].New(sst);
00421     substate = (std::string)sst;
00422     (*resp)["Fault"]["faultstring"].New(fs);
00423     faultstring = (std::string)fs;
00424     delete resp;
00425     if (faultstring != "") {
00426       logger.msg(ERROR, faultstring);
00427       return false;
00428     }
00429     else if (state == "") {
00430       logger.msg(ERROR, "The job status could not be retrieved");
00431       return false;
00432     }
00433     else {
00434       status = state + "/" + substate;
00435       return true;
00436     }
00437   }
00438 
00439   bool UNICOREClient::listTargetSystemFactories(std::list< std::pair<URL, ServiceType> >& tsf) {
00440 
00441     logger.msg(INFO, "Creating and sending an index service query");
00442     PayloadSOAP req(unicore_ns);
00443     XMLNode query = req.NewChild("rp:QueryResourceProperties");
00444     XMLNode exp = query.NewChild("rp:QueryExpression");
00445     exp.NewAttribute("Dialect") = "http://www.w3.org/TR/1999/REC-xpath-19991116";
00446     exp = "//*";
00447     PayloadSOAP *resp = NULL;
00448     client->process("http://docs.oasis-open.org/wsrf/rpw-2"
00449                     "/QueryResourceProperties/QueryResourcePropertiesRequest",
00450                     &req, &resp);
00451     if (resp == NULL) {
00452       logger.msg(ERROR, "There was no SOAP response");
00453       return false;
00454     }
00455 
00456     XMLNodeList memberServices = resp->Body().Path("QueryResourcePropertiesResponse/Entry/MemberServiceEPR");
00457     for (XMLNodeList::iterator it = memberServices.begin(); it != memberServices.end(); it++) {
00458       if (((std::string)(*it)["Metadata"]["InterfaceName"]).find("BESFactoryPortType") != std::string::npos) { // it.Metadata.InterfaceName should contain 'BESFactoryPortType'...
00459         tsf.push_back(std::pair<URL, ServiceType>(URL((std::string)(*it)["Address"]), COMPUTING));
00460       }
00461     }
00462 
00463     return true;
00464   }
00465 
00466   bool UNICOREClient::sstat(std::string& status) {
00467 
00468     std::string state, faultstring;
00469     logger.msg(INFO, "Creating and sending a service status request");
00470 
00471     PayloadSOAP req(unicore_ns);
00472     XMLNode jobref =
00473       req.NewChild("bes-factory:GetFactoryAttributesDocument");
00474     set_bes_factory_action(req, "GetFactoryAttributesDocument");
00475     WSAHeader(req).To(rurl.str());
00476 
00477     // Send status request
00478     PayloadSOAP *resp = NULL;
00479     if (client) {
00480       MCC_Status status =
00481         client->process("http://schemas.ggf.org/bes/2006/08/bes-factory/"
00482                         "BESFactoryPortType/GetFactoryAttributesDocument",
00483                         &req, &resp);
00484       if (resp == NULL) {
00485         logger.msg(ERROR, "There was no SOAP response");
00486         return false;
00487       }
00488     }
00489     else if (client_entry) {
00490       Message reqmsg;
00491       Message repmsg;
00492       MessageAttributes attributes_req;
00493       attributes_req.set("SOAP:ACTION", "http://schemas.ggf.org/bes/2006/08/"
00494                          "bes-factory/BESFactoryPortType/"
00495                          "GetFactoryAttributesDocument");
00496       MessageAttributes attributes_rep;
00497       MessageContext context;
00498       reqmsg.Payload(&req);
00499       reqmsg.Attributes(&attributes_req);
00500       reqmsg.Context(&context);
00501       repmsg.Attributes(&attributes_rep);
00502       repmsg.Context(&context);
00503       MCC_Status status = client_entry->process(reqmsg, repmsg);
00504       if (!status) {
00505         logger.msg(ERROR, "A service status request failed");
00506         return false;
00507       }
00508       logger.msg(INFO, "A service status request succeeded");
00509       if (repmsg.Payload() == NULL) {
00510         logger.msg(ERROR,
00511                    "There was no response to a service status request");
00512         return false;
00513       }
00514       try {
00515         resp = dynamic_cast<PayloadSOAP*>(repmsg.Payload());
00516       } catch (std::exception&) {}
00517       if (resp == NULL) {
00518         logger.msg(ERROR, "The response of a service status request was "
00519                    "not a SOAP message");
00520         delete repmsg.Payload();
00521         return false;
00522       }
00523     }
00524     else {
00525       logger.msg(ERROR, "There is no connection chain configured");
00526       return false;
00527     }
00528     XMLNode st;
00529     logger.msg(DEBUG, "Response:\n%s", (std::string)(*resp));
00530     (*resp)["GetFactoryAttributesDocumentResponse"]
00531     ["FactoryResourceAttributesDocument"].New(st);
00532     st.GetDoc(state, true);
00533     delete resp;
00534     if (state == "") {
00535       logger.msg(ERROR, "The service status could not be retrieved");
00536       return false;
00537     }
00538     else {
00539       status = state;
00540       return true;
00541     }
00542   }
00543 
00544   bool UNICOREClient::kill(const std::string& jobid) {
00545 
00546     std::string result, faultstring;
00547     logger.msg(INFO, "Creating and sending request to terminate a job");
00548 
00549     PayloadSOAP req(unicore_ns);
00550     XMLNode jobref =
00551       req.NewChild("bes-factory:TerminateActivities").
00552       NewChild(XMLNode(jobid));
00553     set_bes_factory_action(req, "TerminateActivities");
00554     WSAHeader(req).To(rurl.str());
00555 
00556     // Send kill request
00557     PayloadSOAP *resp = NULL;
00558     if (client) {
00559       MCC_Status status =
00560         client->process("http://schemas.ggf.org/bes/2006/08/bes-factory/"
00561                         "BESFactoryPortType/TerminateActivities", &req, &resp);
00562       if (resp == NULL) {
00563         logger.msg(ERROR, "There was no SOAP response");
00564         return false;
00565       }
00566     }
00567     else if (client_entry) {
00568       Message reqmsg;
00569       Message repmsg;
00570       MessageAttributes attributes_req;
00571       attributes_req.set("SOAP:ACTION", "http://schemas.ggf.org/bes/2006/08/"
00572                          "bes-factory/BESFactoryPortType/TerminateActivities");
00573       MessageAttributes attributes_rep;
00574       MessageContext context;
00575       reqmsg.Payload(&req);
00576       reqmsg.Attributes(&attributes_req);
00577       reqmsg.Context(&context);
00578       repmsg.Attributes(&attributes_rep);
00579       repmsg.Context(&context);
00580       MCC_Status status = client_entry->process(reqmsg, repmsg);
00581       if (!status) {
00582         logger.msg(ERROR, "A job termination request failed");
00583         return false;
00584       }
00585       logger.msg(INFO, "A job termination request succeed");
00586       if (repmsg.Payload() == NULL) {
00587         logger.msg(ERROR,
00588                    "There was no response to a job termination request");
00589         return false;
00590       }
00591       try {
00592         resp = dynamic_cast<PayloadSOAP*>(repmsg.Payload());
00593       } catch (std::exception&) {}
00594       if (resp == NULL) {
00595         logger.msg(ERROR, "The response of a job termination request was "
00596                    "not a SOAP message");
00597         delete repmsg.Payload();
00598         return false;
00599       }
00600     }
00601     else {
00602       logger.msg(ERROR, "There is no connection chain configured");
00603       return false;
00604     }
00605 
00606     XMLNode cancelled, fs;
00607     (*resp)["TerminateActivitiesResponse"]
00608     ["Response"]["Cancelled"].New(cancelled);
00609     result = (std::string)cancelled;
00610     (*resp)["Fault"]["faultstring"].New(fs);
00611     faultstring = (std::string)fs;
00612     delete resp;
00613     if (faultstring != "") {
00614       logger.msg(ERROR, faultstring);
00615       return false;
00616     }
00617     if (result != "true") {
00618       logger.msg(ERROR, "Job termination failed");
00619       return false;
00620     }
00621     return true;
00622   }
00623 
00624   bool UNICOREClient::clean(const std::string& jobid) {
00625 
00626     std::string result, faultstring;
00627     logger.msg(INFO, "Creating and sending request to terminate a job");
00628 
00629     PayloadSOAP req(unicore_ns);
00630     XMLNode op = req.NewChild("a-rex:ChangeActivityStatus");
00631     XMLNode jobref = op.NewChild(XMLNode(jobid));
00632     XMLNode jobstate = op.NewChild("a-rex:NewStatus");
00633     jobstate.NewAttribute("bes-factory:state") = "Finished";
00634     jobstate.NewChild("a-rex:state") = "Deleted";
00635     // Send clean request
00636     PayloadSOAP *resp = NULL;
00637     if (client) {
00638       MCC_Status status = client->process("", &req, &resp);
00639       if (resp == NULL) {
00640         logger.msg(ERROR, "There was no SOAP response");
00641         return false;
00642       }
00643     }
00644     else if (client_entry) {
00645       Message reqmsg;
00646       Message repmsg;
00647       MessageAttributes attributes_req;
00648       MessageAttributes attributes_rep;
00649       MessageContext context;
00650       reqmsg.Payload(&req);
00651       reqmsg.Attributes(&attributes_req);
00652       reqmsg.Context(&context);
00653       repmsg.Attributes(&attributes_rep);
00654       repmsg.Context(&context);
00655       MCC_Status status = client_entry->process(reqmsg, repmsg);
00656       if (!status) {
00657         logger.msg(ERROR, "A job cleaning request failed");
00658         return false;
00659       }
00660       logger.msg(INFO, "A job cleaning request succeed");
00661       if (repmsg.Payload() == NULL) {
00662         logger.msg(ERROR,
00663                    "There was no response to a job cleaning request");
00664         return false;
00665       }
00666       try {
00667         resp = dynamic_cast<PayloadSOAP*>(repmsg.Payload());
00668       } catch (std::exception&) {}
00669       if (resp == NULL) {
00670         logger.msg(ERROR, "The response of a job cleaning request was not "
00671                    "a SOAP message");
00672         delete repmsg.Payload();
00673         return false;
00674       }
00675     }
00676     else {
00677       logger.msg(ERROR, "There is no connection chain configured");
00678       return false;
00679     }
00680 
00681     if (!((*resp)["ChangeActivityStatusResponse"])) {
00682       delete resp;
00683       XMLNode fs;
00684       (*resp)["Fault"]["faultstring"].New(fs);
00685       faultstring = (std::string)fs;
00686       if (faultstring != "") {
00687         logger.msg(ERROR, faultstring);
00688         return false;
00689       }
00690       if (result != "true") {
00691         logger.msg(ERROR, "Job termination failed");
00692         return false;
00693       }
00694     }
00695     delete resp;
00696     return true;
00697   }
00698 
00699 }