Temporal Workflow Java SDK - A framework for authoring Workflows and Activities in Java
Comprehensive APIs for implementing activities that handle external service calls, with built-in retry logic, heartbeat support, and async completion patterns.
Central utility class providing access to activity execution context and exception handling.
/**
* Central utility class for Activity execution, providing access to execution context and exception wrapping.
*/
public final class Activity {
/**
* Returns the ActivityExecutionContext for the current activity thread.
* @return the current activity execution context
* @throws IllegalStateException if called outside of activity execution
*/
public static ActivityExecutionContext getExecutionContext();
/**
* Wraps checked exceptions as RuntimeException for rethrowing from activities.
* @param e the exception to wrap
* @return RuntimeException wrapping the original exception
*/
public static RuntimeException wrap(Throwable e);
}Usage Examples:
@ActivityInterface
public interface FileProcessingActivity {
@ActivityMethod
String processFile(String filename);
}
public class FileProcessingActivityImpl implements FileProcessingActivity {
@Override
public String processFile(String filename) {
ActivityExecutionContext context = Activity.getExecutionContext();
try {
// Long running file processing
for (int i = 0; i < 100; i++) {
// Send heartbeat every 10 iterations
if (i % 10 == 0) {
context.heartbeat("Processing: " + i + "%");
}
// Process file chunk
Thread.sleep(1000);
}
return "File processed: " + filename;
} catch (InterruptedException e) {
throw Activity.wrap(e);
}
}
}Context object providing information about the current Activity execution and tools for heartbeating and async completion.
/**
* Context object providing information about the current Activity execution and tools for heartbeating and async completion.
*/
public interface ActivityExecutionContext {
/**
* Returns ActivityInfo with execution details.
* @return information about the current activity execution
*/
ActivityInfo getInfo();
/**
* Sends heartbeat with optional details, may throw ActivityCompletionException.
* @param details heartbeat details to send
* @throws ActivityCompletionException if activity should be completed
*/
<V> void heartbeat(V details) throws ActivityCompletionException;
/**
* Gets heartbeat details from last heartbeat or failed attempt.
* @param detailsClass class of the heartbeat details
* @return optional heartbeat details
*/
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass);
/**
* Type-safe version with generic type info for getting heartbeat details.
* @param detailsClass class of the heartbeat details
* @param detailsGenericType generic type information
* @return optional heartbeat details
*/
<V> Optional<V> getHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);
/**
* Gets details from last failed attempt only.
* @param detailsClass class of the heartbeat details
* @return optional heartbeat details from last failed attempt
*/
<V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass);
/**
* Type-safe version for getting details from last failed attempt.
* @param detailsClass class of the heartbeat details
* @param detailsGenericType generic type information
* @return optional heartbeat details from last failed attempt
*/
<V> Optional<V> getLastHeartbeatDetails(Class<V> detailsClass, Type detailsGenericType);
/**
* Returns task token for async completion.
* @return task token bytes
*/
byte[] getTaskToken();
/**
* Marks activity for async completion.
*/
void doNotCompleteOnReturn();
/**
* Checks if async completion is enabled.
* @return true if async completion is enabled
*/
boolean isDoNotCompleteOnReturn();
/**
* Returns ManualActivityCompletionClient for local async completion.
* @return client for manual activity completion
*/
ManualActivityCompletionClient useLocalManualCompletion();
/**
* Checks if local manual completion is enabled.
* @return true if local manual completion is enabled
*/
boolean isUseLocalManualCompletion();
/**
* Returns metrics scope for business metrics.
* @return metrics scope
*/
Scope getMetricsScope();
/**
* Returns WorkflowClient for service interaction.
* @return workflow client instance
*/
WorkflowClient getWorkflowClient();
/**
* Returns the currently running activity instance.
* @return activity instance
*/
Object getInstance();
}Usage Examples:
public class DataProcessingActivityImpl implements DataProcessingActivity {
@Override
public ProcessingResult processLargeDataset(DatasetRequest request) {
ActivityExecutionContext context = Activity.getExecutionContext();
ActivityInfo info = context.getInfo();
// Check for previous progress from heartbeat details
Optional<ProcessingState> previousState = context.getHeartbeatDetails(ProcessingState.class);
int startIndex = previousState.map(s -> s.getProcessedCount()).orElse(0);
for (int i = startIndex; i < request.getTotalItems(); i++) {
// Process item
processItem(request.getItem(i));
// Send heartbeat every 100 items
if (i % 100 == 0) {
ProcessingState state = new ProcessingState(i, request.getTotalItems());
context.heartbeat(state);
}
}
return new ProcessingResult(request.getTotalItems());
}
}Information about the Activity Task being executed.
/**
* Information about the Activity Task being executed.
*/
public interface ActivityInfo {
/**
* Task token for async completion.
* @return task token bytes
*/
byte[] getTaskToken();
/**
* Workflow ID that scheduled this activity.
* @return workflow ID
*/
String getWorkflowId();
/**
* Run ID of the scheduling workflow.
* @return workflow run ID
*/
String getRunId();
/**
* Activity execution ID.
* @return activity ID
*/
String getActivityId();
/**
* Activity type name.
* @return activity type
*/
String getActivityType();
/**
* When activity was initially scheduled (UNIX epoch ms).
* @return scheduled timestamp in milliseconds
*/
long getScheduledTimestamp();
/**
* When current attempt started (UNIX epoch ms).
* @return started timestamp in milliseconds
*/
long getStartedTimestamp();
/**
* When current attempt was scheduled (UNIX epoch ms).
* @return current attempt scheduled timestamp in milliseconds
*/
long getCurrentAttemptScheduledTimestamp();
/**
* Schedule-to-close timeout duration.
* @return schedule to close timeout
*/
Duration getScheduleToCloseTimeout();
/**
* Start-to-close timeout duration.
* @return start to close timeout
*/
Duration getStartToCloseTimeout();
/**
* Heartbeat timeout duration.
* @return heartbeat timeout
*/
Duration getHeartbeatTimeout();
/**
* Raw heartbeat details as Payloads.
* @return optional payloads containing heartbeat details
*/
Optional<Payloads> getHeartbeatDetails();
/**
* Type of the scheduling workflow.
* @return workflow type
*/
String getWorkflowType();
/**
* Namespace of the activity execution.
* @return namespace name
*/
String getNamespace();
/**
* Task queue name.
* @return task queue name
*/
String getActivityTaskQueue();
/**
* Current attempt number (starts at 1).
* @return attempt number
*/
int getAttempt();
/**
* Whether this is a local activity.
* @return true if local activity
*/
boolean isLocal();
/**
* Priority of the activity task (Experimental).
* @return task priority
*/
@Experimental
Priority getPriority();
/**
* Retry options for the activity.
* @return retry options
*/
RetryOptions getRetryOptions();
}Interface for implementing activities that can handle any activity type dynamically.
/**
* Interface for implementing activities that can handle any activity type dynamically.
*/
public interface DynamicActivity {
/**
* Executes the dynamic activity with encoded arguments.
* @param args encoded activity arguments
* @return activity result
*/
Object execute(EncodedValues args);
}Usage Examples:
public class GenericActivityImpl implements DynamicActivity {
@Override
public Object execute(EncodedValues args) {
ActivityInfo info = Activity.getExecutionContext().getInfo();
String activityType = info.getActivityType();
switch (activityType) {
case "SendEmail":
return handleSendEmail(args);
case "ProcessPayment":
return handleProcessPayment(args);
case "GenerateReport":
return handleGenerateReport(args);
default:
throw new IllegalArgumentException("Unknown activity type: " + activityType);
}
}
private Object handleSendEmail(EncodedValues args) {
// Extract arguments and send email
String recipient = args.get(0, String.class);
String subject = args.get(1, String.class);
String body = args.get(2, String.class);
// Send email logic...
return "Email sent to " + recipient;
}
}Client for manually completing activities that were marked with doNotCompleteOnReturn().
/**
* Client for manually completing activities that were marked with doNotCompleteOnReturn().
*/
public interface ManualActivityCompletionClient {
/**
* Completes activity successfully with result.
* @param result the activity result
*/
void complete(@Nullable Object result);
/**
* Completes activity with failure.
* @param failure the failure to report
*/
void fail(@Nonnull Throwable failure);
/**
* Records heartbeat, may throw CanceledFailure.
* @param details heartbeat details
* @throws CanceledFailure if activity was canceled
*/
void recordHeartbeat(@Nullable Object details) throws CanceledFailure;
/**
* Reports successful cancellation.
* @param details cancellation details
*/
void reportCancellation(@Nullable Object details);
}Usage Examples:
public class AsyncProcessingActivityImpl implements AsyncProcessingActivity {
@Override
public String startAsyncProcess(String processId) {
ActivityExecutionContext context = Activity.getExecutionContext();
context.doNotCompleteOnReturn();
ManualActivityCompletionClient completionClient = context.useLocalManualCompletion();
// Start async process in separate thread
CompletableFuture.runAsync(() -> {
try {
// Long running async operation
String result = performAsyncOperation(processId);
completionClient.complete(result);
} catch (Exception e) {
completionClient.fail(e);
}
});
return null; // Activity will complete asynchronously
}
}Configuration options for activity invocation.
/**
* Configuration options for activity invocation.
*/
public final class ActivityOptions {
/**
* Creates new builder.
* @return new ActivityOptions builder
*/
public static Builder newBuilder();
/**
* Creates builder from existing options.
* @param options existing options to copy
* @return new builder with copied options
*/
public static Builder newBuilder(ActivityOptions options);
/**
* Returns default instance (deprecated).
* @return default ActivityOptions instance
* @deprecated Use newBuilder() instead
*/
@Deprecated
public static ActivityOptions getDefaultInstance();
/**
* Builder for ActivityOptions.
*/
public static final class Builder {
/**
* Total time willing to wait including retries.
* @param timeout schedule to close timeout
* @return this builder
*/
public Builder setScheduleToCloseTimeout(Duration timeout);
/**
* Time activity can stay in queue.
* @param timeout schedule to start timeout
* @return this builder
*/
public Builder setScheduleToStartTimeout(Duration timeout);
/**
* Maximum time for single attempt.
* @param timeout start to close timeout
* @return this builder
*/
public Builder setStartToCloseTimeout(Duration timeout);
/**
* Heartbeat interval requirement.
* @param timeout heartbeat timeout
* @return this builder
*/
public Builder setHeartbeatTimeout(Duration timeout);
/**
* Task queue for activity dispatch.
* @param taskQueue task queue name
* @return this builder
*/
public Builder setTaskQueue(String taskQueue);
/**
* Retry policy configuration.
* @param retryOptions retry options
* @return this builder
*/
public Builder setRetryOptions(RetryOptions retryOptions);
/**
* Context propagation overrides.
* @param contextPropagators list of context propagators
* @return this builder
*/
public Builder setContextPropagators(List<ContextPropagator> contextPropagators);
/**
* Cancellation behavior.
* @param cancellationType cancellation type
* @return this builder
*/
public Builder setCancellationType(ActivityCancellationType cancellationType);
/**
* Disable eager execution.
* @param disable true to disable eager execution
* @return this builder
*/
public Builder setDisableEagerExecution(boolean disable);
/**
* Summary for UI/CLI (Experimental).
* @param summary activity summary
* @return this builder
*/
@Experimental
public Builder setSummary(String summary);
/**
* Priority settings (Experimental).
* @param priority task priority
* @return this builder
*/
@Experimental
public Builder setPriority(Priority priority);
/**
* Build the ActivityOptions.
* @return configured ActivityOptions
*/
public ActivityOptions build();
}
}Configuration options for local activity invocation.
/**
* Configuration options for local activity invocation.
*/
public final class LocalActivityOptions {
/**
* Creates new builder.
* @return new LocalActivityOptions builder
*/
public static Builder newBuilder();
/**
* Creates builder from existing options.
* @param options existing options to copy
* @return new builder with copied options
*/
public static Builder newBuilder(LocalActivityOptions options);
/**
* Returns default instance.
* @return default LocalActivityOptions
*/
public static LocalActivityOptions getDefaultInstance();
/**
* Builder for LocalActivityOptions.
*/
public static final class Builder {
/**
* Total time including retries.
* @param timeout schedule to close timeout
* @return this builder
*/
public Builder setScheduleToCloseTimeout(Duration timeout);
/**
* Time in local queue.
* @param timeout schedule to start timeout
* @return this builder
*/
public Builder setScheduleToStartTimeout(Duration timeout);
/**
* Maximum single attempt time.
* @param timeout start to close timeout
* @return this builder
*/
public Builder setStartToCloseTimeout(Duration timeout);
/**
* Max local retry delay before using workflow timer.
* @param threshold local retry threshold
* @return this builder
*/
public Builder setLocalRetryThreshold(Duration threshold);
/**
* Retry policy.
* @param retryOptions retry options
* @return this builder
*/
public Builder setRetryOptions(RetryOptions retryOptions);
/**
* Exclude arguments from history marker.
* @param doNotInclude true to exclude arguments
* @return this builder
*/
public Builder setDoNotIncludeArgumentsIntoMarker(boolean doNotInclude);
/**
* Summary for UI/CLI (Experimental).
* @param summary activity summary
* @return this builder
*/
@Experimental
public Builder setSummary(String summary);
/**
* Build the LocalActivityOptions.
* @return configured LocalActivityOptions
*/
public LocalActivityOptions build();
}
}Defines behavior when an activity's call scope is cancelled.
/**
* Defines behavior when an activity's call scope is cancelled.
*/
public enum ActivityCancellationType {
/**
* Wait for activity to confirm cancellation via heartbeat.
*/
WAIT_CANCELLATION_COMPLETED,
/**
* Send cancellation request and immediately fail with CanceledFailure.
*/
TRY_CANCEL,
/**
* Don't request cancellation, immediately fail with CanceledFailure.
*/
ABANDON
}Annotations for defining activity interfaces and methods.
/**
* Marks an interface as an activity interface for workflow stub creation.
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ActivityInterface {
/**
* Prefix for activity type names.
* @return name prefix (default: empty string)
*/
String namePrefix() default "";
}
/**
* Overrides the default activity type name for a method.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ActivityMethod {
/**
* Custom activity type name.
* @return activity type name (default: method name with first letter capitalized)
*/
String name() default "";
}Usage Examples:
@ActivityInterface
public interface OrderProcessingActivities {
@ActivityMethod(name = "ValidatePayment")
boolean validatePayment(PaymentInfo payment);
@ActivityMethod
void sendConfirmationEmail(String email, OrderInfo order);
@ActivityMethod
ShippingInfo calculateShipping(Address address, List<OrderItem> items);
}
@ActivityInterface(namePrefix = "Notification")
public interface NotificationActivities {
@ActivityMethod // Will be "NotificationSendEmail"
void sendEmail(EmailRequest request);
@ActivityMethod // Will be "NotificationSendSms"
void sendSms(SmsRequest request);
}Install with Tessl CLI
npx tessl i tessl/maven-io-temporal--temporal-sdk