Temporal.io Python SDK for building distributed, scalable, durable, and highly available workflows and activities.
Comprehensive exception hierarchy for handling errors in Temporal workflows, activities, and client operations. Temporal exceptions inherit from TemporalError (the one exception listed here is ContinueAsNewError, which derives directly from BaseException) and provide detailed error information for proper error handling and debugging.
Foundation exception classes that provide common functionality for all Temporal errors.
class TemporalError(Exception):
    """Root of the Temporal exception hierarchy."""

    @property
    def cause(self) -> Optional[BaseException]:
        """Cause of this error, if there is one."""
        ...
class FailureError(TemporalError):
    """Temporal error carrying a message and, optionally, a
    ``temporalio.api.failure.v1.Failure`` object.

    Args:
        message: Human-readable error message.
        failure: Failure object associated with this error, if any.
        exc_args: Optional tuple of exception arguments.
            NOTE(review): presumably forwarded to ``Exception.__init__`` -
            confirm against the SDK source.
    """

    def __init__(
        self,
        message: str,
        *,
        failure: Optional[temporalio.api.failure.v1.Failure] = None,
        exc_args: Optional[Tuple] = None,
    ): ...

    @property
    def message(self) -> str: ...

    @property
    def failure(self) -> Optional[temporalio.api.failure.v1.Failure]: ...


# User-defined errors with configurable retry behavior, error categories, and custom details.
class ApplicationErrorCategory(IntEnum):
    """Category that can be attached to an ``ApplicationError``."""

    UNSPECIFIED = 0
    BENIGN = 1
class ApplicationError(FailureError):
    """User-defined error with configurable retry behavior, category, and
    custom details.

    Args:
        message: Human-readable error message.
        *details: Arbitrary values attached to the error.
        type: Optional string identifying the kind of error.
        non_retryable: Whether the error is flagged as non-retryable.
        next_retry_delay: Optional delay to use before the next retry.
        category: Error category; defaults to ``UNSPECIFIED``.
    """

    def __init__(
        self,
        message: str,
        *details: Any,
        type: Optional[str] = None,
        non_retryable: bool = False,
        next_retry_delay: Optional[timedelta] = None,
        category: ApplicationErrorCategory = ApplicationErrorCategory.UNSPECIFIED,
    ): ...

    @property
    def details(self) -> Sequence[Any]: ...

    @property
    def type(self) -> Optional[str]: ...

    @property
    def non_retryable(self) -> bool: ...

    @property
    def next_retry_delay(self) -> Optional[timedelta]: ...

    @property
    def category(self) -> ApplicationErrorCategory: ...


from temporalio.exceptions import ApplicationError, ApplicationErrorCategory
from datetime import timedelta

# The three ``raise`` statements below are independent examples; in real code
# only one of them would appear at a given point.

# Basic application error
raise ApplicationError("Invalid input data", type="ValidationError")

# Non-retryable error with custom category
raise ApplicationError(
    "Authentication failed",
    {"user_id": "123", "ip": "192.168.1.1"},
    type="AuthError",
    non_retryable=True,
    category=ApplicationErrorCategory.BENIGN,
)

# Retryable error with custom delay
raise ApplicationError(
    "Rate limit exceeded",
    type="RateLimitError",
    next_retry_delay=timedelta(minutes=5),
)


# Errors related to workflow execution, cancellation, and termination.
class CancelledError(FailureError):
    """Error representing a cancellation.

    Args:
        message: Error message; defaults to ``"Cancelled"``.
        *details: Arbitrary values attached to the error.
    """

    def __init__(self, message: str = "Cancelled", *details: Any): ...

    @property
    def details(self) -> Sequence[Any]: ...
class TerminatedError(FailureError):
    """Error representing a termination.

    Args:
        message: Error message.
        *details: Arbitrary values attached to the error.
    """

    def __init__(self, message: str, *details: Any): ...

    @property
    def details(self) -> Sequence[Any]: ...
class WorkflowAlreadyStartedError(FailureError):
    """Error identifying a workflow that was already started.

    Args:
        workflow_id: ID of the workflow.
        workflow_type: Type name of the workflow.
        run_id: Run ID of the workflow, if known.
    """

    def __init__(
        self, workflow_id: str, workflow_type: str, *, run_id: Optional[str] = None
    ): ...

    # Plain attributes (not properties) mirroring the constructor arguments.
    workflow_id: str
    workflow_type: str
    run_id: Optional[str]


# Errors related to various types of timeouts in workflows and activities.
class TimeoutType(IntEnum):
    """Kind of timeout reported by ``TimeoutError``.

    Values follow the server-side ``temporalio.api.enums.v1.TimeoutType``
    enum, in which 0 is reserved for ``TIMEOUT_TYPE_UNSPECIFIED``; the real
    members therefore start at 1.
    """

    START_TO_CLOSE = 1
    SCHEDULE_TO_START = 2
    SCHEDULE_TO_CLOSE = 3
    HEARTBEAT = 4
class TimeoutError(FailureError):
    """Error describing a timeout in a workflow or activity.

    Note that this class shares its name with the builtin ``TimeoutError``;
    importing it shadows the builtin in the importing module.

    Args:
        message: Error message.
        type: Which timeout fired, if known.
        last_heartbeat_details: Heartbeat detail values attached to the error.
    """

    def __init__(
        self,
        message: str,
        *,
        type: Optional[TimeoutType],
        last_heartbeat_details: Sequence[Any],
    ): ...

    @property
    def type(self) -> Optional[TimeoutType]: ...

    @property
    def last_heartbeat_details(self) -> Sequence[Any]: ...


from temporalio.exceptions import TimeoutError, TimeoutType, is_cancelled_exception
from datetime import timedelta

import temporalio.workflow as workflow
from temporalio.exceptions import ActivityError


@workflow.defn
class MyWorkflow:
    """Example workflow that distinguishes an activity timeout from cancellation."""

    @workflow.run
    async def run(self) -> str:
        try:
            return await workflow.execute_activity(
                my_activity,
                "data",
                schedule_to_close_timeout=timedelta(seconds=30),
            )
        except ActivityError as e:
            # A failed activity reaches workflow code wrapped in ActivityError;
            # the TimeoutError (or CancelledError) is available as its cause.
            cause = e.cause
            if (
                isinstance(cause, TimeoutError)
                and cause.type == TimeoutType.SCHEDULE_TO_CLOSE
            ):
                # Handle activity timeout
                return "Activity timed out"
            if is_cancelled_exception(e):
                # Handle cancellation
                return "Workflow cancelled"
            # Anything else is not handled here; do not fall through and
            # implicitly return None from a method declared to return str.
            raise
        except BaseException as e:
            # asyncio.CancelledError derives from BaseException, so catching
            # Exception alone would miss it.
            if is_cancelled_exception(e):
                # Handle cancellation
                return "Workflow cancelled"
            raise


# Errors specific to activity execution, including retry information and activity metadata.
class RetryState(IntEnum):
    """Retry state reported on ``ActivityError`` and ``ChildWorkflowError``.

    Values follow the server-side ``temporalio.api.enums.v1.RetryState``
    enum, in which 0 is reserved for ``RETRY_STATE_UNSPECIFIED``; the real
    members therefore start at 1.
    """

    IN_PROGRESS = 1
    NON_RETRYABLE_FAILURE = 2
    TIMEOUT = 3
    MAXIMUM_ATTEMPTS_REACHED = 4
    RETRY_POLICY_NOT_SET = 5
    INTERNAL_SERVER_ERROR = 6
    CANCEL_REQUESTED = 7
class ActivityError(FailureError):
    """Error from activity execution, carrying retry information and
    activity metadata.

    All arguments after ``message`` are keyword-only and are exposed
    unchanged through the read-only properties of the same name.
    """

    def __init__(
        self,
        message: str,
        *,
        scheduled_event_id: int,
        started_event_id: int,
        identity: str,
        activity_type: str,
        activity_id: str,
        retry_state: Optional[RetryState],
    ): ...

    @property
    def scheduled_event_id(self) -> int: ...

    @property
    def started_event_id(self) -> int: ...

    @property
    def identity(self) -> str: ...

    @property
    def activity_type(self) -> str: ...

    @property
    def activity_id(self) -> str: ...

    @property
    def retry_state(self) -> Optional[RetryState]: ...


