Python bindings for Yrs CRDT library providing collaborative data structures for real-time synchronization.
—
The Map type in pycrdt provides collaborative dictionary/map functionality with automatic conflict resolution across multiple clients. It supports a complete dict-like interface with additional collaborative features like change tracking, deep observation, and type-safe variants.
Collaborative map with dict-like interface and change tracking.
class Map[T]:
def __init__(
self,
init: dict[str, T] | None = None,
*,
_doc: Doc | None = None,
_integrated: _Map | None = None,
) -> None:
"""
Create a new collaborative map.
Args:
init (dict, optional): Initial map contents
_doc (Doc, optional): Parent document
_integrated (_Map, optional): Native map instance
"""
# Dict-like interface
def __len__(self) -> int:
"""Get the number of key-value pairs in the map."""
def __str__(self) -> str:
"""Get string representation of the map."""
def __iter__(self) -> Iterable[str]:
"""Iterate over map keys."""
def __contains__(self, item: str) -> bool:
"""Check if key exists in map."""
def __getitem__(self, key: str) -> T:
"""Get value by key."""
def __setitem__(self, key: str, value: T) -> None:
"""Set value for key."""
def __delitem__(self, key: str) -> None:
"""Delete key-value pair."""
# Map manipulation methods
def get(self, key: str, default_value: T_DefaultValue = None) -> T | T_DefaultValue | None:
"""
Get value for key with optional default.
Args:
key (str): Key to lookup
default_value: Default value if key not found
Returns:
T | T_DefaultValue | None: Value or default
"""
def pop(self, key: str, default_value: T_DefaultValue = None) -> T | T_DefaultValue:
"""
Remove key and return its value.
Args:
key (str): Key to remove
default_value: Default value if key not found
Returns:
T | T_DefaultValue: Removed value or default
"""
def keys(self) -> Iterable[str]:
"""Get all keys in the map."""
def values(self) -> Iterable[T]:
"""Get all values in the map."""
def items(self) -> Iterable[tuple[str, T]]:
"""Get all key-value pairs as tuples."""
def clear(self) -> None:
"""Remove all key-value pairs from the map."""
def update(self, value: dict[str, T]) -> None:
"""
Update map with key-value pairs from another dict.
Args:
value (dict): Dictionary to merge into this map
"""
def to_py(self) -> dict[str, T] | None:
"""
Convert map to a Python dictionary.
Returns:
dict | None: Map contents as dict, or None if empty
"""
def observe(self, callback: Callable[[MapEvent], None]) -> Subscription:
"""
Observe map changes.
Args:
callback: Function called when map changes occur
Returns:
Subscription: Handle for unsubscribing
"""
def observe_deep(self, callback: Callable[[list[MapEvent]], None]) -> Subscription:
"""
Observe deep changes including nested structures.
Args:
callback: Function called with list of change events
Returns:
Subscription: Handle for unsubscribing
"""
def unobserve(self, subscription: Subscription) -> None:
"""
Remove an event observer.
Args:
subscription: Subscription handle to remove
"""
async def events(
self,
deep: bool = False,
max_buffer_size: float = float("inf")
) -> MemoryObjectReceiveStream:
"""
Get an async stream of map events.
Args:
deep (bool): Include deep change events
max_buffer_size (float): Maximum event buffer size
Returns:
MemoryObjectReceiveStream: Async event stream
"""Event emitted when map changes occur.
class MapEvent:
    """Description of a single change to a map, handed to observer callbacks."""

    @property
    def target(self) -> Map:
        """Get the map that changed."""

    @property
    def keys(self) -> list[str]:
        """
        Get the list of keys that changed.

        NOTE(review): upstream pycrdt may expose per-key change details
        (a mapping keyed by the changed key) rather than a plain list;
        confirm against the installed version.
        """

    @property
    def path(self) -> list[int | str]:
        """Get the path to the changed map within the document structure."""

Type-safe wrapper for Map with typed attributes.
class TypedMap:
    """
    Type-safe map container with runtime type checking.

    Subclasses declare their keys as annotated class attributes; values are
    then read and written as attributes instead of by string key.

    Usage:
        class UserMap(TypedMap):
            name: str
            age: int
            active: bool

        user = UserMap()
        user.name = "Alice"  # Type-safe
        user.age = 30  # Type-safe
        name: str = user.name  # Typed access
    """

from pycrdt import Doc, Map
# Basic usage: a root map obtained from a document behaves like a dict.
doc = Doc()
config = doc.get("config", type=Map)

# Basic dict operations
config["theme"] = "dark"
config["font_size"] = 14
config["auto_save"] = True
print(len(config))  # 3

# Dict-like access
print(config["theme"])  # "dark"
print(config.get("missing"))  # None
print(config.get("missing", "default"))  # "default"

# Check keys
print("theme" in config)  # True
print("color" in config)  # False

# Iteration: over keys directly, then over (key, value) pairs
for key in config:
    print(f"{key}: {config[key]}")
for key, value in config.items():
    print(f"{key} = {value}")

from pycrdt import Doc, Map
# Map manipulation: bulk update, overwrite, pop (with and without default), delete.
doc = Doc()
settings = doc.get("settings", type=Map)

# Build settings
settings.update({
    "width": 800,
    "height": 600,
    "fullscreen": False,
    "vsync": True
})

# Modify individual settings
settings["width"] = 1024
settings["height"] = 768

# Remove settings
old_value = settings.pop("vsync")
print(f"Removed vsync: {old_value}")

# Remove with default ("fps_limit" was never set, so the default is returned)
fps_limit = settings.pop("fps_limit", 60)
print(f"FPS limit: {fps_limit}")

# Delete by key
del settings["fullscreen"]

# Check current state
print(f"Current settings: {dict(settings.items())}")

from pycrdt import Doc, Map, Array
# Nested structures: shared maps and arrays stored as values of a root map.
doc = Doc()
user_profile = doc.get("profile", type=Map)

# Create nested structure (each nested value is assigned first, then
# filled in through the parent so the writes go to the attached object)
user_profile["personal"] = Map()
user_profile["personal"]["name"] = "Alice"
user_profile["personal"]["email"] = "alice@example.com"
user_profile["preferences"] = Map()
user_profile["preferences"]["theme"] = "dark"
user_profile["preferences"]["notifications"] = True
user_profile["tags"] = Array()
user_profile["tags"].extend(["developer", "python", "collaborative"])

# Access nested data
print(user_profile["personal"]["name"])  # "Alice"
print(user_profile["preferences"]["theme"])  # "dark"
print(list(user_profile["tags"]))  # ["developer", "python", "collaborative"]

# Modify nested structures
user_profile["personal"]["age"] = 30
user_profile["tags"].append("crdt")

from pycrdt import TypedMap, Doc
class UserProfile(TypedMap):
    """Typed view of a user record."""

    name: str
    age: int
    email: str
    active: bool


class DatabaseConfig(TypedMap):
    """Typed view of database connection settings."""

    host: str
    port: int
    ssl_enabled: bool
    timeout: float


doc = Doc()

# Create typed maps
user = UserProfile()
db_config = DatabaseConfig()

# Attach the underlying shared maps to the document before writing to them:
# a shared type that is not part of a document cannot be modified.
doc["user"] = user._
doc["db_config"] = db_config._

# Type-safe operations
user.name = "Bob"  # OK
user.age = 25  # OK
user.email = "bob@test.com"  # OK
user.active = True  # OK
db_config.host = "localhost"
db_config.port = 5432
db_config.ssl_enabled = True
db_config.timeout = 30.0

try:
    user.age = "not a number"  # May raise TypeError
except TypeError as e:
    print(f"Type error: {e}")

# Typed access
name: str = user.name  # Typed
port: int = db_config.port  # Typed

from pycrdt import Doc, Map, MapEvent
# Event observation: subscribe a callback, mutate the map, then unsubscribe.
doc = Doc()
data = doc.get("data", type=Map)


def on_map_change(event: MapEvent):
    """Print the target, changed keys and path of a map event."""
    print(f"Map changed: {event.target}")
    print(f"Changed keys: {event.keys}")
    print(f"Path: {event.path}")


# Subscribe to changes
subscription = data.observe(on_map_change)

# Make changes to trigger events
data["key1"] = "value1"
data["key2"] = "value2"
data.update({"key3": "value3", "key4": "value4"})
del data["key1"]

# Clean up
data.unobserve(subscription)

from pycrdt import Doc, Map, Array
# Deep observation: one subscription on the root reports changes made
# anywhere in the nested structure below it.
doc = Doc()
root = doc.get("root", type=Map)

# Create nested structure
root["level1"] = Map()
root["level1"]["level2"] = Map()
root["level1"]["level2"]["data"] = Array()


def on_deep_change(events):
    """Print a summary of each event, telling map and array events apart by attribute."""
    print(f"Deep changes detected: {len(events)} events")
    for event in events:
        if hasattr(event, 'keys'):  # MapEvent
            print(f"  Map change at {event.path}: keys {event.keys}")
        elif hasattr(event, 'delta'):  # ArrayEvent
            print(f"  Array change at {event.path}: {event.delta}")


# Subscribe to deep changes
subscription = root.observe_deep(on_deep_change)

# Make nested changes
root["level1"]["level2"]["data"].append("item1")
root["level1"]["level2"]["new_key"] = "new_value"
root["level1"]["another_map"] = Map()

# Clean up
root.unobserve(subscription)

import anyio
from pycrdt import Doc, Map


