# Python Integration

DataFrame wrapper functionality that gives Python access to ADAM's data conversion capabilities through PySpark. The DataFrameConversionWrapper lets Python developers apply ADAM's genomic data processing features within Python data science workflows.

## Capabilities

### DataFrameConversionWrapper Class

Primary wrapper class for Python integration with ADAM's DataFrame conversion system.
```scala { .api }
/**
 * Wrapper class for Python API DataFrame operations.
 * Implements the Java Function interface for PySpark compatibility.
 */
class DataFrameConversionWrapper(newDf: DataFrame)
    extends JFunction[DataFrame, DataFrame] {

  /**
   * Convert the input DataFrame to the target DataFrame.
   * @param v1 Input DataFrame containing source genomic data
   * @return Target DataFrame with converted genomic data
   */
  def call(v1: DataFrame): DataFrame
}
```
**Constructor Parameters:**

- `newDf: DataFrame` - The target DataFrame defining the output schema and data

**Usage Example in Scala:**
```scala
import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.types.{ StringType, StructField, StructType }
import org.bdgenomics.adam.api.python.DataFrameConversionWrapper

// Create a target DataFrame with the desired schema
val targetSchema = StructType(Seq(
  StructField("contigName", StringType, nullable = true)))
val targetDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], targetSchema)

// Create a wrapper around the target DataFrame
val wrapper = new DataFrameConversionWrapper(targetDF)

// Convert a source DataFrame
val sourceDF: DataFrame = ??? // source genomic data as a DataFrame
val convertedDF: DataFrame = wrapper.call(sourceDF)
```
## Python Integration Patterns

### PySpark Integration

The DataFrameConversionWrapper integrates with PySpark through the Java Function interface, so it can be driven from Python code over the Py4J gateway.

**Typical Python Usage Pattern:**
```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize a Spark session with the ADAM jars on the classpath
spark = SparkSession.builder \
    .appName("ADAM Python API") \
    .config("spark.jars", "adam-apis_2.11-0.23.0.jar") \
    .getOrCreate()

# Access ADAM's JavaADAMContext from Python
java_adam_context = spark._jvm.org.bdgenomics.adam.api.java.JavaADAMContext
jac = java_adam_context(spark._jsc.sc())

# Load genomic data using the Java API; toDF() here yields a *Java* DataFrame
alignments_jdf = jac.loadAlignments("input.bam").toDF()

# Define the target schema for the conversion
target_schema = StructType([
    StructField("contigName", StringType(), True),
    StructField("start", IntegerType(), True),
    StructField("end", IntegerType(), True)
])

# Create an empty target DataFrame
target_df = spark.createDataFrame([], target_schema)

# Create the conversion wrapper on the JVM side
conversion_wrapper = spark._jvm.org.bdgenomics.adam.api.python.DataFrameConversionWrapper(target_df._jdf)

# Perform the conversion on the underlying Java DataFrames
converted_jdf = conversion_wrapper.call(alignments_jdf)

# Wrap the result back into a Python DataFrame for further processing
converted_df = DataFrame(converted_jdf, spark)
```
### Data Science Workflow Integration

The Python integration lets ADAM feed into the broader Python data science ecosystem, including pandas, NumPy, and scikit-learn.

**Pandas Integration Example:**
```python
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans

# Convert the Spark DataFrame to pandas for local processing
pandas_df = converted_df.toPandas()

# Summarize genomic intervals per contig with pandas
genomic_features = pandas_df.groupby('contigName').agg({
    'start': ['min', 'max', 'count'],
    'end': ['min', 'max']
})
# Flatten the resulting MultiIndex columns, e.g. ('start', 'min') -> 'start_min'
genomic_features.columns = ['_'.join(col) for col in genomic_features.columns]

# Apply machine learning algorithms
clustering_features = pandas_df[['start', 'end']].values
kmeans = KMeans(n_clusters=5)
clusters = kmeans.fit_predict(clustering_features)
pandas_df['cluster'] = clusters
```
### Jupyter Notebook Integration

The Python integration works within Jupyter notebooks for interactive genomic data exploration.

**Notebook Usage Example:**
```python
# Cell 1: Setup
from pyspark.sql import DataFrame, SparkSession
import matplotlib.pyplot as plt
import seaborn as sns

spark = SparkSession.builder \
    .appName("Genomic Data Analysis") \
    .config("spark.jars", "adam-apis_2.11-0.23.0.jar") \
    .getOrCreate()

# Cell 2: Load variants through the Java API and pull them into pandas
jac = spark._jvm.org.bdgenomics.adam.api.java.JavaADAMContext(spark._jsc.sc())
variants_jdf = jac.loadVariants("variants.vcf").toDF()
variants_df = DataFrame(variants_jdf, spark).toPandas()

# Cell 3: Visualize genomic data
plt.figure(figsize=(12, 6))
sns.histplot(data=variants_df, x='start', bins=50)
plt.title('Distribution of Variant Positions')
plt.xlabel('Genomic Position')
plt.ylabel('Count')
plt.show()

# Cell 4: Statistical analysis
summary_stats = variants_df.groupby('contigName').agg({
    'start': ['count', 'min', 'max'],
    'referenceAllele': lambda x: x.value_counts().head()
})
print(summary_stats)
```
## Architecture and Design

### Java Function Interface

The DataFrameConversionWrapper implements Spark's Java Function interface so that it can be invoked through PySpark's Java interop layer.
```scala { .api }
// Extends Spark's Java Function interface for PySpark compatibility
import org.apache.spark.api.java.function.{ Function => JFunction }

class DataFrameConversionWrapper(newDf: DataFrame)
    extends JFunction[DataFrame, DataFrame]
```
### DataFrame Schema Handling

The wrapper handles DataFrame schema transformations between different genomic data types while preserving data integrity and type safety.

**Schema Preservation Features:**

- **Column mapping**: Automatic mapping between compatible schema fields
- **Type conversion**: Safe conversion between compatible data types
- **Null handling**: Proper handling of missing or null genomic data fields
- **Metadata retention**: Preservation of DataFrame metadata and column descriptions
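The column mapping and type conversion behaviors above can be sketched outside of Spark. The following is a minimal illustration using pandas so it stays self-contained; the column names, dtypes, and the `unused` field are hypothetical examples, not part of ADAM's API:

```python
import pandas as pd

# Hypothetical source table: one extra column and string-typed positions
source = pd.DataFrame({
    "contigName": ["chr1", "chr2"],
    "start": ["100", "150"],
    "end": ["200", "250"],
    "unused": ["x", "y"],
})

# Hypothetical target schema: the shared fields only, with integer positions
target_dtypes = {"contigName": "object", "start": "int64", "end": "int64"}

# Column mapping: keep only the fields present in the target schema;
# type conversion: cast each kept column to its target dtype
aligned = source[list(target_dtypes)].astype(target_dtypes)

print(list(aligned.columns))   # ['contigName', 'start', 'end']
print(aligned["start"].sum())  # 250
```

The same select-then-cast shape applies to Spark DataFrames (`select` plus `cast` column expressions); the wrapper performs the equivalent alignment on the JVM side.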
### Memory Management

The Python integration provides efficient memory management for large genomic datasets:

- **Lazy evaluation**: DataFrame operations are lazily evaluated until an action is called
- **Partition-aware processing**: Maintains Spark's distributed processing model
- **Garbage collection**: Proper cleanup of temporary Java objects in the Python environment
- **Streaming support**: Compatible with Spark's structured streaming for real-time genomic data
## Performance Considerations

### Python-JVM Bridge Overhead

- **Serialization costs**: Data serialization between Python and the JVM adds overhead
- **Batch processing**: Process data in larger batches to amortize serialization costs
- **DataFrame caching**: Cache frequently accessed DataFrames to avoid repeated conversions
- **Column pruning**: Select only the necessary columns before DataFrame operations

### Optimization Strategies
```python
import pyspark.sql.functions as F

# Optimize DataFrame operations for genomic data
# (assumes a temporary view named genomic_variants has been registered)
genomic_df = spark.sql("""
    SELECT contigName, start, end, referenceAllele, alternateAllele
    FROM genomic_variants
    WHERE contigName IN ('chr1', 'chr2', 'chr3')
      AND start BETWEEN 1000000 AND 2000000
""").cache()  # Cache for multiple operations

# Use vectorized column expressions where possible
processed_df = genomic_df \
    .withColumn("variant_length", F.col("end") - F.col("start")) \
    .withColumn("is_snp", F.col("referenceAllele").rlike("^[ATCG]$"))
```
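A further way to reduce the serialization costs noted above: Spark 2.3+ can use Apache Arrow for columnar transfer when converting to pandas. This is a configuration sketch, not ADAM-specific; `genomic_df` follows the example above, and the config key shown is the Spark 2.x name:

```python
# Enable Arrow-based columnar transfer for toPandas() (Spark 2.3+);
# Spark falls back to the slower row-by-row path if Arrow is unavailable
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Prune columns before crossing the Python-JVM boundary
pandas_df = genomic_df.select("contigName", "start", "end").toPandas()
```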
## Error Handling

Common errors and troubleshooting:
- **ClassNotFoundException**: Ensure the ADAM JAR files are included on the Spark classpath
- **SerializationException**: Verify that the source and target DataFrame schemas are compatible
- **OutOfMemoryError**: Increase driver and executor memory for large genomic datasets
- **PySparkTypeError**: Ensure proper type conversion between Python and Scala data types
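For the ClassNotFoundException and OutOfMemoryError cases, the launch configuration is usually the fix. A sketch of a `pyspark` invocation; the Maven coordinates are inferred from the jar name used in the examples above and the memory sizes are placeholders, so adjust both for your environment:

```shell
# Resolve the ADAM jars at launch instead of shipping a local jar,
# and raise driver/executor memory for large genomic datasets
pyspark \
  --packages org.bdgenomics.adam:adam-apis_2.11:0.23.0 \
  --driver-memory 8g \
  --executor-memory 8g
```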