Python bindings for the Yrs CRDT library, providing collaborative data structures for real-time synchronization.
—
The Awareness protocol in pycrdt enables client presence management and metadata sharing in collaborative sessions. It allows clients to share information about user cursors, selections, online status, and other ephemeral data that doesn't belong in the persistent document but is essential to a good collaborative user experience.
Client awareness management for sharing presence and metadata.
class Awareness:
    """Client awareness management for sharing presence and metadata.

    This is an API outline: each method body consists only of its docstring.
    """

    def __init__(self, ydoc: Doc, *, outdated_timeout: int = 30000) -> None:
        """
        Create an awareness instance for a document.

        Args:
            ydoc (Doc): Document to attach awareness to
            outdated_timeout (int): Timeout in milliseconds for considering clients outdated
        """

    @property
    def client_id(self) -> int:
        """Get the local client identifier."""

    @property
    def meta(self) -> dict[int, dict[str, Any]]:
        """Get metadata for all clients (read-only)."""

    @property
    def states(self) -> dict[int, dict[str, Any]]:
        """Get states for all clients (read-only)."""

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
        """
        Start the awareness system.

        Args:
            task_status: Optional task status for structured concurrency.
                Accepting it lets the method be launched with a task group's
                ``start()`` (e.g. ``await tg.start(awareness.start)``).

        NOTE(review): this coroutine presumably keeps running until
        ``stop()`` is called, so it should be run in a task group rather
        than awaited inline -- confirm against the pycrdt implementation.
        """

    async def stop(self) -> None:
        """Stop the awareness system and clean up resources."""

    def get_local_state(self) -> dict[str, Any] | None:
        """
        Get the local client's state.

        Returns:
            dict | None: Local state dictionary or None if not set
        """

    def set_local_state(self, state: dict[str, Any] | None) -> None:
        """
        Set the local client's state.

        Args:
            state (dict | None): State dictionary to set, or None to clear
        """

    def set_local_state_field(self, field: str, value: Any) -> None:
        """
        Set a specific field in the local client's state.

        Args:
            field (str): Field name to set
            value: Value to set for the field

        NOTE(review): ``field`` is a single field name; a dotted name such as
        ``"user.cursor"`` is presumably stored as one literal key rather than
        treated as a nested path -- confirm against the pycrdt implementation.
        """

    def remove_awareness_states(self, client_ids: list[int], origin: Any) -> None:
        """
        Remove awareness states for specified clients.

        Args:
            client_ids (list[int]): List of client IDs to remove
            origin: Origin identifier for the removal operation
        """

    def encode_awareness_update(self, client_ids: list[int]) -> bytes:
        """
        Encode awareness update for specified clients.

        Args:
            client_ids (list[int]): List of client IDs to include in update

        Returns:
            bytes: Encoded awareness update message
        """

    def apply_awareness_update(self, update: bytes, origin: Any) -> None:
        """
        Apply an awareness update from another client.

        Args:
            update (bytes): Encoded awareness update to apply
            origin: Origin identifier for the update
        """

    def observe(self, callback: Callable[[str, tuple[dict[str, Any], Any]], None]) -> str:
        """
        Observe awareness changes.

        Args:
            callback: Function called when awareness changes occur.
                Receives (event_type, (change_info, origin)) as arguments.
                Event types: "add", "update", "remove"

        Returns:
            str: Observer ID for unsubscribing

        NOTE(review): upstream pycrdt appears to document the event types as
        "change" and "update", with the added/updated/removed client ids
        carried inside ``change_info`` -- verify against the installed version.
        """

    def unobserve(self, id: str) -> None:
        """
        Remove an awareness observer.

        Args:
            id (str): Observer ID returned from observe()
        """


def is_awareness_disconnect_message(message: bytes) -> bool:
    """
    Check if a message is an awareness disconnect message.

    Args:
        message (bytes): Message to check

    Returns:
        bool: True if message indicates client disconnect
    """


