or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

activity.mdclient.mdcommon.mdcontrib-pydantic.mddata-conversion.mdexceptions.mdindex.mdruntime.mdtesting.mdworker.mdworkflow.md

activity.mddocs/

0

# Activity Development

1

2

The temporalio.activity module provides functions and utilities for defining and executing activities within Temporal workflows. Activities are the building blocks of workflows that perform actual business logic, interact with external systems, and can be retried, timeout, and cancelled.

3

4

## Core Imports

5

6

```python

7

from temporalio import activity

8

from temporalio.activity import Info, LoggerAdapter, ActivityCancellationDetails

9

```

10

11

## Activity Definition

12

13

### Activity Decorator

14

15

The `@activity.defn` decorator is used to mark functions as Temporal activities:

16

17

```python { .api }

18

@overload

19

def defn(fn: CallableType) -> CallableType: ...

20

21

@overload

22

def defn(

23

*,

24

name: Optional[str] = None,

25

no_thread_cancel_exception: bool = False

26

) -> Callable[[CallableType], CallableType]: ...

27

28

@overload

29

def defn(

30

*,

31

no_thread_cancel_exception: bool = False,

32

dynamic: bool = False

33

) -> Callable[[CallableType], CallableType]: ...

34

35

def defn(

36

fn: Optional[CallableType] = None,

37

*,

38

name: Optional[str] = None,

39

no_thread_cancel_exception: bool = False,

40

dynamic: bool = False,

41

) -> Union[CallableType, Callable[[CallableType], CallableType]]:

42

"""Decorator for activity functions.

43

44

Activities can be async or non-async.

45

46

Args:

47

fn: The function to decorate.

48

name: Name to use for the activity. Defaults to function ``__name__``.

49

This cannot be set if dynamic is set.

50

no_thread_cancel_exception: If set to true, an exception will not be

51

raised in synchronous, threaded activities upon cancellation.

52

dynamic: If true, this activity will be dynamic. Dynamic activities have

53

to accept a single 'Sequence[RawValue]' parameter. This cannot be

54

set to true if name is present.

55

"""

56

```

57

58

#### Basic Activity Definition

59

60

```python

61

@activity.defn

62

async def process_data(data: str) -> str:

63

"""Async activity function."""

64

return f"Processed: {data}"

65

66

@activity.defn

67

def sync_process_data(data: str) -> str:

68

"""Synchronous activity function."""

69

return f"Processed: {data}"

70

```

71

72

#### Named Activities

73

74

```python

75

@activity.defn(name="custom_activity_name")

76

async def my_activity(input: str) -> str:

77

return f"Result: {input}"

78

```

79

80

#### Thread-Safe Cancellation Control

81

82

```python

83

@activity.defn(no_thread_cancel_exception=True)

84

def robust_activity(data: str) -> str:

85

"""Activity that won't have cancellation exceptions thrown in threads."""

86

# Critical cleanup logic that should not be interrupted

87

return process_critical_data(data)

88

```

89

90

#### Dynamic Activities

91

92

```python

93

from temporalio.common import RawValue

94

from typing import Sequence

95

96

@activity.defn(dynamic=True)

97

async def dynamic_activity(args: Sequence[RawValue]) -> Any:

98

"""Dynamic activity that can handle any activity type."""

99

# Use payload_converter() to decode raw values

100

converter = activity.payload_converter()

101

decoded_args = [converter.from_payload(arg.payload) for arg in args]

102

return process_dynamic_args(decoded_args)

103

```

104

105

## Activity Context Functions

106

107

### Client Access

108

109

```python { .api }

110

def client() -> temporalio.client.Client:

111

"""Return a Temporal Client for use in the current activity.

112

113

The client is only available in `async def` activities.

114

115

In tests it is not available automatically, but you can pass a client when creating a

116

:py:class:`temporalio.testing.ActivityEnvironment`.

117

118

Returns:

119

:py:class:`temporalio.client.Client` for use in the current activity.

120

121

Raises:

122

RuntimeError: When the client is not available.

123

"""

124

```

125

126

