CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-lona

Write responsive web apps in full python

Pending
Overview
Eval results
Files

channel-communication.mddocs/

Channel Communication

Pub/sub messaging system for communication between views, background tasks, and external services. Supports topic-based messaging with message expiry and local/broadcast modes.

Capabilities

Channel Class

Core messaging channel for pub/sub communication.

class Channel:
    def __init__(self, topic: str):
        """
        Create a messaging channel.
        
        Args:
            topic (str): Channel topic name
        """

    def subscribe(self, handler):
        """
        Subscribe to channel messages.
        
        Args:
            handler: Function to handle incoming messages
        """

    def unsubscribe(self):
        """Unsubscribe from channel messages."""

    def send(self, message_data=None, expiry=None, local: bool = False):
        """
        Send message to channel.
        
        Args:
            message_data: Data to send
            expiry: Message expiry time
            local (bool): Whether message is local only
        """

Message Class

Represents a channel message.

class Message:
    topic: str          # Channel topic
    data: object        # Message data
    timestamp: float    # Message timestamp
    expiry: float       # Message expiry time
    local: bool         # Local message flag

Usage Example

from lona import App, View, Channel
from lona.html import HTML, H1, Button, P

app = App(__file__)

# Create a global channel
notifications = Channel('notifications')

@app.route('/sender')
class SenderView(View):
    def handle_request(self, request):
        send_button = Button('Send Notification')
        html = HTML(H1('Sender'), send_button)
        self.show(html)
        
        while True:
            self.await_click(send_button)
            notifications.send({
                'message': 'Hello from sender!',
                'timestamp': time.time()
            })

@app.route('/receiver')
class ReceiverView(View):
    def handle_request(self, request):
        message_display = P('Waiting for messages...')
        html = HTML(H1('Receiver'), message_display)
        
        def handle_notification(message):
            message_display.set_text(f"Received: {message.data['message']}")
        
        notifications.subscribe(handle_notification)
        self.show(html)

Types

from typing import Union, Optional, Dict, Any, Callable

MessageData = Union[Dict[str, Any], str, int, float, bool, None]
MessageHandler = Callable[['Message'], None]
TopicName = str

Install with Tessl CLI

npx tessl i tessl/pypi-lona

docs

channel-communication.md

core-framework.md

events-interactivity.md

exception-handling.md

file-management.md

html-system.md

index.md

response-system.md

routing-urls.md

static-files-assets.md

tile.json