0
# Real-time Subscriptions
1
2
WebSocket-based real-time functionality including database change subscriptions, presence tracking, message broadcasting, and channel management. Provides live updates and interactive features through integration with Supabase Realtime.
3
4
## Capabilities
5
6
### Channel Management
7
8
Create and manage real-time channels for organizing different types of real-time communications.
9
10
```python { .api }
11
def channel(
    self,
    topic: str,
    params: RealtimeChannelOptions = {}
) -> SyncRealtimeChannel | AsyncRealtimeChannel:
    """
    Create a real-time channel for subscriptions and messaging.

    Parameters:
    - topic: Channel topic/name for identification
    - params: Channel configuration options

    Returns:
    Realtime channel instance (sync or async) for subscribing to events

    Note: Channels enable broadcast, presence, and postgres_changes functionality
    """

def get_channels(self) -> List[SyncRealtimeChannel | AsyncRealtimeChannel]:
    """
    Get all active real-time channels.

    Returns:
    List of active channel instances
    """

def remove_channel(self, channel: SyncRealtimeChannel | AsyncRealtimeChannel) -> None:
    """
    Unsubscribe and remove a specific real-time channel.

    Parameters:
    - channel: Channel instance to remove

    Note: For AsyncClient, this method is async and returns a coroutine
    """

def remove_all_channels(self) -> None:
    """
    Unsubscribe and remove all real-time channels.

    Note: For AsyncClient, this method is async and returns a coroutine
    """
53
```
54
55
**Usage Examples:**
56
57
```python
58
# Create a channel
channel = supabase.channel("room-1")

# Create channel with options
channel = supabase.channel("game-lobby", {
    "presence": {"key": "user_id"},
    "broadcast": {"self": True}
})

# List all channels
channels = supabase.get_channels()
print(f"Active channels: {len(channels)}")

# Remove specific channel
supabase.remove_channel(channel)

# Remove all channels
supabase.remove_all_channels()

# Async channel management
async def manage_async_channels():
    """Create a channel on an async client, then remove it (awaiting each removal)."""
    client = await create_async_client(url, key)
    channel = client.channel("async-room")

    # Remove async channel
    await client.remove_channel(channel)
    await client.remove_all_channels()
85
```
86
87
### Database Change Subscriptions
88
89
Subscribe to real-time database changes including inserts, updates, deletes, and specific table events.
90
91
```python { .api }
92
# Channel subscription methods (on channel instance)
93
def on(
    self,
    event: str,
    filter: dict,
    callback: Callable
) -> 'Channel':
    """
    Subscribe to real-time events on the channel.

    Parameters:
    - event: Event type ("postgres_changes", "broadcast", "presence")
    - filter: Event filter configuration
    - callback: Function called when event occurs

    Returns:
    Channel instance for method chaining
    """

def subscribe(self, timeout: int = 10000) -> 'Channel':
    """
    Start listening for events on the channel.

    Parameters:
    - timeout: Subscription timeout in milliseconds

    Returns:
    Channel instance

    Note: Must be called after setting up event handlers
    """

def unsubscribe(self) -> None:
    """
    Stop listening for events and close the channel.
    """
128
```
129
130
**Usage Examples:**
131
132
```python
133
# Subscribe to all changes in a table
def handle_changes(payload):
    """Print the event type, table and new row of a change payload."""
    print(f"Change detected: {payload['eventType']}")
    print(f"Table: {payload['table']}")
    print(f"Data: {payload['new']}")

channel = supabase.channel("table-changes")
channel.on(
    "postgres_changes",
    {
        "event": "*",           # All events (INSERT, UPDATE, DELETE)
        "schema": "public",     # Database schema
        "table": "countries"    # Specific table
    },
    handle_changes
)
channel.subscribe()

# Subscribe to specific event types
def handle_inserts(payload):
    print(f"New record: {payload['new']}")

def handle_updates(payload):
    print(f"Updated: {payload['old']} -> {payload['new']}")

def handle_deletes(payload):
    print(f"Deleted: {payload['old']}")

# Insert events
supabase.channel("inserts").on(
    "postgres_changes",
    {"event": "INSERT", "schema": "public", "table": "users"},
    handle_inserts
).subscribe()

# Update events
supabase.channel("updates").on(
    "postgres_changes",
    {"event": "UPDATE", "schema": "public", "table": "users"},
    handle_updates
).subscribe()

# Delete events
supabase.channel("deletes").on(
    "postgres_changes",
    {"event": "DELETE", "schema": "public", "table": "users"},
    handle_deletes
).subscribe()

# Filter rows by a column value (the "filter" entry selects rows, not columns)
supabase.channel("user-status").on(
    "postgres_changes",
    {
        "event": "UPDATE",
        "schema": "public",
        "table": "users",
        "filter": "status=eq.active"  # Only active users
    },
    lambda payload: print(f"Active user updated: {payload['new']['name']}")
).subscribe()
193
```
194
195
### Broadcast Messaging
196
197
Send and receive custom messages between clients through broadcast events.
198
199
```python { .api }
200
# Broadcasting methods (on channel instance)
201
def send(
202
self,
203
event_type: str,
204
payload: dict,
205
options: dict = None
206
) -> None:
207
"""
208
Send a broadcast message to all channel subscribers.
209
210
Parameters:
211
- event_type: Type of broadcast event
212
- payload: Message data to send
213
- options: Additional broadcast options
214
215
Note: For async channels, this method is async
216
"""
217
```
218
219
**Usage Examples:**
220
221
```python
222
# Set up broadcast listener
def handle_broadcast(payload):
    print(f"Received broadcast: {payload}")

channel = supabase.channel("chat-room")
channel.on("broadcast", {"event": "message"}, handle_broadcast)
channel.subscribe()

# Send broadcast messages
channel.send("message", {
    "user": "john_doe",
    "text": "Hello everyone!",
    "timestamp": "2023-12-01T10:00:00Z"
})

# Broadcast with different event types
channel.send("user_typing", {
    "user": "john_doe",
    "typing": True
})

channel.send("system_notification", {
    "type": "maintenance",
    "message": "System will restart in 5 minutes"
})

# Multiple event types on same channel
def handle_messages(payload):
    print(f"Message: {payload['text']} from {payload['user']}")

def handle_typing(payload):
    print(f"{payload['user']} is {'typing' if payload['typing'] else 'stopped typing'}")

channel = supabase.channel("multi-chat")
channel.on("broadcast", {"event": "message"}, handle_messages)
channel.on("broadcast", {"event": "typing"}, handle_typing)
channel.subscribe()

# Async broadcasting
async def async_broadcast():
    """Send a broadcast from an async client; ``send`` must be awaited there."""
    client = await create_async_client(url, key)
    channel = client.channel("async-chat")

    await channel.send("message", {"text": "Async message"})
266
```
267
268
### Presence Tracking
269
270
Track user presence and status in real-time, showing who is online and their current state.
271
272
```python { .api }
273
# Presence methods (on channel instance)
274
def track(self, state: dict) -> None:
    """
    Track user presence state on the channel.

    Parameters:
    - state: User state information to track

    Note: For async channels, this method is async
    """

def untrack(self) -> None:
    """
    Stop tracking user presence on the channel.

    Note: For async channels, this method is async
    """
290
```
291
292
**Usage Examples:**
293
294
```python
295
# Track user presence
def handle_presence(payload):
    print(f"Presence update: {payload}")

channel = supabase.channel("user-presence")
channel.on("presence", {"event": "sync"}, handle_presence)
channel.on("presence", {"event": "join"}, lambda p: print(f"User joined: {p}"))
channel.on("presence", {"event": "leave"}, lambda p: print(f"User left: {p}"))
channel.subscribe()

# Start tracking current user
channel.track({
    "user_id": "user123",
    "username": "john_doe",
    "status": "online",
    "last_seen": "2023-12-01T10:00:00Z"
})

# Update presence state
channel.track({
    "user_id": "user123",
    "username": "john_doe",
    "status": "away",
    "activity": "In a meeting"
})

# Stop tracking
channel.untrack()
323
324
# Detailed presence tracking
325
def handle_sync(payload):
    """Handle complete presence state sync.

    Prints the number of entries under ``payload["presences"]`` followed by
    one line per presence record.

    NOTE(review): assumes "presences" maps a key to a list of dicts that each
    carry "username" and "status" -- confirm against the payload the realtime
    client actually delivers.
    """
    users_online = payload.get("presences", {})
    print(f"Users online: {len(users_online)}")
    # Only the presence records are printed; the mapping keys are not needed.
    for presence_data in users_online.values():
        for presence in presence_data:
            print(f"User {presence['username']}: {presence['status']}")
332
333
def handle_join(payload):
    """Handle user joining"""
    user_data = payload["presence"]
    print(f"{user_data['username']} joined the channel")

def handle_leave(payload):
    """Handle user leaving"""
    user_data = payload["presence"]
    print(f"{user_data['username']} left the channel")

channel = supabase.channel("game-lobby")
channel.on("presence", {"event": "sync"}, handle_sync)
channel.on("presence", {"event": "join"}, handle_join)
channel.on("presence", {"event": "leave"}, handle_leave)
channel.subscribe()

# Track game player status
channel.track({
    "user_id": "player1",
    "username": "Alice",
    "status": "ready",
    "character": "mage",
    "level": 15
})
357
```
358
359
### Channel Configuration
360
361
Configure channel behavior and options for different use cases.
362
363
```python { .api }
364
# Channel configuration options
365
class RealtimeChannelOptions(TypedDict, total=False):
    """Configuration options for realtime channels."""

    auto_reconnect: bool
    """Automatically reconnect on connection loss"""

    hb_interval: int
    """Heartbeat interval in milliseconds"""

    max_retries: int
    """Maximum reconnection attempts"""

    initial_backoff: float
    """Initial backoff delay for reconnections"""
379
```
380
381
**Usage Examples:**
382
383
```python
384
# Configure channel with custom options
channel_options = {
    "auto_reconnect": True,
    "hb_interval": 30000,  # 30 seconds
    "max_retries": 5,
    "initial_backoff": 1.0
}

channel = supabase.channel("reliable-channel", channel_options)

# Different configurations for different use cases
# High-frequency trading data
trading_channel = supabase.channel("trading-data", {
    "hb_interval": 5000,  # 5 seconds for quick detection
    "max_retries": 10,    # More retries for critical data
    "auto_reconnect": True
})

# Chat application
chat_channel = supabase.channel("chat", {
    "hb_interval": 60000,  # 1 minute is fine for chat
    "max_retries": 3,      # Fewer retries for user experience
    "auto_reconnect": True
})

# Gaming leaderboard
game_channel = supabase.channel("leaderboard", {
    "hb_interval": 15000,  # 15 seconds for game updates
    "max_retries": 5,
    "auto_reconnect": True
})
415
```
416
417
### Connection Management
418
419
Handle connection states, errors, and reconnection scenarios.
420
421
```python { .api }
422
# Connection event handlers
423
def on_error(self, callback: Callable) -> 'Channel':
    """
    Handle channel connection errors.

    Parameters:
    - callback: Function called on connection errors

    Returns:
    Channel instance for method chaining
    """

def on_close(self, callback: Callable) -> 'Channel':
    """
    Handle channel close events.

    Parameters:
    - callback: Function called when channel closes

    Returns:
    Channel instance for method chaining
    """
438
```
439
440
**Usage Examples:**
441
442
```python
443
def handle_error(error):
    """Report a channel error."""
    print(f"Channel error: {error}")
    # Implement error recovery logic

def handle_close():
    """Report that the channel closed."""
    print("Channel closed")
    # Clean up resources or attempt reconnection

def handle_connection_state(state):
    """Print the connection state, with an extra line for connect/disconnect."""
    print(f"Connection state: {state}")
    if state == "connected":
        print("Successfully connected to realtime")
    elif state == "disconnected":
        print("Lost connection to realtime")

# Set up connection handlers
channel = supabase.channel("monitored-channel")
channel.on_error(handle_error)
channel.on_close(handle_close)

# Monitor overall realtime connection
supabase.realtime.on_open(lambda: print("Realtime connection opened"))
supabase.realtime.on_close(lambda: print("Realtime connection closed"))
supabase.realtime.on_error(lambda e: print(f"Realtime error: {e}"))

# Graceful shutdown
def cleanup_realtime():
    """Clean up all realtime resources"""
    # Best-effort: a failure during shutdown is reported, not raised.
    try:
        supabase.remove_all_channels()
        print("All channels removed successfully")
    except Exception as e:
        print(f"Error during cleanup: {e}")
476
477
# Connection with retry logic
478
def connect_with_retry(channel, max_attempts=3, base_delay=1.0):
    """Connect channel with retry logic.

    Calls ``channel.subscribe()`` until it succeeds or ``max_attempts`` is
    exhausted, sleeping ``base_delay * 2 ** attempt`` seconds between tries.

    Parameters:
    - channel: Object exposing a ``subscribe()`` method
    - max_attempts: Maximum number of subscribe attempts (must be >= 1)
    - base_delay: Seconds to wait after the first failure; doubles per retry

    Raises:
    - ValueError: If ``max_attempts`` is less than 1
    - Exception: The last subscribe error, once all attempts have failed
    """
    import time  # imported here so the snippet runs on its own

    # Zero attempts would otherwise fall through and look like success.
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            channel.subscribe()
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_attempts - 1:
                print("Max attempts reached, giving up")
                raise
            time.sleep(base_delay * 2 ** attempt)  # Exponential backoff
        else:
            print(f"Connected on attempt {attempt + 1}")
            break
491
```
492
493
### Complete Real-time Application Example
494
495
```python
496
class RealtimeManager:
    """Comprehensive real-time functionality manager.

    Wraps a client object and records every channel it creates in
    ``self.channels``, keyed by the channel topic.
    """

    def __init__(self, supabase_client):
        self.client = supabase_client
        self.channels = {}

    def setup_database_sync(self, table_name):
        """Set up real-time database synchronization.

        Registers INSERT, UPDATE and DELETE handlers for ``table_name`` in the
        "public" schema, subscribes, and returns the channel.
        """
        topic = f"db-{table_name}"
        channel = self.client.channel(topic)

        def handle_insert(payload):
            print(f"New {table_name}: {payload['new']}")

        def handle_update(payload):
            print(f"Updated {table_name}: {payload['new']}")

        def handle_delete(payload):
            print(f"Deleted {table_name}: {payload['old']}")

        handlers = {
            "INSERT": handle_insert,
            "UPDATE": handle_update,
            "DELETE": handle_delete,
        }
        for event, handler in handlers.items():
            channel.on("postgres_changes", {
                "event": event,
                "schema": "public",
                "table": table_name
            }, handler)

        channel.subscribe()
        self.channels[topic] = channel

        # Returned for parity with setup_chat_room().
        return channel

    def setup_chat_room(self, room_id):
        """Set up chat room with messages and presence.

        Returns the subscribed channel for the room.
        """
        topic = f"chat-{room_id}"
        channel = self.client.channel(topic)

        # Message handling
        def handle_message(payload):
            # NOTE(review): assumes the message body arrives under "message";
            # send_chat_message() sends "user"/"text" at the top level, so
            # confirm the delivered payload shape against the realtime client.
            msg = payload.get("message", {})
            print(f"[{msg.get('user')}]: {msg.get('text')}")

        channel.on("broadcast", {"event": "message"}, handle_message)

        # Presence handling
        def handle_user_join(payload):
            user = payload.get("presence", {})
            print(f"{user.get('username')} joined the chat")

        def handle_user_leave(payload):
            user = payload.get("presence", {})
            print(f"{user.get('username')} left the chat")

        channel.on("presence", {"event": "join"}, handle_user_join)
        channel.on("presence", {"event": "leave"}, handle_user_leave)

        channel.subscribe()
        self.channels[topic] = channel

        return channel

    def send_chat_message(self, room_id, user, message):
        """Send message to chat room.

        Returns True when the message was handed to the room's channel, or
        False when no channel exists for ``room_id`` (setup_chat_room() was
        not called for it).
        """
        from datetime import datetime, timezone

        channel = self.channels.get(f"chat-{room_id}")
        if channel is None:
            return False

        channel.send("message", {
            "user": user,
            "text": message,
            # Timezone-aware so receivers in other zones can interpret it.
            "timestamp": datetime.now(timezone.utc).isoformat()
        })
        return True

    def join_chat_room(self, room_id, user_info):
        """Join chat room with user presence.

        ``user_info`` must provide "id" and "username". Returns True when the
        presence state was tracked, or False when the room has no channel.
        """
        channel = self.channels.get(f"chat-{room_id}")
        if channel is None:
            return False

        channel.track({
            "user_id": user_info["id"],
            "username": user_info["username"],
            "status": "online"
        })
        return True

    def cleanup(self):
        """Clean up all channels"""
        self.client.remove_all_channels()
        self.channels.clear()
589
590
# Usage
realtime = RealtimeManager(supabase)

# Set up database sync
realtime.setup_database_sync("messages")
realtime.setup_database_sync("users")

# Set up chat room
chat_channel = realtime.setup_chat_room("general")

# Join and send messages
realtime.join_chat_room("general", {"id": "user1", "username": "Alice"})
realtime.send_chat_message("general", "Alice", "Hello everyone!")

# Cleanup when done
realtime.cleanup()
606
```
607
608
### Error Handling
609
610
Handle real-time connection and subscription errors.
611
612
```python { .api }
613
# Realtime exceptions (from realtime library)
614
class AuthorizationError(Exception):
    """Authentication/authorization errors for realtime connections"""

class NotConnectedError(Exception):
    """Errors when trying to use disconnected realtime channels"""
619
```
620
621
**Error Handling Examples:**
622
623
```python
624
from realtime import AuthorizationError, NotConnectedError

try:
    channel = supabase.channel("restricted-channel")
    channel.subscribe()
except AuthorizationError:
    print("Not authorized to access this channel")
except Exception as e:
    print(f"Failed to subscribe: {e}")

try:
    channel.send("message", {"text": "Hello"})
except NotConnectedError:
    print("Channel not connected, attempting to reconnect...")
    # Re-subscribe once and retry the send; a second failure is only reported.
    try:
        channel.subscribe()
        channel.send("message", {"text": "Hello"})
    except Exception as e:
        print(f"Reconnection failed: {e}")
643
644
# Robust error handling
645
def safe_channel_operation(channel, operation, *args, **kwargs):
    """Safely perform channel operations with error handling.

    Calls ``operation(*args, **kwargs)`` and returns its result. On
    NotConnectedError the channel is re-subscribed and the operation is
    retried once. Every other failure is printed and re-raised.

    Parameters:
    - channel: Channel to re-subscribe when the operation reports it is
      not connected
    - operation: Callable to invoke (for example a bound channel method)
    - *args, **kwargs: Forwarded unchanged to ``operation``

    Raises:
    - Whatever ``operation`` raises, including a failure of the retry
    """
    try:
        return operation(*args, **kwargs)
    except NotConnectedError:
        print("Reconnecting channel...")
        try:
            channel.subscribe()
            return operation(*args, **kwargs)
        except Exception as e:
            print(f"Reconnection failed: {e}")
            raise
    except AuthorizationError:
        print("Authorization error - check permissions")
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
663
664
# Usage
channel = supabase.channel("safe-channel")
safe_channel_operation(channel, channel.subscribe)
safe_channel_operation(channel, channel.send, "message", {"text": "Hello"})
668
```