
# Client Operations

The `temporalio.client` module provides the primary interface for connecting to Temporal servers and managing workflows, schedules, and activities. This module contains the main `Client` class that serves as the entry point for all client-side operations.

## Client Connection and Configuration

### Client Class

The `Client` class is the main interface for interacting with a Temporal server. It provides methods for workflow management, schedule operations, and activity handling.

```python { .api }
class Client:
    """Client for accessing Temporal.

    Most users will use connect() to create a client. The service property provides
    access to a raw gRPC client. Clients are not thread-safe and should only be used
    in the event loop they are first connected in.

    Clients do not work across forks since runtimes do not work across forks.
    """
```

### Connection Methods

#### Client.connect

```python { .api }
@staticmethod
async def connect(
    target_host: str,
    *,
    namespace: str = "default",
    api_key: Optional[str] = None,
    data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
    plugins: Sequence[Plugin] = [],
    interceptors: Sequence[Interceptor] = [],
    default_workflow_query_reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
    tls: Union[bool, TLSConfig] = False,
    retry_config: Optional[RetryConfig] = None,
    keep_alive_config: Optional[KeepAliveConfig] = KeepAliveConfig.default,
    rpc_metadata: Mapping[str, str] = {},
    identity: Optional[str] = None,
    lazy: bool = False,
    runtime: Optional[temporalio.runtime.Runtime] = None,
    http_connect_proxy_config: Optional[HttpConnectProxyConfig] = None,
    header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
) -> Client
```

**Parameters:**
- `target_host` (str): `host:port` for the Temporal server. For local development, this is often "localhost:7233"
- `namespace` (str): Namespace to use for client calls. Default: "default"
- `api_key` (Optional[str]): API key for Temporal. This becomes the "Authorization" HTTP header with "Bearer " prepended
- `data_converter` (temporalio.converter.DataConverter): Data converter to use for all data conversions to/from payloads
- `plugins` (Sequence[Plugin]): Set of plugins that are chained together to allow intercepting and modifying client creation and service connection
- `interceptors` (Sequence[Interceptor]): Set of interceptors that are chained together to allow intercepting of client calls
- `default_workflow_query_reject_condition` (Optional[temporalio.common.QueryRejectCondition]): The default rejection condition for workflow queries
- `tls` (Union[bool, TLSConfig]): TLS configuration. If False, do not use TLS. If True, use system default TLS configuration
- `retry_config` (Optional[RetryConfig]): Retry configuration for direct service calls or all high-level calls
- `keep_alive_config` (Optional[KeepAliveConfig]): Keep-alive configuration for the client connection
- `rpc_metadata` (Mapping[str, str]): Headers to use for all calls to the server
- `identity` (Optional[str]): Identity for this client. If unset, a default is created based on the version of the SDK
- `lazy` (bool): If true, the client will not connect until the first call is attempted
- `runtime` (Optional[temporalio.runtime.Runtime]): The runtime for this client, or the default if unset
- `http_connect_proxy_config` (Optional[HttpConnectProxyConfig]): Configuration for HTTP CONNECT proxy
- `header_codec_behavior` (HeaderCodecBehavior): Encoding behavior for headers sent by the client

**Returns:**
- `Client`: Connected Temporal client
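
To make the `api_key` description concrete, the sketch below builds the header that the parameter list describes. The helper `bearer_metadata` is hypothetical and not part of `temporalio`; it only illustrates the "Bearer " prefix, under the assumption that no header is sent when no key is configured.

```python
from typing import Dict, Optional


def bearer_metadata(api_key: Optional[str]) -> Dict[str, str]:
    """Return the header implied by `api_key` (hypothetical helper, not an SDK function)."""
    if api_key is None:
        # Assumption: without a key, no Authorization header is added.
        return {}
    # The key is sent as an Authorization header with "Bearer " prepended.
    return {"Authorization": f"Bearer {api_key}"}


print(bearer_metadata("my-api-key"))
```

In normal use there is no need to build this header by hand; passing `api_key` to `Client.connect` is enough.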

**Example:**
```python
import temporalio.client

# Connect to local Temporal server
client = await temporalio.client.Client.connect("localhost:7233")

# Connect with TLS and authentication
client = await temporalio.client.Client.connect(
    "my-temporal.example.com:7233",
    namespace="production",
    api_key="my-api-key",
    tls=True
)
```

### ClientConfig

```python { .api }
class ClientConfig(TypedDict, total=False):
    """TypedDict of config originally passed to Client."""

    service_client: Required[temporalio.service.ServiceClient]
    namespace: Required[str]
    data_converter: Required[temporalio.converter.DataConverter]
    interceptors: Required[Sequence[Interceptor]]
    default_workflow_query_reject_condition: Required[Optional[temporalio.common.QueryRejectCondition]]
    header_codec_behavior: Required[HeaderCodecBehavior]
    plugins: Required[Sequence[Plugin]]
```

### Client Properties

```python { .api }
@property
def service_client(self) -> temporalio.service.ServiceClient:
    """Raw gRPC service client."""

@property
def workflow_service(self) -> temporalio.service.WorkflowService:
    """Raw gRPC workflow service client."""

@property
def operator_service(self) -> temporalio.service.OperatorService:
    """Raw gRPC operator service client."""

@property
def test_service(self) -> temporalio.service.TestService:
    """Raw gRPC test service client."""

@property
def namespace(self) -> str:
    """Namespace used in calls by this client."""

@property
def identity(self) -> str:
    """Identity used in calls by this client."""

@property
def data_converter(self) -> temporalio.converter.DataConverter:
    """Data converter used by this client."""

@property
def rpc_metadata(self) -> Mapping[str, str]:
    """Headers for every call made by this client."""

@rpc_metadata.setter
def rpc_metadata(self, value: Mapping[str, str]) -> None:
    """Update the headers for this client."""

@property
def api_key(self) -> Optional[str]:
    """API key for every call made by this client."""

@api_key.setter
def api_key(self, value: Optional[str]) -> None:
    """Update the API key for this client."""
```

## Workflow Management

### Starting Workflows

The `start_workflow` method is used to start new workflow executions. It has several overloads to support different calling patterns.

#### start_workflow (typed method - no parameters)

```python { .api }
async def start_workflow(
    self,
    workflow: MethodAsyncNoParam[SelfType, ReturnType],
    *,
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
```

#### start_workflow (typed method - single parameter)

```python { .api }
async def start_workflow(
    self,
    workflow: MethodAsyncSingleParam[SelfType, ParamType, ReturnType],
    arg: ParamType,
    *,
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
```

#### start_workflow (typed method - multiple parameters)

```python { .api }
async def start_workflow(
    self,
    workflow: Callable[Concatenate[SelfType, MultiParamSpec], Awaitable[ReturnType]],
    *,
    args: Sequence[Any],
    id: str,
    task_queue: str,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[SelfType, ReturnType]
```

#### start_workflow (string name)

```python { .api }
async def start_workflow(
    self,
    workflow: str,
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: str,
    task_queue: str,
    result_type: Optional[Type] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> WorkflowHandle[Any, Any]
```

**Common Parameters:**
- `workflow`: The workflow to start (method reference or string name)
- `id` (str): Unique workflow ID
- `task_queue` (str): Task queue to run the workflow on
- `execution_timeout` (Optional[timedelta]): Timeout for the entire workflow execution
- `run_timeout` (Optional[timedelta]): Timeout for a single workflow run
- `task_timeout` (Optional[timedelta]): Timeout for individual workflow tasks
- `id_reuse_policy` (temporalio.common.WorkflowIDReusePolicy): Policy for reusing workflow IDs
- `id_conflict_policy` (temporalio.common.WorkflowIDConflictPolicy): Policy for handling workflow ID conflicts
- `retry_policy` (Optional[temporalio.common.RetryPolicy]): Retry policy for the workflow
- `cron_schedule` (str): Cron schedule for recurring workflows
- `memo` (Optional[Mapping[str, Any]]): Memo to attach to the workflow
- `search_attributes` (Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]]): Search attributes for the workflow
- `static_summary` (Optional[str]): Static summary for the workflow
- `static_details` (Optional[str]): Static details for the workflow
- `start_delay` (Optional[timedelta]): Delay before starting the workflow
- `start_signal` (Optional[str]): Signal to send when starting the workflow
- `start_signal_args` (Sequence[Any]): Arguments for the start signal
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call
- `request_eager_start` (bool): Request eager workflow start
- `priority` (temporalio.common.Priority): Workflow priority
- `versioning_override` (Optional[temporalio.common.VersioningOverride]): Versioning override for the workflow

**Returns:**
- `WorkflowHandle`: Handle to the started workflow
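
The `start_signal` and `start_signal_args` parameters can be combined with the string-name overload to deliver a signal together with the start request. The following is a minimal sketch, not a definitive recipe: `start_with_signal` is a hypothetical helper, `client` is assumed to be a connected `Client`, and the workflow, signal, and task queue names are placeholders.

```python
from datetime import timedelta
from typing import Any


async def start_with_signal(client: Any, workflow_id: str) -> Any:
    """Start a workflow by name and send a signal with the start request.

    Hypothetical helper; `client` is expected to expose `start_workflow`
    with the keyword parameters listed in the reference above.
    """
    return await client.start_workflow(
        "MyWorkflow",                        # placeholder workflow name
        id=workflow_id,
        task_queue="my-task-queue",          # placeholder task queue
        run_timeout=timedelta(minutes=5),    # limit for a single run
        start_signal="my_signal",            # placeholder signal name
        start_signal_args=["signal_data"],   # arguments passed to the signal
    )
```

The call returns a `WorkflowHandle`, so the caller can still await `result()` or send further signals afterwards.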

### Executing Workflows

Similar to starting workflows, the client provides `execute_workflow` methods that start a workflow and wait for its result.

```python { .api }
async def execute_workflow(
    self,
    workflow: Union[str, Callable[..., Awaitable[Any]]],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: str,
    task_queue: str,
    result_type: Optional[Type] = None,
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
    task_timeout: Optional[timedelta] = None,
    id_reuse_policy: temporalio.common.WorkflowIDReusePolicy = temporalio.common.WorkflowIDReusePolicy.ALLOW_DUPLICATE,
    id_conflict_policy: temporalio.common.WorkflowIDConflictPolicy = temporalio.common.WorkflowIDConflictPolicy.UNSPECIFIED,
    retry_policy: Optional[temporalio.common.RetryPolicy] = None,
    cron_schedule: str = "",
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    static_summary: Optional[str] = None,
    static_details: Optional[str] = None,
    start_delay: Optional[timedelta] = None,
    start_signal: Optional[str] = None,
    start_signal_args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
    request_eager_start: bool = False,
    priority: temporalio.common.Priority = temporalio.common.Priority.default,
    versioning_override: Optional[temporalio.common.VersioningOverride] = None,
) -> Any
```
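
Since `execute_workflow` starts a workflow and waits for its result, it can be thought of as `start_workflow` followed by `WorkflowHandle.result()`. The sketch below spells out that two-step form. `start_then_wait` is a hypothetical helper written against the method names in this reference; it is not the SDK's own implementation.

```python
from typing import Any


async def start_then_wait(
    client: Any, workflow: str, arg: Any, *, id: str, task_queue: str
) -> Any:
    """Two-step equivalent of execute_workflow (illustrative only)."""
    # Step 1: start the workflow and obtain a handle.
    handle = await client.start_workflow(workflow, arg, id=id, task_queue=task_queue)
    # Step 2: wait for the workflow to finish and return its result.
    return await handle.result()
```

The two-step form is useful when the handle is needed before completion, for example to signal or query the workflow while it runs.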

### Getting Workflow Handles

#### get_workflow_handle

```python { .api }
def get_workflow_handle(
    self,
    workflow_id: str,
    *,
    run_id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    result_type: Optional[Type] = None,
) -> WorkflowHandle[Any, Any]
```

**Parameters:**
- `workflow_id` (str): Workflow ID to get a handle to
- `run_id` (Optional[str]): Run ID that will be used for all calls
- `first_execution_run_id` (Optional[str]): First execution run ID used for cancellation and termination
- `result_type` (Optional[Type]): The result type to deserialize into if known

**Returns:**
- `WorkflowHandle[Any, Any]`: Handle to the workflow

#### get_workflow_handle_for

```python { .api }
def get_workflow_handle_for(
    self,
    workflow: Union[MethodAsyncNoParam[SelfType, ReturnType], MethodAsyncSingleParam[SelfType, Any, ReturnType]],
    workflow_id: str,
    *,
    run_id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
) -> WorkflowHandle[SelfType, ReturnType]
```

**Parameters:**
- `workflow`: The workflow run method to use for typing the handle
- `workflow_id` (str): Workflow ID to get a handle to
- `run_id` (Optional[str]): Run ID that will be used for all calls
- `first_execution_run_id` (Optional[str]): First execution run ID used for cancellation and termination

**Returns:**
- `WorkflowHandle[SelfType, ReturnType]`: Typed handle to the workflow

### WorkflowHandle

```python { .api }
class WorkflowHandle(Generic[SelfType, ReturnType]):
    """Handle for interacting with a workflow.

    This is usually created via Client.get_workflow_handle or
    returned from Client.start_workflow.
    """

    def __init__(
        self,
        client: Client,
        id: str,
        *,
        run_id: Optional[str] = None,
        result_run_id: Optional[str] = None,
        first_execution_run_id: Optional[str] = None,
        result_type: Optional[Type] = None,
        start_workflow_response: Optional[Union[temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse, temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse]] = None,
    )
```

#### WorkflowHandle Properties

```python { .api }
@property
def id(self) -> str:
    """ID of the workflow."""

@property
def run_id(self) -> Optional[str]:
    """Run ID used for most calls on this handle."""

@property
def result_run_id(self) -> Optional[str]:
    """Run ID used for result calls on this handle."""

@property
def first_execution_run_id(self) -> Optional[str]:
    """Run ID of the first execution used for cancel and terminate calls."""
```

#### WorkflowHandle.result

```python { .api }
async def result(
    self,
    *,
    follow_runs: bool = True,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ReturnType
```

**Parameters:**
- `follow_runs` (bool): Whether to follow workflow runs if the workflow continues as new
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `ReturnType`: The workflow result

### Continue-as-New Workflows

Continue-as-new workflows are handled automatically by the client when following runs in the `result()` method.

**Example:**
```python
# Start a workflow
handle = await client.start_workflow(
    MyWorkflow.run,
    "input_data",
    id="my-workflow-id",
    task_queue="my-task-queue"
)

# Get the result (will follow continue-as-new workflows by default)
result = await handle.result()
```

## Workflow Interaction

### Signaling Workflows

#### WorkflowHandle.signal

```python { .api }
async def signal(
    self,
    signal: Union[MethodSyncOrAsyncNoParam[SelfType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None
```

**Parameters:**
- `signal`: The signal method to call or signal name as string
- `arg`: Single argument for the signal
- `args` (Sequence[Any]): Multiple arguments for the signal
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Example:**
```python
# Signal a workflow using typed method
await handle.signal(MyWorkflow.my_signal, "signal_data")

# Signal a workflow using string name
await handle.signal("my_signal", "signal_data")
```

### Querying Workflows

#### WorkflowHandle.query

```python { .api }
async def query(
    self,
    query: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    reject_condition: Optional[temporalio.common.QueryRejectCondition] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> Any
```

**Parameters:**
- `query`: The query method to call or query name as string
- `arg`: Single argument for the query
- `args` (Sequence[Any]): Multiple arguments for the query
- `reject_condition` (Optional[temporalio.common.QueryRejectCondition]): Condition for rejecting the query
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `Any`: The query result

### Workflow Updates

#### Executing Updates

```python { .api }
async def execute_update(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.COMPLETED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> Any
```

**Parameters:**
- `update`: The update method to call or update name as string
- `arg`: Single argument for the update
- `args` (Sequence[Any]): Multiple arguments for the update
- `id` (Optional[str]): Update ID
- `first_execution_run_id` (Optional[str]): First execution run ID
- `wait_for_stage` (WorkflowUpdateStage): Stage to wait for before returning
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `Any`: The update result

#### Starting Updates

```python { .api }
async def start_update(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], str],
    arg: Any = temporalio.common._arg_unset,
    *,
    args: Sequence[Any] = [],
    id: Optional[str] = None,
    first_execution_run_id: Optional[str] = None,
    wait_for_stage: WorkflowUpdateStage = WorkflowUpdateStage.ACCEPTED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkflowUpdateHandle[Any, Any]
```

#### Getting Update Handles

```python { .api }
def get_update_handle(
    self,
    id: str,
    *,
    first_execution_run_id: Optional[str] = None,
    result_type: Optional[Type] = None,
) -> WorkflowUpdateHandle[SelfType, Any]
```

```python { .api }
def get_update_handle_for(
    self,
    update: Union[MethodSyncOrAsyncNoParam[SelfType, ReturnType], MethodSyncOrAsyncSingleParam[SelfType, Any, ReturnType]],
    id: str,
    *,
    first_execution_run_id: Optional[str] = None,
) -> WorkflowUpdateHandle[SelfType, ReturnType]
```

### WorkflowUpdateHandle

```python { .api }
class WorkflowUpdateHandle(Generic[SelfType, ReturnType]):
    """Handle for interacting with a workflow update."""

    @property
    def id(self) -> str:
        """ID of the update."""

    @property
    def workflow_id(self) -> str:
        """Workflow ID this update is for."""

    @property
    def workflow_run_id(self) -> Optional[str]:
        """Run ID of the workflow this update is for."""

    async def result(
        self,
        *,
        rpc_metadata: Mapping[str, str] = {},
        rpc_timeout: Optional[timedelta] = None,
    ) -> ReturnType:
        """Get the result of the update."""
```

### WorkflowUpdateStage

```python { .api }
class WorkflowUpdateStage(IntEnum):
    """Stage of workflow update execution."""

    ADMITTED = 1
    ACCEPTED = 2
    COMPLETED = 3
```
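
Because `WorkflowUpdateStage` is an `IntEnum`, its members compare by their integer values, so a later stage is greater than an earlier one. The sketch below mirrors the enum locally so that it runs without the SDK installed; `has_reached` is a hypothetical helper, not part of `temporalio`.

```python
from enum import IntEnum


class WorkflowUpdateStage(IntEnum):
    """Local mirror of the documented enum, for illustration only."""

    ADMITTED = 1
    ACCEPTED = 2
    COMPLETED = 3


def has_reached(current: WorkflowUpdateStage, target: WorkflowUpdateStage) -> bool:
    """Return True once `current` is at or past `target` (hypothetical helper)."""
    return current >= target


print(has_reached(WorkflowUpdateStage.COMPLETED, WorkflowUpdateStage.ACCEPTED))
print(has_reached(WorkflowUpdateStage.ADMITTED, WorkflowUpdateStage.ACCEPTED))
```

This ordering matches the documented defaults: `start_update` waits for `ACCEPTED`, while `execute_update` waits for the later `COMPLETED` stage.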

### Canceling and Terminating Workflows

#### WorkflowHandle.cancel

```python { .api }
async def cancel(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None
```

**Parameters:**
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

#### WorkflowHandle.terminate

```python { .api }
async def terminate(
    self,
    *,
    reason: Optional[str] = None,
    details: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None
```

**Parameters:**
- `reason` (Optional[str]): Reason for terminating the workflow
- `details` (Sequence[Any]): Additional details about the termination
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Example:**
```python
# Cancel a workflow
await handle.cancel()

# Terminate a workflow with reason
await handle.terminate(reason="Manual termination", details=["User requested"])
```
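
One way to combine the two calls is to request cancellation first and terminate only if the workflow has not finished within a grace period. The sketch below is one possible pattern, not an SDK feature: `stop_workflow` and its `grace_seconds` parameter are hypothetical, and it assumes that awaiting the result of a cancelled workflow raises an exception.

```python
import asyncio
from typing import Any


async def stop_workflow(handle: Any, *, grace_seconds: float) -> str:
    """Cancel a workflow, then terminate it if it does not stop in time."""
    await handle.cancel()
    try:
        # Wait for the workflow to wind down after the cancel request.
        await asyncio.wait_for(handle.result(), timeout=grace_seconds)
    except asyncio.TimeoutError:
        # Still running after the grace period: stop it forcefully.
        await handle.terminate(reason="Cancellation grace period elapsed")
        return "terminated"
    except Exception:
        # Assumption: a cancelled workflow surfaces as an error from result().
        return "cancelled"
    return "completed"
```

Cancellation gives the workflow a chance to clean up, whereas termination stops it without that chance, which is why the sketch tries cancellation first.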

## Schedule Management

### Creating and Managing Schedules

#### Client.create_schedule

```python { .api }
async def create_schedule(
    self,
    id: str,
    schedule: Schedule,
    *,
    trigger_immediately: bool = False,
    backfill: Sequence[ScheduleBackfill] = [],
    memo: Optional[Mapping[str, Any]] = None,
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ScheduleHandle
```

**Parameters:**
- `id` (str): Schedule ID
- `schedule` (Schedule): The schedule definition
- `trigger_immediately` (bool): Whether to trigger the schedule immediately upon creation
- `backfill` (Sequence[ScheduleBackfill]): Backfill requests for the schedule
- `memo` (Optional[Mapping[str, Any]]): Memo to attach to the schedule
- `search_attributes` (Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]]): Search attributes for the schedule
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `ScheduleHandle`: Handle to the created schedule

#### Client.get_schedule_handle

```python { .api }
def get_schedule_handle(self, id: str) -> ScheduleHandle
```

**Parameters:**
- `id` (str): Schedule ID

**Returns:**
- `ScheduleHandle`: Handle to the schedule

### ScheduleHandle

```python { .api }
class ScheduleHandle:
    """Handle for interacting with a schedule.

    This is usually created via Client.get_schedule_handle or
    returned from Client.create_schedule.
    """

    def __init__(self, client: Client, id: str) -> None:
        """Create schedule handle."""

    @property
    def id(self) -> str:
        """ID of the schedule."""
```

#### ScheduleHandle Operations

```python { .api }
async def backfill(
    self,
    *backfill: ScheduleBackfill,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Backfill the schedule."""

async def delete(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Delete the schedule."""

async def describe(
    self,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> ScheduleDescription:
    """Get description of the schedule."""

async def pause(
    self,
    *,
    note: Optional[str] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Pause the schedule."""

async def trigger(
    self,
    *,
    overlap_policy: Optional[ScheduleOverlapPolicy] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Manually trigger the schedule."""

async def unpause(
    self,
    *,
    note: Optional[str] = None,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Unpause the schedule."""

async def update(
    self,
    updater: Callable[[ScheduleUpdateInput], None],
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Update the schedule."""
```
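
The `pause` and `unpause` operations pair naturally around a maintenance window. The sketch below wraps them in an async context manager so the schedule is resumed even if the enclosed work fails. `paused` is a hypothetical helper, and `handle` is assumed to be a `ScheduleHandle` obtained from `Client.get_schedule_handle` or `Client.create_schedule`.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def paused(handle: Any, note: str) -> AsyncIterator[Any]:
    """Pause a schedule for the duration of a block (hypothetical helper)."""
    await handle.pause(note=note)
    try:
        yield handle
    finally:
        # Runs on success and on error, so the schedule is not left paused.
        await handle.unpause(note=f"Resumed after: {note}")
```

Usage would look like `async with paused(handle, "DB migration"): ...`, with the maintenance work placed inside the block.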

### Schedule Types

#### Schedule

```python { .api }
@dataclass
class Schedule:
    """Complete schedule definition."""

    action: ScheduleAction
    spec: ScheduleSpec
    policy: SchedulePolicy = dataclasses.field(default_factory=SchedulePolicy)
    state: ScheduleState = dataclasses.field(default_factory=ScheduleState)
```

#### ScheduleSpec

```python { .api }
@dataclass
class ScheduleSpec:
    """Specification for when to run a scheduled action."""

    calendars: Sequence[ScheduleCalendarSpec] = ()
    intervals: Sequence[ScheduleIntervalSpec] = ()
    cron_expressions: Sequence[str] = ()
    skip: Sequence[ScheduleCalendarSpec] = ()
    start_at: Optional[datetime] = None
    end_at: Optional[datetime] = None
    jitter: Optional[timedelta] = None
    time_zone_name: str = "UTC"
```
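
As a rough illustration of how `intervals`, `start_at`, and `end_at` interact, the sketch below lists the next few times a single interval would fire. It assumes interval times fall on whole multiples of the interval measured from the Unix epoch (plus an optional offset), that all datetimes are timezone-aware, and it ignores calendars, cron expressions, `skip`, jitter, and time zones. `upcoming_interval_times` is a hypothetical helper, not an SDK function.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def upcoming_interval_times(
    every: timedelta,
    after: datetime,
    count: int,
    *,
    offset: timedelta = timedelta(0),
    start_at: Optional[datetime] = None,
    end_at: Optional[datetime] = None,
) -> List[datetime]:
    """Return up to `count` times after `after` of the form epoch + n * every + offset."""
    if every <= timedelta(0):
        raise ValueError("every must be positive")
    # Treat start_at as an inclusive lower bound when it is given.
    floor = after
    if start_at is not None:
        floor = max(after, start_at - timedelta(microseconds=1))
    # Whole intervals elapsed at `floor`, then step to the next boundary.
    n = (floor - EPOCH - offset) // every + 1
    times: List[datetime] = []
    while len(times) < count:
        candidate = EPOCH + offset + n * every
        if end_at is not None and candidate > end_at:
            break  # end_at is an inclusive upper bound
        times.append(candidate)
        n += 1
    return times
```

For an hourly interval queried at 10:15 UTC, the helper returns 11:00, 12:00, and so on, stopping early if `end_at` is reached.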

#### ScheduleAction

```python { .api }
class ScheduleAction(ABC):
    """Base class for schedule actions."""

    @abstractmethod
    def _to_proto(self) -> temporalio.api.schedule.v1.ScheduleAction:
        """Convert to proto."""
```

#### ScheduleActionStartWorkflow

```python { .api }
@dataclass
class ScheduleActionStartWorkflow(ScheduleAction):
    """Schedule action that starts a workflow."""

    workflow: str
    args: Sequence[Any] = ()
    id: str = ""
    task_queue: str = ""
    execution_timeout: Optional[timedelta] = None
    run_timeout: Optional[timedelta] = None
    task_timeout: Optional[timedelta] = None
    retry_policy: Optional[temporalio.common.RetryPolicy] = None
    memo: Optional[Mapping[str, Any]] = None
    search_attributes: Optional[Union[temporalio.common.TypedSearchAttributes, temporalio.common.SearchAttributes]] = None
    headers: Optional[Mapping[str, temporalio.api.common.v1.Payload]] = None
```

#### SchedulePolicy

```python { .api }
@dataclass
class SchedulePolicy:
    """General schedule policies."""

    overlap: ScheduleOverlapPolicy = ScheduleOverlapPolicy.UNSPECIFIED
    catchup_window: Optional[timedelta] = None
    pause_on_failure: bool = False
```

#### ScheduleOverlapPolicy

```python { .api }
class ScheduleOverlapPolicy(IntEnum):
    """Policy for overlapping schedule executions."""

    UNSPECIFIED = 0
    SKIP = 1
    BUFFER_ONE = 2
    BUFFER_ALL = 3
    CANCEL_OTHER = 4
    TERMINATE_OTHER = 5
    ALLOW_ALL = 6
```
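
The policy names are easier to compare side by side. The sketch below is a simplified model of what each policy would do when a scheduled time arrives while an earlier run may still be in progress. It re-declares the enum locally so it runs without the SDK; the `decide` helper, its action strings, and the fallback for `UNSPECIFIED` are illustrative assumptions rather than guaranteed server behavior.

```python
from enum import IntEnum


class ScheduleOverlapPolicy(IntEnum):
    """Local copy of the enum values listed above, so the sketch is self-contained."""

    UNSPECIFIED = 0
    SKIP = 1
    BUFFER_ONE = 2
    BUFFER_ALL = 3
    CANCEL_OTHER = 4
    TERMINATE_OTHER = 5
    ALLOW_ALL = 6


def decide(policy: ScheduleOverlapPolicy, running: int, buffered: int) -> str:
    """Describe what happens to a new scheduled start given the current runs."""
    if running == 0 or policy == ScheduleOverlapPolicy.ALLOW_ALL:
        return "start now"
    if policy in (ScheduleOverlapPolicy.UNSPECIFIED, ScheduleOverlapPolicy.SKIP):
        # Assumption: an unspecified policy falls back to skipping.
        return "skip"
    if policy == ScheduleOverlapPolicy.BUFFER_ONE:
        # Only one pending start is kept; further ones are dropped.
        return "buffer" if buffered == 0 else "skip"
    if policy == ScheduleOverlapPolicy.BUFFER_ALL:
        return "buffer"
    if policy == ScheduleOverlapPolicy.CANCEL_OTHER:
        return "cancel running, then start"
    return "terminate running, then start"
```

For example, `decide(ScheduleOverlapPolicy.BUFFER_ONE, running=1, buffered=1)` reports that the new start is skipped because one start is already waiting.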

**Example:**
```python
from datetime import timedelta

from temporalio.client import (
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleIntervalSpec,
    SchedulePolicy,
    ScheduleSpec,
)

# Create a schedule to run a workflow every hour
schedule = Schedule(
    action=ScheduleActionStartWorkflow(
        workflow="MyWorkflow",
        args=["scheduled_data"],
        id="scheduled-workflow",  # Workflow ID for runs started by this schedule
        task_queue="my-task-queue",
    ),
    spec=ScheduleSpec(
        # intervals takes ScheduleIntervalSpec values, not bare timedeltas
        intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))],
    ),
    policy=SchedulePolicy(),
)

# Create the schedule (client is an already connected Client)
schedule_handle = await client.create_schedule("my-schedule", schedule)

# Trigger the schedule manually
await schedule_handle.trigger()
```

## Worker Build Management

### Build ID Compatibility

#### Client.update_worker_build_id_compatibility

```python { .api }
async def update_worker_build_id_compatibility(
    self,
    task_queue: str,
    operation: BuildIdOp,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None
```

**Parameters:**
- `task_queue` (str): Task queue to update build ID compatibility for
- `operation` (BuildIdOp): Build ID operation to perform
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

#### Client.get_worker_build_id_compatibility

```python { .api }
async def get_worker_build_id_compatibility(
    self,
    task_queue: str,
    *,
    max_sets: int = 0,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkerBuildIdVersionSets
```

**Parameters:**
- `task_queue` (str): Task queue to get build ID compatibility for
- `max_sets` (int): Maximum number of sets to return. 0 means no limit
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `WorkerBuildIdVersionSets`: Build ID version sets

### Task Reachability

#### Client.get_worker_task_reachability

```python { .api }
async def get_worker_task_reachability(
    self,
    *,
    build_ids: Sequence[str] = [],
    task_queues: Sequence[str] = [],
    reachability_type: WorkerTaskReachabilityType = WorkerTaskReachabilityType.UNSPECIFIED,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> WorkerTaskReachability
```

**Parameters:**
- `build_ids` (Sequence[str]): Build IDs to check reachability for
- `task_queues` (Sequence[str]): Task queues to check reachability for
- `reachability_type` (WorkerTaskReachabilityType): Type of reachability to check
- `rpc_metadata` (Mapping[str, str]): RPC metadata for this call
- `rpc_timeout` (Optional[timedelta]): Timeout for the RPC call

**Returns:**
- `WorkerTaskReachability`: Task reachability information

### Worker Build Types

#### WorkerBuildIdVersionSets

```python { .api }
@dataclass
class WorkerBuildIdVersionSets:
    """Worker build ID version sets."""

    version_sets: Sequence[BuildIdVersionSet]
```

#### BuildIdVersionSet

```python { .api }
@dataclass
class BuildIdVersionSet:
    """A single build ID version set."""

    build_ids: Sequence[str]
    is_default: bool
```
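
To show how these two types fit together, the sketch below picks the build ID that new work would be routed to. It re-declares both dataclasses with the fields listed above so it runs standalone, and it assumes that the last build ID in the default set is the current default. `current_default_build_id` is a hypothetical helper, not an SDK function.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class BuildIdVersionSet:
    """Local copy of the type above: one set of mutually compatible build IDs."""

    build_ids: Sequence[str]
    is_default: bool


@dataclass
class WorkerBuildIdVersionSets:
    """Local copy of the type above: all version sets for a task queue."""

    version_sets: Sequence[BuildIdVersionSet]


def current_default_build_id(sets: WorkerBuildIdVersionSets) -> Optional[str]:
    """Return the newest build ID of the default set, or None if there is none."""
    for version_set in sets.version_sets:
        if version_set.is_default and version_set.build_ids:
            # Assumption: build IDs are ordered oldest to newest within a set.
            return version_set.build_ids[-1]
    return None
```

With one set `["v1", "v1.1"]` and a default set `["v2", "v2.1"]`, the helper returns `"v2.1"`.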

#### BuildIdOp

```python { .api }
class BuildIdOp(ABC):
    """Base class for build ID operations."""

    @abstractmethod
    def _to_proto(self) -> temporalio.api.taskqueue.v1.BuildIdOp:
        """Convert to proto."""
```

#### WorkerTaskReachability

```python { .api }
@dataclass
class WorkerTaskReachability:
    """Worker task reachability information."""

    build_id_reachability: Mapping[str, BuildIdReachability]
    task_queue_reachability: Mapping[str, TaskQueueReachability]
```

### Version Management

Version management is handled through the build ID compatibility system. Workers can be updated to new versions while maintaining compatibility with existing workflows.

**Example:**
```python
# Update build ID compatibility to add a new version
from temporalio.client import BuildIdOpAddNewDefault

await client.update_worker_build_id_compatibility(
    "my-task-queue",
    BuildIdOpAddNewDefault("build-id-v2"),
)

# Get current build ID compatibility
compatibility = await client.get_worker_build_id_compatibility("my-task-queue")
```

## Async Activity Management

### AsyncActivityHandle

```python { .api }
class AsyncActivityHandle:
    """Handle representing an external activity for completion and heartbeat."""

    def __init__(
        self,
        client: Client,
        id_or_token: Union[AsyncActivityIDReference, bytes]
    ) -> None:
        """Create an async activity handle."""
```

#### AsyncActivityHandle Methods

```python { .api }
async def heartbeat(
    self,
    *details: Any,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Send a heartbeat for the activity."""

async def complete(
    self,
    result: Any = None,
    *,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Complete the activity with a result."""

async def fail(
    self,
    exception: BaseException,
    *,
    last_heartbeat_details: Sequence[Any] = [],
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Fail the activity with an exception."""

async def report_cancellation(
    self,
    *details: Any,
    rpc_metadata: Mapping[str, str] = {},
    rpc_timeout: Optional[timedelta] = None,
) -> None:
    """Report that the activity was cancelled."""
```
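
An external process typically heartbeats while it makes progress and then reports exactly one final outcome. The sketch below shows that control flow against an in-memory recorder. `RecordingHandle` and `process_externally` are hypothetical stand-ins that only mimic the method names above, so no Temporal server or SDK is involved.

```python
import asyncio
from typing import Any, Callable, List, Tuple


class RecordingHandle:
    """Stand-in that records calls instead of sending RPCs."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Any]] = []

    async def heartbeat(self, *details: Any) -> None:
        self.calls.append(("heartbeat", details))

    async def complete(self, result: Any = None) -> None:
        self.calls.append(("complete", result))

    async def fail(self, exception: BaseException) -> None:
        self.calls.append(("fail", repr(exception)))


async def process_externally(
    handle: RecordingHandle,
    steps: List[Callable[[], Any]],
) -> None:
    """Run each step, heartbeat after it, then complete or fail exactly once."""
    result: Any = None
    try:
        for index, step in enumerate(steps, start=1):
            result = step()
            await handle.heartbeat(f"step {index}/{len(steps)} done")
    except Exception as err:
        # Any step failure is reported once and ends processing.
        await handle.fail(err)
        return
    # The result of the last step becomes the activity result.
    await handle.complete(result)
```

Running `asyncio.run(process_externally(RecordingHandle(), [lambda: 1, lambda: 2]))` records two heartbeats followed by a single completion; a step that raises produces a `fail` record instead.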

### Getting Async Activity Handles

#### Client.get_async_activity_handle (by ID and task queue)

```python { .api }
def get_async_activity_handle(
    self,
    *,
    activity_id: str,
    task_queue: str,
    workflow_namespace: Optional[str] = None,
) -> AsyncActivityHandle
```

**Parameters:**
- `activity_id` (str): ID of the async activity
- `task_queue` (str): Task queue the activity is running on
- `workflow_namespace` (Optional[str]): Workflow namespace if different from client namespace

**Returns:**
- `AsyncActivityHandle`: Handle to the async activity

#### Client.get_async_activity_handle (by task token)

```python { .api }
def get_async_activity_handle(
    self,
    *,
    task_token: bytes
) -> AsyncActivityHandle
```

**Parameters:**
- `task_token` (bytes): Task token for the async activity

**Returns:**
- `AsyncActivityHandle`: Handle to the async activity

### AsyncActivityIDReference

```python { .api }
@dataclass
class AsyncActivityIDReference:
    """Reference to an async activity by ID."""

    activity_id: str
    task_queue: str
    workflow_namespace: Optional[str] = None
```

### Activity Completion and Failure

Async activities can be completed or failed from external processes using the activity handle.

**Example:**
```python
# Get an async activity handle by ID
activity_handle = client.get_async_activity_handle(
    activity_id="my-activity-123",
    task_queue="my-task-queue",
)

# Send a heartbeat
await activity_handle.heartbeat("Still processing...")

# Complete the activity
await activity_handle.complete("Activity completed successfully")

# Or fail the activity
try:
    # Some processing that might fail
    pass
except Exception as e:
    await activity_handle.fail(e)
```

**Example with task token:**
```python
# Get an async activity handle by task token
activity_handle = client.get_async_activity_handle(task_token=task_token_bytes)

# Report cancellation
await activity_handle.report_cancellation("User cancelled the operation")
```