# Data Processing and SQL Integration

Data processing utilities and Spark SQL data sources that enable efficient access to CDAP datasets and streams with full type safety, performance optimization, and seamless integration with Apache Spark's DataFrame and RDD APIs.

## Capabilities

### Dataset Scanner Iterator

Iterator implementation for efficiently scanning data from CDAP datasets within Spark applications, providing type-safe access to dataset records.

```scala { .api }
/**
 * Iterator for scanning data from CDAP datasets in Spark applications
 * Provides efficient, type-safe access to dataset records
 * @tparam T Type of data items being scanned
 */
class DatumScannerIterator[T](scanner: Scanner[T]) extends Iterator[T] with Closeable {
  /**
   * Checks if there are more items to scan
   * @return true if more items are available
   */
  def hasNext: Boolean

  /**
   * Gets the next item from the scanner
   * @return Next data item of type T
   * @throws NoSuchElementException if no more items available
   */
  def next(): T

  /**
   * Closes the underlying scanner and releases resources
   */
  def close(): Unit
}
```
### Serializable Stream Event

Serializable wrapper for CDAP stream events that can be efficiently processed in distributed Spark operations.

```java { .api }
/**
 * Serializable wrapper for stream events in Spark processing
 * Enables efficient distribution of stream data across Spark executors
 */
public class SerializableStreamEvent implements Serializable {
  /**
   * Gets the underlying stream event
   * @return StreamEvent containing the actual event data
   */
  public StreamEvent getStreamEvent();

  /**
   * Gets the timestamp of the stream event
   * @return Event timestamp in milliseconds since epoch
   */
  public long getTimestamp();

  /**
   * Gets the event headers
   * @return Map of header key-value pairs
   */
  public Map<String, String> getHeaders();

  /**
   * Gets the event body
   * @return ByteBuffer containing the event body data
   */
  public ByteBuffer getBody();

  /**
   * Gets the event body as a byte array
   * @return Byte array containing the event body
   */
  public byte[] getBodyBytes();

  /**
   * Gets the event body as a UTF-8 string
   * @return String representation of the event body
   */
  public String getBodyAsString();
}
```
### Dataset Relation Provider

Spark SQL data source provider for CDAP datasets, enabling SQL queries against CDAP datasets using the DataFrame API.

```scala { .api }
/**
 * Spark SQL data source provider for CDAP datasets
 * Enables SQL queries and DataFrame operations on CDAP datasets
 */
object DatasetRelationProvider extends RelationProvider with SchemaRelationProvider {
  /**
   * Gets the short name for this data source
   * @return "dataset" as the data source identifier
   */
  def shortName(): String

  /**
   * Creates a relation for the specified dataset
   * @param sqlContext Spark SQL context
   * @param parameters Data source parameters including dataset name and namespace
   * @return BaseRelation for querying the dataset
   */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation

  /**
   * Creates a relation with a specified schema
   * @param sqlContext Spark SQL context
   * @param parameters Data source parameters
   * @param schema Expected schema for the dataset
   * @return BaseRelation with the specified schema
   */
  def createRelation(sqlContext: SQLContext,
                     parameters: Map[String, String],
                     schema: StructType): BaseRelation
}
```
### Stream Relation Provider

Spark SQL data source provider for CDAP streams, enabling SQL queries against stream data using the DataFrame API.

```scala { .api }
/**
 * Spark SQL data source provider for CDAP streams
 * Enables SQL queries and DataFrame operations on CDAP streams
 */
object StreamRelationProvider extends RelationProvider with SchemaRelationProvider {
  /**
   * Gets the short name for this data source
   * @return "stream" as the data source identifier
   */
  def shortName(): String

  /**
   * Creates a relation for the specified stream
   * @param sqlContext Spark SQL context
   * @param parameters Data source parameters including stream name and time range
   * @return BaseRelation for querying the stream
   */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation

  /**
   * Creates a relation with a specified schema
   * @param sqlContext Spark SQL context
   * @param parameters Data source parameters
   * @param schema Expected schema for the stream data
   * @return BaseRelation with the specified schema
   */
  def createRelation(sqlContext: SQLContext,
                     parameters: Map[String, String],
                     schema: StructType): BaseRelation
}
```
### Stream Relation

Base relation implementation for CDAP streams that provides efficient scanning and querying capabilities.

```scala { .api }
/**
 * Stream-based relation for Spark SQL queries
 * Provides efficient access to CDAP stream data through DataFrame operations
 */
class StreamRelation(parameters: Map[String, String],
                     userSchema: Option[StructType])
                    (implicit sqlContext: SQLContext) extends BaseRelation with TableScan with PrunedScan {

  /**
   * Gets the schema for this relation
   * @return StructType defining the stream data schema
   */
  def schema: StructType

  /**
   * Builds a scan over the entire stream
   * @return RDD[Row] containing all stream events
   */
  def buildScan(): RDD[Row]

  /**
   * Builds a scan with column pruning
   * @param requiredColumns Array of column names to include in the scan
   * @return RDD[Row] containing only the required columns
   */
  def buildScan(requiredColumns: Array[String]): RDD[Row]

  /**
   * Gets the SQL context
   * @return SQLContext used for this relation
   */
  def sqlContext: SQLContext
}
```
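For illustration, a `StreamRelation` can also be constructed directly rather than through `StreamRelationProvider` and `format("stream")`. The following is a minimal sketch only, assuming the constructor is accessible from user code and a `SQLContext` named `sqlContext` is already in scope:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

// Sketch only: the relation is normally created by StreamRelationProvider;
// direct construction assumes the constructor is visible to user code
implicit val ctx: SQLContext = sqlContext
val relation = new StreamRelation(
  Map("stream.name" -> "my-stream", "stream.namespace" -> "default"),
  userSchema = None
)

// PrunedScan: read only the columns that are actually needed
val prunedRows: RDD[Row] = relation.buildScan(Array("timestamp", "body"))
```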
## Usage Examples

**Dataset Scanner Iterator Usage:**

```scala
import co.cask.cdap.app.runtime.spark.data.DatumScannerIterator
import co.cask.cdap.api.dataset.Dataset

import scala.collection.mutable.ArrayBuffer

// Create a scanner for a dataset
val dataset: Dataset = // ... obtain dataset instance
val scanner = dataset.scan()
val iterator = new DatumScannerIterator(scanner)

// Use the iterator to process data; buffer the records, because the
// iterator is single-pass and is closed afterwards
val records = ArrayBuffer.empty[Any]
try {
  while (iterator.hasNext) {
    val record = iterator.next()
    records += record
    println(s"Processing record: $record")
  }
} finally {
  iterator.close()
}

// Create an RDD from the buffered records (driver-side materialization,
// suitable only for data that fits in driver memory)
val rdd = sparkContext.parallelize(records.toVector)
```
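For data that does not fit on the driver, scanning can instead be performed per partition. This is a sketch only: `partitionKeys` and `openScanner` are hypothetical placeholders for an application-provided list of splits and a factory that opens a `Scanner` for each split on the executor.

```scala
// Hedged sketch: distribute scanning across executors with mapPartitions;
// partitionKeys and openScanner are hypothetical application-provided helpers
val recordsRDD = sparkContext
  .parallelize(partitionKeys, partitionKeys.size)
  .mapPartitions { keys =>
    keys.flatMap { key =>
      val iterator = new DatumScannerIterator(openScanner(key))
      // Materialize each split's records before closing the scanner
      try iterator.toVector finally iterator.close()
    }
  }
```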
**Serializable Stream Event Usage:**

```java
import co.cask.cdap.app.runtime.spark.SerializableStreamEvent;
import org.apache.spark.api.java.JavaRDD;

import java.util.Map;

// Process serializable stream events in Spark
JavaRDD<SerializableStreamEvent> streamRDD = // ... obtain from stream source
JavaRDD<String> processedRDD = streamRDD.map(event -> {
  // Access event metadata
  long timestamp = event.getTimestamp();
  Map<String, String> headers = event.getHeaders();

  // Process event body
  String body = event.getBodyAsString();
  return "Processed at " + timestamp + ": " + body;
});
```
**Dataset SQL Data Source Usage:**

```scala
import org.apache.spark.sql.SQLContext

// Create SQL context and bring in the implicits needed for the $"..." column syntax
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._

// Read from CDAP dataset using SQL
val datasetDF = sqlContext.read
  .format("dataset")
  .option("dataset.name", "my-dataset")
  .option("dataset.namespace", "default")
  .load()

// Query the dataset
datasetDF.createOrReplaceTempView("my_table")
val results = sqlContext.sql("SELECT * FROM my_table WHERE age > 21")
results.show()

// Use DataFrame API
val filteredDF = datasetDF.filter($"age" > 21).select($"name", $"age")
filteredDF.collect().foreach(println)
```
**Stream SQL Data Source Usage:**

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

// Define stream schema
val streamSchema = StructType(Seq(
  StructField("timestamp", LongType, nullable = false),
  StructField("headers", MapType(StringType, StringType), nullable = true),
  StructField("body", StringType, nullable = true)
))

// Read from CDAP stream
val streamDF = sqlContext.read
  .format("stream")
  .option("stream.name", "my-stream")
  .option("stream.namespace", "default")
  .option("stream.start.time", "2023-01-01T00:00:00Z")
  .option("stream.end.time", "2023-12-31T23:59:59Z")
  .schema(streamSchema)
  .load()

// Query stream data; the timestamp column is in milliseconds since epoch,
// while unix_timestamp() returns seconds, so convert before comparing
streamDF.createOrReplaceTempView("stream_events")
val recentEvents = sqlContext.sql(
  "SELECT * FROM stream_events WHERE timestamp > (unix_timestamp() - 3600) * 1000"
)
```
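The same DataFrame can also be aggregated with the DataFrame API. A minimal sketch, assuming the `streamDF` and schema defined above (the millisecond `timestamp` is divided by 1000 before conversion):

```scala
import org.apache.spark.sql.functions._

// Count events per hour; the millisecond timestamp is converted to seconds
// for from_unixtime (sketch based on the streamDF defined above)
val eventsPerHour = streamDF
  .withColumn("hour", from_unixtime(col("timestamp") / 1000, "yyyy-MM-dd HH"))
  .groupBy("hour")
  .count()
  .orderBy("hour")
eventsPerHour.show()
```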
## Types

```scala { .api }
/**
 * Interface for dataset scanners
 * @tparam T Type of data items being scanned
 */
trait Scanner[T] extends Closeable {
  /**
   * Checks if there are more items to scan
   * @return true if more items are available
   */
  def hasNext: Boolean

  /**
   * Gets the next item from the scanner
   * @return Next data item of type T
   */
  def next(): T

  /**
   * Closes the scanner and releases resources
   */
  def close(): Unit
}

/**
 * Base relation interface for Spark SQL data sources
 */
trait BaseRelation {
  /**
   * Gets the SQL context
   * @return SQLContext for this relation
   */
  def sqlContext: SQLContext

  /**
   * Gets the schema for this relation
   * @return StructType defining the data schema
   */
  def schema: StructType
}

/**
 * Interface for relations that support full table scans
 */
trait TableScan {
  /**
   * Builds a scan over the entire relation
   * @return RDD[Row] containing all data
   */
  def buildScan(): RDD[Row]
}

/**
 * Interface for relations that support column pruning
 */
trait PrunedScan {
  /**
   * Builds a scan with column pruning
   * @param requiredColumns Array of column names to include
   * @return RDD[Row] containing only the required columns
   */
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}
```
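A minimal sketch of implementing the `Scanner` trait for in-memory data, which can then be wrapped in a `DatumScannerIterator`. The `SeqScanner` helper is hypothetical and only illustrates the trait shape documented above, for example in tests:

```scala
// Hypothetical in-memory Scanner[T] matching the trait shape documented above
class SeqScanner[T](items: Seq[T]) extends Scanner[T] {
  private val underlying = items.iterator
  def hasNext: Boolean = underlying.hasNext
  def next(): T = underlying.next()
  def close(): Unit = () // nothing to release for in-memory data
}

// Wrap it in the iterator type used by the Spark runtime
val iterator = new DatumScannerIterator(new SeqScanner(Seq("a", "b", "c")))
```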
```java { .api }
/**
 * Interface for CDAP stream events
 */
public interface StreamEvent {
  /**
   * Gets the event timestamp
   * @return Timestamp in milliseconds since epoch
   */
  long getTimestamp();

  /**
   * Gets the event headers
   * @return Map of header key-value pairs
   */
  Map<String, String> getHeaders();

  /**
   * Gets the event body
   * @return ByteBuffer containing the event data
   */
  ByteBuffer getBody();
}

/**
 * Data source parameters for dataset access
 */
public class DatasetParameters {
  public static final String DATASET_NAME = "dataset.name";
  public static final String DATASET_NAMESPACE = "dataset.namespace";
  public static final String DATASET_ARGUMENTS = "dataset.arguments";
}

/**
 * Data source parameters for stream access
 */
public class StreamParameters {
  public static final String STREAM_NAME = "stream.name";
  public static final String STREAM_NAMESPACE = "stream.namespace";
  public static final String STREAM_START_TIME = "stream.start.time";
  public static final String STREAM_END_TIME = "stream.end.time";
  public static final String STREAM_BATCH_SIZE = "stream.batch.size";
}
```
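The parameter constants above can be referenced instead of hard-coding the option keys. A short sketch, assuming the classes are on the application classpath and a `SQLContext` named `sqlContext` is in scope:

```scala
// Sketch: use the parameter constants as option keys rather than string literals
// (assumes DatasetParameters is on the classpath and sqlContext is in scope)
val df = sqlContext.read
  .format("dataset")
  .option(DatasetParameters.DATASET_NAME, "my-dataset")
  .option(DatasetParameters.DATASET_NAMESPACE, "default")
  .load()
```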