Python bindings for the Yrs CRDT library, providing collaborative data structures for real-time synchronization.
—
pycrdt provides comprehensive synchronization capabilities for sharing documents across multiple clients in real-time. The synchronization system includes message encoding/decoding, sync protocols, network providers, and transport abstractions. This enables building collaborative applications with automatic conflict resolution and efficient network communication.
class YMessageType(IntEnum):
    """Top-level message types for network protocol."""

    SYNC = 0  # followed by a YSyncMessageType sub-type
    AWARENESS = 1
class YSyncMessageType(IntEnum):
    """Sync-specific message types."""

    SYNC_STEP1 = 0  # Send document state
    SYNC_STEP2 = 1  # Reply with missing updates
    SYNC_UPDATE = 2  # Document update broadcast


# Binary message encoder for network protocols.
class Encoder:
    """Binary message encoder for network protocols."""

    def __init__(self) -> None:
        """Create a new message encoder."""

    def write_var_uint(self, num: int) -> None:
        """
        Write a variable-length unsigned integer.

        Args:
            num (int): Integer to encode
        """

    def write_var_string(self, text: str) -> None:
        """
        Write a variable-length string.

        Args:
            text (str): String to encode
        """

    def to_bytes(self) -> bytes:
        """
        Get the encoded data as bytes.

        Returns:
            bytes: Encoded message data
        """


# Binary message decoder for network protocols.
class Decoder:
def __init__(self, stream: bytes) -> None:
"""
Create a new message decoder.
Args:
stream (bytes): Binary data to decode
"""
def read_var_uint(self) -> int:
"""
Read a variable-length unsigned integer.
Returns:
int: Decoded integer
"""
def read_message(self) -> bytes | None:
"""
Read a single message from the stream.
Returns:
bytes | None: Message data or None if no more messages
"""
def read_messages(self) -> Iterator[bytes]:
"""
Read all messages from the stream.
Returns:
Iterator[bytes]: Iterator over message data
"""
def read_var_string(self) -> str:
"""
Read a variable-length string.
Returns:
str: Decoded string
"""Transport abstraction for document synchronization.
class Channel(Protocol):
    """Abstract interface for transport-agnostic synchronization channels."""

    @property
    def path(self) -> str:
        """Get the channel path identifier."""

    async def send(self, message: bytes) -> None:
        """
        Send a message through the channel.

        Args:
            message (bytes): Binary message to send
        """

    async def recv(self) -> bytes:
        """
        Receive a message from the channel.

        Returns:
            bytes: Received binary message
        """

    def __aiter__(self) -> "Channel":
        """Return async iterator for receiving messages."""

    async def __anext__(self) -> bytes:
        """Get next message from async iterator."""


# Document synchronization provider that manages network communication.
class Provider:
    """Document synchronization provider that manages network communication."""

    def __init__(self, doc: Doc, channel: Channel, log: Logger | None = None) -> None:
        """
        Create a new synchronization provider.

        Args:
            doc (Doc): Document to synchronize
            channel (Channel): Transport channel for communication
            log (Logger, optional): Logger for debugging
        """

    @property
    def started(self) -> Event:
        """Event that signals when the provider is ready."""

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
        """
        Start the synchronization provider.

        Args:
            task_status: Optional task status for structured concurrency
        """

    async def stop(self) -> None:
        """Stop the synchronization provider."""

    # Quoted: the class name is not bound yet while its body is executing.
    async def __aenter__(self) -> "Provider":
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        """Exit async context manager."""


def write_var_uint(num: int) -> bytes:
    """
    Encode a variable-length unsigned integer.

    Args:
        num (int): Integer to encode

    Returns:
        bytes: Encoded integer data
    """
def create_awareness_message(data: bytes) -> bytes:
    """
    Create an awareness protocol message.

    Args:
        data (bytes): Awareness data to wrap

    Returns:
        bytes: Complete awareness message
    """
def create_sync_message(ydoc: Doc) -> bytes:
    """
    Create a synchronization message from document state.

    Args:
        ydoc (Doc): Document to create sync message for

    Returns:
        bytes: Sync message containing document state
    """
