Source code for PyDynamic.uncertainty.propagate_MonteCarlo

# -*- coding: utf-8 -*-
"""

The propagation of uncertainties via the FIR and IIR formulae alone does not
enable the derivation of credible intervals, because the underlying distribution
remains unknown. The GUM-S2 Monte Carlo method provides a reference method for the
calculation of uncertainties for such cases.

This module implements Monte Carlo methods for the propagation of uncertainties for digital filtering.

This module contains the following functions:
* MC: Standard Monte Carlo method for application of digital filter
* SMC: Sequential Monte Carlo method with reduced computer memory requirements

"""

import sys

import numpy as np
import scipy as sp
import scipy.stats as stats
from scipy.signal import lfilter

from ..misc.filterstuff import isstable

__all__ = ["MC", "SMC"]

class Normal_ZeroCorr():
	"""
	Multivariate normal distribution with zero correlation
	"""
	def __init__(self, mean=None, cov=None):
		"""
		Parameters
		----------
			loc: np.ndarray, optional
				mean values, default is zero
			scale: np.ndarray, optional
				standard deviations for the elements in loc, default is zero
		"""
		if isinstance(mean, np.ndarray):
			self.mean = mean
			if isinstance(cov, np.ndarray):
				assert (len(cov)==len(mean))
				self.std = cov
			elif isinstance(cov, float):
				self.std = cov * np.ones_like(mean)
			else:
				self.std = np.zeros_like(mean)
		elif isinstance(cov, np.ndarray):
			self.std = cov
			self.mean = np.zeros_like(cov)

	def rvs(self, size=1):
		# This function mimics the behavior of the scipy stats package
		return np.tile(self.mean, (size, 1)) + np.random.randn(size, len(self.mean))*np.tile(self.std, (size, 1))



