or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · filesystem.md · index.md · registry.md · rowdata.md · schemas.md · utilities.md

docs/filesystem.md

0

# File System Operations

1

2

## Capabilities

3

4

### Avro Input Format

5

6

File input format for reading Avro files in batch processing scenarios.

7

8

```java { .api }

9

/**

10

* Input format for reading Avro files

11

* @param <T> Type of records to read

12

*/

13

public class AvroInputFormat<T> extends FileInputFormat<T> {

14

15

/**

16

* Creates AvroInputFormat for SpecificRecord types

17

* @param filePath Path to Avro file(s)

18

* @param recordClazz SpecificRecord class to deserialize to

19

*/

20

public AvroInputFormat(Path filePath, Class<T> recordClazz);

21

22

/**

23

* Creates AvroInputFormat for GenericRecord types

24

* @param filePath Path to Avro file(s)

25

* @param schema Avro schema for GenericRecord deserialization

26

*/

27

public AvroInputFormat(Path filePath, Schema schema);

28

29

/**

30

* Reads the next record from the input

31

* @param reuse Object to reuse for the next record (may be null)

32

* @return Next record, or null if end of input

33

* @throws IOException If reading fails

34

*/

35

public T nextRecord(T reuse) throws IOException;

36

37

/**

38

* Checks if the input has been exhausted

39

* @return true if no more records available

40

* @throws IOException If check fails

41

*/

42

public boolean reachedEnd() throws IOException;

43

}

44

```

45

46

### Avro Output Format

47

48

File output format for writing Avro files in batch processing scenarios.

49

50

```java { .api }

51

/**

52

* Output format for writing Avro files

53

* @param <T> Type of records to write

54

*/

55

public class AvroOutputFormat<T> extends FileOutputFormat<T> {

56

57

/**

58

* Creates AvroOutputFormat for SpecificRecord types

59

* @param outputFilePath Path where to write Avro file

60

* @param recordClazz SpecificRecord class to serialize from

61

*/

62

public AvroOutputFormat(Path outputFilePath, Class<T> recordClazz);

63

64

/**

65

* Creates AvroOutputFormat for GenericRecord types

66

* @param outputFilePath Path where to write Avro file

67

* @param schema Avro schema for GenericRecord serialization

68

*/

69

public AvroOutputFormat(Path outputFilePath, Schema schema);

70

71

/**

72

* Writes a record to the output

73

* @param record Record to write

74

* @throws IOException If writing fails

75

*/

76

public void writeRecord(T record) throws IOException;

77

78

/**

79

* Sets the compression codec for the output file

80

* @param codecName Name of compression codec (snappy, deflate, bzip2, xz, zstandard)

81

*/

82

public void setCodec(String codecName);

83

}

84

```

85

86

### Bulk Writer Support

87

88

High-performance bulk writing for streaming scenarios.

89

90

```java { .api }

91

/**

92

* Bulk writer for Avro files that wraps an Avro DataFileWriter

93

* @param <T> Type of records to write

94

*/

95

public class AvroBulkWriter<T> implements BulkWriter<T> {

96

97

/**

98

* Creates a new AvroBulkWriter wrapping the given Avro DataFileWriter

99

* @param dataFileWriter The underlying Avro DataFileWriter

100

*/

101

public AvroBulkWriter(DataFileWriter<T> dataFileWriter);

102

103

/**

104

* Adds an element to the writer

105

* @param element Element to write

106

* @throws IOException If writing fails

107

*/

108

public void addElement(T element) throws IOException;

109

110

/**

111

* Flushes pending writes

112

* @throws IOException If flush fails

113

*/

114

public void flush() throws IOException;

115

116

/**

117

* Finishes writing and closes the writer

118

* @throws IOException If finish fails

119

*/

120

public void finish() throws IOException;

121

}

122

123

/**

124

* Factory for creating Avro bulk writers

125

* @param <T> Type of records to write

126

*/

127

public class AvroWriterFactory<T> implements BulkWriter.Factory<T> {

128

129

/**

130

* Creates AvroWriterFactory with AvroBuilder

131

* @param avroBuilder Builder for creating DataFileWriter instances

132

*/

133

public AvroWriterFactory(AvroBuilder<T> avroBuilder);

134

135

/**

136

* Creates a bulk writer for the given output stream

137

* @param out Output stream to write to

138

* @return Bulk writer instance

139

* @throws IOException If creation fails

140

*/

141

public BulkWriter<T> create(FSDataOutputStream out) throws IOException;

142

}

143

144

/**

145

* Builder interface for creating Avro DataFileWriter instances

146

* This is a functional interface that extends Serializable

147

* @param <T> Type of records the writer will handle

148

*/

149

@FunctionalInterface

150

public interface AvroBuilder<T> extends Serializable {

151

152

/**

153

* Creates and configures an Avro writer to the given output stream

154

* @param outputStream Output stream to write to

155

* @return Configured DataFileWriter

156

* @throws IOException If creation fails

157

*/

158

DataFileWriter<T> createWriter(OutputStream outputStream) throws IOException;

159

}

160

```

