or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-sink.md · hybrid-source.md · index.md · rate-limiting.md · source-reader.md · table-api.md

docs/hybrid-source.md

# Hybrid Source System

The Hybrid Source System enables seamless switching between multiple underlying sources based on configured source chains. It supports both static pre-configured sources and dynamic source creation with position transfer between sources.

## Core Components

### HybridSource

The main class that coordinates multiple underlying sources.

```java { .api }
@PublicEvolving
public class HybridSource<T> implements Source<T, HybridSourceSplit, HybridSourceEnumeratorState> {

    // Static builder methods
    public static <T, EnumT extends SplitEnumerator> HybridSourceBuilder<T, EnumT> builder(
            Source<T, ?, ?> firstSource)

    // Source interface methods
    public Boundedness getBoundedness()
    public SourceReader<T, HybridSourceSplit> createReader(SourceReaderContext readerContext) throws Exception
    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> createEnumerator(
            SplitEnumeratorContext<HybridSourceSplit> enumContext)
    public SplitEnumerator<HybridSourceSplit, HybridSourceEnumeratorState> restoreEnumerator(
            SplitEnumeratorContext<HybridSourceSplit> enumContext,
            HybridSourceEnumeratorState checkpoint) throws Exception
    public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer()
    public SimpleVersionedSerializer<HybridSourceEnumeratorState> getEnumeratorCheckpointSerializer()
}
```

### HybridSourceBuilder

Builder for constructing hybrid sources with multiple underlying sources.

```java { .api }
@PublicEvolving
public static class HybridSourceBuilder<T, EnumT extends SplitEnumerator> implements Serializable {

    // Add pre-configured source
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(NextSourceT source)

    // Add source with deferred instantiation
    public <ToEnumT extends SplitEnumerator, NextSourceT extends Source<T, ?, ?>>
            HybridSourceBuilder<T, ToEnumT> addSource(
                    SourceFactory<T, NextSourceT, ? super EnumT> sourceFactory,
                    Boundedness boundedness)

    // Build the hybrid source
    public HybridSource<T> build()
}
```

### SourceFactory

Factory interface for creating sources with dynamic configuration.

```java { .api }
@PublicEvolving
@FunctionalInterface
public interface SourceFactory<T, SourceT extends Source<T, ?, ?>, FromEnumT extends SplitEnumerator>
        extends Serializable {
    SourceT create(SourceSwitchContext<FromEnumT> context)
}
```

### SourceSwitchContext

Context provided to source factory for position transfer.

```java { .api }
@PublicEvolving
public interface SourceSwitchContext<EnumT> {
    EnumT getPreviousEnumerator()
}
```

## Implementation Examples

### Simple Hybrid Source

```java
// Create a hybrid source that reads files first, then switches to Kafka
public class FileToKafkaHybridSource {

    public static HybridSource<String> create(
            Path filePath,
            String kafkaBootstrapServers,
            String kafkaTopic) {

        // Create file source
        FileSource<String> fileSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), filePath)
            .build();

        // Create Kafka source
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers(kafkaBootstrapServers)
            .setTopics(kafkaTopic)
            .setGroupId("hybrid-consumer")
            .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .build();

        // Build hybrid source
        return HybridSource.builder(fileSource)
            .addSource(kafkaSource)
            .build();
    }
}

// Usage
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

HybridSource<String> hybridSource = FileToKafkaHybridSource.create(
    Paths.get("/path/to/input/files"),
    "localhost:9092",
    "input-events"
);

DataStream<String> stream = env.fromSource(
    hybridSource,
    WatermarkStrategy.noWatermarks(),
    "hybrid-file-kafka-source"
);
```

### Dynamic Position Transfer

