Back to index

nordugrid-arc-nox  1.1.0~rc6
Thread.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 #define USE_THREAD_POOL
00007 
00008 #ifdef HAVE_STDINT_H
00009 #include <stdint.h>
00010 #endif
00011 
00012 #include <glibmm/init.h>
00013 
00014 #ifdef USE_THREAD_POOL
00015 #include <sys/time.h>
00016 #ifndef WIN32
00017 #include <sys/resource.h>
00018 #endif
00019 #endif
00020 
00021 #include "Thread.h"
00022 #include "Logger.h"
00023 
00024 
00025 namespace Arc {
00026 
00027   static Logger threadLogger(Logger::getRootLogger(), "Thread");
00028 
00029 #ifdef USE_THREAD_POOL
00030   class ThreadInc {
00031    public:
00032     ThreadInc(void);
00033     ~ThreadInc(void);
00034   };
00035 #endif
00036 
00037   class ThreadArgument {
00038   public:
00039     typedef void (*func_t)(void*);
00040     void *arg;
00041     func_t func;
00042 #ifdef USE_THREAD_POOL
00043     ThreadInc* resource;
00044     ThreadArgument& acquire(void) {
00045       resource = new ThreadInc;
00046       return *this;
00047     }
00048     void release(void) {
00049       if(resource) delete resource;
00050       resource = NULL;
00051     }
00052 #endif
00053     ThreadArgument(func_t f, void *a)
00054       : arg(a),
00055         func(f)
00056 #ifdef USE_THREAD_POOL
00057         ,resource(NULL)
00058 #endif
00059     {}
00060     ~ThreadArgument(void) {}
00061     void thread(void);
00062   };
00063 
00064 
00065 #ifdef USE_THREAD_POOL
00066   // This is not a real pool. It is just a queue of requests for
00067   // new threads. Hopefuly it will be converted to pool later for
00068   // better performance.
00069   class ThreadPool {
00070    public:
00071    friend class ThreadInc;
00072    private:
00073     int max_count;
00074     int count;
00075     Glib::Mutex count_lock;
00076     Glib::Mutex queue_lock;
00077     std::list<ThreadArgument*> queue;
00078     int CheckQueue(void);
00079     ~ThreadPool(void) { };
00080    public:
00081     ThreadPool(void);
00082     void PushQueue(ThreadArgument* arg);
00083   };
00084 
00085   ThreadPool::ThreadPool(void):max_count(0),count(0) {
00086     // Estimating amount of available memory 
00087     uint64_t n = 0;
00088 #ifndef WIN32
00089     struct rlimit rl;
00090     if(getrlimit(RLIMIT_AS,&rl) == 0) {
00091       if(rl.rlim_cur != RLIM_INFINITY) {
00092         // Dividing by 2 assuming each thread will equally use 
00093         // stack and heap
00094         n = rl.rlim_cur/thread_stacksize/2;
00095         if(n == 0) {
00096           // What else can we do. Application will fail on first thread
00097           n=1;
00098         }
00099       } else if(rl.rlim_max != RLIM_INFINITY) {
00100         n = rl.rlim_max/thread_stacksize/2;
00101         if(n == 0) {
00102           n=1;
00103         }
00104       }
00105     }
00106 #endif
00107     if(n == 0) {
00108       // Very rough estimation of number of threads which can be run
00109       n = (((uint64_t)1)<<(8*sizeof(int*) - 2))/thread_stacksize;
00110     }
00111     if(n > INT_MAX) n = INT_MAX;
00112     max_count = (int)n;
00113     // TODO: can't use logger here because it will try to initilize pool
00114     //threadLogger.msg(DEBUG, "Maximum number of threads is %i",max_count);
00115   }
00116   int ThreadPool::CheckQueue(void) {
00117     Glib::Mutex::Lock lock(queue_lock, Glib::TRY_LOCK);
00118     if(!lock.locked()) return -1;
00119     int size = queue.size();
00120     while((count < max_count) && (size > 0)) {
00121       ThreadArgument* argument = *(queue.begin());
00122       argument->acquire();
00123       try {
00124         UserSwitch usw(0,0);
00125         Glib::Thread::create(sigc::mem_fun(*argument,
00126                                            &ThreadArgument::thread),
00127                              thread_stacksize, false, false,
00128                              Glib::THREAD_PRIORITY_NORMAL);
00129         queue.erase(queue.begin());
00130       } catch (Glib::Error& e) {
00131         threadLogger.msg(ERROR, e.what());
00132         argument->release();
00133       } catch (std::exception& e) {
00134         threadLogger.msg(ERROR, e.what());
00135         argument->release();
00136       };
00137       size = queue.size();
00138     }
00139     return size;
00140   }
00141 
00142   void ThreadPool::PushQueue(ThreadArgument* arg) {
00143     Glib::Mutex::Lock lock(queue_lock);
00144     queue.push_back(arg);
00145     lock.release();
00146     if(CheckQueue() > 0)
00147       threadLogger.msg(INFO, "Maximum number of threads running - puting new request into queue");
00148   }
00149 
00150   static ThreadPool* pool = NULL;
00151 
00152   ThreadInc::ThreadInc(void) {
00153     if(!pool) return;
00154     pool->count_lock.lock();
00155     ++(pool->count);
00156     pool->count_lock.unlock();
00157   }
00158 
00159   ThreadInc::~ThreadInc(void) {
00160     if(!pool) return;
00161     pool->count_lock.lock();
00162     --(pool->count);
00163     pool->count_lock.unlock();
00164     pool->CheckQueue();
00165   }
00166 #endif
00167 
00168   void ThreadArgument::thread(void) {
00169 #ifdef USE_THREAD_POOL
00170     ThreadInc resource_;
00171     release();
00172 #endif
00173     func_t f_temp = func;
00174     void *a_temp = arg;
00175     delete this;
00176     (*f_temp)(a_temp);
00177   }
00178 
00179   bool CreateThreadFunction(void (*func)(void*), void *arg) {
00180 #ifdef USE_THREAD_POOL
00181     if(!pool) return false;
00182     ThreadArgument *argument = new ThreadArgument(func, arg);
00183     pool->PushQueue(argument);
00184 #else
00185     ThreadArgument *argument = new ThreadArgument(func, arg);
00186     try {
00187       UserSwitch usw(0,0);
00188       Glib::Thread::create(sigc::mem_fun(*argument, &ThreadArgument::thread),
00189                            thread_stacksize, false, false,
00190                            Glib::THREAD_PRIORITY_NORMAL);
00191     } catch (std::exception& e) {
00192       threadLogger.msg(ERROR, e.what());
00193       delete argument;
00194       return false;
00195     };
00196 #endif
00197     return true;
00198   }
00199 
00200 /*
00201   bool CreateThreadFunction(void (*func)(void*), void *arg, Glib::Thread *&thr) {
00202     ThreadArgument *argument = new ThreadArgument(func, arg);
00203     Glib::Thread *thread;
00204     try {
00205       UserSwitch usw(0,0);
00206       thread = Glib::Thread::create(sigc::mem_fun(*argument, &ThreadArgument::thread),             
00207                          thread_stacksize,
00208                                   true,  // thread joinable
00209                                  false,
00210           Glib::THREAD_PRIORITY_NORMAL);
00211     } catch (std::exception& e) {
00212       threadLogger.msg(ERROR, e.what());
00213       delete argument;
00214       return false;
00215     };    
00216     thr = thread;
00217     return true;
00218   }
00219 */
00220 
00221   /*
00222      Example of how to use CreateThreadClass macro
00223 
00224      class testclass {
00225       public:
00226           int a;
00227           testclass(int v) { a=v; };
00228           void run(void) { a=0; };
00229      };
00230 
00231      void test(void) {
00232      testclass tc(1);
00233      CreateThreadClass(tc,testclass::run);
00234      }
00235    */
00236 
00237   void GlibThreadInitialize(void) {
00238     Glib::init();
00239     if (!Glib::thread_supported()) {
00240       Glib::thread_init();
00241 #ifdef USE_THREAD_POOL
00242       pool = new ThreadPool;
00243 #endif
00244     }
00245   }
00246 
00247   ThreadRegistry::ThreadRegistry(void):counter_(0),cancel_(false) {
00248   }
00249 
00250   ThreadRegistry::~ThreadRegistry(void) {
00251   }
00252 
00253   void ThreadRegistry::RegisterThread(void) {
00254     lock_.lock();
00255     ++counter_;
00256     lock_.unlock();
00257   }
00258 
00259   void ThreadRegistry::UnregisterThread(void) {
00260     lock_.lock();
00261     ++counter_;
00262     cond_.broadcast();
00263     lock_.unlock();
00264   }
00265 
00266   bool ThreadRegistry::WaitOrCancel(int timeout) {
00267     bool v = false;
00268     lock_.lock();
00269     Glib::TimeVal etime;
00270     etime.assign_current_time();
00271     etime.add_milliseconds(timeout);
00272     while (!cancel_) {
00273       if(!cond_.timed_wait(lock_, etime)) break;
00274     }
00275     v = cancel_;
00276     lock_.unlock();
00277     return v;
00278   }
00279 
00280   bool ThreadRegistry::WaitForExit(int timeout) {
00281     int n = 0;
00282     lock_.lock();
00283     if(timeout >= 0) {
00284       Glib::TimeVal etime;
00285       etime.assign_current_time();
00286       etime.add_milliseconds(timeout);
00287       while (counter_ > 0) {
00288         if(!cond_.timed_wait(lock_, etime)) break;
00289       }
00290     } else {
00291       while (counter_ > 0) {
00292         cond_.wait(lock_);
00293       }
00294     }
00295     n = counter_;
00296     lock_.unlock();
00297     return (n <= 0);
00298   }
00299 
00300   void ThreadRegistry::RequestCancel(void) {
00301     lock_.lock();
00302     cancel_=true;
00303     cond_.broadcast();
00304     lock_.unlock();
00305   }
00306 
00307 } // namespace Arc