The CDAP ETL API provides comprehensive abstractions for building Extract, Transform, and Load (ETL) pipeline applications on the CDAP platform.
—
Framework for building data connectors with browse, sample, and specification generation capabilities to enable dynamic data source and sink discovery in CDAP ETL pipelines.
Base interface for data connectors providing core functionality.
package io.cdap.cdap.etl.api.connector;
/**
 * A connector that can test its configuration, browse and sample entities of an
 * external system, and generate plugin specifications for discovered entities,
 * enabling dynamic source/sink discovery in ETL pipelines.
 */
public interface Connector {
/**
 * Test the connector configuration, e.g. by attempting to reach the external system.
 *
 * @param context context providing access to runtime services
 * @throws ValidationException if the configuration is invalid or the system is unreachable
 */
void test(ConnectorContext context) throws ValidationException;
/**
 * Browse entities in the connector.
 *
 * @param context context providing access to runtime services
 * @param request the path and limit to browse with
 * @return the entities discovered under the requested path
 * @throws IllegalArgumentException if the request path is invalid
 */
BrowseDetail browse(ConnectorContext context, BrowseRequest request)
throws IllegalArgumentException;
/**
 * Generate plugin specification for the connector.
 *
 * @param context context providing access to runtime services
 * @param request the path to generate a source/sink specification for
 * @return the generated specification, including plugin properties and schema
 * @throws IllegalArgumentException if the request path is invalid
 */
ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request)
throws IllegalArgumentException;
/**
 * Sample data from the connector.
 *
 * @param context context providing access to runtime services
 * @param request the path, record limit, and properties to sample with
 * @return the sampled records together with their inferred schema
 */
SampleDetail sample(ConnectorContext context, SampleRequest request);
}Direct connector interface that extends base connector with pipeline configuration.
package io.cdap.cdap.etl.api.connector;
/**
 * A connector that can also be configured directly as a pipeline stage.
 */
