Source code for relife.stochastic_process.renewal_process

import copy

import numpy as np

from relife.base import ParametricModel
from relife.economic import ExponentialDiscounting
from relife.lifetime_model import LeftTruncatedModel
from relife.stochastic_process._sample._data import build_data_sample_from_iterable
from relife.stochastic_process._renewal_equations import (
    delayed_renewal_equation_solver,
    renewal_equation_solver,
)
from relife.utils import get_model_nb_assets, is_frozen, reshape_1d_arg


def _make_timeline(tf, nb_steps):
    timeline = np.linspace(0, tf, nb_steps, dtype=np.float64)  # (nb_steps,)
    return np.atleast_2d(timeline)  # (1, nb_steps) to ensure broadcasting


[docs] class RenewalProcess(ParametricModel): # noinspection PyUnresolvedReferences """Renewal process. Parameters ---------- lifetime_model : any lifetime distribution or frozen lifetime model A lifetime model representing the durations between events. first_lifetime_model : any lifetime distribution or frozen lifetime model, optional A lifetime model for the first renewal (delayed renewal process). It is None by default Attributes ---------- lifetime_model : any lifetime distribution or frozen lifetime model A lifetime model representing the durations between events. first_lifetime_model : any lifetime distribution or frozen lifetime model, optional A lifetime model for the first renewal (delayed renewal process). It is None by default nb_params params params_names """ def __init__(self, lifetime_model, first_lifetime_model=None): super().__init__() self.lifetime_model = lifetime_model if first_lifetime_model is not None: lifetime_model_nb_assets = get_model_nb_assets(lifetime_model) first_lifetime_model_nb_assets = get_model_nb_assets(first_lifetime_model) if ( lifetime_model_nb_assets != 1 and first_lifetime_model_nb_assets != 1 and lifetime_model_nb_assets != first_lifetime_model_nb_assets ): raise ValueError( f""" Args of the first_lifetime_model and lifetime_model should have coherent shapes. Got {first_lifetime_model_nb_assets, lifetime_model_nb_assets} """ ) self.first_lifetime_model = first_lifetime_model
[docs] def renewal_function(self, tf, nb_steps): r"""The renewal function. The renewal function gives the expected total number of renewals. It is computed by solving the renewal equation: .. math:: m(t) = F_1(t) + \int_0^t m(t-x) \mathrm{d}F(x) where: - :math:`m` is the renewal function. - :math:`F` is the cumulative distribution function of the underlying lifetime model. - :math:`F_1` is the cumulative distribution function of the underlying lifetime model for the fist renewal in the case of a delayed renewal process. Parameters ---------- tf : float The final time. nb_steps : int The number of steps used to discretized the time. Returns ------- tuple of two ndarrays A tuple containing the timeline and the computed values. References ---------- .. [1] Rausand, M., Barros, A., & Hoyland, A. (2020). System Reliability Theory: Models, Statistical Methods, and Applications. John Wiley & Sons. """ timeline = _make_timeline(tf, nb_steps) # (nb_steps,) or (1, nb_steps) renewal_function = renewal_equation_solver( timeline, self.lifetime_model, self.lifetime_model.cdf if self.first_lifetime_model is None else self.first_lifetime_model.cdf, ) return np.squeeze(timeline), np.squeeze(renewal_function)
[docs] def renewal_density(self, tf, nb_steps): r"""The renewal density. The renewal density corresponds to the derivative of the renewal function with respect to time. It is computed by solving the renewal equation: .. math:: \mu(t) = f_1(t) + \int_0^t \mu(t-x) \mathrm{d}F(x) where: - :math:`\mu` is the renewal function. - :math:`F` is the cumulative distribution function of the underlying lifetime model. - :math:`f_1` is the probability density function of the underlying lifetime model for the fist renewal in the case of a delayed renewal process. Parameters ---------- tf : float The final time. nb_steps : int The number of steps used to discretized the time. Returns ------- tuple of two ndarrays A tuple containing the timeline and the computed values. References ---------- .. [1] Rausand, M., Barros, A., & Hoyland, A. (2020). System Reliability Theory: Models, Statistical Methods, and Applications. John Wiley & Sons. """ timeline = _make_timeline(tf, nb_steps) #  (nb_steps,) or (m, nb_steps) renewal_density = renewal_equation_solver( timeline, self.lifetime_model, self.lifetime_model.pdf if self.first_lifetime_model is None else self.first_lifetime_model.pdf, ) return np.squeeze(timeline), np.squeeze(renewal_density)
[docs] def sample(self, nb_samples, time_window, seed=None): """Renewal data sampling. This function will sample data and encapsulate them in an object. Parameters ---------- nb_samples : int The size of the desired sample time_window : tuple of two floats Time window in which data are sampled seed : int, optional Random seed, by default None. """ from ._sample import RenewalProcessIterable iterable = RenewalProcessIterable(self, nb_samples, time_window, seed=seed) return build_data_sample_from_iterable(iterable=iterable,time_window=time_window,nb_assets=get_model_nb_assets(self),nb_samples=nb_samples)
[docs] def generate_failure_data(self, nb_samples, time_window, seed=None): """Generate lifetime data This function will generate lifetime data that can be used to fit a lifetime model. Parameters ---------- nb_samples : int The size of the desired sample time_window : tuple of two floats Time window in which failure data are sampled seed : int, optional Random seed, by default None. Returns ------- A dict of time, event, entry and args (covariates) """ from ._sample import RenewalProcessIterable if self.first_lifetime_model is not None and self.first_lifetime_model != self.lifetime_model: if is_frozen(self.first_lifetime_model): if ( isinstance(self.first_lifetime_model.unfrozen_model, LeftTruncatedModel) and self.first_lifetime_model.unfrozen_model.baseline == self.lifetime_model ): pass else: raise ValueError( f"Calling sample_lifetime_data with lifetime_model different from first_lifetime_model is ambiguous." ) iterable = RenewalProcessIterable(self, nb_samples, time_window, seed=seed) struct_array = np.concatenate(tuple(iterable)) struct_array = np.sort(struct_array, order=("asset_id", "sample_id", "timeline")) args_2d = tuple((np.atleast_2d(reshape_1d_arg(arg)) for arg in getattr(self.lifetime_model, "args", ()))) tuple_args_arr = tuple((np.take(np.asarray(arg), struct_array["asset_id"], axis=0) for arg in args_2d)) returned_dict = { "time": struct_array["time"].copy(), "event": struct_array["event"].copy(), "entry": struct_array["entry"].copy(), "args": tuple_args_arr, } return returned_dict
[docs] class RenewalRewardProcess(RenewalProcess): # noinspection PyUnresolvedReferences """Renewal reward process. Parameters ---------- lifetime_model : any lifetime distribution or frozen lifetime model A lifetime model representing the durations between events. reward : Reward A reward object that answers costs or conditional costs given lifetime values discounting_rate : float The discounting rate value used in the exponential discounting function first_lifetime_model : any lifetime distribution or frozen lifetime model, optional A lifetime model for the first renewal (delayed renewal process). It is None by default reward : Reward A reward object for the first renewal Attributes ---------- lifetime_model : any lifetime distribution or frozen lifetime model A lifetime model representing the durations between events. first_lifetime_model : any lifetime distribution or frozen lifetime model, optional A lifetime model for the first renewal (delayed renewal process). It is None by default reward : Reward A reward object that answers costs or conditional costs given lifetime values first_reward : Reward A reward object for the first renewal. If it is not given at the initialization, it is a copy of reward. discounting_rate nb_params params params_names """ def __init__(self, lifetime_model, reward, discounting_rate=0.0, first_lifetime_model=None, first_reward=None): super().__init__(lifetime_model, first_lifetime_model) self.reward = reward self.first_reward = first_reward if first_reward is not None else copy.deepcopy(reward) self.discounting = ExponentialDiscounting(discounting_rate) @property def discounting_rate(self): """ The discounting rate value """ return self.discounting.rate @discounting_rate.setter def discounting_rate(self, value): self.discounting.rate = value
[docs] def expected_total_reward(self, tf, nb_steps): r"""The expected total reward. The renewal equation solved to compute the expected reward is: .. math:: z(t) = \int_0^t E[Y | X = x] e^{-\delta x} \mathrm{d}F(x) + \int_0^t z(t-x) e^{-\delta x}\mathrm{d}F(x) where: - :math:`z` is the expected total reward. - :math:`F` is the cumulative distribution function of the underlying lifetime model. - :math:`X` the interarrival random variable. - :math:`Y` the associated reward. - :math:`D` the exponential discount factor. If the renewal reward process is delayed, the expected total reward is modified as: .. math:: z_1(t) = \int_0^t E[Y_1 | X_1 = x] e^{-\delta x} \mathrm{d}F_1(x) + \int_0^t z(t-x) e^{-\delta x} \mathrm{d}F_1(x) where: - :math:`z_1` is the expected total reward with delay. - :math:`F_1` is the cumulative distribution function of the lifetime model for the first renewal. - :math:`X_1` the interarrival random variable of the first renewal. - :math:`Y_1` the associated reward of the first renewal. Parameters ---------- tf : float The final time. nb_steps : int The number of steps used to discretized the time. Returns ------- tuple of two ndarrays A tuple containing the timeline and the computed values. """ timeline = _make_timeline(tf, nb_steps) #  (nb_steps,) or (m, nb_steps) z = renewal_equation_solver( timeline, self.lifetime_model, lambda t: self.lifetime_model.ls_integrate( lambda x: self.reward.conditional_expectation(x) * self.discounting.factor(x), np.zeros_like(t), t, deg=15, ), # reward partial expectation discounting=self.discounting, ) if self.first_lifetime_model is not None: z = delayed_renewal_equation_solver( timeline, z, self.first_lifetime_model, lambda t: self.first_lifetime_model.ls_integrate( lambda x: self.first_reward.conditional_expectation(x) * self.discounting.factor(x), np.zeros_like(t), t, deg=15, ), # reward partial expectation discounting=self.discounting, ) return np.squeeze(timeline), np.squeeze(z) # (nb_steps,), (nb_steps,) or (m, nb_steps)
[docs] def asymptotic_expected_total_reward(self): r"""Asymptotic expected total reward. The asymptotic expected total reward is: .. math:: z^\infty = \lim_{t\to \infty} z(t) = \dfrac{E\left[Y e^{-\delta X}\right]}{1-E\left[e^{-\delta X}\right]} where: - :math:`X` the interarrival random variable. - :math:`Y` the associated reward. - :math:`D` the exponential discount factor. If the renewal reward process is delayed, the asymptotic expected total reward is modified as: .. math:: z_1^\infty = E\left[Y_1 e^{-\delta X_1}\right] + z^\infty E\left[e^{-\delta X_1}\right] where: - :math:`X_1` the interarrival random variable of the first renewal. - :math:`Y_1` the associated reward of the first renewal. Returns ------- ndarray The assymptotic expected total reward of the process. """ lf = self.lifetime_model.ls_integrate( lambda x: self.discounting.factor(x), 0.0, np.inf, deg=100 ) # () or (m, 1) if self.discounting_rate == 0.0: return np.full_like(np.squeeze(lf), np.inf) ly = self.lifetime_model.ls_integrate( lambda x: self.discounting.factor(x) * self.reward.conditional_expectation(x), 0.0, np.inf, deg=100 ) z = ly / (1 - lf) # () or (m, 1) if self.first_lifetime_model is not None: lf1 = self.first_lifetime_model.ls_integrate(lambda x: self.discounting.factor(x), 0.0, np.inf, deg=100) ly1 = self.first_lifetime_model.ls_integrate( lambda x: self.discounting.factor(x) * self.first_reward.conditional_expectation(x), 0.0, np.inf, deg=100, ) z = ly1 + z * lf1 # () or (m, 1) return np.squeeze(z) # () or (m,)
[docs] def expected_equivalent_annual_worth(self, tf, nb_steps): """Expected equivalent annual worth. Gives the equivalent annual worth of the expected total reward of the process at each point of the timeline. The equivalent annual worth at time :math:`t` is equal to the expected total reward :math:`z` divided by the annuity factor :math:`AF(t)`. Parameters ---------- tf : float The final time. nb_steps : int The number of steps used to discretized the time. Returns ------- tuple of two ndarrays A tuple containing the timeline and the computed values. """ timeline, z = self.expected_total_reward( tf, nb_steps ) # timeline : (nb_steps,) z : (nb_steps,) or (m, nb_steps) af = self.discounting.annuity_factor(timeline) # (nb_steps,) if z.ndim == 2 and af.shape != z.shape: # (m, nb_steps) af = np.tile(af, (z.shape[0], 1)) # (m, nb_steps) q = z / (af + 1e-6) # # (nb_steps,) or (m, nb_steps) avoid zero division if self.first_lifetime_model is not None: q0 = self.first_reward.conditional_expectation(0.0) * self.first_lifetime_model.pdf(0.0) else: q0 = self.reward.conditional_expectation(0.0) * self.lifetime_model.pdf(0.0) # q0 : () or (m, 1) q0 = np.broadcast_to(q0, af.shape) # (), (nb_steps,) or (m, nb_steps) eeac = np.where(af == 0, q0, q) # (nb_steps,) or (m, nb_steps) return np.squeeze(timeline), np.squeeze(eeac) # (nb_steps,) and (nb_steps) or (m, nb_steps)
[docs] def asymptotic_expected_equivalent_annual_worth(self): """Asymptotic expected equivalent annual worth. Returns ------- ndarray The assymptotic expected equivalent annual worth. """ if self.discounting_rate == 0.0: return np.squeeze( self.lifetime_model.ls_integrate(lambda x: self.reward.conditional_expectation(x), 0.0, np.inf, deg=100) / self.lifetime_model.mean() ) # () or (m,) return np.squeeze(self.discounting_rate * self.asymptotic_expected_total_reward()) # () or (m,)
[docs] def sample(self, nb_samples, time_window, seed=None): from ._sample import RenewalProcessIterable iterable = RenewalProcessIterable(self, nb_samples, time_window, seed=seed) return build_data_sample_from_iterable(iterable=iterable,time_window=time_window,nb_assets=get_model_nb_assets(self),nb_samples=nb_samples, is_reward=True)