# Stream Processing

Real-time stream processing capabilities with coordination, file management, partitioning, and multiple decoder support for various data formats. The stream processing system provides essential functionality for handling high-throughput real-time data ingestion, processing, and consumption within the CDAP platform.

## Capabilities

### Stream Administration

The primary interface for stream lifecycle management and configuration with comprehensive administrative operations.

```java { .api }
public interface StreamAdmin {
    // Stream lifecycle operations
    void create(StreamId streamId) throws Exception;
    void create(StreamId streamId, Map<String, String> properties) throws Exception;
    void drop(StreamId streamId) throws Exception;
    void truncate(StreamId streamId) throws Exception;

    // Stream configuration management
    void updateConfig(StreamId streamId, StreamProperties properties) throws Exception;
    StreamProperties getConfig(StreamId streamId) throws Exception;

    // Stream metadata and queries
    StreamSpecification getSpecification(StreamId streamId) throws Exception;
    List<StreamSpecification> listStreams(NamespaceId namespaceId) throws Exception;
    boolean exists(StreamId streamId) throws Exception;

    // Administrative operations
    void upgrade() throws Exception;

    // Stream statistics and monitoring
    StreamStats getStats(StreamId streamId) throws Exception;
    long getSize(StreamId streamId) throws Exception;
}
```

### Stream View Management

Stream view store interface for managing logical views over streams with comprehensive CRUD operations and view listing capabilities.

```java { .api }
public interface ViewStore {
    /**
     * Creates a view or updates if it already exists.
     * @param viewId the view identifier
     * @param config the view configuration specification
     * @return true if a new view was created, false if updated
     */
    boolean createOrUpdate(StreamViewId viewId, ViewSpecification config);

    /**
     * Checks if a view exists.
     * @param viewId the view identifier
     * @return true if the view exists
     */
    boolean exists(StreamViewId viewId);

    /**
     * Deletes a stream view.
     * @param viewId the view identifier to delete
     */
    void delete(StreamViewId viewId) throws NotFoundException;

    /**
     * Lists all views for a given stream.
     * @param streamId the stream identifier
     * @return list of view identifiers for the stream
     */
    List<StreamViewId> list(StreamId streamId);

    /**
     * Gets the details of a specific view.
     * @param viewId the view identifier
     * @return the view details including configuration and metadata
     */
    ViewDetail get(StreamViewId viewId) throws NotFoundException;
}
```

### Stream Coordination

Stream coordination client for managing distributed stream processing with service lifecycle management.

```java { .api }
public interface StreamCoordinatorClient extends Service {
    // Stream coordination operations
    void createStream(StreamId streamId, Map<String, String> properties) throws Exception;
    void deleteStream(StreamId streamId) throws Exception;

    // Stream configuration coordination
    void updateStreamProperties(StreamId streamId, StreamProperties properties) throws Exception;
    StreamProperties getStreamProperties(StreamId streamId) throws Exception;

    // Consumer coordination
    void addConsumerGroup(StreamId streamId, long groupId) throws Exception;
    void removeConsumerGroup(StreamId streamId, long groupId) throws Exception;

    // Partition management
    int getPartitionCount(StreamId streamId) throws Exception;
    void setPartitionCount(StreamId streamId, int partitions) throws Exception;
}
```

### Stream File Management

Stream file writers and management for persistent stream storage with partitioning and format support.

```java { .api }
// Stream file writer factory
public interface StreamFileWriterFactory {
    StreamFileWriter create(StreamId streamId, int partition);
    StreamFileWriter create(StreamId streamId, int partition, StreamFileType fileType);
}

// Time-partitioned stream file writer for temporal data organization
public class TimePartitionedStreamFileWriter implements StreamFileWriter {
    // Time-based partitioning with configurable partition intervals
    public void append(StreamEvent event) throws IOException;
    public void flush() throws IOException;
    public void close() throws IOException;

    // Partition management
    public void rotatePartition() throws IOException;
    public String getCurrentPartitionPath();
    public long getCurrentPartitionTimestamp();
}

// Stream file type enumeration
public enum StreamFileType {
    EVENT, // Standard event files
    INDEX, // Index files for efficient seeking
    META,  // Metadata files
    TEMP   // Temporary files during processing
}
```

### Stream Input Processing

Stream input processing components for handling various data sources and input formats.

```java { .api }
// Stream input split finder for distributed processing
public class StreamInputSplitFinder<T> {
    public List<StreamInputSplit> findSplits(StreamInputFormat<T> inputFormat,
                                             StreamId streamId,
                                             long startTime,
                                             long endTime) throws IOException;

    public List<StreamInputSplit> findSplits(StreamInputFormat<T> inputFormat,
                                             StreamId streamId,
                                             TimeRange timeRange,
                                             int maxSplits) throws IOException;
}

// Stream service manager for lifecycle coordination
public class StreamServiceManager implements Service {
    // Service lifecycle management for stream processing
    @Override
    protected void doStart();

    @Override
    protected void doStop();

    // Stream service coordination
    public void registerStreamService(StreamId streamId, StreamService service);
    public void unregisterStreamService(StreamId streamId);
    public StreamService getStreamService(StreamId streamId);
}
```

### Stream Event Decoding

Multiple decoder implementations for various data formats and encoding schemes.

```java { .api }
// Base stream event decoder interface
public interface StreamEventDecoder<T> {
    T decode(StreamEvent event, Charset charset) throws Exception;
    DecodeResult<T> decode(StreamEvent event, Charset charset, DecodeCallback<T> callback) throws Exception;
}

// String-based stream event decoder
public class StringStreamEventDecoder implements StreamEventDecoder<String> {
    @Override
    public String decode(StreamEvent event, Charset charset) throws Exception;

    // Decode with custom processing
    @Override
    public DecodeResult<String> decode(StreamEvent event, Charset charset,
                                       DecodeCallback<String> callback) throws Exception;
}

// Text stream event decoder with line-based processing
public class TextStreamEventDecoder implements StreamEventDecoder<String> {
    // Line-by-line text processing with configurable delimiters
    public void setLineDelimiter(String delimiter);
    public void setMaxLineLength(int maxLength);
}

// Binary stream event decoder
public class BytesStreamEventDecoder implements StreamEventDecoder<byte[]> {
    // Raw binary data processing
    @Override
    public byte[] decode(StreamEvent event, Charset charset) throws Exception;

    // Chunk-based binary processing
    public DecodeResult<byte[]> decodeChunked(StreamEvent event, int chunkSize) throws Exception;
}

// Identity decoder for pass-through processing
public class IdentityStreamEventDecoder implements StreamEventDecoder<StreamEvent> {
    // No transformation - returns original stream event
    @Override
    public StreamEvent decode(StreamEvent event, Charset charset) throws Exception;
}

// Format-based decoder for structured data
public class FormatStreamEventDecoder<T> implements StreamEventDecoder<T> {
    // Configurable format-based decoding (JSON, Avro, etc.)
    public FormatStreamEventDecoder(RecordFormat<T> format);
    public FormatStreamEventDecoder(RecordFormat<T> format, Schema schema);

    @Override
    public T decode(StreamEvent event, Charset charset) throws Exception;
}
```

### Stream Consumer Interface

Transaction-aware stream consumer with positioning and batch processing capabilities.

```java { .api }
public interface StreamConsumer extends Closeable, TransactionAware {
    // Basic consumption operations
    DequeInputDatum poll(long timeout, TimeUnit unit) throws InterruptedException;
    void consume(int maxEvents, StreamConsumerCallback callback) throws InterruptedException;

    // Consumer positioning
    void seek(StreamEventOffset offset);
    StreamEventOffset getPosition();

    // Batch consumption with configuration
    ConsumeBatch consume(ConsumeConfig config) throws InterruptedException;

    // Consumer state management
    ConsumerState getConsumerState();
    void setConsumerState(ConsumerState state);
}

// Stream consumer factory
public interface StreamConsumerFactory {
    StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config);
    StreamConsumer create(StreamId streamId, String namespace, ConsumerConfig config,
                          StreamConsumerState startState);

    // Consumer group management
    StreamConsumer createGroupConsumer(StreamId streamId, String namespace,
                                       String groupId, ConsumerConfig config);
}
```

## Usage Examples

### Basic Stream Administration

```java
// Access stream admin (typically injected)
StreamAdmin streamAdmin = // ... obtain instance

// Create a basic stream
StreamId streamId = NamespaceId.DEFAULT.stream("userEvents");
try {
    streamAdmin.create(streamId);
    System.out.println("Created stream: " + streamId.getStream());
} catch (Exception e) {
    System.err.println("Failed to create stream: " + e.getMessage());
}

// Create stream with custom properties
Map<String, String> properties = Map.of(
    "format.name", "avro",
    "schema.literal", "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[...]}",
    "retention.seconds", "604800", // 7 days
    "partition.duration", "3600"   // 1 hour partitions
);

StreamId configuredStream = NamespaceId.DEFAULT.stream("configuredEvents");
try {
    streamAdmin.create(configuredStream, properties);
    System.out.println("Created configured stream: " + configuredStream.getStream());
} catch (Exception e) {
    System.err.println("Failed to create configured stream: " + e.getMessage());
}

// Check if stream exists
boolean exists = streamAdmin.exists(streamId);
System.out.println("Stream exists: " + exists);

// Get stream configuration
try {
    StreamProperties config = streamAdmin.getConfig(streamId);
    System.out.println("Stream format: " + config.getFormat());
    System.out.println("Stream TTL: " + config.getTTL());
} catch (Exception e) {
    System.err.println("Failed to get stream config: " + e.getMessage());
}
```

### Stream Configuration Management

```java
// Update stream properties
StreamProperties updatedProperties = StreamProperties.builder()
    .setTTL(1209600) // 14 days
    .setFormat(new FormatSpecification("json", null))
    .setNotificationThresholdMB(100)
    .build();

try {
    streamAdmin.updateConfig(streamId, updatedProperties);
    System.out.println("Updated stream configuration");
} catch (Exception e) {
    System.err.println("Failed to update stream config: " + e.getMessage());
}

// List all streams in namespace
try {
    List<StreamSpecification> streams = streamAdmin.listStreams(NamespaceId.DEFAULT);
    System.out.println("Streams in default namespace:");
    for (StreamSpecification stream : streams) {
        System.out.println(" - " + stream.getName());
        System.out.println(" Format: " + stream.getFormat());
        System.out.println(" TTL: " + stream.getTTL() + " seconds");
    }
} catch (Exception e) {
    System.err.println("Failed to list streams: " + e.getMessage());
}

// Get stream statistics
try {
    StreamStats stats = streamAdmin.getStats(streamId);
    System.out.println("Stream statistics:");
    System.out.println(" Events: " + stats.getEvents());
    System.out.println(" Size: " + stats.getBytes() + " bytes");
    System.out.println(" Ingested in last hour: " + stats.getRecentEvents());
} catch (Exception e) {
    System.err.println("Failed to get stream stats: " + e.getMessage());
}
```

### Stream Event Decoding

```java
// String decoder for text-based events
StringStreamEventDecoder stringDecoder = new StringStreamEventDecoder();

// Process stream events as strings
public void processTextEvents(StreamConsumer consumer) {
    try {
        DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
        if (event != null) {
            StreamEvent streamEvent = convertToStreamEvent(event);
            String decodedText = stringDecoder.decode(streamEvent, StandardCharsets.UTF_8);

            System.out.println("Decoded event: " + decodedText);
            processTextEvent(decodedText);
        }
    } catch (Exception e) {
        System.err.println("Failed to decode stream event: " + e.getMessage());
    }
}

// Binary decoder for raw data
BytesStreamEventDecoder binaryDecoder = new BytesStreamEventDecoder();

public void processBinaryEvents(StreamConsumer consumer) {
    try {
        DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
        if (event != null) {
            StreamEvent streamEvent = convertToStreamEvent(event);
            byte[] binaryData = binaryDecoder.decode(streamEvent, null);

            System.out.println("Decoded binary data: " + binaryData.length + " bytes");
            processBinaryData(binaryData);
        }
    } catch (Exception e) {
        System.err.println("Failed to decode binary event: " + e.getMessage());
    }
}

// Format-based decoder for structured data
Schema avroSchema = // ... load Avro schema
RecordFormat<GenericRecord> avroFormat = new AvroRecordFormat<>(avroSchema);
FormatStreamEventDecoder<GenericRecord> avroDecoder =
    new FormatStreamEventDecoder<>(avroFormat, avroSchema);

public void processAvroEvents(StreamConsumer consumer) {
    try {
        DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
        if (event != null) {
            StreamEvent streamEvent = convertToStreamEvent(event);
            GenericRecord record = avroDecoder.decode(streamEvent, StandardCharsets.UTF_8);

            System.out.println("Decoded Avro record: " + record);
            processAvroRecord(record);
        }
    } catch (Exception e) {
        System.err.println("Failed to decode Avro event: " + e.getMessage());
    }
}
```

### Stream Consumer Usage

```java
// Create stream consumer
StreamConsumerFactory consumerFactory = // ... obtain factory
ConsumerConfig config = ConsumerConfig.builder()
    .setDequeueTimeout(5000) // 5 seconds
    .setMaxDequeueSize(100)  // Max 100 events per batch
    .build();

StreamConsumer consumer = consumerFactory.create(streamId, "default", config);

// Basic event consumption
try {
    DequeInputDatum event = consumer.poll(10, TimeUnit.SECONDS);
    if (event != null) {
        System.out.println("Received event: " + new String(event.getData()));
        System.out.println("Event timestamp: " + event.getTimestamp());
        System.out.println("Event headers: " + event.getHeaders());
    } else {
        System.out.println("No events available");
    }
} catch (InterruptedException e) {
    System.out.println("Consumer polling interrupted");
}

// Batch event consumption with callback
try {
    consumer.consume(50, new StreamConsumerCallback() {
        @Override
        public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
            String eventData = new String(event.getData());
            System.out.println("Processing event: " + eventData);

            // Process the event
            processEvent(eventData);
        }

        @Override
        public void onFinish() throws Exception {
            System.out.println("Batch processing completed");
        }

        @Override
        public void onError(Exception error) throws Exception {
            System.err.println("Error processing batch: " + error.getMessage());
            throw error;
        }
    });
} catch (InterruptedException e) {
    System.out.println("Consumer batch processing interrupted");
}

// Consumer positioning
StreamEventOffset currentPosition = consumer.getPosition();
System.out.println("Current position: " + currentPosition);

// Seek to specific offset
StreamEventOffset seekOffset = new StreamEventOffset(0, 1000);
consumer.seek(seekOffset);
System.out.println("Seeked to offset: " + seekOffset);
```

### Advanced Stream Processing Patterns

