0
# Real-time Streaming
1
2
WebSocket-based streaming for real-time updates from timelines, notifications, and user events. Enables applications to receive live updates without polling, providing immediate notification of new posts, mentions, and other activities.
3
4
## Capabilities
5
6
### Stream Listeners
7
8
Base classes for handling streaming events with customizable callback methods.
9
10
```python { .api }
11
class StreamListener:
12
"""
13
Base class for handling streaming events.
14
Override methods for events you want to handle.
15
"""
16
17
def on_update(self, status: dict):
18
"""
19
Handle new status updates.
20
21
Args:
22
status: Status dictionary for new post
23
"""
24
pass
25
26
def on_notification(self, notification: dict):
27
"""
28
Handle new notifications.
29
30
Args:
31
notification: Notification dictionary
32
"""
33
pass
34
35
def on_delete(self, status_id: int):
36
"""
37
Handle status deletions.
38
39
Args:
40
status_id: ID of deleted status
41
"""
42
pass
43
44
def on_filters_changed(self):
45
"""
46
Handle filter updates (user changed content filters).
47
No payload - refetch filters if needed.
48
"""
49
pass
50
51
def on_conversation(self, conversation: dict):
52
"""
53
Handle direct message conversations.
54
55
Args:
56
conversation: Conversation dictionary
57
"""
58
pass
59
60
def on_announcement(self, announcement: dict):
61
"""
62
Handle instance announcements.
63
64
Args:
65
announcement: Announcement dictionary
66
"""
67
pass
68
69
def on_announcement_reaction(self, reaction: dict):
70
"""
71
Handle reactions to announcements.
72
73
Args:
74
reaction: Reaction dictionary
75
"""
76
pass
77
78
def on_announcement_delete(self, announcement_id: int):
79
"""
80
Handle announcement deletions.
81
82
Args:
83
announcement_id: ID of deleted announcement
84
"""
85
pass
86
87
def on_status_update(self, status: dict):
88
"""
89
Handle status edits.
90
91
Args:
92
status: Updated status dictionary
93
"""
94
pass
95
96
def on_encrypted_message(self, data: dict):
97
"""
98
Handle encrypted messages (currently unused).
99
100
Args:
101
data: Encrypted message data
102
"""
103
pass
104
105
def on_abort(self, err: Exception):
106
"""
107
Handle connection errors and stream failures.
108
109
Args:
110
err: Exception that caused the abort
111
"""
112
pass
113
114
def on_unknown_event(self, name: str, unknown_event: dict = None):
115
"""
116
Handle unknown event types.
117
118
Args:
119
name: Event name
120
unknown_event: Raw event data
121
"""
122
pass
123
124
class CallbackStreamListener(StreamListener):
125
"""
126
Stream listener that uses callback functions instead of inheritance.
127
"""
128
129
def __init__(self, **callbacks):
130
"""
131
Initialize with callback functions.
132
133
Args:
134
**callbacks: Callback functions (on_update=func, on_notification=func, etc.)
135
"""
136
pass
137
```
138
139
### Stream Endpoints
140
141
Connect to various streaming endpoints for different types of real-time data.
142
143
```python { .api }
144
def stream_user(
145
self,
146
listener: StreamListener,
147
run_async: bool = False,
148
timeout: int = 300,
149
reconnect_async: bool = False,
150
reconnect_async_wait_sec: int = 5
151
):
152
"""
153
Stream the authenticated user's timeline and notifications.
154
155
Args:
156
listener: StreamListener instance to handle events
157
run_async: Run stream in background thread
158
timeout: Connection timeout in seconds
159
reconnect_async: Automatically reconnect on failure
160
reconnect_async_wait_sec: Wait time between reconnection attempts
161
"""
162
163
def stream_public(
164
self,
165
listener: StreamListener,
166
run_async: bool = False,
167
timeout: int = 300,
168
reconnect_async: bool = False,
169
reconnect_async_wait_sec: int = 5
170
):
171
"""
172
Stream the public timeline.
173
174
Args:
175
listener: StreamListener instance to handle events
176
run_async: Run stream in background thread
177
timeout: Connection timeout in seconds
178
reconnect_async: Automatically reconnect on failure
179
reconnect_async_wait_sec: Wait time between reconnection attempts
180
"""
181
182
def stream_local(
183
self,
184
listener: StreamListener,
185
run_async: bool = False,
186
timeout: int = 300,
187
reconnect_async: bool = False,
188
reconnect_async_wait_sec: int = 5
189
):
190
"""
191
Stream the local instance timeline (deprecated).
192
193
Args:
194
listener: StreamListener instance to handle events
195
run_async: Run stream in background thread
196
timeout: Connection timeout in seconds
197
reconnect_async: Automatically reconnect on failure
198
reconnect_async_wait_sec: Wait time between reconnection attempts
199
"""
200
201
def stream_hashtag(
202
self,
203
tag: str,
204
listener: StreamListener,
205
local: bool = False,
206
run_async: bool = False,
207
timeout: int = 300,
208
reconnect_async: bool = False,
209
reconnect_async_wait_sec: int = 5
210
):
211
"""
212
Stream posts containing a specific hashtag.
213
214
Args:
215
tag: Hashtag to stream (without # symbol)
216
listener: StreamListener instance to handle events
217
local: Only stream from local instance
218
run_async: Run stream in background thread
219
timeout: Connection timeout in seconds
220
reconnect_async: Automatically reconnect on failure
221
reconnect_async_wait_sec: Wait time between reconnection attempts
222
"""
223
224
def stream_list(
225
self,
226
id: int,
227
listener: StreamListener,
228
run_async: bool = False,
229
timeout: int = 300,
230
reconnect_async: bool = False,
231
reconnect_async_wait_sec: int = 5
232
):
233
"""
234
Stream posts from a specific list.
235
236
Args:
237
id: List ID to stream
238
listener: StreamListener instance to handle events
239
run_async: Run stream in background thread
240
timeout: Connection timeout in seconds
241
reconnect_async: Automatically reconnect on failure
242
reconnect_async_wait_sec: Wait time between reconnection attempts
243
"""
244
245
def stream_direct(
246
self,
247
listener: StreamListener,
248
run_async: bool = False,
249
timeout: int = 300,
250
reconnect_async: bool = False,
251
reconnect_async_wait_sec: int = 5
252
):
253
"""
254
Stream direct messages.
255
256
Args:
257
listener: StreamListener instance to handle events
258
run_async: Run stream in background thread
259
timeout: Connection timeout in seconds
260
reconnect_async: Automatically reconnect on failure
261
reconnect_async_wait_sec: Wait time between reconnection attempts
262
"""
263
```
264
265
### Stream Health Monitoring
266
267
Check streaming API availability and health status.
268
269
```python { .api }
270
def stream_healthy(self) -> bool:
271
"""
272
Check if the streaming API is available and healthy.
273
274
Returns:
275
True if streaming is available, False otherwise
276
"""
277
```
278
279
## Usage Examples
280
281
### Basic Stream Listener
282
283
```python
284
from mastodon import Mastodon, StreamListener
285
286
class MyStreamListener(StreamListener):
287
def on_update(self, status):
288
print(f"New post from {status['account']['acct']}: {status['content']}")
289
290
def on_notification(self, notification):
291
print(f"Notification: {notification['type']} from {notification['account']['acct']}")
292
293
def on_delete(self, status_id):
294
print(f"Status {status_id} was deleted")
295
296
def on_abort(self, err):
297
print(f"Stream error: {err}")
298
299
# Set up the client and listener
300
mastodon = Mastodon(
301
access_token='your_token',
302
api_base_url='https://mastodon.social'
303
)
304
305
listener = MyStreamListener()
306
307
# Start streaming user timeline
308
print("Starting user stream...")
309
mastodon.stream_user(listener)
310
```
311
312
### Callback-Based Streaming
313
314
```python
315
from mastodon import Mastodon, CallbackStreamListener
316
317
def handle_update(status):
318
print(f"π {status['account']['acct']}: {status['content'][:50]}...")
319
320
def handle_notification(notification):
321
account = notification['account']['acct']
322
notif_type = notification['type']
323
print(f"π {notif_type} from {account}")
324
325
def handle_error(err):
326
print(f"β Stream error: {err}")
327
328
# Create callback listener
329
listener = CallbackStreamListener(
330
on_update=handle_update,
331
on_notification=handle_notification,
332
on_abort=handle_error
333
)
334
335
mastodon = Mastodon(
336
access_token='your_token',
337
api_base_url='https://mastodon.social'
338
)
339
340
# Stream with callbacks
341
mastodon.stream_user(listener)
342
```
343
344
### Asynchronous Streaming
345
346
```python
347
import threading
348
import time
349
from mastodon import Mastodon, StreamListener
350
351
class AsyncStreamListener(StreamListener):
352
def __init__(self):
353
self.running = True
354
self.message_count = 0
355
356
def on_update(self, status):
357
self.message_count += 1
358
print(f"Message #{self.message_count}: {status['account']['acct']}")
359
360
def on_abort(self, err):
361
print(f"Stream disconnected: {err}")
362
if self.running:
363
print("Attempting to reconnect...")
364
365
mastodon = Mastodon(
366
access_token='your_token',
367
api_base_url='https://mastodon.social'
368
)
369
370
listener = AsyncStreamListener()
371
372
# Start stream in background with auto-reconnect
373
print("Starting async stream with auto-reconnect...")
374
mastodon.stream_user(
375
listener,
376
run_async=True,
377
reconnect_async=True,
378
reconnect_async_wait_sec=10
379
)
380
381
# Do other work while streaming runs in background
382
try:
383
while True:
384
print("Main thread doing other work...")
385
time.sleep(30)
386
except KeyboardInterrupt:
387
listener.running = False
388
print("Stopping stream...")
389
```
390
391
### Hashtag and List Streaming
392
393
```python
394
from mastodon import Mastodon, StreamListener
395
396
class HashtagListener(StreamListener):
397
def __init__(self, hashtag):
398
self.hashtag = hashtag
399
400
def on_update(self, status):
401
# Filter out reblogs for cleaner output
402
if status.get('reblog') is None:
403
print(f"#{self.hashtag}: {status['account']['acct']} - {status['content'][:100]}...")
404
405
mastodon = Mastodon(
406
access_token='your_token',
407
api_base_url='https://mastodon.social'
408
)
409
410
# Stream a specific hashtag
411
hashtag_listener = HashtagListener("python")
412
print("Streaming #python hashtag...")
413
mastodon.stream_hashtag("python", hashtag_listener, local=False)
414
415
# Alternative: Stream from a list
416
# Get your lists first
417
# lists = mastodon.lists()
418
# if lists:
419
# list_listener = StreamListener()
420
# mastodon.stream_list(lists[0]['id'], list_listener)
421
```
422
423
### Multi-Stream Manager
424
425
```python
426
import threading
427
from mastodon import Mastodon, StreamListener
428
429
class MultiStreamManager:
430
def __init__(self, mastodon_client):
431
self.mastodon = mastodon_client
432
self.streams = []
433
self.running = False
434
435
def start_user_stream(self):
436
listener = self.UserStreamListener()
437
thread = threading.Thread(
438
target=self.mastodon.stream_user,
439
args=(listener,),
440
kwargs={'run_async': False}
441
)
442
thread.daemon = True
443
self.streams.append(thread)
444
thread.start()
445
446
def start_hashtag_stream(self, hashtag):
447
listener = self.HashtagStreamListener(hashtag)
448
thread = threading.Thread(
449
target=self.mastodon.stream_hashtag,
450
args=(hashtag, listener),
451
kwargs={'run_async': False}
452
)
453
thread.daemon = True
454
self.streams.append(thread)
455
thread.start()
456
457
class UserStreamListener(StreamListener):
458
def on_notification(self, notification):
459
print(f"π {notification['type']}: {notification['account']['acct']}")
460
461
class HashtagStreamListener(StreamListener):
462
def __init__(self, hashtag):
463
self.hashtag = hashtag
464
465
def on_update(self, status):
466
print(f"#{self.hashtag}: New post from {status['account']['acct']}")
467
468
# Usage
469
mastodon = Mastodon(
470
access_token='your_token',
471
api_base_url='https://mastodon.social'
472
)
473
474
manager = MultiStreamManager(mastodon)
475
manager.start_user_stream()
476
manager.start_hashtag_stream("opensource")
477
manager.start_hashtag_stream("python")
478
479
print("Multiple streams running...")
480
try:
481
while True:
482
time.sleep(1)
483
except KeyboardInterrupt:
484
print("Shutting down streams...")
485
```
486
487
### Stream Health Monitoring
488
489
```python
490
from mastodon import Mastodon, StreamListener
491
import time
492
493
class ReliableStreamListener(StreamListener):
494
def __init__(self, mastodon_client):
495
self.mastodon = mastodon_client
496
self.last_message = time.time()
497
498
def on_update(self, status):
499
self.last_message = time.time()
500
print(f"Update: {status['account']['acct']}")
501
502
def on_notification(self, notification):
503
self.last_message = time.time()
504
print(f"Notification: {notification['type']}")
505
506
def on_abort(self, err):
507
print(f"Stream error: {err}")
508
self.reconnect_if_needed()
509
510
def reconnect_if_needed(self):
511
if self.mastodon.stream_healthy():
512
print("Streaming API is healthy, reconnecting...")
513
time.sleep(5)
514
self.start_stream()
515
else:
516
print("Streaming API is unhealthy, waiting...")
517
time.sleep(30)
518
self.reconnect_if_needed()
519
520
def start_stream(self):
521
try:
522
self.mastodon.stream_user(self)
523
except Exception as e:
524
print(f"Failed to start stream: {e}")
525
self.reconnect_if_needed()
526
527
# Usage with health monitoring
528
mastodon = Mastodon(
529
access_token='your_token',
530
api_base_url='https://mastodon.social'
531
)
532
533
# Check if streaming is available
534
if mastodon.stream_healthy():
535
print("Starting reliable stream...")
536
listener = ReliableStreamListener(mastodon)
537
listener.start_stream()
538
else:
539
print("Streaming API is currently unavailable")
540
```
541
542
## Types
543
544
```python { .api }
545
# Stream notification types
546
NOTIFICATION_TYPES = [
547
'mention', # Mentioned in a status
548
'status', # Someone you follow posted
549
'reblog', # Your status was reblogged
550
'follow', # Someone followed you
551
'follow_request', # Someone requested to follow you
552
'favourite', # Your status was favorited
553
'poll', # Poll you voted in or created has ended
554
'update', # Status you interacted with was edited
555
'admin.sign_up', # New user signed up (admin only)
556
'admin.report', # New report submitted (admin only)
557
]
558
559
# Stream event types
560
STREAM_EVENTS = [
561
'update', # New status
562
'delete', # Status deleted
563
'notification', # New notification
564
'filters_changed', # Content filters updated
565
'conversation', # Direct message
566
'announcement', # Instance announcement
567
'announcement_reaction', # Announcement reaction
568
'announcement_delete', # Announcement deleted
569
'status_update', # Status edited
570
'encrypted_message', # Encrypted message (unused)
571
]
572
573
# Stream listener event mapping
574
StreamListener.__EVENT_NAME_TO_TYPE = {
575
"update": dict, # Status object
576
"delete": int, # Status ID
577
"notification": dict, # Notification object
578
"filters_changed": None, # No payload
579
"conversation": dict, # Conversation object
580
"announcement": dict, # Announcement object
581
"announcement_reaction": dict, # Reaction object
582
"announcement_delete": int, # Announcement ID
583
"status_update": dict, # Updated status object
584
"encrypted_message": dict, # Encrypted data
585
}
586
```