Back to index

python-biopython  1.60
AbiIO.py
Go to the documentation of this file.
00001 # Copyright 2011 by Wibowo Arindrarto (w.arindrarto@gmail.com)
00002 # Revisions copyright 2011 by Peter Cock.
00003 # This code is part of the Biopython distribution and governed by its
00004 # license. Please see the LICENSE file that should have been included
00005 # as part of this package.
00006 
00007 """Bio.SeqIO parser for the ABI format.
00008 
00009 ABI is the format used by Applied Biosystem's sequencing machines to store
00010 sequencing results. 
00011 
00012 For more details on the format specification, visit:
00013 http://www.appliedbiosystem.com/support/software_community/ABIF_File_Format.pdf
00014 
00015 """
00016 
00017 __docformat__ = "epytext en"
00018 
00019 import datetime
00020 import struct
00021 
00022 from os.path import basename
00023 from sys import version_info
00024 
00025 from Bio import Alphabet
00026 from Bio.Alphabet.IUPAC import ambiguous_dna, unambiguous_dna
00027 from Bio.Seq import Seq
00028 from Bio.SeqRecord import SeqRecord
00029 from Bio._py3k import _bytes_to_string, _as_bytes
00030 
00031 # dictionary for determining which tags goes into SeqRecord annotation
00032 # each key is tag_name + tag_number
00033 # if a tag entry needs to be added, just add its key and its key
00034 # for the annotations dictionary as the value
00035 _EXTRACT = {
00036             'TUBE1': 'sample_well',
00037             'DySN1': 'dye',
00038             'GTyp1': 'polymer',
00039             'MODL1': 'machine_model',
00040            }
00041 # dictionary for tags that require preprocessing before use in creating
00042 # seqrecords
00043 _SPCTAGS = [
00044             'PBAS2',    # base-called sequence
00045             'PCON2',    # quality values of base-called sequence
00046             'SMPL1',    # sample id inputted before sequencing run
00047             'RUND1',    # run start date
00048             'RUND2',    # run finish date
00049             'RUNT1',    # run start time
00050             'RUNT2',    # run finish time
00051            ]
00052 # dictionary for data unpacking format
00053 _BYTEFMT = {
00054             1: 'b',     # byte
00055             2: 's',     # char
00056             3: 'H',     # word
00057             4: 'h',     # short
00058             5: 'i',     # long
00059             6: '2i',    # rational, legacy unsupported
00060             7: 'f',     # float
00061             8: 'd',     # double
00062             10: 'h2B',  # date
00063             11: '4B',   # time
00064             12: '2i2b', # thumb
00065             13: 'B',    # bool
00066             14: '2h',   # point, legacy unsupported
00067             15: '4h',   # rect, legacy unsupported
00068             16: '2i',   # vPoint, legacy unsupported
00069             17: '4i',   # vRect, legacy unsupported
00070             18: 's',    # pString
00071             19: 's',    # cString
00072             20: '2i',   # tag, legacy unsupported
00073            }
00074 # header data structure (exluding 4 byte ABIF marker)
00075 _HEADFMT = '>H4sI2H3I'
00076 # directory data structure
00077 _DIRFMT = '>4sI2H4I'
00078 
00079 def AbiIterator(handle, alphabet=None, trim=False):
00080     """Iterator for the Abi file format.
00081     """
00082     # raise exception is alphabet is not dna
00083     if alphabet is not None:
00084         if isinstance(Alphabet._get_base_alphabet(alphabet),
00085                       Alphabet.ProteinAlphabet):
00086             raise ValueError("Invalid alphabet, ABI files do not hold proteins.")
00087         if isinstance(Alphabet._get_base_alphabet(alphabet),
00088                       Alphabet.RNAAlphabet):
00089             raise ValueError("Invalid alphabet, ABI files do not hold RNA.")
00090 
00091     # raise exception if handle mode is not 'rb'
00092     if hasattr(handle, 'mode'):
00093         if set('rb') != set(handle.mode.lower()):
00094             raise ValueError("ABI files has to be opened in 'rb' mode.") 
00095 
00096     # check if input file is a valid Abi file
00097     handle.seek(0)
00098     marker = handle.read(4)
00099     if not marker:
00100         # handle empty file gracefully
00101         raise StopIteration
00102     if marker != _as_bytes('ABIF'):
00103         raise IOError('File should start ABIF, not %r' % marker)
00104 
00105     # dirty hack for handling time information
00106     times = {'RUND1': '', 'RUND2': '', 'RUNT1': '', 'RUNT2': '', }
00107 
00108     # initialize annotations
00109     annot = dict(zip(_EXTRACT.values(), [None] * len(_EXTRACT)))
00110 
00111     # parse header and extract data from directories
00112     header = struct.unpack(_HEADFMT, \
00113              handle.read(struct.calcsize(_HEADFMT)))
00114 
00115     for tag_name, tag_number, tag_data in _abi_parse_header(header, handle):
00116         # stop iteration if all desired tags have been extracted
00117         # 4 tags from _EXTRACT + 2 time tags from _SPCTAGS - 3,
00118         # and seq, qual, id
00119         # todo
00120 
00121         key = tag_name + str(tag_number)
00122 
00123         # PBAS2 is base-called sequence
00124         if key == 'PBAS2': 
00125             seq = tag_data
00126             ambigs = 'KYWMRS'
00127             if alphabet is None:
00128                 if set(seq).intersection(ambigs):
00129                     alphabet = ambiguous_dna
00130                 else:
00131                     alphabet = unambiguous_dna
00132         # PCON2 is quality values of base-called sequence
00133         elif key == 'PCON2':
00134             qual = [ord(val) for val in tag_data]
00135         # SMPL1 is sample id entered before sequencing run
00136         elif key == 'SMPL1':
00137             sample_id = tag_data
00138         elif key in times:
00139             times[key] = tag_data
00140         else:
00141             # extract sequence annotation as defined in _EXTRACT          
00142             if key in _EXTRACT:
00143                 annot[_EXTRACT[key]] = tag_data
00144 
00145     # set time annotations
00146     annot['run_start'] = '%s %s' % (times['RUND1'], times['RUNT1'])
00147     annot['run_finish'] = '%s %s' % (times['RUND2'], times['RUNT2'])
00148     
00149     # use the file name as SeqRecord.name if available
00150     try:
00151         file_name = basename(handle.name).replace('.ab1','')
00152     except:
00153         file_name = ""
00154 
00155     record = SeqRecord(Seq(seq, alphabet),
00156                        id=sample_id, name=file_name,
00157                        description='',
00158                        annotations=annot,
00159                        letter_annotations={'phred_quality': qual})
00160                       
00161     if not trim:
00162         yield record
00163     else:
00164         yield _abi_trim(record)
00165 
00166 def _AbiTrimIterator(handle):
00167     """Iterator for the Abi file format that yields trimmed SeqRecord objects.
00168     """
00169     return AbiIterator(handle, trim=True)
00170 
00171 def _abi_parse_header(header, handle):
00172     """Generator that returns directory contents.
00173     """
00174     # header structure (after ABIF marker):
00175     # file version, tag name, tag number,
00176     # element type code, element size, number of elements
00177     # data size, data offset, handle (not file handle)
00178     head_elem_size = header[4]
00179     head_elem_num = header[5]
00180     head_offset = header[7]
00181     index = 0
00182 
00183     while index < head_elem_num:
00184         start = head_offset + index * head_elem_size
00185         # add directory offset to tuple
00186         # to handle directories with data size <= 4 bytes
00187         handle.seek(start)
00188         dir_entry = struct.unpack(_DIRFMT, \
00189                     handle.read(struct.calcsize(_DIRFMT))) + (start,)
00190         index += 1
00191         # only parse desired dirs
00192         key = _bytes_to_string(dir_entry[0])
00193         key += str(dir_entry[1])
00194         if key in (list(_EXTRACT.keys()) + _SPCTAGS):
00195             tag_name = _bytes_to_string(dir_entry[0])
00196             tag_number = dir_entry[1]
00197             elem_code = dir_entry[2]
00198             elem_num = dir_entry[4]
00199             data_size = dir_entry[5]
00200             data_offset = dir_entry[6]
00201             tag_offset = dir_entry[8]
00202             # if data size <= 4 bytes, data is stored inside tag
00203             # so offset needs to be changed
00204             if data_size <= 4:
00205                 data_offset = tag_offset + 20
00206             handle.seek(data_offset)
00207             data = handle.read(data_size)
00208             yield tag_name, tag_number, \
00209                   _parse_tag_data(elem_code, elem_num, data)
00210 
00211 
00212 def _abi_trim(seq_record):
00213     """Trims the sequence using Richard Mott's modified trimming algorithm.
00214 
00215     seq_record - SeqRecord object to be trimmed.
00216 
00217     Trimmed bases are determined from their segment score, which is a
00218     cumulative sum of each base's score. Base scores are calculated from
00219     their quality values.
00220 
00221     More about the trimming algorithm:
00222     http://www.phrap.org/phredphrap/phred.html
00223     http://www.clcbio.com/manual/genomics/Quality_abif_trimming.html
00224     """
00225 
00226     start = False   # flag for starting position of trimmed sequence
00227     segment = 20    # minimum sequence length
00228     trim_start = 0  # init start index
00229     cutoff = 0.05   # default cutoff value for calculating base score
00230 
00231     if len(seq_record) <= segment:
00232         return seq_record
00233     else:
00234         # calculate base score
00235         score_list = [cutoff - (10 ** (qual/-10.0)) for qual in
00236                       seq_record.letter_annotations['phred_quality']]
00237 
00238         # calculate cummulative score
00239         # if cummulative value < 0, set it to 0
00240         # first value is set to 0, because of the assumption that
00241         # the first base will always be trimmed out
00242         cummul_score = [0]
00243         for i in range(1, len(score_list)):
00244             score = cummul_score[-1] + score_list[i]
00245             if score < 0:
00246                 cummul_score.append(0)
00247             else:
00248                 cummul_score.append(score)
00249                 if not start:
00250                     # trim_start = value when cummulative score is first > 0
00251                     trim_start = i
00252                     start = True
00253         
00254         # trim_finish = index of highest cummulative score,
00255         # marking the end of sequence segment with highest cummulative score
00256         trim_finish = cummul_score.index(max(cummul_score))
00257                          
00258         return seq_record[trim_start:trim_finish]
00259 
00260 def _parse_tag_data(elem_code, elem_num, raw_data):
00261     """Returns single data value.
00262     
00263     elem_code - What kind of data
00264     elem_num - How many data points
00265     raw_data - abi file object from which the tags would be unpacked
00266     """
00267     if elem_code in _BYTEFMT:
00268         # because '>1s' unpack differently from '>s'
00269         if elem_num == 1:
00270             num = ''
00271         else:
00272             num = str(elem_num)
00273         fmt = '>' + num + _BYTEFMT[elem_code]
00274 
00275         assert len(raw_data) == struct.calcsize(fmt)
00276         data = struct.unpack(fmt, raw_data)
00277 
00278         # no need to use tuple if len(data) == 1
00279         # also if data is date / time
00280         if elem_code not in [10, 11] and len(data) == 1:
00281             data = data[0]
00282 
00283         # account for different data types
00284         if elem_code == 2:
00285             return _bytes_to_string(data)
00286         elif elem_code == 10:
00287             return str(datetime.date(*data))
00288         elif elem_code == 11:
00289             return str(datetime.time(*data[:3]))
00290         elif elem_code == 13:
00291             return bool(data)
00292         elif elem_code == 18:
00293             return _bytes_to_string(data[1:])
00294         elif self.elem_code == 19:
00295             return _bytes_to_string(data[:-1])
00296         else:
00297             return data
00298     else:
00299         return None
00300 
00301 if __name__ == '__main__':
00302     pass