# Hybrid Source System

The Hybrid Source System enables seamless switching between multiple underlying sources based on configured source chains. It supports both static pre-configured sources and dynamic source creation with position transfer between sources.

## Core Components

### HybridSource

The main class that coordinates multiple underlying sources.
```java { .api }
@PublicEvolving
public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {

    // Static builder method: the first source anchors the chain and fixes the record type T.
    public static <T, EnumT extends SplitEnumerator> HybridSourceBuilder<T, EnumT> builder(
            Source<T, ?, ?> firstSource)

    // Source interface methods
    public Boundedness getBoundedness()
    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext) throws Exception
    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
            SplitEnumeratorContext<HybridSourceSplit> enumContext)
    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<HybridSourceSplit> enumContext,
            HybridSourceEnumeratorState checkpoint) throws Exception
    public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer()
    public SimpleVersionedSerializer<HybridSourceEnumeratorState> getEnumeratorCheckpointSerializer()
}
```

### HybridSourceBuilder

Builder for constructing hybrid sources with multiple underlying sources.

```java { .api }
@PublicEvolving
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator> implements Serializable {

    // Add a pre-configured source (no access to the previous source's position)
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source)

    // Add a source with deferred instantiation; the factory receives the previous
    // enumerator via SourceSwitchContext so it can transfer the read position.
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(
                    SourceFactory<T, NextSourceT, ? super EnumT> sourceFactory,
                    Boundedness boundedness)

    // Build the hybrid source
    public HybridSource<T> build()
}
```

### SourceFactory

Factory interface for creating sources with dynamic configuration.

```java { .api }
59
@PublicEvolving
60
@FunctionalInterface
61
public interface SourceFactory<T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator>
62
extends Serializable {
63
SourceT create(SourceSwitchContext<FromEnumT> context)
64
}
65
```

### SourceSwitchContext

Context provided to the source factory for position transfer.

```java { .api }
72
@PublicEvolving
73
public interface SourceSwitchContext<EnumT> {
74
EnumT getPreviousEnumerator()
75
}
76
```

## Implementation Examples

### Simple Hybrid Source

```java
83
// Create a hybrid source that reads files first, then switches to Kafka
84
public class FileToKafkaHybridSource {
85
86
public static HybridSource<String> create(
87
Path filePath,
88
String kafkaBootstrapServers,
89
String kafkaTopic) {
90
91
// Create file source
92
FileSource<String> fileSource = FileSource
93
.forRecordStreamFormat(new TextLineInputFormat(), filePath)
94
.build();
95
96
// Create Kafka source
97
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
98
.setBootstrapServers(kafkaBootstrapServers)
99
.setTopics(kafkaTopic)
100
.setGroupId("hybrid-consumer")
101
.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
102
.setStartingOffsets(OffsetsInitializer.earliest())
103
.build();
104
105
// Build hybrid source
106
return HybridSource.builder(fileSource)
107
.addSource(kafkaSource)
108
.build();
109
}
110
}
111
112
// Usage
113
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
114
115
HybridSource<String> hybridSource = FileToKafkaHybridSource.create(
116
Paths.get("/path/to/input/files"),
117
"localhost:9092",
118
"input-events"
119
);
120
121
DataStream<String> stream = env.fromSource(
122
hybridSource,
123
WatermarkStrategy.noWatermarks(),
124
"hybrid-file-kafka-source"
125
);
126
```

### Dynamic Position Transfer

