or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

byte-utilities.md data-format-system.md index.md schema-system.md stream-processing.md structured-records.md

stream-processing.mddocs/

0

# Stream Processing

1

2

Event-driven data processing capabilities for handling streaming data with headers, body content, and timestamps. The stream processing system supports real-time data ingestion, custom event decoders, and structured event handling for building scalable data processing pipelines.

3

4

## Capabilities

5

6

### Stream Event Data

7

8

Base classes for representing stream event data with headers and typed body content.

9

10

```java { .api }

11

/**

12

* Generic stream event data with typed body

13

* @param <T> Type of event body

14

*/

15

public class GenericStreamEventData<T> {

16

/**

17

* Create generic stream event data

18

* @param headers Immutable map of event headers

19

* @param body Typed event body

20

*/

21

public GenericStreamEventData(Map<String, String> headers, T body);

22

23

/**

24

* Get immutable map of event headers

25

* @return Map of header key-value pairs

26

*/

27

public Map<String, String> getHeaders();

28

29

/**

30

* Get typed event body

31

* @return Event body of type T

32

*/

33

public T getBody();

34

}

35

36

/**

37

* Stream event data with ByteBuffer body

38

*/

39

public class StreamEventData extends GenericStreamEventData<ByteBuffer> {

40

/**

41

* Create stream event data with ByteBuffer body

42

* @param headers Map of event headers

43

* @param body ByteBuffer containing event data

44

*/

45

public StreamEventData(Map<String, String> headers, ByteBuffer body);

46

}

47

```

48

49

**Usage Examples:**

50

51

```java

52

import java.nio.ByteBuffer;

53

import java.util.HashMap;

54

import java.util.Map;

55

56

// Create event headers

57

Map<String, String> headers = new HashMap<>();

58

headers.put("source", "sensor-01");

59

headers.put("type", "temperature");

60

headers.put("format", "json");

61

62

// Create event body

63

String jsonData = "{\"temperature\": 23.5, \"unit\": \"celsius\"}";

64

ByteBuffer body = ByteBuffer.wrap(jsonData.getBytes());

65

66

// Create stream event data

67

StreamEventData eventData = new StreamEventData(headers, body);

68

69

// Access event information

70

Map<String, String> eventHeaders = eventData.getHeaders();

71

ByteBuffer eventBody = eventData.getBody();

72

String sourceId = eventHeaders.get("source"); // "sensor-01"

73

```

74

75

### Stream Events with Timestamps

76

77

Stream events that extend basic event data with timestamp information for temporal processing.

78

79

```java { .api }

80

/**

81

* Stream event with timestamp information

82

*/

83

public class StreamEvent extends StreamEventData {

84

/**

85

* Create empty stream event

86

*/

87

public StreamEvent();

88

89

/**

90

* Create stream event with body only

91

* @param body Event body data

92

*/

93

public StreamEvent(ByteBuffer body);

94

95

/**

96

* Create stream event with headers and body (current time as timestamp)

97

* @param headers Event headers

98

* @param body Event body data

99

*/

100

public StreamEvent(Map<String, String> headers, ByteBuffer body);

101

102

/**

103

* Create stream event from existing event data with timestamp

104

* @param data Existing stream event data

105

* @param timestamp Event timestamp in milliseconds

106

*/

107

public StreamEvent(StreamEventData data, long timestamp);

108

109

/**

110

* Copy constructor

111

* @param event Stream event to copy

112

*/

113

public StreamEvent(StreamEvent event);

114

115

/**

116

* Create stream event with all parameters

117

* @param headers Event headers

118

* @param body Event body data

119

* @param timestamp Event timestamp in milliseconds since epoch

120

*/

121

public StreamEvent(Map<String, String> headers, ByteBuffer body, long timestamp);

122

123

/**

124

* Get event timestamp

125

* @return Timestamp in milliseconds since epoch

126

*/

127

public long getTimestamp();

128

}

129

```

130

131

**Usage Examples:**

132

133

```java

134

import java.nio.ByteBuffer;

135

import java.util.Collections;

136

import java.util.HashMap;

137

import java.util.Map;

138

139

// Create event with current timestamp

140

Map<String, String> headers = new HashMap<>();

141

headers.put("deviceId", "device-123");

142

headers.put("location", "warehouse-A");

143

144

ByteBuffer data = ByteBuffer.wrap("sensor reading: 42.3".getBytes());

145

StreamEvent event = new StreamEvent(headers, data);

146

147

long timestamp = event.getTimestamp(); // Current time in milliseconds

148

149

// Create event with specific timestamp

150

long specificTime = System.currentTimeMillis() - 3600000; // 1 hour ago

151

StreamEvent historicalEvent = new StreamEvent(headers, data, specificTime);

152

153

// Copy existing event

154

StreamEvent copiedEvent = new StreamEvent(event);

155

156

// Create from existing StreamEventData

157

StreamEventData baseData = new StreamEventData(headers, data);

158

long eventTime = System.currentTimeMillis();

159

StreamEvent fromData = new StreamEvent(baseData, eventTime);

160

```

161

162

### Stream Event Decoders

163

164

Interface for converting stream events into structured key-value pairs with custom processing logic.

165

166

```java { .api }

167

/**

168

* Interface for decoding stream events into key-value pairs

169

* @param <K> Type of decoded key

170

* @param <V> Type of decoded value

171

*/

172

public interface StreamEventDecoder<K, V> {

173

/**

174

* Decode stream event into key-value pair

175

* @param event Stream event to decode

176

* @param result Reusable result object for decoded output

177

* @return Decode result (may be same instance as result parameter)

178

*/

179

DecodeResult<K, V> decode(StreamEvent event, DecodeResult<K, V> result);

180

181

}

182

183

/**

184

* Container for decoded key-value pair result

185

* Note: Not thread-safe, reuse for performance

186

* @param <K> Key type

187

* @param <V> Value type

188

*/

189

public final class DecodeResult<K, V> {

190

/**

191

* Get decoded key

192

* @return Key value

193

*/

194

public K getKey();

195

196

/**

197

* Set decoded key

198

* @param key Key value

199

* @return This result for chaining

200

*/

201

public DecodeResult<K, V> setKey(K key);

202

203

/**

204

* Get decoded value

205

* @return Value

206

*/

207

public V getValue();

208

209

/**

210

* Set decoded value

211

* @param value Value

212

* @return This result for chaining

213

*/

214

public DecodeResult<K, V> setValue(V value);

215

}

216

```

217

218

**Usage Examples:**

219

220

```java

221

import com.google.gson.Gson;

222

import com.google.gson.JsonObject;

223

224

// Example: JSON event decoder that extracts user ID as key and full data as value

225

public class JsonEventDecoder implements StreamEventDecoder<String, JsonObject> {

226

private final Gson gson = new Gson();

227

228

@Override

229

public DecodeResult<String, JsonObject> decode(StreamEvent event, DecodeResult<String, JsonObject> result) {

230

// Extract body as JSON

231

ByteBuffer body = event.getBody();

232

String jsonString = new String(body.array());

233

JsonObject jsonData = gson.fromJson(jsonString, JsonObject.class);

234

235

// Extract user ID from JSON as key

236

String userId = jsonData.get("userId").getAsString();

237

238

// Set result

239

return result.setKey(userId).setValue(jsonData);

240

}

241

}

242

243

// Usage of decoder

244

JsonEventDecoder decoder = new JsonEventDecoder();

245

StreamEventDecoder.DecodeResult<String, JsonObject> result = new StreamEventDecoder.DecodeResult<>();

246

247

// Create sample event

248

Map<String, String> headers = Collections.singletonMap("type", "user_action");

249

String jsonBody = "{\"userId\": \"user123\", \"action\": \"login\", \"timestamp\": 1623456789}";

250

ByteBuffer body = ByteBuffer.wrap(jsonBody.getBytes());

251

StreamEvent event = new StreamEvent(headers, body);

252

253

// Decode event

254

DecodeResult<String, JsonObject> decodedResult = decoder.decode(event, result);

255

String key = decodedResult.getKey(); // "user123"

256

JsonObject value = decodedResult.getValue(); // Full JSON object

257

258

// Reuse result object for performance

259

StreamEvent nextEvent = new StreamEvent(headers,

260

ByteBuffer.wrap("{\"userId\": \"user456\", \"action\": \"logout\"}".getBytes()));

261

decoder.decode(nextEvent, result); // Reuses same result object

262

```

263

264

### Custom Event Processing Patterns

265

266

Common patterns for processing stream events in data pipelines.

267

268

**Event Filtering:**

269

270

```java

271

import java.util.function.Predicate;

272

273

public class EventFilter {

274

public static Predicate<StreamEvent> byHeader(String headerKey, String expectedValue) {

275

return event -> expectedValue.equals(event.getHeaders().get(headerKey));

276

}

277

278

public static Predicate<StreamEvent> byTimestamp(long minTimestamp, long maxTimestamp) {

279

return event -> {

280

long timestamp = event.getTimestamp();

281

return timestamp >= minTimestamp && timestamp <= maxTimestamp;

282

};

283

}

284

}

285

286

// Usage

287

Predicate<StreamEvent> deviceFilter = EventFilter.byHeader("deviceType", "sensor");

288

Predicate<StreamEvent> timeFilter = EventFilter.byTimestamp(

289

System.currentTimeMillis() - 3600000, // 1 hour ago

290

System.currentTimeMillis()

291

);

292

293

// Filter events

294

List<StreamEvent> events = getStreamEvents();

295

List<StreamEvent> filteredEvents = events.stream()

296

.filter(deviceFilter.and(timeFilter))

297

.collect(Collectors.toList());

298

```

299

300

**Event Transformation:**

301

302

```java

303

import java.util.function.Function;

304

305

public class EventTransformer {

306

public static Function<StreamEvent, StreamEvent> addHeader(String key, String value) {

307

return event -> {

308

Map<String, String> newHeaders = new HashMap<>(event.getHeaders());

309

newHeaders.put(key, value);

310

return new StreamEvent(newHeaders, event.getBody(), event.getTimestamp());

311

};

312

}

313

314

public static Function<StreamEvent, StreamEvent> updateTimestamp(long newTimestamp) {

315

return event -> new StreamEvent(event.getHeaders(), event.getBody(), newTimestamp);

316

}

317

}

318

319

// Usage

320

Function<StreamEvent, StreamEvent> addProcessingTime =

321

EventTransformer.addHeader("processedAt", String.valueOf(System.currentTimeMillis()));

322

323

StreamEvent originalEvent = new StreamEvent(headers, body);

324

StreamEvent enrichedEvent = addProcessingTime.apply(originalEvent);

325

```

326

327

**Batch Event Processing:**

328

329

```java

330

import java.util.List;

331

import java.util.concurrent.CompletableFuture;

332

333

public class BatchEventProcessor {

334

private final StreamEventDecoder<String, Object> decoder;

335

private final int batchSize;

336

337

public BatchEventProcessor(StreamEventDecoder<String, Object> decoder, int batchSize) {

338

this.decoder = decoder;

339

this.batchSize = batchSize;

340

}

341

342

public CompletableFuture<Void> processBatch(List<StreamEvent> events) {

343

return CompletableFuture.runAsync(() -> {

344

StreamEventDecoder.DecodeResult<String, Object> result =

345

new StreamEventDecoder.DecodeResult<>();

346

347

for (StreamEvent event : events) {

348

decoder.decode(event, result);

349

// Process decoded key-value pair

350

processKeyValue(result.getKey(), result.getValue());

351

}

352

});

353

}

354

355

private void processKeyValue(String key, Object value) {

356

// Custom processing logic

357

System.out.println("Processing: " + key + " -> " + value);

358

}

359

}

360

```

361

362

## Event Data Access Patterns

363

364

### Header-Based Routing

365

366

```java

367

public class EventRouter {

368

public String determineRoute(StreamEvent event) {

369

Map<String, String> headers = event.getHeaders();

370

371

String eventType = headers.get("type");

372

String priority = headers.get("priority");

373

374

if ("error".equals(eventType)) {

375

return "error-processing-queue";

376

} else if ("high".equals(priority)) {

377

return "priority-queue";

378

} else {

379

return "standard-queue";

380

}

381

}

382

}

383

```

384

385

### Body Content Inspection

386

387

```java

388

public class ContentAnalyzer {

389

public boolean containsKeyword(StreamEvent event, String keyword) {

390

ByteBuffer body = event.getBody();

391

String content = new String(body.array());

392

return content.toLowerCase().contains(keyword.toLowerCase());

393

}

394

395

public int getContentLength(StreamEvent event) {

396

return event.getBody().remaining();

397

}

398

}

399

```

400

401

### Temporal Event Processing

402

403

```java

404

public class TemporalProcessor {

405

private static final long FIVE_MINUTES = 5 * 60 * 1000; // 5 minutes in milliseconds

406

407

public boolean isRecentEvent(StreamEvent event) {

408

long eventTime = event.getTimestamp();

409

long currentTime = System.currentTimeMillis();

410

return (currentTime - eventTime) <= FIVE_MINUTES;

411

}

412

413

public List<StreamEvent> groupByTimeWindow(List<StreamEvent> events, long windowSizeMs) {

414

return events.stream()

415

.sorted((e1, e2) -> Long.compare(e1.getTimestamp(), e2.getTimestamp()))

416

.collect(Collectors.toList());

417

}

418

}

419

```

420

421

## Performance Considerations

422

423

### Memory Management

424

425

- StreamEvent objects are lightweight but contain references to header maps and ByteBuffers

426

- Reuse DecodeResult objects in tight processing loops to reduce garbage collection

427

- A ByteBuffer created with `wrap()` or `duplicate()` shares its underlying byte array with the source, so modifications to the bytes are visible through every reference

428

429

### Threading Considerations

430

431

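- StreamEvent and StreamEventData expose no setters, so their headers and timestamp are safe to read from multiple threads; the ByteBuffer body still carries a mutable position and limit, so give each concurrent reader its own `duplicate()`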

432

- DecodeResult is explicitly marked as not thread-safe - use separate instances per thread

433

- Header maps returned by getHeaders() are immutable collections

434

435

### Processing Efficiency

436

437

- Accessing a header by key is O(1) on average, since headers are stored in a hash-based map

438

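- ByteBuffer body access reads straight from the backing buffer without copying (very fast); note this is unrelated to "direct" (off-heap) buffers, since `ByteBuffer.wrap()` creates heap buffers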

439

- Timestamp access is a simple field read (no computation)

440

- The decoder interface takes a reusable DecodeResult parameter, so a processing loop can decode every event without allocating a new result object per event