or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

accounts.md authentication.md index.md search.md statuses.md streaming.md

streaming.md docs/

0

# Real-time Streaming

1

2

HTTP-based streaming (long-lived connections to the Mastodon streaming API) for real-time updates from timelines, notifications, and user events. Enables applications to receive live updates without polling, providing immediate notification of new posts, mentions, and other activities.

3

4

## Capabilities

5

6

### Stream Listeners

7

8

Base classes for handling streaming events with customizable callback methods.

9

10

```python { .api }

11

class StreamListener:

12

"""

13

Base class for handling streaming events.

14

Override methods for events you want to handle.

15

"""

16

17

def on_update(self, status: dict):

18

"""

19

Handle new status updates.

20

21

Args:

22

status: Status dictionary for new post

23

"""

24

pass

25

26

def on_notification(self, notification: dict):

27

"""

28

Handle new notifications.

29

30

Args:

31

notification: Notification dictionary

32

"""

33

pass

34

35

def on_delete(self, status_id: int):

36

"""

37

Handle status deletions.

38

39

Args:

40

status_id: ID of deleted status

41

"""

42

pass

43

44

def on_filters_changed(self):

45

"""

46

Handle filter updates (user changed content filters).

47

No payload - refetch filters if needed.

48

"""

49

pass

50

51

def on_conversation(self, conversation: dict):

52

"""

53

Handle direct message conversations.

54

55

Args:

56

conversation: Conversation dictionary

57

"""

58

pass

59

60

def on_announcement(self, announcement: dict):

61

"""

62

Handle instance announcements.

63

64

Args:

65

announcement: Announcement dictionary

66

"""

67

pass

68

69

def on_announcement_reaction(self, reaction: dict):

70

"""

71

Handle reactions to announcements.

72

73

Args:

74

reaction: Reaction dictionary

75

"""

76

pass

77

78

def on_announcement_delete(self, announcement_id: int):

79

"""

80

Handle announcement deletions.

81

82

Args:

83

announcement_id: ID of deleted announcement

84

"""

85

pass

86

87

def on_status_update(self, status: dict):

88

"""

89

Handle status edits.

90

91

Args:

92

status: Updated status dictionary

93

"""

94

pass

95

96

def on_encrypted_message(self, data: dict):

97

"""

98

Handle encrypted messages (currently unused).

99

100

Args:

101

data: Encrypted message data

102

"""

103

pass

104

105

def on_abort(self, err: Exception):

106

"""

107

Handle connection errors and stream failures.

108

109

Args:

110

err: Exception that caused the abort

111

"""

112

pass

113

114

def on_unknown_event(self, name: str, unknown_event: dict = None):

115

"""

116

Handle unknown event types.

117

118

Args:

119

name: Event name

120

unknown_event: Raw event data

121

"""

122

pass

123

124

class CallbackStreamListener(StreamListener):

125

"""

126

Stream listener that uses callback functions instead of inheritance.

127

"""

128

129

def __init__(self, **callbacks):

130

"""

131

Initialize with callback functions.

132

133

Args:

134

**callbacks: Callback functions (on_update=func, on_notification=func, etc.)

135

"""

136

pass

137

```

138

139

### Stream Endpoints

140

141

Connect to various streaming endpoints for different types of real-time data.

142

143

