Source code for muarch.calibrate.truncate

from copy import deepcopy
from typing import List, Optional, Tuple, Union

import numpy as np


def truncate_outliers(data: np.ndarray, *, bounds: Optional[List[Tuple[float, float]]] = None, sd=0,
                      replacement='mean', inplace=False):  # pragma: no cover
    """
    Truncates outliers by replacing them with the mean, median or a specified value.

    A value is an outlier when it lies outside the (lower, upper) bound of its asset class. The bounds are
    either given explicitly or derived as a number of standard deviations around the mean of each asset class.

    Parameters
    ----------
    data: ndarray
        The tensor (data cube) where the axis represents time, trials and number of asset classes respectively

    bounds: List of (float, float) tuples, optional
        A list containing the lower and upper bound for each asset class. If specified, this takes precedence
        over the :code:`sd` parameter. Each pair is sorted, so the order within a pair does not matter. If
        :code:`sd` is set to 0 and bounds are not specified, no changes will be made

    sd: float
        The number of standard deviations to consider a point an outlier. If :code:`sd` is set to 0 and bounds
        are not specified, no changes will be made

    replacement: {float, 'mean', 'median'}
        The value to replace outliers with. Valid values are 'mean', 'median' (case-insensitive) or a number.
        'mean' uses the midpoint of the asset's bounds, which is the asset mean when the bounds are derived
        from :code:`sd`. 'median' uses the median of the asset's data.

    inplace: bool
        If True, calibration will modify the original data. Otherwise, a deep copy of the original data will
        be made before calibration. Deep copy can be time consuming if data is big.

    Returns
    -------
    ndarray
        A data cube with the outliers replaced. When there is nothing to do (:code:`sd` is 0 and no bounds
        given), the input object itself is returned without being copied.

    Raises
    ------
    AssertionError
        If data is not 3 dimensional, the replacement is not supported, :code:`sd` is negative or the number
        of bounds does not match the number of asset classes.
    """
    if bounds is None and sd == 0:
        # nothing to truncate, hand the input back untouched
        return data

    # validate the shape before indexing into it so that a bad input gets the descriptive error
    _validate_data_cube(data)
    num_assets = data.shape[2]

    # strings are matched case-insensitively; plain numbers (but not bools) are normalised to float.
    # Anything else is passed through untouched so that the validator rejects it.
    if isinstance(replacement, str):
        replacement = replacement.lower()
    elif isinstance(replacement, (int, float, np.integer, np.floating)) and not isinstance(replacement, bool):
        replacement = float(replacement)
    _validate_replacement(replacement)

    if bounds is None:
        bounds = _form_bounds(data, sd)
    bounds = _sort_bounds(bounds)
    _validate_bounds(bounds, num_assets)

    if not inplace:
        data = deepcopy(data)

    replacement_values = _get_replacement_values(replacement, bounds=bounds, data=data)
    for i, ((lb, ub), r) in enumerate(zip(bounds, replacement_values)):
        returns = data[..., i]  # a view on data, so the assignment below writes through to the cube
        returns[(returns > ub) | (returns < lb)] = r

    return data
def _form_bounds(data: np.ndarray, sd: float):
    """
    Derives the (lower, upper) bound of every asset class as :code:`mean -/+ sd * std` of that asset's data.

    Raises an AssertionError if :code:`sd` is negative.
    """
    assert sd >= 0, "Standard deviations to determine outliers must be >= 0"

    bounds = []
    for i in range(data.shape[2]):
        returns = data[..., i]
        mean, std = returns.mean(), returns.std()
        bounds.append((mean - sd * std, mean + sd * std))

    return bounds


def _sort_bounds(bounds: List[Tuple[float, float]]):
    """Orders every bound pair as (lower, upper) regardless of how it was supplied."""
    return [(min(b), max(b)) for b in bounds]


def _get_replacement_values(replacement: Union[str, float], *, bounds=None, data: np.ndarray) -> List[float]:
    """
    Returns one replacement value per asset class.

    'mean' yields the midpoint of each bound pair when bounds are given and the mean of the asset's data
    otherwise. 'median' yields the median of the asset's data. Any other value is converted with
    :code:`float` and repeated for every asset class.
    """
    num_assets = data.shape[2]

    # match the keywords case-insensitively, in line with _validate_replacement
    if isinstance(replacement, str):
        replacement = replacement.lower()

    if replacement == "mean":
        if bounds is not None:
            return [np.mean(b) for b in bounds]
        return [data[..., n].mean() for n in range(num_assets)]

    if replacement == "median":
        return [np.median(data[..., n]) for n in range(num_assets)]

    return [float(replacement)] * num_assets


def _validate_bounds(bounds: List[Tuple[float, float]], num_assets: int):
    """Asserts that there is exactly one bound pair per asset class."""
    assert len(bounds) == num_assets, "Number of bound ranges do not match number of assets"


def _validate_data_cube(data: np.ndarray):
    """Asserts that the data has exactly 3 dimensions."""
    assert data.ndim == 3, "data must be a 3D tensor"


def _validate_replacement(replacement: Union[str, float]):
    """
    Asserts that the replacement is 'mean', 'median' (case-insensitive) or a real number.

    Integers and numpy scalars are accepted alongside floats as they convert cleanly with :code:`float`.
    Booleans are rejected even though :code:`bool` subclasses :code:`int`.
    """
    message = "replacement can only be 'mean', 'median' or a float value"

    if isinstance(replacement, str):
        assert replacement.lower() in ('mean', 'median'), message
    else:
        is_number = isinstance(replacement, (int, float, np.integer, np.floating))
        assert is_number and not isinstance(replacement, bool), message