Back to index

nordugrid-arc-nox  1.1.0~rc6
downloader.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 /*
00005   Download files specified in job.ID.input and check if user uploaded files.
00006   Additionally check if this is a migrated job and if so kill the job on old cluster.
00007   result: 0 - ok, 1 - unrecoverable error, 2 - potentially recoverable,
00008   3 - certificate error, 4 - should retry.
00009 */
00010 #include <sys/types.h>
00011 #include <sys/stat.h>
00012 #include <unistd.h>
00013 #include <fcntl.h>
00014 #include <pwd.h>
00015 #include <errno.h>
00016 
00017 #include <arc/XMLNode.h>
00018 #include <arc/client/Job.h>
00019 #include <arc/client/JobController.h>
00020 #include <arc/UserConfig.h>
00021 #include <arc/data/CheckSum.h>
00022 #include <arc/data/FileCache.h>
00023 #include <arc/data/DataHandle.h>
00024 #include <arc/data/DataMover.h>
00025 #include <arc/message/MCC.h>
00026 #include <arc/StringConv.h>
00027 #include <arc/Thread.h>
00028 #include <arc/URL.h>
00029 #include <arc/Utils.h>
00030 
00031 #include "../jobs/job.h"
00032 #include "../jobs/users.h"
00033 #include "../files/info_types.h"
00034 #include "../files/info_files.h"
00035 #include "../files/delete.h"
00036 #include "../conf/environment.h"
00037 #include "../misc/proxy.h"
00038 #include "../conf/conf_map.h"
00039 #include "../conf/conf_cache.h"
00040 #include "janitor.h"
00041 
/* Logger for this program: a child of the root logger named "Downloader" */
static Arc::Logger logger(Arc::Logger::getRootLogger(), "Downloader");

/* check for user uploaded files every 60 seconds (value is passed to sleep()) */
#define CHECK_PERIOD 60
/* maximum number of retries (for every source/destination) */
/* NOTE(review): not referenced anywhere in the visible part of this file */
#define MAX_RETRIES 5
/* maximum number of simultaneous downloads (default for the -n option) */
#define MAX_DOWNLOADS 5
/* maximum time in seconds for user files to upload (per file) */
#define MAX_USER_TIME 600

/* Forward declaration: FileDataEx stores a PointPair pointer before the
   PointPair class itself is defined. */
class PointPair;
00054 
00055 static void CollectCredentials(std::string& proxy,std::string& cert,std::string& key,std::string& cadir) {
00056   proxy=Arc::GetEnv("X509_USER_PROXY");
00057   if(proxy.empty()) {
00058     cert=Arc::GetEnv("X509_USER_CERT");
00059     key=Arc::GetEnv("X509_USER_KEY");
00060   };
00061   if(proxy.empty() && cert.empty()) {
00062     proxy="/tmp/x509_up"+Arc::tostring(getuid());
00063   };
00064   cadir=Arc::GetEnv("X509_CERT_DIR");
00065   if(cadir.empty()) cadir="/etc/grid-security/certificates";
00066 }
00067 
00068 class FileDataEx : public FileData {
00069  public:
00070   typedef std::list<FileDataEx>::iterator iterator;
00071   Arc::DataStatus res;
00072   PointPair* pair;
00073   FileDataEx(const FileData& f)
00074     : FileData(f),
00075       res(Arc::DataStatus::Success),
00076       pair(NULL) {}
00077   FileDataEx(const FileData& f, Arc::DataStatus r)
00078     : FileData(f),
00079       res(r),
00080       pair(NULL) {}
00081 };
00082 
/* Plain file list in the form exchanged with job_input_read_file() /
   job_input_write_file() and clean_files() */
static std::list<FileData> job_files_;
/* Files still to be processed (downloads pending or in progress, and
   user-uploadable files being waited for) */
static std::list<FileDataEx> job_files;
/* Files reported by the transfer callback as successfully downloaded */
static std::list<FileDataEx> processed_files;
/* Files whose transfer failed with no tries left on source or destination */
static std::list<FileDataEx> failed_files;
/* Locked by both main() and PointPair::callback() while they touch the
   lists and counter above; signalled by the callback after each transfer */
