Back to index

salome-kernel  6.5.0
Public Member Functions | Static Public Member Functions | Private Attributes
Param_Double_Port_provides_i Class Reference

#include <Param_Double_Port_provides_i.hxx>

List of all members.

Public Member Functions

 Param_Double_Port_provides_i (CORBA::ORB_ptr orb, char *ior, int rank)
virtual ~Param_Double_Port_provides_i ()
void put (const Ports::Param_Double_Port::seq_double &param_data)
void get_results (Ports::Param_Double_Port::seq_double_out param_results)
Ports::Param_Double_Port::seq_doubleget_data ()
void set_data (Ports::Param_Double_Port::seq_double *results)
void configure_set_data (int data_length, int totalNbElt, int BeginEltPos)

Static Public Member Functions

static
Param_Double_Port_provides_i
init_port (Engines_ParallelDSC_i *par_compo, std::string port_name, CORBA::ORB_ptr orb)
static void wait_init_port (Engines_ParallelDSC_i *par_compo, std::string port_name, CORBA::ORB_ptr orb)

Private Attributes

Ports::Param_Double_Port::seq_double_seq_data
Ports::Param_Double_Port::seq_double_seq_results
pthread_mutex_t * seq_data_mutex
pthread_cond_t * seq_data_condition
bool seq_data_termine
pthread_mutex_t * seq_data_mutex_cp
pthread_cond_t * seq_data_condition_cp
bool seq_data_termine_cp
pthread_mutex_t * seq_results_mutex
pthread_cond_t * seq_results_condition
bool seq_results_termine
pthread_mutex_t * seq_results_mutex_cp
pthread_cond_t * seq_results_condition_cp
bool seq_results_termine_cp

Detailed Description

Definition at line 32 of file Param_Double_Port_provides_i.hxx.


Constructor & Destructor Documentation

Param_Double_Port_provides_i::Param_Double_Port_provides_i ( CORBA::ORB_ptr  orb,
char *  ior,
int  rank 
)

Definition at line 37 of file Param_Double_Port_provides_i.cxx.

                                                                                                 :
  Ports::Param_Double_Port_serv(orb,ior,rank),
  Ports::Param_Double_Port_base_serv(orb,ior,rank),
  Ports::Data_Port_serv(orb,ior,rank),
  Ports::Data_Port_base_serv(orb,ior,rank),
  Ports::Port_serv(orb,ior,rank),
  Ports::Port_base_serv(orb,ior,rank),
  InterfaceParallel_impl(orb,ior,rank)
{
  _seq_data = NULL;

  seq_data_termine = false;                 
  seq_data_mutex = new pthread_mutex_t();
  pthread_mutex_init(seq_data_mutex, NULL);
  seq_data_condition = new pthread_cond_t();
  pthread_cond_init(seq_data_condition, NULL);
  seq_data_termine_cp = true;               
  seq_data_mutex_cp = new pthread_mutex_t();
  pthread_mutex_init(seq_data_mutex_cp, NULL);
  seq_data_condition_cp = new pthread_cond_t();
  pthread_cond_init(seq_data_condition_cp, NULL);

  _seq_results = NULL;

  seq_results_termine = false;              
  seq_results_mutex = new pthread_mutex_t();
  pthread_mutex_init(seq_results_mutex, NULL);
  seq_results_condition = new pthread_cond_t();
  pthread_cond_init(seq_results_condition, NULL);
  seq_results_termine_cp = true;                    
  seq_results_mutex_cp = new pthread_mutex_t();
  pthread_mutex_init(seq_results_mutex_cp, NULL);
  seq_results_condition_cp = new pthread_cond_t();
  pthread_cond_init(seq_results_condition_cp, NULL);
}

Here is the caller graph for this function:

Definition at line 73 of file Param_Double_Port_provides_i.cxx.

