A comprehensive Python library for representing electrophysiology data with support for reading and writing a wide range of neurophysiology file formats.
—
High-performance, memory-efficient access to electrophysiology files through Neo's RawIO interface. This low-level API provides direct access to signal chunks, header information, and metadata without creating full Neo objects, enabling efficient processing of large datasets and real-time applications.
Functions for automatically detecting and instantiating appropriate RawIO classes.
def get_rawio(filename):
    """
    Auto-detect file format and return appropriate RawIO class instance.

    Parameters:
    - filename (str): Path to file or directory

    Returns:
    RawIO instance for the detected file format

    Raises:
    IOError: If format cannot be determined or no compatible RawIO found
    """


# Base interface shared by all RawIO classes for consistent low-level data access.
class BaseRawIO:
    """Base class defining the RawIO interface for all file formats."""

    def parse_header(self):
        """Parse file header and extract metadata."""

    def get_signal_size(self, block_index, seg_index, channel_indexes=None):
        """
        Get the size of analog signal data.

        Parameters:
        - block_index (int): Block index in file
        - seg_index (int): Segment index within block
        - channel_indexes (list, optional): Specific channels

        Returns:
        int: Number of samples in the signal
        """

    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None):
        """
        Get analog signal data chunk.

        Parameters:
        - block_index (int): Block index
        - seg_index (int): Segment index
        - i_start (int, optional): Start sample index
        - i_stop (int, optional): Stop sample index
        - channel_indexes (list, optional): Channel indices to read

        Returns:
        numpy.ndarray: Signal data chunk (samples x channels)
        """

    def get_spike_timestamps(self, block_index, seg_index, unit_index):
        """
        Get spike timestamps for a specific unit.

        Parameters:
        - block_index (int): Block index
        - seg_index (int): Segment index
        - unit_index (int): Unit/channel index

        Returns:
        numpy.ndarray: Spike timestamps
        """

    def get_spike_raw_waveforms(self, block_index, seg_index, unit_index):
        """
        Get raw spike waveform data.

        Parameters:
        - block_index (int): Block index
        - seg_index (int): Segment index
        - unit_index (int): Unit index

        Returns:
        numpy.ndarray: Waveform data (spikes x samples x channels)
        """


# Low-level access for major hardware manufacturer formats.
class AxonRawIO:
    """
    Low-level Axon file format access (.abf).

    Provides efficient access to Molecular Devices pCLAMP and AxoScope
    files with direct header parsing and signal chunk retrieval.
    """

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...

    # Axon-specific properties
    header: dict  # Parsed ABF header information
    sampling_rate: float  # Sampling rate in Hz
    protocol_info: dict  # Protocol and stimulus information
class BlackrockRawIO:
    """Low-level Blackrock Microsystems file access (.nev, .ns1-.ns6)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...

    def get_spike_timestamps(self, block_index, seg_index, unit_index): ...
class PlexonRawIO:
    """Low-level Plexon file format access (.plx)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...

    def get_spike_timestamps(self, block_index, seg_index, unit_index): ...

    def get_spike_raw_waveforms(self, block_index, seg_index, unit_index): ...
class Plexon2RawIO:
    """Low-level Plexon2 file format access (.pl2)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class IntanRawIO:
    """Low-level Intan Technologies file access (.rhd, .rhs)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class NeuralynxRawIO:
    """Low-level Neuralynx file format access (.ncs, .nev, .nse, .ntt)."""

    def __init__(self, dirname): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...

    def get_spike_timestamps(self, block_index, seg_index, unit_index): ...
class TdtRawIO:
    """Low-level Tucker-Davis Technologies file access (.tev, .tsq, .tnt)."""

    def __init__(self, dirname): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class Spike2RawIO:
    """Low-level Cambridge Electronic Design Spike2 access (.smr, .son)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...


# Specialized low-level access for high-channel-count MEA and probe recordings.
class BiocamRawIO:
    """Low-level 3Brain Biocam file access (.brw, .bxr)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class MaxwellRawIO:
    """Low-level Maxwell Biosystems file access (.raw.h5)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class NeuroNexusRawIO:
    """Low-level NeuroNexus probe file access (.xdaq)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class SpikeGadgetsRawIO:
    """Low-level SpikeGadgets file access (.rec, .mda)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class RawMCSRawIO:
    """Low-level Multi Channel Systems raw file access (.raw)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...


