# -*- coding: utf-8 -*-
# dcf
# ---
# A Python library for generating discounted cashflows.
#
# Author: sonntagsgesicht, based on a fork of Deutsche Postbank [pbrisk]
# Version: 0.7, copyright Sunday, 22 May 2022
# Website: https://github.com/sonntagsgesicht/dcf
# License: Apache License 2.0 (see LICENSE file)
from collections import OrderedDict
from inspect import signature
from math import exp
from warnings import warn
from .. import interpolation as _interpolations
from ..compounding import continuous_compounding, continuous_rate
from ..interpolation import linear_scheme, log_linear_scheme
from ..daycount import day_count as _default_day_count
[docs]def rate_table(curve, x_grid=None, y_grid=None):
r""" table of calculated rates
:param curve: function $f$
:param x_grid: vertical date axis $x_0, \dots, x_m$
:param y_grid: horizontal period axis $y_1, \dots, y_n$
(implicitly added a non-period $y_0=0$)
:return: list(list(float)) matrix $T=(t_{i,j})$ with
$t_{i,j}=f(x_i+y_j) \text{ if } x_i+y_j < x_{i+1}$.
>>> from tabulate import tabulate
>>> from dcf import Curve, rate_table
>>> curve = Curve([1, 4], [0, 1])
>>> table = rate_table(curve, x_grid=(0, 1, 2, 3, 4, 5), y_grid=(.0, .25, .5, .75))
>>> print(tabulate(table, headers='firstrow', floatfmt='.4f'))
0.0 0.25 0.5 0.75
-- ------ ------ ------ ------
0 0.0000 0.0000 0.0000 0.0000
1 0.0000 0.0833 0.1667 0.2500
2 0.3333 0.4167 0.5000 0.5833
3 0.6667 0.7500 0.8333 0.9167
4 1.0000 1.0000 1.0000 1.0000
5 1.0000 1.0000 1.0000 1.0000
>>> from businessdate import BusinessDate, BusinessPeriod
>>> from dcf import ZeroRateCurve
>>> term = '1m', '3m', '6m', '1y', '2y', '5y',
>>> rates = -0.008, -0.0057, -0.0053, -0.0036, -0.0010, 0.0014,
>>> today = BusinessDate(20211201)
>>> tenor = BusinessPeriod('1m')
>>> dates = [today + t for t in term]
>>> f = ZeroRateCurve(dates, rates, origin=today, forward_tenor=tenor)
>>> print(tabulate(f.table, headers='firstrow', floatfmt=".4f", tablefmt='latex'))
\begin{tabular}{lrrrrrrr}
\hline
& 0D & 1M & 2M & 3M & 6M & 1Y & 2Y \\
\hline
20211201 & -0.0080 & & & & & & \\
20220101 & -0.0080 & -0.0068 & & & & & \\
20220301 & -0.0057 & -0.0056 & -0.0054 & & & & \\
20220601 & -0.0053 & -0.0050 & -0.0047 & -0.0044 & & & \\
20221201 & -0.0036 & -0.0034 & -0.0032 & -0.0030 & -0.0023 & & \\
20231201 & -0.0010 & -0.0009 & -0.0009 & -0.0008 & -0.0006 & -0.0002 & 0.0006 \\
20261201 & 0.0014 & 0.0014 & 0.0014 & 0.0014 & 0.0014 & 0.0014 & 0.0014 \\
\hline
\end{tabular}
""" # noqa: E501
if x_grid is None:
x_grid = list(curve.domain)
if curve.origin not in x_grid:
x_grid = [curve.origin] + x_grid
if y_grid is None:
diff = list(e-s for s, e in zip(x_grid[:-1], x_grid[1:]))
step = diff[0]
y_grid = [step * 0]
for span in diff:
line = [step]
while line[-1] + step < span:
line.append(line[-1] + step)
y_grid.extend(line)
step = span
y_grid = tuple(sorted(set(y_grid)))
# fill table
grid = list()
grid.append(('',) + tuple(y_grid))
for i, x in enumerate(x_grid):
lst = x_grid[i+1] if i < len(x_grid)-1 \
else x_grid[-1] + y_grid[-1] + y_grid[-1]
grid.append(((x,) + tuple(curve(x+y) for y in y_grid if x + y < lst)))
return grid
[docs]class Price(object):
"""Price object for assets"""
@property
def value(self):
""" asset price value """
return float(self._value)
@property
def origin(self):
""" asset price date """
return self._origin
def __init__(self, value=0., origin=None):
r"""
:param value: price value
:param origin: price date
>>> from businessdate import BusinessDate
>>> from dcf import Price
>>> p=Price(100, BusinessDate(20201212))
>>> p.value
100.0
>>> float(p)
100.0
>>> p
Price(100.000000; origin=BusinessDate(20201212))
"""
self._value = value
self._origin = origin
def __float__(self):
return float(self.value)
def __str__(self):
return '%s(%f; origin=%s)' % \
(self.__class__.__name__, self.value, repr(self.origin))
def __repr__(self):
return str(self)
[docs]class Curve(object):
"""Curve function object"""
INTERPOLATIONS = dict()
"""mapping (dict) of availiable interpolations
additional to |dcf.interpolation|"""
_INTERPOLATION = linear_scheme # default interpolation
@property
def kwargs(self):
""" returns constructor arguments as ordered dictionary """
kw = type(self.__class__.__name__ + 'Kwargs', (OrderedDict,), {})()
for name in signature(self.__class__).parameters:
attr = self(self.domain) if name == 'data' else None
attr = getattr(self, '_' + name, attr)
attr = getattr(attr, '__name__', attr)
if attr is not None:
kw[name] = attr
setattr(kw, name, attr)
return kw
@property
def domain(self):
"""coordinates and date of given (not interpolated) x-values"""
return self._domain
@property
def table(self):
r""" table of interpolated rates (pretty printable)
given by |rate_table()|.
"""
# print(tabulate(curve.table, headers='firstrow')) # for pretty print
return rate_table(self)
def __init__(self, domain=(), data=(), interpolation=None):
r"""
:param list(float) domain: source values $x_1 \dots x_n$
:param list(float) data: target values $y_1 \dots y_n$
:param function interpolation:
(optional, default is defined on class level)
Interpolation function $\gamma$
such that $\gamma(x_i)=y_i$ for $i=1 \dots n$.
If **interpolation** is a string,
the interpolation function is
taken from class member dictionary |Curve.INTERPOLATIONS|.
Interpolation functions $\gamma$ can be constructed piecewise
using via |interpolation_scheme|.
Curve function object
$$f:\mathbb{R} \rightarrow \mathbb{R}, x \mapsto f(x)=y$$
build from finite point vectors $x$ and $y$
using piecewise various interpolation functions.
>>> from dcf import Curve
>>> c = Curve([0, 1, 2], [1, 2, 3])
get the grid of x values
>>> c.domain
[0, 1, 2]
get the grid of y values
>>> c(c.domain)
(1.0, 2.0, 3.0)
get a interpolated curve value
>>> c(1.5)
2.5
update existing values
>>> c[2] = 4
>>> c(c.domain)
(1.0, 2.0, 4.0)
add new points
>>> c[3] = 5
>>> c(c.domain)
(1.0, 2.0, 4.0, 5.0)
"""
# cast/extract inputs from Curve if given as argument
if isinstance(domain, Curve):
data = domain
domain = data.domain
if isinstance(data, Curve):
interpolation = \
interpolation or data.kwargs.get('interpolation', None)
_data = data(domain) # assuming domain is a list of dates !
if isinstance(data, DateCurve):
domain = [data.day_count(d) for d in domain]
data = _data
# sort data by domain values
if not len(domain) == len(data):
raise ValueError('%s requires equal length input '
'for domain (%d) and data (%d) ' %
(self.__class__.__name__, len(domain), len(data)))
self._interpolation = interpolation
self._update(domain, data)
def _update(self, domain, data):
interpolation = self._interpolation
if interpolation in self.INTERPOLATIONS:
func = self.INTERPOLATIONS[interpolation]
elif interpolation is None:
func = self._INTERPOLATION
else:
func = vars(_interpolations).get(interpolation, interpolation)
if domain:
domain, data = map(list, zip(*sorted(zip(*(domain, data)))))
self._func = func(domain, data)
self._domain = domain
def __contains__(self, item):
return item in self.domain
def __iter__(self):
return self.domain
def __getitem__(self, item):
if item in self:
return self(item)
raise KeyError(item)
def __setitem__(self, key, value):
domain = list(self.domain)
data = list(self(domain))
if key in domain:
data[domain.index(key)] = value
else:
domain.append(key)
data.append(value)
self._update(domain, data)
def __call__(self, x):
if isinstance(x, (tuple, list)):
return tuple(self(xx) for xx in x)
return self._func(x)
def __add__(self, other):
x_list = sorted(set(self.domain + other.domain))
y_list = [self(x) + other(x) for x in x_list]
return self.__class__(x_list, y_list, self._interpolation)
def __sub__(self, other):
x_list = sorted(set(self.domain + other.domain))
y_list = [self(x) - other(x) for x in x_list]
return self.__class__(x_list, y_list, self._interpolation)
def __mul__(self, other):
x_list = sorted(set(self.domain + other.domain))
y_list = [self(x) * other(x) for x in x_list]
return self.__class__(x_list, y_list, self._interpolation)
def __truediv__(self, other):
return self.__div__(other)
def __div__(self, other):
x_list = sorted(set(self.domain + other.domain))
if any(not other(x) for x in x_list):
raise ZeroDivisionError("Division with %s requires on "
"zero values." % other.__class__.__name__)
y_list = [self(x) / other(x) for x in x_list]
return self.__class__(x_list, y_list, self._interpolation)
def __str__(self):
inner = tuple()
if self.domain:
s, e = self.domain[0], self.domain[-1]
inner = f'[{s!r} ... {e!r}]', f'[{self(s)!r} ... {self(e)!r}]'
kw = self.kwargs
kw.pop('data')
kw.pop('domain')
inner += tuple(f"{k!s}={v!r}" for k, v in kw.items())
s = self.__class__.__name__ + '(' + ', '.join(inner) + ')'
return s
def __repr__(self):
s = self.__class__.__name__ + '()'
if self.domain:
fill = ',\n' + ' ' * (len(s) - 1)
kw = self.kwargs
inner = str(kw.pop('domain')), str(kw.pop('data'))
inner += tuple(f"{k!s}={v!r}" for k, v in kw.items())
s = self.__class__.__name__ + '(' + fill.join(inner) + ')'
return s
[docs] def shifted(self, delta=0.0):
""" build curve object with shifted **domain** by **delta**
:param delta: shift size
:return: curve object with shifted **domain** by **delta**
"""
if delta:
x_list = [x + delta for x in self.domain]
else:
x_list = self.domain
# y_list = self(self.domain)
# return self.__class__(x_list, y_list, self.interpolation)
return self.__class__(x_list, self)
[docs]class DateCurve(Curve):
"""Curve function object with dates as domain (points)"""
DAY_COUNT = dict()
"""mapping (dict) of availiable day count functions
additional to |dcf.daycount|"""
_TIME_SHIFT = '1D'
"""default time shift"""
def __init__(self, domain=(), data=(), interpolation=None,
origin=None, day_count=None):
"""curve function object with dates as domain (points)
:param domain: squences of date points
:param data: squence of curve values
:param interpolation: interpolation function
(see |Curve|)
:param origin: inital origin of date points
(used to calculate year fractions of poins in domain)
:param day_count: day count function
to derive year fractions from time periods
>>> from dcf import DateCurve
**domain** given as date/time measured in year fraction (float)
>>> domain = 0.5, 1.0, 1.5, 2.0
>>> data = 1, 2, 3, 4
>>> c = DateCurve(domain, data)
>>> c.domain
(0.5, 1.0, 1.5, 2.0)
>>> c(0.75)
1.5
**domain** given as date/time measured in dates (date)
>>> from datetime import date
>>> domain = date(2022, 8, 12), date(2023, 2, 12), date(2023, 8, 12), date(2024, 2, 12)
>>> data = 1, 2, 3, 4
>>> c = DateCurve(domain, data)
>>> c.domain
(datetime.date(2022, 8, 12), datetime.date(2023, 2, 12), datetime.date(2023, 8, 12), datetime.date(2024, 2, 12))
>>> c(date(2022, 11, 12))
1.5
**domain** given as date/time measured in dates (BusinessDate)
>>> from businessdate import BusinessDate
>>> t = BusinessDate(20220212)
>>> domain = tuple(t + p for p in ('6m', '12m', '18m', '24m'))
>>> data = 1, 2, 3, 4
>>> c = DateCurve(domain, data)
>>> c.domain
(BusinessDate(20220812), BusinessDate(20230212), BusinessDate(20230812), BusinessDate(20240212))
>>> c(t + '9m')
1.5
""" # noqa 501
if isinstance(domain, DateCurve):
data = domain
domain = data.domain
elif isinstance(data, DateCurve):
interpolation = interpolation or data.kwargs.interpolation
origin = origin or data.kwargs.origin
day_count = day_count or data.kwargs.day_count
data = data(domain) # assuming data is a list of dates !
self._domain = domain
self._origin = origin
self._day_count = day_count
super().__init__(domain, data, interpolation)
self.fixings = dict()
@property
def domain(self):
r""" domain of curve $t_1 \dots t_n$ as list of dates
where curve values are given explicit """
return self._domain
@property
def origin(self):
""" date of origin (date zero)
as curve reference date for time calucations """
if self._origin is not None:
return self._origin
return self._domain[0] if self._domain else None
def _update(self, domain, data):
flt_domain = tuple(self.day_count(d) for d in domain)
super()._update(flt_domain, data)
self._domain = domain
def __call__(self, x):
if isinstance(x, (list, tuple)):
return tuple(self(xx) for xx in x)
if x in self.fixings:
return self.fixings[x]
return super(DateCurve, self).__call__(self.day_count(x))
def __add__(self, other):
new = super(DateCurve, self).__add__(
other.shifted(self.origin - other.origin))
self.__class__(new.domain, new(new.domain), new._interpolation,
self.origin, self._day_count)
return new
def __sub__(self, other):
new = super(DateCurve, self).__sub__(
other.shifted(self.origin - other.origin))
self.__class__(new.domain, new(new.domain), new._interpolation,
self.origin, self._day_count)
return new
def __mul__(self, other):
new = super(DateCurve, self).__mul__(
other.shifted(self.origin - other.origin))
self.__class__(new.domain, new(new.domain), new._interpolation,
self.origin, self._day_count)
return new
def __div__(self, other):
new = super(DateCurve, self).__div__(
other.shifted(self.origin - other.origin))
new.origin = self.origin
return new
[docs] def day_count(self, start, end=None):
""" day count function to calculate a year fraction of time period
:param start: first date of period
:param end: last date of period
:return: (float) year fraction
"""
if end is None:
return self.day_count(self.origin, start)
if self._day_count is None:
return _default_day_count(start, end)
if self._day_count in self.DAY_COUNT:
day_count = self.DAY_COUNT.get(self._day_count)
return day_count(start, end)
return self._day_count(start, end)
[docs] def to_curve(self):
"""deprecated method to cast to |Curve()| object"""
cls = self.__class__.__name__
msg = "\n%s().cast(cast_type, **kwargs) is deprecated.\n" \
"Please use for casting an object `curve` of type %s\n" \
" cast_type(curve, **kwargs)\n" \
"instead." % (cls, cls)
warn(msg)
return Curve(self)
[docs] def integrate(self, start, stop):
r""" integrates curve and returns results as annualized rates
:param start: lower integration boundary
:param stop: upper integration boundary
:return: (float) integral value$
If $\gamma$ is this the curve. **integrate** returns
$$\int_a^b \gamma(t)\ dt$$
where $a$ is **start** and $b$ is **stop**.
if available **integrate** uses
`scipy.integrate.quad <https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.quad.html>`_
""" # noqa E501
# try use result, error = scipy.integrate(self, start, stop)
try:
from scipy.integrate import quad
# raise ImportError()
s = self.day_count(start)
e = self.day_count(stop)
f = super(DateCurve, self).__call__
value, *_ = quad(f, s, e)
except ImportError:
value = 0.0
step = self._TIME_SHIFT
current = start
while current + step < stop:
value += self(current) * \
self.day_count(current, current + step)
current += step
value += self(current) * self.day_count(current, stop)
result = value / self.day_count(start, stop)
return result
[docs] def derivative(self, start):
r""" calculates numericaly the first derivative
:param start: curve point to calcuate derivative at this point
:return: (float) first derivative
If $\gamma$ is this the curve **derivative** returns
$$\frac{d}{dt}\gamma(t)$$
where $t$ is **start** but derived numericaly.
if available **derivative** uses
`scipy.misc.derivative <https://docs.scipy.org/doc/scipy/reference/generated/scipy.misc.derivative.html>`_
""" # noqa E501
try:
from scipy.misc import derivative
s = self.day_count(start)
dx = self.day_count(start, start + self._TIME_SHIFT)
f = super(DateCurve, self).__call__
result = derivative(f, s, dx)
except ImportError:
stop = start + self._TIME_SHIFT
value = self(stop) - self(start)
result = value / self.day_count(start, stop)
return result
[docs]class ForwardCurve(DateCurve):
"""Forward price curve with yield extrapolation """
_INTERPOLATION = log_linear_scheme
def __init__(self, domain=(), data=(), interpolation=None, origin=None,
day_count=None, yield_curve=0.0):
r""" curve of future asset prices i.e. asset forward prices
:param domain: dates of given asset prices $t_1 \dots t_n$
:param data: actual asset prices $p_{t_1} \dots p_{t_n}$
:param interpolation: interpolation method
for interpolating given asset prices
:param origin: origin of curve
:param day_count: day count method resp. function $\tau$
to calculate year fractions
:param yield_curve: yield $y$ to extrapolate by continous compounding
$$p_T = p_{t_n} \cdot \exp(y \cdot \tau(t_n, T))$$
or yield curve function $\gamma_c$ to extrapolate by
$$p_T = p_{t_n} \cdot \gamma_c(T)/\gamma_c(t_n)$$
or interest rate curve $c$ extrapolate by
$$p_T = p_{t_n} \cdot df_{c}^{-1}(t_n, T)$$
"""
if not data:
if isinstance(domain, float):
# build lists from single spot price value
data = [domain]
domain = [origin]
elif isinstance(domain, Price):
# build lists from single spot price
origin = domain.origin
data = [domain.value]
domain = [domain.origin]
super().__init__(domain, data, interpolation, origin, day_count)
if isinstance(yield_curve, float) and self.origin is not None:
yc = (lambda x: exp(-self.day_count(x) * yield_curve))
else:
yc = yield_curve
self.yield_curve = yc
""" yield curve for extrapolation using discount factors """
def __call__(self, x):
if isinstance(x, (list, tuple)):
return [self(xx) for xx in x]
else:
return self.get_forward_price(x)
[docs] def get_forward_price(self, value_date):
""" asset forward price at **value_date**
derived by interpolation on given forward prices
and extrapolation by given discount_factor resp. yield curve
:param value_date: future date of asset price
:return: asset forward price at **value_date**
"""
last_date = self.domain[-1]
if value_date <= last_date:
return super().__call__(value_date)
last_price = super().__call__(last_date)
if self.yield_curve is None:
df = 1.0
elif hasattr(self.yield_curve, 'get_discount_factor'):
df = self.yield_curve.get_discount_factor(last_date, value_date)
else:
df = self.yield_curve(value_date) / self.yield_curve(last_date)
return last_price / df
[docs]class RateCurve(DateCurve):
"""Interest rate curve and credit curve"""
_FORWARD_TENOR = '3M'
@staticmethod
def _get_storage_value(curve, x):
raise NotImplementedError()
[docs] def cast(self, cast_type, **kwargs):
"""deprecated method to cast a curve"""
cls = self.__class__.__name__
msg = "\n%s().cast(cast_type, **kwargs) is deprecated.\n" \
"Please use for casting an object `curve` of type %s\n" \
" cast_type(curve, **kwargs)\n" \
"instead." % (cls, cls)
warn(msg)
if 'domain' in kwargs:
kwargs['data'] = self
else:
kwargs['domain'] = self
return cast_type(**kwargs)
@property
def forward_tenor(self):
"""tenor (time period) associated to the rates of the curve"""
return self._FORWARD_TENOR \
if self._forward_tenor is None else self._forward_tenor
@property
def spread(self):
"""spread curve to add spreads to curve"""
return self._spread
@spread.setter
def spread(self, curve):
"""spread curve to add spreads to curve"""
if curve is not None and self._spread is not None:
raise TypeError("direct re-setting of spread curve not allowed."
"first re-set spread curve to None.")
self._spread = curve
def __init__(self, domain=(), data=(), interpolation=None, origin=None,
day_count=None, forward_tenor=None):
r"""
:param domain: either curve points $t_1 \dots t_n$
or a curve object $C$
:param data: either curve values $y_1 \dots y_n$
or a curve object $C$
:param interpolation: (optional) interpolation scheme
:param origin: (optional) curve points origin $t_0$
:param day_count: (optional) day count convention function $\tau(s, t)$
:param forward_tenor: (optional) forward rate tenor period $\tau^*$
If **data** is a |RateCurve| instance $C$,
it is casted to this new class type
with domain grid given by **domain**.
If **domain** is a |RateCurve| instance $C$,
it is casted to this new class type
with domain grid given **domain** property of $C$.
Further arguments
**interpolation**, **origin**, **day_count**, **forward_tenor**
will replace the ones given by $C$ if not given explictly.
"""
other = None
# either domain or data can be RateCurve too.
# if given extract arguments for casting
if isinstance(domain, RateCurve):
if data:
raise TypeError("If first argument is %s, "
"data argument must not be given." %
domain.__class__.__name__)
other = domain
domain = other.domain
if isinstance(data, RateCurve):
other = data
domain = other.domain if domain is None else domain
if other:
# get data as self._get_storage_value
data = [self._get_storage_value(other, x) for x in domain]
# use other properties if not give explicitly
# interpolation should default to class defaults
# interpolation = other.interpolation
# interpolation = \
# interpolation or other.kwargs.get('interpolation', None)
origin = origin or other.kwargs.origin
day_count = day_count or other.kwargs.day_count
super(RateCurve, self).__init__(
domain, data, interpolation, origin, day_count)
self._forward_tenor = forward_tenor
self._spread = None
def __call__(self, x):
if isinstance(x, (list, tuple)):
return tuple(self(xx) for xx in x)
s = self._spread(x) if self._spread else 0.0
return super().__call__(x) + s
def __add__(self, other):
new = super(RateCurve, self).__add__(self.__class__(other))
return self.__class__(new, forward_tenor=self._forward_tenor)
def __sub__(self, other):
new = super(RateCurve, self).__sub__(self.__class__(other))
return self.__class__(new, forward_tenor=self._forward_tenor)
def __mul__(self, other):
new = super(RateCurve, self).__mul__(self.__class__(other))
return self.__class__(new, forward_tenor=self._forward_tenor)
def __div__(self, other):
new = super(RateCurve, self).__div__(self.__class__(other))
return self.__class__(new, forward_tenor=self._forward_tenor)
def _get_compounding_factor(self, start, stop):
# aka discount factor
if start == stop:
return 1.
ir = self._get_compounding_rate(start, stop)
t = self.day_count(start, stop)
return continuous_compounding(ir, t)
def _get_compounding_rate(self, start, stop):
# aka zero rate
if start == stop:
return self._get_compounding_rate(
start, start + self._TIME_SHIFT)
df = self._get_compounding_factor(start, stop)
t = self.day_count(start, stop)
return continuous_rate(df, t)