Python finite state machine library with declarative API for sync and async applications
—
Event handling and state transition management including event triggers, transition definitions with conditions, guards and validators, and transition lifecycle callbacks.
Events trigger state transitions and carry data to callbacks and actions.
class Event(str):
    """
    An event triggers transitions between states.

    Events inherit from str and can be used as string identifiers.
    They carry information about triggers and associated transitions.

    Parameters:
    - transitions: Event transitions (string, TransitionList, or None)
    - id: Event identifier (optional, generated if not provided)
    - name: Event name identifier (optional, derived from id)
    """

    # API stub: signatures and docstrings only, implementations are omitted.
    # NOTE(review): `_sm` is not described in the parameter list above;
    # presumably the state machine instance a bound event belongs to - confirm.
    def __new__(cls, transitions=None, id=None, name=None, _sm=None): ...

    @property
    def id(self) -> str:
        """Get the event identifier."""

    @property
    def name(self) -> str:
        """Get the human-readable event name."""

    @property
    def transitions(self) -> "TransitionList":
        """Get transitions associated with this event."""

    def __call__(self, *args, **kwargs):
        """Trigger the event (for bound events)."""

    # Same signature as str.split, which this overrides (Event subclasses str).
    def split(self, sep=None, maxsplit=-1):
        """Split event into multiple events (string method)."""

    def is_same_event(self, event=None):
        """Check if this event matches the given event."""

    def add_callback(self, callback, group=None, priority=None):
        """Add callback to event execution."""

Runtime event instances bound to specific state machine instances.
class BoundEvent(Event):
    """
    Event bound to a specific state machine instance.

    Provides callable interface for triggering events directly.
    """

    def __call__(self, *args, **kwargs):
        """Call the event to trigger associated transitions."""

    # NOTE(review): read as literal Python, this second definition would
    # replace the synchronous one above. It is presumably listed only to show
    # that the call can be awaited on async state machines - confirm against
    # the library before relying on two separate methods.
    async def __call__(self, *args, **kwargs):
        """Async version for async state machines."""

Defines connections between states with optional conditions, guards, and actions.
class Transition:
    """
    Represents a state transition with source, target, and conditions.

    Parameters:
    - source: Source State object
    - target: Target State object
    - event: Event trigger(s) - string, list of strings, or space-separated string
    - internal: Internal transition flag (doesn't trigger entry/exit actions)
    - validators: Validation callbacks invoked before transition
    - cond: Condition callbacks that must evaluate to True
    - unless: Condition callbacks that must evaluate to False
    - on: Action callbacks executed during transition
    - before: Callbacks executed before transition
    - after: Callbacks executed after transition
    """

    # API stub: signatures and docstrings only, implementations are omitted.
    def __init__(
        self,
        source: State,
        target: State,
        event=None,
        internal: bool = False,
        validators=None,
        cond=None,
        unless=None,
        on=None,
        before=None,
        after=None
    ): ...

    @property
    def source(self) -> State:
        """Get the source state."""

    @property
    def target(self) -> State:
        """Get the target state."""

    @property
    def event(self) -> str:
        """Get the event identifier."""

    @property
    def internal(self) -> bool:
        """Check if transition is internal."""

    def match(self, event: str) -> bool:
        """Check if transition matches given event."""

    def add_event(self, event):
        """Add event to this transition."""

    def execute(self, machine, event_data):
        """Execute the transition."""

Collection of transitions with callback support and fluent API.
class TransitionList:
    """
    Collection of transitions supporting callbacks and fluent operations.

    Allows chaining transitions and adding shared callbacks.
    """

    # API stub: signatures and docstrings only, implementations are omitted.
    def __init__(self, transitions=None): ...

    def add_transitions(self, transitions):
        """Add transitions to the collection."""

    def add_event(self, event):
        """Add event to all transitions in the collection."""

    def unique_events(self):
        """Get unique events across all transitions."""

    def __or__(self, other) -> "TransitionList":
        """Chain transitions using | operator."""

    def __getitem__(self, index):
        """Get transition by index."""

    def __len__(self) -> int:
        """Get number of transitions in the list."""

    # The callback adders below are annotated as returning a TransitionList,
    # which is what lets the usage examples chain them, e.g. (...).on(...).before(...).
    def cond(self, *callbacks) -> "TransitionList":
        """Add condition callbacks to all transitions."""

    def unless(self, *callbacks) -> "TransitionList":
        """Add unless callbacks to all transitions."""

    def on(self, *callbacks) -> "TransitionList":
        """Add action callbacks to all transitions."""

    def before(self, *callbacks) -> "TransitionList":
        """Add before callbacks to all transitions."""

    def after(self, *callbacks) -> "TransitionList":
        """Add after callbacks to all transitions."""

from statemachine import StateMachine, State, Event
class OrderMachine(StateMachine):
    """Order lifecycle example: pending -> paid -> shipped -> delivered, or cancelled."""

    # Define states
    pending = State(initial=True)
    paid = State()
    shipped = State()
    delivered = State(final=True)
    cancelled = State(final=True)

    # Define events and transitions
    pay = pending.to(paid)
    ship = paid.to(shipped)
    deliver = shipped.to(delivered)
    # One event with several source states, chained with the | operator
    cancel = (
        pending.to(cancelled)
        | paid.to(cancelled)
        | shipped.to(cancelled)
    )


# Usage
order = OrderMachine()
order.send("pay")  # Trigger pay event by name

# Alternative direct call. It is shown on a separate instance because `pay`
# is only defined from `pending`: firing it a second time on `order`, which
# is already `paid`, would be rejected instead of being a no-op.
other_order = OrderMachine()
other_order.pay()
print(order.current_state.id)  # "paid"


class ATMMachine(StateMachine):
    """ATM example showing guarded (conditional) transitions."""

    idle = State(initial=True)
    card_inserted = State()
    pin_entered = State()
    authenticated = State()

    insert_card = idle.to(card_inserted)
    enter_pin = card_inserted.to(pin_entered)

    # Conditional transition with guard
    # "valid_pin" refers to the guard method below by name: `cond` requires it
    # to evaluate to True, `unless` requires it to evaluate to False.
    authenticate = pin_entered.to(authenticated).cond("valid_pin")
    reject = pin_entered.to(idle).unless("valid_pin")

    def valid_pin(self, pin: str = None):
        """Guard function - return True to allow transition."""
        return pin == "1234"


# Usage with event data
atm = ATMMachine()
atm.send("insert_card")
atm.send("enter_pin")
atm.send("authenticate", pin="1234")  # Will succeed


class WorkflowMachine(StateMachine):
    """Multi-step workflow example with chained transitions and shared callbacks."""

    start = State(initial=True)
    step1 = State()
    step2 = State()
    step3 = State()
    completed = State(final=True)
    error = State(final=True)

    # Chain multiple transitions with shared conditions
    # The trailing .on(...)/.before(...) apply to every transition in the chain.
    process = (
        start.to(step1)
        | step1.to(step2).cond("step1_valid")
        | step2.to(step3).cond("step2_valid")
        | step3.to(completed).cond("step3_valid")
    ).on("log_progress").before("validate_step")

    # Error transitions
    # NOTE(review): `unless` requires the callback to evaluate to False, and
    # the simplified `step_valid` below always returns True, so as written
    # these error transitions are never allowed.
    handle_error = (
        step1.to(error)
        | step2.to(error)
        | step3.to(error)
    ).unless("step_valid")

    def validate_step(self, **kwargs):
        print("Validating step...")

    def log_progress(self, source: State, target: State, **kwargs):
        print(f"Moving from {source.id} to {target.id}")

    # The guards below return a truthy/falsy value rather than a strict bool:
    # a missing or empty `data` short-circuits and is returned as-is.
    def step1_valid(self, data=None):
        return data and data.get("step1_complete", False)

    def step2_valid(self, data=None):
        return data and data.get("step2_complete", False)

    def step3_valid(self, data=None):
        return data and data.get("step3_complete", False)

    def step_valid(self, **kwargs):
        return True  # Simplified validation


class MediaPlayerMachine(StateMachine):
    """Media player example showing an internal transition."""

    stopped = State(initial=True)
    playing = State()
    paused = State()

    play = stopped.to(playing) | paused.to(playing)
    pause = playing.to(paused)
    stop = playing.to(stopped) | paused.to(stopped)

    # Internal transition - doesn't trigger entry/exit actions
    seek = playing.to(playing, internal=True).on("update_position")

    def on_enter_playing(self):
        print("Starting playback")

    def on_exit_playing(self):
        print("Stopping playback")

    def update_position(self, position: int):
        print(f"Seeking to position: {position}")


# Usage
player = MediaPlayerMachine()
player.send("play")  # Prints: "Starting playback"
player.send("seek", position=30)  # Prints: "Seeking to position: 30"
# No entry/exit messages due to internal=True


class NotificationMachine(StateMachine):
    """Notification example showing how event data is injected into callbacks."""

    draft = State(initial=True)
    scheduled = State()
    sent = State(final=True)

    schedule = draft.to(scheduled)
    # The event is named `dispatch`, not `send`: an event attribute called
    # `send` would shadow StateMachine.send(), which the usage below relies on
    # to trigger events by name.
    dispatch = scheduled.to(sent)

    def before_schedule(self, event: str, source: State, target: State,
                        when: str = None, recipients: list = None):
        """Parameters are automatically injected from event data."""
        print(f"Scheduling {event} from {source.id} to {target.id}")
        if when:
            print(f"Scheduled for: {when}")
        if recipients:
            print(f"Recipients: {', '.join(recipients)}")

    def on_dispatch(self, message: str = "Default message"):
        """Action for the `dispatch` event; `message` comes from the event data."""
        print(f"Sending: {message}")


# Usage with event parameters
notification = NotificationMachine()
notification.send(
    "schedule",
    when="2023-12-25 09:00",
    recipients=["user1@example.com", "user2@example.com"]
)
notification.send("dispatch", message="Happy Holidays!")

import asyncio
from statemachine import StateMachine, State
class AsyncProcessMachine(StateMachine):
idle = State(initial=True)
processing = State()
completed = State(final=True)
start = idle.to(processing)
finish = processing.to(completed)
async def on_enter_processing(self, task_data=None):
"""Async action handler."""
print("Starting async processing...")
if task_data:
await self.process_data(task_data)
print("Processing completed")
async def process_data(self, data):
"""Simulate async work."""
await asyncio.sleep(2)
print(f"Processed: {data}")
# Usage
async def main():
machine = AsyncProcessMachine()
await machine.send_async("start", task_data="important_data")
await machine.send_async("finish")
asyncio.run(main())Install with Tessl CLI
npx tessl i tessl/pypi-python-statemachine