or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-operations.md · data-transformations.md · execution-environment.md · grouping-aggregation.md · index.md · input-output.md · partitioning-distribution.md · type-system.md · utilities.md

docs/input-output.md

0

# Input and Output Operations

1

2

Reading data from various sources and writing results to different sinks. These operations handle data ingestion and result persistence.

3

4

## Capabilities

5

6

### File Input Operations

7

8

Read data from various file formats and file systems.

9

10

```scala { .api }

11

class ExecutionEnvironment {

12

/**

13

* Reads a text file as DataSet of strings

14

* @param filePath Path to the text file

15

* @param charsetName Character encoding (default: UTF-8)

16

* @return DataSet of text lines

17

*/

18

def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]

19

20

/**

21

* Reads a text file as DataSet of StringValue objects

22

* @param filePath Path to the text file

23

* @param charsetName Character encoding (default: UTF-8)

24

* @return DataSet of StringValue objects

25

*/

26

def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]

27

28

/**

29

* Reads a CSV file into typed DataSet

30

* @param filePath Path to the CSV file

31

* @return DataSet of parsed CSV records

32

*/

33

def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]

34

35

/**

36

* Reads primitive values from a file

37

* @param filePath Path to the file

38

* @param delimiter Value delimiter (default: newline)

39

* @return DataSet of primitive values

40

*/

41

def readFileOfPrimitives[T: ClassTag: TypeInformation](

42

filePath: String,

43

delimiter: String = "\n"

44

): DataSet[T]

45

46

/**

47

* Reads file using custom input format

48

* @param inputFormat Custom file input format

49

* @param filePath Path to the file

50

* @return DataSet with custom format parsing

51

*/

52

def readFile[T: ClassTag: TypeInformation](

53

inputFormat: FileInputFormat[T],

54

filePath: String

55

): DataSet[T]

56

}

57

```

58

59

**Usage Examples:**

60

61

```scala

62

import org.apache.flink.api.scala._

63

64

val env = ExecutionEnvironment.getExecutionEnvironment

65

66

// Read text file

67

val textLines = env.readTextFile("hdfs://path/to/textfile.txt")

68

69

// Read CSV file with automatic parsing

70

case class Person(name: String, age: Int, city: String)

71

val people = env.readCsvFile[Person]("hdfs://path/to/people.csv")

72

73

// Read primitive values

74

val numbers = env.readFileOfPrimitives[Int]("hdfs://path/to/numbers.txt")

75

76

// Read with different encoding

77

val utf16Text = env.readTextFile("hdfs://path/to/file.txt", "UTF-16")

78

```

79

80

### Custom Input Formats

81

82

Create DataSets from custom input sources and formats.

83

84

```scala { .api }

85

class ExecutionEnvironment {

86

/**

87

* Creates DataSet from custom input format

88

* @param inputFormat Custom input format implementation

89

* @return DataSet using custom input

90

*/

91

def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]

92

}

93

```

94

95

**Usage Examples:**

96

97

```scala

98

import org.apache.flink.api.common.io.InputFormat

99

100

// Custom input format for reading JSON files

101

class JsonInputFormat[T: ClassTag] extends InputFormat[T, FileInputSplit] {

102

// Implementation for reading JSON data

103

}

104

105

val jsonData = env.createInput(new JsonInputFormat[MyDataClass])

106

```

107

108

### Collection Input Sources

109

110

Create DataSets from in-memory collections and sequences.

111

112

```scala { .api }

113

class ExecutionEnvironment {

114

/**

115

* Creates DataSet from an iterable collection

116

* @param data Iterable collection of elements

117

* @return DataSet containing the collection elements

118

*/

119

def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]

120

121

/**

122

* Creates DataSet from an iterator

123

* @param data Iterator of elements

124

* @return DataSet containing the iterator elements

125

*/

126

def fromCollection[T: TypeInformation: ClassTag](data: Iterator[T]): DataSet[T]

127

128

/**

129

* Creates DataSet from individual elements

130

* @param data Variable arguments of elements

131

* @return DataSet containing the elements

132

*/

133

def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]

134

135

/**

136

* Creates DataSet from a parallel collection

137

* @param iterator Splittable iterator for parallel processing

138

* @return DataSet from parallel collection

139

*/

140

def fromParallelCollection[T: TypeInformation: ClassTag](iterator: SplittableIterator[T]): DataSet[T]

141

142

/**

143

* Generates a sequence of numbers

144

* @param from Starting number (inclusive)

145

* @param to Ending number (inclusive)

146

* @return DataSet containing the number sequence

147

*/

148

def generateSequence(from: Long, to: Long): DataSet[Long]

149

}

150

```

151

152

**Usage Examples:**

153

154

```scala

155

// From Scala collections

156

val listData = env.fromCollection(List(1, 2, 3, 4, 5))

157

val arrayData = env.fromCollection(Array("a", "b", "c"))

158

159

// From individual elements

160

val elementData = env.fromElements("apple", "banana", "cherry")

161

162

// Generate number sequence

163

val numbers = env.generateSequence(1, 1000000)

164

```

165

166

### File Output Operations

167

168

Write DataSets to various file formats and destinations.

169

170

```scala { .api }

171

class DataSet[T] {

172

/**

173

* Writes DataSet as text file

174

* @param filePath Output file path

175

* @param writeMode Write mode (default: NO_OVERWRITE)

176

* @return DataSink for the write operation

177

*/

178

def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]

179

180

/**

181

* Writes DataSet as CSV file

182

* @param filePath Output file path

183

* @param rowDelimiter Row delimiter (default: newline)

184

* @param fieldDelimiter Field delimiter (default: comma)

185

* @param writeMode Write mode (default: NO_OVERWRITE)

186

* @return DataSink for the write operation

187

*/

188

def writeAsCsv(

189

filePath: String,

190

rowDelimiter: String = "\n",

191

fieldDelimiter: String = ",",

192

writeMode: FileSystem.WriteMode = null

193

): DataSink[T]

194

195

/**

196

* Writes using custom file output format

197

* @param outputFormat Custom file output format

198

* @param filePath Output file path

199

* @param writeMode Write mode (default: NO_OVERWRITE)

200

* @return DataSink for the write operation

201

*/

202

def write(

203

outputFormat: FileOutputFormat[T],

204

filePath: String,

205

writeMode: FileSystem.WriteMode = null

206

): DataSink[T]

207

208

/**

209

* Writes using custom output format

210

* @param outputFormat Custom output format implementation

211

* @return DataSink for the write operation

212

*/

213

def output(outputFormat: OutputFormat[T]): DataSink[T]

214

}

215

```

216

217

**Usage Examples:**

218

219

```scala

220

// Write as text file

221

data.writeAsText("hdfs://path/to/output.txt")

222

223

// Write as CSV with custom delimiters

224

people.writeAsCsv("hdfs://path/to/people.csv", "\n", ";")

225

226

// Overwrite existing files

227

results.writeAsText("hdfs://path/to/results.txt", FileSystem.WriteMode.OVERWRITE)

228

```

229

230

### Console Output Operations

231

232

Output DataSets to console for debugging and monitoring.

233

234

```scala { .api }

235

class DataSet[T] {

236

/**

237

* Prints all elements to standard output

238

*/

239

def print(): Unit

240

241

/**

242

* Prints all elements to standard error

243

*/

244

def printToErr(): Unit

245

246

/**

247

* Prints elements on task managers with identifier

248

* @param sinkIdentifier Identifier for the print sink

249

* @return DataSink for the print operation

250

*/

251

def print(sinkIdentifier: String): DataSink[T]

252

253

/**

254

* Prints elements to standard error on task managers with identifier

255

* @param sinkIdentifier Identifier for the print sink

256

* @return DataSink for the print operation

257

*/

258

def printToErr(sinkIdentifier: String): DataSink[T]

259

260

/**

261

* Prints elements on task managers with prefix

262

* @param prefix Prefix for each printed line

263

* @return DataSink for the print operation

264

*/

265

def printOnTaskManager(prefix: String): DataSink[T]

266

}

267

```

