CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mastodon-py

Python wrapper for the Mastodon API providing comprehensive access to social media functionality

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

streaming.mddocs/

Real-time Streaming

Streaming over a long-lived HTTP connection for real-time updates from timelines, notifications, and user events. This lets applications receive live updates without polling, so new posts, mentions, and other activity are delivered as soon as they happen.

Capabilities

Stream Listeners

Base classes for handling streaming events with customizable callback methods.

class StreamListener:
    """
    Base listener for streaming events.

    Every handler below does nothing by default; subclass and override
    only the events you want to react to.
    """

    def on_update(self, status: dict):
        """
        Called when a new status arrives.

        Args:
            status: Status dictionary describing the new post
        """

    def on_notification(self, notification: dict):
        """
        Called when a new notification arrives.

        Args:
            notification: Notification dictionary
        """

    def on_delete(self, status_id: int):
        """
        Called when a status has been deleted.

        Args:
            status_id: ID of the status that was deleted
        """

    def on_filters_changed(self):
        """
        Called when the user has changed their content filters.

        The event carries no payload - refetch the filters if needed.
        """

    def on_conversation(self, conversation: dict):
        """
        Called for direct message conversations.

        Args:
            conversation: Conversation dictionary
        """

    def on_announcement(self, announcement: dict):
        """
        Called for instance announcements.

        Args:
            announcement: Announcement dictionary
        """

    def on_announcement_reaction(self, reaction: dict):
        """
        Called when someone reacts to an announcement.

        Args:
            reaction: Reaction dictionary
        """

    def on_announcement_delete(self, announcement_id: int):
        """
        Called when an announcement has been deleted.

        Args:
            announcement_id: ID of the announcement that was deleted
        """

    def on_status_update(self, status: dict):
        """
        Called when an existing status has been edited.

        Args:
            status: Status dictionary after the edit
        """

    def on_encrypted_message(self, data: dict):
        """
        Called for encrypted messages (currently unused).

        Args:
            data: Encrypted message data
        """

    def on_abort(self, err: Exception):
        """
        Called on connection errors and stream failures.

        Args:
            err: Exception that caused the abort
        """

    def on_unknown_event(self, name: str, unknown_event: dict = None):
        """
        Called for event types that have no dedicated handler.

        Args:
            name: Event name
            unknown_event: Raw event data
        """

class CallbackStreamListener(StreamListener):
    """
    Stream listener that dispatches events to plain callback functions,
    so simple handlers do not need a subclass.
    """

    def __init__(self, **callbacks):
        """
        Initialize with callback functions.

        Args:
            **callbacks: Handler callables passed by keyword. Mastodon.py
                names these keywords ``<event>_handler`` (for example
                ``update_handler=func``, ``notification_handler=func``,
                ``delete_handler=func``); ``on_*`` keywords are not
                accepted. There is no handler keyword for stream aborts:
                subclass and override ``on_abort`` to observe them.
        """
        pass

Stream Endpoints

Connect to various streaming endpoints for different types of real-time data.

def stream_user(
    self,
    listener: StreamListener,
    run_async: bool = False,
    timeout: int = 300,
    reconnect_async: bool = False,
    reconnect_async_wait_sec: int = 5
):
    """
    Open a stream of the authenticated user's timeline and notifications.

    Args:
        listener: StreamListener whose callbacks receive the events
        run_async: If True, run the stream in a background thread
        timeout: Connection timeout, in seconds
        reconnect_async: If True, reconnect automatically after a failure
        reconnect_async_wait_sec: Seconds to wait between reconnection attempts
    """

def stream_public(
    self,
    listener: StreamListener,
    run_async: bool = False,
    timeout: int = 300,
    reconnect_async: bool = False,
    reconnect_async_wait_sec: int = 5
):
    """
    Open a stream of the public timeline.

    Args:
        listener: StreamListener whose callbacks receive the events
        run_async: If True, run the stream in a background thread
        timeout: Connection timeout, in seconds
        reconnect_async: If True, reconnect automatically after a failure
        reconnect_async_wait_sec: Seconds to wait between reconnection attempts
    """

def stream_local(
    self,
    listener: StreamListener,
    run_async: bool = False,
    timeout: int = 300,
    reconnect_async: bool = False,
    reconnect_async_wait_sec: int = 5
):
    """
    Open a stream of the local instance timeline (deprecated).

    Args:
        listener: StreamListener whose callbacks receive the events
        run_async: If True, run the stream in a background thread
        timeout: Connection timeout, in seconds
        reconnect_async: If True, reconnect automatically after a failure
        reconnect_async_wait_sec: Seconds to wait between reconnection attempts
    """