```python { .api }

144

def stream_user(

145

self,

146

listener: StreamListener,

147

run_async: bool = False,

148

timeout: int = 300,

149

reconnect_async: bool = False,

150

reconnect_async_wait_sec: int = 5

151

):

152

"""

153

Stream the authenticated user's timeline and notifications.

154

155

Args:

156

listener: StreamListener instance to handle events

157

run_async: Run stream in background thread

158

timeout: Connection timeout in seconds

159

reconnect_async: Automatically reconnect on failure

160

reconnect_async_wait_sec: Wait time between reconnection attempts

161

"""

162

163

def stream_public(

164

self,

165

listener: StreamListener,

166

run_async: bool = False,

167

timeout: int = 300,

168

reconnect_async: bool = False,

169

reconnect_async_wait_sec: int = 5

170

):

171

"""

172

Stream the public timeline.

173

174

Args:

175

listener: StreamListener instance to handle events

176

run_async: Run stream in background thread

177

timeout: Connection timeout in seconds

178

reconnect_async: Automatically reconnect on failure

179

reconnect_async_wait_sec: Wait time between reconnection attempts

180

"""

181

182

def stream_local(

183

self,

184

listener: StreamListener,

185

run_async: bool = False,

186

timeout: int = 300,

187

reconnect_async: bool = False,

188

reconnect_async_wait_sec: int = 5

189

):

190

"""

191

Stream the local instance timeline (deprecated).

192

193

Args:

194

listener: StreamListener instance to handle events

195

run_async: Run stream in background thread

196

timeout: Connection timeout in seconds

197

reconnect_async: Automatically reconnect on failure

198

reconnect_async_wait_sec: Wait time between reconnection attempts

199

"""

200

201

def stream_hashtag(

202

self,

203

tag: str,

204

listener: StreamListener,

205

local: bool = False,

206

run_async: bool = False,

207

timeout: int = 300,

208

reconnect_async: bool = False,

209

reconnect_async_wait_sec: int = 5

210

):

211

"""

212

Stream posts containing a specific hashtag.

213

214

Args:

215

tag: Hashtag to stream (without # symbol)

216

listener: StreamListener instance to handle events

217

local: Only stream from local instance

218

run_async: Run stream in background thread

219

timeout: Connection timeout in seconds

220

reconnect_async: Automatically reconnect on failure

221

reconnect_async_wait_sec: Wait time between reconnection attempts

222

"""

223

224

def stream_list(

225

self,

226

id: int,

227

listener: StreamListener,

228

run_async: bool = False,

229

timeout: int = 300,

230

reconnect_async: bool = False,

231

reconnect_async_wait_sec: int = 5

232

):

233

"""

234

Stream posts from a specific list.

235

236

Args:

237

id: List ID to stream

238

listener: StreamListener instance to handle events

239

run_async: Run stream in background thread

240

timeout: Connection timeout in seconds

241

reconnect_async: Automatically reconnect on failure

242

reconnect_async_wait_sec: Wait time between reconnection attempts

243

"""

244

245

def stream_direct(

246

self,

247

listener: StreamListener,

248

run_async: bool = False,

249

timeout: int = 300,

250

reconnect_async: bool = False,

251

reconnect_async_wait_sec: int = 5

252

):

253

"""

254

Stream direct messages.

255

256

Args:

257

listener: StreamListener instance to handle events

258

run_async: Run stream in background thread

259

timeout: Connection timeout in seconds

260

reconnect_async: Automatically reconnect on failure

261

reconnect_async_wait_sec: Wait time between reconnection attempts

262

"""

263

```

264

265

### Stream Health Monitoring

266

267

Check streaming API availability and health status.

268

269

```python { .api }

270

def stream_healthy(self) -> bool:

271

"""

272

Check if the streaming API is available and healthy.

273

274

Returns:

275

True if streaming is available, False otherwise

276

"""

277

```

278

279

## Usage Examples

280

281

### Basic Stream Listener

282

283

```python

284

from mastodon import Mastodon, StreamListener

285

286

class MyStreamListener(StreamListener):

287

def on_update(self, status):

288

print(f"New post from {status['account']['acct']}: {status['content']}")

289

290

def on_notification(self, notification):

291

print(f"Notification: {notification['type']} from {notification['account']['acct']}")

292

293

def on_delete(self, status_id):

294

print(f"Status {status_id} was deleted")

295

296

def on_abort(self, err):

297

print(f"Stream error: {err}")

298

299

# Set up the client and listener

300

mastodon = Mastodon(

301

access_token='your_token',

302

api_base_url='https://mastodon.social'

303

)

304

305

listener = MyStreamListener()

306

307

# Start streaming user timeline

308

print("Starting user stream...")

309

mastodon.stream_user(listener)

310

```

311

312

### Callback-Based Streaming

313

314

