Back to index

nordugrid-arc-nox  1.1.0~rc6
arcresub.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <fstream>
00008 #include <iostream>
00009 #include <list>
00010 #include <string>
00011 #include <sys/types.h>
00012 #include <sys/stat.h>
00013 #include <unistd.h>
00014 
00015 #include <arc/ArcConfig.h>
00016 #include <arc/ArcLocation.h>
00017 #include <arc/DateTime.h>
00018 #include <arc/FileLock.h>
00019 #include <arc/IString.h>
00020 #include <arc/Logger.h>
00021 #include <arc/OptionParser.h>
00022 #include <arc/StringConv.h>
00023 #include <arc/Utils.h>
00024 #include <arc/URL.h>
00025 #include <arc/XMLNode.h>
00026 #include <arc/client/Submitter.h>
00027 #include <arc/client/JobDescription.h>
00028 #include <arc/client/JobController.h>
00029 #include <arc/client/JobSupervisor.h>
00030 #include <arc/client/TargetGenerator.h>
00031 #include <arc/UserConfig.h>
00032 #include <arc/client/Broker.h>
00033 
00034 int main(int argc, char **argv) {
00035   // arcresub: resubmit the selected jobs to freshly brokered targets, then kill (and unless --keep, clean) the originals. Returns 1 on errors, 0 otherwise.
00036   setlocale(LC_ALL, "");
00037   // Log to stderr; only warnings and worse are shown until --debug or the configuration says otherwise.
00038   Arc::Logger logger(Arc::Logger::getRootLogger(), "arcresub");
00039   Arc::LogStream logcerr(std::cerr);
00040   logcerr.setFormat(Arc::ShortFormat);
00041   Arc::Logger::getRootLogger().addDestination(logcerr);
00042   Arc::Logger::getRootLogger().setThreshold(Arc::WARNING);
00043 
00044   Arc::ArcLocation::Init(argv[0]);
00045   // Command line options; each AddOption binds an option to the local variable declared just above it.
00046   Arc::OptionParser options(istring("[job ...]\n"));
00047 
00048   bool all = false;
00049   options.AddOption('a', "all",
00050                     istring("all jobs"),
00051                     all);
00052 
00053   std::string joblist;
00054   options.AddOption('j', "joblist",
00055                     istring("file where the jobs will be stored"),
00056                     istring("filename"),
00057                     joblist);
00058   // Clusters used to select the existing jobs.
00059   std::list<std::string> clusters;
00060   options.AddOption('c', "cluster",
00061                     istring("explicity select or reject a specific cluster"),
00062                     istring("[-]name"),
00063                     clusters);
00064   // Clusters to consider (or reject, with a leading '-') for the resubmitted jobs.
00065   std::list<std::string> qlusters;
00066   options.AddOption('q', "qluster",
00067                     istring("explicity select or reject a specific cluster "
00068                             "for the new job"),
00069                     istring("[-]name"),
00070                     qlusters);
00071 
00072   std::list<std::string> indexurls;
00073   options.AddOption('i', "index",
00074                     istring("explicity select or reject an index server"),
00075                     istring("[-]name"),
00076                     indexurls);
00077 
00078   bool keep = false;
00079   options.AddOption('k', "keep",
00080                     istring("keep the files on the server (do not clean)"),
00081                     keep);
00082 
00083   bool same = false;
00084   options.AddOption('m', "same",
00085                     istring("resubmit to the same cluster"),
00086                     same);
00087 
00088   std::list<std::string> status;
00089   options.AddOption('s', "status",
00090                     istring("only select jobs whose status is statusstr"),
00091                     istring("statusstr"),
00092                     status);
00093   // -1 means "not given on the command line"; only positive values override the configuration.
00094   int timeout = -1;
00095   options.AddOption('t', "timeout", istring("timeout in seconds (default 20)"),
00096                     istring("seconds"), timeout);
00097 
00098   std::string conffile;
00099   options.AddOption('z', "conffile",
00100                     istring("configuration file (default ~/.arc/client.conf)"),
00101                     istring("filename"), conffile);
00102 
00103   std::string debug;
00104   options.AddOption('d', "debug",
00105                     istring("FATAL, ERROR, WARNING, INFO, VERBOSE or DEBUG"),
00106                     istring("debuglevel"), debug);
00107 
00108   std::string broker;
00109   options.AddOption('b', "broker",
00110                     istring("select broker method (Random (default), FastestQueue, or custom)"),
00111                     istring("broker"), broker);
00112 
00113   bool version = false;
00114   options.AddOption('v', "version", istring("print version information"),
00115                     version);
00116 
00117   // Whatever Parse returns (the "[job ...]" arguments) is the explicit job selection.
00118   std::list<std::string> jobs = options.Parse(argc, argv);
00119   // Answer --version first so that it works even when the configuration cannot be loaded.
00120   if (version) {
00121     std::cout << Arc::IString("%s version %s", "arcresub", VERSION)
00122               << std::endl;
00123     return 0;
00124   }
00125 
00126   // If debug is specified as argument, it should be set before loading the configuration.
00127   if (!debug.empty())
00128     Arc::Logger::getRootLogger().setThreshold(Arc::string_to_level(debug));
00129 
00130   Arc::UserConfig usercfg(conffile, joblist);
00131   if (!usercfg) {
00132     logger.msg(Arc::ERROR, "Failed configuration initialization");
00133     return 1;
00134   }
00135   // The configured verbosity only applies when --debug was not given.
00136   if (debug.empty() && !usercfg.Verbosity().empty())
00137     Arc::Logger::getRootLogger().setThreshold(Arc::string_to_level(usercfg.Verbosity()));
00138   // Command line overrides for timeout and broker.
00139   if (timeout > 0)
00140     usercfg.Timeout(timeout);
00141 
00142   if (!broker.empty())
00143     usercfg.Broker(broker);
00144   // A job list file or a status filter without explicit jobs/clusters implies --all.
00145   if ((!joblist.empty() || !status.empty()) && jobs.empty() && clusters.empty())
00146     all = true;
00147 
00148   if (jobs.empty() && clusters.empty() && !all) {
00149     logger.msg(Arc::ERROR, "No jobs given");
00150     return 1;
00151   }
00152 
00153   // Different selected services are needed in two different contexts:
00154   // usercfg is used to find the existing jobs, usercfg2 (a copy taken
00155   // here) is used to find targets for and submit the new jobs.
00156   Arc::UserConfig usercfg2 = usercfg;
00157 
00158   if (!jobs.empty() || all)
00159     usercfg.ClearSelectedServices();
00160 
00161   if (!clusters.empty()) {
00162     usercfg.ClearSelectedServices();
00163     usercfg.AddServices(clusters, Arc::COMPUTING);
00164   }
00165 
00166   Arc::JobSupervisor jobmaster(usercfg, jobs);
00167   if (!jobmaster.JobsFound()) {
00168     std::cout << "No jobs" << std::endl;
00169     return 0;
00170   }
00171   std::list<Arc::JobController*> jobcont = jobmaster.GetJobControllers();
00172 
00173   // Jobs were found, but without any job controller their descriptions
00174   // cannot be retrieved, so there is nothing that could be resubmitted.
00175   if (jobcont.empty()) {
00176     logger.msg(Arc::ERROR, "No job controller plugins loaded");
00177     return 1;
00178   }
00179 
00180   // From here on 'jobs' collects the IDs of the jobs that were successfully resubmitted.
00181   jobs.clear();
00182   // Gather the jobs (with their descriptions) matching the status filter from every controller.
00183   std::list<Arc::Job> toberesubmitted;
00184   for (std::list<Arc::JobController*>::iterator it = jobcont.begin();
00185        it != jobcont.end(); ++it) {
00186     std::list<Arc::Job> cont_jobs;
00187     cont_jobs = (*it)->GetJobDescriptions(status, true);
00188     toberesubmitted.insert(toberesubmitted.begin(), cont_jobs.begin(), cont_jobs.end());
00189   }
00190   if (toberesubmitted.empty()) {
00191     logger.msg(Arc::ERROR, "No jobs to resubmit");
00192     return 1;
00193   }
00194   // --same ignores any -q selection; an explicit -q/-i selection replaces the configured services.
00195   if (same) {
00196     qlusters.clear();
00197     usercfg2.ClearSelectedServices();
00198   }
00199   else if (!qlusters.empty() || !indexurls.empty())
00200     usercfg2.ClearSelectedServices();
00201 
00202   // With --same select exactly the old clusters, otherwise reject them
00203   for (std::list<Arc::Job>::iterator it = toberesubmitted.begin();
00204        it != toberesubmitted.end(); ++it)
00205     if (same) {
00206       qlusters.push_back(it->Flavour + ":" + it->Cluster.str());
00207       logger.msg(Arc::VERBOSE, "Trying to resubmit job to %s", qlusters.back());
00208     }
00209     else {
00210       qlusters.remove(it->Flavour + ":" + it->Cluster.str());
00211       qlusters.push_back("-" + it->Flavour + ":" + it->Cluster.str());
00212       logger.msg(Arc::VERBOSE, "Disregarding %s", it->Cluster.str());
00213     }
00214   qlusters.sort();
00215   qlusters.unique();
00216   // Several jobs may share a cluster, hence the sort/unique above.
00217   usercfg2.AddServices(qlusters, Arc::COMPUTING);
00218   if (!same && !indexurls.empty())
00219     usercfg2.AddServices(indexurls, Arc::INDEX);
00220 
00221   // Discover the candidate targets for the resubmission
00222   Arc::TargetGenerator targen(usercfg2);
00223   targen.GetTargets(0, 1);
00224 
00225   if (targen.FoundTargets().empty()) {
00226     std::cout << Arc::IString("Job submission aborted because no clusters returned any information") << std::endl;
00227     return 1;
00228   }
00229   // Load the broker named in the same configuration that is used for submission.
00230   Arc::BrokerLoader loader;
00231   Arc::Broker *ChosenBroker = loader.load(usercfg2.Broker().first, usercfg2);
00232   if (!ChosenBroker) {
00233     logger.msg(Arc::ERROR, "Unable to load broker %s", usercfg2.Broker().first);
00234     return 1;
00235   }
00236   logger.msg(Arc::INFO, "Broker %s loaded", usercfg2.Broker().first);
00237 
00238   // Loop over jobs
00239   for (std::list<Arc::Job>::iterator it = toberesubmitted.begin();
00240        it != toberesubmitted.end(); ++it) {
00241     // Rebuild the description and append the old job ID to its list of previous IDs.
00242     Arc::JobDescription jobdesc;
00243     jobdesc.Parse(it->JobDescription);
00244     jobdesc.Identification.ActivityOldId = it->ActivityOldId;
00245     jobdesc.Identification.ActivityOldId.push_back(it->JobID.str());
00246     ChosenBroker->PreFilterTargets(targen.ModifyFoundTargets(), jobdesc);
00247     while (true) {
00248       const Arc::ExecutionTarget* target = ChosenBroker->GetBestTarget();
00249       if (!target) {
00250         std::cout << Arc::IString("Job submission failed, no more possible targets") << std::endl;
00251         break;
00252       }
00253 
00254       Arc::Submitter *submitter = target->GetSubmitter(usercfg2);
00255       // Submit the job. A null submitter is never dereferenced; it yields an
00256       // empty URL and is therefore handled like any other failed submission.
00257       Arc::URL jobid = submitter ? submitter->Submit(jobdesc, *target) : Arc::URL();
00258       if (!jobid) {
00259         std::cout << Arc::IString("Submission to %s failed, trying next target", target->url.str()) << std::endl;
00260         continue;
00261       }
00262 
00263       ChosenBroker->RegisterJobsubmission();
00264       std::cout << Arc::IString("Job resubmitted with new jobid: %s",
00265                                 jobid.str()) << std::endl;
00266       // Remember the OLD job ID: it is the one to kill/clean below.
00267       jobs.push_back(it->JobID.str());
00268       break;
00269     } //end loop over all possible targets
00270   } //end loop over all job descriptions
00271   // Nothing was resubmitted, so no old job may be removed.
00272   if (jobs.empty())
00273     return 0;
00274 
00275   usercfg.ClearSelectedServices();
00276 
00277   // Only kill and clean jobs that have been resubmitted
00278   Arc::JobSupervisor killmaster(usercfg, jobs);
00279   if (!killmaster.JobsFound()) {
00280     std::cout << "No jobs" << std::endl;
00281     return 0;
00282   }
00283   std::list<Arc::JobController*> killcont = killmaster.GetJobControllers();
00284   if (killcont.empty()) {
00285     logger.msg(Arc::ERROR, "No job controller plugins loaded");
00286     return 1;
00287   }
00288   // When Kill fails and files need not be kept, fall back to Clean; warn only if that fails too.
00289   for (std::list<Arc::JobController*>::iterator it = killcont.begin();
00290        it != killcont.end(); ++it)
00291     if (!(*it)->Kill(status, keep) &&
00292         !keep &&
00293         !(*it)->Clean(status, true))
00294       logger.msg(Arc::WARNING, "Job could not be killed or cleaned");
00295   // Disabled summary output: it relies on a 'notresubmitted' list that is not maintained above.
00296   /*
00297      if (toberesubmitted.size() > 1) {
00298      std::cout << std::endl << Arc::IString("Job submission summary:")
00299      << std::endl;
00300      std::cout << "-----------------------" << std::endl;
00301      std::cout << Arc::IString("%d of %d jobs were submitted",
00302      toberesubmitted.size() - notresubmitted.size(),
00303      toberesubmitted.size()) << std::endl;
00304      if (notresubmitted.size()) {
00305      std::cout << Arc::IString("The following %d were not submitted",
00306      notresubmitted.size()) << std::endl;
00307      }
00308      }*/
00309   return 0;
00310 }