# NATS Python Client

An asyncio Python client for the NATS messaging system, with support for JetStream persistent messaging, a microservices framework, a key-value store, an object store, and high-performance real-time messaging patterns.

## Package Information

- **Package Name**: nats-py
- **Language**: Python
- **Installation**: `pip install nats-py`
- **Minimum Python Version**: 3.7

## Core Imports

```python
import nats
```

Commonly used when working with the client class directly:

```python
from nats.aio.client import Client as NATS
```

JetStream functionality:

```python
# JetStream is accessed via the client's jetstream() method
# (these awaits must run inside a coroutine)
nc = await nats.connect()
js = nc.jetstream()
```

Microservices functionality:

```python
from nats.micro import add_service
```

Error handling:

```python
from nats.errors import ConnectionClosedError, TimeoutError
```

## Basic Usage

```python
import asyncio
import nats

async def main():
    # Connect to NATS server
    nc = await nats.connect("nats://localhost:4222")

    # Simple subscribe (registered before publishing so the message is delivered)
    async def message_handler(msg):
        print(f"Received: {msg.data.decode()}")

    await nc.subscribe("foo", cb=message_handler)

    # Simple publish
    await nc.publish("foo", b"Hello World!")

    # Request-reply pattern: a responder must be listening on "help",
    # otherwise the request below fails instead of returning a reply
    async def help_handler(msg):
        await msg.respond(b"OK, I can help!")

    await nc.subscribe("help", cb=help_handler)

    response = await nc.request("help", b"help me", timeout=1.0)
    print(f"Response: {response.data.decode()}")

    # Cleanup: drain() processes pending messages, then closes the connection
    await nc.drain()

if __name__ == '__main__':
    asyncio.run(main())
```

## Architecture

The NATS Python client provides a layered architecture for scalable messaging:

- **Core Client**: Connection management, publish/subscribe, and request/reply messaging
- **JetStream**: Stream processing layer for persistent messaging, the key-value store, and the object store
- **Microservices**: Framework for service discovery, load balancing, and monitoring
- **Transport Layer**: TCP, WebSocket, and TLS support with automatic reconnection
- **Protocol Layer**: NATS wire protocol implementation with high-performance parsing

This design lets the client serve as the foundation for distributed applications that require reliable message delivery, real-time communication, and persistent data storage.
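
As a rough illustration of the transport layer's automatic reconnection, the sketch below gathers reconnect-related keyword options for `nats.connect()`. The helper `reconnect_options`, the server URL, and the chosen values are assumptions made for this example; `max_reconnect_attempts`, `reconnect_time_wait`, and the `*_cb` callbacks are connect options in nats-py.

```python
async def on_disconnected():
    # Invoked by the client when the connection to the server is lost.
    print("disconnected")


async def on_reconnected():
    # Invoked after the client has re-established a connection.
    print("reconnected")


async def on_error(exc):
    # Invoked with the exception for asynchronous client errors.
    print(f"client error: {exc!r}")


def reconnect_options(servers, attempts=10, wait_seconds=2.0):
    """Build keyword arguments for nats.connect() (hypothetical helper)."""
    return {
        "servers": list(servers),
        "max_reconnect_attempts": attempts,
        "reconnect_time_wait": wait_seconds,
        "disconnected_cb": on_disconnected,
        "reconnected_cb": on_reconnected,
        "error_cb": on_error,
    }


async def main():
    import nats  # requires nats-py and a reachable server

    options = reconnect_options(["nats://localhost:4222"])
    nc = await nats.connect(**options)
    await nc.drain()
```

`main()` is defined but not invoked here; with nats-py installed and a server listening on `localhost:4222`, `asyncio.run(main())` would exercise it.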

## Capabilities

### Core NATS Client

Essential connection management and messaging functionality including connect/disconnect, publish/subscribe, request/reply patterns, subscription management, and connection lifecycle handling.

```python { .api }
async def connect(servers=["nats://localhost:4222"], **options) -> NATS: ...

class NATS:
    async def publish(self, subject: str, payload: bytes = b"", reply: str = "", headers: dict = None) -> None: ...
    async def subscribe(self, subject: str, queue: str = "", cb: callable = None, **kwargs) -> Subscription: ...
    async def request(self, subject: str, payload: bytes = b"", timeout: float = 0.5, headers: dict = None) -> Msg: ...
    async def close(self) -> None: ...
    async def drain(self) -> None: ...
```

[Core NATS Client](./core-client.md)
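
The `subscribe` and `request` signatures above can be combined into a small request/reply service. The following is a minimal sketch, assuming `nc` is a connected client returned by `nats.connect()`; the function names, the subject `text.upper`, and the queue name `workers` are made up for illustration.

```python
async def serve_upper(nc, subject="text.upper", queue="workers"):
    """Subscribe a responder that upper-cases each request payload.

    Subscribers that share the same queue name form a queue group, so
    each request is handled by only one member of the group.
    """
    async def handler(msg):
        await msg.respond(msg.data.upper())

    return await nc.subscribe(subject, queue=queue, cb=handler)


async def ask_upper(nc, text, subject="text.upper", timeout=1.0):
    """Send one request and return the decoded reply."""
    reply = await nc.request(subject, text.encode(), timeout=timeout)
    return reply.data.decode()
```

Inside a coroutine, `await serve_upper(nc)` followed by `await ask_upper(nc, "hello")` would perform one round trip through the server.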

### JetStream Stream Processing

Persistent messaging with streams, consumers, message acknowledgments, delivery guarantees, and advanced features like ordered consumers and pull subscriptions for building resilient applications.

```python { .api }
class JetStreamContext:
    async def publish(self, subject: str, payload: bytes = b"", timeout: float = None, stream: str = None, headers: dict = None) -> PubAck: ...
    async def subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> JetStreamSubscription: ...
    async def pull_subscribe(self, subject: str, durable: str = None, config: ConsumerConfig = None, **kwargs) -> PullSubscription: ...
```

[JetStream](./jetstream.md)
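
To make the publish and pull-subscribe flow concrete, here is a hedged sketch of one publish, fetch, and acknowledge cycle. It assumes `js` comes from `nc.jetstream()` and that a stream capturing `subject` already exists; the function name, the durable name `worker`, and the batch size are illustrative choices.

```python
async def publish_and_fetch(js, subject, payloads, durable="worker", batch=10):
    """Publish payloads to a stream, then pull and acknowledge them."""
    seqs = []
    for payload in payloads:
        # publish() waits for the server's PubAck, which carries the
        # stream sequence number assigned to the message.
        ack = await js.publish(subject, payload)
        seqs.append(ack.seq)

    # A pull subscription receives messages only when fetch() asks for them.
    psub = await js.pull_subscribe(subject, durable=durable)
    msgs = await psub.fetch(batch, timeout=1.0)
    for msg in msgs:
        await msg.ack()  # mark the message as processed for this consumer
    return seqs, [msg.data for msg in msgs]
```

Acknowledging each message after processing is what lets the server redeliver anything left unacknowledged.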

### JetStream Management

Administrative APIs for creating and managing streams, consumers, accounts with comprehensive configuration options, monitoring capabilities, and cluster management.

```python { .api }
class JetStreamManager:
    async def add_stream(self, config: StreamConfig = None, **params) -> StreamInfo: ...
    async def add_consumer(self, stream: str, config: ConsumerConfig = None, **params) -> ConsumerInfo: ...
    async def account_info(self) -> AccountInfo: ...
    async def delete_stream(self, name: str) -> bool: ...
```

[JetStream Management](./jetstream-management.md)
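
As an example of the `**params` form of `add_stream`, the sketch below creates a stream from a name and a list of subjects and reads two fields from the returned `StreamInfo`. It assumes `jsm` is a JetStream manager (for instance the object returned by `nc.jetstream()`, which also exposes the management methods); the function name is hypothetical.

```python
async def create_stream(jsm, name, subjects):
    """Create a stream and return its name and current message count."""
    # Keyword arguments are used here in place of a StreamConfig object.
    info = await jsm.add_stream(name=name, subjects=list(subjects))
    return info.config.name, info.state.messages
```

A call such as `await create_stream(js, "ORDERS", ["orders.*"])` would make every message published on `orders.*` persistent in that stream.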

### Key-Value Store

Distributed key-value storage built on JetStream streams with atomic operations, conditional updates, history tracking, and watch capabilities for building stateful applications.

```python { .api }
class KeyValue:
    async def get(self, key: str, revision: int = None) -> Entry: ...
    async def put(self, key: str, value: bytes, revision: int = None) -> int: ...
    async def delete(self, key: str, revision: int = None) -> bool: ...
    async def watch(self, key: str, **kwargs) -> AsyncIterator[Entry]: ...
```

[Key-Value Store](./key-value-store.md)
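
A short sketch of the `put`/`get` pair may help here. It assumes `kv` is a bucket handle such as the one returned by `js.create_key_value(bucket=...)`; the function name is made up for the example.

```python
async def save_and_load(kv, key, value):
    """Write a value, read it back, and return the revision and stored bytes."""
    revision = await kv.put(key, value)  # put() returns the new revision number
    entry = await kv.get(key)            # Entry exposes .key, .value, .revision
    return revision, entry.value
```

Because every write produces a new revision, the returned number can be kept and compared later to detect concurrent updates.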

### Object Store

Scalable object storage with metadata, chunking for large objects, content addressing, and efficient streaming for storing and retrieving binary data and files.

```python { .api }
class ObjectStore:
    async def put(self, name: str, data: bytes, **kwargs) -> ObjectInfo: ...
    async def get(self, name: str) -> bytes: ...
    async def get_info(self, name: str, show_deleted: bool = False) -> ObjectInfo: ...
    async def delete(self, name: str) -> bool: ...
```

[Object Store](./object-store.md)
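
The following sketch stores an object and then inspects its metadata through `get_info`. It assumes `obs` is an object store handle obtained from the JetStream context and that `ObjectInfo` exposes `name` and `size` attributes; the function name is hypothetical.

```python
async def store_and_inspect(obs, name, data):
    """Upload bytes under a name and return the stored name and size."""
    stored = await obs.put(name, data)   # returns ObjectInfo for the upload
    info = await obs.get_info(name)      # metadata only, no payload transfer
    return stored.name, info.size
```

Reading metadata with `get_info` avoids downloading the payload when only the size or existence of an object matters.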

### Microservices Framework

Service discovery, request routing, monitoring, and health checks with built-in load balancing, error handling, and comprehensive service information for building distributed microservice architectures.

```python { .api }
async def add_service(nc: NATS, config: ServiceConfig = None, **kwargs) -> Service: ...

class Service:
    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    async def add_endpoint(self, config: EndpointConfig) -> Endpoint: ...
```

[Microservices](./microservices.md)

### Message Handling

Message processing with headers, reply handling, JetStream acknowledgments, and comprehensive metadata access for building sophisticated message-driven applications.

```python { .api }
class Msg:
    subject: str
    data: bytes
    reply: str
    headers: dict

    async def respond(self, data: bytes) -> None: ...
    async def ack(self) -> None: ...
    async def nak(self, delay: float = None) -> None: ...
```

[Message Handling](./message-handling.md)
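
A handler built on the `Msg` attributes above might look like the sketch below. The header name `Trace-Id` and the `ack:` reply format are invented for illustration; the check on `msg.reply` reflects that only requests carry a reply subject to respond to.

```python
async def handle(msg):
    """Log a message and answer it when the sender expects a reply."""
    headers = msg.headers or {}               # headers may be absent
    trace_id = headers.get("Trace-Id", "-")   # illustrative header name
    text = msg.data.decode()
    print(f"[{trace_id}] {msg.subject}: {text}")

    if msg.reply:
        # Only requests carry a reply subject; plain publishes do not.
        await msg.respond(f"ack:{text}".encode())
```

The function matches the callback shape expected by `subscribe`, so it could be passed as `cb=handle`.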

### Error Handling

Comprehensive exception hierarchy covering connection, protocol, JetStream, key-value, object store, and microservice errors with specific error types for precise error handling.

```python { .api }
class Error(Exception): ...
class ConnectionClosedError(Error): ...
class TimeoutError(Error): ...
class NoRespondersError(Error): ...
# ... additional error types
```

[Error Handling](./error-handling.md)
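
One way to use the specific error types is to retry only the failures that are likely to be transient. The sketch below retries a request on `TimeoutError` and gives up immediately on `NoRespondersError`. The function name, the attempt count, and the backoff values are assumptions; the fallback classes exist only so the snippet runs without nats-py installed and mirror the hierarchy listed above.

```python
import asyncio

try:
    from nats.errors import Error, NoRespondersError, TimeoutError
except ImportError:
    # Local stand-ins mirroring the documented hierarchy (assumption:
    # used only when nats-py is not installed).
    class Error(Exception): ...
    class TimeoutError(Error): ...
    class NoRespondersError(Error): ...


async def request_with_retry(nc, subject, payload, attempts=3, timeout=1.0, backoff=0.1):
    """Retry a request on timeout; fail fast when nobody is listening."""
    for attempt in range(1, attempts + 1):
        try:
            return await nc.request(subject, payload, timeout=timeout)
        except NoRespondersError:
            raise  # no subscriber on the subject: an immediate retry is unlikely to help
        except TimeoutError:
            if attempt == attempts:
                raise
            await asyncio.sleep(backoff * attempt)  # simple linear backoff
```

Catching the narrow types first and leaving the base `Error` uncaught keeps unexpected failures visible to the caller.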

## Types

```python { .api }
from typing import Union, List, Dict, Optional, Callable, AsyncIterator, Awaitable, Any
from dataclasses import dataclass

# Core types used across the API
Servers = Union[str, List[str]]
Handler = Callable[[Msg], Awaitable[None]]
Headers = Optional[Dict[str, str]]

# Callback types
Callback = Callable[[], Awaitable[None]]
ErrorCallback = Callable[[Exception], Awaitable[None]]
JWTCallback = Callable[[], Union[bytearray, bytes]]
SignatureCallback = Callable[[str], bytes]
```
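
To show how the `Handler` and `ErrorCallback` aliases fit together, the sketch below wraps a handler so that any exception it raises is routed to an error callback. The `Msg` dataclass is a local stand-in for the library's message type, and `guarded` is a hypothetical helper, not part of nats-py.

```python
from dataclasses import dataclass
from typing import Awaitable, Callable, Dict, Optional


@dataclass
class Msg:
    """Stand-in for the client's message type (assumption for this example)."""
    subject: str
    data: bytes = b""
    reply: str = ""
    headers: Optional[Dict[str, str]] = None


Handler = Callable[[Msg], Awaitable[None]]
ErrorCallback = Callable[[Exception], Awaitable[None]]


def guarded(handler: Handler, on_error: ErrorCallback) -> Handler:
    """Return a handler that reports exceptions instead of raising them."""
    async def wrapper(msg: Msg) -> None:
        try:
            await handler(msg)
        except Exception as exc:
            await on_error(exc)

    return wrapper
```

Because `guarded` returns another `Handler`, the wrapped function can be used anywhere a plain handler is accepted.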