Real-time message streaming and bidirectional communication with Gradio applications using WebSocket and Server-Sent Events protocols.
Stream real-time messages and data from Gradio applications with support for various communication protocols.
def stream_messages(
self,
*data,
api_name: str | None = None,
fn_index: int | None = None
) -> Iterator[Any]:
"""
Stream messages from a Gradio app in real-time.
Parameters:
- *data: Input data for the streaming endpoint
- api_name: Name of the streaming API endpoint
- fn_index: Index of the function if api_name not provided
Returns:
Iterator yielding messages as they arrive from the app
Raises:
- ConnectionError: If streaming connection fails
- AppError: If the Gradio app returns an error
"""Send data to streaming endpoints with protocol-specific handling and header customization.
# API reference stub: signature and contract only; no implementation is shown here.
def send_data(
    self,
    data: dict,
    hash_data: dict,
    protocol: str,
    request_headers: dict,
) -> Any:
    """
    Send data to a streaming endpoint.

    Parameters:
    - data: Data payload to send
    - hash_data: Hash information for data integrity
    - protocol: Communication protocol ("ws", "sse", "sse_v1", "sse_v2", "sse_v2.1")
    - request_headers: Additional headers for the request

    Returns:
    Response from the streaming endpoint

    Raises:
    - ConnectionError: If data transmission fails
    - ValueError: If protocol is unsupported
    """


# Support for multiple streaming protocols with automatic protocol detection and fallback.
# Supported protocols
# NOTE: this alias shares its name with typing.Protocol; avoid importing both
# into the same namespace.
Protocol = Literal["ws", "sse", "sse_v1", "sse_v2", "sse_v2.1"]

# Data structures for streaming messages and communication events.
class Message(TypedDict, total=False):
    """Streaming message payload; every key is optional (total=False)."""

    msg: str  # Message content
    output: dict[str, Any]  # Output data
    event_id: str  # Unique event identifier
    rank: int  # Queue position
    rank_eta: float  # Estimated time to completion
    queue_size: int  # Current queue size
    success: bool  # Whether operation was successful
    progress_data: list[dict]  # Progress information
    log: str  # Log messages
    level: str  # Log level


# Real-time status updates for streaming operations and job progress.
class StatusUpdate(dict):
    """
    Status update dictionary containing job progress information.

    Common fields:
    - msg: Current status message
    - progress_data: List of progress updates
    - success: Boolean indicating completion status
    - time: Timestamp information
    - queue_size: Current queue size if applicable
    """


from gradio_client import Client
client = Client("abidlabs/streaming-chat")

# Stream messages from a chat endpoint
for message in client.stream_messages("Hello, how are you?", api_name="/chat"):
    print(f"Received: {message}")
    # Process each message as it arrives; only dict messages with a
    # non-empty 'msg' are treated as chat responses.
    if isinstance(message, dict) and message.get('msg'):
        print(f"Chat response: {message['msg']}")


from gradio_client import Client
client = Client("abidlabs/long-process")

# Stream progress updates
for update in client.stream_messages("large_dataset.csv", api_name="/process"):
    if isinstance(update, dict):
        # Handle progress updates
        if 'progress_data' in update:
            progress = update['progress_data']
            if progress:
                latest = progress[-1]
                # `.get('progress', 0)` still returns None when the key is
                # present with a None value, and None cannot be formatted
                # with `:.1%`; `or 0` covers both the missing and None cases.
                fraction = latest.get('progress') or 0
                print(f"Progress: {fraction:.1%}")

        # Handle completion: any non-None 'success' is a final status,
        # so stop streaming whether the job succeeded or failed.
        if update.get('success') is not None:
            if update['success']:
                print("Processing completed successfully!")
                result = update.get('output')
                print(f"Final result: {result}")
            else:
                print("Processing failed!")
            break


from gradio_client import Client
# Client will automatically use WebSocket if supported
client = Client("abidlabs/realtime-app")

# Stream real-time data
stream = client.stream_messages("sensor_data", api_name="/monitor")
try:
    for data_point in stream:
        if isinstance(data_point, dict):
            # Process real-time sensor data
            timestamp = data_point.get('timestamp')
            value = data_point.get('value')
            print(f"[{timestamp}] Sensor reading: {value}")

            # Break on stop signal
            if data_point.get('msg') == 'stop':
                break
except KeyboardInterrupt:
    # Ctrl+C ends the stream quietly instead of printing a traceback.
    print("Streaming stopped by user")


from gradio_client import Client
import threading
import time

client = Client("abidlabs/interactive-app")


# Send data in a separate thread
def send_data_continuously():
    """Send an incrementing counter payload once per second, forever.

    Intended to run in a daemon thread: the loop never exits on its own.
    """
    counter = 0
    while True:
        data = {"counter": counter, "timestamp": time.time()}
        hash_data = {"counter": str(counter)}
        response = client.send_data(
            data=data,
            hash_data=hash_data,
            protocol="ws",
            request_headers={"Content-Type": "application/json"},
        )
        print(f"Sent data {counter}, response: {response}")
        counter += 1
        time.sleep(1)


# Start sending data; daemon=True so the sender does not keep the process
# alive once the main thread finishes.
sender_thread = threading.Thread(target=send_data_continuously, daemon=True)
sender_thread.start()

# Receive streaming responses
for response in client.stream_messages(api_name="/interactive"):
    print(f"Received response: {response}")
    # Handle specific response types
    if isinstance(response, dict):
        if response.get('msg') == 'shutdown':
            print("Server requested shutdown")
            break


from gradio_client import Client
client = Client("abidlabs/queue-based-app")

# Submit to queue and monitor position
for update in client.stream_messages("batch_job", api_name="/queue_process"):
    if isinstance(update, dict):
        # Monitor queue position
        rank = update.get('rank')
        eta = update.get('rank_eta')
        queue_size = update.get('queue_size')
        if rank is not None:
            print(f"Queue position: {rank}/{queue_size}")
        # Truthiness check: a missing, None, or zero ETA is not reported.
        if eta:
            print(f"Estimated wait time: {eta:.1f} seconds")

        # Process results when ready. A non-None 'success' is treated as
        # the final status, so the loop stops on success or failure.
        if update.get('success') is not None:
            if update['success']:
                result = update.get('output')
                print(f"Job completed: {result}")
            break


from gradio_client import Client
from gradio_client.exceptions import AppError

client = Client("abidlabs/error-prone-stream")
try:
    for message in client.stream_messages("test_input", api_name="/stream"):
        # Only dict messages carry status/output fields; anything else is skipped.
        if isinstance(message, dict):
            # Check for error messages; a missing 'success' key counts as success.
            if not message.get('success', True):
                error_msg = message.get('msg', 'Unknown error')
                print(f"Stream error: {error_msg}")
                break

            # Process normal messages
            if 'output' in message:
                print(f"Output: {message['output']}")
except AppError as e:
    print(f"Application error during streaming: {e}")
except ConnectionError as e:
    print(f"Connection error: {e}")


from gradio_client import Client
# Connect to the app. No protocol is forced here; the code below only
# inspects whichever protocol the client reports.
client = Client("abidlabs/sse-app")

# Check client protocol
print(f"Using protocol: {client.protocol}")

# Send with specific protocol requirements
if client.protocol.startswith("sse"):
    # SSE-specific handling (prefix match, so any "sse*" variant lands here)
    for event in client.stream_messages("data", api_name="/sse_endpoint"):
        print(f"SSE Event: {event}")
elif client.protocol == "ws":
    # WebSocket-specific handling
    for message in client.stream_messages("data", api_name="/ws_endpoint"):
        print(f"WS Message: {message}")


from gradio_client import Client
import asyncio


async def async_stream_handler():
    """Submit a job, print each update as it arrives, and return the final result.

    Uses `async for` when the job object exposes `__aiter__`; otherwise falls
    back to a plain `for` loop.

    Returns:
        Whatever `job.result()` returns.
    """
    client = Client("abidlabs/async-stream")

    # Submit async job
    job = client.submit("async_data", api_name="/async_stream")

    # Use async iteration if supported
    if hasattr(job, '__aiter__'):
        async for update in job:
            print(f"Async update: {update}")
            await asyncio.sleep(0.1)  # Non-blocking wait
    else:
        # Fall back to regular iteration. NOTE: this is a synchronous loop
        # inside a coroutine, so it does not yield to the event loop.
        for update in job:
            print(f"Sync update: {update}")

    return job.result()


# Run async streaming
result = asyncio.run(async_stream_handler())
print(f"Final result: {result}")