# Streaming

Real-time data processing with Structured Streaming and DStreams for continuous data ingestion, processing, and output to a variety of sinks. Both APIs process live data streams with fault tolerance; Structured Streaming additionally offers end-to-end exactly-once semantics when used with replayable sources and idempotent sinks. DStreams are the original micro-batch API; Structured Streaming is the newer DataFrame-based API.

## Capabilities

### Streaming Context

Main entry point for streaming applications using discretized streams (DStreams). A usage sketch follows the API listing.
```python { .api }
class StreamingContext:
    """Main entry point for Spark Streaming functionality."""

    def __init__(self, sparkContext, batchDuration):
        """
        Create a StreamingContext.

        Parameters:
        - sparkContext (SparkContext): Spark context to use
        - batchDuration (int): Batch duration in seconds for micro-batches
        """

    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
        """
        Create an input stream from a TCP socket.

        Parameters:
        - hostname (str): Hostname to connect to
        - port (int): Port to connect to
        - storageLevel (StorageLevel): Storage level for received data

        Returns:
        DStream of strings
        """

    def textFileStream(self, directory):
        """
        Create an input stream that monitors a directory for new files.

        Parameters:
        - directory (str): Directory to monitor

        Returns:
        DStream of strings
        """

    def queueStream(self, rdds, oneAtATime=True, default=None):
        """
        Create an input stream from a queue of RDDs.

        Parameters:
        - rdds: Queue of RDDs
        - oneAtATime (bool): Process one RDD at a time
        - default: Default RDD if the queue is empty

        Returns:
        DStream
        """

    def start(self):
        """Start the streaming context."""

    def stop(self, stopSparkContext=True, stopGraceFully=False):
        """
        Stop the streaming context.

        Parameters:
        - stopSparkContext (bool): Whether to also stop the SparkContext
        - stopGraceFully (bool): Whether to wait for all received data to be processed
        """

    def awaitTermination(self, timeout=None):
        """Wait for the streaming context to terminate."""

    def checkpoint(self, directory):
        """
        Set the checkpoint directory.

        Parameters:
        - directory (str): HDFS-compatible directory for checkpoint data
        """

class DStream:
    """Discretized stream: a continuous sequence of RDDs."""

    def map(self, f):
        """
        Apply a function to each element of the DStream.

        Parameters:
        - f: Function to apply

        Returns:
        New DStream
        """

    def filter(self, f):
        """
        Keep only elements for which the predicate returns true.

        Parameters:
        - f: Filter predicate

        Returns:
        Filtered DStream
        """

    def flatMap(self, f):
        """
        Apply a function to each element and flatten the results.

        Parameters:
        - f: Function returning an iterable

        Returns:
        Flattened DStream
        """

    def union(self, other):
        """
        Union with another DStream.

        Parameters:
        - other (DStream): Another DStream

        Returns:
        Union DStream
        """

    def reduce(self, f):
        """
        Reduce the elements of each RDD using a function.

        Parameters:
        - f: Associative and commutative reduce function

        Returns:
        DStream with the reduced value of each RDD
        """

    def reduceByKey(self, func, numPartitions=None):
        """
        Reduce values by key for a paired DStream.

        Parameters:
        - func: Associative and commutative reduce function
        - numPartitions (int): Number of partitions

        Returns:
        DStream with reduced values per key
        """

    def groupByKey(self, numPartitions=None):
        """
        Group values by key for a paired DStream.

        Parameters:
        - numPartitions (int): Number of partitions

        Returns:
        DStream with grouped values per key
        """

    def countByValue(self):
        """Count occurrences of each distinct element in each RDD."""

    def foreachRDD(self, func):
        """
        Apply a function to each RDD in the DStream.

        Parameters:
        - func: Function applied to each RDD
        """

    def saveAsTextFiles(self, prefix, suffix=None):
        """
        Save each RDD in the DStream as text files.

        Parameters:
        - prefix (str): File prefix
        - suffix (str): File suffix
        """

    def window(self, windowDuration, slideDuration=None):
        """
        Create a windowed DStream.

        Parameters:
        - windowDuration: Window duration in seconds
        - slideDuration: Slide duration in seconds

        Returns:
        Windowed DStream
        """

    def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """
        Reduce over a sliding window.

        Parameters:
        - reduceFunc: Reduce function
        - invReduceFunc: Inverse reduce function for incremental window updates
        - windowDuration: Window duration in seconds
        - slideDuration: Slide duration in seconds

        Returns:
        DStream with windowed reductions
        """

    def transform(self, func):
        """
        Transform each RDD using an arbitrary RDD-to-RDD function.

        Parameters:
        - func: Transformation function

        Returns:
        Transformed DStream
        """

    def cache(self):
        """Persist the RDDs of this DStream in memory."""

    def checkpoint(self, interval):
        """
        Enable periodic checkpointing of the RDDs of this DStream.

        Parameters:
        - interval: Checkpoint interval in seconds
        """

class StreamingListener:
    """Listener for streaming lifecycle events."""

    def onBatchCompleted(self, batchCompleted):
        """Called when batch processing completes."""

    def onBatchStarted(self, batchStarted):
        """Called when batch processing starts."""

    def onOutputOperationCompleted(self, outputOperationCompleted):
        """Called when an output operation completes."""

    def onOutputOperationStarted(self, outputOperationStarted):
        """Called when an output operation starts."""

    def onReceiverError(self, receiverError):
        """Called when a receiver reports an error."""

    def onReceiverStarted(self, receiverStarted):
        """Called when a receiver starts."""

    def onReceiverStopped(self, receiverStopped):
        """Called when a receiver stops."""
```
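
For orientation, a minimal DStream sketch: a socket word count. The hostname, port, and the `nc -lk 9999` test server are illustrative assumptions; `pprint()` is the standard PySpark DStream helper for printing batches to stdout.

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# At least two local threads: one for the socket receiver, one for processing.
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1-second micro-batches

# Assumes a text server on localhost:9999 (e.g. started with `nc -lk 9999`).
lines = ssc.socketTextStream("localhost", 9999)

# Word count over each micro-batch.
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()  # Print the first elements of each batch

ssc.start()             # Start receiving and processing
ssc.awaitTermination()  # Block until stop() is called or an error occurs
```

Note that windowed operations using an inverse reduce function (such as `reduceByWindow`) additionally require a checkpoint directory to be set via `ssc.checkpoint(...)`.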
### Structured Streaming

High-level streaming API built on DataFrames that treats a live stream as an unbounded, continuously appended table. A usage sketch follows the API listing.
```python { .api }
class DataStreamReader:
    """Interface for reading streaming data into DataFrames."""

    def format(self, source):
        """
        Specify the data source format.

        Parameters:
        - source (str): Data source format

        Returns:
        DataStreamReader
        """

    def option(self, key, value):
        """
        Add an input option.

        Parameters:
        - key (str): Option key
        - value: Option value

        Returns:
        DataStreamReader
        """

    def options(self, **options):
        """
        Add multiple input options.

        Parameters:
        - options: Keyword options

        Returns:
        DataStreamReader
        """

    def schema(self, schema):
        """
        Specify the input schema.

        Parameters:
        - schema: Schema definition

        Returns:
        DataStreamReader
        """

    def load(self, path=None, format=None, schema=None, **options):
        """
        Load streaming data.

        Parameters:
        - path (str): Input path
        - format (str): Data format
        - schema: Input schema
        - options: Additional options

        Returns:
        Streaming DataFrame
        """

    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxFilesPerTrigger=None, latestFirst=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
        """Read CSV files as a streaming DataFrame."""

    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None, allowUnquotedControlChars=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read JSON files as a streaming DataFrame."""

    def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read Parquet files as a streaming DataFrame."""

    def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read text files as a streaming DataFrame."""

class DataStreamWriter:
    """Interface for writing streaming DataFrames."""

    def outputMode(self, outputMode):
        """
        Specify the output mode.

        Parameters:
        - outputMode (str): One of 'append', 'complete', or 'update'

        Returns:
        DataStreamWriter
        """

    def format(self, source):
        """
        Specify the output format.

        Parameters:
        - source (str): Output format

        Returns:
        DataStreamWriter
        """

    def option(self, key, value):
        """
        Add an output option.

        Parameters:
        - key (str): Option key
        - value: Option value

        Returns:
        DataStreamWriter
        """

    def options(self, **options):
        """
        Add multiple output options.

        Parameters:
        - options: Keyword options

        Returns:
        DataStreamWriter
        """

    def partitionBy(self, *cols):
        """
        Partition the output by the given columns.

        Parameters:
        - cols: Partition columns

        Returns:
        DataStreamWriter
        """

    def queryName(self, queryName):
        """
        Specify a name for the query.

        Parameters:
        - queryName (str): Query name

        Returns:
        DataStreamWriter
        """

    def trigger(self, processingTime=None, once=None, continuous=None, availableNow=None):
        """
        Set the trigger for stream processing.

        Parameters:
        - processingTime (str): Micro-batch interval, e.g. '5 seconds'
        - once (bool): Process one batch and stop
        - continuous (str): Continuous processing checkpoint interval
        - availableNow (bool): Process all available data and stop

        Returns:
        DataStreamWriter
        """

    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options):
        """
        Start the streaming query.

        Parameters:
        - path (str): Output path
        - format (str): Output format
        - outputMode (str): Output mode
        - partitionBy: Partition columns
        - queryName (str): Query name
        - options: Additional options

        Returns:
        StreamingQuery
        """

    def foreach(self, f):
        """
        Set a function or writer object to apply to each row of the output.

        Parameters:
        - f: Function or writer object to apply

        Returns:
        DataStreamWriter
        """

    def foreachBatch(self, func):
        """
        Set a function to apply to each micro-batch of the output.

        Parameters:
        - func: Function of (batch DataFrame, batch id)

        Returns:
        DataStreamWriter
        """

class StreamingQuery:
    """Handle to a running streaming query, returned by DataStreamWriter.start()."""

    @property
    def id(self):
        """Unique identifier of the query."""

    @property
    def name(self):
        """Name of the query."""

    @property
    def isActive(self):
        """Whether the query is actively running."""

    def stop(self):
        """Stop the execution of the query."""

    def awaitTermination(self, timeout=None):
        """
        Wait for termination of the query.

        Parameters:
        - timeout (int): Timeout in seconds; if set, returns whether the query terminated
        """

    def processAllAvailable(self):
        """Block until all available data has been processed."""

    @property
    def lastProgress(self):
        """Progress information of the most recent trigger."""

    @property
    def recentProgress(self):
        """Progress information of recent triggers."""

    @property
    def status(self):
        """Current status of the query."""

    def exception(self):
        """The exception that caused the query to stop, if any."""
```
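
A minimal Structured Streaming sketch, using the built-in `rate` test source and the `console` sink so it runs without external infrastructure; the bucket column, rate, and trigger interval are illustrative choices.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RateCounts").getOrCreate()

# The built-in 'rate' source emits (timestamp, value) rows for testing.
stream = (spark.readStream
               .format("rate")
               .option("rowsPerSecond", 10)
               .load())

# Running count per bucket; 'complete' mode re-emits the whole result table.
counts = stream.groupBy((stream.value % 10).alias("bucket")).count()

query = (counts.writeStream
               .outputMode("complete")
               .format("console")
               .trigger(processingTime="5 seconds")
               .queryName("rate_counts")
               .start())

query.awaitTermination()
```

The console sink is for debugging only; durable sinks such as files or Kafka also require a checkpoint location, set with `.option("checkpointLocation", ...)` before `start()`.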

## Types

```python { .api }
class StreamingQueryException(Exception):
    """Exception thrown when a streaming query fails."""
    pass

class StreamingQueryStatus:
    """Status of a streaming query."""
    pass

class StreamingQueryProgress:
    """Progress information for a single trigger of a streaming query."""
    pass
```
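
A sketch of how these types surface in practice, assuming `query` is a handle returned by `DataStreamWriter.start()` (such as `rate_counts` above). In PySpark, `status` and `lastProgress` are exposed as plain dicts rather than these wrapper classes, and `StreamingQueryException` is importable from `pyspark.sql.utils` (newer versions also expose it in `pyspark.errors`).

```python
from pyspark.sql.utils import StreamingQueryException

try:
    # awaitTermination(timeout) returns False on timeout, True once terminated,
    # and raises StreamingQueryException if the query stopped with an error.
    while not query.awaitTermination(timeout=10):
        print(query.status)            # e.g. {'message': ..., 'isDataAvailable': ..., 'isTriggerActive': ...}
        progress = query.lastProgress  # most recent trigger's progress, or None
        if progress:
            print("batch", progress["batchId"], "input rows", progress["numInputRows"])
except StreamingQueryException as exc:
    print("Query failed:", exc)
finally:
    query.stop()
```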