Back to index

nordugrid-arc-nox  1.1.0~rc6
stage.cpp
Go to the documentation of this file.
00001 #ifdef HAVE_CONFIG_H
00002 #include <config.h>
00003 #endif
00004 
00005 #include <arc/XMLNode.h>
00006 #include <arc/GUID.h>
00007 #include <arc/User.h>
00008 #include <arc/UserConfig.h>
00009 #include <arc/URL.h>
00010 #include <arc/Logger.h>
00011 
00012 #include <arc/data/FileCache.h>
00013 #include <arc/data/DataHandle.h>
00014 #include <arc/data/DataMover.h>
00015 #include <arc/data/URLMap.h>
00016 
00017 #include "paul.h"
00018 
00019 namespace Paul
00020 {
00021 
00022 class PointPair 
00023 {
00024     private:
00025         Arc::URL source_url;
00026         Arc::URL destination_url;
00027     public:
00028         Arc::DataHandle source;
00029         Arc::DataHandle destination;
00030         PointPair(const std::string& source_str,
00031                   const std::string& destination_str,
00032                   const Arc::UserConfig& usercfg)
00033           : source_url(source_str),
00034             destination_url(destination_str),
00035             source(source_url, usercfg),
00036             destination(destination_url, usercfg) {};
00037         ~PointPair(void) {};
00038 };
00039 
00040 class FileTransfer
00041 {
00042     private:
00043         Arc::DataMover *mover;
00044         Arc::FileCache *cache;
00045         Arc::URLMap url_map;
00046         Arc::Logger logger_;
00047         unsigned long long int min_speed;
00048         time_t min_speed_time;
00049         unsigned long long int min_average_speed;
00050         time_t max_inactivity_time;
00051         std::string cache_path;
00052 
00053     public:
00054         ~FileTransfer()
00055         {
00056             delete mover;
00057             delete cache;
00058         };
00059 
00060         FileTransfer(const std::string &_cache_path):logger_(Arc::Logger::rootLogger, "Paul-FileTransfer") {
00061             cache_path = _cache_path;
00062             logger_.msg(Arc::VERBOSE, "Filetransfer created");
00063     
00064         };
00065 
00066         void create_cache(Job &j)
00067         {
00068             // Create cache
00069             Arc::User cache_user;
00070             std::string job_id = j.getID();
00071             std::string cache_dir = cache_path;
00072 #ifndef WIN32
00073             cache = new Arc::FileCache (cache_dir, job_id, 
00074 #else
00075             std::string cache_data_dir = cache_dir; 
00076             std::string cache_link_dir;
00077             cache = new Arc::FileCache (cache_dir, cache_data_dir,
00078                                         cache_link_dir, job_id,
00079 #endif
00080                                         cache_user.get_uid(),
00081                                         cache_user.get_gid());
00082         }
00083 
00084         void download(const std::string &job_root, Job &j)
00085         {
00086             // Create mover
00087             mover = new Arc::DataMover();
00088             mover->retry(true);
00089             mover->secure(false); // XXX what if I download form https url? 
00090             mover->passive(false);
00091             mover->verbose(true);
00092             min_speed = 0;
00093             min_speed_time = 300;
00094             min_average_speed = 0;
00095             max_inactivity_time = 300;
00096             if (min_speed != 0) {
00097                 mover->set_default_min_speed(min_speed,min_speed_time);
00098             }
00099             if (min_average_speed != 0) {
00100                 mover->set_default_min_average_speed(min_average_speed);
00101             }
00102             if(max_inactivity_time != 0) {
00103                 mover->set_default_max_inactivity_time(max_inactivity_time);
00104             }
00105             create_cache(j);
00106             logger_.msg(Arc::VERBOSE, "download");
00107             Arc::XMLNode jd = j.getJSDL()["JobDescription"];
00108             std::string xml_str;
00109             j.getJSDL().GetXML(xml_str);
00110             logger_.msg(Arc::VERBOSE, xml_str);
00111             Arc::XMLNode ds;
00112             for (int i = 0; (ds = jd["DataStaging"][i]) != false; i++) {
00113                 std::string dest = Glib::build_filename(Glib::build_filename(job_root, j.getID()), (std::string)ds["FileName"]);
00114                 Arc::XMLNode s = ds["Source"];
00115                 if (s == false) {
00116                     // it should not download
00117                     continue;
00118                 }
00119                 std::string src = (std::string)s["URI"];
00120                 logger_.msg(Arc::VERBOSE, "%s -> %s", src, dest);
00121     
00122                 std::string failure;
00123                 Arc::UserConfig usercfg(true);
00124                 PointPair *pair = new PointPair(src, dest, usercfg);
00125                 if (pair->source == NULL) {
00126                     logger_.msg(Arc::ERROR, "Cannot accept source as URL");
00127                     delete pair;
00128                     continue;
00129                 }
00130                 if (pair->destination == NULL) {
00131                     logger_.msg(Arc::ERROR, "Cannot accept destination as URL");
00132                     delete pair;
00133                     continue;
00134                 }
00135                 Arc::DataStatus res = mover->Transfer(*(pair->source), *(pair->destination), 
00136                                                       *cache, url_map, 
00137                                                       min_speed, min_speed_time, 
00138                                                       min_average_speed, max_inactivity_time); 
00139                 if (!res.Passed()) {
00140                     logger_.msg(Arc::ERROR, std::string(res));
00141                     delete pair;
00142                     continue;
00143                 }
00144                 delete pair;
00145             }
00146         };
00147 
00148         void upload(const std::string &job_root, Job &j)
00149         {
00150             // Create mover
00151             mover = new Arc::DataMover();
00152             mover->retry(true);
00153             mover->secure(false); // XXX what if I download form https url? 
00154             mover->passive(false);
00155             mover->verbose(true);
00156             min_speed = 0;
00157             min_speed_time = 300;
00158             min_average_speed = 0;
00159             max_inactivity_time = 300;
00160             if (min_speed != 0) {
00161                 mover->set_default_min_speed(min_speed,min_speed_time);
00162             }
00163             if (min_average_speed != 0) {
00164                 mover->set_default_min_average_speed(min_average_speed);
00165             }
00166             if(max_inactivity_time != 0) {
00167                 mover->set_default_max_inactivity_time(max_inactivity_time);
00168             }
00169             create_cache(j);
00170             Arc::XMLNode jd = j.getJSDL()["JobDescription"];
00171             Arc::XMLNode ds;
00172             for (int i = 0; (ds = jd["DataStaging"][i]) != false; i++) {
00173                 std::string src = Glib::build_filename(Glib::build_filename(job_root, j.getID()), (std::string)ds["FileName"]);
00174                 Arc::XMLNode d = ds["Target"];
00175                 if (d == false) {
00176                     // it should not upload
00177                     continue;
00178                 }
00179                 std::string dest = (std::string)d["URI"];
00180                 logger_.msg(Arc::VERBOSE, "%s -> %s", src, dest);
00181     
00182                 std::string failure;
00183                 Arc::UserConfig usercfg(true);
00184                 PointPair *pair = new PointPair(src, dest, usercfg);
00185                 if (pair->source == NULL) {
00186                     logger_.msg(Arc::ERROR, "Cannot accept source as URL");
00187                     delete pair;
00188                     continue;
00189                 }
00190                 if (pair->destination == NULL) {
00191                     logger_.msg(Arc::ERROR, "Cannot accept destination as URL");
00192                     delete pair;
00193                     continue;
00194                 }
00195                 Arc::DataStatus res = mover->Transfer(*(pair->source), *(pair->destination), 
00196                                                       *cache, url_map, 
00197                                                       min_speed, min_speed_time, 
00198                                                       min_average_speed, max_inactivity_time);
00199                 if (!res.Passed()) {
00200                     logger_.msg(Arc::ERROR, std::string(res));
00201                     delete pair;
00202                     continue;
00203                 }
00204                 logger_.msg(Arc::VERBOSE, "Transfer completed");
00205                 delete pair;
00206             }
00207         };
00208         
00209 };
00210 
00211 bool PaulService::stage_in(Job &j)
00212 {
00213     logger_.msg(Arc::VERBOSE, "Stage in");
00214     
00215     FileTransfer ft(configurator.getCachePath());
00216     ft.download(configurator.getJobRoot(), j);
00217     return true;
00218 }
00219 
00220 bool PaulService::stage_out(Job &j)
00221 {
00222     logger_.msg(Arc::VERBOSE, "Stage out");
00223     FileTransfer ft(configurator.getCachePath());
00224     ft.upload(configurator.getJobRoot(), j);
00225     return true;
00226 }
00227 
00228 }