CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-atproto

Comprehensive Python SDK for the AT Protocol, providing client interfaces, authentication, and real-time streaming for decentralized social networks.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/real-time-streaming.md

Real-time Streaming

Firehose clients for consuming live AT Protocol data streams including repository updates and labeling events. The streaming functionality provides both synchronous and asynchronous interfaces for processing real-time data from the AT Protocol network.

Capabilities

Repository Streaming

Stream real-time repository updates including posts, follows, likes, and other record operations from the AT Protocol network.

Synchronous Repository Client

class FirehoseSubscribeReposClient:
    """
    Blocking (synchronous) firehose client for repository events.

    Gives real-time access to repository operations happening across the
    AT Protocol network.
    """
    def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
        """
        Create the repository streaming client.

        Args:
            base_url (str): WebSocket endpoint of the firehose. Defaults to
                'wss://bsky.network', the Bluesky firehose.
            *args, **kwargs: Extra client configuration.
        """

    def start(self, cursor: Optional[int] = None) -> Iterator['SubscribeReposMessage']:
        """
        Begin streaming repository events.

        Args:
            cursor (int, optional): Sequence number to start from.

        Yields:
            SubscribeReposMessage: One message per repository event.

        Raises:
            NetworkError: If the connection fails.
                NOTE(review): NetworkError is not among the exceptions listed
                under "Error Handling" (StreamError, ConnectionError,
                MessageParsingError, StreamTimeoutError) - confirm the name.
            StreamError: If processing the stream fails.
        """

    def stop(self):
        """Shut down the streaming connection."""

    def get_cursor(self) -> Optional[int]:
        """
        Report the current position in the stream.

        Returns:
            Optional[int]: The current cursor, or None if streaming has not
                started.
        """

Asynchronous Repository Client

class AsyncFirehoseSubscribeReposClient:
    """
    Non-blocking (asyncio) firehose client for repository events.

    Asynchronous counterpart of the repository firehose client.
    """
    def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
        """
        Create the async repository streaming client.

        Args:
            base_url (str): WebSocket endpoint of the firehose.
            *args, **kwargs: Extra client configuration.
        """

    async def start(self, cursor: Optional[int] = None) -> AsyncIterator['SubscribeReposMessage']:
        """
        Begin streaming repository events without blocking.

        Args:
            cursor (int, optional): Sequence number to start from.

        Yields:
            SubscribeReposMessage: One message per repository event.
        """

    async def stop(self):
        """Shut down the streaming connection (awaitable)."""

    async def close(self):
        """Close the connection held by the async client."""

Usage examples:

from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message

# Synchronous streaming
client = FirehoseSubscribeReposClient()

try:
    for frame in client.start():
        # Decode the raw frame into a typed event
        event = parse_subscribe_repos_message(frame)

        # Report whichever kind of event this is, then move to the next frame
        if event.commit:
            print(f"Repository update: {event.commit.repo}")
            for op in event.commit.ops:
                print(f"  Operation: {op.action} {op.path}")
            continue

        if event.handle:
            print(f"Handle update: {event.handle.did} -> {event.handle.handle}")
            continue

        if event.info:
            print(f"Stream info: {event.info.name}")
            continue

        if event.error:
            print(f"Stream error: {event.error.error} - {event.error.message}")

except KeyboardInterrupt:
    # Ctrl+C is the expected way to end the example
    print("Stopping stream...")
finally:
    # Always release the connection, however the loop ended
    client.stop()
# Asynchronous streaming
import asyncio
from atproto import AsyncFirehoseSubscribeReposClient, parse_subscribe_repos_message

async def stream_repos():
    """Print the path of every newly created post seen on the firehose.

    Runs until the stream ends or an exception is raised; any exception is
    reported and the client is closed in all cases.
    """
    client = AsyncFirehoseSubscribeReposClient()

    try:
        async for message in client.start():
            parsed = parse_subscribe_repos_message(message)

            if parsed.commit and parsed.commit.ops:
                for op in parsed.commit.ops:
                    # A record path is "<collection>/<record key>". Compare the
                    # collection exactly: a substring test would also match
                    # other collections such as app.bsky.feed.postgate.
                    collection = op.path.split('/', 1)[0]
                    if op.action == 'create' and collection == 'app.bsky.feed.post':
                        print(f"New post created: {op.path}")

    except Exception as e:
        # Top-level boundary of the example: report the failure and fall
        # through to cleanup.
        print(f"Stream error: {e}")
    finally:
        await client.close()

