Supabase client for Python providing database operations, authentication, storage, real-time subscriptions, and edge functions.
WebSocket-based real-time functionality including database change subscriptions, presence tracking, message broadcasting, and channel management. Provides live updates and interactive features through integration with Supabase Realtime.
Create and manage real-time channels for organizing different types of real-time communications.
def channel(
self,
topic: str,
params: RealtimeChannelOptions = {}
) -> SyncRealtimeChannel | AsyncRealtimeChannel:
"""
Create a real-time channel for subscriptions and messaging.
Parameters:
- topic: Channel topic/name for identification
- params: Channel configuration options
Returns:
Realtime channel instance (sync or async) for subscribing to events
Note: Channels enable broadcast, presence, and postgres_changes functionality
"""
def get_channels(self) -> List[SyncRealtimeChannel | AsyncRealtimeChannel]:
"""
Get all active real-time channels.
Returns:
List of active channel instances
"""
def remove_channel(self, channel: SyncRealtimeChannel | AsyncRealtimeChannel) -> None:
"""
Unsubscribe and remove a specific real-time channel.
Parameters:
- channel: Channel instance to remove
Note: For AsyncClient, this method is async and returns a coroutine
"""
def remove_all_channels(self) -> None:
"""
Unsubscribe and remove all real-time channels.
Note: For AsyncClient, this method is async and returns a coroutine
"""

Usage Examples:
# Create a channel
channel = supabase.channel("room-1")
# Create channel with options
channel = supabase.channel("game-lobby", {
"presence": {"key": "user_id"},
"broadcast": {"self": True}
})
# List all channels
channels = supabase.get_channels()
print(f"Active channels: {len(channels)}")
# Remove specific channel
supabase.remove_channel(channel)
# Remove all channels
supabase.remove_all_channels()
# Async channel management
async def manage_async_channels():
client = await create_async_client(url, key)
channel = client.channel("async-room")
# Remove async channel
await client.remove_channel(channel)
await client.remove_all_channels()

Subscribe to real-time database changes including inserts, updates, deletes, and specific table events.
# Channel subscription methods (on channel instance)
def on(
self,
event: str,
filter: dict,
callback: Callable
) -> 'Channel':
"""
Subscribe to real-time events on the channel.
Parameters:
- event: Event type ("postgres_changes", "broadcast", "presence")
- filter: Event filter configuration
- callback: Function called when event occurs
Returns:
Channel instance for method chaining
"""
def subscribe(self, timeout: int = 10000) -> 'Channel':
"""
Start listening for events on the channel.
Parameters:
- timeout: Subscription timeout in milliseconds
Returns:
Channel instance
Note: Must be called after setting up event handlers
"""
def unsubscribe(self) -> None:
"""
Stop listening for events and close the channel.
"""

Usage Examples:
# Subscribe to all changes in a table
def handle_changes(payload):
print(f"Change detected: {payload['eventType']}")
print(f"Table: {payload['table']}")
print(f"Data: {payload['new']}")
channel = supabase.channel("table-changes")
channel.on(
"postgres_changes",
{
"event": "*", # All events (INSERT, UPDATE, DELETE)
"schema": "public", # Database schema
"table": "countries" # Specific table
},
handle_changes
)
channel.subscribe()
# Subscribe to specific event types
def handle_inserts(payload):
print(f"New record: {payload['new']}")
def handle_updates(payload):
print(f"Updated: {payload['old']} -> {payload['new']}")
def handle_deletes(payload):
print(f"Deleted: {payload['old']}")
# Insert events
supabase.channel("inserts").on(
"postgres_changes",
{"event": "INSERT", "schema": "public", "table": "users"},
handle_inserts
).subscribe()
# Update events
supabase.channel("updates").on(
"postgres_changes",
{"event": "UPDATE", "schema": "public", "table": "users"},
handle_updates
).subscribe()
# Delete events
supabase.channel("deletes").on(
"postgres_changes",
{"event": "DELETE", "schema": "public", "table": "users"},
handle_deletes
).subscribe()
# Filter by specific columns
supabase.channel("user-status").on(
"postgres_changes",
{
"event": "UPDATE",
"schema": "public",
"table": "users",
"filter": "status=eq.active" # Only active users
},
lambda payload: print(f"Active user updated: {payload['new']['name']}")
).subscribe()

Send and receive custom messages between clients through broadcast events.
# Broadcasting methods (on channel instance)
def send(
self,
event_type: str,
payload: dict,
options: dict = None
) -> None:
"""
Send a broadcast message to all channel subscribers.
Parameters:
- event_type: Type of broadcast event
- payload: Message data to send
- options: Additional broadcast options
Note: For async channels, this method is async
"""

Usage Examples:
# Set up broadcast listener
def handle_broadcast(payload):
print(f"Received broadcast: {payload}")
channel = supabase.channel("chat-room")
channel.on("broadcast", {"event": "message"}, handle_broadcast)
channel.subscribe()
# Send broadcast messages
channel.send("message", {
"user": "john_doe",
"text": "Hello everyone!",
"timestamp": "2023-12-01T10:00:00Z"
})
# Broadcast with different event types
channel.send("user_typing", {
"user": "john_doe",
"typing": True
})
channel.send("system_notification", {
"type": "maintenance",
"message": "System will restart in 5 minutes"
})
# Multiple event types on same channel
def handle_messages(payload):
print(f"Message: {payload['text']} from {payload['user']}")
def handle_typing(payload):
print(f"{payload['user']} is {'typing' if payload['typing'] else 'stopped typing'}")
channel = supabase.channel("multi-chat")
channel.on("broadcast", {"event": "message"}, handle_messages)
channel.on("broadcast", {"event": "typing"}, handle_typing)
channel.subscribe()
# Async broadcasting
async def async_broadcast():
client = await create_async_client(url, key)
channel = client.channel("async-chat")
await channel.send("message", {"text": "Async message"})

Track user presence and status in real-time, showing who is online and their current state.
# Presence methods (on channel instance)
def track(self, state: dict) -> None:
"""
Track user presence state on the channel.
Parameters:
- state: User state information to track
Note: For async channels, this method is async
"""
def untrack(self) -> None:
"""
Stop tracking user presence on the channel.
Note: For async channels, this method is async
"""

Usage Examples:
# Track user presence
def handle_presence(payload):
print(f"Presence update: {payload}")
channel = supabase.channel("user-presence")
channel.on("presence", {"event": "sync"}, handle_presence)
channel.on("presence", {"event": "join"}, lambda p: print(f"User joined: {p}"))
channel.on("presence", {"event": "leave"}, lambda p: print(f"User left: {p}"))
channel.subscribe()
# Start tracking current user
channel.track({
"user_id": "user123",
"username": "john_doe",
"status": "online",
"last_seen": "2023-12-01T10:00:00Z"
})
# Update presence state
channel.track({
"user_id": "user123",
"username": "john_doe",
"status": "away",
"activity": "In a meeting"
})
# Stop tracking
channel.untrack()
# Detailed presence tracking
def handle_sync(payload):
    """Handle complete presence state sync

    Parameters:
    - payload: Mapping read via ``payload.get("presences", {})``; each value
      is iterated as a collection of presence entries that are indexed by
      ``"username"`` and ``"status"``.
    """
    users_online = payload.get("presences", {})
    print(f"Users online: {len(users_online)}")
    # Only the presence entries are needed here; the mapping keys are unused.
    for presence_data in users_online.values():
        for presence in presence_data:
            print(f"User {presence['username']}: {presence['status']}")
def handle_join(payload):
"""Handle user joining"""
user_data = payload["presence"]
print(f"{user_data['username']} joined the channel")
def handle_leave(payload):
"""Handle user leaving"""
user_data = payload["presence"]
print(f"{user_data['username']} left the channel")
channel = supabase.channel("game-lobby")
channel.on("presence", {"event": "sync"}, handle_sync)
channel.on("presence", {"event": "join"}, handle_join)
channel.on("presence", {"event": "leave"}, handle_leave)
channel.subscribe()
# Track game player status
channel.track({
"user_id": "player1",
"username": "Alice",
"status": "ready",
"character": "mage",
"level": 15
})

Configure channel behavior and options for different use cases.
# Channel configuration options
class RealtimeChannelOptions(TypedDict, total=False):
"""Configuration options for realtime channels."""
auto_reconnect: bool
"""Automatically reconnect on connection loss"""
hb_interval: int
"""Heartbeat interval in milliseconds"""
max_retries: int
"""Maximum reconnection attempts"""
initial_backoff: float
"""Initial backoff delay for reconnections"""

Usage Examples:
# Configure channel with custom options
channel_options = {
"auto_reconnect": True,
"hb_interval": 30000, # 30 seconds
"max_retries": 5,
"initial_backoff": 1.0
}
channel = supabase.channel("reliable-channel", channel_options)
# Different configurations for different use cases
# High-frequency trading data
trading_channel = supabase.channel("trading-data", {
"hb_interval": 5000, # 5 seconds for quick detection
"max_retries": 10, # More retries for critical data
"auto_reconnect": True
})
# Chat application
chat_channel = supabase.channel("chat", {
"hb_interval": 60000, # 1 minute is fine for chat
"max_retries": 3, # Fewer retries for user experience
"auto_reconnect": True
})
# Gaming leaderboard
game_channel = supabase.channel("leaderboard", {
"hb_interval": 15000, # 15 seconds for game updates
"max_retries": 5,
"auto_reconnect": True
})

Handle connection states, errors, and reconnection scenarios.
# Connection event handlers
def on_error(self, callback: Callable) -> 'Channel':
"""
Handle channel connection errors.
Parameters:
- callback: Function called on connection errors
"""
def on_close(self, callback: Callable) -> 'Channel':
"""
Handle channel close events.
Parameters:
- callback: Function called when channel closes
"""

Usage Examples:
def handle_error(error):
print(f"Channel error: {error}")
# Implement error recovery logic
def handle_close():
print("Channel closed")
# Clean up resources or attempt reconnection
def handle_connection_state(state):
    """Print the connection state, plus a follow-up line for known states.

    Parameters:
    - state: Value compared against "connected" and "disconnected"; any
      other value only produces the generic state line.
    """
    print(f"Connection state: {state}")
    if state == "connected":
        print("Successfully connected to realtime")
    elif state == "disconnected":
        print("Lost connection to realtime")
# Set up connection handlers
channel = supabase.channel("monitored-channel")
channel.on_error(handle_error)
channel.on_close(handle_close)
# Monitor overall realtime connection
supabase.realtime.on_open(lambda: print("Realtime connection opened"))
supabase.realtime.on_close(lambda: print("Realtime connection closed"))
supabase.realtime.on_error(lambda e: print(f"Realtime error: {e}"))
# Graceful shutdown
def cleanup_realtime():
    """Clean up all realtime resources

    Calls ``supabase.remove_all_channels()`` on the module-level ``supabase``
    client and reports the outcome on stdout. Any exception raised by the
    call is caught and printed rather than propagated, so this is safe to
    call from a shutdown path.
    """
    try:
        supabase.remove_all_channels()
        print("All channels removed successfully")
    except Exception as e:
        # Best-effort cleanup: report the failure but do not abort shutdown.
        print(f"Error during cleanup: {e}")
# Connection with retry logic
def connect_with_retry(channel, max_attempts=3):
    """Connect channel with retry logic

    Calls ``channel.subscribe()`` up to ``max_attempts`` times, sleeping
    ``2 ** attempt`` seconds between failed attempts.

    Parameters:
    - channel: Object exposing a ``subscribe()`` method
    - max_attempts: Maximum number of subscribe attempts (must be >= 1)

    Raises:
    - ValueError: If ``max_attempts`` is less than 1
    - Exception: Whatever the final failed ``subscribe()`` call raised
    """
    # Imported locally so the example runs without a separate import line.
    import time

    if max_attempts < 1:
        # Previously the loop simply never ran and the caller was left with
        # an unsubscribed channel and no indication of it.
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            channel.subscribe()
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_attempts - 1:
                print("Max attempts reached, giving up")
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
        else:
            print(f"Connected on attempt {attempt + 1}")
            return


class RealtimeManager:
    """Comprehensive real-time functionality manager"""

    def __init__(self, supabase_client):
        """Store the client and start with no registered channels.

        Parameters:
        - supabase_client: Object exposing ``channel(topic)`` and
          ``remove_all_channels()``
        """
        self.client = supabase_client
        # Maps "db-<table>" / "chat-<room>" keys to their channel objects.
        self.channels = {}

    def setup_database_sync(self, table_name):
        """Set up real-time database synchronization

        Registers INSERT, UPDATE and DELETE handlers for ``table_name`` in
        the "public" schema, subscribes, and stores the channel under
        ``"db-<table_name>"``.

        Returns:
        The subscribed channel (mirrors ``setup_chat_room``).
        """
        channel = self.client.channel(f"db-{table_name}")

        def handle_insert(payload):
            print(f"New {table_name}: {payload['new']}")

        def handle_update(payload):
            print(f"Updated {table_name}: {payload['new']}")

        def handle_delete(payload):
            print(f"Deleted {table_name}: {payload['old']}")

        handlers = (
            ("INSERT", handle_insert),
            ("UPDATE", handle_update),
            ("DELETE", handle_delete),
        )
        for event, handler in handlers:
            channel.on(
                "postgres_changes",
                {"event": event, "schema": "public", "table": table_name},
                handler,
            )

        channel.subscribe()
        self.channels[f"db-{table_name}"] = channel
        return channel

    def setup_chat_room(self, room_id):
        """Set up chat room with messages and presence

        Returns:
        The subscribed channel, also stored under ``"chat-<room_id>"``.
        """
        channel = self.client.channel(f"chat-{room_id}")

        # Message handling
        def handle_message(payload):
            msg = payload.get("message", {})
            print(f"[{msg.get('user')}]: {msg.get('text')}")

        channel.on("broadcast", {"event": "message"}, handle_message)

        # Presence handling
        def handle_user_join(payload):
            user = payload.get("presence", {})
            print(f"{user.get('username')} joined the chat")

        def handle_user_leave(payload):
            user = payload.get("presence", {})
            print(f"{user.get('username')} left the chat")

        channel.on("presence", {"event": "join"}, handle_user_join)
        channel.on("presence", {"event": "leave"}, handle_user_leave)
        channel.subscribe()
        self.channels[f"chat-{room_id}"] = channel
        return channel

    def send_chat_message(self, room_id, user, message):
        """Send message to chat room

        Returns:
        True if the room's channel was found and ``send`` was called on it,
        False if the room was never set up (nothing is sent).
        """
        from datetime import datetime, timezone

        channel = self.channels.get(f"chat-{room_id}")
        if not channel:
            return False
        channel.send("message", {
            "user": user,
            "text": message,
            # Timezone-aware UTC so receivers in other zones can interpret it.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
        return True

    def join_chat_room(self, room_id, user_info):
        """Join chat room with user presence

        Parameters:
        - room_id: Room identifier previously passed to ``setup_chat_room``
        - user_info: Mapping with "id" and "username" keys

        Returns:
        True if the room's channel was found and ``track`` was called on it,
        False if the room was never set up.
        """
        channel = self.channels.get(f"chat-{room_id}")
        if not channel:
            return False
        channel.track({
            "user_id": user_info["id"],
            "username": user_info["username"],
            "status": "online",
        })
        return True

    def cleanup(self):
        """Clean up all channels"""
        self.client.remove_all_channels()
        self.channels.clear()
# Usage
realtime = RealtimeManager(supabase)
# Set up database sync
realtime.setup_database_sync("messages")
realtime.setup_database_sync("users")
# Set up chat room
chat_channel = realtime.setup_chat_room("general")
# Join and send messages
realtime.join_chat_room("general", {"id": "user1", "username": "Alice"})
realtime.send_chat_message("general", "Alice", "Hello everyone!")
# Cleanup when done
realtime.cleanup()

Handle real-time connection and subscription errors.
# Realtime exceptions (from realtime library)
class AuthorizationError(Exception):
"""Authentication/authorization errors for realtime connections"""
class NotConnectedError(Exception):
"""Errors when trying to use disconnected realtime channels"""

Error Handling Examples:
from realtime import AuthorizationError, NotConnectedError
try:
channel = supabase.channel("restricted-channel")
channel.subscribe()
except AuthorizationError:
print("Not authorized to access this channel")
except Exception as e:
print(f"Failed to subscribe: {e}")
try:
channel.send("message", {"text": "Hello"})
except NotConnectedError:
print("Channel not connected, attempting to reconnect...")
try:
channel.subscribe()
channel.send("message", {"text": "Hello"})
except Exception as e:
print(f"Reconnection failed: {e}")
# Robust error handling
def safe_channel_operation(channel, operation, *args, **kwargs):
    """Safely perform channel operations with error handling

    Calls ``operation(*args, **kwargs)`` and returns its result. On
    ``NotConnectedError`` the channel is re-subscribed once and the
    operation is retried once. Every failure is reported on stdout and then
    re-raised, so callers still see the original exception.

    Parameters:
    - channel: Object exposing ``subscribe()``; used only for reconnection
    - operation: Callable to invoke (typically a bound channel method)
    - *args, **kwargs: Forwarded unchanged to ``operation``

    Returns:
    Whatever ``operation`` returns.

    Raises:
    - AuthorizationError: Re-raised after logging
    - Exception: Any other error from ``operation`` or from the single
      reconnect-and-retry attempt, re-raised after logging
    """
    try:
        return operation(*args, **kwargs)
    except NotConnectedError:
        print("Reconnecting channel...")
        try:
            channel.subscribe()
            return operation(*args, **kwargs)
        except Exception as e:
            print(f"Reconnection failed: {e}")
            raise
    except AuthorizationError:
        print("Authorization error - check permissions")
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
# Usage
channel = supabase.channel("safe-channel")
safe_channel_operation(channel, channel.subscribe)
safe_channel_operation(channel, channel.send, "message", {"text": "Hello"})

Install with Tessl CLI
npx tessl i tessl/pypi-supabase