or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md, checkpointing.md, datastream-transformations.md, execution-environment.md, index.md, keyed-streams-state.md, process-functions.md, sources-sinks.md, time-watermarks.md, windowing.md

docs/sources-sinks.md

0

# Sources and Sinks

1

2

Sources and sinks are the entry and exit points for data in Apache Flink streaming applications. Sources ingest data from external systems, while sinks output processed results to external systems.

3

4

## Capabilities

5

6

### Built-in Sources

7

8

Pre-defined sources for common data ingestion patterns.

9

10

```java { .api }

11

// Element-based sources

12

<T> DataStreamSource<T> fromElements(T... data);

13

<T> DataStreamSource<T> fromCollection(Collection<T> data);

14

<T> DataStreamSource<T> fromCollection(Collection<T> data, TypeInformation<T> typeInfo);

15

16

// File-based sources

17

DataStreamSource<String> readTextFile(String filePath);

18

DataStreamSource<String> readTextFile(String filePath, String charsetName);

19

DataStreamSource<String> readFile(FileInputFormat<String> inputFormat, String filePath);

20

21

// Network sources

22

DataStreamSource<String> socketTextStream(String hostname, int port);

23

DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter);

24

DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry);

25

26

// Sequence sources

27

DataStreamSource<Long> generateSequence(long from, long to);

28

DataStreamSource<Long> fromSequence(long from, long to);

29

30

// Custom sources

31

<T> DataStreamSource<T> addSource(SourceFunction<T> function);

32

<T> DataStreamSource<T> addSource(SourceFunction<T> function, String sourceName);

33

<T> DataStreamSource<T> addSource(SourceFunction<T> function, TypeInformation<T> typeInfo);

34

```

35

36

**Usage Examples:**

37

38

```java

39

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

40

41

// From elements

42

DataStream<String> words = env.fromElements("hello", "world", "flink");

43

44

// From collection

45

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

46

DataStream<Integer> numberStream = env.fromCollection(numbers);

47

48

// From file

49

DataStream<String> fileStream = env.readTextFile("/path/to/input.txt");

50

51

// Socket stream

52

DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

53

54

// Sequence

55

DataStream<Long> sequence = env.generateSequence(1, 1000000);

56

57

// Custom source

58

DataStream<Event> eventStream = env.addSource(new CustomEventSource());

59

```

60

61

### Custom Source Functions

62

63

Implement custom sources using the `SourceFunction` interface.

64

65

```java { .api }

66

/**

67

* Interface for source functions

68

*/

69

interface SourceFunction<T> extends Function {

70

/**

71

* Main method to emit elements

72

* @param ctx - source context for emitting elements

73

*/

74

void run(SourceContext<T> ctx) throws Exception;

75

76

/**

77

* Cancel the source

78

*/

79

void cancel();

80

81

/**

82

* Source context for emitting elements

83

*/

84

interface SourceContext<T> {

85

void collect(T element);

86

void collectWithTimestamp(T element, long timestamp);

87

void emitWatermark(Watermark mark);

88

void markAsTemporarilyIdle();

89

Object getCheckpointLock();

90

void close();

91

}

92

}

93

94

/**

95

* Rich source function with lifecycle methods

96

*/

97

abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T> {

98

// Inherits open(), close(), getRuntimeContext()

99

}

100

101

/**

102

* Rich parallel source function for parallel sources

103

*/

104

abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T> {

105

// Can run in parallel with multiple instances

106

}

107

```

108

109

**Usage Examples:**

110

111

```java

112

// Simple custom source

113

class NumberSource implements SourceFunction<Integer> {

114

private volatile boolean running = true;

115

private int counter = 0;

116

117

@Override

118

public void run(SourceContext<Integer> ctx) throws Exception {

119

while (running && counter < 1000) {

120

synchronized (ctx.getCheckpointLock()) {

121

ctx.collect(counter++);

122

}

123

Thread.sleep(100);

124

}

125

}

126

127

@Override

128

public void cancel() {

129

running = false;

130

}

131

}

132

133

// Rich source with state

134

class StatefulSource extends RichSourceFunction<Event> {

135

private ListState<Long> offsetState;

136

private volatile boolean running = true;

137

138

@Override

139

public void open(Configuration parameters) throws Exception {

140

super.open(parameters);

141

ListStateDescriptor<Long> descriptor =

142

new ListStateDescriptor<>("offset", Long.class);

143

offsetState = getRuntimeContext().getListState(descriptor);

144

}

145

146

@Override

147

public void run(SourceContext<Event> ctx) throws Exception {

148

// Restore offset from state

149

long offset = 0;

150

for (Long o : offsetState.get()) {

151

offset = o;

152

}

153

154

while (running) {

155

// Emit event with timestamp

156

Event event = fetchNextEvent(offset);

157

ctx.collectWithTimestamp(event, event.getTimestamp());

158

159

// Emit watermark

160

ctx.emitWatermark(new Watermark(event.getTimestamp() - 5000));

161

162

offset++;

163

}

164

}

165

166

@Override

167

public void snapshotState(FunctionSnapshotContext context) throws Exception {

168

offsetState.clear();

169

offsetState.add(currentOffset);

170

}

171

172

@Override

173

public void cancel() {

174

running = false;

175

}

176

}

177

```

