Back to index

nordugrid-arc-nox  1.1.0~rc6
DataPointHTTP.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #define __STDC_LIMIT_MACROS
00008 #ifdef HAVE_STDINT_H
00009 #include <stdint.h>
00010 #endif
00011 #include <unistd.h>
00012 
00013 #include <arc/Logger.h>
00014 #include <arc/StringConv.h>
00015 #include <arc/UserConfig.h>
00016 #include <arc/data/DataBuffer.h>
00017 #include <arc/message/MCC.h>
00018 #include <arc/message/PayloadRaw.h>
00019 #include <arc/client/ClientInterface.h>
00020 
00021 #ifdef WIN32
00022 #include <arc/win32.h>
00023 #endif
00024 
00025 #include "DataPointHTTP.h"
00026 
00027 namespace Arc {
00028 
00029   Logger DataPointHTTP::logger(DataPoint::logger, "HTTP");
00030 
00031   typedef struct {
00032     DataPointHTTP *point;
00033     ClientHTTP *client;
00034   } HTTPInfo_t;
00035 
00036   class ChunkControl {
00037   private:
00038     typedef struct {
00039       uint64_t start;
00040       uint64_t end;
00041     } chunk_t;
00042     std::list<chunk_t> chunks_;
00043     Glib::Mutex lock_;
00044   public:
00045     ChunkControl(uint64_t size = UINT64_MAX);
00046     ~ChunkControl();
00047     // Get chunk to be transfered. On input 'length'
00048     // contains maximal acceptable chunk size.
00049     bool Get(uint64_t& start, uint64_t& length);
00050     // Report chunk transfered. It may be _different_
00051     // from one obtained through Get().
00052     void Claim(uint64_t start, uint64_t length);
00053     // Report chunk not transfered. It must be
00054     // _same_ as one obtained by Get().
00055     void Unclaim(uint64_t start, uint64_t length);
00056   };
00057 
00058   class PayloadMemConst
00059     : public PayloadRawInterface {
00060   private:
00061     char *buffer_;
00062     uint64_t begin_;
00063     uint64_t end_;
00064     uint64_t size_;
00065   public:
00066     PayloadMemConst(void *buffer,
00067                     uint64_t offset,
00068                     unsigned int length,
00069                     uint64_t size = 0)
00070       : buffer_((char*)buffer),
00071         begin_(offset),
00072         end_(offset + length),
00073         size_(size) {}
00074     virtual ~PayloadMemConst() {}
00075     virtual char operator[](Size_t pos) const {
00076       if (!buffer_)
00077         return 0;
00078       if (pos < begin_)
00079         return 0;
00080       if (pos >= end_)
00081         return 0;
00082       return buffer_[pos - begin_];
00083     }
00084     virtual char* Content(Size_t pos = -1) {
00085       if (!buffer_)
00086         return NULL;
00087       if (pos < begin_)
00088         return NULL;
00089       if (pos >= end_)
00090         return NULL;
00091       return
00092         buffer_ + (pos - begin_);
00093     }
00094     virtual Size_t Size() const {
00095       return size_;
00096     }
00097     virtual char* Insert(Size_t /* pos */ = 0, Size_t /* size */ = 0) {
00098       return NULL;
00099     }
00100     virtual char* Insert(const char* /* s */,
00101                          Size_t /* pos */ = 0, Size_t /* size */ = 0) {
00102       return NULL;
00103     }
00104     virtual char* Buffer(unsigned int num) {
00105       if (num != 0)
00106         return NULL;
00107       return buffer_;
00108     }
00109     virtual Size_t BufferSize(unsigned int num) const {
00110       if (!buffer_)
00111         return 0;
00112       if (num != 0)
00113         return 0;
00114       return
00115         end_ - begin_;
00116     }
00117     virtual Size_t BufferPos(unsigned int num) const {
00118       if (!buffer_)
00119         return 0;
00120       if (num != 0)
00121         return 0;
00122       return begin_;
00123     }
00124     virtual bool Truncate(Size_t /* size */) {
00125       return false;
00126     }
00127   };
00128 
00129   ChunkControl::ChunkControl(uint64_t /* size */) {
00130     chunk_t chunk = {
00131       0, UINT64_MAX
00132     };
00133     chunks_.push_back(chunk);
00134   }
00135 
00136   ChunkControl::~ChunkControl() {}
00137 
00138   bool ChunkControl::Get(uint64_t& start, uint64_t& length) {
00139     if (length == 0)
00140       return false;
00141     lock_.lock();
00142     std::list<chunk_t>::iterator c = chunks_.begin();
00143     if (c == chunks_.end()) {
00144       lock_.unlock();
00145       return false;
00146     }
00147     start = c->start;
00148     uint64_t l = (c->end) - (c->start);
00149     if (l <= length) {
00150       length = l;
00151       chunks_.erase(c);
00152     }
00153     else
00154       c->start += length;
00155     lock_.unlock();
00156     return true;
00157   }
00158 
00159   void ChunkControl::Claim(uint64_t start, uint64_t length) {
00160     if (length == 0)
00161       return;
00162     uint64_t end = start + length;
00163     lock_.lock();
00164     for (std::list<chunk_t>::iterator c = chunks_.begin();
00165          c != chunks_.end();) {
00166       if (end <= c->start)
00167         break;
00168       if ((start <= c->start) && (end >= c->end)) {
00169         start = c->end;
00170         length = end - start;
00171         c = chunks_.erase(c);
00172         if (length > 0)
00173           continue;
00174         break;
00175       }
00176       if ((start > c->start) && (end < c->end)) {
00177         chunk_t chunk;
00178         chunk.start = c->start;
00179         chunk.end = start;
00180         c->start = end;
00181         chunks_.insert(c, chunk);
00182         break;
00183       }
00184       if ((start <= c->start) && (end < c->end) && (end > c->start)) {
00185         c->start = end;
00186         break;
00187       }
00188       if ((start > c->start) && (start < c->end) && (end >= c->end)) {
00189         uint64_t start_ = c->end;
00190         c->end = start;
00191         start = start_;
00192         length = end - start;
00193         ++c;
00194         if (length > 0)
00195           continue;
00196         break;
00197       }
00198       ++c;
00199     }
00200     lock_.unlock();
00201   }
00202 
00203   void ChunkControl::Unclaim(uint64_t start, uint64_t length) {
00204     if (length == 0)
00205       return;
00206     uint64_t end = start + length;
00207     lock_.lock();
00208     for (std::list<chunk_t>::iterator c = chunks_.begin();
00209          c != chunks_.end(); ++c) {
00210       if ((end >= c->start) && (end <= c->end)) {
00211         if (start < c->start)
00212           c->start = start;
00213         lock_.unlock();
00214         return;
00215       }
00216       if ((start <= c->end) && (start >= c->start)) {
00217         if (end > c->end) {
00218           c->end = end;
00219           std::list<chunk_t>::iterator c_ = c;
00220           ++c_;
00221           while (c_ != chunks_.end())
00222             if (c->end >= c_->start) {
00223               if (c_->end >= c->end) {
00224                 c->end = c_->end;
00225                 break;
00226               }
00227               c_ = chunks_.erase(c_);
00228             }
00229             else
00230               break;
00231         }
00232         lock_.unlock();
00233         return;
00234       }
00235       if ((start <= c->start) && (end >= c->end)) {
00236         c->start = start;
00237         if (end > c->end) {
00238           c->end = end;
00239           std::list<chunk_t>::iterator c_ = c;
00240           ++c_;
00241           while (c_ != chunks_.end())
00242             if (c->end >= c_->start) {
00243               if (c_->end >= c->end) {
00244                 c->end = c_->end;
00245                 break;
00246               }
00247               c_ = chunks_.erase(c_);
00248             }
00249             else
00250               break;
00251         }
00252         lock_.unlock();
00253         return;
00254       }
00255       if (end < c->start) {
00256         chunk_t chunk = {
00257           start, end
00258         };
00259         chunks_.insert(c, chunk);
00260         lock_.unlock();
00261         return;
00262       }
00263     }
00264     lock_.unlock();
00265   }
00266 
00267   DataPointHTTP::DataPointHTTP(const URL& url, const UserConfig& usercfg)
00268     : DataPointDirect(url, usercfg),
00269       chunks(NULL),
00270       transfers_started(0),
00271       transfers_finished(0) {
00272     valid_url_options.push_back("tcpnodelay");
00273   }
00274 
00275   DataPointHTTP::~DataPointHTTP() {
00276     StopReading();
00277     StopWriting();
00278     if (chunks)
00279       delete chunks;
00280   }
00281 
00282   Plugin* DataPointHTTP::Instance(PluginArgument *arg) {
00283     DataPointPluginArgument *dmcarg = dynamic_cast<DataPointPluginArgument*>(arg);
00284     if (!dmcarg)
00285       return NULL;
00286     if (((const URL &)(*dmcarg)).Protocol() != "http" &&
00287         ((const URL &)(*dmcarg)).Protocol() != "https" &&
00288         ((const URL &)(*dmcarg)).Protocol() != "httpg")
00289       return NULL;
00290     return new DataPointHTTP(*dmcarg, *dmcarg);
00291   }
00292 
00293   static bool html2list(const char *html, const URL& base,
00294                         std::list<FileInfo>& files) {
00295     for (const char *pos = html;;) {
00296       // Looking for tag
00297       const char *tag_start = strchr(pos, '<');
00298       if (!tag_start)
00299         break;              // No more tags
00300       // Looking for end of tag
00301       const char *tag_end = strchr(tag_start + 1, '>');
00302       if (!tag_end)
00303         return false;            // Broken html?
00304       // 'A' tag?
00305       if (strncasecmp(tag_start, "<A ", 3) == 0) {
00306         // Lookig for HREF
00307         const char *href = strstr(tag_start + 3, "href=");
00308         if (!href)
00309           href = strstr(tag_start + 3, "HREF=");
00310         if (href) {
00311           const char *url_start = href + 5;
00312           const char *url_end = NULL;
00313           if ((*url_start) == '"') {
00314             ++url_start;
00315             url_end = strchr(url_start, '"');
00316             if ((!url_end) || (url_end > tag_end))
00317               url_end = NULL;
00318           }
00319           else if ((*url_start) == '\'') {
00320             ++url_start;
00321             url_end = strchr(url_start, '\'');
00322             if ((!url_end) || (url_end > tag_end))
00323               url_end = NULL;
00324           }
00325           else {
00326             url_end = strchr(url_start, ' ');
00327             if ((!url_end) || (url_end > tag_end))
00328               url_end = tag_end;
00329           }
00330           if (!url_end)
00331             return false; // Broken HTML
00332           std::string url(url_start, url_end - url_start);
00333           if (url.find("://") != std::string::npos) {
00334             URL u(url);
00335             std::string b = base.str();
00336             if (b[b.size() - 1] != '/')
00337               b += '/';
00338             if (u.str().substr(0, b.size()) == b)
00339               url = u.str().substr(b.size());
00340           }
00341           if (url[0] != '?' && url[0] != '/')
00342             if (url.find('/') == url.size() - 1) {
00343               std::list<FileInfo>::iterator f = files.insert(files.end(), url);
00344               f->SetType(FileInfo::file_type_dir);
00345             }
00346             else if (url.find('/') == std::string::npos) {
00347               std::list<FileInfo>::iterator f = files.insert(files.end(), url);
00348               f->SetType(FileInfo::file_type_file);
00349             }
00350         }
00351       }
00352       pos = tag_end + 1;
00353     }
00354     return true;
00355   }
00356 
00357   DataStatus DataPointHTTP::ListFiles(std::list<FileInfo>& files,
00358                                       bool long_list, bool resolve,
00359                                       bool metadata) {
00360     MCCConfig cfg;
00361     usercfg.ApplyToConfig(cfg);
00362     ClientHTTP client(cfg, url, usercfg.Timeout());
00363 
00364     PayloadRaw request;
00365     PayloadRawInterface *response = NULL;
00366     HTTPClientInfo info;
00367     MCC_Status status = client.process("HEAD", &request, &info, &response);
00368     if (!response)
00369       return DataStatus::ListError;
00370     delete response;
00371     if (!status)
00372       return DataStatus::ListError;
00373 
00374     std::string type = info.type;
00375     std::string::size_type pos = type.find(';');
00376     if (pos != std::string::npos)
00377       type = type.substr(0, pos);
00378 
00379     if (type.empty() || (strcasecmp(type.c_str(), "text/html") == 0)) {
00380 
00381       DataBuffer buffer;
00382 
00383       if (!StartReading(buffer))
00384         return DataStatus::ListError;
00385 
00386       int handle;
00387       unsigned int length;
00388       unsigned long long int offset;
00389       std::string result;
00390 
00391       while (buffer.for_write() || !buffer.eof_read())
00392         if (buffer.for_write(handle, length, offset, true)) {
00393           result.append(buffer[handle], length);
00394           buffer.is_written(handle);
00395         }
00396 
00397       if (!StopReading())
00398         return DataStatus::ListError;
00399 
00400       bool is_html = false;
00401       bool is_body = false;
00402       std::string::size_type tagstart = 0;
00403       std::string::size_type tagend = 0;
00404       std::string::size_type titlestart = std::string::npos;
00405       std::string::size_type titleend = std::string::npos;
00406       do {
00407         tagstart = result.find('<', tagend);
00408         if (tagstart == std::string::npos)
00409           break;
00410         tagend = result.find('>', tagstart);
00411         if (tagend == std::string::npos)
00412           break;
00413         std::string tag = result.substr(tagstart + 1, tagend - tagstart - 1);
00414         if (strcasecmp(tag.c_str(), "title") == 0)
00415           titlestart = tagend + 1;
00416         else if (strcasecmp(tag.c_str(), "/title") == 0)
00417           titleend = tagstart - 1;
00418         else if (strcasecmp(tag.c_str(), "html") == 0)
00419           is_html = true;
00420         else if (strcasecmp(tag.c_str(), "body") == 0)
00421           is_body = is_html;
00422       } while (!is_body);
00423 
00424       std::string title;
00425       if (titlestart != std::string::npos && titleend != std::string::npos)
00426         title = result.substr(titlestart, titleend - titlestart + 1);
00427 
00428       // should maybe find a better way to do this...
00429       //if (title.substr(0, 10) == "Index of /" ||
00430       //    title.substr(0, 5) == "ARex:") {
00431       // Treat every html as potential directory/set of links
00432       if (is_body) {
00433         if (metadata) {
00434           std::list<FileInfo>::iterator f = files.insert(files.end(),
00435                                                          url.FullPath());
00436           f->SetMetaData("path", url.FullPath());
00437           f->SetType(FileInfo::file_type_dir);
00438           f->SetMetaData("type", "dir");
00439           f->SetSize(info.size);
00440           f->SetMetaData("size", tostring(info.size));
00441           f->SetCreated(info.lastModified);
00442           f->SetMetaData("mtime", info.lastModified.str());
00443         }
00444         else
00445           html2list(result.c_str(), url, files);
00446       }
00447       else {
00448         std::list<FileInfo>::iterator f = files.insert(files.end(),
00449                                                        url.FullPath());
00450         f->SetMetaData("path", url.FullPath());
00451         f->SetType(FileInfo::file_type_file);
00452         f->SetMetaData("type", "file");
00453         f->SetSize(info.size);
00454         f->SetMetaData("size", tostring(info.size));
00455         f->SetCreated(info.lastModified);
00456         f->SetMetaData("mtime", info.lastModified.str());
00457       }
00458     }
00459     else {
00460       std::list<FileInfo>::iterator f = files.insert(files.end(),
00461                                                      url.FullPath());
00462       f->SetMetaData("path", url.FullPath());
00463       f->SetType(FileInfo::file_type_file);
00464       f->SetMetaData("type", "file");
00465       f->SetSize(info.size);
00466       f->SetMetaData("size", tostring(info.size));
00467       f->SetCreated(info.lastModified);
00468       f->SetMetaData("mtime", info.lastModified.str());
00469     }
00470 
00471     return DataStatus::Success;
00472   }
00473 
00474   DataStatus DataPointHTTP::StartReading(DataBuffer& buffer) {
00475     if (transfers_started != 0)
00476       return DataStatus::ReadStartError;
00477     int transfer_streams = 1;
00478     int started = 0;
00479     DataPointHTTP::buffer = &buffer;
00480     if (chunks)
00481       delete chunks;
00482     chunks = new ChunkControl;
00483     MCCConfig cfg;
00484     usercfg.ApplyToConfig(cfg);
00485     for (int n = 0; n < transfer_streams; ++n) {
00486       HTTPInfo_t *info = new HTTPInfo_t;
00487       info->point = this;
00488       info->client = new ClientHTTP(cfg, url, usercfg.Timeout());
00489       if (!CreateThreadFunction(&read_thread, info))
00490         delete info;
00491       else
00492         ++started;
00493     }
00494     if (!started) {
00495       StopReading();
00496       return DataStatus::ReadStartError;
00497     }
00498     transfer_lock.lock();
00499     while (transfers_started < started) {
00500       transfer_lock.unlock();
00501       sleep(1);
00502       transfer_lock.lock();
00503     }
00504     transfer_lock.unlock();
00505     return DataStatus::Success;
00506   }
00507 
00508   DataStatus DataPointHTTP::StopReading() {
00509     if (!buffer)
00510       return DataStatus::ReadStopError;
00511     transfer_lock.lock();
00512     if (transfers_finished < transfers_started) {
00513       buffer->error_read(true);
00514       while (transfers_finished < transfers_started) {
00515         transfer_lock.unlock();
00516         sleep(1);
00517         transfer_lock.lock();
00518       }
00519     }
00520     transfer_lock.unlock();
00521     if (chunks)
00522       delete chunks;
00523     chunks = NULL;
00524     transfers_finished = 0;
00525     transfers_started = 0;
00526     if (buffer->error_read()) {
00527       buffer = NULL;
00528       return DataStatus::ReadError;
00529     }
00530     buffer = NULL;
00531     return DataStatus::Success;
00532   }
00533 
00534   DataStatus DataPointHTTP::StartWriting(DataBuffer& buffer,
00535                                          DataCallback*) {
00536     if (transfers_started != 0)
00537       return DataStatus::WriteStartError;
00538     int transfer_streams = 1;
00539     int started = 0;
00540     DataPointHTTP::buffer = &buffer;
00541     if (chunks)
00542       delete chunks;
00543     chunks = new ChunkControl;
00544     MCCConfig cfg;
00545     usercfg.ApplyToConfig(cfg);
00546     for (int n = 0; n < transfer_streams; ++n) {
00547       HTTPInfo_t *info = new HTTPInfo_t;
00548       info->point = this;
00549       info->client = new ClientHTTP(cfg, url, usercfg.Timeout());
00550       if (!CreateThreadFunction(&write_thread, info))
00551         delete info;
00552       else
00553         ++started;
00554     }
00555     if (!started) {
00556       StopWriting();
00557       return DataStatus::WriteStartError;
00558     }
00559     transfer_lock.lock();
00560     while (transfers_started < started) {
00561       transfer_lock.unlock();
00562       sleep(1);
00563       transfer_lock.lock();
00564     }
00565     transfer_lock.unlock();
00566     return DataStatus::Success;
00567   }
00568 
00569   DataStatus DataPointHTTP::StopWriting() {
00570     if (!buffer)
00571       return DataStatus::WriteStopError;
00572     transfer_lock.lock();
00573     if (transfers_finished < transfers_started) {
00574       buffer->error_write(true);
00575       while (transfers_finished < transfers_started) {
00576         transfer_lock.unlock();
00577         sleep(1);
00578         transfer_lock.lock();
00579       }
00580     }
00581     transfer_lock.unlock();
00582     if (chunks)
00583       delete chunks;
00584     chunks = NULL;
00585     transfers_finished = 0;
00586     transfers_started = 0;
00587     if (buffer->error_write()) {
00588       buffer = NULL;
00589       return DataStatus::WriteError;
00590     }
00591     buffer = NULL;
00592     return DataStatus::Success;
00593   }
00594 
00595   DataStatus DataPointHTTP::Check() {
00596     MCCConfig cfg;
00597     usercfg.ApplyToConfig(cfg);
00598     ClientHTTP client(cfg, url, usercfg.Timeout());
00599     PayloadRaw request;
00600     PayloadRawInterface *inbuf;
00601     HTTPClientInfo transfer_info;
00602     // Do HEAD to obtain some metadata
00603     MCC_Status r = client.process("HEAD", &request, &transfer_info, &inbuf);
00604     if (inbuf)
00605       delete inbuf;
00606     // Fail only if protocol involves authentication
00607     if (((!r) || (transfer_info.code != 200)) &&
00608         (url.Protocol() != "http"))
00609       return DataStatus::CheckError;
00610     created = transfer_info.lastModified;
00611     return DataStatus::Success;
00612   }
00613 
00614   DataStatus DataPointHTTP::Remove() {
00615     return DataStatus::DeleteError;
00616   }
00617 
00618   void DataPointHTTP::read_thread(void *arg) {
00619     HTTPInfo_t& info = *((HTTPInfo_t*)arg);
00620     DataPointHTTP& point = *(info.point);
00621     ClientHTTP *client = info.client;
00622     bool transfer_failure = false;
00623     int retries = 0;
00624     point.transfer_lock.lock();
00625     ++(point.transfers_started);
00626     point.transfer_lock.unlock();
00627     for (;;) {
00628       unsigned int transfer_size = 0;
00629       int transfer_handle = -1;
00630       // get first buffer
00631       if (!point.buffer->for_read(transfer_handle, transfer_size, true))
00632         // No transfer buffer - must be failure or close initiated externally
00633         break;
00634       uint64_t transfer_offset = 0;
00635       uint64_t chunk_length = transfer_size;
00636       if (!(point.chunks->Get(transfer_offset, chunk_length)))
00637         // No more chunks to transfer - quit this thread.
00638         break;
00639       uint64_t transfer_end = transfer_offset + chunk_length - 1;
00640       // Read chunk
00641       HTTPClientInfo transfer_info;
00642       PayloadRaw request;
00643       PayloadRawInterface *inbuf;
00644       std::string path = point.CurrentLocation().FullPath();
00645       MCC_Status r = client->process("GET", path, transfer_offset,
00646                                      transfer_end, &request, &transfer_info,
00647                                      &inbuf);
00648       if (!r) {
00649         // Failed to transfer chunk - retry.
00650         // 10 times in a row seems to be reasonable number
00651         // TODO: mark failure?
00652         // TODO: report failure.
00653         if ((++retries) > 10) {
00654           transfer_failure = true;
00655           break;
00656         }
00657         // Return buffer
00658         point.buffer->is_read(transfer_handle, 0, 0);
00659         point.chunks->Unclaim(transfer_offset, chunk_length);
00660         if (inbuf)
00661           delete inbuf;
00662         // Recreate connection
00663         delete client;
00664         client = NULL;
00665         MCCConfig cfg;
00666         point.usercfg.ApplyToConfig(cfg);
00667         client = new ClientHTTP(cfg, point.url, point.usercfg.Timeout());
00668         continue;
00669       }
00670       if (transfer_info.code == 416) { // EOF
00671         point.buffer->is_read(transfer_handle, 0, 0);
00672         point.chunks->Unclaim(transfer_offset, chunk_length);
00673         if (inbuf)
00674           delete inbuf;
00675         // TODO: report file size to chunk control
00676         break;
00677       }
00678       if ((transfer_info.code != 200) &&
00679           (transfer_info.code != 206)) { // HTTP error - retry?
00680         point.buffer->is_read(transfer_handle, 0, 0);
00681         point.chunks->Unclaim(transfer_offset, chunk_length);
00682         if (inbuf)
00683           delete inbuf;
00684         if ((transfer_info.code == 500) ||
00685             (transfer_info.code == 503) ||
00686             (transfer_info.code == 504))
00687           if ((++retries) <= 10)
00688             continue;
00689         transfer_failure = true;
00690         break;
00691       }
00692       // pick up usefull information from HTTP header
00693       point.created = transfer_info.lastModified;
00694       retries = 0;
00695       bool whole = (inbuf && ((transfer_info.size == inbuf->Size()) &&
00696                               (inbuf->BufferPos(0) == 0)) ||
00697                     (inbuf->Size() == -1));
00698       // Temporary solution - copy data between buffers
00699       point.transfer_lock.lock();
00700       point.chunks->Unclaim(transfer_offset, chunk_length);
00701       uint64_t transfer_pos = 0;
00702       for (unsigned int n = 0;; ++n) {
00703         if (!inbuf)
00704           break;
00705         char *buf = inbuf->Buffer(n);
00706         if (!buf)
00707           break;
00708         uint64_t pos = inbuf->BufferPos(n);
00709         unsigned int length = inbuf->BufferSize(n);
00710         transfer_pos = inbuf->BufferPos(n) + inbuf->BufferSize(n);
00711         // In general case returned chunk may be of different size than
00712         // requested
00713         for (; length;) {
00714           if (transfer_handle == -1) {
00715             // Get transfer buffer if needed
00716             transfer_size = 0;
00717             point.transfer_lock.unlock();
00718             if (!point.buffer->for_read(transfer_handle, transfer_size,
00719                                         true)) {
00720               // No transfer buffer - must be failure or close initiated
00721               // externally
00722               point.transfer_lock.lock();
00723               break;
00724             }
00725             point.transfer_lock.lock();
00726           }
00727           unsigned int l = length;
00728           if (l > transfer_size)
00729             l = transfer_size;
00730           char *buf_ = (*point.buffer)[transfer_handle];
00731           memcpy(buf_, buf, l);
00732           point.buffer->is_read(transfer_handle, l, pos);
00733           point.chunks->Claim(pos, l);
00734           length -= l;
00735           pos += l;
00736           buf += l;
00737           transfer_handle = -1;
00738         }
00739       }
00740       if (transfer_handle != -1)
00741         point.buffer->is_read(transfer_handle, 0, 0);
00742       if (inbuf)
00743         delete inbuf;
00744       // If server returned chunk which is not overlaping requested one - seems
00745       // like server has nothing to say any more.
00746       if (transfer_pos <= transfer_offset)
00747         whole = true;
00748       point.transfer_lock.unlock();
00749       if (whole)
00750         break;
00751     }
00752     point.transfer_lock.lock();
00753     ++(point.transfers_finished);
00754     if (transfer_failure)
00755       point.buffer->error_read(true);
00756     if (point.transfers_finished == point.transfers_started)
00757       // TODO: process/report failure?
00758       point.buffer->eof_read(true);
00759     if (client)
00760       delete client;
00761     delete &info;
00762     point.transfer_lock.unlock();
00763   }
00764 
00765   void DataPointHTTP::write_thread(void *arg) {
00766     HTTPInfo_t& info = *((HTTPInfo_t*)arg);
00767     DataPointHTTP& point = *(info.point);
00768     ClientHTTP *client = info.client;
00769     bool transfer_failure = false;
00770     int retries = 0;
00771     point.transfer_lock.lock();
00772     ++(point.transfers_started);
00773     point.transfer_lock.unlock();
00774     for (;;) {
00775       unsigned int transfer_size = 0;
00776       int transfer_handle = -1;
00777       unsigned long long int transfer_offset = 0;
00778       // get first buffer
00779       if (!point.buffer->for_write(transfer_handle, transfer_size,
00780                                    transfer_offset, true))
00781         // No transfer buffer - must be failure or close initiated externally
00782         break;
00783       //uint64_t transfer_offset = 0;
00784       //uint64_t transfer_end = transfer_offset+transfer_size;
00785       // Write chunk
00786       HTTPClientInfo transfer_info;
00787       PayloadMemConst request((*point.buffer)[transfer_handle],
00788                               transfer_offset, transfer_size,
00789                               point.CheckSize() ? point.GetSize() : 0);
00790       PayloadRawInterface *response;
00791       std::string path = point.CurrentLocation().FullPath();
00792       MCC_Status r = client->process("PUT", path, &request, &transfer_info,
00793                                      &response);
00794       if (response)
00795         delete response;
00796       if (!r) {
00797         // Failed to transfer chunk - retry.
00798         // 10 times in a row seems to be reasonable number
00799         // TODO: mark failure?
00800         // TODO: report failure.
00801         if ((++retries) > 10) {
00802           transfer_failure = true;
00803           break;
00804         }
00805         // Return buffer
00806         point.buffer->is_notwritten(transfer_handle);
00807         // Recreate connection
00808         delete client;
00809         client = NULL;
00810         MCCConfig cfg;
00811         point.usercfg.ApplyToConfig(cfg);
00812         client = new ClientHTTP(cfg, point.url, point.usercfg.Timeout());
00813         continue;
00814       }
00815       if ((transfer_info.code != 201) &&
00816           (transfer_info.code != 200) &&
00817           (transfer_info.code != 204)) {  // HTTP error - retry?
00818         point.buffer->is_notwritten(transfer_handle);
00819         if ((transfer_info.code == 500) ||
00820             (transfer_info.code == 503) ||
00821             (transfer_info.code == 504))
00822           if ((++retries) <= 10)
00823             continue;
00824         transfer_failure = true;
00825         break;
00826       }
00827       retries = 0;
00828       point.buffer->is_written(transfer_handle);
00829     }
00830     point.transfer_lock.lock();
00831     ++(point.transfers_finished);
00832     if (transfer_failure)
00833       point.buffer->error_write(true);
00834     if (point.transfers_finished == point.transfers_started) {
00835       // TODO: process/report failure?
00836       point.buffer->eof_write(true);
00837       if ((!(point.buffer->error())) && (point.buffer->eof_position() == 0))
00838         // Zero size data was trasfered - must send at least one empty packet
00839         for (;;) {
00840           HTTPClientInfo transfer_info;
00841           PayloadMemConst request(NULL, 0, 0, 0);
00842           PayloadRawInterface *response;
00843           std::string path = point.CurrentLocation().FullPath();
00844           MCC_Status r = client->process("PUT", path, &request, &transfer_info,
00845                                          &response);
00846           if (response)
00847             delete response;
00848           if (!r) {
00849             if ((++retries) > 10) {
00850               point.buffer->error_write(true);
00851               break;
00852             }
00853             // Recreate connection
00854             delete client;
00855             client = NULL;
00856             MCCConfig cfg;
00857             point.usercfg.ApplyToConfig(cfg);
00858             client = new ClientHTTP(cfg, point.url, point.usercfg.Timeout());
00859             continue;
00860           }
00861           if ((transfer_info.code != 201) &&
00862               (transfer_info.code != 200) &&
00863               (transfer_info.code != 204)) {  // HTTP error - retry?
00864             if ((transfer_info.code == 500) ||
00865                 (transfer_info.code == 503) ||
00866                 (transfer_info.code == 504))
00867               if ((++retries) <= 10)
00868                 continue;
00869             point.buffer->error_write(true);
00870             break;
00871           }
00872           break;
00873         }
00874     }
00875     if (client)
00876       delete client;
00877     delete &info;
00878     point.transfer_lock.unlock();
00879   }
00880 
00881 } // namespace Arc
00882 
00883 Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
00884   { "http", "HED:DMC", 0, &Arc::DataPointHTTP::Instance },
00885   { NULL, NULL, 0, NULL }
00886 };