0
# Real-time Streaming
1
2
Firehose clients for consuming live AT Protocol data streams including repository updates and labeling events. The streaming functionality provides both synchronous and asynchronous interfaces for processing real-time data from the AT Protocol network.
3
4
## Capabilities
5
6
### Repository Streaming
7
8
Stream real-time repository updates including posts, follows, likes, and other record operations from the AT Protocol network.
9
10
#### Synchronous Repository Client
11
12
```python { .api }
13
class FirehoseSubscribeReposClient:
14
"""
15
Synchronous client for repository event streaming.
16
17
Provides real-time access to repository operations across the AT Protocol network.
18
"""
19
def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
20
"""
21
Initialize the repository streaming client.
22
23
Args:
24
base_url (str): WebSocket endpoint for firehose (default: Bluesky firehose)
25
*args, **kwargs: Additional client configuration
26
"""
27
28
def start(self, cursor: Optional[int] = None) -> Iterator['SubscribeReposMessage']:
29
"""
30
Start streaming repository events.
31
32
Args:
33
cursor (int, optional): Start from specific sequence number
34
35
Yields:
36
SubscribeReposMessage: Repository event messages
37
38
Raises:
39
ConnectionError: If connection fails
40
StreamError: If stream processing fails
41
"""
42
43
def stop(self):
44
"""Stop the streaming connection."""
45
46
def get_cursor(self) -> Optional[int]:
47
"""
48
Get current stream cursor position.
49
50
Returns:
51
Optional[int]: Current cursor or None if not started
52
"""
53
```
54
55
#### Asynchronous Repository Client
56
57
```python { .api }
58
class AsyncFirehoseSubscribeReposClient:
59
"""
60
Asynchronous client for repository event streaming.
61
62
Async version of repository firehose client for non-blocking operations.
63
"""
64
def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
65
"""
66
Initialize the async repository streaming client.
67
68
Args:
69
base_url (str): WebSocket endpoint for firehose
70
*args, **kwargs: Additional client configuration
71
"""
72
73
async def start(self, cursor: Optional[int] = None) -> AsyncIterator['SubscribeReposMessage']:
74
"""
75
Start streaming repository events asynchronously.
76
77
Args:
78
cursor (int, optional): Start from specific sequence number
79
80
Yields:
81
SubscribeReposMessage: Repository event messages
82
"""
83
84
async def stop(self):
85
"""Stop the streaming connection asynchronously."""
86
87
async def close(self):
88
"""Close the async client connection."""
89
```
90
91
Usage examples:
92
93
```python
94
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message
95
96
# Synchronous streaming
97
client = FirehoseSubscribeReposClient()
98
99
try:
100
for message in client.start():
101
# Parse the message
102
parsed = parse_subscribe_repos_message(message)
103
104
# Handle different message types
105
if parsed.commit:
106
print(f"Repository update: {parsed.commit.repo}")
107
for op in parsed.commit.ops:
108
print(f" Operation: {op.action} {op.path}")
109
elif parsed.handle:
110
print(f"Handle update: {parsed.handle.did} -> {parsed.handle.handle}")
111
elif parsed.info:
112
print(f"Stream info: {parsed.info.name}")
113
elif parsed.error:
114
print(f"Stream error: {parsed.error.error} - {parsed.error.message}")
115
116
except KeyboardInterrupt:
117
print("Stopping stream...")
118
finally:
119
client.stop()
120
```
121
122
```python
123
import asyncio
124
from atproto import AsyncFirehoseSubscribeReposClient, parse_subscribe_repos_message
125
126
async def stream_repos():
127
client = AsyncFirehoseSubscribeReposClient()
128
129
try:
130
async for message in client.start():
131
parsed = parse_subscribe_repos_message(message)
132
133
if parsed.commit and parsed.commit.ops:
134
for op in parsed.commit.ops:
135
if op.action == 'create' and 'app.bsky.feed.post' in op.path:
136
print(f"New post created: {op.path}")
137
138
except Exception as e:
139
print(f"Stream error: {e}")
140
finally:
141
await client.close()
142
143
# Run the async stream
144
asyncio.run(stream_repos())
145
```
146
147
### Label Streaming
148
149
Stream real-time labeling events for content moderation and filtering across the AT Protocol network.
150
151
#### Synchronous Labels Client
152
153
```python { .api }
154
class FirehoseSubscribeLabelsClient:
155
"""
156
Synchronous client for label event streaming.
157
158
Provides real-time access to labeling events for content moderation.
159
"""
160
def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
161
"""
162
Initialize the labels streaming client.
163
164
Args:
165
base_url (str): WebSocket endpoint for label firehose
166
*args, **kwargs: Additional client configuration
167
"""
168
169
def start(self, cursor: Optional[int] = None) -> Iterator['SubscribeLabelsMessage']:
170
"""
171
Start streaming label events.
172
173
Args:
174
cursor (int, optional): Start from specific sequence number
175
176
Yields:
177
SubscribeLabelsMessage: Label event messages
178
"""
179
180
def stop(self):
181
"""Stop the streaming connection."""
182
```
183
184
#### Asynchronous Labels Client
185
186
```python { .api }
187
class AsyncFirehoseSubscribeLabelsClient:
188
"""
189
Asynchronous client for label event streaming.
190
191
Async version of labels firehose client.
192
"""
193
def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
194
"""
195
Initialize the async labels streaming client.
196
197
Args:
198
base_url (str): WebSocket endpoint for label firehose
199
*args, **kwargs: Additional client configuration
200
"""
201
202
async def start(self, cursor: Optional[int] = None) -> AsyncIterator['SubscribeLabelsMessage']:
203
"""
204
Start streaming label events asynchronously.
205
206
Args:
207
cursor (int, optional): Start from specific sequence number
208
209
Yields:
210
SubscribeLabelsMessage: Label event messages
211
"""
212
213
async def stop(self):
214
"""Stop the streaming connection asynchronously."""
215
216
async def close(self):
217
"""Close the async client connection."""
218
```
219
220
Usage example:
221
222
```python
223
from atproto import FirehoseSubscribeLabelsClient, parse_subscribe_labels_message
224
225
# Stream label events
226
client = FirehoseSubscribeLabelsClient()
227
228
try:
229
for message in client.start():
230
parsed = parse_subscribe_labels_message(message)
231
232
if parsed.labels:
233
for label in parsed.labels:
234
print(f"Label applied: {label.val} to {label.uri}")
235
if label.neg:
236
print(" (Negative label - removes previous label)")
237
if label.exp:
238
print(f" Expires: {label.exp}")
239
240
except KeyboardInterrupt:
241
print("Stopping label stream...")
242
finally:
243
client.stop()
244
```
245
246
### Message Parsing
247
248
Parse and process firehose messages for both repository and label streams.
249
250
#### Repository Message Parsing
251
252
```python { .api }
253
def parse_subscribe_repos_message(message: 'MessageFrame') -> 'SubscribeReposMessage':
254
"""
255
Parse repository subscription message.
256
257
Args:
258
message (MessageFrame): Raw message frame from firehose
259
260
Returns:
261
SubscribeReposMessage: Parsed message with typed content
262
263
Raises:
264
MessageParsingError: If message format is invalid
265
"""
266
267
# Repository message types
268
class SubscribeReposMessage:
269
"""Union type for repository stream messages."""
270
commit: Optional['RepoCommit'] # Repository commit with operations
271
handle: Optional['HandleUpdate'] # Handle change notification
272
migrate: Optional['RepoMigrate'] # Repository migration
273
tombstone: Optional['RepoTombstone'] # Repository deletion
274
info: Optional['InfoMessage'] # Stream information
275
error: Optional['ErrorMessage'] # Stream error
276
277
class RepoCommit:
278
"""Repository commit with operations."""
279
seq: int # Sequence number
280
rebase: bool # Whether this is a rebase
281
too_big: bool # Whether commit was too large
282
repo: str # Repository DID
283
commit: CID # Commit CID
284
prev: Optional[CID] # Previous commit CID
285
rev: str # Repository revision
286
since: Optional[str] # Rev of the last emitted commit from this repo, if any
287
blocks: bytes # CAR blocks
288
ops: List['RepoOperation'] # Repository operations
289
blobs: List[CID] # Referenced blobs
290
time: str # Timestamp
291
292
class RepoOperation:
293
"""Individual repository operation."""
294
action: str # 'create', 'update', 'delete'
295
path: str # Record path
296
cid: Optional[CID] # Record CID (create/update)
297
```
298
299
#### Label Message Parsing
300
301
```python { .api }
302
def parse_subscribe_labels_message(message: 'MessageFrame') -> 'SubscribeLabelsMessage':
303
"""
304
Parse label subscription message.
305
306
Args:
307
message (MessageFrame): Raw message frame from firehose
308
309
Returns:
310
SubscribeLabelsMessage: Parsed message with typed content
311
312
Raises:
313
MessageParsingError: If message format is invalid
314
"""
315
316
# Label message types
317
class SubscribeLabelsMessage:
318
"""Union type for label stream messages."""
319
labels: Optional['Labels'] # Label operations
320
info: Optional['InfoMessage'] # Stream information
321
error: Optional['ErrorMessage'] # Stream error
322
323
class Labels:
324
"""Label operations message."""
325
seq: int # Sequence number
326
labels: List['Label'] # Label operations
327
328
class Label:
329
"""Individual label operation."""
330
src: str # Label source DID
331
uri: str # Labeled content URI
332
cid: Optional[str] # Content CID
333
val: str # Label value
334
neg: Optional[bool] # Negative label (removal)
335
cts: str # Creation timestamp
336
exp: Optional[str] # Expiration timestamp
337
sig: Optional[bytes] # Label signature
338
```
339
340
Advanced parsing example:
341
342
```python
343
from atproto import (
344
FirehoseSubscribeReposClient,
345
parse_subscribe_repos_message,
346
CAR
347
)
348
349
client = FirehoseSubscribeReposClient()
350
351
def process_repository_commit(commit):
352
"""Process a repository commit with detailed operation handling."""
353
print(f"Processing commit {commit.seq} from {commit.repo}")
354
355
# Parse CAR blocks if needed
356
if commit.blocks:
357
try:
358
car = CAR.from_bytes(commit.blocks)
359
print(f" Commit contains {len(car.blocks)} blocks")
360
except Exception as e:
361
print(f" Could not parse CAR blocks: {e}")
362
363
# Process each operation
364
for op in commit.ops:
365
if op.action == 'create':
366
if 'app.bsky.feed.post' in op.path:
367
print(f" 📝 New post: {op.path}")
368
elif 'app.bsky.feed.like' in op.path:
369
print(f" ❤️ New like: {op.path}")
370
elif 'app.bsky.graph.follow' in op.path:
371
print(f" 👥 New follow: {op.path}")
372
373
elif op.action == 'delete':
374
print(f" 🗑️ Deleted: {op.path}")
375
376
# Check for referenced blobs (images, videos, etc.)
377
if commit.blobs:
378
print(f" 📎 Contains {len(commit.blobs)} blobs")
379
380
# Stream and process commits
381
try:
382
for message in client.start():
383
parsed = parse_subscribe_repos_message(message)
384
385
if parsed.commit:
386
process_repository_commit(parsed.commit)
387
elif parsed.error:
388
print(f"❌ Stream error: {parsed.error.message}")
389
break
390
391
except KeyboardInterrupt:
392
print("Stream stopped by user")
393
finally:
394
client.stop()
395
```
396
397
### Firehose Models
398
399
Core data models for firehose streaming operations.
400
401
```python { .api }
402
# Message frame types
403
class FrameType(Enum):
404
"""Frame types for firehose messages."""
405
MESSAGE = 1 # Regular message
406
ERROR = -1 # Error message
407
408
class MessageFrameHeader:
409
"""Header structure for firehose messages."""
410
op: int # Operation code
411
t: Optional[str] # Message type
412
413
class MessageFrame:
414
"""Complete message frame from firehose."""
415
header: MessageFrameHeader # Frame header
416
body: bytes # Message body (CBOR encoded)
417
418
# Stream control messages
419
class InfoMessage:
420
"""Stream information message."""
421
name: str # Stream name
422
message: Optional[str] # Info message
423
424
class ErrorMessage:
425
"""Stream error message."""
426
error: str # Error code
427
message: Optional[str] # Error description
428
```
429
430
### Error Handling
431
432
```python { .api }
433
class StreamError(Exception):
434
"""Base exception for streaming operations."""
435
436
class ConnectionError(StreamError):
437
"""Raised when connection to firehose fails."""
438
439
class MessageParsingError(StreamError):
440
"""Raised when message parsing fails."""
441
442
class StreamTimeoutError(StreamError):
443
"""Raised when stream times out."""
444
```
445
446
Robust streaming with error handling:
447
448
```python
449
from atproto import (
450
AsyncFirehoseSubscribeReposClient,
451
StreamError, ConnectionError, MessageParsingError, parse_subscribe_repos_message
452
)
453
import asyncio
454
455
async def robust_streaming():
456
"""Example of robust streaming with reconnection logic."""
457
client = AsyncFirehoseSubscribeReposClient()
458
cursor = None
459
max_retries = 5
460
retry_count = 0
461
462
while retry_count < max_retries:
463
try:
464
print(f"Starting stream (attempt {retry_count + 1})")
465
466
async for message in client.start(cursor=cursor):
467
try:
468
parsed = parse_subscribe_repos_message(message)
469
470
if parsed.commit:
471
# Update cursor for reconnection
472
cursor = parsed.commit.seq
473
# Process commit...
474
475
elif parsed.error:
476
print(f"Stream error: {parsed.error.message}")
477
break
478
479
except MessageParsingError as e:
480
print(f"Failed to parse message: {e}")
481
continue # Skip malformed messages
482
483
except ConnectionError as e:
484
print(f"Connection failed: {e}")
485
retry_count += 1
486
if retry_count < max_retries:
487
wait_time = min(2 ** retry_count, 60) # Exponential backoff
488
print(f"Retrying in {wait_time} seconds...")
489
await asyncio.sleep(wait_time)
490
else:
491
print("Max retries exceeded")
492
break
493
494
except Exception as e:
495
print(f"Unexpected error: {e}")
496
break
497
498
await client.close()
499
500
# Run robust streaming
501
asyncio.run(robust_streaming())
502
```