Back to index

nordugrid-arc-nox  1.1.0~rc6
DataPointGridFTP.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <openssl/ssl.h>
00008 
00009 #include <arc/Logger.h>
00010 #include <arc/StringConv.h>
00011 #include <arc/UserConfig.h>
00012 #include <arc/data/DataBuffer.h>
00013 #include <arc/globusutils/GlobusErrorUtils.h>
00014 #include <arc/globusutils/GlobusWorkarounds.h>
00015 #include <arc/globusutils/GSSCredential.h>
00016 #include <arc/crypto/OpenSSL.h>
00017 
00018 #include "DataPointGridFTP.h"
00019 #include "Lister.h"
00020 
00021 namespace Arc {
00022 
00023   static bool proxy_initialized = false;
00024 
00025   Logger DataPointGridFTP::logger(DataPoint::logger, "GridFTP");
00026 
00027   void DataPointGridFTP::ftp_complete_callback(void *arg,
00028                                                globus_ftp_client_handle_t*,
00029                                                globus_object_t *error) {
00030     DataPointGridFTP *it = (DataPointGridFTP*)arg;
00031     if (error == GLOBUS_SUCCESS) {
00032       logger.msg(DEBUG, "ftp_complete_callback: success");
00033       it->condstatus = DataStatus::Success;
00034       it->cond.signal();
00035     }
00036     else {
00037       logger.msg(VERBOSE, "ftp_complete_callback: error: %s",
00038                  globus_object_to_string(error));
00039       it->condstatus = DataStatus::TransferError;
00040       it->cond.signal();
00041     }
00042   }
00043 
00044   void DataPointGridFTP::ftp_check_callback(void *arg,
00045                                             globus_ftp_client_handle_t*,
00046                                             globus_object_t *error,
00047                                             globus_byte_t*,
00048                                             globus_size_t,
00049                                             globus_off_t,
00050                                             globus_bool_t eof) {
00051     logger.msg(VERBOSE, "ftp_check_callback");
00052     DataPointGridFTP *it = (DataPointGridFTP*)arg;
00053     if (error != GLOBUS_SUCCESS) {
00054       logger.msg(VERBOSE, "Globus error: %s", globus_object_to_string(error));
00055       return;
00056     }
00057     if (eof)
00058       return;
00059     GlobusResult res =
00060       globus_ftp_client_register_read(&(it->ftp_handle),
00061                                       (globus_byte_t*)(it->ftp_buf),
00062                                       sizeof(it->ftp_buf),
00063                                       &ftp_check_callback, it);
00064     if (!res) {
00065       logger.msg(INFO,
00066                  "Registration of Globus FTP buffer failed - cancel check");
00067       logger.msg(VERBOSE, "Globus error: %s", res.str());
00068       globus_ftp_client_abort(&(it->ftp_handle));
00069       return;
00070     }
00071     return;
00072   }
00073 
00074   DataStatus DataPointGridFTP::Check() {
00075     if (!ftp_active)
00076       return DataStatus::NotInitializedError;
00077     if (reading)
00078       return DataStatus::IsReadingError;
00079     if (writing)
00080       return DataStatus::IsWritingError;
00081     GlobusResult res;
00082     globus_off_t size = 0;
00083     globus_abstime_t gl_modify_time;
00084     time_t modify_time;
00085     int modify_utime;
00086     set_attributes();
00087     res = globus_ftp_client_size(&ftp_handle, url.str().c_str(), &ftp_opattr,
00088                                  &size, &ftp_complete_callback, this);
00089     if (!res) {
00090       logger.msg(VERBOSE, "check_ftp: globus_ftp_client_size failed");
00091       logger.msg(INFO, "Globus error: %s", res.str());
00092     }
00093     else if (!cond.wait(1000*usercfg.Timeout())) {
00094       logger.msg(INFO, "check_ftp: timeout waiting for size");
00095       globus_ftp_client_abort(&ftp_handle);
00096       cond.wait();
00097     }
00098     else if (!condstatus)
00099       logger.msg(INFO, "check_ftp: failed to get file's size");
00100     else {
00101       SetSize(size);
00102       logger.msg(VERBOSE, "check_ftp: obtained size: %lli", GetSize());
00103     }
00104     res = globus_ftp_client_modification_time(&ftp_handle, url.str().c_str(),
00105                                               &ftp_opattr, &gl_modify_time,
00106                                               &ftp_complete_callback, this);
00107     if (!res) {
00108       logger.msg(VERBOSE,
00109                  "check_ftp: globus_ftp_client_modification_time failed");
00110       logger.msg(INFO, "Globus error: %s", res.str());
00111     }
00112     else if (!cond.wait(1000*usercfg.Timeout())) {
00113       logger.msg(INFO, "check_ftp: timeout waiting for modification_time");
00114       globus_ftp_client_abort(&ftp_handle);
00115       cond.wait();
00116     }
00117     else if (!condstatus)
00118       logger.msg(INFO, "check_ftp: failed to get file's modification time");
00119     else {
00120       GlobusTimeAbstimeGet(gl_modify_time, modify_time, modify_utime);
00121       SetCreated(modify_time);
00122       logger.msg(VERBOSE, "check_ftp: obtained creation date: %s", GetCreated().str());
00123     }
00124     // Do not use partial_get for ordinary ftp. Stupid globus tries to
00125     // use non-standard commands anyway.
00126     if (is_secure) {
00127       res = globus_ftp_client_partial_get(&ftp_handle, url.str().c_str(),
00128                                           &ftp_opattr, GLOBUS_NULL, 0, 1,
00129                                           &ftp_complete_callback, this);
00130       if (!res) {
00131         logger.msg(VERBOSE, "check_ftp: globus_ftp_client_get failed");
00132         logger.msg(INFO, "Globus error: %s", res.str());
00133         return DataStatus::CheckError;
00134       }
00135       // use eof_flag to pass result from callback
00136       ftp_eof_flag = false;
00137       logger.msg(VERBOSE, "check_ftp: globus_ftp_client_register_read");
00138       res = globus_ftp_client_register_read(&ftp_handle,
00139                                             (globus_byte_t*)ftp_buf,
00140                                             sizeof(ftp_buf),
00141                                             &ftp_check_callback, this);
00142       if (!res) {
00143         globus_ftp_client_abort(&ftp_handle);
00144         cond.wait();
00145         return DataStatus::CheckError;
00146       }
00147       if (!cond.wait(1000*usercfg.Timeout())) {
00148         logger.msg(INFO, "check_ftp: timeout waiting for partial get");
00149         globus_ftp_client_abort(&ftp_handle);
00150         cond.wait();
00151         return DataStatus::CheckError;
00152       }
00153       return condstatus;
00154     }
00155     else {
00156       // Do not use it at all. It does not give too much useful
00157       // information anyway. But request at least existence of file.
00158       if (!CheckSize())
00159         return DataStatus::CheckError;
00160       return DataStatus::Success;
00161     }
00162   }
00163 
00164   DataStatus DataPointGridFTP::Remove() {
00165     if (!ftp_active)
00166       return DataStatus::NotInitializedError;
00167     if (reading)
00168       return DataStatus::IsReadingError;
00169     if (writing)
00170       return DataStatus::IsWritingError;
00171     GlobusResult res;
00172     set_attributes();
00173     res = globus_ftp_client_delete(&ftp_handle, url.str().c_str(),
00174                                    &ftp_opattr, &ftp_complete_callback, this);
00175     if (!res) {
00176       logger.msg(VERBOSE, "delete_ftp: globus_ftp_client_delete failed");
00177       logger.msg(INFO, "Globus error: %s", res.str());
00178       return DataStatus::DeleteError;
00179     }
00180     if (!cond.wait(1000*usercfg.Timeout())) {
00181       logger.msg(INFO, "delete_ftp: globus_ftp_client_delete timeout");
00182       globus_ftp_client_abort(&ftp_handle);
00183       cond.wait();
00184       return DataStatus::DeleteError;
00185     }
00186     return condstatus;
00187   }
00188 
// Strips the last path component from an ftp:// or gsiftp:// URL.
//   dir - complete URL (scheme and server included); on success it is
//         truncated just before its last '/'.
// Returns false, leaving dir untouched, when the scheme is neither ftp nor
// gsiftp (compared case-insensitively) or when no '/' follows the scheme
// prefix, i.e. nothing but scheme and server is left.
static bool remove_last_dir(std::string& dir) {
  const std::string::size_type none = std::string::npos;
  // Position of the '/' that terminates the server part of the URL.
  std::string::size_type path_start = none;
  if (strncasecmp(dir.c_str(), "ftp://", 6) == 0)
    path_start = dir.find('/', 6);
  else if (strncasecmp(dir.c_str(), "gsiftp://", 9) == 0)
    path_start = dir.find('/', 9);
  if (path_start == none)
    return false;
  const std::string::size_type last_slash = dir.rfind('/');
  if ((last_slash == none) || (last_slash < path_start))
    return false;
  dir.resize(last_slash);
  return true;
}
00206 
// Extends dir by the next path component of path.
//   dir  - current prefix of path; on success it is replaced by the prefix
//          of path that ends just before the next '/' located after
//          position dir.length().
//   path - the complete URL being descended into.
// Returns false, leaving dir untouched, when no further '/' exists, i.e.
// only the final component (the file name) remains.
static bool add_last_dir(std::string& dir, const std::string& path) {
  // Stay in the string's own unsigned size type: the previous 'int'
  // narrowed the length and mixed signed/unsigned in the search offset.
  const std::string::size_type prefix_len = dir.length();
  const std::string::size_type next_slash = path.find('/', prefix_len + 1);
  if (next_slash == std::string::npos)
    return false;
  dir.assign(path, 0, next_slash);
  return true;
}
00216 
00217   bool DataPointGridFTP::mkdir_ftp() {
00218     ftp_dir_path = url.str();
00219     for (;;)
00220       if (!remove_last_dir(ftp_dir_path))
00221         break;
00222     bool result = false;
00223     for (;;) {
00224       if (!add_last_dir(ftp_dir_path, url.str()))
00225         break;
00226       logger.msg(VERBOSE, "mkdir_ftp: making %s", ftp_dir_path);
00227       GlobusResult res =
00228         globus_ftp_client_mkdir(&ftp_handle, ftp_dir_path.c_str(), &ftp_opattr,
00229                                 &ftp_complete_callback, this);
00230       if (!res) {
00231         logger.msg(INFO, "Globus error: %s", res.str());
00232         return false;
00233       }
00234       if (!cond.wait(1000*usercfg.Timeout())) {
00235         logger.msg(INFO, "mkdir_ftp: timeout waiting for mkdir");
00236         /* timeout - have to cancel operation here */
00237         globus_ftp_client_abort(&ftp_handle);
00238         cond.wait();
00239         return false;
00240       }
00241       if (!condstatus)
00242         result = condstatus;
00243     }
00244     return result;
00245   }
00246 
00247   DataStatus DataPointGridFTP::StartReading(DataBuffer& buf) {
00248     if (!ftp_active)
00249       return DataStatus::NotInitializedError;
00250     if (reading)
00251       return DataStatus::IsReadingError;
00252     if (writing)
00253       return DataStatus::IsWritingError;
00254     set_attributes();
00255     reading = true;
00256     buffer = &buf;
00257     bool limit_length = false;
00258     unsigned long long int range_length = 0;
00259     if (range_end > range_start) {
00260       range_length = range_end - range_start;
00261       limit_length = true;
00262     }
00263     logger.msg(VERBOSE, "start_reading_ftp");
00264     ftp_eof_flag = false;
00265     globus_ftp_client_handle_cache_url_state(&ftp_handle, url.str().c_str());
00266     GlobusResult res;
00267     logger.msg(VERBOSE, "start_reading_ftp: globus_ftp_client_get");
00268     if (limit_length)
00269       res = globus_ftp_client_partial_get(&ftp_handle, url.str().c_str(),
00270                                           &ftp_opattr, GLOBUS_NULL,
00271                                           range_start,
00272                                           range_start + range_length + 1,
00273                                           &ftp_get_complete_callback, this);
00274     else
00275       res = globus_ftp_client_get(&ftp_handle, url.str().c_str(),
00276                                   &ftp_opattr, GLOBUS_NULL,
00277                                   &ftp_get_complete_callback, this);
00278     if (!res) {
00279       logger.msg(VERBOSE, "start_reading_ftp: globus_ftp_client_get failed");
00280       logger.msg(INFO, "Globus error: %s", res.str());
00281       globus_ftp_client_handle_flush_url_state(&ftp_handle, url.str().c_str());
00282       buffer->error_read(true);
00283       reading = false;
00284       return DataStatus::ReadStartError;
00285     }
00286     if (globus_thread_create(&ftp_control_thread, GLOBUS_NULL,
00287                              &ftp_read_thread, this) != 0) {
00288       logger.msg(VERBOSE, "start_reading_ftp: globus_thread_create failed");
00289       globus_ftp_client_abort(&ftp_handle);
00290       cond.wait();
00291       globus_ftp_client_handle_flush_url_state(&ftp_handle, url.str().c_str());
00292       buffer->error_read(true);
00293       reading = false;
00294       return DataStatus::ReadStartError;
00295     }
00296     // make sure globus has thread for handling network/callbacks
00297     globus_thread_blocking_will_block();
00298     return DataStatus::Success;
00299   }
00300 
00301   DataStatus DataPointGridFTP::StopReading() {
00302     if (!reading)
00303       return DataStatus::ReadStopError;
00304     reading = false;
00305     if (!buffer->eof_read()) {
00306       logger.msg(VERBOSE, "stop_reading_ftp: aborting connection");
00307       globus_ftp_client_abort(&ftp_handle);
00308     }
00309     logger.msg(VERBOSE, "stop_reading_ftp: waiting for transfer to finish");
00310     cond.wait();
00311     logger.msg(VERBOSE, "stop_reading_ftp: exiting: %s", url.str());
00312     globus_ftp_client_handle_flush_url_state(&ftp_handle, url.str().c_str());
00313     return condstatus;
00314   }
00315 
00316   void* DataPointGridFTP::ftp_read_thread(void *arg) {
00317     DataPointGridFTP *it = (DataPointGridFTP*)arg;
00318     int h;
00319     unsigned int l;
00320     GlobusResult res;
00321     int registration_failed = 0;
00322     logger.msg(INFO, "ftp_read_thread: get and register buffers");
00323     int n_buffers = 0;
00324     for (;;) {
00325       if (it->buffer->eof_read())
00326         break;
00327       if (!it->buffer->for_read(h, l, true)) { /* eof or error */
00328         if (it->buffer->error()) { /* error -> abort reading */
00329           logger.msg(VERBOSE, "ftp_read_thread: for_read failed - aborting: %s",
00330                      it->url.str());
00331           globus_ftp_client_abort(&(it->ftp_handle));
00332         }
00333         break;
00334       }
00335       res =
00336         globus_ftp_client_register_read(&(it->ftp_handle),
00337                                         (globus_byte_t*)((*(it->buffer))[h]),
00338                                         l, &(it->ftp_read_callback), it);
00339       if (!res) {
00340         logger.msg(DEBUG, "ftp_read_thread: Globus error: %s", res.str());
00341         registration_failed++;
00342         if (registration_failed >= 10) {
00343           it->buffer->is_read(h, 0, 0);
00344           it->buffer->error_read(true);
00345           // can set eof here because no callback will be called (I guess).
00346           it->buffer->eof_read(true);
00347           logger.msg(DEBUG, "ftp_read_thread: "
00348                      "too many registration failures - abort: %s",
00349                      it->url.str());
00350         }
00351         else {
00352           logger.msg(DEBUG, "ftp_read_thread: "
00353                      "failed to register globus buffer - will try later: %s",
00354                      it->url.str());
00355           it->buffer->is_read(h, 0, 0);
00356           sleep(1);
00357         }
00358       }
00359       else
00360         n_buffers++;
00361     }
00362     /* make sure complete callback is called */
00363     logger.msg(VERBOSE, "ftp_read_thread: waiting for eof");
00364     it->buffer->wait_eof_read();
00365     logger.msg(VERBOSE, "ftp_read_thread: exiting");
00366     it->condstatus = it->buffer->error_read() ? DataStatus::ReadError :
00367                      DataStatus::Success;
00368     it->cond.signal();
00369     return NULL;
00370   }
00371 
00372   void DataPointGridFTP::ftp_read_callback(void *arg,
00373                                            globus_ftp_client_handle_t*,
00374                                            globus_object_t *error,
00375                                            globus_byte_t *buffer,
00376                                            globus_size_t length,
00377                                            globus_off_t offset,
00378                                            globus_bool_t eof) {
00379     DataPointGridFTP *it = (DataPointGridFTP*)arg;
00380     if (error != GLOBUS_SUCCESS) {
00381       logger.msg(VERBOSE, "ftp_read_callback: failure");
00382       it->buffer->is_read((char*)buffer, 0, 0);
00383       return;
00384     }
00385     logger.msg(DEBUG, "ftp_read_callback: success");
00386     it->buffer->is_read((char*)buffer, length, offset);
00387     if (eof)
00388       it->ftp_eof_flag = true;
00389     return;
00390   }
00391 
00392   void DataPointGridFTP::ftp_get_complete_callback(void *arg,
00393                                                    globus_ftp_client_handle_t*,
00394                                                    globus_object_t *error) {
00395     DataPointGridFTP *it = (DataPointGridFTP*)arg;
00396     /* data transfer finished */
00397     if (error != GLOBUS_SUCCESS) {
00398       logger.msg(INFO, "Failed to get ftp file");
00399       logger.msg(VERBOSE, "Globus error: %s", globus_object_to_string(error));
00400       it->failure_code = DataStatus(DataStatus::ReadStartError, globus_object_to_string(error));
00401       it->buffer->error_read(true);
00402       return;
00403     }
00404     it->buffer->eof_read(true);
00405     return;
00406   }
00407 
00408   DataStatus DataPointGridFTP::StartWriting(DataBuffer& buf,
00409                                             DataCallback*) {
00410     if (!ftp_active)
00411       return DataStatus::NotInitializedError;
00412     if (reading)
00413       return DataStatus::IsReadingError;
00414     if (writing)
00415       return DataStatus::IsWritingError;
00416     set_attributes();
00417     writing = true;
00418     buffer = &buf;
00419     /* size of file first */
00420     bool limit_length = false;
00421     unsigned long long int range_length = 0;
00422     if (range_end > range_start) {
00423       range_length = range_end - range_start;
00424       limit_length = true;
00425     }
00426     ftp_eof_flag = false;
00427     GlobusResult res;
00428     globus_ftp_client_handle_cache_url_state(&ftp_handle, url.str().c_str());
00429     if (autodir) {
00430       logger.msg(VERBOSE, "start_writing_ftp: mkdir");
00431       if (!mkdir_ftp())
00432         logger.msg(VERBOSE,
00433                    "start_writing_ftp: mkdir failed - still trying to write");
00434     }
00435     logger.msg(VERBOSE, "start_writing_ftp: put");
00436     if (limit_length)
00437       res = globus_ftp_client_partial_put(&ftp_handle, url.str().c_str(),
00438                                           &ftp_opattr, GLOBUS_NULL,
00439                                           range_start,
00440                                           range_start + range_length,
00441                                           &ftp_put_complete_callback, this);
00442     else
00443       res = globus_ftp_client_put(&ftp_handle, url.str().c_str(),
00444                                   &ftp_opattr, GLOBUS_NULL,
00445                                   &ftp_put_complete_callback, this);
00446     if (!res) {
00447       logger.msg(VERBOSE, "start_writing_ftp: put failed");
00448       logger.msg(INFO, "Globus error: %s", res.str());
00449       globus_ftp_client_handle_flush_url_state(&ftp_handle, url.str().c_str());
00450       buffer->error_write(true);
00451       writing = false;
00452       return DataStatus::WriteStartError;
00453     }
00454     if (globus_thread_create(&ftp_control_thread, GLOBUS_NULL,
00455                              &ftp_write_thread, this) != 0) {
00456       logger.msg(VERBOSE, "start_writing_ftp: globus_thread_create failed");
00457       globus_ftp_client_handle_flush_url_state(&ftp_handle, url.str().c_str());
00458       buffer->error_write(true);
00459       writing = false;
00460       return DataStatus::WriteStartError;
00461     }
00462     // make sure globus has thread for handling network/callbacks
00463     globus_thread_blocking_will_block();
00464     return DataStatus::Success;
00465   }
00466 
00467   DataStatus DataPointGridFTP::StopWriting() {
00468     if (!writing)
00469       return DataStatus::WriteStopError;
00470     writing = false;
00471     if (!buffer->eof_write()) {
00472       logger.msg(VERBOSE, "StopWriting: aborting connection");
00473       globus_ftp_client_abort(&ftp_handle);
00474     }
00475     cond.wait();
00476     globus_ftp_client_handle_flush_url_state(&ftp_handle, url.str().c_str());
00477     return condstatus;
00478   }
00479 
00480   void* DataPointGridFTP::ftp_write_thread(void *arg) {
00481     DataPointGridFTP *it = (DataPointGridFTP*)arg;
00482     int h;
00483     unsigned int l;
00484     unsigned long long int o;
00485     GlobusResult res;
00486     globus_bool_t eof = GLOBUS_FALSE;
00487     logger.msg(INFO, "ftp_write_thread: get and register buffers");
00488     for (;;) {
00489       if (!it->buffer->for_write(h, l, o, true)) {
00490         if (it->buffer->error()) {
00491           logger.msg(VERBOSE, "ftp_write_thread: for_write failed - aborting");
00492           globus_ftp_client_abort(&(it->ftp_handle));
00493           break;
00494         }
00495         // no buffers and no errors - must be pure eof
00496         eof = GLOBUS_TRUE;
00497         char dummy;
00498         o = it->buffer->eof_position();
00499         res = globus_ftp_client_register_write(&(it->ftp_handle),
00500                                                (globus_byte_t*)(&dummy), 0, o,
00501                                                eof, &ftp_write_callback, it);
00502         break;
00503         // if(res == GLOBUS_SUCCESS) break;
00504         // sleep(1); continue;
00505       }
00506       res =
00507         globus_ftp_client_register_write(&(it->ftp_handle),
00508                                          (globus_byte_t*)((*(it->buffer))[h]),
00509                                          l, o, eof, &ftp_write_callback, it);
00510       if (!res) {
00511         it->buffer->is_notwritten(h);
00512         sleep(1);
00513       }
00514     }
00515     /* make sure complete callback is called */
00516     it->buffer->wait_eof_write();
00517     it->condstatus = it->buffer->error_write() ? DataStatus::WriteError :
00518                      DataStatus::Success;
00519     it->cond.signal();
00520     return NULL;
00521   }
00522 
00523   void DataPointGridFTP::ftp_write_callback(void *arg,
00524                                             globus_ftp_client_handle_t*,
00525                                             globus_object_t *error,
00526                                             globus_byte_t *buffer,
00527                                             globus_size_t,
00528                                             globus_off_t,
00529                                             globus_bool_t) {
00530     DataPointGridFTP *it = (DataPointGridFTP*)arg;
00531     if (error != GLOBUS_SUCCESS) {
00532       logger.msg(VERBOSE, "ftp_write_callback: failure");
00533       it->buffer->is_written((char*)buffer);
00534       return;
00535     }
00536     logger.msg(DEBUG, "ftp_write_callback: success");
00537     it->buffer->is_written((char*)buffer);
00538     return;
00539   }
00540 
00541   void DataPointGridFTP::ftp_put_complete_callback(void *arg,
00542                                                    globus_ftp_client_handle_t*,
00543                                                    globus_object_t *error) {
00544     DataPointGridFTP *it = (DataPointGridFTP*)arg;
00545     /* data transfer finished */
00546     if (error != GLOBUS_SUCCESS) {
00547       logger.msg(INFO, "Failed to store ftp file");
00548       it->failure_code = DataStatus(DataStatus::WriteStartError, globus_object_to_string(error));
00549       logger.msg(VERBOSE, "Globus error: %s", globus_object_to_string(error));
00550       it->buffer->error_write(true);
00551       return;
00552     }
00553     it->buffer->eof_write(true);
00554     return;
00555   }
00556 
00557   DataStatus DataPointGridFTP::ListFiles(std::list<FileInfo>& files,
00558                                          bool long_list,
00559                                          bool resolve,
00560                                          bool metadata) {
00561     if (!ftp_active)
00562       return DataStatus::NotInitializedError;
00563     if (reading)
00564       return DataStatus::IsReadingError;
00565     if (writing)
00566       return DataStatus::IsWritingError;
00567     set_attributes();
00568     Lister lister(*credential);
00569     if (lister.retrieve_dir_info(url,!(long_list | resolve | metadata)) != 0) {
00570       if (lister.retrieve_file_info(url,!(long_list | resolve | metadata)) != 0) {
00571         logger.msg(ERROR, "Failed to obtain listing from ftp: %s", url.str());
00572         return DataStatus::ListError;
00573       }
00574     }
00575     lister.close_connection();
00576     DataStatus result = DataStatus::Success;
00577     for (std::list<FileInfo>::iterator i = lister.begin();
00578          i != lister.end(); ++i) {
00579       std::list<FileInfo>::iterator f =
00580         files.insert(files.end(), FileInfo(i->GetLastName()));
00581       if (long_list) {
00582         GlobusResult res;
00583         globus_off_t size = 0;
00584         globus_abstime_t gl_modify_time;
00585         time_t modify_time;
00586         int modify_utime;
00587         // Lister should always return full path to file
00588         std::string f_url = url.ConnectionURL() + i->GetName();
00589         f->SetType(i->GetType());
00590         if (i->CheckSize())
00591           f->SetSize(i->GetSize());
00592         else if (i->GetType() != FileInfo::file_type_dir) {
00593           logger.msg(DEBUG, "list_files_ftp: looking for size of %s", f_url);
00594           res = globus_ftp_client_size(&ftp_handle, f_url.c_str(), &ftp_opattr,
00595                                        &size, &ftp_complete_callback, this);
00596           if (!res) {
00597             logger.msg(VERBOSE, "list_files_ftp: globus_ftp_client_size failed");
00598             logger.msg(INFO, "Globus error: %s", res.str());
00599             result = DataStatus::ListError;
00600           }
00601           else if (!cond.wait(1000*usercfg.Timeout())) {
00602             logger.msg(INFO, "list_files_ftp: timeout waiting for size");
00603             globus_ftp_client_abort(&ftp_handle);
00604             cond.wait();
00605             result = DataStatus::ListError;
00606           }
00607           else if (!condstatus) {
00608             logger.msg(INFO, "list_files_ftp: failed to get file's size");
00609             result = DataStatus::ListError;
00610             // Guessing - directories usually have no size
00611             f->SetType(FileInfo::file_type_dir);
00612           }
00613           else {
00614             f->SetSize(size);
00615             // Guessing - only files usually have size
00616             f->SetType(FileInfo::file_type_file);
00617           }
00618         }
00619         if (i->CheckCreated())
00620           f->SetCreated(i->GetCreated());
00621         else {
00622           logger.msg(DEBUG, "list_files_ftp: "
00623                      "looking for modification time of %s", f_url);
00624           res =
00625             globus_ftp_client_modification_time(&ftp_handle, f_url.c_str(),
00626                                                 &ftp_opattr, &gl_modify_time,
00627                                                 &ftp_complete_callback, this);
00628           if (!res) {
00629             logger.msg(VERBOSE, "list_files_ftp: "
00630                        "globus_ftp_client_modification_time failed");
00631             logger.msg(INFO, "Globus error: %s", res.str());
00632             result = DataStatus::ListError;
00633           }
00634           else if (!cond.wait(1000*usercfg.Timeout())) {
00635             logger.msg(INFO, "list_files_ftp: "
00636                        "timeout waiting for modification_time");
00637             globus_ftp_client_abort(&ftp_handle);
00638             cond.wait();
00639             result = DataStatus::ListError;
00640           }
00641           else if (!condstatus) {
00642             logger.msg(INFO, "list_files_ftp: "
00643                        "failed to get file's modification time");
00644             result = DataStatus::ListError;
00645           }
00646           else {
00647             GlobusTimeAbstimeGet(gl_modify_time, modify_time, modify_utime);
00648             f->SetCreated(modify_time);
00649           }
00650         }
00651       }
00652     }
00653     return result;
00654   }
00655 
00656   DataPointGridFTP::DataPointGridFTP(const URL& url, const UserConfig& usercfg)
00657     : DataPointDirect(url, usercfg),
00658       ftp_active(false),
00659       condstatus(DataStatus::Success),
00660       credential(NULL),
00661       reading(false),
00662       writing(false) {
00663     //globus_module_activate(GLOBUS_FTP_CLIENT_MODULE);
00664     //if (!proxy_initialized)
00665     //  proxy_initialized = GlobusRecoverProxyOpenSSL();
00666     // Activating globus only once because it looks like 
00667     // deactivation of GLOBUS_FTP_CONTROL_MODULE is not
00668     // handled properly on Windows. This should not cause
00669     // problems (except for valgrind) because this plugin
00670     // is registered as persistent.
00671     if (!proxy_initialized) {
00672       globus_module_activate(GLOBUS_COMMON_MODULE);
00673       globus_module_activate(GLOBUS_FTP_CLIENT_MODULE);
00674       proxy_initialized = GlobusRecoverProxyOpenSSL();
00675     }
00676     is_secure = false;
00677     if (url.Protocol() == "gsiftp")
00678       is_secure = true;
00679     if (!ftp_active) {
00680       GlobusResult res;
00681       globus_ftp_client_handleattr_t ftp_attr;
00682       if (!(res = globus_ftp_client_handleattr_init(&ftp_attr))) {
00683         logger.msg(ERROR,
00684                    "init_handle: globus_ftp_client_handleattr_init failed");
00685         logger.msg(ERROR, "Globus error: %s", res.str());
00686         ftp_active = false;
00687         return;
00688       }
00689 #ifdef HAVE_GLOBUS_FTP_CLIENT_HANDLEATTR_SET_GRIDFTP2
00690       if (!(res = globus_ftp_client_handleattr_set_gridftp2(&ftp_attr,
00691                                                             GLOBUS_TRUE))) {
00692         globus_ftp_client_handleattr_destroy(&ftp_attr);
00693         logger.msg(ERROR, "init_handle: "
00694                    "globus_ftp_client_handleattr_set_gridftp2 failed");
00695         logger.msg(ERROR, "Globus error: %s", res.str());
00696         ftp_active = false;
00697         return;
00698       }
00699 #endif
00700       if (!(res = globus_ftp_client_handle_init(&ftp_handle, &ftp_attr))) {
00701         globus_ftp_client_handleattr_destroy(&ftp_attr);
00702         logger.msg(ERROR, "init_handle: globus_ftp_client_handle_init failed");
00703         logger.msg(ERROR, "Globus error: %s", res.str());
00704         ftp_active = false;
00705         return;
00706       }
00707       globus_ftp_client_handleattr_destroy(&ftp_attr);
00708       if (!(res = globus_ftp_client_operationattr_init(&ftp_opattr))) {
00709         logger.msg(ERROR, "init_handle: "
00710                    "globus_ftp_client_operationattr_init failed");
00711         logger.msg(ERROR, "Globus error: %s", res.str());
00712         globus_ftp_client_handle_destroy(&ftp_handle);
00713         ftp_active = false;
00714         return;
00715       }
00716     }
00717     ftp_active = true;
00718     ftp_threads = 1;
00719     if (allow_out_of_order) {
00720       ftp_threads = stringtoi(url.Option("threads"));
00721       if (ftp_threads < 1)
00722         ftp_threads = 1;
00723       if (ftp_threads > MAX_PARALLEL_STREAMS)
00724         ftp_threads = MAX_PARALLEL_STREAMS;
00725     }
00726     autodir = additional_checks;
00727     std::string autodir_s = url.Option("autodir");
00728     if(autodir_s == "yes") {
00729       autodir = true;
00730     } else if(autodir_s == "no") {
00731       autodir = false;
00732     }
00733   }
00734 
00735   void DataPointGridFTP::set_attributes(void) {
          // Configures ftp_opattr (the Globus FTP operation attributes) from
          // ftp_threads, is_secure, force_secure, force_passive, the URL
          // option "secure" and the credentials described by usercfg.
          // Only the result of ..._set_authorization is checked; the return
          // values of all other attribute setters are ignored here.
00736     globus_ftp_control_parallelism_t paral;
          // More than one thread requests fixed parallelism of ftp_threads;
          // otherwise parallelism is switched off (size 1).
00737     if (ftp_threads > 1) {
00738       paral.fixed.mode = GLOBUS_FTP_CONTROL_PARALLELISM_FIXED;
00739       paral.fixed.size = ftp_threads;
00740     }
00741     else {
00742       paral.fixed.mode = GLOBUS_FTP_CONTROL_PARALLELISM_NONE;
00743       paral.fixed.size = 1;
00744     }
00745     globus_ftp_client_operationattr_set_parallelism(&ftp_opattr, &paral);
          // Striping is always disabled and the transfer type is always IMAGE.
00746     globus_ftp_client_operationattr_set_striped(&ftp_opattr, GLOBUS_FALSE);
00747     /*   globus_ftp_client_operationattr_set_layout         */
00748     /*   globus_ftp_client_operationattr_set_tcp_buffer     */
00749     globus_ftp_client_operationattr_set_type(&ftp_opattr,
00750                                              GLOBUS_FTP_CONTROL_TYPE_IMAGE);
00751     if (!is_secure) { // plain ftp protocol
            // Plain FTP: stream mode with both the data and the control
            // channel left unprotected (CLEAR).
00752       globus_ftp_client_operationattr_set_mode(&ftp_opattr,
00753                                                GLOBUS_FTP_CONTROL_MODE_STREAM);
00754       globus_ftp_client_operationattr_set_data_protection(&ftp_opattr,
00755                                                           GLOBUS_FTP_CONTROL_PROTECTION_CLEAR);
00756       globus_ftp_client_operationattr_set_control_protection(&ftp_opattr,
00757                                                              GLOBUS_FTP_CONTROL_PROTECTION_CLEAR);
00758       // need to set dcau to none in order Globus libraries not to send
00759       // it to pure ftp server
00760       globus_ftp_control_dcau_t dcau;
00761       dcau.mode = GLOBUS_FTP_CONTROL_DCAU_NONE;
00762       globus_ftp_client_operationattr_set_dcau(&ftp_opattr, &dcau);
00763     }
00764     else { // gridftp protocol
00765
            // The GSS credential is created on first use from the proxy,
            // certificate and key paths in usercfg and then reused.
00766       if (!credential)
00767         credential = new GSSCredential(usercfg.ProxyPath(),
00768                                        usercfg.CertificatePath(), usercfg.KeyPath());
00769
00770       GlobusResult r = globus_ftp_client_operationattr_set_authorization(
00771                      &ftp_opattr,
00772                      *credential,":globus-mapping:","user@",
00773                      GLOBUS_NULL,GLOBUS_NULL);
            // A failure here is only logged; the remaining attributes are
            // still set and the function does not report the error.
00774       if(!r) {
00775         logger.msg(WARNING, "Failed to set credentials for GridFTP transfer");
00776         logger.msg(VERBOSE, "globus_ftp_client_operationattr_set_authorization: error: %s", r.str());
00777       }
            // Secure data transfer (forced, or requested with URL option
            // secure=yes): extended block mode with PRIVATE data protection.
00778       if (force_secure || (url.Option("secure") == "yes")) {
00779         globus_ftp_client_operationattr_set_mode(&ftp_opattr,
00780                                                  GLOBUS_FTP_CONTROL_MODE_EXTENDED_BLOCK);
00781         globus_ftp_client_operationattr_set_data_protection(&ftp_opattr,
00782                                                             GLOBUS_FTP_CONTROL_PROTECTION_PRIVATE);
00783         logger.msg(VERBOSE, "Using secure data transfer");
00784       }
00785       else {
              // Unprotected data channel: force_passive selects stream mode,
              // otherwise extended block mode is used.
00786         if (force_passive)
00787           globus_ftp_client_operationattr_set_mode(&ftp_opattr,
00788                                                    GLOBUS_FTP_CONTROL_MODE_STREAM);
00789         else
00790           globus_ftp_client_operationattr_set_mode(&ftp_opattr,
00791                                                    GLOBUS_FTP_CONTROL_MODE_EXTENDED_BLOCK);
00792         globus_ftp_client_operationattr_set_data_protection(&ftp_opattr,
00793                                                             GLOBUS_FTP_CONTROL_PROTECTION_CLEAR);
00794         logger.msg(VERBOSE, "Using insecure data transfer");
00795       }
            // For gridftp the control channel is PRIVATE in both cases above.
00796       globus_ftp_client_operationattr_set_control_protection(&ftp_opattr,
00797                                                              GLOBUS_FTP_CONTROL_PROTECTION_PRIVATE);
00798     }
00799     /*   globus_ftp_client_operationattr_set_dcau                         */
00800     /*   globus_ftp_client_operationattr_set_resume_third_party_transfer  */
00801     /*   globus_ftp_client_operationattr_set_authorization                */
          // Append mode is always switched off.
00802     globus_ftp_client_operationattr_set_append(&ftp_opattr, GLOBUS_FALSE);
00803   }
00804 
00805   DataPointGridFTP::~DataPointGridFTP() {
          // Calls StopReading() and StopWriting() first, then releases the
          // Globus handle and operation attributes (only when ftp_active is
          // set) and finally deletes the credential object if one exists.
00806     StopReading();
00807     StopWriting();
00808     if (ftp_active) {
00809       logger.msg(VERBOSE, "DataPoint::deinit_handle: destroy ftp_handle");
00810       globus_ftp_client_handle_destroy(&ftp_handle);
00811       globus_ftp_client_operationattr_destroy(&ftp_opattr);
00812     }
00813     if (credential)
00814       delete credential;
          // The Globus FTP client module is intentionally not deactivated
          // here; the call below is left commented out.
00815     // See activation for description
00816     //globus_module_deactivate(GLOBUS_FTP_CLIENT_MODULE);
00817   }
00818 
00819   Plugin* DataPointGridFTP::Instance(PluginArgument *arg) {
          // Plugin entry point: returns a newly allocated DataPointGridFTP
          // for "gsiftp" and "ftp" URLs. Returns NULL when arg is not a
          // DataPointPluginArgument, when the protocol is anything else, or
          // when the factory or module reference is missing.
00820     DataPointPluginArgument *dmcarg = dynamic_cast<DataPointPluginArgument*>(arg);
00821     if (!dmcarg)
00822       return NULL;
00823     if (((const URL&)(*dmcarg)).Protocol() != "gsiftp" &&
00824         ((const URL&)(*dmcarg)).Protocol() != "ftp")
00825       return NULL;
00826     // Make this code non-unloadable because both OpenSSL
00827     // and Globus have problems with unloading
00828     Glib::Module* module = dmcarg->get_module();
00829     PluginsFactory* factory = dmcarg->get_factory();
00830     if(!(factory && module)) {
00831       logger.msg(ERROR, "Missing reference to factory and/or module. It is unsafe to use Globus in non-persistent mode - (Grid)FTP code is disabled. Report to developers.");
00832       return NULL;
00833     }
00834     factory->makePersistent(module);
00835     OpenSSLInit();
          // The same argument object is passed for both constructor
          // parameters. NOTE(review): presumably it converts to the URL and
          // to the user configuration respectively - confirm in the header.
00836     return new DataPointGridFTP(*dmcarg, *dmcarg);
00837   }
00838 
00839   bool DataPointGridFTP::WriteOutOfOrder() {
          // Unconditionally returns true. NOTE(review): presumably tells the
          // caller that data may be written out of order - confirm against
          // the DataPoint base class documentation.
00840     return true;
00841   }
00842 
00843   bool DataPointGridFTP::ProvidesMeta() {
          // Unconditionally returns true. NOTE(review): presumably means this
          // data point can supply metadata itself - confirm against the
          // DataPoint base class documentation.
00844     return true;
00845   }
00846 
00847 } // namespace Arc
00848 
00849 Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = {
        // Plugin table of this module: a single "gridftp" entry of kind
        // "HED:DMC" whose factory function is DataPointGridFTP::Instance.
00850   { "gridftp", "HED:DMC", 0, &Arc::DataPointGridFTP::Instance },
        // All-NULL entry closes the table.
00851   { NULL, NULL, 0, NULL }
00852 };