Back to index

nordugrid-arc-nox  1.1.0~rc6
hopi.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <unistd.h>
00006 #include <sys/types.h>
00007 #include <sys/stat.h>
00008 #include <fcntl.h>
00009 #include <errno.h>
00010 #include <glibmm/fileutils.h>
00011 #include <glibmm/miscutils.h>
00012 #include <arc/message/MessageAttributes.h>
00013 #include <arc/message/PayloadRaw.h>
00014 #include <arc/message/PayloadStream.h>
00015 #include <arc/URL.h>
00016 #include <arc/Utils.h>
00017 #include <arc/Thread.h>
00018 #include <arc/StringConv.h>
00019 
00020 
00021 #include "hopi.h"
00022 #include "PayloadFile.h"
00023 
00024 namespace Hopi {
00025 
00026 static Arc::Plugin *get_service(Arc::PluginArgument* arg)
00027 {
00028     Arc::ServicePluginArgument* srvarg =
00029             arg?dynamic_cast<Arc::ServicePluginArgument*>(arg):NULL;
00030     if(!srvarg) return NULL;
00031     return new Hopi((Arc::Config*)(*srvarg));
00032 }
00033 
00034 Arc::Logger Hopi::logger(Arc::Logger::rootLogger, "Hopi");
00035 
00036 class HopiFileChunks {
00037  private:
00038   static std::map<std::string,HopiFileChunks> files;
00039   static Glib::Mutex lock;
00040   static int timeout;
00041   static time_t last_timeout;
00042   typedef std::list<std::pair<off_t,off_t> > chunks_t;
00043   chunks_t chunks;
00044   off_t size;
00045   time_t last_accessed;
00046   int refcount;
00047   std::map<std::string,HopiFileChunks>::iterator self;
00048   HopiFileChunks(void);
00049  public:
00050   static void Timeout(int t) { timeout=t; };
00051   void Add(off_t start,off_t end);
00052   off_t Size(void) { return size; };
00053   void Size(off_t size) {
00054     lock.lock();
00055     if(size > HopiFileChunks::size) HopiFileChunks::size = size;
00056     lock.unlock();
00057   };
00058   std::string Path(void) { return self->first; };
00059   static HopiFileChunks& Get(std::string path);
00060   static HopiFileChunks* GetStuck(void);
00061   static HopiFileChunks* GetFirst(void);
00062   void Remove(void);
00063   void Release(void);
00064   bool Complete(void);
00065   void Print(void);
00066 };
00067 
00068 std::map<std::string,HopiFileChunks> HopiFileChunks::files;
00069 Glib::Mutex HopiFileChunks::lock;
00070 int HopiFileChunks::timeout = 600; // 10 minutes by default
00071 time_t HopiFileChunks::last_timeout = time(NULL);
00072 
00073 void HopiFileChunks::Print(void) {
00074   int n = 0;
00075   for(chunks_t::iterator c = chunks.begin();c!=chunks.end();++c) {
00076     Hopi::logger.msg(Arc::DEBUG, "Chunk %u: %u - %u",n,c->first,c->second);
00077   };
00078 }
00079 
00080 HopiFileChunks::HopiFileChunks(void):size(0),last_accessed(time(NULL)),refcount(0),self(files.end()) {
00081 }
00082 
00083 HopiFileChunks* HopiFileChunks::GetStuck(void) {
00084   if(((int)(time(NULL)-last_timeout)) < timeout) return NULL;
00085   lock.lock();
00086   for(std::map<std::string,HopiFileChunks>::iterator f = files.begin();
00087                     f != files.end();++f) {
00088     if((f->second.refcount <= 0) && 
00089        (((int)(time(NULL) - f->second.last_accessed)) >= timeout )) {
00090       ++(f->second.refcount);
00091       lock.unlock();
00092       return &(f->second);
00093     }
00094   }
00095   last_timeout=time(NULL);
00096   lock.unlock();
00097   return NULL;
00098 }
00099 
00100 HopiFileChunks* HopiFileChunks::GetFirst(void) {
00101   lock.lock();
00102   std::map<std::string,HopiFileChunks>::iterator f = files.begin();
00103   if(f != files.end()) {
00104     ++(f->second.refcount);
00105     lock.unlock();
00106     return &(f->second);
00107   };
00108   lock.unlock();
00109   return NULL;
00110 }
00111 
00112 void HopiFileChunks::Remove(void) {
00113   lock.lock();
00114   --refcount;
00115   if(refcount <= 0) {
00116     if(self != files.end()) {
00117       files.erase(self);
00118     }
00119   }
00120   lock.unlock();
00121 }
00122 
00123 HopiFileChunks& HopiFileChunks::Get(std::string path) {
00124   lock.lock();
00125   std::map<std::string,HopiFileChunks>::iterator c = files.find(path);
00126   if(c == files.end()) {
00127     c=files.insert(std::pair<std::string,HopiFileChunks>(path,HopiFileChunks())).first;
00128     c->second.self=c;
00129   }
00130   ++(c->second.refcount);
00131   lock.unlock();
00132   return (c->second);
00133 }
00134 
00135 void HopiFileChunks::Release(void) {
00136   lock.lock();
00137   if(chunks.empty()) {
00138     lock.unlock();
00139     Remove();
00140   } else {
00141     --refcount;
00142     lock.unlock();
00143   }
00144 }
00145 
00146 void HopiFileChunks::Add(off_t start,off_t end) {
00147   lock.lock();
00148   last_accessed=time(NULL);
00149   if(end > size) size=end;
00150   for(chunks_t::iterator chunk = chunks.begin();chunk!=chunks.end();++chunk) {
00151     if((start >= chunk->first) && (start <= chunk->second)) {
00152       // New chunk starts within existing chunk
00153       if(end > chunk->second) {
00154         // Extend chunk
00155         chunk->second=end;
00156         // Merge overlapping chunks
00157         chunks_t::iterator chunk_ = chunk;
00158         ++chunk_;
00159         for(;chunk_!=chunks.end();) {
00160           if(chunk->second < chunk_->first) break;
00161           // Merge two chunks
00162           if(chunk_->second > chunk->second) chunk->second=chunk_->second;
00163           chunk_=chunks.erase(chunk_);
00164         };
00165       };
00166       lock.unlock();
00167       return;
00168     } else if((end >= chunk->first) && (end <= chunk->second)) {
00169       // New chunk ends within existing chunk
00170       if(start < chunk->first) {
00171         // Extend chunk
00172         chunk->first=start;
00173       };
00174       lock.unlock();
00175       return;
00176     } else if(end < chunk->first) {
00177       // New chunk is between existing chunks or first chunk
00178       chunks.insert(chunk,std::pair<off_t,off_t>(start,end));
00179       lock.unlock();
00180       return;
00181     };
00182   };
00183   // New chunk is last chunk or there are no chunks currently
00184   chunks.insert(chunks.end(),std::pair<off_t,off_t>(start,end));
00185   lock.unlock();
00186 }
00187 
00188 bool HopiFileChunks::Complete(void) {
00189   lock.lock();
00190   bool r = ((chunks.size() == 1) &&
00191             (chunks.begin()->first == 0) &&
00192             (chunks.begin()->second == size));
00193   lock.unlock();
00194   return r;
00195 }
00196 
00197 class HopiFile {
00198   int handle;
00199   std::string path;
00200   bool for_read;
00201   bool slave;
00202   HopiFileChunks& chunks;
00203  public:
00204   HopiFile(const std::string& path,bool for_read,bool slave);
00205   ~HopiFile(void);
00206   int Write(void* buf,off_t offset,int size);
00207   int Read(void* buf,off_t offset,int size);
00208   int Write(off_t offset,int size);
00209   int Read(off_t offset,int size);
00210   void Size(off_t size) { chunks.Size(size); };
00211   off_t Size(void) { return chunks.Size(); };
00212   operator bool(void) { return (handle != -1); };
00213   bool operator!(void) { return (handle == -1); };
00214   void Destroy(void);
00215   static void DestroyStuck(void);
00216   static void DestroyAll(void);
00217 };
00218 
00219 HopiFile::HopiFile(const std::string& path,bool for_read,bool slave):handle(-1),chunks(HopiFileChunks::Get(path)) {
00220   HopiFile::for_read=for_read;
00221   HopiFile::slave=slave;
00222   HopiFile::path=path;
00223   if(for_read) {
00224     handle=open(path.c_str(),O_RDONLY);
00225   } else {
00226     if(slave) {
00227       handle=open(path.c_str(),O_WRONLY);
00228       if(handle == -1) {
00229         if(errno == ENOENT) {
00230           Hopi::logger.msg(Arc::ERROR, "Hopi SlaveMode is active, PUT is only allowed to existing files");
00231         }
00232       }
00233     } else {
00234       handle=open(path.c_str(),O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
00235     }
00236   }
00237   if(handle == -1) {
00238     Hopi::logger.msg(Arc::ERROR, Arc::StrError(errno));
00239   }
00240 }
00241   
00242 HopiFile::~HopiFile(void) {
00243   if(handle != -1) {
00244     close(handle);
00245     if(!for_read) {
00246       if(chunks.Complete()) {
00247         if(slave) {
00248           Hopi::logger.msg(Arc::VERBOSE, "Removing complete file in slave mode");
00249           ::unlink(path.c_str());
00250         }
00251         chunks.Remove();
00252         return;
00253       }
00254     }
00255   }
00256   chunks.Release();
00257 }
00258 
00259 void HopiFile::Destroy(void) {
00260   if(handle != -1) close(handle);
00261   handle=-1;
00262   ::unlink(path.c_str());
00263   chunks.Remove();
00264 }
00265 
00266 void HopiFile::DestroyStuck(void) {
00267   std::string prev_path;
00268   for(;;) {
00269     HopiFileChunks* stuck_chunks = HopiFileChunks::GetStuck();
00270     if(!stuck_chunks) return;
00271     std::string stuck_path = stuck_chunks->Path();
00272     if(stuck_path == prev_path) {
00273       // This may happen if other thread just started accessing this file
00274       stuck_chunks->Release();
00275       return;
00276     }
00277     ::unlink(stuck_path.c_str());
00278     stuck_chunks->Remove();
00279     prev_path=stuck_path;
00280   }
00281 }
00282 
00283 void HopiFile::DestroyAll(void) {
00284   std::string prev_path;
00285   for(;;) {
00286     HopiFileChunks* first_chunks = HopiFileChunks::GetFirst();
00287     if(!first_chunks) return;
00288     std::string first_path = first_chunks->Path();
00289     if(first_path == prev_path) {
00290       // This may happen if other thread just started accessing this file
00291       first_chunks->Release();
00292       return;
00293     }
00294     ::unlink(first_path.c_str());
00295     first_chunks->Remove();
00296     prev_path=first_path;
00297   }
00298 }
00299 
00300 int HopiFile::Write(void* buf,off_t offset,int size) {
00301   if(handle == -1) return -1;
00302   if(for_read) return -1;
00303   int s = size;
00304   if(lseek(handle,offset,SEEK_SET) != offset) return 0;
00305   for(;s>0;) {
00306     ssize_t l = write(handle,buf,s);
00307     if(l == -1) return -1;
00308     chunks.Add(offset,offset+l);
00309     chunks.Print();
00310     s-=l; buf=((char*)buf)+l; offset+=l;
00311   }
00312   return size;
00313 }
00314 
00315 int HopiFile::Write(off_t offset,int size) {
00316   if(handle == -1) return -1;
00317   if(for_read) return -1;
00318   chunks.Add(offset,offset+size);
00319   chunks.Print();
00320   return size;
00321 }
00322 
00323 int HopiFile::Read(void* buf,off_t offset,int size) {
00324   if(handle == -1) return -1;
00325   if(!for_read) return -1;
00326   if(lseek(handle,offset,SEEK_SET) != offset) return 0;
00327   return read(handle,buf,size);
00328 }
00329 
00330 int HopiFile::Read(off_t offset,int size) {
00331   if(handle == -1) return -1;
00332   if(!for_read) return -1;
00333   return size;
00334 }
00335 
00336 class HopiFileTimeout {
00337  private:
00338   static std::map<std::string,time_t> files;
00339   static Glib::Mutex lock;
00340   static int timeout;
00341   std::string path;
00342  public:
00343   HopiFileTimeout(const std::string& p);
00344   ~HopiFileTimeout(void);
00345   static void Timeout(int t) { timeout=t; };
00346   void Destroy(void);
00347   static void Add(const std::string& p);
00348   static void DestroyOld(void);
00349   static void DestroyAll(void);
00350 };
00351 
00352 std::map<std::string,time_t> HopiFileTimeout::files;
00353 Glib::Mutex HopiFileTimeout::lock;
00354 int HopiFileTimeout::timeout = 10;
00355 
00356 HopiFileTimeout::HopiFileTimeout(const std::string& p):path(p) {
00357   lock.lock();
00358   files[path]=time(NULL);
00359   lock.unlock();
00360 }
00361 
00362 HopiFileTimeout::~HopiFileTimeout(void) {
00363 }
00364 
00365 void HopiFileTimeout::Destroy(void) {
00366   lock.lock();
00367   std::map<std::string,time_t>::iterator f = files.find(path);
00368   if(f != files.end()) files.erase(f);
00369   lock.unlock();
00370   ::unlink(path.c_str());
00371 }
00372 
00373 void HopiFileTimeout::Add(const std::string& p) {
00374   lock.lock();
00375   files[p]=time(NULL);
00376   lock.unlock();
00377 }
00378 
00379 void HopiFileTimeout::DestroyOld(void) {
00380   lock.lock();
00381   std::map<std::string,time_t>::iterator f = files.begin();
00382   for(;f != files.end();) {
00383     int delta = (unsigned int)(time(NULL) - f->second);
00384     if(delta >= timeout) {
00385       ::unlink(f->first.c_str());
00386       std::map<std::string,time_t>::iterator f_ = f;
00387       ++f;
00388       files.erase(f_);
00389       continue;
00390     };
00391     ++f;
00392   }
00393   lock.unlock();
00394 }
00395 
00396 void HopiFileTimeout::DestroyAll(void) {
00397   lock.lock();
00398   std::map<std::string,time_t>::iterator f = files.begin();
00399   for(;f != files.end();) {
00400     ::unlink(f->first.c_str());
00401     std::map<std::string,time_t>::iterator f_ = f;
00402     ++f;
00403     files.erase(f_);
00404   }
00405   lock.unlock();
00406 }
00407 
00408 
00409 Hopi::Hopi(Arc::Config *cfg):RegisteredService(cfg),slave_mode(false)
00410 {
00411     logger.msg(Arc::INFO, "Hopi Initialized"); 
00412     doc_root = (std::string)((*cfg)["DocumentRoot"]);
00413     if (doc_root.empty()) {
00414         doc_root = "./";
00415     }
00416     logger.msg(Arc::INFO, "Hopi DocumentRoot is " + doc_root);
00417     slave_mode = (((std::string)((*cfg)["SlaveMode"])) == "1");
00418     if (slave_mode) logger.msg(Arc::INFO, "Hopi SlaveMode is on!");
00419     int timeout;
00420     if(Arc::stringto((std::string)((*cfg)["UploadTimeout"]),timeout)) {
00421         if(timeout > 0) HopiFileChunks::Timeout(timeout);
00422     }
00423     if(Arc::stringto((std::string)((*cfg)["DownloadTimeout"]),timeout)) {
00424         if(timeout > 0) HopiFileTimeout::Timeout(timeout);
00425     }
00426     uint64_t threshold;
00427     if(Arc::stringto((std::string)((*cfg)["MemoryMapThreshold"]),threshold)) {
00428       if(threshold > 0) PayloadBigFile::Threshold(threshold);
00429     }
00430 }
00431 
00432 bool Hopi::RegistrationCollector(Arc::XMLNode &doc) {
00433     Arc::NS isis_ns; isis_ns["isis"] = "http://www.nordugrid.org/schemas/isis/2008/08";
00434     Arc::XMLNode regentry(isis_ns, "RegEntry");
00435     regentry.NewChild("SrcAdv").NewChild("Type") = "org.nordugrid.storage.hopi";
00436     regentry.New(doc);
00437     return true;
00438 }
00439 
00440 Hopi::~Hopi(void)
00441 {
00442     logger.msg(Arc::INFO, "Hopi shutdown");
00443     HopiFile::DestroyAll();
00444     HopiFileTimeout::DestroyAll();
00445 }
00446 
00447 Arc::MessagePayload *Hopi::Get(const std::string &path, const std::string &base_url, unsigned long long int range_start, unsigned long long int range_end)
00448 {
00449     // XXX eliminate relative paths first
00450     std::string full_path = Glib::build_filename(doc_root, path);
00451     if (Glib::file_test(full_path, Glib::FILE_TEST_EXISTS) == true) {
00452         if (Glib::file_test(full_path, Glib::FILE_TEST_IS_REGULAR) == true) {
00453             // register file for removal after timeout
00454             Arc::MessagePayload * pf = newFileRead(full_path.c_str(),range_start,range_end);
00455             if(pf && slave_mode) HopiFileTimeout::Add(full_path);
00456             return pf;
00457         } else if (Glib::file_test(full_path, Glib::FILE_TEST_IS_DIR) && !slave_mode) {
00458             std::string html = "<HTML>\r\n<HEAD>Directory list of '" + path + "'</HEAD>\r\n<BODY><UL>\r\n";
00459             Glib::Dir dir(full_path);
00460             std::string d;
00461             std::string p;
00462             if (path == "/") {
00463                 p = "";
00464             } else {
00465                 p = path;
00466             }
00467             while ((d = dir.read_name()) != "") {
00468                 html += "<LI><a href=\""+ base_url + p + "/"+d+"\">"+d+"</a></LI>\r\n";
00469             }
00470             html += "</UL></BODY></HTML>";
00471             Arc::PayloadRaw *buf = new Arc::PayloadRaw();
00472             buf->Insert(html.c_str(), 0, html.length());
00473             return buf;
00474         }
00475     }
00476     return NULL;
00477 }
00478 
00479 static off_t GetEntitySize(Arc::MessagePayload &payload) {
00480     try {
00481         return dynamic_cast<Arc::PayloadRawInterface&>(payload).Size();
00482     } catch (std::exception &e) {
00483     }
00484     return 0;
00485 }
00486 
00487 
00488 Arc::MCC_Status Hopi::Put(const std::string &path, Arc::MessagePayload &payload)
00489 {
00490     // XXX eliminate relative paths first
00491     logger.msg(Arc::VERBOSE, "PUT called");
00492     std::string full_path = Glib::build_filename(doc_root, path);
00493     if (slave_mode && (Glib::file_test(full_path, Glib::FILE_TEST_EXISTS) == false)) {
00494         logger.msg(Arc::ERROR, "Hopi SlaveMode is active, PUT is only allowed to existing files");        
00495         return Arc::MCC_Status();
00496     }
00497     HopiFile fd(full_path.c_str(),false,slave_mode);
00498     if (!fd) {
00499         return Arc::MCC_Status();
00500     }
00501     fd.Size(GetEntitySize(payload));
00502     logger.msg(Arc::DEBUG, "File size is %u",fd.Size());
00503     try {
00504         Arc::PayloadStreamInterface& stream = dynamic_cast<Arc::PayloadStreamInterface&>(payload);
00505         char sbuf[1024*1024];
00506         for(;;) {
00507             int size = sizeof(sbuf);
00508             off_t offset = stream.Pos();
00509             if(!stream.Get(sbuf,size)) {
00510                 if(!stream) {
00511                     logger.msg(Arc::VERBOSE, "error reading from HTTP stream");
00512                     return Arc::MCC_Status();
00513                 }
00514                 break;
00515             }
00516             if(fd.Write(sbuf,offset,size) != size) {
00517               logger.msg(Arc::VERBOSE, "error on write");
00518               return Arc::MCC_Status();
00519             }
00520         }
00521     } catch (std::exception &e) {
00522         try {
00523             Arc::PayloadRawInterface& buf = dynamic_cast<Arc::PayloadRawInterface&>(payload);
00524             for(int n = 0;;++n) {
00525                 char* sbuf = buf.Buffer(n);
00526                 if(sbuf == NULL) break;
00527                 off_t offset = buf.BufferPos(n);
00528                 size_t size = buf.BufferSize(n);
00529                 if(size > 0) {
00530                     if(fd.Write(sbuf,offset,size) != size) {
00531                         logger.msg(Arc::VERBOSE, "error on write");
00532                         return Arc::MCC_Status();
00533                     }
00534                 }
00535             }
00536         } catch (std::exception &e) {
00537             logger.msg(Arc::ERROR, "Input for PUT operation is neither stream nor buffer");
00538             return Arc::MCC_Status();
00539         }
00540     }
00541     return Arc::MCC_Status(Arc::STATUS_OK);
00542 }
00543 
00544 static std::string GetPath(Arc::Message &inmsg,std::string &base) {
00545   base = inmsg.Attributes()->get("HTTP:ENDPOINT");
00546   Arc::AttributeIterator iterator = inmsg.Attributes()->getAll("PLEXER:EXTENSION");
00547   std::string path;
00548   if(iterator.hasMore()) {
00549     // Service is behind plexer
00550     path = *iterator;
00551     if(base.length() > path.length()) base.resize(base.length()-path.length());
00552   } else {
00553     // Standalone service
00554     path=Arc::URL(base).Path();
00555     base.resize(0);
00556   };
00557   return path;
00558 }
00559 
00560 Arc::MCC_Status Hopi::process(Arc::Message &inmsg, Arc::Message &outmsg)
00561 {
00562     std::string method = inmsg.Attributes()->get("HTTP:METHOD");
00563     std::string base_url;
00564     std::string path = GetPath(inmsg,base_url);
00565 
00566     logger.msg(Arc::VERBOSE, "method=%s, path=%s, url=%s, base=%s", method, path, inmsg.Attributes()->get("HTTP:ENDPOINT"), base_url);
00567     // Do file cleaning here to avoid running dedicated thread
00568     HopiFile::DestroyStuck();
00569     HopiFileTimeout::DestroyOld();
00570     if (method == "GET") {
00571         size_t range_start = 0;
00572         size_t range_end = (size_t)(-1);
00573         {
00574           std::string val;
00575           val=inmsg.Attributes()->get("HTTP:RANGESTART");
00576           if(!val.empty()) { // Negative ranges not supported
00577             if(!Arc::stringto<size_t>(val,range_start)) {
00578               range_start=0;
00579             } else {
00580               val=inmsg.Attributes()->get("HTTP:RANGEEND");
00581               if(!val.empty()) {
00582                 if(!Arc::stringto<size_t>(val,range_end)) {
00583                   range_end=(size_t)(-1);
00584                 } else {
00585                   range_end+=1;
00586                 };
00587               };
00588             };
00589           };
00590         };
00591         Arc::MessagePayload *buf = Get(path, base_url, range_start, range_end);
00592         if (!buf) {
00593             // XXX: HTTP error
00594             return Arc::MCC_Status();
00595         }
00596         outmsg.Payload(buf);
00597         return Arc::MCC_Status(Arc::STATUS_OK);
00598     } else if (method == "PUT") {
00599         Arc::MessagePayload *inpayload = inmsg.Payload();
00600         if(!inpayload) {
00601             logger.msg(Arc::WARNING, "No content provided for PUT operation");
00602             return Arc::MCC_Status();
00603         }
00604         Arc::MCC_Status ret = Put(path, *inpayload);
00605         if (!ret) {
00606             // XXX: HTTP error
00607             return Arc::MCC_Status();
00608         }
00609         Arc::PayloadRaw *buf = new Arc::PayloadRaw();
00610         outmsg.Payload(buf);
00611         return ret;
00612     } 
00613     logger.msg(Arc::WARNING, "Not supported operation");
00614     return Arc::MCC_Status();
00615 }
00616 
00617 } // namespace Hopi
00618 
00619 Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
00620     { "hopi", "HED:SERVICE", 0, &Hopi::get_service },
00621     { NULL, NULL, 0, NULL}
00622 };