or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-management.mderror-handling.mdfile-handling.mdindex.mdpredictions-jobs.mdspace-management.mdstreaming.md

streaming.mddocs/

0

# Streaming

1

2

Real-time message streaming and bidirectional communication with Gradio applications using WebSocket and Server-Sent Events protocols.

3

4

## Capabilities

5

6

### Message Streaming

7

8

Stream real-time messages and data from Gradio applications with support for various communication protocols.

9

10

```python { .api }

11

def stream_messages(

12

self,

13

*data,

14

api_name: str | None = None,

15

fn_index: int | None = None

16

) -> Iterator[Any]:

17

"""

18

Stream messages from a Gradio app in real-time.

19

20

Parameters:

21

- *data: Input data for the streaming endpoint

22

- api_name: Name of the streaming API endpoint

23

- fn_index: Index of the function if api_name not provided

24

25

Returns:

26

Iterator yielding messages as they arrive from the app

27

28

Raises:

29

- ConnectionError: If streaming connection fails

30

- AppError: If the Gradio app returns an error

31

"""

32

```

33

34

### Data Transmission

35

36

Send data to streaming endpoints with protocol-specific handling and header customization.

37

38

```python { .api }

39

def send_data(

40

self,

41

data: dict,

42

hash_data: dict,

43

protocol: str,

44

request_headers: dict

45

) -> Any:

46

"""

47

Send data to a streaming endpoint.

48

49

Parameters:

50

- data: Data payload to send

51

- hash_data: Hash information for data integrity

52

- protocol: Communication protocol ("ws", "sse", "sse_v1", "sse_v2", "sse_v2.1")

53

- request_headers: Additional headers for the request

54

55

Returns:

56

Response from the streaming endpoint

57

58

Raises:

59

- ConnectionError: If data transmission fails

60

- ValueError: If protocol is unsupported

61

"""

62

```

63

64

### Communication Protocols

65

66

Support for multiple streaming protocols with automatic protocol detection and fallback.

67

68

```python { .api }

69

# Supported protocols

70

Protocol = Literal["ws", "sse", "sse_v1", "sse_v2", "sse_v2.1"]

71

```

72

73

### Message Types

74

75

Data structures for streaming messages and communication events.

76

77

```python { .api }

78

class Message(TypedDict, total=False):

79

msg: str # Message content

80

output: dict[str, Any] # Output data

81

event_id: str # Unique event identifier

82

rank: int # Queue position

83

rank_eta: float # Estimated time to completion

84

queue_size: int # Current queue size

85

success: bool # Whether operation was successful

86

progress_data: list[dict] # Progress information

87

log: str # Log messages

88

level: str # Log level

89

```

90

91

### Status Updates

92

93

Real-time status updates for streaming operations and job progress.

94

95

```python { .api }

96

class StatusUpdate(dict):

97

"""

98

Status update dictionary containing job progress information.

99

100

Common fields:

101

- msg: Current status message

102

- progress_data: List of progress updates

103

- success: Boolean indicating completion status

104

- time: Timestamp information

105

- queue_size: Current queue size if applicable

106

"""

107

```

108

109

## Usage Examples

110

111

### Basic Message Streaming

112

113

```python

114

from gradio_client import Client

115

116

client = Client("abidlabs/streaming-chat")

117

118

# Stream messages from a chat endpoint

119

for message in client.stream_messages("Hello, how are you?", api_name="/chat"):

120

print(f"Received: {message}")

121

122

# Process each message as it arrives

123

if isinstance(message, dict) and message.get('msg'):

124

print(f"Chat response: {message['msg']}")

125

```

126

127

### Real-time Progress Monitoring

128

129

```python

130

from gradio_client import Client

131

132

client = Client("abidlabs/long-process")

133

134

# Stream progress updates

135

for update in client.stream_messages("large_dataset.csv", api_name="/process"):

136

if isinstance(update, dict):

137

# Handle progress updates

138

if 'progress_data' in update:

139

progress = update['progress_data']

140

if progress:

141

latest = progress[-1]

142

print(f"Progress: {latest.get('progress', 0):.1%}")

143

144

# Handle completion

145

if update.get('success') is not None:

146

if update['success']:

147

print("Processing completed successfully!")

148

result = update.get('output')

149

print(f"Final result: {result}")

150

else:

151

print("Processing failed!")

152

break

153

```

154

155

### WebSocket Streaming

156

157

