Back to index

python-biopython  1.60
test_SeqIO_QualityIO.py
Go to the documentation of this file.
00001 # Copyright 2009-2010 by Peter Cock.  All rights reserved.
00002 # This code is part of the Biopython distribution and governed by its
00003 # license.  Please see the LICENSE file that should have been included
00004 # as part of this package.
00005 
00006 """Additional unit tests for Bio.SeqIO.QualityIO (covering FASTQ and QUAL)."""
00007 import os
00008 import unittest
00009 import warnings
00010 
00011 from StringIO import StringIO
00012 try:
00013     #This is in Python 2.6+, but we need it on Python 3
00014     from io import BytesIO
00015 except ImportError:
00016     BytesIO = StringIO
00017 
00018 from Bio import BiopythonWarning
00019 from Bio.Alphabet import generic_dna
00020 from Bio.SeqIO import QualityIO
00021 from Bio import SeqIO
00022 from Bio.Seq import Seq, UnknownSeq, MutableSeq
00023 from Bio.SeqRecord import SeqRecord
00024 from Bio.Data.IUPACData import ambiguous_dna_letters, ambiguous_rna_letters
00025 
00026 BINARY_FORMATS = ["sff", "sff-trim"]
00027 
00028 def truncation_expected(format):
00029     if format in ["fastq-solexa", "fastq-illumina"] :
00030         return 62
00031     elif format in ["fastq", "fastq-sanger"]:
00032         return 93
00033     else:
00034         assert format in ["fasta", "qual", "phd", "sff"]
00035         return None
00036 
00037 #Top level function as this makes it easier to use for debugging:
00038 def write_read(filename, in_format, out_format):
00039     if in_format in BINARY_FORMATS:
00040         mode = "rb"
00041     else:
00042         mode = "r"
00043     records = list(SeqIO.parse(open(filename, mode),in_format))
00044     #Write it out...
00045     if out_format in BINARY_FORMATS:
00046         handle = BytesIO()
00047     else :
00048         handle = StringIO()
00049     SeqIO.write(records, handle, out_format)
00050     handle.seek(0)
00051     #Now load it back and check it agrees,
00052     records2 = list(SeqIO.parse(handle,out_format))
00053     compare_records(records, records2, truncation_expected(out_format))
00054 
00055 def compare_record(old, new, truncate=None):
00056     """Quality aware SeqRecord comparision.
00057 
00058     This will check the mapping between Solexa and PHRED scores.
00059     It knows to ignore UnknownSeq objects for string matching (i.e. QUAL files).
00060     """
00061     if old.id != new.id:
00062         raise ValueError("'%s' vs '%s' " % (old.id, new.id))
00063     if old.description != new.description \
00064     and (old.id+" "+old.description).strip() != new.description:
00065         raise ValueError("'%s' vs '%s' " % (old.description, new.description))
00066     if len(old.seq) != len(new.seq):
00067         raise ValueError("%i vs %i" % (len(old.seq), len(new.seq)))
00068     if isinstance(old.seq, UnknownSeq) or isinstance(new.seq, UnknownSeq):
00069         pass
00070     elif str(old.seq) != str(new.seq):
00071         if len(old.seq) < 200:
00072             raise ValueError("'%s' vs '%s'" % (old.seq, new.seq))
00073         else:
00074             raise ValueError("'%s...' vs '%s...'" % (old.seq[:100], new.seq[:100]))
00075     if "phred_quality" in old.letter_annotations \
00076     and "phred_quality" in new.letter_annotations \
00077     and old.letter_annotations["phred_quality"] != new.letter_annotations["phred_quality"]:
00078         if truncate and [min(q,truncate) for q in old.letter_annotations["phred_quality"]] == \
00079                         [min(q,truncate) for q in new.letter_annotations["phred_quality"]]:
00080             pass
00081         else:
00082             raise ValuerError("Mismatch in phred_quality")
00083     if "solexa_quality" in old.letter_annotations \
00084     and "solexa_quality" in new.letter_annotations \
00085     and old.letter_annotations["solexa_quality"] != new.letter_annotations["solexa_quality"]:
00086         if truncate and [min(q,truncate) for q in old.letter_annotations["solexa_quality"]] == \
00087                         [min(q,truncate) for q in new.letter_annotations["solexa_quality"]]:
00088             pass
00089         else:
00090             raise ValueError("Mismatch in phred_quality")
00091     if "phred_quality" in old.letter_annotations \
00092     and "solexa_quality" in new.letter_annotations:
00093         #Mapping from Solexa to PHRED is lossy, but so is PHRED to Solexa.
00094         #Assume "old" is the original, and "new" has been converted.
00095         converted = [round(QualityIO.solexa_quality_from_phred(q)) \
00096                      for q in old.letter_annotations["phred_quality"]]
00097         if truncate:
00098             converted = [min(q,truncate) for q in converted]
00099         if converted != new.letter_annotations["solexa_quality"]:
00100             print
00101             print old.letter_annotations["phred_quality"]
00102             print converted
00103             print new.letter_annotations["solexa_quality"]
00104             raise ValueError("Mismatch in phred_quality vs solexa_quality")
00105     if "solexa_quality" in old.letter_annotations \
00106     and "phred_quality" in new.letter_annotations:
00107         #Mapping from Solexa to PHRED is lossy, but so is PHRED to Solexa.
00108         #Assume "old" is the original, and "new" has been converted.
00109         converted = [round(QualityIO.phred_quality_from_solexa(q)) \
00110                      for q in old.letter_annotations["solexa_quality"]]
00111         if truncate:
00112             converted = [min(q,truncate) for q in converted]
00113         if converted != new.letter_annotations["phred_quality"]:
00114             print old.letter_annotations["solexa_quality"]
00115             print converted
00116             print new.letter_annotations["phred_quality"]
00117             raise ValueError("Mismatch in solexa_quality vs phred_quality")
00118     return True
00119 
00120 def compare_records(old_list, new_list, truncate_qual=None):
00121     """Check two lists of SeqRecords agree, raises a ValueError if mismatch."""
00122     if len(old_list) != len(new_list):
00123         raise ValueError("%i vs %i records" % (len(old_list), len(new_list)))
00124     for old, new in zip(old_list, new_list):
00125         if not compare_record(old,new,truncate_qual):
00126             return False
00127     return True
00128 
00129 
00130 class TestFastqErrors(unittest.TestCase):
00131     """Test reject invalid FASTQ files."""
00132     def check_fails(self, filename, good_count, formats=None, raw=True):
00133         if not formats:
00134             formats = ["fastq-sanger", "fastq-solexa", "fastq-illumina"]
00135         for format in formats:
00136             handle = open(filename, "rU")
00137             records = SeqIO.parse(handle, format)
00138             for i in range(good_count):
00139                 record = records.next() #Make sure no errors!
00140                 self.assertTrue(isinstance(record, SeqRecord))
00141             self.assertRaises(ValueError, records.next)
00142             handle.close()
00143 
00144     def check_general_fails(self, filename, good_count):
00145         handle = open(filename, "rU")
00146         tuples = QualityIO.FastqGeneralIterator(handle)
00147         for i in range(good_count):
00148             title, seq, qual = tuples.next() #Make sure no errors!
00149         self.assertRaises(ValueError, tuples.next)
00150         handle.close()
00151 
00152     def check_general_passes(self, filename, record_count):
00153         handle = open(filename, "rU")
00154         tuples = QualityIO.FastqGeneralIterator(handle)
00155         #This "raw" parser doesn't check the ASCII characters which means
00156         #certain invalid FASTQ files will get parsed without errors.
00157         count = 0
00158         for title, seq, qual in tuples:
00159             self.assertEqual(len(seq), len(qual))
00160             count += 1
00161         self.assertEqual(count, record_count)
00162         handle.close()
00163 
00164     def check_all_fail(self, filename, count):
00165         self.check_fails(filename, count)
00166         self.check_general_fails(filename, count)
00167 
00168     def check_qual_char(self, filename, good_count, count):
00169         self.check_fails(filename, good_count)
00170         self.check_general_passes(filename, count)
00171 
00172 #Now add methods at run time... these FASTQ files will be rejected
00173 #by both the low level parser AND the high level SeqRecord parser:
00174 tests = [("diff_ids", 2),
00175          ("no_qual", 0),
00176          ("long_qual", 3),
00177          ("short_qual", 2),
00178          ("double_seq", 3),
00179          ("double_qual", 2),
00180          ("tabs", 0),
00181          ("spaces", 0),
00182          ("trunc_in_title", 4),
00183          ("trunc_in_seq", 4),
00184          ("trunc_in_plus", 4),
00185          ("trunc_in_qual", 4),
00186          ("trunc_at_seq", 4),
00187          ("trunc_at_plus", 4),
00188          ("trunc_at_qual", 4)]
00189 for base_name, good_count in tests:
00190     def funct(name,c):
00191         f = lambda x : x.check_all_fail("Quality/error_%s.fastq" % name,c)
00192         f.__doc__ = "Reject FASTQ with %s" % name.replace("_"," ")
00193         return f
00194     setattr(TestFastqErrors, "test_%s" % (base_name),
00195             funct(base_name, good_count))
00196     del funct        
00197 
00198 #Now add methods for FASTQ files which will be rejected by the high
00199 #level SeqRecord parser, but will be accepted by the low level parser:
00200 tests = [("del", 3, 5),
00201          ("space", 3, 5),
00202          ("vtab", 0, 5),
00203          ("escape", 4, 5),
00204          ("unit_sep", 2, 5),
00205          ("tab", 4, 5),
00206          ("null", 0, 5)]
00207 for base_name, good_count, full_count in tests:
00208     def funct(name,c1,c2):
00209         f = lambda x : x.check_qual_char("Quality/error_qual_%s.fastq"%name,c1,c2)
00210         f.__doc__ = "Reject FASTQ with %s in quality" % name.replace("_"," ")
00211         return f
00212     setattr(TestFastqErrors, "test_qual_%s" % (base_name),
00213             funct(base_name, good_count, full_count))
00214     del funct        
00215 
00216 
00217 class TestReferenceSffConversions(unittest.TestCase):
00218     def check(self, sff_name, sff_format, out_name, format) :
00219         wanted = list(SeqIO.parse(open(out_name), format))
00220         data = StringIO()
00221         count = SeqIO.convert(sff_name, sff_format, data, format)
00222         self.assertEqual(count, len(wanted))
00223         data.seek(0)
00224         converted = list(SeqIO.parse(data, format))
00225         self.assertEqual(len(wanted), len(converted))
00226         for old, new in zip(wanted, converted) :
00227             self.assertEqual(old.id, new.id)
00228             self.assertEqual(old.name, new.name)
00229             if format!="qual" :
00230                 self.assertEqual(str(old.seq), str(new.seq))
00231             elif format!="fasta" :
00232                 self.assertEqual(old.letter_annotations["phred_quality"],
00233                                  new.letter_annotations["phred_quality"])
00234 
00235     def check_sff(self, sff_name):
00236         self.check(sff_name, "sff", "Roche/E3MFGYR02_random_10_reads_no_trim.fasta", "fasta")
00237         self.check(sff_name, "sff", "Roche/E3MFGYR02_random_10_reads_no_trim.qual", "qual")
00238         self.check(sff_name, "sff-trim", "Roche/E3MFGYR02_random_10_reads.fasta", "fasta")
00239         self.check(sff_name, "sff-trim", "Roche/E3MFGYR02_random_10_reads.qual", "qual")
00240 
00241     def test_original(self) :
00242         """Test converting E3MFGYR02_random_10_reads.sff into FASTA+QUAL"""
00243         self.check_sff("Roche/E3MFGYR02_random_10_reads.sff")
00244         
00245     def test_no_manifest(self) :
00246         """Test converting E3MFGYR02_no_manifest.sff into FASTA+QUAL"""
00247         self.check_sff("Roche/E3MFGYR02_no_manifest.sff")
00248         
00249     def test_alt_index_at_start(self) :
00250         """Test converting E3MFGYR02_alt_index_at_start into FASTA+QUAL"""
00251         self.check_sff("Roche/E3MFGYR02_alt_index_at_start.sff")
00252 
00253     def test_alt_index_in_middle(self) :
00254         """Test converting E3MFGYR02_alt_index_in_middle into FASTA+QUAL"""
00255         self.check_sff("Roche/E3MFGYR02_alt_index_in_middle.sff")
00256 
00257     def test_alt_index_at_end(self) :
00258         """Test converting E3MFGYR02_alt_index_at_end into FASTA+QUAL"""
00259         self.check_sff("Roche/E3MFGYR02_alt_index_at_end.sff")
00260 
00261     def test_index_at_start(self) :
00262         """Test converting E3MFGYR02_index_at_start into FASTA+QUAL"""
00263         self.check_sff("Roche/E3MFGYR02_index_at_start.sff")
00264 
00265     def test_index_at_end(self) :
00266         """Test converting E3MFGYR02_index_in_middle into FASTA+QUAL"""
00267         self.check_sff("Roche/E3MFGYR02_index_in_middle.sff")
00268 
00269 class TestReferenceFastqConversions(unittest.TestCase):
00270     """Tests where we have reference output."""
00271     def simple_check(self, base_name, in_variant):
00272         for out_variant in ["sanger", "solexa", "illumina"]:
00273             if out_variant != "sanger":
00274                 #Ignore data loss warnings from max qualities
00275                 warnings.simplefilter('ignore', BiopythonWarning)
00276             in_filename = "Quality/%s_original_%s.fastq" \
00277                           % (base_name, in_variant)
00278             self.assertTrue(os.path.isfile(in_filename))
00279             #Load the reference output...  
00280             expected = open("Quality/%s_as_%s.fastq" \
00281                             % (base_name, out_variant),
00282                             "rU").read()
00283             #Check matches using convert...
00284             handle = StringIO()
00285             SeqIO.convert(in_filename, "fastq-"+in_variant,
00286                           handle, "fastq-"+out_variant)
00287             self.assertEqual(expected, handle.getvalue())
00288             #Check matches using parse/write
00289             handle = StringIO()
00290             SeqIO.write(SeqIO.parse(open(in_filename), "fastq-"+in_variant),
00291                         handle, "fastq-"+out_variant)
00292             self.assertEqual(expected, handle.getvalue())
00293             if out_variant != "sanger":
00294                 warnings.filters.pop()
00295 
00296 #Now add methods at run time...
00297 tests = [("illumina_full_range", "illumina"),
00298          ("sanger_full_range", "sanger"),
00299          ("longreads", "sanger"),
00300          ("solexa_full_range", "solexa"),
00301          ("misc_dna", "sanger"),
00302          ("wrapping", "sanger"),
00303          ("misc_rna", "sanger")]
00304 for base_name, variant in tests:
00305     assert variant in ["sanger", "solexa", "illumina"]
00306     def funct(bn,var):
00307         f = lambda x : x.simple_check(bn,var)
00308         f.__doc__ = "Reference conversions of %s file %s" % (var, bn)
00309         return f
00310     setattr(TestReferenceFastqConversions, "test_%s_%s" % (base_name, variant),
00311             funct(base_name, variant))
00312     del funct        
00313 
00314 class TestQual(unittest.TestCase):
00315     """Tests with QUAL files."""
00316     def test_paired(self):
00317         """Check FASTQ parsing matches FASTA+QUAL parsing"""
00318         records1 = list(\
00319             QualityIO.PairedFastaQualIterator(open("Quality/example.fasta"),
00320                                               open("Quality/example.qual")))
00321         records2 = list(SeqIO.parse(open("Quality/example.fastq"),"fastq"))
00322         self.assertTrue(compare_records(records1, records2))
00323 
00324     def test_qual(self):
00325         """Check FASTQ parsing matches QUAL parsing"""
00326         records1 = list(SeqIO.parse(open("Quality/example.qual"),"qual"))
00327         records2 = list(SeqIO.parse(open("Quality/example.fastq"),"fastq"))
00328         #Will ignore the unknown sequences :)
00329         self.assertTrue(compare_records(records1, records2))
00330 
00331     def test_qual_out(self):
00332         """Check FASTQ to QUAL output"""
00333         records = SeqIO.parse(open("Quality/example.fastq"),"fastq")
00334         h = StringIO("")
00335         SeqIO.write(records, h, "qual")
00336         self.assertEqual(h.getvalue(),open("Quality/example.qual").read())
00337 
00338     def test_fasta(self):
00339         """Check FASTQ parsing matches FASTA parsing"""
00340         records1 = list(SeqIO.parse(open("Quality/example.fasta"),"fasta"))
00341         records2 = list(SeqIO.parse(open("Quality/example.fastq"),"fastq"))
00342         self.assertTrue(compare_records(records1, records2))
00343 
00344     def test_fasta_out(self):
00345         """Check FASTQ to FASTA output"""
00346         records = SeqIO.parse(open("Quality/example.fastq"),"fastq")
00347         h = StringIO("")
00348         SeqIO.write(records, h, "fasta")
00349         self.assertEqual(h.getvalue(),open("Quality/example.fasta").read())
00350 
00351     def test_qual_negative(self):
00352         """Check QUAL negative scores mapped to PHRED zero"""
00353         data = """>1117_10_107_F3
00354 23 31 -1 -1 -1 29 -1 -1 20 32 -1 18 25 7 -1 6 -1 -1 -1 30 -1 20 13 7 -1 -1 21 30 -1 24 -1 22 -1 -1 22 14 -1 12 26 21 -1 5 -1 -1 -1 20 -1 -1 12 28 
00355 >1117_10_146_F3
00356 20 33 -1 -1 -1 29 -1 -1 28 28 -1 7 16 5 -1 30 -1 -1 -1 14 -1 4 13 4 -1 -1 11 13 -1 5 -1 7 -1 -1 10 16 -1 4 12 15 -1 8 -1 -1 -1 16 -1 -1 10 4 
00357 >1117_10_1017_F3
00358 33 33 -1 -1 -1 27 -1 -1 17 16 -1 28 24 11 -1 6 -1 -1 -1 29 -1 8 29 24 -1 -1 8 8 -1 20 -1 13 -1 -1 8 13 -1 28 10 24 -1 10 -1 -1 -1 4 -1 -1 7 6 
00359 >1117_11_136_F3
00360 16 22 -1 -1 -1 33 -1 -1 30 27 -1 27 28 32 -1 29 -1 -1 -1 27 -1 18 9 6 -1 -1 23 16 -1 26 -1 5 7 -1 22 7 -1 18 14 8 -1 8 -1 -1 -1 11 -1 -1 4 24"""
00361         h = StringIO(data)
00362         h2 = StringIO()
00363         self.assertEqual(4, SeqIO.convert(h, "qual", h2, "fastq"))
00364         self.assertEqual(h2.getvalue(), """@1117_10_107_F3
00365 ??????????????????????????????????????????????????
00366 +
00367 8@!!!>!!5A!3:(!'!!!?!5.(!!6?!9!7!!7/!-;6!&!!!5!!-=
00368 @1117_10_146_F3
00369 ??????????????????????????????????????????????????
00370 +
00371 5B!!!>!!==!(1&!?!!!/!%.%!!,.!&!(!!+1!%-0!)!!!1!!+%
00372 @1117_10_1017_F3
00373 ??????????????????????????????????????????????????
00374 +
00375 BB!!!<!!21!=9,!'!!!>!)>9!!))!5!.!!).!=+9!+!!!%!!('
00376 @1117_11_136_F3
00377 ??????????????????????????????????????????????????
00378 +
00379 17!!!B!!?<!<=A!>!!!<!3*'!!81!;!&(!7(!3/)!)!!!,!!%9
00380 """)
00381 
00382 
00383 
00384 class TestReadWrite(unittest.TestCase):
00385     """Test can read and write back files."""
00386     def test_fastq_2000(self):
00387         """Read and write back simple example with upper case 2000bp read"""
00388         data = "@%s\n%s\n+\n%s\n" \
00389                % ("id descr goes here", "ACGT"*500, "!@a~"*500)
00390         handle = StringIO("")
00391         self.assertEqual(1, SeqIO.write(SeqIO.parse(StringIO(data), "fastq"), handle, "fastq"))
00392         self.assertEqual(data, handle.getvalue())
00393 
00394     def test_fastq_1000(self):
00395         """Read and write back simple example with mixed case 1000bp read"""
00396         data = "@%s\n%s\n+\n%s\n" \
00397                % ("id descr goes here", "ACGTNncgta"*100, "abcd!!efgh"*100)
00398         handle = StringIO("")
00399         self.assertEqual(1, SeqIO.write(SeqIO.parse(StringIO(data), "fastq"), handle, "fastq"))
00400         self.assertEqual(data, handle.getvalue())
00401 
00402     def test_fastq_dna(self):
00403         """Read and write back simple example with ambiguous DNA"""
00404         #First in upper case...        
00405         data = "@%s\n%s\n+\n%s\n" \
00406                % ("id descr goes here",
00407                   ambiguous_dna_letters.upper(),
00408                   "".join(chr(33+q) for q in range(len(ambiguous_dna_letters))))
00409         handle = StringIO("")
00410         self.assertEqual(1, SeqIO.write(SeqIO.parse(StringIO(data), "fastq"), handle, "fastq"))
00411         self.assertEqual(data, handle.getvalue())
00412         #Now in lower case...
00413         data = "@%s\n%s\n+\n%s\n" \
00414                % ("id descr goes here",
00415                   ambiguous_dna_letters.lower(),
00416                   "".join(chr(33+q) for q in range(len(ambiguous_dna_letters))))
00417         handle = StringIO("")
00418         self.assertEqual(1, SeqIO.write(SeqIO.parse(StringIO(data), "fastq"), handle, "fastq"))
00419         self.assertEqual(data, handle.getvalue())
00420 
00421     def test_fastq_rna(self):
00422         """Read and write back simple example with ambiguous RNA"""
00423         #First in upper case...        
00424         data = "@%s\n%s\n+\n%s\n" \
00425                % ("id descr goes here",
00426                   ambiguous_rna_letters.upper(),
00427                   "".join(chr(33+q) for q in range(len(ambiguous_rna_letters))))
00428         handle = StringIO("")
00429         self.assertEqual(1, SeqIO.write(SeqIO.parse(StringIO(data), "fastq"), handle, "fastq"))
00430         self.assertEqual(data, handle.getvalue())
00431         #Now in lower case...
00432         data = "@%s\n%s\n+\n%s\n" \
00433                % ("id descr goes here",
00434                   ambiguous_rna_letters.lower(),
00435                   "".join(chr(33+q) for q in range(len(ambiguous_rna_letters))))
00436         handle = StringIO("")
00437         self.assertEqual(1, SeqIO.write(SeqIO.parse(StringIO(data), "fastq"), handle, "fastq"))
00438         self.assertEqual(data, handle.getvalue())
00439 
00440 
00441 class TestWriteRead(unittest.TestCase):
00442     """Test can write and read back files."""
00443     def test_generated(self):
00444         """Write and read back odd SeqRecord objects"""
00445         record1 = SeqRecord(Seq("ACGT"*500, generic_dna),  id="Test", description="Long "*500,
00446                            letter_annotations={"phred_quality":[40,30,20,10]*500})
00447         record2 = SeqRecord(MutableSeq("NGGC"*1000),  id="Mut", description="very "*1000+"long",
00448                            letter_annotations={"phred_quality":[0,5,5,10]*1000})
00449         record3 = SeqRecord(UnknownSeq(2000,character="N"),  id="Unk", description="l"+("o"*1000)+"ng",
00450                            letter_annotations={"phred_quality":[0,1]*1000})
00451         record4 = SeqRecord(Seq("ACGT"*500),  id="no_descr", description="", name="",
00452                            letter_annotations={"phred_quality":[40,50,60,62]*500})
00453         record5 = SeqRecord(Seq("",generic_dna),  id="empty_p", description="(could have been trimmed lots)",
00454                            letter_annotations={"phred_quality":[]})
00455         record6 = SeqRecord(Seq(""),  id="empty_s", description="(could have been trimmed lots)",
00456                            letter_annotations={"solexa_quality":[]})
00457         record7 = SeqRecord(Seq("ACNN"*500),  id="Test_Sol", description="Long "*500,
00458                            letter_annotations={"solexa_quality":[40,30,0,-5]*500})
00459         record8 = SeqRecord(Seq("ACGT"),  id="HighQual", description="With very large qualities that even Sanger FASTQ can't hold!",
00460                            letter_annotations={"solexa_quality":[0,10,100,1000]})
00461         #TODO - Record with no identifier?
00462         records = [record1, record2, record3, record4, record5, record6, record7, record8]
00463         #TODO - Have a Biopython defined "DataLossWarning?"
00464         warnings.simplefilter('ignore', BiopythonWarning)
00465         #TODO - Include phd output?
00466         for format in ["fasta", "fastq", "fastq-solexa", "fastq-illumina", "qual"]:
00467             handle = StringIO()
00468             SeqIO.write(records, handle, format)
00469             handle.seek(0)
00470             compare_records(records,
00471                             list(SeqIO.parse(handle, format)),
00472                             truncation_expected(format))
00473         warnings.filters.pop()
00474             
00475     def check(self, filename, format, out_formats):
00476         for f in out_formats:
00477             write_read(filename, format, f)
00478 
00479     def test_tricky(self):
00480         """Write and read back tricky.fastq"""
00481         self.check(os.path.join("Quality", "tricky.fastq"), "fastq",
00482                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00483                     "fasta", "qual", "phd"])
00484 
00485     def test_sanger_93(self):
00486         """Write and read back sanger_93.fastq"""
00487         self.check(os.path.join("Quality", "sanger_93.fastq"), "fastq",
00488                    ["fastq", "fastq-sanger", "fasta", "qual", "phd"])
00489         #TODO - Have a Biopython defined "DataLossWarning?"
00490         #TODO - On Python 2.6+ we can check this warning is really triggered
00491         warnings.simplefilter('ignore', BiopythonWarning)
00492         self.check(os.path.join("Quality", "sanger_93.fastq"), "fastq",
00493                    ["fastq-solexa","fastq-illumina"])
00494         warnings.filters.pop()
00495 
00496     def test_sanger_faked(self):
00497         """Write and read back sanger_faked.fastq"""
00498         self.check(os.path.join("Quality", "sanger_faked.fastq"), "fastq",
00499                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00500                     "fasta", "qual", "phd"])
00501 
00502     def test_example_fasta(self):
00503         """Write and read back example.fasta"""
00504         write_read(os.path.join("Quality", "example.fasta"), "fasta", "fasta")
00505         #TODO - tests to check can't write FASTQ or QUAL...
00506 
00507     def test_example_fastq(self):
00508         """Write and read back example.fastq"""
00509         self.check(os.path.join("Quality", "example.fastq"), "fastq",
00510                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00511                     "fasta", "qual", "phd"])
00512 
00513     def test_example_qual(self):
00514         """Write and read back example.qual"""
00515         self.check(os.path.join("Quality", "example.qual"), "qual",
00516                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00517                     "fasta", "qual", "phd"])
00518 
00519     def test_solexa_faked(self):
00520         """Write and read back solexa_faked.fastq"""
00521         self.check(os.path.join("Quality", "solexa_faked.fastq"), "fastq-solexa",
00522                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00523                     "fasta", "qual", "phd"])
00524 
00525     def test_solexa_example(self):
00526         """Write and read back solexa_example.fastq"""
00527         self.check(os.path.join("Quality", "solexa_example.fastq"), "fastq-solexa",
00528                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00529                     "fasta", "qual", "phd"])
00530 
00531     def test_illumina_faked(self):
00532         """Write and read back illumina_faked.fastq"""
00533         self.check(os.path.join("Quality", "illumina_faked.fastq"), "fastq-illumina",
00534                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00535                     "fasta", "qual", "phd"])
00536 
00537     def test_greek_sff(self) :
00538         """Write and read back greek.sff"""
00539         self.check(os.path.join("Roche", "greek.sff"), "sff",
00540                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00541                     "fasta", "qual", "phd", "sff"])
00542 
00543     def test_paired_sff(self) :
00544         """Write and read back paired.sff"""
00545         self.check(os.path.join("Roche", "paired.sff"), "sff",
00546                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00547                     "fasta", "qual", "phd", "sff"])
00548 
00549     def test_E3MFGYR02(self) :
00550         """Write and read back E3MFGYR02_random_10_reads.sff"""
00551         self.check(os.path.join("Roche", "E3MFGYR02_random_10_reads.sff"), "sff",
00552                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00553                     "fasta", "qual", "phd", "sff"])
00554 
00555     def test_E3MFGYR02_no_manifest(self) :
00556         """Write and read back E3MFGYR02_no_manifest.sff"""
00557         self.check(os.path.join("Roche", "E3MFGYR02_no_manifest.sff"), "sff",
00558                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00559                     "fasta", "qual", "phd", "sff"])
00560 
00561     def test_E3MFGYR02_index_at_start(self) :
00562         """Write and read back E3MFGYR02_index_at_start.sff"""
00563         self.check(os.path.join("Roche", "E3MFGYR02_index_at_start.sff"), "sff",
00564                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00565                     "fasta", "qual", "phd", "sff"])
00566 
00567     def test_E3MFGYR02_index_in_middle(self) :
00568         """Write and read back E3MFGYR02_index_in_middle.sff"""
00569         self.check(os.path.join("Roche", "E3MFGYR02_index_in_middle.sff"), "sff",
00570                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00571                     "fasta", "qual", "phd", "sff"])
00572 
00573     def test_E3MFGYR02_alt_index_at_start(self) :
00574         """Write and read back E3MFGYR02_alt_index_at_start.sff"""
00575         self.check(os.path.join("Roche", "E3MFGYR02_alt_index_at_start.sff"), "sff",
00576                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00577                     "fasta", "qual", "phd", "sff"])
00578 
00579     def test_E3MFGYR02_alt_index_in_middle(self) :
00580         """Write and read back E3MFGYR02_alt_index_in_middle.sff"""
00581         self.check(os.path.join("Roche", "E3MFGYR02_alt_index_in_middle.sff"), "sff",
00582                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00583                     "fasta", "qual", "phd", "sff"])
00584 
00585     def test_E3MFGYR02_alt_index_at_end(self) :
00586         """Write and read back E3MFGYR02_alt_index_at_end.sff"""
00587         self.check(os.path.join("Roche", "E3MFGYR02_alt_index_at_end.sff"), "sff",
00588                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00589                     "fasta", "qual", "phd", "sff"])
00590 
00591     def test_E3MFGYR02_trimmed(self) :
00592         """Write and read back E3MFGYR02_random_10_reads.sff (trimmed)"""
00593         self.check(os.path.join("Roche", "E3MFGYR02_random_10_reads.sff"), "sff-trim",
00594                    ["fastq", "fastq-sanger", "fastq-illumina", "fastq-solexa",
00595                     "fasta", "qual", "phd"]) #not sff as output
00596 
00597 
00598 class MappingTests(unittest.TestCase):
00599     def test_solexa_quality_from_phred(self):
00600         """Mapping check for function solexa_quality_from_phred"""
00601         self.assertEqual(-5, round(QualityIO.solexa_quality_from_phred(0)))
00602         self.assertEqual(-5, round(QualityIO.solexa_quality_from_phred(1)))
00603         self.assertEqual(-2, round(QualityIO.solexa_quality_from_phred(2)))
00604         self.assertEqual(0, round(QualityIO.solexa_quality_from_phred(3)))
00605         self.assertEqual(2, round(QualityIO.solexa_quality_from_phred(4)))
00606         self.assertEqual(3, round(QualityIO.solexa_quality_from_phred(5)))
00607         self.assertEqual(5, round(QualityIO.solexa_quality_from_phred(6)))
00608         self.assertEqual(6, round(QualityIO.solexa_quality_from_phred(7)))
00609         self.assertEqual(7, round(QualityIO.solexa_quality_from_phred(8)))
00610         self.assertEqual(8, round(QualityIO.solexa_quality_from_phred(9)))
00611         for i in range(10,100):
00612             self.assertEqual(i, round(QualityIO.solexa_quality_from_phred(i)))
00613         
00614     def test_phred_quality_from_solexa(self):
00615         """Mapping check for function phred_quality_from_solexa"""
00616         self.assertEqual(1, round(QualityIO.phred_quality_from_solexa(-5)))
00617         self.assertEqual(1, round(QualityIO.phred_quality_from_solexa(-4)))
00618         self.assertEqual(2, round(QualityIO.phred_quality_from_solexa(-3)))
00619         self.assertEqual(2, round(QualityIO.phred_quality_from_solexa(-2)))
00620         self.assertEqual(3, round(QualityIO.phred_quality_from_solexa(-1)))
00621         self.assertEqual(3, round(QualityIO.phred_quality_from_solexa(0)))
00622         self.assertEqual(4, round(QualityIO.phred_quality_from_solexa(1)))
00623         self.assertEqual(4, round(QualityIO.phred_quality_from_solexa(2)))
00624         self.assertEqual(5, round(QualityIO.phred_quality_from_solexa(3)))
00625         self.assertEqual(5, round(QualityIO.phred_quality_from_solexa(4)))
00626         self.assertEqual(6, round(QualityIO.phred_quality_from_solexa(5)))
00627         self.assertEqual(7, round(QualityIO.phred_quality_from_solexa(6)))
00628         self.assertEqual(8, round(QualityIO.phred_quality_from_solexa(7)))
00629         self.assertEqual(9, round(QualityIO.phred_quality_from_solexa(8)))
00630         self.assertEqual(10, round(QualityIO.phred_quality_from_solexa(9)))
00631         for i in range(10,100):
00632             self.assertEqual(i, round(QualityIO.phred_quality_from_solexa(i)))
00633 
00634     def test_sanger_to_solexa(self):
00635         """Mapping check for FASTQ Sanger (0 to 93) to Solexa (-5 to 62)"""
00636         #The point of this test is the writing code doesn't actually use the
00637         #solexa_quality_from_phred function directly. For speed it uses a
00638         #cached dictionary of the mappings.
00639         seq = "N"*94
00640         qual = "".join(chr(33+q) for q in range(0,94))
00641         expected_sol = [min(62,int(round(QualityIO.solexa_quality_from_phred(q)))) \
00642                         for q in range(0,94)]
00643         in_handle = StringIO("@Test\n%s\n+\n%s" % (seq,qual))
00644         out_handle = StringIO("")
00645         #Want to ignore the data loss warning
00646         #(on Python 2.6 we could check for it!)
00647         warnings.simplefilter('ignore', BiopythonWarning)
00648         SeqIO.write(SeqIO.parse(in_handle, "fastq-sanger"),
00649                     out_handle, "fastq-solexa")
00650         warnings.filters.pop()
00651         out_handle.seek(0)
00652         record = SeqIO.read(out_handle, "fastq-solexa")
00653         self.assertEqual(str(record.seq), seq)
00654         self.assertEqual(record.letter_annotations["solexa_quality"],
00655                          expected_sol)
00656 
00657     def test_solexa_to_sanger(self):
00658         """Mapping check for FASTQ Solexa (-5 to 62) to Sanger (0 to 62)"""
00659         #The point of this test is the writing code doesn't actually use the
00660         #solexa_quality_from_phred function directly. For speed it uses a
00661         #cached dictionary of the mappings.
00662         seq = "N"*68
00663         qual = "".join(chr(64+q) for q in range(-5,63))
00664         expected_phred = [round(QualityIO.phred_quality_from_solexa(q)) \
00665                           for q in range(-5,63)]
00666         in_handle = StringIO("@Test\n%s\n+\n%s" % (seq,qual))
00667         out_handle = StringIO("")
00668         #Want to ignore the data loss warning
00669         #(on Python 2.6 we could check for it!)
00670         warnings.simplefilter('ignore', BiopythonWarning)
00671         SeqIO.write(SeqIO.parse(in_handle, "fastq-solexa"),
00672                     out_handle, "fastq-sanger")
00673         warnings.filters.pop()
00674         out_handle.seek(0)
00675         record = SeqIO.read(out_handle, "fastq-sanger")
00676         self.assertEqual(str(record.seq), seq)
00677         self.assertEqual(record.letter_annotations["phred_quality"],
00678                          expected_phred)
00679 
00680     def test_sanger_to_illumina(self):
00681         """Mapping check for FASTQ Sanger (0 to 93) to Illumina (0 to 62)"""
00682         seq = "N"*94
00683         qual = "".join(chr(33+q) for q in range(0,94))
00684         expected_phred = [min(62,q) for q in range(0,94)]
00685         in_handle = StringIO("@Test\n%s\n+\n%s" % (seq,qual))
00686         out_handle = StringIO("")
00687         #Want to ignore the data loss warning
00688         #(on Python 2.6 we could check for it!)
00689         warnings.simplefilter('ignore', BiopythonWarning)
00690         SeqIO.write(SeqIO.parse(in_handle, "fastq-sanger"),
00691                     out_handle, "fastq-illumina")
00692         warnings.filters.pop()
00693         out_handle.seek(0)
00694         record = SeqIO.read(out_handle, "fastq-illumina")
00695         self.assertEqual(str(record.seq), seq)
00696         self.assertEqual(record.letter_annotations["phred_quality"],
00697                          expected_phred)
00698 
00699     def test_illumina_to_sanger(self):
00700         """Mapping check for FASTQ Illumina (0 to 62) to Sanger (0 to 62)"""
00701         seq = "N"*63
00702         qual = "".join(chr(64+q) for q in range(0,63))
00703         expected_phred = range(63)
00704         in_handle = StringIO("@Test\n%s\n+\n%s" % (seq,qual))
00705         out_handle = StringIO("")
00706         SeqIO.write(SeqIO.parse(in_handle, "fastq-illumina"),
00707                     out_handle, "fastq-sanger")
00708         out_handle.seek(0)
00709         record = SeqIO.read(out_handle, "fastq-sanger")
00710         self.assertEqual(str(record.seq), seq)
00711         self.assertEqual(record.letter_annotations["phred_quality"],
00712                          expected_phred)
00713 
00714 
00715 if __name__ == "__main__":
00716     runner = unittest.TextTestRunner(verbosity = 2)
00717     unittest.main(testRunner=runner)