Back to index

nordugrid-arc-nox  1.1.0~rc6
Submitter.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <arc/ArcConfig.h>
00008 #include <arc/FileLock.h>
00009 #include <arc/StringConv.h>
00010 #include <arc/Thread.h>
00011 #include <arc/client/ExecutionTarget.h>
00012 #include <arc/client/JobDescription.h>
00013 #include <arc/client/Submitter.h>
00014 #include <arc/UserConfig.h>
00015 #include <arc/data/FileCache.h>
00016 #include <arc/data/CheckSum.h>
00017 #include <arc/data/DataBuffer.h>
00018 #include <arc/data/DataMover.h>
00019 #include <arc/data/DataHandle.h>
00020 #include <arc/data/URLMap.h>
00021 #include <arc/loader/FinderLoader.h>
00022 
00023 
00024 namespace Arc {
00025 
  /// Logger used by Submitter; constructed from the root logger with the
  /// name "Submitter".
  Logger Submitter::logger(Logger::getRootLogger(), "Submitter");
00027 
00028   Submitter::Submitter(const UserConfig& usercfg,
00029                        const std::string& flavour)
00030     : flavour(flavour),
00031       usercfg(usercfg) {}
00032 
  /// Destructor; the body is empty, no explicit cleanup is performed here.
  Submitter::~Submitter() {}
00034 
00035   bool Submitter::PutFiles(const JobDescription& job, const URL& url) const {
00036 
00037     FileCache cache;
00038     DataMover mover;
00039     mover.retry(true);
00040     mover.secure(false);
00041     mover.passive(true);
00042     mover.verbose(false);
00043 
00044     for (std::list<FileType>::const_iterator it = job.DataStaging.File.begin();
00045          it != job.DataStaging.File.end(); it++)
00046       if (!it->Source.empty()) {
00047         const URL& src = it->Source.begin()->URI;
00048         if (src.Protocol() == "file") {
00049           URL dst(std::string(url.str() + '/' + it->Name));
00050           DataHandle source(src, usercfg);
00051           DataHandle destination(dst, usercfg);
00052           DataStatus res =
00053             mover.Transfer(*source, *destination, cache, URLMap(), 0, 0, 0,
00054                            usercfg.Timeout());
00055           if (!res.Passed()) {
00056             if (!res.GetDesc().empty())
00057               logger.msg(ERROR, "Failed uploading file: %s - %s",
00058                          std::string(res), res.GetDesc());
00059             else
00060               logger.msg(ERROR, "Failed uploading file: %s", std::string(res));
00061             return false;
00062           }
00063         }
00064       }
00065 
00066     return true;
00067   }
00068 
00069   // TODO: This method is not scaling well in case of many submitted jobs.
00070   // For few thousands it may take tens of second to finish. Taking into
00071   // account it is called after every job submission it needs rewriting.
00072   void Submitter::AddJob(const JobDescription& job, const URL& jobid,
00073                          const URL& cluster,
00074                          const URL& infoendpoint,
00075                          const std::map<std::string, std::string>& additionalInfo) const {
00076     NS ns;
00077     XMLNode info(ns, "Job");
00078     info.NewChild("JobID") = jobid.str();
00079     if (!job.Identification.JobName.empty())
00080       info.NewChild("Name") = job.Identification.JobName;
00081     info.NewChild("Flavour") = flavour;
00082     info.NewChild("Cluster") = cluster.str();
00083     info.NewChild("InfoEndpoint") = infoendpoint.str();
00084     info.NewChild("LocalSubmissionTime") = (std::string)Arc::Time();
00085 
00086     for (std::map<std::string, std::string>::const_iterator it = additionalInfo.begin();
00087          it != additionalInfo.end(); it++)
00088       info.NewChild(it->first) = it->second;
00089 
00090     for (std::list<std::string>::const_iterator
00091            it = job.Identification.ActivityOldId.begin();
00092          it != job.Identification.ActivityOldId.end(); it++)
00093       info.NewChild("OldJobID") = *it;
00094 
00095     std::string rep = job.UnParse("arcjsdl");
00096     info.NewChild("JobDescription") = (std::string)rep;
00097 
00098     for (std::list<FileType>::const_iterator it = job.DataStaging.File.begin();
00099          it != job.DataStaging.File.end(); it++)
00100       if (!it->Source.empty())
00101         if (it->Source.begin()->URI.Protocol() == "file") {
00102           if (!info["LocalInputFiles"])
00103             info.NewChild("LocalInputFiles");
00104           XMLNode File = info["LocalInputFiles"].NewChild("File");
00105           File.NewChild("Source") = it->Name;
00106           File.NewChild("CheckSum") = GetCksum(it->Source.begin()->URI.Path());
00107         }
00108 
00109     FileLock lock(usercfg.JobListFile());
00110     Config jobstorage;
00111     jobstorage.ReadFromFile(usercfg.JobListFile());
00112     jobstorage.NewChild(info);
00113     jobstorage.SaveToFile(usercfg.JobListFile());
00114   }
00115 
00116   std::string Submitter::GetCksum(const std::string& file, const UserConfig& usercfg) {
00117     DataHandle source(file, usercfg);
00118     DataBuffer buffer;
00119 
00120     MD5Sum md5sum;
00121     buffer.set(&md5sum);
00122 
00123     if (!source->StartReading(buffer))
00124       return "";
00125 
00126     int handle;
00127     unsigned int length;
00128     unsigned long long int offset;
00129 
00130     while (buffer.for_write() || !buffer.eof_read())
00131       if (buffer.for_write(handle, length, offset, true))
00132         buffer.is_written(handle);
00133 
00134     if (!source->StopReading())
00135       return "";
00136 
00137     if (!buffer.checksum_valid())
00138       return "";
00139 
00140     char buf[100];
00141     md5sum.print(buf, 100);
00142     return buf;
00143   }
00144 
  /// Constructs the loader.
  /**
   * The Loader base class is initialised with the parent of the node
   * returned by BaseConfig().MakeConfig() for a default-constructed Config.
   */
  SubmitterLoader::SubmitterLoader()
    : Loader(BaseConfig().MakeConfig(Config()).Parent()) {}
00147 
00148   SubmitterLoader::~SubmitterLoader() {
00149     for (std::list<Submitter*>::iterator it = submitters.begin();
00150          it != submitters.end(); it++)
00151       delete *it;
00152   }
00153 
00154   Submitter* SubmitterLoader::load(const std::string& name,
00155                                    const UserConfig& usercfg) {
00156     if (name.empty())
00157       return NULL;
00158 
00159     if(!factory_->load(FinderLoader::GetLibrariesList(),
00160                        "HED:Submitter", name)) {
00161       logger.msg(ERROR, "Submitter plugin \"%s\" not found.", name);
00162       return NULL;
00163     }
00164 
00165     SubmitterPluginArgument arg(usercfg);
00166     Submitter *submitter =
00167       factory_->GetInstance<Submitter>("HED:Submitter", name, &arg, false);
00168 
00169     if (!submitter) {
00170       logger.msg(ERROR, "Submitter %s could not be created", name);
00171       return NULL;
00172     }
00173 
00174     submitters.push_back(submitter);
00175     logger.msg(INFO, "Loaded Submitter %s", name);
00176     return submitter;
00177   }
00178 
00179 } // namespace Arc