Python Integration

ADAM exposes its data conversion capabilities to Python through a DataFrame wrapper used with PySpark. The DataFrameConversionWrapper lets Python developers use ADAM's genomic data processing features within Python data science workflows.

Capabilities

DataFrameConversionWrapper Class

Primary wrapper class for Python integration with ADAM's DataFrame conversion system.

/**
 * Wrapper class for Python API DataFrame operations.
 * Implements Java Function interface for PySpark compatibility.
 */
class DataFrameConversionWrapper(newDf: DataFrame) 
    extends JFunction[DataFrame, DataFrame] {
    
    /**
     * Convert input DataFrame to target DataFrame.
     * @param v1 Input DataFrame containing source genomic data
     * @return Target DataFrame with converted genomic data
     */
    def call(v1: DataFrame): DataFrame
}

Constructor Parameters:

  • newDf: DataFrame - The target DataFrame defining the output schema and data

Usage Example in Scala (for Java/Scala integration):

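import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.types.{ LongType, StringType, StructField, StructType }
import org.bdgenomics.adam.api.python.DataFrameConversionWrapper

// Assumes an existing SparkSession named `spark` (as in spark-shell)

// Define the target schema explicitly; createDataFrame needs a StructType
val targetSchema = StructType(Seq(
  StructField("contigName", StringType, nullable = true),
  StructField("start", LongType, nullable = true),
  StructField("end", LongType, nullable = true)))
val targetDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], targetSchema)

// Create wrapper for conversion
val wrapper = new DataFrameConversionWrapper(targetDF)

// Convert source DataFrame
val sourceDF: DataFrame = ??? // placeholder: source genomic data as a DataFrame
val convertedDF: DataFrame = wrapper.call(sourceDF)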
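
For unit tests that should run without a JVM, a pure-Python stand-in with the same shape can be handy. The sketch below is hypothetical: FakeConversionWrapper is not part of ADAM, and it assumes the wrapper simply hands back the target object supplied at construction, which is how the constructor parameter above reads. Plain lists of dicts stand in for DataFrames.

```python
class FakeConversionWrapper:
    """Hypothetical test double mirroring the documented interface."""

    def __init__(self, new_df):
        # Mirrors the `newDf` constructor parameter.
        self._new_df = new_df

    def call(self, v1):
        # Assumption: the source argument is not inspected and the
        # target supplied at construction is returned unchanged.
        return self._new_df


# Plain Python data standing in for source and target DataFrames.
source = [{"contigName": "chr1", "start": 100, "end": 150}]
target = [{"contigName": "chr1", "start": 100, "end": 150, "strand": "+"}]

wrapper = FakeConversionWrapper(target)
converted = wrapper.call(source)
print(converted is target)
```

Code that accepts any object with a call method can then be exercised with this double before it is pointed at the real JVM wrapper.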

Python Integration Patterns

PySpark Integration

PySpark reaches the DataFrameConversionWrapper through its Py4J gateway (spark._jvm). Because the wrapper implements the Java Function interface, Python code has a single call method to invoke on it.

Typical Python Usage Pattern:

from pyspark.sql import DataFrame, SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Initialize Spark session with ADAM dependencies
spark = SparkSession.builder \
    .appName("ADAM Python API") \
    .config("spark.jars", "adam-apis_2.11-0.23.0.jar") \
    .getOrCreate()

# Access ADAM's JavaADAMContext from Python
# Note: depending on the ADAM release, the constructor may expect an
# ADAMContext rather than a SparkContext; check the Java API for your version
java_adam_context = spark._jvm.org.bdgenomics.adam.api.java.JavaADAMContext
jac = java_adam_context(spark._jsc.sc())

# Load genomic data using Java API (the result is a JVM object handle)
alignments_rdd = jac.loadAlignments("input.bam")

# Convert to a DataFrame; toDF() here returns a JVM DataFrame handle,
# not a pyspark.sql.DataFrame
alignments_jdf = alignments_rdd.toDF()

# Define target schema for conversion (genomic coordinates are 64-bit)
target_schema = StructType([
    StructField("contigName", StringType(), True),
    StructField("start", LongType(), True),
    StructField("end", LongType(), True)
])

# Create empty target DataFrame
target_df = spark.createDataFrame([], target_schema)

# Create conversion wrapper (pass the underlying JVM DataFrame)
conversion_wrapper = spark._jvm.org.bdgenomics.adam.api.python.DataFrameConversionWrapper(target_df._jdf)

# Perform conversion; the result is again a JVM DataFrame handle
converted_jdf = conversion_wrapper.call(alignments_jdf)

# Wrap the JVM result in a Python DataFrame for further processing
python_df = DataFrame(converted_jdf, SQLContext(spark.sparkContext))

Data Science Workflow Integration

The Python integration enables ADAM to be used within broader Python data science ecosystems including pandas, NumPy, and scikit-learn.

Pandas Integration Example:

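# Convert Spark DataFrame to Pandas for local processing
# (toPandas() collects every row to the driver, so filter or sample first)
pandas_df = python_df.toPandas()

# Use with standard Python data science libraries
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans

# Perform genomic data analysis with pandas
genomic_features = pandas_df.groupby('contigName').agg({
    'start': ['min', 'max', 'count'],
    'end': ['min', 'max']
})
# Flatten the two-level column index into names such as 'start_min'
genomic_features.columns = ['_'.join(col) for col in genomic_features.columns]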

# Apply machine learning algorithms
clustering_features = pandas_df[['start', 'end']].values
kmeans = KMeans(n_clusters=5)
clusters = kmeans.fit_predict(clustering_features)
pandas_df['cluster'] = clusters
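
To make the per-contig aggregation above concrete, here is a dependency-free sketch of what it computes: one record per contig holding the minimum, maximum and count of start and the minimum and maximum of end. The three input rows and the output key names are invented for illustration.

```python
from collections import defaultdict

# Hypothetical rows standing in for the converted DataFrame.
rows = [
    {"contigName": "chr1", "start": 100, "end": 150},
    {"contigName": "chr1", "start": 300, "end": 360},
    {"contigName": "chr2", "start": 50, "end": 90},
]


def summarize_by_contig(rows):
    """Group rows by contig and compute min/max/count summaries."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["contigName"]].append(row)

    summary = {}
    for contig, members in grouped.items():
        starts = [m["start"] for m in members]
        ends = [m["end"] for m in members]
        summary[contig] = {
            "start_min": min(starts),
            "start_max": max(starts),
            "start_count": len(starts),
            "end_min": min(ends),
            "end_max": max(ends),
        }
    return summary


summary = summarize_by_contig(rows)
print(summary["chr1"])
```

On real data the pandas version is preferable; this form is mainly useful for checking expectations on a handful of rows.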

Jupyter Notebook Integration

The Python integration can be used from Jupyter notebooks for interactive genomic data exploration.

Notebook Usage Example:

# Cell 1: Setup
import pyspark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns

spark = SparkSession.builder \
    .appName("Genomic Data Analysis") \
    .config("spark.jars", "adam-apis_2.11-0.23.0.jar") \
    .getOrCreate()

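# Cell 2: Load and convert data
from pyspark.sql import DataFrame, SQLContext

jac = spark._jvm.org.bdgenomics.adam.api.java.JavaADAMContext(spark._jsc.sc())
variants = jac.loadVariants("variants.vcf")
# toDF() returns a JVM DataFrame handle; wrap it before calling toPandas()
variants_df = DataFrame(variants.toDF(), SQLContext(spark.sparkContext)).toPandas()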

# Cell 3: Visualize genomic data
plt.figure(figsize=(12, 6))
sns.histplot(data=variants_df, x='start', bins=50)
plt.title('Distribution of Variant Positions')
plt.xlabel('Genomic Position')
plt.ylabel('Count')
plt.show()

# Cell 4: Statistical analysis
summary_stats = variants_df.groupby('contigName').agg({
    'start': ['count', 'min', 'max'],
    # Most frequent reference allele per contig (agg needs one value per group)
    'referenceAllele': lambda x: x.value_counts().idxmax()
})
print(summary_stats)

Architecture and Design

Java Function Interface

The DataFrameConversionWrapper implements Spark's Java Function interface to ensure compatibility with PySpark's Java interop layer.

// Extends Spark's Java Function interface for PySpark compatibility
import org.apache.spark.api.java.function.{ Function => JFunction }

class DataFrameConversionWrapper(newDf: DataFrame) 
    extends JFunction[DataFrame, DataFrame]

DataFrame Schema Handling

The wrapper handles DataFrame schema transformations between different genomic data types while preserving data integrity and type safety.

Schema Preservation Features:

  • Column mapping: Automatic mapping between compatible schema fields
  • Type conversion: Safe conversion between compatible data types
  • Null handling: Proper handling of missing or null genomic data fields
  • Metadata retention: Preservation of DataFrame metadata and column descriptions
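
Whatever the wrapper does for a given pair of schemas, it can help to compare them up front. The sketch below is a hypothetical helper, not part of ADAM: compare_schemas takes two lists of (name, type name) pairs, the same shape PySpark's DataFrame.dtypes returns, and reports target fields that are missing from the source or typed differently.

```python
def compare_schemas(source_fields, target_fields):
    """Compare two schemas given as lists of (name, type_name) pairs.

    Returns (missing, mismatched): target fields absent from the source,
    and fields present in both whose type names differ.
    """
    source = dict(source_fields)
    missing = [name for name, _ in target_fields if name not in source]
    mismatched = [
        (name, source[name], type_name)
        for name, type_name in target_fields
        if name in source and source[name] != type_name
    ]
    return missing, mismatched


# Illustrative schemas; with PySpark these would come from df.dtypes.
source_fields = [("contigName", "string"), ("start", "bigint"), ("end", "bigint")]
target_fields = [("contigName", "string"), ("start", "int"), ("strand", "string")]

missing, mismatched = compare_schemas(source_fields, target_fields)
print(missing)
print(mismatched)
```

Running a check like this before the conversion turns a schema mismatch into a readable message instead of a failure deep inside a Spark job.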

Memory Management

The Python integration provides efficient memory management for large genomic datasets:

  • Lazy evaluation: DataFrame operations are lazily evaluated until action is called
  • Partition-aware processing: Maintains Spark's distributed processing model
  • Garbage collection: Proper cleanup of temporary Java objects in Python environment
  • Streaming support: Compatible with Spark's structured streaming for real-time genomic data

Performance Considerations

Python-JVM Bridge Overhead

  • Serialization costs: Data serialization between Python and JVM adds overhead
  • Batch processing: Process data in larger batches to amortize serialization costs
  • DataFrame caching: Cache frequently accessed DataFrames to avoid repeated conversions
  • Column pruning: Select only necessary columns before DataFrame operations
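
The batching advice follows from simple arithmetic: each crossing of the bridge pays a fixed cost plus a cost per row, so fewer, larger batches spread the fixed part over more rows. The sketch below uses a deliberately simple linear cost model; the function name and the millisecond figures are assumptions chosen only to show the shape, not measurements.

```python
import math


def bridge_cost(n_rows, batch_size, per_call_overhead, per_row_cost):
    """Estimated total cost of moving n_rows across the bridge in batches."""
    n_calls = math.ceil(n_rows / batch_size)
    return n_calls * per_call_overhead + n_rows * per_row_cost


# Hypothetical costs in milliseconds.
N_ROWS = 1_000_000
for batch_size in (1, 1_000, 100_000):
    cost = bridge_cost(N_ROWS, batch_size, per_call_overhead=1.0, per_row_cost=0.001)
    print(batch_size, cost)
```

Under this model the fixed per-call term dominates for tiny batches and becomes negligible for large ones, while the per-row term stays the same regardless of batch size.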

Optimization Strategies

# Optimize DataFrame operations for genomic data
# (assumes a table or temporary view named genomic_variants is already registered)
genomic_df = spark.sql("""
    SELECT contigName, `start`, `end`, referenceAllele, alternateAllele
    FROM genomic_variants
    WHERE contigName IN ('chr1', 'chr2', 'chr3')
    AND `start` BETWEEN 1000000 AND 2000000
""").cache()  # Cache for multiple operations

# Use vectorized operations where possible
import pyspark.sql.functions as F
processed_df = genomic_df \
    .withColumn("variant_length", F.col("end") - F.col("start")) \
    .withColumn("is_snp", F.col("referenceAllele").rlike("^[ATCG]$"))
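
The is_snp expression above checks only referenceAllele, so a single-base reference paired with a longer alternate allele (an insertion) would also be flagged. A stricter check, sketched here in plain Python with a hypothetical is_snp helper, requires both alleles to be exactly one base; the same pattern could be applied to both columns with rlike. The helper assumes non-null string inputs.

```python
import re

# Exactly one unambiguous base; fullmatch anchors both ends of the string.
SINGLE_BASE = re.compile(r"[ATCG]")


def is_snp(reference_allele, alternate_allele):
    """True when both alleles are exactly one unambiguous base."""
    return (
        SINGLE_BASE.fullmatch(reference_allele) is not None
        and SINGLE_BASE.fullmatch(alternate_allele) is not None
    )


print(is_snp("A", "G"))    # single-base substitution
print(is_snp("A", "AT"))   # insertion relative to the reference
print(is_snp("AT", "A"))   # deletion relative to the reference
```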

Error Handling

Common errors and troubleshooting:

  • ClassNotFoundException: Ensure ADAM JAR files are properly included in Spark classpath
  • SerializationException: Verify DataFrame schemas are compatible between source and target
  • OutOfMemoryError: Increase driver and executor memory for large genomic datasets
  • PySparkTypeError: Ensure proper type conversion between Python and Scala data types
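
A ClassNotFoundException often traces back to a JAR path that does not exist on the machine starting the session. The helpers below are a hypothetical pre-flight check, not part of ADAM or PySpark: they verify that each local path exists and then build the comma-separated value that spark.jars expects. They only make sense for local file paths, not for remote URLs.

```python
import os


def missing_jars(jar_paths):
    """Return the subset of jar_paths that do not exist as regular files."""
    return [path for path in jar_paths if not os.path.isfile(path)]


def spark_jars_value(jar_paths):
    """Build a comma-separated spark.jars value, failing fast on absent JARs."""
    missing = missing_jars(jar_paths)
    if missing:
        raise FileNotFoundError("JAR(s) not found: " + ", ".join(missing))
    return ",".join(jar_paths)
```

The result can be passed straight to the session builder, for example .config("spark.jars", spark_jars_value(["adam-apis_2.11-0.23.0.jar"])), so a wrong path fails before Spark starts rather than at the first JVM call.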