Source code for aleatory.processes.euler_maruyama.cir_process

from aleatory.processes.base import SPEulerMaruyama
import numpy as np
from scipy.stats import ncx2


[docs]class CIRProcess(SPEulerMaruyama): r""" Cox–Ingersoll–Ross Process .. image:: _static/cir_process_drawn.png A Cox–Ingersoll–Ross process :math:`X = \{X : t \geq 0\}` is characterised by the following Stochastic Differential Equation .. math:: dX_t = \theta(\mu - X_t) dt + \sigma \sqrt{X_t} dW_t, \ \ \ \ \forall t\in (0,T], with initial condition :math:`X_0 = x_0`, where - :math:`\theta` is the rate of mean reversion - :math:`\mu` is the long term mean value. - :math:`\sigma>0` is the instantaneous volatility - :math:`W_t` is a standard Brownian Motion. Each :math:`X_t` follows a non-central chi-square distribution. :param float theta: the parameter :math:`\theta` in the above SDE :param float mu: the parameter :math:`\mu` in the above SDE :param float sigma: the parameter :math:`\sigma>0` in the above SDE :param float initial: the initial condition :math:`x_0` in the above SDE :param float T: the right hand endpoint of the time interval :math:`[0,T]` for the process :param numpy.random.Generator rng: a custom random number generator """ def __init__(self, theta=1.0, mu=2.0, sigma=0.5, initial=5.0, T=1.0, rng=None): super().__init__(T=T, rng=rng, initial=initial) self.theta = theta self.mu = mu self.sigma = sigma self.n = 1.0 self.dt = 1.0 * self.T / self.n self.times = np.arange(0.0, self.T + self.dt, self.dt) self.name = "CIR Process" def f(x, _): # return self.theta * (self.mu - x) return np.exp(-x) * (self.theta * (self.mu - np.exp(x)) - 0.5 * self.sigma ** 2) def g(x, _): # return self.sigma * np.sqrt(x) return self.sigma * np.exp(-0.5 * x) self.f = f self.g = g @property def theta(self): return self._theta @theta.setter def theta(self, value): if value < 0: raise ValueError("theta must be positive") self._theta = value @property def sigma(self): return self._sigma @sigma.setter def sigma(self, value): if value <= 0: raise ValueError("sigma has to be positive") if 2 * self.theta * self.mu <= value ** 2: raise ValueError("Condition 2*theta*mu >= sigma**2 must be satisfied") self._sigma = value def __str__(self): return "Cox–Ingersoll–Ross process with parameters {speed}, {mean}, and {volatility} on [0, {T}].".format( T=str(self.T), speed=str(self.theta), mean=str(self.mu), volatility=str(self.sigma)) def _process_expectation(self, times=None): if times is None: times = self.times expectations = self.initial * np.exp((-1.0) * self.theta * times) + self.mu * ( np.ones(len(times)) - np.exp((-1.0) * self.theta * times)) return expectations def marginal_expectation(self, times=None): expectations = self._process_expectation(times=times) return expectations def _process_variance(self, times=None): if times is None: times = self.times variances = (self.sigma ** 2 / self.theta) * self.initial * ( np.exp(-1.0 * self.theta * times) - np.exp(-2.0 * self.theta * times)) + ( self.mu * self.sigma ** 2 / (2 * self.theta)) * ( (np.ones(len(times)) - np.exp(-1.0 * self.theta * times)) ** 2) return variances def _process_degrees_of_freedom(self): df = 4.0 * self.theta * self.mu / (self.sigma ** 2) return df def marginal_df(self): return self._process_degrees_of_freedom() def _process_nc_parameter(self, times=None): if times is None: times = self.times c = 2.0 * self.theta / ((1.0 - np.exp(-1.0 * self.theta * times)) * self.sigma ** 2) ncs = 2.0 * c * self.initial * np.exp(-1.0 * self.theta * times) return ncs def marginal_nc_parameter(self, times=None): ncs = self._process_nc_parameter(times=times) return ncs def _process_scales(self, times=None): if times is None: times = self.times c = 2.0 * self.theta / ((1.0 - np.exp(-1.0 * self.theta * times)) * self.sigma ** 2) scales = 1.0 / (2.0 * c) return scales def marginal_scale(self, times=None): scales = self._process_scales(times=times) return scales def marginal_variance(self, times=None): variances = self._process_variance(times=times) return variances def _process_stds(self): stds = np.sqrt(self._process_variance()) return stds def process_stds(self): stds = self._process_stds() return stds def get_marginal(self, t): a = self.theta b = self.mu sigma = self.sigma c = 2.0 * a / ((1.0 - np.exp(-1.0 * a * t)) * sigma ** 2) df = 4.0 * a * b / sigma ** 2 nc = 2.0 * c * self.initial * np.exp(-1.0 * a * t) scale = 1.0 / (2 * c) marginal = ncx2(df, nc, scale=scale) return marginal def sample(self, n): return self._sample_em_process(n, log=True)