docs/

- aggregation-grouping.md
- data-input-output.md
- dataset-operations.md
- execution-environments.md
- index.md
- iteration-operations.md
- join-cogroup-operations.md
- utility-functions.md

docs/data-input-output.md

# Data Input and Output

1

2

Comprehensive I/O capabilities for reading from and writing to various data formats and storage systems. Flink provides built-in support for common formats like text, CSV, and custom input/output formats.

3

4

## Capabilities

5

6

### Data Sources

7

8

Methods for creating DataSets from various data sources.

9

10

```java { .api }

11

/**

12

* Create DataSet from Java collection

13

* @param data the collection to create DataSet from

14

* @return DataSet containing collection elements

15

*/

16

public <T> DataSet<T> fromCollection(Collection<T> data);

17

18

/**

19

* Create DataSet from individual elements

20

* @param data the elements to include in the DataSet

21

* @return DataSet containing the specified elements

22

*/

23

@SafeVarargs

24

public final <T> DataSet<T> fromElements(T... data);

25

26

/**

27

* Read text file line by line

28

* @param filePath path to the text file

29

* @return DataSet where each element is a line from the file

30

*/

31

public DataSet<String> readTextFile(String filePath);

32

33

/**

34

* Read text file line by line with specific character encoding

35

* @param filePath path to the text file

36

* @param charsetName the charset name for decoding the file

37

* @return DataSet where each element is a line from the file

38

*/

39

public DataSet<String> readTextFile(String filePath, String charsetName);

40

41

/**

42

* Read text file as StringValue objects

43

* @param filePath path to the text file

44

* @return DataSet where each element is a StringValue from the file

45

*/

46

public DataSource<StringValue> readTextFileWithValue(String filePath);

47

48

/**

49

* Read text file as StringValue objects with charset and error handling

50

* @param filePath path to the text file

51

* @param charsetName the charset name for decoding the file

52

* @param skipInvalidLines whether to skip lines that cannot be decoded

53

* @return DataSet where each element is a StringValue from the file

54

*/

55

public DataSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);

56

57

/**

58

* Read file containing primitive values

59

* @param filePath path to the file

60

* @param typeClass the class of the primitive type

61

* @return DataSet with elements of the primitive type

62

*/

63

public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass);

64

65

/**

66

* Read file containing primitive values with custom delimiter

67

* @param filePath path to the file

68

* @param delimiter the delimiter separating values

69

* @param typeClass the class of the primitive type

70

* @return DataSet with elements of the primitive type

71

*/

72

public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass);

73

74

/**

75

* Read file using custom input format

76

* @param inputFormat the input format to use for reading

77

* @param filePath path to the file

78

* @return DataSet with elements read by the input format

79

*/

80

public <T> DataSet<T> readFile(FileInputFormat<T> inputFormat, String filePath);

81

82

/**

83

* Generate sequence of numbers

84

* @param from starting number (inclusive)

85

* @param to ending number (inclusive)

86

* @return DataSet containing the number sequence

87

*/

88

public DataSet<Long> generateSequence(long from, long to);

89

```

90

91

**Usage Examples:**

92

93

```java

94

// From collection

95

List<String> words = Arrays.asList("hello", "world", "flink");

96

DataSet<String> wordsDataSet = env.fromCollection(words);

97

98

// From elements

99

DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

100

101

// Read text file

102

DataSet<String> textData = env.readTextFile("/path/to/input.txt");

103

104

// Generate sequence

105

DataSet<Long> sequence = env.generateSequence(1, 1000);

106

```

107

108

### CSV Reading

109

110

Specialized CSV reader with extensive configuration options.

111

112

```java { .api }

113

/**

114

* Create CSV reader for structured data reading

115

* @param filePath path to the CSV file

116

* @return CsvReader for configuration and DataSet creation

117

*/

118

public CsvReader readCsvFile(String filePath);

119

```

120

121

### CsvReader Configuration

122

123

The CsvReader class provides fluent API for CSV configuration.

124

125

```java { .api }

126

/**

127

* CSV reader with configuration options

128

*/

129

public class CsvReader {

130

/**

131

* Set line delimiter (default: newline)

132

* @param delimiter the line delimiter

133

* @return CsvReader for method chaining

134

*/

135

public CsvReader lineDelimiter(String delimiter);

136

137

/**

138

* Set field delimiter (default: comma)

139

* @param delimiter the field delimiter

140

* @return CsvReader for method chaining

141

*/

142

public CsvReader fieldDelimiter(String delimiter);

143

144

/**

145

* Include only specific fields by position

146

* @param fields boolean array indicating which fields to include

147

* @return CsvReader for method chaining

148

*/

149

public CsvReader includeFields(boolean... fields);

150

151

/**

152

* Ignore the first line (header row)

153

* @return CsvReader for method chaining

154

*/

155

public CsvReader ignoreFirstLine();

156

157

/**

158

* Ignore lines that cannot be parsed

159

* @return CsvReader for method chaining

160

*/

161

public CsvReader ignoreInvalidLines();

162

163

/**

164

* Parse CSV into POJO objects

165

* @param pojoType the POJO class type

166

* @param pojoFields the field names in order

167

* @return DataSource of POJO objects

168

*/

169

public <T> DataSource<T> pojoType(Class<T> pojoType, String... pojoFields);

170

171

/**

172

* Parse CSV with specified field types

173

* @param fieldTypes the types for each field

174

* @return DataSource with typed tuples

175

*/

176

public DataSource<?> types(Class<?>... fieldTypes);

177

}

178

```

179

180

**Usage Examples:**

181

182

```java

183

// Basic CSV reading

184

DataSet<Tuple3<String, Integer, Double>> csvData = env

185

.readCsvFile("/path/to/data.csv")

186

.fieldDelimiter(",")

187

.lineDelimiter("\n")

188

.ignoreFirstLine()

189

.types(String.class, Integer.class, Double.class);

190

191

// CSV to POJO

192

public static class Person {

193

public String name;

194

public Integer age;

195

public String city;

196

}

197

198

DataSet<Person> people = env

199

.readCsvFile("/path/to/people.csv")

200

.ignoreFirstLine()

201

.pojoType(Person.class, "name", "age", "city");

202

203

// Selective field reading

204

DataSet<Tuple2<String, Integer>> nameAge = env

205

.readCsvFile("/path/to/people.csv")

206

.includeFields(true, true, false) // name, age, skip city

207

.types(String.class, Integer.class);

208

```

209

210

### Input Formats

211

212

Built-in input formats for reading various data types.

213

214

```java { .api }

215

/**

216

* Input format for reading text files line by line

217

*/

218

public class TextInputFormat extends FileInputFormat<String> {

219

// Reads text files, each line becomes a String element

220

}

221

222

/**

223

* Input format for reading text files as StringValue objects

224

*/

225

public class TextValueInputFormat extends FileInputFormat<StringValue> {

226

// Reads text files as StringValue for better memory efficiency

227

}

228

229

/**

230

* Input format for reading from Java collections

231

*/

232

public class CollectionInputFormat<T> implements InputFormat<T, GenericInputSplit> {

233

/**

234

* Create input format from collection

235

* @param dataSet the collection to read from

236

* @param serializer serializer for the data type

237

*/

238

public CollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer);

239

}

240

241

/**

242

* Input format for reading from iterators

243

*/

244

public class IteratorInputFormat<T> implements InputFormat<T, GenericInputSplit> {

245

/**

246

* Create input format from iterator

247

* @param iterator the iterator to read from

248

* @param serializer serializer for the data type

249

*/

250

public IteratorInputFormat(Iterator<T> iterator, TypeSerializer<T> serializer);

251

}

252

253

/**

254

* Input format for reading primitive types

255

*/

256

public class PrimitiveInputFormat<T> extends FileInputFormat<T> {

257

/**

258

* Create input format for primitive types

259

* @param filePath path to the file

260

* @param delimiter delimiter between values

261

* @param typeClass the primitive type class

262

*/

263

public PrimitiveInputFormat(Path filePath, String delimiter, Class<T> typeClass);

264

}

265

266

/**

267

* CSV input format for Row objects

268

*/

269

public class RowCsvInputFormat extends CsvInputFormat<Row> {

270

// Specialized CSV format for Row-based data

271

}

272

```

273

274

### Data Sinks and Output Operations

275

276

Methods for writing DataSet content to external systems.

277

278

```java { .api }

279

/**

280

* Write DataSet as text file

281

* @param filePath path where to write the file

282

* @return DataSink for execution

283

*/

284

public DataSink<T> writeAsText(String filePath);

285

286

/**

287

* Write DataSet as text file with write mode

288

* @param filePath path where to write the file

289

* @param writeMode OVERWRITE or NO_OVERWRITE

290

* @return DataSink for execution

291

*/

292

public DataSink<T> writeAsText(String filePath, WriteMode writeMode);

293

294

/**

295

* Write DataSet as CSV file

296

* @param filePath path where to write the CSV file

297

* @return DataSink for execution

298

*/

299

public DataSink<T> writeAsCsv(String filePath);

300

301

/**

302

* Write with custom text formatter

303

* @param filePath path where to write the file

304

* @param formatter custom text formatter

305

* @return DataSink for execution

306

*/

307

public DataSink<T> writeAsFormattedText(String filePath, TextFormatter<T> formatter);

308

309

/**

310

* Write using custom output format

311

* @param outputFormat the output format to use

312

* @param filePath path where to write

313

* @return DataSink for execution

314

*/

315

public DataSink<T> write(OutputFormat<T> outputFormat, String filePath);

316

317

/**

318

* Output using custom output format (no file path)

319

* @param outputFormat the output format to use

320

* @return DataSink for execution

321

*/

322

public DataSink<T> output(OutputFormat<T> outputFormat);

323

```

324

325

### Debug Output Operations

326

327

Operations for debugging and development.

328

329