def stream_hashtag(
    self,
    tag: str,
    listener: StreamListener,
    local: bool = False,
    run_async: bool = False,
    timeout: int = 300,
    reconnect_async: bool = False,
    reconnect_async_wait_sec: int = 5
):
    """
    Open a stream of posts that contain a given hashtag.

    Args:
        tag: Hashtag to follow, written without the leading # symbol
        listener: StreamListener whose callbacks receive the events
        local: If True, only stream posts from the local instance
        run_async: If True, run the stream in a background thread
        timeout: Connection timeout, in seconds
        reconnect_async: If True, reconnect automatically after a failure
        reconnect_async_wait_sec: Seconds to wait between reconnection attempts
    """

def stream_list(
    self,
    id: int,
    listener: StreamListener,
    run_async: bool = False,
    timeout: int = 300,
    reconnect_async: bool = False,
    reconnect_async_wait_sec: int = 5
):
    """
    Open a stream of posts from one list.

    Args:
        id: ID of the list to stream
        listener: StreamListener whose callbacks receive the events
        run_async: If True, run the stream in a background thread
        timeout: Connection timeout, in seconds
        reconnect_async: If True, reconnect automatically after a failure
        reconnect_async_wait_sec: Seconds to wait between reconnection attempts
    """

def stream_direct(
    self,
    listener: StreamListener,
    run_async: bool = False,
    timeout: int = 300,
    reconnect_async: bool = False,
    reconnect_async_wait_sec: int = 5
):
    """
    Open a stream of direct messages.

    Args:
        listener: StreamListener whose callbacks receive the events
        run_async: If True, run the stream in a background thread
        timeout: Connection timeout, in seconds
        reconnect_async: If True, reconnect automatically after a failure
        reconnect_async_wait_sec: Seconds to wait between reconnection attempts
    """

Stream Health Monitoring

Check streaming API availability and health status.

def stream_healthy(self) -> bool:
    """
    Report whether the streaming API is available and healthy.

    Returns:
        True when streaming is available, False otherwise
    """

Usage Examples

Basic Stream Listener

from mastodon import Mastodon, StreamListener

class MyStreamListener(StreamListener):
    """Print a one-line summary for each event this example handles."""

    def on_update(self, status):
        # New post: show the author's handle followed by the post body.
        author = status['account']['acct']
        body = status['content']
        print(f"New post from {author}: {body}")

    def on_notification(self, notification):
        # Show what kind of notification it is and who triggered it.
        kind = notification['type']
        sender = notification['account']['acct']
        print(f"Notification: {kind} from {sender}")

    def on_delete(self, status_id):
        message = f"Status {status_id} was deleted"
        print(message)

    def on_abort(self, err):
        message = f"Stream error: {err}"
        print(message)

# Set up the client and listener
# ('your_token' and the instance URL are placeholders - substitute your own)
mastodon = Mastodon(
    access_token='your_token',
    api_base_url='https://mastodon.social'
)

listener = MyStreamListener()

# Start streaming user timeline
# (run_async is left at its default of False, so the stream is not moved
# to a background thread)
print("Starting user stream...")
mastodon.stream_user(listener)

Callback-Based Streaming

from mastodon import Mastodon, CallbackStreamListener

def handle_update(status):
    """Print the author's handle and the first 50 characters of a new post."""
    author = status['account']['acct']
    preview = status['content'][:50]
    print(f"📝 {author}: {preview}...")

def handle_notification(notification):
    """Print the notification type and the handle of the account behind it."""
    sender, kind = notification['account']['acct'], notification['type']
    print(f"🔔 {kind} from {sender}")

def handle_error(err):
    """Print a stream failure."""
    message = f"❌ Stream error: {err}"
    print(message)

# CallbackStreamListener takes its callbacks as ``<event>_handler`` keywords
# (``on_*`` keywords raise TypeError) and has no keyword for aborts, so
# on_abort is overridden in a small subclass instead.
class AbortReportingListener(CallbackStreamListener):
    def on_abort(self, err):
        handle_error(err)

# Create callback listener
listener = AbortReportingListener(
    update_handler=handle_update,
    notification_handler=handle_notification
)

mastodon = Mastodon(
    access_token='your_token',
    api_base_url='https://mastodon.social'
)

