Back to index

nordugrid-arc-nox  1.1.0~rc6
CREAMClient.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <arc/DateTime.h>
00008 #include <arc/Logger.h>
00009 #include <arc/StringConv.h>
00010 #include <arc/URL.h>
00011 #include <arc/client/ClientInterface.h>
00012 #include <arc/client/JobDescription.h>
00013 #include <arc/credential/Credential.h>
00014 #include <arc/message/MCC.h>
00015 
00016 #include "JobStateCREAM.h"
00017 #include "CREAMClient.h"
00018 
#ifdef ISVALID
#undef ISVALID
#endif
// ISVALID(NODE) is true when NODE converts to true and its string value is
// neither "N/A" nor "[reserved]".
// The whole expansion and every use of the argument are parenthesized so the
// macro behaves as a single operand (e.g. under '!' or next to '||').
// NOTE: NODE is evaluated up to three times; pass side-effect-free expressions.
#define ISVALID(NODE) ((NODE) && (std::string)(NODE) != "N/A" && (std::string)(NODE) != "[reserved]")
00023 
00024 namespace Arc {
00025   Logger CREAMClient::logger(Logger::rootLogger, "CREAMClient"); // definition of the class-wide logger; constructed from Logger::rootLogger with the name "CREAMClient"
00026 
00027   bool stringtoTime(const std::string& timestring, Time& time) {
00028     if (timestring == "" || timestring.length() < 15)
00029       return "";
00030 
00031     //The conversion for example:
00032     //before: 11/5/08 11:52 PM
00033     //after:  2008-11-05T23:52:00.000Z
00034 
00035     tm timestr;
00036     std::string::size_type pos = 0;
00037     if (sscanf(timestring.substr(pos, 8).c_str(),
00038                "%2d/%2d/%2d",
00039                &timestr.tm_mon,
00040                &timestr.tm_mday,
00041                &timestr.tm_year) == 3)
00042       pos += 8;
00043     else if (sscanf(timestring.substr(pos, 7).c_str(),
00044                     "%2d/%d/%2d",
00045                     &timestr.tm_mon,
00046                     &timestr.tm_mday,
00047                     &timestr.tm_year) == 3)
00048       pos += 7;
00049     else if (sscanf(timestring.substr(pos, 7).c_str(),
00050                     "%d/%2d/%2d",
00051                     &timestr.tm_mon,
00052                     &timestr.tm_mday,
00053                     &timestr.tm_year) == 3)
00054       pos += 7;
00055     else if (sscanf(timestring.substr(pos, 6).c_str(),
00056                     "%d/%d/%2d",
00057                     &timestr.tm_mon,
00058                     &timestr.tm_mday,
00059                     &timestr.tm_year) == 3)
00060       pos += 6;
00061     else
00062       return false;
00063 
00064     timestr.tm_year += 100;
00065     timestr.tm_mon--;
00066 
00067     if (timestring[pos] == 'T' || timestring[pos] == ' ')
00068       pos++;
00069 
00070     if (sscanf(timestring.substr(pos, 5).c_str(),
00071                "%2d:%2d",
00072                &timestr.tm_hour,
00073                &timestr.tm_min) == 2)
00074       pos += 5;
00075     else
00076       return false;
00077 
00078     // skip the space characters
00079     while (timestring[pos] == ' ')
00080       pos++;
00081 
00082     if (timestring.substr(pos, pos + 2) == "PM")
00083       timestr.tm_hour += 12;
00084 
00085     time.SetTime(mktime(&timestr));
00086     return true;
00087   }
00088 
00089   static void set_cream_namespaces(NS& ns) {
00090     ns["deleg"] = "http://www.gridsite.org/namespaces/delegation-2";
00091     ns["types"] = "http://glite.org/2007/11/ce/cream/types";
00092   }
00093 
00094   CREAMClient::CREAMClient(const URL& url, const MCCConfig& cfg, int timeout)
00095     : client(NULL),
00096       cafile(cfg.cafile),
00097       cadir(cfg.cadir) {
00098     logger.msg(INFO, "Creating a CREAM client");
00099     client = new ClientSOAP(cfg, url, timeout);
00100     if (!client)
00101       logger.msg(VERBOSE, "Unable to create SOAP client used by CREAMClient.");
00102     set_cream_namespaces(cream_ns);
00103   }
00104 
00105   CREAMClient::~CREAMClient() {
00106     if (client)
00107       delete client;
00108   }
00109 
00110   bool CREAMClient::process(PayloadSOAP& req, XMLNode& response) {
00111     if (!client) {
00112       logger.msg(VERBOSE, "CREAMClient not created properly");
00113       return false;
00114     }
00115 
00116     PayloadSOAP *resp = NULL;
00117     if (!client->process("http://glite.org/2007/11/ce/cream/" + action, &req, &resp)) {
00118       logger.msg(VERBOSE, "%s request failed", action);
00119       return false;
00120     }
00121 
00122     if (resp == NULL) {
00123       logger.msg(VERBOSE, "There was no SOAP response");
00124       return false;
00125     }
00126 
00127     if ((*resp)[action + "Response"]["result"])
00128       (*resp)[action + "Response"]["result"].New(response);
00129     else
00130       (*resp)[action + "Response"].New(response);
00131 
00132     delete resp;
00133 
00134     if (!response) {
00135       logger.msg(VERBOSE, "Empty response");
00136       return false;
00137     }
00138 
00139     XMLNode fault;
00140     if (response["JobUnknownFault"])
00141       fault = response["JobUnknownFault"];
00142     if (response["JobStatusInvalidFault"])
00143       fault = response["JobStatusInvalidFault"];
00144     if (response["DelegationIdMismatchFault"])
00145       fault = response["DelegationIdMismatchFault"];
00146     if (response["DateMismatchFault"])
00147       fault = response["DateMismatchFault"];
00148     if (response["LeaseIdMismatchFault"])
00149       fault = response["LeaseIdMismatchFault"];
00150     if (response["GenericFault"])
00151       response["GenericFault"];
00152 
00153     if (fault) {
00154       logger.msg(VERBOSE, "Request failed: %s", (std::string)(fault["Description"]));
00155       return false;
00156     }
00157 
00158     return true;
00159   }
00160 
00161   bool CREAMClient::stat(const std::string& jobid, Job& job) {
00162     logger.msg(VERBOSE, "Creating and sending a status request");
00163 
00164     action = "JobInfo";
00165 
00166     PayloadSOAP req(cream_ns);
00167     XMLNode jobStatusRequest = req.NewChild("types:" + action). // Should not be concatenated with "Request", as in the other requests.
00168                                    NewChild("types:jobId").
00169                                    NewChild("types:id") = jobid;
00170     if (!delegationId.empty())
00171       jobStatusRequest.NewChild("types:delegationProxyId") = delegationId;
00172 
00173     XMLNode response;
00174     if (!process(req, response))
00175       return false;
00176 
00177     XMLNode jobInfoNode;
00178     jobInfoNode = response["jobInfo"];
00179 
00180     XMLNode lastStatusNode = jobInfoNode.Path("status").back();
00181 
00182     if (lastStatusNode["name"])
00183       job.State = JobStateCREAM((std::string)lastStatusNode["name"]);
00184 
00185     if (!job.State) {
00186       logger.msg(VERBOSE, "Unable to retrieved job status.");
00187       return false;
00188     }
00189 
00190     if (ISVALID(jobInfoNode["jobId"]["id"]) && ISVALID(jobInfoNode["jobId"]["creamURL"]))
00191       job.JobID = URL((std::string)jobInfoNode["jobId"]["creamURL"] + "/" + (std::string)jobInfoNode["jobId"]["id"]);
00192     if (ISVALID(jobInfoNode["jobId"]["creamURL"])) {
00193       job.IDFromEndpoint = URL((std::string)jobInfoNode["jobId"]["creamURL"]);
00194       job.ExecutionCE = job.IDFromEndpoint.fullstr();
00195       job.JobManagementEndpoint = job.IDFromEndpoint.fullstr();
00196     }
00197     if (ISVALID(jobInfoNode["jobId"]["id"]))
00198       job.LocalIDFromManager = (std::string)jobInfoNode["jobId"]["id"];
00199     if (ISVALID(jobInfoNode["type"]))
00200       job.Type = (std::string)jobInfoNode["type"];
00201     if (ISVALID(jobInfoNode["JDL"])) {
00202       job.JobDescription = (std::string)jobInfoNode["JDL"];
00203 
00204       JobDescription jd;
00205       if (jd.Parse(job.JobDescription) && jd) {
00206         if (!jd.Application.Input.empty())
00207           job.StdIn = jd.Application.Input;
00208 
00209         if (!jd.Application.Output.empty())
00210           job.StdOut = jd.Application.Output;
00211 
00212         if (!jd.Application.Error.empty())
00213           job.StdErr = jd.Application.Error;
00214 
00215         if (!jd.Resources.CandidateTarget.empty())
00216           job.Queue = jd.Resources.CandidateTarget.front().QueueName;
00217       }
00218     }
00219     if (ISVALID(lastStatusNode["exitCode"]))
00220       job.ExitCode = stringtoi((std::string)lastStatusNode["exitCode"]);
00221     if (job.State() == "DONE-FAILED")
00222       job.Error.push_back((std::string)lastStatusNode["failureReason"]);
00223     if (ISVALID(jobInfoNode["delegationProxyInfo"])) {
00224       std::string delegationProxy = (std::string)jobInfoNode["delegationProxyInfo"];
00225       std::list<std::string> splited_proxy;
00226       tokenize(delegationProxy, splited_proxy, "\n");
00227       for (std::list<std::string>::iterator it = splited_proxy.begin();
00228            it != splited_proxy.end(); it++) {
00229         if (it->find("Holder Subject") < it->find(":"))
00230           job.Owner = it->substr(it->find_first_of(":") + 1);
00231         if (it->find("Valid To") < it->find(":")) {
00232           Time time;
00233           if (stringtoTime(it->substr(it->find_first_of(":") + 2), time) && time.GetTime() != -1)
00234             job.ProxyExpirationTime = time;
00235         }
00236       }
00237     }
00238     if (ISVALID(jobInfoNode["localUser"]))
00239       job.LocalOwner = (std::string)jobInfoNode["localUser"];
00240     if (ISVALID(jobInfoNode["lastCommand"])) {
00241       int job_register_id_first = -1;
00242       int job_register_id_last = -1;
00243       int job_start_id_first = -1;
00244       int job_start_id_last = -1;
00245       int local_id = 0;
00246       while (true) {
00247         if (!jobInfoNode["lastCommand"][local_id])
00248           break;
00249         if ((std::string)jobInfoNode["lastCommand"][local_id]["name"] == "JOB_REGISTER") {
00250           if (job_register_id_first == -1 && job_register_id_last == -1) {
00251             job_register_id_first = local_id;
00252             job_register_id_last = local_id;
00253           }
00254           else if (job_register_id_last > -1)
00255             job_register_id_last = local_id;
00256         }  //end of the JOB_REGISTER
00257 
00258         if ((std::string)jobInfoNode["lastCommand"][local_id]["name"] == "JOB_START") {
00259           if (job_start_id_first == -1 && job_start_id_last == -1) {
00260             job_start_id_first = local_id;
00261             job_start_id_last = local_id;
00262           }
00263           else if (job_start_id_last > -1)
00264             job_start_id_last = local_id;
00265         }  //end of the JOB_START
00266         local_id++;
00267       }
00268 
00269       //dependent on JOB_REGISTER
00270       if (job_register_id_first > -1)
00271         if (ISVALID(jobInfoNode["lastCommand"][job_register_id_first]["creationTime"])) {
00272           Time time((std::string)jobInfoNode["lastCommand"][job_register_id_first]["creationTime"]);
00273           if (time.GetTime() != -1)
00274             job.SubmissionTime = time;
00275         }
00276 
00277       if (job_register_id_last > -1)
00278         if (ISVALID(jobInfoNode["lastCommand"][job_register_id_last]["creationTime"])) {
00279           Time time((std::string)jobInfoNode["lastCommand"][job_register_id_last]["creationTime"]);
00280           if (time.GetTime() != -1)
00281             job.CreationTime = time;
00282         }
00283       //end of the JOB_REGISTER
00284 
00285       //dependent on JOB_START
00286       if (job_start_id_first > -1) {
00287         if (ISVALID(jobInfoNode["lastCommand"][job_start_id_first]["startSchedulingTime"])) {
00288           Time time((std::string)jobInfoNode["lastCommand"][job_start_id_first]["startSchedulingTime"]);
00289           if (time.GetTime() != -1)
00290             job.ComputingManagerSubmissionTime = time;
00291         }
00292 
00293         if (ISVALID(jobInfoNode["lastCommand"][job_start_id_first]["startProcessingTime"])) {
00294           Time time((std::string)jobInfoNode["lastCommand"][job_start_id_first]["startProcessingTime"]);
00295           if (time.GetTime() != -1)
00296             job.StartTime = time;
00297         }
00298       }
00299 
00300       if (job_start_id_last > -1)
00301         if (ISVALID(jobInfoNode["lastCommand"][job_start_id_last]["executionCompletedTime"])) {
00302           Time time((std::string)jobInfoNode["lastCommand"][job_start_id_last]["executionCompletedTime"]);
00303           if (time.GetTime() != -1)
00304             job.ComputingManagerEndTime = time;
00305         }
00306       //end of the JOB_START
00307     } //end of the LastCommand
00308     if (ISVALID(lastStatusNode["timestamp"]) && (job.State() == "DONE-OK" || job.State() == "DONE-FAILED")) {
00309       Time time((std::string)lastStatusNode["timestamp"]);
00310       if (time.GetTime() != -1)
00311         job.EndTime = time;
00312     }
00313 
00314     return true;
00315   }
00316 
00317   bool CREAMClient::cancel(const std::string& jobid) {
00318     logger.msg(VERBOSE, "Creating and sending request to terminate a job");
00319 
00320     action = "JobCancel";
00321 
00322     PayloadSOAP req(cream_ns);
00323     req.NewChild("types:J" + action + "Request").NewChild("types:jobId").NewChild("types:id") = jobid;
00324 
00325     XMLNode response;
00326     if (!process(req, response))
00327       return false;
00328 
00329     if (!response) {
00330       logger.msg(VERBOSE, "Empty response");
00331       return false;
00332     }
00333 
00334     return true;
00335   }
00336 
00337   bool CREAMClient::purge(const std::string& jobid) {
00338     logger.msg(VERBOSE, "Creating and sending request to clean a job");
00339 
00340     action = "JobPurge";
00341 
00342     PayloadSOAP req(cream_ns);
00343     req.NewChild("types:" + action + "Request").NewChild("types:jobId").NewChild("types:id") = jobid;
00344 
00345     XMLNode response;
00346     if (!process(req, response))
00347       return false;
00348 
00349     if (!response) {
00350       logger.msg(VERBOSE, "Empty response");
00351       return false;
00352     }
00353 
00354     return true;
00355   }
00356 
00357   bool CREAMClient::registerJob(const std::string& jdl_text,
00358                                 creamJobInfo& info) {
00359     logger.msg(VERBOSE, "Creating and sending job register request");
00360 
00361     action = "JobRegister";
00362 
00363     PayloadSOAP req(cream_ns);
00364     XMLNode act_job = req.NewChild("types:" + action + "Request").NewChild("types:JobDescriptionList");
00365     act_job.NewChild("types:JDL") = jdl_text;
00366     act_job.NewChild("types:autoStart") = "false";
00367     if (!delegationId.empty())
00368       act_job.NewChild("types:delegationId") = delegationId;
00369 
00370     XMLNode response;
00371     if (!process(req, response))
00372       return false;
00373 
00374     if (!response) {
00375       logger.msg(VERBOSE, "Empty response");
00376       return false;
00377     }
00378 
00379     if (!response["jobId"]["id"]) {
00380       logger.msg(VERBOSE, "No job ID in response");
00381       return false;
00382     }
00383 
00384     info.jobId = (std::string)response["jobId"]["id"];
00385     if (response["jobId"]["creamURL"])
00386       info.creamURL = URL((std::string)response["jobId"]["creamURL"]);
00387     for (XMLNode property = response["jobId"]["property"]; property; ++property) {
00388       if ((std::string)property["name"] == "CREAMInputSandboxURI")
00389         info.ISB_URI = (std::string)property["value"];
00390       else if ((std::string)property["name"] == "CREAMOutputSandboxURI")
00391         info.OSB_URI = (std::string)property["value"];
00392     }
00393 
00394     return true;
00395   }
00396 
00397   bool CREAMClient::startJob(const std::string& jobid) {
00398     logger.msg(VERBOSE, "Creating and sending job start request");
00399 
00400     action = "JobStart";
00401 
00402     PayloadSOAP req(cream_ns);
00403     XMLNode jobStartRequest = req.NewChild("types:" + action + "Request");
00404     jobStartRequest.NewChild("types:jobId").NewChild("types:id") = jobid;
00405     if (!delegationId.empty())
00406       jobStartRequest.NewChild("types:delegationId") = delegationId;
00407 
00408     XMLNode response;
00409     if (!process(req, response))
00410       return false;
00411 
00412     if (!response) {
00413       logger.msg(VERBOSE, "Empty response");
00414       return false;
00415     }
00416 
00417     if (!response["jobId"]["id"]) {
00418       logger.msg(VERBOSE, "No job ID in response");
00419       return false;
00420     }
00421 
00422     return true;
00423   }
00424 
00425   bool CREAMClient::createDelegation(const std::string& delegation_id,
00426                                      const std::string& proxy) {
00427     logger.msg(VERBOSE, "Creating delegation");
00428 
00429     action = "getProxyReq";
00430 
00431     PayloadSOAP req(cream_ns);
00432     req.NewChild("deleg:" + action).NewChild("delegationID") = delegation_id;
00433 
00434     XMLNode response;
00435     if (!process(req, response))
00436       return false;
00437 
00438     std::string proxyRequestStr = (std::string)response["getProxyReqReturn"];
00439     if (proxyRequestStr.empty()) {
00440       logger.msg(VERBOSE, "Malformed response: missing getProxyReqReturn");
00441       return false;
00442     }
00443 
00444     //Sign the proxy certificate
00445     Credential signer(proxy, "", cadir, cafile);
00446     std::string signedCert;
00447     // TODO: Hardcoded time shift - VERY BAD approach
00448     Time start_time = Time() - Period(300);
00449     Time end_time = signer.GetEndTime();
00450     if(end_time < start_time) {
00451       logger.msg(VERBOSE, "Delegatable credentials expired: %s",end_time.str());
00452       return false;
00453     }
00454     // CREAM is picky about end time of delegated credentials, so
00455     // make sure it does not exceed end time of signer
00456     Credential proxy_cred(start_time,end_time-start_time);
00457     proxy_cred.InquireRequest(proxyRequestStr);
00458     proxy_cred.SetProxyPolicy("gsi2", "", "", -1);
00459 
00460     if (!(signer.SignRequest(&proxy_cred, signedCert))) {
00461       logger.msg(VERBOSE, "Failed signing certificate request");
00462       return false;
00463     }
00464 
00465     std::string signedOutputCert, signedOutputCertChain;
00466     signer.OutputCertificate(signedOutputCert);
00467     signer.OutputCertificateChain(signedOutputCertChain);
00468     signedCert.append(signedOutputCert).append(signedOutputCertChain);
00469 
00470     action = "putProxy";
00471     req = PayloadSOAP(cream_ns);
00472     XMLNode putProxyRequest = req.NewChild("deleg:" + action);
00473     putProxyRequest.NewChild("delegationID") = delegation_id;
00474     putProxyRequest.NewChild("proxy") = signedCert;
00475 
00476     response = XMLNode();
00477     if (!process(req, response))
00478       return false;
00479 
00480     if (!response) {
00481       logger.msg(VERBOSE, "Failed putting signed delegation certificate to service");
00482       return false;
00483     }
00484 
00485     return true;
00486   }
00487 
00488   bool CREAMClient::destroyDelegation(const std::string& delegation_id) {
00489     logger.msg(VERBOSE, "Creating delegation");
00490 
00491     action = "destroy";
00492 
00493     PayloadSOAP req(cream_ns);
00494     req.NewChild("deleg:" + action).NewChild("delegationID") = delegation_id;
00495 
00496     XMLNode response;
00497     if (!process(req, response))
00498       return false;
00499 
00500     if (!response) {
00501       logger.msg(VERBOSE, "Empty response");
00502       return false;
00503     }
00504 
00505     return true;
00506   }
00507 } // namespace Arc