nordugrid-arc-nox  1.1.0~rc6
Arc::SubmitterARC1 Class Reference

#include <SubmitterARC1.h>

Public Member Functions

virtual URL Submit (const JobDescription &jobdesc, const ExecutionTarget &et) const
 This virtual method should be overridden by plugins that are capable of submitting jobs, defined in the JobDescription jobdesc, to the ExecutionTarget et.
virtual URL Migrate (const URL &jobid, const JobDescription &jobdesc, const ExecutionTarget &et, bool forcemigration) const
 This virtual method should be overridden by plugins that are capable of migrating jobs.
virtual bool ModifyJobDescription (JobDescription &jobdesc, const ExecutionTarget &et) const
std::string GetCksum (const std::string &file) const

Static Public Member Functions

static Plugin * Instance (PluginArgument *arg)
static std::string GetCksum (const std::string &file, const UserConfig &usercfg)

Protected Member Functions

bool PutFiles (const JobDescription &jobdesc, const URL &url) const
void AddJob (const JobDescription &job, const URL &jobid, const URL &cluster, const URL &infoendpoint) const
void AddJob (const JobDescription &job, const URL &jobid, const URL &cluster, const URL &infoendpoint, const std::map< std::string, std::string > &additionalInfo) const

Protected Attributes

const std::string flavour
const UserConfig & usercfg

Private Member Functions

 SubmitterARC1 (const UserConfig &usercfg)
 ~SubmitterARC1 ()

Static Private Attributes

static Logger logger

Detailed Description

Definition at line 16 of file SubmitterARC1.h.


Constructor & Destructor Documentation

Arc::SubmitterARC1::SubmitterARC1 ( const UserConfig &  usercfg) [private]

Definition at line 25 of file SubmitterARC1.cpp.

    : Submitter(usercfg, "ARC1") {}

Arc::SubmitterARC1::~SubmitterARC1 ( ) [private]

Definition at line 28 of file SubmitterARC1.cpp.

{}

Member Function Documentation

void Arc::Submitter::AddJob ( const JobDescription &  job,
const URL &  jobid,
const URL &  cluster,
const URL &  infoendpoint 
) const [inline, protected, inherited]

Definition at line 69 of file Submitter.h.

                                               {
      std::map<std::string, std::string> additionalInfo;
      AddJob(job, jobid, cluster, infoendpoint, additionalInfo);
    }

void Arc::Submitter::AddJob ( const JobDescription &  job,
const URL &  jobid,
const URL &  cluster,
const URL &  infoendpoint,
const std::map< std::string, std::string > &  additionalInfo 
) const [protected, inherited]

Definition at line 72 of file Submitter.cpp.

                                                                                   {
    NS ns;
    XMLNode info(ns, "Job");
    info.NewChild("JobID") = jobid.str();
    if (!job.Identification.JobName.empty())
      info.NewChild("Name") = job.Identification.JobName;
    info.NewChild("Flavour") = flavour;
    info.NewChild("Cluster") = cluster.str();
    info.NewChild("InfoEndpoint") = infoendpoint.str();
    info.NewChild("LocalSubmissionTime") = (std::string)Arc::Time();

    for (std::map<std::string, std::string>::const_iterator it = additionalInfo.begin();
         it != additionalInfo.end(); it++)
      info.NewChild(it->first) = it->second;

    for (std::list<std::string>::const_iterator
           it = job.Identification.ActivityOldId.begin();
         it != job.Identification.ActivityOldId.end(); it++)
      info.NewChild("OldJobID") = *it;

    std::string rep = job.UnParse("arcjsdl");
    info.NewChild("JobDescription") = (std::string)rep;

    for (std::list<FileType>::const_iterator it = job.DataStaging.File.begin();
         it != job.DataStaging.File.end(); it++)
      if (!it->Source.empty())
        if (it->Source.begin()->URI.Protocol() == "file") {
          if (!info["LocalInputFiles"])
            info.NewChild("LocalInputFiles");
          XMLNode File = info["LocalInputFiles"].NewChild("File");
          File.NewChild("Source") = it->Name;
          File.NewChild("CheckSum") = GetCksum(it->Source.begin()->URI.Path());
        }

    FileLock lock(usercfg.JobListFile());
    Config jobstorage;
    jobstorage.ReadFromFile(usercfg.JobListFile());
    jobstorage.NewChild(info);
    jobstorage.SaveToFile(usercfg.JobListFile());
  }
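
The record that AddJob appends to the job list can be sketched without the ARC XML classes. The following is a minimal illustration only: `JobRecord` (an ordered list of name/value pairs) is a hypothetical stand-in for `XMLNode`, and `MakeJobRecord` with its parameters is an invented name, not part of the ARC API. The sketch leaves out the submission timestamp, the job description text, the local input file checksums and the locked write to the job list file.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an XML element: child elements in document order.
typedef std::vector<std::pair<std::string, std::string> > JobRecord;

// Builds the children in the same order as the listing above.
// "Name" is written only when the job has a name.
JobRecord MakeJobRecord(const std::string& jobid,
                        const std::string& jobname,
                        const std::string& flavour,
                        const std::string& cluster,
                        const std::string& infoendpoint,
                        const std::map<std::string, std::string>& additionalInfo,
                        const std::vector<std::string>& oldJobIds) {
  JobRecord info;
  info.push_back(std::make_pair(std::string("JobID"), jobid));
  if (!jobname.empty())
    info.push_back(std::make_pair(std::string("Name"), jobname));
  info.push_back(std::make_pair(std::string("Flavour"), flavour));
  info.push_back(std::make_pair(std::string("Cluster"), cluster));
  info.push_back(std::make_pair(std::string("InfoEndpoint"), infoendpoint));

  // Plugin-specific key/value pairs, in key order because std::map is sorted.
  for (std::map<std::string, std::string>::const_iterator it = additionalInfo.begin();
       it != additionalInfo.end(); ++it)
    info.push_back(std::make_pair(it->first, it->second));

  // One "OldJobID" child per earlier identifier of a migrated job.
  for (std::vector<std::string>::const_iterator it = oldJobIds.begin();
       it != oldJobIds.end(); ++it)
    info.push_back(std::make_pair(std::string("OldJobID"), *it));

  return info;
}
```

Because the additional information arrives as a std::map, its entries appear in the record sorted by key rather than in insertion order.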

std::string Arc::Submitter::GetCksum ( const std::string &  file) const [inline, inherited]

Definition at line 65 of file Submitter.h.

{ return GetCksum(file, usercfg); }

std::string Arc::Submitter::GetCksum ( const std::string &  file,
const UserConfig &  usercfg 
) [static, inherited]

Definition at line 116 of file Submitter.cpp.

                                                                                {
    DataHandle source(file, usercfg);
    DataBuffer buffer;

    MD5Sum md5sum;
    buffer.set(&md5sum);

    if (!source->StartReading(buffer))
      return "";

    int handle;
    unsigned int length;
    unsigned long long int offset;

    while (buffer.for_write() || !buffer.eof_read())
      if (buffer.for_write(handle, length, offset, true))
        buffer.is_written(handle);

    if (!source->StopReading())
      return "";

    if (!buffer.checksum_valid())
      return "";

    char buf[100];
    md5sum.print(buf, 100);
    return buf;
  }
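
The pattern in GetCksum (stream the data through a buffer, update a running checksum, and return an empty string on any failure) can be illustrated in isolation. The sketch below is not the ARC implementation: it reads from a `std::istream` instead of a `DataHandle`, and it substitutes a 64-bit FNV-1a hash for MD5 purely to stay dependency-free. `StreamChecksum` and `chunkSize` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Reads `in` in chunks of `chunkSize` bytes and folds every byte into a
// 64-bit FNV-1a hash (NOT MD5; swapped in to keep the sketch self-contained).
// Returns 16 lowercase hex digits, or "" if the stream is unusable,
// mirroring the empty-string failure convention of GetCksum.
std::string StreamChecksum(std::istream& in, std::size_t chunkSize) {
  if (!in || chunkSize == 0)
    return "";

  std::uint64_t hash = 14695981039346656037ULL;  // FNV-1a 64-bit offset basis
  const std::uint64_t prime = 1099511628211ULL;  // FNV-1a 64-bit prime
  std::vector<char> chunk(chunkSize);

  while (in) {
    in.read(chunk.data(), static_cast<std::streamsize>(chunk.size()));
    const std::streamsize got = in.gcount();  // may be short on the last chunk
    for (std::streamsize i = 0; i < got; ++i) {
      hash ^= static_cast<unsigned char>(chunk[i]);
      hash *= prime;
    }
  }

  if (in.bad())  // hard I/O error, as opposed to a plain end of input
    return "";

  char buf[17];
  std::snprintf(buf, sizeof buf, "%016llx", static_cast<unsigned long long>(hash));
  return buf;
}
```

Since the hash is updated byte by byte, the result does not depend on the chunk size, which is the property that lets a checksum be computed while data is transferred in buffers.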

Plugin * Arc::SubmitterARC1::Instance ( PluginArgument *  arg) [static]

Definition at line 30 of file SubmitterARC1.cpp.

                                                     {
    SubmitterPluginArgument *subarg =
      dynamic_cast<SubmitterPluginArgument*>(arg);
    if (!subarg)
      return NULL;
    return new SubmitterARC1(*subarg);
  }
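
The factory above relies on `dynamic_cast` to reject arguments that are not meant for this plugin, and on a private constructor so that objects are created only through the factory. A self-contained sketch of the same pattern follows; every type in it (`PluginArgument`, `SubmitterArgument`, `OtherArgument`, `Plugin`, `DemoSubmitter`) is a simplified, hypothetical stand-in and not the ARC class of similar name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-ins for the plugin argument hierarchy.
struct PluginArgument {
  virtual ~PluginArgument() {}  // polymorphic base, required for dynamic_cast
};
struct SubmitterArgument : PluginArgument {
  std::string joblistfile;
};
struct OtherArgument : PluginArgument {};

// Hypothetical plugin base class.
struct Plugin {
  virtual ~Plugin() {}
};

class DemoSubmitter : public Plugin {
public:
  // Factory: returns NULL when the argument is not a SubmitterArgument.
  static Plugin* Instance(PluginArgument* arg) {
    SubmitterArgument* subarg = dynamic_cast<SubmitterArgument*>(arg);
    if (!subarg)
      return NULL;
    return new DemoSubmitter(*subarg);
  }

  const std::string& JobListFile() const { return joblistfile_; }

private:
  // Private constructor: objects can only be created through Instance().
  explicit DemoSubmitter(const SubmitterArgument& a)
    : joblistfile_(a.joblistfile) {}

  std::string joblistfile_;
};
```

A loader can therefore offer the same argument to several factories and keep the first non-NULL result, without knowing the concrete plugin types.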

URL Arc::SubmitterARC1::Migrate ( const URL &  jobid,
const JobDescription &  jobdesc,
const ExecutionTarget &  et,
bool  forcemigration 
) const [virtual]

This virtual method should be overridden by plugins that are capable of migrating jobs.

The active job to be migrated is identified by the URL jobid and is described by the JobDescription jobdesc. The forcemigration boolean specifies whether the migration should still succeed if the active job cannot be terminated. The protected method AddJob can be used to save job information. This method should return the URL of the migrated job, or an empty URL if migration fails.

Implements Arc::Submitter.

Definition at line 73 of file SubmitterARC1.cpp.

                                                        {
    MCCConfig cfg;
    usercfg.ApplyToConfig(cfg);
    AREXClient ac(et.url, cfg, usercfg.Timeout());

    std::string idstr;
    AREXClient::createActivityIdentifier(jobid, idstr);

    JobDescription job(jobdesc);

    // Modify the location of local files and files residing in an old session directory.
    for (std::list<FileType>::iterator it = job.DataStaging.File.begin();
         it != job.DataStaging.File.end(); it++) {
      // Do not modify Output and Error files.
      if (it->Name == job.Application.Output ||
          it->Name == job.Application.Error ||
          it->Source.empty())
        continue;

      if (!it->Source.front().URI || it->Source.front().URI.Protocol() == "file") {
        it->Source.front().URI = URL(jobid.str() + "/" + it->Name);
        it->DownloadToCache = false;
      }
      else {
        // The URL is valid and not a local file. Check whether the source resides
        // in an old job session directory.
        const size_t foundRSlash = it->Source.front().URI.str().rfind('/');
        if (foundRSlash == std::string::npos)
          continue;

        const std::string uriPath = it->Source.front().URI.str().substr(0, foundRSlash);
        // Check whether the input file URI points to an old job session directory.
        for (std::list<std::string>::const_iterator itAOID = job.Identification.ActivityOldId.begin();
             itAOID != job.Identification.ActivityOldId.end(); itAOID++)
          if (uriPath == *itAOID) {
            it->Source.front().URI = URL(jobid.str() + "/" + it->Name);
            it->DownloadToCache = false;
            break;
          }
      }
    }

    if (!ModifyJobDescription(job, et)) {
      logger.msg(INFO, "Failed adapting job description to target resources");
      return URL();
    }

    // Add ActivityOldId.
    job.Identification.ActivityOldId.push_back(jobid.str());

    std::string newjobid;
    if (!ac.migrate(idstr, job.UnParse("ARCJSDL"), forcemigration, newjobid,
                    et.url.Protocol() == "https"))
      return URL();

    if (newjobid.empty()) {
      logger.msg(INFO, "No job identifier returned by A-REX");
      return URL();
    }

    XMLNode newjobidx(newjobid);
    URL session_url((std::string)(newjobidx["ReferenceParameters"]["JobSessionDir"]));

    if (!PutFiles(job, session_url)) {
      logger.msg(INFO, "Failed uploading local input files");
      return URL();
    }

    AddJob(job, session_url, et.Cluster, session_url);

    return session_url;
  }
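
The source-rewriting loop at the start of Migrate is the least obvious part of the listing, so here is a reduced sketch of it. It works on plain strings rather than ARC `URL` objects, treats a string without `://` or with a `file://` prefix as local (an assumption of this sketch, standing in for the URL validity and protocol tests), and omits the special handling of the standard output and error file names. `StagedFile`, `IsLocal` and `RedirectSources` are hypothetical names.

```cpp
#include <cassert>
#include <list>
#include <string>

// Hypothetical, simplified staging entry: a name and at most one source URL.
struct StagedFile {
  std::string name;
  std::string source;  // empty means the file has no source
  bool downloadToCache;
};

// Assumption of this sketch: no scheme, or the file scheme, means "local".
static bool IsLocal(const std::string& url) {
  return url.find("://") == std::string::npos ||
         url.compare(0, 7, "file://") == 0;
}

// Points local inputs, and inputs stored directly in one of the old session
// directories, at the session directory of the job being migrated (jobid).
void RedirectSources(std::list<StagedFile>& files,
                     const std::string& jobid,
                     const std::list<std::string>& oldIds) {
  for (std::list<StagedFile>::iterator it = files.begin();
       it != files.end(); ++it) {
    if (it->source.empty())
      continue;

    bool redirect = IsLocal(it->source);
    if (!redirect) {
      // Compare the directory part of the source with every old job ID.
      const std::string dir = it->source.substr(0, it->source.rfind('/'));
      for (std::list<std::string>::const_iterator id = oldIds.begin();
           id != oldIds.end(); ++id)
        if (dir == *id) {
          redirect = true;
          break;
        }
    }

    if (redirect) {
      it->source = jobid + "/" + it->name;
      it->downloadToCache = false;  // session files are not cacheable inputs
    }
  }
}
```

Sources that live on unrelated remote servers are left untouched, so only files that would otherwise be lost with the old job are redirected.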

bool Arc::SubmitterARC1::ModifyJobDescription ( JobDescription &  jobdesc,
const ExecutionTarget &  et 
) const [virtual]

Implements Arc::Submitter.

Definition at line 148 of file SubmitterARC1.cpp.

                                                                                                   {
    // Check for identical file names.
    bool executableIsAdded(false), inputIsAdded(false), outputIsAdded(false), errorIsAdded(false), logDirIsAdded(false);
    for (std::list<FileType>::const_iterator it1 = jobdesc.DataStaging.File.begin();
         it1 != jobdesc.DataStaging.File.end(); it1++) {
      for (std::list<FileType>::const_iterator it2 = it1;
           it2 != jobdesc.DataStaging.File.end(); it2++) {
        if (it1 == it2) continue;

        if (it1->Name == it2->Name && (!it1->Source.empty() && !it2->Source.empty() ||
                                       !it1->Target.empty() && !it2->Target.empty())) {
          logger.msg(VERBOSE, "Two files have identical file name '%s'.", it1->Name);
          return false;
        }

      }

      executableIsAdded  |= (it1->Name == jobdesc.Application.Executable.Name);
      inputIsAdded       |= (it1->Name == jobdesc.Application.Input);
      outputIsAdded      |= (it1->Name == jobdesc.Application.Output);
      errorIsAdded       |= (it1->Name == jobdesc.Application.Error);
      logDirIsAdded      |= (it1->Name == jobdesc.Application.LogDir);
    }

    if (!executableIsAdded &&
        !Glib::path_is_absolute(jobdesc.Application.Executable.Name)) {
      FileType file;
      file.Name = jobdesc.Application.Executable.Name;
      DataSourceType s;
      s.URI = file.Name;
      file.Source.push_back(s);
      file.KeepData = false;
      file.IsExecutable = true;
      file.DownloadToCache = false;
      jobdesc.DataStaging.File.push_back(file);
    }

    if (!jobdesc.Application.Input.empty() && !inputIsAdded) {
      FileType file;
      file.Name = jobdesc.Application.Input;
      DataSourceType s;
      s.URI = file.Name;
      file.Source.push_back(s);
      file.KeepData = false;
      file.IsExecutable = false;
      file.DownloadToCache = false;
      jobdesc.DataStaging.File.push_back(file);
    }

    if (!jobdesc.Application.Output.empty() && !outputIsAdded) {
      FileType file;
      file.Name = jobdesc.Application.Output;
      file.KeepData = true;
      file.IsExecutable = false;
      file.DownloadToCache = false;
      jobdesc.DataStaging.File.push_back(file);
    }

    if (!jobdesc.Application.Error.empty() && !errorIsAdded) {
      FileType file;
      file.Name = jobdesc.Application.Error;
      file.KeepData = true;
      file.IsExecutable = false;
      file.DownloadToCache = false;
      jobdesc.DataStaging.File.push_back(file);
    }

    if (!jobdesc.Application.LogDir.empty() && !logDirIsAdded) {
      FileType file;
      file.Name = jobdesc.Application.LogDir;
      file.KeepData = true;
      file.IsExecutable = false;
      file.DownloadToCache = false;
      jobdesc.DataStaging.File.push_back(file);
    }

    if (!jobdesc.Resources.RunTimeEnvironment.empty() &&
        !jobdesc.Resources.RunTimeEnvironment.selectSoftware(et.ApplicationEnvironments)) {
      // This error should never happen since RTE is checked in the Broker.
      logger.msg(VERBOSE, "Unable to select run time environment");
      return false;
    }

    if (!jobdesc.Resources.CEType.empty() &&
        !jobdesc.Resources.CEType.selectSoftware(et.Implementation)) {
      // This error should never happen since Middleware is checked in the Broker.
      logger.msg(VERBOSE, "Unable to select middleware");
      return false;
    }

    if (!jobdesc.Resources.OperatingSystem.empty() &&
        !jobdesc.Resources.OperatingSystem.selectSoftware(et.Implementation)) {
      // This error should never happen since OS is checked in the Broker.
      logger.msg(VERBOSE, "Unable to select operating system.");
      return false;
    }

    // Set cluster and queue if not specified by user.
    if (jobdesc.Resources.CandidateTarget.empty()) {
      ResourceTargetType candidateTarget;
      candidateTarget.EndPointURL = URL();
      candidateTarget.QueueName = et.ComputingShareName;
      jobdesc.Resources.CandidateTarget.push_back(candidateTarget);
    }
    else if (jobdesc.Resources.CandidateTarget.front().QueueName.empty())
      jobdesc.Resources.CandidateTarget.front().QueueName = et.ComputingShareName;

    return true;
  }
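
The first step of ModifyJobDescription rejects a job description in which two staging entries share a name and both act as inputs (both have a source) or both act as outputs (both have a target). The sketch below isolates that rule; `StagingEntry` and `FindConflictingName` are hypothetical names, and the presence of sources and targets is reduced to two booleans.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, simplified staging entry.
struct StagingEntry {
  std::string name;
  bool hasSource;  // the file is staged in from somewhere
  bool hasTarget;  // the file is staged out to somewhere
};

// Returns the name shared by the first conflicting pair, or "" if no two
// entries with the same name are both inputs or both outputs.
std::string FindConflictingName(const std::vector<StagingEntry>& files) {
  for (std::size_t i = 0; i < files.size(); ++i)
    for (std::size_t j = i + 1; j < files.size(); ++j)
      if (files[i].name == files[j].name &&
          ((files[i].hasSource && files[j].hasSource) ||
           (files[i].hasTarget && files[j].hasTarget)))
        return files[i].name;
  return "";
}
```

Note that an input and an output with the same name do not conflict under this rule, which allows a file to be staged in, modified by the job, and staged out again.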

bool Arc::Submitter::PutFiles ( const JobDescription &  jobdesc,
const URL &  url 
) const [protected, inherited]

Definition at line 35 of file Submitter.cpp.

                                                                          {

    FileCache cache;
    DataMover mover;
    mover.retry(true);
    mover.secure(false);
    mover.passive(true);
    mover.verbose(false);

    for (std::list<FileType>::const_iterator it = job.DataStaging.File.begin();
         it != job.DataStaging.File.end(); it++)
      if (!it->Source.empty()) {
        const URL& src = it->Source.begin()->URI;
        if (src.Protocol() == "file") {
          URL dst(std::string(url.str() + '/' + it->Name));
          DataHandle source(src, usercfg);
          DataHandle destination(dst, usercfg);
          DataStatus res =
            mover.Transfer(*source, *destination, cache, URLMap(), 0, 0, 0,
                           usercfg.Timeout());
          if (!res.Passed()) {
            if (!res.GetDesc().empty())
              logger.msg(ERROR, "Failed uploading file: %s - %s",
                         std::string(res), res.GetDesc());
            else
              logger.msg(ERROR, "Failed uploading file: %s", std::string(res));
            return false;
          }
        }
      }

    return true;
  }
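
PutFiles uploads only those inputs whose first source uses the file protocol, and it derives each destination by appending the file name to the session URL. The following sketch computes that list of transfers without performing them. It uses plain strings instead of ARC `URL` objects and assumes that a local source is one starting with `file:`; `InputFile`, `Upload`, `IsFileUrl` and `PlanUploads` are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified staging entry: a name plus its first source URL.
struct InputFile {
  std::string name;
  std::string source;  // empty when the file has no source
};

// One planned transfer from a local file to the job's session directory.
struct Upload {
  std::string source;
  std::string destination;
};

// Assumption of this sketch: a source is local if it starts with "file:".
static bool IsFileUrl(const std::string& url) {
  return url.compare(0, 5, "file:") == 0;
}

// Lists the transfers that would be made; remote sources and entries
// without a source are skipped, as in the listing above.
std::vector<Upload> PlanUploads(const std::vector<InputFile>& files,
                                const std::string& sessionUrl) {
  std::vector<Upload> plan;
  for (std::vector<InputFile>::const_iterator it = files.begin();
       it != files.end(); ++it) {
    if (it->source.empty() || !IsFileUrl(it->source))
      continue;
    Upload u;
    u.source = it->source;
    u.destination = sessionUrl + '/' + it->name;
    plan.push_back(u);
  }
  return plan;
}
```

Remote inputs are absent from the plan because the client does not upload them; only files that exist solely on the submitting machine need to be pushed to the session directory.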

URL Arc::SubmitterARC1::Submit ( const JobDescription &  jobdesc,
const ExecutionTarget &  et 
) const [virtual]

This virtual method should be overridden by plugins that are capable of submitting jobs, defined in the JobDescription jobdesc, to the ExecutionTarget et.

The protected convenience method AddJob can be used to save job information. This method should return the URL of the submitted job, or an empty URL if submission fails.

Implements Arc::Submitter.

Definition at line 38 of file SubmitterARC1.cpp.

                                                             {
    MCCConfig cfg;
    usercfg.ApplyToConfig(cfg);
    AREXClient ac(et.url, cfg, usercfg.Timeout());

    JobDescription job(jobdesc);

    if (!ModifyJobDescription(job, et)) {
      logger.msg(INFO, "Failed adapting job description to target resources");
      return URL();
    }

    std::string jobid;
    if (!ac.submit(job.UnParse("ARCJSDL"), jobid, et.url.Protocol() == "https"))
      return URL();

    if (jobid.empty()) {
      logger.msg(INFO, "No job identifier returned by A-REX");
      return URL();
    }

    XMLNode jobidx(jobid);
    URL session_url((std::string)(jobidx["ReferenceParameters"]["JobSessionDir"]));

    if (!PutFiles(job, session_url)) {
      logger.msg(INFO, "Failed uploading local input files");
      return URL();
    }

    AddJob(job, session_url, et.Cluster, session_url);

    return session_url;
  }
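
The control flow of Submit (adapt the description, submit it, upload local inputs, record the job, and return an empty URL as soon as any step fails) can be shown with the individual steps abstracted away. In the sketch below the steps are hypothetical `std::function` hooks standing in for ModifyJobDescription, the A-REX client call, PutFiles and AddJob, and a plain string stands in for `URL`; `SubmitSteps` and `RunSubmit` are invented names.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical hooks, one per step of the listing above.
struct SubmitSteps {
  std::function<bool()> adapt;                     // adapt description to target
  std::function<bool(std::string&)> submit;        // fills in the session URL
  std::function<bool(const std::string&)> upload;  // upload local input files
  std::function<void(const std::string&)> record;  // save the job information
};

// Returns the session URL of the new job, or "" as soon as any step fails.
std::string RunSubmit(const SubmitSteps& steps) {
  if (!steps.adapt())
    return "";

  std::string session;
  if (!steps.submit(session) || session.empty())
    return "";  // the service refused the job or returned no identifier

  if (!steps.upload(session))
    return "";  // note: the job is not recorded in this case

  steps.record(session);
  return session;
}
```

A caller only needs to test the result for emptiness. As in the listing, the job is recorded after the upload succeeds, so a failed upload leaves no entry in the job list even though the service has already accepted the job.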


Member Data Documentation

const std::string Arc::Submitter::flavour [protected, inherited]

Definition at line 82 of file Submitter.h.

Logger Arc::SubmitterARC1::logger [static, private]

Reimplemented from Arc::Submitter.

Definition at line 20 of file SubmitterARC1.h.

const UserConfig& Arc::Submitter::usercfg [protected, inherited]

Definition at line 83 of file Submitter.h.


The documentation for this class was generated from the following files: