CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

Overview
Eval results
Files

contrib-pydantic.mddocs/

Pydantic Integration

Pydantic v2 data converter for seamless serialization and validation of Pydantic models in Temporal workflows and activities. This integration provides automatic JSON conversion with type validation for all Pydantic-supported types.

Capabilities

Pydantic Data Converter

Ready-to-use data converter that automatically handles Pydantic models and types.

pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter
)
"""Pydantic data converter.

Supports conversion of all types supported by Pydantic to and from JSON.

In addition to Pydantic models, these include all `json.dump`-able types,
various non-`json.dump`-able standard library types such as dataclasses,
types from the datetime module, sets, UUID, etc, and custom types composed
of any of these.

To use, pass as the ``data_converter`` argument of :py:class:`temporalio.client.Client`
"""

Payload Converters

Specialized payload converters for Pydantic JSON serialization with customizable options.

@dataclass
class ToJsonOptions:
    exclude_unset: bool = False

class PydanticJSONPlainPayloadConverter(EncodingPayloadConverter):
    def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...

    @property
    def encoding(self) -> str: ...

    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]: ...

    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any: ...

class PydanticPayloadConverter(CompositePayloadConverter):
    def __init__(self, to_json_options: Optional[ToJsonOptions] = None): ...

Usage Examples

Basic Usage

Using the pre-configured Pydantic data converter with clients and workers.

from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.client import Client
from temporalio.worker import Worker
from pydantic import BaseModel
from datetime import datetime
from typing import Optional

# Define Pydantic models
class UserProfile(BaseModel):
    user_id: str
    email: str
    full_name: str
    created_at: datetime
    is_active: bool = True
    metadata: Optional[dict] = None

class OrderData(BaseModel):
    order_id: str
    user: UserProfile
    items: list[str]
    total_amount: float
    order_date: datetime

# Create client with Pydantic data converter
client = await Client.connect(
    "localhost:7233",
    data_converter=pydantic_data_converter
)

# Create worker with Pydantic data converter
worker = Worker(
    client,
    task_queue="my-queue",
    workflows=[OrderWorkflow],
    activities=[process_order],
    data_converter=pydantic_data_converter
)

Workflow and Activity with Pydantic Models

Automatic serialization and validation of complex data structures.

from temporalio import workflow, activity
from temporalio.contrib.pydantic import pydantic_data_converter
from pydantic import BaseModel, Field, validator
from datetime import datetime, timedelta
from typing import List, Optional
from uuid import uuid4

class Address(BaseModel):
    street: str
    city: str
    state: str
    zip_code: str = Field(min_length=5, max_length=10)
    country: str = "US"

class Customer(BaseModel):
    customer_id: str = Field(default_factory=lambda: str(uuid4()))
    name: str = Field(min_length=1, max_length=100)
    email: str = Field(regex=r'^[^@]+@[^@]+\.[^@]+$')
    phone: Optional[str] = None
    address: Address
    created_at: datetime = Field(default_factory=datetime.utcnow)

    @validator('phone')
    def validate_phone(cls, v):
        if v and not v.replace('-', '').replace(' ', '').isdigit():
            raise ValueError('Invalid phone number')
        return v

class OrderRequest(BaseModel):
    customer: Customer
    items: List[str] = Field(min_items=1)
    total_amount: float = Field(gt=0)
    currency: str = "USD"
    priority: int = Field(ge=1, le=5, default=3)

class OrderResult(BaseModel):
    order_id: str = Field(default_factory=lambda: str(uuid4()))
    status: str = "pending"
    created_at: datetime = Field(default_factory=datetime.utcnow)
    estimated_delivery: datetime
    tracking_number: Optional[str] = None

@activity.defn
async def validate_customer(customer: Customer) -> bool:
    """Activity automatically receives validated Customer model."""
    # Customer is already validated by Pydantic
    print(f"Processing order for {customer.name} at {customer.address.city}")

    # Simulate validation logic
    if customer.email.endswith('@blocked.com'):
        return False
    return True

@activity.defn
async def process_payment(amount: float, currency: str) -> str:
    """Simple activity with basic types."""
    # Simulate payment processing
    return f"payment-{uuid4()}"

@workflow.defn
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, request: OrderRequest) -> OrderResult:
        """Workflow automatically receives validated OrderRequest model."""

        # All Pydantic validation happens automatically
        workflow.logger.info(f"Processing order for customer: {request.customer.name}")

        # Validate customer
        is_valid = await workflow.execute_activity(
            validate_customer,
            request.customer,  # Automatically serialized
            schedule_to_close_timeout=timedelta(minutes=1)
        )

        if not is_valid:
            raise workflow.ApplicationError("Customer validation failed", type="ValidationError")

        # Process payment
        payment_id = await workflow.execute_activity(
            process_payment,
            request.total_amount,
            request.currency,
            schedule_to_close_timeout=timedelta(minutes=5)
        )

        # Return validated result
        return OrderResult(
            status="confirmed",
            estimated_delivery=datetime.utcnow() + timedelta(days=3),
            tracking_number=f"TRK-{payment_id[-8:]}"
        )