161

162

## Usage Examples

163

164

### Reading Avro Files - Batch Processing

165

166

```java

167

// Reading SpecificRecord files

168

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

169

170

// For generated SpecificRecord classes

171

AvroInputFormat<User> inputFormat =

172

new AvroInputFormat<>(new Path("/path/to/user/files/*.avro"), User.class);

173

174

DataSet<User> users = env.createInput(inputFormat);

175

176

// Process the data

177

DataSet<String> usernames = users

178

.filter(user -> user.getAge() > 18)

179

.map(user -> user.getUsername().toString());

180

181

// Reading GenericRecord files

182

Schema schema = new Schema.Parser().parse(new File("user-schema.avsc"));

183

AvroInputFormat<GenericRecord> genericInputFormat =

184

new AvroInputFormat<>(new Path("/path/to/generic/files/*.avro"), schema);

185

186

DataSet<GenericRecord> records = env.createInput(genericInputFormat);

187

188

// Process generic records

189

DataSet<Long> userIds = records

190

.map(record -> (Long) record.get("user_id"));

191

```

192

193

### Writing Avro Files - Batch Processing

194

195

```java

196

// Writing SpecificRecord files

197

DataSet<User> users = // ... your user dataset

198

199

AvroOutputFormat<User> outputFormat =

200

new AvroOutputFormat<>(new Path("/output/path/users.avro"), User.class);

201

202

// Set compression codec

203

outputFormat.setCodec("snappy");

204

205

users.output(outputFormat);

206

207

// Writing GenericRecord files with custom schema

208

Schema schema = SchemaBuilder.record("ProcessedUser")

209

.fields()

210

.name("id").type().longType().noDefault()

211

.name("processed_name").type().stringType().noDefault()

212

.name("score").type().doubleType().noDefault()

213

.endRecord();

214

215

DataSet<GenericRecord> processedUsers = users

216

.map(new MapFunction<User, GenericRecord>() {

217

@Override

218

public GenericRecord map(User user) throws Exception {

219

GenericRecord record = new GenericData.Record(schema);

220

record.put("id", user.getId());

221

record.put("processed_name", user.getUsername().toString().toUpperCase());

222

record.put("score", calculateScore(user));

223

return record;

224

}

225

});

226

227

AvroOutputFormat<GenericRecord> genericOutputFormat =

228

new AvroOutputFormat<>(new Path("/output/path/processed.avro"), schema);

229

230

processedUsers.output(genericOutputFormat);

231

232

env.execute("Process Avro Files");

233

```

234

235

### Bulk Writing - Streaming to Files

236

237

```java

238

// Setup for streaming to Avro files

239

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

240

241

// Create Avro builder for SpecificRecord

242

AvroBuilder<User> avroBuilder = new AvroBuilder<User>() {

243

@Override

244

public DataFileWriter<User> createWriter(OutputStream out) throws IOException {

245

DatumWriter<User> datumWriter = new SpecificDatumWriter<>(User.class);

246

DataFileWriter<User> dataFileWriter = new DataFileWriter<>(datumWriter);

247

248

// Set codec

249

dataFileWriter.setCodec(CodecFactory.snappyCodec());

250

251

// Create with schema from SpecificRecord

252

dataFileWriter.create(User.getClassSchema(), out);

253

return dataFileWriter;

254

}

255

};

256

257

// Create writer factory

258

AvroWriterFactory<User> writerFactory = new AvroWriterFactory<>(avroBuilder);

259

260

// Stream to files

261

DataStream<User> userStream = // ... your user stream

262

263

userStream

264

.addSink(StreamingFileSink

265

.forBulkFormat(new Path("/streaming/output/path"), writerFactory)

266

.withRollingPolicy(OnCheckpointRollingPolicy.build())

267

.build());

268

269

env.execute("Stream to Avro Files");

270

```

