CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pycrdt

Python bindings for Yrs CRDT library providing collaborative data structures for real-time synchronization.

Pending
Overview
Eval results
Files

docs/synchronization.md

Synchronization

Overview

pycrdt provides comprehensive synchronization capabilities for sharing documents across multiple clients in real-time. The synchronization system includes message encoding/decoding, sync protocols, network providers, and transport abstractions. This enables building collaborative applications with automatic conflict resolution and efficient network communication.

Core Types

Message Type Enums

class YMessageType(IntEnum):
    """Top-level message types for network protocol.

    A message starts with one of these values, which selects how the rest of
    the message is interpreted.
    """
    SYNC = 0  # Document sync; followed by a YSyncMessageType value
    AWARENESS = 1  # Awareness data

class YSyncMessageType(IntEnum):
    """Sync-specific message types (the value that follows YMessageType.SYNC)."""
    SYNC_STEP1 = 0  # Send document state (state vector)
    SYNC_STEP2 = 1  # Reply with missing updates
    SYNC_UPDATE = 2 # Document update broadcast

Encoder

Binary message encoder for network protocols.

class Encoder:
    """Binary message encoder for network protocols.

    Values are appended in call order; ``to_bytes()`` returns everything
    written so far.
    """

    def __init__(self) -> None:
        """Create a new, empty message encoder."""

    def write_var_uint(self, num: int) -> None:
        """
        Append a variable-length unsigned integer.

        Args:
            num (int): Integer to encode

        NOTE(review): the "Error Handling" example suggests a negative value
        may raise ValueError -- confirm against the implementation.
        """

    def write_var_string(self, text: str) -> None:
        """
        Append a variable-length string.

        Args:
            text (str): String to encode
        """

    def to_bytes(self) -> bytes:
        """
        Get the encoded data as bytes.

        Returns:
            bytes: Encoded message data
        """

Decoder

Binary message decoder for network protocols.

class Decoder:
    """Binary message decoder for network protocols.

    Reads values sequentially from the byte string given at construction;
    values must be read in the order they were written.
    """

    def __init__(self, stream: bytes) -> None:
        """
        Create a new message decoder.

        Args:
            stream (bytes): Binary data to decode
        """

    def read_var_uint(self) -> int:
        """
        Read a variable-length unsigned integer.

        Returns:
            int: Decoded integer
        """

    def read_message(self) -> bytes | None:
        """
        Read a single message from the stream.

        Returns:
            bytes | None: Message data or None if no more messages
        """

    def read_messages(self) -> Iterator[bytes]:
        """
        Read all messages from the stream.

        Returns:
            Iterator[bytes]: Iterator over message data, one item per message
        """

    def read_var_string(self) -> str:
        """
        Read a variable-length string.

        Returns:
            str: Decoded string
        """

Channel Protocol

Transport abstraction for document synchronization.

class Channel(Protocol):
    """Abstract interface for transport-agnostic synchronization channels.

    This is a structural protocol: any object providing ``path``, ``send``,
    ``recv`` and the async-iterator methods can be used as a channel, without
    inheriting from this class.
    """

    @property
    def path(self) -> str:
        """Get the channel path identifier."""

    async def send(self, message: bytes) -> None:
        """
        Send a message through the channel.

        Args:
            message (bytes): Binary message to send
        """

    async def recv(self) -> bytes:
        """
        Receive a message from the channel.

        Returns:
            bytes: Received binary message
        """

    def __aiter__(self) -> "Channel":
        """Return async iterator for receiving messages."""

    async def __anext__(self) -> bytes:
        """Get next message from async iterator."""

Provider

Document synchronization provider that manages network communication.

class Provider:
    """Document synchronization provider that manages network communication.

    Can be driven explicitly with ``start()`` / ``stop()`` or used as an
    async context manager.
    """

    def __init__(self, doc: Doc, channel: Channel, log: Logger | None = None) -> None:
        """
        Create a new synchronization provider.

        Args:
            doc (Doc): Document to synchronize
            channel (Channel): Transport channel for communication
            log (Logger, optional): Logger for debugging
        """

    @property
    def started(self) -> Event:
        """Event that signals when the provider is ready."""

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
        """
        Start the synchronization provider.

        Args:
            task_status: Optional task status for structured concurrency
        """

    async def stop(self) -> None:
        """Stop the synchronization provider."""

    # The return annotation is quoted because the class name is not bound yet
    # while the class body is being executed.
    async def __aenter__(self) -> "Provider":
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        """Exit async context manager."""

