A high-performance graph library for Python implemented in Rust, providing fast graph algorithms and data structures for computational tasks
—
Comprehensive collection of shortest path algorithms for finding optimal paths between nodes in weighted and unweighted graphs. rustworkx provides implementations of major shortest path algorithms optimized for performance with parallel processing support.
Single-source shortest path algorithm for graphs with non-negative edge weights. Supports both single-target and all-targets modes with optional weight functions.
def dijkstra_shortest_paths(graph, source: int, target: int = None, weight_fn = None, default_weight: float = 1.0, as_undirected: bool = False) -> dict:
"""
Find shortest paths using Dijkstra's algorithm.
Parameters:
- graph: PyGraph or PyDiGraph to search
- source (int): Source node index
- target (int, optional): Target node index, if None finds paths to all nodes
- weight_fn (callable, optional): Function to extract edge weight
- default_weight (float): Default edge weight if weight_fn not provided
- as_undirected (bool): Treat directed graph as undirected
Returns:
dict: Mapping of target nodes to path lists
"""
def dijkstra_shortest_path_lengths(graph, node: int, edge_cost_fn, goal: int = None) -> dict:
"""
Compute shortest path lengths using Dijkstra's algorithm.
Parameters:
- graph: Input graph
- node (int): Source node index
- edge_cost_fn (callable): Function returning edge cost as float
- goal (int, optional): Target node, returns single entry if specified
Returns:
dict: Mapping of target nodes to path lengths
"""
def all_pairs_dijkstra_shortest_paths(graph, edge_cost_fn):
"""
All-pairs shortest paths using Dijkstra's algorithm.
Multithreaded execution for improved performance.
Parameters:
- graph: Input graph
- edge_cost_fn (callable): Function returning edge cost as float
Returns:
AllPairsPathMapping: Nested mapping of source -> target -> path
"""
def all_pairs_dijkstra_path_lengths(graph, edge_cost_fn):
"""
All-pairs shortest path lengths using Dijkstra's algorithm.
Parameters:
- graph: Input graph
- edge_cost_fn (callable): Function returning edge cost as float
Returns:
AllPairsPathLengthMapping: Nested mapping of source -> target -> length
"""

Single-source shortest path algorithm supporting negative edge weights. Detects negative cycles and provides comprehensive error handling.
def bellman_ford_shortest_paths(graph, source: int, target: int = None, weight_fn = None, default_weight: float = 1.0, as_undirected: bool = False):
"""
Find shortest paths using Bellman-Ford algorithm.
Supports negative edge weights and detects negative cycles.
Parameters:
- graph: PyGraph or PyDiGraph to search
- source (int): Source node index
- target (int, optional): Target node index
- weight_fn (callable, optional): Function to extract edge weight
- default_weight (float): Default edge weight
- as_undirected (bool): Treat directed graph as undirected
Returns:
PathMapping: Mapping of target nodes to paths
Raises:
NegativeCycle: When negative cycle is detected
"""
def bellman_ford_shortest_path_lengths(graph, node: int, edge_cost_fn, goal: int = None):
"""
Compute shortest path lengths using Bellman-Ford algorithm.
Parameters:
- graph: Input graph
- node (int): Source node index
- edge_cost_fn (callable): Function returning edge cost (can be negative)
- goal (int, optional): Target node
Returns:
PathLengthMapping: Mapping of target nodes to path lengths
Raises:
NegativeCycle: When negative cycle is detected
"""
def all_pairs_bellman_ford_shortest_paths(graph, edge_cost_fn):
"""
All-pairs shortest paths using Bellman-Ford algorithm.
Parameters:
- graph: Input graph
- edge_cost_fn (callable): Function returning edge cost
Returns:
AllPairsPathMapping: Nested mapping of source -> target -> path
Raises:
NegativeCycle: When negative cycle is detected
"""
def all_pairs_bellman_ford_path_lengths(graph, edge_cost_fn):
"""
All-pairs shortest path lengths using Bellman-Ford algorithm.
Parameters:
- graph: Input graph
- edge_cost_fn (callable): Function returning edge cost
Returns:
AllPairsPathLengthMapping: Nested mapping of source -> target -> length
Raises:
NegativeCycle: When negative cycle is detected
"""

All-pairs shortest path algorithm particularly effective for dense graphs. Supports parallel execution and multiple output formats.
def floyd_warshall(graph, weight_fn = None, default_weight: float = 1.0, parallel_threshold: int = 300):
"""
All-pairs shortest paths using Floyd-Warshall algorithm.
Parameters:
- graph: Input graph
- weight_fn (callable, optional): Function to extract edge weight
- default_weight (float): Default edge weight
- parallel_threshold (int): Node count threshold for parallel execution
Returns:
AllPairsPathLengthMapping: Nested mapping of path lengths
"""
def floyd_warshall_numpy(graph, weight_fn = None, default_weight: float = 1.0, parallel_threshold: int = 300):
"""
Floyd-Warshall algorithm returning NumPy distance matrix.
Parameters:
- graph: Input graph
- weight_fn (callable, optional): Function to extract edge weight
- default_weight (float): Default edge weight
- parallel_threshold (int): Node count threshold for parallel execution
Returns:
numpy.ndarray: Distance matrix with np.inf for unreachable pairs
"""
def floyd_warshall_successor_and_distance(graph, weight_fn = None, default_weight: float = 1.0, parallel_threshold: int = 300):
"""
Floyd-Warshall returning both distance and successor matrices.
Parameters:
- graph: Input graph (PyDiGraph only)
- weight_fn (callable, optional): Function to extract edge weight
- default_weight (float): Default edge weight
- parallel_threshold (int): Node count threshold for parallel execution
Returns:
Tuple[numpy.ndarray, numpy.ndarray]: (distance_matrix, successor_matrix)
"""

Heuristic shortest path algorithm using estimated costs to guide search toward target node. Optimal when heuristic is admissible.
def astar_shortest_path(graph, node: int, goal_fn, edge_cost_fn, estimate_cost_fn):
"""
A* shortest path algorithm with heuristic guidance.
Parameters:
- graph: Input graph
- node (int): Starting node index
- goal_fn (callable): Function returning True for goal nodes
- edge_cost_fn (callable): Function returning non-negative edge cost
- estimate_cost_fn (callable): Heuristic function returning estimated cost to goal
Returns:
NodeIndices: List of node indices forming shortest path
"""

Additional path-finding utilities for specific use cases and path enumeration.
def all_simple_paths(graph, from_: int, to, min_depth: int = None, cutoff: int = None) -> list:
"""
Find all simple paths between nodes (no repeated nodes).
Parameters:
- graph: Input graph
- from_ (int): Source node index
- to (int or list): Target node(s)
- min_depth (int, optional): Minimum path length
- cutoff (int, optional): Maximum path length
Returns:
list: List of paths, each path is list of node indices
"""
def all_shortest_paths(graph, source: int, target: int, weight_fn = None, default_weight: float = 1.0, as_undirected: bool = False) -> list:
"""
Find all shortest paths between two specific nodes.
Parameters:
- graph: Input graph
- source (int): Source node index
- target (int): Target node index
- weight_fn (callable, optional): Function to extract edge weight
- default_weight (float): Default edge weight
- as_undirected (bool): Treat directed graph as undirected
Returns:
list: List of shortest paths (each path is list of node indices)
"""
def single_source_all_shortest_paths(graph, source: int, weight_fn = None, default_weight: float = 1.0, as_undirected: bool = False) -> dict:
"""
All shortest paths from single source to all other nodes.
Parameters:
- graph: Input graph
- source (int): Source node index
- weight_fn (callable, optional): Function to extract edge weight
- default_weight (float): Default edge weight
- as_undirected (bool): Treat directed graph as undirected
Returns:
dict: Mapping of target nodes to lists of shortest paths
"""
def k_shortest_path_lengths(graph, start: int, k: int, edge_cost, goal: int = None) -> dict:
"""
Compute lengths of k-th shortest paths.
Parameters:
- graph: Input graph
- start (int): Source node index
- k (int): Which shortest path to find (1st, 2nd, etc.)
- edge_cost (callable): Function returning edge cost
- goal (int, optional): Target node
Returns:
dict: Mapping of destination nodes to k-th shortest path length
"""
def has_path(graph, source: int, target: int, as_undirected: bool = False) -> bool:
"""
Check if path exists between two nodes.
Parameters:
- graph: Input graph
- source (int): Source node index
- target (int): Target node index
- as_undirected (bool): Treat directed graph as undirected
Returns:
bool: True if path exists, False otherwise
"""

Functions for computing graph distances and analyzing connectivity patterns.
def distance_matrix(graph, parallel_threshold: int = 300, as_undirected: bool = False, null_value: float = 0.0):
"""
Compute unweighted distance matrix (each edge has distance 1).
Parameters:
- graph: Input graph
- parallel_threshold (int): Node count for parallel execution
- as_undirected (bool): Treat directed graph as undirected
- null_value (float): Value stored in the matrix for node pairs with no connecting path
Returns:
numpy.ndarray: Distance matrix
"""
def unweighted_average_shortest_path_length(graph, parallel_threshold: int = 300, disconnected: bool = False) -> float:
"""
Average shortest path length with unweighted edges.
Parameters:
- graph: Input graph
- parallel_threshold (int): Node count for parallel execution
- disconnected (bool): Include only connected pairs if True
Returns:
float: Average shortest path length
"""
def num_shortest_paths_unweighted(graph, source: int):
"""
Count unweighted shortest paths from source node.
Parameters:
- graph: Input graph
- source (int): Source node index
Returns:
NodesCountMapping: Mapping of target nodes to path counts
"""

import rustworkx as rx
# Create weighted graph: each edge payload is its weight.
graph = rx.PyGraph()
nodes = graph.add_nodes_from(['A', 'B', 'C', 'D'])
graph.add_edges_from([
    (nodes[0], nodes[1], 4.0),
    (nodes[1], nodes[2], 2.0),
    (nodes[2], nodes[3], 3.0),
    (nodes[0], nodes[3], 10.0),
])

# Find shortest paths from node 0.
# weight_fn must be supplied for the edge payloads to be used as weights;
# without it every edge costs default_weight (1.0) and the result minimises
# hop count, which would disagree with the weighted lengths computed below.
paths = rx.dijkstra_shortest_paths(
    graph, nodes[0],
    weight_fn=lambda x: x,
)
print(f"Paths from A: {paths}")

# Get only path lengths
lengths = rx.dijkstra_shortest_path_lengths(
    graph, nodes[0],
    edge_cost_fn=lambda x: x,
)
print(f"Path lengths: {lengths}")

# Graph with negative weights
digraph = rx.PyDiGraph()
nodes = digraph.add_nodes_from(['S', 'A', 'B', 'T'])

# (source, target, weight) triples; the A -> B edge carries the negative weight.
weighted_edges = [
    (nodes[0], nodes[1], 1.0),
    (nodes[1], nodes[2], -3.0),  # Negative weight
    (nodes[2], nodes[3], 2.0),
    (nodes[0], nodes[3], 5.0),
]
digraph.add_edges_from(weighted_edges)

# Only the search itself can raise NegativeCycle, so keep the try body minimal
# and report the result in the else branch.
try:
    paths = rx.bellman_ford_shortest_paths(
        digraph,
        nodes[0],
        weight_fn=lambda x: x,
    )
except rx.NegativeCycle as e:
    print(f"Negative cycle detected: {e}")
else:
    print(f"Shortest paths: {paths}")

# Grid graph for A* demonstration
def manhattan_distance(node_data, target_pos):
    """Return the Manhattan (L1) distance between two 2-D positions.

    Parameters:
    - node_data: Two-element sequence holding the first position
    - target_pos: Two-element sequence holding the second position

    Returns:
    Sum of the absolute per-coordinate differences
    """
    (ax, ay), (bx, by) = node_data, target_pos
    dx = abs(ax - bx)
    dy = abs(ay - by)
    return dx + dy
# A* passes each node's *payload* (not its index) to goal_fn and
# estimate_cost_fn. grid_graph() leaves payloads as None unless weights are
# given, so store each node's (row, col) coordinate as its payload.
# Nodes are created row by row, so node 24 receives (4, 4).
grid = rx.generators.grid_graph(
    5,
    5,
    weights=[(row, col) for row in range(5) for col in range(5)],
)
target_node = 24  # Bottom-right corner
target_pos = (4, 4)  # Payload stored on target_node
path = rx.astar_shortest_path(
    grid,
    node=0,  # Top-left corner
    goal_fn=lambda pos: pos == target_pos,
    edge_cost_fn=lambda edge: 1.0,
    estimate_cost_fn=lambda pos: manhattan_distance(pos, target_pos),
)
print(f"A* path: {path}")

# Compute all-pairs shortest paths
# Edge payloads already hold the weights, so the cost callback returns them as-is.
all_paths = rx.all_pairs_dijkstra_shortest_paths(
    graph,
    edge_cost_fn=lambda weight: weight,
)

# Access specific path: the outer key is the source node, the inner key the target.
paths_from_a = all_paths[nodes[0]]
path_0_to_3 = paths_from_a[nodes[3]]
print(f"Path from A to D: {path_0_to_3}")

# Get distance matrix for analysis
distances = rx.floyd_warshall_numpy(graph, weight_fn=lambda weight: weight)
print(f"Distance matrix:\n{distances}")

Install with Tessl CLI:
npx tessl i tessl/pypi-rustworkx