public interface DirectConnector extends Connector, PipelineConfigurable {
// Combines connector capabilities (browse/sample/spec) with pipeline configuration
}Usage Example:
@Plugin(type = "connector")
@Name("DatabaseConnector")
@Description("Connector for database systems")
public class DatabaseConnector implements DirectConnector {
  // Presumably injected by the CDAP plugin framework — TODO confirm (no constructor shown).
  private final Config config;

  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    // Delegate validation to the config so all errors surface in one failure collector.
    config.validate(pipelineConfigurer.getStageConfigurer().getFailureCollector());
  }

  /**
   * Tests connectivity by opening a JDBC connection and pinging it.
   *
   * @throws ValidationException if the connection cannot be established or is not valid
   */
  @Override
  public void test(ConnectorContext context) throws ValidationException {
    try (Connection conn = DriverManager.getConnection(
        config.connectionString, config.username, config.password)) {
      // isValid(30) pings the server with a 30-second timeout.
      if (!conn.isValid(30)) {
        throw new ValidationException("Unable to connect to database");
      }
    } catch (SQLException e) {
      // NOTE(review): only the message survives here; if ValidationException accepts a
      // cause, pass e as well so the stack trace is not lost.
      throw new ValidationException("Database connection failed: " + e.getMessage());
    }
  }

  /**
   * Browses databases at the root path, or tables under a "/database/..." path.
   */
  @Override
  public BrowseDetail browse(ConnectorContext context, BrowseRequest request) {
    List<BrowseEntity> entities = new ArrayList<>();
    String path = request.getPath();
    try (Connection conn = getConnection()) {
      if (path.isEmpty() || path.equals("/")) {
        // Root path: list schemas/databases.
        entities.addAll(browseDatabases(conn));
      } else if (path.startsWith("/database/")) {
        // Database path: list the tables it contains.
        String database = extractDatabase(path);
        entities.addAll(browseTables(conn, database));
      }
    } catch (SQLException e) {
      // Preserve the SQLException as the cause rather than flattening it to a message.
      throw new IllegalArgumentException("Failed to browse: " + e.getMessage(), e);
    }
    return new BrowseDetail(entities, entities.size());
  }

  /**
   * Generates a plugin specification for the table addressed by the request path.
   * Expected path shape: /&lt;database&gt;/&lt;table&gt;.
   */
  @Override
  public ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request) {
    String path = request.getPath();
    String[] pathParts = path.split("/");
    // Fail fast on short paths: the original guarded only the property map, then read
    // pathParts[1]/pathParts[2] unconditionally, throwing ArrayIndexOutOfBoundsException.
    if (pathParts.length < 3) {
      throw new IllegalArgumentException("Invalid path for spec generation: " + path);
    }
    Map<String, String> properties = new HashMap<>();
    properties.put("database", pathParts[1]);
    properties.put("table", pathParts[2]);
    properties.put("connectionString", config.connectionString);
    properties.put("username", config.username);
    Schema schema = inferSchemaFromTable(pathParts[1], pathParts[2]);
    return ConnectorSpec.builder()
        .addProperty("referenceName", pathParts[2])
        .addProperties(properties)
        .setSchema(schema)
        .build();
  }

  /**
   * Samples up to {@code request.getLimit()} rows from the table addressed by the path.
   */
  @Override
  public SampleDetail sample(ConnectorContext context, SampleRequest request) {
    String path = request.getPath();
    int limit = request.getLimit();
    String[] pathParts = path.split("/");
    if (pathParts.length < 3) {
      throw new IllegalArgumentException("Invalid path for sampling: " + path);
    }
    String database = pathParts[1];
    String table = pathParts[2];
    // Identifiers cannot be bound as PreparedStatement parameters, so restrict them to a
    // safe character set before interpolating into SQL (guards against injection via path).
    if (!database.matches("[A-Za-z0-9_]+") || !table.matches("[A-Za-z0-9_]+")) {
      throw new IllegalArgumentException("Invalid database/table identifier in path: " + path);
    }
    List<StructuredRecord> records = new ArrayList<>();
    // Declared outside the try so it is still in scope when building the SampleDetail
    // below (the original declared it inside the try block, which does not compile).
    Schema schema = null;
    try (Connection conn = getConnection()) {
      String query = String.format("SELECT * FROM %s.%s LIMIT %d",
          database, table, limit);
      try (PreparedStatement stmt = conn.prepareStatement(query);
           ResultSet rs = stmt.executeQuery()) {
        schema = inferSchemaFromResultSet(rs.getMetaData());
        while (rs.next()) {
          StructuredRecord.Builder builder = StructuredRecord.builder(schema);
          for (Schema.Field field : schema.getFields()) {
            builder.set(field.getName(), rs.getObject(field.getName()));
          }
          records.add(builder.build());
        }
      }
    } catch (SQLException e) {
      // Keep the underlying SQLException as the cause.
      throw new RuntimeException("Failed to sample data: " + e.getMessage(), e);
    }
    return new SampleDetail(SampleType.TABLE, records, schema);
  }
}
Context interface for connector operations providing access to runtime services.
package io.cdap.cdap.etl.api.connector;
/**
 * Context handed to every connector operation, aggregating the runtime services a
 * connector may need (runtime arguments, plugin instantiation, service discovery,
 * feature flags, and connection configuration).
 */
public interface ConnectorContext extends RuntimeContext, PluginContext,
ServiceDiscoverer, FeatureFlagsProvider,
ConnectionConfigurable {
// Provides access to:
// - Runtime arguments and properties
// - Plugin instantiation capabilities
// - Service discovery
// - Feature flags
// - Connection configuration
}Configurer interface for connectors with validation support.
package io.cdap.cdap.etl.api.connector;
/**
 * Configurer for connectors; extends FailureCollector so validation errors can be
 * accumulated and reported together instead of failing one at a time.
 */
public interface ConnectorConfigurer extends FailureCollector {
// Combines failure collection for validation
}Request object for browsing connector entities.
package io.cdap.cdap.etl.api.connector;
/**
 * Request object for browsing connector entities at a given path.
 */
public class BrowseRequest {
/**
 * Create browse request with path and limit.
 *
 * @param path the entity path to browse (empty or "/" for the root)
 * @param limit the maximum number of entities to return
 */
public BrowseRequest(String path, int limit) {}
/**
 * Get the path to browse.
 *
 * @return the entity path
 */
public String getPath() {}
/**
 * Get the maximum number of entities to return.
 *
 * @return the browse limit
 */
public int getLimit() {}
}Result object containing browsed entities and metadata.
package io.cdap.cdap.etl.api.connector;
/**
 * Result of a browse operation: the discovered entities plus the total count.
 */
public class BrowseDetail {
/**
 * Create browse detail with entities and total count.
 *
 * @param entities the entities discovered under the browsed path
 * @param totalCount the total number of entities available
 */
public BrowseDetail(List<BrowseEntity> entities, int totalCount) {}
/**
 * Get the list of browsed entities.
 *
 * @return the discovered entities
 */
public List<BrowseEntity> getEntities() {}
/**
 * Get the total count of entities.
 *
 * @return the total entity count
 */
public int getTotalCount() {}
}Individual entity discovered during browse operation.
package io.cdap.cdap.etl.api.connector;
/**
 * A single entity discovered during a browse operation — e.g. a database, table,
 * directory, or file.
 */
public class BrowseEntity {
/**
 * Create browse entity with properties.
 *
 * @param name display name of the entity
 * @param path full path identifying the entity
 * @param type entity type, e.g. "database", "table", "directory", "file"
 * @param canBrowse whether the entity contains further browsable children
 * @param canSample whether the entity's data can be sampled
 */
public BrowseEntity(String name, String path, String type,
boolean canBrowse, boolean canSample) {}
/**
 * Get entity name.
 *
 * @return the display name
 */
public String getName() {}
/**
 * Get entity path.
 *
 * @return the full entity path
 */
public String getPath() {}
/**
 * Get entity type.
 *
 * @return the entity type
 */
public String getType() {}
/**
 * Check if entity can be browsed further.
 *
 * @return true if the entity has browsable children
 */
public boolean canBrowse() {}
/**
 * Check if entity can be sampled.
 *
 * @return true if the entity's data can be sampled
 */
public boolean canSample() {}
}Browse Implementation Example:
private List<BrowseEntity> browseDatabases(Connection conn) throws SQLException {
  // Enumerate every catalog visible to this connection; each becomes a browsable
  // database entity.
  List<BrowseEntity> result = new ArrayList<>();
  try (ResultSet catalogs = conn.getMetaData().getCatalogs()) {
    while (catalogs.next()) {
      String name = catalogs.getString("TABLE_CAT");
      // Databases can be browsed into (for tables) but not sampled directly.
      result.add(new BrowseEntity(name, "/database/" + name, "database", true, false));
    }
  }
  return result;
}
private List<BrowseEntity> browseTables(Connection conn, String database) throws SQLException {
// Lists all plain tables (types filter = {"TABLE"}, so no views) in the given catalog.
List<BrowseEntity> entities = new ArrayList<>();
try (ResultSet rs = conn.getMetaData().getTables(database, null, "%", new String[]{"TABLE"})) {
while (rs.next()) {
String table = rs.getString("TABLE_NAME");
entities.add(new BrowseEntity(
table, // name
"/database/" + database + "/" + table, // path
"table", // type
false, // canBrowse: tables are leaves of the hierarchy
true // canSample: table contents can be queried
));
}
}
return entities;
}Type information for browse entities.
package io.cdap.cdap.etl.api.connector;
/**
 * Type information and associated metadata for a category of browse entities.
 */
public class BrowseEntityTypeInfo {
// Type information and metadata for browse entities
}Property values for browse entities.
package io.cdap.cdap.etl.api.connector;
/**
 * A single property value (and its attributes) attached to a browse entity.
 */
public class BrowseEntityPropertyValue {
// Property values and attributes for browse entities
}Request object for sampling data from connector entities.
package io.cdap.cdap.etl.api.connector;
/**
 * Request object for sampling data from a connector entity.
 */
public class SampleRequest {
/**
 * Create sample request with path, limit, and properties.
 *
 * @param path the entity path to sample from
 * @param limit the maximum number of records to sample
 * @param properties additional sampling properties (e.g. a filter or sampling strategy)
 */
public SampleRequest(String path, int limit, Map<String, String> properties) {}
/**
 * Get the path to sample from.
 *
 * @return the entity path
 */
public String getPath() {}
/**
 * Get the maximum number of records to sample.
 *
 * @return the sample limit
 */
public int getLimit() {}
/**
 * Get additional properties for sampling.
 *
 * @return the sampling properties
 */
public Map<String, String> getProperties() {}
}Result object containing sampled data and metadata.
package io.cdap.cdap.etl.api.connector;
/**
 * Result of a sample operation: the sampled records plus their inferred schema.
 */
public class SampleDetail {
// Contains sampled data records and inferred schema
}Enumeration of sample types.
package io.cdap.cdap.etl.api.connector;
/**
 * The kind of entity a sample was taken from.
 */
public enum SampleType {
TABLE, // Structured table data
FILE // File-based data
}Property field definition for samples.
package io.cdap.cdap.etl.api.connector;
/**
 * Metadata describing a configurable property field for sampling.
 */
public class SamplePropertyField {
// Property field metadata for sample configuration
}Sampling Implementation Example:
@Override
public SampleDetail sample(ConnectorContext context, SampleRequest request) {
  String path = request.getPath();
  int limit = Math.min(request.getLimit(), 1000); // Cap at 1000 records
  Map<String, String> properties = request.getProperties();
  // Expected entity path shape: /<database>/<table>.
  String[] pathParts = path.split("/");
  if (pathParts.length < 3) {
    throw new IllegalArgumentException("Invalid path for sampling: " + path);
  }
  String database = pathParts[1];
  String table = pathParts[2];
  // Identifiers cannot be bound as PreparedStatement parameters, so restrict them to a
  // safe character set before interpolating into SQL (guards against injection via path).
  if (!database.matches("[A-Za-z0-9_]+") || !table.matches("[A-Za-z0-9_]+")) {
    throw new IllegalArgumentException("Invalid database/table identifier in path: " + path);
  }
  List<StructuredRecord> samples = new ArrayList<>();
  Schema schema = null;
  try (Connection conn = getConnection()) {
    // Build sampling query with optional filters
    StringBuilder queryBuilder = new StringBuilder();
    queryBuilder.append("SELECT * FROM ").append(database).append(".").append(table);
    // SECURITY: the "filter" property is concatenated into the statement verbatim and
    // cannot be parameterized here. Only accept it from trusted configuration — never
    // from end-user input.
    String whereClause = properties.get("filter");
    if (whereClause != null && !whereClause.isEmpty()) {
      queryBuilder.append(" WHERE ").append(whereClause);
    }
    // Sampling strategy: "random" shuffles server-side (MySQL RAND()); default is a
    // plain LIMIT over whatever order the engine returns.
    String samplingStrategy = properties.getOrDefault("sampling", "limit");
    if ("random".equals(samplingStrategy)) {
      queryBuilder.append(" ORDER BY RAND()");
    }
    queryBuilder.append(" LIMIT ").append(limit);
    try (PreparedStatement stmt = conn.prepareStatement(queryBuilder.toString());
         ResultSet rs = stmt.executeQuery()) {
      // Infer schema from result set metadata
      schema = inferSchemaFromResultSet(rs.getMetaData());
      // Collect sample records
      while (rs.next()) {
        StructuredRecord.Builder builder = StructuredRecord.builder(schema);
        for (Schema.Field field : schema.getFields()) {
          String fieldName = field.getName();
          Object value = rs.getObject(fieldName);
          builder.set(fieldName, convertValue(value, field.getSchema()));
        }
        samples.add(builder.build());
      }
    }
  } catch (SQLException e) {
    throw new RuntimeException("Failed to sample data from " + path, e);
  }
  return new SampleDetail(SampleType.TABLE, samples, schema);
}
Request object for generating connector specifications.
package io.cdap.cdap.etl.api.connector;
/**
 * Request parameters for generating a plugin specification (entity path, plugin type).
 */
public class ConnectorSpecRequest {
// Request parameters for spec generation
}Generated specification from connector.
package io.cdap.cdap.etl.api.connector;
/**
 * A generated plugin specification carrying plugin properties and an optional schema.
 */
public class ConnectorSpec {
// Generated plugin specification with properties and schema
}Plugin specification details.
package io.cdap.cdap.etl.api.connector;
/**
 * Metadata describing a concrete plugin referenced by a generated specification.
 */
public class PluginSpec {
// Plugin specification metadata
}Specification Generation Example:
@Override
public ConnectorSpec generateSpec(ConnectorContext context, ConnectorSpecRequest request) {
  // Expected entity path shape: /<database>/<table>.
  String path = request.getPath();
  String[] parts = path.split("/");
  if (parts.length < 3) {
    throw new IllegalArgumentException("Invalid path for spec generation: " + path);
  }
  String database = parts[1];
  String table = parts[2];

  // Properties handed to the generated source/sink plugin.
  Map<String, String> props = new HashMap<>();
  props.put("referenceName", sanitizeReferenceName(table));
  props.put("connectionString", config.connectionString);
  props.put("username", config.username);
  props.put("database", database);
  props.put("table", table);
  if (config.useSSL) {
    props.put("sslMode", "required");
  }

  // Derive the output schema from the table's JDBC metadata.
  Schema schema = null;
  try (Connection conn = getConnection()) {
    schema = inferSchemaFromTable(conn, database, table);
  } catch (SQLException e) {
    throw new RuntimeException("Failed to infer schema for " + path, e);
  }

  ConnectorSpec.Builder specBuilder = ConnectorSpec.builder()
      .setSchema(schema)
      .addProperties(props);

  // Sources get a generated SELECT query; sinks get target-table details.
  String pluginType = request.getPluginType();
  if ("batchsource".equals(pluginType)) {
    specBuilder.addProperty("query", generateSelectQuery(database, table, schema));
  } else if ("batchsink".equals(pluginType)) {
    specBuilder.addProperty("tableName", table);
    specBuilder.addProperty("columns", getColumnList(schema));
  }
  return specBuilder.build();
}
// Builds a record schema named after the table from JDBC column metadata.
private Schema inferSchemaFromTable(Connection conn, String database, String table)
throws SQLException {
Schema.Builder schemaBuilder = Schema.recordOf(table);
try (ResultSet rs = conn.getMetaData().getColumns(database, null, table, null)) {
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
int jdbcType = rs.getInt("DATA_TYPE");
// The metadata NULLABLE flag decides whether the field is wrapped as nullable.
boolean nullable = rs.getInt("NULLABLE") == DatabaseMetaData.columnNullable;
Schema fieldSchema = mapJdbcTypeToSchema(jdbcType);
if (nullable) {
fieldSchema = Schema.nullableOf(fieldSchema);
}
schemaBuilder.addField(Schema.Field.of(columnName, fieldSchema));
}
}
return schemaBuilder.build();
}public class FileSystemConnector implements DirectConnector {
@Override
public BrowseDetail browse(ConnectorContext context, BrowseRequest request) {
  String path = request.getPath();
  List<BrowseEntity> found = new ArrayList<>();
  try {
    FileSystem fs = FileSystem.get(getConfiguration());
    // Treat an empty request path as the filesystem root.
    Path target = new Path(path.isEmpty() ? "/" : path);
    if (fs.exists(target) && fs.isDirectory(target)) {
      for (FileStatus status : fs.listStatus(target)) {
        String name = status.getPath().getName();
        String fullPath = status.getPath().toString();
        if (status.isDirectory()) {
          // Directories can be descended into but not sampled.
          found.add(new BrowseEntity(name, fullPath, "directory", true, false));
        } else {
          // Files are leaves; whether they can be sampled depends on the format.
          String fileType = detectFileType(name);
          found.add(new BrowseEntity(name, fullPath, fileType, false,
              isSupportedFormat(fileType)));
        }
      }
    }
  } catch (IOException e) {
    throw new RuntimeException("Failed to browse path: " + path, e);
  }
  return new BrowseDetail(found, found.size());
}
// Derives a coarse file type from the file name's extension; falls back to "file".
private String detectFileType(String fileName) {
  int dot = fileName.lastIndexOf('.');
  // Guard names with no extension, dotfiles (".hidden"), and trailing dots: the
  // original substring(lastIndexOf('.') + 1) silently treated the whole name as an
  // extension when no dot was present.
  if (dot <= 0 || dot == fileName.length() - 1) {
    return "file";
  }
  // Locale.ROOT avoids locale-dependent casing surprises (e.g. Turkish dotless i).
  String extension = fileName.substring(dot + 1).toLowerCase(java.util.Locale.ROOT);
  switch (extension) {
    case "csv": return "csv";
    case "json": return "json";
    case "avro": return "avro";
    case "parquet": return "parquet";
    default: return "file";
  }
}
}
// Maps a JDBC column type onto the corresponding CDAP schema (or logical) type.
private Schema mapJdbcTypeToSchema(int jdbcType) {
  switch (jdbcType) {
    case Types.VARCHAR:
    case Types.CHAR:
    case Types.LONGVARCHAR:
      return Schema.of(Schema.Type.STRING);
    case Types.INTEGER:
      return Schema.of(Schema.Type.INT);
    case Types.BIGINT:
      return Schema.of(Schema.Type.LONG);
    case Types.FLOAT:
    case Types.REAL:
      return Schema.of(Schema.Type.FLOAT);
    case Types.DOUBLE:
      return Schema.of(Schema.Type.DOUBLE);
    case Types.NUMERIC:
    case Types.DECIMAL:
      // WARNING: mapping exact decimals to DOUBLE loses precision for money-like
      // columns; prefer a decimal logical type with the column's precision/scale
      // when that metadata is available.
      return Schema.of(Schema.Type.DOUBLE);
    case Types.BOOLEAN:
    case Types.BIT:
      return Schema.of(Schema.Type.BOOLEAN);
    case Types.TIMESTAMP:
      return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
    case Types.TIME:
      // Fix: TIME carries no date component; it was previously collapsed to
      // TIMESTAMP_MICROS, fabricating a date.
      return Schema.of(Schema.LogicalType.TIME_MICROS);
    case Types.DATE:
      // Fix: DATE carries no time component; it was previously collapsed to
      // TIMESTAMP_MICROS, fabricating a time of day.
      return Schema.of(Schema.LogicalType.DATE);
    case Types.BINARY:
    case Types.VARBINARY:
    case Types.LONGVARBINARY:
      return Schema.of(Schema.Type.BYTES);
    default:
      return Schema.of(Schema.Type.STRING); // Default to string for unknown types
  }
}
// Coerces a JDBC-returned value into the Java type the CDAP field schema expects.
// Returns null unchanged; unknown types pass through as-is.
private Object convertValue(Object value, Schema schema) {
  if (value == null) {
    return null;
  }
  // Unwrap nullable (union-with-null) schemas before dispatching on the type.
  Schema.Type type = schema.isNullable() ? schema.getNonNullable().getType() : schema.getType();
  switch (type) {
    case STRING:
      return value.toString();
    case INT:
      return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(value.toString());
    case LONG:
      return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString());
    case FLOAT:
      return value instanceof Number ? ((Number) value).floatValue() : Float.parseFloat(value.toString());
    case DOUBLE:
      return value instanceof Number ? ((Number) value).doubleValue() : Double.parseDouble(value.toString());
    case BOOLEAN:
      return value instanceof Boolean ? value : Boolean.parseBoolean(value.toString());
    case BYTES:
      // Fix: specify UTF-8 explicitly — the no-arg getBytes() uses the platform
      // default charset, which varies across environments.
      return value instanceof byte[] ? value
          : value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
    default:
      return value;
  }
}
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-etl-api