Back to index

nordugrid-arc-nox  1.1.0~rc6
ibes.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include "grid_sched.h"
00006 
00007 namespace GridScheduler {
00008 
00009 class MatchSelector: public Arc::JobSelector
00010 {
00011     private:
00012         Arc::XMLNode resource_desc_;
00013         Arc::Logger logger_;
00014     public:
00015         MatchSelector(Arc::XMLNode &rd):logger_(Arc::Logger::rootLogger, "MatchSelector") { resource_desc_ = rd; };
00016         bool match_application_environment(Arc::Job *j);
00017         virtual bool match(Arc::Job *j);
00018 };
00019 
00020 bool
00021 MatchSelector::match_application_environment(Arc::Job *j)
00022 {
00023     // match runtime environment
00024     Arc::XMLNode job_resources = j->getJSDL()["JobDescription"]["Resources"];
00025     Arc::XMLNode job_rt;
00026     Arc::XMLNode app_envs = resource_desc_["AdminDomain"]["Services"]["ComputingService"]["ComputingManager"]["ApplicationEnvironments"];
00027     {
00028         std::string s;
00029         resource_desc_.GetXML(s);
00030         logger_.msg(Arc::VERBOSE, s);
00031         s = "";
00032         job_resources.GetXML(s);
00033         logger_.msg(Arc::VERBOSE, s);
00034         s = "";
00035         app_envs.GetXML(s);
00036         logger_.msg(Arc::VERBOSE, s);
00037     }
00038     int match_req = job_resources.Size();
00039     int matched = 0;
00040     Arc::XMLNode ae;
00041     for (int i = 0; (job_rt = job_resources["RunTimeEnvironment"][i]) != false; i++)
00042     {
00043         std::string name = (std::string)job_rt["Name"];
00044         std::string version = (std::string)job_rt["Version"];
00045         for (int j = 0; (ae = app_envs["ApplicationEnvironment"][j]) != false; j++) {
00046             std::string ae_name = (std::string)ae["Name"];
00047             std::string ae_version = (std::string)ae["Version"];
00048             if (ae_name == name && ae_version == version) {
00049                 matched++;
00050             }
00051         }
00052     }
00053 
00054     logger_.msg(Arc::VERBOSE, "%d <> %d", match_req, matched);
00055     if (match_req == matched) {
00056         return true;
00057     }
00058 
00059     return false;
00060 }
00061 
00062 bool
00063 MatchSelector::match(Arc::Job *j)
00064 {
00065     Arc::SchedJobStatus status = j->getStatus();
00066     if (status != Arc::JOB_STATUS_SCHED_NEW 
00067         && status != Arc::JOB_STATUS_SCHED_RESCHEDULED) {
00068         return false;
00069     }
00070     return match_application_environment(j);
00071 }
00072 
00073 Arc::MCC_Status 
00074 GridSchedulerService::GetActivities(Arc::XMLNode &in, Arc::XMLNode &out, const std::string &resource_id) 
00075 {
00076     {
00077         std::string s;
00078         in.GetXML(s);
00079         logger_.msg(Arc::VERBOSE, s);
00080     }
00081     Arc::XMLNode activities = out.NewChild("ibes:Activities");
00082     // create resource
00083     if (resource_id.empty()) {
00084         logger_.msg(Arc::WARNING, "Cannot get resource ID");
00085         return Arc::MCC_Status(Arc::STATUS_OK);
00086     }
00087     
00088     // XXX error handling
00089     Arc::Job *job = NULL;
00090     Arc::XMLNode domain = in.Child(0);
00091     MatchSelector *selector = new MatchSelector(domain);
00092     for (Arc::JobQueueIterator jobs = jobq.getAll((Arc::JobSelector *)selector); 
00093          jobs.hasMore(); jobs++){
00094         Arc::Job *j = *jobs;
00095         Arc::XMLNode a = activities.NewChild("ibes:Activity");
00096     
00097         // Make job's ID
00098         Arc::WSAEndpointReference identifier(a.NewChild("ibes:ActivityIdentifier"));
00099         identifier.Address(endpoint); // address of this service
00100         identifier.ReferenceParameters().NewChild("sched:JobID") = j->getID();
00101         Arc::XMLNode activity_doc = a.NewChild("ibes:ActivityDocument");
00102         activity_doc.NewChild(j->getJSDL());
00103         j->setStatus(Arc::JOB_STATUS_SCHED_STARTING);
00104         // set job scheduling meta data
00105         Arc::JobSchedMetaData *m = j->getJobSchedMetaData();
00106         m->setResourceID(resource_id);
00107         Arc::Time now;
00108         m->setLastUpdated(now);
00109         m->setLastChecked(now);
00110         // save job state
00111         if (jobs.refresh() == true) {
00112             // XXX only one job 
00113             return Arc::MCC_Status(Arc::STATUS_OK);
00114         } else {
00115             return Arc::MCC_Status();
00116         }
00117     }
00118     logger_.msg(Arc::VERBOSE, "NO job");
00119     return Arc::MCC_Status(Arc::STATUS_OK);
00120 }
00121 
00122 Arc::MCC_Status 
00123 GridSchedulerService::GetActivitiesStatusChanges(Arc::XMLNode &in, Arc::XMLNode &out, const std::string &resource_id) 
00124 {
00125     Arc::XMLNode id_node;
00126     Arc::XMLNode activities = out.NewChild("ibes:Activities");
00127     for (int i = 0; (id_node = in["ibes:ActivityIdentifier"][i]) != false; i++) {
00128         Arc::WSAEndpointReference id(id_node);
00129         std::string job_id = id.ReferenceParameters()["sched:JobID"];
00130         if (job_id.empty()) {
00131             logger_.msg(Arc::VERBOSE, "invalid job id");
00132             continue;
00133         }
00134         Arc::XMLNode activity = activities.NewChild("ibes:Activity");
00135         activity.NewChild(id_node); // copy identifier from inut
00136         Arc::XMLNode new_state = activity.NewChild("ibes:NewState");
00137         try {
00138             Arc::Job *j = jobq[job_id];
00139             Arc::SchedJobStatus status = j->getStatus();
00140             if (status == Arc::JOB_STATUS_SCHED_RESCHEDULED) {
00141                 new_state = Arc::sched_status_to_string(Arc::JOB_STATUS_SCHED_KILLING);
00142             } else {
00143                 new_state = Arc::sched_status_to_string(j->getStatus());
00144             }
00145             delete j;
00146         } catch (Arc::JobNotFoundException &e) {
00147             logger_.msg(Arc::ERROR, "Cannot find job id: %s", job_id);
00148             // said kill job ?
00149         }
00150     }
00151     return Arc::MCC_Status(Arc::STATUS_OK);
00152 }
00153 
00154 Arc::MCC_Status 
00155 GridSchedulerService::ReportActivitiesStatus(Arc::XMLNode &in, Arc::XMLNode &/*out*/, const std::string &resource_id) 
00156 {
00157     Arc::XMLNode activity;
00158     for (int i = 0; (activity = in["Activity"][i]) != false; i++) {
00159         Arc::XMLNode id = activity["ActivityIdentifier"];
00160         Arc::WSAEndpointReference epr(id);
00161         std::string job_id = epr.ReferenceParameters()["sched:JobID"];
00162         if (job_id.empty()) {
00163             logger_.msg(Arc::ERROR, "Cannot find job id");
00164             continue;
00165         }
00166         Arc::XMLNode status = activity["ActivityStatus"];
00167         Arc::XMLNode state = status.Attribute("state");
00168         if (!state) {
00169             logger_.msg(Arc::ERROR, "Invalid status report");
00170             continue;
00171         }
00172         try {
00173             Arc::Job *j = jobq[job_id];
00174             Arc::JobSchedMetaData *m = j->getJobSchedMetaData();
00175             if (m->getResourceID() != resource_id) {
00176                 logger_.msg(Arc::WARNING, "%s reports job status of %s but it is running on %s", resource_id, j->getID(), m->getResourceID());
00177                 delete j;
00178                 continue;
00179             }
00180             Arc::SchedJobStatus old_status = j->getStatus();
00181             Arc::SchedJobStatus new_status = Arc::sched_status_from_string((std::string)state);
00182             logger_.msg(Arc::VERBOSE, "%s try to status change: %s->%s", 
00183                         j->getID(),
00184                         Arc::sched_status_to_string(j->getStatus()), 
00185                         (std::string)state);
00186             if (j->getStatus() != Arc::JOB_STATUS_SCHED_KILLING
00187                 || j->getStatus() != Arc::JOB_STATUS_SCHED_RESCHEDULED
00188                 || new_status == Arc::JOB_STATUS_SCHED_KILLED) {
00189                 // do not update job which was requested to kill or rescheduled
00190                 j->setStatus(new_status);
00191             }
00192             // update times
00193             Arc::Time now;
00194             m->setLastUpdated(now);
00195             if (new_status == Arc::JOB_STATUS_SCHED_RUNNING 
00196                 && (old_status == Arc::JOB_STATUS_SCHED_NEW 
00197                     || old_status == Arc::JOB_STATUS_SCHED_STARTING)) {
00198                 m->setStartTime(now);
00199             }
00200             if (new_status == Arc::JOB_STATUS_SCHED_FINISHED 
00201                 || new_status == Arc::JOB_STATUS_SCHED_KILLED
00202                 || new_status == Arc::JOB_STATUS_SCHED_FAILED) {
00203                 m->setEndTime(now);
00204             }
00205             jobq.refresh(*j);
00206             delete j;
00207         } catch (Arc::JobNotFoundException &e) {
00208             logger_.msg(Arc::ERROR, "Cannot find job id: %s", job_id);
00209         }
00210     }
00211     return Arc::MCC_Status(Arc::STATUS_OK);
00212 }
00213 
00214 } // namespace