```java
// Advanced hybrid source with position transfer from file timestamps to Kafka offsets
public class TimestampBasedHybridSource {

    public static HybridSource<EventRecord> createWithTimestampTransfer(
            Path filePath,
            String kafkaBootstrapServers,
            String kafkaTopic) {

        // Create timestamped file source
        TimestampedFileSource fileSource = new TimestampedFileSource(filePath);

        // Build hybrid source with dynamic Kafka configuration
        return HybridSource.<EventRecord, TimestampedFileEnumerator>builder(fileSource)
            .addSource(
                switchContext -> {
                    // Get the previous enumerator to extract end timestamp
                    TimestampedFileEnumerator fileEnumerator = switchContext.getPreviousEnumerator();
                    long endTimestamp = fileEnumerator.getMaxTimestamp();

                    LOG.info("Switching from file source to Kafka at timestamp: {}", endTimestamp);

                    // Create Kafka source starting from the file's end timestamp
                    return KafkaSource.<EventRecord>builder()
                        .setBootstrapServers(kafkaBootstrapServers)
                        .setTopics(kafkaTopic)
                        .setGroupId("hybrid-consumer-" + UUID.randomUUID())
                        .setDeserializer(new EventRecordDeserializer())
                        .setStartingOffsets(OffsetsInitializer.timestamp(endTimestamp))
                        .build();
                },
                Boundedness.CONTINUOUS_UNBOUNDED
            )
            .build();
    }
}

// Custom file source that tracks timestamps
public class TimestampedFileSource implements Source<EventRecord, TimestampedFileSplit, TimestampedFileEnumeratorState> {
    private final Path filePath;

    public TimestampedFileSource(Path filePath) {
        this.filePath = filePath;
    }

    @Override
    public SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> createEnumerator(
            SplitEnumeratorContext<TimestampedFileSplit> enumContext) {
        return new TimestampedFileEnumerator(enumContext, filePath);
    }

    @Override
    public SourceReader<EventRecord, TimestampedFileSplit> createReader(
            SourceReaderContext readerContext) {
        return new TimestampedFileReader(readerContext);
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    // ... other required methods
}

// Enumerator that tracks maximum timestamp seen
public class TimestampedFileEnumerator implements SplitEnumerator<TimestampedFileSplit, TimestampedFileEnumeratorState> {
    private final SplitEnumeratorContext<TimestampedFileSplit> context;
    private final Path filePath;
    private long maxTimestamp = 0;
    private boolean splitsAssigned = false;

    public TimestampedFileEnumerator(SplitEnumeratorContext<TimestampedFileSplit> context, Path filePath) {
        this.context = context;
        this.filePath = filePath;
    }

    @Override
    public void start() {
        // Assign splits for file reading
        if (!splitsAssigned) {
            List<TimestampedFileSplit> splits = createFileSplits();
            context.assignSplits(new SplitsAssignment<>(
                Collections.singletonMap(0, splits) // Assign to reader 0
            ));
            splitsAssigned = true;
        }
    }

    @Override
    public void handleSplitRequest(int subtaskId, String requesterHostname) {
        // No more splits to assign
    }

    @Override
    public void addSplitsBack(List<TimestampedFileSplit> splits, int subtaskId) {
        // Re-assign splits if needed
        context.assignSplits(new SplitsAssignment<>(
            Collections.singletonMap(subtaskId, splits)
        ));
    }

    @Override
    public void addReader(int subtaskId) {
        // New reader registered
    }

    @Override
    public TimestampedFileEnumeratorState snapshotState(long checkpointId) {
        return new TimestampedFileEnumeratorState(maxTimestamp, splitsAssigned);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Checkpoint completed
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof TimestampUpdateEvent) {
            TimestampUpdateEvent timestampEvent = (TimestampUpdateEvent) sourceEvent;
            maxTimestamp = Math.max(maxTimestamp, timestampEvent.getTimestamp());
            LOG.debug("Updated max timestamp to: {}", maxTimestamp);
        }
    }

    @Override
    public void close() {
        // Cleanup resources
    }

    public long getMaxTimestamp() {
        return maxTimestamp;
    }

    private List<TimestampedFileSplit> createFileSplits() {
        // Create splits for the file
        return Collections.singletonList(
            new TimestampedFileSplit("file-split-0", filePath, 0, getFileSize(filePath))
        );
    }
}

// Event to communicate timestamp updates
public class TimestampUpdateEvent implements SourceEvent {
    private final long timestamp;

    public TimestampUpdateEvent(long timestamp) {
        this.timestamp = timestamp;
    }

    public long getTimestamp() {
        return timestamp;
    }
}
```

### Multi-Stage Hybrid Source