def create_update_message(data: bytes) -> bytes:
    """
    Create an update message containing document changes.

    Args:
        data (bytes): Update data to wrap

    Returns:
        bytes: Complete update message
    """
def handle_sync_message(message: bytes, ydoc: Doc) -> bytes | None:
    """
    Handle an incoming synchronization message.

    Args:
        message (bytes): Incoming sync message
        ydoc (Doc): Document to apply sync message to

    Returns:
        bytes | None: Response message if needed, or None

    NOTE(review): pycrdt's own Provider passes ``message[1:]`` here, i.e. the
    message with its leading ``YMessageType.SYNC`` byte removed — confirm
    against the library before passing whole messages.
    """
def read_message(stream: bytes) -> bytes:
    """
    Read a single message from a binary stream.

    Args:
        stream (bytes): Binary stream containing the message

    Returns:
        bytes: Extracted message data
    """
def write_message(stream: bytes) -> bytes:
    """
    Write a message in binary stream format.

    Args:
        stream (bytes): Message data to wrap

    Returns:
        bytes: Formatted message stream
    """


# Standalone functions for working with document states and updates.
def get_state(update: bytes) -> bytes:
    """
    Extract the state vector from an update.

    Args:
        update (bytes): Binary update data

    Returns:
        bytes: State vector representing the document state
    """
def get_update(update: bytes, state: bytes) -> bytes:
    """
    Generate an update containing the changes since a given state.

    Args:
        update (bytes): Source update data
        state (bytes): State to compare against

    Returns:
        bytes: Binary update containing incremental changes
    """
def merge_updates(*updates: bytes) -> bytes:
    """
    Merge multiple updates into a single consolidated update.

    Args:
        *updates (bytes): Variable number of update data streams

    Returns:
        bytes: Merged update containing all changes

    Example:
        >>> update1 = doc1.get_update()
        >>> update2 = doc2.get_update()
        >>> merged = merge_updates(update1, update2)
        >>> doc3.apply_update(merged)  # Apply all changes at once
    """


from pycrdt import Encoder, Decoder, write_var_uint
# Encode messages
encoder = Encoder()
encoder.write_var_uint(42)
encoder.write_var_string("Hello, world!")
encoded_data = encoder.to_bytes()
print(f"Encoded data: {encoded_data.hex()}")

# Decode messages: values are read back in the same order they were written
decoder = Decoder(encoded_data)
number = decoder.read_var_uint()
text = decoder.read_var_string()
print(f"Decoded number: {number}")  # 42
print(f"Decoded text: {text}")  # "Hello, world!"

# Direct variable-length integer encoding
uint_bytes = write_var_uint(1000)
print(f"Variable uint encoding: {uint_bytes.hex()}")

from pycrdt import Doc, create_sync_message, handle_sync_message, create_update_message
from pycrdt import Text  # used below but missing from the preceding import line

# Create two documents for sync simulation
doc1 = Doc(client_id=1)
doc2 = Doc(client_id=2)

# Add content to doc1
text1 = doc1.get("content", type=Text)
with doc1.transaction():
    text1.insert(0, "Hello from doc1")


def pull_missing_updates(requester: Doc, responder: Doc) -> None:
    """Bring *requester* up to date with *responder* via one sync handshake.

    The requester sends SYNC_STEP1 (its document state); the responder replies
    with SYNC_STEP2 (the updates the requester is missing), which the requester
    then applies. One handshake only moves data toward the requester, so a full
    two-way sync runs it in both directions.
    """
    # Step 1: create the requester's state message
    sync_message = create_sync_message(requester)
    print(f"Initial sync message: {len(sync_message)} bytes")
    # Step 2: the responder answers with what the requester lacks.
    # NOTE(review): pycrdt's Provider strips the leading YMessageType.SYNC byte
    # before calling handle_sync_message; the same is done here.
    response = handle_sync_message(sync_message[1:], responder)
    if response:
        print(f"Sync response: {len(response)} bytes")
        # Step 3: the requester applies the reply
        final_response = handle_sync_message(response[1:], requester)
        if final_response:
            print(f"Final response: {len(final_response)} bytes")


# Sync both ways so each document receives the other's content
pull_missing_updates(doc2, doc1)
pull_missing_updates(doc1, doc2)

# Verify synchronization
text2 = doc2.get("content", type=Text)
print(f"Doc1 content: {str(text1)}")
print(f"Doc2 content: {str(text2)}")

# Create update message for incremental sync
with doc1.transaction():
    text1.insert(len(text1), " - updated!")
update_data = doc1.get_update(doc2.get_state())
update_message = create_update_message(update_data)

# Apply update to doc2 (again without the leading YMessageType.SYNC byte)
handle_sync_message(update_message[1:], doc2)
print(f"Updated doc2 content: {str(text2)}")

import asyncio
from pycrdt import Channel


class InMemoryChannel:
    """Simple in-memory channel for testing.

    Structurally matches the ``Channel`` protocol: ``path``, ``send``,
    ``recv`` and async iteration.
    """

    def __init__(self, path: str):
        self._path = path
        self._queue = asyncio.Queue()  # inbox: messages delivered to this channel
        self._peer_queue = None  # the peer's inbox, set by connect_to()

    @property
    def path(self) -> str:
        return self._path

    def connect_to(self, other_channel: "InMemoryChannel") -> None:
        """Connect two channels for bidirectional communication."""
        self._peer_queue = other_channel._queue
        other_channel._peer_queue = self._queue

    async def send(self, message: bytes) -> None:
        # Fail loudly rather than dropping the message: a dropped message
        # would leave the intended receiver blocked in recv() forever.
        if self._peer_queue is None:
            raise ConnectionError(f"channel {self._path!r} is not connected")
        await self._peer_queue.put(message)

    async def recv(self) -> bytes:
        return await self._queue.get()

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        return await self.recv()


# Example usage
async def test_custom_channel():
    channel1 = InMemoryChannel("/doc1")
    channel2 = InMemoryChannel("/doc2")
    channel1.connect_to(channel2)

    # Send message from channel1 to channel2
    await channel1.send(b"Hello from channel1")
    message = await channel2.recv()
    print(f"Received: {message}")

    # Send response back
    await channel2.send(b"Hello back from channel2")
    response = await channel1.recv()
    print(f"Response: {response}")


# Run the test
asyncio.run(test_custom_channel())

import asyncio
import anyio
from pycrdt import Doc, Provider, Text


class WebSocketChannel:
    """WebSocket-like channel implementation (in-memory simulation)."""

    def __init__(self, path: str):
        self._path = path
        self._recv_queue = asyncio.Queue()  # inbox filled by the peer's send()
        self._peer = None  # set by connect_to()

    @property
    def path(self) -> str:
        return self._path

    def connect_to(self, peer: "WebSocketChannel") -> None:
        """Simulate WebSocket connection to peer."""
        self._peer = peer
        peer._peer = self

    async def send(self, message: bytes) -> None:
        # Raise instead of silently dropping: the peer would otherwise wait forever.
        if self._peer is None:
            raise ConnectionError(f"channel {self._path!r} is not connected")
        await self._peer._recv_queue.put(message)

    async def recv(self) -> bytes:
        return await self._recv_queue.get()

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        return await self.recv()


async def collaborative_editing_example():
    """Example of collaborative editing using providers."""
    # Create documents and channels
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)
    channel1 = WebSocketChannel("/doc/shared")
    channel2 = WebSocketChannel("/doc/shared")
    channel1.connect_to(channel2)

    # Create providers
    provider1 = Provider(doc1, channel1)
    provider2 = Provider(doc2, channel2)

    async with anyio.create_task_group() as tg:
        # Start providers
        tg.start_soon(provider1.start)
        tg.start_soon(provider2.start)

        # Wait for providers to be ready
        await provider1.started.wait()
        await provider2.started.wait()

        # Get shared text objects
        text1 = doc1.get("content", type=Text)
        text2 = doc2.get("content", type=Text)

        # Client 1 makes changes
        await anyio.sleep(0.1)  # Allow sync to happen
        with doc1.transaction(origin="client1"):
            text1.insert(0, "Hello from client 1! ")

        # Client 2 makes concurrent changes
        await anyio.sleep(0.1)
        with doc2.transaction(origin="client2"):
            text2.insert(0, "Hi from client 2! ")

        # Allow synchronization to complete
        await anyio.sleep(0.2)
        print(f"Client 1 sees: {str(text1)}")
        print(f"Client 2 sees: {str(text2)}")

        # Stop providers so the task group can exit
        await provider1.stop()
        await provider2.stop()


