or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

avro-integration.md · format-factory.md · index.md · protobuf-integration.md · rowdata-writers.md · schema-utilities.md · vectorized-input.md

docs/avro-integration.md

0

# Avro Integration

1

2

Complete Apache Avro integration for Parquet format, supporting specific records, generic records, and reflection-based serialization with full schema compatibility.

3

4

## Capabilities

5

6

### ParquetAvroWriters

7

8

Utility class providing convenience methods for creating Parquet writer factories for various Avro data types.

9

10

```java { .api }

11

/**

12

* Convenience builder for creating ParquetWriterFactory instances for Avro types

13

* Supports specific records, generic records, and reflection-based serialization

14

*/

15

public class ParquetAvroWriters {

16

17

/**

18

* Creates a ParquetWriterFactory for Avro specific record types

19

* Uses the record's built-in schema for Parquet column definition

20

* @param type The specific record class extending SpecificRecordBase

21

* @return ParquetWriterFactory configured for the specific record type

22

*/

23

public static <T extends SpecificRecordBase> ParquetWriterFactory<T> forSpecificRecord(Class<T> type);

24

25

/**

26

* Creates a ParquetWriterFactory for Avro generic records

27

* Uses the provided schema for Parquet column definition and data serialization

28

* @param schema The Avro schema defining the record structure

29

* @return ParquetWriterFactory configured for generic records with the given schema

30

*/

31

public static ParquetWriterFactory<GenericRecord> forGenericRecord(Schema schema);

32

33

/**

34

* Creates a ParquetWriterFactory using reflection to derive schema from POJO

35

* Automatically generates Avro schema from Java class structure

36

* @param type The Java class to use for reflection-based schema generation

37

* @return ParquetWriterFactory configured for the reflected type

38

*/

39

public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type);

40

}

41

```

42

43

## Usage Examples

44

45

### Specific Record Integration

46

47

```java

48

import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;

49

import org.apache.avro.specific.SpecificRecordBase;

50

51

// Define Avro specific record (typically generated from .avsc schema)

52

public class User extends SpecificRecordBase {

53

public String name;

54

public int age;

55

public String email;

56

// ... getters, setters, and Avro-generated methods

57

}

58

59

// Create writer factory for specific record

60

ParquetWriterFactory<User> userWriterFactory =

61

ParquetAvroWriters.forSpecificRecord(User.class);

62

63

// Use with FileSink

64

FileSink<User> userSink = FileSink

65

.forBulkFormat(new Path("/output/users"), userWriterFactory)

66

.build();

67

```

68

69

### Generic Record Usage

70

71

```java

72

import org.apache.avro.Schema;

73

import org.apache.avro.generic.GenericRecord;

74

import org.apache.avro.generic.GenericData;

75

76

// Define schema programmatically

77

String schemaJson = """

78

{

79

"type": "record",

80

"name": "Order",

81

"fields": [

82

{"name": "orderId", "type": "long"},

83

{"name": "customerId", "type": "string"},

84

{"name": "amount", "type": "double"},

85

{"name": "timestamp", "type": "long"}

86

]

87

}

88

""";

89

90

Schema orderSchema = new Schema.Parser().parse(schemaJson);

91

92

// Create writer factory for generic records

93

ParquetWriterFactory<GenericRecord> orderWriterFactory =

94

ParquetAvroWriters.forGenericRecord(orderSchema);

95

96

// Create and populate generic records

97

GenericRecord order = new GenericData.Record(orderSchema);

98

order.put("orderId", 12345L);

99

order.put("customerId", "CUST001");

100

order.put("amount", 99.95);

101

order.put("timestamp", System.currentTimeMillis());

102

```

103

104

### Reflection-Based Usage

105

106

```java

107

// Plain Java class (POJO)

108

public class Product {

109

private String productId;

110

private String name;

111

private double price;

112

private String category;

113

114

// Standard constructors, getters, and setters

115

public Product() {}

116

117

public Product(String productId, String name, double price, String category) {

118

this.productId = productId;

119

this.name = name;

120

this.price = price;

121

this.category = category;

122

}

123

124

// ... getters and setters

125

}

126

127

// Create writer factory using reflection

128

ParquetWriterFactory<Product> productWriterFactory =

129

ParquetAvroWriters.forReflectRecord(Product.class);

130

131

// The schema is automatically derived from the class structure

132

DataStream<Product> productStream = // ... your product data stream

133

productStream.sinkTo(FileSink

134

.forBulkFormat(new Path("/output/products"), productWriterFactory)

135

.build());

136

```

137

138

### Complex Schema Integration

139

140

