
tessl/pypi-dramatiq

Background processing library for Python that provides fast and reliable distributed task processing with actors, message brokers, and comprehensive middleware


Actors

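The actor system is the foundation for defining and managing background tasks in Dramatiq. An actor is a function wrapped by the actor decorator, or a GenericActor subclass, whose invocations are sent as messages to a broker and executed asynchronously by workers. Actors can be configured with a queue, a priority, retry behavior, and other middleware-specific options.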

Capabilities

Actor Decorator

Transform regular functions into actors that can be executed asynchronously by workers.

@actor(
    fn=None, *,
    actor_class=Actor,
    actor_name: str = None,
    queue_name: str = "default",
    priority: int = 0,
    broker: Broker = None,
    **options
)
def decorated_function(*args, **kwargs): ...

Parameters:

  • fn: Function to decorate (optional for parametrized decorator)
  • actor_class: Actor class to use (default: Actor)
  • actor_name: Actor name (defaults to function name)
  • queue_name: Queue name (default: "default")
  • priority: Priority level (default: 0, lower values = higher priority)
  • broker: Broker instance (uses global broker if None)
  • **options: Additional actor options for middleware/broker

Usage:

import dramatiq

# Simple actor
@dramatiq.actor
def send_email(to, subject, body):
    print(f"Sending email to {to}: {subject}")

# Actor with options
@dramatiq.actor(queue_name="high_priority", priority=1, max_retries=5)
def process_payment(user_id, amount):
    print(f"Processing ${amount} for user {user_id}")

# Send messages
send_email.send("user@example.com", "Hello", "Welcome!")
process_payment.send(123, 99.99)
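
The priority rule (lower values are processed first) can be pictured with a plain heap. This is a stdlib-only illustration, not Dramatiq's worker implementation; the `pending` list and the `(priority, order, name)` tuple layout are assumptions made for the sketch.

```python
import heapq

# Hypothetical pending work: (priority, actor name) pairs in arrival order.
arrivals = [(10, "send_email"), (1, "process_payment"), (5, "resize_image")]

pending = []
for order, (priority, name) in enumerate(arrivals):
    # The arrival index breaks ties so equal priorities stay first-in, first-out.
    heapq.heappush(pending, (priority, order, name))

# Pop everything: the smallest priority value comes out first.
processing_order = [heapq.heappop(pending)[2] for _ in range(len(pending))]
print(processing_order)
```

In this sketch `process_payment` (priority 1) is handled before `resize_image` (5) and `send_email` (10).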

Actor Class

The core actor implementation that wraps functions for asynchronous execution.

class Actor:
    def __init__(self, fn, *, broker, actor_name, queue_name, priority, options):
        """
        Create an actor instance.
        
        Parameters:
        - fn: Callable function (supports sync/async)
        - broker: Broker instance
        - actor_name: str - Name of the actor
        - queue_name: str - Queue name
        - priority: int - Actor priority (lower = higher priority)
        - options: Dict[str, Any] - Arbitrary options for broker/middleware
        """
    
    def message(self, *args, **kwargs) -> Message:
        """
        Build a message without sending it.
        
        Returns:
        Message object that can be sent later or used in composition
        """
    
    def message_with_options(self, *, args=(), kwargs=None, **options) -> Message:
        """
        Build message with custom options.
        
        Parameters:
        - args: tuple - Arguments for the actor
        - kwargs: dict - Keyword arguments for the actor
        - **options: Additional message options (delay, etc.)
        
        Returns:
        Message object with specified options
        """
    
    def send(self, *args, **kwargs) -> Message:
        """
        Send message asynchronously to broker.
        
        Returns:
        Message object representing the enqueued task
        """
    
    def send_with_options(self, *, args=(), kwargs=None, delay=None, **options) -> Message:
        """
        Send message with custom options.
        
        Parameters:
        - args: tuple - Arguments for the actor
        - kwargs: dict - Keyword arguments for the actor  
        - delay: int - Delay in milliseconds before processing
        - **options: Additional message options
        
        Returns:
        Message object representing the enqueued task
        """
    
    def __call__(self, *args, **kwargs):
        """
        Execute the actor synchronously (for testing/development).
        
        Returns:
        Result of the wrapped function
        """
    
    # Properties
    logger: Logger          # Actor's logger instance
    fn: Callable           # Underlying callable function
    broker: Broker         # Associated broker
    actor_name: str        # Actor name
    queue_name: str        # Queue name
    priority: int          # Priority level
    options: Dict[str, Any] # Actor options

Usage:

# Create actor manually
def my_function(x, y):
    return x + y

my_actor = dramatiq.Actor(
    my_function,
    broker=dramatiq.get_broker(),
    actor_name="adder",
    queue_name="math",
    priority=5,
    options={"max_retries": 3}
)

# Use the actor
result_msg = my_actor.send(10, 20)
direct_result = my_actor(10, 20)  # Synchronous execution
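
Because `delay` is expressed in milliseconds, it can be convenient to derive it from a `timedelta` rather than writing raw numbers. The helper below is a small sketch; `to_milliseconds` is a hypothetical name, not part of Dramatiq.

```python
from datetime import timedelta


def to_milliseconds(delta: timedelta) -> int:
    """Convert a timedelta to whole milliseconds using integer division."""
    return delta // timedelta(milliseconds=1)


delay_ms = to_milliseconds(timedelta(minutes=5))
print(delay_ms)

# With an actor such as my_actor above, the value could then be passed as:
# my_actor.send_with_options(args=(10, 20), delay=delay_ms)
```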

Generic Actor Class

Base class for class-based actors. A metaclass registers each concrete subclass as an actor, and an inner Meta class supplies its configuration (queue, priority, and any other actor options).

class GenericActor:
    """
    Base class for class-based actors.
    
    Subclasses must implement the perform() method and can define
    configuration through a Meta inner class.
    """
    
    class Meta:
        # Configuration options (all optional)
        actor_name: str = None
        queue_name: str = "default"
        priority: int = 0
        broker: Broker = None
        # Any additional options...
    
    def perform(self, *args, **kwargs):
        """
        Abstract method that subclasses must implement.
        This method contains the actual task logic.
        
        Parameters:
        - *args: Variable positional arguments
        - **kwargs: Variable keyword arguments
        
        Returns:
        Task result (any type)
        """
        raise NotImplementedError

Usage:

class EmailActor(dramatiq.GenericActor):
    class Meta:
        queue_name = "emails"
        priority = 10
        max_retries = 5
    
    def perform(self, to, subject, body):
        # Email sending logic
        print(f"Sending email to {to}: {subject}")
        # ... actual email sending ...
        return f"Email sent to {to}"

class PaymentActor(dramatiq.GenericActor):
    class Meta:
        queue_name = "payments"
        priority = 1  # Higher priority
        time_limit = 30000  # 30 seconds
    
    def perform(self, user_id, amount, payment_method):
        # Payment processing logic
        print(f"Processing ${amount} payment for user {user_id}")
        # ... payment processing ...
        return {"status": "success", "transaction_id": "12345"}

# Send messages to class-based actors
EmailActor.send("user@example.com", "Welcome", "Thanks for signing up!")
PaymentActor.send(123, 99.99, "credit_card")
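
One way to picture how a Meta inner class becomes actor options is to collect its public attributes into a dict. The sketch below uses only plain Python and is an illustration of the idea, not Dramatiq's metaclass; `collect_meta_options` and `EmailTask` are hypothetical names.

```python
def collect_meta_options(cls) -> dict:
    """Gather the public attributes of cls.Meta into an options dict."""
    meta = getattr(cls, "Meta", None)
    if meta is None:
        return {}
    # Skip dunder and private names such as __module__ and __qualname__.
    return {name: value for name, value in vars(meta).items()
            if not name.startswith("_")}


class EmailTask:
    class Meta:
        queue_name = "emails"
        priority = 10
        max_retries = 5


options = collect_meta_options(EmailTask)
print(options)
```

Under this reading, `queue_name` and `priority` configure the actor itself, while entries such as `max_retries` are passed along as options for middleware.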

Actor Options

Actors support various options that control their behavior and integration with middleware:

# Common actor options
ACTOR_OPTIONS = {
    # Retry configuration
    "max_retries": int,          # Maximum retry attempts (default: 20)
    "min_backoff": int,          # Minimum backoff in ms (default: 15000)
    "max_backoff": int,          # Maximum backoff in ms (default: 604800000)
    "retry_when": Callable,      # Function to determine if retry should occur
    
    # Time limits
    "time_limit": int,           # Maximum execution time in ms (default: 600000)
    
    # Age limits
    "max_age": int,              # Maximum message age in ms before rejection
    
    # Results storage
    "store_results": bool,       # Whether to store results (default: False)
    
    # Callbacks
    "on_success": str,           # Actor name to call on success
    "on_failure": str,           # Actor name to call on failure
    
    # Custom options for specific middleware/brokers
    # (any additional key-value pairs)
}
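
To get a feel for how `min_backoff` and `max_backoff` interact, the sketch below computes a capped exponential delay per retry attempt using the defaults listed above. It is a simplified illustration: `capped_backoff` is a hypothetical helper, and Dramatiq's own computation also applies jitter and may differ in detail.

```python
def capped_backoff(attempt: int, *, min_backoff: int = 15_000,
                   max_backoff: int = 604_800_000) -> int:
    """Double the delay on each attempt, never exceeding max_backoff (ms)."""
    return min(min_backoff * 2 ** attempt, max_backoff)


delays = [capped_backoff(n) for n in range(4)]
print(delays)              # delays for the first four retries, in ms
print(capped_backoff(20))  # late retries are clamped to max_backoff
```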

Usage:

@dramatiq.actor(
    queue_name="critical",
    priority=1,
    max_retries=3,
    time_limit=60000,  # 1 minute
    store_results=True,
    on_success="log_success",
    on_failure="handle_failure"
)
def critical_task(data):
    # process_critical_data is a placeholder for application code
    return process_critical_data(data)

@dramatiq.actor
def log_success(message_data, result):
    # message_data is the original message serialized as a dict
    print(f"Task {message_data['message_id']} succeeded: {result}")

@dramatiq.actor
def handle_failure(message_data, exception_data):
    # exception_data is a dict describing the exception
    print(f"Task {message_data['message_id']} failed: {exception_data}")
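
Callback actors receive serialized data rather than live objects, so it helps to treat both arguments as dictionaries. The sketch below formats a failure summary from sample payloads; it assumes the message dict carries `message_id` and `actor_name` keys and the exception dict carries `type` and `message` keys, and the sample values are made up for illustration.

```python
def describe_failure(message_data: dict, exception_data: dict) -> str:
    """Build a one-line summary from callback payloads (plain dicts)."""
    return (
        f"Task {message_data['message_id']} ({message_data['actor_name']}) "
        f"failed with {exception_data['type']}: {exception_data['message']}"
    )


# Made-up payloads shaped like the assumed callback arguments.
sample_message = {
    "queue_name": "critical",
    "actor_name": "critical_task",
    "args": [{"id": 1}],
    "kwargs": {},
    "options": {},
    "message_id": "abc-123",
}
sample_exception = {"type": "ValueError", "message": "bad input"}

summary = describe_failure(sample_message, sample_exception)
print(summary)
```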

Advanced Actor Patterns

Conditional Retries

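import random

import dramatiq

class TemporaryError(Exception):
    """Illustrative transient error type (not part of Dramatiq)."""

def should_retry(retries_so_far, exception):
    """Retry only transient failures, at most 5 times."""
    if isinstance(exception, TemporaryError):
        return retries_so_far < 5
    return False

@dramatiq.actor(retry_when=should_retry)
def smart_retry_task(data):
    # Fails roughly 30% of the time to exercise the retry predicate
    if random.random() < 0.3:
        raise TemporaryError("Temporary failure")
    return "Success"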
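
A retry predicate is an ordinary function, so it can be checked in isolation before it is attached to an actor. The sketch below repeats the predicate with a locally defined `TemporaryError` (an illustrative exception class, not a Dramatiq type) and exercises the three cases it distinguishes.

```python
class TemporaryError(Exception):
    """Illustrative transient error type (not part of Dramatiq)."""


def should_retry(retries_so_far: int, exception: Exception) -> bool:
    """Retry only TemporaryError, and only for the first five retries."""
    if isinstance(exception, TemporaryError):
        return retries_so_far < 5
    return False


first_attempt = should_retry(0, TemporaryError("timeout"))  # transient, early
exhausted = should_retry(5, TemporaryError("timeout"))      # transient, too many
permanent = should_retry(0, ValueError("bad input"))        # not transient
print(first_attempt, exhausted, permanent)
```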

Dynamic Actor Creation

def create_actor(name, queue, priority=0):
    """Factory function for creating actors dynamically"""
    
    @dramatiq.actor(
        actor_name=name,
        queue_name=queue,
        priority=priority
    )
    def dynamic_task(*args, **kwargs):
        print(f"Actor {name} processing: {args}, {kwargs}")
        return f"Processed by {name}"
    
    return dynamic_task

# Create actors dynamically
email_actor = create_actor("email_sender", "emails", priority=5)
sms_actor = create_actor("sms_sender", "sms", priority=3)

email_actor.send("user@example.com", "Hello")
sms_actor.send("+1234567890", "Hello")

Actor Composition with Message Building

@dramatiq.actor
def step_one(data):
    return {"processed": data, "step": 1}

@dramatiq.actor
def step_two(data):
    return {"processed": data, "step": 2}

@dramatiq.actor
def final_step(data):
    return {"final": data}

# Build messages for composition. In a pipeline, each step's result is
# appended to the next message's positional arguments, so only the first
# message carries explicit arguments.
msg1 = step_one.message({"input": "data"})
msg2 = step_two.message()
msg3 = final_step.message()

# Create pipeline
pipeline = msg1 | msg2 | msg3
pipeline.run()
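
The data flow of a pipeline can be simulated with plain functions: each step receives its own arguments plus the previous step's result as the last positional argument. This is a local, synchronous sketch of that forwarding rule, not Dramatiq's pipeline implementation; `run_pipeline` is a hypothetical helper.

```python
def step_one(data):
    return {"processed": data, "step": 1}


def step_two(data):
    return {"processed": data, "step": 2}


def final_step(data):
    return {"final": data}


def run_pipeline(steps):
    """Run (function, args) pairs in order, forwarding each result."""
    result = None
    for index, (fn, args) in enumerate(steps):
        # Every step after the first gets the previous result appended.
        call_args = args if index == 0 else args + (result,)
        result = fn(*call_args)
    return result


result = run_pipeline([
    (step_one, ({"input": "data"},)),
    (step_two, ()),
    (final_step, ()),
])
print(result)
```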

Queue Name Validation

Queue names must follow specific naming rules:

# Valid queue name pattern
QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"

# Valid examples:
"default"           # ✓
"high_priority"     # ✓
"user.emails"       # ✓
"queue-1"           # ✓
"_internal"         # ✓

# Invalid examples:
"123-queue"         # ✗ (starts with number)
"my queue"          # ✗ (contains space)
"queue@domain"      # ✗ (contains @)

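The naming rule can be checked ahead of time with the standard `re` module. The sketch below assumes the whole name must match the pattern, hence `fullmatch`; `is_valid_queue_name` is a hypothetical helper, not a Dramatiq function.

```python
import re

QUEUE_NAME_PATTERN = re.compile(r"[a-zA-Z_][a-zA-Z0-9._-]*")


def is_valid_queue_name(name: str) -> bool:
    """Return True when the entire name matches the queue name pattern."""
    return QUEUE_NAME_PATTERN.fullmatch(name) is not None


valid = ["default", "high_priority", "user.emails", "queue-1", "_internal"]
invalid = ["123-queue", "my queue", "queue@domain", ""]

valid_results = [is_valid_queue_name(name) for name in valid]
invalid_results = [is_valid_queue_name(name) for name in invalid]
print(valid_results, invalid_results)
```
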
Install with Tessl CLI

npx tessl i tessl/pypi-dramatiq