# Run collaborative editing example
asyncio.run(collaborative_editing_example())

from pycrdt import Decoder, YMessageType, YSyncMessageType
from pycrdt import Encoder, write_message  # Encoder was used below without an import


def process_message_stream(stream: bytes) -> None:
    """Process a stream of multiple messages.

    Each message extracted by ``Decoder.read_messages`` is decoded on its own:
    first its top-level ``YMessageType``, then, for sync messages, its
    ``YSyncMessageType``.
    """
    decoder = Decoder(stream)
    for message_data in decoder.read_messages():
        # Each message starts with message type
        msg_decoder = Decoder(message_data)
        msg_type = msg_decoder.read_var_uint()
        if msg_type == YMessageType.SYNC:
            # Process sync message
            sync_type = msg_decoder.read_var_uint()
            if sync_type == YSyncMessageType.SYNC_STEP1:
                print("Received sync step 1 (state vector)")
            elif sync_type == YSyncMessageType.SYNC_STEP2:
                print("Received sync step 2 (missing updates)")
            elif sync_type == YSyncMessageType.SYNC_UPDATE:
                print("Received sync update")
        elif msg_type == YMessageType.AWARENESS:
            print("Received awareness message")


# Example message stream. Each message is encoded separately and then wrapped
# with write_message, so that read_messages can split the stream back into
# individual messages (writing everything into one Encoder leaves no message
# boundaries for read_messages to find).
sync_step1 = Encoder()
sync_step1.write_var_uint(YMessageType.SYNC)
sync_step1.write_var_uint(YSyncMessageType.SYNC_STEP1)
sync_step1.write_var_string("state_vector_data")

awareness = Encoder()
awareness.write_var_uint(YMessageType.AWARENESS)
awareness.write_var_string("awareness_data")

message_stream = write_message(sync_step1.to_bytes()) + write_message(awareness.to_bytes())
process_message_stream(message_stream)

import asyncio
import logging
from pycrdt import Channel, Doc, Provider, Text  # Text was used but not imported

logger = logging.getLogger(__name__)


class ReliableChannel:
    """Channel with automatic reconnection and error handling.

    This is a simulation: send() puts messages on the same queue that recv()
    reads from, so every message loops back to this same channel.
    """

    def __init__(self, path: str, max_retries: int = 3):
        if max_retries < 1:
            # With zero attempts send() would return without sending anything.
            raise ValueError("max_retries must be at least 1")
        self._path = path
        self._max_retries = max_retries
        self._connected = False
        self._queue = asyncio.Queue()

    @property
    def path(self) -> str:
        return self._path

    async def send(self, message: bytes) -> None:
        """Send *message*, reconnecting and retrying with exponential backoff.

        Raises the last error once ``max_retries`` attempts have failed.
        """
        for attempt in range(1, self._max_retries + 1):
            try:
                if not self._connected:
                    await self._reconnect()
                # Simulate network send
                await self._send_impl(message)
                return
            except Exception as e:
                logger.warning("Send failed (attempt %d): %s", attempt, e)
                # Force a reconnect on the next attempt, as recv() does.
                self._connected = False
                if attempt >= self._max_retries:
                    raise
                await asyncio.sleep(2 ** (attempt - 1))  # Exponential backoff: 1s, 2s, 4s, ...

    async def recv(self) -> bytes:
        """Receive a message, reconnecting after any failure (retries indefinitely)."""
        while True:
            try:
                if not self._connected:
                    await self._reconnect()
                return await self._recv_impl()
            except Exception as e:
                logger.warning("Receive failed: %s", e)
                self._connected = False
                await asyncio.sleep(1.0)

    async def _reconnect(self):
        """Simulate reconnection logic."""
        logger.info("Reconnecting to %s", self._path)
        await asyncio.sleep(0.1)  # Simulate connection time
        self._connected = True

    async def _send_impl(self, message: bytes):
        """Simulate actual network send."""
        if not self._connected:
            raise ConnectionError("Not connected")
        # Store in queue for this example
        await self._queue.put(message)

    async def _recv_impl(self) -> bytes:
        """Simulate actual network receive."""
        if not self._connected:
            raise ConnectionError("Not connected")
        return await self._queue.get()

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        return await self.recv()


