Real-time event streaming via HTTP and WebSocket connections.
Mastodon provides real-time streaming of events through two protocols: HTTP streaming (Server-Sent Events) and WebSocket.
Both provide the same events but use different transport mechanisms.
type Event interface {
// contains filtered or unexported methods
}
Event is an interface for all streaming events. Concrete event types implement this interface.
type UpdateEvent struct {
Status *Status
}
UpdateEvent represents a new status appearing in a timeline.
Fields:
Status - The new status that was posted
type UpdateEditEvent struct {
Status *Status
}
UpdateEditEvent represents a status being edited.
Fields:
Status - The edited status with updated content
type NotificationEvent struct {
Notification *Notification
}
NotificationEvent represents a new notification.
Fields:
Notification - The notification object
type DeleteEvent struct {
ID ID
}
DeleteEvent represents a status being deleted.
Fields:
ID - ID of the deleted status
type ConversationEvent struct {
Conversation *Conversation
}
ConversationEvent represents a conversation update.
Fields:
Conversation - The updated conversation
type ErrorEvent struct {
Err error
}
ErrorEvent represents an error in the stream.
Fields:
Err - The error that occurred
Method:
func (e *ErrorEvent) Error() string
Returns the error message as a string.
type Stream struct {
Event string
Payload interface{}
}
Stream is the raw stream data structure (typically not used directly).
Fields:
Event - Event type name
Payload - Event payload data
HTTP streaming uses Server-Sent Events over HTTP connections.
func (c *Client) StreamingUser(ctx context.Context) (chan Event, error)
Streams home timeline events (followed accounts and own posts).
Parameters:
ctx - Context for cancellation/timeout
Returns: Channel of Event objects or error
Example:
ctx := context.Background()
events, err := client.StreamingUser(ctx)
if err != nil {
log.Fatal(err)
}
for event := range events {
switch e := event.(type) {
case *mastodon.UpdateEvent:
fmt.Printf("New status from @%s: %s\n",
e.Status.Account.Acct, e.Status.Content)
case *mastodon.NotificationEvent:
fmt.Printf("Notification: %s from @%s\n",
e.Notification.Type, e.Notification.Account.Acct)
case *mastodon.DeleteEvent:
fmt.Printf("Status deleted: %s\n", e.ID)
case *mastodon.ErrorEvent:
log.Printf("Stream error: %v\n", e.Err)
}
}
func (c *Client) StreamingPublic(ctx context.Context, isLocal bool) (chan Event, error)
Streams public timeline events.
Parameters:
ctx - Context for cancellation/timeout
isLocal - If true, only stream local statuses; if false, stream the federated timeline
Returns: Channel of Event objects or error
Example:
// Stream local public timeline
events, err := client.StreamingPublic(ctx, true)
if err != nil {
log.Fatal(err)
}
for event := range events {
if update, ok := event.(*mastodon.UpdateEvent); ok {
fmt.Printf("Public post: %s\n", update.Status.Content)
}
}
func (c *Client) StreamingHashtag(ctx context.Context, tag string, isLocal bool) (chan Event, error)
Streams hashtag timeline events.
Parameters:
ctx - Context for cancellation/timeout
tag - Hashtag name (without '#' prefix)
isLocal - If true, only stream local statuses
Returns: Channel of Event objects or error
Example:
events, err := client.StreamingHashtag(ctx, "golang", false)
if err != nil {
log.Fatal(err)
}
for event := range events {
if update, ok := event.(*mastodon.UpdateEvent); ok {
fmt.Printf("#golang post: %s\n", update.Status.Content)
}
}
func (c *Client) StreamingList(ctx context.Context, id ID) (chan Event, error)
Streams list timeline events.
Parameters:
ctx - Context for cancellation/timeout
id - List ID to stream
Returns: Channel of Event objects or error
Example:
// StreamingList takes an ID, not a plain string, so convert explicitly;
// a variable inferred as string is not assignable to the ID parameter.
listID := mastodon.ID("123456")
events, err := client.StreamingList(ctx, listID)
if err != nil {
	log.Fatal(err)
}
for event := range events {
	if update, ok := event.(*mastodon.UpdateEvent); ok {
		fmt.Printf("List update: %s\n", update.Status.Content)
	}
}
func (c *Client) StreamingDirect(ctx context.Context) (chan Event, error)
Streams direct message events.
Parameters:
ctx - Context for cancellation/timeout
Returns: Channel of Event objects or error
Example:
events, err := client.StreamingDirect(ctx)
if err != nil {
log.Fatal(err)
}
for event := range events {
if update, ok := event.(*mastodon.UpdateEvent); ok {
fmt.Printf("DM from @%s: %s\n",
update.Status.Account.Acct, update.Status.Content)
}
}
WebSocket streaming provides the same functionality as HTTP streaming but uses the WebSocket protocol.
type WSClient struct {
websocket.Dialer
// contains filtered or unexported fields
}
WSClient is a WebSocket client for streaming.
func (c *Client) NewWSClient() *WSClient
Creates a WebSocket client from the HTTP client.
Returns: WSClient for WebSocket streaming
Example:
wsClient := client.NewWSClient()
func (c *WSClient) StreamingWSUser(ctx context.Context) (chan Event, error)
WebSocket stream for home timeline.
Parameters:
ctx - Context for cancellation/timeout
Returns: Channel of Event objects or error
Example:
wsClient := client.NewWSClient()
events, err := wsClient.StreamingWSUser(ctx)
if err != nil {
log.Fatal(err)
}
for event := range events {
switch e := event.(type) {
case *mastodon.UpdateEvent:
fmt.Printf("New status: %s\n", e.Status.Content)
case *mastodon.ErrorEvent:
log.Printf("Error: %v\n", e.Err)
}
}
func (c *WSClient) StreamingWSPublic(ctx context.Context, isLocal bool) (chan Event, error)
WebSocket stream for public timeline.
Parameters:
ctx - Context for cancellation/timeout
isLocal - If true, only stream local statuses
Returns: Channel of Event objects or error
func (c *WSClient) StreamingWSHashtag(ctx context.Context, tag string, isLocal bool) (chan Event, error)
WebSocket stream for hashtag timeline.
Parameters:
ctx - Context for cancellation/timeout
tag - Hashtag name (without '#' prefix)
isLocal - If true, only stream local statuses
Returns: Channel of Event objects or error
func (c *WSClient) StreamingWSList(ctx context.Context, id ID) (chan Event, error)
WebSocket stream for list timeline.
Parameters:
ctx - Context for cancellation/timeout
id - List ID to stream
Returns: Channel of Event objects or error
func (c *WSClient) StreamingWSDirect(ctx context.Context) (chan Event, error)
WebSocket stream for direct messages.
Parameters:
ctx - Context for cancellation/timeout
Returns: Channel of Event objects or error
// monitorStreams opens the home timeline stream and the "golang" hashtag
// stream on a shared cancellable context and dispatches each event to the
// matching handler. It returns when the context is done or when both stream
// channels have been closed.
//
// Parameters:
//
//	client - Mastodon client used to open both streams
func monitorStreams(client *mastodon.Client) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Start home timeline stream
	homeEvents, err := client.StreamingUser(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Start hashtag stream
	hashtagEvents, err := client.StreamingHashtag(ctx, "golang", false)
	if err != nil {
		log.Fatal(err)
	}

	// Process events from both streams. A receive on a closed channel never
	// blocks and yields the zero value, so a closed stream must be detected
	// with the two-value receive and disabled by setting it to nil (a nil
	// channel is never selected). Without this the loop would spin on nil
	// events as soon as either stream closed.
	for homeEvents != nil || hashtagEvents != nil {
		select {
		case event, ok := <-homeEvents:
			if !ok {
				homeEvents = nil
				continue
			}
			handleHomeEvent(event)
		case event, ok := <-hashtagEvents:
			if !ok {
				hashtagEvents = nil
				continue
			}
			handleHashtagEvent(event)
		case <-ctx.Done():
			return
		}
	}
}
// handleHomeEvent prints the author and content of a status update received
// on the home timeline stream. Events of any other type are ignored.
func handleHomeEvent(event mastodon.Event) {
	update, isUpdate := event.(*mastodon.UpdateEvent)
	if !isUpdate {
		return
	}
	status := update.Status
	fmt.Printf("[HOME] @%s: %s\n", status.Account.Acct, status.Content)
}
func handleHashtagEvent(event mastodon.Event) {
if update, ok := event.(*mastodon.UpdateEvent); ok {
fmt.Printf("[#golang] @%s: %s\n",
update.Status.Account.Acct, update.Status.Content)
}
}func streamWithReconnect(client *mastodon.Client) {
for {
ctx, cancel := context.WithCancel(context.Background())
events, err := client.StreamingUser(ctx)
if err != nil {
log.Printf("Stream error: %v, reconnecting in 5s...", err)
cancel()
time.Sleep(5 * time.Second)
continue
}
// Process events
for event := range events {
if errEvent, ok := event.(*mastodon.ErrorEvent); ok {
log.Printf("Stream error: %v, reconnecting...", errEvent.Err)
cancel()
time.Sleep(5 * time.Second)
break
}
handleEvent(event)
}
cancel()
}
}
func handleEvent(event mastodon.Event) {
switch e := event.(type) {
case *mastodon.UpdateEvent:
processStatus(e.Status)
case *mastodon.NotificationEvent:
processNotification(e.Notification)
case *mastodon.DeleteEvent:
handleDeletion(e.ID)
}
}type StreamStats struct {
Updates int
Notifications int
Deletes int
Errors int
mu sync.Mutex
}
// Record increments the counter matching the concrete type of event.
// Events of a type without a counter leave the stats unchanged.
func (s *StreamStats) Record(event mastodon.Event) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Pick the counter first, then bump it once below.
	var counter *int
	switch event.(type) {
	case *mastodon.UpdateEvent:
		counter = &s.Updates
	case *mastodon.NotificationEvent:
		counter = &s.Notifications
	case *mastodon.DeleteEvent:
		counter = &s.Deletes
	case *mastodon.ErrorEvent:
		counter = &s.Errors
	}
	if counter != nil {
		*counter++
	}
}
// String returns a one-line summary of all four counters, read while
// holding the mutex.
func (s *StreamStats) String() string {
	s.mu.Lock()
	updates, notifications, deletes, errs := s.Updates, s.Notifications, s.Deletes, s.Errors
	s.mu.Unlock()

	return fmt.Sprintf("Updates: %d, Notifications: %d, Deletes: %d, Errors: %d",
		updates, notifications, deletes, errs)
}
// Usage
stats := &StreamStats{}
events, err := client.StreamingUser(ctx)
if err != nil {
	// Ranging over a nil channel would block forever, so stop here.
	log.Fatal(err)
}
for event := range events {
	stats.Record(event)
	handleEvent(event)
}

