or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro-integration.md · index.md · protobuf-integration.md · rowdata-integration.md · table-integration.md · utilities.md · vectorized-reading.md · writing-support.md

docs/avro-integration.md

# Avro Integration

Specialized readers and writers for Apache Avro records stored in Parquet format, supporting SpecificRecord, GenericRecord, and reflection-based serialization.

## Capabilities

### AvroParquetReaders

Factory methods for creating StreamFormat readers that can read Avro records from Parquet files.

```java { .api }
/**
 * Convenience builder to create AvroParquetRecordFormat instances for different Avro record types
 */
@Experimental
public class AvroParquetReaders {

    /**
     * Creates a StreamFormat for reading Avro SpecificRecord types from Parquet files
     * @param <T> SpecificRecord type
     * @param typeClass Class of the SpecificRecord to read
     * @return StreamFormat for reading SpecificRecord instances
     */
    public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(Class<T> typeClass);

    /**
     * Creates a StreamFormat for reading Avro GenericRecord types from Parquet files.
     * Requires an explicit schema since Flink needs schema information for serialization.
     * @param schema Avro schema for the GenericRecord
     * @return StreamFormat for reading GenericRecord instances
     */
    public static StreamFormat<GenericRecord> forGenericRecord(Schema schema);

    /**
     * Creates a StreamFormat for reading POJOs from Parquet files using Avro reflection
     * @param <T> POJO type (not SpecificRecord or GenericRecord)
     * @param typeClass Class of the POJO to read via reflection
     * @return StreamFormat for reading POJO instances
     * @throws IllegalArgumentException if typeClass is SpecificRecord or GenericRecord
     */
    public static <T> StreamFormat<T> forReflectRecord(Class<T> typeClass);
}
```

### AvroParquetWriters

Factory methods for creating ParquetWriterFactory instances that can write Avro records to Parquet files.

```java { .api }
/**
 * Convenience builder to create ParquetWriterFactory instances for different Avro types
 */
@Experimental
public class AvroParquetWriters {

    /**
     * Creates a ParquetWriterFactory for Avro SpecificRecord types
     * @param <T> SpecificRecord type
     * @param type Class of the SpecificRecord to write
     * @return ParquetWriterFactory for writing SpecificRecord instances
     */
    public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);

    /**
     * Creates a ParquetWriterFactory for Avro GenericRecord types
     * @param schema Avro schema for the GenericRecord
     * @return ParquetWriterFactory for writing GenericRecord instances
     */
    public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);

    /**
     * Creates a ParquetWriterFactory for POJOs using Avro reflection
     * @param <T> POJO type
     * @param type Class of the POJO to write via reflection
     * @return ParquetWriterFactory for writing POJO instances
     */
    public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);
}
```

### AvroParquetRecordFormat

StreamFormat implementation for reading Avro records from Parquet files with proper type information handling.

```java { .api }
/**
 * StreamFormat implementation for reading Avro records from Parquet files
 * @param <E> Avro record type
 */
public class AvroParquetRecordFormat<E> implements StreamFormat<E> {

    /**
     * Creates a new AvroParquetRecordFormat
     * @param avroTypeInfo Type information for the Avro records
     * @param dataModelSupplier Supplier for the GenericData instance
     */
    public AvroParquetRecordFormat(
            TypeInformation<E> avroTypeInfo,
            SerializableSupplier<GenericData> dataModelSupplier
    );

    /**
     * Creates a reader for the given file split
     * @param config Hadoop configuration
     * @param split File source split to read
     * @return Reader iterator for Avro records
     * @throws IOException if reader creation fails
     */
    public RecordReaderIterator<E> createReader(Configuration config, FileSourceSplit split) throws IOException;

    /**
     * Restores a reader from checkpoint state
     * @param config Hadoop configuration
     * @param split File source split to read
     * @return Reader iterator for Avro records
     * @throws IOException if reader restoration fails
     */
    public RecordReaderIterator<E> restoreReader(Configuration config, FileSourceSplit split) throws IOException;

    /**
     * Indicates whether this format supports splitting (it does not)
     * @return false - Parquet files with Avro are not splittable
     */
    public boolean isSplittable();

    /**
     * Returns the produced type information
     * @return TypeInformation for the Avro record type
     */
    public TypeInformation<E> getProducedType();
}
```

## Usage Examples

### Reading SpecificRecord

```java
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.avro.specific.SpecificRecordBase;

// Define your Avro SpecificRecord class
public class User extends SpecificRecordBase {
    private String name;
    private int age;
    private String email;
    // ... constructors, getters, setters
}

// Create file source for reading User records
FileSource<User> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forSpecificRecord(User.class),
        new Path("hdfs://path/to/user/parquet/files")
    )
    .build();

// Create DataStream
DataStream<User> userStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "user-parquet-source"
);
```

