0
# Activity Completion
1
2
Manual activity completion for activities that complete outside the normal worker execution model, enabling asynchronous activity processing and external service integration.
3
4
## Capabilities
5
6
### AsyncCompletionClient
7
8
Client for manually completing activities that run outside of normal worker processes.
9
10
```typescript { .api }
11
/**
 * Client for async activity operations.
 *
 * Every operation is overloaded so the target activity can be addressed
 * either by its task token or by a `FullActivityId`.
 */
class AsyncCompletionClient extends BaseClient {
  /**
   * Complete activity by task token.
   *
   * @param taskToken - Task token identifying the activity
   * @param result - Optional result value reported for the activity
   */
  complete<T = any>(taskToken: Uint8Array, result?: T): Promise<void>;
  /**
   * Complete activity by activity ID.
   *
   * @param fullActivityId - Workflow/activity identifiers of the activity
   * @param result - Optional result value reported for the activity
   */
  complete<T = any>(fullActivityId: FullActivityId, result?: T): Promise<void>;

  /**
   * Fail activity by task token.
   *
   * @param taskToken - Task token identifying the activity
   * @param err - The error to report as the failure cause
   */
  fail(taskToken: Uint8Array, err: unknown): Promise<void>;
  /**
   * Fail activity by activity ID.
   *
   * @param fullActivityId - Workflow/activity identifiers of the activity
   * @param err - The error to report as the failure cause
   */
  fail(fullActivityId: FullActivityId, err: unknown): Promise<void>;

  /**
   * Report activity cancellation by task token.
   *
   * @param taskToken - Task token identifying the activity
   * @param details - Optional details attached to the cancellation report
   */
  reportCancellation(
    taskToken: Uint8Array,
    details?: unknown
  ): Promise<void>;
  /**
   * Report activity cancellation by activity ID.
   *
   * @param fullActivityId - Workflow/activity identifiers of the activity
   * @param details - Optional details attached to the cancellation report
   */
  reportCancellation(
    fullActivityId: FullActivityId,
    details?: unknown
  ): Promise<void>;

  /**
   * Send activity heartbeat by task token.
   *
   * @param taskToken - Task token identifying the activity
   * @param details - Optional details attached to the heartbeat
   */
  heartbeat(taskToken: Uint8Array, details?: unknown): Promise<void>;
  /**
   * Send activity heartbeat by activity ID.
   *
   * @param fullActivityId - Workflow/activity identifiers of the activity
   * @param details - Optional details attached to the heartbeat
   */
  heartbeat(fullActivityId: FullActivityId, details?: unknown): Promise<void>;

  /** Raw gRPC access to WorkflowService */
  readonly workflowService: WorkflowService;
}
44
45
/** Options accepted by `AsyncCompletionClient`; an alias of `BaseClientOptions` with no additions. */
type AsyncCompletionClientOptions = BaseClientOptions;
46
```
47
48
**Usage Examples:**
49
50
```typescript
51
import { AsyncCompletionClient } from "@temporalio/client";
import type { FullActivityId } from "@temporalio/client";

const activityClient = new AsyncCompletionClient();

// Placeholder values standing in for state produced by your own processing.
const result = { items: 42 };
let currentProgress = 0; // update this as the external work advances

// Complete activity with result (using task token)
const taskToken = new Uint8Array(/* token from activity context */);
await activityClient.complete(taskToken, { status: 'processed', data: result });

// Complete activity using activity ID
const activityId: FullActivityId = {
  workflowId: 'workflow-123',
  runId: 'run-456',
  activityId: 'activity-789',
};
await activityClient.complete(activityId, 'Activity completed successfully');

// Fail activity with error
try {
  // Some external processing
  throw new Error('External service unavailable');
} catch (error) {
  await activityClient.fail(taskToken, error);
}

// Send heartbeat for long-running activity.
// Keep the timer handle so the loop can be stopped with clearInterval(heartbeatTimer)
// once the activity has been completed, failed, or cancelled.
const heartbeatTimer = setInterval(async () => {
  try {
    await activityClient.heartbeat(taskToken, { progress: currentProgress });
  } catch (error) {
    // An async interval callback has no caller to propagate to, so handle the
    // rejection here and stop heartbeating instead of leaving it unhandled.
    console.error('Heartbeat failed:', error);
    clearInterval(heartbeatTimer);
  }
}, 30000); // Every 30 seconds

// Report activity cancellation
await activityClient.reportCancellation(activityId, 'User requested cancellation');
82
```
83
84
### Activity Identification
85
86
Types for identifying activities for completion operations.
87
88
```typescript { .api }
89
/**
 * Complete activity identifier using workflow and activity IDs.
 *
 * Accepted by the `AsyncCompletionClient` overloads that take a
 * `fullActivityId` argument in place of a task token.
 */
interface FullActivityId {
  /** Workflow identifier */
  workflowId: string;
  /** Workflow run identifier (optional) */
  runId?: string;
  /** Activity identifier within the workflow */
  activityId: string;
}
100
```
101
102
### Activity Completion Errors
103
104
Specialized error types for activity completion operations.
105
106
```typescript { .api }
107
/**
 * Activity not found error.
 *
 * @param message - Human-readable description of the error
 */
class ActivityNotFoundError extends Error {
  constructor(message: string);
}
113
114
/**
 * Activity completion failed error.
 *
 * @param message - Human-readable description of the error
 * @param cause - Optional underlying error, exposed as the `cause` property
 */
class ActivityCompletionError extends Error {
  /** Underlying error, when one was supplied to the constructor. */
  readonly cause?: Error;
  constructor(message: string, cause?: Error);
}
121
122
/**
 * Activity was cancelled error.
 *
 * @param message - Human-readable description of the error
 * @param details - Optional cancellation details, exposed as the `details` property
 */
class ActivityCancelledError extends Error {
  /** Cancellation details, when any were supplied to the constructor. */
  readonly details?: unknown;
  constructor(message: string, details?: unknown);
}
129
130
/**
 * Activity was paused error.
 *
 * @param message - Human-readable description of the error
 */
class ActivityPausedError extends Error {
  constructor(message: string);
}
136
```
137
138
## Common Usage Patterns
139
140
### External Service Integration
141
142
```typescript
143
// Process activity in external service
class ExternalProcessor {
  /** Client used to report heartbeats, results, and failures for the activity. */
  private readonly activityClient: AsyncCompletionClient;

  /**
   * Stand-in for the external system.
   *
   * Declared as a property holding an object so that
   * `this.externalService.process(...)` resolves; a dotted name cannot be
   * used as a method name in a class body.
   */
  private readonly externalService = {
    async process(payload: unknown): Promise<{ processed: boolean; data: unknown }> {
      // External processing logic
      return { processed: true, data: payload };
    },
  };

  constructor() {
    this.activityClient = new AsyncCompletionClient();
  }

  /**
   * Run one activity through the external service and report its outcome.
   *
   * Sends a `{ status: 'started' }` heartbeat, processes the payload, and
   * completes the activity with the processing result. Any error thrown along
   * the way (including by the heartbeat or completion calls) is reported via
   * `fail`.
   *
   * @param taskToken - Task token identifying the activity
   * @param payload - Input handed to the external service
   */
  async processActivity(taskToken: Uint8Array, payload: unknown): Promise<void> {
    try {
      // Send initial heartbeat
      await this.activityClient.heartbeat(taskToken, { status: 'started' });

      // Process in external system
      const result = await this.externalService.process(payload);

      // Complete with result
      await this.activityClient.complete(taskToken, result);
    } catch (error) {
      // Fail activity
      await this.activityClient.fail(taskToken, error);
    }
  }
}
172
```
173
174
### Long-Running Activity Management
175
176
```typescript
177
// Manage long-running activity with heartbeats
class LongRunningActivityManager {
  /** Client used to send heartbeats and report completion or cancellation. */
  private readonly activityClient: AsyncCompletionClient;
  /**
   * Active heartbeat timers keyed by activity id.
   * Typed from `setInterval` itself so the handle is always accepted by `clearInterval`.
   */
  private readonly heartbeatIntervals = new Map<string, ReturnType<typeof setInterval>>();

  constructor() {
    this.activityClient = new AsyncCompletionClient();
  }

  /**
   * Begin heartbeating for an activity every 30 seconds.
   *
   * If a heartbeat fails, the error is logged and the timer for this activity
   * is stopped.
   *
   * @param taskToken - Task token identifying the activity
   * @param activityId - Key under which the heartbeat timer is tracked
   */
  startProcessing(taskToken: Uint8Array, activityId: string): void {
    // Clear any timer already registered under this id; overwriting the map
    // entry alone would leave the old interval running with no way to stop it.
    this.stopHeartbeat(activityId);

    // Start heartbeat
    const interval = setInterval(async () => {
      try {
        await this.activityClient.heartbeat(taskToken, {
          timestamp: new Date(),
          status: 'processing'
        });
      } catch (error) {
        console.error('Heartbeat failed:', error);
        this.stopHeartbeat(activityId);
      }
    }, 30000); // 30 seconds

    this.heartbeatIntervals.set(activityId, interval);
  }

  /**
   * Stop heartbeating and complete the activity with `result`.
   *
   * @param taskToken - Task token identifying the activity
   * @param activityId - Key under which the heartbeat timer is tracked
   * @param result - Result value reported for the activity
   */
  async completeProcessing(taskToken: Uint8Array, activityId: string, result: unknown): Promise<void> {
    this.stopHeartbeat(activityId);
    await this.activityClient.complete(taskToken, result);
  }

  /**
   * Stop heartbeating and report the activity as cancelled.
   *
   * @param taskToken - Task token identifying the activity
   * @param activityId - Key under which the heartbeat timer is tracked
   * @param reason - Sent as `{ reason }` in the cancellation details
   */
  async cancelProcessing(taskToken: Uint8Array, activityId: string, reason: string): Promise<void> {
    this.stopHeartbeat(activityId);
    await this.activityClient.reportCancellation(taskToken, { reason });
  }

  /** Clear and forget the heartbeat timer for `activityId`, if one is registered. */
  private stopHeartbeat(activityId: string): void {
    const interval = this.heartbeatIntervals.get(activityId);
    if (interval !== undefined) {
      clearInterval(interval);
      this.heartbeatIntervals.delete(activityId);
    }
  }
}
221
```
222
223
### Batch Activity Processing
224
225
```typescript
226
// Process multiple activities in batches
class BatchActivityProcessor {
  /** Client used to report each item's result or failure. */
  private readonly activityClient: AsyncCompletionClient;
  /** Activities waiting to be processed, in arrival order. */
  private readonly batchQueue: Array<{ taskToken: Uint8Array; payload: unknown }> = [];
  /** Handle of the polling timer, kept so `stop()` can cancel it. */
  private timer?: ReturnType<typeof setInterval>;

  /**
   * Create the processor and start polling the queue immediately.
   *
   * @param batchSize - Maximum number of queued activities handled per tick (default 10)
   * @param intervalMs - Delay between ticks in milliseconds (default 5000)
   */
  constructor(
    private readonly batchSize = 10,
    private readonly intervalMs = 5000
  ) {
    this.activityClient = new AsyncCompletionClient();
    this.processBatch();
  }

  /**
   * Queue an activity for processing on a later tick.
   *
   * @param taskToken - Task token identifying the activity
   * @param payload - Input passed to the item processing step
   */
  enqueueActivity(taskToken: Uint8Array, payload: unknown): void {
    this.batchQueue.push({ taskToken, payload });
  }

  /** Stop polling the queue. Items still queued are left untouched. */
  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }

  /** Start the polling timer that drains one batch per tick. */
  private processBatch(): void {
    this.timer = setInterval(() => {
      // drainBatch settles every item itself and does not reject, so the
      // promise can be intentionally left un-awaited here.
      void this.drainBatch();
    }, this.intervalMs);
  }

  /**
   * Take up to `batchSize` items off the queue and process them concurrently.
   * Each item is completed with its result, or failed with the error thrown
   * while processing or completing it.
   */
  private async drainBatch(): Promise<void> {
    if (this.batchQueue.length === 0) return;

    const batch = this.batchQueue.splice(0, this.batchSize);

    const outcomes = await Promise.allSettled(batch.map(async ({ taskToken, payload }) => {
      try {
        const result = await this.processItem(payload);
        await this.activityClient.complete(taskToken, result);
      } catch (error) {
        await this.activityClient.fail(taskToken, error);
      }
    }));

    // A rejection here means even the `fail` report did not go through;
    // surface it instead of discarding it silently.
    for (const outcome of outcomes) {
      if (outcome.status === 'rejected') {
        console.error('Failed to report activity outcome:', outcome.reason);
      }
    }
  }

  /** Process a single payload and return the value to complete the activity with. */
  private async processItem(payload: unknown): Promise<{ processed: boolean; data: unknown }> {
    // Processing logic
    return { processed: true, data: payload };
  }
}
262
```