# Sources and Sinks

Sources and sinks are the entry and exit points for data in Apache Flink streaming applications. Sources ingest data from external systems, while sinks write processed results to external systems.

## Capabilities

### Built-in Sources

Predefined sources for common data-ingestion patterns, available as methods on `StreamExecutionEnvironment`.

```java { .api }
// Element-based sources
12
<T> DataStreamSource<T> fromElements(T... data);
13
<T> DataStreamSource<T> fromCollection(Collection<T> data);
14
<T> DataStreamSource<T> fromCollection(Collection<T> data, TypeInformation<T> typeInfo);
15
16
// File-based sources
17
DataStreamSource<String> readTextFile(String filePath);
18
DataStreamSource<String> readTextFile(String filePath, String charsetName);
19
DataStreamSource<String> readFile(FileInputFormat<String> inputFormat, String filePath);
20
21
// Network sources
22
DataStreamSource<String> socketTextStream(String hostname, int port);
23
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter);
24
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry);
25
26
// Sequence sources
27
DataStreamSource<Long> generateSequence(long from, long to);
28
DataStreamSource<Long> fromSequence(long from, long to);
29
30
// Custom sources
31
<T> DataStreamSource<T> addSource(SourceFunction<T> function);
32
<T> DataStreamSource<T> addSource(SourceFunction<T> function, String sourceName);
33
<T> DataStreamSource<T> addSource(SourceFunction<T> function, TypeInformation<T> typeInfo);
34
```

**Usage Examples:**

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
40
41
// From elements
42
DataStream<String> words = env.fromElements("hello", "world", "flink");
43
44
// From collection
45
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
46
DataStream<Integer> numberStream = env.fromCollection(numbers);
47
48
// From file
49
DataStream<String> fileStream = env.readTextFile("/path/to/input.txt");
50
51
// Socket stream
52
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
53
54
// Sequence
55
DataStream<Long> sequence = env.generateSequence(1, 1000000);
56
57
// Custom source
58
DataStream<Event> eventStream = env.addSource(new CustomEventSource());
59
```

### Custom Source Functions

Implement custom sources using the `SourceFunction` interface.

```java { .api }
/**
67
* Interface for source functions
68
*/
69
interface SourceFunction<T> extends Function {
70
/**
71
* Main method to emit elements
72
* @param ctx - source context for emitting elements
73
*/
74
void run(SourceContext<T> ctx) throws Exception;
75
76
/**
77
* Cancel the source
78
*/
79
void cancel();
80
81
/**
82
* Source context for emitting elements
83
*/
84
interface SourceContext<T> {
85
void collect(T element);
86
void collectWithTimestamp(T element, long timestamp);
87
void emitWatermark(Watermark mark);
88
void markAsTemporarilyIdle();
89
Object getCheckpointLock();
90
void close();
91
}
92
}
93
94
/**
95
* Rich source function with lifecycle methods
96
*/
97
abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T> {
98
// Inherits open(), close(), getRuntimeContext()
99
}
100
101
/**
102
* Rich parallel source function for parallel sources
103
*/
104
abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T> {
105
// Can run in parallel with multiple instances
106
}
107
```

**Usage Examples:**

```java
// Simple custom source
113
class NumberSource implements SourceFunction<Integer> {
114
private volatile boolean running = true;
115
private int counter = 0;
116
117
@Override
118
public void run(SourceContext<Integer> ctx) throws Exception {
119
while (running && counter < 1000) {
120
synchronized (ctx.getCheckpointLock()) {
121
ctx.collect(counter++);
122
}
123
Thread.sleep(100);
124
}
125
}
126
127
@Override
128
public void cancel() {
129
running = false;
130
}
131
}
132
133
// Rich source with state
134
class StatefulSource extends RichSourceFunction<Event> {
135
private ListState<Long> offsetState;
136
private volatile boolean running = true;
137
138
@Override
139
public void open(Configuration parameters) throws Exception {
140
super.open(parameters);
141
ListStateDescriptor<Long> descriptor =
142
new ListStateDescriptor<>("offset", Long.class);
143
offsetState = getRuntimeContext().getListState(descriptor);
144
}
145
146
@Override
147
public void run(SourceContext<Event> ctx) throws Exception {
148
// Restore offset from state
149
long offset = 0;
150
for (Long o : offsetState.get()) {
151
offset = o;
152
}
153
154
while (running) {
155
// Emit event with timestamp
156
Event event = fetchNextEvent(offset);
157
ctx.collectWithTimestamp(event, event.getTimestamp());
158
159
// Emit watermark
160
ctx.emitWatermark(new Watermark(event.getTimestamp() - 5000));
161
162
offset++;
163
}
164
}
165
166
@Override
167
public void snapshotState(FunctionSnapshotContext context) throws Exception {
168
offsetState.clear();
169
offsetState.add(currentOffset);
170
}
171
172
@Override
173
public void cancel() {
174
running = false;
175
}
176
}
177
```

### Built-in Sinks

Predefined sinks for common output patterns, available as methods on `DataStream<T>`.

```java { .api }
// Console output
185
DataStreamSink<T> print();
186
DataStreamSink<T> print(String sinkIdentifier);
187
DataStreamSink<T> printToErr();
188
DataStreamSink<T> printToErr(String sinkIdentifier);
189
190
// File output
191
DataStreamSink<T> writeAsText(String path);
192
DataStreamSink<T> writeAsText(String path, WriteMode writeMode);
193
DataStreamSink<T> writeAsCsv(String path);
194
DataStreamSink<T> writeAsCsv(String path, WriteMode writeMode);
195
DataStreamSink<T> writeAsCsv(String path, String rowDelimiter, String fieldDelimiter);
196
197
// Socket output
198
DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema);
199
200
// Custom sinks
201
DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
202
DataStreamSink<T> addSink(SinkFunction<T> sinkFunction, String name);
203
```

**Usage Examples:**

```java
DataStream<String> result = processedStream;

// Print to console
result.print();
result.print("MyOutput");

// Write to file
result.writeAsText("/path/to/output.txt");
result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE);

// Write CSV (tupleStream must contain Tuple elements); custom delimiters
// are only accepted together with an explicit WriteMode
tupleStream.writeAsCsv("/path/to/output.csv", WriteMode.NO_OVERWRITE, "\n", ",");

// Socket output
result.writeToSocket("localhost", 9999, new SimpleStringSchema());

// Custom sink
result.addSink(new CustomSink());
```

### Custom Sink Functions

Implement custom sinks using the `SinkFunction` interface.

```java { .api }
/**
234
* Interface for sink functions
235
*/
236
interface SinkFunction<IN> extends Function {
237
/**
238
* Process each element
239
* @param value - input element
240
* @param context - sink context
241
*/
242
default void invoke(IN value, Context context) throws Exception {
243
invoke(value);
244
}
245
246
/**
247
* Simple invoke method (deprecated in favor of invoke with context)
248
* @param value - input element
249
*/
250
default void invoke(IN value) throws Exception {}
251
252
/**
253
* Sink context interface
254
*/
255
interface Context {
256
long currentProcessingTime();
257
long currentWatermark();
258
Long timestamp();
259
}
260
}
261
262
/**
263
* Rich sink function with lifecycle methods
264
*/
265
abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
266
// Inherits open(), close(), getRuntimeContext()
267
}
268
```

**Usage Examples:**

```java
// Simple custom sink
274
class DatabaseSink implements SinkFunction<Event> {
275
private transient Connection connection;
276
277
@Override
278
public void invoke(Event event, Context context) throws Exception {
279
if (connection == null) {
280
connection = DriverManager.getConnection("jdbc:...");
281
}
282
283
PreparedStatement stmt = connection.prepareStatement(
284
"INSERT INTO events (id, value, timestamp) VALUES (?, ?, ?)"
285
);
286
stmt.setString(1, event.getId());
287
stmt.setString(2, event.getValue());
288
stmt.setLong(3, event.getTimestamp());
289
stmt.executeUpdate();
290
}
291
}
292
293
// Rich sink with connection pooling
294
class PooledDatabaseSink extends RichSinkFunction<Event> {
295
private transient ConnectionPool pool;
296
297
@Override
298
public void open(Configuration parameters) throws Exception {
299
super.open(parameters);
300
pool = new ConnectionPool();
301
}
302
303
@Override
304
public void invoke(Event event, Context context) throws Exception {
305
try (Connection conn = pool.getConnection()) {
306
// Insert logic
307
}
308
}
309
310
@Override
311
public void close() throws Exception {
312
if (pool != null) {
313
pool.close();
314
}
315
super.close();
316
}
317
}
318
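```

### Streaming File Sink

An advanced file sink for streaming applications that provides exactly-once guarantees. Part files are finalized only on successful checkpoints, so checkpointing must be enabled both for output to become visible and for the exactly-once guarantee to hold.

```java { .api }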
/**
326
* File sink for streaming applications
327
*/
328
class StreamingFileSink<IN> implements SinkFunction<IN> {
329
/**
330
* Create row format builder for text-based files
331
*/
332
static <IN> StreamingFileSink.RowFormatBuilder<IN, String> forRowFormat(
333
Path basePath,
334
Encoder<IN> encoder
335
);
336
337
/**
338
* Create bulk format builder for columnar formats (Parquet, ORC)
339
*/
340
static <IN> StreamingFileSink.BulkFormatBuilder<IN, IN> forBulkFormat(
341
Path basePath,
342
BulkWriter.Factory<IN> writerFactory
343
);
344
}
345
```

**Usage Examples:**

```java
// Row format (text files)
351
StreamingFileSink<String> textSink = StreamingFileSink
352
.forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
353
.withRollingPolicy(DefaultRollingPolicy.builder()
354
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
355
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
356
.withMaxPartSize(1024 * 1024 * 1024)
357
.build())
358
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd--HH"))
359
.build();
360
361
dataStream.addSink(textSink);
362
363
// Bulk format (Parquet)
364
StreamingFileSink<Event> parquetSink = StreamingFileSink
365
.forBulkFormat(
366
new Path("/path/to/output"),
367
ParquetAvroWriters.forReflectRecord(Event.class)
368
)
369
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
370
.build();
371
372
eventStream.addSink(parquetSink);
373
```

## Types

### Source Function Types

```java { .api }
// Base source function
381
interface SourceFunction<T> extends Function {
382
void run(SourceContext<T> ctx) throws Exception;
383
void cancel();
384
385
interface SourceContext<T> {
386
void collect(T element);
387
void collectWithTimestamp(T element, long timestamp);
388
void emitWatermark(Watermark mark);
389
void markAsTemporarilyIdle();
390
Object getCheckpointLock();
391
void close();
392
}
393
}
394
395
// Parallel source function
396
interface ParallelSourceFunction<T> extends SourceFunction<T> {
397
// Marker interface for sources that can run in parallel
398
}
399
400
// Rich source functions
401
abstract class RichSourceFunction<T> extends AbstractRichFunction implements SourceFunction<T>;
402
abstract class RichParallelSourceFunction<T> extends RichSourceFunction<T> implements ParallelSourceFunction<T>;
403
404
// Checkpointed source function
405
interface CheckpointedFunction {
406
void snapshotState(FunctionSnapshotContext context) throws Exception;
407
void initializeState(FunctionInitializationContext context) throws Exception;
408
}
409
```

### Sink Function Types

```java { .api }
// Base sink function
415
interface SinkFunction<IN> extends Function {
416
default void invoke(IN value, Context context) throws Exception;
417
default void invoke(IN value) throws Exception;
418
419
interface Context {
420
long currentProcessingTime();
421
long currentWatermark();
422
Long timestamp();
423
}
424
}
425
426
// Rich sink function
427
abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN>;
428
429
// Two-phase commit sink function for exactly-once semantics
430
abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
431
implements CheckpointedFunction, CheckpointListener {
432
433
abstract TXN beginTransaction() throws Exception;
434
abstract void preCommit(TXN transaction) throws Exception;
435
abstract void commit(TXN transaction);
436
abstract void abort(TXN transaction);
437
}
438
```

### Utility Types

```java { .api }
// Write modes (FileSystem.WriteMode)
enum WriteMode {
    NO_OVERWRITE, // Fail if file exists
    OVERWRITE     // Overwrite existing files
}

// Watermark
450
class Watermark implements Serializable {
451
public Watermark(long timestamp);
452
long getTimestamp();
453
}
454
455
// Encoders for StreamingFileSink (row formats): write one element to the part-file stream
interface Encoder<IN> extends Serializable {
    void encode(IN element, OutputStream stream) throws IOException;
}

class SimpleStringEncoder<IN> implements Encoder<IN> {
461
public SimpleStringEncoder();
462
public SimpleStringEncoder(String charset);
463
}
464
465
// Bucket assigners
466
interface BucketAssigner<IN, BucketID> extends Serializable {
467
BucketID getBucketId(IN element, Context context);
468
469
interface Context {
470
long currentProcessingTime();
471
long currentWatermark();
472
Long timestamp();
473
}
474
}
475
476
class DateTimeBucketAssigner<IN> implements BucketAssigner<IN, String> {
477
public DateTimeBucketAssigner(String formatString);
478
public DateTimeBucketAssigner(String formatString, ZoneId zoneId);
479
}
480
```