# Data Processing and SQL Integration

Data processing utilities and Spark SQL data sources that enable efficient access to CDAP datasets and streams with full type safety, performance optimization, and seamless integration with Apache Spark's DataFrame and RDD APIs.

## Capabilities

### Dataset Scanner Iterator

Iterator implementation for efficiently scanning data from CDAP datasets within Spark applications, providing type-safe access to dataset records.

```scala { .api }
/**
 * Iterator for scanning data from CDAP datasets in Spark applications
 * Provides efficient, type-safe access to dataset records
 * @tparam T Type of data items being scanned
 */
class DatumScannerIterator[T](scanner: Scanner[T]) extends Iterator[T] with Closeable {

  /**
   * Checks if there are more items to scan
   * @return true if more items are available
   */
  def hasNext: Boolean

  /**
   * Gets the next item from the scanner
   * @return Next data item of type T
   * @throws NoSuchElementException if no more items available
   */
  def next(): T

  /**
   * Closes the underlying scanner and releases resources
   */
  def close(): Unit
}
```

### Serializable Stream Event

Serializable wrapper for CDAP stream events that can be efficiently processed in distributed Spark operations.

```java { .api }
/**
 * Serializable wrapper for stream events in Spark processing
 * Enables efficient distribution of stream data across Spark executors
 */
public class SerializableStreamEvent implements Serializable {

  /**
   * Gets the underlying stream event
   * @return StreamEvent containing the actual event data
   */
  public StreamEvent getStreamEvent();

  /**
   * Gets the timestamp of the stream event
   * @return Event timestamp in milliseconds since epoch
   */
  public long getTimestamp();

  /**
   * Gets the event headers
   * @return Map of header key-value pairs
   */
  public Map<String, String> getHeaders();

  /**
   * Gets the event body
   * @return ByteBuffer containing the event body data
   */
  public ByteBuffer getBody();

  /**
   * Gets the event body as a byte array
   * @return Byte array containing the event body
   */
  public byte[] getBodyBytes();

  /**
   * Gets the event body as a UTF-8 string
   * @return String representation of the event body
   */
  public String getBodyAsString();
}
```

### Dataset Relation Provider

Spark SQL data source provider for CDAP datasets, enabling SQL queries against CDAP datasets using the DataFrame API.

```scala { .api }
/**
 * Spark SQL data source provider for CDAP datasets
 * Enables SQL queries and DataFrame operations on CDAP datasets
 */
object DatasetRelationProvider extends RelationProvider with SchemaRelationProvider {

  /**
   * Gets the short name for this data source
   * @return "dataset" as the data source identifier
   */
  def shortName(): String

  /**
   * Creates a relation for the specified dataset
   * @param sqlContext Spark SQL context
   * @param parameters Data source parameters including dataset name and namespace
   * @return BaseRelation for querying the dataset
   */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation

  /**
   * Creates a relation with a specified schema
   * @param sqlContext Spark SQL context
   * @param parameters Data source parameters
   * @param schema Expected schema for the dataset
   * @return BaseRelation with the specified schema
   */
  def createRelation(sqlContext: SQLContext,
                     parameters: Map[String, String],
                     schema: StructType): BaseRelation
}
```

### Stream Relation Provider

Spark SQL data source provider for CDAP streams, enabling SQL queries against stream data using the DataFrame API.

```scala { .api }
/**
 * Spark SQL data source provider for CDAP streams
 * Enables SQL queries and DataFrame operations on CDAP streams
 */
object StreamRelationProvider extends RelationProvider with SchemaRelationProvider {

  /**
   * Gets the short name for this data source
   * @return "stream" as the data source identifier
   */
  def shortName(): String

  /**
   * Creates a relation for the specified stream
   * @param sqlContext Spark SQL context
   * @param parameters Data source parameters including stream name and time range
   * @return BaseRelation for querying the stream
   */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation

  /**
   * Creates a relation with a specified schema
   * @param sqlContext Spark SQL context
   * @param parameters Data source parameters
   * @param schema Expected schema for the stream data
   * @return BaseRelation with the specified schema
   */
  def createRelation(sqlContext: SQLContext,
                     parameters: Map[String, String],
                     schema: StructType): BaseRelation
}
```

### Stream Relation

Base relation implementation for CDAP streams that provides efficient scanning and querying capabilities.

```scala { .api }
/**
 * Stream-based relation for Spark SQL queries
 * Provides efficient access to CDAP stream data through DataFrame operations
 */
class StreamRelation(parameters: Map[String, String],
                     userSchema: Option[StructType])
                    (implicit sqlContext: SQLContext) extends BaseRelation with TableScan with PrunedScan {

  /**
   * Gets the schema for this relation
   * @return StructType defining the stream data schema
   */
  def schema: StructType

  /**
   * Builds a scan over the entire stream
   * @return RDD[Row] containing all stream events
   */
  def buildScan(): RDD[Row]

  /**
   * Builds a scan with column pruning
   * @param requiredColumns Array of column names to include in the scan
   * @return RDD[Row] containing only the required columns
   */
  def buildScan(requiredColumns: Array[String]): RDD[Row]

  /**
   * Gets the SQL context
   * @return SQLContext used for this relation
   */
  def sqlContext: SQLContext
}
```

## Usage Examples

**Dataset Scanner Iterator Usage:**

```scala
import co.cask.cdap.app.runtime.spark.data.DatumScannerIterator
import co.cask.cdap.api.dataset.Dataset

// Create a scanner for a dataset
val dataset: Dataset = // ... obtain dataset instance
val scanner = dataset.scan()
val iterator = new DatumScannerIterator(scanner)

// Use iterator to process data
try {
  while (iterator.hasNext) {
    val record = iterator.next()
    // Process record
    println(s"Processing record: $record")
  }
} finally {
  iterator.close()
}

// Use with Spark RDD: materialize the records locally before parallelizing,
// since the iterator is stateful, tied to an open scanner, and not serializable
val freshIterator = new DatumScannerIterator(dataset.scan())
val records = try freshIterator.toList finally freshIterator.close()
val recordsRDD = sparkContext.parallelize(records)
```
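
**Custom Scanner Usage (illustrative):**

The `Scanner[T]` trait listed in the Types section below can also be implemented directly and wrapped in a `DatumScannerIterator`. The sketch that follows is illustrative only: it relies solely on the `hasNext`/`next`/`close` signatures documented here, the `SeqScanner` helper is hypothetical, and the import path for `Scanner` is omitted because it is not shown in this document.

```scala
import co.cask.cdap.app.runtime.spark.data.DatumScannerIterator
// import for the Scanner trait omitted; its package is not documented here

// Hypothetical in-memory Scanner implementation, for illustration only
class SeqScanner[T](items: Seq[T]) extends Scanner[T] {
  private val underlying = items.iterator
  def hasNext: Boolean = underlying.hasNext
  def next(): T = underlying.next()
  def close(): Unit = () // nothing to release for an in-memory source
}

// Wrap the scanner so it can be consumed like any other Scala Iterator
val scannerIterator = new DatumScannerIterator(new SeqScanner(Seq("a", "b", "c")))
try {
  scannerIterator.foreach(println)
} finally {
  scannerIterator.close()
}
```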

**Serializable Stream Event Usage:**

```java
import co.cask.cdap.app.runtime.spark.SerializableStreamEvent;
import org.apache.spark.api.java.JavaRDD;
import java.util.Map;

// Process serializable stream events in Spark
JavaRDD<SerializableStreamEvent> streamRDD = // ... obtain from stream source
JavaRDD<String> processedRDD = streamRDD.map(event -> {
    // Access event metadata
    long timestamp = event.getTimestamp();
    Map<String, String> headers = event.getHeaders();

    // Process event body
    String body = event.getBodyAsString();
    return "Processed at " + timestamp + ": " + body;
});
```

**Dataset SQL Data Source Usage:**

```scala
import org.apache.spark.sql.SQLContext

// Create SQL context
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits._

// Read from CDAP dataset using SQL
val datasetDF = sqlContext.read
  .format("dataset")
  .option("dataset.name", "my-dataset")
  .option("dataset.namespace", "default")
  .load()

// Query the dataset
datasetDF.createOrReplaceTempView("my_table")
val results = sqlContext.sql("SELECT * FROM my_table WHERE age > 21")
results.show()

// Use DataFrame API (the $ column syntax requires the implicits import above)
val filteredDF = datasetDF.filter($"age" > 21).select($"name", $"age")
filteredDF.collect().foreach(println)
```

**Stream SQL Data Source Usage:**

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

// Define stream schema
val streamSchema = StructType(Seq(
  StructField("timestamp", LongType, nullable = false),
  StructField("headers", MapType(StringType, StringType), nullable = true),
  StructField("body", StringType, nullable = true)
))

// Read from CDAP stream
val streamDF = sqlContext.read
  .format("stream")
  .option("stream.name", "my-stream")
  .option("stream.namespace", "default")
  .option("stream.start.time", "2023-01-01T00:00:00Z")
  .option("stream.end.time", "2023-12-31T23:59:59Z")
  .schema(streamSchema)
  .load()

// Query stream data (event timestamps are in milliseconds, unix_timestamp() returns seconds)
streamDF.createOrReplaceTempView("stream_events")
val recentEvents = sqlContext.sql(
  "SELECT * FROM stream_events WHERE timestamp > (unix_timestamp() - 3600) * 1000"
)
```
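
**Stream Column Pruning (illustrative):**

Because the stream data source exposes an ordinary DataFrame, selecting only the columns a query needs lets `StreamRelation` answer it through its pruned scan (`buildScan(requiredColumns)`) rather than a full table scan. The sketch below is a minimal example of that pattern; it reuses the `streamDF` value from the previous example and assumes only the documented `timestamp` and `body` columns.

```scala
import org.apache.spark.sql.functions._

// Selecting a subset of columns allows the relation's pruned-scan path to be used
val prunedEvents = streamDF.select(col("timestamp"), col("body"))

// Example aggregation: count events per hour (timestamps are milliseconds since epoch)
val eventsPerHour = prunedEvents
  .withColumn("hour", from_unixtime(col("timestamp") / 1000, "yyyy-MM-dd HH"))
  .groupBy("hour")
  .count()

eventsPerHour.show()
```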

## Types

```scala { .api }
/**
 * Interface for dataset scanners
 * @tparam T Type of data items being scanned
 */
trait Scanner[T] extends Closeable {
  /**
   * Checks if there are more items to scan
   * @return true if more items are available
   */
  def hasNext: Boolean

  /**
   * Gets the next item from the scanner
   * @return Next data item of type T
   */
  def next(): T

  /**
   * Closes the scanner and releases resources
   */
  def close(): Unit
}

/**
 * Base relation interface for Spark SQL data sources
 */
trait BaseRelation {
  /**
   * Gets the SQL context
   * @return SQLContext for this relation
   */
  def sqlContext: SQLContext

  /**
   * Gets the schema for this relation
   * @return StructType defining the data schema
   */
  def schema: StructType
}

/**
 * Interface for relations that support full table scans
 */
trait TableScan {
  /**
   * Builds a scan over the entire relation
   * @return RDD[Row] containing all data
   */
  def buildScan(): RDD[Row]
}

/**
 * Interface for relations that support column pruning
 */
trait PrunedScan {
  /**
   * Builds a scan with column pruning
   * @param requiredColumns Array of column names to include
   * @return RDD[Row] containing only the required columns
   */
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}
```

```java { .api }
/**
 * Interface for CDAP stream events
 */
public interface StreamEvent {
  /**
   * Gets the event timestamp
   * @return Timestamp in milliseconds since epoch
   */
  long getTimestamp();

  /**
   * Gets the event headers
   * @return Map of header key-value pairs
   */
  Map<String, String> getHeaders();

  /**
   * Gets the event body
   * @return ByteBuffer containing the event data
   */
  ByteBuffer getBody();
}

/**
 * Data source parameters for dataset access
 */
public class DatasetParameters {
  public static final String DATASET_NAME = "dataset.name";
  public static final String DATASET_NAMESPACE = "dataset.namespace";
  public static final String DATASET_ARGUMENTS = "dataset.arguments";
}

/**
 * Data source parameters for stream access
 */
public class StreamParameters {
  public static final String STREAM_NAME = "stream.name";
  public static final String STREAM_NAMESPACE = "stream.namespace";
  public static final String STREAM_START_TIME = "stream.start.time";
  public static final String STREAM_END_TIME = "stream.end.time";
  public static final String STREAM_BATCH_SIZE = "stream.batch.size";
}
```
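
**Additional Data Source Parameters (illustrative):**

The parameter classes above list keys that the earlier examples do not exercise, such as `dataset.arguments` and `stream.batch.size`. The sketch below shows where such keys would be supplied on a read, reusing the `sqlContext` from the usage examples; the value formats shown (a JSON-encoded map for dataset arguments, an event count for the batch size) are assumptions rather than documented behavior.

```scala
// Illustrative only: the value formats for these options are assumptions,
// not documented behavior of the data sources
val tunedDatasetDF = sqlContext.read
  .format("dataset")
  .option("dataset.name", "my-dataset")
  .option("dataset.namespace", "default")
  .option("dataset.arguments", """{"ttl":"3600"}""") // assumed JSON-encoded map
  .load()

val batchedStreamDF = sqlContext.read
  .format("stream")
  .option("stream.name", "my-stream")
  .option("stream.batch.size", "1000") // assumed number of events per batch
  .load()
```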