Back to index

nordugrid-arc-nox  1.1.0~rc6
DataBroker.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <algorithm>
00008 
00009 #include <arc/StringConv.h>
00010 #include <arc/URL.h>
00011 #include <arc/UserConfig.h>
00012 #include <arc/client/ClientInterface.h>
00013 #include <arc/client/ExecutionTarget.h>
00014 #include <arc/message/MCC.h>
00015 #include <arc/message/PayloadSOAP.h>
00016 
00017 #include "DataBroker.h"
00018 
00019 namespace Arc {
00020 
00021   std::map<std::string, long> CacheMappingTable;
00022 
00023   bool DataBroker::CacheCheck(void) {
00024 
00025     MCCConfig cfg;
00026     usercfg.ApplyToConfig(cfg);
00027 
00028     NS ns;
00029     PayloadSOAP request(ns);
00030     XMLNode req = request.NewChild("CacheCheck").NewChild("TheseFilesNeedToCheck");
00031 
00032     for (std::list<FileType>::const_iterator it = job->DataStaging.File.begin();
00033          it != job->DataStaging.File.end(); it++)
00034       if (!it->Source.empty())
00035         req.NewChild("FileURL") = it->Source.front().URI.fullstr();
00036 
00037     PayloadSOAP *response = NULL;
00038 
00039     for (std::list<ExecutionTarget*>::const_iterator target = PossibleTargets.begin();
00040          target != PossibleTargets.end(); target++) {
00041       ClientSOAP client(cfg, (*target)->url, usercfg.Timeout());
00042 
00043       long DataSize = 0;
00044       int j = 0;
00045 
00046       MCC_Status status = client.process(&request, &response);
00047 
00048       if (!status)
00049         CacheMappingTable[(*target)->url.fullstr()] = 0;
00050       if (response == NULL)
00051         CacheMappingTable[(*target)->url.fullstr()] = 0;
00052 
00053       XMLNode ExistCount = (*response)["CacheCheckResponse"]["CacheCheckResult"]["Result"];
00054 
00055       for (int i = 0; ExistCount[i]; i++) {
00056         if (((std::string)ExistCount[i]["ExistInTheCache"]) == "true")
00057           j++;
00058         DataSize += stringto<long>((std::string)ExistCount[i]["FileSize"]);
00059       }
00060 
00061       CacheMappingTable[(*target)->url.fullstr()] = DataSize;
00062 
00063       if (response != NULL) {
00064         delete response;
00065         response = NULL;
00066       }
00067     }
00068 
00069     return true;
00070   }
00071 
00072   bool DataCompare(const ExecutionTarget *T1, const ExecutionTarget *T2) {
00073     return CacheMappingTable[T1->url.fullstr()] > CacheMappingTable[T2->url.fullstr()];
00074   }
00075 
00076   DataBroker::DataBroker(const UserConfig& usercfg)
00077     : Broker(usercfg) {}
00078 
00079   DataBroker::~DataBroker() {}
00080 
00081   Plugin* DataBroker::Instance(PluginArgument *arg) {
00082     BrokerPluginArgument *brokerarg = dynamic_cast<BrokerPluginArgument*>(arg);
00083     if (!brokerarg)
00084       return NULL;
00085     return new DataBroker(*brokerarg);
00086   }
00087 
00088   void DataBroker::SortTargets() {
00089 
00090     // Remove targets which are not A-REX (>= ARC-1).
00091 
00092     std::list<ExecutionTarget*>::iterator iter = PossibleTargets.begin();
00093 
00094     while (iter != PossibleTargets.end()) {
00095       if ((*iter)->Implementation >= Software("ARC", "1")) {
00096         iter = PossibleTargets.erase(iter);
00097         continue;
00098       }
00099       iter++;
00100     }
00101 
00102     logger.msg(VERBOSE, "Matching against job description, following targets possible for DataBroker: %d", PossibleTargets.size());
00103 
00104     iter = PossibleTargets.begin();
00105 
00106     for (int i = 1; iter != PossibleTargets.end(); iter++, i++)
00107       logger.msg(VERBOSE, "%d. Cluster: %s; Queue: %s", i, (*iter)->DomainName, (*iter)->ComputingShareName);
00108 
00109     CacheCheck();
00110     PossibleTargets.sort(DataCompare);
00111 
00112     logger.msg(VERBOSE, "Best targets are: %d", PossibleTargets.size());
00113 
00114     iter = PossibleTargets.begin();
00115 
00116     for (int i = 1; iter != PossibleTargets.end(); iter++, i++)
00117       logger.msg(VERBOSE, "%d. Cluster: %s; Queue: %s", i, (*iter)->DomainName, (*iter)->ComputingShareName);
00118 
00119     TargetSortingDone = true;
00120   }
00121 } // namespace Arc