
tessl/maven-io-cdap-cdap--cdap-system-app-api

API for CDAP System Applications - provides interfaces and abstract classes for building system services that run in the CDAP system namespace


Worker Tasks

A remote task execution framework that lets system services run tasks on worker nodes. Tasks receive string-serialized parameters, can access the system app context, and report failures through dedicated exception types that carry the remote error back to the caller.

Capabilities

RunnableTask

Interface representing a task that can be launched by a Task worker service.

/**
 * RunnableTask represents a task that can be launched by a Task worker service.
 */
public interface RunnableTask {
  /**
   * Executes the task with the provided context.
   * @param context the execution context for the task
   * @throws Exception if task execution fails
   */
  void run(RunnableTaskContext context) throws Exception;
}

Usage Example:

import io.cdap.cdap.api.service.worker.RunnableTask;
import io.cdap.cdap.api.service.worker.RunnableTaskContext;
import java.nio.charset.StandardCharsets;

public class DataProcessingTask implements RunnableTask {
  @Override
  public void run(RunnableTaskContext context) throws Exception {
    // Get task parameters (both may be null if the request did not set them)
    String param = context.getParam();
    String namespace = context.getNamespace();
    
    // Perform task logic
    String result = processData(param, namespace);
    
    // Write result back using an explicit charset rather than the platform default
    context.writeResult(result.getBytes(StandardCharsets.UTF_8));
    
    // Set cleanup task if needed
    context.setCleanupTask(() -> {
      // Cleanup resources
    });
  }
  
  private String processData(String param, String namespace) {
    // Task implementation
    return "processed: " + param;
  }
}

RunnableTaskRequest

Request object for launching a runnable task with parameters and configuration.

/**
 * Request for launching a runnable task.
 */
public class RunnableTaskRequest {
  /**
   * Returns the task class name.
   * @return class name of the task to execute
   */
  public String getClassName();
  
  /**
   * Returns the task parameter.
   * @return task parameter or null if not set
   */
  @Nullable
  public RunnableTaskParam getParam();
  
  /**
   * Returns the artifact ID.
   * @return artifact ID or null if not set
   */
  @Nullable
  public ArtifactId getArtifactId();
  
  /**
   * Returns the namespace.
   * @return namespace or null if not set
   */
  @Nullable
  public String getNamespace();
  
  /**
   * Returns builder for RunnableTaskRequest.
   * @param taskClassName the class name of the task
   * @return builder instance
   */
  public static Builder getBuilder(String taskClassName);
  
  /**
   * Builder for RunnableTaskRequest.
   */
  public static class Builder {
    /**
     * Sets parameter for the task.
     * @param param parameter string
     * @return builder instance
     */
    public Builder withParam(String param);
    
    /**
     * Sets namespace for the task.
     * @param namespace namespace string
     * @return builder instance
     */
    public Builder withNamespace(String namespace);
    
    /**
     * Sets artifact ID for the task.
     * @param artifactId artifact identifier
     * @return builder instance
     */
    public Builder withArtifact(ArtifactId artifactId);
    
    /**
     * Sets embedded task request.
     * @param embeddedTaskRequest nested task request
     * @return builder instance
     */
    public Builder withEmbeddedTaskRequest(RunnableTaskRequest embeddedTaskRequest);
    
    /**
     * Builds the request.
     * @return constructed RunnableTaskRequest
     */
    public RunnableTaskRequest build();
  }
}

Usage Example:

import io.cdap.cdap.api.artifact.ArtifactId;
import io.cdap.cdap.api.artifact.ArtifactScope;
import io.cdap.cdap.api.artifact.ArtifactVersion;
import io.cdap.cdap.api.service.worker.RunnableTaskRequest;

// Create simple task request
RunnableTaskRequest simpleTask = RunnableTaskRequest
  .getBuilder("com.example.DataProcessingTask")
  .withParam("input-data")
  .withNamespace("analytics")
  .build();

// Create task request with artifact (the version is an ArtifactVersion, not a plain String)
ArtifactId artifact = new ArtifactId("my-plugin", new ArtifactVersion("1.0.0"), ArtifactScope.USER);
RunnableTaskRequest taskWithArtifact = RunnableTaskRequest
  .getBuilder("com.example.PluginTask")
  .withParam("plugin-config")
  .withArtifact(artifact)
  .withNamespace("default")
  .build();

// Create nested task request
RunnableTaskRequest embeddedTask = RunnableTaskRequest
  .getBuilder("com.example.SubTask")
  .withParam("sub-param")
  .build();

RunnableTaskRequest parentTask = RunnableTaskRequest
  .getBuilder("com.example.ParentTask")
  .withEmbeddedTaskRequest(embeddedTask)
  .build();

RunnableTaskContext

Context for RunnableTask execution, providing result writing and cleanup capabilities.

/**
 * Represents a context for a RunnableTask. This context is used for writing back 
 * the result of RunnableTask execution.
 */
public class RunnableTaskContext {
  /**
   * Constructor with task request.
   * @param taskRequest the originating task request
   */
  public RunnableTaskContext(RunnableTaskRequest taskRequest);
  
  /**
   * Constructor with task request and system app context.
   * @param taskRequest the originating task request
   * @param systemAppTaskContext system app task context (nullable)
   */
  public RunnableTaskContext(RunnableTaskRequest taskRequest, 
                           @Nullable SystemAppTaskContext systemAppTaskContext);
  
  /**
   * Writes result data.
   * @param data result data as byte array
   * @throws IOException if writing fails
   */
  public void writeResult(byte[] data) throws IOException;
  
  /**
   * Sets cleanup task to run after task completion.
   * @param cleanupTask cleanup runnable
   */
  public void setCleanupTask(Runnable cleanupTask);
  
  /**
   * Executes the cleanup task.
   */
  public void executeCleanupTask();
  
  /**
   * Sets whether to terminate the task runner on task completion.
   * @param terminate true to terminate after completion
   */
  public void setTerminateOnComplete(boolean terminate);
  
  /**
   * Returns whether the task runner should terminate after the task completes.
   * @return termination flag
   */
  public boolean isTerminateOnComplete();
  
  /**
   * Gets the result as ByteBuffer.
   * @return result buffer
   */
  public ByteBuffer getResult();
  
  /**
   * Returns the class name.
   * @return task class name
   */
  public String getClassName();
  
  /**
   * Returns the parameter.
   * @return parameter string or null
   */
  @Nullable
  public String getParam();
  
  /**
   * Returns embedded request.
   * @return embedded task request or null
   */
  @Nullable
  public RunnableTaskRequest getEmbeddedRequest();
  
  /**
   * Returns namespace.
   * @return namespace string or null
   */
  @Nullable
  public String getNamespace();
  
  /**
   * Returns artifact ID.
   * @return artifact ID or null
   */
  @Nullable
  public ArtifactId getArtifactId();
  
  /**
   * Returns the system app task context.
   * @return system app task context or null
   */
  @Nullable
  public SystemAppTaskContext getRunnableTaskSystemAppContext();
}
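
The result and cleanup methods above suggest a lifecycle: the task writes its result, the caller reads it back with getResult(), and the cleanup task runs last. The sketch below illustrates that ordering with stand-in types. SketchContext, SketchTask, and the runAndCollect driver are hypothetical and not part of CDAP, and the way the real task worker sequences these calls is an assumption here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in for RunnableTaskContext, reduced to the methods used below.
// Assumption: results are buffered in memory until the caller reads them.
class SketchContext {
  private final String param;
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
  private Runnable cleanupTask = () -> { };

  SketchContext(String param) { this.param = param; }

  String getParam() { return param; }
  void writeResult(byte[] data) throws IOException { out.write(data, 0, data.length); }
  void setCleanupTask(Runnable cleanupTask) { this.cleanupTask = cleanupTask; }
  void executeCleanupTask() { cleanupTask.run(); }
  ByteBuffer getResult() { return ByteBuffer.wrap(out.toByteArray()); }
}

// Stand-in for RunnableTask.
interface SketchTask {
  void run(SketchContext context) throws Exception;
}

final class TaskLifecycleSketch {
  // Hypothetical driver: run the task, read the result, and run cleanup
  // in a finally block so it executes whether or not the task failed.
  static String runAndCollect(SketchTask task, String param) {
    SketchContext context = new SketchContext(param);
    try {
      task.run(context);
      return StandardCharsets.UTF_8.decode(context.getResult()).toString();
    } catch (Exception e) {
      throw new IllegalStateException("Task failed", e);
    } finally {
      context.executeCleanupTask();
    }
  }
}
```

Registering the cleanup task before doing any work keeps it in place if a later step in the task throws.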

RunnableTaskParam

Parameter wrapper for runnable task requests supporting both simple strings and embedded task requests.

/**
 * Class for the parameter of RunnableTaskRequest.
 */
public class RunnableTaskParam {
  /**
   * Constructor with simple parameter and embedded task request.
   * @param simpleParam parameter string (nullable)
   * @param embeddedTaskRequest embedded task request (nullable)
   */
  public RunnableTaskParam(@Nullable String simpleParam, 
                          @Nullable RunnableTaskRequest embeddedTaskRequest);
  
  /**
   * Returns embedded task request.
   * @return embedded task request or null
   */
  @Nullable
  public RunnableTaskRequest getEmbeddedTaskRequest();
  
  /**
   * Returns simple parameter.
   * @return parameter string or null
   */
  @Nullable
  public String getSimpleParam();
  
  /**
   * String representation.
   * @return string representation
   */
  public String toString();
  
  /**
   * Equals implementation.
   * @param o object to compare
   * @return true if equal
   */
  public boolean equals(Object o);
  
  /**
   * Hash code implementation.
   * @return hash code
   */
  public int hashCode();
}
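
Because a parameter can wrap another request, requests can nest to any depth. The following sketch walks such a chain to find the innermost request. Param and Request are simplified stand-ins for RunnableTaskParam and RunnableTaskRequest, and innermost is a hypothetical helper. It assumes that each parameter carries either a simple string or an embedded request, not both.

```java
final class EmbeddedRequestSketch {
  // Stand-in for RunnableTaskParam: a simple string or a nested request.
  static final class Param {
    final String simpleParam;
    final Request embeddedTaskRequest;

    Param(String simpleParam, Request embeddedTaskRequest) {
      this.simpleParam = simpleParam;
      this.embeddedTaskRequest = embeddedTaskRequest;
    }
  }

  // Stand-in for RunnableTaskRequest, reduced to class name and parameter.
  static final class Request {
    final String className;
    final Param param;

    Request(String className, Param param) {
      this.className = className;
      this.param = param;
    }
  }

  // Follows embedded requests until reaching one that has no nested request.
  static Request innermost(Request request) {
    Request current = request;
    while (current.param != null && current.param.embeddedTaskRequest != null) {
      current = current.param.embeddedTaskRequest;
    }
    return current;
  }
}
```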

SystemAppTaskContext

System App context for remote tasks with plugin configuration, artifact management, and macro evaluation.

/**
 * System App context for a remote task.
 */
public interface SystemAppTaskContext 
    extends ServiceDiscoverer, SecureStore, AutoCloseable, FeatureFlagsProvider {
  
  /**
   * Fetches preferences for the given namespace.
   * @param namespace the namespace to get preferences for
   * @param resolved whether to return resolved preferences, that is, namespace preferences merged with values inherited from the instance level
   * @return map of preference key-value pairs
   * @throws Exception if fetching preferences fails
   */
  Map<String, String> getPreferencesForNamespace(String namespace, boolean resolved) throws Exception;
  
  /**
   * Creates a PluginConfigurer that can be used to instantiate plugins at runtime.
   * @param namespace the namespace context for plugin configuration
   * @return plugin configurer
   * @throws IOException if plugin configurer creation fails
   */
  PluginConfigurer createPluginConfigurer(String namespace) throws IOException;
  
  /**
   * Creates a ServicePluginConfigurer that can be used to instantiate plugins 
   * with macro evaluation.
   * @param namespace the namespace context for plugin configuration
   * @return service plugin configurer
   */
  ServicePluginConfigurer createServicePluginConfigurer(String namespace);
  
  /**
   * Evaluates macros using the provided macro evaluator and parsing options.
   * @param namespace the namespace context for macro evaluation
   * @param macros map of properties containing macros to evaluate
   * @param evaluator the macro evaluator to use
   * @param options macro parsing options
   * @return map with evaluated macros
   * @throws InvalidMacroException if macro evaluation fails
   */
  Map<String, String> evaluateMacros(String namespace, 
                                    Map<String, String> macros, 
                                    MacroEvaluator evaluator, 
                                    MacroParserOptions options) 
                                    throws InvalidMacroException;
  
  /**
   * Returns ArtifactManager for artifact listing and class loading.
   * @return artifact manager
   */
  ArtifactManager getArtifactManager();
  
  /**
   * Returns the name of the service.
   * @return service name
   */
  String getServiceName();
}
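
As one illustration of using the context from inside a task, the sketch below reads a tuning value from namespace preferences and falls back to a default. PreferenceSource is a stand-in for the single SystemAppTaskContext method it uses, and both the batchSize helper and the "task.batch.size" key are hypothetical names chosen for this example.

```java
import java.util.Map;

final class PreferenceLookupSketch {
  // Stand-in for SystemAppTaskContext#getPreferencesForNamespace.
  interface PreferenceSource {
    Map<String, String> getPreferencesForNamespace(String namespace, boolean resolved)
        throws Exception;
  }

  // Returns the configured batch size, or the fallback when the preference
  // is missing, is not a number, or cannot be fetched.
  static int batchSize(PreferenceSource context, String namespace, int fallback) {
    try {
      String raw = context.getPreferencesForNamespace(namespace, true).get("task.batch.size");
      return raw == null ? fallback : Integer.parseInt(raw.trim());
    } catch (Exception e) {
      // A sketch-level choice: treat any failure as "not configured".
      return fallback;
    }
  }
}
```

Swallowing the exception keeps the example short. A real task would probably log the failure or let it propagate so the caller sees it.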

Exception Handling

Specialized exception classes for handling errors in remote task execution.

/**
 * An exception class for wrapping an Exception coming from remote task execution.
 */
public class RemoteExecutionException extends Exception {
  /**
   * Constructor with remote task exception cause.
   * @param cause the remote task exception
   */
  public RemoteExecutionException(RemoteTaskException cause);
  
  /**
   * Returns the remote task exception cause.
   * @return remote task exception cause
   */
  public RemoteTaskException getCause();
  
  /**
   * Converts a BasicThrowable to a RemoteExecutionException.
   * @param basicThrowable the basic throwable to convert
   * @return converted remote execution exception
   */
  public static RemoteExecutionException fromBasicThrowable(BasicThrowable basicThrowable);
}

/**
 * Captures the stack trace of exceptions from a remote task.
 */
public class RemoteTaskException extends Exception {
  /**
   * Constructor with remote exception class name, message and cause.
   * @param remoteExceptionClassName the remote exception class name
   * @param message the exception message
   * @param cause the underlying cause (nullable)
   */
  public RemoteTaskException(String remoteExceptionClassName, String message, @Nullable Throwable cause);
  
  /**
   * Returns the remote exception class name.
   * @return remote exception class name
   */
  public String getRemoteExceptionClassName();
  
  /**
   * String representation.
   * @return string representation
   */
  public String toString();
}
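
A caller can use the remote exception class name to decide how to report a failure. The sketch below maps a failure to an HTTP status code. The two nested exception classes are stand-ins that mirror the signatures listed above so the example compiles without CDAP. The toHttpStatus helper and its mapping (400 for a remote IllegalArgumentException, 500 otherwise) are hypothetical.

```java
final class RemoteErrorSketch {
  // Stand-in mirroring the RemoteTaskException signatures listed above.
  static class RemoteTaskException extends Exception {
    private final String remoteExceptionClassName;

    RemoteTaskException(String remoteExceptionClassName, String message, Throwable cause) {
      super(message, cause);
      this.remoteExceptionClassName = remoteExceptionClassName;
    }

    String getRemoteExceptionClassName() {
      return remoteExceptionClassName;
    }
  }

  // Stand-in mirroring RemoteExecutionException, with a typed getCause().
  static class RemoteExecutionException extends Exception {
    RemoteExecutionException(RemoteTaskException cause) {
      super(cause.getMessage(), cause);
    }

    @Override
    public synchronized RemoteTaskException getCause() {
      return (RemoteTaskException) super.getCause();
    }
  }

  // Hypothetical mapping from a task failure to an HTTP status code.
  static int toHttpStatus(Exception e) {
    if (e instanceof RemoteExecutionException) {
      String remoteClass = ((RemoteExecutionException) e).getCause().getRemoteExceptionClassName();
      if (IllegalArgumentException.class.getName().equals(remoteClass)) {
        return 400; // the remote task rejected its input
      }
    }
    return 500; // anything else is reported as a server error
  }
}
```

Comparing class names as strings avoids needing the remote exception class on the caller's classpath.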

Usage Patterns

Basic Task Execution

// In a system HTTP service handler
@POST
@Path("/process")
public void processData(HttpServiceRequest request, HttpServiceResponder responder) {
  try {
    // Create task request
    RunnableTaskRequest taskRequest = RunnableTaskRequest
      .getBuilder("com.example.DataProcessingTask")
      .withParam("input-data")
      .withNamespace("analytics")
      .build();
    
    // Execute task
    byte[] result = getContext().runTask(taskRequest);
    responder.sendBytes(result, "application/json");
  } catch (Exception e) {
    responder.sendError(500, "Task execution failed: " + e.getMessage());
  }
}

Task with Cleanup

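public class ResourceIntensiveTask implements RunnableTask {
  private static final Logger LOG = LoggerFactory.getLogger(ResourceIntensiveTask.class);

  @Override
  public void run(RunnableTaskContext context) throws Exception {
    // Allocate resources. ExternalResource and allocateResource() are
    // placeholders for application-specific code.
    ExternalResource resource = allocateResource();
    
    // Register cleanup before doing any work so it is in place if the work fails
    context.setCleanupTask(() -> {
      try {
        resource.close();
        LOG.info("Resource cleaned up");
      } catch (Exception e) {
        // A Runnable cannot throw checked exceptions, so log and continue
        LOG.warn("Failed to clean up resource", e);
      }
    });
    
    // Perform work with resource. An exception thrown here propagates to the
    // caller; the cleanup task registered above will still be called.
    String result = resource.process(context.getParam());
    context.writeResult(result.getBytes(StandardCharsets.UTF_8));
  }
}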

Important Notes

  • Tasks are executed on remote worker nodes, not in the originating service
  • Task classes must be available on the worker node classpath or specified via artifact
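  • Task parameters are passed as a single string (see withParam(String)), so structured input must be serialized to a string before it is set on the request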
  • Cleanup tasks are always executed, even if the main task fails
  • Use SystemAppTaskContext for advanced plugin configuration and macro evaluation
  • Remote task execution must be enabled in CDAP configuration
  • Task results are limited by available memory and should be kept reasonably small
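
Since a task receives its parameter as one string, structured input needs an encoding that both the caller and the task agree on. The sketch below shows one possible approach: a small key/value codec built on URL encoding. TaskParamCodec is a hypothetical helper and not part of CDAP. JSON would work equally well where a JSON library is available. It assumes Java 10 or later for the Charset overloads of URLEncoder and URLDecoder.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class TaskParamCodec {
  // Encodes the map as "key=value&key=value", URL-encoding each part so
  // that '&' and '=' inside keys or values cannot break the format.
  static String encode(Map<String, String> values) {
    StringJoiner joiner = new StringJoiner("&");
    for (Map.Entry<String, String> entry : values.entrySet()) {
      joiner.add(URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8)
          + "=" + URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8));
    }
    return joiner.toString();
  }

  // Reverses encode(). A null or empty parameter yields an empty map.
  static Map<String, String> decode(String param) {
    Map<String, String> values = new LinkedHashMap<>();
    if (param == null || param.isEmpty()) {
      return values;
    }
    for (String pair : param.split("&")) {
      int eq = pair.indexOf('=');
      values.put(URLDecoder.decode(pair.substring(0, eq), StandardCharsets.UTF_8),
          URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8));
    }
    return values;
  }
}
```

The caller would pass TaskParamCodec.encode(values) to withParam, and the task would call TaskParamCodec.decode(context.getParam()) to recover the map.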

