Back to index

python-biopython  1.60
test_bgzf.py
Go to the documentation of this file.
00001 # Copyright 2010-2011 by Peter Cock.
00002 # All rights reserved.
00003 # This code is part of the Biopython distribution and governed by its
00004 # license.  Please see the LICENSE file that should have been included
00005 # as part of this package.
00006 """Test code for working with BGZF files (used in BAM files).
00007 
00008 See also the doctests in bgzf.py which are called via run_tests.py
00009 """
00010 
00011 import unittest
00012 import gzip
00013 import os
00014 from random import shuffle
00015 
00016 from Bio._py3k import _as_bytes, _as_string
00017 _empty_bytes_string = _as_bytes("")
00018 
00019 from Bio import bgzf
00020 
00021 class BgzfTests(unittest.TestCase):
00022     def setUp(self):
00023         self.temp_file = "temp.bgzf"
00024         if os.path.isfile(self.temp_file):
00025             os.remove(self.temp_file)
00026 
00027     def tearDown(self):
00028         if os.path.isfile(self.temp_file):
00029             os.remove(self.temp_file)
00030 
00031     def rewrite(self, compressed_input_file, output_file):
00032         h = gzip.open(compressed_input_file, "rb")
00033         data = h.read()
00034         h.close()
00035 
00036         h = bgzf.BgzfWriter(output_file, "wb")
00037         h.write(data)
00038         h.close() #Gives empty BGZF block as BAM EOF marker
00039 
00040         h = gzip.open(output_file)
00041         new_data = h.read()
00042         h.close()
00043 
00044         #Check the decompressed files agree
00045         self.assert_(new_data, "Empty BGZF file?")
00046         self.assertEqual(len(data), len(new_data))
00047         self.assertEqual(data, new_data)
00048 
00049     def check_blocks(self, old_file, new_file):
00050         h = open(old_file, "rb")
00051         old = list(bgzf.BgzfBlocks(h))
00052         h.close()
00053         h = open(new_file, "rb")
00054         new = list(bgzf.BgzfBlocks(h))
00055         h.close()
00056         self.assertEqual(len(old), len(new))
00057         self.assertEqual(old, new)
00058 
00059     def check_text(self, old_file, new_file):
00060         h = open(old_file) #text mode!
00061         old_line = h.readline()
00062         old = old_line + h.read()
00063         h.close()
00064 
00065         h = bgzf.BgzfReader(new_file, "r") #Text mode!
00066         new_line = h.readline()
00067         new = new_line + h.read(len(old))
00068         h.close()
00069 
00070         self.assertEqual(old_line, new_line)
00071         self.assertEqual(len(old), len(new))
00072         self.assertEqual(old, new)
00073 
00074     def check_by_line(self, old_file, new_file, old_gzip=False):
00075         for mode in ["r", "rb"]:
00076             if old_gzip:
00077                 h = gzip.open(old_file, mode)
00078             else:
00079                 h = open(old_file, mode)
00080             old = h.read()
00081             #Seems gzip can return bytes even if mode="r",
00082             #perhaps a bug in Python 3.2?
00083             if "b" in mode:
00084                 old = _as_bytes(old)
00085             else:
00086                 old = _as_string(old)
00087             h.close()
00088 
00089             for cache in [1,10]:
00090                 h = bgzf.BgzfReader(new_file, mode, max_cache=cache)
00091                 if "b" in mode:
00092                     new = _empty_bytes_string.join(line for line in h)
00093                 else:
00094                     new = "".join(line for line in h)
00095                 h.close()
00096 
00097                 self.assertEqual(len(old), len(new))
00098                 self.assertEqual(old[:10], new[:10], \
00099                                  "%r vs %r, mode %r" % (old[:10], new[:10], mode))
00100                 self.assertEqual(old, new)
00101 
00102     def check_by_char(self, old_file, new_file, old_gzip=False):
00103         for mode in ["r", "rb"]:
00104             if old_gzip:
00105                 h = gzip.open(old_file,mode)
00106             else:
00107                 h = open(old_file, mode)
00108             old = h.read()
00109             #Seems gzip can return bytes even if mode="r",
00110             #perhaps a bug in Python 3.2?
00111             if "b" in mode:
00112                 old = _as_bytes(old)
00113             else:
00114                 old = _as_string(old)
00115             h.close()
00116 
00117             for cache in [1,10]:
00118                 h = bgzf.BgzfReader(new_file, mode, max_cache=cache)
00119                 temp = []
00120                 while True:
00121                     char = h.read(1)
00122                     if not char: break
00123                     temp.append(char)
00124                 if "b" in mode:
00125                     new = _empty_bytes_string.join(temp)
00126                 else:
00127                     new = "".join(temp)
00128                 del temp
00129                 h.close()
00130 
00131                 self.assertEqual(len(old), len(new))
00132                 #If bytes vs unicode mismatch, give a short error message:
00133                 self.assertEqual(old[:10], new[:10], \
00134                                  "%r vs %r, mode %r" % (old[:10], new[:10], mode))
00135                 self.assertEqual(old, new)
00136 
00137     def check_random(self, filename):
00138         """Check BGZF random access by reading blocks in forward & reverse order"""
00139         h = gzip.open(filename, "rb")
00140         old = h.read()
00141         h.close()
00142 
00143         h = open(filename, "rb")
00144         blocks = list(bgzf.BgzfBlocks(h))
00145         h.close()
00146 
00147         #Forward
00148         new = _empty_bytes_string
00149         h = bgzf.BgzfReader(filename, "rb")
00150         for start, raw_len, data_start, data_len in blocks:
00151             #print start, raw_len, data_start, data_len
00152             h.seek(bgzf.make_virtual_offset(start,0))
00153             data = h.read(data_len)
00154             self.assertEqual(len(data), data_len)
00155             #self.assertEqual(start + raw_len, h._handle.tell())
00156             self.assertEqual(len(new), data_start)
00157             new += data
00158         h.close()
00159         self.assertEqual(len(old), len(new))
00160         self.assertEqual(old, new)
00161 
00162         #Reverse
00163         new = _empty_bytes_string
00164         h = bgzf.BgzfReader(filename, "rb")
00165         for start, raw_len, data_start, data_len in blocks[::-1]:
00166             #print start, raw_len, data_start, data_len
00167             h.seek(bgzf.make_virtual_offset(start,0))
00168             data = h.read(data_len)
00169             self.assertEqual(len(data), data_len)
00170             #self.assertEqual(start + raw_len, h._handle.tell())
00171             new = data + new
00172         h.close()
00173         self.assertEqual(len(old), len(new))
00174         self.assertEqual(old, new)
00175 
00176         #Jump back - non-sequential seeking
00177         if len(blocks) >= 3:
00178             h = bgzf.BgzfReader(filename, "rb", max_cache = 1)
00179             #Seek to a late block in the file,
00180             #half way into the third last block
00181             start, raw_len, data_start, data_len = blocks[-3]
00182             voffset = bgzf.make_virtual_offset(start, data_len // 2)
00183             h.seek(voffset)
00184             self.assertEqual(voffset, h.tell())
00185             data = h.read(1000)
00186             self.assertTrue(data in old)
00187             self.assertEqual(old.find(data), data_start + data_len // 2)
00188             #Now seek to an early block in the file,
00189             #half way into the second block
00190             start, raw_len, data_start, data_len = blocks[1]
00191             h.seek(bgzf.make_virtual_offset(start, data_len // 2))
00192             voffset = bgzf.make_virtual_offset(start, data_len // 2)
00193             h.seek(voffset)
00194             self.assertEqual(voffset, h.tell())
00195             #Now read all rest of this block and start of next block
00196             data = h.read(data_len + 1000)
00197             self.assertTrue(data in old)
00198             self.assertEqual(old.find(data), data_start + data_len // 2)
00199             h.close()
00200 
00201         #Check seek/tell at block boundaries
00202         v_offsets = []
00203         for start, raw_len, data_start, data_len in blocks:
00204             for within_offset in [0, 1, data_len // 2, data_len - 1]:
00205                 if within_offset < 0 or data_len <= within_offset:
00206                     continue
00207                 voffset = bgzf.make_virtual_offset(start, within_offset)
00208                 real_offset = data_start + within_offset
00209                 v_offsets.append((voffset, real_offset))
00210         shuffle(v_offsets)
00211         h = bgzf.BgzfReader(filename, "rb", max_cache = 1)
00212         for voffset, real_offset in v_offsets:
00213             h.seek(0)
00214             assert voffset >= 0 and real_offset >= 0
00215             self.assertEqual(h.read(real_offset), old[:real_offset])
00216             self.assertEqual(h.tell(), voffset)
00217         for voffset, real_offset in v_offsets:
00218             h.seek(voffset)
00219             self.assertEqual(h.tell(), voffset)
00220         h.close()
00221 
00222 
00223     def test_random_bam_ex1(self):
00224         """Check random access to SamBam/ex1.bam"""
00225         self.check_random("SamBam/ex1.bam")
00226 
00227     def test_random_bam_ex1_refresh(self):
00228         """Check random access to SamBam/ex1_refresh.bam"""
00229         self.check_random("SamBam/ex1_refresh.bam")
00230 
00231     def test_random_bam_ex1_header(self):
00232         """Check random access to SamBam/ex1_header.bam"""
00233         self.check_random("SamBam/ex1_header.bam")
00234 
00235     def test_random_example_fastq(self):
00236         """Check random access to Quality/example.fastq.bgz"""
00237         self.check_random("Quality/example.fastq.bgz")
00238 
00239     def test_text_example_fastq(self):
00240         """Check text mode access to Quality/example.fastq.bgz"""
00241         self.check_text("Quality/example.fastq", "Quality/example.fastq.bgz")
00242 
00243     def test_iter_example_fastq(self):
00244         """Check iteration over Quality/example.fastq.bgz"""
00245         self.check_by_line("Quality/example.fastq", "Quality/example.fastq.bgz")
00246         self.check_by_char("Quality/example.fastq", "Quality/example.fastq.bgz")
00247 
00248     def test_iter_example_gb(self):
00249         """Check iteration over GenBank/NC_000932.gb.bgz"""
00250         self.check_by_line("GenBank/NC_000932.gb", "GenBank/NC_000932.gb.bgz")
00251         self.check_by_char("GenBank/NC_000932.gb", "GenBank/NC_000932.gb.bgz")
00252 
00253     def test_bam_ex1(self):
00254         """Reproduce BGZF compression for BAM file"""
00255         temp_file = self.temp_file
00256 
00257         #Note this example is from an old version of samtools
00258         #and all the blocks are full (except the last one)
00259         self.rewrite("SamBam/ex1.bam", temp_file)
00260 
00261         #Now check the blocks agree (using the fact that
00262         #this example BAM file has simple block usage)
00263         self.check_blocks("SamBam/ex1.bam", temp_file)
00264 
00265     def test_iter_bam_ex1(self):
00266         """Check iteration over SamBam/ex1.bam"""
00267         self.check_by_char("SamBam/ex1.bam", "SamBam/ex1.bam", True)
00268 
00269     def test_example_fastq(self):
00270         """Reproduce BGZF compression for a FASTQ file"""
00271         temp_file = self.temp_file
00272         self.rewrite("Quality/example.fastq.gz", temp_file)
00273         self.check_blocks("Quality/example.fastq.bgz", temp_file)
00274 
00275 if __name__ == "__main__":
00276     runner = unittest.TextTestRunner(verbosity = 2)
00277     unittest.main(testRunner=runner)