Python bindings for Yrs CRDT library providing collaborative data structures for real-time synchronization.
—
The Array type in pycrdt provides collaborative array/list functionality with automatic conflict resolution across multiple clients. It supports a complete list-like interface with additional collaborative features like move operations, change tracking, and type-safe variants.
Collaborative array with list-like interface and change tracking.
class Array[T]:
def __init__(
self,
init: list[T] | None = None,
*,
_doc: Doc | None = None,
_integrated: _Array | None = None,
) -> None:
"""
Create a new collaborative array.
Args:
init (list, optional): Initial array contents
_doc (Doc, optional): Parent document
_integrated (_Array, optional): Native array instance
"""
# List-like interface
def __len__(self) -> int:
"""Get the length of the array."""
def __str__(self) -> str:
"""Get string representation of the array."""
def __iter__(self) -> ArrayIterator:
"""Iterate over array elements."""
def __contains__(self, item: T) -> bool:
"""Check if item exists in array."""
def __getitem__(self, key: int | slice) -> T | list[T]:
"""Get element or slice by index."""
def __setitem__(self, key: int | slice, value: T | list[T]) -> None:
"""Set element or slice by index."""
def __delitem__(self, key: int | slice) -> None:
"""Delete element or slice by index."""
def __add__(self, value: list[T]) -> Array[T]:
"""Concatenate with another list."""
def __radd__(self, value: list[T]) -> Array[T]:
"""Right-side concatenation with another list."""
# Array manipulation methods
def append(self, value: T) -> None:
"""
Append an element to the end of the array.
Args:
value: Element to append
"""
def extend(self, value: list[T]) -> None:
"""
Extend the array with elements from an iterable.
Args:
value (list): Elements to add to the array
"""
def insert(self, index: int, object: T) -> None:
"""
Insert an element at the specified index.
Args:
index (int): Position to insert element
object: Element to insert
"""
def pop(self, index: int = -1) -> T:
"""
Remove and return element at index (default last).
Args:
index (int): Index of element to remove
Returns:
T: Removed element
"""
def move(self, source_index: int, destination_index: int) -> None:
"""
Move an element from source to destination index.
Args:
source_index (int): Current position of element
destination_index (int): New position for element
"""
def clear(self) -> None:
"""Remove all elements from the array."""
def to_py(self) -> list[T] | None:
"""
Convert array to a Python list.
Returns:
list | None: Array contents as list, or None if empty
"""
def observe(self, callback: Callable[[ArrayEvent], None]) -> Subscription:
"""
Observe array changes.
Args:
callback: Function called when array changes occur
Returns:
Subscription: Handle for unsubscribing
"""
def observe_deep(self, callback: Callable[[list[ArrayEvent]], None]) -> Subscription:
"""
Observe deep changes including nested structures.
Args:
callback: Function called with list of change events
Returns:
Subscription: Handle for unsubscribing
"""
def unobserve(self, subscription: Subscription) -> None:
"""
Remove an event observer.
Args:
subscription: Subscription handle to remove
"""
async def events(
self,
deep: bool = False,
max_buffer_size: float = float("inf")
) -> MemoryObjectReceiveStream:
"""
Get an async stream of array events.
Args:
deep (bool): Include deep change events
max_buffer_size (float): Maximum event buffer size
Returns:
MemoryObjectReceiveStream: Async event stream
"""
def sticky_index(self, index: int, assoc: Assoc = Assoc.AFTER) -> StickyIndex:
"""
Create a sticky index that maintains its position during edits.
Args:
index (int): Initial index position
assoc (Assoc): Association type (BEFORE or AFTER)
Returns:
StickyIndex: Persistent position tracker
"""Event emitted when array changes occur.
class ArrayEvent:
@property
def target(self) -> Array:
"""Get the array that changed."""
@property
def delta(self) -> list[dict[str, Any]]:
"""
Get the delta describing the changes.
Delta format:
- {"retain": n} - Keep n elements unchanged
- {"insert": [items]} - Insert elements
- {"delete": n} - Delete n elements
"""
@property
def path(self) -> list[int | str]:
"""Get the path to the changed array within the document structure."""Type-safe wrapper for Array with typed elements.
class TypedArray[T]:
"""
Type-safe array container with runtime type checking.
Usage:
class StringArray(TypedArray[str]):
type = str # Define element type
array = StringArray()
array.append("hello") # Type-safe
item: str = array[0] # Typed access
"""from pycrdt import Doc, Array
doc = Doc()
array = doc.get("items", type=Array)
# Basic list operations
array.append("item1")
array.append("item2")
array.extend(["item3", "item4"])
print(len(array)) # 4
# List-like access
print(array[0]) # "item1"
print(array[-1]) # "item4"
print(array[1:3]) # ["item2", "item3"]
# Modification
array[1] = "modified_item2"
array.insert(2, "inserted_item")
# Check contents
print("item1" in array) # True
print(list(array)) # All elementsfrom pycrdt import Doc, Array
doc = Doc()
numbers = doc.get("numbers", type=Array)
# Build array
numbers.extend([1, 2, 3, 4, 5])
# Move operations (unique to collaborative arrays)
numbers.move(0, 4) # Move first element to end
print(list(numbers)) # [2, 3, 4, 5, 1]
# Pop operations
last = numbers.pop() # Remove and return last element
first = numbers.pop(0) # Remove and return first element
print(f"Removed: {first}, {last}")
# Slice operations
numbers[1:3] = [10, 20, 30] # Replace slice
del numbers[2:4] # Delete slicefrom pycrdt import Doc, Array, Map
doc = Doc()
users = doc.get("users", type=Array)
# Add user objects
user1 = Map()
user1["name"] = "Alice"
user1["age"] = 30
users.append(user1)
user2 = Map()
user2["name"] = "Bob"
user2["age"] = 25
users.append(user2)
# Access nested data
print(users[0]["name"]) # "Alice"
print(users[1]["age"]) # 25
# Modify nested structures
users[0]["age"] = 31
print(users[0]["age"]) # 31from pycrdt import TypedArray, Doc
class NumberList(TypedArray[int]):
type = int
class StringList(TypedArray[str]):
type = str
doc = Doc()
# Create typed arrays
numbers = NumberList()
strings = StringList()
# Type-safe operations
numbers.append(42) # OK
strings.append("hello") # OK
try:
numbers.append("string") # May raise TypeError
except TypeError as e:
print(f"Type error: {e}")
# Typed access
first_number: int = numbers[0] # Typed
first_string: str = strings[0] # Typedfrom pycrdt import Doc, Array, Assoc
doc = Doc()
tasks = doc.get("tasks", type=Array)
tasks.extend(["Task 1", "Task 2", "Task 3", "Task 4"])
# Create sticky indices
important_pos = tasks.sticky_index(1, Assoc.BEFORE) # Before "Task 2"
end_pos = tasks.sticky_index(3, Assoc.AFTER) # After "Task 3"
# Insert elements
tasks.insert(0, "Urgent Task")
tasks.append("Final Task")
# Check positions (they maintain relative positions)
with doc.transaction() as txn:
important_idx = important_pos.get_index(txn)
end_idx = end_pos.get_index(txn)
print(f"Important task at: {important_idx}") # Adjusted index
print(f"End position at: {end_idx}") # Adjusted indexfrom pycrdt import Doc, Array, ArrayEvent
doc = Doc()
array = doc.get("items", type=Array)
def on_array_change(event: ArrayEvent):
print(f"Array changed: {event.target}")
print(f"Delta: {event.delta}")
for op in event.delta:
if "retain" in op:
print(f" Retain {op['retain']} elements")
elif "insert" in op:
items = op["insert"]
print(f" Insert {len(items)} elements: {items}")
elif "delete" in op:
print(f" Delete {op['delete']} elements")
# Subscribe to changes
subscription = array.observe(on_array_change)
# Make changes to trigger events
array.append("item1")
array.extend(["item2", "item3"])
array.move(0, 2)
array.pop()
# Clean up
array.unobserve(subscription)import anyio
from pycrdt import Doc, Array
async def monitor_array_changes(array: Array):
async with array.events() as event_stream:
async for event in event_stream:
print(f"Array event: {event.delta}")
doc = Doc()
array = doc.get("items", type=Array)
async def main():
async with anyio.create_task_group() as tg:
tg.start_soon(monitor_array_changes, array)
# Make changes
await anyio.sleep(0.1)
array.append("item1")
await anyio.sleep(0.1)
array.extend(["item2", "item3"])
await anyio.sleep(0.1)
anyio.run(main)from pycrdt import Doc, Array
# Simulate two clients editing the same array
doc1 = Doc(client_id=1)
doc2 = Doc(client_id=2)
array1 = doc1.get("shared_list", type=Array)
array2 = doc2.get("shared_list", type=Array)
# Client 1 adds items
with doc1.transaction(origin="client1"):
array1.extend([1, 2, 3])
# Sync to client 2
update = doc1.get_update()
doc2.apply_update(update)
print(list(array2)) # [1, 2, 3]
# Client 2 makes concurrent changes
with doc2.transaction(origin="client2"):
array2.insert(0, 0) # Insert at beginning
array2.append(4) # Add to end
array2.move(2, 0) # Move element
# Sync back to client 1
update = doc2.get_update(doc1.get_state())
doc1.apply_update(update)
# Both clients now have the same state
print(f"Client 1: {list(array1)}")
print(f"Client 2: {list(array2)}")from pycrdt import Doc, Array, Map
doc = Doc()
inventory = doc.get("inventory", type=Array)
# Build complex inventory data
items = [
{"name": "Widget A", "price": 10.99, "quantity": 50},
{"name": "Widget B", "price": 15.99, "quantity": 30},
{"name": "Widget C", "price": 8.99, "quantity": 75},
]
for item_data in items:
item = Map()
for key, value in item_data.items():
item[key] = value
inventory.append(item)
# Process inventory
def calculate_total_value(inventory: Array) -> float:
"""Calculate total inventory value."""
total = 0.0
for item in inventory:
price = item["price"]
quantity = item["quantity"]
total += price * quantity
return total
print(f"Total inventory value: ${calculate_total_value(inventory):.2f}")
# Update quantities
def update_quantity(inventory: Array, name: str, new_quantity: int):
"""Update quantity for a specific item."""
for item in inventory:
if item["name"] == name:
item["quantity"] = new_quantity
break
update_quantity(inventory, "Widget A", 45)
print(f"Updated inventory value: ${calculate_total_value(inventory):.2f}")
# Add new items
new_item = Map()
new_item["name"] = "Widget D"
new_item["price"] = 12.99
new_item["quantity"] = 20
inventory.append(new_item)from pycrdt import Doc, Array
doc = Doc()
numbers = doc.get("numbers", type=Array)
numbers.extend([3, 1, 4, 1, 5, 9, 2, 6])
# Sort array (collaborative way)
def collaborative_sort(array: Array, key=None, reverse=False):
"""Sort array in place using collaborative moves."""
# Get current elements
elements = list(array)
# Get sorted indices
sorted_indices = sorted(range(len(elements)),
key=lambda i: elements[i] if key is None else key(elements[i]),
reverse=reverse)
# Apply moves to achieve sorted order
for target_pos, source_pos in enumerate(sorted_indices):
if source_pos != target_pos:
# Find current position of the element we want to move
current_pos = source_pos
for i, idx in enumerate(sorted_indices[:target_pos]):
if idx < source_pos:
current_pos -= 1
if current_pos != target_pos:
array.move(current_pos, target_pos)
# Sort the array
collaborative_sort(numbers)
print(f"Sorted: {list(numbers)}")
# Filter and rebuild (non-collaborative approach)
def filter_array(array: Array, predicate) -> Array:
"""Create new array with filtered elements."""
filtered = Array()
for element in array:
if predicate(element):
filtered.append(element)
return filtered
even_numbers = filter_array(numbers, lambda x: x % 2 == 0)
print(f"Even numbers: {list(even_numbers)}")Array changes are represented as delta operations:
# Example delta operations
delta_examples = [
{"retain": 2}, # Keep 2 elements
{"insert": ["a", "b"]}, # Insert elements
{"delete": 1}, # Delete 1 element
]
# Processing deltas
def apply_delta(array: Array, delta: list[dict]):
"""Apply a delta to array (conceptual example)."""
pos = 0
for op in delta:
if "retain" in op:
pos += op["retain"]
elif "insert" in op:
items = op["insert"]
for i, item in enumerate(items):
array.insert(pos + i, item)
pos += len(items)
elif "delete" in op:
for _ in range(op["delete"]):
del array[pos]from pycrdt import Doc, Array
doc = Doc()
array = doc.get("items", type=Array)
try:
# Invalid index operations
array.insert(-1, "invalid") # May raise ValueError
# Out of bounds access
item = array[100] # May raise IndexError
# Invalid move operations
array.move(0, 100) # May raise ValueError
# Pop from empty array
array.clear()
array.pop() # May raise IndexError
except (ValueError, IndexError, TypeError) as e:
print(f"Array operation failed: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-pycrdt