or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

client-management.mderror-handling.mdfile-handling.mdindex.mdpredictions-jobs.mdspace-management.mdstreaming.md
tile.json

streaming.mddocs/

Streaming

Real-time message streaming and bidirectional communication with Gradio applications using WebSocket and Server-Sent Events protocols.

Capabilities

Message Streaming

Stream real-time messages and data from Gradio applications with support for various communication protocols.

def stream_messages(
    self,
    *data,
    api_name: str | None = None,
    fn_index: int | None = None
) -> Iterator[Any]:
    """
    Stream messages from a Gradio app in real-time.

    Parameters:
    - *data: Input data for the streaming endpoint
    - api_name: Name of the streaming API endpoint
    - fn_index: Index of the function if api_name not provided

    Returns:
    Iterator yielding messages as they arrive from the app

    Raises:
    - ConnectionError: If streaming connection fails
    - AppError: If the Gradio app returns an error
    """

Data Transmission

Send data to streaming endpoints with protocol-specific handling and header customization.

def send_data(
    self,
    data: dict,
    hash_data: dict,
    protocol: str,
    request_headers: dict
) -> Any:
    """
    Send data to a streaming endpoint.

    Parameters:
    - data: Data payload to send
    - hash_data: Hash information for data integrity
    - protocol: Communication protocol ("ws", "sse", "sse_v1", "sse_v2", "sse_v2.1")
    - request_headers: Additional headers for the request

    Returns:
    Response from the streaming endpoint

    Raises:
    - ConnectionError: If data transmission fails
    - ValueError: If protocol is unsupported
    """

Communication Protocols

Support for multiple streaming protocols with automatic protocol detection and fallback.

# Supported protocols
Protocol = Literal["ws", "sse", "sse_v1", "sse_v2", "sse_v2.1"]

Message Types

Data structures for streaming messages and communication events.

class Message(TypedDict, total=False):
    msg: str                    # Message content
    output: dict[str, Any]      # Output data
    event_id: str              # Unique event identifier
    rank: int                  # Queue position
    rank_eta: float            # Estimated time to completion
    queue_size: int            # Current queue size
    success: bool              # Whether operation was successful
    progress_data: list[dict]  # Progress information
    log: str                   # Log messages
    level: str                 # Log level

Status Updates

Real-time status updates for streaming operations and job progress.

class StatusUpdate(dict):
    """
    Status update dictionary containing job progress information.
    
    Common fields:
    - msg: Current status message
    - progress_data: List of progress updates
    - success: Boolean indicating completion status
    - time: Timestamp information
    - queue_size: Current queue size if applicable
    """

Usage Examples

Basic Message Streaming

from gradio_client import Client

client = Client("abidlabs/streaming-chat")

# Stream messages from a chat endpoint
for message in client.stream_messages("Hello, how are you?", api_name="/chat"):
    print(f"Received: {message}")
    
    # Process each message as it arrives
    if isinstance(message, dict) and message.get('msg'):
        print(f"Chat response: {message['msg']}")

Real-time Progress Monitoring

from gradio_client import Client

client = Client("abidlabs/long-process")

# Stream progress updates
for update in client.stream_messages("large_dataset.csv", api_name="/process"):
    if isinstance(update, dict):
        # Handle progress updates
        if 'progress_data' in update:
            progress = update['progress_data']
            if progress:
                latest = progress[-1]
                print(f"Progress: {latest.get('progress', 0):.1%}")
        
        # Handle completion
        if update.get('success') is not None:
            if update['success']:
                print("Processing completed successfully!")
                result = update.get('output')
                print(f"Final result: {result}")
            else:
                print("Processing failed!")
            break

WebSocket Streaming

from gradio_client import Client

# Client will automatically use WebSocket if supported
client = Client("abidlabs/realtime-app")

# Stream real-time data
stream = client.stream_messages("sensor_data", api_name="/monitor")

try:
    for data_point in stream:
        if isinstance(data_point, dict):
            # Process real-time sensor data
            timestamp = data_point.get('timestamp')
            value = data_point.get('value')
            print(f"[{timestamp}] Sensor reading: {value}")
            
            # Break on stop signal
            if data_point.get('msg') == 'stop':
                break
except KeyboardInterrupt:
    print("Streaming stopped by user")

Bidirectional Communication

from gradio_client import Client
import threading
import time

client = Client("abidlabs/interactive-app")

# Send data in a separate thread
def send_data_continuously():
    counter = 0
    while True:
        data = {"counter": counter, "timestamp": time.time()}
        hash_data = {"counter": str(counter)}
        
        response = client.send_data(
            data=data,
            hash_data=hash_data,
            protocol="ws",
            request_headers={"Content-Type": "application/json"}
        )
        
        print(f"Sent data {counter}, response: {response}")
        counter += 1
        time.sleep(1)

# Start sending data
sender_thread = threading.Thread(target=send_data_continuously, daemon=True)
sender_thread.start()

# Receive streaming responses
for response in client.stream_messages(api_name="/interactive"):
    print(f"Received response: {response}")
    
    # Handle specific response types
    if isinstance(response, dict):
        if response.get('msg') == 'shutdown':
            print("Server requested shutdown")
            break

Queue Management

from gradio_client import Client

client = Client("abidlabs/queue-based-app")

# Submit to queue and monitor position
for update in client.stream_messages("batch_job", api_name="/queue_process"):
    if isinstance(update, dict):
        # Monitor queue position
        rank = update.get('rank')
        eta = update.get('rank_eta')
        queue_size = update.get('queue_size')
        
        if rank is not None:
            print(f"Queue position: {rank}/{queue_size}")
            if eta:
                print(f"Estimated wait time: {eta:.1f} seconds")
        
        # Process results when ready
        if update.get('success') is not None:
            if update['success']:
                result = update.get('output')
                print(f"Job completed: {result}")
            break

Error Handling in Streams

from gradio_client import Client
from gradio_client.exceptions import AppError

client = Client("abidlabs/error-prone-stream")

try:
    for message in client.stream_messages("test_input", api_name="/stream"):
        # Check for error messages
        if isinstance(message, dict):
            if not message.get('success', True):
                error_msg = message.get('msg', 'Unknown error')
                print(f"Stream error: {error_msg}")
                break
            
            # Process normal messages
            if 'output' in message:
                print(f"Output: {message['output']}")
                
except AppError as e:
    print(f"Application error during streaming: {e}")
except ConnectionError as e:
    print(f"Connection error: {e}")

Protocol-Specific Streaming

from gradio_client import Client

# Force specific protocol
client = Client("abidlabs/sse-app")

# Check client protocol
print(f"Using protocol: {client.protocol}")

# Send with specific protocol requirements
if client.protocol.startswith("sse"):
    # SSE-specific handling
    for event in client.stream_messages("data", api_name="/sse_endpoint"):
        print(f"SSE Event: {event}")
elif client.protocol == "ws":
    # WebSocket-specific handling
    for message in client.stream_messages("data", api_name="/ws_endpoint"):
        print(f"WS Message: {message}")

Asynchronous Streaming

from gradio_client import Client
import asyncio

async def async_stream_handler():
    client = Client("abidlabs/async-stream")
    
    # Submit async job
    job = client.submit("async_data", api_name="/async_stream")
    
    # Use async iteration if supported
    if hasattr(job, '__aiter__'):
        async for update in job:
            print(f"Async update: {update}")
            await asyncio.sleep(0.1)  # Non-blocking wait
    else:
        # Fall back to regular iteration
        for update in job:
            print(f"Sync update: {update}")
    
    return job.result()

# Run async streaming
result = asyncio.run(async_stream_handler())
print(f"Final result: {result}")