CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-supabase

Supabase client for Python providing database operations, authentication, storage, real-time subscriptions, and edge functions.

Overview
Eval results
Files

realtime-subscriptions.mddocs/

Real-time Subscriptions

WebSocket-based real-time functionality including database change subscriptions, presence tracking, message broadcasting, and channel management. Provides live updates and interactive features through integration with Supabase Realtime.

Capabilities

Channel Management

Create and manage real-time channels for organizing different types of real-time communications.

def channel(
    self, 
    topic: str, 
    params: RealtimeChannelOptions = {}
) -> SyncRealtimeChannel | AsyncRealtimeChannel:
    """
    Create a real-time channel for subscriptions and messaging.

    Parameters:
    - topic: Channel topic/name for identification
    - params: Channel configuration options

    Returns:
    Realtime channel instance (sync or async) for subscribing to events

    Note: Channels enable broadcast, presence, and postgres_changes functionality
    """

def get_channels(self) -> List[SyncRealtimeChannel | AsyncRealtimeChannel]:
    """
    Get all active real-time channels.

    Returns:
    List of active channel instances
    """

def remove_channel(self, channel: SyncRealtimeChannel | AsyncRealtimeChannel) -> None:
    """
    Unsubscribe and remove a specific real-time channel.

    Parameters:
    - channel: Channel instance to remove

    Note: For AsyncClient, this method is async and returns a coroutine
    """

def remove_all_channels(self) -> None:
    """
    Unsubscribe and remove all real-time channels.

    Note: For AsyncClient, this method is async and returns a coroutine
    """

Usage Examples:

# Create a channel
channel = supabase.channel("room-1")

# Create channel with options
channel = supabase.channel("game-lobby", {
    "presence": {"key": "user_id"},
    "broadcast": {"self": True}
})

# List all channels
channels = supabase.get_channels()
print(f"Active channels: {len(channels)}")

# Remove specific channel
supabase.remove_channel(channel)

# Remove all channels
supabase.remove_all_channels()

# Async channel management
async def manage_async_channels():
    client = await create_async_client(url, key)
    channel = client.channel("async-room")
    
    # Remove async channel
    await client.remove_channel(channel)
    await client.remove_all_channels()

Database Change Subscriptions

Subscribe to real-time database changes including inserts, updates, deletes, and specific table events.

# Channel subscription methods (on channel instance)
def on(
    self,
    event: str,
    filter: dict,
    callback: Callable
) -> 'Channel':
    """
    Subscribe to real-time events on the channel.

    Parameters:
    - event: Event type ("postgres_changes", "broadcast", "presence")
    - filter: Event filter configuration
    - callback: Function called when event occurs

    Returns:
    Channel instance for method chaining
    """

def subscribe(self, timeout: int = 10000) -> 'Channel':
    """
    Start listening for events on the channel.

    Parameters:
    - timeout: Subscription timeout in milliseconds

    Returns:
    Channel instance

    Note: Must be called after setting up event handlers
    """

def unsubscribe(self) -> None:
    """
    Stop listening for events and close the channel.
    """

Usage Examples:

# Subscribe to all changes in a table
def handle_changes(payload):
    print(f"Change detected: {payload['eventType']}")
    print(f"Table: {payload['table']}")
    print(f"Data: {payload['new']}")

channel = supabase.channel("table-changes")
channel.on(
    "postgres_changes",
    {
        "event": "*",         # All events (INSERT, UPDATE, DELETE)
        "schema": "public",   # Database schema
        "table": "countries"  # Specific table
    },
    handle_changes
)
channel.subscribe()

# Subscribe to specific event types
def handle_inserts(payload):
    print(f"New record: {payload['new']}")

def handle_updates(payload):
    print(f"Updated: {payload['old']} -> {payload['new']}")

def handle_deletes(payload):
    print(f"Deleted: {payload['old']}")

# Insert events
supabase.channel("inserts").on(
    "postgres_changes",
    {"event": "INSERT", "schema": "public", "table": "users"},
    handle_inserts
).subscribe()

# Update events
supabase.channel("updates").on(
    "postgres_changes", 
    {"event": "UPDATE", "schema": "public", "table": "users"},
    handle_updates
).subscribe()

# Delete events  
supabase.channel("deletes").on(
    "postgres_changes",
    {"event": "DELETE", "schema": "public", "table": "users"},
    handle_deletes
).subscribe()

# Filter by specific columns
supabase.channel("user-status").on(
    "postgres_changes",
    {
        "event": "UPDATE",
        "schema": "public", 
        "table": "users",
        "filter": "status=eq.active"  # Only active users
    },
    lambda payload: print(f"Active user updated: {payload['new']['name']}")
).subscribe()

Broadcast Messaging

Send and receive custom messages between clients through broadcast events.

# Broadcasting methods (on channel instance)
def send(
    self,
    event_type: str,
    payload: dict,
    options: dict = None
) -> None:
    """
    Send a broadcast message to all channel subscribers.

    Parameters:
    - event_type: Type of broadcast event
    - payload: Message data to send
    - options: Additional broadcast options

    Note: For async channels, this method is async
    """

Usage Examples:

# Set up broadcast listener
def handle_broadcast(payload):
    print(f"Received broadcast: {payload}")

channel = supabase.channel("chat-room")
channel.on("broadcast", {"event": "message"}, handle_broadcast)
channel.subscribe()

# Send broadcast messages
channel.send("message", {
    "user": "john_doe",
    "text": "Hello everyone!",
    "timestamp": "2023-12-01T10:00:00Z"
})

# Broadcast with different event types
channel.send("user_typing", {
    "user": "john_doe",
    "typing": True
})

channel.send("system_notification", {
    "type": "maintenance",
    "message": "System will restart in 5 minutes"
})

# Multiple event types on same channel
def handle_messages(payload):
    print(f"Message: {payload['text']} from {payload['user']}")

def handle_typing(payload):
    print(f"{payload['user']} is {'typing' if payload['typing'] else 'stopped typing'}")

channel = supabase.channel("multi-chat")
channel.on("broadcast", {"event": "message"}, handle_messages)
channel.on("broadcast", {"event": "typing"}, handle_typing)
channel.subscribe()

# Async broadcasting
async def async_broadcast():
    client = await create_async_client(url, key)
    channel = client.channel("async-chat")
    
    await channel.send("message", {"text": "Async message"})

Presence Tracking

Track user presence and status in real-time, showing who is online and their current state.

# Presence methods (on channel instance)  
def track(self, state: dict) -> None:
    """
    Track user presence state on the channel.

    Parameters:
    - state: User state information to track

    Note: For async channels, this method is async
    """

def untrack(self) -> None:
    """
    Stop tracking user presence on the channel.

    Note: For async channels, this method is async
    """

Usage Examples:

# Track user presence
def handle_presence(payload):
    print(f"Presence update: {payload}")

channel = supabase.channel("user-presence")
channel.on("presence", {"event": "sync"}, handle_presence)
channel.on("presence", {"event": "join"}, lambda p: print(f"User joined: {p}"))
channel.on("presence", {"event": "leave"}, lambda p: print(f"User left: {p}"))
channel.subscribe()

# Start tracking current user
channel.track({
    "user_id": "user123",
    "username": "john_doe", 
    "status": "online",
    "last_seen": "2023-12-01T10:00:00Z"
})

# Update presence state
channel.track({
    "user_id": "user123",
    "username": "john_doe",
    "status": "away", 
    "activity": "In a meeting"
})

# Stop tracking
channel.untrack()

# Detailed presence tracking
def handle_sync(payload):
    """Handle complete presence state sync"""
    users_online = payload.get("presences", {})
    print(f"Users online: {len(users_online)}")
    for user_id, presence_data in users_online.items():
        for presence in presence_data:
            print(f"User {presence['username']}: {presence['status']}")

def handle_join(payload):
    """Handle user joining"""
    user_data = payload["presence"]
    print(f"{user_data['username']} joined the channel")

def handle_leave(payload):
    """Handle user leaving"""
    user_data = payload["presence"]
    print(f"{user_data['username']} left the channel")

channel = supabase.channel("game-lobby")
channel.on("presence", {"event": "sync"}, handle_sync)
channel.on("presence", {"event": "join"}, handle_join)
channel.on("presence", {"event": "leave"}, handle_leave)
channel.subscribe()

# Track game player status
channel.track({
    "user_id": "player1",
    "username": "Alice",
    "status": "ready",
    "character": "mage",
    "level": 15
})

Channel Configuration

Configure channel behavior and options for different use cases.

# Channel configuration options
class RealtimeChannelOptions(TypedDict, total=False):
    """Configuration options for realtime channels."""
    
    auto_reconnect: bool
    """Automatically reconnect on connection loss"""
    
    hb_interval: int  
    """Heartbeat interval in milliseconds"""
    
    max_retries: int
    """Maximum reconnection attempts"""
    
    initial_backoff: float
    """Initial backoff delay for reconnections"""

Usage Examples:

# Configure channel with custom options
channel_options = {
    "auto_reconnect": True,
    "hb_interval": 30000,  # 30 seconds
    "max_retries": 5,
    "initial_backoff": 1.0
}

channel = supabase.channel("reliable-channel", channel_options)

# Different configurations for different use cases
# High-frequency trading data
trading_channel = supabase.channel("trading-data", {
    "hb_interval": 5000,   # 5 seconds for quick detection
    "max_retries": 10,     # More retries for critical data
    "auto_reconnect": True
})

# Chat application
chat_channel = supabase.channel("chat", {
    "hb_interval": 60000,  # 1 minute is fine for chat
    "max_retries": 3,      # Fewer retries for user experience
    "auto_reconnect": True
})

# Gaming leaderboard
game_channel = supabase.channel("leaderboard", {
    "hb_interval": 15000,  # 15 seconds for game updates
    "max_retries": 5,
    "auto_reconnect": True
})

Connection Management

Handle connection states, errors, and reconnection scenarios.

# Connection event handlers
def on_error(self, callback: Callable) -> 'Channel':
    """
    Handle channel connection errors.

    Parameters:
    - callback: Function called on connection errors
    """

def on_close(self, callback: Callable) -> 'Channel':
    """
    Handle channel close events.

    Parameters:
    - callback: Function called when channel closes
    """

Usage Examples:

def handle_error(error):
    print(f"Channel error: {error}")
    # Implement error recovery logic

def handle_close():
    print("Channel closed")
    # Clean up resources or attempt reconnection

def handle_connection_state(state):
    print(f"Connection state: {state}")
    if state == "connected":
        print("Successfully connected to realtime")
    elif state == "disconnected":
        print("Lost connection to realtime")

# Set up connection handlers
channel = supabase.channel("monitored-channel")
channel.on_error(handle_error)
channel.on_close(handle_close)

# Monitor overall realtime connection
supabase.realtime.on_open(lambda: print("Realtime connection opened"))
supabase.realtime.on_close(lambda: print("Realtime connection closed"))
supabase.realtime.on_error(lambda e: print(f"Realtime error: {e}"))

# Graceful shutdown
def cleanup_realtime():
    """Clean up all realtime resources"""
    try:
        supabase.remove_all_channels()
        print("All channels removed successfully")
    except Exception as e:
        print(f"Error during cleanup: {e}")

# Connection with retry logic
def connect_with_retry(channel, max_attempts=3):
    """Connect channel with retry logic"""
    for attempt in range(max_attempts):
        try:
            channel.subscribe()
            print(f"Connected on attempt {attempt + 1}")
            break
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_attempts - 1:
                print("Max attempts reached, giving up")
                raise
            time.sleep(2 ** attempt)  # Exponential backoff

Complete Real-time Application Example

