Python package for reading, writing, and processing physiologic signals and annotations in WFDB format.
Comprehensive signal processing tools for physiological data including resampling, filtering, peak detection, QRS detection, heart rate analysis, and signal evaluation. This module provides algorithms specifically designed for biomedical signal analysis and cardiovascular data processing.
Fundamental signal processing operations including resampling, normalization, and filtering utilities.
def resample_sig(x: np.ndarray, fs: float, fs_target: float) -> Tuple[np.ndarray, np.ndarray]:
"""
Resample signal to different sampling frequency.
Parameters:
- x: ndarray, input signal
- fs: float, original sampling frequency in Hz
- fs_target: float, target sampling frequency in Hz
Returns:
- resampled_x: ndarray, resampled signal
- resampled_t: ndarray, resampled time points
"""
def resample_ann(ann_sample: Union[List[int], np.ndarray], fs: float,
fs_target: float) -> np.ndarray:
"""
Resample annotation locations to match new sampling frequency.
Parameters:
- ann_sample: array-like, original annotation sample locations
- fs: float, original sampling frequency
- fs_target: float, target sampling frequency
Returns:
ndarray of resampled annotation locations
"""
def resample_singlechan(x: np.ndarray, ann: Annotation, fs: float,
fs_target: float) -> Tuple[np.ndarray, Annotation]:
"""
Resample single channel signal with annotations.
Parameters:
- x: ndarray, single channel signal
- ann: Annotation, annotation object
- fs: float, original sampling frequency
- fs_target: float, target sampling frequency
Returns:
- resampled_x: ndarray, resampled signal
- resampled_ann: Annotation, resampled annotations
"""
def resample_multichan(xs: np.ndarray, ann: Annotation, fs: float,
fs_target: float, resamp_ann_chan: int = 0) -> Tuple[np.ndarray, Annotation]:
"""
Resample multiple channel signals with annotations.
Parameters:
- xs: ndarray, multi-channel signals (MxN array)
- ann: Annotation, annotation object
- fs: float, original sampling frequency
- fs_target: float, target sampling frequency
- resamp_ann_chan: int, channel to use for annotation resampling
Returns:
- resampled_xs: ndarray, resampled signals
- resampled_ann: Annotation, resampled annotations
"""
def normalize_bound(sig: np.ndarray, lb: float = 0, ub: float = 1) -> np.ndarray:
"""
Normalize signal values between specified bounds.
Parameters:
- sig: ndarray, input signal
- lb: float, lower bound (default: 0)
- ub: float, upper bound (default: 1)
Returns:
ndarray of normalized signal values
"""
def get_filter_gain(b: np.ndarray, a: np.ndarray, f_gain: float, fs: float) -> float:
"""
Calculate filter gain at specific frequency.
Parameters:
- b: ndarray, numerator filter coefficients
- a: ndarray, denominator filter coefficients
- f_gain: float, frequency to calculate gain at
- fs: float, sampling frequency
Returns:
Filter gain at specified frequency
"""Algorithms for detecting peaks in physiological signals with various detection strategies.
def find_peaks(sig: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Find hard and soft peaks in signal using derivative-based method.
Parameters:
- sig: ndarray, input signal
Returns:
- hard_peaks: ndarray, indices of hard peaks (strong local maxima)
- soft_peaks: ndarray, indices of soft peaks (weak local maxima)
"""
def find_local_peaks(sig: np.ndarray, radius: int) -> np.ndarray:
"""
Find local peaks within specified radius.
Parameters:
- sig: ndarray, input signal
- radius: int, search radius for local maxima
Returns:
ndarray of peak indices
"""
def correct_peaks(sig: np.ndarray, peak_inds: np.ndarray, search_radius: int,
smooth_window_size: int, peak_dir: str = "compare") -> np.ndarray:
"""
Adjust peak locations to local maxima or minima.
Parameters:
- sig: ndarray, input signal
- peak_inds: ndarray, initial peak indices
- search_radius: int, radius to search for corrections
- smooth_window_size: int, window size for smoothing
- peak_dir: str, direction to search ("up", "down", "compare")
Returns:
ndarray of corrected peak indices
"""Specialized algorithms for detecting QRS complexes in ECG signals.
def xqrs_detect(sig: np.ndarray, fs: float = 250, sampfrom: int = 0,
sampto: Union[int, str] = 'end', conf: Dict = None,
learn: bool = True, verbose: bool = True) -> np.ndarray:
"""
Detect QRS complexes using XQRS algorithm.
Parameters:
- sig: ndarray, input ECG signal
- fs: float, sampling frequency in Hz
- sampfrom: int, starting sample for detection
- sampto: int or 'end', ending sample for detection
- conf: dict, configuration parameters for XQRS
- learn: bool, enable learning mode for adaptation
- verbose: bool, print detection progress information
Returns:
ndarray of QRS peak sample locations
"""
def gqrs_detect(sig: np.ndarray, fs: float = 250, adcgain: float = None,
adczero: float = None, threshold: float = 1.0, hr: int = 75,
RRdelta: float = 0.2, RRmin: float = 0.28, RRmax: float = 2.4,
QS: float = 0.07, QT: float = 0.35, RTmin: float = 0.25,
RTmax: float = 0.33, QRSa: float = 750, QRSamin: float = 130) -> np.ndarray:
"""
Detect QRS complexes using GQRS algorithm.
Parameters:
- sig: ndarray, input ECG signal
- fs: float, sampling frequency in Hz
- adcgain: float, ADC gain value
- adczero: float, ADC zero level
- threshold: float, detection threshold
- hr: int, expected heart rate in BPM
- RRdelta: float, RR interval variation tolerance
- RRmin: float, minimum RR interval in seconds
- RRmax: float, maximum RR interval in seconds
- QS: float, QRS onset to peak time in seconds
- QT: float, QT interval duration in seconds
- RTmin: float, minimum RT interval in seconds
- RTmax: float, maximum RT interval in seconds
- QRSa: float, QRS area threshold
- QRSamin: float, minimum QRS area
Returns:
ndarray of QRS peak sample locations
"""Functions for calculating heart rate metrics and RR interval analysis.
def compute_hr(sig_len: int, qrs_inds: np.ndarray, fs: float) -> np.ndarray:
"""
Calculate instantaneous heart rate from QRS locations.
Parameters:
- sig_len: int, total signal length in samples
- qrs_inds: ndarray, QRS peak sample locations
- fs: float, sampling frequency in Hz
Returns:
ndarray of instantaneous heart rate values in BPM
"""
def calc_rr(sample: Union[List[int], np.ndarray], fs: float = None,
ann_style: str = 'index', time_warn: bool = True) -> np.ndarray:
"""
Calculate RR intervals from sample locations.
Parameters:
- sample: array-like, sample locations or time values
- fs: float, sampling frequency (required if ann_style='index')
- ann_style: str, 'index' for sample indices or 'time' for time values
- time_warn: bool, warn about time format assumptions
Returns:
ndarray of RR intervals in seconds
"""
def calc_mean_hr(rr: np.ndarray, fs: float = None, min_rr: float = None,
max_rr: float = None, rr_units: str = "samples") -> float:
"""
Calculate mean heart rate from RR intervals.
Parameters:
- rr: ndarray, RR intervals
- fs: float, sampling frequency (if rr_units='samples')
- min_rr: float, minimum acceptable RR interval
- max_rr: float, maximum acceptable RR interval
- rr_units: str, 'samples' or 'seconds'
Returns:
Mean heart rate in BPM
"""
def ann2rr(record_name: str, extension: str, pn_dir: str = None,
channel: Union[str, int] = 'all', sampfrom: int = 0,
sampto: Union[int, str] = 'end', shift_samps: bool = False,
return_anns: bool = False, ann_style: str = 'index') -> Union[np.ndarray, Tuple[np.ndarray, Annotation]]:
"""
Convert annotations to RR intervals.
Parameters:
- record_name: str, record name
- extension: str, annotation extension
- pn_dir: str, PhysioNet database directory
- channel: str or int, channel selection ('all' or specific channel)
- sampfrom: int, starting sample
- sampto: int or 'end', ending sample
- shift_samps: bool, shift sample numbers
- return_anns: bool, also return annotation object
- ann_style: str, 'index' or 'time' format
Returns:
RR intervals array, optionally with annotation object
"""
def rr2ann(rr_array: np.ndarray, record_name: str, extension: str,
fs: float = 250, as_time: bool = False) -> None:
"""
Convert RR intervals to annotation file.
Parameters:
- rr_array: ndarray, RR intervals in seconds
- record_name: str, output record name
- extension: str, output annotation extension
- fs: float, sampling frequency for conversion
- as_time: bool, treat RR intervals as time values
"""Tools for comparing and evaluating signal processing results.
def compare_annotations(ref_sample: np.ndarray, test_sample: np.ndarray,
window_width: float, signal: np.ndarray = None) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compare reference and test annotations for accuracy evaluation.
Parameters:
- ref_sample: ndarray, reference annotation sample locations
- test_sample: ndarray, test annotation sample locations
- window_width: float, matching window width in samples
- signal: ndarray, optional signal for additional analysis
Returns:
- true_positives: ndarray, matched test annotations
- false_positives: ndarray, unmatched test annotations
- false_negatives: ndarray, unmatched reference annotations
"""
def benchmark_mitdb(detector: Callable, verbose: bool = False,
print_results: bool = False) -> Dict[str, float]:
"""
Benchmark detector performance on MIT-BIH Arrhythmia Database.
Parameters:
- detector: callable, detection function to benchmark
- verbose: bool, print detailed progress information
- print_results: bool, print benchmark results
Returns:
Dictionary with performance metrics (sensitivity, PPV, F1-score, etc.)
"""Signal filtering and averaging operations.
def sigavg(sig_list: List[np.ndarray], mode: str = 'auto') -> np.ndarray:
"""
Signal averaging/ensemble averaging.
Parameters:
- sig_list: list of ndarray, signals to average
- mode: str, averaging mode ('auto', 'mean', 'median')
Returns:
ndarray of averaged signal
"""class XQRS:
"""Configurable QRS detector class."""
def __init__(self, sig: np.ndarray, fs: float = 250):
"""
Initialize XQRS detector.
Parameters:
- sig: ndarray, input ECG signal
- fs: float, sampling frequency in Hz
"""
def detect(self, sig: np.ndarray, verbose: bool = False) -> np.ndarray:
"""
Detect QRS complexes in signal.
Parameters:
- sig: ndarray, input ECG signal
- verbose: bool, print detection progress
Returns:
ndarray of QRS peak sample locations
"""
class Comparitor:
"""Class for comparing annotation sets."""
def compare_annotations(self, ref_sample: np.ndarray, test_sample: np.ndarray,
window_width: float, signal: np.ndarray = None) -> Dict[str, np.ndarray]:
"""
Compare reference and test annotations.
Parameters:
- ref_sample: ndarray, reference annotations
- test_sample: ndarray, test annotations
- window_width: float, matching window width
- signal: ndarray, optional signal data
Returns:
Dictionary with comparison results
"""import wfdb
import numpy as np
# Read ECG record
record = wfdb.rdrecord('100', pn_dir='mitdb', channels=[0])
ecg_signal = record.p_signal[:, 0]
# Detect QRS complexes using XQRS
qrs_inds = wfdb.processing.xqrs_detect(ecg_signal, fs=record.fs)
print(f"Detected {len(qrs_inds)} QRS complexes")
# Alternative: use GQRS algorithm
qrs_inds_gqrs = wfdb.processing.gqrs_detect(ecg_signal, fs=record.fs)
# Use XQRS class for more control
detector = wfdb.processing.XQRS(ecg_signal, fs=record.fs)
qrs_inds_class = detector.detect(ecg_signal, verbose=True)import wfdb
import numpy as np
# Read record and detect QRS
record = wfdb.rdrecord('100', pn_dir='mitdb')
qrs_inds = wfdb.processing.xqrs_detect(record.p_signal[:, 0], fs=record.fs)
# Calculate instantaneous heart rate
hr = wfdb.processing.compute_hr(record.sig_len, qrs_inds, record.fs)
print(f"Mean HR: {np.mean(hr):.1f} BPM")
print(f"HR range: {np.min(hr):.1f} - {np.max(hr):.1f} BPM")
# Calculate RR intervals
rr_intervals = wfdb.processing.calc_rr(qrs_inds, fs=record.fs, ann_style='index')
print(f"Mean RR: {np.mean(rr_intervals):.3f} seconds")
# Mean heart rate from RR intervals
mean_hr = wfdb.processing.calc_mean_hr(rr_intervals, rr_units='seconds')
print(f"Mean HR from RR: {mean_hr:.1f} BPM")import wfdb
import numpy as np
# Read signal at original sampling rate
record = wfdb.rdrecord('100', pn_dir='mitdb') # 360 Hz
original_signal = record.p_signal[:, 0]
# Resample to different frequency
target_fs = 250 # Hz
resampled_sig, resampled_t = wfdb.processing.resample_sig(
original_signal, record.fs, target_fs)
print(f"Original: {len(original_signal)} samples at {record.fs} Hz")
print(f"Resampled: {len(resampled_sig)} samples at {target_fs} Hz")
# Normalize signal
normalized_sig = wfdb.processing.normalize_bound(resampled_sig, lb=-1, ub=1)import wfdb
import numpy as np
# Read signal
record = wfdb.rdrecord('100', pn_dir='mitdb', channels=[0])
signal = record.p_signal[:, 0]
# Find peaks using derivative method
hard_peaks, soft_peaks = wfdb.processing.find_peaks(signal)
print(f"Hard peaks: {len(hard_peaks)}, Soft peaks: {len(soft_peaks)}")
# Find local peaks with radius
local_peaks = wfdb.processing.find_local_peaks(signal, radius=10)
# Correct peak locations to local maxima
corrected_peaks = wfdb.processing.correct_peaks(
signal, hard_peaks, search_radius=5, smooth_window_size=3)import wfdb
# Read reference annotations
ref_ann = wfdb.rdann('100', 'atr', pn_dir='mitdb')
ref_qrs = ref_ann.sample
# Detect QRS using algorithm
record = wfdb.rdrecord('100', pn_dir='mitdb', channels=[0])
test_qrs = wfdb.processing.xqrs_detect(record.p_signal[:, 0], fs=record.fs)
# Compare results
tp, fp, fn = wfdb.processing.compare_annotations(
ref_qrs, test_qrs, window_width=50) # 50 sample tolerance
# Calculate metrics
sensitivity = len(tp) / (len(tp) + len(fn))
ppv = len(tp) / (len(tp) + len(fp))
print(f"Sensitivity: {sensitivity:.3f}")
print(f"Positive Predictive Value: {ppv:.3f}")Install with Tessl CLI
npx tessl i tessl/pypi-wfdb