Python bindings for the Yrs CRDT library, providing collaborative data structures for real-time synchronization.
—
pycrdt provides robust position tracking and undo/redo functionality essential for collaborative editing. StickyIndex enables persistent position tracking that survives concurrent edits, while UndoManager provides comprehensive undo/redo operations with origin filtering and scope control. These features are crucial for building user-friendly collaborative editors.
Persistent position tracker that maintains its location during concurrent edits.
class StickyIndex:
    """
    Permanent position that maintains its location during concurrent updates.

    StickyIndex represents a position in a sequence (Text or Array) that
    automatically adjusts when other clients make concurrent edits.
    """

    @property
    def assoc(self) -> Assoc:
        """Get the association type (before/after) for this position."""

    def get_index(self, transaction: Transaction | None = None) -> int:
        """
        Get the current index position.

        Args:
            transaction (Transaction, optional): Transaction context for reading position

        Returns:
            int: Current index position
        """

    def encode(self) -> bytes:
        """
        Encode the sticky index to binary format.

        Returns:
            bytes: Encoded position data
        """

    def to_json(self) -> dict:
        """
        Convert sticky index to JSON-serializable format.

        Returns:
            dict: JSON representation of the position
        """

    @classmethod
    def new(cls, sequence: Sequence, index: int, assoc: Assoc) -> Self:
        """
        Create a new sticky index for a sequence.

        Args:
            sequence (Sequence): Text or Array to create position for
            index (int): Initial position index
            assoc (Assoc): Association type (BEFORE or AFTER)

        Returns:
            StickyIndex: New sticky index instance
        """

    @classmethod
    def decode(cls, data: bytes, sequence: Sequence | None = None) -> Self:
        """
        Decode a sticky index from binary data.

        Args:
            data (bytes): Encoded position data
            sequence (Sequence, optional): Sequence to attach position to

        Returns:
            StickyIndex: Decoded sticky index
        """

    @classmethod
    def from_json(cls, data: dict, sequence: Sequence | None = None) -> Self:
        """
        Create sticky index from JSON data.

        Args:
            data (dict): JSON representation of position
            sequence (Sequence, optional): Sequence to attach position to

        Returns:
            StickyIndex: Sticky index from JSON data
        """


# Association type for sticky positions.
class Assoc(IntEnum):
    """Specifies whether sticky index associates before or after position."""

    AFTER = 0  # Associate with item after the position
    BEFORE = -1  # Associate with item before the position


# Comprehensive undo/redo operations with origin filtering and scope control.
class UndoManager:
    def __init__(
        self,
        *,
        doc: Doc | None = None,
        scopes: list[BaseType] = [],
        capture_timeout_millis: int = 500,
        timestamp: Callable[[], int] = timestamp,
    ) -> None:
        """
        Create an undo manager for document operations.

        Args:
            doc (Doc, optional): Document to track changes for
            scopes (list[BaseType]): List of shared types to track
            capture_timeout_millis (int): Timeout for capturing operations into single undo step
            timestamp (Callable): Function to generate timestamps

        NOTE(review): pycrdt appears to expect either ``doc`` or ``scopes``,
        not both -- confirm against the installed version.
        """

    @property
    def undo_stack(self) -> list[StackItem]:
        """Get the list of undoable operations."""

    @property
    def redo_stack(self) -> list[StackItem]:
        """Get the list of redoable operations."""

    def expand_scope(self, scope: BaseType) -> None:
        """
        Add a shared type to the undo manager's scope.

        Args:
            scope (BaseType): Shared type to start tracking
        """

    def include_origin(self, origin: Any) -> None:
        """
        Include operations with specific origin in undo tracking.

        Args:
            origin: Origin identifier to include
        """

    def exclude_origin(self, origin: Any) -> None:
        """
        Exclude operations with specific origin from undo tracking.

        Args:
            origin: Origin identifier to exclude
        """

    def can_undo(self) -> bool:
        """
        Check if there are operations that can be undone.

        Returns:
            bool: True if undo is possible
        """

    def undo(self) -> bool:
        """
        Undo the last operation.

        Returns:
            bool: True if undo was performed
        """

    def can_redo(self) -> bool:
        """
        Check if there are operations that can be redone.

        Returns:
            bool: True if redo is possible
        """

    def redo(self) -> bool:
        """
        Redo the last undone operation.

        Returns:
            bool: True if redo was performed
        """

    def clear(self) -> None:
        """Clear all undo and redo history."""


# Undo stack item containing reversible operations.
class StackItem:
    """
    Represents a single undoable operation or group of operations.

    StackItems are created automatically by the UndoManager and contain
    the information needed to reverse document changes.
    """


from pycrdt import Doc, Text, StickyIndex, Assoc
doc = Doc()
text = doc.get("content", type=Text)

# Create initial content
text.insert(0, "Hello, world! This is a test document.")

# Create sticky positions
start_pos = text.sticky_index(7, Assoc.BEFORE)  # Before "world"
end_pos = text.sticky_index(12, Assoc.AFTER)  # After "world"
# "This" occupies indices 14-17, so index 18 is the position right after it.
cursor_pos = text.sticky_index(18, Assoc.AFTER)  # After "This"
print(f"Initial positions: start={start_pos.get_index()}, end={end_pos.get_index()}, cursor={cursor_pos.get_index()}")

# Make edits that affect positions
text.insert(0, "Well, ")  # Insert at beginning
print(f"After insert at 0: start={start_pos.get_index()}, end={end_pos.get_index()}, cursor={cursor_pos.get_index()}")

text.insert(15, "beautiful ")  # Insert within tracked region
print(f"After insert at 15: start={start_pos.get_index()}, end={end_pos.get_index()}, cursor={cursor_pos.get_index()}")

# Delete text before tracked positions
del text[0:6]  # Remove "Well, "
print(f"After delete: start={start_pos.get_index()}, end={end_pos.get_index()}, cursor={cursor_pos.get_index()}")

# Get text at tracked positions; both indices are resolved inside one
# transaction so they describe the same document state.
with doc.transaction() as txn:
    start_idx = start_pos.get_index(txn)
    end_idx = end_pos.get_index(txn)
    tracked_text = text[start_idx:end_idx]
print(f"Tracked text: '{tracked_text}'")

import json
from pycrdt import Doc, Text, StickyIndex, Assoc

doc = Doc()
text = doc.get("content", type=Text)
text.insert(0, "Persistent position tracking example.")

# Create positions
bookmark = text.sticky_index(10, Assoc.AFTER)
selection_start = text.sticky_index(15, Assoc.BEFORE)
selection_end = text.sticky_index(23, Assoc.AFTER)

# Serialize positions
bookmark_data = bookmark.to_json()
selection_data = {
    "start": selection_start.to_json(),
    "end": selection_end.to_json(),
}
positions_json = json.dumps(
    {
        "bookmark": bookmark_data,
        "selection": selection_data,
    },
    indent=2,
)
print(f"Serialized positions:\n{positions_json}")

# Make changes to document
text.insert(5, "NEW ")
text.insert(30, " UPDATED")

# Deserialize positions in new session
doc2 = Doc()
text2 = doc2.get("content", type=Text)

# Apply the changes to new document
update = doc.get_update()
doc2.apply_update(update)

# Restore positions
positions_data = json.loads(positions_json)
restored_bookmark = StickyIndex.from_json(positions_data["bookmark"], text2)
restored_start = StickyIndex.from_json(positions_data["selection"]["start"], text2)
restored_end = StickyIndex.from_json(positions_data["selection"]["end"], text2)
print(f"Restored bookmark: {restored_bookmark.get_index()}")
print(f"Restored selection: {restored_start.get_index()}-{restored_end.get_index()}")

# Verify positions still track correctly; both indices are read in the same
# transaction so the slice bounds are consistent with each other.
with doc2.transaction() as txn:
    start_idx = restored_start.get_index(txn)
    end_idx = restored_end.get_index(txn)
    selected_text = text2[start_idx:end_idx]
print(f"Selected text: '{selected_text}'")

from pycrdt import Doc, Text, Array, UndoManager
doc = Doc()
text = doc.get("content", type=Text)
array = doc.get("items", type=Array)

# Create undo manager.
# capture_timeout_millis=0 keeps every transaction as its own undo step; with
# the default 500 ms capture window the two back-to-back transactions below
# would be merged into a single step and undone together.
# NOTE(review): doc= is omitted on purpose -- pycrdt appears to reject doc=
# combined with scopes=; confirm for your version.
undo_manager = UndoManager(scopes=[text, array], capture_timeout_millis=0)

# Make some changes
with doc.transaction(origin="user"):
    text.insert(0, "Hello, world!")
print(f"Text: {str(text)}")
print(f"Can undo: {undo_manager.can_undo()}")

