or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

accounts.mdauthentication.mdconversations.mdfilters.mdindex.mdinstance.mdlists.mdmedia.mdnotifications.mdpolls.mdreports.mdsearch.mdstatuses.mdstreaming.mdtags.mdtimelines.mdtypes.md
tile.json

streaming.mddocs/

Streaming

Real-time event streaming via HTTP and WebSocket connections.

Overview

Mastodon provides real-time streaming of events through two protocols:

  • HTTP Streaming - Server-Sent Events (SSE) over HTTP
  • WebSocket Streaming - Binary WebSocket connections

Both provide the same events but use different transport mechanisms.

Event Types

Event Interface

type Event interface {
    // contains filtered or unexported methods
}

Event is an interface for all streaming events. Concrete event types implement this interface.

UpdateEvent

type UpdateEvent struct {
    Status *Status
}

UpdateEvent represents a new status appearing in a timeline.

Fields:

  • Status - The new status that was posted

UpdateEditEvent

type UpdateEditEvent struct {
    Status *Status
}

UpdateEditEvent represents a status being edited.

Fields:

  • Status - The edited status with updated content

NotificationEvent

type NotificationEvent struct {
    Notification *Notification
}

NotificationEvent represents a new notification.

Fields:

  • Notification - The notification object

DeleteEvent

type DeleteEvent struct {
    ID ID
}

DeleteEvent represents a status being deleted.

Fields:

  • ID - ID of the deleted status

ConversationEvent

type ConversationEvent struct {
    Conversation *Conversation
}

ConversationEvent represents a conversation update.

Fields:

  • Conversation - The updated conversation

ErrorEvent

type ErrorEvent struct {
    Err error
}

ErrorEvent represents an error in the stream.

Fields:

  • Err - The error that occurred

Method:

func (e *ErrorEvent) Error() string

Returns the error message as a string.

Stream

type Stream struct {
    Event   string
    Payload interface{}
}

Stream is the raw stream data structure (typically not used directly).

Fields:

  • Event - Event type name
  • Payload - Event payload data

HTTP Streaming

HTTP streaming uses Server-Sent Events over HTTP connections.

StreamingUser { .api }

func (c *Client) StreamingUser(ctx context.Context) (chan Event, error)

Streams home timeline events (followed accounts and own posts).

Parameters:

  • ctx - Context for cancellation/timeout

Returns: Channel of Event objects or error

Example:

ctx := context.Background()
events, err := client.StreamingUser(ctx)
if err != nil {
    log.Fatal(err)
}

for event := range events {
    switch e := event.(type) {
    case *mastodon.UpdateEvent:
        fmt.Printf("New status from @%s: %s\n",
            e.Status.Account.Acct, e.Status.Content)

    case *mastodon.NotificationEvent:
        fmt.Printf("Notification: %s from @%s\n",
            e.Notification.Type, e.Notification.Account.Acct)

    case *mastodon.DeleteEvent:
        fmt.Printf("Status deleted: %s\n", e.ID)

    case *mastodon.ErrorEvent:
        log.Printf("Stream error: %v\n", e.Err)
    }
}

StreamingPublic { .api }

func (c *Client) StreamingPublic(ctx context.Context, isLocal bool) (chan Event, error)

Streams public timeline events.

Parameters:

  • ctx - Context for cancellation/timeout
  • isLocal - If true, only stream local statuses; if false, stream federated timeline

Returns: Channel of Event objects or error

Example:

// Stream local public timeline
events, err := client.StreamingPublic(ctx, true)
if err != nil {
    log.Fatal(err)
}

for event := range events {
    if update, ok := event.(*mastodon.UpdateEvent); ok {
        fmt.Printf("Public post: %s\n", update.Status.Content)
    }
}

StreamingHashtag { .api }

func (c *Client) StreamingHashtag(ctx context.Context, tag string, isLocal bool) (chan Event, error)

Streams hashtag timeline events.

Parameters:

  • ctx - Context for cancellation/timeout
  • tag - Hashtag name (without '#' prefix)
  • isLocal - If true, only stream local statuses

Returns: Channel of Event objects or error

Example:

events, err := client.StreamingHashtag(ctx, "golang", false)
if err != nil {
    log.Fatal(err)
}

for event := range events {
    if update, ok := event.(*mastodon.UpdateEvent); ok {
        fmt.Printf("#golang post: %s\n", update.Status.Content)
    }
}

StreamingList { .api }

func (c *Client) StreamingList(ctx context.Context, id ID) (chan Event, error)

Streams list timeline events.

Parameters:

  • ctx - Context for cancellation/timeout
  • id - List ID to stream

Returns: Channel of Event objects or error

Example:

listID := "123456"
events, err := client.StreamingList(ctx, listID)
if err != nil {
    log.Fatal(err)
}

for event := range events {
    if update, ok := event.(*mastodon.UpdateEvent); ok {
        fmt.Printf("List update: %s\n", update.Status.Content)
    }
}

StreamingDirect { .api }

func (c *Client) StreamingDirect(ctx context.Context) (chan Event, error)

Streams direct message events.

Parameters:

  • ctx - Context for cancellation/timeout

Returns: Channel of Event objects or error

Example:

events, err := client.StreamingDirect(ctx)
if err != nil {
    log.Fatal(err)
}

for event := range events {
    if update, ok := event.(*mastodon.UpdateEvent); ok {
        fmt.Printf("DM from @%s: %s\n",
            update.Status.Account.Acct, update.Status.Content)
    }
}

WebSocket Streaming

WebSocket streaming provides the same functionality as HTTP streaming but uses WebSocket protocol.

WSClient

type WSClient struct {
    websocket.Dialer
    // contains filtered or unexported fields
}

WSClient is a WebSocket client for streaming.

NewWSClient { .api }

func (c *Client) NewWSClient() *WSClient

Creates a WebSocket client from the HTTP client.

Returns: WSClient for WebSocket streaming

Example:

wsClient := client.NewWSClient()

StreamingWSUser { .api }

func (c *WSClient) StreamingWSUser(ctx context.Context) (chan Event, error)

WebSocket stream for home timeline.

Parameters:

  • ctx - Context for cancellation/timeout

Returns: Channel of Event objects or error

Example:

wsClient := client.NewWSClient()
events, err := wsClient.StreamingWSUser(ctx)
if err != nil {
    log.Fatal(err)
}

for event := range events {
    switch e := event.(type) {
    case *mastodon.UpdateEvent:
        fmt.Printf("New status: %s\n", e.Status.Content)
    case *mastodon.ErrorEvent:
        log.Printf("Error: %v\n", e.Err)
    }
}

StreamingWSPublic { .api }

func (c *WSClient) StreamingWSPublic(ctx context.Context, isLocal bool) (chan Event, error)

WebSocket stream for public timeline.

Parameters:

  • ctx - Context for cancellation/timeout
  • isLocal - If true, only stream local statuses

Returns: Channel of Event objects or error

StreamingWSHashtag { .api }

func (c *WSClient) StreamingWSHashtag(ctx context.Context, tag string, isLocal bool) (chan Event, error)

WebSocket stream for hashtag timeline.

Parameters:

  • ctx - Context for cancellation/timeout
  • tag - Hashtag name (without '#' prefix)
  • isLocal - If true, only stream local statuses

Returns: Channel of Event objects or error

StreamingWSList { .api }

func (c *WSClient) StreamingWSList(ctx context.Context, id ID) (chan Event, error)

WebSocket stream for list timeline.

Parameters:

  • ctx - Context for cancellation/timeout
  • id - List ID to stream

Returns: Channel of Event objects or error

StreamingWSDirect { .api }

func (c *WSClient) StreamingWSDirect(ctx context.Context) (chan Event, error)

WebSocket stream for direct messages.

Parameters:

  • ctx - Context for cancellation/timeout

Returns: Channel of Event objects or error

Usage Examples

Example: Monitor Multiple Streams

func monitorStreams(client *mastodon.Client) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start home timeline stream
    homeEvents, err := client.StreamingUser(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Start hashtag stream
    hashtagEvents, err := client.StreamingHashtag(ctx, "golang", false)
    if err != nil {
        log.Fatal(err)
    }

    // Process events from both streams
    for {
        select {
        case event := <-homeEvents:
            handleHomeEvent(event)

        case event := <-hashtagEvents:
            handleHashtagEvent(event)

        case <-ctx.Done():
            return
        }
    }
}

func handleHomeEvent(event mastodon.Event) {
    if update, ok := event.(*mastodon.UpdateEvent); ok {
        fmt.Printf("[HOME] @%s: %s\n",
            update.Status.Account.Acct, update.Status.Content)
    }
}

func handleHashtagEvent(event mastodon.Event) {
    if update, ok := event.(*mastodon.UpdateEvent); ok {
        fmt.Printf("[#golang] @%s: %s\n",
            update.Status.Account.Acct, update.Status.Content)
    }
}

Example: Reconnection Logic

func streamWithReconnect(client *mastodon.Client) {
    for {
        ctx, cancel := context.WithCancel(context.Background())

        events, err := client.StreamingUser(ctx)
        if err != nil {
            log.Printf("Stream error: %v, reconnecting in 5s...", err)
            cancel()
            time.Sleep(5 * time.Second)
            continue
        }

        // Process events
        for event := range events {
            if errEvent, ok := event.(*mastodon.ErrorEvent); ok {
                log.Printf("Stream error: %v, reconnecting...", errEvent.Err)
                cancel()
                time.Sleep(5 * time.Second)
                break
            }

            handleEvent(event)
        }

        cancel()
    }
}

func handleEvent(event mastodon.Event) {
    switch e := event.(type) {
    case *mastodon.UpdateEvent:
        processStatus(e.Status)
    case *mastodon.NotificationEvent:
        processNotification(e.Notification)
    case *mastodon.DeleteEvent:
        handleDeletion(e.ID)
    }
}

Example: Event Statistics

type StreamStats struct {
    Updates       int
    Notifications int
    Deletes       int
    Errors        int
    mu            sync.Mutex
}