async def monitor_map_changes(map_obj: Map):
    """Print the changed keys of every event from map_obj until cancelled."""
    async with map_obj.events() as event_stream:
        async for event in event_stream:
            print(f"Map event: keys {event.keys}")


doc = Doc()
config = doc.get("config", type=Map)


async def main():
    """Run the monitor in the background, make a few changes, then stop it."""
    async with anyio.create_task_group() as tg:
        tg.start_soon(monitor_map_changes, config)

        # Make changes (the short sleeps let the monitor task run in between)
        await anyio.sleep(0.1)
        config["setting1"] = "value1"
        await anyio.sleep(0.1)
        config.update({"setting2": "value2", "setting3": "value3"})
        await anyio.sleep(0.1)

        # The monitor loops over the event stream indefinitely; without an
        # explicit cancel the task group would wait for it forever.
        tg.cancel_scope.cancel()


anyio.run(main)

from pycrdt import Doc, Map
# Simulate two clients editing the same map
doc1 = Doc(client_id=1)
doc2 = Doc(client_id=2)
config1 = doc1.get("shared_config", type=Map)
config2 = doc2.get("shared_config", type=Map)

# Client 1 sets initial configuration
with doc1.transaction(origin="client1"):
    config1.update({
        "theme": "light",
        "font_size": 12,
        "auto_save": True
    })

# Sync to client 2 (full update of doc1 applied to doc2)
update = doc1.get_update()
doc2.apply_update(update)
print(f"Client 2 config: {dict(config2.items())}")

# Client 2 makes concurrent changes
with doc2.transaction(origin="client2"):
    config2["theme"] = "dark"  # Conflict with client 1
    config2["line_numbers"] = True  # New setting
    config2["font_size"] = 14  # Different value

# Both clients make more changes
with doc1.transaction(origin="client1"):
    config1["word_wrap"] = True  # New setting from client 1
with doc2.transaction(origin="client2"):
    config2["auto_save"] = False  # Change existing setting

# Sync changes: each side computes an update relative to the other's state
# before either update is applied, then both are exchanged
update1 = doc1.get_update(doc2.get_state())
update2 = doc2.get_update(doc1.get_state())
doc2.apply_update(update1)
doc1.apply_update(update2)

# Both clients now have consistent state
print(f"Client 1 final: {dict(config1.items())}")
print(f"Client 2 final: {dict(config2.items())}")

from pycrdt import Doc, Map, Array
# Complex data operations: a small "database" of users, posts and comments
# built from nested shared maps and arrays.
doc = Doc()
database = doc.get("database", type=Map)

# Create complex data structure
database["users"] = Map()
database["posts"] = Map()
database["comments"] = Map()

# Add users
# The id lists are shared Arrays rather than plain Python lists: a plain list
# is stored as a value, so appending to the list read back from the map would
# only change a detached copy and the link would be lost.
users = database["users"]
users["1"] = Map()
users["1"].update({"name": "Alice", "email": "alice@test.com", "posts": Array()})
users["2"] = Map()
users["2"].update({"name": "Bob", "email": "bob@test.com", "posts": Array()})

# Add posts
posts = database["posts"]
posts["1"] = Map()
posts["1"].update({
    "title": "First Post",
    "content": "Hello, world!",
    "author_id": "1",
    "comments": Array()
})
posts["2"] = Map()
posts["2"].update({
    "title": "Second Post",
    "content": "More content",
    "author_id": "2",
    "comments": Array()
})

# Link posts to users
users["1"]["posts"].append("1")
users["2"]["posts"].append("2")

# Add comments
comments = database["comments"]
comments["1"] = Map()
comments["1"].update({
    "content": "Great post!",
    "author_id": "2",
    "post_id": "1"
})
posts["1"]["comments"].append("1")

# Query operations
def get_user_posts(database: Map, user_id: str) -> list:
    """
    Collect id, title and content for every post owned by a user.

    Args:
        database: Mapping with "users" and "posts" sections
        user_id (str): Key of the user inside the "users" section

    Returns:
        list: One dict per post id listed under the user's "posts" entry,
            in the same order
    """

    def summarize(pid):
        # The post is looked up only when an id is actually visited.
        entry = database["posts"][pid]
        return {
            "id": pid,
            "title": entry["title"],
            "content": entry["content"],
        }

    owned_ids = database["users"][user_id]["posts"]
    return [summarize(pid) for pid in owned_ids]
def get_post_with_comments(database: Map, post_id: str) -> dict:
    """
    Build a view of one post together with its comments.

    Args:
        database: Mapping with "posts", "comments" and "users" sections
        post_id (str): Key of the post inside the "posts" section

    Returns:
        dict: The post's title and content plus a "comments" list holding
            each comment's content and its author's name
    """
    entry = database["posts"][post_id]

    def describe(cid):
        # Resolve the comment first, then its author through "author_id".
        remark = database["comments"][cid]
        writer = database["users"][remark["author_id"]]
        return {
            "content": remark["content"],
            "author": writer["name"],
        }

    thread = [describe(cid) for cid in entry["comments"]]
    return {
        "title": entry["title"],
        "content": entry["content"],
        "comments": thread,
    }
# Use query operations
alice_posts = get_user_posts(database, "1")
print(f"Alice's posts: {alice_posts}")
first_post = get_post_with_comments(database, "1")
print(f"First post with comments: {first_post}")

import json
from pycrdt import Doc, Map, Array
def serialize_map(map_obj: Map) -> dict:
    """
    Convert a collaborative map into a plain dict.

    Nested maps and arrays are converted recursively; every other value is
    copied across unchanged.
    """

    def plain(entry):
        if isinstance(entry, Map):
            return serialize_map(entry)
        if isinstance(entry, Array):
            return serialize_array(entry)
        return entry

    return {name: plain(entry) for name, entry in map_obj.items()}
def serialize_array(array_obj: Array) -> list:
    """
    Convert a collaborative array into a plain list.

    Nested maps and arrays are converted recursively; every other item is
    copied across unchanged, preserving order.
    """

    def plain(entry):
        if isinstance(entry, Map):
            return serialize_map(entry)
        if isinstance(entry, Array):
            return serialize_array(entry)
        return entry

    return [plain(entry) for entry in array_obj]
def deserialize_to_map(data: dict, doc: Doc) -> Map:
    """
    Deserialize dict to collaborative map.

    The contents are assembled as plain Python data first and passed to the
    Map constructor, because a map that is not yet part of a document cannot
    be modified key by key.

    Args:
        data (dict): JSON-compatible dict; nested dicts and lists become
            nested maps and arrays
        doc (Doc): Accepted for interface compatibility; the returned map is
            NOT attached to it, the caller decides where it goes

    Returns:
        Map: A map not yet attached to any document, holding the converted contents
    """
    contents = {}
    for key, value in data.items():
        if isinstance(value, dict):
            contents[key] = deserialize_to_map(value, doc)
        elif isinstance(value, list):
            contents[key] = deserialize_to_array(value, doc)
        else:
            contents[key] = value
    return Map(contents)


def deserialize_to_array(data: list, doc: Doc) -> Array:
    """
    Deserialize list to collaborative array.

    Items are collected in a plain list first and passed to the Array
    constructor, for the same reason as in deserialize_to_map.

    Args:
        data (list): JSON-compatible list; nested dicts and lists become
            nested maps and arrays
        doc (Doc): Accepted for interface compatibility; not used to attach
            the result

    Returns:
        Array: An array not yet attached to any document, holding the converted items
    """
    items = []
    for item in data:
        if isinstance(item, dict):
            items.append(deserialize_to_map(item, doc))
        elif isinstance(item, list):
            items.append(deserialize_to_array(item, doc))
        else:
            items.append(item)
    return Array(items)


# Example usage
doc = Doc()
config = doc.get("config", type=Map)

# Build complex configuration
config["database"] = Map()
config["database"]["host"] = "localhost"
config["database"]["port"] = 5432
config["features"] = Array()
config["features"].extend(["auth", "logging", "caching"])

# Serialize to JSON
config_dict = serialize_map(config)
json_str = json.dumps(config_dict, indent=2)
print(f"Serialized config:\n{json_str}")

# Deserialize back
loaded_data = json.loads(json_str)
new_doc = Doc()
restored_config = deserialize_to_map(loaded_data, new_doc)
# Attach the restored map to the new document; its contents can only be
# read (or further edited) once it belongs to a document.
new_doc["config"] = restored_config
print(f"Restored config: {dict(restored_config.items())}")

from pycrdt import Doc, Map
from pycrdt import TypedMap

doc = Doc()
data = doc.get("data", type=Map)

# Exceptions the failing operations below are expected to raise
MAP_ERRORS = (KeyError, TypeError, ValueError)

# Each failing operation gets its own try block: inside a single block the
# first exception would skip every statement after it.

# Key not found
try:
    value = data["nonexistent"]  # May raise KeyError
except MAP_ERRORS as e:
    print(f"Map operation failed: {e}")

# Invalid operations
try:
    del data["nonexistent"]  # May raise KeyError
except MAP_ERRORS as e:
    print(f"Map operation failed: {e}")


# Type mismatches in typed maps
class StrictMap(TypedMap):
    number_field: int


strict_map = StrictMap()
try:
    strict_map.number_field = "string"  # May raise TypeError
except MAP_ERRORS as e:
    print(f"Map operation failed: {e}")

# Safe operations
value = data.get("nonexistent", "default")  # Returns "default"
removed = data.pop("nonexistent", None)  # Returns None

Install with Tessl CLI
npx tessl i tessl/pypi-pycrdt