Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
Runtime configuration for the Temporal Python SDK, including telemetry, logging, metrics collection, and performance monitoring. The runtime manages the underlying thread pool and provides observability features for Temporal applications.
Core runtime that manages internal resources and telemetry configuration for clients and workers.
class Runtime:
    """Core runtime managing internal resources and telemetry configuration
    for Temporal clients and workers.

    Construct with a ``TelemetryConfig``; set as process-wide default via
    ``set_default`` before creating clients/workers.
    """

    @staticmethod
    def default() -> Runtime: ...
    @staticmethod
    def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None: ...
    def __init__(self, *, telemetry: TelemetryConfig) -> None: ...
    @property
    def metric_meter(self) -> temporalio.common.MetricMeter: ...

from temporalio.runtime import Runtime, TelemetryConfig, LoggingConfig, PrometheusConfig
# Use default runtime
runtime = Runtime.default()
# Create custom runtime with metrics
telemetry = TelemetryConfig(
metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
)
custom_runtime = Runtime(telemetry=telemetry)
# Set as default before creating clients/workers
Runtime.set_default(custom_runtime)
# Access metrics from runtime
meter = runtime.metric_meter
counter = meter.create_counter("workflow_count", "Number of workflows")Comprehensive telemetry configuration for logging, metrics, and observability.
@dataclass(frozen=True)
class TelemetryConfig:
logging: Optional[LoggingConfig] = LoggingConfig.default
metrics: Optional[Union[OpenTelemetryConfig, PrometheusConfig, MetricBuffer]] = None
global_tags: Mapping[str, str] = field(default_factory=dict)
attach_service_name: bool = True
metric_prefix: Optional[str] = NoneConfigure logging output from the Temporal Core and Python SDK components.
@dataclass(frozen=True)
class LoggingConfig:
    """Configuration for log output from Temporal Core and SDK components."""

    # Either a structured TelemetryFilter or a raw filter string (e.g. "INFO").
    filter: Union[TelemetryFilter, str]
    # Optional forwarding of Core logs into a Python logger.
    forwarding: Optional[LogForwardingConfig] = None

    # Class-level default instance (used as TelemetryConfig.logging default).
    default: ClassVar[LoggingConfig]
@dataclass
class TelemetryFilter:
    """Log level filter with separate levels for Temporal Core (`core_level`)
    and all other components (`other_level`)."""

    core_level: str
    other_level: str

    def formatted(self) -> str:
        """Return the filter rendered as its string form."""
        ...
@dataclass
class LogForwardingConfig:
    """Forward Temporal Core logs to a Python :mod:`logging` logger."""

    logger: logging.Logger
    append_target_to_name: bool = True
    prepend_target_on_message: bool = True
    overwrite_log_record_time: bool = True
    append_log_fields_to_message: bool = True

import logging
from temporalio.runtime import (
TelemetryConfig,
LoggingConfig,
TelemetryFilter,
LogForwardingConfig
)
# Basic logging configuration
logging_config = LoggingConfig(
filter=TelemetryFilter(core_level="INFO", other_level="WARN")
)
# Forward Core logs to Python logger
logger = logging.getLogger("temporal.core")
logger.setLevel(logging.INFO)
forward_config = LogForwardingConfig(
logger=logger,
append_target_to_name=True,
prepend_target_on_message=True
)
logging_config = LoggingConfig(
filter="INFO",
forwarding=forward_config
)
telemetry = TelemetryConfig(logging=logging_config)Multiple options for metrics collection including OpenTelemetry, Prometheus, and in-memory buffering.
Export metrics to Prometheus-compatible endpoints.
@dataclass(frozen=True)
class PrometheusConfig:
bind_address: str
counters_total_suffix: bool = False
unit_suffix: bool = False
durations_as_seconds: bool = False
histogram_bucket_overrides: Optional[Mapping[str, Sequence[float]]] = NoneExport metrics to OpenTelemetry collectors.
class OpenTelemetryMetricTemporality(Enum):
    """Aggregation temporality for metrics exported via OpenTelemetry."""

    CUMULATIVE = 1
    DELTA = 2
@dataclass(frozen=True)
class OpenTelemetryConfig:
url: str
headers: Optional[Mapping[str, str]] = None
metric_periodicity: Optional[timedelta] = None
metric_temporality: OpenTelemetryMetricTemporality = OpenTelemetryMetricTemporality.CUMULATIVE
durations_as_seconds: bool = False
http: bool = FalseIn-memory metrics collection for testing and custom processing.
class MetricBufferDurationFormat(Enum):
    """Unit used for duration values stored in a MetricBuffer."""

    MILLISECONDS = 1
    SECONDS = 2
