Back to index

salome-kernel  6.5.0
SALOME_ParallelComponent_i.cxx
Go to the documentation of this file.
00001 // Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
00002 //
00003 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
00004 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
00005 //
00006 // This library is free software; you can redistribute it and/or
00007 // modify it under the terms of the GNU Lesser General Public
00008 // License as published by the Free Software Foundation; either
00009 // version 2.1 of the License.
00010 //
00011 // This library is distributed in the hope that it will be useful,
00012 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00013 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014 // Lesser General Public License for more details.
00015 //
00016 // You should have received a copy of the GNU Lesser General Public
00017 // License along with this library; if not, write to the Free Software
00018 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
00019 //
00020 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
00021 //
00022 
00023 //  SALOME_ParallelComponent : implementation of container and engine for Parallel Kernel
00024 //  File   : SALOME_ParallelComponent_i.cxx
00025 //  Author : André RIBES, EDF
00026 //  Author : Paul RASCLE, EDF - MARC TAJCHMAN, CEA
00027 //
#include "SALOME_ParallelComponent_i.hxx"
#include "SALOME_ParallelContainer_i.hxx"

#include "OpUtil.hxx"
#include <stdio.h>
#ifndef WIN32
#include <dlfcn.h>
#endif
#include <cstdlib>
#include <cstring>
#include "utilities.h"
#include "Basics_Utils.hxx"

#ifndef WIN32
#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>
#else
#include <sys/timeb.h>
int SIGUSR11 = 1000;
#endif

