Package for reading, manipulating, and writing genomic data.

Support for reading and writing block gzip (BGZF) compressed files, the standard compression format used in genomics for SAM/BAM files and tabix-indexed files. This module provides an interface for reading and writing BGZF compressed files with seeking and block-level access.
class BGZFile:
    """Read and write block-gzip (BGZF) compressed files.

    BGZF is the block compression format used in genomics for SAM/BAM
    and tabix-indexed files; it supports seeking and block-level access.
    This is an API stub: method bodies are documentation placeholders.
    """

    def __init__(self, filepath, mode, compresslevel=6, threads=1):
        """Open a BGZF compressed file.

        Parameters:
        - filepath: str, path to BGZF file
        - mode: str, file mode ('r', 'w', 'rb', 'wb', 'rt', 'wt')
        - compresslevel: int, compression level (0-9, default 6)
        - threads: int, number of compression threads
        """

    def read(self, size=-1):
        """Read data from the file.

        Parameters:
        - size: int, number of bytes to read (-1 for all)

        Returns:
        bytes or str, data read from file
        """

    def readline(self, size=-1):
        """Read a single line from the file.

        Parameters:
        - size: int, maximum bytes to read

        Returns:
        bytes or str, line data
        """

    def readlines(self, hint=-1):
        """Read all lines from the file.

        Parameters:
        - hint: int, approximate number of bytes to read

        Returns:
        list, lines from file
        """

    def write(self, data):
        """Write data to the file.

        Parameters:
        - data: bytes or str, data to write

        Returns:
        int, number of bytes written
        """

    def writelines(self, lines):
        """Write multiple lines to the file.

        Parameters:
        - lines: iterable, lines to write
        """

    def seek(self, offset, whence=0):
        """Seek to a position in the file.

        Parameters:
        - offset: int, byte offset
        - whence: int, seek reference (0=start, 1=current, 2=end)

        Returns:
        int, new file position
        """

    def tell(self):
        """Return the current file position as an int."""

    def flush(self):
        """Flush write buffers."""

    def close(self):
        """Close the file."""

    def truncate(self, size=None):
        """Truncate the file to the specified size.

        Parameters:
        - size: int, size in bytes (current position if None)
        """

    # Properties

    @property
    def mode(self) -> str:
        """File mode."""

    @property
    def name(self) -> str:
        """File name."""

    @property
    def closed(self) -> bool:
        """True if file is closed."""

    @property
    def readable(self) -> bool:
        """True if file is readable."""

    @property
    def writable(self) -> bool:
        """True if file is writable."""

    @property
    def seekable(self) -> bool:
        """True if file supports seeking."""

    # Context manager support
    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""

    # Iterator support
    def __iter__(self):
        """Iterate over lines."""

    def __next__(self):
        """Get next line."""
import pysam

# Reading BGZF files: read the entire file at once
with pysam.BGZFile("data.txt.gz", "rt") as infile:
    content = infile.read()
    print(f"File content: {content}")

# Reading line by line (BGZFile is iterable)
with pysam.BGZFile("data.txt.gz", "rt") as infile:
    for line_num, line in enumerate(infile, 1):
        print(f"Line {line_num}: {line.strip()}")

# Reading a specific amount of data in binary mode
with pysam.BGZFile("data.txt.gz", "rb") as infile:
    chunk = infile.read(1024)  # Read first 1KB
    print(f"First chunk: {len(chunk)} bytes")
import pysam

# Writing text data at maximum compression
with pysam.BGZFile("output.txt.gz", "wt", compresslevel=9) as outfile:
    outfile.write("Header line\n")
    for i in range(1000):
        outfile.write(f"Data line {i}\n")

# Writing binary data
with pysam.BGZFile("output.bin.gz", "wb") as outfile:
    data = b"Binary data chunk"
    outfile.write(data)

# Writing with multiple threads for better compression speed
with pysam.BGZFile("large_output.txt.gz", "wt", threads=4) as outfile:
    for i in range(1000000):
        outfile.write(f"Large dataset line {i}\n")
import pysam

# Seeking in BGZF files (supports random access)
with pysam.BGZFile("indexed_data.txt.gz", "rt") as infile:
    # Read from beginning
    first_line = infile.readline()
    print(f"First line: {first_line.strip()}")

    # Remember position.
    # NOTE(review): BGZF tell() values are virtual offsets (compressed
    # block position + in-block offset), not plain byte counts -- treat
    # them as opaque tokens to pass back to seek().
    pos = infile.tell()
    print(f"Current position: {pos}")

    # Read more data
    second_line = infile.readline()

    # Seek back to the remembered position and re-read the same line
    infile.seek(pos)
    second_line_again = infile.readline()
    assert second_line == second_line_again

    # Seek to end and report the end-of-file position
    infile.seek(0, 2)  # whence=2: relative to end of file
    file_size = infile.tell()
    print(f"File size: {file_size} bytes")
import pysam

def process_large_bgzf_file(filename, chunk_size=8192):
    """Process a large BGZF file in fixed-size chunks to manage memory usage.

    Parameters:
    - filename: str, path to the BGZF-compressed text file
    - chunk_size: int, number of characters to read per chunk

    Returns:
    int, number of non-empty lines processed
    """
    processed_lines = 0
    partial_line = ""  # carries an incomplete trailing line between chunks
    with pysam.BGZFile(filename, "rt") as infile:
        while True:
            chunk = infile.read(chunk_size)
            if not chunk:
                break
            # Prepend any partial line carried over from the previous chunk.
            # NOTE: BGZF tell()/seek() use virtual offsets, so the
            # "seek back by the partial line's byte length" approach is
            # not valid arithmetic; buffering avoids seeking entirely.
            lines = (partial_line + chunk).split('\n')
            # The final element is '' when the chunk ends on a newline;
            # otherwise it is an incomplete line deferred to the next chunk.
            partial_line = lines.pop()
            # Process complete lines
            for line in lines:
                if line.strip():  # Skip empty lines
                    # Process line here
                    processed_lines += 1
                    if processed_lines % 10000 == 0:
                        print(f"Processed {processed_lines} lines")
        # Account for a final line that lacks a trailing newline.
        if partial_line.strip():
            processed_lines += 1
    return processed_lines