```python

127

@activity.defn

128

async def query_external_service(user_id: str) -> dict:

129

"""Activity that uses the client to interact with Temporal."""

130

client = activity.client()

131

132

# Can use client to start child workflows, query other executions, etc.

133

result = await client.execute_workflow(

134

"external_workflow",

135

user_id,

136

id=f"external-{user_id}",

137

task_queue="external-queue"

138

)

139

return result

140

```

141

142

### Activity Information Access

143

144

```python { .api }

145

def info() -> Info:

146

"""Current activity's info.

147

148

Returns:

149

Info for the currently running activity.

150

151

Raises:

152

RuntimeError: When not in an activity.

153

"""

154

```

155

156

```python

157

@activity.defn

158

async def tracked_activity(data: str) -> str:

159

"""Activity that uses info for logging and tracking."""

160

info = activity.info()

161

162

# Access activity execution details

163

activity_logger.info(

164

f"Processing activity {info.activity_type} "

165

f"(attempt {info.attempt}) for workflow {info.workflow_id}"

166

)

167

168

return f"Processed {data} in attempt {info.attempt}"

169

```

170

171

### Context Detection

172

173

```python { .api }

174

def in_activity() -> bool:

175

"""Whether the current code is inside an activity.

176

177

Returns:

178

True if in an activity, False otherwise.

179

"""

180

```

181

182

```python

183

def conditional_logic():

184

"""Function that behaves differently in activity vs non-activity context."""

185

if activity.in_activity():

186

# Use activity-specific logging and error handling

187

info = activity.info()

188

logger = activity.logger

189

logger.info(f"Running in activity {info.activity_type}")

190

else:

191

# Use regular application logging

192

logger = logging.getLogger(__name__)

193

logger.info("Running outside activity context")

194

```

195

196

## Activity Lifecycle Management

197

198

### Heartbeat Functionality

199

200

```python { .api }

201

def heartbeat(*details: Any) -> None:

202

"""Send a heartbeat for the current activity.

203

204

Raises:

205

RuntimeError: When not in an activity.

206

"""

207

```

208

209

Heartbeats are essential for long-running activities to signal that they are still alive and making progress:

210

211

```python

212

@activity.defn

213

async def long_running_task(items: List[str]) -> List[str]:

214

"""Activity that reports progress via heartbeats."""

215

results = []

216

217

for i, item in enumerate(items):

218

# Process the item

219

result = await process_item(item)

220

results.append(result)

221

222

# Send heartbeat with progress details

223

progress = {

224

"processed": i + 1,

225

"total": len(items),

226

"last_item": item

227

}

228

activity.heartbeat(progress)

229

230

# Check for cancellation periodically

231

if activity.is_cancelled():

232

break

233

234

return results

235

```

236

237

### Cancellation Detection

238

239

```python { .api }

240

def is_cancelled() -> bool:

241

"""Whether a cancellation was ever requested on this activity.

242

243

Returns:

244

True if the activity has had a cancellation request, False otherwise.

245

246

Raises:

247

RuntimeError: When not in an activity.

248

"""

249

```

250

251

```python

252

@activity.defn

253

async def cancellable_activity(duration_seconds: int) -> str:

254

"""Activity that gracefully handles cancellation."""

255

start_time = time.time()

256

257

while time.time() - start_time < duration_seconds:

258

# Check for cancellation

259

if activity.is_cancelled():

260

# Perform cleanup

261

await cleanup_resources()

262

return "Cancelled gracefully"

263

264

# Do some work

265

await asyncio.sleep(1)

266

activity.heartbeat({"elapsed": time.time() - start_time})

267

268

return "Completed successfully"

269

```

270

271

### Worker Shutdown Detection

272

273

```python { .api }

274

def is_worker_shutdown() -> bool:

275

"""Whether shutdown has been invoked on the worker.

276

277

Returns:

278

True if shutdown has been called on the worker, False otherwise.

279

280

Raises:

281

RuntimeError: When not in an activity.

282

"""

283

```

284

285

```python

286

@activity.defn

287

async def shutdown_aware_activity(data: List[str]) -> List[str]:

288

"""Activity that handles worker shutdown gracefully."""

289

results = []

290

291

for item in data:

292

# Check if worker is shutting down

293

if activity.is_worker_shutdown():

294

# Save partial progress and exit cleanly

295

await save_partial_results(results)

296

raise Exception("Worker shutting down, partial results saved")

297

298

result = await process_item(item)

299

results.append(result)

300

301

return results

302

```

303

304

## Activity Cancellation and Control

305

306

### Cancellation Details

307

308

```python { .api }

309

def cancellation_details() -> Optional[ActivityCancellationDetails]:

310

"""Cancellation details of the current activity, if any. Once set, cancellation details do not change."""

311

```

312

313

The `ActivityCancellationDetails` class provides detailed information about why an activity was cancelled:

314

315

```python { .api }

316

@dataclass(frozen=True)

317

class ActivityCancellationDetails:

318

"""Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set."""

319

320

not_found: bool = False

321

cancel_requested: bool = False

322

paused: bool = False

323

timed_out: bool = False

324

worker_shutdown: bool = False

325

```

326

327

```python

328

@activity.defn

329

async def detailed_cancellation_handling() -> str:

330

"""Activity that provides detailed cancellation information."""

331

try:

332

# Perform long-running work

333

await perform_work()

334

return "Success"

335

except Exception:

336

# Check cancellation details

337

details = activity.cancellation_details()

338

if details:

339

if details.timed_out:

340

await handle_timeout_cleanup()

341

return "Timed out, cleanup completed"

342

elif details.cancel_requested:

343

await handle_cancellation_cleanup()

344

return "Cancelled by request, cleanup completed"

345

elif details.worker_shutdown:

346

await handle_shutdown_cleanup()

347

return "Worker shutdown, cleanup completed"

348

raise

349

```

350

351

### Shielding from Cancellation

352

353

```python { .api }

354

@contextmanager

355

def shield_thread_cancel_exception() -> Iterator[None]:

356

"""Context manager for synchronous multithreaded activities to delay

357

cancellation exceptions.

358

359

By default, synchronous multithreaded activities have an exception thrown

360

inside when cancellation occurs. Code within a "with" block of this context

361

manager will delay that throwing until the end. Even if the block returns a

362

value or throws its own exception, if a cancellation exception is pending,

363

it is thrown instead. Therefore users are encouraged to not throw out of

364

this block and can surround this with a try/except if they wish to catch a

365

cancellation.

366

367

This properly supports nested calls and will only throw after the last one.

368

369

This just runs the blocks with no extra effects for async activities or

370

synchronous multiprocess/other activities.

371

372

Raises:

373

temporalio.exceptions.CancelledError: If a cancellation occurs anytime

374

during this block and this is not nested in another shield block.

375

"""

376

```

377

378

```python

379

@activity.defn

380

def sync_activity_with_cleanup(data: str) -> str:

381

"""Synchronous activity that performs critical cleanup."""

382

try:

383

# Critical section that should not be interrupted

384

with activity.shield_thread_cancel_exception():

385

# Perform critical operations

386

critical_result = perform_critical_operation(data)

387

388

# Critical cleanup that must complete

389

cleanup_critical_resources()

390

391

return critical_result

392

except temporalio.exceptions.CancelledError:

393

# Handle cancellation after cleanup is complete

394

return "Cancelled after cleanup"

395

```

396

397

### Synchronous Cancellation Waiting

398

399

```python { .api }

400

def wait_for_cancelled_sync(timeout: Optional[Union[timedelta, float]] = None) -> None:

401

"""Synchronously block while waiting for a cancellation request on this

402

activity.

403

404

This is essentially a wrapper around :py:meth:`threading.Event.wait`.

405

406

Args:

407

timeout: Max amount of time to wait for cancellation.

408

409

Raises:

410

RuntimeError: When not in an activity.

411

"""

412

```

413

414

```python

415

@activity.defn

416

def sync_monitoring_activity(check_interval: float = 1.0) -> str:

417

"""Synchronous activity that monitors for cancellation."""

418

while True:

419

# Perform some work

420

result = perform_work_unit()

421

422

# Wait for cancellation with timeout

423

activity.wait_for_cancelled_sync(timeout=check_interval)

424

425

# Check if actually cancelled

426

if activity.is_cancelled():

427

return "Cancelled during monitoring"

428

429

# Continue if timeout occurred (not cancelled)

430

if work_is_complete(result):

431

return "Work completed successfully"

432

```

433

434

### Worker Shutdown Waiting

435

436

```python { .api }

437

def wait_for_worker_shutdown_sync(

438

timeout: Optional[Union[timedelta, float]] = None,

439

) -> None:

440

"""Synchronously block while waiting for shutdown to be called on the

441

worker.

442

443

This is essentially a wrapper around :py:meth:`threading.Event.wait`.

444

445

Args:

446

timeout: Max amount of time to wait for shutdown to be called on the

447

worker.

448

449

Raises:

450

RuntimeError: When not in an activity.

451

"""

452

```

453

454

```python

455

@activity.defn

456

def graceful_shutdown_activity(work_items: List[str]) -> List[str]:

457

"""Activity that gracefully handles worker shutdown."""

458

results = []

459

460

for item in work_items:

461

# Process item

462

result = process_item(item)

463

results.append(result)

464

465

# Check for worker shutdown with timeout

466

activity.wait_for_worker_shutdown_sync(timeout=0.1)

467

468

if activity.is_worker_shutdown():

469

# Save partial results before shutdown

470

save_results(results)

471

raise Exception(f"Worker shutdown, saved {len(results)} results")

472

473

return results

474

```

475

476

## Async Activity Completion

477

478

### External Completion

479

480

```python { .api }

481

def raise_complete_async() -> NoReturn:

482

"""Raise an error that says the activity will be completed

483

asynchronously.

484

"""

485

```

486

487

For activities that need to be completed by external systems:

488

489

```python

490

@activity.defn

491

async def async_completion_activity(task_id: str) -> str:

492

"""Activity that will be completed externally."""

493

# Start external process

494

external_system.start_task(task_id, callback_url="http://callback/complete")

495

496

# Register for external completion

497

activity.raise_complete_async()

498

# This line will never be reached

499

```

500

501

The external system would then complete the activity using the client:

502

503

```python

504

# External system completion

505

client = Client.connect("localhost:7233")

506

await client.complete_activity_by_task_token(

507

task_token=activity_task_token,

508

result="External completion result"

509

)

510

```

511

512

## Utility Functions

513

514

### Payload Converter Access

515

516

```python { .api }

517

def payload_converter() -> temporalio.converter.PayloadConverter:

518

"""Get the payload converter for the current activity.

519

520

This is often used for dynamic activities to convert payloads.

521

"""

522

```

523

524

```python

525

@activity.defn(dynamic=True)

526

async def dynamic_payload_activity(args: Sequence[temporalio.common.RawValue]) -> Any:

527

"""Dynamic activity that handles various payload types."""

528

converter = activity.payload_converter()

529

530

# Convert raw payloads to Python objects

531

decoded_args = []

532

for raw_value in args:

533

decoded = converter.from_payload(raw_value.payload)

534

decoded_args.append(decoded)

535

536

# Process based on argument types

537

if len(decoded_args) == 1 and isinstance(decoded_args[0], str):

538

return f"String processing: {decoded_args[0]}"

539

elif len(decoded_args) == 2 and all(isinstance(arg, int) for arg in decoded_args):

540

return f"Math result: {decoded_args[0] + decoded_args[1]}"

541

else:

542

return f"Generic processing: {decoded_args}"

543

```

544

545

### Metric Meter Access

546

547

```python { .api }

548

def metric_meter() -> temporalio.common.MetricMeter:

549

"""Get the metric meter for the current activity.

550

551

.. warning::

552

This is only available in async or synchronous threaded activities. An

553

error is raised on non-thread-based sync activities when trying to

554

access this.

555

556

Returns:

557

Current metric meter for this activity for recording metrics.

558

559

Raises:

560

RuntimeError: When not in an activity or in a non-thread-based

561

synchronous activity.

562

"""

563

```

564

565

```python

566

@activity.defn

567

async def metrics_activity(data_size: int) -> str:

568

"""Activity that records custom metrics."""

569

meter = activity.metric_meter()

570

571

# Create custom metrics

572

processing_counter = meter.create_counter(

573

"activity_items_processed",

574

"Number of items processed by activity"

575

)

576

577

processing_histogram = meter.create_histogram(

578

"activity_processing_duration",

579

"Time spent processing items"

580

)

581

582

start_time = time.time()

583

584

# Process data with metrics

585

for i in range(data_size):

586

await process_item(i)

587

processing_counter.add(1)

588

589

duration = time.time() - start_time

590

processing_histogram.record(duration)

591

592

return f"Processed {data_size} items in {duration:.2f}s"

593

```

