or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md, data-sources.md, function-interfaces.md, index.md, keyed-streams.md, output-operations.md, scala-extensions.md, stream-composition.md, stream-environment.md, stream-partitioning.md, stream-transformations.md, window-operations.md, windowing.md

docs/data-sources.md

0

# Data Sources and Stream Creation

1

2

The `StreamExecutionEnvironment` provides methods for creating `DataStream`s from collections, number sequences, files, sockets, input formats, and custom source functions.

3

4

## Collection-Based Sources

5

6

### From Elements

7

8

```scala { .api }

class StreamExecutionEnvironment {

  /** Creates a bounded stream containing exactly the given elements.
    *
    * @param data the elements to emit; a `TypeInformation[T]` must be in implicit scope
    */
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
}

```

13

14

Create a stream from individual elements:

15

16

```scala

val env = StreamExecutionEnvironment.getExecutionEnvironment

// fromElements infers the element type from its arguments
val intStream    = env.fromElements(1, 2, 3, 4, 5)
val stringStream = env.fromElements("apple", "banana", "cherry")
val tupleStream  = env.fromElements(("Alice", 25), ("Bob", 30), ("Charlie", 35))

```

24

25

### From Collection

26

27

```scala { .api }

class StreamExecutionEnvironment {

  /** Creates a bounded stream from the elements of a Scala `Seq` (e.g. `List`, `Vector`). */
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]

  /** Creates a stream that drains `data` at runtime.
    *
    * The iterator must be serializable, and the resulting source is non-parallel.
    */
  def fromCollection[T: TypeInformation](data: Iterator[T]): DataStream[T]
}

```

33

34

Create streams from Scala collections:

35

36

```scala

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Any Scala Seq can back a stream: here a List and a Vector
val stream1 = env.fromCollection(List(1, 2, 3, 4, 5))
val stream2 = env.fromCollection(Vector("a", "b", "c"))

// An Iterator is consumed lazily at runtime by a single (non-parallel) source instance
val numberIterator = (1 to 1000).iterator
val stream3 = env.fromCollection(numberIterator)

```

50

51

### From Parallel Collection

52

53

```scala { .api }

class StreamExecutionEnvironment {

  /** Creates a parallel source from a splittable iterator.
    *
    * Flink calls `data.split(...)` and gives each parallel source instance one of the splits.
    */
  def fromParallelCollection[T: TypeInformation](data: SplittableIterator[T]): DataStream[T]
}

```

58

59

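Create a parallel stream from a `SplittableIterator`. Flink splits the iterator so that each parallel source instance reads its own portion of the data: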

60

61

```scala

import org.apache.flink.util.SplittableIterator

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Custom splittable iterator over the inclusive range [from, to].
// (For plain number ranges Flink already ships org.apache.flink.util.NumberSequenceIterator;
// this class shows how to write your own.)
class NumberSplittableIterator(from: Int, to: Int) extends SplittableIterator[Int] {
  private var current = from

  override def hasNext: Boolean = current <= to

  override def next(): Int = {
    // java.util.Iterator contract: next() on an exhausted iterator must throw
    if (!hasNext) throw new NoSuchElementException("NumberSplittableIterator is exhausted")
    val result = current
    current += 1
    result
  }

  // SplittableIterator.split is declared in Java as returning Iterator<T>[]. Scala arrays are
  // invariant, so the override must return exactly Array[java.util.Iterator[Int]].
  override def split(numPartitions: Int): Array[java.util.Iterator[Int]] = {
    require(numPartitions > 0, s"numPartitions must be positive, got $numPartitions")
    val chunk = (to - from + 1) / numPartitions
    (0 until numPartitions).map { i =>
      val start = from + i * chunk
      // The last split absorbs the remainder; earlier splits are empty when chunk == 0
      val end = if (i == numPartitions - 1) to else start + chunk - 1
      new NumberSplittableIterator(start, end): java.util.Iterator[Int]
    }.toArray
  }

  override def getMaximumNumberOfSplits: Int = to - from + 1
}

val parallelStream = env.fromParallelCollection(new NumberSplittableIterator(1, 10000))

```

85

86

## Sequence Generation

87

88

```scala { .api }

class StreamExecutionEnvironment {

  /** Creates a parallel source that emits every `Long` from `from` to `to`, both inclusive. */
  def generateSequence(from: Long, to: Long): DataStream[Long]
}

```

93

94

Generate numeric sequences:

95

96

```scala

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Emits the Longs 1, 2, ..., 1000 (newer Flink releases offer fromSequence for this)
val sequence = env.generateSequence(1, 1000)

// Keep the even numbers, double them, and print the results
val doubledEvens = sequence.filter(n => n % 2 == 0).map(n => n * 2)
doubledEvens.print()

```

108

109

## File-Based Sources

110

111

### Text File Reading

112

113

```scala { .api }

class StreamExecutionEnvironment {

  /** Reads the file at `filePath` line by line, decoding it as UTF-8. */
  def readTextFile(filePath: String): DataStream[String]

  /** Reads the file at `filePath` line by line, decoding it with the named charset
    * (for example `"ISO-8859-1"`).
    */
  def readTextFile(filePath: String, charsetName: String): DataStream[String]
}

```

119

120

Read text files line by line:

121

122

```scala

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Read text file with default charset (UTF-8)
val textStream = env.readTextFile("/path/to/input.txt")

// Read with specific charset
val textStreamLatin1 = env.readTextFile("/path/to/input.txt", "ISO-8859-1")

// Word count over the file contents
textStream
  .flatMap(_.split("\\s+"))
  .filter(_.nonEmpty)      // empty lines and leading whitespace yield "" tokens
  .map(word => (word, 1))
  .keyBy(_._1)             // key selector instead of the deprecated positional keyBy(0)
  .sum(1)
  .print()

```

139

140

### Custom File Formats

141

142

```scala { .api }

class StreamExecutionEnvironment {

  /** Reads the contents of `filePath` once, using the given input format. */
  def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataStream[T]

  /** Reads `filePath` using the given input format.
    *
    * @param watchType `PROCESS_ONCE` reads the current contents and finishes;
    *                  `PROCESS_CONTINUOUSLY` keeps re-scanning the path for new or modified files
    * @param interval  time between path scans, in milliseconds
    */
  def readFile[T: TypeInformation](
      inputFormat: FileInputFormat[T],
      filePath: String,
      watchType: FileProcessingMode,
      interval: Long
  ): DataStream[T]
}

```

153

154

Read files with custom input formats:

155

156

```scala

import org.apache.flink.api.common.io.FileInputFormat
import org.apache.flink.streaming.api.functions.source.FileProcessingMode

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Custom CSV input format skeleton. A concrete FileInputFormat still has to
// implement reachedEnd() and nextRecord(reuse).
class CsvInputFormat extends FileInputFormat[String] {
  // Implementation details...
}

// One-shot read of a single file
val csvStream = env.readFile(new CsvInputFormat(), "/path/to/data.csv")

// Continuous ingestion: re-scan the directory at a fixed interval and read new files.
// A file that is modified is re-read in full, so its records are emitted again.
val scanIntervalMs = 1000L // Check every 1000ms
val monitoredStream = env.readFile(
  new CsvInputFormat(),
  "/path/to/data/",
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  scanIntervalMs
)

```

178

179

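### File Stream Monitoring (Deprecated)

`readFileStream` is deprecated; use `readFile` with `FileProcessingMode.PROCESS_CONTINUOUSLY` (shown above) instead.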

180

181

```scala { .api }

class StreamExecutionEnvironment {

  /** Monitors `streamPath` every `intervalMillis` ms and emits file contents as strings.
    *
    * Deprecated: prefer `readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, interval)`.
    */
  def readFileStream(
      streamPath: String,
      intervalMillis: Long,
      watchType: FileMonitoringFunction.WatchType
  ): DataStream[String]
}

```

186

187

## Socket-Based Sources

188

189

```scala { .api }

class StreamExecutionEnvironment {

  /** Reads newline-delimited strings from `hostname:port`. */
  def socketTextStream(hostname: String, port: Int): DataStream[String]

  /** Reads strings separated by `delimiter` from `hostname:port`. */
  def socketTextStream(hostname: String, port: Int, delimiter: Char): DataStream[String]

  /** Reads strings separated by `delimiter` from `hostname:port`.
    *
    * @param maxRetry how long, in seconds, to keep retrying while the socket is down
    *                 (0 terminates immediately, a negative value retries forever)
    */
  def socketTextStream(hostname: String, port: Int, delimiter: Char, maxRetry: Long): DataStream[String]
}

```

196

197

Connect to socket sources for real-time data. The socket source is non-parallel, so it always runs as a single instance:

198

199

```scala

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Connect to socket with default settings ('\n' delimiter, no reconnect attempts)
val socketStream = env.socketTextStream("localhost", 9999)

// Custom delimiter; maxRetry = 5 keeps retrying for up to 5 seconds if the socket goes down
val socketStreamCustom = env.socketTextStream("localhost", 9999, '\n', 5)

// Word count over the socket data
socketStream
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .keyBy(_._1) // key selector instead of the deprecated positional keyBy(0)
  .sum(1)
  .print()

```

217

218

## Custom Source Functions

219

220

### Simple Source Function

221

222

```scala { .api }

class StreamExecutionEnvironment {

  /** Creates a stream from a user-defined `SourceFunction`. */
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]

  /** Creates a stream from a function that emits records through the given `SourceContext`.
    *
    * The wrapping `SourceFunction` has a no-op `cancel()`, so the function should finish on its own.
    */
  def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T]
}

```

228

229

Create custom source functions:

230

231

```scala

import org.apache.flink.streaming.api.functions.source.SourceFunction
// SourceContext is a nested type of SourceFunction, not a package member
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Custom source function class: emits 0, 1, ..., 999, one value every 100 ms
class NumberGeneratorSource extends SourceFunction[Int] {
  // Written by cancel() and read by run(), which execute on different threads
  @volatile private var isRunning = true

  override def run(ctx: SourceContext[Int]): Unit = {
    var counter = 0
    while (isRunning && counter < 1000) {
      ctx.collect(counter)
      counter += 1
      Thread.sleep(100) // Emit every 100ms
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Use custom source function
val customStream = env.addSource(new NumberGeneratorSource)

// Lambda-based source function. [String] states the element type explicitly instead of
// relying on inference through the overloaded addSource. The lambda has no cancellation
// flag, so it only stops when the loop ends (or its thread is interrupted).
val lambdaStream = env.addSource[String] { ctx =>
  for (i <- 1 to 100) {
    ctx.collect(s"Message $i")
    Thread.sleep(1000)
  }
}

```

265

266

### Rich Source Function

267

268

```scala

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class ConfigurableSource extends RichSourceFunction[String] {
  // cancel() runs on a different thread than run(); @volatile makes the stop request visible
  @volatile private var isRunning = true
  private var config: String = _

  override def open(parameters: Configuration): Unit = {
    // Read "source.config" from the global job parameters, falling back to "default"
    val jobParameters = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.toMap
    config = jobParameters.getOrDefault("source.config", "default")
  }

  override def run(ctx: SourceContext[String]): Unit = {
    while (isRunning) {
      ctx.collect(s"Data from $config")
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

```

293

294

## Input Format Sources

295

296

```scala { .api }

class StreamExecutionEnvironment {

  /** Creates a stream from an arbitrary `InputFormat`.
    *
    * The wildcard is the format's input-split type, which does not affect the element type `T`.
    */
  def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]
}

```

301

302

Create streams from custom input formats:

303

304

```scala

import org.apache.flink.api.common.io.InputFormat

// Custom input format skeleton. MyRecord and DatabaseInputSplit are placeholders for
// your own record type and InputSplit implementation.
class DatabaseInputFormat extends InputFormat[MyRecord, DatabaseInputSplit] {
  // Implement configure, getStatistics, createInputSplits, getInputSplitAssigner,
  // open, reachedEnd, nextRecord and close here.
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val dbStream = env.createInput(new DatabaseInputFormat)

```

315

316

## Source Configuration and Best Practices

317

318

### Parallelism Control

319

320

```scala

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Parallelism > 1 is only allowed for sources implementing ParallelSourceFunction
// (e.g. extending RichParallelSourceFunction). Calling setParallelism(4) on a plain
// SourceFunction makes Flink throw an IllegalArgumentException.
val source = env.addSource(new CustomSource) // CustomSource: assumed to be a ParallelSourceFunction
  .setParallelism(4) // 4 parallel source instances

// The socket source is non-parallel: it always runs with parallelism 1,
// and any other value is rejected. Setting 1 explicitly is allowed but redundant.
val socketSource = env.socketTextStream("localhost", 9999)
  .setParallelism(1)

```

331

332

### Source Watermark Assignment

333

334

```scala

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Uses each event's own timestamp and emits watermarks that trail the highest timestamp
// seen so far by 5 seconds, tolerating that much out-of-order arrival.
// (Newer Flink releases deprecate this extractor in favour of
// WatermarkStrategy.forBoundedOutOfOrderness.)
val fiveSecondLateness = new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(5)) {
  override def extractTimestamp(element: MyEvent): Long = element.timestamp
}

// TimestampedSource and MyEvent are placeholders for your own source and event type
val timestampedStream = env
  .addSource(new TimestampedSource)
  .assignTimestampsAndWatermarks(fiveSecondLateness)

```

348

349

## Complete Example: Multi-Source Application

350

351

```scala

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object MultiSourceExample {

  /** Parses a "user,action" line into a (user, action) pair.
    * Lines with fewer than two comma-separated fields are dropped instead of failing the job.
    */
  private def parseEvent(line: String): Seq[(String, String)] =
    line.split(",") match {
      case Array(user, action, _*) => Seq((user, action))
      case _                       => Seq.empty
    }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Static data source
    val staticData = env.fromElements(
      ("user1", "login"),
      ("user2", "logout"),
      ("user3", "login")
    )

    // File-based source: one "user,action" record per line
    val fileData = env.readTextFile("/path/to/logs.txt").flatMap(parseEvent _)

    // Socket-based real-time source: one "user,action" record per line
    val realtimeData = env.socketTextStream("localhost", 9999).flatMap(parseEvent _)

    // Union all sources (each is a DataStream[(String, String)])
    val allData = staticData
      .union(fileData)
      .union(realtimeData)

    // Count events per user in 5-minute windows.
    // timeWindow(...) follows the environment's time characteristic (event time by default
    // since Flink 1.12), and this job assigns no watermarks, so such windows would never fire.
    // An explicit processing-time assigner avoids that.
    allData
      .map(event => (event._1, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
      .sum(1)
      .print()

    env.execute("Multi-Source Streaming Application")
  }
}

```