# Usage
total_lines = process_large_bgzf_file("large_dataset.txt.gz")
print(f"Total processed lines: {total_lines}")
import pysam

def compress_to_bgzf(input_file, output_file, compression_level=6):
    """Compress a regular file to BGZF format.

    Parameters:
    - input_file: str, path of the uncompressed source file
    - output_file: str, path of the BGZF file to create
    - compression_level: int, compression level 0-9 (default 6)
    """
    chunk_size = 64 * 1024  # 64KB chunks keep memory use bounded
    with open(input_file, 'rb') as infile:
        with pysam.BGZFile(output_file, 'wb', compresslevel=compression_level) as outfile:
            while True:
                chunk = infile.read(chunk_size)
                if not chunk:
                    break
                outfile.write(chunk)

def decompress_bgzf(input_file, output_file):
    """Decompress a BGZF file to a regular file.

    Parameters:
    - input_file: str, path of the BGZF-compressed source file
    - output_file: str, path of the uncompressed file to create
    """
    chunk_size = 64 * 1024  # 64KB chunks
    with pysam.BGZFile(input_file, 'rb') as infile:
        with open(output_file, 'wb') as outfile:
            while True:
                chunk = infile.read(chunk_size)
                if not chunk:
                    break
                outfile.write(chunk)

# Usage
compress_to_bgzf("large_file.txt", "large_file.txt.gz", compression_level=9)
decompress_bgzf("compressed_file.txt.gz", "decompressed_file.txt")
import pysam
import os

def split_bgzf_file(input_file, output_prefix, lines_per_file=1000000):
    """Split a large BGZF file into smaller BGZF files.

    Parameters:
    - input_file: str, path to the BGZF file to split
    - output_prefix: str, prefix for the numbered output files
    - lines_per_file: int, maximum number of lines per output file

    Returns:
    int, number of output files written
    """
    file_count = 0
    current_lines = 0
    current_file = None
    try:
        with pysam.BGZFile(input_file, "rt") as infile:
            for line in infile:
                # Open a new output file at the start of each batch
                if current_lines == 0:
                    if current_file:
                        current_file.close()
                    file_count += 1
                    output_filename = f"{output_prefix}_{file_count:03d}.txt.gz"
                    current_file = pysam.BGZFile(output_filename, "wt")
                # Write line to current file
                current_file.write(line)
                current_lines += 1
                # Roll over to a new file once the batch is full
                if current_lines >= lines_per_file:
                    current_lines = 0
    finally:
        # Always close the last (possibly partial) output file
        if current_file:
            current_file.close()
    return file_count

def merge_bgzf_files(input_files, output_file):
    """Merge multiple BGZF files into one, in the given order.

    Parameters:
    - input_files: iterable of str, paths of BGZF files to concatenate
    - output_file: str, path of the merged BGZF file to create
    """
    with pysam.BGZFile(output_file, "wt") as outfile:
        for input_file in input_files:
            with pysam.BGZFile(input_file, "rt") as infile:
                # Copy all lines from input to output
                for line in infile:
                    outfile.write(line)

# Usage
num_files = split_bgzf_file("huge_dataset.txt.gz", "split_part", 500000)
print(f"Split into {num_files} files")
# Merge them back
input_files = [f"split_part_{i:03d}.txt.gz" for i in range(1, num_files + 1)]
merge_bgzf_files(input_files, "merged_dataset.txt.gz")
import pysam

# Create a custom tabix-compatible file
def create_bed_file_with_bgzf(features, output_file):
    """Create a sorted, BGZF-compressed BED file suitable for tabix indexing.

    Parameters:
    - features: iterable of dicts with 'chrom', 'start', 'end', 'name' keys
    - output_file: str, path of the BGZF-compressed BED file to write
    """
    # Tabix requires records sorted by chromosome, then start position
    sorted_features = sorted(features, key=lambda x: (x['chrom'], x['start']))
    # Write to BGZF file
    with pysam.BGZFile(output_file, "wt") as outfile:
        for feature in sorted_features:
            line = f"{feature['chrom']}\t{feature['start']}\t{feature['end']}\t{feature['name']}\n"
            outfile.write(line)
    # Create the tabix index alongside the output file
    pysam.tabix_index(output_file, preset="bed")

# Example usage
features = [
    {'chrom': 'chr1', 'start': 1000, 'end': 2000, 'name': 'feature1'},
    {'chrom': 'chr1', 'start': 1500, 'end': 2500, 'name': 'feature2'},
    {'chrom': 'chr2', 'start': 500, 'end': 1500, 'name': 'feature3'},
]
create_bed_file_with_bgzf(features, "features.bed.gz")

# Now can use with TabixFile
with pysam.TabixFile("features.bed.gz", parser=pysam.asBed()) as tabixfile:
    for record in tabixfile.fetch("chr1", 1200, 1800):
        print(f"Overlapping feature: {record.name}")
Writing with multiple threads (the `threads` parameter) can significantly speed up writing.

Install with Tessl CLI: npx tessl i tessl/pypi-pysam