# Streaming Operations

Streaming operations allow real-time processing of Kafka topics using Spark Structured Streaming. The connector supports micro-batch processing with exactly-once semantics, as well as an experimental continuous processing mode that provides at-least-once guarantees.

## Capabilities

### Stream Reading

Create a streaming DataFrame from Kafka topics for real-time data processing.

```scala { .api }
/**
 * Create a streaming DataFrame from Kafka topics
 * Returns a DataFrame with the fixed Kafka schema
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String) // Required: Kafka bootstrap servers
  .option("subscribe", topics: String)                // Topic subscription (comma-separated)
  .option("startingOffsets", offsets: String)         // Starting position: "earliest", "latest", or JSON
  .load(): DataFrame
```

**Usage Examples:**

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("KafkaStreaming")
  .getOrCreate()

// Basic streaming read
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events,logs,metrics")
  .option("startingOffsets", "latest")
  .load()

// With multiple brokers and Kafka client security options
val configuredStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
  .option("subscribe", "user-events")
  .option("startingOffsets", "earliest")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .load()
```

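For reference, the fixed schema mentioned above carries the message payload plus its Kafka metadata; a `headers` column is added only when `includeHeaders` is enabled (see Reliability Configuration below). The `decoded` variable here is just an illustrative name:

```scala
// Columns of the fixed Kafka source schema:
//   key: binary, value: binary, topic: string, partition: int,
//   offset: long, timestamp: timestamp, timestampType: int
kafkaStream.printSchema()

// Typical first step: cast the binary key/value columns to strings
val decoded = kafkaStream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```
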
### Pattern-Based Subscription

Subscribe to topics using regex patterns for dynamic topic discovery.

```scala { .api }
/**
 * Subscribe to topics matching a regex pattern
 * Automatically includes new topics that match the pattern
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("subscribePattern", pattern: String) // Regex pattern for topic names
  .load(): DataFrame
```

**Usage Examples:**

```scala
// Subscribe to all topics starting with "events-"
val patternStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "events-.*")
  .option("startingOffsets", "latest")
  .load()

// Subscribe to topics by environment
val env = "prod" // deployment environment prefix, e.g. "prod" or "dev"
val envStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", s"${env}-.*") // prod-.*, dev-.*, etc.
  .load()
```

### Partition Assignment

Directly assign specific Kafka partitions for fine-grained control.

```scala { .api }
/**
 * Assign specific Kafka partitions for reading
 * Provides exact control over which partitions to consume
 */
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers: String)
  .option("assign", partitionsJson: String) // JSON specification of TopicPartitions
  .load(): DataFrame
```

**Usage Examples:**

```scala
// Assign specific partitions
val assignedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1,2],"logs":[0]}""")
  .option("startingOffsets", "earliest")
  .load()

// Assign partitions with specific offsets
val offsetStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"events":[0,1],"logs":[0,1]}""")
  .option("startingOffsets", """{"events":{"0":100,"1":200},"logs":{"0":50,"1":75}}""")
  .load()
```

### Offset Management

Control exactly where streaming starts and how offsets are managed.

```scala { .api }
/**
 * Offset specification options for streaming reads
 */
// Starting from specific positions
.option("startingOffsets", "earliest")               // Start from earliest available
.option("startingOffsets", "latest")                 // Start from latest available
.option("startingOffsets", offsetJson)               // Start from specific offsets (JSON)

// Timestamp-based offset resolution
.option("startingTimestamp", timestamp: String)      // Global timestamp (ms since epoch)
.option("startingOffsetsByTimestamp", timestampJson) // Per-partition timestamps (JSON)
```

**Usage Examples:**

```scala
// Start from earliest available offsets
val earliestStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

// Start from specific timestamp
val timestampStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingTimestamp", "1640995200000") // Jan 1, 2022 UTC
  .load()

// Start from specific offsets per partition
val specificStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000,"1":2000,"2":1500}}""")
  .load()
```

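The per-partition timestamp option listed in the API block can be used the same way; a minimal sketch reusing `spark` from above (topic name and timestamps are illustrative):

```scala
// Resolve each partition's starting offset from a timestamp (ms since epoch)
val byTimestampStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .option("startingOffsetsByTimestamp",
    """{"events":{"0":1640995200000,"1":1640995200000}}""")
  .load()
```
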
### Performance Tuning

Configure streaming performance and resource usage.

```scala { .api }
/**
 * Performance tuning options for streaming operations
 */
.option("maxOffsetsPerTrigger", maxRecords: String) // Max records per micro-batch
.option("minOffsetsPerTrigger", minRecords: String) // Min records before triggering
.option("maxTriggerDelay", delay: String)           // Max delay before triggering (e.g., "30s")
```

**Usage Examples:**

```scala
// Control batch sizes
val tunedStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "high-volume-topic")
  .option("maxOffsetsPerTrigger", "10000") // At most 10K records per micro-batch
  .option("minOffsetsPerTrigger", "1000")  // Wait for at least 1K records before triggering
  .option("maxTriggerDelay", "30s")        // ...but trigger anyway after 30s
  .load()
```

### Reliability Configuration

Configure failure handling and data loss behavior.

```scala { .api }
/**
 * Reliability and failure handling options
 */
.option("failOnDataLoss", failBehavior: String)   // "true" or "false"
.option("groupIdPrefix", prefix: String)          // Consumer group ID prefix
.option("includeHeaders", includeHeaders: String) // "true" or "false"
```

**Usage Examples:**

```scala
// Configure reliability
val reliableStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "critical-events")
  .option("failOnDataLoss", "true")  // Fail if data is lost
  .option("groupIdPrefix", "my-app") // Custom consumer group prefix
  .option("includeHeaders", "true")  // Include message headers in schema
  .load()

// Handle potential data loss gracefully
val gracefulStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "logs")
  .option("failOnDataLoss", "false") // Continue on data loss
  .load()
```

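When `includeHeaders` is set to `true`, each row gains a `headers` column containing the Kafka record headers as an array of key/value structs. A short sketch of reading it, reusing `reliableStream` from above (`withHeaders` is an illustrative name):

```scala
import org.apache.spark.sql.functions._

// Project the payload together with the Kafka record headers
val withHeaders = reliableStream.select(
  col("value").cast("string").as("messageValue"),
  col("headers") // array<struct<key: string, value: binary>>
)
```
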
### Processing Modes

Choose between micro-batch and continuous processing modes.

```scala { .api }
/**
 * Stream processing with different execution modes
 * The trigger is set on the DataStreamWriter before start()
 */
// Micro-batch processing (default)
df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))

// Continuous processing (experimental)
df.writeStream.trigger(Trigger.Continuous("1 second"))
```

**Usage Examples:**

```scala
import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Micro-batch processing every 30 seconds
val microBatchQuery = stream
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Continuous processing (low-latency)
val continuousQuery = stream
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
```

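A micro-batch query can also process whatever data is available at start time and then stop, which is useful for backfills. A sketch reusing `stream` from above, assuming Spark 3.3+ where `Trigger.AvailableNow()` is available (older versions use `Trigger.Once()`):

```scala
// Process all currently available data, then shut the query down
val backfillQuery = stream
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()
```
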
## Data Processing Patterns

### Message Deserialization

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Extract and cast message values
val messages = kafkaStream
  .select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp"),
    col("key").cast("string").as("messageKey"),
    col("value").cast("string").as("messageValue")
  )

// Example schema for the JSON payload (field names are illustrative)
val schema = new StructType()
  .add("userId", StringType)
  .add("action", StringType)
  .add("eventTime", TimestampType)

// Parse JSON messages
val jsonMessages = kafkaStream
  .select(
    from_json(col("value").cast("string"), schema).as("data"),
    col("topic"),
    col("timestamp")
  )
  .select("data.*", "topic", "timestamp")
```

### Windowed Aggregations

```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._

// Windowed count by topic
val windowedCounts = kafkaStream
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("topic")
  )
  .count()
  .writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()
```

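The aggregation above keeps state for every window it has ever seen. Adding a watermark lets Spark discard state for windows that can no longer receive late data; a sketch of the same count with a watermark (the 15-minute lateness bound is illustrative):

```scala
import org.apache.spark.sql.functions._

// Tolerate events up to 15 minutes late, then finalize and drop the window state
val watermarkedCounts = kafkaStream
  .withWatermark("timestamp", "15 minutes")
  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("topic"))
  .count()
  .writeStream
  .outputMode("append") // append emits each window once, after the watermark passes it
  .format("console")
  .start()
```
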
### Stateful Processing

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import spark.implicits._

// Maintain a running record count per topic across micro-batches
val statefulStream = kafkaStream
  .selectExpr("topic").as[String]
  .groupByKey(identity)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
    (topic: String, batch: Iterator[String], state: GroupState[Long]) =>
      state.setTimeoutDuration("30 minutes") // expire idle per-topic state
      val count = state.getOption.getOrElse(0L) + batch.size
      state.update(count)
      (topic, count)
  }
```

## Error Handling

Common error scenarios and handling strategies:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Handle deserialization errors: keep only rows with a readable value
val safeMessages = kafkaStream
  .select(
    col("topic"),
    col("offset"),
    when(col("value").isNotNull,
      col("value").cast("string")).as("messageValue")
  )
  .filter(col("messageValue").isNotNull)

// Process each micro-batch with explicit error handling and checkpointing
kafkaStream.writeStream
  .option("checkpointLocation", "/path/to/checkpoint")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Custom batch processing with error handling
    try {
      batchDF.show()
    } catch {
      case ex: Exception =>
        println(s"Error processing batch $batchId: ${ex.getMessage}")
    }
  }
  .start()
```