Back to index

nordugrid-arc-nox  1.1.0~rc6
Public Member Functions | Static Public Member Functions | Protected Attributes | Private Member Functions | Static Private Attributes
Arc::JobControllerARC1 Class Reference

#include <JobControllerARC1.h>

Inheritance diagram for Arc::JobControllerARC1:
Inheritance graph
[legend]
Collaboration diagram for Arc::JobControllerARC1:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 ~JobControllerARC1 ()
virtual void GetJobInformation ()
void FillJobStore (const std::list< URL > &jobids)
 Fill jobstore.
void FillJobStore (const Job &job)
bool Get (const std::list< std::string > &status, const std::string &downloaddir, const bool keep)
bool Kill (const std::list< std::string > &status, const bool keep)
bool Clean (const std::list< std::string > &status, const bool force)
bool Cat (const std::list< std::string > &status, const std::string &whichfile)
bool PrintJobStatus (const std::list< std::string > &status, const bool longlist)
 Print job status to stdout.
bool Migrate (TargetGenerator &targetGen, Broker *broker, const UserConfig &usercfg, const bool forcemigration, std::list< URL > &migratedJobIDs)
 Migrate job from cluster A to Cluster B.
bool Renew (const std::list< std::string > &status)
bool Resume (const std::list< std::string > &status)
bool RemoveJobs (const std::list< URL > &jobids)
std::list< std::string > GetDownloadFiles (const URL &dir)
bool ARCCopyFile (const URL &src, const URL &dst)
std::list< JobGetJobDescriptions (const std::list< std::string > &status, const bool getlocal)
void CheckLocalDescription (std::list< Job > &jobs)
const std::list< Job > & GetJobs () const

Static Public Member Functions

static PluginInstance (PluginArgument *arg)

Protected Attributes

const std::string flavour
const UserConfigusercfg
std::list< Jobjobstore
Config jobstorage

Private Member Functions

 JobControllerARC1 (const UserConfig &usercfg)
virtual bool GetJob (const Job &job, const std::string &downloaddir)
virtual bool CleanJob (const Job &job, bool force)
virtual bool CancelJob (const Job &job)
virtual bool RenewJob (const Job &job)
virtual bool ResumeJob (const Job &job)
virtual URL GetFileUrlForJob (const Job &job, const std::string &whichfile)
virtual bool GetJobDescription (const Job &job, std::string &desc_str)

Static Private Attributes

static Logger logger

Detailed Description

Definition at line 13 of file JobControllerARC1.h.


Constructor & Destructor Documentation

Arc::JobControllerARC1::JobControllerARC1 ( const UserConfig usercfg) [private]

Definition at line 25 of file JobControllerARC1.cpp.

    : JobController(usercfg, "ARC1") {}

Here is the caller graph for this function:

Definition at line 28 of file JobControllerARC1.cpp.

{}

Member Function Documentation

bool Arc::JobController::ARCCopyFile ( const URL src,
const URL dst 
) [inherited]

Definition at line 684 of file JobController.cpp.

                                                                {

    DataMover mover;
    mover.retry(true);
    mover.secure(false);
    mover.passive(true);
    mover.verbose(false);

    logger.msg(VERBOSE, "Now copying (from -> to)");
    logger.msg(VERBOSE, " %s -> %s", src.str(), dst.str());

    DataHandle source(src, usercfg);
    if (!source) {
      logger.msg(ERROR, "Unable to initialise connection to source: %s", src.str());
      return false;
    }

    DataHandle destination(dst, usercfg);
    if (!destination) {
      logger.msg(ERROR, "Unable to initialise connection to destination: %s",
                 dst.str());
      return false;
    }

    FileCache cache;
    DataStatus res =
      mover.Transfer(*source, *destination, cache, URLMap(), 0, 0, 0,
                     usercfg.Timeout());
    if (!res.Passed()) {
      if (!res.GetDesc().empty())
        logger.msg(ERROR, "File download failed: %s - %s", std::string(res), res.GetDesc());
      else
        logger.msg(ERROR, "File download failed: %s", std::string(res));
      return false;
    }

    return true;

  }

Here is the call graph for this function:

Here is the caller graph for this function:

bool Arc::JobControllerARC1::CancelJob ( const Job job) [private, virtual]

Implements Arc::JobController.

Definition at line 98 of file JobControllerARC1.cpp.

                                                  {
    MCCConfig cfg;
    usercfg.ApplyToConfig(cfg);
    AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
    std::string idstr;
    AREXClient::createActivityIdentifier(job.JobID, idstr);
    return ac.kill(idstr);
  }

Here is the call graph for this function:

bool Arc::JobController::Cat ( const std::list< std::string > &  status,
const std::string &  whichfile 
) [inherited]

