CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-temporalio

Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.

Overview
Eval results
Files

runtime.mddocs/

Runtime Configuration

Runtime configuration for the Temporal Python SDK, including telemetry, logging, metrics collection, and performance monitoring. The runtime manages the underlying thread pool and provides observability features for Temporal applications.

Capabilities

Runtime Management

Core runtime that manages internal resources and telemetry configuration for clients and workers.

class Runtime:
    @staticmethod
    def default() -> Runtime: ...

    @staticmethod
    def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: ...

    def __init__(self, *, telemetry: TelemetryConfig) -> None: ...

    @property
    def metric_meter(self) -> temporalio.common.MetricMeter: ...

Usage Examples

from temporalio.runtime import Runtime, TelemetryConfig, LoggingConfig, PrometheusConfig

# Use default runtime
runtime = Runtime.default()

# Create custom runtime with metrics
telemetry = TelemetryConfig(
    metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
)
custom_runtime = Runtime(telemetry=telemetry)

# Set as default before creating clients/workers
Runtime.set_default(custom_runtime)

# Access metrics from runtime
meter = runtime.metric_meter
counter = meter.create_counter("workflow_count", "Number of workflows")

Telemetry Configuration

Comprehensive telemetry configuration for logging, metrics, and observability.

@dataclass(frozen=True)
class TelemetryConfig:
    logging: Optional[LoggingConfig] = LoggingConfig.default
    metrics: Optional[Union[OpenTelemetryConfig, PrometheusConfig, MetricBuffer]] = None
    global_tags: Mapping[str, str] = field(default_factory=dict)
    attach_service_name: bool = True
    metric_prefix: Optional[str] = None

Logging Configuration

Configure logging output from the Temporal Core and Python SDK components.

@dataclass(frozen=True)
class LoggingConfig:
    filter: Union[TelemetryFilter, str]
    forwarding: Optional[LogForwardingConfig] = None

    default: ClassVar[LoggingConfig]

@dataclass
class TelemetryFilter:
    core_level: str
    other_level: str

    def formatted(self) -> str: ...

@dataclass
class LogForwardingConfig:
    logger: logging.Logger
    append_target_to_name: bool = True
    prepend_target_on_message: bool = True
    overwrite_log_record_time: bool = True
    append_log_fields_to_message: bool = True

Usage Examples

import logging
from temporalio.runtime import (
    TelemetryConfig,
    LoggingConfig,
    TelemetryFilter,
    LogForwardingConfig
)

# Basic logging configuration
logging_config = LoggingConfig(
    filter=TelemetryFilter(core_level="INFO", other_level="WARN")
)

# Forward Core logs to Python logger
logger = logging.getLogger("temporal.core")
logger.setLevel(logging.INFO)

forward_config = LogForwardingConfig(
    logger=logger,
    append_target_to_name=True,
    prepend_target_on_message=True
)

logging_config = LoggingConfig(
    filter="INFO",
    forwarding=forward_config
)

telemetry = TelemetryConfig(logging=logging_config)

Metrics Configuration

Multiple options for metrics collection including OpenTelemetry, Prometheus, and in-memory buffering.

Prometheus Metrics

Export metrics to Prometheus-compatible endpoints.

@dataclass(frozen=True)
class PrometheusConfig:
    bind_address: str
    counters_total_suffix: bool = False
    unit_suffix: bool = False
    durations_as_seconds: bool = False
    histogram_bucket_overrides: Optional[Mapping[str, Sequence[float]]] = None

OpenTelemetry Metrics

Export metrics to OpenTelemetry collectors.

class OpenTelemetryMetricTemporality(Enum):
    CUMULATIVE = 1
    DELTA = 2

@dataclass(frozen=True)
class OpenTelemetryConfig:
    url: str
    headers: Optional[Mapping[str, str]] = None
    metric_periodicity: Optional[timedelta] = None
    metric_temporality: OpenTelemetryMetricTemporality = OpenTelemetryMetricTemporality.CUMULATIVE
    durations_as_seconds: bool = False
    http: bool = False

Metrics Buffer

In-memory metrics collection for testing and custom processing.

class MetricBufferDurationFormat(Enum):
    MILLISECONDS = 1
    SECONDS = 2

class MetricBuffer:
    def __init__(
        self,
        buffer_size: int,
        duration_format: MetricBufferDurationFormat = MetricBufferDurationFormat.MILLISECONDS,
    ): ...

    def retrieve_updates(self) -> Sequence[BufferedMetricUpdate]: ...

class BufferedMetric(Protocol):
    @property
    def name(self) -> str: ...

    @property
    def description(self) -> Optional[str]: ...

    @property
    def unit(self) -> Optional[str]: ...

    @property
    def kind(self) -> BufferedMetricKind: ...

