"""Main class for DiagnosisModel."""
import numpy as np
import matplotlib.pyplot as plt
import sympy as sym
import copy
import faultdiagnosistoolbox.dmperm as dmperm
import faultdiagnosistoolbox.Matching as match
import faultdiagnosistoolbox.StructurePlotting as smplot
import faultdiagnosistoolbox.CodeGeneration as codegen
import faultdiagnosistoolbox.SensorPlacement as sensplace
import faultdiagnosistoolbox.TestSelection as testselection
from faultdiagnosistoolbox.VarIdGen import VarIdGen
import sys
[docs]
class DiagnosisModel:
"""Diagnosis model.
Parameters
----------
model_def: dict
A dictionary with the following keys:
type : string
type of model (Symbolic, VarStruc, MatrixStruc)
x : list
list of unknown variables in the model
f : list
list of fault variables
z : list
list of known variables
rels : list
list of model equations (in case of symbolic model)
X/F/Z : arraylike
structure matrices (in case of MatrixStruc model)
parameters : list
list of parameters (optional) For the induction motor model, this corresponds to
Example
-------
Consider a dynamic model described by the equations below with unknown variables ``x = (i_a, i_b, lambda_a, lambda_b, w, di_a, di_b, dlambda_a, dlambda_b, dw, q_a, q_b, Tl)``,
three measurements ``y1, y2, y3``, and known input controls ``u_a, u_b``, i.e., ``z = (u_a, u_b, y1, y2, y3)``, and two sensor faults ``f=(f_a, f_b)``.
To define a symbolic model, define model variables,
>>> model_def = {'type': 'Symbolic',
'x': ['i_a', 'i_b', 'lambda_a', 'lambda_b', 'w',
'di_a', 'di_b', 'dlambda_a', 'dlambda_b', 'dw', 'q_a', 'q_b', 'Tl'],
'f': ['f_a', 'f_b'],
'z': ['u_a', 'u_b', 'y1', 'y2', 'y3'],
'parameters': ['a', 'b', 'c', 'd', 'L_M', 'k', 'c_f', 'c_t']}
and make variables symolic using SymPy, for example as:
>>> sym.var(model_def['x'])
>>> sym.var(model_def['f'])
>>> sym.var(model_def['z'])
>>> sym.var(model_def['parameters'])
Then, the model equations can be represented by
>>> model_def['rels'] = [
-q_a + w*lambda_a,
-q_b + w*lambda_b,
-di_a + -a*i_a + b*c*lambda_a + b*q_b + d*u_a,
-di_b + -a*i_b + b*c*lambda_b + b*q_a + d*u_b,
-dlambda_a + L_M*c*i_a - c*lambda_a-q_b,
-dlambda_b + L_M*c*i_b - c*lambda_b-q_a,
-dw + -k*c_f*w + k*c_t*(i_a*lambda_b - i_b*lambda_a) - k*Tl,
fdt.DiffConstraint('di_a','i_a'),
fdt.DiffConstraint('di_b','i_b'),
fdt.DiffConstraint('dlambda_a','lambda_a'),
fdt.DiffConstraint('dlambda_b','lambda_b'),
fdt.DiffConstraint('dw','w'),
-y1 + i_a + f_a,
-y2 + i_b + f_b,
-y3 + w]
Now, the model can be defined using
>>> model = fdt.DiagnosisModel(model_def, name ='Induction motor')
"""
def __init__(self, modeldef, name=""):
"""Initialize DiagnosisModel object."""
self.X = np.zeros((0, 0), dtype="float64")
self.F = np.zeros((0, 0), dtype="float64")
self.Z = np.zeros((0, 0), dtype="float64")
self.x = []
self.f = []
self.z = []
self.name = name
self.parameters = []
self.vGen = VarIdGen()
self.modelType = "Structural"
if modeldef["type"] == "VarStruc" or modeldef["type"] == "Symbolic":
self.X = _ModelStructure(modeldef["rels"], modeldef["x"])
self.x = modeldef["x"]
self.F = _ModelStructure(modeldef["rels"], modeldef["f"])
self.f = modeldef["f"]
self.Z = _ModelStructure(modeldef["rels"], modeldef["z"])
self.z = modeldef["z"]
self.e = list(map(lambda x: self.vGen.NewE(), np.arange(0, self.ne())))
if modeldef["type"] == "Symbolic":
self.modelType = "Symbolic"
if "parameters" in modeldef:
self.parameters = modeldef["parameters"]
elif modeldef["type"] == "MatrixStruc":
self.X = np.array(modeldef["X"], dtype=np.int64)
ne = self.X.shape[0]
if len(modeldef["F"]) > 0:
self.F = np.array(modeldef["F"], dtype=np.int64)
else:
self.F = np.zeros((ne, 0), dtype=np.int64)
if len(modeldef["Z"]) > 0:
self.Z = np.array(modeldef["Z"], dtype=np.int64)
else:
self.Z = np.zeros((ne, 0), dtype=np.int64)
if "x" in modeldef:
self.x = modeldef["x"]
else:
self.x = [f"x{x + 1}" for x in range(self.X.shape[1])]
if "f" in modeldef:
self.f = modeldef["f"]
else:
self.f = [f"f{x + 1}" for x in range(self.F.shape[1])]
if "z" in modeldef:
self.z = modeldef["z"]
else:
self.z = [f"z{x + 1}" for x in range(self.Z.shape[1])]
self.e = [f"e{x + 1}" for x in range(self.ne())]
else:
print("Model definition type " + modeldef["type"] + " is not supported (yet)")
if modeldef["type"] == "Symbolic":
self.syme = np.array(_ToEquations(modeldef["rels"]), dtype=object)
if np.any(np.sum(self.X > 1, 0) > 1):
print(
"The model has higher order derivatives, "
+ "please rewrite as a set of first order "
+ "differential equations"
)
self.P = np.arange(0, len(self.x))
self.Pfault = []
[docs]
def copy(self):
"""Return a new copy of the model object."""
return copy.deepcopy(self)
[docs]
def ne(self) -> int:
"""Return number of equations in model."""
return self.X.shape[0]
[docs]
def nx(self) -> int:
"""Return number of unknown variables in model."""
return self.X.shape[1]
[docs]
def nf(self) -> int:
"""Return number of fault variables in model."""
return self.F.shape[1]
[docs]
def nz(self) -> int:
"""Return number of known variables in model."""
return self.Z.shape[1]
[docs]
def GetDMParts(self) -> dmperm.DMResult:
"""Return Dulmage-Mendelsohn decomposition of structure for unknown variables."""
return dmperm.GetDMParts(self.X)
[docs]
def MSO(self) -> list[np.ndarray]:
"""Return the set of MSO sets.
For details of the algorithm see the journal publication
Krysander, Mattias, Jan Aslund, and Mattias Nyberg.
"An efficient algorithm for finding minimal overconstrained
subsystems for model-based diagnosis."
Systems, Man and Cybernetics, Part A: Systems and Humans,
IEEE Transactions on 38.1 (2008): 197-206.
"""
return list(dmperm.MSO(self.X))
[docs]
def MTES(self) -> list[np.ndarray]:
"""Return the set of MTES sets.
For details of the algorithm see the paper
A Structural Algorithm for Finding Testable Sub-models and
Multiple Fault Isolability Analysis. Mattias Krysander, Jan Åslund,
and Erik Frisk (2010). 21st International Workshop on Principles of
Diagnosis (DX-10). Portland, Oregon, USA.
"""
return dmperm.MTES(self)
[docs]
def TestSelection(self, arr, method="aminc", isolabilitymatrix="") -> list[np.ndarray]:
"""A minimal hitting set based test selection.
Find sets of tests, based on a set of equations or a fault sensitivity
matrix, FSM, that achieves isolability performance specifications
Simple test selection strategy that finds sets of tests that, ideally,
fulfills specified isolability performance specifications. Note that
this is a purely structural method, no noise or model uncertainty
considerations are used.
Parameters
----------
arr : An array of arrays, interpreted as a set of equations
sets used to design residuals. For eample the ouput of
the MSO or MTES functions
isolabilitymatrix : Matrix specifying required isolability
performance. A 0 in position (i,j) represents that
fault i is isolable from fault j, a 1 indicates
that fault i is not isolable from fault j. A fault
can not be isolable from itself and therefore must
the diagonal always be 1.
method : Choice of test seleciton method.
'aminc' - Searches for a subset minimal sets of tests
that fulfills requirements (default)
Uses aminc, an approximative minimal cardinality
hitting set approach from:
Cormen, L., Leiserson, C. E., and
Ronald, L. (1990). Rivest,
"Introduction to Algorithms.", 1990.
Information also in De Kleer, Johan. "Hitting set
algorithms for model-based diagnosis."
22th International Workshop on Principles
of Diagnosis, DX, 2011.
'full' - Finds all subset minimal sets of tests
that fulfills requirements. Warning,
might easily lead to computationally
intractable problems.
"""
return testselection.TestSelection(self, arr, method, isolabilitymatrix)
[docs]
def srank(self) -> int:
"""Return the structural rank of the incidence matrix for the unknown variables."""
return dmperm.srank(self.X)
[docs]
def IsPSO(self, eq=None) -> bool:
"""Return True if the model is a PSO set.
Parameters
----------
eq (optional) : Set of equations to analyze. (default: all equations)
"""
if eq is None:
eqs = np.arange(0, self.X.shape[0])
else:
eqs = eq
dm = dmperm.GetDMParts(self.X[eqs, :])
return (len(dm.Mm.row) == 0) and (len(dm.M0) == 0)
[docs]
def Structural(self):
"""Convert the model to a structural model."""
self.modelType = "Structural"
self.syme = []
[docs]
def LumpDynamics(self):
"""Lump the dynamics variables.
If model is symbolic, it will be converted into a structural model
"""
if self.modelType == "Symbolic":
self.Structural()
X = self.X
F = self.F
Z = self.Z
dvars = np.where(np.any(X == 3, axis=0))[0]
diffEq = self.DifferentialConstraints()
for e in diffEq:
iv = np.where(X[e, :] == 2)[0][0]
dv = np.where(X[e, :] == 3)[0][0]
X[:, iv] = np.logical_or(X[:, iv] > 0, X[:, dv] > 0).astype(np.int64)
self.X = np.delete(np.delete(X, dvars, axis=1), diffEq, axis=0)
self.x = [self.x[xi] for xi in np.arange(0, len(self.x)) if not (xi in dvars)]
self.F = np.delete(F, diffEq, axis=0)
self.Z = np.delete(Z, diffEq, axis=0)
self.e = [self.e[ei] for ei in np.arange(0, len(self.e)) if not (ei in diffEq)]
self.parameters = []
self.syme = []
self.Pfault = []
self.P = np.arange(0, len(self.x))
[docs]
def IsObservable(self, eq=None) -> bool:
"""Return true if the model is observable.
Parameters
----------
eq : Set of equations to analyze. (default: all equations)
"""
if eq is None:
eq = []
return dmperm.IsObservable(self.X, eq)
[docs]
def Pantelides(self) -> tuple[int, np.ndarray]:
"""Compute structural index and differentiation vector for exactly determined models
idx, nu = model.Pantelides()
Returns
-----
idx : Structural index of model
nu : Differentiation vector as defined in
Pantelides, Constantinos C. "The consistent initialization of
differential-algebraic systems." SIAM Journal on Scientific and
Statistical Computing 9.2 (1988): 213-231.
"""
X = self.X.copy()
ne, nx = X.shape
if dmperm.srank(X) < nx:
print("Error: Pantelides can only be called for square, " + "exactly determined models")
return None
der = np.zeros((ne, 1), dtype=int)
X[X == 3] = 1
X = X - 1
cmatching = False
hd = None
while not cmatching:
Xder = X + der @ np.ones((1, nx))
Xder[X < 0] = -1
hd = np.max(Xder, axis=0)
hod = Xder == (np.ones((ne, 1)) @ hd.reshape((1, -1)))
if dmperm.srank(hod) == nx:
cmatching = True
else:
dm = dmperm.GetDMParts(hod)
der[dm.Mp.row] = der[dm.Mp.row] + 1
idx = np.max(der) + np.any(hd == 0)
return idx, der.reshape(-1)
[docs]
def IsHighIndex(self, eq=None) -> bool:
"""Return true if the model is high structural differential index.
Parameters
----------
eq : Set of equations to analyze. (default: all equations)
"""
if eq is None:
eq = []
if len(eq) == 0:
eq = np.arange(0, self.X.shape[0])
return dmperm.IsHighIndex(self.X, eq)
[docs]
def IsLowIndex(self, eq=None) -> bool:
"""Return true if the model is low structural differential index.
Parameters
----------
eq : Set of equations to analyze. (default: all equations)
"""
if eq is None:
eq = np.arange(0, self.X.shape[0])
return dmperm.IsLowIndex(self.X, eq)
[docs]
def IsUnderdetermined(self) -> bool:
"""Return true if the model has underdetermined parts."""
dm = dmperm.GetDMParts(self.X)
return len(dm.Mm.row) > 0
[docs]
def DifferentialConstraints(self) -> bool:
"""Return indices to the differential constraints."""
return np.where(np.any(self.X == 2, axis=1))[0]
[docs]
def DynamicVariables(self) -> tuple[list, list[int]]:
"""Return variables and index to dynamic variables in model."""
idx = np.where(np.any(self.X == 2, axis=0))[0]
c = list(np.array(self.x)[idx])
return c, idx
[docs]
def AlgebraicVariables(self) -> tuple[list, list[int]]:
"""Return variables and index to algebraic variables in model."""
dyn_idx = np.where(np.any(self.X == 2, axis=0))[0]
idx = [x for x in range(self.nx()) if not (x in dyn_idx)]
c = list(np.array(self.x)[idx])
return c, idx
[docs]
def FSM(self, eqs_sets, plot=False) -> np.ndarray:
"""Return the fault signature matrix for a set of equation sets.
Parameters
----------
eqs_sets : list of sets of equations, e.g., MSO sets
Plot : If True, plot the fault signature matrix (dafault: False)
"""
r = np.zeros((len(eqs_sets), self.F.shape[1]), dtype=np.int64)
for idx, eqs in enumerate(eqs_sets):
r[idx, :] = np.any(self.F[eqs, :], axis=0)
if plot:
plt.spy(r, markersize=10, marker="o")
plt.xticks(np.arange(0, self.nf()), self.f)
plt.yticks(np.arange(0, len(eqs_sets)), ["eq. set " + str(k + 1) for k in np.arange(0, len(eqs_sets))])
plt.gca().xaxis.tick_bottom()
return r
[docs]
def Matching(self, eqs) -> match.Matching:
"""Return a matching for a set of equations.
Parameters
----------
eqs : Set of equations
"""
return match.Matching(self.X, np.array(eqs))
[docs]
def MSOCausalitySweep(self, mso, diffres="int", causality=None) -> list[str]:
"""Determine casuality for MSO set residual generator.
Determine causality for sequential residual generator for
each n residual equations for a given MSO set.
Parameters
----------
mso :
list of equations
diffres :
Can be 'int' or 'der' (default 'int'). Determines how
to treat differential constraints when used as a
residual equation.
causality :
Can be 'int' or 'der'. When causality is specified, the call returns a boolean vector indicating
if it is possible to realize the residual generator in derivative or integral causality respectively
with the corresponding equations as residual equation. If this option is given, the diffres key have no effect.
"""
def IsDifferentialStructure(Xi, eq):
return np.any(Xi[eq, :] == 3)
if (causality == "der") and (not (diffres == "der")):
diffres = "der"
if (causality == "int") and (not (diffres == "int")):
diffres = "int"
res = []
X = self.X
for red in mso:
m0 = np.sort([e for e in mso if e != red])
Gamma = self.Matching(m0)
if not IsDifferentialStructure(X, red):
res.append(Gamma.matchType)
elif diffres == "int":
if Gamma.matchType == "der":
res.append("mixed")
elif (Gamma.matchType == "int") or (Gamma.matchType == "mixed"):
res.append(Gamma.matchType)
elif Gamma.matchType == "algebraic":
res.append("int")
elif diffres == "der":
if Gamma.matchType == "int":
res.append("mixed")
elif (Gamma.matchType == "der") or (Gamma.matchType == "mixed"):
res.append(Gamma.matchType)
elif Gamma.matchType == "algebraic":
res.append("der")
if causality is not None:
res = np.array(list(map(lambda c: c == causality or c == "algebraic", res)))
return res
[docs]
def MSOdifferentialOrder(self, mso):
"""Determine the number of required differentiations of each equation, each known variable in the model to obtain an ARR.
Args:
model (DiagnosisModel): Model object
mso (np.array): Array of equations
Returns:
eqOrder (np.array): Array of the required number of differentiations of each equation.
zIdx (np.array): The known variables included as an array of indices.
zOrder (np.array): Array of the highest order of derivatives of each known variable.
fIdx (np.array): The fault variables included as an array of indices.
fOrder (np.array): Array of the lowest order of derivatives of each fault variable.
"""
def redundancy(X):
dm = dmperm.GetDMParts(X)
return 0 if len(dm.Mp.row) == 0 else (dm.Mp.row.shape[0] - dm.Mp.col.shape[0])
X = self.X[mso].astype(int)
Z = self.Z[mso].astype(int)
F = self.F[mso].astype(int)
ne = X.shape[0] # Number of equations
if self.Redundancy(mso) != 1:
raise RuntimeError("The provided set of equations is not an MSO set.")
# Write the system on the matrix form
# x1' x1 x2
# A B C
x1_vars = np.sort(np.where((X == 2).any(axis=0))[0])
x2_vars = np.setdiff1d(range(X.shape[1]), x1_vars)
n1 = len(x1_vars)
n2 = len(x2_vars)
A = (X[:, x1_vars] == 2).astype(int)
B = (X[:, x1_vars] == 1).astype(int)
C = (X[:, x2_vars] > 0).astype(int)
# Differentiate until redundancy
X0 = np.column_stack((A, C, B))
Xd = X0
while redundancy(Xd) == 0:
X0[:, 0:n1] = np.logical_or(X0[:, 0:n1], B)
X0 = np.column_stack((A, C, X0))
Xd = np.block([[np.zeros((Xd.shape[0], n1 + n2)), Xd], [X0]])
# Collect the results
dm = dmperm.GetDMParts(Xd)
num_diff = dm.Mp.row // ne # Number of differentiations
diff_eq = dm.Mp.row % ne # Equation number
# Differential order for e
eq_order = [max([rho for ei, rho in zip(diff_eq, num_diff) if ei == ez]) for ez in np.arange(ne)]
# Differential order for z and f
z_idx = np.where(Z[diff_eq, :].any(axis=0))[0]
z_diff = np.zeros(len(z_idx), dtype=int)
for k, zi in enumerate(z_idx):
ez = np.where(Z[:, zi])[0][0]
z_diff[k] = max([rho for rho, ei in zip(num_diff, diff_eq) if ei == ez])
f_idx = np.where(F[diff_eq, :].any(axis=0))[0]
f_diff = np.zeros(len(f_idx), dtype=int)
for k, fi in enumerate(f_idx):
ez = np.where(F[:, fi])[0][0]
f_diff[k] = min([rho for rho, ei in zip(num_diff, diff_eq) if ei == ez])
return eq_order, z_idx, z_diff, f_idx, f_diff
[docs]
def MeasurementEquations(self, m):
"""Return indices to measurement equations.
Parameters
----------
m : List of names for measurement variables (strings)
"""
mIdx = np.array([self.z.index(zi) for zi in m if zi in self.z])
if len(mIdx) > 0:
return np.where(np.any(self.Z[:, mIdx], axis=1))[0]
else:
return []
[docs]
def SubModel(self, m, v, clear=True, remove=False, verbose=False):
"""Extract submodel.
Parameters
----------
eqs :
Set of indices to or logicals for equations to keep/remove
vars :
Set of indices to or logicals for variables to keep/remove
clear :
If true, non used varaibles in the submodel will be eliminated (default: true)
verbose :
If true, verbose output (default: false)
remove :
If true, supplied equations are removed instead of kept (default: false)
"""
if remove:
eqs = np.array([ei for ei in np.arange(0, self.ne()) if not (ei in m)])
var = np.array([vi for vi in np.arange(0, self.nx()) if not (vi in v)])
else:
eqs = m
var = np.arange(0, self.nx())
if clear:
xIdx = var[np.any(self.X[eqs, :][:, var], axis=0)]
fIdx = np.argwhere(np.any(self.F[eqs, :], axis=0)).flatten()
zIdx = np.argwhere(np.any(self.Z[eqs, :], axis=0)).flatten()
else:
xIdx = var
fIdx = np.arange(0, self.nf())
zIdx = np.arange(0, self.nz())
if verbose and clear:
cxIdx = [xi for xi in var if not (xi in xIdx)]
cfIdx = [fi for fi in np.arange(0, self.nf()) if not (fi in fIdx)]
czIdx = [zi for zi in np.arange(0, self.nf()) if not (zi in zIdx)]
if len(cxIdx) > 0:
sys.stdout.write("Removing unknown variables: ")
for xi in cxIdx:
sys.stdout.write(f"{self.x[xi]} ")
sys.stdout.write("\n")
if len(cfIdx) > 0:
sys.stdout.write("Removing fault variables: ")
for fi in cfIdx:
sys.stdout.write(f"{self.f[fi]} ")
sys.stdout.write("\n")
if len(czIdx) > 0:
sys.stdout.write("Removing known variables: ")
for zi in czIdx:
sys.stdout.write(f"{self.z[zi]} ")
sys.stdout.write("\n")
self.X = self.X[eqs, :][:, xIdx]
self.F = self.F[eqs, :][:, fIdx]
self.Z = self.Z[eqs, :][:, zIdx]
self.e = [self.e[ei] for ei in eqs]
self.x = [self.x[xi] for xi in xIdx]
self.f = [self.f[fi] for fi in fIdx]
self.z = [self.z[zi] for zi in zIdx]
if len(self.syme) > 0:
self.syme = self.syme[eqs]
self.P = np.arange(0, len(xIdx))
self.Pfault = []
[docs]
def IsDynamic(self) -> bool:
"""Return True if model is dynamic."""
return np.any(self.X == 2)
[docs]
def IsStatic(self) -> bool:
"""Return True if model is static."""
return not self.IsDynamic()
[docs]
def Redundancy(self, eqs=None) -> int:
"""Return the redundancy of the model."""
if eqs is None:
eqs = []
if len(eqs) == 0:
eqs = np.arange(0, self.ne())
dm = dmperm.GetDMParts(self.X[eqs, :])
return len(dm.Mp.row) - len(dm.Mp.col)
[docs]
def MTESRedundancy(self) -> int:
"""Return the redundancy of an MTES for the model."""
eqs = np.argwhere(np.any(self.F, axis=1))[:, 0]
return self.Redundancy(eqs) + 1
# def PlotDM(self, **options):
[docs]
def PlotDM(self, ax=None, eqclass=False, fault=False, verbose=False):
"""Plot Dulmage-Mendelsohn decomposition of model structure.
Plots a Dulmage-Mendelsohn decomposition, originally described in
Dulmage, A. and Mendelsohn, N. "Coverings of bipartite graphs."
Canadian Journal of Mathematics 10.4 (1958): 516-534.
Parameters
----------
eqclass : If True, perform canonical decomposition of M+ and
plot equivalence classes
fault : If true, indicates fault equations in canonical
decomposition of M+
verbose : If True, use full variable names in plot. If not specified, heuristic
decides based on model size of variable names should be printed.
"""
# labelVars = False
# if 'verbose' in options:
# labelVars = options['verbose']
# elif self.X.shape[0] < 40:
# labelVars = True
# if 'eqclass' in options:
# eqclass = options['eqclass']
# else:
# eqclass = False
# if 'fault' in options:
# fault = options['fault']
# else:
# fault = False
if not ax:
ax = plt.gca()
smplot.PlotDM(self, ax=ax, verbose=verbose, eqclass=eqclass, fault=fault)
# def PlotModel(self, **options):
[docs]
def PlotModel(self, ax=None, verbose=False):
"""Plot the model structure."""
# labelVars = False
# if 'verbose' in options:
# labelVars = options['verbose']
# elif (self.nx() + self.nf() + self.nz()) < 40:
# labelVars = True
if not ax:
ax = plt.gca()
smplot.PlotModel(self, ax=ax, verbose=verbose)
# def PlotMatching(self, Gamma, **options):
[docs]
def PlotMatching(self, Gamma, verbose=False):
"""Plot a matching.
Plot the structure in an upper-triangular
incidence matrix with the matched variables in the diagonal.
Parameters
----------
Gamma : A matching computed by the Matching class method
"""
q = np.array([], dtype=np.int64)
for g in Gamma.matching:
q = np.concatenate((q, g.col))
# Determine if axis should be labeled
labelVars = False
if verbose:
labelVars = verbose
elif len(q) < 40:
labelVars = True
smplot.PlotMatching(self, Gamma, verbose=labelVars)
[docs]
def PossibleSensorLocations(self, x=None):
"""Set possible sensor locations.
Parameters
----------
x : Specification of possible sensor locations
The sensor positions x can be given either as
indices into model.x or variable names (default: all positions)
"""
if x is None:
self.P = np.arange(0, len(self.x)) # Assume all possible sensor locations
else:
if issubclass(type(x[0]), str): # list of strings
self.P = np.array([self.x.index(xi) for xi in x if xi in self.x])
else:
self.P = x.copy()
[docs]
def SensorLocationsWithFaults(self, x=None):
"""Set possible sensor locations that has faults in new sensors.
Parameters
----------
x : Index to those sensor locations that can become faulty.
If no input argument is given, no sensors may
become faulty. The sensor positions x can be
given either as indices into model.x or variable
names
"""
if x is None:
self.Pfault = []
else:
if issubclass(type(x[0]), str): # list of strings
self.Pfault = np.array([self.x.index(xi) for xi in x if xi in self.x])
else:
self.Pfault = x.copy()
[docs]
def SensorPlacementIsolability(self, isolabilityspecification=None):
"""Compute all minimal sensor sets that achieves maximal fault isolability of the faults in the model.
Krysander, Mattias, and Erik Frisk, "Sensor placement for fault
diagnosis." Systems, Man and Cybernetics, Part A: Systems and Humans,
IEEE Transactions on 38.6 (2008): 1398-1410.
Parameters
----------
isolabilityspecification : Isolability specification, a 0/1-matrix with
a 0 in position (i,j) if fault fi should be
isolable from fault fj; 1 otherwise. Structural
isolability is a symmetric relation; and if
the specification is not symmetric; the
specification is made symmetric. Defaults to
the identity matrix, i.e., full isolability
among faults.
Returns
-------
res - list of all minimal sensor sets, represented by strings
idx - same as res, but represented with indicices into model.f
"""
if isolabilityspecification is None:
Ispec = np.eye(self.nf())
else:
Ispec = isolabilityspecification
return sensplace.SensorPlacementIsolability(self, Ispec)
[docs]
def SensorPlacementDetectability(self):
"""Compute all minimal sensor sets that achieves maximal fault detectability of the faults in the model.
Krysander, Mattias, and Erik Frisk, "Sensor placement for fault
diagnosis." Systems, Man and Cybernetics, Part A: Systems and Humans,
IEEE Transactions on 38.6 (2008): 1398-1410.
Returns
-------
res - list of all minimal sensor sets, represented by strings
idx - same as res, but represented with indicices into model.f
"""
return sensplace.SensorPlacementDetectability(self)
[docs]
def AddSensors(self, sensors, name=None, fault=None):
"""Add sensor equations to a model.
Parameters
----------
sensors : Description of sensors to add s can be a list of strings
with the names of sensors to add or indices into the known
variables (model.x) which sensors to add.
It is only possible to add sensors measuring single
variables in x. If functions of variables in x are
measured, extend the model with a new variable first.
If the corresponding variable can be faulty, see
class method SensorLocationsWithFaults, a fault will be added
automatically.
name : List with names of new sensor variables
fault : List with names of fault variables for new sensors
"""
if fault is None:
fault = []
if name is None:
name = []
if issubclass(type(sensors[0]), str): # list of strings, convert to indices into self.x
s = np.array([self.x.index(xi) for xi in sensors if xi in self.x])
else:
s = sensors
ns = len(s)
nx = self.X.shape[1]
nz = self.Z.shape[1]
nf = self.F.shape[1]
ne = self.X.shape[0]
Xs = np.zeros((ns, nx), np.int64)
Fs = np.zeros((ns, nf), np.int64)
Zs = np.zeros((ns, nz + ns), np.int64)
fs = np.zeros(ns, dtype=bool)
for sIdx, si in enumerate(s):
Xs[sIdx, si] = 1
Zs[sIdx, sIdx + nz] = 1
if si in self.Pfault:
nf = nf + 1
Fs = np.hstack((Fs, np.zeros((ns, 1), dtype=np.int64))) # Add column for new fault
Fs[sIdx, -1] = 1 # Add fault
fs[sIdx] = True
else:
Fs[sIdx, :] = np.zeros((1, nf))
self.X = np.vstack((self.X, Xs))
self.Z = np.hstack((self.Z, np.zeros((ne, ns), dtype=np.int64)))
self.Z = np.vstack((self.Z, Zs))
if np.sum(fs) > 0:
self.F = np.hstack((self.F, np.zeros((ne, np.sum(fs)), dtype=np.int64)))
self.F = np.vstack((self.F, Fs))
self.e = self.e + list(map(lambda x: self.vGen.NewE(), s))
for idx, zi in enumerate(s):
if len(name) == 0:
znum = np.sum(np.array(s)[0:idx] == s[idx]) + 1
if znum > 1:
zName = "z" + str(znum) + self.x[zi]
else:
zName = "z" + self.x[zi]
else:
zName = name[idx]
self.z = self.z + [zName]
fName = "fx"
if fs[idx]:
if len(fault) == 0:
fName = "f" + zName
else:
fName = fault[idx]
self.f = self.f + [fName]
if self.modelType == "Symbolic":
if fs[idx]:
rel = sym.Eq(sym.symbols(zName), sym.symbols(self.x[zi]) + sym.symbols(fName))
else:
rel = sym.Eq(sym.symbols(zName), sym.symbols(self.x[zi]))
self.syme = np.concatenate((self.syme, [rel]))
[docs]
def DetectabilityAnalysis(self, causality="mixed"):
"""Perform a structural detectability analysis.
Parameters
----------
causality : Can be 'mixed' (default), 'int', or 'der' for mixed,
integral, or derivative causality analysis respectively.
For details, see
Frisk, E., Bregon, A., Aaslund, J., Krysander, M.,
Pulido, B., Biswas, G., "Diagnosability analysis
considering causal interpretations for differential
constraints", IEEE Transactions on Systems, Man and
Cybernetics, Part A: Systems and Humans, 2012, 42(5),
1216-1229.
Returns
-------
df : Detectable faults
ndf : Non-detectable faults
"""
# MplusCausal = lambda X: dmperm.Mplus(X, causality=causality)
def MplusCausal(X):
return dmperm.Mplus(X, causality=causality)
dm = MplusCausal(self.X)
df = [self.f[fidx] for fidx in np.arange(0, self.F.shape[1]) if np.argwhere(self.F[:, fidx])[:, 0] in dm]
ndf = [self.f[fidx] for fidx in np.arange(0, self.F.shape[1]) if np.argwhere(self.F[:, fidx])[:, 0] not in dm]
return df, ndf
[docs]
def IsolabilityAnalysis(self, ax=None, permute=True, causality="mixed"):
"""Perform structural single fault isolability analysis of model.
Parameters
----------
ax :
If not None, plot the isolability matrix in the specified axis
permute :
If True, permute the fault variables such that the
isolability matrix gets a block structure for easier
interpretation when plotted. Does not affect the output
argument im, only the plot (default True)
causality :
Can be 'mixed' (default), 'int', or 'der' for mixed,
integral, or derivative causality analysis respectively.
For details, see
Frisk, E., Bregon, A., Aaslund, J., Krysander, M.,
Pulido, B., Biswas, G., "Diagnosability analysis
considering causal interpretations for differential
constraints", IEEE Transactions on Systems, Man and
Cybernetics, Part A: Systems and Humans, 2012, 42(5),
1216-1229.
Returns
-------
im :
Isolability matrix, im(i,j)=1 if fault i can be isolated from fault j, 0 otherwise
"""
# MplusCausal = lambda X: dmperm.Mplus(X, causality=causality)
def MplusCausal(X_i):
return dmperm.Mplus(X_i, causality=causality)
# ne = self.X.shape[0]
nf = len(self.f)
# plusRow = MplusCausal(self.X)
# Determine equations for each fault
# feq = list(map(lambda fi: np.argwhere(self.F[:, fi])[0][0],
# np.arange(0, nf)))
# Determine non-detectable faults
# ndrows = [x for x in np.arange(0, ne) if x not in plusRow]
# ndf = [self.f[fi] for fi in np.arange(0, len(self.f)) if feq[fi] in ndrows]
# df = [self.f[fi] for fi in np.arange(0, len(self.f)) if not feq[fi] in ndrows]
im = np.ones((nf, nf), dtype=np.int64)
for fi in np.arange(0, nf):
# Decouple fi
fieqs = [x[0] for x in np.argwhere(self.F[:, fi] == 0)]
X = self.X[fieqs, :]
plus_row = MplusCausal(X)
fisolrows = [fieqs[ei] for ei in plus_row]
idx = [fj for fj in np.arange(0, nf) if np.any(self.F[fisolrows, :], axis=0)[fj]]
im[idx, fi] = 0
if ax:
if permute:
p, q, _, _, _, _, _ = dmperm.dmperm(im)
else:
p = np.arange(0, nf)
q = p
ax.spy(im[p, :][:, q], markersize=8, marker="o", color="b")
ax.set_xticks(np.arange(0, nf))
ax.set_xticklabels([self.f[fi] for fi in p])
ax.set_yticks(np.arange(0, nf))
ax.set_yticklabels([self.f[fi] for fi in p])
if len(self.name) > 0:
title_string = f"Isolability matrix for '{self.name}'"
else:
title_string = "Isolability matrix"
if causality == "der":
title_string = f"{title_string} (derivative causality)"
elif causality == "int":
title_string = f"{title_string} (integral causality)"
ax.set_title(title_string)
ax.xaxis.tick_bottom()
return im
[docs]
def IsolabilityAnalysisArrs(self, arrs, permute=True, ax=None):
"""Perform structural single fault isolability analysis of a set of ARRs.
Parameters
----------
arrs :
List of ARRs, e.g., list of MSO sets
ax :
If not None, plot the isolability matrix in the specified axis
permute :
If True, permute the fault variables such that the
isolability matrix gets a block structure for easier
interpretation when plotted. Does not affect the output
argument im, only the plot (default True)
Returns
-------
im : Isolability matrix, im(i,j)=1 if fault i can be isolated
from fault j, 0 otherwise
"""
FSM = self.FSM(arrs)
return self.IsolabilityAnalysisFSM(FSM, permute=permute, ax=ax)
[docs]
def IsolabilityAnalysisFSM(self, FSM, permute=True, ax=None):
"""Perform structural single fault isolability analysis of a fault sensitivity matrix (FSM).
Parameters
----------
FSM :
Fault sensitivity matrix
ax :
If not None, plot the isolability matrix in the specified axis
permute :
If True, permute the fault variables such that the
isolability matrix gets a block structure for easier
interpretation when plotted. Does not affect the output
argument im, only the plot (default True)
Returns
-------
im :
Isolability matrix, im(i,j)=1 if fault i can be isolated from fault j, 0 otherwise
"""
nf = FSM.shape[1]
# nr = FSM.shape[0]
im = np.ones((nf, nf), dtype=np.int64)
for f in FSM:
zIdx = np.array([[x0, y0] for x0 in np.where(f > 0)[0] for y0 in np.where(f == 0)[0]])
if len(zIdx) > 0:
im[zIdx[:, 0], zIdx[:, 1]] = 0
if ax:
if permute:
p, q, _, _, _, _, _ = dmperm.dmperm(im)
else:
p = np.arange(0, nf)
q = p
ax.spy(im[p, :][:, q], markersize=10, marker="o")
ax.set_xticks(np.arange(0, self.nf()))
ax.set_xticklabels(self.f)
ax.set_yticks(np.arange(0, self.nf()))
ax.set_yticklabels(self.f)
ax.xaxis.tick_bottom()
if len(self.name) > 0:
ax.set_title("Isolability matrix for a given FSM in '" + self.name + "'")
else:
ax.set_title("Isolability matrix for a given FSM")
return im
[docs]
def SeqResGen(
self,
Gamma,
resEq,
name,
diffres="int",
language="Python",
batch=False,
api="Python",
user_functions=None,
external_src=None,
external_headers=None,
):
"""(Experimental) Generate Python/C code for sequential residual generator.
Given a matching and a residual equation, generate code implementing the
residual generator. Generates Python/C file. How to call/compile the
generated file is described in the generated file (or the user manual).
Parameters
----------
Gamma :
Matching
resEq :
Index to equation to use as residual equation
name :
Name of residual equation. Will also be used as a basis for filename.
diffres :
Can be 'int' or 'der' (default 'int'). Determines how
to treat differential constraints when used as a residual equation.
language :
Defaults to Matlab but also C code can be generated
batch :
Generate a batch mode residual generator, only applicable when generating C code.
Instead of computing the residual for one data samnple, batch mode runs the residual
generator for a whole data set. This can significantly decrease computational time.
user_functions :
Translation dictionary, from user function name to generated code
external_src :
List of Python files with external functions used in residual generator
external_headers :
List of external header files to include in generated C source
"""
if user_functions is None:
user_functions = {}
if external_headers is None:
external_headers = []
if external_src is None:
external_src = []
if api != "Python":
print("Error, only python api supported")
codegen.SeqResGen(
self,
Gamma,
resEq,
name,
diffres=diffres,
language=language,
batch=batch,
user_functions=user_functions,
external_src=external_src,
external_headers=external_headers,
)
[docs]
def Lint(self):
"""Print model information and checks for inconsistencies in model definition."""
war = 0
err = 0
# dm = dmperm.GetDMParts(self.X)
if len(self.name) > 0:
print("Model: " + self.name)
else:
print("Model information")
sys.stdout.write("\n Type:" + self.modelType)
nd = np.sum(self.X == 3)
if nd > 0:
sys.stdout.write(", dynamic\n")
else:
sys.stdout.write(", static\n")
print("\n Variables and equations")
print(" " + str(self.nx()) + " unknown variables")
print(" " + str(self.nz()) + " known variables")
print(" " + str(self.nf()) + " fault variables")
print(" " + str(self.ne()) + " equations, including " + str(nd) + " differential constraints")
print("\n Degree of redundancy: " + str(self.Redundancy()))
if self.Redundancy() > 0 and len(self.f) > 0:
print(" Degree of redundancy of MTES set: " + str(self.MTESRedundancy()))
print("\n")
if self.ne() != self.F.shape[0] or self.ne() != self.Z.shape[0]:
print("Error! Inconsistent numnber of rows in incidence matrices")
err = err + 1
if self.nx() != len(self.x):
print("Error! Inconsistent number of unknown variables")
err = err + 1
if self.nz() != len(self.z):
print("Error! Inconsistent number of known variables")
err = err + 1
if self.nf() != len(self.f):
print("Error! Inconsistent number of fault variables")
err = err + 1
if self.ne() != len(self.e):
print("Error! Inconsistent number of equations")
err = err + 1
if len([v for v in self.P if not (v in np.arange(0, self.nx()))]) > 0:
print("Error! Possible sensor locations outside set of unknown variables")
err = err + 1
if len([v for v in self.Pfault if not (v in np.arange(0, self.nx()))]) > 0:
print("Error! Possible sensor locations with faults outside set of unknown variables")
err = err + 1
if np.any(np.sum(self.F > 0, axis=0) > 1):
print("Error! Fault variables can only appear in 1 equation, rewrite model with intermediate variables")
err = err + 1
xIdx = np.where(np.all(self.X == 0, axis=0))[0]
for ii in xIdx:
print("Warning! Variable " + self.x[ii] + " is not included in model")
war = war + 1
zIdx = np.where(np.all(self.Z == 0, axis=0))[0]
for ii in zIdx:
print("Warning! Variable " + self.z[ii] + " is not included in model")
war = war + 1
fIdx = np.where(np.all(self.F == 0, axis=0))[0]
for ii in fIdx:
print("Warning! Variable " + self.f[ii] + " is not included in model")
war = war + 1
if self.IsUnderdetermined():
print("Warning! Model is underdetermined")
war = war + 1
print(f" Model validation finished with {err} errors and {war} warnings.")
def DiffConstraint(dvar, ivar):
"""Define a differential constraint."""
return [dvar, ivar, "diff"]
def _ModelStructure(rels, x):
"""Compute incidence matrix."""
ne = len(rels)
nx = len(x)
X = np.zeros((ne, nx), dtype="int64")
for k, rel in enumerate(rels):
if IsDifferentialConstraint(rel):
if (rel[0] in x) and (rel[1] in x):
dvIdx = x.index(rel[0])
ivIdx = x.index(rel[1])
X[k, dvIdx] = 3
X[k, ivIdx] = 2
else:
X[k, _RelModelStructure(rel, x)] = 1
return X
def _RelModelStructure(rel, x):
"""Compute symbolic model structure."""
if IsSymbolic(rel):
relVars = [str(e) for e in rel.atoms(sym.Symbol)]
else:
relVars = rel
return [xi for xi in range(0, len(x)) if x[xi] in relVars]
def IsDifferentialConstraint(rel):
"""Return true if relation is differential constraint."""
return isinstance(rel, list) and (len(rel) == 3) and (rel[2] == "diff")
def IsSymbolic(v):
"""Return true if variable is symbolic."""
return isinstance(v, sym.core.Expr)
# return isinstance(v, tuple(sym.core.core.all_classes))
def _ToEquations(rels):
"""Convert relation into symbolic equations."""
def _ToEquation(rel):
if IsSymbolic(rel) and not isinstance(rel, sym.Equality):
return sym.Eq(rel, 0)
else:
return rel
return list(map(lambda r: _ToEquation(r), rels))