Back to index

nordugrid-arc-nox  1.1.0~rc6
DataPointFile.cpp
Go to the documentation of this file.
00001 // -*- indent-tabs-mode: nil -*-
00002 
00003 #ifdef HAVE_CONFIG_H
00004 #include <config.h>
00005 #endif
00006 
00007 #include <cstdlib>
00008 // NOTE: On Solaris errno is not working properly if cerrno is included first
00009 #include <cerrno>
00010 
00011 #include <dirent.h>
00012 #include <fcntl.h>
00013 #include <sys/types.h>
00014 #include <sys/stat.h>
00015 #include <unistd.h>
00016 
00017 #include <glibmm.h>
00018 
00019 #include <arc/Thread.h>
00020 #include <arc/Logger.h>
00021 #include <arc/URL.h>
00022 #include <arc/StringConv.h>
00023 #include <arc/data/DataBuffer.h>
00024 #include <arc/data/DataCallback.h>
00025 #include <arc/data/MkDirRecursive.h>
00026 
00027 #ifdef WIN32
00028 #include <arc/win32.h>
00029 #endif
00030 
00031 #include "DataPointFile.h"
00032 
00033 namespace Arc {
00034 
00035   Logger DataPointFile::logger(DataPoint::logger, "File"); // class-wide logger, built from DataPoint::logger with the name "File"
00036 
00037   DataPointFile::DataPointFile(const URL& url, const UserConfig& usercfg)
00038     : DataPointDirect(url, usercfg),
00039       reading(false),
00040       writing(false),
00041       is_channel(false) {
00042     if (url.Protocol() == "file") {
00043       cache = false;
00044       is_channel = false;
00045       local = true;
00046     }
00047     else if (url.Path() == "-") { // won't work
00048       linkable = false;
00049       is_channel = true;
00050     }
00051   }
00052 
00053   DataPointFile::~DataPointFile() {
00054     StopReading();
00055     StopWriting();
00056   }
00057 
00058   Plugin* DataPointFile::Instance(PluginArgument *arg) {
00059     DataPointPluginArgument *dmcarg = dynamic_cast<DataPointPluginArgument*>(arg);
00060     if (!dmcarg)
00061       return NULL;
00062     if (((const URL &)(*dmcarg)).Protocol() != "file")
00063       return NULL;
00064     return new DataPointFile(*dmcarg, *dmcarg);
00065   }
00066 
00067   void DataPointFile::read_file_start(void* arg) {
00068     ((DataPointFile*)arg)->read_file();
00069   }
00070 
00071   void DataPointFile::read_file() {
00072     bool limit_length = false;
00073     unsigned long long int range_length = 0;
00074     unsigned long long int offset = 0;
00075     if (range_end > range_start) {
00076       range_length = range_end - range_start;
00077       limit_length = true;
00078       lseek(fd, range_start, SEEK_SET);
00079       offset = range_start;
00080     }
00081     else
00082       lseek(fd, 0, SEEK_SET);
00083     for (;;) {
00084       if (limit_length)
00085         if (range_length == 0)
00086           break;
00087       /* read from fd here and push to buffer */
00088       /* 1. claim buffer */
00089       int h;
00090       unsigned int l;
00091       if (!buffer->for_read(h, l, true)) {
00092         /* failed to get buffer - must be error or request to exit */
00093         buffer->error_read(true);
00094         break;
00095       }
00096       if (buffer->error()) {
00097         buffer->is_read(h, 0, 0);
00098         break;
00099       }
00100       /* 2. read */
00101       if (limit_length)
00102         if (l > range_length)
00103           l = range_length;
00104       unsigned long long int p = lseek(fd, 0, SEEK_CUR);
00105       if (p == (unsigned long long int)(-1))
00106         p = offset;
00107       int ll = read(fd, (*(buffer))[h], l);
00108       if (ll == -1) { /* error */
00109         buffer->is_read(h, 0, 0);
00110         buffer->error_read(true);
00111         break;
00112       }
00113       if (ll == 0) { /* eof */
00114         buffer->is_read(h, 0, 0);
00115         break;
00116       }
00117       /* 3. announce */
00118       buffer->is_read(h, ll, p);
00119       if (limit_length) {
00120         if (ll > range_length)
00121           range_length = 0;
00122         else
00123           range_length -= ll;
00124       }
00125       offset += ll; // for non-seakable files
00126     }
00127     close(fd);
00128     buffer->eof_read(true);
00129     transfer_cond.signal();
00130   }
00131 
00132   void DataPointFile::write_file_start(void* arg) {
00133     ((DataPointFile*)arg)->write_file();
00134   }
00135 
00136   void DataPointFile::write_file() {
00137     for (;;) {
00138       /* take from buffer and write to fd */
00139       /* 1. claim buffer */
00140       int h;
00141       unsigned int l;
00142       unsigned long long int p;
00143       if (!buffer->for_write(h, l, p, true)) {
00144         /* failed to get buffer - must be error or request to exit */
00145         if (!buffer->eof_read())
00146           buffer->error_write(true);
00147         buffer->eof_write(true);
00148         break;
00149       }
00150       if (buffer->error()) {
00151         buffer->is_written(h);
00152         buffer->eof_write(true);
00153         break;
00154       }
00155       /* 2. write */
00156       lseek(fd, p, SEEK_SET);
00157       int l_ = 0;
00158       int ll = 0;
00159       while (l_ < l) {
00160         ll = write(fd, (*(buffer))[h] + l_, l - l_);
00161         if (ll == -1) { /* error */
00162           buffer->is_written(h);
00163           buffer->error_write(true);
00164           buffer->eof_write(true);
00165           break;
00166         }
00167         l_ += ll;
00168       }
00169       if (ll == -1)
00170         break;
00171       /* 3. announce */
00172       buffer->is_written(h);
00173     }
00174 #ifndef WIN32
00175     // This is for broken filesystems. Specifically for Lustre.
00176     if (fsync(fd) != 0 && errno != EINVAL) { // this error is caused by special files like stdout
00177       logger.msg(ERROR, "fsync of file %s failed: %s", url.Path(), strerror(errno));
00178       buffer->error_write(true);
00179     }
00180 #endif
00181     if (close(fd) != 0) {
00182       logger.msg(ERROR, "closing file %s failed: %s", url.Path(), strerror(errno));
00183       buffer->error_write(true);
00184     }    
00185     transfer_cond.signal();
00186   }
00187 
00188   DataStatus DataPointFile::Check() {
00189     if (reading)
00190       return DataStatus::IsReadingError;
00191     if (writing)
00192       return DataStatus::IsWritingError;
00193     User user;
00194     int res = user.check_file_access(url.Path(), O_RDONLY);
00195     if (res != 0) {
00196       logger.msg(INFO, "File is not accessible: %s", url.Path());
00197       return DataStatus::CheckError;
00198     }
00199     struct stat st;
00200     if (stat((url.Path()).c_str(), &st) != 0) {
00201       logger.msg(INFO, "Can't stat file: %s", url.Path());
00202       return DataStatus::CheckError;
00203     }
00204     SetSize(st.st_size);
00205     SetCreated(st.st_mtime);
00206     return DataStatus::Success;
00207   }
00208 
00209   DataStatus DataPointFile::Remove() {
00210     if (reading)
00211       return DataStatus::IsReadingError;
00212     if (writing)
00213       return DataStatus::IsReadingError;
00214       
00215     const char* path = url.Path().c_str();
00216     struct stat st;
00217     if(stat(path,&st) != 0) {
00218       if (errno == ENOENT) return DataStatus::Success;
00219       logger.msg(INFO, "File is not accessible: %s - %s", path, strerror(errno));
00220       return DataStatus::DeleteError;
00221     }
00222     // path is a directory
00223     if(S_ISDIR(st.st_mode)) {
00224       if (rmdir(path) == -1) {
00225         logger.msg(INFO, "Can't delete directory: %s - %s", path, strerror(errno));
00226         return DataStatus::DeleteError;
00227       }
00228       return DataStatus::Success;
00229     }
00230     // path is a file
00231     if(unlink(path) == -1 && errno != ENOENT) {
00232       logger.msg(INFO, "Can't delete file: %s - %s", path, strerror(errno));
00233       return DataStatus::DeleteError;
00234     }
00235     return DataStatus::Success;
00236   }
00237 
00238   DataStatus DataPointFile::StartReading(DataBuffer& buf) {
00239     if (reading)
00240       return DataStatus::IsReadingError;
00241     if (writing)
00242       return DataStatus::IsWritingError;
00243     reading = true;
00244     /* try to open */
00245     int flags = O_RDONLY;
00246 #ifdef WIN32
00247     flags |= O_BINARY;
00248 #endif
00249 
00250     if (url.Path() == "-") // won't work
00251       fd = dup(STDIN_FILENO);
00252     else {
00253       User user;
00254       if (user.check_file_access(url.Path(), flags) != 0) {
00255         reading = false;
00256         return DataStatus::ReadStartError;
00257       }
00258       fd = open((url.Path()).c_str(), flags);
00259     }
00260     if (fd == -1) {
00261       reading = false;
00262       return DataStatus::ReadStartError;
00263     }
00264     /* provide some metadata */
00265     struct stat st;
00266     if (fstat(fd, &st) == 0) {
00267       SetSize(st.st_size);
00268       SetCreated(st.st_mtime);
00269     }
00270     buffer = &buf;
00271     transfer_cond.reset();
00272     /* create thread to maintain reading */
00273     if(!CreateThreadFunction(&DataPointFile::read_file_start,this)) {
00274       close(fd);
00275       fd = -1;
00276       reading = false;
00277       return DataStatus::ReadStartError;
00278     }
00279     return DataStatus::Success;
00280   }
00281 
00282   DataStatus DataPointFile::StopReading() {
00283     if (!reading)
00284       return DataStatus::ReadStopError;
00285     reading = false;
00286     if (!buffer->eof_read()) {
00287       buffer->error_read(true);      /* trigger transfer error */
00288       close(fd);
00289       fd = -1;
00290     }
00291     // buffer->wait_eof_read();
00292     transfer_cond.wait();         /* wait till reading thread exited */
00293     if (buffer->error_read())
00294       return DataStatus::ReadError;
00295     return DataStatus::Success;
00296   }
00297 
00298   DataStatus DataPointFile::StartWriting(DataBuffer& buf,
00299                                          DataCallback *space_cb) {
00300     if (reading)
00301       return DataStatus::IsReadingError;
00302     if (writing)
00303       return DataStatus::IsWritingError;
00304     writing = true;
00305     /* try to open */
00306     buffer = &buf;
00307     if (url.Path() == "-") { // won't work
00308       fd = dup(STDOUT_FILENO);
00309       if (fd == -1) {
00310         logger.msg(ERROR, "Failed to use channel stdout");
00311         buffer->error_write(true);
00312         buffer->eof_write(true);
00313         writing = false;
00314         return DataStatus::WriteStartError;
00315       }
00316     }
00317     else {
00318       User user;
00319       /* do not check permissions to create anything here -
00320          suppose it path was checked at higher level */
00321       /* make directories */
00322       if (url.Path().empty()) {
00323         logger.msg(ERROR, "Invalid url: %s", url.str());
00324         buffer->error_write(true);
00325         buffer->eof_write(true);
00326         writing = false;
00327         return DataStatus::WriteStartError;
00328       }
00329       std::string dirpath = Glib::path_get_dirname(url.Path());
00330       if(dirpath == ".") dirpath = G_DIR_SEPARATOR_S; // shouldn't happen
00331       if (mkdir_recursive("", dirpath, S_IRWXU, user) != 0)
00332         if (errno != EEXIST) {
00333           logger.msg(ERROR, "Failed to create/find directory %s, (%d)", dirpath, errno);
00334           buffer->error_write(true);
00335           buffer->eof_write(true);
00336           writing = false;
00337           return DataStatus::WriteStartError;
00338         }
00339 
00340       /* try to create file, if failed - try to open it */
00341       int flags = O_WRONLY;
00342 #ifdef WIN32
00343       flags |= O_BINARY;
00344 #endif
00345       fd = open((url.Path()).c_str(), flags | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
00346       if (fd == -1)
00347         fd = open((url.Path()).c_str(), flags | O_TRUNC, S_IRUSR | S_IWUSR);
00348       else  /* this file was created by us. Hence we can set it's owner */
00349         (fchown(fd, user.get_uid(), user.get_gid()) != 0);
00350       if (fd == -1) {
00351         logger.msg(ERROR, "Failed to create/open file %s (%d)", url.Path(), errno);
00352         buffer->error_write(true);
00353         buffer->eof_write(true);
00354         writing = false;
00355         return DataStatus::WriteStartError;
00356       }
00357 
00358       /* preallocate space */
00359       buffer->speed.hold(true);
00360       if (additional_checks && CheckSize()) {
00361         unsigned long long int fsize = GetSize();
00362         logger.msg(INFO, "setting file %s to size %llu",
00363                    url.Path(), fsize);
00364         /* because filesytem can skip empty blocks do real write */
00365         unsigned long long int old_size = lseek(fd, 0, SEEK_END);
00366         if (old_size < fsize) {
00367           char buf[65536];
00368           memset(buf, 0xFF, sizeof(buf));
00369           unsigned int l = 1;
00370           while (l > 0) {
00371             old_size = lseek(fd, 0, SEEK_END);
00372             l = sizeof(buf);
00373             if (l > (fsize - old_size))
00374               l = fsize - old_size;
00375             if (write(fd, buf, l) == -1) {
00376               /* out of space */
00377               if (space_cb != NULL)
00378                 if (space_cb->cb((unsigned long long int)l))
00379                   continue;
00380               lseek(fd, 0, SEEK_SET);
00381               (ftruncate(fd, 0) != 0);
00382               close(fd);
00383               fd = -1;
00384               logger.msg(INFO, "Failed to preallocate space");
00385               buffer->speed.reset();
00386               buffer->speed.hold(false);
00387               buffer->error_write(true);
00388               buffer->eof_write(true);
00389               writing = false;
00390               return DataStatus::WriteStartError;
00391             }
00392           }
00393         }
00394       }
00395     }
00396     buffer->speed.reset();
00397     buffer->speed.hold(false);
00398     transfer_cond.reset();
00399     /* create thread to maintain writing */
00400     if(!CreateThreadFunction(&DataPointFile::write_file_start,this)) {
00401       close(fd);
00402       fd = -1;
00403       buffer->error_write(true);
00404       buffer->eof_write(true);
00405       writing = false;
00406       return DataStatus::WriteStartError;
00407     }
00408     return DataStatus::Success;
00409   }
00410 
00411   DataStatus DataPointFile::StopWriting() {
00412     if (!writing)
00413       return DataStatus::WriteStopError;
00414     writing = false;
00415     if (!buffer->eof_write()) {
00416       buffer->error_write(true);      /* trigger transfer error */
00417       close(fd);
00418       fd = -1;
00419     }
00420     // buffer->wait_eof_write();
00421     transfer_cond.wait();         /* wait till writing thread exited */
00422     // validate file size
00423     if (additional_checks && CheckSize()) {
00424       struct stat st;
00425       std::string path = url.Path();
00426       if (stat(path.c_str(), &st) != 0 && errno != ENOENT) {
00427         logger.msg(ERROR, "Error during file validation. Can't stat file %s", url.Path());
00428         return DataStatus::WriteStopError;
00429       }
00430       if (errno != ENOENT && GetSize() != st.st_size) {
00431         logger.msg(ERROR, "Error during file validation: Local file size %llu does not match source file size %llu for file %s",
00432                           st.st_size, GetSize(), url.Path());
00433         return DataStatus::WriteStopError;
00434       }
00435     }
00436     
00437     if (buffer->error_write())
00438       return DataStatus::WriteError;
00439     return DataStatus::Success;
00440   }
00441 
00442   DataStatus DataPointFile::ListFiles(std::list<FileInfo>& files,
00443                                       bool long_list,
00444                                       bool resolve,
00445                                       bool metadata) {
00446     if (reading)
00447       return DataStatus::IsReadingError;
00448     if (writing)
00449       return DataStatus::IsWritingError;
00450     std::string dirname = url.Path();
00451     if (dirname[dirname.length() - 1] == '/')
00452       dirname.resize(dirname.length() - 1);
00453 
00454     struct stat st;
00455     if (stat(dirname.c_str(), &st) != 0) {
00456       logger.msg(INFO, "Failed to read object %s: %s", dirname, strerror(errno));
00457       return DataStatus::ListError;
00458     }
00459     if (S_ISDIR(st.st_mode) && !metadata) {
00460       try {
00461         Glib::Dir dir(dirname);
00462         std::string file_name;
00463         while ((file_name = dir.read_name()) != "") {
00464           std::list<FileInfo>::iterator f =
00465             files.insert(files.end(), FileInfo(file_name.c_str()));
00466           if (long_list) {
00467             std::string fname = dirname + "/" + file_name;
00468             struct stat st;
00469             if (stat(fname.c_str(), &st) == 0) {
00470               f->SetSize(st.st_size);
00471               f->SetCreated(st.st_mtime);
00472               if (S_ISDIR(st.st_mode))
00473                 f->SetType(FileInfo::file_type_dir);
00474               else if (S_ISREG(st.st_mode))
00475                 f->SetType(FileInfo::file_type_file);
00476             }
00477           }
00478         }
00479       }
00480       catch (Glib::FileError& e) {
00481         logger.msg(INFO, "Failed to read object %s", dirname);
00482         return DataStatus::ListError;
00483       }
00484     }
00485     else {
00486       std::list<FileInfo>::iterator f =
00487         files.insert(files.end(), FileInfo(dirname));
00488       f->SetMetaData("path", dirname);
00489       f->SetSize(st.st_size);
00490       f->SetMetaData("size", tostring(st.st_size));
00491       logger.msg(INFO, "size is %s", tostring(st.st_size));
00492       f->SetCreated(st.st_mtime);
00493       f->SetMetaData("mtime", (Time(st.st_mtime)).str());
00494        if (S_ISDIR(st.st_mode)) {
00495          f->SetType(FileInfo::file_type_dir);
00496         f->SetMetaData("type", "dir");
00497       }
00498       else if (S_ISREG(st.st_mode)) {
00499         f->SetType(FileInfo::file_type_file);
00500         f->SetMetaData("type", "file");
00501       }
00502       // fill some more metadata
00503       f->SetMetaData("atime", (Time(st.st_atime)).str());
00504       f->SetMetaData("ctime", (Time(st.st_ctime)).str());
00505       f->SetMetaData("group", tostring(st.st_gid));
00506       f->SetMetaData("owner", tostring(st.st_uid));
00507       unsigned int mode = st.st_mode;
00508       std::string perms;
00509       if (mode & S_IRUSR) perms += 'r'; else perms += '-';
00510       if (mode & S_IWUSR) perms += 'w'; else perms += '-';
00511       if (mode & S_IXUSR) perms += 'x'; else perms += '-';
00512 #ifndef WIN32
00513       if (mode & S_IRGRP) perms += 'r'; else perms += '-';
00514       if (mode & S_IWGRP) perms += 'w'; else perms += '-';
00515       if (mode & S_IXGRP) perms += 'x'; else perms += '-';
00516       if (mode & S_IROTH) perms += 'r'; else perms += '-';
00517       if (mode & S_IWOTH) perms += 'w'; else perms += '-';
00518       if (mode & S_IXOTH) perms += 'x'; else perms += '-';
00519 #endif
00520       f->SetMetaData("accessperm", perms);
00521     }
00522     return DataStatus::Success;
00523   }
00524 
00525   bool DataPointFile::WriteOutOfOrder() {
00526     if (!url)
00527       return false;
00528     if (url.Protocol() == "file")
00529       return true;
00530     return false;
00531   }
00532 
00533 } // namespace Arc
00534 
00535 Arc::PluginDescriptor PLUGINS_TABLE_NAME[] = { // plugin table exported by this module
00536   { "file", "HED:DMC", 0, &Arc::DataPointFile::Instance }, // presumably {name, kind, version, factory} - TODO confirm against PluginDescriptor
00537   { NULL, NULL, 0, NULL } // all-NULL entry, presumably the end-of-table marker
00538 };