from __future__ import annotations
from typing import TYPE_CHECKING, Callable, Generic, Optional, TypeVar, Union
import numpy as np
from numpy.typing import NDArray
from typing_extensions import override
from relife import ParametricModel
from relife.data import LifetimeData
from relife.economic import (
Discounting,
ExponentialDiscounting,
Reward,
)
from relife.lifetime_model import (
FrozenAgeReplacementModel,
FrozenLeftTruncatedModel,
FrozenLifetimeRegression,
LifetimeDistribution,
)
from relife.sample import (
RenewalProcessIterator,
RenewalRewardProcessIterator,
)
if TYPE_CHECKING:
from relife.economic import Reward
from relife.sample import (
CountData,
CountDataFunctions,
)
M = TypeVar(
"M",
bound=Union[LifetimeDistribution, FrozenLifetimeRegression, FrozenAgeReplacementModel, FrozenLeftTruncatedModel],
)
R = TypeVar("R", bound=Reward)
[docs]
class RenewalProcess(ParametricModel, Generic[M]):
"""Renewal process.
Parameters
----------
lifetime_model : any lifetime distribution or frozen lifetime model
A lifetime model representing the durations between events.
first_lifetime_model : any lifetime distribution or frozen lifetime model, optional
A lifetime model for the first renewal (delayed renewal process). It is None by default
"""
count_data: Optional[CountData]
def __init__(
self,
lifetime_model: M,
first_lifetime_model: Optional[M] = None,
):
super().__init__()
self.lifetime_model = lifetime_model
self.first_lifetime_model = first_lifetime_model
self.count_data = None
@property
def sample(self) -> Optional[CountDataFunctions]:
if self.count_data is not None:
from relife.sample import CountDataFunctions
return CountDataFunctions(type(self), self.count_data)
return None
def _make_timeline(self, tf: float, nb_steps: int) -> NDArray[np.float64]:
timeline = np.linspace(0, tf, nb_steps, dtype=np.float64) # (nb_steps,)
args_nb_assets = getattr(self.lifetime_model, "args_nb_assets", 1) # default 1 for LifetimeDistribution case
if args_nb_assets > 1:
timeline = np.tile(timeline, (args_nb_assets, 1))
return timeline # (nb_steps,) or (m, nb_steps)
[docs]
def renewal_function(self, tf: float, nb_steps: int) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
r"""The renewal function.
Parameters
----------
tf : float
Time horizon. The renewal function will be computed up until this calendar time.
nb_steps : int
The number of steps used to compute the renewal function.
Returns
-------
tuple of two ndarrays
A tuple containing the timeline used to compute the renewal function and its corresponding values at each
step of the timeline.
Notes
-----
The expected total number of renewals is computed by solving the
renewal equation:
.. math::
m(t) = F_1(t) + \int_0^t m(t-x) \mathrm{d}F(x)
where:
- :math:`m` is the renewal function,
- :math:`F` is the cumulative distribution function of the underlying
lifetime model,
- :math:`F_1` is the cumulative distribution function of the underlying
lifetime model for the fist renewal in the case of a delayed renewal
process.
References
----------
.. [1] Rausand, M., Barros, A., & Hoyland, A. (2020). System Reliability
Theory: Models, Statistical Methods, and Applications. John Wiley &
Sons.
"""
timeline = self._make_timeline(tf, nb_steps) # (nb_steps,) or (m, nb_steps)
return timeline, renewal_equation_solver(
timeline,
self.lifetime_model,
self.lifetime_model.cdf if self.first_lifetime_model is None else self.first_lifetime_model.cdf,
)
[docs]
def renewal_density(self, tf: float, nb_steps: int) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
r"""The renewal density.
Parameters
----------
tf : float
Time horizon. The renewal density will be computed up until this calendar time.
nb_steps : int
The number of steps used to compute the renewal density.
Returns
-------
tuple of two ndarrays
A tuple containing the timeline used to compute the renewal density and its corresponding values at each
step of the timeline.
Notes
-----
The renewal density is the derivative of the renewal function with
respect to time. It is computed by solving the renewal equation:
.. math::
\mu(t) = f_1(t) + \int_0^t \mu(t-x) \mathrm{d}F(x)
where:
- :math:`\mu` is the renewal function,
- :math:`F` is the cumulative distribution function of the underlying
lifetime model,
- :math:`f_1` is the probability density function of the underlying
lifetime model for the fist renewal in the case of a delayed renewal
process.
References
----------
.. [1] Rausand, M., Barros, A., & Hoyland, A. (2020). System Reliability
Theory: Models, Statistical Methods, and Applications. John Wiley &
Sons.
"""
timeline = self._make_timeline(tf, nb_steps) # (nb_steps,) or (m, nb_steps)
return timeline, renewal_equation_solver(
timeline,
self.lifetime_model,
self.lifetime_model.pdf if self.first_lifetime_model is None else self.first_lifetime_model.pdf,
)
[docs]
def sample_count_data(
self,
tf: float,
t0: float = 0.0,
size: int | tuple[int] | tuple[int, int] = 1,
maxsample: int = 100000,
seed: Optional[int] = None,
) -> None:
"""Renewal data sampling.
This function will generate sampling data insternally. These data
Parameters
----------
tf : float
Time at the end of the observation.
t0 : float, default 0
Time at the beginning of the observation.
size : int or tuple of 2 int
Size of the sample
maxsample : int, optional
Maximum number of samples, by default 100000.
seed : int, optional
Random seed, by default None.
"""
from relife.sample import concatenate_renewal_data
iterator = RenewalProcessIterator(self, size, (t0, tf), seed=seed) # type: ignore
self.count_data = concatenate_renewal_data(iterator, maxsample)
def sample_lifetime_data(
self,
tf: float,
t0: float = 0.0,
size: int | tuple[int] | tuple[int, int] = 1,
maxsample: int = 1e5,
seed: Optional[int] = None,
) -> LifetimeData:
from relife.sample import concatenate_renewal_data
if self.first_lifetime_model is not None and self.first_lifetime_model != self.lifetime_model:
from relife.lifetime_model import FrozenLeftTruncatedModel
if (
isinstance(self.first_lifetime_model, FrozenLeftTruncatedModel)
and self.first_lifetime_model.unfreeze() == self.lifetime_model
):
pass
else:
raise ValueError(
f"Calling sample_lifetime_data with lifetime_model different from first_lifetime_model is ambiguous."
)
iterator = RenewalProcessIterator(self, size, (t0, tf), seed=seed)
count_data = concatenate_renewal_data(iterator, maxsample)
return LifetimeData(
count_data.struct_array["time"].copy(),
event=count_data.struct_array["event"].copy(),
entry=count_data.struct_array["entry"].copy(),
args=tuple(
(
np.take(arg, count_data.struct_array["asset_id"])
for arg in getattr(self.lifetime_model, "frozen_args", ())
)
),
)
[docs]
class RenewalRewardProcess(RenewalProcess[M], Generic[M, R]):
def __init__(
self,
lifetime_model: M,
reward: R,
discounting_rate: float = 0.0,
first_lifetime_model: Optional[M] = None,
first_reward: Optional[R] = None,
):
super().__init__(lifetime_model, first_lifetime_model)
self.reward = reward
self.first_reward = first_reward if first_reward is not None else reward
self.discounting = ExponentialDiscounting(discounting_rate)
@property
def discounting_rate(self) -> float:
return self.discounting.rate
@override
def _make_timeline(self, tf: float, nb_steps: int) -> NDArray[np.float64]:
# control with reward too
timeline = np.linspace(0, tf, nb_steps, dtype=np.float64) # (nb_steps,)
args_nb_assets = getattr(self.lifetime_model, "args_nb_assets", 1) # default 1 for LifetimeDistribution case
if args_nb_assets > 1:
timeline = np.tile(timeline, (args_nb_assets, 1))
elif self.reward.ndim == 2: # elif because we consider that if m > 1 in frozen_model, in reward it is 1 or m
timeline = np.tile(timeline, (self.reward.size, 1))
return timeline # (nb_steps,) or (m, nb_steps)
def expected_total_reward(self, tf: float, nb_steps: int) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
timeline = self._make_timeline(tf, nb_steps) # (nb_steps,) or (m, nb_steps)
z = renewal_equation_solver(
timeline,
self.lifetime_model,
lambda t: self.lifetime_model.ls_integrate(
lambda x: self.reward.sample(x) * self.discounting.factor(x), np.zeros_like(t), t, deg=15
), # reward partial expectation
discounting=self.discounting,
)
if self.first_lifetime_model is not None:
return delayed_renewal_equation_solver(
timeline,
z,
self.first_lifetime_model,
lambda t: self.first_lifetime_model.ls_integrate(
lambda x: self.first_reward.sample(x) * self.discounting.factor(x),
np.zeros_like(t),
timeline,
deg=15,
), # reward partial expectation
discounting=self.discounting,
)
return timeline, z # (nb_steps,) or (m, nb_steps)
def asymptotic_expected_total_reward(
self,
) -> NDArray[np.float64]:
lf = self.lifetime_model.ls_integrate(lambda x: self.discounting.factor(x), 0.0, np.inf, deg=15)
ly = self.lifetime_model.ls_integrate(
lambda x: self.discounting.factor(x) * self.reward.sample(x), 0.0, np.inf, deg=15
)
z = ly / (1 - lf) # () or (m, 1)
if self.first_lifetime_model is not None:
lf1 = self.first_lifetime_model.ls_integrate(lambda x: self.discounting.factor(x), 0.0, np.inf, deg=15)
ly1 = self.first_lifetime_model.ls_integrate(
lambda x: self.discounting.factor(x) * self.first_reward.sample(x), 0.0, np.inf, deg=15
)
z = ly1 + z * lf1 # () or (m, 1)
return z # () or (m, 1)
def expected_equivalent_annual_worth(
self, tf: float, nb_steps: int
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
timeline, z = self.expected_total_reward(tf, nb_steps) # (nb_steps,) or (m, nb_steps)
af = self.discounting.annuity_factor(timeline) # (nb_steps,) or (m, nb_steps)
q = z / (af + 1e-6) # # (nb_steps,) or (m, nb_steps) avoid zero division
if self.first_lifetime_model is not None:
q0 = self.first_reward.sample(0.0) * self.first_lifetime_model.pdf(0.0)
else:
q0 = self.reward.sample(0.0) * self.lifetime_model.pdf(0.0)
# q0 : () or (m, 1)
q0 = np.broadcast_to(q0, af.shape) # (), (nb_steps,) or (m, nb_steps)
return timeline, np.where(af == 0, q0, q)
def asymptotic_expected_equivalent_annual_worth(self) -> NDArray[np.float64]:
if self.discounting_rate == 0.0:
return (
self.lifetime_model.ls_integrate(lambda x: self.reward.sample(x), 0.0, np.inf, deg=15)
/ self.lifetime_model.mean()
) # () or (m, 1)
return self.discounting_rate * self.asymptotic_expected_total_reward() # () or (m, 1)
[docs]
@override
def sample_count_data(
self,
tf: float,
t0: float = 0.0,
size: int | tuple[int] | tuple[int, int] = 1,
maxsample: int = 1e5,
seed: Optional[int] = None,
) -> None:
from relife.sample import concatenate_count_data
iterator = RenewalRewardProcessIterator(self, size, (t0, tf), seed=seed)
self.count_data = concatenate_count_data(iterator, maxsample)
def renewal_equation_solver(
timeline: NDArray[np.float64],
lifetime_model: LifetimeDistribution | FrozenLifetimeRegression | FrozenAgeReplacementModel,
evaluated_func: Callable[[NDArray[np.float64]], NDArray[np.float64]],
discounting: Optional[Discounting] = None,
) -> NDArray[np.float64]:
# timeline : (nb_steps,) or (m, nb_steps)
tm = 0.5 * (timeline[..., 1:] + timeline[..., :-1]) # (nb_steps - 1,) or (m, nb_steps - 1)
f = lifetime_model.cdf(timeline) # (nb_steps,) or (m, nb_steps)
fm = lifetime_model.cdf(tm) # (nb_steps - 1,) or (m, nb_steps - 1)
y = evaluated_func(timeline) # (nb_steps,) or (m, nb_steps)
if y.shape != f.shape:
raise ValueError("Invalid shape between model and evaluated_func")
if discounting is not None:
d = discounting.factor(timeline) # (nb_steps,) or (m, nb_steps)
else:
d = np.ones_like(f)
z = np.empty(y.shape)
u = d * np.insert(f[..., 1:] - fm, 0, 1, axis=-1)
v = d[..., :-1] * np.insert(np.diff(fm), 0, 1, axis=-1)
q0 = 1 / (1 - d[..., 0] * fm[..., 0])
z[..., 0] = y[..., 0]
z[..., 1] = q0 * (y[..., 1] + z[..., 0] * u[..., 1])
for n in range(2, f.shape[-1]):
z[..., n] = q0 * (y[..., n] + z[..., 0] * u[..., n] + np.sum(z[..., 1:n][..., ::-1] * v[..., 1:n], axis=-1))
return z
def delayed_renewal_equation_solver(
timeline: NDArray[np.float64],
z: NDArray[np.float64],
first_lifetime_model: (
LifetimeDistribution | FrozenLifetimeRegression | FrozenAgeReplacementModel | FrozenLeftTruncatedModel
),
evaluated_func: Callable[[NDArray[np.float64]], NDArray[np.float64]],
discounting: Optional[ExponentialDiscounting] = None,
) -> NDArray[np.float64]:
# timeline : (nb_steps,) or (m, nb_steps)
tm = 0.5 * (timeline[..., 1:] + timeline[..., :-1]) # (nb_steps - 1,) or (m, nb_steps - 1)
f1 = first_lifetime_model.cdf(timeline) # (nb_steps,) or (m, nb_steps)
f1m = first_lifetime_model.cdf(tm) # (nb_steps - 1,) or (m, nb_steps - 1)
y1 = evaluated_func(timeline) # (nb_steps,) or (m, nb_steps - 1)
if discounting is not None:
d = discounting.factor(timeline) # (nb_steps,) or (m, nb_steps - 1)
else:
d = np.ones_like(f1)
z1 = np.empty(y1.shape)
u1 = d * np.insert(f1[..., 1:] - f1m, 0, 1, axis=-1)
v1 = d[..., :-1] * np.insert(np.diff(f1m), 0, 1, axis=-1)
z1[..., 0] = y1[..., 0]
z1[..., 1] = y1[..., 1] + z[..., 0] * u1[..., 1] + z[..., 1] * d[..., 0] * f1m[..., 0]
for n in range(2, f1.shape[-1]):
z1[..., n] = (
y1[..., n]
+ z[..., 0] * u1[..., n]
+ z[..., n] * d[..., 0] * f1m[..., 0]
+ np.sum(z[..., 1:n][..., ::-1] * v1[..., 1:n], axis=-1)
)
return z1