
# File I/O Operations

Input and output formats for reading and writing Avro files in Flink batch processing scenarios. Provides efficient file-based processing with support for compression, splitting, and type safety.

## AvroInputFormat

`FileInputFormat` implementation for reading Avro files in batch processing jobs.

```java { .api }
public class AvroInputFormat<E> extends FileInputFormat<E>
        implements ResultTypeQueryable<E>, CheckpointableInputFormat<FileInputSplit, Tuple2<Long, Long>> {

    // Constructor
    public AvroInputFormat(Path filePath, Class<E> type);

    // Configuration methods
    public void setReuseAvroValue(boolean reuseAvroValue);
    public void setUnsplittable(boolean unsplittable);

    // Type information
    public TypeInformation<E> getProducedType();
}
```

### Usage Examples

**Reading Specific Records:**

```java
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.core.fs.Path;

// Create input format for specific record type
AvroInputFormat<User> inputFormat = new AvroInputFormat<>(
    new Path("hdfs://path/to/user/files/*.avro"),
    User.class
);

// Configure reuse behavior
inputFormat.setReuseAvroValue(true); // Default: true for better performance

// Create dataset
DataSet<User> users = env.createInput(inputFormat);
```

**Reading Generic Records:**

```java
import org.apache.avro.generic.GenericRecord;

// Create input format for generic records
AvroInputFormat<GenericRecord> genericInputFormat = new AvroInputFormat<>(
    new Path("hdfs://path/to/data/*.avro"),
    GenericRecord.class
);

// Use in batch job
DataSet<GenericRecord> records = env.createInput(genericInputFormat);

// Process generic records
DataSet<String> names = records.map(record -> record.get("name").toString());
```

**File Splitting Control:**

```java
// Allow file splitting for parallel processing (default)
inputFormat.setUnsplittable(false);

// Force reading entire files (useful for small files)
inputFormat.setUnsplittable(true);

// Process with parallelism
DataSet<User> users = env.createInput(inputFormat)
    .setParallelism(4);
```
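Conceptually, when splitting is enabled the file is divided into byte ranges that parallel subtasks read independently (each reader then skips to the next Avro block boundary within its range). A minimal, framework-free sketch of that range computation — the `computeSplits` helper and its parameters are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Divide a file of fileLength bytes into at most numSplits [start, length) ranges.
    static List<long[]> computeSplits(long fileLength, int numSplits) {
        List<long[]> splits = new ArrayList<>();
        long splitSize = (fileLength + numSplits - 1) / numSplits; // ceiling division
        for (long start = 0; start < fileLength; start += splitSize) {
            long length = Math.min(splitSize, fileLength - start);
            splits.add(new long[] {start, length});
        }
        return splits;
    }

    public static void main(String[] args) {
        List<long[]> splits = computeSplits(1000, 4);
        long covered = 0;
        for (long[] s : splits) covered += s[1];
        // Ranges are disjoint and together cover the whole file
        System.out.println(splits.size() + " splits covering " + covered + " bytes");
    }
}
```

Setting `setUnsplittable(true)` effectively forces a single range per file, which avoids the boundary-sync overhead for small files.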

## AvroOutputFormat

`FileOutputFormat` implementation for writing Avro files in batch processing jobs.

```java { .api }
public class AvroOutputFormat<E> extends FileOutputFormat<E> implements Serializable {

    // Constructors
    public AvroOutputFormat(Path filePath, Class<E> type);
    public AvroOutputFormat(Class<E> type);

    // Configuration methods
    public void setSchema(Schema schema);
    public void setCodec(Codec codec);

    // Codec enum
    public enum Codec {
        NULL, SNAPPY, BZIP2, DEFLATE, XZ
    }
}
```

### Usage Examples

**Writing Specific Records:**

```java
import org.apache.flink.formats.avro.AvroOutputFormat;

// Create output format with path and type
AvroOutputFormat<User> outputFormat = new AvroOutputFormat<>(
    new Path("hdfs://output/path/users.avro"),
    User.class
);

// Configure compression
outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);

// Write dataset
DataSet<User> users = ...;
users.output(outputFormat);
```

**Writing Generic Records:**

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Create output format for generic records
AvroOutputFormat<GenericRecord> genericOutputFormat = new AvroOutputFormat<>(
    new Path("hdfs://output/path/records.avro"),
    GenericRecord.class
);

// Set explicit schema for generic records
Schema schema = new Schema.Parser().parse(schemaString);
genericOutputFormat.setSchema(schema);

// Configure compression
genericOutputFormat.setCodec(AvroOutputFormat.Codec.DEFLATE);

// Write generic records
DataSet<GenericRecord> records = ...;
records.output(genericOutputFormat);
```

**Dynamic Output Paths:**

```java
// Create output format without a fixed path
AvroOutputFormat<User> dynamicOutputFormat = new AvroOutputFormat<>(User.class);

// Supply the path later, before the job runs
dynamicOutputFormat.setOutputFilePath(new Path("hdfs://output/path/users.avro"));

users.output(dynamicOutputFormat);
```

## Compression Codecs

Support for various compression algorithms to reduce file size and improve I/O performance.

```java { .api }
public enum Codec {
    NULL((byte) 0, CodecFactory.nullCodec()),     // No compression
    SNAPPY((byte) 1, CodecFactory.snappyCodec()), // Fast compression
    BZIP2((byte) 2, CodecFactory.bzip2Codec()),   // High compression ratio
    DEFLATE((byte) 3, CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)), // Standard compression
    XZ((byte) 4, CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL));                // Highest compression ratio
}
```

### Codec Selection Guidelines

**SNAPPY (Recommended):**

- Fast compression and decompression
- Good balance between speed and compression ratio
- Default codec for most use cases

**DEFLATE:**

- Standard compression algorithm
- Better compression than SNAPPY, slower processing
- Good for storage-constrained environments

**BZIP2:**

- High compression ratio
- Slower than SNAPPY and DEFLATE
- Best for archival storage

**XZ:**

- Highest compression ratio
- Slowest processing
- Best for long-term storage with infrequent access

**NULL:**

- No compression
- Fastest processing
- Use when storage space is not a concern
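The speed-versus-ratio trade-off can be felt with the JDK's own zlib bindings, which back deflate-style compression. A standalone sketch using `java.util.zip` — this is plain JDK code for illustration, not Avro or Flink API:

```java
import java.util.zip.Deflater;

public class DeflateLevels {
    // Compress input at the given level and return the compressed size in bytes
    static int compressedSize(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        byte[] buf = new byte[input.length * 2 + 64]; // large enough for worst case
        int size = deflater.deflate(buf);
        deflater.end();
        return size;
    }

    public static void main(String[] args) {
        // Highly repetitive input compresses well at any level
        byte[] input = "the quick brown fox ".repeat(500).getBytes();
        int fast = compressedSize(input, Deflater.BEST_SPEED);       // level 1
        int best = compressedSize(input, Deflater.BEST_COMPRESSION); // level 9
        System.out.println("input=" + input.length + " fast=" + fast + " best=" + best);
        // The higher level spends more CPU but never produces larger output here
        System.out.println("smaller=" + (best <= fast && fast < input.length));
    }
}
```

The same trade-off scales up across the codec family: SNAPPY sits at the fast end, XZ at the high-ratio end.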

## Performance Optimization

### Input Format Optimization

**File Splitting:**

```java
// Enable splitting for large files (parallel processing)
inputFormat.setUnsplittable(false);

// Disable splitting for small files (reduce overhead)
inputFormat.setUnsplittable(true);
```

**Object Reuse:**

```java
// Enable object reuse for better memory performance (default)
inputFormat.setReuseAvroValue(true);

// Disable if objects need to be retained across operations
inputFormat.setReuseAvroValue(false);
```
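The reuse trade-off matters when records are collected rather than processed immediately: with reuse enabled, every element handed downstream is the same mutable instance. A framework-free sketch of the pitfall — the `Record` class and the read loop are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

public class ReusePitfall {
    static class Record { int value; }

    public static void main(String[] args) {
        // Simulate a reader that deserializes into one reused instance
        Record reused = new Record();
        List<Record> retained = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            reused.value = i;     // "read" record i into the reused object
            retained.add(reused); // BUG: all three entries alias one object
        }
        // Every retained record now shows the last value read
        System.out.println(retained.get(0).value + " "
                + retained.get(1).value + " " + retained.get(2).value);
    }
}
```

This prints `3 3 3` rather than `1 2 3`; with `setReuseAvroValue(false)` each read allocates a fresh record, at the cost of extra garbage-collection pressure.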

### Output Format Optimization

**Compression Selection:**

```java
// For high-throughput scenarios
outputFormat.setCodec(AvroOutputFormat.Codec.SNAPPY);

// For storage optimization
outputFormat.setCodec(AvroOutputFormat.Codec.BZIP2);

// For archival
outputFormat.setCodec(AvroOutputFormat.Codec.XZ);
```

## Error Handling

**Input Errors:**

- File not found: throws `FileNotFoundException`
- Schema mismatch: throws `IOException` with a detailed error message
- Corrupted files: throws `AvroRuntimeException`

**Output Errors:**

- Path creation failure: throws `IOException`
- Schema validation errors: throws `IllegalArgumentException`
- Disk space issues: throws `IOException`

### Error Recovery Patterns

```java
// Robust input processing
try {
    DataSet<User> users = env.createInput(inputFormat);
    // Process data
} catch (Exception e) {
    logger.error("Failed to read Avro files", e);
    // Implement fallback or retry logic
}

// Safe output writing
try {
    users.output(outputFormat);
    env.execute("Write Avro Files");
} catch (Exception e) {
    logger.error("Failed to write Avro files", e);
    // Clean up partial files or retry
}
```

## Integration with Hadoop Ecosystem

**HDFS Integration:**

```java
// Read from HDFS
AvroInputFormat<User> hdfsInputFormat = new AvroInputFormat<>(
    new Path("hdfs://namenode:8020/data/users/*.avro"),
    User.class
);

// Write to HDFS
AvroOutputFormat<User> hdfsOutputFormat = new AvroOutputFormat<>(
    new Path("hdfs://namenode:8020/output/users.avro"),
    User.class
);
```

**S3 Integration:**

```java
// Read from S3
AvroInputFormat<User> s3InputFormat = new AvroInputFormat<>(
    new Path("s3a://bucket/data/users/*.avro"),
    User.class
);

// Write to S3
AvroOutputFormat<User> s3OutputFormat = new AvroOutputFormat<>(
    new Path("s3a://bucket/output/users.avro"),
    User.class
);
```