```python

315

from mastodon import Mastodon, CallbackStreamListener

316

317

def handle_update(status):

318

print(f"📝 {status['account']['acct']}: {status['content'][:50]}...")

319

320

def handle_notification(notification):

321

account = notification['account']['acct']

322

notif_type = notification['type']

323

print(f"🔔 {notif_type} from {account}")

324

325

def handle_error(err):

326

print(f"❌ Stream error: {err}")

327

328

# Create callback listener

329

listener = CallbackStreamListener(

330

on_update=handle_update,

331

on_notification=handle_notification,

332

on_abort=handle_error

333

)

334

335

mastodon = Mastodon(

336

access_token='your_token',

337

api_base_url='https://mastodon.social'

338

)

339

340

# Stream with callbacks

341

mastodon.stream_user(listener)

342

```

343

344

### Asynchronous Streaming

345

346

```python

347

import threading

348

import time

349

from mastodon import Mastodon, StreamListener

350

351

class AsyncStreamListener(StreamListener):

352

def __init__(self):

353

self.running = True

354

self.message_count = 0

355

356

def on_update(self, status):

357

self.message_count += 1

358

print(f"Message #{self.message_count}: {status['account']['acct']}")

359

360

def on_abort(self, err):

361

print(f"Stream disconnected: {err}")

362

if self.running:

363

print("Attempting to reconnect...")

364

365

mastodon = Mastodon(

366

access_token='your_token',

367

api_base_url='https://mastodon.social'

368

)

369

370

listener = AsyncStreamListener()

371

372

# Start stream in background with auto-reconnect

373

print("Starting async stream with auto-reconnect...")

374

mastodon.stream_user(

375

listener,

376

run_async=True,

377

reconnect_async=True,

378

reconnect_async_wait_sec=10

379

)

380

381

# Do other work while streaming runs in background

382

try:

383

while True:

384

print("Main thread doing other work...")

385

time.sleep(30)

386

except KeyboardInterrupt:

387

listener.running = False

388

print("Stopping stream...")

389

```

390

391

### Hashtag and List Streaming

392

393

```python

394

from mastodon import Mastodon, StreamListener

395

396

class HashtagListener(StreamListener):

397

def __init__(self, hashtag):

398

self.hashtag = hashtag

399

400

def on_update(self, status):

401

# Filter out reblogs for cleaner output

402

if status.get('reblog') is None:

403

print(f"#{self.hashtag}: {status['account']['acct']} - {status['content'][:100]}...")

404

405

mastodon = Mastodon(

406

access_token='your_token',

407

api_base_url='https://mastodon.social'

408

)

409

410

# Stream a specific hashtag

411

hashtag_listener = HashtagListener("python")

412

print("Streaming #python hashtag...")

413

mastodon.stream_hashtag("python", hashtag_listener, local=False)

414

415

# Alternative: Stream from a list

416

# Get your lists first

417

# lists = mastodon.lists()

418

# if lists:

419

# list_listener = StreamListener()

420

# mastodon.stream_list(lists[0]['id'], list_listener)

421

```

422

423

### Multi-Stream Manager

424

425

```python

426

import threading
import time

427

from mastodon import Mastodon, StreamListener

428

429

class MultiStreamManager:

430

def __init__(self, mastodon_client):

431

self.mastodon = mastodon_client

432

self.streams = []

433

self.running = False

434

435

def start_user_stream(self):

436

listener = self.UserStreamListener()

437

thread = threading.Thread(

438

target=self.mastodon.stream_user,

439

args=(listener,),

440

kwargs={'run_async': False}

441

)

442

thread.daemon = True

443

self.streams.append(thread)

444

thread.start()

445

446

def start_hashtag_stream(self, hashtag):

447

listener = self.HashtagStreamListener(hashtag)

448

thread = threading.Thread(

449

target=self.mastodon.stream_hashtag,

450

args=(hashtag, listener),

451

kwargs={'run_async': False}

452

)

453

thread.daemon = True

454

self.streams.append(thread)

455

thread.start()

456

457

class UserStreamListener(StreamListener):

458

def on_notification(self, notification):

459

print(f"🔔 {notification['type']}: {notification['account']['acct']}")

460

461

class HashtagStreamListener(StreamListener):

462

def __init__(self, hashtag):

463

self.hashtag = hashtag

464

465

def on_update(self, status):

466

print(f"#{self.hashtag}: New post from {status['account']['acct']}")

467

468

# Usage

469

mastodon = Mastodon(

470

access_token='your_token',

471

api_base_url='https://mastodon.social'

472

)

473

474

manager = MultiStreamManager(mastodon)

475

manager.start_user_stream()

476

manager.start_hashtag_stream("opensource")

477

manager.start_hashtag_stream("python")

478

479

print("Multiple streams running...")

480

try:

481

while True:

482

time.sleep(1)

483

except KeyboardInterrupt:

484

print("Shutting down streams...")

485

```

486

487

### Stream Health Monitoring

488

489

```python

490

from mastodon import Mastodon, StreamListener

491

import time

492

493

class ReliableStreamListener(StreamListener):

494

def __init__(self, mastodon_client):

495

self.mastodon = mastodon_client

496

self.last_message = time.time()

497

498

def on_update(self, status):

499

self.last_message = time.time()

500

print(f"Update: {status['account']['acct']}")

501

502

def on_notification(self, notification):

503

self.last_message = time.time()

504

print(f"Notification: {notification['type']}")

505

506

def on_abort(self, err):

507

print(f"Stream error: {err}")

508

self.reconnect_if_needed()

509

510

def reconnect_if_needed(self):

511

if self.mastodon.stream_healthy():

512

print("Streaming API is healthy, reconnecting...")

513

time.sleep(5)

514

self.start_stream()

515

else:

516

print("Streaming API is unhealthy, waiting...")

517

time.sleep(30)

518

self.reconnect_if_needed()

519

520

def start_stream(self):

521

try:

522

self.mastodon.stream_user(self)

523

except Exception as e:

524

print(f"Failed to start stream: {e}")

525

self.reconnect_if_needed()

526

527

# Usage with health monitoring

528

mastodon = Mastodon(

529

access_token='your_token',

530

api_base_url='https://mastodon.social'

531

)

532

533

# Check if streaming is available

534

if mastodon.stream_healthy():

535

print("Starting reliable stream...")

536

listener = ReliableStreamListener(mastodon)

537

listener.start_stream()

538

else:

539

print("Streaming API is currently unavailable")

540

```

541

542

## Types

543

544

```python { .api }

545

# Stream notification types

546

NOTIFICATION_TYPES = [

547

'mention', # Mentioned in a status

548

'status', # Someone you follow posted

549

'reblog', # Your status was reblogged

550

'follow', # Someone followed you

551

'follow_request', # Someone requested to follow you

552

'favourite', # Your status was favorited

553

'poll', # Poll you voted in or created has ended

554

'update', # Status you interacted with was edited

555

'admin.sign_up', # New user signed up (admin only)

556

'admin.report', # New report submitted (admin only)

557

]

558

559

# Stream event types

560

STREAM_EVENTS = [

561

'update', # New status

562

'delete', # Status deleted

563

'notification', # New notification

564

'filters_changed', # Content filters updated

565

'conversation', # Direct message

566

'announcement', # Instance announcement

567

'announcement_reaction', # Announcement reaction

568

'announcement_delete', # Announcement deleted

569

'status_update', # Status edited

570

'encrypted_message', # Encrypted message (unused)

571

]

572

573

# Stream listener event mapping

574

StreamListener.__EVENT_NAME_TO_TYPE = {

575

"update": dict, # Status object

576

"delete": int, # Status ID

577

"notification": dict, # Notification object

578

"filters_changed": None, # No payload

579

"conversation": dict, # Conversation object

580

"announcement": dict, # Announcement object

581

"announcement_reaction": dict, # Reaction object

582

"announcement_delete": int, # Announcement ID

583

"status_update": dict, # Updated status object

584

"encrypted_message": dict, # Encrypted data

585

}

586

```