Back to index

python-biopython  1.60
test_SeqIO_index.py
Go to the documentation of this file.
00001 # Copyright 2009-2012 by Peter Cock.  All rights reserved.
00002 # This code is part of the Biopython distribution and governed by its
00003 # license.  Please see the LICENSE file that should have been included
00004 # as part of this package.
00005 
00006 """Unit tests for Bio.SeqIO.index(...) and index_db() functions."""
00007 
00008 try:
00009     import sqlite3
00010 except ImportError:
00011     #Try and run what tests we can on Python 2.4 or Jython
00012     #where we don't expect this to be installed.
00013     sqlite3 = None
00014 
00015 import sys
00016 import os
00017 import unittest
00018 import gzip
00019 from StringIO import StringIO
00020 try:
00021     #This is in Python 2.6+, but we need it on Python 3
00022     from io import BytesIO
00023 except ImportError:
00024     BytesIO = StringIO
00025 from Bio._py3k import _as_bytes, _bytes_to_string
00026 
00027 
00028 from Bio.SeqRecord import SeqRecord
00029 from Bio import SeqIO
00030 from Bio.SeqIO._index import _FormatToRandomAccess
00031 from Bio.Alphabet import generic_protein, generic_nucleotide, generic_dna
00032 
00033 from seq_tests_common import compare_record
00034 
def add_prefix(key):
    """Return key with "id_" put in front (dummy key_function for the tests)."""
    prefix = "id_"
    return prefix + key
00038 
def gzip_open(filename, format):
    """Open a gzip compressed file so SeqIO can parse it as the given format.

    On Python 2, or for any format listed in SeqIO._BinaryFormats, the
    handle from gzip.open() is returned directly.  Otherwise the whole
    file is decompressed into memory and a StringIO handle holding the
    decoded text is returned.  Either way the caller must close the
    returned handle.
    """
    #At time of writing, under Python 3.2.2 seems gzip.open(filename, mode)
    #insists on giving byte strings (i.e. binary mode)
    #See http://bugs.python.org/issue13989
    if sys.version_info[0] < 3 or format in SeqIO._BinaryFormats:
        return gzip.open(filename)
    handle = gzip.open(filename)
    try:
        data = handle.read() #bytes!
    finally:
        #Close the gzip handle even if decompression fails part way
        handle.close()
    return StringIO(_bytes_to_string(data))
00049 
00050 
if sqlite3:
    class OldIndexTest(unittest.TestCase):
        """Testing a pre-built index (make sure cross platform etc).

        >>> from Bio import SeqIO
        >>> d = SeqIO.index_db("triple_sff.idx", ["E3MFGYR02_no_manifest.sff", "greek.sff", "paired.sff"], "sff")
        >>> len(d)
        54
        """
        def _load_and_check(self, *args, **kwargs):
            """Open the pre-built index, check it holds 54 records, close it.

            Any extra arguments are passed on to SeqIO.index_db() after
            the index filename.
            """
            d = SeqIO.index_db("Roche/triple_sff.idx", *args, **kwargs)
            try:
                self.assertEqual(54, len(d))
            finally:
                #Release the index even if the assertion fails, using the
                #same close calls as the other index_db tests in this file.
                d.close()
                d._con.close() #hack for PyPy

        def test_old(self):
            """Load existing index with no options."""
            self._load_and_check()

        def test_old_format(self):
            """Load existing index with correct format."""
            self._load_and_check(format="sff")

        def test_old_format_wrong(self):
            """Load existing index with wrong format."""
            self.assertRaises(ValueError, SeqIO.index_db,
                              "Roche/triple_sff.idx", format="fasta")

        def test_old_files(self):
            """Load existing index with correct files."""
            self._load_and_check(["E3MFGYR02_no_manifest.sff",
                                  "greek.sff", "paired.sff"])

        def test_old_files_wrong(self):
            """Load existing index with wrong files."""
            self.assertRaises(ValueError, SeqIO.index_db,
                              "Roche/triple_sff.idx", ["a.sff", "b.sff", "c.sff"])

        def test_old_files_wrong2(self):
            """Load existing index with wrong number of files."""
            self.assertRaises(ValueError, SeqIO.index_db,
                              "Roche/triple_sff.idx",
                              ["E3MFGYR02_no_manifest.sff", "greek.sff"])
00091 
00092 
class IndexDictTests(unittest.TestCase):
    """Cunning unit test where methods are added at run time.

    The simple_check, key_check and get_raw_check methods are not tests
    themselves; module level code further down this file attaches one
    test_* wrapper per example file with setattr().  Only the
    test_duplicates_* methods are written out as ordinary tests.
    """
    def _parse_ids(self, filename, format, alphabet, comp):
        """Return the record ids found by SeqIO.parse(), in file order.

        If comp is true the file is opened via gzip_open() first,
        otherwise the filename is given to SeqIO.parse() directly.
        """
        if not comp:
            return [rec.id for rec in SeqIO.parse(filename, format, alphabet)]
        h = gzip_open(filename, format)
        try:
            return [rec.id for rec in SeqIO.parse(h, format, alphabet)]
        finally:
            h.close()

    def _check_disk_index(self, rec_dict, keys, ids):
        """Run check_dict_methods() on an on-disk index, then close it."""
        try:
            self.check_dict_methods(rec_dict, keys, ids)
        finally:
            rec_dict.close()
            rec_dict._con.close() #hack for PyPy

    def simple_check(self, filename, format, alphabet, comp):
        """Check indexing (without a key function)."""
        id_list = self._parse_ids(filename, format, alphabet, comp)

        rec_dict = SeqIO.index(filename, format, alphabet)
        try:
            self.check_dict_methods(rec_dict, id_list, id_list)
        finally:
            rec_dict._proxy._handle.close() #TODO - Better solution
        del rec_dict

        if not sqlite3:
            return

        #In memory,
        #note here give filenames as list of strings
        rec_dict = SeqIO.index_db(":memory:", [filename], format,
                                  alphabet)
        try:
            self.check_dict_methods(rec_dict, id_list, id_list)
        finally:
            rec_dict.close()
        del rec_dict

        #check error conditions
        self.assertRaises(ValueError, SeqIO.index_db,
                          ":memory:", format="dummy")
        self.assertRaises(ValueError, SeqIO.index_db,
                          ":memory:", filenames=["dummy"])

        #Saving to file...
        index_tmp = filename + ".idx"
        if os.path.isfile(index_tmp):
            os.remove(index_tmp)

        #To disk,
        #note here we give the filename as a single string
        #to confirm that works too (convenience feature).
        self._check_disk_index(SeqIO.index_db(index_tmp, filename, format,
                                              alphabet),
                               id_list, id_list)

        #Now reload it...
        self._check_disk_index(SeqIO.index_db(index_tmp, [filename], format,
                                              alphabet),
                               id_list, id_list)

        #Now reload without passing filenames and format
        self._check_disk_index(SeqIO.index_db(index_tmp, alphabet=alphabet),
                               id_list, id_list)
        os.remove(index_tmp)

    def key_check(self, filename, format, alphabet, comp):
        """Check indexing with a key function."""
        id_list = self._parse_ids(filename, format, alphabet, comp)
        key_list = [add_prefix(rec_id) for rec_id in id_list]

        rec_dict = SeqIO.index(filename, format, alphabet, add_prefix)
        try:
            self.check_dict_methods(rec_dict, key_list, id_list)
        finally:
            rec_dict._proxy._handle.close() #TODO - Better solution
        del rec_dict

        if not sqlite3:
            return

        #In memory,
        rec_dict = SeqIO.index_db(":memory:", [filename], format, alphabet,
                                  add_prefix)
        try:
            self.check_dict_methods(rec_dict, key_list, id_list)
            #check error conditions
            self.assertRaises(ValueError, SeqIO.index_db,
                              ":memory:", format="dummy",
                              key_function=add_prefix)
            self.assertRaises(ValueError, SeqIO.index_db,
                              ":memory:", filenames=["dummy"],
                              key_function=add_prefix)
        finally:
            rec_dict.close()
        del rec_dict

        #Saving to file...
        index_tmp = filename + ".key.idx"
        if os.path.isfile(index_tmp):
            os.remove(index_tmp)
        self._check_disk_index(SeqIO.index_db(index_tmp, [filename], format,
                                              alphabet, add_prefix),
                               key_list, id_list)

        #Now reload it...
        self._check_disk_index(SeqIO.index_db(index_tmp, [filename], format,
                                              alphabet, add_prefix),
                               key_list, id_list)

        #Now reload without passing filenames and format
        self._check_disk_index(SeqIO.index_db(index_tmp, alphabet=alphabet,
                                              key_function=add_prefix),
                               key_list, id_list)
        os.remove(index_tmp)
        #Done

    def check_dict_methods(self, rec_dict, keys, ids):
        """Check rec_dict acts as a read only dictionary of SeqRecords.

        Arguments:
         - rec_dict - the index object under test.
         - keys - the expected dictionary keys.
         - ids - the expected record ids, in the same order as keys.
        """
        self.assertEqual(set(keys), set(rec_dict.keys()))
        #This is redundant, I just want to make sure len works:
        self.assertEqual(len(keys), len(rec_dict))
        #Make sure boolean evaluation works
        self.assertEqual(bool(keys), bool(rec_dict))
        for key, rec_id in zip(keys, ids):
            self.assertTrue(key in rec_dict)
            self.assertEqual(rec_id, rec_dict[key].id)
            self.assertEqual(rec_id, rec_dict.get(key).id)
        #Check non-existent keys,
        self.assertTrue(chr(0) not in keys, "Bad example in test")
        self.assertRaises(KeyError, rec_dict.__getitem__, chr(0))
        self.assertEqual(rec_dict.get(chr(0)), None)
        self.assertEqual(rec_dict.get(chr(0), chr(1)), chr(1))
        if hasattr(dict, "iteritems"):
            #Python 2.x
            for key, rec in rec_dict.iteritems():
                self.assertTrue(key in keys)
                self.assertTrue(isinstance(rec, SeqRecord))
                self.assertTrue(rec.id in ids)
            #Now check non-defined methods...
            self.assertRaises(NotImplementedError, rec_dict.items)
            self.assertRaises(NotImplementedError, rec_dict.values)
        else:
            #Python 3
            #NOTE(review): this branch calls iteritems() and itervalues()
            #straight after asserting that iteritems is missing, so it
            #presumably relies on 2to3 rewriting those calls as items()
            #and values() - keep the Python 2 spellings here.
            assert not hasattr(rec_dict, "iteritems")
            for key, rec in rec_dict.iteritems():
                self.assertTrue(key in keys)
                self.assertTrue(isinstance(rec, SeqRecord))
                self.assertTrue(rec.id in ids)
            for rec in rec_dict.itervalues():
                #No key is available when iterating over the values only
                self.assertTrue(isinstance(rec, SeqRecord))
                self.assertTrue(rec.id in ids)
        #Check the following fail
        self.assertRaises(NotImplementedError, rec_dict.popitem)
        self.assertRaises(NotImplementedError, rec_dict.pop, chr(0))
        self.assertRaises(NotImplementedError, rec_dict.pop, chr(0), chr(1))
        self.assertRaises(NotImplementedError, rec_dict.clear)
        self.assertRaises(NotImplementedError, rec_dict.__setitem__, "X", None)
        self.assertRaises(NotImplementedError, rec_dict.copy)
        self.assertRaises(NotImplementedError, rec_dict.fromkeys, [])

    def _parse_raw(self, rec_dict, raw, format, alphabet):
        """Parse the bytes given by get_raw() back into a single record."""
        #Following isn't very elegant, but it lets me test the
        #__getitem__ SFF code is working.
        if format in SeqIO._BinaryFormats:
            handle = BytesIO(raw)
        else:
            handle = StringIO(_bytes_to_string(raw))
        if format in ("sff", "sff-trim"):
            #A raw SFF read needs the header details held by the proxy
            proxy = rec_dict._proxy
            return SeqIO.SffIO._sff_read_seq_record(handle,
                        proxy._flows_per_read,
                        proxy._flow_chars,
                        proxy._key_sequence,
                        proxy._alphabet,
                        trim=(format == "sff-trim"))
        if format == "uniprot-xml":
            self.assertTrue(raw.startswith(_as_bytes("<entry ")))
            self.assertTrue(raw.endswith(_as_bytes("</entry>")))
            #Currently the __getitem__ method uses this
            #trick too, but we hope to fix that later
            raw = """<?xml version='1.0' encoding='UTF-8'?>
                <uniprot xmlns="http://uniprot.org/uniprot"
                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                xsi:schemaLocation="http://uniprot.org/uniprot
                http://www.uniprot.org/support/docs/uniprot.xsd">
                %s
                </uniprot>
                """ % _bytes_to_string(raw)
            handle = StringIO(raw)
        return SeqIO.read(handle, format, alphabet)

    def get_raw_check(self, filename, format, alphabet, comp):
        """Check get_raw() gives data found in the file which re-parses.

        For every record the raw bytes must be non-blank, must occur in
        the (decompressed) file contents, and must parse into a record
        matching the one from the index according to compare_record().
        """
        #Also checking the key_function here
        if comp:
            h = gzip.open(filename, "rb")
        else:
            h = open(filename, "rb")
        try:
            raw_file = h.read()
        finally:
            h.close()
        id_list = [rec_id.lower() for rec_id in
                   self._parse_ids(filename, format, alphabet, comp)]
        rec_dict = SeqIO.index(filename, format, alphabet,
                               key_function = lambda x : x.lower())
        try:
            self.assertEqual(set(id_list), set(rec_dict.keys()))
            self.assertEqual(len(id_list), len(rec_dict))
            for key in id_list:
                self.assertTrue(key in rec_dict)
                self.assertEqual(key, rec_dict[key].id.lower())
                self.assertEqual(key, rec_dict.get(key).id.lower())
                raw = rec_dict.get_raw(key)
                self.assertTrue(raw.strip())
                self.assertTrue(raw in raw_file)
                rec1 = rec_dict[key]
                rec2 = self._parse_raw(rec_dict, raw, format, alphabet)
                self.assertEqual(True, compare_record(rec1, rec2))
        finally:
            rec_dict._proxy._handle.close() #TODO - Better solution
        del rec_dict

    if sqlite3:
        def test_duplicates_index_db(self):
            """Index file with duplicate identifiers with Bio.SeqIO.index_db()"""
            self.assertRaises(ValueError, SeqIO.index_db, ":memory:",
                              ["Fasta/dups.fasta"], "fasta")

    def test_duplicates_index(self):
        """Index file with duplicate identifiers with Bio.SeqIO.index()"""
        self.assertRaises(ValueError, SeqIO.index, "Fasta/dups.fasta", "fasta")

    def test_duplicates_to_dict(self):
        """Index file with duplicate identifiers with Bio.SeqIO.to_dict()"""
        handle = open("Fasta/dups.fasta", "rU")
        try:
            iterator = SeqIO.parse(handle, "fasta")
            self.assertRaises(ValueError, SeqIO.to_dict, iterator)
        finally:
            #Close the file even if the assertion fails
            handle.close()
