or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-interface.mddependency-injection.mdevent-system.mdhttp-interface.mdindex.mdrpc-communication.mdservice-management.mdstandalone-clients.mdtesting-framework.mdtimer-scheduling.md

event-system.mddocs/

0

# Event System

1

2

Publish-subscribe event system for asynchronous inter-service communication with reliable message delivery, flexible routing patterns, and event-driven architecture support.

3

4

## Capabilities

5

6

### Event Handler Decorator

7

8

Decorator that subscribes service methods to specific events, enabling asynchronous event-driven processing.

9

10

```python { .api }

11

def event_handler(source_service, event_type, **kwargs):

12

"""

13

Decorator to handle events from other services.

14

15

Parameters:

16

- source_service: Name of the service that publishes the event

17

- event_type: Type/name of the event to handle

18

- handler_type: Optional handler type ('broadcast' or 'service_pool')

19

- reliable_delivery: Whether to ensure reliable message delivery

20

- requeue_on_error: Whether to requeue messages on handler errors

21

22

Returns:

23

Decorated method that will be called when matching events are received

24

"""

25

```

26

27

**Usage Example:**

28

29

```python

30

from nameko.events import event_handler

31

32

class EmailService:

33

name = "email_service"

34

35

@event_handler('user_service', 'user_registered')

36

def send_welcome_email(self, payload):

37

user_email = payload['email']

38

user_name = payload['name']

39

# Send welcome email logic

40

self._send_email(user_email, 'Welcome!', f'Hello {user_name}')

41

42

@event_handler('order_service', 'order_completed',

43

handler_type='broadcast', reliable_delivery=True)

44

def send_order_confirmation(self, payload):

45

order_id = payload['order_id']

46

customer_email = payload['customer_email']

47

# Send order confirmation

48

self._send_email(customer_email, 'Order Confirmation',

49

f'Your order {order_id} is confirmed')

50

```

51

52

### Event Dispatcher

53

54

Dependency provider that enables services to publish events to the event bus for consumption by event handlers.

55

56

```python { .api }

57

class EventDispatcher:

58

"""

59

Dependency provider for dispatching events to other services.

60

"""

61

62

def __call__(self, event_type, event_data):

63

"""

64

Dispatch an event to the event bus.

65

66

Parameters:

67

- event_type: Type/name of the event

68

- event_data: Dictionary containing event payload data

69

"""

70

```

71

72

**Usage Example:**

73

74

```python

75

from nameko.rpc import rpc

76

from nameko.events import EventDispatcher

77

78

class UserService:

79

name = "user_service"

80

81

event_dispatcher = EventDispatcher()

82

83

@rpc

84

def create_user(self, user_data):

85

# Create user in database

86

user_id = self._save_user(user_data)

87

88

# Dispatch event to notify other services

89

self.event_dispatcher('user_created', {

90

'user_id': user_id,

91

'email': user_data['email'],

92

'name': user_data['name'],

93

'created_at': time.time()

94

})

95

96

return {'user_id': user_id, 'status': 'created'}

97

98

@rpc

99

def update_user_email(self, user_id, new_email):

100

# Update email in database

101

old_email = self._update_user_email(user_id, new_email)

102

103

# Dispatch event about email change

104

self.event_dispatcher('user_email_changed', {

105

'user_id': user_id,

106

'old_email': old_email,

107

'new_email': new_email,

108

'changed_at': time.time()

109

})

110

111

return {'status': 'updated'}

112

```

113

114

### Event Handler Types

115

116

Different handler types provide different delivery semantics for event processing.

117

118

**Service Pool Handler (Default):**

119

- Events are load-balanced across instances of the same service

120

- Only one instance processes each event

121

- Good for work distribution

122

123

```python

124

@event_handler('user_service', 'user_created', handler_type='service_pool')

125

def process_user_created(self, payload):

126

# Only one instance of this service will process this event

127

pass

128

```

129

130

**Broadcast Handler:**

131

- Events are delivered to all instances of the service

132

- Every instance processes the event

133

- Good for cache invalidation, logging

134

135

```python

136

@event_handler('user_service', 'user_created', handler_type='broadcast')

137

def invalidate_user_cache(self, payload):

138

# All instances will invalidate their user cache

139

user_id = payload['user_id']

140

self.cache.delete(f'user:{user_id}')

141

```

142

143

### Reliable Delivery

144

145

Event handlers can be configured for reliable delivery to ensure events are not lost.

146

147

```python { .api }

148

@event_handler('payment_service', 'payment_failed',

149

reliable_delivery=True, requeue_on_error=True)

150

def handle_payment_failure(self, payload):

151

"""

152

Parameters:

153

- reliable_delivery: Ensures message is acknowledged only after successful processing

154

- requeue_on_error: Requeues message if handler raises an exception

155

"""

156

```

157

158

### Event Filtering

159

160

Event handlers can include additional filtering criteria beyond service and event type.

161

162

```python

163

from nameko.events import event_handler, BROADCAST

164

165

class NotificationService:

166

name = "notification_service"

167

168

@event_handler('order_service', 'order_status_changed',

169

handler_type=BROADCAST)

170

def notify_status_change(self, payload):

171

# Filter based on event content

172

if payload.get('status') in ['shipped', 'delivered']:

173

customer_id = payload['customer_id']

174

self._send_notification(customer_id, payload)

175

```

176

177

### Event Message Structure

178

179

Events follow a standard message structure for consistency across services.

180

181

```python { .api }

182

# Event message structure

183

{

184

"source_service": "user_service",

185

"event_type": "user_created",

186

"data": {

187

"user_id": 123,

188

"email": "user@example.com",

189

"name": "John Doe"

190

},

191

"metadata": {

192

"timestamp": "2023-01-01T12:00:00Z",

193

"correlation_id": "abc-123-def",

194

"event_id": "event-uuid-456"

195

}

196

}

197

```

198

199

### Dead Letter Handling

200

201

Failed event processing can be configured to route messages to dead letter queues for manual inspection.

202

203

```python

204

# Configuration for dead letter handling

205

EVENT_HANDLER_CONFIG = {

206

'dead_letter_ttl': 86400000, # 24 hours in milliseconds

207

'max_delivery_attempts': 5,

208

'dead_letter_exchange': 'dlx'

209

}

210

```

211

212

### Performance Considerations

213

214

- **Event Ordering**: Events are not guaranteed to be processed in order unless using single consumer

215

- **Event Size**: Keep event payloads small; include only necessary data

216

- **Handler Performance**: Long-running handlers should be avoided or processed asynchronously

217

- **Queue Management**: Monitor queue depths to prevent memory issues

218

219

**Best Practices:**

220

221

```python

222

class OptimizedEventHandler:

223

name = "optimized_service"

224

225

@event_handler('user_service', 'user_bulk_import',

226

handler_type='service_pool')

227

def process_bulk_import(self, payload):

228

# Process in batches for better performance

229

user_ids = payload['user_ids']

230

batch_size = 100

231

232

for i in range(0, len(user_ids), batch_size):

233

batch = user_ids[i:i + batch_size]

234

self._process_user_batch(batch)

235

```