0
# Synchronization
1
2
## Overview
3
4
pycrdt provides comprehensive synchronization capabilities for sharing documents across multiple clients in real-time. The synchronization system includes message encoding/decoding, sync protocols, network providers, and transport abstractions. This enables building collaborative applications with automatic conflict resolution and efficient network communication.
5
6
## Core Types
7
8
### Message Type Enums
9
10
```python { .api }
class YMessageType(IntEnum):
    """Top-level message types for network protocol (first byte of a message)."""

    SYNC = 0
    AWARENESS = 1


class YSyncMessageType(IntEnum):
    """Sync-specific message types (the byte following ``YMessageType.SYNC``)."""

    SYNC_STEP1 = 0  # Send the local state vector
    SYNC_STEP2 = 1  # Reply with the updates the peer is missing
    SYNC_UPDATE = 2  # Document update broadcast
```
22
23
### Encoder
24
25
Binary message encoder for network protocols.
26
27
```python { .api }
class Encoder:
    """Binary message encoder for network protocols."""

    def __init__(self) -> None:
        """Create a new, empty message encoder."""

    def write_var_uint(self, num: int) -> None:
        """
        Write a variable-length unsigned integer.

        Args:
            num (int): Non-negative integer to encode
        """

    def write_var_string(self, text: str) -> None:
        """
        Write a variable-length string.

        Args:
            text (str): String to encode
        """

    def to_bytes(self) -> bytes:
        """
        Get the encoded data as bytes.

        Returns:
            bytes: Encoded message data
        """
```
56
57
### Decoder
58
59
Binary message decoder for network protocols.
60
61
```python { .api }
class Decoder:
    """Binary message decoder for network protocols."""

    def __init__(self, stream: bytes) -> None:
        """
        Create a new message decoder.

        Args:
            stream (bytes): Binary data to decode
        """

    def read_var_uint(self) -> int:
        """
        Read a variable-length unsigned integer.

        Returns:
            int: Decoded integer
        """

    def read_message(self) -> bytes | None:
        """
        Read a single length-prefixed message from the stream.

        Returns:
            bytes | None: Message data, or None if no more messages
        """

    def read_messages(self) -> Iterator[bytes]:
        """
        Read all messages from the stream.

        Yields:
            bytes: Each message in stream order, until ``read_message``
            returns None
        """

    def read_var_string(self) -> str:
        """
        Read a variable-length string.

        Returns:
            str: Decoded string
        """
```
103
104
### Channel Protocol
105
106
Transport abstraction for document synchronization.
107
108
```python { .api }
class Channel(Protocol):
    """Abstract interface for transport-agnostic synchronization channels."""

    @property
    def path(self) -> str:
        """Get the channel path identifier."""

    async def send(self, message: bytes) -> None:
        """
        Send a message through the channel.

        Args:
            message (bytes): Binary message to send
        """

    async def recv(self) -> bytes:
        """
        Receive a message from the channel.

        Returns:
            bytes: Received binary message
        """

    def __aiter__(self) -> "Channel":
        """Return async iterator for receiving messages."""

    async def __anext__(self) -> bytes:
        """Get next message from async iterator."""
```
138
139
### Provider
140
141
Document synchronization provider that manages network communication.
142
143
```python { .api }
class Provider:
    """Synchronizes a ``Doc`` with peers reachable through a ``Channel``."""

    def __init__(self, doc: Doc, channel: Channel, log: Logger | None = None) -> None:
        """
        Create a new synchronization provider.

        Args:
            doc (Doc): Document to synchronize
            channel (Channel): Transport channel for communication
            log (Logger, optional): Logger for debugging
        """

    @property
    def started(self) -> Event:
        """Event that signals when the provider is ready."""

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
        """
        Start the synchronization provider.

        Because it accepts a ``task_status``, it can be launched with
        ``await task_group.start(provider.start)``.

        Args:
            task_status: Optional task status for structured concurrency
        """

    async def stop(self) -> None:
        """Stop the synchronization provider."""

    async def __aenter__(self) -> Provider:
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        """Exit async context manager."""
```
176
177
## Synchronization Functions
178
179
### Message Creation Functions
180
181
```python { .api }
def write_var_uint(num: int) -> bytes:
    """
    Encode a variable-length unsigned integer.

    Args:
        num (int): Non-negative integer to encode

    Returns:
        bytes: Encoded integer data
    """

def create_awareness_message(data: bytes) -> bytes:
    """
    Create an awareness protocol message.

    Args:
        data (bytes): Awareness data to wrap

    Returns:
        bytes: Complete awareness message
    """

def create_sync_message(ydoc: Doc) -> bytes:
    """
    Create a SYNC_STEP1 synchronization message from a document.

    The message carries the document's state vector, which lets the peer
    reply (SYNC_STEP2) with exactly the updates this document is missing.

    Args:
        ydoc (Doc): Document to create sync message for

    Returns:
        bytes: Complete sync message, starting with the ``YMessageType.SYNC``
        byte (strip it before passing the message to ``handle_sync_message``)
    """

def create_update_message(data: bytes) -> bytes:
    """
    Create a SYNC_UPDATE message containing document changes.

    Args:
        data (bytes): Update data to wrap

    Returns:
        bytes: Complete update message, starting with the
        ``YMessageType.SYNC`` byte
    """

def handle_sync_message(message: bytes, ydoc: Doc) -> bytes | None:
    """
    Handle an incoming synchronization message.

    ``message`` must NOT include the leading ``YMessageType.SYNC`` byte: its
    first byte is the ``YSyncMessageType``. Strip the outer type byte first,
    e.g. ``handle_sync_message(message[1:], ydoc)``.

    Args:
        message (bytes): Incoming sync message, without the YMessageType byte
        ydoc (Doc): Document to apply sync message to

    Returns:
        bytes | None: For a SYNC_STEP1 message, the SYNC_STEP2 reply (a complete
        message that again starts with the YMessageType byte). For SYNC_STEP2 or
        SYNC_UPDATE messages, the contained update is applied to ``ydoc`` and
        None is returned.
    """

def read_message(stream: bytes) -> bytes:
    """
    Read a single length-prefixed message from a binary stream.

    Args:
        stream (bytes): Binary stream containing message

    Returns:
        bytes: Extracted message data
    """

def write_message(stream: bytes) -> bytes:
    """
    Frame a message by prefixing it with its variable-length size.

    This is the inverse of ``read_message``.

    Args:
        stream (bytes): Message data to wrap

    Returns:
        bytes: Length-prefixed message
    """
```
260
261
## Update Utilities
262
263
Standalone functions for working with document states and updates.
264
265
```python { .api }
def get_state(update: bytes) -> bytes:
    """
    Extract the state vector from an update.

    Args:
        update (bytes): Binary update data

    Returns:
        bytes: State vector representing the document state
    """

def get_update(update: bytes, state: bytes) -> bytes:
    """
    Generate an update containing the changes made since a given state.

    Args:
        update (bytes): Source update data
        state (bytes): State vector to compare against

    Returns:
        bytes: Binary update containing incremental changes
    """

def merge_updates(*updates: bytes) -> bytes:
    """
    Merge multiple updates into a single consolidated update.

    Args:
        *updates (bytes): Variable number of update data streams

    Returns:
        bytes: Merged update containing all changes

    Example:
        >>> update1 = doc1.get_update()
        >>> update2 = doc2.get_update()
        >>> merged = merge_updates(update1, update2)
        >>> doc3.apply_update(merged)  # Apply all changes at once
    """
```
306
307
## Usage Examples
308
309
### Basic Message Encoding/Decoding
310
311
```python
from pycrdt import Decoder, Encoder, write_var_uint

# --- Encoding: a var-uint followed by a variable-length string -------------
enc = Encoder()
enc.write_var_uint(42)
enc.write_var_string("Hello, world!")
payload = enc.to_bytes()

print(f"Encoded data: {payload.hex()}")

# --- Decoding: read the fields back in the order they were written ---------
dec = Decoder(payload)
number = dec.read_var_uint()
text = dec.read_var_string()

print(f"Decoded number: {number}")  # 42
print(f"Decoded text: {text}")  # "Hello, world!"

# --- Standalone helper: encode a single var-uint without an Encoder --------
print(f"Variable uint encoding: {write_var_uint(1000).hex()}")
```
334
335
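### Document Synchronization Protocol

Messages built by `create_sync_message` and `create_update_message` begin with a `YMessageType` byte. `handle_sync_message` expects that byte to be stripped (`message[1:]`); it dispatches on the `YSyncMessageType` byte that follows. In a complete handshake, each peer sends its state vector (`SYNC_STEP1`) and applies the `SYNC_STEP2` reply it receives.
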
```python
from pycrdt import Doc, Text, create_sync_message, create_update_message, handle_sync_message

# Create two documents for sync simulation
doc1 = Doc(client_id=1)
doc2 = Doc(client_id=2)

# Add content to doc1
text1 = doc1.get("content", type=Text)
with doc1.transaction():
    text1.insert(0, "Hello from doc1")

# NOTE: every message built by create_sync_message / create_update_message
# starts with a YMessageType byte; handle_sync_message expects it stripped.

# Step 1: each peer sends its state vector (SYNC_STEP1) to the other
step1_from_doc1 = create_sync_message(doc1)
step1_from_doc2 = create_sync_message(doc2)
print(f"Initial sync message: {len(step1_from_doc1)} bytes")

# Step 2: each peer answers with the updates the other one is missing (SYNC_STEP2)
reply_to_doc1 = handle_sync_message(step1_from_doc1[1:], doc2)
reply_to_doc2 = handle_sync_message(step1_from_doc2[1:], doc1)
if reply_to_doc2 is not None:
    print(f"Sync response: {len(reply_to_doc2)} bytes")

# Step 3: apply the replies; a SYNC_STEP2 message produces no further reply
for reply, target in ((reply_to_doc1, doc1), (reply_to_doc2, doc2)):
    if reply is not None:
        handle_sync_message(reply[1:], target)

# Verify synchronization
text2 = doc2.get("content", type=Text)
print(f"Doc1 content: {str(text1)}")
print(f"Doc2 content: {str(text2)}")

# Create update message for incremental sync
with doc1.transaction():
    text1.insert(len(text1), " - updated!")

update_data = doc1.get_update(doc2.get_state())
update_message = create_update_message(update_data)

# Apply update to doc2 (strip the leading YMessageType byte first)
handle_sync_message(update_message[1:], doc2)
print(f"Updated doc2 content: {str(text2)}")
```
379
380
### Custom Channel Implementation
381
382
```python
import asyncio

from pycrdt import Channel


class InMemoryChannel:
    """Simple in-memory channel for testing.

    Provides the members of the ``Channel`` protocol (``path``, ``send``,
    ``recv`` and async iteration) without subclassing it.
    """

    def __init__(self, path: str) -> None:
        self._path = path
        self._queue = asyncio.Queue()  # messages delivered to this end
        self._peer_queue = None  # the other end's queue, set by connect_to()

    @property
    def path(self) -> str:
        return self._path

    def connect_to(self, other_channel: "InMemoryChannel") -> None:
        """Connect two channels for bidirectional communication."""
        self._peer_queue = other_channel._queue
        other_channel._peer_queue = self._queue

    async def send(self, message: bytes) -> None:
        # Raise instead of silently dropping messages sent before connect_to().
        if self._peer_queue is None:
            raise ConnectionError(f"Channel {self._path!r} is not connected")
        await self._peer_queue.put(message)

    async def recv(self) -> bytes:
        return await self._queue.get()

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        return await self.recv()


# Example usage
async def test_custom_channel():
    channel1 = InMemoryChannel("/doc1")
    channel2 = InMemoryChannel("/doc2")
    channel1.connect_to(channel2)

    # Send message from channel1 to channel2
    await channel1.send(b"Hello from channel1")
    message = await channel2.recv()
    print(f"Received: {message}")

    # Send response back
    await channel2.send(b"Hello back from channel2")
    response = await channel1.recv()
    print(f"Response: {response}")


# Run the test
asyncio.run(test_custom_channel())
```
435
436
### Provider-Based Synchronization
437
438
```python
import asyncio

import anyio

from pycrdt import Doc, Provider, Text


class WebSocketChannel:
    """WebSocket-like channel implementation, simulated with in-memory queues."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._recv_queue = asyncio.Queue()  # messages delivered to this end
        self._peer = None  # set by connect_to()

    @property
    def path(self) -> str:
        return self._path

    def connect_to(self, peer: "WebSocketChannel") -> None:
        """Simulate WebSocket connection to peer."""
        self._peer = peer
        peer._peer = self

    async def send(self, message: bytes) -> None:
        # Raise instead of silently dropping messages sent before connect_to().
        if self._peer is None:
            raise ConnectionError(f"Channel {self._path!r} is not connected")
        await self._peer._recv_queue.put(message)

    async def recv(self) -> bytes:
        return await self._recv_queue.get()

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        return await self.recv()


async def collaborative_editing_example():
    """Example of collaborative editing using providers."""

    # Create documents and channels
    doc1 = Doc(client_id=1)
    doc2 = Doc(client_id=2)

    channel1 = WebSocketChannel("/doc/shared")
    channel2 = WebSocketChannel("/doc/shared")
    channel1.connect_to(channel2)

    # Create providers
    provider1 = Provider(doc1, channel1)
    provider2 = Provider(doc2, channel2)

    async with anyio.create_task_group() as tg:
        # Start providers
        tg.start_soon(provider1.start)
        tg.start_soon(provider2.start)

        # Wait for providers to be ready
        await provider1.started.wait()
        await provider2.started.wait()

        # Get shared text objects
        text1 = doc1.get("content", type=Text)
        text2 = doc2.get("content", type=Text)

        # Client 1 makes changes
        await anyio.sleep(0.1)  # Allow sync to happen
        with doc1.transaction(origin="client1"):
            text1.insert(0, "Hello from client 1! ")

        # Client 2 makes concurrent changes
        await anyio.sleep(0.1)
        with doc2.transaction(origin="client2"):
            text2.insert(0, "Hi from client 2! ")

        # Allow synchronization to complete
        await anyio.sleep(0.2)

        print(f"Client 1 sees: {str(text1)}")
        print(f"Client 2 sees: {str(text2)}")

        # Stop providers so the task group can exit
        await provider1.stop()
        await provider2.stop()


# Run collaborative editing example
asyncio.run(collaborative_editing_example())
```
525
526
### Message Stream Processing
527
528
```python
from pycrdt import Decoder, Encoder, YMessageType, YSyncMessageType, write_message


def process_message_stream(stream: bytes) -> None:
    """Process a stream of multiple length-prefixed messages."""
    decoder = Decoder(stream)

    for message_data in decoder.read_messages():
        # Each message starts with its message type
        msg_decoder = Decoder(message_data)
        msg_type = msg_decoder.read_var_uint()

        if msg_type == YMessageType.SYNC:
            # Process sync message
            sync_type = msg_decoder.read_var_uint()

            if sync_type == YSyncMessageType.SYNC_STEP1:
                print("Received sync step 1 (state vector)")
            elif sync_type == YSyncMessageType.SYNC_STEP2:
                print("Received sync step 2 (missing updates)")
            elif sync_type == YSyncMessageType.SYNC_UPDATE:
                print("Received sync update")

        elif msg_type == YMessageType.AWARENESS:
            print("Received awareness message")


# Example message stream.
# read_messages() expects every message to be length-prefixed, so each message
# is encoded on its own and then framed with write_message().

# Create sync step 1 message
step1 = Encoder()
step1.write_var_uint(YMessageType.SYNC)
step1.write_var_uint(YSyncMessageType.SYNC_STEP1)
step1.write_var_string("state_vector_data")

# Create awareness message
awareness = Encoder()
awareness.write_var_uint(YMessageType.AWARENESS)
awareness.write_var_string("awareness_data")

message_stream = write_message(step1.to_bytes()) + write_message(awareness.to_bytes())
process_message_stream(message_stream)
```
569
570
### Robust Synchronization with Error Handling
571
572
```python
import asyncio
import logging

from pycrdt import Channel, Doc, Provider, Text

logger = logging.getLogger(__name__)


class ReliableChannel:
    """Channel with automatic reconnection and error handling.

    This is a loopback simulation: messages passed to ``send`` are queued
    locally and read back by ``recv``.
    """

    def __init__(self, path: str, max_retries: int = 3, base_delay: float = 1.0):
        if max_retries < 1:
            # With zero attempts send() would silently drop every message.
            raise ValueError("max_retries must be at least 1")
        self._path = path
        self._max_retries = max_retries
        self._base_delay = base_delay
        self._connected = False
        self._queue = asyncio.Queue()

    @property
    def path(self) -> str:
        return self._path

    async def send(self, message: bytes) -> None:
        """Send ``message``, reconnecting and retrying with exponential backoff."""
        for attempt in range(1, self._max_retries + 1):
            try:
                if not self._connected:
                    await self._reconnect()

                # Simulate network send
                await self._send_impl(message)
                return

            except Exception as e:
                # Treat the connection as broken so the next attempt reconnects.
                self._connected = False
                logger.warning("Send failed (attempt %d): %s", attempt, e)
                if attempt == self._max_retries:
                    raise
                # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
                await asyncio.sleep(self._base_delay * 2 ** (attempt - 1))

    async def recv(self) -> bytes:
        """Receive the next message, reconnecting after any failure (retries forever)."""
        while True:
            try:
                if not self._connected:
                    await self._reconnect()

                return await self._recv_impl()

            except Exception as e:
                logger.warning("Receive failed: %s", e)
                self._connected = False
                await asyncio.sleep(self._base_delay)

    async def _reconnect(self):
        """Simulate reconnection logic."""
        logger.info("Reconnecting to %s", self._path)
        await asyncio.sleep(0.1)  # Simulate connection time
        self._connected = True

    async def _send_impl(self, message: bytes):
        """Simulate actual network send."""
        if not self._connected:
            raise ConnectionError("Not connected")
        # Store in queue for this example
        await self._queue.put(message)

    async def _recv_impl(self) -> bytes:
        """Simulate actual network receive."""
        if not self._connected:
            raise ConnectionError("Not connected")
        return await self._queue.get()

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        return await self.recv()


async def robust_sync_example():
    """Example with error handling and reconnection."""
    doc = Doc()
    channel = ReliableChannel("/robust/sync")

    # Create provider with logging
    logging.basicConfig(level=logging.INFO)
    provider = Provider(doc, channel, log=logger)

    try:
        async with provider:  # Use context manager for cleanup
            text = doc.get("content", type=Text)

            # Make changes
            with doc.transaction():
                text.insert(0, "Robust sync test")

            # Simulate some work
            await asyncio.sleep(1.0)

    except Exception as e:
        logger.error("Sync failed: %s", e)


# Run robust sync example
asyncio.run(robust_sync_example())
```
674
675
### Performance Monitoring
676
677
```python
import time

from pycrdt import Doc, Text, create_sync_message, handle_sync_message


def benchmark_sync_performance(sizes=(100, 1000, 10000)):
    """Benchmark a full sync of doc1's content into an initially empty doc2.

    Args:
        sizes: Text lengths (in characters) to benchmark.
    """
    for size in sizes:
        doc1 = Doc(client_id=1)
        doc2 = Doc(client_id=2)

        text1 = doc1.get("content", type=Text)

        # Generate content
        content = "x" * size
        with doc1.transaction():
            text1.insert(0, content)

        # doc2 sends its (empty) state vector so that doc1 replies with
        # everything doc2 is missing; that is the real transfer being measured.
        # perf_counter() is monotonic and high-resolution, unlike time.time().
        start_time = time.perf_counter()
        sync_msg = create_sync_message(doc2)
        create_time = time.perf_counter() - start_time

        # Measure sync message handling (strip the leading YMessageType byte)
        start_time = time.perf_counter()
        response = handle_sync_message(sync_msg[1:], doc1)
        handle_time = time.perf_counter() - start_time

        # Measure applying the reply on doc2
        start_time = time.perf_counter()
        if response is not None:
            handle_sync_message(response[1:], doc2)
        complete_time = time.perf_counter() - start_time

        # Sanity check: the benchmark must have transferred the content
        assert str(doc2.get("content", type=Text)) == content

        print(f"Size {size}:")
        print(f" Message size: {len(sync_msg)} bytes")
        print(f" Reply size: {len(response) if response is not None else 0} bytes")
        print(f" Create time: {create_time:.4f}s")
        print(f" Handle time: {handle_time:.4f}s")
        print(f" Complete time: {complete_time:.4f}s")
        print()


benchmark_sync_performance()
```
723
724
## Error Handling
725
726
```python
from pycrdt import Decoder, Doc, Encoder, Provider


class FailingChannel:
    """Channel whose every operation fails, used to exercise Provider error paths."""

    path = "/fail"

    async def send(self, msg):
        raise ConnectionError("Network down")

    async def recv(self):
        raise ConnectionError("Network down")

    def __aiter__(self):
        return self

    async def __anext__(self):
        raise ConnectionError("Network down")


async def sync_with_error_handling():
    """Example of proper error handling in synchronization.

    Run it with ``asyncio.run(sync_with_error_handling())``.
    """
    doc = Doc()

    # Encoding errors: var-uints are unsigned
    encoder = Encoder()
    try:
        encoder.write_var_uint(-1)  # May raise ValueError for negative numbers
    except ValueError as e:
        print(f"Encoding error: {e}")

    # Decoding errors: b"\x80" sets the continuation bit but the next byte is
    # missing. (b"invalid" would NOT fail: its first byte, 0x69, is already a
    # complete single-byte var-uint.)
    decoder = Decoder(b"\x80")
    try:
        decoder.read_var_uint()  # May raise decoding error
    except Exception as e:
        print(f"Decoding error: {e}")

    # Provider errors: failures raised by the channel propagate out of start()
    # (possibly wrapped in an exception group by the task group).
    provider = Provider(doc, FailingChannel())
    try:
        await provider.start()
    except Exception as e:
        print(f"Provider error: {e}")
```