or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cross-language-processing.mddata-wrappers.mdfile-utilities.mdindex.mdinput-output-formats.mdjob-configuration.mdmapreduce-processing.mdserialization.md

serialization.mddocs/

0

# Serialization and I/O Infrastructure

Core serialization framework that integrates Avro with Hadoop's serialization system, providing efficient data exchange, schema management, and seamless conversion between Java objects and Avro data formats within MapReduce pipelines.

## Capabilities

### Hadoop Serialization Integration

Main serialization class that registers Avro data types with Hadoop's serialization framework.

```java { .api }
public class AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
  // Core serialization interface
  public boolean accept(Class<?> c);
  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);

  // Configuration management
  public static void addToConfiguration(Configuration conf);

  // Schema configuration
  public static void setKeyWriterSchema(Configuration conf, Schema schema);
  public static void setKeyReaderSchema(Configuration conf, Schema schema);
  public static void setValueWriterSchema(Configuration conf, Schema schema);
  public static void setValueReaderSchema(Configuration conf, Schema schema);

  // Schema retrieval
  public static Schema getKeyWriterSchema(Configuration conf);
  public static Schema getKeyReaderSchema(Configuration conf);
  public static Schema getValueWriterSchema(Configuration conf);
  public static Schema getValueReaderSchema(Configuration conf);

  // Data model support
  public static GenericData createDataModel(Configuration conf);
}
```

#### Usage Example

```java
import org.apache.avro.hadoop.io.AvroSerialization;
import org.apache.hadoop.conf.Configuration;

// Enable Avro serialization
Configuration conf = new Configuration();
AvroSerialization.addToConfiguration(conf);

// Configure schemas (Schema.Parser replaces the deprecated Schema.parse)
Schema userSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",...}");
AvroSerialization.setKeyWriterSchema(conf, userSchema);
AvroSerialization.setValueWriterSchema(conf, userSchema);

// Retrieve configured schemas
Schema keySchema = AvroSerialization.getKeyWriterSchema(conf);
Schema valueSchema = AvroSerialization.getValueWriterSchema(conf);
```

### Avro Serializer

Serializer implementation for converting AvroWrapper objects to binary format.

```java { .api }
public class AvroSerializer<T> implements Serializer<AvroWrapper<T>> {
  // Constructors
  public AvroSerializer(Schema writerSchema);
  public AvroSerializer(Schema writerSchema, DatumWriter<T> datumWriter);

  // Configuration
  public Schema getWriterSchema();

  // Serialization lifecycle
  public void open(OutputStream outputStream) throws IOException;
  public void serialize(AvroWrapper<T> avroWrapper) throws IOException;
  public void close() throws IOException;
}
```

#### Usage Example

```java
import org.apache.avro.hadoop.io.AvroSerializer;
import org.apache.avro.mapred.AvroWrapper;
import java.io.ByteArrayOutputStream;

// Create serializer (Schema.Parser replaces the deprecated Schema.parse)
Schema schema = new Schema.Parser().parse("...");
AvroSerializer<GenericRecord> serializer = new AvroSerializer<>(schema);

// Serialize data
ByteArrayOutputStream out = new ByteArrayOutputStream();
serializer.open(out);

AvroWrapper<GenericRecord> wrapper = new AvroWrapper<>(record);
serializer.serialize(wrapper);
serializer.close();

byte[] serializedData = out.toByteArray();
```

### Avro Deserializer

Base deserializer class for converting binary data back to AvroWrapper objects.

```java { .api }
public abstract class AvroDeserializer<T extends AvroWrapper<D>, D> implements Deserializer<T> {
  // Schema access
  public Schema getWriterSchema();
  public Schema getReaderSchema();

  // Deserialization lifecycle
  public void open(InputStream inputStream) throws IOException;
  public abstract T deserialize(T avroWrapperToReuse) throws IOException;
  public void close() throws IOException;
}

public class AvroKeyDeserializer<D> extends AvroDeserializer<AvroKey<D>, D> {
  public AvroKey<D> deserialize(AvroKey<D> avroWrapperToReuse) throws IOException;
}

public class AvroValueDeserializer<D> extends AvroDeserializer<AvroValue<D>, D> {
  public AvroValue<D> deserialize(AvroValue<D> avroWrapperToReuse) throws IOException;
}
```

