0
# Stream Processing
1
2
Real-time stream processing capabilities with coordination, file management, partitioning, and multiple decoder support for various data formats. The stream processing system provides essential functionality for handling high-throughput real-time data ingestion, processing, and consumption within the CDAP platform.
3
4
## Capabilities
5
6
### Stream Administration
7
8
`StreamAdmin` is the primary interface for stream lifecycle management and configuration. It also exposes metadata queries, an upgrade operation, and statistics lookups.
9
10
```java { .api }
11
public interface StreamAdmin {
12
// Stream lifecycle operations
13
void create(StreamId streamId) throws Exception;
14
void create(StreamId streamId, Map<String, String> properties) throws Exception;
15
void drop(StreamId streamId) throws Exception;
16
void truncate(StreamId streamId) throws Exception;
17
18
// Stream configuration management
19
void updateConfig(StreamId streamId, StreamProperties properties) throws Exception;
20
StreamProperties getConfig(StreamId streamId) throws Exception;
21
22
// Stream metadata and queries
23
StreamSpecification getSpecification(StreamId streamId) throws Exception;
24
List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception;
25
boolean exists(StreamId streamId) throws Exception;
26
27
// Administrative operations
28
void upgrade() throws Exception;
29
30
// Stream statistics and monitoring
31
StreamStats getStats(StreamId streamId) throws Exception;
32
long getSize(StreamId streamId) throws Exception;
33
}
34
```
35
36
### Stream View Management
37
38
Stream view store interface for managing logical views over streams with comprehensive CRUD operations and view listing capabilities.
39
40
```java { .api }
41
public interface ViewStore {
42
/**
43
* Creates a view or updates if it already exists.
44
* @param viewId the view identifier
45
* @param config the view configuration specification
46
* @return true if a new view was created, false if updated
47
*/
48
boolean createOrUpdate(StreamViewId viewId, ViewSpecification config);
49
50
/**
51
* Checks if a view exists.
52
* @param viewId the view identifier
53
* @return true if the view exists
54
*/
55
boolean exists(StreamViewId viewId);
56
57
/**
58
* Deletes a stream view.
59
* @param viewId the view identifier to delete
60
*/
61
void delete(StreamViewId viewId) throws NotFoundException;
62
63
/**
64
* Lists all views for a given stream.
65
* @param streamId the stream identifier
66
* @return list of view identifiers for the stream
67
*/
68
List<StreamViewId> list(StreamId streamId);
69
70
/**
71
* Gets the details of a specific view.
72
* @param viewId the view identifier
73
* @return the view details including configuration and metadata
74
*/
75
ViewDetail get(StreamViewId viewId) throws NotFoundException;
76
}
77
```
78
79
### Stream Coordination
80
81
Stream coordination client for managing distributed stream processing with service lifecycle management.
82
83
```java { .api }
84
public interface StreamCoordinatorClient extends Service {
85
// Stream coordination operations
86
void createStream(StreamId streamId, Map<String, String> properties) throws Exception;
87
void deleteStream(StreamId streamId) throws Exception;
88
89
// Stream configuration coordination
90
void updateStreamProperties(StreamId streamId, StreamProperties properties) throws Exception;
91
StreamProperties getStreamProperties(StreamId streamId) throws Exception;
92
93
// Consumer coordination
94
void addConsumerGroup(StreamId streamId, long groupId) throws Exception;
95
void removeConsumerGroup(StreamId streamId, long groupId) throws Exception;
96
97
// Partition management
98
int getPartitionCount(StreamId streamId) throws Exception;
99
void setPartitionCount(StreamId streamId, int partitions) throws Exception;
100
}
101
```
102
103
### Stream File Management
104
105
Stream file writers and management for persistent stream storage with partitioning and format support.
106
107
```java { .api }
108
// Stream file writer factory
109
public interface StreamFileWriterFactory {
110
StreamFileWriter create(StreamId streamId, int partition);
111
StreamFileWriter create(StreamId streamId, int partition, StreamFileType fileType);
112
}
113
114
// Time-partitioned stream file writer for temporal data organization
115
public class TimePartitionedStreamFileWriter implements StreamFileWriter {
116
// Time-based partitioning with configurable partition intervals
117
public void append(StreamEvent event) throws IOException;
118
public void flush() throws IOException;
119
public void close() throws IOException;
120
121
// Partition management
122
public void rotatePartition() throws IOException;
123
public String getCurrentPartitionPath();
124
public long getCurrentPartitionTimestamp();
125
}
126
127
// Stream file type enumeration
/** Kinds of files that make up a stream's on-disk representation. */
public enum StreamFileType {
  /** Standard event files. */
  EVENT,
  /** Index files for efficient seeking. */
  INDEX,
  /** Metadata files. */
  META,
  /** Temporary files during processing. */
  TEMP
}
134
```
135
136
### Stream Input Processing
137
138
Stream input processing components: a split finder that locates the input splits of a stream for a time range, and a service manager that registers and looks up per-stream services.
139
140
```java { .api }
141
// Stream input split finder for distributed processing
142
public class StreamInputSplitFinder<T> {
143
public List<StreamInputSplit> findSplits(StreamInputFormat<T> inputFormat,
144
StreamId streamId,
145
long startTime,
146
long endTime) throws IOException;
147
148
public List<StreamInputSplit> findSplits(StreamInputFormat<T> inputFormat,
149
StreamId streamId,
150
TimeRange timeRange,
151
int maxSplits) throws IOException;
152
}
153
154
// Stream service manager for lifecycle coordination
155
public class StreamServiceManager implements Service {
156
// Service lifecycle management for stream processing
157
@Override
158
protected void doStart();
159
160
@Override
161
protected void doStop();
162
163
// Stream service coordination
164
public void registerStreamService(StreamId streamId, StreamService service);
165
public void unregisterStreamService(StreamId streamId);
166
public StreamService getStreamService(StreamId streamId);
167
}
168
```
169
170
### Stream Event Decoding
171
172
Multiple decoder implementations for various data formats and encoding schemes.
173
174
```java { .api }
175
// Base stream event decoder interface
176
public interface StreamEventDecoder<T> {
177
T decode(StreamEvent event, Charset charset) throws Exception;
178
DecodeResult<T> decode(StreamEvent event, Charset charset, DecodeCallback<T> callback) throws Exception;
179
}
180
181
// String-based stream event decoder
182
public class StringStreamEventDecoder implements StreamEventDecoder<String> {
183
@Override
184
public String decode(StreamEvent event, Charset charset) throws Exception;
185
186
// Decode with custom processing
187
@Override
188
public DecodeResult<String> decode(StreamEvent event, Charset charset,
189
DecodeCallback<String> callback) throws Exception;
190
}
191
192
// Text stream event decoder with line-based processing
193
public class TextStreamEventDecoder implements StreamEventDecoder<String> {
194
// Line-by-line text processing with configurable delimiters
195
public void setLineDelimiter(String delimiter);
196
public void setMaxLineLength(int maxLength);
197
}
198
199
// Binary stream event decoder
200
public class BytesStreamEventDecoder implements StreamEventDecoder<byte[]> {
201
// Raw binary data processing
202
@Override
203
public byte[] decode(StreamEvent event, Charset charset) throws Exception;
204
205
// Chunk-based binary processing
206
public DecodeResult<byte[]> decodeChunked(StreamEvent event, int chunkSize) throws Exception;
207
}
208
209
// Identity decoder for pass-through processing
210
public class IdentityStreamEventDecoder implements StreamEventDecoder<StreamEvent> {
211
// No transformation - returns original stream event
212
@Override
213
public StreamEvent decode(StreamEvent event, Charset charset) throws Exception;
214
}
215
216
// Format-based decoder for structured data
217
public class FormatStreamEventDecoder<T> implements StreamEventDecoder<T> {
218
// Configurable format-based decoding (JSON, Avro, etc.)
219
public FormatStreamEventDecoder(RecordFormat<T> format);
220
public FormatStreamEventDecoder(RecordFormat<T> format, Schema schema);
221
222
@Override
223
public T decode(StreamEvent event, Charset charset) throws Exception;
224
}
225
```
226
227
### Stream Consumer Interface
228
229
Transaction-aware stream consumer with positioning and batch processing capabilities.
230
231
```java { .api }
232
public interface StreamConsumer extends Closeable, TransactionAware {
233
// Basic consumption operations
234
DequeInputDatum poll(long timeout, TimeUnit unit) throws InterruptedException;
235
void consume(int maxEvents, StreamConsumerCallback callback) throws InterruptedException;
236
237
// Consumer positioning
238
void seek(StreamEventOffset offset);
239
StreamEventOffset getPosition();
240
241
// Batch consumption with configuration
242
ConsumeBatch consume(ConsumeConfig config) throws InterruptedException;
243
244
// Consumer state management
245
ConsumerState getConsumerState();
246
void setConsumerState(ConsumerState state);
247
}
248
249
// Stream consumer factory
250
public interface StreamConsumerFactory {
251
StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config);
252
StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config,
253
StreamConsumerState startState);
254
255
// Consumer group management
256
StreamConsumer createGroupConsumer(StreamId streamId, String namespace,
257
String groupId, ConsumerConfig config);
258
}
259
```
260
261
## Usage Examples
262
263
### Basic Stream Administration
264
265
```java
266
// Access stream admin (typically injected)
267
StreamAdmin streamAdmin = // ... obtain instance
268
269
// Create a basic stream
270
StreamId streamId = NamespaceId.DEFAULT.stream("userEvents");
271
try {
272
streamAdmin.create(streamId);
273
System.out.println("Created stream: " + streamId.getStream());
274
} catch (Exception e) {
275
System.err.println("Failed to create stream: " + e.getMessage());
276
}
277
278
// Create stream with custom properties
279
Map<String, String> properties = Map.of(
280
"format.name", "avro",
281
"schema.literal", "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[...]}",
282
"retention.seconds", "604800", // 7 days
283
"partition.duration", "3600" // 1 hour partitions
284
);
285
286
StreamId configuredStream = NamespaceId.DEFAULT.stream("configuredEvents");
287
try {
288
streamAdmin.create(configuredStream, properties);
289
System.out.println("Created configured stream: " + configuredStream.getStream());
290
} catch (Exception e) {
291
System.err.println("Failed to create configured stream: " + e.getMessage());
292
}
293
294
// Check if stream exists
295
boolean exists = streamAdmin.exists(streamId);
296
System.out.println("Stream exists: " + exists);
297
298
// Get stream configuration
299
try {
300
StreamProperties config = streamAdmin.getConfig(streamId);
301
System.out.println("Stream format: " + config.getFormat());
302
System.out.println("Stream TTL: " + config.getTTL());
303
} catch (Exception e) {
304
System.err.println("Failed to get stream config: " + e.getMessage());
305
}
306
```
307
308
### Stream Configuration Management
309
310
```java
311
// Update stream properties
312
StreamProperties updatedProperties = StreamProperties.builder()
313
.setTTL(1209600) // 14 days
314
.setFormat(new FormatSpecification("json", null))
315
.setNotificationThresholdMB(100)
316
.build();
317
318
try {
319
streamAdmin.updateConfig(streamId, updatedProperties);
320
System.out.println("Updated stream configuration");
321
} catch (Exception e) {
322
System.err.println("Failed to update stream config: " + e.getMessage());
323
}
324
325
// List all streams in namespace
326
try {
327
List<StreamSpecification> streams = streamAdmin.listStreams(NamespaceId.DEFAULT);
328
System.out.println("Streams in default namespace:");
329
for (StreamSpecification stream : streams) {
330
System.out.println(" - " + stream.getName());
331
System.out.println(" Format: " + stream.getFormat());
332
System.out.println(" TTL: " + stream.getTTL() + " seconds");
333
}
334
} catch (Exception e) {
335
System.err.println("Failed to list streams: " + e.getMessage());
336
}
337
338
// Get stream statistics
339
try {
340
StreamStats stats = streamAdmin.getStats(streamId);
341
System.out.println("Stream statistics:");
342
System.out.println(" Events: " + stats.getEvents());
343
System.out.println(" Size: " + stats.getBytes() + " bytes");
344
System.out.println(" Ingested in last hour: " + stats.getRecentEvents());
345
} catch (Exception e) {
346
System.err.println("Failed to get stream stats: " + e.getMessage());
347
}
348
```
349
350
### Stream Event Decoding
351
352
```java
353
// String decoder for text-based events
354
StringStreamEventDecoder stringDecoder = new StringStreamEventDecoder();
355
356
// Process stream events as strings
357
public void processTextEvents(StreamConsumer consumer) {
358
try {
359
DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
360
if (event != null) {
361
StreamEvent streamEvent = convertToStreamEvent(event);
362
String decodedText = stringDecoder.decode(streamEvent, StandardCharsets.UTF_8);
363
364
System.out.println("Decoded event: " + decodedText);
365
processTextEvent(decodedText);
366
}
367
} catch (Exception e) {
368
System.err.println("Failed to decode stream event: " + e.getMessage());
369
}
370
}
371
372
// Binary decoder for raw data
373
BytesStreamEventDecoder binaryDecoder = new BytesStreamEventDecoder();
374
375
public void processBinaryEvents(StreamConsumer consumer) {
376
try {
377
DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
378
if (event != null) {
379
StreamEvent streamEvent = convertToStreamEvent(event);
380
byte[] binaryData = binaryDecoder.decode(streamEvent, null);
381
382
System.out.println("Decoded binary data: " + binaryData.length + " bytes");
383
processBinaryData(binaryData);
384
}
385
} catch (Exception e) {
386
System.err.println("Failed to decode binary event: " + e.getMessage());
387
}
388
}
389
390
// Format-based decoder for structured data
391
Schema avroSchema = // ... load Avro schema
392
RecordFormat<GenericRecord> avroFormat = new AvroRecordFormat<>(avroSchema);
393
FormatStreamEventDecoder<GenericRecord> avroDecoder =
394
new FormatStreamEventDecoder<>(avroFormat, avroSchema);
395
396
public void processAvroEvents(StreamConsumer consumer) {
397
try {
398
DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
399
if (event != null) {
400
StreamEvent streamEvent = convertToStreamEvent(event);
401
GenericRecord record = avroDecoder.decode(streamEvent, StandardCharsets.UTF_8);
402
403
System.out.println("Decoded Avro record: " + record);
404
processAvroRecord(record);
405
}
406
} catch (Exception e) {
407
System.err.println("Failed to decode Avro event: " + e.getMessage());
408
}
409
}
410
```
411
412
### Stream Consumer Usage
413
414
```java
415
// Create stream consumer
StreamConsumerFactory consumerFactory = // ... obtain factory
ConsumerConfig config = ConsumerConfig.builder()
    .setDequeueTimeout(5000)  // 5 seconds
    .setMaxDequeueSize(100)   // Max 100 events per batch
    .build();

StreamConsumer consumer = consumerFactory.create(streamId, "default", config);

// Basic event consumption
try {
  DequeInputDatum event = consumer.poll(10, TimeUnit.SECONDS);
  if (event != null) {
    // Decode with an explicit charset instead of the platform default
    // (UTF-8 matches the decoder examples; adjust if events use another encoding).
    System.out.println("Received event: " + new String(event.getData(), StandardCharsets.UTF_8));
    System.out.println("Event timestamp: " + event.getTimestamp());
    System.out.println("Event headers: " + event.getHeaders());
  } else {
    System.out.println("No events available");
  }
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
  System.out.println("Consumer polling interrupted");
}

// Batch event consumption with callback
try {
  consumer.consume(50, new StreamConsumerCallback() {
    @Override
    public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
      String eventData = new String(event.getData(), StandardCharsets.UTF_8);
      System.out.println("Processing event: " + eventData);

      // Process the event
      processEvent(eventData);
    }

    @Override
    public void onFinish() throws Exception {
      System.out.println("Batch processing completed");
    }

    @Override
    public void onError(Exception error) throws Exception {
      System.err.println("Error processing batch: " + error.getMessage());
      throw error;
    }
  });
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
  System.out.println("Consumer batch processing interrupted");
}

// Consumer positioning
StreamEventOffset currentPosition = consumer.getPosition();
System.out.println("Current position: " + currentPosition);

// Seek to specific offset
StreamEventOffset seekOffset = new StreamEventOffset(0, 1000);
consumer.seek(seekOffset);
System.out.println("Seeked to offset: " + seekOffset);

// Release the consumer when done (StreamConsumer is Closeable)
try {
  consumer.close();
} catch (IOException e) {
  System.err.println("Failed to close consumer: " + e.getMessage());
}
473
```
474
475
### Advanced Stream Processing Patterns
476
477
```java
478
// Time-partitioned stream processing
479
public class TimePartitionedStreamProcessor {
480
private final StreamAdmin streamAdmin;
481
private final StreamConsumerFactory consumerFactory;
482
private final TimePartitionedStreamFileWriter fileWriter;
483
484
public TimePartitionedStreamProcessor(StreamAdmin streamAdmin,
485
StreamConsumerFactory consumerFactory,
486
TimePartitionedStreamFileWriter fileWriter) {
487
this.streamAdmin = streamAdmin;
488
this.consumerFactory = consumerFactory;
489
this.fileWriter = fileWriter;
490
}
491
492
public void processTimePartitions(StreamId streamId, long startTime, long endTime) {
493
try {
494
// Create consumer for time range
495
StreamConsumer consumer = consumerFactory.create(streamId, "default",
496
ConsumerConfig.builder().build());
497
498
// Seek to start time
499
StreamEventOffset startOffset = findOffsetForTime(streamId, startTime);
500
consumer.seek(startOffset);
501
502
long currentTime = startTime;
503
long partitionInterval = 3600000; // 1 hour partitions
504
505
while (currentTime < endTime) {
506
long partitionEnd = Math.min(currentTime + partitionInterval, endTime);
507
508
System.out.println("Processing partition: " +
509
new Date(currentTime) + " to " + new Date(partitionEnd));
510
511
processTimePartition(consumer, currentTime, partitionEnd);
512
513
currentTime = partitionEnd;
514
}
515
516
} catch (Exception e) {
517
System.err.println("Failed to process time partitions: " + e.getMessage());
518
}
519
}
520
521
private void processTimePartition(StreamConsumer consumer, long startTime, long endTime)
522
throws Exception {
523
// Process events in time partition
524
consumer.consume(1000, new StreamConsumerCallback() {
525
@Override
526
public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
527
if (event.getTimestamp() >= startTime && event.getTimestamp() < endTime) {
528
// Write event to time-partitioned file
529
StreamEvent streamEvent = convertToStreamEvent(event);
530
fileWriter.append(streamEvent);
531
}
532
}
533
534
@Override
535
public void onFinish() throws Exception {
536
fileWriter.flush();
537
System.out.println("Partition processing completed");
538
}
539
540
@Override
541
public void onError(Exception error) throws Exception {
542
System.err.println("Error in partition processing: " + error.getMessage());
543
throw error;
544
}
545
});
546
}
547
}
548
549
// Multi-consumer group processing
550
public class MultiConsumerGroupProcessor {
551
private final StreamConsumerFactory consumerFactory;
552
private final ExecutorService executorService;
553
554
public MultiConsumerGroupProcessor(StreamConsumerFactory consumerFactory) {
555
this.consumerFactory = consumerFactory;
556
this.executorService = Executors.newFixedThreadPool(4);
557
}
558
559
public void processWithMultipleGroups(StreamId streamId) {
560
List<String> consumerGroups = Arrays.asList("analytics", "monitoring", "alerts", "archive");
561
562
for (String groupId : consumerGroups) {
563
executorService.submit(() -> {
564
try {
565
StreamConsumer consumer = consumerFactory.createGroupConsumer(
566
streamId, "default", groupId, ConsumerConfig.builder().build());
567
568
processStreamWithGroup(consumer, groupId);
569
570
} catch (Exception e) {
571
System.err.println("Error in consumer group " + groupId + ": " + e.getMessage());
572
}
573
});
574
}
575
}
576
577
private void processStreamWithGroup(StreamConsumer consumer, String groupId) {
578
System.out.println("Starting processing for group: " + groupId);
579
580
try {
581
while (!Thread.currentThread().isInterrupted()) {
582
DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
583
if (event != null) {
584
processEventForGroup(event, groupId);
585
}
586
}
587
} catch (InterruptedException e) {
588
System.out.println("Consumer group " + groupId + " interrupted");
589
} catch (Exception e) {
590
System.err.println("Error in consumer group " + groupId + ": " + e.getMessage());
591
}
592
}
593
594
private void processEventForGroup(DequeInputDatum event, String groupId) {
595
// Group-specific event processing
596
switch (groupId) {
597
case "analytics":
598
performAnalytics(event);
599
break;
600
case "monitoring":
601
updateMonitoringMetrics(event);
602
break;
603
case "alerts":
604
checkForAlerts(event);
605
break;
606
case "archive":
607
archiveEvent(event);
608
break;
609
}
610
}
611
}
612
```
613
614
### Stream Coordination and Management
615
616
```java
617
// Stream coordination client usage
618
public class StreamCoordinationManager {
619
private final StreamCoordinatorClient coordinatorClient;
620
621
public StreamCoordinationManager(StreamCoordinatorClient coordinatorClient) {
622
this.coordinatorClient = coordinatorClient;
623
}
624
625
public void setupDistributedStream(StreamId streamId, int partitionCount) {
626
try {
627
// Create stream with coordination
628
Map<String, String> properties = Map.of(
629
"partition.count", String.valueOf(partitionCount),
630
"replication.factor", "3"
631
);
632
633
coordinatorClient.createStream(streamId, properties);
634
System.out.println("Created distributed stream: " + streamId);
635
636
// Set up consumer groups
637
for (int groupId = 1; groupId <= 3; groupId++) {
638
coordinatorClient.addConsumerGroup(streamId, groupId);
639
System.out.println("Added consumer group: " + groupId);
640
}
641
642
// Verify partition count
643
int actualPartitions = coordinatorClient.getPartitionCount(streamId);
644
System.out.println("Stream partition count: " + actualPartitions);
645
646
} catch (Exception e) {
647
System.err.println("Failed to setup distributed stream: " + e.getMessage());
648
}
649
}
650
651
public void scaleStream(StreamId streamId, int newPartitionCount) {
652
try {
653
int currentPartitions = coordinatorClient.getPartitionCount(streamId);
654
655
if (newPartitionCount != currentPartitions) {
656
coordinatorClient.setPartitionCount(streamId, newPartitionCount);
657
System.out.println("Scaled stream from " + currentPartitions +
658
" to " + newPartitionCount + " partitions");
659
}
660
661
} catch (Exception e) {
662
System.err.println("Failed to scale stream: " + e.getMessage());
663
}
664
}
665
666
public void cleanupStream(StreamId streamId) {
667
try {
668
// Remove all consumer groups
669
for (int groupId = 1; groupId <= 3; groupId++) {
670
coordinatorClient.removeConsumerGroup(streamId, groupId);
671
}
672
673
// Delete the stream
674
coordinatorClient.deleteStream(streamId);
675
System.out.println("Cleaned up stream: " + streamId);
676
677
} catch (Exception e) {
678
System.err.println("Failed to cleanup stream: " + e.getMessage());
679
}
680
}
681
}
682
```
683
684
## Types
685
686
```java { .api }
687
// Core stream types
688
public final class StreamId extends EntityId {
689
public static StreamId of(String namespace, String stream);
690
public String getStream();
691
public NamespaceId getParent();
692
}
693
694
// Stream properties and configuration
695
public final class StreamProperties {
696
public static Builder builder();
697
698
public long getTTL();
699
public FormatSpecification getFormat();
700
public int getNotificationThresholdMB();
701
public Map<String, String> getProperties();
702
703
public static class Builder {
704
public Builder setTTL(long ttl);
705
public Builder setFormat(FormatSpecification format);
706
public Builder setNotificationThresholdMB(int threshold);
707
public Builder setProperties(Map<String, String> properties);
708
public StreamProperties build();
709
}
710
}
711
712
// Stream specification and metadata
713
public final class StreamSpecification {
714
public String getName();
715
public FormatSpecification getFormat();
716
public long getTTL();
717
public Map<String, String> getProperties();
718
}
719
720
// Stream statistics
721
public final class StreamStats {
722
public long getEvents();
723
public long getBytes();
724
public long getRecentEvents();
725
public long getLastEventTime();
726
}
727
728
// Stream event structures
/** A single stream event: a body, string headers, and a timestamp. */
public interface StreamEvent {

  /** Returns the event body. */
  ByteBuffer getBody();

  /** Returns the event headers. */
  Map<String, String> getHeaders();

  /** Returns the event timestamp (unit not stated here). */
  long getTimestamp();
}
734
735
public final class StreamEventOffset {
736
public long getGeneration();
737
public long getOffset();
738
739
public StreamEventOffset(long generation, long offset);
740
}
741
742
// Consumer configuration and state
743
public final class ConsumerConfig {
744
public static Builder builder();
745
746
public int getDequeueTimeout();
747
public int getMaxDequeueSize();
748
public String getInstanceId();
749
750
public static class Builder {
751
public Builder setDequeueTimeout(int timeout);
752
public Builder setMaxDequeueSize(int size);
753
public Builder setInstanceId(String instanceId);
754
public ConsumerConfig build();
755
}
756
}
757
758
public interface ConsumerState {
759
StreamEventOffset getOffset();
760
Map<String, String> getState();
761
long getTimestamp();
762
}
763
764
// Stream input processing types
765
public interface StreamInputSplit {
766
StreamId getStreamId();
767
long getStartTime();
768
long getEndTime();
769
int getPartition();
770
}
771
772
public final class TimeRange {
773
public long getStartTime();
774
public long getEndTime();
775
776
public TimeRange(long startTime, long endTime);
777
}
778
779
// Decoder types
/**
 * Callback notified with a decoded object or with a decoding error.
 *
 * @param <T> the decoded value type
 */
public interface DecodeCallback<T> {

  /** Receives a decoded object. */
  void decoded(T decodedObject);

  /** Receives an error. */
  void onError(Exception error);
}
784
785
public final class DecodeResult<T> {
786
public T getResult();
787
public boolean hasError();
788
public Exception getError();
789
}
790
791
// Format specifications
792
public final class FormatSpecification {
793
public String getName();
794
public Schema getSchema();
795
public Map<String, String> getSettings();
796
797
public FormatSpecification(String name, Schema schema);
798
public FormatSpecification(String name, Schema schema, Map<String, String> settings);
799
}
800
801
// Exception types
802
public class StreamException extends Exception {
803
public StreamException(String message);
804
public StreamException(String message, Throwable cause);
805
}
806
807
public class StreamNotFoundException extends StreamException {
808
public StreamNotFoundException(StreamId streamId);
809
}
810
811
public class StreamAlreadyExistsException extends StreamException {
812
public StreamAlreadyExistsException(StreamId streamId);
813
}
814
```