class BufferedMetricUpdate(Protocol):
    @property
    def metric(self) -> BufferedMetric: ...

    @property
    def value(self) -> Union[int, float]: ...

    @property
    def attributes(self) -> temporalio.common.MetricAttributes: ...

Usage Examples

from temporalio.runtime import (
    Runtime,
    TelemetryConfig,
    PrometheusConfig,
    OpenTelemetryConfig,
    MetricBuffer,
    MetricBufferDurationFormat,
    OpenTelemetryMetricTemporality
)
from datetime import timedelta

# Prometheus metrics
prometheus_config = PrometheusConfig(
    bind_address="0.0.0.0:9090",
    counters_total_suffix=True,
    durations_as_seconds=True,
    histogram_bucket_overrides={
        "temporal_workflow_task_execution_time": [0.1, 0.5, 1.0, 5.0, 10.0]
    }
)

# OpenTelemetry metrics
otel_config = OpenTelemetryConfig(
    url="http://localhost:4317",
    headers={"Authorization": "Bearer token"},
    metric_periodicity=timedelta(seconds=10),
    metric_temporality=OpenTelemetryMetricTemporality.DELTA,
    durations_as_seconds=True
)

# Buffered metrics for testing
buffer = MetricBuffer(
    buffer_size=10000,
    duration_format=MetricBufferDurationFormat.SECONDS
)

# Use in telemetry configuration
telemetry = TelemetryConfig(
    metrics=prometheus_config,  # or otel_config or buffer
    global_tags={"service": "my-service", "environment": "production"},
    metric_prefix="myapp_temporal_"
)

runtime = Runtime(telemetry=telemetry)

# For buffered metrics, retrieve updates periodically
if isinstance(telemetry.metrics, MetricBuffer):
    updates = buffer.retrieve_updates()
    for update in updates:
        print(f"Metric: {update.metric.name}, Value: {update.value}")

Advanced Configuration

Global tags, service naming, and metric prefixing for better observability organization.

# Comprehensive telemetry configuration
telemetry = TelemetryConfig(
    logging=LoggingConfig(
        filter=TelemetryFilter(core_level="INFO", other_level="WARN"),
        forwarding=LogForwardingConfig(logger=logging.getLogger("temporal"))
    ),
    metrics=PrometheusConfig(
        bind_address="0.0.0.0:9090",
        durations_as_seconds=True
    ),
    global_tags={
        "service": "order-service",
        "environment": "production",
        "region": "us-west-2"
    },
    attach_service_name=True,
    metric_prefix="orders_temporal_"
)

Integration with Clients and Workers

The runtime is automatically used by clients and workers when set as default.

from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig

# Configure runtime before creating clients
telemetry = TelemetryConfig(
    metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
)
Runtime.set_default(Runtime(telemetry=telemetry))

# Clients and workers will use the configured runtime
client = await Client.connect("localhost:7233")
worker = Worker(
    client,
    task_queue="my-queue",
    workflows=[MyWorkflow],
    activities=[my_activity]
)

# Access metrics from the runtime
meter = Runtime.default().metric_meter
workflow_counter = meter.create_counter(
    "workflows_executed",
    "Number of workflows executed"
)

Types

from typing import Union, Optional, Sequence, Mapping, Protocol
from datetime import timedelta
from enum import Enum
import logging

# Metric buffer constants
BufferedMetricKind = NewType("BufferedMetricKind", int)
BUFFERED_METRIC_KIND_COUNTER = BufferedMetricKind(0)
BUFFERED_METRIC_KIND_GAUGE = BufferedMetricKind(1)
BUFFERED_METRIC_KIND_HISTOGRAM = BufferedMetricKind(2)

# Configuration types
MetricsConfig = Union[OpenTelemetryConfig, PrometheusConfig, MetricBuffer]
LogFilter = Union[TelemetryFilter, str]

Performance Considerations

  • Runtime Creation: Each runtime creates a new internal thread pool, so create sparingly and reuse
  • Buffer Size: Set MetricBuffer size to a high value (tens of thousands) and drain regularly
  • Log Forwarding: Avoid TRACE level logging with log forwarding as it can be high volume
  • Metric Periodicity: Balance between observability needs and performance overhead

Best Practices

  1. Set Runtime Early: Configure the runtime before creating any clients or workers
  2. Single Runtime: Use one runtime instance per application to avoid resource duplication
  3. Metric Naming: Use consistent prefixes and global tags for better metric organization
  4. Log Levels: Use appropriate log levels for production (WARN for Core, ERROR for others)
  5. Metrics Export: Choose the appropriate metrics backend for your monitoring infrastructure

Install with Tessl CLI

npx tessl i tessl/pypi-temporalio

docs

activity.md

client.md

common.md

contrib-pydantic.md

data-conversion.md

exceptions.md

index.md

runtime.md

testing.md

worker.md

workflow.md

tile.json