Back to index

obnam  1.1
backup_plugin.py
Go to the documentation of this file.
00001 # Copyright (C) 2009, 2010  Lars Wirzenius
00002 #
00003 # This program is free software: you can redistribute it and/or modify
00004 # it under the terms of the GNU General Public License as published by
00005 # the Free Software Foundation, either version 3 of the License, or
00006 # (at your option) any later version.
00007 #
00008 # This program is distributed in the hope that it will be useful,
00009 # but WITHOUT ANY WARRANTY; without even the implied warranty of
00010 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00011 # GNU General Public License for more details.
00012 #
00013 # You should have received a copy of the GNU General Public License
00014 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
00015 
00016 
00017 import errno
00018 import gc
00019 import logging
00020 import os
00021 import re
00022 import stat
00023 import time
00024 import traceback
00025 import tracing
00026 import ttystatus
00027 
00028 import obnamlib
00029 
00030 
class ChunkidPool(object):

    '''Checksum/chunkid mappings that are pending an upload to shared trees.'''

    def __init__(self):
        self.clear()

    def add(self, chunkid, checksum):
        '''Remember that chunkid holds data with the given checksum.'''
        # setdefault avoids the separate membership test + insert.
        self._mapping.setdefault(checksum, []).append(chunkid)

    def __contains__(self, checksum):
        return checksum in self._mapping

    def get(self, checksum):
        '''Return all pending chunk ids for checksum, or an empty list.'''
        return self._mapping.get(checksum, [])

    def clear(self):
        '''Forget all pending mappings.'''
        self._mapping = {}

    def __iter__(self):
        '''Yield each pending (chunkid, checksum) pair.'''
        for checksum, chunkids in self._mapping.items():
            for chunkid in chunkids:
                yield chunkid, checksum
00056 
00057 
00058 class BackupPlugin(obnamlib.ObnamPlugin):
00059 
    def enable(self):
        '''Register the backup subcommand and its settings with the app.'''
        # Looking up 'backup' with assignment creates the option group;
        # the 'perf' group is assumed to exist already.
        backup_group = obnamlib.option_group['backup'] = 'Backing up'
        perf_group = obnamlib.option_group['perf']

        self.app.add_subcommand('backup', self.backup,
                                arg_synopsis='[FILE]...')
        self.app.settings.string_list(['root'], 'what to backup')
        self.app.settings.string_list(['exclude'], 
                                 'regular expression for pathnames to '
                                 'exclude from backup (can be used multiple '
                                 'times)',
                                 group=backup_group)
        self.app.settings.boolean(['exclude-caches'],
                                    'exclude directories (and their subdirs) '
                                    'that contain a CACHEDIR.TAG file',
                                 group=backup_group)
        self.app.settings.boolean(['one-file-system'],
                                    'exclude directories (and their subdirs) '
                                    'that are in a different filesystem',
                                 group=backup_group)
        # Default checkpoint interval: 1 GiB of uploaded data.
        self.app.settings.bytesize(['checkpoint'],
                                      'make a checkpoint after a given SIZE '
                                      '(%default)',
                                    metavar='SIZE',
                                    default=1024**3,
                                 group=backup_group)
        self.app.settings.integer(['chunkids-per-group'],
                                  'encode NUM chunk ids per group (%default)',
                                  metavar='NUM',
                                  default=obnamlib.DEFAULT_CHUNKIDS_PER_GROUP,
                                  group=perf_group)
        # The three modes are handled in backup_file_chunk.
        self.app.settings.choice(['deduplicate'],
                                 ['fatalist', 'never', 'verify'],
                                 'find duplicate data in backed up data '
                                    'and store it only once; three modes '
                                    'are available: never de-duplicate, '
                                    'verify that no hash collisions happen, '
                                    'or (the default) fatalistically accept '
                                    'the risk of collisions',
                                 metavar='MODE',
                                 group=backup_group)
        self.app.settings.boolean(['leave-checkpoints'],
                                  'leave checkpoint generations at the end '
                                    'of a successful backup run',
                                 group=backup_group)
        self.app.settings.boolean(['small-files-in-btree'],
                                  'put contents of small files directly into '
                                    'the per-client B-tree, instead of '
                                    'separate chunk files; do not use this '
                                    'as it is quite bad for performance',
                                 group=backup_group)
00111 
00112     def configure_ttystatus_for_backup(self):
00113         self.app.ts['current-file'] = ''
00114         self.app.ts['uploaded-bytes'] = 0
00115         self.file_count = 0
00116         self.uploaded_bytes = 0
00117 
00118         self.app.ts.format('%ElapsedTime() '
00119                            '%Counter(current-file) '
00120                            'files; '
00121                            '%ByteSize(uploaded-bytes) '
00122                            '('
00123                            '%ByteSpeed(uploaded-bytes,10)'
00124                            ') '
00125                            '%String(what)')
00126 
00127     def configure_ttystatus_for_checkpoint_removal(self):
00128         self.app.ts['what'] = 'removing checkpoints'
00129 
00130     def update_progress_with_file(self, filename, metadata):
00131         self.app.ts['what'] = filename
00132         self.app.ts['current-file'] = filename
00133         self.file_count += 1
00134 
00135     def update_progress_with_upload(self, amount):
00136         self.app.ts['uploaded-bytes'] += amount
00137         self.uploaded_bytes += amount
00138 
00139     def report_stats(self):
00140         size_table = [
00141             (1024**4, 'TiB'),
00142             (1024**3, 'GiB'),
00143             (1024**2, 'MiB'),
00144             (1024**1, 'KiB'),
00145             (0, 'B')
00146         ]
00147         
00148         for size_base, size_unit in size_table:
00149             if self.uploaded_bytes >= size_base:
00150                 if size_base > 0:
00151                     size_amount = float(self.uploaded_bytes) / float(size_base)
00152                 else:
00153                     size_amount = float(self.uploaded_bytes)
00154                 break
00155 
00156         speed_table = [
00157             (1024**3, 'GiB/s'),
00158             (1024**2, 'MiB/s'),
00159             (1024**1, 'KiB/s'),
00160             (0, 'B/s')
00161         ]
00162         duration = time.time() - self.started
00163         speed = float(self.uploaded_bytes) / duration
00164         for speed_base, speed_unit in speed_table:
00165             if speed >= speed_base:
00166                 if speed_base > 0:
00167                     speed_amount = speed / speed_base
00168                 else:
00169                     speed_amount = speed
00170                 break
00171 
00172         duration_string = ''
00173         seconds = duration
00174         if seconds >= 3600:
00175             duration_string += '%dh' % int(seconds/3600)
00176             seconds %= 3600
00177         if seconds >= 60:
00178             duration_string += '%dm' % int(seconds/60)
00179             seconds %= 60
00180         if seconds > 0:
00181             duration_string += '%ds' % round(seconds)
00182 
00183         logging.info('Backup performance statistics:')
00184         logging.info('* files found: %s' % self.file_count)
00185         logging.info('* uploaded data: %s bytes (%s %s)' % 
00186                         (self.uploaded_bytes, size_amount, size_unit))
00187         logging.info('* duration: %s s' % duration)
00188         logging.info('* average speed: %s %s' % (speed_amount, speed_unit))
00189         self.app.ts.notify('Backed up %d files, uploaded %.1f %s '
00190                            'in %s at %.1f %s average speed' %
00191                             (self.file_count, size_amount, size_unit,
00192                              duration_string, speed_amount, speed_unit))
00193 
00194     def error(self, msg, exc=None):
00195         self.errors = True
00196         logging.error(msg)
00197         if exc:
00198             logging.debug(repr(exc))
00199 
00200     def parse_checkpoint_size(self, value):
00201         p = obnamlib.ByteSizeParser()
00202         p.set_default_unit('MiB')
00203         return p.parse(value)
00204         
00205     @property
00206     def pretend(self):
00207         return self.app.settings['pretend']
00208 
    def backup(self, args):
        '''Backup data to repository.

        args are extra backup roots, appended to the --root setting.
        Raises obnamlib.Error if no repository is configured or if any
        per-file errors were recorded during the run.

        '''
        logging.info('Backup starts')
        logging.info('Checkpoints every %s bytes' % 
                        self.app.settings['checkpoint'])

        self.app.settings.require('repository')
        self.app.settings.require('client-name')
        
        if not self.app.settings['repository']:
            raise obnamlib.Error('No --repository setting. '
                                  'You need to specify it on the command '
                                  'line or a configuration file.')
        
        # This is ugly, but avoids having to update the dependency on
        # ttystatus yet again.
        if not hasattr(self.app.ts, 'flush'):
            self.app.ts.flush = lambda: None

        self.started = time.time()
        self.configure_ttystatus_for_backup()
        self.app.ts['what'] = 'setting up'
        self.app.ts.flush()

        self.compile_exclusion_patterns()
        self.memory_dump_counter = 0

        client_name = self.app.settings['client-name']
        # In pretend mode the repository is opened read-only and no
        # locks are taken; otherwise we create/lock as needed.
        if self.pretend:
            self.repo = self.app.open_repository()
            self.repo.open_client(client_name)
        else:
            self.repo = self.app.open_repository(create=True)
            self.add_client(client_name)
            self.repo.lock_client(client_name)
            
            # Need to lock the shared stuff briefly, so encryption etc
            # gets initialized.
            self.repo.lock_shared()
            self.repo.unlock_shared()

        self.errors = False
        self.chunkid_pool = ChunkidPool()
        try:
            if not self.pretend:
                self.repo.start_generation()
            self.fs = None
            roots = self.app.settings['root'] + args
            if roots:
                self.backup_roots(roots)
            self.app.ts['what'] = 'committing changes'
            self.app.ts.flush()
            if not self.pretend:
                # Shared trees are locked only for the final flush of
                # pooled chunk ids and the commits.
                self.repo.lock_shared()
                self.add_chunks_to_shared()
                self.repo.commit_client()
                self.repo.commit_shared()
            self.repo.fs.close()
            self.app.ts.clear()
            self.report_stats()

            logging.info('Backup finished.')
            self.app.dump_memory_profile('at end of backup run')
        except BaseException, e:
            # Release whatever locks we hold, then let the original
            # exception propagate to the caller.
            logging.debug('Handling exception %s' % str(e))
            logging.debug(traceback.format_exc())
            self.unlock_when_error()
            raise

        if self.errors:
            raise obnamlib.Error('There were errors during the backup')