# Stream with callbacks
mastodon.stream_user(listener)

Asynchronous Streaming

import threading
import time
from mastodon import Mastodon, StreamListener

class AsyncStreamListener(StreamListener):
    """Count incoming posts and report disconnects while streaming in the background."""

    def __init__(self):
        # Let the base class initialise itself before adding our own state.
        super().__init__()
        self.running = True      # cleared by the main thread on shutdown
        self.message_count = 0   # number of updates seen so far

    def on_update(self, status):
        self.message_count += 1
        print(f"Message #{self.message_count}: {status['account']['acct']}")

    def on_abort(self, err):
        print(f"Stream disconnected: {err}")
        # Only announce a reconnect while the application still wants the stream.
        if self.running:
            print("Attempting to reconnect...")

mastodon = Mastodon(
    access_token='your_token',
    api_base_url='https://mastodon.social'
)

listener = AsyncStreamListener()

# Start stream in background with auto-reconnect.
# With run_async=True the call returns a handle for the background
# connection; keep it so the stream can actually be closed later.
print("Starting async stream with auto-reconnect...")
stream_handle = mastodon.stream_user(
    listener,
    run_async=True,
    reconnect_async=True,
    reconnect_async_wait_sec=10
)

# Do other work while streaming runs in background
try:
    while True:
        print("Main thread doing other work...")
        time.sleep(30)
except KeyboardInterrupt:
    listener.running = False
    print("Stopping stream...")
    # Clearing the flag alone does not end the connection; close the handle.
    stream_handle.close()

Hashtag and List Streaming

from mastodon import Mastodon, StreamListener

class HashtagListener(StreamListener):
    """Print original (non-reblog) posts received on a hashtag stream."""

    def __init__(self, hashtag):
        """
        Args:
            hashtag: Tag name used to label each printed line
        """
        # Let the base class initialise itself before adding our own state.
        super().__init__()
        self.hashtag = hashtag

    def on_update(self, status):
        # Filter out reblogs for cleaner output
        if status.get('reblog') is None:
            print(f"#{self.hashtag}: {status['account']['acct']} - {status['content'][:100]}...")

# 'your_token' and the instance URL are placeholders - substitute your own
mastodon = Mastodon(
    access_token='your_token',
    api_base_url='https://mastodon.social'
)

# Stream a specific hashtag
# (the tag is passed without the leading '#'; local=False is the default
# and is spelled out here only for clarity)
hashtag_listener = HashtagListener("python")
print("Streaming #python hashtag...")
mastodon.stream_hashtag("python", hashtag_listener, local=False)

# Alternative: Stream from a list
# Get your lists first
# lists = mastodon.lists()
# if lists:
#     list_listener = StreamListener()
#     mastodon.stream_list(lists[0]['id'], list_listener)

Multi-Stream Manager

import threading
import time
from mastodon import Mastodon, StreamListener

class MultiStreamManager:
    """Run several blocking Mastodon streams side by side, one daemon thread each."""

    def __init__(self, mastodon_client):
        self.mastodon = mastodon_client
        self.streams = []      # threads started so far
        self.running = False   # becomes True once the first stream is started

    def _spawn(self, stream_method, *args):
        """Run stream_method(*args) on a daemon thread and keep track of it."""
        thread = threading.Thread(
            target=stream_method,
            args=args,
            # The stream blocks inside our own thread, so the library must
            # not start a second background thread of its own.
            kwargs={'run_async': False},
            # Daemon threads do not keep the process alive at shutdown.
            daemon=True
        )
        self.streams.append(thread)
        self.running = True
        thread.start()

    def start_user_stream(self):
        """Start streaming the authenticated user's events."""
        self._spawn(self.mastodon.stream_user, self.UserStreamListener())

    def start_hashtag_stream(self, hashtag):
        """Start streaming posts for one hashtag (given without '#')."""
        listener = self.HashtagStreamListener(hashtag)
        self._spawn(self.mastodon.stream_hashtag, hashtag, listener)

    class UserStreamListener(StreamListener):
        def on_notification(self, notification):
            print(f"🔔 {notification['type']}: {notification['account']['acct']}")

    class HashtagStreamListener(StreamListener):
        def __init__(self, hashtag):
            super().__init__()
            self.hashtag = hashtag

        def on_update(self, status):
            print(f"#{self.hashtag}: New post from {status['account']['acct']}")

# Usage
mastodon = Mastodon(
    access_token='your_token',
    api_base_url='https://mastodon.social'
)