import asyncio
import anyio
from pycrdt import Doc, Awareness
async def basic_awareness_example():
    """Basic awareness setup and usage.

    Starts the awareness system in a task group, publishes the local user's
    state, updates part of it, prints the result, and stops the system.
    """
    doc = Doc(client_id=1)
    awareness = Awareness(doc)

    async with anyio.create_task_group() as tg:
        # Start awareness system. start() accepts a task_status, so tg.start()
        # can wait until it is actually running before stop() is ever called.
        await tg.start(awareness.start)

        # Set local user information
        awareness.set_local_state({
            "user": {
                "name": "Alice",
                "color": "#ff0000",
                "cursor": {"line": 0, "column": 0},
            }
        })

        # Update specific fields.
        # set_local_state_field() takes one field name; a dotted name such as
        # "user.cursor" would presumably be stored as a literal top-level key
        # instead of updating the nested "user" dict (confirm against pycrdt).
        # So rebuild the nested dict and write the whole "user" field back.
        user = dict((awareness.get_local_state() or {}).get("user", {}))
        user["cursor"] = {"line": 5, "column": 10}
        user["selection"] = {"start": 0, "end": 5}
        awareness.set_local_state_field("user", user)

        # Get current state
        local_state = awareness.get_local_state()
        print(f"Local state: {local_state}")

        # Check all client states
        all_states = awareness.states
        print(f"All client states: {all_states}")

        await anyio.sleep(0.1)  # Allow processing

        # Stop awareness (this also lets the task group exit)
        await awareness.stop()
# Run the basic example.
asyncio.run(basic_awareness_example())

import asyncio
import anyio
from pycrdt import Doc, Awareness
async def multi_client_awareness():
    """Example with multiple clients sharing awareness.

    Three clients publish their own state, exchange encoded awareness
    updates by hand (standing in for a network), and print what the first
    two clients can see.
    """
    # Create multiple clients
    doc1, doc2, doc3 = Doc(client_id=1), Doc(client_id=2), Doc(client_id=3)
    awareness1, awareness2, awareness3 = Awareness(doc1), Awareness(doc2), Awareness(doc3)
    peers = (awareness1, awareness2, awareness3)

    # Initial state for each client, in the same order as `peers`.
    profiles = (
        {
            "user": {"name": "Alice", "color": "#ff0000", "role": "editor"},
            "cursor": {"line": 0, "column": 0},
            "online": True,
        },
        {
            "user": {"name": "Bob", "color": "#00ff00", "role": "reviewer"},
            "cursor": {"line": 10, "column": 5},
            "online": True,
        },
        {
            "user": {"name": "Charlie", "color": "#0000ff", "role": "viewer"},
            "cursor": {"line": 20, "column": 0},
            "online": True,
        },
    )

    async with anyio.create_task_group() as tg:
        # Start all awareness systems
        for peer in peers:
            tg.start_soon(peer.start)

        # Set up different users
        for peer, profile in zip(peers, profiles):
            peer.set_local_state(profile)

        await anyio.sleep(0.1)

        # Simulate awareness synchronization.
        # In a real application this would happen through the network:
        # each client encodes its own state and every other client applies it.
        for sender_id, sender in enumerate(peers, start=1):
            payload = sender.encode_awareness_update([sender_id])
            for receiver in peers:
                if receiver is not sender:
                    receiver.apply_awareness_update(payload, "network")

        # Now all clients know about each other
        for heading, viewer in (("Client 1 sees:", awareness1), ("Client 2 sees:", awareness2)):
            print(heading)
            for client_id, state in viewer.states.items():
                user = state.get("user", {})
                cursor = state.get("cursor", {})
                print(f" Client {client_id}: {user.get('name')} at {cursor}")

        # Simulate cursor movement
        awareness1.set_local_state_field("cursor", {"line": 5, "column": 8})
        awareness2.set_local_state_field("cursor", {"line": 12, "column": 3})

        # Stop all awareness systems
        for peer in peers:
            await peer.stop()
# Run the multi-client example.
asyncio.run(multi_client_awareness())

