
tessl/maven-co-cask-cdap--cdap-spark-core

Core Spark 1.x integration component for the Cask Data Application Platform providing runtime services and execution context for CDAP applications

HTTP Service Framework

The HTTP service framework lets Spark applications expose web endpoints and REST APIs while staying integrated with CDAP's service discovery, security model, and resource management.

Capabilities

Spark HTTP Service Server

The HTTP service server lets a Spark application expose web endpoints and handle HTTP requests. It offers explicit lifecycle control: blocking start and stop calls, plus state and bind-address queries.

/**
 * HTTP service server for Spark applications
 * Provides web endpoint capabilities with proper lifecycle management
 */
public class SparkHttpServiceServer {
    /**
     * Starts the HTTP service server and waits for it to be ready
     * Blocks until the server is fully started and ready to accept requests
     * @throws Exception if server startup fails
     */
    public void startAndWait() throws Exception;
    
    /**
     * Stops the HTTP service server and waits for shutdown to complete
     * Blocks until the server is fully stopped and all resources are released
     * @throws Exception if server shutdown fails
     */
    public void stopAndWait() throws Exception;
    
    /**
     * Gets the bind address of the running server
     * @return InetSocketAddress containing the host and port the server is bound to
     * @throws IllegalStateException if server is not running
     */
    public InetSocketAddress getBindAddress();
    
    /**
     * Checks if the server is running
     * @return true if the server is currently running
     */
    public boolean isRunning();
    
    /**
     * Gets the server state
     * @return Current state of the server (STARTING, RUNNING, STOPPING, STOPPED)
     */
    public State getState();
}

Default Spark HTTP Service Context (Scala)

A Scala HTTP service context that gives HTTP service handlers access to CDAP services and program metadata.

/**
 * Default implementation of HTTP service context for Spark applications
 * Provides access to CDAP services with Scala-friendly interfaces
 */
class DefaultSparkHttpServiceContext(runtimeContext: SparkRuntimeContext) extends SparkHttpServiceContext {
    /**
     * Gets the Spark program specification
     * @return SparkSpecification containing program metadata
     */
    def getSpecification: SparkSpecification
    
    /**
     * Gets the instance ID of this service
     * @return Instance ID (0-based) of this service instance
     */
    def getInstanceId: Int
    
    /**
     * Gets the total number of service instances
     * @return Total count of service instances
     */
    def getInstanceCount: Int
    
    /**
     * Gets the logical start time of the program
     * @return Start time in milliseconds since epoch
     */
    def getLogicalStartTime: Long
    
    /**
     * Gets runtime arguments as a Scala map
     * @return Map of argument key-value pairs
     */
    def getRuntimeArguments: Map[String, String]
    
    /**
     * Gets the dataset framework for dataset operations
     * @return DatasetFramework for dataset access
     */
    def getDatasetFramework: DatasetFramework
    
    /**
     * Gets the messaging context for pub-sub operations
     * @return MessagingContext for messaging operations
     */
    def getMessagingContext: MessagingContext
    
    /**
     * Gets the admin interface for CDAP operations
     * @return Admin interface for administrative operations
     */
    def getAdmin: Admin
}
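
A common use of the 0-based instance ID and the instance count is to split work so that each key is handled by exactly one service instance. The following is a minimal sketch of that idea, not part of CDAP: `InstancePartitioner`, `owns`, and `ownedKeys` are hypothetical names, and the two integers are assumed to come from `getInstanceId` and `getInstanceCount`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a CDAP class): assigns each key to one instance.
final class InstancePartitioner {
    private InstancePartitioner() { }

    // Returns true if the instance with the given 0-based ID should handle the key.
    static boolean owns(String key, int instanceId, int instanceCount) {
        if (instanceCount <= 0 || instanceId < 0 || instanceId >= instanceCount) {
            throw new IllegalArgumentException("invalid instance id/count");
        }
        // floorMod keeps the bucket non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), instanceCount) == instanceId;
    }

    // Filters the keys down to those owned by the given instance.
    static List<String> ownedKeys(List<String> keys, int instanceId, int instanceCount) {
        List<String> owned = new ArrayList<>();
        for (String key : keys) {
            if (owns(key, instanceId, instanceCount)) {
                owned.add(key);
            }
        }
        return owned;
    }
}
```

Because the assignment depends on the instance count, keys move between instances whenever the service is scaled up or down.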

Spark HTTP Service Plugin Context

Plugin context for HTTP services that provides access to plugins and their configurations within service handlers.

/**
 * Plugin context for HTTP services in Spark applications
 * Provides access to plugins and their configurations
 */
public class DefaultSparkHttpServicePluginContext implements PluginContext {
    /**
     * Gets runtime arguments for the service
     * @return Map of runtime argument key-value pairs
     */
    public Map<String, String> getRuntimeArguments();
    
    /**
     * Gets properties for a specific plugin
     * @param pluginId Identifier of the plugin
     * @return PluginProperties containing plugin configuration
     * @throws IllegalArgumentException if plugin not found
     */
    public PluginProperties getPluginProperties(String pluginId);
    
    /**
     * Loads a plugin class by ID
     * @param pluginId Identifier of the plugin
     * @param <T> Expected type of the plugin class
     * @return Class object for the plugin
     * @throws IllegalArgumentException if plugin not found
     * @throws ClassNotFoundException if plugin class cannot be loaded
     */
    public <T> Class<T> loadPluginClass(String pluginId);
    
    /**
     * Creates a new instance of a plugin
     * @param pluginId Identifier of the plugin
     * @param <T> Expected type of the plugin instance
     * @return New instance of the plugin
     * @throws IllegalArgumentException if plugin not found
     * @throws InstantiationException if plugin cannot be instantiated
     */
    public <T> T newPluginInstance(String pluginId) throws InstantiationException;
    
    /**
     * Checks if a plugin exists
     * @param pluginId Identifier of the plugin
     * @return true if the plugin exists
     */
    public boolean hasPlugin(String pluginId);
}

HTTP Service Handler Base

Base class for implementing HTTP service handlers with common functionality and lifecycle management.

/**
 * Base class for HTTP service handlers
 * Provides common functionality and lifecycle management
 */
public abstract class AbstractHttpServiceHandler implements HttpServiceHandler {
    /**
     * Initializes the service handler
     * Called once when the service starts
     * @param context HTTP service context
     * @throws Exception if initialization fails
     */
    public void initialize(HttpServiceContext context) throws Exception;
    
    /**
     * Destroys the service handler
     * Called once when the service stops
     */
    public void destroy();
    
    /**
     * Gets the HTTP service context
     * @return HttpServiceContext for accessing CDAP services
     */
    protected HttpServiceContext getContext();
    
    /**
     * Gets a dataset instance
     * @param datasetName Name of the dataset
     * @param <T> Expected dataset type
     * @return Dataset instance
     * @throws DatasetInstantiationException if dataset cannot be accessed
     */
    protected <T extends Dataset> T getDataset(String datasetName) throws DatasetInstantiationException;
    
    /**
     * Gets runtime arguments
     * @return Map of runtime arguments
     */
    protected Map<String, String> getRuntimeArguments();
}

Usage Examples

Basic HTTP Service Server:

import co.cask.cdap.app.runtime.spark.service.SparkHttpServiceServer;
import java.net.InetSocketAddress;

// Create and start HTTP service server
SparkHttpServiceServer server = new SparkHttpServiceServer(
    sparkHttpServiceContext, 
    httpServiceHandlers,
    httpServiceSpec
);

try {
    // Start server (blocks until ready)
    server.startAndWait();
    
    // Get server address
    InetSocketAddress address = server.getBindAddress();
    System.out.println("Server running on " + address.getHostName() + ":" + address.getPort());
    
    // Server is now accepting requests
    // ... application logic
    
} finally {
    // Stop server (blocks until stopped)
    server.stopAndWait();
}
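
Once the bind address is known, callers usually need a base URL rather than a host and port pair. A minimal sketch, using only the JDK, of how that URL could be built: `ServiceUrls` and `baseUrl` are hypothetical names, and the `ssl` flag is an assumption about how the caller knows which scheme applies.

```java
import java.net.InetSocketAddress;

// Hypothetical helper (not a CDAP class): turns a bind address into a base URL.
final class ServiceUrls {
    private ServiceUrls() { }

    static String baseUrl(InetSocketAddress address, boolean ssl) {
        String scheme = ssl ? "https" : "http";
        // getHostString() avoids the reverse DNS lookup that getHostName() may trigger.
        // Note: an IPv6 literal would additionally need square brackets around the host.
        return scheme + "://" + address.getHostString() + ":" + address.getPort();
    }
}
```

For example, `ServiceUrls.baseUrl(server.getBindAddress(), false)` would yield a value such as `http://host:port` for the running server.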

Scala HTTP Service Context:

import co.cask.cdap.app.runtime.spark.service.DefaultSparkHttpServiceContext

// Create HTTP service context
val serviceContext = new DefaultSparkHttpServiceContext(runtimeContext)

// Access service metadata
val spec = serviceContext.getSpecification
val instanceId = serviceContext.getInstanceId
val instanceCount = serviceContext.getInstanceCount
val startTime = serviceContext.getLogicalStartTime

// Access CDAP services
val datasetFramework = serviceContext.getDatasetFramework
val messagingContext = serviceContext.getMessagingContext
val admin = serviceContext.getAdmin

// Use runtime arguments
val args = serviceContext.getRuntimeArguments
val port = args.getOrElse("service.port", "8080").toInt
val enableSsl = args.getOrElse("service.ssl.enabled", "false").toBoolean
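
The same defaulting pattern can be written in Java against a `Map<String, String>` of runtime arguments. This is a sketch under stated assumptions: `RuntimeArgs`, `intArg`, and `boolArg` are hypothetical helpers, and the keys `service.port` and `service.ssl.enabled` are taken from the Scala example above rather than from any documented CDAP setting. Unlike a bare `toInt`, the integer helper reports which argument was malformed.

```java
import java.util.Map;

// Hypothetical helper (not a CDAP class): typed access to runtime arguments.
final class RuntimeArgs {
    private RuntimeArgs() { }

    // Returns the integer value for key, or defaultValue if the key is absent or blank.
    static int intArg(Map<String, String> args, String key, int defaultValue) {
        String raw = args.get(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Runtime argument '" + key + "' is not an integer: " + raw, e);
        }
    }

    // Returns the boolean value for key, or defaultValue if the key is absent.
    static boolean boolArg(Map<String, String> args, String key, boolean defaultValue) {
        String raw = args.get(key);
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }
}
```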

HTTP Service Handler Implementation:

import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceContext;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/api/v1")
public class MySparkHttpHandler extends AbstractHttpServiceHandler {
    
    @Override
    public void initialize(HttpServiceContext context) throws Exception {
        super.initialize(context);
        // Custom initialization logic
    }
    
    @GET
    @Path("/status")
    public void getStatus(HttpServiceRequest request, HttpServiceResponder responder) {
        // Get service status
        Map<String, Object> status = new HashMap<>();
        status.put("instanceId", getContext().getInstanceId());
        status.put("instanceCount", getContext().getInstanceCount());
        
        responder.sendJson(200, status);
    }
    
    @POST
    @Path("/data")
    public void processData(HttpServiceRequest request, HttpServiceResponder responder) throws Exception {
        // Access dataset
        Dataset dataset = getDataset("my-dataset");
        
        // Process request data (getContent() returns a ByteBuffer)
        String data = StandardCharsets.UTF_8.decode(request.getContent()).toString();
        
        // Write to dataset
        // ... dataset operations
        
        responder.sendString(200, "Data processed successfully");
    }
}
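
Reading the request body deserves a little care when the content arrives as a `java.nio.ByteBuffer`, which is how `HttpServiceRequest.getContent()` is understood to behave in CDAP (verify against the version in use). A minimal sketch of decoding such a buffer as UTF-8 without disturbing the caller's read position follows. `RequestBodies` and `asUtf8` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not a CDAP class): decodes a request body as UTF-8 text.
final class RequestBodies {
    private RequestBodies() { }

    static String asUtf8(ByteBuffer content) {
        // duplicate() shares the bytes but has its own position, so the
        // original buffer can still be read by other code afterwards.
        return StandardCharsets.UTF_8.decode(content.duplicate()).toString();
    }
}
```

Inside a handler method this could be called as `RequestBodies.asUtf8(request.getContent())`.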

Plugin Context Usage:

import co.cask.cdap.app.runtime.spark.service.DefaultSparkHttpServicePluginContext;

// Access plugin context
DefaultSparkHttpServicePluginContext pluginContext = // ... obtain from service context

// Check if plugin exists
if (pluginContext.hasPlugin("my-plugin")) {
    // Get plugin properties
    PluginProperties properties = pluginContext.getPluginProperties("my-plugin");
    String configValue = properties.getProperties().get("config.key");
    
    // Load plugin class
    Class<MyPlugin> pluginClass = pluginContext.loadPluginClass("my-plugin");
    
    // Create plugin instance
    MyPlugin plugin = pluginContext.newPluginInstance("my-plugin");
    plugin.configure(configValue);
}
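
The check-then-instantiate pattern above can be wrapped so that a missing plugin and a failed instantiation are handled in one place. The sketch below is self-contained and does not use the CDAP types: the nested `Plugins` interface is a stand-in that mirrors only the `hasPlugin` and `newPluginInstance` signatures listed earlier, and `PluginLookup` and `tryCreate` are hypothetical names.

```java
import java.util.Optional;

// Hypothetical helper (not a CDAP class): guarded plugin instantiation.
final class PluginLookup {
    private PluginLookup() { }

    // Stand-in for the two plugin-context methods used here; NOT the CDAP interface.
    interface Plugins {
        boolean hasPlugin(String pluginId);
        <T> T newPluginInstance(String pluginId) throws InstantiationException;
    }

    // Returns the plugin instance, or an empty Optional if the plugin is not registered.
    static <T> Optional<T> tryCreate(Plugins plugins, String pluginId) {
        if (!plugins.hasPlugin(pluginId)) {
            return Optional.empty();
        }
        try {
            return Optional.of(plugins.<T>newPluginInstance(pluginId));
        } catch (InstantiationException e) {
            // The plugin is registered but broken: surface that as an unchecked error.
            throw new IllegalStateException(
                "Plugin '" + pluginId + "' exists but could not be instantiated", e);
        }
    }
}
```

Returning an empty `Optional` for an absent plugin keeps optional plugins cheap to probe, while a registered plugin that fails to instantiate still fails loudly.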

Types

/**
 * Context interface for HTTP services
 */
public interface HttpServiceContext {
    /**
     * Gets the service specification
     * @return HttpServiceSpecification containing service metadata
     */
    HttpServiceSpecification getSpecification();
    
    /**
     * Gets the instance ID of this service
     * @return Instance ID (0-based) of this service instance
     */
    int getInstanceId();
    
    /**
     * Gets the total number of service instances
     * @return Total count of service instances
     */
    int getInstanceCount();
    
    /**
     * Gets runtime arguments
     * @return Map of runtime arguments
     */
    Map<String, String> getRuntimeArguments();
    
    /**
     * Gets the dataset framework
     * @return DatasetFramework for dataset operations
     */
    DatasetFramework getDatasetFramework();
}

/**
 * Interface for HTTP service handlers
 */
public interface HttpServiceHandler {
    /**
     * Initializes the handler
     * @param context HTTP service context
     * @throws Exception if initialization fails
     */
    void initialize(HttpServiceContext context) throws Exception;
    
    /**
     * Destroys the handler
     */
    void destroy();
}

/**
 * Specification for HTTP services
 */
public class HttpServiceSpecification {
    /**
     * Gets the service name
     * @return Name of the HTTP service
     */
    public String getName();
    
    /**
     * Gets the service description
     * @return Description of the HTTP service
     */
    public String getDescription();
    
    /**
     * Gets the service handlers
     * @return Map of handler names to handler specifications
     */
    public Map<String, HttpServiceHandlerSpecification> getHandlers();
    
    /**
     * Gets service resources
     * @return Resources allocated to the service
     */
    public Resources getResources();
}

/**
 * Server state enumeration
 */
public enum State {
    STARTING,    // Server is starting up
    RUNNING,     // Server is running and accepting requests
    STOPPING,    // Server is shutting down
    STOPPED      // Server has stopped
}

/**
 * Plugin properties container
 */
public class PluginProperties {
    /**
     * Gets the plugin properties
     * @return Map of property key-value pairs
     */
    public Map<String, String> getProperties();
    
    /**
     * Gets a property value
     * @param key Property key
     * @return Property value or null if not found
     */
    public String getProperty(String key);
    
    /**
     * Gets a property value with default
     * @param key Property key
     * @param defaultValue Default value if property not found
     * @return Property value or default value
     */
    public String getProperty(String key, String defaultValue);
}

/**
 * Trait defining the Spark HTTP service context interface
 */
trait SparkHttpServiceContext {
    /**
     * Gets the Spark program specification
     * @return SparkSpecification containing program metadata
     */
    def getSpecification: SparkSpecification
    
    /**
     * Gets the instance ID of this service
     * @return Instance ID (0-based) of this service instance
     */
    def getInstanceId: Int
    
    /**
     * Gets the total number of service instances
     * @return Total count of service instances
     */
    def getInstanceCount: Int
    
    /**
     * Gets the logical start time of the program
     * @return Start time in milliseconds since epoch
     */
    def getLogicalStartTime: Long
    
    /**
     * Gets runtime arguments as a Scala map
     * @return Map of argument key-value pairs
     */
    def getRuntimeArguments: Map[String, String]
}