271

272

### Custom Avro Builder with Compression

273

274

```java

275

// Custom builder for GenericRecord with custom configuration

276

// Note: anonymous classes cannot declare named constructors; the schema is
// captured from the enclosing scope as an (effectively final) local variable.
AvroBuilder<GenericRecord> customBuilder = new AvroBuilder<GenericRecord>() {

282

283

@Override

284

public DataFileWriter<GenericRecord> createWriter(OutputStream out) throws IOException {

285

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);

286

DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);

287

288

// Configure compression

289

dataFileWriter.setCodec(CodecFactory.deflateCodec(6)); // Compression level 6

290

291

// Set custom metadata

292

dataFileWriter.setMeta("created_by", "flink-sql-avro");

293

dataFileWriter.setMeta("created_at", System.currentTimeMillis());

294

295

dataFileWriter.create(schema, out);

296

return dataFileWriter;

297

}

298

};

299

300

// Use custom builder

301

AvroWriterFactory<GenericRecord> customWriterFactory =

302

new AvroWriterFactory<>(customBuilder);

303

```

304

305

### File System Table Integration

306

307

```java

308

// Create file system table with Avro format

309

String createTableSQL = """

310

CREATE TABLE user_files (

311

user_id BIGINT,

312

username STRING,

313

email STRING,

314

registration_date DATE,

315

last_login TIMESTAMP(3)

316

) PARTITIONED BY (registration_date)

317

WITH (

318

'connector' = 'filesystem',

319

'path' = '/data/warehouse/users',

320

'format' = 'avro',

321

'avro.codec' = 'snappy',

322

'sink.partition-commit.policy.kind' = 'success-file'

323

)

324

""";

325

326

tableEnv.executeSql(createTableSQL);

327

328

// Insert data into partitioned Avro files

329

String insertSQL = """

330

INSERT INTO user_files

331

SELECT

332

user_id,

333

username,

334

email,

335

CAST(created_at AS DATE) as registration_date,

336

last_login

337

FROM user_stream

338

""";

339

340

tableEnv.executeSql(insertSQL);

341

342

// Read from partitioned Avro files

343

Table result = tableEnv.sqlQuery("""

344

SELECT

345

COUNT(*) as daily_registrations,

346

registration_date

347

FROM user_files

348

WHERE registration_date >= CURRENT_DATE - INTERVAL '30' DAY

349

GROUP BY registration_date

350

ORDER BY registration_date

351

""");

352

```

353

354

### Performance Optimization

355

356

```java

357

// Optimized settings for large file processing

358

AvroInputFormat<GenericRecord> optimizedInput =

359

new AvroInputFormat<>(new Path("/large/files/*.avro"), schema);

360

361

// Configure parallel reading

362

env.getConfig().setParallelism(16);

363

364

// For output, use appropriate compression

365

AvroOutputFormat<GenericRecord> optimizedOutput =

366

new AvroOutputFormat<>(new Path("/output/compressed.avro"), schema);

367

368

// Balance between compression ratio and speed

369

optimizedOutput.setCodec("snappy"); // Fast compression

370

// optimizedOutput.setCodec("deflate"); // Better compression ratio

371

// optimizedOutput.setCodec("zstandard"); // Good balance

372

373

// For streaming, configure checkpointing for fault tolerance

374

env.enableCheckpointing(60000); // Checkpoint every minute

375

376

// Bulk writing with appropriate rolling policy

377

StreamingFileSink<GenericRecord> sink = StreamingFileSink

378

.forBulkFormat(new Path("/streaming/output"), writerFactory)

379

.withRollingPolicy(

380

DefaultRollingPolicy.builder()

381

.withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) // Roll every 15 minutes

382

.withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // Roll after 5 minutes of inactivity

383

.withMaxPartSize(1024 * 1024 * 128) // Roll at 128MB

384

.build())

385

.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))

386

.build();

387

```