or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.mdchannel-operations.mdconnection-adapters.mdconnection-management.mdexception-handling.mdindex.mdmessage-properties-types.md

connection-adapters.mddocs/

0

# Connection Adapters

1

2

Framework-specific connection adapters for asyncio, Tornado, Twisted, and Gevent integration enabling pika usage in different Python async frameworks and event loops.

3

4

## Capabilities

5

6

### Base Connection

7

8

Abstract base class for all connection adapters.

9

10

```python { .api }

11

class BaseConnection:

12

"""Abstract base class for all connection adapters."""

13

14

def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,

15

on_close_callback=None):

16

"""

17

Base connection initialization.

18

19

Parameters:

20

- parameters (ConnectionParameters): Connection configuration

21

- on_open_callback (callable): Called when connection opens

22

- on_open_error_callback (callable): Called on connection error

23

- on_close_callback (callable): Called when connection closes

24

"""

25

26

def channel(self, on_open_callback, channel_number=None):

27

"""

28

Create a new channel asynchronously.

29

30

Parameters:

31

- on_open_callback (callable): Called when channel opens

32

- channel_number (int, optional): Specific channel number

33

"""

34

35

def close(self, reply_code=200, reply_text='Normal shutdown'):

36

"""

37

Close connection.

38

39

Parameters:

40

- reply_code (int): AMQP reply code

41

- reply_text (str): Human-readable reason

42

"""

43

44

# Connection state properties

45

@property

46

def is_closed(self) -> bool:

47

"""True if connection is closed."""

48

49

@property

50

def is_open(self) -> bool:

51

"""True if connection is open."""

52

```

53

54

### AsyncIO Connection

55

56

Native Python 3 asyncio integration for async/await patterns.

57

58

```python { .api }

59

class AsyncioConnection(BaseConnection):

60

"""Connection adapter for Python asyncio."""

61

62

def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,

63

on_close_callback=None, loop=None):

64

"""

65

Create asyncio connection.

66

67

Parameters:

68

- parameters (ConnectionParameters): Connection configuration

69

- on_open_callback (callable): Called when connection opens

70

- on_open_error_callback (callable): Called on connection error

71

- on_close_callback (callable): Called when connection closes

72

- loop (asyncio.AbstractEventLoop): Event loop (uses current if None)

73

"""

74

75

def channel(self, on_open_callback, channel_number=None):

76

"""

77

Create asyncio channel.

78

79

Parameters:

80

- on_open_callback (callable): Called when channel opens

81

- channel_number (int, optional): Specific channel number

82

"""

83

```

84

85

### Tornado Connection

86

87

Integration with Tornado web framework and IOLoop.

88

89

```python { .api }

90

class TornadoConnection(BaseConnection):

91

"""Connection adapter for Tornado web framework."""

92

93

def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,

94

on_close_callback=None, ioloop=None):

95

"""

96

Create Tornado connection.

97

98

Parameters:

99

- parameters (ConnectionParameters): Connection configuration

100

- on_open_callback (callable): Called when connection opens

101

- on_open_error_callback (callable): Called on connection error

102

- on_close_callback (callable): Called when connection closes

103

- ioloop (tornado.ioloop.IOLoop): Tornado IOLoop instance

104

"""

105

```

106

107

### Twisted Connection

108

109

Integration with Twisted framework and reactor pattern.

110

111

```python { .api }

112

class TwistedProtocolConnection(BaseConnection):

113

"""Connection adapter for Twisted framework."""

114

115

def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,

116

on_close_callback=None):

117

"""

118

Create Twisted connection.

119

120

Parameters:

121

- parameters (ConnectionParameters): Connection configuration

122

- on_open_callback (callable): Called when connection opens

123

- on_open_error_callback (callable): Called on connection error

124

- on_close_callback (callable): Called when connection closes

125

"""

126

127

class TwistedConnection(TwistedProtocolConnection):

128

"""Alias for TwistedProtocolConnection."""

129

```

