Back to index

nordugrid-arc-nox  1.1.0~rc6
states.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 /*
  Filename: states.cpp
00007   keeps list of states
00008   acts on states
00009 */
00010 
00011 #include <string>
00012 #include <list>
00013 #include <iostream>
00014 
00015 #include "../files/info_files.h"
00016 #include "../jobs/job_request.h"
00017 #include "../run/run_parallel.h"
00018 #include "../conf/environment.h"
00019 #include "../mail/send_mail.h"
00020 /* #include "../url/url_options.h" */
00021 #include "../log/job_log.h"
00022 #include "../conf/conf_file.h"
00023 #include "../jobs/users.h"
00024 #include "../jobs/job.h"
00025 #include "../jobs/plugins.h"
00026 #ifdef HAVE_MYPROXY_H
00027 #include "../misc/proxy.h"
00028 #include "../misc/myproxy_proxy.h"
00029 #endif
00030 #include <iostream>
00031 #include <sys/types.h>
00032 #include <sys/stat.h>
00033 #include <unistd.h>
00034 #include <glibmm.h>
00035 #include <arc/DateTime.h>
00036 #include <arc/StringConv.h>
00037 #include <arc/URL.h>
00038 #include <arc/credential/VOMSUtil.h>
00039 
// File-scope logger; all messages from this translation unit go to the
// ARC root logger.
static Arc::Logger& logger = Arc::Logger::getRootLogger();
00041 
00042 #include "states.h"
00043 
00044 
// Definitions of JobsList class-wide counters and configuration knobs.
// The values below are compile-time defaults; most are expected to be
// overridden from the grid-manager configuration.  A value of -1 means
// "no limit" for the max_* settings.
int JobsList::jobs_num[JOB_STATE_NUM] = { 0, 0, 0, 0, 0, 0, 0, 0, 0 };
int JobsList::jobs_pending = 0;
int JobsList::max_jobs_processing=DEFAULT_MAX_JOBS;
int JobsList::max_jobs_processing_emergency=1;
int JobsList::max_jobs_running=-1;
int JobsList::max_jobs=-1;
int JobsList::max_downloads=-1;
unsigned int JobsList::max_processing_share = 0;
std::map<std::string, int> JobsList::limited_share;
std::string JobsList::share_type = "";
unsigned long long int JobsList::min_speed=0;
time_t JobsList::min_speed_time=300;
unsigned long long int JobsList::min_average_speed=0;
time_t JobsList::max_inactivity_time=300;
int JobsList::max_retries=DEFAULT_MAX_RETRIES;
bool JobsList::use_secure_transfer=false; /* secure data transfer is OFF by default !!! */
bool JobsList::use_passive_transfer=false;
bool JobsList::use_local_transfer=false;
unsigned int JobsList::wakeup_period = 120; // default wakeup every 120 s (2 min)
00064 
00065 
#ifdef NO_GLOBUS_CODE
// Stub (no-op) implementations used when building without Globus support:
// continuation plugins are accepted but never stored or executed, and
// RunPlugin setters do nothing.  All add() overloads report success so
// configuration parsing still passes.
ContinuationPlugins::ContinuationPlugins(void) { };
ContinuationPlugins::~ContinuationPlugins(void) { };
bool ContinuationPlugins::add(const char* state,unsigned int timeout,const char* command) { return true; };
bool ContinuationPlugins::add(job_state_t state,unsigned int timeout,const char* command) { return true; };
bool ContinuationPlugins::add(const char* state,const char* options,const char* command) { return true; };
bool ContinuationPlugins::add(job_state_t state,const char* options,const char* command) { return true; };
void ContinuationPlugins::run(const JobDescription &job,const JobUser& user,std::list<ContinuationPlugins::result_t>& results) { };
void RunPlugin::set(const std::string& cmd) { };
void RunPlugin::set(char const * const * args) { };
#endif
00077 
00078 JobsList::JobsList(JobUser &user,ContinuationPlugins &plugins) {
00079   this->user=&user;
00080   this->plugins=&plugins;
00081   jobs.clear();
00082 }
00083  
// Destructor: the jobs container destroys itself.
// NOTE(review): per-job `local` pointers are deleted explicitly in
// DestroyJob(); entries still present here appear not to be freed —
// confirm ownership of JobDescription::local against jobs/job.h.
JobsList::~JobsList(void){
}
00086 
00087 JobsList::iterator JobsList::FindJob(const JobId &id){
00088   iterator i;
00089   for(i=jobs.begin();i!=jobs.end();++i) {
00090     if((*i) == id) break;
00091   };
00092   return i;
00093 }
00094 
00095 bool JobsList::AddJobNoCheck(const JobId &id,uid_t uid,gid_t gid){
00096   iterator i;
00097   return AddJobNoCheck(id,i,uid,gid);
00098 }
00099 
00100 bool JobsList::AddJobNoCheck(const JobId &id,JobsList::iterator &i,uid_t uid,gid_t gid){
00101   i=jobs.insert(jobs.end(),JobDescription(id,user->SessionRoot(id) + "/" + id));
00102   i->keep_finished=user->KeepFinished();
00103   i->keep_deleted=user->KeepDeleted();
00104   i->set_uid(uid,gid);
00105   return true;
00106 }
00107 
00108 bool JobsList::AddJob(const JobId &id,uid_t uid,gid_t gid){
00109   /* jobs should be unique */
00110   if(FindJob(id) != jobs.end()) return false;
00111   logger.msg(Arc::INFO,"%s: Added",id);
00112   iterator i=jobs.insert(jobs.end(),
00113          JobDescription(id,user->SessionRoot(id) + "/" + id));
00114   i->keep_finished=user->KeepFinished();
00115   i->keep_deleted=user->KeepDeleted();
00116   i->set_uid(uid,gid);
00117   return true;
00118 }
00119 
00120 bool JobsList::AddJob(JobUser &user,const JobId &id,uid_t uid,gid_t gid){
00121   if((&user) != NULL) {
00122     if((this->user) == NULL) { this->user = &user; }
00123     else {
00124       if(this->user != &user) { /* incompatible user */
00125         return false;
00126       };
00127     };
00128   };
00129   return AddJob(id,uid,gid);
00130 }
00131 
00132 #ifndef NO_GLOBUS_CODE
00133 
00134 bool JobsList::ActJob(const JobId &id,bool hard_job)  {
00135   iterator i=FindJob(id);
00136   if(i == jobs.end()) return false;
00137   return ActJob(i,hard_job);
00138 }
00139 
00140 void JobsList::CalculateShares(){
00151     // clear shares with 0 count
00152   for (std::map<std::string, int>::iterator i = preparing_job_share.begin(); i != preparing_job_share.end(); i++)
00153     if (i->second == 0) preparing_job_share.erase(i);
00154   for (std::map<std::string, int>::iterator i = finishing_job_share.begin(); i != finishing_job_share.end(); i++)
00155     if (i->second == 0) finishing_job_share.erase(i);
00156 
00157   // counters of current and potential preparing/finishing jobs
00158   std::map<std::string, int> pre_preparing_job_share = preparing_job_share;
00159   std::map<std::string, int> pre_finishing_job_share = finishing_job_share;
00160 
00161   for (iterator i=jobs.begin();i!=jobs.end();i++) {
00162     if (i->job_state == JOB_STATE_ACCEPTED) {
00163       // is job ready to move to preparing?
00164       if (i->retries == 0 && i->local->processtime != -1) {
00165         if (i->local->processtime <= time(NULL)) {
00166             pre_preparing_job_share[i->transfer_share]++;
00167         }
00168       }
00169       else if (i->next_retry <= time(NULL)) {
00170         pre_preparing_job_share[i->transfer_share]++;
00171       }
00172     }
00173     else if (i->job_state == JOB_STATE_INLRMS) {
00174       // is job ready to move to finishing?
00175       if (job_lrms_mark_check(i->job_id,*user) && i->next_retry <= time(NULL)) {
00176         pre_finishing_job_share[i->transfer_share]++;
00177       }
00178     }
00179   };
00180   
00181   // Now calculate how many of limited transfer shares are active
00182   // We need to try to preserve the maximum number of transfer threads 
00183   // for each active limited share. Jobs that belong to limited 
00184   // shares will be excluded from calculation of a share limit later
00185   int privileged_total_pre_preparing = 0;
00186   int privileged_total_pre_finishing = 0;
00187   int privileged_jobs_processing = 0;
00188   int privileged_preparing_job_share = 0;
00189   int privileged_finishing_job_share = 0;
00190   for (std::map<std::string, int>::iterator i = limited_share.begin(); i != limited_share.end(); i++) {
00191     if (pre_preparing_job_share.find(i->first) != pre_preparing_job_share.end()) {
00192       privileged_preparing_job_share++;
00193       privileged_jobs_processing += i->second;
00194       privileged_total_pre_preparing += pre_preparing_job_share[i->first];
00195     }
00196     if (pre_finishing_job_share.find(i->first) != pre_finishing_job_share.end()) {
00197       privileged_finishing_job_share++;
00198       privileged_jobs_processing += i->second;
00199       privileged_total_pre_finishing += pre_finishing_job_share[i->first];
00200     }
00201   }
00202   int unprivileged_jobs_processing = max_jobs_processing - privileged_jobs_processing;
00203 
00204   // calculate the number of slots that can be allocated per unprivileged share
00205   // count the total number of unprivileged jobs (pre)preparing
00206   int total_pre_preparing = 0;
00207   int unprivileged_preparing_limit;
00208   int unprivileged_preparing_job_share = pre_preparing_job_share.size() - privileged_preparing_job_share;
00209   for (std::map<std::string, int>::iterator i = pre_preparing_job_share.begin(); i != pre_preparing_job_share.end(); i++) { 
00210     total_pre_preparing += i->second;
00211   }
00212   // exclude privileged jobs
00213   total_pre_preparing -= privileged_total_pre_preparing;
00214   if (max_jobs_processing == -1 || unprivileged_preparing_job_share <= (unprivileged_jobs_processing / max_processing_share))
00215     unprivileged_preparing_limit = max_processing_share;
00216   else if (unprivileged_preparing_job_share > unprivileged_jobs_processing || unprivileged_preparing_job_share <= 0)
00217     unprivileged_preparing_limit = 1;
00218   else if (total_pre_preparing <= unprivileged_jobs_processing)
00219     unprivileged_preparing_limit = max_processing_share;
00220   else
00221     unprivileged_preparing_limit = unprivileged_jobs_processing / unprivileged_preparing_job_share;
00222 
00223   // count the total number of jobs (pre)finishing
00224   int total_pre_finishing = 0;
00225   int unprivileged_finishing_limit;
00226   int unprivileged_finishing_job_share = pre_finishing_job_share.size() - privileged_finishing_job_share;
00227   for (std::map<std::string, int>::iterator i = pre_finishing_job_share.begin(); i != pre_finishing_job_share.end(); i++) {
00228     total_pre_finishing += i->second;
00229   }
00230   // exclude privileged jobs
00231   total_pre_finishing -= privileged_total_pre_finishing;
00232   if (max_jobs_processing == -1 || unprivileged_finishing_job_share <= (unprivileged_jobs_processing / max_processing_share))
00233     unprivileged_finishing_limit = max_processing_share;
00234   else if (unprivileged_finishing_job_share > unprivileged_jobs_processing || unprivileged_finishing_job_share <= 0)
00235     unprivileged_finishing_limit = 1;
00236   else if (total_pre_finishing <= unprivileged_jobs_processing)
00237     unprivileged_finishing_limit = max_processing_share;
00238   else
00239     unprivileged_finishing_limit = unprivileged_jobs_processing / unprivileged_finishing_job_share;
00240 
00241   // if there are queued jobs for both preparing and finishing, split the share between the two states
00242   if (max_jobs_processing > 0 && total_pre_preparing > unprivileged_jobs_processing/2 && total_pre_finishing > unprivileged_jobs_processing/2) {
00243     unprivileged_preparing_limit = unprivileged_preparing_limit < 2 ? 1 : unprivileged_preparing_limit/2;
00244     unprivileged_finishing_limit = unprivileged_finishing_limit < 2 ? 1 : unprivileged_finishing_limit/2;
00245   }
00246 
00247   if (max_jobs_processing > 0 && privileged_total_pre_preparing > privileged_jobs_processing/2 && privileged_total_pre_finishing > privileged_jobs_processing/2)
00248   for (std::map<std::string, int>::iterator i = limited_share.begin(); i != limited_share.end(); i++)
00249     i->second = i->second < 2 ? 1 : i->second/2; 
00250       
00251   preparing_max_share = pre_preparing_job_share;
00252   finishing_max_share = pre_finishing_job_share;
00253   for (std::map<std::string, int>::iterator i = preparing_max_share.begin(); i != preparing_max_share.end(); i++){
00254     if (limited_share.find(i->first) != limited_share.end())
00255       i->second = limited_share[i->first];
00256     else
00257       i->second = unprivileged_preparing_limit;
00258   }
00259   for (std::map<std::string, int>::iterator i = finishing_max_share.begin(); i != finishing_max_share.end(); i++){
00260     if (limited_share.find(i->first) != limited_share.end())
00261       i->second = limited_share[i->first];
00262     else
00263       i->second = unprivileged_finishing_limit;
00264   }
00265 }
00266 
/* Process every job in the list once, in two passes.
 * When the transfer slots are mostly busy (more than 2/3 of
 * max_jobs_processing occupied) the more numerous of the two staging
 * states (ACCEPTED->PREPARING vs INLRMS->FINISHING) is postponed to the
 * second pass so the other direction can drain.
 * Returns the AND of all individual ActJob() results.
 * NOTE(review): the loops do not increment `i` themselves — presumably
 * the iterator overload of ActJob() advances (or erases+reseats) it;
 * confirm, since otherwise these loops would not terminate. */
