Back to index

nordugrid-arc-nox  1.1.0~rc6
run_parallel.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <unistd.h>
00006 #include <sys/types.h>
00007 #include <sys/stat.h>
00008 #include <fcntl.h>
00009 
00010 #include <arc/Logger.h>
00011 #include <arc/Utils.h>
00012 
00013 #include "../conf/environment.h"
00014 #include "run_parallel.h"
00015 
00016 typedef struct {
00017   const JobUser* user;
00018   const JobDescription* job;
00019   const char* reason;
00020 } job_subst_t;
00021 
00022 static Arc::Logger& logger = Arc::Logger::getRootLogger();
00023 
00024 void (*RunParallel::kicker_func_)(void*) = NULL;
00025 void* RunParallel::kicker_arg_ = NULL;
00026 
00027 static void job_subst(std::string& str,void* arg) {
00028   job_subst_t* subs = (job_subst_t*)arg;
00029   for(std::string::size_type p = 0;;) {
00030     p=str.find('%',p);
00031     if(p==std::string::npos) break;
00032     if(str[p+1]=='I') {
00033       str.replace(p,2,subs->job->get_id().c_str());
00034       p+=subs->job->get_id().length();
00035     } else if(str[p+1]=='S') {
00036       str.replace(p,2,subs->job->get_state_name());
00037       p+=strlen(subs->job->get_state_name());
00038     } else if(str[p+1]=='O') {
00039       str.replace(p,2,subs->reason);
00040       p+=strlen(subs->reason);
00041     } else {
00042       p+=2;
00043     };
00044   };
00045   subs->user->substitute(str);
00046 }
00047 
00048 bool RunParallel::run(JobUser& user,const JobDescription& desc,const char *const args[],Arc::Run** ere,bool su) {
00049   RunPlugin* cred = user.CredPlugin();
00050   job_subst_t subs; subs.user=&user; subs.job=&desc; subs.reason="external";
00051   if((!cred) || (!(*cred))) { cred=NULL; };
00052   //RunPlugin* cred = NULL;
00053   if(user.get_uid() == 0) {
00054     JobUser tmp_user(desc.get_uid());
00055     if(!tmp_user.is_valid()) return false;
00056     tmp_user.SetControlDir(user.ControlDir());
00057     tmp_user.SetSessionRoot(user.SessionRoot(desc.get_id()));
00058     return run(tmp_user,desc.get_id().c_str(),args,ere,su,
00059                                         true,cred,&job_subst,&subs);
00060   };
00061   return run(user,desc.get_id().c_str(),args,ere,su,
00062                                       true,cred,&job_subst,&subs);
00063 }
00064 
00065 /* fork & execute child process with stderr redirected 
00066    to job.ID.errors, stdin and stdout to /dev/null */
00067 bool RunParallel::run(JobUser& user,const char* jobid,const char *const args[],Arc::Run** ere,bool su,bool job_proxy,RunPlugin* cred,RunPlugin::substitute_t subst,void* subst_arg) {
00068   *ere=NULL;
00069   std::list<std::string> args_;
00070   for(int n = 0;args[n];++n) args_.push_back(std::string(args[n]));
00071   Arc::Run* re = new Arc::Run(args_);
00072   if((!re) || (!(*re))) {
00073     if(re) delete re;
00074     logger.msg(Arc::ERROR,"%s: Failure creating slot for child process",jobid?jobid:"");
00075     return false;
00076   };
00077   if(kicker_func_) re->AssignKicker(kicker_func_,kicker_arg_);
00078   RunParallel* rp = new RunParallel(user,jobid,su,job_proxy,cred,subst,subst_arg);
00079   if((!rp) || (!(*rp))) {
00080     if(rp) delete rp;
00081     delete re;
00082     logger.msg(Arc::ERROR,"%s: Failure creating data storage for child process",jobid?jobid:"");
00083     return false;
00084   };
00085   re->AssignInitializer(&initializer,rp);
00086   if(!re->Start()) {
00087     delete rp;
00088     delete re;
00089     logger.msg(Arc::ERROR,"%s: Failure starting child process",jobid?jobid:"");
00090     return false;
00091   };
00092   delete rp;
00093   *ere=re;
00094   return true;
00095 }
00096 
00097 void RunParallel::initializer(void* arg) {
00098 #ifdef WIN32
00099 #error This functionality is not available in Windows environement
00100 #else
00101   // child
00102   RunParallel* it = (RunParallel*)arg;
00103   struct rlimit lim;
00104   int max_files;
00105   if(getrlimit(RLIMIT_NOFILE,&lim) == 0) { max_files=lim.rlim_cur; }
00106   else { max_files=4096; };
00107   // change user
00108   if(!(it->user_.SwitchUser(it->su_))) {
00109     logger.msg(Arc::ERROR,"%s: Failed switching user",it->jobid_); sleep(10); exit(1);
00110   };
00111   if(it->cred_) {
00112     // run external plugin to acquire non-unix local credentials
00113     if(!it->cred_->run(it->subst_,it->subst_arg_)) {
00114       logger.msg(Arc::ERROR,"%s: Failed to run plugin",it->jobid_); sleep(10); _exit(1);
00115     };
00116     if(it->cred_->result() != 0) {
00117       logger.msg(Arc::ERROR,"%s: Plugin failed",it->jobid_); sleep(10); _exit(1);
00118     };
00119   };
00120   // close all handles inherited from parent
00121   if(max_files == RLIM_INFINITY) max_files=4096;
00122   for(int i=0;i<max_files;i++) { close(i); };
00123   int h;
00124   // set up stdin,stdout and stderr
00125   h=open("/dev/null",O_RDONLY); 
00126   if(h != 0) { if(dup2(h,0) != 0) { sleep(10); exit(1); }; close(h); };
00127   h=open("/dev/null",O_WRONLY);
00128   if(h != 1) { if(dup2(h,1) != 1) { sleep(10); exit(1); }; close(h); };
00129   std::string errlog;
00130   if(!(it->jobid_.empty())) { 
00131     errlog = it->user_.ControlDir() + "/job." + it->jobid_ + ".errors";
00132     h=open(errlog.c_str(),O_WRONLY | O_CREAT | O_APPEND,S_IRUSR | S_IWUSR);
00133     if(h==-1) { h=open("/dev/null",O_WRONLY); };
00134   }
00135   else { h=open("/dev/null",O_WRONLY); };
00136   if(h != 2) { if(dup2(h,2) != 2) { sleep(10); exit(1); }; close(h); };
00137   // setting environment  - TODO - better environment 
00138   if(it->job_proxy_) {
00139     Arc::SetEnv("GLOBUS_LOCATION",globus_loc());
00140     Arc::UnsetEnv("X509_USER_KEY");
00141     Arc::UnsetEnv("X509_USER_CERT");
00142     Arc::UnsetEnv("X509_USER_PROXY");
00143     Arc::UnsetEnv("X509_RUN_AS_SERVER");
00144     if(!(it->jobid_.empty())) {
00145       std::string proxy = it->user_.ControlDir() + "/job." + it->jobid_ + ".proxy";
00146       Arc::SetEnv("X509_USER_PROXY",proxy);
00147       // for Globus 2.2 set fake cert and key, or else it takes 
00148       // those from host in case of root user.
00149       // 2.4 needs names and 2.2 will work too.
00150       // 3.x requires fake ones again.
00151 #if GLOBUS_IO_VERSION>=5
00152       Arc::SetEnv("X509_USER_KEY",(std::string("fake")));
00153       Arc::SetEnv("X509_USER_CERT",(std::string("fake")));
00154 #else
00155       Arc::SetEnv("X509_USER_KEY",proxy);
00156       Arc::SetEnv("X509_USER_CERT",proxy);
00157 #endif
00158     };
00159   };
00160   //# execv(args[0],args);
00161   //# perror("execv");
00162   //# std::cerr<<(jobid?jobid:"")<<"Failed to start external program: "<<args[0]<<std::endl;
00163   //# sleep(10); exit(1);
00164 #endif
00165 }
00166