Back to index

nordugrid-arc-nox  1.1.0~rc6
grid_manager.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <iostream>
00006 #include <sys/types.h>
00007 #include <pwd.h>
00008 #include <string>
00009 #include <cstdio>
00010 #include <fstream>
00011 #include <list>
00012 #include <signal.h>
00013 
00014 #include <arc/Logger.h>
00015 #include <arc/Run.h>
00016 #include <arc/Thread.h>
00017 #include <arc/StringConv.h>
00018 #include "jobs/users.h"
00019 #include "jobs/states.h"
00020 #include "jobs/commfifo.h"
00021 #include "conf/environment.h"
00022 #include "conf/conf_file.h"
00023 #include "conf/daemon.h"
00024 #include "files/info_types.h"
00025 #include "files/delete.h"
00026 #include "run/run_parallel.h"
00027 
00028 #include "grid_manager.h"
00029 
00030 /* do job cleaning every 2 hours */
00031 #define HARD_JOB_PERIOD 7200
00032 
00033 /* cache cleaning and registration every 5 minutes */
00034 #define CACHE_CLEAN_PERIOD 300
00035 
00036 #define DEFAULT_LOG_FILE "/var/log/grid-manager.log"
00037 #define DEFAULT_PID_FILE "/var/run/grid-manager.pid"
00038 
00039 namespace ARex {
00040 
00041 static Arc::Logger logger(Arc::Logger::getRootLogger(),"AREX:GM");
00042 
00043 static void* cache_func(void* arg) {
00044   const JobUsers* users = (const JobUsers*)arg;
00045   Arc::Run *proc = NULL;
00046   JobUser gmuser(getuid()); // Cleaning should run under the GM user 
00047   
00048   // run cache cleaning periodically 
00049   for(;;) {
00050     // go through each user and clean their caches. One user is processed per clean period
00051     for (JobUsers::const_iterator cacheuser = users->begin(); cacheuser != users->end(); ++cacheuser) {
00052       int exit_code = -1;
00053       // if previous process has not finished
00054       if(proc != NULL) {  
00055         if(!(proc->Running())) {
00056           exit_code=proc->Result();
00057           delete proc;
00058           proc=NULL;
00059         };
00060       };
00061       if(proc == NULL) { // previous already exited
00062         CacheConfig * cache_info = cacheuser->CacheParams();
00063         if (!cache_info->cleanCache()) continue;
00064         
00065         // get the cache dirs
00066         std::vector<std::string> cache_info_dirs = cache_info->getCacheDirs();
00067 
00068         // in arc.conf % of used space is given, but cleanbyage uses % of free space
00069         std::string minfreespace = Arc::tostring(100-cache_info->getCacheMax());
00070         std::string maxfreespace = Arc::tostring(100-cache_info->getCacheMin());  
00071 
00072         // set log file location - controldir/job.cache-clean.errors
00073         // TODO: use GM log?
00074         gmuser.SetControlDir(cacheuser->ControlDir()); // Should this requirement be removed ?
00075         int argc=0;
00076         char* args[7+cache_info_dirs.size()+1];
00077         
00078         // do cache-clean -h for explanation of options
00079         std::string cmd = nordugrid_libexec_loc() + "/cache-clean";
00080         args[argc++]=(char*)cmd.c_str();
00081         args[argc++]=(char*)"-m";
00082         args[argc++]=(char*)minfreespace.c_str();
00083         args[argc++]=(char*)"-M";
00084         args[argc++]=(char*)maxfreespace.c_str();
00085         args[argc++]=(char*)"-D";
00086         args[argc++]=(char*)cache_info->getLogLevel().c_str();
00087         std::vector<std::string> cache_dirs;
00088         // have to loop over twice to avoid repeating the same pointer in args
00089         for (std::vector<std::string>::iterator i = cache_info_dirs.begin(); i != cache_info_dirs.end(); i++) {
00090           cache_dirs.push_back(i->substr(0, i->find(" ")));
00091         }
00092         for (std::vector<std::string>::iterator i = cache_dirs.begin(); i != cache_dirs.end(); i++) {
00093           args[argc++]=(char*)(*i).c_str();
00094         }
00095         args[argc]=NULL;
00096         if(!RunParallel::run(gmuser,"cache-clean",args,&proc,false,false)) {
00097           logger.msg(Arc::ERROR,"Failed to run cache cleanup script: %s", cmd);
00098         };
00099       };
00100       for(unsigned int t=CACHE_CLEAN_PERIOD;t;) t=sleep(t);
00101     };
00102   };
00103   return NULL;
00104 }
00105 
00106 typedef struct {
00107   pthread_cond_t* sleep_cond;
00108   pthread_mutex_t* sleep_mutex;
00109   CommFIFO* timeout;
00110 } sleep_st;
00111 
00112 static void* wakeup_func(void* arg) {
00113   sleep_st* s = (sleep_st*)arg;
00114   for(;;) {
00115     s->timeout->wait();
00116     pthread_mutex_lock(s->sleep_mutex);
00117     pthread_cond_signal(s->sleep_cond);
00118     pthread_mutex_unlock(s->sleep_mutex);
00119   };
00120   return NULL;
00121 }
00122 
00123 static void kick_func(void* arg) {
00124   sleep_st* s = (sleep_st*)arg;
00125   pthread_mutex_lock(s->sleep_mutex);
00126   pthread_cond_signal(s->sleep_cond);
00127   pthread_mutex_unlock(s->sleep_mutex);
00128 }
00129 
// argc/argv pair intended to carry command-line style options into the
// grid-manager thread. Only referenced by the disabled
// GridManager(Arc::XMLNode) constructor and commented-out code in this file.
struct args_st {
  int argc;    // number of entries in argv
  char** argv; // argument strings
};
00134 
00135 static void grid_manager(void* arg) {
00136   const char* config_filename = (const char*)arg;
00137   if(!arg) return;
00138   unsigned int clean_first_level=0;
00139   // int n;
00140   // int argc = ((args_st*)arg)->argc;
00141   // char** argv = ((args_st*)arg)->argv;
00142   setpgid(0,0);
00143   opterr=0;
00144   if(config_filename) nordugrid_config_loc(config_filename);
00145 
00146   logger.msg(Arc::INFO,"Starting grid-manager thread");
00147   Daemon daemon;
00148   // Only supported option now is -c
00149   /*
00150   while((n=daemon.getopt(argc,argv,"hvC:c:")) != -1) {
00151     switch(n) {
00152       case ':': { logger.msg(Arc::ERROR,"Missing argument"); return; };
00153       case '?': { logger.msg(Arc::ERROR,"Unrecognized option: %s",(char)optopt); return; };
00154       case '.': { return; };
00155       case 'h': {
00156         std::cout<<"grid-manager [-C clean_level] [-v] [-h] [-c configuration_file] "<<daemon.short_help()<<std::endl;
00157          return;
00158       };
00159       case 'v': {
00160         std::cout<<"grid-manager: version "<<VERSION<<std::endl;
00161         return;
00162       };
00163       case 'C': {
00164         if(sscanf(optarg,"%u",&clean_first_level) != 1) {
00165           logger.msg(Arc::ERROR,"Wrong clean level");
00166           return;
00167         };
00168       }; break;
00169       case 'c': {
00170         nordugrid_config_loc=optarg;
00171       }; break;
00172       default: { logger.msg(Arc::ERROR,"Option processing error"); return; };
00173     };
00174   };
00175   */
00176 
00177   JobUsers users;
00178   std::string my_username("");
00179   uid_t my_uid=getuid();
00180   JobUser *my_user = NULL;
00181   if(!read_env_vars()) {
00182     logger.msg(Arc::FATAL,"Can't initialize runtime environment - EXITING"); return;
00183   };
00184   
00185   /* recognize itself */
00186   {
00187     struct passwd pw_;
00188     struct passwd *pw;
00189     char buf[BUFSIZ];
00190     getpwuid_r(my_uid,&pw_,buf,BUFSIZ,&pw);
00191     if(pw != NULL) { my_username=pw->pw_name; };
00192   };
00193   if(my_username.length() == 0) {
00194     logger.msg(Arc::FATAL,"Can't recognize own username - EXITING"); return;
00195   };
00196   my_user = new JobUser(my_username);
00197   if(!configure_serviced_users(users,my_uid,my_username,*my_user,&daemon)) {
00198     logger.msg(Arc::INFO,"Used configuration file %s",nordugrid_config_loc());
00199     logger.msg(Arc::FATAL,"Error processing configuration - EXITING"); return;
00200   };
00201   if(users.size() == 0) {
00202     logger.msg(Arc::FATAL,"No suitable users found in configuration - EXITING"); return;
00203   };
00204 
00205   //daemon.logfile(DEFAULT_LOG_FILE);
00206   //daemon.pidfile(DEFAULT_PID_FILE);
00207   //if(daemon.daemon() != 0) {
00208   //  perror("Error - daemonization failed");
00209   //  exit(1);
00210   //}; 
00211   logger.msg(Arc::INFO,"Used configuration file %s",nordugrid_config_loc());
00212   print_serviced_users(users);
00213 
00214   //unsigned int wakeup_period = JobsList::WakeupPeriod();
00215   CommFIFO wakeup_interface;
00216   pthread_t wakeup_thread;
00217   pthread_t cache_thread;
00218   time_t hard_job_time; 
00219   pthread_cond_t sleep_cond = PTHREAD_COND_INITIALIZER;
00220   pthread_mutex_t sleep_mutex = PTHREAD_MUTEX_INITIALIZER;
00221   sleep_st wakeup_h;
00222   wakeup_h.sleep_cond=&sleep_cond;
00223   wakeup_h.sleep_mutex=&sleep_mutex;
00224   wakeup_h.timeout=&wakeup_interface;
00225   for(JobUsers::iterator i = users.begin();i!=users.end();++i) {
00226     wakeup_interface.add(*i);
00227   };
00228   wakeup_interface.timeout(JobsList::WakeupPeriod());
00229 
00230   // Prepare signal handler(s). Must be done after fork/daemon and preferably
00231   // before any new thread is started. 
00232   //# RunParallel run(&sleep_cond);
00233   //# if(!run.is_initialized()) {
00234   //#   logger.msg(Arc::ERROR,"Error - initialization of signal environment failed");
00235   //#   goto exit;
00236   //# };
00237 
00238   // I hope nothing till now used Globus
00239 
00240   // It looks like Globus screws signal setup somehow
00241   //# run.reinit(false);
00242 
00243   /* start timer thread - wake up every 2 minutes */
00244   if(pthread_create(&wakeup_thread,NULL,&wakeup_func,&wakeup_h) != 0) {
00245     logger.msg(Arc::ERROR,"Failed to start new thread"); return;
00246   };
00247   RunParallel::kicker(&kick_func,&wakeup_h);
00248   if(clean_first_level) {
00249     bool clean_finished = false;
00250     bool clean_active = false;
00251     bool clean_junk = false;
00252     if(clean_first_level >= 1) {
00253       clean_finished=true;
00254       if(clean_first_level >= 2) {
00255         clean_active=true;
00256         if(clean_first_level >= 3) {
00257           clean_junk=true;
00258         };
00259       };
00260     };
00261     for(;;) { 
00262       bool cleaned_all=true;
00263       for(JobUsers::iterator user = users.begin();user != users.end();++user) {
00264         size_t njobs = user->get_jobs()->size();
00265         user->get_jobs()->ScanNewJobs(false);
00266         if(user->get_jobs()->size() == njobs) break;
00267         cleaned_all=false;
00268         if(!(user->get_jobs()->DestroyJobs(clean_finished,clean_active)))  {
00269           logger.msg(Arc::WARNING,"Not all jobs are cleaned yet");
00270           sleep(10); 
00271           logger.msg(Arc::WARNING,"Trying again");
00272         };
00273         kill(getpid(),SIGCHLD);  /* make sure no child is missed */
00274       };
00275       if(cleaned_all) {
00276         if(clean_junk && clean_active && clean_finished) {  
00277           /* at the moment cleaning junk means cleaning all the files in 
00278              session and control directories */
00279           for(JobUsers::iterator user=users.begin();user!=users.end();++user) {
00280             std::list<FileData> flist;
00281             for(std::vector<std::string>::const_iterator i = user->SessionRoots().begin(); i != user->SessionRoots().end(); i++) {
00282               logger.msg(Arc::INFO,"Cleaning all files in directory %s", *i);
00283               delete_all_files(*i,flist,true);
00284             }
00285             logger.msg(Arc::INFO,"Cleaning all files in directory %s", user->ControlDir());
00286             delete_all_files(user->ControlDir(),flist,true);
00287           };
00288         };
00289         break;
00290       };
00291     };
00292     logger.msg(Arc::INFO,"Jobs cleaned");
00293   };
00294   // check if cleaning is enabled for any user, if so activate cleaning thread
00295   for (JobUsers::const_iterator cacheuser = users.begin(); cacheuser != users.end(); ++cacheuser) {
00296     if (cacheuser->CacheParams() && cacheuser->CacheParams()->cleanCache()) {
00297       if(pthread_create(&cache_thread,NULL,&cache_func,(void*)(&users))!=0) {
00298         logger.msg(Arc::INFO,"Failed to start new thread: cache won't be cleaned");
00299       }
00300       break;
00301     }
00302   }
00303   /* create control and session directories */
00304   for(JobUsers::iterator user = users.begin();user != users.end();++user) {
00305     user->CreateDirectories();
00306   };
00307   /* main loop - forewer */
00308   logger.msg(Arc::INFO,"Starting jobs' monitoring");
00309   hard_job_time = time(NULL) + HARD_JOB_PERIOD;
00310   for(;;) { 
00311     users.run_helpers();
00312     job_log.RunReporter(users);
00313     my_user->run_helpers();
00314     bool hard_job = time(NULL) > hard_job_time;
00315     for(JobUsers::iterator user = users.begin();user != users.end();++user) {
00316       /* look for new jobs */
00317       user->get_jobs()->ScanNewJobs(hard_job);
00318       /* process know jobs */
00319       user->get_jobs()->ActJobs(hard_job);
00320     };
00321     if(hard_job) hard_job_time = time(NULL) + HARD_JOB_PERIOD;
00322     pthread_mutex_lock(&sleep_mutex);
00323     pthread_cond_wait(&sleep_cond,&sleep_mutex);
00324     pthread_mutex_unlock(&sleep_mutex);
00325 //#    if(run.was_hup()) {
00326 //#      logger.msg(Arc::INFO,"SIGHUP detected");
00327 //#//      if(!configure_serviced_users(users,my_uid,my_username,*my_user)) {
00328 //#//        std::cout<<"Error processing configuration"<<std::endl; goto exit;
00329 //#//      };
00330 //#    }
00331 //#    else {
00332       logger.msg(Arc::DEBUG,"Waking up");
00333 //#    };
00334   };
00335   return;
00336 }
00337 
00338 /*
00339 GridManager::GridManager(Arc::XMLNode argv):active_(false) {
00340   args_st* args = new args_st;
00341   if(!args) return;
00342   args->argv=(char**)malloc(sizeof(char*)*(argv.Size()+1));
00343   args->argc=0;
00344   args->argv[args->argc]=strdup("grid-manager");
00345   logger.msg(Arc::VERBOSE, "ARG: %s", args->argv[args->argc]);
00346   for(;;) {
00347     Arc::XMLNode arg = argv["arg"][args->argc];
00348     ++(args->argc);
00349     if(!arg) break;
00350     args->argv[args->argc]=strdup(((std::string)arg).c_str());
00351     logger.msg(Arc::VERBOSE, "ARG: %s", args->argv[args->argc]);
00352   };
00353   active_=Arc::CreateThreadFunction(&grid_manager,args);
00354   if(!active_) delete args;
00355 }
00356 */
00357 
00358 GridManager::GridManager(const char* config_filename):active_(false) {
00359   void* arg = config_filename?strdup(config_filename):NULL;
00360   active_=Arc::CreateThreadFunction(&grid_manager,arg);
00361   if(!active_) if(arg) free(arg);
00362 }
00363 
00364 GridManager::~GridManager(void) {
00365 }
00366 
00367 } // namespace ARex
00368