Back to index

salome-kernel  6.5.0
Param_Double_Port_provides_i.cxx
Go to the documentation of this file.
00001 // Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
00002 //
00003 // This library is free software; you can redistribute it and/or
00004 // modify it under the terms of the GNU Lesser General Public
00005 // License as published by the Free Software Foundation; either
00006 // version 2.1 of the License.
00007 //
00008 // This library is distributed in the hope that it will be useful,
00009 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00010 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00011 // Lesser General Public License for more details.
00012 //
00013 // You should have received a copy of the GNU Lesser General Public
00014 // License along with this library; if not, write to the Free Software
00015 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
00016 //
00017 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
00018 //
00019 
00020 //  File   : Param_Double_Port_provides_i.cxx
00021 //  Author : André RIBES (EDF)
00022 //  Module : KERNEL
00023 //
00024 #include <iostream>
00025 #include <string>
00026 #include <sstream>
00027 
00028 #include "Param_Double_Port_provides_i.hxx"
00029 
00030 #include <paco_omni.h>
00031 #include <paco_dummy.h>
00032 #include <paco_mpi.h>
00033 #include <paco_direct_comScheduling.h>
00034 #include <GaBro.h>
00035 #include <BasicBC.h>
00036 
00037 Param_Double_Port_provides_i::Param_Double_Port_provides_i(CORBA::ORB_ptr orb, char * ior, int rank) :
00038   Ports::Param_Double_Port_serv(orb,ior,rank),
00039   Ports::Param_Double_Port_base_serv(orb,ior,rank),
00040   Ports::Data_Port_serv(orb,ior,rank),
00041   Ports::Data_Port_base_serv(orb,ior,rank),
00042   Ports::Port_serv(orb,ior,rank),
00043   Ports::Port_base_serv(orb,ior,rank),
00044   InterfaceParallel_impl(orb,ior,rank)
00045 {
00046   _seq_data = NULL;
00047 
00048   seq_data_termine = false;                 
00049   seq_data_mutex = new pthread_mutex_t();
00050   pthread_mutex_init(seq_data_mutex, NULL);
00051   seq_data_condition = new pthread_cond_t();
00052   pthread_cond_init(seq_data_condition, NULL);
00053   seq_data_termine_cp = true;               
00054   seq_data_mutex_cp = new pthread_mutex_t();
00055   pthread_mutex_init(seq_data_mutex_cp, NULL);
00056   seq_data_condition_cp = new pthread_cond_t();
00057   pthread_cond_init(seq_data_condition_cp, NULL);
00058 
00059   _seq_results = NULL;
00060 
00061   seq_results_termine = false;              
00062   seq_results_mutex = new pthread_mutex_t();
00063   pthread_mutex_init(seq_results_mutex, NULL);
00064   seq_results_condition = new pthread_cond_t();
00065   pthread_cond_init(seq_results_condition, NULL);
00066   seq_results_termine_cp = true;                    
00067   seq_results_mutex_cp = new pthread_mutex_t();
00068   pthread_mutex_init(seq_results_mutex_cp, NULL);
00069   seq_results_condition_cp = new pthread_cond_t();
00070   pthread_cond_init(seq_results_condition_cp, NULL);
00071 }
00072 
00073 Param_Double_Port_provides_i::~Param_Double_Port_provides_i() 
00074 {
00075   if (_seq_data)
00076     delete _seq_data;
00077 
00078   pthread_mutex_destroy(seq_data_mutex);
00079   delete seq_data_mutex;
00080   pthread_cond_destroy(seq_data_condition);
00081   delete seq_data_condition;
00082   pthread_mutex_destroy(seq_data_mutex_cp);
00083   delete seq_data_mutex_cp;
00084   pthread_cond_destroy(seq_data_condition_cp);
00085   delete seq_data_condition_cp;
00086 
00087   if (_seq_results)
00088     delete _seq_results;
00089 
00090   pthread_mutex_destroy(seq_results_mutex);
00091   delete seq_results_mutex;
00092   pthread_cond_destroy(seq_results_condition);
00093   delete seq_results_condition;
00094   pthread_mutex_destroy(seq_results_mutex_cp);
00095   delete seq_results_mutex_cp;
00096   pthread_cond_destroy(seq_results_condition_cp);
00097   delete seq_results_condition_cp;
00098 }
00099 
00100 Param_Double_Port_provides_i *
00101 Param_Double_Port_provides_i::init_port(Engines_ParallelDSC_i * par_compo, 
00102                                         std::string port_name,
00103                                         CORBA::ORB_ptr orb)
00104 {
00105   int rank = par_compo->getMyRank();
00106   int totalNode = par_compo->getTotalNode();
00107   paco_com * com = par_compo->getCom();
00108 
00109   MESSAGE("Configuration of Param_Double_Port_provides: rank = " << rank << " totalNode = " << totalNode);
00110 
00111   // DOIT ETRE DEJA FAIT AVANT !!!???
00112   paco_fabrique_manager* pfm = paco_getFabriqueManager();
00113   pfm->register_com("pdp_dummy", new paco_dummy_fabrique());
00114   pfm->register_thread("pdp_thread", new paco_omni_fabrique());
00115   pfm->register_comScheduling("pdp_direct", new paco_direct_fabrique());
00116   pfm->register_distribution("pdp_GaBro", new GaBro_fab());
00117   pfm->register_distribution("pdp_BasicBC", new BasicBC_fab());
00118 
00119   Param_Double_Port_provides_i * port = NULL; 
00120   Ports::Param_Double_Port_proxy_impl * proxy_node = NULL; 
00121 
00122   std::cerr << "Creating Proxy" << std::endl;
00123   if (rank == 0) {
00124     // On commence par créer le proxy
00125     // Il est enregistré dans le composant et sera détruit automatiquement
00126     // lorsque le composant sera détruit
00127     proxy_node = 
00128       new Ports::Param_Double_Port_proxy_impl(CORBA::ORB::_duplicate(orb),
00129                                               pfm->get_thread("pdp_thread"));
00130     proxy_node->setLibCom("pdp_dummy", proxy_node);
00131     proxy_node->setLibThread("pdp_thread");
00132     PaCO::PacoTopology_t serveur_topo;
00133     serveur_topo.total = totalNode;
00134     proxy_node->setTopology(serveur_topo);
00135 
00136     // Création de la propriété
00137     PortProperties_i * proxy_node_properties = new PortProperties_i();
00138 
00139     // Enregistrement du proxy
00140     par_compo->add_parallel_provides_proxy_port(proxy_node->_this(), 
00141                                                 port_name.c_str(),
00142                                                 proxy_node_properties->_this());
00143     proxy_node->_remove_ref();
00144     proxy_node_properties->_remove_ref();
00145   }
00146   else {
00147     par_compo->add_parallel_provides_proxy_wait(port_name.c_str());
00148   }
00149 
00150   std::cerr << "Getting proxy" << std::endl;
00151   char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
00152   std::cerr << "Proxy ior is : " << proxy_ior << std::endl;
00153 
00154   port = new Param_Double_Port_provides_i(CORBA::ORB::_duplicate(orb), proxy_ior, rank);
00155   port->copyClientGlobalContext(par_compo);
00156 
00157   // Il faut maintenant configurer les bibliothèques
00158   // de redistributions de la fonction put
00159   ParallelMethodContext * method_ptr = port->getParallelMethodContext("put");
00160   method_ptr->setLibComScheduling("pdp_direct"); 
00161   method_ptr->setDistLibArg("param_data", "pdp_BasicBC", "in");
00162   BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in");
00163   dislib->setEltSize(sizeof(CORBA::Double));
00164 
00165   // Il faut maintenant configurer les bibliothèques
00166   // de redistributions de la fonction get_results
00167   method_ptr = port->getParallelMethodContext("get_results");
00168   method_ptr->setLibComScheduling("pdp_direct"); 
00169   method_ptr->setDistLibArg("param_results", "pdp_GaBro", "out");
00170   GaBro * dislib_gabro = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
00171   dislib_gabro->setEltSize(sizeof(CORBA::Double));
00172 
00173   // Enregistement du port 
00174   for (int i = 0; i < totalNode; i++) 
00175   {
00176     std::ostringstream node_number;
00177     node_number << i;
00178     std::string event_name("AddNode");
00179     event_name += node_number.str();
00180     std::string tag_name = proxy_ior;
00181 
00182     if (i == rank) {
00183       std::cerr << "Adding node of processor : " << i << std::endl;
00184       par_compo->add_parallel_provides_node_port(Ports::Port_PaCO::_narrow(port->_this()), port_name.c_str());
00185       port->_remove_ref();
00186       par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
00187     }
00188 
00189     par_compo->wait_event(event_name.c_str(), tag_name.c_str());
00190   }
00191 
00192   // On démarre l'objet parallèle
00193   std::string event_name("StartingProxy");
00194   std::string tag_name = proxy_ior;
00195   if (rank == 0) 
00196   {
00197     proxy_node->start();
00198     par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
00199   }
00200 
00201   CORBA::string_free(proxy_ior);
00202   return port;
00203 }
00204 
00205 void
00206 Param_Double_Port_provides_i::wait_init_port(Engines_ParallelDSC_i * par_compo, 
00207                                              std::string port_name,
00208                                              CORBA::ORB_ptr orb)
00209 {
00210   int rank = par_compo->getMyRank();
00211   int totalNode = par_compo->getTotalNode();
00212   // Enregistement du port 
00213   for (int i = 0; i < totalNode; i++) 
00214   {
00215     std::ostringstream node_number;
00216     node_number << i;
00217     std::string event_name("WaitingNode");
00218     event_name += node_number.str();
00219     char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
00220     std::string tag_name(proxy_ior);
00221     CORBA::string_free(proxy_ior);
00222     if (i == rank) 
00223       par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
00224     par_compo->wait_event(event_name.c_str(), tag_name.c_str());
00225   }
00226 }
00227 
00228 void 
00229 Param_Double_Port_provides_i::put(const Ports::Param_Double_Port::seq_double & param_data)
00230 {
00231 
00232   // On attend que le get soit fait
00233   // Au départ seq_data_termine_cp = TRUE
00234   pthread_mutex_lock(seq_data_mutex_cp);
00235   while (seq_data_termine_cp == false)
00236   {
00237      pthread_cond_wait(seq_data_condition_cp, seq_data_mutex_cp);
00238   }
00239   seq_data_termine_cp = false;
00240   pthread_mutex_unlock(seq_data_mutex_cp);
00241 
00242   pthread_mutex_lock(seq_data_mutex);
00243 
00244   // Création d'une nouvelle séquence
00245   // Elle prend le buffer sans le copier
00246   Ports::Param_Double_Port::seq_double * n_param_data = (Ports::Param_Double_Port::seq_double *) &param_data;
00247   _seq_data = new Ports::Param_Double_Port::seq_double(n_param_data->length(), n_param_data->length(), n_param_data->get_buffer(1), 1);
00248 
00249   seq_data_termine = true;
00250   pthread_cond_signal(seq_data_condition);
00251   pthread_mutex_unlock(seq_data_mutex);
00252 }
00253     
00254 void 
00255 Param_Double_Port_provides_i::get_results(Ports::Param_Double_Port::seq_double_out param_results)
00256 {
00257   pthread_mutex_lock(seq_results_mutex);
00258   while (seq_results_termine == false)
00259   {
00260      pthread_cond_wait(seq_results_condition, seq_results_mutex);
00261   }
00262 
00263   // Création d'une nouvelle séquence
00264   // Elle prend le buffer sans le copier
00265   param_results = new Ports::Param_Double_Port::seq_double(_seq_results->length(), _seq_results->length(), _seq_results->get_buffer(1), 1);
00266   delete _seq_results;
00267   _seq_results = NULL;
00268 
00269   seq_results_termine = false;
00270   pthread_mutex_unlock(seq_results_mutex);
00271 
00272   // On indique que l'on a copié la valeur
00273   // Et donc que l'on peut recevoir une nouvelle valeur
00274   pthread_mutex_lock(seq_results_mutex_cp);
00275   seq_results_termine_cp = true;
00276   pthread_cond_signal(seq_results_condition_cp);
00277   pthread_mutex_unlock(seq_results_mutex_cp);
00278 }
00279 
00280 Ports::Param_Double_Port::seq_double *
00281 Param_Double_Port_provides_i::get_data()
00282 {
00283   Ports::Param_Double_Port::seq_double * result = NULL;
00284 
00285   pthread_mutex_lock(seq_data_mutex);
00286   while (seq_data_termine == false)
00287   {
00288      pthread_cond_wait(seq_data_condition, seq_data_mutex);
00289   }
00290 
00291   // Création d'une nouvelle séquence
00292   // Elle prend le buffer sans le copier
00293   result = new Ports::Param_Double_Port::seq_double(_seq_data->length(), _seq_data->length(), _seq_data->get_buffer(1), 1);
00294   delete _seq_data;
00295   _seq_data = NULL;
00296 
00297   seq_data_termine = false;
00298   pthread_mutex_unlock(seq_data_mutex);
00299 
00300   // On indique que l'on a copié la valeur
00301   // Et donc que l'on peut recevoir une nouvelle valeur
00302   pthread_mutex_lock(seq_data_mutex_cp);
00303   seq_data_termine_cp = true;
00304   pthread_cond_signal(seq_data_condition_cp);
00305   pthread_mutex_unlock(seq_data_mutex_cp);
00306   return result;
00307 }
00308 
00309 void
00310 Param_Double_Port_provides_i::set_data(Ports::Param_Double_Port::seq_double * results)
00311 {
00312   // On attend que le get soit fait
00313   // Au départ seq_results_termine_cp = TRUE
00314   pthread_mutex_lock(seq_results_mutex_cp);
00315   while (seq_results_termine_cp == false)
00316   {
00317      pthread_cond_wait(seq_results_condition_cp, seq_results_mutex_cp);
00318   }
00319   seq_results_termine_cp = false;
00320   pthread_mutex_unlock(seq_results_mutex_cp);
00321 
00322   pthread_mutex_lock(seq_results_mutex);
00323 
00324   // Création d'une nouvelle séquence
00325   // Elle prend le buffer sans le copier
00326   _seq_results = new Ports::Param_Double_Port::seq_double(results->length(), results->length(), results->get_buffer(1), 1);
00327 
00328   seq_results_termine = true;
00329   pthread_cond_signal(seq_results_condition);
00330   pthread_mutex_unlock(seq_results_mutex);
00331 }
00332 
00333 void 
00334 Param_Double_Port_provides_i::configure_set_data(int data_length, int totalNbElt, int BeginEltPos)
00335 {
00336   // Configuration de la biblothèque de redistribution
00337   // pour les données actuelles
00338   ParallelMethodContext * method_ptr = getParallelMethodContext("get_results");
00339   GaBro * dislib = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
00340   dislib->setNodeNbElt(data_length);
00341   dislib->setTotalNbElt(totalNbElt);
00342   dislib->setNodePos(BeginEltPos);
00343 }