"""Data structure for a simplicial complex."""
from itertools import combinations
from typing import Hashable, Iterable
import numpy as np
from scipy.sparse import csr_matrix
from pytspl.decomposition.eigendecomposition import (
get_curl,
get_curl_eigenpair,
get_divergence,
get_gradient_eigenpair,
get_harmonic_eigenpair,
get_total_variance,
)
from pytspl.decomposition.frequency_component import FrequencyComponent
from pytspl.decomposition.hodge_decomposition import (
get_curl_flow,
get_gradient_flow,
get_harmonic_flow,
)
[docs]
class SimplicialComplex:
"""Data structure class for a simplicial complex."""
def __init__(
self,
nodes: list = [],
edges: list = [],
triangles: list = [],
node_features: dict = {},
edge_features: dict = {},
):
"""
Create a simplicial complex from nodes, edges, and triangles.
Args:
nodes (list, optional): List of nodes. Defaults to [].
edges (list, optional): List of edges. Defaults to [].
triangles (list, optional): List of triangles. Defaults to [].
node_features (dict, optional): Dict of node features.
Defaults to {}.
edge_features (dict, optional): Dict of edge features.
Defaults to {}.
"""
self.nodes = nodes
self.edges = edges
self.triangles = triangles
self.node_features = node_features
self.edge_features = edge_features
self.B1 = self.edges_to_B1(edges, len(nodes))
self.B2 = self.triangles_to_B2(triangles, edges)
[docs]
def print_summary(self):
"""
Print the summary of the simplicial complex.
"""
print(f"Num. of nodes: {len(self.nodes)}")
print(f"Num. of edges: {len(self.edges)}")
print(f"Num. of triangles: {len(self.triangles)}")
print(f"Shape: {self.shape}")
print(f"Max Dimension: {self.max_dim}")
[docs]
def edges_to_B1(self, edges: list, num_nodes: int) -> np.ndarray:
"""
Create the B1 matrix (node-edge) from the edges.
Args:
edges (list): List of edges.
num_nodes (int): Number of nodes.
Returns:
np.ndarray: B1 matrix.
"""
B1 = np.zeros((num_nodes, len(edges)))
for j, edge in enumerate(edges):
from_node, to_node = edge
B1[from_node, j] = -1
B1[to_node, j] = 1
return B1
[docs]
def triangles_to_B2(self, triangles: list, edges: list) -> np.ndarray:
"""
Create the B2 matrix (edge-triangle) from the triangles.
Args:
triangles (list): List of triangles.
edges (list): List of edges.
Returns:
np.ndarray: B2 matrix.
"""
B2 = np.zeros((len(edges), len(triangles)))
for j, triangle in enumerate(triangles):
a, b, c = triangle
try:
index_a = edges.index((a, b))
except ValueError:
index_a = edges.index((b, a))
try:
index_b = edges.index((b, c))
except ValueError:
index_b = edges.index((c, b))
try:
index_c = edges.index((a, c))
except ValueError:
index_c = edges.index((c, a))
B2[index_a, j] = 1
B2[index_c, j] = -1
B2[index_b, j] = 1
return B2
[docs]
def generate_coordinates(self) -> dict:
"""
Generate the coordinates of the nodes using spring layout
if the coordinates of the sc don't exist.
Returns:
dict: Coordinates of the nodes.
"""
import networkx as nx
print("WARNING: No coordinates found.")
print("Generating coordinates using spring layout.")
G = nx.Graph()
G.add_nodes_from(self.nodes)
G.add_edges_from(self.edges)
coordinates = nx.spring_layout(G)
return coordinates
@property
def shape(self) -> tuple:
"""Return the shape of the simplicial complex."""
return (len(self.nodes), len(self.edges), len(self.triangles))
@property
def max_dim(self) -> int:
"""Return the maximum dimension of the simplicial complex."""
return max(len(simplex) for simplex in self.simplices) - 1
@property
def simplices(self) -> list[tuple]:
"""
Get all the simplices of the simplicial complex.
This includes 0-simplices (nodes), 1-simplices (edges), 2-simplices.
Returns:
list[tuple]: List of simplices.
"""
nodes = [(node,) for node in self.nodes]
return nodes + self.edges + self.triangles
[docs]
def edge_feature_names(self) -> list[str]:
"""Return the list of edge feature names."""
if len(self.get_edge_features()) == 0:
return []
return list(list(self.get_edge_features().values())[0].keys())
[docs]
def get_node_features(self) -> list[dict]:
"""Return the list of node features."""
return self.node_features
[docs]
def get_edge_features(self, name: str = None) -> list[dict]:
"""Return the list of edge features."""
edge_features = self.edge_features
if name:
try:
return {
key: value[name] for key, value in edge_features.items()
}
except KeyError:
raise KeyError(
f"Edge feature {name} does not exist in"
+ "the simplicial complex."
)
else:
return edge_features
[docs]
def get_faces(self, simplex: Iterable[Hashable]) -> set[tuple]:
"""
Return the faces of the simplex in order.
Args:
simplex (Iterable[Hashable]): Simplex for which to find the faces.
Returns:
set[tuple]: Set of faces of the simplex.
"""
face_set = set()
num_of_nodes = len(simplex)
for r in range(num_of_nodes, 0, -1):
for face in combinations(simplex, r):
face_set.add(tuple(sorted(face)))
k = len(simplex) - 1
face_set = sorted([face for face in face_set if len(face) == k])
return face_set
[docs]
def identity_matrix(self) -> np.ndarray:
"""Identity matrix of the simplicial complex."""
return np.eye(len(self.nodes))
[docs]
def tocsr(self, matrix: np.ndarray) -> csr_matrix:
"""
Convert a numpy array to a csr_matrix.
Args:
matrix (np.ndarray): Numpy array to convert.
Returns:
csr_matrix: Compressed Sparse Row matrix.
"""
return csr_matrix(matrix, dtype=float)
[docs]
def incidence_matrix(self, rank: int) -> csr_matrix:
"""
Compute the incidence matrix of the simplicial complex.
Args:
rank (int): Rank of the incidence matrix.
Returns:
csr_matrix: Incidence matrix of the simplicial complex.
"""
if rank == 0:
return np.ones(len(self.nodes), dtype=float)
elif rank == 1:
return self.tocsr(self.B1)
elif rank == 2:
return self.tocsr(self.B2)
else:
raise ValueError(
"Rank cannot be larger than the dimension of the complex."
)
[docs]
def adjacency_matrix(self) -> csr_matrix:
"""
Compute the adjacency matrix of the simplicial complex.
Returns:
csr_matrix: Adjacency matrix of the simplicial complex.
"""
adjacency_mat = np.zeros((self.B1.shape[0], self.B1.shape[0]))
for col in range(self.B1.shape[1]):
col_nonzero = np.where(self.B1[:, col] != 0)[0]
from_node, to_node = col_nonzero[0], col_nonzero[1]
adjacency_mat[from_node, to_node] = 1
adjacency_mat[to_node, from_node] = 1
adjacency_mat = csr_matrix(adjacency_mat)
return adjacency_mat
[docs]
def laplacian_matrix(self) -> csr_matrix:
"""
Compute the Laplacian matrix of the simplicial complex.
Returns:
csr_matrix: Laplacian matrix of the simplicial complex.
"""
B1 = self.incidence_matrix(rank=1)
return B1 @ B1.T
[docs]
def lower_laplacian_matrix(self, rank: int = 1) -> csr_matrix:
"""
Compute the lower Laplacian matrix of the simplicial complex.
Args:
rank (int): Rank of the lower Laplacian matrix.
ValueError:
If the rank is not 1 or 2.
Returns:
csr_matrix: Lower Laplacian matrix of the simplicial complex.
"""
if rank == 1:
B1 = self.incidence_matrix(rank=1)
return B1.T @ B1
elif rank == 2:
B2 = self.incidence_matrix(rank=2)
return B2.T @ B2
else:
raise ValueError("Rank must be either 1 or 2.")
[docs]
def upper_laplacian_matrix(self, rank: int = 1) -> csr_matrix:
"""
Compute the upper Laplacian matrix of the simplicial complex.
Args:
rank (int): Rank of the upper Laplacian matrix.
ValueError:
If the rank is not 0 or 1.
Returns:
csr_matrix: Upper Laplacian matrix of the simplicial complex.
"""
if rank == 0:
return self.laplacian_matrix()
elif rank == 1:
B2 = self.incidence_matrix(rank=2)
return B2 @ B2.T
else:
raise ValueError("Rank must be either 0 or 1.")
[docs]
def hodge_laplacian_matrix(self, rank: int = 1) -> csr_matrix:
"""
Compute the Hodge Laplacian matrix of the simplicial complex.
Args:
rank (int): Rank of the Hodge Laplacian matrix.
ValueError:
If the rank is not 0, 1, or 2.
Returns:
csr_matrix: Hodge Laplacian matrix of the simplicial complex.
"""
if rank == 0:
return self.laplacian_matrix()
elif rank == 1:
return self.lower_laplacian_matrix(
rank=rank
) + self.upper_laplacian_matrix(rank=rank)
else:
raise ValueError("Rank must be between 0 and 2.")
[docs]
def apply_lower_shifting(
self, flow: np.ndarray, steps: int = 1
) -> np.ndarray:
"""
Apply the lower shifting operator to the simplicial complex.
Args:
flow (np.ndarray): Flow on the simplicial complex.
steps (int): Number of times to apply the lower shifting operator.
Defaults to 1.
Returns:
np.ndarray: Lower shifted simplicial complex.
"""
L1L = self.lower_laplacian_matrix(rank=1)
if steps == 1:
# L(1, l) @ f
flow = L1L @ flow
else:
# L(1, l)**2 @ f
flow = L1L @ (L1L @ flow)
return flow
[docs]
def apply_upper_shifting(
self, flow: np.ndarray, steps: int = 1
) -> np.ndarray:
"""
Apply the upper shifting operator to the simplicial complex.
Args:
flow (np.ndarray): Flow on the simplicial complex.
steps (int): Number of times to apply the upper shifting operator.
Defaults to 1.
Returns:
np.ndarray: Upper shifted simplicial complex.
"""
L1U = self.upper_laplacian_matrix(rank=1)
if steps == 1:
# L(1, u) @ f
flow = L1U @ flow
else:
# L(1, u)**2 @ f
flow = L1U @ (L1U @ flow)
return flow
[docs]
def apply_k_step_shifting(
self, flow: np.ndarray, steps: int = 2
) -> np.ndarray:
"""
Apply the k-step shifting operator to the simplicial complex.
Args:
flow (np.ndarray): Flow on the simplicial complex.
Returns:
np.ndarray: k-step shifted simplicial complex.
"""
two_step_lower_shifting = self.apply_lower_shifting(flow, steps=steps)
two_step_upper_shifting = self.apply_upper_shifting(flow, steps=steps)
return two_step_lower_shifting + two_step_upper_shifting
[docs]
def get_simplicial_embeddings(self, flow: np.ndarray) -> tuple:
"""
Return the simplicial embeddings of the simplicial complex.
Args:
flow (np.ndarray): Flow on the simplicial complex.
Returns:
np.ndarray: Simplicial embeddings of the simplicial complex.
Harmonic embedding, curl embedding, and gradient embedding.
"""
k = 1
L1 = self.hodge_laplacian_matrix(rank=k).toarray()
L1U = self.upper_laplacian_matrix(rank=k).toarray()
L1L = self.lower_laplacian_matrix(rank=k).toarray()
# eigendecomposition
u_h, _ = get_harmonic_eigenpair(L1, tolerance=1e-3)
u_c, _ = get_curl_eigenpair(L1U, 1e-3)
u_g, _ = get_gradient_eigenpair(L1L, 1e-3)
# each entry of an embedding represents the weight the flow has on the
# corresponding eigenvector
f_tilda_h = u_h.T @ flow
f_tilda_c = u_c.T @ flow
f_tilda_g = u_g.T @ flow
return f_tilda_h, f_tilda_c, f_tilda_g
[docs]
def get_component_eigenpair(
self,
component: str = FrequencyComponent.HARMONIC.value,
tolerance: float = 1e-3,
) -> tuple:
"""
Return the eigendecomposition of the simplicial complex.
Args:
component (str, optional): Component of the eigendecomposition
to return. Defaults to "harmonic".
tolerance (float, optional): Tolerance for eigenvalues to be
considered zero. Defaults to 1e-3.
ValueError:
If the component is not one of 'harmonic', 'curl', or 'gradient'.
Returns:
np.ndarray: Eigenvectors of the component.
np.ndarray: Eigenvalues of the component.
"""
if component == FrequencyComponent.HARMONIC.value:
L1 = self.hodge_laplacian_matrix(rank=1).toarray()
u_h, eig_h = get_harmonic_eigenpair(L1, tolerance)
return u_h, eig_h
elif component == FrequencyComponent.CURL.value:
L1U = self.upper_laplacian_matrix(rank=1).toarray()
u_c, eig_c = get_curl_eigenpair(L1U, tolerance)
return u_c, eig_c
elif component == FrequencyComponent.GRADIENT.value:
L1L = self.lower_laplacian_matrix(rank=1).toarray()
u_g, eig_g = get_gradient_eigenpair(L1L, tolerance)
return u_g, eig_g
else:
raise ValueError(
"Invalid component. Choose from 'harmonic',"
+ "'curl', or 'gradient'."
)
[docs]
def get_total_variance(self) -> np.ndarray:
"""
Get the total variance of the SC.
Returns:
np.ndarray: The total variance of the SC.
"""
laplacian_matrix = self.laplacian_matrix()
return get_total_variance(laplacian_matrix)
[docs]
def get_divergence(self, flow: np.ndarray) -> np.ndarray:
"""
Get the divergence of the edge flow.
Args:
flow (np.ndarray): The edge flow defined over a SC.
Returns:
np.ndarray: The divergence of the edge flow.
"""
B1 = self.incidence_matrix(rank=1)
return get_divergence(B1, flow)
[docs]
def get_curl(self, flow: np.ndarray) -> np.ndarray:
"""
Get the curl of the edge flow.
Args:
flow (np.ndarray): The edge flow defined over a SC.
Returns:
np.ndarray: The curl of the edge flow.
"""
B2 = self.incidence_matrix(rank=2)
return get_curl(B2, flow)
[docs]
def get_component_flow(
self,
flow: np.ndarray,
component: str = FrequencyComponent.GRADIENT.value,
round_fig: bool = True,
round_sig_fig: int = 2,
) -> np.ndarray:
"""
Return the component flow of the simplicial complex
using the Hodge decomposition.
Args:
flow (np.ndarray): Flow on the simplicial complex.
component (str, optional): Component of the Hodge decomposition.
Defaults to FrequencyComponent.GRADIENT.value.
round_fig (bool, optional): Round the hodgedecomposition to the
Default to True.
round_sig_fig (int, optional): Round to significant figure.
Defaults to 2.
Returns:
np.ndarray: Hodge decomposition of the edge flow.
"""
B1 = self.incidence_matrix(rank=1)
B2 = self.incidence_matrix(rank=2)
if component == FrequencyComponent.HARMONIC.value:
f_h = get_harmonic_flow(
B1=B1,
B2=B2,
flow=flow,
round_fig=round_fig,
round_sig_fig=round_sig_fig,
)
return f_h
elif component == FrequencyComponent.CURL.value:
f_c = get_curl_flow(
B2=B2,
flow=flow,
round_fig=round_fig,
round_sig_fig=round_sig_fig,
)
return f_c
elif component == FrequencyComponent.GRADIENT.value:
f_g = get_gradient_flow(
B1=B1,
flow=flow,
round_fig=round_fig,
round_sig_fig=round_sig_fig,
)
return f_g
else:
raise ValueError(
"Invalid component. Choose from 'harmonic',"
+ "'curl', or 'gradient'."
)