// filterMentions reads events in a background goroutine and forwards the
// statuses that involve a mention on the returned channel.
//
// Parameters:
//
//	events - Channel of stream events to read from
//
// Returns: an unbuffered channel of statuses, closed once events is closed
// and drained. The caller must keep receiving from it, otherwise the
// goroutine blocks on its next send.
func filterMentions(events chan mastodon.Event) chan *mastodon.Status {
	mentions := make(chan *mastodon.Status)
	go func() {
		defer close(mentions)
		for event := range events {
			switch e := event.(type) {
			case *mastodon.UpdateEvent:
				// Forward any status that carries at least one mention, once.
				// To forward only mentions of the current user, range over
				// e.Status.Mentions and compare each entry with your own
				// account before sending.
				if len(e.Status.Mentions) > 0 {
					mentions <- e.Status
				}
			case *mastodon.NotificationEvent:
				if e.Notification.Type == "mention" && e.Notification.Status != nil {
					mentions <- e.Notification.Status
				}
			}
		}
	}()
	return mentions
}
// Usage
events, err := client.StreamingUser(ctx)
if err != nil {
	// Without this check a failed connect would leave the loop below
	// waiting on a channel that never delivers anything.
	log.Fatal(err)
}
mentions := filterMentions(events)
for status := range mentions {
	fmt.Printf("Mentioned by @%s: %s\n",
		status.Account.Acct, status.Content)
}