bool JobsList::ActJobs(bool hard_job) {
/*
   * Need to calculate the shares here because in the ActJob*
   * methods we don't have an overview of all jobs.
   * In those methods we check the share to see if each
   * job can proceed.
*/
  if (!JobsList::share_type.empty() && max_processing_share > 0) {
    CalculateShares();
  } 

  bool res = true;
  bool once_more = false;
  bool postpone_preparing = false;
  bool postpone_finishing = false;
  // (n*3) > (max*2)  <=>  n > 2/3 of max: transfer capacity mostly used
  if((max_jobs_processing != -1) && 
     (!use_local_transfer) && 
     ((JOB_NUM_PROCESSING*3) > (max_jobs_processing*2))) {
    if(JOB_NUM_PREPARING > JOB_NUM_FINISHING) { 
      postpone_preparing=true; 
    } else if(JOB_NUM_PREPARING < JOB_NUM_FINISHING) {
      postpone_finishing=true;
    };
  };
  // first pass - optionally skipping some states
  for(iterator i=jobs.begin();i!=jobs.end();) {
    if(i->job_state == JOB_STATE_UNDEFINED) { once_more=true; }
    else if(((i->job_state == JOB_STATE_ACCEPTED) && postpone_preparing) ||
            ((i->job_state == JOB_STATE_INLRMS) && postpone_finishing)  ) {
      once_more=true;
      i++; continue;
    };
    res &= ActJob(i,hard_job);
  };

  /* Recalculation of the shares before the second pass
   * to update the shares that appeared as a result of 
   * moving some jobs to ACCEPTED during the first pass
  */
  if (!JobsList::share_type.empty() && max_processing_share > 0) {
    CalculateShares();
  }

  // second pass - process skipped states and new jobs
  if(once_more) for(iterator i=jobs.begin();i!=jobs.end();) {
    res &= ActJob(i,hard_job);
  };
  return res;
}
00316 
00317 bool JobsList::DestroyJobs(bool finished,bool active) {
00318   bool res = true;
00319   for(iterator i=jobs.begin();i!=jobs.end();) {
00320     res &= DestroyJob(i,finished,active);
00321   };
00322   return res;
00323 }
00324 
00325 /* returns false if had to run external process */
/* returns false if had to run external process */
/* Remove one job from the system: clean its control and session
 * directories and erase it from the list.  Jobs that may still be
 * running in the LRMS are first canceled via state_submitting(...,true).
 * finished - also destroy jobs already in FINISHED state
 * active   - destroy jobs that are not yet finished
 * On every path `i` is either erased (and reseated by jobs.erase) or
 * pre-incremented before returning, so callers can loop without
 * advancing it themselves. */
bool JobsList::DestroyJob(JobsList::iterator &i,bool finished,bool active) {
  logger.msg(Arc::INFO,"%s: Destroying",i->job_id);
  job_state_t new_state=i->job_state;
  if(new_state == JOB_STATE_UNDEFINED) {
    // state not cached - try the state file; unreadable state means we
    // can only wipe the job's files and forget it
    if((new_state=job_state_read_file(i->job_id,*user))==JOB_STATE_UNDEFINED) {
      logger.msg(Arc::ERROR,"%s: Can't read state - no comments, just cleaning",i->job_id);
      job_clean_final(*i,*user);
      if(i->local) { delete i->local; }; i=jobs.erase(i);
      return true;
    };
  };
  i->job_state = new_state;
  // skip entries excluded by the finished/active filters
  if((new_state == JOB_STATE_FINISHED) && (!finished)) { ++i; return true; };
  if(!active) { ++i; return true; };
  // a job not in LRMS (or already marked done by the LRMS scanner) can
  // be cleaned immediately
  if((new_state != JOB_STATE_INLRMS) || 
     (job_lrms_mark_check(i->job_id,*user))) {
    logger.msg(Arc::INFO,"%s: Cleaning control and session directories",i->job_id);
    job_clean_final(*i,*user);
    if(i->local) { delete i->local; }; i=jobs.erase(i);
    return true;
  };
  logger.msg(Arc::INFO,"%s: This job may be still running - canceling",i->job_id);
  bool state_changed = false;
  if(!state_submitting(i,state_changed,true)) {
    logger.msg(Arc::WARNING,"%s: Cancelation failed (probably job finished) - cleaning anyway",i->job_id);
    job_clean_final(*i,*user);
    if(i->local) { delete i->local; }; i=jobs.erase(i);
    return true;
  };
  if(!state_changed) { ++i; return false; }; /* child still running */
  logger.msg(Arc::INFO,"%s: Cancelation probably succeeded - cleaning",i->job_id);
  job_clean_final(*i,*user);
  if(i->local) { delete i->local; };
  i=jobs.erase(i);
  return true;
}
00362 
00363 /* do processing necessary in case of failure */
00364 bool JobsList::FailedJob(const JobsList::iterator &i) {
00365   /* put mark - failed */
00366   if(!job_failed_mark_put(*i,*user,i->failure_reason)) return false;
00367   /* make all output files non-uploadable */
00368   std::list<FileData> fl;
00369   if(!job_output_read_file(i->job_id,*user,fl)) return true; /* no file - no error */
00370   for(std::list<FileData>::iterator ifl=fl.begin();ifl!=fl.end();++ifl) {
00371     // Remove destination without "preserve" option
00372     std::string value = Arc::URL(ifl->lfn).Option("preserve");
00373     if(value != "yes") ifl->lfn="";
00374   };
00375   if(!job_output_write_file(*i,*user,fl)) return false;
00376   if(!(i->local)) {
00377     JobLocalDescription *job_desc = new JobLocalDescription;
00378     if(!job_local_read_file(i->job_id,*user,*job_desc)) {
00379       logger.msg(Arc::ERROR,"%s: Failed reading local information",i->job_id);
00380       delete job_desc;
00381     }
00382     else {
00383       i->local=job_desc;
00384     };
00385   };
00386   if(i->local) {
00387     i->local->uploads=0;
00388     job_local_write_file(*i,*user,*(i->local));
00389   };
00390   return true;
00391 }
00392 
00393 bool JobsList::GetLocalDescription(const JobsList::iterator &i) {
00394   if(!i->GetLocalDescription(*user)) {
00395     logger.msg(Arc::ERROR,"%s: Failed reading local information",i->job_id);
00396     return false;
00397   };
00398   return true;
00399 }
00400 
/* Submit a job to, or cancel a job in, the LRMS (batch system).
 * On the first call a helper child process (submit-X-job or
 * cancel-X-job, X = LRMS name) is started; subsequent calls poll the
 * child and, on success, record the LRMS job id (submit) or collect
 * diagnostics (cancel).
 * i             - job being processed
 * state_changed - set true when the job may move to its next state
 * cancel        - false: submit the job; true: cancel it
 * Returns false on failure; for submissions a failure reason is
 * attached to the job. */
