"""Dulmage-Mendelsohn an MSO functionality."""
import faultdiagnosistoolbox.dmpermlib as dmpermlib
import scipy.sparse as sp
import numpy as np
import copy
from dataclasses import dataclass, field
def CSCDict(A):
"""Compressed matrix format."""
return {
"nzmax": A.nnz,
"m": A.shape[0],
"n": A.shape[1],
"p": A.indptr.astype(np.int64),
"i": A.indices.astype(np.int64),
"x": A.data.astype(np.float64),
"nz": -1,
}
def dmperm(A) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Dulmage-Mendelsohn decomposition of matrix.
p, q, r, s, cc, rr, m = dmperm(A) finds the Dulmage-Mendelsohn decomposition
of A. p and q are row and column permutation vectors, respectively
such that A[p][:, q] has block upper triangular form. r and s are vectors
indicating the block boundaries for the fine decomposition. cc and rr
are vectors of length five indicating the block boundaries of the
coarse decomposition. m is a maximum matching such that m[j] = i if column j
is matched to row i, or -1 if column j is unmatched.
"""
if sp.issparse(A):
return dmpermlib.dmperm(A)
else:
return dmpermlib.dmperm(sp.csc_matrix(A))
[docs]
def srank(A) -> int:
"""Compute structural rank of matrix."""
# _, _, _, _, rr, _ = dmperm(A)
# return rr[3]
# Extract matching vector m(i) = j
# if column j is matched to row i, or -1 if column j is unmatched.
#
# m = np.full(hod.shape[1], -1)
# m[p[rr[0]:rr[1]]] = q[cc[1]:cc[2]]
# m[p[rr[1]:rr[2]]] = q[cc[2]:cc[3]]
# m[p[rr[2]:rr[3]]] = q[cc[3]:cc[4]]
# return sum(m >= 0)
# _, _, _, _, cc, _, _ = dmperm(A)
# return sum(np.diff(cc[1:]))
_, _, _, _, _, _, m = dmperm(A)
return sum(m >= 0)
@dataclass
class EqBlock:
"""EqBlock class."""
row: np.array = field(default_factory=lambda: np.array([], dtype=np.int64))
col: np.array = field(default_factory=lambda: np.array([], dtype=np.int64))
@dataclass
class DMResult:
"""Dulmage-Mendelsohn decomposition base class."""
Mm: EqBlock = field(default_factory=lambda: EqBlock([], []))
M0: list = field(default_factory=lambda: [])
Mp: EqBlock = field(default_factory=lambda: EqBlock([], []))
rowp: list = field(default_factory=lambda: [])
colp: list = field(default_factory=lambda: [])
M0eqs: list = field(default_factory=lambda: [])
M0vars: list = field(default_factory=lambda: [])
def MSO(X):
"""Compute set of MSO sets."""
if sp.issparse(X):
return dmpermlib.MSO(X)
else:
return dmpermlib.MSO(sp.csc_matrix(X))
[docs]
def GetDMParts(X) -> DMResult:
"""Compute Dulmage-Mendelsohn decomposition."""
if sp.issparse(X):
p, q, r, s, _, _, _ = dmperm(X)
else:
p, q, r, s, _, _, _ = dmperm(sp.csc_matrix(X))
m = X.shape[0]
n = X.shape[1]
res = DMResult()
if (p.size == 0) or (q.size == 0):
if m > n:
res.Mm = EqBlock([], [])
res.Mp = EqBlock(np.arange(0, m), np.arange(0, n))
else:
res.Mp = EqBlock([], [])
res.Mm = EqBlock(np.arange(0, m), np.arange(0, n))
res.rowp = np.arange(1, m)
res.colp = np.arange(1, n)
res.M0 = []
else:
idx = 0
if s[1] > r[1]: # Existance of M-
res.Mm = EqBlock(np.sort(p[r[0] : r[1]]), np.sort(q[s[0] : s[1]]))
idx = idx + 1
else:
res.Mm = EqBlock([], [])
while (idx < r.size - 1) and (r[idx + 1] - r[idx]) == (s[idx + 1] - s[idx]):
# M0 block exists
res.M0.append(EqBlock(np.sort(p[r[idx] : r[idx + 1]]), np.sort(q[s[idx] : s[idx + 1]])))
idx = idx + 1
if idx < r.size - 1: # M+ exists
res.Mp = EqBlock(np.sort(p[r[idx] : r[idx + 1]]), np.sort(q[s[idx] : s[idx + 1]]))
else:
res.Mp = EqBlock([], [])
res.rowp = p
res.colp = q
res.M0eqs = np.array([], dtype=np.int64)
res.M0vars = np.array([], dtype=np.int64)
for hc in res.M0:
res.M0eqs = np.concatenate((res.M0eqs, hc.row))
res.M0vars = np.concatenate((res.M0vars, hc.col))
return res
def PSODecomposition(X) -> dict[list[EqBlock], np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Compute PSO decomposition."""
if not IsPSO(X):
print("PSO Decomposition for PSO structures only, exiting...")
return
n = X.shape[0]
m = X.shape[1]
delrows = np.arange(0, n)
eqclass = []
trivclass = np.array([], dtype=np.int64)
Mi = np.array([])
while len(delrows) > 0:
temprow = np.array([x for x in np.arange(0, n) if not x == delrows[0]])
dm = GetDMParts(X[temprow, :])
if len(dm.M0vars) > 0:
ec_row = np.sort(np.append(temprow[dm.M0eqs], delrows[0]))
ec = EqBlock(ec_row, np.sort(dm.M0vars))
eqclass.append(ec)
Mi = np.concatenate((Mi, ec.col)).astype(np.int64)
delrows = [x for x in delrows if x not in ec.row]
else:
trivclass = np.concatenate((trivclass, [delrows[0]]))
delrows = delrows[1 : len(delrows)]
X0 = np.sort([x for x in np.arange(0, m) if not (x in Mi)])
if len(X0) == 0:
X0 = []
res = {"eqclass": eqclass, "trivclass": trivclass, "X0": X0}
p = np.array([], dtype=np.int64)
q = np.array([], dtype=np.int64)
for ec in eqclass:
p = np.concatenate((p, ec.row))
q = np.concatenate((q, ec.col))
p = np.concatenate((p, trivclass))
q = np.concatenate((q, X0))
res["p"] = p
res["q"] = q
return res
def IsPSO(X, eq=None) -> bool:
"""Return True if PSO."""
if eq is None:
eqs = np.arange(0, X.shape[0])
else:
eqs = eq
dm = GetDMParts(X[eqs, :])
return (len(dm.Mm.row) == 0) and (len(dm.M0) == 0)
def IsObservable(Xin, eq=None) -> bool:
"""Return True if observable."""
def DiffConstraints(X):
diffEqs = np.where(np.any(X == 2, axis=1))[0]
algEqs = np.array([ei for ei in np.arange(0, ne) if not (ei in diffEqs)])
x1Idx = np.argwhere(np.any(X[diffEqs, :] == 2, axis=0)).flatten()
dx1Idx = np.array(list(map(lambda x1i: np.argwhere(X[np.argwhere(X[:, x1i] == 2)[0][0], :] == 3)[0][0], x1Idx)))
x2Idx = np.array([xi for xi in np.arange(0, nx) if (not (xi in x1Idx)) and (not (xi in dx1Idx))])
return diffEqs, algEqs, x1Idx, dx1Idx, x2Idx
if eq is None:
X = Xin
else:
X = Xin[eq, :]
# Remove zero columns
zc = np.where(np.all(X == 0, axis=0))[0]
X = np.delete(X, zc, axis=1)
ne = X.shape[0]
nx = X.shape[1]
diffEqs, algEqs, x1Idx, dx1Idx, x2Idx = DiffConstraints(X)
n1 = len(x1Idx)
n2 = len(x2Idx)
# nalg = len(algEqs)
# Model format:
# x1' = dx1
# 0 = A11 x1 + A12 x2 + A13 dx1
#
# AC-sF=0
A = X[algEqs, :][:, np.hstack((x1Idx, x2Idx, dx1Idx)).astype(np.int64)]
A11 = A[:, np.arange(0, n1)]
A12 = A[:, np.arange(n1, n1 + n2)]
A13 = A[:, np.arange(n1 + n2, 2 * n1 + n2)]
AC = np.vstack((np.hstack((np.zeros((n1, n1 + n2)), np.eye(n1))), A)).astype(np.int64)
F = np.vstack((np.hstack((np.eye(n1), np.zeros((n1, n1 + n2)))), np.zeros((ne - n1, n1 + n2 + n1)))).astype(
np.int64
)
# Condition 1: srank(A-sF)=n1+n2
obs = srank(np.hstack((np.logical_or(A11, A13), A12))) == n1 + n2
if obs:
# Condition 2: v(AC)=2*n1+n2
obs = srank(AC) == 2 * n1 + n2
if obs:
# Condition 3: no s-argc in Hall components of AC-sF
G3 = np.maximum(AC, 2 * F)
dm = GetDMParts(G3)
if len(dm.M0) > 0:
hc_sarcs = list(map(lambda hc: np.any(G3[hc.row, :][:, hc.col] == 2), dm.M0))
obs = len(hc_sarcs) == 0 or (not np.any(hc_sarcs))
return obs
def IsHighIndex(X, eq=None) -> bool:
"""Return True if high differential index."""
if eq is None:
eq = np.arange(0, X.shape[0])
X1 = X[eq, :]
col_d1 = np.any(X1 == 3, axis=0)
row_a = np.all(X1 <= 1, axis=1)
row_d = np.any(X1 > 1, axis=1)
col_2 = np.all(X1[row_d, :] == 0, axis=0)
Xhod = X1[row_a, :][:, np.logical_or(col_d1, col_2)]
nz = np.sum(np.all(Xhod == 0, axis=0))
return srank(Xhod) < Xhod.shape[1] - nz
def IsLowIndex(X, eq=None) -> bool:
"""Return True if low differential index."""
if eq is None:
eq = np.arange(0, X.shape[0])
return not IsHighIndex(X, eq)
def Mplus(X, causality="mixed"):
"""Compute over-determined part."""
def Gp(Gi):
dm = GetDMParts(Gi[2])
if len(dm.Mp.row) == 0 or len(dm.Mp.col) == 0:
return np.array([]), np.array([]), np.array([[]])
G1 = copy.deepcopy(Gi)
return (G1[0][dm.Mp.row], G1[1][dm.Mp.col], G1[2][dm.Mp.row, :][:, dm.Mp.col])
def Gm(Gi):
dm = GetDMParts(Gi[2])
if len(dm.Mm.row) == 0 or len(dm.Mm.col) == 0:
return np.array([]), np.array([]), np.array([[]])
G1 = copy.deepcopy(Gi)
return (G1[0][dm.Mm.row], G1[1][dm.Mm.col], G1[2][dm.Mm.row, :][:, dm.Mm.col])
def G0(Gi):
dm = GetDMParts(Gi[2])
if len(dm.M0eqs) == 0 or len(dm.M0vars) == 0:
return np.array([]), np.array([]), np.array([[]])
G1 = copy.deepcopy(Gi)
return (G1[0][dm.M0eqs], G1[1][dm.M0vars], G1[2][dm.M0eqs, :][:, dm.M0vars])
# def Gadd(G1, G2):
# c1, x1, A1 = copy.deepcopy(G1)
# c2, x2, A2 = copy.deepcopy(G2)
#
# c1 = np.unique(np.concatenate((c1, c2)))
# x1 = np.unique(np.concatenate((x1, x2)))
# A1[A2 == 1] = 1
# A1[A2 == 2] = 2
# return c1, x1, A1
def CGX(Gi, X):
if len(X) == 0:
return []
xIdx = [np.where(Gi[1] == ii)[0][0] for ii in X]
idx = np.unique([e[0] for e in np.argwhere(Gi[2][:, xIdx] > 0)])
if len(idx) == 0:
return []
return Gi[0][idx]
def CGE(Gi, Ei):
return Gi[0][np.where(np.any(Ei, axis=1))[0]]
def GsubC(Gi, C):
c, x, A = copy.deepcopy(Gi)
Cidx = list(map(lambda ci: np.argwhere(Gi[0] == ci)[0][0], C))
A = np.delete(A, Cidx, axis=0)
c = np.array([ci for ci in c if not (ci in C)])
return c, x, A
def GsubX(Gi, X):
c, x, A = copy.deepcopy(Gi)
Xidx = list(map(lambda xi: np.argwhere(Gi[1] == xi)[0][0], X))
A = np.delete(A, Xidx, axis=1)
x = np.array([xi for xi in x if not (xi in X)])
return c, x, A
Xc = np.array(X.copy())
# Represent graph as a tuple G=(constraints,variables, adjacency matrix)
G = (np.arange(0, Xc.shape[0], dtype=np.int64), np.arange(0, Xc.shape[1], dtype=np.int64), Xc)
if causality == "mixed":
return Gp(G)[0]
elif causality == "int":
while True:
# G := G+
G = Gp(G)
# G1 = G - Ed
G1 = copy.deepcopy(G)
G1[2][G1[2] == 3] = 0 # Zero the derivative edges
# G := G - C(G,X(G1-))
dm = GetDMParts(G1[2])
if len(dm.Mm.col) > 0 and len(dm.Mm.row) > 0:
G1m = Gm(G1)
G = GsubC(G, CGX(G, G1m[1]))
else:
break
return G[0]
elif causality == "der":
Xc = []
while True:
# Gnc := G - Xc
Gnc = GsubX(G, Xc)
# Gni := Gnc - C(Gnc,Ei)
Ei = Gnc[2].copy()
Ei[Ei != 2] = 0
Gni = GsubC(Gnc, CGE(Gnc, Ei))
# Xc:= Xc u X(Gni+ u Gni0)
Gnip = Gp(Gni)
Gni0 = G0(Gni)
X1 = np.unique(np.concatenate((Gnip[1], Gni0[1]))).astype(np.int64) # X1 = X(Gni+ u Gni0)
Xc = np.unique(np.concatenate((Xc, X1))).astype(np.int64)
if len(X1) == 0:
break
# G := (G-C(G,X\Xc))+
X1 = np.array([xi for xi in G[1] if not (xi in Xc)]) # X1 = X\Xc
Gres = Gp(GsubC(G, CGX(G, X1)))
return Gres[0]
else:
return np.array([])
def MixedCausalityMatching(Gin):
"""Compute mixed causality matching."""
def FindAdmissibleIntEdge(G):
X = G[2]
dm = GetDMParts(X)
for hallComponent in dm.M0:
Ei = np.argwhere(X[hallComponent.row, :][:, hallComponent.col] == 2)
if len(Ei) > 0:
Ei = Ei[0] # Get the first one
return np.array([G[0][hallComponent.row[Ei[0]]], G[1][hallComponent.col[Ei[1]]]])
return []
G = [Gin[0].copy(), Gin[1].copy(), Gin[2].copy()]
Gamma = []
e = FindAdmissibleIntEdge(G)
while len(e) > 0:
if len(Gamma) == 0:
Gamma = [e]
else:
Gamma = np.concatenate((Gamma, [e]), axis=0)
# G := G - C({e}) - X({e})
rowIdx = np.argwhere(G[0] == e[0])[0, 0]
colIdx = np.argwhere(G[1] == e[1])[0, 0]
G[0] = np.delete(G[0], rowIdx)
G[1] = np.delete(G[1], colIdx)
G[2] = np.delete(G[2], rowIdx, axis=0)
G[2] = np.delete(G[2], colIdx, axis=1)
e = FindAdmissibleIntEdge(G)
dm = GetDMParts(G[2])
Gamma_p = np.stack((G[0][dm.rowp][:], G[1][dm.colp][:]), axis=1)
return np.flipud(np.concatenate((Gamma, Gamma_p), axis=0))
def MTES(self):
"""Compute set of MTES sets."""
S = {"eq": [], "f": [], "sr": []}
m = MTES_initModel(self) # overdetermined or empty
if not (m is None) and m["sr"] > 0 and len(m["f"]) > 0:
S = MTES_FindMTES(m, 0)
# return np.array(S['eq'], dtype=object)
return S["eq"]
def MTES_storeFS(m):
"""internal."""
# eq = np.sort(np.hstack(m['e'])).tolist()
# f = np.sort(np.hstack(m['f'])).tolist()
eq = np.sort(np.hstack(m["e"]))
f = np.sort(np.hstack(m["f"]))
return {"eq": [eq], "f": [f], "sr": [m["sr"]]}
def MTES_initModel(model):
"""internal."""
dm = GetDMParts(model.X)
row_over = dm.Mp.row
col_over = dm.Mp.col
ef = np.argwhere(np.any(model.F, axis=1)).flatten()
ef = np.intersect1d(ef, row_over)
idx = np.hstack((ef, np.setdiff1d(row_over, ef)))
# m = {}
# m['sr'] = len(row_over) - len(col_over)
# m['X'] = model.X[idx, :][:, col_over]
# m['f'] = list(map(lambda fi: np.argwhere(fi)[0], model.F[ef, :]))
# m['delrow'] = 0
# m['e'] = list(map(lambda ei: np.array([ei]), idx))
if len(row_over) > 0:
m = {
"sr": len(row_over) - len(col_over),
"X": model.X[idx, :][:, col_over],
"f": list(map(lambda fi: np.argwhere(fi)[0], model.F[ef, :])),
"delrow": 0,
"e": list(map(lambda ei: np.array([ei]), idx)),
}
else:
m = None
return m
def MTES_GetPartialModel(m, rows):
"""internal."""
n = {}
variables = np.any(m["X"][rows, :], axis=0)
n["X"] = m["X"][rows, :][:, variables]
# n['e'] = list(np.array(m['e'])[rows])
n["e"] = [m["e"][k] for k in range(len(m["e"])) if k in rows]
# n['f'] = list(np.array(m['f'])[[ei for ei in rows if ei < len(m['f'])]])
n["f"] = [m["f"][ei] for ei in rows if ei < len(m["f"])]
n["sr"] = n["X"].shape[0] - n["X"].shape[1]
n["delrow"] = np.sum(rows < m["delrow"])
return n
def MTES_FindMTES(m, p):
"""internal."""
m, _ = MTES_LumpExt(m, 0)
if len(m["f"]) == 1: # m is an MTES
S = MTES_storeFS(m)
else: # recurse
if p == 1:
S = MTES_storeFS(m)
else:
S = {"eq": [], "f": [], "sr": []}
row = m["delrow"]
while len(m["f"]) > row:
m, row = MTES_LumpExt(m, row)
for delrow in np.arange(m["delrow"], len(m["f"])):
# create the model where delrow has been removed
m["delrow"] = delrow
rows = np.delete(np.arange(0, m["X"].shape[0]), delrow)
n = MTES_GetPartialModel(m, rows)
Sn = MTES_FindMTES(n, p) # make recursive call
S = MTES_addResults(S, Sn)
return S
def MTES_LumpExt(m, row):
"""MTES lumping."""
n = {}
no_rows = m["X"].shape[0]
remRows = np.hstack((np.arange(0, row), np.arange(row + 1, no_rows)))
remRowsf = np.hstack((np.arange(0, row), np.arange(row + 1, len(m["f"]))))
dm = GetDMParts(m["X"][remRows, :])
row_just = dm.M0eqs
row_over = np.array(dm.Mp.row, dtype=np.int64)
col_over = np.array(dm.Mp.col, dtype=np.int64)
if len(row_just) > 0:
eqcls = np.hstack((remRows[row_just], [row]))
no_rows_before_row = np.sum(eqcls < row)
row = row - no_rows_before_row
no_rows_before = np.sum(eqcls < m["delrow"])
n["delrow"] = m["delrow"] - no_rows_before
eqclsf = eqcls[eqcls < len(m["f"])]
row_overf = row_over[row_over < len(remRowsf)]
if no_rows_before > 0:
rowinsert = n["delrow"]
else:
rowinsert = row
x1 = m["X"][remRows[row_over[0:rowinsert]], :][:, col_over]
x3 = m["X"][remRows[row_over[rowinsert:]], :][:, col_over]
x2 = np.any(m["X"][eqcls, :][:, col_over], axis=0).astype(np.int64)
n["X"] = np.vstack((x1, x2, x3))
foo = (
remRows[row_over[0:rowinsert]].astype(np.int64)
if len(row_over[0:rowinsert]) > 0
else np.array([], dtype=np.int64)
)
# e1 = list(np.array(m['e'], dtype=np.ndarray)[foo])
e1 = [m["e"][idx] for idx in foo]
# e2 = list([np.hstack(np.array(m['e'], dtype=np.ndarray)[eqcls])])
e2 = list([np.hstack([m["e"][idx] for idx in eqcls])])
foo = (
remRows[row_over[rowinsert:]].astype(np.int64)
if len(row_over[rowinsert:]) > 0
else np.array([], dtype=np.int64)
)
# e3 = list(np.array(m['e'], dtype=np.ndarray)[foo])
e3 = [m["e"][idx] for idx in foo]
n["e"] = e1 + e2 + e3
foo = (
remRowsf[row_overf[0:rowinsert]].astype(np.int64)
if len(row_overf[0:rowinsert]) > 0
else np.array([], dtype=np.int64)
)
ef1 = list(np.array(m["f"], dtype=np.ndarray)[foo])
ef2 = list([np.hstack(np.array(m["f"], dtype=np.ndarray)[eqclsf])])
foo = (
remRowsf[row_overf[rowinsert:]].astype(np.int64)
if len(row_overf[rowinsert:]) > 0
else np.array([], dtype=np.int64)
)
ef3 = list(np.array(m["f"], dtype=np.ndarray)[foo])
n["f"] = ef1 + ef2 + ef3
n["sr"] = m["sr"]
if no_rows_before > 0:
n["delrow"] = n["delrow"] + 1
else:
n = m
row = row + 1
return n, row
def MTES_addResults(S, Sn):
"""internal."""
return {"eq": S["eq"] + Sn["eq"], "f": S["f"] + Sn["f"], "sr": S["sr"] + Sn["sr"]}