Service Provider Interfaces (SPIs) that define extensible contracts for CDAP runtime functionality, enabling pluggable implementations of cluster provisioning, job lifecycle management, and infrastructure integration
npx @tessl/cli install tessl/maven-io-cdap-cdap--cdap-runtime-spi@6.11.0

CDAP Runtime Service Provider Interface (SPI) library that defines extensible interfaces for runtime provisioning, job management, and infrastructure integration within the CDAP data application platform. This library enables pluggable implementations of cluster provisioning, runtime job lifecycle management, SSH operations, and profile management through standardized interfaces.
pom.xml:
<dependency>
<groupId>io.cdap.cdap</groupId>
<artifactId>cdap-runtime-spi</artifactId>
<version>6.11.0</version>
</dependency>

import io.cdap.cdap.runtime.spi.provisioner.Provisioner;
import io.cdap.cdap.runtime.spi.provisioner.ProvisionerContext;
import io.cdap.cdap.runtime.spi.runtimejob.RuntimeJobManager;
import io.cdap.cdap.runtime.spi.ssh.SSHContext;
import io.cdap.cdap.runtime.spi.ssh.SSHKeyPair;
import io.cdap.cdap.runtime.spi.ProgramRunInfo;
import io.cdap.cdap.runtime.spi.profile.ProfileStatus;
import java.security.KeyException;
import java.io.IOException;

// Create program run information
// Fluent construction: the setter calls are chained on the builder, and the
// final build() call yields the ProgramRunInfo assigned to runInfo.
ProgramRunInfo runInfo = new ProgramRunInfo.Builder()
.setNamespace("default")
.setApplication("myapp")
.setVersion("1.0.0")
.setProgramType("workflow")
.setProgram("myworkflow")
.setRun("run-123")
.build();
// Implement a custom provisioner
public class MyProvisioner implements Provisioner {
@Override
public ProvisionerSpecification getSpec() {
return new ProvisionerSpecification("my-provisioner",
"My Custom Provisioner",
"Custom provisioner for cloud resources");
}
@Override
public Cluster createCluster(ProvisionerContext context) throws Exception {
// Implementation for creating clusters
return new Cluster(/* cluster details */);
}
// Additional provisioner methods...
}

CDAP Runtime SPI is built around several key architectural patterns:
- Service provider interfaces (Provisioner, RuntimeJobManager) define contracts for pluggable implementations
- Context objects (ProvisionerContext, SSHContext) provide runtime information and capabilities to implementations
- Data classes such as ProgramRunInfo use fluent builders for construction
- Interfaces extending Closeable ensure proper cleanup of resources like SSH sessions and job managers
- Status enums (ClusterStatus, RuntimeJobStatus) provide clear state management

Core SPI for creating, managing, and destroying compute clusters across different cloud platforms and environments. Provides standardized interfaces for cluster lifecycle management with pluggable provisioner implementations.
interface Provisioner {
ProvisionerSpecification getSpec();
void initialize(ProvisionerSystemContext context) throws Exception;
void validateProperties(Map<String, String> properties);
Cluster createCluster(ProvisionerContext context) throws Exception;
ClusterStatus getClusterStatus(ProvisionerContext context, Cluster cluster) throws Exception;
Cluster getClusterDetail(ProvisionerContext context, Cluster cluster) throws Exception;
void initializeCluster(ProvisionerContext context, Cluster cluster) throws Exception;
void deleteClusterWithStatus(ProvisionerContext context, Cluster cluster,
ClusterStatus clusterStatus) throws Exception;
PollingStrategy getPollingStrategy(ProvisionerContext context, Cluster cluster);
Capabilities getCapabilities();
Optional<RuntimeJobManager> getRuntimeJobManager(ProvisionerContext context);
String getTotalProcessingCpusLabel(Map<String, String> properties);
}

SPI for launching, monitoring, and managing runtime jobs within provisioned clusters. Handles job lifecycle from submission through completion with status tracking and resource management.
// SPI for launching, monitoring, and managing runtime jobs.
// Extends Closeable, so a manager instance is expected to be closed once it is
// no longer needed.
interface RuntimeJobManager extends Closeable {
// Launches the job described by jobInfo.
void launch(RuntimeJobInfo jobInfo) throws Exception;
// Looks up the job for the given program run. The Optional return type means a
// detail may be absent — presumably when no such job is known; confirm against
// the SPI Javadoc.
Optional<RuntimeJobDetail> getDetail(ProgramRunInfo programRunInfo) throws Exception;
// Stops the job identified by its program run.
void stop(ProgramRunInfo programRunInfo) throws Exception;
// Kills a job. Unlike stop, this takes a RuntimeJobDetail rather than a
// ProgramRunInfo.
// NOTE(review): presumably stop is graceful and kill is forceful — confirm.
void kill(RuntimeJobDetail runtimeJobDetail) throws Exception;
}
interface RuntimeJob {
void run(RuntimeJobEnvironment environment) throws Exception;
void requestStop();
}

Comprehensive SSH utilities for secure remote operations including session management, command execution, file transfer, and port forwarding. Essential for cluster initialization and remote job management.
// Context that gives implementations access to SSH key pairs and SSH sessions.
interface SSHContext {
// Generates a key pair for the given user.
// NOTE(review): the key size used by this overload is not shown here — confirm
// the default.
SSHKeyPair generate(String user) throws Exception;
// Generates a key pair for the given user with an explicit key size.
// NOTE(review): presumably keySize is in bits — confirm.
SSHKeyPair generate(String user, int keySize) throws Exception;
// Stores a key pair on this context.
void setSSHKeyPair(SSHKeyPair keyPair);
// Returns the key pair held by this context. The Optional is presumably empty
// when none has been set — confirm.
Optional<SSHKeyPair> getSSHKeyPair();
// Opens a session to the host without passing a key pair explicitly.
// NOTE(review): presumably uses the pair stored via setSSHKeyPair — confirm.
SSHSession createSSHSession(String host) throws Exception;
// Opens a session to the host using the supplied key pair.
SSHSession createSSHSession(SSHKeyPair keyPair, String host) throws Exception;
}
interface SSHSession extends Closeable {
boolean isAlive();
InetSocketAddress getAddress();
String getUsername();
SSHProcess execute(String... commands) throws Exception;
SSHProcess executeAndWait(String... commands) throws Exception;
void copy(Path localFile, String remotePath) throws Exception;
PortForwarding createLocalPortForward(String remoteHost, int remotePort,
int localPort,
PortForwarding.DataConsumer dataConsumer)
throws Exception;
RemotePortForwarding createRemotePortForward(int remotePort, int localPort)
throws Exception;
}

Foundation classes and enums that support all SPI operations, including program run information, version handling, compatibility settings, and monitoring configurations.
// Identifies a single program run by six string attributes: namespace,
// application, version, program type, program, and run.
class ProgramRunInfo {
String getNamespace();
String getApplication();
String getVersion();
String getProgramType();
String getProgram();
String getRun();
// Entry point for obtaining a Builder.
// NOTE(review): the usage example in this document constructs the builder with
// `new ProgramRunInfo.Builder()`, which cannot compile if Builder is an
// interface as listed here. Confirm against the library source which form is
// correct and align the listing and the example.
static Builder builder();
// Fluent builder: every setter returns the Builder so calls can be chained;
// build() yields the ProgramRunInfo.
interface Builder {
Builder setNamespace(String namespace);
Builder setApplication(String application);
Builder setVersion(String version);
Builder setProgramType(String programType);
Builder setProgram(String program);
Builder setRun(String run);
ProgramRunInfo build();
}
}
// The kinds of runtime monitor listed by this SPI.
// NOTE(review): this document does not describe what distinguishes SSH from URL
// monitoring — see the library Javadoc.
enum RuntimeMonitorType {
SSH, URL
}
enum SparkCompat {
SPARK2_2_11("spark2_2.11"),
SPARK3_2_12("spark3_2.12");
String getCompat();
}

Simple profile status management for runtime configuration profiles, controlling whether profiles can be assigned to programs or deleted from the system.
enum ProfileStatus {
ENABLED, // Profile can be assigned to programs, cannot be deleted
DISABLED // Profile cannot be assigned, can be deleted
}

class Cluster {
String getName();
ClusterStatus getStatus();
List<Node> getNodes();
Map<String, String> getProperties();
}
// A single node of a cluster (Cluster.getNodes() returns a list of these),
// exposing its id, type, IP address, creation time, and properties.
class Node {
String getId();
Node.Type getType();
String getIpAddress();
// NOTE(review): the unit and epoch of the creation time are not stated here —
// presumably epoch milliseconds; confirm.
long getCreateTime();
Map<String, String> getProperties();
// Kind of node: MASTER, WORKER, or UNKNOWN.
enum Type {
MASTER, WORKER, UNKNOWN
}
}
interface VersionInfo extends Comparable<Object> {
int getMajor();
int getMinor();
int getFix();
boolean isSnapshot();
long getBuildTime();
}

enum ClusterStatus {
CREATING, RUNNING, FAILED, DELETING, NOT_EXISTS, ORPHANED
}
// States of a runtime job. Each constant carries a boolean, presumably the value
// reported by isTerminated(): true for STOPPED, COMPLETED, and FAILED; false for
// STARTING, RUNNING, STOPPING, and UNKNOWN.
enum RuntimeJobStatus {
STARTING(false), RUNNING(false), STOPPING(false),
STOPPED(true), COMPLETED(true), FAILED(true), UNKNOWN(false);
// Whether this status is a terminated state.
boolean isTerminated();
}
enum LaunchMode {
CLIENT, // Launch in-process
CLUSTER // Launch in separate container
}

class RetryableProvisionException extends Exception implements ErrorTagProvider {
Set<String> getErrorTags();
}
// Unchecked exception (extends RuntimeException), so callers are not forced to
// declare or catch it.
class ProgramRunFailureException extends RuntimeException {
// Exception for program runs that completed but failed
}