00351 
#Example files used to generate the IndexDictTests test methods.
#Each entry is a tuple of (filename, format, alphabet), where the
#filename is a relative path and the alphabet may be None.
tests = [
    ("Ace/contig1.ace", "ace", generic_dna),
    ("Ace/consed_sample.ace", "ace", None),
    ("Ace/seq.cap.ace", "ace", generic_dna),
    ("Quality/wrapping_original_sanger.fastq", "fastq", None),
    ("Quality/example.fastq", "fastq", None),
    ("Quality/example.fastq", "fastq-sanger", generic_dna),
    ("Quality/tricky.fastq", "fastq", generic_nucleotide),
    ("Quality/sanger_faked.fastq", "fastq-sanger", generic_dna),
    ("Quality/solexa_faked.fastq", "fastq-solexa", generic_dna),
    ("Quality/illumina_faked.fastq", "fastq-illumina", generic_dna),
    ("EMBL/epo_prt_selection.embl", "embl", None),
    ("EMBL/U87107.embl", "embl", None),
    ("EMBL/TRBG361.embl", "embl", None),
    ("EMBL/A04195.imgt", "embl", None), #Not a proper EMBL file, an IMGT file
    ("EMBL/A04195.imgt", "imgt", None),
    ("GenBank/NC_000932.faa", "fasta", generic_protein),
    ("GenBank/NC_005816.faa", "fasta", generic_protein),
    ("GenBank/NC_005816.tsv", "tab", generic_protein),
    ("GenBank/NC_005816.ffn", "fasta", generic_dna),
    ("GenBank/NC_005816.fna", "fasta", generic_dna),
    ("GenBank/NC_005816.gb", "gb", None),
    ("GenBank/cor6_6.gb", "genbank", None),
    ("IntelliGenetics/vpu_nucaligned.txt", "ig", generic_nucleotide),
    ("IntelliGenetics/TAT_mase_nuc.txt", "ig", None),
    ("IntelliGenetics/VIF_mase-pro.txt", "ig", generic_protein),
    ("Phd/phd1", "phd", generic_dna),
    ("Phd/phd2", "phd", None),
    ("Phd/phd_solexa", "phd", generic_dna),
    ("Phd/phd_454", "phd", generic_dna),
    ("NBRF/B_nuc.pir", "pir", generic_nucleotide),
    ("NBRF/Cw_prot.pir", "pir", generic_protein),
    ("NBRF/clustalw.pir", "pir", None),
    ("SwissProt/sp001", "swiss", None),
    ("SwissProt/sp010", "swiss", None),
    ("SwissProt/sp016", "swiss", None),
    ("SwissProt/multi_ex.txt", "swiss", None),
    ("SwissProt/multi_ex.xml", "uniprot-xml", None),
    ("SwissProt/multi_ex.fasta", "fasta", None),
    ("Roche/E3MFGYR02_random_10_reads.sff", "sff", generic_dna),
    ("Roche/E3MFGYR02_random_10_reads.sff", "sff-trim", generic_dna),
    ("Roche/E3MFGYR02_index_at_start.sff", "sff", generic_dna),
    ("Roche/E3MFGYR02_index_in_middle.sff", "sff", generic_dna),
    ("Roche/E3MFGYR02_alt_index_at_start.sff", "sff", generic_dna),
    ("Roche/E3MFGYR02_alt_index_in_middle.sff", "sff", generic_dna),
    ("Roche/E3MFGYR02_alt_index_at_end.sff", "sff", generic_dna),
    ("Roche/E3MFGYR02_no_manifest.sff", "sff", generic_dna),
    ("Roche/greek.sff", "sff", generic_nucleotide),
    ("Roche/greek.sff", "sff-trim", generic_nucleotide),
    ("Roche/paired.sff", "sff", None),
    ("Roche/paired.sff", "sff-trim", None),
    ]
