salome-kernel  6.5.0
MPIObject_i.cxx
// Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
//
// Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
// CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
//
// See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
//

//  SALOME MPIContainer : implementation of container based on MPI libraries
//  File   : MPIObject_i.cxx
//  Module : SALOME
//
#include "MPIObject_i.hxx"
#include "utilities.h"
#include "Utils_SALOME_Exception.hxx"

#define TIMEOUT 5

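// MPIObject_i is the base implementation class for parallel (MPI-distributed)
// SALOME CORBA objects: each MPI process hosts one instance. The constructor
// below caches the size of MPI_COMM_WORLD (_nbproc) and the local rank
// (_numproc); _tior is the table of IORs, one CORBA object reference per MPI
// process, filled by BCastIOR(). TIMEOUT is the number of seconds a non-zero
// rank waits (polling once per second) for rank 0 to publish an MPI2 service.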
MPIObject_i::MPIObject_i()
{
  MPI_Comm_size( MPI_COMM_WORLD, &_nbproc );
  MPI_Comm_rank( MPI_COMM_WORLD, &_numproc );
  _tior=NULL;
}

MPIObject_i::~MPIObject_i()
{
  if(_tior) delete _tior;
}

Engines::IORTab* MPIObject_i::tior()
{
  Engines::IORTab_var tior = new Engines::IORTab;
  tior->length(_tior->length());
  for(unsigned int ip=0;ip<tior->length();ip++)
    tior[ip] = (*_tior)[ip];
  return tior._retn();
}

void MPIObject_i::tior(const Engines::IORTab& ior)
{
  // Release any previously stored table before replacing it
  if(_tior) delete _tior;
  _tior = new Engines::IORTab;
  _tior->length(ior.length());
  for(unsigned int ip=0;ip<ior.length();ip++)
    (*_tior)[ip] = ior[ip];
}
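
// The tior attribute above exposes the per-process IOR table to CORBA
// clients; the getter assumes the table was already filled by BCastIOR()
// or by the setter. A minimal client-side sketch (hypothetical variable
// names):
//
//   Engines::MPIObject_var pobj = ...;        // reference to the rank 0 object
//   Engines::IORTab_var tab = pobj->tior();   // one object reference per rank
//   Engines::MPIObject_var onRank1 =
//     Engines::MPIObject::_narrow(tab[1]);    // talk to the instance on rank 1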
00061 
void MPIObject_i::BCastIOR(CORBA::ORB_ptr orb, Engines::MPIObject_ptr pobj, bool amiCont)
{
  int err, ip, n;
  char *ior;
  MPI_Status status; /* MPI receive status */
  std::ostringstream msg;

  if( _numproc == 0 )
    {

      // Allocate the IOR table
      Engines::IORTab_var iort = new Engines::IORTab;
      iort->length(_nbproc);

      iort[0] = pobj;

      // Process 0 collects the object's IOR from each of the other processes
      for(ip=1;ip<_nbproc;ip++)
        {
          err = MPI_Recv(&n,1,MPI_INT,ip,ip,MPI_COMM_WORLD,&status);
          if(err)
            {
              msg << "[" << _numproc << "] MPI_RECV error";
              throw SALOME_Exception(msg.str().c_str());
            }
          // Allocate a string of length n
          ior = new char[n];
          err = MPI_Recv(ior,n,MPI_CHAR,ip,2*ip,MPI_COMM_WORLD,&status);
          if(err)
            {
              msg << "[" << _numproc << "] MPI_RECV error";
              throw SALOME_Exception(msg.str().c_str());
            }
          iort[ip] = orb->string_to_object(ior);
          delete [] ior;
          if(CORBA::is_nil(iort[ip]))
            {
              msg << "[" << ip << "] MPI Component not loaded";
              throw SALOME_Exception(msg.str().c_str());
            }
        }
      // Hand the IOR table over to the CORBA object of process 0
      // (iort keeps ownership; the sequence is released when it goes out of scope)
      if( amiCont )
        tior(iort.in());
      else
        pobj->tior(iort.in());
    }
  else
    {
      // Convert the IOR to a string
      ior = orb->object_to_string(pobj);
      n = strlen(ior) + 1;
      // Send the IOR to process 0
      err = MPI_Send(&n,1,MPI_INT,0,_numproc,MPI_COMM_WORLD);
      if(err)
        {
          msg << "[" << _numproc << "] MPI_SEND error";
          throw SALOME_Exception(msg.str().c_str());
        }
      err = MPI_Send(ior,n,MPI_CHAR,0,2*_numproc,MPI_COMM_WORLD);
      if(err)
        {
          msg << "[" << _numproc << "] MPI_SEND error";
          throw SALOME_Exception(msg.str().c_str());
        }
      CORBA::string_free(ior);
    }

}
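
// Collective usage sketch for BCastIOR (hedged: MyMPIObject_i is a
// hypothetical servant class deriving from MPIObject_i, orb the process's
// initialized CORBA ORB). Every MPI process must take part in the call,
// since rank 0 posts a matching pair of MPI_Recv for each other rank:
//
//   MyMPIObject_i* servant = new MyMPIObject_i();
//   Engines::MPIObject_var pobj = servant->_this();
//   servant->BCastIOR(orb, pobj, false);  // rank 0 gathers the IOR table
//                                         // and stores it via pobj->tior()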
00131 
#ifdef HAVE_MPI2
void MPIObject_i::remoteMPI2Connect(std::string service)
{
  int i;
  char port_name[MPI_MAX_PORT_NAME];
  char port_name_clt[MPI_MAX_PORT_NAME];
  std::ostringstream msg;

  if( service.size() == 0 )
    {
      msg << "[" << _numproc << "] You have to give a service name!";
      throw SALOME_Exception(msg.str().c_str());
    }

  if( _srv.find(service) != _srv.end() )
    {
      msg << "[" << _numproc << "] service " << service << " already exists!";
      throw SALOME_Exception(msg.str().c_str());
    }

  _srv[service] = false;

  MPI_Barrier(MPI_COMM_WORLD);

  MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
  if( _numproc == 0 )
    {
      /* rank 0 tries to be a server; if the service is already published, it becomes a client */
      MPI_Open_port(MPI_INFO_NULL, port_name);
      if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
        {
          MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
          MPI_Close_port( port_name );
        }
      else if ( MPI_Publish_name((char*)service.c_str(), MPI_INFO_NULL, port_name) == MPI_SUCCESS )
        {
          _srv[service] = true;
          _port_name[service] = port_name;
          MESSAGE("[" << _numproc << "] service " << service << " available at " << port_name << std::endl);
        }
      /* publication failed: another application may have published the service
         between the two calls, so retry the lookup */
      else if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
        {
          MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
          MPI_Close_port( port_name );
        }
      else
        {
          msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
          throw SALOME_Exception(msg.str().c_str());
        }
    }
  else
    {
      i=0;
      /* wait until rank 0 publishes the name, then try to be a client */
      while ( i != TIMEOUT )
        {
          sleep(1);
          if ( MPI_Lookup_name((char*)service.c_str(), MPI_INFO_NULL, port_name_clt) == MPI_SUCCESS )
            {
              MESSAGE("[" << _numproc << "] I get the connection with " << service << " at " << port_name_clt << std::endl);
              break;
            }
          i++;
        }
      if(i==TIMEOUT)
        {
          msg << "[" << _numproc << "] Error on connection with " << service << " at " << port_name_clt;
          throw SALOME_Exception(msg.str().c_str());
        }
    }
  MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);

  /* If rank 0 is server, all processes call MPI_Comm_accept */
  /* If rank 0 is not server, all processes call MPI_Comm_connect */
  int srv = (int)_srv[service];
  MPI_Bcast(&srv,1,MPI_INT,0,MPI_COMM_WORLD);
  _srv[service] = (bool)srv;
  if ( _srv[service] )
    MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );
  else
    MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &(_icom[service]) );

  /* create the global communicator: server ranks get the low indices in it */
  MPI_Intercomm_merge(_icom[service],!_srv[service],&(_gcom[service]));

  /* only rank 0 may act as server when the name is unpublished later */
  if(_numproc != 0) _srv[service] = false;

}
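
// Connection sketch (hedged): two independently started MPI jobs can be
// coupled by having all of their ranks call remoteMPI2Connect with the same
// service name. This assumes an MPI name service is reachable (e.g. an
// ompi-server with Open MPI), so that MPI_Publish_name on one side is
// visible to MPI_Lookup_name on the other:
//
//   // in job A and in job B, on every rank:
//   obj->remoteMPI2Connect("coupling");
//   // afterwards _gcom["coupling"] is an intracommunicator spanning both
//   // jobs, with the server job's ranks numbered first.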
00222 
void MPIObject_i::remoteMPI2Disconnect(std::string service)
{
  std::ostringstream msg;

  if( service.size() == 0 )
    {
      msg << "[" << _numproc << "] You have to give a service name!";
      throw SALOME_Exception(msg.str().c_str());
    }

  if( _srv.find(service) == _srv.end() )
    {
      msg << "[" << _numproc << "] service " << service << " does not exist!";
      throw SALOME_Exception(msg.str().c_str());
    }

  MPI_Comm_disconnect( &(_gcom[service]) );
  if ( _srv[service] )
    {
      char port_name[MPI_MAX_PORT_NAME];
      strcpy(port_name,_port_name[service].c_str());

      MPI_Unpublish_name((char*)service.c_str(), MPI_INFO_NULL, port_name);
      MESSAGE("[" << _numproc << "] " << service << ": close port " << _port_name[service] << std::endl);
      MPI_Close_port( port_name );
      _port_name.erase(service);
    }

  _gcom.erase(service);
  _icom.erase(service);
  _srv.erase(service);

}
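
// Teardown sketch: once the coupling is finished, the service should be
// disconnected symmetrically on both sides, e.g.
//
//   obj->remoteMPI2Disconnect("coupling");
//
// This frees the merged communicator and, on the side that published the
// name (rank 0 of the server job), unpublishes it and closes the port.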
#endif