API for CDAP System Applications — provides interfaces and abstract classes for building system services that run in the CDAP system namespace.
—
Remote task execution framework allowing system services to run tasks on worker nodes with full system context, serializable parameters, and comprehensive error handling.
Interface representing a task that can be launched by a Task worker service.
/**
* A unit of work that can be launched by a Task worker service.
* <p>
* Implementations receive their input and publish their output through the
* {@link RunnableTaskContext} passed to {@link #run(RunnableTaskContext)}.
*/
public interface RunnableTask {
/**
* Executes the task with the provided context.
* @param context the execution context for the task; it exposes request values
*        such as {@code getParam()} and {@code getNamespace()} and accepts the
*        task output through {@code writeResult(byte[])}
* @throws Exception if task execution fails
*/
void run(RunnableTaskContext context) throws Exception;
}Usage Example:
import io.cdap.cdap.api.service.worker.RunnableTask;
import io.cdap.cdap.api.service.worker.RunnableTaskContext;
import java.nio.charset.StandardCharsets;
/**
 * Example task that reads its parameter and namespace from the context,
 * processes them, and writes the outcome back as the task result.
 */
public class DataProcessingTask implements RunnableTask {
  /**
   * Runs the example processing step and publishes its result.
   *
   * @param context the execution context supplying the parameter and namespace
   *        and receiving the result bytes
   * @throws Exception if processing or writing the result fails
   */
  @Override
  public void run(RunnableTaskContext context) throws Exception {
    // Get task parameters (both accessors are declared @Nullable)
    String param = context.getParam();
    String namespace = context.getNamespace();
    // Perform task logic
    String result = processData(param, namespace);
    // Write result back. Encode explicitly as UTF-8: the no-arg getBytes() uses
    // the platform default charset, which can differ between the node that
    // produces the bytes and the one that reads them.
    context.writeResult(result.getBytes(StandardCharsets.UTF_8));
    // Set cleanup task if needed
    context.setCleanupTask(() -> {
      // Cleanup resources
    });
  }

  /**
   * Placeholder for the real task implementation.
   *
   * @param param the task parameter, possibly null
   * @param namespace the namespace the task runs for, possibly null (unused here)
   * @return the string {@code "processed: "} followed by the parameter
   */
  private String processData(String param, String namespace) {
    // Task implementation
    return "processed: " + param;
  }
}
Request object for launching a runnable task with parameters and configuration.
/**
* Request for launching a runnable task.
* <p>
* Describes which task class to run together with an optional parameter,
* artifact ID and namespace. A request can be assembled with the
* {@link Builder} returned by {@link #getBuilder(String)}.
*/
public class RunnableTaskRequest {
/**
* Returns the task class name.
* @return class name of the task to execute
*/
public String getClassName();
/**
* Returns the task parameter.
* @return the {@link RunnableTaskParam} holding the task parameter, or null if not set
*/
@Nullable
public RunnableTaskParam getParam();
/**
* Returns the artifact ID.
* @return artifact ID or null if not set
*/
@Nullable
public ArtifactId getArtifactId();
/**
* Returns the namespace.
* @return namespace or null if not set
*/
@Nullable
public String getNamespace();
/**
* Returns a builder for a RunnableTaskRequest that targets the given task class.
* @param taskClassName the class name of the task
* @return builder instance
*/
public static Builder getBuilder(String taskClassName);
/**
* Builder for RunnableTaskRequest.
* <p>
* Each {@code with...} method returns a builder so that calls can be chained
* and finished with {@link #build()}.
*/
public static class Builder {
/**
* Sets the string parameter for the task.
* @param param parameter string
* @return builder instance
*/
public Builder withParam(String param);
/**
* Sets the namespace for the task.
* @param namespace namespace string
* @return builder instance
*/
public Builder withNamespace(String namespace);
/**
* Sets the artifact ID for the task.
* @param artifactId artifact identifier
* @return builder instance
*/
public Builder withArtifact(ArtifactId artifactId);
/**
* Sets a nested task request to be carried inside this request.
* @param embeddedTaskRequest nested task request
* @return builder instance
*/
public Builder withEmbeddedTaskRequest(RunnableTaskRequest embeddedTaskRequest);
/**
* Builds the request from the values set on this builder.
* @return constructed RunnableTaskRequest
*/
public RunnableTaskRequest build();
}
}Usage Example:
import io.cdap.cdap.api.artifact.ArtifactId;
import io.cdap.cdap.api.artifact.ArtifactScope;
import io.cdap.cdap.api.artifact.ArtifactVersion;
import io.cdap.cdap.api.service.worker.RunnableTaskRequest;
// Create simple task request
RunnableTaskRequest simpleTask = RunnableTaskRequest
    .getBuilder("com.example.DataProcessingTask")
    .withParam("input-data")
    .withNamespace("analytics")
    .build();
// Create task request with artifact.
// ArtifactId expects an ArtifactVersion rather than a raw version string.
ArtifactId artifact =
    new ArtifactId("my-plugin", new ArtifactVersion("1.0.0"), ArtifactScope.USER);
