or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.mddata-streams.mdexecution-environment.mdindex.mdkeyed-streams.mdprocessing-functions.mdsinks-output.mdstream-connections.mdwindow-functions.mdwindowing.md

sinks-output.mddocs/

0

# Sinks and Output

1

2

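Output operations for writing processed data to external systems, monitoring stream results, and collecting data for analysis. This includes various sink types, output formats, and result collection methods. Sinks are lazy: defining one only adds it to the job graph, and nothing is written until the job runs (for example via `env.execute()`; the `executeAndCollect` methods submit the job themselves).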

3

4

## Capabilities

5

6

### Basic Output Operations

7

8

Simple output operations for debugging and monitoring.

9

10

```scala { .api }

11

class DataStream[T] {

  /**
   * Prints each element of this stream to standard output, using the element's `toString`.
   *
   * Printing happens inside the running job, i.e. on the stdout of the worker (TaskManager)
   * that executes the sink; it appears in the local console only when running locally.
   * Nothing is printed until the job is executed (e.g. `env.execute()`).
   *
   * @return DataStreamSink for output configuration
   */
  def print(): DataStreamSink[T]

  /**
   * Prints each element of this stream to standard output, prefixing every line with
   * the given identifier so the output of several print sinks can be told apart.
   *
   * @param sinkIdentifier Identifier to prefix output lines
   * @return DataStreamSink for output configuration
   */
  def print(sinkIdentifier: String): DataStreamSink[T]

  /**
   * Prints each element of this stream to standard error, using the element's `toString`.
   *
   * As with `print()`, output is written by the worker executing the sink once the job runs.
   *
   * @return DataStreamSink for output configuration
   */
  def printToErr(): DataStreamSink[T]

  /**
   * Prints each element of this stream to standard error, prefixing every line with
   * the given identifier.
   *
   * @param sinkIdentifier Identifier to prefix output lines
   * @return DataStreamSink for output configuration
   */
  def printToErr(sinkIdentifier: String): DataStreamSink[T]
}

38

```

39

40

**Usage Examples:**

41

42

```scala

43

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val numbers = env.fromElements(1, 2, 3, 4, 5)

// Simple print to stdout
numbers.print()

// Print with identifier (useful for multiple sinks)
numbers.print("NumberStream")

// Print to stderr
numbers.printToErr("ErrorStream")

// Sinks are lazy: nothing is printed until the job is executed.
// Output goes to the stdout/stderr of the worker running the sink,
// which is the local console only when running locally.
env.execute("Print Example")

55

```

56

57

### File Output Operations

58

59

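Write stream data to files. These methods are deprecated: they do not participate in Flink's checkpointing, so they offer no exactly-once guarantees. Prefer `FileSink` with `sinkTo` (see Custom Sink Functions below).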

60

61

```scala { .api }

62

class DataStream[T] {

  /**
   * Writes each element as one line of text (its `toString`) to the given path.
   *
   * Deprecated: the `write*` methods do not participate in Flink's checkpointing and
   * therefore give no exactly-once guarantees. Use `FileSink` with `sinkTo` instead.
   *
   * @param path Output file path
   * @return DataStreamSink for file output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsText(path: String): DataStreamSink[T]

  /**
   * Writes each element as one line of text (its `toString`) to the given path,
   * using the given write mode for existing files.
   *
   * @param path      Output file path
   * @param writeMode `FileSystem.WriteMode.OVERWRITE` replaces an existing file,
   *                  `FileSystem.WriteMode.NO_OVERWRITE` fails if the file already exists
   * @return DataStreamSink for file output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T]

  /**
   * Writes each element as one CSV row to the given path, using the default
   * delimiters (newline between rows, comma between fields).
   *
   * Intended for tuple-like element types, whose fields become the CSV columns.
   *
   * @param path Output file path
   * @return DataStreamSink for CSV output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsCsv(path: String): DataStreamSink[T]

  /**
   * Writes each element as one CSV row to the given path with custom delimiters.
   *
   * @param path           Output file path
   * @param writeMode      Behavior when the file already exists (OVERWRITE / NO_OVERWRITE)
   * @param rowDelimiter   String written between rows (e.g. "\n")
   * @param fieldDelimiter String written between fields (e.g. ",")
   * @return DataStreamSink for CSV output
   */
  @deprecated("Use FileSink instead", "1.19.0")
  def writeAsCsv(
      path: String,
      writeMode: FileSystem.WriteMode,
      rowDelimiter: String,
      fieldDelimiter: String
  ): DataStreamSink[T]
}

104

```

105

106

### Custom Sink Functions

107

108

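Attach custom sinks for specialized output handling, either through the legacy `SinkFunction` interface (`addSink`) or through the unified `Sink` interface (`sinkTo`), which is recommended for new code.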

109

110

```scala { .api }

111

class DataStream[T] {

  /**
   * Adds a sink implemented with the legacy `SinkFunction` interface.
   *
   * The function's `invoke` is called once for every element that reaches the sink.
   * For new code prefer `sinkTo` with an `org.apache.flink.api.connector.sink2.Sink`.
   *
   * @param sinkFunction SinkFunction implementation
   * @return DataStreamSink for sink configuration
   */
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]

  /**
   * Adds a sink from a plain Scala function that is invoked once per element.
   *
   * The function is shipped to the workers executing the job, so it must only
   * capture serializable state.
   *
   * @param fun Function to process elements
   * @return DataStreamSink for sink configuration
   */
  def addSink(fun: T => Unit): DataStreamSink[T]

  /**
   * Adds a sink implemented with the legacy Sink V1 interface
   * (`org.apache.flink.api.connector.sink.Sink`).
   *
   * Kept for existing connectors; new sinks should implement the Sink V2 interface
   * (`org.apache.flink.api.connector.sink2.Sink`) and use the other `sinkTo` overload.
   *
   * @param sink Sink V1 implementation
   * @return DataStreamSink for sink configuration
   */
  def sinkTo(sink: org.apache.flink.api.connector.sink.Sink[T, _, _, _]): DataStreamSink[T]

  /**
   * Adds a sink implemented with the unified Sink V2 interface
   * (`org.apache.flink.api.connector.sink2.Sink`), e.g. `FileSink`.
   *
   * This is the recommended way to write stream data to external systems.
   *
   * @param sink Sink V2 implementation
   * @return DataStreamSink for sink configuration
   */
  def sinkTo(sink: org.apache.flink.api.connector.sink2.Sink[T]): DataStreamSink[T]
}

140

```

141

142

**Usage Examples:**

143

144

```scala

145

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.connector.file.sink.FileSink
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

case class SensorReading(sensorId: String, temperature: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor2", 25.0, 2000L)
)

// Custom sink function: invoke() runs on the workers, once per element
class LoggingSink extends SinkFunction[SensorReading] {
  override def invoke(value: SensorReading, context: SinkFunction.Context): Unit = {
    println(s"Logging: ${value.sensorId} - ${value.temperature}°C at ${value.timestamp}")
    // Could write to database, send to external system, etc.
  }
}

readings.addSink(new LoggingSink)

// Using function syntax
readings.addSink(reading =>
  println(s"Processing: ${reading.sensorId} - ${reading.temperature}°C")
)

// Using the new Sink interface (example with FileSink).
// The encoder's type must match the element type of the stream the sink is attached to:
// the readings are mapped to String first, so the encoder is a SimpleStringEncoder[String].
// In STREAMING mode FileSink only finalizes part files on successful checkpoints,
// so real jobs should enable checkpointing (e.g. env.enableCheckpointing(10000)).
val fileSink: FileSink[String] = FileSink
  .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder[String]("UTF-8"))
  .build()

readings.map(_.toString).sinkTo(fileSink)

// Sinks only run once the job is executed
env.execute("Sink Examples")

181

```

182

183

### Output Format Operations

184

185

Use custom output formats for structured data writing.

186

187

```scala { .api }

188

class DataStream[T] {

  /**
   * Writes every element of this stream through the given `OutputFormat`.
   *
   * Each parallel sink instance opens its own copy of the format and calls
   * `writeRecord` once per element. The output does not participate in Flink's
   * checkpointing, so it gives no exactly-once guarantees; prefer `sinkTo` with a
   * Sink V2 connector for fault-tolerant output.
   *
   * @param format OutputFormat implementation
   * @return DataStreamSink for output configuration
   */
  def writeUsingOutputFormat(format: OutputFormat[T]): DataStreamSink[T]
}

196

```

197

198

### Socket Output

199

200

Write stream data to network sockets.

201

202

```scala { .api }

203

class DataStream[T] {

  /**
   * Writes every element of this stream to a network socket.
   *
   * Each element is turned into bytes by `schema` and sent to `hostname:port`;
   * the schema alone defines the wire format (no framing is added here).
   *
   * @param hostname Target hostname
   * @param port     Target port
   * @param schema   Serialization schema that converts each element to bytes
   * @return DataStreamSink for socket output
   */
  def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T]): DataStreamSink[T]
}

213

```

214

215

### Result Collection

216

217

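Collect stream results back into the client program for local processing and analysis. Every collected element is sent to the client, so use these methods for tests, debugging, or small (ideally bounded) result sets.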

218

219

```scala { .api }

220

class DataStream[T] {

  /**
   * Submits the job and returns an iterator over the elements of this stream,
   * fetched back to the client as the job produces them.
   *
   * The returned iterator must always be closed to release cluster resources;
   * closing it before the stream ends cancels the job.
   *
   * @return CloseableIterator over the stream elements
   */
  def executeAndCollect(): CloseableIterator[T]

  /**
   * Same as `executeAndCollect()`, but submits the job under the given name.
   *
   * @param jobExecutionName Name for the execution job
   * @return CloseableIterator over the stream elements (must be closed)
   */
  def executeAndCollect(jobExecutionName: String): CloseableIterator[T]

  /**
   * Submits the job and collects at most `limit` elements into a list (blocking).
   *
   * Stops once `limit` elements have arrived or the stream ends, and then releases
   * the job's resources, so no iterator needs to be closed by the caller.
   *
   * @param limit Maximum number of elements to collect; must be greater than 0
   * @return List of collected elements
   */
  def executeAndCollect(limit: Int): List[T]

  /**
   * Same as `executeAndCollect(limit)`, but submits the job under the given name.
   *
   * @param jobExecutionName Name for the execution job
   * @param limit            Maximum number of elements to collect; must be greater than 0
   * @return List of collected elements
   */
  def executeAndCollect(jobExecutionName: String, limit: Int): List[T]

  /**
   * Registers a collecting sink on this stream and returns an iterator over its elements.
   *
   * This does NOT start the job: the iterator only yields elements after the job has
   * been started, e.g. with `env.executeAsync()`. Closing the iterator cancels the job.
   * When several streams are collected, consume them in parallel to avoid back-pressuring the job.
   *
   * @return CloseableIterator over the stream elements (usable once the job runs)
   */
  def collectAsync(): CloseableIterator[T]

  /**
   * Registers a collecting sink on this stream whose iterator is handed to the given
   * `Collector` holder; retrieve it from the collector after the job has been started.
   *
   * @param collector Holder that receives the result iterator
   */
  def collectAsync(collector: JavaStream.Collector[T]): Unit
}

261

```

262

263

**Usage Examples:**

264

265

```scala

266

import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val evenNumbers = numbers.filter(_ % 2 == 0)

// Collect all results (for small datasets only). executeAndCollect() submits the job
// itself; the returned iterator must always be closed, so close it in a finally block.
val allResults = evenNumbers.executeAndCollect()
try {
  allResults.asScala.foreach(println)
} finally {
  allResults.close()
}

// Collect limited results into a Scala List (no iterator to close)
val firstThree = evenNumbers.executeAndCollect(3)
println(s"First 3 even numbers: $firstThree")

// Async collection: collectAsync() only registers a collecting sink.
// The job must be started explicitly before the iterator can yield elements.
val asyncResults = evenNumbers.collectAsync()
env.executeAsync("Async Collect Example")
try {
  // Process results as they arrive (hasNext waits for the next element or end of stream)
  while (asyncResults.hasNext) {
    println(s"Received: ${asyncResults.next()}")
  }
} finally {
  asyncResults.close()
}

287

```

288

289

### DataStreamSink Configuration

290

291

Configure sink behavior and properties.

292

293

```scala { .api }

294

class DataStreamSink[T] {

  /**
   * Sets the number of parallel instances of this sink.
   *
   * @param parallelism Sink parallelism
   * @return This sink for chaining
   */
  def setParallelism(parallelism: Int): DataStreamSink[T]

  /**
   * Sets the operator name, shown in the web UI, logs and metrics.
   *
   * @param name Operator name
   * @return This sink for chaining
   */
  def name(name: String): DataStreamSink[T]

  /**
   * Sets a stable unique identifier for this sink.
   *
   * The uid is used to match the sink's state when restoring from a savepoint, so set it
   * explicitly and keep it unchanged across versions of the job.
   *
   * @param uid Unique identifier
   * @return This sink for chaining
   */
  def uid(uid: String): DataStreamSink[T]

  /**
   * Sets a human-readable description, shown in the JSON plan and the web UI
   * (logs and metrics use only the name).
   *
   * @param description Operator description
   * @return This sink for chaining
   */
  def setDescription(description: String): DataStreamSink[T]

  /**
   * Prevents this sink from being chained to the preceding operator, so it runs
   * in a separate task.
   *
   * @return This sink for chaining
   */
  def disableChaining(): DataStreamSink[T]

  /**
   * Sets the slot sharing group; parallel instances of operators in the same group
   * may be co-located in the same TaskManager slot.
   *
   * @param slotSharingGroup Slot sharing group name
   * @return This sink for chaining
   */
  def slotSharingGroup(slotSharingGroup: String): DataStreamSink[T]
}

336

```

337

338

**Usage Examples:**

339

340

```scala

341

// Uses SensorReading and LoggingSink from the Custom Sink Functions example.
val env = StreamExecutionEnvironment.getExecutionEnvironment

val readings = env.fromElements(
  SensorReading("sensor1", 20.0, 1000L),
  SensorReading("sensor2", 25.0, 2000L)
)

// Configure sink properties; every setter returns the DataStreamSink, so calls chain
readings
  .addSink(new LoggingSink)
  .setParallelism(2)
  .name("Sensor Logging Sink")
  .uid("sensor-logging-sink-v1") // keep stable across job versions for savepoint restores
  .setDescription("Logs sensor readings to external system")

// The configured sink only runs once the job is executed
env.execute("Sink Configuration Example")

353

```

354

355

## Types

356

357

```scala { .api }

358

// Sink function interface (Java: org.apache.flink.streaming.api.functions.sink.SinkFunction)
trait SinkFunction[T] {
  /**
   * Processes a single element that reached the sink.
   *
   * @param value   Element to process
   * @param context Sink context with time information about the current element
   */
  def invoke(value: T, context: SinkFunction.Context): Unit
}

object SinkFunction {
  /** Time information available while a sink processes an element. */
  trait Context {
    /** Current processing time of the sink operator. */
    def currentProcessingTime(): Long

    /** Current event-time watermark of the sink operator. */
    def currentWatermark(): Long

    /** Timestamp of the current element, or `null` if the element has no timestamp. */
    def timestamp(): java.lang.Long
  }
}

// Rich sink function with lifecycle methods: open() runs before the first element,
// close() after the last; both default to no-ops.
abstract class RichSinkFunction[T] extends SinkFunction[T] with RichFunction {
  override def open(parameters: Configuration): Unit = {}
  override def close(): Unit = {}
}

// Output format interface (used by writeUsingOutputFormat)
trait OutputFormat[T] {
  /** Configures the format before it is opened. */
  def configure(parameters: Configuration): Unit

  /** Opens the format for one parallel instance. */
  def open(taskNumber: Int, numTasks: Int): Unit

  /** Writes one element. */
  def writeRecord(record: T): Unit

  /** Releases resources after the last element. */
  def close(): Unit
}

// Serialization schema for socket output: converts one element into bytes
trait SerializationSchema[T] {
  def serialize(element: T): Array[Byte]
}

// File system write modes, referenced as FileSystem.WriteMode
// (Java enum org.apache.flink.core.fs.FileSystem.WriteMode)
object FileSystem {
  sealed trait WriteMode

  object WriteMode {
    /** Fail if the target file already exists. */
    case object NO_OVERWRITE extends WriteMode

    /** Replace an existing file at the target path. */
    case object OVERWRITE extends WriteMode
  }
}

// Closeable iterator for result collection (Java: org.apache.flink.util.CloseableIterator).
// It is a java.util.Iterator: use `.asScala` (scala.collection.JavaConverters) for Scala
// collection operations, and always close it to release the collecting job's resources.
trait CloseableIterator[T] extends java.util.Iterator[T] with AutoCloseable {
  def hasNext(): Boolean
  def next(): T
  def close(): Unit
}

// Unified Sink V2 interface, recommended for new sinks
// (Java: org.apache.flink.api.connector.sink2.Sink)
trait Sink[IN] {
  /** Creates the writer used by one parallel sink subtask. */
  def createWriter(context: Sink.InitContext): SinkWriter[IN]
}

object Sink {
  /** Information available when a writer is created. */
  trait InitContext {
    def getSubtaskId: Int
    def getNumberOfParallelSubtasks: Int

    /** Id of the checkpoint the job was restored from; empty when there is no restored state. */
    def getRestoredCheckpointId: java.util.OptionalLong
  }
}

trait SinkWriter[IN] extends AutoCloseable {
  /** Adds one element to the writer. */
  def write(element: IN, context: SinkWriter.Context): Unit

  /** Flushes pending data; `endOfInput` is true when no further input will arrive. */
  def flush(endOfInput: Boolean): Unit

  def close(): Unit
}

object SinkWriter {
  /** Time information about the element being written. */
  trait Context {
    /** Timestamp of the current element, or `null` if the element has no timestamp. */
    def timestamp(): java.lang.Long

    /** Current event-time watermark. */
    def currentWatermark(): Long
  }
}

// Built-in encoders
/** Row encoder that writes each element's `toString` followed by a newline, in the given charset. */
class SimpleStringEncoder[T](charset: String) extends Encoder[T] {
  /** Encodes with UTF-8. */
  def this() = this("UTF-8")

  def encode(element: T, stream: OutputStream): Unit
}

431

```