0
# Distributed Objects
1
2
Perspective Broker (PB) is Twisted's system for distributed object communication and remote method calls across network boundaries. PB lets you call methods on objects located on remote machines through `callRemote()`, which returns a Deferred that fires with the result.
3
4
## Capabilities
5
6
### Perspective Broker Core
7
8
Core classes for creating remotely accessible objects and managing distributed communication.
9
10
```python { .api }
11
class pb.Referenceable:
12
"""
13
Base class for objects that can be referenced remotely.
14
15
Methods prefixed with 'remote_' are exposed for remote calls.
16
"""
17
def __init__(self):
18
"""Initialize referenceable object."""
19
20
def remote_methodName(self, *args, **kwargs):
21
"""
22
Example remote method.
23
24
Methods starting with 'remote_' are callable from remote clients.
25
Can return values directly or Deferreds for async operations.
26
"""
27
pass
28
29
def callRemote(self, method, *args, **kwargs):
30
"""
31
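Call method on remote object. Invoked on the RemoteReference that a peer holds for this object, not on the local Referenceable instance.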
32
33
Args:
34
method (str): Method name (without 'remote_' prefix)
35
*args, **kwargs: Method arguments
36
37
Returns:
38
Deferred: Method result
39
"""
40
41
class pb.Root:
42
"""
43
Root object for PB hierarchy - the initial object clients connect to.
44
45
The root object is what clients receive from PBClientFactory.getRootObject() after they connect.
46
"""
47
def __init__(self):
48
"""Initialize root object."""
49
50
def rootObject(self, broker):
51
"""
52
Get the root object for a broker connection.
53
54
Args:
55
broker: PB broker instance
56
57
Returns:
58
Referenceable: Root object for this connection
59
"""
60
return self
61
62
class pb.Avatar:
63
"""
64
Authenticated user representation in PB system.
65
66
Created by realm after successful authentication.
67
"""
68
def __init__(self):
69
"""Initialize avatar."""
70
71
def logout(self):
72
"""Called when user logs out."""
73
74
class pb.Perspective(pb.Avatar):
75
"""
76
User's view of the PB system after authentication.
77
78
Perspectives represent what an authenticated user can see and do.
79
"""
80
def __init__(self, avatarId):
81
"""
82
Args:
83
avatarId: Unique identifier for this user
84
"""
85
self.avatarId = avatarId
86
87
def perspective_methodName(self, *args, **kwargs):
88
"""
89
Perspective method callable by authenticated clients.
90
91
Methods prefixed with 'perspective_' are exposed to authenticated users.
92
"""
93
pass
94
95
class pb.Broker:
96
"""
97
Message broker managing PB protocol communication.
98
99
Handles serialization, transport, and method dispatch for PB.
100
"""
101
def __init__(self, isClient=True, security=None):
102
"""
103
Args:
104
isClient (bool): Whether this is a client broker
105
security: Security policy object
106
"""
107
108
def registerReference(self, object):
109
"""
110
Register object for remote references.
111
112
Args:
113
object: Object to register
114
115
Returns:
116
int: Reference ID
117
"""
118
119
def unregisterReference(self, refnum):
120
"""
121
Unregister object reference.
122
123
Args:
124
refnum (int): Reference ID to unregister
125
"""
126
```
127
128
**Basic PB Usage Example**:
129
130
```python
131
from twisted.spread import pb
132
from twisted.internet import reactor, endpoints, defer
133
134
# Server-side objects
135
class MathService(pb.Referenceable):
136
def remote_add(self, a, b):
137
return a + b
138
139
def remote_multiply(self, a, b):
140
return a * b
141
142
def remote_divide(self, a, b):
143
if b == 0:
144
raise ValueError("Division by zero")
145
return a / b
146
147
class Calculator(pb.Root):
148
def rootObject(self, broker):
149
return MathService()
150
151
# Start PB server
152
factory = pb.PBServerFactory(Calculator())
153
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
154
endpoint.listen(factory)
155
156
# Client code
157
@defer.inlineCallbacks
158
def client_example():
159
factory = pb.PBClientFactory()
160
endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
161
162
yield endpoint.connect(factory)
163
root = yield factory.getRootObject()
164
165
# Call remote methods
166
result1 = yield root.callRemote("add", 5, 3)
167
print(f"5 + 3 = {result1}")
168
169
result2 = yield root.callRemote("multiply", 4, 7)
170
print(f"4 * 7 = {result2}")
171
172
try:
173
result3 = yield root.callRemote("divide", 10, 0)
174
except Exception as e:
175
print(f"Division error: {e}")
176
177
client_example()
print("PB server running on port 8800")
178
reactor.run()
179
```
180
181
### PB Factories
182
183
Factory classes for creating PB client and server connections.
184
185
```python { .api }
186
class pb.PBClientFactory:
187
"""
188
Factory for PB client connections.
189
"""
190
def __init__(self):
191
"""Initialize client factory."""
192
self.deferred = defer.Deferred()
193
194
def getRootObject(self):
195
"""
196
Get root object from server.
197
198
Returns:
199
Deferred[RemoteReference]: Server's root object
200
"""
201
202
def login(self, credentials, client=None):
203
"""
204
Login with credentials.
205
206
Args:
207
credentials: User credentials
208
client: Client object for authenticated session
209
210
Returns:
211
Deferred[RemoteReference]: Reference to the user's avatar after authentication
212
"""
213
214
def buildProtocol(self, addr):
215
"""
216
Build client protocol.
217
218
Args:
219
addr: Connection address
220
221
Returns:
222
Broker: Client broker
223
"""
224
225
class pb.PBServerFactory:
226
"""
227
Factory for PB server connections.
228
"""
229
def __init__(self, root, security=None, portal=None):
230
"""
231
Args:
232
root: Root object (pb.Root), or a twisted.cred Portal for authenticated servers
233
security: Security policy
234
portal: Authentication portal
235
"""
236
self.root = root
237
self.security = security
238
self.portal = portal
239
240
def buildProtocol(self, addr):
241
"""
242
Build server protocol.
243
244
Args:
245
addr: Connection address
246
247
Returns:
248
Broker: Server broker
249
"""
250
251
def pb.connect(host, port, factory, contextFactory=None, bindAddress=None):
252
"""
253
Connect to PB server.
254
255
Args:
256
host (str): Server hostname
257
port (int): Server port
258
factory: Client factory
259
contextFactory: SSL context factory
260
bindAddress: Local bind address
261
262
Returns:
263
Deferred: Connection result
264
"""
265
266
def pb.getObjectAt(host, port, timeout=30):
267
"""
268
Get root object from PB server.
269
270
Args:
271
host (str): Server hostname
272
port (int): Server port
273
timeout (int): Connection timeout
274
275
Returns:
276
Deferred[RemoteReference]: Root object
277
"""
278
```
279
280
### Remote References
281
282
Proxy objects representing remote objects.
283
284
```python { .api }
285
class pb.RemoteReference:
286
"""
287
Proxy for remote object.
288
289
Methods are invoked on the remote object through callRemote(); results are delivered asynchronously as Deferreds.
290
"""
291
def callRemote(self, method, *args, **kwargs):
292
"""
293
Call method on remote object.
294
295
Args:
296
method (str): Method name
297
*args, **kwargs: Method arguments
298
299
Returns:
300
Deferred: Method result
301
"""
302
303
def remoteMethod(self, method):
304
"""
305
Get callable for remote method.
306
307
Args:
308
method (str): Method name
309
310
Returns:
311
callable: Function that calls remote method
312
"""
313
314
def notifyOnDisconnect(self, callback):
315
"""
316
Register callback for disconnection.
317
318
Args:
319
callback: Function to call on disconnect
320
"""
321
322
class pb.ViewPoint:
323
"""
324
Client's view of a remote perspective.
325
"""
326
def __init__(self, perspective, broker):
327
"""
328
Args:
329
perspective: Remote perspective object
330
broker: PB broker
331
"""
332
```
333
334
### Authentication Integration
335
336
Integration with Twisted's authentication system (`twisted.cred`).
337
338
```python { .api }
339
class pb.IPerspective:
340
"""
341
Interface for PB perspectives.
342
"""
343
def perspectiveMessageReceived(broker, message, args, kw):
344
"""
345
Handle message from client.
346
347
Args:
348
broker: PB broker
349
message (str): Message name
350
args (tuple): Message arguments
351
kw (dict): Message keyword arguments
352
353
Returns:
354
Result of message handling
355
"""
356
357
class pb.Avatar(pb.IPerspective):
358
"""
359
Authenticated user avatar for PB.
360
"""
361
def __init__(self, avatarId):
362
self.avatarId = avatarId
363
364
def logout(self):
365
"""Called when user logs out."""
366
367
def perspectiveMessageReceived(self, broker, message, args, kw):
368
"""Handle perspective method calls."""
369
method = getattr(self, f'perspective_{message}', None)
370
if method:
371
return method(*args, **kw)
372
raise AttributeError(f"No such method: perspective_{message}")
373
374
class pb.Realm:
375
"""
376
PB realm for creating avatars.
377
"""
378
def requestAvatar(self, avatarId, mind, *interfaces):
379
"""
380
Create avatar for authenticated user.
381
382
Args:
383
avatarId: User identifier
384
mind: Client-side object supplied at login (a reference to the `client` passed to login(), or None)
385
*interfaces: Requested interfaces
386
387
Returns:
388
tuple: (interface, avatar, logout_callable)
389
"""
390
if pb.IPerspective in interfaces:
391
avatar = pb.Avatar(avatarId)
392
return (pb.IPerspective, avatar, avatar.logout)
393
raise NotImplementedError()
394
```
395
396
**Authenticated PB Example**:
397
398
```python
399
from twisted.spread import pb
400
from twisted.cred import portal, checkers, credentials
401
from twisted.internet import reactor, endpoints, defer
402
403
# Server with authentication
404
class UserAvatar(pb.Avatar):
405
def __init__(self, username):
406
self.username = username
407
408
def perspective_getMessage(self):
409
return f"Hello, {self.username}!"
410
411
def perspective_getTime(self):
412
import time
413
return time.ctime()
414
415
class SimpleRealm:
416
def requestAvatar(self, avatarId, mind, *interfaces):
417
if pb.IPerspective in interfaces:
418
avatar = UserAvatar(avatarId.decode())
419
return (pb.IPerspective, avatar, lambda: None)
420
raise NotImplementedError()
421
422
# Set up authentication
423
realm = SimpleRealm()
424
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
425
checker.addUser(b"alice", b"password")
426
checker.addUser(b"bob", b"secret")
427
428
portal_obj = portal.Portal(realm, [checker])
429
430
# Start authenticated server
431
factory = pb.PBServerFactory(portal_obj)
432
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
433
endpoint.listen(factory)
434
435
# Authenticated client
436
@defer.inlineCallbacks
437
def authenticated_client():
438
factory = pb.PBClientFactory()
439
endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
440
441
yield endpoint.connect(factory)
442
443
# Login with credentials
444
creds = credentials.UsernamePassword(b"alice", b"password")
445
avatar = yield factory.login(creds)
446
447
# Call perspective methods
448
message = yield avatar.callRemote("getMessage")
449
print(f"Message: {message}")
450
451
time_str = yield avatar.callRemote("getTime")
452
print(f"Server time: {time_str}")
453
454
authenticated_client()
455
reactor.run()
456
```
457
458
### Copyable Objects
459
460
Objects that can be copied across the network.
461
462
```python { .api }
463
class pb.Copyable:
464
"""
465
Base class for objects that can be copied to remote side.
466
467
The object is serialized and recreated on the remote side.
468
"""
469
def __init__(self):
470
"""Initialize copyable object."""
471
472
def getStateToCopy(self):
473
"""
474
Get state to copy to remote side.
475
476
Returns:
477
dict: Object state to copy
478
"""
479
return self.__dict__.copy()
480
481
class pb.RemoteCopy:
482
"""
483
Base class for receiving copied objects.
484
485
Paired with Copyable objects on the sending side.
486
"""
487
def setCopyableState(self, state):
488
"""
489
Restore object state from copy.
490
491
Args:
492
state (dict): Object state from remote side
493
"""
494
self.__dict__.update(state)
495
496
def pb.setUnjellyableForClass(classname, unjellyable):
497
"""
498
Register class for unjellying (deserializing).
499
500
Args:
501
classname (str or class): Fully qualified class name, or the class itself
502
unjellyable: Class to create when unjellying
503
"""
504
505
class pb.Cacheable:
506
"""
507
Base class for objects that are cached on remote side.
508
509
Only one copy exists on remote side, updated when changed.
510
"""
511
def getStateToCacheAndObserveFor(self, perspective, observer):
512
"""
513
Get state to cache and set up observation.
514
515
Args:
516
perspective: Remote perspective
517
observer: Observer for changes
518
519
Returns:
520
dict: State to cache
521
"""
522
523
class pb.RemoteCache:
524
"""
525
Base class for cached remote objects.
526
"""
527
def setCacheableState(self, state):
528
"""
529
Set cached object state.
530
531
Args:
532
state (dict): Object state
533
"""
534
535
def observe_update(self, newState):
536
"""
537
Handle state update from remote side.
538
539
Args:
540
newState (dict): Updated state
541
"""
542
```
543
544
### Error Handling
545
546
PB-specific exceptions and error handling.
547
548
```python { .api }
549
class pb.Error(Exception):
550
"""Base class for PB errors."""
551
552
class pb.DeadReferenceError(pb.Error):
553
"""Reference to dead remote object."""
554
555
class pb.PBConnectionLost(pb.Error):
556
"""PB connection was lost."""
557
558
class pb.RemoteError(pb.Error):
559
"""Error from remote side."""
560
561
class pb.CopyableFailure:
562
"""
563
Serializable failure that can cross network boundaries.
564
"""
565
def __init__(self, failure):
566
"""
567
Args:
568
failure: Failure object to wrap
569
"""
570
self.type = qual(failure.type)
571
self.value = failure.getErrorMessage()
572
self.traceback = failure.getTraceback()
573
```
574
575
**Error Handling Example**:
576
577
```python
578
from twisted.spread import pb
579
from twisted.internet import defer
580
581
class ErrorService(pb.Referenceable):
582
def remote_causeError(self):
583
raise ValueError("Something went wrong!")
584
585
def remote_safeDivide(self, a, b):
586
try:
587
return a / b
588
except ZeroDivisionError:
589
# PB serializes this exception; the remote caller receives it as pb.RemoteError
590
raise ValueError("Cannot divide by zero")
591
592
# Client error handling
593
@defer.inlineCallbacks
594
def handle_errors():
595
root = yield factory.getRootObject()
596
597
try:
598
yield root.callRemote("causeError")
599
except Exception as e:
600
print(f"Remote error: {e}")
601
602
try:
603
result = yield root.callRemote("safeDivide", 10, 0)
604
except pb.RemoteError as e:
605
print(f"Division error: {e}")
606
```
607
608
### Jelly Serialization
609
610
Low-level serialization system used by PB.
611
612
```python { .api }
613
def jelly.jelly(object):
614
"""
615
Serialize object to jelly format.
616
617
Args:
618
object: Python object to serialize
619
620
Returns:
621
Serialized representation
622
"""
623
624
def jelly.unjelly(jellyData):
625
"""
626
Deserialize object from jelly format.
627
628
Args:
629
jellyData: Serialized data
630
631
Returns:
632
Python object
633
"""
634
635
class jelly.Jellyable:
636
"""
637
Base class for objects that can control their serialization.
638
"""
639
def jellyFor(self, jellier):
640
"""
641
Custom serialization method.
642
643
Args:
644
jellier: Jellier instance
645
646
Returns:
647
Serialized representation
648
"""
649
650
class jelly.Unjellyable:
651
"""
652
Base class for objects that can control their deserialization.
653
"""
654
def unjellyFor(self, unjellier, jellyList):
655
"""
656
Custom deserialization method.
657
658
Args:
659
unjellier: Unjellier instance
660
jellyList: Serialized data
661
662
Returns:
663
Deserialized object
664
"""
665
```
666
667
**Complete PB Application Example**:
668
669
```python
670
from twisted.spread import pb
671
from twisted.internet import reactor, endpoints, defer
672
from twisted.cred import portal, checkers, credentials
673
from twisted.application import service
674
675
# Shared data objects
676
class SharedData(pb.Copyable, pb.RemoteCopy):
677
"""Data that can be copied across network."""
678
def __init__(self, name, value):
679
self.name = name
680
self.value = value
681
682
# Register for deserialization
683
pb.setUnjellyableForClass(SharedData, SharedData)
684
685
# Service implementation
686
class DataService(pb.Avatar):
687
def __init__(self, username):
688
self.username = username
689
self.data = {}
690
691
def perspective_store(self, key, data):
692
"""Store data object."""
693
self.data[key] = data
694
return f"Stored {key} for {self.username}"
695
696
def perspective_retrieve(self, key):
697
"""Retrieve data object."""
698
if key in self.data:
699
return self.data[key]
700
raise KeyError(f"No data for key: {key}")
701
702
def perspective_list_keys(self):
703
"""List all stored keys."""
704
return list(self.data.keys())
705
706
# Realm
707
class DataRealm:
708
def requestAvatar(self, avatarId, mind, *interfaces):
709
if pb.IPerspective in interfaces:
710
avatar = DataService(avatarId.decode())
711
return (pb.IPerspective, avatar, lambda: None)
712
raise NotImplementedError()
713
714
# Application setup
715
def setup_server():
716
realm = DataRealm()
717
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
718
checker.addUser(b"user1", b"pass1")
719
checker.addUser(b"user2", b"pass2")
720
721
portal_obj = portal.Portal(realm, [checker])
722
factory = pb.PBServerFactory(portal_obj)
723
724
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8800)
725
endpoint.listen(factory)
726
print("PB data service running on port 8800")
727
728
# Client example
729
@defer.inlineCallbacks
730
def client_session():
731
factory = pb.PBClientFactory()
732
endpoint = endpoints.TCP4ClientEndpoint(reactor, "localhost", 8800)
733
734
yield endpoint.connect(factory)
735
736
# Login
737
creds = credentials.UsernamePassword(b"user1", b"pass1")
738
avatar = yield factory.login(creds)
739
740
# Store data
741
data = SharedData("test_data", {"count": 42, "active": True})
742
result = yield avatar.callRemote("store", "item1", data)
743
print(result)
744
745
# Retrieve data
746
retrieved = yield avatar.callRemote("retrieve", "item1")
747
print(f"Retrieved: {retrieved.name} = {retrieved.value}")
748
749
# List keys
750
keys = yield avatar.callRemote("list_keys")
751
print(f"Keys: {keys}")
752
753
setup_server()
754
client_session()
755
reactor.run()
756
```