or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actors.mdbrokers.mdcomposition.mdindex.mdmessages.mdmiddleware.mdrate-limiting.mdresults.mdworkers.md

actors.mddocs/

0

# Actors

1

2

The actor system in Dramatiq provides the foundation for defining and managing background tasks. Actors are functions or classes decorated to run asynchronously, supporting queues, priorities, retries, and custom processing options.

3

4

## Capabilities

5

6

### Actor Decorator

7

8

Transform regular functions into actors that can be executed asynchronously by workers.

9

10

```python { .api }

11

@actor(

12

fn=None, *,

13

actor_class=Actor,

14

actor_name: str = None,

15

queue_name: str = "default",

16

priority: int = 0,

17

broker: Broker = None,

18

**options

19

)

20

def decorated_function(*args, **kwargs): ...

21

```

22

23

**Parameters:**

24

- `fn`: Function to decorate (optional for parametrized decorator)

25

- `actor_class`: Actor class to use (default: Actor)

26

- `actor_name`: Actor name (defaults to function name)

27

- `queue_name`: Queue name (default: "default")

28

- `priority`: Priority level (default: 0, lower values = higher priority)

29

- `broker`: Broker instance (uses global broker if None)

30

- `**options`: Additional actor options for middleware/broker

31

32

**Usage:**

33

34

```python

35

# Simple actor

36

@dramatiq.actor

37

def send_email(to, subject, body):

38

print(f"Sending email to {to}: {subject}")

39

40

# Actor with options

41

@dramatiq.actor(queue_name="high_priority", priority=1, max_retries=5)

42

def process_payment(user_id, amount):

43

print(f"Processing ${amount} for user {user_id}")

44

45

# Send messages

46

send_email.send("user@example.com", "Hello", "Welcome!")

47

process_payment.send(123, 99.99)

48

```

49

50

### Actor Class

51

52

The core actor implementation that wraps functions for asynchronous execution.

53

54

```python { .api }

55

class Actor:

56

def __init__(self, fn, *, broker, actor_name, queue_name, priority, options):

57

"""

58

Create an actor instance.

59

60

Parameters:

61

- fn: Callable function (supports sync/async)

62

- broker: Broker instance

63

- actor_name: str - Name of the actor

64

- queue_name: str - Queue name

65

- priority: int - Actor priority (lower = higher priority)

66

- options: Dict[str, Any] - Arbitrary options for broker/middleware

67

"""

68

69

def message(self, *args, **kwargs) -> Message:

70

"""

71

Build a message without sending it.

72

73

Returns:

74

Message object that can be sent later or used in composition

75

"""

76

77

def message_with_options(self, *, args=(), kwargs=None, **options) -> Message:

78

"""

79

Build message with custom options.

80

81

Parameters:

82

- args: tuple - Arguments for the actor

83

- kwargs: dict - Keyword arguments for the actor

84

- **options: Additional message options (delay, etc.)

85

86

Returns:

87

Message object with specified options

88

"""

89

90

def send(self, *args, **kwargs) -> Message:

91

"""

92

Send message asynchronously to broker.

93

94

Returns:

95

Message object representing the enqueued task

96

"""

97

98

def send_with_options(self, *, args=(), kwargs=None, delay=None, **options) -> Message:

99

"""

100

Send message with custom options.

101

102

Parameters:

103

- args: tuple - Arguments for the actor

104

- kwargs: dict - Keyword arguments for the actor

105

- delay: int - Delay in milliseconds before processing

106

- **options: Additional message options

107

108

Returns:

109

Message object representing the enqueued task

110

"""

111

112

def __call__(self, *args, **kwargs):

113

"""

114

Execute the actor synchronously (for testing/development).

115

116

Returns:

117

Result of the wrapped function

118

"""

119

120

# Properties

121

logger: Logger # Actor's logger instance

122

fn: Callable # Underlying callable function

123

broker: Broker # Associated broker

124

actor_name: str # Actor name

125

queue_name: str # Queue name

126

priority: int # Priority level

127

options: Dict[str, Any] # Actor options

128

```

129

130

**Usage:**

131

132

```python

133

# Create actor manually

134

def my_function(x, y):

135

return x + y

136

137

my_actor = dramatiq.Actor(

138

my_function,

139

broker=dramatiq.get_broker(),

140

actor_name="adder",

141

queue_name="math",

142

priority=5,

143

options={"max_retries": 3}

144

)

145

146

# Use the actor

147

result_msg = my_actor.send(10, 20)

148

direct_result = my_actor(10, 20) # Synchronous execution

149

```

150

151

### Generic Actor Class

152

153

Base class for creating class-based actors with metaclass support and configuration via Meta class.

154

155

```python { .api }

156

class GenericActor:

157

"""

158

Base class for class-based actors.

159

160

Subclasses must implement the perform() method and can define

161

configuration through a Meta inner class.

162

"""

163

164

class Meta:

165

# Configuration options (all optional)

166

actor_name: str = None

167

queue_name: str = "default"

168

priority: int = 0

169

broker: Broker = None

170

# Any additional options...

171

172

def perform(self, *args, **kwargs):

173

"""

174

Abstract method that subclasses must implement.

175

This method contains the actual task logic.

176

177

Parameters:

178

- *args: Variable positional arguments

179

- **kwargs: Variable keyword arguments

180

181

Returns:

182

Task result (any type)

183

"""

184

raise NotImplementedError

185

```

