or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

dataset-conversions.md · genomic-data-loading.md · index.md · python-integration.md · rdd-conversions.md

docs/python-integration.md

0

# Python Integration

1

2

The ADAM APIs provide Python integration capabilities through DataFrame conversion wrappers, enabling Python developers to work with genomic data processing workflows using familiar DataFrame operations.

3

4

## Capabilities

5

6

### DataFrameConversionWrapper

7

8

Simple wrapper class that implements the Java Function interface for converting between Spark DataFrames in Python environments.

9

10

```java { .api }

11

/**

12

* Wrapper for Python API DataFrame conversions

13

* Implements JFunction interface for Spark transformations in Python

14

*/

15

class DataFrameConversionWrapper implements JFunction<DataFrame, DataFrame> {

16

/**

17

* Creates a conversion wrapper with the target DataFrame

18

* @param newDf The DataFrame to return from conversion operations

19

*/

20

DataFrameConversionWrapper(DataFrame newDf);

21

22

/**

23

* Function interface method for DataFrame conversion

24

* @param v1 Input DataFrame (typically ignored)

25

* @return The wrapped DataFrame specified in constructor

26

*/

27

DataFrame call(DataFrame v1);

28

}

29

```

30

31

## Usage Examples

32

33

**Basic Python integration setup:**

34

35

```python

36

from pyspark.sql import SparkSession

37

from pyspark import SparkContext

38

39

# Create Spark session

40

spark = SparkSession.builder \

41

.appName("ADAM Python API") \

42

.getOrCreate()

43

44

sc = spark.sparkContext

45

46

# Import Java classes for ADAM

47

adam_context_class = sc._jvm.org.bdgenomics.adam.rdd.ADAMContext

48

java_adam_context_class = sc._jvm.org.bdgenomics.adam.api.java.JavaADAMContext

49

50

# Create ADAM context

51

adam_context = adam_context_class(sc._jsc.sc())

52

java_adam_context = java_adam_context_class(adam_context)

53

```

54

55

**Loading genomic data in Python:**

56

57

```python

58

# Load alignment data

59

alignments_java = java_adam_context.loadAlignments("sample.bam")

60

61

# Convert Java RDD to Python DataFrame

62

alignments_df = alignments_java.toDF()

63

64

# Work with DataFrame in Python

65

filtered_alignments = alignments_df.filter(

66

alignments_df.mapq > 30

67

).filter(

68

alignments_df.readMapped == True

69

)

70

71

print(f"Filtered alignments count: {filtered_alignments.count()}")

72

```

73

74

**Using DataFrame conversion wrappers:**

75

76

```python

77

# Import conversion wrapper

78

wrapper_class = sc._jvm.org.bdgenomics.adam.api.python.DataFrameConversionWrapper

79

80

# Create processed DataFrame

81

processed_df = filtered_alignments.select("readName", "contigName", "start", "end")

82

83

# Create conversion wrapper

84

wrapper = wrapper_class(processed_df._jdf)

85

86

# Use wrapper in Java-side transformations

87

# (This pattern is typically used internally by higher-level Python APIs)

88

converted_df = wrapper.call(alignments_df._jdf)

89

```

90

91

**Advanced genomic analysis in Python:**

92

93

```python

94

# Load multiple genomic data types

95

variants_java = java_adam_context.loadVariants("variants.vcf")

96

features_java = java_adam_context.loadFeatures("annotations.bed")

97

98

# Convert to DataFrames

99

variants_df = variants_java.toDF()

100

features_df = features_java.toDF()

101

102

# Perform genomic analysis with Spark SQL

103

variants_df.createOrReplaceTempView("variants")

104

features_df.createOrReplaceTempView("features")

105

106

# Find variants in annotated regions

107

annotated_variants = spark.sql("""

108

SELECT v.*, f.featureType, f.name as gene_name

109

FROM variants v

110

JOIN features f ON (

111

v.contigName = f.contigName AND

112

v.start >= f.start AND

113

v.end <= f.end

114

)

115

WHERE v.qual > 30

116

""")

117

118

print("Annotated variants schema:")

119

annotated_variants.printSchema()

120

annotated_variants.show(10)

121

```

122

123

**Coverage analysis workflow:**

124

125

```python

126

# Load alignment data and convert to coverage

127

alignments_java = java_adam_context.loadAlignments("sample.bam")

128

coverage_java = java_adam_context.loadCoverage("coverage.bed")

129

130

# Convert to DataFrames for analysis

131

alignments_df = alignments_java.toDF()

132

coverage_df = coverage_java.toDF()

133

134

# Calculate coverage statistics

135

from pyspark.sql import functions as F

coverage_stats = coverage_df.agg(

    F.avg("score"),

    F.max("score"),

    F.min("score")

).collect()[0]

140

141

print(f"Average coverage: {coverage_stats['avg(score)']:.2f}")

142

print(f"Maximum coverage: {coverage_stats['max(score)']}")

143

print(f"Minimum coverage: {coverage_stats['min(score)']}")

144

145

# Find high-coverage regions

146

high_coverage = coverage_df.filter(coverage_df.score > coverage_stats['avg(score)'] * 2)

147

high_coverage.select("contigName", "start", "end", "score").show()

148

```

149

150

**Working with genotype data:**

151

152

