A high-performance graph library for Python implemented in Rust, providing fast graph algorithms and data structures for computational tasks.
—
Event-driven traversal algorithms using the visitor pattern for breadth-first search, depth-first search, and Dijkstra's algorithm. rustworkx provides customizable traversal with event handling for implementing complex graph algorithms.
BFS traversal with visitor pattern for event-driven algorithm implementation.
def bfs_search(graph, source, visitor):
    """
    Breadth-first search traversal with visitor callbacks.

    Explores graph level by level from source nodes, calling visitor
    methods at key algorithm events for custom processing.

    Parameters:
    - graph: Input graph (PyGraph or PyDiGraph)
    - source: List of source node indices or single node index
    - visitor: BFSVisitor instance with event callback methods

    Returns:
    None (results communicated through visitor methods)

    Visitor control-flow exceptions:
    - StopSearch: Raised by a visitor callback to terminate early; the
      search then returns normally instead of re-raising it
    - PruneSearch: Raised by a visitor callback to skip subtrees
    """
def dfs_edges(graph, source=None):
    """
    Get depth-first search tree edges.

    Returns edges discovered during DFS traversal, forming
    a spanning forest of the graph.

    Parameters:
    - graph: Input graph (PyGraph or PyDiGraph)
    - source (int, optional): Starting node, if None searches all components

    Returns:
    EdgeList: List of tree edges as (source, target) tuples
    """


# DFS traversal with visitor pattern and timing information.
def dfs_search(graph, source, visitor):
    """
    Depth-first search traversal with visitor callbacks.

    Explores graph by going as deep as possible before backtracking,
    providing discovery and finish times for each node.

    Parameters:
    - graph: Input graph (PyGraph or PyDiGraph)
    - source: List of source node indices or single node index
    - visitor: DFSVisitor instance with event callback methods

    Returns:
    None (results communicated through visitor methods)

    Visitor control-flow exceptions:
    - StopSearch: Raised by a visitor callback to terminate early; the
      search then returns normally instead of re-raising it
    - PruneSearch: Raised by a visitor callback to skip subtrees
    """


# Dijkstra's algorithm with visitor pattern for shortest path exploration.
def dijkstra_search(graph, source, weight_fn, visitor):
    """
    Dijkstra shortest path search with visitor callbacks.

    Explores graph in order of increasing distance from source,
    calling visitor methods for path relaxation events.

    Parameters:
    - graph: Input graph (PyGraph or PyDiGraph)
    - source: List of source node indices or single node index
    - weight_fn (callable): Function to extract edge weights, or None for unit weights
    - visitor: DijkstraVisitor instance with event callback methods

    Returns:
    None (results communicated through visitor methods)

    Visitor control-flow exceptions:
    - StopSearch: Raised by a visitor callback to terminate early; the
      search then returns normally instead of re-raising it
    - PruneSearch: Raised by a visitor callback to skip subtrees
    """


# Base class for breadth-first search event handling.
class BFSVisitor:
    """
    Visitor base class for BFS algorithm events.

    Override methods to implement custom BFS-based algorithms.
    All methods are optional with default no-op implementations.
    """

    def discover_vertex(self, vertex: int):
        """
        Called when vertex is first discovered (colored gray).

        Parameters:
        - vertex (int): Node index being discovered
        """
        pass

    def examine_edge(self, edge: tuple):
        """
        Called on every out-edge of each vertex after discovery.

        NOTE(review): rustworkx's documented BFS events do not appear to
        include an edge-examination hook; confirm this callback is invoked
        before relying on it (tree_edge / non_tree_edge cover every edge).

        Parameters:
        - edge (tuple): Edge as (source, target, weight) tuple
        """
        pass

    def tree_edge(self, edge: tuple):
        """
        Called on each edge in the BFS tree.

        These are edges to previously undiscovered vertices.

        Parameters:
        - edge (tuple): Tree edge as (source, target, weight) tuple
        """
        pass

    def non_tree_edge(self, edge: tuple):
        """
        Called on back or cross edges (non-tree edges).

        Parameters:
        - edge (tuple): Non-tree edge as (source, target, weight) tuple
        """
        pass

    def gray_target_edge(self, edge: tuple):
        """
        Called on edges to gray (discovered but unfinished) vertices.

        Parameters:
        - edge (tuple): Edge to gray vertex as (source, target, weight) tuple
        """
        pass

    def black_target_edge(self, edge: tuple):
        """
        Called on edges to black (finished) vertices.

        Parameters:
        - edge (tuple): Edge to black vertex as (source, target, weight) tuple
        """
        pass

    def finish_vertex(self, vertex: int):
        """
        Called when vertex processing is complete (colored black).

        Parameters:
        - vertex (int): Node index being finished
        """
        pass