[docs]def MC(x,Ux,b,a,Uab,runs=1000,blow=None,alow=None,return_samples=False,shift=0,verbose=True): r"""Standard Monte Carlo method Monte Carlo based propagation of uncertainties for a digital filter (b,a) with uncertainty matrix :math:`U_{\theta}` for :math:`\theta=(a_1,\ldots,a_{N_a},b_0,\ldots,b_{N_b})^T` Parameters ---------- x: np.ndarray filter input signal Ux: float or np.ndarray standard deviation of signal noise (float), point-wise standard uncertainties or covariance matrix associated with x b: np.ndarray filter numerator coefficients a: np.ndarray filter denominator coefficients Uab: np.ndarray uncertainty matrix :math:`U_\theta` runs: int,optional number of Monte Carlo runs return_samples: bool, optional whether samples or mean and std are returned If 'return_sampes' is false, the method returns: Returns ------- y: np.ndarray filter output signal Uy: np.ndarray uncertainty associated with Other wise the method returns Returns ------- Y: np.ndarray array of Monte Carlo results References ---------- * Eichstädt, Link, Harris and Elster [Eichst2012]_ """ Na = len(a) runs = int(runs) Y = np.zeros((runs,len(x))) # set up matrix of MC results theta = np.hstack((a[1:],b)) # create the parameter vector from the filter coefficients Theta = np.random.multivariate_normal(theta,Uab,runs) # Theta is small and thus we can draw the full matrix now. if isinstance(Ux, np.ndarray): if len(Ux.shape)==1: dist = Normal_ZeroCorr(loc=x, scale=Ux) # non-iid noise w/o correlation else: dist = stats.multivariate_normal(x, Ux) # colored noise elif isinstance(Ux, float): dist = Normal_ZeroCorr(mean=x, cov=Ux) # iid noise else: raise NotImplementedError("The supplied type of uncertainty is not implemented") unst_count = 0 # Count how often in the MC runs the IIR filter is unstable. st_inds = list() if verbose: sys.stdout.write('MC progress: ') for k in range(runs): xn = dist.rvs() # draw filter input signal if not blow is None: if alow is None: alow = 1.0 # FIR low-pass filter xn = lfilter(blow,alow,xn) # low-pass filtered input signal bb = Theta[k,Na-1:] aa = np.hstack((1.0, Theta[k,:Na-1])) if isstable(bb,aa): Y[k,:] = lfilter(bb,aa,xn) st_inds.append(k) else: unst_count += 1 # don't apply the IIR filter if it's unstable if np.mod(k, 0.1*runs) == 0 and verbose: sys.stdout.write(' %d%%' % (np.round(100.0*k/runs))) if verbose: sys.stdout.write(" 100%\n") if unst_count > 0: print("In %d Monte Carlo %d filters have been unstable" % (runs,unst_count)) print("These results will not be considered for calculation of mean and std") print("However, if return_samples is 'True' then ALL samples are returned.") Y = np.roll(Y,int(shift),axis=1) # correct for the (known) sample delay if return_samples: return Y else: y = np.mean(Y[st_inds,:],axis=0) uy= np.cov(Y[st_inds,:],rowvar=0) return y, uy
[docs]def SMC(x,noise_std,b,a,Uab=None,runs=1000,Perc=None,blow=None,alow=None,shift=0, return_samples=False,phi=None,theta=None,Delta=0.0): r"""Sequential Monte Carlo method Sequential Monte Carlo propagation for a digital filter (b,a) with uncertainty matrix :math:`U_{\theta}` for :math:`\theta=(a_1,\ldots,a_{N_a},b_0,\ldots,b_{N_b})^T` Parameters ---------- x: np.ndarray filter input signal noise_std: float standard deviation of signal noise b: np.ndarray filter numerator coefficients a: np.ndarray filter denominator coefficients Uab: np.ndarray uncertainty matrix :math:`U_\theta` runs: int, optional number of Monte Carlo runs Perc: list, optional list of percentiles for quantile calculation blow: np.ndarray optional low-pass filter numerator coefficients alow: np.ndarray optional low-pass filter denominator coefficients shift: int integer for time delay of output signals return_samples: bool, otpional whether to return y and Uy or the matrix Y of MC results phi, theta: np.ndarray, optional parameters for AR(MA) noise model ::math:`\epsilon(n) = \sum_k \phi_k\epsilon(n-k) + \sum_k \theta_k w(n-k) + w(n)` with ::math:`w(n)\sim N(0,noise_std^2)` Delta: float,optional upper bound on systematic error of the filter If return_samples is False: Returns ------- y: np.ndarray filter output signal (Monte Carlo mean) Uy: np.ndarray uncertainties associated with y (Monte Carlo point-wise std) Quant: np.ndarray quantiles corresponding to percentiles Perc (if not None) at Otherwise: Returns ------- Y: np.ndarray array of all Monte Carlo results References ---------- * Eichstädt, Link, Harris, Elster [Eichst2012]_ """ runs = int(runs) if isinstance(a,np.ndarray): # filter order denominator Na = len(a)-1 else: Na = 0 if isinstance(b,np.ndarray): # filter order numerator Nb = len(b)-1 else: Nb = 0 if isinstance(theta,np.ndarray) or isinstance(theta,float): # initialise noise matrix corresponding to ARMA noise model if isinstance(theta,float): W = np.zeros((runs,1)) else: W = np.zeros((runs,len(theta))) else: MA = False # no moving average part in noise process if isinstance(phi,np.ndarray) or isinstance(phi,float): # Initialise for autoregressive part of noise process AR = True if isinstance(phi,float): E = np.zeros((runs,1)) else: E = np.zeros((runs,len(phi))) else: AR = False # no autoregresssive part in noise process if isinstance(blow,np.ndarray): X = np.zeros((runs,len(blow))) # initialise matrix of low-pass filtered input signal else: X = np.zeros(runs) if isinstance(alow,np.ndarray): Xl = np.zeros((runs,len(alow)-1)) else: Xl = np.zeros((runs,1)) if Na==0: # only FIR filter coefs = b else: coefs = np.hstack((a[1:],b)) if isinstance(Uab, np.ndarray): # Monte Carlo draw for filter coefficients Coefs = np.random.multivariate_normal(coefs, Uab, runs) else: Coefs = np.tile(coefs, (runs, 1)) b0 = Coefs[:,Na] if Na>0: # filter is IIR A = Coefs[:,:Na] if Nb>Na: A = np.hstack((A,np.zeros((runs,Nb-Na)))) else: # filter is FIR -> zero state equations A = np.zeros((runs,Nb)) c = Coefs[:,Na+1:] - np.multiply(np.tile(b0[:, np.newaxis],(1,Nb)),A) # Fixed part of state-space model States = np.zeros(np.shape(A)) # initialise matrix of states calcP = False # by default no percentiles requested if not Perc is None: # percentiles requested calcP = True P = np.zeros((len(Perc),len(x))) y = np.zeros_like(x) # initialise outputs Uy= np.zeros_like(x) # initialise vector of point-wise uncertainties (no correlation) print("Sequential Monte Carlo progress", end="") # start of the actual MC part for k in range(len(x)): w = np.random.randn(runs)*noise_std # noise process draw if AR and MA: E = np.hstack( ( E.dot(phi) + W.dot(theta) + w, E[:-1]) ) W = np.hstack( (w, W[:-1] ) ) elif AR: E = np.hstack( (E.dot(phi) + w, E[:-1]) ) elif MA: E = W.dot(theta) + w W = np.hstack( (w, W[:-1] ) ) else: w = np.random.randn(runs,1)*noise_std E = w if isinstance(alow,np.ndarray): # apply low-pass filter X = np.hstack((x[k] + E, X[:, :-1])) Xl = np.hstack( ( X.dot(blow.T) - Xl[:,:len(alow)].dot(alow[1:]), Xl[:,:-1] ) ) elif isinstance(blow,np.ndarray): X = np.hstack((x[k] + E, X[:, :-1])) Xl = X.dot(blow) else: Xl = x[k] + E if len(Xl.shape)==1: Xl = Xl[:,np.newaxis] # prepare for easier calculations Y = np.sum(np.multiply(c,States),axis=1) + np.multiply(b0,Xl[:,0]) + (np.random.rand(runs)*2*Delta - Delta) # state-space system output Z = -np.sum(np.multiply(A,States),axis=1) + Xl[:,0] # calculate state updates States = np.hstack((Z[:,np.newaxis], States[:,:-1])) # store state updates and remove old ones y[k] = np.mean(Y) # point-wise best estimate Uy[k]= np.std(Y) # point-wise standard uncertainties if calcP: P[:,k] = sp.stats.mstats.mquantiles(np.asarray(Y),prob=Perc) # percentiles if requested if np.mod(k, np.round(0.1*len(x))) == 0: print(' %d%%' % (np.round(100.0*k/len(x))), end="") print(" 100%") y = np.roll(y,int(shift)) # correct for (known) delay Uy= np.roll(Uy,int(shift)) if calcP: P = np.roll(P,int(shift),axis=1) return y, Uy, P else: return y, Uy