# Execute workflow with automatic validation
async def main():
    client = await Client.connect(
        "localhost:7233",
        data_converter=pydantic_data_converter
    )

    # Create request with automatic validation
    request = OrderRequest(
        customer=Customer(
            name="John Doe",
            email="john@example.com",
            phone="555-123-4567",
            address=Address(
                street="123 Main St",
                city="Anytown",
                state="CA",
                zip_code="90210"
            )
        ),
        items=["laptop", "mouse", "keyboard"],
        total_amount=1299.99
    )

    # Execute workflow - automatic serialization/deserialization
    result = await client.execute_workflow(
        OrderProcessingWorkflow.run,
        request,
        id="order-12345",
        task_queue="my-queue"
    )

    print(f"Order created: {result.order_id}, Status: {result.status}")

Custom Serialization Options

Configure JSON serialization behavior for specific use cases.

from temporalio.contrib.pydantic import (
    PydanticPayloadConverter,
    ToJsonOptions,
    pydantic_data_converter
)
from temporalio.converter import DataConverter
from pydantic import BaseModel
from typing import Optional

class UserSettings(BaseModel):
    theme: str = "light"
    notifications: bool = True
    language: str = "en"
    beta_features: Optional[bool] = None

# Custom converter that excludes unset fields
custom_converter = DataConverter(
    payload_converter_class=lambda: PydanticPayloadConverter(
        to_json_options=ToJsonOptions(exclude_unset=True)
    )
)

# Create client with custom converter
client = await Client.connect(
    "localhost:7233",
    data_converter=custom_converter
)

# When serialized, unset fields will be excluded from JSON
settings = UserSettings(theme="dark")  # notifications and language use defaults, beta_features is None
# JSON output: {"theme": "dark", "notifications": true, "language": "en"}
# beta_features is excluded because it's unset

Error Handling and Validation

Pydantic validation errors are automatically handled during deserialization.

from pydantic import BaseModel, ValidationError, Field
from temporalio import workflow
from temporalio.exceptions import ApplicationError

class StrictModel(BaseModel):
    required_field: str = Field(min_length=1)
    numeric_field: int = Field(gt=0)

@workflow.defn
class ValidationWorkflow:
    @workflow.run
    async def run(self, data: StrictModel) -> str:
        # If invalid data is passed, Pydantic will raise ValidationError
        # This happens automatically during deserialization
        return f"Processed: {data.required_field}"

# This will work
valid_data = StrictModel(required_field="valid", numeric_field=42)

# This would cause a ValidationError during workflow execution
# invalid_data = StrictModel(required_field="", numeric_field=-1)

Supported Types

The Pydantic data converter supports all types handled by Pydantic v2:

Standard Library Types

  • All JSON-serializable types (str, int, float, bool, list, dict, etc.)
  • datetime, date, time, timedelta
  • UUID, Decimal, Path, IPv4Address, IPv6Address
  • set, frozenset, deque
  • Enum classes
  • dataclass instances

Pydantic Features

  • BaseModel subclasses with validation
  • Generic models
  • Discriminated unions
  • Custom validators and serializers
  • Field constraints and validation
  • Nested models and complex structures

Custom Types

Any type that Pydantic can serialize/deserialize, including:

  • Custom classes with __pydantic_serializer__
  • Types with custom JSON encoders
  • Complex nested structures

Best Practices

Model Design

from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
from uuid import uuid4

class WorkflowInput(BaseModel):
    # Use default factories for generated fields
    request_id: str = Field(default_factory=lambda: str(uuid4()))
    timestamp: datetime = Field(default_factory=datetime.utcnow)

    # Add validation constraints
    user_id: str = Field(min_length=1, max_length=50)
    amount: float = Field(gt=0, description="Amount must be positive")

    # Optional fields with defaults
    priority: int = Field(default=1, ge=1, le=5)
    metadata: Optional[dict] = None

    class Config:
        # Generate JSON schema for documentation
        schema_extra = {
            "example": {
                "user_id": "user123",
                "amount": 99.99,
                "priority": 2
            }
        }

Error Handling

@workflow.defn
class SafeWorkflow:
    @workflow.run
    async def run(self, input_data: MyModel) -> dict:
        try:
            # Process the validated model
            result = await self.process_data(input_data)
            return {"success": True, "result": result}
        except Exception as e:
            # Log validation or processing errors
            workflow.logger.error(f"Workflow failed: {e}")
            return {"success": False, "error": str(e)}

Performance Considerations

  • Use exclude_unset=True for large models to reduce payload size
  • Consider using Field(exclude=True) for sensitive data that shouldn't be serialized
  • For high-throughput workflows, validate that model complexity doesn't impact performance

Migration from Standard JSON

Migrating from the default JSON converter to Pydantic is straightforward:

# Before: using default converter
from temporalio.client import Client

client = await Client.connect("localhost:7233")

# After: using Pydantic converter
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter

client = await Client.connect(
    "localhost:7233",
    data_converter=pydantic_data_converter
)

# Your existing dict-based payloads will continue to work
# New Pydantic models provide additional validation and type safety

Install with Tessl CLI

npx tessl i tessl/pypi-temporalio

docs

activity.md

client.md

common.md

contrib-pydantic.md

data-conversion.md

exceptions.md

index.md

runtime.md

testing.md

worker.md

workflow.md

tile.json