# Make more changes
with doc.transaction(origin="user"):
    array.extend(["item1", "item2", "item3"])
print(f"Array: {list(array)}")

# Undo last operation
if undo_manager.can_undo():
    undo_manager.undo()
    print(f"After undo - Array: {list(array)}")
    print(f"Can redo: {undo_manager.can_redo()}")

# Undo text changes
if undo_manager.can_undo():
    undo_manager.undo()
    print(f"After undo - Text: '{str(text)}'")

# Redo operations
if undo_manager.can_redo():
    undo_manager.redo()
    print(f"After redo - Text: '{str(text)}'")

if undo_manager.can_redo():
    undo_manager.redo()
    print(f"After redo - Array: {list(array)}")

from pycrdt import Doc, Text, UndoManager
doc = Doc()
text = doc.get("content", type=Text)

# Create undo manager that only tracks user operations.
# capture_timeout_millis=0 keeps the two "user" transactions below as separate
# undo steps; inside the default 500 ms window they would be merged into one.
# NOTE(review): doc= is omitted on purpose -- pycrdt appears to reject doc=
# combined with scopes=; confirm for your version.
undo_manager = UndoManager(scopes=[text], capture_timeout_millis=0)
undo_manager.include_origin("user")  # Only track "user" origin

# Make changes with different origins
with doc.transaction(origin="user"):
    text.insert(0, "User change 1")

with doc.transaction(origin="system"):
    text.insert(0, "System change - ")  # This won't be tracked

with doc.transaction(origin="user"):
    text.insert(len(text), " - User change 2")

print(f"Final text: {str(text)}")
print(f"Undo stack size: {len(undo_manager.undo_stack)}")

# Undo - should only undo user changes
undo_manager.undo()
print(f"After first undo: {str(text)}")
undo_manager.undo()
print(f"After second undo: {str(text)}")

# System change remains because it wasn't tracked
print(f"System change still present: {'System change' in str(text)}")

from pycrdt import Doc, Text, UndoManager, StickyIndex, Assoc
# Simulate collaborative editing with undo
doc1 = Doc(client_id=1)
doc2 = Doc(client_id=2)
text1 = doc1.get("document", type=Text)
text2 = doc2.get("document", type=Text)

# Create undo managers for each client.
# capture_timeout_millis=0 keeps each transaction as its own undo step, so
# client 1's two edits below can be undone one at a time; inside the default
# 500 ms window they would be merged into a single step.
# NOTE(review): doc= is omitted on purpose -- pycrdt appears to reject doc=
# combined with scopes=; confirm for your version.
undo1 = UndoManager(scopes=[text1], capture_timeout_millis=0)
undo1.include_origin("client1")
undo2 = UndoManager(scopes=[text2], capture_timeout_millis=0)
undo2.include_origin("client2")

# Client 1 makes initial changes and places its cursor at the end of the text
with doc1.transaction(origin="client1"):
    text1.insert(0, "Collaborative document. ")
    cursor1 = text1.sticky_index(len(text1), Assoc.AFTER)

# Sync to client 2
update = doc1.get_update()
doc2.apply_update(update)
# "Collaborative " is 14 characters long, so "document" starts at index 14.
cursor2 = text2.sticky_index(14, Assoc.AFTER)  # Position at "document"
print(f"Initial state: '{str(text1)}'")

# Client 2 makes concurrent changes
with doc2.transaction(origin="client2"):
    pos = cursor2.get_index()
    text2.insert(pos, " EDITED")

# Client 1 continues editing
with doc1.transaction(origin="client1"):
    pos = cursor1.get_index()
    text1.insert(pos, "More content from client 1.")

# Sync changes: each side sends only what the other side is missing
update1 = doc1.get_update(doc2.get_state())
update2 = doc2.get_update(doc1.get_state())
doc2.apply_update(update1)
doc1.apply_update(update2)
print(f"After collaboration: '{str(text1)}'")
print(f"Client 2 sees: '{str(text2)}'")

# Each client can undo their own changes
print(f"Client 1 can undo: {undo1.can_undo()}")
print(f"Client 2 can undo: {undo2.can_undo()}")

# Client 1 undoes their changes
if undo1.can_undo():
    undo1.undo()  # Undo "More content from client 1."
    print(f"Client 1 after undo: '{str(text1)}'")

if undo1.can_undo():
    undo1.undo()  # Undo initial text
    print(f"Client 1 after second undo: '{str(text1)}'")

