or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.mdconnection.mdentities.mdexceptions.mdindex.mdmessaging.mdmixins.mdpools.mdserialization.mdsimple.md

pools.mddocs/

0

# Connection and Producer Pooling

1

2

Resource pooling for connections and producers to optimize performance and manage resources efficiently in high-throughput applications. Connection pools prevent the overhead of establishing connections for each operation, while producer pools maintain ready-to-use producer instances.

3

4

## Core Imports

5

6

```python

7

from kombu import pools

8

from kombu.pools import connections, producers, ProducerPool, PoolGroup

9

from kombu.pools import get_limit, set_limit, reset

10

```

11

12

## Capabilities

13

14

### Global Pool Instances

15

16

Pre-configured global pools for connections and producers that can be used throughout an application.

17

18

```python { .api }

19

connections: Connections # Global connection pool group

20

producers: Producers # Global producer pool group

21

```

22

23

**Usage Example:**

24

25

```python

26

from kombu import pools

27

28

# Use global connection pool

29

with pools.connections['redis://localhost:6379/0'].acquire() as conn:

30

# Use connection

31

with conn.channel() as channel:

32

# Operate on channel

33

pass

34

35

# Use global producer pool

36

with pools.producers['redis://localhost:6379/0'].acquire() as producer:

37

producer.publish({'message': 'data'}, routing_key='task')

38

```

39

40

### ProducerPool Class

41

42

Pool of Producer instances that share connections from a connection pool.

43

44

```python { .api }

45

class ProducerPool:

46

"""Pool of kombu.Producer instances."""

47

48

def __init__(self, connections, *args, **kwargs):

49

"""

50

Initialize producer pool.

51

52

Parameters:

53

- connections: Connection pool to use for producers

54

- Producer: Producer class to use (default: kombu.Producer)

55

- *args, **kwargs: Passed to Resource base class

56

"""

57

58

def acquire(self, block=False, timeout=None):

59

"""

60

Acquire producer from pool.

61

62

Parameters:

63

- block (bool): Whether to block if no resource available

64

- timeout (float): Timeout for blocking acquire

65

66

Returns:

67

Producer instance from pool

68

"""

69

70

def release(self, resource):

71

"""

72

Release producer back to pool.

73

74

Parameters:

75

- resource: Producer instance to release

76

"""

77

78

def create_producer(self):

79

"""Create new producer instance."""

80

81

def prepare(self, producer):

82

"""Prepare producer for use."""

83

84

def close_resource(self, resource):

85

"""Close producer resource."""

86

```

87

88

### PoolGroup Class

89

90

Base class for managing collections of resource pools, automatically creating pools for different resources as needed.

91

92

```python { .api }

93

class PoolGroup:

94

"""Collection of resource pools."""

95

96

def __init__(self, limit=None, close_after_fork=True):

97

"""

98

Initialize pool group.

99

100

Parameters:

101

- limit (int): Default limit for created pools

102

- close_after_fork (bool): Whether to close pools after fork

103

"""

104

105

def create(self, resource, limit):

106

"""

107

Create pool for resource (must be implemented by subclasses).

108

109

Parameters:

110

- resource: Resource to create pool for

111

- limit (int): Pool size limit

112

113

Returns:

114

Pool instance for resource

115

"""

116

117

def __getitem__(self, resource):

118

"""Get or create pool for resource."""

119

120

def __missing__(self, resource):

121

"""Create new pool when resource not found."""

122

```

123

124

### Concrete Pool Group Classes

125

126

Specific implementations of PoolGroup for connections and producers.

127

128

```python { .api }

129

class Connections(PoolGroup):

130

"""Collection of connection pools."""

131

132

def create(self, connection, limit):

133

"""Create connection pool for given connection."""

134

135

class Producers(PoolGroup):

136

"""Collection of producer pools."""

137

138

def create(self, connection, limit):

139

"""Create producer pool for given connection."""

140

```

141

142

### Pool Management Functions

143

144

Global functions for managing pool limits and state.

145

146

```python { .api }

147

def get_limit() -> int:

148

"""Get current global connection pool limit."""

149

150

def set_limit(limit: int, force=False, reset_after=False, ignore_errors=False) -> int:

151

"""

152

Set new global connection pool limit.

153

154

Parameters:

155

- limit (int): New pool size limit

156

- force (bool): Force limit change even if same as current

157

- reset_after (bool): Reset pools after setting limit

158

- ignore_errors (bool): Ignore errors during pool resizing

159

160

Returns:

161

The set limit value

162

"""

163

164

def reset(*args, **kwargs):

165

"""Reset all pools by closing open resources and clearing groups."""

166

167

def register_group(group) -> PoolGroup:

168

"""Register pool group for management (can be used as decorator)."""

169

```

170

171

## Advanced Usage

172

173

### Custom Producer Pool

174

175

```python

176

from kombu.pools import ProducerPool

177

from kombu import Connection

178

179

# Create custom producer pool

180

conn_pool = Connection('redis://localhost:6379/0').Pool(limit=10)

181

producer_pool = ProducerPool(conn_pool, limit=5)

182

183

# Use custom pool

184

with producer_pool.acquire() as producer:

185

producer.publish({'custom': 'message'}, routing_key='custom')

186

```

187

188

### Pool Limit Management

189

190

```python

191

from kombu.pools import get_limit, set_limit, reset

192

193

# Check current limit

194

current = get_limit()

195

print(f"Current pool limit: {current}")

196

197

# Set new limit

198

set_limit(20)

199

200

# Reset all pools (useful for testing or cleanup)

201

reset()

202

```

203

204

### Context Manager Usage

205

206

```python

207

from kombu.pools import connections, producers

208

209

# Connection pool context manager

210

with connections['redis://localhost:6379/0'].acquire() as conn:

211

# Connection is automatically released when exiting context

212

with conn.channel() as channel:

213

# Use channel

214

pass

215

216

# Producer pool context manager

217

with producers['redis://localhost:6379/0'].acquire() as producer:

218

# Producer is automatically released when exiting context

219

producer.publish({'msg': 'data'}, routing_key='task')

220

```

221

222

## Error Handling

223

224

Pool operations can raise various exceptions:

225

226

```python

227

from kombu.pools import producers

228

from kombu.exceptions import LimitExceeded, OperationalError

229

230

try:

231

with producers['redis://localhost:6379/0'].acquire(timeout=5.0) as producer:

232

producer.publish({'data': 'message'}, routing_key='task')

233

except LimitExceeded:

234

print("Pool limit exceeded, no resources available")

235

except OperationalError as e:

236

print(f"Connection error: {e}")

237

```