CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-cdap-cdap--cdap-runtime-spi

Service Provider Interfaces (SPIs) that define extensible contracts for CDAP runtime functionality, enabling pluggable implementations of cluster provisioning, job lifecycle management, and infrastructure integration.

Pending
Overview
Eval results
Files

docs/runtime-job.md

Runtime Job Management

SPI for launching, monitoring, and managing runtime jobs within provisioned clusters. The runtime job system handles job lifecycle from submission through completion with comprehensive status tracking, resource management, and environment abstraction. This enables pluggable job execution engines across different compute environments.

Capabilities

Runtime Job Manager Interface

Main SPI interface for managing the lifecycle of runtime jobs. Implementations provide platform-specific job management logic for environments like Hadoop YARN, Kubernetes, or cloud-native job services.

/**
 * Manages the lifecycle of runtime jobs: launching them, looking up their details,
 * and stopping or killing them. Implementations supply the platform-specific logic.
 *
 * <p>Extends {@link Closeable}; call {@link #close()} once the manager is no longer
 * needed so that the resources it holds are released.
 */
interface RuntimeJobManager extends Closeable {
    /**
     * Launches a new runtime job.
     *
     * @param jobInfo information needed to launch the job
     * @throws Exception if the job launch fails
     */
    void launch(RuntimeJobInfo jobInfo) throws Exception;
    
    /**
     * Looks up the details of the job that belongs to a program run.
     *
     * @param programRunInfo program run whose job should be looked up
     * @return the job details, or an empty {@link Optional} if no job is found
     * @throws Exception if the lookup fails
     */
    Optional<RuntimeJobDetail> getDetail(ProgramRunInfo programRunInfo) throws Exception;
    
    /**
     * Stops a running job gracefully.
     *
     * <p>Note that this method is keyed by the program run, whereas
     * {@link #kill(RuntimeJobDetail)} takes a previously obtained job detail.
     *
     * @param programRunInfo program run whose job should be stopped
     * @throws Exception if the stop operation fails
     */
    void stop(ProgramRunInfo programRunInfo) throws Exception;
    
    /**
     * Kills a running job immediately.
     *
     * @param runtimeJobDetail job details, including the current status, of the job to kill
     * @throws Exception if the kill operation fails
     */
    void kill(RuntimeJobDetail runtimeJobDetail) throws Exception;
    
    /**
     * Cleans up the resources held by this manager.
     *
     * @throws IOException if an I/O error occurs while closing
     */
    void close() throws IOException;
}

Usage Example:

/**
 * Example {@link RuntimeJobManager} that runs each program run as a Kubernetes Job.
 * Only {@code launch} and {@code getDetail} are shown; the remaining interface methods
 * and the small naming/label/status helpers are left to the implementer.
 */
public class KubernetesJobManager implements RuntimeJobManager {
    private final KubernetesClient k8sClient;

    /**
     * Creates a manager that issues all Kubernetes API calls through the given client.
     *
     * @param k8sClient client used to create and look up Kubernetes Jobs
     */
    public KubernetesJobManager(KubernetesClient k8sClient) {
        this.k8sClient = k8sClient;
    }

    /**
     * Builds a single-pod Kubernetes Job from the launch information and submits it.
     *
     * @param jobInfo information needed to launch the job
     * @throws Exception if the job cannot be created
     */
    @Override
    public void launch(RuntimeJobInfo jobInfo) throws Exception {
        ProgramRunInfo runInfo = jobInfo.getProgramRunInfo();

        // RuntimeJobInfo.getArguments() returns a Map, so it has to be flattened before it
        // can be passed as container args. The "--key=value" form is illustrative only:
        // use whatever form the container entrypoint actually parses.
        String[] containerArgs = jobInfo.getArguments().entrySet().stream()
            .map(arg -> "--" + arg.getKey() + "=" + arg.getValue())
            .toArray(String[]::new);

        // Create Kubernetes Job spec
        Job job = new JobBuilder()
            .withNewMetadata()
                .withName(generateJobName(runInfo))
                .withLabels(createJobLabels(runInfo))
            .endMetadata()
            .withNewSpec()
                // Configure job spec from jobInfo
                .withParallelism(1)
                .withNewTemplate()
                    .withNewSpec()
                        .addNewContainer()
                            .withName("cdap-runtime")
                            .withImage("cdap/runtime:latest")
                            .withArgs(containerArgs)
                            .withNewResources()
                                .addToRequests("cpu", new Quantity(String.valueOf(jobInfo.getVirtualCores())))
                                .addToRequests("memory", new Quantity(jobInfo.getMemoryMb() + "Mi"))
                            .endResources()
                        .endContainer()
                    .endSpec()
                .endTemplate()
            .endSpec()
            .build();

        k8sClient.batch().v1().jobs().create(job);
    }