# Run the async stream
asyncio.run(stream_repos())

Label Streaming

Stream real-time labeling events for content moderation and filtering across the AT Protocol network.

Synchronous Labels Client

class FirehoseSubscribeLabelsClient:
    """
    Blocking (synchronous) firehose client for label events.

    Gives real-time access to the labeling events used for content
    moderation.
    """
    def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
        """
        Create the labels streaming client.

        Args:
            base_url (str): WebSocket endpoint of the label firehose.
            *args, **kwargs: Extra client configuration.
        """

    def start(self, cursor: Optional[int] = None) -> Iterator['SubscribeLabelsMessage']:
        """
        Begin streaming label events.

        Args:
            cursor (int, optional): Sequence number to start from.

        Yields:
            SubscribeLabelsMessage: One message per label event.
        """

    def stop(self):
        """Shut down the streaming connection."""

Asynchronous Labels Client

class AsyncFirehoseSubscribeLabelsClient:
    """
    Non-blocking (asyncio) firehose client for label events.

    Asynchronous counterpart of the labels firehose client.
    """
    def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):
        """
        Create the async labels streaming client.

        Args:
            base_url (str): WebSocket endpoint of the label firehose.
            *args, **kwargs: Extra client configuration.
        """

    async def start(self, cursor: Optional[int] = None) -> AsyncIterator['SubscribeLabelsMessage']:
        """
        Begin streaming label events without blocking.

        Args:
            cursor (int, optional): Sequence number to start from.

        Yields:
            SubscribeLabelsMessage: One message per label event.
        """

    async def stop(self):
        """Shut down the streaming connection (awaitable)."""

    async def close(self):
        """Close the connection held by the async client."""

Usage example:

from atproto import FirehoseSubscribeLabelsClient, parse_subscribe_labels_message

# Stream label events
client = FirehoseSubscribeLabelsClient()

try:
    for message in client.start():
        parsed = parse_subscribe_labels_message(message)

        # parsed.labels is the Labels envelope (seq + labels); the individual
        # Label entries live in its own .labels list, so iterate that list
        # rather than the envelope itself.
        if parsed.labels:
            for label in parsed.labels.labels:
                print(f"Label applied: {label.val} to {label.uri}")
                if label.neg:
                    print("  (Negative label - removes previous label)")
                if label.exp:
                    print(f"  Expires: {label.exp}")

except KeyboardInterrupt:
    # Ctrl+C is the expected way to end the example
    print("Stopping label stream...")
finally:
    # Always release the connection, however the loop ended
    client.stop()

Message Parsing

Parse and process firehose messages for both repository and label streams.

Repository Message Parsing

def parse_subscribe_repos_message(message: 'MessageFrame') -> 'SubscribeReposMessage':
    """
    Turn a raw repository-stream frame into a typed message.

    Args:
        message (MessageFrame): Raw frame as received from the firehose.

    Returns:
        SubscribeReposMessage: The decoded message with typed content.

    Raises:
        MessageParsingError: If the frame is not in a valid format.
    """

# Repository message types
class SubscribeReposMessage:
    """Union type for repository stream messages.

    Each attribute corresponds to one kind of message. The usage examples test
    the attributes in turn (``if parsed.commit: ... elif parsed.handle: ...``),
    so presumably at most one is set on any given message - TODO confirm.
    """
    commit: Optional['RepoCommit']       # Repository commit with operations
    handle: Optional['HandleUpdate']     # Handle change notification
    migrate: Optional['RepoMigrate']     # Repository migration
    tombstone: Optional['RepoTombstone'] # Repository deletion
    info: Optional['InfoMessage']        # Stream information
    error: Optional['ErrorMessage']      # Stream error

class RepoCommit:
    """Repository commit with operations.

    Carried in ``SubscribeReposMessage.commit``.
    """
    seq: int                            # Sequence number (the reconnect example passes it back as the cursor)
    rebase: bool                        # Whether this is a rebase
    too_big: bool                       # Whether commit was too large
    repo: str                           # Repository DID
    commit: CID                         # Commit CID
    prev: Optional[CID]                 # Previous commit CID
    rev: str                            # Repository revision
    since: Optional[str]                # Since parameter
    blocks: bytes                       # CAR blocks (the advanced example decodes them with CAR.from_bytes)
    ops: List['RepoOperation']          # Repository operations
    blobs: List[CID]                    # Referenced blobs
    time: str                           # Timestamp