```python

153

# Load genotype data

154

genotypes_java = java_adam_context.loadGenotypes("samples.vcf")

155

genotypes_df = genotypes_java.toDF()

156

157

# Analyze genotype quality and coverage

158

quality_analysis = genotypes_df.groupBy("sampleId") \

159

.agg({

160

"genotypeQuality": "avg",

161

"readDepth": "avg",

162

"sampleId": "count"

163

}) \

164

.withColumnRenamed("count(sampleId)", "variant_count") \

165

.withColumnRenamed("avg(genotypeQuality)", "avg_quality") \

166

.withColumnRenamed("avg(readDepth)", "avg_depth")

167

168

print("Per-sample genotype statistics:")

169

quality_analysis.show()

170

171

# Find samples with high-quality genotypes

172

high_quality_samples = quality_analysis.filter(

173

(quality_analysis.avg_quality > 50) &

174

(quality_analysis.avg_depth > 20)

175

)

176

177

print("High-quality samples:")

178

high_quality_samples.show()

179

```

180

181

**Fragment analysis:**

182

183

```python

184

# Load fragment data (paired-end reads)

185

fragments_java = java_adam_context.loadFragments("paired_reads.bam")

186

fragments_df = fragments_java.toDF()

187

188

# Analyze insert size distribution

189

from pyspark.sql import functions as F

insert_size_stats = fragments_df.select("insertSize") \

    .filter(fragments_df.insertSize.isNotNull()) \

    .filter(fragments_df.insertSize > 0) \

    .agg(

        F.avg("insertSize"),

        F.stddev("insertSize"),

        F.min("insertSize"),

        F.max("insertSize")

    ).collect()[0]

198

199

print("Insert size statistics:")

200

print(f"Mean: {insert_size_stats['avg(insertSize)']:.2f}")

201

print(f"Std Dev: {insert_size_stats['stddev_samp(insertSize)']:.2f}")

202

print(f"Min: {insert_size_stats['min(insertSize)']}")

203

print(f"Max: {insert_size_stats['max(insertSize)']}")

204

205

# Plot insert size distribution (if matplotlib available)

206

try:

207

import matplotlib.pyplot as plt

208

import pandas as pd

209

210

insert_sizes = fragments_df.select("insertSize") \

211

.filter(fragments_df.insertSize.between(50, 800)) \

212

.toPandas()

213

214

plt.figure(figsize=(10, 6))

215

plt.hist(insert_sizes['insertSize'], bins=50, alpha=0.7)

216

plt.xlabel('Insert Size (bp)')

217

plt.ylabel('Frequency')

218

plt.title('Insert Size Distribution')

219

plt.grid(True, alpha=0.3)

220

plt.show()

221

222

except ImportError:

223

print("Matplotlib not available for plotting")

224

```

225

226

## Integration Patterns

227

228

**Bridging Java and Python APIs:**

229

230

```python

231

def create_adam_context(spark_session):

232

"""Helper function to create ADAM context from Spark session"""

233

sc = spark_session.sparkContext

234

adam_context = sc._jvm.org.bdgenomics.adam.rdd.ADAMContext(sc._jsc.sc())

235

return sc._jvm.org.bdgenomics.adam.api.java.JavaADAMContext(adam_context)

236

237

def load_genomic_data(adam_context, file_path, data_type="alignments"):

238

"""Generic genomic data loader"""

239

if data_type == "alignments":

240

return adam_context.loadAlignments(file_path)

241

elif data_type == "variants":

242

return adam_context.loadVariants(file_path)

243

elif data_type == "features":

244

return adam_context.loadFeatures(file_path)

245

elif data_type == "genotypes":

246

return adam_context.loadGenotypes(file_path)

247

else:

248

raise ValueError(f"Unsupported data type: {data_type}")

249

250

# Usage

251

adam_ctx = create_adam_context(spark)

252

alignments = load_genomic_data(adam_ctx, "sample.bam", "alignments")

253

alignments_df = alignments.toDF()

254

```

255

256

**Error handling in Python integration:**

257

258

```python

259

def safe_load_genomic_data(adam_context, file_path, data_type, stringency="LENIENT"):

260

"""Load genomic data with error handling"""

261

try:

262

# Get stringency enum

263

stringency_class = spark.sparkContext._jvm.htsjdk.samtools.ValidationStringency

264

stringency_val = getattr(stringency_class, stringency)

265

266

if data_type == "alignments":

267

return adam_context.loadAlignments(file_path, stringency_val)

268

elif data_type == "variants":

269

return adam_context.loadVariants(file_path, stringency_val)

270

# Add other types as needed

271

272

except Exception as e:

273

print(f"Error loading {data_type} from {file_path}: {str(e)}")

274

return None

275

276

# Usage with error handling

277

alignments = safe_load_genomic_data(adam_ctx, "potentially_corrupted.bam", "alignments", "LENIENT")

278

if alignments:

279

alignments_df = alignments.toDF()

280

print(f"Successfully loaded {alignments_df.count()} alignment records")

281

```

282

283

## Key Benefits

284

285

- **Familiar DataFrame API**: Work with genomic data using standard Spark DataFrame operations

286

- **Python Ecosystem Integration**: Combine with pandas, matplotlib, scikit-learn, and other Python libraries

287

- **SQL Analysis**: Use Spark SQL for complex genomic queries

288

- **Scalability**: Leverage Spark's distributed computing for large-scale genomic analysis

289

- **Interoperability**: Seamlessly bridge between Java ADAM APIs and Python data science workflows