    /**
     * Looks up the Kubernetes Job for a program run and maps its status.
     *
     * @param programRunInfo program run whose job should be looked up
     * @return the job detail, or an empty {@link Optional} if no Job with the generated name exists
     * @throws Exception if the lookup fails
     */
    @Override
    public Optional<RuntimeJobDetail> getDetail(ProgramRunInfo programRunInfo) throws Exception {
        String jobName = generateJobName(programRunInfo);
        Job job = k8sClient.batch().v1().jobs().withName(jobName).get();

        // The client returns null when no Job with that name exists
        if (job == null) {
            return Optional.empty();
        }

        RuntimeJobStatus status = mapJobStatus(job.getStatus());
        return Optional.of(new RuntimeJobDetail(programRunInfo, status));
    }

    // Other method implementations...
}

Runtime Job Interface

Interface representing an individual job that can be executed in a runtime environment.

/**
 * A job that is executed in a runtime environment.
 * Implementations define the actual job logic.
 */
interface RuntimeJob {
    /**
     * Executes the job in the given environment.
     *
     * @param environment runtime environment providing the services the job needs
     * @throws Exception if job execution fails
     */
    void run(RuntimeJobEnvironment environment) throws Exception;
    
    /**
     * Requests the job to stop gracefully.
     * Implementations should handle this request asynchronously.
     */
    void requestStop();
}

Runtime Job Information

Interface defining all information needed to launch a runtime job.

/**
 * Information needed to launch a runtime job: the files to localize, the job class,
 * the program run, JVM properties, arguments, and requested CPU and memory.
 */
interface RuntimeJobInfo {
    /**
     * Returns the files that need to be localized for the job.
     *
     * @return collection of {@code LocalFile} objects for job execution
     */
    Collection<? extends LocalFile> getLocalizeFiles();
    
    /**
     * Returns the name of the runtime job class to execute.
     *
     * @return fully qualified name of a class implementing {@code RuntimeJob}
     */
    String getRuntimeJobClassname();
    
    /**
     * Returns the program run this job belongs to.
     *
     * @return program run details
     */
    ProgramRunInfo getProgramRunInfo();
    
    /**
     * Returns the JVM properties for the job process.
     *
     * @return map of JVM system properties
     */
    Map<String, String> getJvmProperties();
    
    /**
     * Returns the command line arguments for the job process.
     * Note that these are exposed as a map, not as an ordered list.
     *
     * @return map of command line arguments (presumably argument name to value;
     *         confirm against the launcher that consumes them)
     */
    Map<String, String> getArguments();
    
    /**
     * Returns the number of virtual CPU cores requested.
     *
     * @return number of virtual cores
     */
    int getVirtualCores();
    
    /**
     * Returns the memory allocation in megabytes.
     *
     * @return memory in MB
     */
    int getMemoryMb();
}

Runtime Job Environment

Interface providing the runtime environment and services available to executing jobs.

/**
 * Runtime environment handed to a job when it executes.
 * Provides access to the services and configuration the job needs.
 */
interface RuntimeJobEnvironment {
    /**
     * Returns the location factory for file system operations.
     *
     * @return location factory instance
     */
    LocationFactory getLocationFactory();
    
    /**
     * Returns the Twill runner for launching distributed applications.
     *
     * @return Twill runner instance
     */
    org.apache.twill.api.TwillRunner getTwillRunner();
    
    /**
     * Returns the environment-specific properties.
     *
     * @return map of environment properties
     */
    Map<String, String> getProperties();
    
    /**
     * Returns the launch mode for the job.
     *
     * @return launch mode, either {@code CLIENT} or {@code CLUSTER}
     */
    LaunchMode getLaunchMode();
}

Runtime Job Detail

Data class containing runtime job information and current status.

/**
 * Runtime job details: the program run a job belongs to together with its current status.
 *
 * <p>This is a signature-only API listing; method bodies are omitted.
 */
class RuntimeJobDetail {
    /**
     * Returns the program run information for this job.
     *
     * @return program run info
     */
    ProgramRunInfo getRunInfo();
    
    /**
     * Returns the current job status.
     *
     * @return current runtime job status
     */
    RuntimeJobStatus getStatus();
}

Launch Mode

Enumeration defining how jobs should be launched.

/**
 * Program launch mode, defining where a job is executed.
 */
enum LaunchMode {
    /**
     * Launch the job in-process within the client.
     * Suitable for lightweight jobs or development.
     */
    CLIENT,
    
    /**
     * Launch the job in a separate container or process.
     * Suitable for production workloads requiring isolation.
     */
    CLUSTER
}

Runtime Job Status

Comprehensive status tracking for runtime jobs with termination state information.