00280 
00281     def unlock_when_error(self):
00282         try:
00283             if self.repo.got_client_lock:
00284                 logging.info('Unlocking client because of error')
00285                 self.repo.unlock_client()
00286             if self.repo.got_shared_lock:
00287                 logging.info('Unlocking shared trees because of error')
00288                 self.repo.unlock_shared()
00289         except BaseException, e2:
00290             logging.debug('Got second exception while unlocking: %s' % 
00291                             str(e2))
00292             logging.debug(traceback.format_exc())
00293 
00294     def add_chunks_to_shared(self):
00295         for chunkid, checksum in self.chunkid_pool:
00296             self.repo.put_chunk_in_shared_trees(chunkid, checksum)
00297         self.chunkid_pool.clear()
00298 
    def add_client(self, client_name):
        '''Add client_name to the repository, if it is not there yet.

        Takes and commits the root lock.  The repository object is then
        re-opened on the same underlying filesystem so the new client
        list is in effect.

        '''
        self.repo.lock_root()
        if client_name not in self.repo.list_clients():
            tracing.trace('adding new client %s' % client_name)
            tracing.trace('client list before adding: %s' % 
                            self.repo.list_clients())
            self.repo.add_client(client_name)
            tracing.trace('client list after adding: %s' % 
                            self.repo.list_clients())
        self.repo.commit_root()
        self.repo = self.app.open_repository(repofs=self.repo.fs.fs)
