or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.mderror-handling.mdindex.mdjetstream-management.mdjetstream.mdkey-value-store.mdmessage-handling.mdmicroservices.mdobject-store.md

jetstream.mddocs/

0

# JetStream

1

2

JetStream provides persistent messaging capabilities built on NATS streams. It offers message storage, replay, delivery guarantees, and advanced consumption patterns for building resilient distributed applications.

3

4

## Capabilities

5

6

### JetStream Context

7

8

Core JetStream functionality for publishing to streams and subscribing to consumers.

9

10

```python { .api }

11

class JetStreamContext:

12

async def publish(

13

self,

14

subject: str,

15

payload: bytes = b"",

16

timeout: Optional[float] = None,

17

stream: Optional[str] = None,

18

headers: Optional[Dict[str, Any]] = None

19

) -> PubAck:

20

"""

21

Publish message to JetStream stream.

22

23

Parameters:

24

- subject: Stream subject

25

- payload: Message data

26

- timeout: Publish timeout

27

- stream: Target stream name (optional)

28

- headers: Message headers

29

30

Returns:

31

Publish acknowledgment with sequence info

32

"""

33

34

async def publish_async(

35

self,

36

subject: str,

37

payload: bytes = b"",

38

wait_stall: Optional[float] = None,

39

stream: Optional[str] = None,

40

headers: Optional[Dict] = None

41

) -> asyncio.Future[PubAck]:

42

"""

43

Publish message asynchronously without blocking.

44

45

Returns:

46

Future that resolves to publish acknowledgment

47

"""

48

49

def publish_async_pending(self) -> int:

50

"""Get count of pending async publishes."""

51

52

async def publish_async_completed(self) -> None:

53

"""Wait for all pending async publishes to complete."""

54

```

55

56

#### Usage Examples

57

58

```python

59

import asyncio

60

import nats

61

62

async def main():

63

nc = await nats.connect()

64

js = nc.jetstream()

65

66

# Synchronous publish with acknowledgment

67

ack = await js.publish("events.user.login", b'{"user_id": 123, "timestamp": "2024-01-01T10:00:00Z"}')

68

print(f"Message stored at sequence {ack.seq}")

69

70

# Asynchronous publish for high throughput

71

future1 = await js.publish_async("metrics.cpu", b'{"usage": 75.5}')

72

future2 = await js.publish_async("metrics.memory", b'{"usage": 82.1}')

73

74

# Wait for specific acknowledgments

75

ack1 = await future1

76

ack2 = await future2

77

78

# Wait for all pending publishes

79

await js.publish_async_completed()

80

```

81

82

### Push Subscriptions

83

84

Subscribe to JetStream messages with automatic delivery to callback handlers.

85

86

```python { .api }

87

class JetStreamContext:

88

async def subscribe(

89

self,

90

subject: str,

91

queue: str = "",

92

cb: Callable[[Msg], None] = None,

93

durable: str = None,

94

stream: str = None,

95

config: ConsumerConfig = None,

96

manual_ack: bool = False,

97

ordered_consumer: bool = False,

98

idle_heartbeat: float = None,

99

flow_control: bool = False,

100

**kwargs

101

) -> JetStreamSubscription:

102

"""

103

Subscribe to JetStream stream with push delivery.

104

105

Parameters:

106

- subject: Subject pattern to subscribe to

107

- queue: Queue group for load balancing

108

- cb: Message callback handler

109

- durable: Durable consumer name

110

- stream: Source stream name

111

- config: Consumer configuration

112

- manual_ack: Require manual message acknowledgment

113

- ordered_consumer: Enable ordered message delivery

114

- idle_heartbeat: Heartbeat interval for flow control

115

- flow_control: Enable flow control

116

117

Returns:

118

JetStream subscription

119

"""

120

121

async def subscribe_bind(

122

self,

123

stream: str,

124

consumer: str,

125

**kwargs

126

) -> JetStreamSubscription:

127

"""

128

Bind to existing durable consumer.

129

130

Parameters:

131

- stream: Stream name

132

- consumer: Consumer name

133

134

Returns:

135

Bound JetStream subscription

136

"""

137

```

