or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

code-generation.mdencoding-decoding.mdfile-operations.mdgeneric-data.mdindex.mdmessage-operations.mdreflection-operations.mdschema-evolution.mdschema-system.md

message-operations.mddocs/

0

# Message Operations

1

2

Avro's message operations provide efficient encoding and decoding of individual objects for messaging systems and data exchange. These operations support both header-based schema identification and raw data formats for different messaging scenarios.

3

4

## Capabilities

5

6

### Message Encoder Interface

7

8

Core interface for encoding single objects into byte arrays or streams.

9

10

```java { .api }

11

public interface MessageEncoder<D> {

12

byte[] encode(D datum) throws IOException;

13

void encode(D datum, OutputStream stream) throws IOException;

14

}

15

```

16

17

**Usage Examples:**

18

19

```java

20

// Example implementation usage (see concrete implementations below)

21

MessageEncoder<GenericRecord> encoder = /* create encoder */;

22

23

GenericRecord record = createUserRecord();

24

25

// Encode to byte array

26

byte[] encoded = encoder.encode(record);

27

System.out.println("Encoded size: " + encoded.length + " bytes");

28

29

// Encode to stream

30

ByteArrayOutputStream stream = new ByteArrayOutputStream();

31

encoder.encode(record, stream);

32

byte[] streamEncoded = stream.toByteArray();

33

34

// Send encoded data via messaging system

35

messagingSystem.send("user.topic", encoded);

36

```

37

38

### Message Decoder Interface

39

40

Core interface for decoding single objects from byte arrays or streams.

41

42

```java { .api }

43

public interface MessageDecoder<D> {

44

D decode(InputStream stream) throws IOException;

45

D decode(InputStream stream, D reuse) throws IOException;

46

D decode(byte[] encoded) throws IOException;

47

D decode(byte[] encoded, D reuse) throws IOException;

48

49

// Base decoder class with common functionality

50

public abstract static class BaseDecoder<D> implements MessageDecoder<D> {

51

// Default implementations for byte array methods

52

public D decode(byte[] encoded) throws IOException;

53

public D decode(byte[] encoded, D reuse) throws IOException;

54

}

55

}

56

```

57

58

**Usage Examples:**

59

60

```java

61

// Example implementation usage (see concrete implementations below)

62

MessageDecoder<GenericRecord> decoder = /* create decoder */;

63

64

// Decode from byte array

65

byte[] encodedData = receiveFromMessaging();

66

GenericRecord record = decoder.decode(encodedData);

67

System.out.println("Decoded name: " + record.get("name"));

68

69

// Decode with object reuse for performance

70

GenericRecord reusedRecord = null;

71

for (byte[] message : messages) {

72

reusedRecord = decoder.decode(message, reusedRecord);

73

processRecord(reusedRecord);

74

}

75

76

// Decode from stream

77

InputStream messageStream = new ByteArrayInputStream(encodedData);

78

GenericRecord streamRecord = decoder.decode(messageStream);

79

```

80

81

### Binary Message Encoder

82

83

Encoder for binary single-object messages with schema fingerprint headers.

84

85

```java { .api }

86

public class BinaryMessageEncoder<D> implements MessageEncoder<D> {

87

public static <D> BinaryMessageEncoder<D> of(GenericData model, Schema schema);

88

public static <D> BinaryMessageEncoder<D> of(GenericData model, Schema schema, Map<String, Object> config);

89

90

// MessageEncoder implementation

91

public byte[] encode(D datum) throws IOException;

92

public void encode(D datum, OutputStream stream) throws IOException;

93

}

94

```

95

96

**Usage Examples:**

97

98