130

131

### Gevent Connection

132

133

Integration with Gevent cooperative multitasking library.

134

135

```python { .api }

136

class GeventConnection(BaseConnection):

137

"""Connection adapter for Gevent."""

138

139

def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,

140

on_close_callback=None):

141

"""

142

Create Gevent connection.

143

144

Parameters:

145

- parameters (ConnectionParameters): Connection configuration

146

- on_open_callback (callable): Called when connection opens

147

- on_open_error_callback (callable): Called on connection error

148

- on_close_callback (callable): Called when connection closes

149

"""

150

```

151

152

### Select Connection

153

154

Event-driven connection using select/poll/epoll for high-performance async operations.

155

156

```python { .api }

157

class SelectConnection(BaseConnection):

158

"""Event-driven connection using select/poll/epoll."""

159

160

def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,

161

on_close_callback=None, ioloop=None):

162

"""

163

Create select-based connection.

164

165

Parameters:

166

- parameters (ConnectionParameters): Connection configuration

167

- on_open_callback (callable): Called when connection opens

168

- on_open_error_callback (callable): Called on connection error

169

- on_close_callback (callable): Called when connection closes

170

- ioloop (IOLoop): Event loop instance

171

"""

172

173

@property

174

def ioloop(self):

175

"""Get the IOLoop instance."""

176

177

class IOLoop:

178

"""Event loop implementation for SelectConnection."""

179

180

def start(self):

181

"""Start the event loop."""

182

183

def stop(self):

184

"""Stop the event loop."""

185

186

def add_timeout(self, deadline, callback_method):

187

"""

188

Add a timeout callback.

189

190

Parameters:

191

- deadline (float): Time when callback should be called

192

- callback_method (callable): Callback function

193

"""

194

195

def remove_timeout(self, timeout_id):

196

"""

197

Remove a timeout callback.

198

199

Parameters:

200

- timeout_id: Timeout identifier to remove

201

"""

202

203

def call_later(self, delay, callback, *args, **kwargs):

204

"""

205

Schedule callback execution after delay.

206

207

Parameters:

208

- delay (float): Delay in seconds

209

- callback (callable): Callback function

210

- *args, **kwargs: Arguments for callback

211

"""

212

```

213

214

## Usage Examples

215

216

### AsyncIO Connection

217

218

```python

219

import asyncio

220

import pika

221

from pika.adapters.asyncio_connection import AsyncioConnection

222

223

async def main():

224

# Connection callbacks

225

def on_connection_open(connection):

226

print('Connection opened')

227

connection.channel(on_open_callback=on_channel_open)

228

229

def on_connection_open_error(connection, error):

230

print(f'Connection failed: {error}')

231

232

def on_connection_closed(connection, reason):

233

print(f'Connection closed: {reason}')

234

235

def on_channel_open(channel):

236

print('Channel opened')

237

238

# Declare queue

239

channel.queue_declare(

240

queue='asyncio_test',

241

callback=on_queue_declared

242

)

243

244

def on_queue_declared(method_frame):

245

print('Queue declared')

246

247

# Publish message

248

channel.basic_publish(

249

exchange='',

250

routing_key='asyncio_test',

251

body='Hello AsyncIO!'

252

)

253

254

# Create connection

255

parameters = pika.ConnectionParameters('localhost')

256

connection = AsyncioConnection(

257

parameters,

258

on_open_callback=on_connection_open,

259

on_open_error_callback=on_connection_open_error,

260

on_close_callback=on_connection_closed

261

)

262

263

# Keep running

264

await asyncio.sleep(2)

265

connection.close()

266

267

# Run the async function

268

asyncio.run(main())

269

```

270

271

### Tornado Connection

272

273