# Base class for depth-first search event handling with timing information.
class DFSVisitor:
    """
    Visitor base class for DFS algorithm events.

    Override methods to implement custom DFS-based algorithms.
    Provides discovery and finish timing information.
    """

    def discover_vertex(self, vertex: int, timestamp: int):
        """
        Called when vertex is first discovered.

        Parameters:
        - vertex (int): Node index being discovered
        - timestamp (int): Discovery time
        """
        pass

    def tree_edge(self, edge: tuple):
        """
        Called on tree edges (to undiscovered vertices).

        Parameters:
        - edge (tuple): Tree edge as (source, target, weight) tuple
        """
        pass

    def back_edge(self, edge: tuple):
        """
        Called on back edges (to ancestors in DFS tree).

        In undirected graphs, these may indicate cycles.

        Parameters:
        - edge (tuple): Back edge as (source, target, weight) tuple
        """
        pass

    def forward_or_cross_edge(self, edge: tuple):
        """
        Called on forward edges (to descendants) or cross edges.

        Parameters:
        - edge (tuple): Forward/cross edge as (source, target, weight) tuple
        """
        pass

    def finish_vertex(self, vertex: int, timestamp: int):
        """
        Called when vertex processing is complete.

        Parameters:
        - vertex (int): Node index being finished
        - timestamp (int): Finish time
        """
        pass


# Base class for Dijkstra algorithm event handling with distance information.
class DijkstraVisitor:
    """
    Visitor base class for Dijkstra algorithm events.

    Override methods to implement custom shortest path algorithms.
    Provides distance information for path relaxation.
    """

    def discover_vertex(self, vertex: int, cost: float):
        """
        Called when vertex is first discovered with its distance.

        Parameters:
        - vertex (int): Node index being discovered
        - cost (float): Shortest distance found to this vertex
        """
        pass

    def examine_edge(self, edge: tuple):
        """
        Called on every out-edge of each vertex.

        Parameters:
        - edge (tuple): Edge being examined as (source, target, weight) tuple
        """
        pass

    def edge_relaxed(self, edge: tuple):
        """
        Called when edge relaxation improves shortest path.

        Parameters:
        - edge (tuple): Relaxed edge as (source, target, weight) tuple
        """
        pass

    def edge_not_relaxed(self, edge: tuple):
        """
        Called when edge relaxation does not improve path.

        Parameters:
        - edge (tuple): Non-relaxed edge as (source, target, weight) tuple
        """
        pass

    def finish_vertex(self, vertex: int, cost: float = 0.0):
        """
        Called when vertex is finished (optimal distance found).

        Parameters:
        - vertex (int): Node index being finished
        - cost (float, optional): Distance of the finished vertex. The
          rustworkx Dijkstra search reports the vertex together with its
          distance, so overrides should accept this second argument; the
          default only keeps existing single-argument calls working.
        """
        pass


# Exception for early termination of graph traversal.
class StopSearch(Exception):
    """
    Raise this exception in visitor methods to stop traversal early.

    The search function will return normally without re-raising
    this exception, allowing clean early termination.
    """
    pass


# Exception for pruning search subtrees.
class PruneSearch(Exception):
    """
    Raise this exception in visitor methods to prune search subtree.

    Prevents further exploration of the current branch while
    continuing search in other areas of the graph.

    Note: Raising in finish_vertex events causes an error.
    """
import rustworkx as rx
from rustworkx.visit import BFSVisitor, StopSearch, PruneSearch