# Errors from child workflow execution with detailed workflow information.
class ChildWorkflowError(FailureError):
    """Error from child workflow execution, carrying detailed workflow
    information.

    All arguments after ``message`` are keyword-only and are exposed
    unchanged through the read-only properties of the same name.
    """

    def __init__(
        self,
        message: str,
        *,
        namespace: str,
        workflow_id: str,
        run_id: str,
        workflow_type: str,
        initiated_event_id: int,
        started_event_id: int,
        retry_state: Optional[RetryState],
    ): ...

    @property
    def namespace(self) -> str: ...

    @property
    def workflow_id(self) -> str: ...

    @property
    def run_id(self) -> str: ...

    @property
    def workflow_type(self) -> str: ...

    @property
    def initiated_event_id(self) -> int: ...

    @property
    def started_event_id(self) -> int: ...

    @property
    def retry_state(self) -> Optional[RetryState]: ...


# Errors that occur during client operations and workflow management.
# Client-side error types. Each is shown as a bare subclass with no extra
# constructor arguments or properties.


class WorkflowFailureError(TemporalError):
    pass


class WorkflowContinuedAsNewError(TemporalError):
    pass


class WorkflowQueryRejectedError(TemporalError):
    pass


class WorkflowQueryFailedError(TemporalError):
    pass


class WorkflowUpdateFailedError(TemporalError):
    pass


class RPCTimeoutOrCancelledError(TemporalError):
    pass


# Specialisation of RPCTimeoutOrCancelledError, so a handler for the parent
# type also catches this one.
class WorkflowUpdateRPCTimeoutOrCancelledError(RPCTimeoutOrCancelledError):
    pass


class AsyncActivityCancelledError(TemporalError):
    pass


class ScheduleAlreadyRunningError(TemporalError):
    pass


# Errors from communication with Temporal server and RPC operations.
class ServerError(FailureError):
    """Error originating from the Temporal server.

    Args:
        message: Error message.
        non_retryable: Whether the error is flagged as non-retryable.
    """

    def __init__(self, message: str, *, non_retryable: bool = False): ...

    @property
    def non_retryable(self) -> bool: ...
# Error for RPC operations; shown as a bare TemporalError subclass.
class RPCError(TemporalError):
    pass


# Errors from Nexus service operations with detailed operation metadata.
class NexusOperationError(FailureError):
    """Error from a Nexus service operation, carrying operation metadata.

    All arguments after ``message`` are keyword-only and are exposed
    unchanged through the read-only properties of the same name.
    """

    def __init__(
        self,
        message: str,
        *,
        scheduled_event_id: int,
        endpoint: str,
        service: str,
        operation: str,
        operation_token: str,
    ): ...

    @property
    def scheduled_event_id(self) -> int: ...

    @property
    def endpoint(self) -> str: ...

    @property
    def service(self) -> str: ...

    @property
    def operation(self) -> str: ...

    @property
    def operation_token(self) -> str: ...


# Errors that occur within workflow execution context.
# Derives from BaseException rather than TemporalError, so neither
# ``except Exception`` nor ``except TemporalError`` will catch it.
class ContinueAsNewError(BaseException):
    pass


class NondeterminismError(TemporalError):
    pass


class ReadOnlyContextError(TemporalError):
    pass


# Specialisation of NondeterminismError, so a handler for the parent type
# also catches this one.
class RestrictedWorkflowAccessError(NondeterminismError):
    pass


# Helper functions for error handling and classification.
def is_cancelled_exception(exception: BaseException) -> bool:
    """Check whether the given exception is considered a cancellation exception
    according to Temporal.

    This is often used in a conditional of a catch clause to check whether a
    cancel occurred inside of a workflow. This can occur from
    asyncio.CancelledError or CancelledError or either ActivityError or
    ChildWorkflowError if either of those latter two have a CancelledError cause.

    Args:
        exception: Exception to check.

    Returns:
        True if a cancelled exception, false if not.
    """


from temporalio.exceptions import (
    is_cancelled_exception,
    ActivityError,
    ChildWorkflowError,
    ApplicationError,
    RetryState,
)
import temporalio.workflow as workflow


