Back to index

nordugrid-arc-nox  1.1.0~rc6
put.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <sys/types.h>
00006 #include <sys/stat.h>
00007 #include <unistd.h>
00008 #include <dirent.h>
00009 #include <fcntl.h>
00010 #include <sys/types.h>
00011 #include <unistd.h>
00012 
00013 #include <string>
00014 #include <fstream>
00015 
00016 #include <arc/StringConv.h>
00017 #include <arc/Utils.h>
00018 #include <arc/message/PayloadStream.h>
00019 #include <arc/message/PayloadRaw.h>
00020 #include "PayloadFile.h"
00021 #include "job.h"
00022 
00023 #include "arex.h"
00024 
00025 #define MAX_CHUNK_SIZE (10*1024*1024)
00026 
00027 namespace ARex {
00028 
00029 static Arc::MCC_Status http_put(ARexJob& job,const std::string& hpath,Arc::Logger& logger,Arc::PayloadStreamInterface& stream);
00030 static Arc::MCC_Status http_put(ARexJob& job,const std::string& hpath,Arc::Logger& logger,Arc::PayloadRawInterface& buf);
00031 
00032 // TODO: monitor chunks written into files and report when file is complete
00033 Arc::MCC_Status ARexService::Put(Arc::Message& inmsg,Arc::Message& /*outmsg*/,ARexGMConfig& config,const std::string& id,const std::string& subpath) {
00034   if(id.empty()) return Arc::MCC_Status();
00035   ARexJob job(id,config,logger_);
00036   if(!job) {
00037     // There is no such job
00038     logger_.msg(Arc::ERROR, "Put: there is no job: %s - %s", id, job.Failure());
00039     // TODO: make proper html message
00040     return Arc::MCC_Status();
00041   };
00042   Arc::MessagePayload* payload = inmsg.Payload();
00043   if(!payload) {
00044     logger_.msg(Arc::ERROR, "Put: there is no payload for file %s in job: %s", subpath, id);
00045     return Arc::MCC_Status();
00046   };
00047   Arc::PayloadStreamInterface* stream = NULL;
00048   try {
00049     stream = dynamic_cast<Arc::PayloadStreamInterface*>(payload);
00050   } catch(std::exception& e) { };
00051   if(stream) return http_put(job,subpath,logger_,*stream);
00052   Arc::PayloadRawInterface* buf = NULL;
00053   try {
00054     buf = dynamic_cast<Arc::PayloadRawInterface*>(payload);
00055   } catch(std::exception& e) { };
00056   if(buf) return http_put(job,subpath,logger_,*buf);
00057   logger_.msg(Arc::ERROR, "Put: unrecognized payload for file %s in job: %s", subpath, id);
00058   return Arc::MCC_Status();
00059 } 
00060 
00061 static bool write_file(int h,char* buf,size_t size) {
00062   for(;size>0;) {
00063     ssize_t l = write(h,buf,size);
00064     if(l == -1) return false;
00065     size-=l; buf+=l;
00066   };
00067   return true;
00068 }
00069 
00070 static Arc::MCC_Status http_put(ARexJob& job,const std::string& hpath,Arc::Logger& logger,Arc::PayloadStreamInterface& stream) {
00071   // TODO: Use memory mapped file to minimize number of in memory copies
00072   // File 
00073   const int bufsize = 1024*1024;
00074   int h = job.CreateFile(hpath.c_str());
00075   if(h == -1) {
00076     // TODO: report something
00077     logger.msg(Arc::ERROR, "Put: failed to create file %s for job %s - %s", hpath, job.ID(), job.Failure());
00078     return Arc::MCC_Status();
00079   };
00080   int pos = stream.Pos(); 
00081   if(lseek(h,pos,SEEK_SET) != pos) {
00082     std::string err = Arc::StrError();
00083     ::close(h);
00084     logger.msg(Arc::ERROR, "Put: failed to set position of file %s for job %s to %i - %s", hpath, job.ID(), pos, err);
00085     return Arc::MCC_Status();
00086   };
00087   char* buf = new char[bufsize];
00088   if(!buf) {
00089     ::close(h);
00090     logger.msg(Arc::ERROR, "Put: failed to allocate memory for file %s in job %s", hpath, job.ID());
00091     return Arc::MCC_Status();
00092   };
00093   for(;;) {
00094     int size = bufsize;
00095     if(!stream.Get(buf,size)) break;
00096     if(!write_file(h,buf,size)) {
00097       std::string err = Arc::StrError();
00098       delete[] buf; ::close(h);
00099       logger.msg(Arc::ERROR, "Put: failed to write to file %s for job %s - %s", hpath, job.ID(), err);
00100       return Arc::MCC_Status();
00101     };
00102   };
00103   delete[] buf; ::close(h);
00104   return Arc::MCC_Status(Arc::STATUS_OK);
00105 }
00106 
00107 static Arc::MCC_Status http_put(ARexJob& job,const std::string& hpath,Arc::Logger& logger,Arc::PayloadRawInterface& buf) {
00108   // File 
00109   int h = job.CreateFile(hpath.c_str());
00110   if(h == -1) {
00111     // TODO: report something
00112     logger.msg(Arc::ERROR, "Put: failed to create file %s for job %s - %s", hpath, job.ID(), job.Failure());
00113     return Arc::MCC_Status();
00114   };
00115   for(int n = 0;;++n) {
00116     char* sbuf = buf.Buffer(n);
00117     if(sbuf == NULL) break;
00118     off_t offset = buf.BufferPos(n);
00119     size_t size = buf.BufferSize(n);
00120     if(size > 0) {
00121       off_t o = lseek(h,offset,SEEK_SET);
00122       if(o != offset) {
00123         ::close(h);
00124         return Arc::MCC_Status();
00125       };
00126       if(!write_file(h,sbuf,size)) {
00127         ::close(h);
00128         return Arc::MCC_Status();
00129       };
00130     };
00131   };
00132   ::close(h);
00133   return Arc::MCC_Status(Arc::STATUS_OK);
00134 }
00135 
00136 } // namespace ARex
00137