Java/Python API wrappers for ADAM genomics analysis library enabling scalable genomic data processing with Apache Spark
The ADAM APIs module provides Python integration through DataFrame conversion wrappers, letting Python developers build genomic data processing workflows with familiar DataFrame operations.
DataFrameConversionWrapper is a simple wrapper class that implements Spark's Java Function interface and is used to hand DataFrames across the JVM/Python boundary.
/**
 * Wrapper for Python API DataFrame conversions.
 * Implements the JFunction interface for Spark transformations in Python.
 */
class DataFrameConversionWrapper implements JFunction<DataFrame, DataFrame> {

    /**
     * Creates a conversion wrapper with the target DataFrame.
     * @param newDf the DataFrame to return from conversion operations
     */
    DataFrameConversionWrapper(DataFrame newDf);

    /**
     * Function interface method for DataFrame conversion.
     * @param v1 input DataFrame (typically ignored)
     * @return the wrapped DataFrame specified in the constructor
     */
    DataFrame call(DataFrame v1);
}

Basic Python integration setup:
from pyspark.sql import SparkSession
from pyspark import SparkContext
# Create Spark session
spark = SparkSession.builder \
    .appName("ADAM Python API") \
    .getOrCreate()
sc = spark.sparkContext
# Import Java classes for ADAM
adam_context_class = sc._jvm.org.bdgenomics.adam.rdd.ADAMContext
java_adam_context_class = sc._jvm.org.bdgenomics.adam.api.java.JavaADAMContext
# Create ADAM context
adam_context = adam_context_class(sc._jsc.sc())
java_adam_context = java_adam_context_class(adam_context)
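For the sc._jvm class lookups above to resolve, the ADAM jars must be on the driver and executor classpath. A minimal sketch of pulling them in at session construction, in place of the plain builder call above (the Maven version shown is an assumption; substitute the release you actually depend on):
# Resolve ADAM from Maven Central when the session starts
spark = SparkSession.builder \
    .appName("ADAM Python API") \
    .config("spark.jars.packages",
            "org.bdgenomics.adam:adam-apis_2.10:0.22.0") \
    .getOrCreate()

Loading genomic data in Python: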
# Load alignment data
alignments_java = java_adam_context.loadAlignments("sample.bam")
# Convert the Java dataset to a Python DataFrame. Note that toDF() on
# the py4j handle returns a *Java* DataFrame, so wrap it for Python use.
from pyspark.sql import DataFrame

def to_python_df(jdf):
    return DataFrame(jdf, spark._wrapped)  # ._wrapped: the SQLContext on Spark 2.x

alignments_df = to_python_df(alignments_java.toDF())
# Work with DataFrame in Python
filtered_alignments = alignments_df.filter(
    alignments_df.mapq > 30
).filter(
    alignments_df.readMapped == True
)
print(f"Filtered alignments count: {filtered_alignments.count()}")Using DataFrame conversion wrappers:
# Import conversion wrapper
wrapper_class = sc._jvm.org.bdgenomics.adam.api.python.DataFrameConversionWrapper
# Create processed DataFrame
processed_df = filtered_alignments.select("readName", "contigName", "start", "end")
# Create conversion wrapper
wrapper = wrapper_class(processed_df._jdf)
# Use wrapper in Java-side transformations
# (This pattern is typically used internally by higher-level Python APIs)
converted_df = wrapper.call(alignments_df._jdf)
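The call returns a Java-side DataFrame; to keep working with it in Python, re-wrap it with the to_python_df helper defined earlier:
# Re-wrap the Java DataFrame returned by the wrapper for Python use
converted_py_df = to_python_df(converted_df)
converted_py_df.show(5)

Advanced genomic analysis in Python: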
# Load multiple genomic data types
variants_java = java_adam_context.loadVariants("variants.vcf")
features_java = java_adam_context.loadFeatures("annotations.bed")
# Convert to Python DataFrames
variants_df = to_python_df(variants_java.toDF())
features_df = to_python_df(features_java.toDF())
# Perform genomic analysis with Spark SQL
variants_df.createOrReplaceTempView("variants")
features_df.createOrReplaceTempView("features")
# Find variants in annotated regions
annotated_variants = spark.sql("""
    SELECT v.*, f.featureType, f.name AS gene_name
    FROM variants v
    JOIN features f ON (
        v.contigName = f.contigName AND
        v.start >= f.start AND
        v.end <= f.end
    )
    WHERE v.qual > 30
""")
print("Annotated variants schema:")
annotated_variants.printSchema()
annotated_variants.show(10)
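The joined result is an ordinary DataFrame, so it can be summarized with standard aggregations, for example counting qualifying variants per gene:
from pyspark.sql import functions as F

# Count annotated variants per gene, most frequently hit genes first
annotated_variants.groupBy("gene_name") \
    .count() \
    .orderBy(F.desc("count")) \
    .show(10)

Coverage analysis workflow: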
# Load alignment and coverage data
alignments_java = java_adam_context.loadAlignments("sample.bam")
coverage_java = java_adam_context.loadCoverage("coverage.bed")
# Convert to Python DataFrames for analysis
alignments_df = to_python_df(alignments_java.toDF())
coverage_df = to_python_df(coverage_java.toDF())
# Calculate coverage statistics in a single aggregation. Explicit
# expressions are needed here: a Python dict literal cannot hold
# duplicate "score" keys, so only the last statistic would survive.
from pyspark.sql import functions as F
coverage_stats = coverage_df.agg(
    F.avg("score").alias("avg(score)"),
    F.max("score").alias("max(score)"),
    F.min("score").alias("min(score)")
).collect()[0]
print(f"Average coverage: {coverage_stats['avg(score)']:.2f}")
print(f"Maximum coverage: {coverage_stats['max(score)']}")
print(f"Minimum coverage: {coverage_stats['min(score)']}")
# Find high-coverage regions
high_coverage = coverage_df.filter(coverage_df.score > coverage_stats['avg(score)'] * 2)
high_coverage.select("contigName", "start", "end", "score").show()
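Per-contig summaries follow the same pattern, using only the columns already shown above:
# Mean coverage per contig (F is pyspark.sql.functions, imported above)
coverage_df.groupBy("contigName") \
    .agg(F.avg("score").alias("mean_coverage")) \
    .orderBy("contigName") \
    .show()

Working with genotype data: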
# Load genotype data
genotypes_java = java_adam_context.loadGenotypes("samples.vcf")
genotypes_df = to_python_df(genotypes_java.toDF())
# Analyze genotype quality and coverage
quality_analysis = genotypes_df.groupBy("sampleId").agg(
    F.avg("genotypeQuality").alias("avg_quality"),
    F.avg("readDepth").alias("avg_depth"),
    F.count("sampleId").alias("variant_count")
)
print("Per-sample genotype statistics:")
quality_analysis.show()
# Find samples with high-quality genotypes
high_quality_samples = quality_analysis.filter(
    (quality_analysis.avg_quality > 50) &
    (quality_analysis.avg_depth > 20)
)
print("High-quality samples:")
high_quality_samples.show()
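The per-sample summary can also be joined back to the raw genotype records to keep only calls from samples that passed the quality screen:
# Restrict genotype records to the high-quality samples
high_quality_genotypes = genotypes_df.join(
    high_quality_samples.select("sampleId"), on="sampleId"
)
print(f"Genotypes from high-quality samples: {high_quality_genotypes.count()}")

Fragment analysis: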
# Load fragment data (paired-end reads)
fragments_java = java_adam_context.loadFragments("paired_reads.bam")
fragments_df = to_python_df(fragments_java.toDF())
# Analyze the insert size distribution. As with the coverage statistics,
# each aggregate needs its own expression rather than duplicate dict keys.
insert_size_stats = fragments_df \
    .filter(fragments_df.insertSize.isNotNull()) \
    .filter(fragments_df.insertSize > 0) \
    .agg(
        F.avg("insertSize").alias("avg(insertSize)"),
        F.stddev("insertSize").alias("stddev(insertSize)"),
        F.min("insertSize").alias("min(insertSize)"),
        F.max("insertSize").alias("max(insertSize)")
    ).collect()[0]
print("Insert size statistics:")
print(f"Mean: {insert_size_stats['avg(insertSize)']:.2f}")
print(f"Std Dev: {insert_size_stats['stddev_samp(insertSize)']:.2f}")
print(f"Min: {insert_size_stats['min(insertSize)']}")
print(f"Max: {insert_size_stats['max(insertSize)']}")
# Plot the insert size distribution (if matplotlib is available)
try:
    import matplotlib.pyplot as plt
    import pandas as pd  # toPandas() requires pandas

    insert_sizes = fragments_df.select("insertSize") \
        .filter(fragments_df.insertSize.between(50, 800)) \
        .toPandas()
    plt.figure(figsize=(10, 6))
    plt.hist(insert_sizes['insertSize'], bins=50, alpha=0.7)
    plt.xlabel('Insert Size (bp)')
    plt.ylabel('Frequency')
    plt.title('Insert Size Distribution')
    plt.grid(True, alpha=0.3)
    plt.show()
except ImportError:
    print("Matplotlib or pandas not available for plotting")
def create_adam_context(spark_session):
    """Helper function to create an ADAM context from a Spark session"""
    sc = spark_session.sparkContext
    adam_context = sc._jvm.org.bdgenomics.adam.rdd.ADAMContext(sc._jsc.sc())
    return sc._jvm.org.bdgenomics.adam.api.java.JavaADAMContext(adam_context)
def load_genomic_data(adam_context, file_path, data_type="alignments"):
    """Generic genomic data loader"""
    if data_type == "alignments":
        return adam_context.loadAlignments(file_path)
    elif data_type == "variants":
        return adam_context.loadVariants(file_path)
    elif data_type == "features":
        return adam_context.loadFeatures(file_path)
    elif data_type == "genotypes":
        return adam_context.loadGenotypes(file_path)
    else:
        raise ValueError(f"Unsupported data type: {data_type}")
# Usage
adam_ctx = create_adam_context(spark)
alignments = load_genomic_data(adam_ctx, "sample.bam", "alignments")
alignments_df = to_python_df(alignments.toDF())
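As a design alternative, the if/elif chain can be replaced with table-driven dispatch; a sketch combining it with the to_python_df helper (load_genomic_df is a hypothetical name):
# Map data type names to JavaADAMContext loader methods
LOADERS = {
    "alignments": "loadAlignments",
    "variants": "loadVariants",
    "features": "loadFeatures",
    "genotypes": "loadGenotypes",
}

def load_genomic_df(adam_context, file_path, data_type):
    """Load genomic data and convert it straight to a Python DataFrame."""
    if data_type not in LOADERS:
        raise ValueError(f"Unsupported data type: {data_type}")
    java_data = getattr(adam_context, LOADERS[data_type])(file_path)
    return to_python_df(java_data.toDF())

Error handling in Python integration: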
def safe_load_genomic_data(adam_context, file_path, data_type, stringency="LENIENT"):
    """Load genomic data with error handling (uses the global spark session)"""
    try:
        # Get the htsjdk validation stringency enum value
        stringency_class = spark.sparkContext._jvm.htsjdk.samtools.ValidationStringency
        stringency_val = getattr(stringency_class, stringency)
        if data_type == "alignments":
            return adam_context.loadAlignments(file_path, stringency_val)
        elif data_type == "variants":
            return adam_context.loadVariants(file_path, stringency_val)
        # Add other types as needed
    except Exception as e:
        print(f"Error loading {data_type} from {file_path}: {str(e)}")
        return None
# Usage with error handling
alignments = safe_load_genomic_data(adam_ctx, "potentially_corrupted.bam", "alignments", "LENIENT")
if alignments:
    alignments_df = to_python_df(alignments.toDF())
    print(f"Successfully loaded {alignments_df.count()} alignment records")
npx tessl i tessl/maven-org-bdgenomics-adam--adam-apis-2-10