CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pycrdt

Python bindings for Yrs CRDT library providing collaborative data structures for real-time synchronization.

Pending
Overview
Eval results
Files

awareness.mddocs/

Awareness Protocol

Overview

The Awareness protocol in pycrdt enables client presence management and metadata sharing in collaborative sessions. It allows clients to share information about user cursors, selections, online status, and other ephemeral data that doesn't belong in the persistent document but is essential for collaborative user experience.

Core Types

Awareness

Client awareness management for sharing presence and metadata.

class Awareness:
    def __init__(self, ydoc: Doc, *, outdated_timeout: int = 30000) -> None:
        """
        Create an awareness instance for a document.
        
        Args:
            ydoc (Doc): Document to attach awareness to
            outdated_timeout (int): Timeout in milliseconds for considering clients outdated
        """

    @property
    def client_id(self) -> int:
        """Get the local client identifier."""

    @property
    def meta(self) -> dict[int, dict[str, Any]]:
        """Get metadata for all clients (read-only)."""

    @property
    def states(self) -> dict[int, dict[str, Any]]:
        """Get states for all clients (read-only)."""

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
        """
        Start the awareness system.
        
        Args:
            task_status: Optional task status for structured concurrency
        """

    async def stop(self) -> None:
        """Stop the awareness system and clean up resources."""

    def get_local_state(self) -> dict[str, Any] | None:
        """
        Get the local client's state.
        
        Returns:
            dict | None: Local state dictionary or None if not set
        """

    def set_local_state(self, state: dict[str, Any] | None) -> None:
        """
        Set the local client's state.
        
        Args:
            state (dict | None): State dictionary to set, or None to clear
        """

    def set_local_state_field(self, field: str, value: Any) -> None:
        """
        Set a specific field in the local client's state.
        
        Args:
            field (str): Field name to set
            value: Value to set for the field
        """

    def remove_awareness_states(self, client_ids: list[int], origin: Any) -> None:
        """
        Remove awareness states for specified clients.
        
        Args:
            client_ids (list[int]): List of client IDs to remove
            origin: Origin identifier for the removal operation
        """

    def encode_awareness_update(self, client_ids: list[int]) -> bytes:
        """
        Encode awareness update for specified clients.
        
        Args:
            client_ids (list[int]): List of client IDs to include in update
            
        Returns:
            bytes: Encoded awareness update message
        """

    def apply_awareness_update(self, update: bytes, origin: Any) -> None:
        """
        Apply an awareness update from another client.
        
        Args:
            update (bytes): Encoded awareness update to apply
            origin: Origin identifier for the update
        """

    def observe(self, callback: Callable[[str, tuple[dict[str, Any], Any]], None]) -> str:
        """
        Observe awareness changes.
        
        Args:
            callback: Function called when awareness changes occur.
                     Receives (event_type, (change_info, origin)) as arguments.
                     Event types: "add", "update", "remove"
            
        Returns:
            str: Observer ID for unsubscribing
        """

    def unobserve(self, id: str) -> None:
        """
        Remove an awareness observer.
        
        Args:
            id (str): Observer ID returned from observe()
        """

Utility Functions

def is_awareness_disconnect_message(message: bytes) -> bool:
    """
    Check if a message is an awareness disconnect message.
    
    Args:
        message (bytes): Message to check
        
    Returns:
        bool: True if message indicates client disconnect
    """

Usage Examples

Basic Awareness Setup

import asyncio
import anyio
from pycrdt import Doc, Awareness

async def basic_awareness_example():
    """Basic awareness setup and usage."""
    
    doc = Doc(client_id=1)
    awareness = Awareness(doc)
    
    async with anyio.create_task_group() as tg:
        # Start awareness system
        tg.start_soon(awareness.start)
        
        # Set local user information
        awareness.set_local_state({
            "user": {
                "name": "Alice",
                "color": "#ff0000",
                "cursor": {"line": 0, "column": 0}
            }
        })
        
        # Update specific fields
        awareness.set_local_state_field("user.cursor", {"line": 5, "column": 10})
        awareness.set_local_state_field("user.selection", {"start": 0, "end": 5})
        
        # Get current state
        local_state = awareness.get_local_state()
        print(f"Local state: {local_state}")
        
        # Check all client states
        all_states = awareness.states
        print(f"All client states: {all_states}")
        
        await anyio.sleep(0.1)  # Allow processing
        
        # Stop awareness
        await awareness.stop()

