Comprehensive Python SDK for the AT Protocol, providing client interfaces, authentication, and real-time streaming for decentralized social networks.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Firehose clients for consuming live AT Protocol data streams including repository updates and labeling events. The streaming functionality provides both synchronous and asynchronous interfaces for processing real-time data from the AT Protocol network.
Stream real-time repository updates including posts, follows, likes, and other record operations from the AT Protocol network.
class FirehoseSubscribeReposClient:
    """
    Synchronous client for repository event streaming.

    Provides real-time access to repository operations across the AT Protocol network.
    """

    def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
        """
        Initialize the repository streaming client.

        Args:
            base_url (str): WebSocket endpoint for firehose (default: Bluesky firehose)
            *args, **kwargs: Additional client configuration
        """

    def start(self, cursor: Optional[int] = None) -> Iterator['SubscribeReposMessage']:
        """
        Start streaming repository events.

        Args:
            cursor (int, optional): Start from specific sequence number

        Yields:
            SubscribeReposMessage: Repository event messages

        Raises:
            NetworkError: If connection fails
            StreamError: If stream processing fails
        """

    def stop(self):
        """Stop the streaming connection."""

    def get_cursor(self) -> Optional[int]:
        """
        Get current stream cursor position.

        Returns:
            Optional[int]: Current cursor or None if not started
        """


class AsyncFirehoseSubscribeReposClient:
    """
    Asynchronous client for repository event streaming.

    Async version of repository firehose client for non-blocking operations.
    """

    def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
        """
        Initialize the async repository streaming client.

        Args:
            base_url (str): WebSocket endpoint for firehose
            *args, **kwargs: Additional client configuration
        """

    async def start(self, cursor: Optional[int] = None) -> AsyncIterator['SubscribeReposMessage']:
        """
        Start streaming repository events asynchronously.

        Args:
            cursor (int, optional): Start from specific sequence number

        Yields:
            SubscribeReposMessage: Repository event messages
        """

    async def stop(self):
        """Stop the streaming connection asynchronously."""

    async def close(self):
        """Close the async client connection."""


# Usage examples:
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message

# Synchronous streaming
client = FirehoseSubscribeReposClient()

try:
    for message in client.start():
        # Parse the message
        parsed = parse_subscribe_repos_message(message)

        # Handle different message types
        if parsed.commit:
            print(f"Repository update: {parsed.commit.repo}")
            for op in parsed.commit.ops:
                print(f" Operation: {op.action} {op.path}")
        elif parsed.handle:
            print(f"Handle update: {parsed.handle.did} -> {parsed.handle.handle}")
        elif parsed.info:
            print(f"Stream info: {parsed.info.name}")
        elif parsed.error:
            print(f"Stream error: {parsed.error.error} - {parsed.error.message}")
except KeyboardInterrupt:
    print("Stopping stream...")
finally:
    # Always release the connection, including on Ctrl+C.
    client.stop()

# Import used by the asynchronous example that follows.
import asyncio
from atproto import AsyncFirehoseSubscribeReposClient, parse_subscribe_repos_message


async def stream_repos():
    """Stream repository events and print the path of every newly created post."""
    client = AsyncFirehoseSubscribeReposClient()
    try:
        async for message in client.start():
            parsed = parse_subscribe_repos_message(message)
            if parsed.commit and parsed.commit.ops:
                for op in parsed.commit.ops:
                    if op.action == 'create' and 'app.bsky.feed.post' in op.path:
                        print(f"New post created: {op.path}")
    except Exception as e:
        print(f"Stream error: {e}")
    finally:
        # Close the client whether the stream ended normally or with an error.
        await client.close()


# Run the async stream
asyncio.run(stream_repos())

# Stream real-time labeling events for content moderation and filtering across the AT Protocol network.
class FirehoseSubscribeLabelsClient:
    """
    Synchronous client for label event streaming.

    Provides real-time access to labeling events for content moderation.
    """

    def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
        """
        Initialize the labels streaming client.

        Args:
            base_url (str): WebSocket endpoint for label firehose
            *args, **kwargs: Additional client configuration
        """

    def start(self, cursor: Optional[int] = None) -> Iterator['SubscribeLabelsMessage']:
        """
        Start streaming label events.

        Args:
            cursor (int, optional): Start from specific sequence number

        Yields:
            SubscribeLabelsMessage: Label event messages
        """

    def stop(self):
        """Stop the streaming connection."""


