# Data Transformations

Genomics-specific transformations, including format conversions, quality score recalibration, duplicate marking, and coverage analysis. These operations are optimized for distributed processing of large genomic datasets while maintaining data integrity and genomic coordinate awareness.

## Capabilities

### Alignment Transformations

Specialized transformations for sequencing read data, including quality improvement and analysis operations.
```scala { .api }
/**
 * Convert alignment records to coverage depth information
 * @param collapse Whether to merge adjacent coverage records with the same depth
 * @return CoverageRDD representing sequencing depth across genomic positions
 */
def toCoverage(collapse: Boolean = true): CoverageRDD

/**
 * Group paired-end reads into fragments representing complete DNA molecules
 * @return FragmentRDD containing paired reads as single fragments
 */
def toFragments(): FragmentRDD

/**
 * Mark duplicate reads based on genomic coordinates and orientation.
 * Uses Picard's duplicate marking algorithm for compatibility.
 * @return AlignmentRecordRDD with duplicate reads flagged
 */
def markDuplicates(): AlignmentRecordRDD

/**
 * Recalibrate base quality scores using known variant sites.
 * Implements GATK-style base quality score recalibration.
 * @param knownSnps VariantRDD of known variant sites for recalibration
 * @return AlignmentRecordRDD with recalibrated quality scores
 */
def recalibrateBaseQualities(knownSnps: VariantRDD): AlignmentRecordRDD

/**
 * Realign reads around indels to improve alignment accuracy
 * @return AlignmentRecordRDD with improved alignments around indels
 */
def realignIndels(): AlignmentRecordRDD

/**
 * Sort reads by genomic coordinate for efficient processing
 * @return AlignmentRecordRDD sorted by reference position
 */
def sortReadsByReferencePosition(): AlignmentRecordRDD

/**
 * Count k-mers of a specified length across all reads
 * @param kmerLength Length of k-mers to count
 * @return RDD of k-mer sequences with their occurrence counts
 */
def countKmers(kmerLength: Int): RDD[(String, Long)]
```
**Usage Examples:**

```scala
import org.bdgenomics.adam.rdd.ADAMContext._

val alignments = sc.loadBam("input.bam")

// Generate coverage from alignments
val coverage = alignments.toCoverage(collapse = true)
coverage.saveAsWig("coverage.wig")

// Mark and remove duplicates
val deduped = alignments
  .markDuplicates()
  .transform(_.filter(!_.getDuplicateRead))

// Quality score recalibration workflow
val knownVariants = sc.loadVcf("known_sites.vcf").toVariants()
val recalibrated = alignments.recalibrateBaseQualities(knownVariants)

// K-mer analysis
val kmers21 = alignments.countKmers(21)
val topKmers = kmers21.top(100)(Ordering.by(_._2))
```
### Variant Transformations

Transformations for variant calling and population genetics workflows.

```scala { .api }
/**
 * Convert variants to genotype calls for population analysis
 * @return GenotypeRDD containing sample-specific genotype information
 */
def toGenotypes(): GenotypeRDD

/**
 * Convert variants to variant contexts with full VCF metadata
 * @return VariantContextRDD containing variants with header information
 */
def toVariantContexts(): VariantContextRDD

/**
 * Extract variants from genotype calls
 * @return VariantRDD containing unique variant sites
 */
def toVariants(): VariantRDD // Available on GenotypeRDD and VariantContextRDD
```
**Usage Examples:**

```scala
// Load and transform variant data
val variantContexts = sc.loadVcf("population.vcf")

// Extract variants for analysis
val variants = variantContexts.toVariants()

// Extract genotypes for population genetics
val genotypes = variantContexts.toGenotypes()

// Filter variants by quality and convert back
val highQualityVariants = variants
  .transform(_.filter(_.getQuality > 30.0))
  .toVariantContexts()
```
125
126
### Coverage Analysis
127
128
Transformations for analyzing sequencing depth and coverage patterns.
129
130
```scala { .api }
131
/**
132
* Collapse adjacent coverage records with identical depth values
133
* @return CoverageRDD with consolidated coverage intervals
134
*/
135
def collapse(): CoverageRDD
136
137
/**
138
* Convert coverage records to genomic features for analysis
139
* @return FeatureRDD representing coverage intervals as features
140
*/
141
def toFeatures(): FeatureRDD
142
143
/**
144
* Aggregate coverage across multiple samples at each position
145
* @return CoverageRDD with combined coverage information
146
*/
147
def aggregateByPosition(): CoverageRDD
148
```
149
150
**Usage Examples:**
151
152
```scala
153
// Generate and analyze coverage
154
val alignments = sc.loadBam("sample.bam")
155
val coverage = alignments.toCoverage()
156
157
// Collapse adjacent regions with same coverage
158
val collapsed = coverage.collapse()
159
160
// Find regions with high coverage (>50x)
161
val highCoverage = coverage
162
.transform(_.filter(_.count > 50.0))
163
.toFeatures()
164
165
highCoverage.saveAsBed("high_coverage_regions.bed")
166
```
### Fragment Operations

Transformations for paired-end sequencing fragment analysis.

```scala { .api }
/**
 * Convert fragments back to individual alignment records
 * @return AlignmentRecordRDD containing separate read alignments
 */
def toAlignmentRecords(): AlignmentRecordRDD

/**
 * Mark duplicate fragments based on alignment coordinates
 * @return FragmentRDD with duplicate fragments flagged
 */
def markDuplicates(): FragmentRDD
```
**Usage Examples:**

```scala
// Work with paired-end fragments
val alignments = sc.loadBam("paired_end.bam")
val fragments = alignments.toFragments()

// Mark duplicate fragments
val dedupedFragments = fragments.markDuplicates()

// Convert back to reads for downstream processing
val cleanReads = dedupedFragments.toAlignmentRecords()
```
### Format Conversions

Convert between different genomic data representations and file formats.

```scala { .api }
/**
 * Save alignment records in SAM/BAM/CRAM format
 * @param pathName Output file path
 * @param asType Output format (SAM, BAM, or CRAM)
 * @param asSingleFile Whether to save as a single file or partitioned
 */
def saveAsSam(pathName: String,
              asType: SAMFormat = SAMFormat.SAM,
              asSingleFile: Boolean = false): Unit

/**
 * Save alignment records in FASTQ format
 * @param pathName Output file path
 * @param outputOriginalBaseQualities Whether to output original quality scores
 * @param asSingleFile Whether to save as a single file
 */
def saveAsFastq(pathName: String,
                outputOriginalBaseQualities: Boolean = false,
                asSingleFile: Boolean = false): Unit

/**
 * Save variants in VCF format
 * @param pathName Output file path
 * @param stringency Validation stringency for VCF compliance
 */
def saveAsVcf(pathName: String,
              stringency: ValidationStringency = ValidationStringency.STRICT): Unit

/**
 * Save features in various annotation formats
 */
def saveAsBed(pathName: String): Unit          // BED format
def saveAsGtf(pathName: String): Unit          // GTF format
def saveAsGff3(pathName: String): Unit         // GFF3 format
def saveAsIntervalList(pathName: String): Unit // Picard interval list
def saveAsNarrowPeak(pathName: String): Unit   // ENCODE narrowPeak

/**
 * Save coverage in WIG format for genome browsers
 * @param pathName Output file path
 */
def saveAsWig(pathName: String): Unit

/**
 * Save reference sequences in FASTA format
 * @param pathName Output file path
 * @param lineWidth Number of bases per line
 */
def saveAsFasta(pathName: String, lineWidth: Int = 60): Unit
```
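**Usage Examples:**

A sketch of the writers above; the file names are illustrative, and the named arguments follow the signatures listed in this section.

```scala
// Export alignments in several formats
val alignments = sc.loadBam("input.bam")

// Single-file BAM for tools that expect one output file
alignments.saveAsSam("output.bam", asType = SAMFormat.BAM, asSingleFile = true)

// FASTQ for re-alignment workflows
alignments.saveAsFastq("reads.fastq")

// VCF export that tolerates minor spec violations instead of failing
val variantContexts = sc.loadVcf("calls.vcf")
variantContexts.saveAsVcf("calls_out.vcf",
  stringency = ValidationStringency.LENIENT)
```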
### Advanced Transformations

Complex genomic analysis operations for specialized workflows.

```scala { .api }
/**
 * Generic transformation preserving genomic metadata
 * @param tFn Function to transform the underlying RDD
 * @return Same GenomicRDD type with transformed data
 */
def transform(tFn: RDD[T] => RDD[T]): U

/**
 * Transform to a different GenomicRDD type
 * @param tFn Function to transform the RDD to a different record type
 * @return New GenomicRDD type with appropriate metadata
 */
def transmute[X, Y <: GenomicRDD[X, Y]](tFn: RDD[T] => RDD[X]): Y

/**
 * Union multiple GenomicRDDs of the same type
 * @param rdds Variable number of GenomicRDDs to union
 * @return Combined GenomicRDD containing all records
 */
def union(rdds: U*): U
```
**Usage Examples:**

```scala
// Complex transformation pipeline
val alignments = sc.loadBam("input.bam")

val processed = alignments
  .transform(_.filter(_.getReadMapped))     // Keep only mapped reads
  .markDuplicates()                         // Mark duplicates
  .transform(_.filter(!_.getDuplicateRead)) // Remove duplicates
  .sortReadsByReferencePosition()           // Sort by position

// Convert to a different representation
val coverage = processed.toCoverage()

// Union multiple samples
val sample1 = sc.loadBam("sample1.bam")
val sample2 = sc.loadBam("sample2.bam")
val sample3 = sc.loadBam("sample3.bam")

val combined = sample1.union(sample2, sample3)
```
### Quality Control Operations

Operations for assessing and improving data quality.

```scala { .api }
/**
 * Calculate alignment statistics for quality assessment
 * @return RDD of alignment statistics by reference contig
 */
def alignmentStatistics(): RDD[(String, AlignmentStatistics)]

/**
 * Filter reads based on quality metrics
 * @param minMappingQuality Minimum mapping quality score
 * @param requireProperPair Whether to require properly paired reads
 * @return AlignmentRecordRDD with filtered reads
 */
def filterByQuality(minMappingQuality: Int, requireProperPair: Boolean = false): AlignmentRecordRDD

/**
 * Calculate the insert size distribution for paired-end reads
 * @return RDD of insert sizes with their frequencies
 */
def insertSizeDistribution(): RDD[(Int, Long)]
```
**Usage Examples:**

```scala
// Quality control workflow
val alignments = sc.loadBam("sample.bam")

// Filter by mapping quality
val highQuality = alignments.filterByQuality(minMappingQuality = 30)

// Calculate statistics
val stats = alignments.alignmentStatistics()
stats.collect().foreach { case (contig, stat) =>
  println(s"$contig: ${stat.mappedReads} mapped reads")
}

// Analyze insert sizes: RDDs have no median() method, so collect the
// (size, count) histogram and walk it to the frequency-weighted median
val insertSizes = alignments.insertSizeDistribution()
val hist = insertSizes.sortByKey().collect()
val half = hist.map(_._2).sum / 2
var seen = 0L
val medianInsertSize = hist.collectFirst {
  case (size, count) if { seen += count; seen >= half } => size
}
```
## Transformation Performance Tips

1. **Chain operations** before triggering actions to minimize data shuffling
2. **Use broadcast joins** for small reference datasets
3. **Persist intermediate results** that will be reused multiple times
4. **Apply filters early** in transformation chains to reduce data volume
5. **Consider partitioning** by genomic region for region-based analyses
6. **Use appropriate storage levels** when persisting large datasets
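Tips 3 and 4 combined, as a sketch: filter first, then persist before branching. Persisting through the underlying `rdd` and the `MEMORY_AND_DISK_SER` storage level are illustrative choices, not the only options.

```scala
import org.apache.spark.storage.StorageLevel

// Apply the cheap filter early so less data flows through everything downstream
val mapped = alignments.transform(_.filter(_.getReadMapped))

// Cache the filtered reads so both branches below reuse them
// instead of re-reading and re-filtering the source BAM
mapped.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

val coverage = mapped.toCoverage()
val deduped = mapped.markDuplicates()
```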
## Error Handling in Transformations

All transformations respect the validation stringency settings and handle malformed data according to the specified policy:

- **STRICT**: Fail immediately on malformed records
- **LENIENT**: Log warnings and skip malformed records
- **SILENT**: Skip malformed records without warnings

Use appropriate stringency levels based on data quality and processing requirements.
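For example, using the `saveAsVcf` signature from the Format Conversions section, a lenient export logs and skips malformed records rather than failing the whole job:

```scala
import htsjdk.samtools.ValidationStringency

val variantContexts = sc.loadVcf("population.vcf")

// LENIENT: warn and skip records that violate the VCF spec
variantContexts.saveAsVcf("population_out.vcf",
  stringency = ValidationStringency.LENIENT)
```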