or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md client-management.md database-operations.md edge-functions.md index.md realtime-subscriptions.md storage-operations.md

docs/realtime-subscriptions.md

0

# Real-time Subscriptions

1

2

Provides WebSocket-based real-time functionality, including database change subscriptions, presence tracking, message broadcasting, and channel management. It delivers live updates and interactive features through integration with Supabase Realtime.

3

4

## Capabilities

5

6

### Channel Management

7

8

Create and manage real-time channels for organizing different types of real-time communications.

9

10

```python { .api }

11

def channel(
    self,
    topic: str,
    params: RealtimeChannelOptions = {}
) -> SyncRealtimeChannel | AsyncRealtimeChannel:
    """
    Create a real-time channel for subscriptions and messaging.

    Parameters:
    - topic: Channel topic/name for identification
    - params: Channel configuration options

    Returns:
    Realtime channel instance (sync or async) for subscribing to events

    Note: Channels enable broadcast, presence, and postgres_changes functionality
    """


def get_channels(self) -> List[SyncRealtimeChannel | AsyncRealtimeChannel]:
    """
    Get all active real-time channels.

    Returns:
    List of active channel instances
    """


def remove_channel(self, channel: SyncRealtimeChannel | AsyncRealtimeChannel) -> None:
    """
    Unsubscribe and remove a specific real-time channel.

    Parameters:
    - channel: Channel instance to remove

    Note: For AsyncClient, this method is async and returns a coroutine
    """


def remove_all_channels(self) -> None:
    """
    Unsubscribe and remove all real-time channels.

    Note: For AsyncClient, this method is async and returns a coroutine
    """

53

```

54

55

**Usage Examples:**

56

57

```python

58

# NOTE: `supabase` is assumed to be an already-created client, and `url` /
# `key` the project credentials; none of them is defined in this snippet.

# Create a channel
channel = supabase.channel("room-1")

# Create channel with options
channel = supabase.channel("game-lobby", {
    "presence": {"key": "user_id"},
    "broadcast": {"self": True}
})

# List all channels
channels = supabase.get_channels()
print(f"Active channels: {len(channels)}")

# Remove specific channel
supabase.remove_channel(channel)

# Remove all channels
supabase.remove_all_channels()


# Async channel management
async def manage_async_channels():
    """Create a channel on an async client, then remove it and all others."""
    client = await create_async_client(url, key)
    channel = client.channel("async-room")

    # Remove async channel (the removal methods are awaited on the async client)
    await client.remove_channel(channel)
    await client.remove_all_channels()

85

```

86

87

### Database Change Subscriptions

88

89

Subscribe to real-time database changes including inserts, updates, deletes, and specific table events.

90

91

```python { .api }

92

# Channel subscription methods (on channel instance)
def on(
    self,
    event: str,
    filter: dict,
    callback: Callable
) -> 'Channel':
    """
    Subscribe to real-time events on the channel.

    Parameters:
    - event: Event type ("postgres_changes", "broadcast", "presence")
    - filter: Event filter configuration
    - callback: Function called when event occurs

    Returns:
    Channel instance for method chaining
    """


def subscribe(self, timeout: int = 10000) -> 'Channel':
    """
    Start listening for events on the channel.

    Parameters:
    - timeout: Subscription timeout in milliseconds

    Returns:
    Channel instance

    Note: Must be called after setting up event handlers
    """


def unsubscribe(self) -> None:
    """
    Stop listening for events and close the channel.
    """

128

```

129

130

**Usage Examples:**

131

132

```python

133

# NOTE: `supabase` is assumed to be an already-created client.

# Subscribe to all changes in a table
def handle_changes(payload):
    """Print the event type, table and new row of a change payload."""
    print(f"Change detected: {payload['eventType']}")
    print(f"Table: {payload['table']}")
    print(f"Data: {payload['new']}")


channel = supabase.channel("table-changes")
channel.on(
    "postgres_changes",
    {
        "event": "*",          # All events (INSERT, UPDATE, DELETE)
        "schema": "public",    # Database schema
        "table": "countries"   # Specific table
    },
    handle_changes
)
channel.subscribe()


# Subscribe to specific event types
def handle_inserts(payload):
    """Print the newly inserted record."""
    print(f"New record: {payload['new']}")


def handle_updates(payload):
    """Print the record before and after the update."""
    print(f"Updated: {payload['old']} -> {payload['new']}")


def handle_deletes(payload):
    """Print the deleted record."""
    print(f"Deleted: {payload['old']}")


# Insert events
supabase.channel("inserts").on(
    "postgres_changes",
    {"event": "INSERT", "schema": "public", "table": "users"},
    handle_inserts
).subscribe()

# Update events
supabase.channel("updates").on(
    "postgres_changes",
    {"event": "UPDATE", "schema": "public", "table": "users"},
    handle_updates
).subscribe()

# Delete events
supabase.channel("deletes").on(
    "postgres_changes",
    {"event": "DELETE", "schema": "public", "table": "users"},
    handle_deletes
).subscribe()

# Filter rows by a column value
supabase.channel("user-status").on(
    "postgres_changes",
    {
        "event": "UPDATE",
        "schema": "public",
        "table": "users",
        "filter": "status=eq.active"  # Only active users
    },
    lambda payload: print(f"Active user updated: {payload['new']['name']}")
).subscribe()

193

```

194

195

### Broadcast Messaging

196

197

Send and receive custom messages between clients through broadcast events.

198

199

```python { .api }

200

# Broadcasting methods (on channel instance)
def send(
    self,
    event_type: str,
    payload: dict,
    options: dict = None
) -> None:
    """
    Send a broadcast message to all channel subscribers.

    Parameters:
    - event_type: Type of broadcast event
    - payload: Message data to send
    - options: Additional broadcast options

    Note: For async channels, this method is async
    """

217

```

218

219

**Usage Examples:**

220

221

```python

222

# NOTE: `supabase` is assumed to be an already-created client, and `url` /
# `key` the project credentials; none of them is defined in this snippet.

# Set up broadcast listener
def handle_broadcast(payload):
    """Print every broadcast payload received."""
    print(f"Received broadcast: {payload}")


channel = supabase.channel("chat-room")
channel.on("broadcast", {"event": "message"}, handle_broadcast)
channel.subscribe()

# Send broadcast messages
channel.send("message", {
    "user": "john_doe",
    "text": "Hello everyone!",
    "timestamp": "2023-12-01T10:00:00Z"
})

# Broadcast with different event types
channel.send("user_typing", {
    "user": "john_doe",
    "typing": True
})

channel.send("system_notification", {
    "type": "maintenance",
    "message": "System will restart in 5 minutes"
})


# Multiple event types on same channel
def handle_messages(payload):
    """Print a chat message together with its sender."""
    print(f"Message: {payload['text']} from {payload['user']}")


def handle_typing(payload):
    """Print whether the user started or stopped typing."""
    print(f"{payload['user']} is {'typing' if payload['typing'] else 'stopped typing'}")


channel = supabase.channel("multi-chat")
channel.on("broadcast", {"event": "message"}, handle_messages)
channel.on("broadcast", {"event": "typing"}, handle_typing)
channel.subscribe()


# Async broadcasting
async def async_broadcast():
    """Send one broadcast message through an async client."""
    client = await create_async_client(url, key)
    channel = client.channel("async-chat")

    # On async channels `send` is awaited
    await channel.send("message", {"text": "Async message"})

266

```

267

268

### Presence Tracking

269

270

Track user presence and status in real-time, showing who is online and their current state.

271

272

```python { .api }

273

# Presence methods (on channel instance)
def track(self, state: dict) -> None:
    """
    Track user presence state on the channel.

    Parameters:
    - state: User state information to track

    Note: For async channels, this method is async
    """


def untrack(self) -> None:
    """
    Stop tracking user presence on the channel.

    Note: For async channels, this method is async
    """

290

```

291

292

**Usage Examples:**

293

294

```python

295

# NOTE: `supabase` is assumed to be an already-created client.

# Track user presence
def handle_presence(payload):
    """Print the raw presence payload."""
    print(f"Presence update: {payload}")


channel = supabase.channel("user-presence")
channel.on("presence", {"event": "sync"}, handle_presence)
channel.on("presence", {"event": "join"}, lambda p: print(f"User joined: {p}"))
channel.on("presence", {"event": "leave"}, lambda p: print(f"User left: {p}"))
channel.subscribe()

# Start tracking current user
channel.track({
    "user_id": "user123",
    "username": "john_doe",
    "status": "online",
    "last_seen": "2023-12-01T10:00:00Z"
})

# Update presence state (call track again with the new state)
channel.track({
    "user_id": "user123",
    "username": "john_doe",
    "status": "away",
    "activity": "In a meeting"
})

# Stop tracking
channel.untrack()

323

324

# Detailed presence tracking
def handle_sync(payload):
    """Handle complete presence state sync.

    Parameters:
    - payload: Mapping whose optional "presences" entry maps a key to a list
      of presence dicts, each carrying "username" and "status"

    Prints the number of entries under "presences" followed by one line per
    presence dict. A payload without "presences" counts as zero users.
    """
    users_online = payload.get("presences", {})
    print(f"Users online: {len(users_online)}")
    # Only the presence lists are needed; the mapping keys are not printed.
    for presence_data in users_online.values():
        for presence in presence_data:
            print(f"User {presence['username']}: {presence['status']}")

332

333

def handle_join(payload):
    """Handle user joining"""
    user_data = payload["presence"]
    print(f"{user_data['username']} joined the channel")


def handle_leave(payload):
    """Handle user leaving"""
    user_data = payload["presence"]
    print(f"{user_data['username']} left the channel")


# NOTE: `supabase` is assumed to be an already-created client, and
# `handle_sync` a sync handler defined before this point.
channel = supabase.channel("game-lobby")
channel.on("presence", {"event": "sync"}, handle_sync)
channel.on("presence", {"event": "join"}, handle_join)
channel.on("presence", {"event": "leave"}, handle_leave)
channel.subscribe()

# Track game player status
channel.track({
    "user_id": "player1",
    "username": "Alice",
    "status": "ready",
    "character": "mage",
    "level": 15
})

357

```

358

359

### Channel Configuration

360

361

Configure channel behavior and options for different use cases.

362

363

```python { .api }

364

# Channel configuration options
class RealtimeChannelOptions(TypedDict, total=False):
    """Configuration options for realtime channels.

    Declared with ``total=False``, so every key may be omitted.
    """

    # Automatically reconnect on connection loss
    auto_reconnect: bool

    # Heartbeat interval in milliseconds
    hb_interval: int

    # Maximum reconnection attempts
    max_retries: int

    # Initial backoff delay for reconnections
    # NOTE(review): the unit is not stated on this page; confirm with the library.
    initial_backoff: float

    # NOTE(review): the channel-management example on this page also passes
    # "presence" and "broadcast" keys, which are not declared here; confirm
    # the real option schema against the realtime library.

379

```

380

381

**Usage Examples:**

382

383

```python

384

# NOTE: `supabase` is assumed to be an already-created client.

# Configure channel with custom options
channel_options = {
    "auto_reconnect": True,
    "hb_interval": 30000,  # 30 seconds
    "max_retries": 5,
    "initial_backoff": 1.0
}

channel = supabase.channel("reliable-channel", channel_options)

# Different configurations for different use cases
# High-frequency trading data
trading_channel = supabase.channel("trading-data", {
    "hb_interval": 5000,   # 5 seconds for quick detection
    "max_retries": 10,     # More retries for critical data
    "auto_reconnect": True
})

# Chat application
chat_channel = supabase.channel("chat", {
    "hb_interval": 60000,  # 1 minute is fine for chat
    "max_retries": 3,      # Fewer retries for user experience
    "auto_reconnect": True
})

# Gaming leaderboard
game_channel = supabase.channel("leaderboard", {
    "hb_interval": 15000,  # 15 seconds for game updates
    "max_retries": 5,
    "auto_reconnect": True
})

415

```

416

417

### Connection Management

418

419

Handle connection states, errors, and reconnection scenarios.

420

421

```python { .api }

422

# Connection event handlers
def on_error(self, callback: Callable) -> 'Channel':
    """
    Handle channel connection errors.

    Parameters:
    - callback: Function called on connection errors
    """


def on_close(self, callback: Callable) -> 'Channel':
    """
    Handle channel close events.

    Parameters:
    - callback: Function called when channel closes
    """

438

```

439

440

**Usage Examples:**

441

442

```python

443

def handle_error(error):
    """Report a channel error."""
    print(f"Channel error: {error}")
    # Implement error recovery logic


def handle_close():
    """Report that the channel closed."""
    print("Channel closed")
    # Clean up resources or attempt reconnection


def handle_connection_state(state):
    """Print the connection state and a message for the two known states.

    NOTE: this handler is defined for illustration; nothing in this snippet
    registers it.
    """
    print(f"Connection state: {state}")
    if state == "connected":
        print("Successfully connected to realtime")
    elif state == "disconnected":
        print("Lost connection to realtime")


# NOTE: `supabase` is assumed to be an already-created client.

# Set up connection handlers
channel = supabase.channel("monitored-channel")
channel.on_error(handle_error)
channel.on_close(handle_close)

# Monitor overall realtime connection
supabase.realtime.on_open(lambda: print("Realtime connection opened"))
supabase.realtime.on_close(lambda: print("Realtime connection closed"))
supabase.realtime.on_error(lambda e: print(f"Realtime error: {e}"))


# Graceful shutdown
def cleanup_realtime():
    """Clean up all realtime resources"""
    try:
        supabase.remove_all_channels()
        print("All channels removed successfully")
    except Exception as e:
        # Best-effort shutdown: report the failure instead of propagating it.
        print(f"Error during cleanup: {e}")

476

477

# Connection with retry logic
def connect_with_retry(channel, max_attempts=3, *, sleep=None):
    """Connect channel with retry logic

    Calls ``channel.subscribe()`` up to ``max_attempts`` times, waiting
    ``2 ** attempt`` seconds (1, 2, 4, ...) between failed attempts.

    Parameters:
    - channel: Object exposing a ``subscribe()`` method
    - max_attempts: Maximum number of subscribe attempts; must be at least 1
    - sleep: Optional callable receiving the delay in seconds; defaults to
      ``time.sleep`` (injectable so callers and tests can avoid real waits)

    Raises:
    - ValueError: if ``max_attempts`` is less than 1 (previously the loop was
      skipped and the function returned as if the connection had succeeded)
    - Exception: whatever the final ``subscribe()`` attempt raised
    """
    import time  # local import: the original snippet used `time` without importing it

    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    if sleep is None:
        sleep = time.sleep

    for attempt in range(max_attempts):
        try:
            channel.subscribe()
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_attempts - 1:
                print("Max attempts reached, giving up")
                raise
            sleep(2 ** attempt)  # Exponential backoff
        else:
            print(f"Connected on attempt {attempt + 1}")
            break

491

```

492

493

### Complete Real-time Application Example

494

495

```python

496

class RealtimeManager:
    """Comprehensive real-time functionality manager

    Wraps a client exposing ``channel(topic)`` and ``remove_all_channels()``,
    and keeps every channel it creates in ``self.channels`` keyed by topic
    ("db-<table>" or "chat-<room>").
    """

    def __init__(self, supabase_client):
        """Store the client and start with no registered channels."""
        self.client = supabase_client
        self.channels = {}

    def setup_database_sync(self, table_name):
        """Set up real-time database synchronization

        Registers INSERT, UPDATE and DELETE handlers for ``table_name`` in the
        "public" schema, subscribes, and stores the channel.

        Returns:
        The subscribed channel (returned for consistency with
        ``setup_chat_room``)
        """
        topic = f"db-{table_name}"
        channel = self.client.channel(topic)

        def handle_insert(payload):
            print(f"New {table_name}: {payload['new']}")

        def handle_update(payload):
            print(f"Updated {table_name}: {payload['new']}")

        def handle_delete(payload):
            print(f"Deleted {table_name}: {payload['old']}")

        # Same filter for every event type; only the event name differs.
        handlers = (
            ("INSERT", handle_insert),
            ("UPDATE", handle_update),
            ("DELETE", handle_delete),
        )
        for event, handler in handlers:
            channel.on("postgres_changes", {
                "event": event,
                "schema": "public",
                "table": table_name
            }, handler)

        channel.subscribe()
        self.channels[topic] = channel

        return channel

    def setup_chat_room(self, room_id):
        """Set up chat room with messages and presence

        Returns:
        The subscribed channel
        """
        topic = f"chat-{room_id}"
        channel = self.client.channel(topic)

        # Message handling
        def handle_message(payload):
            # send_chat_message sends a flat {"user", "text", "timestamp"}
            # dict, so fall back to the payload itself when it is not wrapped
            # under a "message" key.
            msg = payload.get("message", payload)
            print(f"[{msg.get('user')}]: {msg.get('text')}")

        channel.on("broadcast", {"event": "message"}, handle_message)

        # Presence handling
        def handle_user_join(payload):
            user = payload.get("presence", {})
            print(f"{user.get('username')} joined the chat")

        def handle_user_leave(payload):
            user = payload.get("presence", {})
            print(f"{user.get('username')} left the chat")

        channel.on("presence", {"event": "join"}, handle_user_join)
        channel.on("presence", {"event": "leave"}, handle_user_leave)

        channel.subscribe()
        self.channels[topic] = channel

        return channel

    def send_chat_message(self, room_id, user, message):
        """Send message to chat room

        Returns:
        True if the room was set up and the message was handed to the channel,
        False if no channel is registered for ``room_id``
        """
        # Local import: the original snippet used `datetime` without importing it.
        from datetime import datetime

        channel = self.channels.get(f"chat-{room_id}")
        if not channel:
            return False
        channel.send("message", {
            "user": user,
            "text": message,
            "timestamp": datetime.now().isoformat()
        })
        return True

    def join_chat_room(self, room_id, user_info):
        """Join chat room with user presence

        Parameters:
        - room_id: Room previously created with ``setup_chat_room``
        - user_info: Mapping with "id" and "username" keys

        Returns:
        True if presence was tracked, False if the room was never set up
        """
        channel = self.channels.get(f"chat-{room_id}")
        if not channel:
            return False
        channel.track({
            "user_id": user_info["id"],
            "username": user_info["username"],
            "status": "online"
        })
        return True

    def cleanup(self):
        """Clean up all channels"""
        self.client.remove_all_channels()
        self.channels.clear()

589

590

# Usage
# NOTE: `supabase` is assumed to be an already-created client.
realtime = RealtimeManager(supabase)

# Set up database sync
realtime.setup_database_sync("messages")
realtime.setup_database_sync("users")

# Set up chat room
chat_channel = realtime.setup_chat_room("general")

# Join and send messages
realtime.join_chat_room("general", {"id": "user1", "username": "Alice"})
realtime.send_chat_message("general", "Alice", "Hello everyone!")

# Cleanup when done
realtime.cleanup()

606

```

607

608

### Error Handling

609

610

Handle real-time connection and subscription errors.

611

612

```python { .api }

613

# Realtime exceptions (from realtime library)
class AuthorizationError(Exception):
    """Authentication/authorization errors for realtime connections"""


class NotConnectedError(Exception):
    """Errors when trying to use disconnected realtime channels"""

619

```

620

621

**Error Handling Examples:**

622

623

```python

624

from realtime import AuthorizationError, NotConnectedError

625

626

# NOTE: `supabase` is assumed to be an already-created client.
try:
    channel = supabase.channel("restricted-channel")
    channel.subscribe()
except AuthorizationError:
    print("Not authorized to access this channel")
except Exception as e:
    print(f"Failed to subscribe: {e}")

try:
    channel.send("message", {"text": "Hello"})
except NotConnectedError:
    print("Channel not connected, attempting to reconnect...")
    # Re-subscribe once, then retry the send; report the failure if that fails too.
    try:
        channel.subscribe()
        channel.send("message", {"text": "Hello"})
    except Exception as e:
        print(f"Reconnection failed: {e}")

643

644

# Robust error handling
def safe_channel_operation(channel, operation, *args, **kwargs):
    """Safely perform channel operations with error handling

    Parameters:
    - channel: Channel to re-subscribe if the operation reports a lost connection
    - operation: Callable to invoke with ``*args`` and ``**kwargs``

    Returns:
    Whatever ``operation`` returns

    Raises:
    - AuthorizationError: re-raised after a message is printed
    - Exception: any other error, including a failed reconnect-and-retry, is
      re-raised after a message is printed

    On NotConnectedError the channel is re-subscribed and the operation is
    retried exactly once.
    """
    try:
        return operation(*args, **kwargs)
    except NotConnectedError:
        print("Reconnecting channel...")
        try:
            channel.subscribe()
            return operation(*args, **kwargs)
        except Exception as e:
            print(f"Reconnection failed: {e}")
            raise
    except AuthorizationError:
        print("Authorization error - check permissions")
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

663

664

# Usage
# NOTE: `supabase` is assumed to be an already-created client.
channel = supabase.channel("safe-channel")
safe_channel_operation(channel, channel.subscribe)
safe_channel_operation(channel, channel.send, "message", {"text": "Hello"})

668

```