Python library for importing XDF (Extensible Data Format) files used in neuroscience and biosignal research
npx @tessl/cli install tessl/pypi-pyxdf@1.17.0

A Python library for importing XDF (Extensible Data Format) files commonly used in neuroscience and biosignal research. PyXDF provides a simple interface to load multi-stream time-series data recorded from Lab Streaming Layer (LSL) systems, supporting various data formats and advanced processing features like clock synchronization and jitter removal.
pip install pyxdf

import pyxdf

Direct function imports:
from pyxdf import load_xdf, resolve_streams, match_streaminfos

Advanced imports (for low-level operations):
from pyxdf.pyxdf import open_xdf, parse_xdf, parse_chunks

import pyxdf
import matplotlib.pyplot as plt
import numpy as np

# Load an XDF file: returns a list of stream dicts plus the file header.
streams, header = pyxdf.load_xdf("recording.xdf")

# Process each stream
for stream in streams:
    y = stream["time_series"]
    if isinstance(y, list):
        # String markers - draw vertical lines
        for timestamp, marker in zip(stream["time_stamps"], y):
            plt.axvline(x=timestamp)
            print(f'Marker "{marker[0]}" @ {timestamp:.2f}s')
    elif isinstance(y, np.ndarray):
        # Numeric data - plot as lines
        plt.plot(stream["time_stamps"], y)
    else:
        raise RuntimeError("Unknown stream format")
plt.show()

PyXDF operates on the XDF (Extensible Data Format) specification, processing multi-stream recordings with:
The library handles file corruption gracefully, supports compressed files (.xdfz), and provides advanced processing options for research applications requiring high temporal precision.
Core functionality for importing XDF files with comprehensive data processing, stream selection, and timing corrections.
def load_xdf(
    filename,
    select_streams=None,
    *,
    on_chunk=None,
    synchronize_clocks=True,
    handle_clock_resets=True,
    dejitter_timestamps=True,
    jitter_break_threshold_seconds=1,
    jitter_break_threshold_samples=500,
    clock_reset_threshold_seconds=5,
    clock_reset_threshold_stds=5,
    clock_reset_threshold_offset_seconds=1,
    clock_reset_threshold_offset_stds=10,
    winsor_threshold=0.0001,
    verbose=None,
):
    """
    Import an XDF file with optional stream selection and processing.

    Args:
        filename (str): Path to XDF file (*.xdf or *.xdfz)
        select_streams (int | list[int] | list[dict] | None): Stream selection criteria
        on_chunk (callable, optional): Callback function for chunk processing
        synchronize_clocks (bool): Enable clock synchronization (default: True)
        handle_clock_resets (bool): Handle computer restarts during recording (default: True)
        dejitter_timestamps (bool): Perform jitter removal for regular streams (default: True)
        jitter_break_threshold_seconds (float): Break detection threshold in seconds (default: 1)
        jitter_break_threshold_samples (int): Break detection threshold in samples (default: 500)
        clock_reset_threshold_seconds (float): Clock reset detection threshold (default: 5)
        clock_reset_threshold_stds (float): Reset detection in standard deviations (default: 5)
        clock_reset_threshold_offset_seconds (float): Offset threshold for resets (default: 1)
        clock_reset_threshold_offset_stds (float): Offset threshold in stds (default: 10)
        winsor_threshold (float): Robust fitting threshold (default: 0.0001)
        verbose (bool | None): Logging level control

    Returns:
        tuple[list[dict], dict]: (streams, fileheader)
            - streams: List of stream dictionaries
            - fileheader: File header metadata
    """


