0
# Message Operations
1
2
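Avro's message operations provide efficient encoding and decoding of individual objects for messaging systems and data exchange. Two wire formats are supported: a binary single-object format that prefixes each payload with a schema-fingerprint header so the decoder can identify the writer schema, and a raw format that carries only the serialized datum and relies on the schema being known out of band.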
3
4
## Capabilities
5
6
### Message Encoder Interface
7
8
Core interface for encoding single objects into byte arrays or streams.
9
10
```java { .api }
11
public interface MessageEncoder<D> {
12
byte[] encode(D datum) throws IOException;
13
void encode(D datum, OutputStream stream) throws IOException;
14
}
15
```
16
17
**Usage Examples:**
18
19
```java
20
// Example implementation usage (see concrete implementations below)
21
MessageEncoder<GenericRecord> encoder = /* create encoder */;
22
23
GenericRecord record = createUserRecord();
24
25
// Encode to byte array
26
byte[] encoded = encoder.encode(record);
27
System.out.println("Encoded size: " + encoded.length + " bytes");
28
29
// Encode to stream
30
ByteArrayOutputStream stream = new ByteArrayOutputStream();
31
encoder.encode(record, stream);
32
byte[] streamEncoded = stream.toByteArray();
33
34
// Send encoded data via messaging system
35
messagingSystem.send("user.topic", encoded);
36
```
37
38
### Message Decoder Interface
39
40
Core interface for decoding single objects from byte arrays or streams.
41
42
```java { .api }
43
public interface MessageDecoder<D> {
44
D decode(InputStream stream) throws IOException;
45
D decode(InputStream stream, D reuse) throws IOException;
46
D decode(byte[] encoded) throws IOException;
47
D decode(byte[] encoded, D reuse) throws IOException;
48
49
// Base decoder class with common functionality
50
public abstract static class BaseDecoder<D> implements MessageDecoder<D> {
51
// Default implementations for byte array methods
52
public D decode(byte[] encoded) throws IOException;
53
public D decode(byte[] encoded, D reuse) throws IOException;
54
}
55
}
56
```
57
58
**Usage Examples:**
59
60
```java
61
// Example implementation usage (see concrete implementations below)
62
MessageDecoder<GenericRecord> decoder = /* create decoder */;
63
64
// Decode from byte array
65
byte[] encodedData = receiveFromMessaging();
66
GenericRecord record = decoder.decode(encodedData);
67
System.out.println("Decoded name: " + record.get("name"));
68
69
// Decode with object reuse for performance
70
GenericRecord reusedRecord = null;
71
for (byte[] message : messages) {
72
reusedRecord = decoder.decode(message, reusedRecord);
73
processRecord(reusedRecord);
74
}
75
76
// Decode from stream
77
InputStream messageStream = new ByteArrayInputStream(encodedData);
78
GenericRecord streamRecord = decoder.decode(messageStream);
79
```
80
81
### Binary Message Encoder
82
83
Encoder for binary single-object messages with schema fingerprint headers.
84
85
```java { .api }
86
public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
87
public static <D> BinaryMessageEncoder<D> of(GenericData model, Schema schema);
88
public static <D> BinaryMessageEncoder<D> of(GenericData model, Schema schema, Map<String, Object> config);
89
90
// MessageEncoder implementation
91
public byte[] encode(D datum) throws IOException;
92
public void encode(D datum, OutputStream stream) throws IOException;
93
}
94
```
95
96
**Usage Examples:**
97
98
```java
99
// Create binary message encoder for generic records
100
Schema userSchema = new Schema.Parser().parse(userSchemaJson);
101
BinaryMessageEncoder<GenericRecord> encoder =
102
BinaryMessageEncoder.of(GenericData.get(), userSchema);
103
104
// Create user record
105
GenericRecord user = new GenericData.Record(userSchema);
106
user.put("name", "Alice");
107
user.put("age", 30);
108
user.put("email", "alice@example.com");
109
110
// Encode to binary message with header
111
byte[] binaryMessage = encoder.encode(user);
112
System.out.println("Binary message size: " + binaryMessage.length);
113
114
// The encoded message includes:
115
// - Two-byte marker 0xC3 0x01 (magic byte plus format version 1)
116
// - 8-byte little-endian CRC-64-AVRO fingerprint of the writer schema
117
// - Serialized data
118
119
// Send via message queue
120
messageQueue.publish("users", binaryMessage);
121
122
// Create encoder for specific records
123
BinaryMessageEncoder<User> specificEncoder =
124
BinaryMessageEncoder.of(SpecificData.get(), User.SCHEMA$);
125
126
User specificUser = new User("Bob", 35, "bob@example.com");
127
byte[] specificMessage = specificEncoder.encode(specificUser);
128
```
129
130
### Binary Message Decoder
131
132
Decoder for binary single-object messages that can resolve schemas using fingerprints.
133
134
```java { .api }
135
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
136
public static <D> BinaryMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema);
137
public static <D> BinaryMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema, SchemaStore resolver);
138
139
// MessageDecoder implementation
140
public D decode(InputStream stream) throws IOException;
141
public D decode(InputStream stream, D reuse) throws IOException;
142
}
143
```
144
145
**Usage Examples:**
146
147
```java
148
// Create binary message decoder
149
Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
150
BinaryMessageDecoder<GenericRecord> decoder =
151
BinaryMessageDecoder.of(GenericData.get(), readerSchema, readerSchema);
152
153
// Decode binary message
154
byte[] binaryMessage = messageQueue.receive("users");
155
GenericRecord decodedUser = decoder.decode(binaryMessage);
156
System.out.println("Decoded user: " + decodedUser.get("name"));
157
158
// Schema evolution with different reader schema
159
Schema evolvedReaderSchema = new Schema.Parser().parse(evolvedSchemaJson);
160
BinaryMessageDecoder<GenericRecord> evolvingDecoder =
161
BinaryMessageDecoder.of(GenericData.get(), writerSchema, evolvedReaderSchema);
162
163
GenericRecord evolvedUser = evolvingDecoder.decode(binaryMessage);
164
165
// Use schema store for automatic schema resolution
166
SchemaStore schemaStore = createSchemaStore();
167
BinaryMessageDecoder<GenericRecord> resolverDecoder =
168
BinaryMessageDecoder.of(GenericData.get(), null, readerSchema, schemaStore);
169
170
// This decoder can handle messages with different writer schemas
171
GenericRecord autoResolvedUser = resolverDecoder.decode(binaryMessage);
172
173
// Decode multiple messages efficiently
174
List<byte[]> messages = messageQueue.receiveBatch("users", 100);
175
GenericRecord reusedRecord = null;
176
for (byte[] message : messages) {
177
reusedRecord = decoder.decode(message, reusedRecord);
178
processUser(reusedRecord);
179
}
180
```
181
182
### Raw Message Encoder
183
184
Encoder for raw messages without headers - just the serialized data.
185
186
```java { .api }
187
public class RawMessageEncoder<D> implements MessageEncoder<D> {
188
public static <D> RawMessageEncoder<D> of(GenericData model, Schema schema);
189
190
// MessageEncoder implementation
191
public byte[] encode(D datum) throws IOException;
192
public void encode(D datum, OutputStream stream) throws IOException;
193
}
194
```
195
196
**Usage Examples:**
197
198
```java
199
// Create raw message encoder (no headers)
200
Schema userSchema = new Schema.Parser().parse(userSchemaJson);
201
RawMessageEncoder<GenericRecord> rawEncoder =
202
RawMessageEncoder.of(GenericData.get(), userSchema);
203
204
GenericRecord user = createUserRecord();
205
206
// Encode without any headers - just the raw data
207
byte[] rawMessage = rawEncoder.encode(user);
208
System.out.println("Raw message size: " + rawMessage.length);
209
210
// Raw messages are smaller but require external schema management
211
// Useful when schema is managed separately (e.g., schema registry)
212
213
// Use in streaming scenarios where every byte counts
214
for (GenericRecord record : largeDataset) {
215
byte[] rawData = rawEncoder.encode(record);
216
highThroughputStream.write(rawData);
217
}
218
```
219
220
### Raw Message Decoder
221
222
Decoder for raw (header-less) messages. Because the payload carries no schema fingerprint, the caller must supply the writer schema.
223
224
```java { .api }
225
public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
226
public static <D> RawMessageDecoder<D> of(GenericData model, Schema writerSchema, Schema readerSchema);
227
228
// MessageDecoder implementation
229
public D decode(InputStream stream) throws IOException;
230
public D decode(InputStream stream, D reuse) throws IOException;
231
}
232
```
233
234
**Usage Examples:**
235
236
```java
237
// Schema must be known ahead of time for raw messages
238
Schema knownSchema = getSchemaFromRegistry("user-v1");
239
RawMessageDecoder<GenericRecord> rawDecoder =
240
RawMessageDecoder.of(GenericData.get(), knownSchema, knownSchema);
241
242
// Decode raw message data
243
byte[] rawMessage = receiveRawMessage();
244
GenericRecord decodedUser = rawDecoder.decode(rawMessage);
245
246
// Efficient decoding of raw message streams
247
InputStream rawStream = getRawMessageStream();
248
List<GenericRecord> users = new ArrayList<>();
249
GenericRecord reusedRecord = null;
250
251
while (rawStream.available() > 0) {
252
reusedRecord = rawDecoder.decode(rawStream, reusedRecord);
253
users.add(new GenericData.Record(reusedRecord, true)); // Deep copy
254
}
255
256
// Schema evolution with raw messages
257
Schema writerSchema = getSchemaFromRegistry("user-v1");
258
Schema readerSchema = getSchemaFromRegistry("user-v2");
259
RawMessageDecoder<GenericRecord> evolvingRawDecoder =
260
RawMessageDecoder.of(GenericData.get(), writerSchema, readerSchema);
261
262
GenericRecord evolvedUser = evolvingRawDecoder.decode(rawMessage);
263
```
264
265
### Schema Store Interface
266
267
Interface for resolving schemas by fingerprint in message decoding scenarios.
268
269
```java { .api }
270
public interface SchemaStore {
271
Schema findByFingerprint(long fingerprint);
272
273
// Optional method for caching
274
default void addSchema(Schema schema) {
275
// Default implementation does nothing
276
}
277
}
278
```
279
280
**Usage Examples:**
281
282
```java
283
// Implement custom schema store
284
import org.apache.avro.SchemaNormalization;
285
286
public class InMemorySchemaStore implements SchemaStore {
287
private final Map<Long, Schema> schemas = new ConcurrentHashMap<>();
288
289
@Override
290
public Schema findByFingerprint(long fingerprint) {
291
return schemas.get(fingerprint);
292
}
293
294
@Override
295
public void addSchema(Schema schema) {
296
long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
297
schemas.put(fingerprint, schema);
298
}
299
300
public void loadSchemasFromRegistry() {
301
// Load schemas from external registry
302
List<Schema> allSchemas = schemaRegistry.getAllSchemas();
303
for (Schema schema : allSchemas) {
304
addSchema(schema);
305
}
306
}
307
}
308
309
// Use schema store with message decoder
310
InMemorySchemaStore schemaStore = new InMemorySchemaStore();
311
schemaStore.loadSchemasFromRegistry();
312
313
BinaryMessageDecoder<GenericRecord> decoder =
314
BinaryMessageDecoder.of(GenericData.get(), null, readerSchema, schemaStore);
315
316
// Decoder will automatically resolve writer schemas using fingerprints
317
byte[] message = receiveMessage();
318
GenericRecord record = decoder.decode(message); // Schema resolved automatically
319
320
// Database-backed schema store with complete fingerprint handling
321
public class DatabaseSchemaStore implements SchemaStore {
322
private final SchemaRepository repository;
323
private final Cache<Long, Schema> cache;
324
325
@Override
326
public Schema findByFingerprint(long fingerprint) {
327
return cache.get(fingerprint, fp -> {
328
String schemaJson = repository.findByFingerprint(fp);
329
return schemaJson != null ? new Schema.Parser().parse(schemaJson) : null;
330
});
331
}
332
333
@Override
334
public void addSchema(Schema schema) {
335
// Calculate 64-bit parsing fingerprint for message headers
336
long fingerprint = SchemaNormalization.parsingFingerprint64(schema);
337
338
// Store in database
339
repository.save(fingerprint, schema.toString());
340
341
// Update cache
342
cache.put(fingerprint, schema);
343
344
System.out.println("Stored schema with fingerprint: " + fingerprint);
345
}
346
347
// Utility method to get fingerprint for any schema
348
public long getFingerprint(Schema schema) {
349
return SchemaNormalization.parsingFingerprint64(schema);
350
}
351
}
352
```
353
354
### Message Exception Handling
355
356
Specific exceptions for message encoding/decoding operations.
357
358
```java { .api }
359
public class MissingSchemaException extends AvroRuntimeException {
360
public MissingSchemaException(String message);
361
public MissingSchemaException(String message, Throwable cause);
362
}
363
364
public class BadHeaderException extends AvroRuntimeException {
365
public BadHeaderException(String message);
366
public BadHeaderException(String message, Throwable cause);
367
}
368
```
369
370
**Usage Examples:**
371
372
```java
373
// Handle message-specific exceptions
374
BinaryMessageDecoder<GenericRecord> decoder = createDecoder();
375
376
try {
377
byte[] suspiciousMessage = receiveMessage();
378
GenericRecord record = decoder.decode(suspiciousMessage);
379
processRecord(record);
380
381
} catch (MissingSchemaException e) {
382
// Schema not found in schema store
383
logger.error("Schema not available for message: " + e.getMessage());
384
// Possibly fetch schema from registry and retry
385
386
} catch (BadHeaderException e) {
387
// Message header is corrupted or invalid
388
logger.error("Invalid message header: " + e.getMessage());
389
// Skip message or send to dead letter queue
390
391
} catch (IOException e) {
392
// General I/O error during decoding
393
logger.error("Failed to decode message: " + e.getMessage());
394
}
395
396
// Robust message processing with error handling
397
public void processMessages(List<byte[]> messages) {
398
int successCount = 0;
399
int errorCount = 0;
400
401
for (byte[] message : messages) {
402
try {
403
GenericRecord record = decoder.decode(message);
404
processRecord(record);
405
successCount++;
406
} catch (MissingSchemaException | BadHeaderException e) {
407
logger.warn("Skipping invalid message: " + e.getMessage());
408
errorCount++;
409
} catch (Exception e) {
410
logger.error("Unexpected error processing message", e);
411
errorCount++;
412
}
413
}
414
415
logger.info("Processed {} messages successfully, {} errors", successCount, errorCount);
416
}
417
```
418
419
## Types
420
421
```java { .api }
422
public interface MessageEncoder<D> {
423
byte[] encode(D datum) throws IOException;
424
void encode(D datum, OutputStream stream) throws IOException;
425
}
426
427
public interface MessageDecoder<D> {
428
D decode(InputStream stream) throws IOException;
429
D decode(InputStream stream, D reuse) throws IOException;
430
D decode(byte[] encoded) throws IOException;
431
D decode(byte[] encoded, D reuse) throws IOException;
432
433
public abstract static class BaseDecoder<D> implements MessageDecoder<D> {
434
// Common implementation for byte array methods
435
}
436
}
437
438
public class BinaryMessageEncoder<D> implements MessageEncoder<D> {
439
// Binary format with schema fingerprint header
440
}
441
442
public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
443
// Binary format decoder with schema resolution
444
}
445
446
public class RawMessageEncoder<D> implements MessageEncoder<D> {
447
// Raw data encoding without headers
448
}
449
450
public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> {
451
// Raw data decoding with known schema
452
}
453
454
public interface SchemaStore {
455
Schema findByFingerprint(long fingerprint);
456
default void addSchema(Schema schema);
457
}
458
459
public class MissingSchemaException extends AvroRuntimeException {
460
// Schema not found exception
461
}
462
463
public class BadHeaderException extends AvroRuntimeException {
464
// Invalid message header exception
465
}
466
```