# -*- coding: utf-8 -*-
__all__ = [
'MarkovChain'
]
###########
# IMPORTS #
###########
# Standard
import copy as _cp
import inspect as _ins
import itertools as _it
import math as _mt
# Libraries
import networkx as _nx
import numpy as _np
import numpy.linalg as _npl
# Internal
from .base_classes import (
Model as _Model
)
from .computations import (
calculate_periods as _calculate_periods,
eigenvalues_sorted as _eigenvalues_sorted,
find_cyclic_classes as _find_cyclic_classes,
find_lumping_partitions as _find_lumping_partitions,
gth_solve as _gth_solve,
rdl_decomposition as _rdl_decomposition,
slem as _slem
)
from .custom_types import (
oarray as _oarray,
ofloat as _ofloat,
oint as _oint,
ointerval as _ointerval,
olist_str as _olist_str,
onumeric as _onumeric,
osequence as _osequence,
ostate as _ostate,
ostates as _ostates,
ostatus as _ostatus,
otimes_out as _otimes_out,
tany as _tany,
tarray as _tarray,
tbcond as _tbcond,
tcache as _tcache,
tgraph as _tgraph,
tgraphs as _tgraphs,
tlist_array as _tlist_array,
tlist_int as _tlist_int,
tlist_str as _tlist_str,
tlists_int as _tlists_int,
tlists_str as _tlists_str,
tmc as _tmc,
tmc_dict as _tmc_dict,
tmc_dict_flex as _tmc_dict_flex,
tnumeric as _tnumeric,
tpartition as _tpartition,
tpartitions as _tpartitions,
tpath as _tpath,
trandfunc_flex as _trandfunc_flex,
trdl as _trdl,
tredists as _tredists,
tsequence as _tsequence,
tstate as _tstate,
tstates as _tstates,
ttfunc as _ttfunc,
ttimes_in as _ttimes_in,
tweights as _tweights
)
from .decorators import (
aliased as _aliased,
cached_property as _cached_property,
object_mark as _object_mark
)
from .exceptions import (
ValidationError as _ValidationError
)
from .files_io import (
read_csv as _read_csv,
read_json as _read_json,
read_txt as _read_txt,
read_xml as _read_xml,
write_csv as _write_csv,
write_json as _write_json,
write_txt as _write_txt,
write_xml as _write_xml
)
from .fitting import (
mc_fit_function as _fit_function,
mc_fit_sequence as _fit_sequence
)
from .generators import (
mc_aggregate_spectral_bottom_up as _aggregate_spectral_bottom_up,
mc_aggregate_spectral_top_down as _aggregate_spectral_top_down,
mc_approximation as _approximation,
mc_birth_death as _birth_death,
mc_bounded as _bounded,
mc_canonical as _canonical,
mc_closest_reversible as _closest_reversible,
mc_dirichlet_process as _dirichlet_process,
mc_gamblers_ruin as _gamblers_ruin,
mc_lazy as _lazy,
mc_lump as _lump,
mc_population_genetics_model as _population_genetics_model,
mc_random as _random,
mc_sub as _sub,
mc_urn_model as _urn_model
)
from .measures import (
mc_absorption_probabilities as _absorption_probabilities,
mc_committor_probabilities as _committor_probabilities,
mc_expected_rewards as _expected_rewards,
mc_expected_transitions as _expected_transitions,
mc_first_passage_reward as _first_passage_reward,
mc_first_passage_probabilities as _first_passage_probabilities,
mc_hitting_probabilities as _hitting_probabilities,
mc_hitting_times as _hitting_times,
mc_mean_absorption_times as _mean_absorption_times,
mc_mean_first_passage_times_between as _mean_first_passage_times_between,
mc_mean_first_passage_times_to as _mean_first_passage_times_to,
mc_mean_number_visits as _mean_number_visits,
mc_mean_recurrence_times as _mean_recurrence_times,
mc_mixing_time as _mixing_time,
mc_sensitivity as _sensitivity,
mc_time_correlations as _time_correlations,
mc_time_relaxations as _time_relaxations
)
from .simulations import (
mc_predict as _predict,
mc_redistribute as _redistribute,
mc_sequence_probability as _sequence_probability,
mc_simulate as _simulate
)
from .utilities import (
build_mc_graph as _build_mc_graph,
create_labels as _create_labels,
create_rng as _create_rng,
create_validation_error as _create_validation_error,
get_caller as _get_caller,
get_instance_generators as _get_instance_generators,
get_numpy_random_distributions as _get_numpy_random_distributions
)
from .validation import (
validate_boolean as _validate_boolean,
validate_boundary_condition as _validate_boundary_condition,
validate_dictionary as _validate_dictionary,
validate_enumerator as _validate_enumerator,
validate_file_path as _validate_file_path,
validate_float as _validate_float,
validate_graph as _validate_graph,
validate_hyperparameter as _validate_hyperparameter,
validate_integer as _validate_integer,
validate_interval as _validate_interval,
validate_label as _validate_label,
validate_labels_current as _validate_labels_current,
validate_labels_input as _validate_labels_input,
validate_mask as _validate_mask,
validate_markov_chain as _validate_markov_chain,
validate_matrix as _validate_matrix,
validate_partitions as _validate_partitions,
validate_random_distribution as _validate_random_distribution,
validate_rewards as _validate_rewards,
validate_sequence as _validate_sequence,
validate_status as _validate_status,
validate_time_points as _validate_time_points,
validate_transition_function as _validate_transition_function,
validate_transition_matrix as _validate_transition_matrix,
validate_vector as _validate_vector
)
###########
# CLASSES #
###########
[docs]@_aliased
class MarkovChain(_Model):
"""
Defines a Markov chain with the given transition matrix.
:param p: the transition matrix.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:raises ValidationError: if any input argument is not compliant.
"""
__instance_generators: _olist_str = None
__random_distributions: _olist_str = None
def __init__(self, p: _tnumeric, states: _olist_str = None):
if MarkovChain.__instance_generators is None:
MarkovChain.__instance_generators = _get_instance_generators(self.__class__)
caller = _get_caller(_ins.stack())
if caller not in MarkovChain.__instance_generators:
try:
p = _validate_transition_matrix(p)
states = _create_labels(p.shape[1]) if states is None else _validate_labels_input(states, p.shape[1])
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
self.__cache: _tcache = {}
self.__digraph: _tgraph = _build_mc_graph(p, states)
self.__p: _tarray = p
self.__size: int = p.shape[1]
self.__states: _tlist_str = states
def __eq__(self, other) -> bool:
if isinstance(other, MarkovChain):
return _np.array_equal(self.p, other.p) and self.states == other.states
return False
def __hash__(self) -> int:
return hash((self.p.tobytes(), tuple(self.states)))
def __repr__(self) -> str:
return self.__class__.__name__
# noinspection PyListCreation
def __str__(self) -> str:
lines = ['']
lines.append('DISCRETE-TIME MARKOV CHAIN')
lines.append(f' SIZE: {self.size:d}')
lines.append(f' RANK: {self.rank:d}')
lines.append(f' CLASSES: {len(self.communicating_classes):d}')
lines.append(f' > RECURRENT: {len(self.recurrent_classes):d}')
lines.append(f' > TRANSIENT: {len(self.transient_classes):d}')
lines.append(f' ERGODIC: {("YES" if self.is_ergodic else "NO")}')
lines.append(f' > APERIODIC: {("YES" if self.is_aperiodic else "NO (" + str(self.period) + ")")}')
lines.append(f' > IRREDUCIBLE: {("YES" if self.is_irreducible else "NO")}')
lines.append(f' ABSORBING: {("YES" if self.is_absorbing else "NO")}')
lines.append(f' MONOTONE: {("YES" if self.is_stochastically_monotone else "NO")}')
lines.append(f' REGULAR: {("YES" if self.is_regular else "NO")}')
lines.append(f' REVERSIBLE: {("YES" if self.is_reversible else "NO")}')
lines.append(f' SYMMETRIC: {("YES" if self.is_symmetric else "NO")}')
lines.append('')
value = '\n'.join(lines)
return value
@_cached_property
def __absorbing_states_indices(self) -> _tlist_int:
indices = [index for index in range(self.__size) if _np.isclose(self.__p[index, index], 1.0)]
return indices
@_cached_property
def __classes_indices(self) -> _tlists_int:
indices = [sorted([self.__states.index(c) for c in scc]) for scc in _nx.strongly_connected_components(self.__digraph)]
return indices
@_cached_property
def __communicating_classes_indices(self) -> _tlists_int:
indices = sorted(self.__classes_indices, key=lambda x: (-len(x), x[0]))
return indices
@_cached_property
def __cyclic_classes_indices(self) -> _tlists_int:
if not self.is_irreducible:
indices = []
elif self.is_aperiodic:
indices = self.__communicating_classes_indices.copy()
else:
indices = _find_cyclic_classes(self.__p)
indices = sorted(indices, key=lambda x: (-len(x), x[0]))
return indices
@_cached_property
def __cyclic_states_indices(self) -> _tlist_int:
indices = sorted(_it.chain.from_iterable(self.__cyclic_classes_indices))
return indices
@_cached_property
def __eigenvalues_sorted(self) -> _tarray:
ev = _eigenvalues_sorted(self.__p)
return ev
@_cached_property
def __rdl_decomposition(self) -> _trdl:
r, d, l = _rdl_decomposition(self.__p) # noqa: E741
return r, d, l
@_cached_property
def __recurrent_classes_indices(self) -> _tlists_int:
indices = [index for index in self.__classes_indices if index not in self.__transient_classes_indices]
indices = sorted(indices, key=lambda x: (-len(x), x[0]))
return indices
@_cached_property
def __recurrent_states_indices(self) -> _tlist_int:
indices = sorted(_it.chain.from_iterable(self.__recurrent_classes_indices))
return indices
@_cached_property
def __slem(self) -> _ofloat:
if not self.is_ergodic:
value = None
else:
value = _slem(self.__p)
return value
@_cached_property
def __states_indices(self) -> _tlist_int:
indices = list(range(self.__size))
return indices
@_cached_property
def __transient_classes_indices(self) -> _tlists_int:
edges = {edge1 for (edge1, edge2) in _nx.condensation(self.__digraph).edges}
indices = [self.__classes_indices[edge] for edge in edges]
indices = sorted(indices, key=lambda x: (-len(x), x[0]))
return indices
@_cached_property
def __transient_states_indices(self) -> _tlist_int:
indices = sorted(_it.chain.from_iterable(self.__transient_classes_indices))
return indices
[docs] @_cached_property
def absorbing_states(self) -> _tlists_str:
"""
A property representing the absorbing states of the Markov chain.
"""
states = [*map(self.__states.__getitem__, self.__absorbing_states_indices)]
return states
[docs] @_cached_property
def accessibility_matrix(self) -> _tarray:
"""
A property representing the accessibility matrix of the Markov chain.
"""
a = self.adjacency_matrix
i = _np.eye(self.__size, dtype=int)
am = _npl.matrix_power(i + a, self.__size - 1)
am = (am > 0).astype(int)
return am
[docs] @_cached_property
def adjacency_matrix(self) -> _tarray:
"""
A property representing the adjacency matrix of the Markov chain.
"""
am = (self.__p > 0.0).astype(int)
return am
[docs] @_cached_property
def communicating_classes(self) -> _tlists_str:
"""
A property representing the communicating classes of the Markov chain.
"""
classes = [[*map(self.__states.__getitem__, i)] for i in self.__communicating_classes_indices]
return classes
[docs] @_cached_property
def communication_matrix(self) -> _tarray:
"""
A property representing the communication matrix of the Markov chain.
"""
cm = _np.zeros((self.__size, self.__size), dtype=int)
for index in self.__communicating_classes_indices:
cm[_np.ix_(index, index)] = 1
return cm
[docs] @_cached_property
def cyclic_classes(self) -> _tlists_str:
"""
A property representing the cyclic classes of the Markov chain.
"""
classes = [[*map(self.__states.__getitem__, i)] for i in self.__cyclic_classes_indices]
return classes
[docs] @_cached_property
def cyclic_states(self) -> _tlists_str:
"""
A property representing the cyclic states of the Markov chain.
"""
states = [*map(self.__states.__getitem__, self.__cyclic_states_indices)]
return states
[docs] @_cached_property
def density(self) -> float:
"""
A property representing the density of the transition matrix of the Markov chain.
"""
am = _np.copy(self.adjacency_matrix)
am[_np.diag_indices_from(am)] = 0
e = float(_np.sum(am))
n = float(self.__size)
d = e / (n * (n - 1.0))
return d
[docs] @_cached_property
def determinant(self) -> float:
"""
A property representing the determinant of the transition matrix of the Markov chain.
"""
d = _npl.det(self.__p)
return d
[docs] @_cached_property
def entropy_rate(self) -> _ofloat:
"""
| A property representing the entropy rate of the Markov chain.
| If the Markov chain has multiple stationary distributions, then :py:class:`None` is returned.
"""
if len(self.pi) > 1:
h = None
else:
pi = self.pi[0]
h = 0.0
for i in range(self.__size):
pi_i = pi[i]
for j in range(self.__size):
if self.__p[i, j] > 0.0:
h += pi_i * self.__p[i, j] * _np.log(self.__p[i, j])
h = h if _np.isclose(h, 0.0) else -h
return h
[docs] @_cached_property
def entropy_rate_normalized(self) -> _ofloat:
"""
| A property representing the entropy rate, normalized between 0 and 1, of the Markov chain.
| If the Markov chain has multiple stationary distributions, then :py:class:`None` is returned.
"""
h = self.entropy_rate
if h is None:
hn = None
elif _np.isclose(h, 0.0):
hn = 0.0
else:
ev = _eigenvalues_sorted(self.adjacency_matrix)
hn = h / _np.log(ev[-1])
hn = min(1.0, max(0.0, hn))
return hn
[docs] @_cached_property
def fundamental_matrix(self) -> _oarray:
"""
| A property representing the fundamental matrix of the Markov chain.
| If the Markov chain is not **absorbing** or has no transient states, then :py:class:`None` is returned.
"""
if not self.is_absorbing or len(self.transient_states) == 0:
fm = None
else:
indices = self.__transient_states_indices
q = self.__p[_np.ix_(indices, indices)]
i = _np.eye(len(indices))
fm = _npl.inv(i - q)
return fm
[docs] @_cached_property
def implied_timescales(self) -> _oarray:
"""
| A property representing the implied timescales of the Markov chain.
| If the Markov chain is not **ergodic**, then :py:class:`None` is returned.
"""
if not self.is_ergodic:
it = None
else:
ev = self.__eigenvalues_sorted[::-1]
with _np.errstate(divide='ignore'):
it = _np.append(_np.inf, -1.0 / _np.log(ev[1:]))
return it
[docs] @_cached_property
def incidence_matrix(self) -> _tarray:
"""
A property representing the incidence matrix of the Markov chain.
"""
n = self.__size
k = n**2
im = _np.zeros((n, k), dtype=int)
us = _np.repeat(self.__states_indices, n)
vs = _np.tile(self.__states_indices, n)
for index, (u, v) in enumerate(zip(us, vs)):
if u == v:
continue
im[u, index] = 1
im[v, index] = 1
return im
[docs] @_cached_property
def is_absorbing(self) -> bool:
"""
A property indicating whether the Markov chain is absorbing.
"""
if len(self.absorbing_states) == 0:
result = False
else:
indices = set(self.__states_indices)
absorbing_indices = set(self.__absorbing_states_indices)
transient_indices = set()
progress = True
unknown_states = None
while progress:
unknown_states = indices.copy() - absorbing_indices - transient_indices
known_states = absorbing_indices | transient_indices
progress = False
for i in unknown_states:
for j in known_states:
if self.__p[i, j] > 0.0:
transient_indices.add(i)
progress = True
break
result = len(unknown_states) == 0
return result
[docs] @_cached_property
def is_aperiodic(self) -> bool:
"""
A property indicating whether the Markov chain is aperiodic.
"""
if self.is_irreducible:
result = set(self.periods).pop() == 1
elif all(period == 1 for period in self.periods):
result = True
else: # pragma: no cover
result = _nx.is_aperiodic(self.__digraph)
return result
[docs] @_cached_property
def is_canonical(self) -> bool:
"""
A property indicating whether the Markov chain has a canonical form.
"""
recurrent_indices = self.__recurrent_states_indices
transient_indices = self.__transient_states_indices
if len(recurrent_indices) == 0 or len(transient_indices) == 0:
result = True
else:
result = max(transient_indices) < min(recurrent_indices)
return result
[docs] @_cached_property
def is_doubly_stochastic(self) -> bool:
"""
A property indicating whether the Markov chain is doubly stochastic.
"""
result = _np.allclose(_np.sum(self.__p, axis=0), 1.0)
return result
[docs] @_cached_property
def is_ergodic(self) -> bool:
"""
A property indicating whether the Markov chain is ergodic.
"""
result = self.is_irreducible and self.is_aperiodic
return result
[docs] @_cached_property
def is_irreducible(self) -> bool:
"""
A property indicating whether the Markov chain is irreducible.
"""
result = len(self.communicating_classes) == 1
return result
[docs] @_cached_property
def is_regular(self) -> bool:
"""
A property indicating whether the Markov chain is regular.
"""
d = _np.diag(self.__p)
nz = _np.count_nonzero(d)
if nz > 0:
k = (2 * self.__size) - nz - 1
else:
k = self.__size**self.__size - (2 * self.__size) + 2
result = _np.all(_npl.matrix_power(self.__p, k) > 0.0)
return result
[docs] @_cached_property
def is_reversible(self) -> bool:
"""
A property indicating whether the Markov chain is reversible.
"""
# noinspection PyTypeChecker
if len(self.pi) > 1:
return False
pi = self.pi[0]
x = pi[:, _np.newaxis] * self.__p
result = _np.allclose(x, _np.transpose(x))
return result
[docs] @_cached_property
@_object_mark(aliases=['is_monotone'])
def is_stochastically_monotone(self) -> bool:
"""
A property indicating whether the Markov chain is stochastically monotone.
"""
result = True
for m in range(1, self.__size):
sm = _np.sum(self.__p[:, m:], axis=1)
for k, l in zip(range(self.__size - 1), range(1, self.__size)):
sk = sm[k]
sl = sm[l]
if sl < sk:
result = False
break
return result
[docs] @_cached_property
def is_symmetric(self) -> bool:
"""
A property indicating whether the Markov chain is symmetric.
"""
result = _np.allclose(self.__p, _np.transpose(self.__p))
return result
[docs] @_cached_property
def kemeny_constant(self) -> _ofloat:
"""
| A property representing the Kemeny's constant of the fundamental matrix of the Markov chain.
| If the Markov chain is not **absorbing** or has no transient states, then :py:class:`None` is returned.
"""
fm = self.fundamental_matrix
if fm is None:
kc = None
elif fm.size == 1:
kc = fm[0].item()
else:
kc = _np.trace(fm).item()
return kc
[docs] @_cached_property
def lumping_partitions(self) -> _tpartitions:
"""
A property representing all the partitions of the Markov chain that satisfy the ordinary lumpability criterion.
"""
lp = _find_lumping_partitions(self.__p)
return lp
[docs] @_cached_property
def mixing_rate(self) -> _ofloat:
"""
| A property representing the mixing rate of the Markov chain.
| If the Markov chain is not **ergodic** or the **SLEM** (second largest eigenvalue modulus) cannot be computed, then :py:class:`None` is returned.
"""
if self.__slem is None:
mr = None
else:
mr = -1.0 / _np.log(self.__slem)
return mr
@property
def n(self) -> int:
"""
A property representing the size of the Markov chain state space.
"""
return self.__size
@property
def p(self) -> _tarray:
"""
A property representing the transition matrix of the Markov chain.
"""
return _np.copy(self.__p)
[docs] @_cached_property
def period(self) -> int:
"""
A property representing the period of the Markov chain.
"""
if self.is_aperiodic:
period = 1
elif self.is_irreducible:
period = set(self.periods).pop()
else: # pragma: no cover
period = 1
for p in [self.periods[self.communicating_classes.index(recurrent_class)] for recurrent_class in self.recurrent_classes]:
period = (period * p) // _mt.gcd(period, p)
return period
[docs] @_cached_property
def periods(self) -> _tlist_int:
"""
A property representing the period of each communicating class defined by the Markov chain.
"""
periods = _calculate_periods(self.__digraph)
return periods
[docs] @_cached_property
@_object_mark(aliases=['stationary_distributions', 'steady_states'])
def pi(self) -> _tlist_array:
"""
| A property representing the stationary distributions of the Markov chain.
| **Aliases:** stationary_distributions, steady_states
"""
if self.is_irreducible:
s = _np.reshape(_gth_solve(self.__p), (1, self.__size))
else:
s = _np.zeros((len(self.recurrent_classes), self.__size), dtype=float)
for index, indices in enumerate(self.__recurrent_classes_indices):
pr = self.__p[_np.ix_(indices, indices)]
s[index, indices] = _gth_solve(pr)
pi = [s[i, :] for i in range(s.shape[0])]
return pi
[docs] @_cached_property
def rank(self) -> int:
"""
A property representing the rank of the transition matrix of the Markov chain.
"""
r = _npl.matrix_rank(self.__p)
return r
[docs] @_cached_property
def recurrent_classes(self) -> _tlists_str:
"""
A property representing the recurrent classes defined by the Markov chain.
"""
classes = [[*map(self.__states.__getitem__, i)] for i in self.__recurrent_classes_indices]
return classes
[docs] @_cached_property
def recurrent_states(self) -> _tlists_str:
"""
A property representing the recurrent states of the Markov chain.
"""
states = [*map(self.__states.__getitem__, self.__recurrent_states_indices)]
return states
[docs] @_cached_property
def relaxation_rate(self) -> _ofloat:
"""
| A property representing the relaxation rate of the Markov chain.
| If the Markov chain is not **ergodic** or the **SLEM** (second largest eigenvalue modulus) cannot be computed, then :py:class:`None` is returned.
"""
if self.__slem is None:
rr = None
else:
rr = 1.0 / self.spectral_gap
return rr
@property
def size(self) -> int:
"""
A property representing the size of the Markov chain.
"""
return self.__size
[docs] @_cached_property
def spectral_gap(self) -> _ofloat:
"""
| A property representing the spectral gap of the Markov chain.
| If the Markov chain is not **ergodic** or the **SLEM** (second largest eigenvalue modulus) cannot be computed, then :py:class:`None` is returned.
"""
if self.__slem is None:
sg = None
else:
sg = 1.0 - self.__slem
return sg
@property
def states(self) -> _tlist_str:
"""
A property representing the states of the Markov chain.
"""
return self.__states
[docs] @_cached_property
def topological_entropy(self) -> float:
"""
A property representing the topological entropy of the Markov chain.
"""
ev = _eigenvalues_sorted(self.adjacency_matrix)
te = _np.log(ev[-1])
return te
[docs] @_cached_property
def transient_classes(self) -> _tlists_str:
"""
A property representing the transient classes defined by the Markov chain.
"""
classes = [[*map(self.__states.__getitem__, i)] for i in self.__transient_classes_indices]
return classes
[docs] @_cached_property
def transient_states(self) -> _tlists_str:
"""
A property representing the transient states of the Markov chain.
"""
states = [*map(self.__states.__getitem__, self.__transient_states_indices)]
return states
[docs] def absorption_probabilities(self) -> _oarray:
"""
The method computes the absorption probabilities of the Markov chain.
| **Notes:**
- If the Markov chain has no transient states, then :py:class:`None` is returned.
"""
if 'ap' not in self.__cache:
self.__cache['ap'] = _absorption_probabilities(self)
return self.__cache['ap']
[docs] @_object_mark(instance_generator=True)
def aggregate(self, s: int, method: str = 'adaptive') -> _tmc:
"""
The method attempts to reduce the state space of the Markov chain to the given number of states through a Kullback-Leibler divergence minimization approach.
| **Notes:**
- The spectral theory based aggregation is described in `Optimal Kullback-Leibler Aggregation via Spectral Theory of Markov Chains (Deng et al., 2011) <https://doi.org/10.1109/TAC.2011.2141350>`_.
:param s: the number of states of the reduced Markov chain.
:param method:
- **spectral-bottom-up** for a spectral theory based aggregation, bottom-up (more suitable for reducing a large number of states).
- **spectral-top-down** for a spectral theory based aggregation, top-down (more suitable for reducing a small number of states).
- **adaptive** for automatically selecting the best aggregation method.
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the Markov chain defines only two states or is not **ergodic**.
"""
try:
method = _validate_enumerator(method, ['adaptive', 'spectral-bottom-up', 'spectral-top-down'])
s = _validate_integer(s, lower_limit=(2, False), upper_limit=(self.__size - 1, False))
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
if self.__size == 2: # pragma: no cover
raise ValueError('The Markov chain defines only two states.')
if not self.is_ergodic: # pragma: no cover
raise ValueError('The Markov chain is not ergodic.')
if method == 'adaptive': # pragma: no cover
if self.__size < 30:
method = 'spectral-top-down'
else:
method = 'spectral-bottom-up' if (float(s) / self.__size) <= 0.3 else 'spectral-top-down'
func = _aggregate_spectral_bottom_up if method == 'spectral-bottom-up' else _aggregate_spectral_top_down
p, states, error_message = func(self.p, self.pi[0], s)
if error_message is not None: # pragma: no cover
raise ValueError(error_message)
mc = MarkovChain(p, states)
return mc
[docs] def are_communicating(self, state1: _tstate, state2: _tstate) -> bool:
"""
The method verifies whether the given states of the Markov chain are communicating.
:param state1: the first state.
:param state2: the second state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state1 = _validate_label(state1, self.__states)
state2 = _validate_label(state2, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
a1 = self.accessibility_matrix[state1, state2] != 0
a2 = self.accessibility_matrix[state2, state1] != 0
result = a1 and a2
return result
[docs] @_object_mark(instance_generator=True)
def closest_reversible(self, initial_distribution: _onumeric = None, weighted: bool = False) -> _tmc:
"""
The method computes the closest reversible of the Markov chain.
| **Notes:**
- The algorithm is described in `Computing the Nearest Reversible Markov chain (Nielsen & Weber, 2015) <https://doi.org/10.1002/nla.1967>`_.
:param initial_distribution: the distribution of the states (*if omitted, the states are assumed to be uniformly distributed*).
:param weighted: a boolean indicating whether to use the weighted Frobenius norm.
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the closest reversible could not be computed.
"""
try:
initial_distribution = _np.full(self.__size, 1.0 / self.__size, dtype=float) if initial_distribution is None else _validate_vector(initial_distribution, 'stochastic', False, self.__size)
weighted = _validate_boolean(weighted)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
zeros = len(initial_distribution) - _np.count_nonzero(initial_distribution)
if weighted and zeros > 0: # pragma: no cover
raise _ValidationError('If the weighted Frobenius norm is used, the initial distribution must not contain null probabilities.')
if self.is_reversible:
p = _np.copy(self.__p)
else:
p, _, error_message = _closest_reversible(self.__p, initial_distribution, weighted)
if error_message is not None: # pragma: no cover
raise ValueError(error_message)
mc = MarkovChain(p, self.__states)
if not mc.is_reversible: # pragma: no cover
raise ValueError('The closest reversible could not be computed.')
return mc
[docs] @_object_mark(aliases=['crp'])
def committor_probabilities(self, committor_type: str, states1: _tstates, states2: _tstates) -> _oarray:
"""
The method computes the committor probabilities between the given subsets of the state space defined by the Markov chain.
| **Notes:**
- If the Markov chain is not **ergodic**, then :py:class:`None` is returned.
- The method can be accessed through the following aliases: **crp**.
:param committor_type:
- **backward** for the backward committor;
- **forward** for the forward committor.
:param states1: the first subset of the state space.
:param states2: the second subset of the state space.
:raises ValidationError: if any input argument is not compliant.
"""
try:
committor_type = _validate_enumerator(committor_type, ['backward', 'forward'])
states1 = _validate_labels_current(states1, self.__states, True)
states2 = _validate_labels_current(states2, self.__states, True)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
intersection = _np.intersect1d(states1, states2)
if len(intersection) > 0: # pragma: no cover
raise _ValidationError(f'The two sets of states must be disjoint. An intersection has been detected: {", ".join([str(i) for i in intersection])}.')
value = _committor_probabilities(self, committor_type, states1, states2)
return value
[docs] @_object_mark(aliases=['conditional_distribution', 'cd', 'cp'])
def conditional_probabilities(self, state: _tstate) -> _tarray:
"""
The method computes the probabilities, for all the states of the Markov chain, conditioned on the process being at the given state.
| **Notes:**
- The method can be accessed through the following aliases: **conditional_distribution**, **cd**, **cp**.
:param state: the current state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state = _validate_label(state, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = self.__p[state, :]
return value
[docs] @_object_mark(aliases=['er'])
def expected_rewards(self, steps: int, rewards: _tnumeric) -> _tarray:
"""
The method computes the expected rewards of the Markov chain after **N** steps, given the reward value of each state.
| **Notes:**
- The method can be accessed through the following aliases: **er**.
:param steps: the number of steps.
:param rewards: the reward values.
:raises ValidationError: if any input argument is not compliant.
"""
try:
steps = _validate_integer(steps, lower_limit=(0, True))
rewards = _validate_rewards(rewards, self.__size)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _expected_rewards(self.__p, steps, rewards)
return value
[docs] @_object_mark(aliases=['et'])
def expected_transitions(self, steps: int, initial_distribution: _onumeric = None) -> _tarray:
"""
The method computes the expected number of transitions performed by the Markov chain after *N* steps, given the initial distribution of the states.
| **Notes:**
- The method can be accessed through the following aliases: **et**.
:param steps: the number of steps.
:param initial_distribution: the initial distribution of the states (*if omitted, the states are assumed to be uniformly distributed*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
steps = _validate_integer(steps, lower_limit=(0, True))
initial_distribution = _np.full(self.__size, 1.0 / self.__size, dtype=float) if initial_distribution is None else _validate_vector(initial_distribution, 'stochastic', False, self.__size)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _expected_transitions(self.__p, self.__rdl_decomposition, steps, initial_distribution)
return value
[docs] @_object_mark(aliases=['fpp'])
def first_passage_probabilities(self, steps: int, initial_state: _tstate, first_passage_states: _ostates = None) -> _tarray:
"""
The method computes the first passage probabilities of the Markov chain after *N* steps, given the initial state and, optionally, the first passage states.
| **Notes:**
- The method can be accessed through the following aliases: **fpp**.
:param steps: the number of steps.
:param initial_state: the initial state.
:param first_passage_states: the first passage states.
:raises ValidationError: if any input argument is not compliant.
"""
try:
steps = _validate_integer(steps, lower_limit=(0, True))
initial_state = _validate_label(initial_state, self.__states)
if first_passage_states is not None:
first_passage_states = _validate_labels_current(first_passage_states, self.__states, False)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _first_passage_probabilities(self, steps, initial_state, first_passage_states)
return value
[docs] @_object_mark(aliases=['fpr'])
def first_passage_reward(self, steps: int, initial_state: _tstate, first_passage_states: _tstates, rewards: _tnumeric) -> float:
"""
The method computes the first passage reward of the Markov chain after *N* steps, given the reward value of each state, the initial state and the first passage states.
| **Notes:**
- The method can be accessed through the following aliases: **fpr**.
:param steps: the number of steps.
:param initial_state: the initial state.
:param first_passage_states: the first passage states.
:param rewards: the reward values.
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the Markov chain defines only two states.
"""
try:
initial_state = _validate_label(initial_state, self.__states)
first_passage_states = _validate_labels_current(first_passage_states, self.__states, True)
rewards = _validate_rewards(rewards, self.__size)
steps = _validate_integer(steps, lower_limit=(0, True))
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
if self.__size == 2: # pragma: no cover
raise ValueError('The Markov chain defines only two states and the first passage rewards cannot be computed.')
if initial_state in first_passage_states: # pragma: no cover
raise _ValidationError('The first passage states cannot include the initial state.')
if len(first_passage_states) == (self.__size - 1): # pragma: no cover
raise _ValidationError('The first passage states cannot include all the states except the initial one.')
value = _first_passage_reward(self, steps, initial_state, first_passage_states, rewards)
return value
[docs] @_object_mark(aliases=['hp'])
def hitting_probabilities(self, targets: _ostates = None) -> _tarray:
"""
The method computes the hitting probability, for the states of the Markov chain, to the given set of states.
| **Notes:**
- The method can be accessed through the following aliases: **hp**.
:param targets: the target states (*if omitted, all the states are targeted*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
if targets is None:
targets = self.__states_indices.copy()
else:
targets = _validate_labels_current(targets, self.__states, False)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _hitting_probabilities(self, targets)
return value
[docs] @_object_mark(aliases=['ht'])
def hitting_times(self, targets: _ostates = None) -> _tarray:
"""
The method computes the hitting times, for all the states of the Markov chain, to the given set of states.
| **Notes:**
- The method can be accessed through the following aliases: **ht**.
:param targets: the target states (*if omitted, all the states are targeted*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
if targets is None:
targets = self.__states_indices.copy()
else:
targets = _validate_labels_current(targets, self.__states, False)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _hitting_times(self, targets)
return value
[docs] def is_absorbing_state(self, state: _tstate) -> bool:
"""
The method verifies whether the given state of the Markov chain is absorbing.
:param state: the target state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state = _validate_label(state, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
result = state in self.__absorbing_states_indices
return result
[docs] def is_accessible(self, state_target: _tstate, state_origin: _tstate) -> bool:
"""
The method verifies whether the given target state is reachable from the given origin state.
:param state_target: the target state.
:param state_origin: the origin state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state_target = _validate_label(state_target, self.__states)
state_origin = _validate_label(state_origin, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
result = self.accessibility_matrix[state_origin, state_target] != 0
return result
[docs] def is_cyclic_state(self, state: _tstate) -> bool:
"""
The method verifies whether the given state is cyclic.
:param state: the target state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state = _validate_label(state, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
result = state in self.__cyclic_states_indices
return result
[docs] def is_recurrent_state(self, state: _tstate) -> bool:
"""
The method verifies whether the given state is recurrent.
:param state: the target state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state = _validate_label(state, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
result = state in self.__recurrent_states_indices
return result
[docs] def is_transient_state(self, state: _tstate) -> bool:
"""
The method verifies whether the given state is transient.
:param state: the target state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state = _validate_label(state, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
result = state in self.__transient_states_indices
return result
[docs] @_object_mark(instance_generator=True)
def lump(self, partitions: _tpartition) -> _tmc:
"""
The method attempts to reduce the state space of the Markov chain with respect to the given partitions following the ordinary lumpability criterion.
:param partitions: the partitions of the state space.
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the Markov chain defines only two states or is not lumpable with respect to the given partitions.
"""
try:
partitions = _validate_partitions(partitions, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
if self.__size == 2: # pragma: no cover
raise ValueError('The Markov chain defines only two states.')
p, states, error_message = _lump(self.p, self.states, partitions)
if error_message is not None: # pragma: no cover
raise ValueError(error_message)
mc = MarkovChain(p, states)
return mc
[docs] @_object_mark(aliases=['mat'])
def mean_absorption_times(self) -> _oarray:
"""
The method computes the mean absorption times of the Markov chain.
| **Notes:**
- If the Markov chain is not **absorbing** or has no transient states, then :py:class:`None` is returned.
- The method can be accessed through the following aliases: **mat**.
"""
if 'mat' not in self.__cache:
self.__cache['mat'] = _mean_absorption_times(self)
return self.__cache['mat']
[docs] @_object_mark(aliases=['mfpt_between', 'mfptb'])
def mean_first_passage_times_between(self, origins: _tstates, targets: _tstates) -> _ofloat:
"""
The method computes the mean first passage times between the given subsets of the state space.
| **Notes:**
- If the Markov chain is not **ergodic**, then :py:class:`None` is returned.
- The method can be accessed through the following aliases: **mfpt_between**, **mfptb**.
:param origins: the origin states.
:param targets: the target states.
:raises ValidationError: if any input argument is not compliant.
"""
try:
origins = _validate_labels_current(origins, self.__states, True)
targets = _validate_labels_current(targets, self.__states, True)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _mean_first_passage_times_between(self, origins, targets)
return value
[docs] @_object_mark(aliases=['mfpt_to', 'mfptt'])
def mean_first_passage_times_to(self, targets: _ostates = None) -> _oarray:
"""
The method computes the mean first passage times, for all the states, to the given set of states.
| **Notes:**
- If the Markov chain is not **ergodic**, then :py:class:`None` is returned.
- The method can be accessed through the following aliases: **mfpt_to**, **mfptt**.
:param targets: the target states (*if omitted, all the states are targeted*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
if targets is not None:
targets = _validate_labels_current(targets, self.__states, False)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _mean_first_passage_times_to(self, targets)
return value
[docs] @_object_mark(aliases=['mnv'])
def mean_number_visits(self) -> _oarray:
"""
The method computes the mean number of visits of the Markov chain.
| **Notes:**
- The method can be accessed through the following aliases: **mnv**.
"""
if 'mnv' not in self.__cache:
self.__cache['mnv'] = _mean_number_visits(self)
return self.__cache['mnv']
[docs] @_object_mark(aliases=['mrt'])
def mean_recurrence_times(self) -> _oarray:
"""
The method computes the mean recurrence times of the Markov chain.
| **Notes:**
- If the Markov chain is not **ergodic**, then :py:class:`None` is returned.
- The method can be accessed through the following aliases: **mrt**.
"""
if 'mrt' not in self.__cache:
self.__cache['mrt'] = _mean_recurrence_times(self)
return self.__cache['mrt']
[docs] @_object_mark(instance_generator=True)
def merge_with(self, other: _tmc, gamma: float) -> _tmc:
"""
The method returns a Markov chain whose transition matrix is defined below.
| :math:`p_{new} = (1 - \gamma) p_{current} + \gamma p_{other}`
:param other: the other Markov chain to be merged.
:param gamma: the merger blending factor.
:raises ValidationError: if any input argument is not compliant.
"""
try:
other = _validate_markov_chain(other, self.__size)
gamma = _validate_float(gamma, lower_limit=(0.0, True), upper_limit=(1.0, True))
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p_current = self.__p
p_other = other.p
p = ((1.0 - gamma) * p_current) + (gamma * p_other)
states = _create_labels(self.__size, 'M')
mc = MarkovChain(p, states)
return mc
[docs] @_object_mark(aliases=['mt'])
def mixing_time(self, initial_distribution: _onumeric = None, jump: int = 1, cutoff_type: str = 'natural') -> _oint:
"""
The method computes the mixing time of the Markov chain, given the initial distribution of the states.
| **Notes:**
- If the Markov chain is not **ergodic**, then :py:class:`None` is returned.
- The method can be accessed through the following aliases: **mt**.
:param initial_distribution: the initial distribution of the states (*if omitted, the states are assumed to be uniformly distributed*).
:param jump: the number of steps in each iteration.
:param cutoff_type:
- **natural** for the natural cutoff;
- **traditional** for the traditional cutoff.
:raises ValidationError: if any input argument is not compliant.
"""
try:
initial_distribution = _np.full(self.__size, 1.0 / self.__size, dtype=float) if initial_distribution is None else _validate_vector(initial_distribution, 'stochastic', False, self.__size)
jump = _validate_integer(jump, lower_limit=(0, True))
cutoff_type = _validate_enumerator(cutoff_type, ['natural', 'traditional'])
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
cutoff = 0.25 if cutoff_type == 'traditional' else 1.0 / (2.0 * _np.exp(1.0))
value = _mixing_time(self, initial_distribution, jump, cutoff)
return value
[docs] @_object_mark(random_output=True)
def next(self, initial_state: _tstate, output_index: bool = False, seed: _oint = None) -> _tstate:
"""
The method simulates a single step in a random walk.
:param initial_state: the initial state.
:param output_index: a boolean indicating whether to output the state index.
:param seed: a seed to be used as RNG initializer for reproducibility purposes.
:raises ValidationError: if any input argument is not compliant.
"""
try:
rng = _create_rng(seed)
initial_state = _validate_label(initial_state, self.__states)
output_index = _validate_boolean(output_index)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _simulate(self, 1, initial_state, None, rng)[-1]
if not output_index:
value = self.__states[value]
return value
[docs] def predict(self, steps: int, initial_state: _tstate, output_indices: bool = False) -> _osequence:
"""
The method computes the most probable sequence of states produced by a random walk of *N* steps, given the initial state.
| **Notes:**
- In presence of probability ties :py:class:`None` is returned.
:param steps: the number of steps.
:param initial_state: the initial state.
:param output_indices: a boolean indicating whether to output the state indices.
:raises ValidationError: if any input argument is not compliant.
"""
try:
steps = _validate_integer(steps, lower_limit=(0, True))
initial_state = _validate_label(initial_state, self.__states)
output_indices = _validate_boolean(output_indices)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _predict(self, steps, initial_state)
if value is not None and not output_indices:
value = [*map(self.__states.__getitem__, value)]
return value
[docs] def redistribute(self, steps: int, initial_status: _ostatus = None, output_last: bool = True) -> _tredists:
"""
The method performs a redistribution of states of *N* steps.
:param steps: the number of steps.
:param initial_status: the initial state or the initial distribution of the states (*if omitted, the states are assumed to be uniformly distributed*).
:param output_last: a boolean indicating whether to output only the last distributions.
:raises ValidationError: if any input argument is not compliant.
"""
try:
steps = _validate_integer(steps, lower_limit=(1, False))
initial_status = _np.full(self.__size, 1.0 / self.__size, dtype=float) if initial_status is None else _validate_status(initial_status, self.__states)
output_last = _validate_boolean(output_last)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _redistribute(self, steps, initial_status, output_last)
return value
[docs] def sensitivity(self, state: _tstate) -> _oarray:
"""
The method computes the sensitivity matrix of the stationary distribution with respect to a given state.
| **Notes:**
- If the Markov chain is not **irreducible**, then :py:class:`None` is returned.
:param state: the target state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state = _validate_label(state, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _sensitivity(self, state)
return value
[docs] def sequence_probability(self, sequence: _tsequence) -> float:
"""
The method computes the probability of a given sequence of states.
:param sequence: the observed sequence of states.
:raises ValidationError: if any input argument is not compliant.
"""
try:
sequence = _validate_sequence(sequence, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _sequence_probability(self, sequence)
return value
[docs] @_object_mark(random_output=True)
def simulate(self, steps: int, initial_state: _ostate = None, final_state: _ostate = None, output_indices: bool = False, seed: _oint = None) -> _tsequence:
"""
The method simulates a random walk of the given number of steps.
:param steps: the number of steps.
:param initial_state: the initial state (*if omitted, it is chosen uniformly at random*).
:param final_state: the final state of the walk (*if specified, the simulation stops as soon as it is reached even if not all the steps have been performed*).
:param output_indices: a boolean indicating whether to output the state indices.
:param seed: a seed to be used as RNG initializer for reproducibility purposes.
:raises ValidationError: if any input argument is not compliant.
"""
try:
rng = _create_rng(seed)
steps = _validate_integer(steps, lower_limit=(2, False))
initial_state = rng.randint(0, self.__size) if initial_state is None else _validate_label(initial_state, self.__states)
final_state = None if final_state is None else _validate_label(final_state, self.__states)
output_indices = _validate_boolean(output_indices)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _simulate(self, steps, initial_state, final_state, rng)
if not output_indices:
value = [*map(self.__states.__getitem__, value)]
return value
[docs] @_object_mark(aliases=['tc'])
def time_correlations(self, sequence1: _tsequence, sequence2: _osequence = None, time_points: _ttimes_in = 1) -> _otimes_out:
"""
The method computes the time autocorrelations of a single observed sequence of states or the time cross-correlations of two observed sequences of states.
| **Notes:**
- If the Markov chain has multiple stationary distributions, then :py:class:`None` is returned.
- If a single time point is provided, then a :py:class:`float` is returned.
- The method can be accessed through the following aliases: **tc**.
:param sequence1: the first observed sequence of states.
:param sequence2: the second observed sequence of states.
:param time_points: the time point or a list of time points at which the computation is performed.
:raises ValidationError: if any input argument is not compliant.
"""
try:
sequence1 = _validate_sequence(sequence1, self.__states)
if sequence2 is not None:
sequence2 = _validate_sequence(sequence2, self.__states)
time_points = _validate_time_points(time_points)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _time_correlations(self, self.__rdl_decomposition, sequence1, sequence2, time_points)
return value
[docs] @_object_mark(aliases=['tr'])
def time_relaxations(self, sequence: _tsequence, initial_distribution: _onumeric = None, time_points: _ttimes_in = 1) -> _otimes_out:
"""
The method computes the time relaxations of an observed sequence of states with respect to the given initial distribution of the states.
| **Notes:**
- If the Markov chain has multiple stationary distributions, then :py:class:`None` is returned.
- If a single time point is provided, then a :py:class:`float` is returned.
- The method can be accessed through the following aliases: **tr**.
:param sequence: the observed sequence of states.
:param initial_distribution: the initial distribution of the states (*if omitted, the states are assumed to be uniformly distributed*).
:param time_points: the time point or a list of time points at which the computation is performed.
:raises ValidationError: if any input argument is not compliant.
"""
try:
sequence = _validate_sequence(sequence, self.__states)
initial_distribution = _np.full(self.__size, 1.0 / self.__size, dtype=float) if initial_distribution is None else _validate_vector(initial_distribution, 'stochastic', False, self.__size)
time_points = _validate_time_points(time_points)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = _time_relaxations(self, self.__rdl_decomposition, sequence, initial_distribution, time_points)
return value
[docs] @_object_mark(aliases=['to_bounded'], instance_generator=True)
def to_bounded_chain(self, boundary_condition: _tbcond) -> _tmc:
"""
The method returns a bounded Markov chain by adjusting the transition matrix of the original process using the specified boundary condition.
| **Notes:**
- The method can be accessed through the following aliases: **to_bounded**.
:param boundary_condition:
- a number representing the first probability of the semi-reflecting condition;
- a string representing the boundary condition type (either absorbing or reflecting).
:raises ValidationError: if any input argument is not compliant.
"""
try:
boundary_condition = _validate_boundary_condition(boundary_condition)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p, _, _ = _bounded(self.__p, boundary_condition)
mc = MarkovChain(p, self.__states)
return mc
[docs] def to_dictionary(self) -> _tmc_dict:
"""
The method returns a dictionary representing the Markov chain.
"""
d = {(self.__states[i], self.__states[j]): self.__p[i, j] for i in range(self.__size) for j in range(self.__size)}
return d
[docs] def to_file(self, file_path: _tpath):
"""
The method writes a Markov chain to the given file.
| Only **csv**, **json**, **txt** and **xml** files are supported; data format is inferred from the file extension.
:param file_path: the location of the file in which the Markov chain must be written.
:raises OSError: if the file cannot be written.
:raises ValidationError: if any input argument is not compliant.
"""
try:
file_path, file_extension = _validate_file_path(file_path, ['.csv', '.json', '.xml', '.txt'], True)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
d = self.to_dictionary()
if file_extension == '.csv':
_write_csv(True, d, file_path)
elif file_extension == '.json':
_write_json(True, d, file_path)
elif file_extension == '.txt':
_write_txt(d, file_path)
else:
_write_xml(True, d, file_path)
[docs] def to_graph(self) -> _tgraph:
"""
The method returns a directed graph representing the Markov chain.
"""
graph = _cp.deepcopy(self.__digraph)
return graph
[docs] @_object_mark(aliases=['to_lazy'], instance_generator=True)
def to_lazy_chain(self, inertial_weights: _tweights = 0.5) -> _tmc:
"""
The method returns a lazy Markov chain by adjusting the state inertia of the original process.
| **Notes:**
- The method can be accessed through the following aliases: **to_lazy**.
:param inertial_weights: the inertial weights to apply for the transformation.
:raises ValidationError: if any input argument is not compliant.
"""
try:
inertial_weights = _validate_vector(inertial_weights, 'unconstrained', True, self.__size)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p, _, _ = _lazy(self.__p, inertial_weights)
mc = MarkovChain(p, self.__states)
return mc
[docs] def to_matrix(self) -> _tarray:
"""
The method returns the transition matrix of the Markov chain.
"""
m = _np.copy(self.__p)
return m
# noinspection GrazieInspection
@_object_mark(aliases=['to_nth'], instance_generator=True)
def to_nth_order(self, order: int = 2) -> _tmc:
"""
The method returns an nth-order transformation of the Markov chain.
| **Notes:**
- The method can be accessed through the following aliases: **to_nth**.
:param order: the inertial weights to apply for the transformation.
:raises ValidationError: if any input argument is not compliant.
"""
try:
order = _validate_integer(order, lower_limit=(2, False))
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p = _npl.matrix_power(self.__p, order)
mc = MarkovChain(p, self.__states)
return mc
[docs] @_object_mark(aliases=['to_sub'], instance_generator=True)
def to_subchain(self, states: _tstates) -> _tmc:
"""
The method returns a subchain containing all the given states plus all the states reachable from them.
| **Notes:**
- The method can be accessed through the following aliases: **to_sub**.
:param states: the states to include in the subchain.
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the subchain is not a valid Markov chain.
"""
try:
states = _validate_labels_current(states, self.__states, True)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p, states_out, error_message = _sub(self.__p, self.__states, self.adjacency_matrix, states)
if error_message is not None: # pragma: no cover
raise ValueError(error_message)
mc = MarkovChain(p, states_out)
return mc
[docs] def transition_probability(self, state_target: _tstate, state_origin: _tstate) -> float:
"""
The method computes the probability of a given state, conditioned on the process being at a given state.
:param state_target: the target state.
:param state_origin: the origin state.
:raises ValidationError: if any input argument is not compliant.
"""
try:
state_target = _validate_label(state_target, self.__states)
state_origin = _validate_label(state_origin, self.__states)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
value = self.__p[state_origin, state_target]
return value
[docs] @staticmethod
@_object_mark(instance_generator=True)
def approximation(size: int, approximation_type: str, alpha: float, sigma: float, rho: float, states: _olist_str = None, k: _ofloat = None) -> _tmc:
"""
The method approximates the Markov chain associated with the discretized version of the first-order autoregressive process defined below.
| :math:`y_t = (1 - \\rho) \\alpha + \\rho y_{t-1} + \\varepsilon_t`
| with :math:`\\varepsilon_t \\overset{i.i.d}{\\sim} \\mathcal{N}(0, \\sigma_{\\varepsilon}^{2})`
:param size: the size of the Markov chain.
:param approximation_type:
- **adda-cooper** for the Adda-Cooper approximation;
- **rouwenhorst** for the Rouwenhorst approximation;
- **tauchen** for the Tauchen approximation;
- **tauchen-hussey** for the Tauchen-Hussey approximation.
:param alpha: the constant term :math:`\\alpha`, representing the unconditional mean of the process.
:param sigma: the standard deviation of the innovation term :math:`\\varepsilon`.
:param rho: the autocorrelation coefficient :math:`\\rho`, representing the persistence of the process across periods.
:param k:
- In the Tauchen approximation, the number of standard deviations to approximate out to (*if omitted, the value is set to 3*).
- In the Tauchen-Hussey approximation, the standard deviation used for the gaussian quadrature (*if omitted, the value is set to an optimal default*).
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the gaussian quadrature fails to converge in the Tauchen-Hussey approximation.
"""
try:
size = _validate_integer(size, lower_limit=(2, False))
approximation_type = _validate_enumerator(approximation_type, ['adda-cooper', 'rouwenhorst', 'tauchen', 'tauchen-hussey'])
alpha = _validate_float(alpha)
sigma = _validate_float(sigma, lower_limit=(0.0, True))
rho = _validate_float(rho, lower_limit=(-1.0, False), upper_limit=(1.0, False))
if states is not None: # pragma: no cover
states = _validate_labels_input(states, size)
if approximation_type == 'tauchen':
k = 3.0 if k is None else _validate_float(k, lower_limit=(1.0, False))
elif approximation_type == 'tauchen-hussey':
k = ((0.5 + (rho / 4.0)) * sigma) + ((1 - (0.5 + (rho / 4.0))) * (sigma / _np.sqrt(1.0 - rho**2.0))) if k is None else _validate_float(k, lower_limit=(0.0, True))
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p, states_out, error_message = _approximation(size, approximation_type, alpha, sigma, rho, k)
if error_message is not None: # pragma: no cover
raise ValueError(error_message)
if states is None:
states = states_out
mc = MarkovChain(p, states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def birth_death(p: _tarray, q: _tarray, states: _olist_str = None) -> _tmc:
"""
The method generates a birth-death Markov chain of given size and from given probabilities.
:param q: the creation probabilities.
:param p: the annihilation probabilities.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
p = _validate_vector(p, 'creation', False)
q = _validate_vector(q, 'annihilation', False)
if states is not None: # pragma: no cover
states = _validate_labels_input(states, {p.shape[0], q.shape[0]}.pop())
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
if p.shape[0] != q.shape[0]: # pragma: no cover
raise _ValidationError('The vector of annihilation probabilities and the vector of creation probabilities must have the same size.')
if not _np.all(q + p <= 1.0): # pragma: no cover
raise _ValidationError('The sums of annihilation and creation probabilities must be less than or equal to 1.')
p, states_out, _ = _birth_death(p, q)
if states is None:
states = states_out
mc = MarkovChain(p, states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def dirichlet_process(size: int, diffusion_factor: int, states: _olist_str = None, diagonal_bias_factor: _ofloat = None, shift_concentration: bool = False, seed: _oint = None) -> _tmc:
"""
The method generates a Markov chain of given size using a parametrized Dirichlet process.
:param size: the size of the Markov chain.
:param diffusion_factor: the diffusion factor of the Dirichlet process.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:param diagonal_bias_factor: the bias factor applied to the diagonal of the transition matrix (*if omitted, no inside-state stability is enforced*).
:param shift_concentration: a boolean indicating whether to shift the concentration of the Dirichlet process to the rightmost states.
:param seed: a seed to be used as RNG initializer for reproducibility purposes.
:raises ValidationError: if any input argument is not compliant.
"""
try:
rng = _create_rng(seed)
size = _validate_integer(size, lower_limit=(2, False))
diffusion_factor = _validate_integer(diffusion_factor, lower_limit=(1, False), upper_limit=(size, False))
diagonal_bias_factor = None if diagonal_bias_factor is None else _validate_float(diagonal_bias_factor, lower_limit=(0.0, True))
shift_concentration = _validate_boolean(shift_concentration)
if states is not None: # pragma: no cover
states = _validate_labels_input(states, size)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p, states_out, _ = _dirichlet_process(rng, size, float(diffusion_factor), diagonal_bias_factor, shift_concentration)
if states is None:
states = states_out
mc = MarkovChain(p, states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def fit_function(quadrature_type: str, possible_states: _tlist_str, f: _ttfunc, quadrature_interval: _ointerval = None) -> _tmc:
"""
The method fits a Markov chain using the given transition function and the given quadrature type for the computation of nodes and weights.
| **Notes:**
- The transition function takes the four input arguments below and returns a numeric value:
- **x_index** an integer value representing the index of the i-th quadrature node;
- **x_value** a float value representing the value of the i-th quadrature node;
- **y_index** an integer value representing the index of the j-th quadrature node;
- **y_value** a float value representing the value of the j-th quadrature node.
:param quadrature_type:
- **gauss-chebyshev** for the Gauss-Chebyshev quadrature;
- **gauss-legendre** for the Gauss-Legendre quadrature;
- **niederreiter** for the Niederreiter equidistributed sequence;
- **newton-cotes** for the Newton-Cotes quadrature;
- **simpson-rule** for the Simpson rule;
- **trapezoid-rule** for the Trapezoid rule.
:param possible_states: the possible states of the process.
:param f: the transition function of the process.
:param quadrature_interval: the quadrature interval to use for the computation of nodes and weights (*if omitted, the interval [0, 1] is used*).
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the Gauss-Legendre quadrature fails to converge.
"""
try:
quadrature_type = _validate_enumerator(quadrature_type, ['gauss-chebyshev', 'gauss-legendre', 'niederreiter', 'newton-cotes', 'simpson-rule', 'trapezoid-rule'])
f = _validate_transition_function(f)
possible_states = _validate_labels_input(possible_states)
quadrature_interval = (0.0, 1.0) if quadrature_interval is None else _validate_interval(quadrature_interval)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
if quadrature_type == 'simpson-rule' and (len(possible_states) % 2) == 0: # pragma: no cover
raise _ValidationError('The quadrature based on the Simpson rule requires an odd number of possible states.')
p, error_message = _fit_function(quadrature_type, quadrature_interval, possible_states, f)
if error_message is not None: # pragma: no cover
raise ValueError(error_message)
mc = MarkovChain(p, possible_states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def fit_sequence(fitting_type: str, possible_states: _tlist_str, sequence: _tsequence, fitting_param: _tany = None) -> _tmc:
"""
The method fits a Markov chain from an observed sequence of states using the specified fitting approach.
:param fitting_type:
- **map** for the maximum a posteriori fitting;
- **mle** for the maximum likelihood fitting.
:param possible_states: the possible states of the process.
:param sequence: the observed sequence of states.
:param fitting_param:
| - In the maximum a posteriori fitting, the matrix for the a priori distribution (*if omitted, a default value of 1 is assigned to each matrix element*).
| - In the maximum likelihood fitting, a boolean indicating whether to apply a Laplace smoothing to compensate for the unseen transition combinations (*if omitted, the value is set to True*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
possible_states = _validate_labels_input(possible_states)
sequence = _validate_sequence(sequence, possible_states)
fitting_type = _validate_enumerator(fitting_type, ['map', 'mle'])
if fitting_param is not None:
if fitting_type == 'map':
fitting_param = _validate_hyperparameter(fitting_param, len(possible_states))
else:
fitting_param = _validate_boolean(fitting_param)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
if fitting_param is None:
if fitting_type == 'map':
possible_states_length = len(possible_states)
fitting_param = _np.ones((possible_states_length, possible_states_length), dtype=float)
else:
fitting_param = False
p, _ = _fit_sequence(fitting_type, fitting_param, possible_states, sequence)
mc = MarkovChain(p, possible_states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def from_dictionary(d: _tmc_dict_flex) -> _tmc:
"""
The method generates a Markov chain from the given dictionary, whose keys represent state pairs and whose values represent transition probabilities.
:param d: the dictionary to transform into the transition matrix.
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the transition matrix defined by the dictionary is not valid.
"""
try:
d = _validate_dictionary(d)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
states = [key[0] for key in d.keys() if key[0] == key[1]]
size = len(states)
if size < 2: # pragma: no cover
raise ValueError('The size of the transition matrix defined by the dictionary must be greater than or equal to 2.')
p = _np.zeros((size, size), dtype=float)
for (state_from, state_to), probability in d.items():
p[states.index(state_from), states.index(state_to)] = probability
if not _np.allclose(_np.sum(p, axis=1), _np.ones(size, dtype=float)): # pragma: no cover
raise ValueError('The rows of the transition matrix defined by the dictionary must sum to 1.0.')
mc = MarkovChain(p, states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def from_file(file_path: _tpath) -> _tmc:
r"""
The method reads a Markov chain from the given file.
| Only **csv**, **json**, **txt** and **xml** files are supported; data format is inferred from the file extension.
| In **csv** files, data must be structured as follows:
- *Delimiter:* **comma**
- *Quoting:* **minimal**
- *Quote Character:* **double quote**
- *Header Row:* state names
- *Data Rows:* probabilities
| In **json** files, data must be structured as an array of objects with the following properties:
- **state_from** *(string)*
- **state_to** *(string)*
- **probability** *(float or int)*
| In **txt** files, every line of the file must have the following format:
- **<state_from> <state_to> <probability>**
| In **xml** files, the structure must be defined as follows:
- *Root Element:* **MarkovChain**
- *Child Elements:* **Item**\ *, with attributes:*
- **state_from** *(string)*
- **state_to** *(string)*
- **probability** *(float or int)*
:param file_path: the location of the file that defines the Markov chain.
:raises FileNotFoundError: if the file does not exist.
:raises OSError: if the file cannot be read or is empty.
:raises ValidationError: if any input argument is not compliant.
:raises ValueError: if the file contains invalid data.
"""
try:
file_path, file_extension = _validate_file_path(file_path, ['.csv', '.json', '.xml', '.txt'], False)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
if file_extension == '.csv':
d = _read_csv(True, file_path)
elif file_extension == '.json':
d = _read_json(True, file_path)
elif file_extension == '.txt':
d = _read_txt(True, file_path)
else:
d = _read_xml(True, file_path)
states = [key[0] for key in d if key[0] == key[1]]
size = len(states)
if size < 2: # pragma: no cover
raise ValueError('The size of the transition matrix defined by the dictionary must be greater than or equal to 2.')
p = _np.zeros((size, size), dtype=float)
for (state_from, state_to), probability in d.items():
p[states.index(state_from), states.index(state_to)] = probability
if not _np.allclose(_np.sum(p, axis=1), _np.ones(size, dtype=float)): # pragma: no cover
raise ValueError('The rows of the transition matrix defined by the file must sum to 1.0.')
mc = MarkovChain(p, states)
return mc
# noinspection DuplicatedCode
[docs] @staticmethod
@_object_mark(instance_generator=True)
def from_graph(graph: _tgraphs) -> _tmc:
"""
The method generates a Markov chain from the given directed graph, whose transition matrix is obtained through the normalization of edge weights.
:raises ValidationError: if any input argument is not compliant.
"""
try:
graph = _validate_graph(graph)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
states = list(graph.nodes)
size = len(states)
p = _np.zeros((size, size), dtype=float)
edges = list(graph.edges(data='weight', default=0.0))
for edge in edges:
i = states.index(edge[0])
j = states.index(edge[1])
p[i, j] = float(edge[2])
p_sums = _np.sum(p, axis=1)
for i in range(size):
p_sums_i = p_sums[i]
if _np.isclose(p_sums_i, 0.0): # pragma: no cover
p[i, :] = _np.ones(size, dtype=float) / size
else:
p[i, :] /= p_sums_i
mc = MarkovChain(p, states)
return mc
# noinspection DuplicatedCode
[docs] @staticmethod
@_object_mark(instance_generator=True)
def from_matrix(m: _tnumeric, states: _olist_str = None) -> _tmc:
"""
The method generates a Markov chain whose transition matrix is obtained through the normalization of the given matrix.
:param m: the matrix to transform into the transition matrix.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
m = _validate_matrix(m)
states = _create_labels(m.shape[1]) if states is None else _validate_labels_input(states, m.shape[1])
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p = _np.copy(m)
p_sums = _np.sum(p, axis=1)
size = p.shape[1]
for i in range(size):
p_sums_i = p_sums[i]
if _np.isclose(p_sums_i, 0.0): # pragma: no cover
p[i, :] = _np.ones(size, dtype=float) / size
else:
p[i, :] /= p_sums_i
mc = MarkovChain(p, states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def gamblers_ruin(size: int, w: float, states: _olist_str = None) -> _tmc:
"""
The method generates a gambler's ruin Markov chain of given size and win probability.
:param size: the size of the Markov chain.
:param w: the win probability.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
size = _validate_integer(size, lower_limit=(3, False))
w = _validate_float(w, lower_limit=(0.0, True), upper_limit=(1.0, True))
if states is not None: # pragma: no cover
states = _validate_labels_input(states, size)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p, states_out, _ = _gamblers_ruin(size, w)
if states is None:
states = states_out
mc = MarkovChain(p, states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def identity(size: int, states: _olist_str = None) -> _tmc:
"""
The method generates a Markov chain of given size based on an identity transition matrix.
:param size: the size of the Markov chain.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
size = _validate_integer(size, lower_limit=(2, False))
states = _create_labels(size) if states is None else _validate_labels_input(states, size)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p = _np.eye(size)
mc = MarkovChain(p, states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def population_genetics_model(model: str, n: int, s: float = 0.0, u: float = 1.0e-9, v: float = 1.0e-9, states: _olist_str = None) -> _tmc:
"""
The method generates a Markov chain based on the specified population genetics model.
:param model:
- **moran** for the Moran model;
- **wright-fisher** for the Wright-Fisher model.
:param n: the number of individuals.
:param s: the selection intensity.
:param u: the backward mutation rate.
:param v: the forward mutation rate.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
model = _validate_enumerator(model, ['moran', 'wright-fisher'])
n = _validate_integer(n, lower_limit=(2, False), upper_limit=(500, False))
s = _validate_float(s, lower_limit=(-1.0, False))
u = _validate_float(u, lower_limit=(0.0, False), upper_limit=(1.0, False))
v = _validate_float(v, lower_limit=(0.0, False), upper_limit=(1.0, False))
if states is not None: # pragma: no cover
states = _validate_labels_input(states, n + 1)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
if model == 'wright-fisher' and (4.0 * n * max(u, v)) > 1.0:
raise _ValidationError('The highest mutation rate specified is too high for the Wright-Fisher model.')
p, states_out, _ = _population_genetics_model(model, n, s, u, v)
if states is None:
states = states_out
mc = MarkovChain(p, states)
return mc
# noinspection DuplicatedCode
[docs] @staticmethod
@_object_mark(instance_generator=True, random_output=True)
def random(size: int, states: _olist_str = None, zeros: int = 0, mask: _onumeric = None, seed: _oint = None) -> _tmc:
"""
The method generates a Markov chain of given size with random transition probabilities.
| **Notes:**
- In the mask parameter, undefined transition probabilities are represented by *NaN* values.
:param size: the size of the Markov chain.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:param zeros: the number of null transition probabilities.
:param mask: a matrix representing locations and values of fixed transition probabilities.
:param seed: a seed to be used as RNG initializer for reproducibility purposes.
:raises ValidationError: if any input argument is not compliant.
"""
try:
rng = _create_rng(seed)
size = _validate_integer(size, lower_limit=(2, False))
if states is not None: # pragma: no cover
states = _validate_labels_input(states, size)
zeros = _validate_integer(zeros, lower_limit=(0, False))
mask = _np.full((size, size), _np.nan, dtype=float) if mask is None else _validate_mask(mask, size, size)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p, states_out, error_message = _random(rng, size, zeros, mask)
if error_message is not None: # pragma: no cover
raise _ValidationError(error_message)
if states is None:
states = states_out
mc = MarkovChain(p, states)
return mc
# noinspection PyIncorrectDocstring
[docs] @staticmethod
@_object_mark(instance_generator=True, random_output=True)
def random_distribution(size: int, f: _trandfunc_flex, states: _olist_str = None, seed: _oint = None, **kwargs) -> _tmc:
r"""
The method generates a Markov chain of given size using draws from a `Numpy <https://numpy.org/>`_ random distribution function.
| **Notes:**
- *NaN* values are replaced with zeros
- Infinite values are replaced with finite numbers.
- Negative values are clipped to zero.
- In transition matrix rows with no positive values the states are assumed to be uniformly distributed.
- In light of the above points, random distribution functions must be carefully parametrized.
:param size: the size of the Markov chain.
:param f: the Numpy random distribution function or the name of the Numpy random distribution function.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:param seed: a seed to be used as RNG initializer for reproducibility purposes.
:param \**kwargs: additional arguments passed to the random distribution function.
:raises ValidationError: if any input argument is not compliant.
"""
if MarkovChain.__random_distributions is None:
MarkovChain.__random_distributions = _get_numpy_random_distributions()
try:
rng = _create_rng(seed)
size = _validate_integer(size, lower_limit=(2, False))
f = _validate_random_distribution(f, rng, MarkovChain.__random_distributions)
states = _create_labels(size) if states is None else _validate_labels_input(states, size)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p = f(size=(size, size), **kwargs).astype(float)
p = _np.clip(_np.nan_to_num(p, copy=False), 0.0, None)
p[_np.where(~p.any(axis=1)), :] = _np.ones(p.shape[1], dtype=float)
p /= _np.sum(p, axis=1, keepdims=True)
mc = MarkovChain(p, states)
return mc
[docs] @staticmethod
@_object_mark(instance_generator=True)
def urn_model(model: str, n: int, states: _olist_str = None) -> _tmc:
"""
The method generates a Markov chain of size **2N + 1** based on the specified urn model.
:param model:
- **bernoulli-laplace** for the Bernoulli-Laplace urn model;
- **ehrenfest** for the Ehrenfest urn model.
:param n: the number of elements in each urn.
:param states: the name of each state (*if omitted, an increasing sequence of integers starting at 1*).
:raises ValidationError: if any input argument is not compliant.
"""
try:
model = _validate_enumerator(model, ['bernoulli-laplace', 'ehrenfest'])
n = _validate_integer(n, lower_limit=(1, False))
if states is not None: # pragma: no cover
states = _validate_labels_input(states, (2 * n) + 1)
except Exception as ex: # pragma: no cover
raise _create_validation_error(ex, _ins.trace()) from None
p, states_out, _ = _urn_model(model, n)
if states is None:
states = states_out
mc = MarkovChain(p, states)
return mc