Back to index

nordugrid-arc-nox  1.1.0~rc6
paul.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <glibmm.h>
00006 #include <sys/stat.h>
00007 #include <sys/types.h>
00008 #include <unistd.h>
00009 #include <fstream>
00010 #include <iostream>
00011 #include <unistd.h>
00012 
00013 #include <arc/URL.h>
00014 #include <arc/Run.h>
00015 #include <arc/message/PayloadSOAP.h>
00016 #include <arc/message/PayloadRaw.h>
00017 #include <arc/message/PayloadStream.h>
00018 #include <arc/ws-addressing/WSA.h>
00019 #include <arc/Thread.h>
00020 #include <arc/StringConv.h>
00021 #include <arc/client/ClientInterface.h>
00022 #ifdef WIN32
00023 #include <arc/win32.h>
00024 #endif
00025 
00026 #include "paul.h"
00027 
00028 namespace Paul {
00029 
00030 // Static initializator
00031 static Arc::Plugin* get_service(Arc::PluginArgument* arg) 
00032 {
00033     Arc::ServicePluginArgument* srvarg =
00034             arg?dynamic_cast<Arc::ServicePluginArgument*>(arg):NULL;
00035     if(!srvarg) return NULL;
00036     return new PaulService((Arc::Config*)(*srvarg));
00037 }
00038 
00039 void PaulService::GetActivities(const std::string &url_str, std::vector<std::string> &ret)
00040 {
00041     // Collect information about resources
00042     // and create Glue compatibile Resource description
00043     Arc::NS glue2_ns;
00044     glue2_ns["glue2"] = ns_["glue2"];
00045     Arc::XMLNode glue2(glue2_ns, "Domains");
00046     if (information_collector(glue2) == false) {
00047         logger_.msg(Arc::ERROR, "Cannot collect resource information");
00048         return;
00049     }
00050     {
00051         std::string str;
00052         glue2.GetDoc(str);
00053         logger_.msg(Arc::VERBOSE, str);
00054     }
00055     // Create client to url
00056     Arc::ClientSOAP *client;
00057     Arc::MCCConfig cfg;
00058     Arc::URL url(url_str);
00059     if (url.Protocol() == "https") {
00060         cfg.AddPrivateKey(configurator.getPki()["PrivateKey"]);
00061         cfg.AddCertificate(configurator.getPki()["CertificatePath"]);
00062         cfg.AddCAFile(configurator.getPki()["CACertificatePath"]);
00063         cfg.AddCADir(configurator.getPki()["CACertificatesDir"]);
00064     }
00065     client = new Arc::ClientSOAP(cfg, url, 60);
00066     // invoke GetActivity SOAP call
00067     Arc::PayloadSOAP request(ns_);
00068     request.NewChild("ibes:GetActivities").NewChild(glue2);
00069     Arc::PayloadSOAP *response;
00070     Arc::MCC_Status status = client->process(&request, &response);
00071     if (!status) {
00072         logger_.msg(Arc::ERROR, "Request failed");
00073         if (response) {
00074            std::string str;
00075            response->GetXML(str);
00076            logger_.msg(Arc::ERROR, str);
00077            delete response;
00078         }
00079         delete client;
00080         return;
00081     }
00082     if (!response) {
00083         logger_.msg(Arc::ERROR, "No response");
00084         delete client;
00085         return;
00086     }
00087     
00088     // Handle soap level error
00089     Arc::XMLNode fs;
00090     (*response)["Fault"]["faultstring"].New(fs);
00091     std::string faultstring = (std::string)fs;
00092     if (faultstring != "") {
00093         logger_.msg(Arc::ERROR, faultstring);
00094         delete response;
00095         delete client;
00096         return;
00097     }
00098     delete client;
00099     // Create jobs from response
00100     Arc::XMLNode activities;
00101     activities = (*response)["ibes:GetActivitiesResponse"]["ibes:Activities"];
00102     Arc::XMLNode activity;
00103     for (int i = 0; (activity = activities.Child(i)) != false; i++) {
00104         Arc::XMLNode id = activity["ActivityIdentifier"];
00105         if (!id) {
00106             logger_.msg(Arc::VERBOSE, "Missing job identifier");
00107             continue;
00108         }
00109         Arc::WSAEndpointReference epr(id);
00110         std::string job_id = epr.ReferenceParameters()["sched:JobID"];
00111         if (job_id.empty()) {
00112             logger_.msg(Arc::VERBOSE, "Cannot find job id");
00113             continue;
00114         }
00115         std::string resource_id = epr.Address();
00116         if (resource_id.empty()) {
00117             logger_.msg(Arc::VERBOSE, "Cannot find scheduler endpoint");
00118             continue;
00119         }
00120         Arc::XMLNode jsdl = activity["ActivityDocument"]["JobDefinition"];
00121         JobRequest jr(jsdl);
00122         Job j(jr);
00123         j.setStatus(NEW);
00124         j.setID(job_id);
00125         j.setResourceID(resource_id); // here the resource is the scheduler endpoint
00126         ret.push_back(job_id);
00127         jobq.addJob(j);
00128         std::string s = sched_status_to_string(j.getStatus());
00129         logger_.msg(Arc::VERBOSE, "Status: %s %d",s,j.getStatus());
00130         // j.save();
00131     }
00132     delete response;
00133 }
00134 
00135 typedef struct {
00136     PaulService *self;
00137     std::string *job_id;
00138 } ServiceAndJob;
00139 
00140 void PaulService::process_job(void *arg)
00141 {
00142     ServiceAndJob &info = *((ServiceAndJob *)arg);
00143     PaulService &self = *(info.self);
00144     Job &j = self.jobq[*(info.job_id)];
00145     self.logger_.msg(Arc::VERBOSE, "Process job: %s", j.getID());
00146     j.setStatus(STARTING);
00147     self.stage_in(j);
00148     self.run(j);
00149     if (!self.in_shutdown) {
00150         self.stage_out(j);
00151         if (j.getStatus() != KILLED && j.getStatus() != KILLING && j.getStatus() != FAILED) {
00152             self.logger_.msg(Arc::VERBOSE, "%s set finished", j.getID());
00153             j.setStatus(FINISHED);
00154         }
00155     }
00156     // free memory
00157     delete info.job_id;
00158     delete &info;
00159     self.logger_.msg(Arc::VERBOSE, "Finished job %s", j.getID());
00160 }
00161 
00162 void PaulService::do_request(void)
00163 {
00164     // XXX pickup scheduler randomly from schdeuler list
00165     std::vector<std::string> schedulers = configurator.getSchedulers();
00166     if (schedulers.size() == 0) {
00167         logger_.msg(Arc::WARNING, "No scheduler configured");
00168         return;
00169     }
00170     std::string url = schedulers[0];
00171     // XXX check if there is no scheduler
00172     logger_.msg(Arc::VERBOSE, "Do Request: %s", url);
00173     // check if there is no free CPU slot
00174     int active_job = 0;
00175     std::map<const std::string, Job *> all = jobq.getAllJobs();
00176     std::map<const std::string, Job *>::iterator it;
00177     for (it = all.begin(); it != all.end(); it++) {
00178         Job *j = it->second;
00179         SchedStatusLevel status = j->getStatus();
00180         if (status == NEW || status == STARTING || status == RUNNING) {
00181             active_job++;
00182         }
00183     }
00184     int cpu_num = sysinfo.getPhysicalCPUs();
00185     if (active_job >= cpu_num) {
00186         logger_.msg(Arc::VERBOSE, "No free CPU slot");
00187         return;
00188     }
00189     std::vector<std::string> job_ids;
00190     GetActivities(url, job_ids);
00191     for (int i = 0; i < job_ids.size(); i++) {
00192         ServiceAndJob *arg = new ServiceAndJob;
00193         arg->self = this;
00194         arg->job_id = new std::string(job_ids[i]);
00195         Arc::CreateThreadFunction(&process_job, arg);
00196     }
00197 }
00198 
00199 // Main request loop
00200 void PaulService::request_loop(void* arg) 
00201 {
00202     PaulService *self = (PaulService *)arg;
00203     
00204     for (;;) {
00205         self->do_request();
00206         int p = self->configurator.getPeriod();
00207         self->logger_.msg(Arc::VERBOSE, "Per: %d", p);
00208         sleep(p);       
00209     }
00210 }
00211 
00212 // Report status of jobs
00213 void PaulService::do_report(void)
00214 {
00215     logger_.msg(Arc::VERBOSE, "Report status");
00216     std::map<const std::string, Job *> all = jobq.getAllJobs();
00217     std::map<const std::string, Job *>::iterator it;
00218     std::map<std::string, Arc::PayloadSOAP *> requests;
00219 
00220     for (it = all.begin(); it != all.end(); it++) {
00221         Job *j = it->second;
00222         std::string sched_url = j->getResourceID();
00223         Arc::XMLNode report;
00224         std::map<std::string, Arc::PayloadSOAP *>::iterator r = requests.find(sched_url);
00225         if (r == requests.end()) {
00226             Arc::PayloadSOAP *request = new Arc::PayloadSOAP(ns_);
00227             report = request->NewChild("ibes:ReportActivitiesStatus");
00228             requests[sched_url] = request;
00229         } else {
00230             report = (*r->second)["ibes:ReportActivitiesStatus"];
00231         }
00232         
00233         Arc::XMLNode activity = report.NewChild("ibes:Activity");
00234         
00235         // request
00236         Arc::WSAEndpointReference identifier(activity.NewChild("ibes:ActivityIdentifier"));
00237         identifier.Address(j->getResourceID()); // address of scheduler service
00238         identifier.ReferenceParameters().NewChild("sched:JobID") = j->getID();
00239 
00240         Arc::XMLNode state = activity.NewChild("ibes:ActivityStatus");
00241         std::string s = sched_status_to_string(j->getStatus());
00242         logger.msg(Arc::VERBOSE, "%s reported %s", j->getID(), s);
00243         state.NewAttribute("ibes:state") = s;
00244         
00245     }
00246     
00247     Arc::MCCConfig cfg;
00248     Arc::ClientSOAP *client;
00249 
00250     std::map<std::string, Arc::PayloadSOAP *>::iterator i;
00251     for (i = requests.begin(); i != requests.end(); i++) {
00252         std::string url_str = i->first;
00253         Arc::PayloadSOAP *request = i->second;
00254         Arc::URL url(url_str);
00255         if (url.Protocol() == "https") {
00256             cfg.AddPrivateKey(configurator.getPki()["PrivateKey"]);
00257             cfg.AddCertificate(configurator.getPki()["CertificatePath"]);
00258             cfg.AddCAFile(configurator.getPki()["CACertificatePath"]);
00259             cfg.AddCADir(configurator.getPki()["CACertificatesDir"]);
00260         }
00261         client = new Arc::ClientSOAP(cfg, url, 60);
00262 
00263         Arc::PayloadSOAP *response;
00264         Arc::MCC_Status status = client->process(request, &response);
00265         if (!status) {
00266             logger_.msg(Arc::ERROR, "Request failed");
00267             if (response) {
00268                 std::string str;
00269                 response->GetXML(str);
00270                 logger_.msg(Arc::ERROR, str);
00271                 delete response;
00272             }
00273             delete client;
00274             continue;
00275         }
00276         if (!response) {
00277             logger_.msg(Arc::ERROR, "No response");
00278             delete response;
00279             delete client;
00280             continue;
00281         }
00282     
00283         // Handle soap level error
00284         Arc::XMLNode fs;
00285         (*response)["Fault"]["faultstring"].New(fs);
00286         std::string faultstring = (std::string)fs;
00287         if (faultstring != "") {
00288             logger_.msg(Arc::ERROR, faultstring);
00289             delete response;
00290             delete client;
00291             continue;
00292         }
00293         delete response;
00294         delete client;
00295         // delete client;
00296         // mark all finsihed job as sucessfully reported finished job
00297         Arc::XMLNode req = (*request)["ibes:ReportActivitiesStatus"];
00298         Arc::XMLNode activity;
00299         for (int i = 0; (activity = req["Activity"][i]) != false; i++) {
00300             Arc::XMLNode id = activity["ActivityIdentifier"];
00301             Arc::WSAEndpointReference epr(id);
00302             std::string job_id = epr.ReferenceParameters()["sched:JobID"];
00303             if (job_id.empty()) {
00304                 logger_.msg(Arc::ERROR, "Cannot find job id");
00305                 continue;
00306             }
00307             logger_.msg(Arc::VERBOSE, "%s reported", job_id);
00308             Job &j = jobq[job_id];
00309             if (j.getStatus() == FINISHED || j.getStatus() == FAILED) {
00310                 logger_.msg(Arc::VERBOSE, "%s job reported finished", j.getID());
00311                 j.finishedReported();
00312             }
00313         }
00314     }
00315     // free
00316     for (i = requests.begin(); i != requests.end(); i++) {
00317         delete i->second;
00318     }
00319 }
00320 
00321 void PaulService::do_action(void)
00322 {
00323     logger_.msg(Arc::VERBOSE, "Get activity status changes");   
00324     std::map<const std::string, Job *> all = jobq.getAllJobs();
00325     std::map<const std::string, Job *>::iterator it;
00326     std::map<std::string, Arc::PayloadSOAP *> requests;
00327     // collect schedulers 
00328     for (it = all.begin(); it != all.end(); it++) {
00329         Job *j = it->second;
00330         std::string sched_url = j->getResourceID();
00331         Arc::XMLNode get;
00332         std::map<std::string, Arc::PayloadSOAP *>::iterator r = requests.find(sched_url);
00333         if (r == requests.end()) {
00334             Arc::PayloadSOAP *request = new Arc::PayloadSOAP(ns_);
00335             get = request->NewChild("ibes:GetActivitiesStatusChanges");
00336             requests[sched_url] = request;
00337         } else {
00338             get = (*r->second)["ibes:GetActivitiesStatusChanges"];
00339         }
00340         // Make response
00341         Arc::WSAEndpointReference identifier(get.NewChild("ibes:ActivityIdentifier"));
00342         identifier.Address(j->getResourceID()); // address of scheduler service
00343         identifier.ReferenceParameters().NewChild("sched:JobID") = j->getID();
00344     }
00345     
00346     Arc::MCCConfig cfg;
00347     // call get activitiy changes to all scheduler
00348     std::map<std::string, Arc::PayloadSOAP *>::iterator i;
00349     for (i = requests.begin(); i != requests.end(); i++) {
00350         std::string sched_url = i->first;
00351         Arc::PayloadSOAP *request = i->second;
00352         Arc::ClientSOAP *client;
00353         Arc::URL url(sched_url);
00354         if (url.Protocol() == "https") {
00355             cfg.AddPrivateKey(configurator.getPki()["PrivateKey"]);
00356             cfg.AddCertificate(configurator.getPki()["CertificatePath"]);
00357             cfg.AddCAFile(configurator.getPki()["CACertificatePath"]);
00358             cfg.AddCADir(configurator.getPki()["CACertificatesDir"]);
00359         }
00360         client = new Arc::ClientSOAP(cfg, url, 60);
00361         Arc::PayloadSOAP *response;
00362         Arc::MCC_Status status = client->process(request, &response);
00363         if (!status) {
00364             logger_.msg(Arc::ERROR, "Request failed");
00365             if (response) {
00366                 std::string str;
00367                 response->GetXML(str);
00368                 logger_.msg(Arc::ERROR, str);
00369                 delete response;
00370             }
00371             delete client;
00372             continue;
00373         }
00374         if (!response) {
00375             logger_.msg(Arc::ERROR, "No response");
00376             delete response;
00377             delete client;
00378             continue;
00379         }
00380     
00381         // Handle soap level error
00382         Arc::XMLNode fs;
00383         (*response)["Fault"]["faultstring"].New(fs);
00384         std::string faultstring = (std::string)fs;
00385         if (faultstring != "") {
00386             logger_.msg(Arc::ERROR, faultstring);
00387             delete response;
00388             delete client;
00389             continue;
00390         }
00391         delete client;
00392         // process response
00393         Arc::XMLNode activities = (*response)["ibes:GetActivitiesStatusChangesResponse"]["Activities"];
00394         Arc::XMLNode activity;
00395         for (int i = 0; (activity = activities["Activity"][i]) != false; i++) {
00396             Arc::XMLNode id = activity["ActivityIdentifier"];
00397             Arc::WSAEndpointReference epr(id);
00398             std::string job_id = epr.ReferenceParameters()["sched:JobID"];
00399             if (job_id.empty()) {
00400                 logger_.msg(Arc::WARNING, "Cannot find job id");
00401             }
00402             std::string new_status = (std::string)activity["NewState"];
00403             Job &j = jobq[job_id];
00404             if (j.isFinishedReported()) {
00405                 // skip job which was already finished
00406                 continue;
00407             }
00408             logger.msg(Arc::VERBOSE, "%s new status: %s", j.getID(), new_status);
00409             j.setStatus(sched_status_from_string(new_status));
00410             // do actions
00411             if (j.getStatus() == KILLED) { 
00412                 j.setStatus(FINISHED);
00413             }
00414             if (j.getStatus() == KILLING) {
00415                 Arc::Run *run = runq[job_id];
00416                 if (run != NULL) {
00417                     logger_.msg(Arc::VERBOSE, "Killing %s", job_id);
00418                     run->Kill(1);
00419                 }
00420                 j.setStatus(KILLED);
00421             }
00422         }
00423         delete response;
00424     }
00425     // free
00426     for (i = requests.begin(); i != requests.end(); i++) {
00427         delete i->second;
00428     }
00429     // cleanup finished process
00430     for (it = all.begin(); it != all.end(); it++) {
00431         Job *j = it->second;
00432         logger_.msg(Arc::VERBOSE, "pre cleanup %s %d", j->getID(), j->getStatus());
00433         if (j->getStatus() == FINISHED || j->getStatus() == FAILED) {
00434             // do clean if and only if the finished state already reported
00435             if (j->isFinishedReported()) {
00436                 logger_.msg(Arc::VERBOSE, "cleanup %s", j->getID());
00437                 j->clean(configurator.getJobRoot());
00438                 logger_.msg(Arc::VERBOSE, "cleanup 2 %s", j->getID());
00439                 jobq.removeJob(*j);
00440             }           
00441         } 
00442     }
00443 
00444 }
00445 
00446 // Main reported loop
00447 void PaulService::report_and_action_loop(void *arg)
00448 {
00449     PaulService *self = (PaulService *)arg;
00450     for (;;) {
00451         self->do_report();
00452         self->do_action();
00453         int p = (int)(self->configurator.getPeriod()*1.1);
00454         sleep(p);
00455     }
00456 }
00457 
00458 // Constructor
00459 PaulService::PaulService(Arc::Config *cfg):RegisteredService(cfg),in_shutdown(false),logger_(Arc::Logger::rootLogger, "Paul"),configurator(cfg)
00460 {
00461     // Define supported namespaces
00462     ns_["ibes"] = "http://www.nordugrid.org/schemas/ibes";
00463     ns_["glue2"] = "http://schemas.ogf.org/glue/2008/05/spec_2.0_d42_r1";
00464     ns_["sched"] = "http://www.nordugrid.org/schemas/sched";
00465     ns_["wsa"] = "http://www.w3.org/2005/08/addressing";
00466 
00467     configurator.setJobQueue(&jobq);
00468     // Start sched thread
00469     Arc::CreateThreadFunction(&request_loop, this);
00470     // Start report and action thread
00471     Arc::CreateThreadFunction(&report_and_action_loop, this);
00472 }
00473 
00474 // Destructor
00475 PaulService::~PaulService(void) 
00476 {
00477     in_shutdown = true;
00478     logger_.msg(Arc::VERBOSE, "PaulService shutdown");
00479     std::map<std::string, Arc::Run *>::iterator it;
00480     for (it = runq.begin(); it != runq.end(); it++) {
00481         if (it->second != NULL) {
00482             logger_.msg(Arc::VERBOSE, "Terminate job %s", it->first);
00483             Arc::Run *r = it->second;
00484             r->Kill(1);
00485         }
00486     }
00487 }
00488 
00489 Arc::MCC_Status PaulService::process(Arc::Message &in, Arc::Message &out)
00490 {
00491     return configurator.process(in, out);
00492 }
00493 
00494 } // namespace Paul
00495 
00496 Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
00497     { "paul", "HED:SERVICE", 0, &Paul::get_service },
00498     { NULL, NULL, 0, NULL }
00499 };