Back to index

nordugrid-arc-nox  1.1.0~rc6
Broker.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <algorithm>
00008 
00009 #include <arc/StringConv.h>
00010 #include <arc/ArcConfig.h>
00011 #include <arc/client/Broker.h>
00012 #include <arc/client/ExecutionTarget.h>
00013 #include <arc/loader/FinderLoader.h>
00014 
00015 namespace Arc {
00016 
00017   Logger Broker::logger(Logger::getRootLogger(), "Broker");
00018 
00019   Broker::Broker(const UserConfig& usercfg)
00020     : usercfg(usercfg),
00021       TargetSortingDone(false),
00022       job() {}
00023 
00024   Broker::~Broker() {}
00025 
00026   void Broker::PreFilterTargets(std::list<ExecutionTarget>& targets,
00027                                 const JobDescription& jobdesc) {
00028     job = &jobdesc;
00029 
00030     for (std::list<ExecutionTarget>::iterator target = targets.begin();
00031          target != targets.end(); target++) {
00032       logger.msg(VERBOSE, "Performing matchmaking against target (%s).", target->url.str());
00033 
00034       if (!job->Resources.CandidateTarget.empty()) {
00035         if (target->url.Host().empty())
00036           logger.msg(VERBOSE, "URL of ExecutionTarget is not properly defined");
00037         if (target->ComputingShareName.empty())
00038           logger.msg(VERBOSE, "ComputingShareName of ExecutionTarget (%s) is not defined", target->url.str());
00039 
00040         bool dropTarget = true;
00041 
00042         if (!target->url.Host().empty() || !target->ComputingShareName.empty()) {
00043           for (std::list<ResourceTargetType>::const_iterator it = job->Resources.CandidateTarget.begin();
00044                it != job->Resources.CandidateTarget.end(); it++) {
00045 
00046             if (!it->EndPointURL.Host().empty() &&
00047                 target->url.Host().empty()) { // Drop target since URL is not defined.
00048               logger.msg(VERBOSE, "URL of ExecutionTarget is not properly defined: %s.", target->url.str());
00049               break;
00050             }
00051 
00052             if (!it->QueueName.empty() &&
00053                 target->ComputingShareName.empty()) { // Drop target since ComputingShareName is not published.
00054               logger.msg(VERBOSE, "ComputingShareName of ExecutionTarget is not published, and a queue (%s) have been requested.", it->QueueName);
00055               break;
00056             }
00057 
00058             if (!it->EndPointURL.Host().empty() &&
00059                 target->url.Host() == it->EndPointURL.Host()) { // Example: knowarc1.grid.niif.hu
00060               dropTarget = false;
00061               break;
00062             }
00063 
00064             if (!it->QueueName.empty() &&
00065                 target->ComputingShareName == it->QueueName) {
00066               dropTarget = false;
00067               break;
00068             }
00069           }
00070 
00071           if (dropTarget) {
00072             logger.msg(VERBOSE, "ExecutionTarget does not satisfy any of the CandidateTargets.");
00073             continue;
00074           }
00075         }
00076         else {
00077           logger.msg(VERBOSE, "Neither URL or ComputingShareName is reported by the cluster");
00078           continue;
00079         }
00080       }
00081 
00082       if ((int)job->Application.ProcessingStartTime.GetTime() != -1) {
00083         if ((int)target->DowntimeStarts.GetTime() != -1 && (int)target->DowntimeEnds.GetTime() != -1) {
00084           if (target->DowntimeStarts <= job->Application.ProcessingStartTime && job->Application.ProcessingStartTime <= target->DowntimeEnds) {
00085             logger.msg(VERBOSE, "ProcessingStartTime (%s) specified in job description is inside the targets downtime period [ %s - %s ].", (std::string)job->Application.ProcessingStartTime, (std::string)target->DowntimeStarts, (std::string)target->DowntimeEnds);
00086             continue;
00087           }
00088         }
00089         else
00090           logger.msg(WARNING, "The downtime of the target (%s) is not published. Keeping target.", target->url.str());
00091       }
00092 
00093       if (!target->HealthState.empty()) {
00094 
00095         if (target->HealthState != "ok") { // Enumeration for healthstate: ok, critical, other, unknown, warning
00096           logger.msg(VERBOSE, "HealthState of ExecutionTarget (%s) is not OK (%s)", target->url.str(), target->HealthState);
00097           continue;
00098         }
00099       }
00100       else {
00101         logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, HealthState is not defined", target->url.str());
00102         continue;
00103       }
00104 
00105       if (!job->Resources.CEType.empty()) {
00106         if (!target->Implementation().empty()) {
00107           if (!job->Resources.CEType.isSatisfied(target->Implementation)) {
00108             logger.msg(VERBOSE, "Matchmaking, Computing endpoint requirement not satisfied. ExecutionTarget: %s", (std::string)target->Implementation);
00109             continue;
00110           }
00111         }
00112         else {
00113           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, ImplementationName is not defined", target->url.str());
00114           continue;
00115         }
00116       }
00117 
00118       {
00119         typedef std::pair<std::string, int> EtTimePair;
00120         EtTimePair etTime[] = {EtTimePair("MaxWallTime", (int)target->MaxWallTime.GetPeriod()),
00121                                EtTimePair("MinWallTime", (int)target->MinWallTime.GetPeriod()),
00122                                EtTimePair("MaxCPUTime", (int)target->MaxCPUTime.GetPeriod()),
00123                                EtTimePair("MinCPUTime", (int)target->MinCPUTime.GetPeriod())};
00124 
00125         typedef std::pair<std::string, const ScalableTime<int>*> JobTimePair;
00126         JobTimePair jobTime[] = {JobTimePair("TotalWallTime", &job->Resources.TotalWallTime),
00127                                  JobTimePair("TotalCPUTime", &job->Resources.TotalCPUTime)};
00128 
00129         // Check if ARC-clockrate is defined, if not add it. Included to support the XRSL attribute gridtime.
00130         if (job->Resources.TotalCPUTime.benchmark.first == "ARC-clockrate") {
00131           target->Benchmarks["ARC-clockrate"] = (target->CPUClockSpeed > 0 ? (double)target->CPUClockSpeed : 1000.);
00132         }
00133 
00134         int i = 0;
00135         for (; i < 4; i++) {
00136           JobTimePair *jTime = &jobTime[i/2];
00137           if (i%2 == 0 && jTime->second->range.max != -1 ||
00138               i%2 == 1 && jTime->second->range.min != -1) {
00139             if (etTime[i].second != -1) {
00140               if (jTime->second->benchmark.first.empty()) { // No benchmark defined, do not scale.
00141                 if (i%2 == 0 && jTime->second->range.max > etTime[i].second ||
00142                     i%2 == 1 && jTime->second->range.min < etTime[i].second) {
00143                   logger.msg(VERBOSE,
00144                              "Matchmaking, %s (%d) is %s than %s (%d) published by the ExecutionTarget.",
00145                              jTime->first,
00146                              (i%2 == 0 ? jTime->second->range.max
00147                                        : jTime->second->range.min),
00148                              (i%2 == 0 ? "greater" : "less"),
00149                              etTime[i].first,
00150                              etTime[i].second);
00151                   break;
00152                 }
00153               }
00154               else { // Benchmark defined => scale using benchmark.
00155                 if (target->Benchmarks.find(jTime->second->benchmark.first) != target->Benchmarks.end()) {
00156                   if (i%2 == 0 && jTime->second->scaleMax(target->Benchmarks.find(jTime->second->benchmark.first)->second) > etTime[i].second ||
00157                       i%2 == 1 && jTime->second->scaleMin(target->Benchmarks.find(jTime->second->benchmark.first)->second) < etTime[i].second) {
00158                     logger.msg(VERBOSE,
00159                                "Matchmaking, The %s scaled %s (%d) is %s than the %s (%d) published by the ExecutionTarget.",
00160                                jTime->second->benchmark.first,
00161                                jTime->first,
00162                                (i%2 == 0 ? jTime->second->scaleMax(target->Benchmarks.find(jTime->second->benchmark.first)->second)
00163                                          : jTime->second->scaleMin(target->Benchmarks.find(jTime->second->benchmark.first)->second)),
00164                                (i%2 == 0 ? "greater" : "less"),
00165                                etTime[i].first,
00166                                etTime[i].second);
00167                     break;
00168                   }
00169                 }
00170                 else {
00171                   logger.msg(VERBOSE, "Matchmaking, Benchmark %s is not published by the ExecutionTarget.", jTime->second->benchmark.first);
00172                   break;
00173                 }
00174               }
00175             }
00176             // Do not drop target if it does not publish attribute.
00177           }
00178         }
00179 
00180         if (i != 4) // Above loop exited too early, which means target should be dropped.
00181           continue;
00182       }
00183 
00184       if (job->Resources.IndividualPhysicalMemory != -1) {
00185         if (target->MainMemorySize != -1) {     // Example: 678
00186           if (target->MainMemorySize < job->Resources.IndividualPhysicalMemory) {
00187             logger.msg(VERBOSE, "Matchmaking, MainMemorySize problem, ExecutionTarget: %d (MainMemorySize), JobDescription: %d (IndividualPhysicalMemory)", target->MainMemorySize, job->Resources.IndividualPhysicalMemory.max);
00188             continue;
00189           }
00190         }
00191         else if (target->MaxMainMemory != -1) {     // Example: 678
00192           if (target->MaxMainMemory < job->Resources.IndividualPhysicalMemory) {
00193             logger.msg(VERBOSE, "Matchmaking, MaxMainMemory problem, ExecutionTarget: %d (MaxMainMemory), JobDescription: %d (IndividualPhysicalMemory)", target->MaxMainMemory, job->Resources.IndividualPhysicalMemory.max);
00194             continue;
00195           }
00196 
00197         }
00198         else {
00199           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget: %s, MaxMainMemory and MainMemorySize are not defined", target->url.str());
00200           continue;
00201         }
00202       }
00203 
00204       if (job->Resources.IndividualVirtualMemory != -1) {
00205         if (target->MaxVirtualMemory != -1) {     // Example: 678
00206           if (target->MaxVirtualMemory < job->Resources.IndividualVirtualMemory) {
00207             logger.msg(VERBOSE, "Matchmaking, MaxVirtualMemory problem, ExecutionTarget: %d (MaxVirtualMemory), JobDescription: %d (IndividualVirtualMemory)", target->MaxVirtualMemory, job->Resources.IndividualVirtualMemory.max);
00208             continue;
00209           }
00210         }
00211         else {
00212           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget: %s, MaxVirtualMemory is not defined", target->url.str());
00213           continue;
00214         }
00215       }
00216 
00217       if (!job->Resources.Platform.empty()) {
00218         if (!target->Platform.empty()) {    // Example: i386
00219           if (target->Platform != job->Resources.Platform) {
00220             logger.msg(VERBOSE, "Matchmaking, Platform problem, ExecutionTarget: %s (Platform) JobDescription: %s (Platform)", target->Platform, job->Resources.Platform);
00221             continue;
00222           }
00223         }
00224         else {
00225           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, Platform is not defined", target->url.str());
00226           continue;
00227         }
00228       }
00229 
00230       if (!job->Resources.OperatingSystem.empty()) {
00231         if (!target->OperatingSystem.empty()) {
00232           if (!job->Resources.OperatingSystem.isSatisfied(target->OperatingSystem)) {
00233             logger.msg(VERBOSE, "Matchmaking, ExecutionTarget: %s, OperatingSystem requirements not satisfied", target->url.str());
00234             continue;
00235           }
00236         }
00237         else {
00238           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s,  OperatingSystem is not defined", target->url.str());
00239           continue;
00240         }
00241       }
00242 
00243       if (!job->Resources.RunTimeEnvironment.empty()) {
00244         if (!target->ApplicationEnvironments.empty()) {
00245           if (!job->Resources.RunTimeEnvironment.isSatisfied(target->ApplicationEnvironments)) {
00246             logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, RunTimeEnvironment requirements not satisfied", target->url.str());
00247             continue;
00248           }
00249         }
00250         else {
00251           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget: %s, ApplicationEnvironments not defined", target->url.str());
00252           continue;
00253         }
00254       }
00255 
00256       if (!job->Resources.NetworkInfo.empty())
00257         if (!target->NetworkInfo.empty()) {    // Example: infiniband
00258           if (std::find(target->NetworkInfo.begin(), target->NetworkInfo.end(),
00259                         job->Resources.NetworkInfo) == target->NetworkInfo.end()) {
00260             logger.msg(VERBOSE, "Matchmaking, NetworkInfo demand not fulfilled, ExecutionTarget do not support %s, specified in the JobDescription.", job->Resources.NetworkInfo);
00261             continue;
00262           }
00263           else {
00264             logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, NetworkInfo is not defined", target->url.str());
00265             continue;
00266           }
00267         }
00268 
00269       if (job->Resources.DiskSpaceRequirement.SessionDiskSpace != -1) {
00270         if (target->MaxDiskSpace != -1) {     // Example: 5656
00271           if (target->MaxDiskSpace < job->Resources.DiskSpaceRequirement.SessionDiskSpace) {
00272             logger.msg(VERBOSE, "Matchmaking, MaxDiskSpace problem, ExecutionTarget: %d (MaxDiskSpace) JobDescription: %d (SessionDiskSpace)", target->MaxDiskSpace, job->Resources.DiskSpaceRequirement.SessionDiskSpace);
00273             continue;
00274           }
00275         }
00276         else if (target->WorkingAreaTotal != -1) {     // Example: 5656
00277           if (target->WorkingAreaTotal < job->Resources.DiskSpaceRequirement.SessionDiskSpace) {
00278             logger.msg(VERBOSE, "Matchmaking, WorkingAreaTotal problem, ExecutionTarget: %d (WorkingAreaTotal) JobDescription: %d (SessionDiskSpace)", target->WorkingAreaTotal, job->Resources.DiskSpaceRequirement.SessionDiskSpace);
00279             continue;
00280           }
00281         }
00282         else {
00283           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, MaxDiskSpace and WorkingAreaTotal are not defined", target->url.str());
00284           continue;
00285         }
00286       }
00287 
00288       if (job->Resources.DiskSpaceRequirement.DiskSpace != -1 && job->Resources.DiskSpaceRequirement.CacheDiskSpace != -1) {
00289         if (target->MaxDiskSpace != -1) {     // Example: 5656
00290           if (target->MaxDiskSpace < job->Resources.DiskSpaceRequirement.DiskSpace - job->Resources.DiskSpaceRequirement.CacheDiskSpace) {
00291             logger.msg(VERBOSE, "Matchmaking, MaxDiskSpace >= DiskSpace - CacheDiskSpace problem, ExecutionTarget: %d (MaxDiskSpace) JobDescription: %d (DiskSpace) - %d (CacheDiskSpace)", target->MaxDiskSpace, job->Resources.DiskSpaceRequirement.DiskSpace.max, job->Resources.DiskSpaceRequirement.CacheDiskSpace);
00292             continue;
00293           }
00294         }
00295         else if (target->WorkingAreaTotal != -1) {     // Example: 5656
00296           if (target->WorkingAreaTotal < job->Resources.DiskSpaceRequirement.DiskSpace - job->Resources.DiskSpaceRequirement.CacheDiskSpace) {
00297             logger.msg(VERBOSE, "Matchmaking, WorkingAreaTotal >= DiskSpace - CacheDiskSpace problem, ExecutionTarget: %d (MaxDiskSpace) JobDescription: %d (DiskSpace) - %d (CacheDiskSpace)", target->WorkingAreaTotal, job->Resources.DiskSpaceRequirement.DiskSpace.max, job->Resources.DiskSpaceRequirement.CacheDiskSpace);
00298             continue;
00299           }
00300         }
00301         else {
00302           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, MaxDiskSpace and WorkingAreaTotal are not defined", target->url.str());
00303           continue;
00304         }
00305       }
00306 
00307       if (job->Resources.DiskSpaceRequirement.DiskSpace != -1) {
00308         if (target->MaxDiskSpace != -1) {     // Example: 5656
00309           if (target->MaxDiskSpace < job->Resources.DiskSpaceRequirement.DiskSpace) {
00310             logger.msg(VERBOSE, "Matchmaking, MaxDiskSpace problem, ExecutionTarget: %d (MaxDiskSpace) JobDescription: %d (DiskSpace)", target->MaxDiskSpace, job->Resources.DiskSpaceRequirement.DiskSpace.max);
00311             continue;
00312           }
00313         }
00314         else if (target->WorkingAreaTotal != -1) {     // Example: 5656
00315           if (target->WorkingAreaTotal < job->Resources.DiskSpaceRequirement.DiskSpace) {
00316             logger.msg(VERBOSE, "Matchmaking, WorkingAreaTotal problem, ExecutionTarget: %d (WorkingAreaTotal) JobDescription: %d (DiskSpace)", target->WorkingAreaTotal, job->Resources.DiskSpaceRequirement.DiskSpace.max);
00317             continue;
00318           }
00319         }
00320         else {
00321           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, MaxDiskSpace and WorkingAreaTotal are not defined", target->url.str());
00322           continue;
00323         }
00324       }
00325 
00326       if (job->Resources.DiskSpaceRequirement.CacheDiskSpace != -1) {
00327         if (target->CacheTotal != -1) {     // Example: 5656
00328           if (target->CacheTotal < job->Resources.DiskSpaceRequirement.CacheDiskSpace) {
00329             logger.msg(VERBOSE, "Matchmaking, CacheTotal problem, ExecutionTarget: %d (CacheTotal) JobDescription: %d (CacheDiskSpace)", target->CacheTotal, job->Resources.DiskSpaceRequirement.CacheDiskSpace);
00330             continue;
00331           }
00332         }
00333         else {
00334           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, CacheTotal is not defined", target->url.str());
00335           continue;
00336         }
00337       }
00338 
00339       if (job->Resources.SlotRequirement.NumberOfSlots != -1) {
00340         if (target->TotalSlots != -1) {     // Example: 5656
00341           if (target->TotalSlots < job->Resources.SlotRequirement.NumberOfSlots) {
00342             logger.msg(VERBOSE, "Matchmaking, TotalSlots problem, ExecutionTarget: %d (TotalSlots) JobDescription: %d (NumberOfProcesses)", target->TotalSlots, job->Resources.SlotRequirement.NumberOfSlots.max);
00343             continue;
00344           }
00345         }
00346         if (target->MaxSlotsPerJob != -1) {     // Example: 5656
00347           if (target->MaxSlotsPerJob < job->Resources.SlotRequirement.NumberOfSlots) {
00348             logger.msg(VERBOSE, "Matchmaking, MaxSlotsPerJob problem, ExecutionTarget: %d (MaxSlotsPerJob) JobDescription: %d (NumberOfProcesses)", target->TotalSlots, job->Resources.SlotRequirement.NumberOfSlots.max);
00349             continue;
00350           }
00351         }
00352 
00353         if (target->TotalSlots == -1 && target->MaxSlotsPerJob == -1) {
00354           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, TotalSlots and MaxSlotsPerJob are not defined", target->url.str());
00355           continue;
00356         }
00357       }
00358 
00359       if ((int)job->Resources.SessionLifeTime.GetPeriod() != -1) {
00360         if ((int)target->WorkingAreaLifeTime.GetPeriod() != -1) {     // Example: 123
00361           if (target->WorkingAreaLifeTime < job->Resources.SessionLifeTime) {
00362             logger.msg(VERBOSE, "Matchmaking, WorkingAreaLifeTime problem, ExecutionTarget: %s (WorkingAreaLifeTime) JobDescription: %s (SessionLifeTime)", (std::string)target->WorkingAreaLifeTime, (std::string)job->Resources.SessionLifeTime);
00363             continue;
00364           }
00365         }
00366         else {
00367           logger.msg(VERBOSE, "Matchmaking, ExecutionTarget:  %s, WorkingAreaLifeTime is not defined", target->url.str());
00368           continue;
00369         }
00370       }
00371 
00372       if ((job->Resources.NodeAccess == NAT_INBOUND ||
00373            job->Resources.NodeAccess == NAT_INOUTBOUND) &&
00374           !target->ConnectivityIn) {     // Example: false (boolean)
00375         logger.msg(VERBOSE, "Matchmaking, ConnectivityIn problem, ExecutionTarget: %s (ConnectivityIn) JobDescription: %s (InBound)", (job->Resources.NodeAccess == NAT_INBOUND ? "INBOUND" : "INOUTBOUND"), (target->ConnectivityIn ? "true" : "false"));
00376         continue;
00377       }
00378 
00379       if ((job->Resources.NodeAccess == NAT_OUTBOUND ||
00380            job->Resources.NodeAccess == NAT_INOUTBOUND) &&
00381           !target->ConnectivityOut) {     // Example: false (boolean)
00382         logger.msg(VERBOSE, "Matchmaking, ConnectivityOut problem, ExecutionTarget: %s (ConnectivityOut) JobDescription: %s (OutBound)", (job->Resources.NodeAccess == NAT_OUTBOUND ? "OUTBOUND" : "INOUTBOUND"), (target->ConnectivityIn ? "true" : "false"));
00383         continue;
00384       }
00385 
00386       PossibleTargets.push_back(&*target);
00387 
00388     } //end loop over all found targets
00389 
00390     logger.msg(VERBOSE, "Possible targets after prefiltering: %d", PossibleTargets.size());
00391 
00392     std::list<ExecutionTarget*>::iterator iter = PossibleTargets.begin();
00393 
00394     for (int i = 1; iter != PossibleTargets.end(); iter++, i++) {
00395       logger.msg(VERBOSE, "%d. Cluster: %s; Queue: %s", i, (*iter)->DomainName, (*iter)->ComputingShareName);
00396       logger.msg(VERBOSE, "Health State: %s", (*iter)->HealthState);
00397     }
00398 
00399     TargetSortingDone = false;
00400   }
00401 
00402   const ExecutionTarget* Broker::GetBestTarget() {
00403     if (PossibleTargets.size() <= 0 || current == PossibleTargets.end())
00404       return NULL;
00405 
00406     if (!TargetSortingDone) {
00407       logger.msg(DEBUG, "Target sorting not done, sorting them now");
00408       SortTargets();
00409       current = PossibleTargets.begin();
00410     }
00411     else
00412       current++;
00413 
00414     return (current != PossibleTargets.end() ? *current : NULL);
00415   }
00416 
00417   void Broker::RegisterJobsubmission() {
00418     if (!job || current == PossibleTargets.end())
00419       return;
00420     if ((*current)->FreeSlots >= job->Resources.SlotRequirement.NumberOfSlots) {   //The job will start directly
00421       (*current)->FreeSlots -= job->Resources.SlotRequirement.NumberOfSlots;
00422       if ((*current)->UsedSlots != -1)
00423         (*current)->UsedSlots += job->Resources.SlotRequirement.NumberOfSlots;
00424     }
00425     else                                           //The job will be queued
00426       if ((*current)->WaitingJobs != -1)
00427         (*current)->WaitingJobs += job->Resources.SlotRequirement.NumberOfSlots;
00428   }
00429 
00430   BrokerLoader::BrokerLoader()
00431     : Loader(BaseConfig().MakeConfig(Config()).Parent()) {}
00432 
00433   BrokerLoader::~BrokerLoader() {
00434     for (std::list<Broker*>::iterator it = brokers.begin();
00435          it != brokers.end(); it++)
00436       delete *it;
00437   }
00438 
00439   Broker* BrokerLoader::load(const std::string& name,
00440                              const UserConfig& usercfg) {
00441     if (name.empty())
00442       return NULL;
00443 
00444     if(!factory_->load(FinderLoader::GetLibrariesList(),
00445                        "HED:Broker", name)) {
00446       logger.msg(ERROR, "Broker plugin \"%s\" not found.", name);
00447       return NULL;
00448     }
00449 
00450     BrokerPluginArgument arg(usercfg);
00451     Broker *broker = factory_->GetInstance<Broker>("HED:Broker", name, &arg, false);
00452 
00453     if (!broker) {
00454       logger.msg(ERROR, "Broker %s could not be created", name);
00455       return NULL;
00456     }
00457 
00458     brokers.push_back(broker);
00459     logger.msg(INFO, "Loaded Broker %s", name);
00460     return broker;
00461   }
00462 
00463 } // namespace Arc