manager = MultiStreamManager(mastodon)
manager.start_user_stream()
manager.start_hashtag_stream("opensource")
manager.start_hashtag_stream("python")

print("Multiple streams running...")
try:
    # Keep the main thread alive; the daemon stream threads end with it.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Shutting down streams...")

Stream Health Monitoring

from mastodon import Mastodon, StreamListener
import time

class ReliableStreamListener(StreamListener):
    """
    Listener that keeps a user stream alive, reconnecting once the
    streaming API reports healthy again.

    Retrying is done with loops, not recursion, so a long outage cannot
    exhaust the call stack.
    """

    def __init__(self, mastodon_client):
        """
        Args:
            mastodon_client: Client used to open the stream and check its health
        """
        super().__init__()
        self.mastodon = mastodon_client
        self.last_message = time.time()  # time of the most recent event

    def on_update(self, status):
        self.last_message = time.time()
        print(f"Update: {status['account']['acct']}")

    def on_notification(self, notification):
        self.last_message = time.time()
        print(f"Notification: {notification['type']}")

    def on_abort(self, err):
        # Only report here. This runs inside the failing stream call, so
        # reconnecting from here would nest a new blocking stream in the old
        # one; start_stream() owns the retry loop.
        print(f"Stream error: {err}")

    def _wait_until_healthy(self):
        """Block until the streaming API reports healthy, then pause briefly."""
        while not self.mastodon.stream_healthy():
            print("Streaming API is unhealthy, waiting...")
            time.sleep(30)
        print("Streaming API is healthy, reconnecting...")
        time.sleep(5)

    def reconnect_if_needed(self):
        """Wait for the streaming API to become healthy, then restart the stream."""
        self._wait_until_healthy()
        self.start_stream()

    def start_stream(self):
        """
        Run the user stream, reconnecting after every failure.

        Returns only when the stream ends without raising.
        """
        while True:
            try:
                self.mastodon.stream_user(self)
                return
            except Exception as e:
                print(f"Failed to start stream: {e}")
            self._wait_until_healthy()

# Usage with health monitoring
# ('your_token' and the instance URL are placeholders - substitute your own)
mastodon = Mastodon(
    access_token='your_token',
    api_base_url='https://mastodon.social'
)

# Check if streaming is available
# (the listener is only created and started when the health check passes)
if mastodon.stream_healthy():
    print("Starting reliable stream...")
    listener = ReliableStreamListener(mastodon)
    listener.start_stream()
else:
    print("Streaming API is currently unavailable")

Types

# Values of the 'type' field on streamed notifications
NOTIFICATION_TYPES = [
    'mention',         # You were mentioned in a status
    'status',          # An account you follow posted
    'reblog',          # One of your statuses was reblogged
    'follow',          # An account followed you
    'follow_request',  # An account asked to follow you
    'favourite',       # One of your statuses was favourited
    'poll',            # A poll you voted in or created has ended
    'update',          # A status you interacted with was edited
    'admin.sign_up',   # A new user signed up (admin only)
    'admin.report',    # A new report was submitted (admin only)
]

# Event names delivered by the streaming API
STREAM_EVENTS = [
    'update',                 # A new status was posted
    'delete',                 # A status was deleted
    'notification',           # A new notification arrived
    'filters_changed',        # Content filters were updated
    'conversation',           # Direct message conversation
    'announcement',           # Instance announcement
    'announcement_reaction',  # Reaction to an announcement
    'announcement_delete',    # An announcement was deleted
    'status_update',          # A status was edited
    'encrypted_message',      # Encrypted message (unused)
]

# Payload type handed to the listener for each event name.
# (Assigned from outside the class body, so the leading double underscore
# is not name-mangled here.)
StreamListener.__EVENT_NAME_TO_TYPE = {
    "update": dict,                 # status object
    "delete": int,                  # ID of the deleted status
    "notification": dict,           # notification object
    "filters_changed": None,        # event has no payload
    "conversation": dict,           # conversation object
    "announcement": dict,           # announcement object
    "announcement_reaction": dict,  # reaction object
    "announcement_delete": int,     # ID of the deleted announcement
    "status_update": dict,          # status object after the edit
    "encrypted_message": dict,      # encrypted data
}

Install with Tessl CLI

npx tessl i tessl/pypi-mastodon-py

docs

accounts.md

authentication.md

index.md

search.md

statuses.md

streaming.md

tile.json