
nordugrid-arc-nox  1.1.0~rc6
Arc::PythonBroker Class Reference

#include <PythonBroker.h>



Public Member Functions

 PythonBroker (const UserConfig &usercfg)
virtual ~PythonBroker ()
const ExecutionTarget * GetBestTarget ()
 Returns next target from the list of ExecutionTarget objects.
void PreFilterTargets (std::list< ExecutionTarget > &targets, const JobDescription &job)
 Filters ExecutionTarget objects on whether they provide enough memory, disk space, CPUs, etc.
void RegisterJobsubmission ()
 Register a job submission to the current target.

Static Public Member Functions

static Plugin * Instance (PluginArgument *arg)

Protected Member Functions

virtual void SortTargets ()
 Custom Brokers should implement this method.

Protected Attributes

const UserConfig & usercfg
std::list< ExecutionTarget * > PossibleTargets
 Contains the prefiltered ExecutionTargets.
bool TargetSortingDone
 It is true if "custom" sorting is done.
const JobDescription * job

Private Attributes

PyObject * arc_module
PyObject * arc_userconfig_klass
PyObject * arc_jobrepr_klass
PyObject * arc_xtarget_klass
PyObject * module
PyObject * klass
PyObject * object

Static Private Attributes

static Logger logger
static PyThreadState * tstate = NULL
static int refcount = 0
static Glib::Mutex lock

Detailed Description

Definition at line 12 of file PythonBroker.h.


Constructor & Destructor Documentation

Arc::PythonBroker::PythonBroker ( const UserConfig & usercfg )

Definition at line 98 of file PythonBroker.cpp.

    : Broker(usercfg),
      arc_module(NULL),
      arc_userconfig_klass(NULL),
      arc_jobrepr_klass(NULL),
      arc_xtarget_klass(NULL),
      module(NULL),
      klass(NULL),
      object(NULL) {

    if (!tstate) {
      logger.msg(ERROR, "Main python thread is not initialized");
      return;
    }

    logger.msg(VERBOSE, "PythonBroker init");

    std::string args = usercfg.Broker().second;
    std::string::size_type pos = args.find(':');
    if (pos != std::string::npos)
      args.resize(pos);
    pos = args.rfind('.');
    if (pos == std::string::npos) {
      logger.msg(ERROR, "Invalid class name");
      return;
    }
    std::string module_name = args.substr(0, pos);
    std::string class_name = args.substr(pos + 1);
    logger.msg(VERBOSE, "class name: %s", class_name);
    logger.msg(VERBOSE, "module name: %s", module_name);

    // Import arc python module
    PyObjectP py_arc_module_name = PyString_FromString("arc");
    if (!py_arc_module_name) {
      logger.msg(ERROR, "Cannot convert arc module name to Python string");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    arc_module = PyImport_Import(py_arc_module_name);
    if (!arc_module) {
      logger.msg(ERROR, "Cannot import arc module");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // Get dictionary of arc module content (borrowed reference)
    PyObject *arc_dict = PyModule_GetDict(arc_module);
    if (!arc_dict) {
      logger.msg(ERROR, "Cannot get dictionary of arc module");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // Get the Config class (borrowed reference)
    arc_userconfig_klass = PyDict_GetItemString(arc_dict, "UserConfig");
    if (!arc_userconfig_klass) {
      logger.msg(ERROR, "Cannot find arc UserConfig class");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // check is it really a class
    if (!PyCallable_Check(arc_userconfig_klass)) {
      logger.msg(ERROR, "UserConfig class is not an object");
      return;
    }

    // Get the JobDescription class (borrowed reference)
    arc_jobrepr_klass = PyDict_GetItemString(arc_dict, "JobDescription");
    if (!arc_jobrepr_klass) {
      logger.msg(ERROR, "Cannot find arc JobDescription class");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // check is it really a class
    if (!PyCallable_Check(arc_jobrepr_klass)) {
      logger.msg(ERROR, "JobDescription class is not an object");
      return;
    }

    // Get the ExecutionTarget class (borrowed reference)
    arc_xtarget_klass = PyDict_GetItemString(arc_dict, "ExecutionTarget");
    if (!arc_xtarget_klass) {
      logger.msg(ERROR, "Cannot find arc ExecutionTarget class");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // check is it really a class
    if (!PyCallable_Check(arc_xtarget_klass)) {
      logger.msg(ERROR, "ExecutionTarget class is not an object");
      return;
    }

    // Import custom broker module
    PyObjectP py_module_name = PyString_FromString(module_name.c_str());
    if (!py_module_name) {
      logger.msg(ERROR, "Cannot convert module name to Python string");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    module = PyImport_Import(py_module_name);
    if (!module) {
      logger.msg(ERROR, "Cannot import module");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // Get dictionary of module content (borrowed reference)
    PyObject *dict = PyModule_GetDict(module);
    if (!dict) {
      logger.msg(ERROR, "Cannot get dictionary of custom broker module");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // Get the class (borrowed reference)
    klass = PyDict_GetItemString(dict, (char*)class_name.c_str());
    if (!klass) {
      logger.msg(ERROR, "Cannot find custom broker class");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // check is it really a class
    if (!PyCallable_Check(klass)) {
      logger.msg(ERROR, "%s class is not an object", class_name);
      return;
    }

    PyObjectP usercfgarg = Py_BuildValue("(l)", (long int)usercfg);
    if (!usercfgarg) {
      logger.msg(ERROR, "Cannot create UserConfig argument");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    PyObject *py_usercfg = PyObject_CallObject(arc_userconfig_klass, usercfgarg);
    if (!py_usercfg) {
      logger.msg(ERROR, "Cannot convert UserConfig to python object");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    PyObjectP arg = Py_BuildValue("(O)", py_usercfg);
    if (!arg) {
      logger.msg(ERROR, "Cannot create argument of the constructor");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // create instance of class
    object = PyObject_CallObject(klass, arg);
    if (!object) {
      logger.msg(ERROR, "Cannot create instance of python class");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    logger.msg(VERBOSE, "Python broker constructor called (%d)", refcount);
  }
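
The constructor derives the Python module and class names from the broker argument string: everything from the first ':' onward is dropped, and the last '.' separates the module from the class. A minimal standalone sketch of that step follows; the helper name `SplitBrokerArgument` and the sample strings are hypothetical and not part of ARC.

```cpp
#include <string>
#include <utility>

// Hypothetical helper (not part of ARC): split "module.ClassName:args"
// into {module name, class name}. Both strings are empty when no '.'
// is present, which corresponds to the "Invalid class name" error path.
std::pair<std::string, std::string> SplitBrokerArgument(std::string args) {
  // Drop everything from the first ':' onward (broker-specific arguments).
  std::string::size_type pos = args.find(':');
  if (pos != std::string::npos)
    args.resize(pos);

  // The last '.' separates the module path from the class name.
  pos = args.rfind('.');
  if (pos == std::string::npos)
    return std::make_pair(std::string(), std::string());

  return std::make_pair(args.substr(0, pos), args.substr(pos + 1));
}
```

Because the split uses the last '.', a dotted module path such as `pkg.mod.Cls` would yield module `pkg.mod` and class `Cls`.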


Arc::PythonBroker::~PythonBroker ( ) [virtual]

Definition at line 277 of file PythonBroker.cpp.

                              {

    if (module)
      Py_DECREF(module);
    if (arc_module)
      Py_DECREF(arc_module);

    lock.lock();

    refcount--;

    // Finish the Python Interpreter
    if (refcount == 0) {
      PyEval_AcquireThread(tstate);
      Py_Finalize();
    }

    lock.unlock();

    logger.msg(VERBOSE, "Python broker destructor called (%d)", refcount);
  }
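
Instance() increments refcount under the mutex and the destructor decrements it, finalizing the interpreter when the count drops to zero. The same reference-counting pattern can be sketched without Python, assuming a hypothetical `SharedRuntime` class in which a boolean flag stands in for the interpreter state; none of these names exist in ARC.

```cpp
#include <mutex>

// Hypothetical stand-in for a process-wide runtime shared by many users.
// The first Acquire() starts it; the last Release() shuts it down.
class SharedRuntime {
public:
  static void Acquire() {
    std::lock_guard<std::mutex> guard(lock_);
    if (refcount_ == 0)
      running_ = true;   // corresponds to initializing the interpreter
    ++refcount_;
  }

  static void Release() {
    std::lock_guard<std::mutex> guard(lock_);
    if (refcount_ == 0)
      return;            // unbalanced release: nothing to do
    if (--refcount_ == 0)
      running_ = false;  // corresponds to finalizing the interpreter
  }

  static bool Running() {
    std::lock_guard<std::mutex> guard(lock_);
    return running_;
  }

private:
  static std::mutex lock_;
  static int refcount_;
  static bool running_;
};

std::mutex SharedRuntime::lock_;
int SharedRuntime::refcount_ = 0;
bool SharedRuntime::running_ = false;
```

Using a scoped guard rather than explicit lock()/unlock() calls means the mutex is also released on early returns.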



Member Function Documentation

const ExecutionTarget * Arc::Broker::GetBestTarget ( ) [inherited]

Returns next target from the list of ExecutionTarget objects.

On the first call this method sorts its list of ExecutionTarget objects, which was filled by the PreFilterTargets method, and returns the first target in the list.

On each subsequent call the next target in the list is returned.

If the list contains no targets, or the end of the list has been reached, the NULL pointer is returned.

Returns:
A pointer to the next ExecutionTarget in the list, or NULL if there is none.

Definition at line 402 of file Broker.cpp.

                                               {
    if (PossibleTargets.size() <= 0 || current == PossibleTargets.end())
      return NULL;

    if (!TargetSortingDone) {
      logger.msg(DEBUG, "Target sorting not done, sorting them now");
      SortTargets();
      current = PossibleTargets.begin();
    }
    else
      current++;

    return (current != PossibleTargets.end() ? *current : NULL);
  }
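
The "sort on first call, then advance" behaviour described above can be sketched in isolation. The `Target` struct, the `TargetCursor` class, and the injected sorter are hypothetical simplifications: in the listing the sorting is done by the virtual SortTargets method, which sets TargetSortingDone.

```cpp
#include <functional>
#include <list>

// Hypothetical, heavily simplified stand-in for ExecutionTarget.
struct Target {
  int waitingJobs;
};

// Hands out targets one at a time, sorting lazily on the first request.
class TargetCursor {
public:
  typedef std::function<void(std::list<Target*>&)> Sorter;

  TargetCursor(const std::list<Target*>& targets, Sorter sorter)
    : targets_(targets), current_(targets_.begin()),
      sorter_(sorter), sorted_(false) {}

  // Returns the next target, or nullptr when the list is empty or exhausted.
  Target* Next() {
    if (targets_.empty() || current_ == targets_.end())
      return nullptr;

    if (!sorted_) {
      sorter_(targets_);            // custom ordering, applied exactly once
      sorted_ = true;
      current_ = targets_.begin();  // start from the best target
    } else {
      ++current_;
    }
    return current_ != targets_.end() ? *current_ : nullptr;
  }

private:
  std::list<Target*> targets_;
  std::list<Target*>::iterator current_;
  Sorter sorter_;
  bool sorted_;
};
```

A caller would typically loop on `Next()` and try each target in turn until a submission succeeds or `nullptr` is returned.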


Plugin * Arc::PythonBroker::Instance ( PluginArgument * arg ) [static]

Definition at line 57 of file PythonBroker.cpp.

                                                    {

    BrokerPluginArgument *brokerarg = dynamic_cast<BrokerPluginArgument*>(arg);
    if (!brokerarg)
      return NULL;

    lock.lock();

    // Initialize the Python Interpreter
    if (!Py_IsInitialized()) {
#ifdef HAVE_PYTHON_INITIALIZE_EX
      Py_InitializeEx(0);            // Python does not handle signals
#endif
      PyEval_InitThreads();          // Main thread created and lock acquired
      tstate = PyThreadState_Get();  // Get current thread
      if (!tstate) {
        logger.msg(ERROR, "Failed to initialize main Python thread");
        return NULL;
      }
    }
    else {
      if (!tstate) {
        logger.msg(ERROR, "Main Python thread was not initialized");
        return NULL;
      }
      PyEval_AcquireThread(tstate);
    }

    refcount++;

    lock.unlock();

    logger.msg(DEBUG, "Loading python broker (%i)", refcount);

    Broker *broker = new PythonBroker(*brokerarg);

    PyEval_ReleaseThread(tstate); // Release current thread

    return broker;
  }


void Arc::Broker::PreFilterTargets ( std::list< ExecutionTarget > & targets,
const JobDescription & job
) [inherited]

Filters ExecutionTarget objects on whether they provide enough memory, disk space, CPUs, etc. for the job.

Unsuitable targets are ignored; only suitable targets are added to the list of ExecutionTarget objects used for brokering.

Parameters:
targets  A list of ExecutionTarget objects to be considered for addition to the Broker.
job      JobDescription object of the actual job.

Definition at line 26 of file Broker.cpp.

                                                               {
    job = &jobdesc;

    for (std::list<ExecutionTarget>::iterator target = targets.begin();
         target != targets.end(); target++) {
      logger.msg(VERBOSE, "Performing matchmaking against target (%s).", target->url.str());

      if (!job->Resources.CandidateTarget.empty()) {
        if (target->url.Host().empty())
          logger.msg(VERBOSE, "URL of ExecutionTarget is not properly defined");
        if (target->ComputingShareName.empty())
          logger.msg(VERBOSE, "ComputingShareName of ExecutionTarget (%s) is not defined", target->url.str());

        bool dropTarget = true;

        if (!target->url.Host().empty() || !target->ComputingShareName.empty()) {
          for (std::list<ResourceTargetType>::const_iterator it = job->Resources.CandidateTarget.begin();
               it != job->Resources.CandidateTarget.end(); it++) {

            if (!it->EndPointURL.Host().empty() &&
                target->url.Host().empty()) { // Drop target since URL is not defined.
              logger.msg(VERBOSE, "URL of ExecutionTarget is not properly defined: %s.", target->url.str());
              break;
            }

            if (!it->QueueName.empty() &&
                target->ComputingShareName.empty()) { // Drop target since ComputingShareName is not published.
              logger.msg(VERBOSE, "ComputingShareName of ExecutionTarget is not published, and a queue (%s) have been requested.", it->QueueName);
              break;
            }

            if (!it->EndPointURL.Host().empty() &&
                target->url.Host() == it->EndPointURL.Host()) { // Example: knowarc1.grid.niif.hu
              dropTarget = false;
              break;
            }

            if (!it->QueueName.empty() &&
                target->ComputingShareName == it->QueueName) {
              dropTarget = false;
              break;
            }
          }

          if (dropTarget) {
            logger.msg(VERBOSE, "ExecutionTarget does not satisfy any of the CandidateTargets.");
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Neither URL or ComputingShareName is reported by the cluster");
          continue;
        }
      }

      if ((int)job->Application.ProcessingStartTime.GetTime() != -1) {
        if ((int)target->DowntimeStarts.GetTime() != -1 && (int)target->DowntimeEnds.GetTime() != -1) {
          if (target->DowntimeStarts <= job->Application.ProcessingStartTime && job->Application.ProcessingStartTime <= target->DowntimeEnds) {
            logger.msg(VERBOSE, "ProcessingStartTime (%s) specified in job description is inside the targets downtime period [ %s - %s ].", (std::string)job->Application.ProcessingStartTime, (std::string)target->DowntimeStarts, (std::string)target->DowntimeEnds);
            continue;
          }
        }
        else
          logger.msg(WARNING, "The downtime of the target (%s) is not published. Keeping target.", target->url.str());
      }

      if (!target->HealthState.empty()) {

        if (target->HealthState != "ok") { // Enumeration for healthstate: ok, critical, other, unknown, warning
          logger.msg(VERBOSE, "HealthState of ExecutionTarget (%s) is not OK (%s)", target->url.str(), target->HealthState);
          continue;
        }
      }
      else {
        logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, HealthState is not defined", target->url.str());
        continue;
      }

      if (!job->Resources.CEType.empty()) {
        if (!target->Implementation().empty()) {
          if (!job->Resources.CEType.isSatisfied(target->Implementation)) {
            logger.msg(VERBOSE, "Matchmaking, Computing endpoint requirement not satisfied. ExecutionTarget: %s", (std::string)target->Implementation);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, ImplementationName is not defined", target->url.str());
          continue;
        }
      }

      {
        typedef std::pair<std::string, int> EtTimePair;
        EtTimePair etTime[] = {EtTimePair("MaxWallTime", (int)target->MaxWallTime.GetPeriod()),
                               EtTimePair("MinWallTime", (int)target->MinWallTime.GetPeriod()),
                               EtTimePair("MaxCPUTime", (int)target->MaxCPUTime.GetPeriod()),
                               EtTimePair("MinCPUTime", (int)target->MinCPUTime.GetPeriod())};

        typedef std::pair<std::string, const ScalableTime<int>*> JobTimePair;
        JobTimePair jobTime[] = {JobTimePair("TotalWallTime", &job->Resources.TotalWallTime),
                                 JobTimePair("TotalCPUTime", &job->Resources.TotalCPUTime)};

        // Check if ARC-clockrate is defined, if not add it. Included to support the XRSL attribute gridtime.
        if (job->Resources.TotalCPUTime.benchmark.first == "ARC-clockrate") {
          target->Benchmarks["ARC-clockrate"] = (target->CPUClockSpeed > 0 ? (double)target->CPUClockSpeed : 1000.);
        }

        int i = 0;
        for (; i < 4; i++) {
          JobTimePair *jTime = &jobTime[i/2];
          if (i%2 == 0 && jTime->second->range.max != -1 ||
              i%2 == 1 && jTime->second->range.min != -1) {
            if (etTime[i].second != -1) {
              if (jTime->second->benchmark.first.empty()) { // No benchmark defined, do not scale.
                if (i%2 == 0 && jTime->second->range.max > etTime[i].second ||
                    i%2 == 1 && jTime->second->range.min < etTime[i].second) {
                  logger.msg(VERBOSE,
                             "Matchmaking, %s (%d) is %s than %s (%d) published by the ExecutionTarget.",
                             jTime->first,
                             (i%2 == 0 ? jTime->second->range.max
                                       : jTime->second->range.min),
                             (i%2 == 0 ? "greater" : "less"),
                             etTime[i].first,
                             etTime[i].second);
                  break;
                }
              }
              else { // Benchmark defined => scale using benchmark.
                if (target->Benchmarks.find(jTime->second->benchmark.first) != target->Benchmarks.end()) {
                  if (i%2 == 0 && jTime->second->scaleMax(target->Benchmarks.find(jTime->second->benchmark.first)->second) > etTime[i].second ||
                      i%2 == 1 && jTime->second->scaleMin(target->Benchmarks.find(jTime->second->benchmark.first)->second) < etTime[i].second) {
                    logger.msg(VERBOSE,
                               "Matchmaking, The %s scaled %s (%d) is %s than the %s (%d) published by the ExecutionTarget.",
                               jTime->second->benchmark.first,
                               jTime->first,
                               (i%2 == 0 ? jTime->second->scaleMax(target->Benchmarks.find(jTime->second->benchmark.first)->second)
                                         : jTime->second->scaleMin(target->Benchmarks.find(jTime->second->benchmark.first)->second)),
                               (i%2 == 0 ? "greater" : "less"),
                               etTime[i].first,
                               etTime[i].second);
                    break;
                  }
                }
                else {
                  logger.msg(VERBOSE, "Matchmaking, Benchmark %s is not published by the ExecutionTarget.", jTime->second->benchmark.first);
                  break;
                }
              }
            }
            // Do not drop target if it does not publish attribute.
          }
        }

        if (i != 4) // Above loop exited too early, which means target should be dropped.
          continue;
      }

      if (job->Resources.IndividualPhysicalMemory != -1) {
        if (target->MainMemorySize != -1) {     // Example: 678
          if (target->MainMemorySize < job->Resources.IndividualPhysicalMemory) {
            logger.msg(VERBOSE, "Matchmaking, MainMemorySize problem, ExecutionTarget: %d (MainMemorySize), JobDescription: %d (IndividualPhysicalMemory)", target->MainMemorySize, job->Resources.IndividualPhysicalMemory.max);
            continue;
          }
        }
        else if (target->MaxMainMemory != -1) {     // Example: 678
          if (target->MaxMainMemory < job->Resources.IndividualPhysicalMemory) {
            logger.msg(VERBOSE, "Matchmaking, MaxMainMemory problem, ExecutionTarget: %d (MaxMainMemory), JobDescription: %d (IndividualPhysicalMemory)", target->MaxMainMemory, job->Resources.IndividualPhysicalMemory.max);
            continue;
          }

        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget: %s, MaxMainMemory and MainMemorySize are not defined", target->url.str());
          continue;
        }
      }

      if (job->Resources.IndividualVirtualMemory != -1) {
        if (target->MaxVirtualMemory != -1) {     // Example: 678
          if (target->MaxVirtualMemory < job->Resources.IndividualVirtualMemory) {
            logger.msg(VERBOSE, "Matchmaking, MaxVirtualMemory problem, ExecutionTarget: %d (MaxVirtualMemory), JobDescription: %d (IndividualVirtualMemory)", target->MaxVirtualMemory, job->Resources.IndividualVirtualMemory.max);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget: %s, MaxVirtualMemory is not defined", target->url.str());
          continue;
        }
      }

      if (!job->Resources.Platform.empty()) {
        if (!target->Platform.empty()) {    // Example: i386
          if (target->Platform != job->Resources.Platform) {
            logger.msg(VERBOSE, "Matchmaking, Platform problem, ExecutionTarget: %s (Platform) JobDescription: %s (Platform)", target->Platform, job->Resources.Platform);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, Platform is not defined", target->url.str());
          continue;
        }
      }

      if (!job->Resources.OperatingSystem.empty()) {
        if (!target->OperatingSystem.empty()) {
          if (!job->Resources.OperatingSystem.isSatisfied(target->OperatingSystem)) {
            logger.msg(VERBOSE, "Matchmaking, ExecutionTarget: %s, OperatingSystem requirements not satisfied", target->url.str());
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s,  OperatingSystem is not defined", target->url.str());
          continue;
        }
      }

      if (!job->Resources.RunTimeEnvironment.empty()) {
        if (!target->ApplicationEnvironments.empty()) {
          if (!job->Resources.RunTimeEnvironment.isSatisfied(target->ApplicationEnvironments)) {
            logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, RunTimeEnvironment requirements not satisfied", target->url.str());
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget: %s, ApplicationEnvironments not defined", target->url.str());
          continue;
        }
      }

      if (!job->Resources.NetworkInfo.empty()) {
        if (!target->NetworkInfo.empty()) {    // Example: infiniband
          if (std::find(target->NetworkInfo.begin(), target->NetworkInfo.end(),
                        job->Resources.NetworkInfo) == target->NetworkInfo.end()) {
            logger.msg(VERBOSE, "Matchmaking, NetworkInfo demand not fulfilled, ExecutionTarget do not support %s, specified in the JobDescription.", job->Resources.NetworkInfo);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, NetworkInfo is not defined", target->url.str());
          continue;
        }
      }

      if (job->Resources.DiskSpaceRequirement.SessionDiskSpace != -1) {
        if (target->MaxDiskSpace != -1) {     // Example: 5656
          if (target->MaxDiskSpace < job->Resources.DiskSpaceRequirement.SessionDiskSpace) {
            logger.msg(VERBOSE, "Matchmaking, MaxDiskSpace problem, ExecutionTarget: %d (MaxDiskSpace) JobDescription: %d (SessionDiskSpace)", target->MaxDiskSpace, job->Resources.DiskSpaceRequirement.SessionDiskSpace);
            continue;
          }
        }
        else if (target->WorkingAreaTotal != -1) {     // Example: 5656
          if (target->WorkingAreaTotal < job->Resources.DiskSpaceRequirement.SessionDiskSpace) {
            logger.msg(VERBOSE, "Matchmaking, WorkingAreaTotal problem, ExecutionTarget: %d (WorkingAreaTotal) JobDescription: %d (SessionDiskSpace)", target->WorkingAreaTotal, job->Resources.DiskSpaceRequirement.SessionDiskSpace);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, MaxDiskSpace and WorkingAreaTotal are not defined", target->url.str());
          continue;
        }
      }

      if (job->Resources.DiskSpaceRequirement.DiskSpace != -1 && job->Resources.DiskSpaceRequirement.CacheDiskSpace != -1) {
        if (target->MaxDiskSpace != -1) {     // Example: 5656
          if (target->MaxDiskSpace < job->Resources.DiskSpaceRequirement.DiskSpace - job->Resources.DiskSpaceRequirement.CacheDiskSpace) {
            logger.msg(VERBOSE, "Matchmaking, MaxDiskSpace >= DiskSpace - CacheDiskSpace problem, ExecutionTarget: %d (MaxDiskSpace) JobDescription: %d (DiskSpace) - %d (CacheDiskSpace)", target->MaxDiskSpace, job->Resources.DiskSpaceRequirement.DiskSpace.max, job->Resources.DiskSpaceRequirement.CacheDiskSpace);
            continue;
          }
        }
        else if (target->WorkingAreaTotal != -1) {     // Example: 5656
          if (target->WorkingAreaTotal < job->Resources.DiskSpaceRequirement.DiskSpace - job->Resources.DiskSpaceRequirement.CacheDiskSpace) {
            logger.msg(VERBOSE, "Matchmaking, WorkingAreaTotal >= DiskSpace - CacheDiskSpace problem, ExecutionTarget: %d (MaxDiskSpace) JobDescription: %d (DiskSpace) - %d (CacheDiskSpace)", target->WorkingAreaTotal, job->Resources.DiskSpaceRequirement.DiskSpace.max, job->Resources.DiskSpaceRequirement.CacheDiskSpace);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, MaxDiskSpace and WorkingAreaTotal are not defined", target->url.str());
          continue;
        }
      }

      if (job->Resources.DiskSpaceRequirement.DiskSpace != -1) {
        if (target->MaxDiskSpace != -1) {     // Example: 5656
          if (target->MaxDiskSpace < job->Resources.DiskSpaceRequirement.DiskSpace) {
            logger.msg(VERBOSE, "Matchmaking, MaxDiskSpace problem, ExecutionTarget: %d (MaxDiskSpace) JobDescription: %d (DiskSpace)", target->MaxDiskSpace, job->Resources.DiskSpaceRequirement.DiskSpace.max);
            continue;
          }
        }
        else if (target->WorkingAreaTotal != -1) {     // Example: 5656
          if (target->WorkingAreaTotal < job->Resources.DiskSpaceRequirement.DiskSpace) {
            logger.msg(VERBOSE, "Matchmaking, WorkingAreaTotal problem, ExecutionTarget: %d (WorkingAreaTotal) JobDescription: %d (DiskSpace)", target->WorkingAreaTotal, job->Resources.DiskSpaceRequirement.DiskSpace.max);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, MaxDiskSpace and WorkingAreaTotal are not defined", target->url.str());
          continue;
        }
      }

      if (job->Resources.DiskSpaceRequirement.CacheDiskSpace != -1) {
        if (target->CacheTotal != -1) {     // Example: 5656
          if (target->CacheTotal < job->Resources.DiskSpaceRequirement.CacheDiskSpace) {
            logger.msg(VERBOSE, "Matchmaking, CacheTotal problem, ExecutionTarget: %d (CacheTotal) JobDescription: %d (CacheDiskSpace)", target->CacheTotal, job->Resources.DiskSpaceRequirement.CacheDiskSpace);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, CacheTotal is not defined", target->url.str());
          continue;
        }
      }

      if (job->Resources.SlotRequirement.NumberOfSlots != -1) {
        if (target->TotalSlots != -1) {     // Example: 5656
          if (target->TotalSlots < job->Resources.SlotRequirement.NumberOfSlots) {
            logger.msg(VERBOSE, "Matchmaking, TotalSlots problem, ExecutionTarget: %d (TotalSlots) JobDescription: %d (NumberOfProcesses)", target->TotalSlots, job->Resources.SlotRequirement.NumberOfSlots.max);
            continue;
          }
        }
        if (target->MaxSlotsPerJob != -1) {     // Example: 5656
          if (target->MaxSlotsPerJob < job->Resources.SlotRequirement.NumberOfSlots) {
            logger.msg(VERBOSE, "Matchmaking, MaxSlotsPerJob problem, ExecutionTarget: %d (MaxSlotsPerJob) JobDescription: %d (NumberOfProcesses)", target->MaxSlotsPerJob, job->Resources.SlotRequirement.NumberOfSlots.max);
            continue;
          }
        }

        if (target->TotalSlots == -1 && target->MaxSlotsPerJob == -1) {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, TotalSlots and MaxSlotsPerJob are not defined", target->url.str());
          continue;
        }
      }

      if ((int)job->Resources.SessionLifeTime.GetPeriod() != -1) {
        if ((int)target->WorkingAreaLifeTime.GetPeriod() != -1) {     // Example: 123
          if (target->WorkingAreaLifeTime < job->Resources.SessionLifeTime) {
            logger.msg(VERBOSE, "Matchmaking, WorkingAreaLifeTime problem, ExecutionTarget: %s (WorkingAreaLifeTime) JobDescription: %s (SessionLifeTime)", (std::string)target->WorkingAreaLifeTime, (std::string)job->Resources.SessionLifeTime);
            continue;
          }
        }
        else {
          logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, WorkingAreaLifeTime is not defined", target->url.str());
          continue;
        }
      }

      if ((job->Resources.NodeAccess == NAT_INBOUND ||
           job->Resources.NodeAccess == NAT_INOUTBOUND) &&
          !target->ConnectivityIn) {     // Example: false (boolean)
        logger.msg(VERBOSE, "Matchmaking, ConnectivityIn problem, ExecutionTarget: %s (ConnectivityIn) JobDescription: %s (InBound)", (target->ConnectivityIn ? "true" : "false"), (job->Resources.NodeAccess == NAT_INBOUND ? "INBOUND" : "INOUTBOUND"));
        continue;
      }

      if ((job->Resources.NodeAccess == NAT_OUTBOUND ||
           job->Resources.NodeAccess == NAT_INOUTBOUND) &&
          !target->ConnectivityOut) {     // Example: false (boolean)
        logger.msg(VERBOSE, "Matchmaking, ConnectivityOut problem, ExecutionTarget: %s (ConnectivityOut) JobDescription: %s (OutBound)", (target->ConnectivityOut ? "true" : "false"), (job->Resources.NodeAccess == NAT_OUTBOUND ? "OUTBOUND" : "INOUTBOUND"));
        continue;
      }

      PossibleTargets.push_back(&*target);

    } //end loop over all found targets

    logger.msg(VERBOSE, "Possible targets after prefiltering: %d", PossibleTargets.size());

    std::list<ExecutionTarget*>::iterator iter = PossibleTargets.begin();

    for (int i = 1; iter != PossibleTargets.end(); iter++, i++) {
      logger.msg(VERBOSE, "%d. Cluster: %s; Queue: %s", i, (*iter)->DomainName, (*iter)->ComputingShareName);
      logger.msg(VERBOSE, "Health State: %s", (*iter)->HealthState);
    }

    TargetSortingDone = false;
  }
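
Several of the checks in this listing share one shape: a value of -1 means "not specified" on the job side or "not published" on the target side, a preferred target attribute is consulted first, a fallback attribute second, and the target is dropped when neither is published. A minimal sketch of that shape for the physical memory check follows; the function name and the plain `int` parameters are assumptions made for illustration.

```cpp
// Hypothetical helper mirroring the IndividualPhysicalMemory check.
// -1 means "not specified" (job) or "not published" (target).
// Returns true when the target should be kept.
bool MemoryRequirementSatisfied(int required,
                                int mainMemorySize,
                                int maxMainMemory) {
  if (required == -1)
    return true;                        // the job does not constrain memory

  if (mainMemorySize != -1)
    return mainMemorySize >= required;  // preferred attribute

  if (maxMainMemory != -1)
    return maxMainMemory >= required;   // fallback attribute

  return false;                         // nothing published: drop the target
}
```

Note that the fallback is consulted only when the preferred attribute is unpublished, so a target whose MainMemorySize is too small is dropped even if its MaxMainMemory would suffice.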


void Arc::Broker::RegisterJobsubmission ( ) [inherited]

Register a job submission to the current target.

Definition at line 417 of file Broker.cpp.

                                     {
    if (!job || current == PossibleTargets.end())
      return;
    if ((*current)->FreeSlots >= job->Resources.SlotRequirement.NumberOfSlots) {   //The job will start directly
      (*current)->FreeSlots -= job->Resources.SlotRequirement.NumberOfSlots;
      if ((*current)->UsedSlots != -1)
        (*current)->UsedSlots += job->Resources.SlotRequirement.NumberOfSlots;
    }
    else                                           //The job will be queued
      if ((*current)->WaitingJobs != -1)
        (*current)->WaitingJobs += job->Resources.SlotRequirement.NumberOfSlots;
  }
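
The bookkeeping above can be sketched on its own: if the target has enough free slots the job is assumed to start immediately, otherwise it is counted as waiting, and counters equal to -1 (not published) are left untouched. The `SlotCounters` struct and the function name are hypothetical simplifications of the ExecutionTarget fields used in the listing.

```cpp
// Hypothetical subset of the slot counters kept per target.
// -1 means the target does not publish that counter.
struct SlotCounters {
  int freeSlots;
  int usedSlots;
  int waitingJobs;
};

// Updates the counters as if a job needing `slots` slots was submitted.
void RegisterSubmission(SlotCounters& target, int slots) {
  if (target.freeSlots >= slots) {
    // Enough free slots: the job is expected to start directly.
    target.freeSlots -= slots;
    if (target.usedSlots != -1)
      target.usedSlots += slots;
  } else if (target.waitingJobs != -1) {
    // Not enough free slots: the job is expected to be queued.
    target.waitingJobs += slots;
  }
}
```

Keeping these counters up to date lets a broker that ranks by free slots or queue length account for jobs submitted earlier in the same session.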


void Arc::PythonBroker::SortTargets ( ) [protected, virtual]

Custom Brokers should implement this method.

The task is to sort the PossibleTargets list in a "custom" way. For example, with FastestQueueBroker the ExecutionTarget that has the shortest queue length will be at the beginning of the PossibleTargets list.

Implements Arc::Broker.

Definition at line 299 of file PythonBroker.cpp.

                                 {

    PythonLock pylock;

    // Convert JobDescription to python object
    PyObjectP arg = Py_BuildValue("(l)", job);
    if (!arg) {
      logger.msg(ERROR, "Cannot create JobDescription argument");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    PyObjectP py_job = PyObject_CallObject(arc_jobrepr_klass, arg);
    if (!py_job) {
      logger.msg(ERROR,
                 "Cannot convert JobDescription to python object");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    // Convert incoming PossibleTargets to python list
    PyObjectP py_list = PyList_New(0);
    if (!py_list) {
      logger.msg(ERROR, "Cannot create python list");
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    for (std::list<ExecutionTarget*>::iterator it = PossibleTargets.begin();
         it != PossibleTargets.end(); it++) {

      PyObjectP arg = Py_BuildValue("(l)", (long int)&**it);
      if (!arg) {
        logger.msg(ERROR, "Cannot create ExecutionTarget argument");
        if (PyErr_Occurred())
          PyErr_Print();
        return;
      }

      PyObject *py_xtarget = PyObject_CallObject(arc_xtarget_klass, arg);
      if (!py_xtarget) {
        logger.msg(ERROR, "Cannot convert ExecutionTarget to python object");
        if (PyErr_Occurred())
          PyErr_Print();
        return;
      }

      PyList_Append(py_list, py_xtarget);
    }

    PyObjectP py_status = PyObject_CallMethod(object, (char*)"SortTargets",
                                              (char*)"(OO)",
                                              (PyObject*)py_list,
                                              (PyObject*)py_job);
    if (!py_status) {
      if (PyErr_Occurred())
        PyErr_Print();
      return;
    }

    PossibleTargets.clear();

    for (int i = 0; i < PyList_Size(py_list); i++) {
      PyObject *obj = PyList_GetItem(py_list, i);
      char this_str[] = "this";
      if (!PyObject_HasAttrString(obj, this_str))
        return;
      PyObject *thisattr = PyObject_GetAttrString(obj, this_str);
      if (!thisattr)
        return;
      void *ptr = ((PySwigObject*)thisattr)->ptr;
      PossibleTargets.push_back(((ExecutionTarget*)ptr));
      Py_DECREF(thisattr);
    }

    TargetSortingDone = true;

    return;
  }
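
As an illustration of the kind of ordering a custom broker might apply, the sketch below sorts a list of target pointers so that the shortest queue comes first. It is written in C++ rather than Python, and the `QueueInfo` struct and both function names are hypothetical; it does not reproduce FastestQueueBroker or any Python broker class.

```cpp
#include <list>

// Hypothetical stand-in for the queue-related part of ExecutionTarget.
struct QueueInfo {
  int waitingJobs;
};

// Orders targets by ascending number of waiting jobs.
bool ShorterQueue(const QueueInfo* a, const QueueInfo* b) {
  return a->waitingJobs < b->waitingJobs;
}

// Sorts in place; std::list::sort is stable, so targets with equal
// queue lengths keep their original relative order.
void SortByShortestQueue(std::list<QueueInfo*>& targets) {
  targets.sort(ShorterQueue);
}
```

Sorting the list of pointers in place matches how PossibleTargets is used: the ExecutionTarget objects themselves are not copied, only their order changes.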



Member Data Documentation

PyObject* Arc::PythonBroker::arc_jobrepr_klass [private]

Definition at line 26 of file PythonBroker.h.

PyObject* Arc::PythonBroker::arc_module [private]

Definition at line 24 of file PythonBroker.h.

PyObject* Arc::PythonBroker::arc_userconfig_klass [private]

Definition at line 25 of file PythonBroker.h.

PyObject* Arc::PythonBroker::arc_xtarget_klass [private]

Definition at line 27 of file PythonBroker.h.

const JobDescription* Arc::Broker::job [protected, inherited]

Definition at line 71 of file Broker.h.

PyObject* Arc::PythonBroker::klass [private]

Definition at line 29 of file PythonBroker.h.

Glib::Mutex Arc::PythonBroker::lock [static, private]

Definition at line 35 of file PythonBroker.h.

Logger Arc::PythonBroker::logger [static, private]

Reimplemented from Arc::Broker.

Definition at line 32 of file PythonBroker.h.

PyObject* Arc::PythonBroker::module [private]

Definition at line 28 of file PythonBroker.h.

PyObject* Arc::PythonBroker::object [private]

Definition at line 30 of file PythonBroker.h.

std::list<ExecutionTarget*> Arc::Broker::PossibleTargets [protected, inherited]

Contains the prefiltered ExecutionTargets.

If an ExecutionTarget has enough memory, CPU, disk space, etc. for the actual job requirements, then it will be added to the PossibleTargets list.

Definition at line 68 of file Broker.h.

int Arc::PythonBroker::refcount = 0 [static, private]

Definition at line 34 of file PythonBroker.h.

bool Arc::Broker::TargetSortingDone [protected, inherited]

It is true if "custom" sorting is done.

Definition at line 70 of file Broker.h.

PyThreadState * Arc::PythonBroker::tstate = NULL [static, private]

Definition at line 33 of file PythonBroker.h.

const UserConfig& Arc::Broker::usercfg [protected, inherited]

Definition at line 62 of file Broker.h.


The documentation for this class was generated from the following files: