or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.md · error-handling.md · index.md · jetstream-management.md · jetstream.md · key-value-store.md · message-handling.md · microservices.md · object-store.md

jetstream-management.md (docs/)

0

# JetStream Management

1

2

Administrative APIs for managing JetStream streams, consumers, and accounts. Provides comprehensive configuration, monitoring, and maintenance capabilities for JetStream infrastructure.

3

4

## Capabilities

5

6

### Stream Management

7

8

Create, configure, and manage JetStream streams for persistent message storage.

9

10

```python { .api }

11

class JetStreamManager:

12

async def add_stream(

13

self,

14

config: StreamConfig = None,

15

**params

16

) -> StreamInfo:

17

"""

18

Create new JetStream stream.

19

20

Parameters:

21

- config: Complete stream configuration

22

- **params: Individual configuration parameters

23

24

Returns:

25

Stream information including configuration and state

26

"""

27

28

async def update_stream(

29

self,

30

config: StreamConfig = None,

31

**params

32

) -> StreamInfo:

33

"""

34

Update existing stream configuration.

35

36

Parameters:

37

- config: Updated stream configuration

38

- **params: Individual configuration parameters

39

40

Returns:

41

Updated stream information

42

"""

43

44

async def delete_stream(self, name: str) -> bool:

45

"""

46

Delete stream and all its messages.

47

48

Parameters:

49

- name: Stream name to delete

50

51

Returns:

52

True if stream was deleted

53

"""

54

55

async def stream_info(

56

self,

57

name: str,

58

subjects_filter: str = None

59

) -> StreamInfo:

60

"""

61

Get stream information and statistics.

62

63

Parameters:

64

- name: Stream name

65

- subjects_filter: Filter subjects in response

66

67

Returns:

68

Stream information including state and configuration

69

"""

70

71

async def find_stream_name_by_subject(self, subject: str) -> str:

72

"""

73

Find stream name that matches subject.

74

75

Parameters:

76

- subject: Subject to find stream for

77

78

Returns:

79

Stream name or raises NotFoundError

80

"""

81

```

82

83

#### Usage Examples

84

85

```python

86

import asyncio

87

import nats

88

from datetime import timedelta
from nats.js.api import StreamConfig

89

90

async def main():

91

nc = await nats.connect()

92

jsm = nc.jsm()

93

94

# Create stream with configuration object

95

stream_config = StreamConfig(

96

name="events",

97

subjects=["events.*", "alerts.>"],

98

storage="file",

99

retention="limits",

100

max_msgs=1000000,

101

max_bytes=1024*1024*1024, # 1GB

102

max_age=timedelta(days=30),

103

max_consumers=10,

104

duplicate_window=timedelta(minutes=2)

105

)

106

107

stream_info = await jsm.add_stream(config=stream_config)

108

print(f"Created stream: {stream_info.config.name}")

109

110

# Create stream with parameters

111

await jsm.add_stream(

112

name="metrics",

113

subjects=["metrics.cpu.*", "metrics.memory.*"],

114

storage="memory",

115

max_msgs=100000,

116

max_age=timedelta(hours=24)

117

)

118

119

# Update stream configuration

120

await jsm.update_stream(

121

name="events",

122

max_msgs=2000000,

123

description="Updated event stream"

124

)

125

126

# Get stream information

127

info = await jsm.stream_info("events")

128

print(f"Stream has {info.state.messages} messages")

129

130

# Find stream by subject

131

stream_name = await jsm.find_stream_name_by_subject("events.user.login")

132

print(f"Subject belongs to stream: {stream_name}")

133

```

134

135

### Stream Listing and Iteration

136

137

List and iterate through all streams in the account.

138

139

```python { .api }

140

class JetStreamManager:

141

async def streams_info(self, offset: int = 0) -> List[StreamInfo]:

142

"""

143

Get information for all streams.

144

145

Parameters:

146

- offset: Starting offset for pagination

147

148

Returns:

149

List of stream information objects

150

"""

151

152

async def streams_info_iterator(self, **kwargs) -> AsyncIterator[StreamInfo]:

153

"""

154

Iterate through all streams with pagination.

155

156

Returns:

157

Async iterator yielding stream information

158

"""

159

```

160

161

#### Usage Examples

162

163

```python

164

# List all streams

165

streams = await jsm.streams_info()

166

for stream in streams:

167

print(f"Stream: {stream.config.name}, Messages: {stream.state.messages}")

168

169

# Iterate through streams

170

async for stream_info in jsm.streams_info_iterator():

171

print(f"Processing stream: {stream_info.config.name}")

172

if stream_info.state.bytes > 1024*1024*100: # 100MB

173

print(f"Large stream: {stream_info.config.name}")

174

```

175

176

### Consumer Management

177

178

Create and manage consumers for stream consumption patterns.

179

180

```python { .api }

181

class JetStreamManager:

182

async def add_consumer(

183

self,

184

stream: str,

185

config: ConsumerConfig = None,

186

**params

187

) -> ConsumerInfo:

188

"""

189

Create consumer for stream.

190

191

Parameters:

192

- stream: Stream name

193

- config: Consumer configuration

194

- **params: Individual configuration parameters

195

196

Returns:

197

Consumer information including configuration and state

198

"""

199

200

async def delete_consumer(self, stream: str, consumer: str) -> bool:

201

"""

202

Delete consumer from stream.

203

204

Parameters:

205

- stream: Stream name

206

- consumer: Consumer name

207

208

Returns:

209

True if consumer was deleted

210

"""

211

212

async def consumer_info(self, stream: str, consumer: str) -> ConsumerInfo:

213

"""

214

Get consumer information and statistics.

215

216

Parameters:

217

- stream: Stream name

218

- consumer: Consumer name

219

220

Returns:

221

Consumer information including state and configuration

222

"""

223

224

async def consumers_info(self, stream: str, offset: int = 0) -> List[ConsumerInfo]:

225

"""

226

Get information for all consumers in stream.

227

228

Parameters:

229

- stream: Stream name

230

- offset: Starting offset for pagination

231

232

Returns:

233

List of consumer information objects

234

"""

235

```

236

237

#### Usage Examples

238

239

```python

240

from nats.js.api import ConsumerConfig

241

from datetime import timedelta

242

243

# Create durable consumer

244

consumer_config = ConsumerConfig(

245

durable_name="event-processor",

246

deliver_policy="all",

247

ack_policy="explicit",

248

ack_wait=timedelta(seconds=30),

249

max_deliver=3,

250

filter_subject="events.user.*"

251

)

252

253

consumer_info = await jsm.add_consumer("events", config=consumer_config)

254

print(f"Created consumer: {consumer_info.name}")

255

256

# Create ephemeral consumer with parameters

257

await jsm.add_consumer(

258

stream="metrics",

259

deliver_policy="new",

260

ack_policy="explicit",

261

max_ack_pending=100

262

)

263

264

# Get consumer information

265

info = await jsm.consumer_info("events", "event-processor")

266

print(f"Consumer delivered {info.delivered.stream_seq} messages")

267

268

# List all consumers for stream

269

consumers = await jsm.consumers_info("events")

270

for consumer in consumers:

271

print(f"Consumer: {consumer.name}, Pending: {consumer.num_pending}")

272

```

273

274

### Message Management

275

276

Direct message operations on streams.

277

278

```python { .api }

279

class JetStreamManager:

280

async def get_msg(

281

self,

282

stream_name: str,

283

seq: int,

284

**kwargs

285

) -> RawStreamMsg:

286

"""

287

Get message by sequence number.

288

289

Parameters:

290

- stream_name: Stream name

291

- seq: Message sequence number

292

293

Returns:

294

Raw stream message with metadata

295

"""

296

297

async def delete_msg(self, stream_name: str, seq: int) -> bool:

298

"""

299

Delete message by sequence number.

300

301

Parameters:

302

- stream_name: Stream name

303

- seq: Message sequence number

304

305

Returns:

306

True if message was deleted

307

"""

308

309

async def get_last_msg(self, stream_name: str, subject: str) -> RawStreamMsg:

310

"""

311

Get last message for subject.

312

313

Parameters:

314

- stream_name: Stream name

315

- subject: Subject filter

316

317

Returns:

318

Last message matching subject

319

"""

320

321

async def purge_stream(self, name: str, **opts) -> bool:

322

"""

323

Purge messages from stream.

324

325

Parameters:

326

- name: Stream name

327

- subject: Purge messages matching subject filter

328

- seq: Purge up to sequence number

329

- keep: Keep latest N messages

330

331

Returns:

332

True if stream was purged

333

"""

334

```

335

336

#### Usage Examples

337

338

```python

339

# Get specific message

340

msg = await jsm.get_msg("events", seq=12345)

341

print(f"Message data: {msg.data.decode()}")

342

print(f"Subject: {msg.subject}")

343

344

# Get last message for subject

345

last_msg = await jsm.get_last_msg("events", "events.user.login")

346

print(f"Last login: {last_msg.data.decode()}")

347

348

# Delete specific message

349

await jsm.delete_msg("events", seq=12345)

350

351

# Purge old messages, keep latest 1000

352

await jsm.purge_stream("events", keep=1000)

353

354

# Purge messages by subject

355

await jsm.purge_stream("events", subject="events.test.*")

356

```

357

