or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

code-completion.mdevent-streaming.mdfile-system.mdhttp-client.mdindex.mdnetwork-utilities.mdpackage-management.mdsvelte-integration.mdworker-management.md
tile.json

event-streaming.mddocs/

Event Streaming

WebAssembly-based server-sent events implementation for real-time communication between the browser and Python code running in the worker environment.

Capabilities

WasmWorkerEventSource Class

EventSource-like implementation that provides server-sent events through the WebAssembly worker.

/**
 * EventSource-like implementation for server-sent events through WASM worker
 * Provides real-time communication between browser and Python code
 */
class WasmWorkerEventSource {
  /**
   * Create a new WasmWorkerEventSource instance
   * @param workerProxy - WorkerProxy instance to communicate through
   * @param url - URL for the event stream endpoint
   */
  constructor(workerProxy: WorkerProxy, url: URL);

  /** Connection state: 0=connecting, 1=open, 2=closed */
  readonly readyState: number;
  
  /** Event stream URL */
  readonly url: URL;

  /** Event handler for connection open events */
  onopen: ((ev: Event) => any) | undefined;
  
  /** Event handler for incoming messages */
  onmessage: ((ev: MessageEvent) => any) | undefined;
  
  /** Event handler for error events */
  onerror: ((ev: Event) => any) | undefined;

  /**
   * Close the event stream connection
   */
  close(): void;
}

Usage Examples:

import { WorkerProxy, WasmWorkerEventSource } from "@gradio/wasm";

const worker = new WorkerProxy(options);

// Wait for worker initialization
worker.addEventListener("initialization-completed", () => {
  // Create event source for real-time updates
  const eventSource = new WasmWorkerEventSource(
    worker,
    new URL("/api/stream", "http://localhost:8000")
  );

  // Handle connection events
  eventSource.onopen = (event) => {
    console.log("Event stream connected");
  };

  eventSource.onerror = (event) => {
    console.error("Event stream error:", event);
  };

  // Handle incoming messages
  eventSource.onmessage = (event) => {
    try {
      const data = JSON.parse(event.data);
      console.log("Received data:", data);
      
      // Handle different message types
      switch (data.type) {
        case "progress":
          updateProgressBar(data.progress);
          break;
        case "result":
          displayResult(data.result);
          break;
        case "error":
          showError(data.error);
          break;
      }
    } catch (error) {
      console.error("Failed to parse message:", error);
    }
  };

  // Close connection when done
  setTimeout(() => {
    eventSource.close();
    console.log("Event stream closed");
  }, 30000);
});

Real-time Data Processing

Example of using event streaming for real-time data processing and visualization:

import { WorkerProxy, WasmWorkerEventSource } from "@gradio/wasm";

class RealTimeDataProcessor {
  private worker: WorkerProxy;
  private eventSource: WasmWorkerEventSource | null = null;

  constructor(workerOptions: WorkerProxyOptions) {
    this.worker = new WorkerProxy(workerOptions);
    this.setupWorker();
  }

  private setupWorker() {
    this.worker.addEventListener("initialization-completed", () => {
      console.log("Worker ready for real-time processing");
      this.startStreaming();
    });
  }

  private startStreaming() {
    // Start Python data processing server
    this.worker.runPythonCode(`
import gradio as gr
import asyncio
import json
import time

# Set up a streaming endpoint
def stream_data():
    for i in range(100):
        data = {
            "type": "data",
            "timestamp": time.time(),
            "value": i * 2,
            "progress": i / 100
        }
        yield f"data: {json.dumps(data)}\\n\\n"
        time.sleep(0.1)

# Create Gradio interface with streaming
with gr.Blocks() as demo:
    pass  # Setup interface

demo.launch(server_name="0.0.0.0", server_port=8000)
`);

    // Connect to the stream
    this.eventSource = new WasmWorkerEventSource(
      this.worker,
      new URL("/api/stream", "http://localhost:8000")
    );

    this.eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.processData(data);
    };

    this.eventSource.onerror = (event) => {
      console.error("Streaming error:", event);
      this.reconnect();
    };
  }

  private processData(data: any) {
    switch (data.type) {
      case "data":
        this.updateVisualization(data.value, data.timestamp);
        this.updateProgress(data.progress);
        break;
      case "complete":
        console.log("Processing complete");
        break;
    }
  }

  private updateVisualization(value: number, timestamp: number) {
    // Update charts, graphs, or other UI elements
    console.log(`Data point: ${value} at ${new Date(timestamp * 1000)}`);
  }

  private updateProgress(progress: number) {
    console.log(`Progress: ${(progress * 100).toFixed(1)}%`);
  }

  private reconnect() {
    if (this.eventSource) {
      this.eventSource.close();
    }
    
    // Retry connection after delay
    setTimeout(() => {
      this.startStreaming();
    }, 5000);
  }

  public stop() {
    if (this.eventSource) {
      this.eventSource.close();
    }
    this.worker.terminate();
  }
}

