CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pycrdt

Python bindings for Yrs CRDT library providing collaborative data structures for real-time synchronization.

Pending
Overview
Eval results
Files

docs/position-undo.md

Position Management & Undo

Overview

pycrdt provides robust position tracking and undo/redo functionality essential for collaborative editing. StickyIndex enables persistent position tracking that survives concurrent edits, while UndoManager provides comprehensive undo/redo operations with origin filtering and scope control. These features are crucial for building user-friendly collaborative editors.

Position Management

StickyIndex

Persistent position tracker that maintains its location during concurrent edits.

class StickyIndex:
    """
    Position in a sequence that keeps its location during concurrent updates.

    A StickyIndex represents a position in a sequence (Text or Array) that
    is adjusted automatically when other clients make concurrent edits.
    It can be serialized to bytes (`encode` / `decode`) or to a
    JSON-serializable dict (`to_json` / `from_json`).
    """
    
    @property
    def assoc(self) -> Assoc:
        """
        Get the association type (before/after) for this position.

        Returns:
            Assoc: Side of the position this index is associated with
        """

    def get_index(self, transaction: Transaction | None = None) -> int:
        """
        Get the current index position.

        Args:
            transaction (Transaction, optional): Transaction context for reading
                the position. NOTE(review): what happens when it is omitted is
                not shown here; presumably a transaction is obtained
                internally. Confirm against the library.

        Returns:
            int: Current index position
        """

    def encode(self) -> bytes:
        """
        Encode the sticky index to binary format.

        The result is the kind of data `decode` accepts.

        Returns:
            bytes: Encoded position data
        """

    def to_json(self) -> dict:
        """
        Convert sticky index to JSON-serializable format.

        The result is the kind of data `from_json` accepts.

        Returns:
            dict: JSON representation of the position
        """

    @classmethod
    def new(cls, sequence: Sequence, index: int, assoc: Assoc) -> Self:
        """
        Create a new sticky index for a sequence.

        Args:
            sequence (Sequence): Text or Array to create position for
            index (int): Initial position index
            assoc (Assoc): Association type (BEFORE or AFTER)

        Returns:
            StickyIndex: New sticky index instance
        """

    @classmethod
    def decode(cls, data: bytes, sequence: Sequence | None = None) -> Self:
        """
        Decode a sticky index from binary data.

        Args:
            data (bytes): Encoded position data, as produced by `encode`
            sequence (Sequence, optional): Sequence to attach position to.
                NOTE(review): whether a sequence is needed before `get_index`
                can be called is not shown here. Confirm.

        Returns:
            StickyIndex: Decoded sticky index
        """

    @classmethod
    def from_json(cls, data: dict, sequence: Sequence | None = None) -> Self:
        """
        Create sticky index from JSON data.

        Args:
            data (dict): JSON representation of position, as produced by `to_json`
            sequence (Sequence, optional): Sequence to attach position to

        Returns:
            StickyIndex: Sticky index from JSON data
        """

Assoc

Association type for sticky positions.

class Assoc(IntEnum):
    """Side of a position (before or after) that a sticky index associates with."""

    # Associate with the item that comes after the position.
    AFTER = 0
    # Associate with the item that comes before the position.
    BEFORE = -1

Undo Management

UndoManager

Comprehensive undo/redo operations with origin filtering and scope control.

