# Streaming

Real-time data processing with Structured Streaming and DStreams for continuous data ingestion, transformation, and output to various sinks. Enables processing of live data streams with fault tolerance and exactly-once semantics.

## Capabilities

### Streaming Context

Main entry point for streaming applications using discretized streams (DStreams).

```python { .api }
class StreamingContext:
    """Main entry point for Spark Streaming functionality."""

    def __init__(self, sparkContext, batchDuration):
        """
        Create StreamingContext.

        Parameters:
        - sparkContext (SparkContext): Spark context
        - batchDuration: Batch duration for micro-batches
        """

    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
        """
        Create input stream from TCP socket.

        Parameters:
        - hostname (str): Hostname to connect to
        - port (int): Port to connect to
        - storageLevel (StorageLevel): Storage level for received data

        Returns:
        DStream of strings
        """

    def textFileStream(self, directory):
        """
        Create input stream that monitors directory for new files.

        Parameters:
        - directory (str): Directory to monitor

        Returns:
        DStream of strings
        """

    def queueStream(self, rdds, oneAtATime=True, default=None):
        """
        Create input stream from queue of RDDs.

        Parameters:
        - rdds: Queue of RDDs
        - oneAtATime (bool): Process one RDD at a time
        - default: Default RDD if queue is empty

        Returns:
        DStream
        """

    def start(self):
        """Start the streaming context."""

    def stop(self, stopSparkContext=True, stopGraceFully=False):
        """
        Stop the streaming context.

        Parameters:
        - stopSparkContext (bool): Whether to stop SparkContext
        - stopGraceFully (bool): Whether to stop gracefully
        """

    def awaitTermination(self, timeout=None):
        """Wait for the streaming context to terminate."""

    def checkpoint(self, directory):
        """
        Set checkpoint directory.

        Parameters:
        - directory (str): Checkpoint directory
        """

class DStream:
    """Discretized stream representing continuous stream of data."""

    def map(self, f):
        """
        Apply function to each element of the DStream.

        Parameters:
        - f: Function to apply

        Returns:
        New DStream
        """

    def filter(self, f):
        """
        Filter elements of the DStream.

        Parameters:
        - f: Filter function

        Returns:
        Filtered DStream
        """

    def flatMap(self, f):
        """
        Apply function and flatten results.

        Parameters:
        - f: Function returning iterable

        Returns:
        Flattened DStream
        """

    def union(self, other):
        """
        Union with another DStream.

        Parameters:
        - other (DStream): Another DStream

        Returns:
        Union DStream
        """

    def reduce(self, f):
        """
        Reduce elements using function.

        Parameters:
        - f: Reduce function

        Returns:
        DStream with reduced elements
        """

    def reduceByKey(self, func, numPartitions=None):
        """
        Reduce by key for paired DStream.

        Parameters:
        - func: Reduce function
        - numPartitions (int): Number of partitions

        Returns:
        DStream with reduced values per key
        """

    def groupByKey(self, numPartitions=None):
        """
        Group by key for paired DStream.

        Parameters:
        - numPartitions (int): Number of partitions

        Returns:
        DStream with grouped values per key
        """

    def countByValue(self):
        """Count occurrences of each element."""

    def foreachRDD(self, func):
        """
        Apply function to each RDD in the DStream.

        Parameters:
        - func: Function to apply to RDDs
        """

    def saveAsTextFiles(self, prefix, suffix=None):
        """
        Save DStream as text files.

        Parameters:
        - prefix (str): File prefix
        - suffix (str): File suffix
        """

    def window(self, windowDuration, slideDuration=None):
        """
        Create windowed DStream.

        Parameters:
        - windowDuration: Window duration
        - slideDuration: Slide duration

        Returns:
        Windowed DStream
        """

    def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """
        Reduce over sliding window.

        Parameters:
        - reduceFunc: Reduce function
        - invReduceFunc: Inverse reduce function
        - windowDuration: Window duration
        - slideDuration: Slide duration

        Returns:
        DStream with windowed reductions
        """

    def transform(self, func):
        """
        Transform each RDD using function.

        Parameters:
        - func: Transformation function

        Returns:
        Transformed DStream
        """

    def cache(self):
        """Cache the DStream."""

    def checkpoint(self, interval):
        """
        Enable checkpointing.

        Parameters:
        - interval: Checkpoint interval
        """

class StreamingListener:
    """Listener for streaming events."""

    def onBatchCompleted(self, batchCompleted):
        """Called when batch processing completes."""

    def onBatchStarted(self, batchStarted):
        """Called when batch processing starts."""

    def onOutputOperationCompleted(self, outputOperationCompleted):
        """Called when output operation completes."""

    def onOutputOperationStarted(self, outputOperationStarted):
        """Called when output operation starts."""

    def onReceiverError(self, receiverError):
        """Called when receiver encounters error."""

    def onReceiverStarted(self, receiverStarted):
        """Called when receiver starts."""

    def onReceiverStopped(self, receiverStopped):
        """Called when receiver stops."""
```
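
As a minimal usage sketch (not part of the API listing above), assuming a local two-core master and a text source on `localhost:9999` fed by something like `nc -lk 9999`; the checkpoint directory and file prefix are placeholders:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# One SparkContext per application; batchDuration is the micro-batch interval in seconds.
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/tmp/streaming-checkpoint")  # placeholder checkpoint directory

# Build a DStream from a TCP socket and count words in each 1-second batch.
lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))

# Output operations: inspect each batch RDD on the driver and persist batches as text files.
counts.foreachRDD(lambda rdd: print(rdd.take(10)))
counts.saveAsTextFiles("wordcounts")

ssc.start()              # start receiving and processing data
ssc.awaitTermination()   # block until stop() is called or an error occurs
```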

### Structured Streaming

High-level streaming API built on DataFrames for continuous processing.

```python { .api }
class DataStreamReader:
    """Interface for reading streaming data into DataFrames."""

    def format(self, source):
        """
        Specify data source format.

        Parameters:
        - source (str): Data source format

        Returns:
        DataStreamReader
        """

    def option(self, key, value):
        """
        Add input option.

        Parameters:
        - key (str): Option key
        - value: Option value

        Returns:
        DataStreamReader
        """

    def options(self, **options):
        """
        Add input options.

        Parameters:
        - options: Keyword options

        Returns:
        DataStreamReader
        """

    def schema(self, schema):
        """
        Specify input schema.

        Parameters:
        - schema: Schema definition

        Returns:
        DataStreamReader
        """

    def load(self, path=None, format=None, schema=None, **options):
        """
        Load streaming data.

        Parameters:
        - path (str): Input path
        - format (str): Data format
        - schema: Input schema
        - options: Additional options

        Returns:
        Streaming DataFrame
        """

    def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxFilesPerTrigger=None, latestFirst=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
        """Read CSV files as streaming DataFrame."""

    def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None, allowUnquotedControlChars=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read JSON files as streaming DataFrame."""

    def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read Parquet files as streaming DataFrame."""

    def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, maxFilesPerTrigger=None, latestFirst=None):
        """Read text files as streaming DataFrame."""

class DataStreamWriter:
    """Interface for writing streaming DataFrames."""

    def outputMode(self, outputMode):
        """
        Specify output mode.

        Parameters:
        - outputMode (str): Output mode ('append', 'complete', 'update')

        Returns:
        DataStreamWriter
        """

    def format(self, source):
        """
        Specify output format.

        Parameters:
        - source (str): Output format

        Returns:
        DataStreamWriter
        """

    def option(self, key, value):
        """
        Add output option.

        Parameters:
        - key (str): Option key
        - value: Option value

        Returns:
        DataStreamWriter
        """

    def options(self, **options):
        """
        Add output options.

        Parameters:
        - options: Keyword options

        Returns:
        DataStreamWriter
        """

    def partitionBy(self, *cols):
        """
        Partition output by columns.

        Parameters:
        - cols: Partition columns

        Returns:
        DataStreamWriter
        """

    def queryName(self, queryName):
        """
        Specify query name.

        Parameters:
        - queryName (str): Query name

        Returns:
        DataStreamWriter
        """

    def trigger(self, processingTime=None, once=None, continuous=None, availableNow=None):
        """
        Set trigger for stream processing.

        Parameters:
        - processingTime (str): Processing time interval
        - once (bool): Process once and stop
        - continuous (str): Continuous processing interval
        - availableNow (bool): Process available data and stop

        Returns:
        DataStreamWriter
        """

    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options):
        """
        Start streaming query.

        Parameters:
        - path (str): Output path
        - format (str): Output format
        - outputMode (str): Output mode
        - partitionBy: Partition columns
        - queryName (str): Query name
        - options: Additional options

        Returns:
        StreamingQuery
        """

    def foreach(self, f):
        """
        Apply function to each row.

        Parameters:
        - f: Function to apply

        Returns:
        DataStreamWriter
        """

    def foreachBatch(self, func):
        """
        Apply function to each micro-batch.

        Parameters:
        - func: Function to apply to batches

        Returns:
        DataStreamWriter
        """

class StreamingQuery:
    """Handle for a streaming query started by DataStreamWriter.start()."""

    @property
    def id(self):
        """Unique identifier of the query."""

    @property
    def name(self):
        """Name of the query."""

    @property
    def isActive(self):
        """Whether the query is active."""

    def stop(self):
        """Stop the execution of the query."""

    def awaitTermination(self, timeout=None):
        """
        Wait for termination of the query.

        Parameters:
        - timeout (int): Timeout in seconds
        """

    def processAllAvailable(self):
        """Block until all available data is processed."""

    @property
    def lastProgress(self):
        """Progress information of the last trigger."""

    @property
    def recentProgress(self):
        """Progress information of recent triggers."""

    @property
    def status(self):
        """Current status of the query."""

    def exception(self):
        """Exception that caused the query to stop."""
```
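
A hedged end-to-end sketch of the DataStreamReader/DataStreamWriter flow, assuming an existing SparkSession; the `/tmp/events` input directory, the event schema, and the query name are placeholders:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

# Hypothetical event schema; streaming file sources require an explicit schema.
schema = StructType([
    StructField("user", StringType()),
    StructField("action", StringType()),
    StructField("ts", TimestampType()),
])

# Read new JSON files as they appear in the directory.
events = (spark.readStream
               .format("json")
               .schema(schema)
               .option("maxFilesPerTrigger", 1)
               .load("/tmp/events"))

# Count actions per 10-minute event-time window.
counts = events.groupBy(window(col("ts"), "10 minutes"), col("action")).count()

# Write the full aggregation result to the console sink every 30 seconds.
query = (counts.writeStream
               .outputMode("complete")
               .format("console")
               .queryName("action_counts")
               .trigger(processingTime="30 seconds")
               .start())

query.awaitTermination()
```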

## Types

```python { .api }
class StreamingQueryException(Exception):
    """Exception thrown by streaming query."""
    pass

class StreamingQueryStatus:
    """Status of a streaming query."""
    pass

class StreamingQueryProgress:
    """Progress information of a streaming query."""
    pass
```
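
For illustration, a small sketch of how these types surface at runtime, assuming `query` is the StreamingQuery handle returned by `DataStreamWriter.start()` above; the import path `pyspark.sql.utils.StreamingQueryException` is an assumption and may differ across Spark versions:

```python
from pyspark.sql.utils import StreamingQueryException

try:
    # Wait up to 10 minutes; a query that stopped with an error re-raises it here.
    query.awaitTermination(timeout=600)
except StreamingQueryException:
    print("Query failed:", query.exception())

# Status and progress snapshots for monitoring a running query.
print(query.status)
print(query.lastProgress)
```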