class RealtimeManager:
    """Comprehensive real-time functionality manager"""
    
    def __init__(self, supabase_client):
        self.client = supabase_client
        self.channels = {}
        
    def setup_database_sync(self, table_name):
        """Set up real-time database synchronization"""
        channel = self.client.channel(f"db-{table_name}")
        
        def handle_insert(payload):
            print(f"New {table_name}: {payload['new']}")
            
        def handle_update(payload):
            print(f"Updated {table_name}: {payload['new']}")
            
        def handle_delete(payload):
            print(f"Deleted {table_name}: {payload['old']}")
            
        channel.on("postgres_changes", {
            "event": "INSERT",
            "schema": "public",
            "table": table_name
        }, handle_insert)
        
        channel.on("postgres_changes", {
            "event": "UPDATE", 
            "schema": "public",
            "table": table_name
        }, handle_update)
        
        channel.on("postgres_changes", {
            "event": "DELETE",
            "schema": "public", 
            "table": table_name
        }, handle_delete)
        
        channel.subscribe()
        self.channels[f"db-{table_name}"] = channel
        
    def setup_chat_room(self, room_id):
        """Set up chat room with messages and presence"""
        channel = self.client.channel(f"chat-{room_id}")
        
        # Message handling
        def handle_message(payload):
            msg = payload.get("message", {})
            print(f"[{msg.get('user')}]: {msg.get('text')}")
            
        channel.on("broadcast", {"event": "message"}, handle_message)
        
        # Presence handling
        def handle_user_join(payload):
            user = payload.get("presence", {})
            print(f"{user.get('username')} joined the chat")
            
        def handle_user_leave(payload):
            user = payload.get("presence", {})
            print(f"{user.get('username')} left the chat")
            
        channel.on("presence", {"event": "join"}, handle_user_join)
        channel.on("presence", {"event": "leave"}, handle_user_leave)
        
        channel.subscribe()
        self.channels[f"chat-{room_id}"] = channel
        
        return channel
        
    def send_chat_message(self, room_id, user, message):
        """Send message to chat room"""
        channel = self.channels.get(f"chat-{room_id}")
        if channel:
            channel.send("message", {
                "user": user,
                "text": message,
                "timestamp": datetime.now().isoformat()
            })
            
    def join_chat_room(self, room_id, user_info):
        """Join chat room with user presence"""
        channel = self.channels.get(f"chat-{room_id}")
        if channel:
            channel.track({
                "user_id": user_info["id"],
                "username": user_info["username"],
                "status": "online"
            })
            
    def cleanup(self):
        """Clean up all channels"""
        self.client.remove_all_channels()
        self.channels.clear()

# Usage
realtime = RealtimeManager(supabase)

# Set up database sync
realtime.setup_database_sync("messages")
realtime.setup_database_sync("users")

# Set up chat room
chat_channel = realtime.setup_chat_room("general")

# Join and send messages
realtime.join_chat_room("general", {"id": "user1", "username": "Alice"})
realtime.send_chat_message("general", "Alice", "Hello everyone!")

# Cleanup when done
realtime.cleanup()

Error Handling

Handle real-time connection and subscription errors.

# Realtime exceptions (from realtime library)
class AuthorizationError(Exception):
    """Authentication/authorization errors for realtime connections"""

class NotConnectedError(Exception):
    """Errors when trying to use disconnected realtime channels"""

Error Handling Examples:

from realtime import AuthorizationError, NotConnectedError

try:
    channel = supabase.channel("restricted-channel")
    channel.subscribe()
except AuthorizationError:
    print("Not authorized to access this channel")
except Exception as e:
    print(f"Failed to subscribe: {e}")

try:
    channel.send("message", {"text": "Hello"})
except NotConnectedError:
    print("Channel not connected, attempting to reconnect...")
    try:
        channel.subscribe()
        channel.send("message", {"text": "Hello"})
    except Exception as e:
        print(f"Reconnection failed: {e}")

# Robust error handling
def safe_channel_operation(channel, operation, *args, **kwargs):
    """Safely perform channel operations with error handling"""
    try:
        return operation(*args, **kwargs)
    except NotConnectedError:
        print("Reconnecting channel...")
        try:
            channel.subscribe()
            return operation(*args, **kwargs)
        except Exception as e:
            print(f"Reconnection failed: {e}")
            raise
    except AuthorizationError:
        print("Authorization error - check permissions")
        raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

# Usage
channel = supabase.channel("safe-channel")
safe_channel_operation(channel, channel.subscribe)
safe_channel_operation(channel, channel.send, "message", {"text": "Hello"})

Install with Tessl CLI

npx tessl i tessl/pypi-supabase

docs

authentication.md

client-management.md

database-operations.md

edge-functions.md

index.md

realtime-subscriptions.md

storage-operations.md

tile.json