Back to index

nordugrid-arc-nox  1.1.0~rc6
job.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <cstdlib>
00006 // NOTE: On Solaris errno is not working properly if cerrno is included first
00007 #include <cerrno>
00008 
00009 #include <sys/stat.h>
00010 #include <sys/types.h>
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 
00014 #include <arc/DateTime.h>
00015 #include <arc/Thread.h>
00016 #include <arc/StringConv.h>
00017 #include <arc/FileUtils.h>
00018 #include <arc/security/ArcPDP/Evaluator.h>
00019 #include <arc/security/ArcPDP/EvaluatorLoader.h>
00020 #include <arc/message/SecAttr.h>
00021 #include <arc/credential/Credential.h>
00022 #include <arc/ws-addressing/WSA.h>
00023 
00024 #include "grid-manager/conf/environment.h"
00025 #include "grid-manager/conf/conf_pre.h"
00026 #include "grid-manager/jobs/job.h"
00027 #include "grid-manager/jobs/plugins.h"
00028 #include "grid-manager/jobs/job_request.h"
00029 #include "grid-manager/jobs/commfifo.h"
00030 #include "grid-manager/run/run_plugin.h"
00031 #include "grid-manager/files/info_files.h"
00032  
00033 #include "job.h"
00034 
00035 using namespace ARex;
00036 
00037 #define JOB_POLICY_OPERATION_URN "http://www.nordugrid.org/schemas/policy-arc/types/a-rex/joboperation"
00038 
00039 static bool env_initialized = false;
00040 Glib::StaticMutex env_lock = GLIBMM_STATIC_MUTEX_INIT;
00041 
00042 bool ARexGMConfig::InitEnvironment(const std::string& configfile) {
00043   if(env_initialized) return true;
00044   env_lock.lock();
00045   if(!env_initialized) {
00046     if(!configfile.empty()) nordugrid_config_loc(configfile);
00047     env_initialized=read_env_vars();
00048   };
00049   env_lock.unlock();
00050   return env_initialized;
00051 }
00052 
00053 ARexGMConfig::~ARexGMConfig(void) {
00054   if(user_) delete user_;
00055 }
00056 
00057 ARexGMConfig::ARexGMConfig(const std::string& configfile,const std::string& uname,const std::string& grid_name,const std::string& service_endpoint):user_(NULL),readonly_(false),grid_name_(grid_name),service_endpoint_(service_endpoint) {
00058   if(!InitEnvironment(configfile)) return;
00059   // const char* uname = user_s.get_uname();
00060   //if((bool)job_map) uname=job_map.unix_name();
00061   user_=new JobUser(uname);
00062   if(!user_->is_valid()) { delete user_; user_=NULL; return; };
00063   if(nordugrid_loc().empty() != 0) { delete user_; user_=NULL; return; };
00064   /* read configuration */
00065   std::vector<std::string> session_roots;
00066   std::string control_dir;
00067   std::string default_lrms;
00068   std::string default_queue;
00069   RunPlugin* cred_plugin = new RunPlugin;
00070   std::string allowsubmit;
00071   bool strict_session;
00072   if(!configure_user_dirs(uname,control_dir,session_roots,
00073                           session_roots_non_draining_,
00074                           default_lrms,default_queue,queues_,
00075                           cont_plugins_,*cred_plugin,
00076                           allowsubmit,strict_session)) {
00077     // olog<<"Failed processing grid-manager configuration"<<std::endl;
00078     delete user_; user_=NULL; delete cred_plugin; return;
00079   };
00080   delete cred_plugin;
00081   if(default_queue.empty() && (queues_.size() == 1)) {
00082     default_queue=*(queues_.begin());
00083   };
00084   user_->SetControlDir(control_dir);
00085   user_->SetSessionRoot(session_roots);
00086   user_->SetLRMS(default_lrms,default_queue);
00087   user_->SetStrictSession(strict_session);
00088   //for(;allowsubmit.length();) {
00089   //  std::string group = config_next_arg(allowsubmit);
00090   //  if(group.length()) readonly=true;
00091   //  if(user_a.check_group(group)) { readonly=false; break; };
00092   //};
00093   //if(readonly) olog<<"This user is denied to submit new jobs"<<std::endl;
00094   /*
00095           * link to the class for direct file access *
00096           std::string direct_config = "mount "+session_root+"\n";
00097           direct_config+="dir / nouser read cd dirlist delete append overwrite";          direct_config+=" create "+
00098              inttostring(user->get_uid())+":"+inttostring(user->get_gid())+
00099              " 600:600";
00100           direct_config+=" mkdir "+
00101              inttostring(user->get_uid())+":"+inttostring(user->get_gid())+
00102              " 700:700\n";
00103           direct_config+="end\n";
00104 #ifdef HAVE_SSTREAM
00105           std::stringstream fake_cfile(direct_config);
00106 #else
00107           std::strstream fake_cfile;
00108           fake_cfile<<direct_config;
00109 #endif
00110           direct_fs = new DirectFilePlugin(fake_cfile,user_s);
00111           if((bool)job_map) {
00112             olog<<"Job submission user: "<<uname<<
00113                   " ("<<user->get_uid()<<":"<<user->get_gid()<<")"<<std::endl;
00114   */
00115 }
00116 
// Minimal scoped owner of one heap-allocated object: the object passed to the
// constructor is deleted when the AutoPointer goes out of scope.
// Copying and assignment are private, so ownership cannot be shared or
// transferred from outside the class.
template <typename T> class AutoPointer {
 public:
  // Creates an empty (NULL) pointer.
  AutoPointer(void):ptr_(NULL) { }
  // Takes ownership of o (may be NULL).
  AutoPointer(T* o):ptr_(o) { }
  // Deletes the owned object; deleting NULL is a no-op.
  ~AutoPointer(void) { delete ptr_; }
  // Access to the owned object. Must not be used on an empty pointer.
  T& operator*(void) const { return *ptr_; }
  T* operator->(void) const { return ptr_; }
  // Validity tests: true/false depending on whether an object is held.
  operator bool(void) const { return (ptr_ != NULL); }
  bool operator!(void) const { return (ptr_ == NULL); }
  // Gives the raw pointer to APIs expecting T*; ownership stays here.
  operator T*(void) const { return ptr_; }
 private:
  T* ptr_;
  // Not meant to be used - declared private to forbid copying.
  AutoPointer(const AutoPointer&):ptr_(NULL) { }
  void operator=(const AutoPointer<T>&) { }
  void operator=(T*) { }
};
00133 
00134 static ARexJobFailure setfail(JobReqResult res) {
00135   switch(res) {
00136     case JobReqSuccess: return ARexJobNoError;
00137     case JobReqInternalFailure: return ARexJobInternalError;
00138     case JobReqSyntaxFailure: return ARexJobDescriptionSyntaxError;
00139     case JobReqUnsupportedFailure: return ARexJobDescriptionUnsupportedError;
00140     case JobReqMissingFailure: return ARexJobDescriptionMissingError;
00141     case JobReqLogicalFailure: return ARexJobDescriptionLogicalError;
00142   };
00143   return ARexJobInternalError;
00144 }
00145 
00146 bool ARexJob::is_allowed(bool fast) {
00147   allowed_to_see_=false;
00148   allowed_to_maintain_=false;
00149   // Checking user's grid name against owner
00150   if(config_.GridName() == job_.DN) {
00151     allowed_to_see_=true;
00152     allowed_to_maintain_=true;
00153     return true;
00154   };
00155   if(fast) return true;
00156   // Do fine-grained authorization requested by job's owner
00157   if(config_.beginAuth() == config_.endAuth()) return true;
00158   std::string acl;
00159   if(!job_acl_read_file(id_,*config_.User(),acl)) return true; // safe to ignore
00160   if(acl.empty()) return true; // No policy defiled - only owner allowed
00161   // Identify and parse policy
00162   ArcSec::EvaluatorLoader eval_loader;
00163   AutoPointer<ArcSec::Policy> policy(eval_loader.getPolicy(ArcSec::Source(acl)));
00164   if(!policy) {
00165     logger_.msg(Arc::VERBOSE, "%s: Failed to parse user policy", id_);
00166     return true;
00167   };
00168   AutoPointer<ArcSec::Evaluator> eval(eval_loader.getEvaluator(policy));
00169   if(!eval) {
00170     logger_.msg(Arc::VERBOSE, "%s: Failed to load evaluator for user policy ", id_);
00171     return true;
00172   };
00173   std::string policyname = policy->getName();
00174   if((policyname.length() > 7) && 
00175      (policyname.substr(policyname.length()-7) == ".policy")) {
00176     policyname.resize(policyname.length()-7);
00177   };
00178   if(policyname == "arc") {
00179     // Creating request - directly with XML
00180     // Creating top of request document
00181     Arc::NS ns;
00182     ns["ra"]="http://www.nordugrid.org/schemas/request-arc";
00183     Arc::XMLNode request(ns,"ra:Request");
00184     // Collect all security attributes
00185     for(std::list<Arc::MessageAuth*>::iterator a = config_.beginAuth();a!=config_.endAuth();++a) {
00186       if(*a) (*a)->Export(Arc::SecAttr::ARCAuth,request);
00187     };
00188     // Leave only client identities
00189     for(Arc::XMLNode item = request["RequestItem"];(bool)item;++item) {
00190       for(Arc::XMLNode a = item["Action"];(bool)a;a=item["Action"]) a.Destroy();
00191       for(Arc::XMLNode r = item["Resource"];(bool)r;r=item["Resource"]) r.Destroy();
00192     };
00193     // Fix namespace
00194     request.Namespaces(ns);
00195     // Create A-Rex specific action
00196     // TODO: make helper classes for such operations
00197     Arc::XMLNode item = request["ra:RequestItem"];
00198     if(!item) item=request.NewChild("ra:RequestItem");
00199     // Possible operations are Modify and Read
00200     Arc::XMLNode action;
00201     action=item.NewChild("ra:Action");
00202     action="Read"; action.NewAttribute("Type")="string";
00203     action.NewAttribute("AttributeId")=JOB_POLICY_OPERATION_URN;
00204     action=item.NewChild("ra:Action");
00205     action="Modify"; action.NewAttribute("Type")="string";
00206     action.NewAttribute("AttributeId")=JOB_POLICY_OPERATION_URN;
00207     // Evaluating policy
00208     ArcSec::Response *resp = eval->evaluate(request,policy);
00209     // Analyzing response in order to understand which operations are allowed
00210     if(!resp) return true; // Not authorized
00211     // Following should be somehow made easier
00212     ArcSec::ResponseList& rlist = resp->getResponseItems();
00213     for(int n = 0; n<rlist.size(); ++n) {
00214       ArcSec::ResponseItem* ritem = rlist[n];
00215       if(!ritem) continue;
00216       if(ritem->res != ArcSec::DECISION_PERMIT) continue;
00217       if(!(ritem->reqtp)) continue;
00218       for(ArcSec::Action::iterator a = ritem->reqtp->act.begin();a!=ritem->reqtp->act.end();++a) {
00219         ArcSec::RequestAttribute* attr = *a;
00220         if(!attr) continue;
00221         ArcSec::AttributeValue* value = attr->getAttributeValue();
00222         if(!value) continue;
00223         std::string action = value->encode();
00224         if(action == "Read") allowed_to_see_=true;
00225         if(action == "Modify") allowed_to_maintain_=true;
00226       };
00227     };
00228   } else if(policyname == "gacl") {
00229     // Creating request - directly with XML
00230     Arc::NS ns;
00231     Arc::XMLNode request(ns,"gacl");
00232     // Collect all security attributes
00233     for(std::list<Arc::MessageAuth*>::iterator a = config_.beginAuth();a!=config_.endAuth();++a) {
00234       if(*a) (*a)->Export(Arc::SecAttr::GACL,request);
00235     };
00236     // Leave only client identities
00237     int entries = 0;
00238     for(Arc::XMLNode entry = request["entry"];(bool)entry;++entry) {
00239       for(Arc::XMLNode a = entry["allow"];(bool)a;a=entry["allow"]) a.Destroy();
00240       for(Arc::XMLNode a = entry["deny"];(bool)a;a=entry["deny"]) a.Destroy();
00241       ++entries;
00242     };
00243     if(!entries) request.NewChild("entry");
00244     // Evaluate every action separately
00245     for(Arc::XMLNode entry = request["entry"];(bool)entry;++entry) {
00246       entry.NewChild("allow").NewChild("read");
00247     };
00248     ArcSec::Response *resp;
00249     resp=eval->evaluate(request,policy);
00250     if(resp) {
00251       ArcSec::ResponseList& rlist = resp->getResponseItems();
00252       for(int n = 0; n<rlist.size(); ++n) {
00253         ArcSec::ResponseItem* ritem = rlist[n];
00254         if(!ritem) continue;
00255         if(ritem->res != ArcSec::DECISION_PERMIT) continue;
00256         allowed_to_see_=true; break;
00257       };
00258     };
00259     for(Arc::XMLNode entry = request["entry"];(bool)entry;++entry) {
00260       entry["allow"].Destroy();
00261       entry.NewChild("allow").NewChild("write");
00262     };
00263     resp=eval->evaluate(request,policy);
00264     if(resp) {
00265       ArcSec::ResponseList& rlist = resp->getResponseItems();
00266       for(int n = 0; n<rlist.size(); ++n) {
00267         ArcSec::ResponseItem* ritem = rlist[n];
00268         if(!ritem) continue;
00269         if(ritem->res != ArcSec::DECISION_PERMIT) continue;
00270         allowed_to_maintain_=true; break;
00271       };
00272     };
00273     // TODO: <list/>, <admin/>
00274   } else {
00275     logger_.msg(Arc::VERBOSE, "%s: Unknown user policy '%s'", id_, policyname);
00276   };
00277   return true;
00278 }
00279 
00280 ARexJob::ARexJob(const std::string& id,ARexGMConfig& config,Arc::Logger& logger,bool fast_auth_check):id_(id),logger_(logger),config_(config) {
00281   if(id_.empty()) return;
00282   if(!config_) { id_.clear(); return; };
00283   // Reading essential information about job
00284   if(!job_local_read_file(id_,*config_.User(),job_)) { id_.clear(); return; };
00285   // Checking if user is allowed to do anything with that job
00286   if(!is_allowed(fast_auth_check)) { id_.clear(); return; };
00287   if(!(allowed_to_see_ || allowed_to_maintain_)) { id_.clear(); return; };
00288 }
00289 
00290 ARexJob::ARexJob(Arc::XMLNode jsdl,ARexGMConfig& config,const std::string& credentials,const std::string& clientid, Arc::Logger& logger, Arc::XMLNode migration):id_(""),logger_(logger),config_(config) {
00291   if(!config_) return;
00292   // New job is created here
00293   // First get and acquire new id
00294   if(!make_job_id()) return;
00295   // Turn JSDL into text
00296   std::string job_desc_str;
00297   // Make full XML doc out of subtree
00298   {
00299     Arc::XMLNode jsdldoc;
00300     jsdl.New(jsdldoc);
00301     jsdldoc.GetDoc(job_desc_str);
00302   };
00303   // Store description
00304   std::string fname = config_.User()->ControlDir() + "/job." + id_ + ".description";
00305   if(!job_description_write_file(fname,job_desc_str)) {
00306     delete_job_id();
00307     failure_="Failed to store job RSL description";
00308     failure_type_=ARexJobInternalError;
00309     return;
00310   };
00311   // Analyze JSDL (checking, substituting, etc)
00312   std::string acl("");
00313   if((failure_type_=setfail(parse_job_req(fname.c_str(),job_,&acl))) != ARexJobNoError) {
00314     if(failure_.empty()) {
00315       failure_="Failed to parse job/action description";
00316       failure_type_=ARexJobInternalError;
00317     };
00318     delete_job_id();
00319     return;
00320   };
00321   // Check for proper LRMS name in request. If there is no LRMS name
00322   // in user configuration that means service is opaque frontend and
00323   // accepts any LRMS in request.
00324   if((!job_.lrms.empty()) && (!config_.User()->DefaultLRMS().empty())) {
00325     if(job_.lrms != config_.User()->DefaultLRMS()) {
00326       failure_="Requested LRMS is not supported by this service";
00327       failure_type_=ARexJobInternalError;
00328       //failure_type_=ARexJobDescriptionLogicalError;
00329       delete_job_id();
00330       return;
00331     };
00332   };
00333   if(job_.lrms.empty()) job_.lrms=config_.User()->DefaultLRMS();
00334   // Check for proper queue in request.
00335   if(job_.queue.empty()) job_.queue=config_.User()->DefaultQueue();
00336   if(job_.queue.empty()) {
00337     failure_="Request has no queue defined";
00338     failure_type_=ARexJobDescriptionMissingError;
00339     delete_job_id();
00340     return;
00341   };
00342   if(config_.Queues().size() > 0) { // If no queues configured - service takes any
00343     for(std::list<std::string>::const_iterator q = config_.Queues().begin();;++q) {
00344       if(q == config_.Queues().end()) {
00345         failure_="Requested queue "+job_.queue+" does not match any of available queues";
00346         //failure_type_=ARexJobDescriptionLogicalError;
00347         failure_type_=ARexJobInternalError;
00348         delete_job_id();
00349         return;
00350       };
00351       if(*q == job_.queue) break;
00352     };
00353   };
00354   // Start local file 
00355   /* !!!!! some parameters are unchecked here - rerun,diskspace !!!!! */
00356   job_.jobid=id_;
00357   job_.starttime=Arc::Time();
00358   job_.DN=config_.GridName();
00359   job_.clientname=clientid;
00360   job_.migrateactivityid=(std::string)migration["ActivityIdentifier"];
00361   job_.forcemigration=(migration["ForceMigration"]=="true");
00362   // BES ActivityIdentifier is global job ID
00363   Arc::NS ns_;
00364   ns_["bes-factory"]="http://schemas.ggf.org/bes/2006/08/bes-factory";
00365   ns_["a-rex"]="http://www.nordugrid.org/schemas/a-rex";
00366   Arc::XMLNode node_(ns_,"bes-factory:ActivityIdentifier");
00367   Arc::WSAEndpointReference identifier(node_);
00368   identifier.Address(config.Endpoint()); // address of service
00369   identifier.ReferenceParameters().NewChild("a-rex:JobID")=id_;
00370   identifier.ReferenceParameters().NewChild("a-rex:JobSessionDir")=config.Endpoint()+"/"+id_;
00371   std::string globalid;
00372   ((Arc::XMLNode)identifier).GetDoc(globalid);
00373   std::string::size_type nlp;
00374   while ((nlp=globalid.find('\n')) != std::string::npos)
00375     globalid.replace(nlp,1," "); // squeeze into 1 line
00376   job_.globalid=globalid;
00377   // Try to create proxy
00378   if(!update_credentials(credentials)) {
00379     failure_="Failed to store credentials";
00380     failure_type_=ARexJobInternalError;
00381     delete_job_id();
00382     return;
00383   };
00384   // Choose session directory
00385   std::string sessiondir;
00386   if (!ChooseSessionDir(id_, sessiondir)) {
00387     delete_job_id();
00388     failure_="Failed to find valid session directory";
00389     failure_type_=ARexJobInternalError;
00390     return;
00391   };
00392   config_.User()->SetSessionRoot(sessiondir);
00393   // Write local file
00394   JobDescription job(id_,config_.User()->SessionRoot()+"/"+id_,JOB_STATE_ACCEPTED);
00395   job.set_local(&job_); // need this for write_grami
00396   if(!job_local_write_file(job,*config_.User(),job_)) {
00397     delete_job_id();
00398     failure_="Failed to create job description";
00399     failure_type_=ARexJobInternalError;
00400     return;
00401   };
00402   // Write grami file
00403   Arc::JobDescription desc;
00404   desc.AddHint("SOURCEDIALECT","GRIDMANAGER");
00405   desc.Parse(job_desc_str);
00406   if(!write_grami(desc,job,*config_.User(),NULL)) {
00407     delete_job_id();
00408     failure_="Failed to create grami file";
00409     failure_type_=ARexJobInternalError;
00410     return;
00411   };
00412   // Write ACL file
00413   if(!acl.empty()) {
00414     if(!job_acl_write_file(id_,*config.User(),acl)) {
00415       delete_job_id();
00416       failure_="Failed to process/store job ACL";
00417       failure_type_=ARexJobInternalError;
00418       return;
00419     };
00420   };
00421   // Call authentication/authorization plugin/exec
00422   {
00423     // talk to external plugin to ask if we can proceed
00424     std::list<ContinuationPlugins::result_t> results;
00425     config_.Plugins().run(job,*config_.User(),results);
00426     std::list<ContinuationPlugins::result_t>::iterator result = results.begin();
00427     while(result != results.end()) {
00428       // analyze results
00429       if(result->action == ContinuationPlugins::act_fail) {
00430         delete_job_id();
00431         failure_="Job is not allowed by external plugin: "+result->response;
00432         failure_type_=ARexJobInternalError;
00433         return;
00434       } else if(result->action == ContinuationPlugins::act_log) {
00435         // Scream but go ahead
00436         logger_.msg(Arc::WARNING, "Failed to run external plugin: %s", result->response);
00437       } else if(result->action == ContinuationPlugins::act_pass) {
00438         // Just continue
00439         if(result->response.length()) {
00440           logger_.msg(Arc::INFO, "Plugin response: %s", result->response);
00441         };
00442       } else {
00443         delete_job_id();
00444         failure_="Failed to pass external plugin: "+result->response;
00445         failure_type_=ARexJobInternalError;
00446         return;
00447       };
00448       ++result;
00449     };
00450   };
00451 /*@
00452   // Make access to filesystem on behalf of local user
00453   if(cred_plugin && (*cred_plugin)) {
00454     job_subst_t subst_arg;
00455     subst_arg.user=user;
00456     subst_arg.job=&job_id;
00457     subst_arg.reason="new";
00458     // run external plugin to acquire non-unix local credentials
00459     if(!cred_plugin->run(job_subst,&subst_arg)) {
00460       olog << "Failed to run plugin" << std::endl;
00461       delete_job_id();
00462       failure_type_=ARexJobInternalError;
00463       error_description="Failed to obtain external credentials";
00464       return 1;
00465     };
00466     if(cred_plugin->result() != 0) {
00467       olog << "Plugin failed: " << cred_plugin->result() << std::endl;
00468       delete_job_id();
00469       error_description="Failed to obtain external credentials";
00470       failure_type_=ARexJobInternalError;
00471       return 1;
00472     };
00473   };
00474 */
00475   // Create session directory
00476   if(!job_session_create(job,*config_.User())) {
00477     delete_job_id();
00478     failure_="Failed to create session directory";
00479     failure_type_=ARexJobInternalError;
00480     return;
00481   };
00482   // Create status file (do it last so GM picks job up here)
00483   if(!job_state_write_file(job,*config_.User(),JOB_STATE_ACCEPTED)) {
00484     delete_job_id();
00485     failure_="Failed registering job in grid-manager";
00486     failure_type_=ARexJobInternalError;
00487     return;
00488   };
00489   SignalFIFO(*config_.User());
00490   return;
00491 }
00492 
00493 bool ARexJob::GetDescription(Arc::XMLNode& jsdl) {
00494   if(id_.empty()) return false;
00495   std::string sdesc;
00496   if(!job_description_read_file(id_,*config_.User(),sdesc)) return false;
00497   Arc::XMLNode xdesc(sdesc);
00498   if(!xdesc) return false;
00499   jsdl.Replace(xdesc);
00500   return true;
00501 }
00502 
00503 bool ARexJob::Cancel(void) {
00504   if(id_.empty()) return false;
00505   JobDescription job_desc(id_,"");
00506   if(!job_cancel_mark_put(job_desc,*config_.User())) return false;
00507   return true;
00508 }
00509 
00510 bool ARexJob::Clean(void) {
00511   if(id_.empty()) return false;
00512   JobDescription job_desc(id_,"");
00513   if(!job_clean_mark_put(job_desc,*config_.User())) return false;
00514   return true;
00515 }
00516 
00517 bool ARexJob::Resume(void) {
00518   if(id_.empty()) return false;
00519   if(job_.failedstate.length() == 0) {
00520     // Job can't be restarted.
00521     return false;
00522   };
00523   if(job_.reruns <= 0) {
00524     // Job run out of number of allowed retries.
00525     return false;
00526   };
00527   if(!job_restart_mark_put(JobDescription(id_,""),*config_.User())) {
00528     // Failed to report restart request.
00529     return false;
00530   };
00531   return true;
00532 }
00533 
00534 std::string ARexJob::State(void) {
00535   bool job_pending;
00536   return State(job_pending);
00537 }
00538 
00539 std::string ARexJob::State(bool& job_pending) {
00540   if(id_.empty()) return "";
00541   job_state_t state = job_state_read_file(id_,*config_.User(),job_pending);
00542   if(state > JOB_STATE_UNDEFINED) state=JOB_STATE_UNDEFINED;
00543   return states_all[state].name;
00544 }
00545 
00546 bool ARexJob::Failed(void) {
00547   if(id_.empty()) return false;
00548   return job_failed_mark_check(id_,*config_.User());
00549 }
00550 
00551 bool ARexJob::UpdateCredentials(const std::string& credentials) {
00552   if(id_.empty()) return false;
00553   if(!update_credentials(credentials)) return false;
00554   JobDescription job(id_,config_.User()->SessionRoot(id_)+"/"+id_,JOB_STATE_ACCEPTED);
00555   if(!job_local_write_file(job,*config_.User(),job_)) return false;
00556   return true;
00557 }
00558 
00559 bool ARexJob::update_credentials(const std::string& credentials) {
00560   if(credentials.empty()) return true;
00561   std::string fname=config_.User()->ControlDir()+"/job."+id_+".proxy";
00562   ::unlink(fname.c_str());
00563   int h=::open(fname.c_str(),O_WRONLY | O_CREAT | O_EXCL,0600);
00564   if(h == -1) return false;
00565   fix_file_owner(fname,*config_.User());
00566   const char* s = credentials.c_str();
00567   int ll = credentials.length();
00568   int l = 0;
00569   for(;(ll>0) && (l!=-1);s+=l,ll-=l) l=::write(h,s,ll);
00570   ::close(h);
00571   if(l==-1) return false;
00572   job_.expiretime = Arc::Credential(fname,"","","").GetEndTime();
00573   return true;
00574 }
00575 
00576 /*
00577 bool ARexJob::make_job_id(const std::string &id) {
00578   if((id.find('/') != std::string::npos) || (id.find('\n') != std::string::npos)) {
00579     olog<<"ID contains forbidden characters"<<std::endl;
00580     return false;
00581   };
00582   if((id == "new") || (id == "info")) return false;
00583   // claim id by creating empty description file
00584   std::string fname=user->ControlDir()+"/job."+id+".description";
00585   struct stat st;
00586   if(stat(fname.c_str(),&st) == 0) return false;
00587   int h = ::open(fname.c_str(),O_RDWR | O_CREAT | O_EXCL,S_IRWXU);
00588   // So far assume control directory is on local fs.
00589   // TODO: add locks or links for NFS
00590   if(h == -1) return false;
00591   fix_file_owner(fname,*user);
00592   close(h);
00593   delete_job_id();
00594   job_id=id;
00595   return true;
00596 }
00597 */
00598 
00599 bool ARexJob::make_job_id(void) {
00600   if(!config_) return false;
00601   int i;
00602   //@ delete_job_id();
00603   for(i=0;i<100;i++) {
00604     id_=Arc::tostring((unsigned int)getpid())+
00605         Arc::tostring((unsigned int)time(NULL))+
00606         Arc::tostring(rand(),1);
00607     std::string fname=config_.User()->ControlDir()+"/job."+id_+".description";
00608     struct stat st;
00609     if(stat(fname.c_str(),&st) == 0) continue;
00610     int h = ::open(fname.c_str(),O_RDWR | O_CREAT | O_EXCL,0600);
00611     // So far assume control directory is on local fs.
00612     // TODO: add locks or links for NFS
00613     int err = errno;
00614     if(h == -1) {
00615       if(err == EEXIST) continue;
00616       logger_.msg(Arc::ERROR, "Failed to create file in %s", config_.User()->ControlDir());
00617       id_="";
00618       return false;
00619     };
00620     fix_file_owner(fname,*config_.User());
00621     close(h);
00622     return true;
00623   };
00624   logger_.msg(Arc::ERROR, "Out of tries while allocating new job id in %s", config_.User()->ControlDir());
00625   id_="";
00626   return false;
00627 }
00628 
00629 bool ARexJob::delete_job_id(void) {
00630   if(!config_) return true;
00631   if(!id_.empty()) {
00632     job_clean_final(JobDescription(id_,
00633                 config_.User()->SessionRoot(id_)+"/"+id_),*config_.User());
00634     id_="";
00635   };
00636   return true;
00637 }
00638 
00639 int ARexJob::TotalJobs(ARexGMConfig& config,Arc::Logger& logger) {
00640   ContinuationPlugins plugins;
00641   JobsList jobs(*config.User(),plugins);
00642   jobs.ScanNewJobs();
00643   return jobs.size();
00644 }
00645 
00646 std::list<std::string> ARexJob::Jobs(ARexGMConfig& config,Arc::Logger& logger) {
00647   std::list<std::string> jlist;
00648   ContinuationPlugins plugins;
00649   JobsList jobs(*config.User(),plugins);
00650   jobs.ScanNewJobs();
00651   JobsList::iterator i = jobs.begin();
00652   for(;i!=jobs.end();++i) {
00653     ARexJob job(i->get_id(),config,logger,true);
00654     if(job) jlist.push_back(i->get_id());
00655   };
00656   return jlist;
00657 }
00658 
00659 std::string ARexJob::SessionDir(void) {
00660   if(id_.empty()) return "";
00661   return config_.User()->SessionRoot(id_)+"/"+id_;
00662 }
00663 
00664 std::string ARexJob::LogDir(void) {
00665   return job_.stdlog;
00666 }
00667 
// Returns the character at index i of s, or '\0' if i is at or beyond the
// end of the string. Lets the scanner look ahead without reading outside s.
static inline char path_char_at(const std::string& s,std::string::size_type i) {
  return (i < s.length()) ? s[i] : '\0';
}

