or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-operations.mdcore-functionality.mdcryptographic-operations.mdidentity-resolution.mdindex.mdjwt-operations.mdreal-time-streaming.md

real-time-streaming.mddocs/

0

# Real-time Streaming

1

2

Firehose clients for consuming live AT Protocol data streams including repository updates and labeling events. The streaming functionality provides both synchronous and asynchronous interfaces for processing real-time data from the AT Protocol network.

3

4

## Capabilities

5

6

### Repository Streaming

7

8

Stream real-time repository updates including posts, follows, likes, and other record operations from the AT Protocol network.

9

10

#### Synchronous Repository Client

11

12

```python { .api }

13

class FirehoseSubscribeReposClient:

14

"""

15

Synchronous client for repository event streaming.

16

17

Provides real-time access to repository operations across the AT Protocol network.

18

"""

19

def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):

20

"""

21

Initialize the repository streaming client.

22

23

Args:

24

base_url (str): WebSocket endpoint for firehose (default: Bluesky firehose)

25

*args, **kwargs: Additional client configuration

26

"""

27

28

def start(self, cursor: Optional[int] = None) -> Iterator['SubscribeReposMessage']:

29

"""

30

Start streaming repository events.

31

32

Args:

33

cursor (int, optional): Start from specific sequence number

34

35

Yields:

36

SubscribeReposMessage: Repository event messages

37

38

Raises:

39

NetworkError: If connection fails

40

StreamError: If stream processing fails

41

"""

42

43

def stop(self):

44

"""Stop the streaming connection."""

45

46

def get_cursor(self) -> Optional[int]:

47

"""

48

Get current stream cursor position.

49

50

Returns:

51

Optional[int]: Current cursor or None if not started

52

"""

53

```

54

55

#### Asynchronous Repository Client

56

57

```python { .api }

58

class AsyncFirehoseSubscribeReposClient:

59

"""

60

Asynchronous client for repository event streaming.

61

62

Async version of repository firehose client for non-blocking operations.

63

"""

64

def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):

65

"""

66

Initialize the async repository streaming client.

67

68

Args:

69

base_url (str): WebSocket endpoint for firehose

70

*args, **kwargs: Additional client configuration

71

"""

72

73

async def start(self, cursor: Optional[int] = None) -> AsyncIterator['SubscribeReposMessage']:

74

"""

75

Start streaming repository events asynchronously.

76

77

Args:

78

cursor (int, optional): Start from specific sequence number

79

80

Yields:

81

SubscribeReposMessage: Repository event messages

82

"""

83

84

async def stop(self):

85

"""Stop the streaming connection asynchronously."""

86

87

async def close(self):

88

"""Close the async client connection."""

89

```

90

91

Usage examples:

92

93

```python

94

from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message

95

96

# Synchronous streaming

97

client = FirehoseSubscribeReposClient()

98

99

try:

100

for message in client.start():

101

# Parse the message

102

parsed = parse_subscribe_repos_message(message)

103

104

# Handle different message types

105

if parsed.commit:

106

print(f"Repository update: {parsed.commit.repo}")

107

for op in parsed.commit.ops:

108

print(f" Operation: {op.action} {op.path}")

109

elif parsed.handle:

110

print(f"Handle update: {parsed.handle.did} -> {parsed.handle.handle}")

111

elif parsed.info:

112

print(f"Stream info: {parsed.info.name}")

113

elif parsed.error:

114

print(f"Stream error: {parsed.error.error} - {parsed.error.message}")

115

116

except KeyboardInterrupt:

117

print("Stopping stream...")

118

finally:

119

client.stop()

120

```

121

122

```python

123

import asyncio

124

from atproto import AsyncFirehoseSubscribeReposClient, parse_subscribe_repos_message

125

126

async def stream_repos():

127

client = AsyncFirehoseSubscribeReposClient()

128

129

try:

130

async for message in client.start():

131

parsed = parse_subscribe_repos_message(message)

132

133

if parsed.commit and parsed.commit.ops:

134

for op in parsed.commit.ops:

135

if op.action == 'create' and 'app.bsky.feed.post' in op.path:

136

print(f"New post created: {op.path}")

137

138

except Exception as e:

139

print(f"Stream error: {e}")

140

finally:

141

await client.close()

142

143

# Run the async stream

144

asyncio.run(stream_repos())

145

```

146

147

### Label Streaming

148

149

Stream real-time labeling events for content moderation and filtering across the AT Protocol network.

150

151

#### Synchronous Labels Client

152

153

```python { .api }

154

class FirehoseSubscribeLabelsClient:

155

"""

156

Synchronous client for label event streaming.

157

158

Provides real-time access to labeling events for content moderation.

159

"""

160

def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):

161

"""

162

Initialize the labels streaming client.

163

164

Args:

165

base_url (str): WebSocket endpoint for label firehose

166

*args, **kwargs: Additional client configuration

167

"""

168

169

def start(self, cursor: Optional[int] = None) -> Iterator['SubscribeLabelsMessage']:

170

"""

171

Start streaming label events.

172

173

Args:

174

cursor (int, optional): Start from specific sequence number

175

176

Yields:

177

SubscribeLabelsMessage: Label event messages

178

"""

179

180

def stop(self):

181

"""Stop the streaming connection."""

182

```

183

184

#### Asynchronous Labels Client

185

186

```python { .api }

187

class AsyncFirehoseSubscribeLabelsClient:

188

"""

189

Asynchronous client for label event streaming.

190

191

Async version of labels firehose client.

192

"""

193

def __init__(self, base_url: str = 'wss://bsky.network', *args, **kwargs):

194

"""

195

Initialize the async labels streaming client.

196

197

Args:

198

base_url (str): WebSocket endpoint for label firehose

199

*args, **kwargs: Additional client configuration

200

"""

201

202

async def start(self, cursor: Optional[int] = None) -> AsyncIterator['SubscribeLabelsMessage']:

203

"""

204

Start streaming label events asynchronously.

205

206

Args:

207

cursor (int, optional): Start from specific sequence number

208

209

Yields:

210

SubscribeLabelsMessage: Label event messages

211

"""

212

213

async def stop(self):

214

"""Stop the streaming connection asynchronously."""

215

216

async def close(self):

217

"""Close the async client connection."""

218

```

219

220

Usage example:

221

222

```python

223

from atproto import FirehoseSubscribeLabelsClient, parse_subscribe_labels_message

224

225

# Stream label events

226

client = FirehoseSubscribeLabelsClient()

227

228

try:

229

for message in client.start():

230

parsed = parse_subscribe_labels_message(message)

231

232

if parsed.labels:

233

for label in parsed.labels:

234

print(f"Label applied: {label.val} to {label.uri}")

235

if label.neg:

236

print(" (Negative label - removes previous label)")

237

if label.exp:

238

print(f" Expires: {label.exp}")

239

240

except KeyboardInterrupt:

241

print("Stopping label stream...")

242

finally:

243

client.stop()

244

```

245

246

### Message Parsing

247

248

Parse and process firehose messages for both repository and label streams.

249

250

#### Repository Message Parsing

251

252

```python { .api }

253

def parse_subscribe_repos_message(message: 'MessageFrame') -> 'SubscribeReposMessage':

254

"""

255

Parse repository subscription message.

256

257

Args:

258

message (MessageFrame): Raw message frame from firehose

259

260

Returns:

261

SubscribeReposMessage: Parsed message with typed content

262

263

Raises:

264

MessageParsingError: If message format is invalid

265

"""

266

267

# Repository message types

268

class SubscribeReposMessage:

269

"""Union type for repository stream messages."""

270

commit: Optional['RepoCommit'] # Repository commit with operations

271

handle: Optional['HandleUpdate'] # Handle change notification

272

migrate: Optional['RepoMigrate'] # Repository migration

273

tombstone: Optional['RepoTombstone'] # Repository deletion

274

info: Optional['InfoMessage'] # Stream information

275

error: Optional['ErrorMessage'] # Stream error

276

277

class RepoCommit:

278

"""Repository commit with operations."""

279

seq: int # Sequence number

280

rebase: bool # Whether this is a rebase

281

too_big: bool # Whether commit was too large

282

repo: str # Repository DID

283

commit: CID # Commit CID

284

prev: Optional[CID] # Previous commit CID

285

rev: str # Repository revision

286

since: Optional[str] # Since parameter

287

blocks: bytes # CAR blocks

288

ops: List['RepoOperation'] # Repository operations

289

blobs: List[CID] # Referenced blobs

290

time: str # Timestamp

291

292

class RepoOperation:

293

"""Individual repository operation."""

294

action: str # 'create', 'update', 'delete'

295

path: str # Record path

296

cid: Optional[CID] # Record CID (create/update)

297

```

298

299

#### Label Message Parsing

300

301

```python { .api }

302

def parse_subscribe_labels_message(message: 'MessageFrame') -> 'SubscribeLabelsMessage':

303

"""

304

Parse label subscription message.

305

306

Args:

307

message (MessageFrame): Raw message frame from firehose

308

309

Returns:

310

SubscribeLabelsMessage: Parsed message with typed content

311

312

Raises:

313

MessageParsingError: If message format is invalid

314

"""

315

316

# Label message types

317

class SubscribeLabelsMessage:

318

"""Union type for label stream messages."""

319

labels: Optional['Labels'] # Label operations

320

info: Optional['InfoMessage'] # Stream information

321

error: Optional['ErrorMessage'] # Stream error

322

323

class Labels:

324

"""Label operations message."""

325

seq: int # Sequence number

326

labels: List['Label'] # Label operations

327

328

class Label:

329

"""Individual label operation."""

330

src: str # Label source DID

331

uri: str # Labeled content URI

332

cid: Optional[str] # Content CID

333

val: str # Label value

334

neg: Optional[bool] # Negative label (removal)

335

cts: str # Creation timestamp

336

exp: Optional[str] # Expiration timestamp

337

sig: Optional[bytes] # Label signature

338

```

