Back to index

python-biopython  1.60
test_Emboss.py
Go to the documentation of this file.
00001 # Copyright 2009 by Peter Cock.  All rights reserved.
00002 # This code is part of the Biopython distribution and governed by its
00003 # license.  Please see the LICENSE file that should have been included
00004 # as part of this package.
00005 """Runs a few EMBOSS tools to check our wrappers and parsers."""
00006 
00007 import os
00008 import sys
00009 import unittest
00010 import subprocess
00011 from StringIO import StringIO
00012 
00013 from Bio.Emboss.Applications import WaterCommandline, NeedleCommandline
00014 from Bio.Emboss.Applications import SeqretCommandline, SeqmatchallCommandline
00015 from Bio import SeqIO
00016 from Bio import AlignIO
00017 from Bio import MissingExternalDependencyError
00018 from Bio.Alphabet import generic_protein, generic_dna, generic_nucleotide
00019 from Bio.Seq import Seq, translate
00020 from Bio.SeqRecord import SeqRecord
00021 #from Bio.Data.IUPACData import ambiguous_dna_letters
00022 
00023 #################################################################
00024 
00025 #Try to avoid problems when the OS is in another language
00026 os.environ['LANG'] = 'C'
00027 
00028 exes_wanted = ["water", "needle", "seqret", "transeq", "seqmatchall",
00029                "embossversion"]
00030 exes = dict() #Dictionary mapping from names to exe locations
00031 
00032 if "EMBOSS_ROOT" in os.environ:
00033     #Windows default installation path is C:\mEMBOSS which contains the exes.
00034     #EMBOSS also sets an environment variable which we will check for.
00035     path = os.environ["EMBOSS_ROOT"]
00036     if os.path.isdir(path):
00037         for name in exes_wanted:
00038             if os.path.isfile(os.path.join(path, name+".exe")):
00039                 exes[name] = os.path.join(path, name+".exe")
00040     del path, name
00041 if sys.platform!="win32":
00042     import commands
00043     for name in exes_wanted:
00044         #This will "just work" if installed on the path as normal on Unix
00045         output = commands.getoutput("%s -help" % name)
00046         if "not found" not in output and "not recognized" not in output:
00047             exes[name] = name
00048         del output
00049     del name
00050 
00051 if len(exes) < len(exes_wanted):
00052     raise MissingExternalDependencyError(\
00053         "Install EMBOSS if you want to use Bio.Emboss.")
00054 
00055 def get_emboss_version():
00056     """Returns a tuple of three ints, e.g. (6,1,0)"""
00057     #Windows and Unix versions of EMBOSS seem to differ in
00058     #which lines go to stdout and stderr - so merge them.
00059     child = subprocess.Popen(exes["embossversion"],
00060                              stdout=subprocess.PIPE,
00061                              stderr=subprocess.STDOUT,
00062                              universal_newlines=True,
00063                              shell=(sys.platform!="win32"))
00064     stdout, stderr = child.communicate()
00065     child.stdout.close() #This is both stdout and stderr
00066     del child
00067     assert stderr is None #Send to stdout instead
00068     for line in stdout.split("\n"):
00069         if line.strip()=="Reports the current EMBOSS version number":
00070             pass
00071         elif line.startswith("Writes the current EMBOSS version number"):
00072             pass
00073         elif line.count(".")==2:
00074             return tuple(int(v) for v in line.strip().split("."))
00075         elif line.count(".")==3:
00076             #e.g. I installed mEMBOSS-6.2.0.1-setup.exe
00077             #which reports 6.2.0.1 - for this return (6,2,0)
00078             return tuple(int(v) for v in line.strip().split("."))[:3]
00079         else:
00080             #Either we can't understand the output, or this is really
00081             #an error message not caught earlier (e.g. not in English)
00082             raise MissingExternalDependencyError(\
00083                 "Install EMBOSS if you want to use Bio.Emboss (%s)." \
00084                 % line)
00085             
00086 #To avoid confusing known errors from old versions of EMBOSS ...
00087 emboss_version = get_emboss_version()
00088 if emboss_version < (6,1,0):
00089     raise MissingExternalDependencyError(\
00090         "Test requires EMBOSS 6.1.0 patch 3 or later.")
00091     
00092 
00093 #################################################################
00094 
00095 #Top level function as this makes it easier to use for debugging:
00096 def emboss_convert(filename, old_format, new_format):
00097     """Run seqret, returns handle."""
00098     #Setup, this assumes for all the format names used
00099     #Biopython and EMBOSS names are consistent!
00100     cline = SeqretCommandline(exes["seqret"],
00101                               sequence = filename,
00102                               sformat = old_format,
00103                               osformat = new_format,
00104                               auto = True, #no prompting
00105                               stdout = True)
00106     #Run the tool,
00107     child = subprocess.Popen(str(cline),
00108                              stdin=subprocess.PIPE,
00109                              stdout=subprocess.PIPE,
00110                              stderr=subprocess.PIPE,
00111                              universal_newlines=True,
00112                              shell=(sys.platform!="win32"))
00113     child.stdin.close()
00114     child.stderr.close()
00115     return child.stdout
00116 
00117 #Top level function as this makes it easier to use for debugging:
00118 def emboss_piped_SeqIO_convert(records, old_format, new_format):
00119     """Run seqret, returns records (as a generator)."""
00120     #Setup, this assumes for all the format names used
00121     #Biopython and EMBOSS names are consistent!
00122     cline = SeqretCommandline(exes["seqret"],
00123                               sformat = old_format,
00124                               osformat = new_format,
00125                               auto = True, #no prompting
00126                               filter = True)
00127     #Run the tool,
00128     child = subprocess.Popen(str(cline),
00129                              stdin=subprocess.PIPE,
00130                              stdout=subprocess.PIPE,
00131                              stderr=subprocess.PIPE,
00132                              universal_newlines=True,
00133                              shell=(sys.platform!="win32"))
00134     SeqIO.write(records, child.stdin, old_format)
00135     child.stdin.close()
00136     child.stderr.close()
00137     #TODO - Is there a nice way to return an interator AND
00138     #automatically close the handle?
00139     records = list(SeqIO.parse(child.stdout, new_format))
00140     child.stdout.close()
00141     return records
00142 
00143 #Top level function as this makes it easier to use for debugging:
00144 def emboss_piped_AlignIO_convert(alignments, old_format, new_format):
00145     """Run seqret, returns alignments (as a generator)."""
00146     #Setup, this assumes for all the format names used
00147     #Biopython and EMBOSS names are consistent!
00148     cline = SeqretCommandline(exes["seqret"],
00149                               sformat = old_format,
00150                               osformat = new_format,
00151                               auto = True, #no prompting
00152                               filter = True)
00153     #Run the tool,
00154     child = subprocess.Popen(str(cline),
00155                              stdin=subprocess.PIPE,
00156                              stdout=subprocess.PIPE,
00157                              stderr=subprocess.PIPE,
00158                              universal_newlines=True,
00159                              shell=(sys.platform!="win32"))
00160     try:
00161         AlignIO.write(alignments, child.stdin, old_format)
00162     except Exception, err:
00163         child.stdin.close()
00164         child.stderr.close()
00165         child.stdout.close()
00166         raise
00167     child.stdin.close()
00168     child.stderr.close()
00169     #TODO - Is there a nice way to return an interator AND
00170     #automatically close the handle? 
00171     try:
00172         aligns = list(AlignIO.parse(child.stdout, new_format))
00173     except Exception, err:
00174         child.stdout.close()
00175         raise
00176     child.stdout.close()
00177     return aligns
00178 
00179 
00180 #Top level function as this makes it easier to use for debugging:
00181 def compare_records(old_list, new_list):
00182     """Check two lists of SeqRecords agree, raises a ValueError if mismatch."""
00183     if len(old_list) != len(new_list):
00184         raise ValueError("%i vs %i records" % (len(old_list), len(new_list)))
00185     for old, new in zip(old_list, new_list):
00186         #Note the name matching is a bit fuzzy, e.g. truncation and
00187         #no spaces in PHYLIP files.
00188         if old.id != new.id and old.name != new.name \
00189         and (old.id not in new.id) and (new.id not in old.id) \
00190         and (old.id.replace(" ","_") != new.id.replace(" ","_")):
00191             raise ValueError("'%s' or '%s' vs '%s' or '%s' records" \
00192                              % (old.id, old.name, new.id, new.name))
00193         if len(old.seq) != len(new.seq):
00194             raise ValueError("%i vs %i" % (len(old.seq), len(new.seq)))
00195         if str(old.seq).upper() != str(new.seq).upper():
00196             if str(old.seq).replace("X","N")==str(new.seq) :
00197                 raise ValueError("X -> N (protein forced into nucleotide?)")
00198             if len(old.seq) < 200:
00199                 raise ValueError("'%s' vs '%s'" % (old.seq, new.seq))
00200             else:
00201                 raise ValueError("'%s...%s' vs '%s...%s'" \
00202                                  % (old.seq[:60], old.seq[-10:],
00203                                     new.seq[:60], new.seq[-10:]))
00204         if old.features and new.features \
00205         and len(old.features) != len(new.features):
00206             raise ValueError("%i vs %i features" \
00207                              % (len(old.features, len(new.features))))
00208         #TODO - check annotation
00209     return True
00210 
00211 #Top level function as this makes it easier to use for debugging:
00212 def compare_alignments(old_list, new_list):
00213     """Check two lists of Alignments agree, raises a ValueError if mismatch."""
00214     if len(old_list) != len(new_list):
00215         raise ValueError("%i vs %i alignments" % (len(old_list), len(new_list)))
00216     for old, new in zip(old_list, new_list):
00217         if len(old) != len(new):
00218             raise ValueError("Alignment with %i vs %i records" \
00219                              % (len(old), len(new)))
00220         compare_records(old,new)
00221     return True
00222 
00223 class SeqRetSeqIOTests(unittest.TestCase):
00224     """Check EMBOSS seqret against Bio.SeqIO for converting files."""
00225 
00226     def tearDown(self):
00227         clean_up()
00228 
00229     def check_SeqIO_to_EMBOSS(self, in_filename, in_format, skip_formats=[],
00230                               alphabet=None):
00231         """Can Bio.SeqIO write files seqret can read back?"""
00232         if alphabet:
00233             records = list(SeqIO.parse(in_filename, in_format, alphabet))
00234         else:
00235             records = list(SeqIO.parse(in_filename, in_format))
00236         for temp_format in ["genbank","embl","fasta"]:
00237             if temp_format in skip_formats:
00238                 continue
00239             new_records = list(emboss_piped_SeqIO_convert(records, temp_format, "fasta"))
00240             try:
00241                 self.assertTrue(compare_records(records, new_records))
00242             except ValueError, err:
00243                 raise ValueError("Disagree on file %s %s in %s format: %s" \
00244                                  % (in_format, in_filename, temp_format, err))
00245             
00246     def check_EMBOSS_to_SeqIO(self, filename, old_format,
00247                               skip_formats=[]):
00248         """Can Bio.SeqIO read seqret's conversion of the file?"""
00249         #TODO: Why can't we read EMBOSS's swiss output?
00250         self.assertTrue(os.path.isfile(filename))
00251         old_records = list(SeqIO.parse(filename, old_format))
00252         for new_format in ["genbank","fasta","pir","embl", "ig"]:
00253             if new_format in skip_formats:
00254                 continue
00255             handle = emboss_convert(filename, old_format, new_format)
00256             new_records = list(SeqIO.parse(handle, new_format))
00257             handle.close()
00258             try:
00259                 self.assertTrue(compare_records(old_records, new_records))
00260             except ValueError, err:
00261                 raise ValueError("Disagree on %s file %s in %s format: %s" \
00262                                  % (old_format, filename, new_format, err))
00263 
00264     def check_SeqIO_with_EMBOSS(self, filename, old_format, skip_formats=[],
00265                                 alphabet=None):
00266         #Check EMBOSS can read Bio.SeqIO output...
00267         self.check_SeqIO_to_EMBOSS(filename, old_format, skip_formats,
00268                                    alphabet)
00269         #Check Bio.SeqIO can read EMBOSS seqret output...
00270         self.check_EMBOSS_to_SeqIO(filename, old_format, skip_formats)
00271 
00272     def test_abi(self):
00273         """SeqIO agrees with EMBOSS' Abi to FASTQ conversion."""
00274         #This lets use check the id, sequence, and quality scores
00275         for filename in ["Abi/3730.ab1", "Abi/empty.ab1"]:
00276              old = SeqIO.read(filename, "abi")
00277              handle = emboss_convert(filename, "abi", "fastq-sanger")
00278              new = SeqIO.read(handle, "fastq-sanger")
00279              handle.close()
00280              if emboss_version == (6,4,0) and new.id == "EMBOSS_001":
00281                  #Avoid bug in EMBOSS 6.4.0 (patch forthcoming)
00282                  pass
00283              else:
00284                  self.assertEqual(old.id, new.id)
00285              self.assertEqual(str(old.seq), str(new.seq))
00286              if emboss_version < (6,3,0) and new.letter_annotations["phred_quality"] == [1]*len(old):
00287                  #Apparent bug in EMBOSS 6.2.0.1 on Windows           
00288                  pass
00289              else:
00290                  self.assertEqual(old.letter_annotations, new.letter_annotations)
00291 
00292     def test_genbank(self):
00293         """SeqIO & EMBOSS reading each other's conversions of a GenBank file."""
00294         self.check_SeqIO_with_EMBOSS("GenBank/cor6_6.gb", "genbank")
00295 
00296     def test_genbank2(self):
00297         """SeqIO & EMBOSS reading each other's conversions of another GenBank file."""
00298         self.check_SeqIO_with_EMBOSS("GenBank/NC_000932.gb", "genbank")
00299 
00300     def test_embl(self):
00301         """SeqIO & EMBOSS reading each other's conversions of an EMBL file."""
00302         self.check_SeqIO_with_EMBOSS("EMBL/U87107.embl", "embl")
00303 
00304     def test_ig(self):
00305         """SeqIO & EMBOSS reading each other's conversions of an ig file."""
00306         #NOTE - EMBOSS considers "genbank" to be for nucleotides only,
00307         #and will turn "X" into "N" for GenBank output.
00308         self.check_SeqIO_to_EMBOSS("IntelliGenetics/VIF_mase-pro.txt", "ig",
00309                                    alphabet=generic_protein,
00310                                    skip_formats=["genbank","embl"])
00311         #TODO - What does a % in an ig sequence mean?
00312         #e.g. "IntelliGenetics/vpu_nucaligned.txt"
00313         #and  "IntelliGenetics/TAT_mase_nuc.txt"
00314         #EMBOSS seems to ignore them.
00315 
00316     def test_pir(self):
00317         """SeqIO & EMBOSS reading each other's conversions of a PIR file."""
00318         #Skip genbank here, EMBOSS mangles the LOCUS line:
00319         self.check_SeqIO_with_EMBOSS("NBRF/clustalw.pir", "pir",
00320                                skip_formats=["genbank"])
00321         #Skip EMBL here, EMBOSS mangles the ID line
00322         #Skip GenBank, EMBOSS 6.0.1 on Windows won't output proteins as GenBank
00323         self.check_SeqIO_with_EMBOSS("NBRF/DMB_prot.pir", "pir",
00324                                skip_formats=["embl","genbank"])
00325     def test_clustalw(self):
00326         """SeqIO & EMBOSS reading each other's conversions of a Clustalw file."""
00327         self.check_SeqIO_with_EMBOSS("Clustalw/hedgehog.aln", "clustal",
00328                                    skip_formats=["embl","genbank"])
00329         self.check_SeqIO_with_EMBOSS("Clustalw/opuntia.aln", "clustal",
00330                                    skip_formats=["embl","genbank"])
00331 
00332 class SeqRetAlignIOTests(unittest.TestCase):
00333     """Check EMBOSS seqret against Bio.SeqIO for converting files."""
00334 
00335     def tearDown(self):
00336         clean_up()
00337 
00338     def check_EMBOSS_to_AlignIO(self, filename, old_format,
00339                               skip_formats=[]):
00340         """Can AlignIO read seqret's conversion of the file?"""
00341         self.assertTrue(os.path.isfile(filename), filename)
00342         old_aligns = list(AlignIO.parse(filename, old_format))
00343         formats = ["clustal", "phylip", "ig"]
00344         if len(old_aligns) == 1:
00345             formats.extend(["fasta","nexus"])
00346         for new_format in formats:
00347             if new_format in skip_formats:
00348                 continue
00349             handle = emboss_convert(filename, old_format, new_format)
00350             try:
00351                 new_aligns = list(AlignIO.parse(handle, new_format))
00352             except:
00353                 handle.close()
00354                 raise ValueError("Can't parse %s file %s in %s format." \
00355                                  % (old_format, filename, new_format))
00356             handle.close()
00357             try:
00358                 self.assertTrue(compare_alignments(old_aligns, new_aligns))
00359             except ValueError, err:
00360                 raise ValueError("Disagree on %s file %s in %s format: %s" \
00361                                  % (old_format, filename, new_format, err))
00362 
00363     def check_AlignIO_to_EMBOSS(self, in_filename, in_format, skip_formats=[],
00364                                 alphabet=None):
00365         """Can Bio.AlignIO write files seqret can read back?"""
00366         if alphabet:
00367             old_aligns = list(AlignIO.parse(in_filename,in_format,alphabet))
00368         else:
00369             old_aligns = list(AlignIO.parse(in_filename,in_format))
00370 
00371         formats = ["clustal", "phylip"]
00372         if len(old_aligns) == 1:
00373             formats.extend(["fasta","nexus"])
00374         for temp_format in formats:
00375             if temp_format in skip_formats:
00376                 continue
00377             #PHYLIP is a simple format which explicitly supports
00378             #multiple alignments (unlike FASTA).
00379             try:
00380                 new_aligns = list(emboss_piped_AlignIO_convert(old_aligns,
00381                                                                temp_format,
00382                                                                "phylip"))
00383             except ValueError, e:
00384                 #e.g. ValueError: Need a DNA, RNA or Protein alphabet
00385                 #from writing Nexus files...
00386                 continue
00387             try:
00388                 self.assertTrue(compare_alignments(old_aligns, new_aligns))
00389             except ValueError, err:
00390                 raise ValueError("Disagree on file %s %s in %s format: %s" \
00391                                  % (in_format, in_filename, temp_format, err))
00392 
00393     def check_AlignIO_with_EMBOSS(self, filename, old_format, skip_formats=[],
00394                                   alphabet=None):
00395         #Check EMBOSS can read Bio.AlignIO output...
00396         self.check_AlignIO_to_EMBOSS(filename, old_format, skip_formats,
00397                                    alphabet)
00398         #Check Bio.AlignIO can read EMBOSS seqret output...
00399         self.check_EMBOSS_to_AlignIO(filename, old_format, skip_formats)
00400         
00401     def test_align_clustalw(self):
00402         """AlignIO & EMBOSS reading each other's conversions of a ClustalW file."""
00403         self.check_AlignIO_with_EMBOSS("Clustalw/hedgehog.aln", "clustal")
00404         self.check_AlignIO_with_EMBOSS("Clustalw/opuntia.aln", "clustal")
00405         self.check_AlignIO_with_EMBOSS("Clustalw/odd_consensus.aln", "clustal",
00406                                skip_formats=["nexus"]) #TODO - why not nexus?
00407         self.check_AlignIO_with_EMBOSS("Clustalw/protein.aln", "clustal")
00408         self.check_AlignIO_with_EMBOSS("Clustalw/promals3d.aln", "clustal")
00409 
00410     def test_clustalw(self):
00411         """AlignIO & EMBOSS reading each other's conversions of a PHYLIP file."""
00412         self.check_AlignIO_with_EMBOSS("Phylip/horses.phy", "phylip")
00413         self.check_AlignIO_with_EMBOSS("Phylip/hennigian.phy", "phylip")
00414         self.check_AlignIO_with_EMBOSS("Phylip/reference_dna.phy", "phylip")
00415         self.check_AlignIO_with_EMBOSS("Phylip/reference_dna2.phy", "phylip")
00416         self.check_AlignIO_with_EMBOSS("Phylip/interlaced.phy", "phylip")
00417         self.check_AlignIO_with_EMBOSS("Phylip/interlaced2.phy", "phylip")
00418         self.check_AlignIO_with_EMBOSS("Phylip/random.phy", "phylip")
00419 
00420         
00421 class PairwiseAlignmentTests(unittest.TestCase):
00422     """Run pairwise alignments with water and needle, and parse them."""
00423 
00424     def tearDown(self):
00425         clean_up()
00426         
00427     def pairwise_alignment_check(self, query_seq,
00428                                  targets, alignments,
00429                                  local=True):
00430         """Check pairwise alignment data is sane."""
00431         #The datasets should be small, so making iterators into lists is OK
00432         targets = list(targets)
00433         alignments = list(alignments)
00434         self.assertEqual(len(targets), len(alignments))
00435         for target, alignment in zip(targets, alignments):
00436             self.assertEqual(len(alignment), 2)
00437             #self.assertEqual(target.id, alignment[1].id) #too strict
00438             if alignment[1].id not in target.id \
00439             and alignment[1].id not in target.name:
00440                 raise AssertionError("%s vs %s or %s" \
00441                                      % (alignment[1].id , target.id, target.name))
00442             if local:
00443                 #Local alignment
00444                 self.assertTrue(str(alignment[0].seq).replace("-","") \
00445                              in query_seq)
00446                 self.assertTrue(str(alignment[1].seq).replace("-","").upper() \
00447                              in str(target.seq).upper())
00448             else:
00449                 #Global alignment
00450                 self.assertEqual(str(query_seq), str(alignment[0].seq).replace("-",""))
00451                 self.assertEqual(str(target.seq).upper(), \
00452                                  str(alignment[1].seq).replace("-","").upper())
00453         return True
00454 
00455     def run_water(self, cline):
00456         #Run the tool,
00457         stdout, stderr = cline()
00458         self.assertTrue(stderr.strip().startswith("Smith-Waterman local alignment"),
00459                         stderr)
00460         if cline.outfile:
00461             self.assertEqual(stdout.strip(), "")
00462             self.assertTrue(os.path.isfile(cline.outfile))
00463         else :
00464             #Don't use this yet... could return stdout handle instead?
00465             return stdout
00466 
00467     def test_water_file(self):
00468         """water with the asis trick, output to a file."""
00469         #Setup, try a mixture of keyword arguments and later additions:
00470         cline = WaterCommandline(cmd=exes["water"],
00471                                  gapopen="10", gapextend="0.5")
00472         #Try using both human readable names, and the literal ones:
00473         cline.set_parameter("asequence", "asis:ACCCGGGCGCGGT")
00474         cline.set_parameter("-bsequence", "asis:ACCCGAGCGCGGT")
00475         #Try using a property set here:
00476         cline.outfile = "Emboss/temp with space.water"
00477         self.assertEqual(str(eval(repr(cline))), str(cline))
00478         #Run the tool,
00479         self.run_water(cline)
00480         #Check we can parse the output...
00481         align = AlignIO.read(cline.outfile,"emboss")
00482         self.assertEqual(len(align), 2)
00483         self.assertEqual(str(align[0].seq), "ACCCGGGCGCGGT")
00484         self.assertEqual(str(align[1].seq), "ACCCGAGCGCGGT")
00485         #Clean up,
00486         os.remove(cline.outfile)            
00487         
00488     def test_water_piped(self):
00489         """water with asis trick, output piped to stdout."""
00490         cline = WaterCommandline(cmd=exes["water"],
00491                                  asequence="asis:ACCCGGGCGCGGT",
00492                                  bsequence="asis:ACCCGAGCGCGGT",
00493                                  gapopen=10,
00494                                  gapextend=0.5,
00495                                  auto=True, filter=True)
00496         self.assertEqual(str(cline),
00497                          exes["water"] + " -auto -filter" \
00498                          + " -asequence=asis:ACCCGGGCGCGGT" \
00499                          + " -bsequence=asis:ACCCGAGCGCGGT" \
00500                          + " -gapopen=10 -gapextend=0.5")
00501         #Run the tool,
00502         child = subprocess.Popen(str(cline),
00503                                  stdin=subprocess.PIPE,
00504                                  stdout=subprocess.PIPE,
00505                                  stderr=subprocess.PIPE,
00506                                  universal_newlines=True,
00507                                  shell=(sys.platform!="win32"))
00508         child.stdin.close()
00509         #Check we could read it's output
00510         align = AlignIO.read(child.stdout, "emboss")
00511         self.assertEqual(len(align), 2)
00512         self.assertEqual(str(align[0].seq), "ACCCGGGCGCGGT")
00513         self.assertEqual(str(align[1].seq), "ACCCGAGCGCGGT")
00514         #Check no error output:
00515         self.assertEqual(child.stderr.read(), "")
00516         self.assertEqual(0, child.wait())
00517         child.stdout.close()
00518         child.stderr.close()
00519 
00520     def test_needle_file(self):
00521         """needle with the asis trick, output to a file."""
00522         #Setup,
00523         cline = NeedleCommandline(cmd=exes["needle"])
00524         cline.set_parameter("-asequence", "asis:ACCCGGGCGCGGT")
00525         cline.set_parameter("-bsequence", "asis:ACCCGAGCGCGGT")
00526         cline.set_parameter("-gapopen", "10")
00527         cline.set_parameter("-gapextend", "0.5")
00528         #EMBOSS would guess this, but let's be explicit:
00529         cline.set_parameter("-snucleotide", "True")
00530         cline.set_parameter("-outfile", "Emboss/temp with space.needle")
00531         self.assertEqual(str(eval(repr(cline))), str(cline))
00532         #Run the tool,
00533         stdout, stderr = cline()
00534         #Check it worked,
00535         self.assertTrue(stderr.strip().startswith("Needleman-Wunsch global alignment"), stderr)
00536         self.assertEqual(stdout.strip(), "")
00537         filename = cline.outfile
00538         self.assertTrue(os.path.isfile(filename))
00539         #Check we can parse the output...
00540         align = AlignIO.read(filename,"emboss")
00541         self.assertEqual(len(align), 2)
00542         self.assertEqual(str(align[0].seq), "ACCCGGGCGCGGT")
00543         self.assertEqual(str(align[1].seq), "ACCCGAGCGCGGT")
00544         #Clean up,
00545         os.remove(filename)
00546 
00547     def test_needle_piped(self):
00548         """needle with asis trick, output piped to stdout."""
00549         cline = NeedleCommandline(cmd=exes["needle"],
00550                                  asequence="asis:ACCCGGGCGCGGT",
00551                                  bsequence="asis:ACCCGAGCGCGGT",
00552                                  gapopen=10,
00553                                  gapextend=0.5,
00554                                  auto=True, filter=True)
00555         self.assertEqual(str(cline),
00556                          exes["needle"] + " -auto -filter" \
00557                          + " -asequence=asis:ACCCGGGCGCGGT" \
00558                          + " -bsequence=asis:ACCCGAGCGCGGT" \
00559                          + " -gapopen=10 -gapextend=0.5")
00560         #Run the tool,
00561         child = subprocess.Popen(str(cline),
00562                                  stdin=subprocess.PIPE,
00563                                  stdout=subprocess.PIPE,
00564                                  stderr=subprocess.PIPE,
00565                                  universal_newlines=True,
00566                                  shell=(sys.platform!="win32"))
00567         child.stdin.close()
00568         #Check we could read it's output
00569         align = AlignIO.read(child.stdout, "emboss")
00570         self.assertEqual(len(align), 2)
00571         self.assertEqual(str(align[0].seq), "ACCCGGGCGCGGT")
00572         self.assertEqual(str(align[1].seq), "ACCCGAGCGCGGT")
00573         #Check no error output:
00574         self.assertEqual(child.stderr.read(), "")
00575         self.assertEqual(0, child.wait())
00576         child.stdout.close()
00577         child.stderr.close()
00578 
00579     def test_water_file2(self):
00580         """water with the asis trick and nucleotide FASTA file, output to a file."""
00581         #Setup,
00582         query = "ACACACTCACACACACTTGGTCAGAGATGCTGTGCTTCTTGGAAGCAAGGNCTCAAAGGCAAGGTGCACGCAGAGGGACGTTTGAGTCTGGGATGAAGCATGTNCGTATTATTTATATGATGGAATTTCACGTTTTTATG"
00583         out_file = "Emboss/temp_test2.water"
00584         in_file = "Fasta/f002"
00585         self.assertTrue(os.path.isfile(in_file))
00586         if os.path.isfile(out_file):
00587             os.remove(out_file)
00588         cline = WaterCommandline(cmd=exes["water"])
00589         cline.set_parameter("-asequence", "asis:%s" % query)
00590         cline.set_parameter("-bsequence", in_file)
00591         cline.set_parameter("-gapopen", "10")
00592         cline.set_parameter("-gapextend", "0.5")
00593         cline.set_parameter("-outfile", out_file)
00594         self.assertEqual(str(eval(repr(cline))), str(cline))
00595         #Run the tool,
00596         self.run_water(cline)
00597         #Check we can parse the output and it is sensible...
00598         self.pairwise_alignment_check(query,
00599                                       SeqIO.parse(in_file,"fasta"),
00600                                       AlignIO.parse(out_file,"emboss"),
00601                                       local=True)
00602         #Clean up,
00603         os.remove(out_file)
00604 
00605     def test_water_file3(self):
00606         """water with the asis trick and GenBank file, output to a file."""
00607         #Setup,
00608         query = "TGTTGTAATGTTTTAATGTTTCTTCTCCCTTTAGATGTACTACGTTTGGA"
00609         out_file = "Emboss/temp_test3.water"
00610         in_file = "GenBank/cor6_6.gb"
00611         self.assertTrue(os.path.isfile(in_file))
00612         if os.path.isfile(out_file):
00613             os.remove(out_file)
00614         cline = WaterCommandline(cmd=exes["water"])
00615         cline.set_parameter("asequence", "asis:%s" % query)
00616         cline.set_parameter("bsequence", in_file)
00617         #TODO - Tell water this is a GenBank file!
00618         cline.set_parameter("gapopen", "1")
00619         cline.set_parameter("gapextend", "0.5")
00620         cline.set_parameter("outfile", out_file)
00621         self.assertEqual(str(eval(repr(cline))), str(cline))
00622         #Run the tool,
00623         self.run_water(cline)
00624         #Check we can parse the output and it is sensible...
00625         self.pairwise_alignment_check(query,
00626                                       SeqIO.parse(in_file,"genbank"),
00627                                       AlignIO.parse(out_file,"emboss"),
00628                                       local=True)
00629         #Clean up,
00630         os.remove(out_file)
00631 
00632     def test_water_file4(self):
00633         """water with the asis trick and SwissProt file, output to a file."""
00634         #Setup,
00635         query = "DVCTGKALCDPVTQNIKTYPVKIENLRVMI"
00636         out_file = "Emboss/temp_test4.water"
00637         in_file = "SwissProt/sp004"
00638         self.assertTrue(os.path.isfile(in_file))
00639         if os.path.isfile(out_file):
00640             os.remove(out_file)
00641         cline = WaterCommandline(cmd=exes["water"])
00642         cline.set_parameter("-asequence", "asis:%s" % query)
00643         cline.set_parameter("-bsequence", in_file)
00644         #EMBOSS should work this out, but let's be explicit:
00645         cline.set_parameter("-sprotein", True)
00646         #TODO - Tell water this is a SwissProt file!
00647         cline.set_parameter("-gapopen", "20")
00648         cline.set_parameter("-gapextend", "5")
00649         cline.set_parameter("-outfile", out_file)
00650         self.assertEqual(str(eval(repr(cline))), str(cline))
00651         #Run the tool,
00652         self.run_water(cline)
00653         #Check we can parse the output and it is sensible...
00654         self.pairwise_alignment_check(query,
00655                                       SeqIO.parse(in_file,"swiss"),
00656                                       AlignIO.parse(out_file,"emboss"),
00657                                       local=True)
00658         #Clean up,
00659         os.remove(out_file)
00660         
00661     def test_needle_piped2(self):
00662         """needle with asis trick, and nucleotide FASTA file, output piped to stdout."""
00663         #TODO - Support needle in Bio.Emboss.Applications
00664         #(ideally with the -auto and -filter arguments)
00665         #Setup,
00666         query = "ACACACTCACACACACTTGGTCAGAGATGCTGTGCTTCTTGGAA"
00667         cline = exes["needle"]
00668         cline += " -asequence asis:" + query
00669         cline += " -bsequence Fasta/f002"
00670         cline += " -auto" #no prompting
00671         cline += " -filter" #use stdout
00672         #Run the tool,
00673         child = subprocess.Popen(str(cline),
00674                                  stdin=subprocess.PIPE,
00675                                  stdout=subprocess.PIPE,
00676                                  stderr=subprocess.PIPE,
00677                                  universal_newlines=True,
00678                                  shell=(sys.platform!="win32"))
00679         child.stdin.close()
00680         #Check we can parse the output and it is sensible...
00681         self.pairwise_alignment_check(query,
00682                                       SeqIO.parse("Fasta/f002","fasta"),
00683                                       AlignIO.parse(child.stdout,"emboss"),
00684                                       local=False)
00685         #Check no error output:
00686         self.assertEqual(child.stderr.read(), "")
00687         self.assertEqual(0, child.wait())
00688         child.stdout.close()
00689         child.stderr.close()
00690 
00691     def test_water_needs_output(self):
00692         """water without output file or stdout/filter should give error."""
00693         cline = WaterCommandline(cmd=exes["water"],
00694                                  asequence="asis:ACCCGGGCGCGGT",
00695                                  bsequence="asis:ACCCGAGCGCGGT",
00696                                  gapopen=10,
00697                                  gapextend=0.5,
00698                                  auto=True)
00699         self.assertTrue(cline.auto)
00700         self.assertTrue(not cline.stdout)
00701         self.assertTrue(not cline.filter)
00702         self.assertEqual(cline.outfile, None)
00703         self.assertRaises(ValueError, str, cline)
00704 
00705     def test_needle_needs_output(self):
00706         """needle without output file or stdout/filter should give error."""
00707         cline = NeedleCommandline(cmd=exes["needle"],
00708                                  asequence="asis:ACCCGGGCGCGGT",
00709                                  bsequence="asis:ACCCGAGCGCGGT",
00710                                  gapopen=10,
00711                                  gapextend=0.5,
00712                                  auto=True)
00713         self.assertTrue(cline.auto)
00714         self.assertTrue(not cline.stdout)
00715         self.assertTrue(not cline.filter)
00716         self.assertEqual(cline.outfile, None)
00717         self.assertRaises(ValueError, str, cline)
00718    
00719     def test_seqtmatchall_piped(self):
00720         """seqmatchall with pair output piped to stdout."""
00721         cline = SeqmatchallCommandline(cmd=exes["seqmatchall"],
00722                                        sequence="Fasta/f002",
00723                                        aformat="pair", wordsize=9,
00724                                        auto=True, stdout=True)
00725         self.assertEqual(str(cline),
00726                          exes["seqmatchall"] + " -auto -stdout" \
00727                          + " -sequence=Fasta/f002"
00728                          + " -wordsize=9 -aformat=pair")
00729         #Run the tool,
00730         child = subprocess.Popen(str(cline),
00731                                  stdin=subprocess.PIPE,
00732                                  stdout=subprocess.PIPE,
00733                                  stderr=subprocess.PIPE,
00734                                  universal_newlines=True,
00735                                  shell=(sys.platform!="win32"))
00736         child.stdin.close()
00737         #Check we could read it's output
00738         for align in AlignIO.parse(child.stdout, "emboss") :
00739             self.assertEqual(len(align), 2)
00740             self.assertEqual(align.get_alignment_length(), 9)
00741         #Check no error output:
00742         self.assertEqual(child.stderr.read(), "")
00743         self.assertEqual(0, child.wait())
00744         child.stdout.close()
00745         child.stderr.close()
00746         
00747 #Top level function as this makes it easier to use for debugging:
00748 def emboss_translate(sequence, table=None, frame=None):
00749     """Call transeq, returns protein sequence as string."""
00750     #TODO - Support transeq in Bio.Emboss.Applications?
00751     #(doesn't seem worthwhile as Biopython can do translations)
00752 
00753     if not sequence:
00754         raise ValueError(sequence)
00755 
00756     #Setup,
00757     cline = exes["transeq"]
00758 
00759     if len(sequence) < 100:
00760         filename = None
00761         cline += " -sequence asis:%s" % sequence
00762     else:
00763         #There are limits on command line string lengths...
00764         #use a temp file instead.
00765         filename = "Emboss/temp_transeq.txt"
00766         SeqIO.write(SeqRecord(sequence, id="Test"), filename, "fasta")
00767         cline += " -sequence %s" % filename
00768 
00769     cline += " -auto" #no prompting
00770     cline += " -filter" #use stdout
00771     if table is not None:
00772         cline += " -table %s" % str(table)
00773     if frame is not None:
00774         cline += " -frame %s" % str(frame)
00775     #Run the tool,
00776     child = subprocess.Popen(str(cline),
00777                              stdin=subprocess.PIPE,
00778                              stdout=subprocess.PIPE,
00779                              stderr=subprocess.PIPE,
00780                              universal_newlines=True,
00781                              shell=(sys.platform!="win32"))
00782     out, err = child.communicate()
00783     #Check no error output:
00784     if err != "":
00785         raise ValueError(str(cline) + "\n" + err)
00786 
00787     #Check we could read it's output
00788     record = SeqIO.read(StringIO(out), "fasta")
00789 
00790     if 0 != child.wait():
00791         raise ValueError(str(cline))
00792     
00793     if filename:
00794         os.remove(filename)
00795         if not record.id.startswith("Test"):
00796             raise ValueError(str(cline))
00797     else:
00798         if not record.id.startswith("asis"):
00799             raise ValueError(str(cline))
00800     return str(record.seq)
00801 
00802 #Top level function as this makes it easier to use for debugging:
00803 def check_translation(sequence, translation, table=None):
00804     if table is None:
00805         t = 1
00806     else:
00807         t = table
00808     if translation != str(sequence.translate(t)) \
00809     or translation != str(translate(sequence,t)) \
00810     or translation != translate(str(sequence),t):
00811         #More details...
00812         for i, amino in enumerate(translation):
00813             codon = sequence[i*3:i*3+3]
00814             if amino != str(codon.translate(t)):
00815                 raise ValueError("%s -> %s not %s (table %s)" \
00816                          % (codon, amino, codon.translate(t), t))
00817         #Shouldn't reach this line:
00818         raise ValueError("%s -> %s (table %s)" \
00819                          % (sequence, translation, t))
00820     return True
00821 
00822 class TranslationTests(unittest.TestCase):
00823     """Run pairwise alignments with water and needle, and parse them."""
00824 
00825     def tearDown(self):
00826         clean_up()
00827 
00828     def test_simple(self):
00829         """transeq vs Bio.Seq for simple translations (including alt tables)."""
00830 
00831         examples = [Seq("ACGTGACTGACGTAGCATGCCACTAGG"),
00832                     #Unamibguous TA? codons:
00833                     Seq("TAATACTATTAG", generic_dna),
00834                     #Most of the ambiguous TA? codons:
00835                     Seq("TANTARTAYTAMTAKTAHTABTADTAV", generic_dna),
00836                     #Problem cases,
00837                     #
00838                     #Seq("TAW", generic_dna),
00839                     #W = A or T, but EMBOSS does TAW -> X
00840                     #TAA -> Y, TAT ->Y, so in Biopython TAW -> Y
00841                     #
00842                     #Seq("TAS", generic_dna),
00843                     #S = C or G, but EMBOSS does TAS -> Y
00844                     #TAG -> *, TAC ->Y, so in Biopython TAS -> X (Y or *)
00845                     #
00846                     #Seq("AAS", generic_dna),
00847                     #On table 9, EMBOSS gives N, we give X.
00848                     #S = C or G, so according to my reading of
00849                     #table 9 on the NCBI page, AAC=N, AAG=K
00850                     #suggesting this is a bug in EMBOSS.
00851                     #
00852                     Seq("ACGGGGGGGGTAAGTGGTGTGTGTGTAGT", generic_dna),
00853                     ]
00854         
00855         for sequence in examples:
00856             #EMBOSS treats spare residues differently... avoid this issue
00857             if len(sequence) % 3 != 0:
00858                 sequence = sequence[:-(len(sequence)%3)]
00859             self.assertEqual(len(sequence) % 3, 0)
00860             self.assertTrue(len(sequence) > 0)
00861             self.check(sequence)
00862 
00863     def check(self, sequence):
00864         """Compare our translation to EMBOSS's using all tables.
00865 
00866         Takes a Seq object (and a filename containing it)."""
00867         translation = emboss_translate(sequence)
00868         self.assertTrue(check_translation(sequence, translation))
00869 
00870         for table in [1,2,3,4,5,6,9,10,11,12,13,14,15,16,21,22,23]:
00871             translation = emboss_translate(sequence, table)
00872             self.assertTrue(check_translation(sequence, translation, table))
00873         return True
00874 
00875     def translate_all_codons(self, letters):
00876         sequence = Seq("".join([c1+c3+c3 \
00877                        for c1 in letters \
00878                        for c2 in letters \
00879                        for c3 in letters]),
00880                        generic_nucleotide)
00881         self.check(sequence)
00882         
00883     #def test_all_ambig_dna_codons(self):
00884     #    """transeq vs Bio.Seq on ambiguous DNA codons (inc. alt tables)."""
00885     #    self.translate_all_codons(ambiguous_dna_letters)
00886 
00887     def test_all_unambig_dna_codons(self):
00888         """transeq vs Bio.Seq on unambiguous DNA codons (inc. alt tables)."""
00889         self.translate_all_codons("ATCGatcg")
00890 
00891     def test_all_unambig_rna_codons(self):
00892         """transeq vs Bio.Seq on unambiguous RNA codons (inc. alt tables)."""
00893         self.translate_all_codons("AUCGaucg")
00894 
00895     def test_mixed_unambig_rna_codons(self):
00896         """transeq vs Bio.Seq on unambiguous DNA/RNA codons (inc. alt tables)."""
00897         self.translate_all_codons("ATUCGatucg")
00898         
00899 def clean_up():
00900     """Fallback clean up method to remove temp files."""
00901     for filename in os.listdir("Emboss"):
00902         if filename.startswith("temp_"):
00903             try:
00904                 os.remove(filename)
00905             except:
00906                 pass
00907 
00908 if __name__ == "__main__":
00909     runner = unittest.TextTestRunner(verbosity = 2)
00910     unittest.main(testRunner=runner)
00911     clean_up()