```java
131
// Advanced hybrid source with position transfer from file timestamps to Kafka offsets
132
public class TimestampBasedHybridSource {
133
134
public static HybridSource<EventRecord> createWithTimestampTransfer(
135
Path filePath,
136
String kafkaBootstrapServers,
137
String kafkaTopic) {
138
139
// Create timestamped file source
140
TimestampedFileSource fileSource = new TimestampedFileSource(filePath);
141
142
// Build hybrid source with dynamic Kafka configuration
143
return HybridSource.<EventRecord, TimestampedFileEnumerator>builder(fileSource)
144
.addSource(
145
switchContext -> {
146
// Get the previous enumerator to extract end timestamp
147
TimestampedFileEnumerator fileEnumerator = switchContext.getPreviousEnumerator();
148
long endTimestamp = fileEnumerator.getMaxTimestamp();
149
150
LOG.info("Switching from file source to Kafka at timestamp: {}", endTimestamp);
151
152
// Create Kafka source starting from the file's end timestamp
153
return KafkaSource.<EventRecord>builder()
154
.setBootstrapServers(kafkaBootstrapServers)
155
.setTopics(kafkaTopic)
156
.setGroupId("hybrid-consumer-" + UUID.randomUUID())
157
.setDeserializer(new EventRecordDeserializer())
158
.setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
159
.build();
160
},
161
Boundedness.CONTINUOUS_UNBOUNDED
162
)
163
.build();
164
}
165
}
166
167
// Custom file source that tracks timestamps
168
public class TimestampedFileSource implements Source<EventRecord, TimestampedFileSplit, TimestampedFileEnumeratorState> {
169
private final Path filePath;
170
171
public TimestampedFileSource(Path filePath) {
172
this.filePath = filePath;
173
}
174
175
@Override
176
public SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> createEnumerator(
177
SplitEnumeratorContext<TimestampedFileSplit> enumContext) {
178
return new TimestampedFileEnumerator(enumContext, filePath);
179
}
180
181
@Override
182
public SourceReader<EventRecord, TimestampedFileSplit> createReader(
183
SourceReaderContext readerContext) {
184
return new TimestampedFileReader(readerContext);
185
}
186
187
@Override
188
public Boundedness getBoundedness() {
189
return Boundedness.BOUNDED;
190
}
191
192
// ... other required methods
193
}
194
195
// Enumerator that tracks maximum timestamp seen
196
public class TimestampedFileEnumerator implements SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> {
197
private final SplitEnumeratorContext<TimestampedFileSplit> context;
198
private final Path filePath;
199
private long maxTimestamp = 0;
200
private boolean splitsAssigned = false;
201
202
public TimestampedFileEnumerator(SplitEnumeratorContext<TimestampedFileSplit> context, Path filePath) {
203
this.context = context;
204
this.filePath = filePath;
205
}
206
207
@Override
208
public void start() {
209
// Assign splits for file reading
210
if (!splitsAssigned) {
211
List<TimestampedFileSplit> splits = createFileSplits();
212
context.assignSplits(new SplitsAssignment<>(
213
Collections.singletonMap(0, splits) // Assign to reader 0
214
));
215
splitsAssigned = true;
216
}
217
}
218
219
@Override
220
public void handleSplitRequest(int subtaskId, String requesterHostname) {
221
// No more splits to assign
222
}
223
224
@Override
225
public void addSplitsBack(List<TimestampedFileSplit> splits, int subtaskId) {
226
// Re-assign splits if needed
227
context.assignSplits(new SplitsAssignment<>(
228
Collections.singletonMap(subtaskId, splits)
229
));
230
}
231
232
@Override
233
public void addReader(int subtaskId) {
234
// New reader registered
235
}
236
237
@Override
238
public TimestampedFileEnumeratorState snapshotState(long checkpointId) {
239
return new TimestampedFileEnumeratorState(maxTimestamp, splitsAssigned);
240
}
241
242
@Override
243
public void notifyCheckpointComplete(long checkpointId) {
244
// Checkpoint completed
245
}
246
247
@Override
248
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
249
if (sourceEvent instanceof TimestampUpdateEvent) {
250
TimestampUpdateEvent timestampEvent = (TimestampUpdateEvent) sourceEvent;
251
maxTimestamp = Math.max(maxTimestamp, timestampEvent.getTimestamp());
252
LOG.debug("Updated max timestamp to: {}", maxTimestamp);
253
}
254
}
255
256
@Override
257
public void close() {
258
// Cleanup resources
259
}
260
261
public long getMaxTimestamp() {
262
return maxTimestamp;
263
}
264
265
private List<TimestampedFileSplit> createFileSplits() {
266
// Create splits for the file
267
return Collections.singletonList(
268
new TimestampedFileSplit("file-split-0", filePath, 0, getFileSize(filePath))
269
);
270
}
271
}
272
273
// Event to communicate timestamp updates
274
public class TimestampUpdateEvent implements SourceEvent {
275
private final long timestamp;
276
277
public TimestampUpdateEvent(long timestamp) {
278
this.timestamp = timestamp;
279
}
280
281
public long getTimestamp() {
282
return timestamp;
283
}
284
}
285
```