```java

141

import org.apache.avro.Schema;

142

import org.apache.avro.generic.GenericRecord;

143

144

// Complex nested schema with arrays and nested records

145

String complexSchemaJson = """

146

{

147

"type": "record",

148

"name": "Transaction",

149

"fields": [

150

{"name": "transactionId", "type": "string"},

151

{"name": "timestamp", "type": "long"},

152

{"name": "customer", "type": {

153

"type": "record",

154

"name": "Customer",

155

"fields": [

156

{"name": "customerId", "type": "string"},

157

{"name": "name", "type": "string"},

158

{"name": "tier", "type": {"type": "enum", "name": "Tier", "symbols": ["BRONZE", "SILVER", "GOLD"]}}

159

]

160

}},

161

{"name": "items", "type": {

162

"type": "array",

163

"items": {

164

"type": "record",

165

"name": "Item",

166

"fields": [

167

{"name": "itemId", "type": "string"},

168

{"name": "quantity", "type": "int"},

169

{"name": "unitPrice", "type": "double"}

170

]

171

}

172

}},

173

{"name": "totalAmount", "type": "double"}

174

]

175

}

176

""";

177

178

Schema transactionSchema = new Schema.Parser().parse(complexSchemaJson);

179

ParquetWriterFactory<GenericRecord> complexWriterFactory =

180

ParquetAvroWriters.forGenericRecord(transactionSchema);

181

```

182

183

### Schema Evolution and Compatibility

184

185

```java

186

// Original schema v1

187

String schemaV1 = """

188

{

189

"type": "record",

190

"name": "UserEvent",

191

"fields": [

192

{"name": "userId", "type": "string"},

193

{"name": "eventType", "type": "string"},

194

{"name": "timestamp", "type": "long"}

195

]

196

}

197

""";

198

199

// Evolved schema v2 with optional field

200

String schemaV2 = """

201

{

202

"type": "record",

203

"name": "UserEvent",

204

"fields": [

205

{"name": "userId", "type": "string"},

206

{"name": "eventType", "type": "string"},

207

{"name": "timestamp", "type": "long"},

208

{"name": "sessionId", "type": ["null", "string"], "default": null}

209

]

210

}

211

""";

212

213

// Both schemas can be used for reading/writing with Avro's compatibility rules

214

Schema schemaV1Parsed = new Schema.Parser().parse(schemaV1);

215

Schema schemaV2Parsed = new Schema.Parser().parse(schemaV2);

216

217

ParquetWriterFactory<GenericRecord> writerV2 =

218

ParquetAvroWriters.forGenericRecord(schemaV2Parsed);

219

```

220

221

## Advanced Configuration

222

223

### Custom Avro Data Models

224

225

```java

226

import org.apache.avro.generic.GenericData;

227

import org.apache.avro.specific.SpecificData;

228

import org.apache.avro.reflect.ReflectData;

229

230

// The writers automatically use appropriate data models:

231

// - forSpecificRecord() uses SpecificData.get()

232

// - forGenericRecord() uses GenericData.get()

233

// - forReflectRecord() uses ReflectData.get()

234

235

// For custom data model configurations, you may need to modify

236

// the underlying AvroParquetWriter configuration (advanced usage)

237

```

238

239

### Performance Considerations

240

241

```java

242

// For high-throughput scenarios with specific records

243

ParquetWriterFactory<MySpecificRecord> optimizedFactory =

244

ParquetAvroWriters.forSpecificRecord(MySpecificRecord.class);

245

246

// Configure the underlying Parquet settings through FileSink

247

FileSink<MySpecificRecord> optimizedSink = FileSink

248

.forBulkFormat(outputPath, optimizedFactory)

249

.withRollingPolicy(DefaultRollingPolicy.builder()

250

.withRolloverInterval(Duration.ofMinutes(15))

251

.withInactivityInterval(Duration.ofMinutes(5))

252

.withMaxPartSize(MemorySize.ofMebiBytes(256))

253

.build())

254

.build();

255

```

256

257

## Error Handling

258

259

Common exceptions and troubleshooting:

260

261

```java

262

try {

263

ParquetWriterFactory<GenericRecord> factory =

264

ParquetAvroWriters.forGenericRecord(schema);

265

} catch (Exception e) {

266

// Schema parsing errors, invalid schema format

267

}

268

269

try {

270

ParquetWriterFactory<MyPOJO> factory =

271

ParquetAvroWriters.forReflectRecord(MyPOJO.class);

272

} catch (Exception e) {

273

// Reflection errors, unsupported field types,

274

// missing default constructors

275

}

276

277

// Runtime writing errors

278

try {

279

bulkWriter.addElement(avroRecord);

280

} catch (IOException e) {

281

// File system errors, serialization issues

282

} catch (ClassCastException e) {

283

// Type mismatch between record and expected schema

284

}

285

```

286

287

## Schema Registry Integration

288

289

For production environments using schema registries:

290

291

```java

292

// Conceptual integration with Confluent Schema Registry

293

// (requires additional dependencies and configuration)

294

295

/*

296

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

297

298

SchemaRegistryClient schemaRegistry = // ... initialize client

299

Schema schema = schemaRegistry.getByID(schemaId);

300

301

ParquetWriterFactory<GenericRecord> registryBasedFactory =

302

ParquetAvroWriters.forGenericRecord(schema);

303

*/

304

```