Back to index

nordugrid-arc-nox  1.1.0~rc6
grid_sched.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <glibmm.h>
00006 #include <sys/stat.h>
00007 #include <sys/types.h>
00008 #include <unistd.h>
00009 #include <fstream>
00010 #include <iostream>
00011 
00012 #include <arc/URL.h>
00013 #include <arc/Thread.h>
00014 #include <arc/StringConv.h>
00015 #include <arc/message/PayloadSOAP.h>
00016 #include <arc/message/PayloadStream.h>
00017 #include <arc/ws-addressing/WSA.h>
00018 #ifdef WIN32
00019 #include <arc/win32.h>
00020 #endif
00021 
00022 #include "grid_sched.h"
00023 
00024 namespace GridScheduler {
00025 
00026 class StatusJobSelector: public Arc::JobSelector
00027 {
00028     private:
00029         Arc::SchedJobStatus status_;
00030     public:
00031         StatusJobSelector(Arc::SchedJobStatus status) { status_ = status; };
00032         virtual bool match(Arc::Job *j) { if (j->getStatus() == status_) { return true; }; return false; };
00033 };
00034 
// Compiled-out helpers (#if 0): a plain (non-SOAP) fault builder and a raw
// empty-response builder. They are not used; request handling below always
// reports errors through make_soap_fault().
#if 0
Arc::MCC_Status 
GridSchedulerService::make_fault(Arc::Message& /*outmsg*/) 
{
    return Arc::MCC_Status();
}

Arc::MCC_Status 
GridSchedulerService::make_response(Arc::Message& outmsg) 
{
    Arc::PayloadRaw* outpayload = new Arc::PayloadRaw();
    outmsg.Payload(outpayload);
    return Arc::MCC_Status(Arc::STATUS_OK);
}
#endif
00050 
00051 // Static initializator
00052 static Arc::Plugin *
00053 get_service(Arc::PluginArgument* arg) 
00054 {
00055     Arc::ServicePluginArgument* srvarg =
00056             arg?dynamic_cast<Arc::ServicePluginArgument*>(arg):NULL;
00057     if(!srvarg) return NULL;
00058     return new GridSchedulerService((Arc::Config*)(*srvarg));
00059 }
00060 
00061 // Create Faults
00062 Arc::MCC_Status 
00063 GridSchedulerService::make_soap_fault(Arc::Message& outmsg) 
00064 {
00065     Arc::PayloadSOAP* outpayload = new Arc::PayloadSOAP(ns_,true);
00066     Arc::SOAPFault* fault = outpayload?outpayload->Fault():NULL;
00067     if(fault) {
00068         fault->Code(Arc::SOAPFault::Sender);
00069         fault->Reason("Failed processing request");
00070     }
00071 
00072     outmsg.Payload(outpayload);
00073     return Arc::MCC_Status(Arc::STATUS_OK);
00074 }
00075 
00076 // Main process 
00077 Arc::MCC_Status 
00078 GridSchedulerService::process(Arc::Message& inmsg, Arc::Message& outmsg) 
00079 {
00080     // Both input and output are supposed to be SOAP
00081     // Extracting payload
00082     Arc::PayloadSOAP* inpayload = NULL;
00083     try {
00084         inpayload = dynamic_cast<Arc::PayloadSOAP*>(inmsg.Payload());
00085     } catch(std::exception& e) { };
00086     if(!inpayload) {
00087         logger_.msg(Arc::ERROR, "input is not SOAP");
00088         return make_soap_fault(outmsg);
00089     }
00090 
00091     inpayload->Namespaces(ns_);
00092     {   
00093         std::string str;
00094         inpayload->GetDoc(str, true);
00095         logger_.msg(Arc::VERBOSE, "process: request=%s",str);
00096     }; 
00097 
00098     // Get operation
00099     Arc::XMLNode op = inpayload->Child(0);
00100     if(!op) {
00101         logger_.msg(Arc::ERROR, "input does not define operation");
00102         return make_soap_fault(outmsg);
00103     }
00104     logger_.msg(Arc::VERBOSE, "process: operation: %s", op.Name());
00105     // BES Factory operations
00106     Arc::PayloadSOAP* outpayload = new Arc::PayloadSOAP(ns_);
00107     Arc::PayloadSOAP& res = *outpayload;
00108     Arc::MCC_Status ret;
00109     if(MatchXMLName(op, "CreateActivity")) {
00110         Arc::XMLNode r = res.NewChild("bes-factory:CreateActivityResponse");
00111         ret = CreateActivity(op, r);
00112     } else if(MatchXMLName(op, "GetActivityStatuses")) {
00113         Arc::XMLNode r = res.NewChild("bes-factory:GetActivityStatusesResponse");
00114         ret = GetActivityStatuses(op, r);
00115     } else if(MatchXMLName(op, "TerminateActivities")) {
00116         Arc::XMLNode r = res.NewChild("bes-factory:TerminateActivitiesResponse");
00117         ret = TerminateActivities(op, r);
00118     } else if(MatchXMLName(op, "GetActivityDocuments")) {
00119         Arc::XMLNode r = res.NewChild("bes-factory:GetActivityDocumentsResponse");
00120         ret = GetActivityDocuments(op, r);
00121     } else if(MatchXMLName(op, "GetFactoryAttributesDocument")) {
00122         Arc::XMLNode r = res.NewChild("bes-factory:GetFactoryAttributesDocumentResponse");
00123         ret = GetFactoryAttributesDocument(op, r);
00124     } else if(MatchXMLName(op, "StopAcceptingNewActivities")) {
00125         Arc::XMLNode r = res.NewChild("bes-mgmt:StopAcceptingNewActivitiesResponse");
00126         ret = StopAcceptingNewActivities(op, r);
00127     } else if(MatchXMLName(op, "StartAcceptingNewActivities")) {
00128         Arc::XMLNode r = res.NewChild("bes-mgmt:StartAcceptingNewActivitiesResponse");
00129         ret = StartAcceptingNewActivities(op, r);
00130     } else if(MatchXMLName(op, "ChangeActivityStatus")) {
00131         Arc::XMLNode r = res.NewChild("bes-factory:ChangeActivityStatusResponse");
00132         ret = ChangeActivityStatus(op, r);
00133     // iBES
00134     } else if(MatchXMLName(op, "GetActivities")) {
00135         Arc::XMLNode r = res.NewChild("ibes:GetActivitiesResponse");
00136         std::string resource_id = inmsg.Attributes()->get("TCP:REMOTEHOST");
00137         ret = GetActivities(op, r, resource_id);
00138     } else if(MatchXMLName(op, "ReportActivitiesStatus")) {
00139         Arc::XMLNode r = res.NewChild("ibes:ReportActivitiesStatusResponse");
00140         std::string resource_id = inmsg.Attributes()->get("TCP:REMOTEHOST");
00141         ret = ReportActivitiesStatus(op, r, resource_id);
00142     } else if(MatchXMLName(op, "GetActivitiesStatusChanges")) {
00143         Arc::XMLNode r = res.NewChild("ibes:GetActivitiesStatusChangesResponse");
00144         std::string resource_id = inmsg.Attributes()->get("TCP:REMOTEHOST");
00145         ret = GetActivitiesStatusChanges(op, r, resource_id);
00146     // Delegation
00147     } else if(MatchXMLName(op, "DelegateCredentialsInit")) {
00148         if(!delegations_.DelegateCredentialsInit(*inpayload,*outpayload)) {
00149           delete inpayload;
00150           return make_soap_fault(outmsg);
00151         };
00152     // WS-Property
00153     } else if(MatchXMLNamespace(op,"http://docs.oasis-open.org/wsrf/rp-2")) {
00154         Arc::SOAPEnvelope* out_ = infodoc_.Process(*inpayload);
00155         if(out_) {
00156           *outpayload=*out_;
00157           delete out_;
00158         } else {
00159           delete inpayload; delete outpayload;
00160           return make_soap_fault(outmsg);
00161         };
00162     // Undefined operation
00163     } else {
00164         logger_.msg(Arc::ERROR, "SOAP operation is not supported: %s", op.Name());
00165         return make_soap_fault(outmsg);
00166     
00167     }
00168     {
00169         // VERBOSE
00170         std::string str;
00171         outpayload->GetXML(str);
00172         logger_.msg(Arc::VERBOSE, "process: response=%s", str);
00173     }
00174     // Set output
00175     outmsg.Payload(outpayload);
00176     return Arc::MCC_Status(Arc::STATUS_OK);
00177 }
00178 
// One scheduling pass, run periodically by the sched() thread.
//
// Active steps:
//  1. Checkpoint the job queue database (jobq.checkpoint()).
//  2. Jobs in KILLING state that have no resource ID are switched to KILLED
//     and get the current time as end time. Jobs that are already bound to a
//     resource are left alone for now (the termination path is commented out).
//  3. Jobs in FINISHED, KILLED, FAILED or UNKNOWN state whose end time plus
//     lifetime_after_done lies in the past are removed from jobq.
// The two #if 0 regions hold the older submit/poll logic and are not compiled.
void GridSchedulerService::doSched(void)
{   
    logger_.msg(Arc::VERBOSE, "doSched");
    jobq.checkpoint();
    logger_.msg(Arc::VERBOSE, "jobq checkpoint done");
    // Disabled: legacy submission of NEW jobs to a randomly chosen A-REX.
#if 0
    // log status
    logger_.msg(Arc::VERBOSE, "Count of jobs: %i"
                " Count of resources: %i"
                " Scheduler period: %i"
                " Endpoint: %s"
                " DBPath: %s",
                sched_queue.size(), sched_resources.size(),
                getPeriod(), endpoint, db_path);

    // searching for new sched jobs:
    std::map<const std::string, Job *> new_jobs = sched_queue.getJobsWithState(NEW);
    // submit new jobs
    // XXX make it two step: collect job and mark them to going to submit, lock the queue until this, and do the submit after it
    std::map<const std::string, Job *>::iterator iter;
    for (iter = new_jobs.begin(); iter != new_jobs.end(); iter++) {
        const std::string &job_id = iter->first;
        logger_.msg(Arc::VERBOSE, "NEW job: %s", job_id);
        Resource &arex = sched_resources.random();
        Job *j = iter->second;
        Arc::XMLNode &jsdl = j->getJSDL();
        // XXX better error handling
        std::string arex_job_id = arex.CreateActivity(jsdl);
        logger_.msg(Arc::VERBOSE, "A-REX ID: %s", arex.getURL());
        if (arex_job_id != "") {
            j->setResourceJobID(arex_job_id);
            j->setResourceID(arex.getURL());
            j->setStatus(STARTING);
        } else {
            logger_.msg(Arc::VERBOSE, "Sched job ID: %s NOT SUBMITTED", job_id);
            sched_resources.refresh(arex.getURL());
        }
        j->save();
    }
#endif
    // Search for jobs that are being killed by the user.
    StatusJobSelector sel(Arc::JOB_STATUS_SCHED_KILLING);
    for (Arc::JobQueueIterator jobs = jobq.getAll((Arc::JobSelector *)&sel); jobs.hasMore(); jobs++){
        Arc::Job *j = *jobs;
        Arc::JobSchedMetaData *m = j->getJobSchedMetaData();
        // Not bound to any resource: nothing to cancel remotely, so the
        // job can be marked KILLED directly.
        if (m->getResourceID().empty()) {
            logger_.msg(Arc::VERBOSE, "%s set killed", j->getID());
            j->setStatus(Arc::JOB_STATUS_SCHED_KILLED);
            m->setEndTime(Arc::Time());
        } 
        /*
        else {
            Resource &arex = sched_resources.get(j->getResourceID());
            if (arex.TerminateActivity(arex_job_id)) {
                logger_.msg(Arc::VERBOSE, "JobID: %s KILLED", job_id);
                j->setStatus(KILLED);
                sched_queue.removeJob(job_id);
            }
        } */
        // NOTE(review): presumably writes the modified job back to jobq.
        jobs.refresh();
    }

    // Clean up jobq: drop jobs that reached a final state more than
    // lifetime_after_done ago (NOTE(review): presumably in seconds, as
    // interpreted by Arc::Period(int) — confirm).
    for (Arc::JobQueueIterator jobs = jobq.getAll(); jobs.hasMore(); jobs++)
    {
        Arc::Job *j = *jobs;
        Arc::JobSchedMetaData *m = j->getJobSchedMetaData();
        Arc::SchedJobStatus status = j->getStatus();
        if (status == Arc::JOB_STATUS_SCHED_FINISHED
            || status == Arc::JOB_STATUS_SCHED_KILLED
            || status == Arc::JOB_STATUS_SCHED_FAILED
            || status == Arc::JOB_STATUS_SCHED_UNKNOWN)
        {
            Arc::Period p(lifetime_after_done);
            Arc::Time now;
            if (now > (m->getEndTime() + p)) {
                logger_.msg(Arc::VERBOSE, "%s remove from queue", j->getID());
                // NOTE(review): the loop still executes jobs++ after remove();
                // whether that skips an entry depends on JobQueueIterator.
                jobs.remove();
            }
        }
    }

    // Disabled: legacy polling of A-REX instances for job states.
#if 0
    // query a-rexes for the job statuses:
    std::map<const std::string, Job *> all_job = sched_queue.getAllJobs();
    for (iter = all_job.begin(); iter != all_job.end(); iter++) {
        const std::string &job_id = iter->first;
        Job *j = iter->second;
        SchedStatusLevel job_stat = j->getStatus();
        // skip jobs with FINISHED state
        if (job_stat == FINISHED) {
            continue;
        }
        const std::string &arex_job_id = j->getResourceJobID();
        // skip unscheduled jobs
        if (arex_job_id.empty()) {
            logger_.msg(Arc::VERBOSE, "Sched job ID: %s (A-REX job ID is empty)", job_id);
            continue; 
        }
        // get state of the job at the resource
        Resource &arex = sched_resources.get(j->getResourceID());
        std::string state = arex.GetActivityStatus(arex_job_id);
        if (state == "UNKOWN") {
            if (!j->CheckTimeout()) {  // job timeout check
                j->setStatus(NEW);
                j->setResourceJobID("");
                j->setResourceID("");
                j->save();
                sched_resources.refresh(arex.getURL());
                // std::string url = arex.getURL();
                // sched_resources.removeResource(url);
                logger_.msg(Arc::VERBOSE, "Job RESCHEDULE: %s", job_id);
            }
        } else {
            // refresh status from A-REX state
            job_stat = sched_status_from_arex_status(state); 
            j->setStatus(job_stat);
            j->save();
            logger_.msg(Arc::VERBOSE, "JobID: %s state: %s", job_id, state);
        }
    }
#endif
}
00302 
00303 void sched(void* arg) 
00304 {
00305     GridSchedulerService *self = (GridSchedulerService*) arg;
00306     
00307     for(;;) {
00308         sleep(self->getPeriod());
00309         self->doSched();
00310     }
00311 }
00312 
00313 void
00314 GridSchedulerService::doReschedule(void)
00315 {
00316     logger_.msg(Arc::VERBOSE, "doReschedule");
00317     for (Arc::JobQueueIterator jobs = jobq.getAll(); jobs.hasMore(); jobs++){
00318         Arc::Job *j = *jobs;
00319         Arc::JobSchedMetaData *m = j->getJobSchedMetaData();
00320         Arc::Time now;
00321         Arc::Period p(reschedule_wait);
00322         m->setLastChecked(now);
00323         Arc::SchedJobStatus status = j->getStatus();
00324         if (status == Arc::JOB_STATUS_SCHED_FAILED ||
00325             status == Arc::JOB_STATUS_SCHED_NEW ||
00326             status == Arc::JOB_STATUS_SCHED_KILLING ||
00327             status == Arc::JOB_STATUS_SCHED_KILLED ||
00328             status == Arc::JOB_STATUS_SCHED_FINISHED) {
00329             // ignore this states
00330             jobs.refresh();
00331             continue;
00332         }
00333         logger_.msg(Arc::VERBOSE, "check: %s (%s - %s > %s (%s))", j->getID(), (std::string)now, (std::string)m->getLastChecked(), (std::string)(m->getLastUpdated() + p), (std::string)m->getLastUpdated());
00334         if (m->getLastChecked() > (m->getLastUpdated() + p)) {
00335             logger_.msg(Arc::VERBOSE, "Rescheduled job: %s", j->getID());
00336             j->setStatus(Arc::JOB_STATUS_SCHED_RESCHEDULED);
00337             m->setResourceID("");
00338         }
00339         jobs.refresh();
00340     }
00341 }
00342 
00343 void reschedule(void *arg)
00344 {
00345     GridSchedulerService *self = (GridSchedulerService *)arg;
00346     for (;;) {
00347         sleep(self->getReschedulePeriod());
00348         self->doReschedule();
00349     }
00350 }
00351 
00352 static void thread_starter(void* arg) {
00353   if(!arg) return;
00354   ((GridSchedulerService*)arg)->InformationCollector();
00355 }
00356 
00357 GridSchedulerService::GridSchedulerService(Arc::Config *cfg):RegisteredService(cfg),logger_(Arc::Logger::rootLogger, "GridScheduler") 
00358 {
00359     // Define supported namespaces
00360     // XXX use defs from ARC1 LIBS
00361     ns_["a-rex"]="http://www.nordugrid.org/schemas/a-rex";
00362     ns_["bes-factory"]="http://schemas.ggf.org/bes/2006/08/bes-factory";
00363     ns_["deleg"]="http://www.nordugrid.org/schemas/delegation";
00364     ns_["wsa"]="http://www.w3.org/2005/08/addressing";
00365     ns_["jsdl"]="http://schemas.ggf.org/jsdl/2005/11/jsdl";
00366     ns_["wsrf-bf"]="http://docs.oasis-open.org/wsrf/bf-2";
00367     ns_["wsrf-r"]="http://docs.oasis-open.org/wsrf/r-2";
00368     ns_["wsrf-rw"]="http://docs.oasis-open.org/wsrf/rw-2";
00369     ns_["ibes"]="http://www.nordugrid.org/schemas/ibes";
00370     ns_["sched"]="http://www.nordugrid.org/schemas/sched";
00371     ns_["bes-mgmt"]="http://schemas.ggf.org/bes/2006/08/bes-management";
00372     
00373     // Read configs 
00374     endpoint = (std::string)((*cfg)["Endpoint"]);
00375     period = Arc::stringtoi((std::string)((*cfg)["SchedulingPeriod"]));
00376     db_path = (std::string)((*cfg)["DataDirectoryPath"]);
00377     if (!Glib::file_test(db_path, Glib::FILE_TEST_IS_DIR)) {
00378         if (mkdir(db_path.c_str(), 0700) != 0) {
00379             logger.msg(Arc::ERROR, "cannot create directory: %s", db_path);
00380             return;
00381         }
00382     }
00383     try {
00384         jobq.init(db_path, "jobq");
00385     } catch (std::exception &e) {
00386         logger_.msg(Arc::ERROR, "Error during database open: %s", e.what());
00387         return;
00388     }
00389     timeout = Arc::stringtoi((std::string)((*cfg)["Timeout"]));
00390     reschedule_period = Arc::stringtoi((std::string)((*cfg)["ReschedulePeriod"]));
00391     lifetime_after_done = Arc::stringtoi((std::string)((*cfg)["LifetimeAfterDone"]));  
00392     reschedule_wait = Arc::stringtoi((std::string)((*cfg)["RescheduleWaitTime"]));  
00393     cli_config["CertificatePath"] = (std::string)((*cfg)["arccli:CertificatePath"]);
00394     cli_config["PrivateKey"] = (std::string)((*cfg)["arccli:PrivateKey"]);  
00395     cli_config["CACertificatePath"] = (std::string)((*cfg)["arccli:CACertificatePath"]);  
00396     IsAcceptingNewActivities = true;
00397 
00398     CreateThreadFunction(&thread_starter,this);
00399   
00400     if (period > 0) { 
00401         // start scheduler thread
00402         Arc::CreateThreadFunction(&sched, this);
00403     }
00404     if (reschedule_period > 0) {
00405         // Rescheduler thread
00406         Arc::CreateThreadFunction(&reschedule, this);
00407     }
00408 
00409 }
00410 
00411 // Destructor
00412 GridSchedulerService::~GridSchedulerService(void) 
00413 {
00414     // NOP
00415 }
00416 
00417 } // namespace GridScheduler
00418 
// Plugin descriptor table exported by this module. It registers the factory
// GridScheduler::get_service under the name "grid_sched" with plugin kind
// "HED:SERVICE" (third field 0). The all-NULL entry is the end-of-table
// sentinel (NOTE(review): presumably required by the plugin loader).
Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
    { "grid_sched", "HED:SERVICE", 0, &GridScheduler::get_service },
    { NULL, NULL, 0, NULL }
};