or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

array-operations.md, awareness.md, document-management.md, index.md, map-operations.md, position-undo.md, synchronization.md, text-operations.md, xml-support.md

docs/synchronization.md

0

# Synchronization

1

2

## Overview

3

4

pycrdt provides the building blocks for sharing documents between multiple clients in real time:
- binary message encoding and decoding;
- the sync-protocol message helpers;
- network providers;
- a transport-agnostic channel abstraction.

Together they let collaborative applications exchange compact binary updates over any transport, while the CRDT resolves concurrent edits automatically.

5

6

## Core Types

7

8

### Message Type Enums

9

10

```python { .api }

11

class YMessageType(IntEnum):
    """Kind of a top-level protocol message (the first var-uint of a message)."""

    # Document synchronization: sync steps 1/2 and incremental updates.
    SYNC = 0
    # Awareness state exchange.
    AWARENESS = 1

15

16

class YSyncMessageType(IntEnum):
    """Subtype of a SYNC message (the var-uint that follows YMessageType.SYNC)."""

    # Announce the sender's state vector.
    SYNC_STEP1 = 0
    # Reply with the updates the other peer is missing.
    SYNC_STEP2 = 1
    # Broadcast an incremental document update.
    SYNC_UPDATE = 2

21

```

22

23

### Encoder

24

25

Binary message encoder for network protocols.

26

27

```python { .api }

28

class Encoder:
    """Accumulate var-uint and var-string values into one binary buffer."""

    def __init__(self) -> None:
        """Start with an empty encoder."""

    def write_var_uint(self, num: int) -> None:
        """
        Append an unsigned integer using variable-length encoding.

        Args:
            num (int): Non-negative integer to append
        """

    def write_var_string(self, text: str) -> None:
        """
        Append a string using variable-length encoding.

        Args:
            text (str): String to append
        """

    def to_bytes(self) -> bytes:
        """
        Return everything written so far as a single bytes object.

        Returns:
            bytes: Encoded message data
        """

55

```

56

57

### Decoder

58

59

Binary message decoder for network protocols.

60

61

```python { .api }

62

class Decoder:

63

def __init__(self, stream: bytes) -> None:

64

"""

65

Create a new message decoder.

66

67

Args:

68

stream (bytes): Binary data to decode

69

"""

70

71

def read_var_uint(self) -> int:

72

"""

73

Read a variable-length unsigned integer.

74

75

Returns:

76

int: Decoded integer

77

"""

78

79

def read_message(self) -> bytes | None:

80

"""

81

Read a single message from the stream.

82

83

Returns:

84

bytes | None: Message data or None if no more messages

85

"""

86

87

def read_messages(self) -> Iterator[bytes]:

88

"""

89

Read all messages from the stream.

90

91

Returns:

92

Iterator[bytes]: Iterator over message data

93

"""

94

95

def read_var_string(self) -> str:

96

"""

97

Read a variable-length string.

98

99

Returns:

100

str: Decoded string

101

"""

102

```

103

104

### Channel Protocol

105

106

Transport abstraction for document synchronization.

107

108

```python { .api }

109

class Channel(Protocol):
    """Structural interface for a transport that carries sync messages.

    Any object that provides these members can serve as a channel,
    whatever the underlying transport is.
    """

    @property
    def path(self) -> str:
        """Identifier of the channel (for example a document path)."""

    async def send(self, message: bytes) -> None:
        """
        Transmit one binary message to the other side.

        Args:
            message (bytes): Binary message to send
        """

    async def recv(self) -> bytes:
        """
        Wait for and return the next binary message.

        Returns:
            bytes: Received binary message
        """

    def __aiter__(self) -> "Channel":
        """Return an async iterator (a Channel) over incoming messages."""

    async def __anext__(self) -> bytes:
        """Return the next incoming message during async iteration."""

137

```

138

139

### Provider

140

141

Synchronization provider that keeps a `Doc` in sync with a remote peer by exchanging messages over a `Channel`.

142

143

```python { .api }

144

class Provider:
    """Keep a Doc synchronized with a peer by exchanging messages over a Channel."""

    def __init__(self, doc: Doc, channel: Channel, log: Logger | None = None) -> None:
        """
        Create a new synchronization provider.

        Args:
            doc (Doc): Document to synchronize
            channel (Channel): Transport channel for communication
            log (Logger, optional): Logger for debugging
        """

    @property
    def started(self) -> Event:
        """Event that signals when the provider is ready."""

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED) -> None:
        """
        Start the synchronization provider.

        Args:
            task_status: Optional task status for structured concurrency
        """

    async def stop(self) -> None:
        """Stop the synchronization provider."""

    # The return annotation is quoted: the name Provider is not bound yet
    # while the class body is executing (same convention as Channel.__aiter__).
    async def __aenter__(self) -> "Provider":
        """Enter the async context manager and return the provider."""

    async def __aexit__(self, exc_type, exc_value, exc_tb) -> None:
        """Exit the async context manager."""

175

```

176

177

## Synchronization Functions

178

179

### Message Creation Functions

180

181

```python { .api }

182

def write_var_uint(num: int) -> bytes:
    """
    Return the variable-length encoding of an unsigned integer.

    Args:
        num (int): Non-negative integer to encode

    Returns:
        bytes: Encoded integer data
    """

192

193

def create_awareness_message(data: bytes) -> bytes:
    """
    Wrap awareness data in an AWARENESS protocol message.

    Args:
        data (bytes): Awareness payload to wrap

    Returns:
        bytes: Complete awareness message, ready to send
    """

203

204

def create_sync_message(ydoc: Doc) -> bytes:
    """
    Build the opening (sync step 1) message for a document.

    Args:
        ydoc (Doc): Document whose state is advertised

    Returns:
        bytes: Sync message containing the document state
    """

214

215

def create_update_message(data: bytes) -> bytes:
    """
    Wrap a binary document update in a sync update message.

    Args:
        data (bytes): Update payload to wrap

    Returns:
        bytes: Complete update message, ready to send
    """

225

226

def handle_sync_message(message: bytes, ydoc: Doc) -> bytes | None:
    """
    Handle an incoming synchronization message.

    ``message`` is the payload *after* the leading ``YMessageType.SYNC``
    var-uint, so it starts with a ``YSyncMessageType``. Messages built by
    ``create_sync_message`` / ``create_update_message`` (and replies returned
    by this function) still carry that leading byte, so pass ``message[1:]``.

    Args:
        message (bytes): Incoming sync message, without its top-level type byte
        ydoc (Doc): Document to apply the sync message to

    Returns:
        bytes | None: A reply message (including its top-level type byte) when
        the incoming message is SYNC_STEP1; None for SYNC_STEP2 and
        SYNC_UPDATE, which are only applied to ``ydoc``
    """

237

238

def read_message(stream: bytes) -> bytes:
    """
    Extract one framed message from a binary stream.

    Args:
        stream (bytes): Binary stream that starts with a framed message

    Returns:
        bytes: The message payload
    """

248

249

def write_message(stream: bytes) -> bytes:
    """
    Frame a message so it can be concatenated into a stream and later split
    apart again with ``read_message`` / ``Decoder.read_messages``.

    Args:
        stream (bytes): Message payload to frame

    Returns:
        bytes: The framed message
    """

259

```

260

261

## Update Utilities

262

263

Standalone functions that operate directly on binary updates and state vectors, without needing a `Doc` instance.

264

265

```python { .api }

266

def get_state(update: bytes) -> bytes:
    """
    Compute the state vector described by a binary update.

    Args:
        update (bytes): Binary update data

    Returns:
        bytes: State vector representing the document state after the update
    """

276

277

def get_update(update: bytes, state: bytes) -> bytes:
    """
    Return the part of ``update`` that is not yet covered by ``state``.

    Args:
        update (bytes): Source update data
        state (bytes): State vector to diff against

    Returns:
        bytes: Binary update containing only the incremental changes
    """

288

289

def merge_updates(*updates: bytes) -> bytes:
    """
    Combine any number of binary updates into one update.

    Args:
        *updates (bytes): Update data streams to combine

    Returns:
        bytes: A single update containing every change from the inputs

    Example:
        >>> update1 = doc1.get_update()
        >>> update2 = doc2.get_update()
        >>> merged = merge_updates(update1, update2)
        >>> doc3.apply_update(merged)  # Apply all changes at once
    """

305

```

306

307

## Usage Examples

308

309

### Basic Message Encoding/Decoding

310

311

```python

312

from pycrdt import Decoder, Encoder, write_var_uint

# --- Encoding: append an integer and a string to one buffer ---
encoder = Encoder()
encoder.write_var_uint(42)
encoder.write_var_string("Hello, world!")
encoded_data = encoder.to_bytes()
print(f"Encoded data: {encoded_data.hex()}")

# --- Decoding: read the values back in the order they were written ---
decoder = Decoder(encoded_data)
number, text = decoder.read_var_uint(), decoder.read_var_string()
print(f"Decoded number: {number}")  # 42
print(f"Decoded text: {text}")  # "Hello, world!"

# --- Standalone var-uint encoding, no Encoder needed ---
uint_bytes = write_var_uint(1000)
print(f"Variable uint encoding: {uint_bytes.hex()}")

333

```

334

335

### Document Synchronization Protocol

336

337

```python

338

from pycrdt import Doc, Text, create_sync_message, create_update_message, handle_sync_message

# Create two documents for sync simulation
doc1 = Doc(client_id=1)
doc2 = Doc(client_id=2)

# Add content to doc1
text1 = doc1.get("content", type=Text)
with doc1.transaction():
    text1.insert(0, "Hello from doc1")

# NOTE: handle_sync_message() expects the message *without* its leading
# YMessageType.SYNC byte, so every full message below is passed as msg[1:].

# Step 1: each peer sends SYNC_STEP1 (its state vector) to the other.
# Both directions are needed: doc2 only learns doc1's content by
# answering... no -- by *receiving* doc1's answer to doc2's own step 1.
step1_from_doc1 = create_sync_message(doc1)
step1_from_doc2 = create_sync_message(doc2)
print(f"Initial sync message: {len(step1_from_doc1)} bytes")

# Step 2: each peer answers the other's step 1 with SYNC_STEP2,
# carrying the updates the other side is missing.
reply_to_doc1 = handle_sync_message(step1_from_doc1[1:], doc2)
reply_to_doc2 = handle_sync_message(step1_from_doc2[1:], doc1)

# Step 3: apply each reply; SYNC_STEP2 produces no further reply.
for reply, target in ((reply_to_doc1, doc1), (reply_to_doc2, doc2)):
    if reply is not None:
        print(f"Sync response: {len(reply)} bytes")
        handle_sync_message(reply[1:], target)

# Verify synchronization
text2 = doc2.get("content", type=Text)
print(f"Doc1 content: {text1}")
print(f"Doc2 content: {text2}")  # "Hello from doc1"

# Create update message for incremental sync
with doc1.transaction():
    text1.insert(len(text1), " - updated!")

update_data = doc1.get_update(doc2.get_state())
update_message = create_update_message(update_data)

# Apply update to doc2 (again without the leading message-type byte)
handle_sync_message(update_message[1:], doc2)
print(f"Updated doc2 content: {text2}")

378

```

379

380

### Custom Channel Implementation

381

382

```python

383

import asyncio

384

from pycrdt import Channel

385

386

class InMemoryChannel:
    """Simple in-memory channel for testing.

    Two instances are paired with ``connect_to``; after that, whatever one
    of them sends lands in the other's receive queue.
    """

    def __init__(self, path: str):
        self._path = path
        # Messages sent *to* this channel wait here until recv() takes them.
        self._queue: asyncio.Queue[bytes] = asyncio.Queue()
        # The peer's receive queue; None until connect_to() is called.
        self._peer_queue: asyncio.Queue[bytes] | None = None

    @property
    def path(self) -> str:
        return self._path

    def connect_to(self, other_channel: "InMemoryChannel") -> None:
        """Connect two channels for bidirectional communication."""
        self._peer_queue = other_channel._queue
        other_channel._peer_queue = self._queue

    async def send(self, message: bytes) -> None:
        """Deliver ``message`` to the connected peer.

        Raises:
            RuntimeError: if ``connect_to`` has not been called yet. Raising
                instead of silently dropping the message makes wiring
                mistakes visible.
        """
        if self._peer_queue is None:
            raise RuntimeError(f"channel {self._path!r} is not connected")
        await self._peer_queue.put(message)

    async def recv(self) -> bytes:
        """Wait for the next message sent to this channel."""
        return await self._queue.get()

    def __aiter__(self) -> "InMemoryChannel":
        return self

    async def __anext__(self) -> bytes:
        return await self.recv()

415

416

# Example usage

417

async def test_custom_channel():
    """Exchange one message in each direction over a connected channel pair."""
    left, right = InMemoryChannel("/doc1"), InMemoryChannel("/doc2")
    left.connect_to(right)

    # channel1 -> channel2
    await left.send(b"Hello from channel1")
    print(f"Received: {await right.recv()}")

    # channel2 -> channel1
    await right.send(b"Hello back from channel2")
    print(f"Response: {await left.recv()}")


# Run the test
asyncio.run(test_custom_channel())

434

```

435

436

### Provider-Based Synchronization

437

438

```python

439

import asyncio

440

import anyio

441

from pycrdt import Doc, Provider, Text

442

443

class WebSocketChannel:

444

"""WebSocket-like channel implementation."""

445

446

def __init__(self, path: str):

447

self._path = path

448

self._send_queue = asyncio.Queue()

449

self._recv_queue = asyncio.Queue()

450

self._peer = None

451

452

@property

453

def path(self) -> str:

454

return self._path

455

456

def connect_to(self, peer):

457

"""Simulate WebSocket connection to peer."""

458

self._peer = peer

459

peer._peer = self

460

461

async def send(self, message: bytes) -> None:

462

if self._peer:

463

await self._peer._recv_queue.put(message)

464

465

async def recv(self) -> bytes:

466

return await self._recv_queue.get()

467

468

def __aiter__(self):

469

return self

470

471

async def __anext__(self) -> bytes:

472

return await self.recv()

473

474

async def collaborative_editing_example():
    """Two providers keep two replicas of a document in sync over simulated sockets."""

    # Two replicas of the same document, each with its own client id
    doc1, doc2 = Doc(client_id=1), Doc(client_id=2)

    # A connected pair of channels sharing one path
    channel1 = WebSocketChannel("/doc/shared")
    channel2 = WebSocketChannel("/doc/shared")
    channel1.connect_to(channel2)

    provider1, provider2 = Provider(doc1, channel1), Provider(doc2, channel2)

    async with anyio.create_task_group() as tg:
        # Launch both providers, then block until each one reports readiness
        for provider in (provider1, provider2):
            tg.start_soon(provider.start)
        for provider in (provider1, provider2):
            await provider.started.wait()

        text1 = doc1.get("content", type=Text)
        text2 = doc2.get("content", type=Text)

        # The two edits happen one after the other, with a pause before
        # each, so the providers have time to exchange messages in between.
        await anyio.sleep(0.1)
        with doc1.transaction(origin="client1"):
            text1.insert(0, "Hello from client 1! ")

        await anyio.sleep(0.1)
        with doc2.transaction(origin="client2"):
            text2.insert(0, "Hi from client 2! ")

        # Give the last update time to propagate
        await anyio.sleep(0.2)

        print(f"Client 1 sees: {str(text1)}")
        print(f"Client 2 sees: {str(text2)}")

        for provider in (provider1, provider2):
            await provider.stop()


# Run collaborative editing example
asyncio.run(collaborative_editing_example())

524

```

525

526

### Message Stream Processing

527

528

```python

529

from pycrdt import Decoder, Encoder, YMessageType, YSyncMessageType, write_message


def process_message_stream(stream: bytes) -> None:
    """Decode and report every framed message in ``stream``.

    The stream is a concatenation of messages, each framed with
    ``write_message``. ``Decoder.read_messages`` removes that framing and
    yields the message payloads one at a time.
    """
    decoder = Decoder(stream)

    for message_data in decoder.read_messages():
        if not message_data:
            # An empty frame has no type byte to read.
            print("Skipping empty message")
            continue

        # Each message payload starts with its top-level message type
        msg_decoder = Decoder(message_data)
        msg_type = msg_decoder.read_var_uint()

        if msg_type == YMessageType.SYNC:
            # Sync messages carry a sync subtype next
            sync_type = msg_decoder.read_var_uint()

            if sync_type == YSyncMessageType.SYNC_STEP1:
                print("Received sync step 1 (state vector)")
            elif sync_type == YSyncMessageType.SYNC_STEP2:
                print("Received sync step 2 (missing updates)")
            elif sync_type == YSyncMessageType.SYNC_UPDATE:
                print("Received sync update")
            else:
                print(f"Unknown sync message type: {sync_type}")

        elif msg_type == YMessageType.AWARENESS:
            print("Received awareness message")

        else:
            print(f"Unknown message type: {msg_type}")


# Example message stream.
# Build each message with its own Encoder, then frame it with write_message()
# so the receiver can split the concatenated stream back into messages.
# Writing every field into one shared Encoder would produce an unframed
# stream that read_messages() cannot split correctly.

# Sync step 1 message
step1 = Encoder()
step1.write_var_uint(YMessageType.SYNC)
step1.write_var_uint(YSyncMessageType.SYNC_STEP1)
step1.write_var_string("state_vector_data")

# Awareness message
awareness = Encoder()
awareness.write_var_uint(YMessageType.AWARENESS)
awareness.write_var_string("awareness_data")

message_stream = write_message(step1.to_bytes()) + write_message(awareness.to_bytes())
process_message_stream(message_stream)

568

```

569

570

### Robust Synchronization with Error Handling

571

572

```python

573

import asyncio

574

import logging

575

from pycrdt import Doc, Provider, Channel

576

577

logger = logging.getLogger(__name__)

578

579

class ReliableChannel:
    """Channel with automatic reconnection and error handling.

    This is a simulation: messages are put into an internal queue by send()
    and taken back out of that same queue by recv().
    """

    def __init__(self, path: str, max_retries: int = 3, retry_delay: float = 1.0):
        """
        Args:
            path: Channel path identifier.
            max_retries: Number of send attempts before giving up (>= 1).
            retry_delay: Base delay in seconds. Send retries back off
                exponentially from it; receive retries wait exactly this long.

        Raises:
            ValueError: if ``max_retries`` is less than 1. With zero attempts,
                send() would return without ever sending the message.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        self._path = path
        self._max_retries = max_retries
        self._retry_delay = retry_delay
        self._connected = False
        self._queue: asyncio.Queue[bytes] = asyncio.Queue()

    @property
    def path(self) -> str:
        return self._path

    async def send(self, message: bytes) -> None:
        """Send ``message``, reconnecting and retrying with exponential backoff.

        Raises:
            Exception: whatever the last attempt raised, once all
                ``max_retries`` attempts have failed.
        """
        for attempt in range(1, self._max_retries + 1):
            try:
                if not self._connected:
                    await self._reconnect()

                # Simulate network send
                await self._send_impl(message)
                return

            except Exception as e:
                logger.warning("Send failed (attempt %d): %s", attempt, e)
                # Force a reconnect on the next attempt, as recv() does.
                self._connected = False
                if attempt == self._max_retries:
                    raise
                # Exponential backoff: delay, 2*delay, 4*delay, ...
                await asyncio.sleep(self._retry_delay * 2 ** (attempt - 1))

    async def recv(self) -> bytes:
        """Receive the next message, reconnecting after any failure (never gives up)."""
        while True:
            try:
                if not self._connected:
                    await self._reconnect()

                return await self._recv_impl()

            except Exception as e:
                logger.warning("Receive failed: %s", e)
                self._connected = False
                await asyncio.sleep(self._retry_delay)

    async def _reconnect(self):
        """Simulate reconnection logic."""
        logger.info("Reconnecting to %s", self._path)
        await asyncio.sleep(0.1)  # Simulate connection time
        self._connected = True

    async def _send_impl(self, message: bytes):
        """Simulate actual network send."""
        if not self._connected:
            raise ConnectionError("Not connected")
        # Store in queue for this example
        await self._queue.put(message)

    async def _recv_impl(self) -> bytes:
        """Simulate actual network receive."""
        if not self._connected:
            raise ConnectionError("Not connected")
        return await self._queue.get()

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        return await self.recv()

647

648

async def robust_sync_example():
    """Example with error handling and reconnection."""
    # Imported locally: this snippet's top-level imports do not include Text.
    from pycrdt import Text

    doc = Doc()
    channel = ReliableChannel("/robust/sync")

    # Create provider with logging
    logging.basicConfig(level=logging.INFO)
    provider = Provider(doc, channel, log=logger)

    try:
        async with provider:  # Use context manager for cleanup
            text = doc.get("content", type=Text)

            # Make changes
            with doc.transaction():
                text.insert(0, "Robust sync test")

            # Simulate some work
            await asyncio.sleep(1.0)

    except Exception:
        # logger.exception records the traceback along with the message.
        logger.exception("Sync failed")


# Run robust sync example
asyncio.run(robust_sync_example())

673

```

674

675

### Performance Monitoring

676

677

```python

678

import time

679

from pycrdt import Doc, Text, create_sync_message, handle_sync_message

680

681

def benchmark_sync_performance(sizes=(100, 1000, 10000)):
    """Benchmark a one-way sync of Text documents of increasing size.

    For each size, the empty doc2 asks doc1 for what it is missing:
    1. doc2 creates a SYNC_STEP1 message (its state vector);
    2. doc1 handles it and answers with a SYNC_STEP2 carrying its content;
    3. doc2 applies that answer.

    Going in this direction makes the transferred payload grow with the
    content size. In the opposite direction only a state vector would
    travel, and the empty doc2 would have nothing to send back.

    Args:
        sizes: Number of characters to insert for each run.
    """
    for size in sizes:
        doc1 = Doc(client_id=1)
        doc2 = Doc(client_id=2)

        text1 = doc1.get("content", type=Text)

        # Generate content
        with doc1.transaction():
            text1.insert(0, "x" * size)

        # perf_counter is monotonic and high-resolution, unlike time.time().
        # Measure sync message creation (doc2 advertises its state)
        start_time = time.perf_counter()
        sync_msg = create_sync_message(doc2)
        create_time = time.perf_counter() - start_time

        # Measure sync message handling on doc1, which computes the missing update.
        # handle_sync_message() expects the message without its leading
        # YMessageType.SYNC byte, hence [1:].
        start_time = time.perf_counter()
        response = handle_sync_message(sync_msg[1:], doc1)
        handle_time = time.perf_counter() - start_time

        # Measure final sync: doc2 applies the SYNC_STEP2 reply
        start_time = time.perf_counter()
        if response is not None:
            handle_sync_message(response[1:], doc2)
        complete_time = time.perf_counter() - start_time

        print(f"Size {size}:")
        print(f" Message size: {len(sync_msg)} bytes")
        print(f" Reply size: {len(response) if response is not None else 0} bytes")
        print(f" Create time: {create_time:.4f}s")
        print(f" Handle time: {handle_time:.4f}s")
        print(f" Complete time: {complete_time:.4f}s")
        print()


benchmark_sync_performance()

722

```

723

724

## Error Handling

725

726

```python

727

from pycrdt import Doc, Provider, Encoder, Decoder

728

729

async def sync_with_error_handling():
    """Example of proper error handling in synchronization."""

    doc = Doc()

    try:
        # Encoding errors: var-uints are unsigned
        encoder = Encoder()
        encoder.write_var_uint(-1)  # May raise ValueError for negative numbers

    except ValueError as e:
        print(f"Encoding error: {e}")

    try:
        # Decoding errors: 0x80 has its continuation bit set, so the decoder
        # expects another byte that never arrives (a truncated var-uint).
        # Arbitrary bytes such as b"invalid" would NOT fail here: every byte
        # below 0x80 is already a complete one-byte var-uint.
        truncated_data = b"\x80"
        decoder = Decoder(truncated_data)
        decoder.read_var_uint()  # May raise a decoding error

    except Exception as e:
        print(f"Decoding error: {e}")

    try:
        # Provider errors
        class FailingChannel:
            """Channel whose every operation fails, simulating a dead network."""

            path = "/fail"

            async def send(self, msg):
                raise ConnectionError("Network down")

            async def recv(self):
                raise ConnectionError("Network down")

            def __aiter__(self):
                return self

            async def __anext__(self):
                raise ConnectionError("Network down")

        provider = Provider(doc, FailingChannel())
        await provider.start()

    except Exception as e:
        print(f"Provider error: {e}")

765

```