Source code for PyDynamic.uncertainty.propagate_filter

"""
This module contains functions for the propagation of uncertainties through
the application of a digital filter using the GUM approach.

This modules contains the following functions:

* :func:`FIRuncFilter`: Uncertainty propagation for signal y and uncertain FIR
  filter theta
* :func:`IIRuncFilter`: Uncertainty propagation for the signal x and the uncertain
  IIR filter (b,a)
* :func:`IIR_get_initial_state`: Get a valid internal state for :func:`IIRuncFilter`
  that assumes a stationary signal before the first value.

.. note:: The Elster-Link paper for FIR filters assumes that the autocovariance
          is known and that noise is stationary!

"""
import warnings

import numpy as np
from scipy.linalg import solve, solve_discrete_lyapunov, toeplitz
from scipy.signal import convolve, dimpulse, lfilter, lfilter_zi

from ..misc.tools import trimOrPad

__all__ = ["FIRuncFilter", "IIRuncFilter", "IIR_get_initial_state"]


def _fir_filter(x, theta, Ux=None, Utheta=None, initial_conditions="constant"):
    """Uncertainty propagation for signal x with covariance Ux
       and uncertain FIR filter theta with covariance Utheta.

       If either Ux or Utheta are omitted (None), then corresponding terms are not
       calculated to reduce computation time.

    Parameters
    ----------
    x : np.ndarray
        filter input signal
    theta : np.ndarray
        FIR filter coefficients
    Ux : np.ndarray, optional
        covariance matrix associated with x
        if the signal is fully certain, use `Ux = None` (default) to make use of more efficient calculations.
    Utheta : np.ndarray, optional
        covariance matrix associated with theta. If the filter is fully certain,
        do not provide Utheta to make use of more efficient calculations.
        see also the comparison given in <examples\Digital filtering\FIRuncFilter_runtime_comparison.py>
    initial_conditions : str, optional
        - "constant": assume signal + uncertainty are constant before t=0 (default)
        - "zero": assume signal + uncertainty are zero before t=0


    Returns
    -------
    y : np.ndarray
        FIR filter output signal
    Uy : np.ndarray
        covariance matrix of filter output y


    References
    ----------
    * Elster and Link 2008 [Elster2008]_

    .. seealso:: :mod:`PyDynamic.model_estimation.fit_filter`

    """

    # Note to future developers:
    # The functions _fir_filter and _fir_filter_diag share
    # the same logic. If adjustments become necessary (e.g.
    # due to bug fixing) please also consider adjusting it
    # in the other function as well.

    Ntheta = len(theta)  # FIR filter size

    if initial_conditions == "constant":
        x0 = x[0]

    # Note: currently only used in testing for comparison against Monte Carlo method
    elif initial_conditions == "zero":
        x0 = 0.0

    else:
        raise ValueError(
            f"_fit_filter: You provided 'initial_conditions' = '{initial_conditions}'."
            f"However, only 'zero' or 'constant' are currently supported."
        )

    # propagate filter
    y, _ = lfilter(theta, 1.0, x, zi=x0 * lfilter_zi(theta, 1.0))

    # propagate uncertainty
    Uy = np.zeros((len(x), len(x)))

    ## only calculate subterms, that are non-zero (compare eq. 34 of paper)
    if Ux is not None:
        ## extend covariance Ntheta steps into the past
        if initial_conditions == "constant":
            Ux_extended = _stationary_prepend_covariance(Ux, Ntheta - 1)

        elif initial_conditions == "zero":
            # extend covariance Ntheta steps into the past
            Ux_extended = np.pad(
                Ux,
                ((Ntheta - 1, 0), (Ntheta - 1, 0)),
                "constant",
                constant_values=0,
            )

        # calc subterm theta^T * Ux * theta
        Uy += _clip_main_diagonal_to_zero_from_below(
            convolve(np.outer(theta, theta), Ux_extended, mode="valid")
        )

    if Utheta is not None:
        ## extend signal Ntheta steps into the past
        x_extended = np.r_[np.full((Ntheta - 1), x0), x]

        # calc subterm x^T * Utheta * x
        Uy += _clip_main_diagonal_to_zero_from_below(
            convolve(np.outer(x_extended, x_extended), Utheta, mode="valid")
        )

    if (Ux is not None) and (Utheta is not None):
        # calc subterm Tr(Ux * Utheta)
        Uy += _clip_main_diagonal_to_zero_from_below(
            convolve(Ux_extended, Utheta.T, mode="valid")
        )

    return y, Uy