class PathFinder(BFSVisitor):
    """BFS visitor that records parent links and stops once `target` is reached."""

    def __init__(self, target):
        self.target = target
        self.parent = {}  # maps each reached node to the node it was reached from
        self.found = False

    def tree_edge(self, edge):
        """Record the tree edge and end the search when it reaches the target."""
        source, target, _ = edge
        self.parent[target] = source
        if target == self.target:
            self.found = True
            raise rx.visit.StopSearch()

    def get_path(self, start):
        """
        Rebuild the node path from `start` to the target.

        Returns None if the target was never reached. `start` is expected
        to be the node the search was started from, since the walk follows
        the recorded parent links back until it meets `start`.
        """
        if not self.found:
            return None
        path = []
        current = self.target
        while current != start:
            path.append(current)
            current = self.parent[current]
        path.append(start)
        return list(reversed(path))
# Find path using BFS
graph = rx.generators.grid_graph(5, 5)
finder = PathFinder(target=24)  # Bottom-right corner
rx.bfs_search(graph, [0], finder)  # Start from top-left

if finder.found:
    path = finder.get_path(0)
    print(f"Path found: {path}")
else:
    print("No path found")


from rustworkx.visit import DFSVisitor
class CycleDetector(DFSVisitor):
    """DFS visitor that stops at the first back edge and remembers it."""

    def __init__(self):
        self.has_cycle = False
        self.cycle_edge = None

    def back_edge(self, edge):
        """Record the back edge as evidence of a cycle and end the search."""
        self.has_cycle = True
        self.cycle_edge = edge
        raise rx.visit.StopSearch()
# Detect cycles in directed graph
digraph = rx.PyDiGraph()
nodes = digraph.add_nodes_from(['A', 'B', 'C'])
digraph.add_edges_from([
    (nodes[0], nodes[1], None),
    (nodes[1], nodes[2], None),
    (nodes[2], nodes[0], None),  # Creates cycle
])

detector = CycleDetector()
rx.dfs_search(digraph, [nodes[0]], detector)

if detector.has_cycle:
    print(f"Cycle detected on edge: {detector.cycle_edge}")
else:
    print("No cycles found")


from rustworkx.visit import DijkstraVisitor
class ShortestPathTree(DijkstraVisitor):
    """Dijkstra visitor that collects distances and predecessor links from `start`."""

    def __init__(self, start):
        self.start = start
        self.distances = {}  # node -> cost reported at discovery
        self.predecessors = {}  # node -> source of the last relaxed edge into it

    def discover_vertex(self, vertex, cost):
        """Store the distance reported for a newly discovered vertex."""
        self.distances[vertex] = cost

    def edge_relaxed(self, edge):
        """Remember the source of the relaxed edge as the target's predecessor."""
        source, target, _ = edge
        self.predecessors[target] = source

    def get_path_to(self, target):
        """
        Rebuild the path from the start node to `target`.

        Returns None when `target` has no recorded predecessor and is not
        the start node itself.
        """
        if target not in self.predecessors and target != self.start:
            return None
        path = []
        current = target
        while current != self.start:
            path.append(current)
            current = self.predecessors[current]
        path.append(self.start)
        return list(reversed(path))
# Build shortest path tree
weighted_graph = rx.PyGraph()
nodes = weighted_graph.add_nodes_from(['S', 'A', 'B', 'T'])
weighted_graph.add_edges_from([
    (nodes[0], nodes[1], 2.0),
    (nodes[0], nodes[2], 4.0),
    (nodes[1], nodes[3], 3.0),
    (nodes[2], nodes[3], 1.0),
])

spt = ShortestPathTree(nodes[0])
rx.dijkstra_search(
    weighted_graph,
    [nodes[0]],
    weight_fn=lambda x: x,
    visitor=spt
)

# Get paths to all reachable nodes
for node in nodes[1:]:
    path = spt.get_path_to(node)
    distance = spt.distances.get(node, float('inf'))
    print(f"Path to {node}: {path}, distance: {distance}")


class ComponentFinder(BFSVisitor):
    """BFS visitor that accumulates discovered vertices into components."""

    def __init__(self):
        self.current_component = []  # vertices discovered since the last finish_component()
        self.components = []  # completed components, one list per component

    def discover_vertex(self, vertex):
        """Add the discovered vertex to the component being built."""
        self.current_component.append(vertex)

    def finish_component(self):
        """Close the component being built, if any, and start a fresh one."""
        if self.current_component:
            self.components.append(self.current_component.copy())
            self.current_component.clear()
def find_components(graph):
    """
    Return the connected components of `graph` as lists of node indices.

    Runs one BFS per not-yet-visited node; each BFS fills one component
    of a shared ComponentFinder.
    """
    finder = ComponentFinder()
    visited = set()

    for node in graph.node_indices():
        if node not in visited:
            # Start new component
            rx.bfs_search(graph, [node], finder)
            # Copy the members into `visited` before finish_component() clears them.
            visited.update(finder.current_component)
            finder.finish_component()

    return finder.components
# Find connected components
disconnected_graph = rx.PyGraph()

# Component 1
comp1 = disconnected_graph.add_nodes_from(['A', 'B', 'C'])
disconnected_graph.add_edges_from([
    (comp1[0], comp1[1], None),
    (comp1[1], comp1[2], None),
])

# Component 2
comp2 = disconnected_graph.add_nodes_from(['X', 'Y'])
disconnected_graph.add_edges_from([
    (comp2[0], comp2[1], None),
])

components = find_components(disconnected_graph)
print(f"Components found: {components}")


class BipartiteTester(BFSVisitor):
    """BFS visitor that 2-colors the graph and stops on the first color conflict."""

    def __init__(self):
        self.colors = {}  # node -> 0 or 1
        self.is_bipartite = True
        self.conflict_edge = None

    def discover_vertex(self, vertex):
        """Give color 0 to a vertex that has not been colored by a tree edge."""
        if vertex not in self.colors:
            self.colors[vertex] = 0  # Start with color 0

    def tree_edge(self, edge):
        """Color the newly reached target opposite to its source."""
        source, target, _ = edge
        # Color target with opposite color of source
        self.colors[target] = 1 - self.colors[source]

    def non_tree_edge(self, edge):
        """Flag a conflict and end the search if both endpoints share a color."""
        source, target, _ = edge
        # Check if both endpoints have same color
        if self.colors[source] == self.colors[target]:
            self.is_bipartite = False
            self.conflict_edge = edge
            raise rx.visit.StopSearch()
def test_bipartite(graph):
    """
    Check whether `graph` is bipartite.

    Parameters:
    - graph: Input graph to test

    Returns:
    tuple: (is_bipartite, coloring) where coloring maps each node index
    to 0 or 1, or is None when the graph is not bipartite
    """
    tester = BipartiteTester()

    # Search from every node that is still uncolored so that each component
    # is checked, not only the one containing node 0. An empty graph simply
    # skips the loop. No try/except is needed here: the search function
    # returns normally when the visitor raises StopSearch.
    for node in graph.node_indices():
        if not tester.is_bipartite:
            break
        if node not in tester.colors:
            rx.bfs_search(graph, [node], tester)

    return tester.is_bipartite, tester.colors if tester.is_bipartite else None
# Test bipartite property
test_graph = rx.generators.cycle_graph(6)  # Even cycle is bipartite
is_bip, coloring = test_bipartite(test_graph)
print(f"Is bipartite: {is_bip}")
if is_bip:
    print(f"Coloring: {coloring}")


class RestrictedBFS(BFSVisitor):
    """BFS visitor that refuses to enter a set of forbidden nodes."""

    def __init__(self, forbidden_nodes):
        self.forbidden = set(forbidden_nodes)
        self.visited = set()

    def discover_vertex(self, vertex):
        """Record every vertex the search actually reaches."""
        self.visited.add(vertex)

    def tree_edge(self, edge):
        """
        Prune any tree edge that leads into a forbidden node.

        The check lives on tree_edge because that is the BFS event fired
        for an edge to a not-yet-discovered vertex; pruning here keeps the
        forbidden target from being discovered and expanded.
        """
        source, target, _ = edge
        if target in self.forbidden:
            raise rx.visit.PruneSearch()  # Don't explore this branch

    # Kept so code that refers to the previous hook name still works.
    examine_edge = tree_edge
# BFS avoiding certain nodes
graph = rx.generators.grid_graph(4, 4)
forbidden = [5, 6, 9, 10]  # Block some central nodes

restricted_bfs = RestrictedBFS(forbidden)
rx.bfs_search(graph, [0], restricted_bfs)

print(f"Visited nodes: {sorted(restricted_bfs.visited)}")
print(f"Avoided nodes: {forbidden}")


# Install with Tessl CLI
npx tessl i tessl/pypi-rustworkx