# Input Sources

Data ingestion capabilities for reading from various external sources including files, sockets, message queues, and custom receivers.

## Capabilities

### Socket Streams

Create DStreams from TCP socket connections for real-time text or binary data ingestion.

```scala { .api }
/**
 * Create a DStream from TCP socket text stream
 * @param hostname - Host to connect to
 * @param port - Port number
 * @param storageLevel - Storage level for received data
 * @return DStream of strings from socket
 */
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String]

/**
 * Create a DStream from TCP socket with custom converter
 * @param hostname - Host to connect to
 * @param port - Port number
 * @param converter - Function to convert bytes to objects
 * @param storageLevel - Storage level for received data
 */
def socketStream[T](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): DStream[T]

/**
 * Create a DStream of raw bytes from TCP socket
 */
def rawSocketStream[T](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T]
```

**Usage Examples:**

```scala
// Basic text stream from socket
val lines = ssc.socketTextStream("localhost", 9999)

// Custom object stream with converter
val events = ssc.socketStream(
  "event-server", 8080,
  (inputStream: InputStream) => {
    val reader = new BufferedReader(new InputStreamReader(inputStream))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
      .map(line => parseEvent(line))
  },
  StorageLevel.MEMORY_AND_DISK
)
```

### File Streams

Monitor file systems and create DStreams from new files appearing in directories.

```scala { .api }
/**
 * Monitor directory for new text files
 * @param directory - Directory to monitor
 * @return DStream of file contents as strings
 */
def textFileStream(directory: String): DStream[String]

/**
 * Monitor directory for files of a specific format
 * @param directory - Directory to monitor
 * @param filter - Optional file filter function
 * @param newFilesOnly - Process only new files vs all files
 */
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean = acceptAllFiles,
  newFilesOnly: Boolean = true
): DStream[(K, V)]

/**
 * Read binary records from files
 * @param directory - Directory to monitor
 * @param recordLength - Length of each binary record
 */
def binaryRecordsStream(
  directory: String,
  recordLength: Int
): DStream[Array[Byte]]
```

**Usage Examples:**

```scala
// Monitor directory for text files
val logs = ssc.textFileStream("hdfs://namenode/logs")

// Monitor for JSON files with custom processing
val jsonFiles = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://namenode/json-data",
  (path: Path) => path.getName.endsWith(".json"),
  newFilesOnly = true
)

// Process binary log files
val binaryLogs = ssc.binaryRecordsStream("hdfs://namenode/binary-logs", 1024)
```

### Queue Streams

Create DStreams from queues of RDDs, primarily used for testing and debugging streaming applications.

```scala { .api }
/**
 * Create a DStream from a queue of RDDs
 * @param queue - Queue containing RDDs to process
 * @param oneAtATime - Process one RDD per batch vs all available
 * @param defaultRDD - Default RDD when queue is empty
 */
def queueStream[T](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true,
  defaultRDD: RDD[T] = null
): DStream[T]
```

**Usage Examples:**

```scala
import scala.collection.mutable.Queue

// Create test data queue
val rddQueue = new Queue[RDD[Int]]()
val testStream = ssc.queueStream(rddQueue)

// Add test RDDs to queue
for (i <- 1 to 10) {
  rddQueue += ssc.sparkContext.parallelize(Seq(i, i + 1, i + 2))
}

// Process the queue stream
testStream.map(_ * 2).print()
```

### Receiver-based Streams

Framework for creating custom receivers to ingest data from external sources.

```scala { .api }
/**
 * Create input stream using a custom receiver
 * @param receiver - Custom receiver implementation
 */
def receiverStream[T](receiver: Receiver[T]): DStream[T]

/**
 * Abstract base class for custom receivers
 */
abstract class Receiver[T](storageLevel: StorageLevel) extends Serializable {

  /** Called when receiver is started */
  def onStart(): Unit

  /** Called when receiver is stopped */
  def onStop(): Unit

  /** Store single data item */
  def store(dataItem: T): Unit

  /** Store multiple data items */
  def store(dataBuffer: ArrayBuffer[T]): Unit

  /** Store data from iterator */
  def store(dataIterator: Iterator[T]): Unit

  /** Store raw bytes */
  def store(bytes: ByteBuffer): Unit

  /** Stop receiver with message */
  def stop(message: String): Unit

  /** Restart receiver with message */
  def restart(message: String): Unit

  /** Report error */
  def reportError(message: String, throwable: Throwable): Unit

  /** Check if receiver is started */
  def isStarted(): Boolean

  /** Check if receiver is stopped */
  def isStopped(): Boolean

  /** Preferred location for receiver */
  def preferredLocation: Option[String]
}
```

**Usage Examples:**

```scala
// Custom receiver for external API
class ApiReceiver(apiUrl: String) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  private var client: ApiClient = _

  def onStart(): Unit = {
    client = new ApiClient(apiUrl)
    // Start background thread to fetch data
    new Thread(() => {
      while (!isStopped()) {
        try {
          val data = client.fetchData()
          store(data)
          Thread.sleep(1000)
        } catch {
          case e: Exception => reportError("API fetch failed", e)
        }
      }
    }).start()
  }

  def onStop(): Unit = {
    if (client != null) client.close()
  }
}

// Use custom receiver
val apiStream = ssc.receiverStream(new ApiReceiver("https://api.example.com/stream"))
```

### Input Stream Classes

Base classes and implementations for different types of input streams.

```scala { .api }
/**
 * Base class for all input streams
 */
abstract class InputDStream[T](ssc: StreamingContext) extends DStream[T] {

  /** Duration between batches */
  override def slideDuration: Duration = ssc.graph.batchDuration

  /** Input streams have no dependencies */
  override def dependencies: List[DStream[_]] = List()

  /** Compute RDD for given time */
  override def compute(validTime: Time): Option[RDD[T]]
}

/**
 * Base class for receiver-based input streams
 */
abstract class ReceiverInputDStream[T](ssc: StreamingContext) extends InputDStream[T](ssc) {

  /** Get the receiver for this input stream */
  def getReceiver(): Receiver[T]

  /** Start the receiver */
  def start(): Unit

  /** Stop the receiver */
  def stop(): Unit
}

/**
 * Input stream for file monitoring
 */
class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
  ssc: StreamingContext,
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
) extends InputDStream[(K, V)](ssc)

/**
 * Input stream for socket connections
 */
class SocketInputDStream[T](
  ssc: StreamingContext,
  host: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc)

/**
 * Input stream from RDD queue (for testing)
 */
class QueueInputDStream[T](
  ssc: StreamingContext,
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: RDD[T]
) extends InputDStream[T](ssc)

/**
 * Input stream that generates a constant RDD for every batch
 */
class ConstantInputDStream[T](
  ssc: StreamingContext,
  rdd: RDD[T]
) extends InputDStream[T](ssc)
```
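
Most of these classes are constructed indirectly through the `StreamingContext` factory methods above. `ConstantInputDStream`, which has no factory method here, can be instantiated directly. A minimal sketch, assuming the standard `org.apache.spark.streaming.dstream` package location (the lookup data is illustrative):

```scala
import org.apache.spark.streaming.dstream.ConstantInputDStream

// Replay the same RDD in every batch, e.g. as a static side input
val lookupRDD = ssc.sparkContext.parallelize(Seq("a", "b", "c"))
val constantStream = new ConstantInputDStream(ssc, lookupRDD)

// Each batch sees the full contents of lookupRDD
constantStream.count().print()
```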

## External Source Integration

Spark Streaming integrates with many external sources through additional libraries:

- **Kafka**: `spark-streaming-kafka` for Apache Kafka integration
- **Flume**: `spark-streaming-flume` for Apache Flume integration
- **Kinesis**: `spark-streaming-kinesis-asl` for Amazon Kinesis
- **Twitter**: `spark-streaming-twitter` for Twitter API
- **MQTT**: `spark-streaming-mqtt` for MQTT message brokers
- **ZeroMQ**: `spark-streaming-zeromq` for ZeroMQ messaging

Each integration provides specialized input stream factory methods for use with a `StreamingContext`; a sketch of the Kafka case follows.
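
A minimal receiver-based Kafka example, assuming the `spark-streaming-kafka` 0.8-style `KafkaUtils.createStream` API (the ZooKeeper quorum, consumer group, and topic map are placeholders):

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-based Kafka stream; the map gives topic -> receiver thread count
val kafkaStream = KafkaUtils.createStream(
  ssc,
  "zk-host:2181",       // ZooKeeper quorum (placeholder)
  "my-consumer-group",  // consumer group id (placeholder)
  Map("events" -> 1)    // topics to consume (placeholder)
)

// Each record is a (key, message) pair; keep the message payload
kafkaStream.map(_._2).print()
```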