Back to index

nordugrid-arc-nox  1.1.0~rc6
job_log.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 /* write essential information about job started/finished */
00006 #include <fstream>
00007 #include <arc/StringConv.h>
00008 #include <arc/DateTime.h>
00009 #include "../files/info_types.h"
00010 #include "../files/info_log.h"
00011 #include "../conf/environment.h"
00012 #include "job_log.h"
00013 #include <unistd.h>
00014 
00015 #if defined __GNUC__ && __GNUC__ >= 3
00016 
00017 #include <limits>
00018 #define istream_readline(__f,__s,__n) {      \
00019    __f.get(__s,__n,__f.widen('\n'));         \
00020    if(__f.fail()) __f.clear();               \
00021    __f.ignore(std::numeric_limits<std::streamsize>::max(), __f.widen('\n')); \
00022 }
00023 
00024 #else
00025 
00026 #define istream_readline(__f,__s,__n) {      \
00027    __f.get(__s,__n,'\n');         \
00028    if(__f.fail()) __f.clear();               \
00029    __f.ignore(INT_MAX,'\n'); \
00030 }
00031 
00032 #endif
00033 
00034 JobLog::JobLog(void):filename(""),proc(NULL),last_run(0),ex_period(0) {
00035 }
00036 
00037 JobLog::JobLog(const char* fname):proc(NULL),last_run(0),ex_period(0) {
00038   filename=fname;
00039 }
00040 
00041 void JobLog::SetOutput(const char* fname) {
00042   filename=fname;
00043 }
00044 
00045 void JobLog::SetExpiration(time_t period) {
00046   ex_period=period;
00047 }
00048 
00049 bool JobLog::open_stream(std::ofstream &o) {
00050     o.open(filename.c_str(),std::ofstream::app);
00051     if(!o.is_open()) return false;
00052     o<<" ";
00053     o<<(Arc::Time().str(Arc::UserTime));
00054     return true;
00055 }
00056 
00057 bool JobLog::start_info(JobDescription &job,const JobUser &user) {
00058   if(filename.length()==0) return true;
00059     std::ofstream o;
00060     if(!open_stream(o)) return false;
00061     o<<"Started - job id: "<<job.get_id()<<", unix user: "<<job.get_uid()<<":"<<job.get_gid()<<", ";
00062     if(job.GetLocalDescription(user)) {
00063       JobLocalDescription *job_desc = job.get_local();
00064       std::string tmps;
00065       tmps=job_desc->jobname; make_escaped_string(tmps,'"');
00066       o<<"name: \""<<tmps<<"\", ";
00067       tmps=job_desc->DN; make_escaped_string(tmps,'"');
00068       o<<"owner: \""<<tmps<<"\", ";
00069       o<<"lrms: "<<job_desc->lrms<<", queue: "<<job_desc->queue;
00070     };
00071     o<<std::endl;
00072     o.close();
00073     return true;
00074 }
00075 
00076 bool JobLog::finish_info(JobDescription &job,const JobUser &user) {
00077   if(filename.length()==0) return true;
00078     std::ofstream o;
00079     if(!open_stream(o)) return false;
00080     o<<"Finished - job id: "<<job.get_id()<<", unix user: "<<job.get_uid()<<":"<<job.get_gid()<<", ";
00081     std::string tmps;
00082     if(job.GetLocalDescription(user)) {
00083       JobLocalDescription *job_desc = job.get_local();
00084       tmps=job_desc->jobname; make_escaped_string(tmps,'"');
00085       o<<"name: \""<<tmps<<"\", ";
00086       tmps=job_desc->DN; make_escaped_string(tmps,'"');
00087       o<<"owner: \""<<tmps<<"\", ";
00088       o<<"lrms: "<<job_desc->lrms<<", queue: "<<job_desc->queue;
00089       if(job_desc->localid.length() >0) o<<", lrmsid: "<<job_desc->localid;
00090     };
00091     tmps = job.GetFailure();
00092     if(tmps.length()) {
00093       for(std::string::size_type i=0;;) {
00094         i=tmps.find('\n',i);
00095         if(i==std::string::npos) break;
00096         tmps[i]='.';
00097       };
00098       make_escaped_string(tmps,'"');
00099       o<<", failure: \""<<tmps<<"\"";
00100     };
00101     o<<std::endl;
00102     o.close();
00103     return true;
00104 } 
00105 
00106 
00107 bool JobLog::read_info(std::fstream &i,bool &processed,bool &jobstart,struct tm &t,JobId &jobid,JobLocalDescription &job_desc,std::string &failure) {
00108   processed=false;
00109   if(!i.is_open()) return false;
00110   char line[4096];
00111   std::streampos start_p=i.tellp();
00112   istream_readline(i,line,sizeof(line));
00113   std::streampos end_p=i.tellp();
00114   if((line[0] == 0) || (line[0] == '*')) { processed=true; return true; };
00115   char* p = line;
00116   if((*p) == ' ') p++;
00117   // struct tm t;
00118   /* read time */
00119   if(sscanf(p,"%d-%d-%d %d:%d:%d ",
00120        &t.tm_mday,&t.tm_mon,&t.tm_year,&t.tm_hour,&t.tm_min,&t.tm_sec) != 6) {
00121     return false;
00122   };
00123   t.tm_year-=1900;
00124   t.tm_mon-=1;
00125   /* skip time */
00126   for(;(*p) && ((*p)==' ');p++) {} if(!(*p)) return false;
00127   for(;(*p) && ((*p)!=' ');p++) {} if(!(*p)) return false;
00128   for(;(*p) && ((*p)==' ');p++) {} if(!(*p)) return false;
00129   for(;(*p) && ((*p)!=' ');p++) {} if(!(*p)) return false;
00130   for(;(*p) && ((*p)==' ');p++) {} if(!(*p)) return false;
00131   // bool jobstart;
00132   if(strncmp("Finished - ",p,11) == 0) {
00133     jobstart=false; p+=11;
00134   } else if(strncmp("Started - ",p,10) == 0) {
00135     jobstart=true; p+=10;
00136   } else {
00137     return false;
00138   };
00139   /* read values */
00140   char* name;
00141   char* value;
00142   char* pp;
00143   for(;;) {
00144     for(;(*p) && ((*p)==' ');p++) {} if(!(*p)) break;
00145     if((pp=strchr(p,':')) == NULL) break;
00146     name=p; (*pp)=0; pp++;
00147     for(;(*pp) && ((*pp)==' ');pp++) {}
00148     value=pp;
00149     if((*value) == '"') {
00150       value++;
00151       pp=make_unescaped_string(value,'"');
00152       for(;(*pp) && ((*pp) != ',');pp++) {}
00153       if((*pp)) pp++;
00154     } else {
00155       for(;(*pp) && ((*pp) != ',');pp++) {}
00156       if((*pp)) { (*pp)=0; pp++; };
00157     };
00158     p=pp;
00159     /* use name:value pair */
00160     if(strcasecmp("job id",name) == 0) {
00161       jobid=value;
00162     } else if(strcasecmp("name",name) == 0) {
00163       job_desc.jobname=value;
00164     } else if(strcasecmp("unix user",name) == 0) {
00165 
00166     } else if(strcasecmp("owner",name) == 0) {
00167       job_desc.DN=value;
00168     } else if(strcasecmp("lrms",name) == 0) {
00169       job_desc.lrms=value;
00170     } else if(strcasecmp("queue",name) == 0) {
00171       job_desc.queue=value;
00172     } else if(strcasecmp("lrmsid",name) == 0) {
00173       job_desc.localid=value;
00174     } else if(strcasecmp("failure",name) == 0) {
00175       failure=value;
00176     } else {
00177 
00178     };
00179   };
00180   i.seekp(start_p); i<<"*"; i.seekp(end_p);
00181   return true;
00182 }
00183 
00184 #ifndef NO_GLOBUS_CODE
00185 
00186 bool JobLog::RunReporter(JobUsers &users) {
00187   //if(!is_reporting()) return true;
00188   if(proc != NULL) {
00189     if(proc->Running()) return true; /* running */
00190     delete proc;
00191     proc=NULL;
00192   };
00193   if(time(NULL) < (last_run+3600)) return true; // once per hour
00194   last_run=time(NULL);
00195   if(users.size() <= 0) return true; // no users to report
00196   const char** args = (const char**)malloc(sizeof(char*)*(users.size()+6)); 
00197   if(args == NULL) return false;
00198   std::string cmd = nordugrid_libexec_loc()+"/logger";
00199   int argc=0; args[argc++]=(char*)cmd.c_str();
00200   std::string ex_str = Arc::tostring(ex_period);
00201   if(ex_period) {
00202     args[argc++]="-E";
00203     args[argc++]=(char*)ex_str.c_str();
00204   };
00205   for(JobUsers::iterator i = users.begin();i != users.end();++i) {
00206     args[argc++]=(char*)(i->ControlDir().c_str());
00207   };
00208   args[argc]=NULL;
00209   JobUser user(getuid());
00210   user.SetControlDir(users.begin()->ControlDir());
00211   bool res = RunParallel::run(user,"logger",args,&proc,false,false);
00212   free(args);
00213   return res;
00214 }
00215 
00216 #endif // NO_GLOBUS_CODE
00217 
00218 bool JobLog::SetReporter(const char* destination) {
00219   if(destination) urls.push_back(std::string(destination));
00220   return true;
00221 }
00222 
00223 bool JobLog::make_file(JobDescription &job,JobUser &user) {
00224   //if(!is_reporting()) return true;
00225   if((job.get_state() != JOB_STATE_ACCEPTED) &&
00226      (job.get_state() != JOB_STATE_FINISHED)) return true;
00227   bool result = true;
00228   // for configured loggers
00229   for(std::list<std::string>::iterator u = urls.begin();u!=urls.end();u++) {
00230     if(u->length()) result = job_log_make_file(job,user,*u,report_config) && result;
00231   };
00232   // for user's logger
00233   JobLocalDescription* local;
00234   if(!job.GetLocalDescription(user)) {
00235     result=false;
00236   } else if((local=job.get_local()) == NULL) { 
00237     result=false;
00238   } else {
00239     if(!(local->jobreport.empty())) 
00240     {
00241       for (std::list<std::string>::iterator v = local->jobreport.begin();
00242           v!=local->jobreport.end(); v++)
00243        {
00244          result = job_log_make_file(job,user,*v,report_config) && result;
00245        }
00246     };
00247   };
00248   return result;
00249 }
00250 
00251 void JobLog::set_credentials(std::string &key_path,std::string &certificate_path,std::string &ca_certificates_dir)
00252 {
00253   if (!key_path.empty()) 
00254     report_config.push_back(std::string("key_path=")+key_path);
00255   if (!certificate_path.empty())
00256     report_config.push_back(std::string("certificate_path=")+certificate_path);
00257   if (!ca_certificates_dir.empty())
00258     report_config.push_back(std::string("ca_certificates_dir=")+ca_certificates_dir);
00259 }
00260 
00261 JobLog::~JobLog(void) {
00262 #ifndef NO_GLOBUS_CODE
00263   if(proc != NULL) {
00264     if(proc->Running()) proc->Kill(0);
00265     delete proc;
00266     proc=NULL;
00267   };
00268 #endif // NO_GLOBUS_CODE
00269 }
00270