bool JobsList::state_submitting(const JobsList::iterator &i,bool &state_changed,bool cancel) {
  if(i->child == NULL) {
    /* no child was running yet, or recovering from fault */
    /* write grami file for globus-script-X-submit */
    JobLocalDescription* job_desc;
    if(i->local) { job_desc=i->local; }
    else {
      // local description not cached - read it from the control dir
      job_desc=new JobLocalDescription;
      if(!job_local_read_file(i->job_id,*user,*job_desc)) {
        logger.msg(Arc::ERROR,"%s: Failed reading local information",i->job_id);
        if(!cancel) i->AddFailure("Internal error: can't read local file");
        delete job_desc;
        return false;
      };
      i->local=job_desc;
    };
    if(!cancel) {  /* in case of cancel all preparations are already done */
      const char *local_transfer_s = NULL;
      if(use_local_transfer) { 
        local_transfer_s="joboption_localtransfer=yes";
      };
      if(!write_grami(*i,*user,local_transfer_s)) {
        logger.msg(Arc::ERROR,"%s: Failed creating grami file",i->job_id);
        return false;
      };
      if(!set_execs(*i,*user,i->SessionDir())) {
        logger.msg(Arc::ERROR,"%s: Failed setting executable permissions",i->job_id);
        return false;
      };
      /* precreate file to store diagnostics from lrms */
      job_diagnostics_mark_put(*i,*user);
      job_lrmsoutput_mark_put(*i,*user);
    };
    /* submit/cancel job to LRMS using submit/cancel-X-job */
    std::string cmd;
    if(cancel) { cmd=nordugrid_libexec_loc()+"/cancel-"+job_desc->lrms+"-job"; }
    else { cmd=nordugrid_libexec_loc()+"/submit-"+job_desc->lrms+"-job"; };
    if(!cancel) {
      logger.msg(Arc::INFO,"%s: state SUBMITTING: starting child: %s",i->job_id,cmd);
    } else {
      // canceling: skip the child entirely if the LRMS already reported
      // the job as done
      if(!job_lrms_mark_check(i->job_id,*user)) {
        logger.msg(Arc::INFO,"%s: state CANCELING: starting child: %s",i->job_id,cmd);
      } else {
        logger.msg(Arc::INFO,"%s: Job has completed already. No action taken to cancel",i->job_id);
        state_changed=true;
        return true;
      }
    };
    std::string grami = user->ControlDir()+"/job."+(*i).job_id+".grami";
    std::string cfg_path = nordugrid_config_loc();
    char const * args[5] ={ cmd.c_str(), "--config", cfg_path.c_str(), grami.c_str(), NULL };
    job_errors_mark_put(*i,*user);
    if(!RunParallel::run(*user,*i,args,&(i->child))) {
      if(!cancel) {
        i->AddFailure("Failed initiating job submission to LRMS");
        logger.msg(Arc::ERROR,"%s: Failed running submission process",i->job_id);
      } else {
        logger.msg(Arc::ERROR,"%s: Failed running cancel process",i->job_id);
      };
      return false;
    };
    return true;
  }
  else {
    /* child was run - check exit code */
    if(i->child->Running()) {
      /* child is running - come later */
      return true;
    };
    if(!cancel) {
      logger.msg(Arc::INFO,"%s: state SUBMITTING: child exited with code %i",i->job_id,i->child->Result());
    } else {
      logger.msg(Arc::INFO,"%s: state CANCELING: child exited with code %i",i->job_id,i->child->Result());
    };
    if(i->child->Result() != 0) { 
      // non-zero exit: submission failed (remember state for rerun) or
      // cancel failed
      if(!cancel) {
        logger.msg(Arc::ERROR,"%s: Job submission to LRMS failed",i->job_id);
        JobFailStateRemember(i,JOB_STATE_SUBMITTING);
      } else {
        logger.msg(Arc::ERROR,"%s: Failed to cancel running job",i->job_id);
      };
      delete i->child; i->child=NULL;
      if(!cancel) i->AddFailure("Job submission to LRMS failed");
      return false;
    };
    if(!cancel) {
      delete i->child; i->child=NULL;
      /* success code - get LRMS job id */
      std::string local_id=read_grami(i->job_id,*user);
      if(local_id.length() == 0) {
        logger.msg(Arc::ERROR,"%s: Failed obtaining lrms id",i->job_id);
        i->AddFailure("Failed extracting LRMS ID due to some internal error");
        JobFailStateRemember(i,JOB_STATE_SUBMITTING);
        return false;
      };
      /* put id into local information file */
      if(!GetLocalDescription(i)) {
        i->AddFailure("Internal error");
        return false;
      };   
      /*
      JobLocalDescription *job_desc;
      if(i->local) { job_desc=i->local; }
      else { job_desc=new JobLocalDescription; };
      if(i->local == NULL) {
        if(!job_local_read_file(i->job_id,*user,*job_desc)) {
          logger.msg(Arc::ERROR,"%s: Failed reading local information",i->job_id);
          i->AddFailure("Internal error");
          delete job_desc; return false;
        };
        i->local=job_desc;
      };
      */
      i->local->localid=local_id;
      if(!job_local_write_file(*i,*user,*(i->local))) {
        i->AddFailure("Internal error");
        logger.msg(Arc::ERROR,"%s: Failed writing local information",i->job_id);
        return false;
      };
    } else {
      /* job diagnostics collection done in backgroud (scan-*-job script) */
      if(!job_lrms_mark_check(i->job_id,*user)) {
        /* job diag not yet collected - come later */
        return true;
      } else {
        logger.msg(Arc::INFO,"%s: state CANCELING: job diagnostics collected",i->job_id);
        delete i->child; i->child=NULL;
        job_diagnostics_mark_move(*i,*user);
      };
    };
    /* move to next state */
    state_changed=true;
    return true;
  };
}
00536 
/* Run or monitor the child process that stages job files:
 * "downloader" (PREPARING, up==false) or "uploader" (FINISHING,
 * up==true).
 * state_changed - set true when staging completed successfully
 * retry         - set true on a retryable cache error (exit code 4)
 * Returns false on fatal staging errors (a failure reason is attached
 * to the job); true when the child is still running, was started, or
 * succeeded. */
