High performance graph data structures and algorithms for Python
—
Specialized classes for representing and manipulating clustering results from community detection algorithms. These objects provide comprehensive functionality for analyzing, comparing, and visualizing community structures.
Core functionality for working with any type of clustering or partition of ordered sets.
class Clustering:
def __init__(self, membership, n=None):
"""
Create clustering from membership vector.
Parameters:
- membership: list of int, cluster assignment for each element
- n: int, total number of elements (None for auto-detection)
"""
def __len__(self):
"""
Get number of clusters.
Returns:
int, cluster count
"""
def __getitem__(self, idx):
"""
Get members of specific cluster.
Parameters:
- idx: int, cluster index
Returns:
list of int, member indices in cluster
"""
def size(self, idx):
"""
Get size of specific cluster.
Parameters:
- idx: int, cluster index
Returns:
int, number of elements in cluster
"""
def sizes(self):
"""
Get sizes of all clusters.
Returns:
list of int, cluster sizes
"""
def summary(self, n=None):
"""
Get summary statistics of clustering.
Parameters:
- n: int, number of largest clusters to show
Returns:
str, formatted summary
"""Usage Examples:
import igraph as ig
# Create clustering from membership vector
membership = [0, 0, 0, 1, 1, 2, 2, 2, 2]
clustering = ig.Clustering(membership)
# Basic operations
print(f"Number of clusters: {len(clustering)}")
print(f"Cluster 0 members: {clustering[0]}")
print(f"Cluster sizes: {clustering.sizes()}")
print(f"Largest cluster size: {clustering.size(clustering.sizes().index(max(clustering.sizes())))}")
# Summary information
print(clustering.summary())
# Iterate over clusters
for i, cluster in enumerate(clustering):
print(f"Cluster {i}: {cluster}")Graph-specific clustering with enhanced functionality for network community analysis.
class VertexClustering(Clustering):
def __init__(self, graph, membership=None):
"""
Create vertex clustering for graph.
Parameters:
- graph: Graph object
- membership: list, community assignment (None for singleton clusters)
"""
@classmethod
def FromAttribute(cls, graph, attribute):
"""
Create clustering from vertex attribute.
Parameters:
- graph: Graph object
- attribute: str, vertex attribute name containing cluster IDs
Returns:
VertexClustering based on attribute values
"""
@property
def graph(self):
"""Get associated graph object."""
@property
def membership(self):
"""Get membership vector."""
@property
def modularity(self):
"""
Modularity score of the clustering.
Returns:
float, modularity score (-0.5 to 1.0)
Weighted modularity can be computed with Graph.modularity(membership, weights=...).
"""
def crossing(self):
"""
Identify edges crossing between communities.
Returns:
list of bool, True for edges crossing communities
"""
def subgraph(self, idx):
"""
Get induced subgraph for specific community.
Parameters:
- idx: int, community index
Returns:
Graph, subgraph containing only community members
"""
def subgraphs(self):
"""
Get all community subgraphs.
Returns:
list of Graph, subgraphs for each community
"""
def giant(self):
"""
Get the largest community.
Returns:
Graph, subgraph induced by the largest community
"""
Usage Examples:
# Create sample graph with communities
g = ig.Graph.Famous("Zachary") # Karate club network
# Community detection
leiden_comm = g.community_leiden(objective_function="modularity")  # optimize modularity (default objective is CPM)
print(f"Type: {type(leiden_comm)}") # VertexClustering
# Modularity analysis
modularity = leiden_comm.modularity
print(f"Modularity: {modularity:.3f}")
# Weighted modularity (if edges have weights)
if "weight" in g.es.attributes():
    weighted_mod = g.modularity(leiden_comm.membership, weights="weight")
    print(f"Weighted modularity: {weighted_mod:.3f}")
# Edge crossing analysis
crossing_edges = leiden_comm.crossing()
n_crossing = sum(crossing_edges)
print(f"Edges crossing communities: {n_crossing}/{g.ecount()}")
# Community subgraphs
subgraphs = leiden_comm.subgraphs()
for i, sg in enumerate(subgraphs):
print(f"Community {i}: {sg.vcount()} vertices, {sg.ecount()} edges")
# Largest community
giant_community = leiden_comm.giant()
print(f"Largest community has {giant_community.vcount()} vertices and {giant_community.ecount()} edges")
# Create clustering from vertex attribute
g.vs["type"] = ["A", "A", "B", "B", "B", "C", "C"] # Example attributes
attr_clustering = ig.VertexClustering.FromAttribute(g, "type")
print(f"Attribute-based clustering: {attr_clustering.membership}")Tree-like structures representing hierarchical community organization.
class Dendrogram:
def __init__(self, merges, n=None, nmerges=None):
"""
Create dendrogram from merge information.
Parameters:
- merges: list of tuples, merge steps (cluster1, cluster2)
- n: int, number of leaves
- nmerges: int, number of merges
"""
def format(self, format="newick"):
"""
Export dendrogram in standard format.
Parameters:
- format: str, output format ("newick" supported)
Returns:
str, formatted dendrogram
"""
def summary(self):
"""
Get ASCII art summary of dendrogram.
Returns:
str, tree visualization
"""
class VertexDendrogram(Dendrogram):
def __init__(self, graph, merges, modularity=None):
"""
Create vertex dendrogram for graph.
Parameters:
- graph: Graph object
- merges: merge steps
- modularity: list, modularity at each merge level
"""
def as_clustering(self, n=None):
"""
Convert dendrogram to flat clustering.
Parameters:
- n: int, number of clusters (None for optimal)
Returns:
VertexClustering at specified level
"""
@property
def optimal_count(self):
"""
Get optimal number of clusters (highest modularity).
Returns:
int, optimal cluster count
"""Usage Examples:
# Hierarchical community detection
walktrap_dendro = g.community_walktrap()
print(f"Type: {type(walktrap_dendro)}") # VertexDendrogram
# Optimal clustering
optimal_n = walktrap_dendro.optimal_count
print(f"Optimal number of clusters: {optimal_n}")
optimal_clustering = walktrap_dendro.as_clustering()
print(f"Optimal modularity: {optimal_clustering.modularity():.3f}")
# Explore different cut levels
for n_clusters in [2, 3, 4, 5]:
    clustering = walktrap_dendro.as_clustering(n_clusters)
    print(f"{n_clusters} clusters: modularity {clustering.modularity:.3f}")
# Export formats
newick_str = walktrap_dendro.format("newick")
print("Newick format:", newick_str[:100], "...")
# ASCII visualization
print(walktrap_dendro.summary())
# Fast greedy also returns dendrogram
fastgreedy_dendro = g.community_fastgreedy()
fg_optimal = fastgreedy_dendro.as_clustering()
Handle overlapping communities where vertices can belong to multiple clusters.
class Cover:
def __init__(self, n=0):
"""
Create empty cover structure.
Parameters:
- n: int, number of elements
"""
def add_cluster(self, cluster):
"""
Add cluster to cover.
Parameters:
- cluster: list, members of new cluster
Returns:
None
"""
def size(self, idx):
"""
Get size of specific cluster.
Parameters:
- idx: int, cluster index
Returns:
int, cluster size
"""
class VertexCover(Cover):
def __init__(self, graph, clusters=None):
"""
Create vertex cover for graph.
Parameters:
- graph: Graph object
- clusters: list of lists, cluster memberships
"""
@property
def graph(self):
"""Get associated graph."""
def subgraph(self, idx):
"""
Get subgraph for cluster (may include overlaps).
Parameters:
- idx: int, cluster index
Returns:
Graph, cluster subgraph
"""Usage Examples:
# Create overlapping communities manually
vertices = list(range(g.vcount()))
overlapping_clusters = [
[0, 1, 2, 5], # Cluster 0
[2, 3, 4, 5], # Cluster 1 (vertices 2,5 overlap)
[4, 6, 7, 8] # Cluster 2 (vertex 4 overlaps)
]
cover = ig.VertexCover(g, overlapping_clusters)
# Analyze overlaps
for i in range(len(overlapping_clusters)):
    subgraph = cover.subgraph(i)
    print(f"Cluster {i}: {cover.size(i)} vertices, {subgraph.ecount()} edges")
# Find overlapping vertices
all_vertices = set()
overlapping = set()
for cluster in overlapping_clusters:
    for v in cluster:
        if v in all_vertices:
            overlapping.add(v)
        all_vertices.add(v)
print(f"Overlapping vertices: {overlapping}")Specialized clustering for cohesive block analysis (k-core based communities).
class CohesiveBlocks:
def __init__(self, graph, blocks=None, cohesion=None, parent=None):
"""
Create cohesive blocks structure.
Parameters:
- graph: Graph object
- blocks: list, block membership
- cohesion: list, cohesion values for blocks
- parent: list, parent blocks in hierarchy
"""
@property
def graph(self):
"""Get associated graph."""
def cohesion(self, idx):
"""
Get cohesion (vertex connectivity) of a specific block.
Parameters:
- idx: int, block index
Returns:
int, cohesion of the block
"""
def cohesions(self):
"""
Get cohesion values of all blocks.
Returns:
list of int, cohesion value per block
"""
def hierarchy(self):
"""
Get block hierarchy information.
Returns:
Graph, tree describing parent-child relationships between blocks
"""
Usage Examples:
# Cohesive blocks analysis
cohesive = g.cohesive_blocks()
print(f"Type: {type(cohesive)}") # CohesiveBlocks
# Block information
n_blocks = len(cohesive)
print(f"Number of cohesive blocks: {n_blocks}")
# Cohesion levels
cohesions = cohesive.cohesions()
print(f"Cohesion levels: {cohesions}")
# Hierarchy structure
hierarchy = cohesive.hierarchy()
print("Block hierarchy available")Compare different clustering results to understand algorithmic differences.
def compare_communities(comm1, comm2, method="vi", remove_none=False):
"""
Compare two clusterings.
Parameters:
- comm1, comm2: Clustering objects
- method: str, comparison method
("vi" - variation of information, "nmi" - normalized mutual information,
"split-join", "rand", "adjusted_rand")
- remove_none: bool, ignore unassigned elements
Returns:
float, similarity/distance measure
"""
def split_join_distance(comm1, comm2):
"""
Calculate split-join distance.
Parameters:
- comm1, comm2: Clustering objects
Returns:
tuple of int, the distance of comm1 from comm2 and of comm2 from comm1
(their sum is the total split-join distance)
"""
Usage Examples:
from igraph import compare_communities, split_join_distance
# Generate different clusterings
leiden_comm = g.community_leiden(resolution=1.0)
louvain_comm = g.community_multilevel()  # Louvain method
walktrap_comm = g.community_walktrap().as_clustering()
infomap_comm = g.community_infomap()
clusterings = [
("Leiden", leiden_comm),
("Louvain", louvain_comm),
("Walktrap", walktrap_comm),
("Infomap", infomap_comm)
]
# Compare all pairs
print("Variation of Information (lower = more similar):")
for i, (name1, comm1) in enumerate(clusterings):
    for j, (name2, comm2) in enumerate(clusterings):
        if i < j:  # Avoid duplicate comparisons
            vi = compare_communities(comm1, comm2, method="vi")
            nmi = compare_communities(comm1, comm2, method="nmi")
            sj = sum(split_join_distance(comm1, comm2))  # sum of both directed distances
            print(f"{name1} vs {name2}: VI={vi:.3f}, NMI={nmi:.3f}, SJ={sj}")
# Consensus clustering
def consensus_clustering(clusterings):
    """Create a consensus (co-occurrence) matrix from multiple clusterings."""
    n = len(clusterings[0].membership)
    consensus_matrix = [[0 for _ in range(n)] for _ in range(n)]
    # Count how often each pair of vertices lands in the same cluster
    for clustering in clusterings:
        for i in range(n):
            for j in range(n):
                if clustering.membership[i] == clustering.membership[j]:
                    consensus_matrix[i][j] += 1
    # Normalize to co-clustering frequencies in [0, 1]
    for i in range(n):
        for j in range(n):
            consensus_matrix[i][j] /= len(clusterings)
    return consensus_matrix
# Create consensus
all_comms = [comm for _, comm in clusterings]
consensus = consensus_clustering(all_comms)
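# One common way to turn the consensus matrix into a final partition (a minimal sketch,
# assuming a 0.5 agreement threshold): connect vertex pairs that are co-clustered in at
# least half of the runs, then take connected components of that graph.
# connected_components() is the python-igraph 0.10+ name (clusters() in older releases).
n_vertices = g.vcount()
stable_edges = [
    (i, j)
    for i in range(n_vertices)
    for j in range(i + 1, n_vertices)
    if consensus[i][j] >= 0.5
]
consensus_graph = ig.Graph(n=n_vertices, edges=stable_edges)
consensus_partition = consensus_graph.connected_components()
print(f"Consensus partition: {len(consensus_partition)} groups")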
print("Consensus matrix computed")Assess clustering quality using various internal and external measures.
Usage Examples:
def evaluate_clustering(graph, clustering):
    """Comprehensive clustering evaluation."""
    results = {}
    # Basic statistics
    results["n_clusters"] = len(clustering)
    results["sizes"] = clustering.sizes()
    results["modularity"] = clustering.modularity
    # Size distribution
    sizes = clustering.sizes()
    results["size_mean"] = sum(sizes) / len(sizes)
    results["size_std"] = (sum((s - results["size_mean"])**2 for s in sizes) / len(sizes))**0.5
    results["size_max"] = max(sizes)
    results["size_min"] = min(sizes)
    # Coverage and crossing edges
    crossing = clustering.crossing()
    results["coverage"] = 1 - sum(crossing) / len(crossing)
    results["internal_edges"] = len(crossing) - sum(crossing)
    results["external_edges"] = sum(crossing)
    # Conductance (for each cluster)
    conductances = []
    for i in range(len(clustering)):
        cluster_vertices = set(clustering[i])
        if len(cluster_vertices) > 0:
            subgraph = clustering.subgraph(i)
            internal_degree = sum(subgraph.degree())  # twice the internal edge count
            # External degree: edge endpoints leaving the cluster
            external_degree = 0
            for v in cluster_vertices:
                for neighbor in graph.neighbors(v):
                    if neighbor not in cluster_vertices:
                        external_degree += 1
            total_degree = internal_degree + external_degree
            conductance = external_degree / total_degree if total_degree > 0 else 0
            conductances.append(conductance)
    results["avg_conductance"] = sum(conductances) / len(conductances) if conductances else 0
    return results
# Evaluate different algorithms
for name, comm in clusterings:
    eval_results = evaluate_clustering(g, comm)
    print(f"\n{name}:")
    print(f"  Clusters: {eval_results['n_clusters']}")
    print(f"  Modularity: {eval_results['modularity']:.3f}")
    print(f"  Coverage: {eval_results['coverage']:.3f}")
    print(f"  Avg conductance: {eval_results['avg_conductance']:.3f}")
    print(f"  Size range: {eval_results['size_min']}-{eval_results['size_max']}")
# Stability analysis
def clustering_stability(graph, algorithm_func, n_trials=10):
    """Assess clustering stability across multiple runs."""
    clusterings = []
    for _ in range(n_trials):
        clustering = algorithm_func()
        clusterings.append(clustering)
    # Compare all pairs
    similarities = []
    for i in range(len(clusterings)):
        for j in range(i + 1, len(clusterings)):
            nmi = compare_communities(clusterings[i], clusterings[j], method="nmi")
            similarities.append(nmi)
    avg_similarity = sum(similarities) / len(similarities)
    return avg_similarity, similarities
# Test stability
leiden_stability = clustering_stability(g, lambda: g.community_leiden())
print(f"\\nLeiden stability (NMI): {leiden_stability[0]:.3f}")Install with Tessl CLI
npx tessl i tessl/pypi-igraph