Back to index

nordugrid-arc-nox  1.1.0~rc6
isis.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <sstream>
00006 #include <map>
00007 #include <math.h>
00008 #include <algorithm>
00009 
00010 #include <arc/loader/Loader.h>
00011 #include <arc/data/FileCacheHash.h>
00012 #include <arc/message/PayloadSOAP.h>
00013 #include <arc/message/MCCLoader.h>
00014 #include <arc/client/ClientInterface.h>
00015 #include <arc/Thread.h>
00016 #include <arc/Utils.h>
00017 #include <arc/StringConv.h>
00018 
00019 #include "security.h"
00020 
00021 #include "isis.h"
00022 
00023 #ifdef WIN32
00024 #include <arc/win32.h>
00025 #endif
00026 
00027 namespace ISIS
00028 {
00029 
00030 std::vector<std::string>::iterator Neighbor_Container::find_element(const std::string value) {
00031     return find(content.begin(), content.end(), value);
00032 }
00033 
00034 bool Neighbor_Container::contains(const std::string val) {
00035     while (locked) {}
00036     locked = true;
00037     bool ret_val = (content.end() != find(content.begin(), content.end(), val));
00038     locked = false;
00039     return ret_val;
00040 }
00041 
00042 void Neighbor_Container::push(const std::string val) {
00043     while (locked) {}
00044     locked = true;
00045     content.push_back(val);
00046     locked = false;
00047 }
00048 
00049 void Neighbor_Container::remove(std::string value) {
00050     while (locked) {}
00051     locked = true;
00052     std::vector<std::string>::iterator it = find_element(value);
00053     if (it != content.end()) content.erase(it);
00054     locked = false;
00055 }
00056 
00057 int Neighbor_Container::count() {
00058     return (int) content.size();
00059 }
00060 
// Logger used by the thread entry points defined in this file
// (message_send_thread and soft_state_thread); it is a child of the root
// logger and reports under the name "ISIS_Thread".
00061 static Arc::Logger thread_logger(Arc::Logger::rootLogger, "ISIS_Thread");
00062 
// Writes `value` to `out` in decimal, prefixed with "0" when it is below 10.
static void append_two_digits(std::ostream& out, int value) {
    if (value < 10) out << "0";
    out << value;
}

// Formats a point in time as a UTC timestamp string.
//
// parameter_time: the instant to format; defaults to the current time.
//
// Two output formats exist, selected by comparing the argument with the
// current second:
//   - argument equals "now" (this includes the default argument):
//       "YYYY-MM-DDThh:mm:ss+0000", e.g. "2009-04-20T08:29:03+0000"
//   - any other argument:
//       "YYYYMMDD.hhmmss", e.g. "20090420.082903"
//
// Returns an empty string when the time value cannot be converted by gmtime().
std::string Current_Time( time_t parameter_time = time(NULL) ){

    // Decide the format exactly once. Re-reading the clock for each decision
    // could give different answers when the second ticks over in between,
    // which would combine the branches inconsistently.
    const bool is_now = ( parameter_time == time(NULL) );

    // When is_now holds, parameter_time already is the current time, so there
    // is no need to query the clock again for the value to format.
    const time_t rawtime = parameter_time;

    const tm* converted = gmtime ( &rawtime );
    if ( converted == NULL ) {
        // gmtime() reports unrepresentable values with a null pointer.
        return std::string();
    }
    // gmtime() may hand out a pointer to storage shared with other calls;
    // take a private copy right away and work only on that.
    const tm utc = *converted;

    std::stringstream out;
    out << utc.tm_year+1900;
    if ( is_now ) {
        out << "-";
        append_two_digits(out, utc.tm_mon+1);
        out << "-";
        append_two_digits(out, utc.tm_mday);
        out << "T";
        append_two_digits(out, utc.tm_hour);
        out << ":";
        append_two_digits(out, utc.tm_min);
        out << ":";
        append_two_digits(out, utc.tm_sec);
        out << "+0000";
    } else {
        append_two_digits(out, utc.tm_mon+1);
        append_two_digits(out, utc.tm_mday);
        out << ".";
        append_two_digits(out, utc.tm_hour);
        append_two_digits(out, utc.tm_min);
        append_two_digits(out, utc.tm_sec);
    }
    return out.str();
}
00088 
00089 class Service_data {
00090     public:
00091         std::string serviceID;
00092         Arc::ISIS_description service;
00093         std::string peerID;
00094 };
00095 
00096 class Thread_data {
00097     public:
00098         std::vector<Arc::ISIS_description> isis_list;
00099         Arc::XMLNode node;
00100         Neighbor_Container* not_av_neighbors;
00101 };
00102 
00103 static void message_send_thread(void *arg) {
00104     Arc::AutoPointer<ISIS::Thread_data> data((ISIS::Thread_data*)arg);
00105     if(!data) {
00106         if (arg) delete (ISIS::Thread_data*)arg;
00107         return;
00108     }
00109     if ( data->isis_list.empty() ) {
00110        thread_logger.msg(Arc::ERROR, "Empty URL list add to the thread.");
00111        return;
00112     }
00113     if ( !bool(((ISIS::Thread_data *)data)->node) ) {
00114        thread_logger.msg(Arc::ERROR, "Empty message add to the thread.");
00115        return;
00116     }
00117     Neighbor_Container* not_availables_neighbors  = data->not_av_neighbors;
00118 
00119     for (unsigned int i=0; i<data->isis_list.size(); i++ ){
00120         std::string url = data->isis_list[i].url;
00121         //Send SOAP message to the neighbor.
00122         Arc::PayloadSOAP *response = NULL;
00123         Arc::MCCConfig mcc_cfg;
00124         mcc_cfg.AddPrivateKey(((ISIS::Thread_data *)data)->isis_list[i].key);
00125         mcc_cfg.AddCertificate(((ISIS::Thread_data *)data)->isis_list[i].cert);
00126         mcc_cfg.AddProxy(((ISIS::Thread_data *)data)->isis_list[i].proxy);
00127         mcc_cfg.AddCADir(((ISIS::Thread_data *)data)->isis_list[i].cadir);
00128         mcc_cfg.AddCAFile(((ISIS::Thread_data *)data)->isis_list[i].cafile);
00129 
00130         Arc::ClientSOAP client_entry(mcc_cfg, url, 60);
00131 
00132         // Create and send "Register/RemoveRegistrations" request
00133         Arc::NS message_ns;
00134         //message_ns[""] = "http://www.nordugrid.org/schemas/isis/2007/06";
00135         message_ns["wsa"] = "http://www.w3.org/2005/08/addressing";
00136         message_ns["glue2"] = GLUE2_D42_NAMESPACE;
00137         message_ns["isis"] = ISIS_NAMESPACE;
00138         Arc::PayloadSOAP req(message_ns);
00139 
00140         req.NewChild(((ISIS::Thread_data *)data)->node);
00141         Arc::MCC_Status status;
00142         thread_logger.msg(Arc::VERBOSE, "Sending \"Register/RemoveRegistrations\" message to %s and waiting for the response.", url );
00143         status= client_entry.process(&req,&response);
00144 
00145         if ( (!status.isOk()) || (!response) || (response->IsFault()) ) {
00146            if ( !not_availables_neighbors->contains(url) && i == 0)
00147               not_availables_neighbors->push(url);
00148            thread_logger.msg(Arc::ERROR, "Status (%s): Failed", url);
00149         } else {
00150            // Remove url just in case - implemented in the Neighbor_Container class
00151            not_availables_neighbors->remove(url);
00152            thread_logger.msg(Arc::VERBOSE, "Status (%s): OK",url );
00153            if(response) delete response;
00154            break;
00155         };
00156         if(response) delete response;
00157     }
00158     return;
00159 }
00160 
00161 void SendToNeighbors(Arc::XMLNode& node, std::vector<Arc::ISIS_description> neighbors_,
00162                      Arc::Logger& logger_, Arc::ISIS_description isis_desc, Neighbor_Container* not_availables_neighbors,
00163                      std::string endpoint, std::multimap<std::string,Arc::ISIS_description>& hash_table) {
00164     if ( !bool(node) ) {
00165        logger_.msg(Arc::WARNING, "Empty message won't be send to the neighbors.");
00166        return;
00167     }
00168 
00169     for (std::vector<Arc::ISIS_description>::iterator it = neighbors_.begin(); it < neighbors_.end(); it++) {
00170         if ( isis_desc.url != (*it).url ) {
00171            //thread creation
00172            ISIS::Thread_data* data;
00173            // This data will be freed in the message_send_thread function after successful or
00174            // unsuccessful termination.
00175            data = new ISIS::Thread_data;
00176            std::string url = (*it).url;
00177            std::string next_url = endpoint;
00178            if ( it+1 < neighbors_.end() ) {
00179                next_url = (*(it+1)).url;
00180            }
00181 
00182            // find neighbor's place in the hash table
00183            std::multimap<std::string,Arc::ISIS_description>::const_iterator it_hash;
00184            for (it_hash = hash_table.begin(); it_hash!=hash_table.end(); it_hash++) {
00185                if ( (it_hash->second).url == url )
00186                    break;
00187            }
00188            // add isis into the list until the next neighbor
00189            while ( (it_hash->second).url != next_url ){
00190                if ( 0 < data->isis_list.size() && (it_hash->second).url == url)
00191                    break;
00192                Arc::ISIS_description isis(it_hash->second);
00193                isis.key = isis_desc.key;
00194                isis.cert = isis_desc.cert;
00195                isis.proxy = isis_desc.proxy;
00196                isis.cadir = isis_desc.cadir;
00197                isis.cafile = isis_desc.cafile;
00198                data->isis_list.push_back(isis);
00199                it_hash++;
00200                if ( it_hash == hash_table.end() )
00201                    it_hash = hash_table.begin();
00202            }
00203            node.New(data->node);
00204            data->not_av_neighbors = not_availables_neighbors;
00205            Arc::CreateThreadFunction(&message_send_thread, data);
00206         }
00207     }
00208 
00209     return;
00210 }
00211 
00212 class Soft_State {
00213     public:
00214         std::string function;
00215         int sleep;
00216         std::string query;
00217         Arc::XmlDatabase* database;
00218         ISIS::ISIService* isis;
00219         bool* kill_thread;
00220         int* threads_count;
00221         bool* available_provider_;
00222         bool* neighbors_update_needed_;
00223         std::vector<Arc::ISIS_description>* providers;
00224 };
00225 
00226 
00227 static void soft_state_thread(void *data) {
00228     Arc::AutoPointer<Soft_State> self((Soft_State *)data);
00229     if(!self) {
00230         if (data) delete (Soft_State *)data;
00231         return;
00232     }
00233     std::string method = self->function;
00234     unsigned int sleep_time = self->sleep; //seconds
00235     std::string query_string = self->query;
00236     Arc::XmlDatabase* db_ = self->database;
00237     bool* available_providers = self->available_provider_;
00238     std::vector<Arc::ISIS_description>* providers_ = self->providers;
00239 
00240     (*(self->threads_count))++;
00241 
00242     // "sleep_period" is the time, when the thread wakes up and checks the "KillTread" variable's value and then sleep away.
00243     unsigned int sleep_period = 60;
00244     while (true){
00245         thread_logger.msg(Arc::VERBOSE, "%s: %d seconds to the next database cleaning.", method, sleep_time);
00246 
00247         // "sleep_time" is comminuted to some little period
00248         unsigned int tmp_sleep_time = sleep_time;
00249         while ( tmp_sleep_time > 0 ) {
00250             // Whether ISIS's destructor called or not
00251             if( *(self->kill_thread) ) {
00252                (*(self->threads_count))--;
00253                thread_logger.msg(Arc::VERBOSE, "%s: Soft-State thread is finished.", method);
00254                return;
00255             }
00256 
00257             if( tmp_sleep_time > sleep_period ) {
00258                tmp_sleep_time = tmp_sleep_time - sleep_period;
00259                sleep(sleep_period);
00260             }
00261             else {
00262                sleep(tmp_sleep_time);
00263                tmp_sleep_time = 0;
00264             }
00265         }
00266 
00267         time_t rawtime;
00268         time ( &rawtime );    //current time
00269 
00270         if ( method == "ETValid") {
00271             // Current this is the Query
00272             //"//RegEntry/MetaSrcAdv[count(Expiration)=1 and number(translate(GenTime,'TZ:-','.')) < number('20090420.082903')]/ServiceID"
00273 
00274             // This Query is better, but it is not working now
00275             //"//RegEntry/MetaSrcAdv[count(Expiration)=1 and ( (years-from-duration(Expiration)*1000) +(months-from-duration(Expiration)*10) + (days-from-duration(Expiration)) + (hours-from-duration(Expiration)*0.01) + (minutes-from-duration(Expiration)*0.0001) + (seconds-from-duration(Expiration)*0.000001) + number(translate(GenTime,'TZ:-','.'))) < number('20090420.132903')]/ServiceID"
00276             std::string valid_query("//RegEntry/MetaSrcAdv[count(Expiration)=1 and number(translate(GenTime,'TZ:-+','.')) < number(translate('");
00277             valid_query += Current_Time(rawtime);
00278             valid_query += "','TZ:-+','.'))]/ServiceID";
00279 
00280             query_string = valid_query;
00281         }
00282         if ( method == "ETRemove") {
00283             std::string remove_query("/RegEntry/MetaSrcAdv[count(Expiration)=0 and number(translate(GenTime,'TZ:-+','.')) < number(translate('");
00284             remove_query += Current_Time(rawtime);
00285             remove_query += "','TZ:-+','.'))]/ServiceID";
00286             query_string = remove_query;
00287         }
00288 
00289         // Database cleaning
00290         std::vector<std::string> service_ids;
00291 
00292         // Query from the database
00293         std::map<std::string, Arc::XMLNodeList> result;
00294         db_->queryAll(query_string, result);
00295         std::map<std::string, Arc::XMLNodeList>::iterator it;
00296         for (it = result.begin(); it != result.end(); it++) {
00297             if (it->second.size() == 0 || it->first == "" ) {
00298                 continue;
00299             }
00300 
00301             // If add better XPath for ETValid, then this block can be remove
00302             if ( method == "ETValid" ){
00303                Arc::XMLNode data;
00304                //The next function calling is db_->get(ServiceID, RegistrationEntry);
00305                db_->get(it->first, data);
00306                Arc::Time gentime( (std::string)data["MetaSrcAdv"]["GenTime"]);
00307                Arc::Period expiration((std::string)data["MetaSrcAdv"]["Expiration"]);
00308 
00309                Arc::Time current_time(Current_Time());
00310 
00311                 if ( (gentime.GetTime() + 2* expiration.GetPeriod()) > current_time.GetTime() ) {
00312                     // Now the information is not expired
00313                     continue;
00314                 }
00315 
00316                 std::string type = (std::string)data["SrcAdv"]["Type"];
00317                 if ( type == "org.nordugrid.infosys.isis") {
00318                     *(self->neighbors_update_needed_) = true;
00319                     std::string isis_url = (std::string)data["SrcAdv"]["EPR"]["Address"];
00320                     // the remove service is my provider or not
00321                     for (unsigned int j=0; j < providers_->size(); j++ ) {
00322                         if ( (*providers_)[j].url == isis_url ) {
00323                             *available_providers = false;
00324                             break;
00325                         }
00326                     }
00327                 }
00328 
00329             }
00330             // end of the block
00331 
00332             service_ids.push_back(it->first);
00333         }
00334 
00335         // Remove all old datas
00336         std::vector<std::string>::iterator id_it;
00337         for (id_it = service_ids.begin(); id_it != service_ids.end(); id_it++) {
00338             db_->del(*id_it);
00339         }
00340     }
00341 }
00342 
00343     ISIService::ISIService(Arc::Config *cfg):RegisteredService(cfg),logger_(Arc::Logger::rootLogger, "ISIS"),valid("PT1D"),remove("PT1D"),db_(NULL),neighbors_update_needed(false),available_provider(false),neighbors_count(0),neighbors_lock(false),connection_lock(false) {
00344 
00345         logger_.msg(Arc::VERBOSE, "Parsing configuration parameters");
00346 
00347         // Endpoint url from the configuration
00348         endpoint_=(std::string)((*cfg)["endpoint"]);
00349         logger_.msg(Arc::VERBOSE, "Endpoint: %s", endpoint_);
00350         if ( endpoint_.empty()){
00351            logger_.msg(Arc::ERROR, "Empty endpoint element in the configuration!");
00352            return;
00353         }
00354          // Key from the configuration
00355          my_key=(std::string)((*cfg)["KeyPath"]);
00356          if (!my_key.empty()) logger_.msg(Arc::VERBOSE, "KeyPath: %s", my_key);
00357 
00358          // Cert from the configuration
00359          my_cert=(std::string)((*cfg)["CertificatePath"]);
00360          if (!my_cert.empty()) logger_.msg(Arc::VERBOSE, "CertificatePath: %s", my_cert);
00361 
00362          // Proxy from the configuration
00363          my_proxy=(std::string)((*cfg)["ProxyPath"]);
00364          if (!my_proxy.empty()) logger_.msg(Arc::VERBOSE, "ProxyPath: %s", my_proxy);
00365 
00366          // CaDir from the configuration
00367          my_cadir=(std::string)((*cfg)["CACertificatesDir"]);
00368          if (!my_cadir.empty()) logger_.msg(Arc::VERBOSE, "CACertificatesDir: %s", my_cadir);
00369 
00370          // CA Certificate Path from the configuration
00371          my_cafile=(std::string)((*cfg)["CACertificatePath"]);
00372          if (!my_cafile.empty()) logger_.msg(Arc::VERBOSE, "CACertficatePath: %s", my_cafile);
00373 
00374          // Checking for credentials
00375          if (my_key.empty() && my_proxy.empty()){
00376              logger_.msg(Arc::WARNING, "Missing or empty KeyPath element in the configuration!");
00377          }
00378          if (my_cert.empty() && my_proxy.empty()){
00379              logger_.msg(Arc::WARNING, "Misisng or empty CertificatePath element in the configuration!");
00380          }
00381          if (my_proxy.empty() && (my_cert.empty() || my_key.empty()) ){
00382              logger_.msg(Arc::WARNING, "Missing or empty ProxyPath element in the configuration!");
00383          }
00384          if (my_cadir.empty() && my_cafile.empty()){
00385              logger_.msg(Arc::WARNING, "Missing or empty CACertificatesDir element in the configuration!");
00386              logger_.msg(Arc::WARNING, "Missing or empty CACertificatePath element in the configuration!");
00387          }
00388 
00389         // Assigning service description - Glue2 document should go here.
00390         infodoc_.Assign(Arc::XMLNode(
00391         "<?xml version=\"1.0\"?><Domains xmlns=\"http://schemas.ogf.org/glue/2008/05/spec_2.0_d41_r01\"><AdminDomain Distributed=\"\"><Services><Service Name=\"\" ID=\"\" Validity=\"\" OtherInfo=\"\"><Associations /><Endpoint><HealthState>ok</HealthState><ServingState>production</ServingState></Endpoint><Capability>information.provenance</Capability></Service></Services></AdminDomain></Domains>"
00392         ),true);
00393 
00394 
00395         if ((bool)(*cfg)["retry"]) {
00396             if (!((std::string)(*cfg)["retry"]).empty()) {
00397                 if(EOF == sscanf(((std::string)(*cfg)["retry"]).c_str(), "%d", &retry) || retry < 1)
00398                 {
00399                     logger_.msg(Arc::ERROR, "Configuration error. Retry: \"%s\" is not a valid value. Default value will be used.",(std::string)(*cfg)["retry"]);
00400                     retry = 5;
00401                 }
00402             } else retry = 5;
00403         } else retry = 5;
00404 
00405         logger_.msg(Arc::VERBOSE, "Retry: %d", retry);
00406 
00407         if ((bool)(*cfg)["sparsity"]) {
00408             if (!((std::string)(*cfg)["sparsity"]).empty()) {
00409                 if(EOF == sscanf(((std::string)(*cfg)["sparsity"]).c_str(), "%d", &sparsity) || sparsity < 2)
00410                 {
00411                     logger_.msg(Arc::ERROR, "Configuration error. Sparsity: \"%s\" is not a valid value. Default value will be used.",(std::string)(*cfg)["sparsity"]);
00412                     sparsity = 2;
00413                 }
00414             } else sparsity = 2;
00415         } else sparsity = 2;
00416 
00417         logger_.msg(Arc::VERBOSE, "Sparsity: %d", sparsity);
00418 
00419         ThreadsCount = 0;
00420         KillThread = false;
00421 
00422         // Set up ETValid if there is any in the configuration
00423         if ((bool)(*cfg)["ETValid"]) {
00424             if (!((std::string)(*cfg)["ETValid"]).empty()) {
00425                 Arc::Period validp((std::string)(*cfg)["ETValid"]);
00426                 if(validp.GetPeriod() <= 0) {
00427                     logger_.msg(Arc::ERROR, "Configuration error. ETValid: \"%s\" is not a valid value. Default value will be used.",(std::string)(*cfg)["ETValid"]);
00428                 } else {
00429                     valid.SetPeriod( validp.GetPeriod() );
00430                 }
00431             } else logger_.msg(Arc::ERROR, "Configuration error. ETValid is empty. Default value will be used.");
00432         } else logger_.msg(Arc::VERBOSE, "ETValid: Default value will be used.");
00433 
00434         logger_.msg(Arc::VERBOSE, "ETValid: %d seconds", valid.GetPeriod());
00435 
00436         // Set up ETRemove if there is any in the configuration
00437         if ((bool)(*cfg)["ETRemove"]) {
00438             if (!((std::string)(*cfg)["ETRemove"]).empty()) {
00439                 Arc::Period removep((std::string)(*cfg)["ETRemove"]);
00440                 if(removep.GetPeriod() <= 0) {
00441                     logger_.msg(Arc::ERROR, "Configuration error. ETRemove: \"%s\" is not a valid value. Default value will be used.",(std::string)(*cfg)["ETRemove"]);
00442                 } else {
00443                     remove.SetPeriod( removep.GetPeriod() );
00444                 }
00445             } else logger_.msg(Arc::ERROR, "Configuration error. ETRemove is empty. Default value will be used.");
00446         } else logger_.msg(Arc::VERBOSE, "ETRemove: Default value will be used.");
00447 
00448         logger_.msg(Arc::VERBOSE, "ETRemove: %d seconds", remove.GetPeriod());
00449 
00450         ns_["isis"] = "http://www.nordugrid.org/schemas/isis/2008/08";
00451 
00452         std::string db_path = (std::string)(*cfg)["DBPath"];
00453         if (db_path.empty()) {
00454             logger_.msg(Arc::ERROR, "Invalid database path definition");
00455             return;
00456         }
00457 
00458         // Create ServiceURL hash
00459         FileCacheHash md5;
00460         // calculate my hash from the endpoint URL
00461         my_hash = md5.getHash(endpoint_);
00462 
00463         // Init database
00464         db_ = new Arc::XmlDatabase(db_path, "isis");
00465 
00466         // Exit if the database was unable to initialize
00467         if ( !(*db_) ) {
00468             exit(EXIT_FAILURE);
00469         }
00470 
00471         // -DB cleaning
00472         std::map<std::string, Arc::XMLNodeList> result;
00473         db_->queryAll("/*", result);
00474         std::map<std::string, Arc::XMLNodeList>::iterator it;
00475         for (it = result.begin(); it != result.end(); it++) {
00476              if (it->second.size() == 0) {
00477                 continue;
00478              }
00479              db_->del(it->first);
00480         }
00481 
00482         // Connection to the cloud in 6 steps.
00483         // 1. step: Put it's own EndpoingURL(s) from configuration in the set of neighbors for testing purpose.
00484         int i=0;
00485         while ((bool)(*cfg)["InfoProvider"][i]) {
00486             if ( endpoint_ != (std::string)(*cfg)["InfoProvider"][i]["URL"] ) {
00487                if ((std::string)(*cfg)["InfoProvider"][i]["URL"] == "") {
00488                   available_provider = true;
00489                   logger_.msg(Arc::WARNING, "The InfoProvider URL is empty.");
00490                } else {
00491                  Arc::ISIS_description isisdesc;
00492                  isisdesc.url = (std::string)(*cfg)["InfoProvider"][i]["URL"];
00493                  infoproviders_.push_back(isisdesc);
00494                }
00495             }
00496             i++;
00497         }
00498         // 2.-6. steps are in the BootStrap function.
00499         BootStrap(retry);
00500 
00501         // Create Soft-State database threads
00502         // Valid thread creation
00503         Soft_State* valid_data;
00504         // This data will be freed in the soft_state_thread function that periodically checks whether the ISIS is stoped
00505         // or not, and if yes, then destroy itself and free the relevant pointer.
00506         valid_data = new Soft_State();
00507         valid_data->function = "ETValid";
00508         valid_data->sleep = ((int)valid.GetPeriod())/2;
00509         valid_data->query = "//RegEntry/MetaSrcAdv[count(Expiration)=1]/ServiceID";
00510         valid_data->database = db_;
00511         valid_data->kill_thread = &KillThread;
00512         valid_data->threads_count = &ThreadsCount;
00513         valid_data->available_provider_ = &available_provider;
00514         valid_data->providers = &infoproviders_;
00515         valid_data->neighbors_update_needed_ = &neighbors_update_needed;
00516         Arc::CreateThreadFunction(&soft_state_thread, valid_data);
00517 
00518 
00519         // Remove thread creation
00520         Soft_State* remove_data;
00521         // This data will be freed in the soft_state_thread function that periodically checks whether the ISIS is stoped
00522         // or not, and if yes, then destroy itself and free the relevant pointer.
00523         remove_data = new Soft_State();
00524         remove_data->function = "ETRemove";
00525         remove_data->sleep = ((int)remove.GetPeriod())/2;
00526         remove_data->query = "//RegEntry/MetaSrcAdv[count(Expiration)=0]/ServiceID";
00527         remove_data->database = db_;
00528         remove_data->kill_thread = &KillThread;
00529         remove_data->threads_count = &ThreadsCount;
00530         Arc::CreateThreadFunction(&soft_state_thread, remove_data);
00531 
00532     }
00533 
00534     ISIService::~ISIService(void){
00535         // RemoveRegistration message send to neighbors with in my serviceID.
00536         std::map<std::string, Arc::XMLNodeList> result;
00537         std::string query_string = "/RegEntry/SrcAdv/EPR[ Address = '";
00538         query_string += endpoint_;
00539         query_string += "']";
00540         db_->queryAll(query_string, result);
00541         std::map<std::string, Arc::XMLNodeList>::iterator it;
00542         for (it = result.begin(); it != result.end(); it++) {
00543             if (it->second.size() == 0 || it->first == "" ) {
00544                 continue;
00545             }
00546             Arc::XMLNode data;
00547             //The next function calling is db_->get(ServiceID, RegistrationEntry);
00548             db_->get(it->first, data);
00549             std::string serviceid((std::string)data["MetaSrcAdv"]["ServiceID"]);
00550             if ( !serviceid.empty() ) {
00551                Arc::NS reg_ns;
00552                reg_ns["isis"] = ISIS_NAMESPACE;
00553 
00554                Arc::XMLNode remove_message(reg_ns,"isis:RemoveRegistrations");
00555                remove_message.NewChild("ServiceID") = serviceid;
00556                remove_message.NewChild("MessageGenerationTime") = Current_Time();
00557                Arc::ISIS_description isis;
00558                isis.url = endpoint_;
00559                isis.key = my_key;
00560                isis.cert = my_cert;
00561                isis.proxy = my_proxy;
00562                isis.cadir = my_cadir;
00563                isis.cafile = my_cafile;
00564                std::multimap<std::string,Arc::ISIS_description> local_hash_table;
00565                local_hash_table = hash_table;
00566                logger_.msg(Arc::VERBOSE, "RemoveRegistrations message sent to neighbors.");
00567                SendToNeighbors(remove_message, neighbors_, logger_, isis, &not_availables_neighbors_,endpoint_,local_hash_table);
00568             }
00569             break;
00570         }
00571 
00572         KillThread = true;
00573         //Waiting until the all RemoveRegistration message send to neighbors.
00574         sleep(10);
00575         for (unsigned int i=0; i< garbage_collector.size(); i++) {
00576             if(garbage_collector[i]) delete garbage_collector[i];
00577         }
00578         while (ThreadsCount > 0){
00579             logger_.msg(Arc::VERBOSE, "ISIS (%s) has %d more thread%s", endpoint_, ThreadsCount, ThreadsCount>1?"s.":".");
00580             sleep(10);
00581         }
00582 
00583         if (db_ != NULL) {
00584             delete db_;
00585         }
00586         logger_.msg(Arc::VERBOSE, "ISIS (%s) destroyed.", endpoint_);
00587     }
00588 
00589     bool ISIService::RegistrationCollector(Arc::XMLNode &doc) {
00590           // RegEntry element generation
00591           Arc::XMLNode empty(ns_, "RegEntry");
00592           empty.New(doc);
00593 
00594           doc.NewChild("SrcAdv");
00595           doc.NewChild("MetaSrcAdv");
00596 
00597           doc["SrcAdv"].NewChild("Type") = "org.nordugrid.infosys.isis";
00598           Arc::XMLNode peerID = doc["SrcAdv"].NewChild("SSPair");
00599           peerID.NewChild("Name") = "peerID";
00600           peerID.NewChild("Value") = my_hash;
00601 
00602           return true;
00603     }
00604 
00605     Arc::MCC_Status ISIService::Query(Arc::XMLNode &request, Arc::XMLNode &response) {
00606         std::string querystring_ = request["QueryString"];
00607         logger_.msg(Arc::VERBOSE, "Query received: %s", querystring_);
00608         if (querystring_.empty()) {
00609             Arc::SOAPEnvelope fault(ns_, true);
00610             if (fault) {
00611                 fault.Fault()->Code(Arc::SOAPFault::Sender);
00612                 fault.Fault()->Reason("Invalid query (It is empty.)");
00613                 response.Replace(fault.Child());
00614             }
00615             return Arc::MCC_Status();
00616         }
00617 
00618         std::map<std::string, Arc::XMLNodeList> result;
00619         db_->queryAll(querystring_, result);
00620         std::map<std::string, Arc::XMLNodeList>::iterator it;
00621         for (it = result.begin(); it != result.end(); it++) {
00622             if (it->second.size() == 0) {
00623                 continue;
00624             }
00625             Arc::XMLNode data_;
00626             //The next function calling is db_->get(ServiceID, RegistrationEntry);
00627             try {
00628                 db_->get(it->first, data_);
00629             } catch ( std::exception &e ) {
00630                 Arc::SOAPEnvelope fault(ns_, true);
00631                 if (fault) {
00632                     fault.Fault()->Code(Arc::SOAPFault::Sender);
00633                     fault.Fault()->Reason("Invalid query (not supported expression)");
00634                     fault.Fault()->Reason(e.what());
00635                     response.Replace(fault.Child());
00636                 }
00637                 return Arc::MCC_Status();
00638             }
00639             // add data to output
00640             response.NewChild(data_);
00641         }
00642         return Arc::MCC_Status(Arc::STATUS_OK);
00643     }
00644 
00645     Arc::MCC_Status ISIService::Register(Arc::XMLNode &request, Arc::XMLNode &response) {
00646         int i=0;
00647         while ((bool) request["RegEntry"][i]) {
00648             Arc::XMLNode regentry_ = request["RegEntry"][i++];
00649             logger_.msg(Arc::VERBOSE, "Register received: ID=%s; EPR=%s; MsgGenTime=%s",
00650                 (std::string) regentry_["MetaSrcAdv"]["ServiceID"], (std::string) regentry_["SrcAdv"]["EPR"]["Address"],
00651                 (std::string) request["Header"]["MessageGenerationTime"]);
00652 
00653             //search and check in the database
00654             Arc::XMLNode db_regentry;
00655             //The next function calling is db_->get(ServiceID, RegistrationEntry);
00656             db_->get((std::string) regentry_["MetaSrcAdv"]["ServiceID"], db_regentry);
00657 
00658             Arc::Time new_gentime((std::string) regentry_["MetaSrcAdv"]["GenTime"]);
00659             if ( !bool(db_regentry) ||
00660                 ( bool(db_regentry) && Arc::Time((std::string)db_regentry["MetaSrcAdv"]["GenTime"]) < new_gentime ) ) {
00661                Arc::XMLNode regentry_xml;
00662                regentry_.New(regentry_xml);
00663                db_->put((std::string) regentry_["MetaSrcAdv"]["ServiceID"], regentry_xml);
00664             }
00665             else {
00666                regentry_.Destroy();
00667             }
00668         }
00669         //Send to neighbors the Registration(s).
00670         Arc::ISIS_description isis;
00671         isis.url = endpoint_;
00672         isis.key = my_key;
00673         isis.cert = my_cert;
00674         isis.proxy = my_proxy;
00675         isis.cadir = my_cadir;
00676         isis.cafile = my_cafile;
00677         if ( bool(request["RegEntry"]) ) {
00678             std::multimap<std::string,Arc::ISIS_description> local_hash_table;
00679             local_hash_table = hash_table;
00680             SendToNeighbors(request, neighbors_, logger_, isis,&not_availables_neighbors_,endpoint_,local_hash_table);
00681             for (int i=0; bool(request["RegEntry"][i]); i++) {
00682                 Arc::XMLNode regentry = request["RegEntry"][i];
00683                 if ( (std::string)regentry["SrcAdv"]["Type"] == "org.nordugrid.infosys.isis" && 
00684                      hash_table.find(PeerID(regentry)) == hash_table.end()) {
00685                     // Search the hash value in the request message
00686                     Neighbors_Update();
00687                 }
00688            }
00689         }
00690 
00691         return Arc::MCC_Status(Arc::STATUS_OK);
00692     }
00693 
00694     Arc::MCC_Status ISIService::RemoveRegistrations(Arc::XMLNode &request, Arc::XMLNode &response) {
00695         int i=0;
00696         while ((bool) request["ServiceID"][i]) {
00697             std::string service_id = (std::string) request["ServiceID"][i];
00698             logger_.msg(Arc::VERBOSE, "RemoveRegistrations received: ID=%s", service_id);
00699 
00700             //search and check in the database
00701             Arc::XMLNode regentry;
00702             //The next function calling is db_->get(ServiceID, RegistrationEntry);
00703             db_->get(service_id, regentry);
00704             if ( bool(regentry) ) {
00705                std::string type = (std::string)regentry["SrcAdv"]["Type"];
00706                if ( type == "org.nordugrid.infosys.isis") {
00707                   std::string url = (std::string)regentry["SrcAdv"]["EPR"]["Address"];
00708                   // the remove service is my provider or not
00709                   for (unsigned int j=0; j < infoproviders_.size(); j++ ) {
00710                      if ( infoproviders_[j].url == url ) {
00711                         available_provider = false;
00712                         break;
00713                      }
00714                   }
00715                   neighbors_update_needed = true;
00716                }
00717                Arc::Time old_gentime((std::string)regentry["MetaSrcAdv"]["GenTime"]);
00718                Arc::Time new_gentime((std::string)request["MessageGenerationTime"]);
00719                if ( old_gentime >= new_gentime &&  !bool(regentry["MetaSrcAdv"]["Expiration"])) {
00720                   //Removed the ServiceID from the RemoveRegistrations message.
00721                   request["ServiceID"][i].Destroy();
00722                }
00723                else {
00724                   //update the database
00725                   Arc::XMLNode new_data(ns_, "RegEntry");
00726                   new_data.NewChild("MetaSrcAdv").NewChild("ServiceID") = service_id;
00727                   new_data["MetaSrcAdv"].NewChild("GenTime") = (std::string)request["MessageGenerationTime"];
00728                   db_->put(service_id, new_data);
00729                }
00730             }
00731             else {
00732                //add this element in the database
00733                Arc::XMLNode new_data(ns_, "RegEntry");
00734                new_data.NewChild("MetaSrcAdv").NewChild("ServiceID") = service_id;
00735                new_data["MetaSrcAdv"].NewChild("GenTime") = (std::string)request["MessageGenerationTime"];
00736                db_->put(service_id, new_data);
00737             }
00738             i++;
00739         }
00740 
00741         // Send RemoveRegistration message to the other(s) neighbors ISIS.
00742         Arc::ISIS_description isis;
00743         isis.url = endpoint_;
00744         isis.key = my_key;
00745         isis.cert = my_cert;
00746         isis.proxy = my_proxy;
00747         isis.cadir = my_cadir;
00748         isis.cafile = my_cafile;
00749         if ( bool(request["ServiceID"]) ){
00750            std::multimap<std::string,Arc::ISIS_description> local_hash_table;
00751            local_hash_table = hash_table;
00752            SendToNeighbors(request, neighbors_, logger_, isis, &not_availables_neighbors_,endpoint_,local_hash_table);
00753            for (int i=0; bool(request["ServiceID"][i]); i++) {
00754               // Search the hash value in my database
00755               Arc::XMLNode data;
00756               //The next function calling is db_->get(ServiceID, RegistrationEntry);
00757               db_->get((std::string)request["ServiceID"][i], data);
00758            }
00759         }
00760         return Arc::MCC_Status(Arc::STATUS_OK);
00761     }
00762 
00763     Arc::MCC_Status ISIService::GetISISList(Arc::XMLNode &request, Arc::XMLNode &response) {
00764         logger_.msg(Arc::VERBOSE, "GetISISList received");
00765         // If the neighbors_ vector is empty, then return with the own
00766         // address else with the list of neighbors.
00767         if (neighbors_.size() == 0 ) {
00768             response.NewChild("EPR") = endpoint_;
00769         }
00770         for (std::vector<Arc::ISIS_description>::iterator it = neighbors_.begin(); it < neighbors_.end(); it++) {
00771             response.NewChild("EPR") = (*it).url;
00772         }
00773         return Arc::MCC_Status(Arc::STATUS_OK);
00774     }
00775 
00776     Arc::MCC_Status ISIService::Connect(Arc::XMLNode &request, Arc::XMLNode &response) {
00777         logger_.msg(Arc::VERBOSE, "Connect received");
00778 
00779         // Database Dump
00780         response.NewChild("Database");
00781         std::map<std::string, Arc::XMLNodeList> result;
00782         db_->queryAll("/RegEntry", result);
00783         std::map<std::string, Arc::XMLNodeList>::iterator it;
00784         for (it = result.begin(); it != result.end(); it++) {
00785             if (it->second.size() == 0) {
00786                continue;
00787             }
00788             Arc::XMLNode data_;
00789             //The next function calling is db_->get(ServiceID, RegistrationEntry);
00790             db_->get(it->first, data_);
00791             // add data to output
00792             response["Database"].NewChild(data_);
00793         }
00794 
00795         response.NewChild("Config");
00796         response.NewChild("EndpointURL") = endpoint_;
00797 
00798         return Arc::MCC_Status(Arc::STATUS_OK);
00799     }
00800 
00801     bool ISIService::CheckAuth(const std::string& action, Arc::Message &inmsg, Arc::Message &outmsg) {
00802         inmsg.Auth()->set("ISIS",new ISISSecAttr(action));
00803         if(!ProcessSecHandlers(inmsg,"incoming")) {
00804             logger_.msg(Arc::ERROR, "Security check failed in ISIS for incoming message");
00805             make_soap_fault(outmsg, "Not allowed");
00806             return false;
00807         };
00808         return true;
00809     }
00810 
00811     bool ISIService::CheckAuth(const std::string& action, Arc::Message &inmsg,Arc::XMLNode &response) {
00812         inmsg.Auth()->set("ISIS",new ISISSecAttr(action));
00813         if(!ProcessSecHandlers(inmsg,"incoming")) {
00814             logger_.msg(Arc::ERROR, "Security check failed in ISIS for incoming message");
00815             make_soap_fault(response, "Not allowed");
00816             return false;
00817         };
00818         return true;
00819     }
00820 
00821     #define RegisterXMLPath "RegEntry/MetaSrcAdv/ServiceID"
00822     #define RemoveXMLPath   "ServiceID"
00823     static bool IsOwnID(Arc::XMLNode node,const std::string& path,const std::string& id) {
00824         if(id.empty()) return false;
00825         Arc::XMLNodeList ids = node.Path(path);
00826         if(ids.empty()) return false;
00827         if(ids.size() > 1) return false;
00828         std::string node_id = *(ids.begin());
00829         if(node_id != id) return false;
00830         return true;
00831     }
00832 
00833     Arc::MCC_Status ISIService::process(Arc::Message &inmsg, Arc::Message &outmsg) {
00834 
00835         // Return with fault if the service wasn't initialized and configured properly.
00836         if ( db_ == NULL ) return make_soap_fault(outmsg);
00837 
00838         if ( neighbors_count == 0 || (!available_provider && infoproviders_.size() > 0) ) {
00839             if ( !connection_lock ) {
00840                 connection_lock = true;
00841                 BootStrap(1);
00842                 connection_lock = false;
00843             }
00844             neighbors_update_needed = false;
00845         } else if ( neighbors_count > 0 && neighbors_.size() == not_availables_neighbors_.count() ){
00846             // Reposition itself in the peer-to-peer network
00847             // if disconnected from every neighbors then reconnect to
00848             // the network
00849             FileCacheHash md5;
00850             my_hash = md5.getHash(my_hash);
00851             if ( !connection_lock ) {
00852                 connection_lock = true;
00853                 BootStrap(retry);
00854                 connection_lock = false;
00855             }
00856             neighbors_update_needed = false;
00857         } else if (neighbors_update_needed) {
00858             Neighbors_Update();
00859             neighbors_update_needed = false;
00860         }
00861 
00862         // Both input and output are supposed to be SOAP
00863         // Extracting payload
00864         Arc::PayloadSOAP* inpayload = NULL;
00865         try {
00866             inpayload = dynamic_cast<Arc::PayloadSOAP*>(inmsg.Payload());
00867         } catch(std::exception& e) { };
00868         if(!inpayload) {
00869             logger_.msg(Arc::ERROR, "Communication error: input is not SOAP");
00870             return make_soap_fault(outmsg);
00871         }
00872 
00873         Arc::PayloadSOAP* outpayload = new Arc::PayloadSOAP(ns_);
00874         Arc::PayloadSOAP& res = *outpayload;
00875         Arc::MCC_Status ret = Arc::MCC_Status(Arc::STATUS_OK);
00876         // TODO: needed agereement on what is service id
00877         std::string client_id = inmsg.Attributes()->get("TLS:IDENTITYDN");
00878 
00879         // If the requested operation was: Register
00880         if (MatchXMLName((*inpayload).Child(0), "Register")) {
00881             Arc::XMLNode r = res.NewChild("isis:RegisterResponse");
00882             Arc::XMLNode register_ = (*inpayload).Child(0);
00883             if(CheckAuth(IsOwnID(register_,RegisterXMLPath,client_id)?"service":"isis", inmsg, r)) {
00884                 ret = Register(register_, r);
00885             }
00886         }
00887         // If the requested operation was: Query
00888         else if (MatchXMLName((*inpayload).Child(0), "Query")) {
00889             Arc::XMLNode r = res.NewChild("isis:QueryResponse");
00890             if(CheckAuth("client", inmsg, r)) {
00891                 Arc::XMLNode query_ = (*inpayload).Child(0);
00892                 ret = Query(query_, r);
00893             }
00894         }
00895         // If the requested operation was: RemoveRegistrations
00896         else if (MatchXMLName((*inpayload).Child(0), "RemoveRegistrations")) {
00897             Arc::XMLNode r = res.NewChild("isis:RemoveRegistrationsResponse");
00898             Arc::XMLNode remove_ = (*inpayload).Child(0);
00899             if(CheckAuth(IsOwnID(remove_,RemoveXMLPath,client_id)?"service":"isis", inmsg, r)) {
00900                 ret = RemoveRegistrations(remove_, r);
00901             }
00902         }
00903         // If the requested operation was: GetISISList
00904         else if (MatchXMLName((*inpayload).Child(0), "GetISISList")) {
00905             Arc::XMLNode r = res.NewChild("isis:GetISISListResponse");
00906             Arc::XMLNode isislist_= (*inpayload).Child(0);
00907             if(CheckAuth("client", inmsg, r)) {
00908                 Arc::XMLNode isislist_= (*inpayload).Child(0);
00909                 ret = GetISISList(isislist_, r);
00910             }
00911         }
00912         // If the requested operation was: Connect
00913         else if (MatchXMLName((*inpayload).Child(0), "Connect")) {
00914             Arc::XMLNode r = res.NewChild("isis:ConnectResponse");
00915             Arc::XMLNode connect_= (*inpayload).Child(0);
00916             if(CheckAuth("isis", inmsg, r)) {
00917                 ret = Connect(connect_, r);
00918             }
00919         }
00920 
00921         else if(MatchXMLNamespace((*inpayload).Child(0),"http://docs.oasis-open.org/wsrf/rp-2")) {
00922             if(CheckAuth("client", inmsg, outmsg)) {
00923                 // Update infodoc_
00924                 Arc::XMLNode workingcopy;
00925                 infodoc_.Acquire().New(workingcopy);
00926                 workingcopy["AdminDomain"].Attribute("Distributed") = "True";
00927                 workingcopy["AdminDomain"]["Services"]["Service"].Attribute("Name") = "ISIS";
00928                 workingcopy["AdminDomain"]["Services"]["Service"].Attribute("ID") = endpoint_;
00929                 workingcopy["AdminDomain"]["Services"]["Service"].Attribute("CreationTime") = Current_Time();
00930                 workingcopy["AdminDomain"]["Services"]["Service"].Attribute("Validity") = "600";
00931                 std::stringstream sparsity_string;
00932                 sparsity_string << "isis_sparsity=" << sparsity;
00933                 workingcopy["AdminDomain"]["Services"]["Service"].Attribute("OtherInfo") = sparsity_string.str();
00934                 // TODO: Update infodoc_
00935                 infodoc_.Release();
00936                 infodoc_.Assign(workingcopy, true);
00937                 // TODO: do not copy out_ to outpayload.
00938                 Arc::SOAPEnvelope* out_ = infodoc_.Process(*inpayload);
00939                 if(out_) {
00940                     *outpayload=*out_;
00941                     delete out_;
00942                 } else {
00943                     if (outpayload) delete outpayload;
00944                     return make_soap_fault(outmsg);
00945                 }
00946             }
00947         }
00948 
00949         outmsg.Payload(outpayload);
00950         return ret;
00951     }
00952 
00953     Arc::MCC_Status ISIService::make_soap_fault(Arc::Message &outmsg,const std::string& reason) {
00954         Arc::PayloadSOAP* outpayload = new Arc::PayloadSOAP(ns_,true);
00955         Arc::SOAPFault* fault = outpayload?outpayload->Fault():NULL;
00956         if(fault) {
00957             fault->Code(Arc::SOAPFault::Receiver);
00958             if(reason.empty()) {
00959                 fault->Reason("Failed processing request");
00960             } else {
00961                 fault->Reason(reason);
00962             }
00963         }
00964 
00965         outmsg.Payload(outpayload);
00966         return Arc::MCC_Status(Arc::STATUS_OK);
00967     }
00968 
00969     void ISIService::make_soap_fault(Arc::XMLNode &response,const std::string& reason) {
00970         Arc::SOAPEnvelope fault(ns_,true);
00971         if(fault) {
00972             fault.Fault()->Code(Arc::SOAPFault::Receiver);
00973             if(reason.empty()) {
00974                 fault.Fault()->Reason("Failed processing request");
00975             } else {
00976                 fault.Fault()->Reason(reason);
00977             }
00978             response.Replace(fault.Child());
00979         }
00980     }
00981 
00982     static Arc::Plugin *get_service(Arc::PluginArgument* arg) {
00983         Arc::ServicePluginArgument* srvarg = arg?dynamic_cast<Arc::ServicePluginArgument*>(arg):NULL;
00984         if(!srvarg) return NULL;
00985         return new ISIService((Arc::Config*)(*srvarg));
00986     }
00987 
00988     void ISIService::Neighbors_Update() {
00989         // wait until the neighbors list in used
00990         while ( neighbors_lock ) {
00991            //sleep(10);
00992         }
00993 
00994         // neighbors lock start
00995         neighbors_lock = true;
00996 
00997         // -hash_table recalculate
00998         hash_table.clear();
00999         std::map<std::string, Arc::XMLNodeList> result;
01000         db_->queryAll("/RegEntry/SrcAdv[ Type = 'org.nordugrid.infosys.isis']", result);
01001         std::map<std::string, Arc::XMLNodeList>::iterator query_it;
01002         for (query_it = result.begin(); query_it != result.end(); query_it++) {
01003              if (query_it->second.size() == 0) {
01004                 continue;
01005              }
01006              Arc::XMLNode data_;
01007              //The next function calling is db_->get(ServiceID, RegistrationEntry);
01008              db_->get(query_it->first, data_);
01009              Arc::XMLNode regentry = data_;
01010              Arc::ISIS_description service;
01011              service.url = (std::string)data_["SrcAdv"]["EPR"]["Address"];
01012              if ( service.url.empty() )
01013                 service.url = query_it->first;
01014              hash_table.insert( std::pair<std::string,Arc::ISIS_description>( PeerID(regentry), service) );
01015         }
01016 
01017         // neighbors count update
01018         // log(2)x = (log(10)x)/(log(10)2)
01019         int new_neighbors_count = 0;
01020         if ( hash_table.size() > 0){
01021             new_neighbors_count = (int)ceil(log10(hash_table.size())/log10(sparsity));
01022         }
01023         logger_.msg(Arc::VERBOSE, "Neighbors count recalculate from %d to %d (at ISIS %s)", neighbors_count, new_neighbors_count, endpoint_);
01024 
01025         // neighbors vector filling
01026         std::multimap<std::string,Arc::ISIS_description>::const_iterator it = hash_table.upper_bound(my_hash);
01027         Neighbors_Calculate(it, new_neighbors_count);
01028         neighbors_count = new_neighbors_count;
01029 
01030         // neighbors lock end
01031         neighbors_lock = false;
01032         return;
01033     }
01034 
01035     void ISIService::Neighbors_Calculate(std::multimap<std::string, Arc::ISIS_description>::const_iterator it, int count) {
01036         int sum_step = 1;
01037         neighbors_.clear();
01038         for (int i=0; i<count; i++) {
01039             if (it == hash_table.end())
01040                it = hash_table.begin();
01041             neighbors_.push_back(it->second);
01042             //calculate the next neighbors
01043             for (int step=0; step<sum_step; step++){
01044                it++;
01045                if (it == hash_table.end())
01046                   it = hash_table.begin();
01047             }
01048             sum_step = sum_step*sparsity;
01049         }
01050         return;
01051     }
01052 
01053     std::string ISIService::PeerID( Arc::XMLNode& regentry){
01054         std::string peerid;
01055         for (int j=0; bool(regentry["SrcAdv"]["SSPair"][j]); j++ ){
01056             if ("peerID" == (std::string)regentry["SrcAdv"]["SSPair"][j]["Name"]){
01057                peerid = (std::string)regentry["SrcAdv"]["SSPair"][j]["Value"];
01058                break;
01059             } else {
01060                continue;
01061             }
01062         }
01063 
01064         if ( peerid.empty() ){
01065             FileCacheHash md5;
01066             // calculate hash from the endpoint URL or serviceID
01067             if ( bool(regentry["SrcAdv"]["EPR"]["Address"]) ){
01068                peerid = md5.getHash((std::string)regentry["SrcAdv"]["EPR"]["Address"]);
01069             } else {
01070                peerid = md5.getHash((std::string)regentry["MetaSrcAdv"]["ServiceID"]);
01071             }
01072         }
01073         return peerid;
01074     }
01075 
01076     std::string ISIService::Cert( Arc::XMLNode& regentry){
01077         std::string cert;
01078         for (int j=0; bool(regentry["SrcAdv"]["SSPair"][j]); j++ ){
01079             if ("Cert" == (std::string)regentry["SrcAdv"]["SSPair"][j]["Name"]){
01080                cert = (std::string)regentry["SrcAdv"]["SSPair"][j]["Value"];
01081                break;
01082             } else {
01083                continue;
01084             }
01085         }
01086         return cert;
01087     }
01088 
01089     std::string ISIService::Key( Arc::XMLNode& regentry){
01090         std::string key;
01091         for (int j=0; bool(regentry["SrcAdv"]["SSPair"][j]); j++ ){
01092             if ("Key" == (std::string)regentry["SrcAdv"]["SSPair"][j]["Name"]){
01093                key = (std::string)regentry["SrcAdv"]["SSPair"][j]["Value"];
01094                break;
01095             } else {
01096                continue;
01097             }
01098         }
01099         return key;
01100     }
01101 
01102     std::string ISIService::Proxy( Arc::XMLNode& regentry){
01103         std::string proxy;
01104         for (int j=0; bool(regentry["SrcAdv"]["SSPair"][j]); j++ ){
01105             if ("Proxy" == (std::string)regentry["SrcAdv"]["SSPair"][j]["Name"]){
01106                proxy = (std::string)regentry["SrcAdv"]["SSPair"][j]["Value"];
01107                break;
01108             } else {
01109                continue;
01110             }
01111         }
01112         return proxy;
01113     }
01114 
01115     std::string ISIService::CaDir( Arc::XMLNode& regentry){
01116         std::string cadir;
01117         for (int j=0; bool(regentry["SrcAdv"]["SSPair"][j]); j++ ){
01118             if ("CaDir" == (std::string)regentry["SrcAdv"]["SSPair"][j]["Name"]){
01119                cadir = (std::string)regentry["SrcAdv"]["SSPair"][j]["Value"];
01120                break;
01121             } else {
01122                continue;
01123             }
01124         }
01125         return cadir;
01126     }
01127 
01128     void ISIService::BootStrap( int retry_count){
01129         // 2. step: goto InfoProviderISIS (one of the list)
01130         if ( infoproviders_.size() != 0 ){
01131             std::srand(time(NULL));
01132             Arc::ISIS_description rndProvider = infoproviders_[std::rand() % infoproviders_.size()];
01133 
01134             std::map<std::string,int> retry_;
01135             for( unsigned int i=0; i< infoproviders_.size(); i++ ) {
01136                retry_[ infoproviders_[i].url ] = retry_count;
01137             }
01138 
01139             // 3. step: Send Query SOAP message to the providerISIS with Filter
01140             Arc::PayloadSOAP *response = NULL;
01141             Arc::MCCConfig mcc_cfg;
01142             mcc_cfg.AddPrivateKey(my_key);
01143             mcc_cfg.AddCertificate(my_cert);
01144             mcc_cfg.AddProxy(my_proxy);
01145             mcc_cfg.AddCADir(my_cadir);
01146             mcc_cfg.AddCAFile(my_cafile);
01147             // Create and send "Query" request
01148             Arc::NS message_ns;
01149             message_ns[""] = "http://www.nordugrid.org/schemas/isis/2007/06";
01150             message_ns["wsa"] = "http://www.w3.org/2005/08/addressing";
01151             message_ns["glue2"] = GLUE2_D42_NAMESPACE;
01152             message_ns["isis"] = ISIS_NAMESPACE;
01153             Arc::PayloadSOAP req(message_ns);
01154 
01155             req.NewChild("Query");
01156             req["Query"].NewChild("QueryString") = "/RegEntry/SrcAdv[ Type = 'org.nordugrid.infosys.isis']";
01157             Arc::MCC_Status status;
01158 
01159             std::vector<Arc::ISIS_description> temporary_provider;
01160             temporary_provider = infoproviders_;
01161             bool isavailable = false;
01162             while ( !isavailable && retry_.size() > 0 ) {
01163                 Arc::ClientSOAP client_entry(mcc_cfg, rndProvider.url, 60);
01164                 logger_.msg(Arc::VERBOSE, "Sending Query message to the InfoProvider (%s) and waiting for the response.", rndProvider.url );
01165                 status= client_entry.process(&req,&response);
01166 
01167                 if ( (!status.isOk()) || (!response) || (response->IsFault()) ) {
01168                    logger_.msg(Arc::INFO, "Query failed at %s, choosing new InfoProvider.", rndProvider.url);
01169                    retry_[rndProvider.url]--;
01170                    if ( retry_[rndProvider.url] < 1 ) {
01171                       retry_.erase(rndProvider.url);
01172                       for (unsigned int i=0; i<temporary_provider.size(); i++){
01173                           if (temporary_provider[i].url == rndProvider.url){
01174                              temporary_provider.erase(temporary_provider.begin()+i);
01175                              break;
01176                           }
01177                       }
01178                       logger_.msg(Arc::INFO, "Remove ISIS (%s) from the list of InfoProviders.", rndProvider.url);
01179                    }
01180                    // new provider search
01181                    if ( temporary_provider.size() > 0 )
01182                       rndProvider = temporary_provider[std::rand() % temporary_provider.size()];
01183                 } else {
01184                    logger_.msg(Arc::VERBOSE, "Status (%s): OK", rndProvider.url );
01185                    isavailable = true;
01186                    bootstrapISIS = rndProvider.url;
01187                 };
01188             }
01189             available_provider = isavailable;
01190 
01191             // 4. step: Hash table and neighbors filling
01192             std::vector<Service_data> find_servicedatas;
01193             for (unsigned int i=0; bool( (*response)["QueryResponse"]["RegEntry"][i]); i++ ) {
01194                 std::string serviceid = (std::string)(*response)["QueryResponse"]["RegEntry"][i]["MetaSrcAdv"]["ServiceID"];
01195                 if ( serviceid.empty() )
01196                     continue;
01197                 std::string serviceurl = (std::string)(*response)["QueryResponse"]["RegEntry"][i]["SrcAdv"]["EPR"]["Address"];
01198                 if ( serviceurl.empty() )
01199                     serviceurl = serviceid;
01200                 Service_data sdata;
01201                 sdata.serviceID = serviceid;
01202                 sdata.service.url = serviceurl;
01203                 Arc::XMLNode regentry = (*response)["QueryResponse"]["RegEntry"][i];
01204                 /*sdata.service.cert = Cert(regentry);
01205                 sdata.service.key = Key(regentry);
01206                 sdata.service.proxy = Proxy(regentry);
01207                 sdata.service.cadir = CaDir(regentry);*/
01208                 sdata.peerID = PeerID(regentry);
01209                 find_servicedatas.push_back( sdata );
01210             }
01211             if(response) delete response;
01212 
01213             if ( available_provider )
01214                 hash_table.clear();
01215             for (unsigned int i=0; i < find_servicedatas.size(); i++) {
01216                 // add the hash and the service info into the hash table
01217                 hash_table.insert( std::pair<std::string,Arc::ISIS_description>( find_servicedatas[i].peerID, find_servicedatas[i].service) );
01218             }
01219 
01220             neighbors_count = 0;
01221             if ( !isavailable) {
01222                if ( neighbors_.size() >0 ){
01223                   Neighbors_Update();
01224                }
01225                logger_.msg(Arc::VERBOSE, "No InfoProvider is available." );
01226             }
01227             else if ( hash_table.size() == 0 ) {
01228                if ( neighbors_.size() >0 ){
01229                   Neighbors_Update();
01230                }
01231                logger_.msg(Arc::VERBOSE, "The hash table is empty. New cloud has been created." );
01232             } else {
01233                // log(2)x = (log(10)x)/(log(10)2)
01234                // and the largest integral value that is not greater than x.
01235                neighbors_count = (int)ceil(log10(hash_table.size())/log10(sparsity));
01236                if (neighbors_count == 0)
01237                   neighbors_count = 1;
01238 
01239                // neighbors vector filling
01240                std::multimap<std::string,Arc::ISIS_description>::const_iterator it = hash_table.upper_bound(my_hash);
01241                Neighbors_Calculate(it, neighbors_count);
01242                logger_.msg(Arc::VERBOSE, "Neighbors count: %d", neighbors_.size() );
01243 
01244                // 5. step: Connect message send to one ISIS of the neighbors
01245                Arc::PayloadSOAP connect_req(message_ns);
01246                connect_req.NewChild("Connect").NewChild("URL") = endpoint_;
01247 
01248                bool isavailable_connect = false;
01249                bool no_more_isis = false;
01250                unsigned int current = 0;
01251                Arc::PayloadSOAP *response_c = NULL;
01252                while ( !isavailable_connect && !no_more_isis) {
01253                    int retry_connect = retry;
01254                    // Try to connect one ISIS of the neighbors list
01255                    while ( !isavailable_connect && retry_connect>0) {
01256                        if (neighbors_[current].url == endpoint_) {
01257                            retry_connect = 0;
01258                            continue;
01259                        }
01260                        Arc::ClientSOAP connectclient_entry(mcc_cfg, neighbors_[current].url, 60);
01261                        logger_.msg(Arc::VERBOSE, "Sending Connect request to the ISIS(%s) and waiting for the response.", neighbors_[current].url );
01262 
01263                        status= connectclient_entry.process(&connect_req,&response_c);
01264                        if ( (!status.isOk()) || (!response_c) || (response_c->IsFault()) ) {
01265                           logger_.msg(Arc::INFO, "Connect status (%s): Failed", neighbors_[current].url );
01266                           retry_connect--;
01267                        } else {
01268                           logger_.msg(Arc::VERBOSE, "Connect status (%s): OK", neighbors_[current].url );
01269                           isavailable_connect = true;
01270                        };
01271                    }
01272 
01273                    if ( current+1 == neighbors_.size() ) {
01274                       no_more_isis = true;
01275                       not_availables_neighbors_.push(neighbors_[current].url);
01276                       logger_.msg(Arc::VERBOSE, "No more available ISIS in the neighbors list." );
01277                    } else if (!isavailable_connect) {
01278                       if ( !not_availables_neighbors_.contains(neighbors_[current].url) )
01279                         not_availables_neighbors_.push(neighbors_[current].url);
01280                       current++;
01281                    }
01282                }
01283 
01284                if ( isavailable_connect ){
01285                   // 6. step: response data processing (DB sync, Config saving)
01286                   // Remove url just in case - implemented in the Neighbor_Container class
01287                   not_availables_neighbors_.remove(neighbors_[current].url);
01288 
01289                   // -DB syncronisation
01290                   // serviceIDs in my DB
01291                   std::vector<std::string> ids;
01292                   std::map<std::string, Arc::XMLNodeList> result;
01293                   db_->queryAll("/RegEntry", result);
01294                   std::map<std::string, Arc::XMLNodeList>::iterator it_db;
01295                   for (it_db = result.begin(); it_db != result.end(); it_db++) {
01296                       if (it_db->second.size() == 0) {
01297                           continue;
01298                       }
01299                       ids.push_back(it_db->first);
01300                   }
01301 
01302                   Arc::NS reg_ns;
01303                   reg_ns["isis"] = ISIS_NAMESPACE;
01304 
01305                   Arc::XMLNode sync_datas(reg_ns,"isis:Register");
01306                   Arc::XMLNode header = sync_datas.NewChild("isis:Header");
01307 
01308                   header.NewChild("MessageGenerationTime") = Current_Time();
01309 
01310                   for (unsigned int i=0; bool((*response_c)["ConnectResponse"]["Database"]["RegEntry"][i]); i++ ){
01311                       Arc::XMLNode regentry_xml;
01312                       (*response_c)["ConnectResponse"]["Database"]["RegEntry"][i].New(regentry_xml);
01313                       std::string id = regentry_xml["MetaSrcAdv"]["ServiceID"];
01314                       if ( find(ids.begin(), ids.end(), id) == ids.end() ){
01315                          db_->put( id, regentry_xml);
01316                       } else {
01317                          // ID is in the DataBase.
01318                          Arc::XMLNode data_;
01319                          //The next function calling is db_->get(ServiceID, RegistrationEntry);
01320                          db_->get(id, data_);
01321                          if ( Arc::Time((std::string)data_["MetaSrcAdv"]["GenTime"]) <=
01322                               Arc::Time((std::string)regentry_xml["MetaSrcAdv"]["GenTime"])){
01323                             db_->put( id, regentry_xml);
01324                          } else {
01325                             // add data to synchronisation data
01326                             sync_datas.NewChild(data_);
01327                          }
01328                           ids.erase(find(ids.begin(),ids.end(),id));
01329                       }
01330                   }
01331 
01332                   // the neighbor update will almost certainly be necessary after connection
01333                   neighbors_update_needed = true;
01334 
01335                   for (unsigned int i=0; i<ids.size(); i++){
01336                       Arc::XMLNode data_;
01337                       //The next function calling is db_->get(ServiceID, RegistrationEntry);
01338                       db_->get(ids[i], data_);
01339                       sync_datas.NewChild(data_);
01340                   }
01341                   if ( bool(sync_datas["RegEntry"]) ){
01342                      Arc::ISIS_description isis;
01343                      isis.url = endpoint_;
01344                      isis.key = my_key;
01345                      isis.cert = my_cert;
01346                      isis.proxy = my_proxy;
01347                      isis.cadir = my_cadir;
01348                      isis.cafile = my_cafile;
01349                      std::multimap<std::string,Arc::ISIS_description> local_hash_table;
01350                      local_hash_table = hash_table;
01351                      SendToNeighbors(sync_datas, neighbors_, logger_, isis, &not_availables_neighbors_,endpoint_,local_hash_table);
01352                   }
01353                   logger_.msg(Arc::VERBOSE, "Database mass updated." );
01354                }
01355                if (response_c) delete response_c;
01356             }
01357         } else {
01358             Neighbors_Update();
01359         }
01360         return;
01361     }
01362 } // namespace
01363 
01364 Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = { // Plugin descriptor table for this file; defined after the ISIS namespace is closed, hence the qualified ISIS::get_service below.
01365     { "isis", "HED:SERVICE", 0, &ISIS::get_service }, // The only real entry; points at ISIS::get_service. NOTE(review): fields are presumably name, kind, version, factory function - confirm against Arc::PluginDescriptor.
01366     { NULL, NULL, 0, NULL } // All-NULL/zero entry. NOTE(review): presumably the end-of-table terminator expected by the plugin loader - confirm.
01367 };
01368