or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md connectors.md core-functions.md datastream-traditional.md datastream-v2.md index.md state-management.md table-api.md windowing.md

docs/connectors.md

0

# Connector Framework

1

2

Unified connector framework for integrating with external systems, supporting both source and sink operations with exactly-once processing guarantees.

3

4

## Capabilities

5

6

### Source Framework

7

8

New unified source interface for reading data from external systems.

9

10

```java { .api }

11

/**

12

* Unified source interface

13

* @param <T> Output element type

14

* @param <SplitT> Split type

15

* @param <EnumChkT> Enumerator checkpoint type

16

*/

17

interface Source<T, SplitT extends SourceSplit, EnumChkT> {

18

/**

19

* Get source reader

20

* @param readerContext Reader context

21

* @return Source reader

22

*/

23

SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);

24

25

/**

26

* Create split enumerator

27

* @param enumContext Enumerator context

28

* @return Split enumerator

29

*/

30

SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> enumContext);

31

32

/**

33

* Restore split enumerator

34

* @param enumContext Enumerator context

35

* @param checkpoint Checkpoint

36

* @return Restored enumerator

37

*/

38

SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(SplitEnumeratorContext<SplitT> enumContext, EnumChkT checkpoint);

39

40

/**

41

* Get split serializer

42

* @return Split serializer

43

*/

44

SimpleVersionedSerializer<SplitT> getSplitSerializer();

45

46

/**

47

* Get enumerator checkpoint serializer

48

* @return Checkpoint serializer

49

*/

50

SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer();

51

52

/**

53

* Get source boundedness

54

* @return Boundedness

55

*/

56

Boundedness getBoundedness();

57

}

58

59

/**

60

* Source reader interface

61

* @param <T> Element type

62

* @param <SplitT> Split type

63

*/

64

interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable {

65

/**

66

* Start the reader

67

*/

68

void start();

69

70

/**

71

* Poll for next batch of records

72

* @return Input status

73

* @throws Exception

74

*/

75

InputStatus pollNext(ReaderOutput<T> output) throws Exception;

76

77

/**

78

* Snapshot reader state for a checkpoint

79

* @return Reader state as the list of splits it currently holds

80

*/

81

List<SplitT> snapshotState(long checkpointId);

82

83

/**

84

* Add splits to reader

85

* @param splits Splits to add

86

*/

87

void addSplits(List<SplitT> splits);

88

89

/**

90

* Handle no more splits

91

*/

92

void notifyNoMoreSplits();

93

94

@Override

95

void close() throws Exception;

96

}

97

98

/**

99

* Split enumerator interface

100

* @param <SplitT> Split type

101

* @param <CheckpointT> Checkpoint type

102

*/

103

interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> extends AutoCloseable {

104

/**

105

* Start the enumerator

106

*/

107

void start();

108

109

/**

110

* Handle split request

111

* @param subtaskId Subtask requesting splits

112

* @param requesterHostname Requester hostname

113

*/

114

void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

115

116

/**

117

* Add splits back to enumerator

118

* @param splits Splits to add back

119

* @param subtaskId Subtask ID

120

*/

121

void addSplitsBack(List<SplitT> splits, int subtaskId);

122

123

/**

124

* Add new reader

125

* @param subtaskId Subtask ID

126

*/

127

void addReader(int subtaskId);

128

129

/**

130

* Snapshot enumerator state

131

* @param checkpointId Checkpoint ID

132

* @return Checkpoint state

133

* @throws Exception

134

*/

135

CheckpointT snapshotState(long checkpointId) throws Exception;

136

137

@Override

138

void close() throws IOException;

139

}

140

```

141

142

### Sink Framework

143

144

New unified sink interface for writing data to external systems.

145

146

```java { .api }

147

/**

148

* Unified sink interface

149

* @param <InputT> Input element type

150

*/

151

interface Sink<InputT> {

152

/**

153

* Create sink writer

154

* @param context Writer initialization context

155

* @return Sink writer

156

*/

157

SinkWriter<InputT> createWriter(WriterInitContext context);

158

159

/**

160

* Restore sink writer

161

* @param context Writer initialization context

162

* @param recoveredState Recovered state

163

* @return Restored sink writer

164

* @throws IOException

165

*/

166

default SinkWriter<InputT> restoreWriter(WriterInitContext context, Collection<WriterState> recoveredState) throws IOException {

167

return createWriter(context);

168

}

169

170

/**

171

* Create committer

172

* @param <CommittableT> Committable type

173

* @return Optional committer

174

*/

175

default <CommittableT> Optional<Committer<CommittableT>> createCommitter() {

176

return Optional.empty();

177

}

178

179

/**

180

* Create global committer

181

* @param <CommittableT> Committable type

182

* @param <GlobalCommittableT> Global committable type

183

* @return Optional global committer

184

*/

185

default <CommittableT, GlobalCommittableT> Optional<GlobalCommitter<CommittableT, GlobalCommittableT>> createGlobalCommitter() {

186

return Optional.empty();

187

}

188

189

/**

190

* Get writer state serializer

191

* @return Optional state serializer

192

*/

193

default Optional<SimpleVersionedSerializer<WriterState>> getWriterStateSerializer() {

194

return Optional.empty();

195

}

196

}

197

198

/**

199

* Sink writer interface

200

* @param <InputT> Input element type

201

*/

202

interface SinkWriter<InputT> extends AutoCloseable {

203

/**

204

* Write element

205

* @param element Element to write

206

* @param context Write context

207

* @throws IOException

208

* @throws InterruptedException

209

*/

210

void write(InputT element, Context context) throws IOException, InterruptedException;

211

212

/**

213

* Flush buffered elements

214

* @param endOfInput Whether this is end of input

215

* @return List of committables

216

* @throws IOException

217

* @throws InterruptedException

218

*/

219

List<CommittableT> flush(boolean endOfInput) throws IOException, InterruptedException;

220

221

/**

222

* Snapshot writer state

223

* @param checkpointId Checkpoint ID

224

* @return Writer state

225

* @throws IOException

226

*/

227

default List<WriterState> snapshotState(long checkpointId) throws IOException {

228

return Collections.emptyList();

229

}

230

231

@Override

232

void close() throws Exception;

233

234

/**

235

* Write context

236

*/

237

interface Context {

238

/**

239

* Get current event time

240

* @return Event time

241

*/

242

long currentEventTime();

243

244

/**

245

* Get element timestamp

246

* @return Element timestamp

247

*/

248

Long timestamp();

249

}

250

}

251

```

252

253

### Base Connector Components

254

255

Base classes and interfaces for building connectors.

256

257

```java { .api }

258

/**

259

* Base split interface

260

*/

261

interface SourceSplit {

262

/**

263

* Get split ID

264

* @return Split identifier

265

*/

266

String splitId();

267

}

268

269

/**

270

* Source split with file information

271

*/

272

class FileSourceSplit implements SourceSplit {

273

/**

274

* Create file source split

275

* @param id Split ID

276

* @param path File path

277

* @param offset Start offset

278

* @param length Split length

279

*/

280

public FileSourceSplit(String id, Path path, long offset, long length);

281

282

/**

283

* Get file path

284

* @return File path

285

*/

286

public Path path();

287

288

/**

289

* Get start offset

290

* @return Start offset

291

*/

292

public long offset();

293

294

/**

295

* Get split length

296

* @return Split length

297

*/

298

public long length();

299

}

300

301

/**

302

* Input status enumeration

303

*/

304

enum InputStatus {

305

/** More data available */

306

MORE_AVAILABLE,

307

/** No data available right now; more may arrive later */

308

NOTHING_AVAILABLE,

309

/** End of input */

310

END_OF_INPUT

311

}

312

313

/**

314

* Boundedness enumeration

315

*/

316

enum Boundedness {

317

/** Bounded source */

318

BOUNDED,

319

/** Unbounded source */

320

CONTINUOUS_UNBOUNDED

321

}

322

323

/**

324

* Reader output interface

325

* @param <T> Element type

326

*/

327

interface ReaderOutput<T> {

328

/**

329

* Collect element

330

* @param element Element to collect

331

*/

332

void collect(T element);

333

334

/**

335

* Collect element with timestamp

336

* @param element Element to collect

337

* @param timestamp Element timestamp

338

*/

339

void collect(T element, long timestamp);

340

341

/**

342

* Emit watermark

343

* @param watermark Watermark to emit

344

*/

345

void emitWatermark(Watermark watermark);

346

347

/**

348

* Mark source idle

349

*/

350

void markIdle();

351

352

/**

353

* Mark source active

354

*/

355

void markActive();

356

}

357

```

358

359

### File Connectors

360

361

File-based source and sink connectors.

362

363

```java { .api }

364

/**

365

* File source for reading files

366

* @param <T> Element type

367

*/

368

class FileSource<T> implements Source<T, FileSourceSplit, PendingSplitsCheckpoint> {

369

/**

370

* Create file source builder for a record stream format

371

* @param streamFormat Stream format
* @param <T> Element type

372

* @return File source builder

373

*/

374

public static <T> FileSourceBuilder<T> forRecordStreamFormat(StreamFormat<T> streamFormat);

375

376

/**

377

* Create file source for bulk format

378

* @param bulkFormat Bulk format

379

* @param <T> Element type

380

* @return File source builder

381

*/

382

public static <T> FileSourceBuilder<T> forBulkFileFormat(BulkFormat<T, ?> bulkFormat);

383

}

384

385

/**

386

* File source builder

387

* @param <T> Element type

388

*/

389

class FileSourceBuilder<T> {

390

/**

391

* Set file paths to read

392

* @param inputPaths Input paths

393

* @return Builder

394

*/

395

public FileSourceBuilder<T> setFilePaths(Path... inputPaths);

396

397

/**

398

* Set file paths to read

399

* @param inputPaths Input paths

400

* @return Builder

401

*/

402

public FileSourceBuilder<T> setFilePaths(Collection<Path> inputPaths);

403

404

/**

405

* Monitor path for new files

406

* @param monitoredPath Path to monitor

407

* @param discoveryInterval Interval between checks for new files

408

* @return Builder

409

*/

410

public FileSourceBuilder<T> monitorContinuously(Path monitoredPath, Duration discoveryInterval);

411

412

/**

413

* Set file path filter

414

* @param pathFilter Path filter

415

* @return Builder

416

*/

417

public FileSourceBuilder<T> setFilePathFilter(PathFilter pathFilter);

418

419

/**

420

* Build file source

421

* @return File source

422

*/

423

public FileSource<T> build();

424

}

425

426

/**

427

* Stream format interface

428

* @param <T> Element type

429

*/

430

interface StreamFormat<T> {

431

/**

432

* Create reader for input stream

433

* @param config Configuration

434

* @param inputStream Input stream

435

* @param fileLen File length

436

* @param splitEnd Split end position

437

* @return Stream format reader

438

* @throws IOException

439

*/

440

Reader<T> createReader(Configuration config, FSDataInputStream inputStream, long fileLen, long splitEnd) throws IOException;

441

442

/**

443

* Check if format is splittable

444

* @return true if splittable

445

*/

446

boolean isSplittable();

447

448

/**

449

* Stream format reader

450

* @param <T> Element type

451

*/

452

interface Reader<T> extends AutoCloseable {

453

/**

454

* Read next record

455

* @return Next record or null if end of split

456

* @throws IOException

457

*/

458

T read() throws IOException;

459

460

@Override

461

void close() throws IOException;

462

}

463

}

464

```

465

466

### Committable Types

467

468

Types used in the sink framework for two-phase commit scenarios.

469

470

```java { .api }

471

/**

472

* Base interface for committable data

473

*/

474

interface Committable {}

475

476

/**

477

* Marker interface for writer state

478

*/

479

interface WriterState {}

480

481

/**

482

* Committer interface for two-phase commit

483

* @param <CommittableT> Committable type

484

*/

485

interface Committer<CommittableT> extends AutoCloseable {

486

/**

487

* Commit the committables

488

* @param committables List of committables to commit

489

* @return Committables that failed to commit and need to be retried

490

* @throws IOException

491

* @throws InterruptedException

492

*/

493

List<CommittableT> commit(List<CommittableT> committables) throws IOException, InterruptedException;

494

495

@Override

496

void close() throws Exception;

497

}

498

499

/**

500

* Global committer interface

501

* @param <CommittableT> Committable type

502

* @param <GlobalCommittableT> Global committable type

503

*/

504

interface GlobalCommitter<CommittableT, GlobalCommittableT> extends AutoCloseable {

505

/**

506

* Combine committables for global commit

507

* @param committables List of committables

508

* @return Combined global committable

509

* @throws IOException

510

*/

511

GlobalCommittableT combine(List<CommittableT> committables) throws IOException;

512

513

/**

514

* Commit global committable

515

* @param globalCommittables List of global committables

516

* @return Global committables that failed to commit and need to be retried

517

* @throws IOException

518

* @throws InterruptedException

519

*/

520

List<GlobalCommittableT> commit(List<GlobalCommittableT> globalCommittables) throws IOException, InterruptedException;

521

522

@Override

523

void close() throws Exception;

524

}

525

```