Synchronization Functions

Message Creation Functions

def write_var_uint(num: int) -> bytes:
    """
    Encode a variable-length unsigned integer.

    Stand-alone counterpart of ``Encoder.write_var_uint`` that returns the
    encoded bytes instead of appending them to an encoder.

    Args:
        num (int): Integer to encode

    Returns:
        bytes: Encoded integer data
    """

def create_awareness_message(data: bytes) -> bytes:
    """
    Create an awareness protocol message.

    Args:
        data (bytes): Awareness data to wrap

    Returns:
        bytes: Complete awareness message
            (presumably tagged with YMessageType.AWARENESS -- confirm)
    """

def create_sync_message(ydoc: Doc) -> bytes:
    """
    Create a synchronization message from document state.

    This is the message that opens a sync exchange (presumably a SYNC_STEP1
    message -- confirm against the implementation).

    Args:
        ydoc (Doc): Document to create sync message for

    Returns:
        bytes: Sync message containing document state
    """

def create_update_message(data: bytes) -> bytes:
    """
    Create an update message containing document changes.

    Args:
        data (bytes): Update data to wrap, e.g. the result of
            ``doc.get_update(other_state)``

    Returns:
        bytes: Complete update message
    """

def handle_sync_message(message: bytes, ydoc: Doc) -> bytes | None:
    """
    Handle an incoming synchronization message.

    NOTE(review): the upstream implementation appears to expect the sync
    payload *without* the leading YMessageType byte that the create_*_message
    helpers prepend -- confirm before passing a full wire message.

    Args:
        message (bytes): Incoming sync message
        ydoc (Doc): Document to apply sync message to

    Returns:
        bytes | None: Response message if needed, or None
    """

def read_message(stream: bytes) -> bytes:
    """
    Read a single message from binary stream.

    Presumably the inverse of ``write_message`` -- confirm.

    Args:
        stream (bytes): Binary stream containing message

    Returns:
        bytes: Extracted message data
    """

def write_message(stream: bytes) -> bytes:
    """
    Write a message to binary stream format.

    Presumably the inverse of ``read_message`` -- confirm.

    Args:
        stream (bytes): Message data to wrap

    Returns:
        bytes: Formatted message stream
    """

Update Utilities

Standalone functions for working with document states and updates.

def get_state(update: bytes) -> bytes:
    """
    Extract the state vector from an update.

    Works on raw update bytes; no ``Doc`` instance is involved.

    Args:
        update (bytes): Binary update data

    Returns:
        bytes: State vector representing the document state
    """

def get_update(update: bytes, state: bytes) -> bytes:
    """
    Generate an update containing changes since a given state.

    Works on raw update bytes; no ``Doc`` instance is involved.

    Args:
        update (bytes): Source update data
        state (bytes): State to compare against (a state vector, e.g. from
            ``get_state``)

    Returns:
        bytes: Binary update containing incremental changes
    """

def merge_updates(*updates: bytes) -> bytes:
    """
    Merge multiple updates into a single consolidated update.

    Args:
        *updates (bytes): Any number of binary updates, passed as separate
            positional arguments

    Returns:
        bytes: Merged update containing all changes

    Example:
        >>> update1 = doc1.get_update()
        >>> update2 = doc2.get_update()
        >>> merged = merge_updates(update1, update2)
        >>> doc3.apply_update(merged)  # Apply all changes at once
    """

Usage Examples

Basic Message Encoding/Decoding

from pycrdt import Encoder, Decoder, write_var_uint

# Encode messages
# Write a number followed by a string into one buffer
enc = Encoder()
enc.write_var_uint(42)
enc.write_var_string("Hello, world!")
payload = enc.to_bytes()
print(f"Encoded data: {payload.hex()}")

# Read the values back in the same order they were written
dec = Decoder(payload)
number = dec.read_var_uint()
text = dec.read_var_string()
print(f"Decoded number: {number}")  # 42
print(f"Decoded text: {text}")      # "Hello, world!"

# The module-level helper encodes a single integer without an Encoder
print(f"Variable uint encoding: {write_var_uint(1000).hex()}")

Document Synchronization Protocol

from pycrdt import (
    Doc,
    Text,
    create_sync_message,
    create_update_message,
    handle_sync_message,
)

# Create two documents for sync simulation
doc1 = Doc(client_id=1)
doc2 = Doc(client_id=2)

# Add content to doc1
text1 = doc1.get("content", type=Text)
with doc1.transaction():
    text1.insert(0, "Hello from doc1")

# NOTE: the create_*_message() helpers return a complete wire message whose
# first byte is YMessageType.SYNC, while handle_sync_message() works on the
# sync payload that follows that byte -- hence the [1:] slices below.

# Step 1: doc2 announces what it already has (SYNC_STEP1 carries its state)
sync_message = create_sync_message(doc2)
print(f"Initial sync message: {len(sync_message)} bytes")

# Step 2: doc1 answers with the updates doc2 is missing (SYNC_STEP2)
response = handle_sync_message(sync_message[1:], doc1)
if response:
    print(f"Sync response: {len(response)} bytes")

    # Step 3: doc2 applies the reply; a SYNC_STEP2 normally needs no answer
    final_response = handle_sync_message(response[1:], doc2)
    if final_response:
        print(f"Final response: {len(final_response)} bytes")

# Verify synchronization
text2 = doc2.get("content", type=Text)
print(f"Doc1 content: {str(text1)}")
print(f"Doc2 content: {str(text2)}")

# Create update message for incremental sync
with doc1.transaction():
    text1.insert(len(text1), " - updated!")

# Only the changes doc2 does not have yet are included in the update
update_data = doc1.get_update(doc2.get_state())
update_message = create_update_message(update_data)

# Apply update to doc2
handle_sync_message(update_message[1:], doc2)
print(f"Updated doc2 content: {str(text2)}")

Custom Channel Implementation

import asyncio
from pycrdt import Channel

class InMemoryChannel:
    """Simple in-memory channel for testing.

    Each channel owns an inbox queue. After ``connect_to()`` links two
    channels, ``send()`` on one puts the message into the other's inbox and
    ``recv()`` reads from the channel's own inbox.
    """

    def __init__(self, path: str) -> None:
        """
        Create an unconnected channel.

        Args:
            path (str): Channel path identifier
        """
        self._path = path
        # Messages addressed to this channel
        self._queue: asyncio.Queue[bytes] = asyncio.Queue()
        # Inbox of the connected peer; None until connect_to() is called
        self._peer_queue: asyncio.Queue[bytes] | None = None

    @property
    def path(self) -> str:
        """Get the channel path identifier."""
        return self._path

    def connect_to(self, other_channel: "InMemoryChannel") -> None:
        """Connect two channels for bidirectional communication."""
        self._peer_queue = other_channel._queue
        other_channel._peer_queue = self._queue

    async def send(self, message: bytes) -> None:
        """
        Deliver a message to the connected peer's inbox.

        A message sent before ``connect_to()`` has been called is dropped.

        Args:
            message (bytes): Binary message to send
        """
        # Explicit None check: the intent is "is a peer connected?", not
        # "is the peer queue truthy?".
        if self._peer_queue is not None:
            await self._peer_queue.put(message)

    async def recv(self) -> bytes:
        """Wait for and return the next message from this channel's inbox."""
        return await self._queue.get()

    def __aiter__(self) -> "InMemoryChannel":
        """Return self; the channel is its own async iterator."""
        return self

    async def __anext__(self) -> bytes:
        """Return the next received message (never raises StopAsyncIteration)."""
        return await self.recv()

# Example usage
async def test_custom_channel():
    """Exchange one message in each direction over a connected channel pair."""
    first = InMemoryChannel("/doc1")
    second = InMemoryChannel("/doc2")
    first.connect_to(second)

    # first -> second
    await first.send(b"Hello from channel1")
    print(f"Received: {await second.recv()}")

    # second -> first
    await second.send(b"Hello back from channel2")
    print(f"Response: {await first.recv()}")

# Run the test
asyncio.run(test_custom_channel())

Provider-Based Synchronization

import asyncio
import anyio
from pycrdt import Doc, Provider, Text

