Back to index

nordugrid-arc-nox  1.1.0~rc6
job_queue.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <arc/ByteArray.h>
00006 #include "job_queue.h"
00007 
00008 namespace Arc
00009 {
00010 
00011 static JobSelector *default_selector = new JobSelector();
00012 
00013 JobQueueIterator::JobQueueIterator()
00014 {
00015     tid_ = NULL;
00016     cursor_ = NULL;
00017     job_ = NULL;
00018     has_more_ = false; 
00019     have_status_ = false;
00020     selector_ = default_selector;
00021 }
00022 
00023 void JobQueueIterator::next(void)
00024 {
00025     int ret;
00026     Dbt key, value;
00027     key.set_flags(0);
00028     value.set_flags(0);
00029     for (;;) {
00030         try {
00031             ret = cursor_->get(&key, &value, DB_NEXT); 
00032             if (ret == DB_NOTFOUND) {
00033                 has_more_ = false;
00034                 break;
00035             }
00036         } catch (DbDeadlockException &de) {
00037             has_more_ = false;
00038             break;
00039         }
00040         // free(key.get_data());
00041         ByteArray a(value.get_data(), value.get_size());
00042         // free(value.get_data());
00043         job_ = new Job(a);
00044         if (have_status_ == false) {
00045             // only one query
00046             break;
00047         } else {
00048             if (selector_->match(job_) == true) {
00049                 // query until found job right by selector
00050                 break;
00051             } else {
00052                 delete job_;
00053                 job_ = NULL;
00054             }
00055         }
00056     }
00057 }
00058 
00059 
00060 JobQueueIterator::JobQueueIterator(DbTxn *tid, Dbc *cursor)
00061 {
00062     has_more_ = true;
00063     tid_ = tid;
00064     cursor_ = cursor;
00065     have_status_ = false;
00066     job_ = NULL;
00067     selector_ = default_selector;
00068     next();
00069 }
00070 
00071 JobQueueIterator::JobQueueIterator(DbTxn *tid, Dbc *cursor, JobSelector *selector)
00072 {
00073     has_more_ = true;
00074     tid_ = tid;
00075     cursor_ = cursor;
00076     have_status_ = true;
00077     job_ = NULL;
00078     selector_ = selector;
00079     next();
00080 }
00081 
00082 const JobQueueIterator &JobQueueIterator::operator++()
00083 {
00084     delete job_;
00085     job_ = NULL;
00086     next();
00087     return *this;
00088 }
00089 
00090 const JobQueueIterator &JobQueueIterator::operator++(int)
00091 {
00092     delete job_;
00093     job_ = NULL;
00094     next();
00095     return *this;
00096 }
00097 
00098 void JobQueueIterator::finish(void)
00099 {
00100     if (job_ != NULL) {
00101         delete job_;
00102         job_ = NULL;
00103     }
00104     if (cursor_ != NULL) {
00105         cursor_->close();
00106         cursor_ = NULL;
00107     }
00108     if (tid_ != NULL) {
00109         tid_->commit(0);
00110         tid_ = NULL;
00111     }
00112 }
00113 
00114 bool JobQueueIterator::refresh()
00115 {
00116     // generate key
00117     void *buf = (void *)job_->getID().c_str();
00118     int size = job_->getID().size() + 1;
00119     Dbt key(buf, size);
00120     // generate data
00121     ByteArray &a = job_->serialize();
00122     Dbt data(a.data(), a.size());
00123     try {
00124         cursor_->put(&key, &data, DB_KEYFIRST);
00125         return true;
00126     } catch (DbDeadlockException &de) {
00127         return false;        
00128     } catch (DbException &e) {
00129         return false;
00130     }
00131 }
00132 
00133 void JobQueueIterator::remove()
00134 {
00135     cursor_->del(0);
00136 }
00137 
00138 JobQueueIterator::~JobQueueIterator()
00139 {
00140     finish();
00141 }
00142 
00143 void JobQueue::init(const std::string &dbroot, const std::string &store_name)
00144 {
00145     env_ = NULL;
00146     db_ = NULL;
00147     env_ = new DbEnv(0); // Exception will occure
00148     // env_->open(dbroot.c_str(), DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL | DB_THREAD, 0644);
00149     env_->open(dbroot.c_str(), DB_CREATE | DB_INIT_MPOOL | DB_INIT_LOCK | DB_INIT_TXN | DB_RECOVER | DB_THREAD, 0644);
00150     // setup internal deadlock detection mechanizm
00151     env_->set_lk_detect(DB_LOCK_DEFAULT);
00152     db_ = new Db(env_, 0);
00153     try {
00154 #if (DB_VERSION_MAJOR < 4) || (DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 0)
00155         db_->open(store_name.c_str(), DB_BTREE, DB_CREATE | DB_THREAD, 0644);
00156 #else
00157         DbTxn *tid = NULL;
00158         env_->txn_begin(NULL, &tid, 0);
00159         db_->open(tid, store_name.c_str(), NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0644);
00160         tid->commit(0);
00161 #endif
00162     } catch (DbException &e) {
00163         logger_.msg(Arc::ERROR, "Cannot open database");
00164         db_ = NULL;
00165         return;
00166     }
00167 }
00168 
00169 JobQueue::~JobQueue(void) 
00170 {
00171     if (db_ != NULL) {
00172         db_->close(0);
00173         delete db_;
00174     }
00175     if (env_ != NULL) {
00176         env_->close(0);
00177         delete env_;
00178     }
00179 }
00180 
00181 void JobQueue::refresh(Job &j)
00182 {
00183     // generate key
00184     void *buf = (void *)j.getID().c_str();
00185     int size = j.getID().size() + 1;
00186     Dbt key(buf, size);
00187     // generate data
00188     ByteArray &a = j.serialize();
00189     Dbt data(a.data(), a.size());
00190     
00191     DbTxn *tid = NULL;
00192     while (true) {
00193         try {
00194             env_->txn_begin(NULL, &tid, 0);
00195             db_->put(tid, &key, &data, 0);
00196             tid->commit(0);
00197             return;
00198         } catch (DbDeadlockException &de) {
00199             try {
00200                 tid->abort();
00201             } catch (DbException &e) {
00202                 logger_.msg(Arc::ERROR, "refresh: Cannot abort transaction: %s", e.what());
00203                 return;
00204             }
00205         } catch (DbException &e) {
00206             logger_.msg(Arc::ERROR, "refresh: Error during transaction: %s", e.what());
00207             tid->abort();
00208             return;
00209         }
00210     }
00211 }
00212 
00213 Job *JobQueue::operator[](const std::string &id)
00214 {
00215     void *buf = (void *)id.c_str();
00216     int size = id.size() + 1;
00217     Dbt key(buf, size);
00218     Dbt data;
00219     data.set_flags(DB_DBT_MALLOC);
00220     DbTxn *tid = NULL;
00221     while (true) {
00222         try {
00223             env_->txn_begin(NULL, &tid, 0);
00224             if (db_->get(tid, &key, &data, 0) != DB_NOTFOUND) {
00225                 ByteArray a(data.get_data(), data.get_size());
00226                 free(data.get_data());
00227                 Job *j = new Job(a);
00228                 tid->commit(0);
00229                 return j;
00230             } else {
00231                 tid->commit(0);
00232                 throw JobNotFoundException();
00233             }
00234         } catch (DbDeadlockException &e) {
00235             try {
00236                 tid->abort();
00237             } catch (DbException &e) {
00238                 logger_.msg(Arc::ERROR, "operator[]: Cannot abort transaction: %s", e.what()); 
00239             }
00240             return NULL;
00241         } catch (DbException &e) {
00242             try {
00243                 tid->abort();
00244             } catch (DbException &e) {
00245                 logger_.msg(Arc::ERROR, "operator[]: Cannot abort transaction: %s", e.what()); 
00246             }
00247             return NULL;
00248         }
00249     }
00250 }
00251 
00252 void JobQueue::remove(Job &j)
00253 {
00254     remove(j.getID());
00255 }
00256 
00257 void JobQueue::remove(const std::string &id)
00258 {
00259     DbTxn *tid = NULL;
00260     void *buf = (void *)id.c_str();
00261     int size = id.size() + 1;
00262     Dbt key(buf, size);
00263     while (true) { 
00264         try {
00265             env_->txn_begin(NULL, &tid, 0);
00266             db_->del(tid, &key, 0);
00267             tid->commit(0);
00268         } catch (DbDeadlockException &e) {
00269             try {
00270                 tid->abort();
00271             } catch (DbException &e) {
00272                 logger_.msg(Arc::ERROR, "remove: Cannot abort transaction: %s", e.what()); 
00273             }
00274             return;
00275         } catch (DbException &e) {
00276             try {
00277                 tid->abort();
00278             } catch (DbException &e) {
00279                 logger_.msg(Arc::ERROR, "remove: Cannot abort transaction: %s", e.what()); 
00280             }
00281             return;
00282         }
00283     }
00284 }
00285 
00286 JobQueueIterator JobQueue::getAll(void) 
00287 {
00288     Dbc *cursor;
00289     DbTxn *tid = NULL;
00290     try {
00291         env_->txn_begin(NULL, &tid, 0);
00292     } catch (std::exception &e) {
00293         return JobQueueIterator();
00294     }
00295     try {
00296         db_->cursor(tid, &cursor, 0);
00297         return JobQueueIterator(tid, cursor);
00298     } catch (DbException &e) {
00299         tid->abort();
00300         return JobQueueIterator();
00301     }
00302 }
00303 
00304 JobQueueIterator JobQueue::getAll(JobSelector *selector_) 
00305 {
00306     Dbc *cursor;
00307     DbTxn *tid = NULL;
00308     try {
00309         env_->txn_begin(NULL, &tid, 0);
00310     } catch (std::exception &e) {
00311         return JobQueueIterator();
00312     }
00313     try {
00314         db_->cursor(tid, &cursor, 0);
00315         return JobQueueIterator(tid, cursor, selector_);
00316     } catch (DbException &e) {
00317         tid->abort();
00318         return JobQueueIterator();
00319     }
00320 }
00321 
00322 void JobQueue::sync(void)
00323 {
00324     // db_->sync(0);
00325 }
00326 
00327 void JobQueue::checkpoint(void)
00328 {
00329     try {
00330         env_->txn_checkpoint(0, 0, 0);
00331     } catch(DbException &e) {
00332         logger_.msg(Arc::ERROR, "checkpoint: %s", e.what());
00333     }
00334 }
00335 
00336 } // namespace