```python

274

import pika

275

from pika.adapters.tornado_connection import TornadoConnection

276

from tornado import ioloop

277

278

def on_connection_open(connection):

279

print('Connection opened')

280

connection.channel(on_open_callback=on_channel_open)

281

282

def on_connection_open_error(connection, error):

283

print(f'Connection failed: {error}')

284

ioloop.IOLoop.current().stop()

285

286

def on_connection_closed(connection, reason):

287

print(f'Connection closed: {reason}')

288

ioloop.IOLoop.current().stop()

289

290

def on_channel_open(channel):

291

print('Channel opened')

292

293

def on_queue_declared(method_frame):

294

# Start consuming

295

channel.basic_consume(

296

queue='tornado_test',

297

on_message_callback=on_message,

298

auto_ack=True

299

)

300

301

# Declare queue

302

channel.queue_declare(queue='tornado_test', callback=on_queue_declared)

303

304

def on_message(channel, method, properties, body):

305

print(f'Received: {body.decode()}')

306

307

# Stop after first message

308

channel.close()

309

310

# Create connection

311

parameters = pika.ConnectionParameters('localhost')

312

connection = TornadoConnection(

313

parameters,

314

on_open_callback=on_connection_open,

315

on_open_error_callback=on_connection_open_error,

316

on_close_callback=on_connection_closed

317

)

318

319

# Start Tornado IOLoop

320

ioloop.IOLoop.current().start()

321

```

322

323

### Twisted Connection

324

325

```python

326

import pika

327

from pika.adapters.twisted_connection import TwistedProtocolConnection

328

from twisted.internet import reactor, defer

329

330

class TwistedExample:

331

def __init__(self):

332

self.connection = None

333

self.channel = None

334

335

def connect(self):

336

parameters = pika.ConnectionParameters('localhost')

337

self.connection = TwistedProtocolConnection(

338

parameters,

339

on_open_callback=self.on_connection_open,

340

on_open_error_callback=self.on_connection_error,

341

on_close_callback=self.on_connection_closed

342

)

343

344

def on_connection_open(self, connection):

345

print('Connection opened')

346

connection.channel(on_open_callback=self.on_channel_open)

347

348

def on_connection_error(self, connection, error):

349

print(f'Connection failed: {error}')

350

reactor.stop()

351

352

def on_connection_closed(self, connection, reason):

353

print(f'Connection closed: {reason}')

354

reactor.stop()

355

356

def on_channel_open(self, channel):

357

print('Channel opened')

358

self.channel = channel

359

360

# Declare queue

361

channel.queue_declare(

362

queue='twisted_test',

363

callback=self.on_queue_declared

364

)

365

366

def on_queue_declared(self, method_frame):

367

# Publish message

368

self.channel.basic_publish(

369

exchange='',

370

routing_key='twisted_test',

371

body='Hello Twisted!'

372

)

373

374

# Close connection after publishing

375

reactor.callLater(1, self.connection.close)

376

377

# Usage

378

example = TwistedExample()

379

example.connect()

380

reactor.run()

381

```

382

383

### Gevent Connection

384

385

```python

386

import pika

387

from pika.adapters.gevent_connection import GeventConnection

388

import gevent

389

390

class GeventExample:

391

def __init__(self):

392

self.connection = None

393

self.channel = None

394

self.is_ready = False

395

396

def connect(self):

397

parameters = pika.ConnectionParameters('localhost')

398

self.connection = GeventConnection(

399

parameters,

400

on_open_callback=self.on_connection_open,

401

on_open_error_callback=self.on_connection_error,

402

on_close_callback=self.on_connection_closed

403

)

404

405

def on_connection_open(self, connection):

406

print('Connection opened')

407

connection.channel(on_open_callback=self.on_channel_open)

408

409

def on_connection_error(self, connection, error):

410

print(f'Connection failed: {error}')

411

412

def on_connection_closed(self, connection, reason):

413

print(f'Connection closed: {reason}')

414

415

def on_channel_open(self, channel):

416

print('Channel opened')

417

self.channel = channel

418

self.is_ready = True

419

420

def publish_message(self, message):

421

if self.is_ready:

422

self.channel.basic_publish(

423

exchange='',

424

routing_key='gevent_test',

425

body=message

426

)

427

print(f'Published: {message}')

428

429

# Usage

430

example = GeventExample()

431

example.connect()

432

433

# Wait for connection to be ready

434

while not example.is_ready:

435

gevent.sleep(0.1)

436

437

# Publish some messages

438

for i in range(5):

439

example.publish_message(f'Message {i}')

440

gevent.sleep(1)

441

442

example.connection.close()

443

```

