Back to index

nordugrid-arc-nox  1.1.0~rc6
job_request.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <iostream>
00006 #include <fstream>
00007 
00008 #include <glibmm.h>
00009 
00010 #include <arc/Logger.h>
00011 #include <arc/StringConv.h>
00012 #include <arc/URL.h>
00013 #include <arc/client/JobDescription.h>
00014 #include <arc/Utils.h>
00015 
00016 
00017 #include "users.h"
00018 #include "job.h"
00019 #include "../files/info_files.h"
00020 #include "../run/run_function.h"
00021 #include "../conf/environment.h"
00022 
00023 #include "job_request.h"
00024 
00025 #if defined __GNUC__ && __GNUC__ >= 3
00026 
00027 #include <limits>
00028 #define istream_readline(__f,__s,__n) {      \
00029    __f.get(__s,__n,__f.widen('\n'));         \
00030    if(__f.fail()) __f.clear();               \
00031    __f.ignore(std::numeric_limits<std::streamsize>::max(), __f.widen('\n')); \
00032 }
00033 
00034 #else
00035 
00036 #define istream_readline(__f,__s,__n) {      \
00037    __f.get(__s,__n,'\n');         \
00038    if(__f.fail()) __f.clear();               \
00039    __f.ignore(INT_MAX,'\n'); \
00040 }
00041 
00042 #endif
00043 
00044 
00045 static Arc::Logger& logger = Arc::Logger::getRootLogger();
00046 
00047 typedef enum {
00048   job_req_unknown,
00049   job_req_rsl,
00050 // #ifndef JSDL_MISSING
00051   job_req_jsdl
00052 // #endif
00053 } job_req_type_t;
00054 
00055 
00056 static int filter_rtes(const std::string& rte_root,const std::list<std::string>& rte) {
00057   int rtes = 0;
00058   if(rte_root.empty()) return rte.size();
00059   for(std::list<std::string>::const_iterator r = rte.begin();r!=rte.end();++r) {
00060     std::string path = Glib::build_filename(rte_root,*r);
00061     if(Glib::file_test(path,Glib::FILE_TEST_IS_REGULAR)) continue;
00062     ++rtes;
00063   }
00064   return rtes;
00065 }
00066 
00067 /* function for jobmanager-ng */
00068 bool parse_job_req_for_action(const char* fname,
00069                std::string &action,std::string &jobid,
00070                std::string &lrms,std::string &queue) {
00071   JobLocalDescription job_desc;
00072   std::string filename(fname);
00073   if(parse_job_req(filename,job_desc) == JobReqSuccess) {
00074     action=job_desc.action;
00075     jobid=job_desc.jobid;
00076     lrms=job_desc.lrms;
00077     queue=job_desc.queue;
00078     return true;
00079   };
00080   return false;
00081 }
00082 
00083 /* parse RSL and write few information files */
00084 bool process_job_req(JobUser &user,const JobDescription &desc) {
00085   JobLocalDescription job_desc;
00086   return process_job_req(user,desc,job_desc);
00087 }
00088 
00089 bool process_job_req(JobUser &user,const JobDescription &desc,JobLocalDescription &job_desc) {
00090   /* read local first to get some additional info pushed here by script */
00091   job_local_read_file(desc.get_id(),user,job_desc);
00092   /* some default values */
00093   job_desc.lrms=user.DefaultLRMS();
00094   job_desc.queue=user.DefaultQueue();
00095   job_desc.lifetime=Arc::tostring(user.KeepFinished());
00096   std::string filename;
00097   filename = user.ControlDir() + "/job." + desc.get_id() + ".description";
00098   if(parse_job_req(filename,job_desc) != JobReqSuccess) return false;
00099   if(job_desc.reruns>user.Reruns()) job_desc.reruns=user.Reruns();
00100   if((job_desc.diskspace>user.DiskSpace()) || (job_desc.diskspace==0)) {
00101     job_desc.diskspace=user.DiskSpace();
00102   };
00103   // Adjust number of rtes - exclude existing ones
00104   job_desc.rtes = filter_rtes(runtime_config_dir(),job_desc.rte);
00105   if(!job_local_write_file(desc,user,job_desc)) return false;
00106   if(!job_input_write_file(desc,user,job_desc.inputdata)) return false;
00107   if(!job_output_write_file(desc,user,job_desc.outputdata)) return false;
00108   if(!job_rte_write_file(desc,user,job_desc.rte)) return false;
00109   return true;
00110 }
00111 
00112 /* parse job request, fill job description (local) */
00113 JobReqResult parse_job_req(const std::string &fname,JobLocalDescription &job_desc,std::string* acl, std::string* failure) {
00114   Arc::JobDescription arc_job_desc;
00115   if (!get_arc_job_description(fname, arc_job_desc)) {
00116     if (failure) *failure = "Unable to read or parse job description.";
00117     return JobReqInternalFailure;
00118   }
00119 
00120   if (!arc_job_desc.Resources.RunTimeEnvironment.isResolved()) {
00121     if (failure)
00122       *failure = "Runtime environments have not been resolved.";
00123     return JobReqInternalFailure;
00124   }
00125 
00126   job_desc = arc_job_desc;
00127 
00128   if (acl) return get_acl(arc_job_desc, *acl);
00129   return JobReqSuccess;
00130 }
00131 
00132 typedef struct {
00133   const Arc::JobDescription* arc_job_desc;
00134   const std::string* session_dir;
00135 } set_execs_t;
00136 
00137 static int set_execs_callback(void* arg) {
00138   const Arc::JobDescription& arc_job_desc = *(((set_execs_t*)arg)->arc_job_desc);
00139   const std::string& session_dir = *(((set_execs_t*)arg)->session_dir);
00140   if (set_execs(arc_job_desc, session_dir)) return 0;
00141   return -1;
00142 };
00143 
00144 /* parse job description and set specified file permissions to executable */
00145 bool set_execs(const JobDescription &desc,const JobUser &user,const std::string &session_dir) {
00146   std::string fname = user.ControlDir() + "/job." + desc.get_id() + ".description";
00147   Arc::JobDescription arc_job_desc;
00148   if (!get_arc_job_description(fname, arc_job_desc)) return false;
00149 
00150   if (user.StrictSession()) {
00151     JobUser tmp_user(user.get_uid()==0?desc.get_uid():user.get_uid());
00152     set_execs_t arg; arg.arc_job_desc=&arc_job_desc; arg.session_dir=&session_dir;
00153     return (RunFunction::run(tmp_user, "set_execs", &set_execs_callback, &arg, 20) == 0);
00154   }
00155   return set_execs(arc_job_desc, session_dir);
00156 }
00157 
00158 bool write_grami(const JobDescription &desc,const JobUser &user,const char *opt_add) {
00159   const std::string fname = user.ControlDir() + "/job." + desc.get_id() + ".description";
00160 
00161   Arc::JobDescription arc_job_desc;
00162   if (!get_arc_job_description(fname, arc_job_desc)) return false;
00163 
00164   return write_grami(arc_job_desc, desc, user, opt_add);
00165 }
00166 
00167 /* extract joboption_jobid from grami file */
00168 std::string read_grami(const JobId &job_id,const JobUser &user) {
00169   const char* local_id_param = "joboption_jobid=";
00170   int l = strlen(local_id_param);
00171   std::string id = "";
00172   char buf[256];
00173   std::string fgrami = user.ControlDir() + "/job." + job_id + ".grami";
00174   std::ifstream f(fgrami.c_str());
00175   if(!f.is_open()) return id;
00176   for(;!f.eof();) {
00177     istream_readline(f,buf,sizeof(buf));
00178     if(strncmp(local_id_param,buf,l)) continue;
00179     if(buf[0]=='\'') {
00180       l++; int ll = strlen(buf);
00181       if(buf[ll-1]=='\'') buf[ll-1]=0;
00182     };
00183     id=buf+l; break;
00184   };
00185   f.close();
00186   return id;
00187 }