class UndoManager:
    """
    Undo/redo operations on shared types, with origin filtering and scope control.

    Tracked changes are collected as StackItem entries on `undo_stack` and
    `redo_stack`; `undo` / `redo` move between them.
    """

    def __init__(
        self,
        *,
        doc: Doc | None = None,
        scopes: list[BaseType] = [],
        capture_timeout_millis: int = 500,
        timestamp: Callable[[], int] = timestamp,
    ) -> None:
        """
        Create an undo manager for document operations.

        All arguments are keyword-only.

        Args:
            doc (Doc, optional): Document to track changes for
            scopes (list[BaseType]): List of shared types to track. The default
                is a single shared list literal, so it should be treated as
                read-only.
            capture_timeout_millis (int): Timeout for capturing operations into
                single undo step (defaults to 500)
            timestamp (Callable): Function to generate timestamps; takes no
                arguments and returns an int

        NOTE(review): pycrdt's reference describes the manager as created
        with either `doc` or `scopes`. Confirm whether passing both is
        accepted before relying on it.
        """

    @property
    def undo_stack(self) -> list[StackItem]:
        """Get the list of undoable operations."""

    @property
    def redo_stack(self) -> list[StackItem]:
        """Get the list of redoable operations."""

    def expand_scope(self, scope: BaseType) -> None:
        """
        Add a shared type to the undo manager's scope.

        Args:
            scope (BaseType): Shared type to start tracking
        """

    def include_origin(self, origin: Any) -> None:
        """
        Include operations with specific origin in undo tracking.

        Args:
            origin: Origin identifier to include (the value given as
                `origin=` when opening a transaction)
        """

    def exclude_origin(self, origin: Any) -> None:
        """
        Exclude operations with specific origin from undo tracking.

        Args:
            origin: Origin identifier to exclude (the value given as
                `origin=` when opening a transaction)
        """

    def can_undo(self) -> bool:
        """
        Check if there are operations that can be undone.

        Returns:
            bool: True if undo is possible
        """

    def undo(self) -> bool:
        """
        Undo the last operation.

        Returns:
            bool: True if undo was performed
        """

    def can_redo(self) -> bool:
        """
        Check if there are operations that can be redone.

        Returns:
            bool: True if redo is possible
        """

    def redo(self) -> bool:
        """
        Redo the last undone operation.

        Returns:
            bool: True if redo was performed
        """

    def clear(self) -> None:
        """Clear all undo and redo history."""

StackItem

Undo stack item containing reversible operations.

class StackItem:
    """
    A single undoable operation or group of operations.

    StackItems are created automatically by the UndoManager, which exposes
    them through its `undo_stack` and `redo_stack` lists. Each item holds
    the information needed to reverse document changes.
    """

Usage Examples

Basic Position Tracking

from pycrdt import Doc, Text, StickyIndex, Assoc

doc = Doc()
text = doc.get("content", type=Text)

# Create initial content
text.insert(0, "Hello, world! This is a test document.")

# Create sticky positions.
# Character offsets: "world" spans 7..12 and "This" spans 14..18.
start_pos = text.sticky_index(7, Assoc.BEFORE)  # Before "world"
end_pos = text.sticky_index(12, Assoc.AFTER)    # After "world"
cursor_pos = text.sticky_index(18, Assoc.AFTER) # After "This"

print(f"Initial positions: start={start_pos.get_index()}, end={end_pos.get_index()}, cursor={cursor_pos.get_index()}")

# Make edits that affect positions
text.insert(0, "Well, ")  # Insert at beginning (6 characters)
print(f"After insert at 0: start={start_pos.get_index()}, end={end_pos.get_index()}, cursor={cursor_pos.get_index()}")

# "world" now spans 13..18, so index 15 falls inside the tracked region
text.insert(15, "beautiful ")  # Insert within tracked region
print(f"After insert at 15: start={start_pos.get_index()}, end={end_pos.get_index()}, cursor={cursor_pos.get_index()}")

# Delete text before tracked positions
del text[0:6]  # Remove "Well, "
print(f"After delete: start={start_pos.get_index()}, end={end_pos.get_index()}, cursor={cursor_pos.get_index()}")

# Get text at tracked positions; both indices are read in the same transaction
with doc.transaction() as txn:
    start_idx = start_pos.get_index(txn)
    end_idx = end_pos.get_index(txn)
    tracked_text = text[start_idx:end_idx]
    print(f"Tracked text: '{tracked_text}'")

Position Persistence

import json
from pycrdt import Doc, Text, StickyIndex, Assoc

# --- First session: create a document and anchor positions in it ---
doc = Doc()
text = doc.get("content", type=Text)
text.insert(0, "Persistent position tracking example.")

bookmark = text.sticky_index(10, Assoc.AFTER)
selection_start = text.sticky_index(15, Assoc.BEFORE)
selection_end = text.sticky_index(23, Assoc.AFTER)