# Client 2's changes remain
print(f"Client 2's changes still present: {'EDITED' in str(text1)}")

from pycrdt import Doc, Text, Array, Map, UndoManager
import time


def custom_timestamp() -> int:
    """Custom timestamp function returning the current time in milliseconds."""
    return int(time.time() * 1000)


doc = Doc()
text = doc.get("text", type=Text)
array = doc.get("array", type=Array)
map_data = doc.get("map", type=Map)

# Create undo manager with custom configuration.
# NOTE(review): doc= is omitted on purpose -- pycrdt appears to reject doc=
# combined with scopes=; confirm for your version.
undo_manager = UndoManager(
    scopes=[text, array],  # Only track text and array, not map
    capture_timeout_millis=1000,  # Group operations within 1 second
    timestamp=custom_timestamp,
)

# Configure origin filtering
undo_manager.include_origin("user")
undo_manager.exclude_origin("auto-save")

print("Making grouped changes (within timeout)...")

# Make changes quickly (will be grouped)
start_time = time.time()
with doc.transaction(origin="user"):
    text.insert(0, "Quick ")
with doc.transaction(origin="user"):
    text.insert(6, "changes ")
with doc.transaction(origin="user"):
    array.append("item1")
elapsed = (time.time() - start_time) * 1000
print(f"Changes made in {elapsed:.1f}ms")

# Make changes to untracked type
with doc.transaction(origin="user"):
    map_data["key"] = "value"  # Not tracked
print(f"Undo stack size: {len(undo_manager.undo_stack)}")

# Wait for timeout then make another change
time.sleep(1.1)  # Exceed capture timeout
with doc.transaction(origin="user"):
    text.insert(len(text), "separate")
print(f"Undo stack size after timeout: {len(undo_manager.undo_stack)}")

# Make auto-save change (will be ignored)
with doc.transaction(origin="auto-save"):
    text.insert(0, "[SAVED] ")
print(f"Text after auto-save: '{str(text)}'")
print(f"Undo stack size (auto-save ignored): {len(undo_manager.undo_stack)}")

# Undo operations
print("\nUndoing operations:")
while undo_manager.can_undo():
    undo_manager.undo()
    print(f"Text: '{str(text)}', Array: {list(array)}")

# Auto-save prefix remains (wasn't tracked)
print(f"Auto-save change remains: {str(text).startswith('[SAVED]')}")

from pycrdt import Doc, Text, Array, Map, UndoManager
doc = Doc()
text = doc.get("text", type=Text)
array = doc.get("array", type=Array)
map_data = doc.get("map", type=Map)

# Create undo manager with initial scope.
# NOTE(review): doc= is omitted on purpose -- pycrdt appears to reject doc=
# combined with scopes=; confirm for your version.
undo_manager = UndoManager(scopes=[text])

# Make changes to tracked type
with doc.transaction(origin="user"):
    text.insert(0, "Tracked text")

# Make changes to untracked type
with doc.transaction(origin="user"):
    array.append("untracked item")
print(f"Initial undo stack size: {len(undo_manager.undo_stack)}")

# Expand scope to include array
undo_manager.expand_scope(array)

# Now array changes will be tracked
with doc.transaction(origin="user"):
    array.append("tracked item")
    text.insert(0, "More ")
print(f"After scope expansion: {len(undo_manager.undo_stack)}")

# Add map to scope
undo_manager.expand_scope(map_data)
with doc.transaction(origin="user"):
    map_data["key1"] = "value1"
    map_data["key2"] = "value2"
print(f"After adding map: {len(undo_manager.undo_stack)}")

# Undo all tracked operations
print("\nUndoing all operations:")
while undo_manager.can_undo():
    print(f"Before undo: text='{str(text)}', array={list(array)}, map={dict(map_data.items())}")
    undo_manager.undo()

# The first array item remains (was added before array was in scope)
print(f"Final state: text='{str(text)}', array={list(array)}, map={dict(map_data.items())}")