```java
// Time-partitioned stream processing
public class TimePartitionedStreamProcessor {
    private final StreamAdmin streamAdmin;
    private final StreamConsumerFactory consumerFactory;
    private final TimePartitionedStreamFileWriter fileWriter;

    public TimePartitionedStreamProcessor(StreamAdmin streamAdmin,
                                          StreamConsumerFactory consumerFactory,
                                          TimePartitionedStreamFileWriter fileWriter) {
        this.streamAdmin = streamAdmin;
        this.consumerFactory = consumerFactory;
        this.fileWriter = fileWriter;
    }

    public void processTimePartitions(StreamId streamId, long startTime, long endTime) {
        try {
            // Create consumer for time range
            StreamConsumer consumer = consumerFactory.create(streamId, "default",
                ConsumerConfig.builder().build());

            // Seek to start time
            StreamEventOffset startOffset = findOffsetForTime(streamId, startTime);
            consumer.seek(startOffset);

            long currentTime = startTime;
            long partitionInterval = 3600000; // 1 hour partitions

            while (currentTime < endTime) {
                long partitionEnd = Math.min(currentTime + partitionInterval, endTime);

                System.out.println("Processing partition: " +
                    new Date(currentTime) + " to " + new Date(partitionEnd));

                processTimePartition(consumer, currentTime, partitionEnd);

                currentTime = partitionEnd;
            }

        } catch (Exception e) {
            System.err.println("Failed to process time partitions: " + e.getMessage());
        }
    }

    private void processTimePartition(StreamConsumer consumer, long startTime, long endTime)
            throws Exception {
        // Process events in time partition
        consumer.consume(1000, new StreamConsumerCallback() {
            @Override
            public void onEvent(DequeInputDatum event, DequeInputDatum eventMetadata) throws Exception {
                if (event.getTimestamp() >= startTime && event.getTimestamp() < endTime) {
                    // Write event to time-partitioned file
                    StreamEvent streamEvent = convertToStreamEvent(event);
                    fileWriter.append(streamEvent);
                }
            }

            @Override
            public void onFinish() throws Exception {
                fileWriter.flush();
                System.out.println("Partition processing completed");
            }

            @Override
            public void onError(Exception error) throws Exception {
                System.err.println("Error in partition processing: " + error.getMessage());
                throw error;
            }
        });
    }
}

// Multi-consumer group processing
public class MultiConsumerGroupProcessor {
    private final StreamConsumerFactory consumerFactory;
    private final ExecutorService executorService;

    public MultiConsumerGroupProcessor(StreamConsumerFactory consumerFactory) {
        this.consumerFactory = consumerFactory;
        this.executorService = Executors.newFixedThreadPool(4);
    }

    public void processWithMultipleGroups(StreamId streamId) {
        List<String> consumerGroups = Arrays.asList("analytics", "monitoring", "alerts", "archive");

        for (String groupId : consumerGroups) {
            executorService.submit(() -> {
                try {
                    StreamConsumer consumer = consumerFactory.createGroupConsumer(
                        streamId, "default", groupId, ConsumerConfig.builder().build());

                    processStreamWithGroup(consumer, groupId);

                } catch (Exception e) {
                    System.err.println("Error in consumer group " + groupId + ": " + e.getMessage());
                }
            });
        }
    }

    private void processStreamWithGroup(StreamConsumer consumer, String groupId) {
        System.out.println("Starting processing for group: " + groupId);

        try {
            while (!Thread.currentThread().isInterrupted()) {
                DequeInputDatum event = consumer.poll(5, TimeUnit.SECONDS);
                if (event != null) {
                    processEventForGroup(event, groupId);
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Consumer group " + groupId + " interrupted");
        } catch (Exception e) {
            System.err.println("Error in consumer group " + groupId + ": " + e.getMessage());
        }
    }

    private void processEventForGroup(DequeInputDatum event, String groupId) {
        // Group-specific event processing
        switch (groupId) {
            case "analytics":
                performAnalytics(event);
                break;
            case "monitoring":
                updateMonitoringMetrics(event);
                break;
            case "alerts":
                checkForAlerts(event);
                break;
            case "archive":
                archiveEvent(event);
                break;
        }
    }
}
```

### Stream Coordination and Management

