GFFtk is a comprehensive Python toolkit for working with genome annotation files in GFF3, GTF, and TBL formats, providing format conversion and analysis capabilities.
This module supplies file-handling utilities, data-validation functions, and annotation-statistics calculation, with support for compressed formats, flexible I/O operations, and robust error handling.
It includes advanced file handling with automatic compression detection for gzip, bz2, and xz formats.
def zopen(filename, mode="r", buff=1024*1024, external=PARALLEL):
    """
    Open a file with automatic compression support.

    The compression handling is chosen from the filename extension
    (.gz, .bz2, .xz); other paths are opened as plain files.

    Parameters:
    - filename (str): Path to file (supports .gz, .bz2, .xz)
    - mode (str): File opening mode ("r", "w", "a")
    - buff (int): Buffer size for reading
    - external (int): Compression tool selection (NORMAL=0, PROCESS=1, PARALLEL=2)

    Returns:
    file-like: File handle for reading/writing
    """
def open_pipe(command, mode="r", buff=1024*1024):
    """
    Open a shell command as a pipe for reading or writing.

    Parameters:
    - command (str): Shell command to execute
    - mode (str): Pipe mode ("r" to read the command's stdout,
      "w" to write to its stdin)
    - buff (int): Buffer size

    Returns:
    file-like: Pipe handle
    """
def open_gz(filename, mode="r", buff=1024*1024, external=PARALLEL):
    """
    Open a gzipped file, optionally via an external tool.

    Parameters:
    - filename (str): Path to .gz file
    - mode (str): File opening mode
    - buff (int): Buffer size
    - external (int): Compression tool selection (NORMAL=0, PROCESS=1, PARALLEL=2)

    Returns:
    file-like: Gzipped file handle
    """
def open_bz2(filename, mode="r", buff=1024*1024, external=PARALLEL):
    """
    Open a bz2-compressed file.

    Parameters:
    - filename (str): Path to .bz2 file
    - mode (str): File opening mode
    - buff (int): Buffer size
    - external (int): Compression tool selection (NORMAL=0, PROCESS=1, PARALLEL=2)

    Returns:
    file-like: Bz2 file handle
    """
def open_xz(filename, mode="r", buff=1024*1024, external=PARALLEL):
    """
    Open an xz-compressed file.

    Parameters:
    - filename (str): Path to .xz file
    - mode (str): File opening mode
    - buff (int): Buffer size
    - external (int): Compression tool selection (NORMAL=0, PROCESS=1, PARALLEL=2)

    Returns:
    file-like: Xz file handle
    """


# Validate file existence, format, and properties before processing.
def check_inputs(inputs):
    """
    Validate that input files exist and are accessible.

    Parameters:
    - inputs (list): List of file paths to check

    Returns:
    bool: True if all files exist; raises an exception otherwise
    """
def is_file(f):
    """
    Check whether a file exists and is readable.

    Parameters:
    - f (str): File path to check

    Returns:
    bool: True if the file exists and is readable
    """
def is_gzipped(filepath):
    """
    Check whether a file is gzipped by examining its magic bytes.

    Parameters:
    - filepath (str): Path to file

    Returns:
    bool: True if the file is gzipped
    """
def is_text_file(filepath):
    """
    Check whether a file contains text data.

    Parameters:
    - filepath (str): Path to file

    Returns:
    bool: True if the file appears to be text
    """
def check_file_type(filepath):
    """
    Determine a file's type: text, gzipped, or binary.

    Parameters:
    - filepath (str): Path to file

    Returns:
    str: File type ("text", "gzipped", "binary")
    """


# System-level utilities for program discovery and path resolution.
def which2(program):
    """
    Find a program executable on the system PATH.

    Parameters:
    - program (str): Program name to search for

    Returns:
    str|None: Full path to the executable, or None if not found
    """


# Process and filter annotation data using flexible patterns.
def filter_annotations(annotations, grep=None, grepv=None):
    """
    Filter an annotation dictionary using regex patterns.

    Patterns use the form "key:pattern" with an optional trailing flag,
    e.g. "product:kinase:i" for a case-insensitive match.

    Parameters:
    - annotations (dict): Annotation dictionary to filter
    - grep (list): Patterns to keep (include matches)
    - grepv (list): Patterns to exclude (remove matches)

    Returns:
    dict: Filtered annotation dictionary
    """