```java

99

// Create binary message encoder for generic records

100

Schema userSchema = new Schema.Parser().parse(userSchemaJson);

101

BinaryMessageEncoder<GenericRecord> encoder =

102

BinaryMessageEncoder.of(GenericData.get(), userSchema);

103

104

// Create user record

105

GenericRecord user = new GenericData.Record(userSchema);

106

user.put("name", "Alice");

107

user.put("age", 30);

108

user.put("email", "alice@example.com");

109

110

// Encode to binary message with header

111

byte[] binaryMessage = encoder.encode(user);

112

System.out.println("Binary message size: " + binaryMessage.length);

113

114

// The encoded message includes:

115

// - Magic byte and format version

116

// - Schema fingerprint

117

// - Serialized data

118

119

// Send via message queue

120

messageQueue.publish("users", binaryMessage);

121

122

// Create encoder for specific records

123

BinaryMessageEncoder<User> specificEncoder =

124

BinaryMessageEncoder.of(SpecificData.get(), User.SCHEMA$);

125

126

User specificUser = new User("Bob", 35, "bob@example.com");

127

byte[] specificMessage = specificEncoder.encode(specificUser);

128

```

129

130

### Binary Message Decoder

131

132

Decoder for binary single-object messages that can resolve schemas using fingerprints.

133

134

```java { .api }

135

public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {

136

public static <D> BinaryMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema);

137

public static <D> BinaryMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema, SchemaStore resolver);

138

139

// MessageDecoder implementation

140

public D decode(InputStream stream) throws IOException;

141

public D decode(InputStream stream, D reuse) throws IOException;

142

}

143

```

144

145

**Usage Examples:**

146

147

```java

148

// Create binary message decoder

149

Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);

150

BinaryMessageDecoder<GenericRecord> decoder =

151

BinaryMessageDecoder.of(GenericData.get(), readerSchema, readerSchema);

152

153

// Decode binary message

154

byte[] binaryMessage = messageQueue.receive("users");

155

GenericRecord decodedUser = decoder.decode(binaryMessage);

156

System.out.println("Decoded user: " + decodedUser.get("name"));

157

158

// Schema evolution with different reader schema

159

Schema evolvedReaderSchema = new Schema.Parser().parse(evolvedSchemaJson);

160

BinaryMessageDecoder<GenericRecord> evolvingDecoder =

161

BinaryMessageDecoder.of(GenericData.get(), writerSchema, evolvedReaderSchema);

162

163

GenericRecord evolvedUser = evolvingDecoder.decode(binaryMessage);

164

165

// Use schema store for automatic schema resolution

166

SchemaStore schemaStore = createSchemaStore();

167

BinaryMessageDecoder<GenericRecord> resolverDecoder =

168

BinaryMessageDecoder.of(GenericData.get(), null, readerSchema, schemaStore);

169

170

// This decoder can handle messages with different writer schemas

171

GenericRecord autoResolvedUser = resolverDecoder.decode(binaryMessage);

172

173

// Decode multiple messages efficiently

174

List<byte[]> messages = messageQueue.receiveBatch("users", 100);

175

GenericRecord reusedRecord = null;

176

for (byte[] message : messages) {

177

reusedRecord = decoder.decode(message, reusedRecord);

178

processUser(reusedRecord);

179

}

180

```

181

182

### Raw Message Encoder

183

184

Encoder for raw messages without headers - just the serialized data.

185

186

```java { .api }

187

public class RawMessageEncoder<D> implements MessageEncoder<D> {

188

public static <D> RawMessageEncoder<D> of(GenericData model, Schema schema);

189

190

// MessageEncoder implementation

191

public byte[] encode(D datum) throws IOException;

192

public void encode(D datum, OutputStream stream) throws IOException;

193

}

194

```

195

196

**Usage Examples:**

197

198

```java

199

// Create raw message encoder (no headers)

200

Schema userSchema = new Schema.Parser().parse(userSchemaJson);

201

RawMessageEncoder<GenericRecord> rawEncoder =

202

RawMessageEncoder.of(GenericData.get(), userSchema);

203

204

GenericRecord user = createUserRecord();

205

206

// Encode without any headers - just the raw data

207

byte[] rawMessage = rawEncoder.encode(user);

208

System.out.println("Raw message size: " + rawMessage.length);

209

210

// Raw messages are smaller but require external schema management

211

// Useful when schema is managed separately (e.g., schema registry)

212

213

// Use in streaming scenarios where every byte counts

214

for (GenericRecord record : largeDataset) {

215

byte[] rawData = rawEncoder.encode(record);

216

highThroughputStream.write(rawData);

217

}

218

```