async def robust_sync_example():
    """Example with error handling and reconnection."""
    logging.basicConfig(level=logging.INFO)
    doc = Doc()
    channel = ReliableChannel("/robust/sync")

    # Create provider with logging
    provider = Provider(doc, channel, log=logger)
    try:
        async with provider:  # Use context manager for cleanup
            text = doc.get("content", type=Text)
            # Make changes
            with doc.transaction():
                text.insert(0, "Robust sync test")
            # Simulate some work
            await asyncio.sleep(1.0)
    except Exception as e:
        # logger.exception also records the traceback
        logger.exception("Sync failed: %s", e)


# Run robust sync example
asyncio.run(robust_sync_example())

import time
from pycrdt import Doc, Text, create_sync_message, handle_sync_message


def benchmark_sync_performance(sizes=(100, 1000, 10000)):
    """Benchmark synchronizing a text document of several sizes.

    For each size, *source* holds the content and *target* starts empty. The
    target sends SYNC_STEP1 (its state). The source replies with SYNC_STEP2
    (the updates the target lacks), and the target applies it. The content
    therefore actually travels, so timings and sizes grow with *size*.

    Args:
        sizes: Number of characters inserted for each benchmark run.
    """
    for size in sizes:
        source = Doc(client_id=1)
        target = Doc(client_id=2)
        text = source.get("content", type=Text)

        # Generate content
        with source.transaction():
            text.insert(0, "x" * size)

        # perf_counter is monotonic and high-resolution, unlike time.time()
        # Measure sync message creation (the requesting side's state)
        start_time = time.perf_counter()
        sync_msg = create_sync_message(target)
        create_time = time.perf_counter() - start_time

        # Measure sync message handling; strip the leading YMessageType.SYNC byte
        # as pycrdt's Provider does (NOTE(review): confirm against the library)
        start_time = time.perf_counter()
        response = handle_sync_message(sync_msg[1:], source)
        handle_time = time.perf_counter() - start_time

        # Measure final sync (target applies the missing updates)
        start_time = time.perf_counter()
        if response is not None:
            handle_sync_message(response[1:], target)
        complete_time = time.perf_counter() - start_time

        response_size = len(response) if response is not None else 0
        print(f"Size {size}:")
        print(f" Message size: {len(sync_msg)} bytes")
        print(f" Response size: {response_size} bytes")
        print(f" Create time: {create_time:.4f}s")
        print(f" Handle time: {handle_time:.4f}s")
        print(f" Complete time: {complete_time:.4f}s")
        print()


benchmark_sync_performance()

from pycrdt import Doc, Provider, Encoder, Decoder
import asyncio


class FailingChannel:
    """Channel whose every operation fails, used to exercise Provider errors."""

    path = "/fail"

    async def send(self, msg):
        raise ConnectionError("Network down")

    async def recv(self):
        raise ConnectionError("Network down")

    def __aiter__(self):
        return self

    async def __anext__(self):
        raise ConnectionError("Network down")


async def sync_with_error_handling():
    """Example of proper error handling in synchronization."""
    doc = Doc()

    try:
        # Encoding errors
        encoder = Encoder()
        encoder.write_var_uint(-1)  # May raise ValueError for negative numbers
    except ValueError as e:
        print(f"Encoding error: {e}")

    try:
        # Decoding errors. b"invalid" is not a good demonstration: its first
        # byte (0x69) is below 0x80, so it is presumably a complete one-byte
        # varint and decodes without error. An empty stream has nothing to read.
        invalid_data = b""
        decoder = Decoder(invalid_data)
        decoder.read_var_uint()  # Expected to raise a decoding error
    except Exception as e:
        print(f"Decoding error: {e}")

    try:
        # Provider errors
        provider = Provider(doc, FailingChannel())
        await provider.start()
    except Exception as e:
        print(f"Provider error: {e}")


# Run the error handling example (previously defined but never executed)
asyncio.run(sync_with_error_handling())

# Install with Tessl CLI
npx tessl i tessl/pypi-pycrdt