# Streaming

Real-time message streaming and bidirectional communication with Gradio applications using WebSocket and Server-Sent Events (SSE) protocols.

## Capabilities

### Message Streaming

Stream real-time messages and data from Gradio applications with support for various communication protocols.

```python { .api }
def stream_messages(
    self,
    *data,
    api_name: str | None = None,
    fn_index: int | None = None
) -> Iterator[Any]:
    """
    Stream messages from a Gradio app in real-time.

    Parameters:
    - *data: Input data for the streaming endpoint
    - api_name: Name of the streaming API endpoint
    - fn_index: Index of the function if api_name is not provided

    Returns:
    Iterator yielding messages as they arrive from the app

    Raises:
    - ConnectionError: If the streaming connection fails
    - AppError: If the Gradio app returns an error
    """
```
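A minimal sketch of calling this from a connected client; the Space name and endpoint below are placeholders, and fuller walkthroughs appear under Usage Examples.

```python
from gradio_client import Client

# Placeholder Space and endpoint, used only for illustration
client = Client("user/streaming-space")

# Each message is yielded as soon as the app emits it
for message in client.stream_messages("input text", api_name="/stream"):
    print(message)
```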
### Data Transmission

Send data to streaming endpoints with protocol-specific handling and header customization.

```python { .api }
def send_data(
    self,
    data: dict,
    hash_data: dict,
    protocol: str,
    request_headers: dict
) -> Any:
    """
    Send data to a streaming endpoint.

    Parameters:
    - data: Data payload to send
    - hash_data: Hash information for data integrity
    - protocol: Communication protocol ("ws", "sse", "sse_v1", "sse_v2", "sse_v2.1")
    - request_headers: Additional headers for the request

    Returns:
    Response from the streaming endpoint

    Raises:
    - ConnectionError: If data transmission fails
    - ValueError: If protocol is unsupported
    """
```
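As a rough sketch of a direct call (the payload and hash_data field names below are illustrative, not a documented schema; the Bidirectional Communication example later shows send_data in a fuller context):

```python
from gradio_client import Client

client = Client("user/streaming-space")  # placeholder Space

# Illustrative payload; the exact fields expected depend on the target endpoint
response = client.send_data(
    data={"data": ["hello"], "fn_index": 0},
    hash_data={"session_hash": "placeholder-session"},  # hypothetical session id
    protocol="sse_v2",
    request_headers={"Content-Type": "application/json"},
)
print(response)
```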
### Communication Protocols

Support for multiple streaming protocols with automatic protocol detection and fallback.

```python { .api }
# Supported protocols
Protocol = Literal["ws", "sse", "sse_v1", "sse_v2", "sse_v2.1"]
```
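The negotiated protocol is exposed on the client (see Protocol-Specific Streaming below). A small sketch, assuming a connected client as above, of guarding against anything outside the supported set:

```python
SUPPORTED_PROTOCOLS = ("ws", "sse", "sse_v1", "sse_v2", "sse_v2.1")

# client.protocol holds whichever protocol was negotiated with the app
if client.protocol not in SUPPORTED_PROTOCOLS:
    raise ValueError(f"Unsupported streaming protocol: {client.protocol}")
```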
### Message Types

Data structures for streaming messages and communication events.

```python { .api }
class Message(TypedDict, total=False):
    msg: str                    # Message content
    output: dict[str, Any]      # Output data
    event_id: str               # Unique event identifier
    rank: int                   # Queue position
    rank_eta: float             # Estimated time to completion
    queue_size: int             # Current queue size
    success: bool               # Whether operation was successful
    progress_data: list[dict]   # Progress information
    log: str                    # Log messages
    level: str                  # Log level
```
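Because the TypedDict is declared with total=False, every field is optional. A short sketch of reading a message defensively (the summarize_message helper is purely illustrative, assuming Message is in scope):

```python
def summarize_message(message: Message) -> str:
    # All fields are optional, so prefer .get() over direct indexing
    if message.get("log"):
        return f"[{message.get('level', 'info')}] {message['log']}"
    if message.get("progress_data"):
        return f"progress update with {len(message['progress_data'])} step(s)"
    return message.get("msg", "unknown message")
```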
### Status Updates

Real-time status updates for streaming operations and job progress.

```python { .api }
class StatusUpdate(dict):
    """
    Status update dictionary containing job progress information.

    Common fields:
    - msg: Current status message
    - progress_data: List of progress updates
    - success: Boolean indicating completion status
    - time: Timestamp information
    - queue_size: Current queue size if applicable
    """
```
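Status can also be polled outside the streaming iterator. A rough sketch, assuming the Job returned by submit() exposes status(), done(), and result() (as in current gradio_client releases), with a placeholder Space and endpoint:

```python
import time
from gradio_client import Client

client = Client("user/long-process")  # placeholder Space
job = client.submit("large_dataset.csv", api_name="/process")

# Poll until the job finishes, printing each status update along the way
while not job.done():
    print(f"Status: {job.status()}")
    time.sleep(1)

print(f"Result: {job.result()}")
```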
## Usage Examples

### Basic Message Streaming

```python
from gradio_client import Client

client = Client("abidlabs/streaming-chat")

# Stream messages from a chat endpoint
for message in client.stream_messages("Hello, how are you?", api_name="/chat"):
    print(f"Received: {message}")

    # Process each message as it arrives
    if isinstance(message, dict) and message.get('msg'):
        print(f"Chat response: {message['msg']}")
```
### Real-time Progress Monitoring

```python
from gradio_client import Client

client = Client("abidlabs/long-process")

# Stream progress updates
for update in client.stream_messages("large_dataset.csv", api_name="/process"):
    if isinstance(update, dict):
        # Handle progress updates
        if 'progress_data' in update:
            progress = update['progress_data']
            if progress:
                latest = progress[-1]
                print(f"Progress: {latest.get('progress', 0):.1%}")

        # Handle completion
        if update.get('success') is not None:
            if update['success']:
                print("Processing completed successfully!")
                result = update.get('output')
                print(f"Final result: {result}")
            else:
                print("Processing failed!")
            break
```
### WebSocket Streaming

```python
from gradio_client import Client

# Client will automatically use WebSocket if supported
client = Client("abidlabs/realtime-app")

# Stream real-time data
stream = client.stream_messages("sensor_data", api_name="/monitor")

try:
    for data_point in stream:
        if isinstance(data_point, dict):
            # Process real-time sensor data
            timestamp = data_point.get('timestamp')
            value = data_point.get('value')
            print(f"[{timestamp}] Sensor reading: {value}")

            # Break on stop signal
            if data_point.get('msg') == 'stop':
                break
except KeyboardInterrupt:
    print("Streaming stopped by user")
```
### Bidirectional Communication

```python
from gradio_client import Client
import threading
import time

client = Client("abidlabs/interactive-app")

# Send data in a separate thread
def send_data_continuously():
    counter = 0
    while True:
        data = {"counter": counter, "timestamp": time.time()}
        hash_data = {"counter": str(counter)}

        response = client.send_data(
            data=data,
            hash_data=hash_data,
            protocol="ws",
            request_headers={"Content-Type": "application/json"}
        )

        print(f"Sent data {counter}, response: {response}")
        counter += 1
        time.sleep(1)

# Start sending data
sender_thread = threading.Thread(target=send_data_continuously, daemon=True)
sender_thread.start()

# Receive streaming responses
for response in client.stream_messages(api_name="/interactive"):
    print(f"Received response: {response}")

    # Handle specific response types
    if isinstance(response, dict):
        if response.get('msg') == 'shutdown':
            print("Server requested shutdown")
            break
```
### Queue Management

```python
from gradio_client import Client

client = Client("abidlabs/queue-based-app")

# Submit to queue and monitor position
for update in client.stream_messages("batch_job", api_name="/queue_process"):
    if isinstance(update, dict):
        # Monitor queue position
        rank = update.get('rank')
        eta = update.get('rank_eta')
        queue_size = update.get('queue_size')

        if rank is not None:
            print(f"Queue position: {rank}/{queue_size}")
            if eta:
                print(f"Estimated wait time: {eta:.1f} seconds")

        # Process results when ready
        if update.get('success') is not None:
            if update['success']:
                result = update.get('output')
                print(f"Job completed: {result}")
            break
```
### Error Handling in Streams

```python
from gradio_client import Client
from gradio_client.exceptions import AppError

client = Client("abidlabs/error-prone-stream")

try:
    for message in client.stream_messages("test_input", api_name="/stream"):
        # Check for error messages
        if isinstance(message, dict):
            if not message.get('success', True):
                error_msg = message.get('msg', 'Unknown error')
                print(f"Stream error: {error_msg}")
                break

            # Process normal messages
            if 'output' in message:
                print(f"Output: {message['output']}")

except AppError as e:
    print(f"Application error during streaming: {e}")
except ConnectionError as e:
    print(f"Connection error: {e}")
```
### Protocol-Specific Streaming

```python
from gradio_client import Client

# The protocol is detected automatically when the client connects
client = Client("abidlabs/sse-app")

# Check client protocol
print(f"Using protocol: {client.protocol}")

# Branch on protocol-specific requirements
if client.protocol.startswith("sse"):
    # SSE-specific handling
    for event in client.stream_messages("data", api_name="/sse_endpoint"):
        print(f"SSE Event: {event}")
elif client.protocol == "ws":
    # WebSocket-specific handling
    for message in client.stream_messages("data", api_name="/ws_endpoint"):
        print(f"WS Message: {message}")
```
### Asynchronous Streaming

```python
from gradio_client import Client
import asyncio

async def async_stream_handler():
    client = Client("abidlabs/async-stream")

    # Submit async job
    job = client.submit("async_data", api_name="/async_stream")

    # Use async iteration if supported
    if hasattr(job, '__aiter__'):
        async for update in job:
            print(f"Async update: {update}")
            await asyncio.sleep(0.1)  # Non-blocking wait
    else:
        # Fall back to regular iteration
        for update in job:
            print(f"Sync update: {update}")

    return job.result()

# Run async streaming
result = asyncio.run(async_stream_handler())
print(f"Final result: {result}")
```