CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-temporalio--client

TypeScript SDK Client for communicating with Temporal workflow orchestration systems, providing workflow lifecycle management, connection handling, and durable execution patterns.

Overview
Eval results
Files

docs/activity-completion.md

Activity Completion

Manual activity completion for activities that complete outside the normal worker execution model, enabling asynchronous activity processing and external service integration.

Capabilities

AsyncCompletionClient

Client for manually completing activities that run outside of normal worker processes.

/**
 * Client for async activity operations.
 *
 * Every operation is declared as a pair of overloads: one that addresses the
 * activity by its task token (`Uint8Array`) and one that addresses it by a
 * `FullActivityId`. All operations return a promise that resolves with no value.
 */
class AsyncCompletionClient extends BaseClient {
  /**
   * Complete activity by task token.
   * @param taskToken Task token identifying the activity
   * @param result Optional result value to report for the activity
   */
  complete<T = any>(taskToken: Uint8Array, result?: T): Promise<void>;
  /**
   * Complete activity by activity ID.
   * @param fullActivityId Workflow/activity identifiers of the activity
   * @param result Optional result value to report for the activity
   */
  complete<T = any>(fullActivityId: FullActivityId, result?: T): Promise<void>;

  /**
   * Fail activity by task token.
   * @param taskToken Task token identifying the activity
   * @param err The failure to report; typed `unknown`, so any thrown value is accepted
   */
  fail(taskToken: Uint8Array, err: unknown): Promise<void>;
  /**
   * Fail activity by activity ID.
   * @param fullActivityId Workflow/activity identifiers of the activity
   * @param err The failure to report; typed `unknown`, so any thrown value is accepted
   */
  fail(fullActivityId: FullActivityId, err: unknown): Promise<void>;

  /**
   * Report activity cancellation by task token.
   * @param taskToken Task token identifying the activity
   * @param details Optional details to send along with the cancellation report
   */
  reportCancellation(
    taskToken: Uint8Array,
    details?: unknown
  ): Promise<void>;
  /**
   * Report activity cancellation by activity ID.
   * @param fullActivityId Workflow/activity identifiers of the activity
   * @param details Optional details to send along with the cancellation report
   */
  reportCancellation(
    fullActivityId: FullActivityId,
    details?: unknown
  ): Promise<void>;

  /**
   * Send activity heartbeat by task token.
   * @param taskToken Task token identifying the activity
   * @param details Optional details to send with the heartbeat (e.g. progress)
   */
  heartbeat(taskToken: Uint8Array, details?: unknown): Promise<void>;
  /**
   * Send activity heartbeat by activity ID.
   * @param fullActivityId Workflow/activity identifiers of the activity
   * @param details Optional details to send with the heartbeat (e.g. progress)
   */
  heartbeat(fullActivityId: FullActivityId, details?: unknown): Promise<void>;

  /** Raw gRPC access to WorkflowService */
  readonly workflowService: WorkflowService;
}

/**
 * Options type for `AsyncCompletionClient`; a plain alias of `BaseClientOptions`
 * with no additional fields.
 * NOTE(review): presumably accepted by the `AsyncCompletionClient` constructor —
 * the constructor signature is not shown here; confirm against the SDK reference.
 */
type AsyncCompletionClientOptions = BaseClientOptions;

Usage Examples:

import { AsyncCompletionClient } from "@temporalio/client";
import type { FullActivityId } from "@temporalio/client";

// NOTE: `result` and `currentProgress` below are placeholders for values
// supplied by your application; they are not defined by this snippet.

const activityClient = new AsyncCompletionClient();

// Complete activity with result (using task token)
const taskToken = new Uint8Array(/* token from activity context */);
await activityClient.complete(taskToken, { status: 'processed', data: result });

// Complete activity using activity ID
const activityId: FullActivityId = {
  workflowId: 'workflow-123',
  runId: 'run-456',
  activityId: 'activity-789',
};
await activityClient.complete(activityId, 'Activity completed successfully');

// Fail activity with error
try {
  // Some external processing
  throw new Error('External service unavailable');
} catch (error) {
  await activityClient.fail(taskToken, error);
}

// Send heartbeat for long-running activity.
// Keep the timer handle so the heartbeat can be stopped, and handle rejections
// explicitly: an async callback passed to setInterval would otherwise surface
// a failed heartbeat as an unhandled promise rejection.
const heartbeatTimer = setInterval(() => {
  activityClient
    .heartbeat(taskToken, { progress: currentProgress })
    .catch((error: unknown) => {
      console.error('Heartbeat failed:', error);
      // Stop heartbeating once a heartbeat has been rejected
      clearInterval(heartbeatTimer);
    });
}, 30000); // Every 30 seconds

// Report activity cancellation (stop heartbeating first; the activity is done)
clearInterval(heartbeatTimer);
await activityClient.reportCancellation(activityId, 'User requested cancellation');

Activity Identification

Types for identifying activities for completion operations.

/**
 * Complete activity identifier using workflow and activity IDs.
 *
 * Accepted by the `AsyncCompletionClient` operations as an alternative to a
 * task token.
 */
interface FullActivityId {
  /** Workflow identifier */
  workflowId: string;
  /** Workflow run identifier; optional, may be omitted */
  runId?: string;
  /** Activity identifier within the workflow */
  activityId: string;
}

Activity Completion Errors

Specialized error types for activity completion operations.

/**
 * Activity not found error.
 */
class ActivityNotFoundError extends Error {
  /** @param message Human-readable error message */
  constructor(message: string);
}

/**
 * Activity completion failed error.
 */
class ActivityCompletionError extends Error {
  /** Optional underlying error, as passed to the constructor */
  readonly cause?: Error;
  /**
   * @param message Human-readable error message
   * @param cause Optional underlying error
   */
  constructor(message: string, cause?: Error);
}

/**
 * Activity was cancelled error.
 */
class ActivityCancelledError extends Error {
  /** Optional cancellation details; untyped (`unknown`), narrow before use */
  readonly details?: unknown;
  /**
   * @param message Human-readable error message
   * @param details Optional cancellation details
   */
  constructor(message: string, details?: unknown);
}

/**
 * Activity was paused error.
 */
class ActivityPausedError extends Error {
  /** @param message Human-readable error message */
  constructor(message: string);
}

Common Usage Patterns

External Service Integration

// Process activity in external service
/**
 * Runs an activity's work in an external system and reports the outcome
 * through an `AsyncCompletionClient`.
 */
class ExternalProcessor {
  private readonly activityClient: AsyncCompletionClient;

  /**
   * Stand-in for the external system that performs the actual work.
   * Declared as a property so that `this.externalService.process(...)` is
   * valid; a dotted name cannot be used as a method name.
   */
  private readonly externalService = {
    async process(payload: any) {
      // External processing logic
      return { processed: true, data: payload };
    },
  };

  constructor() {
    this.activityClient = new AsyncCompletionClient();
  }

  /**
   * Heartbeats once, processes the payload externally, then completes the
   * activity with the result. Any error thrown along the way (including from
   * the heartbeat or the completion call) is reported as an activity failure.
   *
   * @param taskToken Task token identifying the activity
   * @param payload Input handed to the external service
   */
  async processActivity(taskToken: Uint8Array, payload: any): Promise<void> {
    try {
      // Send initial heartbeat
      await this.activityClient.heartbeat(taskToken, { status: 'started' });

      // Process in external system
      const result = await this.externalService.process(payload);

      // Complete with result
      await this.activityClient.complete(taskToken, result);
    } catch (error) {
      // Fail activity
      await this.activityClient.fail(taskToken, error);
    }
  }
}

Long-Running Activity Management

// Manage long-running activity with heartbeats
/**
 * Keeps long-running activities alive by sending periodic heartbeats, and
 * stops heartbeating when the activity is completed or cancelled.
 */
class LongRunningActivityManager {
  private readonly activityClient: AsyncCompletionClient;
  // `ReturnType<typeof setInterval>` matches whatever handle type the runtime's
  // setInterval returns, so clearInterval accepts it without casts.
  private readonly heartbeatIntervals = new Map<string, ReturnType<typeof setInterval>>();
  private readonly heartbeatIntervalMs: number;

  /**
   * @param heartbeatIntervalMs Delay between heartbeats in milliseconds
   *   (defaults to 30 seconds)
   */
  constructor(heartbeatIntervalMs = 30000) {
    this.activityClient = new AsyncCompletionClient();
    this.heartbeatIntervalMs = heartbeatIntervalMs;
  }

  /**
   * Start heartbeating for an activity.
   *
   * @param taskToken Task token identifying the activity
   * @param activityId Key under which the heartbeat timer is tracked
   */
  startProcessing(taskToken: Uint8Array, activityId: string): void {
    // If a heartbeat is already running under this ID, stop it first;
    // overwriting the map entry would orphan the old timer forever.
    this.stopHeartbeat(activityId);

    // Start heartbeat
    const interval = setInterval(async () => {
      try {
        await this.activityClient.heartbeat(taskToken, {
          timestamp: new Date(),
          status: 'processing'
        });
      } catch (error) {
        console.error('Heartbeat failed:', error);
        // Only stop the timer that produced this failure; a late rejection
        // from a replaced timer must not stop its successor.
        if (this.heartbeatIntervals.get(activityId) === interval) {
          this.stopHeartbeat(activityId);
        }
      }
    }, this.heartbeatIntervalMs);

    this.heartbeatIntervals.set(activityId, interval);
  }

  /**
   * Stop heartbeating and complete the activity with `result`.
   *
   * @param taskToken Task token identifying the activity
   * @param activityId Key the heartbeat timer was registered under
   * @param result Result value to report for the activity
   */
  async completeProcessing(taskToken: Uint8Array, activityId: string, result: any): Promise<void> {
    this.stopHeartbeat(activityId);
    await this.activityClient.complete(taskToken, result);
  }

  /**
   * Stop heartbeating and report the activity as cancelled.
   *
   * @param taskToken Task token identifying the activity
   * @param activityId Key the heartbeat timer was registered under
   * @param reason Sent as `{ reason }` in the cancellation details
   */
  async cancelProcessing(taskToken: Uint8Array, activityId: string, reason: string): Promise<void> {
    this.stopHeartbeat(activityId);
    await this.activityClient.reportCancellation(taskToken, { reason });
  }

  /** Clear and forget the heartbeat timer for `activityId`, if one exists. */
  private stopHeartbeat(activityId: string): void {
    const interval = this.heartbeatIntervals.get(activityId);
    if (interval !== undefined) {
      clearInterval(interval);
      this.heartbeatIntervals.delete(activityId);
    }
  }
}

Batch Activity Processing

// Process multiple activities in batches
/**
 * Queues activities and, on a fixed timer, processes them in batches,
 * completing or failing each one through an `AsyncCompletionClient`.
 */
class BatchActivityProcessor {
  private readonly activityClient: AsyncCompletionClient;
  private readonly batchQueue: Array<{ taskToken: Uint8Array; payload: any }> = [];
  // Handle of the polling timer; kept so the processor can be stopped.
  private batchTimer: ReturnType<typeof setInterval> | undefined;

  constructor() {
    this.activityClient = new AsyncCompletionClient();
    this.processBatch();
  }

  /**
   * Add an activity to the queue; it is picked up by a later timer tick.
   *
   * @param taskToken Task token identifying the activity
   * @param payload Input to process for this activity
   */
  enqueueActivity(taskToken: Uint8Array, payload: any): void {
    this.batchQueue.push({ taskToken, payload });
  }

  /**
   * Stop the polling timer. Items still in the queue are left untouched and
   * a batch that is already in flight is not interrupted.
   */
  stop(): void {
    if (this.batchTimer !== undefined) {
      clearInterval(this.batchTimer);
      this.batchTimer = undefined;
    }
  }

  /** Start the polling timer (no-op if it is already running). */
  private processBatch(): void {
    if (this.batchTimer !== undefined) return;

    this.batchTimer = setInterval(() => {
      // drainBatch settles every item itself, so this promise does not reject.
      void this.drainBatch();
    }, 5000); // Every 5 seconds
  }

  /** Take up to 10 queued activities and complete or fail each of them. */
  private async drainBatch(): Promise<void> {
    if (this.batchQueue.length === 0) return;

    const batch = this.batchQueue.splice(0, 10); // Process 10 at a time

    // allSettled: one item's failure must not prevent the others from finishing
    await Promise.allSettled(batch.map(async ({ taskToken, payload }) => {
      try {
        const result = await this.processItem(payload);
        await this.activityClient.complete(taskToken, result);
      } catch (error) {
        await this.activityClient.fail(taskToken, error);
      }
    }));
  }

  private async processItem(payload: any) {
    // Processing logic
    return { processed: true, data: payload };
  }
}

Install with Tessl CLI

npx tessl i tessl/npm-temporalio--client

docs

activity-completion.md

client-connection.md

error-handling.md

index.md

interceptors.md

schedule-client.md

task-queue-client.md

workflow-client.md

tile.json