Python finite state machine library with declarative API for sync and async applications
—
Lifecycle hooks, action handlers, callback registration, and dependency injection for state machine events including entry/exit actions, transition callbacks, and custom event handlers.
Methods for registering callbacks on transitions and events with support for different callback types and priorities.
class AddCallbacksMixin:
    """Mixin providing callback registration functionality.

    Interface listing only: every method here is a docstring-only stub that
    documents the registration API and performs no work.
    """

    def add_callback(self, callback, group=None, priority=None, specs=None):
        """Add a callback to be executed during event processing.

        Args:
            callback: Callable to execute (function, method, or string
                reference).
            group: CallbackGroup enum value (VALIDATORS, CONDITIONS, ACTIONS).
            priority: CallbackPriority enum value (HIGH, NORMAL, LOW).
            specs: SpecReference flags for callback resolution.
        """

    def cond(self, *callbacks):
        """Add condition callbacks that must evaluate to True."""

    def unless(self, *callbacks):
        """Add condition callbacks that must evaluate to False."""

    def on(self, *callbacks):
        """Add action callbacks executed during transition."""

    def before(self, *callbacks):
        """Add callbacks executed before transition."""

    def after(self, *callbacks):
        """Add callbacks executed after transition."""

    def validators(self, *callbacks):
        """Add validator callbacks for transition validation."""


# Enumerations and constants for callback organization and execution control.
class CallbackPriority(IntEnum):
    """Priority levels for callback execution order."""

    HIGH = 10
    NORMAL = 5
    LOW = 1


class CallbackGroup(IntEnum):
    """Groups for organizing different types of callbacks."""

    VALIDATORS = 1
    CONDITIONS = 2
    ACTIONS = 3


class SpecReference(IntFlag):
    """Reference types for callback specification resolution.

    Members are bit flags and may be combined with ``|``.
    """

    NAME = 1  # String name references
    CALLABLE = 2  # Direct callable references
    PROPERTY = 4  # Property references


# Constants for common specification combinations.
# Members are qualified with the enum name: the bare names NAME / CALLABLE /
# PROPERTY only exist inside the class body and raise NameError out here.
SPECS_ALL: SpecReference = (
    SpecReference.NAME | SpecReference.CALLABLE | SpecReference.PROPERTY
)  # All reference types
SPECS_SAFE: SpecReference = SpecReference.NAME  # Only name references for safety

# Standard callback method naming conventions that are automatically discovered and registered.
# Event-based callbacks (replace {event} with actual event name)
# NOTE: `{event}` is a naming-template placeholder; these signatures are
# illustrative and only become valid Python once it is substituted.
def before_{event}(self, event: str, source: State, target: State, **kwargs):
    """Called before event processing starts."""

def after_{event}(self, event: str, source: State, target: State, **kwargs):
    """Called after event processing completes."""

def on_{event}(self, event: str, source: State, target: State, **kwargs):
    """Called during event transition execution."""

# State-based callbacks (replace {state} with actual state id)
# NOTE: `{state}` is likewise a placeholder, to be replaced by a state id.
def on_enter_{state}(self, **kwargs):
    """Called when entering the specified state."""

def on_exit_{state}(self, **kwargs):
    """Called when exiting the specified state."""

# Generic lifecycle callbacks
# These names contain no placeholder and are used as written.
def on_enter_state(self, state: State, **kwargs):
    """Called when entering any state."""

def on_exit_state(self, state: State, **kwargs):
    """Called when exiting any state."""
def on_transition(self, event: str, source: State, target: State, **kwargs):
    """Called during any transition.

    Signature template only; the body is intentionally empty.

    Args:
        event: The event identifier.
        source: The source state of the transition.
        target: The target state of the transition.
        **kwargs: Any further parameters supplied with the event.
    """


# Parameters automatically available to callback methods through dependency injection.
# Built-in parameters available to all callbacks:
# - event: str - The event identifier
# - machine: StateMachine - The state machine instance
# - model: object - The external model object (if provided)
# - source: State - The source state of transition
# - target: State - The target state of transition
# - state: State - The current state
# - transition: Transition - The transition object
# - event_data: TriggerData - Complete event data structure
def example_callback(
    self,
    event: str,
    source: State,
    target: State,
    machine,
    model,
    transition,
    event_data,
    custom_param: str = "default",
):
    """Example showing available built-in parameters.

    Every parameter except ``custom_param`` is named after one of the
    built-in values listed in the comment block above this function;
    ``custom_param`` is an ordinary caller-supplied argument with a default.
    """
pass

from statemachine import StateMachine, State
class ConnectionMachine(StateMachine):
    """Connection lifecycle example built on per-state enter/exit hooks.

    The attempt counter lives in ``self.connection_attempts``: it is created
    lazily on the first entry into ``connecting`` and reset on ``connected``.
    """

    # States
    disconnected = State(initial=True)
    connecting = State()
    connected = State()
    error = State()

    # Events
    connect = disconnected.to(connecting)
    success = connecting.to(connected)
    failure = connecting.to(error)
    disconnect = connected.to(disconnected)
    retry = error.to(connecting)

    def on_enter_connecting(self):
        """Announce the attempt and increment the attempt counter."""
        print("Attempting to connect...")
        # The counter may not exist yet, hence the getattr default of 0.
        attempts_so_far = getattr(self, 'connection_attempts', 0)
        self.connection_attempts = attempts_so_far + 1

    def on_exit_connecting(self):
        """Report that the current attempt is over."""
        print("Connection attempt finished")

    def on_enter_connected(self):
        """Report success and reset the attempt counter."""
        print("Connected successfully!")
        self.connection_attempts = 0

    def on_enter_error(self):
        """Report the failure together with the current attempt count."""
        print(f"Connection failed (attempt {self.connection_attempts})")

    def on_exit_error(self):
        """Report that a retry is starting."""
        print("Retrying connection...")
# Usage
# A new machine starts in `disconnected`, the state declared with initial=True.
conn = ConnectionMachine()
conn.send("connect") # Prints: "Attempting to connect..."
conn.send("success") # Prints: "Connection attempt finished" then "Connected successfully!"


class OrderProcessingMachine(StateMachine):
# States: `created` is the initial state; `delivered` and `cancelled` are final.
created = State(initial=True)
validated = State()
paid = State()
shipped = State()
delivered = State(final=True)
cancelled = State(final=True)
# Events: one per step from `created` through to `delivered`.
validate = created.to(validated)
pay = validated.to(paid)
ship = paid.to(shipped)
deliver = shipped.to(delivered)
# `cancel` is accepted from `created`, `validated` or `paid` only.
cancel = (
    created.to(cancelled)
    | validated.to(cancelled)
    | paid.to(cancelled)
)
def before_validate(self, event: str, order_data: dict = None):
    """Called before validation starts.

    Args:
        event: Identifier of the event being processed.
        order_data: Optional order payload; its ``id`` entry is reported
            when present.

    Returns:
        A status message naming the order id (``None`` when no id is given).
    """
    # `order_data` defaults to None, so fall back to an empty mapping; the
    # lookups below would otherwise raise AttributeError when it is omitted.
    payload = order_data if order_data is not None else {}
    print(f"Starting {event} with order: {payload.get('id', 'unknown')}")
    return f"Validation started for order {payload.get('id')}"
def after_validate(
    self,
    event: str,
    source: State,
    target: State,
    order_data: dict = None,
):
    """Called after validation completes.

    Prints the finished transition, then a confirmation line when a
    non-empty ``order_data`` was supplied (its ``id`` key is read directly).
    """
    print(f"Completed {event}: {source.id} -> {target.id}")
    if not order_data:
        return
    print(f"Order {order_data['id']} validation successful")
def before_pay(self, payment_method: str = "card", amount: float = 0.0):
    """Called before payment processing; prints the amount and method."""
    announcement = f"Processing payment of ${amount:.2f} via {payment_method}"
    print(announcement)
def on_pay(self, payment_method: str = "card", amount: float = 0.0):
    """Called during payment processing.

    Returns:
        True when ``amount`` is greater than zero (a success line is
        printed first), otherwise False.
    """
    # Simulate payment processing
    # Written as `not amount > 0` so that every value failing the original
    # `amount > 0` test takes the same branch as before.
    if not amount > 0:
        return False
    print(f"Payment of ${amount:.2f} processed successfully")
    return True
def before_cancel(self, reason: str = "user_request"):
    """Called before cancellation; prints the supplied reason."""
    notice = f"Cancelling order. Reason: {reason}"
    print(notice)
# Usage with event parameters
# Keyword arguments given to send() are the values the callbacks above
# declare as parameters (order_data, payment_method, amount).
order = OrderProcessingMachine()
result = order.send("validate", order_data={"id": "ORD-001", "items": ["item1"]})
print(f"Result: {result}") # Prints return value from before_validate
order.send("pay", payment_method="credit_card", amount=99.99)
order.send("cancel", reason="inventory_unavailable")


class AccountMachine(StateMachine):
# States: `inactive` is the initial state and `closed` the only final one.
inactive = State(initial=True)
active = State()
suspended = State()
closed = State(final=True)
# Events
activate = inactive.to(active)
suspend = active.to(suspended)
reactivate = suspended.to(active)
# `close` is accepted from every non-final state.
close = (
    inactive.to(closed)
    | suspended.to(closed)
    | active.to(closed)
)
def before_activate(self, user_id: int = None, **kwargs):
    """Validator called before activation.

    Raises:
        ValueError: If ``user_id`` is missing or falsy, or if
            ``self.user_exists`` rejects it.
    """
    if not user_id:
        raise ValueError("User ID required for activation")
    is_known_user = self.user_exists(user_id)
    if not is_known_user:
        raise ValueError(f"User {user_id} not found")
    print(f"Activating account for user {user_id}")
def on_activate(self, user_id: int = None):
    """Action performed during activation.

    Stores ``user_id`` on the instance as ``current_user_id`` and prints a
    confirmation line.
    """
    self.current_user_id = user_id
    confirmation = f"Account activated for user {user_id}"
    print(confirmation)
def before_suspend(self, reason: str = "policy_violation"):
    """Called before suspension with reason.

    Prints the reason and records it on the instance as
    ``suspension_reason``.
    """
    notice = f"Suspending account. Reason: {reason}"
    print(notice)
    self.suspension_reason = reason
def before_close(self, **kwargs):
    """Validator for account closure.

    Prints which account is being closed; the wording depends on whether
    the instance has a ``current_user_id`` attribute.
    """
    if not hasattr(self, 'current_user_id'):
        print("Closing inactive account")
        return
    print(f"Closing account for user {self.current_user_id}")
def user_exists(self, user_id: int) -> bool:
    """Simulate user validation: any id greater than zero counts as known."""
    is_positive = user_id > 0  # Simplified validation
    return is_positive
# Usage
# Walks the account through inactive -> active -> suspended.
account = AccountMachine()
account.send("activate", user_id=12345)
account.send("suspend", reason="suspicious_activity")
account.send("close")


class DynamicMachine(StateMachine):
# States: `state1` is initial, `state3` is final.
state1 = State(initial=True)
state2 = State()
state3 = State(final=True)
# Events; their callbacks are attached at runtime in __init__.
transition1 = state1.to(state2)
transition2 = state2.to(state3)
def __init__(self, *args, **kwargs):
    """Initialise the machine, then attach callbacks programmatically.

    ``transition1`` receives a before-callback, a condition and an action;
    ``transition2`` receives two action callbacks with explicit priorities.
    """
    super().__init__(*args, **kwargs)
    # Add callbacks programmatically
    self.transition1.before(self.log_transition)
    self.transition1.cond(self.check_condition)
    self.transition1.on(self.perform_action)
    # Add multiple callbacks with priorities
    # NOTE(review): CallbackGroup / CallbackPriority are not imported in the
    # visible part of this example - confirm where they come from.
    prioritised_actions = (
        (self.high_priority_action, CallbackPriority.HIGH),
        (self.low_priority_action, CallbackPriority.LOW),
    )
    for action, level in prioritised_actions:
        self.transition2.add_callback(
            action,
            group=CallbackGroup.ACTIONS,
            priority=level,
        )
def log_transition(self, event: str, source: State, target: State):
    """Logging callback: print the event with its source and target ids."""
    line = f"Transitioning: {event} ({source.id} -> {target.id})"
    print(line)
def check_condition(self, allow_transition: bool = True):
    """Condition callback.

    Prints the flag and returns it unchanged, so the caller-supplied value
    decides the outcome.
    """
    report = f"Checking condition: {allow_transition}"
    print(report)
    return allow_transition
def perform_action(self):
    """Action callback: print a fixed progress line."""
    message = "Performing main action"
    print(message)
def high_priority_action(self):
    """High priority action (executed first)."""
    message = "High priority action"
    print(message)
def low_priority_action(self):
    """Low priority action (executed last)."""
    message = "Low priority action"
    print(message)
# Usage
machine = DynamicMachine()
# `allow_transition` is the parameter declared by check_condition.
machine.send("transition1", allow_transition=True)
machine.send("transition2")

import asyncio
from statemachine import StateMachine, State
class AsyncWorkflowMachine(StateMachine):
    """Workflow example whose callbacks are coroutines."""

    # States: two alternative final outcomes.
    pending = State(initial=True)
    processing = State()
    completed = State(final=True)
    failed = State(final=True)

    # Events
    start = pending.to(processing)
    complete = processing.to(completed)
    fail = processing.to(failed)

    async def on_enter_processing(self, task_id: str = None, **kwargs):
        """Async entry action.

        Awaits ``process_task`` and prints its result; on any exception the
        error is printed and the ``fail`` event is sent.
        """
        print(f"Starting async processing for task: {task_id}")
        try:
            result = await self.process_task(task_id)
            print(f"Processing completed: {result}")
        except Exception as e:
            print(f"Processing failed: {e}")
            # Automatically trigger failure transition
            # NOTE(review): confirm `send_async` exists in the installed
            # python-statemachine release.
            await self.send_async("fail")

    async def before_complete(self, **kwargs):
        """Async before callback: simulated half-second cleanup."""
        print("Finalizing task...")
        await asyncio.sleep(0.5)  # Simulate cleanup
        print("Finalization complete")

    async def process_task(self, task_id: str):
        """Simulate async work.

        Sleeps for two seconds, then returns a result string; the task id
        ``"fail_test"`` raises instead.
        """
        await asyncio.sleep(2)
        if task_id != "fail_test":
            return f"Result for {task_id}"
        raise Exception("Simulated failure")
# Usage
async def main():
    """Drive one workflow: start it, complete it if still processing, report."""
    workflow = AsyncWorkflowMachine()
    await workflow.send_async("start", task_id="task_001")
    # Processing will complete automatically or fail
    still_processing = workflow.processing.is_active
    if still_processing:
        await workflow.send_async("complete")
    print(f"Final state: {workflow.current_state.id}")
asyncio.run(main())


class AuditedMachine(StateMachine):
# States
start = State(initial=True)
middle = State()
end = State(final=True)
# A single event, `next`, drives both steps: start -> middle -> end.
next = start.to(middle) | middle.to(end)
def on_enter_state(self, state: State, **kwargs):
    """Called when entering any state.

    Prints the state name and records an ``"enter"`` entry through
    ``log_state_change``.
    """
    announcement = f"Entering state: {state.name}"
    print(announcement)
    self.log_state_change("enter", state)
def on_exit_state(self, state: State, **kwargs):
    """Called when exiting any state.

    Prints the state name and records an ``"exit"`` entry through
    ``log_state_change``.
    """
    announcement = f"Exiting state: {state.name}"
    print(announcement)
    self.log_state_change("exit", state)
def on_transition(self, event: str, source: State, target: State, **kwargs):
    """Called during any transition.

    Prints the transition and appends a record with the keys ``event``,
    ``from``, ``to`` and ``timestamp`` to ``self.audit_log``.
    """
    print(f"Transition: {event} ({source.id} -> {target.id})")
    log = self.audit_log
    record = {
        "event": event,
        "from": source.id,
        "to": target.id,
        "timestamp": self.get_timestamp(),
    }
    log.append(record)
def __init__(self, *args, **kwargs):
    """Create the audit containers, then initialise the state machine.

    Args:
        *args: Forwarded unchanged to the base initializer.
        **kwargs: Forwarded unchanged to the base initializer.
    """
    # The logs must exist before the base initializer runs: it activates
    # the initial state, which fires on_enter_state -> log_state_change and
    # appends to state_history. Creating them afterwards raises
    # AttributeError during construction.
    self.audit_log = []
    self.state_history = []
    super().__init__(*args, **kwargs)
def log_state_change(self, action: str, state: State):
    """Log state changes as ``"<action>:<state id>"`` in ``state_history``."""
    entry = f"{action}:{state.id}"
    self.state_history.append(entry)
def get_timestamp(self):
    """Get current timestamp as an ISO 8601 string (local, naive time)."""
    import datetime
    moment = datetime.datetime.now()
    return moment.isoformat()
# Usage
machine = AuditedMachine()
# The same event is sent twice; each send advances one step.
machine.send("next") # start -> middle
machine.send("next") # middle -> end
print("Audit log:", machine.audit_log)
print("State history:", machine.state_history)

Install with Tessl CLI
npx tessl i tessl/pypi-python-statemachine