Back to index

salome-med  6.5.0
MPIAccess.cxx
Go to the documentation of this file.
00001 // Copyright (C) 2007-2012  CEA/DEN, EDF R&D
00002 //
00003 // This library is free software; you can redistribute it and/or
00004 // modify it under the terms of the GNU Lesser General Public
00005 // License as published by the Free Software Foundation; either
00006 // version 2.1 of the License.
00007 //
00008 // This library is distributed in the hope that it will be useful,
00009 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00010 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00011 // Lesser General Public License for more details.
00012 //
00013 // You should have received a copy of the GNU Lesser General Public
00014 // License along with this library; if not, write to the Free Software
00015 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
00016 //
00017 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
00018 //
00019 
00020 #include "MPIAccess.hxx"
00021 #include "InterpolationUtils.hxx"
00022 
00023 #include <iostream>
00024 
00025 using namespace std;
00026 
00027 namespace ParaMEDMEM
00028 {
00077   MPIAccess::MPIAccess(MPIProcessorGroup * ProcessorGroup, int BaseTag, int MaxTag) :
00078     _comm_interface( ProcessorGroup->getCommInterface() ) ,
00079     _intra_communicator( ProcessorGroup->getComm() )
00080   {
00081     int mpitagub ;
00082     int flag ;
00083     //MPI_Attr_get does not run with _IntraCommunicator ???
00084     //MPI_Attr_get(*_IntraCommunicator,MPI_TAG_UB,&mpitagub,&flag) ;
00085     MPI_Attr_get(MPI_COMM_WORLD,MPI_TAG_UB,&mpitagub,&flag) ;
00086     mpitagub=abs(mpitagub);
00087     if ( BaseTag != 0 )
00088       BaseTag = (BaseTag/MODULO_TAG)*MODULO_TAG ;
00089     if ( MaxTag == 0 )
00090       MaxTag = (mpitagub/MODULO_TAG-1)*MODULO_TAG ;
00091     MPI_Comm_rank( *_intra_communicator, &_my_rank ) ;
00092     if ( !flag | (BaseTag < 0) | (BaseTag >= MaxTag) | (MaxTag > mpitagub) )
00093       throw INTERP_KERNEL::Exception("wrong call to MPIAccess constructor");
00094 
00095     _processor_group = ProcessorGroup ;
00096     _processor_group_size = _processor_group->size() ;
00097     _trace = false ;
00098 
00099     _base_request = -1 ;
00100     _max_request = std::numeric_limits<int>::max() ;
00101     _request = _base_request ;
00102     
00103     _base_MPI_tag = BaseTag ;
00104     _max_MPI_tag = MaxTag ;
00105     
00106     _send_request = new int[ _processor_group_size ] ;
00107     _recv_request = new int[ _processor_group_size ] ;
00108 
00109     _send_requests.resize( _processor_group_size ) ;
00110     _recv_requests.resize( _processor_group_size ) ;
00111 
00112     _send_MPI_tag = new int[ _processor_group_size ] ;
00113     _recv_MPI_Tag = new int[ _processor_group_size ] ;
00114     int i ;
00115     for (i = 0 ; i < _processor_group_size ; i++ )
00116       {
00117         _send_request[ i ] = _max_request ;
00118         _recv_request[ i ] = _max_request ;
00119         _send_requests[ i ].resize(0) ;
00120         _recv_requests[ i ].resize(0) ;
00121         _send_MPI_tag[ i ] = _max_MPI_tag ;
00122         _recv_MPI_Tag[ i ] = _max_MPI_tag ;
00123       }
00124     MPI_Datatype array_of_types[3] ;
00125     array_of_types[0] = MPI_DOUBLE ;
00126     array_of_types[1] = MPI_DOUBLE ;
00127     array_of_types[2] = MPI_INT ;
00128     int array_of_blocklengths[3] ;
00129     array_of_blocklengths[0] = 1 ;
00130     array_of_blocklengths[1] = 1 ;
00131     array_of_blocklengths[2] = 1 ;
00132     MPI_Aint array_of_displacements[3] ;
00133     array_of_displacements[0] = 0 ;
00134     array_of_displacements[1] = sizeof(double) ;
00135     array_of_displacements[2] = 2*sizeof(double) ;
00136     MPI_Type_struct(3, array_of_blocklengths, array_of_displacements,
00137                     array_of_types, &_MPI_TIME) ;
00138     MPI_Type_commit(&_MPI_TIME) ;
00139   }
00140 
00141   MPIAccess::~MPIAccess()
00142   {
00143     delete [] _send_request ;
00144     delete [] _recv_request ;
00145     delete [] _send_MPI_tag ;
00146     delete [] _recv_MPI_Tag ;
00147     MPI_Type_free(&_MPI_TIME) ;
00148   }
00149 
00150   /*
00151     MPIAccess and "RequestIds" :
00152     ============================
00153 
00154     . WARNING : In the specification document, the distinction
00155     between "MPITags" and "RequestIds" is not clear. "MPITags"
00156     are arguments of calls to MPI. "RequestIds" does not concern
00157     calls to MPI. "RequestIds" are named "tag"as arguments in/out
00158     in the MPIAccess API in the specification documentation.
00159     But in the implementation we have the right name RequestId (or
00160     RecvRequestId/SendRequestId).
00161 
00162     . When we have a MPI write/read request via MPIAccess, we get
00163     an identifier "RequestId".
00164     That identifier matches a  structure RequestStruct of
00165     MPIAccess. The access to that structure is done with the map
00166     "_MapOfRequestStruct".
00167     That structure RequestStruct give the possibility to manage
00168     the structures MPI_Request and MPI_Status * of MPI. It give
00169     also the possibility to get informations about that request :
00170     target, send/recv, tag, [a]synchronous, type, outcount.
00171 
00172     . That identifier is used to control an asynchronous request
00173     via MPIAccess : Wait, Test, Probe, etc...
00174 
    . In practice "RequestId" is simply an integer in the interval
    [0 , 2**32-1]. There is only one such cyclic counter, shared by
    [I]Sends and [I]Recvs.
00178 
00179     . That "RequestIds" and their associated structures give an easy
00180     way to manage asynchronous communications.
00181     For example we have mpi_access->Wait( int RequestId ) instead of
00182     MPI_Wait(MPI_Request *request, MPI_Status *status).
00183 
00184     . The API of MPIAccess may give the "SendRequestIds" of a "target",
00185     the "RecvRequestIds" from a "source" or the "SendRequestIds" of
00186     all "targets" or the "RecvRequestIds" of all "sources".
00187     That avoid to manage them in Presentation-ParaMEDMEM.
00188   */
00189 
  // Creates a RequestStruct for a Send (fromsourcerank == false) or a Recv
  // (fromsourcerank == true) of "datatype" with MPI tag "tag" to/from the
  // peer rank "destsourcerank", registers it in _map_of_request_struct and
  // returns its new "RequestId".
  int MPIAccess::newRequest( MPI_Datatype datatype, int tag , int destsourcerank ,
                             bool fromsourcerank , bool asynchronous )
  {
    RequestStruct *mpiaccessstruct = new RequestStruct;
    mpiaccessstruct->MPITag = tag ;
    mpiaccessstruct->MPIDatatype = datatype ;
    mpiaccessstruct->MPITarget = destsourcerank ;
    mpiaccessstruct->MPIIsRecv = fromsourcerank ;
    MPI_Status *aStatus = new MPI_Status ;
    mpiaccessstruct->MPIStatus = aStatus ;
    mpiaccessstruct->MPIAsynchronous = asynchronous ;
    // A synchronous request is considered completed from the start.
    mpiaccessstruct->MPICompleted = !asynchronous ;
    mpiaccessstruct->MPIOutCount = -1 ;
    if ( !asynchronous )
      {
        // No MPI request will ever be attached : pre-fill the status by hand.
        mpiaccessstruct->MPIRequest = MPI_REQUEST_NULL ;
        mpiaccessstruct->MPIStatus->MPI_SOURCE = destsourcerank ;
        mpiaccessstruct->MPIStatus->MPI_TAG = tag ;
        mpiaccessstruct->MPIStatus->MPI_ERROR = MPI_SUCCESS ;
      }
    // RequestIds are handed out cyclically in ]_base_request , _max_request].
    if ( _request == _max_request )
      _request = _base_request ;
    _request += 1 ;
    _map_of_request_struct[_request] = mpiaccessstruct ;
    // Remember the last RequestId per peer rank, per direction.
    if ( fromsourcerank )
      _recv_request[ destsourcerank ] = _request;
    else
      _send_request[ destsourcerank ] = _request;
    if ( _trace )
      cout << "NewRequest" << _my_rank << "( " << _request << " ) "
           << mpiaccessstruct << endl ;
    return _request ;
  }
00223 
00224   /*
00225     MPIAccess and "tags" (or "MPITags") :
00226     =====================================
00227 
00228     . The constructor give the possibility to choose an interval of
00229     tags to use : [BaseTag , MaxTag].
00230     The default is [ 0 , MPI_TAG_UB], MPI_TAG_UB being the maximum
00231     value in an implementation of MPI (minimum 32767 = 2**15-1).
00232     On awa with the implementation lam MPI_TAG_UB value is
00233     7353944. The norma MPI specify that value is the same in all
00234     processes started by mpirun.
00235     In the case of the use of the same IntraCommunicator in a process
    for several distinct data flows (or for several IntraCommunicators
    with common processes), that permits to avoid ambiguity
    and may help debugging.
00239 
00240     . In MPIAccess the tags have two parts (#define MODULO_TAG 10) :
00241     + The last decimal digit decimal correspond to MPI_DataType ( 1 for
00242     TimeMessages, 2 for MPI_INT and 3 for MPI_DOUBLE)
00243     + The value of other digits correspond to a circular numero for each
00244     message.
00245     + A TimeMessage and the associated DataMessage have the same numero
00246     (but the types are different and the tags also).
00247 
00248     . For a Send of a message from a process "source" to a process
00249     "target", we have _send_MPI_tag[target] in the process
    source (it contains the last "tag" used for the Send of a
    message to the process target).
00252     And in the process "target" which receive that message, we have
00253     _recv_MPI_Tag[source] (it contains the last "tag" used for the Recv
00254     of messages from the process source).
00255     Naturally in the MPI norma the values of that tags must be the same.
00256   */
00257   int MPIAccess::newSendTag( MPI_Datatype datatype, int destrank , int method ,
00258                              bool asynchronous, int &RequestId )
00259   {
00260     int tag ;
00261     tag = incrTag( _send_MPI_tag[destrank] ) ;
00262     tag = valTag( tag, method ) ;
00263     _send_MPI_tag[ destrank ] = tag ;
00264     RequestId = newRequest( datatype, tag, destrank , false , asynchronous ) ;
00265     _send_request[ destrank ] = RequestId ;
00266     _send_requests[ destrank ].push_back( RequestId ) ;
00267     return tag ;
00268   }
00269 
00270   int MPIAccess::newRecvTag( MPI_Datatype datatype, int sourcerank , int method ,
00271                              bool asynchronous, int &RequestId )
00272   {
00273     int tag ;
00274     tag = incrTag( _recv_MPI_Tag[sourcerank] ) ;
00275     tag = valTag( tag, method ) ;
00276     _recv_MPI_Tag[ sourcerank ] = tag ;
00277     RequestId = newRequest( datatype, tag , sourcerank , true , asynchronous ) ;
00278     _recv_request[ sourcerank ] = RequestId ;
00279     _recv_requests[ sourcerank ].push_back( RequestId ) ;
00280     return tag ;
00281   }
00282 
00283   // Returns the number of all SendRequestIds that may be used to allocate
00284   // ArrayOfSendRequests for the call to SendRequestIds
00285   int MPIAccess::sendRequestIdsSize()
00286   {
00287     int size = 0;
00288     for (int i = 0 ; i < _processor_group_size ; i++ )
00289       size += _send_requests[ i ].size() ;
00290     return size ;
00291   }
00292 
00293   // Returns in ArrayOfSendRequests with the dimension "size" all the
00294   // SendRequestIds
00295   int MPIAccess::sendRequestIds(int size, int *ArrayOfSendRequests)
00296   {
00297     int destrank ;
00298     int i = 0 ;
00299     for ( destrank = 0 ; destrank < _processor_group_size ; destrank++ )
00300       {
00301         list< int >::const_iterator iter ;
00302         for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
00303           ArrayOfSendRequests[i++] = *iter ;
00304       }
00305     return i ;
00306   }
00307 
00308   // Returns the number of all RecvRequestIds that may be used to allocate
00309   // ArrayOfRecvRequests for the call to RecvRequestIds
00310   int MPIAccess::recvRequestIdsSize()
00311   {
00312     int size = 0 ;
00313     for (int i = 0 ; i < _processor_group_size ; i++ )
00314       size += _recv_requests[ i ].size() ;
00315     return size ;
00316   }
00317 
00318   // Returns in ArrayOfRecvRequests with the dimension "size" all the
00319   // RecvRequestIds
00320   int MPIAccess::recvRequestIds(int size, int *ArrayOfRecvRequests)
00321   {
00322     int sourcerank ;
00323     int i = 0 ;
00324     for ( sourcerank = 0 ; sourcerank < _processor_group_size ; sourcerank++ )
00325       {
00326         list< int >::const_iterator iter ;
00327         for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
00328           ArrayOfRecvRequests[i++] = *iter ;
00329       }
00330     return i ;
00331   }
00332 
00333   // Returns in ArrayOfSendRequests with the dimension "size" all the
00334   // SendRequestIds to a destination rank
00335   int MPIAccess::sendRequestIds(int destrank, int size, int *ArrayOfSendRequests)
00336   {
00337     if (size < (int)_send_requests[destrank].size() )
00338       throw INTERP_KERNEL::Exception("wrong call to MPIAccess::SendRequestIds");
00339     int i = 0 ;
00340     list< int >::const_iterator iter ;
00341     for (iter = _send_requests[ destrank ].begin() ; iter != _send_requests[destrank].end() ; iter++ )
00342       ArrayOfSendRequests[i++] = *iter ;
00343     return _send_requests[destrank].size() ;
00344   }
00345 
00346   // Returns in ArrayOfRecvRequests with the dimension "size" all the
00347   // RecvRequestIds from a sourcerank
00348   int MPIAccess::recvRequestIds(int sourcerank, int size, int *ArrayOfRecvRequests)
00349   {
00350     if (size < (int)_recv_requests[sourcerank].size() )
00351       throw INTERP_KERNEL::Exception("wrong call to MPIAccess::RecvRequestIds");
00352     int i = 0 ;
00353     list< int >::const_iterator iter ;
00354     _recv_requests[ sourcerank ] ;
00355     for (iter = _recv_requests[ sourcerank ].begin() ; iter != _recv_requests[sourcerank].end() ; iter++ )
00356       ArrayOfRecvRequests[i++] = *iter ;
00357     return _recv_requests[sourcerank].size() ;
00358   }
00359 
  // Send in synchronous mode count values of type datatype from buffer to target
  // (returns RequestId identifier even if the corresponding structure is deleted :
  // it is only in order to have the same signature as the asynchronous mode)
  int MPIAccess::send(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
  {
    int sts = MPI_SUCCESS ;
    RequestId = -1 ;
    if ( count )
      {
        _MessageIdent aMethodIdent = methodId( datatype ) ;
        // Reserve the next tag for this target (also creates the request structure).
        int MPItag = newSendTag( datatype, target , aMethodIdent , false , RequestId ) ;
        if ( aMethodIdent == _message_time )
          {
            // Time messages carry their tag inside the payload as well.
            TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
            aTimeMsg->tag = MPItag ;
          }
        // The request structure is only needed to generate the tag/RequestId :
        // a blocking send completes in place, so it is deleted right away.
        deleteRequest( RequestId ) ;
        sts = _comm_interface.send(buffer, count, datatype, target, MPItag,
                                  *_intra_communicator ) ;
        if ( _trace )
          cout << "MPIAccess::Send" << _my_rank << " SendRequestId "
               << RequestId << " count " << count << " target " << target
               << " MPItag " << MPItag << endl ;
      }
    return sts ;
  }
00386 
00387   // Receive (read) in synchronous mode count values of type datatype in buffer from source
00388   // (returns RequestId identifier even if the corresponding structure is deleted :
00389   // it is only in order to have the same signature as the asynchronous mode)
00390   // The output argument OutCount is optionnal : *OutCount <= count
00391   int MPIAccess::recv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId, int *OutCount)
00392   {
00393     int sts = MPI_SUCCESS ;
00394     RequestId = -1 ;
00395     if ( OutCount != NULL )
00396       *OutCount = -1 ;
00397     if ( count )
00398       {
00399         _MessageIdent aMethodIdent = methodId( datatype ) ;
00400         int MPItag = newRecvTag( datatype, source , aMethodIdent , false , RequestId ) ;
00401         sts =  _comm_interface.recv(buffer, count, datatype, source, MPItag,
00402                                    *_intra_communicator , MPIStatus( RequestId ) ) ;
00403         int outcount = 0 ;
00404         if ( sts == MPI_SUCCESS )
00405           {
00406             MPI_Datatype datatype = MPIDatatype( RequestId ) ;
00407             _comm_interface.getCount(MPIStatus( RequestId ), datatype, &outcount ) ;
00408             setMPIOutCount( RequestId , outcount ) ;
00409             setMPICompleted( RequestId , true ) ;
00410             deleteStatus( RequestId ) ;
00411           }
00412         if ( OutCount != NULL )
00413           *OutCount = outcount ;
00414         if ( _trace )
00415           cout << "MPIAccess::Recv" << _my_rank << " RecvRequestId "
00416                << RequestId << " count " << count << " source " << source
00417                << " MPItag " << MPItag << endl ;
00418         deleteRequest( RequestId ) ;
00419       }
00420     return sts ;
00421   }
00422 
00423   // Send in asynchronous mode count values of type datatype from buffer to target
00424   // Returns RequestId identifier.
00425   int MPIAccess::ISend(void* buffer, int count, MPI_Datatype datatype, int target, int &RequestId)
00426   {
00427     int sts = MPI_SUCCESS ;
00428     RequestId = -1 ;
00429     if ( count )
00430       {
00431         _MessageIdent aMethodIdent = methodId( datatype ) ;
00432         int MPItag = newSendTag( datatype, target , aMethodIdent , true , RequestId ) ;
00433         if ( aMethodIdent == _message_time )
00434           {
00435             TimeMessage *aTimeMsg = (TimeMessage *) buffer ;
00436             aTimeMsg->tag = MPItag ;
00437           }
00438         MPI_Request *aSendRequest = MPIRequest( RequestId ) ;
00439         if ( _trace )
00440           {
00441             cout << "MPIAccess::ISend" << _my_rank << " ISendRequestId "
00442                  << RequestId << " count " << count << " target " << target
00443                  << " MPItag " << MPItag << endl ;
00444             if ( MPItag == 1 )
00445               cout << "MPIAccess::ISend" << _my_rank << " time "
00446                    << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
00447                    << endl ;
00448           }
00449         sts = _comm_interface.Isend(buffer, count, datatype, target, MPItag,
00450                                    *_intra_communicator , aSendRequest) ;
00451       }
00452     return sts ;
00453   }
00454 
00455   // Receive (read) in asynchronous mode count values of type datatype in buffer from source
00456   // returns RequestId identifier.
00457   int MPIAccess::IRecv(void* buffer, int count, MPI_Datatype datatype, int source, int &RequestId)
00458   {
00459     int sts = MPI_SUCCESS ;
00460     RequestId = -1 ;
00461     if ( count )
00462       {
00463         _MessageIdent aMethodIdent = methodId( datatype ) ;
00464         int MPItag = newRecvTag( datatype, source , aMethodIdent , true , RequestId ) ;
00465         MPI_Request *aRecvRequest = MPIRequest( RequestId ) ;
00466         if ( _trace )
00467           {
00468             cout << "MPIAccess::IRecv" << _my_rank << " IRecvRequestId "
00469                  << RequestId << " count " << count << " source " << source
00470                  << " MPItag " << MPItag << endl ;
00471             if ( MPItag == 1 )
00472               cout << "MPIAccess::ISend" << _my_rank << " time "
00473                    << ((TimeMessage *)buffer)->time << " " << ((TimeMessage *)buffer)->deltatime
00474                    << endl ;
00475           }
00476         sts = _comm_interface.Irecv(buffer, count, datatype, source, MPItag,
00477                                    *_intra_communicator , aRecvRequest) ;
00478       }
00479     return sts ;
00480   }
00481 
  // Perform a Send and a Recv in synchronous mode : the receive is posted
  // asynchronously first, then the blocking send runs, then we wait for the
  // receive. OutCount (optional) gets the received element count.
  int MPIAccess::sendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
                          int dest, int &SendRequestId,
                          void* recvbuf, int recvcount, MPI_Datatype recvtype,
                          int source, int &RecvRequestId, int *OutCount)
  {
    int sts = MPI_SUCCESS ;
    SendRequestId = -1 ;
    RecvRequestId = -1 ;
    // Post the receive before sending so the two sides cannot deadlock.
    if ( recvcount )
      sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
    int outcount = -1 ;
    if ( _trace )
      cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
           << RecvRequestId << endl ;
    if ( sts == MPI_SUCCESS )
      {
        if ( sendcount )
          sts = send(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
        if ( _trace )
          cout << "MPIAccess::SendRecv" << _my_rank << " Send SendRequestId "
               << SendRequestId << endl ;
        // Complete the posted receive and read back its element count.
        if ( sts == MPI_SUCCESS && recvcount )
          {
            sts = wait( RecvRequestId ) ;
            outcount = MPIOutCount( RecvRequestId ) ;
            if ( _trace )
              cout << "MPIAccess::SendRecv" << _my_rank << " IRecv RecvRequestId "
                   << RecvRequestId << " outcount " << outcount << endl ;
          }
      }
    if ( OutCount != NULL )
      {
        *OutCount = outcount ;
        if ( _trace )
          cout << "MPIAccess::SendRecv" << _my_rank << " *OutCount = " << *OutCount
               << endl ;
      }
    deleteRequest( RecvRequestId ) ;
    return sts ;
  }
00523 
00524   // Perform a Send and a Recv in asynchronous mode
00525   int MPIAccess::ISendRecv(void* sendbuf, int sendcount, MPI_Datatype sendtype,
00526                            int dest, int &SendRequestId,
00527                            void* recvbuf, int recvcount, MPI_Datatype recvtype,
00528                            int source, int &RecvRequestId)
00529   {
00530     int sts = MPI_SUCCESS ;
00531     SendRequestId = -1 ;
00532     RecvRequestId = -1 ;
00533     if ( recvcount )
00534       sts = IRecv(recvbuf, recvcount, recvtype, source, RecvRequestId) ;
00535     if ( sts == MPI_SUCCESS )
00536       if ( sendcount )
00537         sts = ISend(sendbuf, sendcount, sendtype, dest, SendRequestId) ;
00538     return sts ;
00539   }
00540 
  // Perform a wait of a Send or Recv asynchronous Request
  // Do nothing for a synchronous Request
  // Manage MPI_Request * and MPI_Status * structure
  int MPIAccess::wait( int RequestId )
  {
    int status = MPI_SUCCESS ;
    // Requests already completed (including all synchronous ones) need no MPI call.
    if ( !MPICompleted( RequestId ) )
      {
        if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
          {
            if ( _trace )
              cout << "MPIAccess::Wait" << _my_rank << " -> wait( " << RequestId
                   << " ) MPIRequest " << MPIRequest( RequestId ) << " MPIStatus "
                   << MPIStatus( RequestId ) << " MPITag " << MPITag( RequestId )
                   << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
            // Blocking MPI_Wait on the stored request/status.
            status = _comm_interface.wait(MPIRequest( RequestId ), MPIStatus( RequestId )) ;
          }
        else
          {
            if ( _trace )
              cout << "MPIAccess::Wait" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
                   << endl ;
          }
        setMPICompleted( RequestId , true ) ;
        // For a completed receive, extract the element count from the status,
        // store it and release the status structure.
        if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
          {
            MPI_Datatype datatype = MPIDatatype( RequestId ) ;
            int outcount ;
            status = _comm_interface.getCount(MPIStatus( RequestId ), datatype,
                                             &outcount ) ;
            if ( status == MPI_SUCCESS )
              {
                setMPIOutCount( RequestId , outcount ) ;
                deleteStatus( RequestId ) ;
                if ( _trace )
                  cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
                       << "MPIIsRecv " << MPIIsRecv( RequestId ) << " outcount " << outcount
                       << endl ;
              }
            else
              {
                if ( _trace )
                  cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv "
                       << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
              }
          }
        else
          {
            if ( _trace )
              cout << "MPIAccess::Wait" << _my_rank << " MPIIsRecv " << MPIIsRecv( RequestId )
                   << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
          }
      }
    if ( _trace )
      cout << "MPIAccess::Wait" << _my_rank << " RequestId " << RequestId
           << " Request " << MPIRequest( RequestId )
           << " Status " << MPIStatus( RequestId ) << " MPICompleted "
           << MPICompleted( RequestId ) << " MPIOutCount " << MPIOutCount( RequestId )
           << endl ;
    return status ;
  }
00602 
  // Perform a "test" of a Send or Recv asynchronous Request
  // If the request is done, returns true in the flag argument
  // If the request is not finished, returns false in the flag argument
  // Do nothing for a synchronous Request
  // Manage MPI_request * and MPI_status * structure
  int MPIAccess::test(int RequestId, int &flag)
  {
    int status = MPI_SUCCESS ;
    flag = MPICompleted( RequestId ) ;
    if ( _trace )
      cout << "MPIAccess::Test" << _my_rank << " flag " << flag ;
    if ( MPIIsRecv( RequestId ) )
      {
        if ( _trace )
          cout << " Recv" ;
      }
    else
      {
        if ( _trace )
          cout << " Send" ;
      }
    if( _trace )
      cout << "Request" << RequestId << " " << MPIRequest( RequestId )
           << " Status " << MPIStatus( RequestId ) << endl ;
    // Only requests not already known to be completed need an MPI test.
    if ( !flag )
      {
        if ( *MPIRequest( RequestId ) != MPI_REQUEST_NULL )
          {
            if ( _trace )
              cout << "MPIAccess::Test" << _my_rank << " -> test( " << RequestId
                   << " ) MPIRequest " << MPIRequest( RequestId )
                   << " MPIStatus " << MPIStatus( RequestId )
                   << " MPITag " << MPITag( RequestId )
                   << " MPIIsRecv " << MPIIsRecv( RequestId ) << endl ;
            // Non-blocking MPI_Test : flag is updated by MPI.
            status = _comm_interface.test(MPIRequest( RequestId ), &flag,
                                         MPIStatus( RequestId )) ;
          }
        else
          {
            if ( _trace )
              cout << "MPIAccess::Test" << _my_rank << " MPIRequest == MPI_REQUEST_NULL"
                   << endl ;
          }
        // If the test reports completion, do the same post-processing as wait().
        if ( flag )
          {
            setMPICompleted( RequestId , true ) ;
            // For a completed receive, extract the element count from the
            // status, store it and release the status structure.
            if ( MPIIsRecv( RequestId ) && MPIStatus( RequestId ) )
              {
                int outcount ;
                MPI_Datatype datatype = MPIDatatype( RequestId ) ;
                status = _comm_interface.getCount( MPIStatus( RequestId ), datatype,
                                                  &outcount ) ;
                if ( status == MPI_SUCCESS )
                  {
                    setMPIOutCount( RequestId , outcount ) ;
                    deleteStatus( RequestId ) ;
                    if ( _trace )
                      cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                           << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
                  }
                else
                  {
                    if ( _trace )
                      cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                           << MPIIsRecv( RequestId ) << " outcount " << outcount << endl ;
                  }
              }
            else
              {
                if ( _trace )
                  cout << "MPIAccess::Test" << _my_rank << " MPIIsRecv "
                       << MPIIsRecv( RequestId ) << " MPIOutCount "
                       << MPIOutCount( RequestId ) << endl ;
              }
          }
      }
    if ( _trace )
      cout << "MPIAccess::Test" << _my_rank << " RequestId " << RequestId
           << " flag " << flag << " MPICompleted " << MPICompleted( RequestId )
           << " MPIOutCount " << MPIOutCount( RequestId ) << endl ;
    return status ;
  }
00685 
00686   int MPIAccess::waitAny(int count, int *array_of_RequestIds, int &RequestId)
00687   {
00688     int status = MPI_ERR_OTHER ;
00689     RequestId = -1 ;
00690     cout << "MPIAccess::WaitAny not yet implemented" << endl ;
00691     return status ;
00692   }
00693 
00694   int MPIAccess::testAny(int count, int *array_of_RequestIds, int &RequestId, int &flag)
00695   {
00696     int status = MPI_ERR_OTHER ;
00697     RequestId = -1 ;
00698     flag = 0 ;
00699     cout << "MPIAccess::TestAny not yet implemented" << endl ;
00700     return status ;
00701   }
00702   
00703   // Perform a wait of each Send or Recv asynchronous Request of the array 
00704   // array_of_RequestIds of size "count".
00705   // That array may be filled with a call to SendRequestIdsSize or RecvRequestIdsSize
00706   // Do nothing for a synchronous Request
00707   // Manage MPI_Request * and MPI_Status * structure
00708   int MPIAccess::waitAll(int count, int *array_of_RequestIds)
00709   {
00710     if ( _trace )
00711       cout << "WaitAll" << _my_rank << " : count " << count << endl ;
00712     int status ;
00713     int retstatus = MPI_SUCCESS ;
00714     int i ;
00715     for ( i = 0 ; i < count ; i++ )
00716       {
00717         if ( _trace )
00718           cout << "WaitAll" << _my_rank << " " << i << " -> Wait( "
00719                << array_of_RequestIds[i] << " )" << endl ;
00720         status = wait( array_of_RequestIds[i] ) ;
00721         if ( status != MPI_SUCCESS )
00722           retstatus = status ;
00723       }
00724     if ( _trace )
00725       cout << "EndWaitAll" << _my_rank << endl ;
00726     return retstatus ;
00727   }
00728 
00729   // Perform a "test" of each Send or Recv asynchronous Request of the array 
00730   // array_of_RequestIds of size "count".
00731   // That array may be filled with a call to SendRequestIdsSize or RecvRequestIdsSize
00732   // If all requests are done, returns true in the flag argument
00733   // If all requests are not finished, returns false in the flag argument
00734   // Do nothing for a synchronous Request
00735   // Manage MPI_Request * and MPI_Status * structure
00736   int MPIAccess::testAll(int count, int *array_of_RequestIds, int &flag)
00737   {
00738     if ( _trace )
00739       cout << "TestAll" << _my_rank << " : count " << count << endl ;
00740     int status ;
00741     int retstatus = MPI_SUCCESS ;
00742     bool retflag = true ;
00743     int i ;
00744     for ( i = 0 ; i < count ; i++ )
00745       {
00746         status = test( array_of_RequestIds[i] , flag ) ;
00747         retflag = retflag && (flag != 0) ;
00748         if ( status != MPI_SUCCESS )
00749           retstatus = status ;
00750       }
00751     flag = retflag ;
00752     if ( _trace )
00753       cout << "EndTestAll" << _my_rank << endl ;
00754     return retstatus ;
00755   }
00756 
00757   int MPIAccess::waitSome(int count, int *array_of_RequestIds, int outcount,
00758                           int *outarray_of_RequestIds)
00759   {
00760     int status = MPI_ERR_OTHER ;
00761     cout << "MPIAccess::WaitSome not yet implemented" << endl ;
00762     return status ;
00763   }
00764 
00765   int MPIAccess::testSome(int count, int *array_of_RequestIds, int outcounts,
00766                           int *outarray_of_RequestIds)
00767   {
00768     int status = MPI_ERR_OTHER ;
00769     cout << "MPIAccess::TestSome not yet implemented" << endl ;
00770     return status ;
00771   }
00772   
00773   // Probe checks if a message is available for read from FromSource rank.
00774   // Returns the corresponding source, MPITag, datatype and outcount
  // Probe is a blocking call which waits until a message is available
00776   int MPIAccess::probe(int FromSource, int &source, int &MPITag,
00777                        MPI_Datatype &myDatatype, int &outcount)
00778   {
00779     MPI_Status aMPIStatus ;
00780     int sts =  _comm_interface.probe( FromSource, MPI_ANY_TAG,
00781                                      *_intra_communicator , &aMPIStatus ) ;
00782     if ( sts == MPI_SUCCESS )
00783       {
00784         source = aMPIStatus.MPI_SOURCE ;
00785         MPITag = aMPIStatus.MPI_TAG ;
00786         int MethodId = (MPITag % MODULO_TAG) ;
00787         myDatatype = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
00788         _comm_interface.getCount(&aMPIStatus, myDatatype, &outcount ) ;
00789         if ( _trace )
00790           cout << "MPIAccess::Probe" << _my_rank << " FromSource " << FromSource
00791                << " source " << source << " MPITag " << MPITag << " MethodId "
00792                << MethodId << " datatype " << myDatatype << " outcount " << outcount
00793                << endl ;
00794       }
00795     else
00796       {
00797         source = -1 ;
00798         MPITag = -1 ;
00799         myDatatype = 0 ;
00800         outcount = -1 ;
00801       }
00802     return sts ;
00803   }
00804 
00805   // IProbe checks if a message is available for read from FromSource rank.
00806   // If there is a message available, returns the corresponding source,
00807   // MPITag, datatype and outcount with flag = true
00808   // If not, returns flag = false
00809   int MPIAccess::IProbe(int FromSource, int &source, int &MPITag,
00810                         MPI_Datatype &myDataType, int &outcount, int &flag)
00811   {
00812     MPI_Status aMPIStatus ;
00813     int sts =  _comm_interface.Iprobe( FromSource, MPI_ANY_TAG,
00814                                       *_intra_communicator , &flag,
00815                                       &aMPIStatus ) ;
00816     if ( sts == MPI_SUCCESS && flag )
00817       {
00818         source = aMPIStatus.MPI_SOURCE ;
00819         MPITag = aMPIStatus.MPI_TAG ;
00820         int MethodId = (MPITag % MODULO_TAG) ;
00821         myDataType = datatype( (ParaMEDMEM::_MessageIdent) MethodId ) ;
00822         _comm_interface.getCount(&aMPIStatus, myDataType, &outcount ) ;
00823         if ( _trace )
00824           cout << "MPIAccess::IProbe" << _my_rank << " FromSource " << FromSource
00825                << " source " << source << " MPITag " << MPITag << " MethodId "
00826                << MethodId << " datatype " << myDataType << " outcount " << outcount
00827                << " flag " << flag << endl ;
00828       }
00829     else
00830       {
00831         source = -1 ;
00832         MPITag = -1 ;
00833         myDataType = 0 ;
00834         outcount = -1 ;
00835       }
00836     return sts ;
00837   }
00838 
00839   // Cancel concerns a "posted" asynchronous IRecv
00840   // Returns flag = true if the receiving request was successfully canceled
00841   // Returns flag = false if the receiving request was finished but not canceled
00842   // Use cancel, wait and test_cancelled of the MPI API
00843   int MPIAccess::cancel( int RecvRequestId, int &flag )
00844   {
00845     flag = 0 ;
00846     int sts = _comm_interface.cancel( MPIRequest( RecvRequestId ) ) ;
00847     if ( sts == MPI_SUCCESS )
00848       {
00849         sts = _comm_interface.wait( MPIRequest( RecvRequestId ) ,
00850                                    MPIStatus( RecvRequestId ) ) ;
00851         if ( sts == MPI_SUCCESS )
00852           sts = _comm_interface.testCancelled( MPIStatus( RecvRequestId ) , &flag ) ;
00853       }
00854     return sts ;
00855   }
00856 
00857   // Cancel concerns a "pending" receiving message (without IRecv "posted")
00858   // Returns flag = true if the message was successfully canceled
00859   // Returns flag = false if the receiving request was finished but not canceled
00860   // Use Irecv, cancel, wait and test_cancelled of the MPI API
00861   int MPIAccess::cancel( int source, int theMPITag, MPI_Datatype datatype, int outcount, int &flag )
00862   {
00863     int sts ;
00864     MPI_Aint extent ;
00865     flag = 0 ;
00866     sts =  MPI_Type_extent( datatype , &extent ) ;
00867     if ( sts == MPI_SUCCESS )
00868       {
00869         void * recvbuf = malloc( extent*outcount ) ;
00870         MPI_Request aRecvRequest ;
00871         if ( _trace )
00872           cout << "MPIAccess::Cancel" << _my_rank << " Irecv extent " << extent
00873                << " datatype " << datatype << " source " << source << " theMPITag "
00874                << theMPITag << endl ;
00875         sts = _comm_interface.Irecv( recvbuf, outcount, datatype, source, theMPITag,
00876                                     *_intra_communicator , &aRecvRequest ) ;
00877         if ( sts == MPI_SUCCESS )
00878           {
00879             sts = _comm_interface.cancel( &aRecvRequest ) ;
00880             if ( _trace )
00881               cout << "MPIAccess::Cancel" << _my_rank << " theMPITag " << theMPITag
00882                    << " cancel done" << endl ;
00883             if ( sts == MPI_SUCCESS )
00884               {
00885                 MPI_Status aStatus ;
00886                 if ( _trace )
00887                   cout << "MPIAccess::Cancel" << _my_rank << " wait" << endl ;
00888                 sts = _comm_interface.wait( &aRecvRequest , &aStatus ) ;
00889                 if ( sts == MPI_SUCCESS )
00890                   {
00891                     if ( _trace )
00892                       cout << "MPIAccess::Cancel" << _my_rank << " test_cancelled" << endl ;
00893                     sts = _comm_interface.testCancelled( &aStatus , &flag ) ;
00894                   }
00895               }
00896           }
00897         if ( _trace && datatype == timeType() )
00898           cout << "MPIAccess::Cancel" << _my_rank << " time "
00899                << ((TimeMessage *) recvbuf)->time << " "
00900                << ((TimeMessage *) recvbuf)->deltatime << endl ;
00901         free( recvbuf ) ;
00902       }
00903     if ( _trace )
00904       cout << "MPIAccess::Cancel" << _my_rank << " flag " << flag << endl ;
00905     return sts ;
00906   }
00907 
00908 
00909   // CancelAll concerns all "pending" receiving message (without IRecv "posted")
  // CancelAll uses IProbe and Cancel (see above)
00911   int MPIAccess::cancelAll()
00912   {
00913     int sts = MPI_SUCCESS ;
00914     int target ;
00915     int source ;
00916     int MPITag ;
00917     MPI_Datatype datatype ;
00918     int outcount ;
00919     int flag ;
00920     for ( target = 0 ; target < _processor_group_size ; target++ )
00921       {
00922         sts = IProbe(target, source, MPITag, datatype, outcount, flag) ;
00923         if ( sts == MPI_SUCCESS && flag )
00924           {
00925             sts = cancel(source, MPITag, datatype, outcount, flag) ;
00926             if ( _trace )
00927               cout << "MPIAccess::CancelAll" << _my_rank << " source " << source
00928                    << " MPITag " << MPITag << " datatype " << datatype
00929                    << " outcount " << outcount << " Cancel flag " << flag << endl ;
00930             if ( sts != MPI_SUCCESS )
00931               break ;
00932           }
00933         else if ( sts != MPI_SUCCESS )
00934           break ;
00935       }
00936     return sts ;
00937   }
00938 
00939   // Same as barrier of MPI API
00940   int MPIAccess::barrier()
00941   {
00942     int status = _comm_interface.barrier( *_intra_communicator ) ;
00943     return status ;
00944   }
00945 
00946   // Same as Error_string of MPI API
00947   int MPIAccess::errorString(int errorcode, char *string, int *resultlen) const
00948   {
00949     return _comm_interface.errorString( errorcode, string, resultlen) ;
00950   }
00951   
00952   // Returns source, tag, error and outcount corresponding to receiving RequestId
00953   // By default the corresponding structure of RequestId is deleted
00954   int MPIAccess::status(int RequestId, int &source, int &tag, int &error,
00955                         int &outcount, bool keepRequestStruct)
00956   {
00957     MPI_Status *myStatus = MPIStatus( RequestId ) ;
00958     if ( _trace )
00959       cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
00960            << " status " << myStatus << endl ;
00961     if ( myStatus != NULL && MPIAsynchronous( RequestId ) &&
00962          MPICompleted( RequestId ) )
00963       {
00964         if ( MPIIsRecv( RequestId ) )
00965           {
00966             source = myStatus->MPI_SOURCE ;
00967             tag = myStatus->MPI_TAG ;
00968             error = myStatus->MPI_ERROR ;
00969             MPI_Datatype datatype = MPIDatatype( RequestId ) ;
00970             _comm_interface.getCount(myStatus, datatype, &outcount ) ;
00971             if ( _trace )
00972               cout << "MPIAccess::status" << _my_rank << " RequestId " << RequestId
00973                    << " status " << myStatus << " outcount " << outcount << endl ;
00974             setMPIOutCount( RequestId , outcount ) ;
00975           }
00976         else
00977           {
00978             source = MPITarget( RequestId ) ;
00979             tag = MPITag( RequestId ) ;
00980             error = 0 ;
00981             outcount = MPIOutCount( RequestId ) ;
00982           }
00983         if ( !keepRequestStruct )
00984           deleteRequest( RequestId ) ;
00985         return MPI_SUCCESS ;
00986       }
00987     else
00988       {
00989         source = MPITarget( RequestId ) ;
00990         tag = MPITag( RequestId ) ;
00991         error = 0 ;
00992         outcount = MPIOutCount( RequestId ) ;
00993       }
00994     return MPI_SUCCESS ;
00995   }
00996   
00997   int MPIAccess::requestFree( MPI_Request *request )
00998   {
00999     return _comm_interface.requestFree( request ) ;
01000   }
01001   
  // Print all information about all known requests, for debugging purposes
01003   void MPIAccess::check() const
01004   {
01005     int i = 0 ;
01006     map< int , RequestStruct * >::const_iterator MapOfRequestStructiterator ;
01007     cout << "MPIAccess::Check" << _my_rank << "_map_of_request_struct_size "
01008          << _map_of_request_struct.size() << endl ;
01009     for ( MapOfRequestStructiterator = _map_of_request_struct.begin() ;
01010           MapOfRequestStructiterator != _map_of_request_struct.end() ;
01011           MapOfRequestStructiterator++ )
01012       {
01013         if ( MapOfRequestStructiterator->second != NULL )
01014           {
01015             cout << "    Check" << _my_rank << " " << i << ". Request"
01016                  << MapOfRequestStructiterator->first << "-->" ;
01017             if ( (MapOfRequestStructiterator->second)->MPIAsynchronous )
01018               cout << "I" ;
01019             if ( (MapOfRequestStructiterator->second)->MPIIsRecv )
01020               cout << "Recv from " ;
01021             else
01022               cout << "Send to " ;
01023             cout << (MapOfRequestStructiterator->second)->MPITarget
01024                  << " MPITag " << (MapOfRequestStructiterator->second)->MPITag
01025                  << " DataType " << (MapOfRequestStructiterator->second)->MPIDatatype
01026                  << " Request " << (MapOfRequestStructiterator->second)->MPIRequest
01027                  << " Status " << (MapOfRequestStructiterator->second)->MPIStatus
01028                  << " Completed " << (MapOfRequestStructiterator->second)->MPICompleted
01029                  << endl ;
01030           }
01031         i++ ;
01032       }
01033   }
01034 
01035   // Outputs fields of a TimeMessage structure
01036   ostream & operator<< (ostream & f ,const TimeMessage & aTimeMsg )
01037   {
01038     f << " time " << aTimeMsg.time << " deltatime " << aTimeMsg.deltatime
01039       << " tag " << aTimeMsg.tag ;
01040     return f;
01041   }
01042 
01043   // Outputs the DataType coded in a Tag
01044   ostream & operator<< (ostream & f ,const _MessageIdent & methodtype )
01045   {
01046     switch (methodtype)
01047       {
01048       case _message_time :
01049         f << " MethodTime ";
01050         break;
01051       case _message_int :
01052         f << " MPI_INT ";
01053         break;
01054       case _message_double :
01055         f << " MPI_DOUBLE ";
01056         break;
01057       default :
01058         f << " UnknownMethodType ";
01059         break;
01060       }
01061     return f;
01062   }
01063 }