# Turn every position into plain JSON data and dump it as one document
serialized = {
    "bookmark": bookmark.to_json(),
    "selection": {
        "start": selection_start.to_json(),
        "end": selection_end.to_json(),
    },
}
positions_json = json.dumps(serialized, indent=2)

print(f"Serialized positions:\n{positions_json}")

# Edit the document after the positions were saved
text.insert(5, "NEW ")
text.insert(30, " UPDATED")

# --- Second session: rebuild the document from the first one's state ---
doc2 = Doc()
text2 = doc2.get("content", type=Text)
doc2.apply_update(doc.get_update())

# Re-create the positions from the saved JSON, attached to the new text
saved = json.loads(positions_json)
restored_bookmark = StickyIndex.from_json(saved["bookmark"], text2)
saved_selection = saved["selection"]
restored_start = StickyIndex.from_json(saved_selection["start"], text2)
restored_end = StickyIndex.from_json(saved_selection["end"], text2)

print(f"Restored bookmark: {restored_bookmark.get_index()}")
print(f"Restored selection: {restored_start.get_index()}-{restored_end.get_index()}")

# Resolve both ends of the selection inside one transaction and slice the text
with doc2.transaction() as read_txn:
    first = restored_start.get_index(read_txn)
    last = restored_end.get_index(read_txn)
    selected_text = text2[first:last]
    print(f"Selected text: '{selected_text}'")

Basic Undo/Redo Operations

from pycrdt import Doc, Text, Array, UndoManager

doc = Doc()
text = doc.get("content", type=Text)
array = doc.get("items", type=Array)

# Create undo manager.
# - Only `scopes` is passed: pycrdt builds an undo manager from a doc OR from
#   scopes (the document is taken from the scoped types), not from both.
# - capture_timeout_millis=0 turns off time-based grouping, so each
#   transaction below is its own undo step. With the default 500 ms window
#   the two back-to-back transactions would be merged into a single step.
undo_manager = UndoManager(scopes=[text, array], capture_timeout_millis=0)

# Make some changes
with doc.transaction(origin="user"):
    text.insert(0, "Hello, world!")

print(f"Text: {str(text)}")
print(f"Can undo: {undo_manager.can_undo()}")

# Make more changes
with doc.transaction(origin="user"):
    array.extend(["item1", "item2", "item3"])

print(f"Array: {list(array)}")

# Undo last operation (the array change)
if undo_manager.can_undo():
    undo_manager.undo()
    print(f"After undo - Array: {list(array)}")
    print(f"Can redo: {undo_manager.can_redo()}")

# Undo text changes
if undo_manager.can_undo():
    undo_manager.undo()
    print(f"After undo - Text: '{str(text)}'")

# Redo operations, in the reverse order of the undos: text first, then array
if undo_manager.can_redo():
    undo_manager.redo()
    print(f"After redo - Text: '{str(text)}'")

if undo_manager.can_redo():
    undo_manager.redo()
    print(f"After redo - Array: {list(array)}")

Origin Filtering

from pycrdt import Doc, Text, UndoManager

doc = Doc()
text = doc.get("content", type=Text)

# Create undo manager that only tracks user operations.
# - Only `scopes` is passed: pycrdt builds an undo manager from a doc OR from
#   scopes, not from both.
# - capture_timeout_millis=0 keeps each tracked transaction as a separate
#   undo step; with the default 500 ms window the two "user" transactions
#   below could be merged, and the second undo() would have nothing to do.
undo_manager = UndoManager(scopes=[text], capture_timeout_millis=0)
undo_manager.include_origin("user")  # Only track "user" origin

# Make changes with different origins
with doc.transaction(origin="user"):
    text.insert(0, "User change 1")

with doc.transaction(origin="system"):
    text.insert(0, "System change - ")  # This won't be tracked

with doc.transaction(origin="user"):
    text.insert(len(text), " - User change 2")

print(f"Final text: {str(text)}")
print(f"Undo stack size: {len(undo_manager.undo_stack)}")

# Undo - should only undo user changes
undo_manager.undo()
print(f"After first undo: {str(text)}")

undo_manager.undo()
print(f"After second undo: {str(text)}")

# System change remains because it wasn't tracked
print(f"System change still present: {'System change' in str(text)}")

Collaborative Undo with Position Tracking

from pycrdt import Doc, Text, UndoManager, StickyIndex, Assoc

# Simulate collaborative editing with undo
doc1 = Doc(client_id=1)
doc2 = Doc(client_id=2)

text1 = doc1.get("document", type=Text)
text2 = doc2.get("document", type=Text)

# Create undo managers for each client.
# - Only `scopes` is passed: pycrdt builds an undo manager from a doc OR from
#   scopes, not from both.
# - capture_timeout_millis=0 keeps each local transaction as its own undo
#   step, so client 1's two edits below can be undone one at a time.
# - include_origin restricts each manager to its own client's transactions;
#   updates applied from the other client are not tracked.
undo1 = UndoManager(scopes=[text1], capture_timeout_millis=0)
undo1.include_origin("client1")

undo2 = UndoManager(scopes=[text2], capture_timeout_millis=0)
undo2.include_origin("client2")

# Client 1 makes initial changes and parks its cursor at the end of the text
with doc1.transaction(origin="client1"):
    text1.insert(0, "Collaborative document. ")
    cursor1 = text1.sticky_index(len(text1), Assoc.AFTER)

# Sync to client 2
update = doc1.get_update()
doc2.apply_update(update)
# "document" spans 14..22 in "Collaborative document. "
cursor2 = text2.sticky_index(22, Assoc.AFTER)  # Right after "document"

print(f"Initial state: '{str(text1)}'")

# Client 2 makes concurrent changes
with doc2.transaction(origin="client2"):
    pos = cursor2.get_index()
    text2.insert(pos, " EDITED")

# Client 1 continues editing
with doc1.transaction(origin="client1"):
    pos = cursor1.get_index()
    text1.insert(pos, "More content from client 1.")

# Sync changes: each side sends only what the other is missing
update1 = doc1.get_update(doc2.get_state())
update2 = doc2.get_update(doc1.get_state())

doc2.apply_update(update1)
doc1.apply_update(update2)

print(f"After collaboration: '{str(text1)}'")
print(f"Client 2 sees: '{str(text2)}'")

# Each client can undo their own changes
print(f"Client 1 can undo: {undo1.can_undo()}")
print(f"Client 2 can undo: {undo2.can_undo()}")

# Client 1 undoes their changes
if undo1.can_undo():
    undo1.undo()  # Undo "More content from client 1."
    print(f"Client 1 after undo: '{str(text1)}'")

if undo1.can_undo():
    undo1.undo()  # Undo initial text
    print(f"Client 1 after second undo: '{str(text1)}'")

# Client 2's changes remain
print(f"Client 2's changes still present: {'EDITED' in str(text1)}")

Advanced Undo Manager Configuration

from pycrdt import Doc, Text, Array, Map, UndoManager
import time

def custom_timestamp():
    """Return the current wall-clock time as integer milliseconds."""
    return int(time.time() * 1000)

doc = Doc()
text = doc.get("text", type=Text)
array = doc.get("array", type=Array)
map_data = doc.get("map", type=Map)

# Create undo manager with custom configuration.
# Only `scopes` is passed: pycrdt builds an undo manager from a doc OR from
# scopes (the document is taken from the scoped types), not from both.
undo_manager = UndoManager(
    scopes=[text, array],  # Only track text and array, not map
    capture_timeout_millis=1000,  # Group operations within 1 second
    timestamp=custom_timestamp,
)

# Configure origin filtering
undo_manager.include_origin("user")
undo_manager.exclude_origin("auto-save")

print("Making grouped changes (within timeout)...")

# Make changes quickly (will be grouped)
start_time = time.time()
with doc.transaction(origin="user"):
    text.insert(0, "Quick ")

with doc.transaction(origin="user"):
    text.insert(6, "changes ")

with doc.transaction(origin="user"):
    array.append("item1")

elapsed = (time.time() - start_time) * 1000
print(f"Changes made in {elapsed:.1f}ms")

# Make changes to untracked type
with doc.transaction(origin="user"):
    map_data["key"] = "value"  # Not tracked