class MetricBuffer:
    """In-memory metric collection for testing and custom processing.

    Pending updates are drained via ``retrieve_updates``; durations are
    recorded in the unit selected by ``duration_format``.
    """

    def __init__(
        self,
        buffer_size: int,
        duration_format: MetricBufferDurationFormat = MetricBufferDurationFormat.MILLISECONDS,
    ): ...

    def retrieve_updates(self) -> Sequence[BufferedMetricUpdate]: ...
class BufferedMetric(Protocol):
    """Structural type describing a metric definition held in a MetricBuffer."""

    @property
    def name(self) -> str: ...
    @property
    def description(self) -> Optional[str]: ...
    @property
    def unit(self) -> Optional[str]: ...
    @property
    def kind(self) -> BufferedMetricKind: ...
class BufferedMetricUpdate(Protocol):
    """Structural type describing a single buffered metric value update."""

    @property
    def metric(self) -> BufferedMetric: ...
    @property
    def value(self) -> Union[int, float]: ...
    @property
    def attributes(self) -> temporalio.common.MetricAttributes: ...

from temporalio.runtime import (
    Runtime,
    TelemetryConfig,
    PrometheusConfig,
    OpenTelemetryConfig,
    MetricBuffer,
    MetricBufferDurationFormat,
    OpenTelemetryMetricTemporality
)
from datetime import timedelta
# Prometheus metrics
prometheus_config = PrometheusConfig(
bind_address="0.0.0.0:9090",
counters_total_suffix=True,
durations_as_seconds=True,
histogram_bucket_overrides={
"temporal_workflow_task_execution_time": [0.1, 0.5, 1.0, 5.0, 10.0]
}
)
# OpenTelemetry metrics
otel_config = OpenTelemetryConfig(
url="http://localhost:4317",
headers={"Authorization": "Bearer token"},
metric_periodicity=timedelta(seconds=10),
metric_temporality=OpenTelemetryMetricTemporality.DELTA,
durations_as_seconds=True
)
# Buffered metrics for testing
buffer = MetricBuffer(
buffer_size=10000,
duration_format=MetricBufferDurationFormat.SECONDS
)
# Use in telemetry configuration
telemetry = TelemetryConfig(
metrics=prometheus_config, # or otel_config or buffer
global_tags={"service": "my-service", "environment": "production"},
metric_prefix="myapp_temporal_"
)
runtime = Runtime(telemetry=telemetry)
# For buffered metrics, retrieve updates periodically
if isinstance(telemetry.metrics, MetricBuffer):
updates = buffer.retrieve_updates()
for update in updates:
print(f"Metric: {update.metric.name}, Value: {update.value}")Global tags, service naming, and metric prefixing for better observability organization.
# Comprehensive telemetry configuration
telemetry = TelemetryConfig(
logging=LoggingConfig(
filter=TelemetryFilter(core_level="INFO", other_level="WARN"),
forwarding=LogForwardingConfig(logger=logging.getLogger("temporal"))
),
metrics=PrometheusConfig(
bind_address="0.0.0.0:9090",
durations_as_seconds=True
),
global_tags={
"service": "order-service",
"environment": "production",
"region": "us-west-2"
},
attach_service_name=True,
metric_prefix="orders_temporal_"
)The runtime is automatically used by clients and workers when set as default.
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig
# Configure runtime before creating clients
telemetry = TelemetryConfig(
metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
)
Runtime.set_default(Runtime(telemetry=telemetry))
# Clients and workers will use the configured runtime
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="my-queue",
workflows=[MyWorkflow],
activities=[my_activity]
)
# Access metrics from the runtime
meter = Runtime.default().metric_meter
workflow_counter = meter.create_counter(
"workflows_executed",
"Number of workflows executed"
)from typing import Union, Optional, Sequence, Mapping, Protocol
from datetime import timedelta
from enum import Enum
import logging
# Metric buffer constants
BufferedMetricKind = NewType("BufferedMetricKind", int)
BUFFERED_METRIC_KIND_COUNTER = BufferedMetricKind(0)
BUFFERED_METRIC_KIND_GAUGE = BufferedMetricKind(1)
BUFFERED_METRIC_KIND_HISTOGRAM = BufferedMetricKind(2)
# Configuration types
MetricsConfig = Union[OpenTelemetryConfig, PrometheusConfig, MetricBuffer]
LogFilter = Union[TelemetryFilter, str]Install with Tessl CLI
npx tessl i tessl/pypi-temporalio