def readBlocks(source, pattern):
    """
    Read a file in blocks separated by a pattern.

    Parameters:
    - source (str): File path or file handle
    - pattern (str): Regex pattern marking the start of each block

    Yields:
    str: Text blocks between pattern matches
    """
def readBlocks2(source, startpattern, endpattern):
    """
    Read a file in blocks delimited by start and end patterns.

    Parameters:
    - source (str): File path or file handle
    - startpattern (str): Regex pattern for block start
    - endpattern (str): Regex pattern for block end

    Yields:
    str: Text blocks between start and end patterns
    """


# Calculate comprehensive statistics from annotation data.
def annotation_stats(Genes):
    """
    Calculate comprehensive annotation statistics.

    Parameters:
    - Genes (dict): Annotation dictionary to analyze

    Returns:
    dict: Statistics including:
        - gene_count: Total number of genes
        - transcript_count: Total number of transcripts
        - avg_transcripts_per_gene: Average transcripts per gene
        - protein_coding_genes: Number of protein-coding genes
        - functional_annotation_counts: GO terms, EC numbers, etc.
        - exon_statistics: Average exon counts and lengths
        - intron_statistics: Average intron counts and lengths
        - strand_distribution: Plus/minus strand counts
        - contig_distribution: Genes per chromosome/contig
    """


# File opening mode constants for different compression handling approaches.
NORMAL = 0    # Standard file opening (in-process Python library)
PROCESS = 1   # Process-based file opening (external tool via pipe)
PARALLEL = 2  # Parallel file processing mode

from gfftk.utils import zopen, check_inputs, is_gzipped
# Check files before processing
input_files = ["annotation.gff3", "genome.fasta.gz", "proteins.faa"]
if check_inputs(input_files):
    print("All input files found")

# Open files with automatic compression handling
with zopen("large_annotation.gff3.gz", "r") as f:
    for line in f:
        if line.startswith("##"):
            continue  # skip GFF3 pragma/header lines
        # Process GFF3 lines

# Check file properties
if is_gzipped("genome.fasta.gz"):
    print("Genome file is compressed")

from gfftk.utils import filter_annotations
from gfftk.gff import gff2dict

# Load annotation
annotation = gff2dict("annotation.gff3", "genome.fasta")

# Filter for kinase genes (case-insensitive, trailing ":i" flag)
kinases = filter_annotations(
    annotation,
    grep=["product:kinase:i"],
)

# Remove pseudogenes and keep only protein-coding
filtered = filter_annotations(
    annotation,
    grep=["type:mRNA"],
    grepv=["product:pseudogene", "note:partial"],
)

print(f"Found {len(kinases)} kinase genes")
print(f"Filtered to {len(filtered)} protein-coding genes")

from gfftk.utils import annotation_stats
from gfftk.gff import gff2dict

# Load annotation data
annotation = gff2dict("annotation.gff3", "genome.fasta")

# Calculate comprehensive statistics
stats = annotation_stats(annotation)

print(f"Genome Annotation Statistics:")
print(f"Total genes: {stats['gene_count']}")
print(f"Total transcripts: {stats['transcript_count']}")
print(f"Avg transcripts per gene: {stats['avg_transcripts_per_gene']:.2f}")
print(f"Protein-coding genes: {stats['protein_coding_genes']}")

# Optional sub-dictionaries may be absent; guard before reading them
if 'functional_annotation_counts' in stats:
    func_stats = stats['functional_annotation_counts']
    print(f"Genes with GO terms: {func_stats.get('go_terms', 0)}")
    print(f"Genes with EC numbers: {func_stats.get('ec_numbers', 0)}")

if 'strand_distribution' in stats:
    strand_stats = stats['strand_distribution']
    print(f"Plus strand genes: {strand_stats.get('+', 0)}")
    print(f"Minus strand genes: {strand_stats.get('-', 0)}")

