Back to index

nordugrid-arc-nox  1.1.0~rc6
SubmitterARC1.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <string>
00008 #include <sstream>
00009 
00010 #include <glibmm.h>
00011 
00012 #include <arc/StringConv.h>
00013 #include <arc/UserConfig.h>
00014 #include <arc/client/ExecutionTarget.h>
00015 #include <arc/client/JobDescription.h>
00016 #include <arc/message/MCC.h>
00017 
00018 #include "SubmitterARC1.h"
00019 #include "AREXClient.h"
00020 
00021 namespace Arc {
00022 
00023   Logger SubmitterARC1::logger(Submitter::logger, "ARC1");
00024 
00025   SubmitterARC1::SubmitterARC1(const UserConfig& usercfg)
00026     : Submitter(usercfg, "ARC1") {}
00027 
00028   SubmitterARC1::~SubmitterARC1() {}
00029 
00030   Plugin* SubmitterARC1::Instance(PluginArgument *arg) {
00031     SubmitterPluginArgument *subarg =
00032       dynamic_cast<SubmitterPluginArgument*>(arg);
00033     if (!subarg)
00034       return NULL;
00035     return new SubmitterARC1(*subarg);
00036   }
00037 
00038   URL SubmitterARC1::Submit(const JobDescription& jobdesc,
00039                             const ExecutionTarget& et) const {
00040     MCCConfig cfg;
00041     usercfg.ApplyToConfig(cfg);
00042     AREXClient ac(et.url, cfg, usercfg.Timeout());
00043 
00044     JobDescription job(jobdesc);
00045 
00046     if (!ModifyJobDescription(job, et)) {
00047       logger.msg(INFO, "Failed adapting job description to target resources");
00048       return URL();
00049     }
00050 
00051     std::string jobid;
00052     if (!ac.submit(job.UnParse("ARCJSDL"), jobid, et.url.Protocol() == "https"))
00053       return URL();
00054 
00055     if (jobid.empty()) {
00056       logger.msg(INFO, "No job identifier returned by A-REX");
00057       return URL();
00058     }
00059 
00060     XMLNode jobidx(jobid);
00061     URL session_url((std::string)(jobidx["ReferenceParameters"]["JobSessionDir"]));
00062 
00063     if (!PutFiles(job, session_url)) {
00064       logger.msg(INFO, "Failed uploading local input files");
00065       return URL();
00066     }
00067 
00068     AddJob(job, session_url, et.Cluster, session_url);
00069 
00070     return session_url;
00071   }
00072 
00073   URL SubmitterARC1::Migrate(const URL& jobid, const JobDescription& jobdesc,
00074                              const ExecutionTarget& et,
00075                              bool forcemigration) const {
00076     MCCConfig cfg;
00077     usercfg.ApplyToConfig(cfg);
00078     AREXClient ac(et.url, cfg, usercfg.Timeout());
00079 
00080     std::string idstr;
00081     AREXClient::createActivityIdentifier(jobid, idstr);
00082 
00083     JobDescription job(jobdesc);
00084 
00085     // Modify the location of local files and files residing in a old session directory.
00086     for (std::list<FileType>::iterator it = job.DataStaging.File.begin();
00087          it != job.DataStaging.File.end(); it++) {
00088       // Do not modify Output and Error files.
00089       if (it->Name == job.Application.Output ||
00090           it->Name == job.Application.Error ||
00091           it->Source.empty())
00092         continue;
00093 
00094       if (!it->Source.front().URI || it->Source.front().URI.Protocol() == "file") {
00095         it->Source.front().URI = URL(jobid.str() + "/" + it->Name);
00096         it->DownloadToCache = false;
00097       }
00098       else {
00099         // URL is valid, and not a local file. Check if the source reside at a
00100         // old job session directory.
00101         const size_t foundRSlash = it->Source.front().URI.str().rfind('/');
00102         if (foundRSlash == std::string::npos)
00103           continue;
00104 
00105         const std::string uriPath = it->Source.front().URI.str().substr(0, foundRSlash);
00106         // Check if the input file URI is pointing to a old job session directory.
00107         for (std::list<std::string>::const_iterator itAOID = job.Identification.ActivityOldId.begin();
00108              itAOID != job.Identification.ActivityOldId.end(); itAOID++)
00109           if (uriPath == *itAOID) {
00110             it->Source.front().URI = URL(jobid.str() + "/" + it->Name);
00111             it->DownloadToCache = false;
00112             break;
00113           }
00114       }
00115     }
00116 
00117     if (!ModifyJobDescription(job, et)) {
00118       logger.msg(INFO, "Failed adapting job description to target resources");
00119       return URL();
00120     }
00121 
00122     // Add ActivityOldId.
00123     job.Identification.ActivityOldId.push_back(jobid.str());
00124 
00125     std::string newjobid;
00126     if (!ac.migrate(idstr, job.UnParse("ARCJSDL"), forcemigration, newjobid,
00127                     et.url.Protocol() == "https"))
00128       return URL();
00129 
00130     if (newjobid.empty()) {
00131       logger.msg(INFO, "No job identifier returned by A-REX");
00132       return URL();
00133     }
00134 
00135     XMLNode newjobidx(newjobid);
00136     URL session_url((std::string)(newjobidx["ReferenceParameters"]["JobSessionDir"]));
00137 
00138     if (!PutFiles(job, session_url)) {
00139       logger.msg(INFO, "Failed uploading local input files");
00140       return URL();
00141     }
00142 
00143     AddJob(job, session_url, et.Cluster, session_url);
00144 
00145     return session_url;
00146   }
00147 
00148   bool SubmitterARC1::ModifyJobDescription(JobDescription& jobdesc, const ExecutionTarget& et) const {
00149     // Check for identical file names.
00150     bool executableIsAdded(false), inputIsAdded(false), outputIsAdded(false), errorIsAdded(false), logDirIsAdded(false);
00151     for (std::list<FileType>::const_iterator it1 = jobdesc.DataStaging.File.begin();
00152          it1 != jobdesc.DataStaging.File.end(); it1++) {
00153       for (std::list<FileType>::const_iterator it2 = it1;
00154            it2 != jobdesc.DataStaging.File.end(); it2++) {
00155         if (it1 == it2) continue;
00156 
00157         if (it1->Name == it2->Name && (!it1->Source.empty() && !it2->Source.empty() ||
00158                                        !it1->Target.empty() && !it2->Target.empty())) {
00159           logger.msg(VERBOSE, "Two files have identical file name '%s'.", it1->Name);
00160           return false;
00161         }
00162 
00163       }
00164 
00165       executableIsAdded  |= (it1->Name == jobdesc.Application.Executable.Name);
00166       inputIsAdded       |= (it1->Name == jobdesc.Application.Input);
00167       outputIsAdded      |= (it1->Name == jobdesc.Application.Output);
00168       errorIsAdded       |= (it1->Name == jobdesc.Application.Error);
00169       logDirIsAdded      |= (it1->Name == jobdesc.Application.LogDir);
00170     }
00171 
00172     if (!executableIsAdded &&
00173         !Glib::path_is_absolute(jobdesc.Application.Executable.Name)) {
00174       FileType file;
00175       file.Name = jobdesc.Application.Executable.Name;
00176       DataSourceType s;
00177       s.URI = file.Name;
00178       file.Source.push_back(s);
00179       file.KeepData = false;
00180       file.IsExecutable = true;
00181       file.DownloadToCache = false;
00182       jobdesc.DataStaging.File.push_back(file);
00183     }
00184 
00185     if (!jobdesc.Application.Input.empty() && !inputIsAdded) {
00186       FileType file;
00187       file.Name = jobdesc.Application.Input;
00188       DataSourceType s;
00189       s.URI = file.Name;
00190       file.Source.push_back(s);
00191       file.KeepData = false;
00192       file.IsExecutable = false;
00193       file.DownloadToCache = false;
00194       jobdesc.DataStaging.File.push_back(file);
00195     }
00196 
00197     if (!jobdesc.Application.Output.empty() && !outputIsAdded) {
00198       FileType file;
00199       file.Name = jobdesc.Application.Output;
00200       file.KeepData = true;
00201       file.IsExecutable = false;
00202       file.DownloadToCache = false;
00203       jobdesc.DataStaging.File.push_back(file);
00204     }
00205 
00206     if (!jobdesc.Application.Error.empty() && !errorIsAdded) {
00207       FileType file;
00208       file.Name = jobdesc.Application.Error;
00209       file.KeepData = true;
00210       file.IsExecutable = false;
00211       file.DownloadToCache = false;
00212       jobdesc.DataStaging.File.push_back(file);
00213     }
00214 
00215     if (!jobdesc.Application.LogDir.empty() && !logDirIsAdded) {
00216       FileType file;
00217       file.Name = jobdesc.Application.LogDir;
00218       file.KeepData = true;
00219       file.IsExecutable = false;
00220       file.DownloadToCache = false;
00221       jobdesc.DataStaging.File.push_back(file);
00222     }
00223 
00224     if (!jobdesc.Resources.RunTimeEnvironment.empty() &&
00225         !jobdesc.Resources.RunTimeEnvironment.selectSoftware(et.ApplicationEnvironments)) {
00226       // This error should never happen since RTE is checked in the Broker.
00227       logger.msg(VERBOSE, "Unable to select run time environment");
00228       return false;
00229     }
00230 
00231     if (!jobdesc.Resources.CEType.empty() &&
00232         !jobdesc.Resources.CEType.selectSoftware(et.Implementation)) {
00233       // This error should never happen since Middleware is checked in the Broker.
00234       logger.msg(VERBOSE, "Unable to select middleware");
00235       return false;
00236     }
00237 
00238     if (!jobdesc.Resources.OperatingSystem.empty() &&
00239         !jobdesc.Resources.OperatingSystem.selectSoftware(et.Implementation)) {
00240       // This error should never happen since OS is checked in the Broker.
00241       logger.msg(VERBOSE, "Unable to select operating system.");
00242       return false;
00243     }
00244 
00245     // Set cluster and queue if not specified by user.
00246     if (jobdesc.Resources.CandidateTarget.empty()) {
00247       ResourceTargetType candidateTarget;
00248       candidateTarget.EndPointURL = URL();
00249       candidateTarget.QueueName = et.ComputingShareName;
00250       jobdesc.Resources.CandidateTarget.push_back(candidateTarget);
00251     }
00252     else if (jobdesc.Resources.CandidateTarget.front().QueueName.empty())
00253       jobdesc.Resources.CandidateTarget.front().QueueName = et.ComputingShareName;
00254 
00255     return true;
00256   }
00257 
00258 } // namespace Arc