// Normalizes a client-supplied path in place so that it can be safely
// appended to a job's session directory.
//
// filename - path to normalize; on success it holds the result without a
//            leading separator (an empty string denotes the root itself)
//
// Removes "." components, repeated separators and resolves ".." components.
// Returns false if a ".." component would lead above the root, in which case
// the content of filename is unspecified. A trailing separator and a trailing
// "." component are left as they are.
//
// After every removal the same position is examined again. Skipping that
// re-check is what previously let inputs like "a/../.." or "./../x" slip
// through as ".." and "../x".
static bool normalize_filename(std::string& filename) {
  if(filename.empty() || (filename[0] != G_DIR_SEPARATOR)) filename.insert(0,G_DIR_SEPARATOR_S);
  // p always refers to a separator; the component following it is examined
  std::string::size_type p = 0;
  while((p != std::string::npos) && (p < filename.length())) {
    const char c1 = path_char_at(filename,p+1);
    const char c2 = path_char_at(filename,p+2);
    const char c3 = path_char_at(filename,p+3);
    if((c1 == '.') && (c2 == '.') && ((c3 == 0) || (c3 == G_DIR_SEPARATOR))) {
      // ".." - drop it together with the preceding component
      if(p == 0) return false; // nothing precedes - path leaves the root
      std::string::size_type pr = filename.rfind(G_DIR_SEPARATOR,p-1);
      if(pr == std::string::npos) return false;
      filename.erase(pr,p-pr+3);
      p=pr;
      continue;
    }
    if((c1 == '.') && (c2 == G_DIR_SEPARATOR)) {
      // "./" - drop it
      filename.erase(p,2);
      continue;
    }
    if(c1 == G_DIR_SEPARATOR) {
      // repeated separator - keep only one
      filename.erase(p,1);
      continue;
    }
    p = filename.find(G_DIR_SEPARATOR,p+1);
  }
  if(!filename.empty()) filename.erase(0,1); // removing leading separator
  return true;
}
00691 
00692 int ARexJob::CreateFile(const std::string& filename) {
00693   if(id_.empty()) return -1;
00694   std::string fname = filename;
00695   if((!normalize_filename(fname)) || (fname.empty())) {
00696     failure_="File name is not acceptable";
00697     failure_type_=ARexJobInternalError;
00698     return -1;
00699   };
00700   int lname = fname.length();
00701   fname = config_.User()->SessionRoot(id_)+"/"+id_+"/"+fname;
00702   // First try to create/open file
00703   int h = Arc::FileOpen(fname.c_str(),O_WRONLY | O_CREAT,config_.User()->get_uid(),config_.User()->get_gid(),S_IRUSR | S_IWUSR);
00704   if(h != -1) return h;
00705   if(errno != ENOENT) return -1;
00706   // If open reports missing directory - try to create all sudirectories
00707   std::string::size_type n = fname.length()-lname;
00708   for(;;) {
00709     n=fname.find('/',n);
00710     if(n == std::string::npos) break;
00711     std::string dname = fname.substr(0,n);
00712     ++n;
00713     if(Arc::DirCreate(dname.c_str(),config_.User()->get_uid(),config_.User()->get_gid(),S_IRUSR | S_IWUSR | S_IXUSR)) continue;
00714     if(errno == EEXIST) continue;
00715   };
00716   return Arc::FileOpen(fname.c_str(),O_WRONLY | O_CREAT,config_.User()->get_uid(),config_.User()->get_gid(),S_IRUSR | S_IWUSR);
00717 }
00718 
00719 int ARexJob::OpenFile(const std::string& filename,bool for_read,bool for_write) {
00720   if(id_.empty()) return -1;
00721   std::string fname = filename;
00722   if((!normalize_filename(fname)) || (fname.empty())) {
00723     failure_="File name is not acceptable";
00724     failure_type_=ARexJobInternalError;
00725     return -1;
00726   };
00727   fname = config_.User()->SessionRoot(id_)+"/"+id_+"/"+fname;
00728   int flags = 0;
00729   if(for_read && for_write) { flags=O_RDWR; }
00730   else if(for_read) { flags=O_RDONLY; }
00731   else if(for_write) { flags=O_WRONLY; }
00732   return Arc::FileOpen(fname.c_str(),flags,config_.User()->get_uid(),config_.User()->get_gid(),0);
00733 }
00734 
00735 Glib::Dir* ARexJob::OpenDir(const std::string& dirname) {
00736   if(id_.empty()) return NULL;
00737   std::string dname = dirname;
00738   if(!normalize_filename(dname)) return NULL;
00739   //if(dname.empty()) return NULL;
00740   dname = config_.User()->SessionRoot(id_)+"/"+id_+"/"+dname;
00741   Glib::Dir* dir = Arc::DirOpen(dname.c_str(),config_.User()->get_uid(),config_.User()->get_gid());
00742   return dir;
00743 }
00744 
00745 int ARexJob::OpenLogFile(const std::string& name) {
00746   if(id_.empty()) return -1;
00747   if(strchr(name.c_str(),'/')) return -1;
00748   std::string fname = config_.User()->ControlDir() + "/job." + id_ + "." + name;
00749   return Arc::FileOpen(fname.c_str(),O_RDONLY,0,0,0);
00750 }
00751 
00752 std::list<std::string> ARexJob::LogFiles(void) {
00753   std::list<std::string> logs;
00754   if(id_.empty()) return logs;
00755   std::string dname = config_.User()->ControlDir();
00756   std::string prefix = "job." + id_ + ".";
00757   Glib::Dir* dir = Arc::DirOpen(dname.c_str(),config_.User()->get_uid(),config_.User()->get_gid());
00758   if(!dir) return logs;
00759   for(;;) {
00760     std::string name = dir->read_name();
00761     if(name.empty()) break;
00762     if(strncmp(prefix.c_str(),name.c_str(),prefix.length()) != 0) continue;
00763     logs.push_back(name.substr(prefix.length())); 
00764   };
00765   return logs;
00766 }
00767 
00768 std::string ARexJob::GetFilePath(const std::string& filename) {
00769   if(id_.empty()) return "";
00770   std::string fname = filename;
00771   if(!normalize_filename(fname)) return "";
00772   if(fname.empty()) config_.User()->SessionRoot(id_)+"/"+id_;
00773   return config_.User()->SessionRoot(id_)+"/"+id_+"/"+fname;
00774 }
00775 
00776 std::string ARexJob::GetLogFilePath(const std::string& name) {
00777   if(id_.empty()) return "";
00778   return config_.User()->ControlDir() + "/job." + id_ + "." + name;
00779 }
00780 
00781 bool ARexJob::ChooseSessionDir(const std::string& jobid, std::string& sessiondir) {
00782   if (config_.SessionRootsNonDraining().size() == 0) {
00783     // no active session dirs available
00784     logger_.msg(Arc::ERROR, "No non-draining session dirs available");
00785     return false;
00786   }
00787   // choose randomly from non-draining session dirs
00788   sessiondir = config_.SessionRootsNonDraining().at(rand() % config_.SessionRootsNonDraining().size());
00789   return true;
00790 }