py-spy is a sampling profiler for Python programs. It is written in Rust and has extremely low overhead.
—
Programmatic profiling interface for integrating py-spy functionality into Rust applications. Provides both one-shot profiling and continuous sampling capabilities with full control over configuration and output processing.
Main profiler object for programmatic access to Python process profiling. Handles process attachment, memory access, and stack trace extraction.
/// Handle to a Python process being profiled.
///
/// Constructed with [`PythonSpy::new`] or [`PythonSpy::retry_new`]; stack
/// traces are then read with [`PythonSpy::get_stack_traces`].
pub struct PythonSpy {
// All fields are private - access through methods only
// NOTE(review): the `robust_profiling` example later in this document reads
// `spy.process` directly, which contradicts the line above — confirm which
// fields the crate really exposes.
}
impl PythonSpy {
/// Creates a new PythonSpy object for the given process ID
///
/// `config` is only borrowed. Failures are reported as `anyhow::Error`.
pub fn new(pid: Pid, config: &Config) -> Result<PythonSpy, anyhow::Error>;
/// Attempts to create PythonSpy with retries (useful for processes still starting)
///
/// NOTE(review): `max_retries` presumably caps the number of attempts; the
/// delay between attempts is not shown in this listing — confirm.
pub fn retry_new(pid: Pid, config: &Config, max_retries: u64) -> Result<PythonSpy, anyhow::Error>;
/// Gets current stack traces for all threads in the Python process
///
/// Takes `&mut self`, so the spy must be bound mutably (`let mut spy = ...`).
/// Returns the traces as a `Vec<StackTrace>`, or an `anyhow::Error` on failure.
pub fn get_stack_traces(&mut self) -> Result<Vec<StackTrace>, anyhow::Error>;
}Usage Example:
use py_spy::{Config, PythonSpy, Pid};
use anyhow::Error;
fn analyze_python_process(pid: Pid) -> Result<(), Error> {
let config = Config::default();
let mut spy = PythonSpy::new(pid, &config)?;
let traces = spy.get_stack_traces()?;
for trace in traces {
println!("Thread {}: {} frames", trace.thread_id, trace.frames.len());
if trace.owns_gil {
println!(" (holds GIL)");
}
}
Ok(())
}Iterator-based continuous sampling for long-running profiling sessions. Handles timing, error recovery, and subprocess monitoring.
/// Iterator-based sampler for continuous, long-running profiling sessions.
///
/// Iterating a `Sampler` yields one [`Sample`] per step (see the `Iterator`
/// impl below).
pub struct Sampler {
/// NOTE(review): presumably the detected Python version of the target, `None`
/// when unknown — inferred from the field name and type only; confirm.
pub version: Option<Version>,
// Private fields...
}
impl Sampler {
/// Creates a new sampler for the given process
///
/// `config` is only borrowed. Failures are reported as `anyhow::Error`.
pub fn new(pid: Pid, config: &Config) -> Result<Sampler, anyhow::Error>;
}
impl Iterator for Sampler {
/// Each iteration step produces one `Sample`.
type Item = Sample;
/// Returns the next `Sample`, or `None` once the iterator is exhausted.
/// NOTE(review): the condition that ends the iteration is not shown in this
/// listing — confirm against the crate.
fn next(&mut self) -> Option<Self::Item>;
}Usage Example:
use py_spy::{Config, Pid};
use py_spy::sampler::Sampler;
use std::time::Duration;
use anyhow::Error;
/// Samples the process `pid` at 100 Hz for 1000 samples.
///
/// For every sample it warns when the sample was more than 10 ms late, reports
/// any per-process sampling errors on stderr, and hands each active,
/// GIL-holding thread to `analyze_trace`.
fn continuous_profiling(pid: Pid) -> Result<(), Error> {
    let mut config = Config::default();
    config.sampling_rate = 100; // 100 Hz

    // A sample that arrives later than this triggers a warning.
    let late_threshold = Duration::from_millis(10);

    // Collect 1000 samples
    for sample in Sampler::new(pid, &config)?.take(1000) {
        match sample.late {
            Some(delay) if delay > late_threshold => {
                println!("Warning: sampling delay of {:?}", delay);
            }
            _ => {}
        }

        // `None` flattens to nothing, so this loop only runs when errors exist.
        for (failed_pid, error) in sample.sampling_errors.into_iter().flatten() {
            eprintln!("Sampling error for PID {}: {}", failed_pid, error);
        }

        // Analyze active GIL-holding threads
        for trace in sample.traces.iter().filter(|t| t.active && t.owns_gil) {
            analyze_trace(trace);
        }
    }

    Ok(())
}

Single sampling result containing stack traces and metadata from one sampling interval.
/// Result of one sampling interval: the stack traces that were collected plus
/// metadata about errors and timing.
pub struct Sample {
/// Stack traces collected during this sampling interval
pub traces: Vec<StackTrace>,
/// Sampling errors that occurred (per process if using --subprocesses)
///
/// Each entry pairs a `Pid` with the `anyhow::Error` reported for it.
pub sampling_errors: Option<Vec<(Pid, anyhow::Error)>>,
/// Delay if sampling was late (indicates system load)
pub late: Option<Duration>,
}Core data structures for representing and analyzing Python call stacks.
/// Call stack captured for one thread of a profiled process, together with
/// the thread's identity and state.
pub struct StackTrace {
/// Process ID that generated this stack trace
pub pid: Pid,
/// Python thread ID (not OS thread ID)
pub thread_id: u64,
/// Python thread name if available
pub thread_name: Option<String>,
/// Operating system thread ID
pub os_thread_id: Option<u64>,
/// Whether the thread was active (not idle/sleeping)
pub active: bool,
/// Whether the thread owns the Global Interpreter Lock
pub owns_gil: bool,
/// Stack frames from top (most recent) to bottom (oldest)
///
/// `frames[0]`, when present, is therefore the innermost call.
pub frames: Vec<Frame>,
/// Process information for subprocess profiling
///
/// Held behind an `Arc`, so it can be shared rather than copied per trace.
pub process_info: Option<Arc<ProcessInfo>>,
}
impl StackTrace {
/// Returns human-readable thread status string
///
/// NOTE(review): the set of possible status strings is not shown in this
/// listing — confirm against the crate.
pub fn status_str(&self) -> &str;
/// Formats thread ID for display
///
/// Returns a newly allocated `String`.
pub fn format_threadid(&self) -> String;
}Individual function call information within a stack trace.
/// One function call within a [`StackTrace`].
pub struct Frame {
/// Function or method name
pub name: String,
/// Full path to source file
pub filename: String,
/// Module or shared library name
pub module: Option<String>,
/// Shortened filename for display
pub short_filename: Option<String>,
/// Line number within the file (0 for native frames without debug info)
pub line: i32,
/// Local variables if collected
///
/// See [`LocalVariable`] for what is recorded per variable.
pub locals: Option<Vec<LocalVariable>>,
/// Whether this is an entry frame (Python 3.11+)
pub is_entry: bool,
/// Whether this is a shim entry (Python 3.12+)
pub is_shim_entry: bool,
}Variable information collected when dump_locals is enabled.
/// A local variable recorded for a [`Frame`]; collected when `dump_locals` is
/// enabled.
pub struct LocalVariable {
/// Variable name
pub name: String,
/// Memory address of the variable
pub addr: usize,
/// Whether this is a function argument
pub arg: bool,
/// String representation of the variable value
///
/// NOTE(review): the conditions under which this is `None` are not shown in
/// this listing — confirm against the crate.
pub repr: Option<String>,
}Process metadata for subprocess profiling scenarios.
/// Metadata about a profiled process, used when profiling subprocesses.
pub struct ProcessInfo {
/// Process ID
pub pid: Pid,
/// Command line arguments
///
/// Stored as a single `String`, not as a list of arguments.
pub command_line: String,
/// Parent process information (for process trees)
///
/// Boxed because the type refers to itself.
pub parent: Option<Box<ProcessInfo>>,
}
impl ProcessInfo {
/// Converts to a Frame for flamegraph output
///
/// Borrows `self` and returns a new, owned `Frame`.
pub fn to_frame(&self) -> Frame;
}use py_spy::{Config, PythonSpy, Pid};
use std::thread;
use std::time::Duration;
use anyhow::Error;
/// Attaches to the process `pid` with retries, then takes up to 100 snapshots
/// 100 ms apart, tolerating individual sampling failures.
///
/// # Errors
///
/// Returns the attach error when `PythonSpy::retry_new` fails. Errors from
/// individual snapshots are logged and do not abort the function; sampling
/// stops early once the target process can no longer be queried.
fn robust_profiling(pid: Pid) -> Result<(), Error> {
    let config = Config::default();

    // Retry connection for processes that might be starting up
    let mut spy = match PythonSpy::retry_new(pid, &config, 10) {
        Ok(spy) => spy,
        Err(e) => {
            eprintln!("Failed to attach to process {}: {}", pid, e);
            // `e` is already an `anyhow::Error`, so it is returned as-is.
            return Err(e);
        }
    };

    // Collect samples with error handling. Attempts are numbered from 1 so the
    // log message needs no arithmetic.
    for attempt in 1..=100 {
        match spy.get_stack_traces() {
            Ok(traces) if traces.is_empty() => {
                println!("No active threads found (attempt {})", attempt);
            }
            Ok(traces) => {
                process_traces(traces);
            }
            Err(e) => {
                eprintln!("Sampling error: {}", e);
                // Check if process still exists: if even the executable path
                // cannot be read, treat the process as gone and stop sampling.
                if spy.process.exe().is_err() {
                    println!("Process {} has exited", pid);
                    break;
                }
            }
        }
        thread::sleep(Duration::from_millis(100));
    }

    Ok(())
}
use py_spy::{Config, Frame, Pid};
use py_spy::sampler::Sampler;
use std::collections::HashMap;
use anyhow::Error;
/// Samples the process `pid` 1000 times and prints the ten functions most
/// often found at the top of an active thread's stack.
///
/// Every active thread in a sample contributes one count, so a single sample
/// can add several counts. Percentages are therefore computed against the
/// number of thread stacks counted, not the number of samples; dividing by the
/// sample count would let multi-threaded targets report more than 100%.
///
/// # Errors
///
/// Returns an error when the sampler cannot be created for `pid`.
fn custom_analysis(pid: Pid) -> Result<(), Error> {
    let config = Config::default();
    let sampler = Sampler::new(pid, &config)?;

    let mut function_counts: HashMap<String, u64> = HashMap::new();
    let mut total_samples: u64 = 0;
    // Number of thread stacks that contributed a count (percentage denominator).
    let mut total_stacks: u64 = 0;

    for sample in sampler.take(1000) {
        total_samples += 1;
        for trace in sample.traces.iter().filter(|trace| trace.active) {
            // `first()` skips threads without frames instead of indexing.
            if let Some(top_frame) = trace.frames.first() {
                let function_key = format!("{}:{}", top_frame.filename, top_frame.name);
                *function_counts.entry(function_key).or_insert(0) += 1;
                total_stacks += 1;
            }
        }
    }

    // Print top functions by sample count
    let mut sorted_functions: Vec<_> = function_counts.into_iter().collect();
    sorted_functions.sort_by_key(|(_, count)| std::cmp::Reverse(*count));

    println!(
        "Top functions by sample count ({} samples, {} active thread stacks):",
        total_samples, total_stacks
    );
    // When nothing was counted the list is empty, so the division never runs
    // with a zero denominator.
    for (function, count) in sorted_functions.iter().take(10) {
        let percentage = (*count as f64 / total_stacks as f64) * 100.0;
        println!(" {:.1}% - {}", percentage, function);
    }

    Ok(())
}
use py_spy::{Config, LockingStrategy, PythonSpy, Pid};
use anyhow::Error;
/// Takes one snapshot of the process `pid` with native stack collection
/// enabled and prints every frame, labelled as native or Python.
///
/// A frame is labelled native when it carries a module name. This is the
/// heuristic used by this example, not a guarantee of the crate.
///
/// # Errors
///
/// Returns an error when attaching to `pid` or reading its stack traces fails.
fn profile_with_native_extensions(pid: Pid) -> Result<(), Error> {
    let mut config = Config::default();
    config.native = true; // Enable native stack traces
    config.blocking = LockingStrategy::Lock; // Required for native profiling

    let mut spy = PythonSpy::new(pid, &config)?;

    for trace in spy.get_stack_traces()? {
        println!("Thread {} ({} frames):", trace.thread_id, trace.frames.len());
        for frame in &trace.frames {
            // Match on the option so the module name itself is printed rather
            // than its `Some("...")` debug form.
            match &frame.module {
                Some(module) => println!(" [NATIVE] {} in {}", frame.name, module),
                None => println!(" [PYTHON] {} ({}:{})", frame.name, frame.filename, frame.line),
            }
        }
    }

    Ok(())
}

Install with Tessl CLI
npx tessl i tessl/pypi-py-spy