### Multi-Stage Hybrid Source

```java
290
// Complex hybrid source with multiple stages: Archive → Recent Files → Real-time Stream
291
public class MultiStageHybridSource {
292
293
public static HybridSource<LogRecord> createLogProcessingSource(
294
Path archivePath,
295
Path recentPath,
296
String streamingEndpoint) {
297
298
// Stage 1: Archive files (oldest data)
299
FileSource<LogRecord> archiveSource = FileSource
300
.forRecordStreamFormat(new LogRecordFormat(), archivePath)
301
.build();
302
303
// Stage 2: Recent files (newer data)
304
FileSource<LogRecord> recentSource = FileSource
305
.forRecordStreamFormat(new LogRecordFormat(), recentPath)
306
.monitorContinuously(Duration.ofSeconds(10)) // Monitor for new files
307
.build();
308
309
return HybridSource.<LogRecord, FileSourceEnumerator>builder(archiveSource)
310
// Add recent files stage
311
.addSource(
312
switchContext -> {
313
FileSourceEnumerator archiveEnumerator = switchContext.getPreviousEnumerator();
314
long maxProcessedTime = archiveEnumerator.getMaxProcessedTimestamp();
315
316
// Configure recent source to start after archive data
317
return FileSource
318
.forRecordStreamFormat(new LogRecordFormat(), recentPath)
319
.monitorContinuously(Duration.ofSeconds(10))
320
.setFilenameFilter(path -> getFileTimestamp(path) > maxProcessedTime)
321
.build();
322
},
323
Boundedness.BOUNDED // Recent files are bounded
324
)
325
// Add real-time streaming stage
326
.addSource(
327
switchContext -> {
328
// Get end time from recent files
329
FileSourceEnumerator recentEnumerator = switchContext.getPreviousEnumerator();
330
long streamStartTime = recentEnumerator.getMaxProcessedTimestamp();
331
332
// Create streaming source starting from where files ended
333
return new LogStreamSource(streamingEndpoint, streamStartTime);
334
},
335
Boundedness.CONTINUOUS_UNBOUNDED // Streaming is unbounded
336
)
337
.build();
338
}
339
340
private static long getFileTimestamp(Path path) {
341
// Extract timestamp from filename or file attributes
342
String filename = path.getFileName().toString();
343
// Assuming filename like "logs-2024-01-01-12-00.txt"
344
// Parse and return timestamp
345
return parseTimestampFromFilename(filename);
346
}
347
}
348
```

### Source Factory Patterns

