CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-rustworkx

A high-performance graph library for Python implemented in Rust, providing fast graph algorithms and data structures for computational tasks

Pending
Overview
Eval results
Files

traversal.mddocs/

Graph Traversal

Event-driven traversal algorithms using the visitor pattern for breadth-first search, depth-first search, and Dijkstra's algorithm. rustworkx provides customizable traversal with event handling for implementing complex graph algorithms.

Capabilities

Breadth-First Search

BFS traversal with visitor pattern for event-driven algorithm implementation.

def bfs_search(graph, source, visitor):
    """
    Breadth-first search traversal with visitor callbacks.
    
    Explores graph level by level from source nodes, calling visitor
    methods at key algorithm events for custom processing.
    
    Parameters:
    - graph: Input graph (PyGraph or PyDiGraph)
    - source: List of source node indices or single node index
    - visitor: BFSVisitor instance with event callback methods
    
    Returns:
    None (results communicated through visitor methods)
    
    Raises:
    StopSearch: When visitor raises this to terminate early
    PruneSearch: When visitor raises this to skip subtrees
    """

def dfs_edges(graph, source = None):
    """
    Get depth-first search tree edges.
    
    Returns edges discovered during DFS traversal, forming
    a spanning forest of the graph.
    
    Parameters:
    - graph: Input graph (PyGraph or PyDiGraph)
    - source (int, optional): Starting node, if None searches all components
    
    Returns:
    EdgeList: List of tree edges as (source, target) tuples
    """

Depth-First Search

DFS traversal with visitor pattern and timing information.

def dfs_search(graph, source, visitor):
    """
    Depth-first search traversal with visitor callbacks.
    
    Explores graph by going as deep as possible before backtracking,
    providing discovery and finish times for each node.
    
    Parameters:
    - graph: Input graph (PyGraph or PyDiGraph)
    - source: List of source node indices or single node index
    - visitor: DFSVisitor instance with event callback methods
    
    Returns:
    None (results communicated through visitor methods)
    
    Raises:
    StopSearch: When visitor raises this to terminate early
    PruneSearch: When visitor raises this to skip subtrees
    """

Dijkstra Search

Dijkstra's algorithm with visitor pattern for shortest path exploration.

def dijkstra_search(graph, source, weight_fn, visitor):
    """
    Dijkstra shortest path search with visitor callbacks.
    
    Explores graph in order of increasing distance from source,
    calling visitor methods for path relaxation events.
    
    Parameters:
    - graph: Input graph (PyGraph or PyDiGraph)
    - source: List of source node indices or single node index
    - weight_fn (callable): Function to extract edge weights, or None for unit weights
    - visitor: DijkstraVisitor instance with event callback methods
    
    Returns:
    None (results communicated through visitor methods)
    
    Raises:
    StopSearch: When visitor raises this to terminate early
    PruneSearch: When visitor raises this to skip subtrees
    """

Visitor Classes

BFSVisitor

Base class for breadth-first search event handling.

class BFSVisitor:
    """
    Visitor base class for BFS algorithm events.
    
    Override methods to implement custom BFS-based algorithms.
    All methods are optional with default no-op implementations.
    """
    
    def discover_vertex(self, vertex: int):
        """
        Called when vertex is first discovered (colored gray).
        
        Parameters:
        - vertex (int): Node index being discovered
        """
        pass
    
    def examine_edge(self, edge: tuple):
        """
        Called on every out-edge of each vertex after discovery.
        
        Parameters:
        - edge (tuple): Edge as (source, target, weight) tuple
        """
        pass
    
    def tree_edge(self, edge: tuple):
        """
        Called on each edge in the BFS tree.
        
        These are edges to previously undiscovered vertices.
        
        Parameters:
        - edge (tuple): Tree edge as (source, target, weight) tuple
        """
        pass
    
    def non_tree_edge(self, edge: tuple):
        """
        Called on back or cross edges (non-tree edges).
        
        Parameters:
        - edge (tuple): Non-tree edge as (source, target, weight) tuple
        """
        pass
    
    def gray_target_edge(self, edge: tuple):
        """
        Called on edges to gray (discovered but unfinished) vertices.
        
        Parameters:
        - edge (tuple): Edge to gray vertex as (source, target, weight) tuple
        """
        pass
    
    def black_target_edge(self, edge: tuple):
        """
        Called on edges to black (finished) vertices.
        
        Parameters:
        - edge (tuple): Edge to black vertex as (source, target, weight) tuple
        """
        pass
    
    def finish_vertex(self, vertex: int):
        """
        Called when vertex processing is complete (colored black).
        
        Parameters:
        - vertex (int): Node index being finished
        """
        pass

DFSVisitor

Base class for depth-first search event handling with timing information.