def _clip_main_diagonal_to_zero_from_below(matrix: np.ndarray) -> np.ndarray:
    np.fill_diagonal(matrix, matrix.diagonal().clip(min=0))
    return matrix


def _fir_filter_diag(
    x, theta, Ux_diag=None, Utheta_diag=None, initial_conditions="constant"
):
    """Uncertainty propagation for signal x with covariance diagonal Ux_diag
       and uncertain FIR filter theta with covariance diagonal Utheta_diag.

       If either Ux_diag or Utheta_diag are omitted (None), then corresponding terms are not
       calculated to reduce computation time.

    Parameters
    ----------
    x : np.ndarray
        filter input signal
    theta : np.ndarray
        FIR filter coefficients
    Ux_diag : np.ndarray, optional
        diagonal of covariance matrix ([ux11^2, ux22^2, ..., uxnn^2])
        if the signal is fully certain, use `Ux_diag = None` (default) to make use of more efficient calculations.
    Utheta_diag : np.ndarray, optional
        diagonal of covariance matrix ([ut11^2, ut22^2, ..., utnn^2])
        if the filter is fully certain, use `Utheta_diag = None` (default) to make use of more efficient calculations.
        see also the comparison given in <examples\Digital filtering\FIRuncFilter_runtime_comparison.py>
    initial_conditions : str, optional
        constant: assume signal + uncertainty are constant before t=0 (default)
        zero: assume signal + uncertainty are zero before t=0


    Returns
    -------
    y : np.ndarray
        FIR filter output signal
    Uy_diag : np.ndarray
        diagonal of covariance matrix of filter output y


    References
    ----------
    * Elster and Link 2008 [Elster2008]_

    .. seealso:: :mod:`PyDynamic.model_estimation.fit_filter`

    """

    # Note to future developers:
    # The functions _fir_filter and _fir_filter_diag share
    # the same logic. If adjustments become necessary (e.g.
    # due to bug fixing) please also consider adjusting it
    # in the other function as well.

    Ntheta = len(theta)  # FIR filter size

    if initial_conditions == "constant":
        x0 = x[0]

    # Note: currently only used in testing for comparison against Monte Carlo method
    elif initial_conditions == "zero":
        x0 = 0.0

    else:
        raise ValueError(
            f"_fit_filter: You provided 'initial_conditions' = '{initial_conditions}'."
            f"However, only 'zero' or 'constant' are currently supported."
        )

    # propagate filter
    y, _ = lfilter(theta, 1.0, x, zi=x0 * lfilter_zi(theta, 1.0))

    # propagate uncertainty
    Uy_diag = np.zeros(len(x))

    ## only calculate subterms, that are non-zero (compare eq. 34 of paper)
    if Ux_diag is not None:
        ## extend covariance Ntheta steps into the past
        if initial_conditions == "constant":
            Ux0 = Ux_diag[0]
        elif initial_conditions == "zero":
            Ux0 = 0.0
        Ux_diag_extended = np.r_[np.full((Ntheta - 1), Ux0), Ux_diag]

        # calc subterm theta^T * Ux * theta
        Uy_diag += convolve(np.square(theta), Ux_diag_extended, mode="valid").clip(
            min=0
        )

    if Utheta_diag is not None:
        ## extend signal Ntheta steps into the past
        x_extended = np.r_[np.full((Ntheta - 1), x0), x]

        # calc subterm x^T * Utheta * x
        Uy_diag += convolve(np.square(x_extended), Utheta_diag, mode="valid").clip(
            min=0
        )

    if (Ux_diag is not None) and (Utheta_diag is not None):
        # calc subterm Tr(Ux * Utheta)
        Uy_diag += convolve(Ux_diag_extended, Utheta_diag, mode="valid").clip(min=0)

    return y, Uy_diag


