Back to index

obnam  1.1
vfs.py
Go to the documentation of this file.
00001 # Copyright (C) 2008, 2010  Lars Wirzenius <liw@liw.fi>
00002 #
00003 # This program is free software; you can redistribute it and/or modify
00004 # it under the terms of the GNU General Public License as published by
00005 # the Free Software Foundation; either version 2 of the License, or
00006 # (at your option) any later version.
00007 #
00008 # This program is distributed in the hope that it will be useful,
00009 # but WITHOUT ANY WARRANTY; without even the implied warranty of
00010 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00011 # GNU General Public License for more details.
00012 #
00013 # You should have received a copy of the GNU General Public License along
00014 # with this program; if not, write to the Free Software Foundation, Inc.,
00015 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
00016 
00017 
00018 import errno
00019 import logging
00020 import os
00021 import stat
00022 import urlparse
00023 
00024 import obnamlib
00025 
00026 
class VirtualFileSystem(object):

    '''A virtual filesystem interface.

    The backup program needs to access both local and remote files.
    To make it easier to support all kinds of files both locally and
    remotely, we use a custom virtual filesystem interface so that
    all filesystem access is done the same way. This way, we can
    easily support user data and backup repositories in any combination of
    local and remote filesystems.

    This class defines the interface for such virtual filesystems.
    Sub-classes will actually implement the interface. Unless noted
    otherwise, the methods here are placeholders that do nothing and
    return None.

    When a VFS is instantiated, it is bound to a base URL. When
    accessing the virtual filesystem, all paths are then given
    relative to the base URL. The Unix syntax for files is used
    for the relative paths: directory components separated by
    slashes, and an initial slash indicating the root of the
    filesystem (in this case, the base URL).

    '''

    def __init__(self, baseurl):
        '''Remember the base URL and reset the I/O byte counters.'''
        self.baseurl = baseurl
        self.bytes_read = 0
        self.bytes_written = 0
        logging.info('VFS: __init__: baseurl=%s', self.baseurl)

    def log_stats(self):
        '''Log the base URL and the read/written byte counters.'''
        logging.info('VFS: baseurl=%s read=%d written=%d',
                     self.baseurl, self.bytes_read, self.bytes_written)

    def connect(self):
        '''Connect to filesystem.'''

    def close(self):
        '''Close connection to filesystem.

        The base implementation only logs the I/O statistics.

        '''
        self.log_stats()

    def reinit(self, new_baseurl, create=False):
        '''Go back to the beginning.

        This behaves like instantiating a new instance, but possibly
        faster for things like SftpFS. If there is a network
        connection already open, it will be reused.

        '''

    def abspath(self, pathname):
        '''Return absolute version of pathname.

        Relative pathnames are resolved against ``self.getcwd()``, and
        the result is normalized with ``os.path.abspath``.

        '''
        return os.path.abspath(os.path.join(self.getcwd(), pathname))

    def getcwd(self):
        '''Return current working directory as absolute pathname.'''

    def chdir(self, pathname):
        '''Change current working directory to pathname.'''

    def listdir(self, pathname):
        '''Return list of basenames of entities at pathname.'''

    def listdir2(self, pathname):
        '''Return list of basenames and stats of entities at pathname.

        ``scan_tree`` consumes the result as ``(name, st)`` pairs and
        accepts an exception instance in place of ``st`` for an entry
        whose stat lookup failed.

        '''

    def lock(self, lockname, data):
        '''Create a lock file with the given name.'''

    def unlock(self, lockname):
        '''Remove a lock file.'''

    def exists(self, pathname):
        '''Does the file or directory exist?'''

    def isdir(self, pathname):
        '''Is it a directory?'''

    def mknod(self, pathname, mode):
        '''Create a filesystem node (such as a FIFO) with the given mode.

        ``mode`` combines the permission bits with a file type flag
        from the ``stat`` module, e.g. ``0o600 | stat.S_IFIFO``.

        '''

    def mkdir(self, pathname):
        '''Create a directory.

        Parent directories must already exist.

        '''

    def makedirs(self, pathname):
        '''Create a directory, and missing parents.'''

    def rmdir(self, pathname):
        '''Remove an empty directory.'''

    def rmtree(self, dirname):
        '''Remove a directory tree, including its contents.

        Does nothing if ``dirname`` is not a directory.

        '''
        if self.isdir(dirname):
            # scan_tree yields a directory only after everything inside
            # it, so each directory is already empty when rmdir is called.
            for pathname, st in self.scan_tree(dirname):
                if stat.S_ISDIR(st.st_mode):
                    self.rmdir(pathname)
                else:
                    self.remove(pathname)

    def remove(self, pathname):
        '''Remove a file.'''

    def rename(self, old, new):
        '''Rename a file.'''

    def lstat(self, pathname):
        '''Like os.lstat.'''

    def get_username(self, uid):
        '''Return name for user, or None if not known.'''

    def get_groupname(self, gid):
        '''Return name for group, or None if not known.'''

    def llistxattr(self, pathname):
        '''Return list of names of extended attributes for file.

        The base implementation reports no extended attributes.

        '''
        return []

    def lgetxattr(self, pathname, attrname):
        '''Return value of an extended attribute.'''

    def lsetxattr(self, pathname, attrname, attrvalue):
        '''Set value of an extended attribute.'''

    def lchown(self, pathname, uid, gid):
        '''Like os.lchown.'''

    def chmod(self, pathname, mode):
        '''Like os.chmod.'''

    def lutimes(self, pathname, atime_sec, atime_nsec, mtime_sec, mtime_nsec):
        '''Like lutimes(2).

        This isn't quite like lutimes, actually. Most importantly, it uses
        nanosecond timestamps rather than microsecond. This is important.

        '''

    def link(self, existing_path, new_path):
        '''Like os.link.'''

    def readlink(self, symlink):
        '''Like os.readlink.'''

    def symlink(self, source, destination):
        '''Like os.symlink.'''

    def open(self, pathname, mode):
        '''Open a file, like the builtin open() or file() function.

        The return value is a file object like the ones returned
        by the builtin open() function.

        '''

    def cat(self, pathname):
        '''Return the contents of a file.'''

    def write_file(self, pathname, contents):
        '''Write a new file.

        The file must not yet exist. The file is not necessarily written
        atomically, meaning that if the writing fails (connection to
        server drops, for example), the file might exist in a partial
        form. The callers need to deal with this.

        Any directories in pathname will be created if necessary.

        '''

    def overwrite_file(self, pathname, contents):
        '''Like write_file, but overwrites existing file.'''

    def scan_tree(self, dirname, ok=None, dirst=None, log=logging.error,
                  error_handler=None):
        '''Scan a tree for files.

        Return a generator that returns ``(pathname, stat_result)``
        pairs for each file and directory in the tree, in
        depth-first order: the contents of sub-directories first, then
        the non-directory entries of ``dirname``, and finally
        ``dirname`` itself.

        If ``ok`` is not None, it must be a function that determines
        if a particular file or directory should be returned.
        It gets the pathname and stat result as arguments, and
        should return True or False. If it returns False on a
        directory, ``scan_tree`` will not recurse into the
        directory.

        ``dirst`` is for internal optimization, and should not
        be used by the caller. ``log`` is used by unit tests and
        should not be used by the caller.

        Errors from calling ``listdir`` or ``lstat`` are logged,
        but do not stop the scanning. Such files or directories are
        not returned, however. If `error_handler` is defined, it is
        called once for every problem, giving the name and exception
        as arguments. Both ``log`` and ``error_handler`` apply to the
        whole tree, not just to ``dirname`` itself.

        '''

        error_handler = error_handler or (lambda name, e: None)

        try:
            pairs = self.listdir2(dirname)
        except OSError as e:
            log('listdir failed: %s: %s' % (e.filename, e.strerror))
            error_handler(dirname, e)
            pairs = []

        # Non-directories are held back so that they are yielded after
        # the contents of all sub-directories.
        queue = []
        for name, st in pairs:
            pathname = os.path.join(dirname, name)
            if isinstance(st, BaseException):
                # listdir2 could not stat this entry.
                error_handler(pathname, st)
            elif ok is None or ok(pathname, st):
                if stat.S_ISDIR(st.st_mode):
                    # Hand the caller's log and error_handler down, so
                    # that problems deeper in the tree are reported the
                    # same way as problems at the top level.
                    for t in self.scan_tree(pathname, ok=ok, dirst=st,
                                            log=log,
                                            error_handler=error_handler):
                        yield t
                else:
                    queue.append((pathname, st))

        for pathname, st in queue:
            yield pathname, st

        # Only the outermost call lacks a stat result for its directory;
        # recursive calls get it from the parent's listdir2 result.
        if dirst is None:
            try:
                dirst = self.lstat(dirname)
            except OSError as e:
                log('lstat for dir failed: %s: %s' % (e.filename, e.strerror))
                error_handler(dirname, e)
                return

        yield dirname, dirst