{
  if (_seq_data)
    delete _seq_data;

  pthread_mutex_destroy(seq_data_mutex);
  delete seq_data_mutex;
  pthread_cond_destroy(seq_data_condition);
  delete seq_data_condition;
  pthread_mutex_destroy(seq_data_mutex_cp);
  delete seq_data_mutex_cp;
  pthread_cond_destroy(seq_data_condition_cp);
  delete seq_data_condition_cp;

  if (_seq_results)
    delete _seq_results;

  pthread_mutex_destroy(seq_results_mutex);
  delete seq_results_mutex;
  pthread_cond_destroy(seq_results_condition);
  delete seq_results_condition;
  pthread_mutex_destroy(seq_results_mutex_cp);
  delete seq_results_mutex_cp;
  pthread_cond_destroy(seq_results_condition_cp);
  delete seq_results_condition_cp;
}

Member Function Documentation

void Param_Double_Port_provides_i::configure_set_data ( int  data_length,
int  totalNbElt,
int  BeginEltPos 
)

Definition at line 334 of file Param_Double_Port_provides_i.cxx.

{
  // Configuration de la biblothèque de redistribution
  // pour les données actuelles
  ParallelMethodContext * method_ptr = getParallelMethodContext("get_results");
  GaBro * dislib = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
  dislib->setNodeNbElt(data_length);
  dislib->setTotalNbElt(totalNbElt);
  dislib->setNodePos(BeginEltPos);
}

Definition at line 281 of file Param_Double_Port_provides_i.cxx.

{
  Ports::Param_Double_Port::seq_double * result = NULL;

  pthread_mutex_lock(seq_data_mutex);
  while (seq_data_termine == false)
  {
     pthread_cond_wait(seq_data_condition, seq_data_mutex);
  }

  // Création d'une nouvelle séquence
  // Elle prend le buffer sans le copier
  result = new Ports::Param_Double_Port::seq_double(_seq_data->length(), _seq_data->length(), _seq_data->get_buffer(1), 1);
  delete _seq_data;
  _seq_data = NULL;

  seq_data_termine = false;
  pthread_mutex_unlock(seq_data_mutex);

  // On indique que l'on a copié la valeur
  // Et donc que l'on peut recevoir une nouvelle valeur
  pthread_mutex_lock(seq_data_mutex_cp);
  seq_data_termine_cp = true;
  pthread_cond_signal(seq_data_condition_cp);
  pthread_mutex_unlock(seq_data_mutex_cp);
  return result;
}
void Param_Double_Port_provides_i::get_results ( Ports::Param_Double_Port::seq_double_out  param_results)

Definition at line 255 of file Param_Double_Port_provides_i.cxx.

{
  pthread_mutex_lock(seq_results_mutex);
  while (seq_results_termine == false)
  {
     pthread_cond_wait(seq_results_condition, seq_results_mutex);
  }

  // Création d'une nouvelle séquence
  // Elle prend le buffer sans le copier
  param_results = new Ports::Param_Double_Port::seq_double(_seq_results->length(), _seq_results->length(), _seq_results->get_buffer(1), 1);
  delete _seq_results;
  _seq_results = NULL;

  seq_results_termine = false;
  pthread_mutex_unlock(seq_results_mutex);

  // On indique que l'on a copié la valeur
  // Et donc que l'on peut recevoir une nouvelle valeur
  pthread_mutex_lock(seq_results_mutex_cp);
  seq_results_termine_cp = true;
  pthread_cond_signal(seq_results_condition_cp);
  pthread_mutex_unlock(seq_results_mutex_cp);
}
Param_Double_Port_provides_i * Param_Double_Port_provides_i::init_port ( Engines_ParallelDSC_i par_compo,
std::string  port_name,
CORBA::ORB_ptr  orb 
) [static]

Definition at line 101 of file Param_Double_Port_provides_i.cxx.

