Source code for relife.stochastic_process.renewal_process

# pyright: basic

import copy
from typing import Any, TypedDict

import numpy as np
from numpy.typing import NDArray

from relife.base import ParametricModel
from relife.economic import ExponentialDiscounting, Reward
from relife.lifetime_model import (
    LeftTruncatedModel,
)
from relife.stochastic_process._sample import StochasticSampleMapping
from relife.typing import AnyParametricLifetimeModel
from relife.utils import get_model_nb_assets

from ._renewal_equations import (
    delayed_renewal_equation_solver,
    renewal_equation_solver,
)


def _make_timeline(tf: float, nb_steps: int) -> NDArray[np.float64]:
    timeline = np.linspace(0, tf, nb_steps, dtype=np.float64)  # (nb_steps,)
    return np.atleast_2d(timeline)  # (1, nb_steps) to ensure broadcasting


class LifetimeFitArgs(TypedDict):
    time: NDArray[np.floating]
    event: NDArray[np.bool_]
    entry: NDArray[np.floating]
    args: tuple[NDArray[np.floating], ...]


[docs] class RenewalProcess(ParametricModel): # noinspection PyUnresolvedReferences """Renewal process. Parameters ---------- lifetime_model : any lifetime distribution or frozen lifetime model A lifetime model representing the durations between events. first_lifetime_model : any lifetime distribution or frozen lifetime model, optional A lifetime model for the first renewal (delayed renewal process). It is None by default Attributes ---------- lifetime_model : any lifetime distribution or frozen lifetime model A lifetime model representing the durations between events. first_lifetime_model : any lifetime distribution or frozen lifetime model, optional A lifetime model for the first renewal (delayed renewal process). It is None by default nb_params params params_names """ lifetime_model: AnyParametricLifetimeModel[()] first_lifetime_model: AnyParametricLifetimeModel[()] | None def __init__( self, lifetime_model: AnyParametricLifetimeModel[()], first_lifetime_model: AnyParametricLifetimeModel[()] | None = None, ) -> None: super().__init__() self.lifetime_model = lifetime_model self.first_lifetime_model = first_lifetime_model
[docs] def renewal_function( self, tf: float, nb_steps: int ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: r"""The renewal function. The renewal function gives the expected total number of renewals. It is computed by solving the renewal equation: .. math:: m(t) = F_1(t) + \int_0^t m(t-x) \mathrm{d}F(x) where: - :math:`m` is the renewal function. - :math:`F` is the cumulative distribution function of the underlying lifetime model. - :math:`F_1` is the cumulative distribution function of the underlying lifetime model for the fist renewal in the case of a delayed renewal process. Parameters ---------- tf : float The final time. nb_steps : int The number of steps used to discretized the time. Returns ------- tuple of two ndarrays A tuple containing the timeline and the computed values. References ---------- .. [1] Rausand, M., Barros, A., & Hoyland, A. (2020). System Reliability Theory: Models, Statistical Methods, and Applications. John Wiley & Sons. """ timeline = _make_timeline(tf, nb_steps) # (nb_steps,) or (1, nb_steps) renewal_function = renewal_equation_solver( timeline, self.lifetime_model, self.lifetime_model.cdf if self.first_lifetime_model is None else self.first_lifetime_model.cdf, ) return np.squeeze(timeline), np.squeeze(renewal_function)
[docs] def renewal_density( self, tf: float, nb_steps: int ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: r"""The renewal density. The renewal density corresponds to the derivative of the renewal function with respect to time. It is computed by solving the renewal equation: .. math:: \mu(t) = f_1(t) + \int_0^t \mu(t-x) \mathrm{d}F(x) where: - :math:`\mu` is the renewal function. - :math:`F` is the cumulative distribution function of the underlying lifetime model. - :math:`f_1` is the probability density function of the underlying lifetime model for the fist renewal in the case of a delayed renewal process. Parameters ---------- tf : float The final time. nb_steps : int The number of steps used to discretized the time. Returns ------- tuple of two ndarrays A tuple containing the timeline and the computed values. References ---------- .. [1] Rausand, M., Barros, A., & Hoyland, A. (2020). System Reliability Theory: Models, Statistical Methods, and Applications. John Wiley & Sons. """ timeline = _make_timeline(tf, nb_steps) #  (nb_steps,) or (m, nb_steps) renewal_density = renewal_equation_solver( timeline, self.lifetime_model, self.lifetime_model.pdf if self.first_lifetime_model is None else self.first_lifetime_model.pdf, ) return np.squeeze(timeline), np.squeeze(renewal_density)
[docs] def sample( self, nb_samples: int, time_window: tuple[float, float], seed=None ) -> StochasticSampleMapping: """Renewal data sampling. This function will sample data and encapsulate them in an object. Parameters ---------- nb_samples : int The size of the desired sample time_window : tuple of two floats Time window in which data are sampled seed : int, optional Random seed, by default None. """ from ._sample import RenewalProcessIterable iterable = RenewalProcessIterable(self, nb_samples, time_window, seed=seed) struct_array = np.concatenate(tuple(iterable)) struct_array = np.sort( struct_array, order=("asset_id", "sample_id", "timeline") ) return StochasticSampleMapping.from_struct_array( struct_array, get_model_nb_assets(self), nb_samples )
[docs] def generate_failure_data( self, nb_samples: int, time_window: tuple[float, float], seed=None ) -> dict[str, Any]: """Generate lifetime data This function will generate lifetime data that can be used to fit a lifetime model. Parameters ---------- nb_samples : int The size of the desired sample time_window : tuple of two floats Time window in which data are sampled seed : int, optional Random seed, by default None. Returns ------- A dict of time, event, entry and args (covariates) """ from relife.base import FrozenParametricModel from ._sample import RenewalProcessIterable if ( self.first_lifetime_model is not None and self.first_lifetime_model != self.lifetime_model ): if isinstance(self.first_lifetime_model, FrozenParametricModel): if ( isinstance( self.first_lifetime_model._unfrozen_model, LeftTruncatedModel ) and self.first_lifetime_model._unfrozen_model.baseline == self.lifetime_model ): pass else: raise ValueError( "Calling sample_lifetime_data with lifetime_model different from first_lifetime_model is ambiguous." ) iterable = RenewalProcessIterable(self, nb_samples, time_window, seed=seed) struct_array = np.concatenate(tuple(iterable)) struct_array = np.sort( struct_array, order=("sample_id", "asset_id", "timeline") ) args_2d = tuple( (np.atleast_2d(arg) for arg in getattr(self.lifetime_model, "args", ())) ) tuple_args_arr = tuple( ( np.take(np.asarray(arg), struct_array["asset_id"], axis=0) for arg in args_2d ) ) returned_dict = { "time": struct_array["time"].copy(), "event": struct_array["event"].copy(), "entry": struct_array["entry"].copy(), "args": tuple_args_arr, } return returned_dict
[docs] class RenewalRewardProcess(RenewalProcess): # noinspection PyUnresolvedReferences """Renewal reward process. Parameters ---------- lifetime_model : any lifetime distribution or frozen lifetime model A lifetime model representing the durations between events. reward : Reward A reward object that answers costs or conditional costs given lifetime values discounting_rate : float The discounting rate value used in the exponential discounting function first_lifetime_model : any lifetime distribution or frozen lifetime model, optional A lifetime model for the first renewal (delayed renewal process). It is None by default reward : Reward A reward object for the first renewal Attributes ---------- lifetime_model : any lifetime distribution or frozen lifetime model A lifetime model representing the durations between events. first_lifetime_model : any lifetime distribution or frozen lifetime model, optional A lifetime model for the first renewal (delayed renewal process). It is None by default reward : Reward A reward object that answers costs or conditional costs given lifetime values first_reward : Reward A reward object for the first renewal. If it is not given at the initialization, it is a copy of reward. discounting_rate nb_params params params_names """ lifetime_model: AnyParametricLifetimeModel[()] first_lifetime_model: AnyParametricLifetimeModel[()] | None reward: Reward first_reward: Reward discounting: ExponentialDiscounting def __init__( self, lifetime_model: AnyParametricLifetimeModel[()], reward: Reward, discounting_rate: float = 0.0, first_lifetime_model: AnyParametricLifetimeModel[()] | None = None, first_reward: Reward | None = None, ) -> None: super().__init__(lifetime_model, first_lifetime_model) self.reward = reward self.first_reward = ( first_reward if first_reward is not None else copy.deepcopy(reward) ) self.discounting = ExponentialDiscounting(discounting_rate) @property def discounting_rate(self) -> float: """ The discounting rate value """ return self.discounting.rate @discounting_rate.setter def discounting_rate(self, value: float) -> None: self.discounting.rate = value
[docs] def expected_total_reward( self, tf: float, nb_steps: int ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: r"""The expected total reward. The renewal equation solved to compute the expected reward is: .. math:: z(t) = \int_0^t E[Y | X = x] e^{-\delta x} \mathrm{d}F(x) + \int_0^t z(t-x) e^{-\delta x}\mathrm{d}F(x) where: - :math:`z` is the expected total reward. - :math:`F` is the cumulative distribution function of the underlying lifetime model. - :math:`X` the interarrival random variable. - :math:`Y` the associated reward. - :math:`D` the exponential discount factor. If the renewal reward process is delayed, the expected total reward is modified as: .. math:: z_1(t) = \int_0^t E[Y_1 | X_1 = x] e^{-\delta x} \mathrm{d}F_1(x) + \int_0^t z(t-x) e^{-\delta x} \mathrm{d}F_1(x) where: - :math:`z_1` is the expected total reward with delay. - :math:`F_1` is the cumulative distribution function of the lifetime model for the first renewal. - :math:`X_1` the interarrival random variable of the first renewal. - :math:`Y_1` the associated reward of the first renewal. Parameters ---------- tf : float The final time. nb_steps : int The number of steps used to discretized the time. Returns ------- tuple of two ndarrays A tuple containing the timeline and the computed values. """ timeline = _make_timeline(tf, nb_steps) #  (nb_steps,) or (m, nb_steps) z = renewal_equation_solver( timeline, self.lifetime_model, lambda t: self.lifetime_model.ls_integrate( lambda x: ( self.reward.conditional_expectation(x) * self.discounting.factor(x) ), np.zeros_like(t), np.asarray(t), deg=15, ), # reward partial expectation discounting=self.discounting, ) if self.first_lifetime_model is not None: z = delayed_renewal_equation_solver( timeline, z, self.first_lifetime_model, lambda t: self.first_lifetime_model.ls_integrate( lambda x: ( self.first_reward.conditional_expectation(x) * self.discounting.factor(x) ), np.zeros_like(t), np.asarray(t), deg=15, ), # reward partial expectation discounting=self.discounting, ) return np.squeeze(timeline), np.squeeze( z ) # (nb_steps,), (nb_steps,) or (m, nb_steps)
[docs] def asymptotic_expected_total_reward(self) -> np.float64 | NDArray[np.float64]: r"""Asymptotic expected total reward. The asymptotic expected total reward is: .. math:: z^\infty = \lim_{t\to \infty} z(t) = \dfrac{E\left[Y e^{-\delta X}\right]}{1-E\left[e^{-\delta X}\right]} where: - :math:`X` the interarrival random variable. - :math:`Y` the associated reward. - :math:`D` the exponential discount factor. If the renewal reward process is delayed, the asymptotic expected total reward is modified as: .. math:: z_1^\infty = E\left[Y_1 e^{-\delta X_1}\right] + z^\infty E\left[e^{-\delta X_1}\right] where: - :math:`X_1` the interarrival random variable of the first renewal. - :math:`Y_1` the associated reward of the first renewal. Returns ------- ndarray The assymptotic expected total reward of the process. """ lf = self.lifetime_model.ls_integrate( lambda x: self.discounting.factor(x), np.float64(0.0), np.asarray(np.inf), deg=100, ) # () or (m, 1) if self.discounting_rate == 0.0: return np.full_like(np.squeeze(lf), np.inf) ly = self.lifetime_model.ls_integrate( lambda x: ( self.discounting.factor(x) * self.reward.conditional_expectation(x) ), 0.0, np.inf, deg=100, ) # () or (m, 1) z = np.squeeze(ly / (1 - lf)) # () or (m,) if self.first_lifetime_model is not None: lf1 = np.squeeze( self.first_lifetime_model.ls_integrate( lambda x: self.discounting.factor(x), 0.0, np.inf, deg=100 ) ) # () or (m,) ly1 = np.squeeze( self.first_lifetime_model.ls_integrate( lambda x: ( self.discounting.factor(x) * self.first_reward.conditional_expectation(x) ), 0.0, np.inf, deg=100, ) ) # () or (m,) z = ly1 + z * lf1 # () or (m,) return z # () or (m,)
[docs] def expected_equivalent_annual_worth( self, tf: float, nb_steps: int ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Expected equivalent annual worth. Gives the equivalent annual worth of the expected total reward of the process at each point of the timeline. The equivalent annual worth at time :math:`t` is equal to the expected total reward :math:`z` divided by the annuity factor :math:`AF(t)`. Parameters ---------- tf : float The final time. nb_steps : int The number of steps used to discretized the time. Returns ------- tuple of two ndarrays A tuple containing the timeline and the computed values. """ timeline, z = self.expected_total_reward( tf, nb_steps ) # timeline : (nb_steps,) z : (nb_steps,) or (m, nb_steps) af = self.discounting.annuity_factor(timeline) # (nb_steps,) if z.ndim == 2 and af.shape != z.shape: # (m, nb_steps) af = np.tile(af, (z.shape[0], 1)) # (m, nb_steps) q = z / (af + 1e-6) # # (nb_steps,) or (m, nb_steps) avoid zero division if self.first_lifetime_model is not None: q0 = self.first_reward.conditional_expectation( np.asarray(0.0) ) * self.first_lifetime_model.pdf(0.0) else: q0 = self.reward.conditional_expectation( np.asarray(0.0) ) * self.lifetime_model.pdf(0.0) # q0 : () or (m, 1) q0 = np.broadcast_to(q0, af.shape) # (), (nb_steps,) or (m, nb_steps) eeac = np.where(af == 0, q0, q) # (nb_steps,) or (m, nb_steps) return np.squeeze(timeline), np.squeeze( eeac ) # (nb_steps,) and (nb_steps) or (m, nb_steps)
[docs] def asymptotic_expected_equivalent_annual_worth( self, ) -> np.float64 | NDArray[np.float64]: """Asymptotic expected equivalent annual worth. Returns ------- ndarray The assymptotic expected equivalent annual worth. """ if self.discounting_rate == 0.0: return np.squeeze( np.asarray( self.lifetime_model.ls_integrate( lambda x: self.reward.conditional_expectation(x), np.float64(0.0), np.asarray(np.inf), deg=100, ), dtype=float, ) / self.lifetime_model.mean() ) # () or (m,) return ( self.discounting_rate * self.asymptotic_expected_total_reward() ) # () or (m,)