print(f"Undo stack size: {len(undo_manager.undo_stack)}")

# Wait for timeout then make another change
time.sleep(1.1)  # Exceed capture timeout

with doc.transaction(origin="user"):
    text.insert(len(text), "separate")

print(f"Undo stack size after timeout: {len(undo_manager.undo_stack)}")

# Make auto-save change (will be ignored)
with doc.transaction(origin="auto-save"):
    text.insert(0, "[SAVED] ")

print(f"Text after auto-save: '{str(text)}'")
print(f"Undo stack size (auto-save ignored): {len(undo_manager.undo_stack)}")

# Undo operations
print("\nUndoing operations:")
while undo_manager.can_undo():
    undo_manager.undo()
    print(f"Text: '{str(text)}', Array: {list(array)}")

# Auto-save prefix remains (wasn't tracked)
print(f"Auto-save change remains: {str(text).startswith('[SAVED]')}")

Undo Manager Scope Management

from pycrdt import Doc, Text, Array, Map, UndoManager

doc = Doc()
text = doc.get("text", type=Text)
array = doc.get("array", type=Array)
map_data = doc.get("map", type=Map)

# Create undo manager with initial scope.
# - Only `scopes` is passed: pycrdt builds an undo manager from a doc OR from
#   scopes, not from both.
# - capture_timeout_millis=0 keeps every tracked transaction as its own undo
#   step, so the stack sizes printed below grow as the scope is expanded.
undo_manager = UndoManager(scopes=[text], capture_timeout_millis=0)

# Make changes to tracked type
with doc.transaction(origin="user"):
    text.insert(0, "Tracked text")

# Make changes to untracked type
with doc.transaction(origin="user"):
    array.append("untracked item")

print(f"Initial undo stack size: {len(undo_manager.undo_stack)}")

# Expand scope to include array
undo_manager.expand_scope(array)

# Now array changes will be tracked
with doc.transaction(origin="user"):
    array.append("tracked item")
    text.insert(0, "More ")

print(f"After scope expansion: {len(undo_manager.undo_stack)}")

# Add map to scope
undo_manager.expand_scope(map_data)

with doc.transaction(origin="user"):
    map_data["key1"] = "value1"
    map_data["key2"] = "value2"

print(f"After adding map: {len(undo_manager.undo_stack)}")

# Undo all tracked operations
print("\nUndoing all operations:")
while undo_manager.can_undo():
    print(f"Before undo: text='{str(text)}', array={list(array)}, map={dict(map_data.items())}")
    undo_manager.undo()

# The first array item remains (was added before array was in scope)
print(f"Final state: text='{str(text)}', array={list(array)}, map={dict(map_data.items())}")

Position-Aware Undo Operations

from pycrdt import Doc, Text, UndoManager, StickyIndex, Assoc