00310 
00311     def compile_exclusion_patterns(self):
00312         log = self.app.settings['log']
00313         if log:
00314             log = self.app.settings['log']
00315             self.app.settings['exclude'].append(log)
00316         for pattern in self.app.settings['exclude']:
00317             logging.debug('Exclude pattern: %s' % pattern)
00318 
00319         self.exclude_pats = []
00320         for x in self.app.settings['exclude']:
00321             try:
00322                 self.exclude_pats.append(re.compile(x))
00323             except re.error, e:
00324                 msg = 'error compiling regular expression "%s": %s' % (x, e)
00325                 logging.error(msg)
00326                 self.app.ts.error(msg)
00327 
    def backup_roots(self, roots):
        '''Back up every given backup root.

        Also removes from the new generation any old roots that are no
        longer being backed up, makes periodic checkpoints, and removes
        checkpoint generations afterwards unless told not to.

        '''
        # One filesystem object is reused for all roots via reinit().
        self.fs = self.app.fsf.new(roots[0])
        self.fs.connect()

        absroots = []
        for root in roots:
            self.fs.reinit(root)
            absroots.append(self.fs.abspath('.'))
        
        if not self.pretend:
            self.remove_old_roots(absroots)

        # Checkpoint bookkeeping: generation ids of checkpoints made,
        # bytes written at the last checkpoint, and the interval.
        self.checkpoints = []
        self.last_checkpoint = 0
        self.interval = self.app.settings['checkpoint']

        for root in roots:
            logging.info('Backing up root %s' % root)
            self.fs.reinit(root)
            absroot = self.fs.abspath('.')
            # Remembered so can_be_backed_up can compare st_dev for
            # --one-file-system.
            self.root_metadata = self.fs.lstat(absroot)
            for pathname, metadata in self.find_files(absroot):
                logging.debug('Backing up %s' % pathname)
                try:
                    if stat.S_ISDIR(metadata.st_mode):
                        self.backup_dir_contents(pathname)
                    elif stat.S_ISREG(metadata.st_mode):
                        assert metadata.md5 is None
                        metadata.md5 = self.backup_file_contents(pathname,
                                                                 metadata)
                    self.backup_metadata(pathname, metadata)
                except (IOError, OSError), e:
                    # Per-file errors are recorded but the backup goes
                    # on -- except when the disk is full.
                    msg = 'Can\'t back up %s: %s' % (pathname, e.strerror)
                    self.error(msg, e)
                    if e.errno == errno.ENOSPC:
                        raise
                if self.time_for_checkpoint():
                    self.make_checkpoint()

            self.backup_parents('.')

        remove_checkpoints = (not self.errors and
                              not self.app.settings['leave-checkpoints']
                              and not self.pretend)
        if remove_checkpoints:
            self.configure_ttystatus_for_checkpoint_removal()
            for gen in self.checkpoints:
                self.app.ts['checkpoint'] = gen
                self.repo.remove_generation(gen)

        if self.fs:
            self.fs.close()
00380 
00381     def time_for_checkpoint(self):
00382         bytes_since = (self.repo.fs.bytes_written - self.last_checkpoint)
00383         return bytes_since >= self.interval
00384 
    def make_checkpoint(self):
        '''Commit the current generation as a checkpoint and start anew.

        Commits both client and shared trees, then re-opens the
        repository, re-locks the client, and starts a fresh generation
        so the backup can continue.  No-op in pretend mode apart from
        updating the progress display.

        '''
        logging.info('Making checkpoint')
        self.app.ts['what'] = 'making checkpoint'
        self.app.ts.flush()
        if not self.pretend:
            # Record this generation's id so checkpoints can be removed
            # after a successful run.
            self.checkpoints.append(self.repo.new_generation)
            self.backup_parents('.')
            self.repo.lock_shared()
            self.add_chunks_to_shared()
            self.repo.commit_client(checkpoint=True)
            self.repo.commit_shared()
            self.last_checkpoint = self.repo.fs.bytes_written
            # Re-open on the same underlying fs to get a clean state
            # after the commit, then resume with a new generation.
            self.repo = self.app.open_repository(repofs=self.repo.fs.fs)
            self.repo.lock_client(self.app.settings['client-name'])
            self.repo.start_generation()
            self.app.dump_memory_profile('at end of checkpoint')
        self.app.ts['what'] = self.app.ts['current-file']
00402 
    def find_files(self, root):
        '''Find all files and directories that need to be backed up.
        
        This is a generator. It yields (pathname, metadata) pairs.
        
        The caller should not recurse through directories, just backup
        the directory itself (name, metadata, file list).
        
        '''

        for pathname, st in self.fs.scan_tree(root, ok=self.can_be_backed_up):
            tracing.trace('considering %s' % pathname)
            try:
                metadata = obnamlib.read_metadata(self.fs, pathname, st=st)
                self.update_progress_with_file(pathname, metadata)
                if self.needs_backup(pathname, metadata):
                    yield pathname, metadata
            except GeneratorExit:
                # The consumer closed us; must not swallow this.
                raise
            except KeyboardInterrupt:
                logging.error('Keyboard interrupt')
                raise
            except BaseException, e:
                # Any other per-file problem is recorded and the scan
                # continues with the next file.
                msg = 'Cannot back up %s: %s' % (pathname, str(e))
                self.error(msg, e)
