# Event System

Publish-subscribe event system for asynchronous inter-service communication, with reliable message delivery, flexible routing patterns, and support for event-driven architectures.

## Capabilities

### Event Handler Decorator

Decorator that subscribes a service method to a specific event, so the method runs asynchronously whenever a matching event is published.

```python { .api }
def event_handler(source_service, event_type, **kwargs):
    """
    Decorator to handle events from other services.

    Parameters:
    - source_service: Name of the service that publishes the event
    - event_type: Type/name of the event to handle
    - handler_type: Optional handler type ('service_pool', 'singleton', or 'broadcast'); defaults to 'service_pool'
    - reliable_delivery: Whether to ensure reliable message delivery (default: True)
    - requeue_on_error: Whether to requeue messages on handler errors (default: False)

    Returns:
    Decorated method that will be called when matching events are received
    """
```

**Usage Example:**

```python
from nameko.events import event_handler

class EmailService:
    name = "email_service"

    @event_handler('user_service', 'user_registered')
    def send_welcome_email(self, payload):
        user_email = payload['email']
        user_name = payload['name']
        # Send welcome email logic (_send_email is an application helper, not shown)
        self._send_email(user_email, 'Welcome!', f'Hello {user_name}')

    # service_pool: exactly one instance sends the email; a broadcast handler
    # would send one confirmation per running instance
    @event_handler('order_service', 'order_completed',
                   handler_type='service_pool', reliable_delivery=True)
    def send_order_confirmation(self, payload):
        order_id = payload['order_id']
        customer_email = payload['customer_email']
        # Send order confirmation
        self._send_email(customer_email, 'Order Confirmation',
                         f'Your order {order_id} is confirmed')
```

### Event Dispatcher

Dependency provider that lets a service publish events to the event bus for consumption by event handlers.

```python { .api }
class EventDispatcher:
    """
    Dependency provider for dispatching events to other services.
    """

    def __call__(self, event_type, event_data):
        """
        Dispatch an event to the event bus.

        Parameters:
        - event_type: Type/name of the event
        - event_data: Dictionary containing event payload data
        """
```

**Usage Example:**

```python
import time

from nameko.rpc import rpc
from nameko.events import EventDispatcher

class UserService:
    name = "user_service"

    event_dispatcher = EventDispatcher()

    @rpc
    def create_user(self, user_data):
        # Create user in database (_save_user is an application helper, not shown)
        user_id = self._save_user(user_data)

        # Dispatch event to notify other services
        self.event_dispatcher('user_created', {
            'user_id': user_id,
            'email': user_data['email'],
            'name': user_data['name'],
            'created_at': time.time()
        })

        return {'user_id': user_id, 'status': 'created'}

    @rpc
    def update_user_email(self, user_id, new_email):
        # Update email in database; the helper returns the previous address
        old_email = self._update_user_email(user_id, new_email)

        # Dispatch event about email change
        self.event_dispatcher('user_email_changed', {
            'user_id': user_id,
            'old_email': old_email,
            'new_email': new_email,
            'changed_at': time.time()
        })

        return {'status': 'updated'}
```

### Event Handler Types

The handler type determines the delivery semantics for an event.

**Service Pool Handler (Default):**
- Events are load-balanced across instances of the same service
- Only one instance processes each event
- Good for work distribution

```python
@event_handler('user_service', 'user_created', handler_type='service_pool')
def process_user_created(self, payload):
    # Only one instance of this service will process this event
    pass
```

**Broadcast Handler:**
- Events are delivered to all instances of the service
- Every instance processes the event
- Good for cache invalidation, logging

```python
# reliable_delivery=False: Nameko's default broadcast identifier is generated
# per instance and is not compatible with reliable delivery
@event_handler('user_service', 'user_created',
               handler_type='broadcast', reliable_delivery=False)
def invalidate_user_cache(self, payload):
    # All instances will invalidate their user cache
    user_id = payload['user_id']
    self.cache.delete(f'user:{user_id}')
```
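
The difference between the two handler types can be illustrated with a toy model. The sketch below is not Nameko code and does not talk to a broker: the `deliver` function and the instance names are hypothetical, and round-robin is only a stand-in for whatever load balancing the broker actually performs.

```python
from itertools import cycle


def deliver(events, instances, handler_type):
    """Toy model: return {instance_name: [events that instance processes]}."""
    processed = {name: [] for name in instances}
    if handler_type == "broadcast":
        # Every instance receives every event
        for event in events:
            for name in instances:
                processed[name].append(event)
    elif handler_type == "service_pool":
        # Each event goes to exactly one instance (round-robin as a stand-in)
        for event, name in zip(events, cycle(instances)):
            processed[name].append(event)
    else:
        raise ValueError(f"unknown handler_type: {handler_type!r}")
    return processed


events = ["e1", "e2", "e3", "e4"]
instances = ["worker-a", "worker-b"]

pool = deliver(events, instances, "service_pool")
fanout = deliver(events, instances, "broadcast")
```

With a service pool, the total number of handler invocations equals the number of events; with broadcast, it is the number of events multiplied by the number of instances, which is why broadcast suits per-instance state such as local caches.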

### Reliable Delivery

Event handlers can be configured for reliable delivery so that events published while no handler is available are not lost.

```python { .api }
@event_handler('payment_service', 'payment_failed',
               reliable_delivery=True, requeue_on_error=True)
def handle_payment_failure(self, payload):
    """
    Parameters:
    - reliable_delivery: Keeps events in the queue until a handler is available to consume them
    - requeue_on_error: Requeues message if handler raises an exception
    """
```
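
Because a requeued message is delivered again, a handler with `requeue_on_error=True` may see the same event more than once. One common way to cope is to make the handler idempotent. The sketch below is a minimal illustration under stated assumptions: `IdempotentHandler` is a hypothetical helper, not part of Nameko, it assumes every event carries a unique `event_id`, and it tracks seen IDs in process memory only (a real deployment would likely need a shared store).

```python
class IdempotentHandler:
    """Wrap a handler so that each event_id is processed at most once."""

    def __init__(self, handle):
        self._handle = handle
        self._seen = set()  # in-memory only; hypothetical stand-in for a shared store

    def __call__(self, event_id, payload):
        if event_id in self._seen:
            return False  # duplicate delivery: skip
        self._handle(payload)
        # Record the ID only after success, so a failed attempt can be retried
        self._seen.add(event_id)
        return True


processed = []
handler = IdempotentHandler(processed.append)

first = handler("event-uuid-456", {"payment_id": 7})
second = handler("event-uuid-456", {"payment_id": 7})  # redelivery of the same event
```

Here `first` is `True`, `second` is `False`, and the wrapped handler runs only once.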

### Event Filtering

Subscriptions match on source service and event type. Any further filtering, for example on payload content, is applied inside the handler.

```python
from nameko.events import event_handler, BROADCAST

class NotificationService:
    name = "notification_service"

    # Broadcast handlers use reliable_delivery=False; the default broadcast
    # identifier is not compatible with reliable delivery
    @event_handler('order_service', 'order_status_changed',
                   handler_type=BROADCAST, reliable_delivery=False)
    def notify_status_change(self, payload):
        # Filter based on event content
        if payload.get('status') in ['shipped', 'delivered']:
            customer_id = payload['customer_id']
            self._send_notification(customer_id, payload)
```
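
Pulling the filter condition out into a small pure function makes it easy to unit test without a broker. This is only a sketch: `should_notify` and `NOTIFY_STATUSES` are hypothetical names, and the status values are taken from the example above.

```python
NOTIFY_STATUSES = frozenset({"shipped", "delivered"})


def should_notify(payload, statuses=NOTIFY_STATUSES):
    """Return True when the payload's status is one we notify customers about."""
    return payload.get("status") in statuses


shipped = should_notify({"status": "shipped", "customer_id": 42})
pending = should_notify({"status": "pending", "customer_id": 42})
missing = should_notify({"customer_id": 42})  # no status key at all
```

Inside the handler, the check then reduces to `if should_notify(payload): ...`, and the predicate can be tested on plain dictionaries.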

### Event Message Structure

Events follow a standard message structure for consistency across services.

```python { .api }
# Event message structure
{
    "source_service": "user_service",
    "event_type": "user_created",
    "data": {
        "user_id": 123,
        "email": "user@example.com",
        "name": "John Doe"
    },
    "metadata": {
        "timestamp": "2023-01-01T12:00:00Z",
        "correlation_id": "abc-123-def",
        "event_id": "event-uuid-456"
    }
}
```
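
A message of this shape could be assembled with a small helper. The sketch below is an assumption for illustration: `build_event` is a hypothetical function, not a Nameko API, and using UUIDs for `event_id` and `correlation_id` is one possible choice rather than a documented requirement.

```python
import uuid
from datetime import datetime, timezone


def build_event(source_service, event_type, data, correlation_id=None):
    """Build a dict with the fields shown in the message structure above."""
    return {
        "source_service": source_service,
        "event_type": event_type,
        "data": dict(data),  # copy so later changes to the caller's dict do not leak in
        "metadata": {
            # UTC timestamp in the same format as the example
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
            # Reuse the caller's correlation ID if given, otherwise start a new one
            "correlation_id": correlation_id or str(uuid.uuid4()),
            "event_id": str(uuid.uuid4()),
        },
    }


event = build_event(
    "user_service",
    "user_created",
    {"user_id": 123, "email": "user@example.com", "name": "John Doe"},
    correlation_id="abc-123-def",
)
```

Passing an existing `correlation_id` through lets related events from different services be traced back to the same originating request.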

### Dead Letter Handling

Messages whose processing fails can be routed to a dead letter queue for manual inspection.

```python
# Configuration for dead letter handling
EVENT_HANDLER_CONFIG = {
    'dead_letter_ttl': 86400000,  # 24 hours in milliseconds
    'max_delivery_attempts': 5,
    'dead_letter_exchange': 'dlx'
}
```
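
The intended effect of `max_delivery_attempts` and `dead_letter_exchange` can be sketched with an in-memory model. This is a toy illustration under assumptions, not how the broker or Nameko implements it: `process_with_dead_letter` is a hypothetical function, and a plain list stands in for the dead letter queue.

```python
EVENT_HANDLER_CONFIG = {
    'dead_letter_ttl': 86400000,  # 24 hours in milliseconds
    'max_delivery_attempts': 5,
    'dead_letter_exchange': 'dlx',
}


def process_with_dead_letter(message, handler, config, dead_letters):
    """Try the handler up to max_delivery_attempts times, then dead-letter.

    Returns the attempt number that succeeded, or None if the message
    was moved to dead_letters.
    """
    max_attempts = config['max_delivery_attempts']
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            handler(message)
            return attempt
        except Exception as exc:  # any handler failure counts as a failed delivery
            last_error = exc
    dead_letters.append({
        'exchange': config['dead_letter_exchange'],
        'message': message,
        'attempts': max_attempts,
        'error': repr(last_error),
    })
    return None


def always_fails(message):
    raise RuntimeError("downstream unavailable")


dead_letters = []
result = process_with_dead_letter(
    {'order_id': 1}, always_fails, EVENT_HANDLER_CONFIG, dead_letters
)
```

After the fifth failed attempt, `result` is `None` and `dead_letters` holds one entry recording the message, the attempt count, and the last error, which is the information an operator would need for manual inspection.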

### Performance Considerations

- **Event Ordering**: Events are not guaranteed to be processed in order unless a single consumer is used
- **Event Size**: Keep event payloads small; include only the data consumers need
- **Handler Performance**: Avoid long-running handlers, or hand the work off for asynchronous processing
- **Queue Management**: Monitor queue depths to catch growing backlogs before they cause memory issues

**Best Practices:**

```python
from nameko.events import event_handler

class OptimizedEventHandler:
    name = "optimized_service"

    @event_handler('user_service', 'user_bulk_import',
                   handler_type='service_pool')
    def process_bulk_import(self, payload):
        # Process in batches for better performance
        user_ids = payload['user_ids']
        batch_size = 100

        for i in range(0, len(user_ids), batch_size):
            batch = user_ids[i:i + batch_size]
            # _process_user_batch is an application helper, not shown
            self._process_user_batch(batch)
```