0
# Sinks and Output
1
2
Output operations for writing processed data to external systems, monitoring stream results, and collecting data for analysis. This includes various sink types, output formats, and result collection methods.
3
4
## Capabilities
5
6
### Basic Output Operations
7
8
Simple output operations for debugging and monitoring.
9
10
```scala { .api }
class DataStream[T] {

  // ---- standard output -------------------------------------------------

  /**
   * Writes every element of this stream to standard output.
   *
   * @return the [[DataStreamSink]] created for this output, for further configuration
   */
  def print(): DataStreamSink[T]

  /**
   * Writes every element of this stream to standard output, tagging each line.
   *
   * @param sinkIdentifier label prefixed to every printed line
   * @return the [[DataStreamSink]] created for this output, for further configuration
   */
  def print(sinkIdentifier: String): DataStreamSink[T]

  // ---- standard error --------------------------------------------------

  /**
   * Writes every element of this stream to standard error.
   *
   * @return the [[DataStreamSink]] created for this output, for further configuration
   */
  def printToErr(): DataStreamSink[T]

  /**
   * Writes every element of this stream to standard error, tagging each line.
   *
   * @param sinkIdentifier label prefixed to every printed line
   * @return the [[DataStreamSink]] created for this output, for further configuration
   */
  def printToErr(sinkIdentifier: String): DataStreamSink[T]
}
```
39
40
**Usage Examples:**
41
42
```scala
import org.apache.flink.streaming.api.scala._

// The environment every sink in this example is attached to
val env = StreamExecutionEnvironment.getExecutionEnvironment

val numbers = env.fromElements(1, 2, 3, 4, 5)

// Simple print to stdout
numbers.print()

// Print with identifier (useful for multiple sinks)
numbers.print("NumberStream")

// Print to stderr
numbers.printToErr("ErrorStream")

// Sinks only declare output; nothing is printed until the job is executed
env.execute("Print Sinks Example")
```
56
57
### File Output Operations
58
59
Write stream data to files. These methods are deprecated but still available; prefer attaching a `FileSink` with `sinkTo` (see Custom Sink Functions).
60
61
```scala { .api }
class DataStream[T] {

  // ---- plain text ------------------------------------------------------

  /**
   * Writes each element as text to the file at `path`.
   *
   * Deprecated: attach a `FileSink` instead.
   *
   * @param path destination file path
   * @return the [[DataStreamSink]] backing the file output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsText(path: String): DataStreamSink[T]

  /**
   * Writes each element as text to the file at `path`, honoring `writeMode`.
   *
   * Deprecated: attach a `FileSink` instead.
   *
   * @param path      destination file path
   * @param writeMode `OVERWRITE` or `NO_OVERWRITE` for an already existing file
   * @return the [[DataStreamSink]] backing the file output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]

  // ---- CSV -------------------------------------------------------------

  /**
   * Writes each element as a CSV row to the file at `path`.
   *
   * Deprecated: attach a `FileSink` instead.
   *
   * @param path destination file path
   * @return the [[DataStreamSink]] backing the CSV output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsCsv(path: String): DataStreamSink[T]

  /**
   * Writes each element as a CSV row using caller-chosen delimiters.
   *
   * Deprecated: attach a `FileSink` instead.
   *
   * @param path           destination file path
   * @param writeMode      `OVERWRITE` or `NO_OVERWRITE` for an already existing file
   * @param rowDelimiter   string emitted between rows
   * @param fieldDelimiter string emitted between fields of a row
   * @return the [[DataStreamSink]] backing the CSV output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsCsv(
      path: String,
      writeMode: FileSystem.WriteMode,
      rowDelimiter: String,
      fieldDelimiter: String
  ): DataStreamSink[T]
}
```
105
106
### Custom Sink Functions
107
108
Attach custom sinks for specialized output handling, either as a `SinkFunction` (via `addSink`) or as an implementation of the Sink API (via `sinkTo`).
109
110
```scala { .api }
class DataStream[T] {
  /**
   * Attaches a user-defined [[SinkFunction]] that receives every element of this stream.
   *
   * @param sinkFunction SinkFunction implementation
   * @return DataStreamSink for sink configuration
   */
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]

  /**
   * Attaches a sink given as a plain Scala function, invoked once per element.
   *
   * @param fun function applied to each element for its side effect
   * @return DataStreamSink for sink configuration
   */
  def addSink(fun: T => Unit): DataStreamSink[T]

  /**
   * Attaches a sink implementing the legacy unified Sink API
   * (`org.apache.flink.api.connector.sink.Sink`, "Sink V1").
   *
   * Sink V1 is deprecated in favor of `org.apache.flink.api.connector.sink2.Sink`;
   * prefer the `sinkTo` overload taking a sink2 `Sink` for new code.
   *
   * @param sink Sink V1 implementation
   * @return DataStreamSink for sink configuration
   */
  def sinkTo(sink: org.apache.flink.api.connector.sink.Sink[T, _, _, _]): DataStreamSink[T]

  /**
   * Attaches a sink implementing the current Sink API
   * (`org.apache.flink.api.connector.sink2.Sink`, recommended), e.g. `FileSink`.
   *
   * The sink instance is passed as-is; no Scala-specific wrapper is required.
   *
   * @param sink sink2 Sink implementation
   * @return DataStreamSink for sink configuration
   */
  def sinkTo(sink: org.apache.flink.api.connector.sink2.Sink[T]): DataStreamSink[T]
}
```
141
142
**Usage Examples:**
143
144
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor2", 25.0, 2000L)
)

// Custom sink function
class LoggingSink extends SinkFunction[SensorReading] {
  override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
    println(s"Logging: ${value.sensorId} - ${value.temperature}°C at ${value.timestamp}")
    // Could write to database, send to external system, etc.
  }
}

readings.addSink(new LoggingSink)

// Using function syntax
readings.addSink(reading =>
  println(s"Processing: ${reading.sensorId} - ${reading.temperature}°C")
)

// Using the new Sink interface (example with FileSink).
// The encoder's element type must match the stream the sink is attached to:
// readings are mapped to String first, so this is a FileSink[String].
val fileSink: FileSink[String] = FileSink
  .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder[String]("UTF-8"))
  .build()

readings.map(_.toString).sinkTo(fileSink)

// NOTE: on unbounded streams in STREAMING mode, FileSink only finalizes part files on
// successful checkpoints, so enable checkpointing there (env.enableCheckpointing(...)).

// Sinks only declare output; nothing is written until the job is executed
env.execute("Custom Sinks Example")
```
182
183
### Output Format Operations
184
185
Use custom output formats for structured data writing.
186
187
```scala { .api }
class DataStream[T] {
  /**
   * Hands every element of this stream to the supplied [[OutputFormat]] for writing.
   *
   * @param format the OutputFormat that writes the records
   * @return the [[DataStreamSink]] wrapping the format, for further configuration
   */
  def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T]
}
```
197
198
### Socket Output
199
200
Write stream data to network sockets.
201
202
```scala { .api }
class DataStream[T] {
  /**
   * Sends the elements of this stream to a network socket.
   *
   * Each element is turned into bytes by `schema` before being written to `hostname:port`.
   *
   * @param hostname host to connect to
   * @param port     port on that host
   * @param schema   converts each element into the bytes that are sent
   * @return the [[DataStreamSink]] for the socket output, for further configuration
   */
  def writeToSocket(
      hostname: String,
      port: Integer,
      schema: SerializationSchema[T]
  ): DataStreamSink[T]
}
```
214
215
### Result Collection
216
217
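Collect stream results back into the client program for local processing and analysis. `executeAndCollect` triggers job execution itself, whereas `collectAsync` only registers the collection and requires the job to be started separately (for example with `env.executeAsync()`).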
218
219
```scala { .api }
class DataStream[T] {
  /**
   * Triggers execution of the job and returns an iterator over the elements of this stream.
   *
   * Iteration waits for elements as they arrive; for an unbounded stream it never ends.
   * Always close the returned iterator (e.g. in a `finally`) to release the job's resources.
   *
   * @return CloseableIterator over the stream elements
   */
  def executeAndCollect(): CloseableIterator[T]

  /**
   * Like `executeAndCollect()`, but runs the job under the given name.
   *
   * The returned iterator must be closed once it is no longer needed.
   *
   * @param jobExecutionName Name for the execution job
   * @return CloseableIterator over the stream elements
   */
  def executeAndCollect(jobExecutionName: String): CloseableIterator[T]

  /**
   * Triggers execution of the job and collects at most `limit` elements into a list.
   *
   * Returns as soon as `limit` elements were received or the stream ends, whichever comes
   * first, so it is safe to use on unbounded streams.
   *
   * @param limit Maximum number of elements to collect
   * @return List of collected elements
   */
  def executeAndCollect(limit: Int): List[T]

  /**
   * Like `executeAndCollect(limit)`, but runs the job under the given name.
   *
   * @param jobExecutionName Name for the execution job
   * @param limit Maximum number of elements to collect
   * @return List of collected elements
   */
  def executeAndCollect(jobExecutionName: String, limit: Int): List[T]

  /**
   * Registers this stream for collection WITHOUT executing the job (non-blocking).
   *
   * The returned iterator must only be used after the job has been started separately,
   * e.g. with `env.executeAsync()`. Closing the iterator cancels the job.
   *
   * @return CloseableIterator for the collected elements
   */
  def collectAsync(): CloseableIterator[T]

  /**
   * Registers this stream for collection into `collector` WITHOUT executing the job.
   *
   * Read the results from the collector's output only after the job has been started
   * (e.g. with `env.executeAsync()`).
   *
   * @param collector Collector that receives the results
   */
  def collectAsync(collector: JavaStream.Collector[T]): Unit
}
```
262
263
**Usage Examples:**
264
265
```scala
import org.apache.flink.streaming.api.scala._
// Provides .asScala for Flink's CloseableIterator, which is a java.util.Iterator
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val evenNumbers = numbers.filter(_ % 2 == 0)

// Collect all results (for small datasets only); executeAndCollect starts the job itself
val allResults = evenNumbers.executeAndCollect()
try allResults.asScala.foreach(println)
finally allResults.close()

// Collect limited results
val firstThree = evenNumbers.executeAndCollect(3)
println(s"First 3 even numbers: $firstThree")

// Async collection: collectAsync only registers the collection, so the job has to be
// started explicitly; executeAsync submits it without waiting for it to finish.
val asyncResults = evenNumbers.collectAsync()
env.executeAsync("Async Collection Example")
try {
  // Process results as they arrive
  while (asyncResults.hasNext) {
    println(s"Received: ${asyncResults.next()}")
  }
} finally {
  asyncResults.close()
}
```
288
289
### DataStreamSink Configuration
290
291
Configure sink behavior and properties.
292
293
```scala { .api }
class DataStreamSink[T] {

  // Every setter below returns this same sink so calls can be chained fluently.

  /**
   * Chooses how many parallel instances of this sink run.
   *
   * @param parallelism number of parallel sink instances
   * @return this sink
   */
  def setParallelism(parallelism: Int): DataStreamSink[T]

  /**
   * Gives the sink operator a human-readable name.
   *
   * @param name name shown for the operator
   * @return this sink
   */
  def name(name: String): DataStreamSink[T]

  /**
   * Assigns a unique identifier to this sink.
   *
   * @param uid the identifier
   * @return this sink
   */
  def uid(uid: String): DataStreamSink[T]

  /**
   * Attaches a free-form description to the sink operator.
   *
   * @param description text describing the operator
   * @return this sink
   */
  def setDescription(description: String): DataStreamSink[T]

  /**
   * Turns off operator chaining for this sink.
   *
   * @return this sink
   */
  def disableChaining(): DataStreamSink[T]

  /**
   * Places this sink into the named slot sharing group.
   *
   * @param slotSharingGroup name of the slot sharing group
   * @return this sink
   */
  def slotSharingGroup(slotSharingGroup: String): DataStreamSink[T]
}
```
337
338
**Usage Examples:**
339
340
```scala
import org.apache.flink.streaming.api.scala._

// Reuses SensorReading and LoggingSink from the Custom Sink Functions example
val env = StreamExecutionEnvironment.getExecutionEnvironment

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor2", 25.0, 2000L)
)

// Configure sink properties; each setter returns the same DataStreamSink, so calls chain
readings
  .addSink(new LoggingSink)
  .setParallelism(2)
  .name("Sensor Logging Sink")
  .uid("sensor-logging-sink-v1")
  .setDescription("Logs sensor readings to external system")

// Sinks only declare output; nothing runs until the job is executed
env.execute("Sink Configuration Example")
```
354
355
## Types
356
357
```scala { .api }
// Sink function interface
trait SinkFunction[T] {
  /**
   * Process a single element
   * @param value Element to process
   * @param context Sink context
   */
  def invoke(value: T, context: SinkFunction.Context): Unit

  trait Context {
    def currentProcessingTime(): Long
    def currentWatermark(): Long
    // Boxed java.lang.Long: null when the element carries no timestamp
    def timestamp(): java.lang.Long
  }
}

// Rich sink function with lifecycle methods
abstract class RichSinkFunction[T] extends SinkFunction[T] with RichFunction {
  override def open(parameters: Configuration): Unit = {}
  override def close(): Unit = {}
}

// Output format interface
trait OutputFormat[T] {
  def configure(parameters: Configuration): Unit
  def open(taskNumber: Int, numTasks: Int): Unit
  def writeRecord(record: T): Unit
  def close(): Unit
}

// Serialization schema for socket output
trait SerializationSchema[T] {
  def serialize(element: T): Array[Byte]
}

// File system write modes, referenced as FileSystem.WriteMode in the signatures above
// (in Flink this is the Java enum org.apache.flink.core.fs.FileSystem.WriteMode)
object WriteMode extends Enumeration {
  val NO_OVERWRITE, OVERWRITE = Value
}

// Closeable iterator for result collection (org.apache.flink.util.CloseableIterator).
// It extends java.util.Iterator, not Scala's Iterator: call hasNext/next directly or
// convert with .asScala (scala.collection.JavaConverters._). Close it when done to
// release the resources of the job behind it.
trait CloseableIterator[T] extends java.util.Iterator[T] with AutoCloseable {
  def hasNext: Boolean
  def next(): T
  def close(): Unit
}

// New Sink interface (recommended): org.apache.flink.api.connector.sink2.Sink
trait Sink[IN] {
  def createWriter(context: InitContext): SinkWriter[IN]

  trait InitContext {
    def getSubtaskId: Int
    def getNumberOfParallelSubtasks: Int
    // Empty when not restoring from a checkpoint (an OptionalLong in the Java API)
    def getRestoredCheckpointId: Option[Long]
  }
}

trait SinkWriter[IN] extends AutoCloseable {
  def write(element: IN, context: Context): Unit
  def flush(endOfInput: Boolean): Unit
  def close(): Unit

  trait Context {
    // Boxed java.lang.Long: null when the element carries no timestamp
    def timestamp(): java.lang.Long
    def currentWatermark(): Long
  }
}

// Built-in encoders
class SimpleStringEncoder[T](charset: String) extends Encoder[T] {
  def encode(element: T, stream: OutputStream): Unit
}
```