0
# Python Integration
1
2
The ADAM APIs provide Python integration capabilities through DataFrame conversion wrappers, enabling Python developers to work with genomic data processing workflows using familiar DataFrame operations.
3
4
## Capabilities
5
6
### DataFrameConversionWrapper
7
8
Simple wrapper class that implements the Java Function interface for converting between Spark DataFrames in Python environments.
9
10
```java { .api }
11
/**
12
* Wrapper for Python API DataFrame conversions
13
* Implements JFunction interface for Spark transformations in Python
14
*/
15
class DataFrameConversionWrapper implements JFunction<DataFrame, DataFrame> {
16
/**
17
* Creates a conversion wrapper with the target DataFrame
18
* @param newDf The DataFrame to return from conversion operations
19
*/
20
DataFrameConversionWrapper(DataFrame newDf);
21
22
/**
23
* Function interface method for DataFrame conversion
24
* @param v1 Input DataFrame (typically ignored)
25
* @return The wrapped DataFrame specified in constructor
26
*/
27
DataFrame call(DataFrame v1);
28
}
29
```
30
31
## Usage Examples
32
33
**Basic Python integration setup:**
34
35
```python
36
from pyspark.sql import SparkSession
37
from pyspark import SparkContext
38
39
# Create Spark session
40
spark = SparkSession.builder \
41
.appName("ADAM Python API") \
42
.getOrCreate()
43
44
sc = spark.sparkContext
45
46
# Import Java classes for ADAM
47
adam_context_class = sc._jvm.org.bdgenomics.adam.rdd.ADAMContext
48
java_adam_context_class = sc._jvm.org.bdgenomics.adam.api.java.JavaADAMContext
49
50
# Create ADAM context
51
adam_context = adam_context_class(sc._jsc.sc())
52
java_adam_context = java_adam_context_class(adam_context)
53
```
54
55
**Loading genomic data in Python:**
56
57
```python
58
# Load alignment data
59
alignments_java = java_adam_context.loadAlignments("sample.bam")
60
61
# Convert Java RDD to Python DataFrame
62
alignments_df = alignments_java.toDF()
63
64
# Work with DataFrame in Python
65
filtered_alignments = alignments_df.filter(
    alignments_df.mapq > 30
).filter(
    alignments_df.readMapped
)
70
71
print(f"Filtered alignments count: {filtered_alignments.count()}")
72
```
73
74
**Using DataFrame conversion wrappers:**
75
76
```python
77
# Import conversion wrapper
78
wrapper_class = sc._jvm.org.bdgenomics.adam.api.python.DataFrameConversionWrapper
79
80
# Create processed DataFrame
81
processed_df = filtered_alignments.select("readName", "contigName", "start", "end")
82
83
# Create conversion wrapper
84
wrapper = wrapper_class(processed_df._jdf)
85
86
# Use wrapper in Java-side transformations
87
# (This pattern is typically used internally by higher-level Python APIs)
88
converted_df = wrapper.call(alignments_df._jdf)
89
```
90
91
**Advanced genomic analysis in Python:**
92
93
```python
94
# Load multiple genomic data types
95
variants_java = java_adam_context.loadVariants("variants.vcf")
96
features_java = java_adam_context.loadFeatures("annotations.bed")
97
98
# Convert to DataFrames
99
variants_df = variants_java.toDF()
100
features_df = features_java.toDF()
101
102
# Perform genomic analysis with Spark SQL
103
variants_df.createOrReplaceTempView("variants")
104
features_df.createOrReplaceTempView("features")
105
106
# Find variants in annotated regions
107
annotated_variants = spark.sql("""
108
SELECT v.*, f.featureType, f.name as gene_name
109
FROM variants v
110
JOIN features f ON (
111
v.contigName = f.contigName AND
112
v.start >= f.start AND
113
v.end <= f.end
114
)
115
WHERE v.qual > 30
116
""")
117
118
print("Annotated variants schema:")
119
annotated_variants.printSchema()
120
annotated_variants.show(10)
121
```
122
123
**Coverage analysis workflow:**
124
125
```python
126
# Load alignment data and convert to coverage
127
alignments_java = java_adam_context.loadAlignments("sample.bam")
128
coverage_java = java_adam_context.loadCoverage("coverage.bed")
129
130
# Convert to DataFrames for analysis
131
alignments_df = alignments_java.toDF()
132
coverage_df = coverage_java.toDF()
133
134
# Calculate coverage statistics
135
# Note: a dict passed to agg() cannot repeat a key, so use selectExpr to
# compute several aggregates over the same column at once.
coverage_stats = coverage_df.selectExpr(
    "avg(score)",
    "max(score)",
    "min(score)"
).collect()[0]
140
141
print(f"Average coverage: {coverage_stats['avg(score)']:.2f}")
142
print(f"Maximum coverage: {coverage_stats['max(score)']}")
143
print(f"Minimum coverage: {coverage_stats['min(score)']}")
144
145
# Find high-coverage regions
146
high_coverage = coverage_df.filter(coverage_df.score > coverage_stats['avg(score)'] * 2)
147
high_coverage.select("contigName", "start", "end", "score").show()
148
```
149
150
**Working with genotype data:**
151
152
```python
153
# Load genotype data
154
genotypes_java = java_adam_context.loadGenotypes("samples.vcf")
155
genotypes_df = genotypes_java.toDF()
156
157
# Analyze genotype quality and coverage
158
quality_analysis = genotypes_df.groupBy("sampleId") \
159
.agg({
160
"genotypeQuality": "avg",
161
"readDepth": "avg",
162
"sampleId": "count"
163
}) \
164
.withColumnRenamed("count(sampleId)", "variant_count") \
165
.withColumnRenamed("avg(genotypeQuality)", "avg_quality") \
166
.withColumnRenamed("avg(readDepth)", "avg_depth")
167
168
print("Per-sample genotype statistics:")
169
quality_analysis.show()
170
171
# Find samples with high-quality genotypes
172
high_quality_samples = quality_analysis.filter(
173
(quality_analysis.avg_quality > 50) &
174
(quality_analysis.avg_depth > 20)
175
)
176
177
print("High-quality samples:")
178
high_quality_samples.show()
179
```
180
181
**Fragment analysis:**
182
183
```python
184
# Load fragment data (paired-end reads)
185
fragments_java = java_adam_context.loadFragments("paired_reads.bam")
186
fragments_df = fragments_java.toDF()
187
188
# Analyze insert size distribution
189
# Note: a dict passed to agg() cannot repeat a key, so use selectExpr to
# compute several aggregates over the same column at once.
insert_size_stats = fragments_df \
    .filter(fragments_df.insertSize.isNotNull()) \
    .filter(fragments_df.insertSize > 0) \
    .selectExpr(
        "avg(insertSize)",
        "stddev_samp(insertSize)",
        "min(insertSize)",
        "max(insertSize)"
    ).collect()[0]
198
199
print("Insert size statistics:")
200
print(f"Mean: {insert_size_stats['avg(insertSize)']:.2f}")
201
print(f"Std Dev: {insert_size_stats['stddev_samp(insertSize)']:.2f}")
202
print(f"Min: {insert_size_stats['min(insertSize)']}")
203
print(f"Max: {insert_size_stats['max(insertSize)']}")
204
205
# Plot insert size distribution (if matplotlib available)
206
try:
207
import matplotlib.pyplot as plt
208
import pandas as pd
209
210
insert_sizes = fragments_df.select("insertSize") \
211
.filter(fragments_df.insertSize.between(50, 800)) \
212
.toPandas()
213
214
plt.figure(figsize=(10, 6))
215
plt.hist(insert_sizes['insertSize'], bins=50, alpha=0.7)
216
plt.xlabel('Insert Size (bp)')
217
plt.ylabel('Frequency')
218
plt.title('Insert Size Distribution')
219
plt.grid(True, alpha=0.3)
220
plt.show()
221
222
except ImportError:
223
print("Matplotlib not available for plotting")
224
```
225
226
## Integration Patterns
227
228
**Bridging Java and Python APIs:**
229
230
```python
231
def create_adam_context(spark_session):
232
"""Helper function to create ADAM context from Spark session"""
233
sc = spark_session.sparkContext
234
adam_context = sc._jvm.org.bdgenomics.adam.rdd.ADAMContext(sc._jsc.sc())
235
return sc._jvm.org.bdgenomics.adam.api.java.JavaADAMContext(adam_context)
236
237
def load_genomic_data(adam_context, file_path, data_type="alignments"):
238
"""Generic genomic data loader"""
239
if data_type == "alignments":
240
return adam_context.loadAlignments(file_path)
241
elif data_type == "variants":
242
return adam_context.loadVariants(file_path)
243
elif data_type == "features":
244
return adam_context.loadFeatures(file_path)
245
elif data_type == "genotypes":
246
return adam_context.loadGenotypes(file_path)
247
else:
248
raise ValueError(f"Unsupported data type: {data_type}")
249
250
# Usage
251
adam_ctx = create_adam_context(spark)
252
alignments = load_genomic_data(adam_ctx, "sample.bam", "alignments")
253
alignments_df = alignments.toDF()
254
```
255
256
**Error handling in Python integration:**
257
258
```python
259
def safe_load_genomic_data(adam_context, file_path, data_type, stringency="LENIENT"):
260
"""Load genomic data with error handling"""
261
try:
262
# Get stringency enum
263
stringency_class = spark.sparkContext._jvm.htsjdk.samtools.ValidationStringency
264
stringency_val = getattr(stringency_class, stringency)
265
266
if data_type == "alignments":
267
return adam_context.loadAlignments(file_path, stringency_val)
268
elif data_type == "variants":
269
return adam_context.loadVariants(file_path, stringency_val)
270
# Add other types as needed
271
272
except Exception as e:
273
print(f"Error loading {data_type} from {file_path}: {str(e)}")
274
return None
275
276
# Usage with error handling
277
alignments = safe_load_genomic_data(adam_ctx, "potentially_corrupted.bam", "alignments", "LENIENT")
278
if alignments:
279
alignments_df = alignments.toDF()
280
print(f"Successfully loaded {alignments_df.count()} alignment records")
281
```
282
283
## Key Benefits
284
285
- **Familiar DataFrame API**: Work with genomic data using standard Spark DataFrame operations
286
- **Python Ecosystem Integration**: Combine with pandas, matplotlib, scikit-learn, and other Python libraries
287
- **SQL Analysis**: Use Spark SQL for complex genomic queries
288
- **Scalability**: Leverage Spark's distributed computing for large-scale genomic analysis
289
- **Interoperability**: Seamlessly bridge between Java ADAM APIs and Python data science workflows