class WebSocketChannel:
    """WebSocket-like channel implementation.

    Simulates a connection in memory: ``send()`` places the message directly
    into the connected peer's receive queue.
    """

    def __init__(self, path: str) -> None:
        """
        Create an unconnected channel.

        Args:
            path (str): Channel path identifier
        """
        self._path = path
        # Messages received from the peer. (A separate send queue is not
        # needed because send() delivers straight to the peer.)
        self._recv_queue: asyncio.Queue[bytes] = asyncio.Queue()
        # The other end of the simulated connection; None until connect_to()
        self._peer: WebSocketChannel | None = None

    @property
    def path(self) -> str:
        """Get the channel path identifier."""
        return self._path

    def connect_to(self, peer: "WebSocketChannel") -> None:
        """Simulate WebSocket connection to peer."""
        self._peer = peer
        peer._peer = self

    async def send(self, message: bytes) -> None:
        """
        Deliver a message to the connected peer.

        A message sent before ``connect_to()`` has been called is dropped.

        Args:
            message (bytes): Binary message to send
        """
        if self._peer is not None:
            await self._peer._recv_queue.put(message)

    async def recv(self) -> bytes:
        """Wait for and return the next message sent by the peer."""
        return await self._recv_queue.get()

    def __aiter__(self) -> "WebSocketChannel":
        """Return self; the channel is its own async iterator."""
        return self

    async def __anext__(self) -> bytes:
        """Return the next received message (never raises StopAsyncIteration)."""
        return await self.recv()

async def collaborative_editing_example():
    """Example of collaborative editing using providers.

    Two documents are each attached to a provider; the providers talk to each
    other over a connected pair of WebSocketChannel objects. Each client
    inserts text into the shared "content" text, and both views are printed
    after giving the providers time to exchange updates.

    The fixed sleeps only give the background provider tasks time to run;
    they are not a synchronization guarantee.
    """
    
    # Create documents and channels
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)
    
    # Both channels use the same path and are linked to each other
    channel1 = WebSocketChannel("/doc/shared")
    channel2 = WebSocketChannel("/doc/shared")
    channel1.connect_to(channel2)
    
    # Create providers
    provider1 = Provider(doc1, channel1)
    provider2 = Provider(doc2, channel2)
    
    async with anyio.create_task_group() as tg:
        # Start providers in the background (scheduled, not awaited) so this
        # coroutine can keep going while they run
        tg.start_soon(provider1.start)
        tg.start_soon(provider2.start)
        
        # Wait for providers to be ready
        await provider1.started.wait()
        await provider2.started.wait()
        
        # Get shared text objects
        text1 = doc1.get("content", type=Text) 
        text2 = doc2.get("content", type=Text)
        
        # Client 1 makes changes
        await anyio.sleep(0.1)  # Allow sync to happen
        with doc1.transaction(origin="client1"):
            text1.insert(0, "Hello from client 1! ")
        
        # Client 2 makes concurrent changes  
        await anyio.sleep(0.1)
        with doc2.transaction(origin="client2"):
            text2.insert(0, "Hi from client 2! ")
        
        # Allow synchronization to complete
        await anyio.sleep(0.2)
        
        print(f"Client 1 sees: {str(text1)}")
        print(f"Client 2 sees: {str(text2)}")
        
        # Stop providers (presumably this ends the start() tasks so the task
        # group can exit -- confirm against Provider.stop)
        await provider1.stop()
        await provider2.stop()

# Run collaborative editing example
asyncio.run(collaborative_editing_example())

Message Stream Processing

from pycrdt import Decoder, Encoder, YMessageType, YSyncMessageType, write_message

def process_message_stream(stream: bytes):
    """Process a stream of multiple messages.

    Args:
        stream (bytes): Concatenated messages, each framed so that
            ``Decoder.read_messages()`` can split them (see ``write_message``)
    """
    decoder = Decoder(stream)
    
    for message_data in decoder.read_messages():
        # Each message starts with message type
        msg_decoder = Decoder(message_data)
        msg_type = msg_decoder.read_var_uint()
        
        if msg_type == YMessageType.SYNC:
            # Process sync message
            sync_type = msg_decoder.read_var_uint()
            
            if sync_type == YSyncMessageType.SYNC_STEP1:
                print("Received sync step 1 (state vector)")
            elif sync_type == YSyncMessageType.SYNC_STEP2:
                print("Received sync step 2 (missing updates)")
            elif sync_type == YSyncMessageType.SYNC_UPDATE:
                print("Received sync update")
                
        elif msg_type == YMessageType.AWARENESS:
            print("Received awareness message")