268

269

**Usage Examples:**

270

271

```scala

272

// Print to console (for small datasets)

273

smallData.print()

274

275

// Print with identifier in distributed environment

276

data.print("MyDataStream")

277

278

// Print with prefix for identification

279

results.printOnTaskManager("RESULT> ")

280

```

281

282

### Data Collection

283

284

Collect DataSet elements to the driver program for inspection.

285

286

```scala { .api }

287

class DataSet[T] {

288

/**

289

* Collects all elements to the driver program

290

* @return Sequence containing all elements

291

*/

292

def collect(): Seq[T]

293

294

/**

295

* Counts the number of elements in the DataSet

296

* @return Number of elements

297

*/

298

def count(): Long

299

}

300

```

301

302

**Usage Examples:**

303

304

```scala

305

// Collect results (use carefully with large datasets)

306

val results = processedData.collect()

307

results.foreach(println)

308

309

// Count elements

310

val elementCount = largeDataset.count()

311

println(s"Dataset contains $elementCount elements")

312

```

313

314

### Write Modes

315

316

Control behavior when output files already exist.

317

318

```scala { .api }

319

object FileSystem {

320

sealed trait WriteMode

321

case object OVERWRITE extends WriteMode // Overwrite existing files

322

case object NO_OVERWRITE extends WriteMode // Fail if files exist (default)

323

}

324

```

325

326

### Custom Output Formats

327

328

Create custom sinks for specialized output requirements.

329

330

```scala { .api }

331

// Example: Custom output format for writing to databases

332

abstract class OutputFormat[T] {

333

/**

334

* Configures the output format

335

* @param parameters Configuration parameters

336

*/

337

def configure(parameters: Configuration): Unit

338

339

/**

340

* Opens the output format

341

* @param taskNumber Task number

342

* @param numTasks Total number of tasks

343

*/

344

def open(taskNumber: Int, numTasks: Int): Unit

345

346

/**

347

* Writes a record

348

* @param record Record to write

349

*/

350

def writeRecord(record: T): Unit

351

352

/**

353

* Closes the output format

354

*/

355

def close(): Unit

356

}

357

```

358

359

**Usage Examples:**

360

361

```scala

362

// Custom database output format

363

class DatabaseOutputFormat[T] extends OutputFormat[T] {

364

private var connection: Connection = _

365

366

override def configure(parameters: Configuration): Unit = {

367

// Setup database connection parameters

368

}

369

370

override def open(taskNumber: Int, numTasks: Int): Unit = {

371

// Open database connection

372

}

373

374

override def writeRecord(record: T): Unit = {

375

// Write record to database

376

}

377

378

override def close(): Unit = {

379

// Close database connection

380

}

381

}

382

383

// Use custom output format

384

data.output(new DatabaseOutputFormat[Person])

385

```

386

387

### Broadcast Variables

388

389

Access side inputs in operations through broadcast variables.

390

391

```scala { .api }

392

class DataSet[T] {

393

/**

394

* Adds a broadcast DataSet that can be accessed in transformations

395

* @param data DataSet to broadcast

396

* @param name Name for accessing the broadcast data

397

* @return DataSet with broadcast variable configured

398

*/

399

def withBroadcastSet(data: DataSet[_], name: String): DataSet[T]

400

}

401

402

// In function implementations, access broadcast data:

403

abstract class RichMapFunction[T, O] extends MapFunction[T, O] {

404

/**

405

* Gets broadcast variable by name

406

* @param name Broadcast variable name

407

* @return List of broadcast elements

408

*/

409

def getBroadcastVariable[X](name: String): java.util.List[X]

410

411

/**

412

* Gets broadcast variable, transformed through an initializer

413

* @param name Broadcast variable name

414

* @param hint Initializer that transforms the broadcast elements before they are returned

415

* @return The broadcast variable as transformed by the initializer

416

*/

417

def getBroadcastVariableWithInitializer[X, Y](

418

name: String,

419

hint: BroadcastVariableInitializer[X, Y]

420

): Y

421

}

422

```