/**
 * Lifecycle states of a runtime job. Every state records whether it is terminal,
 * which callers query through {@link #isTerminated()}.
 */
enum RuntimeJobStatus {
    /** The job is starting up; not terminal. */
    STARTING(false),

    /** The job is running normally; not terminal. */
    RUNNING(false),

    /** The job is in the process of stopping; not terminal. */
    STOPPING(false),

    /** The job has stopped; terminal. */
    STOPPED(true),

    /** The job completed successfully; terminal. */
    COMPLETED(true),

    /** The job failed with an error; terminal. */
    FAILED(true),

    /** The job status is unknown; not treated as terminal. */
    UNKNOWN(false);

    /** Whether a job in this state has reached a terminal state. */
    private final boolean terminal;

    RuntimeJobStatus(boolean terminal) {
        this.terminal = terminal;
    }

    /**
     * Tells whether this status represents a terminated job.
     *
     * @return {@code true} if the job is in a terminal state, {@code false} otherwise
     */
    public boolean isTerminated() {
        return terminal;
    }
}

Usage Example:

// Report on the job only when the manager knows about it; an empty result prints nothing.
jobManager.getDetail(programRunInfo).ifPresent(detail -> {
    RuntimeJobStatus current = detail.getStatus();

    // Non-terminal states: the job is still in flight.
    if (!current.isTerminated()) {
        System.out.println("Job is still running with status: " + current);
        return;
    }

    System.out.println("Job has completed with status: " + current);

    // Add a status-specific message for each terminal state.
    switch (current) {
        case COMPLETED:
            System.out.println("Job completed successfully");
            break;
        case FAILED:
            System.out.println("Job failed - check logs");
            break;
        case STOPPED:
            System.out.println("Job was stopped");
            break;
        default:
            // No additional message for any other status.
            break;
    }
});

Exception Types

Specialized exceptions for runtime job operations.

/**
 * Exception for program runs that completed but failed.
 * Indicates that the job execution itself failed, rather than the job management.
 *
 * <p>Unchecked, since it extends {@link RuntimeException}. This is a signature-only
 * API listing; the constructor body is omitted.
 */
class ProgramRunFailureException extends RuntimeException {
    /**
     * Creates the exception with a failure message.
     *
     * @param message description of the failure
     */
    public ProgramRunFailureException(String message);
}

Usage Example:

/**
 * Example {@link RuntimeJob} showing how to report job-logic failures as
 * {@link ProgramRunFailureException} while letting every other failure propagate unchanged.
 */
public class MyRuntimeJob implements RuntimeJob {
    // Set by requestStop(). The RuntimeJob contract describes stop requests as asynchronous,
    // so this is presumably written from a different thread than the one inside run();
    // volatile keeps the update visible to the job logic that polls it.
    private volatile boolean shouldStop;

    /**
     * Runs the job logic and classifies any failure.
     *
     * @param environment runtime environment providing the services the job needs
     * @throws ProgramRunFailureException if the job logic reports failure or throws an
     *         exception classified as a job-logic failure
     * @throws Exception for any other (infrastructure) failure, rethrown as-is
     */
    @Override
    public void run(RuntimeJobEnvironment environment) throws Exception {
        try {
            // Job logic here
            boolean success = executeJobLogic(environment);

            if (!success) {
                throw new ProgramRunFailureException("Job logic failed validation");
            }

        } catch (ProgramRunFailureException e) {
            // Already the right type: rethrow untouched instead of wrapping it a second time
            throw e;
        } catch (Exception e) {
            // Convert to appropriate exception type
            if (isJobLogicFailure(e)) {
                ProgramRunFailureException failure =
                    new ProgramRunFailureException("Job failed: " + e.getMessage());
                // The exception only offers a message constructor, so attach the cause
                // explicitly to keep the original stack trace.
                failure.initCause(e);
                throw failure;
            }
            // Re-throw infrastructure failures as-is
            throw e;
        }
    }

    /**
     * Records the stop request; the job logic is expected to check the flag and wind down.
     */
    @Override
    public void requestStop() {
        // Implement graceful shutdown logic
        this.shouldStop = true;
    }

    /** Runs the actual job; returns {@code false} when the job logic reports failure. */
    private boolean executeJobLogic(RuntimeJobEnvironment env) {
        // Implementation specific logic
        return true;
    }

    /** Decides whether an exception stems from the job logic rather than the infrastructure. */
    private boolean isJobLogicFailure(Exception e) {
        // Implementation specific classification; this placeholder treats every
        // exception as an infrastructure failure.
        return false;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-cdap-cdap--cdap-runtime-spi

docs

core-types.md

index.md

profile.md

provisioner.md

runtime-job.md

ssh.md

tile.json