class RepoOperation:
    """Individual repository operation.

    One entry of ``RepoCommit.ops``.
    """
    action: str                         # 'create', 'update', 'delete'
    path: str                           # Record path (the examples look for collection names such as 'app.bsky.feed.post' in it)
    cid: Optional[CID]                  # Record CID (create/update); presumably None for 'delete' - TODO confirm

Label Message Parsing

def parse_subscribe_labels_message(message: 'MessageFrame') -> 'SubscribeLabelsMessage':
    """
    Turn a raw label-stream frame into a typed message.

    Args:
        message (MessageFrame): Raw frame as received from the firehose.

    Returns:
        SubscribeLabelsMessage: The decoded message with typed content.

    Raises:
        MessageParsingError: If the frame is not in a valid format.
    """

# Label message types  
class SubscribeLabelsMessage:
    """Union type for label stream messages.

    Note that ``labels`` holds a ``Labels`` envelope, not a list: the
    individual ``Label`` entries are in the envelope's own ``labels`` field.
    """
    labels: Optional['Labels']           # Label operations (envelope with seq + list of Label)
    info: Optional['InfoMessage']        # Stream information
    error: Optional['ErrorMessage']      # Stream error

class Labels:
    """Label operations message.

    Carried in ``SubscribeLabelsMessage.labels``.
    """
    seq: int                            # Sequence number
    labels: List['Label']               # Label operations (the individual Label entries)

class Label:
    """Individual label operation.

    One entry of ``Labels.labels``.
    """
    src: str                            # Label source DID
    uri: str                            # Labeled content URI
    cid: Optional[str]                  # Content CID
    val: str                            # Label value
    neg: Optional[bool]                 # Negative label (removal of a previous label)
    cts: str                            # Creation timestamp
    exp: Optional[str]                  # Expiration timestamp
    sig: Optional[bytes]                # Label signature

Advanced parsing example:

from atproto import (
    FirehoseSubscribeReposClient, 
    parse_subscribe_repos_message,
    CAR
)

client = FirehoseSubscribeReposClient()

def process_repository_commit(commit):
    """Print a human-readable summary of one repository commit.

    Args:
        commit: Commit object exposing ``seq``, ``repo``, ``blocks``, ``ops``
            and ``blobs`` (see ``RepoCommit``). Each entry of ``ops`` exposes
            ``action`` and ``path``.

    Output goes to stdout. Created posts, likes and follows, and every
    delete, get their own line; other operations are not reported.
    """
    print(f"Processing commit {commit.seq} from {commit.repo}")

    # Parse CAR blocks if needed
    if commit.blocks:
        try:
            car = CAR.from_bytes(commit.blocks)
            print(f"  Commit contains {len(car.blocks)} blocks")
        except Exception as e:
            # Best effort: an undecodable CAR must not stop the summary.
            print(f"  Could not parse CAR blocks: {e}")

    # Process each operation
    for op in commit.ops:
        if op.action == 'create':
            # A record path is "<collection>/<record key>". Compare the
            # collection exactly: a substring test would also match other
            # collections such as app.bsky.feed.postgate.
            collection = op.path.split('/', 1)[0]
            if collection == 'app.bsky.feed.post':
                print(f"  📝 New post: {op.path}")
            elif collection == 'app.bsky.feed.like':
                print(f"  ❤️  New like: {op.path}")
            elif collection == 'app.bsky.graph.follow':
                print(f"  👥 New follow: {op.path}")

        elif op.action == 'delete':
            print(f"  🗑️  Deleted: {op.path}")

    # Check for referenced blobs (images, videos, etc.)
    if commit.blobs:
        print(f"  📎 Contains {len(commit.blobs)} blobs")

# Stream and process commits
try:
    for frame in client.start():
        event = parse_subscribe_repos_message(frame)

        # Commits get the detailed treatment; nothing more to do for them
        if event.commit:
            process_repository_commit(event.commit)
            continue

        # A reported stream error ends the example
        if event.error:
            print(f"❌ Stream error: {event.error.message}")
            break

except KeyboardInterrupt:
    print("Stream stopped by user")
finally:
    # Always release the connection, however the loop ended
    client.stop()

Firehose Models

Core data models for firehose streaming operations.

# Message frame types
class FrameType(Enum):
    """Kinds of frame a firehose message can be."""
    # Regular message
    MESSAGE = 1
    # Error message
    ERROR = -1

class MessageFrameHeader:
    """Header structure for firehose messages.

    Stored in ``MessageFrame.header``.
    """
    op: int                             # Operation code; presumably one of the FrameType values (1 / -1) - TODO confirm
    t: Optional[str]                    # Message type

class MessageFrame:
    """Complete message frame from firehose.

    This is the raw form accepted by ``parse_subscribe_repos_message`` and
    ``parse_subscribe_labels_message``.
    """
    header: MessageFrameHeader          # Frame header
    body: bytes                         # Message body (CBOR encoded)

# Stream control messages
class InfoMessage:
    """Stream information message.

    Appears as the ``info`` field of both ``SubscribeReposMessage`` and
    ``SubscribeLabelsMessage``.
    """
    name: str                           # Stream name
    message: Optional[str]              # Info message (may be absent)

class ErrorMessage:
    """Stream error message.

    Appears as the ``error`` field of both ``SubscribeReposMessage`` and
    ``SubscribeLabelsMessage``. This is a message delivered on the stream,
    not a Python exception.
    """
    error: str                          # Error code
    message: Optional[str]              # Error description (may be absent)

Error Handling

class StreamError(Exception):
    """Root of the streaming exception hierarchy."""

class ConnectionError(StreamError):
    """Signals that connecting to the firehose failed.

    Note: this class has the same name as Python's built-in
    ``ConnectionError`` but derives from ``StreamError``, not ``OSError``.
    Importing it by name shadows the built-in within that module.
    """

class MessageParsingError(StreamError):
    """Signals that a message could not be parsed."""

class StreamTimeoutError(StreamError):
    """Signals that the stream timed out."""

Robust streaming with error handling:

from atproto import (
    AsyncFirehoseSubscribeReposClient,
    StreamError, ConnectionError, MessageParsingError,
    parse_subscribe_repos_message
)
import asyncio

async def robust_streaming():
    """Stream repository events, reconnecting with backoff on connection failure.

    A reconnect resumes from the sequence number of the last commit seen.
    The function returns when the stream ends, when the server reports a
    stream error, when an unexpected exception occurs, or once
    ``max_retries`` connection failures have been counted. The client is
    closed on every exit path.
    """
    client = AsyncFirehoseSubscribeReposClient()
    cursor = None
    max_retries = 5
    retry_count = 0

    try:
        while retry_count < max_retries:
            try:
                print(f"Starting stream (attempt {retry_count + 1})")

                async for message in client.start(cursor=cursor):
                    # Keep the guarded region to the one call that can raise
                    # a parsing error.
                    try:
                        parsed = parse_subscribe_repos_message(message)
                    except MessageParsingError as e:
                        print(f"Failed to parse message: {e}")
                        continue  # Skip malformed messages

                    if parsed.commit:
                        # Update cursor for reconnection
                        cursor = parsed.commit.seq
                        # Process commit...

                    elif parsed.error:
                        print(f"Stream error: {parsed.error.message}")
                        # Stop here. Merely leaving the inner loop would fall
                        # back into `while` and reconnect immediately, with
                        # no backoff and no retry accounting.
                        return

                # The stream ended without an error: nothing to retry.
                return

            except ConnectionError as e:
                print(f"Connection failed: {e}")
                retry_count += 1
                if retry_count >= max_retries:
                    print("Max retries exceeded")
                    return
                wait_time = min(2 ** retry_count, 60)  # Exponential backoff
                print(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)

            except Exception as e:
                # Top-level boundary of the example: report and give up.
                print(f"Unexpected error: {e}")
                return
    finally:
        # Runs on every exit path, including cancellation.
        await client.close()

# Run robust streaming
asyncio.run(robust_streaming())

Install with Tessl CLI

npx tessl i tessl/pypi-atproto

docs

client-operations.md

core-functionality.md

cryptographic-operations.md

identity-resolution.md

index.md

jwt-operations.md

real-time-streaming.md

tile.json