import asyncio
import anyio
from pycrdt import Doc, Awareness
async def awareness_events_example():
    """Example of observing awareness events.

    Registers an observer, changes the local state, then simulates a second
    client joining and leaving so that the observer has something to report.
    """
    doc = Doc(client_id=1)
    awareness = Awareness(doc)

    def on_awareness_change(event_type: str, change_data):
        """Handle awareness changes.

        Args:
            event_type: Event name passed by the awareness instance.
            change_data: Tuple of (change_info, origin).
        """
        change_info, origin = change_data

        # Report every non-empty change list instead of branching on
        # event_type. NOTE(review): upstream pycrdt appears to emit
        # "change"/"update" events whose payload carries all three lists --
        # reading the lists directly works under either naming scheme.
        for key in ("added", "updated", "removed"):
            client_ids = change_info.get(key, [])
            if client_ids:
                print(f"[{event_type}] Clients {key}: {client_ids} (origin: {origin})")

        # Print current states
        for client_id, state in awareness.states.items():
            user = state.get("user", {})
            print(f" Client {client_id}: {user}")

    # Subscribe to awareness changes
    observer_id = awareness.observe(on_awareness_change)

    async with anyio.create_task_group() as tg:
        # tg.start() waits until the awareness system is running.
        await tg.start(awareness.start)

        # Make changes to trigger events
        awareness.set_local_state({
            "user": {"name": "Alice", "status": "typing"},
            "timestamp": "2024-01-01T10:00:00Z",
        })
        await anyio.sleep(0.1)

        # Update state. set_local_state_field() takes one field name, so a
        # dotted name like "user.status" would presumably become a literal
        # top-level key (confirm against pycrdt); replace the whole "user"
        # field with an updated copy instead.
        user = dict((awareness.get_local_state() or {}).get("user", {}))
        user["status"] = "idle"
        awareness.set_local_state_field("user", user)
        await anyio.sleep(0.1)

        # Simulate another client joining. In a real app this update would
        # arrive over the network from that client's own Awareness instance.
        other_awareness = Awareness(Doc(client_id=2))
        other_awareness.set_local_state({"user": {"name": "Bob", "status": "online"}})
        other_client_update = other_awareness.encode_awareness_update(
            [other_awareness.client_id]
        )
        awareness.apply_awareness_update(other_client_update, "network")
        await anyio.sleep(0.1)

        # Simulate client leaving
        awareness.remove_awareness_states([other_awareness.client_id], "disconnect")
        await anyio.sleep(0.1)

        # Clean up
        awareness.unobserve(observer_id)
        await awareness.stop()
# Run the events example.
asyncio.run(awareness_events_example())

