
# Input Sources

Input sources in Spark Streaming ingest data from external systems. Each source creates an `InputDStream` that receives data continuously and turns it into a sequence of RDDs, one per batch interval, for downstream processing.

## Capabilities

### Socket-Based Input Streams

Input streams that connect to a TCP socket and receive data from it.

```scala { .api }
/**
 * Create a text input stream from a TCP socket. Received bytes are decoded
 * as UTF-8 and split into newline-delimited lines.
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
 * @returns ReceiverInputDStream of lines
 */
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]

/**
 * Create an input stream from a TCP socket, using a custom converter to turn
 * the socket's InputStream into records
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param converter - Function that converts the InputStream into an Iterator[T]
 * @param storageLevel - Storage level for received data (no default)
 * @returns ReceiverInputDStream of converted records of type T
 */
def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]

/**
 * Create an input stream from a TCP socket whose sender writes length-prefixed
 * blocks already serialized with Spark's configured serializer. The blocks are
 * pushed into the block manager without being deserialized.
 * @param hostname - Hostname to connect to
 * @param port - Port number to connect to
 * @param storageLevel - Storage level for received data (default: MEMORY_AND_DISK_SER_2)
 * @returns ReceiverInputDStream of records of type T
 */
def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
```

**Usage Examples:**

```scala
import java.io.InputStream
import org.apache.spark.storage.StorageLevel

// Basic text socket stream (one record per UTF-8 line)
val lines = ssc.socketTextStream("localhost", 9999)

// Custom converter for JSON parsing; parseJson stands in for your own parser
val jsonStream = ssc.socketStream(
  "data-server", 8080,
  (inputStream: InputStream) => {
    scala.io.Source.fromInputStream(inputStream, "UTF-8")
      .getLines()
      .map(parseJson)
  },
  StorageLevel.MEMORY_ONLY
)

// Raw socket for senders that write length-prefixed, Spark-serialized blocks
val binaryStream = ssc.rawSocketStream[Array[Byte]]("binary-server", 7777)
```
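The `converter` passed to `socketStream` only has to turn the socket's `InputStream` into an `Iterator[T]`, so it can be written and tested without Spark. The sketch below assumes a hypothetical `sensorId,reading` line format and a hypothetical `SensorReading` type; like `socketTextStream`, it decodes UTF-8 lines lazily, and it skips lines that do not parse.

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical record type for the example line format "sensorId,reading"
final case class SensorReading(sensorId: String, value: Double)

// Converter usable as ssc.socketStream(host, port, sensorConverter, level):
// reads UTF-8 lines lazily and keeps only the lines that parse
def sensorConverter(in: InputStream): Iterator[SensorReading] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator
    .continually(reader.readLine())
    .takeWhile(_ != null)                 // readLine() returns null at end of stream
    .flatMap { line =>
      line.split(",", 2) match {
        case Array(id, v) => Try(v.trim.toDouble).toOption.map(SensorReading(id.trim, _))
        case _            => None         // skip malformed lines
      }
    }
}
```

In a streaming job it would be wired in as `ssc.socketStream("data-server", 8080, sensorConverter, StorageLevel.MEMORY_AND_DISK_SER_2)`. Because the iterator is lazy, records are handed to Spark as they arrive rather than after the connection closes.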

### File System Input Streams

Input streams that monitor file systems for new files and process them as they arrive.

```scala { .api }
/**
 * Create an input stream that monitors a directory for new text files
 * @param directory - Directory path to monitor for new files
 * @returns DStream of strings (the lines of each new file)
 */
def textFileStream(directory: String): DStream[String]

/**
 * Create an input stream from new binary files made of fixed-length records
 * @param directory - Directory path to monitor for new files
 * @param recordLength - Length of each record in bytes
 * @returns DStream of byte arrays, one per record
 */
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]

/**
 * Create a generic file input stream that reads new files with a Hadoop
 * (new API) InputFormat
 * @param directory - Directory path to monitor for new files
 * @returns InputDStream of key-value pairs produced by input format F
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String
): InputDStream[(K, V)]

/**
 * Create a file input stream with a custom path filter
 * @param directory - Directory path to monitor
 * @param filter - Function that decides which paths to process
 * @param newFilesOnly - Whether to ignore files already present when the stream starts
 * @returns InputDStream of key-value pairs produced by input format F
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
): InputDStream[(K, V)]

/**
 * Same as above, with a Hadoop Configuration passed to the InputFormat
 * @param conf - Hadoop configuration
 */
def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean,
  conf: Configuration
): InputDStream[(K, V)]
```

**Usage Examples:**

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{SequenceFileInputFormat, TextInputFormat}

// Monitor a directory for new text files
val logFiles = ssc.textFileStream("/var/log/app")

// Process binary files made of 1024-byte records
val binaryData = ssc.binaryRecordsStream("/data/binary", 1024)

// Process Hadoop sequence files
val sequenceFiles = ssc.fileStream[Text, Text, SequenceFileInputFormat[Text, Text]]("/data/sequence")

// Process only .csv files; keys are byte offsets, values are lines
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/csv",
  (path: Path) => path.getName.endsWith(".csv"),
  newFilesOnly = true
)

// Hadoop reuses Writable objects, so convert to String before caching or collecting
val csvLines = csvFiles.map { case (_, line) => line.toString }
```
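`binaryRecordsStream` treats each file as a run of `recordLength`-byte records. The stdlib sketch below mimics that on an in-memory byte array using a hypothetical `splitRecords` helper. It assumes, in line with the per-record length check Spark performs, that a file whose size is not a multiple of `recordLength` is an error rather than something to pad or truncate. The `decode` function assumes a hypothetical 12-byte layout (an `Int` id followed by a `Double`) purely for illustration.

```scala
import java.nio.ByteBuffer

// Hypothetical helper: split a file's bytes into fixed-length records,
// rejecting input that would leave a partial record at the end
def splitRecords(bytes: Array[Byte], recordLength: Int): Vector[Array[Byte]] = {
  require(recordLength > 0, "recordLength must be positive")
  require(bytes.length % recordLength == 0,
    s"${bytes.length} bytes is not a multiple of recordLength $recordLength")
  bytes.grouped(recordLength).toVector
}

// Hypothetical 12-byte record layout: big-endian Int id, then a Double value
def decode(record: Array[Byte]): (Int, Double) = {
  val buf = ByteBuffer.wrap(record)
  (buf.getInt(), buf.getDouble())
}
```

With Spark, the same decoding would typically run as `binaryData.map(decode)` on the stream of records.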

### Queue-Based Input Streams

Input streams created from queues of RDDs, useful for testing and programmatic data injection.

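```scala { .api }
/**
 * Create an input stream from a queue of RDDs (scala.collection.mutable.Queue).
 * RDDs added to the queue after the stream is created are picked up.
 * queueStream does not support checkpointing.
 * @param queue - Queue of RDDs to process
 * @param oneAtATime - Take one RDD per batch (true) or all queued RDDs (false) (default: true)
 * @returns InputDStream of the queued RDDs' elements
 */
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true
): InputDStream[T]

/**
 * Same as above, but uses defaultRDD for batches in which the queue is empty
 * @param defaultRDD - RDD to use when the queue is empty
 */
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: RDD[T]
): InputDStream[T]
```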

**Usage Examples:**

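```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// Create the queue and the stream; RDDs added later are still picked up
val rddQueue = Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue) // oneAtATime = true: one RDD per batch
queueStream.count().print()

ssc.start()

// Add data to the queue while the stream runs; synchronize on the queue
// because the stream dequeues from it on another thread
for (_ <- 1 to 10) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.parallelize(1 to 100, 2)
  }
  Thread.sleep(1000)
}

// To drain every queued RDD in each batch instead, create the stream with
// ssc.queueStream(rddQueue, oneAtATime = false)
```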
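The difference between `oneAtATime = true` and `false` can be modelled without Spark. In the simplified sketch below, plain `Seq[Int]` values stand in for RDDs, and the hypothetical `nextBatch` function approximates what one batch receives: a single queued item, everything queued (Spark unions those RDDs), or the default (else an empty batch) when the queue is empty. It is a model of the behavior described above, not Spark's implementation.

```scala
import scala.collection.mutable

// Simplified model of one queueStream batch; Seq[Int] stands in for RDD[Int]
def nextBatch(
    queue: mutable.Queue[Seq[Int]],
    oneAtATime: Boolean,
    default: Option[Seq[Int]] = None
): Seq[Int] = queue.synchronized {
  if (queue.isEmpty) default.getOrElse(Seq.empty)   // empty batch unless a default is given
  else if (oneAtATime) queue.dequeue()              // one queued "RDD" per batch
  else {
    val all = queue.toList.flatten                  // union of everything queued
    queue.clear()
    all
  }
}
```

Note that with `oneAtATime = true`, a burst of queued RDDs is spread across several batches, which is usually what a test wants when it checks per-batch output.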

### Custom Receiver Input Streams

Input streams using custom receiver implementations for specialized data sources.

```scala { .api }
/**
 * Create an input stream from a custom receiver
 * @param receiver - Custom receiver implementation extending Receiver[T]
 * @returns ReceiverInputDStream backed by the receiver
 */
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]

/**
 * Union several streams, for example several receiver streams, into one.
 * ReceiverInputDStreams can be passed directly because they are DStreams.
 * @param streams - DStreams to union
 * @returns Single DStream containing data from all the input streams
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
```

**Receiver Base Class:**

```scala
// Outline of org.apache.spark.streaming.receiver.Receiver (selected members, bodies omitted)
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
  /**
   * Start the receiver. Must not block: start the threads that receive data here
   */
  def onStart(): Unit

  /**
   * Stop the receiver. Clean up threads and connections started in onStart()
   */
  def onStop(): Unit

  /**
   * Store a single received item
   * @param dataItem - Item to store
   */
  def store(dataItem: T): Unit

  /**
   * Store multiple items as one block; blocks until the data has been stored
   * @param dataIterator - Iterator of items to store
   */
  def store(dataIterator: Iterator[T]): Unit

  /**
   * Report an error to the driver without stopping the receiver
   * @param message - Error message
   * @param throwable - Exception that occurred
   */
  def reportError(message: String, throwable: Throwable): Unit

  /**
   * Restart the receiver asynchronously: onStop() is called, then onStart() after a delay
   */
  def restart(message: String, error: Throwable): Unit

  /**
   * Check whether the receiver has been stopped
   * @returns true if the receiver has been stopped
   */
  def isStopped(): Boolean
}
```

**Usage Examples:**

```scala
import scala.io.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Custom receiver that polls an HTTP endpoint once per second
class HttpReceiver(url: String, storageLevel: StorageLevel)
    extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // onStart() must return quickly, so polling runs on its own thread
    new Thread("Http Receiver") {
      override def run(): Unit = poll()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the polling thread exits once isStopped() is true
  }

  private def poll(): Unit = {
    try {
      while (!isStopped()) {
        val source = Source.fromURL(url)
        try store(source.mkString) finally source.close()
        Thread.sleep(1000)
      }
    } catch {
      // Let Spark stop and restart the receiver after a failure
      case e: Exception => restart(s"Error polling $url", e)
    }
  }
}

// Use custom receiver
val httpStream = ssc.receiverStream(
  new HttpReceiver("http://api.example.com/data", StorageLevel.MEMORY_ONLY))
```
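The receiver contract (a non-blocking `onStart()` that hands work to a thread, which loops until `isStopped()` returns true) can be exercised locally. The sketch below uses a hypothetical `MiniReceiver` stand-in, not Spark's `Receiver` class, whose `store` simply buffers items in memory, plus a hypothetical `IteratorReceiver` that drains an iterator on a background thread.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's Receiver contract, for local experiments only
abstract class MiniReceiver[T] {
  @volatile private var stopped = false
  private val buffer = ArrayBuffer.empty[T]

  def onStart(): Unit                       // must return quickly
  def onStop(): Unit                        // release resources started in onStart()

  def store(item: T): Unit = buffer.synchronized { buffer += item; () }
  def isStopped(): Boolean = stopped
  def stop(): Unit = { stopped = true; onStop() }
  def stored: List[T] = buffer.synchronized { buffer.toList }
}

// Receiver that drains an iterator on a background thread until stopped
class IteratorReceiver(source: Iterator[String]) extends MiniReceiver[String] {
  @volatile var worker: Thread = null

  def onStart(): Unit = {
    worker = new Thread(() => {
      while (!isStopped() && source.hasNext) store(source.next())
    })
    worker.setDaemon(true)
    worker.start()                          // return immediately, as Spark expects
  }

  def onStop(): Unit = ()                   // the loop exits once the stopped flag is set
}
```

The same shape, with `store` replaced by Spark's own and `stop` driven by the framework, is what the `HttpReceiver` above follows.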

### Union Operations for Input Streams

Combine multiple input streams into a single stream.

```scala { .api }
/**
 * Union multiple DStreams of the same type. All streams must have the same
 * slide duration.
 * @param streams - Sequence of DStreams to union
 * @returns Single DStream containing data from all input streams
 */
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
```

**Usage Examples:**

```scala
// Combine multiple socket streams
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)
val stream3 = ssc.socketTextStream("host3", 9999)

val combinedStream = ssc.union(Seq(stream1, stream2, stream3))

// Combine streams from different kinds of sources (both are DStream[String])
val fileLogs = ssc.textFileStream("/data/logs")
val socketLogs = ssc.socketTextStream("localhost", 9999)
val allLogs = ssc.union(Seq(fileLogs, socketLogs))
```
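Conceptually, `union` does not interleave records in time: for each batch interval it combines the RDDs that every input stream produced for that interval. The sketch below models each stream as a hypothetical sequence of per-batch record lists and unions them batch by batch; it illustrates the semantics and is not Spark code.

```scala
// Each stream is modelled as a sequence of batches; batch i of the result
// holds batch i of every input stream (the streams share one batch interval)
def unionBatches[T](streams: Seq[Seq[Seq[T]]]): Seq[Seq[T]] = {
  val numBatches = if (streams.isEmpty) 0 else streams.map(_.length).max
  (0 until numBatches).map { i =>
    streams.flatMap(s => s.lift(i).getOrElse(Seq.empty))
  }
}
```

A stream with nothing to report for an interval simply contributes an empty batch, so the union still emits that interval's data from the other streams.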

## Input Stream Types and Properties

### ReceiverInputDStream

Base class for input streams that run a receiver on the executors to receive data.

```scala { .api }
abstract class ReceiverInputDStream[T: ClassTag](ssc: StreamingContext)
    extends InputDStream[T](ssc) {

  /**
   * Get the receiver for this input stream
   * @returns Receiver instance used by this stream
   */
  def getReceiver(): Receiver[T]
}
```

### InputDStream

Base class for all input streams. Streams that generate their RDDs on the driver without a receiver, such as file and queue streams, extend it directly.

```scala { .api }
abstract class InputDStream[T: ClassTag](ssc: StreamingContext)
    extends DStream[T](ssc) {

  /**
   * Start the input stream (called automatically by StreamingContext)
   */
  def start(): Unit

  /**
   * Stop the input stream (called automatically by StreamingContext)
   */
  def stop(): Unit
}
```

### Storage Levels

Constants that control how received data is stored and replicated.

```scala { .api }
import org.apache.spark.storage.StorageLevel

// Common storage levels for input streams
StorageLevel.MEMORY_ONLY            // Deserialized, in memory only
StorageLevel.MEMORY_ONLY_2          // Deserialized, in memory only, replicated 2x
StorageLevel.MEMORY_AND_DISK        // In memory, spilling to disk when memory is full
StorageLevel.MEMORY_AND_DISK_2      // In memory with disk spill, replicated 2x
StorageLevel.MEMORY_AND_DISK_SER    // Serialized in memory, spilling to disk
StorageLevel.MEMORY_AND_DISK_SER_2  // Serialized with disk spill, replicated 2x (default for socket streams)
```

**Choosing Storage Levels:**

```scala
import org.apache.spark.storage.StorageLevel

// Fastest: no serialization or replication; blocks can be lost if an executor fails
val fastStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_ONLY)

// Fault tolerant: serialized and replicated to a second executor (the default)
val reliableStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

// Memory efficient: serialized with disk spill, but not replicated
val compactStream = ssc.socketTextStream("host", 9999, StorageLevel.MEMORY_AND_DISK_SER)
```
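Each constant is a combination of a few flags. The sketch below restates them as a hypothetical `Level` case class (Spark's `StorageLevel` exposes similar `useDisk`, `useMemory`, `deserialized` and `replication` properties) so the trade-offs can be compared side by side. The flag values follow the meanings listed above.

```scala
// Hypothetical restatement of the storage-level flags (not Spark's class)
final case class Level(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int) {
  def serialized: Boolean = !deserialized
  def replicated: Boolean = replication >= 2
}

val levels = Map(
  "MEMORY_ONLY"           -> Level(useDisk = false, useMemory = true, deserialized = true,  replication = 1),
  "MEMORY_ONLY_2"         -> Level(useDisk = false, useMemory = true, deserialized = true,  replication = 2),
  "MEMORY_AND_DISK"       -> Level(useDisk = true,  useMemory = true, deserialized = true,  replication = 1),
  "MEMORY_AND_DISK_2"     -> Level(useDisk = true,  useMemory = true, deserialized = true,  replication = 2),
  "MEMORY_AND_DISK_SER"   -> Level(useDisk = true,  useMemory = true, deserialized = false, replication = 1),
  "MEMORY_AND_DISK_SER_2" -> Level(useDisk = true,  useMemory = true, deserialized = false, replication = 2)
)

// Levels that are both compact (serialized) and replicated
val compactAndReplicated = levels.collect { case (name, l) if l.serialized && l.replicated => name }
```

Only `MEMORY_AND_DISK_SER_2` is both serialized and replicated, which is why it is the default for receiver-based socket streams.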

## Advanced Input Stream Configuration

### File Stream Monitoring Behavior

File streams monitor directories as follows:

- **New Files Only**: By default, only files whose modification time is later than the stream's start are processed; files already in the directory are ignored
- **File Atomicity**: Write files elsewhere and move them into the monitored directory atomically (for example, with a rename on the same file system); once moved, files must not be changed, because appended data is not read
- **File Formats**: Text files, fixed-length binary records, and any Hadoop InputFormat are supported
- **Nested Directories**: Subdirectories are not monitored recursively, although the directory path may be a glob pattern that matches several directories
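The atomicity and new-files-only rules above can be illustrated with the JDK's `java.nio.file` API. In the sketch below, a hypothetical `publish` function writes each file completely in a staging directory on the same file system and then moves it into the monitored directory with `ATOMIC_MOVE`, so a poller never sees a half-written file. The hypothetical `filesToProcess` function approximates the selection rule by keeping regular files modified after a threshold and skipping names that start with `.` (an assumption mirroring Spark's default filter). Spark's actual logic also tracks a remember window and already-processed files, which this sketch omits.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Producer side: write the complete file in a staging directory, then move it
// into the monitored directory in one atomic step
def publish(staging: Path, monitored: Path, name: String, content: String): Path = {
  val tmp = staging.resolve(name)
  Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
  Files.move(tmp, monitored.resolve(name), StandardCopyOption.ATOMIC_MOVE)
}

// Consumer side (simplified): regular, non-hidden files modified after the threshold
def filesToProcess(monitored: Path, thresholdMillis: Long): Seq[String] =
  Option(monitored.toFile.listFiles()).getOrElse(Array.empty[File]).toSeq
    .filter(f => f.isFile && !f.getName.startsWith("."))
    .filter(_.lastModified() > thresholdMillis)
    .map(_.getName)
    .sorted
```

`ATOMIC_MOVE` fails rather than silently copying when the source and target are on different file systems, which is a useful guard: on stores where rename is not atomic, the no-partial-files guarantee does not hold.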

### Receiver Reliability

Receivers can be reliable or unreliable:

- **Reliable Receivers**: Acknowledge the source only after the data has been stored (and replicated) in Spark, so a source that supports acknowledgment can replay unacknowledged data after a failure
- **Unreliable Receivers**: Do not acknowledge receipt; data buffered in the receiver may be lost on failure

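```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Reliable receiver pattern: store each block with the blocking multi-record
// store(...) and acknowledge the source only after it returns
class ReliableReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  def onStart(): Unit = {
    // Start a thread that reads a block, calls store(blockIterator), then acknowledges;
    // unacknowledged data is replayed from the last acknowledged offset
  }
  def onStop(): Unit = {}
}

// Unreliable receiver (simpler but less fault-tolerant)
class SimpleReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {
    // Start a thread that calls store(record) per record without acknowledgment;
    // data buffered in the receiver may be lost on failure
  }
  def onStop(): Unit = {}
}
```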
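The reliable pattern hinges on ordering: store first, acknowledge second. The stdlib sketch below models a hypothetical `ReplayableSource` that keeps records until they are acknowledged, and a hypothetical `receiveBlock` step that acknowledges only after `store` returns. If `store` throws, the offset does not advance and the same records are read again on the next attempt. In a real receiver, `store` would be the blocking multi-record `store(...)` call and the acknowledgment would go to the external system.

```scala
// Hypothetical replayable source: records remain readable until acknowledged
class ReplayableSource(records: Vector[String]) {
  private var acked = 0                               // first unacknowledged offset
  def read(max: Int): (Int, Vector[String]) = (acked, records.slice(acked, acked + max))
  def ack(upTo: Int): Unit = acked = math.max(acked, upTo)
  def ackedOffset: Int = acked
}

// One receive step: read a block, store it, and only then acknowledge it.
// Returns false once the source has nothing left to read.
def receiveBlock(src: ReplayableSource, store: Vector[String] => Unit, blockSize: Int): Boolean = {
  val (offset, block) = src.read(blockSize)
  if (block.isEmpty) false
  else {
    store(block)                                      // if this throws, nothing is acknowledged
    src.ack(offset + block.size)
    true
  }
}
```

The cost of this ordering is possible duplication: a block stored just before a crash, but not yet acknowledged, is delivered again, so downstream processing should tolerate repeats.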

### Backpressure and Rate Limiting

Limit the ingestion rate so that batches are processed as fast as data arrives:

```scala
import org.apache.spark.SparkConf

// Enable backpressure: Spark adjusts the receiving rate based on
// batch scheduling delays and processing times
val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  // Per-receiver rate (records/second) for the first batch, before any feedback exists
  .set("spark.streaming.backpressure.initialRate", "1000")

// Cap each receiver at 1000 records per second
val conf2 = new SparkConf()
  .set("spark.streaming.receiver.maxRate", "1000")
```
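Because `spark.streaming.receiver.maxRate` is a per-receiver limit in records per second, a batch can hold at most the rate times the batch duration times the number of receivers. The hypothetical helper below just does that arithmetic, which is handy when sizing the batch interval against how many records one batch can be processed in.

```scala
// Upper bound on records per batch under spark.streaming.receiver.maxRate
def maxRecordsPerBatch(maxRatePerReceiver: Long, batchSeconds: Long, numReceivers: Int): Long =
  maxRatePerReceiver * batchSeconds * numReceivers
```

For example, with `maxRate` set to 1000, a 5-second batch interval and three receivers, a batch holds at most 15,000 records.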

**Configuration Examples:**

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Production configuration for reliable file processing
val conf = new SparkConf()
  .setAppName("LogProcessor")
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
  // Rate control applies to receiver-based streams; it does not throttle textFileStream
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs://checkpoint")

val logStream = ssc.textFileStream("/var/log/app")
  .filter(_.contains("ERROR"))
  .cache()

logStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"hdfs://processed/${System.currentTimeMillis()}")
  }
}

ssc.start()
ssc.awaitTermination()
```