or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

activity.mdclient.mdcommon.mdcontrib-pydantic.mddata-conversion.mdexceptions.mdindex.mdruntime.mdtesting.mdworker.mdworkflow.md

contrib-pydantic.mddocs/

0

# Pydantic Integration

1

2

Pydantic v2 data converter for seamless serialization and validation of Pydantic models in Temporal workflows and activities. This integration provides automatic JSON conversion with type validation for all Pydantic-supported types.

3

4

## Capabilities

5

6

### Pydantic Data Converter

7

8

Ready-to-use data converter that automatically handles Pydantic models and types.

9

10

```python { .api }

11

pydantic_data_converter = DataConverter(

12

payload_converter_class=PydanticPayloadConverter

13

)

14

"""Pydantic data converter.

15

16

Supports conversion of all types supported by Pydantic to and from JSON.

17

18

In addition to Pydantic models, these include all `json.dump`-able types,

19

various non-`json.dump`-able standard library types such as dataclasses,

20

types from the datetime module, sets, UUID, etc, and custom types composed

21

of any of these.

22

23

To use, pass as the ``data_converter`` argument of :py:class:`temporalio.client.Client`

24

"""

25

```

26

27

### Payload Converters

28

29

Specialized payload converters for Pydantic JSON serialization with customizable options.

30

31

```python { .api }

32

@dataclass

33

class ToJsonOptions:

34

exclude_unset: bool = False

35

36

class PydanticJSONPlainPayloadConverter(EncodingPayloadConverter):

37

def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...

38

39

@property

40

def encoding(self) -> str: ...

41

42

def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]: ...

43

44

def from_payload(

45

self,

46

payload: temporalio.api.common.v1.Payload,

47

type_hint: Optional[Type] = None,

48

) -> Any: ...

49

50

class PydanticPayloadConverter(CompositePayloadConverter):

51

def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...

52

```

53

54

## Usage Examples

55

56

### Basic Usage

57

58

Using the pre-configured Pydantic data converter with clients and workers.

59

60

```python

61

from temporalio.contrib.pydantic import pydantic_data_converter

62

from temporalio.client import Client

63

from temporalio.worker import Worker

64

from pydantic import BaseModel

65

from datetime import datetime

66

from typing import Optional

67

68

# Define Pydantic models

69

class UserProfile(BaseModel):

70

user_id: str

71

email: str

72

full_name: str

73

created_at: datetime

74

is_active: bool = True

75

metadata: Optional[dict] = None

76

77

class OrderData(BaseModel):

78

order_id: str

79

user: UserProfile

80

items: list[str]

81

total_amount: float

82

order_date: datetime

83

84

# Create client with Pydantic data converter

85

client = await Client.connect(

86

"localhost:7233",

87

data_converter=pydantic_data_converter

88

)

89

90

# Create worker with Pydantic data converter

91

worker = Worker(

92

client,

93

task_queue="my-queue",

94

workflows=[OrderWorkflow],

95

activities=[process_order],

96

# (no data_converter argument here: Worker has no such parameter and
# automatically uses the data converter configured on the client)

97

)

98

```

99

100

### Workflow and Activity with Pydantic Models

101

102

Automatic serialization and validation of complex data structures.

103

104

```python

105

from temporalio import workflow, activity

106

from temporalio.contrib.pydantic import pydantic_data_converter

107

from pydantic import BaseModel, Field, validator

108

from datetime import datetime, timedelta

109

from typing import List, Optional

110

from uuid import uuid4

111

112

class Address(BaseModel):

113

street: str

114

city: str

115

state: str

116

zip_code: str = Field(min_length=5, max_length=10)

117

country: str = "US"

118

119

class Customer(BaseModel):

120

customer_id: str = Field(default_factory=lambda: str(uuid4()))

121

name: str = Field(min_length=1, max_length=100)

122

email: str = Field(pattern=r'^[^@]+@[^@]+\.[^@]+$')  # Pydantic v2 uses pattern= (regex= was removed)

123

phone: Optional[str] = None

124

address: Address

125

created_at: datetime = Field(default_factory=datetime.utcnow)  # NOTE: datetime.utcnow is deprecated since Python 3.12; consider lambda: datetime.now(timezone.utc)

126

127

@validator('phone')  # deprecated in Pydantic v2; prefer @field_validator('phone') with @classmethod

128

def validate_phone(cls, v):

129

if v and not v.replace('-', '').replace(' ', '').isdigit():

130

raise ValueError('Invalid phone number')

131

return v

132

133

class OrderRequest(BaseModel):

134

customer: Customer

135

items: List[str] = Field(min_length=1)  # Pydantic v2 renamed min_items to min_length

136

total_amount: float = Field(gt=0)

137

currency: str = "USD"

138

priority: int = Field(ge=1, le=5, default=3)

139

140

class OrderResult(BaseModel):

141

order_id: str = Field(default_factory=lambda: str(uuid4()))

142

status: str = "pending"

143

created_at: datetime = Field(default_factory=datetime.utcnow)

144

estimated_delivery: datetime

145

tracking_number: Optional[str] = None

146

147

@activity.defn

148

async def validate_customer(customer: Customer) -> bool:

149

"""Activity automatically receives validated Customer model."""

150

# Customer is already validated by Pydantic

151

print(f"Processing order for {customer.name} at {customer.address.city}")

152

153

# Simulate validation logic

154

if customer.email.endswith('@blocked.com'):

155

return False

156

return True

157

158

@activity.defn

159

async def process_payment(amount: float, currency: str) -> str:

160

"""Simple activity with basic types."""

161

# Simulate payment processing

162

return f"payment-{uuid4()}"

163

164

@workflow.defn

165

class OrderProcessingWorkflow:

166

@workflow.run

167

async def run(self, request: OrderRequest) -> OrderResult:

168

"""Workflow automatically receives validated OrderRequest model."""

169

170

# All Pydantic validation happens automatically

171

workflow.logger.info(f"Processing order for customer: {request.customer.name}")

172

173

# Validate customer

174

is_valid = await workflow.execute_activity(

175

validate_customer,

176

request.customer, # Automatically serialized

177

schedule_to_close_timeout=timedelta(minutes=1)

178

)

179

180

if not is_valid:

181

raise ApplicationError("Customer validation failed", type="ValidationError")  # requires: from temporalio.exceptions import ApplicationError

182

183

# Process payment

184

payment_id = await workflow.execute_activity(

185

process_payment,

186

request.total_amount,

187

request.currency,

188

schedule_to_close_timeout=timedelta(minutes=5)

189

)

190

191

# Return validated result

192

return OrderResult(

193

status="confirmed",

194

estimated_delivery=datetime.utcnow() + timedelta(days=3),

195

tracking_number=f"TRK-{payment_id[-8:]}"

196

)

197

198

# Execute workflow with automatic validation

199

async def main():

200

client = await Client.connect(

201

"localhost:7233",

202

data_converter=pydantic_data_converter

203

)

204

205

# Create request with automatic validation

206

request = OrderRequest(

207

customer=Customer(

208

name="John Doe",

209

email="john@example.com",

210

phone="555-123-4567",

211

address=Address(

212

street="123 Main St",

213

city="Anytown",

214

state="CA",

215

zip_code="90210"

216

)

217

),

218

items=["laptop", "mouse", "keyboard"],

219

total_amount=1299.99

220

)

221

222

# Execute workflow - automatic serialization/deserialization

223

result = await client.execute_workflow(

224

OrderProcessingWorkflow.run,

225

request,

226

id="order-12345",

227

task_queue="my-queue"

228

)

229

230

print(f"Order created: {result.order_id}, Status: {result.status}")

231

```

232

233

### Custom Serialization Options

234

235

Configure JSON serialization behavior for specific use cases.

236

237

```python

238

from temporalio.contrib.pydantic import (

239

PydanticPayloadConverter,

240

ToJsonOptions,

241

pydantic_data_converter

242

)

243

from temporalio.converter import DataConverter

244

from pydantic import BaseModel

245

from typing import Optional

246

247

class UserSettings(BaseModel):

248

theme: str = "light"

249

notifications: bool = True

250

language: str = "en"

251

beta_features: Optional[bool] = None

252

253

# Custom converter that excludes unset fields

254

custom_converter = DataConverter(

255

payload_converter_class=lambda: PydanticPayloadConverter(

256

to_json_options=ToJsonOptions(exclude_unset=True)

257

)

258

)

259

260

# Create client with custom converter

261

client = await Client.connect(

262

"localhost:7233",

263

data_converter=custom_converter

264

)

265

266

# When serialized, unset fields will be excluded from JSON

267

settings = UserSettings(theme="dark") # notifications and language use defaults, beta_features is None

268

# JSON output: {"theme": "dark"}

269

# notifications, language, and beta_features are all excluded because they were not explicitly set

270

```