```java
353
// Factory for database sources with connection pooling
354
public class DatabaseSourceFactory implements SourceFactory<DatabaseRecord, DatabaseSource, Object> {
355
private final String connectionUrl;
356
private final String query;
357
private final DataSource dataSource;
358
359
public DatabaseSourceFactory(String connectionUrl, String query) {
360
this.connectionUrl = connectionUrl;
361
this.query = query;
362
this.dataSource = createDataSource(connectionUrl);
363
}
364
365
@Override
366
public DatabaseSource create(SourceSwitchContext<Object> context) {
367
// Create database source with existing connection pool
368
return new DatabaseSource(dataSource, query);
369
}
370
371
private DataSource createDataSource(String url) {
372
HikariConfig config = new HikariConfig();
373
config.setJdbcUrl(url);
374
config.setMaximumPoolSize(10);
375
config.setMinimumIdle(2);
376
return new HikariDataSource(config);
377
}
378
}
379
380
// Factory with conditional source creation
381
public class ConditionalSourceFactory implements SourceFactory<String, Source<String, ?, ?>, FileSourceEnumerator> {
382
383
@Override
384
public Source<String, ?, ?> create(SourceSwitchContext<FileSourceEnumerator> context) {
385
FileSourceEnumerator previousEnumerator = context.getPreviousEnumerator();
386
387
// Choose next source based on previous source state
388
if (previousEnumerator.getProcessedRecordCount() > 1_000_000) {
389
// Large dataset - use Kafka for scalability
390
return createKafkaSource();
391
} else if (previousEnumerator.hasErrors()) {
392
// Previous source had errors - use reliable source
393
return createDatabaseSource();
394
} else {
395
// Normal case - use default file source
396
return createFileSource();
397
}
398
}
399
400
private Source<String, ?, ?> createKafkaSource() {
401
return KafkaSource.<String>builder()
402
.setBootstrapServers("localhost:9092")
403
.setTopics("high-volume-topic")
404
.setGroupId("hybrid-high-volume")
405
.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
406
.setStartingOffsets(OffsetsInitializer.latest())
407
.build();
408
}
409
410
private Source<String, ?, ?> createDatabaseSource() {
411
return JdbcSource.<String>builder()
412
.setDrivername("com.mysql.cj.jdbc.Driver")
413
.setDBUrl("jdbc:mysql://localhost:3306/backup")
414
.setQuery("SELECT data FROM backup_records WHERE processed = false")
415
.setRowTypeInfo(Types.STRING)
416
.build();
417
}
418
419
private Source<String, ?, ?> createFileSource() {
420
return FileSource
421
.forRecordStreamFormat(new TextLineInputFormat(), Paths.get("/backup/files"))
422
.build();
423
}
424
}
425
```

## State Management and Checkpointing

### HybridSourceEnumeratorState

```java { .api }
public class HybridSourceEnumeratorState {
    private final int currentSourceIndex;
    private final byte[] currentEnumeratorState;
    private final List<HybridSourceSplit> remainingSplits;

    // Constructor and accessors for state management
    public HybridSourceEnumeratorState(
            int currentSourceIndex,
            byte[] currentEnumeratorState,
            List<HybridSourceSplit> remainingSplits)

    public int getCurrentSourceIndex()
    public byte[] getCurrentEnumeratorState()
    public List<HybridSourceSplit> getRemainingSplits()
}
```

### Custom State Serialization

```java
452
public class CustomHybridSourceStateSerializer
453
implements SimpleVersionedSerializer<HybridSourceEnumeratorState> {
454
455
@Override
456
public int getVersion() {
457
return 1;
458
}
459
460
@Override
461
public byte[] serialize(HybridSourceEnumeratorState state) throws IOException {
462
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
463
DataOutputStream out = new DataOutputStream(baos)) {
464
465
// Write current source index
466
out.writeInt(state.getCurrentSourceIndex());
467
468
// Write enumerator state
469
byte[] enumState = state.getCurrentEnumeratorState();
470
out.writeInt(enumState.length);
471
out.write(enumState);
472
473
// Write remaining splits
474
List<HybridSourceSplit> splits = state.getRemainingSplits();
475
out.writeInt(splits.size());
476
for (HybridSourceSplit split : splits) {
477
serializeSplit(split, out);
478
}
479
480
return baos.toByteArray();
481
}
482
}
483
484
@Override
485
public HybridSourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
486
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
487
DataInputStream in = new DataInputStream(bais)) {
488
489
// Read current source index
490
int currentSourceIndex = in.readInt();
491
492
// Read enumerator state
493
int enumStateLength = in.readInt();
494
byte[] enumState = new byte[enumStateLength];
495
in.readFully(enumState);
496
497
// Read remaining splits
498
int splitsCount = in.readInt();
499
List<HybridSourceSplit> splits = new ArrayList<>(splitsCount);
500
for (int i = 0; i < splitsCount; i++) {
501
splits.add(deserializeSplit(in));
502
}
503
504
return new HybridSourceEnumeratorState(currentSourceIndex, enumState, splits);
505
}
506
}
507
}
508
```