// Usage
const processor = new RealTimeDataProcessor(workerOptions);

// Clean up when done
window.addEventListener("beforeunload", () => {
  processor.stop();
});

Event Stream States

The WasmWorkerEventSource follows the same state model as the native EventSource:

// ReadyState constants
const CONNECTING = 0; // Connection is being established
const OPEN = 1;       // Connection is open and receiving events
const CLOSED = 2;     // Connection is closed

// Example state handling
const eventSource = new WasmWorkerEventSource(worker, url);

function checkConnectionState() {
  switch (eventSource.readyState) {
    case 0: // CONNECTING
      console.log("Connecting to event stream...");
      break;
    case 1: // OPEN
      console.log("Event stream is open and receiving data");
      break;
    case 2: // CLOSED
      console.log("Event stream is closed");
      break;
  }
}

// Monitor state changes
eventSource.onopen = () => {
  console.log("State changed to OPEN");
  checkConnectionState();
};

eventSource.onerror = () => {
  console.log("State changed due to error");
  checkConnectionState();
};

Integration with Gradio Applications

Event streaming is particularly useful for Gradio applications that need real-time updates:

import { WorkerProxy, WasmWorkerEventSource } from "@gradio/wasm";

// Set up worker with Gradio application
const worker = new WorkerProxy({
  gradioWheelUrl: "https://example.com/gradio.whl",
  gradioClientWheelUrl: "https://example.com/gradio_client.whl",
  files: {},
  requirements: ["gradio", "numpy", "matplotlib"],
  sharedWorkerMode: false
});

worker.addEventListener("initialization-completed", async () => {
  // Create Gradio app with streaming capabilities
  await worker.runPythonCode(`
import gradio as gr
import numpy as np
import matplotlib.pyplot as plt
import json
import time

def generate_realtime_plot():
    """Generate real-time plotting data"""
    x = np.linspace(0, 10, 100)
    
    for phase in np.linspace(0, 4*np.pi, 50):
        y = np.sin(x + phase)
        
        # Create plot
        plt.figure(figsize=(10, 6))
        plt.plot(x, y)
        plt.title(f"Real-time Sine Wave (phase: {phase:.2f})")
        plt.xlabel("X")
        plt.ylabel("Y")
        
        # Convert to base64 for streaming
        plt.savefig("temp_plot.png")
        plt.close()
        
        # Stream the update
        yield {
            "type": "plot_update",
            "phase": phase,
            "timestamp": time.time()
        }
        
        time.sleep(0.1)

# Set up Gradio interface with streaming endpoint
with gr.Blocks() as demo:
    plot_output = gr.Image(label="Real-time Plot")
    
    def start_streaming():
        return generate_realtime_plot()
    
    stream_btn = gr.Button("Start Streaming")
    stream_btn.click(start_streaming, outputs=plot_output)

demo.launch(server_name="0.0.0.0", server_port=7860)
`);

  // Connect to the Gradio streaming endpoint
  const eventSource = new WasmWorkerEventSource(
    worker,
    new URL("/api/stream", "http://localhost:7860")
  );

  eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    
    if (data.type === "plot_update") {
      console.log(`Plot updated: phase=${data.phase}, time=${data.timestamp}`);
      // Update UI with new plot data
    }
  };
});

Error Handling and Reconnection

Robust error handling for event streaming:

class RobustEventStream {
  private eventSource: WasmWorkerEventSource | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 1000;

  constructor(
    private worker: WorkerProxy,
    private url: URL,
    private onMessage: (event: MessageEvent) => void
  ) {
    this.connect();
  }

  private connect() {
    this.eventSource = new WasmWorkerEventSource(this.worker, this.url);

    this.eventSource.onopen = () => {
      console.log("Event stream connected");
      this.reconnectAttempts = 0; // Reset on successful connection
    };

    this.eventSource.onmessage = this.onMessage;

    this.eventSource.onerror = (event) => {
      console.error("Event stream error:", event);
      this.handleReconnection();
    };
  }

  private handleReconnection() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnectAttempts++;
    const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);

    console.log(`Attempting reconnection ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`);

    setTimeout(() => {
      if (this.eventSource) {
        this.eventSource.close();
      }
      this.connect();
    }, delay);
  }

  public close() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}