Python bindings for Yrs CRDT library providing collaborative data structures for real-time synchronization.
—
Document management in pycrdt centers around the Doc class, which serves as the container for all collaborative data types. Documents provide transaction management, state tracking, and synchronization capabilities. The library supports both basic document operations and type-safe document variants for structured applications.
The main document container that holds collaborative data types and manages their synchronization.
class Doc:
def __init__(
self,
init: dict[str, T] = {},
*,
client_id: int | None = None,
doc: _Doc | None = None,
Model=None,
allow_multithreading: bool = False
) -> None:
"""
Create a new collaborative document.
Args:
init (dict): Initial data for the document
client_id (int, optional): Unique client identifier
doc (_Doc, optional): Existing native document instance
Model: Model class for typed documents
allow_multithreading (bool): Enable multithreading support
"""
@property
def guid(self) -> int:
"""Get the globally unique document identifier."""
@property
def client_id(self) -> int:
"""Get the client identifier for this document."""
def transaction(self, origin: Any = None) -> Transaction:
"""
Create a new read-write transaction context.
Args:
origin: Optional origin identifier for the transaction
Returns:
Transaction: Context manager for batched operations
"""
def new_transaction(self, origin: Any = None, timeout: float | None = None) -> NewTransaction:
"""
Create a new transaction with async support.
Args:
origin: Optional origin identifier
timeout (float, optional): Transaction timeout in seconds
Returns:
NewTransaction: Async-compatible transaction context
"""
def get_state(self) -> bytes:
"""
Get the current document state as binary data.
Returns:
bytes: Document state vector
"""
def get_update(self, state: bytes | None = None) -> bytes:
"""
Generate an update containing changes since the given state.
Args:
state (bytes, optional): Previous state to compare against
Returns:
bytes: Binary update data
"""
def apply_update(self, update: bytes) -> None:
"""
Apply an update to this document.
Args:
update (bytes): Binary update data to apply
"""
def get(self, key: str, *, type: type[T]) -> T:
"""
Get or create a shared data type at the given key.
Args:
key (str): Key to access the shared type
type: Class of the shared type to create/retrieve
Returns:
T: The shared data type instance
"""
def keys(self) -> Iterable[str]:
"""
Get all keys of shared data types in this document.
Returns:
Iterable[str]: Iterator over document keys
"""
def values(self) -> Iterable[T]:
"""
Get all shared data type values in this document.
Returns:
Iterable[T]: Iterator over document values
"""
def items(self) -> Iterable[tuple[str, T]]:
"""
Get all key-value pairs of shared data types in this document.
Returns:
Iterable[tuple[str, T]]: Iterator over (key, value) pairs
"""
def observe(self, callback: Callable[[TransactionEvent], None]) -> Subscription:
"""
Observe document-level changes.
Args:
callback: Function called when document changes occur
Returns:
Subscription: Handle for unsubscribing
"""
def observe_subdocs(self, callback: Callable[[SubdocsEvent], None]) -> Subscription:
"""
Observe subdocument changes.
Args:
callback: Function called when subdocuments change
Returns:
Subscription: Handle for unsubscribing
"""
def unobserve(self, subscription: Subscription) -> None:
"""
Remove an event observer.
Args:
subscription: Subscription handle to remove
"""
async def events(
self,
subdocs: bool = False,
max_buffer_size: float = float("inf")
) -> MemoryObjectReceiveStream:
"""
Get an async stream of document events.
Args:
subdocs (bool): Include subdocument events
max_buffer_size (float): Maximum event buffer size
Returns:
MemoryObjectReceiveStream: Async event stream
"""
# Dictionary-like interface
def __getitem__(self, key: str) -> T:
"""Get shared type by key."""
def __setitem__(self, key: str, value: T) -> None:
"""Set shared type at key."""
def __iter__(self) -> Iterable[str]:
"""Iterate over document keys."""
def keys(self) -> Iterable[str]:
"""Get all document keys."""
def values(self) -> Iterable[T]:
"""Get all shared type values."""
def items(self) -> Iterable[tuple[str, T]]:
"""Get key-value pairs."""A type-safe wrapper around Doc that provides typed access to root shared values.
class TypedDoc:
"""
Base class for type-safe document containers.
Usage:
class MyDoc(TypedDoc):
map0: Map[int]
array0: Array[bool]
text0: Text
doc = MyDoc()
doc.map0["foo"] = 3
doc.array0.append(True)
doc.text0 += "Hello"
"""Context manager for read-write operations on documents.
class Transaction:
def __init__(self, doc: Doc, origin: Any = None) -> None:
"""
Create a transaction context.
Args:
doc (Doc): Document to create transaction for
origin: Optional transaction origin identifier
"""
@property
def origin(self) -> Any:
"""Get the transaction origin identifier."""
def __enter__(self) -> Transaction:
"""Enter transaction context."""
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit transaction context and commit changes."""Supports both sync and async context managers for new transactions.
class NewTransaction:
def __init__(self, doc: Doc, origin: Any = None, timeout: float | None = None) -> None:
"""
Create a new transaction with async support.
Args:
doc (Doc): Document to create transaction for
origin: Optional transaction origin identifier
timeout (float, optional): Transaction timeout
"""
def __enter__(self) -> NewTransaction:
"""Enter sync context."""
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit sync context."""
async def __aenter__(self) -> NewTransaction:
"""Enter async context."""
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Exit async context."""Read-only transaction that cannot modify document state.
class ReadTransaction:
"""
Read-only transaction context for safe read operations.
Cannot be used to modify document state.
"""Event emitted when document changes occur.
class TransactionEvent:
@property
def update(self) -> bytes:
"""Get the binary update data for this transaction."""Event emitted when subdocument changes occur.
class SubdocsEvent:
"""Event containing subdocument change information."""Handle for managing event subscriptions.
class Subscription:
"""
Subscription handle for event observers.
Used with unobserve() to clean up event listeners.
"""Base class for document containers with multithreading support.
class BaseDoc:
"""
Base class for document containers with multithreading support.
Provides common functionality for Doc and typed document variants.
"""Abstract base class for all shared collaborative data types.
class BaseType:
"""
Abstract base for all shared collaborative data types.
Provides common functionality for Text, Array, Map, and XML types.
"""
@property
def doc(self) -> Doc:
"""Get the document this type belongs to."""
@property
def is_prelim(self) -> bool:
"""Check if type is preliminary (not yet integrated)."""
@property
def is_integrated(self) -> bool:
"""Check if type is integrated into document."""
@property
def type_name(self) -> str:
"""Get the name of the type."""
def observe(self, callback: Callable[[BaseEvent], None]) -> Subscription:
"""Observe changes to this type."""
def observe_deep(self, callback: Callable[[list[BaseEvent]], None]) -> Subscription:
"""Observe deep changes including nested structures."""
def unobserve(self, subscription: Subscription) -> None:
"""Remove an event observer."""
def to_py(self) -> Any:
"""Convert to Python native type (abstract method)."""Base class for sequential data types (Text, Array).
class Sequence(BaseType):
"""
Base class for sequential data types like Text and Array.
Provides position tracking functionality.
"""
def sticky_index(self, index: int, assoc: Assoc = Assoc.AFTER) -> StickyIndex:
"""Create a sticky index for position tracking."""Base class for all change events.
class BaseEvent:
"""
Base class for all change events with automatic attribute processing.
Used by TextEvent, ArrayEvent, MapEvent, and XmlEvent.
"""Base class for type-safe containers.
class Typed:
"""
Base class for type-safe containers with runtime type checking.
Used by TypedDoc, TypedArray, and TypedMap.
"""import pycrdt
from pycrdt import Doc, Text, Array, Map
# Create a new document
doc = Doc()
# Access shared types
text = doc.get("content", type=Text)
users = doc.get("users", type=Array)
settings = doc.get("settings", type=Map)
# Use dictionary-like interface
text_alt = doc["content"] # Get existing shared type
doc["metadata"] = Map() # Create new shared typefrom pycrdt import Doc, Text, Array
doc = Doc()
text = doc.get("content", type=Text)
items = doc.get("items", type=Array)
# Batch multiple operations in a transaction
with doc.transaction() as txn:
text.insert(0, "Hello, world!")
items.append("item1")
items.append("item2")
# All changes committed atomically
# Async transaction
async def update_document():
async with doc.new_transaction() as txn:
text += " More content"
items.extend(["item3", "item4"])from pycrdt import TypedDoc, Text, Array, Map
class ProjectDoc(TypedDoc):
title: Text
tasks: Array[str]
metadata: Map[Any]
# Create typed document
project = ProjectDoc()
# Type-safe access
project.title.insert(0, "My Project")
project.tasks.append("Task 1")
project.metadata["created"] = "2024-01-01"
# Access underlying document
raw_doc = project._docfrom pycrdt import Doc, TransactionEvent
doc = Doc()
def on_document_change(event: TransactionEvent):
print(f"Document updated: {len(event.update)} bytes")
# Subscribe to document changes
subscription = doc.observe(on_document_change)
# Make changes to trigger events
with doc.transaction():
text = doc.get("content", type=Text)
text.insert(0, "Hello")
# Clean up subscription
doc.unobserve(subscription)import anyio
from pycrdt import Doc
async def monitor_document(doc: Doc):
async with doc.events() as event_stream:
async for event in event_stream:
print(f"Event received: {event}")
# Run event monitoring
doc = Doc()
anyio.run(monitor_document, doc)from pycrdt import Doc, Text
# Create documents
doc1 = Doc()
doc2 = Doc()
text1 = doc1.get("content", type=Text)
text1.insert(0, "Hello from doc1")
# Get state and update
state1 = doc1.get_state()
update = doc1.get_update()
# Apply to second document
doc2.apply_update(update)
text2 = doc2.get("content", type=Text)
print(str(text2)) # "Hello from doc1"
# Incremental updates
text1.insert(5, " there")
incremental_update = doc1.get_update(state1)
doc2.apply_update(incremental_update)from pycrdt import Doc, Text
doc = Doc()
try:
# Invalid transaction usage
txn = doc.transaction()
# Forgetting to use context manager can cause issues
# Type mismatches in typed documents
class StrictDoc(TypedDoc):
numbers: Array[int]
strict_doc = StrictDoc()
strict_doc.numbers.append("string") # May raise TypeError
except (ValueError, TypeError, RuntimeError) as e:
print(f"Document operation failed: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-pycrdt