Back to index

salome-med  6.5.0
ParaMEDMEMComponent_i.cxx
Go to the documentation of this file.
00001 // Copyright (C) 2007-2012  CEA/DEN, EDF R&D
00002 //
00003 // This library is free software; you can redistribute it and/or
00004 // modify it under the terms of the GNU Lesser General Public
00005 // License as published by the Free Software Foundation; either
00006 // version 2.1 of the License.
00007 //
00008 // This library is distributed in the hope that it will be useful,
00009 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00010 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00011 // Lesser General Public License for more details.
00012 //
00013 // You should have received a copy of the GNU Lesser General Public
00014 // License along with this library; if not, write to the Free Software
00015 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
00016 //
00017 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
00018 //
00019 
00020 #include "ParaMEDMEMComponent_i.hxx"
00021 #include "utilities.h"
00022 #include "Utils_SALOME_Exception.hxx"
00023 using namespace std;
00024 using namespace ParaMEDMEM;
00025 
00026 typedef struct
00027 {
00028   bool exception;
00029   string msg;
00030 } except_st;
00031 
00032 pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
00033 pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
00034 
00035 ParaMEDMEMComponent_i::ParaMEDMEMComponent_i()
00036 {
00037   _interface = new CommInterface;
00038 }
00039 
00040 ParaMEDMEMComponent_i::ParaMEDMEMComponent_i(CORBA::ORB_ptr orb,
00041                                              PortableServer::POA_ptr poa, 
00042                                              PortableServer::ObjectId * contId, 
00043                                              const char *instanceName,
00044                                              const char *interfaceName,
00045                                              bool regist)
00046   : Engines_Component_i(orb,poa,contId,instanceName,interfaceName,false,regist)
00047 {
00048   _interface = new CommInterface;
00049 }
00050 
00051 ParaMEDMEMComponent_i::~ParaMEDMEMComponent_i()
00052 {
00053   MESSAGE("* [" << _numproc << "] ParaMEDMEMComponent destructor");
00054   delete _interface;
00055   pthread_mutex_destroy (&m1);
00056   pthread_mutex_destroy (&m2);
00057 }
00058 
00059 void ParaMEDMEMComponent_i::initializeCoupling(const char * coupling, const char * ior) throw(SALOME::SALOME_Exception)
00060 {
00061   int gsize, grank;
00062   except_st *est;
00063   void *ret_th;
00064   pthread_t *th;
00065   ostringstream msg;
00066   
00067   pthread_mutex_lock(&m1);
00068   if(_numproc == 0)
00069     {
00070       th = new pthread_t[_nbproc];
00071       for(int ip=1;ip<_nbproc;ip++)
00072         {
00073           thread_st *st = new thread_st;
00074           st->ip = ip;
00075           st->tior = _tior;
00076           st->coupling = coupling;
00077           st->ior = ior;
00078           pthread_create(&(th[ip]),NULL,th_initializecoupling,(void*)st);
00079         }
00080     }
00081 
00082   try{
00083     string service = coupling;
00084     if( service.size() == 0 )
00085       throw SALOME_Exception("You have to give a service name !");
00086     
00087     if( _gcom.find(service) != _gcom.end() )
00088       {
00089         msg << "service " << service << " already exists !";
00090         throw SALOME_Exception(msg.str().c_str());
00091       }
00092 
00093     // Connection to distributed parallel component
00094 #ifdef HAVE_MPI2
00095     remoteMPI2Connect(coupling);
00096 #else
00097     throw SALOME_Exception("You have to use a MPI2 compliant mpi implementation !");
00098 #endif
00099 
00100     MPI_Comm_size( _gcom[coupling], &gsize );
00101     MPI_Comm_rank( _gcom[coupling], &grank );
00102     MESSAGE("[" << grank << "] new communicator of " << gsize << " processes");
00103 
00104     // Creation of processors group for ParaMEDMEM
00105     // source is always the lower processor numbers
00106     // target is always the upper processor numbers
00107     if(_numproc==grank)
00108       {
00109         _source[coupling] = new MPIProcessorGroup(*_interface,0,_nbproc-1,_gcom[coupling]);
00110         _target[coupling] = new MPIProcessorGroup(*_interface,_nbproc,gsize-1,_gcom[coupling]);
00111         _commgroup[coupling] = _source[coupling];
00112       }
00113     else
00114       {
00115         _source[coupling] = new MPIProcessorGroup(*_interface,0,gsize-_nbproc-1,_gcom[coupling]);
00116         _target[coupling] = new MPIProcessorGroup(*_interface,gsize-_nbproc,gsize-1,_gcom[coupling]);
00117         _commgroup[coupling] = _target[coupling];
00118       }
00119     _connectto [coupling] = ior;
00120     _dec[coupling] = NULL;
00121     _dec_options[coupling] = NULL;
00122     
00123   }
00124   catch(const std::exception &ex)
00125     {
00126       MESSAGE(ex.what());
00127       THROW_SALOME_CORBA_EXCEPTION(ex.what(),SALOME::INTERNAL_ERROR);
00128     }
00129 
00130   pthread_mutex_unlock(&m1);
00131   if(_numproc == 0)
00132     {
00133       for(int ip=1;ip<_nbproc;ip++)
00134         {
00135           pthread_join(th[ip],&ret_th);
00136           est = (except_st*)ret_th;
00137           if(est->exception)
00138             {
00139               msg << "[" << ip << "] " << est->msg;
00140               THROW_SALOME_CORBA_EXCEPTION(msg.str().c_str(),SALOME::INTERNAL_ERROR);
00141             }
00142           delete est;
00143         }
00144       delete[] th;
00145     }
00146 }
00147 
00148 void ParaMEDMEMComponent_i::terminateCoupling(const char * coupling) throw(SALOME::SALOME_Exception)
00149 {
00150   except_st *est;
00151   void *ret_th;
00152   pthread_t *th;
00153   ostringstream msg;
00154 
00155   pthread_mutex_lock(&m2);
00156   if(_numproc == 0)
00157     {
00158       th = new pthread_t[_nbproc];
00159       for(int ip=1;ip<_nbproc;ip++)
00160         {
00161           thread_st *st = new thread_st;
00162           st->ip = ip;
00163           st->tior = _tior;
00164           st->coupling = coupling;
00165           pthread_create(&(th[ip]),NULL,th_terminatecoupling,(void*)st);
00166         }
00167     }
00168 
00169   try{
00170     string service = coupling;
00171     if( service.size() == 0 )
00172       throw SALOME_Exception("You have to give a service name !");
00173 
00174     if( _gcom.find(service) == _gcom.end() )
00175       {
00176         msg << "service " << service << " doesn't exist !";
00177         throw SALOME_Exception(msg.str().c_str());
00178       }
00179 
00180     // Disconnection to distributed parallel component
00181 #ifdef HAVE_MPI2
00182     remoteMPI2Disconnect(coupling);
00183 #else
00184     throw SALOME_Exception("You have to use a MPI2 compliant mpi implementation !");
00185 #endif
00186 
00187     /* Processors groups and DEC destruction */
00188     delete _source[coupling];
00189     _source.erase(coupling);
00190     delete _target[coupling];
00191     _target.erase(coupling);
00192     delete _dec[coupling];
00193     _dec.erase(coupling);
00194     _commgroup.erase(coupling);
00195     if(_dec_options[coupling])
00196       {
00197         delete _dec_options[coupling];
00198         _dec_options.erase(coupling);
00199       }
00200     _connectto.erase(coupling);
00201   }
00202   catch(const std::exception &ex)
00203     {
00204       MESSAGE(ex.what());
00205       THROW_SALOME_CORBA_EXCEPTION(ex.what(),SALOME::INTERNAL_ERROR);
00206     }
00207   pthread_mutex_unlock(&m2);
00208   if(_numproc == 0)
00209     {
00210       for(int ip=1;ip<_nbproc;ip++)
00211         {
00212           pthread_join(th[ip],&ret_th);
00213           est = (except_st*)ret_th;
00214           if(est->exception)
00215             {
00216               ostringstream msg;
00217               msg << "[" << ip << "] " << est->msg;
00218               THROW_SALOME_CORBA_EXCEPTION(msg.str().c_str(),SALOME::INTERNAL_ERROR);
00219             }
00220           delete est;
00221         }
00222       delete[] th;
00223     }
00224 }
00225 
00226 void ParaMEDMEMComponent_i::setInterpolationOptions(const char * coupling,
00227                                                     CORBA::Long print_level,
00228                                                     const char * intersection_type,
00229                                                     CORBA::Double precision,
00230                                                     CORBA::Double median_plane,
00231                                                     CORBA::Boolean do_rotate,
00232                                                     CORBA::Double bounding_box_adjustment,
00233                                                     CORBA::Double bounding_box_adjustment_abs,
00234                                                     CORBA::Double max_distance_for_3Dsurf_intersect,
00235                                                     CORBA::Long orientation,
00236                                                     CORBA::Boolean measure_abs,
00237                                                     const char * splitting_policy,
00238                                                     CORBA::Boolean P1P0_bary_method ) throw(SALOME::SALOME_Exception)
00239 {
00240   except_st *est;
00241   void *ret_th;
00242   pthread_t *th;
00243   ostringstream msg;
00244 
00245   if(_numproc == 0)
00246     {
00247       th = new pthread_t[_nbproc];
00248       for(int ip=1;ip<_nbproc;ip++)
00249         {
00250           thread_st *st = new thread_st;
00251           st->ip = ip;
00252           st->tior = _tior;
00253           st->coupling = coupling;
00254           st->print_level = print_level;
00255           st->intersection_type = intersection_type;
00256           st->precision = precision;
00257           st->median_plane = median_plane;
00258           st->do_rotate = do_rotate;
00259           st->bounding_box_adjustment = bounding_box_adjustment;
00260           st->bounding_box_adjustment_abs = bounding_box_adjustment_abs;
00261           st->max_distance_for_3Dsurf_intersect = max_distance_for_3Dsurf_intersect;
00262           st->orientation = orientation;
00263           st->measure_abs = measure_abs;
00264           st->splitting_policy = splitting_policy;
00265           st->P1P0_bary_method = P1P0_bary_method;
00266           pthread_create(&(th[ip]),NULL,th_setinterpolationoptions,(void*)st);
00267         }
00268     }
00269 
00270   if(!_dec_options[coupling])
00271     _dec_options[coupling] = new INTERP_KERNEL::InterpolationOptions;
00272 
00273   bool ret = _dec_options[coupling]->setInterpolationOptions(print_level,
00274                                                              intersection_type,
00275                                                              precision,
00276                                                              median_plane,
00277                                                              do_rotate,
00278                                                              bounding_box_adjustment,
00279                                                              bounding_box_adjustment_abs,
00280                                                              max_distance_for_3Dsurf_intersect,
00281                                                              orientation,
00282                                                              measure_abs,
00283                                                              splitting_policy,
00284                                                              P1P0_bary_method );
00285 
00286   if(!ret)
00287     {
00288       MESSAGE("Error on setting interpolation options");
00289       THROW_SALOME_CORBA_EXCEPTION("Error on setting interpolation options",SALOME::INTERNAL_ERROR);
00290     }
00291   
00292   if(_numproc == 0)
00293     {
00294       for(int ip=1;ip<_nbproc;ip++)
00295         {
00296           pthread_join(th[ip],&ret_th);
00297           est = (except_st*)ret_th;
00298           if(est->exception)
00299             {
00300               msg << "[" << ip << "] " << est->msg;
00301               THROW_SALOME_CORBA_EXCEPTION(msg.str().c_str(),SALOME::INTERNAL_ERROR);
00302             }
00303           delete est;
00304         }
00305       delete[] th;
00306     }
00307 }
00308 
00309 void ParaMEDMEMComponent_i::_setInputField(SALOME_MED::MPIMEDCouplingFieldDoubleCorbaInterface_ptr fieldptr, MEDCouplingFieldDouble *field)
00310 {
00311   int grank;
00312   except_st *est;
00313   void *ret_th;
00314   pthread_t th;
00315   ostringstream msg;
00316   string coupling;
00317   
00318   std::map<std::string,std::string>::const_iterator it = mapSearchByValue(_connectto, fieldptr->getRef());
00319   if(it != _connectto.end())
00320     coupling = (*it).first.c_str();
00321   else
00322     throw SALOME_Exception("Reference of remote component doesn't find in connectto map !");
00323 
00324   if(_numproc == 0)
00325     {
00326       thread_st *st = new thread_st;
00327       st->fieldptr = fieldptr;
00328       st->coupling = coupling;
00329       pthread_create(&th,NULL,th_getdata,(void*)st);
00330     }
00331 
00332   if( coupling.size() == 0 )
00333     throw SALOME_Exception("You have to give a service name !");
00334 
00335   if( _gcom.find(coupling) == _gcom.end() )
00336     {
00337       msg << "service " << coupling << " doesn't exist !";
00338       throw SALOME_Exception(msg.str().c_str());
00339     }
00340 
00341   if(!_dec[coupling])
00342     {
00343 
00344       MPI_Comm_rank( _gcom[coupling], &grank );
00345 
00346       // Creating the intersection Data Exchange Channel
00347       // Processors which received the field are always the second argument of InterpKernelDEC object
00348       if(_numproc==grank)
00349         _dec[coupling] = new InterpKernelDEC(*_target[coupling], *_source[coupling]);
00350       else
00351         _dec[coupling] = new InterpKernelDEC(*_source[coupling], *_target[coupling]);
00352 
00353       if(_dec_options[coupling])
00354         _dec[coupling]->copyOptions(*(_dec_options[coupling]));
00355       
00356       //Attaching the field to the DEC
00357       _dec[coupling]->attachLocalField(field);
00358 
00359       // computing the interpolation matrix
00360       _dec[coupling]->synchronize();
00361 
00362     }
00363   else
00364     //Attaching the field to the DEC
00365     _dec[coupling]->attachLocalField(field);
00366   
00367   //Receiving data
00368   _dec[coupling]->recvData();
00369 
00370   if(_numproc == 0)
00371     {
00372       pthread_join(th,&ret_th);
00373       est = (except_st*)ret_th;
00374       if(est->exception)
00375         throw SALOME_Exception(est->msg.c_str());
00376       delete est;
00377     }
00378 
00379 }
00380 
00381 void ParaMEDMEMComponent_i::_getOutputField(const char * coupling, MEDCouplingFieldDouble *field)
00382 {
00383   int grank;
00384   string service = coupling;
00385   ostringstream msg;
00386 
00387   if( service.size() == 0 )
00388     throw SALOME_Exception("You have to give a service name !");
00389 
00390   if( _gcom.find(service) == _gcom.end() )
00391     {
00392       msg << "service " << service << " doesn't exist !";
00393       throw SALOME_Exception(msg.str().c_str());
00394     }
00395 
00396   if(!_dec[coupling])
00397     {
00398 
00399       MPI_Comm_rank( _gcom[coupling], &grank );
00400 
00401       // Creating the intersection Data Exchange Channel
00402       // Processors which sent the field are always the first argument of InterpKernelDEC object
00403       if(_numproc==grank)
00404         _dec[coupling] = new InterpKernelDEC(*_source[coupling], *_target[coupling]);
00405       else
00406         _dec[coupling] = new InterpKernelDEC(*_target[coupling], *_source[coupling]);
00407   
00408       if(_dec_options[coupling])
00409         _dec[coupling]->copyOptions(*(_dec_options[coupling]));
00410       
00411       //Attaching the field to the DEC
00412       _dec[coupling]->attachLocalField(field);
00413     
00414       // computing the interpolation matrix
00415       _dec[coupling]->synchronize();
00416     }
00417   else
00418     //Attaching the field to the DEC
00419     _dec[coupling]->attachLocalField(field);
00420 
00421   //Sending data
00422   _dec[coupling]->sendData();
00423 }
00424 
00425 void ParaMEDMEMComponent_i::_initializeCoupling(SALOME_MED::MPIMEDCouplingFieldDoubleCorbaInterface_ptr fieldptr)
00426 {
00427   except_st *est;
00428   void *ret_th;
00429   pthread_t *th;
00430   //this string specifies the coupling
00431   string coupling;
00432   //getting IOR string of the remote object
00433   string rcompo = fieldptr->getRef();
00434   if(_numproc == 0){
00435     //getting IOR string of the local object
00436     CORBA::Object_var my_ref = _poa->servant_to_reference (_thisObj);
00437     string lcompo = _orb->object_to_string(my_ref);
00438     //the component does not communicate with itself, a connection is required
00439     if( rcompo.find(lcompo) == std::string::npos ){
00440       th = new pthread_t[1];
00441       //finding the IOR of the remote object in the map
00442       std::map<std::string,std::string>::const_iterator it = mapSearchByValue(_connectto, rcompo);
00443       //if it is not found : connecting two objects : this is the first (and the only) connection between these objects
00444       if (it == _connectto.end()){
00445         //generating the coupling string : concatenation of two IOR strings
00446         coupling = lcompo + rcompo;
00447 
00448         //initializing the coupling on the remote object in a thread
00449         thread_st *st = new thread_st;
00450         CORBA::Object_var obj = _orb->string_to_object (rcompo.c_str());
00451         SALOME_MED::ParaMEDMEMComponent_var compo = SALOME_MED::ParaMEDMEMComponent::_narrow(obj);
00452         st->compo = compo._retn();
00453         st->coupling = coupling;
00454         st->ior = lcompo;
00455        
00456         pthread_create(&(th[0]),NULL,th_initializecouplingdist,(void*)st);
00457          
00458         //initializing the coupling on the local object
00459         initializeCoupling (coupling.c_str(), rcompo.c_str());
00460         pthread_join (th[0], &ret_th); 
00461         est = (except_st*)ret_th;
00462         if(est->exception)
00463           THROW_SALOME_CORBA_EXCEPTION(est->msg.c_str(),SALOME::INTERNAL_ERROR);
00464         delete est;
00465         delete[] th;
00466       }
00467     }
00468   }
00469 }
00470 
00471 std::map<std::string,std::string>::const_iterator ParaMEDMEMComponent_i::mapSearchByValue(std::map<std::string,std::string> & search_map, std::string search_val)
00472 {
00473   std::map<std::string,std::string>::const_iterator iRet = search_map.end();
00474   for (std::map<std::string,std::string>::const_iterator iTer = search_map.begin(); iTer != search_map.end(); iTer ++)
00475     {
00476       if( iTer->second.find(search_val) != std::string::npos )
00477         {
00478           iRet = iTer;
00479           break;
00480         }
00481     }
00482   return iRet;
00483 }
00484 
00485 bool ParaMEDMEMComponent_i::amICoupledWithThisComponent(const char* cref)
00486 {
00487   std::map<std::string,std::string>::const_iterator it = mapSearchByValue(_connectto, cref);
00488   if(it != _connectto.end())
00489     return true;
00490   else
00491     return false;
00492 }
00493 
00494 void *th_setinterpolationoptions(void *s)
00495 {
00496   ostringstream msg;
00497   thread_st *st = (thread_st*)s;
00498   except_st *est = new except_st;
00499   est->exception = false;
00500   try
00501     {
00502       SALOME_MED::ParaMEDMEMComponent_var compo=SALOME_MED::ParaMEDMEMComponent::_narrow((*(st->tior))[st->ip]);
00503       compo->setInterpolationOptions(st->coupling.c_str(),
00504                                      st->print_level,
00505                                      st->intersection_type,
00506                                      st->precision,
00507                                      st->median_plane,
00508                                      st->do_rotate,
00509                                      st->bounding_box_adjustment,
00510                                      st->bounding_box_adjustment_abs,
00511                                      st->max_distance_for_3Dsurf_intersect,
00512                                      st->orientation,
00513                                      st->measure_abs,
00514                                      st->splitting_policy,
00515                                      st->P1P0_bary_method);
00516     }
00517   catch(const SALOME::SALOME_Exception &ex)
00518     {
00519       est->exception = true;
00520       est->msg = ex.details.text;
00521     }
00522   catch(const CORBA::Exception &ex)
00523     {
00524       est->exception = true;
00525       msg << "CORBA::Exception: " << ex;
00526       est->msg = msg.str();
00527     }
00528   delete st;
00529   return((void*)est);
00530 }
00531 
00532 void *th_initializecoupling(void *s)
00533 {
00534   ostringstream msg;
00535   thread_st *st = (thread_st*)s;
00536   except_st *est = new except_st;
00537   est->exception = false;
00538 
00539   try
00540     {
00541       SALOME_MED::ParaMEDMEMComponent_var compo=SALOME_MED::ParaMEDMEMComponent::_narrow((*(st->tior))[st->ip]);
00542       compo->initializeCoupling(st->coupling.c_str(),st->ior.c_str());
00543     }
00544   catch(const SALOME::SALOME_Exception &ex)
00545     {
00546       est->exception = true;
00547       est->msg = ex.details.text;
00548     }
00549   catch(const CORBA::Exception &ex)
00550     {
00551       est->exception = true;
00552       msg << "CORBA::Exception: " << ex;
00553       est->msg = msg.str();
00554     }
00555   delete st;
00556   return((void*)est);
00557 }
00558 
00559 void *th_terminatecoupling(void *s)
00560 {
00561   ostringstream msg;
00562   thread_st *st = (thread_st*)s;
00563   except_st *est = new except_st;
00564   est->exception = false;
00565 
00566   try
00567     {
00568       SALOME_MED::ParaMEDMEMComponent_var compo=SALOME_MED::ParaMEDMEMComponent::_narrow((*(st->tior))[st->ip]);
00569       compo->terminateCoupling(st->coupling.c_str());
00570     }
00571   catch(const SALOME::SALOME_Exception &ex)
00572     {
00573       est->exception = true;
00574       est->msg = ex.details.text;
00575     }
00576   catch(const CORBA::Exception &ex)
00577     {
00578       est->exception = true;
00579       msg << "CORBA::Exception: " << ex;
00580       est->msg = msg.str();
00581     }
00582   delete st;
00583   return((void*)est);
00584 }
00585 
00586 void *th_getdata(void *s)
00587 {
00588   ostringstream msg;
00589   thread_st *st = (thread_st*)s;
00590   except_st *est = new except_st;
00591   est->exception = false;
00592 
00593   try
00594     {
00595       st->fieldptr->getDataByMPI(st->coupling.c_str());
00596     }
00597   catch(const SALOME::SALOME_Exception &ex)
00598     {
00599       est->exception = true;
00600       est->msg = ex.details.text;
00601     }
00602   catch(const CORBA::Exception &ex)
00603     {
00604       est->exception = true;
00605       msg << "CORBA::Exception: " << ex;
00606       est->msg = msg.str();
00607     }
00608   delete st;
00609   return((void*)est);
00610 }
00611 
00612 void *th_initializecouplingdist(void *s)
00613 {
00614   ostringstream msg;
00615   thread_st *st = (thread_st*)s;
00616   except_st *est = new except_st;
00617   est->exception = false;
00618 
00619   try
00620     {
00621       st->compo->initializeCoupling(st->coupling.c_str(), st->ior.c_str());
00622     }
00623   catch(const SALOME::SALOME_Exception &ex)
00624     {
00625       est->exception = true;
00626       est->msg = ex.details.text;
00627     }
00628   catch(const CORBA::Exception &ex)
00629     {
00630       est->exception = true;
00631       msg << "CORBA::Exception: " << ex;
00632       est->msg = msg.str();
00633     }
00634   delete st;
00635   return((void*)est);
00636 }
00637 
00638 void *th_terminatecouplingdist(void *s)
00639 {
00640   ostringstream msg;
00641   thread_st *st = (thread_st*)s;
00642   except_st *est = new except_st;
00643   est->exception = false;
00644 
00645   try
00646     {
00647       st->compo->terminateCoupling(st->coupling.c_str());
00648     }
00649   catch(const SALOME::SALOME_Exception &ex)
00650     {
00651       est->exception = true;
00652       est->msg = ex.details.text;
00653     }
00654   catch(const CORBA::Exception &ex)
00655     {
00656       est->exception = true;
00657       msg << "CORBA::Exception: " << ex;
00658       est->msg = msg.str();
00659     }
00660   delete st;
00661   return((void*)est);
00662 }