# RPC Communication

The RPC system enables synchronous and asynchronous inter-service communication with automatic serialization, load balancing, error handling, and distributed tracing support.

## Capabilities

### RPC Entrypoint Decorator

Decorator that exposes service methods as RPC endpoints, making them callable from other services or standalone clients.

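```python { .api }
def rpc(fn=None, expected_exceptions=()):
    """
    Decorator to expose a service method as an RPC endpoint.

    Parameters:
    - fn: The function to decorate (used internally)
    - expected_exceptions: Exception type, or tuple of types, that a caller can
      be expected to cause (for example, by passing bad arguments). They are
      recorded on the entrypoint so other extensions can treat them differently,
      such as logging them at a lower severity. Every exception is still
      propagated to the caller.

    Returns:
    Decorated method that can be called remotely
    """
```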
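
The `fn=None` parameter in the signature above is what lets one decorator work both bare (`@rpc`) and with arguments (`@rpc(expected_exceptions=...)`). The following is a minimal sketch of that pattern in plain Python. It is not nameko's implementation: `toy_rpc`, the `is_rpc` attribute, and the `Calculator` class are hypothetical names used only for illustration.

```python
import functools


def toy_rpc(fn=None, expected_exceptions=()):
    """Toy stand-in for an entrypoint decorator (hypothetical, not nameko's code)."""
    if fn is None:
        # Used as @toy_rpc(...): return a decorator that still needs the function.
        return functools.partial(toy_rpc, expected_exceptions=expected_exceptions)
    # Used as bare @toy_rpc: tag the function and hand it back unchanged.
    fn.is_rpc = True
    fn.expected_exceptions = expected_exceptions
    return fn


class Calculator:
    @toy_rpc
    def add(self, a, b):
        return a + b

    @toy_rpc(expected_exceptions=(ValueError,))
    def divide(self, a, b):
        if b == 0:
            raise ValueError("Division by zero")
        return a / b
```

Both forms leave the method callable as usual; the decorator only attaches metadata that a framework could later inspect.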

**Usage Example:**

```python
from nameko.rpc import rpc
from nameko.exceptions import BadRequest

class CalculatorService:
    name = "calculator"

    @rpc
    def add(self, a, b):
        return a + b

    @rpc(expected_exceptions=(ValueError, BadRequest))
    def divide(self, a, b):
        if b == 0:
            raise ValueError("Division by zero")
        return a / b
```

### RPC Proxy

Dependency provider that enables services to make RPC calls to other services with automatic service discovery and load balancing.

```python { .api }
class RpcProxy:
    """
    Dependency provider for making RPC calls to other services.

    Parameters:
    - target_service: Name of the target service to call
    - **options: Additional options for the RPC proxy
    """

    def __init__(self, target_service, **options): ...
```

**Usage Example:**

```python
from nameko.rpc import rpc, RpcProxy

class UserService:
    name = "user_service"

    # RPC proxy to calculator service
    calculator_rpc = RpcProxy('calculator')

    @rpc
    def calculate_user_score(self, user_id, base_score, multiplier):
        # Call remote service method
        # (assumes the calculator service exposes a `multiply` entrypoint)
        score = self.calculator_rpc.multiply(base_score, multiplier)
        return {'user_id': user_id, 'score': score}
```
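
The `self.calculator_rpc.multiply(...)` call style works because attribute access on the proxy yields a callable bound to that method name. The sketch below shows that dispatch pattern in plain Python, with an in-memory dictionary standing in for the message broker. `ToyServiceProxy`, `ToyMethodProxy`, and `registry` are hypothetical names, and this is not how nameko routes calls internally.

```python
class ToyMethodProxy:
    """Callable bound to one (service, method) pair."""

    def __init__(self, registry, service_name, method_name):
        self.registry = registry
        self.service_name = service_name
        self.method_name = method_name

    def __call__(self, *args, **kwargs):
        # A real proxy would serialize the call and send it over the network;
        # here the target is looked up in a dict and called directly.
        service = self.registry[self.service_name]
        return getattr(service, self.method_name)(*args, **kwargs)


class ToyServiceProxy:
    """Turns attribute access into a method proxy for the named service."""

    def __init__(self, registry, service_name):
        self._registry = registry
        self._service_name = service_name

    def __getattr__(self, method_name):
        # Only reached for attributes that are not set on the instance.
        return ToyMethodProxy(self._registry, self._service_name, method_name)


class Calculator:
    def multiply(self, a, b):
        return a * b


registry = {"calculator": Calculator()}
calculator_rpc = ToyServiceProxy(registry, "calculator")
score = calculator_rpc.multiply(10, 3)
```

Because the method name is resolved only when the call is made, the caller needs no import of the target service's code, just its name.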

### Asynchronous RPC Calls

RPC proxies support asynchronous calls that return immediately with a reply object, allowing for non-blocking parallel service calls.

```python { .api }
class MethodProxy:
    """
    Proxy for individual RPC methods with synchronous and asynchronous call support.
    """

    def __call__(self, *args, **kwargs):
        """Make synchronous RPC call"""
        ...

    def call_async(self, *args, **kwargs):
        """
        Make asynchronous RPC call.

        Returns:
        RpcReply object that can be used to retrieve the result later
        """
        ...

class RpcReply:
    """
    Represents a pending or completed asynchronous RPC call.
    """

    def result(self):
        """
        Get the result of the asynchronous RPC call.
        Blocks until the result is available.

        Returns:
        The result of the RPC call

        Raises:
        Any exception that occurred during the remote call
        """
        ...
```

**Usage Example:**

```python
from nameko.rpc import rpc, RpcProxy

class OrderService:
    name = "order_service"

    payment_rpc = RpcProxy('payment_service')
    inventory_rpc = RpcProxy('inventory_service')
    email_rpc = RpcProxy('email_service')

    # Proxies used by get_order_summary_async (service names are illustrative)
    order_db = RpcProxy('order_db')
    customer_rpc = RpcProxy('customer_service')
    shipping_rpc = RpcProxy('shipping_service')

    @rpc
    def process_order_parallel(self, order_data):
        """Process order with parallel service calls for better performance"""

        # Start multiple async calls in parallel
        payment_reply = self.payment_rpc.process_payment.call_async(
            order_data['payment_info']
        )
        inventory_reply = self.inventory_rpc.reserve_items.call_async(
            order_data['items']
        )
        email_reply = self.email_rpc.send_confirmation.call_async(
            order_data['customer_email'],
            order_data['order_id']
        )

        # Collect results (this blocks until all complete)
        try:
            payment_result = payment_reply.result()
            inventory_result = inventory_reply.result()
            email_result = email_reply.result()

            return {
                'status': 'success',
                'payment': payment_result,
                'inventory': inventory_result,
                'email_sent': email_result
            }
        except Exception as e:
            # Handle any failures in the parallel calls
            return {'status': 'failed', 'error': str(e)}

    @rpc
    def get_order_summary_async(self, order_id):
        """Fetch order data from multiple services asynchronously"""

        # Launch parallel requests
        order_reply = self.order_db.get_order.call_async(order_id)
        customer_reply = self.customer_rpc.get_customer.call_async(order_id)
        shipping_reply = self.shipping_rpc.get_tracking.call_async(order_id)

        # Wait for all results
        order = order_reply.result()
        customer = customer_reply.result()
        shipping = shipping_reply.result()

        return {
            'order': order,
            'customer': customer,
            'shipping': shipping
        }
```
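
The benefit of the `call_async()` / `result()` pattern is that the waits overlap instead of adding up. The sketch below illustrates this with the standard library's `concurrent.futures`, used here only as an analogy: `submit()` plays the role of `call_async()` and `Future.result()` the role of `RpcReply.result()`. The `slow_call` helper and its 0.2-second delay are made up for the demonstration, and the library itself may schedule calls differently (for example, with green threads rather than OS threads).

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_call(name, delay):
    """Stand-in for a remote call that takes `delay` seconds to answer."""
    time.sleep(delay)
    return f"{name} done"


executor = ThreadPoolExecutor(max_workers=3)

start = time.monotonic()
# submit() returns immediately with a Future, much like call_async().
replies = [
    executor.submit(slow_call, name, 0.2)
    for name in ("payment", "inventory", "email")
]
# result() blocks until that particular call has finished.
results = [reply.result() for reply in replies]
elapsed = time.monotonic() - start
executor.shutdown()
```

Run sequentially, the three calls would take about 0.6 seconds; launched together, the total is close to the slowest single call.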

### Error Handling

RPC calls automatically handle serialization and propagation of exceptions between services.

**Built-in Exception Types:**

```python { .api }
class RemoteError(NamekoException):
    """
    Wraps exceptions that occurred in remote service calls.

    Attributes:
    - exc_type: Original exception type name
    - exc_args: Original exception arguments
    - exc_path: Service path where exception occurred
    """

class ServiceNotFound(NamekoException):
    """Raised when the target service cannot be found"""

class MethodNotFound(NamekoException):
    """Raised when the target method is not found on the service"""
```

**Exception Handling Example:**

```python
from nameko.rpc import rpc, RpcProxy
from nameko.exceptions import RemoteError, ServiceNotFound

class OrderService:
    name = "order_service"

    payment_rpc = RpcProxy('payment_service')

    @rpc
    def process_order(self, order_data):
        try:
            # This might raise a remote exception
            payment_result = self.payment_rpc.process_payment(
                order_data['payment_info']
            )
            return {'status': 'success', 'payment': payment_result}
        except RemoteError as e:
            # Handle remote service errors
            return {'status': 'payment_failed', 'error': str(e)}
        except ServiceNotFound:
            # Handle service discovery failures
            return {'status': 'service_unavailable'}
```
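
An exception object cannot cross a process boundary as-is, so the callee has to reduce it to plain data and the caller has to rebuild something raisable from that data. The following is a rough sketch of that round trip in plain Python. `ToyRemoteError`, `serialize_exception`, `handle_request`, and `unpack_response` are hypothetical helpers, and the payload layout is an assumption rather than the library's actual wire format.

```python
class ToyRemoteError(Exception):
    """Caller-side wrapper for an exception raised by the callee."""

    def __init__(self, exc_type, value):
        self.exc_type = exc_type
        self.value = value
        super().__init__(f"{exc_type} {value}")


def serialize_exception(exc):
    # Only plain data is sent back, never the exception object itself.
    return {"exc_type": type(exc).__name__, "value": str(exc)}


def handle_request(fn, *args):
    """Callee side: run the method and package a result or an error."""
    try:
        return {"result": fn(*args), "error": None}
    except Exception as exc:
        return {"result": None, "error": serialize_exception(exc)}


def unpack_response(response):
    """Caller side: return the result, or raise ToyRemoteError."""
    error = response["error"]
    if error is not None:
        raise ToyRemoteError(error["exc_type"], error["value"])
    return response["result"]


def divide(a, b):
    if b == 0:
        raise ValueError("Division by zero")
    return a / b


try:
    unpack_response(handle_request(divide, 1, 0))
except ToyRemoteError as err:
    caught = err
```

The caller sees one wrapper type and inspects `exc_type` to find out what went wrong remotely, which is why the handler above catches the wrapper rather than `ValueError`.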

### Context Propagation

RPC calls automatically propagate context data like correlation IDs, user information, and tracing data across service boundaries.

**Context Data Access:**

```python
import time

from nameko.contextdata import ContextDataProvider
from nameko.rpc import rpc

class AuditService:
    name = "audit_service"

    # Each provider injects the value stored under one context key
    user_id = ContextDataProvider('user_id')
    correlation_id = ContextDataProvider('correlation_id')

    @rpc
    def log_action(self, action, data):
        # Access context passed from calling service
        return {
            'action': action,
            'user_id': self.user_id,
            'correlation_id': self.correlation_id,
            'timestamp': time.time()
        }
```
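
Context propagation means each outgoing call carries a copy of the caller's context, so a service several hops away can read values that intermediate services never handled explicitly. The sketch below shows the idea in plain Python. `call_service`, `audit_log`, and `place_order` are hypothetical functions, and the sketch assumes context travels as a simple dictionary.

```python
def call_service(handler, context, *args):
    """Deliver a call together with a copy of the caller's context."""
    headers = dict(context)  # copy, so the callee cannot mutate the caller's data
    return handler(headers, *args)


def audit_log(context, action):
    """Innermost service: reads values set at the entry point."""
    return {
        "action": action,
        "user_id": context.get("user_id"),
        "correlation_id": context.get("correlation_id"),
    }


def place_order(context, order_id):
    # This service never touches user_id or correlation_id itself;
    # it only forwards the context it received.
    return call_service(audit_log, context, f"order:{order_id}")


entry_context = {"user_id": 42, "correlation_id": "abc-123"}
record = call_service(place_order, entry_context, 7)
```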

### Performance Considerations

- **Connection Pooling**: RPC proxies automatically manage connection pools
- **Load Balancing**: Requests are distributed across available service instances
- **Timeouts**: Configure request timeouts to prevent hanging calls
- **Retries**: Built-in retry mechanisms for transient failures

**Configuration Example:**

```yaml
# config.yaml
AMQP_URI: 'amqp://guest:guest@localhost:5672//'
RPC_TIMEOUT: 30  # seconds
RPC_RETRY_POLICY:
  max_retries: 3
  interval_start: 0.1
  interval_step: 0.2
```
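
To get a feel for what the retry values above imply, the sketch below computes the wait before each retry. It assumes the delay starts at `interval_start` and grows by `interval_step` per attempt, which is what the field names suggest; the source does not confirm how the library interprets them. `retry_delays` is a hypothetical helper.

```python
def retry_delays(max_retries, interval_start, interval_step):
    """Delay before each retry, assuming linear growth per attempt."""
    return [
        round(interval_start + attempt * interval_step, 10)
        for attempt in range(max_retries)
    ]


delays = retry_delays(max_retries=3, interval_start=0.1, interval_step=0.2)
total_wait = round(sum(delays), 10)
```

Under that assumption, the three retries wait 0.1, 0.3, and 0.5 seconds, adding at most 0.9 seconds on top of the time spent on the calls themselves.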