Back to index

salome-kernel  6.5.0
Receivers.cxx
Go to the documentation of this file.
00001 // Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
00002 //
00003 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
00004 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
00005 //
00006 // This library is free software; you can redistribute it and/or
00007 // modify it under the terms of the GNU Lesser General Public
00008 // License as published by the Free Software Foundation; either
00009 // version 2.1 of the License.
00010 //
00011 // This library is distributed in the hope that it will be useful,
00012 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00013 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014 // Lesser General Public License for more details.
00015 //
00016 // You should have received a copy of the GNU Lesser General Public
00017 // License along with this library; if not, write to the Free Software
00018 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
00019 //
00020 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
00021 //
00022 
00023 #include "omniORB4/poa.h"
00024 #include "utilities.h"
00025 
// Largest number of elements requested from a sender in a single sendPart() call.
#define TAILLE_SPLIT 100000
// Upper bound on the number of attempts made by the MPI lookup/connect retry loops.
#define TIMEOUT      20
00028 
00029 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00030 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
00031 }
00032 
00033 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00034 CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCNoCopyReceiver(){
00035   _mySender->release();
00036 }
00037 
00038 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00039 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
00040 {
00041   TSeqCorba seq=_mySender->send();
00042   size=seq->length();
00043   return (T *)seq->get_buffer(1);
00044 }
00045 
00046 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00047 T *CorbaNCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
00048 {
00049   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
00050 }
00051 
00052 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00053 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaNCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
00054 }
00055 
00056 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00057 CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaNCWithCopyReceiver(){
00058   _mySender->release();
00059 }
00060 
00061 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00062 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
00063   size=_mySender->getSize();
00064   long n;
00065   T *ret=new T[size];
00066   T *iter=ret;
00067   for(long i=0;i<size;i+=TAILLE_SPLIT)
00068     {
00069       if(size-i>TAILLE_SPLIT)
00070         n=TAILLE_SPLIT;
00071       else
00072         n=size-i;
00073       TSeqCorba seq=_mySender->sendPart(i,n);
00074       T *seqd=(T *)seq->get_buffer(0);
00075       for(long j=0;j<n;j++)
00076         *iter++=*seqd++;
00077     }
00078   return ret;
00079 }
00080 
00081 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00082 T *CorbaNCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
00083 {
00084   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
00085 }
00086 
00087 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00088 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCNoCopyReceiver(CorbaSender mySender):_mySender(mySender){
00089 }
00090 
00091 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00092 CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCNoCopyReceiver(){
00093   _mySender->release();
00094 }
00095 
00096 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00097 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
00098   size=_mySender->getSize();
00099   long n;
00100   T *ret=new T[size];
00101   T *iter=ret;
00102   for(long i=0;i<size;i+=TAILLE_SPLIT)
00103     {
00104       if(size-i>TAILLE_SPLIT)
00105         n=TAILLE_SPLIT;
00106       else
00107         n=size-i;
00108       TSeqCorba seq=_mySender->sendPart(i,n);
00109       TCorba *seqd=seq->get_buffer(0);
00110       for(long j=0;j<n;j++)
00111         *iter++=*seqd++;
00112     }
00113   return ret;
00114 }
00115 
00116 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00117 T *CorbaWCNoCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
00118 {
00119   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
00120 }
00121 
00122 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00123 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::CorbaWCWithCopyReceiver(CorbaSender mySender):_mySender(mySender){
00124 }
00125 
00126 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00127 CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::~CorbaWCWithCopyReceiver(){
00128   _mySender->release();
00129 }
00130 
00131 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00132 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
00133   size=_mySender->getSize();
00134   long n;
00135   T *ret=new T[size];
00136   T *iter=ret;
00137   for(long i=0;i<size;i+=TAILLE_SPLIT)
00138     {
00139       if(size-i>TAILLE_SPLIT)
00140         n=TAILLE_SPLIT;
00141       else
00142         n=size-i;
00143       TSeqCorba seq=_mySender->sendPart(i,n);
00144       TCorba *seqd=seq->get_buffer(0);
00145       for(long j=0;j<n;j++)
00146       *iter++=*seqd++;
00147     }
00148   return ret;
00149 }
00150 
00151 template<class T,class TCorba,class TSeqCorba,class CorbaSender,class servForT,class ptrForT>
00152 T *CorbaWCWithCopyReceiver<T,TCorba,TSeqCorba,CorbaSender,servForT,ptrForT>::getValue(long &size)
00153 {
00154   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
00155 }
00156 
00157 #ifdef HAVE_MPI2
00158 
00159 template<class T,class CorbaSender,class servForT,class ptrForT>
00160 MPIReceiver<T,CorbaSender,servForT,ptrForT>::MPIReceiver(CorbaSender mySender):_mySender(mySender){
00161 }
00162 
00163 template<class T,class CorbaSender,class servForT,class ptrForT>
00164 MPIReceiver<T,CorbaSender,servForT,ptrForT>::~MPIReceiver(){
00165   _mySender->release();
00166 }
00167 
00168 template<class T,class CorbaSender,class servForT,class ptrForT>
00169 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getDistValue(long &size){
00170   int i=0;
00171   int myproc;
00172   int sproc;
00173   MPI_Status status;
00174   MPI_Comm com; 
00175   char   port_name_clt [MPI_MAX_PORT_NAME];
00176   T *_v;
00177   long _n;
00178 
00179   
00180   CORBA::Any a; 
00181   MPI_Comm_rank(MPI_COMM_WORLD, &myproc);
00182   SALOME::MPISender::param_var p =_mySender->getParam();
00183   _mySender->send();
00184   sproc = p->myproc;
00185   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
00186   while ( i != TIMEOUT  && MPI_Lookup_name((char*)p->service,MPI_INFO_NULL,port_name_clt) != MPI_SUCCESS) { 
00187     i++;
00188   }       
00189   MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
00190   if ( i == TIMEOUT  ) { 
00191     MPI_Finalize();
00192     exit(-1);
00193   }
00194   else{
00195     //       Connect to service, get the inter-communicator server
00196     //      Attention MPI_Comm_connect est un appel collectif :
00197     //  - Si lancement mpirun -c n -----> uniquement     MPI_COMM_SELF fonctionne
00198     //  - Si lancement client_server&client_server ----> MPI_COMM_WORLD fonctionne
00199     
00200     //      TIMEOUT is inefficient since MPI_Comm_Connect doesn't return if we asked for
00201     //        a service that has been unpublished !
00202     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
00203     i = 0;
00204     while ( i != TIMEOUT  &&  MPI_Comm_connect(port_name_clt, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &com)!=MPI_SUCCESS ) { 
00205       i++; 
00206     } 
00207     MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_ARE_FATAL);
00208     if ( i == TIMEOUT ) {
00209       MPI_Finalize(); 
00210       exit(-1);
00211     }
00212   }
00213   MPI_Recv( &_n, 1, MPI_LONG, sproc,p->tag1,com,&status);
00214   _v = new T[_n];
00215   MPI_Recv( _v, _n, MPITRAITS<T>::MpiType, sproc,p->tag2,com,&status);
00216   _mySender->close(p);
00217   MPI_Comm_disconnect( &com );  
00218   size=_n;
00219   return _v;
00220 }
00221 
00222 template<class T,class CorbaSender,class servForT,class ptrForT>
00223 T *MPIReceiver<T,CorbaSender,servForT,ptrForT>::getValue(long &size)
00224 {
00225   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
00226 }
00227 
00228 #endif
00229 
00230 #ifdef HAVE_SOCKET
00231 #include <sys/types.h>
00232 #include <sys/socket.h>
00233 #include <netinet/in.h>
00234 #include <arpa/inet.h>
00235 #include <netdb.h>
00236 #include <unistd.h>
00237 #include <rpc/xdr.h>
00238 
00239 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
00240 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::SocketReceiver(CorbaSender mySender) : _mySender(mySender)
00241 {
00242   _clientSockfd = -1;
00243   _senderDestruc=true;
00244 }
00245 
00246 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
00247 SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::~SocketReceiver()
00248 {
00249   if(_senderDestruc)
00250     {
00251       _mySender->release();
00252     }
00253 }
00254 
00255 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
00256 T *SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getValue(long &size)
00257 {
00258   return Receiver<T,servForT,ptrForT>::getValue(size,_mySender);
00259 }
00260 
00261 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
00262 T* SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::getDistValue(long &size)
00263 {
00264   int n=0, m;
00265   T *v;
00266   XDR xp; /* pointeur sur le decodeur XDR */
00267 
00268   try{
00269     initCom();
00270 
00271     SALOME::SocketSender::param_var p = _mySender->getParam();
00272 
00273     size = p->lend - p->lstart + 1;
00274     v = new T[size];
00275 
00276     connectCom(p->internet_address, p->myport);
00277   
00278     _mySender->send();
00279 
00280     xdrmem_create(&xp,(char*)v,size*sizeof(T),XDR_DECODE );
00281     while( n < size*sizeof(T) ){
00282       m = read(_clientSockfd, (char*)v+n, size*sizeof(T)-n);
00283       if( m < 0 ){
00284         closeCom();
00285         delete [] v;
00286         SALOME::ExceptionStruct es;
00287         es.type = SALOME::COMM;
00288         es.text = "error read Socket exception";
00289         throw SALOME::SALOME_Exception(es);
00290       }
00291       n += m;
00292     }
00293     xdr_vector( &xp, (char*)v, size, sizeof(T), (xdrproc_t)myFunc);
00294     xdr_destroy( &xp );
00295     
00296     _mySender->endOfCom();
00297     closeCom();
00298   }
00299   catch(SALOME::SALOME_Exception &ex){
00300     if( ex.details.type == SALOME::COMM )
00301       {
00302         _senderDestruc=false;
00303               std::cout << ex.details.text << std::endl;
00304         throw MultiCommException("Unknown sender protocol");
00305       }
00306     else
00307       throw ex;
00308   }
00309  
00310   return v;
00311 }
00312 
00313 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
00314 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::initCom()
00315 {
00316   try{
00317     _mySender->initCom();
00318 
00319     /* Ouverture de la socket */
00320     _clientSockfd = socket(AF_INET, SOCK_STREAM, 0);
00321     if (_clientSockfd < 0) {
00322       closeCom();
00323       SALOME::ExceptionStruct es;
00324       es.type = SALOME::COMM;
00325       es.text = "error Socket exception";
00326       throw SALOME::SALOME_Exception(es);
00327     }
00328   }
00329   catch(SALOME::SALOME_Exception &ex){
00330     if( ex.details.type == SALOME::COMM )
00331       {
00332         _senderDestruc=false;
00333               std::cout << ex.details.text << std::endl;
00334         throw MultiCommException("Unknown sender protocol");
00335       }
00336     else
00337       throw ex;
00338   }
00339 
00340 }
00341 
00342 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
00343 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::connectCom(const char *dest_address, int port)
00344 {
00345   struct sockaddr_in serv_addr;
00346   struct hostent * server;
00347   SALOME::ExceptionStruct es;
00348 
00349   try{
00350     /* reception of the host structure on the remote process */
00351     server = gethostbyname(dest_address);
00352     if( server == NULL ) {
00353       closeCom();
00354       es.type = SALOME::COMM;
00355       es.text = "error unknown host Socket exception";
00356       _senderDestruc=false;
00357       throw SALOME::SALOME_Exception(es);
00358     }
00359 
00360     /* Initialisation of the socket structure */
00361     bzero((char*)&serv_addr,sizeof(serv_addr));
00362     serv_addr.sin_family = AF_INET;
00363     serv_addr.sin_addr.s_addr = INADDR_ANY;
00364     bcopy((char *)server->h_addr, 
00365           (char *)&serv_addr.sin_addr.s_addr,
00366           server->h_length);
00367     serv_addr.sin_port = htons(port);
00368     
00369     if( connect(_clientSockfd, (struct sockaddr *) & serv_addr, sizeof(struct sockaddr)) < 0 ){
00370       closeCom();
00371       es.type = SALOME::COMM;
00372       es.text = "error connect Socket exception";
00373       _senderDestruc=false;
00374       throw SALOME::SALOME_Exception(es);
00375     }
00376 
00377     _mySender->acceptCom();
00378 
00379   }
00380   catch(SALOME::SALOME_Exception &ex){
00381     if( ex.details.type == SALOME::COMM )
00382       {
00383         _senderDestruc=false;
00384         std::cout << ex.details.text << std::endl;
00385         throw MultiCommException("Unknown sender protocol");
00386       }
00387     else
00388       throw ex;
00389   }
00390 
00391 }
00392 
00393 
00394 template<class T,int (*myFunc)(XDR*,T*),class CorbaSender,class servForT,class ptrForT>
00395 void SocketReceiver<T,myFunc,CorbaSender,servForT,ptrForT>::closeCom()
00396 {
00397   _mySender->closeCom();
00398   if( _clientSockfd >= 0 ){
00399     close(_clientSockfd);
00400     _clientSockfd = -1;
00401   }
00402  
00403 }
00404 
00405 #endif