```java
// Stream coordination client usage
public class StreamCoordinationManager {
    private final StreamCoordinatorClient coordinatorClient;

    public StreamCoordinationManager(StreamCoordinatorClient coordinatorClient) {
        this.coordinatorClient = coordinatorClient;
    }

    public void setupDistributedStream(StreamId streamId, int partitionCount) {
        try {
            // Create stream with coordination
            Map<String, String> properties = Map.of(
                "partition.count", String.valueOf(partitionCount),
                "replication.factor", "3"
            );

            coordinatorClient.createStream(streamId, properties);
            System.out.println("Created distributed stream: " + streamId);

            // Set up consumer groups
            for (int groupId = 1; groupId <= 3; groupId++) {
                coordinatorClient.addConsumerGroup(streamId, groupId);
                System.out.println("Added consumer group: " + groupId);
            }

            // Verify partition count
            int actualPartitions = coordinatorClient.getPartitionCount(streamId);
            System.out.println("Stream partition count: " + actualPartitions);

        } catch (Exception e) {
            System.err.println("Failed to setup distributed stream: " + e.getMessage());
        }
    }

    public void scaleStream(StreamId streamId, int newPartitionCount) {
        try {
            int currentPartitions = coordinatorClient.getPartitionCount(streamId);

            if (newPartitionCount != currentPartitions) {
                coordinatorClient.setPartitionCount(streamId, newPartitionCount);
                System.out.println("Scaled stream from " + currentPartitions +
                    " to " + newPartitionCount + " partitions");
            }

        } catch (Exception e) {
            System.err.println("Failed to scale stream: " + e.getMessage());
        }
    }

    public void cleanupStream(StreamId streamId) {
        try {
            // Remove all consumer groups
            for (int groupId = 1; groupId <= 3; groupId++) {
                coordinatorClient.removeConsumerGroup(streamId, groupId);
            }

            // Delete the stream
            coordinatorClient.deleteStream(streamId);
            System.out.println("Cleaned up stream: " + streamId);

        } catch (Exception e) {
            System.err.println("Failed to cleanup stream: " + e.getMessage());
        }
    }
}
```

## Types

```java { .api }
// Core stream types
public final class StreamId extends EntityId {
    public static StreamId of(String namespace, String stream);
    public String getStream();
    public NamespaceId getParent();
}

// Stream properties and configuration
public final class StreamProperties {
    public static Builder builder();

    public long getTTL();
    public FormatSpecification getFormat();
    public int getNotificationThresholdMB();
    public Map<String, String> getProperties();

    public static class Builder {
        public Builder setTTL(long ttl);
        public Builder setFormat(FormatSpecification format);
        public Builder setNotificationThresholdMB(int threshold);
        public Builder setProperties(Map<String, String> properties);
        public StreamProperties build();
    }
}

// Stream specification and metadata
public final class StreamSpecification {
    public String getName();
    public FormatSpecification getFormat();
    public long getTTL();
    public Map<String, String> getProperties();
}

// Stream statistics
public final class StreamStats {
    public long getEvents();
    public long getBytes();
    public long getRecentEvents();
    public long getLastEventTime();
}

// Stream event structures
public interface StreamEvent {
    ByteBuffer getBody();
    Map<String, String> getHeaders();
    long getTimestamp();
}

public final class StreamEventOffset {
    public long getGeneration();
    public long getOffset();

    public StreamEventOffset(long generation, long offset);
}

// Consumer configuration and state
public final class ConsumerConfig {
    public static Builder builder();

    public int getDequeueTimeout();
    public int getMaxDequeueSize();
    public String getInstanceId();

    public static class Builder {
        public Builder setDequeueTimeout(int timeout);
        public Builder setMaxDequeueSize(int size);
        public Builder setInstanceId(String instanceId);
        public ConsumerConfig build();
    }
}

public interface ConsumerState {
    StreamEventOffset getOffset();
    Map<String, String> getState();
    long getTimestamp();
}

// Stream input processing types
public interface StreamInputSplit {
    StreamId getStreamId();
    long getStartTime();
    long getEndTime();
    int getPartition();
}

public final class TimeRange {
    public long getStartTime();
    public long getEndTime();

    public TimeRange(long startTime, long endTime);
}

// Decoder types
public interface DecodeCallback<T> {
    void decoded(T decodedObject);
    void onError(Exception error);
}

public final class DecodeResult<T> {
    public T getResult();
    public boolean hasError();
    public Exception getError();
}

// Format specifications
public final class FormatSpecification {
    public String getName();
    public Schema getSchema();
    public Map<String, String> getSettings();

    public FormatSpecification(String name, Schema schema);
    public FormatSpecification(String name, Schema schema, Map<String, String> settings);
}

// Exception types
public class StreamException extends Exception {
    public StreamException(String message);
    public StreamException(String message, Throwable cause);
}

public class StreamNotFoundException extends StreamException {
    public StreamNotFoundException(StreamId streamId);
}

public class StreamAlreadyExistsException extends StreamException {
    public StreamAlreadyExistsException(StreamId streamId);
}
```