import asyncio
import anyio
from pycrdt import Doc, Awareness, Text
class CursorTracker:
    """Track and display user cursors in real-time.

    Wraps an Awareness instance for one document: publishes the local user's
    identity and cursor, and keeps `cursors` in sync with the cursors of all
    other clients currently present in the awareness states.
    """

    def __init__(self, doc: Doc, user_name: str, user_color: str):
        """
        Args:
            doc: Document the awareness instance is attached to.
            user_name: Display name published for the local user.
            user_color: Color string published for the local user.
        """
        self.doc = doc
        self.awareness = Awareness(doc)
        self.user_name = user_name
        self.user_color = user_color
        self.cursors = {}  # client_id -> cursor_info
        self.observer_id = None  # set by start(), cleared by stop()

    async def start(self):
        """Start cursor tracking.

        Intended to be launched in a task group (e.g. ``tg.start_soon``):
        the final ``await`` is expected to run until ``stop()`` is called.
        """
        # Set up awareness observer
        self.observer_id = self.awareness.observe(self._on_awareness_change)

        # Set initial user state BEFORE awaiting start(): Awareness.start()
        # is a structured-concurrency entry point that is expected to keep
        # running until stop(), so anything placed after it would only run
        # at shutdown.
        self.awareness.set_local_state({
            "user": {
                "name": self.user_name,
                "color": self.user_color,
            },
            "cursor": {"position": 0, "anchor": 0},
        })

        # Start awareness system
        await self.awareness.start()

    async def stop(self):
        """Stop cursor tracking."""
        # Guard against stop() being called before start() registered
        # the observer.
        if self.observer_id is not None:
            self.awareness.unobserve(self.observer_id)
            self.observer_id = None
        await self.awareness.stop()

    def _on_awareness_change(self, event_type: str, change_data):
        """Handle awareness changes to update cursor display.

        Args:
            event_type: Event name passed by the awareness instance (unused).
            change_data: Tuple of (change_info, origin) (unused; the current
                states are re-read in full instead).
        """
        own_id = self.awareness.client_id

        # Rebuild from the current states so that clients which have left
        # no longer show up with a stale cursor.
        cursors = {}
        for client_id, state in self.awareness.states.items():
            if client_id == own_id:  # Skip own cursor
                continue
            user = state.get("user", {})
            cursor = state.get("cursor", {})
            if user and cursor:
                cursors[client_id] = {
                    "name": user.get("name", f"User {client_id}"),
                    "color": user.get("color", "#000000"),
                    "position": cursor.get("position", 0),
                    "anchor": cursor.get("anchor", 0),
                }
        self.cursors = cursors
        self._display_cursors()

    def _display_cursors(self):
        """Display current cursor positions."""
        if not self.cursors:
            return
        print(f"\n--- Cursors for {self.user_name} ---")
        for cursor_info in self.cursors.values():
            name = cursor_info["name"]
            pos = cursor_info["position"]
            anchor = cursor_info["anchor"]
            if pos == anchor:
                print(f"{name}: cursor at {pos}")
            else:
                print(f"{name}: selection {min(pos, anchor)}-{max(pos, anchor)}")

    def move_cursor(self, position: int, anchor: int | None = None):
        """Move the local cursor.

        Args:
            position: New cursor position.
            anchor: Other end of the selection; defaults to `position`,
                i.e. a collapsed cursor with no selection.
        """
        if anchor is None:
            anchor = position
        self.awareness.set_local_state_field("cursor", {
            "position": position,
            "anchor": anchor,
        })
async def cursor_tracking_example():
    """Example of real-time cursor tracking.

    Two clients share a text document; each cursor move is forwarded to the
    other client by hand, standing in for a network transport.
    """
    # Create shared document
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)

    # Add shared text
    text1 = doc1.get("content", type=Text)
    doc2.get("content", type=Text)
    with doc1.transaction():
        text1.insert(0, "This is a shared document for cursor tracking example.")

    # Sync documents
    doc2.apply_update(doc1.get_update())

    # Create cursor trackers
    alice = CursorTracker(doc1, "Alice", "#ff0000")
    bob = CursorTracker(doc2, "Bob", "#00ff00")

    def forward(source, target, client_id):
        """Exchange awareness updates (normally done via network)."""
        payload = source.awareness.encode_awareness_update([client_id])
        target.awareness.apply_awareness_update(payload, "network")

    async with anyio.create_task_group() as tg:
        # Start trackers
        for tracker in (alice, bob):
            tg.start_soon(tracker.start)
        await anyio.sleep(0.1)

        # Simulate cursor movements
        print("Alice moves cursor to position 10")
        alice.move_cursor(10)
        await anyio.sleep(0.1)
        forward(alice, bob, 1)
        await anyio.sleep(0.1)

        print("Bob makes a selection from 20 to 30")
        bob.move_cursor(30, 20)
        forward(bob, alice, 2)
        await anyio.sleep(0.1)

        print("Alice makes a selection from 5 to 15")
        alice.move_cursor(15, 5)
        forward(alice, bob, 1)
        await anyio.sleep(0.1)

        # Stop trackers
        for tracker in (alice, bob):
            await tracker.stop()
# Run the cursor tracking example.
asyncio.run(cursor_tracking_example())

