Back to index

nordugrid-arc-nox  1.1.0~rc6
FastestQueueBroker.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <cstdlib>
00008 #include <algorithm>
00009 
00010 #include <arc/client/ExecutionTarget.h>
00011 
00012 #include "FastestQueueBroker.h"
00013 
00014 namespace Arc {
00015 
00016   bool CompareExecutionTarget(const ExecutionTarget *T1,
00017                               const ExecutionTarget *T2) {
00018     //Scale queue to become cluster size independent
00019     float T1queue = (float)T1->WaitingJobs / T1->TotalSlots;
00020     float T2queue = (float)T2->WaitingJobs / T2->TotalSlots;
00021     return T1queue < T2queue;
00022   }
00023 
00024   FastestQueueBroker::FastestQueueBroker(const UserConfig& usercfg)
00025     : Broker(usercfg) {}
00026 
00027   FastestQueueBroker::~FastestQueueBroker() {}
00028 
00029   Plugin* FastestQueueBroker::Instance(PluginArgument *arg) {
00030     BrokerPluginArgument *brokerarg = dynamic_cast<BrokerPluginArgument*>(arg);
00031     if (!brokerarg)
00032       return NULL;
00033     return new FastestQueueBroker(*brokerarg);
00034   }
00035 
00036   void FastestQueueBroker::SortTargets() {
00037 
00038     logger.msg(VERBOSE, "FastestQueueBroker is filtering %d targets",
00039                PossibleTargets.size());
00040 
00041     //Remove clusters with incomplete information for target sorting
00042     std::list<ExecutionTarget*>::iterator iter = PossibleTargets.begin();
00043     while (iter != PossibleTargets.end()) {
00044       if ((*iter)->WaitingJobs == -1 || (*iter)->TotalSlots == -1 || (*iter)->FreeSlots == -1) {
00045         if ((*iter)->WaitingJobs == -1)
00046           logger.msg(VERBOSE, "Target %s removed by FastestQueueBroker, doesn't report number of waiting jobs", (*iter)->DomainName);
00047         else if ((*iter)->TotalSlots == -1)
00048           logger.msg(VERBOSE, "Target %s removed by FastestQueueBroker, doesn't report number of total slots", (*iter)->DomainName);
00049         else if ((*iter)->FreeSlots == -1)
00050           logger.msg(VERBOSE, "Target %s removed by FastestQueueBroker, doesn't report number of free slots", (*iter)->DomainName);
00051         iter = PossibleTargets.erase(iter);
00052         continue;
00053       }
00054       iter++;
00055     }
00056 
00057     logger.msg(VERBOSE, "FastestQueueBroker will rank the following %d targets", PossibleTargets.size());
00058     iter = PossibleTargets.begin();
00059     for (int i = 1; iter != PossibleTargets.end(); iter++, i++)
00060       logger.msg(VERBOSE, "%d. Cluster: %s; Queue: %s", i, (*iter)->DomainName, (*iter)->ComputingShareName);
00061 
00062     //Sort the targets according to the number of waiting jobs (in % of the cluster size)
00063     PossibleTargets.sort(CompareExecutionTarget);
00064 
00065     //Check is several clusters(queues) have 0 waiting jobs
00066     int ZeroQueueCluster = 0;
00067     int TotalFreeCPUs = 0;
00068     for (iter = PossibleTargets.begin(); iter != PossibleTargets.end(); iter++)
00069       if ((*iter)->WaitingJobs == 0) {
00070         ZeroQueueCluster++;
00071         TotalFreeCPUs += (*iter)->FreeSlots / abs(job->Resources.SlotRequirement.NumberOfSlots);
00072       }
00073 
00074     //If several clusters(queues) have free slots (CPUs) do basic load balancing
00075     if (ZeroQueueCluster > 1)
00076       for (std::list<ExecutionTarget*>::iterator itN = PossibleTargets.begin();
00077            itN != PossibleTargets.end() && (*itN)->WaitingJobs == 0;
00078            itN++) {
00079         double RandomCPU = rand() * TotalFreeCPUs;
00080         for (std::list<ExecutionTarget*>::iterator itJ = itN;
00081              itJ != PossibleTargets.end() && (*itJ)->WaitingJobs == 0;
00082              itJ++) {
00083           if (((*itJ)->FreeSlots / abs(job->Resources.SlotRequirement.NumberOfSlots)) > RandomCPU) {
00084             TotalFreeCPUs -= ((*itJ)->FreeSlots / abs(job->Resources.SlotRequirement.NumberOfSlots));
00085             std::iter_swap(itJ, itN);
00086             break;
00087           }
00088           else
00089             RandomCPU -= ((*itJ)->FreeSlots / abs(job->Resources.SlotRequirement.NumberOfSlots));
00090         }
00091       }
00092 
00093     logger.msg(VERBOSE, "Best targets are: %d", PossibleTargets.size());
00094 
00095     iter = PossibleTargets.begin();
00096 
00097     for (int i = 1; iter != PossibleTargets.end(); iter++, i++)
00098       logger.msg(VERBOSE, "%d. Cluster: %s; Queue: %s", i, (*iter)->DomainName, (*iter)->ComputingShareName);
00099 
00100     TargetSortingDone = true;
00101 
00102   }
00103 
00104 } // namespace Arc