138

139

#### Usage Examples

140

141

```python

142

# Simple JetStream subscription

143

async def handle_event(msg):

144

data = msg.data.decode()

145

print(f"Processing: {data}")

146

await msg.ack() # Acknowledge message

147

148

js_sub = await js.subscribe("events.>", cb=handle_event)

149

150

# Durable consumer subscription

151

await js.subscribe(

152

"orders.created",

153

durable="order-processor",

154

manual_ack=True,

155

cb=process_order

156

)

157

158

# Ordered consumer for sequential processing

159

await js.subscribe(

160

"audit.logs",

161

ordered_consumer=True,

162

cb=process_audit_log

163

)

164

165

# Queue group for load balancing

166

await js.subscribe(

167

"work.tasks",

168

queue="workers",

169

durable="task-worker",

170

cb=process_task

171

)

172

```

173

174

### Pull Subscriptions

175

176

Subscribe with manual message fetching for controlled consumption patterns.

177

178

```python { .api }

179

class JetStreamContext:

180

async def pull_subscribe(

181

self,

182

subject: str,

183

durable: str = None,

184

stream: str = None,

185

config: ConsumerConfig = None,

186

**kwargs

187

) -> PullSubscription:

188

"""

189

Create pull-based subscription for manual message fetching.

190

191

Parameters:

192

- subject: Subject pattern to subscribe to

193

- durable: Durable consumer name

194

- stream: Source stream name

195

- config: Consumer configuration

196

197

Returns:

198

Pull subscription for manual fetching

199

"""

200

201

async def pull_subscribe_bind(

202

self,

203

stream: str,

204

consumer: str,

205

**kwargs

206

) -> PullSubscription:

207

"""

208

Bind pull subscription to existing consumer.

209

210

Parameters:

211

- stream: Stream name

212

- consumer: Consumer name

213

214

Returns:

215

Bound pull subscription

216

"""

217

```

218

219

#### Usage Examples

220

221

```python

222

# Pull subscription with manual fetching

223

pull_sub = await js.pull_subscribe("batch.jobs", durable="job-processor")

224

225

# Fetch specific number of messages

226

msgs = await pull_sub.fetch(batch_size=10, timeout=5.0)

227

for msg in msgs:

228

await process_job(msg.data)

229

await msg.ack()

230

231

# Fetch with wait

232

msgs = await pull_sub.fetch(batch_size=5, timeout=30.0)

233

if msgs:

234

await process_batch(msgs)

235

236

# Continuous fetching loop

237

async for msg in pull_sub.messages():

238

try:

239

await process_message(msg.data)

240

await msg.ack()

241

except Exception as e:

242

print(f"Processing failed: {e}")

243

await msg.nak() # Negative acknowledgment for redelivery

244

```

245

246

### Message Acknowledgment

247

248

Handle JetStream message acknowledgments with various strategies.

249

250

```python { .api }

251

class Msg:

252

async def ack(self) -> None:

253

"""Acknowledge message successfully processed."""

254

255

async def ack_sync(self, timeout: float = 1.0) -> None:

256

"""Synchronously acknowledge message with timeout."""

257

258

async def nak(self, delay: float = None) -> None:

259

"""

260

Negative acknowledgment - message will be redelivered.

261

262

Parameters:

263

- delay: Delay before redelivery in seconds

264

"""

265

266

async def in_progress(self) -> None:

267

"""Extend acknowledgment deadline for longer processing."""

268

269

async def term(self) -> None:

270

"""Terminate message - no further redelivery."""

271

```

272

273

#### Usage Examples

274

275

```python

276

async def message_handler(msg):

277

try:

278

# Long-running processing

279

await msg.in_progress() # Extend ack deadline

280

281

result = await complex_processing(msg.data)

282

283

if result.success:

284

await msg.ack() # Success

285

else:

286

await msg.nak(delay=30.0) # Retry after 30 seconds

287

288

except FatalError:

289

await msg.term() # Don't retry

290

except Exception:

291

await msg.nak() # Retry immediately

292

```

293

294

### Utility Functions

295

296

Helper functions for JetStream message handling.

297

298

```python { .api }

299

class JetStreamContext:

300

def is_status_msg(self, msg: Msg) -> bool:

301

"""

302

Check if message is a JetStream status message.

303

304

Parameters:

305

- msg: Message to check

306

307

Returns:

308

True if message is status message

309

"""

310

```

311

312

## Consumer Configuration

313

314

```python { .api }

315

from dataclasses import dataclass

316

from typing import Optional, List

317

from datetime import datetime, timedelta

318

319

@dataclass

320

class ConsumerConfig:

321

durable_name: Optional[str] = None

322

name: Optional[str] = None

323

description: Optional[str] = None

324

deliver_policy: str = "all" # "all", "last", "new", "by_start_sequence", "by_start_time"

325

opt_start_seq: Optional[int] = None

326

opt_start_time: Optional[datetime] = None

327

ack_policy: str = "explicit" # "none", "all", "explicit"

328

ack_wait: Optional[timedelta] = None

329

max_deliver: Optional[int] = None

330

filter_subject: Optional[str] = None

331

replay_policy: str = "instant" # "instant", "original"

332

rate_limit_bps: Optional[int] = None

333

sample_freq: Optional[str] = None

334

max_waiting: Optional[int] = None

335

max_ack_pending: Optional[int] = None

336

flow_control: bool = False

337

idle_heartbeat: Optional[timedelta] = None

338

headers_only: bool = False

339

max_request_batch: Optional[int] = None

340

max_request_expires: Optional[timedelta] = None

341

inactive_threshold: Optional[timedelta] = None

342

```

343

344

## JetStream Message Types

345

346

```python { .api }

347

@dataclass

348

class PubAck:

349

"""JetStream publish acknowledgment."""

350

stream: str

351

seq: int

352

duplicate: bool = False

353

domain: Optional[str] = None

354

355

class JetStreamSubscription:

356

"""JetStream push subscription."""

357

async def next_msg(self, timeout: float = 1.0) -> Msg:

358

"""Get next message with timeout."""

359

360

async def drain(self) -> None:

361

"""Drain subscription."""

362

363

def messages(self) -> AsyncIterator[Msg]:

364

"""Async iterator for messages."""

365

366

class PullSubscription:

367

"""JetStream pull subscription."""

368

async def fetch(

369

self,

370

batch_size: int = 1,

371

timeout: float = 5.0

372

) -> List[Msg]:

373

"""Fetch batch of messages."""

374

375

def messages(self) -> AsyncIterator[Msg]:

376

"""Async iterator for messages."""

377

```

378

379

## Constants

380

381

```python { .api }

382

# JetStream API constants

383

DEFAULT_JS_SUB_PENDING_MSGS_LIMIT = 512 * 1024

384

DEFAULT_JS_SUB_PENDING_BYTES_LIMIT = 256 * 1024 * 1024

385

386

# Delivery policies

387

DELIVER_ALL = "all"

388

DELIVER_LAST = "last"

389

DELIVER_NEW = "new"

390

DELIVER_BY_START_SEQUENCE = "by_start_sequence"

391

DELIVER_BY_START_TIME = "by_start_time"

392

393

# Acknowledgment policies

394

ACK_NONE = "none"

395

ACK_ALL = "all"

396

ACK_EXPLICIT = "explicit"

397

398

# Replay policies

399

REPLAY_INSTANT = "instant"

400

REPLAY_ORIGINAL = "original"

401

```