#### Usage Example

```java
import org.apache.avro.hadoop.io.AvroKeyDeserializer;
import org.apache.avro.mapred.AvroKey;
import java.io.ByteArrayInputStream;

// Create deserializer; the writer and reader schemas (and a ClassLoader)
// are required — there is no no-argument constructor
AvroKeyDeserializer<GenericRecord> deserializer =
    new AvroKeyDeserializer<>(writerSchema, readerSchema, getClass().getClassLoader());

// Deserialize data
ByteArrayInputStream in = new ByteArrayInputStream(serializedData);
deserializer.open(in);

AvroKey<GenericRecord> key = new AvroKey<>();
AvroKey<GenericRecord> result = deserializer.deserialize(key);
deserializer.close();

GenericRecord record = result.datum();
```

### Data Conversion Framework

Framework for converting between different data formats and Avro.

```java { .api }
public abstract class AvroDatumConverter<INPUT, OUTPUT> {
  // Core conversion method
  public abstract OUTPUT convert(INPUT input);

  // Schema information
  public abstract Schema getWriterSchema();
}

public class AvroDatumConverterFactory {
  // Constructor
  public AvroDatumConverterFactory(Configuration conf);

  // Factory method
  public <IN, OUT> AvroDatumConverter<IN, OUT> create(Class<IN> inputClass);
}
```

The factory includes built-in converters for common Hadoop types:

- `WritableConverter`: Converts Hadoop Writable objects to Avro
- `TextConverter`: Converts Text objects to Avro strings
- `LongWritableConverter`: Converts LongWritable to Avro long
- `IntWritableConverter`: Converts IntWritable to Avro int
- `DoubleWritableConverter`: Converts DoubleWritable to Avro double
- `FloatWritableConverter`: Converts FloatWritable to Avro float
- `BooleanWritableConverter`: Converts BooleanWritable to Avro boolean
- `BytesWritableConverter`: Converts BytesWritable to Avro bytes

#### Usage Example

```java
import org.apache.avro.hadoop.io.AvroDatumConverterFactory;
import org.apache.avro.hadoop.io.AvroDatumConverter;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.io.Text;

// Create converter factory
Configuration conf = new Configuration();
AvroDatumConverterFactory factory = new AvroDatumConverterFactory(conf);

// Get converter for Text to Avro string
AvroDatumConverter<Text, Utf8> converter = factory.create(Text.class);

// Convert data
Text input = new Text("Hello, World!");
Utf8 avroString = converter.convert(input);
Schema schema = converter.getWriterSchema();
```

### Key Comparators

Specialized comparators for Avro data that support both object and raw byte comparison.

```java { .api }
public class AvroKeyComparator<T> implements RawComparator<AvroKey<T>>, Configurable {
  // Object comparison
  public int compare(AvroKey<T> x, AvroKey<T> y);

  // Raw byte comparison (for efficiency)
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

  // Configuration
  public void setConf(Configuration conf);
  public Configuration getConf();
}
```

#### Usage Example

```java
import org.apache.avro.hadoop.io.AvroKeyComparator;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.conf.Configuration;

// Create and configure comparator; it needs the key writer schema
// from the configuration before it can compare
AvroKeyComparator<GenericRecord> comparator = new AvroKeyComparator<>();
Configuration conf = new Configuration();
AvroSerialization.setKeyWriterSchema(conf, schema);
comparator.setConf(conf);

// Compare objects
AvroKey<GenericRecord> key1 = new AvroKey<>(record1);
AvroKey<GenericRecord> key2 = new AvroKey<>(record2);
int result = comparator.compare(key1, key2);
```

### Legacy API Serialization

Serialization support specifically for the legacy MapReduce API.

