# Data Connectors

Framework for building data connectors with browse, sample, and specification generation capabilities to enable dynamic data source and sink discovery in CDAP ETL pipelines.

## Core Connector Interfaces

### Connector

Base interface for data connectors providing core functionality.

```java { .api }
package io.cdap.cdap.etl.api.connector;
12
13
public interface Connector {
14
/**
15
* Test the connector configuration.
16
*/
17
void test(ConnectorContext context) throws ValidationException;
18
19
/**
20
* Browse entities in the connector.
21
*/
22
BrowseDetail browse(ConnectorContext context, BrowseRequest request)
23
throws IllegalArgumentException;
24
25
/**
26
* Generate plugin specification for the connector.
27
*/
28
ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request)
29
throws IllegalArgumentException;
30
31
/**
32
* Sample data from the connector.
33
*/
34
SampleDetail sample(ConnectorContext context, SampleRequest request);
35
}
```

### DirectConnector

Direct connector interface that extends base connector with pipeline configuration.

```java { .api }
package io.cdap.cdap.etl.api.connector;
44
45
public interface DirectConnector extends Connector, PipelineConfigurable {
46
// Combines connector capabilities with pipeline configuration
47
}
```

**Usage Example:**

```java
@Plugin(type = "connector")
53
@Name("DatabaseConnector")
54
@Description("Connector for database systems")
55
public class DatabaseConnector implements DirectConnector {
56
57
private final Config config;
58
59
@Override
60
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
61
config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
62
}
63
64
@Override
65
public void test(ConnectorContext context) throws ValidationException {
66
try (Connection conn = DriverManager.getConnection(
67
config.connectionString, config.username, config.password)) {
68
// Test database connectivity
69
if (!conn.isValid(30)) {
70
throw new ValidationException("Unable to connect to database");
71
}
72
} catch (SQLException e) {
73
throw new ValidationException("Database connection failed: " + e.getMessage());
74
}
75
}
76
77
@Override
78
public BrowseDetail browse(ConnectorContext context, BrowseRequest request) {
79
List<BrowseEntity> entities = new ArrayList<>();
80
String path = request.getPath();
81
82
try (Connection conn = getConnection()) {
83
if (path.isEmpty() || path.equals("/")) {
84
// Browse schemas/databases
85
entities.addAll(browseDatabases(conn));
86
} else if (path.startsWith("/database/")) {
87
// Browse tables in database
88
String database = extractDatabase(path);
89
entities.addAll(browseTables(conn, database));
90
}
91
} catch (SQLException e) {
92
throw new IllegalArgumentException("Failed to browse: " + e.getMessage());
93
}
94
95
return new BrowseDetail(entities, entities.size());
96
}
97
98
@Override
99
public ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request) {
100
String path = request.getPath();
101
Map<String, String> properties = new HashMap<>();
102
103
// Parse path to extract database and table information
104
String[] pathParts = path.split("/");
105
if (pathParts.length >= 3) {
106
properties.put("database", pathParts[1]);
107
properties.put("table", pathParts[2]);
108
properties.put("connectionString", config.connectionString);
109
properties.put("username", config.username);
110
}
111
112
Schema schema = inferSchemaFromTable(pathParts[1], pathParts[2]);
113
114
return ConnectorSpec.builder()
115
.addProperty("referenceName", pathParts[2])
116
.addProperties(properties)
117
.setSchema(schema)
118
.build();
119
}
120
121
@Override
122
public SampleDetail sample(ConnectorContext context, SampleRequest request) {
123
String path = request.getPath();
124
int limit = request.getLimit();
125
126
// Extract table information from path
127
String[] pathParts = path.split("/");
128
String database = pathParts[1];
129
String table = pathParts[2];
130
131
List<StructuredRecord> records = new ArrayList<>();
132
133
try (Connection conn = getConnection()) {
134
String query = String.format("SELECT * FROM %s.%s LIMIT %d",
135
database, table, limit);
136
137
try (PreparedStatement stmt = conn.prepareStatement(query);
138
ResultSet rs = stmt.executeQuery()) {
139
140
Schema schema = inferSchemaFromResultSet(rs.getMetaData());
141
142
while (rs.next()) {
143
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
144
for (Schema.Field field : schema.getFields()) {
145
builder.set(field.getName(), rs.getObject(field.getName()));
146
}
147
records.add(builder.build());
148
}
149
}
150
} catch (SQLException e) {
151
throw new RuntimeException("Failed to sample data: " + e.getMessage());
152
}
153
154
return new SampleDetail(SampleType.TABLE, records, schema);
155
}
156
}
```

## Connector Context

### ConnectorContext

Context interface for connector operations providing access to runtime services.

```java { .api }
package io.cdap.cdap.etl.api.connector;
167
168
public interface ConnectorContext extends RuntimeContext, PluginContext,
169
ServiceDiscoverer, FeatureFlagsProvider,
170
ConnectionConfigurable {
171
// Provides access to:
172
// - Runtime arguments and properties
173
// - Plugin instantiation capabilities
174
// - Service discovery
175
// - Feature flags
176
// - Connection configuration
177
}
```

### ConnectorConfigurer

Configurer interface for connectors with validation support.

```java { .api }
package io.cdap.cdap.etl.api.connector;
186
187
public interface ConnectorConfigurer extends FailureCollector {
188
// Combines failure collection for validation
189
}
```

## Browse Operations

### BrowseRequest

Request object for browsing connector entities.

```java { .api }
package io.cdap.cdap.etl.api.connector;
200
201
public class BrowseRequest {
202
/**
203
* Create browse request with path and limit.
204
*/
205
public BrowseRequest(String path, int limit) {}
206
207
/**
208
* Get the path to browse.
209
*/
210
public String getPath() {}
211
212
/**
213
* Get the maximum number of entities to return.
214
*/
215
public int getLimit() {}
216
}
```

### BrowseDetail

Result object containing browsed entities and metadata.

```java { .api }
package io.cdap.cdap.etl.api.connector;
225
226
public class BrowseDetail {
227
/**
228
* Create browse detail with entities and total count.
229
*/
230
public BrowseDetail(List<BrowseEntity> entities, int totalCount) {}
231
232
/**
233
* Get the list of browsed entities.
234
*/
235
public List<BrowseEntity> getEntities() {}
236
237
/**
238
* Get the total count of entities.
239
*/
240
public int getTotalCount() {}
241
}
```

### BrowseEntity

Individual entity discovered during browse operation.

```java { .api }
package io.cdap.cdap.etl.api.connector;
250
251
public class BrowseEntity {
252
/**
253
* Create browse entity with properties.
254
*/
255
public BrowseEntity(String name, String path, String type,
256
boolean canBrowse, boolean canSample) {}
257
258
/**
259
* Get entity name.
260
*/
261
public String getName() {}
262
263
/**
264
* Get entity path.
265
*/
266
public String getPath() {}
267
268
/**
269
* Get entity type.
270
*/
271
public String getType() {}
272
273
/**
274
* Check if entity can be browsed further.
275
*/
276
public boolean canBrowse() {}
277
278
/**
279
* Check if entity can be sampled.
280
*/
281
public boolean canSample() {}
282
}
```

**Browse Implementation Example:**

```java
private List<BrowseEntity> browseDatabases(Connection conn) throws SQLException {
288
List<BrowseEntity> entities = new ArrayList<>();
289
290
try (ResultSet rs = conn.getMetaData().getCatalogs()) {
291
while (rs.next()) {
292
String database = rs.getString("TABLE_CAT");
293
entities.add(new BrowseEntity(
294
database, // name
295
"/database/" + database, // path
296
"database", // type
297
true, // canBrowse
298
false // canSample
299
));
300
}
301
}
302
303
return entities;
304
}
private List<BrowseEntity> browseTables(Connection conn, String database) throws SQLException {
307
List<BrowseEntity> entities = new ArrayList<>();
308
309
try (ResultSet rs = conn.getMetaData().getTables(database, null, "%", new String[]{"TABLE"})) {
310
while (rs.next()) {
311
String table = rs.getString("TABLE_NAME");
312
entities.add(new BrowseEntity(
313
table, // name
314
"/database/" + database + "/" + table, // path
315
"table", // type
316
false, // canBrowse
317
true // canSample
318
));
319
}
320
}
321
322
return entities;
323
}
```

### Browse Entity Type Information

#### BrowseEntityTypeInfo

Type information for browse entities.

```java { .api }
package io.cdap.cdap.etl.api.connector;

public class BrowseEntityTypeInfo {
  // Describes the type metadata associated with a browse entity.
}
```

#### BrowseEntityPropertyValue

Property values for browse entities.

```java { .api }
package io.cdap.cdap.etl.api.connector;

public class BrowseEntityPropertyValue {
  // Holds the property values and attributes attached to a browse entity.
}
```

## Sampling Operations

### SampleRequest

Request object for sampling data from connector entities.

```java { .api }
package io.cdap.cdap.etl.api.connector;
360
361
public class SampleRequest {
362
/**
363
* Create sample request with path, limit, and properties.
364
*/
365
public SampleRequest(String path, int limit, Map<String, String> properties) {}
366
367
/**
368
* Get the path to sample from.
369
*/
370
public String getPath() {}
371
372
/**
373
* Get the maximum number of records to sample.
374
*/
375
public int getLimit() {}
376
377
/**
378
* Get additional properties for sampling.
379
*/
380
public Map<String, String> getProperties() {}
381
}
```

### SampleDetail

Result object containing sampled data and metadata.

```java { .api }
package io.cdap.cdap.etl.api.connector;

public class SampleDetail {
  // Carries the sampled records together with the schema inferred for them.
}
```

### SampleType

Enumeration of sample types.

```java { .api }
package io.cdap.cdap.etl.api.connector;

public enum SampleType {
  /** Structured table data. */
  TABLE,
  /** File-based data. */
  FILE
}
```

### SamplePropertyField

Property field definition for samples.

```java { .api }
package io.cdap.cdap.etl.api.connector;

public class SamplePropertyField {
  // Describes a property field used to configure sampling.
}
```

**Sampling Implementation Example:**

```java
@Override
424
public SampleDetail sample(ConnectorContext context, SampleRequest request) {
425
String path = request.getPath();
426
int limit = Math.min(request.getLimit(), 1000); // Cap at 1000 records
427
Map<String, String> properties = request.getProperties();
428
429
// Parse entity path
430
String[] pathParts = path.split("/");
431
if (pathParts.length < 3) {
432
throw new IllegalArgumentException("Invalid path for sampling: " + path);
433
}
434
435
String database = pathParts[1];
436
String table = pathParts[2];
437
438
List<StructuredRecord> samples = new ArrayList<>();
439
Schema schema = null;
440
441
try (Connection conn = getConnection()) {
442
// Build sampling query with optional filters
443
StringBuilder queryBuilder = new StringBuilder();
444
queryBuilder.append("SELECT * FROM ").append(database).append(".").append(table);
445
446
// Apply filters from properties
447
String whereClause = properties.get("filter");
448
if (whereClause != null && !whereClause.isEmpty()) {
449
queryBuilder.append(" WHERE ").append(whereClause);
450
}
451
452
// Add sampling strategy
453
String samplingStrategy = properties.getOrDefault("sampling", "limit");
454
if ("random".equals(samplingStrategy)) {
455
queryBuilder.append(" ORDER BY RAND()");
456
}
457
458
queryBuilder.append(" LIMIT ").append(limit);
459
460
try (PreparedStatement stmt = conn.prepareStatement(queryBuilder.toString());
461
ResultSet rs = stmt.executeQuery()) {
462
463
// Infer schema from result set metadata
464
schema = inferSchemaFromResultSet(rs.getMetaData());
465
466
// Collect sample records
467
while (rs.next()) {
468
StructuredRecord.Builder builder = StructuredRecord.builder(schema);
469
470
for (Schema.Field field : schema.getFields()) {
471
String fieldName = field.getName();
472
Object value = rs.getObject(fieldName);
473
builder.set(fieldName, convertValue(value, field.getSchema()));
474
}
475
476
samples.add(builder.build());
477
}
478
}
479
} catch (SQLException e) {
480
throw new RuntimeException("Failed to sample data from " + path, e);
481
}
482
483
return new SampleDetail(SampleType.TABLE, samples, schema);
484
}
```

## Specification Generation

### ConnectorSpecRequest

Request object for generating connector specifications.

```java { .api }
package io.cdap.cdap.etl.api.connector;

public class ConnectorSpecRequest {
  // Carries the parameters for generating a connector specification.
}
```

### ConnectorSpec

Generated specification from connector.

```java { .api }
package io.cdap.cdap.etl.api.connector;

public class ConnectorSpec {
  // The generated plugin specification: plugin properties plus the inferred schema.
}
```

### PluginSpec

Plugin specification details.

```java { .api }
package io.cdap.cdap.etl.api.connector;

public class PluginSpec {
  // Metadata describing a concrete plugin specification.
}
```

**Specification Generation Example:**

```java
@Override
528
public ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request) {
529
String path = request.getPath();
530
String[] pathParts = path.split("/");
531
532
if (pathParts.length < 3) {
533
throw new IllegalArgumentException("Invalid path for spec generation: " + path);
534
}
535
536
String database = pathParts[1];
537
String table = pathParts[2];
538
539
// Generate properties for the source/sink plugin
540
Map<String, String> properties = new HashMap<>();
541
properties.put("referenceName", sanitizeReferenceName(table));
542
properties.put("connectionString", config.connectionString);
543
properties.put("username", config.username);
544
properties.put("database", database);
545
properties.put("table", table);
546
547
// Add authentication properties if needed
548
if (config.useSSL) {
549
properties.put("sslMode", "required");
550
}
551
552
// Infer schema from table metadata
553
Schema schema = null;
554
try (Connection conn = getConnection()) {
555
schema = inferSchemaFromTable(conn, database, table);
556
} catch (SQLException e) {
557
throw new RuntimeException("Failed to infer schema for " + path, e);
558
}
559
560
// Build connector spec
561
ConnectorSpec.Builder builder = ConnectorSpec.builder()
562
.setSchema(schema)
563
.addProperties(properties);
564
565
// Add plugin-specific configurations
566
String pluginType = request.getPluginType();
567
if ("batchsource".equals(pluginType)) {
568
builder.addProperty("query", generateSelectQuery(database, table, schema));
569
} else if ("batchsink".equals(pluginType)) {
570
builder.addProperty("tableName", table);
571
builder.addProperty("columns", getColumnList(schema));
572
}
573
574
return builder.build();
575
}
private Schema inferSchemaFromTable(Connection conn, String database, String table)
578
throws SQLException {
579
Schema.Builder schemaBuilder = Schema.recordOf(table);
580
581
try (ResultSet rs = conn.getMetaData().getColumns(database, null, table, null)) {
582
while (rs.next()) {
583
String columnName = rs.getString("COLUMN_NAME");
584
int jdbcType = rs.getInt("DATA_TYPE");
585
boolean nullable = rs.getInt("NULLABLE") == DatabaseMetaData.columnNullable;
586
587
Schema fieldSchema = mapJdbcTypeToSchema(jdbcType);
588
if (nullable) {
589
fieldSchema = Schema.nullableOf(fieldSchema);
590
}
591
592
schemaBuilder.addField(Schema.Field.of(columnName, fieldSchema));
593
}
594
}
595
596
return schemaBuilder.build();
597
}
```

## Advanced Connector Patterns

### Multi-Format Connectors

```java
public class FileSystemConnector implements DirectConnector {
606
607
@Override
608
public BrowseDetail browse(ConnectorContext context, BrowseRequest request) {
609
String path = request.getPath();
610
List<BrowseEntity> entities = new ArrayList<>();
611
612
try {
613
FileSystem fs = FileSystem.get(getConfiguration());
614
Path browsePath = new Path(path.isEmpty() ? "/" : path);
615
616
if (fs.exists(browsePath) && fs.isDirectory(browsePath)) {
617
FileStatus[] statuses = fs.listStatus(browsePath);
618
619
for (FileStatus status : statuses) {
620
String name = status.getPath().getName();
621
String fullPath = status.getPath().toString();
622
623
if (status.isDirectory()) {
624
entities.add(new BrowseEntity(name, fullPath, "directory", true, false));
625
} else {
626
String fileType = detectFileType(name);
627
boolean canSample = isSupportedFormat(fileType);
628
entities.add(new BrowseEntity(name, fullPath, fileType, false, canSample));
629
}
630
}
631
}
632
} catch (IOException e) {
633
throw new RuntimeException("Failed to browse path: " + path, e);
634
}
635
636
return new BrowseDetail(entities, entities.size());
637
}
638
639
private String detectFileType(String fileName) {
640
String extension = fileName.substring(fileName.lastIndexOf('.') + 1).toLowerCase();
641
switch (extension) {
642
case "csv": return "csv";
643
case "json": return "json";
644
case "avro": return "avro";
645
case "parquet": return "parquet";
646
default: return "file";
647
}
648
}
649
}
```

### Schema Inference Utilities

```java
private Schema mapJdbcTypeToSchema(int jdbcType) {
656
switch (jdbcType) {
657
case Types.VARCHAR:
658
case Types.CHAR:
659
case Types.LONGVARCHAR:
660
return Schema.of(Schema.Type.STRING);
661
case Types.INTEGER:
662
return Schema.of(Schema.Type.INT);
663
case Types.BIGINT:
664
return Schema.of(Schema.Type.LONG);
665
case Types.FLOAT:
666
case Types.REAL:
667
return Schema.of(Schema.Type.FLOAT);
668
case Types.DOUBLE:
669
case Types.NUMERIC:
670
case Types.DECIMAL:
671
return Schema.of(Schema.Type.DOUBLE);
672
case Types.BOOLEAN:
673
case Types.BIT:
674
return Schema.of(Schema.Type.BOOLEAN);
675
case Types.TIMESTAMP:
676
case Types.TIME:
677
case Types.DATE:
678
return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
679
case Types.BINARY:
680
case Types.VARBINARY:
681
case Types.LONGVARBINARY:
682
return Schema.of(Schema.Type.BYTES);
683
default:
684
return Schema.of(Schema.Type.STRING); // Default to string for unknown types
685
}
686
}
private Object convertValue(Object value, Schema schema) {
689
if (value == null) {
690
return null;
691
}
692
693
Schema.Type type = schema.isNullable() ? schema.getNonNullable().getType() : schema.getType();
694
695
switch (type) {
696
case STRING:
697
return value.toString();
698
case INT:
699
return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(value.toString());
700
case LONG:
701
return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString());
702
case FLOAT:
703
return value instanceof Number ? ((Number) value).floatValue() : Float.parseFloat(value.toString());
704
case DOUBLE:
705
return value instanceof Number ? ((Number) value).doubleValue() : Double.parseDouble(value.toString());
706
case BOOLEAN:
707
return value instanceof Boolean ? value : Boolean.parseBoolean(value.toString());
708
case BYTES:
709
return value instanceof byte[] ? value : value.toString().getBytes();
710
default:
711
return value;
712
}
713
}
```