# Example message stream
#
# Every message is encoded on its own and then framed with write_message(),
# so the decoder can tell where one message ends and the next begins.

# Create sync step 1 message
sync_encoder = Encoder()
sync_encoder.write_var_uint(YMessageType.SYNC)
sync_encoder.write_var_uint(YSyncMessageType.SYNC_STEP1)
sync_encoder.write_var_string("state_vector_data")

# Create awareness message
awareness_encoder = Encoder()
awareness_encoder.write_var_uint(YMessageType.AWARENESS)
awareness_encoder.write_var_string("awareness_data")

message_stream = (
    write_message(sync_encoder.to_bytes())
    + write_message(awareness_encoder.to_bytes())
)
process_message_stream(message_stream)

Robust Synchronization with Error Handling

import asyncio
import logging

from pycrdt import Channel, Doc, Provider, Text

logger = logging.getLogger(__name__)

class ReliableChannel:
    """Channel with automatic reconnection and error handling.

    The network is simulated: sent messages are stored in an internal queue
    and ``recv()`` reads them back from that same queue.
    """

    def __init__(self, path: str, max_retries: int = 3, retry_delay: float = 1.0):
        """
        Create a disconnected channel.

        Args:
            path (str): Channel path identifier
            max_retries (int): Maximum number of send attempts; at least one
                attempt is always made
            retry_delay (float): Base delay in seconds between attempts
        """
        self._path = path
        self._max_retries = max_retries
        self._retry_delay = retry_delay
        self._connected = False
        self._queue = asyncio.Queue()

    @property
    def path(self) -> str:
        """Get the channel path identifier."""
        return self._path

    async def send(self, message: bytes) -> None:
        """
        Send a message, reconnecting and retrying on failure.

        Args:
            message (bytes): Binary message to send

        Raises:
            Exception: The error of the last attempt, once ``max_retries``
                attempts have failed
        """
        attempt = 0
        # Loop until success or the final failure is re-raised; this always
        # makes at least one attempt, even if max_retries is 0 or negative.
        while True:
            try:
                if not self._connected:
                    await self._reconnect()

                # Simulate network send
                await self._send_impl(message)
                return

            except Exception as e:
                attempt += 1
                # Treat a failed send like a failed receive: assume the
                # connection is gone so the next attempt reconnects first.
                self._connected = False
                logger.warning(f"Send failed (attempt {attempt}): {e}")
                if attempt >= self._max_retries:
                    raise
                # Exponential backoff: delay, 2*delay, 4*delay, ...
                await asyncio.sleep(self._retry_delay * 2 ** (attempt - 1))

    async def recv(self) -> bytes:
        """
        Receive a message, reconnecting and retrying indefinitely on failure.

        Returns:
            bytes: Received binary message
        """
        while True:
            try:
                if not self._connected:
                    await self._reconnect()

                return await self._recv_impl()

            except Exception as e:
                logger.warning(f"Receive failed: {e}")
                self._connected = False
                await asyncio.sleep(self._retry_delay)

    async def _reconnect(self):
        """Simulate reconnection logic."""
        logger.info(f"Reconnecting to {self._path}")
        await asyncio.sleep(0.1)  # Simulate connection time
        self._connected = True

    async def _send_impl(self, message: bytes):
        """Simulate actual network send."""
        if not self._connected:
            raise ConnectionError("Not connected")
        # Store in queue for this example
        await self._queue.put(message)

    async def _recv_impl(self) -> bytes:
        """Simulate actual network receive."""
        if not self._connected:
            raise ConnectionError("Not connected")
        return await self._queue.get()

    def __aiter__(self):
        """Return self; the channel is its own async iterator."""
        return self

    async def __anext__(self) -> bytes:
        """Return the next received message."""
        return await self.recv()