### Reading GenericRecord

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Define Avro schema
String schemaString = """
    {
      "type": "record",
      "name": "Product",
      "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
        {"name": "price", "type": "double"},
        {"name": "category", "type": ["null", "string"], "default": null}
      ]
    }
    """;

Schema schema = new Schema.Parser().parse(schemaString);

// Create file source for GenericRecord
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        new Path("/data/products")
    )
    .build();

DataStream<GenericRecord> productStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "product-parquet-source"
);

// Process GenericRecord
productStream.map(record -> {
    Long id = (Long) record.get("id");
    String name = record.get("name").toString();
    Double price = (Double) record.get("price");
    return new ProcessedProduct(id, name, price);
});
```

### Reading with Reflection

```java
// POJO class for reflection-based reading
public class Event {
    public long timestamp;
    public String eventType;
    public Map<String, Object> properties;

    // Default constructor required
    public Event() {}

    // Constructor, getters, setters...
}

// Create file source using reflection
FileSource<Event> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forReflectRecord(Event.class),
        new Path("/events/parquet")
    )
    .build();
```

### Writing SpecificRecord

```java
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.connector.file.sink.FileSink;

// Create FileSink for writing User records
FileSink<User> sink = FileSink
    .forBulkFormat(
        new Path("/output/users"),
        AvroParquetWriters.forSpecificRecord(User.class)
    )
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(10))
            .withInactivityInterval(Duration.ofMinutes(2))
            .build()
    )
    .build();

// Write user stream to Parquet
userStream.sinkTo(sink);
```

### Writing GenericRecord

```java
// Create GenericRecord instances
DataStream<GenericRecord> genericStream = originalStream.map(data -> {
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", data.getId());
    record.put("name", data.getName());
    record.put("price", data.getPrice());
    record.put("category", data.getCategory());
    return record;
});

// Create sink for GenericRecord
FileSink<GenericRecord> genericSink = FileSink
    .forBulkFormat(
        new Path("/output/products"),
        AvroParquetWriters.forGenericRecord(schema)
    )
    .build();

genericStream.sinkTo(genericSink);
```

### Schema Evolution Handling

```java
import org.apache.avro.Schema;

// Handle schema evolution by reading with the newer schema
Schema oldSchema = getOldSchema();
Schema newSchema = getNewSchemaWithDefaults();

// Reader will handle missing fields using defaults
FileSource<GenericRecord> evolutionSource = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(newSchema),
        new Path("/data/evolved-records")
    )
    .build();

DataStream<GenericRecord> evolvedStream = env.fromSource(
    evolutionSource,
    WatermarkStrategy.noWatermarks(),
    "evolved-source"
);
```

### Complex Nested Types

```java
// Avro schema with nested types
String nestedSchema = """
    {
      "type": "record",
      "name": "Order",
      "fields": [
        {"name": "orderId", "type": "string"},
        {"name": "customer", "type": {
          "type": "record",
          "name": "Customer",
          "fields": [
            {"name": "id", "type": "long"},
            {"name": "name", "type": "string"}
          ]
        }},
        {"name": "items", "type": {
          "type": "array",
          "items": {
            "type": "record",
            "name": "OrderItem",
            "fields": [
              {"name": "productId", "type": "string"},
              {"name": "quantity", "type": "int"},
              {"name": "unitPrice", "type": "double"}
            ]
          }
        }}
      ]
    }
    """;

Schema orderSchema = new Schema.Parser().parse(nestedSchema);

// Read complex nested structures
FileSource<GenericRecord> orderSource = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(orderSchema),
        new Path("/orders/nested")
    )
    .build();
```

## Performance Considerations

### Memory Usage

Avro-Parquet integration requires schema information in memory for serialization/deserialization. For GenericRecord usage, schemas should be reused when possible to minimize memory overhead.

### Type Safety

SpecificRecord provides compile-time type safety and better performance due to generated code. GenericRecord offers more flexibility but requires runtime type checking.

### Schema Registry Integration

```java
// Example with schema registry (conceptual)
Schema schema = schemaRegistry.getLatestSchema("user-events");
FileSource<GenericRecord> source = FileSource
    .forRecordStreamFormat(
        AvroParquetReaders.forGenericRecord(schema),
        inputPath
    )
    .build();
```

The Avro integration provides seamless compatibility with existing Avro-based data pipelines while leveraging Parquet's columnar storage benefits for improved query performance.