# Each stream in the returned list contains:
# Stream dictionary structure (values below are the *types* of each field)
stream = {
    "time_series": Union[np.ndarray, list],  # Channel x Sample data or string markers
    "time_stamps": np.ndarray,               # Sample timestamps (synchronized)
    "info": {                                # Stream metadata
        "name": list[str],                   # Stream name
        "type": list[str],                   # Content type (EEG, Events, etc.)
        "channel_count": list[str],          # Number of channels
        "channel_format": list[str],         # Data format (int8, float32, etc.)
        "nominal_srate": list[str],          # Declared sampling rate
        "effective_srate": float,            # Measured sampling rate
        "stream_id": int,                    # Unique stream identifier
        "segments": list[tuple[int, int]],   # Data break segments (start, end)
        "desc": dict,                        # Domain-specific metadata
    },
    "clock_times": list[float],   # Clock measurement times
    "clock_values": list[float],  # Clock offset values
}
# Supported channel formats: int8, int16, int32, int64, float32, double64
# Supported containers: plain (.xdf) and gzip-compressed (.xdfz, .xdf.gz)
# Utilities for discovering streams in XDF files and selecting streams based on criteria.
def resolve_streams(fname):
    """
    Resolve streams in given XDF file without loading data.

    Args:
        fname (str): Path to XDF file

    Returns:
        list[dict]: Stream information dictionaries with metadata
    """
def match_streaminfos(stream_infos, parameters):
    """
    Find stream IDs matching specified criteria.

    Args:
        stream_infos (list[dict]): Stream information from resolve_streams
        parameters (list[dict]): Matching criteria as key-value pairs

    Returns:
        list[int]: Stream IDs matching all criteria

    Examples:
        # Match streams by name
        match_streaminfos(infos, [{"name": "EEG"}])
        # Match by type and name
        match_streaminfos(infos, [{"type": "EEG", "name": "ActiChamp"}])
        # Match multiple criteria (OR logic)
        match_streaminfos(infos, [{"type": "EEG"}, {"name": "Markers"}])
    """


# Advanced utilities for direct XDF file handling and chunk-level processing.
def open_xdf(file):
    """
    Open XDF file for reading with format validation.

    Args:
        file (str | pathlib.Path | io.RawIOBase): File path or opened binary file handle

    Returns:
        io.BufferedReader | gzip.GzipFile: Opened file handle positioned after magic bytes

    Raises:
        IOError: If file is not a valid XDF file (missing XDF: magic bytes)
        ValueError: If file handle is opened in text mode
        Exception: If file does not exist
    """
def parse_xdf(fname):
    """
    Parse and return all chunks from an XDF file without processing.

    Args:
        fname (str): Path to XDF file

    Returns:
        list[dict]: Raw chunks containing headers, samples, and metadata
    """
def parse_chunks(chunks):
    """
    Extract stream information from parsed XDF chunks.

    Args:
        chunks (list[dict]): Raw chunks from parse_xdf

    Returns:
        list[dict]: Stream metadata dictionaries suitable for resolve_streams
    """


# Load specific stream by ID
streams, _ = pyxdf.load_xdf("file.xdf", select_streams=5)
# Load multiple streams by ID
streams, _ = pyxdf.load_xdf("file.xdf", select_streams=[1, 3, 5])
# Load streams by criteria
streams, _ = pyxdf.load_xdf("file.xdf", select_streams=[{"type": "EEG"}])
# Load streams matching name and type
criteria = [{"type": "EEG", "name": "BrainAmp"}]
streams, _ = pyxdf.load_xdf("file.xdf", select_streams=criteria)
# Python modules providing command-line utilities for XDF file inspection and playback.
# python -m pyxdf.cli.print_metadata -f=/path/to/file.xdf

Prints stream metadata including:
# python -m pyxdf.cli.playback_lsl filename [options]

Replays XDF data over Lab Streaming Layer (LSL) streams in real-time with configurable options:
Parameters:
filename (str): Path to the XDF file to playback (required)
--playback_speed (float): Playback speed multiplier (default: 1.0)
--loop: Loop playback of the file continuously (flag, default: False)
--wait_for_consumer: Wait for LSL consumer before starting playback (flag, default: False)

Features:
# Basic playback
python -m pyxdf.cli.playback_lsl recording.xdf
# Loop mode with 2x speed
python -m pyxdf.cli.playback_lsl recording.xdf --playback_speed 2.0 --loop
# Wait for consumers before starting
python -m pyxdf.cli.playback_lsl recording.xdf --wait_for_consumer
# Slow motion playback at half speed
# python -m pyxdf.cli.playback_lsl recording.xdf --playback_speed 0.5
def process_chunk(data, timestamps, info, stream_id):
    """Custom chunk processing callback.

    Args:
        data: chunk samples
        timestamps: per-sample timestamps for the chunk
        info: stream metadata dict (info["type"][0] selects the content type)
        stream_id: numeric stream identifier

    Returns:
        tuple: (data, timestamps, info), possibly with filtered data
    """
    # Apply real-time filtering, downsampling, etc.
    if info["type"][0] == "EEG":
        # Apply notch filter to EEG data
        filtered_data = apply_notch_filter(data, 60.0)  # Remove 60Hz noise
        return filtered_data, timestamps, info
    return data, timestamps, info
# Load with custom processing
streams, _ = pyxdf.load_xdf("recording.xdf", on_chunk=process_chunk)

# Load with custom break detection
streams, _ = pyxdf.load_xdf(
    "recording.xdf",
    jitter_break_threshold_seconds=0.5,  # Detect 500ms breaks
    jitter_break_threshold_samples=100,  # Or 100-sample breaks
)
# Process segments separately
for stream in streams:
    for start_idx, end_idx in stream["info"]["segments"]:
        # Segment bounds are inclusive, hence the +1 on the slice end.
        segment_data = stream["time_series"][start_idx:end_idx + 1]
        segment_times = stream["time_stamps"][start_idx:end_idx + 1]
        # Process each continuous segment
        process_segment(segment_data, segment_times)
# Disable automatic processing for manual control
streams, _ = pyxdf.load_xdf(
    "recording.xdf",
    synchronize_clocks=False,   # Skip automatic sync
    dejitter_timestamps=False,  # Skip jitter removal
    verbose=True,               # Enable debug logging
)
# Access raw clock information
for stream in streams:
    clock_times = stream["clock_times"]
    clock_values = stream["clock_values"]
    # Implement custom synchronization
    custom_sync_timestamps = apply_custom_sync(
        stream["time_stamps"], clock_times, clock_values
    )
# PyXDF includes robust error handling for common issues:
import struct

try:
    streams, header = pyxdf.load_xdf("corrupted.xdf")
except FileNotFoundError as e:
    # Must come before IOError: FileNotFoundError is a subclass of
    # OSError/IOError, so listing it later would make it unreachable.
    print(f"File not found: {e}")
    # Raised when XDF file doesn't exist
except IOError as e:
    print(f"File error: {e}")
    # Raised for invalid XDF files (missing magic bytes) or file access issues
except ValueError as e:
    print(f"Invalid stream selection: {e}")
    # Raised for malformed select_streams parameter or no matching streams
except struct.error as e:
    print(f"Data corruption detected: {e}")
    # Raised for corrupted binary data, library attempts recovery
except Exception as e:
    print(f"Parsing error: {e}")
    # General parsing errors - library attempts to recover and load partial data
# Error Recovery Mechanisms:
PyXDF automatically handles many failure scenarios:
Specific Error Conditions:
ValueError("No matching streams found.") - When select_streams criteria match no streams
ValueError("Argument 'select_streams' must be...") - Invalid select_streams parameter format
IOError("Invalid XDF file") - File doesn't start with "XDF:" magic bytes
ValueError("file has to be opened in binary mode") - Text mode file handle passed to open_xdf
Exception("file does not exist") - File path doesn't exist when using open_xdf
EOFError - Unexpected end of file, handled gracefully with partial data recovery

# Type annotations for main function parameters
# Informal annotations describing the types load_xdf accepts for each parameter.
filename: Union[str, pathlib.Path]  # path to the .xdf/.xdfz file
select_streams: Union[None, int, list[int], list[dict]]  # single ID, list of IDs, or criteria dicts
on_chunk: Union[None, Callable[[np.ndarray, np.ndarray, dict, int], tuple[np.ndarray, np.ndarray, dict]]]  # (data, timestamps, info, stream_id) -> (data, timestamps, info)
# Stream selection criteria format
stream_criteria: dict[str, str] # e.g., {"type": "EEG", "name": "BrainAmp"}
# Stream info structure from resolve_streams: maps each metadata
# field name to the Python type of its value.
StreamInfo = {
    "stream_id": int,
    "name": str,
    "type": str,
    "source_id": str,
    "created_at": str,
    "uid": str,
    "session_id": str,
    "hostname": str,
    "channel_count": int,
    "channel_format": str,
    "nominal_srate": float,
}