Back to index

nordugrid-arc-nox  1.1.0~rc6
DataPointARC.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #define __STDC_LIMIT_MACROS
00008 #ifdef HAVE_STDINT_H
00009 #include <stdint.h>
00010 #endif
00011 
00012 #include <unistd.h>
00013 #include <sstream>
00014 #include <glibmm.h>
00015 #include <algorithm>
00016 
00017 #include <arc/Logger.h>
00018 #include <arc/StringConv.h>
00019 #include <arc/UserConfig.h>
00020 #include <arc/client/ClientInterface.h>
00021 #include <arc/data/DataBuffer.h>
00022 #include <arc/message/MCC.h>
00023 #include <arc/message/PayloadRaw.h>
00024 
00025 #ifdef WIN32
00026 #include <arc/win32.h>
00027 #endif
00028 
00029 #include "DataPointARC.h"
00030 
00031 namespace Arc {
00032 
00033   Logger DataPointARC::logger(DataPoint::logger, "ARC");
00034 
00035   typedef struct {
00036     DataPointARC *point;
00037     ClientSOAP *client;
00038   } ARCInfo_t;
00039 
00040   bool DataPointARC::checkBartenderURL(const URL& bartender_url){
00041     MCCConfig cfg;
00042     usercfg.ApplyToConfig(cfg);
00043 
00044     ClientSOAP client(cfg, bartender_url, usercfg.Timeout());
00045     std::string xml;
00046 
00047     NS ns("bar", "http://www.nordugrid.org/schemas/bartender");
00048     PayloadSOAP request(ns);
00049     request.NewChild("bar:list").NewChild("bar:listRequestList").NewChild("bar:listRequestElement").NewChild("bar:requestID") = "0";
00050     request["bar:list"]["bar:listRequestList"]["bar:listRequestElement"].NewChild("bar:LN") = bartender_url.Path();
00051     request["bar:list"].NewChild("bar:neededMetadataList").NewChild("bar:neededMetadataElement").NewChild("bar:section") = "entry";
00052     request["bar:list"]["bar:neededMetadatList"]["bar:neededMetadataElement"].NewChild("bar:property") = "";
00053     request.GetXML(xml, true);
00054 
00055     PayloadSOAP *response = NULL;
00056 
00057     MCC_Status status;
00058     try{
00059       status = client.process(&request, &response);
00060     }
00061     catch(std::exception e){
00062       logger.msg(WARNING, bartender_url.Path(), "did not work");
00063     }
00064 
00065     bool ret = true;
00066     if(!response)
00067       ret = false;
00068     else{
00069       response->Child().GetXML(xml, true);
00070       logger.msg(VERBOSE, "checingBartenderURL: Response:\n%s", xml);
00071       if(xml.find("Failed to send SOAP message") != std::string::npos)
00072         ret = false;
00073     }
00074     if(!status)
00075       ret = false;
00076     delete response;
00077     return ret;
00078   }
00079 
00080   DataPointARC::DataPointARC(const URL& url, const UserConfig& usercfg)
00081     : DataPointDirect(url, usercfg),
00082       transfer(NULL),
00083       md5sum(NULL),
00084       reading(false),
00085       writing(false),
00086       bartender_url(url.HTTPOption("BartenderURL")) {
00087     if (!bartender_url) {
00088       if (!usercfg.Bartender().empty()){
00089         std::vector<int> shuffledKeys;
00090         for (int i = 0; i < usercfg.Bartender().size(); i++)
00091           shuffledKeys.push_back(i);
00092         std::random_shuffle(shuffledKeys.begin(), shuffledKeys.end());
00093 
00094         // pick random bartender url:
00095         for (int i = 0; i < shuffledKeys.size(); i++) {
00096           if (checkBartenderURL(usercfg.Bartender()[shuffledKeys[i]])) {
00097             bartender_url = usercfg.Bartender()[shuffledKeys[i]];
00098             break;
00099           }
00100         }
00101       }
00102       if (!bartender_url)
00103         bartender_url = URL("http://localhost:60000/Bartender");
00104     }
00105 
00106     md5sum = new MD5Sum();
00107   }
00108 
00109   DataPointARC::~DataPointARC() {
00110     StopReading();
00111     StopWriting();
00112     if (md5sum) {
00113       delete md5sum;
00114       md5sum = NULL;
00115     }
00116     if (transfer){
00117       delete transfer;
00118       transfer = NULL;
00119     }
00120   }
00121 
00122   Plugin* DataPointARC::Instance(PluginArgument *arg) {
00123     DataPointPluginArgument *dmcarg = dynamic_cast<DataPointPluginArgument*>(arg);
00124     if (!dmcarg)
00125       return NULL;
00126     if (((const URL&)(*dmcarg)).Protocol() != "arc")
00127       return NULL;
00128     return new DataPointARC(*dmcarg, *dmcarg);
00129   }
00130 
00131   DataStatus DataPointARC::ListFiles(std::list<FileInfo>& files, bool, bool, bool) {
00132     if (!url.Host().empty()){
00133       logger.msg(ERROR, "Hostname is not implemented for arc protocol");
00134       return DataStatus::UnimplementedError;
00135     }
00136     MCCConfig cfg;
00137     usercfg.ApplyToConfig(cfg);
00138 
00139     ClientSOAP client(cfg, bartender_url, usercfg.Timeout());
00140     std::string xml;
00141 
00142     NS ns("bar", "http://www.nordugrid.org/schemas/bartender");
00143     PayloadSOAP request(ns);
00144     request.NewChild("bar:list").NewChild("bar:listRequestList").NewChild("bar:listRequestElement").NewChild("bar:requestID") = "0";
00145     request["bar:list"]["bar:listRequestList"]["bar:listRequestElement"].NewChild("bar:LN") = url.Path();
00146     request["bar:list"].NewChild("bar:neededMetadataList").NewChild("bar:neededMetadataElement").NewChild("bar:section") = "entry";
00147     request["bar:list"]["bar:neededMetadatList"]["bar:neededMetadataElement"].NewChild("bar:property") = "";
00148     request.GetXML(xml, true);
00149     logger.msg(INFO, "Request:\n%s", xml);
00150 
00151     PayloadSOAP *response = NULL;
00152 
00153     MCC_Status status = client.process(&request, &response);
00154 
00155     if (!status) {
00156       logger.msg(ERROR, (std::string)status);
00157       if (response)
00158         delete response;
00159       return DataStatus::ListError;
00160     }
00161 
00162     if (!response) {
00163       logger.msg(ERROR, "No SOAP response");
00164       return DataStatus::ListError;
00165     }
00166 
00167     response->Child().GetXML(xml, true);
00168     logger.msg(INFO, "Response:\n%s", xml);
00169 
00170     XMLNode nd = (*response).Child()["listResponseList"]["listResponseElement"];
00171     nd.GetXML(xml, true);
00172 
00173     logger.msg(INFO, "nd:\n%s", xml);
00174 
00175     if (nd["status"] == "not found")
00176       return DataStatus::ListError;
00177 
00178     if (nd["status"] == "found")
00179       for (int i = 0;; i++) {
00180         XMLNode cnd = nd["entries"]["entry"][i];
00181         if (!cnd)
00182           break;
00183         std::string file_name = cnd["name"];
00184         std::string type;
00185         for (int j = 0;; j++) {
00186           XMLNode ccnd = cnd["metadataList"]["metadata"][j];
00187           if (!ccnd)
00188             break;
00189           if (ccnd["property"] == "type")
00190             type = (std::string)ccnd["value"];
00191         }
00192         logger.msg(INFO, "cnd:\n%s is a %s", file_name, type);
00193         std::list<FileInfo>::iterator f = files.insert(files.end(), FileInfo(file_name.c_str()));
00194         if (type == "collection")
00195           f->SetType(FileInfo::file_type_dir);
00196         else
00197           f->SetType(FileInfo::file_type_file);
00198       }
00199     else {
00200       // its a file or something
00201       // we know it exists so we use file name from url
00202       std::string path = url.Path();
00203       std::string::size_type i = path.rfind(G_DIR_SEPARATOR, path.length());
00204       std::string file_name = path.substr((i != std::string::npos)?(i+1):0);
00205       std::list<FileInfo>::iterator f = files.insert(files.end(), FileInfo(file_name.c_str()));
00206       f->SetType(FileInfo::file_type_file);
00207     }
00208 
00209     std::string answer = (std::string)((*response).Child().Name());
00210 
00211     delete response;
00212 
00213     logger.msg(INFO, answer);
00214 
00215     return DataStatus::Success;
00216   }
00217 
00218   DataStatus DataPointARC::StartReading(DataBuffer& buf) {
00219     if (!url.Host().empty()){
00220       logger.msg(ERROR, "Hostname is not implemented for arc protocol");
00221       return DataStatus::UnimplementedError;
00222     }
00223 
00224     logger.msg(VERBOSE, "StartReading");
00225     if (reading)
00226       return DataStatus::IsReadingError;
00227     if (writing)
00228       return DataStatus::IsWritingError;
00229 
00230     reading = true;
00231     buffer = &buf;
00232     MCCConfig cfg;
00233     usercfg.ApplyToConfig(cfg);
00234 
00235     // get TURL from bartender
00236     ClientSOAP client(cfg, bartender_url, usercfg.Timeout());
00237     std::string xml;
00238 
00239     NS ns("bar", "http://www.nordugrid.org/schemas/bartender");
00240     PayloadSOAP request(ns);
00241     request.NewChild("bar:getFile").NewChild("bar:getFileRequestList").NewChild("bar:getFileRequestElement").NewChild("bar:requestID") = "0";
00242     request["bar:getFile"]["bar:getFileRequestList"]["bar:getFileRequestElement"].NewChild("bar:LN") = url.Path();
00243     // only supports http protocol:
00244     request["bar:getFile"]["bar:getFileRequestList"]["bar:getFileRequestElement"].NewChild("bar:protocol") = "http";
00245     request.GetXML(xml, true);
00246     logger.msg(INFO, "Request:\n%s", xml);
00247 
00248     PayloadSOAP *response = NULL;
00249 
00250     MCC_Status status = client.process(&request, &response);
00251 
00252     if (!status) {
00253       logger.msg(ERROR, (std::string)status);
00254       if (response)
00255         delete response;
00256       return DataStatus::ReadError;
00257     }
00258 
00259     if (!response) {
00260       logger.msg(ERROR, "No SOAP response");
00261       return DataStatus::ReadError;
00262     }
00263 
00264     response->Child().GetXML(xml, true);
00265     logger.msg(INFO, "Response:\n%s", xml);
00266 
00267     XMLNode nd = (*response).Child()["getFileResponseList"]["getFileResponseElement"];
00268     nd.GetXML(xml, true);
00269 
00270     logger.msg(INFO, "nd:\n%s", xml);
00271 
00272     if (nd["success"] != "done" || !nd["TURL"]) {
00273       delete response;
00274       return DataStatus::ReadError;
00275     }
00276 
00277     logger.msg(INFO, "Recieved transfer URL: %s", (std::string)nd["TURL"]);
00278 
00279     turl = (std::string) nd["TURL"];
00280     delete response;
00281     // redirect actual reading to http dmc
00282     if (transfer){ 
00283       delete transfer;
00284       transfer = NULL;
00285     }
00286     transfer = new DataHandle(turl, usercfg);
00287     if (!(*transfer)->StartReading(buf)) {
00288       if (transfer) {
00289         delete transfer;
00290         transfer = NULL;
00291       }
00292       reading = false;
00293       return DataStatus::ReadError;
00294     }
00295 
00296     return DataStatus::Success;
00297   }
00298 
00299   DataStatus DataPointARC::StopReading() {
00300     if (!reading)
00301       return DataStatus::ReadStopError;
00302     reading = false;
00303     if (!transfer)
00304       return DataStatus::Success;
00305     DataStatus ret = (*transfer)->StopReading();
00306     delete transfer;
00307     transfer = NULL;
00308     return ret;
00309   }
00310 
00311   DataStatus DataPointARC::StartWriting(DataBuffer& buf,
00312                                         DataCallback *callback) {
00313     if (!url.Host().empty()){
00314       logger.msg(ERROR, "Hostname is not implemented for arc protocol");
00315       return DataStatus::UnimplementedError;
00316     }
00317     logger.msg(VERBOSE, "StartWriting");
00318     if (reading)
00319       return DataStatus::IsReadingError;
00320     if (writing)
00321       return DataStatus::IsWritingError;
00322 
00323     writing = true;
00324     buffer = &buf;
00325     chksum_index = buffer->add(md5sum);
00326     MCCConfig cfg;
00327     usercfg.ApplyToConfig(cfg);
00328 
00329     // get TURL from bartender
00330     ClientSOAP client(cfg, bartender_url, usercfg.Timeout());
00331     std::string xml;
00332     std::stringstream out;
00333     out << this->GetSize();
00334     std::string size_str = out.str();
00335     NS ns("bar", "http://www.nordugrid.org/schemas/bartender");
00336     PayloadSOAP request(ns);
00337     request.NewChild("bar:putFile").NewChild("bar:putFileRequestList").NewChild("bar:putFileRequestElement").NewChild("bar:requestID") = "0";
00338     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"].NewChild("bar:LN") = url.Path();
00339     // only supports http protocol:
00340     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"].NewChild("bar:protocol") = "http";
00341     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"].NewChild("bar:metadataList").NewChild("bar:metadata").NewChild("bar:section") = "states";
00342     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"]["bar:metadata"].NewChild("bar:property") = "checksum";
00343     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"]["bar:metadata"].NewChild("bar:value") = "";
00344     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"].NewChild("bar:metadata").NewChild("bar:section") = "states";
00345     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"]["bar:metadata"][1].NewChild("bar:property") = "checksumType";
00346     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"]["bar:metadata"][1].NewChild("bar:value") = "md5";
00347     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"].NewChild("bar:metadata").NewChild("bar:section") = "states";
00348     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"]["bar:metadata"][2].NewChild("bar:property") = "neededReplicas";
00349     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"]["bar:metadata"][2].NewChild("bar:value") = "3";
00350     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"].NewChild("bar:metadata").NewChild("bar:section") = "states";
00351     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"]["bar:metadata"][3].NewChild("bar:property") = "size";
00352     request["bar:putFile"]["bar:putFileRequestList"]["bar:putFileRequestElement"]["bar:metadataList"]["bar:metadata"][3].NewChild("bar:value") = size_str;
00353     request.GetXML(xml, true);
00354     logger.msg(INFO, "Request:\n%s", xml);
00355 
00356     PayloadSOAP *response = NULL;
00357 
00358     MCC_Status status = client.process(&request, &response);
00359 
00360     if (!status) {
00361       logger.msg(ERROR, (std::string)status);
00362       if (response)
00363         delete response;
00364       return DataStatus::WriteError;
00365     }
00366 
00367     if (!response) {
00368       logger.msg(ERROR, "No SOAP response");
00369       return DataStatus::WriteError;
00370     }
00371 
00372     response->Child().GetXML(xml, true);
00373     logger.msg(INFO, "Response:\n%s", xml);
00374 
00375     XMLNode nd = (*response).Child()["putFileResponseList"]["putFileResponseElement"];
00376     nd.GetXML(xml, true);
00377 
00378     logger.msg(INFO, "nd:\n%s", xml);
00379 
00380     if (nd["success"] != "done" || !nd["TURL"]) {
00381       delete response;
00382       return DataStatus::WriteError;
00383     }
00384 
00385     logger.msg(INFO, "Recieved transfer URL: %s", (std::string)nd["TURL"]);
00386 
00387     turl = (std::string) nd["TURL"];
00388     // redirect actual writing to http dmc
00389     if (transfer){
00390       delete transfer;
00391       transfer = NULL;
00392     }
00393     transfer = new DataHandle(turl, usercfg);
00394     delete response;
00395     if (!(*transfer)->StartWriting(buf, callback)) {
00396       if (transfer) {
00397         delete transfer;
00398         transfer = NULL;
00399       }
00400       writing = false;
00401       return DataStatus::WriteError;
00402     }
00403     return DataStatus::Success;
00404 
00405   }
00406 
00407   DataStatus DataPointARC::StopWriting() {
00408     if (!writing)
00409       return DataStatus::WriteStopError;
00410     writing = false;
00411     if (!transfer)
00412       return DataStatus::Success;
00413     // update checksum and size
00414     DataStatus ret = (*transfer)->StopWriting();
00415     buffer->wait_read();
00416     // md5res_u memory is handled by md5sum
00417     unsigned char *md5res_u;
00418     unsigned int length;
00419     md5sum->result(md5res_u, length);
00420     std::string md5str = "";
00421     for (int i = 0; i < length; i++) {
00422       char tmpChar[3];
00423       snprintf(tmpChar, sizeof(tmpChar), "%.2x", md5res_u[i]);
00424       md5str += tmpChar;
00425     }
00426     logger.msg(VERBOSE, "Calculated checksum: %s", md5str);
00427 
00428     MCCConfig cfg;
00429     usercfg.ApplyToConfig(cfg);
00430 
00431     // get TURL from bartender
00432     ClientSOAP client(cfg, bartender_url, usercfg.Timeout());
00433     std::string xml;
00434     std::stringstream out;
00435     out << this->GetSize();
00436     std::string size_str = out.str();
00437     NS ns("bar", "http://www.nordugrid.org/schemas/bartender");
00438     PayloadSOAP request(ns);
00439     request.NewChild("bar:modify").NewChild("bar:modifyRequestList").NewChild("bar:modifyRequestElement").NewChild("bar:changeID") = "0";
00440     request["bar:modify"]["bar:modifyRequestList"]["bar:modifyRequestElement"].NewChild("bar:LN") = url.Path();
00441     request["bar:modify"]["bar:modifyRequestList"]["bar:modifyRequestElement"].NewChild("bar:changeType") = "set";
00442     request["bar:modify"]["bar:modifyRequestList"]["bar:modifyRequestElement"].NewChild("bar:section") = "states";
00443     request["bar:modify"]["bar:modifyRequestList"]["bar:modifyRequestElement"].NewChild("bar:property") = "checksum";
00444     request["bar:modify"]["bar:modifyRequestList"]["bar:modifyRequestElement"].NewChild("bar:value") = md5str;
00445     request.GetXML(xml, true);
00446     logger.msg(INFO, "Request:\n%s", xml);
00447 
00448     PayloadSOAP *response = NULL;
00449 
00450     MCC_Status status = client.process(&request, &response);
00451 
00452     if (!status) {
00453       logger.msg(ERROR, (std::string)status);
00454       if (response)
00455         delete response;
00456       return DataStatus::WriteError;
00457     }
00458 
00459     if (!response) {
00460       logger.msg(ERROR, "No SOAP response");
00461       return DataStatus::WriteError;
00462     }
00463 
00464     response->Child().GetXML(xml, true);
00465     logger.msg(INFO, "Response:\n%s", xml);
00466 
00467     XMLNode nd = (*response).Child()["modifyResponseList"]["modifyResponseElement"];
00468     nd.GetXML(xml, true);
00469 
00470     logger.msg(INFO, "nd:\n%s", xml);
00471 
00472     if (nd["success"] != "set")
00473       return DataStatus::WriteError;
00474 
00475     delete md5sum;
00476     md5sum = NULL;
00477     delete transfer;
00478     transfer = NULL;
00479     return ret;
00480   }
00481 
00482   DataStatus DataPointARC::Check() {
00483     if (!url.Host().empty()){
00484       logger.msg(ERROR, "Hostname is not implemented for arc protocol");
00485       return DataStatus::CheckError;
00486     }
00487     return DataStatus::Success;
00488   }
00489 
00490   DataStatus DataPointARC::Remove() {
00491     std::string host = url.Host();
00492     if (!url.Host().empty()){
00493       logger.msg(ERROR, "Hostname is not implemented for arc protocol");
00494       return DataStatus::UnimplementedError;
00495     }
00496     MCCConfig cfg;
00497     usercfg.ApplyToConfig(cfg);
00498 
00499     ClientSOAP client(cfg, bartender_url, usercfg.Timeout());
00500     std::string xml;
00501 
00502     NS ns("bar", "http://www.nordugrid.org/schemas/bartender");
00503     PayloadSOAP request(ns);
00504     request.NewChild("bar:delFile").NewChild("bar:delFileRequestList").NewChild("bar:delFileRequestElement").NewChild("bar:requestID") = "0";
00505     request["bar:delFile"]["bar:delFileRequestList"]["bar:delFileRequestElement"].NewChild("bar:LN") = url.Path();
00506 
00507     request.GetXML(xml, true);
00508     logger.msg(INFO, "Request:\n%s", xml);
00509 
00510     PayloadSOAP *response = NULL;
00511 
00512     MCC_Status status = client.process(&request, &response);
00513 
00514     if (!status) {
00515       logger.msg(ERROR, (std::string)status);
00516       if (response)
00517         delete response;
00518       return DataStatus::DeleteError;
00519     }
00520 
00521     if (!response) {
00522       logger.msg(ERROR, "No SOAP response");
00523       return DataStatus::DeleteError;
00524     }
00525 
00526     response->Child().GetXML(xml, true);
00527     logger.msg(INFO, "Response:\n%s", xml);
00528 
00529     XMLNode nd = (*response).Child()["delFileResponseList"]["delFileResponseElement"];
00530 
00531     if (nd["success"] == "deleted")
00532       logger.msg(INFO, "Deleted %s", url.Path());
00533     delete response;
00534     return DataStatus::Success;
00535   }
00536 
00537 } // namespace Arc
00538 
00539 Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
00540   { "arc", "HED:DMC", 0, &Arc::DataPointARC::Instance },
00541   { NULL, NULL, 0, NULL }
00542 };