// HTTP Streaming (Server-Sent Events)
//
// useHTTPStreaming opens the home timeline stream through the HTTP client
// and passes every event to handleEvent until the event channel is closed.
// It exits the program if the stream cannot be opened.
func useHTTPStreaming(client *mastodon.Client) {
	ctx := context.Background()
	events, err := client.StreamingUser(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for event := range events {
		handleEvent(event)
	}
}
// WebSocket Streaming
func useWebSocketStreaming(client *mastodon.Client) {
ctx := context.Background()
wsClient := client.NewWSClient()
events, err := wsClient.StreamingWSUser(ctx)
if err != nil {
log.Fatal(err)
}
for event := range events {
handleEvent(event)
}
}
Always pass a context that can be cancelled:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events, err := client.StreamingUser(ctx)
// Cancel when done
cancel()
Use type assertions or switches to handle all event types:
for event := range events {
switch e := event.(type) {
case *mastodon.UpdateEvent:
// Handle new status
case *mastodon.NotificationEvent:
// Handle notification
case *mastodon.DeleteEvent:
// Handle deletion
case *mastodon.UpdateEditEvent:
// Handle edit
case *mastodon.ConversationEvent:
// Handle conversation
case *mastodon.ErrorEvent:
// Handle error - consider reconnecting
log.Printf("Stream error: %v", e.Err)
}
}
Streams can disconnect; implement automatic reconnection:
func maintainStream(client *mastodon.Client) {
backoff := time.Second
for {
err := runStream(client)
if err != nil {
log.Printf("Stream disconnected: %v", err)
time.Sleep(backoff)
backoff = backoff * 2
if backoff > time.Minute {
backoff = time.Minute
}
}
}
}ErrorEvents indicate stream problems:
case *mastodon.ErrorEvent:
log.Printf("Stream error: %v", e.Err)
// Consider reconnecting
return
Streams count toward API rate limits. Don't open unnecessary streams.
Always close streams gracefully:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
events, err := client.StreamingUser(ctx)
// On shutdown signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
go func() {
<-sigChan
cancel() // Gracefully close stream
}()
HTTP Streaming (SSE):
WebSocket:
Unbounded channel buffers can cause memory issues:
// Don't let events pile up
for event := range events {
// Process immediately or use buffered channel
go handleEvent(event)
}
See also: