Back to index

nordugrid-arc-nox  1.1.0~rc6
DataMover.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <cstdlib>
00008 // NOTE: On Solaris errno is not working properly if cerrno is included first
00009 #include <cerrno>
00010 
00011 #include <sys/types.h>
00012 #include <unistd.h>
00013 
00014 #include <glibmm.h>
00015 
00016 #include <arc/DateTime.h>
00017 #include <arc/Logger.h>
00018 #include <arc/Thread.h>
00019 #include <arc/URL.h>
00020 #include <arc/UserConfig.h>
00021 #include <arc/Utils.h>
00022 #include <arc/credential/Credential.h>
00023 #include <arc/data/DataBuffer.h>
00024 #include <arc/data/CheckSum.h>
00025 #include <arc/data/DataMover.h>
00026 #include <arc/data/DataPoint.h>
00027 #include <arc/data/DataHandle.h>
00028 #include <arc/data/FileCache.h>
00029 #include <arc/data/MkDirRecursive.h>
00030 #include <arc/data/URLMap.h>
00031 
00032 #ifdef WIN32
00033 #include <arc/win32.h>
00034 #endif
00035 
00036 namespace Arc {
00037 
00038   Logger DataMover::logger(Logger::getRootLogger(), "DataMover");
00039 
00040   DataMover::DataMover()
00041     : be_verbose(false),
00042       force_secure(false),
00043       force_passive(false),
00044       force_registration(false),
00045       do_checks(true),
00046       do_retries(true),
00047       default_min_speed(0),
00048       default_min_speed_time(0),
00049       default_min_average_speed(0),
00050       default_max_inactivity_time(300),
00051       show_progress(NULL) {}
00052 
00053   DataMover::~DataMover() {}
00054 
00055   bool DataMover::verbose() {
00056     return be_verbose;
00057   }
00058 
00059   void DataMover::verbose(bool val) {
00060     be_verbose = val;
00061   }
00062 
00063   void DataMover::verbose(const std::string& prefix) {
00064     be_verbose = true;
00065     verbose_prefix = prefix;
00066   }
00067 
00068   bool DataMover::retry() {
00069     return do_retries;
00070   }
00071 
00072   void DataMover::retry(bool val) {
00073     do_retries = val;
00074   }
00075 
00076   bool DataMover::checks() {
00077     return do_checks;
00078   }
00079 
00080   void DataMover::checks(bool val) {
00081     do_checks = val;
00082   }
00083 
00084   typedef struct {
00085     DataPoint *source;
00086     DataPoint *destination;
00087     FileCache *cache;
00088     const URLMap *map;
00089     unsigned long long int min_speed;
00090     time_t min_speed_time;
00091     unsigned long long int min_average_speed;
00092     time_t max_inactivity_time;
00093     DataMover::callback cb;
00094     DataMover *it;
00095     void *arg;
00096     const char *prefix;
00097   } transfer_struct;
00098 
00099   DataStatus DataMover::Delete(DataPoint& url, bool errcont) {
00100     bool remove_lfn = !url.HaveLocations(); // pfn or plain url
00101     if (!url.Resolve(true).Passed())
00102       // TODO: Check if error is real or "not exist".
00103       if (remove_lfn)
00104         logger.msg(INFO,
00105                    "No locations found - probably no more physical instances");
00106     std::list<URL> removed_urls;
00107     if (url.HaveLocations())
00108       for (; url.LocationValid();) {
00109         logger.msg(INFO, "Removing %s", url.CurrentLocation().str());
00110         // It can happen that after resolving list contains duplicated
00111         // physical locations obtained from different meta-data-services.
00112         // Because not all locations can reliably say if files does not exist
00113         // or access is not allowed, avoid duplicated delete attempts.
00114         bool url_was_deleted = false;
00115         for (std::list<URL>::iterator u = removed_urls.begin(); u
00116              != removed_urls.end(); ++u)
00117           if (url.CurrentLocation() == (*u)) {
00118             url_was_deleted = true;
00119             break;
00120           }
00121         if (url_was_deleted) {
00122           logger.msg(DEBUG, "This instance was already deleted");
00123         }
00124         else {
00125           url.SetSecure(false);
00126           if (!url.Remove()) {
00127             logger.msg(INFO, "Failed to delete physical file");
00128             if (!errcont) {
00129               url.NextLocation();
00130               continue;
00131             }
00132           }
00133           else
00134             removed_urls.push_back(url.CurrentLocation());
00135         }
00136         if (url.IsIndex()) {
00137           logger.msg(INFO, "Removing metadata in %s",
00138                      url.CurrentLocationMetadata());
00139           DataStatus err = url.Unregister(false);
00140           if (!err) {
00141             logger.msg(ERROR, "Failed to delete meta-information");
00142             url.NextLocation();
00143           }
00144           else
00145             url.RemoveLocation();
00146         }
00147         else {
00148           // Leave immediately in case of direct URL
00149           break;
00150         }
00151       }
00152     if (url.IsIndex()) {
00153       if (url.HaveLocations()) {
00154         logger.msg(ERROR, "Failed to remove all physical instances");
00155         return DataStatus::DeleteError;
00156       }
00157       if (remove_lfn) {
00158         logger.msg(INFO, "Removing logical file from metadata %s", url.str());
00159         DataStatus err = url.Unregister(true);
00160         if (!err) {
00161           logger.msg(ERROR, "Failed to delete logical file");
00162           return err;
00163         }
00164       }
00165     }
00166     else {
00167       if (!url.LocationValid()) {
00168         logger.msg(ERROR, "Failed to remove instance");
00169         return DataStatus::DeleteError;
00170       }
00171     }
00172     return DataStatus::Success;
00173   }
00174 
00175   void transfer_func(void *arg) {
00176     transfer_struct *param = (transfer_struct*)arg;
00177     DataStatus res = param->it->Transfer(*(param->source),
00178                                          *(param->destination), *(param->cache), *(param->map),
00179                                          param->min_speed, param->min_speed_time,
00180                                          param->min_average_speed, param->max_inactivity_time,
00181                                          NULL, NULL, param->prefix);
00182     (*(param->cb))(param->it, res, param->arg);
00183     if (param->prefix) free((void*)(param->prefix));
00184     if (param->cache) delete param->cache;
00185     free(param);
00186   }
00187 
00188   /* transfer data from source to destination */
00189   DataStatus DataMover::Transfer(DataPoint& source, DataPoint& destination,
00190                                  FileCache& cache, const URLMap& map,
00191                                  DataMover::callback cb, void *arg, const char *prefix) {
00192     return Transfer(source, destination, cache, map, default_min_speed,
00193                     default_min_speed_time, default_min_average_speed,
00194                     default_max_inactivity_time, cb, arg, prefix);
00195   }
00196 
00197   DataStatus DataMover::Transfer(DataPoint& source, DataPoint& destination,
00198                                  FileCache& cache, const URLMap& map, unsigned long long int min_speed,
00199                                  time_t min_speed_time, unsigned long long int
00200                                  min_average_speed, time_t max_inactivity_time,
00201                                  DataMover::callback cb, void *arg,
00202                                  const char *prefix) {
00203 
00204     if (cb != NULL) {
00205       logger.msg(VERBOSE, "DataMover::Transfer : starting new thread");
00206       transfer_struct *param = (transfer_struct*)malloc(sizeof(transfer_struct));
00207       if (param == NULL)
00208         return DataStatus::TransferError;
00209       param->source = &source;
00210       param->destination = &destination;
00211       param->cache = new FileCache(cache);
00212       param->map = &map;
00213       param->min_speed = min_speed;
00214       param->min_speed_time = min_speed_time;
00215       param->min_average_speed = min_average_speed;
00216       param->max_inactivity_time = max_inactivity_time;
00217       param->cb = cb;
00218       param->it = this;
00219       param->arg = arg;
00220       param->prefix = NULL;
00221       if (prefix)
00222         param->prefix = strdup(prefix);
00223       if (param->prefix == NULL)
00224         param->prefix = strdup(verbose_prefix.c_str());
00225       if (!CreateThreadFunction(&transfer_func, param)) {
00226         if(param->prefix) free((void*)(param->prefix));
00227         if(param->cache) delete param->cache;
00228         free(param);
00229         return DataStatus::TransferError;
00230       }
00231       return DataStatus::Success;
00232     }
00233     logger.msg(INFO, "Transfer from %s to %s", source.str(), destination.str());
00234     if (!source) {
00235       logger.msg(ERROR, "Not valid source");
00236       source.NextTry();
00237       return DataStatus::ReadAcquireError;
00238     }
00239     if (!destination) {
00240       logger.msg(ERROR, "Not valid destination");
00241       destination.NextTry();
00242       return DataStatus::WriteAcquireError;
00243     }
00244     // initial cache check, if the DN is cached we can exit straight away
00245     bool cacheable = false;
00246     bool executable = (source.GetURL().Option("exec") == "yes") ? true : false;
00247     bool cache_copy = (source.GetURL().Option("cache") == "copy") ? true : false;
00248     // DN is used for cache permissions
00249     std::string dn;
00250     Time exp_time(0);
00251     if (source.Cache() && destination.Local() && cache) {
00252       cacheable = true;
00253       try {
00254         // TODO (important) load credential in unified way or 
00255         // use already loaded one
00256         Credential ci(GetEnv("X509_USER_PROXY"), GetEnv("X509_USER_PROXY"), GetEnv("X509_CERT_DIR"), "");
00257         dn = ci.GetIdentityName();
00258         exp_time = ci.GetEndTime();
00259       } catch (CredentialError e) {
00260         logger.msg(WARNING, "Couldn't handle certificate: %s", e.what());
00261       }
00262     }
00263 #ifndef WIN32
00264     if (cacheable && source.GetURL().Option("cache") != "renew") {
00265       std::string canonic_url = source.str();
00266       bool is_in_cache = false;
00267       bool is_locked = false;
00268       if (cache.Start(canonic_url, is_in_cache, is_locked)) {
00269         if (is_in_cache) {
00270           logger.msg(INFO, "File %s is cached (%s) - checking permissions",
00271                      canonic_url, cache.File(canonic_url));
00272           // check the list of cached DNs
00273           if (cache.CheckDN(canonic_url, dn)) {
00274             logger.msg(VERBOSE, "Permission checking passed");
00275             bool cache_link_result;
00276             if (source.ReadOnly() && !executable && !cache_copy) {
00277               logger.msg(VERBOSE, "Linking/copying cached file");
00278               cache_link_result = cache.Link(destination.CurrentLocation().Path(), canonic_url);
00279             }
00280             else {
00281               logger.msg(VERBOSE, "Copying cached file");
00282               cache_link_result = cache.Copy(destination.CurrentLocation().Path(), canonic_url, executable);
00283             }
00284             cache.Stop(canonic_url);
00285             source.NextLocation(); /* to decrease retry counter */
00286             if (!cache_link_result)
00287               return DataStatus::CacheError;           
00288             return DataStatus::Success;
00289           }
00290         }
00291         cache.Stop(canonic_url);
00292       }
00293     }
00294 #endif /*WIN32*/
00295       
00296     for (;;) {
00297       DataStatus dres = source.Resolve(true);
00298       if (dres.Passed()) {
00299         if (source.HaveLocations())
00300           break;
00301         logger.msg(ERROR, "No locations for source found: %s", source.str());
00302       }
00303       else
00304         logger.msg(ERROR, "Failed to resolve source: %s", source.str());
00305       source.NextTry(); /* try again */
00306       if (!do_retries)
00307         return dres;
00308       if (!source.LocationValid())
00309         return dres;
00310     }
00311     for (;;) {
00312       DataStatus dres = destination.Resolve(false);
00313       if (dres.Passed()) {
00314         if (destination.HaveLocations())
00315           break;
00316         logger.msg(ERROR, "No locations for destination found: %s",
00317                    destination.str());
00318       }
00319       else
00320         logger.msg(ERROR, "Failed to resolve destination: %s", destination.str());
00321       destination.NextTry(); /* try again */
00322       if (!do_retries)
00323         return dres;
00324       if (!destination.LocationValid())
00325         return dres;
00326     }
00327     bool replication = false;
00328     if (source.IsIndex() && destination.IsIndex())
00329       // check for possible replication
00330       if (source.GetURL() == destination.GetURL()) {
00331         replication = true;
00332         // we do not want to replicate to same physical file
00333         destination.RemoveLocations(source);
00334         if (!destination.HaveLocations()) {
00335           logger.msg(ERROR, "No locations for destination different from source "
00336                      "found: %s", destination.str());
00337           return DataStatus::WriteResolveError;
00338         }
00339       }
00340     // Try to avoid any additional checks meant to provide
00341     // meta-information whenever possible
00342     bool checks_required = destination.AcceptsMeta() && (!replication);
00343     bool destination_meta_initially_stored = destination.Registered();
00344     bool destination_overwrite = false;
00345     if (!replication) { // overwriting has no sense in case of replication
00346       std::string value = destination.GetURL().Option("overwrite", "no");
00347       if (strcasecmp(value.c_str(), "no") != 0)
00348         destination_overwrite = true;
00349     }
00350     if (destination_overwrite) {
00351       if ((destination.IsIndex() && destination_meta_initially_stored)
00352           || (!destination.IsIndex())) {
00353         URL del_url = destination.GetURL();
00354         logger.msg(VERBOSE, "DataMover::Transfer: trying to destroy/overwrite "
00355                    "destination: %s", del_url.str());
00356         int try_num = destination.GetTries();
00357         for (;;) {
00358           DataHandle del(del_url, destination.GetUserConfig());
00359           del->SetTries(1);
00360           DataStatus res = Delete(*del);
00361           if (res == DataStatus::Success)
00362             break;
00363           if (!destination.IsIndex()) {
00364             // pfn has chance to be overwritten directly
00365             logger.msg(INFO, "Failed to delete %s but will still try to upload", del_url.str());
00366             break;
00367           }
00368           logger.msg(INFO, "Failed to delete %s", del_url.str());
00369           destination.NextTry(); /* try again */
00370           if (!do_retries)
00371             return res;
00372           if ((--try_num) <= 0)
00373             return res;
00374         }
00375         if (destination.IsIndex()) {
00376           for (;;) {
00377             DataStatus dres = destination.Resolve(false);
00378             if (dres.Passed()) {
00379               if (destination.HaveLocations())
00380                 break;
00381               logger.msg(ERROR, "No locations for destination found: %s",
00382                          destination.str());
00383             }
00384             else
00385               logger.msg(ERROR, "Failed to resolve destination: %s",
00386                          destination.str());
00387             destination.NextTry(); /* try again */
00388             if (!do_retries)
00389               return dres;
00390             if (!destination.LocationValid())
00391               return dres;
00392           }
00393           destination_meta_initially_stored = destination.Registered();
00394           if (destination_meta_initially_stored) {
00395             logger.msg(INFO, "Deleted but still have locations at %s",
00396                        destination.str());
00397             return DataStatus::WriteResolveError;
00398           }
00399         }
00400       }
00401     }
00402     DataStatus res = DataStatus::TransferError;
00403     int try_num;
00404     for (try_num = 0;; try_num++) { /* cycle for retries */
00405       logger.msg(VERBOSE, "DataMover: cycle");
00406       if ((try_num != 0) && (!do_retries)) {
00407         logger.msg(VERBOSE, "DataMover: no retries requested - exit");
00408         return res;
00409       }
00410       if ((!source.LocationValid()) || (!destination.LocationValid())) {
00411         if (!source.LocationValid())
00412           logger.msg(VERBOSE, "DataMover: source out of tries - exit");
00413         if (!destination.LocationValid())
00414           logger.msg(VERBOSE, "DataMover: destination out of tries - exit");
00415         /* out of tries */
00416         return res;
00417       }
00418       // By putting DataBuffer here, one makes sure it will be always
00419       // destroyed AFTER all DataHandle. This allows for not bothering
00420       // to call stop_reading/stop_writing because they are called in
00421       // destructor of DataHandle.
00422       DataBuffer buffer;
00423       logger.msg(INFO, "Real transfer from %s to %s", source.CurrentLocation().str(), destination.CurrentLocation().str());
00424       /* creating handler for transfer */
00425       source.SetSecure(force_secure);
00426       source.Passive(force_passive);
00427       destination.SetSecure(force_secure);
00428       destination.Passive(force_passive);
00429       destination.SetAdditionalChecks(do_checks);
00430       /* take suggestion from DataHandle about buffer, etc. */
00431       long long int bufsize;
00432       int bufnum;
00433       /* tune buffers */
00434       bufsize = 65536; /* have reasonable buffer size */
00435       bool seekable = destination.WriteOutOfOrder();
00436       source.ReadOutOfOrder(seekable);
00437       bufnum = 1;
00438       if (source.BufSize() > bufsize)
00439         bufsize = source.BufSize();
00440       if (destination.BufSize() > bufsize)
00441         bufsize = destination.BufSize();
00442       if (seekable) {
00443         if (source.BufNum() > bufnum)
00444           bufnum = source.BufNum();
00445         if (destination.BufNum() > bufnum)
00446           bufnum = destination.BufNum();
00447       }
00448       bufnum = bufnum * 2;
00449       logger.msg(VERBOSE, "Creating buffer: %lli x %i", bufsize, bufnum);
00450       /* prepare crc */
00451       CheckSumAny crc;
00452       // Should we trust the indexing service or always compute the checksum?
00453       // Let's trust.
00454       if (destination.AcceptsMeta()) { // may need to compute crc
00455         // Let it be CRC32 by default.
00456         std::string crc_type = destination.GetURL().Option("checksum", destination.DefaultCheckSum());
00457         logger.msg(VERBOSE, "DataMover::Transfer: checksum type is %s", crc_type);
00458         if (!source.CheckCheckSum()) {
00459           crc = crc_type.c_str();
00460           logger.msg(VERBOSE, "DataMover::Transfer: will try to compute crc");
00461         }
00462         else if (CheckSumAny::Type(crc_type.c_str())
00463                  != CheckSumAny::Type(source.GetCheckSum().c_str())) {
00464           crc = crc_type.c_str();
00465           logger.msg(VERBOSE, "DataMover::Transfer: will try to compute crc");
00466         }
00467       }
00468       /* create buffer and tune speed control */
00469       buffer.set(&crc, bufsize, bufnum);
00470       if (!buffer)
00471         logger.msg(INFO, "Buffer creation failed !");
00472       buffer.speed.set_min_speed(min_speed, min_speed_time);
00473       buffer.speed.set_min_average_speed(min_average_speed);
00474       buffer.speed.set_max_inactivity_time(max_inactivity_time);
00475       buffer.speed.verbose(be_verbose);
00476       if (be_verbose) {
00477         if (prefix)
00478           buffer.speed.verbose(std::string(prefix));
00479         else
00480           buffer.speed.verbose(verbose_prefix);
00481         buffer.speed.set_progress_indicator(show_progress);
00482       }
00483       /* checking if current source should be mapped to different location */
00484       /* TODO: make mapped url to be handled by source handle directly */
00485       bool mapped = false;
00486       URL mapped_url;
00487       if (destination.Local()) {
00488         mapped_url = source.CurrentLocation();
00489         mapped = map.map(mapped_url);
00490         /* TODO: copy options to mapped_url */
00491         if (!mapped)
00492           mapped_url = URL();
00493         else {
00494           logger.msg(VERBOSE, "Url is mapped to: %s", mapped_url.str());
00495           if (mapped_url.Protocol() == "link")
00496             /* can't cache links */
00497             cacheable = false;
00498         }
00499       }
00500       // Do not link if user asks. Replace link:// with file://
00501       if ((!source.ReadOnly()) && mapped)
00502         if (mapped_url.Protocol() == "link")
00503           mapped_url.ChangeProtocol("file");
00504       DataHandle mapped_h(mapped_url, source.GetUserConfig());
00505       DataPoint& mapped_p(*mapped_h);
00506       if (mapped_h) {
00507         mapped_p.SetSecure(force_secure);
00508         mapped_p.Passive(force_passive);
00509       }
00510       /* Try to initiate cache (if needed) */
00511       std::string canonic_url = source.str();
00512 #ifndef WIN32
00513       if (cacheable) {
00514         res = DataStatus::Success;
00515         for (;;) { /* cycle for outdated cache files */
00516           bool is_in_cache = false;
00517           bool is_locked = false;
00518           if (!cache.Start(canonic_url, is_in_cache, is_locked)) {
00519             if (is_locked) {
00520               logger.msg(VERBOSE, "Cached file is locked - should retry");
00521               source.NextLocation(); /* to decrease retry counter */
00522               return DataStatus::CacheErrorRetryable;
00523             }
00524             cacheable = false;
00525             logger.msg(INFO, "Failed to initiate cache");
00526             break;
00527           }
00528           if (is_in_cache) {
00529             // check for forced re-download option
00530             std::string cache_option = source.GetURL().Option("cache");
00531             if (cache_option == "renew") {
00532               logger.msg(VERBOSE, "Forcing re-download of file %s", canonic_url);
00533               cache.StopAndDelete(canonic_url);
00534               continue;
00535             }
00536             /* just need to check permissions */
00537             logger.msg(INFO, "File %s is cached (%s) - checking permissions",
00538                        canonic_url, cache.File(canonic_url));
00539             // check the list of cached DNs
00540             bool have_permission = false;
00541             if (cache.CheckDN(canonic_url, dn))
00542               have_permission = true;
00543             else {
00544               DataStatus cres = source.Check();
00545               if (!cres.Passed()) {
00546                 logger.msg(ERROR, "Permission checking failed: %s", source.str());
00547                 cache.Stop(canonic_url);
00548                 source.NextLocation(); /* try another source */
00549                 logger.msg(VERBOSE, "source.next_location");
00550                 res = cres;
00551                 break;
00552               }
00553               cache.AddDN(canonic_url, dn, exp_time);
00554             }
00555             logger.msg(VERBOSE, "Permission checking passed");
00556             /* check if file is fresh enough */
00557             bool outdated = true;
00558             if (have_permission)
00559               outdated = false; // cached DN means don't check creation date
00560             if (source.CheckCreated() && cache.CheckCreated(canonic_url)) {
00561               Time sourcetime = source.GetCreated();
00562               Time cachetime = cache.GetCreated(canonic_url);
00563               logger.msg(VERBOSE, "Source creation date: %s", sourcetime.str());
00564               logger.msg(VERBOSE, "Cache creation date: %s", cachetime.str());
00565               if (sourcetime <= cachetime)
00566                 outdated = false;
00567             }
00568             if (cache.CheckValid(canonic_url)) {
00569               Time validtime = cache.GetValid(canonic_url);
00570               logger.msg(VERBOSE, "Cache file valid until: %s", validtime.str());
00571               if (validtime > Time())
00572                 outdated = false;
00573               else
00574                 outdated = true;
00575             }
00576             if (outdated) {
00577               cache.StopAndDelete(canonic_url);
00578               logger.msg(INFO, "Cached file is outdated, will re-download");
00579               continue;
00580             }
00581             logger.msg(VERBOSE, "Cached copy is still valid");
00582             if (source.ReadOnly() && !executable && !cache_copy) {
00583               logger.msg(VERBOSE, "Linking/copying cached file");
00584               if (!cache.Link(destination.CurrentLocation().Path(), canonic_url)) {
00585                 /* failed cache link is unhandable */
00586                 cache.Stop(canonic_url);
00587                 source.NextLocation(); /* to decrease retry counter */
00588                 return DataStatus::CacheError;
00589               }
00590             }
00591             else {
00592               logger.msg(VERBOSE, "Copying cached file");
00593               if (!cache.Copy(destination.CurrentLocation().Path(), canonic_url, executable)) {
00594                 /* failed cache copy is unhandable */
00595                 cache.Stop(canonic_url);
00596                 source.NextLocation(); /* to decrease retry counter */
00597                 return DataStatus::CacheError;
00598               }
00599             }
00600             cache.Stop(canonic_url);
00601             return DataStatus::Success;
00602             // Leave here. Rest of code below is for transfer.
00603           }
00604           break;
00605         }
00606         if (cacheable && !res.Passed())
00607           continue;
00608       }
00609 #endif /*WIN32*/
00610       if (mapped) {
00611         if ((mapped_url.Protocol() == "link")
00612             || (mapped_url.Protocol() == "file")) {
00613           /* check permissions first */
00614           logger.msg(INFO, "URL is mapped to local access - "
00615                      "checking permissions on original URL");
00616           DataStatus cres =  source.Check();
00617           if (!cres.Passed()) {
00618             logger.msg(ERROR, "Permission checking on original URL failed: %s",
00619                        source.str());
00620             source.NextLocation(); /* try another source */
00621             logger.msg(VERBOSE, "source.next_location");
00622 #ifndef WIN32
00623             if (cacheable)
00624               cache.StopAndDelete(canonic_url);
00625 #endif
00626             res = cres;
00627             continue;
00628           }
00629           logger.msg(VERBOSE, "Permission checking passed");
00630           if (mapped_url.Protocol() == "link") {
00631             logger.msg(VERBOSE, "Linking local file");
00632             const std::string& file_name = mapped_url.Path();
00633             const std::string& link_name = destination.CurrentLocation().Path();
00634             // create directory structure for link_name
00635             {
00636               User user;
00637               std::string dirpath = Glib::path_get_dirname(link_name);
00638               if(dirpath == ".") dirpath = G_DIR_SEPARATOR_S;
00639               if (mkdir_recursive("", dirpath.c_str(), S_IRWXU, user) != 0) {
00640                 if (errno != EEXIST) {
00641                   logger.msg(ERROR, "Failed to create/find directory %s : %s",
00642                              dirpath, StrError());
00643                   source.NextLocation(); /* try another source */
00644                   logger.msg(VERBOSE, "source.next_location");
00645                   res = DataStatus::ReadStartError;
00646                   continue;
00647                 }
00648               }
00649             }
00650             // make link
00651             if (symlink(file_name.c_str(), link_name.c_str()) == -1) {
00652               logger.msg(ERROR, "Failed to make symbolic link %s to %s : %s",
00653                          link_name, file_name, StrError());
00654               source.NextLocation(); /* try another source */
00655               logger.msg(VERBOSE, "source.next_location");
00656               res = DataStatus::ReadStartError;
00657               continue;
00658             }
00659             User user;
00660             (lchown(link_name.c_str(), user.get_uid(), user.get_gid()) != 0);
00661             return DataStatus::Success;
00662             // Leave after making a link. Rest moves data.
00663           }
00664         }
00665       }
00666       URL churl;
00667 #ifndef WIN32
00668       if (cacheable) {
00669         /* create new destination for cache file */
00670         churl = cache.File(canonic_url);
00671         logger.msg(INFO, "cache file: %s", churl.Path());
00672       }
00673 #endif
00674       DataHandle chdest_h(churl, destination.GetUserConfig());
00675       DataPoint& chdest(*chdest_h);
00676       if (chdest_h) {
00677         chdest.SetSecure(force_secure);
00678         chdest.Passive(force_passive);
00679         chdest.SetAdditionalChecks(false); // don't pre-allocate space in cache
00680         chdest.SetMeta(destination); // share metadata
00681       }
00682       DataPoint& source_url = mapped ? mapped_p : source;
00683       DataPoint& destination_url = cacheable ? chdest : destination;
00684       // Disable checks meant to provide meta-information if not needed
00685       source_url.SetAdditionalChecks(do_checks & (checks_required | cacheable));
00686       
00687       // check location meta
00688       if (source_url.GetAdditionalChecks()) {
00689         DataStatus r = source_url.CompareLocationMetadata();
00690         if (!r.Passed()) {
00691           if (r == DataStatus::InconsistentMetadataError)
00692             logger.msg(ERROR, "Meta info of source and location do not match for %s", source_url.str());
00693           if (source.NextLocation())
00694             logger.msg(VERBOSE, "(Re)Trying next source");
00695           res = DataStatus::ReadError;
00696 #ifndef WIN32
00697           if (cacheable)
00698             cache.StopAndDelete(canonic_url);
00699 #endif
00700           continue;
00701         }
00702       }
00703       DataStatus datares = source_url.StartReading(buffer);
00704       if (!datares.Passed()) {
00705         logger.msg(ERROR, "Failed to start reading from source: %s",
00706                    source_url.str());
00707         source_url.StopReading();
00708         res = datares;
00709         if (source.GetFailureReason() != DataStatus::UnknownError)
00710           res = source.GetFailureReason();
00711         /* try another source */
00712         if (source.NextLocation())
00713           logger.msg(VERBOSE, "(Re)Trying next source");
00714 #ifndef WIN32
00715         if (cacheable)
00716           cache.StopAndDelete(canonic_url);
00717 #endif
00718         continue;
00719       }
00720       if (mapped)
00721         destination.SetMeta(mapped_p);
00722       if (force_registration && destination.IsIndex()) {
00723         // at least compare metadata
00724         if (!destination.CompareMeta(source)) {
00725           logger.msg(ERROR, "Metadata of source and destination are different");
00726           source_url.StopReading();
00727           source.NextLocation(); /* not exactly sure if this would help */
00728           res = DataStatus::PreRegisterError;
00729 #ifndef WIN32
00730           if (cacheable)
00731             cache.StopAndDelete(canonic_url);
00732 #endif
00733           continue;
00734         }
00735       }
00736       // pass metadata gathered during start_reading()
00737       // from source to destination
00738       destination.SetMeta(source);
00739       if (chdest_h)
00740         chdest.SetMeta(source);
00741       if (destination.CheckSize())
00742         buffer.speed.set_max_data(destination.GetSize());
00743       datares = destination.PreRegister(replication, force_registration);
00744       if (!datares.Passed()) {
00745         logger.msg(ERROR, "Failed to preregister destination: %s",
00746                    destination.str());
00747         source_url.StopReading();
00748         destination.NextLocation(); /* not exactly sure if this would help */
00749         logger.msg(VERBOSE, "destination.next_location");
00750         res = datares;
00751         // Normally remote destination is not cached. But who knows.
00752 #ifndef WIN32
00753         if (cacheable)
00754           cache.StopAndDelete(canonic_url);
00755 #endif
00756         continue;
00757       }
00758       buffer.speed.reset();
00759       DataStatus read_failure = DataStatus::Success;
00760       DataStatus write_failure = DataStatus::Success;
00761       if (!cacheable) {
00762         datares = destination.StartWriting(buffer);
00763         if (!datares.Passed()) {
00764           logger.msg(ERROR, "Failed to start writing to destination: %s",
00765                      destination.str());
00766           destination.StopWriting();
00767           source_url.StopReading();
00768           if (!destination.PreUnregister(replication ||
00769                                          destination_meta_initially_stored).Passed())
00770             logger.msg(ERROR, "Failed to unregister preregistered lfn. "
00771                        "You may need to unregister it manually: %s", destination.str());
00772           if (destination.NextLocation())
00773             logger.msg(VERBOSE, "(Re)Trying next destination");
00774           res = datares;
00775           if(destination.GetFailureReason() != DataStatus::UnknownError)
00776             res = destination.GetFailureReason();
00777           continue;
00778         }
00779       }
00780       else {
00781 #ifndef WIN32
00782         datares = chdest.StartWriting(buffer);
00783         if (!datares.Passed()) {
00784           // TODO: put callback to clean cache into FileCache
00785           logger.msg(ERROR, "Failed to start writing to cache");
00786           chdest.StopWriting();
00787           source_url.StopReading();
00788           // hope there will be more space next time
00789           cache.StopAndDelete(canonic_url);
00790           if (!destination.PreUnregister(replication ||
00791                                          destination_meta_initially_stored).Passed())
00792             logger.msg(ERROR, "Failed to unregister preregistered lfn. "
00793                        "You may need to unregister it manually");
00794           return DataStatus::CacheError; // repeating won't help here
00795         }
00796 #endif
00797       }
00798       logger.msg(VERBOSE, "Waiting for buffer");
00799       for (; (!buffer.eof_read() || !buffer.eof_write()) && !buffer.error();)
00800         buffer.wait_any();
00801       logger.msg(INFO, "buffer: read eof : %i", (int)buffer.eof_read());
00802       logger.msg(INFO, "buffer: write eof: %i", (int)buffer.eof_write());
00803       logger.msg(INFO, "buffer: error    : %i", (int)buffer.error());
00804       logger.msg(VERBOSE, "Closing read channel");
00805       read_failure = source_url.StopReading();
00806       if (cacheable && mapped)
00807         source.SetMeta(mapped_p); // pass more metadata (checksum)
00808       logger.msg(VERBOSE, "Closing write channel");
00809       // turn off checks during stop_writing() if force is turned on
00810       destination_url.SetAdditionalChecks(!force_registration);
00811       if (!destination_url.StopWriting().Passed())
00812         buffer.error_write(true);
00813 
00814       if (buffer.error()) {
00815 #ifndef WIN32
00816         if (cacheable) 
00817           cache.StopAndDelete(canonic_url);
00818 #endif        
00819         if (!destination.PreUnregister(replication ||
00820                                        destination_meta_initially_stored).Passed())
00821           logger.msg(ERROR, "Failed to unregister preregistered lfn. "
00822                      "You may need to unregister it manually");
00823         // Analyze errors
00824         // Easy part first - if either read or write part report error
00825         // go to next endpoint.
00826         if (buffer.error_read()) {
00827           if (source.NextLocation())
00828             logger.msg(VERBOSE, "(Re)Trying next source");
00829           // check for error from callbacks etc
00830           if(source.GetFailureReason() != DataStatus::UnknownError)
00831             res=source.GetFailureReason();
00832           else
00833             res=DataStatus::ReadError;
00834         }
00835         else if (buffer.error_write()) {
00836           if (destination.NextLocation())
00837             logger.msg(VERBOSE, "(Re)Trying next destination");
00838           // check for error from callbacks etc
00839           if(destination.GetFailureReason() != DataStatus::UnknownError)
00840             res=destination.GetFailureReason();
00841           else
00842             res=DataStatus::WriteError;
00843         }
00844         else if (buffer.error_transfer()) {
00845           // Here is more complicated case - operation timeout
00846           // Let's first check if buffer was full
00847           res = DataStatus::TransferError;
00848           if (!buffer.for_read()) {
00849             // No free buffers for 'read' side. Buffer must be full.
00850             res.SetDesc(destination.GetFailureReason().GetDesc());
00851             if (destination.NextLocation())
00852               logger.msg(VERBOSE, "(Re)Trying next destination");
00853           }
00854           else if (!buffer.for_write()) {
00855             // Buffer is empty
00856             res.SetDesc(source.GetFailureReason().GetDesc());
00857             if (source.NextLocation())
00858               logger.msg(VERBOSE, "(Re)Trying next source");
00859           }
00860           else {
00861             // Both endpoints were very slow? Choose randomly.
00862             logger.msg(VERBOSE, "Cause of failure unclear - choosing randomly");
00863             Glib::Rand r;
00864             if (r.get_int() < (RAND_MAX / 2)) {
00865               res.SetDesc(source.GetFailureReason().GetDesc());
00866               if (source.NextLocation())
00867                 logger.msg(VERBOSE, "(Re)Trying next source");
00868             }
00869             else {
00870               res.SetDesc(destination.GetFailureReason().GetDesc());
00871               if (destination.NextLocation())
00872                 logger.msg(VERBOSE, "(Re)Trying next destination");
00873             }
00874           }
00875         }
00876         continue;
00877       }
00878       // check if checksum is specified as a metadata attribute
00879       if (!destination.GetURL().MetaDataOption("checksumtype").empty() &&
00880           !destination.GetURL().MetaDataOption("checksumvalue").empty()) {
00881         std::string csum = destination.GetURL().MetaDataOption("checksumtype") + ":" + destination.GetURL().MetaDataOption("checksumvalue");
00882         source.SetCheckSum(csum.c_str());
00883         logger.msg(VERBOSE, "DataMove::Transfer: using supplied checksum %s", csum);
00884       }
00885       else if (crc)
00886         if (buffer.checksum_valid()) {
00887           // source.meta_checksum(crc.end());
00888           char buf[100];
00889           crc.print(buf, 100);
00890           source.SetCheckSum(buf);
00891           logger.msg(VERBOSE, "DataMover::Transfer: have valid checksum");
00892         }
00893       destination.SetMeta(source); // pass more metadata (checksum)
00894       datares = destination.PostRegister(replication);
00895       if (!datares.Passed()) {
00896         logger.msg(ERROR, "Failed to postregister destination %s",
00897                    destination.str());
00898         if (!destination.PreUnregister(replication ||
00899                                        destination_meta_initially_stored).Passed())
00900           logger.msg(ERROR, "Failed to unregister preregistered lfn. "
00901                      "You may need to unregister it manually: %s", destination.str());
00902         destination.NextLocation(); /* not sure if this can help */
00903         logger.msg(VERBOSE, "destination.next_location");
00904 #ifndef WIN32
00905         if(cacheable) 
00906           cache.Stop(canonic_url);
00907 #endif
00908         res = datares;
00909         continue;
00910       }
00911 #ifndef WIN32
00912       if (cacheable) {
00913         if (source.CheckValid())
00914           cache.SetValid(canonic_url, source.GetValid());
00915         cache.AddDN(canonic_url, dn, exp_time);
00916         bool cache_link_result;
00917         if (executable || cache_copy) {
00918           logger.msg(VERBOSE, "Copying cached file");
00919           cache_link_result = cache.Copy(destination.CurrentLocation().Path(), canonic_url, executable);
00920         }
00921         else {
00922           logger.msg(VERBOSE, "Linking/copying cached file");
00923           cache_link_result = cache.Link(destination.CurrentLocation().Path(), canonic_url);
00924         }
00925         cache.Stop(canonic_url);
00926         if (!cache_link_result) {
00927           if (!destination.PreUnregister(replication ||
00928                                          destination_meta_initially_stored).Passed())
00929             logger.msg(ERROR, "Failed to unregister preregistered lfn. "
00930                        "You may need to unregister it manually");
00931           return DataStatus::CacheError; /* retry won't help */
00932         }
00933       }
00934 #endif
00935       if (buffer.error())
00936         continue; // should never happen - keep just in case
00937       break;
00938     }
00939     return DataStatus::Success;
00940   }
00941 
00942   void DataMover::secure(bool val) {
00943     force_secure = val;
00944   }
00945 
00946   void DataMover::passive(bool val) {
00947     force_passive = val;
00948   }
00949 
00950   void DataMover::force_to_meta(bool val) {
00951     force_registration = val;
00952   }
00953 
00954 } // namespace Arc