class AsyncFirehoseSubscribeLabelsClient:
    """
    Asynchronous client for label event streaming.

    Async version of labels firehose client.
    """

    def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
        """
        Initialize the async labels streaming client.

        Args:
            base_url (str): WebSocket endpoint for label firehose
            *args, **kwargs: Additional client configuration
        """

    async def start(self, cursor: Optional[int] = None) -> AsyncIterator['SubscribeLabelsMessage']:
        """
        Start streaming label events asynchronously.

        Args:
            cursor (int, optional): Start from specific sequence number

        Yields:
            SubscribeLabelsMessage: Label event messages
        """

    async def stop(self):
        """Stop the streaming connection asynchronously."""

    async def close(self):
        """Close the async client connection."""


# Usage example:
from atproto import FirehoseSubscribeLabelsClient, parse_subscribe_labels_message

# Stream label events
client = FirehoseSubscribeLabelsClient()

try:
    for message in client.start():
        parsed = parse_subscribe_labels_message(message)
        if parsed.labels:
            # `parsed.labels` is the Labels message; the individual Label
            # entries live in its own `labels` list.
            for label in parsed.labels.labels:
                print(f"Label applied: {label.val} to {label.uri}")
                if label.neg:
                    print(" (Negative label - removes previous label)")
                if label.exp:
                    print(f" Expires: {label.exp}")
except KeyboardInterrupt:
    print("Stopping label stream...")
finally:
    # Always release the connection, including on Ctrl+C.
    client.stop()

# Parse and process firehose messages for both repository and label streams.
def parse_subscribe_repos_message(message: 'MessageFrame') -> 'SubscribeReposMessage':
    """
    Parse repository subscription message.

    Args:
        message (MessageFrame): Raw message frame from firehose

    Returns:
        SubscribeReposMessage: Parsed message with typed content

    Raises:
        MessageParsingError: If message format is invalid
    """


# Repository message types
class SubscribeReposMessage:
    """Union type for repository stream messages."""

    commit: Optional['RepoCommit']  # Repository commit with operations
    handle: Optional['HandleUpdate']  # Handle change notification
    migrate: Optional['RepoMigrate']  # Repository migration
    tombstone: Optional['RepoTombstone']  # Repository deletion
    info: Optional['InfoMessage']  # Stream information
    error: Optional['ErrorMessage']  # Stream error


class RepoCommit:
    """Repository commit with operations."""

    seq: int  # Sequence number
    rebase: bool  # Whether this is a rebase
    too_big: bool  # Whether commit was too large
    repo: str  # Repository DID
    commit: CID  # Commit CID
    prev: Optional[CID]  # Previous commit CID
    rev: str  # Repository revision
    since: Optional[str]  # Since parameter
    blocks: bytes  # CAR blocks
    ops: List['RepoOperation']  # Repository operations
    blobs: List[CID]  # Referenced blobs
    time: str  # Timestamp


class RepoOperation:
    """Individual repository operation."""

    action: str  # 'create', 'update', 'delete'
    path: str  # Record path
    cid: Optional[CID]  # Record CID (create/update)


def parse_subscribe_labels_message(message: 'MessageFrame') -> 'SubscribeLabelsMessage':
    """
    Parse label subscription message.

    Args:
        message (MessageFrame): Raw message frame from firehose

    Returns:
        SubscribeLabelsMessage: Parsed message with typed content

    Raises:
        MessageParsingError: If message format is invalid
    """


# Label message types
class SubscribeLabelsMessage:
    """Union type for label stream messages."""

    labels: Optional['Labels']  # Label operations
    info: Optional['InfoMessage']  # Stream information
    error: Optional['ErrorMessage']  # Stream error


class Labels:
    """Label operations message."""

    seq: int  # Sequence number
    labels: List['Label']  # Label operations


class Label:
    """Individual label operation."""

    src: str  # Label source DID
    uri: str  # Labeled content URI
    cid: Optional[str]  # Content CID
    val: str  # Label value
    neg: Optional[bool]  # Negative label (removal)
    cts: str  # Creation timestamp
    exp: Optional[str]  # Expiration timestamp
    sig: Optional[bytes]  # Label signature


# Advanced parsing example:
from atproto import (
    FirehoseSubscribeReposClient,
    parse_subscribe_repos_message,
    CAR
)

client = FirehoseSubscribeReposClient()


