0
# Awareness Protocol
1
2
## Overview
3
4
The Awareness protocol in pycrdt enables client presence management and metadata sharing in collaborative sessions. It allows clients to share information about user cursors, selections, online status, and other ephemeral data that doesn't belong in the persistent document but is essential for a good collaborative user experience.
5
6
## Core Types
7
8
### Awareness
9
10
Client awareness management for sharing presence and metadata.
11
12
```python { .api }
13
class Awareness:
    """Client awareness management for sharing presence and metadata."""

    def __init__(self, ydoc: Doc, *, outdated_timeout: int = 30000) -> None:
        """
        Create an awareness instance for a document.

        Args:
            ydoc (Doc): Document to attach awareness to
            outdated_timeout (int): Timeout in milliseconds for considering clients outdated
        """

    @property
    def client_id(self) -> int:
        """Get the local client identifier."""

    @property
    def meta(self) -> dict[int, dict[str, Any]]:
        """Get metadata for all clients (read-only)."""

    @property
    def states(self) -> dict[int, dict[str, Any]]:
        """Get states for all clients (read-only)."""

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
        """
        Start the awareness system.

        The coroutine keeps running until stop() is called, so run it in a
        task group (``tg.start_soon(awareness.start)`` or
        ``await tg.start(awareness.start)``) rather than awaiting it inline.

        Args:
            task_status: Optional task status for structured concurrency
        """

    async def stop(self) -> None:
        """Stop the awareness system and clean up resources."""

    def get_local_state(self) -> dict[str, Any] | None:
        """
        Get the local client's state.

        Returns:
            dict | None: Local state dictionary or None if not set
        """

    def set_local_state(self, state: dict[str, Any] | None) -> None:
        """
        Set the local client's state.

        Args:
            state (dict | None): State dictionary to set, or None to clear
        """

    def set_local_state_field(self, field: str, value: Any) -> None:
        """
        Set a specific field in the local client's state.

        Args:
            field (str): Top-level field name to set. The name is used as a
                plain dictionary key; dotted names such as "user.cursor" are
                not interpreted as nested paths.
            value: Value to set for the field
        """

    def remove_awareness_states(self, client_ids: list[int], origin: Any) -> None:
        """
        Remove awareness states for specified clients.

        Args:
            client_ids (list[int]): List of client IDs to remove
            origin: Origin identifier for the removal operation
        """

    def encode_awareness_update(self, client_ids: list[int]) -> bytes:
        """
        Encode awareness update for specified clients.

        Args:
            client_ids (list[int]): List of client IDs to include in update

        Returns:
            bytes: Encoded awareness update message
        """

    def apply_awareness_update(self, update: bytes, origin: Any) -> None:
        """
        Apply an awareness update from another client.

        Args:
            update (bytes): Encoded awareness update to apply
            origin: Origin identifier for the update
        """

    def observe(self, callback: Callable[[str, tuple[dict[str, Any], Any]], None]) -> str:
        """
        Observe awareness changes.

        Args:
            callback: Function called when awareness changes occur.
                Receives (event_type, (change_info, origin)) as arguments.
                Event types: "change" (a state was added, modified or
                removed) and "update" (any update, including refreshes that
                leave the state unchanged).
                change_info is a dict with "added", "updated" and "removed"
                lists of client IDs.

        Returns:
            str: Observer ID for unsubscribing
        """

    def unobserve(self, id: str) -> None:
        """
        Remove an awareness observer.

        Args:
            id (str): Observer ID returned from observe()
        """
120
```
121
122
### Utility Functions
123
124
```python { .api }
125
def is_awareness_disconnect_message(message: bytes) -> bool:
    """
    Check if a message is an awareness disconnect message.

    Args:
        message (bytes): Encoded awareness message to check

    Returns:
        bool: True if the message indicates a client disconnect,
            False otherwise
    """
135
```
136
137
## Usage Examples
138
139
### Basic Awareness Setup
140
141
```python
142
import asyncio
143
import anyio
144
from pycrdt import Doc, Awareness
145
146
async def basic_awareness_example():
    """Basic awareness setup and usage."""

    doc = Doc(client_id=1)
    awareness = Awareness(doc)

    async with anyio.create_task_group() as tg:
        # Start awareness system
        tg.start_soon(awareness.start)

        # Set local user information
        awareness.set_local_state({
            "user": {
                "name": "Alice",
                "color": "#ff0000",
                "cursor": {"line": 0, "column": 0},
            }
        })

        # Update specific fields. set_local_state_field() only addresses
        # top-level keys (a name like "user.cursor" would become a literal
        # key), so nested values are changed by replacing the whole "user" entry.
        user = dict(awareness.get_local_state()["user"])
        user["cursor"] = {"line": 5, "column": 10}
        user["selection"] = {"start": 0, "end": 5}
        awareness.set_local_state_field("user", user)

        # Get current state
        local_state = awareness.get_local_state()
        print(f"Local state: {local_state}")

        # Check all client states
        all_states = awareness.states
        print(f"All client states: {all_states}")

        await anyio.sleep(0.1)  # Allow processing

        # Stop awareness
        await awareness.stop()

asyncio.run(basic_awareness_example())
183
```
184
185
### Multi-Client Awareness
186
187
```python
188
import asyncio
189
import anyio
190
from pycrdt import Doc, Awareness
191
192
async def multi_client_awareness():
    """Example with multiple clients sharing awareness."""

    # Create multiple clients
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)
    doc3 = Doc(client_id=3)

    awareness1 = Awareness(doc1)
    awareness2 = Awareness(doc2)
    awareness3 = Awareness(doc3)
    everyone = [awareness1, awareness2, awareness3]

    async with anyio.create_task_group() as tg:
        # Start all awareness systems
        for awareness in everyone:
            tg.start_soon(awareness.start)

        # Set up different users
        awareness1.set_local_state({
            "user": {"name": "Alice", "color": "#ff0000", "role": "editor"},
            "cursor": {"line": 0, "column": 0},
            "online": True,
        })

        awareness2.set_local_state({
            "user": {"name": "Bob", "color": "#00ff00", "role": "reviewer"},
            "cursor": {"line": 10, "column": 5},
            "online": True,
        })

        awareness3.set_local_state({
            "user": {"name": "Charlie", "color": "#0000ff", "role": "viewer"},
            "cursor": {"line": 20, "column": 0},
            "online": True,
        })

        await anyio.sleep(0.1)

        # Simulate awareness synchronization
        # In real application, this would happen through network:
        # each client sends its own awareness to every other client
        for sender in everyone:
            update = sender.encode_awareness_update([sender.client_id])
            for receiver in everyone:
                if receiver is not sender:
                    receiver.apply_awareness_update(update, "network")

        # Now all clients know about each other
        for label, awareness in (("Client 1", awareness1), ("Client 2", awareness2)):
            print(f"{label} sees:")
            for client_id, state in awareness.states.items():
                user = state.get("user", {})
                cursor = state.get("cursor", {})
                print(f" Client {client_id}: {user.get('name')} at {cursor}")

        # Simulate cursor movement
        awareness1.set_local_state_field("cursor", {"line": 5, "column": 8})
        awareness2.set_local_state_field("cursor", {"line": 12, "column": 3})

        # Stop all awareness systems
        for awareness in everyone:
            await awareness.stop()

asyncio.run(multi_client_awareness())
272
```
273
274
### Awareness Event Observation
275
276
```python
277
import asyncio
278
import anyio
279
from pycrdt import Doc, Awareness
280
281
async def awareness_events_example():
    """Example of observing awareness events."""

    doc = Doc(client_id=1)
    awareness = Awareness(doc)

    def on_awareness_change(event_type: str, change_data):
        """Handle awareness changes."""
        change_info, origin = change_data

        # The event type is "change" or "update"; what happened to which
        # client is described by the payload lists, not by the event name.
        for key in ("added", "updated", "removed"):
            client_ids = change_info.get(key, [])
            if client_ids:
                print(f"Clients {key}: {client_ids} (origin: {origin})")

        # Print current states
        for client_id, state in awareness.states.items():
            user = state.get("user", {})
            print(f" Client {client_id}: {user}")

    # Subscribe to awareness changes
    observer_id = awareness.observe(on_awareness_change)

    async with anyio.create_task_group() as tg:
        tg.start_soon(awareness.start)

        # Make changes to trigger events
        awareness.set_local_state({
            "user": {"name": "Alice", "status": "typing"},
            "timestamp": "2024-01-01T10:00:00Z",
        })

        await anyio.sleep(0.1)

        # Update state. Field names are top-level keys, so the nested status
        # is changed by replacing the "user" entry with an updated copy.
        state = awareness.get_local_state()
        awareness.set_local_state_field("user", {**state["user"], "status": "idle"})

        await anyio.sleep(0.1)

        # Simulate another client joining. In a real app this update would
        # arrive over the network from that client's own Awareness instance.
        other_awareness = Awareness(Doc(client_id=2))
        other_awareness.set_local_state({"user": {"name": "Bob", "status": "online"}})
        other_client_update = other_awareness.encode_awareness_update(
            [other_awareness.client_id]
        )
        awareness.apply_awareness_update(other_client_update, "network")

        await anyio.sleep(0.1)

        # Simulate client leaving
        awareness.remove_awareness_states([other_awareness.client_id], "disconnect")

        await anyio.sleep(0.1)

        # Clean up
        awareness.unobserve(observer_id)
        await awareness.stop()

asyncio.run(awareness_events_example())
341
```
342
343
### Real-Time Cursor Tracking
344
345
```python
346
import asyncio
347
import anyio
348
from pycrdt import Doc, Awareness, Text
349
350
class CursorTracker:
    """Track and display user cursors in real-time."""

    def __init__(self, doc: Doc, user_name: str, user_color: str):
        self.doc = doc
        self.awareness = Awareness(doc)
        self.user_name = user_name
        self.user_color = user_color
        self.cursors = {}  # client_id -> cursor_info
        self.observer_id = None  # assigned by start()

    async def start(self):
        """Start cursor tracking.

        Registers the observer, publishes the initial local state and then
        runs the awareness system. The awareness coroutine keeps running
        until stop() is called, so schedule this method in a task group
        (e.g. ``tg.start_soon(tracker.start)``) instead of awaiting it inline.
        """
        # Set up awareness observer
        self.observer_id = self.awareness.observe(self._on_awareness_change)

        # Set initial user state *before* awaiting the awareness loop:
        # anything placed after that await would only run once tracking stops.
        self.awareness.set_local_state({
            "user": {
                "name": self.user_name,
                "color": self.user_color,
            },
            "cursor": {"position": 0, "anchor": 0},
        })

        # Start awareness system (returns when stop() is called)
        await self.awareness.start()

    async def stop(self):
        """Stop cursor tracking."""
        if self.observer_id is not None:
            self.awareness.unobserve(self.observer_id)
            self.observer_id = None
        await self.awareness.stop()

    def _on_awareness_change(self, event_type: str, change_data):
        """Handle awareness changes to update cursor display."""
        own_id = self.awareness.client_id

        # Rebuild the mapping from the current states so cursors of clients
        # that have left do not linger.
        cursors = {}
        for client_id, state in self.awareness.states.items():
            if client_id == own_id:  # Skip own cursor
                continue

            user = state.get("user", {})
            cursor = state.get("cursor", {})
            if user and cursor:
                cursors[client_id] = {
                    "name": user.get("name", f"User {client_id}"),
                    "color": user.get("color", "#000000"),
                    "position": cursor.get("position", 0),
                    "anchor": cursor.get("anchor", 0),
                }

        self.cursors = cursors
        self._display_cursors()

    def _display_cursors(self):
        """Display current cursor positions."""
        if not self.cursors:
            return

        print(f"\n--- Cursors for {self.user_name} ---")
        for cursor_info in self.cursors.values():
            name = cursor_info["name"]
            pos = cursor_info["position"]
            anchor = cursor_info["anchor"]

            if pos == anchor:
                print(f"{name}: cursor at {pos}")
            else:
                print(f"{name}: selection {min(pos, anchor)}-{max(pos, anchor)}")

    def move_cursor(self, position: int, anchor: int | None = None):
        """Move the local cursor.

        Args:
            position: New cursor position.
            anchor: Selection anchor; defaults to ``position`` (no selection).
        """
        if anchor is None:
            anchor = position

        self.awareness.set_local_state_field("cursor", {
            "position": position,
            "anchor": anchor,
        })
425
426
async def cursor_tracking_example():
    """Example of real-time cursor tracking."""

    # Create shared document
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)

    # Add shared text
    text1 = doc1.get("content", type=Text)
    text2 = doc2.get("content", type=Text)

    with doc1.transaction():
        text1.insert(0, "This is a shared document for cursor tracking example.")

    # Sync documents
    update = doc1.get_update()
    doc2.apply_update(update)

    # Create cursor trackers
    tracker1 = CursorTracker(doc1, "Alice", "#ff0000")
    tracker2 = CursorTracker(doc2, "Bob", "#00ff00")

    def send_awareness(sender, receiver):
        """Exchange awareness updates (normally done via network)."""
        own_update = sender.awareness.encode_awareness_update(
            [sender.awareness.client_id]
        )
        receiver.awareness.apply_awareness_update(own_update, "network")

    async with anyio.create_task_group() as tg:
        # Start trackers
        tg.start_soon(tracker1.start)
        tg.start_soon(tracker2.start)

        await anyio.sleep(0.1)

        # Simulate cursor movements
        print("Alice moves cursor to position 10")
        tracker1.move_cursor(10)

        await anyio.sleep(0.1)

        send_awareness(tracker1, tracker2)

        await anyio.sleep(0.1)

        print("Bob makes a selection from 20 to 30")
        tracker2.move_cursor(30, 20)

        send_awareness(tracker2, tracker1)

        await anyio.sleep(0.1)

        print("Alice makes a selection from 5 to 15")
        tracker1.move_cursor(15, 5)

        send_awareness(tracker1, tracker2)

        await anyio.sleep(0.1)

        # Stop trackers
        await tracker1.stop()
        await tracker2.stop()

asyncio.run(cursor_tracking_example())
488
```
489
490
### Online Status Management
491
492
```python
493
import asyncio
494
import anyio
495
from pycrdt import Doc, Awareness
496
497
class OnlineStatusManager:
    """Manage online status for collaborative sessions."""

    def __init__(self, doc: Doc, user_info: dict):
        self.doc = doc
        self.awareness = Awareness(doc)
        self.user_info = user_info
        self.online_users = {}  # client_id -> {"name", "last_seen"}

    async def start(self):
        """Start online status management.

        Registers the observer, publishes the initial online state and then
        runs the awareness system. The awareness coroutine keeps running
        until stop() is called, so schedule this method in a task group
        (e.g. ``tg.start_soon(manager.start)``) instead of awaiting it inline.
        """
        self.observer_id = self.awareness.observe(self._on_status_change)

        # Set initial online status *before* awaiting the awareness loop:
        # anything placed after that await would only run once the manager
        # has been stopped.
        self.awareness.set_local_state({
            "user": self.user_info,
            "online": True,
            "last_seen": self._current_timestamp(),
        })

        await self.awareness.start()

    async def stop(self):
        """Stop and cleanup.

        Returns:
            bytes: Encoded awareness update carrying the offline status,
                for broadcasting to other clients.
        """
        # Set offline status before stopping
        self.awareness.set_local_state_field("online", False)

        # Send final awareness update
        offline_update = self.awareness.encode_awareness_update([self.awareness.client_id])

        self.awareness.unobserve(self.observer_id)
        await self.awareness.stop()

        return offline_update  # Return for broadcasting to other clients

    def _on_status_change(self, event_type: str, change_data):
        """Handle online status changes."""
        # Rebuild the online users list from the current states
        self.online_users.clear()

        for client_id, state in self.awareness.states.items():
            user = state.get("user", {})
            online = state.get("online", False)
            last_seen = state.get("last_seen", "unknown")

            if online and user:
                self.online_users[client_id] = {
                    "name": user.get("name", f"User {client_id}"),
                    "last_seen": last_seen,
                }

        self._display_online_status()

    def _display_online_status(self):
        """Display current online users."""
        print(f"\n--- Online Users ({len(self.online_users)}) ---")
        for client_id, info in self.online_users.items():
            name = info["name"]
            last_seen = info["last_seen"]
            status = "🟢" if client_id in self.awareness.states else "🔴"
            print(f"{status} {name} (last seen: {last_seen})")

    def update_activity(self):
        """Update last activity timestamp."""
        self.awareness.set_local_state_field("last_seen", self._current_timestamp())

    def _current_timestamp(self) -> str:
        """Get current timestamp as a timezone-aware ISO 8601 string (UTC)."""
        import datetime

        # The value is shared with clients that may be in other time zones,
        # so a naive local time would be ambiguous.
        return datetime.datetime.now(datetime.timezone.utc).isoformat()
568
569
async def online_status_example():
    """Example of online status management."""

    # Create clients
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)
    doc3 = Doc(client_id=3)

    # Create status managers
    manager1 = OnlineStatusManager(doc1, {"name": "Alice", "role": "editor"})
    manager2 = OnlineStatusManager(doc2, {"name": "Bob", "role": "reviewer"})
    manager3 = OnlineStatusManager(doc3, {"name": "Charlie", "role": "viewer"})

    # Simulate awareness synchronization
    def sync_awareness(managers):
        """Sync awareness between all managers."""
        # Encode every manager's own state first, then distribute, so each
        # receiver gets the state as it was when the sync round started.
        updates = [
            (
                manager.awareness.encode_awareness_update([manager.awareness.client_id]),
                manager.awareness.client_id,
            )
            for manager in managers
        ]

        # Apply each update to other managers
        for update, sender_id in updates:
            for manager in managers:
                if manager.awareness.client_id != sender_id:
                    manager.awareness.apply_awareness_update(update, "network")

    async with anyio.create_task_group() as tg:
        # Start managers
        tg.start_soon(manager1.start)
        tg.start_soon(manager2.start)
        tg.start_soon(manager3.start)

        await anyio.sleep(0.1)

        # Initial sync
        sync_awareness([manager1, manager2, manager3])
        await anyio.sleep(0.1)

        # Simulate activity
        print("Alice updates activity")
        manager1.update_activity()
        sync_awareness([manager1, manager2, manager3])
        await anyio.sleep(0.1)

        print("Charlie goes offline")
        offline_update = await manager3.stop()

        # Broadcast Charlie's offline status
        manager1.awareness.apply_awareness_update(offline_update, "network")
        manager2.awareness.apply_awareness_update(offline_update, "network")
        await anyio.sleep(0.1)

        print("Bob updates activity")
        manager2.update_activity()
        sync_awareness([manager1, manager2])
        await anyio.sleep(0.1)

        # Stop remaining managers
        await manager1.stop()
        await manager2.stop()

asyncio.run(online_status_example())
633
```
634
635
### Disconnect Detection
636
637
```python
638
from pycrdt import Awareness, Doc, is_awareness_disconnect_message

def handle_awareness_message(message: bytes, awareness: Awareness):
    """Handle incoming awareness message with disconnect detection."""

    if is_awareness_disconnect_message(message):
        print("Received disconnect message")
        # Handle client disconnect
        # The message itself contains information about which client disconnected
        return

    # Apply normal awareness update
    awareness.apply_awareness_update(message, "network")

# Example usage
doc = Doc()
awareness = Awareness(doc)

# Create a disconnect message (normally received from network):
# publish a state first so that there is something to clear
awareness.set_local_state({"user": {"name": "Alice"}})

# Simulate disconnect by clearing state
awareness.set_local_state(None)
disconnect_message = awareness.encode_awareness_update([awareness.client_id])

# Check if it's a disconnect message
if is_awareness_disconnect_message(disconnect_message):
    print("This is a disconnect message")
else:
    print("This is a regular awareness update")
669
```
670
671
## Error Handling
672
673
```python
674
import asyncio
import anyio
from pycrdt import Doc, Awareness

async def awareness_error_handling():
    """Example of error handling in awareness operations."""

    doc = Doc()
    awareness = Awareness(doc)

    # Awareness.start() runs until stop() is called, so it is launched in a
    # task group; awaiting it inline would never reach the code below.
    async with anyio.create_task_group() as tg:
        try:
            # Start awareness (returns once the awareness task is running)
            await tg.start(awareness.start)

            # Invalid state operations
            awareness.set_local_state({"invalid": object()})  # May cause issues

            # Invalid update data
            try:
                invalid_update = b"invalid_awareness_data"
                awareness.apply_awareness_update(invalid_update, "test")
            except Exception as e:
                print(f"Update error: {e}")

            # Observer errors
            def failing_observer(event_type, change_data):
                raise ValueError("Observer failed")

            observer_id = awareness.observe(failing_observer)

            # This might trigger the failing observer
            try:
                awareness.set_local_state({"test": "value"})
            except Exception as e:
                print(f"Observer error: {e}")

            awareness.unobserve(observer_id)

        except Exception as e:
            print(f"Awareness error: {e}")

        finally:
            # Always clean up; stopping also lets the task group exit
            try:
                await awareness.stop()
            except Exception as e:
                print(f"Stop error: {e}")

asyncio.run(awareness_error_handling())
722
```