```java { .api }
// Declared in the org.apache.avro.mapred package for compatibility with
// the legacy org.apache.hadoop.mapred API
public class AvroSerialization<T> implements Serialization<AvroWrapper<T>> {
  public boolean accept(Class<?> c);
  public Deserializer<AvroWrapper<T>> getDeserializer(Class<AvroWrapper<T>> c);
  public Serializer<AvroWrapper<T>> getSerializer(Class<AvroWrapper<T>> c);
}
```

## Schema Evolution Support

The serialization framework supports Avro's schema evolution capabilities:

### Writer and Reader Schemas

```java
// Set different schemas for writing and reading
AvroSerialization.setKeyWriterSchema(conf, writerSchema);
AvroSerialization.setKeyReaderSchema(conf, readerSchema);

// Data written with the writer schema will be automatically resolved to the reader schema
```

### Evolution Rules

The serialization framework follows Avro's schema resolution rules:

- **Backward Compatibility**: a reader using the new schema can read data written with the old schema
- **Forward Compatibility**: a reader using the old schema can read data written with the new schema
- **Full Compatibility**: both backward and forward compatible

#### Example

```java
// Original schema (version 1)
Schema v1Schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");

// Evolved schema (version 2) with a new optional field
Schema v2Schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

// Configure for evolution
AvroSerialization.setKeyWriterSchema(conf, v1Schema); // Data was written with v1
AvroSerialization.setKeyReaderSchema(conf, v2Schema); // Read with v2 (email will be null)
```

## Data Model Integration

Support for different Avro data models:

### Generic Data Model

```java
import org.apache.avro.generic.GenericData;

// Use generic data model (default)
AvroSerialization.setDataModelClass(conf, GenericData.class);
GenericData dataModel = AvroSerialization.createDataModel(conf);
```

### Specific Data Model

```java
import org.apache.avro.specific.SpecificData;

// Use specific data model for generated classes
AvroSerialization.setDataModelClass(conf, SpecificData.class);
```

### Reflect Data Model

```java
import org.apache.avro.reflect.ReflectData;

// Use reflection data model for POJOs
AvroSerialization.setDataModelClass(conf, ReflectData.class);
```

## Performance Optimization

### Object Reuse

Serializers and deserializers support object reuse for better performance:

```java
// Reuse wrapper objects
AvroKey<GenericRecord> reusableKey = new AvroKey<>();
AvroValue<GenericRecord> reusableValue = new AvroValue<>();

// Deserializers will reuse these objects
AvroKey<GenericRecord> result = deserializer.deserialize(reusableKey);
```

### Raw Byte Comparison

Use raw byte comparison for sorting without deserialization:

```java
// Raw comparator avoids object deserialization for sorting
AvroKeyComparator<GenericRecord> comparator = new AvroKeyComparator<>();
// Hadoop will use raw byte comparison when possible
```

### Memory Management

```java
// Proper resource management. Hadoop's Serializer interface does not extend
// AutoCloseable, so try-with-resources will not compile; close it explicitly:
AvroSerializer<GenericRecord> serializer = new AvroSerializer<>(schema);
try {
    serializer.open(outputStream);
    // Use serializer
} finally {
    serializer.close();
}
```

## Configuration Best Practices

### Schema Management

```java
// Store schemas in the configuration for access across tasks
AvroSerialization.setKeyWriterSchema(conf, keySchema);
AvroSerialization.setValueWriterSchema(conf, valueSchema);

// Retrieve schemas where needed
Schema keySchema = AvroSerialization.getKeyWriterSchema(conf);
```

### Serialization Registration

```java
// Always add Avro serialization to the configuration
AvroSerialization.addToConfiguration(conf);

// Verify serialization is properly configured
String[] serializations = conf.getStrings("io.serializations");
// Should include "org.apache.avro.hadoop.io.AvroSerialization"
```

## Error Handling

Common serialization issues and solutions:

- **Schema Not Found**: Ensure schemas are configured before creating serializers/deserializers
- **Schema Evolution Errors**: Verify schema compatibility using Avro's SchemaCompatibility class
- **Memory Issues**: Use object reuse patterns for high-throughput scenarios
- **Configuration Errors**: Verify AvroSerialization is added to the configuration
- **Codec Issues**: Ensure compression codecs are available on all cluster nodes