#Attach three generated test methods to IndexDictTests for each example
#file (and for its ".bgz" compressed twin, if that file exists).
def _make_check(check_name, doc_template, fn, fmt, alpha, c):
    """Return a test method calling the named IndexDictTests check.

    The arguments are bound here, at call time, so each generated method
    keeps its own filename, format, alphabet and compression setting
    rather than whatever the loop variables hold when the test runs.
    """
    f = lambda x : getattr(x, check_name)(fn, fmt, alpha, c)
    f.__doc__ = doc_template % (fmt, fn)
    return f

#Tuples of (test name suffix, check method name, docstring template)
_generated_checks = [
    ("simple", "simple_check", "Index %s file %s defaults"),
    ("keyf", "key_check", "Index %s file %s with key function"),
    ("get_raw", "get_raw_check", "Index %s file %s get_raw"),
    ]

for filename, format, alphabet in tests:
    assert format in _FormatToRandomAccess
    tasks = [(filename, None)]
    if os.path.isfile(filename + ".bgz"):
        tasks.append((filename + ".bgz","bgzf"))
    #Use a separate name for the inner loop so that the outer loop's
    #filename is not shadowed.
    for task_filename, comp in tasks:
        safe_name = task_filename.replace("/","_").replace(".","_")
        for suffix, check_name, doc_template in _generated_checks:
            setattr(IndexDictTests,
                    "test_%s_%s_%s" % (format, safe_name, suffix),
                    _make_check(check_name, doc_template,
                                task_filename, format, alphabet, comp))

#Tidy up the helpers, they are only needed while generating the tests
del _make_check
del _generated_checks
00437 
if __name__ == "__main__":
    #Run every test in this module using a verbose text runner
    unittest.main(testRunner=unittest.TextTestRunner(verbosity=2))