Back to index

nordugrid-arc-nox  1.1.0~rc6
SubmitterARC0.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <unistd.h>
00008 #include <glibmm.h>
00009 
00010 #include <arc/Logger.h>
00011 #include <arc/UserConfig.h>
00012 #include <arc/client/ExecutionTarget.h>
00013 #include <arc/client/JobDescription.h>
00014 
00015 #include "SubmitterARC0.h"
00016 #include "FTPControl.h"
00017 
00018 namespace Arc {
00019 
// Logger for this plugin: constructed from the generic Submitter logger and
// the name "ARC0".
Logger SubmitterARC0::logger(Submitter::logger, "ARC0");
00021 
00022   SubmitterARC0::SubmitterARC0(const UserConfig& usercfg)
00023     : Submitter(usercfg, "ARC0") {}
00024 
00025   SubmitterARC0::~SubmitterARC0() {}
00026 
00027   Plugin* SubmitterARC0::Instance(PluginArgument *arg) {
00028     SubmitterPluginArgument *subarg =
00029       dynamic_cast<SubmitterPluginArgument*>(arg);
00030     if (!subarg)
00031       return NULL;
00032     return new SubmitterARC0(*subarg);
00033   }
00034 
00035   URL SubmitterARC0::Submit(const JobDescription& jobdesc,
00036                             const ExecutionTarget& et) const {
00037 
00038     FTPControl ctrl;
00039 
00040     if (!ctrl.Connect(et.url,
00041                       usercfg.ProxyPath(), usercfg.CertificatePath(),
00042                       usercfg.KeyPath(), usercfg.Timeout())) {
00043       logger.msg(INFO, "Submit: Failed to connect");
00044       return URL();
00045     }
00046 
00047     if (!ctrl.SendCommand("CWD " + et.url.Path(), usercfg.Timeout())) {
00048       logger.msg(INFO, "Submit: Failed sending CWD command");
00049       ctrl.Disconnect(usercfg.Timeout());
00050       return URL();
00051     }
00052 
00053     std::string response;
00054 
00055     if (!ctrl.SendCommand("CWD new", response, usercfg.Timeout())) {
00056       logger.msg(INFO, "Submit: Failed sending CWD new command");
00057       ctrl.Disconnect(usercfg.Timeout());
00058       return URL();
00059     }
00060 
00061     std::string::size_type pos2 = response.rfind('"');
00062     std::string::size_type pos1 = response.rfind('/', pos2 - 1);
00063     std::string jobnumber = response.substr(pos1 + 1, pos2 - pos1 - 1);
00064 
00065     JobDescription job(jobdesc);
00066 
00067     if (!ModifyJobDescription(job, et)) {
00068       logger.msg(INFO, "Submit: Failed to modify job description "
00069                         "to be sent to target.");
00070       ctrl.Disconnect(usercfg.Timeout());
00071       return URL();
00072     }
00073 
00074     std::string jobdescstring = job.UnParse("XRSL");
00075 
00076     if (!ctrl.SendData(jobdescstring, "job", usercfg.Timeout())) {
00077       logger.msg(INFO, "Submit: Failed sending job description");
00078       ctrl.Disconnect(usercfg.Timeout());
00079       return URL();
00080     }
00081 
00082     if (!ctrl.Disconnect(usercfg.Timeout())) {
00083       logger.msg(INFO, "Submit: Failed to disconnect after submission");
00084       return URL();
00085     }
00086 
00087     URL jobid(et.url);
00088     jobid.ChangePath(jobid.Path() + '/' + jobnumber);
00089 
00090     if (!PutFiles(job, jobid)) {
00091       logger.msg(INFO, "Submit: Failed uploading local input files");
00092       return URL();
00093     }
00094 
00095     // Prepare contact url for information about this job
00096     URL infoEndpoint(et.Cluster);
00097     infoEndpoint.ChangeLDAPFilter("(nordugrid-job-globalid=" +
00098                                   jobid.str() + ")");
00099     infoEndpoint.ChangeLDAPScope(URL::subtree);
00100 
00101     AddJob(job, jobid, et.Cluster, infoEndpoint);
00102 
00103     return jobid;
00104   }
00105 
00106   URL SubmitterARC0::Migrate(const URL& jobid, const JobDescription& jobdesc,
00107                              const ExecutionTarget& et, bool forcemigration) const {
00108     logger.msg(INFO, "Trying to migrate to %s: Migration to a ARC0 cluster is not supported.", et.url.str());
00109     return URL();
00110   }
00111 
00112   bool SubmitterARC0::ModifyJobDescription(JobDescription& jobdesc, const ExecutionTarget& et) const {
00113     if (jobdesc.XRSL_elements["clientxrsl"].empty())
00114       jobdesc.XRSL_elements["clientxrsl"] = jobdesc.UnParse("XRSL");
00115 
00116     // Check for identical file names.
00117     // Check if executable and input is contained in the file list.
00118     bool inputIsAdded(false), executableIsAdded(false), outputIsAdded(false), errorIsAdded(false), logDirIsAdded(false);
00119     for (std::list<FileType>::const_iterator it1 = jobdesc.DataStaging.File.begin();
00120          it1 != jobdesc.DataStaging.File.end(); it1++) {
00121       for (std::list<FileType>::const_iterator it2 = it1;
00122            it2 != jobdesc.DataStaging.File.end(); it2++) {
00123         if (it1 == it2) continue;
00124 
00125         if (it1->Name == it2->Name && (!it1->Source.empty() && !it2->Source.empty() ||
00126                                        !it1->Target.empty() && !it2->Target.empty())) {
00127           logger.msg(VERBOSE, "Two files have identical file name '%s'.", it1->Name);
00128           return false;
00129         }
00130       }
00131 
00132       executableIsAdded  |= (it1->Name == jobdesc.Application.Executable.Name);
00133       inputIsAdded       |= (it1->Name == jobdesc.Application.Input);
00134       outputIsAdded      |= (it1->Name == jobdesc.Application.Output);
00135       errorIsAdded       |= (it1->Name == jobdesc.Application.Error);
00136       logDirIsAdded      |= (it1->Name == jobdesc.Application.LogDir);
00137     }
00138 
00139     if (!executableIsAdded &&
00140         !Glib::path_is_absolute(jobdesc.Application.Executable.Name)) {
00141       FileType file;
00142       file.Name = jobdesc.Application.Executable.Name;
00143       DataSourceType s;
00144       s.URI = file.Name;
00145       file.KeepData = false;
00146       file.IsExecutable = true;
00147       file.DownloadToCache = false;
00148       jobdesc.DataStaging.File.push_back(file);
00149     }
00150 
00151     if (!jobdesc.Application.Input.empty() && !inputIsAdded) {
00152       FileType file;
00153       file.Name = jobdesc.Application.Input;
00154       DataSourceType s;
00155       s.URI = file.Name;
00156       file.Source.push_back(s);
00157       file.Source.push_back(s);
00158       file.KeepData = false;
00159       file.IsExecutable = false;
00160       file.DownloadToCache = false;
00161       jobdesc.DataStaging.File.push_back(file);
00162     }
00163 
00164     if (!jobdesc.Application.Output.empty() && !outputIsAdded) {
00165       FileType file;
00166       file.Name = jobdesc.Application.Output;
00167       file.KeepData = true;
00168       file.IsExecutable = false;
00169       file.DownloadToCache = false;
00170       jobdesc.DataStaging.File.push_back(file);
00171     }
00172 
00173     if (!jobdesc.Application.Error.empty() && !errorIsAdded) {
00174       FileType file;
00175       file.Name = jobdesc.Application.Error;
00176       file.KeepData = true;
00177       file.IsExecutable = false;
00178       file.DownloadToCache = false;
00179       jobdesc.DataStaging.File.push_back(file);
00180     }
00181 
00182     if (!jobdesc.Application.LogDir.empty() && !logDirIsAdded) {
00183       FileType file;
00184       file.Name = jobdesc.Application.LogDir;
00185       file.KeepData = true;
00186       file.IsExecutable = false;
00187       file.DownloadToCache = false;
00188       jobdesc.DataStaging.File.push_back(file);
00189     }
00190 
00191     if (!jobdesc.Resources.RunTimeEnvironment.empty() &&
00192         !jobdesc.Resources.RunTimeEnvironment.selectSoftware(et.ApplicationEnvironments)) {
00193       // This error should never happen since RTE is checked in the Broker.
00194       logger.msg(VERBOSE, "Unable to select run time environment");
00195       return false;
00196     }
00197 
00198     if (!jobdesc.Resources.CEType.empty() &&
00199         !jobdesc.Resources.CEType.selectSoftware(et.Implementation)) {
00200       // This error should never happen since Middleware is checked in the Broker.
00201       logger.msg(VERBOSE, "Unable to select middleware");
00202       return false;
00203     }
00204 
00205     if (!jobdesc.Resources.OperatingSystem.empty() &&
00206         !jobdesc.Resources.OperatingSystem.selectSoftware(et.Implementation)) {
00207       // This error should never happen since OS is checked in the Broker.
00208       logger.msg(VERBOSE, "Unable to select operating system.");
00209       return false;
00210     }
00211 
00212     if (jobdesc.Resources.CandidateTarget.empty()) {
00213       ResourceTargetType candidateTarget;
00214       candidateTarget.EndPointURL = URL();
00215       candidateTarget.QueueName = et.ComputingShareName;
00216       jobdesc.Resources.CandidateTarget.push_back(candidateTarget);
00217     }
00218     else if (jobdesc.Resources.CandidateTarget.front().QueueName.empty())
00219       jobdesc.Resources.CandidateTarget.front().QueueName = et.ComputingShareName;
00220 
00221     jobdesc.XRSL_elements["action"] = "request";
00222     jobdesc.XRSL_elements["savestate"] = "yes";
00223     jobdesc.XRSL_elements["clientsoftware"] = "arclibclient-" VERSION;
00224 #ifdef HAVE_GETHOSTNAME
00225     char hostname[1024];
00226     gethostname(hostname, 1024);
00227     jobdesc.XRSL_elements["hostname"] = hostname;
00228 #endif
00229 
00230     jobdesc.AddHint("TARGETDIALECT","GRIDMANAGER");
00231 
00232     return true;
00233   }
00234 
00235 } // namespace Arc