Source code for aleatory.processes.analytical.bes

"""
Bessel Process BES
"""

import math
from functools import partial
from multiprocessing import Pool

import numpy as np
from scipy.special import eval_genlaguerre

from aleatory.processes.analytical.brownian_motion import BrownianMotion
from aleatory.processes.base_analytical import SPAnalytical
from aleatory.stats import ncx
from aleatory.utils.utils import (
    check_positive_integer,
    check_positive_number,
    get_times,
    sample_bessel_global,
)


def _sample_bessel_global(T, initial, dim, n):
    path = sample_bessel_global(T=T, initial=initial, dim=dim, n=n)
    return path


[docs]class BESProcess(SPAnalytical): r""" Bessel process ============== .. image:: ../_static/bes_process_drawn.png Notes ----- A Bessel process :math:`BES^{n}_{0},` for :math:`n\geq 2` integer is a continuous stochastic process :math:`\{X(t) : t \geq 0\}` characterised as the Euclidian norm of an :math:`n`-dimensional Brownian motion. That is, .. math:: X_t = \sqrt{\sum_{i=1}^n (W^i_t)^2}. More generally, for any :math:`\delta >0`, and :math:`x_0 \geq 0`, a Bessel process of dimension :math:`\delta` starting at :math:`x_0`, denoted by .. math:: BES_{{x_0}}^{{\delta}} can be defined by the following SDE .. math:: dX_t = \frac{(\delta-1)}{2} \frac{dt}{X_t} + dW_t \ \ \ \ t\in (0,T] with initial condition :math:`X_0 = x_0\geq 0.`, where - :math:`\delta` is a positive real - :math:`W_t` is a standard one-dimensional Brownian Motion. Constructor, Methods, and Attributes ------------------------------------ """
[docs] def __init__(self, dim=1.0, initial=0.0, T=1.0, rng=None): """ :param double dim: the dimension of the process :math:`n` :param double initial: the initial point of the process :math:`x_0` :param double T: the right hand endpoint of the time interval :math:`[0,T]` for the process :param numpy.random.Generator rng: a custom random number generator """ super().__init__(T=T, rng=rng, initial=initial) self.dim = dim self._brownian_motion = BrownianMotion(T=T, rng=rng) self.name = f"Bessel process $BES^{{{self.dim}}}_{{{self.initial}}}$" self.n = None self.times = None
def __str__(self): return "Bessel process with dimension {dim} and starting condition {initial} on [0, {T}].".format( T=str(self.T), dim=str(self.dim), initial=str(self.initial) ) def __repr__(self): return "BESProcess(dimension={dim}, initial={initial}, T={T})".format( T=str(self.T), dim=str(self.dim), initial=str(self.initial) ) @property def T(self): return self._T @T.setter def T(self, value): check_positive_number(value, "Time end") self._T = float(value) # Keep internal Brownian motion aligned with the process horizon. if hasattr(self, "_brownian_motion") and self._brownian_motion is not None: self._brownian_motion.T = self._T @property def dim(self): """Bessel Process dimension.""" return self._dim @dim.setter def dim(self, value): if value < 0: raise TypeError("Dimension must be positive") self._dim = value @property def initial(self): """Bessel Process initial point.""" return self._initial @initial.setter def initial(self, value): if value < 0: raise TypeError("Initial point must be positive") self._initial = value def _sample_bessel_integer_dim_zero(self, n): check_positive_integer(n) self.n = n self.times = get_times(self.T, n) brownian_samples = [self._brownian_motion.sample(n) for _ in range(self.dim)] norm = np.array([np.linalg.norm(coord) for coord in zip(*brownian_samples)]) return norm def sample(self, n): if isinstance(self.dim, int) and self.initial == 0: return self._sample_bessel_integer_dim_zero(n) else: return _sample_bessel_global(self.T, self.initial, self.dim, n) def simulate(self, n, N, T=None): """ Simulate paths/trajectories from the instanced stochastic process. :param int n: number of steps in each path :param int N: number of paths to simulate :param float T: optional right hand endpoint of the time interval [0, T] :return: list with N paths (each one is a numpy array of size n) """ if T is not None: self.T = T self.n = n self.N = N self.times = get_times(self.T, n) if isinstance(self.dim, int) and self.initial == 0: self.paths = [self.sample(n) for _ in range(N)] return self.paths else: pool = Pool() initial = self.initial dim = self.dim T = self.T func = partial(_sample_bessel_global, T, initial, dim) results = pool.map(func, [n] * N) pool.close() pool.join() self.paths = results return self.paths def _process_expectation(self, times=None): # TODO: Add the case when times is zero, at the moment this fails because nc required division by t if times is None: times = self.times alpha = (self.dim / 2.0) - 1.0 if np.isscalar(times): nc = (self.initial**2) / times expectations = ( np.sqrt(times) * math.sqrt(math.pi / 2.0) * eval_genlaguerre(0.5, alpha, (-1.0 / 2.0) * nc) ) else: nc = (self.initial**2) / times[1:] expectations = ( np.sqrt(times[1:]) * math.sqrt(math.pi / 2.0) * eval_genlaguerre(0.5, alpha, (-1.0 / 2.0) * nc) ) expectations = np.insert(expectations, 0, self.initial) return expectations def _process_variance(self, times=None): if times is None: times = self.times expectations = self._process_expectation(times) variances = self.dim * times + self.initial**2 - expectations**2 return variances def _process_stds(self, times=None): if times is None: times = self.times variances = self._process_variance(times=times) stds = np.sqrt(variances) return stds def get_marginal(self, t): marginal = ncx(df=self.dim, nc=self.initial / np.sqrt(t), scale=np.sqrt(t)) return marginal def marginal_expectation(self, times=None): expectations = self._process_expectation(times=times) return expectations def marginal_variance(self, times): variances = self._process_variance(times=times) return variances
if __name__ == "__main__": # import matplotlib.pyplot as plt qs = "https://raw.githubusercontent.com/quantgirluk/matplotlib-stylesheets/main/quant-pastel-light.mplstyle" # plt.style.use(qs) p1 = BESProcess() p2 = BESProcess(dim=4.0) p3 = BESProcess(initial=5.0, dim=2.5) p4 = BESProcess(initial=3.0, dim=1.25) p5 = BESProcess(initial=1.0, dim=3.0) p1.plot(n=200, N=5, figsize=(12, 7), style=qs) p1.draw(n=200, N=200, figsize=(12, 7), style=qs, envelope=True) for p, cm in [ (p1, "twilight"), (p2, "PuBuGn"), (p3, "Oranges"), (p4, "RdBu"), (p5, "Purples"), # (p6, "Oranges"), ]: p.draw( n=200, N=200, figsize=(12, 7), style=qs, colormap=cm, envelope=False, )