271

272

### Error Handling and Validation

273

274

Pydantic validation errors are automatically handled during deserialization.

275

276

```python

277

from pydantic import BaseModel, ValidationError, Field

278

from temporalio import workflow

279

from temporalio.exceptions import ApplicationError

280

281

class StrictModel(BaseModel):

282

required_field: str = Field(min_length=1)

283

numeric_field: int = Field(gt=0)

284

285

@workflow.defn

286

class ValidationWorkflow:

287

@workflow.run

288

async def run(self, data: StrictModel) -> str:

289

# If invalid data is passed, Pydantic will raise ValidationError

290

# This happens automatically during deserialization

291

return f"Processed: {data.required_field}"

292

293

# This will work

294

valid_data = StrictModel(required_field="valid", numeric_field=42)

295

296

# This would cause a ValidationError during workflow execution

297

# invalid_data = StrictModel(required_field="", numeric_field=-1)

298

```

299

300

## Supported Types

301

302

The Pydantic data converter supports all types handled by Pydantic v2:

303

304

### Standard Library Types

305

- All JSON-serializable types (`str`, `int`, `float`, `bool`, `list`, `dict`, etc.)

306

- `datetime`, `date`, `time`, `timedelta`

307

- `UUID`, `Decimal`, `Path`, `IPv4Address`, `IPv6Address`

308

- `set`, `frozenset`, `deque`

309

- `Enum` classes

310

- `dataclass` instances

311

312

### Pydantic Features

313

- `BaseModel` subclasses with validation

314

- Generic models

315

- Discriminated unions

316

- Custom validators and serializers

317

- Field constraints and validation

318

- Nested models and complex structures

319

320

### Custom Types

321

Any type that Pydantic can serialize/deserialize, including:

322

- Custom classes with `__pydantic_serializer__`

323

- Types with custom JSON encoders

324

- Complex nested structures

325

326

## Best Practices

327

328

### Model Design

329

```python

330

from pydantic import BaseModel, Field

331

from datetime import datetime

332

from typing import Optional

333

from uuid import uuid4

334

335

class WorkflowInput(BaseModel):

336

# Use default factories for generated fields

337

request_id: str = Field(default_factory=lambda: str(uuid4()))

338

timestamp: datetime = Field(default_factory=datetime.utcnow)

339

340

# Add validation constraints

341

user_id: str = Field(min_length=1, max_length=50)

342

amount: float = Field(gt=0, description="Amount must be positive")

343

344

# Optional fields with defaults

345

priority: int = Field(default=1, ge=1, le=5)

346

metadata: Optional[dict] = None

347

348

class Config:  # deprecated in Pydantic v2; prefer model_config = ConfigDict(...)

349

# Generate JSON schema for documentation

350

json_schema_extra = {  # Pydantic v2 renamed schema_extra to json_schema_extra

351

"example": {

352

"user_id": "user123",

353

"amount": 99.99,

354

"priority": 2

355

}

356

}

357

```

358

359

### Error Handling

360

```python

361

@workflow.defn

362

class SafeWorkflow:

363

@workflow.run

364

async def run(self, input_data: MyModel) -> dict:

365

try:

366

# Process the validated model

367

result = await self.process_data(input_data)

368

return {"success": True, "result": result}

369

except Exception as e:

370

# Log validation or processing errors

371

workflow.logger.error(f"Workflow failed: {e}")

372

return {"success": False, "error": str(e)}

373

```

374

375

### Performance Considerations

376

- Use `exclude_unset=True` for large models to reduce payload size

377

- Consider using `Field(exclude=True)` for sensitive data that shouldn't be serialized

378

- For high-throughput workflows, validate that model complexity doesn't impact performance

379

380

## Migration from Standard JSON

381

382

Migrating from the default JSON converter to Pydantic is straightforward:

383

384

```python

385

# Before: using default converter

386

from temporalio.client import Client

387

388

client = await Client.connect("localhost:7233")

389

390

# After: using Pydantic converter

391

from temporalio.client import Client

392

from temporalio.contrib.pydantic import pydantic_data_converter

393

394

client = await Client.connect(

395

"localhost:7233",

396

data_converter=pydantic_data_converter

397

)

398

399

# Your existing dict-based payloads will continue to work

400

# New Pydantic models provide additional validation and type safety

401

```