func (s *StreamStats) Record(event mastodon.Event) {
    s.mu.Lock()
    defer s.mu.Unlock()

    switch event.(type) {
    case *mastodon.UpdateEvent:
        s.Updates++
    case *mastodon.NotificationEvent:
        s.Notifications++
    case *mastodon.DeleteEvent:
        s.Deletes++
    case *mastodon.ErrorEvent:
        s.Errors++
    }
}

func (s *StreamStats) String() string {
    s.mu.Lock()
    defer s.mu.Unlock()

    return fmt.Sprintf("Updates: %d, Notifications: %d, Deletes: %d, Errors: %d",
        s.Updates, s.Notifications, s.Deletes, s.Errors)
}

// Usage
stats := &StreamStats{}
events, _ := client.StreamingUser(ctx)

for event := range events {
    stats.Record(event)
    handleEvent(event)
}

Example: Filter Stream Events

func filterMentions(events chan mastodon.Event) chan *mastodon.Status {
    mentions := make(chan *mastodon.Status)

    go func() {
        defer close(mentions)

        for event := range events {
            switch e := event.(type) {
            case *mastodon.UpdateEvent:
                // Check if status mentions current user
                for _, mention := range e.Status.Mentions {
                    // Add your user ID check here
                    mentions <- e.Status
                    break
                }

            case *mastodon.NotificationEvent:
                if e.Notification.Type == "mention" && e.Notification.Status != nil {
                    mentions <- e.Notification.Status
                }
            }
        }
    }()

    return mentions
}

// Usage
events, _ := client.StreamingUser(ctx)
mentions := filterMentions(events)

for status := range mentions {
    fmt.Printf("Mentioned by @%s: %s\n",
        status.Account.Acct, status.Content)
}

Example: WebSocket vs HTTP Comparison

// HTTP Streaming (Server-Sent Events)
func useHTTPStreaming(client *mastodon.Client) {
    ctx := context.Background()
    events, err := client.StreamingUser(ctx)
    if err != nil {
        log.Fatal(err)
    }

    for event := range events {
        handleEvent(event)
    }
}

// WebSocket Streaming
func useWebSocketStreaming(client *mastodon.Client) {
    ctx := context.Background()
    wsClient := client.NewWSClient()
    events, err := wsClient.StreamingWSUser(ctx)
    if err != nil {
        log.Fatal(err)
    }

    for event := range events {
        handleEvent(event)
    }
}

Best Practices

1. Use Context for Cancellation

Always pass a context that can be cancelled:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

events, err := client.StreamingUser(ctx)

// Cancel when done
cancel()

2. Handle All Event Types

Use type assertions or switches to handle all event types:

for event := range events {
    switch e := event.(type) {
    case *mastodon.UpdateEvent:
        // Handle new status
    case *mastodon.NotificationEvent:
        // Handle notification
    case *mastodon.DeleteEvent:
        // Handle deletion
    case *mastodon.UpdateEditEvent:
        // Handle edit
    case *mastodon.ConversationEvent:
        // Handle conversation
    case *mastodon.ErrorEvent:
        // Handle error - consider reconnecting
        log.Printf("Stream error: %v", e.Err)
    }
}

3. Implement Reconnection Logic

Streams can disconnect; implement automatic reconnection:

func maintainStream(client *mastodon.Client) {
    backoff := time.Second

    for {
        err := runStream(client)
        if err != nil {
            log.Printf("Stream disconnected: %v", err)
            time.Sleep(backoff)
            backoff = backoff * 2
            if backoff > time.Minute {
                backoff = time.Minute
            }
        }
    }
}

4. Choose Appropriate Protocol

  • HTTP Streaming: Better firewall/proxy compatibility, simpler
  • WebSocket: Lower latency, more efficient for high-volume streams

5. Monitor for Errors

ErrorEvents indicate stream problems:

case *mastodon.ErrorEvent:
    log.Printf("Stream error: %v", e.Err)
    // Consider reconnecting
    return

6. Rate Limit Awareness

Streams count toward API rate limits. Don't open unnecessary streams.

7. Clean Shutdown

Always close streams gracefully:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

events, err := client.StreamingUser(ctx)

// On shutdown signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)

go func() {
    <-sigChan
    cancel() // Gracefully close stream
}()

Performance Considerations

HTTP vs WebSocket

HTTP Streaming (SSE):

  • ✓ Better compatibility with proxies/firewalls
  • ✓ Simpler protocol
  • ✗ Higher latency
  • ✗ More overhead per message

WebSocket:

  • ✓ Lower latency
  • ✓ More efficient for high-volume
  • ✓ Binary protocol
  • ✗ May be blocked by some proxies
  • ✗ More complex connection handling

Memory Management

Unbounded channel buffers can cause memory issues:

// Don't let events pile up
for event := range events {
    // Process immediately or use buffered channel
    go handleEvent(event)
}

Related Types

See also:

  • Statuses - For UpdateEvent status data
  • Notifications - For NotificationEvent data
  • Timelines - For timeline polling alternative
  • Types - For common types