339

340

Advanced parsing example:

341

342

```python

343

from atproto import (

344

FirehoseSubscribeReposClient,

345

parse_subscribe_repos_message,

346

CAR

347

)

348

349

client = FirehoseSubscribeReposClient()

350

351

def process_repository_commit(commit):

352

"""Process a repository commit with detailed operation handling."""

353

print(f"Processing commit {commit.seq} from {commit.repo}")

354

355

# Parse CAR blocks if needed

356

if commit.blocks:

357

try:

358

car = CAR.from_bytes(commit.blocks)

359

print(f" Commit contains {len(car.blocks)} blocks")

360

except Exception as e:

361

print(f" Could not parse CAR blocks: {e}")

362

363

# Process each operation

364

for op in commit.ops:

365

if op.action == 'create':

366

if 'app.bsky.feed.post' in op.path:

367

print(f" πŸ“ New post: {op.path}")

368

elif 'app.bsky.feed.like' in op.path:

369

print(f" ❀️ New like: {op.path}")

370

elif 'app.bsky.graph.follow' in op.path:

371

print(f" πŸ‘₯ New follow: {op.path}")

372

373

elif op.action == 'delete':

374

print(f" πŸ—‘οΈ Deleted: {op.path}")

375

376

# Check for referenced blobs (images, videos, etc.)

377

if commit.blobs:

378

print(f" πŸ“Ž Contains {len(commit.blobs)} blobs")

379

380

# Stream and process commits

381

try:

382

for message in client.start():

383

parsed = parse_subscribe_repos_message(message)

384

385

if parsed.commit:

386

process_repository_commit(parsed.commit)

387

elif parsed.error:

388

print(f"❌ Stream error: {parsed.error.message}")

389

break

390

391

except KeyboardInterrupt:

392

print("Stream stopped by user")

393

finally:

394

client.stop()

395

```

396

397

### Firehose Models

398

399

Core data models for firehose streaming operations.

400

401

```python { .api }

402

# Message frame types

403

class FrameType(Enum):

404

"""Frame types for firehose messages."""

405

MESSAGE = 1 # Regular message

406

ERROR = -1 # Error message

407

408

class MessageFrameHeader:

409

"""Header structure for firehose messages."""

410

op: int # Operation code

411

t: Optional[str] # Message type

412

413

class MessageFrame:

414

"""Complete message frame from firehose."""

415

header: MessageFrameHeader # Frame header

416

body: bytes # Message body (CBOR encoded)

417

418

# Stream control messages

419

class InfoMessage:

420

"""Stream information message."""

421

name: str # Stream name

422

message: Optional[str] # Info message

423

424

class ErrorMessage:

425

"""Stream error message."""

426

error: str # Error code

427

message: Optional[str] # Error description

428

```

429

430

### Error Handling

431

432

```python { .api }

433

class StreamError(Exception):

434

"""Base exception for streaming operations."""

435

436

class ConnectionError(StreamError):

437

"""Raised when connection to firehose fails."""

438

439

class MessageParsingError(StreamError):

440

"""Raised when message parsing fails."""

441

442

class StreamTimeoutError(StreamError):

443

"""Raised when stream times out."""

444

```

445

446

Robust streaming with error handling:

447

448

```python

449

from atproto import (

450

AsyncFirehoseSubscribeReposClient,

451

StreamError, ConnectionError, MessageParsingError

452

)

453

import asyncio

454

455

async def robust_streaming():

456

"""Example of robust streaming with reconnection logic."""

457

client = AsyncFirehoseSubscribeReposClient()

458

cursor = None

459

max_retries = 5

460

retry_count = 0

461

462

while retry_count < max_retries:

463

try:

464

print(f"Starting stream (attempt {retry_count + 1})")

465

466

async for message in client.start(cursor=cursor):

467

try:

468

parsed = parse_subscribe_repos_message(message)

469

470

if parsed.commit:

471

# Update cursor for reconnection

472

cursor = parsed.commit.seq

473

# Process commit...

474

475

elif parsed.error:

476

print(f"Stream error: {parsed.error.message}")

477

break

478

479

except MessageParsingError as e:

480

print(f"Failed to parse message: {e}")

481

continue # Skip malformed messages

482

483

except ConnectionError as e:

484

print(f"Connection failed: {e}")

485

retry_count += 1

486

if retry_count < max_retries:

487

wait_time = min(2 ** retry_count, 60) # Exponential backoff

488

print(f"Retrying in {wait_time} seconds...")

489

await asyncio.sleep(wait_time)

490

else:

491

print("Max retries exceeded")

492

break

493

494

except Exception as e:

495

print(f"Unexpected error: {e}")

496

break

497

498

await client.close()

499

500

# Run robust streaming

501

asyncio.run(robust_streaming())

502

```