# Broker Integration

Flower's broker integration monitors message queues across several broker types (RabbitMQ, Redis, and the Redis SSL, Sentinel, and Unix socket variants) and reports per-queue statistics.

## Capabilities

### Broker Factory

```python { .api }
class Broker:
    """
    Broker abstraction factory for different message brokers.

    Automatically selects appropriate broker implementation based on URL scheme.
    """

    def __new__(cls, broker_url, *args, **kwargs):
        """
        Create appropriate broker instance based on URL scheme.

        Args:
            broker_url (str): Broker connection URL with scheme
            *args: Additional arguments passed to broker implementation
            **kwargs: Additional keyword arguments passed to broker implementation

        Returns:
            BrokerBase: Broker implementation instance (RabbitMQ, Redis, etc.)

        Supported schemes:
        - 'amqp': RabbitMQ broker
        - 'redis': Redis broker
        - 'rediss': SSL Redis broker
        - 'redis+socket': Unix socket Redis broker
        - 'sentinel': Redis Sentinel broker
        """

    async def queues(self, names):
        """
        Get queue information for specified queues.

        Args:
            names (list): List of queue names to query

        Returns:
            list: Queue information, one dict per queue (see 'name' key)

        Async method that returns queue statistics including message counts,
        consumer information, and queue configuration.
        """
```
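
The scheme-based selection described above can be illustrated with a small standalone sketch. It is not Flower's own code: `pick_broker_class` and `SCHEME_TO_CLASS` are hypothetical names, and the table simply mirrors the supported schemes listed in the docstring.

```python
from urllib.parse import urlparse

# Hypothetical lookup table mirroring the "Supported schemes" list above.
SCHEME_TO_CLASS = {
    "amqp": "RabbitMQ",
    "redis": "Redis",
    "rediss": "RedisSsl",
    "redis+socket": "RedisSocket",
    "sentinel": "RedisSentinel",
}


def pick_broker_class(broker_url: str) -> str:
    """Return the name of the broker class that matches the URL scheme."""
    scheme = urlparse(broker_url).scheme
    try:
        return SCHEME_TO_CLASS[scheme]
    except KeyError:
        # Unknown schemes are rejected rather than guessed at.
        raise NotImplementedError(f"Unsupported broker scheme: {scheme!r}") from None


print(pick_broker_class("redis://localhost:6379/0"))
print(pick_broker_class("amqp://guest:guest@localhost:5672//"))
```

A dictionary lookup keeps the mapping in one place, so supporting another scheme would only mean adding one entry.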

### Broker Base Class

```python { .api }
class BrokerBase:
    """Abstract base class for broker implementations."""

    def __init__(self, broker_url, *_, **__):
        """
        Initialize broker with connection URL.

        Args:
            broker_url (str): Broker connection URL
        """

    async def queues(self, names):
        """
        Abstract method for retrieving queue information.

        Args:
            names (list): Queue names to query

        Returns:
            list: Queue statistics and configuration, one dict per queue
        """
```

### RabbitMQ Support

```python { .api }
class RabbitMQ(BrokerBase):
    """
    RabbitMQ broker integration using Management API.

    Supports comprehensive queue monitoring through RabbitMQ's
    HTTP Management API.
    """

    async def queues(self, names):
        """
        Get RabbitMQ queue information via Management API.

        Returns detailed queue statistics including:
        - Message counts (ready, unacked, total)
        - Consumer counts and details
        - Queue configuration and properties
        - Memory usage and performance metrics
        """

    def __init__(self, broker_url, http_api, io_loop=None, **kwargs):
        """
        Initialize RabbitMQ broker.

        Args:
            broker_url (str): AMQP connection URL
            http_api (str): HTTP Management API URL
            io_loop: Tornado IOLoop instance
            **kwargs: Additional broker options
        """

    @classmethod
    def validate_http_api(cls, http_api):
        """
        Validate HTTP Management API URL format.

        Args:
            http_api (str): Management API URL to validate

        Raises:
            ValueError: If URL scheme is invalid
        """
```
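
To make the `validate_http_api` contract more concrete, here is a minimal sketch of a scheme check. It assumes that only `http` and `https` count as valid schemes; the function name `check_management_api_url` and the error text are hypothetical and not taken from Flower.

```python
from urllib.parse import urlparse


def check_management_api_url(http_api: str) -> str:
    """Return the URL unchanged if its scheme is http or https (assumed rule)."""
    scheme = urlparse(http_api).scheme
    if scheme not in ("http", "https"):
        raise ValueError(f"Invalid Management API URL scheme: {scheme!r}")
    return http_api


print(check_management_api_url("http://guest:guest@localhost:15672/api/"))
```

Validating the URL when the broker object is created surfaces a misconfigured `--broker-api` value immediately instead of on the first queue query.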

### Redis Base Class

```python { .api }
class RedisBase(BrokerBase):
    """
    Base class for Redis broker implementations.

    Provides common functionality for Redis-based brokers including
    priority queue support and message counting.
    """

    DEFAULT_SEP = '\x06\x16'  # Priority separator
    DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]  # Default priority levels

    def __init__(self, broker_url, *_, **kwargs):
        """
        Initialize Redis base broker.

        Args:
            broker_url (str): Redis connection URL
            **kwargs: Broker options including:
                - priority_steps: Custom priority levels
                - sep: Priority separator character
                - global_keyprefix: Key prefix for all operations
        """

    def _q_for_pri(self, queue, pri):
        """
        Generate priority-specific queue name.

        Args:
            queue (str): Base queue name
            pri (int): Priority level

        Returns:
            str: Priority-specific queue name
        """

    async def queues(self, names):
        """
        Get queue statistics for Redis queues.

        Args:
            names (list): Queue names to query

        Returns:
            list: Queue statistics with message counts

        Counts messages across all priority levels for each queue.
        """
```
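
The priority handling can be sketched without a Redis server. The example below assumes a naming rule in which priority 0 uses the bare queue name and every other priority appends the separator and the priority number; `queue_name_for_priority` and `count_messages` are hypothetical helpers, and a plain dictionary stands in for Redis list lengths.

```python
DEFAULT_SEP = "\x06\x16"
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def queue_name_for_priority(queue, pri, sep=DEFAULT_SEP):
    """Assumed naming rule: priority 0 maps to the bare queue name."""
    return f"{queue}{sep}{pri}" if pri else queue


def count_messages(llen, queue, steps=DEFAULT_PRIORITY_STEPS):
    """Sum list lengths over every priority-specific key.

    `llen` is any callable that maps a key to its length, for example
    the `llen` method of a Redis client.
    """
    return sum(llen(queue_name_for_priority(queue, pri)) for pri in steps)


# Stand-in for a Redis server: key -> number of pending messages.
fake_lengths = {"celery": 4, "celery\x06\x163": 2, "celery\x06\x169": 1}
print(count_messages(lambda key: fake_lengths.get(key, 0), "celery"))  # 7
```

Because each queue is checked once per priority step, the number of length lookups grows with both the queue count and the number of priority levels.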

### Redis Standard Implementation

```python { .api }
class Redis(RedisBase):
    """
    Standard Redis broker implementation.

    Supports Redis database selection and standard authentication.
    """

    def __init__(self, broker_url, *args, **kwargs):
        """
        Initialize Redis broker.

        Args:
            broker_url (str): Redis connection URL (redis://host:port/db)
        """

    def _prepare_virtual_host(self, vhost):
        """
        Convert virtual host to Redis database number.

        Args:
            vhost (str): Virtual host from URL

        Returns:
            int: Redis database number (0-15)
        """

    def _get_redis_client_args(self):
        """
        Get Redis client connection arguments.

        Returns:
            dict: Connection parameters for Redis client
        """

    def _get_redis_client(self):
        """
        Create Redis client instance.

        Returns:
            redis.Redis: Configured Redis client
        """
```
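
As an illustration of the virtual-host conversion, the sketch below maps the path of a `redis://host:port/db` URL to a database number. It assumes that an empty path falls back to database 0 and that a non-numeric path is an error; `database_from_vhost` is a hypothetical helper, not the method shown above.

```python
from urllib.parse import urlparse


def database_from_vhost(vhost):
    """Map a URL path such as '/2' to database number 2 (assumed behavior)."""
    if vhost in (None, "", "/"):
        return 0  # assumed default database
    if vhost.startswith("/"):
        vhost = vhost[1:]
    try:
        return int(vhost)
    except ValueError:
        raise ValueError(f"Database must be an integer, not {vhost!r}") from None


print(database_from_vhost(urlparse("redis://localhost:6379/2").path))  # 2
```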

### Redis SSL Implementation

```python { .api }
class RedisSsl(Redis):
    """
    Redis SSL/TLS broker implementation.

    Provides encrypted connections to Redis using SSL/TLS with
    configurable SSL parameters.
    """

    def __init__(self, broker_url, *args, **kwargs):
        """
        Initialize SSL Redis broker.

        Args:
            broker_url (str): Redis SSL URL (rediss://host:port/db)
            **kwargs: Must include 'broker_use_ssl' configuration

        Raises:
            ValueError: If broker_use_ssl is not configured
        """

    def _get_redis_client_args(self):
        """
        Get SSL-enabled Redis client arguments.

        Returns:
            dict: Connection parameters with SSL configuration
        """
```

### Redis Sentinel Implementation

```python { .api }
class RedisSentinel(RedisBase):
    """
    Redis Sentinel cluster implementation.

    Provides high availability Redis access through Sentinel
    with automatic failover support.
    """

    def __init__(self, broker_url, *args, **kwargs):
        """
        Initialize Redis Sentinel broker.

        Args:
            broker_url (str): Sentinel URL (sentinel://host:port/service)
            **kwargs: Must include broker_options with master_name

        Raises:
            ValueError: If master_name is not provided
        """

    def _prepare_master_name(self, broker_options):
        """
        Extract master name from broker options.

        Args:
            broker_options (dict): Broker configuration

        Returns:
            str: Redis master service name
        """

    def _get_redis_client(self, broker_options):
        """
        Create Sentinel-aware Redis client.

        Args:
            broker_options (dict): Sentinel configuration

        Returns:
            redis.Redis: Sentinel-managed Redis client
        """
```

### Redis Unix Socket Implementation

```python { .api }
class RedisSocket(RedisBase):
    """
    Redis Unix domain socket implementation.

    Provides local Redis access through Unix sockets for
    improved performance and security.
    """

    def __init__(self, broker_url, *args, **kwargs):
        """
        Initialize Unix socket Redis broker.

        Args:
            broker_url (str): Socket URL (redis+socket:///path/to/socket)
        """
```

## Queue Information Structure

```python { .api }
QueueInfo = {
    'name': str,  # Queue name
    'messages': int,  # Total messages in queue
    'messages_ready': int,  # Messages ready for delivery
    'messages_unacknowledged': int,  # Messages awaiting acknowledgment
    'consumers': int,  # Number of consumers
    'consumer_details': [
        {
            'consumer_tag': str,  # Consumer identifier
            'channel_details': dict,  # Channel information
            'ack_required': bool,  # Acknowledgment required
            'prefetch_count': int,  # Prefetch limit
        }
    ],
    'memory': int,  # Memory usage in bytes
    'policy': str,  # Queue policy name
    'arguments': dict,  # Queue arguments
    'auto_delete': bool,  # Auto-delete setting
    'durable': bool,  # Durability setting
    'exclusive': bool,  # Exclusivity setting
    'node': str,  # Cluster node hosting queue
    'state': str,  # Queue state
    'backing_queue_status': dict,  # Internal queue status
}
```
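
A short sketch of how such records might be consumed: the hypothetical `summarize` helper below totals messages and consumers over a collection of queue-info dictionaries. It assumes that any field other than `name` may be missing (for example when a broker reports only message counts), so it reads them with `.get`. The sample data is made up.

```python
def summarize(queues):
    """Aggregate message and consumer counts over queue-info dicts."""
    return {
        "messages": sum(q.get("messages", 0) for q in queues),
        "consumers": sum(q.get("consumers", 0) for q in queues),
        # Queues that hold messages but report no consumers.
        "unconsumed": [
            q["name"]
            for q in queues
            if q.get("messages", 0) and not q.get("consumers", 0)
        ],
    }


sample = [
    {"name": "celery", "messages": 12, "consumers": 2},
    {"name": "low_priority", "messages": 5, "consumers": 0},
    {"name": "high_priority", "messages": 0},
]
print(summarize(sample))
```

When the broker does not report consumers at all, every non-empty queue ends up in `unconsumed`, so that field is only meaningful where consumer counts are available.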

## Usage Examples

### Basic Queue Monitoring

```python
import asyncio

from flower.utils.broker import Broker


async def main(celery_app):
    # Broker type is selected from the URL scheme in the Celery config.
    # RabbitMQ (amqp://) also needs the Management API URL as a second argument.
    broker = Broker(celery_app.conf.broker_url)

    # Get queue information (one dict per queue)
    queue_names = ['celery', 'high_priority', 'low_priority']
    for info in await broker.queues(queue_names):
        print(f"Queue {info['name']}:")
        print(f"  Messages: {info.get('messages', 'N/A')}")
        print(f"  Consumers: {info.get('consumers', 'N/A')}")


# celery_app is your existing Celery application instance
asyncio.run(main(celery_app))
```

### RabbitMQ Management API

```python
import asyncio

from flower.utils.broker import RabbitMQ


async def main():
    # The Management API URL is the second constructor argument
    rabbitmq = RabbitMQ(
        'amqp://guest:guest@localhost:5672//',
        'http://guest:guest@localhost:15672/api/',
    )

    # Get detailed queue information (one dict per queue found)
    queues = await rabbitmq.queues(['celery'])
    queue_info = next((q for q in queues if q['name'] == 'celery'), None)
    if queue_info is None:
        print("Queue 'celery' not found")
        return

    print(f"Ready messages: {queue_info['messages_ready']}")
    print(f"Unacked messages: {queue_info['messages_unacknowledged']}")
    print(f"Consumer count: {queue_info['consumers']}")


asyncio.run(main())
```

### Redis Queue Monitoring

```python
import asyncio

from flower.utils.broker import Redis


async def main():
    # Redis broker
    redis_broker = Redis('redis://localhost:6379/0')

    # Get queue lengths (one dict per requested queue)
    for info in await redis_broker.queues(['celery']):
        print(f"Queue length of {info['name']}: {info['messages']}")


asyncio.run(main())
```

## Configuration

### RabbitMQ Management API

```bash
# Configure Management API access
--broker-api=http://guest:guest@localhost:15672/api/

# Environment variable
export FLOWER_BROKER_API=http://guest:guest@localhost:15672/api/
```

### Broker URL Formats

```python
# RabbitMQ
broker_url = 'amqp://user:pass@host:5672/vhost'

# Redis
broker_url = 'redis://host:6379/0'

# Redis SSL
broker_url = 'rediss://host:6380/0'

# Redis Sentinel
broker_url = 'sentinel://host:26379/service-name'

# Redis Unix Socket
broker_url = 'redis+socket:///tmp/redis.sock'
```

## Error Handling

Queue queries can fail when the broker is unreachable or slow to respond, so guard calls to `queues()`:

```python
try:
    queue_info = await broker.queues(['celery'])
except ConnectionError:
    print("Cannot connect to broker")
except TimeoutError:
    print("Broker request timed out")
except Exception as e:
    print(f"Broker error: {e}")
```
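
One way to bound how long a queue query may take is to wrap it in `asyncio.wait_for`. The sketch below is an illustration under stated assumptions: `queues_or_none` is a hypothetical helper, and `SlowBroker` is a stand-in object that only imitates the `queues()` coroutine.

```python
import asyncio


async def queues_or_none(broker, names, timeout=5.0):
    """Return queue info, or None if the broker is slow or raises."""
    try:
        return await asyncio.wait_for(broker.queues(names), timeout)
    except asyncio.TimeoutError:
        print("Broker request timed out")
    except Exception as exc:  # broad on purpose: error types vary by broker
        print(f"Broker error: {exc}")
    return None


class SlowBroker:
    """Stand-in broker used only for this demonstration."""

    async def queues(self, names):
        await asyncio.sleep(1)  # simulate a broker that responds slowly
        return [{"name": n, "messages": 0} for n in names]


print(asyncio.run(queues_or_none(SlowBroker(), ["celery"], timeout=0.05)))
```

Returning `None` on failure lets callers distinguish "no data available" from a successful query that found nothing.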

## Performance Considerations

- Monitoring many queues adds load on the broker.
- Set timeouts on broker requests so that a slow broker does not stall monitoring.
- Cache queue information when possible.
- Watch for rate limits on the RabbitMQ Management API.
- Take broker load into account when choosing the monitoring frequency.
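
The caching advice above can be sketched as a thin wrapper that reuses a result for a fixed time-to-live. `CachedQueues` and `CountingBroker` are hypothetical names introduced for this example; the wrapper only assumes that the wrapped object exposes an async `queues(names)` method.

```python
import asyncio
import time


class CachedQueues:
    """Hypothetical wrapper that reuses queue info for `ttl` seconds."""

    def __init__(self, broker, ttl=10.0, clock=time.monotonic):
        self._broker = broker
        self._ttl = ttl
        self._clock = clock
        self._cache = {}  # tuple of names -> (expiry time, result)

    async def queues(self, names):
        key = tuple(sorted(names))
        hit = self._cache.get(key)
        now = self._clock()
        if hit and hit[0] > now:
            return hit[1]  # still fresh: skip the broker round trip
        result = await self._broker.queues(names)
        self._cache[key] = (now + self._ttl, result)
        return result


class CountingBroker:
    """Stand-in broker that records how often it is queried."""

    def __init__(self):
        self.calls = 0

    async def queues(self, names):
        self.calls += 1
        return [{"name": n, "messages": 0} for n in names]


async def demo():
    broker = CountingBroker()
    cached = CachedQueues(broker, ttl=60)
    await cached.queues(["celery"])
    await cached.queues(["celery"])  # served from the cache
    return broker.calls


print(asyncio.run(demo()))  # 1
```

A longer `ttl` reduces broker load at the cost of staler numbers, so the value is a trade-off between freshness and monitoring overhead.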