Back to index

nordugrid-arc-nox  1.1.0~rc6
arccp.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <string>
00008 #include <list>
00009 
00010 #include <unistd.h>
00011 #include <sys/types.h>
00012 
00013 #include <arc/ArcLocation.h>
00014 #include <arc/Logger.h>
00015 #include <arc/StringConv.h>
00016 #include <arc/URL.h>
00017 #include <arc/User.h>
00018 #include <arc/UserConfig.h>
00019 #include <arc/credential/Credential.h>
00020 #include <arc/data/FileCache.h>
00021 #include <arc/data/DataHandle.h>
00022 #include <arc/data/DataMover.h>
00023 #include <arc/data/URLMap.h>
00024 #include <arc/OptionParser.h>
00025 
00026 static Arc::Logger logger(Arc::Logger::getRootLogger(), "arccp");
00027 
00028 static void progress(FILE *o, const char*, unsigned int,
00029                      unsigned long long int all, unsigned long long int max,
00030                      double, double) {
00031   static int rs = 0;
00032   const char rs_[4] = {
00033     '|', '/', '-', '\\'
00034   };
00035   if (max) {
00036     fprintf(o, "\r|");
00037     unsigned int l = (74 * all + 37) / max;
00038     if (l > 74)
00039       l = 74;
00040     unsigned int i = 0;
00041     for (; i < l; i++)
00042       fprintf(o, "=");
00043     fprintf(o, "%c", rs_[rs++]);
00044     if (rs > 3)
00045       rs = 0;
00046     for (; i < 74; i++)
00047       fprintf(o, " ");
00048     fprintf(o, "|\r");
00049     fflush(o);
00050     return;
00051   }
00052   fprintf(o, "\r%llu kB                    \r", all / 1024);
00053 }
00054 
00055 bool arcregister(const Arc::URL& source_url,
00056                  const Arc::URL& destination_url,
00057                  const Arc::UserConfig& usercfg,
00058                  bool secure,
00059                  bool passive,
00060                  bool force_meta,
00061                  int timeout) {
00062   if (!source_url) {
00063     logger.msg(Arc::ERROR, "Invalid URL: %s", source_url.str());
00064     return false;
00065   }
00066   if (!destination_url) {
00067     logger.msg(Arc::ERROR, "Invalid URL: %s", destination_url.str());
00068     return false;
00069   }
00070   if (source_url.Protocol() == "urllist" &&
00071       destination_url.Protocol() == "urllist") {
00072     std::list<Arc::URL> sources = Arc::ReadURLList(source_url);
00073     std::list<Arc::URL> destinations = Arc::ReadURLList(destination_url);
00074     if (sources.size() == 0) {
00075       logger.msg(Arc::ERROR, "Can't read list of sources from file %s",
00076                  source_url.Path());
00077       return false;
00078     }
00079     if (destinations.size() == 0) {
00080       logger.msg(Arc::ERROR, "Can't read list of destinations from file %s",
00081                  destination_url.Path());
00082       return false;
00083     }
00084     if (sources.size() != destinations.size()) {
00085       logger.msg(Arc::ERROR,
00086                  "Numbers of sources and destinations do not match");
00087       return false;
00088     }
00089     bool r = true;
00090     for (std::list<Arc::URL>::iterator source = sources.begin(),
00091                                        destination = destinations.begin();
00092          (source != sources.end()) && (destination != destinations.end());
00093          source++, destination++)
00094       if(!arcregister(*source, *destination, usercfg, secure, passive,
00095                       force_meta, timeout)) r = false;
00096     return r;
00097   }
00098   if (source_url.Protocol() == "urllist") {
00099     std::list<Arc::URL> sources = Arc::ReadURLList(source_url);
00100     if (sources.size() == 0) {
00101       logger.msg(Arc::ERROR, "Can't read list of sources from file %s",
00102                  source_url.Path());
00103       return false;
00104     }
00105     bool r = true;
00106     for (std::list<Arc::URL>::iterator source = sources.begin();
00107          source != sources.end(); source++)
00108       if(!arcregister(*source, destination_url, usercfg, secure, passive,
00109                       force_meta, timeout)) r = false;
00110     return r;
00111   }
00112   if (destination_url.Protocol() == "urllist") {
00113     std::list<Arc::URL> destinations = Arc::ReadURLList(destination_url);
00114     if (destinations.size() == 0) {
00115       logger.msg(Arc::ERROR, "Can't read list of destinations from file %s",
00116                  destination_url.Path());
00117       return false;
00118     }
00119     bool r = true;
00120     for (std::list<Arc::URL>::iterator destination = destinations.begin();
00121          destination != destinations.end(); destination++)
00122       if(!arcregister(source_url, *destination, usercfg, secure, passive,
00123                       force_meta, timeout)) r = false;
00124     return r;
00125   }
00126 
00127   if (destination_url.Path()[destination_url.Path().length() - 1] == '/') {
00128     logger.msg(Arc::ERROR, "Fileset registration is not supported yet");
00129     return false;
00130   }
00131   if ((source_url.IsSecureProtocol() || destination_url.IsSecureProtocol()) && !Arc::Credential::IsCredentialsValid(usercfg)) {
00132     logger.msg(Arc::ERROR, "Unable to register file (%s): No valid credentials found", source_url.str());
00133     return false;
00134   }
00135   Arc::DataHandle source(source_url, usercfg);
00136   Arc::DataHandle destination(destination_url, usercfg);
00137   if (!source) {
00138     logger.msg(Arc::ERROR, "Unsupported source url: %s", source_url.str());
00139     return false;
00140   }
00141   if (!destination) {
00142     logger.msg(Arc::ERROR, "Unsupported destination url: %s",
00143                destination_url.str());
00144     return false;
00145   }
00146   if (source->IsIndex() || !destination->IsIndex()) {
00147     logger.msg(Arc::ERROR, "For registration source must be ordinary URL"
00148                " and destination must be indexing service");
00149     return false;
00150   }
00151   // Obtain meta-information about source
00152   if (!source->Check()) {
00153     logger.msg(Arc::ERROR, "Source probably does not exist");
00154     return false;
00155   }
00156   // add new location
00157   if (!destination->Resolve(false)) {
00158     logger.msg(Arc::ERROR, "Problems resolving destination");
00159     return false;
00160   }
00161   bool replication = destination->Registered();
00162   destination->SetMeta(*source); // pass metadata
00163   std::string metaname;
00164   // look for similar destination
00165   for (destination->SetTries(1); destination->LocationValid();
00166        destination->NextLocation()) {
00167     const Arc::URL& loc_url = destination->CurrentLocation();
00168     if (loc_url == source_url) {
00169       metaname = destination->CurrentLocationMetadata();
00170       break;
00171     }
00172   }
00173   // remove locations if exist
00174   for (destination->SetTries(1); destination->RemoveLocation();) {}
00175   // add new location
00176   if (metaname.empty())
00177     metaname = source_url.ConnectionURL();
00178   if (!destination->AddLocation(source_url, metaname)) {
00179     destination->PreUnregister(replication);
00180     logger.msg(Arc::ERROR, "Failed to accept new file/destination");
00181     return false;
00182   }
00183   destination->SetTries(1);
00184   if (!destination->PreRegister(replication, force_meta)) {
00185     logger.msg(Arc::ERROR, "Failed to register new file/destination");
00186     return false;
00187   }
00188   if (!destination->PostRegister(replication)) {
00189     destination->PreUnregister(replication);
00190     logger.msg(Arc::ERROR, "Failed to register new file/destination");
00191     return false;
00192   }
00193   return true;
00194 }
00195 
00196 bool arccp(const Arc::URL& source_url_,
00197            const Arc::URL& destination_url_,
00198            const std::string& cache_dir,
00199            const Arc::UserConfig usercfg,
00200            bool secure,
00201            bool passive,
00202            bool force_meta,
00203            int recursion,
00204            int tries,
00205            bool verbose,
00206            int timeout) {
00207   Arc::URL source_url(source_url_);
00208   if (!source_url) {
00209     logger.msg(Arc::ERROR, "Invalid URL: %s", source_url.str());
00210     return false;
00211   }
00212   Arc::URL destination_url(destination_url_);
00213   if (!destination_url) {
00214     logger.msg(Arc::ERROR, "Invalid URL: %s", destination_url.str());
00215     return false;
00216   }
00217   std::string cache_path;
00218   std::string cache_data_path;
00219   std::string id = "<ngcp>";
00220 
00221   if (timeout <= 0)
00222     timeout = 300; // 5 minute default
00223   if (tries < 0)
00224     tries = 0;
00225   if (source_url.Protocol() == "urllist" &&
00226       destination_url.Protocol() == "urllist") {
00227     std::list<Arc::URL> sources = Arc::ReadURLList(source_url);
00228     std::list<Arc::URL> destinations = Arc::ReadURLList(destination_url);
00229     if (sources.size() == 0) {
00230       logger.msg(Arc::ERROR, "Can't read list of sources from file %s",
00231                  source_url.Path());
00232       return false;
00233     }
00234     if (destinations.size() == 0) {
00235       logger.msg(Arc::ERROR, "Can't read list of destinations from file %s",
00236                  destination_url.Path());
00237       return false;
00238     }
00239     if (sources.size() != destinations.size()) {
00240       logger.msg(Arc::ERROR,
00241                  "Numbers of sources and destinations do not match");
00242       return false;
00243     }
00244     bool r = true;
00245     for (std::list<Arc::URL>::iterator source = sources.begin(),
00246                                        destination = destinations.begin();
00247          (source != sources.end()) && (destination != destinations.end());
00248          source++, destination++)
00249       if(!arccp(*source, *destination, cache_dir, usercfg, secure, passive,
00250                 force_meta, recursion, tries, verbose, timeout)) r = false;
00251     return r;
00252   }
00253   if (source_url.Protocol() == "urllist") {
00254     std::list<Arc::URL> sources = Arc::ReadURLList(source_url);
00255     if (sources.size() == 0) {
00256       logger.msg(Arc::ERROR, "Can't read list of sources from file %s",
00257                  source_url.Path());
00258       return false;
00259     }
00260     bool r = true;
00261     for (std::list<Arc::URL>::iterator source = sources.begin();
00262          source != sources.end(); source++)
00263       if(!arccp(*source, destination_url, cache_dir, usercfg, secure,
00264                 passive, force_meta, recursion, tries, verbose, timeout))
00265         r = false;
00266     return r;
00267   }
00268   if (destination_url.Protocol() == "urllist") {
00269     std::list<Arc::URL> destinations = Arc::ReadURLList(destination_url);
00270     if (destinations.size() == 0) {
00271       logger.msg(Arc::ERROR, "Can't read list of destinations from file %s",
00272                  destination_url.Path());
00273       return false;
00274     }
00275     bool r = true;
00276     for (std::list<Arc::URL>::iterator destination = destinations.begin();
00277          destination != destinations.end(); destination++)
00278       if(!arccp(source_url, *destination, cache_dir, usercfg, secure,
00279                 passive, force_meta, recursion, tries, verbose, timeout))
00280         r = false;
00281     return r;
00282   }
00283 
00284   if (destination_url.Path()[destination_url.Path().length() - 1] != '/') {
00285     if (source_url.Path()[source_url.Path().length() - 1] == '/') {
00286       logger.msg(Arc::ERROR,
00287                  "Fileset copy to single object is not supported yet");
00288       return false;
00289     }
00290   }
00291   else {
00292     // Copy TO fileset/directory
00293     if (source_url.Path()[source_url.Path().length() - 1] != '/') {
00294       // Copy FROM single object
00295       std::string::size_type p = source_url.Path().rfind('/');
00296       if (p == std::string::npos) {
00297         logger.msg(Arc::ERROR, "Can't extract object's name from source url");
00298         return false;
00299       }
00300       destination_url.ChangePath(destination_url.Path() +
00301                                  source_url.Path().substr(p + 1));
00302     }
00303     else {
00304       // Fileset copy
00305       // Find out if source can be listed (TODO - through datapoint)
00306       if ((source_url.Protocol() != "rc") &&
00307           (source_url.Protocol() != "rls") &&
00308           (source_url.Protocol() != "fireman") &&
00309           (source_url.Protocol() != "file") &&
00310           (source_url.Protocol() != "se") &&
00311           (source_url.Protocol() != "srm") &&
00312           (source_url.Protocol() != "lfc") &&
00313           (source_url.Protocol() != "gsiftp") &&
00314           (source_url.Protocol() != "ftp")) {
00315         logger.msg(Arc::ERROR,
00316                    "Fileset copy for this kind of source is not supported");
00317         return false;
00318       }
00319       if ((source_url.IsSecureProtocol() || destination_url.IsSecureProtocol()) && !Arc::Credential::IsCredentialsValid(usercfg)) {
00320         logger.msg(Arc::ERROR, "Unable to copy file %s: No valid credentials found", source_url.str());
00321         return false;
00322       }
00323       Arc::DataHandle source(source_url, usercfg);
00324       if (!source) {
00325         logger.msg(Arc::ERROR, "Unsupported source url: %s", source_url.str());
00326         return false;
00327       }
00328       std::list<Arc::FileInfo> files;
00329       if (source->IsIndex()) {
00330         if (!source->ListFiles(files, true, false, false)) {
00331           logger.msg(Arc::ERROR, "Failed listing metafiles");
00332           return false;
00333         }
00334       }
00335       else
00336         if (!source->ListFiles(files, true, false, false)) {
00337           logger.msg(Arc::ERROR, "Failed listing files");
00338           return false;
00339         }
00340       bool failures = false;
00341       // Handle transfer of files first (treat unknown like files)
00342       for (std::list<Arc::FileInfo>::iterator i = files.begin();
00343            i != files.end(); i++) {
00344         if ((i->GetType() != Arc::FileInfo::file_type_unknown) &&
00345             (i->GetType() != Arc::FileInfo::file_type_file))
00346           continue;
00347         logger.msg(Arc::INFO, "Name: %s", i->GetName());
00348         std::string s_url(source_url.str());
00349         std::string d_url(destination_url.str());
00350         s_url += i->GetName();
00351         d_url += i->GetName();
00352         logger.msg(Arc::INFO, "Source: %s", s_url);
00353         logger.msg(Arc::INFO, "Destination: %s", d_url);
00354         Arc::DataHandle source(s_url, usercfg);
00355         Arc::DataHandle destination(d_url, usercfg);
00356         if (!source) {
00357           logger.msg(Arc::INFO, "Unsupported source url: %s", s_url);
00358           continue;
00359         }
00360         if (!destination) {
00361           logger.msg(Arc::INFO, "Unsupported destination url: %s", d_url);
00362           continue;
00363         }
00364         Arc::DataMover mover;
00365         mover.secure(secure);
00366         mover.passive(passive);
00367         mover.verbose(verbose);
00368         mover.force_to_meta(force_meta);
00369         if (tries) {
00370           mover.retry(true); // go through all locations
00371           source->SetTries(tries); // try all locations "tries" times
00372           destination->SetTries(tries);
00373         }
00374         Arc::User cache_user;
00375         Arc::FileCache cache;
00376         if (!cache_dir.empty()) cache = Arc::FileCache(cache_dir+" .", "", cache_user.get_uid(), cache_user.get_gid());
00377         Arc::DataStatus res = mover.Transfer(*source, *destination, cache, Arc::URLMap(),
00378                                              0, 0, 0, timeout);
00379         if (!res.Passed()) {
00380           if (!res.GetDesc().empty())
00381             logger.msg(Arc::INFO, "Current transfer FAILED: %s - %s", std::string(res), res.GetDesc());
00382           else
00383             logger.msg(Arc::INFO, "Current transfer FAILED: %s", std::string(res));
00384           if (res.Retryable())
00385             logger.msg(Arc::ERROR, "This seems like a temporary error, please try again later");
00386           destination->SetTries(1);
00387           // It is not clear how to clear half-registered file. So remove it
00388           // only in case of explicit destination.
00389           if (!(destination->IsIndex()))
00390             mover.Delete(*destination);
00391           failures = true;
00392         }
00393         else
00394           logger.msg(Arc::INFO, "Current transfer complete");
00395       }
00396       if (failures) {
00397         logger.msg(Arc::ERROR, "Some transfers failed");
00398         return false;
00399       }
00400       // Go deeper if allowed
00401       bool r = true;
00402       if (recursion > 0)
00403         // Handle directories recursively
00404         for (std::list<Arc::FileInfo>::iterator i = files.begin();
00405              i != files.end(); i++) {
00406           if (i->GetType() != Arc::FileInfo::file_type_dir)
00407             continue;
00408           if (verbose)
00409             logger.msg(Arc::INFO, "Directory: %s", i->GetName());
00410           std::string s_url(source_url.str());
00411           std::string d_url(destination_url.str());
00412           s_url += i->GetName();
00413           d_url += i->GetName();
00414           s_url += "/";
00415           d_url += "/";
00416           if(!arccp(s_url, d_url, cache_dir, usercfg, secure, passive,
00417                     force_meta, recursion - 1, tries, verbose, timeout))
00418             r = false;
00419         }
00420       return r;
00421     }
00422   }
00423   if ((source_url.IsSecureProtocol() || destination_url.IsSecureProtocol()) && !Arc::Credential::IsCredentialsValid(usercfg)) {
00424     logger.msg(Arc::ERROR, "Unable to copy file %s: No valid credentials found", source_url.str());
00425     return false;
00426   }
00427   Arc::DataHandle source(source_url, usercfg);
00428   Arc::DataHandle destination(destination_url, usercfg);
00429   if (!source) {
00430     logger.msg(Arc::ERROR, "Unsupported source url: %s", source_url.str());
00431     return false;
00432   }
00433   if (!destination) {
00434     logger.msg(Arc::ERROR, "Unsupported destination url: %s",
00435                destination_url.str());
00436     return false;
00437   }
00438   Arc::DataMover mover;
00439   mover.secure(secure);
00440   mover.passive(passive);
00441   mover.verbose(verbose);
00442   mover.force_to_meta(force_meta);
00443   if (tries) { // 0 means default behavior
00444     mover.retry(true); // go through all locations
00445     source->SetTries(tries); // try all locations "tries" times
00446     destination->SetTries(tries);
00447   }
00448   Arc::FileCache cache;
00449   Arc::User cache_user;
00450   // always copy to destination rather than link
00451   if (!cache_dir.empty()) cache = Arc::FileCache(cache_dir+" .", "", cache_user.get_uid(), cache_user.get_gid());
00452   if (verbose)
00453     mover.set_progress_indicator(&progress);
00454   Arc::DataStatus res = mover.Transfer(*source, *destination, cache, Arc::URLMap(),
00455                                        0, 0, 0, timeout);
00456   if (!res.Passed()) {
00457     if (!res.GetDesc().empty())
00458       logger.msg(Arc::ERROR, "Transfer FAILED: %s - %s", std::string(res), res.GetDesc());
00459     else
00460       logger.msg(Arc::ERROR, "Transfer FAILED: %s", std::string(res));
00461     if (res.Retryable())
00462       logger.msg(Arc::ERROR, "This seems like a temporary error, please try again later");
00463     return false;
00464     destination->SetTries(1);
00465     // It is not clear how to clear half-registered file. So remove it only
00466     // in case of explicit destination.
00467     if (!(destination->IsIndex()))
00468       mover.Delete(*destination);
00469   }
00470   logger.msg(Arc::INFO, "Transfer complete");
00471   return true;
00472 }
00473 
00474 int main(int argc, char **argv) {
00475 
00476   setlocale(LC_ALL, "");
00477 
00478   Arc::LogStream logcerr(std::cerr);
00479   logcerr.setFormat(Arc::ShortFormat);
00480   Arc::Logger::getRootLogger().addDestination(logcerr);
00481   Arc::Logger::getRootLogger().setThreshold(Arc::WARNING);
00482 
00483   Arc::ArcLocation::Init(argv[0]);
00484 
00485   Arc::OptionParser options(istring("source destination"),
00486                             istring("The arccp command copies files to, from "
00487                                     "and between grid storage elements."));
00488 
00489   bool passive = false;
00490   options.AddOption('p', "passive",
00491                     istring("use passive transfer (does not work if secure "
00492                             "is on, default if secure is not requested)"),
00493                     passive);
00494 
00495   bool notpassive = false;
00496   options.AddOption('n', "nopassive",
00497                     istring("do not try to force passive transfer"),
00498                     notpassive);
00499 
00500   bool force = false;
00501   options.AddOption('f', "force",
00502                     istring("if the destination is an indexing service "
00503                             "and not the same as the source and the "
00504                             "destination is already registered, then "
00505                             "the copy is normally not done. However, if "
00506                             "this option is specified the source is "
00507                             "assumed to be a replica of the destination "
00508                             "created in an uncontrolled way and the "
00509                             "copy is done like in case of replication. "
00510                             "Using this option also skips validation of "
00511                             "completed transfers."),
00512                     force);
00513 
00514   bool verbose = false;
00515   options.AddOption('i', "indicate", istring("show progress indicator"),
00516                     verbose);
00517 
00518   bool nocopy = false;
00519   options.AddOption('T', "notransfer",
00520                     istring("do not transfer file, just register it - "
00521                             "destination must be non-existing meta-url"),
00522                     nocopy);
00523 
00524   bool secure = false;
00525   options.AddOption('u', "secure",
00526                     istring("use secure transfer (insecure by default)"),
00527                     secure);
00528 
00529   std::string cache_path;
00530   options.AddOption('y', "cache",
00531                     istring("path to local cache (use to put file into cache). "
00532                             "The X509_USER_PROXY and X509_CERT_DIR environment "
00533                             "variables must be set correctly."),
00534                     istring("path"), cache_path);
00535 
00536   int recursion = 0;
00537   options.AddOption('r', "recursive",
00538                     istring("operate recursively up to specified level"),
00539                     istring("level"), recursion);
00540 
00541   int retries = 0;
00542   options.AddOption('R', "retries",
00543                     istring("number of retries before failing file transfer"),
00544                     istring("number"), retries);
00545 
00546   int timeout = 20;
00547   options.AddOption('t', "timeout", istring("timeout in seconds (default 20)"),
00548                     istring("seconds"), timeout);
00549 
00550   std::string conffile;
00551   options.AddOption('z', "conffile",
00552                     istring("configuration file (default ~/.arc/client.conf)"),
00553                     istring("filename"), conffile);
00554 
00555   std::string debug;
00556   options.AddOption('d', "debug",
00557                     istring("FATAL, ERROR, WARNING, INFO, VERBOSE or DEBUG"),
00558                     istring("debuglevel"), debug);
00559 
00560   bool version = false;
00561   options.AddOption('v', "version", istring("print version information"),
00562                     version);
00563 
00564   std::list<std::string> params = options.Parse(argc, argv);
00565 
00566   // If debug is specified as argument, it should be set before loading the configuration.
00567   if (!debug.empty())
00568     Arc::Logger::getRootLogger().setThreshold(Arc::string_to_level(debug));
00569 
00570   Arc::UserConfig usercfg(conffile);
00571   if (!usercfg) {
00572     logger.msg(Arc::ERROR, "Failed configuration initialization");
00573     return 1;
00574   }
00575   usercfg.UtilsDirPath(Arc::UserConfig::ARCUSERDIRECTORY);
00576 
00577   if (debug.empty() && !usercfg.Verbosity().empty())
00578     Arc::Logger::getRootLogger().setThreshold(Arc::string_to_level(usercfg.Verbosity()));
00579 
00580   if (version) {
00581     std::cout << Arc::IString("%s version %s", "arccp", VERSION) << std::endl;
00582     return 0;
00583   }
00584 
00585   if (params.size() != 2) {
00586     logger.msg(Arc::ERROR, "Wrong number of parameters specified");
00587     return 1;
00588   }
00589 
00590   if (passive && notpassive) {
00591     logger.msg(Arc::ERROR, "Options 'p' and 'n' can't be used simultaneously");
00592     return 1;
00593   }
00594 
00595   if ((!secure) && (!notpassive))
00596     passive = true;
00597 
00598   std::list<std::string>::iterator it = params.begin();
00599   std::string source = *it;
00600   ++it;
00601   std::string destination = *it;
00602 
00603   if (nocopy) {
00604     if(!arcregister(source, destination, usercfg, secure, passive, force, timeout))
00605       return 1;
00606   } else {
00607     if(!arccp(source, destination, cache_path, usercfg, secure, passive, force,
00608           recursion, retries + 1, verbose, timeout))
00609       return 1;
00610   }
00611 
00612   return 0;
00613 }