bool JobsList::state_loading(const JobsList::iterator &i,bool &state_changed,bool up,bool &retry) {
  /* RSL was analyzed/parsed - now run child process downloader
     to download job input files and to wait for user uploaded ones */
  if(i->child == NULL) { /* no child started */
    logger.msg(Arc::INFO,"%s: state: PREPARING/FINISHING: starting new child",i->job_id);
    /* no child was running yet, or recovering from fault */
    /* run it anyway and exit code will give more inforamtion */
    bool switch_user = (user->CachePrivate() || user->StrictSession());
    std::string cmd; 
    if(up) { cmd=nordugrid_libexec_loc()+"/uploader"; }
    else { cmd=nordugrid_libexec_loc()+"/downloader"; };
    uid_t user_id = user->get_uid();
    if(user_id == 0) user_id=i->get_uid();
    std::string user_id_s = Arc::tostring(user_id);
    // option-value strings must outlive the args[] pointers below
    std::string max_files_s;
    std::string min_speed_s;
    std::string min_speed_time_s;
    std::string min_average_speed_s;
    std::string max_inactivity_time_s;
    // argn indexes the next free slot; slots 0-3 are pre-filled
    int argn=4;
    const char* args[] = {
      cmd.c_str(),
      "-U",
      user_id_s.c_str(),
      "-f",
      NULL, // -n
      NULL, // (-n)
      NULL, // -c
      NULL, // -p
      NULL, // -l
      NULL, // -s
      NULL, // (-s)
      NULL, // -S
      NULL, // (-S)
      NULL, // -a
      NULL, // (-a)
      NULL, // -i
      NULL, // (-i)
      NULL, // -d
      NULL, // (-d)
      NULL, // -C
      NULL, // (-C)
      NULL, // id
      NULL, // control
      NULL, // session
      NULL,
      NULL
    };
    // append optional flags only when the corresponding limit is set
    if(JobsList::max_downloads > 0) {
      max_files_s=Arc::tostring(JobsList::max_downloads);
      args[argn]="-n"; argn++;
      args[argn]=(char*)(max_files_s.c_str()); argn++;
    };
    if(!use_secure_transfer) { 
      args[argn]="-c"; argn++;
    };
    if(use_passive_transfer) { 
      args[argn]="-p"; argn++;
    };
    if(use_local_transfer) { 
      args[argn]="-l"; argn++;
    };
    if(JobsList::min_speed) {
      min_speed_s=Arc::tostring(JobsList::min_speed);
      min_speed_time_s=Arc::tostring(JobsList::min_speed_time);
      args[argn]="-s"; argn++; 
      args[argn]=(char*)(min_speed_s.c_str()); argn++;
      args[argn]="-S"; argn++; 
      args[argn]=(char*)(min_speed_time_s.c_str()); argn++;
    };
    if(JobsList::min_average_speed) {
      min_average_speed_s=Arc::tostring(JobsList::min_average_speed);
      args[argn]="-a"; argn++; 
      args[argn]=(char*)(min_average_speed_s.c_str()); argn++;
    };
    if(JobsList::max_inactivity_time) {
      max_inactivity_time_s=Arc::tostring(JobsList::max_inactivity_time);
      args[argn]="-i"; argn++; 
      args[argn]=(char*)(max_inactivity_time_s.c_str()); argn++;
    };
    std::string debug_level = Arc::level_to_string(Arc::Logger::getRootLogger().getThreshold());
    std::string cfg_path = nordugrid_config_loc();
    if (!debug_level.empty()) {
      args[argn]="-d"; argn++;
      args[argn]=(char*)(debug_level.c_str()); argn++;
    }
    if (!nordugrid_config_loc().empty()) {
      args[argn]="-C"; argn++;
      args[argn]=(char*)(cfg_path.c_str()); argn++;
    }
    // positional arguments: job id, control dir, session dir
    args[argn]=(char*)(i->job_id.c_str()); argn++;
    args[argn]=(char*)(user->ControlDir().c_str()); argn++;
    args[argn]=(char*)(i->SessionDir().c_str()); argn++;

    if(!up) { logger.msg(Arc::INFO,"%s: State PREPARING: starting child: %s",i->job_id,args[0]); }
    else { logger.msg(Arc::INFO,"%s: State FINISHING: starting child: %s",i->job_id,args[0]); };
    job_errors_mark_put(*i,*user);
    job_restart_mark_remove(i->job_id,*user);
    if(!RunParallel::run(*user,*i,(char**)args,&(i->child),switch_user)) {
      logger.msg(Arc::ERROR,"%s: Failed to run down/uploader process",i->job_id);
      if(up) {
        i->AddFailure("Failed to run uploader (post-processing)");
      } else {
        i->AddFailure("Failed to run downloader (pre-processing)");
      };
      return false;
    };
  } else {
    if(i->child->Running()) {
      logger.msg(Arc::VERBOSE,"%s: State: PREPARING/FINISHING: child is running",i->job_id);
      /* child is running - come later */
      return true;
    };
    /* child was run - check exit code */
    if(!up) { logger.msg(Arc::INFO,"%s: State: PREPARING: child exited with code: %i",i->job_id,i->child->Result()); }
    else { logger.msg(Arc::INFO,"%s: State: FINISHING: child exited with code: %i",i->job_id,i->child->Result()); };
    if(i->child->Result() != 0) { 
      // exit code 1: unrecoverable; 3: expired credentials; 4: retryable
      // cache error; anything else: unclassified error
      if(i->child->Result() == 1) { 
        /* unrecoverable failure detected - all we can do is to kill the job */
        if(up) {
          logger.msg(Arc::ERROR,"%s: State: FINISHING: unrecoverable error detected (exit code 1)",i->job_id);
          i->AddFailure("Failed in files upload (post-processing)");
        } else {
          logger.msg(Arc::ERROR,"%s: State: PREPARING: unrecoverable error detected (exit code 1)",i->job_id);
          i->AddFailure("Failed in files download (pre-processing)");
        };
      } else if(i->child->Result() == 3) {
        /* in case of expired credentials there is a chance to get them 
           from credentials server - so far myproxy only */
#ifdef HAVE_MYPROXY_H
        // NOTE(review): AddFailure("Internal error") is called when
        // GetLocalDescription SUCCEEDS - this looks inverted (one would
        // expect it on the failure path); confirm intended behaviour.
        if(GetLocalDescription(i)) {
          i->AddFailure("Internal error");
          if(i->local->credentialserver.length()) {
            std::string new_proxy_file =
                    user->ControlDir()+"/job."+i->job_id+".proxy.tmp";
            std::string old_proxy_file =
                    user->ControlDir()+"/job."+i->job_id+".proxy";
            remove(new_proxy_file.c_str());
            // O_EXCL create ensures we never clobber a concurrent renewal
            int h = open(new_proxy_file.c_str(),
                    O_WRONLY | O_CREAT | O_EXCL,S_IRUSR | S_IWUSR);
            if(h!=-1) {
              close(h);
              if(myproxy_renew(old_proxy_file.c_str(),new_proxy_file.c_str(),
                      i->local->credentialserver.c_str())) {
                renew_proxy(old_proxy_file.c_str(),new_proxy_file.c_str());
                /* imitate rerun request */
                job_restart_mark_put(*i,*user);
              };
            };
          };
        };
#endif
        if(up) {
          logger.msg(Arc::ERROR,"%s: State: FINISHING: credentials probably expired (exit code 3)",i->job_id);
          i->AddFailure("Failed in files upload due to expired credentials - try to renew");
        } else {
          logger.msg(Arc::ERROR,"%s: State: PREPARING: credentials probably expired (exit code 3)",i->job_id);
          i->AddFailure("Failed in files download due to expired credentials - try to renew");
        };
      } else if(i->child->Result() == 4) { // retryable cache error
        logger.msg(Arc::DEBUG, "%s: State: PREPARING/FINISHING: retryable error", i->job_id);
        delete i->child; i->child=NULL;
        retry = true;
        return true;
      } 
      else {
        if(up) {
          logger.msg(Arc::ERROR,"%s: State: FINISHING: some error detected (exit code %i). Recover from such type of errors is not supported yet.",i->job_id,i->child->Result());
          i->AddFailure("Failed in files upload (post-processing)");
        } else {
          logger.msg(Arc::ERROR,"%s: State: PREPARING: some error detected (exit code %i). Recover from such type of errors is not supported yet.",i->job_id,i->child->Result());
          i->AddFailure("Failed in files download (pre-processing)");
        };
      };
      delete i->child; i->child=NULL;
      // remember which state failed so a rerun can resume from it
      if(up) {
        JobFailStateRemember(i,JOB_STATE_FINISHING);
      } else {
        JobFailStateRemember(i,JOB_STATE_PREPARING);
      };
      return false;
    };
    /* success code - move to next state */
    state_changed=true;
    delete i->child; i->child=NULL;
  };
  return true;
}
00725 
00726 bool JobsList::JobPending(JobsList::iterator &i) {
00727   if(i->job_pending) return true;
00728   i->job_pending=true; 
00729   return job_state_write_file(*i,*user,i->job_state,true);
00730 }
00731 
00732 job_state_t JobsList::JobFailStateGet(const JobsList::iterator &i) {
00733   if(!GetLocalDescription(i)) {
00734     return JOB_STATE_UNDEFINED;
00735   };
00736   if(i->local->failedstate.length() == 0) { return JOB_STATE_UNDEFINED; };
00737   for(int n = 0;states_all[n].name != NULL;n++) {
00738     if(!strcmp(states_all[n].name,i->local->failedstate.c_str())) {
00739       i->local->failedstate="";
00740       if(i->local->reruns <= 0) {
00741         logger.msg(Arc::ERROR,"%s: Job is not allowed to be rerun anymore",i->job_id);
00742         job_local_write_file(*i,*user,*(i->local));
00743         return JOB_STATE_UNDEFINED;
00744       };
00745       i->local->reruns--;
00746       job_local_write_file(*i,*user,*(i->local));
00747       return states_all[n].id;
00748     };
00749   };
00750   logger.msg(Arc::ERROR,"%s: Job failed in unknown state. Won't rerun.",i->job_id);
00751   i->local->failedstate="";
00752   job_local_write_file(*i,*user,*(i->local));
00753   return JOB_STATE_UNDEFINED;
00754 }
00755 
// Rebuild the job's input/output transfer lists for a rerun by reprocessing
// the job description, then prune the new lists against what was already
// transferred:
//  - output files already uploaded in a previous pass are dropped,
//  - input files already present in the session directory are dropped.
// Updates i->local->uploads / downloads with the number of remaining
// transfers and writes the pruned lists back to the control files.
// Returns false on any read/write/reprocess failure.
bool JobsList::RecreateTransferLists(const JobsList::iterator &i) {
  // Recreate list of output and input files
  std::list<FileData> fl_old;
  std::list<FileData> fl_new;
  std::list<FileData> fi_old;
  std::list<FileData> fi_new;
  // keep local info
  if(!GetLocalDescription(i)) return false;
  // keep current lists
  if(!job_output_read_file(i->job_id,*user,fl_old)) {
    logger.msg(Arc::ERROR,"%s: Failed to read list of output files",i->job_id);
    return false;
  };
  if(!job_input_read_file(i->job_id,*user,fi_old)) {
    logger.msg(Arc::ERROR,"%s: Failed to read list of input files",i->job_id);
    return false;
  };
  // recreate lists by reprocessing RSL 
  JobLocalDescription job_desc; // placeholder
  if(!process_job_req(*user,*i,job_desc)) {
    logger.msg(Arc::ERROR,"%s: Reprocessing RSL failed",i->job_id);
    return false;
  };
  // Restore 'local' (process_job_req rewrote the local file from job_desc;
  // put back the description kept in memory)
  if(!job_local_write_file(*i,*user,*(i->local))) return false;
  // Read new lists
  if(!job_output_read_file(i->job_id,*user,fl_new)) {
    logger.msg(Arc::ERROR,"%s: Failed to read reprocessed list of output files",i->job_id);
    return false;
  };
  if(!job_input_read_file(i->job_id,*user,fi_new)) {
    logger.msg(Arc::ERROR,"%s: Failed to read reprocessed list of input files",i->job_id);
    return false;
  };
  // remove uploaded files: keep an output entry only if it is a user file
  // (no lfn) or it still appears in the pre-failure list; entries kept
  // with an lfn still need uploading and are counted in 'uploads'
  i->local->uploads=0;
  for(std::list<FileData>::iterator i_new = fl_new.begin();
                                    i_new!=fl_new.end();) {
    if(!(i_new->has_lfn())) { ++i_new; continue; }; // user file - keep
    std::list<FileData>::iterator i_old = fl_old.begin();
    for(;i_old!=fl_old.end();++i_old) {
      if((*i_new) == (*i_old)) break;
    };
    if(i_old != fl_old.end()) { ++i_new; i->local->uploads++; continue; };
    i_new=fl_new.erase(i_new);
  };
  if(!job_output_write_file(*i,*user,fl_new)) return false;
  // remove existing files: an input file already present in the session
  // directory does not need to be downloaded again
  i->local->downloads=0;
  for(std::list<FileData>::iterator i_new = fi_new.begin();
                                    i_new!=fi_new.end();) {
    std::string path = i->session_dir+"/"+i_new->pfn;
    struct stat st;
    if(::stat(path.c_str(),&st) == -1) {
      // not in session dir yet - keep it and count the pending download
      ++i_new; i->local->downloads++;
    } else {
      i_new=fi_new.erase(i_new);
    };
  };
  if(!job_input_write_file(*i,*user,fi_new)) return false;
  return true;
}
00818 
00819 bool JobsList::JobFailStateRemember(const JobsList::iterator &i,job_state_t state) {
00820   if(!(i->local)) {
00821     JobLocalDescription *job_desc = new JobLocalDescription;
00822     if(!job_local_read_file(i->job_id,*user,*job_desc)) {
00823       logger.msg(Arc::ERROR,"%s: Failed reading local information",i->job_id);
00824       delete job_desc; return false;
00825     }
00826     else {
00827       i->local=job_desc;
00828     };
00829   };
00830   if(state == JOB_STATE_UNDEFINED) {
00831     i->local->failedstate="";
00832     return job_local_write_file(*i,*user,*(i->local));
00833   };
00834   if(i->local->failedstate.length() == 0) {
00835     i->local->failedstate=states_all[state].name;
00836     return job_local_write_file(*i,*user,*(i->local));
00837   };
00838   return true;
00839 }
00840 
// Handle a job in the UNDEFINED (just detected) state: read its real state
// from the control file and initialize in-memory/on-disk bookkeeping.
// A brand-new job (ACCEPTED) gets its description parsed here; a job found
// in any other state is being recovered after a restart and only has its
// counters/share information re-established.
// Side effects: may set job_error + a failure string, sets once_more for
// FINISHED/DELETED jobs so they are processed immediately.
void JobsList::ActJobUndefined(JobsList::iterator &i,bool /*hard_job*/,
                               bool& once_more,bool& /*delete_job*/,
                               bool& job_error,bool& /*state_changed*/) {
        /* read state from file */
        /* undefined means job just detected - read it's status */
        /* but first check if it's not too many jobs in system  */
        if((JOB_NUM_ACCEPTED < max_jobs) || (max_jobs == -1)) {
          job_state_t new_state=job_state_read_file(i->job_id,*user);
          if(new_state == JOB_STATE_UNDEFINED) { /* something failed */
            logger.msg(Arc::ERROR,"%s: Reading status of new job failed",i->job_id);
            job_error=true; i->AddFailure("Failed reading status of the job");
            return;
          };
          //  By keeping once_more==false jobs does not cycle here but
          // goes out and registers it's state in counters. This allows
          // to maintain limits properly after restart. Except FINISHED
          // jobs because they are not kept in memory and should be 
          // processed immediately.
          i->job_state = new_state; /* this can be any state, if we are
                                         recovering after failure */
          if(new_state == JOB_STATE_ACCEPTED) {
            // parse request (do it here because any other processing can 
            // read 'local' and then we never know if it was new job)
            JobLocalDescription *job_desc;
            job_desc = new JobLocalDescription;
            job_desc->sessiondir=i->session_dir;
            /* first phase of job - just  accepted - parse request */
            logger.msg(Arc::INFO,"%s: State: ACCEPTED: parsing job description",i->job_id);
            if(!process_job_req(*user,*i,*job_desc)) {
              logger.msg(Arc::ERROR,"%s: Processing job description failed",i->job_id);
              job_error=true; i->AddFailure("Could not process job description");
              delete job_desc;
              return; /* go to next job */
            };
            i->local=job_desc;
            // set transfer share: derive the share name from a property of
            // the user's proxy credential (e.g. VOMS attribute)
            if (!share_type.empty()) {
              std::string user_proxy_file = job_proxy_filename(i->get_id(), *user).c_str();
              std::string cert_dir = "/etc/grid-security/certificates";
              std::string v = cert_dir_loc();
              if(! v.empty()) cert_dir = v;
              // NOTE(review): the following three statements are indented as
              // if guarded by the 'if' above, but they execute unconditionally
              // (the 'if' body is the single statement on the previous line).
                Arc::Credential u(user_proxy_file,"",cert_dir,"");
                const std::string share = get_property(u,share_type);
                i->set_share(share);
                logger.msg(Arc::INFO, "%s: adding to transfer share %s",i->get_id(),i->transfer_share);
            }
            job_desc->transfershare = i->transfer_share;
            job_local_write_file(*i,*user,*job_desc);
            i->local->transfershare=i->transfer_share;

            // prepare information for logger
            job_log.make_file(*i,*user);
          } else if(new_state == JOB_STATE_FINISHED) {
            once_more=true;
          } else if(new_state == JOB_STATE_DELETED) {
            once_more=true;
          } else {
            // job recovered in an intermediate state after restart
            logger.msg(Arc::INFO,"%s: %s: New job belongs to %i/%i",i->job_id.c_str(),
                JobDescription::get_state_name(new_state),i->get_uid(),i->get_gid());
            // Make it clean state after restart
            job_state_write_file(*i,*user,i->job_state);
            i->retries = JobsList::max_retries;
            // set transfer share and counters (same share derivation as in
            // the ACCEPTED branch above, including the misleading indent)
            JobLocalDescription job_desc;
            if (!share_type.empty()) {
              std::string user_proxy_file = job_proxy_filename(i->get_id(), *user).c_str();
              std::string cert_dir = "/etc/grid-security/certificates";
              std::string v = cert_dir_loc();
              if(! v.empty()) cert_dir = v;
                Arc::Credential u(user_proxy_file,"",cert_dir,"");
                const std::string share = get_property(u,share_type);
                i->set_share(share);
                logger.msg(Arc::INFO, "%s: adding to transfer share %s",i->get_id(),i->transfer_share);
            }
            job_desc.transfershare = i->transfer_share;
            job_local_write_file(*i,*user,job_desc);
            // account the recovered job in the per-share transfer counters
            if (new_state == JOB_STATE_PREPARING) preparing_job_share[i->transfer_share]++;
            if (new_state == JOB_STATE_FINISHING) finishing_job_share[i->transfer_share]++;
          };
        }; // Not doing JobPending here because that job kind of does not exist.
        return;
}
00923 
// Handle a job in the ACCEPTED state: decide whether it may move to
// PREPARING (start of input staging). The move is allowed when processing
// limits and per-share limits permit, the retry back-off time has passed,
// and any user-specified start time (processtime) has been reached;
// otherwise the job is marked pending. On the very first pass some
// frontend diagnostics are collected for the user.
void JobsList::ActJobAccepted(JobsList::iterator &i,bool /*hard_job*/,
                              bool& once_more,bool& /*delete_job*/,
                              bool& job_error,bool& state_changed) {
      /* accepted state - job was just accepted by the job manager and we
         already know that it is accepted - now we are analyzing/parsing
         the request, or it can also happen we are waiting for a user
         specified time */
        logger.msg(Arc::VERBOSE,"%s: State: ACCEPTED",i->job_id);
        if(!GetLocalDescription(i)) {
          job_error=true; i->AddFailure("Internal error");
          return; /* go to next job */
        };
        if(i->local->dryrun) {
          // user asked for a dry run only - fail the job on purpose
          logger.msg(Arc::INFO,"%s: State: ACCEPTED: dryrun",i->job_id);
          i->AddFailure("User requested dryrun. Job skiped.");
          job_error=true; 
          return; /* go to next job */
        };
        // Gate on processing limits: unlimited, local transfer mode, or
        // nothing to download; otherwise require a free processing slot
        // (with emergency slots when FINISHING hogs the quota), an expired
        // retry timer, and a free slot in the job's transfer share.
        if((max_jobs_processing == -1) ||
           (use_local_transfer) ||
           ((i->local->downloads == 0) && (i->local->rtes == 0)) ||
           (((JOB_NUM_PROCESSING < max_jobs_processing) ||
            ((JOB_NUM_FINISHING >= max_jobs_processing) && 
            (JOB_NUM_PREPARING < max_jobs_processing_emergency))) &&
           (i->next_retry <= time(NULL)) && 
           (share_type.empty() || preparing_job_share[i->transfer_share] < preparing_max_share[i->transfer_share])))
        {
          /* check for user specified time (only on the first pass -
             retries == 0 means no attempt has been counted yet) */
          if(i->retries == 0 && i->local->processtime != -1) {
            logger.msg(Arc::INFO,"%s: State: ACCEPTED: have processtime %s",i->job_id.c_str(),
                  i->local->processtime.str(Arc::UserTime));
            if((i->local->processtime) <= time(NULL)) {
              logger.msg(Arc::INFO,"%s: State: ACCEPTED: moving to PREPARING",i->job_id);
              state_changed=true; once_more=true;
              i->job_state = JOB_STATE_PREPARING;
              i->retries = JobsList::max_retries;
              preparing_job_share[i->transfer_share]++;
            };
          }
          else {
            logger.msg(Arc::INFO,"%s: State: ACCEPTED: moving to PREPARING",i->job_id);
            state_changed=true; once_more=true;
            i->job_state = JOB_STATE_PREPARING;
            /* if first pass then reset retries */
            if (i->retries ==0) i->retries = JobsList::max_retries;
            preparing_job_share[i->transfer_share]++;
          };
          if(state_changed && i->retries == JobsList::max_retries) {
            /* gather some frontend specific information for user,
               do it only once */
            std::string cmd = nordugrid_libexec_loc()+"/frontend-info-collector";
            char const * const args[2] = { cmd.c_str(), NULL };
            job_controldiag_mark_put(*i,*user,args);
          };
        } else JobPending(i);
        return;
}
00980 
// Handle a job in the PREPARING state: the downloader child process stages
// input files. On success the job moves to SUBMITTING (or goes pending if
// the running-jobs limit is reached). On a retryable download error the job
// is pushed back to ACCEPTED with an exponential back-off delay; when no
// retries remain, or on a fatal error, the job is failed.
void JobsList::ActJobPreparing(JobsList::iterator &i,bool /*hard_job*/,
                               bool& once_more,bool& /*delete_job*/,
                               bool& job_error,bool& state_changed) {
        /* preparing state - means job is parsed and we are going to download or
           already downloading input files. process downloader is run for
           that. it also checks for files user interface have to upload itself*/
        logger.msg(Arc::VERBOSE,"%s: State: PREPARING",i->job_id);
        bool retry = false;
        if(i->job_pending || state_loading(i,state_changed,false,retry)) {
          if(i->job_pending || state_changed) {
            // download finished (or was already done when the job went
            // pending) - release the per-share PREPARING slot
            if (state_changed) preparing_job_share[i->transfer_share]--;
            if((JOB_NUM_RUNNING<max_jobs_running) || (max_jobs_running==-1)) {
              i->job_state = JOB_STATE_SUBMITTING;
              state_changed=true; once_more=true;
              i->retries = JobsList::max_retries;
            } else {
              // no LRMS slot available - stay here as pending
              state_changed=false;
              JobPending(i);
            };
          }
          else if (retry){
            preparing_job_share[i->transfer_share]--;
            if(--i->retries == 0) { // no tries left
              logger.msg(Arc::ERROR,"%s: Download failed. No retries left.",i->job_id);
              i->AddFailure("downloader failed (pre-processing)");
              job_error=true;
              JobFailStateRemember(i,JOB_STATE_PREPARING);
              return;
            }
            /* set next retry time
               exponential back-off algorithm - wait 10s, 40s, 90s, 160s,...
               with a bit of randomness thrown in - vary by up to 50% of wait_time */
            int wait_time = 10 * (JobsList::max_retries - i->retries) * (JobsList::max_retries - i->retries);
            int randomness = (rand() % wait_time) - (wait_time/2);
            wait_time += randomness;
            i->next_retry = time(NULL) + wait_time;
            logger.msg(Arc::ERROR,"%s: Download failed. %d retries left. Will wait for %ds before retrying",i->job_id,i->retries,wait_time);
            /* set back to ACCEPTED */
            i->job_state = JOB_STATE_ACCEPTED;
            state_changed = true;
          }; 
        } 
        else {
          // fatal downloader failure - fail the job
          if(i->GetFailure().length() == 0)
            i->AddFailure("downloader failed (pre-processing)");
          job_error=true;
          preparing_job_share[i->transfer_share]--;
          return; /* go to next job */
        };
        return;
}
01032 
01033 void JobsList::ActJobSubmitting(JobsList::iterator &i,bool /*hard_job*/,
01034                                 bool& once_more,bool& /*delete_job*/,
01035                                 bool& job_error,bool& state_changed) {
01036         /* state submitting - everything is ready for submission - 
01037            so run submission */
01038         logger.msg(Arc::VERBOSE,"%s: State: SUBMITTING",i->job_id);
01039         if(state_submitting(i,state_changed)) {
01040           if(state_changed) {
01041             i->job_state = JOB_STATE_INLRMS;
01042             once_more=true;
01043           };
01044         } else {
01045           job_error=true;
01046           return; /* go to next job */
01047         };
01048         return;
01049 }
01050 
01051 void JobsList::ActJobCanceling(JobsList::iterator &i,bool /*hard_job*/,
01052                                bool& once_more,bool& /*delete_job*/,
01053                                bool& job_error,bool& state_changed) {
01054         /* This state is like submitting, only -rm instead of -submit */
01055         logger.msg(Arc::VERBOSE,"%s: State: CANCELING",i->job_id);
01056         if(state_submitting(i,state_changed,true)) {
01057           if(state_changed) {
01058             i->job_state = JOB_STATE_FINISHING;
01059             finishing_job_share[i->transfer_share]++;
01060             once_more=true;
01061           };
01062         }
01063         else { job_error=true; };
01064         return;
01065 }
01066 
// Handle a job in the INLRMS state: wait for the batch system to finish
// the job, then decide whether it can move to FINISHING (output staging),
// subject to the same processing/share limits used elsewhere. A non-zero
// LRMS exit code fails the job (recording INLRMS as the failure state).
// The LRMS completion mark is only checked on the first pass through this
// state (retries at 0 or at max); later passes only re-evaluate the limits.
void JobsList::ActJobInlrms(JobsList::iterator &i,bool /*hard_job*/,
                            bool& once_more,bool& /*delete_job*/,
                            bool& job_error,bool& state_changed) {
        logger.msg(Arc::VERBOSE,"%s: State: INLRMS",i->job_id);
        if(!GetLocalDescription(i)) {
          i->AddFailure("Failed reading local job information");
          job_error=true;
          return; /* go to next job */
        };
        /* only check lrms job status on first pass */
        if(i->retries == 0 || i->retries == JobsList::max_retries) {
          if(i->job_pending || job_lrms_mark_check(i->job_id,*user)) {
            if(!i->job_pending) {
              // LRMS reported completion - collect diagnostics and exit code
              logger.msg(Arc::INFO,"%s: Job finished",i->job_id);
              job_diagnostics_mark_move(*i,*user);
              LRMSResult ec = job_lrms_mark_read(i->job_id,*user);
              if(ec.code() != 0) {
                // non-zero LRMS exit code - fail the job
                logger.msg(Arc::INFO,"%s: State: INLRMS: exit message is %i %s",i->job_id,ec.code(),ec.description());
              /*
              * check if not asked to rerun job *
              JobLocalDescription *job_desc = i->local;
              if(job_desc->reruns > 0) { * rerun job once more *
                job_desc->reruns--;
                job_desc->localid="";
                job_local_write_file(*i,*user,*job_desc);
                job_lrms_mark_remove(i->job_id,*user);
                logger.msg(Arc::INFO,"%s: State: INLRMS: job restarted",i->job_id);
                i->job_state = JOB_STATE_SUBMITTING; 
                // INLRMS slot is already taken by this job, so resubmission
                // can be done without any checks
              } else {
              */
                i->AddFailure("LRMS error: ("+
                      Arc::tostring(ec.code())+") "+ec.description());
                job_error=true;
                //i->job_state = JOB_STATE_FINISHING;
                JobFailStateRemember(i,JOB_STATE_INLRMS);
                // This does not require any special postprocessing and
                // can go to next state directly
              /*
              };
              */
                state_changed=true; once_more=true;
                return;
              } else {
              // i->job_state = JOB_STATE_FINISHING;
              };
            };
            // job completed successfully (or was pending) - move to
            // FINISHING if upload limits allow, otherwise mark pending
            if((max_jobs_processing == -1) ||
              (use_local_transfer) ||
              (i->local->uploads == 0) ||
              (((JOB_NUM_PROCESSING < max_jobs_processing) ||
               ((JOB_NUM_PREPARING >= max_jobs_processing) &&
                (JOB_NUM_FINISHING < max_jobs_processing_emergency))) &&
               (i->next_retry <= time(NULL)) &&
               (share_type.empty() || finishing_job_share[i->transfer_share] < finishing_max_share[i->transfer_share]))) {
                 state_changed=true; once_more=true;
                 i->job_state = JOB_STATE_FINISHING;
                 /* if first pass then reset retries */
                 if (i->retries == 0) i->retries = JobsList::max_retries;
                 finishing_job_share[i->transfer_share]++;
            } else JobPending(i);
          };
        } else if((max_jobs_processing == -1) ||
                  (use_local_transfer) ||
                  (i->local->uploads == 0) ||
                  (((JOB_NUM_PROCESSING < max_jobs_processing) ||
                   ((JOB_NUM_PREPARING >= max_jobs_processing) &&
                    (JOB_NUM_FINISHING < max_jobs_processing_emergency))) &&
                  (i->next_retry <= time(NULL)) &&
                  (share_type.empty() || finishing_job_share[i->transfer_share] < finishing_max_share[i->transfer_share]))) {
                    // retry pass: LRMS already known to be done - just wait
                    // for a free FINISHING slot
                    state_changed=true; once_more=true;
                    i->job_state = JOB_STATE_FINISHING;
                    finishing_job_share[i->transfer_share]++;
          } else {
              JobPending(i);
          };
        return;
}
01146 
// Handle a job in the FINISHING state: the uploader child process stages
// output files. On success the job moves to FINISHED. A retryable upload
// error sends the job back to INLRMS with an exponential back-off delay;
// when retries run out, or on a fatal error, the job is failed.
// NOTE(review): 'hard_job' is passed BY VALUE, so the 'hard_job=true'
// assignments below have no effect outside this function - presumably the
// parameter was once a reference; confirm against the caller before
// relying on it.
void JobsList::ActJobFinishing(JobsList::iterator &i,bool hard_job,
                               bool& once_more,bool& /*delete_job*/,
                               bool& job_error,bool& state_changed) {
        logger.msg(Arc::VERBOSE,"%s: State: FINISHING",i->job_id);
        bool retry = false;
        if(state_loading(i,state_changed,true,retry)) {
          if (retry) {
            // retryable upload failure - release the FINISHING share slot
            finishing_job_share[i->transfer_share]--;
            if(--i->retries == 0) { // no tries left
              logger.msg(Arc::ERROR,"%s: Upload failed. No retries left.",i->job_id);
              i->AddFailure("uploader failed (post-processing)");
              job_error=true;
              JobFailStateRemember(i,JOB_STATE_FINISHING);
              return;
            };
            /* set next retry time
            exponential back-off algorithm - wait 10s, 40s, 90s, 160s,...
            with a bit of randomness thrown in - vary by up to 50% of wait_time */
            int wait_time = 10 * (JobsList::max_retries - i->retries) * (JobsList::max_retries - i->retries);
            int randomness = (rand() % wait_time) - (wait_time/2);
            wait_time += randomness;
            i->next_retry = time(NULL) + wait_time;
            logger.msg(Arc::ERROR,"%s: Upload failed. %d retries left. Will wait for %ds before retrying.",i->job_id,i->retries,wait_time);
            /* set back to INLRMS */
            i->job_state = JOB_STATE_INLRMS;
            state_changed = true;
          }
          else if(state_changed) {
            // upload complete - job is done
            finishing_job_share[i->transfer_share]--;
            i->job_state = JOB_STATE_FINISHED;
            once_more=true; hard_job=true;
          };
        } else {
          // fatal uploader failure - fail the job
          // i->job_state = JOB_STATE_FINISHED;
          state_changed=true; /* to send mail */
          once_more=true; hard_job=true;
          if(i->GetFailure().length() == 0)
            i->AddFailure("uploader failed (post-processing)");
          job_error=true;
          finishing_job_share[i->transfer_share]--;
          return; /* go to next job */
        };
        return;
}
01191 
01192 static time_t prepare_cleanuptime(JobId &job_id,time_t &keep_finished,JobsList::iterator &i,JobUser &user) {
01193   JobLocalDescription job_desc;
01194   time_t t = -1;
01195   /* read lifetime - if empty it wont be overwritten */
01196   job_local_read_file(job_id,user,job_desc);
01197   if(!Arc::stringto(job_desc.lifetime,t)) t = keep_finished;
01198   if(t > keep_finished) t = keep_finished;
01199   time_t last_changed=job_state_time(job_id,user);
01200   t=last_changed+t; job_desc.cleanuptime=t;
01201   job_local_write_file(*i,user,job_desc);
01202   return t;
01203 }
01204 
01205 void JobsList::ActJobFinished(JobsList::iterator &i,bool hard_job,
01206                               bool& /*once_more*/,bool& /*delete_job*/,
01207                               bool& /*job_error*/,bool& state_changed) {
01208         if(job_clean_mark_check(i->job_id,*user)) {
01209           logger.msg(Arc::INFO,"%s: Job is requested to clean - deleting",i->job_id);
01210           /* delete everything */
01211           job_clean_final(*i,*user);
01212         } else {
01213           if(job_restart_mark_check(i->job_id,*user)) { 
01214             job_restart_mark_remove(i->job_id,*user); 
01215             /* request to rerun job - check if can */
01216             // Get information about failed state and forget it
01217             job_state_t state_ = JobFailStateGet(i);
01218             if(state_ == JOB_STATE_PREPARING) {
01219               if(RecreateTransferLists(i)) {
01220                 job_failed_mark_remove(i->job_id,*user);
01221                 // state_changed=true;
01222                 i->job_state = JOB_STATE_ACCEPTED;
01223                 JobPending(i); // make it go to end of state immediately
01224                 return;
01225               };
01226             } else if((state_ == JOB_STATE_SUBMITTING) ||
01227                       (state_ == JOB_STATE_INLRMS)) {
01228               if(RecreateTransferLists(i)) {
01229                 job_failed_mark_remove(i->job_id,*user);
01230                 // state_changed=true;
01231                 if((i->local->downloads > 0) || (i->local->rtes > 0)) {
01232                   // missing input files has to be re-downloaded
01233                   i->job_state = JOB_STATE_ACCEPTED;
01234                 } else {
01235                   i->job_state = JOB_STATE_PREPARING;
01236                 };
01237                 JobPending(i); // make it go to end of state immediately
01238                 return;
01239               };
01240             } else if(state_ == JOB_STATE_FINISHING) {
01241               if(RecreateTransferLists(i)) {
01242                 job_failed_mark_remove(i->job_id,*user);
01243                 // state_changed=true;
01244                 i->job_state = JOB_STATE_INLRMS;
01245                 JobPending(i); // make it go to end of state immediately
01246                 return;
01247               };
01248             } else {
01249               logger.msg(Arc::ERROR,"%s: Can't rerun on request - not a suitable state",i->job_id);
01250             };
01251           };
01252           if(hard_job) { /* try to minimize load */
01253             time_t t = -1;
01254             if(!job_local_read_cleanuptime(i->job_id,*user,t)) {
01255               /* must be first time - create cleanuptime */
01256               t=prepare_cleanuptime(i->job_id,i->keep_finished,i,*user);
01257             };
01258             /* check if it is not time to remove that job completely */
01259             if((time(NULL)-t) >= 0) {
01260               logger.msg(Arc::INFO,"%s: Job is too old - deleting",i->job_id);
01261               if(i->keep_deleted) {
01262                 // here we have to get the cache per-job dirs to be deleted
01263                 CacheConfig * cache_config;
01264                 std::list<std::string> cache_per_job_dirs;
01265                 try {
01266                   cache_config = new CacheConfig();
01267                 }
01268                 catch (CacheConfigException e) {
01269                   logger.msg(Arc::ERROR, "Error with cache configuration: %s", e.what());
01270                   job_clean_deleted(*i,*user);
01271                   i->job_state = JOB_STATE_DELETED;
01272                   state_changed=true;
01273                   return;
01274                 }
01275                 std::vector<std::string> conf_caches = cache_config->getCacheDirs();
01276                 // add each dir to our list
01277                 for (std::vector<std::string>::iterator it = conf_caches.begin(); it != conf_caches.end(); it++) {
01278                   cache_per_job_dirs.push_back(it->substr(0, it->find(" "))+"/joblinks");
01279                 }
01280                 // add remote caches
01281                 std::vector<std::string> remote_caches = cache_config->getRemoteCacheDirs();
01282                 for (std::vector<std::string>::iterator it = remote_caches.begin(); it != remote_caches.end(); it++) {
01283                   cache_per_job_dirs.push_back(it->substr(0, it->find(" "))+"/joblinks");
01284                 }
01285                 // add draining caches
01286                 std::vector<std::string> draining_caches = cache_config->getDrainingCacheDirs();
01287                 for (std::vector<std::string>::iterator it = draining_caches.begin(); it != draining_caches.end(); it++) {
01288                   cache_per_job_dirs.push_back(it->substr(0, it->find(" "))+"/joblinks");
01289                 }
01290                 job_clean_deleted(*i,*user,cache_per_job_dirs);
01291                 i->job_state = JOB_STATE_DELETED;
01292                 state_changed=true;
01293               } else {
01294                 /* delete everything */
01295                 job_clean_final(*i,*user);
01296               };
01297             };
01298           };
01299         };
01300         return;
01301 }
01302 
01303 void JobsList::ActJobDeleted(JobsList::iterator &i,bool hard_job,
01304                              bool& /*once_more*/,bool& /*delete_job*/,
01305                              bool& /*job_error*/,bool& /*state_changed*/) {
01306         if(hard_job) { /* try to minimize load */
01307           time_t t = -1;
01308           if(!job_local_read_cleanuptime(i->job_id,*user,t)) {
01309             /* should not happen - delete job */
01310             JobLocalDescription job_desc;
01311             /* read lifetime - if empty it wont be overwritten */
01312             job_clean_final(*i,*user);
01313           } else {
01314             /* check if it is not time to remove remnants of that */
01315             if((time(NULL)-(t+i->keep_deleted)) >= 0) {
01316               logger.msg(Arc::INFO,"%s: Job is ancient - delete rest of information",i->job_id);
01317               /* delete everything */
01318               job_clean_final(*i,*user);
01319             };
01320           };
01321         };
01322         return;
01323 }
01324 
/* Do job's processing: check & change state, run necessary external
   programs, do necessary things. Also advances the iterator and/or
   deletes the job's slot from the in-memory list if necessary.
   Always returns true. */
