or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-client.md, error-handling.md, index.md, jetstream-management.md, jetstream.md, key-value-store.md, message-handling.md, microservices.md, object-store.md

docs/core-client.md

0

# Core NATS Client

1

2

Essential connection management and messaging functionality for the NATS Python client. Provides connection lifecycle management, publish/subscribe messaging, request/reply patterns, and subscription handling.

3

4

## Capabilities

5

6

### Connection Management

7

8

Establish and manage connections to NATS servers with support for clustering, authentication, TLS, and automatic reconnection.

9

10

```python { .api }

11

async def connect(

12

servers: Union[str, List[str]] = ["nats://localhost:4222"],

13

error_cb: Optional[ErrorCallback] = None,

14

disconnected_cb: Optional[Callback] = None,

15

closed_cb: Optional[Callback] = None,

16

discovered_server_cb: Optional[Callback] = None,

17

reconnected_cb: Optional[Callback] = None,

18

name: Optional[str] = None,

19

pedantic: bool = False,

20

verbose: bool = False,

21

allow_reconnect: bool = True,

22

connect_timeout: int = 2,

23

reconnect_time_wait: int = 2,

24

max_reconnect_attempts: int = 60,

25

ping_interval: int = 120,

26

max_outstanding_pings: int = 2,

27

dont_randomize: bool = False,

28

flusher_queue_size: int = 1024,

29

no_echo: bool = False,

30

tls: Optional[ssl.SSLContext] = None,

31

tls_hostname: Optional[str] = None,

32

tls_handshake_first: bool = False,

33

user: Optional[str] = None,

34

password: Optional[str] = None,

35

token: Optional[str] = None,

36

drain_timeout: int = 30,

37

signature_cb: Optional[SignatureCallback] = None,

38

user_jwt_cb: Optional[JWTCallback] = None,

39

user_credentials: Optional[Credentials] = None,

40

nkeys_seed: Optional[str] = None,

41

nkeys_seed_str: Optional[str] = None,

42

inbox_prefix: Union[str, bytes] = b"_INBOX",

43

pending_size: int = 2 * 1024 * 1024,

44

flush_timeout: Optional[float] = None

45

) -> NATS:

46

"""

47

Connect to NATS server(s).

48

49

Parameters:

50

- servers: Server URLs to connect to

51

- error_cb: Callback for error events

52

- disconnected_cb: Callback for disconnection events

53

- closed_cb: Callback for connection closed events

54

- discovered_server_cb: Callback for server discovery events

55

- reconnected_cb: Callback for reconnection events

56

- name: Client name for identification

57

- pedantic: Enable pedantic protocol checking

58

- verbose: Enable verbose protocol logging

59

- allow_reconnect: Enable automatic reconnection

60

- connect_timeout: Connection timeout in seconds

61

- reconnect_time_wait: Wait time between reconnection attempts

62

- max_reconnect_attempts: Maximum reconnection attempts

63

- ping_interval: Ping interval in seconds

64

- max_outstanding_pings: Maximum outstanding pings

65

- dont_randomize: Don't randomize server connection order

66

- flusher_queue_size: Maximum flusher queue size

67

- no_echo: Disable message echo from server

68

- tls: SSL context for TLS connections

69

- tls_hostname: Hostname for TLS verification

70

- tls_handshake_first: Perform TLS handshake before INFO

71

- user: Username for authentication

72

- password: Password for authentication

73

- token: Token for authentication

74

- drain_timeout: Drain timeout in seconds

75

- signature_cb: Callback that signs the server-provided nonce during NKEYS/JWT authentication

76

- user_jwt_cb: Callback for JWT authentication

77

- user_credentials: Path to user credentials file

78

- nkeys_seed: NKEYS seed for authentication

79

- nkeys_seed_str: NKEYS seed string for authentication

80

- inbox_prefix: Prefix for inbox subjects

81

- pending_size: Maximum pending data size

82

- flush_timeout: Flush timeout in seconds

83

84

Returns:

85

Connected NATS client instance

86

"""

87

```

88

89

#### Usage Examples

90

91

```python

92

import asyncio

93

import nats

94

import ssl

95

96

# Basic connection

97

nc = await nats.connect()

98

99

# Multiple servers with clustering

100

nc = await nats.connect([

101

"nats://server1:4222",

102

"nats://server2:4222",

103

"nats://server3:4222"

104

])

105

106

# Authenticated connection

107

nc = await nats.connect(

108

servers=["nats://demo.nats.io:4222"],

109

user="myuser",

110

password="mypass"

111

)

112

113

# TLS connection

114

ssl_ctx = ssl.create_default_context()

115

nc = await nats.connect(

116

servers=["tls://demo.nats.io:4443"],

117

tls=ssl_ctx

118

)

119

120

# With credentials file

121

nc = await nats.connect(

122

servers=["nats://connect.ngs.global"],

123

user_credentials="/path/to/user.creds"

124

)

125

```

126

127

### Connection Lifecycle

128

129

Manage connection state and gracefully close connections.

130

131

```python { .api }

132

class NATS:

133

# Connection state constants

134

DISCONNECTED = 0

135

CONNECTED = 1

136

CLOSED = 2

137

RECONNECTING = 3

138

CONNECTING = 4

139

DRAINING_SUBS = 5

140

DRAINING_PUBS = 6

141

142

async def close(self) -> None:

143

"""Close the connection immediately."""

144

145

async def drain(self) -> None:

146

"""

147

Drain and close connection gracefully.

148

Stops accepting new messages and closes after processing pending.

149

"""

150

151

def is_connected(self) -> bool:

152

"""Check if client is connected to server."""

153

154

def is_closed(self) -> bool:

155

"""Check if client connection is closed."""

156

157

def is_reconnecting(self) -> bool:

158

"""Check if client is in reconnecting state."""

159

160

def is_connecting(self) -> bool:

161

"""Check if client is in connecting state."""

162

163

def is_draining(self) -> bool:

164

"""Check if client is draining subscriptions."""

165

166

def is_draining_pubs(self) -> bool:

167

"""Check if client is draining publications."""

168

```

169

170

### Publishing

171

172

Send messages to subjects with optional reply subjects and headers.

173

174

```python { .api }

175

class NATS:

176

async def publish(

177

self,

178

subject: str,

179

payload: bytes = b"",

180

reply: str = "",

181

headers: Optional[Dict[str, str]] = None

182

) -> None:

183

"""

184

Publish message to subject.

185

186

Parameters:

187

- subject: Target subject

188

- payload: Message data

189

- reply: Reply subject for responses

190

- headers: Message headers

191

"""

192

193

async def flush(self, timeout: float = 10.0) -> None:

194

"""

195

Flush pending messages to server.

196

197

Parameters:

198

- timeout: Flush timeout in seconds

199

"""

200

201

def pending_data_size(self) -> int:

202

"""Get size of pending outbound data in bytes."""

203

```

204

205

#### Usage Examples

206

207

```python

208

# Simple publish

209

await nc.publish("events.user.created", b'{"user_id": 123}')

210

211

# Publish with reply subject

212

await nc.publish("events.notify", b"Alert!", reply="responses.alerts")

213

214

# Publish with headers

215

headers = {"Content-Type": "application/json", "User-ID": "123"}

216

await nc.publish("api.requests", b'{"action": "create"}', headers=headers)

217

218

# Ensure delivery

219

await nc.publish("critical.event", b"Important data")

220

await nc.flush()  # Round trip to the server: returns once it has processed everything sent so far

221

```

222

223

### Subscribing

224

225

Subscribe to subjects with callback handlers or async iteration.

226

227

```python { .api }

228

class NATS:

229

async def subscribe(

230

self,

231

subject: str,

232

queue: str = "",

233

cb: Optional[Callable[[Msg], Awaitable[None]]] = None,

234

future: Optional[asyncio.Future] = None,

235

max_msgs: int = 0,

236

pending_msgs_limit: int = 65536,

237

pending_bytes_limit: int = 67108864

238

) -> Subscription:

239

"""

240

Subscribe to subject.

241

242

Parameters:

243

- subject: Subject pattern to subscribe to

244

- queue: Queue group for load balancing

245

- cb: Message callback handler

246

- future: Future to complete on first message

247

- max_msgs: Maximum messages (0 = unlimited)

248

- pending_msgs_limit: Maximum pending messages

249

- pending_bytes_limit: Maximum pending bytes

250

251

Returns:

252

Subscription object

253

"""

254

255

def new_inbox(self) -> str:

256

"""Generate unique inbox subject for replies."""

257

```

258

259

#### Usage Examples

260

261

