# -*- coding: utf-8 -*-
# Public names exported by this module.
__all__ = [
'HiddenMarkovModel'
]
###########
# IMPORTS #
###########
# Standard
import copy as _cp
import inspect as _ins
# Libraries
import numpy as _np
import numpy.linalg as _npl
# Internal
from .base_classes import (
Model as _Model
)
from .custom_types import (
ohmm_decoding as _ohmm_decoding,
oint as _oint,
olist_str as _olist_str,
onumeric as _onumeric,
ostate as _ostate,
ostates as _ostates,
ostatus as _ostatus,
tarray as _tarray,
tgraph as _tgraph,
tgraphs as _tgraphs,
thmm as _thmm,
thmm_dict as _thmm_dict,
thmm_dict_flex as _thmm_dict_flex,
thmm_prediction as _thmm_prediction,
thmm_sequence_ext as _thmm_sequence_ext,
thmm_step as _thmm_step,
thmm_symbols_ext as _thmm_symbols_ext,
tlist_str as _tlist_str,
tnumeric as _tnumeric,
tpair_array as _tpair_array,
tpair_int as _tpair_int,
tpath as _tpath,
tsequence as _tsequence,
tstate as _tstate
)
from .decorators import (
cached_property as _cached_property,
object_mark as _object_mark
)
from .exceptions import (
ValidationError as _ValidationError
)
from .files_io import (
read_csv as _read_csv,
read_json as _read_json,
read_txt as _read_txt,
read_xml as _read_xml,
write_csv as _write_csv,
write_json as _write_json,
write_txt as _write_txt,
write_xml as _write_xml
)
from .fitting import (
hmm_fit as _fit,
)
from .generators import (
hmm_estimate as _estimate,
hmm_random as _random,
hmm_restrict as _restrict
)
from .markov_chain import (
MarkovChain as _MarkovChain
)
from .measures import (
hmm_decode as _decode
)
from .simulations import (
hmm_predict as _predict,
hmm_simulate as _simulate
)
from .utilities import (
build_hmm_graph as _build_hmm_graph,
create_labels as _create_labels,
create_rng as _create_rng,
create_validation_error as _create_validation_error,
get_caller as _get_caller,
get_instance_generators as _get_instance_generators
)
from .validation import (
validate_boolean as _validate_boolean,
validate_dictionary as _validate_dictionary,
validate_emission_matrix as _validate_emission_matrix,
validate_enumerator as _validate_enumerator,
validate_file_path as _validate_file_path,
validate_graph as _validate_graph,
validate_integer as _validate_integer,
validate_label as _validate_label,
validate_labels_current as _validate_labels_current,
validate_labels_input as _validate_labels_input,
validate_mask as _validate_mask,
validate_matrix as _validate_matrix,
validate_sequence as _validate_sequence,
validate_sequences as _validate_sequences,
validate_status as _validate_status,
validate_transition_matrix as _validate_transition_matrix
)
###########
# CLASSES #
###########
class HiddenMarkovModel(_Model):
    """
    Defines a hidden Markov model with the given transition and emission matrices.

    :param p: the transition matrix.
    :param e: the emission matrix.
    :param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1 with prefix P*).
    :param symbols: the name of each symbol (*if omitted, an increasing sequence of integers starting at 1 with prefix E*).
    :raises ValidationError: if any input argument is not compliant.
    """

    # Names of the instance generators of the class, resolved once on the first instantiation.
    __instance_generators: _olist_str = None

    def __init__(self, p: _tnumeric, e: _tnumeric, states: _olist_str = None, symbols: _olist_str = None):

        if HiddenMarkovModel.__instance_generators is None:
            HiddenMarkovModel.__instance_generators = _get_instance_generators(self.__class__)

        # The stack must be inspected from within the constructor itself, so that the direct caller is identified.
        caller = _get_caller(_ins.stack())

        # Arguments are validated only when the constructor is not invoked by one of the instance generators.
        if caller not in HiddenMarkovModel.__instance_generators:

            try:
                p = _validate_transition_matrix(p)
                e = _validate_emission_matrix(e, p.shape[1])
                states = _create_labels(p.shape[1], 'P') if states is None else _validate_labels_input(states, p.shape[1])
                symbols = _create_labels(e.shape[1], 'E') if symbols is None else _validate_labels_input(symbols, e.shape[1])
            except Exception as ex:  # pragma: no cover
                raise _create_validation_error(ex, _ins.trace()) from None

            if not set(states).isdisjoint(symbols):  # pragma: no cover
                raise _ValidationError('State names and symbol names must be different.')

        self.__digraph: _tgraph = _build_hmm_graph(p, e, states, symbols)
        self.__e: _tarray = e
        self.__p: _tarray = p
        self.__size: _tpair_int = (p.shape[1], e.shape[1])
        self.__states: _tlist_str = states
        self.__symbols: _tlist_str = symbols

    def __eq__(self, other) -> bool:

        if not isinstance(other, HiddenMarkovModel):
            return False

        same_matrices = _np.array_equal(self.__p, other.__p) and _np.array_equal(self.__e, other.__e)
        same_labels = self.__states == other.__states and self.__symbols == other.__symbols

        return same_matrices and same_labels

    def __hash__(self) -> int:

        return hash((self.__p.tobytes(), self.__e.tobytes(), tuple(self.__states), tuple(self.__symbols)))

    def __repr__(self) -> str:

        return self.__class__.__name__

    def __str__(self) -> str:

        lines = [
            '',
            'HIDDEN MARKOV MODEL',
            f' STATES: {self.n:d}',
            f' SYMBOLS: {self.k:d}',
            f' ERGODIC: {("YES" if self.is_ergodic else "NO")}',
            f' REGULAR: {("YES" if self.is_regular else "NO")}',
            ''
        ]

        return '\n'.join(lines)

    @property
    def e(self) -> _tarray:
        """
        A property representing the emission matrix of the hidden Markov model (a copy is returned).
        """

        return _np.copy(self.__e)

    @_cached_property
    def is_ergodic(self) -> bool:
        """
        A property indicating whether the hidden Markov model is ergodic.

        | The model is reported as ergodic when the Markov chain built on its transition matrix is ergodic and all of its emission probabilities are strictly positive.
        """

        mc = _MarkovChain(self.__p, self.__states)

        # The explicit conversion guarantees a built-in bool instead of a numpy boolean scalar.
        return bool(mc.is_ergodic and _np.all(self.__e > 0.0))

    @_cached_property
    def is_regular(self) -> bool:
        """
        A property indicating whether the hidden Markov model is regular.

        | The model is reported as regular when the rank of its emission matrix equals the number of symbols.
        """

        # The explicit conversion guarantees a built-in bool instead of a numpy boolean scalar.
        return bool(_npl.matrix_rank(self.__e) == self.__size[1])

    @property
    def k(self) -> int:
        """
        A property representing the size of the hidden Markov model symbol space.
        """

        return self.__size[1]

    @property
    def n(self) -> int:
        """
        A property representing the size of the hidden Markov model state space.
        """

        return self.__size[0]

    @property
    def p(self) -> _tarray:
        """
        A property representing the transition matrix of the hidden Markov model (a copy is returned).
        """

        return _np.copy(self.__p)

    @property
    def size(self) -> _tpair_int:
        """
        | A property representing the size of the hidden Markov model.
        | The first value represents the number of states, the second value represents the number of symbols.
        """

        return self.__size

    @property
    def states(self) -> _tlist_str:
        """
        A property representing the states of the hidden Markov model (a copy is returned).
        """

        # A copy is returned, like for the matrices, so that callers cannot alter the internal labels.
        return list(self.__states)

    @property
    def symbols(self) -> _tlist_str:
        """
        A property representing the symbols of the hidden Markov model (a copy is returned).
        """

        # A copy is returned, like for the matrices, so that callers cannot alter the internal labels.
        return list(self.__symbols)

    def decode(self, symbols: _tsequence, initial_status: _ostatus = None, use_scaling: bool = True) -> _ohmm_decoding:
        """
        The method calculates the log probability, the posterior probabilities, the backward probabilities and the forward probabilities of an observed sequence of symbols.

        | **Notes:**

        - If the observed sequence of symbols cannot be decoded, then :py:class:`None` is returned.

        :param symbols: the observed sequence of symbols.
        :param initial_status: the initial state or the initial distribution of the states (*if omitted, the states are assumed to be uniformly distributed*).
        :param use_scaling: a boolean indicating whether to return scaled backward and forward probabilities together with their scaling factors.
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            symbols = _validate_sequence(symbols, self.__symbols)
            initial_status = _np.full(self.__size[0], 1.0 / self.__size[0], dtype=float) if initial_status is None else _validate_status(initial_status, self.__states)
            use_scaling = _validate_boolean(use_scaling)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        return _decode(self.__p, self.__e, initial_status, symbols, use_scaling)

    def emission_probability(self, symbol: _tstate, state: _tstate) -> float:
        """
        The method computes the probability of a given symbol, conditioned on the process being at a given state.

        :param symbol: the target symbol.
        :param state: the origin state.
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            symbol = _validate_label(symbol, self.__symbols)
            state = _validate_label(state, self.__states)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        return self.__e[state, symbol]

    @_object_mark(random_output=True)
    def next(self, initial_state: _tstate, target: str = 'both', output_index: bool = False, seed: _oint = None) -> _thmm_step:
        """
        The method simulates a single step in a random walk.

        :param initial_state: the initial state.
        :param target:
         - **state** for a random state;
         - **symbol** for a random symbol;
         - **both** for a random state and a random symbol.
        :param output_index: a boolean indicating whether to output indices instead of names.
        :param seed: a seed to be used as RNG initializer for reproducibility purposes.
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            rng = _create_rng(seed)
            target = _validate_enumerator(target, ['both', 'state', 'symbol'])
            initial_state = _validate_label(initial_state, self.__states)
            output_index = _validate_boolean(output_index)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        simulation = _simulate(self, 1, initial_state, None, None, rng)

        def last_item(sequence, labels):
            # Only the last simulated element is of interest; it is converted into its name unless indices are requested.
            index = sequence[-1]
            return index if output_index else labels[index]

        if target == 'state':
            return last_item(simulation[0], self.__states)

        if target == 'symbol':
            return last_item(simulation[1], self.__symbols)

        return (last_item(simulation[0], self.__states), last_item(simulation[1], self.__symbols))

    def predict(self, prediction_type: str, symbols: _tsequence, initial_status: _ostatus = None, output_indices: bool = False) -> _thmm_prediction:
        """
        The method calculates the log probability and the most probable states path of an observed sequence of symbols.

        | **Notes:**

        - If the maximum a posteriori prediction is used and the observed sequence of symbols cannot be decoded, then :py:class:`None` is returned.
        - If the maximum likelihood prediction is used and the observed sequence of symbols produces null transition probabilities, then :py:class:`None` is returned.

        :param prediction_type:
         - **map** for the maximum a posteriori prediction;
         - **mle** or **viterbi** for the maximum likelihood prediction.
        :param symbols: the observed sequence of symbols.
        :param initial_status: the initial state or the initial distribution of the states (*if omitted, the states are assumed to be uniformly distributed*).
        :param output_indices: a boolean indicating whether to output the state indices.
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            prediction_type = _validate_enumerator(prediction_type, ['map', 'mle', 'viterbi'])
            symbols = _validate_sequence(symbols, self.__symbols)
            initial_status = _np.full(self.__size[0], 1.0 / self.__size[0], dtype=float) if initial_status is None else _validate_status(initial_status, self.__states)
            # The flag is validated like the boolean arguments of the other methods of the class.
            output_indices = _validate_boolean(output_indices)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        value = _predict(prediction_type, self.__p, self.__e, initial_status, symbols)

        if value is not None and not output_indices:
            value = (value[0], [self.__states[index] for index in value[1]])

        return value

    @_object_mark(instance_generator=True)
    def restrict(self, states: _ostates = None, symbols: _ostates = None) -> _thmm:
        """
        The method returns a submodel restricted to the given states and symbols.

        | **Notes:**

        - Submodel transition and emission matrices are normalized so that their rows sum to 1.0.
        - Submodel transition and emission matrices whose rows sum to 0.0 are replaced by uniformly distributed probabilities.

        :param states: the states to include in the submodel (*if omitted, all the states are included*).
        :param symbols: the symbols to include in the submodel (*if omitted, all the symbols are included*).
        :raises ValidationError: if any input argument is not compliant.
        """

        if states is None and symbols is None:
            raise _ValidationError('Either submodel states or submodel symbols must be defined.')

        try:
            states = list(range(self.__size[0])) if states is None else _validate_labels_current(states, self.__states, True, 2)
            symbols = list(range(self.__size[1])) if symbols is None else _validate_labels_current(symbols, self.__symbols, True, 2)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        p, e, states_out, symbols_out, _ = _restrict(self.__p, self.__e, self.__states, self.__symbols, states, symbols)

        return HiddenMarkovModel(p, e, states_out, symbols_out)

    @_object_mark(random_output=True)
    def simulate(self, steps: int, initial_state: _ostate = None, final_state: _ostate = None, final_symbol: _ostate = None, output_indices: bool = False, seed: _oint = None) -> _thmm_sequence_ext:
        """
        The method simulates a random sequence of states and symbols of the given number of steps.

        :param steps: the number of steps.
        :param initial_state: the initial state (*if omitted, it is chosen uniformly at random*).
        :param final_state: the final state of the simulation (*if specified, the simulation stops as soon as it is reached even if not all the steps have been performed*).
        :param final_symbol: the final symbol of the simulation (*if specified, the simulation stops as soon as it is reached even if not all the steps have been performed*).
        :param output_indices: a boolean indicating whether to output the state and symbol indices.
        :param seed: a seed to be used as RNG initializer for reproducibility purposes.
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            rng = _create_rng(seed)
            steps = _validate_integer(steps, lower_limit=(2, False))
            initial_state = rng.randint(0, self.__size[0]) if initial_state is None else _validate_label(initial_state, self.__states)
            final_state = None if final_state is None else _validate_label(final_state, self.__states)
            final_symbol = None if final_symbol is None else _validate_label(final_symbol, self.__symbols)
            output_indices = _validate_boolean(output_indices)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        value = _simulate(self, steps, initial_state, final_state, final_symbol, rng)

        if not output_indices:
            states_path = [self.__states[index] for index in value[0]]
            symbols_path = [self.__symbols[index] for index in value[1]]
            value = (states_path, symbols_path)

        return value

    def to_dictionary(self) -> _thmm_dict:
        """
        The method returns a dictionary representing the hidden Markov model.

        | Keys are tuples made of a reference ("P" for transitions, "E" for emissions), an origin state and a target state or symbol; values are probabilities.
        """

        d = {}

        # For every origin state, transition entries are inserted before emission entries.
        for state, p_row, e_row in zip(self.__states, self.__p, self.__e):

            for state_target, probability in zip(self.__states, p_row):
                d[('P', state, state_target)] = probability

            for symbol, probability in zip(self.__symbols, e_row):
                d[('E', state, symbol)] = probability

        return d

    def to_file(self, file_path: _tpath):
        """
        The method writes a hidden Markov model to the given file.

        | Only **csv**, **json**, **txt** and **xml** files are supported; data format is inferred from the file extension.

        :param file_path: the location of the file in which the hidden Markov model must be written.
        :raises OSError: if the file cannot be written.
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            file_path, file_extension = _validate_file_path(file_path, ['.csv', '.json', '.xml', '.txt'], True)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        d = self.to_dictionary()

        # The txt writer is the only one that does not take the leading flag argument.
        if file_extension == '.txt':
            _write_txt(d, file_path)
            return

        writers = {'.csv': _write_csv, '.json': _write_json}
        writer = writers.get(file_extension, _write_xml)
        writer(False, d, file_path)

    def to_graph(self) -> _tgraph:
        """
        The method returns a directed graph representing the hidden Markov model (a deep copy of the internal graph).
        """

        return _cp.deepcopy(self.__digraph)

    def to_matrices(self) -> _tpair_array:
        """
        The method returns a tuple of two items representing the underlying matrices of the hidden Markov model.

        | The first item is the transition matrix and the second item is the emission matrix.
        """

        return (_np.copy(self.__p), _np.copy(self.__e))

    def transition_probability(self, state_target: _tstate, state_origin: _tstate) -> float:
        """
        The method computes the probability of a given state, conditioned on the process being at a given state.

        :param state_target: the target state.
        :param state_origin: the origin state.
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            state_target = _validate_label(state_target, self.__states)
            state_origin = _validate_label(state_origin, self.__states)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        return self.__p[state_origin, state_target]

    @staticmethod
    def _matrices_from_entries(d, source: str):
        """
        The helper converts a mapping of (reference, element_from, element_to) keys and probability values into transition and emission matrices.

        | It only builds the matrices and the labels: the model itself must be created by the calling instance generator.

        :param d: the mapping to convert.
        :param source: the name of the data source (for example "dictionary" or "file"), used in error messages.
        :return: a tuple made of transition matrix, emission matrix, states and symbols.
        :raises ValueError: if the mapping does not define valid transition and emission matrices.
        """

        # States are inferred from the diagonal entries of the transition matrix.
        states = [key[1] for key in d if key[0] == 'P' and key[1] == key[2]]
        n = len(states)

        if n < 2:  # pragma: no cover
            raise ValueError(f'The size of the transition matrix defined by the {source} must be greater than or equal to 2.')

        # Symbols are inferred from the emission entries of the first state.
        symbols = [key[2] for key in d if key[0] == 'E' and key[1] == states[0]]
        k = len(symbols)

        if k < 2:  # pragma: no cover
            raise ValueError(f'The size of the emission matrix defined by the {source} must be greater than or equal to 2.')

        # Index maps avoid a linear search of the labels for every entry.
        state_indices = {state: index for index, state in enumerate(states)}
        symbol_indices = {symbol: index for index, symbol in enumerate(symbols)}

        p, e = _np.zeros((n, n), dtype=float), _np.zeros((n, k), dtype=float)

        for (reference, element_from, element_to), probability in d.items():

            try:
                i = state_indices[element_from]
                if reference == 'E':
                    e[i, symbol_indices[element_to]] = probability
                else:
                    p[i, state_indices[element_to]] = probability
            except KeyError:
                # The exception type raised by the former list-based lookup is preserved.
                raise ValueError(f'The {source} contains an entry ({reference!r}, {element_from!r}, {element_to!r}) that refers to an undefined state or symbol.') from None

        if not _np.allclose(_np.sum(p, axis=1), _np.ones(n, dtype=float)):  # pragma: no cover
            raise ValueError(f'The rows of the transition matrix defined by the {source} must sum to 1.0.')

        if not _np.allclose(_np.sum(e, axis=1), _np.ones(n, dtype=float)):  # pragma: no cover
            raise ValueError(f'The rows of the emission matrix defined by the {source} must sum to 1.0.')

        return p, e, states, symbols

    @staticmethod
    def _normalize_rows(matrix):
        """
        The helper normalizes, in place, the rows of the given two-dimensional array so that they sum to 1.0.

        | Rows whose sum is close to 0.0 are replaced by uniformly distributed probabilities.

        :param matrix: the array to normalize.
        """

        width = matrix.shape[1]
        sums = _np.sum(matrix, axis=1)

        # Iterating over a two-dimensional array yields row views, hence the assignments alter the array itself.
        for row, row_sum in zip(matrix, sums):
            if _np.isclose(row_sum, 0.0):  # pragma: no cover
                row[:] = _np.ones(width, dtype=float) / width
            else:
                row /= row_sum

    @staticmethod
    @_object_mark(instance_generator=True)
    def estimate(possible_states: _tlist_str, possible_symbols: _tlist_str, sequence_states: _tsequence, sequence_symbols: _tsequence) -> _thmm:
        """
        The method performs the maximum likelihood estimation of transition and emission probabilities from an observed sequence of states and symbols.

        :param possible_states: the possible states of the model.
        :param possible_symbols: the possible symbols of the model.
        :param sequence_states: the observed sequence of states.
        :param sequence_symbols: the observed sequence of symbols.
        :raises ValidationError: if any input argument is not compliant.
        :raises ValueError: if the observed sequences do not have the same length.
        """

        try:
            possible_states = _validate_labels_input(possible_states)
            possible_symbols = _validate_labels_input(possible_symbols)
            sequence_states = _validate_sequence(sequence_states, possible_states)
            sequence_symbols = _validate_sequence(sequence_symbols, possible_symbols)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        if not set(possible_states).isdisjoint(possible_symbols):  # pragma: no cover
            raise _ValidationError('State names and symbol names must be different.')

        if len(sequence_states) != len(sequence_symbols):
            raise ValueError('The observed sequence of states and the observed sequence of symbols must have the same length.')

        p, e = _estimate(len(possible_states), len(possible_symbols), sequence_states, sequence_symbols, True)

        return HiddenMarkovModel(p, e, possible_states, possible_symbols)

    @staticmethod
    @_object_mark(instance_generator=True)
    def fit(fitting_type: str, possible_states: _tlist_str, possible_symbols: _tlist_str, p_guess: _tarray, e_guess: _tarray, symbols: _thmm_symbols_ext, initial_status: _ostatus = None) -> _thmm:
        """
        The method fits a hidden Markov model from an initial guess and one or more observed sequences of symbols.

        :param fitting_type:
         - **baum-welch** for the Baum-Welch fitting;
         - **map** for the maximum a posteriori fitting;
         - **mle** or **viterbi** for the maximum likelihood fitting.
        :param possible_states: the possible states of the model.
        :param possible_symbols: the possible symbols of the model.
        :param p_guess: the initial transition matrix guess.
        :param e_guess: the initial emission matrix guess.
        :param symbols: the observed sequence(s) of symbols.
        :param initial_status: the initial state or the initial distribution of the states (*if omitted, the states are assumed to be uniformly distributed*).
        :raises ValidationError: if any input argument is not compliant.
        :raises ValueError: if the fitting algorithm fails to converge.
        """

        try:
            fitting_type = _validate_enumerator(fitting_type, ['baum-welch', 'map', 'mle', 'viterbi'])
            possible_states = _validate_labels_input(possible_states)
            possible_symbols = _validate_labels_input(possible_symbols)
            p_guess = _validate_transition_matrix(p_guess, len(possible_states))
            e_guess = _validate_emission_matrix(e_guess, p_guess.shape[1])
            symbols = _validate_sequences(symbols, possible_symbols, True)
            initial_status = _np.full(len(possible_states), 1.0 / len(possible_states), dtype=float) if initial_status is None else _validate_status(initial_status, possible_states)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        # The constructor does not validate the arguments passed by instance generators, hence the consistency
        # between the emission matrix guess and the possible symbols must be enforced here.
        if e_guess.shape[1] != len(possible_symbols):
            raise _ValidationError('The emission matrix guess must define one column for each possible symbol.')

        if not set(possible_states).isdisjoint(possible_symbols):  # pragma: no cover
            raise _ValidationError('State names and symbol names must be different.')

        p, e, error_message = _fit(fitting_type, p_guess, e_guess, initial_status, symbols)

        if error_message is not None:  # pragma: no cover
            raise ValueError(error_message)

        return HiddenMarkovModel(p, e, possible_states, possible_symbols)

    @staticmethod
    @_object_mark(instance_generator=True)
    def from_dictionary(d: _thmm_dict_flex) -> _thmm:
        """
        The method generates a hidden Markov model from the given dictionary, whose keys are tuples made of a reference ("P" for transitions, "E" for emissions), an origin element and a target element, and whose values represent probabilities.

        :param d: the dictionary to transform into the transition and emission matrices.
        :raises ValidationError: if any input argument is not compliant.
        :raises ValueError: if the transition matrix or the emission matrix defined by the dictionary is not valid.
        """

        try:
            d = _validate_dictionary(d, ['P', 'E'])
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        p, e, states, symbols = HiddenMarkovModel._matrices_from_entries(d, 'dictionary')

        return HiddenMarkovModel(p, e, states, symbols)

    @staticmethod
    @_object_mark(instance_generator=True)
    def from_file(file_path: _tpath) -> _thmm:
        r"""
        The method reads a hidden Markov model from the given file.

        | Only **csv**, **json**, **txt** and **xml** files are supported; data format is inferred from the file extension.
        | Transition probabilities are associated to reference attribute "P", emission probabilities are associated to reference attribute "E".

        | In **csv** files, data must be structured as follows:

        - *Delimiter:* **comma**
        - *Quoting:* **minimal**
        - *Quote Character:* **double quote**
        - *Header Row:* state names (prefixed with "P\_") and symbol names (prefixed with "E\_")
        - *Data Rows:* **probabilities**

        | In **json** files, data must be structured as an array of objects with the following properties:

        - **reference** *(string)*
        - **element_from** *(string)*
        - **element_to** *(string)*
        - **probability** *(float or int)*

        | In **txt** files, every line of the file must have the following format:

        - **<reference> <element_from> <element_to> <probability>**

        | In **xml** files, the structure must be defined as follows:

        - *Root Element:* **HiddenMarkovModel**
        - *Child Elements:* **Item**\ *, with attributes:*

          - **reference** *(string)*
          - **element_from** *(string)*
          - **element_to** *(string)*
          - **probability** *(float or int)*

        :param file_path: the location of the file that defines the hidden Markov model.
        :raises FileNotFoundError: if the file does not exist.
        :raises OSError: if the file cannot be read or is empty.
        :raises ValidationError: if any input argument is not compliant.
        :raises ValueError: if the file contains invalid data.
        """

        try:
            file_path, file_extension = _validate_file_path(file_path, ['.csv', '.json', '.xml', '.txt'], False)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        # Any extension other than csv, json and txt is handled by the xml reader.
        readers = {'.csv': _read_csv, '.json': _read_json, '.txt': _read_txt}
        reader = readers.get(file_extension, _read_xml)
        d = reader(False, file_path)

        p, e, states, symbols = HiddenMarkovModel._matrices_from_entries(d, 'file')

        return HiddenMarkovModel(p, e, states, symbols)

    @staticmethod
    @_object_mark(instance_generator=True)
    def from_graph(graph: _tgraphs) -> _thmm:
        """
        The method generates a hidden Markov model from the given directed graph, whose transition and emission matrices are obtained through the normalization of edge weights.

        | Nodes whose "layer" attribute is 1 are treated as states, nodes whose "layer" attribute is 0 are treated as symbols.
        | Edges whose "type" attribute is "E" define emissions, all the other edges define transitions.

        :param graph: the directed graph to transform into the hidden Markov model.
        :raises ValidationError: if any input argument is not compliant.
        :raises ValueError: if an edge refers to a node that is not a valid state or symbol.
        """

        try:
            graph = _validate_graph(graph, 2, [('type', ('E', 'P'))])
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        nodes = graph.nodes(data='layer', default=-1)
        states = [node[0] for node in nodes if node[1] == 1]
        symbols = [node[0] for node in nodes if node[1] == 0]
        n, k = len(states), len(symbols)

        state_indices = {state: index for index, state in enumerate(states)}
        symbol_indices = {symbol: index for index, symbol in enumerate(symbols)}

        p, e = _np.zeros((n, n), dtype=float), _np.zeros((n, k), dtype=float)

        # Edges are scanned once; should the graph contain parallel edges, only the first edge found
        # between two nodes is taken into account.
        processed = set()

        for node_from, node_to, attributes in graph.edges(data=True):

            if (node_from, node_to) in processed:
                continue

            processed.add((node_from, node_to))

            edge_type = attributes.get('type', '')
            edge_weight = attributes.get('weight', 0.0)

            try:
                i = state_indices[node_from]
                if edge_type == 'E':
                    e[i, symbol_indices[node_to]] = float(edge_weight)
                else:
                    p[i, state_indices[node_to]] = float(edge_weight)
            except KeyError:
                # The exception type raised by the former list-based lookup is preserved.
                raise ValueError(f'The edge ({node_from!r}, {node_to!r}) refers to a node that is not a valid state or symbol.') from None

        HiddenMarkovModel._normalize_rows(p)
        HiddenMarkovModel._normalize_rows(e)

        return HiddenMarkovModel(p, e, states, symbols)

    @staticmethod
    @_object_mark(instance_generator=True)
    def from_matrices(mp: _tnumeric, me: _tnumeric, states: _olist_str = None, symbols: _olist_str = None) -> _thmm:
        """
        The method generates a hidden Markov model whose transition and emission matrices are obtained through the normalization of the given matrices.

        | **Notes:**

        - Rows that sum to 0.0 are replaced by uniformly distributed probabilities.

        :param mp: the matrix to transform into the transition matrix.
        :param me: the matrix to transform into the emission matrix.
        :param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1 with prefix P*).
        :param symbols: the name of each symbol (*if omitted, an increasing sequence of integers starting at 1 with prefix E*).
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            mp = _validate_matrix(mp)
            me = _validate_matrix(me, rows=mp.shape[1])
            states = _create_labels(mp.shape[1], 'P') if states is None else _validate_labels_input(states, mp.shape[1])
            symbols = _create_labels(me.shape[1], 'E') if symbols is None else _validate_labels_input(symbols, me.shape[1])
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        if not set(states).isdisjoint(symbols):  # pragma: no cover
            raise _ValidationError('State names and symbol names must be different.')

        # The input matrices are left untouched: normalization is performed on copies.
        p, e = _np.copy(mp), _np.copy(me)

        HiddenMarkovModel._normalize_rows(p)
        HiddenMarkovModel._normalize_rows(e)

        return HiddenMarkovModel(p, e, states, symbols)

    @staticmethod
    @_object_mark(instance_generator=True, random_output=True)
    def random(n: int, k: int, states: _olist_str = None, p_zeros: int = 0, p_mask: _onumeric = None, symbols: _olist_str = None, e_zeros: int = 0, e_mask: _onumeric = None, seed: _oint = None) -> _thmm:
        """
        The method generates a hidden Markov model of given sizes with random transition and emission probabilities.

        | **Notes:**

        - In the mask parameters, undefined probabilities are represented by *NaN* values.

        :param n: the number of states.
        :param k: the number of symbols.
        :param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1 with prefix P*).
        :param p_zeros: the number of null transition probabilities.
        :param p_mask: a matrix representing locations and values of fixed transition probabilities.
        :param symbols: the name of each symbol (*if omitted, an increasing sequence of integers starting at 1 with prefix E*).
        :param e_zeros: the number of null emission probabilities.
        :param e_mask: a matrix representing locations and values of fixed emission probabilities.
        :param seed: a seed to be used as RNG initializer for reproducibility purposes.
        :raises ValidationError: if any input argument is not compliant.
        """

        try:
            rng = _create_rng(seed)
            n = _validate_integer(n, lower_limit=(2, False))
            k = _validate_integer(k, lower_limit=(2, False))
            if states is not None:
                states = _validate_labels_input(states, n)
            p_zeros = _validate_integer(p_zeros, lower_limit=(0, False))
            p_mask = _np.full((n, n), _np.nan, dtype=float) if p_mask is None else _validate_mask(p_mask, n, n)
            if symbols is not None:
                symbols = _validate_labels_input(symbols, k)
            e_zeros = _validate_integer(e_zeros, lower_limit=(0, False))
            e_mask = _np.full((n, k), _np.nan, dtype=float) if e_mask is None else _validate_mask(e_mask, n, k)
        except Exception as ex:  # pragma: no cover
            raise _create_validation_error(ex, _ins.trace()) from None

        p, e, states_out, symbols_out, error_message = _random(rng, n, k, p_zeros, p_mask, e_zeros, e_mask)

        if error_message is not None:  # pragma: no cover
            raise _ValidationError(error_message)

        # Labels provided by the caller take precedence over the generated ones.
        if states is None:
            states = states_out

        if symbols is None:
            symbols = symbols_out

        if not set(states).isdisjoint(symbols):  # pragma: no cover
            raise _ValidationError('State names and symbol names must be different.')

        return HiddenMarkovModel(p, e, states, symbols)