Definition at line 399 of file JobController.cpp.

                                                      {

    if (whichfile != "stdout" && whichfile != "stderr" && whichfile != "gmlog") {
      logger.msg(ERROR, "Unknown output %s", whichfile);
      return false;
    }

    GetJobInformation();

    std::list<Job*> catable;
    for (std::list<Job>::iterator it = jobstore.begin();
         it != jobstore.end(); it++) {

      if (!it->State) {
        logger.msg(WARNING, "Job state information not found: %s",
                   it->JobID.str());
        continue;
      }

      if (!status.empty() &&
          std::find(status.begin(), status.end(), it->State()) == status.end() &&
          std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
        continue;

      if (it->State == JobState::DELETED) {
        logger.msg(WARNING, "Job deleted: %s", it->JobID.str());
        continue;
      }

      if (!it->State.IsFinished() &&
          it->State != JobState::RUNNING &&
          it->State != JobState::FINISHING) {
        logger.msg(WARNING, "Job has not started yet: %s", it->JobID.str());
        continue;
      }

      if (whichfile == "stdout" && it->StdOut.empty() ||
          whichfile == "stderr" && it->StdErr.empty() ||
          whichfile == "gmlog" && it->LogDir.empty()) {
        logger.msg(ERROR, "Can not determine the %s location: %s",
                   whichfile, it->JobID.str());
        continue;
      }

      catable.push_back(&(*it));
    }

    bool ok = true;
    for (std::list<Job*>::iterator it = catable.begin();
         it != catable.end(); it++) {
      std::string filename = Glib::build_filename(Glib::get_tmp_dir(), "arccat.XXXXXX");
      int tmp_h = Glib::mkstemp(filename);
      if (tmp_h == -1) {
        logger.msg(INFO, "Could not create temporary file \"%s\"", filename);
        logger.msg(ERROR, "Cannot output %s for job (%s)", whichfile, (*it)->JobID.str());
        ok = false;
        continue;
      }
      close(tmp_h);

      logger.msg(VERBOSE, "Catting %s for job %s", whichfile, (*it)->JobID.str());

      URL src = GetFileUrlForJob((**it), whichfile);
      if (!src) {
        logger.msg(ERROR, "Cannot output %s for job (%s): Invalid source %s", whichfile, (*it)->JobID.str(), src.str());
        continue;
      }

      URL dst(filename);
      if (!dst) {
        logger.msg(ERROR, "Cannot output %s for job (%s): Invalid destination %s", whichfile, (*it)->JobID.str(), dst.str());
        continue;
      }

      bool copied = ARCCopyFile(src, dst);

      if (copied) {
        std::cout << IString("%s from job %s", whichfile,
                             (*it)->JobID.str()) << std::endl;
        std::ifstream is(filename.c_str());
        char c;
        while (is.get(c))
          std::cout.put(c);
        is.close();
        unlink(filename.c_str());
      }
      else
        ok = false;
    }

    return ok;
  }

Here is the call graph for this function:

void Arc::JobController::CheckLocalDescription ( std::list< Job > &  jobs) [inherited]

Definition at line 812 of file JobController.cpp.

                                                              {
    for (std::list<Job>::iterator it = jobs.begin();
         it != jobs.end();) {
      // Search for jobids
      XMLNodeList xmljobs =
        jobstorage.XPathLookup("//Job[JobID='" + it->JobID.str() + "']", NS());

      if (xmljobs.empty()) {
        logger.msg(INFO, "Job not found in job list: %s", it->JobID.str());
        it++;
        continue;
      }
      XMLNode& xmljob = *xmljobs.begin();

      if (xmljob["JobDescription"]) {
        JobDescription jobdesc;
        jobdesc.Parse((std::string)xmljob["JobDescription"]);

        // Check for valid job description
        if (jobdesc)
          logger.msg(VERBOSE, "Valid jobdescription found for: %s", it->JobID.str());
        else {
          logger.msg(INFO, "Invalid jobdescription found for: %s", it->JobID.str());
          it++;
          continue;
        }

        // Check checksums of local input files
        bool CKSUM = true;
        int size = xmljob["LocalInputFiles"].Size();
        for (int i = 0; i < size; i++) {
          const std::string file = (std::string)xmljob["LocalInputFiles"]["File"][i]["Source"];
          const std::string cksum_old = (std::string)xmljob["LocalInputFiles"]["File"][i]["CheckSum"];
          const std::string cksum_new = Submitter::GetCksum(file, usercfg);
          if (cksum_old != cksum_new) {
            logger.msg(WARNING, "Checksum of input file %s has changed.", file);
            CKSUM = false;
          }
          else
            logger.msg(VERBOSE, "Stored and new checksum of input file %s are identical.", file);
        }
        // Push_back job and job descriptions
        if (CKSUM) {
          logger.msg(INFO, "Job description for %s retrieved locally", it->JobID.str());
          it->JobDescription = (std::string)xmljob["JobDescription"];
          it++;
        }
        else {
          logger.msg(WARNING, "Job %s can not be resubmitted", it->JobID.str());
          it = jobs.erase(it);
        }
      }
      else {
        logger.msg(INFO, "Job description for %s could not be retrieved locally", it->JobID.str());
        it++;
      }
    } //end loop over jobs
    return;
  }

Here is the call graph for this function:

Here is the caller graph for this function:

bool Arc::JobController::Clean ( const std::list< std::string > &  status,
const bool  force 
) [inherited]

Definition at line 340 of file JobController.cpp.

                                              {
    std::list<URL> toberemoved;

    GetJobInformation();

    std::list<Job*> cleanable;
    for (std::list<Job>::iterator it = jobstore.begin();
         it != jobstore.end(); it++) {

      if (!it->State && force && status.empty()) {
        logger.msg(WARNING, "Job information not found, job %s will only be deleted from local joblist",
                   it->JobID.str());
        toberemoved.push_back(it->JobID);
        continue;
      }

      if (!it->State) {
        logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
        continue;
      }

      // Job state is not among the specified states.
      if (!status.empty() &&
          std::find(status.begin(), status.end(), it->State()) == status.end() &&
          std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
        continue;

      if (!it->State.IsFinished()) {
        if (force)
          toberemoved.push_back(it->JobID);
        else
          logger.msg(WARNING, "Job has not finished yet: %s", it->JobID.str());
        continue;
      }

      cleanable.push_back(&(*it));
    }

    bool ok = true;
    for (std::list<Job*>::iterator it = cleanable.begin();
         it != cleanable.end(); it++) {
      bool cleaned = CleanJob(**it, force);
      if (!cleaned) {
        if (force)
          toberemoved.push_back((*it)->JobID);
        logger.msg(ERROR, "Failed cleaning job %s", (*it)->JobID.str());
        ok = false;
        continue;
      }
      toberemoved.push_back((*it)->JobID);
    }

    if (toberemoved.size() > 0)
      RemoveJobs(toberemoved);

    return ok;
  }

Here is the call graph for this function:

bool Arc::JobControllerARC1::CleanJob ( const Job job,
bool  force 
) [private, virtual]

Implements Arc::JobController.

Definition at line 89 of file JobControllerARC1.cpp.

                                                             {
    MCCConfig cfg;
    usercfg.ApplyToConfig(cfg);
    AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
    std::string idstr;
    AREXClient::createActivityIdentifier(job.JobID, idstr);
    return ac.clean(idstr);
  }

Here is the call graph for this function:

void Arc::JobController::FillJobStore ( const std::list< URL > &  jobids) [inherited]

Fill jobstore.

Method to fill the jobstore with jobs that should be managed.

Parameters:
jobidsList of jobids to be loaded to the jobstore. If empty all jobs of the specialized grid flavour present in the joblist file (given through the usercfg to the constructor) will be loaded to the jobstore.

Definition at line 43 of file JobController.cpp.

                                                             {

    if (!usercfg.JobListFile().empty()) {
      logger.msg(VERBOSE, "Using job list file %s", usercfg.JobListFile());
      FileLock lock(usercfg.JobListFile());
      jobstorage.ReadFromFile(usercfg.JobListFile());
    }
    else {
      logger.msg(ERROR, "Job controller has no job list configuration");
      return;
    }

    if (!jobids.empty()) {
      logger.msg(VERBOSE, "Filling job store with jobs according to "
                 "specified jobids");

      for (std::list<URL>::const_iterator it = jobids.begin();
           it != jobids.end(); it++) {

        XMLNodeList xmljobs =
          jobstorage.XPathLookup("//Job[JobID='" + it->str() + "']", NS());

        if (xmljobs.empty()) {
          logger.msg(VERBOSE, "Job not found in job list: %s", it->str());
          continue;
        }

        XMLNode& xmljob = *xmljobs.begin();

        if (flavour == (std::string)xmljob["Flavour"]) {
          Job job;
          job.JobID = (std::string)xmljob["JobID"];
          job.Flavour = (std::string)xmljob["Flavour"];
          job.Cluster = (std::string)xmljob["Cluster"];
          job.SubmissionEndpoint = (std::string)xmljob["SubmissionEndpoint"];
          job.InfoEndpoint = (std::string)xmljob["InfoEndpoint"];
          job.ISB = (std::string)xmljob["ISB"];
          job.OSB = (std::string)xmljob["OSB"];
          job.StdOut = (std::string)xmljob["StdOut"];
          job.StdErr = (std::string)xmljob["StdErr"];
          job.AuxInfo = (std::string)xmljob["AuxInfo"];
          if (!((std::string)xmljob["LocalSubmissionTime"]).empty())
            job.LocalSubmissionTime = (std::string)xmljob["LocalSubmissionTime"];
          else
            job.LocalSubmissionTime = Time(-1);
          for (int i = 0; (bool)xmljob["ActivityOldId"][i]; i++)
            job.ActivityOldId.push_back((std::string)xmljob["ActivityOldId"][i]);
          jobstore.push_back(job);
        }
      }
    }

    URLListMap::const_iterator itSelectedClusters = usercfg.GetSelectedServices(COMPUTING).find(flavour);
    if (itSelectedClusters != usercfg.GetSelectedServices(COMPUTING).end() &&
        !itSelectedClusters->second.empty()) {
      const std::list<URL>& selectedClusters = itSelectedClusters->second;
      logger.msg(VERBOSE, "Filling job store with jobs according to list of "
                 "selected clusters");

      XMLNodeList xmljobs =
        jobstorage.XPathLookup("//Job[Flavour='" + flavour + "']", NS());

      for (XMLNodeList::iterator it = xmljobs.begin();
           it != xmljobs.end(); it++) {

        URL cluster = (std::string)(*it)["Cluster"];

        if (std::find(selectedClusters.begin(), selectedClusters.end(),
                      cluster) != selectedClusters.end()) {
          Job job;
          job.JobID = (std::string)(*it)["JobID"];
          job.Flavour = (std::string)(*it)["Flavour"];
          job.Cluster = (std::string)(*it)["Cluster"];
          job.SubmissionEndpoint = (std::string)(*it)["SubmissionEndpoint"];
          job.InfoEndpoint = (std::string)(*it)["InfoEndpoint"];
          job.ISB = (std::string)(*it)["ISB"];
          job.OSB = (std::string)(*it)["OSB"];
          job.StdOut = (std::string)(*it)["StdOut"];
          job.StdErr = (std::string)(*it)["StdErr"];
          job.AuxInfo = (std::string)(*it)["AuxInfo"];
          if (!((std::string)(*it)["LocalSubmissionTime"]).empty())
            job.LocalSubmissionTime = (std::string)(*it)["LocalSubmissionTime"];
          else
            job.LocalSubmissionTime = Time(-1);
          for (int i = 0; (bool)(*it)["ActivityOldId"][i]; i++)
            job.ActivityOldId.push_back((std::string)(*it)["ActivityOldId"][i]);
          jobstore.push_back(job);
        }
      }
    }

    URLListMap::const_iterator itRejectedClusters = usercfg.GetRejectedServices(COMPUTING).find(flavour);
    if (itRejectedClusters != usercfg.GetRejectedServices(COMPUTING).end() &&
        !itRejectedClusters->second.empty())
      if (!jobstore.empty()) {
        const std::list<URL>& rejectedClusters = itRejectedClusters->second;

        logger.msg(VERBOSE, "Removing jobs from job store according to list of "
                   "rejected clusters");

        std::list<Job>::iterator it = jobstore.begin();
        while (it != jobstore.end())
          if (std::find(rejectedClusters.begin(), rejectedClusters.end(),
                        it->Cluster) != rejectedClusters.end()) {
            logger.msg(VERBOSE, "Removing job %s from job store since it runs "
                       "on a rejected cluster", it->JobID.str());
            it = jobstore.erase(it);
          }
          else
            it++;
      }

    if (jobids.empty() && usercfg.GetSelectedServices(COMPUTING).empty()) {
      logger.msg(VERBOSE, "Filling job store with all jobs, except those "
                 "running on rejected clusters");

      const std::list<URL>* rejectedClusters = (itRejectedClusters == usercfg.GetRejectedServices(COMPUTING).end() ? NULL : &itRejectedClusters->second);

      XMLNodeList xmljobs =
        jobstorage.XPathLookup("//Job[Flavour='" + flavour + "']", NS());

      for (XMLNodeList::iterator it = xmljobs.begin();
           it != xmljobs.end(); it++) {

        URL cluster = (std::string)(*it)["Cluster"];

        if (!rejectedClusters ||
            std::find(rejectedClusters->begin(), rejectedClusters->end(), cluster) == rejectedClusters->end()) {
          Job job;
          job.JobID = (std::string)(*it)["JobID"];
          job.Flavour = (std::string)(*it)["Flavour"];
          job.Cluster = (std::string)(*it)["Cluster"];
          job.SubmissionEndpoint = (std::string)(*it)["SubmissionEndpoint"];
          job.InfoEndpoint = (std::string)(*it)["InfoEndpoint"];
          job.ISB = (std::string)(*it)["ISB"];
          job.OSB = (std::string)(*it)["OSB"];
          job.StdOut = (std::string)(*it)["StdOut"];
          job.StdErr = (std::string)(*it)["StdErr"];
          job.AuxInfo = (std::string)(*it)["AuxInfo"];
          if (!((std::string)(*it)["LocalSubmissionTime"]).empty())
            job.LocalSubmissionTime = (std::string)(*it)["LocalSubmissionTime"];
          else
            job.LocalSubmissionTime = Time(-1);
          for (int i = 0; (bool)(*it)["ActivityOldId"][i]; i++)
            job.ActivityOldId.push_back((std::string)(*it)["ActivityOldId"][i]);
          jobstore.push_back(job);
        }
      }
    }

    logger.msg(VERBOSE, "FillJobStore has finished successfully");
    logger.msg(VERBOSE, "Job store for %s contains %ld jobs",
               flavour, jobstore.size());
  }

Here is the call graph for this function:

Here is the caller graph for this function:

void Arc::JobController::FillJobStore ( const Job job) [inherited]

Definition at line 198 of file JobController.cpp.

                                                 {

    if (job.Flavour != flavour) {
      logger.msg(WARNING, "The middleware flavour of the job (%s) does not match that of the job controller (%s)", job.Flavour, flavour);
      return;
    }

    if (!job.JobID) {
      logger.msg(WARNING, "The job ID (%s) is not a valid URL", job.JobID.str());
      return;
    }

    if (!job.Cluster) {
      logger.msg(WARNING, "The cluster URL is not a valid URL", job.Cluster.str());
      return;
    }

    jobstore.push_back(job);
  }

Here is the call graph for this function:

bool Arc::JobController::Get ( const std::list< std::string > &  status,
const std::string &  downloaddir,
const bool  keep 
) [inherited]

Definition at line 218 of file JobController.cpp.

                                           {
    std::list<URL> toberemoved;

    GetJobInformation();

    std::list<Job*> downloadable;
    for (std::list<Job>::iterator it = jobstore.begin();
         it != jobstore.end(); it++) {

      if (!it->State) {
        logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
        continue;
      }

      if (!status.empty() &&
          std::find(status.begin(), status.end(), it->State()) == status.end() &&
          std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
        continue;

      if (it->State == JobState::DELETED) {
        logger.msg(WARNING, "Job has already been deleted: %s",
                   it->JobID.str());
        continue;
      }
      else if (!it->State.IsFinished()) {
        logger.msg(WARNING, "Job has not finished yet: %s", it->JobID.str());
        continue;
      }

      downloadable.push_back(&(*it));
    }

    bool ok = true;
    for (std::list<Job*>::iterator it = downloadable.begin();
         it != downloadable.end(); it++) {

      bool downloaded = GetJob(**it, downloaddir);
      if (!downloaded) {
        logger.msg(ERROR, "Failed downloading job %s", (*it)->JobID.str());
        ok = false;
        continue;
      }

      if (!keep) {
        bool cleaned = CleanJob(**it, true);
        if (!cleaned) {
          logger.msg(ERROR, "Failed cleaning job %s", (*it)->JobID.str());
          ok = false;
          continue;
        }
        toberemoved.push_back((*it)->JobID);
      }
    }

    if (toberemoved.size() > 0)
      RemoveJobs(toberemoved);

    return ok;
  }

Here is the call graph for this function:

std::list< std::string > Arc::JobController::GetDownloadFiles ( const URL dir) [inherited]

Definition at line 644 of file JobController.cpp.

                                                                   {

    std::list<std::string> files;
    std::list<FileInfo> outputfiles;

    DataHandle handle(dir, usercfg);
    if (!handle) {
      logger.msg(INFO, "Unable to list files at %s", dir.str());
      return files;
    }
    handle->ListFiles(outputfiles, true, false, false);

    for (std::list<FileInfo>::iterator i = outputfiles.begin();
         i != outputfiles.end(); i++) {

      if (i->GetName() == ".." || i->GetName() == ".")
        continue;

      if (i->GetType() == FileInfo::file_type_unknown ||
          i->GetType() == FileInfo::file_type_file)
        files.push_back(i->GetName());
      else if (i->GetType() == FileInfo::file_type_dir) {

        std::string path = dir.Path();
        if (path[path.size() - 1] != '/')
          path += "/";
        URL tmpdir(dir);
        tmpdir.ChangePath(path + i->GetName());
        std::list<std::string> morefiles = GetDownloadFiles(tmpdir);
        std::string dirname = i->GetName();
        if (dirname[dirname.size() - 1] != '/')
          dirname += "/";
        for (std::list<std::string>::iterator it = morefiles.begin();
             it != morefiles.end(); it++)
          files.push_back(dirname + *it);
      }
    }
    return files;
  }

Here is the call graph for this function:

Here is the caller graph for this function:

URL Arc::JobControllerARC1::GetFileUrlForJob ( const Job job,
const std::string &  whichfile 
) [private, virtual]

Implements Arc::JobController.

Definition at line 132 of file JobControllerARC1.cpp.

                                                                      {
    URL url(job.JobID);

    if (whichfile == "stdout")
      url.ChangePath(url.Path() + '/' + job.StdOut);
    else if (whichfile == "stderr")
      url.ChangePath(url.Path() + '/' + job.StdErr);
    else if (whichfile == "gmlog")
      url.ChangePath(url.Path() + "/" + job.LogDir + "/errors");

    return url;
  }

Here is the call graph for this function:

bool Arc::JobControllerARC1::GetJob ( const Job job,
const std::string &  downloaddir 
) [private, virtual]

Implements Arc::JobController.

Definition at line 52 of file JobControllerARC1.cpp.

                                                               {

    logger.msg(VERBOSE, "Downloading job: %s", job.JobID.str());

    std::string path = job.JobID.Path();
    std::string::size_type pos = path.rfind('/');
    std::string jobidnum = path.substr(pos + 1);

    std::list<std::string> files = GetDownloadFiles(job.JobID);

    URL src(job.JobID);
    URL dst(downloaddir.empty() ? jobidnum : downloaddir + G_DIR_SEPARATOR_S + jobidnum);

    std::string srcpath = src.Path();
    std::string dstpath = dst.Path();

    if (srcpath.empty() || (srcpath[srcpath.size() - 1] != '/'))
      srcpath += '/';
    if (dstpath.empty() || (dstpath[dstpath.size() - 1] != G_DIR_SEPARATOR))
      dstpath += G_DIR_SEPARATOR_S;

    bool ok = true;

    for (std::list<std::string>::iterator it = files.begin();
         it != files.end(); it++) {
      src.ChangePath(srcpath + *it);
      dst.ChangePath(dstpath + *it);
      if (!ARCCopyFile(src, dst)) {
        logger.msg(INFO, "Failed dowloading %s to %s", src.str(), dst.str());
        ok = false;
      }
    }

    return ok;
  }

Here is the call graph for this function:

bool Arc::JobControllerARC1::GetJobDescription ( const Job job,
std::string &  desc_str 
) [private, virtual]

Implements Arc::JobController.

Definition at line 146 of file JobControllerARC1.cpp.

                                                                               {
    MCCConfig cfg;
    usercfg.ApplyToConfig(cfg);
    AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
    std::string idstr;
    AREXClient::createActivityIdentifier(job.JobID, idstr);
    if (ac.getdesc(idstr, desc_str)) {
      JobDescription desc;
      if (desc.Parse(desc_str))
        return true;
    }

    logger.msg(ERROR, "Failed retrieving job description for job: %s", job.JobID.str());
    return false;
  }

Here is the call graph for this function:

std::list< Job > Arc::JobController::GetJobDescriptions ( const std::list< std::string > &  status,
const bool  getlocal 
) [inherited]

Definition at line 764 of file JobController.cpp.

                                                                        {

    GetJobInformation();

    // Only selected jobs with specified status
    std::list<Job> gettable;
    for (std::list<Job>::iterator it = jobstore.begin();
         it != jobstore.end(); it++) {
      if (!status.empty() && !it->State) {
        logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
        continue;
      }

      if (!status.empty() && std::find(status.begin(), status.end(),
                                       it->State()) == status.end())
        continue;
      gettable.push_back(*it);
    }

    //First try to get descriptions from local job file
    if (getlocal) {
      logger.msg(VERBOSE, "Getting job decriptions from local job file");
      CheckLocalDescription(gettable);
    }
    else
      logger.msg(VERBOSE, "Disregarding job decriptions from local job file");

    // Try to get description from cluster
    for (std::list<Job>::iterator it = gettable.begin();
         it != gettable.end();) {
      if (!it->JobDescription.empty()) {
        it++;
        continue;
      }
      if (GetJobDescription(*it, it->JobDescription)) {
        logger.msg(VERBOSE, "Got job description for %s", it->JobID.str());
        it++;
      }
      else {
        logger.msg(INFO, "Failed getting job description for %s", it->JobID.str());
        it = gettable.erase(it);
      }
    }
    return gettable;

  }

Here is the call graph for this function:

Implements Arc::JobController.

Definition at line 38 of file JobControllerARC1.cpp.

                                            {
    MCCConfig cfg;
    usercfg.ApplyToConfig(cfg);

    for (std::list<Job>::iterator iter = jobstore.begin();
         iter != jobstore.end(); iter++) {
      AREXClient ac(iter->Cluster, cfg, usercfg.Timeout());
      std::string idstr;
      AREXClient::createActivityIdentifier(iter->JobID, idstr);
      if (!ac.stat(idstr, *iter))
        logger.msg(INFO, "Failed retrieving information for job: %s", iter->JobID.str());
    }
  }

Here is the call graph for this function:

const std::list<Job>& Arc::JobController::GetJobs ( ) const [inline, inherited]

Definition at line 131 of file JobController.h.

                                        {
      return jobstore;
    }

Here is the caller graph for this function:

Definition at line 30 of file JobControllerARC1.cpp.

                                                         {
    JobControllerPluginArgument *jcarg =
      dynamic_cast<JobControllerPluginArgument*>(arg);
    if (!jcarg)
      return NULL;
    return new JobControllerARC1(*jcarg);
  }

Here is the call graph for this function:

bool Arc::JobController::Kill ( const std::list< std::string > &  status,
const bool  keep 
) [inherited]

Definition at line 280 of file JobController.cpp.

                                            {
    std::list<URL> toberemoved;

    GetJobInformation();

    std::list<Job*> killable;
    for (std::list<Job>::iterator it = jobstore.begin();
         it != jobstore.end(); it++) {

      if (!it->State) {
        logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
        continue;
      }

      if (!status.empty() &&
          std::find(status.begin(), status.end(), it->State()) == status.end() &&
          std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
        continue;

      if (it->State == JobState::DELETED) {
        logger.msg(WARNING, "Job has already been deleted: %s", it->JobID.str());
        continue;
      }
      else if (it->State.IsFinished()) {
        logger.msg(WARNING, "Job has already finished: %s", it->JobID.str());
        continue;
      }

      killable.push_back(&(*it));
    }

    bool ok = true;
    for (std::list<Job*>::iterator it = killable.begin();
         it != killable.end(); it++) {

      bool cancelled = CancelJob(**it);
      if (!cancelled) {
        logger.msg(ERROR, "Failed cancelling job %s", (*it)->JobID.str());
        ok = false;
        continue;
      }

      if (!keep) {
        bool cleaned = CleanJob(**it, true);
        if (!cleaned) {
          logger.msg(ERROR, "Failed cleaning job %s", (*it)->JobID.str());
          ok = false;
          continue;
        }
        toberemoved.push_back((*it)->JobID.str());
      }
    }

    if (toberemoved.size() > 0)
      RemoveJobs(toberemoved);

    return ok;
  }

Here is the call graph for this function:

Here is the caller graph for this function:

bool Arc::JobController::Migrate ( TargetGenerator targetGen,
Broker broker,
const UserConfig usercfg,
const bool  forcemigration,
std::list< URL > &  migratedJobIDs 
) [inherited]

Migrate job from cluster A to Cluster B.

Method to migrate the jobs contained in the jobstore.

Parameters:
targetGenTargetGenerator with targets to migrate the job to.
brokerBroker to be used when selecting target.
forcemigrationboolean which specifies whether a migrated job should persist if the new cluster does not succeed sending a kill/terminate request for the job.

Definition at line 519 of file JobController.cpp.

                                                            {
    bool retVal = true;
    std::list<URL> toberemoved;

    GetJobInformation();
    for (std::list<Job>::iterator itJob = jobstore.begin(); itJob != jobstore.end(); itJob++) {
      if (itJob->State != JobState::QUEUING) {
        logger.msg(WARNING, "Cannot migrate job %s, it is not queuing.", itJob->JobID.str());
        continue;
      }

      JobDescription jobDesc;
      if (!GetJobDescription(*itJob, itJob->JobDescription))
        continue;

      jobDesc.Parse(itJob->JobDescription);

      broker->PreFilterTargets(targetGen.ModifyFoundTargets(), jobDesc);
      while (true) {
        const ExecutionTarget *currentTarget = broker->GetBestTarget();
        if (!currentTarget) {
          logger.msg(ERROR, "Job migration failed, for job %s, no more possible targets", itJob->JobID.str());
          retVal = false;
          break;
        }

        URL jobid = currentTarget->GetSubmitter(usercfg)->Migrate(itJob->JobID, jobDesc, *currentTarget, forcemigration);
        if (!jobid)
          continue;

        broker->RegisterJobsubmission();
        migratedJobIDs.push_back(jobid);
        toberemoved.push_back(URL(itJob->JobID.str()));
        break;
      }
    }

    if (toberemoved.size() > 0)
      RemoveJobs(toberemoved);

    return retVal;
  }

Here is the call graph for this function:

bool Arc::JobController::PrintJobStatus ( const std::list< std::string > &  status,
const bool  longlist 
) [inherited]

Print job status to stdout.

The job status is printed to stdout when calling this method. More specifically the Job::Print method is called on each of the Job objects stored in this object, and the boolean argument longlist is passed directly to the method indicating whether verbose job status should be printed. The status argument is a list of strings each representing a job state (JobState) which is used to indicate that only jobs with a job state in the list should be considered. If the list status is empty all jobs will be considered.

This method is not supposed to be overloaded by extending classes.

Parameters:
statusa list of strings representing states to be considered.
longlista boolean indicating whether verbose job information should be printed.
Returns:
This method always returns true.
See also:
GetJobInformation
Job::Print
JobState

Definition at line 493 of file JobController.cpp.

                                                          {

    GetJobInformation();

    for (std::list<Job>::iterator it = jobstore.begin();
         it != jobstore.end(); it++) {
      if (!it->State) {
        logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
        if (Time() - it->LocalSubmissionTime < 90)
          logger.msg(WARNING, "This job was very recently "
                     "submitted and might not yet "
                     "have reached the information-system");
        continue;
      }

      if (!status.empty() &&
          std::find(status.begin(), status.end(), it->State()) == status.end() &&
          std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
        continue;

      it->Print(longlist);
    }
    return true;
  }

Here is the call graph for this function:

bool Arc::JobController::RemoveJobs ( const std::list< URL > &  jobids) [inherited]

Definition at line 724 of file JobController.cpp.

                                                           {

    logger.msg(VERBOSE, "Removing jobs from job list and job store");

    FileLock lock(usercfg.JobListFile());
    jobstorage.ReadFromFile(usercfg.JobListFile());

    for (std::list<URL>::const_iterator it = jobids.begin();
         it != jobids.end(); it++) {

      XMLNodeList xmljobs = jobstorage.XPathLookup("//Job[JobID='" + it->str() + "']", NS());

      if (xmljobs.empty())
        logger.msg(ERROR, "Job %s not found in job list.", it->str());
      else {
        XMLNode& xmljob = *xmljobs.begin();
        if (xmljob) {
          logger.msg(INFO, "Removing job %s from job list file", it->str());
          xmljob.Destroy();
        }
      }

      std::list<Job>::iterator it2 = jobstore.begin();
      while (it2 != jobstore.end()) {
        if (it2->JobID == *it) {
          it2 = jobstore.erase(it2);
          break;
        }
        it2++;
      }
    }

    jobstorage.SaveToFile(usercfg.JobListFile());

    logger.msg(VERBOSE, "Job store for %s now contains %d jobs", flavour, jobstore.size());
    logger.msg(VERBOSE, "Finished removing jobs from job list and job store");

    return true;
  }

Here is the call graph for this function:

Here is the caller graph for this function:

bool Arc::JobController::Renew ( const std::list< std::string > &  status) [inherited]

Definition at line 566 of file JobController.cpp.

                                                            {

    GetJobInformation();

    std::list<Job*> renewable;
    for (std::list<Job>::iterator it = jobstore.begin();
         it != jobstore.end(); it++) {

      if (!it->State) {
        logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
        continue;
      }

      if (!status.empty() &&
          std::find(status.begin(), status.end(), it->State()) == status.end() &&
          std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
        continue;

      if (!it->State.IsFinished()) {
        logger.msg(WARNING, "Job has not finished yet: %s", it->JobID.str());
        continue;
      }

      renewable.push_back(&(*it));
    }

    bool ok = true;
    for (std::list<Job*>::iterator it = renewable.begin();
         it != renewable.end(); it++) {
      bool renewed = RenewJob(**it);
      if (!renewed) {
        logger.msg(ERROR, "Failed renewing job %s", (*it)->JobID.str());
        ok = false;
        continue;
      }
    }
    return ok;
  }

Here is the call graph for this function:

bool Arc::JobControllerARC1::RenewJob ( const Job job) [private, virtual]

Implements Arc::JobController.

Definition at line 107 of file JobControllerARC1.cpp.

                                                 {
    logger.msg(INFO, "Renewal of ARC1 jobs is not supported");
    return false;
  }

Here is the call graph for this function:

bool Arc::JobController::Resume ( const std::list< std::string > &  status) [inherited]

Definition at line 605 of file JobController.cpp.

                                                             {

    GetJobInformation();

    std::list<Job*> resumable;
    for (std::list<Job>::iterator it = jobstore.begin();
         it != jobstore.end(); it++) {

      if (!it->State) {
        logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
        continue;
      }

      if (!status.empty() &&
          std::find(status.begin(), status.end(), it->State()) == status.end() &&
          std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
        continue;

      if (!it->State.IsFinished()) {
        logger.msg(WARNING, "Job has not finished yet: %s", it->JobID.str());
        continue;
      }

      resumable.push_back(&(*it));
    }

    bool ok = true;
    for (std::list<Job*>::iterator it = resumable.begin();
         it != resumable.end(); it++) {
      bool resumed = ResumeJob(**it);
      if (!resumed) {
        logger.msg(ERROR, "Failed resuming job %s", (*it)->JobID.str());
        ok = false;
        continue;
      }
    }
    return ok;
  }

Here is the call graph for this function:

bool Arc::JobControllerARC1::ResumeJob ( const Job job) [private, virtual]

Implements Arc::JobController.

Definition at line 112 of file JobControllerARC1.cpp.

                                                  {

    if (job.RestartState.empty()) {
      logger.msg(INFO, "Job %s does not report a resumable state", job.JobID.str());
      return false;
    }

    logger.msg(VERBOSE, "Resuming job: %s at state: %s", job.JobID.str(), job.RestartState);

    MCCConfig cfg;
    usercfg.ApplyToConfig(cfg);
    AREXClient ac(job.Cluster, cfg, usercfg.Timeout());
    std::string idstr;
    AREXClient::createActivityIdentifier(job.JobID, idstr);
    bool ok = ac.resume(idstr);
    if (ok)
      logger.msg(VERBOSE, "Job resuming successful");
    return ok;
  }

Here is the call graph for this function:


Member Data Documentation

const std::string Arc::JobController::flavour [protected, inherited]

Definition at line 147 of file JobController.h.

Config Arc::JobController::jobstorage [protected, inherited]

Definition at line 150 of file JobController.h.

std::list<Job> Arc::JobController::jobstore [protected, inherited]

Definition at line 149 of file JobController.h.

Reimplemented from Arc::JobController.

Definition at line 33 of file JobControllerARC1.h.

const UserConfig& Arc::JobController::usercfg [protected, inherited]

Definition at line 148 of file JobController.h.


The documentation for this class was generated from the following files: