Service Provider Interfaces (SPIs) that define extensible contracts for CDAP runtime functionality, enabling pluggable implementations of cluster provisioning, job lifecycle management, and infrastructure integration
—
Core SPI for creating, managing, and destroying compute clusters across different cloud platforms and environments. The provisioning system provides standardized interfaces for cluster lifecycle management with pluggable provisioner implementations that can integrate with various infrastructure providers.
Main SPI interface that defines the contract for cluster provisioning implementations. Implementers provide concrete provisioning logic for specific platforms (AWS, GCP, Azure, Kubernetes, etc.).
/**
* Main interface for creating and managing clusters
* Implementations provide platform-specific provisioning logic
*/
interface Provisioner {
/** Get the specification of this provisioner */
ProvisionerSpecification getSpec();
/** Initialize the provisioner with system context */
void initialize(ProvisionerSystemContext context) throws Exception;
/** Validate provisioner-specific properties (throws IllegalArgumentException if invalid) */
void validateProperties(Map<String, String> properties);
/** Create a new cluster with the given context */
Cluster createCluster(ProvisionerContext context) throws Exception;
/** Get the current status of a cluster */
ClusterStatus getClusterStatus(ProvisionerContext context, Cluster cluster) throws Exception;
/** Get the reason for cluster failure if it is available (only when status is FAILED) */
String getClusterFailureMsg(ProvisionerContext context, Cluster cluster) throws Exception;
/** Get detailed information about a cluster */
Cluster getClusterDetail(ProvisionerContext context, Cluster cluster) throws Exception;
/** Initialize a cluster after it has been created */
void initializeCluster(ProvisionerContext context, Cluster cluster) throws Exception;
/** @deprecated Since 6.2.0. Use deleteClusterWithStatus instead */
@Deprecated
void deleteCluster(ProvisionerContext context, Cluster cluster) throws Exception;
/** Delete a cluster with its current status */
ClusterStatus deleteClusterWithStatus(ProvisionerContext context, Cluster cluster) throws Exception;
/** Get the polling strategy for checking cluster status */
PollingStrategy getPollingStrategy(ProvisionerContext context, Cluster cluster);
/** Get the capabilities of this provisioner */
Capabilities getCapabilities();
/** Get the runtime job manager for this provisioner (optional) */
Optional<RuntimeJobManager> getRuntimeJobManager(ProvisionerContext context);
/** Get a label describing total processing CPUs from properties */
Optional<String> getTotalProcessingCpusLabel(Map<String, String> properties);
}
Usage Example:
public class MyCloudProvisioner implements Provisioner {
@Override
public ProvisionerSpecification getSpec() {
return new ProvisionerSpecification("my-cloud",
"My Cloud Provisioner",
"Provisions clusters on MyCloud platform");
}
@Override
public Cluster createCluster(ProvisionerContext context) throws Exception {
Map<String, String> properties = context.getProperties();
ProgramRunInfo runInfo = context.getProgramRunInfo();
// Create cluster using cloud provider APIs
String clusterId = createCloudCluster(properties, runInfo);
return new Cluster(clusterId, ClusterStatus.CREATING,
Collections.emptyList(), properties);
}
@Override
public ClusterStatus getClusterStatus(ProvisionerContext context, Cluster cluster)
throws Exception {
// Check cluster status via cloud provider API
return checkCloudClusterStatus(cluster.getName());
}
// Other method implementations...
}
Runtime context provided to provisioner operations, containing program information, configuration, and utility services.
/**
* Context for provisioner operations providing runtime information and services
*/
interface ProvisionerContext {
/** @deprecated Use getProgramRunInfo() instead */
@Deprecated
ProgramRun getProgramRun();
/** Get program run information */
ProgramRunInfo getProgramRunInfo();
/** Get provisioner-specific properties */
Map<String, String> getProperties();
/** Get SSH context for remote operations (nullable) */
@Nullable SSHContext getSSHContext();
/** Get Spark compatibility version */
SparkCompat getSparkCompat();
/** @deprecated Use getCDAPVersionInfo() instead */
@Deprecated
String getCDAPVersion();
/** Get CDAP version information */
VersionInfo getCDAPVersionInfo();
/** Get application-specific CDAP version information (nullable) */
@Nullable VersionInfo getAppCDAPVersionInfo();
/** Get location factory for file operations */
LocationFactory getLocationFactory();
/** Get runtime monitor type */
RuntimeMonitorType getRuntimeMonitorType();
/** Get provisioner metrics collector */
ProvisionerMetrics getMetrics(Map<String, String> tags);
/** Execute a task asynchronously */
<T> CompletionStage<T> execute(Callable<T> callable);
/** Get profile name (nullable) */
@Nullable String getProfileName();
/** Get error category for exception handling (nullable) */
@Nullable ErrorCategory getErrorCategory();
}
System-level context for provisioner initialization and configuration management.
/**
* System-level context for provisioner initialization
*/
interface ProvisionerSystemContext {
/** Get system properties */
Map<String, String> getProperties();
/** Reload properties from system configuration */
void reloadProperties();
/** Get CDAP version (deprecated, use getCDAPVersionInfo) */
@Deprecated
String getCDAPVersion();
/** Get a distributed lock with the given name */
Lock getLock(String name);
}
Core data structures representing clusters and their components.
/**
* Information about a cluster including nodes and properties
*/
class Cluster {
/** Get cluster name/identifier */
String getName();
/** Get current cluster status */
ClusterStatus getStatus();
/** Get list of cluster nodes */
List<Node> getNodes();
/** Get cluster properties */
Map<String, String> getProperties();
}
/**
* Information about a cluster node
*/
class Node {
/** Get unique node identifier */
String getId();
/** Get node type */
Node.Type getType();
/** Get node IP address */
String getIpAddress();
/** Get node creation timestamp */
long getCreateTime();
/** Get node-specific properties */
Map<String, String> getProperties();
/** Node types */
enum Type {
MASTER, // Master/coordinator nodes
WORKER, // Worker/compute nodes
UNKNOWN // Unknown or unspecified type
}
}
Status enumeration and related utilities for cluster state tracking.
/**
* Status values for clusters
*/
enum ClusterStatus {
CREATING, // Cluster is being created
RUNNING, // Cluster is running and available
FAILED, // Cluster creation or operation failed
DELETING, // Cluster is being deleted
NOT_EXISTS, // Cluster does not exist
ORPHANED // Cluster exists but is not managed
}
Metadata describing a provisioner implementation.
/**
* Specification of a provisioner including name, label, and description
*/
class ProvisionerSpecification {
/** Get provisioner name (unique identifier) */
String getName();
/** Get human-readable label */
String getLabel();
/** Get detailed description */
String getDescription();
}
Declaration of what dataset types a provisioner can handle.
/**
* Encapsulates provisioner capabilities for specific dataset types
*/
class Capabilities {
/** Get supported dataset types */
Set<String> getDatasetTypes();
/** Check if capabilities are empty */
boolean isEmpty();
/** Empty capabilities constant */
static final Capabilities EMPTY;
}
Configurable strategies for polling cluster status during operations.
/**
* Defines how to poll for cluster status changes
*/
interface PollingStrategy {
/**
* Get next poll interval in milliseconds
* @param numPolls Number of polls already performed
* @param startTime Time when polling started
* @return Milliseconds to wait before next poll, or -1 to stop polling
*/
long nextPoll(int numPolls, long startTime);
}
/**
* Factory for common polling strategies
*/
class PollingStrategies {
/** Create fixed interval polling strategy */
static PollingStrategy fixedInterval(long interval, TimeUnit unit);
/** Add initial delay to existing strategy */
static PollingStrategy initialDelay(PollingStrategy strategy, long delay, TimeUnit unit);
/** Add jittered initial delay to existing strategy */
static PollingStrategy initialDelay(PollingStrategy strategy, long minDelay,
long maxDelay, TimeUnit unit);
}
Usage Example:
// Fixed 30-second polling
PollingStrategy fixedStrategy = PollingStrategies.fixedInterval(30, TimeUnit.SECONDS);
// Fixed polling with 2-minute initial delay
PollingStrategy delayedStrategy = PollingStrategies.initialDelay(
PollingStrategies.fixedInterval(15, TimeUnit.SECONDS),
2, TimeUnit.MINUTES
);
Interface for collecting provisioner operation metrics.
/**
* Interface for collecting provisioner metrics
*/
interface ProvisionerMetrics {
/** Increment a counter metric */
void count(String metricName, int delta);
/** Set a gauge metric value */
void gauge(String metricName, long value);
}
Specialized exceptions for provisioner operations.
/**
* Exception indicating a retryable provisioning failure
*/
class RetryableProvisionException extends Exception implements ErrorTagProvider {
/** Get error tags for categorization */
Set<String> getErrorTags();
}
Common property keys used in cluster configuration.
/**
* Common constants for cluster properties
*/
final class ClusterProperties {
/** Kerberos principal property key */
static final String KERBEROS_PRINCIPAL = "kerberos.principal";
/** Kerberos keytab property key */
static final String KERBEROS_KEYTAB = "kerberos.keytab";
}
Legacy classes maintained for backward compatibility.
/**
* @deprecated since 6.2.0. Use ProgramRunInfo instead
*/
@Deprecated
class ProgramRun {
String getNamespace();
String getApplication();
String getProgram();
String getRun();
}
Install with the Tessl CLI:
npx tessl i tessl/maven-io-cdap-cdap--cdap-runtime-spi