Source code for aleatory.processes.euler_maruyama.cir_process

from aleatory.processes.base_eu import SPEulerMaruyama
import numpy as np
from scipy.stats import ncx2


[docs]class CIRProcess(SPEulerMaruyama): r""" Cox–Ingersoll–Ross (CIR) Process ================================ .. image:: ../_static/cir_process_drawn.png Notes ----- A Cox–Ingersoll–Ross process :math:`X = \{X : t \geq 0\}` is characterised by the following Stochastic Differential Equation .. math:: dX_t = \theta(\mu - X_t) dt + \sigma \sqrt{X_t} dW_t, \ \ \ \ \forall t\in (0,T], with initial condition :math:`X_0 = x_0`, where - :math:`\theta` is the rate of mean reversion - :math:`\mu` is the long term mean value. - :math:`\sigma>0` is the instantaneous volatility - :math:`W_t` is a standard Brownian Motion. It can be seen that each :math:`X_t` follows a non-central chi-square distribution. Constructor, Methods, and Attributes ------------------------------------ """
[docs] def __init__(self, theta=1.0, mu=2.0, sigma=0.5, initial=5.0, T=1.0, rng=None): r""" :param float theta: the parameter :math:`\theta` in the above SDE :param float mu: the parameter :math:`\mu` in the above SDE :param float sigma: the parameter :math:`\sigma>0` in the above SDE :param float initial: the initial condition :math:`x_0` in the above SDE :param float T: the right hand endpoint of the time interval :math:`[0,T]` for the process :param numpy.random.Generator rng: a custom random number generator """ super().__init__(T=T, rng=rng, initial=initial) self.theta = theta self.mu = mu self.sigma = sigma self.n = 1.0 self.dt = 1.0 * self.T / self.n self.times = np.arange(0.0, self.T + self.dt, self.dt) self.name = ( f"CIR Process $X(\\theta={self.theta}, \\mu={self.mu}, \\sigma={self.sigma})$" f"\n starting at $x_0=${self.initial}" ) def f(x, _): # return self.theta * (self.mu - x) return np.exp(-x) * ( self.theta * (self.mu - np.exp(x)) - 0.5 * self.sigma**2 ) def g(x, _): # return self.sigma * np.sqrt(x) return self.sigma * np.exp(-0.5 * x) self.f = f self.g = g
@property def theta(self): return self._theta @theta.setter def theta(self, value): if value <= 0: raise ValueError("theta must be positive") self._theta = value @property def mu(self): return self._mu @mu.setter def mu(self, value): if value <= 0: raise ValueError("mu must be positive") self._mu = value @property def sigma(self): return self._sigma @sigma.setter def sigma(self, value): if value <= 0: raise ValueError("sigma has to be positive") if 2 * self.theta * self.mu <= value**2: raise ValueError("Condition 2*theta*mu >= sigma**2 must be satisfied") self._sigma = value def __str__(self): return "Cox–Ingersoll–Ross process with parameters {speed}, {mean}, and {volatility} on [0, {T}].".format( T=str(self.T), speed=str(self.theta), mean=str(self.mu), volatility=str(self.sigma), ) def _process_expectation(self, times=None): if times is None: times = self.times expectations = self.initial * np.exp((-1.0) * self.theta * times) + self.mu * ( np.ones(len(times)) - np.exp((-1.0) * self.theta * times) ) return expectations def marginal_expectation(self, times=None): expectations = self._process_expectation(times=times) return expectations def _process_variance(self, times=None): if times is None: times = self.times variances = (self.sigma**2 / self.theta) * self.initial * ( np.exp(-1.0 * self.theta * times) - np.exp(-2.0 * self.theta * times) ) + (self.mu * self.sigma**2 / (2 * self.theta)) * ( (np.ones(len(times)) - np.exp(-1.0 * self.theta * times)) ** 2 ) return variances def _process_degrees_of_freedom(self): df = 4.0 * self.theta * self.mu / (self.sigma**2) return df def marginal_df(self): return self._process_degrees_of_freedom() def _process_nc_parameter(self, times=None): if times is None: times = self.times c = ( 2.0 * self.theta / ((1.0 - np.exp(-1.0 * self.theta * times)) * self.sigma**2) ) ncs = 2.0 * c * self.initial * np.exp(-1.0 * self.theta * times) return ncs def marginal_nc_parameter(self, times=None): ncs = self._process_nc_parameter(times=times) return ncs def _process_scales(self, times=None): if times is None: times = self.times c = ( 2.0 * self.theta / ((1.0 - np.exp(-1.0 * self.theta * times)) * self.sigma**2) ) scales = 1.0 / (2.0 * c) return scales def marginal_scale(self, times=None): scales = self._process_scales(times=times) return scales def marginal_variance(self, times=None): variances = self._process_variance(times=times) return variances def _process_stds(self): stds = np.sqrt(self._process_variance()) return stds def process_stds(self): stds = self._process_stds() return stds def get_marginal(self, t): a = self.theta b = self.mu sigma = self.sigma c = 2.0 * a / ((1.0 - np.exp(-1.0 * a * t)) * sigma**2) df = 4.0 * a * b / sigma**2 nc = 2.0 * c * self.initial * np.exp(-1.0 * a * t) scale = 1.0 / (2 * c) marginal = ncx2(df, nc, scale=scale) return marginal def sample(self, n): return self._sample_em_process(n, log=True)
# if __name__ == "__main__": # # import matplotlib.pyplot as plt # # qs = "https://raw.githubusercontent.com/quantgirluk/matplotlib-stylesheets/main/quant-pastel-light.mplstyle" # plt.style.use(qs) # # p1 = CIRProcess() # p2 = CIRProcess(theta=1.0, mu=2.0, sigma=1.0, initial=7.0, T=5.0) # p3 = CIRProcess(theta=1.0, mu=10.0, sigma=2.0, initial=1.0, T=10.0) # p4 = CIRProcess(theta=1.0, mu=10.0, sigma=0.2, initial=1.0, T=1.0) # # for p, cm in [ # (p1, "terrain"), # (p2, "RdPu"), # (p3, "Oranges"), # (p4, "Blues"), # ]: # # p.draw( # n=500, # N=300, # figsize=(12, 7), # style=qs, # colormap=cm, # envelope=True, # ) # # p1.plot(n=500, N=10, figsize=(12, 7), style=qs) # p1.draw(n=500, N=300, figsize=(12, 7), style=qs, envelope=True)