Back to index

python-biopython  1.60
MarkovModel.py
Go to the documentation of this file.
00001 """Deal with representations of Markov Models.
00002 """
00003 # standard modules
00004 import copy
00005 import math
00006 import random
00007 
00008 #TODO - Take advantage of defaultdict once Python 2.4 is dead?
00009 #from collections import defaultdict
00010 
00011 # biopython
00012 from Bio.Seq import MutableSeq
00013 
00014 def _gen_random_array(n):
00015     """ Return an array of n random numbers, where the elements of the array sum
00016     to 1.0"""
00017     randArray = [random.random() for i in range(n)]
00018     total = sum(randArray)
00019     normalizedRandArray = [x/total for x in randArray]
00020     
00021     return normalizedRandArray
00022 
00023 def _calculate_emissions(emission_probs):
00024     """Calculate which symbols can be emitted in each state
00025     """
00026     # loop over all of the state-symbol duples, mapping states to
00027     # lists of emitted symbols
00028     emissions = dict()
00029     for state, symbol in emission_probs:
00030         try:
00031             emissions[state].append(symbol)
00032         except KeyError:
00033             emissions[state] = [symbol]
00034 
00035     return emissions
00036 
00037 def _calculate_from_transitions(trans_probs):
00038     """Calculate which 'from transitions' are allowed for each state
00039 
00040     This looks through all of the trans_probs, and uses this dictionary
00041     to determine allowed transitions. It converts this information into
00042     a dictionary, whose keys are source states and whose values are
00043     lists of destination states reachable from the source state via a
00044     transition.
00045     """
00046     transitions = dict()
00047     for from_state, to_state in trans_probs:
00048         try:
00049             transitions[from_state].append(to_state)
00050         except KeyError:
00051             transitions[from_state] = [to_state]
00052 
00053     return transitions
00054 
00055 def _calculate_to_transitions(trans_probs):
00056     """Calculate which 'to transitions' are allowed for each state
00057 
00058     This looks through all of the trans_probs, and uses this dictionary
00059     to determine allowed transitions. It converts this information into
00060     a dictionary, whose keys are destination states and whose values are
00061     lists of source states from which the destination is reachable via a
00062     transition.
00063     """
00064     transitions = dict()
00065     for from_state, to_state in trans_probs:
00066         try:
00067             transitions[to_state].append(from_state)
00068         except KeyError:
00069             transitions[to_state] = [from_state]
00070 
00071     return transitions
00072 
00073 class MarkovModelBuilder(object):
00074     """Interface to build up a Markov Model.
00075 
00076     This class is designed to try to separate the task of specifying the
00077     Markov Model from the actual model itself. This is in hopes of making
00078     the actual Markov Model classes smaller.
00079 
00080     So, this builder class should be used to create Markov models instead
00081     of trying to initiate a Markov Model directly.
00082     """
00083     # the default pseudo counts to use
00084     DEFAULT_PSEUDO = 1
00085     
00086     def __init__(self, state_alphabet, emission_alphabet):
00087         """Initialize a builder to create Markov Models.
00088 
00089         Arguments:
00090 
00091         o state_alphabet -- An alphabet containing all of the letters that
00092         can appear in the states
00093        
00094         o emission_alphabet -- An alphabet containing all of the letters for
00095         states that can be emitted by the HMM.
00096         """
00097         self._state_alphabet = state_alphabet
00098         self._emission_alphabet = emission_alphabet
00099 
00100         # probabilities for the initial state, initialized by calling
00101         # set_initial_probabilities (required)
00102         self.initial_prob = {}
00103 
00104         # the probabilities for transitions and emissions
00105         # by default we have no transitions and all possible emissions
00106         self.transition_prob = {}
00107         self.emission_prob = self._all_blank(state_alphabet,
00108                                              emission_alphabet)
00109 
00110         # the default pseudocounts for transition and emission counting
00111         self.transition_pseudo = {}
00112         self.emission_pseudo = self._all_pseudo(state_alphabet,
00113                                                 emission_alphabet)
00114 
00115     def _all_blank(self, first_alphabet, second_alphabet):
00116         """Return a dictionary with all counts set to zero.
00117 
00118         This uses the letters in the first and second alphabet to create
00119         a dictionary with keys of two tuples organized as
00120         (letter of first alphabet, letter of second alphabet). The values
00121         are all set to 0.
00122         """
00123         all_blank = {}
00124         for first_state in first_alphabet.letters:
00125             for second_state in second_alphabet.letters:
00126                 all_blank[(first_state, second_state)] = 0
00127 
00128         return all_blank
00129 
00130     def _all_pseudo(self, first_alphabet, second_alphabet):
00131         """Return a dictionary with all counts set to a default value.
00132 
00133         This takes the letters in first alphabet and second alphabet and
00134         creates a dictionary with keys of two tuples organized as:
00135         (letter of first alphabet, letter of second alphabet). The values
00136         are all set to the value of the class attribute DEFAULT_PSEUDO.
00137         """
00138         all_counts = {}
00139         for first_state in first_alphabet.letters:
00140             for second_state in second_alphabet.letters:
00141                 all_counts[(first_state, second_state)] = self.DEFAULT_PSEUDO
00142 
00143         return all_counts
00144                 
00145     def get_markov_model(self):
00146         """Return the markov model corresponding with the current parameters.
00147 
00148         Each markov model returned by a call to this function is unique
00149         (ie. they don't influence each other).
00150         """
00151 
00152         # user must set initial probabilities
00153         if not self.initial_prob:
00154             raise Exception("set_initial_probabilities must be called to " +
00155                             "fully initialize the Markov model")
00156 
00157         initial_prob = copy.deepcopy(self.initial_prob)
00158         transition_prob = copy.deepcopy(self.transition_prob)
00159         emission_prob = copy.deepcopy(self.emission_prob)
00160         transition_pseudo = copy.deepcopy(self.transition_pseudo)
00161         emission_pseudo = copy.deepcopy(self.emission_pseudo)
00162         
00163         return HiddenMarkovModel(initial_prob, transition_prob, emission_prob,
00164                                  transition_pseudo, emission_pseudo)
00165 
00166     def set_initial_probabilities(self, initial_prob):
00167         """Set initial state probabilities.
00168 
00169         initial_prob is a dictionary mapping states to probabilities.
00170         Suppose, for example, that the state alphabet is ['A', 'B']. Call
00171         set_initial_prob({'A': 1}) to guarantee that the initial
00172         state will be 'A'. Call set_initial_prob({'A': 0.5, 'B': 0.5})
00173         to make each initial state equally probable.
00174 
00175         This method must now be called in order to use the Markov model
00176         because the calculation of initial probabilities has changed
00177         incompatibly; the previous calculation was incorrect.
00178 
00179         If initial probabilities are set for all states, then they should add up
00180         to 1. Otherwise the sum should be <= 1. The residual probability is
00181         divided up evenly between all the states for which the initial
00182         probability has not been set. For example, calling
00183         set_initial_prob({}) results in P('A') = 0.5 and P('B') = 0.5,
00184         for the above example.
00185         """
00186         self.initial_prob = copy.copy(initial_prob)
00187 
00188         # ensure that all referenced states are valid
00189         for state in initial_prob.iterkeys():
00190             assert state in self._state_alphabet.letters, \
00191                    "State %s was not found in the sequence alphabet" % state
00192 
00193         # distribute the residual probability, if any
00194         num_states_not_set =\
00195             len(self._state_alphabet.letters) - len(self.initial_prob)
00196         if num_states_not_set < 0:
00197             raise Exception("Initial probabilities can't exceed # of states")
00198         prob_sum = sum(self.initial_prob.values())
00199         if prob_sum > 1.0:
00200             raise Exception("Total initial probability cannot exceed 1.0")
00201         if num_states_not_set > 0:
00202             prob = (1.0 - prob_sum) / num_states_not_set
00203             for state in self._state_alphabet.letters:
00204                 if not state in self.initial_prob:
00205                     self.initial_prob[state] = prob
00206 
00207     def set_equal_probabilities(self):
00208         """Reset all probabilities to be an average value.
00209 
00210         Resets the values of all initial probabilities and all allowed
00211         transitions and all allowed emissions to be equal to 1 divided by the
00212         number of possible elements.
00213 
00214         This is useful if you just want to initialize a Markov Model to
00215         starting values (ie. if you have no prior notions of what the
00216         probabilities should be -- or if you are just feeling too lazy
00217         to calculate them :-).
00218 
00219         Warning 1 -- this will reset all currently set probabilities.
00220 
00221         Warning 2 -- This just sets all probabilities for transitions and
00222         emissions to total up to 1, so it doesn't ensure that the sum of
00223         each set of transitions adds up to 1.
00224         """
00225 
00226         # set initial state probabilities
00227         new_initial_prob = float(1) / float(len(self.transition_prob))
00228         for state in self._state_alphabet.letters:
00229             self.initial_prob[state] = new_initial_prob
00230 
00231         # set the transitions
00232         new_trans_prob = float(1) / float(len(self.transition_prob))
00233         for key in self.transition_prob:
00234             self.transition_prob[key] = new_trans_prob
00235 
00236         # set the emissions
00237         new_emission_prob = float(1) / float(len(self.emission_prob))
00238         for key in self.emission_prob:
00239             self.emission_prob[key] = new_emission_prob
00240 
00241 
00242     def set_random_initial_probabilities(self):
00243         """Set all initial state probabilities to a randomly generated distribution.
00244         Returns the dictionary containing the initial probabilities.
00245         """
00246         initial_freqs = _gen_random_array(len(self._state_alphabet.letters))
00247         for state in self._state_alphabet.letters:
00248             self.initial_prob[state] = initial_freqs.pop()
00249 
00250         return self.initial_prob
00251 
00252     def set_random_transition_probabilities(self):
00253         """Set all allowed transition probabilities to a randomly generated distribution.
00254         Returns the dictionary containing the transition probabilities.
00255         """
00256 
00257         if not self.transition_prob:
00258             raise Exception("No transitions have been allowed yet. " +
00259                             "Allow some or all transitions by calling " + 
00260                             "allow_transition or allow_all_transitions first.")
00261 
00262         transitions_from = _calculate_from_transitions(self.transition_prob)
00263         for from_state in transitions_from.keys():
00264             freqs = _gen_random_array(len(transitions_from[from_state]))
00265             for to_state in transitions_from[from_state]:
00266                 self.transition_prob[(from_state, to_state)] = freqs.pop()
00267 
00268         return self.transition_prob
00269 
00270     def set_random_emission_probabilities(self):
00271         """Set all allowed emission probabilities to a randomly generated
00272         distribution.  Returns the dictionary containing the emission
00273         probabilities.
00274         """
00275 
00276         if not self.emission_prob:
00277             raise Exception("No emissions have been allowed yet. " +
00278                             "Allow some or all emissions.")
00279 
00280         emissions = _calculate_emissions(self.emission_prob)
00281         for state in emissions.iterkeys():
00282             freqs = _gen_random_array(len(emissions[state]))
00283             for symbol in emissions[state]:
00284                 self.emission_prob[(state, symbol)] = freqs.pop()
00285 
00286         return self.emission_prob
00287 
00288         
00289     def set_random_probabilities(self):
00290         """Set all probabilities to randomly generated numbers.
00291 
00292         Resets probabilities of all initial states, transitions, and
00293         emissions to random values.
00294         """
00295         self.set_random_initial_probabilities()
00296         self.set_random_transition_probabilities()
00297         self.set_random_emission_probabilities()
00298 
00299     # --- functions to deal with the transitions in the sequence
00300 
00301     def allow_all_transitions(self):
00302         """A convenience function to create transitions between all states.
00303 
00304         By default all transitions within the alphabet are disallowed; this
00305         is a way to change this to allow all possible transitions.
00306         """
00307         # first get all probabilities and pseudo counts set
00308         # to the default values
00309         all_probs = self._all_blank(self._state_alphabet,
00310                                     self._state_alphabet)
00311 
00312         all_pseudo = self._all_pseudo(self._state_alphabet,
00313                                       self._state_alphabet)
00314 
00315         # now set any probabilities and pseudo counts that
00316         # were previously set
00317         for set_key in self.transition_prob:
00318             all_probs[set_key] = self.transition_prob[set_key]
00319 
00320         for set_key in self.transition_pseudo:
00321             all_pseudo[set_key] = self.transition_pseudo[set_key]
00322 
00323         # finally reinitialize the transition probs and pseudo counts
00324         self.transition_prob = all_probs
00325         self.transition_pseudo = all_pseudo
00326 
00327     def allow_transition(self, from_state, to_state, probability = None,
00328                          pseudocount = None):
00329         """Set a transition as being possible between the two states.
00330 
00331         probability and pseudocount are optional arguments
00332         specifying the probabilities and pseudo counts for the transition.
00333         If these are not supplied, then the values are set to the
00334         default values.
00335 
00336         Raises:
00337         KeyError -- if the two states already have an allowed transition.
00338         """
00339         # check the sanity of adding these states
00340         for state in [from_state, to_state]:
00341             assert state in self._state_alphabet.letters, \
00342                    "State %s was not found in the sequence alphabet" % state
00343 
00344         # ensure that the states are not already set
00345         if ((from_state, to_state) not in self.transition_prob and 
00346             (from_state, to_state) not in self.transition_pseudo):
00347             # set the initial probability
00348             if probability is None:
00349                 probability = 0
00350             self.transition_prob[(from_state, to_state)] = probability
00351 
00352             # set the initial pseudocounts
00353             if pseudocount is None:
00354                 pseudcount = self.DEFAULT_PSEUDO
00355             self.transition_pseudo[(from_state, to_state)] = pseudocount 
00356         else:
00357             raise KeyError("Transition from %s to %s is already allowed."
00358                            % (from_state, to_state))
00359 
00360     def destroy_transition(self, from_state, to_state):
00361         """Restrict transitions between the two states.
00362 
00363         Raises:
00364         KeyError if the transition is not currently allowed.
00365         """
00366         try:
00367             del self.transition_prob[(from_state, to_state)]
00368             del self.transition_pseudo[(from_state, to_state)]
00369         except KeyError:
00370             raise KeyError("Transition from %s to %s is already disallowed."
00371                            % (from_state, to_state))
00372 
00373     def set_transition_score(self, from_state, to_state, probability):
00374         """Set the probability of a transition between two states.
00375 
00376         Raises:
00377         KeyError if the transition is not allowed.
00378         """
00379         if (from_state, to_state) in self.transition_prob:
00380             self.transition_prob[(from_state, to_state)] = probability
00381         else:
00382             raise KeyError("Transition from %s to %s is not allowed."
00383                            % (from_state, to_state))
00384 
00385     def set_transition_pseudocount(self, from_state, to_state, count):
00386         """Set the default pseudocount for a transition.
00387 
00388         To avoid computational problems, it is helpful to be able to
00389         set a 'default' pseudocount to start with for estimating
00390         transition and emission probabilities (see p62 in Durbin et al
00391         for more discussion on this. By default, all transitions have
00392         a pseudocount of 1.
00393 
00394         Raises:
00395         KeyError if the transition is not allowed.
00396         """
00397         if (from_state, to_state) in self.transition_pseudo:
00398             self.transition_pseudo[(from_state, to_state)] = count
00399         else:
00400             raise KeyError("Transition from %s to %s is not allowed."
00401                            % (from_state, to_state))
00402 
00403     # --- functions to deal with emissions from the sequence
00404 
00405     def set_emission_score(self, seq_state, emission_state, probability):
00406         """Set the probability of a emission from a particular state.
00407 
00408         Raises:
00409         KeyError if the emission from the given state is not allowed.
00410         """
00411         if (seq_state, emission_state) in self.emission_prob:
00412             self.emission_prob[(seq_state, emission_state)] = probability
00413         else:
00414             raise KeyError("Emission of %s from %s is not allowed."
00415                            % (emission_state, seq_state))
00416 
00417     def set_emission_pseudocount(self, seq_state, emission_state, count):
00418         """Set the default pseudocount for an emission.
00419 
00420         To avoid computational problems, it is helpful to be able to
00421         set a 'default' pseudocount to start with for estimating
00422         transition and emission probabilities (see p62 in Durbin et al
00423         for more discussion on this. By default, all emissions have
00424         a pseudocount of 1.
00425 
00426         Raises:
00427         KeyError if the emission from the given state is not allowed.
00428         """
00429         if (seq_state, emission_state) in self.emission_pseudo:
00430             self.emission_pseudo[(seq_state, emission_state)] = count
00431         else:
00432             raise KeyError("Emission of %s from %s is not allowed."
00433                            % (emission_state, seq_state))
00434 
00435 class HiddenMarkovModel(object):
00436     """Represent a hidden markov model that can be used for state estimation.
00437     """
00438     def __init__(self, initial_prob, transition_prob, emission_prob,
00439                  transition_pseudo, emission_pseudo):
00440         """Initialize a Markov Model.
00441 
00442         Note: You should use the MarkovModelBuilder class instead of
00443         initiating this class directly.
00444 
00445         Arguments:
00446 
00447         o initial_prob - A dictionary of initial probabilities for all states.
00448 
00449         o transition_prob -- A dictionary of transition probabilities for all
00450         possible transitions in the sequence.
00451 
00452         o emission_prob -- A dictionary of emission probabilities for all
00453         possible emissions from the sequence states.
00454 
00455         o transition_pseudo -- Pseudo-counts to be used for the transitions,
00456         when counting for purposes of estimating transition probabilities.
00457 
00458         o emission_pseudo -- Pseudo-counts to be used for the emissions,
00459         when counting for purposes of estimating emission probabilities.
00460         """
00461 
00462         self.initial_prob = initial_prob
00463 
00464         self._transition_pseudo = transition_pseudo
00465         self._emission_pseudo = emission_pseudo
00466         
00467         self.transition_prob = transition_prob
00468         self.emission_prob = emission_prob
00469 
00470         # a dictionary of the possible transitions from each state
00471         # each key is a source state, mapped to a list of the destination states
00472         # that are reachable from the source state via a transition
00473         self._transitions_from = \
00474            _calculate_from_transitions(self.transition_prob)
00475 
00476         # a dictionary of the possible transitions to each state
00477         # each key is a destination state, mapped to a list of source states
00478         # from which the destination is reachable via a transition
00479         self._transitions_to = \
00480            _calculate_to_transitions(self.transition_prob)
00481 
00482 
00483     def get_blank_transitions(self):
00484         """Get the default transitions for the model.
00485 
00486         Returns a dictionary of all of the default transitions between any
00487         two letters in the sequence alphabet. The dictionary is structured
00488         with keys as (letter1, letter2) and values as the starting number
00489         of transitions.
00490         """
00491         return self._transition_pseudo
00492 
00493     def get_blank_emissions(self):
00494         """Get the starting default emmissions for each sequence.
00495         
00496         This returns a dictionary of the default emmissions for each
00497         letter. The dictionary is structured with keys as
00498         (seq_letter, emmission_letter) and values as the starting number
00499         of emmissions.
00500         """
00501         return self._emission_pseudo
00502 
00503     def transitions_from(self, state_letter):
00504         """Get all destination states to which there are transitions from the
00505         state_letter source state.
00506 
00507         This returns all letters which the given state_letter can transition
00508         to. An empty list is returned if state_letter has no outgoing
00509         transitions.
00510         """
00511         if state_letter in self._transitions_from:
00512             return self._transitions_from[state_letter]
00513         else:
00514             return []
00515 
00516     def transitions_to(self, state_letter):
00517         """Get all source states from which there are transitions to the
00518         state_letter destination state.
00519 
00520         This returns all letters which the given state_letter is reachable
00521         from. An empty list is returned if state_letter is unreachable.
00522         """
00523         if state_letter in self._transitions_to:
00524             return self._transitions_to[state_letter]
00525         else:
00526             return []
00527 
00528     def viterbi(self, sequence, state_alphabet):
00529         """Calculate the most probable state path using the Viterbi algorithm.
00530 
00531         This implements the Viterbi algorithm (see pgs 55-57 in Durbin et
00532         al for a full explanation -- this is where I took my implementation
00533         ideas from), to allow decoding of the state path, given a sequence
00534         of emissions.
00535 
00536         Arguments:
00537 
00538         o sequence -- A Seq object with the emission sequence that we
00539         want to decode.
00540 
00541         o state_alphabet -- The alphabet of the possible state sequences
00542         that can be generated.
00543         """
00544 
00545         # calculate logarithms of the initial, transition, and emission probs
00546         log_initial = self._log_transform(self.initial_prob)
00547         log_trans = self._log_transform(self.transition_prob)
00548         log_emission = self._log_transform(self.emission_prob)
00549 
00550         viterbi_probs = {}
00551         pred_state_seq = {}
00552         state_letters = state_alphabet.letters
00553 
00554         # --- recursion
00555         # loop over the training squence (i = 1 .. L)
00556         # NOTE: My index numbers are one less than what is given in Durbin
00557         # et al, since we are indexing the sequence going from 0 to
00558         # (Length - 1) not 1 to Length, like in Durbin et al.
00559         for i in range(0, len(sequence)):
00560             # loop over all of the possible i-th states in the state path
00561             for cur_state in state_letters:
00562                 # e_{l}(x_{i})
00563                 emission_part = log_emission[(cur_state, sequence[i])]
00564 
00565                 max_prob = 0
00566                 if i == 0:
00567                     # for the first state, use the initial probability rather
00568                     # than looking back to previous states
00569                     max_prob = log_initial[cur_state]
00570                 else:
00571                     # loop over all possible (i-1)-th previous states
00572                     possible_state_probs = {}
00573                     for prev_state in self.transitions_to(cur_state):
00574                         # a_{kl}
00575                         trans_part = log_trans[(prev_state, cur_state)]
00576 
00577                         # v_{k}(i - 1)
00578                         viterbi_part = viterbi_probs[(prev_state, i - 1)]
00579                         cur_prob = viterbi_part + trans_part
00580 
00581                         possible_state_probs[prev_state] = cur_prob
00582 
00583                     # calculate the viterbi probability using the max
00584                     max_prob = max(possible_state_probs.values())
00585 
00586                 # v_{k}(i)
00587                 viterbi_probs[(cur_state, i)] = (emission_part + max_prob)
00588 
00589                 if i > 0:
00590                     # get the most likely prev_state leading to cur_state
00591                     for state in possible_state_probs:
00592                         if possible_state_probs[state] == max_prob:
00593                             pred_state_seq[(i - 1, cur_state)] = state
00594                             break
00595                     
00596         # --- termination
00597         # calculate the probability of the state path
00598         # loop over all states
00599         all_probs = {}
00600         for state in state_letters:
00601             # v_{k}(L)
00602             all_probs[state] = viterbi_probs[(state, len(sequence) - 1)]
00603 
00604         state_path_prob = max(all_probs.values())
00605 
00606         # find the last pointer we need to trace back from
00607         last_state = ''
00608         for state in all_probs:
00609             if all_probs[state] == state_path_prob:
00610                 last_state = state
00611 
00612         assert last_state != '', "Didn't find the last state to trace from!"
00613                 
00614         # --- traceback
00615         traceback_seq = MutableSeq('', state_alphabet)
00616         
00617         loop_seq = range(1, len(sequence))
00618         loop_seq.reverse()
00619 
00620         # last_state is the last state in the most probable state sequence.
00621         # Compute that sequence by walking backwards in time. From the i-th
00622         # state in the sequence, find the (i-1)-th state as the most
00623         # probable state preceding the i-th state.
00624         state = last_state
00625         traceback_seq.append(state)
00626         for i in loop_seq:
00627             state = pred_state_seq[(i - 1, state)]
00628             traceback_seq.append(state)
00629 
00630         # put the traceback sequence in the proper orientation
00631         traceback_seq.reverse()
00632 
00633         return traceback_seq.toseq(), state_path_prob
00634 
00635     def _log_transform(self, probability):
00636         """Return log transform of the given probability dictionary.
00637 
00638         When calculating the Viterbi equation, add logs of probabilities rather
00639         than multiplying probabilities, to avoid underflow errors. This method
00640         returns a new dictionary with the same keys as the given dictionary
00641         and log-transformed values.
00642         """
00643         log_prob = copy.copy(probability)
00644         try:
00645             neg_inf = float("-inf")
00646         except ValueError:
00647             #On Python 2.5 or older that was handled in C code,
00648             #and failed on Windows XP 32bit
00649             neg_inf = - 1E400
00650         for key in log_prob:
00651             prob = log_prob[key]
00652             if prob > 0:
00653                 log_prob[key] = math.log(log_prob[key])
00654             else:
00655                 log_prob[key] = neg_inf
00656 
00657         return log_prob
00658