Back to index

salome-kernel  6.5.0
Launcher_Job.cxx
Go to the documentation of this file.
00001 // Copyright (C) 2009-2012  CEA/DEN, EDF R&D, OPEN CASCADE
00002 //
00003 // This library is free software; you can redistribute it and/or
00004 // modify it under the terms of the GNU Lesser General Public
00005 // License as published by the Free Software Foundation; either
00006 // version 2.1 of the License.
00007 //
00008 // This library is distributed in the hope that it will be useful,
00009 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00010 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00011 // Lesser General Public License for more details.
00012 //
00013 // You should have received a copy of the GNU Lesser General Public
00014 // License along with this library; if not, write to the Free Software
00015 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
00016 //
00017 // See http://www.salome-platform.org/ or email : webmaster.salome@opencascade.com
00018 //
00019 
00020 // Author: André RIBES - EDF R&D
00021 //
00022 #include "Launcher_Job.hxx"
00023 #include "Launcher.hxx"
00024 
00025 #ifdef WITH_LIBBATCH
00026 #include <Batch/Batch_Constants.hxx>
00027 #endif
00028 
00029 Launcher::Job::Job()
00030 {
00031   _number = -1;
00032   _state = "CREATED";
00033   _launch_date = getLaunchDate();
00034 
00035   _env_file = "";
00036   _job_name = "";
00037   _job_file = "";
00038   _job_file_name = "";
00039   _job_file_name_complete = "";
00040   _work_directory = "";
00041   _local_directory = "";
00042   _result_directory = "";
00043   _maximum_duration = "";
00044   _maximum_duration_in_second = -1;
00045   _resource_required_params.name = "";
00046   _resource_required_params.hostname = "";
00047   _resource_required_params.OS = "";
00048   _resource_required_params.nb_proc = -1;
00049   _resource_required_params.nb_node = -1;
00050   _resource_required_params.nb_proc_per_node = -1;
00051   _resource_required_params.cpu_clock = -1;
00052   _resource_required_params.mem_mb = -1;
00053   _queue = "";
00054   _job_type = "";
00055 
00056 #ifdef WITH_LIBBATCH
00057   _batch_job = new Batch::Job();
00058 #endif
00059 }
00060 
00061 Launcher::Job::~Job() 
00062 {
00063   LAUNCHER_MESSAGE("Deleting job number: " << _number);
00064 #ifdef WITH_LIBBATCH
00065   if (_batch_job)
00066     delete _batch_job;
00067 #endif
00068 }
00069 
00070 void
00071 Launcher::Job::stopJob()
00072 {
00073   LAUNCHER_MESSAGE("Stop resquested for job number: " << _number);
00074   setState("FAILED");
00075 #ifdef WITH_LIBBATCH
00076   if (_batch_job_id.getReference() != "undefined")
00077   {
00078     try 
00079     {
00080       _batch_job_id.deleteJob();
00081     }
00082     catch (const Batch::EmulationException &ex)
00083     {
00084       LAUNCHER_INFOS("WARNING: exception when stopping the job: " << ex.message);
00085     }
00086   }
00087 #endif
00088 }
00089 
00090 
00091 void
00092 Launcher::Job::removeJob()
00093 {
00094   LAUNCHER_MESSAGE("Removing job number: " << _number);
00095 #ifdef WITH_LIBBATCH
00096   if (_batch_job_id.getReference() != "undefined")
00097   {
00098     try 
00099     {
00100       _batch_job_id.deleteJob();
00101     }
00102     catch (const Batch::EmulationException &ex)
00103     {
00104       LAUNCHER_INFOS("WARNING: exception when removing the job: " << ex.message);
00105     }
00106   }
00107 #endif
00108 }
00109 
00110 std::string
00111 Launcher::Job::getJobType()
00112 {
00113   return _job_type;
00114 }
00115 
00116 void
00117 Launcher::Job::setJobName(const std::string & job_name)
00118 {
00119   _job_name = job_name;
00120 }
00121 
00122 std::string
00123 Launcher::Job::getJobName()
00124 {
00125   return _job_name;
00126 }
00127 
00128 void 
00129 Launcher::Job::setState(const std::string & state)
00130 {
00131   // State of a Job: CREATED, QUEUED, RUNNING, FINISHED, FAILED
00132   if (state != "CREATED" &&
00133       state != "IN_PROCESS" &&
00134       state != "QUEUED" &&
00135       state != "RUNNING" &&
00136       state != "PAUSED" &&
00137       state != "FINISHED" &&
00138       state != "FAILED" &&
00139       state != "ERROR")
00140   {
00141     throw LauncherException("Bad state, this state does not exist: " + state);
00142   }
00143   _state = state;
00144 }
00145 
00146 std::string 
00147 Launcher::Job::getState()
00148 {
00149   return _state;
00150 }
00151 
00152 void 
00153 Launcher::Job::setNumber(const int & number)
00154 {
00155   if (_number != -1)
00156     std::cerr << "Launcher::Job::setNumber -- Job number was already defined, before: " << _number << " now: " << number << std::endl;
00157   _number = number;
00158 }
00159 
00160 int
00161 Launcher::Job::getNumber()
00162 {
00163   return _number;
00164 }
00165 
00166 void 
00167 Launcher::Job::setResourceDefinition(const ParserResourcesType & resource_definition)
00168 {
00169   // Check machine_definition
00170   std::string user_name = "";
00171   if (resource_definition.UserName == "")
00172   {
00173     user_name = getenv("USER");
00174     if (user_name == "")
00175     {
00176       std::string mess = "You must define a user name: into your resource description or with env variable USER";
00177       throw LauncherException(mess);
00178     }
00179   }
00180   else
00181     user_name = resource_definition.UserName;
00182 
00183   _resource_definition = resource_definition;
00184   _resource_definition.UserName = user_name;
00185 }
00186 
00187 ParserResourcesType 
00188 Launcher::Job::getResourceDefinition()
00189 {
00190   return _resource_definition;
00191 }
00192 
00193 void 
00194 Launcher::Job::setJobFile(const std::string & job_file)
00195 {
00196   // Check job file
00197   if (job_file == "")
00198   {
00199     std::string mess = "Empty Job File is forbidden !";
00200     throw LauncherException(mess);
00201   }
00202 
00203   _job_file = job_file;
00204   std::string::size_type p1 = _job_file.find_last_of("/");
00205   std::string::size_type p2 = _job_file.find_last_of(".");
00206   _job_file_name_complete = _job_file.substr(p1+1);
00207   _job_file_name = _job_file.substr(p1+1,p2-p1-1);
00208 }
00209 
00210 std::string
00211 Launcher::Job::getJobFile()
00212 {
00213   return _job_file;
00214 }
00215 void 
00216 Launcher::Job::setEnvFile(const std::string & env_file)
00217 {
00218   _env_file = env_file;
00219 }
00220 
00221 std::string
00222 Launcher::Job::getEnvFile()
00223 {
00224   return _env_file;
00225 }
00226 
00227 void 
00228 Launcher::Job::setWorkDirectory(const std::string & work_directory)
00229 {
00230   _work_directory = work_directory;
00231 }
00232 
00233 void 
00234 Launcher::Job::setLocalDirectory(const std::string & local_directory)
00235 {
00236   _local_directory = local_directory;
00237 }
00238 
00239 void 
00240 Launcher::Job::setResultDirectory(const std::string & result_directory)
00241 {
00242   _result_directory = result_directory;
00243 }
00244 
00245 void 
00246 Launcher::Job::add_in_file(const std::string & file)
00247 {
00248   std::list<std::string>::iterator it = std::find(_in_files.begin(), _in_files.end(), file);
00249   if (it == _in_files.end())
00250     _in_files.push_back(file);
00251   else
00252     std::cerr << "Launcher::Job::add_in_file -- Warning file was already entered in in_files: " << file << std::endl;
00253 }
00254 
00255 void 
00256 Launcher::Job::add_out_file(const std::string & file)
00257 {
00258   std::list<std::string>::iterator it = std::find(_out_files.begin(), _out_files.end(), file);
00259   if (it == _out_files.end())
00260     _out_files.push_back(file);
00261   else
00262     std::cerr << "Launcher::Job::add_out_file -- Warning file was already entered in out_files: " << file << std::endl;
00263 }
00264 
00265 void 
00266 Launcher::Job::setMaximumDuration(const std::string & maximum_duration)
00267 {
00268   checkMaximumDuration(maximum_duration);
00269   _maximum_duration_in_second = convertMaximumDuration(maximum_duration);
00270   _maximum_duration = maximum_duration;
00271 }
00272 
00273 void 
00274 Launcher::Job::setResourceRequiredParams(const resourceParams & resource_required_params)
00275 {
00276   checkResourceRequiredParams(resource_required_params);
00277   _resource_required_params = resource_required_params;
00278 }
00279 
00280 void 
00281 Launcher::Job::setQueue(const std::string & queue)
00282 {
00283   _queue = queue;
00284 }
00285 
00286 std::string 
00287 Launcher::Job::getWorkDirectory()
00288 {
00289   return _work_directory;
00290 }
00291 
00292 std::string 
00293 Launcher::Job::getLocalDirectory()
00294 {
00295   return _local_directory;
00296 }
00297 
00298 std::string
00299 Launcher::Job::getResultDirectory()
00300 {
00301   return _result_directory;
00302 }
00303 
00304 const std::list<std::string> & 
00305 Launcher::Job::get_in_files()
00306 {
00307   return _in_files;
00308 }
00309 
00310 const std::list<std::string> & 
00311 Launcher::Job::get_out_files()
00312 {
00313   return _out_files;
00314 }
00315 
00316 std::string 
00317 Launcher::Job::getMaximumDuration()
00318 {
00319   return _maximum_duration;
00320 }
00321 
00322 resourceParams 
00323 Launcher::Job::getResourceRequiredParams()
00324 {
00325   return _resource_required_params;
00326 }
00327 
00328 std::string 
00329 Launcher::Job::getQueue()
00330 {
00331   return _queue;
00332 }
00333 
00334 void 
00335 Launcher::Job::checkMaximumDuration(const std::string & maximum_duration)
00336 {
00337   std::string result("");
00338   std::string edt_value = maximum_duration;
00339   std::size_t pos = edt_value.find(":");
00340 
00341   if (edt_value != "") {
00342     if (pos == edt_value.npos) {
00343       throw LauncherException("[Launcher::Job::checkMaximumDuration] Error on definition: " + edt_value);
00344     }
00345     std::string begin_edt_value = edt_value.substr(0, pos);
00346     std::string mid_edt_value = edt_value.substr(pos, 1);
00347     std::string end_edt_value = edt_value.substr(pos + 1, edt_value.npos);
00348   
00349     long value;
00350     std::istringstream iss(begin_edt_value);
00351     if (!(iss >> value)) {
00352       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
00353     }
00354     else if (value < 0) {
00355       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
00356     }
00357     std::istringstream iss_2(end_edt_value);
00358     if (!(iss_2 >> value)) {
00359       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! : " + edt_value;
00360     }
00361     else if (value < 0) {
00362       result = "[Launcher::Job::checkExpectedDuration] Error on definition time is negative ! : " + value;
00363     }
00364     if (mid_edt_value != ":") {
00365       result = "[Launcher::Job::checkExpectedDuration] Error on definition ! :" + edt_value;
00366     }
00367   }
00368   if (result != "")
00369     throw LauncherException(result);
00370 }
00371 
00372 void 
00373 Launcher::Job::checkResourceRequiredParams(const resourceParams & resource_required_params)
00374 {
00375   // nb_proc has be to > 0
00376   if (resource_required_params.nb_proc <= 0)
00377   {
00378     std::string message("[Launcher::Job::checkResourceRequiredParams] proc number is not > 0 ! ");
00379     throw LauncherException(message);
00380   }
00381 }
00382 
00383 long 
00384 Launcher::Job::convertMaximumDuration(const std::string & edt)
00385 {
00386   long hh, mm, ret;
00387 
00388   if( edt.size() == 0 )
00389     return -1;
00390 
00391   std::string::size_type pos = edt.find(":");
00392   std::string h = edt.substr(0,pos);
00393   std::string m = edt.substr(pos+1,edt.size()-pos+1);
00394   std::istringstream issh(h);
00395   issh >> hh;
00396   std::istringstream issm(m);
00397   issm >> mm;
00398   ret = hh*60 + mm;
00399   ret = ret * 60;
00400 
00401   return ret;
00402 }
00403 
00404 std::string 
00405 Launcher::Job::getLaunchDate()
00406 {
00407   time_t rawtime;
00408   time(&rawtime);
00409   std::string launch_date = ctime(&rawtime);
00410   int i = 0 ;
00411   for (;i < launch_date.size(); i++) 
00412     if (launch_date[i] == '/' ||
00413         launch_date[i] == '-' ||
00414         launch_date[i] == ':' ||
00415         launch_date[i] == ' ') 
00416       launch_date[i] = '_';
00417   launch_date.erase(--launch_date.end()); // Last caracter is a \n
00418 
00419   return launch_date;
00420 }
00421 
00422 std::string
00423 Launcher::Job::updateJobState()
00424 {
00425 
00426   if (_state != "FINISHED" &&
00427       _state != "ERROR"    &&
00428       _state != "FAILED")
00429   {
00430 #ifdef WITH_LIBBATCH
00431     if (_batch_job_id.getReference() != "undefined")
00432     {
00433       // A batch manager has been affected to the job
00434       Batch::JobInfo job_info = _batch_job_id.queryJob();
00435       Batch::Parametre par = job_info.getParametre();
00436       _state = par[Batch::STATE].str();
00437       LAUNCHER_MESSAGE("State received is: " << par[Batch::STATE].str());
00438     }
00439 #endif
00440   }
00441   return _state;
00442 }
00443 
00444 #ifdef WITH_LIBBATCH
00445 Batch::Job * 
00446 Launcher::Job::getBatchJob()
00447 {
00448   update_job();
00449   return _batch_job;
00450 }
00451 
00452 Batch::Parametre
00453 Launcher::Job::common_job_params()
00454 {
00455   Batch::Parametre params;
00456 
00457   params[Batch::NAME] = getJobName();
00458   params[Batch::USER] = _resource_definition.UserName;
00459   params[Batch::NBPROC] = _resource_required_params.nb_proc;
00460 
00461   // Memory in megabytes
00462   if (_resource_required_params.mem_mb > 0)
00463   {
00464     params[Batch::MAXRAMSIZE] = _resource_required_params.mem_mb;
00465   }
00466 
00467   // We define a default directory based on user time
00468   if (_work_directory == "")
00469   {
00470     std::string thedate;
00471     Batch::Date date = Batch::Date(time(0));
00472     thedate = date.str();
00473     int lend = thedate.size() ;
00474     int i = 0 ;
00475     while ( i < lend ) {
00476       if ( thedate[i] == '/' || thedate[i] == '-' || thedate[i] == ':' ) {
00477         thedate[i] = '_' ;
00478       }
00479       i++ ;
00480     }
00481     _work_directory = std::string("$HOME/Batch/");
00482     _work_directory += thedate;
00483   }
00484   params[Batch::WORKDIR] = _work_directory;
00485   params[Batch::TMPDIR] = _work_directory; // To Compatibility -- remove ??? TODO
00486 
00487   // If result_directory is not defined, we use HOME environnement
00488   if (_result_directory == "")
00489     _result_directory = getenv("HOME");
00490 
00491   // _in_files
00492   std::list<std::string> in_files(_in_files);
00493   in_files.push_back(_job_file);
00494   if (_env_file != "")
00495          in_files.push_back(_env_file);
00496   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
00497   {
00498     std::string file = *it;
00499 
00500     // local file -> If file is not an absolute path, we apply _local_directory
00501     std::string local_file;
00502     if (file.substr(0, 1) == std::string("/"))
00503       local_file = file;
00504     else
00505       local_file = _local_directory + "/" + file;
00506     
00507     // remote file -> get only file name from in_files
00508     size_t found = file.find_last_of("/");
00509     std::string remote_file = _work_directory + "/" + file.substr(found+1);
00510 
00511     params[Batch::INFILE] += Batch::Couple(local_file, remote_file);
00512   }
00513    
00514   // _out_files
00515   for(std::list<std::string>::iterator it = _out_files.begin(); it != _out_files.end(); it++)
00516   {
00517     std::string file = *it;
00518 
00519     // local file 
00520     size_t found = file.find_last_of("/");
00521     std::string local_file = _result_directory +  "/" + file.substr(found+1);
00522 
00523     // remote file -> If file is not an absolute path, we apply _work_directory
00524     std::string remote_file;
00525     if (file.substr(0, 1) == std::string("/"))
00526       remote_file = file;
00527     else
00528       remote_file = _work_directory + "/" + file;
00529 
00530     params[Batch::OUTFILE] += Batch::Couple(local_file, remote_file);
00531   }
00532 
00533   // Time
00534   if (_maximum_duration_in_second != -1)
00535     params[Batch::MAXWALLTIME] = _maximum_duration_in_second / 60;
00536 
00537   // Queue
00538   if (_queue != "")
00539     params[Batch::QUEUE] = _queue;
00540 
00541   // Specific parameters
00542   std::map<std::string, std::string>::iterator it = _specific_parameters.find("LoalLevelerJobType");
00543   if (it != _specific_parameters.end())
00544     params["LL_JOBTYPE"] = it->second;
00545   return params;
00546 }
00547 
00548 void 
00549 Launcher::Job::setBatchManagerJobId(Batch::JobId batch_manager_job_id)
00550 {
00551   _batch_job_id = batch_manager_job_id;
00552 }
00553 
00554 Batch::JobId 
00555 Launcher::Job::getBatchManagerJobId()
00556 {
00557   return _batch_job_id;
00558 }
00559 #endif
00560 
00561 void
00562 Launcher::Job::addToXmlDocument(xmlNodePtr root_node)
00563 {
00564   // Begin job
00565   xmlNodePtr job_node = xmlNewChild(root_node, NULL, xmlCharStrdup("job"), NULL);
00566   xmlNewProp(job_node, xmlCharStrdup("type"), xmlCharStrdup(getJobType().c_str()));
00567   xmlNewProp(job_node, xmlCharStrdup("name"), xmlCharStrdup(getJobName().c_str()));
00568 
00569   // Add user part
00570   xmlNodePtr node = xmlNewChild(job_node, NULL, xmlCharStrdup("user_part"), NULL);
00571 
00572   xmlNewChild(node, NULL, xmlCharStrdup("job_file"),         xmlCharStrdup(getJobFile().c_str()));
00573   xmlNewChild(node, NULL, xmlCharStrdup("env_file"),         xmlCharStrdup(getEnvFile().c_str()));
00574   xmlNewChild(node, NULL, xmlCharStrdup("work_directory"),   xmlCharStrdup(getWorkDirectory().c_str()));
00575   xmlNewChild(node, NULL, xmlCharStrdup("local_directory"),  xmlCharStrdup(getLocalDirectory().c_str()));
00576   xmlNewChild(node, NULL, xmlCharStrdup("result_directory"), xmlCharStrdup(getResultDirectory().c_str()));
00577 
00578   // Files
00579   xmlNodePtr files_node = xmlNewChild(node, NULL, xmlCharStrdup("files"), NULL);
00580   std::list<std::string> in_files  = get_in_files();
00581   std::list<std::string> out_files = get_out_files();
00582   for(std::list<std::string>::iterator it = in_files.begin(); it != in_files.end(); it++)
00583     xmlNewChild(files_node, NULL, xmlCharStrdup("in_file"), xmlCharStrdup((*it).c_str()));
00584   for(std::list<std::string>::iterator it = out_files.begin(); it != out_files.end(); it++)
00585     xmlNewChild(files_node, NULL, xmlCharStrdup("out_file"), xmlCharStrdup((*it).c_str()));
00586 
00587   // Resource part
00588   resourceParams resource_params = getResourceRequiredParams();
00589   xmlNodePtr res_node = xmlNewChild(node, NULL, xmlCharStrdup("resource_params"), NULL);
00590   xmlNewChild(res_node, NULL, xmlCharStrdup("name"),   xmlCharStrdup(resource_params.name.c_str()));
00591   xmlNewChild(res_node, NULL, xmlCharStrdup("hostname"),   xmlCharStrdup(resource_params.hostname.c_str()));
00592   xmlNewChild(res_node, NULL, xmlCharStrdup("OS"),   xmlCharStrdup(resource_params.OS.c_str()));
00593   std::ostringstream nb_proc_stream;
00594   std::ostringstream nb_node_stream;
00595   std::ostringstream nb_proc_per_node_stream;
00596   std::ostringstream cpu_clock_stream;
00597   std::ostringstream mem_mb_stream;
00598   nb_proc_stream << resource_params.nb_proc;
00599   nb_node_stream << resource_params.nb_node;
00600   nb_proc_per_node_stream << resource_params.nb_proc_per_node;
00601   cpu_clock_stream << resource_params.cpu_clock;
00602   mem_mb_stream << resource_params.mem_mb;
00603   xmlNewChild(res_node, NULL, xmlCharStrdup("nb_proc"),            xmlCharStrdup(nb_proc_stream.str().c_str()));
00604   xmlNewChild(res_node, NULL, xmlCharStrdup("nb_node"),            xmlCharStrdup(nb_node_stream.str().c_str()));
00605   xmlNewChild(res_node, NULL, xmlCharStrdup("nb_proc_per_node"),   xmlCharStrdup(nb_proc_per_node_stream.str().c_str()));
00606   xmlNewChild(res_node, NULL, xmlCharStrdup("cpu_clock"),          xmlCharStrdup(cpu_clock_stream.str().c_str()));
00607   xmlNewChild(res_node, NULL, xmlCharStrdup("mem_mb"),             xmlCharStrdup(mem_mb_stream.str().c_str()));
00608 
00609   xmlNewChild(node, NULL, xmlCharStrdup("maximum_duration"), xmlCharStrdup(getMaximumDuration().c_str()));
00610   xmlNewChild(node, NULL, xmlCharStrdup("queue"),            xmlCharStrdup(getQueue().c_str()));
00611 
00612   // Specific parameters part
00613   xmlNodePtr specific_parameters_node = xmlNewChild(node, NULL, xmlCharStrdup("specific_parameters"), NULL);
00614   std::map<std::string, std::string> specific_parameters = getSpecificParameters();
00615   for(std::map<std::string, std::string>::iterator it = specific_parameters.begin(); it != specific_parameters.end(); it++)
00616   {
00617     xmlNodePtr specific_parameter_node = xmlNewChild(specific_parameters_node, NULL, xmlCharStrdup("specific_parameter"), NULL);
00618     xmlNewChild(specific_parameter_node, NULL, xmlCharStrdup("name"), xmlCharStrdup((it->first).c_str()));
00619     xmlNewChild(specific_parameter_node, NULL, xmlCharStrdup("value"), xmlCharStrdup((it->second).c_str()));
00620   }
00621 
00622   // Run part
00623   xmlNodePtr run_node = xmlNewChild(job_node, NULL, xmlCharStrdup("run_part"), NULL);
00624   xmlNewChild(run_node, NULL, xmlCharStrdup("job_state"), xmlCharStrdup(getState().c_str()));
00625   ParserResourcesType resource_definition = getResourceDefinition();
00626   xmlNewChild(run_node, NULL, xmlCharStrdup("resource_choosed_name"), xmlCharStrdup(resource_definition.Name.c_str()));
00627 
00628 #ifdef WITH_LIBBATCH
00629   Batch::JobId job_id = getBatchManagerJobId();
00630   xmlNewChild(run_node, NULL, xmlCharStrdup("job_reference"), xmlCharStrdup(job_id.getReference().c_str()));
00631 #endif
00632 }
00633 
00634 void 
00635 Launcher::Job::addSpecificParameter(const std::string & name,
00636                                       const std::string & value)
00637 {
00638   _specific_parameters[name] = value;
00639 }
00640 
00641 const std::map<std::string, std::string> &
00642 Launcher::Job::getSpecificParameters()
00643 {
00644   return _specific_parameters;
00645 }
00646 
00647 void
00648 Launcher::Job::checkSpecificParameters()
00649 {
00650 }