CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-temporal--temporal-sdk

Temporal Workflow Java SDK - A framework for authoring Workflows and Activities in Java

Overview
Eval results
Files

workers.mddocs/

Worker Configuration and Management

APIs for configuring and managing Temporal workers that execute workflow and activity code, with options for concurrency, task routing, and performance tuning.

Capabilities

Worker Factory

Factory for creating and managing worker instances.

/**
 * Factory for creating and managing worker instances.
 */
public interface WorkerFactory {
    /**
     * Creates worker factory with workflow client.
     * @param workflowClient workflow client instance
     * @return worker factory
     */
    static WorkerFactory newInstance(WorkflowClient workflowClient);

    /**
     * Creates worker factory with options.
     * @param workflowClient workflow client instance
     * @param factoryOptions factory options
     * @return worker factory
     */
    static WorkerFactory newInstance(WorkflowClient workflowClient, WorkerFactoryOptions factoryOptions);

    /**
     * Creates worker for task queue.
     * @param taskQueue task queue name
     * @return worker instance
     */
    Worker newWorker(String taskQueue);

    /**
     * Creates worker with options.
     * @param taskQueue task queue name
     * @param options worker options
     * @return worker instance
     */
    Worker newWorker(String taskQueue, WorkerOptions options);

    /**
     * Gets workflow client.
     * @return workflow client
     */
    WorkflowClient getWorkflowClient();

    /**
     * Starts all workers in this factory.
     */
    void start();

    /**
     * Shuts down all workers.
     */
    void shutdown();

    /**
     * Shuts down and waits for termination.
     * @param timeout maximum wait time
     * @param unit time unit
     * @return true if terminated within timeout
     */
    boolean shutdownAndAwaitTermination(long timeout, TimeUnit unit);

    /**
     * Awaits termination of all workers.
     * @param timeout maximum wait time
     * @param unit time unit
     * @return true if terminated within timeout
     */
    boolean awaitTermination(long timeout, TimeUnit unit);

    /**
     * Checks if factory is started.
     * @return true if started
     */
    boolean isStarted();

    /**
     * Checks if factory is shutdown.
     * @return true if shutdown
     */
    boolean isShutdown();

    /**
     * Checks if factory is terminated.
     * @return true if terminated
     */
    boolean isTerminated();

    /**
     * Suspends polling on all workers.
     */
    @Experimental
    void suspendPolling();

    /**
     * Resumes polling on all workers.
     */
    @Experimental
    void resumePolling();
}

Usage Examples:

public class WorkerFactoryExample {
    public static void main(String[] args) {
        // Create workflow client
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
        WorkflowClient client = WorkflowClient.newInstance(service);

        // Create worker factory
        WorkerFactory factory = WorkerFactory.newInstance(client);

        // Create workers for different task queues
        Worker orderWorker = factory.newWorker("order-processing");
        orderWorker.registerWorkflowImplementationTypes(OrderWorkflowImpl.class);
        orderWorker.registerActivitiesImplementations(new OrderActivitiesImpl());

        Worker paymentWorker = factory.newWorker("payment-processing",
            WorkerOptions.newBuilder()
                .setMaxConcurrentActivityExecutions(10)
                .setMaxConcurrentWorkflowTaskExecutions(5)
                .build()
        );
        paymentWorker.registerWorkflowImplementationTypes(PaymentWorkflowImpl.class);
        paymentWorker.registerActivitiesImplementations(new PaymentActivitiesImpl());

        // Start all workers
        factory.start();

        // Shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            factory.shutdown();
            try {
                if (!factory.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.err.println("Workers did not terminate within timeout");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        System.out.println("Workers started");
    }
}

Worker

Individual worker for executing workflows and activities.

/**
 * Individual worker for executing workflows and activities.
 */
public interface Worker {
    /**
     * Registers workflow implementation types.
     * @param workflowImplementationClasses workflow implementation classes
     */
    void registerWorkflowImplementationTypes(Class<?>... workflowImplementationClasses);

    /**
     * Registers workflow implementation types with options.
     * @param options registration options
     * @param workflowImplementationClasses workflow implementation classes
     */
    void registerWorkflowImplementationTypes(WorkflowImplementationOptions options, Class<?>... workflowImplementationClasses);

    /**
     * Registers activities implementations.
     * @param activityImplementations activity implementation instances
     */
    void registerActivitiesImplementations(Object... activityImplementations);

    /**
     * Registers activities implementations with options.
     * @param options registration options
     * @param activityImplementations activity implementation instances
     */
    void registerActivitiesImplementations(ActivityImplementationOptions options, Object... activityImplementations);

    /**
     * Adds workflow implementation factory.
     * @param workflowInterface workflow interface class
     * @param factory workflow instance factory
     */
    <R> void addWorkflowImplementationFactory(Class<R> workflowInterface, Func<R> factory);

    /**
     * Adds workflow implementation factory with options.
     * @param options registration options
     * @param workflowInterface workflow interface class
     * @param factory workflow instance factory
     */
    <R> void addWorkflowImplementationFactory(WorkflowImplementationOptions options, Class<R> workflowInterface, Func<R> factory);

    /**
     * Adds activity implementation factory.
     * @param activityInterface activity interface class
     * @param factory activity instance factory
     */
    <R> void addActivityImplementationFactory(Class<R> activityInterface, Func<R> factory);

    /**
     * Adds activity implementation factory with options.
     * @param options registration options
     * @param activityInterface activity interface class
     * @param factory activity instance factory
     */
    <R> void addActivityImplementationFactory(ActivityImplementationOptions options, Class<R> activityInterface, Func<R> factory);

    /**
     * Starts the worker.
     */
    void start();

    /**
     * Shuts down the worker.
     */
    void shutdown();

    /**
     * Awaits worker termination.
     * @param timeout maximum wait time
     * @param unit time unit
     * @return true if terminated within timeout
     */
    boolean awaitTermination(long timeout, TimeUnit unit);

    /**
     * Checks if worker is started.
     * @return true if started
     */
    boolean isStarted();

    /**
     * Checks if worker is shutdown.
     * @return true if shutdown
     */
    boolean isShutdown();

    /**
     * Checks if worker is terminated.
     * @return true if terminated
     */
    boolean isTerminated();

    /**
     * Suspends polling for new tasks.
     */
    @Experimental
    void suspendPolling();

    /**
     * Resumes polling for new tasks.
     */
    @Experimental
    void resumePolling();

    /**
     * Checks if polling is suspended.
     * @return true if polling is suspended
     */
    @Experimental
    boolean isPollingSupported();

    /**
     * Gets task queue name.
     * @return task queue name
     */
    String getTaskQueue();

    /**
     * Gets worker options.
     * @return worker options
     */
    WorkerOptions getOptions();
}

Usage Examples:

public class WorkerExample {
    public void setupWorkers() {
        WorkerFactory factory = WorkerFactory.newInstance(client);

        // Basic worker setup
        Worker basicWorker = factory.newWorker("basic-tasks");
        basicWorker.registerWorkflowImplementationTypes(BasicWorkflowImpl.class);
        basicWorker.registerActivitiesImplementations(new BasicActivitiesImpl());

        // Worker with custom options
        Worker customWorker = factory.newWorker("custom-tasks",
            WorkerOptions.newBuilder()
                .setMaxConcurrentWorkflowTaskExecutions(20)
                .setMaxConcurrentActivityExecutions(50)
                .setWorkerActivationTimeout(Duration.ofSeconds(30))
                .build()
        );

        // Register multiple workflow types
        customWorker.registerWorkflowImplementationTypes(
            OrderProcessingWorkflowImpl.class,
            InventoryWorkflowImpl.class,
            ShippingWorkflowImpl.class
        );

        // Register activities with factory
        customWorker.addActivityImplementationFactory(
            PaymentActivities.class,
            () -> new PaymentActivitiesImpl(paymentService)
        );

        // Register workflow with factory for dependency injection
        customWorker.addWorkflowImplementationFactory(
            ComplexWorkflow.class,
            () -> new ComplexWorkflowImpl(configService.getConfig())
        );

        factory.start();
    }

    public void demonstrateWorkerLifecycle() {
        Worker worker = factory.newWorker("lifecycle-demo");

        // Register implementations
        worker.registerWorkflowImplementationTypes(DemoWorkflowImpl.class);

        // Check status
        assert !worker.isStarted();
        assert !worker.isShutdown();

        // Start worker
        worker.start();
        assert worker.isStarted();

        // Suspend polling during maintenance
        worker.suspendPolling();

        // Resume after maintenance
        worker.resumePolling();

        // Shutdown gracefully
        worker.shutdown();
        try {
            if (!worker.awaitTermination(30, TimeUnit.SECONDS)) {
                System.err.println("Worker did not terminate within timeout");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Worker Options

Configuration options for worker behavior including concurrency limits and timeouts.

/**
 * Configuration options for worker behavior including concurrency limits and timeouts.
 */
public final class WorkerOptions {
    /**
     * Creates new builder.
     * @return new WorkerOptions builder
     */
    public static Builder newBuilder();

    /**
     * Creates builder from existing options.
     * @param options existing options to copy
     * @return new builder with copied options
     */
    public static Builder newBuilder(WorkerOptions options);

    /**
     * Returns default instance.
     * @return default WorkerOptions
     */
    public static WorkerOptions getDefaultInstance();

    /**
     * Builder for WorkerOptions.
     */
    public static final class Builder {
        /**
         * Maximum concurrent workflow task executions.
         * @param maxConcurrentWorkflowTaskExecutions maximum concurrent workflow tasks
         * @return this builder
         */
        public Builder setMaxConcurrentWorkflowTaskExecutions(int maxConcurrentWorkflowTaskExecutions);

        /**
         * Maximum concurrent activity executions.
         * @param maxConcurrentActivityExecutions maximum concurrent activities
         * @return this builder
         */
        public Builder setMaxConcurrentActivityExecutions(int maxConcurrentActivityExecutions);

        /**
         * Maximum concurrent local activity executions.
         * @param maxConcurrentLocalActivityExecutions maximum concurrent local activities
         * @return this builder
         */
        public Builder setMaxConcurrentLocalActivityExecutions(int maxConcurrentLocalActivityExecutions);

        /**
         * Maximum workflow task queue poll thread count.
         * @param maxWorkerActivitiesPerSecond max activities per second
         * @return this builder
         */
        public Builder setMaxWorkerActivitiesPerSecond(double maxWorkerActivitiesPerSecond);

        /**
         * Maximum activity task queue poll thread count.
         * @param maxTaskQueueActivitiesPerSecond max task queue activities per second
         * @return this builder
         */
        public Builder setMaxTaskQueueActivitiesPerSecond(double maxTaskQueueActivitiesPerSecond);

        /**
         * Default workflow task start-to-close timeout.
         * @param defaultWorkflowTaskStartToCloseTimeout default timeout
         * @return this builder
         */
        public Builder setDefaultWorkflowTaskStartToCloseTimeout(Duration defaultWorkflowTaskStartToCloseTimeout);

        /**
         * Default activity task start-to-close timeout.
         * @param defaultActivityTaskStartToCloseTimeout default timeout
         * @return this builder
         */
        public Builder setDefaultActivityTaskStartToCloseTimeout(Duration defaultActivityTaskStartToCloseTimeout);

        /**
         * Default local activity task start-to-close timeout.
         * @param defaultLocalActivityStartToCloseTimeout default timeout
         * @return this builder
         */
        public Builder setDefaultLocalActivityStartToCloseTimeout(Duration defaultLocalActivityStartToCloseTimeout);

        /**
         * Task queue poll timeout.
         * @param taskQueuePollTimeout poll timeout
         * @return this builder
         */
        public Builder setTaskQueuePollTimeout(Duration taskQueuePollTimeout);

        /**
         * Enable logging of replay events.
         * @param enableLoggingInReplay true to enable
         * @return this builder
         */
        public Builder setEnableLoggingInReplay(boolean enableLoggingInReplay);

        /**
         * Sticky queue schedule-to-start timeout.
         * @param stickyQueueScheduleToStartTimeout sticky timeout
         * @return this builder
         */
        public Builder setStickyQueueScheduleToStartTimeout(Duration stickyQueueScheduleToStartTimeout);

        /**
         * Disable sticky execution.
         * @param disableStickyExecution true to disable
         * @return this builder
         */
        public Builder setDisableStickyExecution(boolean disableStickyExecution);

        /**
         * Worker activation timeout.
         * @param workerActivationTimeout activation timeout
         * @return this builder
         */
        public Builder setWorkerActivationTimeout(Duration workerActivationTimeout);

        /**
         * Local activity worker only mode.
         * @param localActivityWorkerOnly true for local activity only
         * @return this builder
         */
        public Builder setLocalActivityWorkerOnly(boolean localActivityWorkerOnly);

        /**
         * Default deadlock detection timeout.
         * @param defaultDeadlockDetectionTimeout deadlock timeout
         * @return this builder
         */
        public Builder setDefaultDeadlockDetectionTimeout(Duration defaultDeadlockDetectionTimeout);

        /**
         * Maximum fatal error count before shutdown.
         * @param maxConcurrentWorkflowTaskPollers max pollers
         * @return this builder
         */
        public Builder setMaxConcurrentWorkflowTaskPollers(int maxConcurrentWorkflowTaskPollers);

        /**
         * Maximum concurrent activity task pollers.
         * @param maxConcurrentActivityTaskPollers max pollers
         * @return this builder
         */
        public Builder setMaxConcurrentActivityTaskPollers(int maxConcurrentActivityTaskPollers);

        /**
         * Identity for this worker.
         * @param identity worker identity
         * @return this builder
         */
        public Builder setIdentity(String identity);

        /**
         * Binary checksum for workflow compatibility.
         * @param binaryChecksum binary checksum
         * @return this builder
         */
        public Builder setBinaryChecksum(String binaryChecksum);

        /**
         * Build ID for worker versioning (Experimental).
         * @param buildId build ID
         * @return this builder
         */
        @Experimental
        public Builder setBuildId(String buildId);

        /**
         * Use build ID for versioning (Experimental).
         * @param useBuildIdForVersioning true to use build ID
         * @return this builder
         */
        @Experimental
        public Builder setUseBuildIdForVersioning(boolean useBuildIdForVersioning);

        /**
         * Worker tuner for dynamic scaling (Experimental).
         * @param workerTuner worker tuner
         * @return this builder
         */
        @Experimental
        public Builder setWorkerTuner(WorkerTuner workerTuner);

        /**
         * Build the WorkerOptions.
         * @return configured WorkerOptions
         */
        public WorkerOptions build();
    }
}

Worker Factory Options

Options for worker factory configuration.

/**
 * Options for worker factory configuration.
 */
public final class WorkerFactoryOptions {
    /**
     * Creates new builder.
     * @return new WorkerFactoryOptions builder
     */
    public static Builder newBuilder();

    /**
     * Creates builder from existing options.
     * @param options existing options to copy
     * @return new builder with copied options
     */
    public static Builder newBuilder(WorkerFactoryOptions options);

    /**
     * Returns default instance.
     * @return default WorkerFactoryOptions
     */
    public static WorkerFactoryOptions getDefaultInstance();

    /**
     * Builder for WorkerFactoryOptions.
     */
    public static final class Builder {
        /**
         * Workflow host local poll thread count.
         * @param workflowHostLocalPollThreadCount thread count
         * @return this builder
         */
        public Builder setWorkflowHostLocalPollThreadCount(int workflowHostLocalPollThreadCount);

        /**
         * Maximum fatal error count before factory shutdown.
         * @param maxWorkflowThreadCount max thread count
         * @return this builder
         */
        public Builder setMaxWorkflowThreadCount(int maxWorkflowThreadCount);

        /**
         * Cache for sticky workflow executions.
         * @param workflowCache workflow cache
         * @return this builder
         */
        public Builder setWorkflowCache(WorkflowCache workflowCache);

        /**
         * Disable eager workflow start.
         * @param disableEagerWorkflowStart true to disable
         * @return this builder
         */
        public Builder setDisableEagerWorkflowStart(boolean disableEagerWorkflowStart);

        /**
         * Enable worker graceful shutdown.
         * @param enableGracefulShutdown true to enable
         * @return this builder
         */
        public Builder setEnableGracefulShutdown(boolean enableGracefulShutdown);

        /**
         * Worker interceptors.
         * @param workerInterceptors interceptor instances
         * @return this builder
         */
        public Builder setWorkerInterceptors(WorkerInterceptor... workerInterceptors);

        /**
         * Build the WorkerFactoryOptions.
         * @return configured WorkerFactoryOptions
         */
        public WorkerFactoryOptions build();
    }
}

Workflow Implementation Options

Options for registering workflow implementations.

/**
 * Options for registering workflow implementations.
 */
public final class WorkflowImplementationOptions {
    /**
     * Creates new builder.
     * @return new WorkflowImplementationOptions builder
     */
    public static Builder newBuilder();

    /**
     * Builder for WorkflowImplementationOptions.
     */
    public static final class Builder {
        /**
         * Failure converter for this workflow type.
         * @param failureConverter failure converter
         * @return this builder
         */
        public Builder setFailureConverter(FailureConverter failureConverter);

        /**
         * Data converter for this workflow type.
         * @param dataConverter data converter
         * @return this builder
         */
        public Builder setDataConverter(DataConverter dataConverter);

        /**
         * Context propagators for this workflow type.
         * @param contextPropagators context propagators
         * @return this builder
         */
        public Builder setContextPropagators(List<ContextPropagator> contextPropagators);

        /**
         * Workflow interceptors for this workflow type.
         * @param workflowInterceptors workflow interceptors
         * @return this builder
         */
        public Builder setWorkflowInterceptors(WorkflowInterceptor... workflowInterceptors);

        /**
         * Default workflow method type.
         * @param defaultWorkflowMethodType default method type
         * @return this builder
         */
        public Builder setDefaultWorkflowMethodType(Class<?> defaultWorkflowMethodType);

        /**
         * Build the WorkflowImplementationOptions.
         * @return configured WorkflowImplementationOptions
         */
        public WorkflowImplementationOptions build();
    }
}

Activity Implementation Options

Options for registering activity implementations.

/**
 * Options for registering activity implementations.
 */
public final class ActivityImplementationOptions {
    /**
     * Creates new builder.
     * @return new ActivityImplementationOptions builder
     */
    public static Builder newBuilder();

    /**
     * Builder for ActivityImplementationOptions.
     */
    public static final class Builder {
        /**
         * Data converter for this activity type.
         * @param dataConverter data converter
         * @return this builder
         */
        public Builder setDataConverter(DataConverter dataConverter);

        /**
         * Context propagators for this activity type.
         * @param contextPropagators context propagators
         * @return this builder
         */
        public Builder setContextPropagators(List<ContextPropagator> contextPropagators);

        /**
         * Activity interceptors for this activity type.
         * @param activityInterceptors activity interceptors
         * @return this builder
         */
        public Builder setActivityInterceptors(ActivityInterceptor... activityInterceptors);

        /**
         * Build the ActivityImplementationOptions.
         * @return configured ActivityImplementationOptions
         */
        public ActivityImplementationOptions build();
    }
}

Worker Tuning

Performance tuning utilities for dynamic worker scaling.

/**
 * Interface for dynamic worker tuning and scaling.
 */
public interface WorkerTuner {
    /**
     * Gets workflow slot supplier.
     * @return slot supplier for workflows
     */
    SlotSupplier<WorkflowSlotInfo> getWorkflowTaskSlotSupplier();

    /**
     * Gets activity slot supplier.
     * @return slot supplier for activities
     */
    SlotSupplier<ActivitySlotInfo> getActivityTaskSlotSupplier();

    /**
     * Gets local activity slot supplier.
     * @return slot supplier for local activities
     */
    SlotSupplier<LocalActivitySlotInfo> getLocalActivitySlotSupplier();

    /**
     * Shuts down the tuner.
     */
    void shutdown();

    /**
     * Awaits tuner termination.
     * @param timeout maximum wait time
     * @param unit time unit
     * @return true if terminated within timeout
     */
    boolean awaitTermination(long timeout, TimeUnit unit);
}

/**
 * Interface for supplying execution slots.
 */
public interface SlotSupplier<SI extends SlotInfo> {
    /**
     * Tries to reserve a slot.
     * @param slotInfo slot information
     * @return slot permit if available
     */
    SlotPermit tryReserveSlot(SI slotInfo);

    /**
     * Marks a slot as used.
     * @param slotInfo slot information
     */
    void markSlotUsed(SI slotInfo);

    /**
     * Releases a slot.
     * @param slotInfo slot information
     */
    void releaseSlot(SI slotInfo);

    /**
     * Gets maximum slots available.
     * @return maximum slot count
     */
    int getMaximumSlots();
}

Usage Examples:

public class WorkerTuningExample {
    public void setupTunedWorker() {
        // Create custom worker tuner
        WorkerTuner tuner = new CustomWorkerTuner();

        WorkerOptions options = WorkerOptions.newBuilder()
            .setWorkerTuner(tuner)
            .setMaxConcurrentWorkflowTaskExecutions(100)
            .setMaxConcurrentActivityExecutions(200)
            .build();

        Worker worker = factory.newWorker("tuned-tasks", options);
        worker.registerWorkflowImplementationTypes(HighThroughputWorkflowImpl.class);
        worker.registerActivitiesImplementations(new ScalableActivitiesImpl());
    }

    public void configureHighPerformanceWorker() {
        WorkerOptions options = WorkerOptions.newBuilder()
            // High concurrency settings
            .setMaxConcurrentWorkflowTaskExecutions(50)
            .setMaxConcurrentActivityExecutions(100)
            .setMaxConcurrentLocalActivityExecutions(200)

            // Polling optimization
            .setMaxConcurrentWorkflowTaskPollers(5)
            .setMaxConcurrentActivityTaskPollers(10)
            .setTaskQueuePollTimeout(Duration.ofSeconds(10))

            // Performance tuning
            .setWorkerActivationTimeout(Duration.ofSeconds(30))
            .setDefaultWorkflowTaskStartToCloseTimeout(Duration.ofSeconds(30))
            .setDefaultActivityTaskStartToCloseTimeout(Duration.ofMinutes(5))

            // Sticky execution for better cache utilization
            .setDisableStickyExecution(false)
            .setStickyQueueScheduleToStartTimeout(Duration.ofSeconds(5))

            .build();

        Worker highPerfWorker = factory.newWorker("high-performance", options);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-temporal--temporal-sdk

docs

activities.md

client.md

data-conversion.md

exceptions.md

index.md

workers.md

workflows.md

tile.json