178

179

### Built-in Sinks

180

181

Pre-defined sinks for common output patterns.

182

183

```java { .api }

184

// Console output

185

DataStreamSink<T> print();

186

DataStreamSink<T> print(String sinkIdentifier);

187

DataStreamSink<T> printToErr();

188

DataStreamSink<T> printToErr(String sinkIdentifier);

189

190

// File output

191

DataStreamSink<T> writeAsText(String path);

192

DataStreamSink<T> writeAsText(String path, WriteMode writeMode);

193

DataStreamSink<T> writeAsCsv(String path);

194

DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode);

195

DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter);

196

197

// Socket output

198

DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema);

199

200

// Custom sinks

201

DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);

202

DataStreamSink<T> addSink(SinkFunction<T> sinkFunction, String name);

203

```

204

205

**Usage Examples:**

206

207

```java

208

DataStream<String> result = processedStream;

// Print to console
result.print();
result.print("MyOutput");

// Write to file
result.writeAsText("/path/to/output.txt");
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE);

// Write CSV: custom row/field delimiters are only accepted by the overload
// that also takes a WriteMode
tupleStream.writeAsCsv("/path/to/output.csv", WriteMode.NO_OVERWRITE, "\n", ",");

// Socket output
result.writeToSocket("localhost", 9999, new SimpleStringSchema());

// Custom sink
result.addSink(new CustomSink());

226

```

227

228

### Custom Sink Functions

229

230

Implement custom sinks using the `SinkFunction` interface.

231

232

```java { .api }

233

/**

234

* Interface for sink functions

235

*/

236

interface SinkFunction<IN> extends Function {

237

/**

238

* Process each element

239

* @param value - input element

240

* @param context - sink context

241

*/

242

default void invoke(IN value, Context context) throws Exception {

243

invoke(value);

244

}

245

246

/**

247

* Simple invoke method (deprecated in favor of invoke with context)

248

* @param value - input element

249

*/

250

default void invoke(IN value) throws Exception {}

251

252

/**

253

* Sink context interface

254

*/

255

interface Context {

256

long currentProcessingTime();

257

long currentWatermark();

258

Long timestamp();

259

}

260

}

261

262

/**

263

* Rich sink function with lifecycle methods

264

*/

265

abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

266

// Inherits open(), close(), getRuntimeContext()

267

}

268

```

269

270

**Usage Examples:**

271

272

```java

273

// Simple custom sink

274

class DatabaseSink implements SinkFunction<Event> {

275

private transient Connection connection;

276

277

@Override

278

public void invoke(Event event, Context context) throws Exception {

279

if (connection == null) {

280

connection = DriverManager.getConnection("jdbc:...");

281

}

282

283

PreparedStatement stmt = connection.prepareStatement(

284

"INSERT INTO events (id, value, timestamp) VALUES (?, ?, ?)"

285

);

286

stmt.setString(1, event.getId());

287

stmt.setString(2, event.getValue());

288

stmt.setLong(3, event.getTimestamp());

289

stmt.executeUpdate();

290

}

291

}

292

293

// Rich sink with connection pooling

294

class PooledDatabaseSink extends RichSinkFunction<Event> {

295

private transient ConnectionPool pool;

296

297

@Override

298

public void open(Configuration parameters) throws Exception {

299

super.open(parameters);

300

pool = new ConnectionPool();

301

}

302

303

@Override

304

public void invoke(Event event, Context context) throws Exception {

305

try (Connection conn = pool.getConnection()) {

306

// Insert logic

307

}

308

}

309

310

@Override

311

public void close() throws Exception {

312

if (pool != null) {

313

pool.close();

314

}

315

super.close();

316

}

317

}

318

```

319

320

### Streaming File Sink

321

322

Advanced file sink for streaming applications with exactly-once guarantees.

323

324