def _stationary_prepend_covariance(U, n):
    """Prepend covariance matrix U by n steps into the past"""

    c = np.r_[U[:, 0], np.zeros(n)]
    r = np.r_[U[0, :], np.zeros(n)]

    U_adjusted = toeplitz(c, r)
    U_adjusted[n:, n:] = U

    return U_adjusted


[docs] def FIRuncFilter( y, sigma_noise, theta, Utheta=None, shift=0, blow=None, kind="corr", return_full_covariance=False, ): """Uncertainty propagation for signal y and uncertain FIR filter theta A preceding FIR low-pass filter with coefficients `blow` can be provided optionally. This method keeps the signature of `PyDynamic.uncertainty.FIRuncFilter`, but internally works differently and can return a full covariance matrix. Also, sigma_noise can be a full covariance matrix. Parameters ---------- y : np.ndarray filter input signal sigma_noise : float or np.ndarray - float: standard deviation of white noise in y - 1D-array: interpretation depends on kind - 2D-array: full covariance of input theta : np.ndarray FIR filter coefficients Utheta : np.ndarray, optional - 1D-array: coefficient-wise standard uncertainties of filter - 2D-array: covariance matrix associated with theta if the filter is fully certain, use `Utheta = None` (default) to make use of more efficient calculations. see also the comparison given in <examples\Digital filtering\FIRuncFilter_runtime_comparison.py> shift : int, optional time delay of filter output signal (in samples) (defaults to 0) blow : np.ndarray, optional optional FIR low-pass filter kind : string, optional only meaningful in combination with sigma_noise a 1D numpy array - "diag": point-wise standard uncertainties of non-stationary white noise - "corr": single sided autocovariance of stationary colored noise (default) return_full_covariance : bool, optional whether or not to return a full covariance of the output, defaults to False Returns ------- x : np.ndarray FIR filter output signal Ux : np.ndarray - return_full_covariance == False : point-wise standard uncertainties associated with x (default) - return_full_covariance == True : covariance matrix containing uncertainties associated with x References ---------- * Elster and Link 2008 [Elster2008]_ .. seealso:: :mod:`PyDynamic.model_estimation.fit_filter` """ # Check for special cases, that we can compute much faster: ## These special cases come from the default behavior of the `old` FIRuncFilter ## implementation, which always returned only the square-root of the main diagonal ## of the covariance matrix. ## The special case is activated, if the user does not want a full covariance of ## the result (return_full_covariance == False). Furthermore, sigma_noise and Utheta ## must be representable by pure diagonal covariance matricies (i.e. they are of type ## None, float or 1D-array). By the same reasoning, we need to exclude cases ## of preceding low-pass filtering, as the covariance of the low-pass-filtered ## signal would no longer be purely diagonal. ## Computation is then based on eq. 33 [Elster2008]_ - which is substantially ## faster (typically by orders of magnitude) when compared to the full covariance ## calculation. ## Check this example: <examples\digital_filtering\FIRuncFilter_runtime_comparison.py> # note to user if not return_full_covariance: print( "FIRuncFilter: Output uncertainty will be given as 1D-array of point-wise " "standard uncertainties. Although this requires significantly lesser computations, " "it ignores correlation information. Every FIR-filtered signal will have " "off-diagonal entries in its covariance matrix (assuming the filter is longer " "than 1). To get the full output covariance matrix, use 'return_full_covariance=True'." ) if ( (not return_full_covariance) # no need for full covariance and ( # cases of sigma_noise, that support the faster computation isinstance(sigma_noise, float) or (isinstance(sigma_noise, np.ndarray) and len(sigma_noise.shape) == 1) or (sigma_noise is None) ) and ( # cases of Utheta, that support the faster computation isinstance(Utheta, float) or (isinstance(Utheta, np.ndarray) and len(Utheta.shape) == 1) or (Utheta is None) ) # the low-pass-filtered signal wouldn't have pure diagonal covariance and (blow is None) # if sigma_noise is 1D, it must represent the diagonal of a covariance matrix and (kind == "diag" or not isinstance(sigma_noise, np.ndarray)) ): if isinstance(sigma_noise, float): Uy_diag = np.full_like(y, sigma_noise**2) elif isinstance(sigma_noise, np.ndarray): Uy_diag = np.square(sigma_noise) else: Uy_diag = sigma_noise if isinstance(Utheta, float): Utheta_diag = np.full_like(theta, Utheta) elif isinstance(Utheta, np.ndarray): Utheta_diag = np.square(Utheta) else: Utheta_diag = Utheta x, Ux_diag = _fir_filter_diag( y, theta, Uy_diag, Utheta_diag, initial_conditions="constant" ) Ux_diag = np.sqrt(np.abs(Ux_diag)) # shift result if shift != 0: x = np.roll(x, -int(shift)) Ux_diag = np.roll(Ux_diag, -int(shift)) return x, Ux_diag # otherwise, the method computes full covariance information else: Ntheta = len(theta) # FIR filter size # check which case of sigma_noise is necessary if isinstance(sigma_noise, float): Uy = np.diag(np.full(len(y), sigma_noise**2)) elif isinstance(sigma_noise, np.ndarray): if len(sigma_noise.shape) == 1: if kind == "diag": Uy = np.diag(sigma_noise**2) elif kind == "corr": Uy = toeplitz(trimOrPad(sigma_noise, len(y))) else: raise ValueError( f"Unknown kind `{kind}`. Don't now how to interpret the array " f"sigma_noise." ) elif len(sigma_noise.shape) == 2: Uy = sigma_noise else: raise ValueError( "Unsupported value of sigma_noise. Please check the documentation." ) # filter operation(s) if isinstance(blow, np.ndarray): # apply (fully certain) lowpass-filter xlow, Ulow = _fir_filter(y, blow, Uy, None, initial_conditions="constant") # apply filter to lowpass-filtered signal x, Ux = _fir_filter( xlow, theta, Ulow, Utheta, initial_conditions="constant" ) else: # apply filter to input signal x, Ux = _fir_filter(y, theta, Uy, Utheta, initial_conditions="constant") # shift result if shift != 0: x = np.roll(x, -int(shift)) Ux = np.roll(Ux, (-int(shift), -int(shift)), axis=(0, 1)) if return_full_covariance: return x, Ux else: return x, np.sqrt(np.abs(np.diag(Ux)))
[docs] def IIRuncFilter(x, Ux, b, a, Uab=None, state=None, kind="corr"): """ Uncertainty propagation for the signal x and the uncertain IIR filter (b,a) Parameters ---------- x : np.ndarray filter input signal Ux : float or np.ndarray float: standard deviation of white noise in x (requires `kind="diag"`) 1D-array: interpretation depends on kind b : np.ndarray filter numerator coefficients a : np.ndarray filter denominator coefficients Uab : np.ndarray, optional (default: None) covariance matrix for (a[1:],b) state : dict, optional (default: None) An internal state (z, dz, P, cache) to start from - e.g. from a previous run of IIRuncFilter. * If not given, (z, dz, P) are calculated such that the signal was constant before the given range * If given, the input parameters (b, a, Uab) are ignored to avoid repetitive rebuild of the internal system description (instead, the cache is used). However a valid new state (i.e. with new b, a, Uab) can always be generated by using :func:`IIR_get_initial_state`. kind : string, optional (default: "corr") defines the interpretation of Ux, if Ux is a 1D-array "diag": point-wise standard uncertainties of non-stationary white noise "corr": single sided autocovariance of stationary (colored/correlated) noise (default) Returns ------- y : np.ndarray filter output signal Uy : np.ndarray uncertainty associated with y state : dict dictionary of updated internal state Note ---- In case of `a == [1.0]` (FIR filter), the results of :func:`IIRuncFilter` and :func:`FIRuncFilter` might differ! This is because IIRuncFilter propagates uncertainty according to the (first-order Taylor series of the) GUM, whereas FIRuncFilter takes full variance information into account (which leads to an additional term). This is documented in the description of formula (33) of [Elster2008]_ . The difference can be visualized by running `PyDynamic/examples/digital_filtering/validate_FIR_IIR_MC.py` References ---------- * Link and Elster [Link2009]_ """ # check user input if kind not in ("diag", "corr"): raise ValueError( "`kind` is expected to be either 'diag' or 'corr' but '{KIND}' was given.".format( KIND=kind ) ) # make Ux an array if not isinstance(Ux, np.ndarray): Ux = np.full(x.shape, Ux) # translate iid noise to vector if kind != "diag": kind = "diag" warnings.warn( f"Ux of type float and `kind='{kind}'` was given. To ensure the " f"behavior described in the docstring (float -> standard deviation of " f"white noise in x), `kind='diag'` is set. \n(" "To suppress this warning, explicitly set `kind='diag'`", category=UserWarning, ) # system, corr_unc and processed_input are cached as well to reduce computational load if state is None: # calculate initial state if kind == "diag": state = IIR_get_initial_state(b, a, Uab=Uab, x0=x[0], U0=Ux[0]) else: # "corr" state = IIR_get_initial_state( b, a, Uab=Uab, x0=x[0], U0=np.sqrt(Ux[0]), Ux=Ux ) z = state["z"] dz = state["dz"] P = state["P"] A, bs, cT, b0 = state["cache"]["system"] corr_unc = state["cache"]["corr_unc"] b, a, Uab = state["cache"]["processed_input"] p = len(a) - 1 # phi: dy/dtheta phi = np.empty((2 * p + 1, 1)) # output y, output uncertainty Uy y = np.zeros_like(x) Uy = np.zeros_like(x) # implementation of the state-space formulas from the paper for n in range(len(y)): # calculate output according to formulas (7) y[n] = cT @ z + b0 * x[n] # (7) # if Uab is not given, use faster implementation if isinstance(Uab, np.ndarray): # calculate phi according to formulas (13) and (15) from paper phi[:p] = np.transpose( cT @ dz - np.transpose(b0 * z[::-1]) ) # derivative w.r.t. a_1,...,a_p phi[p] = -a[1:][::-1] @ z + x[n] # derivative w.r.t. b_0 phi[p + 1 :] = z[::-1] # derivative w.r.t. b_1,...,b_p # output uncertainty according to formulas (12) and (19) if kind == "diag": Uy[n] = ( phi.T @ Uab @ phi + cT @ P @ cT.T + np.square(b0 * Ux[n]) ) # (12) else: # "corr" Uy[n] = phi.T @ Uab @ phi + corr_unc # (19) else: # Uab is None # output uncertainty according to formulas (12) and (19) if kind == "diag": Uy[n] = cT @ P @ cT.T + np.square(b0 * Ux[n]) # (12) else: # "corr" Uy[n] = corr_unc # (19) # timestep update preparations if kind == "diag": u_square = np.square(Ux[n]) # as in formula (18) else: # "corr" u_square = Ux[0] # adopted for kind == "corr" # | DON'T | # because dA is sparse, this is not efficient: # | USE | # dA = _get_derivative_A(p) # | THIS | # dA_z = np.hstack(dA @ z) # this is efficient, because no tensor-multiplication is involved: dA_z = np.vstack((np.zeros((p - 1, p)), -z[::-1].T)) # timestep update P = A @ P @ A.T + u_square * np.outer(bs, bs) # state uncertainty, formula (18) dz = A @ dz + dA_z # state derivative, formula (17) z = A @ z + bs * x[n] # state, formula (6) Uy = np.sqrt(np.abs(Uy)) # calculate point-wise standard uncertainties # return result and internal state state.update({"z": z, "dz": dz, "P": P}) return y, Uy, state
def _tf2ss(b, a): """ Variant of :func:`scipy.signal.tf2ss` that fits the definitions of [Link2009]_ """ p = len(a) - 1 A = np.vstack([np.eye(p - 1, p, k=1), -a[1:][::-1]]) B = np.zeros((p, 1)) B[-1] = 1 C = np.expand_dims((b[1:] - b[0] * a[1:])[::-1], axis=0) D = np.ones((1, 1)) * b[0] return A, B, C, D def _get_derivative_A(size_A): """ build tensor representing dA/dtheta """ dA = np.zeros((size_A, size_A, size_A)) for k in range(size_A): dA[k, -1, -(k + 1)] = -1 return dA def _get_corr_unc(b, a, Ux): """ Calculate the cumulated correlated noise based on equations (20) of [Link2009]_ . """ # get impulse response of IIR defined by (b,a) h_theta = np.squeeze( dimpulse((b, a, 1), x0=0.0, t=np.arange(0, len(Ux), step=1))[1][0] ) # equation (20), note: # - for values r<0 or s<0 the contribution to the sum is zero (because h_theta is zero) # - Ux is the one-sided autocorrelation and assumed to be zero outside its range corr_unc = np.sum(toeplitz(Ux) * np.outer(h_theta, h_theta)) return corr_unc
[docs] def IIR_get_initial_state(b, a, Uab=None, x0=1.0, U0=1.0, Ux=None): """ Calculate the internal state for the IIRuncFilter-function corresponding to stationary non-zero input signal. Parameters ---------- b : np.ndarray filter numerator coefficients a : np.ndarray filter denominator coefficients Uab : np.ndarray, optional (default: None) covariance matrix for (a[1:],b) x0 : float, optional (default: 1.0) stationary input value U0 : float, optional (default: 1.0) stationary input uncertainty Ux : np.ndarray, optional (default: None) single sided autocovariance of stationary (colored/correlated) noise (needed in the `kind="corr"` case of :func:`IIRuncFilter`) Returns ------- internal_state : dict dictionary of state """ # adjust filter coefficients for lengths b, a, Uab = _adjust_filter_coefficients(b, a, Uab) # convert into state space representation A, B, C, D = _tf2ss(b, a) # necessary intermediate variables p = len(A) IminusA = np.eye(p) - A dA = _get_derivative_A(p) # stationary internal state # (eye()-A) * zs = B*x0 zs = solve(IminusA, B) * x0 # stationary derivative of internal state # (eye() - A) dzs = dA * zs dzs = solve(IminusA, np.hstack(dA @ zs)) # stationary uncertainty of internal state # A * Ps * A^T - Ps + u^2 * B * B^T = 0 Ps = solve_discrete_lyapunov(A, U0**2 * np.outer(B, B)) if isinstance(Ux, np.ndarray): corr_unc = _get_corr_unc(b, a, Ux) else: corr_unc = 0 # bring results into the format that is used within IIRuncFilter # this is the only place, where the structure of the cache is "documented" cache = { "system": (A, B, C, D), "corr_unc": corr_unc, "processed_input": (b, a, Uab), } state = {"z": zs, "dz": dzs, "P": Ps, "cache": cache} return state
def _adjust_filter_coefficients(b, a, Uab): """ Bring b, a to the same length and adjust Uab accordingly """ # length difference of both filters d = len(a) - len(b) # adjust filter coefficient uncertainties if isinstance(Uab, np.ndarray): if d != 0: l_theta = len(a) + len(b) - 1 Uab_expected_shape = (l_theta, l_theta) if Uab.shape == Uab_expected_shape: if d < 0: Uab = np.insert( np.insert(Uab, [len(a) - 1] * (-d), 0, axis=0), [len(a) - 1] * (-d), 0, axis=1, ) elif d > 0: Uab = np.hstack((Uab, np.zeros((Uab.shape[0], d)))) Uab = np.vstack((Uab, np.zeros((d, Uab.shape[1])))) else: raise ValueError( "Uab is of shape {ACTUAL_SHAPE}, but expected shape is {EXPECTED_SHAPE} " "(len(a)+len(b)-1)".format( ACTUAL_SHAPE=Uab.shape, EXPECTED_SHAPE=Uab_expected_shape ) ) # adjust filter coefficients for later use if d < 0: a = np.hstack((a, np.zeros(-d))) elif d > 0: b = np.hstack((b, np.zeros(d))) return b, a, Uab