00428 
00429     def can_be_backed_up(self, pathname, st):
00430         if self.app.settings['one-file-system']:
00431             if st.st_dev != self.root_metadata.st_dev: 
00432                 logging.info('Excluding (one-file-system): %s' %
00433                              pathname)
00434                 return False
00435 
00436         for pat in self.exclude_pats:
00437             if pat.search(pathname):
00438                 logging.info('Excluding (pattern): %s' % pathname)
00439                 return False
00440 
00441         if stat.S_ISDIR(st.st_mode) and self.app.settings['exclude-caches']:
00442             tag_filename = 'CACHEDIR.TAG'
00443             tag_contents = 'Signature: 8a477f597d28d172789f06886806bc55'
00444             tag_path = os.path.join(pathname, 'CACHEDIR.TAG')
00445             if self.fs.exists(tag_path):
00446                 # Can't use with, because Paramiko's SFTPFile does not work.
00447                 f = self.fs.open(tag_path, 'rb')
00448                 data = f.read(len(tag_contents))
00449                 f.close()
00450                 if data == tag_contents:
00451                     logging.info('Excluding (cache dir): %s' % pathname)
00452                     return False
00453         
00454         return True
00455 
    def needs_backup(self, pathname, current):
        '''Does a given file need to be backed up?'''
        
        # Directories always require backing up so that backup_dir_contents
        # can remove stuff that no longer exists from them.
        if current.isdir():
            return True
        # In pretend mode there is no new generation to compare against,
        # so use the latest committed one (or back up everything if the
        # repository is empty).
        if self.pretend:
            gens = self.repo.list_generations()
            if not gens:
                return True
            gen = gens[-1]
        else:
            gen = self.repo.new_generation
        try:
            old = self.repo.get_metadata(gen, pathname)
        except obnamlib.Error:
            # File does not exist in the previous generation, so it
            # does need to be backed up.
            return True
        # Back up if any interesting metadata field changed.  Note that
        # file content is not compared here; mtime et al stand in for it.
        return (current.st_mtime_sec != old.st_mtime_sec or
                current.st_mtime_nsec != old.st_mtime_nsec or
                current.st_mode != old.st_mode or
                current.st_nlink != old.st_nlink or
                current.st_size != old.st_size or
                current.st_uid != old.st_uid or
                current.st_gid != old.st_gid or
                current.xattr != old.xattr)
00484 
00485     def backup_parents(self, root):
00486         '''Back up parents of root, non-recursively.'''
00487         root = self.fs.abspath(root)
00488         tracing.trace('backing up parents of %s', root)
00489         while True:
00490             parent = os.path.dirname(root)
00491             metadata = obnamlib.read_metadata(self.fs, root)
00492             if not self.pretend:
00493                 self.repo.create(root, metadata)
00494             if root == parent:
00495                 break
00496             root = parent
00497 
00498     def backup_metadata(self, pathname, metadata):
00499         '''Back up metadata for a filesystem object'''
00500         
00501         tracing.trace('backup_metadata: %s', pathname)
00502         if not self.pretend:
00503             self.repo.create(pathname, metadata)
00504 
    def backup_file_contents(self, filename, metadata):
        '''Back up contents of a regular file.

        Returns the checksum digest of the file data, or None in
        pretend mode.  Small files may be stored directly in the
        per-client B-tree; otherwise the file is chunked and uploaded,
        with chunk ids appended in groups and checkpoints possibly made
        mid-file.

        '''
        tracing.trace('backup_file_contents: %s', filename)
        if self.pretend:
            tracing.trace('pretending to upload the whole file')
            self.update_progress_with_upload(metadata.st_size)
            return

        tracing.trace('setting file chunks to empty')
        # NOTE(review): self.pretend is always False at this point, so
        # this guard is redundant; kept as-is.
        if not self.pretend:
            self.repo.set_file_chunks(filename, [])

        tracing.trace('opening file for reading')
        f = self.fs.open(filename, 'r')

        summer = self.repo.new_checksummer()

        # Python 2 integer division: max_intree is node-size // 4.
        max_intree = self.app.settings['node-size'] / 4
        if (metadata.st_size <= max_intree and 
            self.app.settings['small-files-in-btree']):
            contents = f.read()
            assert len(contents) <= max_intree # FIXME: silly error checking
            f.close()
            self.repo.set_file_data(filename, contents)
            summer.update(contents)
            return summer.digest()

        chunk_size = int(self.app.settings['chunk-size'])
        chunkids = []
        while True:
            tracing.trace('reading some data')
            data = f.read(chunk_size)
            if not data:
                tracing.trace('end of data')
                break
            tracing.trace('got %d bytes of data' % len(data))
            summer.update(data)
            if not self.pretend:
                chunkids.append(self.backup_file_chunk(data))
                # Flush chunk ids in groups to bound memory use.
                if len(chunkids) >= self.app.settings['chunkids-per-group']:
                    tracing.trace('adding %d chunkids to file' % len(chunkids))
                    self.repo.append_file_chunks(filename, chunkids)
                    self.app.dump_memory_profile('after appending some '
                                                    'chunkids')
                    chunkids = []
            else:
                self.update_progress_with_upload(len(data))
            
            # Pending chunk ids must be recorded before checkpointing,
            # or they would be lost with the old repository object.
            if not self.pretend and self.time_for_checkpoint():
                logging.debug('making checkpoint in the middle of a file')
                self.repo.append_file_chunks(filename, chunkids)
                chunkids = []
                self.make_checkpoint()
            
        tracing.trace('closing file')
        f.close()
        if chunkids:
            assert not self.pretend
            tracing.trace('adding final %d chunkids to file' % len(chunkids))
            self.repo.append_file_chunks(filename, chunkids)
        self.app.dump_memory_profile('at end of file content backup for %s' %
                                     filename)
        tracing.trace('done backing up file contents')
        return summer.digest()
00569         
    def backup_file_chunk(self, data):
        '''Back up a chunk of data by putting it into the repository.

        Returns the chunk id, de-duplicating according to the
        --deduplicate mode: 'never' always stores, 'verify' compares
        candidate chunks byte-for-byte, 'fatalist' (the default) trusts
        the checksum match.

        '''

        def find():
            # Candidates may be in the repository or still pending in
            # the local pool awaiting upload to the shared trees.
            return (self.repo.find_chunks(checksum) + 
                     self.chunkid_pool.get(checksum))

        def get(chunkid):
            return self.repo.get_chunk(chunkid)

        def put():
            self.update_progress_with_upload(len(data))
            return self.repo.put_chunk_only(data)
            
        def share(chunkid):
            self.chunkid_pool.add(chunkid, checksum)

        checksum = self.repo.checksum(data)

        mode = self.app.settings['deduplicate']
        if mode == 'never':
            return put()
        elif mode == 'verify':
            for chunkid in find():
                data2 = get(chunkid)
                if data == data2:
                    return chunkid
            else:
                # for/else: no candidate matched (the loop exits via
                # return on a match, never via break), so store anew.
                chunkid = put()
                share(chunkid)
                return chunkid
        elif mode == 'fatalist':
            existing = find()
            if existing:
                return existing[0]
            else:
                chunkid = put()
                share(chunkid)
                return chunkid
        else:
            # Unknown mode: warn once, then behave like 'never' but
            # still record the chunk for sharing.
            if not hasattr(self, 'bad_deduplicate_reported'):
                logging.error('unknown --deduplicate setting value')
                self.bad_deduplicate_reported = True
            chunkid = put()
            share(chunkid)
            return chunkid
00616 
00617     def backup_dir_contents(self, root):
00618         '''Back up the list of files in a directory.'''
00619 
00620         tracing.trace('backup_dir: %s', root)
00621         if self.pretend:
00622             return
00623 
00624         new_basenames = self.fs.listdir(root)
00625         old_basenames = self.repo.listdir(self.repo.new_generation, root)
00626 
00627         for old in old_basenames:
00628             pathname = os.path.join(root, old)
00629             if old not in new_basenames:
00630                 self.repo.remove(pathname)
00631         # Files that are created after the previous generation will be
00632         # added to the directory when they are backed up, so we don't
00633         # need to worry about them here.
00634 
    def remove_old_roots(self, new_roots):
        '''Remove from started generation anything that is not a backup root.
        
        We recurse from filesystem root directory until getting to one of 
        the new backup roots, or a directory or file that is not a parent 
        of one of the new backup roots. We remove anything that is not a
        new backup root, or their parent.
        
        '''
        
        def is_parent(pathname):
            # Append os.sep so that e.g. '/home/ab' does not count as a
            # parent of '/home/abc'.
            x = pathname + os.sep
            for new_root in new_roots:
                if new_root.startswith(x):
                    return True
            return False

        def helper(dirname):
            # Walk the generation's directory tree, descending only
            # into parents of the new roots; prune everything else.
            gen_id = self.repo.new_generation
            basenames = self.repo.listdir(gen_id, dirname)
            for basename in basenames:
                pathname = os.path.join(dirname, basename)
                if is_parent(pathname):
                    metadata = self.repo.get_metadata(gen_id, pathname)
                    if metadata.isdir():
                        helper(pathname)
                elif pathname not in new_roots:
                    self.repo.remove(pathname)

        # Never called in pretend mode; it mutates the new generation.
        assert not self.pretend
        helper('/')
00666