186

187

**Usage:**

188

189

```python

190

class EmailActor(dramatiq.GenericActor):

191

class Meta:

192

queue_name = "emails"

193

priority = 10

194

max_retries = 5

195

196

def perform(self, to, subject, body):

197

# Email sending logic

198

print(f"Sending email to {to}: {subject}")

199

# ... actual email sending ...

200

return f"Email sent to {to}"

201

202

class PaymentActor(dramatiq.GenericActor):

203

class Meta:

204

queue_name = "payments"

205

priority = 1 # Higher priority

206

time_limit = 30000 # 30 seconds

207

208

def perform(self, user_id, amount, payment_method):

209

# Payment processing logic

210

print(f"Processing ${amount} payment for user {user_id}")

211

# ... payment processing ...

212

return {"status": "success", "transaction_id": "12345"}

213

214

# Send messages to class-based actors

215

EmailActor.send("user@example.com", "Welcome", "Thanks for signing up!")

216

PaymentActor.send(123, 99.99, "credit_card")

217

```

218

219

### Actor Options

220

221

Actors support various options that control their behavior and integration with middleware:

222

223

```python { .api }

224

# Common actor options

225

ACTOR_OPTIONS = {

226

# Retry configuration

227

"max_retries": int, # Maximum retry attempts (default: 20)

228

"min_backoff": int, # Minimum backoff in ms (default: 15000)

229

"max_backoff": int, # Maximum backoff in ms (default: 604800000)

230

"retry_when": Callable, # Function to determine if retry should occur

231

232

# Time limits

233

"time_limit": int, # Maximum execution time in ms (default: 600000)

234

235

# Age limits

236

"max_age": int, # Maximum message age in ms before rejection

237

238

# Results storage

239

"store_results": bool, # Whether to store results (default: False)

240

241

# Callbacks

242

"on_success": str, # Actor name to call on success

243

"on_failure": str, # Actor name to call on failure

244

245

# Custom options for specific middleware/brokers

246

# (any additional key-value pairs)

247

}

248

```

249

250

**Usage:**

251

252

```python

253

@dramatiq.actor(

254

queue_name="critical",

255

priority=1,

256

max_retries=3,

257

time_limit=60000, # 1 minute

258

store_results=True,

259

on_success="log_success",

260

on_failure="handle_failure"

261

)

262

def critical_task(data):

263

# Critical processing logic

264

return process_critical_data(data)

265

266

@dramatiq.actor

267

def log_success(message_data, result):

268

print(f"Task {message_data.message_id} succeeded: {result}")

269

270

@dramatiq.actor

271

def handle_failure(message_data, exception_data):

272

print(f"Task {message_data.message_id} failed: {exception_data}")

273

```

274

275

### Advanced Actor Patterns

276

277

#### Conditional Retries

278

279

```python

280

def should_retry(retries_so_far, exception):

281

"""Custom retry logic"""

282

if isinstance(exception, TemporaryError):

283

return retries_so_far < 5

284

return False

285

286

@dramatiq.actor(retry_when=should_retry)

287

def smart_retry_task(data):

288

# Task that uses custom retry logic

289

if random.random() < 0.3:

290

raise TemporaryError("Temporary failure")

291

return "Success"

292

```

293

294

#### Dynamic Actor Creation

295

296

```python

297

def create_actor(name, queue, priority=0):

298

"""Factory function for creating actors dynamically"""

299

300

@dramatiq.actor(

301

actor_name=name,

302

queue_name=queue,

303

priority=priority

304

)

305

def dynamic_task(*args, **kwargs):

306

print(f"Actor {name} processing: {args}, {kwargs}")

307

return f"Processed by {name}"

308

309

return dynamic_task

310

311

# Create actors dynamically

312

email_actor = create_actor("email_sender", "emails", priority=5)

313

sms_actor = create_actor("sms_sender", "sms", priority=3)

314

315

email_actor.send("user@example.com", "Hello")

316

sms_actor.send("+1234567890", "Hello")

317

```

318

319

#### Actor Composition with Message Building

320

321

```python

322

@dramatiq.actor

323

def step_one(data):

324

return {"processed": data, "step": 1}

325

326

@dramatiq.actor

327

def step_two(data):

328

return {"processed": data, "step": 2}

329

330

@dramatiq.actor

331

def final_step(data):

332

return {"final": data}

333

334

# Build messages for composition

335

msg1 = step_one.message({"input": "data"})

336

msg2 = step_two.message({"from_step_one": True})

337

msg3 = final_step.message({"complete": True})

338

339

# Create pipeline

340

pipeline = msg1 | msg2 | msg3

341

pipeline.run()

342

```

343

344

## Queue Name Validation

345

346

Queue names must follow specific naming rules:

347

348

```python { .api }

349

# Valid queue name pattern

350

QUEUE_NAME_PATTERN = r"[a-zA-Z_][a-zA-Z0-9._-]*"

351

352

# Valid examples:

353

"default" # ✓

354

"high_priority" # ✓

355

"user.emails" # ✓

356

"queue-1" # ✓

357

"_internal" # ✓

358

359

# Invalid examples:

360

"123-queue" # ✗ (starts with number)

361

"my queue" # ✗ (contains space)

362

"queue@domain" # ✗ (contains @)

363

```