219

220

### Raw Message Decoder

221

222

Decoder for raw messages that requires external schema knowledge.

223

224

```java { .api }

225

public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {

226

public static <D> RawMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema);

227

228

// MessageDecoder implementation

229

public D decode(InputStream stream) throws IOException;

230

public D decode(InputStream stream, D reuse) throws IOException;

231

}

232

```

233

234

**Usage Examples:**

235

236

```java

237

// Schema must be known ahead of time for raw messages

238

Schema knownSchema = getSchemaFromRegistry("user-v1");

239

RawMessageDecoder<GenericRecord> rawDecoder =

240

RawMessageDecoder.of(GenericData.get(), knownSchema, knownSchema);

241

242

// Decode raw message data

243

byte[] rawMessage = receiveRawMessage();

244

GenericRecord decodedUser = rawDecoder.decode(rawMessage);

245

246

// Efficient decoding of raw message streams

247

InputStream rawStream = getRawMessageStream();

248

List<GenericRecord> users = new ArrayList<>();

249

GenericRecord reusedRecord = null;

250

251

while (rawStream.available() > 0) {

252

reusedRecord = rawDecoder.decode(rawStream, reusedRecord);

253

users.add(new GenericData.Record(reusedRecord, true)); // Deep copy

254

}

255

256

// Schema evolution with raw messages

257

Schema writerSchema = getSchemaFromRegistry("user-v1");

258

Schema readerSchema = getSchemaFromRegistry("user-v2");

259

RawMessageDecoder<GenericRecord> evolvingRawDecoder =

260

RawMessageDecoder.of(GenericData.get(), writerSchema, readerSchema);

261

262

GenericRecord evolvedUser = evolvingRawDecoder.decode(rawMessage);

263

```

264

265

### Schema Store Interface

266

267

Interface for resolving schemas by fingerprint in message decoding scenarios.

268

269

```java { .api }

270

public interface SchemaStore {

271

Schema findByFingerprint(long fingerprint);

272

273

// Optional method for caching

274

default void addSchema(Schema schema) {

275

// Default implementation does nothing

276

}

277

}

278

```

279

280

**Usage Examples:**

281

282

```java

283

// Implement custom schema store

284

import org.apache.avro.SchemaNormalization;

285

286

public class InMemorySchemaStore implements SchemaStore {

287

private final Map<Long, Schema> schemas = new ConcurrentHashMap<>();

288

289

@Override

290

public Schema findByFingerprint(long fingerprint) {

291

return schemas.get(fingerprint);

292

}

293

294

@Override

295

public void addSchema(Schema schema) {

296

long fingerprint = SchemaNormalization.parsingFingerprint64(schema);

297

schemas.put(fingerprint, schema);

298

}

299

300

public void loadSchemasFromRegistry() {

301

// Load schemas from external registry

302

List<Schema> allSchemas = schemaRegistry.getAllSchemas();

303

for (Schema schema : allSchemas) {

304

addSchema(schema);

305

}

306

}

307

}

308

309

// Use schema store with message decoder

310

InMemorySchemaStore schemaStore = new InMemorySchemaStore();

311

schemaStore.loadSchemasFromRegistry();

312

313

BinaryMessageDecoder<GenericRecord> decoder =

314

BinaryMessageDecoder.of(GenericData.get(), null, readerSchema, schemaStore);

315

316

// Decoder will automatically resolve writer schemas using fingerprints

317

byte[] message = receiveMessage();

318

GenericRecord record = decoder.decode(message); // Schema resolved automatically

319

320

// Database-backed schema store with complete fingerprint handling

321

public class DatabaseSchemaStore implements SchemaStore {

322

private final SchemaRepository repository;

323

private final Cache<Long, Schema> cache;

324

325

@Override

326

public Schema findByFingerprint(long fingerprint) {

327

return cache.get(fingerprint, fp -> {

328

String schemaJson = repository.findByFingerprint(fp);

329

return schemaJson != null ? new Schema.Parser().parse(schemaJson) : null;

330

});

331

}

332

333

@Override

334

public void addSchema(Schema schema) {

335

// Calculate 64-bit parsing fingerprint for message headers

336

long fingerprint = SchemaNormalization.parsingFingerprint64(schema);

337

338

// Store in database

339

repository.save(fingerprint, schema.toString());

340

341

// Update cache

342

cache.put(fingerprint, schema);

343

344

System.out.println("Stored schema with fingerprint: " + fingerprint);

345

}

346

347

// Utility method to get fingerprint for any schema

348

public long getFingerprint(Schema schema) {

349

return SchemaNormalization.parsingFingerprint64(schema);

350

}

351

}

352

```

