DataFrame wrapper functionality for Python access to ADAM's data conversion capabilities through PySpark integration. The DataFrameConversionWrapper enables Python developers to leverage ADAM's genomic data processing features within Python data science workflows.
Primary wrapper class for Python integration with ADAM's DataFrame conversion system.
/**
* Wrapper class for Python API DataFrame operations.
* Implements Java Function interface for PySpark compatibility.
*/
class DataFrameConversionWrapper(newDf: DataFrame)
extends JFunction[DataFrame, DataFrame] {
/**
* Convert input DataFrame to target DataFrame.
* @param v1 Input DataFrame containing source genomic data
* @return Target DataFrame with converted genomic data
*/
def call(v1: DataFrame): DataFrame
}

Constructor Parameters:
newDf: DataFrame - The target DataFrame defining the output schema and data

Usage Example in Scala (for Java/Scala integration):
import org.bdgenomics.adam.api.python.DataFrameConversionWrapper
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ StructType, StructField, StringType, LongType }
// Create an empty target DataFrame with the desired output schema
val targetSchema = StructType(Seq(
  StructField("contigName", StringType, nullable = true),
  StructField("start", LongType, nullable = true),
  StructField("end", LongType, nullable = true)))
val targetDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], targetSchema)
// Create wrapper for conversion
val wrapper = new DataFrameConversionWrapper(targetDF)
// Convert source DataFrame
val sourceDF: DataFrame = ??? // source genomic data as a DataFrame
val convertedDF: DataFrame = wrapper.call(sourceDF)

The DataFrameConversionWrapper integrates with PySpark through the Java Function interface, enabling seamless use within Python data science workflows.
Typical Python Usage Pattern:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Initialize Spark session with ADAM dependencies
spark = SparkSession.builder \
.appName("ADAM Python API") \
.config("spark.jars", "adam-apis_2.11-0.23.0.jar") \
.getOrCreate()
# Access ADAM's JavaADAMContext from Python
java_adam_context = spark._jvm.org.bdgenomics.adam.api.java.JavaADAMContext
jac = java_adam_context(spark._jsc.sc())
# Load genomic data using Java API
alignments_rdd = jac.loadAlignments("input.bam")
# Convert to DataFrame for Python processing
alignments_df = alignments_rdd.toDF()
# Define target schema for conversion
target_schema = StructType([
StructField("contigName", StringType(), True),
StructField("start", IntegerType(), True),
StructField("end", IntegerType(), True)
])
# Create empty target DataFrame
target_df = spark.createDataFrame([], target_schema)
# Create conversion wrapper
conversion_wrapper = spark._jvm.org.bdgenomics.adam.api.python.DataFrameConversionWrapper(target_df._jdf)
# Perform the conversion; alignments_df is already a JVM DataFrame returned
# through py4j, so it is passed directly (no ._jdf attribute)
converted_df = conversion_wrapper.call(alignments_df)
# Wrap the resulting JVM DataFrame back into a Python DataFrame for further processing
from pyspark.sql import DataFrame
python_df = DataFrame(converted_df, spark._wrapped)  # spark._wrapped is the underlying SQLContext

The Python integration enables ADAM to be used within broader Python data science ecosystems including pandas, NumPy, and scikit-learn.
Pandas Integration Example:
# Convert the Spark DataFrame to pandas for local processing
pandas_df = python_df.toPandas()
# Use with standard Python data science libraries
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans
# Perform genomic data analysis with pandas
genomic_features = pandas_df.groupby('contigName').agg({
    'start': ['min', 'max', 'count'],
    'end': ['min', 'max']
})
# Flatten the MultiIndex column names produced by the aggregation
genomic_features.columns = ['_'.join(col) for col in genomic_features.columns]
# Apply machine learning algorithms
clustering_features = pandas_df[['start', 'end']].values
kmeans = KMeans(n_clusters=5)
clusters = kmeans.fit_predict(clustering_features)
pandas_df['cluster'] = clusters

The Python integration works seamlessly within Jupyter notebooks for interactive genomic data exploration.
Notebook Usage Example:
# Cell 1: Setup
import pyspark
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
spark = SparkSession.builder \
.appName("Genomic Data Analysis") \
.config("spark.jars", "adam-apis_2.11-0.23.0.jar") \
.getOrCreate()
# Cell 2: Load and convert data
jac = spark._jvm.org.bdgenomics.adam.api.java.JavaADAMContext(spark._jsc.sc())
variants = jac.loadVariants("variants.vcf")
# variants.toDF() returns a JVM DataFrame; wrap it before calling toPandas()
from pyspark.sql import DataFrame
variants_df = DataFrame(variants.toDF(), spark._wrapped).toPandas()
# Cell 3: Visualize genomic data
plt.figure(figsize=(12, 6))
sns.histplot(data=variants_df, x='start', bins=50)
plt.title('Distribution of Variant Positions')
plt.xlabel('Genomic Position')
plt.ylabel('Count')
plt.show()
# Cell 4: Statistical analysis
summary_stats = variants_df.groupby('contigName').agg({
    'start': ['count', 'min', 'max'],
    'referenceAllele': lambda x: x.value_counts().idxmax()  # most common reference allele
})
print(summary_stats)

The DataFrameConversionWrapper implements Spark's Java Function interface to ensure compatibility with PySpark's Java interop layer.
// Extends Spark's Java Function interface for PySpark compatibility
import org.apache.spark.api.java.function.{ Function => JFunction }
class DataFrameConversionWrapper(newDf: DataFrame)
extends JFunction[DataFrame, DataFrame]

The wrapper handles DataFrame schema transformations between different genomic data types while preserving data integrity and type safety.
Schema Preservation Features:
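As a minimal illustration (not part of the ADAM API), the schemas of the converted and target DataFrames can be compared from Python to confirm that the conversion produced the expected structure; this sketch assumes the python_df and target_df objects from the usage example above:
# Inspect the schema of the converted DataFrame
python_df.printSchema()
# Confirm that the converted DataFrame matches the target schema
assert python_df.schema == target_df.schema
# Field-by-field comparison of names and types
for src_field, tgt_field in zip(python_df.schema.fields, target_df.schema.fields):
    print(src_field.name, src_field.dataType, "->", tgt_field.name, tgt_field.dataType)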
The Python integration provides efficient memory management for large genomic datasets:
# Optimize DataFrame operations for genomic data
# (assumes a temporary view has been registered, e.g.
#  python_df.createOrReplaceTempView("genomic_variants"))
genomic_df = spark.sql("""
SELECT contigName, start, end, referenceAllele, alternateAllele
FROM genomic_variants
WHERE contigName IN ('chr1', 'chr2', 'chr3')
AND start BETWEEN 1000000 AND 2000000
""").cache() # Cache for multiple operations
# Use vectorized operations where possible
import pyspark.sql.functions as F
processed_df = genomic_df \
.withColumn("variant_length", F.col("end") - F.col("start")) \
.withColumn("is_snp", F.col("referenceAllele").rlike("^[ATCG]$"))Common errors and troubleshooting: