Back to index

nordugrid-arc-nox  1.1.0~rc6
JobController.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <algorithm>
00008 #include <fstream>
00009 #include <iostream>
00010 
00011 #include <unistd.h>
00012 #include <glibmm/fileutils.h>
00013 #include <glibmm.h>
00014 
00015 #include <arc/ArcConfig.h>
00016 #include <arc/FileLock.h>
00017 #include <arc/IString.h>
00018 #include <arc/StringConv.h>
00019 #include <arc/XMLNode.h>
00020 #include <arc/client/Broker.h>
00021 #include <arc/client/ExecutionTarget.h>
00022 #include <arc/client/Submitter.h>
00023 #include <arc/client/TargetGenerator.h>
00024 #include <arc/UserConfig.h>
00025 #include <arc/data/DataMover.h>
00026 #include <arc/data/DataHandle.h>
00027 #include <arc/data/FileCache.h>
00028 #include <arc/data/URLMap.h>
00029 #include <arc/loader/FinderLoader.h>
00030 #include "JobController.h"
00031 
00032 namespace Arc {
00033 
00034   Logger JobController::logger(Logger::getRootLogger(), "JobController");
00035 
00036   JobController::JobController(const UserConfig& usercfg,
00037                                const std::string& flavour)
00038     : flavour(flavour),
00039       usercfg(usercfg) {}
00040 
00041   JobController::~JobController() {}
00042 
00043   void JobController::FillJobStore(const std::list<URL>& jobids) {
00044 
00045     if (!usercfg.JobListFile().empty()) {
00046       logger.msg(VERBOSE, "Using job list file %s", usercfg.JobListFile());
00047       FileLock lock(usercfg.JobListFile());
00048       jobstorage.ReadFromFile(usercfg.JobListFile());
00049     }
00050     else {
00051       logger.msg(ERROR, "Job controller has no job list configuration");
00052       return;
00053     }
00054 
00055     if (!jobids.empty()) {
00056       logger.msg(VERBOSE, "Filling job store with jobs according to "
00057                  "specified jobids");
00058 
00059       for (std::list<URL>::const_iterator it = jobids.begin();
00060            it != jobids.end(); it++) {
00061 
00062         XMLNodeList xmljobs =
00063           jobstorage.XPathLookup("//Job[JobID='" + it->str() + "']", NS());
00064 
00065         if (xmljobs.empty()) {
00066           logger.msg(VERBOSE, "Job not found in job list: %s", it->str());
00067           continue;
00068         }
00069 
00070         XMLNode& xmljob = *xmljobs.begin();
00071 
00072         if (flavour == (std::string)xmljob["Flavour"]) {
00073           Job job;
00074           job.JobID = (std::string)xmljob["JobID"];
00075           job.Flavour = (std::string)xmljob["Flavour"];
00076           job.Cluster = (std::string)xmljob["Cluster"];
00077           job.SubmissionEndpoint = (std::string)xmljob["SubmissionEndpoint"];
00078           job.InfoEndpoint = (std::string)xmljob["InfoEndpoint"];
00079           job.ISB = (std::string)xmljob["ISB"];
00080           job.OSB = (std::string)xmljob["OSB"];
00081           job.StdOut = (std::string)xmljob["StdOut"];
00082           job.StdErr = (std::string)xmljob["StdErr"];
00083           job.AuxInfo = (std::string)xmljob["AuxInfo"];
00084           if (!((std::string)xmljob["LocalSubmissionTime"]).empty())
00085             job.LocalSubmissionTime = (std::string)xmljob["LocalSubmissionTime"];
00086           else
00087             job.LocalSubmissionTime = Time(-1);
00088           for (int i = 0; (bool)xmljob["ActivityOldId"][i]; i++)
00089             job.ActivityOldId.push_back((std::string)xmljob["ActivityOldId"][i]);
00090           jobstore.push_back(job);
00091         }
00092       }
00093     }
00094 
00095     URLListMap::const_iterator itSelectedClusters = usercfg.GetSelectedServices(COMPUTING).find(flavour);
00096     if (itSelectedClusters != usercfg.GetSelectedServices(COMPUTING).end() &&
00097         !itSelectedClusters->second.empty()) {
00098       const std::list<URL>& selectedClusters = itSelectedClusters->second;
00099       logger.msg(VERBOSE, "Filling job store with jobs according to list of "
00100                  "selected clusters");
00101 
00102       XMLNodeList xmljobs =
00103         jobstorage.XPathLookup("//Job[Flavour='" + flavour + "']", NS());
00104 
00105       for (XMLNodeList::iterator it = xmljobs.begin();
00106            it != xmljobs.end(); it++) {
00107 
00108         URL cluster = (std::string)(*it)["Cluster"];
00109 
00110         if (std::find(selectedClusters.begin(), selectedClusters.end(),
00111                       cluster) != selectedClusters.end()) {
00112           Job job;
00113           job.JobID = (std::string)(*it)["JobID"];
00114           job.Flavour = (std::string)(*it)["Flavour"];
00115           job.Cluster = (std::string)(*it)["Cluster"];
00116           job.SubmissionEndpoint = (std::string)(*it)["SubmissionEndpoint"];
00117           job.InfoEndpoint = (std::string)(*it)["InfoEndpoint"];
00118           job.ISB = (std::string)(*it)["ISB"];
00119           job.OSB = (std::string)(*it)["OSB"];
00120           job.StdOut = (std::string)(*it)["StdOut"];
00121           job.StdErr = (std::string)(*it)["StdErr"];
00122           job.AuxInfo = (std::string)(*it)["AuxInfo"];
00123           if (!((std::string)(*it)["LocalSubmissionTime"]).empty())
00124             job.LocalSubmissionTime = (std::string)(*it)["LocalSubmissionTime"];
00125           else
00126             job.LocalSubmissionTime = Time(-1);
00127           for (int i = 0; (bool)(*it)["ActivityOldId"][i]; i++)
00128             job.ActivityOldId.push_back((std::string)(*it)["ActivityOldId"][i]);
00129           jobstore.push_back(job);
00130         }
00131       }
00132     }
00133 
00134     URLListMap::const_iterator itRejectedClusters = usercfg.GetRejectedServices(COMPUTING).find(flavour);
00135     if (itRejectedClusters != usercfg.GetRejectedServices(COMPUTING).end() &&
00136         !itRejectedClusters->second.empty())
00137       if (!jobstore.empty()) {
00138         const std::list<URL>& rejectedClusters = itRejectedClusters->second;
00139 
00140         logger.msg(VERBOSE, "Removing jobs from job store according to list of "
00141                    "rejected clusters");
00142 
00143         std::list<Job>::iterator it = jobstore.begin();
00144         while (it != jobstore.end())
00145           if (std::find(rejectedClusters.begin(), rejectedClusters.end(),
00146                         it->Cluster) != rejectedClusters.end()) {
00147             logger.msg(VERBOSE, "Removing job %s from job store since it runs "
00148                        "on a rejected cluster", it->JobID.str());
00149             it = jobstore.erase(it);
00150           }
00151           else
00152             it++;
00153       }
00154 
00155     if (jobids.empty() && usercfg.GetSelectedServices(COMPUTING).empty()) {
00156       logger.msg(VERBOSE, "Filling job store with all jobs, except those "
00157                  "running on rejected clusters");
00158 
00159       const std::list<URL>* rejectedClusters = (itRejectedClusters == usercfg.GetRejectedServices(COMPUTING).end() ? NULL : &itRejectedClusters->second);
00160 
00161       XMLNodeList xmljobs =
00162         jobstorage.XPathLookup("//Job[Flavour='" + flavour + "']", NS());
00163 
00164       for (XMLNodeList::iterator it = xmljobs.begin();
00165            it != xmljobs.end(); it++) {
00166 
00167         URL cluster = (std::string)(*it)["Cluster"];
00168 
00169         if (!rejectedClusters ||
00170             std::find(rejectedClusters->begin(), rejectedClusters->end(), cluster) == rejectedClusters->end()) {
00171           Job job;
00172           job.JobID = (std::string)(*it)["JobID"];
00173           job.Flavour = (std::string)(*it)["Flavour"];
00174           job.Cluster = (std::string)(*it)["Cluster"];
00175           job.SubmissionEndpoint = (std::string)(*it)["SubmissionEndpoint"];
00176           job.InfoEndpoint = (std::string)(*it)["InfoEndpoint"];
00177           job.ISB = (std::string)(*it)["ISB"];
00178           job.OSB = (std::string)(*it)["OSB"];
00179           job.StdOut = (std::string)(*it)["StdOut"];
00180           job.StdErr = (std::string)(*it)["StdErr"];
00181           job.AuxInfo = (std::string)(*it)["AuxInfo"];
00182           if (!((std::string)(*it)["LocalSubmissionTime"]).empty())
00183             job.LocalSubmissionTime = (std::string)(*it)["LocalSubmissionTime"];
00184           else
00185             job.LocalSubmissionTime = Time(-1);
00186           for (int i = 0; (bool)(*it)["ActivityOldId"][i]; i++)
00187             job.ActivityOldId.push_back((std::string)(*it)["ActivityOldId"][i]);
00188           jobstore.push_back(job);
00189         }
00190       }
00191     }
00192 
00193     logger.msg(VERBOSE, "FillJobStore has finished successfully");
00194     logger.msg(VERBOSE, "Job store for %s contains %ld jobs",
00195                flavour, jobstore.size());
00196   }
00197 
00198   void JobController::FillJobStore(const Job& job) {
00199 
00200     if (job.Flavour != flavour) {
00201       logger.msg(WARNING, "The middleware flavour of the job (%s) does not match that of the job controller (%s)", job.Flavour, flavour);
00202       return;
00203     }
00204 
00205     if (!job.JobID) {
00206       logger.msg(WARNING, "The job ID (%s) is not a valid URL", job.JobID.str());
00207       return;
00208     }
00209 
00210     if (!job.Cluster) {
00211       logger.msg(WARNING, "The cluster URL is not a valid URL", job.Cluster.str());
00212       return;
00213     }
00214 
00215     jobstore.push_back(job);
00216   }
00217 
00218   bool JobController::Get(const std::list<std::string>& status,
00219                           const std::string& downloaddir,
00220                           const bool keep) {
00221     std::list<URL> toberemoved;
00222 
00223     GetJobInformation();
00224 
00225     std::list<Job*> downloadable;
00226     for (std::list<Job>::iterator it = jobstore.begin();
00227          it != jobstore.end(); it++) {
00228 
00229       if (!it->State) {
00230         logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
00231         continue;
00232       }
00233 
00234       if (!status.empty() &&
00235           std::find(status.begin(), status.end(), it->State()) == status.end() &&
00236           std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
00237         continue;
00238 
00239       if (it->State == JobState::DELETED) {
00240         logger.msg(WARNING, "Job has already been deleted: %s",
00241                    it->JobID.str());
00242         continue;
00243       }
00244       else if (!it->State.IsFinished()) {
00245         logger.msg(WARNING, "Job has not finished yet: %s", it->JobID.str());
00246         continue;
00247       }
00248 
00249       downloadable.push_back(&(*it));
00250     }
00251 
00252     bool ok = true;
00253     for (std::list<Job*>::iterator it = downloadable.begin();
00254          it != downloadable.end(); it++) {
00255 
00256       bool downloaded = GetJob(**it, downloaddir);
00257       if (!downloaded) {
00258         logger.msg(ERROR, "Failed downloading job %s", (*it)->JobID.str());
00259         ok = false;
00260         continue;
00261       }
00262 
00263       if (!keep) {
00264         bool cleaned = CleanJob(**it, true);
00265         if (!cleaned) {
00266           logger.msg(ERROR, "Failed cleaning job %s", (*it)->JobID.str());
00267           ok = false;
00268           continue;
00269         }
00270         toberemoved.push_back((*it)->JobID);
00271       }
00272     }
00273 
00274     if (toberemoved.size() > 0)
00275       RemoveJobs(toberemoved);
00276 
00277     return ok;
00278   }
00279 
00280   bool JobController::Kill(const std::list<std::string>& status,
00281                            const bool keep) {
00282     std::list<URL> toberemoved;
00283 
00284     GetJobInformation();
00285 
00286     std::list<Job*> killable;
00287     for (std::list<Job>::iterator it = jobstore.begin();
00288          it != jobstore.end(); it++) {
00289 
00290       if (!it->State) {
00291         logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
00292         continue;
00293       }
00294 
00295       if (!status.empty() &&
00296           std::find(status.begin(), status.end(), it->State()) == status.end() &&
00297           std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
00298         continue;
00299 
00300       if (it->State == JobState::DELETED) {
00301         logger.msg(WARNING, "Job has already been deleted: %s", it->JobID.str());
00302         continue;
00303       }
00304       else if (it->State.IsFinished()) {
00305         logger.msg(WARNING, "Job has already finished: %s", it->JobID.str());
00306         continue;
00307       }
00308 
00309       killable.push_back(&(*it));
00310     }
00311 
00312     bool ok = true;
00313     for (std::list<Job*>::iterator it = killable.begin();
00314          it != killable.end(); it++) {
00315 
00316       bool cancelled = CancelJob(**it);
00317       if (!cancelled) {
00318         logger.msg(ERROR, "Failed cancelling job %s", (*it)->JobID.str());
00319         ok = false;
00320         continue;
00321       }
00322 
00323       if (!keep) {
00324         bool cleaned = CleanJob(**it, true);
00325         if (!cleaned) {
00326           logger.msg(ERROR, "Failed cleaning job %s", (*it)->JobID.str());
00327           ok = false;
00328           continue;
00329         }
00330         toberemoved.push_back((*it)->JobID.str());
00331       }
00332     }
00333 
00334     if (toberemoved.size() > 0)
00335       RemoveJobs(toberemoved);
00336 
00337     return ok;
00338   }
00339 
00340   bool JobController::Clean(const std::list<std::string>& status,
00341                             const bool force) {
00342     std::list<URL> toberemoved;
00343 
00344     GetJobInformation();
00345 
00346     std::list<Job*> cleanable;
00347     for (std::list<Job>::iterator it = jobstore.begin();
00348          it != jobstore.end(); it++) {
00349 
00350       if (!it->State && force && status.empty()) {
00351         logger.msg(WARNING, "Job information not found, job %s will only be deleted from local joblist",
00352                    it->JobID.str());
00353         toberemoved.push_back(it->JobID);
00354         continue;
00355       }
00356 
00357       if (!it->State) {
00358         logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
00359         continue;
00360       }
00361 
00362       // Job state is not among the specified states.
00363       if (!status.empty() &&
00364           std::find(status.begin(), status.end(), it->State()) == status.end() &&
00365           std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
00366         continue;
00367 
00368       if (!it->State.IsFinished()) {
00369         if (force)
00370           toberemoved.push_back(it->JobID);
00371         else
00372           logger.msg(WARNING, "Job has not finished yet: %s", it->JobID.str());
00373         continue;
00374       }
00375 
00376       cleanable.push_back(&(*it));
00377     }
00378 
00379     bool ok = true;
00380     for (std::list<Job*>::iterator it = cleanable.begin();
00381          it != cleanable.end(); it++) {
00382       bool cleaned = CleanJob(**it, force);
00383       if (!cleaned) {
00384         if (force)
00385           toberemoved.push_back((*it)->JobID);
00386         logger.msg(ERROR, "Failed cleaning job %s", (*it)->JobID.str());
00387         ok = false;
00388         continue;
00389       }
00390       toberemoved.push_back((*it)->JobID);
00391     }
00392 
00393     if (toberemoved.size() > 0)
00394       RemoveJobs(toberemoved);
00395 
00396     return ok;
00397   }
00398 
00399   bool JobController::Cat(const std::list<std::string>& status,
00400                           const std::string& whichfile) {
00401 
00402     if (whichfile != "stdout" && whichfile != "stderr" && whichfile != "gmlog") {
00403       logger.msg(ERROR, "Unknown output %s", whichfile);
00404       return false;
00405     }
00406 
00407     GetJobInformation();
00408 
00409     std::list<Job*> catable;
00410     for (std::list<Job>::iterator it = jobstore.begin();
00411          it != jobstore.end(); it++) {
00412 
00413       if (!it->State) {
00414         logger.msg(WARNING, "Job state information not found: %s",
00415                    it->JobID.str());
00416         continue;
00417       }
00418 
00419       if (!status.empty() &&
00420           std::find(status.begin(), status.end(), it->State()) == status.end() &&
00421           std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
00422         continue;
00423 
00424       if (it->State == JobState::DELETED) {
00425         logger.msg(WARNING, "Job deleted: %s", it->JobID.str());
00426         continue;
00427       }
00428 
00429       if (!it->State.IsFinished() &&
00430           it->State != JobState::RUNNING &&
00431           it->State != JobState::FINISHING) {
00432         logger.msg(WARNING, "Job has not started yet: %s", it->JobID.str());
00433         continue;
00434       }
00435 
00436       if (whichfile == "stdout" && it->StdOut.empty() ||
00437           whichfile == "stderr" && it->StdErr.empty() ||
00438           whichfile == "gmlog" && it->LogDir.empty()) {
00439         logger.msg(ERROR, "Can not determine the %s location: %s",
00440                    whichfile, it->JobID.str());
00441         continue;
00442       }
00443 
00444       catable.push_back(&(*it));
00445     }
00446 
00447     bool ok = true;
00448     for (std::list<Job*>::iterator it = catable.begin();
00449          it != catable.end(); it++) {
00450       std::string filename = Glib::build_filename(Glib::get_tmp_dir(), "arccat.XXXXXX");
00451       int tmp_h = Glib::mkstemp(filename);
00452       if (tmp_h == -1) {
00453         logger.msg(INFO, "Could not create temporary file \"%s\"", filename);
00454         logger.msg(ERROR, "Cannot output %s for job (%s)", whichfile, (*it)->JobID.str());
00455         ok = false;
00456         continue;
00457       }
00458       close(tmp_h);
00459 
00460       logger.msg(VERBOSE, "Catting %s for job %s", whichfile, (*it)->JobID.str());
00461 
00462       URL src = GetFileUrlForJob((**it), whichfile);
00463       if (!src) {
00464         logger.msg(ERROR, "Cannot output %s for job (%s): Invalid source %s", whichfile, (*it)->JobID.str(), src.str());
00465         continue;
00466       }
00467 
00468       URL dst(filename);
00469       if (!dst) {
00470         logger.msg(ERROR, "Cannot output %s for job (%s): Invalid destination %s", whichfile, (*it)->JobID.str(), dst.str());
00471         continue;
00472       }
00473 
00474       bool copied = ARCCopyFile(src, dst);
00475 
00476       if (copied) {
00477         std::cout << IString("%s from job %s", whichfile,
00478                              (*it)->JobID.str()) << std::endl;
00479         std::ifstream is(filename.c_str());
00480         char c;
00481         while (is.get(c))
00482           std::cout.put(c);
00483         is.close();
00484         unlink(filename.c_str());
00485       }
00486       else
00487         ok = false;
00488     }
00489 
00490     return ok;
00491   }
00492 
00493   bool JobController::PrintJobStatus(const std::list<std::string>& status,
00494                                      const bool longlist) {
00495 
00496     GetJobInformation();
00497 
00498     for (std::list<Job>::iterator it = jobstore.begin();
00499          it != jobstore.end(); it++) {
00500       if (!it->State) {
00501         logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
00502         if (Time() - it->LocalSubmissionTime < 90)
00503           logger.msg(WARNING, "This job was very recently "
00504                      "submitted and might not yet "
00505                      "have reached the information-system");
00506         continue;
00507       }
00508 
00509       if (!status.empty() &&
00510           std::find(status.begin(), status.end(), it->State()) == status.end() &&
00511           std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
00512         continue;
00513 
00514       it->Print(longlist);
00515     }
00516     return true;
00517   }
00518 
00519   bool JobController::Migrate(TargetGenerator& targetGen,
00520                               Broker *broker,
00521                               const UserConfig& usercfg,
00522                               const bool forcemigration,
00523                               std::list<URL>& migratedJobIDs) {
00524     bool retVal = true;
00525     std::list<URL> toberemoved;
00526 
00527     GetJobInformation();
00528     for (std::list<Job>::iterator itJob = jobstore.begin(); itJob != jobstore.end(); itJob++) {
00529       if (itJob->State != JobState::QUEUING) {
00530         logger.msg(WARNING, "Cannot migrate job %s, it is not queuing.", itJob->JobID.str());
00531         continue;
00532       }
00533 
00534       JobDescription jobDesc;
00535       if (!GetJobDescription(*itJob, itJob->JobDescription))
00536         continue;
00537 
00538       jobDesc.Parse(itJob->JobDescription);
00539 
00540       broker->PreFilterTargets(targetGen.ModifyFoundTargets(), jobDesc);
00541       while (true) {
00542         const ExecutionTarget *currentTarget = broker->GetBestTarget();
00543         if (!currentTarget) {
00544           logger.msg(ERROR, "Job migration failed, for job %s, no more possible targets", itJob->JobID.str());
00545           retVal = false;
00546           break;
00547         }
00548 
00549         URL jobid = currentTarget->GetSubmitter(usercfg)->Migrate(itJob->JobID, jobDesc, *currentTarget, forcemigration);
00550         if (!jobid)
00551           continue;
00552 
00553         broker->RegisterJobsubmission();
00554         migratedJobIDs.push_back(jobid);
00555         toberemoved.push_back(URL(itJob->JobID.str()));
00556         break;
00557       }
00558     }
00559 
00560     if (toberemoved.size() > 0)
00561       RemoveJobs(toberemoved);
00562 
00563     return retVal;
00564   }
00565 
00566   bool JobController::Renew(const std::list<std::string>& status) {
00567 
00568     GetJobInformation();
00569 
00570     std::list<Job*> renewable;
00571     for (std::list<Job>::iterator it = jobstore.begin();
00572          it != jobstore.end(); it++) {
00573 
00574       if (!it->State) {
00575         logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
00576         continue;
00577       }
00578 
00579       if (!status.empty() &&
00580           std::find(status.begin(), status.end(), it->State()) == status.end() &&
00581           std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
00582         continue;
00583 
00584       if (!it->State.IsFinished()) {
00585         logger.msg(WARNING, "Job has not finished yet: %s", it->JobID.str());
00586         continue;
00587       }
00588 
00589       renewable.push_back(&(*it));
00590     }
00591 
00592     bool ok = true;
00593     for (std::list<Job*>::iterator it = renewable.begin();
00594          it != renewable.end(); it++) {
00595       bool renewed = RenewJob(**it);
00596       if (!renewed) {
00597         logger.msg(ERROR, "Failed renewing job %s", (*it)->JobID.str());
00598         ok = false;
00599         continue;
00600       }
00601     }
00602     return ok;
00603   }
00604 
00605   bool JobController::Resume(const std::list<std::string>& status) {
00606 
00607     GetJobInformation();
00608 
00609     std::list<Job*> resumable;
00610     for (std::list<Job>::iterator it = jobstore.begin();
00611          it != jobstore.end(); it++) {
00612 
00613       if (!it->State) {
00614         logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
00615         continue;
00616       }
00617 
00618       if (!status.empty() &&
00619           std::find(status.begin(), status.end(), it->State()) == status.end() &&
00620           std::find(status.begin(), status.end(), it->State.GetGeneralState()) == status.end())
00621         continue;
00622 
00623       if (!it->State.IsFinished()) {
00624         logger.msg(WARNING, "Job has not finished yet: %s", it->JobID.str());
00625         continue;
00626       }
00627 
00628       resumable.push_back(&(*it));
00629     }
00630 
00631     bool ok = true;
00632     for (std::list<Job*>::iterator it = resumable.begin();
00633          it != resumable.end(); it++) {
00634       bool resumed = ResumeJob(**it);
00635       if (!resumed) {
00636         logger.msg(ERROR, "Failed resuming job %s", (*it)->JobID.str());
00637         ok = false;
00638         continue;
00639       }
00640     }
00641     return ok;
00642   }
00643 
00644   std::list<std::string> JobController::GetDownloadFiles(const URL& dir) {
00645 
00646     std::list<std::string> files;
00647     std::list<FileInfo> outputfiles;
00648 
00649     DataHandle handle(dir, usercfg);
00650     if (!handle) {
00651       logger.msg(INFO, "Unable to list files at %s", dir.str());
00652       return files;
00653     }
00654     handle->ListFiles(outputfiles, true, false, false);
00655 
00656     for (std::list<FileInfo>::iterator i = outputfiles.begin();
00657          i != outputfiles.end(); i++) {
00658 
00659       if (i->GetName() == ".." || i->GetName() == ".")
00660         continue;
00661 
00662       if (i->GetType() == FileInfo::file_type_unknown ||
00663           i->GetType() == FileInfo::file_type_file)
00664         files.push_back(i->GetName());
00665       else if (i->GetType() == FileInfo::file_type_dir) {
00666 
00667         std::string path = dir.Path();
00668         if (path[path.size() - 1] != '/')
00669           path += "/";
00670         URL tmpdir(dir);
00671         tmpdir.ChangePath(path + i->GetName());
00672         std::list<std::string> morefiles = GetDownloadFiles(tmpdir);
00673         std::string dirname = i->GetName();
00674         if (dirname[dirname.size() - 1] != '/')
00675           dirname += "/";
00676         for (std::list<std::string>::iterator it = morefiles.begin();
00677              it != morefiles.end(); it++)
00678           files.push_back(dirname + *it);
00679       }
00680     }
00681     return files;
00682   }
00683 
00684   bool JobController::ARCCopyFile(const URL& src, const URL& dst) {
00685 
00686     DataMover mover;
00687     mover.retry(true);
00688     mover.secure(false);
00689     mover.passive(true);
00690     mover.verbose(false);
00691 
00692     logger.msg(VERBOSE, "Now copying (from -> to)");
00693     logger.msg(VERBOSE, " %s -> %s", src.str(), dst.str());
00694 
00695     DataHandle source(src, usercfg);
00696     if (!source) {
00697       logger.msg(ERROR, "Unable to initialise connection to source: %s", src.str());
00698       return false;
00699     }
00700 
00701     DataHandle destination(dst, usercfg);
00702     if (!destination) {
00703       logger.msg(ERROR, "Unable to initialise connection to destination: %s",
00704                  dst.str());
00705       return false;
00706     }
00707 
00708     FileCache cache;
00709     DataStatus res =
00710       mover.Transfer(*source, *destination, cache, URLMap(), 0, 0, 0,
00711                      usercfg.Timeout());
00712     if (!res.Passed()) {
00713       if (!res.GetDesc().empty())
00714         logger.msg(ERROR, "File download failed: %s - %s", std::string(res), res.GetDesc());
00715       else
00716         logger.msg(ERROR, "File download failed: %s", std::string(res));
00717       return false;
00718     }
00719 
00720     return true;
00721 
00722   }
00723 
00724   bool JobController::RemoveJobs(const std::list<URL>& jobids) {
00725 
00726     logger.msg(VERBOSE, "Removing jobs from job list and job store");
00727 
00728     FileLock lock(usercfg.JobListFile());
00729     jobstorage.ReadFromFile(usercfg.JobListFile());
00730 
00731     for (std::list<URL>::const_iterator it = jobids.begin();
00732          it != jobids.end(); it++) {
00733 
00734       XMLNodeList xmljobs = jobstorage.XPathLookup("//Job[JobID='" + it->str() + "']", NS());
00735 
00736       if (xmljobs.empty())
00737         logger.msg(ERROR, "Job %s not found in job list.", it->str());
00738       else {
00739         XMLNode& xmljob = *xmljobs.begin();
00740         if (xmljob) {
00741           logger.msg(INFO, "Removing job %s from job list file", it->str());
00742           xmljob.Destroy();
00743         }
00744       }
00745 
00746       std::list<Job>::iterator it2 = jobstore.begin();
00747       while (it2 != jobstore.end()) {
00748         if (it2->JobID == *it) {
00749           it2 = jobstore.erase(it2);
00750           break;
00751         }
00752         it2++;
00753       }
00754     }
00755 
00756     jobstorage.SaveToFile(usercfg.JobListFile());
00757 
00758     logger.msg(VERBOSE, "Job store for %s now contains %d jobs", flavour, jobstore.size());
00759     logger.msg(VERBOSE, "Finished removing jobs from job list and job store");
00760 
00761     return true;
00762   }
00763 
00764   std::list<Job> JobController::GetJobDescriptions(const std::list<std::string>& status,
00765                                                    const bool getlocal) {
00766 
00767     GetJobInformation();
00768 
00769     // Only selected jobs with specified status
00770     std::list<Job> gettable;
00771     for (std::list<Job>::iterator it = jobstore.begin();
00772          it != jobstore.end(); it++) {
00773       if (!status.empty() && !it->State) {
00774         logger.msg(WARNING, "Job information not found: %s", it->JobID.str());
00775         continue;
00776       }
00777 
00778       if (!status.empty() && std::find(status.begin(), status.end(),
00779                                        it->State()) == status.end())
00780         continue;
00781       gettable.push_back(*it);
00782     }
00783 
00784     //First try to get descriptions from local job file
00785     if (getlocal) {
00786       logger.msg(VERBOSE, "Getting job decriptions from local job file");
00787       CheckLocalDescription(gettable);
00788     }
00789     else
00790       logger.msg(VERBOSE, "Disregarding job decriptions from local job file");
00791 
00792     // Try to get description from cluster
00793     for (std::list<Job>::iterator it = gettable.begin();
00794          it != gettable.end();) {
00795       if (!it->JobDescription.empty()) {
00796         it++;
00797         continue;
00798       }
00799       if (GetJobDescription(*it, it->JobDescription)) {
00800         logger.msg(VERBOSE, "Got job description for %s", it->JobID.str());
00801         it++;
00802       }
00803       else {
00804         logger.msg(INFO, "Failed getting job description for %s", it->JobID.str());
00805         it = gettable.erase(it);
00806       }
00807     }
00808     return gettable;
00809 
00810   }
00811 
00812   void JobController::CheckLocalDescription(std::list<Job>& jobs) {
00813     for (std::list<Job>::iterator it = jobs.begin();
00814          it != jobs.end();) {
00815       // Search for jobids
00816       XMLNodeList xmljobs =
00817         jobstorage.XPathLookup("//Job[JobID='" + it->JobID.str() + "']", NS());
00818 
00819       if (xmljobs.empty()) {
00820         logger.msg(INFO, "Job not found in job list: %s", it->JobID.str());
00821         it++;
00822         continue;
00823       }
00824       XMLNode& xmljob = *xmljobs.begin();
00825 
00826       if (xmljob["JobDescription"]) {
00827         JobDescription jobdesc;
00828         jobdesc.Parse((std::string)xmljob["JobDescription"]);
00829 
00830         // Check for valid job description
00831         if (jobdesc)
00832           logger.msg(VERBOSE, "Valid jobdescription found for: %s", it->JobID.str());
00833         else {
00834           logger.msg(INFO, "Invalid jobdescription found for: %s", it->JobID.str());
00835           it++;
00836           continue;
00837         }
00838 
00839         // Check checksums of local input files
00840         bool CKSUM = true;
00841         int size = xmljob["LocalInputFiles"].Size();
00842         for (int i = 0; i < size; i++) {
00843           const std::string file = (std::string)xmljob["LocalInputFiles"]["File"][i]["Source"];
00844           const std::string cksum_old = (std::string)xmljob["LocalInputFiles"]["File"][i]["CheckSum"];
00845           const std::string cksum_new = Submitter::GetCksum(file, usercfg);
00846           if (cksum_old != cksum_new) {
00847             logger.msg(WARNING, "Checksum of input file %s has changed.", file);
00848             CKSUM = false;
00849           }
00850           else
00851             logger.msg(VERBOSE, "Stored and new checksum of input file %s are identical.", file);
00852         }
00853         // Push_back job and job descriptions
00854         if (CKSUM) {
00855           logger.msg(INFO, "Job description for %s retrieved locally", it->JobID.str());
00856           it->JobDescription = (std::string)xmljob["JobDescription"];
00857           it++;
00858         }
00859         else {
00860           logger.msg(WARNING, "Job %s can not be resubmitted", it->JobID.str());
00861           it = jobs.erase(it);
00862         }
00863       }
00864       else {
00865         logger.msg(INFO, "Job description for %s could not be retrieved locally", it->JobID.str());
00866         it++;
00867       }
00868     } //end loop over jobs
00869     return;
00870   }
00871 
00872   JobControllerLoader::JobControllerLoader()
00873     : Loader(BaseConfig().MakeConfig(Config()).Parent()) {}
00874 
00875   JobControllerLoader::~JobControllerLoader() {
00876     for (std::list<JobController*>::iterator it = jobcontrollers.begin();
00877          it != jobcontrollers.end(); it++)
00878       delete *it;
00879   }
00880 
00881   JobController* JobControllerLoader::load(const std::string& name,
00882                                            const UserConfig& usercfg) {
00883     if (name.empty())
00884       return NULL;
00885 
00886     if(!factory_->load(FinderLoader::GetLibrariesList(),
00887                        "HED:JobController", name)) {
00888       logger.msg(ERROR, "JobController plugin \"%s\" not found.", name);
00889       return NULL;
00890     }
00891 
00892     JobControllerPluginArgument arg(usercfg);
00893     JobController *jobcontroller =
00894       factory_->GetInstance<JobController>("HED:JobController", name, &arg, false);
00895 
00896     if (!jobcontroller) {
00897       logger.msg(ERROR, "JobController %s could not be created", name);
00898       return NULL;
00899     }
00900 
00901     jobcontrollers.push_back(jobcontroller);
00902     logger.msg(INFO, "Loaded JobController %s", name);
00903     return jobcontroller;
00904   }
00905 
00906 } // namespace Arc