0
# Stream Processing
1
2
Event-driven data processing capabilities for handling streaming data with headers, body content, and timestamps. The stream processing system supports real-time data ingestion, custom event decoders, and structured event handling for building scalable data processing pipelines.
3
4
## Capabilities
5
6
### Stream Event Data
7
8
Base classes for representing stream event data with headers and typed body content.
9
10
```java { .api }
11
/**
12
* Generic stream event data with typed body
13
* @param <T> Type of event body
14
*/
15
public class GenericStreamEventData<T> {
16
/**
17
* Create generic stream event data
18
* @param headers Immutable map of event headers
19
* @param body Typed event body
20
*/
21
public GenericStreamEventData(Map<String, String> headers, T body);
22
23
/**
24
* Get immutable map of event headers
25
* @return Map of header key-value pairs
26
*/
27
public Map<String, String> getHeaders();
28
29
/**
30
* Get typed event body
31
* @return Event body of type T
32
*/
33
public T getBody();
34
}
35
36
/**
37
* Stream event data with ByteBuffer body
38
*/
39
public class StreamEventData extends GenericStreamEventData<ByteBuffer> {
40
/**
41
* Create stream event data with ByteBuffer body
42
* @param headers Map of event headers
43
* @param body ByteBuffer containing event data
44
*/
45
public StreamEventData(Map<String, String> headers, ByteBuffer body);
46
}
47
```
48
49
**Usage Examples:**
50
51
```java
52
import java.nio.ByteBuffer;
53
import java.util.HashMap;
54
import java.util.Map;
55
56
// Create event headers
57
Map<String, String> headers = new HashMap<>();
58
headers.put("source", "sensor-01");
59
headers.put("type", "temperature");
60
headers.put("format", "json");
61
62
// Create event body
63
String jsonData = "{\"temperature\": 23.5, \"unit\": \"celsius\"}";
64
ByteBuffer body = ByteBuffer.wrap(jsonData.getBytes());
65
66
// Create stream event data
67
StreamEventData eventData = new StreamEventData(headers, body);
68
69
// Access event information
70
Map<String, String> eventHeaders = eventData.getHeaders();
71
ByteBuffer eventBody = eventData.getBody();
72
String sourceId = eventHeaders.get("source"); // "sensor-01"
73
```
74
75
### Stream Events with Timestamps
76
77
Stream events that extend basic event data with timestamp information for temporal processing.
78
79
```java { .api }
80
/**
81
* Stream event with timestamp information
82
*/
83
public class StreamEvent extends StreamEventData {
84
/**
85
* Create empty stream event
86
*/
87
public StreamEvent();
88
89
/**
90
* Create stream event with body only
91
* @param body Event body data
92
*/
93
public StreamEvent(ByteBuffer body);
94
95
/**
96
* Create stream event with headers and body (current time as timestamp)
97
* @param headers Event headers
98
* @param body Event body data
99
*/
100
public StreamEvent(Map<String, String> headers, ByteBuffer body);
101
102
/**
103
* Create stream event from existing event data with timestamp
104
* @param data Existing stream event data
105
* @param timestamp Event timestamp in milliseconds
106
*/
107
public StreamEvent(StreamEventData data, long timestamp);
108
109
/**
110
* Copy constructor
111
* @param event Stream event to copy
112
*/
113
public StreamEvent(StreamEvent event);
114
115
/**
116
* Create stream event with all parameters
117
* @param headers Event headers
118
* @param body Event body data
119
* @param timestamp Event timestamp in milliseconds since epoch
120
*/
121
public StreamEvent(Map<String, String> headers, ByteBuffer body, long timestamp);
122
123
/**
124
* Get event timestamp
125
* @return Timestamp in milliseconds since epoch
126
*/
127
public long getTimestamp();
128
}
129
```
130
131
**Usage Examples:**
132
133
```java
134
import java.nio.ByteBuffer;
135
import java.util.Collections;
136
import java.util.HashMap;
137
import java.util.Map;
138
139
// Create event with current timestamp
140
Map<String, String> headers = new HashMap<>();
141
headers.put("deviceId", "device-123");
142
headers.put("location", "warehouse-A");
143
144
ByteBuffer data = ByteBuffer.wrap("sensor reading: 42.3".getBytes());
145
StreamEvent event = new StreamEvent(headers, data);
146
147
long timestamp = event.getTimestamp(); // Current time in milliseconds
148
149
// Create event with specific timestamp
150
long specificTime = System.currentTimeMillis() - 3600000; // 1 hour ago
151
StreamEvent historicalEvent = new StreamEvent(headers, data, specificTime);
152
153
// Copy existing event
154
StreamEvent copiedEvent = new StreamEvent(event);
155
156
// Create from existing StreamEventData
157
StreamEventData baseData = new StreamEventData(headers, data);
158
long eventTime = System.currentTimeMillis();
159
StreamEvent fromData = new StreamEvent(baseData, eventTime);
160
```
161
162
### Stream Event Decoders
163
164
Interface for converting stream events into structured key-value pairs with custom processing logic.
165
166
```java { .api }
167
/**
168
* Interface for decoding stream events into key-value pairs
169
* @param <K> Type of decoded key
170
* @param <V> Type of decoded value
171
*/
172
public interface StreamEventDecoder<K, V> {
173
/**
174
* Decode stream event into key-value pair
175
* @param event Stream event to decode
176
* @param result Reusable result object for decoded output
177
* @return Decode result (may be same instance as result parameter)
178
*/
179
DecodeResult<K, V> decode(StreamEvent event, DecodeResult<K, V> result);
180
181
}
182
183
/**
184
* Container for decoded key-value pair result
185
* Note: Not thread-safe, reuse for performance
186
* @param <K> Key type
187
* @param <V> Value type
188
*/
189
public final class DecodeResult<K, V> {
190
/**
191
* Get decoded key
192
* @return Key value
193
*/
194
public K getKey();
195
196
/**
197
* Set decoded key
198
* @param key Key value
199
* @return This result for chaining
200
*/
201
public DecodeResult<K, V> setKey(K key);
202
203
/**
204
* Get decoded value
205
* @return Value
206
*/
207
public V getValue();
208
209
/**
210
* Set decoded value
211
* @param value Value
212
* @return This result for chaining
213
*/
214
public DecodeResult<K, V> setValue(V value);
215
}
216
```
217
218
**Usage Examples:**
219
220
```java
221
import com.google.gson.Gson;
222
import com.google.gson.JsonObject;
223
224
// Example: JSON event decoder that extracts user ID as key and full data as value
225
public class JsonEventDecoder implements StreamEventDecoder<String, JsonObject> {
226
private final Gson gson = new Gson();
227
228
@Override
229
public DecodeResult<String, JsonObject> decode(StreamEvent event, DecodeResult<String, JsonObject> result) {
230
// Extract body as JSON
231
ByteBuffer body = event.getBody();
232
String jsonString = new String(body.array());
233
JsonObject jsonData = gson.fromJson(jsonString, JsonObject.class);
234
235
// Extract user ID from JSON as key
236
String userId = jsonData.get("userId").getAsString();
237
238
// Set result
239
return result.setKey(userId).setValue(jsonData);
240
}
241
}
242
243
// Usage of decoder
244
JsonEventDecoder decoder = new JsonEventDecoder();
245
StreamEventDecoder.DecodeResult<String, JsonObject> result = new StreamEventDecoder.DecodeResult<>();
246
247
// Create sample event
248
Map<String, String> headers = Collections.singletonMap("type", "user_action");
249
String jsonBody = "{\"userId\": \"user123\", \"action\": \"login\", \"timestamp\": 1623456789}";
250
ByteBuffer body = ByteBuffer.wrap(jsonBody.getBytes());
251
StreamEvent event = new StreamEvent(headers, body);
252
253
// Decode event
254
DecodeResult<String, JsonObject> decodedResult = decoder.decode(event, result);
255
String key = decodedResult.getKey(); // "user123"
256
JsonObject value = decodedResult.getValue(); // Full JSON object
257
258
// Reuse result object for performance
259
StreamEvent nextEvent = new StreamEvent(headers,
260
ByteBuffer.wrap("{\"userId\": \"user456\", \"action\": \"logout\"}".getBytes()));
261
decoder.decode(nextEvent, result); // Reuses same result object
262
```
263
264
### Custom Event Processing Patterns
265
266
Common patterns for processing stream events in data pipelines.
267
268
**Event Filtering:**
269
270
```java
271
import java.util.function.Predicate;
272
273
public class EventFilter {
274
public static Predicate<StreamEvent> byHeader(String headerKey, String expectedValue) {
275
return event -> expectedValue.equals(event.getHeaders().get(headerKey));
276
}
277
278
public static Predicate<StreamEvent> byTimestamp(long minTimestamp, long maxTimestamp) {
279
return event -> {
280
long timestamp = event.getTimestamp();
281
return timestamp >= minTimestamp && timestamp <= maxTimestamp;
282
};
283
}
284
}
285
286
// Usage
287
Predicate<StreamEvent> deviceFilter = EventFilter.byHeader("deviceType", "sensor");
288
Predicate<StreamEvent> timeFilter = EventFilter.byTimestamp(
289
System.currentTimeMillis() - 3600000, // 1 hour ago
290
System.currentTimeMillis()
291
);
292
293
// Filter events
294
List<StreamEvent> events = getStreamEvents();
295
List<StreamEvent> filteredEvents = events.stream()
296
.filter(deviceFilter.and(timeFilter))
297
.collect(Collectors.toList());
298
```
299
300
**Event Transformation:**
301
302
```java
303
import java.util.function.Function;
304
305
public class EventTransformer {
306
public static Function<StreamEvent, StreamEvent> addHeader(String key, String value) {
307
return event -> {
308
Map<String, String> newHeaders = new HashMap<>(event.getHeaders());
309
newHeaders.put(key, value);
310
return new StreamEvent(newHeaders, event.getBody(), event.getTimestamp());
311
};
312
}
313
314
public static Function<StreamEvent, StreamEvent> updateTimestamp(long newTimestamp) {
315
return event -> new StreamEvent(event.getHeaders(), event.getBody(), newTimestamp);
316
}
317
}
318
319
// Usage
320
Function<StreamEvent, StreamEvent> addProcessingTime =
321
EventTransformer.addHeader("processedAt", String.valueOf(System.currentTimeMillis()));
322
323
StreamEvent originalEvent = new StreamEvent(headers, body);
324
StreamEvent enrichedEvent = addProcessingTime.apply(originalEvent);
325
```
326
327
**Batch Event Processing:**
328
329
```java
330
import java.util.List;
331
import java.util.concurrent.CompletableFuture;
332
333
public class BatchEventProcessor {
334
private final StreamEventDecoder<String, Object> decoder;
335
private final int batchSize;
336
337
public BatchEventProcessor(StreamEventDecoder<String, Object> decoder, int batchSize) {
338
this.decoder = decoder;
339
this.batchSize = batchSize;
340
}
341
342
public CompletableFuture<Void> processBatch(List<StreamEvent> events) {
343
return CompletableFuture.runAsync(() -> {
344
StreamEventDecoder.DecodeResult<String, Object> result =
345
new StreamEventDecoder.DecodeResult<>();
346
347
for (StreamEvent event : events) {
348
decoder.decode(event, result);
349
// Process decoded key-value pair
350
processKeyValue(result.getKey(), result.getValue());
351
}
352
});
353
}
354
355
private void processKeyValue(String key, Object value) {
356
// Custom processing logic
357
System.out.println("Processing: " + key + " -> " + value);
358
}
359
}
360
```
361
362
## Event Data Access Patterns
363
364
### Header-Based Routing
365
366
```java
367
public class EventRouter {
368
public String determineRoute(StreamEvent event) {
369
Map<String, String> headers = event.getHeaders();
370
371
String eventType = headers.get("type");
372
String priority = headers.get("priority");
373
374
if ("error".equals(eventType)) {
375
return "error-processing-queue";
376
} else if ("high".equals(priority)) {
377
return "priority-queue";
378
} else {
379
return "standard-queue";
380
}
381
}
382
}
383
```
384
385
### Body Content Inspection
386
387
```java
388
public class ContentAnalyzer {
389
public boolean containsKeyword(StreamEvent event, String keyword) {
390
ByteBuffer body = event.getBody();
391
String content = new String(body.array());
392
return content.toLowerCase().contains(keyword.toLowerCase());
393
}
394
395
public int getContentLength(StreamEvent event) {
396
return event.getBody().remaining();
397
}
398
}
399
```
400
401
### Temporal Event Processing
402
403
```java
404
public class TemporalProcessor {
405
private static final long FIVE_MINUTES = 5 * 60 * 1000; // 5 minutes in milliseconds
406
407
public boolean isRecentEvent(StreamEvent event) {
408
long eventTime = event.getTimestamp();
409
long currentTime = System.currentTimeMillis();
410
return (currentTime - eventTime) <= FIVE_MINUTES;
411
}
412
413
public List<StreamEvent> groupByTimeWindow(List<StreamEvent> events, long windowSizeMs) {
414
return events.stream()
415
.sorted((e1, e2) -> Long.compare(e1.getTimestamp(), e2.getTimestamp()))
416
.collect(Collectors.toList());
417
}
418
}
419
```
420
421
## Performance Considerations
422
423
### Memory Management
424
425
- StreamEvent objects are lightweight but contain references to header maps and ByteBuffers
426
- Reuse DecodeResult objects in tight processing loops to reduce garbage collection
427
- ByteBuffer body data shares underlying byte arrays, so modifications affect all references
428
429
### Threading Considerations
430
431
- StreamEvent and StreamEventData are immutable after construction (thread-safe for reading)
432
- DecodeResult is explicitly marked as not thread-safe - use separate instances per thread
433
- Header maps returned by getHeaders() are immutable collections
434
435
### Processing Efficiency
436
437
- Access headers by key is O(1) using hash-based lookup
438
- ByteBuffer body access is direct memory access (very fast)
439
- Timestamp access is a simple field read (no computation)
440
- Decoder interface allows for efficient stream processing without object allocation per event