WebAssembly-based server-sent events implementation for real-time communication between the browser and Python code running in the worker environment.
`WasmWorkerEventSource` is an EventSource-like implementation that provides server-sent events through the WebAssembly worker.
/**
* EventSource-like implementation for server-sent events through WASM worker
* Provides real-time communication between browser and Python code
*/
class WasmWorkerEventSource {
/**
* Create a new WasmWorkerEventSource instance
* @param workerProxy - WorkerProxy instance to communicate through
* @param url - URL for the event stream endpoint
*/
constructor(workerProxy: WorkerProxy, url: URL);
/** Connection state: 0=connecting, 1=open, 2=closed */
readonly readyState: number;
/** Event stream URL */
readonly url: URL;
/** Event handler for connection open events */
onopen: ((ev: Event) => any) | undefined;
/** Event handler for incoming messages */
onmessage: ((ev: MessageEvent) => any) | undefined;
/** Event handler for error events */
onerror: ((ev: Event) => any) | undefined;
/**
* Close the event stream connection
*/
close(): void;
}

Usage Examples:
import { WorkerProxy, WasmWorkerEventSource } from "@gradio/wasm";
const worker = new WorkerProxy(options);
// Wait for worker initialization
worker.addEventListener("initialization-completed", () => {
// Create event source for real-time updates
const eventSource = new WasmWorkerEventSource(
worker,
new URL("/api/stream", "http://localhost:8000")
);
// Handle connection events
eventSource.onopen = (event) => {
console.log("Event stream connected");
};
eventSource.onerror = (event) => {
console.error("Event stream error:", event);
};
// Handle incoming messages
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
console.log("Received data:", data);
// Handle different message types
switch (data.type) {
case "progress":
updateProgressBar(data.progress);
break;
case "result":
displayResult(data.result);
break;
case "error":
showError(data.error);
break;
}
} catch (error) {
console.error("Failed to parse message:", error);
}
};
// Close connection when done
setTimeout(() => {
eventSource.close();
console.log("Event stream closed");
}, 30000);
});

Example of using event streaming for real-time data processing and visualization:
import { WorkerProxy, WasmWorkerEventSource } from "@gradio/wasm";
/**
 * Runs a Python data-producing server inside the WASM worker and consumes its
 * event stream, logging each data point and the overall progress.
 *
 * The Python server is started once; reconnects only re-open the event stream.
 * Call {@link RealTimeDataProcessor.stop} to release the stream, cancel any
 * pending retry, and terminate the worker.
 */
class RealTimeDataProcessor {
  private readonly worker: WorkerProxy;
  private eventSource: WasmWorkerEventSource | null = null;
  /** Handle of the pending reconnect timer, or null when no retry is scheduled. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** True once the Python server code has been run successfully. */
  private serverStarted = false;
  /** True once stop() has been called; blocks any further (re)connection. */
  private stopped = false;

  /**
   * @param workerOptions - Options forwarded to the WorkerProxy constructor
   */
  constructor(workerOptions: WorkerProxyOptions) {
    this.worker = new WorkerProxy(workerOptions);
    this.setupWorker();
  }

  /** Start streaming as soon as the worker reports that it is initialized. */
  private setupWorker(): void {
    this.worker.addEventListener("initialization-completed", () => {
      console.log("Worker ready for real-time processing");
      // startStreaming handles its own errors, so the promise is safe to drop.
      void this.startStreaming();
    });
  }

  /**
   * Start the Python server (first call only) and open the event stream.
   * Never rejects: failures are logged instead.
   */
  private async startStreaming(): Promise<void> {
    if (this.stopped) {
      return;
    }
    try {
      if (!this.serverStarted) {
        // Wait for the Python code to finish before connecting to its endpoint.
        await this.worker.runPythonCode(`
import gradio as gr
import asyncio
import json
import time

# Set up a streaming endpoint
def stream_data():
    for i in range(100):
        data = {
            "type": "data",
            "timestamp": time.time(),
            "value": i * 2,
            "progress": i / 100
        }
        yield f"data: {json.dumps(data)}\\n\\n"
        time.sleep(0.1)

# Create Gradio interface with streaming
with gr.Blocks() as demo:
    pass # Setup interface

demo.launch(server_name="0.0.0.0", server_port=8000)
`);
        this.serverStarted = true;
      }
      // stop() may have been called while the Python code was starting up.
      if (this.stopped) {
        return;
      }
      this.connect();
    } catch (error) {
      console.error("Failed to start streaming:", error);
    }
  }

  /** Open a new event stream and wire up its message and error handlers. */
  private connect(): void {
    const source = new WasmWorkerEventSource(
      this.worker,
      new URL("/api/stream", "http://localhost:8000")
    );
    this.eventSource = source;

    source.onmessage = (event) => {
      let data: any;
      try {
        data = JSON.parse(event.data);
      } catch (error) {
        // A malformed message must not take down the whole stream handler.
        console.error("Failed to parse message:", error);
        return;
      }
      this.processData(data);
    };

    source.onerror = (event) => {
      // Ignore errors from a source that has already been replaced or closed.
      if (this.eventSource !== source) {
        return;
      }
      console.error("Streaming error:", event);
      this.reconnect();
    };
  }

  /**
   * Dispatch a parsed stream message by its `type` field.
   * Messages without a recognized type are ignored.
   */
  private processData(data: any): void {
    // JSON.parse can legitimately yield null, which has no `type` to read.
    if (data == null) {
      return;
    }
    switch (data.type) {
      case "data":
        this.updateVisualization(data.value, data.timestamp);
        this.updateProgress(data.progress);
        break;
      case "complete":
        console.log("Processing complete");
        break;
    }
  }

  /**
   * Report a single data point.
   * @param value - The data value
   * @param timestamp - Time of the data point in seconds (converted to ms for Date)
   */
  private updateVisualization(value: number, timestamp: number): void {
    // Update charts, graphs, or other UI elements
    console.log(`Data point: ${value} at ${new Date(timestamp * 1000)}`);
  }

  /**
   * Report progress as a percentage.
   * @param progress - Fraction in the range the server sends (i / 100 above)
   */
  private updateProgress(progress: number): void {
    console.log(`Progress: ${(progress * 100).toFixed(1)}%`);
  }

  /** Close the failed stream and schedule a single retry in five seconds. */
  private reconnect(): void {
    // Do nothing after stop(), and never stack several retry timers.
    if (this.stopped || this.reconnectTimer !== null) {
      return;
    }
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
    // Retry connection after delay
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      void this.startStreaming();
    }, 5000);
  }

  /**
   * Stop streaming: cancel any pending retry, close the stream, and terminate
   * the worker. Calling it more than once has no further effect.
   */
  public stop(): void {
    if (this.stopped) {
      return;
    }
    this.stopped = true;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
    this.worker.terminate();
  }
}
// Usage
const processor = new RealTimeDataProcessor(workerOptions);
// Clean up when done
window.addEventListener("beforeunload", () => {
processor.stop();
});

The WasmWorkerEventSource follows the same state model as the native EventSource:
// ReadyState constants
const CONNECTING = 0; // Connection is being established
const OPEN = 1; // Connection is open and receiving events
const CLOSED = 2; // Connection is closed
// Example state handling
const eventSource = new WasmWorkerEventSource(worker, url);
/**
 * Log a human-readable description of the event source's current readyState.
 * A readyState other than CONNECTING, OPEN, or CLOSED produces no output.
 */
function checkConnectionState() {
  // Compare against the named ReadyState constants rather than bare numbers.
  switch (eventSource.readyState) {
    case CONNECTING:
      console.log("Connecting to event stream...");
      break;
    case OPEN:
      console.log("Event stream is open and receiving data");
      break;
    case CLOSED:
      console.log("Event stream is closed");
      break;
  }
}
// Monitor state changes
eventSource.onopen = () => {
console.log("State changed to OPEN");
checkConnectionState();
};
eventSource.onerror = () => {
console.log("State changed due to error");
checkConnectionState();
};

Event streaming is particularly useful for Gradio applications that need real-time updates:
import { WorkerProxy, WasmWorkerEventSource } from "@gradio/wasm";
// Set up worker with Gradio application
const worker = new WorkerProxy({
gradioWheelUrl: "https://example.com/gradio.whl",
gradioClientWheelUrl: "https://example.com/gradio_client.whl",
files: {},
requirements: ["gradio", "numpy", "matplotlib"],
sharedWorkerMode: false
});
worker.addEventListener("initialization-completed", async () => {
// Create Gradio app with streaming capabilities
await worker.runPythonCode(`
import gradio as gr
import numpy as np
import matplotlib.pyplot as plt
import json
import time
def generate_realtime_plot():
"""Generate real-time plotting data"""
x = np.linspace(0, 10, 100)
for phase in np.linspace(0, 4*np.pi, 50):
y = np.sin(x + phase)
# Create plot
plt.figure(figsize=(10, 6))
plt.plot(x, y)
plt.title(f"Real-time Sine Wave (phase: {phase:.2f})")
plt.xlabel("X")
plt.ylabel("Y")
# Convert to base64 for streaming
plt.savefig("temp_plot.png")
plt.close()
# Stream the update
yield {
"type": "plot_update",
"phase": phase,
"timestamp": time.time()
}
time.sleep(0.1)
# Set up Gradio interface with streaming endpoint
with gr.Blocks() as demo:
plot_output = gr.Image(label="Real-time Plot")
def start_streaming():
return generate_realtime_plot()
stream_btn = gr.Button("Start Streaming")
stream_btn.click(start_streaming, outputs=plot_output)
demo.launch(server_name="0.0.0.0", server_port=7860)
`);
// Connect to the Gradio streaming endpoint
const eventSource = new WasmWorkerEventSource(
worker,
new URL("/api/stream", "http://localhost:7860")
);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "plot_update") {
console.log(`Plot updated: phase=${data.phase}, time=${data.timestamp}`);
// Update UI with new plot data
}
};
});

Robust error handling for event streaming:
/**
 * Event stream wrapper that reconnects with exponential backoff after errors.
 *
 * Connects immediately on construction. After an error the failed source is
 * closed and a retry is scheduled (1s, 2s, 4s, ... up to five attempts); the
 * attempt counter resets whenever a connection opens successfully.
 */
class RobustEventStream {
  private eventSource: WasmWorkerEventSource | null = null;
  private reconnectAttempts = 0;
  private readonly maxReconnectAttempts = 5;
  /** Base delay in milliseconds; doubled for each consecutive attempt. */
  private readonly reconnectDelay = 1000;
  /** Handle of the pending reconnect timer, or null when none is scheduled. */
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  /** True once close() has been called; blocks any further reconnection. */
  private closed = false;

  /**
   * @param worker - WorkerProxy instance to communicate through
   * @param url - URL for the event stream endpoint
   * @param onMessage - Handler invoked for every incoming message
   */
  constructor(
    private readonly worker: WorkerProxy,
    private readonly url: URL,
    private readonly onMessage: (event: MessageEvent) => void
  ) {
    this.connect();
  }

  /** Open a new event source and attach the open/message/error handlers. */
  private connect(): void {
    const source = new WasmWorkerEventSource(this.worker, this.url);
    this.eventSource = source;

    source.onopen = () => {
      console.log("Event stream connected");
      this.reconnectAttempts = 0; // Reset on successful connection
    };
    source.onmessage = this.onMessage;
    source.onerror = (event) => {
      // Ignore errors from a source that has already been replaced or closed.
      if (this.eventSource !== source) {
        return;
      }
      console.error("Event stream error:", event);
      this.handleReconnection();
    };
  }

  /**
   * Close the failed source and schedule one reconnect with exponential
   * backoff, unless the stream was closed, a retry is already pending, or the
   * retry limit has been reached.
   */
  private handleReconnection(): void {
    // Never reconnect after close(), and never stack several retry timers.
    if (this.closed || this.reconnectTimer !== null) {
      return;
    }

    // Release the failed connection right away so it cannot leak or keep
    // reporting errors while the retry is pending (or after giving up).
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
    console.log(`Attempting reconnection ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`);

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, delay);
  }

  /** Close the stream for good and cancel any pending reconnect. */
  public close(): void {
    this.closed = true;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}