594

595

## Core Classes and Types

596

597

### Info Class

598

599

```python { .api }

600

@dataclass(frozen=True)

601

class Info:

602

"""Information about the running activity.

603

604

Retrieved inside an activity via :py:func:`info`.

605

"""

606

607

activity_id: str

608

activity_type: str

609

attempt: int

610

current_attempt_scheduled_time: datetime

611

heartbeat_details: Sequence[Any]

612

heartbeat_timeout: Optional[timedelta]

613

is_local: bool

614

schedule_to_close_timeout: Optional[timedelta]

615

scheduled_time: datetime

616

start_to_close_timeout: Optional[timedelta]

617

started_time: datetime

618

task_queue: str

619

task_token: bytes

620

workflow_id: str

621

workflow_namespace: str

622

workflow_run_id: str

623

workflow_type: str

624

priority: temporalio.common.Priority

625

```

626

627

The Info class provides comprehensive information about the activity execution context:

628

629

```python

630

@activity.defn

631

async def info_logging_activity(data: str) -> str:

632

"""Activity that logs detailed execution information."""

633

info = activity.info()

634

635

# Log execution details

636

logger.info(f"Activity {info.activity_type} started")

637

logger.info(f"Attempt {info.attempt} of activity {info.activity_id}")

638

logger.info(f"Running for workflow {info.workflow_type}:{info.workflow_id}")

639

logger.info(f"Task queue: {info.task_queue}")

640

logger.info(f"Is local activity: {info.is_local}")

641

642

if info.heartbeat_timeout:

643

logger.info(f"Heartbeat timeout: {info.heartbeat_timeout}")

644

645

if info.start_to_close_timeout:

646

logger.info(f"Start-to-close timeout: {info.start_to_close_timeout}")

647

648

# Use priority information

649

if info.priority.priority_key:

650

logger.info(f"Priority key: {info.priority.priority_key}")

651

652

return f"Processed {data} in attempt {info.attempt}"

653

```

654

655

### Priority Class

656

657

```python { .api }

658

@dataclass(frozen=True)

659

class Priority:

660

"""Priority contains metadata that controls relative ordering of task processing when tasks are

661

backlogged in a queue."""

662

663

priority_key: Optional[int] = None

664

"""Priority key is a positive integer from 1 to n, where smaller integers correspond to higher

665

priorities (tasks run sooner)."""

666

667

fairness_key: Optional[str] = None

668

"""A short string (max 64 bytes) that is used as a key for a fairness balancing mechanism."""

669

670

fairness_weight: Optional[float] = None

671

"""A float that represents the weight for task dispatch for the associated fairness key."""

672

673

default: ClassVar[Priority]

674

"""Singleton default priority instance."""

675

```

676

677

### LoggerAdapter Class

678

679

```python { .api }

680

class LoggerAdapter(logging.LoggerAdapter):

681

"""Adapter that adds details to the log about the running activity.

682

683

Attributes:

684

activity_info_on_message: Boolean for whether a string representation of

685

a dict of some activity info will be appended to each message.

686

Default is True.

687

activity_info_on_extra: Boolean for whether a ``temporal_activity``

688

dictionary value will be added to the ``extra`` dictionary with some

689

activity info, making it present on the ``LogRecord.__dict__`` for

690

use by others. Default is True.

691

full_activity_info_on_extra: Boolean for whether an ``activity_info``

692

value will be added to the ``extra`` dictionary with the entire

693

activity info, making it present on the ``LogRecord.__dict__`` for

694

use by others. Default is False.

695

"""

696

697

def __init__(

698

self, logger: logging.Logger, extra: Optional[Mapping[str, Any]]

699

) -> None:

700

"""Create the logger adapter."""

701

702

def process(

703

self, msg: Any, kwargs: MutableMapping[str, Any]

704

) -> Tuple[Any, MutableMapping[str, Any]]:

705

"""Override to add activity details."""

706

707

@property

708

def base_logger(self) -> logging.Logger:

709

"""Underlying logger usable for actions such as adding

710

handlers/formatters.

711

"""

712

```