00259 
00260         
class VfsFactory:

    '''Registry of VirtualFileSystem implementations, keyed by URL scheme.'''

    def __init__(self):
        # Maps URL scheme -> (implementation, extra keyword arguments).
        self.implementations = {}

    def register(self, scheme, implementation, **kwargs):
        '''Associate a URL scheme with an implementation.

        Any extra keyword arguments are stored and later passed on to
        the implementation when ``new`` instantiates it. Registering
        the same scheme twice raises ``obnamlib.Error``.

        '''
        if scheme not in self.implementations:
            self.implementations[scheme] = (implementation, kwargs)
            return
        raise obnamlib.Error('URL scheme %s already registered' % scheme)

    def new(self, url, create=False):
        '''Create a new VFS appropriate for a given URL.

        The implementation is chosen by the scheme part of ``url`` and
        called with the URL, ``create``, and the keyword arguments given
        at registration. An unregistered scheme raises
        ``obnamlib.Error``.

        '''
        # Only the scheme (first element of the parse result) matters.
        scheme = urlparse.urlparse(url)[0]
        if scheme not in self.implementations:
            raise obnamlib.Error('Unknown VFS type %s' % url)
        klass, kwargs = self.implementations[scheme]
        return klass(url, create=create, **kwargs)
00280             
00281             
class VfsTests(object): # pragma: no cover

    '''Re-useable tests for VirtualFileSystem implementations.

    The base class can't be usefully instantiated itself.
    Instead you are supposed to sub-class it and implement the API in
    a suitable way for yourself.

    This class implements a number of tests that the API implementation
    must pass. The implementation's own test class should inherit from
    this class, and unittest.TestCase.

    The test sub-class should define a setUp method that sets the following:

    * self.fs to an instance of the API implementation sub-class
    * self.basepath to the path to the base of the filesystem

    basepath must be operable as a pathname using os.path tools. If
    the VFS implementation operates remotely and wants to operate on a
    URL like 'http://domain/path' as the baseurl, then basepath must be
    just the path portion of the URL.

    The directory indicated by basepath must exist, but must be empty
    at start.

    '''

    # UTF-8 encoded byte string, used to check that pathnames come back
    # as plain strings rather than unicode objects.
    non_ascii_name = u'm\u00e4kel\u00e4'.encode('utf-8')

    def test_abspath_returns_input_for_absolute_path(self):
        self.assertEqual(self.fs.abspath('/foo/bar'), '/foo/bar')

    def test_abspath_returns_absolute_path_for_relative_input(self):
        self.assertEqual(self.fs.abspath('foo'),
                         os.path.join(self.basepath, 'foo'))

    def test_abspath_normalizes_path(self):
        self.assertEqual(self.fs.abspath('foo/..'), self.basepath)

    def test_abspath_returns_plain_string(self):
        self.fs.mkdir(self.non_ascii_name)
        self.fs.chdir(self.non_ascii_name)
        self.assertEqual(type(self.fs.abspath('.')), str)

    def test_reinit_works(self):
        self.fs.chdir('/')
        self.fs.reinit(self.fs.baseurl)
        self.assertEqual(self.fs.getcwd(), self.basepath)

    def test_reinit_to_nonexistent_filename_raises_OSError(self):
        notexist = os.path.join(self.fs.baseurl, 'thisdoesnotexist')
        self.assertRaises(OSError, self.fs.reinit, notexist)

    def test_reinit_creates_target_if_requested(self):
        self.fs.chdir('/')
        new_baseurl = os.path.join(self.fs.baseurl, 'newdir')
        new_basepath = os.path.join(self.basepath, 'newdir')
        self.fs.reinit(new_baseurl, create=True)
        self.assertEqual(self.fs.getcwd(), new_basepath)

    def test_getcwd_returns_dirname(self):
        self.assertEqual(self.fs.getcwd(), self.basepath)

    def test_getcwd_returns_plain_string(self):
        self.fs.mkdir(self.non_ascii_name)
        self.fs.chdir(self.non_ascii_name)
        self.assertEqual(type(self.fs.getcwd()), str)

    def test_chdir_changes_only_fs_cwd_not_process_cwd(self):
        process_cwd = os.getcwd()
        self.fs.chdir('/')
        self.assertEqual(self.fs.getcwd(), '/')
        self.assertEqual(os.getcwd(), process_cwd)

    def test_chdir_to_nonexistent_raises_exception(self):
        self.assertRaises(OSError, self.fs.chdir, '/foobar')

    def test_chdir_to_relative_works(self):
        pathname = os.path.join(self.basepath, 'foo')
        os.mkdir(pathname)
        self.fs.chdir('foo')
        self.assertEqual(self.fs.getcwd(), pathname)

    def test_chdir_to_dotdot_works(self):
        pathname = os.path.join(self.basepath, 'foo')
        os.mkdir(pathname)
        self.fs.chdir('foo')
        self.fs.chdir('..')
        self.assertEqual(self.fs.getcwd(), self.basepath)

    def test_creates_lock_file(self):
        self.fs.lock('lock', 'lock data')
        self.assertTrue(self.fs.exists('lock'))
        self.assertEqual(self.fs.cat('lock'), 'lock data')

    def test_second_lock_fails(self):
        self.fs.lock('lock', 'lock data')
        self.assertRaises(Exception, self.fs.lock, 'lock', 'second lock')
        # The failed second attempt must not clobber the first lock.
        self.assertEqual(self.fs.cat('lock'), 'lock data')

    def test_unlock_removes_lock(self):
        self.fs.lock('lock', 'lock data')
        self.fs.unlock('lock')
        self.assertFalse(self.fs.exists('lock'))

    def test_exists_returns_false_for_nonexistent_file(self):
        self.assertFalse(self.fs.exists('foo'))

    def test_exists_returns_true_for_existing_file(self):
        self.fs.write_file('foo', '')
        self.assertTrue(self.fs.exists('foo'))

    def test_isdir_returns_false_for_nonexistent_file(self):
        self.assertFalse(self.fs.isdir('foo'))

    def test_isdir_returns_false_for_nondir(self):
        self.fs.write_file('foo', '')
        self.assertFalse(self.fs.isdir('foo'))

    def test_isdir_returns_true_for_existing_dir(self):
        self.fs.mkdir('foo')
        self.assertTrue(self.fs.isdir('foo'))

    def test_listdir_returns_plain_strings_only(self):
        self.fs.write_file(u'M\u00E4kel\u00E4'.encode('utf-8'), 'data')
        names = self.fs.listdir('.')
        types = [type(x) for x in names]
        self.assertEqual(types, [str])

    def test_listdir_raises_oserror_if_directory_does_not_exist(self):
        self.assertRaises(OSError, self.fs.listdir, 'foo')

    def test_listdir2_returns_name_stat_pairs(self):
        funny = u'M\u00E4kel\u00E4'.encode('utf-8')
        self.fs.write_file(funny, 'data')
        pairs = self.fs.listdir2('.')
        self.assertEqual(len(pairs), 1)
        self.assertEqual(len(pairs[0]), 2)
        name, st = pairs[0]
        self.assertEqual(type(name), str)
        self.assertEqual(name, funny)
        self.assertTrue(hasattr(st, 'st_mode'))
        # Timestamps must be split into separate second and nanosecond
        # fields; a combined st_mtime must not be present.
        self.assertFalse(hasattr(st, 'st_mtime'))
        self.assertTrue(hasattr(st, 'st_mtime_sec'))
        self.assertTrue(hasattr(st, 'st_mtime_nsec'))

    def test_listdir2_returns_plain_strings_only(self):
        self.fs.write_file(u'M\u00E4kel\u00E4'.encode('utf-8'), 'data')
        names = [name for name, st in self.fs.listdir2('.')]
        types = [type(x) for x in names]
        self.assertEqual(types, [str])

    def test_listdir2_raises_oserror_if_directory_does_not_exist(self):
        self.assertRaises(OSError, self.fs.listdir2, 'foo')

    def test_mknod_creates_fifo(self):
        self.fs.mknod('foo', 0o600 | stat.S_IFIFO)
        self.assertEqual(self.fs.lstat('foo').st_mode, 0o600 | stat.S_IFIFO)

    def test_mkdir_raises_oserror_if_directory_exists(self):
        self.assertRaises(OSError, self.fs.mkdir, '.')

    def test_mkdir_raises_oserror_if_parent_does_not_exist(self):
        self.assertRaises(OSError, self.fs.mkdir, 'foo/bar')

    def test_makedirs_raises_oserror_when_directory_exists(self):
        self.fs.mkdir('foo')
        self.assertRaises(OSError, self.fs.makedirs, 'foo')

    def test_makedirs_creates_directory_when_parent_exists(self):
        self.fs.makedirs('foo')
        self.assertTrue(self.fs.isdir('foo'))

    def test_makedirs_creates_directory_when_parent_does_not_exist(self):
        self.fs.makedirs('foo/bar')
        self.assertTrue(self.fs.isdir('foo/bar'))

    def test_rmdir_removes_directory(self):
        self.fs.mkdir('foo')
        self.fs.rmdir('foo')
        self.assertFalse(self.fs.exists('foo'))

    def test_rmdir_raises_oserror_if_directory_does_not_exist(self):
        self.assertRaises(OSError, self.fs.rmdir, 'foo')

    def test_rmdir_raises_oserror_if_directory_is_not_empty(self):
        self.fs.mkdir('foo')
        self.fs.write_file('foo/bar', '')
        self.assertRaises(OSError, self.fs.rmdir, 'foo')

    def test_rmtree_removes_directory_tree(self):
        self.fs.mkdir('foo')
        self.fs.write_file('foo/bar', '')
        self.fs.rmtree('foo')
        self.assertFalse(self.fs.exists('foo'))

    def test_rmtree_is_silent_when_target_does_not_exist(self):
        self.assertEqual(self.fs.rmtree('foo'), None)

    def test_remove_removes_file(self):
        self.fs.write_file('foo', '')
        self.fs.remove('foo')
        self.assertFalse(self.fs.exists('foo'))

    def test_remove_raises_oserror_if_file_does_not_exist(self):
        self.assertRaises(OSError, self.fs.remove, 'foo')

    def test_rename_renames_file(self):
        self.fs.write_file('foo', 'xxx')
        self.fs.rename('foo', 'bar')
        self.assertFalse(self.fs.exists('foo'))
        self.assertEqual(self.fs.cat('bar'), 'xxx')

    def test_rename_raises_oserror_if_file_does_not_exist(self):
        self.assertRaises(OSError, self.fs.rename, 'foo', 'bar')

    def test_rename_works_if_target_exists(self):
        self.fs.write_file('foo', 'foo')
        self.fs.write_file('bar', 'bar')
        self.fs.rename('foo', 'bar')
        self.assertEqual(self.fs.cat('bar'), 'foo')

    def test_lstat_returns_result_with_all_required_fields(self):
        st = self.fs.lstat('.')
        for field in obnamlib.metadata_fields:
            if field.startswith('st_'):
                self.assertTrue(hasattr(st, field),
                                'stat must return %s' % field)

    def test_lstat_returns_right_filetype_for_directory(self):
        st = self.fs.lstat('.')
        self.assertTrue(stat.S_ISDIR(st.st_mode))

    def test_lstat_raises_oserror_for_nonexistent_entry(self):
        self.assertRaises(OSError, self.fs.lstat, 'notexists')

    def test_chmod_sets_permissions_correctly(self):
        self.fs.mkdir('foo')
        self.fs.chmod('foo', 0o777)
        self.assertEqual(self.fs.lstat('foo').st_mode & 0o777, 0o777)

    def test_chmod_raises_oserror_for_nonexistent_entry(self):
        self.assertRaises(OSError, self.fs.chmod, 'notexists', 0)

    def test_lutimes_sets_times_correctly(self):
        self.fs.mkdir('foo')

        self.fs.lutimes('foo', 1, 2*1000, 3, 4*1000)

        self.assertEqual(self.fs.lstat('foo').st_atime_sec, 1)
        # not all filesystems support sub-second timestamps; those that
        # do not, return 0, so we have to accept either that or the correct
        # value, but no other values
        self.assertTrue(self.fs.lstat('foo').st_atime_nsec in [0, 2*1000])

        self.assertEqual(self.fs.lstat('foo').st_mtime_sec, 3)
        self.assertTrue(self.fs.lstat('foo').st_mtime_nsec in [0, 4*1000])

    def test_lutimes_raises_oserror_for_nonexistent_entry(self):
        self.assertRaises(OSError, self.fs.lutimes, 'notexists', 1, 2, 3, 4)

    def test_link_creates_hard_link(self):
        self.fs.write_file('foo', 'foo')
        self.fs.link('foo', 'bar')
        st1 = self.fs.lstat('foo')
        st2 = self.fs.lstat('bar')
        self.assertEqual(st1, st2)

    def test_symlink_creates_soft_link(self):
        self.fs.symlink('foo', 'bar')
        target = self.fs.readlink('bar')
        self.assertEqual(target, 'foo')

    def test_readlink_returns_plain_string(self):
        self.fs.symlink(self.non_ascii_name, self.non_ascii_name)
        target = self.fs.readlink(self.non_ascii_name)
        self.assertEqual(target, self.non_ascii_name)
        self.assertEqual(type(target), str)

    def test_symlink_raises_oserror_if_name_exists(self):
        self.fs.write_file('foo', 'foo')
        self.assertRaises(OSError, self.fs.symlink, 'bar', 'foo')

    def test_opens_existing_file_ok_for_reading(self):
        self.fs.write_file('foo', '')
        self.assertTrue(self.fs.open('foo', 'r'))

    def test_opens_existing_file_ok_for_writing(self):
        self.fs.write_file('foo', '')
        self.assertTrue(self.fs.open('foo', 'w'))

    def test_open_fails_for_nonexistent_file(self):
        self.assertRaises(IOError, self.fs.open, 'foo', 'r')

    def test_cat_reads_existing_file_ok(self):
        self.fs.write_file('foo', 'bar')
        self.assertEqual(self.fs.cat('foo'), 'bar')

    def test_cat_fails_for_nonexistent_file(self):
        self.assertRaises(IOError, self.fs.cat, 'foo')

    def test_has_read_nothing_initially(self):
        self.assertEqual(self.fs.bytes_read, 0)

    def test_cat_updates_bytes_read(self):
        self.fs.write_file('foo', 'bar')
        self.fs.cat('foo')
        self.assertEqual(self.fs.bytes_read, 3)

    def test_write_fails_if_file_exists_already(self):
        self.fs.write_file('foo', 'bar')
        self.assertRaises(OSError, self.fs.write_file, 'foo', 'foobar')

    def test_write_creates_missing_directories(self):
        self.fs.write_file('foo/bar', 'yo')
        self.assertEqual(self.fs.cat('foo/bar'), 'yo')

    def test_write_leaves_existing_file_intact(self):
        self.fs.write_file('foo', 'bar')
        # The second write is expected to fail; what matters here is
        # that the original contents survive the attempt.
        try:
            self.fs.write_file('foo', 'foobar')
        except OSError:
            pass
        self.assertEqual(self.fs.cat('foo'), 'bar')

    def test_overwrite_creates_new_file_ok(self):
        self.fs.overwrite_file('foo', 'bar')
        self.assertEqual(self.fs.cat('foo'), 'bar')

    def test_overwrite_replaces_existing_file(self):
        self.fs.write_file('foo', 'bar')
        self.fs.overwrite_file('foo', 'foobar')
        self.assertEqual(self.fs.cat('foo'), 'foobar')

    def test_has_written_nothing_initially(self):
        self.assertEqual(self.fs.bytes_written, 0)

    def test_write_updates_written(self):
        self.fs.write_file('foo', 'foo')
        self.assertEqual(self.fs.bytes_written, 3)

    def test_overwrite_updates_written(self):
        self.fs.overwrite_file('foo', 'foo')
        self.assertEqual(self.fs.bytes_written, 3)

    def set_up_scan_tree(self):
        '''Create the directory tree used by the scan_tree tests.

        Sets self.dirs to the directories (including basepath) and
        self.pathnames to the directories plus one symlink.

        '''
        self.dirs = ['foo', 'foo/bar', 'foobar']
        self.dirs = [os.path.join(self.basepath, x) for x in self.dirs]
        for dirname in self.dirs:
            self.fs.mkdir(dirname)
        self.dirs.insert(0, self.basepath)
        self.fs.symlink('foo', 'symfoo')
        self.pathnames = self.dirs + [os.path.join(self.basepath, 'symfoo')]

    def test_scan_tree_returns_nothing_if_listdir_fails(self):
        self.set_up_scan_tree()
        def raiser(dirname):
            # Separate arguments, so that errno, strerror and filename
            # are all set on the exception that scan_tree logs.
            raise OSError(123, 'oops', dirname)
        def logerror(msg):
            pass
        self.fs.listdir2 = raiser
        result = list(self.fs.scan_tree(self.basepath, log=logerror))
        # Nothing inside the tree is returned, only the top directory.
        self.assertEqual(len(result), 1)
        pathname, st = result[0]
        self.assertEqual(pathname, self.basepath)

    def test_scan_tree_returns_the_right_stuff(self):
        self.set_up_scan_tree()
        result = list(self.fs.scan_tree(self.basepath))
        pathnames = [pathname for pathname, st in result]
        self.assertEqual(sorted(pathnames), sorted(self.pathnames))

    def test_scan_tree_filters_away_unwanted(self):
        def ok(pathname, st):
            return stat.S_ISDIR(st.st_mode)
        self.set_up_scan_tree()
        result = list(self.fs.scan_tree(self.basepath, ok=ok))
        pathnames = [pathname for pathname, st in result]
        self.assertEqual(sorted(pathnames), sorted(self.dirs))
00660