444

445

### Select Connection with Custom IOLoop

446

447

```python

448

import pika

449

from pika.adapters.select_connection import SelectConnection, IOLoop

450

451

class SelectExample:

452

def __init__(self):

453

self.connection = None

454

self.channel = None

455

self.ioloop = None

456

457

def connect(self):

458

# Create custom IOLoop

459

self.ioloop = IOLoop()

460

461

parameters = pika.ConnectionParameters('localhost')

462

self.connection = SelectConnection(

463

parameters,

464

on_open_callback=self.on_connection_open,

465

on_open_error_callback=self.on_connection_error,

466

on_close_callback=self.on_connection_closed,

467

ioloop=self.ioloop

468

)

469

470

def on_connection_open(self, connection):

471

print('Connection opened')

472

connection.channel(on_open_callback=self.on_channel_open)

473

474

def on_connection_error(self, connection, error):

475

print(f'Connection failed: {error}')

476

self.ioloop.stop()

477

478

def on_connection_closed(self, connection, reason):

479

print(f'Connection closed: {reason}')

480

self.ioloop.stop()

481

482

def on_channel_open(self, channel):

483

print('Channel opened')

484

self.channel = channel

485

486

# Schedule periodic message publishing

487

self.schedule_publish()

488

489

def schedule_publish(self):

490

# Publish message

491

self.channel.basic_publish(

492

exchange='',

493

routing_key='select_test',

494

body='Hello Select!'

495

)

496

497

# Schedule next publish in 2 seconds

498

self.ioloop.call_later(2.0, self.schedule_publish)

499

500

def run(self):

501

# Start the IOLoop

502

self.ioloop.start()

503

504

# Usage

505

example = SelectExample()

506

example.connect()

507

508

# Stop after 10 seconds

509

example.ioloop.call_later(10.0, example.connection.close)

510

511

example.run()

512

```

513

514

### Adapter Comparison

515

516

```python

517

import pika

518

519

def create_connection(adapter_type='blocking'):

520

"""Create connection based on adapter type."""

521

parameters = pika.ConnectionParameters('localhost')

522

523

if adapter_type == 'blocking':

524

return pika.BlockingConnection(parameters)

525

526

elif adapter_type == 'select':

527

def on_open(connection):

528

print('Select connection opened')

529

530

return pika.SelectConnection(

531

parameters,

532

on_open_callback=on_open

533

)

534

535

elif adapter_type == 'asyncio':

536

from pika.adapters.asyncio_connection import AsyncioConnection

537

538

def on_open(connection):

539

print('AsyncIO connection opened')

540

541

return AsyncioConnection(

542

parameters,

543

on_open_callback=on_open

544

)

545

546

elif adapter_type == 'tornado':

547

from pika.adapters.tornado_connection import TornadoConnection

548

549

def on_open(connection):

550

print('Tornado connection opened')

551

552

return TornadoConnection(

553

parameters,

554

on_open_callback=on_open

555

)

556

557

else:

558

raise ValueError(f"Unsupported adapter type: {adapter_type}")

559

560

# Usage examples

561

print("Available adapters:")

562

print("- blocking: Synchronous operations")

563

print("- select: Event-driven with select/poll")

564

print("- asyncio: Python 3 async/await")

565

print("- tornado: Tornado web framework")

566

print("- twisted: Twisted framework")

567

print("- gevent: Gevent cooperative multitasking")

568

```