358

### Account Information

359

360

Get JetStream account limits and usage information.

361

362

```python { .api }

363

class JetStreamManager:

364

async def account_info(self) -> AccountInfo:

365

"""

366

Get JetStream account information.

367

368

Returns:

369

Account information including limits and usage statistics

370

"""

371

```

372

373

#### Usage Examples

374

375

```python

376

# Get account information

377

account = await jsm.account_info()

378

print(f"Memory usage: {account.memory} / {account.limits.max_memory}")

379

print(f"Storage usage: {account.storage} / {account.limits.max_storage}")

380

print(f"Streams: {account.streams} / {account.limits.max_streams}")

381

print(f"Consumers: {account.consumers} / {account.limits.max_consumers}")

382

383

# Check if approaching limits

384

if account.memory > account.limits.max_memory * 0.8:

385

print("Warning: Approaching memory limit")

386

```

387

388

## Configuration Types

389

390

```python { .api }

391

from dataclasses import dataclass

392

from typing import Optional, List, Dict

393

from datetime import datetime, timedelta

394

395

@dataclass

396

class StreamConfig:

397

name: str

398

subjects: List[str] = None

399

retention: str = "limits" # "limits", "interest", "workqueue"

400

max_consumers: int = -1

401

max_msgs: int = -1

402

max_bytes: int = -1

403

max_age: timedelta = None

404

max_msgs_per_subject: int = -1

405

max_msg_size: int = -1

406

storage: str = "file" # "file", "memory"

407

num_replicas: int = 1

408

no_ack: bool = False

409

template_owner: str = None

410

discard: str = "old" # "old", "new"

411

duplicate_window: timedelta = None

412

placement: Placement = None

413

mirror: StreamSource = None

414

sources: List[StreamSource] = None

415

sealed: bool = False

416

deny_delete: bool = False

417

deny_purge: bool = False

418

allow_rollup_hdrs: bool = False

419

allow_direct: bool = False

420

mirror_direct: bool = False

421

republish: RePublish = None

422

description: str = None

423

metadata: Dict[str, str] = None

424

425

@dataclass

426

class ConsumerConfig:

427

durable_name: Optional[str] = None

428

name: Optional[str] = None

429

description: Optional[str] = None

430

deliver_policy: str = "all"

431

opt_start_seq: Optional[int] = None

432

opt_start_time: Optional[datetime] = None

433

ack_policy: str = "explicit"

434

ack_wait: Optional[timedelta] = None

435

max_deliver: Optional[int] = None

436

filter_subject: Optional[str] = None

437

replay_policy: str = "instant"

438

rate_limit_bps: Optional[int] = None

439

sample_freq: Optional[str] = None

440

max_waiting: Optional[int] = None

441

max_ack_pending: Optional[int] = None

442

flow_control: bool = False

443

idle_heartbeat: Optional[timedelta] = None

444

headers_only: bool = False

445

max_request_batch: Optional[int] = None

446

max_request_expires: Optional[timedelta] = None

447

inactive_threshold: Optional[timedelta] = None

448

num_replicas: int = 0

449

mem_storage: bool = False

450

metadata: Dict[str, str] = None

451

```

452

453

## Information Types

454

455

```python { .api }

456

@dataclass

457

class StreamInfo:

458

config: StreamConfig

459

state: StreamState

460

cluster: Optional[ClusterInfo] = None

461

mirror: Optional[StreamSourceInfo] = None

462

sources: Optional[List[StreamSourceInfo]] = None

463

alternates: Optional[List[StreamAlternate]] = None

464

465

@dataclass

466

class StreamState:

467

messages: int

468

bytes: int

469

first_seq: int

470

first_ts: datetime

471

last_seq: int

472

last_ts: datetime

473

consumers: int

474

deleted: Optional[List[int]] = None

475

lost: Optional[LostStreamData] = None

476

num_subjects: Optional[int] = None

477

478

@dataclass

479

class ConsumerInfo:

480

name: str

481

config: ConsumerConfig

482

delivered: SequenceInfo

483

ack_floor: SequenceInfo

484

num_ack_pending: int

485

num_redelivered: int

486

num_waiting: int

487

num_pending: int

488

cluster: Optional[ClusterInfo] = None

489

push_bound: bool = False

490

491

@dataclass

492

class AccountInfo:

493

memory: int

494

storage: int

495

streams: int

496

consumers: int

497

limits: AccountLimits

498

api: APIStats

499

domain: Optional[str] = None

500

501

@dataclass

502

class RawStreamMsg:

503

subject: str

504

seq: int

505

data: bytes

506

hdrs: Optional[bytes] = None

507

time: Optional[datetime] = None

508

stream: Optional[str] = None

509

```