```python

262

# Callback-based subscription

263

async def message_handler(msg):

264

print(f"Received on {msg.subject}: {msg.data.decode()}")

265

266

sub = await nc.subscribe("events.*", cb=message_handler)

267

268

# Queue group subscription for load balancing

269

await nc.subscribe("work.queue", queue="workers", cb=process_work)

270

271

# Async iteration subscription

272

sub = await nc.subscribe("notifications")

273

async for msg in sub.messages:

274

await handle_notification(msg)

275

if some_condition:

276

break

277

278

# One-time subscription

279

future = asyncio.Future()

280

await nc.subscribe("single.event", future=future, max_msgs=1)

281

msg = await future

282

```

283

284

### Request-Reply

285

286

Send requests and receive responses with timeout handling.

287

288

```python { .api }

289

class NATS:

290

async def request(

291

self,

292

subject: str,

293

payload: bytes = b"",

294

timeout: float = 0.5,

295

old_style: bool = False,

296

headers: Optional[Dict[str, Any]] = None

297

) -> Msg:

298

"""

299

Send request and wait for response.

300

301

Parameters:

302

- subject: Request subject

303

- payload: Request data

304

- timeout: Response timeout in seconds

305

- old_style: Use old-style request format

306

- headers: Request headers

307

308

Returns:

309

Response message

310

311

Raises:

312

- TimeoutError: No response within timeout

313

- NoRespondersError: No services listening

314

"""

315

```

316

317

#### Usage Examples

318

319

```python

320

# Simple request-reply

321

try:

322

response = await nc.request("api.users.get", b'{"id": 123}', timeout=2.0)

323

user_data = response.data.decode()

324

print(f"User: {user_data}")

325

except TimeoutError:

326

print("Request timed out")

327

except NoRespondersError:

328

print("No service available")

329

330

# Request with headers

331

headers = {"Authorization": "Bearer token123"}

332

response = await nc.request(

333

"secure.api.data",

334

b'{"query": "SELECT * FROM users"}',

335

headers=headers,

336

timeout=5.0

337

)

338

```

339

340

### Server Information

341

342

Access server and connection information.

343

344

```python { .api }

345

class NATS:

346

def connected_url(self) -> str:

347

"""Get currently connected server URL."""

348

349

def servers(self) -> List[str]:

350

"""Get list of configured servers."""

351

352

def discovered_servers(self) -> List[str]:

353

"""Get list of servers discovered from cluster."""

354

355

def max_payload(self) -> int:

356

"""Get maximum payload size supported by server."""

357

358

def client_id(self) -> int:

359

"""Get unique client ID assigned by server."""

360

361

def connected_server_version(self) -> str:

362

"""Get version of connected server."""

363

364

def last_error(self) -> Exception:

365

"""Get last error encountered."""

366

```

367

368

### JetStream Integration

369

370

Access JetStream functionality from the core client.

371

372

```python { .api }

373

class NATS:

374

def jetstream(self, **opts) -> JetStreamContext:

375

"""

376

Get JetStream context for stream operations.

377

378

Parameters:

379

- prefix: Custom JetStream API prefix

380

- domain: JetStream domain

381

- timeout: Default operation timeout

382

383

Returns:

384

JetStream context instance

385

"""

386

387

def jsm(self, **opts) -> JetStreamManager:

388

"""

389

Get JetStream manager for administrative operations.

390

391

Parameters:

392

- prefix: Custom JetStream API prefix

393

- domain: JetStream domain

394

- timeout: Default operation timeout

395

396

Returns:

397

JetStream manager instance

398

"""

399

```

400

401

## Types

402

403

```python { .api }

404

from typing import Union, List, Dict, Optional, Callable, AsyncIterator, Tuple

405

import ssl

406

import asyncio

407

408

# Connection types

409

Servers = Union[str, List[str]]

410

ConnectOptions = Dict[str, Union[str, int, bool, Callable]]

411

SSLContext = ssl.SSLContext

412

413

# Callback types

414

ErrorCallback = Callable[[Exception], None]

415

ClosedCallback = Callable[[], None]

416

DisconnectedCallback = Callable[[], None]

417

ReconnectedCallback = Callable[[], None]

418

MessageCallback = Callable[[Msg], None]

419

SignatureCallback = Callable[[str], bytes]

420

JWTCallback = Callable[[], Tuple[str, str]]

421

422

# Message types

423

Headers = Optional[Dict[str, str]]

424

Payload = bytes

425

Subject = str

426

```