# Input Sources

Methods for ingesting data streams from various external sources including network sockets, file systems, in-memory queues, and custom receivers.

## Socket-based Input Streams

### Text Socket Streams

Create DStream from socket text data:

```scala { .api }
def socketTextStream(
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String]
```

Example socket text stream:

```scala
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()
```

### Custom Socket Streams

Create DStream with custom converter function:

```scala { .api }
def socketStream[T: ClassTag](
  hostname: String,
  port: Int,
  converter: (InputStream) => Iterator[T],
  storageLevel: StorageLevel
): ReceiverInputDStream[T]
```

Example with custom converter:

```scala
import java.io.InputStream
import scala.io.Source

import org.apache.spark.storage.StorageLevel

// MyJsonObject and parseJson are user-defined placeholders for your own JSON model and parser
def jsonConverter(inputStream: InputStream): Iterator[MyJsonObject] = {
  Source.fromInputStream(inputStream).getLines().map(parseJson)
}

val jsonStream = ssc.socketStream("localhost", 8080, jsonConverter, StorageLevel.MEMORY_ONLY)
```

### Raw Socket Streams

Create a DStream from data received as serialized blocks (in Spark's serializer format) that can be stored without per-record deserialization:

```scala { .api }
def rawSocketStream[T: ClassTag](
  hostname: String,
  port: Int,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[T]
```
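
Example raw socket stream (a minimal sketch; it assumes the sender pushes blocks that are already serialized with Spark's serializer, for example another Spark job acting as the producer):

```scala
// Receive pre-serialized blocks of strings and count the records in each batch
val rawStream = ssc.rawSocketStream[String]("localhost", 9999)
rawStream.count().print()
```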

## File-based Input Streams

### Text File Streams

Monitor directory for new text files:

```scala { .api }
def textFileStream(directory: String): DStream[String]
```

Example text file monitoring:

```scala
val fileStream = ssc.textFileStream("/data/streaming-input")
val processedLines = fileStream.filter(_.nonEmpty).map(_.toUpperCase)
processedLines.print()
```

### Generic File Streams

Monitor directory with custom input format:

```scala { .api }
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]
```

Overloaded versions:

```scala { .api }
def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]

def fileStream[K, V, F <: NewInputFormat[K, V]](
  directory: String,
  filter: Path => Boolean,
  newFilesOnly: Boolean,
  conf: Configuration
)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): InputDStream[(K, V)]
```

Example with Hadoop input format:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val hadoopStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/data/input")
val textStream = hadoopStream.map(_._2.toString)
```
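
The filtered overload can restrict which files are picked up. A sketch of the same Hadoop input format that, purely as an illustrative assumption, accepts only `.csv` files and also processes files already present in the directory:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Accept only .csv files; newFilesOnly = false also picks up pre-existing files
val csvStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "/data/input",
  (path: Path) => path.getName.endsWith(".csv"),
  newFilesOnly = false
)
val csvLines = csvStream.map(_._2.toString)
```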

### Binary Records Streams

Read fixed-length binary records:

```scala { .api }
def binaryRecordsStream(directory: String, recordLength: Int): DStream[Array[Byte]]
```

Example binary records:

```scala
val binaryStream = ssc.binaryRecordsStream("/data/binary", 1024)
val processedRecords = binaryStream.map { bytes =>
  // Process fixed 1024-byte records; processRecord is a user-defined placeholder
  processRecord(bytes)
}
```

## Queue-based Input Streams

### RDD Queue Streams

Create DStream from queue of RDDs:

```scala { .api }
def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean = true
): InputDStream[T]

def queueStream[T: ClassTag](
  queue: Queue[RDD[T]],
  oneAtATime: Boolean,
  defaultRDD: RDD[T]
): InputDStream[T]
```

Example queue stream:

```scala
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

val rddQueue = new Queue[RDD[Int]]()
val queueStream = ssc.queueStream(rddQueue)

// Add RDDs to the queue in another thread; synchronize because the
// streaming scheduler dequeues from the same queue concurrently
new Thread {
  override def run(): Unit = {
    for (i <- 1 to 100) {
      rddQueue.synchronized {
        rddQueue += ssc.sparkContext.parallelize(1 to 10)
      }
      Thread.sleep(1000)
    }
  }
}.start()

queueStream.print()
```

**Note**: Queue streams do not support checkpointing and should not be used in production for fault tolerance.

## Custom Receiver Streams

### Receiver-based Streams

Create DStream with custom receiver:

```scala { .api }
def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T]
```

Example custom receiver:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import java.util.concurrent.Executors

class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  private val executor = Executors.newSingleThreadExecutor()

  def onStart(): Unit = {
    executor.execute(new Runnable {
      def run(): Unit = {
        receive()
      }
    })
  }

  def onStop(): Unit = {
    executor.shutdown()
  }

  private def receive(): Unit = {
    try {
      while (!isStopped()) {
        // Simulate receiving data
        val data = generateData()
        store(data)
        Thread.sleep(100)
      }
    } catch {
      case e: Exception => restart("Error receiving data", e)
    }
  }

  private def generateData(): String = {
    // Custom data generation logic
    s"data-${System.currentTimeMillis()}"
  }
}

val customStream = ssc.receiverStream(new CustomReceiver())
customStream.print()
```

### Pluggable Input Streams

The input DStream class that wraps a custom receiver:

```scala { .api }
class PluggableInputDStream[T: ClassTag](
  ssc: StreamingContext,
  receiver: Receiver[T]
) extends ReceiverInputDStream[T](ssc)
```
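
In practice this class is usually obtained through `receiverStream` rather than constructed directly. A minimal sketch, reusing the `CustomReceiver` defined above:

```scala
// receiverStream wraps the custom receiver in an input DStream for you
val pluggableStream = ssc.receiverStream(new CustomReceiver())
pluggableStream.print()
```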

## Input Stream Properties

### Storage Levels

Common storage levels for input streams:

- `StorageLevel.MEMORY_ONLY` - Store in memory only
- `StorageLevel.MEMORY_AND_DISK` - Memory with disk fallback
- `StorageLevel.MEMORY_ONLY_SER` - Memory only, with serialization
- `StorageLevel.MEMORY_AND_DISK_SER` - Memory and disk with serialization
- `StorageLevel.MEMORY_AND_DISK_SER_2` - Memory and disk with serialization, replicated to two nodes (the default in the socket stream signatures above)

Example with custom storage level:

```scala
import org.apache.spark.storage.StorageLevel

val stream = ssc.socketTextStream(
  "localhost",
  9999,
  StorageLevel.MEMORY_ONLY_SER
)
```

### Input Stream Identification

All input streams have unique identifiers:

```scala { .api }
abstract class InputDStream[T] extends DStream[T] {
  val id: Int
  val name: String
  def start(): Unit
  def stop(): Unit
}
```

Access input stream properties:

```scala
// socketTextStream returns a ReceiverInputDStream, an InputDStream that exposes these properties
val inputStream = ssc.socketTextStream("localhost", 9999)
println(s"Stream ID: ${inputStream.id}")
println(s"Stream name: ${inputStream.name}")
```

## File Stream Configuration

### File Monitoring Behavior

File streams monitor directories with these characteristics:

- Only files in the monitored directory (not subdirectories) are processed
- Files are processed based on modification time, not creation time
- Files must be written atomically (e.g., move operation) to be processed correctly
- File names should be consistent (lexicographically increasing) for best results

### File Processing Guarantees

- Each file is processed exactly once (assuming no failures)
- Files are processed in order of modification time
- Processing latency depends on batch interval and file discovery mechanism

Example atomic file writing pattern:

```scala
import java.io.File

// Write to a temporary (hidden) file first; writeDataToFile and data are placeholders
val tempFile = new File("/data/streaming-input/.temp-file")
writeDataToFile(tempFile, data)

// Atomically move the file to its final location so the stream only sees complete files
val finalFile = new File("/data/streaming-input/data-file.txt")
tempFile.renameTo(finalFile)
```
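
For directories on HDFS, the same pattern can be expressed with the Hadoop `FileSystem` API. A minimal sketch, assuming a hypothetical `/staging` directory on the same filesystem as the monitored directory:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Write the complete file under /staging first, then rename it into the monitored directory
val fs = FileSystem.get(new Configuration())
fs.rename(new Path("/staging/data-file.txt"), new Path("/data/streaming-input/data-file.txt"))
```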

## Input Stream Reliability

### Reliable Receivers

For fault-tolerant processing, use receivers that support write-ahead logs:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable write-ahead logs in the Spark configuration
val conf = new SparkConf()
  .setAppName("wal-example")   // app name and master are required to create a context; adjust for your cluster
  .setMaster("local[2]")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("checkpoint-directory")
```
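
Write-ahead logs protect records the receiver has already stored, not records still buffered inside the receiver. A reliable receiver therefore stores a whole batch with the blocking, multi-record `store` call and acknowledges the source only afterwards. A minimal sketch; the `Acknowledgeable` source and its `fetchBatch`/`ack` methods are hypothetical:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical upstream source that supports acknowledgments
trait Acknowledgeable {
  def fetchBatch(): Seq[String]
  def ack(): Unit
}

class ReliableSourceReceiver(source: Acknowledgeable)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    new Thread("reliable-source-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val batch = source.fetchBatch()
          if (batch.nonEmpty) {
            // Blocking call: returns after Spark has stored the whole batch
            // (and logged it when the write-ahead log is enabled), so it is
            // safe to acknowledge the source at this point
            store(ArrayBuffer(batch: _*))
            source.ack()
          }
        }
      }
    }.start()
  }

  def onStop(): Unit = () // the receive loop exits once isStopped() returns true
}
```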

### Unreliable Receivers

Socket streams and some custom receivers are unreliable: they do not acknowledge received data back to the source, so records buffered in the receiver can be lost on failure. For production use cases requiring fault tolerance, prefer one of the following (see the sketch after this list):

- Kafka integration (external library)
- File-based inputs with HDFS
- Custom receivers with reliable storage
- Message queue systems with acknowledgment
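
For example, a receiver-free, file-based input on HDFS avoids the problem entirely because the data already sits in fault-tolerant storage. A minimal sketch, assuming a hypothetical `hdfs://namenode:8020/data/streaming-input` directory:

```scala
// No receiver is involved; with checkpointing enabled, the file stream can be
// recovered after a restart and the files re-read from HDFS
val reliableLines = ssc.textFileStream("hdfs://namenode:8020/data/streaming-input")
reliableLines.count().print()
```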