# Streaming Operations

Streaming operations allow real-time processing of Kafka topics using Spark Structured Streaming. The connector supports both micro-batch and continuous processing modes; micro-batch execution provides exactly-once guarantees when checkpointing is enabled and the sink is idempotent or transactional, while continuous processing offers at-least-once guarantees.
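As a point of orientation before the individual options are covered, a minimal end-to-end sketch follows; the bootstrap servers, topic name, and output/checkpoint paths are illustrative placeholders, and any fault-tolerant sink could stand in for the Parquet sink used here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Minimal read -> transform -> write pipeline (placeholder servers, topic, and paths)
val spark = SparkSession.builder().appName("KafkaQuickstart").getOrCreate()

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

val query = raw
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("path", "/tmp/kafka-events")                    // hypothetical output path
  .option("checkpointLocation", "/tmp/kafka-events-ckpt") // checkpointing underpins exactly-once
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

query.awaitTermination()
```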

## Capabilities

### Stream Reading

Create a streaming DataFrame from Kafka topics for real-time data processing.

```scala { .api }
/**
 * Create a streaming DataFrame from Kafka topics
 * Returns a DataFrame with the fixed Kafka schema
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String) // Required: Kafka bootstrap servers
  .option("subscribe", topics: String)                // Topic subscription (comma-separated)
  .option("startingOffsets", offsets: String)         // Starting position: "earliest", "latest", or JSON
  .load(): DataFrame
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KafkaStreaming")
  .getOrCreate()

// Basic streaming read
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")
  .option("startingOffsets", "latest")
  .load()

// With security configuration (SASL over SSL)
val configuredStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
  .option("subscribe", "user-events")
  .option("startingOffsets", "earliest")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .load()
```
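The fixed schema mentioned above can be inspected with `printSchema()`; the columns shown in the comments follow Spark's Kafka integration documentation (an additional `headers` column appears when `includeHeaders` is set to `"true"`).

```scala
kafkaStream.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
```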

### Pattern-Based Subscription

Subscribe to topics using regex patterns for dynamic topic discovery.

```scala { .api }
/**
 * Subscribe to topics matching a regex pattern
 * Automatically includes new topics that match the pattern
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribepattern", pattern: String) // Regex pattern for topic names
  .load(): DataFrame
```

**Usage Examples:**

```scala
// Subscribe to all topics starting with "events-"
val patternStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", "events-.*")
  .option("startingOffsets", "latest")
  .load()

// Subscribe to topics by environment
val envStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", s"${env}-.*") // prod-.*, dev-.*, etc.
  .load()
```

### Partition Assignment

Directly assign specific Kafka partitions for fine-grained control.

```scala { .api }
/**
 * Assign specific Kafka partitions for reading
 * Provides exact control over which partitions to consume
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("assign", partitionsJson: String) // JSON specification of TopicPartitions
  .load(): DataFrame
```

**Usage Examples:**

```scala
// Assign specific partitions
val assignedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1,2],"logs":[0]}""")
  .option("startingOffsets", "earliest")
  .load()

// Assign partitions with specific offsets
val offsetStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1],"logs":[0,1]}""")
  .option("startingOffsets", """{"events":{"0":100,"1":200},"logs":{"0":50,"1":75}}""")
  .load()
```

### Offset Management

Control exactly where streaming starts and how offsets are managed.

```scala { .api }
/**
 * Offset specification options for streaming reads
 */
// Starting from specific positions
.option("startingOffsets", "earliest")  // Start from earliest available
.option("startingOffsets", "latest")    // Start from latest available
.option("startingOffsets", offsetJson)  // Start from specific offsets (JSON)

// Timestamp-based offset resolution
.option("startingTimestamp", timestamp: String)       // Global timestamp (ms since epoch)
.option("startingOffsetsByTimestamp", timestampJson)  // Per-partition timestamps (JSON)
```

**Usage Examples:**

```scala
// Start from earliest available offsets
val earliestStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

// Start from specific timestamp
val timestampStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1640995200000") // Jan 1, 2022 UTC
  .load()

// Start from specific offsets per partition
val specificStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":1500}}""")
  .load()
```
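The per-partition timestamp option listed in the API block above takes the same JSON shape as the offset map, with millisecond timestamps as values; a small sketch with illustrative partitions and timestamps:

```scala
// Resolve starting offsets from per-partition timestamps (ms since epoch)
val byTimestampStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp",
    """{"events":{"0":1640995200000,"1":1640995200000}}""")
  .load()
```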

### Performance Tuning

Configure streaming performance and resource usage.

```scala { .api }
/**
 * Performance tuning options for streaming operations
 */
.option("maxOffsetsPerTrigger", maxRecords: String) // Max records per micro-batch
.option("minOffsetsPerTrigger", minRecords: String) // Min records before triggering
.option("maxTriggerDelay", delay: String)           // Max delay before triggering (e.g., "30s")
```

**Usage Examples:**

```scala
// Control batch sizes
val tunedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "10000") // Max 10K records per batch
  .option("minOffsetsPerTrigger", "1000")  // Min 1K records before processing
  .option("maxTriggerDelay", "30s")        // Trigger after at most 30s even if the minimum is not reached
  .load()
```

### Reliability Configuration

Configure failure handling and data loss behavior.

```scala { .api }
/**
 * Reliability and failure handling options
 */
.option("failOnDataLoss", failBehavior: String)   // "true" or "false"
.option("groupIdPrefix", prefix: String)          // Consumer group ID prefix
.option("includeHeaders", includeHeaders: String) // "true" or "false"
```

**Usage Examples:**

```scala
// Configure reliability
val reliableStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true")  // Fail if data is lost
  .option("groupIdPrefix", "my-app") // Custom consumer group prefix
  .option("includeHeaders", "true")  // Include message headers in schema
  .load()

// Handle potential data loss gracefully
val gracefulStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false") // Continue on data loss
  .load()
```

### Processing Modes

Choose between micro-batch and continuous processing modes.

```scala { .api }
/**
 * Stream processing with different execution modes
 */
// Micro-batch processing (default)
query.trigger(Trigger.ProcessingTime("10 seconds"))

// Continuous processing (experimental)
query.trigger(Trigger.Continuous("1 second"))
```

**Usage Examples:**

```scala
import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Micro-batch processing every 30 seconds
val microBatchQuery = stream
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Continuous processing (low-latency; supports only map-like operations such as select and filter)
val continuousQuery = stream
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```

## Data Processing Patterns

### Message Deserialization

```scala
import org.apache.spark.sql.functions._

// Extract and cast message values
val messages = kafkaStream
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    col("key").cast("string").as("messageKey"),
    col("value").cast("string").as("messageValue")
  )

// Parse JSON messages (schema is a StructType describing the JSON payload)
val jsonMessages = kafkaStream
  .select(
    from_json(col("value").cast("string"), schema).as("data"),
    col("topic"),
    col("timestamp")
  )
  .select("data.*", "topic", "timestamp")
```
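The `schema` value used with `from_json` above is assumed to be defined elsewhere; a minimal sketch for a hypothetical event payload (field names are purely illustrative):

```scala
import org.apache.spark.sql.types._

// Hypothetical JSON payload schema for the from_json example above
val schema = new StructType()
  .add("userId", StringType)
  .add("action", StringType)
  .add("eventTime", TimestampType)
```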

### Windowed Aggregations

```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._

// Windowed count by topic
val windowedCounts = kafkaStream
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("topic")
  )
  .count()
  .writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```
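Note that without a watermark the state for old windows is retained indefinitely; a sketch of the same aggregation with a watermark to bound state (the 10-minute lateness threshold is an arbitrary choice):

```scala
// Drop events arriving more than 10 minutes late and let Spark
// discard state for windows older than the watermark
val watermarkedCounts = kafkaStream
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("topic")
  )
  .count()
```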

### Stateful Processing

```scala
import org.apache.spark.sql.streaming.GroupStateTimeout
import spark.implicits._

// Maintain state across micro-batches
// updateFunction has the shape (key, values, state) => output; see the sketch below
val statefulStream = kafkaStream
  .groupByKey(row => row.getAs[String]("topic"))
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(
    updateFunction
  )
```
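A fuller sketch of what `updateFunction` might look like, assuming the goal is a running record count per topic (the state type, output tuple, and timeout are illustrative choices rather than part of the connector):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

// Running record count per topic, carried in GroupState across micro-batches
def updateFunction(topic: String, rows: Iterator[Row], state: GroupState[Long]): (String, Long) = {
  val updated = state.getOption.getOrElse(0L) + rows.size
  state.update(updated)
  state.setTimeoutDuration("1 hour") // arm the processing-time timeout for this group
  (topic, updated)
}

val topicCounts = kafkaStream
  .groupByKey(row => row.getAs[String]("topic"))
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(updateFunction)

topicCounts.writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .option("checkpointLocation", "/tmp/stateful-ckpt") // hypothetical checkpoint path
  .start()
```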

## Error Handling

Common error scenarios and handling strategies:

```scala
import org.apache.spark.sql.DataFrame

// Handle deserialization errors
val safeMessages = kafkaStream
  .select(
    col("topic"),
    col("offset"),
    when(col("value").isNotNull,
      col("value").cast("string")).as("messageValue")
  )
  .filter(col("messageValue").isNotNull)

// Process each micro-batch with explicit error handling (checkpointed)
kafkaStream.writeStream
  .option("checkpointLocation", "/path/to/checkpoint")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Custom batch processing with error handling
    try {
      batchDF.show()
    } catch {
      case ex: Exception =>
        println(s"Error processing batch $batchId: ${ex.getMessage}")
    }
  }
  .start()
```
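Failures that escape batch-level handling surface when the query terminates; a sketch of catching them at the query level and inspecting the last reported progress (how to react, e.g. alerting or restarting, is left to the application):

```scala
import org.apache.spark.sql.streaming.StreamingQueryException

val query = kafkaStream.writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()

try {
  query.awaitTermination()
} catch {
  case ex: StreamingQueryException =>
    println(s"Streaming query failed: ${ex.getMessage}")
    // lastProgress may be null if no batch has completed yet
    Option(query.lastProgress).foreach(p => println(p.prettyJson))
}
```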