RunnableTaskRequest taskWithArtifact = RunnableTaskRequest
    .getBuilder("com.example.PluginTask")
    .withParam("plugin-config")
    .withArtifact(artifact)
    .withNamespace("default")
    .build();
// Create nested task request: build the inner request first, then embed it
RunnableTaskRequest embeddedTask = RunnableTaskRequest
    .getBuilder("com.example.SubTask")
    .withParam("sub-param")
    .build();
RunnableTaskRequest parentTask = RunnableTaskRequest
    .getBuilder("com.example.ParentTask")
    .withEmbeddedTaskRequest(embeddedTask)
    .build();
Context for RunnableTask execution, providing result writing and cleanup capabilities.
/**
* Represents a context for a RunnableTask. This context is used for writing back
* the result of RunnableTask execution.
* <p>
* Besides result writing it lets a task register a cleanup task, request
* termination of the task runner on completion, and read the class name,
* parameter, embedded request, namespace and artifact ID.
*/
public class RunnableTaskContext {
/**
* Creates a context for the given task request.
* @param taskRequest the originating task request
*/
public RunnableTaskContext(RunnableTaskRequest taskRequest);
/**
* Creates a context for the given task request with an optional system app context.
* @param taskRequest the originating task request
* @param systemAppTaskContext system app task context (nullable)
*/
public RunnableTaskContext(RunnableTaskRequest taskRequest,
@Nullable SystemAppTaskContext systemAppTaskContext);
/**
* Writes result data.
* @param data result data as byte array
* @throws IOException if writing fails
*/
public void writeResult(byte[] data) throws IOException;
/**
* Sets cleanup task to run after task completion.
* @param cleanupTask cleanup runnable
*/
public void setCleanupTask(Runnable cleanupTask);
/**
* Executes the cleanup task.
*/
public void executeCleanupTask();
/**
* Sets whether to terminate the task runner on task completion.
* @param terminate true to terminate after completion
*/
public void setTerminateOnComplete(boolean terminate);
/**
* Returns whether the task runner should be terminated after the task completes.
* @return termination flag
*/
public boolean isTerminateOnComplete();
/**
* Gets the result as ByteBuffer.
* @return result buffer
*/
public ByteBuffer getResult();
/**
* Returns the class name.
* @return task class name
*/
public String getClassName();
/**
* Returns the parameter.
* @return parameter string or null
*/
@Nullable
public String getParam();
/**
* Returns the embedded request.
* @return embedded task request or null
*/
@Nullable
public RunnableTaskRequest getEmbeddedRequest();
/**
* Returns the namespace.
* @return namespace string or null
*/
@Nullable
public String getNamespace();
/**
* Returns the artifact ID.
* @return artifact ID or null
*/
@Nullable
public ArtifactId getArtifactId();
/**
* Returns the system app task context.
* @return system app task context or null
*/
@Nullable
public SystemAppTaskContext getRunnableTaskSystemAppContext();
}Parameter wrapper for runnable task requests supporting both simple strings and embedded task requests.
/**
* Class for the parameter of RunnableTaskRequest.
* <p>
* Holds a simple string parameter and an embedded task request; both
* constructor arguments are declared nullable.
*/
public class RunnableTaskParam {
/**
* Creates a parameter from a simple string and an embedded task request.
* @param simpleParam parameter string (nullable)
* @param embeddedTaskRequest embedded task request (nullable)
*/
public RunnableTaskParam(@Nullable String simpleParam,
@Nullable RunnableTaskRequest embeddedTaskRequest);
/**
* Returns the embedded task request.
* @return embedded task request or null
*/
@Nullable
public RunnableTaskRequest getEmbeddedTaskRequest();
/**
* Returns the simple parameter.
* @return parameter string or null
*/
@Nullable
public String getSimpleParam();
/**
* Returns a string representation of this parameter.
* @return string representation
*/
public String toString();
/**
* Compares this parameter with another object for equality.
* @param o object to compare
* @return true if equal
*/
public boolean equals(Object o);
/**
* Returns the hash code of this parameter.
* @return hash code
*/
public int hashCode();
}System App context for remote tasks with plugin configuration, artifact management, and macro evaluation.
/**
* System App context for a remote task.
* <p>
* In addition to the methods declared here, the context inherits the
* operations of ServiceDiscoverer, SecureStore and FeatureFlagsProvider. It is
* also AutoCloseable, so it can be managed with try-with-resources.
*/
public interface SystemAppTaskContext
extends ServiceDiscoverer, SecureStore, AutoCloseable, FeatureFlagsProvider {
/**
* Fetches preferences for the given namespace.
* @param namespace the namespace to get preferences for
* @param resolved whether to resolve macros in preference values
* @return map of preference key-value pairs
* @throws Exception if fetching preferences fails
*/
Map<String, String> getPreferencesForNamespace(String namespace, boolean resolved) throws Exception;
/**
* Creates a PluginConfigurer that can be used to instantiate plugins at runtime.
* @param namespace the namespace context for plugin configuration
* @return plugin configurer
* @throws IOException if plugin configurer creation fails
*/
PluginConfigurer createPluginConfigurer(String namespace) throws IOException;
/**
* Creates a ServicePluginConfigurer that can be used to instantiate plugins
* with macro evaluation.
* @param namespace the namespace context for plugin configuration
* @return service plugin configurer
*/
ServicePluginConfigurer createServicePluginConfigurer(String namespace);
/**
* Evaluates macros using the provided macro evaluator and parsing options.
* @param namespace the namespace context for macro evaluation
* @param macros map of properties containing macros to evaluate
* @param evaluator the macro evaluator to use
* @param options macro parsing options
* @return map with evaluated macros
* @throws InvalidMacroException if macro evaluation fails
*/
Map<String, String> evaluateMacros(String namespace,
Map<String, String> macros,
MacroEvaluator evaluator,
MacroParserOptions options)
throws InvalidMacroException;
/**
* Returns the ArtifactManager for artifact listing and class loading.
* @return artifact manager
*/
ArtifactManager getArtifactManager();
/**
* Returns the service name.
* @return service name
*/
String getServiceName();
}Specialized exception classes for handling errors in remote task execution.
/**
* An exception class for wrapping an Exception coming from remote task execution.
* <p>
* The wrapped cause is always a {@link RemoteTaskException}.
*/
public class RemoteExecutionException extends Exception {
/**
* Creates an exception wrapping the given remote task exception.
* @param cause the remote task exception
*/
public RemoteExecutionException(RemoteTaskException cause);
/**
* Returns the remote task exception cause. The return type is narrowed from
* {@link Throwable} to {@link RemoteTaskException}, so callers need no cast.
* @return remote task exception cause
*/
public RemoteTaskException getCause();
/**
* Converts a BasicThrowable to a RemoteExecutionException.
* @param basicThrowable the basic throwable to convert
* @return converted remote execution exception
*/
public static RemoteExecutionException fromBasicThrowable(BasicThrowable basicThrowable);
}
/**
* Captures the stacktrace of exceptions from remote task.
* <p>
* Also records the class name of the exception that was raised remotely.
*/
public class RemoteTaskException extends Exception {
/**
* Creates an exception from a remote exception class name, message and cause.
* @param remoteExceptionClassName the remote exception class name
* @param message the exception message
* @param cause the underlying cause (nullable)
*/
public RemoteTaskException(String remoteExceptionClassName, String message, @Nullable Throwable cause);
/**
* Returns the remote exception class name.
* @return remote exception class name
*/
public String getRemoteExceptionClassName();
/**
* Returns a string representation of this exception.
* @return string representation
*/
public String toString();
}
// In a system HTTP service handler
/**
 * Example endpoint that builds a task request, runs it through the service
 * context and returns the raw result bytes to the HTTP caller.
 *
 * @param request the incoming HTTP request (not inspected by this example)
 * @param responder used to send either the task result or a 500 error
 */