@workflow.defn
class ErrorHandlingWorkflow:
    """Example workflow showing per-type handling of child workflow failures."""

    @workflow.run
    async def run(self) -> str:
        try:
            # Execute child workflow
            return await workflow.execute_child_workflow(
                ChildWorkflow.run,
                "data",
                id="child-workflow",
            )
        except ActivityError as e:
            if e.retry_state == RetryState.MAXIMUM_ATTEMPTS_REACHED:
                # Handle max retries reached
                return f"Activity {e.activity_type} failed after max retries"
            if is_cancelled_exception(e):
                # A cancellation wrapped in ActivityError lands in this
                # handler, so it has to be checked here as well.
                return "Operation was cancelled"
            # Unhandled failure: propagate instead of reporting success.
            raise
        except ChildWorkflowError as e:
            if e.retry_state == RetryState.NON_RETRYABLE_FAILURE:
                # Handle non-retryable child workflow failure
                return f"Child workflow {e.workflow_type} failed permanently"
            if is_cancelled_exception(e):
                # Same reasoning as above for wrapped cancellations.
                return "Operation was cancelled"
            raise
        except BaseException as e:
            # asyncio.CancelledError derives from BaseException, so catching
            # Exception alone would miss it.
            if is_cancelled_exception(e):
                # Handle any cancellation
                return "Operation was cancelled"
            # Re-raise unknown exceptions
            raise


# Nearly all Temporal exceptions inherit from TemporalError (ContinueAsNewError
# derives from BaseException instead), allowing for broad exception catching:
from temporalio.exceptions import TemporalError


async def run_with_broad_handling(client):
    """Run a workflow, separating Temporal errors from all other errors.

    Wrapped in a coroutine because ``await`` is only valid inside one.

    Args:
        client: Object exposing an awaitable ``execute_workflow`` method.
    """
    try:
        # Temporal operations
        result = await client.execute_workflow(...)
    except TemporalError as e:
        # Handle any Temporal-related error
        print(f"Temporal error: {e}")
    except Exception as e:
        # Handle non-Temporal errors
        print(f"Other error: {e}")


# Handle specific error types for precise error recovery:
from datetime import timedelta

import temporalio.workflow as workflow
from temporalio.exceptions import (
    ActivityError,
    ApplicationError,
    TimeoutError,
    TimeoutType,
)


async def run_activity_with_recovery(activity_func, data):
    """Run an activity from workflow code and branch on the specific failure.

    Wrapped in a coroutine because ``await`` is only valid inside one.

    Args:
        activity_func: Activity to execute.
        data: Single argument passed to the activity.
    """
    try:
        result = await workflow.execute_activity(
            activity_func,
            data,
            # An activity call needs a start-to-close or schedule-to-close
            # timeout; 30 seconds is an arbitrary example value.
            start_to_close_timeout=timedelta(seconds=30),
        )
    except ActivityError as e:
        # A failed activity reaches workflow code wrapped in ActivityError;
        # the specific error is available as its cause.
        cause = e.cause
        if isinstance(cause, TimeoutError):
            if cause.type == TimeoutType.HEARTBEAT:
                # Handle heartbeat timeout specifically
                pass
        elif isinstance(cause, ApplicationError):
            if cause.type == "ValidationError":
                # Handle validation errors
                pass
            elif cause.non_retryable:
                # Handle non-retryable errors
                pass
        else:
            # Not a failure this snippet knows how to recover from.
            raise


# Use the utility function to detect cancellation across different error types:
import temporalio.workflow as workflow
from temporalio.exceptions import is_cancelled_exception


async def run_with_cancellation_handling():
    """Run workflow operations and report any form of cancellation uniformly.

    Wrapped in a coroutine because ``await`` and ``return`` are only valid
    inside one.

    Returns:
        ``"Operation cancelled"`` when a cancellation is detected; otherwise
        None once the operations complete.
    """
    try:
        # Workflow operations
        await workflow.execute_activity(...)
    except BaseException as e:
        # asyncio.CancelledError derives from BaseException, so catching
        # Exception alone would miss it.
        if is_cancelled_exception(e):
            # Handle cancellation uniformly
            return "Operation cancelled"
        raise


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-temporalio