```java
// Complex hybrid source with multiple stages: Archive → Recent Files → Real-time Stream
public class MultiStageHybridSource {

    public static HybridSource<LogRecord> createLogProcessingSource(
            Path archivePath,
            Path recentPath,
            String streamingEndpoint) {

        // Stage 1: Archive files (oldest data)
        FileSource<LogRecord> archiveSource = FileSource
            .forRecordStreamFormat(new LogRecordFormat(), archivePath)
            .build();

        // Stage 2: Recent files (newer data)
        FileSource<LogRecord> recentSource = FileSource
            .forRecordStreamFormat(new LogRecordFormat(), recentPath)
            .monitorContinuously(Duration.ofSeconds(10)) // Monitor for new files
            .build();

        return HybridSource.<LogRecord, FileSourceEnumerator>builder(archiveSource)
            // Add recent files stage
            .addSource(
                switchContext -> {
                    FileSourceEnumerator archiveEnumerator = switchContext.getPreviousEnumerator();
                    long maxProcessedTime = archiveEnumerator.getMaxProcessedTimestamp();

                    // Configure recent source to start after archive data
                    return FileSource
                        .forRecordStreamFormat(new LogRecordFormat(), recentPath)
                        .monitorContinuously(Duration.ofSeconds(10))
                        .setFilenameFilter(path -> getFileTimestamp(path) > maxProcessedTime)
                        .build();
                },
                Boundedness.BOUNDED // Recent files are bounded
            )
            // Add real-time streaming stage
            .addSource(
                switchContext -> {
                    // Get end time from recent files
                    FileSourceEnumerator recentEnumerator = switchContext.getPreviousEnumerator();
                    long streamStartTime = recentEnumerator.getMaxProcessedTimestamp();

                    // Create streaming source starting from where files ended
                    return new LogStreamSource(streamingEndpoint, streamStartTime);
                },
                Boundedness.CONTINUOUS_UNBOUNDED // Streaming is unbounded
            )
            .build();
    }

    private static long getFileTimestamp(Path path) {
        // Extract timestamp from filename or file attributes
        String filename = path.getFileName().toString();
        // Assuming filename like "logs-2024-01-01-12-00.txt"
        // Parse and return timestamp
        return parseTimestampFromFilename(filename);
    }
}
```

349

350

### Source Factory Patterns

351

352

```java

353

// Factory for database sources with connection pooling

354

public class DatabaseSourceFactory implements SourceFactory<DatabaseRecord, DatabaseSource, Object> {

355

private final String connectionUrl;

356

private final String query;

357

private final DataSource dataSource;

358

359

public DatabaseSourceFactory(String connectionUrl, String query) {

360

this.connectionUrl = connectionUrl;

361

this.query = query;

362

this.dataSource = createDataSource(connectionUrl);

363

}

364

365

@Override

366

public DatabaseSource create(SourceSwitchContext<Object> context) {

367

// Create database source with existing connection pool

368

return new DatabaseSource(dataSource, query);

369

}

370

371

private DataSource createDataSource(String url) {

372

HikariConfig config = new HikariConfig();

373

config.setJdbcUrl(url);

374

config.setMaximumPoolSize(10);

375

config.setMinimumIdle(2);

376

return new HikariDataSource(config);

377

}

378

}

379

380

// Factory with conditional source creation

381

public class ConditionalSourceFactory implements SourceFactory<String, Source<String, ?, ?>, FileSourceEnumerator> {

382

383

@Override

384

public Source<String, ?, ?> create(SourceSwitchContext<FileSourceEnumerator> context) {

385

FileSourceEnumerator previousEnumerator = context.getPreviousEnumerator();

386

387

// Choose next source based on previous source state

388

if (previousEnumerator.getProcessedRecordCount() > 1_000_000) {

389

// Large dataset - use Kafka for scalability

390

return createKafkaSource();

391

} else if (previousEnumerator.hasErrors()) {

392

// Previous source had errors - use reliable source

393

return createDatabaseSource();

394

} else {

395

// Normal case - use default file source

396

return createFileSource();

397

}

398

}

399

400

private Source<String, ?, ?> createKafkaSource() {

401

return KafkaSource.<String>builder()

402

.setBootstrapServers("localhost:9092")

403

.setTopics("high-volume-topic")

404

.setGroupId("hybrid-high-volume")

405

.setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))

406

.setStartingOffsets(OffsetsInitializer.latest())

407

.build();

408

}

409

410

private Source<String, ?, ?> createDatabaseSource() {

411

return JdbcSource.<String>builder()

412

.setDrivername("com.mysql.cj.jdbc.Driver")

413

.setDBUrl("jdbc:mysql://localhost:3306/backup")

414

.setQuery("SELECT data FROM backup_records WHERE processed = false")

415

.setRowTypeInfo(Types.STRING)

416

.build();

417

}

418

419

private Source<String, ?, ?> createFileSource() {

420

return FileSource

421

.forRecordStreamFormat(new TextLineInputFormat(), Paths.get("/backup/files"))

422

.build();

423

}

424

}

425

```

## State Management and Checkpointing

### HybridSourceEnumeratorState

```java { .api }
public class HybridSourceEnumeratorState {
    private final int currentSourceIndex;
    private final byte[] currentEnumeratorState;
    private final List<HybridSourceSplit> remainingSplits;

    // Constructor and methods for state management
    public HybridSourceEnumeratorState(
            int currentSourceIndex,
            byte[] currentEnumeratorState,
            List<HybridSourceSplit> remainingSplits)

    public int getCurrentSourceIndex()
    public byte[] getCurrentEnumeratorState()
    public List<HybridSourceSplit> getRemainingSplits()
}
```

### Custom State Serialization

```java
public class CustomHybridSourceStateSerializer
        implements SimpleVersionedSerializer<HybridSourceEnumeratorState> {

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(HybridSourceEnumeratorState state) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {

            // Write current source index
            out.writeInt(state.getCurrentSourceIndex());

            // Write enumerator state
            byte[] enumState = state.getCurrentEnumeratorState();
            out.writeInt(enumState.length);
            out.write(enumState);

            // Write remaining splits
            List<HybridSourceSplit> splits = state.getRemainingSplits();
            out.writeInt(splits.size());
            for (HybridSourceSplit split : splits) {
                serializeSplit(split, out);
            }

            return baos.toByteArray();
        }
    }

    @Override
    public HybridSourceEnumeratorState deserialize(int version, byte[] serialized) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {

            // Read current source index
            int currentSourceIndex = in.readInt();

            // Read enumerator state
            int enumStateLength = in.readInt();
            byte[] enumState = new byte[enumStateLength];
            in.readFully(enumState);

            // Read remaining splits
            int splitsCount = in.readInt();
            List<HybridSourceSplit> splits = new ArrayList<>(splitsCount);
            for (int i = 0; i < splitsCount; i++) {
                splits.add(deserializeSplit(in));
            }

            return new HybridSourceEnumeratorState(currentSourceIndex, enumState, splits);
        }
    }
}
```

## Configuration Examples

### Basic Configuration

```java
// Simple file-to-stream hybrid
HybridSource<String> basicHybrid = HybridSource.builder(fileSource)
    .addSource(streamSource)
    .build();

// Configure in job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(Duration.ofMinutes(1).toMillis());

DataStream<String> stream = env.fromSource(
    basicHybrid,
    WatermarkStrategy.noWatermarks(),
    "basic-hybrid-source"
);
```

### Advanced Configuration with Position Transfer

```java
public class AdvancedHybridConfiguration {

    public static HybridSource<TransactionRecord> createTransactionSource() {
        // Historical data from data lake
        ParquetSource<TransactionRecord> historicalSource = ParquetSource
            .forRecords(TransactionRecord.class, Paths.get("s3://data-lake/transactions/"))
            .withParallelism(16)
            .build();

        return HybridSource.<TransactionRecord, ParquetSourceEnumerator>builder(historicalSource)
            // Recent data from database
            .addSource(
                switchContext -> {
                    ParquetSourceEnumerator histEnumerator = switchContext.getPreviousEnumerator();
                    Instant cutoffTime = histEnumerator.getMaxRecordTimestamp();

                    return JdbcSource.<TransactionRecord>builder()
                        .setDrivername("org.postgresql.Driver")
                        .setDBUrl("jdbc:postgresql://localhost:5432/transactions")
                        .setQuery("SELECT * FROM transactions WHERE created_at > ?")
                        .setParametersProvider(new TimestampParameterProvider(cutoffTime))
                        .setRowTypeInfo(TransactionRecord.getTypeInfo())
                        .build();
                },
                Boundedness.BOUNDED
            )
            // Live stream
            .addSource(
                switchContext -> {
                    // Get end time from database source
                    JdbcSourceEnumerator dbEnumerator = switchContext.getPreviousEnumerator();
                    Instant streamStart = dbEnumerator.getMaxProcessedTimestamp();

                    return KafkaSource.<TransactionRecord>builder()
                        .setBootstrapServers("kafka-cluster:9092")
                        .setTopics("live-transactions")
                        .setGroupId("hybrid-transaction-processor")
                        .setDeserializer(new TransactionRecordDeserializer())
                        .setStartingOffsets(OffsetsInitializer.timestamp(streamStart.toEpochMilli()))
                        .build();
                },
                Boundedness.CONTINUOUS_UNBOUNDED
            )
            .build();
    }
}
```

581

582

## Best Practices

583

584

### Position Transfer

585

586

1. **Implement Precise Timestamp Tracking**

587

```java

588

public class TimestampTrackingEnumerator implements SplitEnumerator<MySplit, MyEnumeratorState> {

589

private volatile Instant maxProcessedTimestamp = Instant.MIN;

590

591

@Override

592

public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {

593

if (sourceEvent instanceof TimestampProgressEvent) {

594

TimestampProgressEvent event = (TimestampProgressEvent) sourceEvent;

595

synchronized (this) {

596

if (event.getTimestamp().isAfter(maxProcessedTimestamp)) {

597

maxProcessedTimestamp = event.getTimestamp();

598

}

599

}

600

}

601

}

602

603

public Instant getMaxProcessedTimestamp() {

604

return maxProcessedTimestamp;

605

}

606

}

607

```

608

609

2. **Handle Clock Skew and Overlap**

610

```java

611

public class OverlapHandlingSourceFactory implements SourceFactory<Event, KafkaSource<Event>, FileSourceEnumerator> {

612

private final Duration overlapBuffer;

613

614

@Override

615

public KafkaSource<Event> create(SourceSwitchContext<FileSourceEnumerator> context) {

616

FileSourceEnumerator prevEnum = context.getPreviousEnumerator();

617

Instant switchTime = prevEnum.getMaxProcessedTimestamp();

618

619

// Start slightly before the file source ended to handle clock skew

620

Instant kafkaStartTime = switchTime.minus(overlapBuffer);

621

622

return KafkaSource.<Event>builder()

623

.setStartingOffsets(OffsetsInitializer.timestamp(kafkaStartTime.toEpochMilli()))

624

.setDeserializer(new DeduplicatingEventDeserializer(switchTime)) // Handle duplicates

625

.build();

626

}

627

}

628

```

### Resource Management

1. **Efficient Source Lifecycle Management**

```java
public class ResourceAwareHybridSource {

    public static <T> HybridSource<T> createWithResourceManagement(
            List<Source<T, ?, ?>> sources) {

        HybridSourceBuilder<T, ?> builder = null;

        for (int i = 0; i < sources.size(); i++) {
            final int sourceIndex = i;

            if (builder == null) {
                builder = HybridSource.builder(sources.get(0));
            } else {
                builder = builder.addSource(
                    switchContext -> {
                        // Cleanup previous source resources if needed
                        cleanupPreviousSource(switchContext.getPreviousEnumerator());

                        // Pre-warm next source
                        Source<T, ?, ?> nextSource = sources.get(sourceIndex);
                        prewarmSource(nextSource);

                        return nextSource;
                    },
                    sources.get(sourceIndex).getBoundedness()
                );
            }
        }

        return builder.build();
    }

    private static void cleanupPreviousSource(SplitEnumerator previousEnumerator) {
        if (previousEnumerator instanceof ResourceManager) {
            ((ResourceManager) previousEnumerator).releaseResources();
        }
    }

    private static <T> void prewarmSource(Source<T, ?, ?> source) {
        if (source instanceof Prewarmable) {
            ((Prewarmable) source).prewarm();
        }
    }
}
```

2. **Memory and Performance Optimization**

```java
public class OptimizedHybridSource {

    public static HybridSource<Record> createOptimized(SourceChainConfig config) {

        return HybridSource.builder(createFirstSource(config))
            .addSource(
                switchContext -> {
                    // Adjust parallelism based on data volume
                    int optimalParallelism = calculateOptimalParallelism(
                        switchContext.getPreviousEnumerator()
                    );

                    return createSecondSource(config)
                        .withParallelism(optimalParallelism);
                },
                Boundedness.CONTINUOUS_UNBOUNDED
            )
            .build();
    }

    private static int calculateOptimalParallelism(SplitEnumerator previousEnumerator) {
        if (previousEnumerator instanceof MetricsProvider) {
            MetricsProvider metricsProvider = (MetricsProvider) previousEnumerator;
            long recordsPerSecond = metricsProvider.getRecordsPerSecond();

            // Scale parallelism based on throughput
            return Math.max(1, (int) (recordsPerSecond / 10000)); // 10k records per subtask
        }

        return Runtime.getRuntime().availableProcessors();
    }
}
```

The Hybrid Source System provides a powerful framework for building complex data ingestion pipelines that can seamlessly transition between different data sources while maintaining exactly-once processing guarantees and efficient resource utilization.