class DFSVisitor:
    """
    Visitor base class for DFS algorithm events.
    
    Override methods to implement custom DFS-based algorithms.
    Provides discovery and finish timing information.
    """
    
    def discover_vertex(self, vertex: int, timestamp: int):
        """
        Called when vertex is first discovered.
        
        Parameters:
        - vertex (int): Node index being discovered
        - timestamp (int): Discovery time
        """
        pass
    
    def tree_edge(self, edge: tuple):
        """
        Called on tree edges (to undiscovered vertices).
        
        Parameters:
        - edge (tuple): Tree edge as (source, target, weight) tuple
        """
        pass
    
    def back_edge(self, edge: tuple):
        """
        Called on back edges (to ancestors in DFS tree).
        
        In undirected graphs, these may indicate cycles.
        
        Parameters:
        - edge (tuple): Back edge as (source, target, weight) tuple
        """
        pass
    
    def forward_or_cross_edge(self, edge: tuple):
        """
        Called on forward edges (to descendants) or cross edges.
        
        Parameters:
        - edge (tuple): Forward/cross edge as (source, target, weight) tuple
        """
        pass
    
    def finish_vertex(self, vertex: int, timestamp: int):
        """
        Called when vertex processing is complete.
        
        Parameters:
        - vertex (int): Node index being finished
        - timestamp (int): Finish time
        """
        pass

DijkstraVisitor

Base class for Dijkstra algorithm event handling with distance information.

class DijkstraVisitor:
    """
    Visitor base class for Dijkstra algorithm events.
    
    Override methods to implement custom shortest path algorithms.
    Provides distance information for path relaxation.
    """
    
    def discover_vertex(self, vertex: int, cost: float):
        """
        Called when vertex is first discovered with its distance.
        
        Parameters:
        - vertex (int): Node index being discovered
        - cost (float): Shortest distance found to this vertex
        """
        pass
    
    def examine_edge(self, edge: tuple):
        """
        Called on every out-edge of each vertex.
        
        Parameters:
        - edge (tuple): Edge being examined as (source, target, weight) tuple
        """
        pass
    
    def edge_relaxed(self, edge: tuple):
        """
        Called when edge relaxation improves shortest path.
        
        Parameters:
        - edge (tuple): Relaxed edge as (source, target, weight) tuple
        """
        pass
    
    def edge_not_relaxed(self, edge: tuple):
        """
        Called when edge relaxation does not improve path.
        
        Parameters:
        - edge (tuple): Non-relaxed edge as (source, target, weight) tuple
        """
        pass
    
    def finish_vertex(self, vertex: int):
        """
        Called when vertex is finished (optimal distance found).
        
        Parameters:
        - vertex (int): Node index being finished
        """
        pass

Exception Classes

StopSearch

Exception for early termination of graph traversal.

class StopSearch(Exception):
    """
    Raise this exception in visitor methods to stop traversal early.
    
    The search function will return normally without re-raising
    this exception, allowing clean early termination.
    """
    pass

PruneSearch

Exception for pruning search subtrees.

class PruneSearch(Exception):
    """
    Raise this exception in visitor methods to prune search subtree.
    
    Prevents further exploration of the current branch while
    continuing search in other areas of the graph.
    
    Note: Raising in finish_vertex events causes an error.
    """
    pass

Usage Examples

Basic BFS Implementation

import rustworkx as rx
from rustworkx.visit import BFSVisitor, StopSearch, PruneSearch

class PathFinder(BFSVisitor):
    def __init__(self, target):
        self.target = target
        self.parent = {}
        self.found = False
    
    def tree_edge(self, edge):
        source, target, _ = edge
        self.parent[target] = source
        if target == self.target:
            self.found = True
            raise rx.visit.StopSearch()
    
    def get_path(self, start):
        if not self.found:
            return None
        path = []
        current = self.target
        while current != start:
            path.append(current)
            current = self.parent[current]
        path.append(start)
        return list(reversed(path))

# Find path using BFS
graph = rx.generators.grid_graph(5, 5)
finder = PathFinder(target=24)  # Bottom-right corner
rx.bfs_search(graph, [0], finder)  # Start from top-left

if finder.found:
    path = finder.get_path(0)
    print(f"Path found: {path}")
else:
    print("No path found")

DFS-Based Cycle Detection

from rustworkx.visit import DFSVisitor

class CycleDetector(DFSVisitor):
    def __init__(self):
        self.has_cycle = False
        self.cycle_edge = None
    
    def back_edge(self, edge):
        self.has_cycle = True
        self.cycle_edge = edge
        raise rx.visit.StopSearch()

# Detect cycles in directed graph
digraph = rx.PyDiGraph()
nodes = digraph.add_nodes_from(['A', 'B', 'C'])
digraph.add_edges_from([
    (nodes[0], nodes[1], None),
    (nodes[1], nodes[2], None),
    (nodes[2], nodes[0], None)  # Creates cycle
])

detector = CycleDetector()
rx.dfs_search(digraph, [nodes[0]], detector)

if detector.has_cycle:
    print(f"Cycle detected on edge: {detector.cycle_edge}")
else:
    print("No cycles found")

Dijkstra-Based Shortest Path Tree

from rustworkx.visit import DijkstraVisitor

class ShortestPathTree(DijkstraVisitor):
    def __init__(self, start):
        self.start = start
        self.distances = {}
        self.predecessors = {}
    
    def discover_vertex(self, vertex, cost):
        self.distances[vertex] = cost
    
    def edge_relaxed(self, edge):
        source, target, _ = edge
        self.predecessors[target] = source
    
    def get_path_to(self, target):
        if target not in self.predecessors and target != self.start:
            return None
        
        path = []
        current = target
        while current != self.start:
            path.append(current)
            current = self.predecessors[current]
        path.append(self.start)
        return list(reversed(path))

# Build shortest path tree
weighted_graph = rx.PyGraph()
nodes = weighted_graph.add_nodes_from(['S', 'A', 'B', 'T'])
weighted_graph.add_edges_from([
    (nodes[0], nodes[1], 2.0),
    (nodes[0], nodes[2], 4.0),
    (nodes[1], nodes[3], 3.0),
    (nodes[2], nodes[3], 1.0)
])

spt = ShortestPathTree(nodes[0])
rx.dijkstra_search(
    weighted_graph, 
    [nodes[0]], 
    weight_fn=lambda x: x,
    visitor=spt
)

# Get paths to all reachable nodes
for node in nodes[1:]:
    path = spt.get_path_to(node)
    distance = spt.distances.get(node, float('inf'))
    print(f"Path to {node}: {path}, distance: {distance}")

Connected Components Using BFS

class ComponentFinder(BFSVisitor):
    def __init__(self):
        self.current_component = []
        self.components = []
    
    def discover_vertex(self, vertex):
        self.current_component.append(vertex)
    
    def finish_component(self):
        if self.current_component:
            self.components.append(self.current_component.copy())
            self.current_component.clear()

def find_components(graph):
    finder = ComponentFinder()
    visited = set()
    
    for node in graph.node_indices():
        if node not in visited:
            # Start new component
            rx.bfs_search(graph, [node], finder)
            visited.update(finder.current_component)
            finder.finish_component()
    
    return finder.components

# Find connected components
disconnected_graph = rx.PyGraph()
# Component 1
comp1 = disconnected_graph.add_nodes_from(['A', 'B', 'C'])
disconnected_graph.add_edges_from([
    (comp1[0], comp1[1], None),
    (comp1[1], comp1[2], None)
])
# Component 2  
comp2 = disconnected_graph.add_nodes_from(['X', 'Y'])
disconnected_graph.add_edges_from([
    (comp2[0], comp2[1], None)
])

components = find_components(disconnected_graph)
print(f"Components found: {components}")

Advanced: Bipartite Testing with BFS

class BipartiteTester(BFSVisitor):
    def __init__(self):
        self.colors = {}
        self.is_bipartite = True
        self.conflict_edge = None
    
    def discover_vertex(self, vertex):
        if vertex not in self.colors:
            self.colors[vertex] = 0  # Start with color 0
    
    def tree_edge(self, edge):
        source, target, _ = edge
        # Color target with opposite color of source
        self.colors[target] = 1 - self.colors[source]
    
    def non_tree_edge(self, edge):
        source, target, _ = edge
        # Check if both endpoints have same color
        if self.colors[source] == self.colors[target]:
            self.is_bipartite = False
            self.conflict_edge = edge
            raise rx.visit.StopSearch()

def test_bipartite(graph):
    tester = BipartiteTester()
    try:
        rx.bfs_search(graph, [0], tester)
    except rx.visit.StopSearch:
        pass
    
    return tester.is_bipartite, tester.colors if tester.is_bipartite else None

# Test bipartite property
test_graph = rx.generators.cycle_graph(6)  # Even cycle is bipartite
is_bip, coloring = test_bipartite(test_graph)
print(f"Is bipartite: {is_bip}")
if is_bip:
    print(f"Coloring: {coloring}")

Search Pruning Example

class RestrictedBFS(BFSVisitor):
    def __init__(self, forbidden_nodes):
        self.forbidden = set(forbidden_nodes)
        self.visited = set()
    
    def discover_vertex(self, vertex):
        self.visited.add(vertex)
    
    def examine_edge(self, edge):
        source, target, _ = edge
        if target in self.forbidden:
            raise rx.visit.PruneSearch()  # Don't explore this branch

# BFS avoiding certain nodes
graph = rx.generators.grid_graph(4, 4)
forbidden = [5, 6, 9, 10]  # Block some central nodes

restricted_bfs = RestrictedBFS(forbidden)
rx.bfs_search(graph, [0], restricted_bfs)

print(f"Visited nodes: {sorted(restricted_bfs.visited)}")
print(f"Avoided nodes: {forbidden}")

Install with Tessl CLI

npx tessl i tessl/pypi-rustworkx

docs

advanced-algorithms.md

analysis.md

centrality.md

core-classes.md

generators.md

index.md

layouts.md

shortest-paths.md

traversal.md

visualization.md

tile.json