Back to index

nordugrid-arc-nox  1.1.0~rc6
JobControllerARC1.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <glib.h>
00008 
00009 #include <arc/StringConv.h>
00010 #include <arc/UserConfig.h>
00011 #include <arc/XMLNode.h>
00012 #include <arc/client/JobDescription.h>
00013 #include <arc/data/DataMover.h>
00014 #include <arc/data/DataHandle.h>
00015 #include <arc/data/URLMap.h>
00016 #include <arc/message/MCC.h>
00017 
00018 #include "AREXClient.h"
00019 #include "JobControllerARC1.h"
00020 
00021 namespace Arc {
00022 
00023   Logger JobControllerARC1::logger(JobController::logger, "ARC1");
00024 
00025   JobControllerARC1::JobControllerARC1(const UserConfig& usercfg)
00026     : JobController(usercfg, "ARC1") {}
00027 
00028   JobControllerARC1::~JobControllerARC1() {}
00029 
00030   Plugin* JobControllerARC1::Instance(PluginArgument *arg) {
00031     JobControllerPluginArgument *jcarg =
00032       dynamic_cast<JobControllerPluginArgument*>(arg);
00033     if (!jcarg)
00034       return NULL;
00035     return new JobControllerARC1(*jcarg);
00036   }
00037 
00038   void JobControllerARC1::GetJobInformation() {
00039     MCCConfig cfg;
00040     usercfg.ApplyToConfig(cfg);
00041 
00042     for (std::list<Job>::iterator iter = jobstore.begin();
00043          iter != jobstore.end(); iter++) {
00044       AREXClient ac(iter->Cluster, cfg, usercfg.Timeout());
00045       std::string idstr;
00046       AREXClient::createActivityIdentifier(iter->JobID, idstr);
00047       if (!ac.stat(idstr, *iter))
00048         logger.msg(INFO, "Failed retrieving information for job: %s", iter->JobID.str());
00049     }
00050   }
00051 
00052   bool JobControllerARC1::GetJob(const Job& job,
00053                                  const std::string& downloaddir) {
00054 
00055     logger.msg(VERBOSE, "Downloading job: %s", job.JobID.str());
00056 
00057     std::string path = job.JobID.Path();
00058     std::string::size_type pos = path.rfind('/');
00059     std::string jobidnum = path.substr(pos + 1);
00060 
00061     std::list<std::string> files = GetDownloadFiles(job.JobID);
00062 
00063     URL src(job.JobID);
00064     URL dst(downloaddir.empty() ? jobidnum : downloaddir + G_DIR_SEPARATOR_S + jobidnum);
00065 
00066     std::string srcpath = src.Path();
00067     std::string dstpath = dst.Path();
00068 
00069     if (srcpath.empty() || (srcpath[srcpath.size() - 1] != '/'))
00070       srcpath += '/';
00071     if (dstpath.empty() || (dstpath[dstpath.size() - 1] != G_DIR_SEPARATOR))
00072       dstpath += G_DIR_SEPARATOR_S;
00073 
00074     bool ok = true;
00075 
00076     for (std::list<std::string>::iterator it = files.begin();
00077          it != files.end(); it++) {
00078       src.ChangePath(srcpath + *it);
00079       dst.ChangePath(dstpath + *it);
00080       if (!ARCCopyFile(src, dst)) {
00081         logger.msg(INFO, "Failed dowloading %s to %s", src.str(), dst.str());
00082         ok = false;
00083       }
00084     }
00085 
00086     return ok;
00087   }
00088 
00089   bool JobControllerARC1::CleanJob(const Job& job, bool force) {
00090     MCCConfig cfg;
00091     usercfg.ApplyToConfig(cfg);
00092     AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
00093     std::string idstr;
00094     AREXClient::createActivityIdentifier(job.JobID, idstr);
00095     return ac.clean(idstr);
00096   }
00097 
00098   bool JobControllerARC1::CancelJob(const Job& job) {
00099     MCCConfig cfg;
00100     usercfg.ApplyToConfig(cfg);
00101     AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
00102     std::string idstr;
00103     AREXClient::createActivityIdentifier(job.JobID, idstr);
00104     return ac.kill(idstr);
00105   }
00106 
00107   bool JobControllerARC1::RenewJob(const Job& job) {
00108     logger.msg(INFO, "Renewal of ARC1 jobs is not supported");
00109     return false;
00110   }
00111 
00112   bool JobControllerARC1::ResumeJob(const Job& job) {
00113 
00114     if (job.RestartState.empty()) {
00115       logger.msg(INFO, "Job %s does not report a resumable state", job.JobID.str());
00116       return false;
00117     }
00118 
00119     logger.msg(VERBOSE, "Resuming job: %s at state: %s", job.JobID.str(), job.RestartState);
00120 
00121     MCCConfig cfg;
00122     usercfg.ApplyToConfig(cfg);
00123     AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
00124     std::string idstr;
00125     AREXClient::createActivityIdentifier(job.JobID, idstr);
00126     bool ok = ac.resume(idstr);
00127     if (ok)
00128       logger.msg(VERBOSE, "Job resuming successful");
00129     return ok;
00130   }
00131 
00132   URL JobControllerARC1::GetFileUrlForJob(const Job& job,
00133                                           const std::string& whichfile) {
00134     URL url(job.JobID);
00135 
00136     if (whichfile == "stdout")
00137       url.ChangePath(url.Path() + '/' + job.StdOut);
00138     else if (whichfile == "stderr")
00139       url.ChangePath(url.Path() + '/' + job.StdErr);
00140     else if (whichfile == "gmlog")
00141       url.ChangePath(url.Path() + "/" + job.LogDir + "/errors");
00142 
00143     return url;
00144   }
00145 
00146   bool JobControllerARC1::GetJobDescription(const Job& job, std::string& desc_str) {
00147     MCCConfig cfg;
00148     usercfg.ApplyToConfig(cfg);
00149     AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
00150     std::string idstr;
00151     AREXClient::createActivityIdentifier(job.JobID, idstr);
00152     if (ac.getdesc(idstr, desc_str)) {
00153       JobDescription desc;
00154       if (desc.Parse(desc_str))
00155         return true;
00156     }
00157 
00158     logger.msg(ERROR, "Failed retrieving job description for job: %s", job.JobID.str());
00159     return false;
00160   }
00161 } // namespace Arc