Back to index

salome-kernel  6.5.0
Parallel_Salome_file_i.cxx
Go to the documentation of this file.
00001 // Copyright (C) 2007-2012  CEA/DEN, EDF R&D, OPEN CASCADE
00002 //
00003 // Copyright (C) 2003-2007  OPEN CASCADE, EADS/CCR, LIP6, CEA/DEN,
00004 // CEDRAT, EDF R&D, LEG, PRINCIPIA R&D, BUREAU VERITAS
00005 //
00006 // This library is free software; you can redistribute it and/or
00007 // modify it under the terms of the GNU Lesser General Public
00008 // License as published by the Free Software Foundation; either
00009 // version 2.1 of the License.
00010 //
00011 // This library is distributed in the hope that it will be useful,
00012 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00013 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00014 // Lesser General Public License for more details.
00015 //
00016 // You should have received a copy of the GNU Lesser General Public
00017 // License along with this library; if not, write to the Free Software
00018 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
00019 //
00020 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
00021 //
00022 
00023 //  File   : Parallel_Salome_file_i.cxx
00024 //  Author : André RIBES, EDF
00025 //  Module : SALOME
00026 //  $Header: 
00027 //
00028 #include "Parallel_Salome_file_i.hxx"
00029 #include "utilities.h"
00030 
00031 Parallel_Salome_file_i::Parallel_Salome_file_i(CORBA::ORB_ptr orb, 
00032                                                const char * ior,
00033                                                int rank) :
00034   InterfaceParallel_impl(orb,ior,rank), 
00035   Engines::Salome_file_serv(orb,ior,rank),
00036   Engines::Salome_file_base_serv(orb,ior,rank),
00037   Engines::fileTransfer_serv(orb,ior,rank),
00038   Engines::Parallel_Salome_file_serv(orb,ior,rank),
00039   Engines::fileTransfer_base_serv(orb,ior,rank),
00040   SALOME::GenericObj_serv(orb,ior,rank),
00041   SALOME::GenericObj_base_serv(orb,ior,rank),
00042   Engines::Parallel_Salome_file_base_serv(orb,ior,rank)
00043 {
00044   CORBA::Object_ptr obj = _orb->string_to_object(ior);
00045   proxy = Engines::Parallel_Salome_file::_narrow(obj);
00046   parallel_file = NULL;
00047 }
00048 
00049 Parallel_Salome_file_i::~Parallel_Salome_file_i() {}
00050 
00051 void 
00052 Parallel_Salome_file_i::load(const char* hdf5_file) {
00053   MESSAGE("Parallel_Salome_file_i::load : NOT YET IMPLEMENTED");
00054   SALOME::ExceptionStruct es;
00055   es.type = SALOME::INTERNAL_ERROR;
00056   es.text = "Parallel_Salome_file_i::load : NOT YET IMPLEMENTED";
00057   throw SALOME::SALOME_Exception(es);
00058 }
00059 
00060 void 
00061 Parallel_Salome_file_i::save(const char* hdf5_file) {
00062   MESSAGE("Parallel_Salome_file_i::save : NOT YET IMPLEMENTED");
00063   SALOME::ExceptionStruct es;
00064   es.type = SALOME::INTERNAL_ERROR;
00065   es.text = "Parallel_Salome_file_i::save : NOT YET IMPLEMENTED";
00066   throw SALOME::SALOME_Exception(es);
00067 }
00068 
00069 void 
00070 Parallel_Salome_file_i::save_all(const char* hdf5_file) {
00071   MESSAGE("Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED");
00072   SALOME::ExceptionStruct es;
00073   es.type = SALOME::INTERNAL_ERROR;
00074   es.text = "Parallel_Salome_file_i::save_all : NOT YET IMPLEMENTED";
00075   throw SALOME::SALOME_Exception(es);
00076 }
00077 
00078 void 
00079 Parallel_Salome_file_i::connect(Engines::Salome_file_ptr source_Salome_file) {
00080   // only one file managed case 
00081   Salome_file_i::connect(source_Salome_file);
00082 
00083   // Test if the file is managed in an another node
00084   // If yes, node is updated
00085   _t_fileManaged::iterator begin = _fileManaged.begin();
00086   _t_fileManaged::iterator end = _fileManaged.end();
00087   for(;begin!=end;begin++) {
00088     std::string file_name = begin->first;
00089     if (_fileManaged[file_name].node > 0 && getMyRank() == 0) {
00090       if (parallel_file == NULL)
00091         parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
00092       parallel_file->connect(source_Salome_file, _fileManaged[file_name].node);
00093     }
00094   }
00095 }
00096 
00097 void 
00098 Parallel_Salome_file_i::connectDistributedFile(const char * file_name,
00099                                                Engines::Salome_file_ptr source_Salome_file) {
00100   Salome_file_i::connectDistributedFile(file_name, source_Salome_file);
00101 
00102   // Test if the file is managed in an another node
00103   // If yes, node is updated
00104   std::string fname(file_name);
00105   if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
00106     if (parallel_file == NULL)
00107       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
00108     parallel_file->connectDistributedFile(file_name, source_Salome_file, _fileManaged[fname].node);
00109   }
00110 }
00111 
00112 void 
00113 Parallel_Salome_file_i::setDistributedSourceFile(const char* file_name,
00114                                                  const char * source_file_name) {
00115   Salome_file_i::setDistributedSourceFile(file_name, source_file_name);
00116   // Test if the file is managed in an another node
00117   // If yes, node is updated
00118   std::string fname(file_name);
00119   if (_fileManaged[fname].node > 0 && getMyRank() == 0) {
00120     if (parallel_file == NULL)
00121       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
00122     parallel_file->setDistributedSourceFile(file_name, source_file_name, _fileManaged[fname].node);
00123   }
00124 }
00125 
00126 void
00127 Parallel_Salome_file_i::recvFiles() {
00128   if (parallel_file == NULL)
00129     parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
00130 
00131   std::string files_not_ok("");
00132   int total = getTotalNode();
00133   for (int i =0; i<total; i++) {
00134     try {
00135      parallel_file->recvFiles_node(i);
00136     }
00137     catch (SALOME::SALOME_Exception & ex) {
00138       files_not_ok = files_not_ok + std::string(ex.details.text.in());
00139     }
00140   }
00141 
00142   if (files_not_ok != "")
00143   {
00144     SALOME::ExceptionStruct es;
00145     es.type = SALOME::INTERNAL_ERROR;
00146     std::string text = "files not ready : " + files_not_ok;
00147     es.text = CORBA::string_dup(text.c_str());
00148     throw SALOME::SALOME_Exception(es);
00149   }
00150   else
00151   {
00152     // We change the state of the Salome_file
00153     _state.files_ok = true;
00154   }
00155 }
00156 
00157 void 
00158 Parallel_Salome_file_i::recvFiles_node() {
00159 
00160   std::string files_not_ok("");
00161   _t_fileManaged::iterator begin = _fileManaged.begin();
00162   _t_fileManaged::iterator end = _fileManaged.end();
00163   for(;begin!=end;begin++) 
00164   {
00165     bool result = true;
00166     Engines::file file_infos = begin->second;
00167     if (file_infos.node == getMyRank()) {
00168       // Test if the file is local or distributed
00169       if (std::string(file_infos.type.in()) == "local")
00170       {
00171         if (std::string(file_infos.status.in()) == "not_ok")
00172           result = checkLocalFile(file_infos.file_name.in());
00173       }
00174       else
00175       {
00176         if (std::string(file_infos.status.in()) == "not_ok") {
00177           // 2 cases :
00178           // Source file is a Salome_file
00179           // Source file is a Parallel_Salome_file
00180           PaCO::InterfaceManager_var interface_manager = 
00181             PaCO::InterfaceManager::_narrow(_fileDistributedSource[file_infos.file_name.in()]);
00182           if (CORBA::is_nil(interface_manager))
00183             result = getDistributedFile(file_infos.file_name.in());
00184           else
00185             result = getParallelDistributedFile(file_infos.file_name.in());
00186         }
00187       }
00188       // if the result is false
00189       // we add this file to files_not_ok
00190       if (!result) 
00191       {
00192         files_not_ok.append(" ");
00193         files_not_ok.append(file_infos.file_name.in());
00194       }
00195     }
00196   }
00197   if (files_not_ok != "")
00198   {
00199     SALOME::ExceptionStruct es;
00200     es.type = SALOME::INTERNAL_ERROR;
00201     std::string text = files_not_ok;
00202     es.text = CORBA::string_dup(text.c_str());
00203     throw SALOME::SALOME_Exception(es);
00204   }
00205 }
00206 
00207 bool 
00208 Parallel_Salome_file_i::getParallelDistributedFile(std::string file_name) {
00209 
00210   bool result = true;
00211   const char * source_file_name = _fileManaged[file_name].source_file_name.in();
00212   int fileId;
00213   FILE* fp;
00214   std::string comp_file_name(_fileManaged[file_name].path.in());
00215   comp_file_name.append("/");
00216   comp_file_name.append(_fileManaged[file_name].file_name.in());
00217 
00218   // Test if the process can write on disk
00219   if ((fp = fopen(comp_file_name.c_str(),"wb")) == NULL)
00220   {
00221     INFOS("file " << comp_file_name << " cannot be open for writing");
00222     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
00223     result = false;
00224     return result;
00225   }
00226 
00227   Engines::PaCO_Parallel_Salome_file * parallel_source_file = 
00228     Engines::PaCO_Parallel_Salome_file::PaCO_narrow(_fileDistributedSource[file_name], _orb);
00229 
00230   int node = parallel_source_file->getFileNode(source_file_name);
00231 
00232   try 
00233   {
00234     fileId = parallel_source_file->open(source_file_name, node);
00235   }
00236   catch (...) 
00237   {
00238     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
00239     fclose(fp);
00240     result = false;
00241     return result;
00242   }
00243 
00244   if (fileId > 0)
00245   {
00246     Engines::fileBlock* aBlock;
00247     int toFollow = 1;
00248     int ctr=0;
00249     MESSAGE("begin of transfer of " << comp_file_name);
00250     while (toFollow)
00251     {
00252       ctr++;
00253       aBlock = parallel_source_file->getBlock(fileId, node);
00254       toFollow = aBlock->length();
00255       CORBA::Octet *buf = aBlock->get_buffer();
00256       int nbWri = fwrite(buf, sizeof(CORBA::Octet), toFollow, fp);
00257       delete aBlock;
00258       ASSERT(nbWri == toFollow);
00259     }
00260     fclose(fp);
00261     MESSAGE("end of transfer of " << comp_file_name);
00262     parallel_source_file->close(fileId, node);
00263   }
00264   else
00265   {
00266     INFOS("open reference file for copy impossible");
00267     result = false;
00268     fclose(fp);
00269     _fileManaged[file_name].status = CORBA::string_dup("not_ok");
00270     return result;
00271   }
00272 
00273   _fileManaged[file_name].status = CORBA::string_dup("ok");
00274   return result;
00275 }
00276 
00277 void 
00278 Parallel_Salome_file_i::setContainer(Engines::Container_ptr container) {
00279   _container = Engines::Container::_duplicate(container);
00280 
00281   // Update All the files managed by the node
00282   _t_fileManaged::iterator begin = _fileManaged.begin();
00283   _t_fileManaged::iterator end = _fileManaged.end();
00284   for(;begin!=end;begin++) {
00285     begin->second.container = Engines::Container::_duplicate(container);
00286   }
00287 }
00288 
00289 void 
00290 Parallel_Salome_file_i::setFileNode(const char* file_name, CORBA::Long node) {
00291   
00292   // Test if this file is managed
00293   std::string fname(file_name);
00294   _t_fileManaged::iterator it = _fileManaged.find(fname);
00295   if (it == _fileManaged.end()) 
00296   {
00297     SALOME::ExceptionStruct es;
00298     es.type = SALOME::INTERNAL_ERROR;
00299     es.text = "file is not managed";
00300     throw SALOME::SALOME_Exception(es);
00301   }
00302 
00303   // Update file infos into this node (node 0)
00304   // and into the node that actually managed it
00305   _fileManaged[fname].node = node;
00306 
00307   if (node > 0) {
00308     if (parallel_file == NULL)
00309       parallel_file = Engines::PaCO_Parallel_Salome_file::PaCO_narrow(proxy, _orb);
00310 
00311     Engines::Container_ptr cont = parallel_file->updateFile(_fileManaged[fname], node);
00312     parallel_file->connectDistributedFile(fname.c_str(),
00313                                           _fileDistributedSource[fname],
00314                                           node);
00315 
00316     // Update file infos with the new reference of the container
00317     _fileManaged[fname].container = Engines::Container::_duplicate(cont);
00318   }
00319 }
00320 
00321 Engines::Container_ptr
00322 Parallel_Salome_file_i::updateFile(const Engines::file& file) {
00323   // Copy file
00324   Engines::file new_file_infos(file);
00325 
00326   // Adding it to node list
00327   new_file_infos.container = Engines::Container::_duplicate(_container);
00328   std::string fname(new_file_infos.file_name.in());
00329   _fileManaged[fname] = new_file_infos;
00330 
00331   // Return the new reference of the container associated to the file
00332   return Engines::Container::_duplicate(_container);
00333 }
00334 
00335 CORBA::Long 
00336 Parallel_Salome_file_i::getFileNode(const char* file_name) {
00337   
00338   // Test if this file is managed
00339   std::string fname(file_name);
00340   if (fname == "") {
00341     // We enter in the simple case where the user
00342     // has not used setDistributedSourceFile.
00343     // In this case we try to see if the Salome_file
00344     if (_fileManaged.size() == 1) 
00345     {
00346       // only one file managed 
00347       _t_fileManaged::iterator it = _fileManaged.begin();
00348       fname = it->first;
00349     }
00350     else
00351     {
00352       SALOME::ExceptionStruct es;
00353       es.type = SALOME::INTERNAL_ERROR;
00354       es.text = "Error : there is more than one file that is managed";
00355       throw SALOME::SALOME_Exception(es);
00356     }
00357   }
00358   _t_fileManaged::iterator it = _fileManaged.find(fname);
00359   if (it == _fileManaged.end()) 
00360   {
00361     SALOME::ExceptionStruct es;
00362     es.type = SALOME::INTERNAL_ERROR;
00363     es.text = "file is not managed";
00364     throw SALOME::SALOME_Exception(es);
00365   }
00366 
00367   return _fileManaged[fname].node;
00368 }