```python

158

from gradio_client import Client

159

160

# Client will automatically use WebSocket if supported

161

client = Client("abidlabs/realtime-app")

162

163

# Stream real-time data

164

stream = client.stream_messages("sensor_data", api_name="/monitor")

165

166

try:

167

for data_point in stream:

168

if isinstance(data_point, dict):

169

# Process real-time sensor data

170

timestamp = data_point.get('timestamp')

171

value = data_point.get('value')

172

print(f"[{timestamp}] Sensor reading: {value}")

173

174

# Break on stop signal

175

if data_point.get('msg') == 'stop':

176

break

177

except KeyboardInterrupt:

178

print("Streaming stopped by user")

179

```

180

181

### Bidirectional Communication

182

183

```python

184

from gradio_client import Client

185

import threading

186

import time

187

188

client = Client("abidlabs/interactive-app")

189

190

# Send data in a separate thread

191

def send_data_continuously():

192

counter = 0

193

while True:

194

data = {"counter": counter, "timestamp": time.time()}

195

hash_data = {"counter": str(counter)}

196

197

response = client.send_data(

198

data=data,

199

hash_data=hash_data,

200

protocol="ws",

201

request_headers={"Content-Type": "application/json"}

202

)

203

204

print(f"Sent data {counter}, response: {response}")

205

counter += 1

206

time.sleep(1)

207

208

# Start sending data

209

sender_thread = threading.Thread(target=send_data_continuously, daemon=True)

210

sender_thread.start()

211

212

# Receive streaming responses

213

for response in client.stream_messages(api_name="/interactive"):

214

print(f"Received response: {response}")

215

216

# Handle specific response types

217

if isinstance(response, dict):

218

if response.get('msg') == 'shutdown':

219

print("Server requested shutdown")

220

break

221

```

222

223

### Queue Management

224

225

```python

226

from gradio_client import Client

227

228

client = Client("abidlabs/queue-based-app")

229

230

# Submit to queue and monitor position

231

for update in client.stream_messages("batch_job", api_name="/queue_process"):

232

if isinstance(update, dict):

233

# Monitor queue position

234

rank = update.get('rank')

235

eta = update.get('rank_eta')

236

queue_size = update.get('queue_size')

237

238

if rank is not None:

239

print(f"Queue position: {rank}/{queue_size}")

240

if eta:

241

print(f"Estimated wait time: {eta:.1f} seconds")

242

243

# Process results when ready

244

if update.get('success') is not None:

245

if update['success']:

246

result = update.get('output')

247

print(f"Job completed: {result}")

248

break

249

```

250

251

### Error Handling in Streams

252

253

```python

254

from gradio_client import Client

255

from gradio_client.exceptions import AppError

256

257

client = Client("abidlabs/error-prone-stream")

258

259

try:

260

for message in client.stream_messages("test_input", api_name="/stream"):

261

# Check for error messages

262

if isinstance(message, dict):

263

if not message.get('success', True):

264

error_msg = message.get('msg', 'Unknown error')

265

print(f"Stream error: {error_msg}")

266

break

267

268

# Process normal messages

269

if 'output' in message:

270

print(f"Output: {message['output']}")

271

272

except AppError as e:

273

print(f"Application error during streaming: {e}")

274

except ConnectionError as e:

275

print(f"Connection error: {e}")

276

```

277

278

### Protocol-Specific Streaming

279

280

```python

281

from gradio_client import Client

282

283

# Force specific protocol

284

client = Client("abidlabs/sse-app")

285

286

# Check client protocol

287

print(f"Using protocol: {client.protocol}")

288

289

# Send with specific protocol requirements

290

if client.protocol.startswith("sse"):

291

# SSE-specific handling

292

for event in client.stream_messages("data", api_name="/sse_endpoint"):

293

print(f"SSE Event: {event}")

294

elif client.protocol == "ws":

295

# WebSocket-specific handling

296

for message in client.stream_messages("data", api_name="/ws_endpoint"):

297

print(f"WS Message: {message}")

298

```

299

300

### Asynchronous Streaming

301

302

```python

303

from gradio_client import Client

304

import asyncio

305

306

async def async_stream_handler():

307

client = Client("abidlabs/async-stream")

308

309

# Submit async job

310

job = client.submit("async_data", api_name="/async_stream")

311

312

# Use async iteration if supported

313

if hasattr(job, '__aiter__'):

314

async for update in job:

315

print(f"Async update: {update}")

316

await asyncio.sleep(0.1) # Non-blocking wait

317

else:

318

# Fall back to regular iteration

319

for update in job:

320

print(f"Sync update: {update}")

321

322

return job.result()

323

324

# Run async streaming

325

result = asyncio.run(async_stream_handler())

326

print(f"Final result: {result}")

327

```