```java { .api }

330

/**

331

* Print DataSet content to standard output (executes immediately)

332

* @throws Exception if printing fails

333

*/

334

public void print() throws Exception;

335

336

/**

337

* Print DataSet content to standard error (executes immediately)

338

* @throws Exception if printing fails

339

*/

340

public void printToErr() throws Exception;

341

342

/**

343

* Print DataSet content with identifier to standard output

344

* @param sinkIdentifier identifier for the print sink

345

* @return DataSink for execution

346

*/

347

public DataSink<T> print(String sinkIdentifier);

348

349

/**

350

* Print DataSet content with identifier to standard error

351

* @param sinkIdentifier identifier for the print sink

352

* @return DataSink for execution

353

*/

354

public DataSink<T> printToErr(String sinkIdentifier);

355

356

/**

357

* Print DataSet content on task manager with prefix (for debugging)

358

* @param prefix prefix for the printed output

359

* @return DataSink for execution

360

*/

361

public DataSink<T> printOnTaskManager(String prefix);

362

```

363

364

**Usage Examples:**

365

366

```java

367

// Write as text

368

DataSet<String> result = processedData;

369

result.writeAsText("/path/to/output.txt", WriteMode.OVERWRITE);

370

371

// Write as CSV

372

DataSet<Tuple3<String, Integer, Double>> data = getData();

373

data.writeAsCsv("/path/to/output.csv");

374

375

// Print for debugging

376

result.print();

377

378

// Custom formatter

379

result.writeAsFormattedText("/path/to/formatted.txt", new TextFormatter<String>() {

380

@Override

381

public String format(String record) {

382

return "Record: " + record;

383

}

384

});

385

```

386

387

### Output Formats

388

389

Built-in output formats for writing data in various formats.

390

391

```java { .api }

392

/**

393

* Output format for writing text files

394

*/

395

public class TextOutputFormat<T> extends FileOutputFormat<T> {

396

/**

397

* Create text output format

398

* @param outputPath path to write the output

399

*/

400

public TextOutputFormat(Path outputPath);

401

}

402

403

/**

404

* Output format for writing CSV files

405

*/

406

public class CsvOutputFormat<T> extends FileOutputFormat<T> {

407

/**

408

* Create CSV output format

409

* @param outputPath path to write the CSV

410

*/

411

public CsvOutputFormat(Path outputPath);

412

413

/**

414

* Set field delimiter

415

* @param fieldDelimiter delimiter between fields

416

*/

417

public void setFieldDelimiter(String fieldDelimiter);

418

419

/**

420

* Set record delimiter

421

* @param recordDelimiter delimiter between records

422

*/

423

public void setRecordDelimiter(String recordDelimiter);

424

}

425

426

/**

427

* Output format for printing to stdout/stderr

428

*/

429

public class PrintingOutputFormat<T> implements OutputFormat<T> {

430

/**

431

* Create printing output format

432

* @param targetStream target stream (System.out or System.err)

433

* @param sinkIdentifier identifier for the sink

434

*/

435

public PrintingOutputFormat(PrintStream targetStream, String sinkIdentifier);

436

}

437

438

/**

439

* Output format for collecting to local collection

440

*/

441

public class LocalCollectionOutputFormat<T> implements OutputFormat<T> {

442

/**

443

* Create local collection output format

444

* @param out the collection to write to

445

*/

446

public LocalCollectionOutputFormat(List<T> out);

447

}

448

449

/**

450

* Output format that discards all records (for testing)

451

*/

452

public class DiscardingOutputFormat<T> implements OutputFormat<T> {

453

// Discards all records - useful for performance testing

454

}

455

```

456

457

### File System Support

458

459

Write mode options for file operations.

460

461

```java { .api }

462

/**

463

* Write mode for file operations

464

*/

465

public enum WriteMode {

466

/** Overwrite existing files */

467

OVERWRITE,

468

/** Fail if file already exists */

469

NO_OVERWRITE

470

}

471

```

472

473

### Data Properties

474

475

Configure data properties for input splits.

476

477

```java { .api }

478

/**

479

* Properties for data splits

480

*/

481

public class SplitDataProperties<T> {

482

/**

483

* Specify that the data within input splits is partitioned by the given fields

484

* @param fields the fields by which the split data is partitioned

485

* @return configured properties

486

*/

487

public SplitDataProperties<T> splitsPartitionedBy(int... fields);

488

489

/**

490

* Specify grouping properties

491

* @param fields the fields by which data is grouped

492

* @return configured properties

493

*/

494

public SplitDataProperties<T> splitsGroupedBy(int... fields);

495

}

496

```

497

498

## Types

499

500

```java { .api }

501

import org.apache.flink.api.java.io.*;

502

import org.apache.flink.api.java.operators.DataSource;

503

import org.apache.flink.api.java.operators.DataSink;

504

import org.apache.flink.api.common.io.FileInputFormat;

505

import org.apache.flink.api.common.io.FileOutputFormat;

506

import org.apache.flink.api.common.io.InputFormat;

507

import org.apache.flink.api.common.io.OutputFormat;

508

import org.apache.flink.core.fs.FileSystem.WriteMode;

509

import org.apache.flink.types.StringValue;

510

import org.apache.flink.types.Row;

511

import java.util.Collection;

512

import java.util.List;

513

```