## Configuration Examples

### Basic Configuration

```java
515
// Simple file-to-stream hybrid
516
HybridSource<String> basicHybrid = HybridSource.builder(fileSource)
517
.addSource(streamSource)
518
.build();
519
520
// Configure in job
521
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
522
env.enableCheckpointing(Duration.ofMinutes(1));
523
524
DataStream<String> stream = env.fromSource(
525
basicHybrid,
526
WatermarkStrategy.noWatermarks(),
527
"basic-hybrid-source"
528
);
529
```

### Advanced Configuration with Position Transfer

```java
534
public class AdvancedHybridConfiguration {
535
536
public static HybridSource<TransactionRecord> createTransactionSource() {
537
// Historical data from data lake
538
ParquetSource<TransactionRecord> historicalSource = ParquetSource
539
.forRecords(TransactionRecord.class, Paths.get("s3://data-lake/transactions/"))
540
.withParallelism(16)
541
.build();
542
543
return HybridSource.<TransactionRecord, ParquetSourceEnumerator>builder(historicalSource)
544
// Recent data from database
545
.addSource(
546
switchContext -> {
547
ParquetSourceEnumerator histEnumerator = switchContext.getPreviousEnumerator();
548
Instant cutoffTime = histEnumerator.getMaxRecordTimestamp();
549
550
return JdbcSource.<TransactionRecord>builder()
551
.setDrivername("org.postgresql.Driver")
552
.setDBUrl("jdbc:postgresql://localhost:5432/transactions")
553
.setQuery("SELECT * FROM transactions WHERE created_at > ?")
554
.setParametersProvider(new TimestampParameterProvider(cutoffTime))
555
.setRowTypeInfo(TransactionRecord.getTypeInfo())
556
.build();
557
},
558
Boundedness.BOUNDED
559
)
560
// Live stream
561
.addSource(
562
switchContext -> {
563
// Get end time from database source
564
JdbcSourceEnumerator dbEnumerator = switchContext.getPreviousEnumerator();
565
Instant streamStart = dbEnumerator.getMaxProcessedTimestamp();
566
567
return KafkaSource.<TransactionRecord>builder()
568
.setBootstrapServers("kafka-cluster:9092")
569
.setTopics("live-transactions")
570
.setGroupId("hybrid-transaction-processor")
571
.setDeserializer(new TransactionRecordDeserializer())
572
.setStartingOffsets(OffsetsInitializer.timestamp(streamStart.toEpochMilli()))
573
.build();
574
},
575
Boundedness.CONTINUOUS_UNBOUNDED
576
)
577
.build();
578
}
579
}
580
```

## Best Practices

### Position Transfer

1. **Implement Precise Timestamp Tracking**
```java
588
public class TimestampTrackingEnumerator implements SplitEnumerator<MySplit, MyEnumeratorState> {
589
private volatile Instant maxProcessedTimestamp = Instant.MIN;
590
591
@Override
592
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
593
if (sourceEvent instanceof TimestampProgressEvent) {
594
TimestampProgressEvent event = (TimestampProgressEvent) sourceEvent;
595
synchronized (this) {
596
if (event.getTimestamp().isAfter(maxProcessedTimestamp)) {
597
maxProcessedTimestamp = event.getTimestamp();
598
}
599
}
600
}
601
}
602
603
public Instant getMaxProcessedTimestamp() {
604
return maxProcessedTimestamp;
605
}
606
}
607
```