423

424

**Usage Examples:**

425

426

```scala

427

import org.apache.flink.api.common.functions.RichMapFunction

428

import org.apache.flink.configuration.Configuration

429

430

// Broadcast lookup data

431

val lookupData = env.fromElements(("key1", "value1"), ("key2", "value2"))

432

433

// Use broadcast data in transformation

434

val enrichedData = data

435

.map(new RichMapFunction[String, String] {

436

var lookup: Map[String, String] = _

437

438

override def open(parameters: Configuration): Unit = {

439

val broadcastData = getBroadcastVariable[(String, String)]("lookup")

440

lookup = broadcastData.asScala.toMap

441

}

442

443

override def map(value: String): String = {

444

lookup.getOrElse(value, "unknown")

445

}

446

})

447

.withBroadcastSet(lookupData, "lookup")

448

```

449

450

## Types

451

452

```scala { .api }

453

trait InputFormat[T, InputSplit] {

454

/**

455

* Configures the input format

456

* @param parameters Configuration parameters

457

*/

458

def configure(parameters: Configuration): Unit

459

460

/**

461

* Creates input splits for parallel reading

462

* @param minNumSplits Minimum number of splits

463

* @return Array of input splits

464

*/

465

def createInputSplits(minNumSplits: Int): Array[InputSplit]

466

467

/**

468

* Opens an input split for reading

469

* @param split Input split to open

470

*/

471

def open(split: InputSplit): Unit

472

473

/**

474

* Checks whether the end of the input has been reached

475

* @return True if the end of input has been reached, false if more records are available

476

*/

477

def reachedEnd(): Boolean

478

479

/**

480

* Reads the next record

481

* @param reuse Object to reuse for the record

482

* @return Next record or null if end reached

483

*/

484

def nextRecord(reuse: T): T

485

486

/**

487

* Closes the input format

488

*/

489

def close(): Unit

490

}

491

492

abstract class FileInputFormat[T] extends InputFormat[T, FileInputSplit] {

493

/**

494

* Sets the path to read from

495

* @param filePath File path

496

*/

497

def setFilePath(filePath: Path): Unit

498

499

/**

500

* Sets file paths to read from

501

* @param filePaths Array of file paths

502

*/

503

def setFilePaths(filePaths: Path*): Unit

504

}

505

506

class DataSink[T] {

507

/**

508

* Sets the parallelism for this sink

509

* @param parallelism Degree of parallelism

510

* @return DataSink with specified parallelism

511

*/

512

def setParallelism(parallelism: Int): DataSink[T]

513

514

/**

515

* Gets the parallelism of this sink

516

* @return Current parallelism setting

517

*/

518

def getParallelism: Int

519

520

/**

521

* Sets the name for this sink

522

* @param name Sink name

523

* @return DataSink with specified name

524

*/

525

def name(name: String): DataSink[T]

526

}

527

528

trait SplittableIterator[T] extends Iterator[T] {

529

/**

530

* Splits the iterator into multiple iterators

531

* @param numPartitions Number of partitions

532

* @return Array of split iterators

533

*/

534

def split(numPartitions: Int): Array[Iterator[T]]

535

536

/**

537

* Gets the maximum number of splits

538

* @return Maximum number of splits

539

*/

540

def getMaximumNumberOfSplits: Int

541

}

542

543

class StringValue extends Comparable[StringValue] {

544

/**

545

* Creates StringValue from string

546

* @param value String value

547

*/

548

def this(value: String)

549

550

/**

551

* Gets the string value

552

* @return String value

553

*/

554

def getValue: String

555

556

/**

557

* Sets the string value

558

* @param value String value

559

*/

560

def setValue(value: String): Unit

561

}

562

```