class PositionAwareEditor:
    """
    Editor that maintains cursor position through undo/redo.

    The editor owns the "content" Text of a document, an UndoManager scoped
    to that text, and a sticky cursor position that is re-created after
    every edit, cursor move, undo and redo.
    """

    def __init__(self, doc: Doc):
        """
        Bind the editor to a document.

        Args:
            doc (Doc): Document whose "content" Text is edited
        """
        self.doc = doc
        self.text = doc.get("content", type=Text)
        # Built from scopes only: pycrdt creates an undo manager from a doc
        # OR from scopes, and takes the document from the scoped types.
        self.undo_manager = UndoManager(scopes=[self.text])
        self.cursor_pos = self.text.sticky_index(0, Assoc.AFTER)

    def insert_text(self, text: str, origin="user"):
        """
        Insert text at cursor position and move the cursor past it.

        Args:
            text (str): Text to insert
            origin: Transaction origin used for the edit
        """
        with self.doc.transaction(origin=origin):
            pos = self.cursor_pos.get_index()
            self.text.insert(pos, text)
            # Update cursor to end of inserted text
            self.cursor_pos = self.text.sticky_index(pos + len(text), Assoc.AFTER)

    def delete_range(self, start: int, end: int, origin="user"):
        """
        Delete text range and move the cursor to where the range began.

        Args:
            start (int): Index of the first character to delete
            end (int): Index one past the last character to delete
            origin: Transaction origin used for the edit
        """
        with self.doc.transaction(origin=origin):
            del self.text[start:end]
            # Move cursor to start of deleted range
            self.cursor_pos = self.text.sticky_index(start, Assoc.AFTER)

    def move_cursor(self, position: int):
        """
        Move cursor to specific position, clamped to the text bounds.

        Args:
            position (int): Requested index; values outside
                [0, len(text)] are clamped into that range
        """
        position = max(0, min(position, len(self.text)))
        self.cursor_pos = self.text.sticky_index(position, Assoc.AFTER)

    def get_cursor_position(self) -> int:
        """Get current cursor position."""
        return self.cursor_pos.get_index()

    def _step_history(self, can_step, step) -> bool:
        """Run one undo/redo step, then restore the cursor to its prior index."""
        if not can_step():
            return False
        old_pos = self.get_cursor_position()
        step()
        # The text may be shorter now; move_cursor clamps the old index
        # into the valid range, which keeps the cursor in a reasonable place.
        self.move_cursor(old_pos)
        return True

    def undo(self):
        """
        Undo with cursor position awareness.

        Returns:
            bool: True if an undo was performed, False if there was nothing to undo
        """
        return self._step_history(self.undo_manager.can_undo, self.undo_manager.undo)

    def redo(self):
        """
        Redo with cursor position awareness.

        Returns:
            bool: True if a redo was performed, False if there was nothing to redo
        """
        return self._step_history(self.undo_manager.can_redo, self.undo_manager.redo)

# Example usage
doc = Doc()
editor = PositionAwareEditor(doc)


def show_state(editor):
    """Print the editor's current text and cursor index."""
    print(f"Text: '{str(editor.text)}', Cursor: {editor.get_cursor_position()}")


print("Position-aware editing demo:")

# Type some text, reporting the state after each piece
for typed in ("Hello, ", "world!"):
    editor.insert_text(typed)
    show_state(editor)

# Jump back between "Hello, " and "world!" and type there
editor.move_cursor(7)
editor.insert_text("beautiful ")
show_state(editor)

# Step backwards through history until nothing is left to undo
print("\nUndoing operations:")
while editor.undo():
    show_state(editor)

# Step forwards again until nothing is left to redo
print("\nRedoing operations:")
while editor.redo():
    show_state(editor)

Error Handling

from pycrdt import Doc, Text, StickyIndex, UndoManager, Assoc

doc = Doc()
text = doc.get("content", type=Text)

try:
    # Invalid sticky index operations
    text.sticky_index(-1, Assoc.AFTER)  # May raise ValueError

# NOTE(review): a negative index may instead be rejected with OverflowError
# when it is converted to an unsigned integer. Confirm against the library.
except (ValueError, OverflowError) as e:
    print(f"StickyIndex error: {e}")

try:
    # Only `scopes` is passed: pycrdt builds an undo manager from a doc OR
    # from scopes. Passing both raises RuntimeError, which this handler
    # would not catch.
    undo_manager = UndoManager(scopes=[text])

    # Try to undo when nothing to undo
    if not undo_manager.can_undo():
        result = undo_manager.undo()  # Returns False, doesn't raise
        print(f"Undo result when nothing to undo: {result}")

    # Invalid scope expansion
    undo_manager.expand_scope(None)  # May raise TypeError

# NOTE(review): None has none of a shared type's attributes, so this may
# surface as AttributeError rather than TypeError. Confirm.
except (TypeError, ValueError, AttributeError) as e:
    print(f"UndoManager error: {e}")

try:
    # Position encoding/decoding errors
    invalid_data = b"invalid"

    StickyIndex.decode(invalid_data, text)  # May raise decoding error

# The exception type raised for malformed data is not documented here, so the
# handler is deliberately broad; narrow it once the concrete type is known.
except Exception as e:
    print(f"Position decoding error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-pycrdt

docs

array-operations.md

awareness.md

document-management.md

index.md

map-operations.md

position-undo.md

synchronization.md

text-operations.md

xml-support.md

tile.json