or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-transformations.mdcore-streaming.mdindex.mdinput-sources.mdjava-api.md

input-sources.mddocs/

0

# Input Sources and Data Ingestion

1

2

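Data ingestion for Spark Streaming from files, TCP sockets, queues of RDDs, and custom receivers. Receiver-based sources (sockets and custom receivers) return a `ReceiverInputDStream` and run a long-lived `Receiver` to collect data. Receiver-less sources (files and queues) return a plain `InputDStream` and build each batch directly, without a receiver.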

3

4

## Capabilities

5

6

### File-based Input Streams

7

8

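Monitor a directory for new files and process their contents as streaming data. Files should appear in the directory atomically, so that a partially written file is never picked up. For example, write them elsewhere and then move or rename them into the monitored directory.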

9

10

```scala { .api }

/**
 * File-based input streams. These are receiver-less: they return an
 * InputDStream rather than a ReceiverInputDStream.
 *
 * Types: `Path` is `org.apache.hadoop.fs.Path`, `Configuration` is
 * `org.apache.hadoop.conf.Configuration`, and `NewInputFormat` is the new-API
 * `org.apache.hadoop.mapreduce.InputFormat` (not `org.apache.hadoop.mapred`).
 * Type parameters marked `: ClassTag` need an implicit ClassTag in scope; the
 * compiler supplies one automatically for concrete types.
 */
class StreamingContext {
  /** Monitor `directory` for new files and read them as text, one element per line. */
  def textFileStream(directory: String): DStream[String]

  /** Monitor `directory` for new files, reading them with the Hadoop InputFormat `F`. */
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
      directory: String
  ): InputDStream[(K, V)]

  /**
   * Monitor `directory` with a custom key/value/format and a path filter.
   *
   * @param filter       only paths for which this returns true are processed
   * @param newFilesOnly if true, ignore files already present in the directory
   */
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
      directory: String,
      filter: Path => Boolean,
      newFilesOnly: Boolean
  ): InputDStream[(K, V)]

  /** As above, additionally passing a Hadoop `Configuration` to the InputFormat. */
  def fileStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
      directory: String,
      filter: Path => Boolean,
      newFilesOnly: Boolean,
      conf: Configuration
  ): InputDStream[(K, V)]

  /**
   * Monitor `directory` for binary files made of fixed-length records.
   * Each element is one record of `recordLength` bytes.
   */
  def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
}

```

37

38

**Usage Examples:**

39

40

```scala

import org.apache.spark.streaming._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
// fileStream requires the new-API (mapreduce) TextInputFormat, not the mapred one
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Assumes `ssc` is an existing StreamingContext.

// Monitor directory for text files: one element per line
val lines = ssc.textFileStream("/path/to/text/files")

// Read files with Hadoop's TextInputFormat: key = byte offset, value = line.
// Convert Text to String immediately, because Hadoop record readers reuse the
// same Text object for every record.
val jsonLines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/path/to/json")
  .map(_._2.toString)

// Custom filter: only process .csv files, and skip files present at startup
val csvFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/path/to/files",
  (path: Path) => path.getName.endsWith(".csv"),
  newFilesOnly = true
)

// Binary data with fixed record length: each element is one 1024-byte record
val binaryData = ssc.binaryRecordsStream("/path/to/binary", recordLength = 1024)

```

60

61

### Socket-based Input Streams

62

63

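Receive streaming data over a TCP connection. The receiver connects as a client to `hostname:port`, so a server must already be listening there (for example, `nc -lk 9999` during testing).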

64

65

```scala { .api }

/**
 * Socket-based input streams. These are receiver-based: each returns a
 * ReceiverInputDStream, whose receiver connects to `hostname:port`.
 * `InputStream` is `java.io.InputStream`.
 */
class StreamingContext {
  /**
   * Receive lines of text (decoded as UTF-8) from a TCP socket.
   *
   * @param storageLevel storage level for received blocks; defaults to
   *                     MEMORY_AND_DISK_SER_2 (serialized, replicated twice)
   */
  def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String]

  /**
   * Receive data from a TCP socket, decoding it with a custom converter.
   *
   * @param converter turns the socket's InputStream into an iterator of records;
   *                  the stream ends when the iterator is exhausted
   */
  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
  ): ReceiverInputDStream[T]

  /**
   * Receive data that the sender has already serialized into blocks with
   * Spark's serializer. The blocks are stored without being deserialized.
   */
  def rawSocketStream[T: ClassTag](
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[T]
}

```

93

94

**Usage Examples:**

95

96

```scala

import java.io.{DataInputStream, EOFException}
import org.apache.spark.storage.StorageLevel

// Basic text socket stream (default storage level)
val socketLines = ssc.socketTextStream("localhost", 9999)

// Socket stream with an explicit storage level.
// MEMORY_ONLY keeps a single, unreplicated in-memory copy.
val persistentSocket = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY)

// Custom binary format: fixed-size 1024-byte records, one MyData per record
val customSocket = ssc.socketStream[MyData](
  "localhost",
  8888,
  inputStream => {
    // Defined inside the converter so the closure captures nothing from outside
    val recordLength = 1024
    val in = new DataInputStream(inputStream)
    Iterator
      .continually {
        // Fresh array per record, so decoded records never share a buffer
        val record = new Array[Byte](recordLength)
        try {
          // readFully blocks until a whole record has arrived. A bare read()
          // may return a partial record, or -1 at end of stream.
          in.readFully(record)
          Option(MyData.deserialize(record)) // a null result ends the stream
        } catch {
          case _: EOFException => None // peer closed the connection
        }
      }
      .takeWhile(_.isDefined)
      .collect { case Some(data) => data }
  },
  StorageLevel.MEMORY_ONLY
)

```

118

119

### Queue-based Input Streams

120

121

Create DStreams from a queue of RDDs, useful for testing and controlled data injection.

122

123

```scala { .api }

/**
 * Queue-based input streams (receiver-less).
 * `Queue` is `scala.collection.mutable.Queue`; `RDD` is `org.apache.spark.rdd.RDD`.
 */
class StreamingContext {
  /**
   * Create a DStream from a queue of RDDs.
   *
   * @param oneAtATime if true (the default), each batch takes one RDD from the
   *                   queue; if false, each batch takes every RDD queued at
   *                   that moment and unions them
   */
  def queueStream[T: ClassTag](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]

  /**
   * Like the two-argument form, but a batch that finds the queue empty
   * uses `defaultRDD` instead.
   */
  def queueStream[T: ClassTag](
      queue: Queue[RDD[T]],
      oneAtATime: Boolean,
      defaultRDD: RDD[T]
  ): InputDStream[T]
}

```

139

140

**Usage Examples:**

141

142

```scala

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// Create a queue and fill it with RDDs
val rddQueue = new Queue[RDD[Int]]()
val rdd1 = ssc.sparkContext.parallelize(1 to 100)
val rdd2 = ssc.sparkContext.parallelize(101 to 200)
rddQueue.enqueue(rdd1, rdd2)

// Default (oneAtATime = true): each batch takes one RDD from the queue
val queueStream = ssc.queueStream(rddQueue)

// oneAtATime = false: each batch unions every RDD queued at that moment.
// Give each stream its own queue, because streams sharing a queue compete
// for its RDDs.
val batchQueue = Queue[RDD[Int]](rdd1, rdd2)
val drainingStream = ssc.queueStream(batchQueue, oneAtATime = false)

// With a default RDD, used for batches that find the queue empty
val defaultRDD = ssc.sparkContext.parallelize(Seq(0))
val fallbackQueue = new Queue[RDD[Int]]()
val streamWithDefault = ssc.queueStream(fallbackQueue, oneAtATime = false, defaultRDD)

```

162

163

### Custom Receiver Input Streams

164

165

Create DStreams using custom Receiver implementations for specialized data sources.

166

167

```scala { .api }

class StreamingContext {
  /** Create an input DStream whose data is produced by a user-defined Receiver. */
  def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
}

/**
 * Base class for custom receivers. Subclasses implement onStart() and onStop().
 * The other methods are provided for use from the subclass's receiving code.
 *
 * @param storageLevel storage level for the blocks of data this receiver stores
 */
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {
  /**
   * Called when the receiver is started. Must not block: start the thread(s)
   * that receive data and return.
   */
  def onStart(): Unit

  /** Called when the receiver is stopped; should stop the receiving thread(s). */
  def onStop(): Unit

  /** Store a single item; single items are aggregated into blocks before storage. */
  def store(dataItem: T): Unit

  /** Store a buffer of items as one block. */
  def store(dataBuffer: ArrayBuffer[T]): Unit

  /** Store the items of an iterator as one block. */
  def store(dataIterator: Iterator[T]): Unit

  /** Store bytes that are already serialized with Spark's configured serializer. */
  def store(bytes: ByteBuffer): Unit

  /** Store serialized bytes together with user metadata. */
  def store(bytes: ByteBuffer, metadata: Any): Unit

  /** Schedule an asynchronous restart (onStop() then onStart()) and return immediately. */
  def restart(message: String): Unit

  /** As restart(message), also reporting the error that caused it. */
  def restart(message: String, error: Throwable): Unit

  /** Stop the receiver completely. */
  def stop(message: String): Unit

  /** Stop the receiver completely, reporting the error that caused it. */
  def stop(message: String, error: Throwable): Unit

  /** Report an error without stopping the receiver. */
  def reportError(message: String, throwable: Throwable): Unit

  /** Whether the receiver has been started. */
  def isStarted(): Boolean

  /** Whether the receiver has been stopped; receiving loops should poll this. */
  def isStopped(): Boolean

  /** Preferred host on which to run this receiver, if any. */
  def preferredLocation: Option[String]

  /** ID of the input stream this receiver belongs to. */
  def streamId: Int
}

```

223

224

**Usage Examples:**

225

226

```scala

import java.net._
import java.io._
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Custom receiver for UDP data: each datagram becomes one String
class UDPReceiver(port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  // Set by the receiving thread and closed from onStop() on another thread,
  // so it must be volatile for the write to be visible.
  @volatile private var socket: DatagramSocket = _

  def onStart(): Unit = {
    // onStart() must not block, so receive on a separate thread
    new Thread("UDP Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Closing the socket makes a pending socket.receive() throw, ending the loop
    val s = socket
    if (s != null) {
      s.close()
      socket = null
    }
  }

  private def receive(): Unit = {
    try {
      val s = new DatagramSocket(port)
      socket = s
      try {
        // Datagrams longer than the buffer are truncated to 1024 bytes
        val buffer = new Array[Byte](1024)
        while (!isStopped()) {
          val packet = new DatagramPacket(buffer, buffer.length)
          s.receive(packet)
          store(new String(packet.getData, 0, packet.getLength, StandardCharsets.UTF_8))
        }
      } finally {
        // Also covers a stop that happened before `socket` was assigned;
        // closing an already-closed socket has no effect
        s.close()
      }
    } catch {
      case e: Exception if !isStopped() =>
        restart("Error receiving data", e)
      case _: Exception =>
        // Stopping: onStop() closed the socket to unblock receive(); exit quietly
    }
  }
}

// Use custom receiver
val udpStream = ssc.receiverStream(new UDPReceiver(9999))

// Custom receiver that hands data to Spark in batches of `batchSize` items
class BatchedReceiver[T](batchSize: Int) extends Receiver[T](StorageLevel.MEMORY_ONLY) {
  // NOTE(review): if addData runs on a receiving thread started by onStart(),
  // this buffer is also touched by onStop() on another thread; synchronize
  // addData/flush in a real implementation.
  private val buffer = new ArrayBuffer[T]()

  def onStart(): Unit = {
    // Implementation: start a thread that calls addData(...) for each item
  }

  def onStop(): Unit = {
    // Flush remaining data
    flush()
  }

  private def addData(item: T): Unit = {
    buffer += item
    if (buffer.size >= batchSize) flush()
  }

  // Store the buffered items as one block, then reset the buffer.
  // Uses the store(ArrayBuffer[T]) overload: `buffer.toArray` matches no store
  // overload and would need a ClassTag[T]. A copy is passed so that clearing the
  // reusable buffer cannot affect the batch handed to Spark.
  private def flush(): Unit = {
    if (buffer.nonEmpty) {
      store(buffer.clone())
      buffer.clear()
    }
  }
}

```

293

294

### Union Operations

295

296

Combine multiple DStreams into a single DStream. All streams must have the same element type `T`.

297

298

```scala { .api }

class StreamingContext {
  /**
   * Union multiple DStreams of the same element type into one DStream.
   * Each batch of the result contains the data of that batch from every input stream.
   */
  def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T]
}

```

304

305

**Usage Examples:**

306

307

```scala

import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

// Multiple input sources. union requires one element type, so the queue holds
// RDD[String] to match the file and socket streams.
val fileStream = ssc.textFileStream("/path/to/files")
val socketStream = ssc.socketTextStream("localhost", 9999)
val rddQueue = new Queue[RDD[String]]()
val queueStream = ssc.queueStream(rddQueue)

// Union all streams into a single DStream[String]
val combinedStream = ssc.union(Seq(fileStream, socketStream, queueStream))

// Process combined data: count() runs one Spark job per batch
combinedStream.foreachRDD { rdd =>
  println(s"Combined batch size: ${rdd.count()}")
}

```

321

322

### Input Stream Base Classes

323

324

Base abstractions for implementing custom input streams.

325

326

```scala { .api }

/**
 * Base class for input streams that bring external data into Spark Streaming.
 * start() and stop() are called by the streaming system.
 */
abstract class InputDStream[T: ClassTag](ssc: StreamingContext) extends DStream[T](ssc) {
  /** Start receiving data. */
  def start(): Unit

  /** Stop receiving data. */
  def stop(): Unit
}

/**
 * Input streams that run a Receiver to collect external data.
 */
abstract class ReceiverInputDStream[T: ClassTag](ssc: StreamingContext) extends InputDStream[T](ssc) {
  /** Get the receiver for this input stream. */
  def getReceiver(): Receiver[T]
}

/**
 * File-based input stream implementation: produces (key, value) pairs read by
 * the Hadoop InputFormat `F`. Normally obtained through StreamingContext.fileStream.
 */
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag](
    ssc: StreamingContext,
    directory: String,
    filter: Path => Boolean,
    newFilesOnly: Boolean,
    conf: Option[Configuration]
) extends InputDStream[(K, V)](ssc)

/**
 * Socket-based input stream implementation: `converter` turns the socket's
 * InputStream into records. Normally obtained through StreamingContext.socketStream.
 */
class SocketInputDStream[T: ClassTag](
    ssc: StreamingContext,
    host: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc)

/**
 * Queue-based input stream implementation. Normally obtained through
 * StreamingContext.queueStream.
 */
class QueueInputDStream[T: ClassTag](
    ssc: StreamingContext,
    queue: Queue[RDD[T]],
    oneAtATime: Boolean,
    defaultRDD: Option[RDD[T]]
) extends InputDStream[T](ssc)

```