Back to index

nordugrid-arc-nox  1.1.0~rc6
JobControllerARC0.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
#include <fstream>
#include <iterator>
#include <map>
#include <glibmm.h>

#include <arc/client/JobDescription.h>
#include <arc/data/DataBuffer.h>
#include <arc/data/DataHandle.h>
#include <arc/data/DataMover.h>
#include <arc/data/URLMap.h>
#include <arc/StringConv.h>
#include <arc/XMLNode.h>
#include <arc/UserConfig.h>
#include <arc/Utils.h>

#ifdef WIN32
#include <arc/win32.h>
#include <fcntl.h>
#include <io.h>
#else
#include <unistd.h>
#endif
00025 
00026 
00027 #include "JobStateARC0.h"
00028 #include "JobControllerARC0.h"
00029 #include "FTPControl.h"
00030 
00031 namespace Arc {
00032 
00033   Logger JobControllerARC0::logger(JobController::logger, "ARC0");
00034 
00035   JobControllerARC0::JobControllerARC0(const UserConfig& usercfg)
00036     : JobController(usercfg, "ARC0") {}
00037 
00038   JobControllerARC0::~JobControllerARC0() {}
00039 
00040   Plugin* JobControllerARC0::Instance(PluginArgument *arg) {
00041     JobControllerPluginArgument *jcarg =
00042       dynamic_cast<JobControllerPluginArgument*>(arg);
00043     if (!jcarg)
00044       return NULL;
00045     return new JobControllerARC0(*jcarg);
00046   }
00047 
00048   void JobControllerARC0::GetJobInformation() {
00049 
00050     std::map<std::string, std::list<Job*> > jobsbyhost;
00051     for (std::list<Job>::iterator it = jobstore.begin();
00052          it != jobstore.end(); it++)
00053       jobsbyhost[it->InfoEndpoint.ConnectionURL() +
00054                  it->InfoEndpoint.Path()].push_back(&*it);
00055 
00056     for (std::map<std::string, std::list<Job*> >::iterator hostit =
00057            jobsbyhost.begin(); hostit != jobsbyhost.end(); hostit++) {
00058 
00059       URL infourl = (*hostit->second.begin())->InfoEndpoint;
00060 
00061       // merge filters
00062       std::string filter = "(|";
00063       for (std::list<Job*>::iterator it = hostit->second.begin();
00064            it != hostit->second.end(); it++)
00065         filter += (*it)->InfoEndpoint.LDAPFilter();
00066       filter += ")";
00067 
00068       infourl.ChangeLDAPFilter(filter);
00069 
00070       DataHandle handler(infourl, usercfg);
00071       DataBuffer buffer;
00072 
00073       if (!handler) {
00074         logger.msg(INFO, "Can't create information handle - is the ARC ldap DMC plugin available?");
00075         return;
00076       }
00077 
00078       if (!handler->StartReading(buffer))
00079         return;
00080 
00081       int handle;
00082       unsigned int length;
00083       unsigned long long int offset;
00084       std::string result;
00085 
00086       while (buffer.for_write() || !buffer.eof_read())
00087         if (buffer.for_write(handle, length, offset, true)) {
00088           result.append(buffer[handle], length);
00089           buffer.is_written(handle);
00090         }
00091 
00092       if (!handler->StopReading())
00093         return;
00094 
00095       XMLNode xmlresult(result);
00096 
00097       for (std::list<Job*>::iterator it = hostit->second.begin();
00098            it != hostit->second.end(); it++) {
00099 
00100         XMLNodeList jobinfolist =
00101           xmlresult.XPathLookup("//nordugrid-job-globalid"
00102                                 "[nordugrid-job-globalid='" +
00103                                 (*it)->JobID.str() + "']", NS());
00104 
00105         if (jobinfolist.empty())
00106           continue;
00107 
00108         XMLNode& jobinfo = *jobinfolist.begin();
00109 
00110         if (jobinfo["nordugrid-job-status"])
00111           (*it)->State = JobStateARC0((std::string)jobinfo["nordugrid-job-status"]);
00112         if (jobinfo["nordugrid-job-globalowner"])
00113           (*it)->Owner = (std::string)jobinfo["nordugrid-job-globalowner"];
00114         if (jobinfo["nordugrid-job-execcluster"])
00115           (*it)->ExecutionCE =
00116             (std::string)jobinfo["nordugrid-job-execcluster"];
00117         if (jobinfo["nordugrid-job-execqueue"])
00118           (*it)->Queue = (std::string)jobinfo["nordugrid-job-execqueue"];
00119         if (jobinfo["nordugrid-job-submissionui"])
00120           (*it)->SubmissionHost =
00121             (std::string)jobinfo["nordugrid-job-submissionui"];
00122         if (jobinfo["nordugrid-job-submissiontime"])
00123           (*it)->SubmissionTime =
00124             (std::string)jobinfo["nordugrid-job-submissiontime"];
00125         if (jobinfo["nordugrid-job-sessiondirerasetime"])
00126           (*it)->WorkingAreaEraseTime =
00127             (std::string)jobinfo["nordugrid-job-sessiondirerasetime"];
00128         if (jobinfo["nordugrid-job-proxyexpirationtime"])
00129           (*it)->ProxyExpirationTime =
00130             (std::string)jobinfo["nordugrid-job-proxyexpirationtime"];
00131         if (jobinfo["nordugrid-job-completiontime"])
00132           (*it)->EndTime =
00133             (std::string)jobinfo["nordugrid-job-completiontime"];
00134         if (jobinfo["nordugrid-job-cpucount"])
00135           (*it)->UsedSlots = stringtoi(jobinfo["nordugrid-job-cpucount"]);
00136         if (jobinfo["nordugrid-job-usedcputime"])
00137           (*it)->UsedTotalCPUTime =
00138             (std::string)jobinfo["nordugrid-job-usedcputime"];
00139         if (jobinfo["nordugrid-job-usedwalltime"])
00140           (*it)->UsedTotalWallTime =
00141             (std::string)jobinfo["nordugrid-job-usedwalltime"];
00142         if (jobinfo["nordugrid-job-exitcode"])
00143           (*it)->ExitCode = stringtoi(jobinfo["nordugrid-job-exitcode"]);
00144         if (jobinfo["Mds-validfrom"]) {
00145           (*it)->CreationTime = (std::string)(jobinfo["Mds-validfrom"]);
00146           if (jobinfo["Mds-validto"]) {
00147             Time Validto = (std::string)(jobinfo["Mds-validto"]);
00148             (*it)->Validity = Validto - (*it)->CreationTime;
00149           }
00150         }
00151         if (jobinfo["nordugrid-job-stdout"])
00152           (*it)->StdOut = (std::string)(jobinfo["nordugrid-job-stdout"]);
00153         if (jobinfo["nordugrid-job-stderr"])
00154           (*it)->StdErr = (std::string)(jobinfo["nordugrid-job-stderr"]);
00155         if (jobinfo["nordugrid-job-stdin"])
00156           (*it)->StdIn = (std::string)(jobinfo["nordugrid-job-stdin"]);
00157         if (jobinfo["nordugrid-job-reqcputime"])
00158           (*it)->RequestedTotalCPUTime =
00159             (std::string)(jobinfo["nordugrid-job-reqcputime"]);
00160         if (jobinfo["nordugrid-job-reqwalltime"])
00161           (*it)->RequestedTotalWallTime =
00162             (std::string)(jobinfo["nordugrid-job-reqwalltime"]);
00163         if (jobinfo["nordugrid-job-rerunable"])
00164           (*it)->RestartState =
00165             (std::string)(jobinfo["nordugrid-job-rerunable"]);
00166         if (jobinfo["nordugrid-job-queuerank"])
00167           (*it)->WaitingPosition =
00168             stringtoi(jobinfo["nordugrid-job-queuerank"]);
00169         if (jobinfo["nordugrid-job-comment"])
00170           (*it)->OtherMessages.push_back(
00171             (std::string)(jobinfo["nordugrid-job-comment"]));
00172         if (jobinfo["nordugrid-job-usedmem"])
00173           (*it)->UsedMainMemory =
00174             stringtoi(jobinfo["nordugrid-job-usedmem"]);
00175         if (jobinfo["nordugrid-job-errors"])
00176           for (XMLNode n = jobinfo["nordugrid-job-errors"]; n; ++n)
00177             (*it)->Error.push_back((std::string)n);
00178         if (jobinfo["nordugrid-job-jobname"])
00179           (*it)->Name = (std::string)(jobinfo["nordugrid-job-jobname"]);
00180         if (jobinfo["nordugrid-job-gmlog"])
00181           (*it)->LogDir = (std::string)(jobinfo["nordugrid-job-gmlog"]);
00182         if (jobinfo["nordugrid-job-clientsofware"])
00183           (*it)->SubmissionClientName =
00184             (std::string)(jobinfo["nordugrid-job-clientsoftware"]);
00185         if (jobinfo["nordugrid-job-executionnodes"])
00186           for (XMLNode n = jobinfo["nordugrid-job-executionnodes"]; n; ++n)
00187             (*it)->ExecutionNode.push_back((std::string)n);
00188         if (jobinfo["nordugrid-job-runtimeenvironment"])
00189           for (XMLNode n = jobinfo["nordugrid-job-runtimeenvironment"]; n; ++n)
00190             (*it)->UsedApplicationEnvironment.push_back((std::string)n);
00191       }
00192     }
00193   }
00194 
00195   bool JobControllerARC0::GetJob(const Job& job,
00196                                  const std::string& downloaddir) {
00197 
00198     logger.msg(VERBOSE, "Downloading job: %s", job.JobID.str());
00199 
00200     std::string path = job.JobID.Path();
00201     std::string::size_type pos = path.rfind('/');
00202     std::string jobidnum = path.substr(pos + 1);
00203 
00204     std::list<std::string> files = GetDownloadFiles(job.JobID);
00205 
00206     URL src(job.JobID);
00207     URL dst(downloaddir.empty() ? jobidnum : downloaddir + G_DIR_SEPARATOR_S + jobidnum);
00208 
00209     std::string srcpath = src.Path();
00210     std::string dstpath = dst.Path();
00211 
00212     if (srcpath[srcpath.size() - 1] != '/')
00213       srcpath += '/';
00214     if (dstpath[dstpath.size() - 1] != G_DIR_SEPARATOR)
00215       dstpath += G_DIR_SEPARATOR_S;
00216 
00217     bool ok = true;
00218 
00219     for (std::list<std::string>::iterator it = files.begin();
00220          it != files.end(); it++) {
00221       src.ChangePath(srcpath + *it);
00222       dst.ChangePath(dstpath + *it);
00223       if (!ARCCopyFile(src, dst)) {
00224         logger.msg(INFO, "Failed dowloading %s to %s", src.str(), dst.str());
00225         ok = false;
00226       }
00227     }
00228 
00229     return ok;
00230   }
00231 
00232   bool JobControllerARC0::CleanJob(const Job& job, bool force) {
00233 
00234     logger.msg(VERBOSE, "Cleaning job: %s", job.JobID.str());
00235 
00236     FTPControl ctrl;
00237     if (!ctrl.Connect(job.JobID, usercfg.ProxyPath(), usercfg.CertificatePath(),
00238                       usercfg.KeyPath(), usercfg.Timeout())) {
00239       logger.msg(INFO, "Failed to connect for job cleaning");
00240       return false;
00241     }
00242 
00243     std::string path = job.JobID.Path();
00244     std::string::size_type pos = path.rfind('/');
00245     std::string jobpath = path.substr(0, pos);
00246     std::string jobidnum = path.substr(pos + 1);
00247 
00248     if (!ctrl.SendCommand("CWD " + jobpath, usercfg.Timeout())) {
00249       logger.msg(INFO, "Failed sending CWD command for job cleaning");
00250       return false;
00251     }
00252 
00253     if (!ctrl.SendCommand("RMD " + jobidnum, usercfg.Timeout())) {
00254       logger.msg(INFO, "Failed sending RMD command for job cleaning");
00255       return false;
00256     }
00257 
00258     if (!ctrl.Disconnect(usercfg.Timeout())) {
00259       logger.msg(INFO, "Failed to disconnect after job cleaning");
00260       return false;
00261     }
00262 
00263     logger.msg(VERBOSE, "Job cleaning successful");
00264 
00265     return true;
00266   }
00267 
00268   bool JobControllerARC0::CancelJob(const Job& job) {
00269 
00270     logger.msg(VERBOSE, "Cleaning job: %s", job.JobID.str());
00271 
00272     FTPControl ctrl;
00273     if (!ctrl.Connect(job.JobID, usercfg.ProxyPath(), usercfg.CertificatePath(),
00274                       usercfg.KeyPath(), usercfg.Timeout())) {
00275       logger.msg(INFO, "Failed to connect for job cleaning");
00276       return false;
00277     }
00278 
00279     std::string path = job.JobID.Path();
00280     std::string::size_type pos = path.rfind('/');
00281     std::string jobpath = path.substr(0, pos);
00282     std::string jobidnum = path.substr(pos + 1);
00283 
00284     if (!ctrl.SendCommand("CWD " + jobpath, usercfg.Timeout())) {
00285       logger.msg(INFO, "Failed sending CWD command for job cancelling");
00286       return false;
00287     }
00288 
00289     if (!ctrl.SendCommand("DELE " + jobidnum, usercfg.Timeout())) {
00290       logger.msg(INFO, "Failed sending DELE command for job cancelling");
00291       return false;
00292     }
00293 
00294     if (!ctrl.Disconnect(usercfg.Timeout())) {
00295       logger.msg(INFO, "Failed to disconnect after job cancelling");
00296       return false;
00297     }
00298 
00299     logger.msg(VERBOSE, "Job cancelling successful");
00300 
00301     return true;
00302   }
00303 
00304   bool JobControllerARC0::RenewJob(const Job& job) {
00305 
00306     logger.msg(VERBOSE, "Renewing credentials for job: %s", job.JobID.str());
00307 
00308     FTPControl ctrl;
00309     if (!ctrl.Connect(job.JobID, usercfg.ProxyPath(), usercfg.CertificatePath(),
00310                       usercfg.KeyPath(), usercfg.Timeout())) {
00311       logger.msg(INFO, "Failed to connect for credential renewal");
00312       return false;
00313     }
00314 
00315     std::string path = job.JobID.Path();
00316     std::string::size_type pos = path.rfind('/');
00317     std::string jobpath = path.substr(0, pos);
00318     std::string jobidnum = path.substr(pos + 1);
00319 
00320     if (!ctrl.SendCommand("CWD " + jobpath, usercfg.Timeout())) {
00321       logger.msg(INFO, "Failed sending CWD command for credentials renewal");
00322       return false;
00323     }
00324 
00325     if (!ctrl.SendCommand("CWD " + jobidnum, usercfg.Timeout())) {
00326       logger.msg(INFO, "Failed sending CWD command for credentials renewal");
00327       return false;
00328     }
00329 
00330     if (!ctrl.Disconnect(usercfg.Timeout())) {
00331       logger.msg(INFO, "Failed to disconnect after credentials renewal");
00332       return false;
00333     }
00334 
00335     logger.msg(VERBOSE, "Renewal of credentials was successful");
00336 
00337     return true;
00338   }
00339 
00340   bool JobControllerARC0::ResumeJob(const Job& job) {
00341     if (job.RestartState.empty()) {
00342       logger.msg(INFO, "Job %s does not report a resumable state", job.JobID.str());
00343       return false;
00344     }
00345     std::cout << "Resuming job " << job.JobID.str() << " at state " << job.RestartState << std::endl;
00346 
00347     RenewJob(job);
00348 
00349     // dump rsl into temporary file
00350     std::string urlstr = job.JobID.str();
00351     std::string::size_type pos = urlstr.rfind('/');
00352     if (pos == std::string::npos || pos == 0) {
00353       logger.msg(INFO, "Illegal jobid specified");
00354       return false;
00355     }
00356     std::string jobnr = urlstr.substr(pos + 1);
00357     urlstr = urlstr.substr(0, pos) + "/new/action";
00358     logger.msg(VERBOSE, "HER: %s", urlstr);
00359 
00360     std::string rsl("&(action=restart)(jobid=" + jobnr + ")");
00361 
00362     std::string filename = Glib::build_filename(Glib::get_tmp_dir(), "arcresume.XXXXXX");
00363     int tmp_h = Glib::mkstemp(filename);
00364     if (tmp_h == -1) {
00365       logger.msg(INFO, "Could not create temporary file: %s", filename);
00366       return false;
00367     }
00368     std::ofstream outfile(filename.c_str(), std::ofstream::binary);
00369     outfile.write(rsl.c_str(), rsl.size());
00370     if (outfile.fail()) {
00371       logger.msg(INFO, "Could not write temporary file: %s", filename);
00372       return false;
00373     }
00374     outfile.close();
00375 
00376     // Send temporary file to cluster
00377     DataMover mover;
00378     FileCache cache;
00379     URL source_url(filename);
00380     URL dest_url(urlstr);
00381     DataHandle source(source_url, usercfg);
00382     DataHandle destination(dest_url, usercfg);
00383     source->SetTries(1);
00384     destination->SetTries(1);
00385     DataStatus res = mover.Transfer(*source, *destination, cache, URLMap(),
00386                                     0, 0, 0, usercfg.Timeout());
00387     if (!res.Passed()) {
00388       if (!res.GetDesc().empty())
00389         logger.msg(INFO, "Current transfer FAILED: %s - %s", std::string(res), res.GetDesc());
00390       else
00391         logger.msg(INFO, "Current transfer FAILED: %s", std::string(res));
00392       mover.Delete(*destination);
00393       return false;
00394     }
00395     else
00396       logger.msg(INFO, "Current transfer complete");
00397 
00398     //Cleaning up
00399     source->Remove();
00400 
00401     logger.msg(VERBOSE, "Job resumed successful");
00402 
00403     return true;
00404   }
00405 
00406   URL JobControllerARC0::GetFileUrlForJob(const Job& job,
00407                                           const std::string& whichfile) {
00408 
00409     URL url(job.JobID);
00410 
00411     if (whichfile == "stdout")
00412       url.ChangePath(url.Path() + '/' + job.StdOut);
00413     else if (whichfile == "stderr")
00414       url.ChangePath(url.Path() + '/' + job.StdErr);
00415     else if (whichfile == "gmlog") {
00416       std::string path = url.Path();
00417       path.insert(path.rfind('/'), "/info");
00418       url.ChangePath(path + "/errors");
00419     }
00420 
00421     return url;
00422   }
00423 
00424   bool JobControllerARC0::GetJobDescription(const Job& job, std::string& desc_str) {
00425 
00426     std::string jobid = job.JobID.str();
00427     logger.msg(VERBOSE, "Trying to retrieve job description of %s from cluster", jobid);
00428 
00429     std::string::size_type pos = jobid.rfind("/");
00430     if (pos == std::string::npos) {
00431       logger.msg(INFO, "invalid jobid: %s", jobid);
00432       return false;
00433     }
00434     std::string cluster = jobid.substr(0, pos);
00435     std::string shortid = jobid.substr(pos + 1);
00436 
00437     // Transfer job description
00438     DataMover mover;
00439     mover.secure(false);
00440     mover.passive(true);
00441     mover.verbose(false);
00442     mover.force_to_meta(false);
00443     mover.retry(true);
00444     FileCache cache;
00445     URL source_url(cluster + "/info/" + shortid + "/description");
00446     std::string tmpfile = shortid + G_DIR_SEPARATOR_S + "description";
00447     std::string localfile = Glib::build_filename(Glib::get_tmp_dir(), tmpfile);
00448     URL dest_url(localfile);
00449     DataHandle source(source_url, usercfg);
00450     DataHandle destination(dest_url, usercfg);
00451     source->SetTries(1);
00452     destination->SetTries(1);
00453     DataStatus res = mover.Transfer(*source, *destination, cache, URLMap(),
00454                                     0, 0, 0, usercfg.Timeout());
00455     if (!res.Passed()) {
00456       if (!res.GetDesc().empty())
00457         logger.msg(INFO, "Current transfer FAILED: %s - %s", std::string(res), res.GetDesc());
00458       else
00459         logger.msg(INFO, "Current transfer FAILED: %s", std::string(res));
00460       mover.Delete(*destination);
00461       return false;
00462     }
00463     else
00464       logger.msg(INFO, "Current transfer complete");
00465 
00466     // Read job description from file
00467     std::ifstream descriptionfile(localfile.c_str());
00468 
00469     if (!descriptionfile) {
00470       logger.msg(INFO, "Can not open job description file: %s", localfile);
00471       return false;
00472     }
00473 
00474     descriptionfile.seekg(0, std::ios::end);
00475     std::streamsize length = descriptionfile.tellg();
00476     descriptionfile.seekg(0, std::ios::beg);
00477 
00478     char *buffer = new char[length + 1];
00479     descriptionfile.read(buffer, length);
00480     descriptionfile.close();
00481 
00482     buffer[length] = '\0';
00483 
00484     desc_str = (std::string)buffer;
00485     //Cleaning up
00486     delete[] buffer;
00487     destination->Remove();
00488 
00489     // Extracting original client xrsl
00490     pos = desc_str.find("clientxrsl");
00491     if (pos != std::string::npos) {
00492       logger.msg(VERBOSE, "clientxrsl found");
00493       std::string::size_type pos1 = desc_str.find("&", pos);
00494       if (pos1 == std::string::npos) {
00495         logger.msg(INFO, "could not find start of clientxrsl");
00496         return false;
00497       }
00498       std::string::size_type pos2 = desc_str.find(")\"", pos1);
00499       if (pos2 == std::string::npos) {
00500         logger.msg(INFO, "could not find end of clientxrsl");
00501         return false;
00502       }
00503       desc_str.erase(pos2 + 1);
00504       desc_str.erase(0, pos1);
00505       for (std::string::size_type i = 0; i != std::string::npos;) {
00506         i = desc_str.find("\"\"", i);
00507         if (i != std::string::npos)
00508           desc_str.erase(i, 1);
00509       }
00510       logger.msg(DEBUG, "Job description: %s", desc_str);
00511     }
00512     else {
00513       logger.msg(INFO, "clientxrsl not found");
00514       return false;
00515     }
00516 
00517     JobDescription desc;
00518     desc.Parse(desc_str);
00519     if (!desc) {
00520       logger.msg(INFO, "Invalid JobDescription:");
00521       std::cout << desc_str << std::endl;
00522       return false;
00523     }
00524     logger.msg(VERBOSE, "Valid JobDescription found");
00525     return true;
00526   }
00527 
00528 } // namespace Arc