{
  int rank = par_compo->getMyRank();
  int totalNode = par_compo->getTotalNode();
  paco_com * com = par_compo->getCom();

  MESSAGE("Configuration of Param_Double_Port_provides: rank = " << rank << " totalNode = " << totalNode);

  // DOIT ETRE DEJA FAIT AVANT !!!???
  paco_fabrique_manager* pfm = paco_getFabriqueManager();
  pfm->register_com("pdp_dummy", new paco_dummy_fabrique());
  pfm->register_thread("pdp_thread", new paco_omni_fabrique());
  pfm->register_comScheduling("pdp_direct", new paco_direct_fabrique());
  pfm->register_distribution("pdp_GaBro", new GaBro_fab());
  pfm->register_distribution("pdp_BasicBC", new BasicBC_fab());

  Param_Double_Port_provides_i * port = NULL; 
  Ports::Param_Double_Port_proxy_impl * proxy_node = NULL; 

  std::cerr << "Creating Proxy" << std::endl;
  if (rank == 0) {
    // On commence par créer le proxy
    // Il est enregistré dans le composant et sera détruit automatiquement
    // lorsque le composant sera détruit
    proxy_node = 
      new Ports::Param_Double_Port_proxy_impl(CORBA::ORB::_duplicate(orb),
                                              pfm->get_thread("pdp_thread"));
    proxy_node->setLibCom("pdp_dummy", proxy_node);
    proxy_node->setLibThread("pdp_thread");
    PaCO::PacoTopology_t serveur_topo;
    serveur_topo.total = totalNode;
    proxy_node->setTopology(serveur_topo);

    // Création de la propriété
    PortProperties_i * proxy_node_properties = new PortProperties_i();

    // Enregistrement du proxy
    par_compo->add_parallel_provides_proxy_port(proxy_node->_this(), 
                                                port_name.c_str(),
                                                proxy_node_properties->_this());
    proxy_node->_remove_ref();
    proxy_node_properties->_remove_ref();
  }
  else {
    par_compo->add_parallel_provides_proxy_wait(port_name.c_str());
  }

  std::cerr << "Getting proxy" << std::endl;
  char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
  std::cerr << "Proxy ior is : " << proxy_ior << std::endl;

  port = new Param_Double_Port_provides_i(CORBA::ORB::_duplicate(orb), proxy_ior, rank);
  port->copyClientGlobalContext(par_compo);

  // Il faut maintenant configurer les bibliothèques
  // de redistributions de la fonction put
  ParallelMethodContext * method_ptr = port->getParallelMethodContext("put");
  method_ptr->setLibComScheduling("pdp_direct"); 
  method_ptr->setDistLibArg("param_data", "pdp_BasicBC", "in");
  BasicBC * dislib = (BasicBC *) method_ptr->getDistLibArg("param_data", "in");
  dislib->setEltSize(sizeof(CORBA::Double));

  // Il faut maintenant configurer les bibliothèques
  // de redistributions de la fonction get_results
  method_ptr = port->getParallelMethodContext("get_results");
  method_ptr->setLibComScheduling("pdp_direct"); 
  method_ptr->setDistLibArg("param_results", "pdp_GaBro", "out");
  GaBro * dislib_gabro = (GaBro *) method_ptr->getDistLibArg("param_results", "out");
  dislib_gabro->setEltSize(sizeof(CORBA::Double));

  // Enregistement du port 
  for (int i = 0; i < totalNode; i++) 
  {
    std::ostringstream node_number;
    node_number << i;
    std::string event_name("AddNode");
    event_name += node_number.str();
    std::string tag_name = proxy_ior;

    if (i == rank) {
      std::cerr << "Adding node of processor : " << i << std::endl;
      par_compo->add_parallel_provides_node_port(Ports::Port_PaCO::_narrow(port->_this()), port_name.c_str());
      port->_remove_ref();
      par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
    }

    par_compo->wait_event(event_name.c_str(), tag_name.c_str());
  }

  // On démarre l'objet parallèle
  std::string event_name("StartingProxy");
  std::string tag_name = proxy_ior;
  if (rank == 0) 
  {
    proxy_node->start();
    par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
  }

  CORBA::string_free(proxy_ior);
  return port;
}

Here is the call graph for this function:

Definition at line 229 of file Param_Double_Port_provides_i.cxx.

{

  // On attend que le get soit fait
  // Au départ seq_data_termine_cp = TRUE
  pthread_mutex_lock(seq_data_mutex_cp);
  while (seq_data_termine_cp == false)
  {
     pthread_cond_wait(seq_data_condition_cp, seq_data_mutex_cp);
  }
  seq_data_termine_cp = false;
  pthread_mutex_unlock(seq_data_mutex_cp);

  pthread_mutex_lock(seq_data_mutex);

  // Création d'une nouvelle séquence
  // Elle prend le buffer sans le copier
  Ports::Param_Double_Port::seq_double * n_param_data = (Ports::Param_Double_Port::seq_double *) &param_data;
  _seq_data = new Ports::Param_Double_Port::seq_double(n_param_data->length(), n_param_data->length(), n_param_data->get_buffer(1), 1);

  seq_data_termine = true;
  pthread_cond_signal(seq_data_condition);
  pthread_mutex_unlock(seq_data_mutex);
}

Definition at line 310 of file Param_Double_Port_provides_i.cxx.

{
  // On attend que le get soit fait
  // Au départ seq_results_termine_cp = TRUE
  pthread_mutex_lock(seq_results_mutex_cp);
  while (seq_results_termine_cp == false)
  {
     pthread_cond_wait(seq_results_condition_cp, seq_results_mutex_cp);
  }
  seq_results_termine_cp = false;
  pthread_mutex_unlock(seq_results_mutex_cp);

  pthread_mutex_lock(seq_results_mutex);

  // Création d'une nouvelle séquence
  // Elle prend le buffer sans le copier
  _seq_results = new Ports::Param_Double_Port::seq_double(results->length(), results->length(), results->get_buffer(1), 1);

  seq_results_termine = true;
  pthread_cond_signal(seq_results_condition);
  pthread_mutex_unlock(seq_results_mutex);
}
void Param_Double_Port_provides_i::wait_init_port ( Engines_ParallelDSC_i par_compo,
std::string  port_name,
CORBA::ORB_ptr  orb 
) [static]

Definition at line 206 of file Param_Double_Port_provides_i.cxx.

{
  int rank = par_compo->getMyRank();
  int totalNode = par_compo->getTotalNode();
  // Enregistement du port 
  for (int i = 0; i < totalNode; i++) 
  {
    std::ostringstream node_number;
    node_number << i;
    std::string event_name("WaitingNode");
    event_name += node_number.str();
    char * proxy_ior = (char * ) par_compo->get_proxy(port_name.c_str());
    std::string tag_name(proxy_ior);
    CORBA::string_free(proxy_ior);
    if (i == rank) 
      par_compo->InterfaceParallel_impl::_proxy->send_event(event_name.c_str(), tag_name.c_str());
    par_compo->wait_event(event_name.c_str(), tag_name.c_str());
  }
}

Here is the call graph for this function:


Member Data Documentation

Definition at line 60 of file Param_Double_Port_provides_i.hxx.

Definition at line 61 of file Param_Double_Port_provides_i.hxx.

Definition at line 65 of file Param_Double_Port_provides_i.hxx.

Definition at line 68 of file Param_Double_Port_provides_i.hxx.

Definition at line 64 of file Param_Double_Port_provides_i.hxx.

Definition at line 67 of file Param_Double_Port_provides_i.hxx.

Definition at line 66 of file Param_Double_Port_provides_i.hxx.

Definition at line 69 of file Param_Double_Port_provides_i.hxx.

Definition at line 73 of file Param_Double_Port_provides_i.hxx.

Definition at line 76 of file Param_Double_Port_provides_i.hxx.

Definition at line 72 of file Param_Double_Port_provides_i.hxx.

Definition at line 75 of file Param_Double_Port_provides_i.hxx.

Definition at line 74 of file Param_Double_Port_provides_i.hxx.

Definition at line 77 of file Param_Double_Port_provides_i.hxx.


The documentation for this class was generated from the following files: