Back to index

nordugrid-arc-nox  1.1.0~rc6
Run_unix.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 
00008 #include <sys/types.h>
00009 #include <sys/wait.h>
00010 #include <fcntl.h>
00011 #include <signal.h>
00012 #include <poll.h>
00013 
00014 #include <iostream>
00015 
00016 #include <glibmm.h>
00017 
00018 #include <arc/Thread.h>
00019 #include <arc/Logger.h>
00020 
00021 #include "User.h"
00022 #include "Run.h"
00023 
00024 
00025 namespace Arc {
00026 
00027 #define SAFE_DISCONNECT(CONNECTOR) { \
00028     try { \
00029       (CONNECTOR).disconnect(); \
00030     } catch (Glib::Exception& e) { \
00031     } catch (std::exception& e) { \
00032     }; \
00033 }
00034 
00035   class RunPump {
00036     friend class Run;
00037   private:
00038     static Glib::Mutex instance_lock_;
00039     static RunPump *instance_;
00040     static unsigned int mark_;
00041 #define RunPumpMagic (0xA73E771F)
00042     class Abandoned {
00043     friend class RunPump;
00044     private:
00045       sigc::connection child_conn_;
00046       Glib::Pid pid_;
00047     public:
00048       Abandoned(Glib::Pid pid,sigc::connection child_conn):child_conn_(child_conn),pid_(pid) { };
00049     };
00050     std::list<Abandoned> abandoned_;
00051     //std::list<Run*> runs_;
00052     Glib::Mutex abandoned_lock_;
00053     Glib::Mutex list_lock_;
00054     Glib::Mutex pump_lock_;
00055     Glib::RefPtr<Glib::MainContext> context_;
00056     Glib::Thread *thread_;
00057     RunPump(void);
00058     ~RunPump(void);
00059     static RunPump& Instance(void);
00060     operator bool(void) {
00061       return (bool)context_;
00062     }
00063     bool operator!(void) {
00064       return !(bool)context_;
00065     }
00066     void Pump(void);
00067     void Add(Run *r);
00068     void Remove(Run *r);
00069     void child_handler(Glib::Pid pid, int result);
00070   };
00071 
00072   Glib::Mutex RunPump::instance_lock_;
00073   RunPump* RunPump::instance_ = NULL;
00074   unsigned int RunPump::mark_ = ~RunPumpMagic;
00075 
00076   class Pid {
00077   private:
00078     Glib::Pid p_;
00079   public:
00080     Pid(void):p_(0) { }; 
00081     Pid(Glib::Pid p):p_(p) { }; 
00082     operator Glib::Pid(void) { return p_; };
00083     Glib::Pid pid(void) { return p_; };
00084     Glib::Pid operator=(Glib::Pid p) { return (p_=p); };
00085   };
00086 
00087   class RunInitializerArgument {
00088   private:
00089     void *arg_;
00090     void (*func_)(void*);
00091   public:
00092     RunInitializerArgument(void(*func)(void*), void *arg)
00093       : arg_(arg),
00094         func_(func) {}
00095     void Run(void);
00096   };
00097 
00098   void RunInitializerArgument::Run(void) {
00099     void *arg = arg_;
00100     void (*func)(void*) = func_;
00101     delete this;
00102     // To leave clean environment reset all signal.
00103     // Otherwise we may get some signals non-intentionally ignored.
00104     // Glib takes care of open handles.
00105 #ifdef SIGRTMIN
00106     for(int n = SIGHUP; n < SIGRTMIN; ++n)
00107 #else
00108     // At least reset all signals whoe numbers are well defined
00109     for(int n = SIGHUP; n < SIGTERM; ++n)
00110 #endif
00111       signal(n,SIG_DFL);
00112     if (!func)
00113       return;
00114     (*func)(arg);
00115     return;
00116   }
00117 
00118   RunPump::RunPump(void)
00119     : context_(NULL),
00120       thread_(NULL) {
00121     try {
00122       thread_ = Glib::Thread::create(sigc::mem_fun(*this, &RunPump::Pump), false);
00123     } catch (Glib::Exception& e) {} catch (std::exception& e) {}
00124     ;
00125     if (thread_ == NULL)
00126       return;
00127     // Wait for context_ to be intialized
00128     // TODO: what to do if context never initialized
00129     for (;;) {
00130       if (context_)
00131         break;
00132       thread_->yield(); // This is simpler than condition+mutex
00133     }
00134   }
00135 
00136   RunPump& RunPump::Instance(void) {
00137     instance_lock_.lock();
00138     if ((instance_ == NULL) || (mark_ != RunPumpMagic)) {
00139       instance_ = new RunPump();
00140       mark_ = RunPumpMagic;
00141     }
00142     instance_lock_.unlock();
00143     return *instance_;
00144   }
00145 
00146   void RunPump::Pump(void) {
00147     // TODO: put try/catch everythere for glibmm errors
00148     try {
00149       context_ = Glib::MainContext::create();
00150       // In infinite loop monitor state of children processes
00151       // and pump information to/from std* channels if requested
00152       //context_->acquire();
00153       for (;;) {
00154         list_lock_.lock();
00155         //      sleep(1);
00156         list_lock_.unlock();
00157         pump_lock_.lock();
00158         bool dispatched = context_->iteration(true);
00159         for(std::list<Abandoned>::iterator a = abandoned_.begin();
00160                             a != abandoned_.end();) {
00161           if(a->pid_ == 0) {
00162             SAFE_DISCONNECT(a->child_conn_);
00163             a = abandoned_.erase(a);
00164           } else {
00165             ++a;
00166           }
00167         }
00168         pump_lock_.unlock();
00169         thread_->yield();
00170         if (!dispatched)
00171           sleep(1);
00172       }
00173     } catch (Glib::Exception& e) {} catch (std::exception& e) {}
00174     ;
00175   }
00176 
00177   void RunPump::Add(Run *r) {
00178     if (!r)
00179       return;
00180     if (!(*r))
00181       return;
00182     if (!(*this))
00183       return;
00184     // Take full control over context
00185     list_lock_.lock();
00186     while (true) {
00187       context_->wakeup();
00188       if (pump_lock_.trylock())
00189         break;
00190       sleep(1);
00191     }
00192     try {
00193       // Add sources to context
00194       if (r->stdout_str_ && !(r->stdout_keep_))
00195         r->stdout_conn_ = context_->signal_io().connect(sigc::mem_fun(*r, &Run::stdout_handler), r->stdout_, Glib::IO_IN | Glib::IO_HUP);
00196       if (r->stderr_str_ && !(r->stderr_keep_))
00197         r->stderr_conn_ = context_->signal_io().connect(sigc::mem_fun(*r, &Run::stderr_handler), r->stderr_, Glib::IO_IN | Glib::IO_HUP);
00198       if (r->stdin_str_ && !(r->stdin_keep_))
00199         r->stdin_conn_ = context_->signal_io().connect(sigc::mem_fun(*r, &Run::stdin_handler), r->stdin_, Glib::IO_OUT | Glib::IO_HUP);
00200 #ifdef HAVE_GLIBMM_CHILDWATCH
00201       r->child_conn_ = context_->signal_child_watch().connect(sigc::mem_fun(*r, &Run::child_handler), r->pid_->pid());
00202       //if(r->child_conn_.empty()) std::cerr<<"connect for signal_child_watch failed"<<std::endl;
00203 #endif
00204     } catch (Glib::Exception& e) {} catch (std::exception& e) {}
00205     ;
00206     pump_lock_.unlock();
00207     list_lock_.unlock();
00208   }
00209 
00210   void RunPump::Remove(Run *r) {
00211     if (!r)
00212       return;
00213     if (!(*r))
00214       return;
00215     if (!(*this))
00216       return;
00217     // Take full control over context
00218     list_lock_.lock();
00219     while (true) {
00220       context_->wakeup();
00221       if (pump_lock_.trylock())
00222         break;
00223       sleep(1);
00224     }
00225     // Disconnect sources from context
00226     SAFE_DISCONNECT(r->stdout_conn_);
00227     SAFE_DISCONNECT(r->stderr_conn_);
00228     SAFE_DISCONNECT(r->stdin_conn_);
00229     SAFE_DISCONNECT(r->child_conn_);
00230     if(r->running_) {
00231       abandoned_.push_back(Abandoned(r->pid_->pid(),context_->signal_child_watch().connect(sigc::mem_fun(*this,&RunPump::child_handler), r->pid_->pid())));
00232       r->running_ = false;
00233     };
00234     pump_lock_.unlock();
00235     list_lock_.unlock();
00236   }
00237 
00238   void RunPump::child_handler(Glib::Pid pid, int result) {
00239     abandoned_lock_.lock();
00240     for(std::list<Abandoned>::iterator a = abandoned_.begin();
00241                         a != abandoned_.end();++a) {
00242       if(a->pid_ == pid) {
00243         a->pid_ = 0;
00244         break;
00245       }
00246     }
00247     abandoned_lock_.unlock();
00248   }
00249 
00250   Run::Run(const std::string& cmdline)
00251     : working_directory("."),
00252       stdout_(-1),
00253       stderr_(-1),
00254       stdin_(-1),
00255       stdout_str_(NULL),
00256       stderr_str_(NULL),
00257       stdin_str_(NULL),
00258       stdout_keep_(false),
00259       stderr_keep_(false),
00260       stdin_keep_(false),
00261       pid_(NULL),
00262       argv_(Glib::shell_parse_argv(cmdline)),
00263       initializer_func_(NULL),
00264       initializer_arg_(NULL),
00265       kicker_func_(NULL),
00266       started_(false),
00267       running_(false),
00268       abandoned_(false),
00269       result_(-1) {
00270     pid_ = new Pid;
00271   }
00272 
00273   Run::Run(const std::list<std::string>& argv)
00274     : working_directory("."),
00275       stdout_(-1),
00276       stderr_(-1),
00277       stdin_(-1),
00278       stdout_str_(NULL),
00279       stderr_str_(NULL),
00280       stdin_str_(NULL),
00281       pid_(0),
00282       argv_(argv),
00283       initializer_func_(NULL),
00284       initializer_arg_(NULL),
00285       kicker_func_(NULL),
00286       started_(false),
00287       running_(false),
00288       abandoned_(false),
00289       result_(-1) {
00290     pid_ = new Pid;
00291   }
00292 
00293   Run::~Run(void) {
00294     if(*this) {
00295       if(!abandoned_) Kill(0);
00296       CloseStdout();
00297       CloseStderr();
00298       CloseStdin();
00299       RunPump::Instance().Remove(this);
00300     };
00301   }
00302 
00303   bool Run::Start(void) {
00304     if (started_)
00305       return false;
00306     if (argv_.size() < 1)
00307       return false;
00308     RunPump& pump = RunPump::Instance();
00309     RunInitializerArgument *arg = NULL;
00310     try {
00311       UserSwitch usw(0,0);
00312       running_ = true;
00313       Glib::Pid pid;
00314       arg = new RunInitializerArgument(initializer_func_, initializer_arg_);
00315       spawn_async_with_pipes(working_directory, argv_,
00316                              Glib::SpawnFlags(Glib::SPAWN_DO_NOT_REAP_CHILD),
00317                              sigc::mem_fun(*arg, &RunInitializerArgument::Run),
00318                              &pid,
00319                              stdin_keep_  ? NULL : &stdin_,
00320                              stdout_keep_ ? NULL : &stdout_,
00321                              stderr_keep_ ? NULL : &stderr_);
00322       *pid_ = pid;
00323       if (!stdin_keep_)
00324         fcntl(stdin_, F_SETFL, fcntl(stdin_, F_GETFL) | O_NONBLOCK);
00325       if (!stdout_keep_)
00326         fcntl(stdout_, F_SETFL, fcntl(stdout_, F_GETFL) | O_NONBLOCK);
00327       if (!stderr_keep_)
00328         fcntl(stderr_, F_SETFL, fcntl(stderr_, F_GETFL) | O_NONBLOCK);
00329       started_ = true;
00330     } catch (Glib::Exception& e) {
00331       running_ = false;
00332       // TODO: report error
00333       return false;
00334     } catch (std::exception& e) {
00335       running_ = false;
00336       return false;
00337     };
00338     pump.Add(this);
00339     if (arg)
00340       delete arg;
00341     return true;
00342   }
00343 
00344   void Run::Kill(int timeout) {
00345 #ifndef HAVE_GLIBMM_CHILDWATCH
00346     Wait(0);
00347 #endif
00348     if (!running_)
00349       return;
00350     if (timeout > 0) {
00351       // Kill softly
00352       ::kill(pid_->pid(), SIGTERM);
00353       Wait(timeout);
00354     }
00355     if (!running_)
00356       return;
00357     // Kill with no merci
00358     ::kill(pid_->pid(), SIGKILL);
00359   }
00360 
00361   void Run::Abandon(void) {
00362     if(*this) {
00363       CloseStdout();
00364       CloseStderr();
00365       CloseStdin();
00366       abandoned_=true;
00367     }
00368   }
00369 
00370   bool Run::stdout_handler(Glib::IOCondition) {
00371     if (stdout_str_) {
00372       char buf[256];
00373       int l = ReadStdout(0, buf, sizeof(buf));
00374       if ((l == 0) || (l == -1)) {
00375         CloseStdout();
00376         return false;
00377       }
00378       else
00379         stdout_str_->append(buf, l);
00380     }
00381     else {
00382       // Event shouldn't happen if not expected
00383 
00384     }
00385     return true;
00386   }
00387 
00388   bool Run::stderr_handler(Glib::IOCondition) {
00389     if (stderr_str_) {
00390       char buf[256];
00391       int l = ReadStderr(0, buf, sizeof(buf));
00392       if ((l == 0) || (l == -1)) {
00393         CloseStderr();
00394         return false;
00395       }
00396       else
00397         stderr_str_->append(buf, l);
00398     }
00399     else {
00400       // Event shouldn't happen if not expected
00401 
00402     }
00403     return true;
00404   }
00405 
00406   bool Run::stdin_handler(Glib::IOCondition) {
00407     if (stdin_str_) {
00408       if (stdin_str_->length() == 0) {
00409         CloseStdin();
00410         stdin_str_ = NULL;
00411       }
00412       else {
00413         int l = WriteStdin(0, stdin_str_->c_str(), stdin_str_->length());
00414         if (l == -1) {
00415           CloseStdin();
00416           return false;
00417         }
00418         else
00419           // Not very effective
00420           *stdin_str_ = stdin_str_->substr(l);
00421       }
00422     }
00423     else {
00424       // Event shouldn't happen if not expected
00425 
00426     }
00427     return true;
00428   }
00429 
00430   void Run::child_handler(Glib::Pid, int result) {
00431     if (stdout_str_)
00432       for (;;)
00433         if (!stdout_handler(Glib::IO_IN))
00434           break;
00435     if (stderr_str_)
00436       for (;;)
00437         if (!stderr_handler(Glib::IO_IN))
00438           break;
00439     CloseStdout();
00440     CloseStderr();
00441     CloseStdin();
00442     lock_.lock();
00443     cond_.signal();
00444     // There is reference in Glib manual that 'result' is same
00445     // as returned by waitpid. It is not clear how it works for
00446     // windows but atleast for *nix we can use waitpid related
00447     // macros.
00448     if(WIFEXITED(result)) {
00449       result_ = WEXITSTATUS(result);
00450     } else {
00451       result_ = -1;
00452     }
00453     running_ = false;
00454     lock_.unlock();
00455     if (kicker_func_)
00456       (*kicker_func_)(kicker_arg_);
00457   }
00458 
00459   void Run::CloseStdout(void) {
00460     if (stdout_ != -1)
00461       ::close(stdout_);
00462     stdout_ = -1;
00463     SAFE_DISCONNECT(stdout_conn_);
00464   }
00465 
00466   void Run::CloseStderr(void) {
00467     if (stderr_ != -1)
00468       ::close(stderr_);
00469     stderr_ = -1;
00470     SAFE_DISCONNECT(stderr_conn_);
00471   }
00472 
00473   void Run::CloseStdin(void) {
00474     if (stdin_ != -1)
00475       ::close(stdin_);
00476     stdin_ = -1;
00477     SAFE_DISCONNECT(stdin_conn_);
00478   }
00479 
00480   int Run::ReadStdout(int timeout, char *buf, int size) {
00481     if (stdout_ == -1)
00482       return -1;
00483     // TODO: do it through context for timeout?
00484     pollfd fd;
00485     fd.fd = stdout_; fd.events = POLLIN; fd.revents = 0;
00486     int err = poll(&fd, 1, timeout);
00487     if(err <= 0) return err;
00488     if(!(fd.revents & POLLIN)) return -1;
00489     return ::read(stdout_, buf, size);
00490   }
00491 
00492   int Run::ReadStderr(int timeout, char *buf, int size) {
00493     if (stderr_ == -1)
00494       return -1;
00495     // TODO: do it through context for timeout
00496     pollfd fd;
00497     fd.fd = stderr_; fd.events = POLLIN; fd.revents = 0;
00498     int err = poll(&fd, 1, timeout);
00499     if(err <= 0) return err;
00500     if(!(fd.revents & POLLIN)) return -1;
00501     return ::read(stderr_, buf, size);
00502   }
00503 
00504   int Run::WriteStdin(int timeout, const char *buf, int size) {
00505     if (stdin_ == -1)
00506       return -1;
00507     // TODO: do it through context for timeout
00508     pollfd fd;
00509     fd.fd = stdout_; fd.events = POLLOUT; fd.revents = 0;
00510     int err = poll(&fd, 1, timeout);
00511     if(err <= 0) return err;
00512     if(!(fd.revents & POLLOUT)) return -1;
00513     return write(stdin_, buf, size);
00514   }
00515 
00516   bool Run::Running(void) {
00517 #ifdef HAVE_GLIBMM_CHILDWATCH
00518     return running_;
00519 #else
00520     Wait(0);
00521     return running_;
00522 #endif
00523   }
00524 
00525   bool Run::Wait(int timeout) {
00526     if (!started_)
00527       return false;
00528     if (!running_)
00529       return true;
00530     Glib::TimeVal till;
00531     till.assign_current_time();
00532     till += timeout;
00533     lock_.lock();
00534     while (running_) {
00535       Glib::TimeVal t;
00536       t.assign_current_time();
00537       t.subtract(till);
00538 #ifdef HAVE_GLIBMM_CHILDWATCH
00539       if (!t.negative())
00540         break;
00541       cond_.timed_wait(lock_, till);
00542 #else
00543       int status;
00544       int r = waitpid(pid_->pid(), &status, WNOHANG);
00545       if (r == 0) {
00546         if (!t.negative())
00547           break;
00548         lock_.unlock();
00549         sleep(1);
00550         lock_.lock();
00551         continue;
00552       }
00553       if (r == -1) // Child lost?
00554         status = -1;
00555       else
00556         status = WEXITSTATUS(status);
00557       // Child exited
00558       lock_.unlock();
00559       child_handler(pid_->pid(), status << 8);
00560       lock_.lock();
00561 #endif
00562     }
00563     lock_.unlock();
00564     return (!running_);
00565   }
00566 
00567   bool Run::Wait(void) {
00568     if (!started_)
00569       return false;
00570     if (!running_)
00571       return true;
00572     lock_.lock();
00573     Glib::TimeVal till;
00574     while (running_) {
00575 #ifdef HAVE_GLIBMM_CHILDWATCH
00576       till.assign_current_time();
00577       till += 1; // one sec later
00578       cond_.timed_wait(lock_, till);
00579 #else
00580       int status;
00581       int r = waitpid(pid_->pid(), &status, WNOHANG);
00582       if (r == 0) {
00583         lock_.unlock();
00584         sleep(1);
00585         lock_.lock();
00586         continue;
00587       }
00588       if (r == -1) // Child lost?
00589         status = -1;
00590       else
00591         status = WEXITSTATUS(status);
00592       // Child exited
00593       lock_.unlock();
00594       child_handler(pid_->pid(), status << 8);
00595       lock_.lock();
00596 #endif
00597     }
00598     lock_.unlock();
00599     return (!running_);
00600   }
00601 
00602   void Run::AssignStdout(std::string& str) {
00603     if (!running_)
00604       stdout_str_ = &str;
00605   }
00606 
00607   void Run::AssignStderr(std::string& str) {
00608     if (!running_)
00609       stderr_str_ = &str;
00610   }
00611 
00612   void Run::AssignStdin(std::string& str) {
00613     if (!running_)
00614       stdin_str_ = &str;
00615   }
00616 
00617   void Run::KeepStdout(bool keep) {
00618     if (!running_)
00619       stdout_keep_ = keep;
00620   }
00621 
00622   void Run::KeepStderr(bool keep) {
00623     if (!running_)
00624       stderr_keep_ = keep;
00625   }
00626 
00627   void Run::KeepStdin(bool keep) {
00628     if (!running_)
00629       stdin_keep_ = keep;
00630   }
00631 
00632   void Run::AssignInitializer(void (*initializer_func)(void *arg), void *initializer_arg) {
00633     if (!running_) {
00634       initializer_arg_ = initializer_arg;
00635       initializer_func_ = initializer_func;
00636     }
00637   }
00638 
00639   void Run::AssignKicker(void (*kicker_func)(void *arg), void *kicker_arg) {
00640     if (!running_) {
00641       kicker_arg_ = kicker_arg;
00642       kicker_func_ = kicker_func;
00643     }
00644   }
00645 
00646 }