```java { .api }

325

/**

326

* File sink for streaming applications

327

*/

328

class StreamingFileSink<IN> implements SinkFunction<IN> {

329

/**

330

* Create row format builder for text-based files

331

*/

332

static <IN> StreamingFileSink.RowFormatBuilder<IN, String> forRowFormat(

333

Path basePath,

334

Encoder<IN> encoder

335

);

336

337

/**

338

* Create bulk format builder for columnar formats (Parquet, ORC)

339

*/

340

static <IN> StreamingFileSink.BulkFormatBuilder<IN, IN> forBulkFormat(

341

Path basePath,

342

BulkWriter.Factory<IN> writerFactory

343

);

344

}

345

```

346

347

**Usage Examples:**

348

349

```java

350

// Row format (text files)

351

// Roll a part file after 15 minutes, after 5 minutes without writes, or at 1 GiB
DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
    .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
    .withMaxPartSize(1024 * 1024 * 1024)
    .build();

Path textOutputPath = new Path("/path/to/output");
Encoder<String> utf8Encoder = new SimpleStringEncoder<String>("UTF-8");

StreamingFileSink<String> textSink = StreamingFileSink
    .forRowFormat(textOutputPath, utf8Encoder)
    .withRollingPolicy(rollingPolicy)
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
    .build();

dataStream.addSink(textSink);

// Bulk format (Parquet)
Path parquetOutputPath = new Path("/path/to/output");

StreamingFileSink<Event> parquetSink = StreamingFileSink
    .forBulkFormat(parquetOutputPath, ParquetAvroWriters.forReflectRecord(Event.class))
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
    .build();

eventStream.addSink(parquetSink);

373

```

374

375

## Types

376

377

### Source Function Types

378

379

```java { .api }

380

// Base source function

381

interface SourceFunction<T> extends Function {

382

void run(SourceContext<T> ctx) throws Exception;

383

void cancel();

384

385

interface SourceContext<T> {

386

void collect(T element);

387

void collectWithTimestamp(T element, long timestamp);

388

void emitWatermark(Watermark mark);

389

void markAsTemporarilyIdle();

390

Object getCheckpointLock();

391

void close();

392

}

393

}

394

395

// Parallel source function

396

interface ParallelSourceFunction<T> extends SourceFunction<T> {

397

// Marker interface for sources that can run in parallel

398

}

399

400

// Rich source functions

401

abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T>;

402

abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T>;

403

404

// Checkpointed source function

405

interface CheckpointedFunction {

406

void snapshotState(FunctionSnapshotContext context) throws Exception;

407

void initializeState(FunctionInitializationContext context) throws Exception;

408

}

409

```

410

411

### Sink Function Types

412

413

```java { .api }

414

// Base sink function

415

interface SinkFunction<IN> extends Function {

416

default void invoke(IN value, Context context) throws Exception;

417

default void invoke(IN value) throws Exception;

418

419

interface Context {

420

long currentProcessingTime();

421

long currentWatermark();

422

Long timestamp();

423

}

424

}

425

426

// Rich sink function

427

abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN>;

428

429

// Two-phase commit sink function for exactly-once semantics

430

abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>

431

implements CheckpointedFunction, CheckpointListener {

432

433

abstract TXN beginTransaction() throws Exception;

434

abstract void preCommit(TXN transaction) throws Exception;

435

abstract void commit(TXN transaction);

436

abstract void abort(TXN transaction);

437

}

438

```

439

440

### Utility Types

441

442

```java { .api }

443

// Write modes

444

enum WriteMode {

445

NO_OVERWRITE, // Fail if file exists

446

OVERWRITE // Overwrite existing files

447

}

448

449

// Watermark

450

class Watermark implements Serializable {

451

public Watermark(long timestamp);

452

long getTimestamp();

453

}

454

455

// Encoders for StreamingFileSink

456

interface Encoder<IN> extends Serializable {

457

void encode(IN element, OutputStream stream) throws IOException;

458

}

459

460

class SimpleStringEncoder<IN> implements Encoder<IN> {

461

public SimpleStringEncoder();

462

public SimpleStringEncoder(String charset);

463

}

464

465

// Bucket assigners

466

interface BucketAssigner<IN, BucketID> extends Serializable {

467

BucketID getBucketId(IN element, Context context);

468

469

interface Context {

470

long currentProcessingTime();

471

long currentWatermark();

472

Long timestamp();

473

}

474

}

475

476

class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {

477

public DateTimeBucketAssigner(String formatString);

478

public DateTimeBucketAssigner(String formatString, ZoneId zoneId);

479

}

480

```