713

714

The SDK provides a pre-configured logger adapter:

715

716

```python { .api }

717

logger: LoggerAdapter

718

"""Logger that will have contextual activity details embedded."""

719

```

720

721

```python

722

@activity.defn

723

async def logging_activity(message: str) -> str:

724

"""Activity that demonstrates contextual logging."""

725

# Use the pre-configured logger with activity context

726

activity.logger.info(f"Processing message: {message}")

727

activity.logger.warning("This is a warning with activity context")

728

729

# Create custom logger adapter

730

custom_logger = activity.LoggerAdapter(

731

logging.getLogger("custom"),

732

{"custom_field": "custom_value"}

733

)

734

custom_logger.activity_info_on_message = False

735

custom_logger.full_activity_info_on_extra = True

736

737

custom_logger.info("Custom logging with full activity info in extra")

738

739

return f"Logged: {message}"

740

```

741

742

### Supporting Types

743

744

#### CallableType

745

746

```python { .api }

747

CallableType = TypeVar('CallableType', bound=Callable)

748

"""Type variable for callable functions used in activity definitions."""

749

```

750

751

#### RawValue

752

753

```python { .api }

754

@dataclass(frozen=True)

755

class RawValue:

756

"""Raw value container for dynamic activities."""

757

758

payload: temporalio.api.common.v1.Payload

759

"""The raw payload data."""

760

```

761

762

## Complete Activity Examples

763

764

### Robust Activity with All Features

765

766

```python

767

@activity.defn(name="robust_data_processor")

768

async def robust_activity(

769

data: List[dict],

770

batch_size: int = 10,

771

timeout_seconds: int = 300

772

) -> dict:

773

"""Comprehensive activity demonstrating all features."""

774

info = activity.info()

775

meter = activity.metric_meter()

776

777

# Create metrics

778

items_processed = meter.create_counter("items_processed", "Items processed")

779

processing_duration = meter.create_histogram("processing_duration", "Processing time")

780

781

activity.logger.info(f"Starting robust processing of {len(data)} items")

782

783

results = []

784

start_time = time.time()

785

786

try:

787

for i, item in enumerate(data):

788

# Check for various cancellation conditions

789

if activity.is_cancelled():

790

details = activity.cancellation_details()

791

if details and details.timed_out:

792

activity.logger.warning("Activity timed out, saving partial results")

793

await save_partial_results(results)

794

break

795

796

if activity.is_worker_shutdown():

797

activity.logger.warning("Worker shutting down, saving progress")

798

await save_partial_results(results)

799

break

800

801

# Process item with error handling

802

try:

803

result = await process_complex_item(item)

804

results.append(result)

805

items_processed.add(1)

806

except Exception as e:

807

activity.logger.error(f"Failed to process item {i}: {e}")

808

continue

809

810

# Send heartbeat every batch

811

if (i + 1) % batch_size == 0:

812

progress = {

813

"processed": len(results),

814

"total": len(data),

815

"success_rate": len(results) / (i + 1),

816

"elapsed_time": time.time() - start_time

817

}

818

activity.heartbeat(progress)

819

820

duration = time.time() - start_time

821

processing_duration.record(duration)

822

823

final_result = {

824

"total_items": len(data),

825

"processed_items": len(results),

826

"success_rate": len(results) / len(data) if data else 0,

827

"processing_time": duration,

828

"activity_info": {

829

"activity_id": info.activity_id,

830

"attempt": info.attempt,

831

"workflow_id": info.workflow_id

832

}

833

}

834

835

activity.logger.info(f"Successfully processed {len(results)}/{len(data)} items")

836

return final_result

837

838

except Exception as e:

839

activity.logger.error(f"Activity failed: {e}")

840

# Save whatever progress we made

841

await save_partial_results(results)

842

raise

843

```

844

845

This comprehensive activity.md sub-doc provides complete coverage of the temporalio.activity module, including all API signatures, detailed parameter documentation, usage examples, and comprehensive type definitions. The documentation follows the Knowledge Tile format and provides developers with everything needed to effectively use activities in their Temporal workflows.