# Low-level access for open source and analysis software formats.
class OpenEphysRawIO:
    """Low-level Open Ephys GUI file access (.continuous, .events, .spikes)."""

    def __init__(self, dirname): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class OpenEphysBinaryRawIO:
    """Low-level Open Ephys binary file access (.dat)."""

    def __init__(self, dirname): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class PhyRawIO:
    """Low-level Phy spike sorting file access (params.py, cluster_info.tsv)."""

    def __init__(self, dirname): ...

    def parse_header(self): ...

    def get_spike_timestamps(self, block_index, seg_index, unit_index): ...

    def get_spike_raw_waveforms(self, block_index, seg_index, unit_index): ...
class SpikeGLXRawIO:
    """Low-level SpikeGLX acquisition file access (.bin, .meta)."""

    def __init__(self, dirname): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...


# Low-level access for industry-standard and research formats.
class EDFRawIO:
    """Low-level European Data Format access (.edf, .bdf)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class NIXRawIO:
    """Low-level NIX format access (.nix)."""

    def __init__(self, filename): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...
class RawBinarySignalRawIO:
    """
    Low-level raw binary signal file access (.raw, .bin).

    Flexible access to custom binary formats with user-specified
    data layout parameters.
    """

    def __init__(self, filename, dtype='float32', sampling_rate=1.0,
                 nb_channel=1, signal_gain=1.0, signal_offset=0.0): ...

    def parse_header(self): ...

    # Range and channel arguments are optional, matching BaseRawIO.
    def get_analogsignal_chunk(self, block_index, seg_index,
                               i_start=None, i_stop=None,
                               channel_indexes=None): ...


import neo.rawio
import numpy as np

# Auto-detect and create RawIO instance
# NOTE(review): this page documents get_rawio as returning a ready-to-use
# instance; confirm that against the installed neo version — TODO confirm
rawio = neo.rawio.get_rawio('recording.abf')

# Parse file header (must run before any of the queries below)
rawio.parse_header()

# Examine file structure
print(f"Number of blocks: {rawio.block_count()}")
print(f"Number of segments: {rawio.segment_count(0)}")
print(f"Number of channels: {rawio.signal_channels_count()}")
print(f"Sampling rate: {rawio.get_signal_sampling_rate()}")

# Get channel information
signal_channels = rawio.header['signal_channels']
for i, ch in enumerate(signal_channels):
    print(f"Channel {i}: {ch['name']} ({ch['units']})")

# Get signal chunk without loading entire file
chunk = rawio.get_analogsignal_chunk(
    block_index=0,
    seg_index=0,
    i_start=1000,  # Start at sample 1000
    i_stop=2000,  # End at sample 2000
    channel_indexes=[0, 1, 2],  # Only channels 0, 1, 2
)
print(f"Chunk shape: {chunk.shape}")  # (1000 samples, 3 channels)
print(f"Data type: {chunk.dtype}")
print(f"Memory usage: {chunk.nbytes} bytes")
# Stream processing of large files
chunk_size = 10000  # Process 10k samples at a time
total_samples = rawio.get_signal_size(0, 0)
# Ceiling division: floor division would silently drop the final, shorter
# chunk whenever total_samples is not a multiple of chunk_size.
n_chunks = (total_samples + chunk_size - 1) // chunk_size

processed_data = []
for i in range(n_chunks):
    i_start = i * chunk_size
    # The last chunk may be shorter than chunk_size, hence the clamp.
    i_stop = min((i + 1) * chunk_size, total_samples)
    chunk = rawio.get_analogsignal_chunk(0, 0, i_start, i_stop)
    # Process chunk (e.g., filter, downsample, etc.)
    processed_chunk = np.mean(chunk, axis=1)  # Example: channel average
    processed_data.append(processed_chunk)

# np.concatenate raises ValueError on an empty list, so an empty recording
# is mapped to an empty result explicitly.
final_result = np.concatenate(processed_data) if processed_data else np.empty(0)
print(f"Processed {len(final_result)} samples")

# Access spike data (for formats that support it)
if rawio.spike_channels_count() > 0:
    spike_channels = rawio.header['spike_channels']
    for unit_index in range(len(spike_channels)):
        # Get spike timestamps
        spike_times = rawio.get_spike_timestamps(
            block_index=0,
            seg_index=0,
            unit_index=unit_index,
        )
        print(f"Unit {unit_index}: {len(spike_times)} spikes")

        # Get spike waveforms if available
        if hasattr(rawio, 'get_spike_raw_waveforms'):
            waveforms = rawio.get_spike_raw_waveforms(0, 0, unit_index)
            if waveforms is not None:
                print(f" Waveform shape: {waveforms.shape}")


def analyze_signal_statistics(rawio, block_index=0, seg_index=0):
    """
    Compute signal statistics without loading entire file.

    The signal is read in fixed-size chunks and only per-channel running
    sums are kept, so memory use is bounded by one chunk.

    Parameters:
    - rawio: RawIO instance whose header has already been parsed
    - block_index (int): Block index
    - seg_index (int): Segment index

    Returns:
    tuple: (means, stds), each a numpy.ndarray with one value per channel.
    Both are all-NaN when the signal contains no samples.
    """
    total_samples = rawio.get_signal_size(block_index, seg_index)
    n_channels = rawio.signal_channels_count()
    chunk_size = 50000  # Adjust based on available memory

    # Initialize accumulators
    running_sum = np.zeros(n_channels)
    running_sum_squares = np.zeros(n_channels)
    sample_count = 0

    # Process file in chunks
    for i_start in range(0, total_samples, chunk_size):
        i_stop = min(i_start + chunk_size, total_samples)
        chunk = rawio.get_analogsignal_chunk(
            block_index, seg_index, i_start, i_stop
        )
        # Promote to float64 before squaring: if the reader hands back an
        # integer dtype, squaring in that dtype can overflow and wrap around.
        chunk = np.asarray(chunk, dtype=np.float64)

        # Update statistics
        running_sum += chunk.sum(axis=0)
        running_sum_squares += np.square(chunk).sum(axis=0)
        sample_count += chunk.shape[0]

    if sample_count == 0:
        # No samples were read: the statistics are undefined, so report NaN
        # instead of dividing by zero.
        return np.full(n_channels, np.nan), np.full(n_channels, np.nan)

    # Compute final statistics
    means = running_sum / sample_count
    variances = (running_sum_squares / sample_count) - means**2
    # Rounding can leave a tiny negative variance for near-constant channels;
    # clamp at zero so the square root never yields NaN.
    stds = np.sqrt(np.maximum(variances, 0.0))
    return means, stds


# Use the function
means, stds = analyze_signal_statistics(rawio)
print(f"Channel means: {means}")
print(f"Channel standard deviations: {stds}")


def simulate_realtime_processing(rawio, chunk_duration=0.1):
    """
    Simulate real-time processing by reading time-based chunks.

    Parameters:
    - rawio: RawIO instance whose header has already been parsed
    - chunk_duration (float): Length of each chunk in seconds

    Returns:
    None; a summary and one line per chunk are printed.
    """
    import time  # imported once here instead of on every loop iteration

    sampling_rate = rawio.get_signal_sampling_rate()
    # At least one sample per chunk: a chunk_duration shorter than one sample
    # period would give a zero step, and range() rejects a zero step.
    chunk_samples = max(1, int(chunk_duration * sampling_rate))
    total_samples = rawio.get_signal_size(0, 0)

    print("Simulating real-time processing:")
    print(f" Chunk duration: {chunk_duration}s")
    print(f" Chunk size: {chunk_samples} samples")
    print(f" Total duration: {total_samples / sampling_rate:.2f}s")

    for i_start in range(0, total_samples, chunk_samples):
        i_stop = min(i_start + chunk_samples, total_samples)

        # Read chunk
        chunk = rawio.get_analogsignal_chunk(0, 0, i_start, i_stop)

        # Time only the processing step; perf_counter is the monotonic,
        # high-resolution clock intended for measuring short intervals.
        processing_start = time.perf_counter()

        # Example processing: detect threshold crossings
        threshold = 3 * np.std(chunk, axis=0)
        crossings = np.sum(np.abs(chunk) > threshold, axis=0)

        processing_time = time.perf_counter() - processing_start
        chunk_time = i_start / sampling_rate
        print(f" t={chunk_time:.1f}s: {crossings.sum()} crossings, "
              f"processed in {processing_time*1000:.1f}ms")


simulate_realtime_processing(rawio, chunk_duration=1.0)

# Axon-specific operations
if isinstance(rawio, neo.rawio.AxonRawIO):
    print("Axon ABF file detected")
    print(f"Protocol name: {rawio.header.get('protocol', 'Unknown')}")

    # Access ABF-specific metadata
    # NOTE(review): _axon_info is a private attribute; guarded with hasattr
    # because its presence is not guaranteed by anything shown here.
    if hasattr(rawio, '_axon_info'):
        axon_info = rawio._axon_info
        print(f"Episode count: {axon_info.get('lEpisodesPerRun', 'Unknown')}")
        print(f"Sample interval: {axon_info.get('fADCSampleInterval', 'Unknown')} μs")

# Blackrock-specific operations
elif isinstance(rawio, neo.rawio.BlackrockRawIO):
    print("Blackrock file detected")

    # Access NEV event data if available
    if rawio.event_channels_count() > 0:
        events = rawio.get_event_timestamps(0, 0, 0)
        print(f"Found {len(events)} digital events")

# SpikeGLX-specific operations
elif isinstance(rawio, neo.rawio.SpikeGLXRawIO):
    print("SpikeGLX file detected")

    # Access probe geometry information
    # NOTE(review): _geometry is a private attribute; guarded with hasattr
    # because its presence is not guaranteed by anything shown here.
    if hasattr(rawio, '_geometry'):
        geometry = rawio._geometry
        print(f"Probe has {len(geometry)} recording sites")

try:
    rawio = neo.rawio.get_rawio('data_file.ns5')
    rawio.parse_header()
except Exception as e:
    print(f"Failed to open file: {e}")
    # Clear the name so the checks below cannot run against a reader that
    # failed to open or a stale one left over from earlier in the script.
    rawio = None

if rawio is not None:
    # Validate file integrity
    try:
        # Test reading small chunk
        test_chunk = rawio.get_analogsignal_chunk(0, 0, 0, 100)
        print(f"File validation successful: {test_chunk.shape}")
    except Exception as e:
        print(f"File validation failed: {e}")

    # Check for required capabilities
    if rawio.signal_channels_count() == 0:
        print("Warning: No analog signal channels found")
    if rawio.spike_channels_count() == 0:
        print("Warning: No spike channels found")

    # Memory usage estimation
    total_samples = rawio.get_signal_size(0, 0)
    n_channels = rawio.signal_channels_count()
    bytes_per_sample = 4  # Assuming float32
    total_memory = total_samples * n_channels * bytes_per_sample
    print(f"Full file would require {total_memory / 1e9:.2f} GB memory")

# RawIO class types
from typing import Any  # needed by the HeaderDict / ChannelInfo aliases below

RawIOClass = type[BaseRawIO]  # Base RawIO class type
RawIOInstance = BaseRawIO  # RawIO instance type

# Index and size types
BlockIndex = int  # Block index in file
SegmentIndex = int  # Segment index within block
ChannelIndex = int  # Channel index
SampleIndex = int  # Sample index
ChannelIndexes = list[int] | slice  # Channel selection
SampleRange = tuple[int, int]  # Sample start and stop indices

# Data chunk types
SignalChunk = np.ndarray  # Signal data chunk (samples x channels)
SpikeTimestamps = np.ndarray  # Spike timestamp array
SpikeWaveforms = np.ndarray  # Spike waveform data (spikes x samples x channels)
EventData = np.ndarray  # Event timestamp and code data

# Header and metadata types
HeaderDict = dict[str, Any]  # Parsed header information
ChannelInfo = dict[str, Any]  # Channel metadata
SamplingRate = float  # Sampling rate in Hz
SignalUnits = str  # Physical units of signal

# File format types
FileName = str  # File path string
DirectoryName = str  # Directory path string (for multi-file formats)
FileExtension = str  # File extension string

# Install with Tessl CLI
#   npx tessl i tessl/pypi-neo