Back to index

nordugrid-arc-nox  1.1.0~rc6
information_collector.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <sstream>
00006 #include <fstream>
00007 #include <sys/types.h>
00008 #include <sys/stat.h>
00009 #include <unistd.h>
00010 #include <sys/mman.h>
00011 #include <fcntl.h>
00012 
00013 #include <glibmm.h>
00014 
00015 #include <arc/Run.h>
00016 #include <arc/wsrf/WSResourceProperties.h>
00017 #include <arc/message/PayloadSOAP.h>
00018 
00019 #include "ldif/LDIFtoXML.h"
00020 #include "config/ngconfig.h"
00021 #include "grid-manager/conf/environment.h"
00022 #include "job.h"
00023 #include "arex.h"
00024 
00025 namespace ARex {
00026 
00027 static void GetGlueStates(Arc::XMLNode infodoc,std::map<std::string,std::string>& states);
00028 
00029 void ARexService::InformationCollector(void) {
00030   thread_count_.RegisterThread();
00031   for(;;) {
00032     // Run information provider
00033     std::string xml_str;
00034     int r = -1;
00035     {
00036       std::string cmd;
00037       cmd=nordugrid_libexec_loc()+"/CEinfo.pl --config "+nordugrid_config_loc();
00038       std::string stdin_str;
00039       std::string stderr_str;
00040       Arc::Run run(cmd);
00041       run.AssignStdin(stdin_str);
00042       run.AssignStdout(xml_str);
00043       run.AssignStderr(stderr_str);
00044       logger_.msg(Arc::DEBUG,"Cluster information provider: %s",cmd);
00045       if(!run.Start()) {
00046       };
00047       if(!run.Wait(infoprovider_wakeup_period_*10)) {
00048         logger_.msg(Arc::WARNING,"Cluster information provider timeout: %u seconds",
00049                     infoprovider_wakeup_period_*10);
00050       } else {
00051         r = run.Result();
00052         if (r!=0) logger_.msg(Arc::WARNING,"Cluster information provider failed with exit status: %i",r);
00053       };
00054       logger_.msg(Arc::DEBUG,"Cluster information provider log:\n%s",stderr_str);
00055     };
00056     if (r!=0) {
00057       logger_.msg(Arc::DEBUG,"No new informational document assigned");
00058     } else {
00059       logger_.msg(Arc::VERBOSE,"Obtained XML: %s",xml_str.substr(0,100));
00060       /*
00061       Arc::XMLNode root(xml_str);
00062       if(root) {
00063         // Collect job states
00064         GetGlueStates(root,glue_states_);
00065         // Put result into container
00066         infodoc_.Arc::InformationContainer::Assign(root,true);
00067         logger_.msg(Arc::DEBUG,"Assigned new informational document");
00068       } else {
00069         logger_.msg(Arc::ERROR,"Failed to create informational document");
00070       };
00071       */
00072       if(!xml_str.empty()) {
00073         infodoc_.Assign(xml_str);
00074         Arc::XMLNode root = infodoc_.Acquire();
00075         if(root) {
00076           logger_.msg(Arc::DEBUG,"Assigned new informational document");
00077           // Collect job states
00078           GetGlueStates(root,glue_states_);
00079         } else {
00080           logger_.msg(Arc::ERROR,"Failed to create informational document");
00081         };
00082         infodoc_.Release();
00083       } else {
00084         logger_.msg(Arc::ERROR,"Informational document is empty");
00085       };
00086     };
00087     if(thread_count_.WaitOrCancel(infoprovider_wakeup_period_*1000)) break;
00088   };
00089   thread_count_.UnregisterThread();
00090 }
00091 
00092 bool ARexService::RegistrationCollector(Arc::XMLNode &doc) {
00093   //Arc::XMLNode root = infodoc_.Acquire();
00094   logger_.msg(Arc::VERBOSE,"Passing service's information from collector to registrator");
00095   Arc::XMLNode empty(ns_, "RegEntry");
00096   empty.New(doc);
00097 
00098   doc.NewChild("SrcAdv");
00099   doc.NewChild("MetaSrcAdv");
00100 
00101   doc["SrcAdv"].NewChild("Type") = "org.nordugrid.execution.arex";
00102   doc["SrcAdv"].NewChild("EPR").NewChild("Address") = endpoint_;
00103   //doc["SrcAdv"].NewChild("SSPair");
00104 
00105   return true;
00106   //
00107   // TODO: filter information here.
00108   //Arc::XMLNode regdoc("<Service/>");
00109   //regdoc.New(doc);
00110   //doc.NewChild(root);
00111   //infodoc_.Release();
00112 }
00113 
00114 std::string ARexService::getID() {
00115   return "ARC:AREX";
00116 }
00117 
00118 static void GetGlueStates(Arc::XMLNode infodoc,std::map<std::string,std::string>& states) {
00119   std::string path = "Domains/AdminDomain/Services/ComputingService/ComputingEndpoint/ComputingActivities/ComputingActivity";
00120   // Obtaining all job descriptions
00121   Arc::XMLNodeList nodes = infodoc.Path(path);
00122   // Pulling ids and states
00123   for(Arc::XMLNodeList::iterator node = nodes.begin();node!=nodes.end();++node) {
00124     // Exract ID of job
00125     std::string id = (*node)["IDFromEndpoint"];
00126     if(id.empty()) id = (std::string)((*node)["ID"]);
00127     if(id.empty()) continue;
00128     std::string::size_type p = id.rfind('/');
00129     if(p != std::string::npos) id.erase(0,p+1);
00130     if(id.empty()) continue;
00131     Arc::XMLNode state_node = (*node)["State"];
00132     for(;(bool)state_node;++state_node) {
00133       std::string state  = (std::string)state_node;
00134       if(state.empty()) continue;
00135       // Look for nordugrid prefix
00136       if(strncmp("nordugrid:",state.c_str(),10) == 0) {
00137         // Remove prefix
00138         state.erase(0,10);
00139         // Store state under id
00140         states[id] = state;
00141       };
00142     };
00143   };
00144 }
00145 
00146 class PrefixedFilePayload: public Arc::PayloadRawInterface {
00147  private:
00148   std::string prefix_;
00149   std::string postfix_;
00150   int handle_;
00151   void* addr_;
00152   size_t length_;
00153  public:
00154   PrefixedFilePayload(const std::string& prefix,const std::string& postfix,int handle) {
00155     prefix_ = prefix;
00156     postfix_ = postfix;
00157     handle_ = handle;
00158     addr_ = NULL;
00159     length_ = 0;
00160     if(handle != -1) {
00161       struct stat st;
00162       if(::fstat(handle,&st) == 0) {
00163         if(st.st_size > 0) {
00164           length_ = st.st_size;
00165           addr_ = ::mmap(NULL,st.st_size,PROT_READ,MAP_PRIVATE,handle,0);
00166           if(!addr_) length_=0;
00167         };
00168       };
00169     };
00170   };
00171   ~PrefixedFilePayload(void) {
00172     if(addr_) ::munmap(addr_,length_);
00173     ::close(handle_);
00174   };
00175   virtual char operator[](Size_t pos) const {
00176     char* p = ((PrefixedFilePayload*)this)->Content(pos);
00177     if(!p) return 0;
00178     return *p;
00179   };
00180   virtual char* Content(Size_t pos) {
00181     if(pos < prefix_.length()) return (char*)(prefix_.c_str() + pos);
00182     pos -= prefix_.length();
00183     if(pos < length_) return ((char*)(addr_) + pos);
00184     pos -= length_; 
00185     if(pos < postfix_.length()) return (char*)(postfix_.c_str() + pos);
00186     return NULL;
00187   };
00188   virtual Size_t Size(void) const {
00189     return (prefix_.length() + length_ + postfix_.length());
00190   };
00191   virtual char* Insert(Size_t pos = 0,Size_t size = 0) {
00192     return NULL;
00193   };
00194   virtual char* Insert(const char* s,Size_t pos = 0,Size_t size = -1) {
00195     return NULL;
00196   };
00197   virtual char* Buffer(unsigned int num = 0) {
00198     if(num == 0) return (char*)(prefix_.c_str());
00199     if(addr_) {
00200       if(num == 1) return (char*)addr_;
00201     } else {
00202       ++num;
00203     };
00204     if(num == 2) return (char*)(postfix_.c_str());
00205     return NULL;
00206   };
00207   virtual Size_t BufferSize(unsigned int num = 0) const {
00208     if(num == 0) return prefix_.length();
00209     if(addr_) {
00210       if(num == 1) return length_;
00211     } else {
00212       ++num;
00213     };
00214     if(num == 2) return postfix_.length();
00215     return 0;
00216   };
00217   virtual Size_t BufferPos(unsigned int num = 0) const {
00218     if(num == 0) return 0;
00219     if(addr_) {
00220       if(num == 1) return prefix_.length();
00221     } else {
00222       ++num;
00223     };
00224     if(num == 2) return (prefix_.length() + length_);
00225     return (prefix_.length() + length_ + postfix_.length());
00226   };
00227   virtual bool Truncate(Size_t size) { return false; };
00228 };
00229 
00230 OptimizedInformationContainer::OptimizedInformationContainer(void) {
00231   handle_=-1;
00232 }
00233 
00234 OptimizedInformationContainer::~OptimizedInformationContainer(void) {
00235   if(handle_ != -1) ::close(handle_);
00236   if(!filename_.empty()) ::unlink(filename_.c_str());
00237 }
00238 
00239 int OptimizedInformationContainer::OpenDocument(void) {
00240   int h = -1;
00241   olock_.lock();
00242   if(handle_ != -1) h = ::dup(handle_);
00243   olock_.unlock();
00244   return h;
00245 }
00246 
00247 Arc::MessagePayload* OptimizedInformationContainer::Process(Arc::SOAPEnvelope& in) {
00248   Arc::WSRF& wsrp = Arc::CreateWSRP(in);
00249   if(!wsrp) { delete &wsrp; return NULL; };
00250   try {
00251     Arc::WSRPGetResourcePropertyDocumentRequest* req =
00252          dynamic_cast<Arc::WSRPGetResourcePropertyDocumentRequest*>(&wsrp);
00253     if(!req) throw std::exception();
00254     if(!(*req)) throw std::exception();
00255     // Request for whole document
00256     std::string fake_str("<fake>fake</fake>");
00257     Arc::XMLNode xresp(fake_str);
00258     Arc::WSRPGetResourcePropertyDocumentResponse resp(xresp);
00259     std::string rest_str;
00260     resp.SOAP().GetDoc(rest_str);
00261     std::string::size_type p = rest_str.find(fake_str);
00262     if(p == std::string::npos) throw std::exception();
00263     PrefixedFilePayload* outpayload = new PrefixedFilePayload(rest_str.substr(0,p),rest_str.substr(p+fake_str.length()),OpenDocument());
00264     delete &wsrp;
00265     return outpayload;
00266   } catch(std::exception& e) { };
00267   delete &wsrp;
00268   Arc::NS ns;
00269   Arc::SOAPEnvelope* out = InformationContainer::Process(in);
00270   if(!out) return NULL;
00271   Arc::PayloadSOAP* outpayload = new Arc::PayloadSOAP(ns);
00272   out->Swap(*outpayload);
00273   delete out;
00274   return outpayload;
00275 }
00276 
00277 void OptimizedInformationContainer::AssignFile(const std::string& filename) {
00278   olock_.lock();
00279   if(!filename_.empty()) ::unlink(filename_.c_str());
00280   if(handle_ != -1) ::close(handle_);
00281   filename_ = filename;
00282   handle_ = -1;
00283   if(!filename_.empty()) {
00284     handle_ = ::open(filename_.c_str(),O_RDONLY);
00285     lock_.lock();
00286     doc_.ReadFromFile(filename_);
00287     lock_.unlock();
00288     Arc::InformationContainer::Assign(doc_,false);
00289   };
00290   olock_.unlock();
00291 }
00292 
00293 void OptimizedInformationContainer::Assign(const std::string& xml) {
00294   std::string filename;
00295   int h = Glib::file_open_tmp(filename);
00296   if(h == -1) {
00297     Arc::Logger::getRootLogger().msg(Arc::ERROR,"OptimizedInformationContainer failed to create temporary file");
00298     return;
00299   };
00300   Arc::Logger::getRootLogger().msg(Arc::VERBOSE,"OptimizedInformationContainer created temporary file: %s",filename);
00301   for(std::string::size_type p = 0;p<xml.length();++p) {
00302     ssize_t l = ::write(h,xml.c_str()+p,xml.length()-p);
00303     if(l == -1) {
00304       ::unlink(filename.c_str());
00305       ::close(h);
00306       Arc::Logger::getRootLogger().msg(Arc::ERROR,"OptimizedInformationContainer failed to store XML document to temporary file");
00307       return;
00308     };
00309     p+=l;
00310   };
00311   Arc::XMLNode newxml(xml);
00312   if(!newxml) {
00313     ::unlink(filename.c_str());
00314     ::close(h);
00315     Arc::Logger::getRootLogger().msg(Arc::ERROR,"OptimizedInformationContainer failed to parse XML");
00316     return;
00317   };
00318   // Here we have XML stored in file and parsed
00319   olock_.lock();
00320   if(!filename_.empty()) ::unlink(filename_.c_str());
00321   if(handle_ != -1) ::close(handle_);
00322   filename_ = filename;
00323   handle_ = h;
00324   lock_.lock();
00325   doc_.Swap(newxml);
00326   lock_.unlock();
00327   Arc::InformationContainer::Assign(doc_,false);
00328   olock_.unlock();
00329 }
00330 
00331 }
00332