import asyncio
import anyio
from pycrdt import Doc, Awareness
class OnlineStatusManager:
    """Manage online status for collaborative sessions.

    Wraps an Awareness instance for one document: publishes the local user's
    online flag and last-seen timestamp, and keeps `online_users` in sync
    with the clients whose state marks them as online.
    """

    def __init__(self, doc: Doc, user_info: dict):
        """
        Args:
            doc: Document the awareness instance is attached to.
            user_info: Dictionary published as the local "user" field.
        """
        self.doc = doc
        self.awareness = Awareness(doc)
        self.user_info = user_info
        self.online_users = {}
        self.observer_id = None  # set by start(), cleared by stop()

    async def start(self):
        """Start online status management.

        Intended to be launched in a task group (e.g. ``tg.start_soon``):
        the final ``await`` is expected to run until ``stop()`` is called.
        """
        self.observer_id = self.awareness.observe(self._on_status_change)

        # Set initial online status BEFORE awaiting start(): Awareness.start()
        # is a structured-concurrency entry point that is expected to keep
        # running until stop(), so anything placed after it would only run
        # at shutdown (and would flip the user back to online).
        self.awareness.set_local_state({
            "user": self.user_info,
            "online": True,
            "last_seen": self._current_timestamp(),
        })

        await self.awareness.start()

    async def stop(self) -> bytes:
        """Stop and cleanup.

        Returns:
            bytes: Encoded awareness update carrying the offline status, for
            the caller to broadcast to other clients.
        """
        # Set offline status before stopping
        self.awareness.set_local_state_field("online", False)

        # Send final awareness update
        offline_update = self.awareness.encode_awareness_update([self.awareness.client_id])

        # Guard against stop() being called before start() registered
        # the observer.
        if self.observer_id is not None:
            self.awareness.unobserve(self.observer_id)
            self.observer_id = None
        await self.awareness.stop()
        return offline_update  # Return for broadcasting to other clients

    def _on_status_change(self, event_type: str, change_data):
        """Handle online status changes.

        Args:
            event_type: Event name passed by the awareness instance (unused).
            change_data: Tuple of (change_info, origin) (unused; the current
                states are re-read in full instead).
        """
        # Update online users list
        self.online_users.clear()
        for client_id, state in self.awareness.states.items():
            user = state.get("user", {})
            online = state.get("online", False)
            last_seen = state.get("last_seen", "unknown")
            if online and user:
                self.online_users[client_id] = {
                    "name": user.get("name", f"User {client_id}"),
                    "last_seen": last_seen,
                }
        self._display_online_status()

    def _display_online_status(self):
        """Display current online users."""
        print(f"\n--- Online Users ({len(self.online_users)}) ---")
        for client_id, info in self.online_users.items():
            name = info["name"]
            last_seen = info["last_seen"]
            status = "🟢" if client_id in self.awareness.states else "🔴"
            print(f"{status} {name} (last seen: {last_seen})")

    def update_activity(self):
        """Update last activity timestamp."""
        self.awareness.set_local_state_field("last_seen", self._current_timestamp())

    def _current_timestamp(self) -> str:
        """Get current timestamp as an ISO 8601 string in UTC.

        The value is shared with other clients, so it is timezone-aware
        rather than a naive local time that peers could not interpret.
        """
        import datetime

        return datetime.datetime.now(datetime.timezone.utc).isoformat()
async def online_status_example():
    """Example of online status management.

    Three clients come online, exchange awareness updates by hand (standing
    in for a network), and one of them goes offline part-way through.
    """
    # Create clients
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)
    doc3 = Doc(client_id=3)

    # Create status managers
    manager1 = OnlineStatusManager(doc1, {"name": "Alice", "role": "editor"})
    manager2 = OnlineStatusManager(doc2, {"name": "Bob", "role": "reviewer"})
    manager3 = OnlineStatusManager(doc3, {"name": "Charlie", "role": "viewer"})

    def sync_awareness(managers):
        """Sync awareness between all managers."""
        # Encode every manager's own state first ...
        outgoing = [
            (
                peer.awareness.encode_awareness_update([peer.awareness.client_id]),
                peer.awareness.client_id,
            )
            for peer in managers
        ]
        # ... then apply each update to every manager except its sender.
        for payload, sender_id in outgoing:
            for peer in managers:
                if peer.awareness.client_id == sender_id:
                    continue
                peer.awareness.apply_awareness_update(payload, "network")

    async with anyio.create_task_group() as tg:
        # Start managers
        for manager in (manager1, manager2, manager3):
            tg.start_soon(manager.start)
        await anyio.sleep(0.1)

        # Initial sync
        sync_awareness([manager1, manager2, manager3])
        await anyio.sleep(0.1)

        # Simulate activity
        print("Alice updates activity")
        manager1.update_activity()
        sync_awareness([manager1, manager2, manager3])
        await anyio.sleep(0.1)

        print("Charlie goes offline")
        offline_update = await manager3.stop()

        # Broadcast Charlie's offline status
        for remaining in (manager1, manager2):
            remaining.awareness.apply_awareness_update(offline_update, "network")
        await anyio.sleep(0.1)

        print("Bob updates activity")
        manager2.update_activity()
        sync_awareness([manager1, manager2])
        await anyio.sleep(0.1)

        # Stop remaining managers
        await manager1.stop()
        await manager2.stop()