@POST
@Path("/process")
public void processData(HttpServiceRequest request, HttpServiceResponder responder) {
  try {
    // Create task request
    RunnableTaskRequest taskRequest = RunnableTaskRequest
        .getBuilder("com.example.DataProcessingTask")
        .withParam("input-data")
        .withNamespace("analytics")
        .build();
    // Execute task
    byte[] result = getContext().runTask(taskRequest);
    // NOTE(review): confirm that HttpServiceResponder exposes sendBytes; the
    // ByteBuffer-based send(...) overload may be the intended call.
    responder.sendBytes(result, "application/json");
  } catch (Exception e) {
    // Boundary of the handler: report any failure to the caller as a 500.
    responder.sendError(500, "Task execution failed: " + e.getMessage());
  }
}
/**
 * Example task that registers a cleanup task for an external resource before
 * using that resource to produce the task result.
 */
public class ResourceIntensiveTask implements RunnableTask {
  /**
   * Allocates the resource, registers its cleanup, then processes the task
   * parameter and writes the result.
   *
   * @param context the execution context supplying the parameter and receiving
   *        the result and the cleanup task
   * @throws Exception if allocation, processing or result writing fails
   */
  @Override
  public void run(RunnableTaskContext context) throws Exception {
    // Allocate resources
    ExternalResource resource = allocateResource();
    // Set cleanup task before doing any work with the resource
    context.setCleanupTask(() -> {
      resource.close();
      LOG.info("Resource cleaned up");
    });
    // Perform work with resource. No catch-and-rethrow is needed: an exception
    // propagates unchanged, and cleanup will still be called.
    String result = resource.process(context.getParam());
    // Encode explicitly as UTF-8 rather than relying on the platform default charset.
    context.writeResult(result.getBytes(java.nio.charset.StandardCharsets.UTF_8));
  }
}
SystemAppTaskContext for advanced plugin configuration and macro evaluation
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-system-app-api