static Arc::SimpleCondition pair_condition;
/* Number of transfers started by main() and not yet reported by the callback */
static int pairs_initiated = 0;
00089 
00090 class SimpleConditionLock {
00091  private:
00092   Arc::SimpleCondition& cond_;
00093  public:
00094   SimpleConditionLock(Arc::SimpleCondition& cond):cond_(cond) {
00095     cond_.lock();
00096   };
00097   ~SimpleConditionLock(void) {
00098     cond_.unlock();
00099   };
00100 };
00101 
00102 int clean_files(std::list<FileData> &job_files,char* session_dir) {
00103   std::string session(session_dir);
00104   /* delete only downloadable files, let user manage his/hers files */
00105   std::list<FileData> tmp;
00106   if(delete_all_files(session,job_files,false,true,false) != 2) return 0;
00107   return 1;
00108 }
00109 
00110 /*
00111    Check for existence of user uploadable file
00112    returns 0 if file exists
00113            1 - it is not proper file or other error
00114            2 - not here yet
00115 */
00116 int user_file_exists(FileData &dt,char* session_dir,std::string* error = NULL) {
00117   struct stat st;
00118   const char *str = dt.lfn.c_str();
00119   if(strcmp(str,"*.*") == 0) return 0; /* do not wait for this file */
00120   std::string fname=std::string(session_dir) + '/' + dt.pfn;
00121   /* check if file does exist at all */
00122   if(lstat(fname.c_str(),&st) != 0) return 2;
00123   /* check for misconfiguration */
00124   /* parse files information */
00125   char *str_;
00126   unsigned long long int fsize;
00127   unsigned long long int fsum = (unsigned long long int)(-1);
00128   bool have_size = false;
00129   bool have_checksum = false;
00130   errno = 0;
00131   fsize = strtoull(str,&str_,10);
00132   if((*str_) == '.') {
00133     if(str_ != str) have_size=true;
00134     str=str_+1;
00135     fsum = strtoull(str,&str_,10);
00136     if((*str_) != 0) {
00137       logger.msg(Arc::ERROR, "Invalid checksum in %s for %s", dt.lfn, dt.pfn);
00138       if(error) (*error)="Bad information about file: checksum can't be parsed.";
00139       return 1;
00140     };
00141     have_checksum=true;
00142   }
00143   else {
00144     if(str_ != str) have_size=true;
00145     if((*str_) != 0) {
00146       logger.msg(Arc::ERROR, "Invalid file size in %s for %s ", dt.lfn, dt.pfn);
00147       if(error) (*error)="Bad information about file: size can't be parsed.";
00148       return 1;
00149     };
00150   };
00151   if(S_ISDIR(st.st_mode)) {
00152     if(have_size || have_checksum) {
00153       if(error) (*error)="Expected file. Directory found.";
00154       return 1;
00155     };
00156     return 0;
00157   };
00158   if(!S_ISREG(st.st_mode)) {
00159     if(error) (*error)="Expected ordinary file. Special object found.";
00160     return 1;
00161   };
00162   /* now check if proper size */
00163   if(have_size) {
00164     if(st.st_size < fsize) return 2;
00165     if(st.st_size > fsize) {
00166       logger.msg(Arc::ERROR, "Invalid file: %s is too big.", dt.pfn);
00167       if(error) (*error)="Delivered file is bigger than specified.";
00168       return 1; /* too big file */
00169     };
00170   };
00171   if(have_checksum) {
00172     int h=open(fname.c_str(),O_RDONLY);
00173     if(h==-1) { /* if we can't read that file job won't too */
00174       logger.msg(Arc::ERROR, "Error accessing file %s", dt.pfn);
00175       if(error) (*error)="Delivered file is unreadable.";
00176       return 1;
00177     };
00178     Arc::CRC32Sum crc;
00179     char buffer[1024];
00180     ssize_t l;
00181     size_t ll = 0;
00182     for(;;) {
00183       if((l=read(h,buffer,1024)) == -1) {
00184         logger.msg(Arc::ERROR, "Error reading file %s", dt.pfn);
00185         if(error) (*error)="Could not read file to compute checksum.";
00186         return 1;
00187       };
00188       if(l==0) break; ll+=l;
00189       crc.add(buffer,l);
00190     };
00191     close(h);
00192     crc.end();
00193     if(fsum != crc.crc()) {
00194       if(have_size) { /* size was checked - it is an error to have wrong crc */
00195         logger.msg(Arc::ERROR, "File %s has wrong CRC.", dt.pfn);
00196         if(error) (*error)="Delivered file has wrong checksum.";
00197         return 1;
00198       };
00199       return 2; /* just not uploaded yet */
00200     };
00201   };
00202   return 0; /* all checks passed - file is ok */
00203 }
00204 
00205 class PointPair {
00206  private:
00207   Arc::URL source_url;
00208   Arc::URL destination_url;
00209  public:
00210   Arc::DataHandle source;
00211   Arc::DataHandle destination;
00212   PointPair(const std::string& source_str, const std::string& destination_str,
00213       const Arc::UserConfig& usercfg)
00214     : source_url(source_str),
00215       destination_url(destination_str),
00216       source(source_url, usercfg),
00217       destination(destination_url, usercfg) {};
00218   ~PointPair(void) {};
00219   static void callback(Arc::DataMover*,Arc::DataStatus res,void* arg) {
00220     FileDataEx::iterator &it = *((FileDataEx::iterator*)arg);
00221     pair_condition.lock();
00222     if(!res.Passed()) {
00223       it->res=res;
00224       logger.msg(Arc::ERROR, "Failed downloading file %s - %s", it->lfn, std::string(res));
00225       if((it->pair->source->GetTries() <= 0) || (it->pair->destination->GetTries() <= 0)) {
00226         delete it->pair; it->pair=NULL;
00227         failed_files.push_back(*it);
00228       } else {
00229         job_files.push_back(*it);
00230         logger.msg(Arc::ERROR, "Retrying");
00231       };
00232     } else {
00233       logger.msg(Arc::INFO, "Downloaded file %s", it->lfn);
00234       delete it->pair; it->pair=NULL;
00235       processed_files.push_back(*it);
00236     };
00237     job_files.erase(it);
00238     --pairs_initiated;
00239     pair_condition.signal_nonblock();
00240     pair_condition.unlock();
00241     delete &it;
00242   };
00243 };
00244 
00245 int main(int argc,char** argv) {
00246   Arc::LogStream logcerr(std::cerr);
00247   Arc::Logger::getRootLogger().addDestination(logcerr);
00248   Arc::Logger::getRootLogger().setThreshold(Arc::VERBOSE);
00249   int res=0;
00250   bool not_uploaded;
00251   time_t start_time=time(NULL);
00252   time_t upload_timeout = 0;
00253   int n_threads = 1;
00254   int n_files = MAX_DOWNLOADS;
00255   /* if != 0 - change owner of downloaded files to this user */
00256   std::string file_owner_username = "";
00257   uid_t file_owner = 0;
00258   gid_t file_group = 0;
00259   bool use_conf_cache=false;
00260   unsigned long long int min_speed = 0;
00261   time_t min_speed_time = 300;
00262   unsigned long long int min_average_speed = 0;
00263   time_t max_inactivity_time = 300;
00264   bool secure = true;
00265   bool userfiles_only = false;
00266   bool passive = false;
00267   std::string failure_reason("");
00268   std::string x509_proxy, x509_cert, x509_key, x509_cadir;
00269   srand(time(NULL) + getpid()); 
00270 
00271   // process optional arguments
00272   for(;;) {
00273     opterr=0;
00274     int optc=getopt(argc,argv,"+hclpfC:n:t:n:u:U:s:S:a:i:d:");
00275     if(optc == -1) break;
00276     switch(optc) {
00277       case 'h': {
00278         std::cerr<<"Usage: downloader [-hclpf] [-C conf_file] [-n files] [-t threads] [-U uid]"<<std::endl;
00279         std::cerr<<"                  [-u username] [-s min_speed] [-S min_speed_time]"<<std::endl;
00280         std::cerr<<"                  [-a min_average_speed] [-i min_activity_time]"<<std::endl;
00281         std::cerr<<"                  [-d debug_level] job_id control_directory"<<std::endl;
00282         std::cerr<<"                  session_directory [cache options]"<<std::endl;
00283         exit(1);
00284       }; break;
00285       case 'c': {
00286         secure=false;
00287       }; break;
00288       case 'C': {
00289         nordugrid_config_loc(optarg);
00290       }; break;
00291       case 'l': {
00292         userfiles_only=true;
00293       }; break;
00294       case 'p': {
00295         passive=true;
00296       }; break;
00297       case 'f': {
00298         use_conf_cache=true;
00299       }; break;
00300       case 'd': {
00301         Arc::Logger::getRootLogger().setThreshold(Arc::string_to_level(optarg));
00302       }; break;
00303       case 't': {
00304         n_threads=atoi(optarg);
00305         if(n_threads < 1) {
00306           logger.msg(Arc::ERROR, "Wrong number of threads: %s", optarg); exit(1);
00307         };
00308       }; break;
00309       case 'n': {
00310         n_files=atoi(optarg);
00311         if(n_files < 1) {
00312           logger.msg(Arc::ERROR, "Wrong number of files: %s", optarg); exit(1);
00313         };
00314       }; break;
00315       case 'U': {
00316         unsigned int tuid;
00317         if(!Arc::stringto(std::string(optarg),tuid)) {
00318           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00319         };
00320         struct passwd pw_;
00321         struct passwd *pw;
00322         char buf[BUFSIZ];
00323         getpwuid_r(tuid,&pw_,buf,BUFSIZ,&pw);
00324         if(pw == NULL) {
00325           logger.msg(Arc::ERROR, "Wrong user name"); exit(1);
00326         };
00327         file_owner=pw->pw_uid;
00328         file_group=pw->pw_gid;
00329         if(pw->pw_name) file_owner_username=pw->pw_name;
00330         if((getuid() != 0) && (getuid() != file_owner)) {
00331           logger.msg(Arc::ERROR, "Specified user can't be handled"); exit(1);
00332         };
00333       }; break;
00334       case 'u': {
00335         struct passwd pw_;
00336         struct passwd *pw;
00337         char buf[BUFSIZ];
00338         getpwnam_r(optarg,&pw_,buf,BUFSIZ,&pw);
00339         if(pw == NULL) {
00340           logger.msg(Arc::ERROR, "Wrong user name"); exit(1);
00341         };
00342         file_owner=pw->pw_uid;
00343         file_group=pw->pw_gid;
00344         if(pw->pw_name) file_owner_username=pw->pw_name;
00345         if((getuid() != 0) && (getuid() != file_owner)) {
00346           logger.msg(Arc::ERROR, "Specified user can't be handled"); exit(1);
00347         };
00348       }; break;
00349       case 's': {
00350         unsigned int tmpi;
00351         if(!Arc::stringto(std::string(optarg),tmpi)) {
00352           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00353         };
00354         min_speed=tmpi;
00355       }; break;
00356       case 'S': {
00357         unsigned int tmpi;
00358         if(!Arc::stringto(std::string(optarg),tmpi)) {
00359           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00360         };
00361         min_speed_time=tmpi;
00362       }; break;
00363       case 'a': {
00364         unsigned int tmpi;
00365         if(!Arc::stringto(std::string(optarg),tmpi)) {
00366           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00367         };
00368         min_average_speed=tmpi;
00369       }; break;
00370       case 'i': {
00371         unsigned int tmpi;
00372         if(!Arc::stringto(std::string(optarg),tmpi)) {
00373           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00374         };
00375         max_inactivity_time=tmpi;
00376       }; break;
00377       case '?': {
00378         logger.msg(Arc::ERROR, "Unsupported option: %c", (char)optopt);
00379         exit(1);
00380       }; break;
00381       case ':': {
00382         logger.msg(Arc::ERROR, "Missing parameter for option %c", (char)optopt);
00383         exit(1);
00384       }; break;
00385       default: {
00386         logger.msg(Arc::ERROR, "Undefined processing error");
00387         exit(1);
00388       };
00389     };
00390   };
00391   // process required arguments
00392   char * id = argv[optind+0];
00393   if(!id) { logger.msg(Arc::ERROR, "Missing job id"); return 1; };
00394   char* control_dir = argv[optind+1];
00395   if(!control_dir) { logger.msg(Arc::ERROR, "Missing control directory"); return 1; };
00396   char* session_dir = argv[optind+2];
00397   if(!session_dir) { logger.msg(Arc::ERROR, "Missing session directory"); return 1; };
00398 
00399   read_env_vars();
00400   // prepare Job and User descriptions (needed for substitutions in cache dirs)
00401   JobDescription desc(id,session_dir);
00402   uid_t uid;
00403   gid_t gid;
00404   if(file_owner != 0) { uid=file_owner; }
00405   else { uid= getuid(); };
00406   if(file_group != 0) { gid=file_group; }
00407   else { gid= getgid(); };
00408   desc.set_uid(uid,gid);
00409   JobUser user(uid);
00410   user.SetControlDir(control_dir);
00411   user.SetSessionRoot(session_dir);
00412 
00413   // if u or U option not set, use our username
00414   if (file_owner_username == "") {
00415     struct passwd pw_;
00416     struct passwd *pw;
00417     char buf[BUFSIZ];
00418     getpwuid_r(getuid(),&pw_,buf,BUFSIZ,&pw);
00419     if(pw == NULL) {
00420       logger.msg(Arc::ERROR, "Wrong user name"); exit(1);
00421     }
00422     if(pw->pw_name) file_owner_username=pw->pw_name;
00423   }
00424 
00425   Arc::FileCache * cache;
00426 
00427   if (use_conf_cache) {
00428     try {
00429       CacheConfig * cache_config = new CacheConfig(std::string(file_owner_username));
00430       user.SetCacheParams(cache_config);
00431       cache = new Arc::FileCache(cache_config->getCacheDirs(),
00432                                  cache_config->getRemoteCacheDirs(),
00433                                  cache_config->getDrainingCacheDirs(),
00434                                  std::string(id), uid, gid,
00435                                  cache_config->getCacheMax(),
00436                                  cache_config->getCacheMin());
00437       if (!(cache_config->getCacheDirs().size() == 0) && !(*cache)) {
00438         logger.msg(Arc::ERROR, "Error creating cache");
00439         delete cache;
00440         exit(1);
00441       }
00442     }
00443     catch (CacheConfigException e) {
00444       logger.msg(Arc::ERROR, "Error with cache configuration: %s", e.what());
00445       delete cache;
00446       exit(1);
00447     }
00448   }
00449   else if(argv[optind+3]) {
00450     std::string cache_path = argv[optind+3];
00451     if(argv[optind+4])
00452       cache_path += " "+std::string(argv[optind+4]);
00453     cache = new Arc::FileCache(cache_path, std::string(id), uid, gid);
00454     if (!(*cache)) {
00455       logger.msg(Arc::ERROR, "Error creating cache");
00456       delete cache;
00457       exit(1);
00458     }
00459   }
00460   else {
00461     // if no cache defined, use null cache
00462     cache = new Arc::FileCache();
00463   }
00464   
00465   if(min_speed != 0)
00466     logger.msg(Arc::VERBOSE, "Minimal speed: %llu B/s during %i s", min_speed, min_speed_time);
00467   if(min_average_speed != 0)
00468     logger.msg(Arc::VERBOSE, "Minimal average speed: %llu B/s", min_average_speed);
00469   if(max_inactivity_time != 0)
00470     logger.msg(Arc::VERBOSE, "Maximal inactivity time: %i s", max_inactivity_time);
00471 
00472   CollectCredentials(x509_proxy,x509_cert,x509_key,x509_cadir);
00473   prepare_proxy();
00474 
00475   if(n_threads > 10) {
00476     logger.msg(Arc::WARNING, "Won't use more than 10 threads");
00477     n_threads=10;
00478   };
00479 /*
00480  !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!  Add this to DataMove !!!!!!!!!!!!
00481 */
00482   UrlMapConfig url_map;
00483   logger.msg(Arc::INFO, "Downloader started");
00484 
00485   Janitor janitor(desc.get_id(),user.ControlDir());
00486 
00487   Arc::UserConfig usercfg(Arc::initializeCredentialsType(Arc::initializeCredentialsType::TryCredentials));
00488   usercfg.UtilsDirPath(control_dir);
00489 
00490   Arc::DataMover mover;
00491   mover.retry(true);
00492   mover.secure(secure);
00493   mover.passive(passive);
00494   mover.verbose(true); // statistics will be shown if logging is higher than VERBOSE
00495   if(min_speed != 0)
00496     mover.set_default_min_speed(min_speed,min_speed_time);
00497   if(min_average_speed != 0)
00498     mover.set_default_min_average_speed(min_average_speed);
00499   if(max_inactivity_time != 0)
00500     mover.set_default_max_inactivity_time(max_inactivity_time);
00501   bool transfered = true;
00502   bool credentials_expired = false;
00503   std::list<FileData> output_files;
00504 
00505   if(!job_input_read_file(desc.get_id(),user,job_files_)) {
00506     failure_reason+="Internal error in downloader\n";
00507     logger.msg(Arc::ERROR, "Can't read list of input files"); res=1; goto exit;
00508   };
00509   // check for duplicates (see bug 1285)
00510   for (std::list<FileData>::iterator i = job_files_.begin(); i != job_files_.end(); i++) {
00511     for (std::list<FileData>::iterator j = job_files_.begin(); j != job_files_.end(); j++) {
00512       if (i != j && j->pfn == i->pfn) {
00513         failure_reason+="Duplicate input files\n";
00514         logger.msg(Arc::ERROR, "Error: duplicate file in list of input files: %s", i->pfn);
00515         res=1;
00516         goto exit;
00517       }
00518     }
00519   }
00520   // check if any input files are also output files downloadable by user (bug 1387)
00521   if(job_output_read_file(desc.get_id(),user,output_files)) {
00522     for (std::list<FileData>::iterator j = output_files.begin(); j != output_files.end(); j++) {
00523       for (std::list<FileData>::iterator i = job_files_.begin(); i != job_files_.end(); i++) {
00524         if (i->pfn == j->pfn && j->lfn.empty() && i->lfn.find(':') != std::string::npos) {
00525           Arc::URL u(i->lfn);
00526           std::string opt = u.Option("cache");
00527           // don't add copy option if exists or current option is "no" or "renew"
00528           if (opt.empty() || !(opt == "no" || opt == "renew" || opt == "copy")) {
00529             u.AddOption("cache", "copy", true);
00530             i->lfn = u.fullstr();
00531           }
00532         }
00533       }
00534     }
00535   }
00536   else
00537     logger.msg(Arc::WARNING, "Can't read list of output files");
00538 
00539   // remove bad files
00540   if(clean_files(job_files_,session_dir) != 0) {
00541     failure_reason+="Internal error in downloader\n";
00542     logger.msg(Arc::ERROR, "Can't remove junk files"); res=1; goto exit;
00543   };
00544   for(std::list<FileData>::iterator i = job_files_.begin();i!=job_files_.end();++i) {
00545     job_files.push_back(*i);
00546   };
00547 
00548   if(!desc.GetLocalDescription(user)) {
00549     failure_reason+="Internal error in downloader\n";
00550     logger.msg(Arc::ERROR, "Can't read job local description"); res=1; goto exit;
00551   };
00552 
00553   // Start janitor in parallel
00554   if(!janitor) {
00555     if (!janitor.enabled()) {
00556       logger.msg(Arc::VERBOSE,"Not invoking janitor because it's not enabled in the config file");
00557     };
00558     if(desc.get_local()->rtes > 0) {
00559       failure_reason+="Non-existing RTE(s) requested\n";
00560       if (!janitor.enabled()) {
00561         logger.msg(Arc::ERROR, "Janitor not enabled and job contains non-deployed RTEs");
00562       } else {
00563         logger.msg(Arc::ERROR, "Janitor not installed and job contains non-deployed RTEs");
00564       };
00565       res=1; goto exit;
00566     };
00567   } else {
00568     if(!janitor.deploy()) {
00569       failure_reason+="The Janitor failed\n";
00570       logger.msg(Arc::ERROR, "Failed to deploy Janitor"); res=1; goto exit;
00571     };
00572   };
00573 
00574   // initialize structures to handle download
00575   /* TODO: add threads=# to all urls if n_threads!=1 */
00576   // Compute wait time for user files
00577   for(FileDataEx::iterator i=job_files.begin();i!=job_files.end();++i) {
00578     if(i->lfn.find(":") == std::string::npos) { /* is it lfn ? */
00579       upload_timeout+=MAX_USER_TIME;
00580     };
00581   };
00582   // Main download cycle
00583   if(!userfiles_only) for(;;) {
00584     // Initiate transfers
00585     int n = 0;
00586     SimpleConditionLock local_lock(pair_condition);
00587     for(FileDataEx::iterator i=job_files.begin();i!=job_files.end();++i) {
00588       if(i->lfn.find(":") != std::string::npos) { /* is it lfn ? */
00589         ++n;
00590         if(n <= pairs_initiated) continue; // skip files being processed
00591         if(n > n_files) break; // quit if not allowed to process more
00592         /* have place and file to download */
00593         std::string destination=std::string("file://") + session_dir + i->pfn;
00594         std::string source = i->lfn;
00595         if(i->pair == NULL) {
00596           /* define place to store */
00597           if(strncasecmp(source.c_str(),"file:/",6) == 0) {
00598             failure_reason+=std::string("User requested local input file ")+source.c_str()+"\n";
00599             logger.msg(Arc::ERROR, "Local source for download: %s", source); res=1; goto exit;
00600           };
00601           PointPair* pair = new PointPair(source,destination,usercfg);
00602           if(!(pair->source)) {
00603             failure_reason+=std::string("Can't accept URL ")+source.c_str()+"\n";
00604             logger.msg(Arc::ERROR, "Can't accept URL: %s", source); delete pair; res=1; goto exit;
00605           };
00606           if(!(pair->destination)) {
00607             failure_reason+=std::string("Can't accept URL ")+destination.c_str()+"\n";
00608             logger.msg(Arc::ERROR, "Can't accept URL: %s", destination); delete pair; res=1; goto exit;
00609           };
00610           i->pair=pair;
00611         };
00612         FileDataEx::iterator* it = new FileDataEx::iterator(i);
00613         std::string prefix = i->pfn;
00614         if (prefix.find('/') != std::string::npos) prefix.erase(0, prefix.find('/')+1);
00615         Arc::DataStatus dres = mover.Transfer(*(i->pair->source), *(i->pair->destination), *cache,
00616                                               url_map, min_speed, min_speed_time,
00617                                               min_average_speed, max_inactivity_time,
00618                                               &PointPair::callback, it,
00619                                               prefix.c_str());
00620         if (!dres.Passed()) {
00621           failure_reason+=std::string("Failed to initiate file transfer: ")+source.c_str()+" - "+std::string(dres)+"\n";
00622           logger.msg(Arc::ERROR, "Failed to initiate file transfer: %s - %s", source, std::string(dres));
00623           delete it; res=1; goto exit;
00624         };
00625         ++pairs_initiated;
00626       };
00627     };
00628     if(pairs_initiated <= 0) break; // Looks like no more files to process
00629     // Processing initiated - now wait for event
00630     pair_condition.wait_nonblock();
00631   };
00632   // Print download summary
00633   for(FileDataEx::iterator i=processed_files.begin();i!=processed_files.end();++i) {
00634     logger.msg(Arc::INFO, "Downloaded %s", i->lfn);
00635     if(Arc::URL(i->lfn).Option("exec") == "yes") {
00636       fix_file_permissions(session_dir+i->pfn,true);
00637     };
00638   };
00639   for(FileDataEx::iterator i=failed_files.begin();i!=failed_files.end();++i) {
00640     if (i->res.Retryable()) {
00641       logger.msg(Arc::ERROR, "Failed to download (but may be retried) %s",i->lfn);
00642       job_files.push_back(*i);
00643       res = 4;
00644       continue;
00645     }
00646     logger.msg(Arc::ERROR, "Failed to download %s", i->lfn);
00647     failure_reason+="Input file: "+i->lfn+" - "+(std::string)(i->res)+"\n";
00648     if(i->res == Arc::DataStatus::CredentialsExpiredError)
00649       credentials_expired=true;
00650     transfered=false;
00651   };
00652   // Check if all files have been properly downloaded
00653   if(!transfered) {
00654     logger.msg(Arc::INFO, "Some downloads failed"); res=2;
00655     if(credentials_expired) res=3;
00656     goto exit;
00657   };
00658   if(res == 4) logger.msg(Arc::INFO, "Some downloads failed, but may be retried");
00659   job_files_.clear();
00660   for(FileDataEx::iterator i = job_files.begin();i!=job_files.end();++i) job_files_.push_back(*i);
00661   if(!job_input_write_file(desc,user,job_files_)) {
00662     logger.msg(Arc::WARNING, "Failed writing changed input file");
00663   };
00664   // check for user uploadable files
00665   // run cycle waiting for uploaded files
00666   for(;;) {
00667     not_uploaded=false;
00668     for(FileDataEx::iterator i=job_files.begin();i!=job_files.end();) {
00669       if(i->lfn.find(":") == std::string::npos) { /* is it lfn ? */
00670         /* process user uploadable file */
00671         logger.msg(Arc::INFO, "Check user uploadable file: %s", i->pfn);
00672         std::string error;
00673         int err=user_file_exists(*i,session_dir,&error);
00674         if(err == 0) { /* file is uploaded */
00675           logger.msg(Arc::INFO, "User has uploaded file %s", i->pfn);
00676           i=job_files.erase(i);
00677           job_files_.clear();
00678           for(FileDataEx::iterator i = job_files.begin();i!=job_files.end();++i) job_files_.push_back(*i);
00679           if(!job_input_write_file(desc,user,job_files_)) {
00680             logger.msg(Arc::WARNING, "Failed writing changed input file.");
00681           };
00682         }
00683         else if(err == 1) { /* critical failure */
00684           logger.msg(Arc::ERROR, "Critical error for uploadable file %s", i->pfn);
00685           failure_reason+="User file: "+i->pfn+" - "+error+"\n";
00686           res=1; goto exit;
00687         }
00688         else {
00689           not_uploaded=true; ++i;
00690         };
00691       }
00692       else {
00693         ++i;
00694       };
00695     };
00696     if(!not_uploaded) break;
00697     // check for timeout
00698     if((time(NULL)-start_time) > upload_timeout) {
00699       for(FileDataEx::iterator i=job_files.begin();i!=job_files.end();++i) {
00700         if(i->lfn.find(":") == std::string::npos) { /* is it lfn ? */
00701           failure_reason+="User file: "+i->pfn+" - Timeout waiting\n";
00702         };
00703       };
00704       logger.msg(Arc::ERROR, "Uploadable files timed out"); res=2; break;
00705     };
00706     sleep(CHECK_PERIOD);
00707   };
00708   job_files_.clear();
00709   for(FileDataEx::iterator i = job_files.begin();i!=job_files.end();++i) job_files_.push_back(*i);
00710   if(!job_input_write_file(desc,user,job_files_)) {
00711     logger.msg(Arc::WARNING, "Failed writing changed input file.");
00712   };
00713 
00714   // Check for janitor result
00715   if(janitor) {
00716     unsigned int time_passed = time(NULL) - start_time;
00717     // Hardcoding max 30 minutes per RTE + 5 minutes just in case
00718     unsigned int time_left = 30*60*desc.get_local()->rtes + 5*60;
00719     time_left-=(time_left > time_passed)?time_passed:time_left;
00720     if(!janitor.wait(time_left)) {
00721       failure_reason+="The Janitor failed\n";
00722       logger.msg(Arc::ERROR, "Janitor timeout while deploying Dynamic RTE(s)");
00723       res=1; goto exit;
00724     };
00725     if(janitor.result() == Janitor::DEPLOYED) {
00726     } else if(janitor.result() == Janitor::NOTENABLED) {
00727       if(desc.get_local()->rtes > 0) {
00728         failure_reason+="The Janitor failed\n";
00729         logger.msg(Arc::ERROR, "Janitor not enabled and there are missing RTE(s)");
00730         res=1; goto exit;
00731       }
00732     } else {
00733       failure_reason+="The Janitor failed\n";
00734       logger.msg(Arc::ERROR, "Janitor failed to deploy Dynamic RTE(s)");
00735       res=1; goto exit;
00736     };
00737   };
00738 
00739   // Job migration functionality
00740   if (res == 0) {
00741     if(desc.get_local()->migrateactivityid != "") {
00742       // Complete the migration.
00743       const size_t found = desc.get_local()->migrateactivityid.rfind("/");
00744 
00745       if (found != std::string::npos) {
00746         Arc::Job job;
00747         job.Flavour = "ARC1";
00748         job.JobID = Arc::URL(desc.get_local()->migrateactivityid);
00749         job.Cluster = Arc::URL(desc.get_local()->migrateactivityid.substr(0, found));
00750 
00751         Arc::UserConfig usercfg(job.Cluster.Protocol() == "https" ?
00752                                 Arc::initializeCredentialsType() :
00753                                 Arc::initializeCredentialsType(Arc::initializeCredentialsType::SkipCredentials));
00754         if (job.Cluster.Protocol() != "https" ||
00755             (job.Cluster.Protocol() == "https" && usercfg.CredentialsFound())) {
00756           Arc::JobControllerLoader loader;
00757           Arc::JobController *jobctrl = loader.load("ARC1", usercfg);
00758           if (jobctrl) {
00759             jobctrl->FillJobStore(job);
00760 
00761             std::list<std::string> status;
00762             status.push_back("Queuing");
00763 
00764             if (!jobctrl->Kill(status, true) && !desc.get_local()->forcemigration) {
00765               res = 1;
00766               failure_reason = "FATAL ERROR: Migration failed attempting to kill old job \"" + desc.get_local()->migrateactivityid + "\".";
00767             }
00768           }
00769           else {
00770             res = 1;
00771             failure_reason = "FATAL ERROR: Migration failed, could not locate ARC1 JobController plugin. Maybe it is not installed?";
00772           }
00773         }
00774         else {
00775           res = 1;
00776           failure_reason = "FATAL ERROR: Migration failed, unable to find credentials.";
00777         }
00778       }
00779     }
00780   }
00781 
00782 exit:
00783   // clean unfinished files here
00784   job_files_.clear();
00785   for(FileDataEx::iterator i = job_files.begin();i!=job_files.end();++i) job_files_.push_back(*i);
00786   clean_files(job_files_,session_dir);
00787   // release cache just in case
00788   if(res != 0 && res != 4) {
00789     cache->Release();
00790   };
00791   delete cache;
00792   remove_proxy();
00793   if(res != 0 && res != 4) {
00794     job_failed_mark_add(desc,user,failure_reason);
00795   };
00796   logger.msg(Arc::INFO, "Leaving downloader (%i)", res);
00797   return res;
00798 }