2. **Handle Clock Skew and Overlap**
```java
611
public class OverlapHandlingSourceFactory implements SourceFactory<Event, KafkaSource<Event>, FileSourceEnumerator> {
612
private final Duration overlapBuffer;
613
614
@Override
615
public KafkaSource<Event> create(SourceSwitchContext<FileSourceEnumerator> context) {
616
FileSourceEnumerator prevEnum = context.getPreviousEnumerator();
617
Instant switchTime = prevEnum.getMaxProcessedTimestamp();
618
619
// Start slightly before the file source ended to handle clock skew
620
Instant kafkaStartTime = switchTime.minus(overlapBuffer);
621
622
return KafkaSource.<Event>builder()
623
.setStartingOffsets(OffsetsInitializer.timestamp(kafkaStartTime.toEpochMilli()))
624
.setDeserializer(new DeduplicatingEventDeserializer(switchTime)) // Handle duplicates
625
.build();
626
}
627
}
628
```

### Resource Management

1. **Efficient Source Lifecycle Management**
```java
634
public class ResourceAwareHybridSource {
635
636
public static <T> HybridSource<T> createWithResourceManagement(
637
List<Source<T, ?, ?>> sources) {
638
639
HybridSourceBuilder<T, ?> builder = null;
640
641
for (int i = 0; i < sources.size(); i++) {
642
final int sourceIndex = i;
643
644
if (builder == null) {
645
builder = HybridSource.builder(sources.get(0));
646
} else {
647
builder = builder.addSource(
648
switchContext -> {
649
// Cleanup previous source resources if needed
650
cleanupPreviousSource(switchContext.getPreviousEnumerator());
651
652
// Pre-warm next source
653
Source<T, ?, ?> nextSource = sources.get(sourceIndex);
654
prewarmSource(nextSource);
655
656
return nextSource;
657
},
658
sources.get(sourceIndex).getBoundedness()
659
);
660
}
661
}
662
663
return builder.build();
664
}
665
666
private static void cleanupPreviousSource(SplitEnumerator previousEnumerator) {
667
if (previousEnumerator instanceof ResourceManager) {
668
((ResourceManager) previousEnumerator).releaseResources();
669
}
670
}
671
672
private static <T> void prewarmSource(Source<T, ?, ?> source) {
673
if (source instanceof Prewarmable) {
674
((Prewarmable) source).prewarm();
675
}
676
}
677
}
678
```

2. **Memory and Performance Optimization**
```java
682
public class OptimizedHybridSource {
683
684
public static HybridSource<Record> createOptimized(SourceChainConfig config) {
685
686
return HybridSource.builder(createFirstSource(config))
687
.addSource(
688
switchContext -> {
689
// Adjust parallelism based on data volume
690
int optimalParallelism = calculateOptimalParallelism(
691
switchContext.getPreviousEnumerator()
692
);
693
694
return createSecondSource(config)
695
.withParallelism(optimalParallelism);
696
},
697
Boundedness.CONTINUOUS_UNBOUNDED
698
)
699
.build();
700
}
701
702
private static int calculateOptimalParallelism(SplitEnumerator previousEnumerator) {
703
if (previousEnumerator instanceof MetricsProvider) {
704
MetricsProvider metricsProvider = (MetricsProvider) previousEnumerator;
705
long recordsPerSecond = metricsProvider.getRecordsPerSecond();
706
707
// Scale parallelism based on throughput
708
return Math.max(1, (int) (recordsPerSecond / 10000)); // 10k records per subtask
709
}
710
711
return Runtime.getRuntime().availableProcessors();
712
}
713
}
714
```

The Hybrid Source System provides a powerful framework for building complex data ingestion pipelines that can seamlessly transition between different data sources while maintaining exactly-once processing guarantees and efficient resource utilization.