asyncio.run(basic_awareness_example())

Multi-Client Awareness

import asyncio
import anyio
from pycrdt import Doc, Awareness

async def multi_client_awareness():
    """Example with multiple clients sharing awareness."""
    
    # Create multiple clients
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)
    doc3 = Doc(client_id=3)
    
    awareness1 = Awareness(doc1)
    awareness2 = Awareness(doc2)
    awareness3 = Awareness(doc3)
    
    async with anyio.create_task_group() as tg:
        # Start all awareness systems
        tg.start_soon(awareness1.start)
        tg.start_soon(awareness2.start)
        tg.start_soon(awareness3.start)
        
        # Set up different users
        awareness1.set_local_state({
            "user": {"name": "Alice", "color": "#ff0000", "role": "editor"},
            "cursor": {"line": 0, "column": 0},
            "online": True
        })
        
        awareness2.set_local_state({
            "user": {"name": "Bob", "color": "#00ff00", "role": "reviewer"}, 
            "cursor": {"line": 10, "column": 5},
            "online": True
        })
        
        awareness3.set_local_state({
            "user": {"name": "Charlie", "color": "#0000ff", "role": "viewer"},
            "cursor": {"line": 20, "column": 0},
            "online": True
        })
        
        await anyio.sleep(0.1)
        
        # Simulate awareness synchronization
        # In real application, this would happen through network
        
        # Client 1 sends its awareness to others
        client1_update = awareness1.encode_awareness_update([1])
        awareness2.apply_awareness_update(client1_update, "network")
        awareness3.apply_awareness_update(client1_update, "network")
        
        # Client 2 sends its awareness to others
        client2_update = awareness2.encode_awareness_update([2])
        awareness1.apply_awareness_update(client2_update, "network")
        awareness3.apply_awareness_update(client2_update, "network")
        
        # Client 3 sends its awareness to others
        client3_update = awareness3.encode_awareness_update([3])
        awareness1.apply_awareness_update(client3_update, "network")
        awareness2.apply_awareness_update(client3_update, "network")
        
        # Now all clients know about each other
        print("Client 1 sees:")
        for client_id, state in awareness1.states.items():
            user = state.get("user", {})
            cursor = state.get("cursor", {})
            print(f"  Client {client_id}: {user.get('name')} at {cursor}")
        
        print("Client 2 sees:")
        for client_id, state in awareness2.states.items():
            user = state.get("user", {})
            cursor = state.get("cursor", {})
            print(f"  Client {client_id}: {user.get('name')} at {cursor}")
        
        # Simulate cursor movement
        awareness1.set_local_state_field("cursor", {"line": 5, "column": 8})
        awareness2.set_local_state_field("cursor", {"line": 12, "column": 3})
        
        # Stop all awareness systems
        await awareness1.stop()
        await awareness2.stop()
        await awareness3.stop()

asyncio.run(multi_client_awareness())

Awareness Event Observation

import asyncio
import anyio
from pycrdt import Doc, Awareness

async def awareness_events_example():
    """Example of observing awareness events."""
    
    doc = Doc(client_id=1)
    awareness = Awareness(doc)
    
    def on_awareness_change(event_type: str, change_data):
        """Handle awareness changes."""
        change_info, origin = change_data
        
        if event_type == "add":
            client_ids = change_info.get("added", [])
            print(f"Clients added: {client_ids} (origin: {origin})")
            
        elif event_type == "update":
            client_ids = change_info.get("updated", [])
            print(f"Clients updated: {client_ids} (origin: {origin})")
            
        elif event_type == "remove":
            client_ids = change_info.get("removed", [])
            print(f"Clients removed: {client_ids} (origin: {origin})")
        
        # Print current states
        for client_id, state in awareness.states.items():
            user = state.get("user", {})
            print(f"  Client {client_id}: {user}")
    
    # Subscribe to awareness changes
    observer_id = awareness.observe(on_awareness_change)
    
    async with anyio.create_task_group() as tg:
        tg.start_soon(awareness.start)
        
        # Make changes to trigger events
        awareness.set_local_state({
            "user": {"name": "Alice", "status": "typing"},
            "timestamp": "2024-01-01T10:00:00Z"
        })
        
        await anyio.sleep(0.1)
        
        # Update state
        awareness.set_local_state_field("user.status", "idle")
        
        await anyio.sleep(0.1)
        
        # Simulate another client joining
        other_client_update = awareness.encode_awareness_update([])  # Empty for now
        # In real app, this would be an actual client's awareness data
        
        # Simulate client leaving
        awareness.remove_awareness_states([2], "disconnect")
        
        await anyio.sleep(0.1)
        
        # Clean up
        awareness.unobserve(observer_id)
        await awareness.stop()

asyncio.run(awareness_events_example())

Real-Time Cursor Tracking

import asyncio
import anyio
from pycrdt import Doc, Awareness, Text

class CursorTracker:
    """Track and display user cursors in real-time."""
    
    def __init__(self, doc: Doc, user_name: str, user_color: str):
        self.doc = doc
        self.awareness = Awareness(doc)
        self.user_name = user_name
        self.user_color = user_color
        self.cursors = {}  # client_id -> cursor_info
        
    async def start(self):
        """Start cursor tracking."""
        # Set up awareness observer
        self.observer_id = self.awareness.observe(self._on_awareness_change)
        
        # Start awareness system
        await self.awareness.start()
        
        # Set initial user state
        self.awareness.set_local_state({
            "user": {
                "name": self.user_name,
                "color": self.user_color
            },
            "cursor": {"position": 0, "anchor": 0}
        })
    
    async def stop(self):
        """Stop cursor tracking."""
        self.awareness.unobserve(self.observer_id)
        await self.awareness.stop()
    
    def _on_awareness_change(self, event_type: str, change_data):
        """Handle awareness changes to update cursor display."""
        change_info, origin = change_data
        
        # Update cursor information
        for client_id, state in self.awareness.states.items():
            if client_id != self.awareness.client_id:  # Skip own cursor
                user = state.get("user", {})
                cursor = state.get("cursor", {})
                
                if user and cursor:
                    self.cursors[client_id] = {
                        "name": user.get("name", f"User {client_id}"),
                        "color": user.get("color", "#000000"),
                        "position": cursor.get("position", 0),
                        "anchor": cursor.get("anchor", 0)
                    }
        
        self._display_cursors()
    
    def _display_cursors(self):
        """Display current cursor positions."""
        if self.cursors:
            print(f"\n--- Cursors for {self.user_name} ---")
            for client_id, cursor_info in self.cursors.items():
                name = cursor_info["name"]
                pos = cursor_info["position"]
                anchor = cursor_info["anchor"]
                
                if pos == anchor:
                    print(f"{name}: cursor at {pos}")
                else:
                    print(f"{name}: selection {min(pos, anchor)}-{max(pos, anchor)}")
    
    def move_cursor(self, position: int, anchor: int = None):
        """Move the local cursor."""
        if anchor is None:
            anchor = position
            
        self.awareness.set_local_state_field("cursor", {
            "position": position,
            "anchor": anchor
        })

