or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actions-conditions.md, batch-processing.md, core-pipeline.md, data-connectors.md, index.md, join-operations.md, lineage-metadata.md, sql-engine.md, validation.md

docs/data-connectors.md

0

# Data Connectors

1

2

Framework for building data connectors with browse, sample, and specification generation capabilities to enable dynamic data source and sink discovery in CDAP ETL pipelines.

3

4

## Core Connector Interfaces

5

6

### Connector

7

8

Base interface for data connectors providing core functionality.

9

10

```java { .api }

11

package io.cdap.cdap.etl.api.connector;

12

13

public interface Connector {

14

/**

15

* Test the connector configuration.

16

*/

17

void test(ConnectorContext context) throws ValidationException;

18

19

/**

20

* Browse entities in the connector.

21

*/

22

BrowseDetail browse(ConnectorContext context, BrowseRequest request)

23

throws IllegalArgumentException;

24

25

/**

26

* Generate plugin specification for the connector.

27

*/

28

ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request)

29

throws IllegalArgumentException;

30

31

/**

32

* Sample data from the connector.

33

*/

34

SampleDetail sample(ConnectorContext context, SampleRequest request);

35

}

36

```

37

38

### DirectConnector

39

40

Direct connector interface that extends base connector with pipeline configuration.

41

42

```java { .api }

43

package io.cdap.cdap.etl.api.connector;

44

45

public interface DirectConnector extends Connector, PipelineConfigurable {

46

// Combines connector capabilities with pipeline configuration

47

}

48

```

49

50

**Usage Example:**

51

```java

52

@Plugin(type = "connector")

53

@Name("DatabaseConnector")

54

@Description("Connector for database systems")

55

public class DatabaseConnector implements DirectConnector {

56

57

private final Config config;

58

59

@Override

60

public void configurePipeline(PipelineConfigurer pipelineConfigurer) {

61

config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());

62

}

63

64

@Override

65

public void test(ConnectorContext context) throws ValidationException {

66

try (Connection conn = DriverManager.getConnection(

67

config.connectionString, config.username, config.password)) {

68

// Test database connectivity

69

if (!conn.isValid(30)) {

70

throw new ValidationException("Unable to connect to database");

71

}

72

} catch (SQLException e) {

73

throw new ValidationException("Database connection failed: " + e.getMessage());

74

}

75

}

76

77

@Override

78

public BrowseDetail browse(ConnectorContext context, BrowseRequest request) {

79

List<BrowseEntity> entities = new ArrayList<>();

80

String path = request.getPath();

81

82

try (Connection conn = getConnection()) {

83

if (path.isEmpty() || path.equals("/")) {

84

// Browse schemas/databases

85

entities.addAll(browseDatabases(conn));

86

} else if (path.startsWith("/database/")) {

87

// Browse tables in database

88

String database = extractDatabase(path);

89

entities.addAll(browseTables(conn, database));

90

}

91

} catch (SQLException e) {

92

throw new IllegalArgumentException("Failed to browse: " + e.getMessage());

93

}

94

95

return new BrowseDetail(entities, entities.size());

96

}

97

98

@Override

99

public ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request) {

100

String path = request.getPath();

101

Map<String, String> properties = new HashMap<>();

102

103

// Parse path to extract database and table information

104

String[] pathParts = path.split("/");

105

if (pathParts.length >= 3) {

106

properties.put("database", pathParts[1]);

107

properties.put("table", pathParts[2]);

108

properties.put("connectionString", config.connectionString);

109

properties.put("username", config.username);

110

}

111

112

Schema schema = inferSchemaFromTable(pathParts[1], pathParts[2]);

113

114

return ConnectorSpec.builder()

115

.addProperty("referenceName", pathParts[2])

116

.addProperties(properties)

117

.setSchema(schema)

118

.build();

119

}

120

121

@Override

122

public SampleDetail sample(ConnectorContext context, SampleRequest request) {

123

String path = request.getPath();

124

int limit = request.getLimit();

125

126

// Extract table information from path

127

String[] pathParts = path.split("/");

128

String database = pathParts[1];

129

String table = pathParts[2];

130

131

List<StructuredRecord> records = new ArrayList<>();

132

133

try (Connection conn = getConnection()) {

134

String query = String.format("SELECT * FROM %s.%s LIMIT %d",

135

database, table, limit);

136

137

try (PreparedStatement stmt = conn.prepareStatement(query);

138

ResultSet rs = stmt.executeQuery()) {

139

140

Schema schema = inferSchemaFromResultSet(rs.getMetaData());

141

142

while (rs.next()) {

143

StructuredRecord.Builder builder = StructuredRecord.builder(schema);

144

for (Schema.Field field : schema.getFields()) {

145

builder.set(field.getName(), rs.getObject(field.getName()));

146

}

147

records.add(builder.build());

148

}

149

}

150

} catch (SQLException e) {

151

throw new RuntimeException("Failed to sample data: " + e.getMessage());

152

}

153

154

return new SampleDetail(SampleType.TABLE, records, schema);

155

}

156

}

157

```

158

159

## Connector Context

160

161

### ConnectorContext

162

163

Context interface for connector operations providing access to runtime services.

164

165

```java { .api }

166

package io.cdap.cdap.etl.api.connector;

167

168

public interface ConnectorContext extends RuntimeContext, PluginContext,

169

ServiceDiscoverer, FeatureFlagsProvider,

170

ConnectionConfigurable {

171

// Provides access to:

172

// - Runtime arguments and properties

173

// - Plugin instantiation capabilities

174

// - Service discovery

175

// - Feature flags

176

// - Connection configuration

177

}

178

```

179

180

### ConnectorConfigurer

181

182

Configurer interface for connectors with validation support.

183

184

```java { .api }

185

package io.cdap.cdap.etl.api.connector;

186

187

public interface ConnectorConfigurer extends FailureCollector {

188

// Combines failure collection for validation

189

}

190

```

191

192

## Browse Operations

193

194

### BrowseRequest

195

196

Request object for browsing connector entities.

197

198

```java { .api }

199

package io.cdap.cdap.etl.api.connector;

200

201

public class BrowseRequest {

202

/**

203

* Create browse request with path and limit.

204

*/

205

public BrowseRequest(String path, int limit) {}

206

207

/**

208

* Get the path to browse.

209

*/

210

public String getPath() {}

211

212

/**

213

* Get the maximum number of entities to return.

214

*/

215

public int getLimit() {}

216

}

217

```

218

219

### BrowseDetail

220

221

Result object containing browsed entities and metadata.

222

223

```java { .api }

224

package io.cdap.cdap.etl.api.connector;

225

226

public class BrowseDetail {

227

/**

228

* Create browse detail with entities and total count.

229

*/

230

public BrowseDetail(List<BrowseEntity> entities, int totalCount) {}

231

232

/**

233

* Get the list of browsed entities.

234

*/

235

public List<BrowseEntity> getEntities() {}

236

237

/**

238

* Get the total count of entities.

239

*/

240

public int getTotalCount() {}

241

}

242

```

243

244

### BrowseEntity

245

246

Individual entity discovered during browse operation.

247

248

```java { .api }

249

package io.cdap.cdap.etl.api.connector;

250

251

public class BrowseEntity {

252

/**

253

* Create browse entity with properties.

254

*/

255

public BrowseEntity(String name, String path, String type,

256

boolean canBrowse, boolean canSample) {}

257

258

/**

259

* Get entity name.

260

*/

261

public String getName() {}

262

263

/**

264

* Get entity path.

265

*/

266

public String getPath() {}

267

268

/**

269

* Get entity type.

270

*/

271

public String getType() {}

272

273

/**

274

* Check if entity can be browsed further.

275

*/

276

public boolean canBrowse() {}

277

278

/**

279

* Check if entity can be sampled.

280

*/

281

public boolean canSample() {}

282

}

283

```

284

285

**Browse Implementation Example:**

286

```java

287

private List<BrowseEntity> browseDatabases(Connection conn) throws SQLException {

288

List<BrowseEntity> entities = new ArrayList<>();

289

290

try (ResultSet rs = conn.getMetaData().getCatalogs()) {

291

while (rs.next()) {

292

String database = rs.getString("TABLE_CAT");

293

entities.add(new BrowseEntity(

294

database, // name

295

"/database/" + database, // path

296

"database", // type

297

true, // canBrowse

298

false // canSample

299

));

300

}

301

}

302

303

return entities;

304

}

305

306

private List<BrowseEntity> browseTables(Connection conn, String database) throws SQLException {

307

List<BrowseEntity> entities = new ArrayList<>();

308

309

try (ResultSet rs = conn.getMetaData().getTables(database, null, "%", new String[]{"TABLE"})) {

310

while (rs.next()) {

311

String table = rs.getString("TABLE_NAME");

312

entities.add(new BrowseEntity(

313

table, // name

314

"/database/" + database + "/" + table, // path

315

"table", // type

316

false, // canBrowse

317

true // canSample

318

));

319

}

320

}

321

322

return entities;

323

}

324

```

325

326

### Browse Entity Type Information

327

328

#### BrowseEntityTypeInfo

329

330

Type information for browse entities.

331

332

```java { .api }

333

package io.cdap.cdap.etl.api.connector;

334

335

/** Type information and metadata for browse entities (members not listed here). */
public class BrowseEntityTypeInfo {
}

338

```

339

340

#### BrowseEntityPropertyValue

341

342

Property values for browse entities.

343

344

```java { .api }

345

package io.cdap.cdap.etl.api.connector;

346

347

/** Property values and attributes for browse entities (members not listed here). */
public class BrowseEntityPropertyValue {
}

350

```

351

352

## Sampling Operations

353

354

### SampleRequest

355

356

Request object for sampling data from connector entities.

357

358

```java { .api }

359

package io.cdap.cdap.etl.api.connector;

360

361

public class SampleRequest {

362

/**

363

* Create sample request with path, limit, and properties.

364

*/

365

public SampleRequest(String path, int limit, Map<String, String> properties) {}

366

367

/**

368

* Get the path to sample from.

369

*/

370

public String getPath() {}

371

372

/**

373

* Get the maximum number of records to sample.

374

*/

375

public int getLimit() {}

376

377

/**

378

* Get additional properties for sampling.

379

*/

380

public Map<String, String> getProperties() {}

381

}

382

```

383

384

### SampleDetail

385

386

Result object containing sampled data and metadata.

387

388

```java { .api }

389

package io.cdap.cdap.etl.api.connector;

390

391

/**
 * Contains sampled data records and the inferred schema (members not listed here).
 * The examples in this document construct it as
 * {@code new SampleDetail(SampleType.TABLE, records, schema)}.
 */
public class SampleDetail {
}

394

```

395

396

### SampleType

397

398

Enumeration of sample types.

399

400

```java { .api }

401

package io.cdap.cdap.etl.api.connector;

402

403

/** Enumeration of sample types. */
public enum SampleType {
  /** Structured table data. */
  TABLE,

  /** File-based data. */
  FILE
}

407

```

408

409

### SamplePropertyField

410

411

Property field definition for samples.

412

413

```java { .api }

414

package io.cdap.cdap.etl.api.connector;

415

416

/** Property field metadata for sample configuration (members not listed here). */
public class SamplePropertyField {
}

419

```

420

421

**Sampling Implementation Example:**

422

```java

423

@Override

424

public SampleDetail sample(ConnectorContext context, SampleRequest request) {

425

String path = request.getPath();

426

int limit = Math.min(request.getLimit(), 1000); // Cap at 1000 records

427

Map<String, String> properties = request.getProperties();

428

429

// Parse entity path

430

String[] pathParts = path.split("/");

431

if (pathParts.length < 3) {

432

throw new IllegalArgumentException("Invalid path for sampling: " + path);

433

}

434

435

String database = pathParts[1];

436

String table = pathParts[2];

437

438

List<StructuredRecord> samples = new ArrayList<>();

439

Schema schema = null;

440

441

try (Connection conn = getConnection()) {

442

// Build sampling query with optional filters

443

StringBuilder queryBuilder = new StringBuilder();

444

queryBuilder.append("SELECT * FROM ").append(database).append(".").append(table);

445

446

// Apply filters from properties

447

String whereClause = properties.get("filter");

448

if (whereClause != null && !whereClause.isEmpty()) {

449

queryBuilder.append(" WHERE ").append(whereClause);

450

}

451

452

// Add sampling strategy

453

String samplingStrategy = properties.getOrDefault("sampling", "limit");

454

if ("random".equals(samplingStrategy)) {

455

queryBuilder.append(" ORDER BY RAND()");

456

}

457

458

queryBuilder.append(" LIMIT ").append(limit);

459

460

try (PreparedStatement stmt = conn.prepareStatement(queryBuilder.toString());

461

ResultSet rs = stmt.executeQuery()) {

462

463

// Infer schema from result set metadata

464

schema = inferSchemaFromResultSet(rs.getMetaData());

465

466

// Collect sample records

467

while (rs.next()) {

468

StructuredRecord.Builder builder = StructuredRecord.builder(schema);

469

470

for (Schema.Field field : schema.getFields()) {

471

String fieldName = field.getName();

472

Object value = rs.getObject(fieldName);

473

builder.set(fieldName, convertValue(value, field.getSchema()));

474

}

475

476

samples.add(builder.build());

477

}

478

}

479

} catch (SQLException e) {

480

throw new RuntimeException("Failed to sample data from " + path, e);

481

}

482

483

return new SampleDetail(SampleType.TABLE, samples, schema);

484

}

485

```

486

487

## Specification Generation

488

489

### ConnectorSpecRequest

490

491

Request object for generating connector specifications.

492

493

```java { .api }

494

package io.cdap.cdap.etl.api.connector;

495

496

/**
 * Request parameters for spec generation (members not listed here).
 * The examples in this document call {@code getPath()} and
 * {@code getPluginType()} on it.
 */
public class ConnectorSpecRequest {
}

499

```

500

501

### ConnectorSpec

502

503

Generated specification from connector.

504

505

```java { .api }

506

package io.cdap.cdap.etl.api.connector;

507

508

/**
 * Generated plugin specification with properties and schema (members not listed here).
 * The examples in this document build it through {@code ConnectorSpec.builder()}
 * using {@code addProperty}, {@code addProperties}, {@code setSchema} and {@code build}.
 */
public class ConnectorSpec {
}

511

```

512

513

### PluginSpec

514

515

Plugin specification details.

516

517

```java { .api }

518

package io.cdap.cdap.etl.api.connector;

519

520

/** Plugin specification metadata (members not listed here). */
public class PluginSpec {
}

523

```

524

525

**Specification Generation Example:**

526

```java

527

@Override

528

public ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request) {

529

String path = request.getPath();

530

String[] pathParts = path.split("/");

531

532

if (pathParts.length < 3) {

533

throw new IllegalArgumentException("Invalid path for spec generation: " + path);

534

}

535

536

String database = pathParts[1];

537

String table = pathParts[2];

538

539

// Generate properties for the source/sink plugin

540

Map<String, String> properties = new HashMap<>();

541

properties.put("referenceName", sanitizeReferenceName(table));

542

properties.put("connectionString", config.connectionString);

543

properties.put("username", config.username);

544

properties.put("database", database);

545

properties.put("table", table);

546

547

// Add authentication properties if needed

548

if (config.useSSL) {

549

properties.put("sslMode", "required");

550

}

551

552

// Infer schema from table metadata

553

Schema schema = null;

554

try (Connection conn = getConnection()) {

555

schema = inferSchemaFromTable(conn, database, table);

556

} catch (SQLException e) {

557

throw new RuntimeException("Failed to infer schema for " + path, e);

558

}

559

560

// Build connector spec

561

ConnectorSpec.Builder builder = ConnectorSpec.builder()

562

.setSchema(schema)

563

.addProperties(properties);

564

565

// Add plugin-specific configurations

566

String pluginType = request.getPluginType();

567

if ("batchsource".equals(pluginType)) {

568

builder.addProperty("query", generateSelectQuery(database, table, schema));

569

} else if ("batchsink".equals(pluginType)) {

570

builder.addProperty("tableName", table);

571

builder.addProperty("columns", getColumnList(schema));

572

}

573

574

return builder.build();

575

}

576

577

private Schema inferSchemaFromTable(Connection conn, String database, String table)

578

throws SQLException {

579

Schema.Builder schemaBuilder = Schema.recordOf(table);

580

581

try (ResultSet rs = conn.getMetaData().getColumns(database, null, table, null)) {

582

while (rs.next()) {

583

String columnName = rs.getString("COLUMN_NAME");

584

int jdbcType = rs.getInt("DATA_TYPE");

585

boolean nullable = rs.getInt("NULLABLE") == DatabaseMetaData.columnNullable;

586

587

Schema fieldSchema = mapJdbcTypeToSchema(jdbcType);

588

if (nullable) {

589

fieldSchema = Schema.nullableOf(fieldSchema);

590

}

591

592

schemaBuilder.addField(Schema.Field.of(columnName, fieldSchema));

593

}

594

}

595

596

return schemaBuilder.build();

597

}

598

```

599

600

## Advanced Connector Patterns

601

602

### Multi-Format Connectors

603

604

```java

605

public class FileSystemConnector implements DirectConnector {

606

607

@Override

608

public BrowseDetail browse(ConnectorContext context, BrowseRequest request) {

609

String path = request.getPath();

610

List<BrowseEntity> entities = new ArrayList<>();

611

612

try {

613

FileSystem fs = FileSystem.get(getConfiguration());

614

Path browsePath = new Path(path.isEmpty() ? "/" : path);

615

616

if (fs.exists(browsePath) && fs.isDirectory(browsePath)) {

617

FileStatus[] statuses = fs.listStatus(browsePath);

618

619

for (FileStatus status : statuses) {

620

String name = status.getPath().getName();

621

String fullPath = status.getPath().toString();

622

623

if (status.isDirectory()) {

624

entities.add(new BrowseEntity(name, fullPath, "directory", true, false));

625

} else {

626

String fileType = detectFileType(name);

627

boolean canSample = isSupportedFormat(fileType);

628

entities.add(new BrowseEntity(name, fullPath, fileType, false, canSample));

629

}

630

}

631

}

632

} catch (IOException e) {

633

throw new RuntimeException("Failed to browse path: " + path, e);

634

}

635

636

return new BrowseDetail(entities, entities.size());

637

}

638

639

private String detectFileType(String fileName) {

640

String extension = fileName.substring(fileName.lastIndexOf('.') + 1).toLowerCase();

641

switch (extension) {

642

case "csv": return "csv";

643

case "json": return "json";

644

case "avro": return "avro";

645

case "parquet": return "parquet";

646

default: return "file";

647

}

648

}

649

}

650

```

651

652

### Schema Inference Utilities

653

654

```java

655

private Schema mapJdbcTypeToSchema(int jdbcType) {

656

switch (jdbcType) {

657

case Types.VARCHAR:

658

case Types.CHAR:

659

case Types.LONGVARCHAR:

660

return Schema.of(Schema.Type.STRING);

661

case Types.INTEGER:

662

return Schema.of(Schema.Type.INT);

663

case Types.BIGINT:

664

return Schema.of(Schema.Type.LONG);

665

case Types.FLOAT:

666

case Types.REAL:

667

return Schema.of(Schema.Type.FLOAT);

668

case Types.DOUBLE:

669

case Types.NUMERIC:

670

case Types.DECIMAL:

671

return Schema.of(Schema.Type.DOUBLE);

672

case Types.BOOLEAN:

673

case Types.BIT:

674

return Schema.of(Schema.Type.BOOLEAN);

675

case Types.TIMESTAMP:

676

case Types.TIME:

677

case Types.DATE:

678

return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);

679

case Types.BINARY:

680

case Types.VARBINARY:

681

case Types.LONGVARBINARY:

682

return Schema.of(Schema.Type.BYTES);

683

default:

684

return Schema.of(Schema.Type.STRING); // Default to string for unknown types

685

}

686

}

687

688

private Object convertValue(Object value, Schema schema) {

689

if (value == null) {

690

return null;

691

}

692

693

Schema.Type type = schema.isNullable() ? schema.getNonNullable().getType() : schema.getType();

694

695

switch (type) {

696

case STRING:

697

return value.toString();

698

case INT:

699

return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(value.toString());

700

case LONG:

701

return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString());

702

case FLOAT:

703

return value instanceof Number ? ((Number) value).floatValue() : Float.parseFloat(value.toString());

704

case DOUBLE:

705

return value instanceof Number ? ((Number) value).doubleValue() : Double.parseDouble(value.toString());

706

case BOOLEAN:

707

return value instanceof Boolean ? value : Boolean.parseBoolean(value.toString());

708

case BYTES:

709

return value instanceof byte[] ? value : value.toString().getBytes();

710

default:

711

return value;

712

}

713

}

714

```