Comprehensive Python SDK for the AT Protocol, providing client interfaces, authentication, and real-time streaming for decentralized social networks.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
DID document resolution, caching mechanisms, and AT Protocol-specific identity data extraction for decentralized identity management. These components enable efficient resolution of decentralized identifiers and handles within the AT Protocol ecosystem.
Resolve decentralized identifiers (DIDs) and handles to their corresponding DID documents and service endpoints.
class IdResolver:
"""
Identity resolver for DIDs and handles.
Resolves both DID and handle identities with configurable PLC directory
and optional caching support.
"""
def __init__(self,
plc_url: Optional[str] = None,
timeout: Optional[float] = None,
cache: Optional['DidBaseCache'] = None):
"""
Initialize the identity resolver.
Args:
plc_url (str, optional): PLC directory URL (default: https://plc.directory)
timeout (float, optional): Request timeout in seconds
cache (DidBaseCache, optional): DID document cache implementation
"""
def resolve_did(self, did: str) -> 'DidDocument':
"""
Resolve DID to DID document.
Args:
did (str): DID to resolve (e.g., "did:plc:alice123")
Returns:
DidDocument: Resolved DID document
Raises:
ResolutionError: If DID cannot be resolved
NetworkError: If resolution service is unreachable
"""
def resolve_handle(self, handle: str) -> str:
"""
Resolve handle to DID.
Args:
handle (str): Handle to resolve (e.g., "alice.bsky.social")
Returns:
str: Resolved DID
Raises:
ResolutionError: If handle cannot be resolved
"""
def resolve_atproto_data(self, identifier: str) -> 'AtprotoData':
"""
Resolve identifier to AT Protocol-specific data.
Args:
identifier (str): DID or handle to resolve
Returns:
AtprotoData: Extracted AT Protocol data
"""
def get_pds_endpoint(self, identifier: str) -> Optional[str]:
"""
Get Personal Data Server endpoint for identifier.
Args:
identifier (str): DID or handle
Returns:
Optional[str]: PDS endpoint URL or None if not found
"""
def refresh_cache_entry(self, did: str):
"""
Force refresh of cached DID document.
Args:
did (str): DID to refresh in cache
"""class AsyncIdResolver:
"""
Asynchronous identity resolver for DIDs and handles.
Async version of identity resolver for non-blocking operations.
"""
def __init__(self,
plc_url: Optional[str] = None,
timeout: Optional[float] = None,
cache: Optional['AsyncDidBaseCache'] = None):
"""
Initialize the async identity resolver.
Args:
plc_url (str, optional): PLC directory URL
timeout (float, optional): Request timeout in seconds
cache (AsyncDidBaseCache, optional): Async DID document cache
"""
async def resolve_did(self, did: str) -> 'DidDocument':
"""
Resolve DID to DID document asynchronously.
Args:
did (str): DID to resolve
Returns:
DidDocument: Resolved DID document
"""
async def resolve_handle(self, handle: str) -> str:
"""
Resolve handle to DID asynchronously.
Args:
handle (str): Handle to resolve
Returns:
str: Resolved DID
"""
async def resolve_atproto_data(self, identifier: str) -> 'AtprotoData':
"""
Resolve identifier to AT Protocol data asynchronously.
Args:
identifier (str): DID or handle to resolve
Returns:
AtprotoData: Extracted AT Protocol data
"""
async def close(self):
"""Close the async resolver connections."""Usage examples:
from atproto import IdResolver, AtprotoData
# Initialize resolver with default settings
resolver = IdResolver()
# Resolve DID to document
did = "did:plc:alice123456789"
doc = resolver.resolve_did(did)
print(f"DID document ID: {doc.id}")
# Resolve handle to DID
handle = "alice.bsky.social"
resolved_did = resolver.resolve_handle(handle)
print(f"Handle {handle} resolves to: {resolved_did}")
# Get AT Protocol specific data
atproto_data = resolver.resolve_atproto_data(handle)
print(f"PDS endpoint: {atproto_data.pds}")
print(f"Signing key: {atproto_data.signing_key}")
# Get PDS endpoint directly
pds_endpoint = resolver.get_pds_endpoint("alice.bsky.social")
if pds_endpoint:
print(f"Alice's PDS: {pds_endpoint}")import asyncio
from atproto import AsyncIdResolver
async def resolve_identities():
resolver = AsyncIdResolver()
# Resolve multiple identities concurrently
identities = [
"alice.bsky.social",
"bob.bsky.social",
"did:plc:charlie789"
]
tasks = [resolver.resolve_atproto_data(identity) for identity in identities]
results = await asyncio.gather(*tasks, return_exceptions=True)
for identity, result in zip(identities, results):
if isinstance(result, Exception):
print(f"Failed to resolve {identity}: {result}")
else:
print(f"{identity} -> {result.did} (PDS: {result.pds})")
await resolver.close()
asyncio.run(resolve_identities())Efficient caching mechanisms for DID documents to reduce resolution latency and network overhead.
class DidInMemoryCache:
"""
In-memory cache for DID documents.
Provides fast access to frequently resolved DID documents with
configurable TTL and size limits.
"""
def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
"""
Initialize the cache.
Args:
max_size (int): Maximum number of cached documents
ttl_seconds (int): Time-to-live for cache entries in seconds
"""
def get(self, did: str) -> Optional['DidDocument']:
"""
Get DID document from cache.
Args:
did (str): DID to look up
Returns:
Optional[DidDocument]: Cached document or None if not found/expired
"""
def set(self, did: str, document: 'DidDocument'):
"""
Store DID document in cache.
Args:
did (str): DID key
document (DidDocument): Document to cache
"""
def delete(self, did: str):
"""
Remove DID document from cache.
Args:
did (str): DID to remove
"""
def clear(self):
"""Clear all cached documents."""
def refresh(self, did: str, get_doc_callback: 'GetDocCallback'):
"""
Refresh cached document by fetching new version.
Args:
did (str): DID to refresh
get_doc_callback (GetDocCallback): Function to fetch fresh document
"""
def get_stats(self) -> Dict[str, Any]:
"""
Get cache statistics.
Returns:
Dict[str, Any]: Cache statistics (hits, misses, size, etc.)
"""class AsyncDidInMemoryCache:
"""
Asynchronous in-memory cache for DID documents.
Async version of DID document cache with thread-safe operations.
"""
def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
"""
Initialize the async cache.
Args:
max_size (int): Maximum number of cached documents
ttl_seconds (int): Time-to-live for cache entries
"""
async def get(self, did: str) -> Optional['DidDocument']:
"""
Get DID document from cache asynchronously.
Args:
did (str): DID to look up
Returns:
Optional[DidDocument]: Cached document or None
"""
async def set(self, did: str, document: 'DidDocument'):
"""
Store DID document in cache asynchronously.
Args:
did (str): DID key
document (DidDocument): Document to cache
"""
async def refresh(self, did: str, get_doc_callback: 'AsyncGetDocCallback'):
"""
Refresh cached document asynchronously.
Args:
did (str): DID to refresh
get_doc_callback (AsyncGetDocCallback): Async function to fetch document
"""Usage examples:
from atproto import IdResolver, DidInMemoryCache
# Create resolver with cache
cache = DidInMemoryCache(max_size=500, ttl_seconds=1800) # 30 minute TTL
resolver = IdResolver(cache=cache)
# First resolution - cache miss
print("First resolution (cache miss)")
start_time = time.time()
doc1 = resolver.resolve_did("did:plc:alice123")
print(f"Resolved in {time.time() - start_time:.3f}s")
# Second resolution - cache hit
print("Second resolution (cache hit)")
start_time = time.time()
doc2 = resolver.resolve_did("did:plc:alice123")
print(f"Resolved in {time.time() - start_time:.3f}s")
# Check cache statistics
stats = cache.get_stats()
print(f"Cache hits: {stats['hits']}, misses: {stats['misses']}")
# Force refresh of cache entry
cache.refresh("did:plc:alice123", lambda did: resolver.resolve_did(did))Extract AT Protocol-specific information from DID documents for efficient client operations.
class AtprotoData:
"""
ATProtocol-specific data extracted from DID documents.
Attributes:
did (str): DID identifier
signing_key (Optional[str]): Signing key for the identity
handle (Optional[str]): Associated handle
pds (Optional[str]): Personal Data Server endpoint
pds_active (bool): Whether PDS is active
declarations (Dict[str, Any]): Additional declarations
"""
did: str
signing_key: Optional[str]
handle: Optional[str]
pds: Optional[str]
pds_active: bool
declarations: Dict[str, Any]
@classmethod
def from_did_doc(cls, did_doc: 'DidDocument') -> 'AtprotoData':
"""
Extract AT Protocol data from DID document.
Args:
did_doc (DidDocument): DID document to extract from
Returns:
AtprotoData: Extracted AT Protocol data
Raises:
ExtractionError: If required data cannot be extracted
"""
def get_service_endpoint(self, service_id: str) -> Optional[str]:
"""
Get service endpoint by ID.
Args:
service_id (str): Service identifier
Returns:
Optional[str]: Service endpoint URL
"""
def has_valid_pds(self) -> bool:
"""
Check if the identity has a valid PDS.
Returns:
bool: True if PDS is available and active
"""
def get_signing_key_multikey(self) -> Optional['Multikey']:
"""
Get signing key as Multikey object.
Returns:
Optional[Multikey]: Signing key or None if not available
"""
def to_dict(self) -> Dict[str, Any]:
"""
Convert to dictionary representation.
Returns:
Dict[str, Any]: Dictionary representation
"""Usage examples:
from atproto import IdResolver, AtprotoData, Multikey
resolver = IdResolver()
# Extract AT Protocol data
handle = "alice.bsky.social"
atproto_data = resolver.resolve_atproto_data(handle)
print(f"DID: {atproto_data.did}")
print(f"Handle: {atproto_data.handle}")
print(f"PDS: {atproto_data.pds}")
print(f"PDS Active: {atproto_data.pds_active}")
# Check if identity has valid PDS
if atproto_data.has_valid_pds():
print("✓ Identity has valid PDS")
# Get signing key
if atproto_data.signing_key:
signing_key = atproto_data.get_signing_key_multikey()
if signing_key:
print(f"Signing algorithm: {signing_key.jwt_alg}")
# Convert to dictionary for storage/serialization
data_dict = atproto_data.to_dict()
print(f"Serialized data: {data_dict}")
# Create from DID document directly
did_doc = resolver.resolve_did(atproto_data.did)
extracted_data = AtprotoData.from_did_doc(did_doc)
assert extracted_data.did == atproto_data.didUtility functions for identity resolution and validation.
def is_valid_did(did: str) -> bool:
"""
Validate DID format.
Args:
did (str): DID to validate
Returns:
bool: True if valid DID format
"""
def is_valid_handle(handle: str) -> bool:
"""
Validate handle format.
Args:
handle (str): Handle to validate
Returns:
bool: True if valid handle format
"""
def normalize_identifier(identifier: str) -> str:
"""
Normalize DID or handle identifier.
Args:
identifier (str): Identifier to normalize
Returns:
str: Normalized identifier
"""
def extract_handle_from_did_doc(did_doc: DidDocument) -> Optional[str]:
"""
Extract handle from DID document alsoKnownAs field.
Args:
did_doc (DidDocument): DID document
Returns:
Optional[str]: Extracted handle or None
"""Usage examples:
from atproto import (
is_valid_did, is_valid_handle, normalize_identifier,
extract_handle_from_did_doc, IdResolver
)
# Validate identifiers
identifiers = [
"did:plc:alice123456789",
"alice.bsky.social",
"invalid-identifier",
"@alice.bsky.social" # Should be normalized
]
for identifier in identifiers:
print(f"'{identifier}':")
print(f" Valid DID: {is_valid_did(identifier)}")
print(f" Valid handle: {is_valid_handle(identifier)}")
print(f" Normalized: {normalize_identifier(identifier)}")
print()
# Extract handle from DID document
resolver = IdResolver()
did_doc = resolver.resolve_did("did:plc:alice123")
handle = extract_handle_from_did_doc(did_doc)
if handle:
print(f"Handle from DID doc: {handle}")class ResolutionError(Exception):
"""Base exception for identity resolution errors."""
class DidNotFoundError(ResolutionError):
"""Raised when DID cannot be found."""
class HandleNotFoundError(ResolutionError):
"""Raised when handle cannot be resolved."""
class InvalidIdentifierError(ResolutionError):
"""Raised when identifier format is invalid."""
class ExtractionError(ResolutionError):
"""Raised when AT Protocol data cannot be extracted."""
class CacheError(Exception):
"""Base exception for cache operations."""Robust resolution with error handling:
from atproto import (
IdResolver, ResolutionError, DidNotFoundError,
HandleNotFoundError, InvalidIdentifierError
)
def safe_resolve_identity(resolver, identifier):
"""Safely resolve identity with comprehensive error handling."""
try:
# Validate identifier format first
if not (is_valid_did(identifier) or is_valid_handle(identifier)):
raise InvalidIdentifierError(f"Invalid identifier format: {identifier}")
# Normalize identifier
normalized = normalize_identifier(identifier)
# Resolve to AT Protocol data
atproto_data = resolver.resolve_atproto_data(normalized)
return {
'success': True,
'data': atproto_data,
'original': identifier,
'normalized': normalized
}
except DidNotFoundError:
return {'success': False, 'error': 'DID not found', 'identifier': identifier}
except HandleNotFoundError:
return {'success': False, 'error': 'Handle not found', 'identifier': identifier}
except InvalidIdentifierError as e:
return {'success': False, 'error': str(e), 'identifier': identifier}
except ResolutionError as e:
return {'success': False, 'error': f'Resolution failed: {e}', 'identifier': identifier}
except Exception as e:
return {'success': False, 'error': f'Unexpected error: {e}', 'identifier': identifier}
# Usage
resolver = IdResolver()
identifiers = ["alice.bsky.social", "invalid-id", "did:plc:alice123"]
for identifier in identifiers:
result = safe_resolve_identity(resolver, identifier)
if result['success']:
print(f"✓ {identifier} -> {result['data'].did}")
else:
print(f"✗ {identifier}: {result['error']}")Install with Tessl CLI
npx tessl i tessl/pypi-atproto