# Run the online status example.
asyncio.run(online_status_example())

# The disconnect-detection snippet below also uses Doc and Awareness,
# so import them alongside is_awareness_disconnect_message.
from pycrdt import Awareness, Doc, is_awareness_disconnect_message
def handle_awareness_message(message: bytes, awareness: Awareness):
    """Handle incoming awareness message with disconnect detection.

    Args:
        message: Encoded awareness message received from the network.
        awareness: Awareness instance that regular updates are applied to.
    """
    if not is_awareness_disconnect_message(message):
        # Apply normal awareness update
        awareness.apply_awareness_update(message, "network")
        return

    # Handle client disconnect.
    # The message itself contains information about which client disconnected.
    print("Received disconnect message")
# Example usage
doc = Doc()
awareness = Awareness(doc)

# Create a disconnect message (normally received from network):
# publish a state, then simulate the disconnect by clearing it and
# encoding the resulting (cleared) state.
awareness.set_local_state({"user": {"name": "Alice"}})
awareness.set_local_state(None)
disconnect_message = awareness.encode_awareness_update([awareness.client_id])

# Check if it's a disconnect message
if is_awareness_disconnect_message(disconnect_message):
    print("This is a disconnect message")
else:
    print("This is a regular awareness update")

import asyncio
from pycrdt import Doc, Awareness
async def awareness_error_handling():
    """Example of error handling in awareness operations.

    Each risky operation is wrapped in its own handler so that one failure
    does not skip the rest of the demonstration, and the awareness system is
    always stopped at the end.
    """
    # Local import keeps this snippet self-contained; anyio supplies the
    # task group that awareness.start() has to run in.
    import anyio

    doc = Doc()
    awareness = Awareness(doc)

    async with anyio.create_task_group() as tg:
        try:
            # Start awareness. Awaiting awareness.start() inline would not
            # return until stop() is called (and stop() is only reached in
            # the finally clause), so launch it in the task group instead;
            # tg.start() resumes once the system reports it has started.
            await tg.start(awareness.start)

            # Invalid state operations
            try:
                awareness.set_local_state({"invalid": object()})  # May cause issues
            except Exception as e:
                print(f"State error: {e}")

            # Invalid update data
            try:
                invalid_update = b"invalid_awareness_data"
                awareness.apply_awareness_update(invalid_update, "test")
            except Exception as e:
                print(f"Update error: {e}")

            # Observer errors
            def failing_observer(event_type, change_data):
                raise ValueError("Observer failed")

            observer_id = awareness.observe(failing_observer)
            try:
                # This might trigger the failing observer
                awareness.set_local_state({"test": "value"})
            except Exception as e:
                print(f"Observer error: {e}")
            finally:
                # Never leave the failing observer registered.
                awareness.unobserve(observer_id)
        except Exception as e:
            print(f"Awareness error: {e}")
        finally:
            # Always clean up; stopping also lets the task group exit.
            try:
                await awareness.stop()
            except Exception as e:
                print(f"Stop error: {e}")
asyncio.run(awareness_error_handling())

Install with Tessl CLI
npx tessl i tessl/pypi-pycrdt