from pycrdt import Doc, Text, UndoManager, StickyIndex, Assoc
class PositionAwareEditor:
    """Editor that maintains cursor position through undo/redo."""

    def __init__(self, doc: Doc) -> None:
        """Bind the editor to the "content" text of *doc* with the cursor at 0."""
        self.doc = doc
        self.text = doc.get("content", type=Text)
        # NOTE(review): doc= is omitted on purpose -- pycrdt appears to reject
        # doc= combined with scopes=; confirm for your version.
        self.undo_manager = UndoManager(scopes=[self.text])
        self.cursor_pos = self.text.sticky_index(0, Assoc.AFTER)

    def insert_text(self, text: str, origin="user") -> None:
        """Insert text at cursor position."""
        with self.doc.transaction(origin=origin):
            pos = self.cursor_pos.get_index()
            self.text.insert(pos, text)
            # Update cursor to end of inserted text
            self.cursor_pos = self.text.sticky_index(pos + len(text), Assoc.AFTER)

    def delete_range(self, start: int, end: int, origin="user") -> None:
        """Delete text range and update cursor."""
        with self.doc.transaction(origin=origin):
            del self.text[start:end]
            # Move cursor to start of deleted range
            self.cursor_pos = self.text.sticky_index(start, Assoc.AFTER)

    def move_cursor(self, position: int) -> None:
        """Move cursor to specific position, clamped to the text bounds."""
        position = max(0, min(position, len(self.text)))
        self.cursor_pos = self.text.sticky_index(position, Assoc.AFTER)

    def get_cursor_position(self) -> int:
        """Get current cursor position."""
        return self.cursor_pos.get_index()

    def _apply_history(self, is_possible, perform) -> bool:
        """Run an undo/redo step and re-anchor the cursor afterwards.

        Args:
            is_possible: Zero-argument callable telling whether the step can run.
            perform: Zero-argument callable that performs the step.

        Returns:
            bool: True if the step was performed, False if nothing was done.
        """
        if not is_possible():
            return False
        old_pos = self.get_cursor_position()
        perform()
        # move_cursor clamps to [0, len(text)], which keeps the cursor valid
        # when the step shortened the text.
        self.move_cursor(old_pos)
        return True

    def undo(self) -> bool:
        """Undo with cursor position awareness."""
        return self._apply_history(self.undo_manager.can_undo, self.undo_manager.undo)

    def redo(self) -> bool:
        """Redo with cursor position awareness."""
        return self._apply_history(self.undo_manager.can_redo, self.undo_manager.redo)
# Example usage
doc = Doc()
editor = PositionAwareEditor(doc)
print("Position-aware editing demo:")

# Type some text
editor.insert_text("Hello, ")
print(f"Text: '{str(editor.text)}', Cursor: {editor.get_cursor_position()}")
editor.insert_text("world!")
print(f"Text: '{str(editor.text)}', Cursor: {editor.get_cursor_position()}")

# Move cursor and insert
editor.move_cursor(7)  # Between "Hello, " and "world!"
editor.insert_text("beautiful ")
print(f"Text: '{str(editor.text)}', Cursor: {editor.get_cursor_position()}")

# Undo operations (editor.undo() returns False once nothing is left to undo)
print("\nUndoing operations:")
while editor.undo():
    print(f"Text: '{str(editor.text)}', Cursor: {editor.get_cursor_position()}")

# Redo operations (editor.redo() returns False once nothing is left to redo)
print("\nRedoing operations:")
while editor.redo():
    print(f"Text: '{str(editor.text)}', Cursor: {editor.get_cursor_position()}")

from pycrdt import Doc, Text, StickyIndex, UndoManager, Assoc
doc = Doc()
text = doc.get("content", type=Text)

try:
    # Invalid sticky index operations
    text.sticky_index(-1, Assoc.AFTER)  # May raise ValueError
except ValueError as e:
    print(f"StickyIndex error: {e}")

try:
    # Invalid undo manager operations.
    # NOTE(review): doc= is omitted on purpose -- pycrdt appears to reject
    # doc= combined with scopes=; confirm for your version.
    undo_manager = UndoManager(scopes=[text])

    # Try to undo when nothing to undo
    if not undo_manager.can_undo():
        result = undo_manager.undo()  # Returns False, doesn't raise
        print(f"Undo result when nothing to undo: {result}")

    # Invalid scope expansion
    undo_manager.expand_scope(None)  # May raise TypeError
except (TypeError, ValueError) as e:
    print(f"UndoManager error: {e}")

try:
    # Position encoding/decoding errors
    invalid_data = b"invalid"
    StickyIndex.decode(invalid_data, text)  # May raise decoding error
except Exception as e:
    # Broad on purpose: the concrete exception type raised for malformed data
    # is not specified here.
    print(f"Position decoding error: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-pycrdt