def process_repository_commit(commit):
    """Process a repository commit with detailed operation handling."""
    print(f"Processing commit {commit.seq} from {commit.repo}")

    # Parse CAR blocks if needed
    if commit.blocks:
        try:
            car = CAR.from_bytes(commit.blocks)
            print(f" Commit contains {len(car.blocks)} blocks")
        except Exception as e:
            # An unreadable CAR payload is reported but does not abort the
            # rest of the commit processing.
            print(f" Could not parse CAR blocks: {e}")

    # Process each operation
    for op in commit.ops:
        if op.action == 'create':
            if 'app.bsky.feed.post' in op.path:
                print(f" 📝 New post: {op.path}")
            elif 'app.bsky.feed.like' in op.path:
                print(f" ❤️ New like: {op.path}")
            elif 'app.bsky.graph.follow' in op.path:
                print(f" 👥 New follow: {op.path}")
        elif op.action == 'delete':
            print(f" 🗑️ Deleted: {op.path}")

    # Check for referenced blobs (images, videos, etc.)
    if commit.blobs:
        print(f" 📎 Contains {len(commit.blobs)} blobs")


# Stream and process commits
try:
    for message in client.start():
        parsed = parse_subscribe_repos_message(message)
        if parsed.commit:
            process_repository_commit(parsed.commit)
        elif parsed.error:
            print(f"❌ Stream error: {parsed.error.message}")
            break
except KeyboardInterrupt:
    print("Stream stopped by user")
finally:
    # Always release the connection, including on Ctrl+C or a stream error.
    client.stop()

# Core data models for firehose streaming operations.
# Message frame types
class FrameType(Enum):
    """Frame types for firehose messages."""

    MESSAGE = 1  # Regular message
    ERROR = -1  # Error message


class MessageFrameHeader:
    """Header structure for firehose messages."""

    op: int  # Operation code
    t: Optional[str]  # Message type


class MessageFrame:
    """Complete message frame from firehose."""

    header: MessageFrameHeader  # Frame header
    body: bytes  # Message body (CBOR encoded)


# Stream control messages
class InfoMessage:
    """Stream information message."""

    name: str  # Stream name
    message: Optional[str]  # Info message


class ErrorMessage:
    """Stream error message."""

    error: str  # Error code
    message: Optional[str]  # Error description


class StreamError(Exception):
    """Base exception for streaming operations."""


# NOTE: this name shadows Python's builtin ConnectionError in any module that
# imports it unqualified; `except ConnectionError` then refers to this class.
class ConnectionError(StreamError):
    """Raised when connection to firehose fails."""


class MessageParsingError(StreamError):
    """Raised when message parsing fails."""


class StreamTimeoutError(StreamError):
    """Raised when stream times out."""


# Robust streaming with error handling:
from atproto import (
    AsyncFirehoseSubscribeReposClient,
    StreamError, ConnectionError, MessageParsingError,
    parse_subscribe_repos_message
)
import asyncio


async def robust_streaming():
    """Example of robust streaming with reconnection logic.

    Connects to the repository firehose and reconnects with exponential
    backoff after a connection failure, up to ``max_retries`` attempts. The
    sequence number of the last commit seen is kept as the cursor so a
    reconnect resumes from that point. Streaming stops on a stream-level
    error message, when the stream ends, or on any unexpected exception; the
    client is closed in every case.
    """
    client = AsyncFirehoseSubscribeReposClient()
    cursor = None
    max_retries = 5
    retry_count = 0

    try:
        while retry_count < max_retries:
            try:
                print(f"Starting stream (attempt {retry_count + 1})")
                async for message in client.start(cursor=cursor):
                    try:
                        parsed = parse_subscribe_repos_message(message)
                    except MessageParsingError as e:
                        print(f"Failed to parse message: {e}")
                        continue  # Skip malformed messages

                    if parsed.commit:
                        # Update cursor for reconnection
                        cursor = parsed.commit.seq
                        # Process commit...
                    elif parsed.error:
                        print(f"Stream error: {parsed.error.message}")
                        # Leave the retry loop as well: a bare `break` here
                        # would only exit the inner loop and reconnect
                        # immediately, forever, without any backoff.
                        return

                # The stream ended without a connection failure; stop here
                # rather than reconnecting in a tight, uncounted loop.
                return
            except ConnectionError as e:
                print(f"Connection failed: {e}")
                retry_count += 1
                if retry_count < max_retries:
                    wait_time = min(2 ** retry_count, 60)  # Exponential backoff
                    print(f"Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    print("Max retries exceeded")
                    break
            except Exception as e:
                print(f"Unexpected error: {e}")
                break
    finally:
        # Close the client on every exit path, including early returns.
        await client.close()


# Run robust streaming
asyncio.run(robust_streaming())

# Install with Tessl CLI
npx tessl i tessl/pypi-atproto