# Python Integration

DataFrame wrapper functionality providing Python access to ADAM's data conversion capabilities through PySpark integration. The DataFrameConversionWrapper enables Python developers to leverage ADAM's genomic data processing features within Python data science workflows.

## Capabilities

### DataFrameConversionWrapper Class

Primary wrapper class for Python integration with ADAM's DataFrame conversion system.

```scala { .api }
/**
 * Wrapper class for Python API DataFrame operations.
 * Implements the Java Function interface for PySpark compatibility.
 */
class DataFrameConversionWrapper(newDf: DataFrame)
    extends JFunction[DataFrame, DataFrame] {

  /**
   * Convert the input DataFrame to the target DataFrame.
   * @param v1 Input DataFrame containing source genomic data
   * @return Target DataFrame with converted genomic data
   */
  def call(v1: DataFrame): DataFrame
}
```

**Constructor Parameters:**

- `newDf: DataFrame` - The target DataFrame defining the output schema and data

**Usage Example in Scala (for Java/Scala integration):**

```scala
import org.bdgenomics.adam.api.python.DataFrameConversionWrapper
import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.types._

// Create target DataFrame with the desired schema
val targetSchema = StructType(Seq(
  StructField("contigName", StringType, nullable = true),
  StructField("start", LongType, nullable = true),
  StructField("end", LongType, nullable = true)))
val targetDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], targetSchema)

// Create wrapper for conversion
val wrapper = new DataFrameConversionWrapper(targetDF)

// Convert source DataFrame
val sourceDF: DataFrame = ??? // source genomic data as DataFrame
val convertedDF: DataFrame = wrapper.call(sourceDF)
```

## Python Integration Patterns

### PySpark Integration

The DataFrameConversionWrapper integrates with PySpark through the Java Function interface, enabling seamless use within Python data science workflows.

**Typical Python Usage Pattern:**

```python
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark session with ADAM dependencies
spark = SparkSession.builder \
    .appName("ADAM Python API") \
    .config("spark.jars", "adam-apis_2.11-0.23.0.jar") \
    .getOrCreate()

# Access ADAM's JavaADAMContext from Python
java_adam_context = spark._jvm.org.bdgenomics.adam.api.java.JavaADAMContext
jac = java_adam_context(spark._jsc.sc())

# Load genomic data using the Java API
alignments_rdd = jac.loadAlignments("input.bam")

# Convert to DataFrame for Python processing
alignments_df = alignments_rdd.toDF()

# Define target schema for conversion
target_schema = StructType([
    StructField("contigName", StringType(), True),
    StructField("start", IntegerType(), True),
    StructField("end", IntegerType(), True)
])

# Create empty target DataFrame
target_df = spark.createDataFrame([], target_schema)

# Create conversion wrapper
conversion_wrapper = spark._jvm.org.bdgenomics.adam.api.python.DataFrameConversionWrapper(target_df._jdf)

# Perform conversion (operates on the underlying Java DataFrames)
converted_df = conversion_wrapper.call(alignments_df._jdf)

# Wrap the returned Java DataFrame back into a Python DataFrame
# (on Spark 3.x, pass the SparkSession instead of spark._wrapped)
python_df = DataFrame(converted_df, spark._wrapped)
```

### Data Science Workflow Integration

The Python integration enables ADAM to be used within broader Python data science ecosystems, including pandas, NumPy, and scikit-learn.

**Pandas Integration Example:**

```python
import pandas as pd
import numpy as np
from sklearn.cluster import KMeans

# Convert the Spark DataFrame (python_df from the example above) to pandas
# for local processing
pandas_df = python_df.toPandas()

# Perform genomic data analysis with pandas
genomic_features = pandas_df.groupby('contigName').agg({
    'start': ['min', 'max', 'count'],
    'end': ['min', 'max']
})
# Flatten the resulting MultiIndex columns, e.g. ('start', 'min') -> 'start_min'
genomic_features.columns = ['_'.join(col) for col in genomic_features.columns]

# Apply machine learning algorithms
clustering_features = pandas_df[['start', 'end']].values
kmeans = KMeans(n_clusters=5)
clusters = kmeans.fit_predict(clustering_features)
pandas_df['cluster'] = clusters
```

### Jupyter Notebook Integration

The Python integration works seamlessly within Jupyter notebooks for interactive genomic data exploration.

**Notebook Usage Example:**

```python
# Cell 1: Setup
import pyspark
from pyspark.sql import SparkSession, DataFrame
import matplotlib.pyplot as plt
import seaborn as sns

spark = SparkSession.builder \
    .appName("Genomic Data Analysis") \
    .config("spark.jars", "adam-apis_2.11-0.23.0.jar") \
    .getOrCreate()

# Cell 2: Load and convert data
jac = spark._jvm.org.bdgenomics.adam.api.java.JavaADAMContext(spark._jsc.sc())
variants = jac.loadVariants("variants.vcf")
# toDF() returns a Java DataFrame; wrap it before calling toPandas()
variants_df = DataFrame(variants.toDF(), spark._wrapped).toPandas()

# Cell 3: Visualize genomic data
plt.figure(figsize=(12, 6))
sns.histplot(data=variants_df, x='start', bins=50)
plt.title('Distribution of Variant Positions')
plt.xlabel('Genomic Position')
plt.ylabel('Count')
plt.show()

# Cell 4: Statistical analysis
summary_stats = variants_df.groupby('contigName').agg({
    'start': ['count', 'min', 'max'],
    'referenceAllele': lambda x: x.value_counts().head()
})
print(summary_stats)
```

## Architecture and Design

### Java Function Interface

The DataFrameConversionWrapper implements Spark's Java Function interface to ensure compatibility with PySpark's Java interop layer.

```scala { .api }
// Extends Spark's Java Function interface for PySpark compatibility
import org.apache.spark.api.java.function.{ Function => JFunction }

class DataFrameConversionWrapper(newDf: DataFrame)
    extends JFunction[DataFrame, DataFrame]
```

### DataFrame Schema Handling

The wrapper handles DataFrame schema transformations between different genomic data types while preserving data integrity and type safety.

**Schema Preservation Features:**

- **Column mapping**: Automatic mapping between compatible schema fields
- **Type conversion**: Safe conversion between compatible data types
- **Null handling**: Proper handling of missing or null genomic data fields
- **Metadata retention**: Preservation of DataFrame metadata and column descriptions
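
As a rough, Spark-free sketch of the column-mapping and null-handling rules above: fields present in both schemas carry over, extra source fields are dropped, and fields missing from the source come through as null. The `align_to_target` helper below is hypothetical, purely to illustrate the rule; the real wrapper operates on Spark DataFrames and their schemas.

```python
def align_to_target(row: dict, target_fields: list) -> dict:
    """Keep only the target schema's fields; fields missing from the
    source row come through as None (null handling)."""
    return {name: row.get(name) for name in target_fields}

# A source record with an extra field ('mapq') and one target field missing ('end')
source_row = {"contigName": "chr1", "start": 100, "mapq": 60}
target_fields = ["contigName", "start", "end"]

aligned = align_to_target(source_row, target_fields)
# -> {'contigName': 'chr1', 'start': 100, 'end': None}
```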

### Memory Management

The Python integration provides efficient memory management for large genomic datasets:

- **Lazy evaluation**: DataFrame operations are lazily evaluated until an action is called
- **Partition-aware processing**: Maintains Spark's distributed processing model
- **Garbage collection**: Proper cleanup of temporary Java objects in the Python environment
- **Streaming support**: Compatible with Spark's Structured Streaming for real-time genomic data
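
The lazy-evaluation point can be illustrated without Spark: like a DataFrame transformation, a Python generator records what to compute but runs nothing until it is consumed (the analogue of calling an action). A toy sketch, not ADAM code:

```python
def lazy_map(f, partition):
    # A "transformation": builds a generator, computes nothing yet
    return (f(x) for x in partition)

evaluated = []

def double(x):
    evaluated.append(x)  # side effect lets us observe when work happens
    return x * 2

pipeline = lazy_map(double, [1, 2, 3])
assert evaluated == []        # nothing has run yet

result = list(pipeline)       # the "action" forces evaluation
assert result == [2, 4, 6]
assert evaluated == [1, 2, 3]
```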

## Performance Considerations

### Python-JVM Bridge Overhead

- **Serialization costs**: Data serialization between Python and the JVM adds overhead
- **Batch processing**: Process data in larger batches to amortize serialization costs
- **DataFrame caching**: Cache frequently accessed DataFrames to avoid repeated conversions
- **Column pruning**: Select only necessary columns before DataFrame operations
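
Why batching helps can be seen from a simple cost model: each Python-JVM round trip pays a fixed overhead on top of the per-row serialization cost, so fewer, larger batches cost less overall. The constants below are illustrative, not measured:

```python
import math

def transfer_cost(n_rows, batch_size, per_call_overhead=5.0, per_row=0.01):
    """Total cost of moving n_rows across the bridge in batches:
    a fixed overhead per round trip plus a per-row serialization cost."""
    calls = math.ceil(n_rows / batch_size)
    return calls * per_call_overhead + n_rows * per_row

# Moving 100,000 rows in batches of 100 vs. 10,000
small_batches = transfer_cost(100_000, 100)     # 1,000 round trips
large_batches = transfer_cost(100_000, 10_000)  # 10 round trips
assert large_batches < small_batches
```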

### Optimization Strategies

```python
import pyspark.sql.functions as F

# Optimize DataFrame operations for genomic data
genomic_df = spark.sql("""
    SELECT contigName, start, end, referenceAllele, alternateAllele
    FROM genomic_variants
    WHERE contigName IN ('chr1', 'chr2', 'chr3')
      AND start BETWEEN 1000000 AND 2000000
""").cache()  # Cache for multiple operations

# Use vectorized operations where possible
processed_df = genomic_df \
    .withColumn("variant_length", F.col("end") - F.col("start")) \
    .withColumn("is_snp", F.col("referenceAllele").rlike("^[ATCG]$"))
```

## Error Handling

Common errors and troubleshooting:

- **ClassNotFoundException**: Ensure ADAM JAR files are properly included in the Spark classpath
- **SerializationException**: Verify DataFrame schemas are compatible between source and target
- **OutOfMemoryError**: Increase driver and executor memory for large genomic datasets
- **PySparkTypeError**: Ensure proper type conversion between Python and Scala data types
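
When a JVM-side call fails, PySpark surfaces it with the root cause buried in a Java stack trace. A small, hypothetical helper (not part of ADAM) can re-raise such failures with a pointer to the common fixes above; in a real PySpark session the caught exception is typically `py4j.protocol.Py4JJavaError`:

```python
def safe_jvm_call(fn, *args):
    """Invoke a JVM-backed callable and re-raise failures with a hint."""
    try:
        return fn(*args)
    except Exception as exc:
        raise RuntimeError(
            "JVM call failed - check that the ADAM jars are on the Spark "
            "classpath and that source/target schemas are compatible"
        ) from exc

# Usage with the wrapper from the examples above:
# converted = safe_jvm_call(conversion_wrapper.call, alignments_df._jdf)
```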