from gfftk.utils import readBlocks, readBlocks2
# Read FASTA file by sequences
for sequence_block in readBlocks("genome.fasta", r"^>"):
    lines = sequence_block.strip().split('\n')
    if lines:
        header = lines[0]
        sequence = ''.join(lines[1:])
        print(f"Sequence: {header}, Length: {len(sequence)}")

# Read structured file with start/end markers
for block in readBlocks2("structured_data.txt", r"^START", r"^END"):
    # Process data between START and END markers
    process_data_block(block)

from gfftk.utils import which2, open_pipe
# Check for external tools
if which2("blastp"):
    print("BLAST+ is available")
if which2("diamond"):
    print("Diamond is available for faster searches")

# Use external tools via pipes
with open_pipe("grep '^>' genome.fasta | wc -l", "r") as p:
    sequence_count = int(p.read().strip())
    print(f"Genome has {sequence_count} sequences")

from gfftk.utils import zopen, filter_annotations, annotation_stats
from gfftk.gff import gff2dict, dict2gff3
import os


def process_annotation_files(input_dir, output_dir, filters=None):
    """Process multiple annotation files with filtering and statistics.

    Parameters:
    - input_dir (str): Directory containing .gff3/.gff3.gz files and a
      genome.fasta used for all of them
    - output_dir (str): Directory for filtered output (created if missing)
    - filters (dict|None): Optional {'grep': [...], 'grepv': [...]} patterns

    Returns:
    dict: Mapping of input filename -> {'stats': ..., 'output_file': ...}
    """
    os.makedirs(output_dir, exist_ok=True)
    results = {}
    for filename in os.listdir(input_dir):
        if filename.endswith(('.gff3', '.gff3.gz')):
            print(f"Processing {filename}...")
            # Load annotation
            input_path = os.path.join(input_dir, filename)
            genome_path = os.path.join(input_dir, "genome.fasta")
            annotation = gff2dict(input_path, genome_path)
            # Apply filters if provided
            if filters:
                annotation = filter_annotations(
                    annotation,
                    grep=filters.get('grep'),
                    grepv=filters.get('grepv'),
                )
            # Calculate statistics
            stats = annotation_stats(annotation)
            # Write filtered annotation; strip both extensions for the base name
            base_name = filename.replace('.gz', '').replace('.gff3', '')
            output_path = os.path.join(output_dir, f"{base_name}_filtered.gff3")
            dict2gff3(annotation, output=output_path)
            results[filename] = {
                'stats': stats,
                'output_file': output_path,
            }
    return results


# Example usage
filters = {
    'grep': ['type:mRNA'],  # Keep only mRNA features
    'grepv': ['product:hypothetical'],  # Remove hypothetical proteins
}
results = process_annotation_files(
    input_dir="raw_annotations/",
    output_dir="filtered_annotations/",
    filters=filters,
)
# File opening modes
from typing import Iterator  # required for the BlockGenerator alias below

# File opening mode selector
FileOpeningMode = int  # NORMAL, PROCESS, or PARALLEL
# File type detection result
FileType = str  # "text", "gzipped", "binary"
# Filter pattern for annotations
FilterPattern = str  # Format: "key:pattern" or "key:pattern:flags"

# Annotation statistics structure (key -> expected value type)
AnnotationStats = {
    "gene_count": int,
    "transcript_count": int,
    "avg_transcripts_per_gene": float,
    "protein_coding_genes": int,
    "pseudogenes": int,
    "functional_annotation_counts": dict,
    "exon_statistics": dict,
    "intron_statistics": dict,
    "strand_distribution": dict,
    "contig_distribution": dict,
    "length_statistics": dict,
}

# Functional annotation counts
FunctionalStats = {
    "go_terms": int,
    "ec_numbers": int,
    "db_xrefs": int,
    "product_descriptions": int,
    "gene_names": int,
}

# Structural statistics
StructuralStats = {
    "avg_exons_per_transcript": float,
    "avg_exon_length": float,
    "avg_introns_per_transcript": float,
    "avg_intron_length": float,
    "avg_cds_length": float,
    "avg_protein_length": float,
}

# Block reading generator type
BlockGenerator = Iterator[str]  # Generator yielding text blocks
# Install with Tessl CLI
npx tessl i tessl/pypi-gfftk