bool JobsList::ActJob(JobsList::iterator &i,bool hard_job) {
  bool once_more     = true;   // request another pass of the outer loop
  bool delete_job    = false;  // set when the job must be force-removed
  bool job_error     = false;  // set when processing of the job failed
  bool state_changed = false;  // set when job moved to a new state
  job_state_t old_state = i->job_state;  // for counter bookkeeping at the end
  bool old_pending = i->job_pending;
  while(once_more) {
    once_more     = false;
    delete_job    = false;
    job_error     = false;
    state_changed = false;
 /* some states can not be canceled (or there is no sense to do that) */
/*
       (i->job_state != JOB_STATE_FINISHING) &&
*/
    if((i->job_state != JOB_STATE_CANCELING) &&
       (i->job_state != JOB_STATE_FINISHED) &&
       (i->job_state != JOB_STATE_DELETED) &&
       (i->job_state != JOB_STATE_SUBMITTING)) {
      if(job_cancel_mark_check(i->job_id,*user)) {
        logger.msg(Arc::INFO,"%s: Canceling job (%s) because of user request",i->job_id,user->UnixName());
        /* kill running child process, if any */
        if(i->child) { 
          i->child->Kill(0);
          delete i->child; i->child=NULL;
        };
        /* update transfer share counters */
          if (i->job_state == JOB_STATE_PREPARING && !i->job_pending) preparing_job_share[i->transfer_share]--;
          else if (i->job_state == JOB_STATE_FINISHING) finishing_job_share[i->transfer_share]--;
        /* record the reason for the failure */
        i->AddFailure("User requested to cancel the job");
        /* behave like if job failed */
        if(!FailedJob(i)) {
          /* DO NOT KNOW WHAT TO DO HERE !!!!!!!!!! */
        };
        /* special processing for INLRMS case - the LRMS job has to be
           killed first, so go through CANCELING instead of FINISHING */
        if(i->job_state == JOB_STATE_INLRMS) {
          i->job_state = JOB_STATE_CANCELING;
        }
        else if(i->job_state == JOB_STATE_FINISHING) {
          i->job_state = JOB_STATE_FINISHED;
        }
        else {
          i->job_state = JOB_STATE_FINISHING;
          finishing_job_share[i->transfer_share]++;
        };
        job_cancel_mark_remove(i->job_id,*user);
        state_changed=true;
        once_more=true;
      };
    };
    /* dispatch to per-state handler unless cancellation above already
       changed the state during this pass */
    if(!state_changed) switch(i->job_state) {
    /* undefined state - not actual state - job was just added but
       not analyzed yet */
      case JOB_STATE_UNDEFINED: {
       ActJobUndefined(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      case JOB_STATE_ACCEPTED: {
       ActJobAccepted(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      case JOB_STATE_PREPARING: {
       ActJobPreparing(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      case JOB_STATE_SUBMITTING: {
       ActJobSubmitting(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      case JOB_STATE_CANCELING: {
       ActJobCanceling(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      case JOB_STATE_INLRMS: {
       ActJobInlrms(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      case JOB_STATE_FINISHING: {
       ActJobFinishing(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      case JOB_STATE_FINISHED: {
       ActJobFinished(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      case JOB_STATE_DELETED: {
       ActJobDeleted(i,hard_job,once_more,delete_job,job_error,state_changed);
      }; break;
      default: { // should destroy job with unknown state ?!
      };
    };
    /* fixpoint loop: errors may cause state changes, which may in turn
       cause new errors (e.g. failing to write the status file) */
    do {
      // Process errors which happened during processing this job
      if(job_error) {
        job_error=false;
        // always cause rerun - in order not to lose a state change
        // Failed job - move it to proper state
        logger.msg(Arc::ERROR,"%s: Job failure detected",i->job_id);
        if(!FailedJob(i)) { /* something is really wrong */
          i->AddFailure("Failed during processing failure");
          delete_job=true;
        } else { /* just move job to proper state */
          if((i->job_state == JOB_STATE_FINISHED) ||
             (i->job_state == JOB_STATE_DELETED)) {
            // Normally these stages should not generate errors
            // so ignore them
          } else if(i->job_state == JOB_STATE_FINISHING) {
            // No matter if FINISHING fails - it still goes to FINISHED
            i->job_state = JOB_STATE_FINISHED;
            state_changed=true;
            once_more=true;
          } else {
            // Any other failure should cause transfer to FINISHING
            i->job_state = JOB_STATE_FINISHING;
            finishing_job_share[i->transfer_share]++;
            state_changed=true;
            once_more=true;
          };
          i->job_pending=false;
        };
      };
      // Process state changes, also those generated by error processing
      if(state_changed) {
        state_changed=false;
        i->job_pending=false;
        // Report state change into log
        logger.msg(Arc::INFO,"%s: State: %s from %s",
              i->job_id.c_str(),JobDescription::get_state_name(i->job_state),
              JobDescription::get_state_name(old_state));
        if(!job_state_write_file(*i,*user,i->job_state)) {
          i->AddFailure("Failed writing job status");
          job_error=true;
        } else {
          // talk to external plugin to ask if we can proceed
          if(plugins) {
            std::list<ContinuationPlugins::result_t> results;
            plugins->run(*i,*user,results);
            std::list<ContinuationPlugins::result_t>::iterator result = results.begin();
            while(result != results.end()) {
              // analyze results: act_fail blocks the job, act_log only
              // reports, act_pass continues, anything else is an error
              if(result->action == ContinuationPlugins::act_fail) {
                logger.msg(Arc::ERROR,"%s: Plugin at state %s : %s",
                    i->job_id.c_str(),states_all[i->get_state()].name,
                    result->response);
                i->AddFailure(std::string("Plugin at state ")+
                states_all[i->get_state()].name+" failed: "+(result->response));
                job_error=true;
              } else if(result->action == ContinuationPlugins::act_log) {
                // Scream but go ahead
                logger.msg(Arc::WARNING,"%s: Plugin at state %s : %s",
                    i->job_id.c_str(),states_all[i->get_state()].name,
                    result->response);
              } else if(result->action == ContinuationPlugins::act_pass) {
                // Just continue quietly
              } else {
                logger.msg(Arc::ERROR,"%s: Plugin execution failed",i->job_id);
                i->AddFailure(std::string("Failed running plugin at state ")+
                    states_all[i->get_state()].name);
                job_error=true;
              };
              ++result;
            };
          };
          // Processing to be done on state changes 
          job_log.make_file(*i,*user);
          if(i->job_state == JOB_STATE_FINISHED) {
            if(i->GetLocalDescription(*user)) {
              job_stdlog_move(*i,*user,i->local->stdlog);
            };
            job_clean_finished(i->job_id,*user);
            job_log.finish_info(*i,*user);
            prepare_cleanuptime(i->job_id,i->keep_finished,i,*user);
          } else if(i->job_state == JOB_STATE_PREPARING) {
            job_log.start_info(*i,*user);
          };
        };
        /* send mail after errors and changes are processed */
        /* do not send if something really wrong happened to avoid email DoS */
        if(!delete_job) send_mail(*i,*user);
      };
      // Keep repeating till error goes out
    } while(job_error);
    if(delete_job) { 
      logger.msg(Arc::ERROR,"%s: Delete request due to internal problems",i->job_id);
      i->job_state = JOB_STATE_FINISHED; /* move to finished in order to 
                                            remove from list */
      i->job_pending=false;
      job_state_write_file(*i,*user,i->job_state); 
      i->AddFailure("Serious troubles (problems during processing problems)");
      FailedJob(i);  /* put some marks */
      if(i->GetLocalDescription(*user)) {
        job_stdlog_move(*i,*user,i->local->stdlog);
      };
      job_clean_finished(i->job_id,*user);  /* clean status files */
      once_more=true; hard_job=true; /* to process some things in local */
    };
  };
  /* FINISHED+DELETED jobs are not kept in list - only in files */
  /* if job managed to get here with state UNDEFINED - 
     means we are overloaded with jobs - do not keep them in list */
  if((i->job_state == JOB_STATE_FINISHED) ||
     (i->job_state == JOB_STATE_DELETED) ||
     (i->job_state == JOB_STATE_UNDEFINED)) {
    /* this is the ONLY place there jobs are removed from memory */
    /* update counters */
    if(!old_pending) {
      jobs_num[old_state]--;
    } else {
      jobs_pending--;
    };
    if(i->local) { delete i->local; };
    i=jobs.erase(i);
  }
  else {
    /* update counters: decrement for the state the job entered with,
       increment for the state it is in now */
    if(!old_pending) {
      jobs_num[old_state]--;
    } else {
      jobs_pending--;
    };
    if(!i->job_pending) {
      jobs_num[i->job_state]++;
    } else {
      jobs_pending++;
    }
    ++i;
  };
  return true;
}
01551 
01552 #endif //  NO_GLOBUS_CODE
01553 
01554 class JobFDesc {
01555  public:
01556   JobId id;
01557   uid_t uid;
01558   gid_t gid;
01559   time_t t;
01560   JobFDesc(const char* s,unsigned int n):id(s,n),uid(0),gid(0),t(-1) { };
01561   bool operator<(JobFDesc &right) { return (t < right.t); };
01562 };
01563 
01564 /* find new jobs - sort by date to implement FIFO */
01565 bool JobsList::ScanNewJobs(bool /*hard_job*/) {
01566   std::string file;
01567   std::string cdir=user->ControlDir();
01568   std::list<JobFDesc> ids;
01569   try {
01570     Glib::Dir dir(cdir);
01571     for(;;) {
01572       file=dir.read_name();
01573       if(file.empty()) break;
01574       int l=file.length();
01575       if(l>(4+7)) {  /* job id contains at least 1 character */
01576         if(!strncmp(file.c_str(),"job.",4)) {
01577           if(!strncmp((file.c_str())+(l-7),".status",7)) {
01578             JobFDesc id((file.c_str())+4,l-7-4);
01579             if(FindJob(id.id) == jobs.end()) {
01580               std::string fname=cdir+'/'+file.c_str();
01581               uid_t uid;
01582               gid_t gid;
01583               time_t t;
01584               if(check_file_owner(fname,*user,uid,gid,t)) {
01585                 /* add it to the list */
01586                 id.uid=uid; id.gid=gid; id.t=t;
01587                 ids.push_back(id);
01588               };
01589             };
01590           };
01591         };
01592       };
01593     };
01594   } catch(Glib::FileError& e) {
01595     logger.msg(Arc::ERROR,"Failed reading control directory: %s",user->ControlDir());
01596     return false;
01597   };
01598   /* sorting by date */
01599   ids.sort();
01600   for(std::list<JobFDesc>::iterator id=ids.begin();id!=ids.end();++id) {
01601     iterator i;
01602     /* adding job with file's uid/gid */
01603     if(AddJobNoCheck(id->id,i,id->uid,id->gid)) {
01604 /*    ActJob(i,hard_job);  */
01605     };
01606  /* failed AddJob most probably means it is already in list or limit exceeded */
01607   };
01608   return true;
01609 }
01610