Python wrapper for the Mastodon API providing comprehensive access to social media functionality
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
WebSocket-based streaming for real-time updates from timelines, notifications, and user events. Enables applications to receive live updates without polling, providing immediate notification of new posts, mentions, and other activities.
Base classes for handling streaming events with customizable callback methods.
class StreamListener:
"""
Base class for handling streaming events.
Override methods for events you want to handle.
"""
def on_update(self, status: dict):
"""
Handle new status updates.
Args:
status: Status dictionary for new post
"""
pass
def on_notification(self, notification: dict):
"""
Handle new notifications.
Args:
notification: Notification dictionary
"""
pass
def on_delete(self, status_id: int):
"""
Handle status deletions.
Args:
status_id: ID of deleted status
"""
pass
def on_filters_changed(self):
"""
Handle filter updates (user changed content filters).
No payload - refetch filters if needed.
"""
pass
def on_conversation(self, conversation: dict):
"""
Handle direct message conversations.
Args:
conversation: Conversation dictionary
"""
pass
def on_announcement(self, announcement: dict):
"""
Handle instance announcements.
Args:
announcement: Announcement dictionary
"""
pass
def on_announcement_reaction(self, reaction: dict):
"""
Handle reactions to announcements.
Args:
reaction: Reaction dictionary
"""
pass
def on_announcement_delete(self, announcement_id: int):
"""
Handle announcement deletions.
Args:
announcement_id: ID of deleted announcement
"""
pass
def on_status_update(self, status: dict):
"""
Handle status edits.
Args:
status: Updated status dictionary
"""
pass
def on_encrypted_message(self, data: dict):
"""
Handle encrypted messages (currently unused).
Args:
data: Encrypted message data
"""
pass
def on_abort(self, err: Exception):
"""
Handle connection errors and stream failures.
Args:
err: Exception that caused the abort
"""
pass
def on_unknown_event(self, name: str, unknown_event: dict = None):
"""
Handle unknown event types.
Args:
name: Event name
unknown_event: Raw event data
"""
pass
class CallbackStreamListener(StreamListener):
"""
Stream listener that uses callback functions instead of inheritance.
"""
def __init__(self, **callbacks):
"""
Initialize with callback functions.
Args:
**callbacks: Callback functions (on_update=func, on_notification=func, etc.)
"""
passConnect to various streaming endpoints for different types of real-time data.
def stream_user(
self,
listener: StreamListener,
run_async: bool = False,
timeout: int = 300,
reconnect_async: bool = False,
reconnect_async_wait_sec: int = 5
):
"""
Stream the authenticated user's timeline and notifications.
Args:
listener: StreamListener instance to handle events
run_async: Run stream in background thread
timeout: Connection timeout in seconds
reconnect_async: Automatically reconnect on failure
reconnect_async_wait_sec: Wait time between reconnection attempts
"""
def stream_public(
self,
listener: StreamListener,
run_async: bool = False,
timeout: int = 300,
reconnect_async: bool = False,
reconnect_async_wait_sec: int = 5
):
"""
Stream the public timeline.
Args:
listener: StreamListener instance to handle events
run_async: Run stream in background thread
timeout: Connection timeout in seconds
reconnect_async: Automatically reconnect on failure
reconnect_async_wait_sec: Wait time between reconnection attempts
"""
def stream_local(
self,
listener: StreamListener,
run_async: bool = False,
timeout: int = 300,
reconnect_async: bool = False,
reconnect_async_wait_sec: int = 5
):
"""
Stream the local instance timeline (deprecated).
Args:
listener: StreamListener instance to handle events
run_async: Run stream in background thread
timeout: Connection timeout in seconds
reconnect_async: Automatically reconnect on failure
reconnect_async_wait_sec: Wait time between reconnection attempts
"""
def stream_hashtag(
self,
tag: str,
listener: StreamListener,
local: bool = False,
run_async: bool = False,
timeout: int = 300,
reconnect_async: bool = False,
reconnect_async_wait_sec: int = 5
):
"""
Stream posts containing a specific hashtag.
Args:
tag: Hashtag to stream (without # symbol)
listener: StreamListener instance to handle events
local: Only stream from local instance
run_async: Run stream in background thread
timeout: Connection timeout in seconds
reconnect_async: Automatically reconnect on failure
reconnect_async_wait_sec: Wait time between reconnection attempts
"""
def stream_list(
self,
id: int,
listener: StreamListener,
run_async: bool = False,
timeout: int = 300,
reconnect_async: bool = False,
reconnect_async_wait_sec: int = 5
):
"""
Stream posts from a specific list.
Args:
id: List ID to stream
listener: StreamListener instance to handle events
run_async: Run stream in background thread
timeout: Connection timeout in seconds
reconnect_async: Automatically reconnect on failure
reconnect_async_wait_sec: Wait time between reconnection attempts
"""
def stream_direct(
self,
listener: StreamListener,
run_async: bool = False,
timeout: int = 300,
reconnect_async: bool = False,
reconnect_async_wait_sec: int = 5
):
"""
Stream direct messages.
Args:
listener: StreamListener instance to handle events
run_async: Run stream in background thread
timeout: Connection timeout in seconds
reconnect_async: Automatically reconnect on failure
reconnect_async_wait_sec: Wait time between reconnection attempts
"""Check streaming API availability and health status.
def stream_healthy(self) -> bool:
"""
Check if the streaming API is available and healthy.
Returns:
True if streaming is available, False otherwise
"""from mastodon import Mastodon, StreamListener
class MyStreamListener(StreamListener):
def on_update(self, status):
print(f"New post from {status['account']['acct']}: {status['content']}")
def on_notification(self, notification):
print(f"Notification: {notification['type']} from {notification['account']['acct']}")
def on_delete(self, status_id):
print(f"Status {status_id} was deleted")
def on_abort(self, err):
print(f"Stream error: {err}")
# Set up the client and listener
mastodon = Mastodon(
access_token='your_token',
api_base_url='https://mastodon.social'
)
listener = MyStreamListener()
# Start streaming user timeline
print("Starting user stream...")
mastodon.stream_user(listener)from mastodon import Mastodon, CallbackStreamListener
def handle_update(status):
print(f"📝 {status['account']['acct']}: {status['content'][:50]}...")
def handle_notification(notification):
account = notification['account']['acct']
notif_type = notification['type']
print(f"🔔 {notif_type} from {account}")
def handle_error(err):
print(f"❌ Stream error: {err}")
# Create callback listener
listener = CallbackStreamListener(
on_update=handle_update,
on_notification=handle_notification,
on_abort=handle_error
)
mastodon = Mastodon(
access_token='your_token',
api_base_url='https://mastodon.social'
)
# Stream with callbacks
mastodon.stream_user(listener)import threading
import time
from mastodon import Mastodon, StreamListener
class AsyncStreamListener(StreamListener):
def __init__(self):
self.running = True
self.message_count = 0
def on_update(self, status):
self.message_count += 1
print(f"Message #{self.message_count}: {status['account']['acct']}")
def on_abort(self, err):
print(f"Stream disconnected: {err}")
if self.running:
print("Attempting to reconnect...")
mastodon = Mastodon(
access_token='your_token',
api_base_url='https://mastodon.social'
)
listener = AsyncStreamListener()
# Start stream in background with auto-reconnect
print("Starting async stream with auto-reconnect...")
mastodon.stream_user(
listener,
run_async=True,
reconnect_async=True,
reconnect_async_wait_sec=10
)
# Do other work while streaming runs in background
try:
while True:
print("Main thread doing other work...")
time.sleep(30)
except KeyboardInterrupt:
listener.running = False
print("Stopping stream...")from mastodon import Mastodon, StreamListener
class HashtagListener(StreamListener):
def __init__(self, hashtag):
self.hashtag = hashtag
def on_update(self, status):
# Filter out reblogs for cleaner output
if status.get('reblog') is None:
print(f"#{self.hashtag}: {status['account']['acct']} - {status['content'][:100]}...")
mastodon = Mastodon(
access_token='your_token',
api_base_url='https://mastodon.social'
)
# Stream a specific hashtag
hashtag_listener = HashtagListener("python")
print("Streaming #python hashtag...")
mastodon.stream_hashtag("python", hashtag_listener, local=False)
# Alternative: Stream from a list
# Get your lists first
# lists = mastodon.lists()
# if lists:
# list_listener = StreamListener()
# mastodon.stream_list(lists[0]['id'], list_listener)import threading
from mastodon import Mastodon, StreamListener
class MultiStreamManager:
def __init__(self, mastodon_client):
self.mastodon = mastodon_client
self.streams = []
self.running = False
def start_user_stream(self):
listener = self.UserStreamListener()
thread = threading.Thread(
target=self.mastodon.stream_user,
args=(listener,),
kwargs={'run_async': False}
)
thread.daemon = True
self.streams.append(thread)
thread.start()
def start_hashtag_stream(self, hashtag):
listener = self.HashtagStreamListener(hashtag)
thread = threading.Thread(
target=self.mastodon.stream_hashtag,
args=(hashtag, listener),
kwargs={'run_async': False}
)
thread.daemon = True
self.streams.append(thread)
thread.start()
class UserStreamListener(StreamListener):
def on_notification(self, notification):
print(f"🔔 {notification['type']}: {notification['account']['acct']}")
class HashtagStreamListener(StreamListener):
def __init__(self, hashtag):
self.hashtag = hashtag
def on_update(self, status):
print(f"#{self.hashtag}: New post from {status['account']['acct']}")
# Usage
mastodon = Mastodon(
access_token='your_token',
api_base_url='https://mastodon.social'
)
manager = MultiStreamManager(mastodon)
manager.start_user_stream()
manager.start_hashtag_stream("opensource")
manager.start_hashtag_stream("python")
print("Multiple streams running...")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down streams...")from mastodon import Mastodon, StreamListener
import time
class ReliableStreamListener(StreamListener):
def __init__(self, mastodon_client):
self.mastodon = mastodon_client
self.last_message = time.time()
def on_update(self, status):
self.last_message = time.time()
print(f"Update: {status['account']['acct']}")
def on_notification(self, notification):
self.last_message = time.time()
print(f"Notification: {notification['type']}")
def on_abort(self, err):
print(f"Stream error: {err}")
self.reconnect_if_needed()
def reconnect_if_needed(self):
if self.mastodon.stream_healthy():
print("Streaming API is healthy, reconnecting...")
time.sleep(5)
self.start_stream()
else:
print("Streaming API is unhealthy, waiting...")
time.sleep(30)
self.reconnect_if_needed()
def start_stream(self):
try:
self.mastodon.stream_user(self)
except Exception as e:
print(f"Failed to start stream: {e}")
self.reconnect_if_needed()
# Usage with health monitoring
mastodon = Mastodon(
access_token='your_token',
api_base_url='https://mastodon.social'
)
# Check if streaming is available
if mastodon.stream_healthy():
print("Starting reliable stream...")
listener = ReliableStreamListener(mastodon)
listener.start_stream()
else:
print("Streaming API is currently unavailable")# Stream notification types
NOTIFICATION_TYPES = [
'mention', # Mentioned in a status
'status', # Someone you follow posted
'reblog', # Your status was reblogged
'follow', # Someone followed you
'follow_request', # Someone requested to follow you
'favourite', # Your status was favorited
'poll', # Poll you voted in or created has ended
'update', # Status you interacted with was edited
'admin.sign_up', # New user signed up (admin only)
'admin.report', # New report submitted (admin only)
]
# Stream event types
STREAM_EVENTS = [
'update', # New status
'delete', # Status deleted
'notification', # New notification
'filters_changed', # Content filters updated
'conversation', # Direct message
'announcement', # Instance announcement
'announcement_reaction', # Announcement reaction
'announcement_delete', # Announcement deleted
'status_update', # Status edited
'encrypted_message', # Encrypted message (unused)
]
# Stream listener event mapping
StreamListener.__EVENT_NAME_TO_TYPE = {
"update": dict, # Status object
"delete": int, # Status ID
"notification": dict, # Notification object
"filters_changed": None, # No payload
"conversation": dict, # Conversation object
"announcement": dict, # Announcement object
"announcement_reaction": dict, # Reaction object
"announcement_delete": int, # Announcement ID
"status_update": dict, # Updated status object
"encrypted_message": dict, # Encrypted data
}Install with Tessl CLI
npx tessl i tessl/pypi-mastodon-py