Source code for PyDynamic.uncertainty.interpolation

# -*- coding: utf-8 -*-
"""
The :mod:`PyDynamic.uncertainty.interpolation` module implements methods for
the propagation of uncertainties in the application of standard interpolation methods
as provided by :class:`scipy.interpolate.interp1d`.

This module contains the following function:

* :func:`interp1d_unc`: Interpolate arbitrary time series considering the associated
  uncertainties
"""

from typing import Optional, Tuple

import numpy as np
from scipy.interpolate import interp1d

__all__ = ["interp1d_unc"]


[docs]def interp1d_unc( t_new: np.ndarray, t: np.ndarray, y: np.ndarray, uy: np.ndarray, kind: Optional[str] = "linear", ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Interpolate arbitrary time series considering the associated uncertainties The interpolation timestamps must lie within the range of the original timestamps. In addition, at least two of each of the original timestamps, measured values and associated measurement uncertainties are required and an equal number of each of these three. Parameters ---------- t_new: (M,) array_like The timestamps at which to evaluate the interpolated values. t: (N,) array_like timestamps in ascending order y: (N,) array_like corresponding measurement values uy: (N,) array_like corresponding measurement values' uncertainties kind: str, optional Specifies the kind of interpolation for the measurement values as a string ('previous', 'next', 'nearest' or 'linear'). Returns ------- t_new : (M,) array_like interpolation timestamps y_new : (M,) array_like interpolated measurement values uy_new : (M,) array_like interpolated measurement values' uncertainties References ---------- * White [White2017]_ """ # Check for ascending order of timestamps. if not np.all(t[1:] >= t[:-1]): raise ValueError("Array of timestamps needs to be in ascending order.") # Check for proper dimensions of inputs. if ((t.min() > t_new) | (t.max() < t_new)).any(): raise ValueError( "Array of interpolation timestamps must be in the range of original " "timestamps." ) if not len(t) == len(y): raise ValueError( "Array of measurement values must be same length as array of timestamps." ) if not len(y) == len(uy): raise ValueError( "Array of associated measurement values' uncertainties must be same length " "as array of measurement values." ) # Interpolate measurement values in the desired fashion. interp_y = interp1d(t, y, kind=kind) y_new = interp_y(t_new) if kind in ("previous", "next", "nearest"): # Look up uncertainties in cases where it is applicable. interp_uy = interp1d(t, uy, kind=kind) uy_new = interp_uy(t_new) elif kind == "linear": # Determine the relevant interpolation intervals for all interpolation # timestamps. We determine the intervals for each timestamp in t_new by # finding the biggest of all timestamps in t smaller or equal than the one in # t_new. From the array of indices of our left interval bounds we get the # right bounds by just incrementing the indices, which of course # results in index errors at the end of our array, in case the biggest # timestamps in t_new are (quasi) equal to the biggest (i.e. last) timestamp # in t. This gets corrected just after the iteration by simply manually # choosing one interval "further left", which will just at the node result # in the same interpolation (being the actual value of y). indices = np.empty_like(t_new, dtype=int) it_t_new = np.nditer(t_new, flags=["f_index"]) while not it_t_new.finished: # Find indices of biggest of all timestamps smaller or equal # than current interpolation timestamp. Assume that timestamps are in # ascending order. indices[it_t_new.index] = np.where(t <= it_t_new[0])[0][-1] it_t_new.iternext() # Correct all degenerated intervals. This happens when the last timestamps of # t and t_new are equal. indices[np.where(indices == len(t) - 1)] -= 1 t_prev = t[indices] t_next = t[indices + 1] # Look up corresponding input uncertainties. uy_prev_sqr = uy[indices] ** 2 uy_next_sqr = uy[indices + 1] ** 2 # Compute uncertainties for interpolated measurement values. uy_new = np.sqrt( (t_new - t_next) ** 2 * uy_prev_sqr + (t_new - t_prev) ** 2 * uy_next_sqr ) / (t_next - t_prev) else: raise NotImplementedError return t_new, y_new, uy_new