async def robust_sync_example():
    """Example with error handling and reconnection.

    Runs a provider over a ReliableChannel inside ``async with`` so the
    provider is cleaned up even if the body fails, and logs any error that
    escapes instead of letting it propagate.
    """
    doc = Doc()
    channel = ReliableChannel("/robust/sync")
    
    # Create provider with logging
    logging.basicConfig(level=logging.INFO)
    provider = Provider(doc, channel, log=logger)
    
    try:
        async with provider:  # Use context manager for cleanup
            text = doc.get("content", type=Text)
            
            # Make changes
            with doc.transaction():
                text.insert(0, "Robust sync test")
            
            # Simulate some work
            await asyncio.sleep(1.0)
            
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error would drop
        logger.exception(f"Sync failed: {e}")

# Run robust sync example
asyncio.run(robust_sync_example())

Performance Monitoring

import time
from pycrdt import Doc, Text, create_sync_message, handle_sync_message

def benchmark_sync_performance():
    """Benchmark synchronization performance.

    For each content size, doc1 is filled with text and an empty doc2 is
    brought up to date with it. Three phases are timed separately: creating
    doc2's sync request, computing doc1's reply, and applying that reply to
    doc2.
    """
    
    # Create documents with different amounts of content
    sizes = [100, 1000, 10000]
    
    for size in sizes:
        doc1 = Doc(client_id=1)
        doc2 = Doc(client_id=2)
        
        text1 = doc1.get("content", type=Text)
        
        # Generate content
        content = "x" * size
        with doc1.transaction():
            text1.insert(0, content)
        
        # time.perf_counter() is used instead of time.time(): it is monotonic
        # and has the highest available resolution for short intervals.

        # Measure sync message creation (doc2 announces its state)
        start_time = time.perf_counter()
        sync_msg = create_sync_message(doc2)
        create_time = time.perf_counter() - start_time
        
        # Measure sync message handling (doc1 computes what doc2 is missing).
        # handle_sync_message() works on the payload after the leading
        # YMessageType byte, hence the [1:] slice.
        start_time = time.perf_counter()
        response = handle_sync_message(sync_msg[1:], doc1)
        handle_time = time.perf_counter() - start_time
        
        # Measure final sync (doc2 applies the reply)
        start_time = time.perf_counter()
        if response:
            handle_sync_message(response[1:], doc2)
        complete_time = time.perf_counter() - start_time
        
        print(f"Size {size}:")
        print(f"  Message size: {len(sync_msg)} bytes")
        print(f"  Response size: {len(response) if response else 0} bytes")
        print(f"  Create time: {create_time:.4f}s")
        print(f"  Handle time: {handle_time:.4f}s")
        print(f"  Complete time: {complete_time:.4f}s")
        print()

benchmark_sync_performance()

Error Handling

import asyncio

from pycrdt import Doc, Provider, Encoder, Decoder

async def sync_with_error_handling():
    """Example of proper error handling in synchronization.

    Shows three independent failure modes, each caught and reported on its
    own: encoding an invalid value, decoding malformed data, and running a
    provider over a channel whose transport always fails.
    """
    
    doc = Doc()
    
    try:
        # Encoding errors
        encoder = Encoder()
        encoder.write_var_uint(-1)  # May raise ValueError for negative numbers
        
    except ValueError as e:
        print(f"Encoding error: {e}")
    
    try:
        # Decoding errors: 0x80 has the "more bytes follow" bit set but
        # nothing follows, so the var-uint is truncated.
        invalid_data = b"\x80"
        decoder = Decoder(invalid_data)
        decoder.read_var_uint()  # May raise decoding error
        
    except Exception as e:
        print(f"Decoding error: {e}")
    
    try:
        # Provider errors
        class FailingChannel:
            """Channel whose every transport operation fails."""
            path = "/fail"
            async def send(self, msg): raise ConnectionError("Network down")
            async def recv(self): raise ConnectionError("Network down")
            def __aiter__(self): return self
            async def __anext__(self): raise ConnectionError("Network down")
        
        provider = Provider(doc, FailingChannel())
        await provider.start()
        
    except Exception as e:
        print(f"Provider error: {e}")

# Run the error handling example
asyncio.run(sync_with_error_handling())

Install with Tessl CLI

npx tessl i tessl/pypi-pycrdt

docs

array-operations.md

awareness.md

document-management.md

index.md

map-operations.md

position-undo.md

synchronization.md

text-operations.md

xml-support.md

tile.json