Back to index

nordugrid-arc-nox  1.1.0~rc6
DataBuffer.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <cstdlib>
00008 
00009 #include <arc/data/CheckSum.h>
00010 #include <arc/data/DataBuffer.h>
00011 
00012 namespace Arc {
00013 
00014   bool DataBuffer::set(CheckSum *cksum, unsigned int size, int blocks) {
00015     lock.lock();
00016     if (blocks < 0) {
00017       lock.unlock();
00018       return false;
00019     }
00020     if (bufs != NULL) {
00021       for (int i = 0; i < bufs_n; i++)
00022         if (bufs[i].start)
00023           free(bufs[i].start);
00024       free(bufs);
00025       bufs_n = 0;
00026       bufs = NULL;
00027       set_counter++;
00028       cond.broadcast(); /* make all waiting loops to exit */
00029     }
00030     if ((size == 0) || (blocks == 0)) {
00031       lock.unlock();
00032       return true;
00033     }
00034     bufs = (buf_desc*)malloc(sizeof(buf_desc) * blocks);
00035     if (bufs == NULL) {
00036       lock.unlock();
00037       return false;
00038     }
00039     bufs_n = blocks;
00040     for (int i = 0; i < blocks; i++) {
00041       bufs[i].start = NULL;
00042       bufs[i].taken_for_read = false;
00043       bufs[i].taken_for_write = false;
00044       bufs[i].size = size;
00045       bufs[i].used = 0;
00046       bufs[i].offset = 0;
00047     }
00048     //checksum = cksum;
00049     checksums.clear();
00050     checksums.push_back(checksum_desc(cksum));
00051     if (cksum)
00052       cksum->start();
00053     lock.unlock();
00054     return true;
00055   }
00056 
00057   int DataBuffer::add(CheckSum *cksum) {
00058     if (!cksum)
00059       return -1;
00060     lock.lock();
00061     checksum_desc cs = cksum;
00062     cs.sum->start();
00063     for (int i = 0; i < bufs_n; i++)
00064       if (bufs[i].used != 0)
00065         if (bufs[i].offset == cs.offset) {
00066           cs.sum->add(bufs[i].start, bufs[i].used);
00067           cs.offset += bufs[i].used;
00068           i = -1;
00069           cs.ready = true;
00070         }
00071         else if (cs.offset < bufs[i].offset)
00072           cs.ready = false;
00073     if (eof_read_flag && cs.ready)
00074       cs.sum->end();
00075     checksums.push_back(cs);
00076     int res = checksums.size() - 1;
00077     lock.unlock();
00078     return res;
00079   }
00080 
00081   DataBuffer::DataBuffer(unsigned int size, int blocks) {
00082     bufs_n = 0;
00083     bufs = NULL;
00084     set_counter = 0;
00085     eof_read_flag = false;
00086     eof_write_flag = false;
00087     error_read_flag = false;
00088     error_write_flag = false;
00089     error_transfer_flag = false;
00090     set(NULL, size, blocks);
00091     eof_pos = 0;
00092   }
00093 
00094   DataBuffer::DataBuffer(CheckSum *cksum, unsigned int size,
00095                          int blocks) {
00096     bufs_n = 0;
00097     bufs = NULL;
00098     set_counter = 0;
00099     eof_read_flag = false;
00100     eof_write_flag = false;
00101     error_read_flag = false;
00102     error_write_flag = false;
00103     error_transfer_flag = false;
00104     set(cksum, size, blocks);
00105     eof_pos = 0;
00106   }
00107 
  // Release all block memory by reconfiguring the buffer to zero blocks of
  // zero size.
  DataBuffer::~DataBuffer() {
    set(NULL, 0, 0);
  }
00111 
  // Return the current value of eof_read_flag (read without taking `lock`).
  bool DataBuffer::eof_read() {
    return eof_read_flag;
  }
00115 
  // Return the current value of eof_write_flag (read without taking `lock`).
  bool DataBuffer::eof_write() {
    return eof_write_flag;
  }
00119 
  // Return the current value of error_transfer_flag (read without taking
  // `lock`).
  bool DataBuffer::error_transfer() {
    return error_transfer_flag;
  }
00123 
  // Return the current value of error_read_flag (read without taking `lock`).
  bool DataBuffer::error_read() {
    return error_read_flag;
  }
00127 
  // Return the current value of error_write_flag (read without taking
  // `lock`).
  bool DataBuffer::error_write() {
    return error_write_flag;
  }
00131 
00132   void DataBuffer::eof_read(bool eof_) {
00133     lock.lock();
00134     if (eof_)
00135       for (std::list<checksum_desc>::iterator itCheckSum = checksums.begin();
00136            itCheckSum != checksums.end(); itCheckSum++)
00137         if (itCheckSum->sum)
00138           itCheckSum->sum->end();
00139     eof_read_flag = eof_;
00140     cond.broadcast();
00141     lock.unlock();
00142   }
00143 
  // Store `eof_` in eof_write_flag under `lock` and wake all threads waiting
  // on `cond` so that they re-evaluate their conditions.
  void DataBuffer::eof_write(bool eof_) {
    lock.lock();
    eof_write_flag = eof_;
    cond.broadcast();
    lock.unlock();
  }
00150 
00151   bool DataBuffer::error() {
00152     return (error_read_flag || error_write_flag || error_transfer_flag);
00153   }
00154 
00155   void DataBuffer::error_read(bool error_) {
00156     lock.lock();
00157     // error_read_flag=error_;
00158     if (error_) {
00159       if (!(error_write_flag || error_transfer_flag))
00160         error_read_flag = true;
00161       for (std::list<checksum_desc>::iterator itCheckSum = checksums.begin();
00162            itCheckSum != checksums.end(); itCheckSum++)
00163         if (itCheckSum->sum)
00164           itCheckSum->sum->end();
00165       eof_read_flag = true;
00166     }
00167     else
00168       error_read_flag = false;
00169     cond.broadcast();
00170     lock.unlock();
00171   }
00172 
00173   void DataBuffer::error_write(bool error_) {
00174     lock.lock();
00175     // error_write_flag=error_;
00176     if (error_) {
00177       if (!(error_read_flag || error_transfer_flag))
00178         error_write_flag = true;
00179       eof_write_flag = true;
00180     }
00181     else
00182       error_write_flag = false;
00183     cond.broadcast();
00184     lock.unlock();
00185   }
00186 
00187   bool DataBuffer::wait_eof_read() {
00188     lock.lock();
00189     for (;;) {
00190       if (eof_read_flag)
00191         break;
00192       cond.wait(lock);
00193     }
00194     lock.unlock();
00195     return true;
00196   }
00197 
00198   bool DataBuffer::wait_read() {
00199     lock.lock();
00200     for (;;) {
00201       if (eof_read_flag)
00202         break;
00203       if (error_read_flag)
00204         break;
00205       cond.wait(lock);
00206     }
00207     lock.unlock();
00208     return true;
00209   }
00210 
00211   bool DataBuffer::wait_eof_write() {
00212     lock.lock();
00213     for (;;) {
00214       if (eof_write_flag)
00215         break;
00216       cond.wait(lock);
00217     }
00218     lock.unlock();
00219     return true;
00220   }
00221 
00222   bool DataBuffer::wait_write() {
00223     lock.lock();
00224     for (;;) {
00225       if (eof_write_flag)
00226         break;
00227       if (error_write_flag)
00228         break;
00229       cond.wait(lock);
00230     }
00231     lock.unlock();
00232     return true;
00233   }
00234 
00235   bool DataBuffer::wait_eof() {
00236     lock.lock();
00237     for (;;) {
00238       if (eof_read_flag && eof_write_flag)
00239         break;
00240       cond.wait(lock);
00241     }
00242     lock.unlock();
00243     return true;
00244   }
00245 
  // Wait for a change in the buffer state.
  // `lock` is expected to be held by the caller: it is passed to
  // cond.timed_wait() and is released/re-acquired around the yield below.
  // On every pass the checks are made in this order:
  //   - both EOF flags set           -> return true
  //   - an EOF flag changed          -> return true
  //   - any error flag set           -> return false
  //   - set_counter changed          -> return false
  //   - previous timed_wait() returned true -> return true
  // otherwise the function waits again.
  bool DataBuffer::cond_wait() {
    // Wait for any event
    // Snapshot the state so that changes made while waiting can be detected.
    int tmp = set_counter;
    bool eof_read_flag_tmp = eof_read_flag;
    bool eof_write_flag_tmp = eof_write_flag;
    // cond.wait(lock);
    // Holds the result of the most recent cond.timed_wait() call.
    // NOTE(review): with glibmm a true result presumably means "woken before
    // the timeout" rather than an error, despite the name - confirm.
    bool err = false;
    for (;;) {
      // Raise the transfer error flag when speed.transfer() reports failure,
      // unless a read/write error is already set or both sides reached EOF.
      if (!speed.transfer())
        if ((!(error_read_flag || error_write_flag)) &&
            (!(eof_read_flag && eof_write_flag)))
          error_transfer_flag = true;
      if (eof_read_flag && eof_write_flag) { // there will be no more events
        // Briefly release the lock and yield so that other threads can run.
        lock.unlock();
        Glib::Thread::yield();
        lock.lock();
        return true;
      }
      if (eof_read_flag_tmp != eof_read_flag)
        return true;
      if (eof_write_flag_tmp != eof_write_flag)
        return true;
      if (error())
        return false; // useless to wait for - better fail
      // The buffer was reconfigured by set() while waiting.
      if (set_counter != tmp)
        return false;
      if (err)
        break; // Some event
      // Timeout added to the current time; presumably in seconds - confirm
      // against Glib::TimeVal.
      int t = 60;
      Glib::TimeVal stime;
      stime.assign_current_time();
      // Using timeout to workaround lost signal
      err = cond.timed_wait(lock, stime + t);
    }
    return true;
  }
00282 
00283   bool DataBuffer::for_read() {
00284     if (bufs == NULL)
00285       return false;
00286     lock.lock();
00287     for (int i = 0; i < bufs_n; i++)
00288       if ((!bufs[i].taken_for_read) && (!bufs[i].taken_for_write) &&
00289           (bufs[i].used == 0)) {
00290         lock.unlock();
00291         return true;
00292       }
00293     lock.unlock();
00294     return false;
00295   }
00296 
00297   bool DataBuffer::for_read(int& handle, unsigned int& length, bool wait) {
00298     lock.lock();
00299     if (bufs == NULL) {
00300       lock.unlock();
00301       return false;
00302     }
00303     for (;;) {
00304       if (error()) { /* errors detected/set - any continuation is unusable */
00305         lock.unlock();
00306         return false;
00307       }
00308       for (int i = 0; i < bufs_n; i++)
00309         if ((!bufs[i].taken_for_read) && (!bufs[i].taken_for_write) &&
00310             (bufs[i].used == 0)) {
00311           if (bufs[i].start == NULL) {
00312             bufs[i].start = (char*)malloc(bufs[i].size);
00313             if (bufs[i].start == NULL)
00314               continue;
00315           }
00316           handle = i;
00317           bufs[i].taken_for_read = true;
00318           length = bufs[i].size;
00319           cond.broadcast();
00320           lock.unlock();
00321           return true;
00322         }
00323       /* suitable block not found - wait for changes or quit */
00324       if (eof_write_flag) { /* writing side quited, no need to wait */
00325         lock.unlock();
00326         return false;
00327       }
00328       if (!wait) {
00329         lock.unlock();
00330         return false;
00331       }
00332       if (!cond_wait()) {
00333         lock.unlock();
00334         return false;
00335       }
00336     }
00337     lock.unlock();
00338     return false;
00339   }
00340 
00341   bool DataBuffer::is_read(char *buf, unsigned int length,
00342                            unsigned long long int offset) {
00343     lock.lock();
00344     for (int i = 0; i < bufs_n; i++)
00345       if (bufs[i].start == buf) {
00346         lock.unlock();
00347         return is_read(i, length, offset);
00348       }
00349     lock.unlock();
00350     return false;
00351   }
00352 
00353   bool DataBuffer::is_read(int handle, unsigned int length,
00354                            unsigned long long int offset) {
00355     lock.lock();
00356     if (bufs == NULL) {
00357       lock.unlock();
00358       return false;
00359     }
00360     if (handle >= bufs_n) {
00361       lock.unlock();
00362       return false;
00363     }
00364     if (!bufs[handle].taken_for_read) {
00365       lock.unlock();
00366       return false;
00367     }
00368     if (length > bufs[handle].size) {
00369       lock.unlock();
00370       return false;
00371     }
00372     bufs[handle].taken_for_read = false;
00373     bufs[handle].used = length;
00374     bufs[handle].offset = offset;
00375     if ((offset + length) > eof_pos)
00376       eof_pos = offset + length;
00377     /* checksum on the fly */
00378     for (std::list<checksum_desc>::iterator itCheckSum = checksums.begin();
00379          itCheckSum != checksums.end(); itCheckSum++)
00380       if ((itCheckSum->sum != NULL) && (offset == itCheckSum->offset))
00381         for (int i = handle; i < bufs_n; i++)
00382           if (bufs[i].used != 0)
00383             if (bufs[i].offset == itCheckSum->offset) {
00384               itCheckSum->sum->add(bufs[i].start, bufs[i].used);
00385               itCheckSum->offset += bufs[i].used;
00386               i = -1;
00387               itCheckSum->ready = true;
00388             }
00389             else if (itCheckSum->offset < bufs[i].offset)
00390               itCheckSum->ready = false;
00391     cond.broadcast();
00392     lock.unlock();
00393     return true;
00394   }
00395 
00396   bool DataBuffer::for_write() {
00397     if (bufs == NULL)
00398       return false;
00399     lock.lock();
00400     for (int i = 0; i < bufs_n; i++)
00401       if ((!bufs[i].taken_for_read) && (!bufs[i].taken_for_write) &&
00402           (bufs[i].used != 0)) {
00403         lock.unlock();
00404         return true;
00405       }
00406     lock.unlock();
00407     return false;
00408   }
00409 
  /* return true + buffer with data,
     return false in case of failure, or eof + no buffers claimed for read */
  // Claim the filled block with the smallest offset for writing out.
  // On success `handle`, `length` and `offset` describe the block, which is
  // marked as taken for writing. Note that `handle` is overwritten on every
  // scan, so its value is not preserved when false is returned.
  // If `wait` is false the function never blocks and returns false instead
  // of waiting.
  bool DataBuffer::for_write(int& handle, unsigned int& length,
                             unsigned long long int& offset, bool wait) {
    lock.lock();
    if (bufs == NULL) {
      lock.unlock();
      return false;
    }
    for (;;) {
      if (error()) { /* internal/external errors - no need to continue */
        lock.unlock();
        return false;
      }
      // have_for_read: some block is currently being filled.
      // have_unused:   some block is being filled or is empty, i.e. more
      //                data may still show up in the buffer.
      bool have_for_read = false;
      bool have_unused = false;
      unsigned long long int min_offset = (unsigned long long int)(-1);
      handle = -1;
      // Pick the untaken block holding data with the lowest offset.
      for (int i = 0; i < bufs_n; i++) {
        if (bufs[i].taken_for_read)
          have_for_read = true;
        if ((!bufs[i].taken_for_read) && (!bufs[i].taken_for_write) &&
            (bufs[i].used != 0))
          if (bufs[i].offset < min_offset) {
            min_offset = bufs[i].offset;
            handle = i;
          }
        if (bufs[i].taken_for_read || (bufs[i].used == 0))
          have_unused = true;
      }
      if (handle != -1) {
        // Hold the block back if some checksum is not marked ready and has
        // not yet advanced past the start of this block.
        bool keep_buffers = false;
        for (std::list<checksum_desc>::iterator itCheckSum = checksums.begin();
             itCheckSum != checksums.end(); itCheckSum++)
          if ((!itCheckSum->ready) && (bufs[handle].offset >= itCheckSum->offset)) {
            keep_buffers = true;
            break;
          }

        if (keep_buffers)
          /* try to keep buffers as long as possible for checksuming */
          if (have_unused && (!eof_read_flag)) {
            /* still have chances to get that block */
            if (!wait) {
              lock.unlock();
              return false;
            }
            if (!cond_wait()) {
              lock.unlock();
              return false;
            }
            // State may have changed while waiting - rescan from scratch.
            continue;
          }

        bufs[handle].taken_for_write = true;
        length = bufs[handle].used;
        offset = bufs[handle].offset;
        cond.broadcast();
        lock.unlock();
        return true;
      }
      // No data to hand out; give up if reading has ended and no block is
      // still being filled.
      if (eof_read_flag && (!have_for_read)) {
        lock.unlock();
        return false;
      }
      /* suitable block not found - wait for changes or quit */
      if (!wait) {
        lock.unlock();
        return false;
      }
      if (!cond_wait()) {
        lock.unlock();
        return false;
      }
    }
    // Not reached: the loop above is left only through return statements.
    lock.unlock();
    return false;
  }
00488 
00489   bool DataBuffer::is_written(char *buf) {
00490     lock.lock();
00491     for (int i = 0; i < bufs_n; i++)
00492       if (bufs[i].start == buf) {
00493         lock.unlock();
00494         return is_written(i);
00495       }
00496     lock.unlock();
00497     return false;
00498   }
00499 
00500   bool DataBuffer::is_notwritten(char *buf) {
00501     lock.lock();
00502     for (int i = 0; i < bufs_n; i++)
00503       if (bufs[i].start == buf) {
00504         lock.unlock();
00505         return is_notwritten(i);
00506       }
00507     lock.unlock();
00508     return false;
00509   }
00510 
00511   bool DataBuffer::is_written(int handle) {
00512     lock.lock();
00513     if (bufs == NULL) {
00514       lock.unlock();
00515       return false;
00516     }
00517     if (handle >= bufs_n) {
00518       lock.unlock();
00519       return false;
00520     }
00521     if (!bufs[handle].taken_for_write) {
00522       lock.unlock();
00523       return false;
00524     }
00525     /* speed control */
00526     if (!speed.transfer(bufs[handle].used))
00527       if ((!(error_read_flag || error_write_flag)) &&
00528           (!(eof_read_flag && eof_write_flag)))
00529         error_transfer_flag = true;
00530     bufs[handle].taken_for_write = false;
00531     bufs[handle].used = 0;
00532     bufs[handle].offset = 0;
00533     cond.broadcast();
00534     lock.unlock();
00535     return true;
00536   }
00537 
00538   bool DataBuffer::is_notwritten(int handle) {
00539     lock.lock();
00540     if (bufs == NULL) {
00541       lock.unlock();
00542       return false;
00543     }
00544     if (handle >= bufs_n) {
00545       lock.unlock();
00546       return false;
00547     }
00548     if (!bufs[handle].taken_for_write) {
00549       lock.unlock();
00550       return false;
00551     }
00552     bufs[handle].taken_for_write = false;
00553     cond.broadcast();
00554     lock.unlock();
00555     return true;
00556   }
00557 
00558   char* DataBuffer::operator[](int block) {
00559     lock.lock();
00560     if ((block < 0) || (block >= bufs_n)) {
00561       lock.unlock();
00562       return NULL;
00563     }
00564     char *tmp = bufs[block].start;
00565     lock.unlock();
00566     return tmp;
00567   }
00568 
  // Take `lock`, run cond_wait() once and return its result.
  bool DataBuffer::wait_any() {
    lock.lock();
    bool res = cond_wait();
    lock.unlock();
    return res;
  }
00575 
00576   bool DataBuffer::wait_used() {
00577     lock.lock();
00578     for (int i = 0; i < bufs_n; i++)
00579       if ((bufs[i].taken_for_read) || (bufs[i].taken_for_write) ||
00580           (bufs[i].used != 0)) {
00581         if (!cond_wait()) {
00582           lock.unlock();
00583           return false;
00584         }
00585         i = -1;
00586       }
00587     lock.unlock();
00588     return true;
00589   }
00590 
00591   bool DataBuffer::checksum_valid() const {
00592     if (checksums.size() != 0)
00593       return checksums.begin()->ready;
00594     else
00595       return false;
00596   }
00597 
00598   bool DataBuffer::checksum_valid(int index) const {
00599     if (index < 0)
00600       return false;
00601     int i = 0;
00602     for (std::list<checksum_desc>::const_iterator itCheckSum = checksums.begin();
00603          itCheckSum != checksums.end(); itCheckSum++) {
00604       if (index == i)
00605         return itCheckSum->ready;
00606       i++;
00607     }
00608 
00609     return false;
00610   }
00611 
00612   const CheckSum* DataBuffer::checksum_object() const {
00613     if (checksums.size() != 0)
00614       return checksums.begin()->sum;
00615     else
00616       return NULL;
00617   }
00618 
00619   const CheckSum* DataBuffer::checksum_object(int index) const {
00620     if (index < 0)
00621       return NULL;
00622     int i = 0;
00623     for (std::list<checksum_desc>::const_iterator itCheckSum = checksums.begin();
00624          itCheckSum != checksums.end(); itCheckSum++) {
00625       if (index == i)
00626         return itCheckSum->sum;
00627       i++;
00628     }
00629 
00630     return NULL;
00631   }
00632 
00633   unsigned int DataBuffer::buffer_size() const {
00634     if (bufs == NULL)
00635       return 65536;
00636     unsigned int size = 0;
00637     for (int i = 0; i < bufs_n; i++)
00638       if (size < bufs[i].size)
00639         size = bufs[i].size;
00640     return size;
00641   }
00642 
00643 } // namespace Arc