Back to index

nordugrid-arc-nox  1.1.0~rc6
uploader.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 /*
00005   Upload files specified in job.ID.output.
00006   result: 0 - ok, 1 - unrecoverable error, 2 - potentially recoverable,
00007   3 - certificate error, 4 - should retry.
00008 */
00009 #include <sys/types.h>
00010 #include <sys/stat.h>
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <pwd.h>
00014 
00015 #include <glibmm.h>
00016 
00017 #include <arc/data/CheckSum.h>
00018 #include <arc/data/FileCache.h>
00019 #include <arc/data/DataHandle.h>
00020 #include <arc/data/DataMover.h>
00021 #include <arc/StringConv.h>
00022 #include <arc/Thread.h>
00023 #include <arc/URL.h>
00024 #include <arc/UserConfig.h>
00025 #include <arc/Utils.h>
00026 
00027 #include "../jobs/job.h"
00028 #include "../jobs/users.h"
00029 #include "../files/info_types.h"
00030 #include "../files/info_files.h"
00031 #include "../files/delete.h"
00032 #include "../conf/environment.h"
00033 #include "../misc/proxy.h"
00034 #include "../conf/conf_map.h"
00035 #include "../conf/conf_cache.h"
00036 #include "janitor.h"
00037 
00038 static Arc::Logger logger(Arc::Logger::getRootLogger(), "Uploader");
00039 
00040 /* maximum number of retries (for every source/destination) */
00041 #define MAX_RETRIES 5
00042 /* maximum number simultaneous uploads */
00043 #define MAX_UPLOADS 5
00044 
00045 class PointPair;
00046 
00047 class FileDataEx : public FileData {
00048  public:
00049   typedef std::list<FileDataEx>::iterator iterator;
00050   Arc::DataStatus res;
00051   PointPair* pair;
00052   FileDataEx(const FileData& f) : 
00053       FileData(f),
00054       res(Arc::DataStatus::Success),
00055       pair(NULL) {}
00056   FileDataEx(const FileData& f, Arc::DataStatus r) :
00057       FileData(f),
00058       res(r),
00059       pair(NULL) {}
00060 };
00061 
00062 static std::list<FileData> job_files_;
00063 static std::list<FileDataEx> job_files;
00064 static std::list<FileDataEx> processed_files;
00065 static std::list<FileDataEx> failed_files;
00066 static Arc::SimpleCondition pair_condition;
00067 static int pairs_initiated = 0;
00068 
00069 class SimpleConditionLock {
00070  private:
00071   Arc::SimpleCondition& cond_;
00072  public:
00073   SimpleConditionLock(Arc::SimpleCondition& cond):cond_(cond) {
00074     cond_.lock();
00075   };
00076   ~SimpleConditionLock(void) {
00077     cond_.unlock();
00078   };
00079 };
00080 
00081 int clean_files(std::list<FileData> &job_files,char* session_dir) {
00082   std::string session(session_dir);
00083   if(delete_all_files(session,job_files,true) != 2) return 0;
00084   return 1;
00085 }
00086 
00087 class PointPair {
00088  private:
00089   Arc::URL source_url;
00090   Arc::URL destination_url;
00091  public:
00092   Arc::DataHandle source;
00093   Arc::DataHandle destination;
00094   PointPair(const std::string& source_str, const std::string& destination_str,
00095            const Arc::UserConfig& usercfg)
00096     : source_url(source_str),
00097       destination_url(destination_str),
00098       source(source_url, usercfg),
00099       destination(destination_url, usercfg) {};
00100   ~PointPair(void) {};
00101   static void callback(Arc::DataMover*,Arc::DataStatus res,void* arg) {
00102     FileDataEx::iterator &it = *((FileDataEx::iterator*)arg);
00103     pair_condition.lock();
00104     if(!res.Passed()) {
00105       it->res=res;
00106       logger.msg(Arc::ERROR, "Failed uploading file %s - %s", it->lfn, std::string(res));
00107       if((it->pair->source->GetTries() <= 0) || (it->pair->destination->GetTries() <= 0)) {
00108         delete it->pair; it->pair=NULL;
00109         failed_files.push_back(*it);
00110       } else {
00111         job_files.push_back(*it);
00112         logger.msg(Arc::ERROR, "Retrying");
00113       };
00114     } else {
00115       logger.msg(Arc::INFO, "Uploaded file %s", it->lfn);
00116       delete it->pair; it->pair=NULL;
00117       processed_files.push_back(*it);
00118     };
00119     job_files.erase(it);
00120     --pairs_initiated;
00121     pair_condition.signal_nonblock();
00122     pair_condition.unlock();
00123     delete &it;
00124   };
00125 };
00126 
00127 void expand_files(std::list<FileData> &job_files,char* session_dir) {
00128   for(FileData::iterator i = job_files.begin();i!=job_files.end();) {
00129     std::string url = i->lfn;
00130     // Only ftp and gsiftp can be expanded to directories so far
00131     if(strncasecmp(url.c_str(),"ftp://",6) && 
00132        strncasecmp(url.c_str(),"gsiftp://",9)) { ++i; continue; };
00133     // user must ask explicitly
00134     if(url[url.length()-1] != '/') { ++i; continue; };
00135     std::string path(session_dir); path+="/"; path+=i->pfn;
00136     int l = strlen(session_dir) + 1;
00137     try {
00138       Glib::Dir dir(path);
00139       std::string file;
00140       for(;;) {
00141         file=dir.read_name();
00142         if(file.empty()) break;
00143         if(file == ".") continue;
00144         if(file == "..") continue;
00145         std::string path_ = path; path_+="/"; path+=file;
00146         struct stat st;
00147         if(lstat(path_.c_str(),&st) != 0) continue; // do not follow symlinks
00148         if(S_ISREG(st.st_mode)) {
00149           std::string lfn = url+file;
00150           job_files.push_back(FileData(path_.c_str()+l,lfn.c_str()));
00151         } else if(S_ISDIR(st.st_mode)) {
00152           std::string lfn = url+file+"/"; // cause recursive search
00153           job_files.push_back(FileData(path_.c_str()+l,lfn.c_str()));
00154         };
00155       };
00156       i=job_files.erase(i);
00157     } catch(Glib::FileError& e) {
00158       ++i;
00159     }; 
00160   };
00161 }
00162 
00163 int main(int argc,char** argv) {
00164   Arc::LogStream logcerr(std::cerr);
00165   Arc::Logger::getRootLogger().addDestination(logcerr);
00166   Arc::Logger::getRootLogger().setThreshold(Arc::VERBOSE);
00167   int res=0;
00168   int n_threads = 1;
00169   int n_files = MAX_UPLOADS;
00170   /* used to find caches used by this user */
00171   std::string file_owner_username = "";
00172   uid_t file_owner = 0;
00173   gid_t file_group = 0;
00174   bool use_conf_cache = false;
00175   unsigned long long int min_speed = 0;
00176   time_t min_speed_time = 300;
00177   unsigned long long int min_average_speed = 0;
00178   time_t max_inactivity_time = 300;
00179   bool secure = true;
00180   bool userfiles_only = false;
00181   bool passive = false;
00182   std::string failure_reason("");
00183   std::string x509_proxy, x509_cert, x509_key, x509_cadir;
00184 
00185   // process optional arguments
00186   for(;;) {
00187     opterr=0;
00188     int optc=getopt(argc,argv,"+hclpfC:n:t:u:U:s:S:a:i:d:");
00189     if(optc == -1) break;
00190     switch(optc) {
00191       case 'h': {
00192         std::cerr<<"Usage: uploader [-hclpf] [-C conf_file] [-n files] [-t threads] [-U uid]"<<std::endl;
00193         std::cerr<<"          [-u username] [-s min_speed] [-S min_speed_time]"<<std::endl;
00194         std::cerr<<"          [-a min_average_speed] [-i min_activity_time]"<<std::endl;
00195         std::cerr<<"          [-d debug_level] job_id control_directory"<<std::endl;
00196         std::cerr<<"          session_directory [cache options]"<<std::endl; 
00197         exit(1);
00198       }; break;
00199       case 'c': {
00200         secure=false;
00201       }; break;
00202       case 'C': {
00203         nordugrid_config_loc(optarg);
00204       }; break;
00205       case 'f': {
00206         use_conf_cache=true;
00207       }; break;
00208       case 'l': {
00209         userfiles_only=true;
00210       }; break;
00211       case 'p': {
00212         passive=true;
00213       }; break;
00214       case 'd': {
00215         Arc::Logger::getRootLogger().setThreshold(Arc::string_to_level(optarg));
00216       }; break;
00217       case 't': {
00218         n_threads=atoi(optarg);
00219         if(n_threads < 1) {
00220           logger.msg(Arc::ERROR, "Wrong number of threads: %s", optarg); exit(1);
00221         };
00222       }; break;
00223       case 'n': {
00224         n_files=atoi(optarg);
00225         if(n_files < 1) {
00226           logger.msg(Arc::ERROR, "Wrong number of files: %s", optarg); exit(1);
00227         };
00228       }; break;
00229       case 'U': {
00230         unsigned int tuid;
00231         if(!Arc::stringto(std::string(optarg),tuid)) {
00232           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00233         };
00234         struct passwd pw_;
00235         struct passwd *pw;
00236         char buf[BUFSIZ];
00237         getpwuid_r(tuid,&pw_,buf,BUFSIZ,&pw);
00238         if(pw == NULL) {
00239           logger.msg(Arc::ERROR, "Wrong user name"); exit(1);
00240         };
00241         file_owner=pw->pw_uid;
00242         file_group=pw->pw_gid;
00243         if(pw->pw_name) file_owner_username=pw->pw_name;
00244         if((getuid() != 0) && (getuid() != file_owner)) {
00245           logger.msg(Arc::ERROR, "Specified user can't be handled"); exit(1);
00246         };
00247       }; break;
00248       case 'u': {
00249         struct passwd pw_;
00250         struct passwd *pw;
00251         char buf[BUFSIZ];
00252         getpwnam_r(optarg,&pw_,buf,BUFSIZ,&pw);
00253         if(pw == NULL) {
00254           logger.msg(Arc::ERROR, "Wrong user name"); exit(1);
00255         };
00256         file_owner=pw->pw_uid;
00257         file_group=pw->pw_gid;
00258         if(pw->pw_name) file_owner_username=pw->pw_name;
00259         if((getuid() != 0) && (getuid() != file_owner)) {
00260           logger.msg(Arc::ERROR, "Specified user can't be handled"); exit(1);
00261         };
00262       }; break;
00263       case 's': {
00264         unsigned int tmpi;
00265         if(!Arc::stringto(std::string(optarg),tmpi)) {
00266           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00267         };
00268         min_speed=tmpi;
00269       }; break;
00270       case 'S': {
00271         unsigned int tmpi;
00272         if(!Arc::stringto(std::string(optarg),tmpi)) {
00273           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00274         };
00275         min_speed_time=tmpi;
00276       }; break;
00277       case 'a': {
00278         unsigned int tmpi;
00279         if(!Arc::stringto(std::string(optarg),tmpi)) {
00280           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00281         };
00282         min_average_speed=tmpi;
00283       }; break;
00284       case 'i': {
00285         unsigned int tmpi;
00286         if(!Arc::stringto(std::string(optarg),tmpi)) {
00287           logger.msg(Arc::ERROR, "Bad number: %s", optarg); exit(1);
00288         };
00289         max_inactivity_time=tmpi;
00290       }; break;
00291       case '?': {
00292         logger.msg(Arc::ERROR, "Unsupported option: %c", (char)optopt);
00293         exit(1);
00294       }; break;
00295       case ':': {
00296         logger.msg(Arc::ERROR, "Missing parameter for option %c", (char)optopt);
00297         exit(1);
00298       }; break;
00299       default: {
00300         logger.msg(Arc::ERROR, "Undefined processing error");
00301         exit(1);
00302       };
00303     };
00304   };
00305   // process required arguments
00306   char * id = argv[optind+0];
00307   if(!id) { logger.msg(Arc::ERROR, "Missing job id"); return 1; };
00308   char* control_dir = argv[optind+1];
00309   if(!control_dir) { logger.msg(Arc::ERROR, "Missing control directory"); return 1; };
00310   char* session_dir = argv[optind+2];
00311   if(!session_dir) { logger.msg(Arc::ERROR, "Missing session directory"); return 1; };
00312 
00313   read_env_vars();
00314   // prepare Job and User descriptions (needed for substitutions in cache dirs)
00315   JobDescription desc(id,session_dir);
00316   uid_t uid;
00317   gid_t gid;
00318   if(file_owner != 0) { uid=file_owner; }
00319   else { uid= getuid(); };
00320   if(file_group != 0) { gid=file_group; }
00321   else { gid= getgid(); };
00322   desc.set_uid(uid,gid);
00323   JobUser user(uid);
00324   user.SetControlDir(control_dir);
00325   user.SetSessionRoot(session_dir);
00326   
00327   // if u or U option not set, use our username
00328   if (file_owner_username == "") {
00329     struct passwd pw_;
00330     struct passwd *pw;
00331     char buf[BUFSIZ];
00332     getpwuid_r(getuid(),&pw_,buf,BUFSIZ,&pw);
00333     if(pw == NULL) {
00334       logger.msg(Arc::ERROR, "Wrong user name"); exit(1);
00335     }
00336     if(pw->pw_name) file_owner_username=pw->pw_name;
00337   }
00338   
00339   Arc::FileCache * cache;
00340 
00341   if (use_conf_cache) {
00342     try {
00343       CacheConfig * cache_config = new CacheConfig(std::string(file_owner_username));
00344       user.SetCacheParams(cache_config);
00345       cache = new Arc::FileCache(cache_config->getCacheDirs(),
00346                                  cache_config->getRemoteCacheDirs(),
00347                                  cache_config->getDrainingCacheDirs(),
00348                                  std::string(id), uid, gid,
00349                                  cache_config->getCacheMax(),
00350                                  cache_config->getCacheMin());
00351       if (!(cache_config->getCacheDirs().size() == 0) && !(*cache)) {
00352         logger.msg(Arc::ERROR, "Error creating cache");
00353         delete cache;
00354         exit(1);
00355       }
00356     }
00357     catch (CacheConfigException e) {
00358       logger.msg(Arc::ERROR, "Error with cache configuration: %s", e.what());
00359       delete cache;
00360       exit(1);
00361     }
00362   }
00363   else if(argv[optind+3]) {
00364     std::string cache_path = argv[optind+3];
00365     if(argv[optind+4])
00366       cache_path += " "+std::string(argv[optind+4]);
00367     cache = new Arc::FileCache(cache_path, std::string(id), uid, gid);
00368     if (!(*cache)) {
00369       logger.msg(Arc::ERROR, "Error creating cache");
00370       delete cache;
00371       exit(1);
00372     }
00373   }
00374   else {
00375     // if no cache defined, use null cache
00376     cache = new Arc::FileCache();
00377   }
00378 
00379   if(min_speed != 0)
00380     logger.msg(Arc::VERBOSE, "Minimal speed: %llu B/s during %i s", min_speed, min_speed_time);
00381   if(min_average_speed != 0)
00382     logger.msg(Arc::VERBOSE, "Minimal average speed: %llu B/s", min_average_speed);
00383   if(max_inactivity_time != 0)
00384     logger.msg(Arc::VERBOSE, "Maximal inactivity time: %i s", max_inactivity_time);
00385 
00386   prepare_proxy();
00387 
00388   if(n_threads > 10) {
00389     logger.msg(Arc::WARNING, "Won't use more than 10 threads");
00390     n_threads=10;
00391   };
00392 
00393   UrlMapConfig url_map;
00394   logger.msg(Arc::INFO, "Uploader started");
00395 
00396   Janitor janitor(desc.get_id(),user.ControlDir());
00397   
00398   Arc::UserConfig usercfg(Arc::initializeCredentialsType(Arc::initializeCredentialsType::TryCredentials));
00399   usercfg.UtilsDirPath(control_dir);
00400   
00401   Arc::DataMover mover;
00402   mover.retry(true);
00403   mover.secure(secure);
00404   mover.passive(passive);
00405   mover.verbose(true); // statistics will be shown if logging is higher than VERBOSE
00406   if(min_speed != 0)
00407     mover.set_default_min_speed(min_speed,min_speed_time);
00408   if(min_average_speed != 0)
00409     mover.set_default_min_average_speed(min_average_speed);
00410   if(max_inactivity_time != 0)
00411     mover.set_default_max_inactivity_time(max_inactivity_time);
00412   bool transfered = true;
00413   bool credentials_expired = false;
00414   std::list<FileData>::iterator it = job_files_.begin();
00415   std::list<FileData>::iterator it2 = job_files_.begin();
00416   
00417   // get the list of output files
00418   if(!job_output_read_file(desc.get_id(),user,job_files_)) {
00419     failure_reason+="Internal error in uploader\n";
00420     logger.msg(Arc::ERROR, "Can't read list of output files"); res=1; goto exit;
00421     //olog << "WARNING: Can't read list of output files - whole output will be removed" << std::endl;
00422   }
00423   // add any output files dynamically added by the user during the job
00424   for(it = job_files_.begin(); it != job_files_.end() ; ++it) {
00425     if(it->pfn.find("@") == 1) { // GM puts a slash on the front of the local file
00426       std::string outputfilelist = session_dir + std::string("/") + it->pfn.substr(2);
00427       logger.msg(Arc::INFO, "Reading output files from user generated list in %s", outputfilelist);
00428       if (!job_Xput_read_file(outputfilelist, job_files_)) {
00429         logger.msg(Arc::ERROR, "Error reading user generated output file list in %s", outputfilelist); res=1; goto exit;
00430       }
00431     }
00432   }
00433   // remove dynamic output file lists from the files to upload
00434   it = job_files_.begin();
00435   while (it != job_files_.end()) {
00436     if(it->pfn.find("@") == 1) it = job_files_.erase(it);
00437     else it++;
00438   }
00439   // check if any files share the same LFN, if so allow overwriting existing LFN
00440   for (it = job_files_.begin(); it != job_files_.end(); it++) {
00441     bool done = false;
00442     for (it2 = job_files_.begin(); it2 != job_files_.end(); it2++) {
00443       if (it != it2 && !it->lfn.empty() && !it2->lfn.empty()) {
00444         // error if lfns (including locations) are identical
00445         if (it->lfn == it2->lfn) {
00446           logger.msg(Arc::ERROR, "Two identical output destinations: %s", it->lfn);
00447           res = 1;
00448           goto exit;
00449         }
00450         Arc::URL u_it(it->lfn);
00451         Arc::URL u_it2(it2->lfn);
00452         if (u_it == u_it2) {
00453           // error if pfns are different
00454           if (it->pfn != it2->pfn) {
00455             logger.msg(Arc::ERROR, "Cannot upload two different files %s and %s to same LFN: %s", it->pfn, it2->pfn, it->lfn);
00456             res = 1;
00457             goto exit;
00458           }    
00459           mover.force_to_meta(true);
00460           done = true;
00461           break;
00462         }
00463       }
00464     }
00465     if (done)
00466       break;
00467   }
00468   // remove bad files
00469   if(clean_files(job_files_,session_dir) != 0) {
00470     failure_reason+="Internal error in uploader\n";
00471     logger.msg(Arc::ERROR, "Can't remove junk files"); res=1; goto exit;
00472   };
00473   expand_files(job_files_,session_dir);
00474   for(std::list<FileData>::iterator i = job_files_.begin();i!=job_files_.end();++i) {
00475     job_files.push_back(*i);
00476   };
00477 
00478   if(!desc.GetLocalDescription(user)) {
00479     logger.msg(Arc::ERROR, "Can't read job local description"); res=1; goto exit;
00480   };
00481 
00482   // Start janitor in parallel
00483   if(janitor) {
00484     if(!janitor.remove()) {
00485       logger.msg(Arc::ERROR, "Failed to deploy Janitor"); res=1; goto exit;
00486     };
00487   };
00488 
00489   // initialize structures to handle upload
00490   /* TODO: add threads=# to all urls if n_threads!=1 */
00491   // Main upload cycle
00492   if(!userfiles_only) for(;;) {
00493     // Initiate transfers
00494     int n = 0;
00495     SimpleConditionLock local_lock(pair_condition);
00496     for(FileDataEx::iterator i=job_files.begin();i!=job_files.end();++i) {
00497       if(i->lfn.find(":") != std::string::npos) { /* is it lfn ? */
00498         ++n;
00499         if(n <= pairs_initiated) continue; // skip files being processed
00500         if(n > n_files) break; // quit if not allowed to process more
00501         /* have source and file to upload */
00502         std::string source;
00503         std::string destination = i->lfn;
00504         if(i->pair == NULL) {
00505           /* define place to store */
00506           std::string stdlog;
00507           JobLocalDescription* local = desc.get_local();
00508           if(local) stdlog=local->stdlog;
00509           if(stdlog.length() > 0) stdlog="/"+stdlog+"/";
00510           if((stdlog.length() > 0) &&
00511              (strncmp(stdlog.c_str(),i->pfn.c_str(),stdlog.length()) == 0)) {
00512             stdlog=i->pfn.c_str()+stdlog.length();
00513             source=std::string("file://")+control_dir+"/job."+id+"."+stdlog;
00514           } else {
00515             source=std::string("file://")+session_dir+i->pfn;
00516           };
00517           if(strncasecmp(destination.c_str(),"file:/",6) == 0) {
00518             failure_reason+=std::string("User requested to store output locally ")+destination.c_str()+"\n";
00519             logger.msg(Arc::ERROR, "Local destination for uploader %s", destination); res=1; goto exit;
00520           };
00521           PointPair* pair = new PointPair(source,destination,usercfg);
00522           if(!(pair->source)) {
00523             failure_reason+=std::string("Can't accept URL ")+source.c_str()+"\n";
00524             logger.msg(Arc::ERROR, "Can't accept URL: %s", source); delete pair; res=1; goto exit;
00525           };
00526           if(!(pair->destination)) {
00527             failure_reason+=std::string("Can't accept URL ")+destination.c_str()+"\n";
00528             logger.msg(Arc::ERROR, "Can't accept URL: %s", destination); delete pair; res=1; goto exit;
00529           };
00530           i->pair=pair;
00531         };
00532         FileDataEx::iterator* it = new FileDataEx::iterator(i);
00533         std::string prefix = i->pfn;
00534         if (prefix.find('/') != std::string::npos) prefix.erase(0, prefix.find('/')+1);
00535         Arc::DataStatus dres = mover.Transfer(*(i->pair->source), *(i->pair->destination), *cache,
00536                                               url_map, min_speed, min_speed_time,
00537                                               min_average_speed, max_inactivity_time,
00538                                               &PointPair::callback, it,
00539                                               prefix.c_str());
00540         if (!dres.Passed()) {
00541           failure_reason+=std::string("Failed to initiate file transfer: ")+source.c_str()+" - "+std::string(dres)+"\n";
00542           logger.msg(Arc::ERROR, "Failed to initiate file transfer: %s - %s", source, std::string(dres));
00543           delete it; res=1; goto exit;
00544         };
00545         ++pairs_initiated;
00546       };
00547     };
00548     if(pairs_initiated <= 0) break; // Looks like no more files to process
00549     // Processing initiated - now wait for event
00550     pair_condition.wait_nonblock();
00551   };
00552   // Print upload summary
00553   for(FileDataEx::iterator i=processed_files.begin();i!=processed_files.end();++i) {
00554     logger.msg(Arc::INFO, "Uploaded %s", i->lfn);
00555   };
00556   for(FileDataEx::iterator i=failed_files.begin();i!=failed_files.end();++i) {
00557     if(i->res.Retryable()) {
00558       job_files.push_back(*i);
00559       logger.msg(Arc::ERROR,"Failed to upload (but may be retried) %s",i->lfn);
00560       res = 4;
00561       continue;
00562     }
00563     logger.msg(Arc::ERROR, "Failed to upload %s", i->lfn);
00564     failure_reason+="Output file: "+i->lfn+" - "+(std::string)(i->res)+"\n";
00565     if(i->res == Arc::DataStatus::CredentialsExpiredError)
00566       credentials_expired=true;
00567     transfered=false;
00568   };
00569   // Check if all files have been properly uploaded
00570   if(!transfered) {
00571     logger.msg(Arc::INFO, "Some uploads failed"); res=2;
00572     if(credentials_expired) res=3;
00573     goto exit;
00574   }
00575   else if(res == 4) { logger.msg(Arc::INFO,"Some uploads failed, but may be retried"); }
00576   else { /* all files left should be kept */
00577     for(FileDataEx::iterator i=job_files.begin();i!=job_files.end();) {
00578       i->lfn=""; ++i;
00579     };
00580   }
00581   if(!userfiles_only) {
00582     job_files_.clear();
00583     for(FileDataEx::iterator i = job_files.begin();i!=job_files.end();++i) job_files_.push_back(*i);
00584     if(!job_output_write_file(desc,user,job_files_)) {
00585       logger.msg(Arc::WARNING, "Failed writing changed output file");
00586     };
00587   };
00588 exit:
00589   // release input files used for this job
00590   cache->Release();
00591   delete cache;
00592   // clean uploaded files here 
00593   job_files_.clear();
00594   for(FileDataEx::iterator i = job_files.begin();i!=job_files.end();++i) job_files_.push_back(*i);
00595   clean_files(job_files_,session_dir);
00596   remove_proxy();
00597   // We are not extremely interested if janitor finished successfuly
00598   // but it should be at least reported.
00599   if(janitor) {
00600     if(!janitor.wait(5*60)) {
00601       logger.msg(Arc::WARNING, "Janitor timeout while removing Dynamic RTE(s) associations (ignoring)");
00602     };
00603     if(janitor.result() != Janitor::REMOVED && janitor.result() != Janitor::NOTENABLED) {
00604       logger.msg(Arc::WARNING, "Janitor failed to remove Dynamic RTE(s) associations (ignoring)");
00605     };
00606   };
00607   if(res != 0 && res != 4) {
00608     job_failed_mark_add(desc,user,failure_reason);
00609   };
00610   logger.msg(Arc::INFO, "Leaving uploader (%i)", res);
00611   return res;
00612 }
00613