# Pydantic Integration

Pydantic v2 data converter for serializing and validating Pydantic models in Temporal workflows and activities. The integration converts values to and from JSON and validates them against their type hints, for every type Pydantic supports.

## Capabilities

### Pydantic Data Converter

Ready-to-use data converter that automatically handles Pydantic models and types.

```python { .api }
pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter
)
"""Pydantic data converter.

Supports conversion of all types supported by Pydantic to and from JSON.

In addition to Pydantic models, these include all `json.dump`-able types,
various non-`json.dump`-able standard library types such as dataclasses,
types from the datetime module, sets, UUID, etc, and custom types composed
of any of these.

To use, pass as the ``data_converter`` argument of :py:class:`temporalio.client.Client`
"""
```

### Payload Converters

Specialized payload converters for Pydantic JSON serialization with customizable options.

```python { .api }
@dataclass
class ToJsonOptions:
    exclude_unset: bool = False

class PydanticJSONPlainPayloadConverter(EncodingPayloadConverter):
    def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...

    @property
    def encoding(self) -> str: ...

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]: ...

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any: ...

class PydanticPayloadConverter(CompositePayloadConverter):
    def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...
```

## Usage Examples

### Basic Usage

Use the pre-configured Pydantic data converter when connecting the client; workers created from that client use the same converter.

```python
import asyncio
from datetime import datetime
from typing import Optional

from pydantic import BaseModel
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.worker import Worker


# Define Pydantic models
class UserProfile(BaseModel):
    user_id: str
    email: str
    full_name: str
    created_at: datetime
    is_active: bool = True
    metadata: Optional[dict] = None


class OrderData(BaseModel):
    order_id: str
    user: UserProfile
    items: list[str]
    total_amount: float
    order_date: datetime


async def main() -> None:
    # Create client with Pydantic data converter
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    # Create worker. It takes its data converter from the client, so no
    # converter argument is passed here. OrderWorkflow and process_order
    # are assumed to be defined elsewhere.
    worker = Worker(
        client,
        task_queue="my-queue",
        workflows=[OrderWorkflow],
        activities=[process_order],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```

### Workflow and Activity with Pydantic Models

Automatic serialization and validation of complex data structures.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import List, Optional
from uuid import uuid4

from pydantic import BaseModel, Field, field_validator
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.exceptions import ApplicationError


def utc_now() -> datetime:
    """Timezone-aware replacement for the deprecated datetime.utcnow()."""
    return datetime.now(timezone.utc)


class Address(BaseModel):
    street: str
    city: str
    state: str
    zip_code: str = Field(min_length=5, max_length=10)
    country: str = "US"


class Customer(BaseModel):
    customer_id: str = Field(default_factory=lambda: str(uuid4()))
    name: str = Field(min_length=1, max_length=100)
    # Pydantic v2 uses `pattern` (v1 used `regex`)
    email: str = Field(pattern=r"^[^@]+@[^@]+\.[^@]+$")
    phone: Optional[str] = None
    address: Address
    created_at: datetime = Field(default_factory=utc_now)

    # Pydantic v2 uses @field_validator (v1 used @validator)
    @field_validator("phone")
    @classmethod
    def validate_phone(cls, v: Optional[str]) -> Optional[str]:
        if v and not v.replace("-", "").replace(" ", "").isdigit():
            raise ValueError("Invalid phone number")
        return v


class OrderRequest(BaseModel):
    customer: Customer
    # Pydantic v2 uses `min_length` for lists (v1 used `min_items`)
    items: List[str] = Field(min_length=1)
    total_amount: float = Field(gt=0)
    currency: str = "USD"
    priority: int = Field(ge=1, le=5, default=3)


class OrderResult(BaseModel):
    order_id: str = Field(default_factory=lambda: str(uuid4()))
    status: str = "pending"
    created_at: datetime = Field(default_factory=utc_now)
    estimated_delivery: datetime
    tracking_number: Optional[str] = None


@activity.defn
async def validate_customer(customer: Customer) -> bool:
    """Activity automatically receives validated Customer model."""
    # Customer is already validated by Pydantic
    print(f"Processing order for {customer.name} at {customer.address.city}")

    # Simulate validation logic
    if customer.email.endswith("@blocked.com"):
        return False
    return True


@activity.defn
async def process_payment(amount: float, currency: str) -> str:
    """Simple activity with basic types."""
    # Simulate payment processing
    return f"payment-{uuid4()}"


@workflow.defn
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, request: OrderRequest) -> OrderResult:
        """Workflow automatically receives validated OrderRequest model."""

        # All Pydantic validation happens automatically
        workflow.logger.info(f"Processing order for customer: {request.customer.name}")

        # Validate customer
        is_valid = await workflow.execute_activity(
            validate_customer,
            request.customer,  # Automatically serialized
            schedule_to_close_timeout=timedelta(minutes=1),
        )

        if not is_valid:
            raise ApplicationError("Customer validation failed", type="ValidationError")

        # Process payment; multiple activity arguments are passed via `args`
        payment_id = await workflow.execute_activity(
            process_payment,
            args=[request.total_amount, request.currency],
            schedule_to_close_timeout=timedelta(minutes=5),
        )

        # Return validated result. Workflow code must be deterministic, so the
        # ID and timestamps come from workflow.uuid4() and workflow.now()
        # instead of the model's default factories.
        now = workflow.now()
        return OrderResult(
            order_id=str(workflow.uuid4()),
            status="confirmed",
            created_at=now,
            estimated_delivery=now + timedelta(days=3),
            tracking_number=f"TRK-{payment_id[-8:]}",
        )


# Execute workflow with automatic validation
async def main() -> None:
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter,
    )

    # Create request with automatic validation
    request = OrderRequest(
        customer=Customer(
            name="John Doe",
            email="john@example.com",
            phone="555-123-4567",
            address=Address(
                street="123 Main St",
                city="Anytown",
                state="CA",
                zip_code="90210",
            ),
        ),
        items=["laptop", "mouse", "keyboard"],
        total_amount=1299.99,
    )

    # Execute workflow - automatic serialization/deserialization
    result = await client.execute_workflow(
        OrderProcessingWorkflow.run,
        request,
        id="order-12345",
        task_queue="my-queue",
    )

    print(f"Order created: {result.order_id}, Status: {result.status}")


if __name__ == "__main__":
    asyncio.run(main())
```

### Custom Serialization Options

Configure JSON serialization behavior for specific use cases.

```python
import asyncio
from typing import Optional

from pydantic import BaseModel
from temporalio.client import Client
from temporalio.contrib.pydantic import PydanticPayloadConverter, ToJsonOptions
from temporalio.converter import DataConverter


class UserSettings(BaseModel):
    theme: str = "light"
    notifications: bool = True
    language: str = "en"
    beta_features: Optional[bool] = None


# payload_converter_class is instantiated with no arguments, so the options
# are bound in a subclass rather than passed through a lambda.
class ExcludeUnsetPayloadConverter(PydanticPayloadConverter):
    def __init__(self) -> None:
        super().__init__(to_json_options=ToJsonOptions(exclude_unset=True))


# Custom converter that excludes unset fields
custom_converter = DataConverter(
    payload_converter_class=ExcludeUnsetPayloadConverter
)


async def main() -> None:
    # Create client with custom converter
    client = await Client.connect(
        "localhost:7233",
        data_converter=custom_converter,
    )

    # Only fields passed explicitly to the constructor count as "set"
    settings = UserSettings(theme="dark")
    # Serialized JSON contains only "theme"; notifications, language, and
    # beta_features are omitted because they were never set explicitly


if __name__ == "__main__":
    asyncio.run(main())
```
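
To make the effect of `exclude_unset` concrete, the sketch below uses Pydantic directly, without a Temporal client. It assumes the converter forwards `exclude_unset` to Pydantic's own JSON serialization, so the behavior of `model_dump_json` is a reasonable stand-in; the converter's internals are not shown in this document.

```python
from typing import Optional

from pydantic import BaseModel


class UserSettings(BaseModel):
    theme: str = "light"
    notifications: bool = True
    language: str = "en"
    beta_features: Optional[bool] = None


settings = UserSettings(theme="dark")

# Default serialization writes every field, including defaults and None.
full_json = settings.model_dump_json()

# exclude_unset=True keeps only the fields passed to the constructor.
compact_json = settings.model_dump_json(exclude_unset=True)

# Fields missing from the JSON are refilled from the model defaults
# when the payload is validated again.
restored = UserSettings.model_validate_json(compact_json)

print(full_json)
print(compact_json)
```

Because omitted fields are rebuilt from defaults on the receiving side, this option is only safe when both sides share the same model defaults.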

### Error Handling and Validation

Pydantic validates data during deserialization and raises a `ValidationError` when a payload does not match the declared model.

```python
from pydantic import BaseModel, Field, ValidationError
from temporalio import workflow


class StrictModel(BaseModel):
    required_field: str = Field(min_length=1)
    numeric_field: int = Field(gt=0)


@workflow.defn
class ValidationWorkflow:
    @workflow.run
    async def run(self, data: StrictModel) -> str:
        # If invalid data is passed, Pydantic will raise ValidationError
        # This happens automatically during deserialization
        return f"Processed: {data.required_field}"


# This will work
valid_data = StrictModel(required_field="valid", numeric_field=42)

# Constructing an invalid model fails immediately on the caller's side,
# before any workflow is started
try:
    StrictModel(required_field="", numeric_field=-1)
except ValidationError as err:
    print(f"Rejected before submission: {err.error_count()} errors")
```
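
The deserialization path can be approximated without Temporal. The sketch below feeds raw JSON bytes to a Pydantic `TypeAdapter`, on the assumption that the converter validates payloads against the type hint in a similar way; the byte strings are made-up payloads for illustration.

```python
from pydantic import BaseModel, Field, TypeAdapter, ValidationError


class StrictModel(BaseModel):
    required_field: str = Field(min_length=1)
    numeric_field: int = Field(gt=0)


adapter = TypeAdapter(StrictModel)

# A payload that satisfies both constraints validates into a model instance.
ok = adapter.validate_json(b'{"required_field": "valid", "numeric_field": 42}')

# A payload that violates both constraints raises a single ValidationError
# that reports every failing field.
bad_payload = b'{"required_field": "", "numeric_field": -1}'
try:
    adapter.validate_json(bad_payload)
    failed_fields = []
except ValidationError as err:
    failed_fields = sorted(str(e["loc"][0]) for e in err.errors())

print(ok)
print(failed_fields)
```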

## Supported Types

The Pydantic data converter supports all types handled by Pydantic v2:

### Standard Library Types
- All JSON-serializable types (`str`, `int`, `float`, `bool`, `list`, `dict`, etc.)
- `datetime`, `date`, `time`, `timedelta`
- `UUID`, `Decimal`, `Path`, `IPv4Address`, `IPv6Address`
- `set`, `frozenset`, `deque`
- `Enum` classes
- `dataclass` instances

### Pydantic Features
- `BaseModel` subclasses with validation
- Generic models
- Discriminated unions
- Custom validators and serializers
- Field constraints and validation
- Nested models and complex structures

### Custom Types
Any type that Pydantic can serialize/deserialize, including:
- Custom classes with `__pydantic_serializer__`
- Types with custom JSON encoders
- Complex nested structures
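
As a quick check of the non-`json.dump`-able standard library types listed above, the following sketch round-trips a few values through JSON with Pydantic's `TypeAdapter`. It exercises Pydantic only, not the Temporal converter; `Point` and `round_trip` are hypothetical names introduced for the example.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any
from uuid import UUID

from pydantic import TypeAdapter


@dataclass
class Point:
    x: int
    y: int


def round_trip(type_hint: Any, value: Any) -> Any:
    """Serialize a value to JSON bytes and validate it back with the same type hint."""
    adapter = TypeAdapter(type_hint)
    return adapter.validate_json(adapter.dump_json(value))


samples = [
    (datetime, datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)),
    (UUID, UUID("12345678-1234-5678-1234-567812345678")),
    (Decimal, Decimal("12.34")),
    (set[int], {1, 2, 3}),
    (Point, Point(x=1, y=2)),
]

# Each value should come back equal to the original and with the original type.
results = [round_trip(type_hint, value) for type_hint, value in samples]
```

The type hint matters on the way back: without it, a UUID or datetime would be read as a plain string, which is why workflow and activity signatures should carry precise annotations.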

## Best Practices

### Model Design
```python
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4

from pydantic import BaseModel, ConfigDict, Field


class WorkflowInput(BaseModel):
    # Pydantic v2 configuration (replaces the v1 inner `class Config`):
    # attach an example to the generated JSON schema for documentation
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "user_id": "user123",
                "amount": 99.99,
                "priority": 2,
            }
        }
    )

    # Use default factories for generated fields
    request_id: str = Field(default_factory=lambda: str(uuid4()))
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    # Add validation constraints
    user_id: str = Field(min_length=1, max_length=50)
    amount: float = Field(gt=0, description="Amount must be positive")

    # Optional fields with defaults
    priority: int = Field(default=1, ge=1, le=5)
    metadata: Optional[dict] = None
```

### Error Handling
```python
from temporalio import workflow


@workflow.defn
class SafeWorkflow:
    @workflow.run
    async def run(self, input_data: MyModel) -> dict:
        # MyModel stands in for your own Pydantic model; input_data has
        # already been validated by the time run() is called
        try:
            # Process the validated model (process_data is a placeholder
            # for your own helper method)
            result = await self.process_data(input_data)
            return {"success": True, "result": result}
        except Exception as e:
            # Log processing errors and report them in the result
            workflow.logger.error(f"Workflow failed: {e}")
            return {"success": False, "error": str(e)}
```

### Performance Considerations
- Use `exclude_unset=True` for large models to reduce payload size
- Consider using `Field(exclude=True)` for sensitive data that shouldn't be serialized
- For high-throughput workflows, measure serialization and validation time to confirm that model complexity doesn't impact performance
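
The `Field(exclude=True)` suggestion has a consequence worth seeing in code: an excluded field never reaches the payload, so the receiving side must be able to build the model without it. The sketch below uses Pydantic directly; `PaymentRequest` and `api_token` are hypothetical names chosen for illustration.

```python
from typing import Optional

from pydantic import BaseModel, Field


class PaymentRequest(BaseModel):
    user_id: str
    amount: float
    # Hypothetical sensitive field: excluded from every dump, so it is never
    # written to the serialized payload. The default lets the receiver
    # rebuild the model without it.
    api_token: Optional[str] = Field(default=None, exclude=True)


request = PaymentRequest(user_id="user123", amount=99.99, api_token="secret")

# The serialized form carries no trace of the excluded field.
payload_json = request.model_dump_json()

# The receiver sees the default value, not the original secret.
received = PaymentRequest.model_validate_json(payload_json)

print(payload_json)
print(received.api_token)
```

If the excluded field had no default, validation on the receiving side would fail because the field is missing, so exclusion fits values that the receiver can do without or obtain another way.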

## Migration from Standard JSON

Migrating from the default JSON converter to the Pydantic converter only requires passing `data_converter` when connecting the client:

```python
# Before: using default converter
from temporalio.client import Client

client = await Client.connect("localhost:7233")

# After: using Pydantic converter
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter

client = await Client.connect(
    "localhost:7233",
    data_converter=pydantic_data_converter
)

# Your existing dict-based payloads will continue to work
# New Pydantic models provide additional validation and type safety
```
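
The claim about existing dict-based payloads can be illustrated with Pydantic alone. In this sketch, bytes produced by the standard `json` module stand in for a payload written before the migration, and `TypeAdapter` stands in for the converter's validation step; both are assumptions for illustration, and `OrderPayload` is a hypothetical model.

```python
import json

from pydantic import BaseModel, TypeAdapter


class OrderPayload(BaseModel):
    # Hypothetical model describing the shape of an existing dict payload.
    order_id: str
    quantity: int


# Bytes as the standard json module would have written them.
legacy_bytes = json.dumps({"order_id": "order-1", "quantity": 2}).encode()

# Code still annotated with `dict` keeps receiving a plain dict.
as_dict = TypeAdapter(dict).validate_json(legacy_bytes)

# Code that adopts a model gets a validated instance from the same bytes.
as_model = TypeAdapter(OrderPayload).validate_json(legacy_bytes)

print(as_dict)
print(as_model)
```

This allows an incremental migration: signatures can move from `dict` to a model one at a time, as long as each model matches the JSON shape already in flight.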