Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
Pydantic v2 data converter for seamless serialization and validation of Pydantic models in Temporal workflows and activities. This integration provides automatic JSON conversion with type validation for all Pydantic-supported types.
Ready-to-use data converter that automatically handles Pydantic models and types.
pydantic_data_converter = DataConverter(
payload_converter_class=PydanticPayloadConverter
)
"""Pydantic data converter.
Supports conversion of all types supported by Pydantic to and from JSON.
In addition to Pydantic models, these include all `json.dump`-able types,
various non-`json.dump`-able standard library types such as dataclasses,
types from the datetime module, sets, UUID, etc, and custom types composed
of any of these.
To use, pass as the ``data_converter`` argument of :py:class:`temporalio.client.Client`
"""Specialized payload converters for Pydantic JSON serialization with customizable options.
@dataclass
class ToJsonOptions:
exclude_unset: bool = False
class PydanticJSONPlainPayloadConverter(EncodingPayloadConverter):
def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...
@property
def encoding(self) -> str: ...
def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]: ...
def from_payload(
self,
payload: temporalio.api.common.v1.Payload,
type_hint: Optional[Type] = None,
) -> Any: ...
class PydanticPayloadConverter(CompositePayloadConverter):
def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...Using the pre-configured Pydantic data converter with clients and workers.
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.client import Client
from temporalio.worker import Worker
from pydantic import BaseModel
from datetime import datetime
from typing import Optional
# Define Pydantic models
class UserProfile(BaseModel):
user_id: str
email: str
full_name: str
created_at: datetime
is_active: bool = True
metadata: Optional[dict] = None
class OrderData(BaseModel):
order_id: str
user: UserProfile
items: list[str]
total_amount: float
order_date: datetime
# Create client with Pydantic data converter
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter
)
# Create worker with Pydantic data converter
worker = Worker(
client,
task_queue="my-queue",
workflows=[OrderWorkflow],
activities=[process_order],
data_converter=pydantic_data_converter
)Automatic serialization and validation of complex data structures.
from temporalio import workflow, activity
from temporalio.contrib.pydantic import pydantic_data_converter
from pydantic import BaseModel, Field, validator
from datetime import datetime, timedelta
from typing import List, Optional
from uuid import uuid4
class Address(BaseModel):
street: str
city: str
state: str
zip_code: str = Field(min_length=5, max_length=10)
country: str = "US"
class Customer(BaseModel):
customer_id: str = Field(default_factory=lambda: str(uuid4()))
name: str = Field(min_length=1, max_length=100)
email: str = Field(regex=r'^[^@]+@[^@]+\.[^@]+$')
phone: Optional[str] = None
address: Address
created_at: datetime = Field(default_factory=datetime.utcnow)
@validator('phone')
def validate_phone(cls, v):
if v and not v.replace('-', '').replace(' ', '').isdigit():
raise ValueError('Invalid phone number')
return v
class OrderRequest(BaseModel):
customer: Customer
items: List[str] = Field(min_items=1)
total_amount: float = Field(gt=0)
currency: str = "USD"
priority: int = Field(ge=1, le=5, default=3)
class OrderResult(BaseModel):
order_id: str = Field(default_factory=lambda: str(uuid4()))
status: str = "pending"
created_at: datetime = Field(default_factory=datetime.utcnow)
estimated_delivery: datetime
tracking_number: Optional[str] = None
@activity.defn
async def validate_customer(customer: Customer) -> bool:
"""Activity automatically receives validated Customer model."""
# Customer is already validated by Pydantic
print(f"Processing order for {customer.name} at {customer.address.city}")
# Simulate validation logic
if customer.email.endswith('@blocked.com'):
return False
return True
@activity.defn
async def process_payment(amount: float, currency: str) -> str:
"""Simple activity with basic types."""
# Simulate payment processing
return f"payment-{uuid4()}"
@workflow.defn
class OrderProcessingWorkflow:
@workflow.run
async def run(self, request: OrderRequest) -> OrderResult:
"""Workflow automatically receives validated OrderRequest model."""
# All Pydantic validation happens automatically
workflow.logger.info(f"Processing order for customer: {request.customer.name}")
# Validate customer
is_valid = await workflow.execute_activity(
validate_customer,
request.customer, # Automatically serialized
schedule_to_close_timeout=timedelta(minutes=1)
)
if not is_valid:
raise workflow.ApplicationError("Customer validation failed", type="ValidationError")
# Process payment
payment_id = await workflow.execute_activity(
process_payment,
request.total_amount,
request.currency,
schedule_to_close_timeout=timedelta(minutes=5)
)
# Return validated result
return OrderResult(
status="confirmed",
estimated_delivery=datetime.utcnow() + timedelta(days=3),
tracking_number=f"TRK-{payment_id[-8:]}"
)
# Execute workflow with automatic validation
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter
)
# Create request with automatic validation
request = OrderRequest(
customer=Customer(
name="John Doe",
email="john@example.com",
phone="555-123-4567",
address=Address(
street="123 Main St",
city="Anytown",
state="CA",
zip_code="90210"
)
),
items=["laptop", "mouse", "keyboard"],
total_amount=1299.99
)
# Execute workflow - automatic serialization/deserialization
result = await client.execute_workflow(
OrderProcessingWorkflow.run,
request,
id="order-12345",
task_queue="my-queue"
)
print(f"Order created: {result.order_id}, Status: {result.status}")Configure JSON serialization behavior for specific use cases.
from temporalio.contrib.pydantic import (
PydanticPayloadConverter,
ToJsonOptions,
pydantic_data_converter
)
from temporalio.converter import DataConverter
from pydantic import BaseModel
from typing import Optional
class UserSettings(BaseModel):
theme: str = "light"
notifications: bool = True
language: str = "en"
beta_features: Optional[bool] = None
# Custom converter that excludes unset fields
custom_converter = DataConverter(
payload_converter_class=lambda: PydanticPayloadConverter(
to_json_options=ToJsonOptions(exclude_unset=True)
)
)
# Create client with custom converter
client = await Client.connect(
"localhost:7233",
data_converter=custom_converter
)
# When serialized, unset fields will be excluded from JSON
settings = UserSettings(theme="dark") # notifications and language use defaults, beta_features is None
# JSON output: {"theme": "dark", "notifications": true, "language": "en"}
# beta_features is excluded because it's unsetPydantic validation errors are automatically handled during deserialization.
from pydantic import BaseModel, ValidationError, Field
from temporalio import workflow
from temporalio.exceptions import ApplicationError
class StrictModel(BaseModel):
required_field: str = Field(min_length=1)
numeric_field: int = Field(gt=0)
@workflow.defn
class ValidationWorkflow:
@workflow.run
async def run(self, data: StrictModel) -> str:
# If invalid data is passed, Pydantic will raise ValidationError
# This happens automatically during deserialization
return f"Processed: {data.required_field}"
# This will work
valid_data = StrictModel(required_field="valid", numeric_field=42)
# This would cause a ValidationError during workflow execution
# invalid_data = StrictModel(required_field="", numeric_field=-1)The Pydantic data converter supports all types handled by Pydantic v2:
str, int, float, bool, list, dict, etc.)datetime, date, time, timedeltaUUID, Decimal, Path, IPv4Address, IPv6Addressset, frozenset, dequeEnum classesdataclass instancesBaseModel subclasses with validationAny type that Pydantic can serialize/deserialize, including:
__pydantic_serializer__from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
from uuid import uuid4
class WorkflowInput(BaseModel):
# Use default factories for generated fields
request_id: str = Field(default_factory=lambda: str(uuid4()))
timestamp: datetime = Field(default_factory=datetime.utcnow)
# Add validation constraints
user_id: str = Field(min_length=1, max_length=50)
amount: float = Field(gt=0, description="Amount must be positive")
# Optional fields with defaults
priority: int = Field(default=1, ge=1, le=5)
metadata: Optional[dict] = None
class Config:
# Generate JSON schema for documentation
schema_extra = {
"example": {
"user_id": "user123",
"amount": 99.99,
"priority": 2
}
}@workflow.defn
class SafeWorkflow:
@workflow.run
async def run(self, input_data: MyModel) -> dict:
try:
# Process the validated model
result = await self.process_data(input_data)
return {"success": True, "result": result}
except Exception as e:
# Log validation or processing errors
workflow.logger.error(f"Workflow failed: {e}")
return {"success": False, "error": str(e)}exclude_unset=True for large models to reduce payload sizeField(exclude=True) for sensitive data that shouldn't be serializedMigrating from the default JSON converter to Pydantic is straightforward:
# Before: using default converter
from temporalio.client import Client
client = await Client.connect("localhost:7233")
# After: using Pydantic converter
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter
)
# Your existing dict-based payloads will continue to work
# New Pydantic models provide additional validation and type safetyInstall with Tessl CLI
npx tessl i tessl/pypi-temporalio