353

354

### Message Exception Handling

355

356

Specific exceptions for message encoding/decoding operations.

357

358

```java { .api }

359

public class MissingSchemaException extends AvroRuntimeException {

360

public MissingSchemaException(String message);

361

public MissingSchemaException(String message, Throwable cause);

362

}

363

364

public class BadHeaderException extends AvroRuntimeException {

365

public BadHeaderException(String message);

366

public BadHeaderException(String message, Throwable cause);

367

}

368

```

369

370

**Usage Examples:**

371

372

```java

373

// Handle message-specific exceptions

374

BinaryMessageDecoder<GenericRecord> decoder = createDecoder();

375

376

try {

377

byte[] suspiciousMessage = receiveMessage();

378

GenericRecord record = decoder.decode(suspiciousMessage);

379

processRecord(record);

380

381

} catch (MissingSchemaException e) {

382

// Schema not found in schema store

383

logger.error("Schema not available for message: " + e.getMessage());

384

// Possibly fetch schema from registry and retry

385

386

} catch (BadHeaderException e) {

387

// Message header is corrupted or invalid

388

logger.error("Invalid message header: " + e.getMessage());

389

// Skip message or send to dead letter queue

390

391

} catch (IOException e) {

392

// General I/O error during decoding

393

logger.error("Failed to decode message: " + e.getMessage());

394

}

395

396

// Robust message processing with error handling

397

public void processMessages(List<byte[]> messages) {

398

int successCount = 0;

399

int errorCount = 0;

400

401

for (byte[] message : messages) {

402

try {

403

GenericRecord record = decoder.decode(message);

404

processRecord(record);

405

successCount++;

406

} catch (MissingSchemaException | BadHeaderException e) {

407

logger.warn("Skipping invalid message: " + e.getMessage());

408

errorCount++;

409

} catch (Exception e) {

410

logger.error("Unexpected error processing message", e);

411

errorCount++;

412

}

413

}

414

415

logger.info("Processed {} messages successfully, {} errors", successCount, errorCount);

416

}

417

```

418

419

## Types

420

421

```java { .api }

422

public interface MessageEncoder<D> {

423

byte[] encode(D datum) throws IOException;

424

void encode(D datum, OutputStream stream) throws IOException;

425

}

426

427

public interface MessageDecoder<D> {

428

D decode(InputStream stream) throws IOException;

429

D decode(InputStream stream, D reuse) throws IOException;

430

D decode(byte[] encoded) throws IOException;

431

D decode(byte[] encoded, D reuse) throws IOException;

432

433

public abstract static class BaseDecoder<D> implements MessageDecoder<D> {

434

// Common implementation for byte array methods

435

}

436

}

437

438

public class BinaryMessageEncoder<D> implements MessageEncoder<D> {

439

// Binary format with schema fingerprint header

440

}

441

442

public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {

443

// Binary format decoder with schema resolution

444

}

445

446

public class RawMessageEncoder<D> implements MessageEncoder<D> {

447

// Raw data encoding without headers

448

}

449

450

public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {

451

// Raw data decoding with known schema

452

}

453

454

public interface SchemaStore {

455

Schema findByFingerprint(long fingerprint);

456

default void addSchema(Schema schema);

457

}

458

459

public class MissingSchemaException extends AvroRuntimeException {

460

// Schema not found exception

461

}

462

463

public class BadHeaderException extends AvroRuntimeException {

464

// Invalid message header exception

465

}

466

```