async def cursor_tracking_example():
    """Example of real-time cursor tracking."""
    
    # Create shared document
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)
    
    # Add shared text
    text1 = doc1.get("content", type=Text)
    text2 = doc2.get("content", type=Text)
    
    with doc1.transaction():
        text1.insert(0, "This is a shared document for cursor tracking example.")
    
    # Sync documents
    update = doc1.get_update()
    doc2.apply_update(update)
    
    # Create cursor trackers
    tracker1 = CursorTracker(doc1, "Alice", "#ff0000")
    tracker2 = CursorTracker(doc2, "Bob", "#00ff00")
    
    async with anyio.create_task_group() as tg:
        # Start trackers
        tg.start_soon(tracker1.start)
        tg.start_soon(tracker2.start)
        
        await anyio.sleep(0.1)
        
        # Simulate cursor movements
        print("Alice moves cursor to position 10")
        tracker1.move_cursor(10)
        
        await anyio.sleep(0.1)
        
        # Exchange awareness updates (normally done via network)
        alice_update = tracker1.awareness.encode_awareness_update([1])
        tracker2.awareness.apply_awareness_update(alice_update, "network")
        
        await anyio.sleep(0.1)
        
        print("Bob makes a selection from 20 to 30")
        tracker2.move_cursor(30, 20)
        
        bob_update = tracker2.awareness.encode_awareness_update([2])
        tracker1.awareness.apply_awareness_update(bob_update, "network")
        
        await anyio.sleep(0.1)
        
        print("Alice makes a selection from 5 to 15")
        tracker1.move_cursor(15, 5)
        
        alice_update = tracker1.awareness.encode_awareness_update([1])
        tracker2.awareness.apply_awareness_update(alice_update, "network")
        
        await anyio.sleep(0.1)
        
        # Stop trackers
        await tracker1.stop()
        await tracker2.stop()

asyncio.run(cursor_tracking_example())

Online Status Management

import asyncio
import anyio
from pycrdt import Doc, Awareness

class OnlineStatusManager:
    """Manage online status for collaborative sessions."""
    
    def __init__(self, doc: Doc, user_info: dict):
        self.doc = doc
        self.awareness = Awareness(doc)
        self.user_info = user_info
        self.online_users = {}
        
    async def start(self):
        """Start online status management."""
        self.observer_id = self.awareness.observe(self._on_status_change)
        await self.awareness.start()
        
        # Set initial online status
        self.awareness.set_local_state({
            "user": self.user_info,
            "online": True,
            "last_seen": self._current_timestamp()
        })
    
    async def stop(self):
        """Stop and cleanup."""
        # Set offline status before stopping
        self.awareness.set_local_state_field("online", False)
        
        # Send final awareness update
        offline_update = self.awareness.encode_awareness_update([self.awareness.client_id])
        
        self.awareness.unobserve(self.observer_id) 
        await self.awareness.stop()
        
        return offline_update  # Return for broadcasting to other clients
    
    def _on_status_change(self, event_type: str, change_data):
        """Handle online status changes."""
        change_info, origin = change_data
        
        # Update online users list
        self.online_users.clear()
        
        for client_id, state in self.awareness.states.items():
            user = state.get("user", {})
            online = state.get("online", False)
            last_seen = state.get("last_seen", "unknown")
            
            if online and user:
                self.online_users[client_id] = {
                    "name": user.get("name", f"User {client_id}"),
                    "last_seen": last_seen
                }
        
        self._display_online_status()
    
    def _display_online_status(self):
        """Display current online users."""
        print(f"\n--- Online Users ({len(self.online_users)}) ---")
        for client_id, info in self.online_users.items():
            name = info["name"]
            last_seen = info["last_seen"]
            status = "🟢" if client_id in self.awareness.states else "🔴"
            print(f"{status} {name} (last seen: {last_seen})")
    
    def update_activity(self):
        """Update last activity timestamp."""
        self.awareness.set_local_state_field("last_seen", self._current_timestamp())
    
    def _current_timestamp(self) -> str:
        """Get current timestamp."""
        import datetime
        return datetime.datetime.now().isoformat()

async def online_status_example():
    """Example of online status management."""
    
    # Create clients
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)
    doc3 = Doc(client_id=3)
    
    # Create status managers
    manager1 = OnlineStatusManager(doc1, {"name": "Alice", "role": "editor"})
    manager2 = OnlineStatusManager(doc2, {"name": "Bob", "role": "reviewer"})
    manager3 = OnlineStatusManager(doc3, {"name": "Charlie", "role": "viewer"})
    
    async with anyio.create_task_group() as tg:
        # Start managers
        tg.start_soon(manager1.start)
        tg.start_soon(manager2.start)
        tg.start_soon(manager3.start)
        
        await anyio.sleep(0.1)
        
        # Simulate awareness synchronization
        def sync_awareness(managers):
            """Sync awareness between all managers."""
            updates = []
            for manager in managers:
                client_ids = [manager.awareness.client_id]
                update = manager.awareness.encode_awareness_update(client_ids)
                updates.append((update, manager.awareness.client_id))
            
            # Apply each update to other managers
            for update, sender_id in updates:
                for manager in managers:
                    if manager.awareness.client_id != sender_id:
                        manager.awareness.apply_awareness_update(update, "network")
        
        # Initial sync
        sync_awareness([manager1, manager2, manager3])
        await anyio.sleep(0.1)
        
        # Simulate activity
        print("Alice updates activity")
        manager1.update_activity()
        sync_awareness([manager1, manager2, manager3])
        await anyio.sleep(0.1)
        
        print("Charlie goes offline")
        offline_update = await manager3.stop()
        
        # Broadcast Charlie's offline status
        manager1.awareness.apply_awareness_update(offline_update, "network") 
        manager2.awareness.apply_awareness_update(offline_update, "network")
        await anyio.sleep(0.1)
        
        print("Bob updates activity")
        manager2.update_activity()
        sync_awareness([manager1, manager2])
        await anyio.sleep(0.1)
        
        # Stop remaining managers
        await manager1.stop()
        await manager2.stop()

asyncio.run(online_status_example())

Disconnect Detection

from pycrdt import is_awareness_disconnect_message

def handle_awareness_message(message: bytes, awareness: Awareness):
    """Handle incoming awareness message with disconnect detection."""
    
    if is_awareness_disconnect_message(message):
        print("Received disconnect message")
        # Handle client disconnect
        # The message itself contains information about which client disconnected
        return
    
    # Apply normal awareness update
    awareness.apply_awareness_update(message, "network")

# Example usage
doc = Doc()
awareness = Awareness(doc)

# Create a disconnect message (normally received from network)
awareness.set_local_state({"user": {"name": "Alice"}})
disconnect_update = awareness.encode_awareness_update([awareness.client_id])

# Simulate disconnect by clearing state
awareness.set_local_state(None)
disconnect_message = awareness.encode_awareness_update([awareness.client_id])

# Check if it's a disconnect message
if is_awareness_disconnect_message(disconnect_message):
    print("This is a disconnect message")
else:
    print("This is a regular awareness update")

Error Handling

import asyncio
from pycrdt import Doc, Awareness

async def awareness_error_handling():
    """Example of error handling in awareness operations."""
    
    doc = Doc()
    awareness = Awareness(doc)
    
    try:
        # Start awareness
        await awareness.start()
        
        # Invalid state operations
        awareness.set_local_state({"invalid": object()})  # May cause issues
        
        # Invalid update data
        try:
            invalid_update = b"invalid_awareness_data"
            awareness.apply_awareness_update(invalid_update, "test")
        except Exception as e:
            print(f"Update error: {e}")
        
        # Observer errors
        def failing_observer(event_type, change_data):
            raise ValueError("Observer failed")
        
        observer_id = awareness.observe(failing_observer)
        
        # This might trigger the failing observer
        try:
            awareness.set_local_state({"test": "value"})
        except Exception as e:
            print(f"Observer error: {e}")
        
        awareness.unobserve(observer_id)
        
    except Exception as e:
        print(f"Awareness error: {e}")
    
    finally:
        # Always clean up
        try:
            await awareness.stop()
        except Exception as e:
            print(f"Stop error: {e}")

asyncio.run(awareness_error_handling())

Install with Tessl CLI

npx tessl i tessl/pypi-pycrdt

docs

array-operations.md

awareness.md

document-management.md

index.md

map-operations.md

position-undo.md

synchronization.md

text-operations.md

xml-support.md

tile.json