#include <paco_dummy.h>
#include <paco_omni.h>
00051 
00052 
00053 extern bool _Sleeping ;
00054 static Engines_Parallel_Component_i * theEngines_Component ;
00055 
00056 bool Engines_Parallel_Component_i::_isMultiStudy = true;
00057 bool Engines_Parallel_Component_i::_isMultiInstance = false;
00058 
00059 //=============================================================================
00070 //=============================================================================
00071 
00072 Engines_Parallel_Component_i::Engines_Parallel_Component_i(CORBA::ORB_ptr orb, char * ior, int rank,
00073                                          PortableServer::POA_ptr poa, 
00074                                          PortableServer::ObjectId * contId, 
00075                                          const char *instanceName,
00076                                          const char *interfaceName,
00077                                          bool notif,
00078                                          bool regist) :
00079   InterfaceParallel_impl(orb,ior,rank), 
00080   Engines::EngineComponent_serv(orb,ior,rank),
00081   Engines::EngineComponent_base_serv(orb,ior,rank),
00082   Engines::Parallel_Component_serv(orb,ior,rank),
00083   Engines::Parallel_Component_base_serv(orb,ior,rank),
00084   _instanceName(instanceName),
00085   _interfaceName(interfaceName),
00086   _id(NULL),
00087   _myConnexionToRegistry(0),
00088   _ThreadId(0) ,
00089   _ThreadCpuUsed(0) ,
00090   _Executed(false) ,
00091   _graphName("") ,
00092   _nodeName(""),
00093   _studyId(-1),
00094   _destroyed(false),
00095   _CanceledThread(false)
00096 {
00097   MESSAGE("Parallel Component constructor with instanceName "<< _instanceName);
00098   //SCRUTE(pd_refCount);
00099   _orb = CORBA::ORB::_duplicate(orb);
00100   _poa = PortableServer::POA::_duplicate(poa);
00101   _contId = contId ;
00102   CORBA::Object_var o = _poa->id_to_reference(*contId); // container ior...
00103 
00104   if (regist)
00105   {
00106     CORBA::String_var the_ior = _orb->object_to_string(o);
00107     _myConnexionToRegistry = new RegistryConnexion(0, 0, the_ior,"theSession",
00108                                                    _instanceName.c_str());
00109   }
00110   _notifSupplier = new NOTIFICATION_Supplier(instanceName, notif);
00111 
00112   deploy_mutex = new pthread_mutex_t();
00113   pthread_mutex_init(deploy_mutex, NULL);
00114   _proxy = NULL;
00115  //SCRUTE(pd_refCount);
00116 }
00117 
00118 //=============================================================================
00124 //=============================================================================
00125 
00126 Engines_Parallel_Component_i::~Engines_Parallel_Component_i()
00127 {
00128   MESSAGE("Parallel Component destructor");
00129   Engines_Parallel_Container_i::decInstanceCnt(_interfaceName);
00130   if(_myConnexionToRegistry)delete _myConnexionToRegistry;
00131   if(_notifSupplier)delete _notifSupplier;
00132   if (_id)
00133     delete(_id);
00134 
00135   pthread_mutex_destroy(deploy_mutex);
00136   delete deploy_mutex;
00137   if (_proxy)
00138     delete _proxy;
00139 }
00140 
00141 //=============================================================================
00145 //=============================================================================
00146 
00147 char* Engines_Parallel_Component_i::instanceName()
00148 {
00149    return CORBA::string_dup(_instanceName.c_str()) ;
00150 }
00151 
00152 //=============================================================================
00156 //=============================================================================
00157 
00158 char* Engines_Parallel_Component_i::interfaceName()
00159 {
00160   return CORBA::string_dup(_interfaceName.c_str()) ;
00161 }
00162 
00163 //=============================================================================
00170 //=============================================================================
00171 
00172 CORBA::Long Engines_Parallel_Component_i::getStudyId()
00173 {
00174   return _studyId;
00175 }
00176 
00177 //=============================================================================
00181 //=============================================================================
00182 
00183 void Engines_Parallel_Component_i::ping()
00184 {
00185 #ifndef WIN32
00186   MESSAGE("Engines_Parallel_Component_i::ping() pid "<< getpid() << " threadid "
00187           << pthread_self());
00188 #else
00189   MESSAGE("Engines_Parallel_Component_i::ping() pid "<< _getpid()<< " threadid "
00190           << pthread_self().p );
00191 #endif
00192 }
00193 
00194 //=============================================================================
00203 //=============================================================================
00204 
00205 void Engines_Parallel_Component_i::destroy()
00206 {
00207   MESSAGE("Engines_Parallel_Component_i::destroy()");
00208   MESSAGE("Object Instance will be deleted when Shutdown of the container will be called");
00209   if (!_destroyed)
00210   {
00211     _remove_ref();
00212     _destroyed = true;
00213   }
00214 }
00215 
00216 //=============================================================================
00221 //=============================================================================
00222 
00223 Engines::Container_ptr Engines_Parallel_Component_i::GetContainerRef()
00224 {
00225   MESSAGE("Engines_Parallel_Component_i::GetContainerRef");
00226   CORBA::Object_var o = _poa->id_to_reference(*_contId) ;
00227   return Engines::Container::_narrow(o);
00228 }
00229 
00230 //=============================================================================
00239 //=============================================================================
00240 
00241 void Engines_Parallel_Component_i::setProperties(const Engines::FieldsDict& dico)
00242 {
00243   _fieldsDict.clear();
00244   for (CORBA::ULong i=0; i<dico.length(); i++)
00245     {
00246       std::string cle(dico[i].key);
00247       _fieldsDict[cle] = dico[i].value;
00248     }
00249 }
00250 
00251 //=============================================================================
00257 //=============================================================================
00258 
00259 Engines::FieldsDict* Engines_Parallel_Component_i::getProperties()
00260 {
00261   Engines::FieldsDict_var copie = new Engines::FieldsDict;
00262   copie->length(_fieldsDict.size());
00263   std::map<std::string,CORBA::Any>::iterator it;
00264   CORBA::ULong i = 0;
00265   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++, i++)
00266     {
00267       std::string cle((*it).first);
00268       copie[i].key = CORBA::string_dup(cle.c_str());
00269       copie[i].value = _fieldsDict[cle];
00270     }
00271   return copie._retn();
00272 }
00273 
00274 //=============================================================================
00278 //=============================================================================
00279 
00280 void Engines_Parallel_Component_i::Names( const char * graphName ,
00281                                  const char * nodeName )
00282 {
00283   _graphName = graphName;
00284   _nodeName = nodeName;
00285   MESSAGE("Engines_Parallel_Component_i::Names( '" << _graphName << "' , '" 
00286                                                    << _nodeName << "' )");
00287 }
00288 
00289 //=============================================================================
00293 //=============================================================================
00294 
00295 bool Engines_Parallel_Component_i::Kill_impl() 
00296 {
00297 //  MESSAGE("Engines_Parallel_Component_i::Kill_i() pthread_t "<< pthread_self()
00298 //          << " pid " << getpid() << " instanceName "
00299 //          << _instanceName.c_str() << " interface " << _interfaceName.c_str()
00300 //          << " machineName " << GetHostname().c_str()<< " _id " << hex << _id
00301 //          << dec << " _ThreadId " << _ThreadId << " this " << hex << this
00302 //          << dec ) ;
00303 
00304   bool RetVal = false ;
00305 #ifndef WIN32
00306   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
00307     {
00308       RetVal = Killer( _ThreadId , SIGUSR2 ) ;
00309       _ThreadId = (pthread_t ) -1 ;
00310     }
00311 
00312 #else
00313   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
00314     {
00315       RetVal = Killer( *_ThreadId , 0 ) ;
00316       _ThreadId = (pthread_t* ) 0 ;
00317     }
00318 
00319 #endif
00320   return RetVal ;
00321 }
00322 
00323 //=============================================================================
00327 //=============================================================================
00328 
00329 bool Engines_Parallel_Component_i::Stop_impl()
00330 {
00331 #ifndef WIN32
00332   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self()
00333           << " pid " << getpid() << " instanceName "
00334           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
00335           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
00336           << dec << " _ThreadId " << _ThreadId );
00337 #else
00338   MESSAGE("Engines_Parallel_Component_i::Stop_i() pthread_t "<< pthread_self().p
00339           << " pid " << _getpid() << " instanceName "
00340           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
00341           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
00342           << dec << " _ThreadId " << _ThreadId );
00343 #endif
00344   
00345 
00346   bool RetVal = false ;
00347 #ifndef WIN32
00348   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
00349     {
00350       RetVal = Killer( _ThreadId , 0 ) ;
00351       _ThreadId = (pthread_t ) -1 ;
00352     }
00353 #else
00354   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
00355     {
00356       RetVal = Killer( *_ThreadId , 0 ) ;
00357       _ThreadId = (pthread_t* ) 0 ;
00358     }
00359 #endif
00360   return RetVal ;
00361 }
00362 
00363 //=============================================================================
00367 //=============================================================================
00368 
00369 bool Engines_Parallel_Component_i::Suspend_impl()
00370 {
00371 #ifndef WIN32
00372   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self()
00373           << " pid " << getpid() << " instanceName "
00374           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
00375           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
00376           << dec << " _ThreadId " << _ThreadId );
00377 #else
00378   MESSAGE("Engines_Parallel_Component_i::Suspend_i() pthread_t "<< pthread_self().p
00379           << " pid " << _getpid() << " instanceName "
00380           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
00381           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
00382           << dec << " _ThreadId " << _ThreadId );
00383 #endif
00384 
00385   bool RetVal = false ;
00386 #ifndef WIN32
00387   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
00388 #else
00389   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
00390 #endif
00391     {
00392       if ( _Sleeping )
00393         {
00394           return false ;
00395         }
00396     else 
00397       {
00398 #ifndef WIN32
00399         RetVal = Killer( _ThreadId ,SIGINT ) ;
00400 #else
00401         RetVal = Killer( *_ThreadId ,SIGINT ) ;
00402 #endif
00403         //if ( RetVal ) _Sleeping = true;
00404 
00405       }
00406     }
00407   return RetVal ;
00408 }
00409 
00410 //=============================================================================
00414 //=============================================================================
00415 
00416 bool Engines_Parallel_Component_i::Resume_impl()
00417 {
00418 #ifndef WIN32
00419   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self()
00420           << " pid " << getpid() << " instanceName "
00421           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
00422           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
00423           << dec << " _ThreadId " << _ThreadId );
00424 #else
00425   MESSAGE("Engines_Parallel_Component_i::Resume_i() pthread_t "<< pthread_self().p
00426           << " pid " << _getpid() << " instanceName "
00427           << _instanceName.c_str() << " interface " << _interfaceName.c_str()
00428           << " machineName " << Kernel_Utils::GetHostname().c_str()<< " _id " << hex << _id
00429           << dec << " _ThreadId " << _ThreadId );
00430 #endif
00431   bool RetVal = false ;
00432 #ifndef WIN32
00433   if ( _ThreadId > 0 && pthread_self() != _ThreadId )
00434 #else
00435   if ( _ThreadId > 0 && pthread_self().p != _ThreadId->p )
00436 #endif
00437     {
00438     if ( _Sleeping ) 
00439       {
00440         _Sleeping = false ;
00441         RetVal = true ;
00442       }
00443     else
00444       {
00445         RetVal = false ;
00446       }
00447     }
00448   return RetVal ;
00449 }
00450 
00451 //=============================================================================
00455 //=============================================================================
00456 
00457 CORBA::Long Engines_Parallel_Component_i::CpuUsed_impl()
00458 {
00459   long cpu = 0 ;
00460   if ( _ThreadId || _Executed )
00461     {
00462     if ( _ThreadId > 0 )
00463       {
00464 #ifndef WIN32
00465       if ( pthread_self() != _ThreadId )
00466 #else
00467       if ( pthread_self().p != _ThreadId->p )
00468 #endif
00469         {
00470         if ( _Sleeping )
00471           {
00472           }
00473         else
00474           {
00475             // Get Cpu in the appropriate thread with that object !...
00476             theEngines_Component = this ;
00477 #ifndef WIN32
00478             Killer( _ThreadId ,SIGUSR1 ) ;
00479 #else
00480             Killer( *_ThreadId ,SIGUSR11 ) ;
00481 #endif
00482           }
00483         cpu = _ThreadCpuUsed ;
00484         }
00485       else
00486         {
00487           _ThreadCpuUsed = CpuUsed() ;
00488           cpu = _ThreadCpuUsed ;
00489           // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
00490           //      << _serviceName << " " << cpu << std::endl ;
00491       }
00492     }
00493     else 
00494       {
00495         cpu = _ThreadCpuUsed ;
00496         // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed_impl "
00497         //      << _serviceName << " " << cpu<< std::endl ;
00498       }
00499     }
00500   else
00501     {
00502       // std::cout<< pthread_self()<<"Engines_Parallel_Component_i::CpuUsed_impl _ThreadId "
00503       //     <<_ThreadId <<" "<<_serviceName<<" _StartUsed "<<_StartUsed<<std::endl;
00504     }
00505   return cpu ;
00506 }
00507 
00508 
00509 //=============================================================================
00513 //=============================================================================
00514 
00515 Engines_Parallel_Container_i *Engines_Parallel_Component_i::GetContainerPtr()
00516 {
00517   return dynamic_cast<Engines_Parallel_Container_i*>(_poa->id_to_servant(*_contId)) ;
00518 }
00519 
00520 //=============================================================================
00529 //=============================================================================
00530 
00531 CORBA::Boolean Engines_Parallel_Component_i::setStudyId(CORBA::Long studyId)
00532 {
00533   ASSERT( studyId >= 0);
00534   CORBA::Boolean ret = false;
00535   if (_studyId < 0) // --- not yet initialized 
00536     {
00537       _studyId = studyId;
00538       ret = true;
00539     }
00540   else
00541     if ( _studyId == studyId) ret = true;
00542   return ret;
00543 }
00544 
00545 //=============================================================================
00550 //=============================================================================
00551 
00552 PortableServer::ObjectId * Engines_Parallel_Component_i::getId()
00553 {
00554 //  MESSAGE("PortableServer::ObjectId * Engines_Parallel_Component_i::getId()");
00555   return _id ;
00556 }
00557 
00558 //=============================================================================
00562 //=============================================================================
00563 
00564 void Engines_Parallel_Component_i::beginService(const char *serviceName)
00565 {
00566 #ifndef WIN32
00567   MESSAGE(pthread_self() << "Send BeginService notification for " <<serviceName
00568           << endl << "Parallel Component instance : " << _instanceName << endl << endl);
00569 #else
00570   MESSAGE(pthread_self().p << "Send BeginService notification for " <<serviceName
00571           << endl << "Parallel Component instance : " << _instanceName << endl << endl);
00572 #endif
00573 #ifndef WIN32
00574   _ThreadId = pthread_self() ;
00575 #else
00576   _ThreadId = new pthread_t;
00577   _ThreadId->p = pthread_self().p ;
00578   _ThreadId->x = pthread_self().x ;
00579 #endif
00580   _StartUsed = 0 ;
00581   _StartUsed = CpuUsed_impl() ;
00582   _ThreadCpuUsed = 0 ;
00583   _Executed = true ;
00584   _serviceName = serviceName ;
00585   if ( pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS , NULL ) )
00586     {
00587       perror("pthread_setcanceltype ") ;
00588       exit(0) ;
00589     }
00590   if ( pthread_setcancelstate( PTHREAD_CANCEL_ENABLE , NULL ) )
00591     {
00592       perror("pthread_setcancelstate ") ;
00593       exit(0) ;
00594     }
00595 //  MESSAGE(pthread_self() << " Return from BeginService for " << serviceName
00596 //          << " ThreadId " << _ThreadId << " StartUsed " << _StartUsed
00597 //          << " _graphName " << _graphName << " _nodeName " << _nodeName );
00598 
00599   // --- for supervisor : all strings given with setProperties
00600   //     are set in environment
00601   bool overwrite = true;
00602   std::map<std::string,CORBA::Any>::iterator it;
00603   for (it = _fieldsDict.begin(); it != _fieldsDict.end(); it++)
00604     {
00605       std::string cle((*it).first);
00606       if ((*it).second.type()->kind() == CORBA::tk_string)
00607         {
00608           const char* value;
00609           (*it).second >>= value;
00610           // ---todo: replace __GNUC__ test by an autoconf macro AC_CHECK_FUNC.
00611 #if defined __GNUC__
00612           //int ret = setenv(cle.c_str(), value, overwrite);
00613           setenv(cle.c_str(), value, overwrite);
00614 #else
00615           //CCRT porting : setenv not defined in stdlib.h
00616           std::string s(cle);
00617           s+='=';
00618           s+=value;
00619           // char* cast because 1st arg of linux putenv function
00620           // is not a const char* !
00621           //int ret=putenv((char *)s.c_str());
00622           putenv((char *)s.c_str());
00623           //End of CCRT porting
00624 #endif
00625           MESSAGE("--- setenv: "<<cle<<" = "<< value);
00626         }
00627     }
00628 }
00629 
00630 //=============================================================================
00634 //=============================================================================
00635 
00636 void Engines_Parallel_Component_i::endService(const char *serviceName)
00637 {
00638   if ( !_CanceledThread )
00639     _ThreadCpuUsed = CpuUsed_impl() ;
00640 
00641 #ifndef WIN32
00642   MESSAGE(pthread_self() << " Send EndService notification for " << serviceName
00643           << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
00644           << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
00645 #else
00646   MESSAGE(pthread_self().p << " Send EndService notification for " << serviceName
00647           << endl << " Parallel Component instance : " << _instanceName << " StartUsed "
00648     << _StartUsed << " _ThreadCpuUsed "<< _ThreadCpuUsed << endl <<endl);
00649 #endif
00650   _ThreadId = 0 ;
00651 }
00652 
00653 //=============================================================================
00657 //=============================================================================
00658 
00659 char* Engines_Parallel_Component_i::graphName()
00660 {
00661   return CORBA::string_dup( _graphName.c_str() ) ;
00662 }
00663 
00664 //=============================================================================
00668 //=============================================================================
00669 
00670 char* Engines_Parallel_Component_i::nodeName()
00671 {
00672   return CORBA::string_dup( _nodeName.c_str() ) ;
00673 }
00674 
00675 //=============================================================================
00679 //=============================================================================
00680 
00681 bool Engines_Parallel_Component_i::Killer( pthread_t ThreadId , int signum )
00682 {
00683 #ifndef WIN32
00684   if ( ThreadId )
00685 #else
00686   if ( ThreadId.p )
00687 #endif
00688     {
00689       if ( signum == 0 )
00690         {
00691           if ( pthread_cancel( ThreadId ) )
00692             {
00693               perror("Killer pthread_cancel error") ;
00694               return false ;
00695             }
00696           else
00697             {
00698 #ifndef WIN32
00699               MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
00700                       << " pthread_canceled") ;
00701 #else
00702         MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
00703                       << " pthread_canceled") ;
00704 #endif
00705             }
00706         }
00707       else
00708         {
00709           if ( pthread_kill( ThreadId , signum ) == -1 )
00710             {
00711               perror("Killer pthread_kill error") ;
00712               return false ;
00713             }
00714           else 
00715             {
00716 #ifndef WIN32
00717         MESSAGE(pthread_self() << "Killer : ThreadId " << ThreadId
00718                       << " pthread_killed(" << signum << ")") ;
00719 #else
00720         MESSAGE(pthread_self().p << "Killer : ThreadId " << ThreadId.p
00721                       << " pthread_killed(" << signum << ")") ;
00722 #endif
00723             }
00724         }
00725     }
00726   return true ;
00727 }
00728 
00729 //=============================================================================
00733 //=============================================================================
00734 
00735 void SetCpuUsed()
00736 {
00737   if (theEngines_Component)
00738     theEngines_Component->SetCurCpu();
00739 }
00740 
00741 //=============================================================================
00745 //=============================================================================
00746 
00747 void Engines_Parallel_Component_i::SetCurCpu()
00748 {
00749   _ThreadCpuUsed =  CpuUsed() ;
00750   //  MESSAGE(pthread_self() << 
00751   //  " Engines_Parallel_Component_i::SetCurCpu() _ThreadCpuUsed " << _ThreadCpuUsed) ;
00752 }
00753 
00754 //=============================================================================
00758 //=============================================================================
00759 
00760 long Engines_Parallel_Component_i::CpuUsed()
00761 {
00762   long cpu = 0 ;
00763 #ifndef WIN32
00764   struct rusage usage ;
00765   if ( _ThreadId || _Executed )
00766     {
00767       if ( getrusage( RUSAGE_SELF , &usage ) == -1 )
00768         {
00769           perror("Engines_Parallel_Component_i::CpuUsed") ;
00770           return 0 ;
00771         }
00772       cpu = usage.ru_utime.tv_sec - _StartUsed ;
00773       // std::cout << pthread_self() << " Engines_Parallel_Component_i::CpuUsed " << " "
00774       //      << _serviceName   << usage.ru_utime.tv_sec << " - " << _StartUsed
00775       //      << " = " << cpu << std::endl ;
00776     }
00777   else
00778     {
00779       // std::cout << pthread_self() << "Engines_Parallel_Component_i::CpuUsed _ThreadId "
00780       //      << _ThreadId << " " << _serviceName<< " _StartUsed " 
00781       //      << _StartUsed << std::endl ;
00782     }
00783 #else
00784         // NOT implementet yet
00785 #endif
00786 
00787 
00788   return cpu ;
00789 }
00790 
00791 void CallCancelThread()
00792 {
00793   if ( theEngines_Component )
00794     theEngines_Component->CancelThread() ;
00795 }
00796 
00797 //=============================================================================
00801 //=============================================================================
00802 
00803 void Engines_Parallel_Component_i::CancelThread()
00804 {
00805   _CanceledThread = true;
00806 }
00807 
00808 //=============================================================================
00812 //=============================================================================
00813 
00814 void Engines_Parallel_Component_i::sendMessage(const char *event_type,
00815                                       const char *message)
00816 {
00817     _notifSupplier->Send(graphName(), nodeName(), event_type, message);
00818 }
00819 
00820 //=============================================================================
00824 //=============================================================================
00825 
00826 std::string Engines_Parallel_Component_i::GetDynLibraryName(const char *componentName)
00827 {
00828   std::string ret="lib";
00829   ret+=componentName;
00830   ret+="Engine.so";
00831   return ret;
00832 }
00833 
00834 //=============================================================================
00838 //=============================================================================
00839 
00840 Engines::TMPFile* Engines_Parallel_Component_i::DumpPython(CORBA::Object_ptr theStudy, 
00841                                                            CORBA::Boolean isPublished, 
00842                                                            CORBA::Boolean isMultiFile,
00843                                                            CORBA::Boolean& isValidScript)
00844 {
00845   const char* aScript = isMultiFile ? "def RebuildData(theStudy): pass" : "";
00846   char* aBuffer = new char[strlen(aScript)+1];
00847   strcpy(aBuffer, aScript);
00848   CORBA::Octet* anOctetBuf =  (CORBA::Octet*)aBuffer;
00849   int aBufferSize = strlen(aBuffer)+1;
00850   Engines::TMPFile_var aStreamFile = new Engines::TMPFile(aBufferSize, aBufferSize, anOctetBuf, 1); 
00851   isValidScript = true;
00852   return aStreamFile._retn(); 
00853 }
00854 
00855 
00856 Engines::Salome_file_ptr 
00857 Engines_Parallel_Component_i::setInputFileToService(const char* service_name, 
00858                                                     const char* Salome_file_name) 
00859 {
00860   // Try to find the service, if it doesn't exist, we add it.
00861   _Service_file_map_it = _Input_Service_file_map.find(service_name);
00862   if (_Service_file_map_it ==  _Input_Service_file_map.end()) {
00863     _t_Salome_file_map * _map = new _t_Salome_file_map();
00864     _Input_Service_file_map[service_name] = _map;
00865     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
00866     _Proxy_Input_Service_file_map[service_name] = _proxy_map;
00867     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
00868     _IOR_Proxy_Input_Service_file_map[service_name] = _IOR_proxy_map;
00869   }
00870   _t_Salome_file_map * _map = _Input_Service_file_map[service_name];
00871   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
00872   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Input_Service_file_map[service_name];
00873   
00874   pthread_mutex_lock(deploy_mutex);
00875   std::string proxy_ior;
00876 
00877   // Try to find the Salome_file ...
00878   _Salome_file_map_it = _map->find(Salome_file_name);
00879   if (_Salome_file_map_it ==  _map->end()) {
00880 
00881     // We create a new PaCO++ object.
00882     // He has the same configuration than
00883     // his component
00884 
00885     // Firstly, we have to create the proxy object
00886     // of the Salome_file and transmit his
00887     // reference to the other nodes.
00888     Engines::Parallel_Salome_file_proxy_impl * proxy = NULL;
00889     if (getMyRank() == 0) {
00890       proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
00891                                                            new paco_omni_fabrique());
00892       proxy->copyGlobalContext(this); 
00893       PaCO::PacoTopology_t serveur_topo;
00894       serveur_topo.total = getTotalNode();
00895       proxy->setTopology(serveur_topo);
00896 
00897       // We register the CORBA objet into the POA
00898       CORBA::Object_ptr proxy_ref = proxy->_this();
00899 
00900       // We send the reference to all the nodes...
00901       Engines::Parallel_Component_var component_proxy = 
00902         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
00903       component_proxy->send_parallel_proxy_object(proxy_ref);
00904 
00905       // Adding proxy into the map
00906       (*_proxy_map)[Salome_file_name] = proxy;
00907     }
00908     else {
00909       this->wait_parallel_object_proxy();
00910     }
00911 
00912     proxy_ior = this->get_parallel_proxy_object();
00913     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
00914 
00915     // We register each node of the parallel Salome_file object
00916     // into the proxy.
00917     for (int i = 0; i < getTotalNode(); i++) {
00918       if (i ==  getMyRank()) {
00919         Parallel_Salome_file_i * servant = 
00920           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
00921                                      proxy_ior.c_str(),
00922                                      i);
00923         servant->copyGlobalContext(this); 
00924         
00925         // We register the CORBA objet into the POA
00926         servant->POA_PaCO::InterfaceParallel::_this();
00927 
00928         // Register the servant
00929         servant->deploy();
00930 
00931         // Adding servant to the map
00932         (*_map)[Salome_file_name] = servant;
00933       }
00934 
00935       _my_com->paco_barrier();
00936       // start parallel object
00937       if (getMyRank() == 0) {
00938         proxy->start();
00939         _my_com->paco_barrier();
00940       }
00941       else
00942         _my_com->paco_barrier();
00943     }
00944     // Parallel_Salome_file is created and deployed
00945     delete _proxy;
00946     _proxy = NULL;
00947   }
00948 
00949   pthread_mutex_unlock(deploy_mutex);
00950   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
00951   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
00952   return Engines::Salome_file::_narrow(proxy_ref);
00953 }
00954 
00955 Engines::Salome_file_ptr 
00956 Engines_Parallel_Component_i::setOutputFileToService(const char* service_name, 
00957                                                      const char* Salome_file_name) 
00958 {
00959   // Try to find the service, if it doesn't exist, we add it.
00960   _Service_file_map_it = _Output_Service_file_map.find(service_name);
00961   if (_Service_file_map_it ==  _Output_Service_file_map.end()) {
00962     _t_Salome_file_map * _map = new _t_Salome_file_map();
00963     _Output_Service_file_map[service_name] = _map;
00964     _t_Proxy_Salome_file_map * _proxy_map = new _t_Proxy_Salome_file_map();
00965     _Proxy_Output_Service_file_map[service_name] = _proxy_map;
00966     _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = new _t_IOR_Proxy_Salome_file_map();
00967     _IOR_Proxy_Output_Service_file_map[service_name] = _IOR_proxy_map;
00968   }
00969   _t_Salome_file_map * _map = _Output_Service_file_map[service_name];
00970   _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Output_Service_file_map[service_name];
00971   _t_IOR_Proxy_Salome_file_map * _IOR_proxy_map = _IOR_Proxy_Output_Service_file_map[service_name];
00972   
00973   pthread_mutex_lock(deploy_mutex);
00974   std::string proxy_ior;
00975 
00976   // Try to find the Salome_file ...
00977   _Salome_file_map_it = _map->find(Salome_file_name);
00978   Engines::Parallel_Salome_file_proxy_impl * proxy;
00979   if (_Salome_file_map_it ==  _map->end()) {
00980 
00981     // We create a new PaCO++ object.
00982     // He has the same configuration than
00983     // his component
00984 
00985     // Firstly, we have to create the proxy object
00986     // of the Salome_file and transmit his
00987     // reference to the other nodes.
00988     if (getMyRank() == 0) {
00989         proxy = new Engines::Parallel_Salome_file_proxy_impl(CORBA::ORB::_duplicate(_orb),
00990                                                              new paco_omni_fabrique());
00991       proxy->copyGlobalContext(this); 
00992       PaCO::PacoTopology_t serveur_topo;
00993       serveur_topo.total = getTotalNode();
00994       proxy->setTopology(serveur_topo);
00995 
00996       // We register the CORBA objet into the POA
00997       CORBA::Object_ptr proxy_ref = proxy->_this();
00998 
00999       // We send the reference to all the nodes...
01000       Engines::Parallel_Component_var component_proxy = 
01001         Engines::Parallel_Component::_narrow(InterfaceParallel_impl::_proxy);
01002       component_proxy->send_parallel_proxy_object(proxy_ref);
01003 
01004       // Adding proxy into the map
01005       (*_proxy_map)[Salome_file_name] = proxy;
01006     }
01007     else {
01008       this->wait_parallel_object_proxy();
01009     }
01010 
01011     proxy_ior = this->get_parallel_proxy_object();
01012     (*_IOR_proxy_map)[Salome_file_name] = proxy_ior;
01013 
01014     // We register each node of the parallel Salome_file object
01015     // into the proxy.
01016     for (int i = 0; i < getTotalNode(); i++) {
01017       if (i ==  getMyRank()) {
01018         Parallel_Salome_file_i * servant = 
01019           new Parallel_Salome_file_i(CORBA::ORB::_duplicate(_orb), 
01020                                      proxy_ior.c_str(),
01021                                      i);
01022         servant->copyGlobalContext(this); 
01023         
01024         // We register the CORBA objet into the POA
01025         servant->POA_PaCO::InterfaceParallel::_this();
01026 
01027         // Register the servant
01028         servant->deploy();
01029 
01030         // Adding servant to the map
01031         (*_map)[Salome_file_name] = servant;
01032       }
01033 
01034       _my_com->paco_barrier();
01035       // start parallel object
01036       if (getMyRank() == 0) {
01037         proxy->start();
01038         _my_com->paco_barrier();
01039       }
01040       else
01041         _my_com->paco_barrier();
01042     }
01043 
01044     // Parallel_Salome_file is created and deployed
01045     delete _proxy;
01046     _proxy = NULL;
01047   }
01048   pthread_mutex_unlock(deploy_mutex);
01049   proxy_ior = (*_IOR_proxy_map)[Salome_file_name];
01050   CORBA::Object_ptr proxy_ref = _orb->string_to_object(proxy_ior.c_str());
01051   return Engines::Salome_file::_narrow(proxy_ref);
01052 }
01053 
01054 Engines::Salome_file_ptr 
01055 Engines_Parallel_Component_i::getInputFileToService(const char* service_name, 
01056                                                     const char* Salome_file_name) 
01057 {
01058   // Try to find the service, if it doesn't exist, we throw an exception.
01059   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
01060   if (_Proxy_Service_file_map_it ==  _Proxy_Input_Service_file_map.end()) {
01061     SALOME::ExceptionStruct es;
01062     es.type = SALOME::INTERNAL_ERROR;
01063     es.text = "service doesn't have salome files";
01064     throw SALOME::SALOME_Exception(es);
01065   }
01066   _t_Proxy_Salome_file_map * _map = _Proxy_Input_Service_file_map[service_name];
01067 
01068   // Try to find the Salome_file ...
01069   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
01070   if (_Proxy_Salome_file_map_it ==  _map->end()) {
01071     SALOME::ExceptionStruct es;
01072     es.type = SALOME::INTERNAL_ERROR;
01073     es.text = "service doesn't have this Salome_file";
01074     throw SALOME::SALOME_Exception(es);
01075   }
01076 
01077   // Client get the proxy object
01078   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
01079   return Sfile->_this();
01080 }
01081 
01082 Engines::Salome_file_ptr 
01083 Engines_Parallel_Component_i::getOutputFileToService(const char* service_name, 
01084                                                      const char* Salome_file_name) 
01085 {
01086   // Try to find the service, if it doesn't exist, we throw an exception.
01087   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
01088   if (_Proxy_Service_file_map_it ==  _Proxy_Output_Service_file_map.end()) {
01089     SALOME::ExceptionStruct es;
01090     es.type = SALOME::INTERNAL_ERROR;
01091     es.text = "service doesn't have salome files";
01092     throw SALOME::SALOME_Exception(es);
01093   }
01094   _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
01095 
01096   // Try to find the Salome_file ...
01097   _Proxy_Salome_file_map_it = _map->find(Salome_file_name);
01098   if (_Proxy_Salome_file_map_it ==  _map->end()) {
01099     SALOME::ExceptionStruct es;
01100     es.type = SALOME::INTERNAL_ERROR;
01101     es.text = "service doesn't have this Salome_file";
01102     throw SALOME::SALOME_Exception(es);
01103   }
01104 
01105   // Client get the proxy object
01106   Engines::Parallel_Salome_file_proxy_impl * Sfile = (*_map)[Salome_file_name];
01107   return Sfile->_this();
01108 }
01109 
01110 
01111 void 
01112 Engines_Parallel_Component_i::checkInputFilesToService(const char* service_name) 
01113 {
01114   // Try to find the service, if it doesn't exist, nothing to do.
01115   _Proxy_Service_file_map_it = _Proxy_Input_Service_file_map.find(service_name);
01116   if (_Proxy_Service_file_map_it !=  _Proxy_Input_Service_file_map.end()) {
01117     _t_Proxy_Salome_file_map * _proxy_map = _Proxy_Input_Service_file_map[service_name];
01118     _t_Proxy_Salome_file_map::iterator begin = _proxy_map->begin();
01119     _t_Proxy_Salome_file_map::iterator end = _proxy_map->end();
01120 
01121     for(;begin!=end;begin++) {
01122       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
01123       std::string file_port_name = begin->first;
01124       configureSalome_file(service_name, file_port_name, file);
01125       file->recvFiles();
01126     }
01127   }
01128 }
01129 
01130 void 
01131 Engines_Parallel_Component_i::checkOutputFilesToService(const char* service_name) 
01132 {
01133   // Try to find the service, if it doesn't exist, nothing to do.
01134   _Proxy_Service_file_map_it = _Proxy_Output_Service_file_map.find(service_name);
01135   if (_Proxy_Service_file_map_it !=  _Proxy_Output_Service_file_map.end()) {
01136     _t_Proxy_Salome_file_map * _map = _Proxy_Output_Service_file_map[service_name];
01137     _t_Proxy_Salome_file_map::iterator begin = _map->begin();
01138     _t_Proxy_Salome_file_map::iterator end = _map->end();
01139 
01140     for(;begin!=end;begin++) {
01141       Engines::Parallel_Salome_file_proxy_impl * file = begin->second;
01142       std::string file_port_name = begin->first;
01143       configureSalome_file(service_name, file_port_name, file);
01144       file->recvFiles();
01145     }
01146   }
01147 
01148 }
01149 
01150 //=============================================================================
01154 //=============================================================================
01155 void 
01156 Engines_Parallel_Component_i::send_parallel_proxy_object(CORBA::Object_ptr proxy_ref) {
01157   _proxy = _orb->object_to_string(proxy_ref);
01158 }
01159 
01160 //=============================================================================
01164 //=============================================================================
01165 void 
01166 Engines_Parallel_Component_i::wait_parallel_object_proxy() {
01167   char * proxy = NULL;
01168   proxy =  get_parallel_proxy_object();
01169   while(proxy == NULL)
01170   {
01171     sleep(1);
01172     proxy = get_parallel_proxy_object();
01173   }
01174 }
01175 
01176 //=============================================================================
01180 //=============================================================================
01181 char * 
01182 Engines_Parallel_Component_i::get_parallel_proxy_object() {
01183   return _proxy;
01184 }
01185 
01186 
01187 //=============================================================================
01194 //=============================================================================
01195 void
01196 Engines_Parallel_Component_i::configureSalome_file(std::string service_name,
01197                                                    std::string file_port_name,
01198                                                    Engines::Parallel_Salome_file_proxy_impl * file) 
01199 {
01200   // By default this method does nothing
01201 }