or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-support.md, authentication.md, constants.md, core-messaging.md, devices.md, error-handling.md, index.md, message-handling.md, polling.md

docs/authentication.md

0

# Authentication

1

2

ZAP (ZMQ Authentication Protocol) implementation with support for NULL, PLAIN, and CURVE security mechanisms, including certificate generation and management.

3

4

## Capabilities

5

6

### Authenticator Classes

7

8

The base Authenticator class and its threading/asyncio variants provide ZAP authentication services.

9

10

```python { .api }

11

class Authenticator:

12

def __init__(self, context: Context = None, encoding: str = 'utf-8') -> None:

13

"""

14

Create a new Authenticator.

15

16

Parameters:

17

- context: ZMQ context (creates new if None)

18

- encoding: String encoding for credentials

19

"""

20

21

def start(self) -> None:

22

"""Create and bind the ZAP socket. The base class does not start a thread; use ThreadAuthenticator or AsyncioAuthenticator to service requests."""

23

24

def stop(self) -> None:

25

"""Stop the authenticator and clean up resources."""

26

27

def allow(self, *addresses: str) -> None:

28

"""

29

Allow connections from specified IP addresses.

30

31

Parameters:

32

- addresses: IP addresses or subnets to allow

33

"""

34

35

def deny(self, *addresses: str) -> None:

36

"""

37

Deny connections from specified IP addresses.

38

39

Parameters:

40

- addresses: IP addresses or subnets to deny

41

"""

42

43

def configure_plain(self, domain: str = '*', passwords: dict = None) -> None:

44

"""

45

Configure PLAIN authentication.

46

47

Parameters:

48

- domain: Authentication domain ('*' for all)

49

- passwords: Dict of username -> password mappings

50

"""

51

52

def configure_curve(self, domain: str = '*', location: str = '') -> None:

53

"""

54

Configure CURVE authentication.

55

56

Parameters:

57

- domain: Authentication domain ('*' for all)

58

- location: Directory containing authorized public keys

59

"""

60

61

def configure_curve_callback(self, domain: str = '*', credentials_provider: Any = None) -> None:

62

"""

63

Configure CURVE authentication with a credentials provider.

64

65

Parameters:

66

- domain: Authentication domain

67

- credentials_provider: Object with a callback(domain, key) method that returns True to accept the client's public key

68

"""

69

```

70

71

### Threading Authenticator

72

73

Thread-based authenticator for synchronous applications.

74

75

```python { .api }

76

class ThreadAuthenticator(Authenticator):

77

def __init__(self, context: Context = None, encoding: str = 'utf-8', log: Any = None) -> None:

78

"""

79

Create a thread-based authenticator.

80

81

Parameters:

82

- context: ZMQ context

83

- encoding: String encoding

84

- log: Logger instance

85

"""

86

87

def __enter__(self) -> ThreadAuthenticator:

88

"""Context manager entry."""

89

90

def __exit__(self, exc_type, exc_val, exc_tb) -> None:

91

"""Context manager exit with cleanup."""

92

```

93

94

### AsyncIO Authenticator

95

96

Async-compatible authenticator for asyncio applications.

97

98

```python { .api }

99

class AsyncioAuthenticator(Authenticator):

100

def __init__(self, context: Context = None, encoding: str = 'utf-8', log: Any = None) -> None:

101

"""

102

Create an asyncio-compatible authenticator.

103

104

Parameters:

105

- context: ZMQ context

106

- encoding: String encoding

107

- log: Logger instance

108

"""

109

110

async def start(self) -> None:

111

"""Start the authenticator asynchronously."""

112

113

async def stop(self) -> None:

114

"""Stop the authenticator asynchronously."""

115

116

async def __aenter__(self) -> AsyncioAuthenticator:

117

"""Async context manager entry."""

118

119

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

120

"""Async context manager exit with cleanup."""

121

```

122

123

### Certificate Management

124

125

Functions for generating and managing CURVE certificates.

126

127

```python { .api }

128

def create_certificates(keys_dir: str, name: str) -> tuple[str, str]:

129

"""

130

Create a new certificate (keypair).

131

132

Parameters:

133

- keys_dir: Directory to store certificate files

134

- name: Certificate name (used for filenames)

135

136

Returns:

137

- tuple: (public_key_file, secret_key_file) paths

138

"""

139

140

def load_certificate(filename: str) -> tuple[bytes, bytes]:

141

"""

142

Load a certificate from file.

143

144

Parameters:

145

- filename: Certificate file path

146

147

Returns:

148

- tuple: (public_key, secret_key) as bytes; secret_key is None when the file contains only a public key

149

"""

150

151

def save_certificate(filename: str, public_key: bytes, secret_key: bytes) -> None:

152

"""

153

Save a certificate to file.

154

155

Parameters:

156

- filename: Target file path

157

- public_key: Public key bytes

158

- secret_key: Secret key bytes

159

"""

160

161

def curve_keypair() -> tuple[bytes, bytes]:

162

"""

163

Generate a new CURVE keypair.

164

165

Returns:

166

- tuple: (public_key, secret_key) as bytes

167

"""

168

169

def curve_public(secret_key: bytes) -> bytes:

170

"""

171

Derive public key from secret key.

172

173

Parameters:

174

- secret_key: Secret key bytes

175

176

Returns:

177

- bytes: Corresponding public key

178

"""

179

```

180

181

## Usage Examples

182

183

### Basic PLAIN Authentication

184

185

```python

186

import zmq

187

from zmq.auth.thread import ThreadAuthenticator

188

189

# Server with PLAIN authentication

190

context = zmq.Context()

191

192

# Create and start authenticator (handles ZAP requests in a background thread)

193

auth = ThreadAuthenticator(context)

194

auth.start()

195

196

# Configure PLAIN authentication

197

passwords = {'admin': 'secret', 'user': 'password'}

198

auth.configure_plain(domain='*', passwords=passwords)

199

200

# Create server socket

201

server = context.socket(zmq.REP)

202

server.set(zmq.PLAIN_SERVER, 1) # Enable PLAIN server

203

server.bind("tcp://*:5555")

204

205

try:

206

while True:

207

message = server.recv_string()

208

print(f"Authenticated message: {message}")

209

server.send_string(f"Echo: {message}")

210

211

except KeyboardInterrupt:

212

pass

213

finally:

214

server.close()

215

auth.stop()

216

context.term()

217

```

218

219

```python

220

import zmq

221

222

# Client with PLAIN authentication

223

context = zmq.Context()

224

225

client = context.socket(zmq.REQ)

226

client.set(zmq.PLAIN_USERNAME, b'admin')

227

client.set(zmq.PLAIN_PASSWORD, b'secret')

228

client.connect("tcp://localhost:5555")

229

230

try:

231

client.send_string("Hello authenticated server")

232

reply = client.recv_string()

233

print(f"Server reply: {reply}")

234

finally:

235

client.close()

236

context.term()

237

```

238

239

### CURVE Authentication

240

241

```python

242

import zmq

243

from zmq.auth import create_certificates, load_certificate

244

import os

245

from zmq.auth.thread import ThreadAuthenticator

246

# Create certificates directory

247

keys_dir = "certificates"

248

os.makedirs(keys_dir, exist_ok=True)

249

250

# Generate server and client certificates

251

server_public_file, server_secret_file = create_certificates(keys_dir, "server")

252

client_public_file, client_secret_file = create_certificates(keys_dir, "client")

253

254

# Server with CURVE authentication

255

context = zmq.Context()

256

257

# Load server keys

258

# Certificate files are ZPL text with metadata, not raw keys, so parse them

259

# with load_certificate() instead of passing the file contents to the socket.

260

# The secret-key file yields both halves of the server keypair.

261

server_public, server_secret = load_certificate(server_secret_file)

262

263

# Create and configure authenticator

264

auth = ThreadAuthenticator(context)

265

auth.start()

266

auth.configure_curve(domain='*', location=keys_dir)

267

268

# Create server socket

269

server = context.socket(zmq.REP)

270

server.set(zmq.CURVE_SERVER, 1)

271

server.set(zmq.CURVE_SECRETKEY, server_secret)

272

273

server.bind("tcp://*:5555")

274

275

try:

276

while True:

277

message = server.recv_string()

278

print(f"Encrypted message: {message}")

279

server.send_string(f"Encrypted echo: {message}")

280

281

except KeyboardInterrupt:

282

pass

283

finally:

284

server.close()

285

auth.stop()

286

context.term()

287

```

288

289

```python

290

import zmq

291

292

# Client with CURVE authentication

293

context = zmq.Context()

294

295

# Load client keys

296

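# Certificate files are ZPL text with metadata, not raw keys, so parse them

297

# with load_certificate() instead of passing the file contents to the socket.

298

from zmq.auth import load_certificate

299

# The secret-key file yields both halves of the client keypair.

300

client_public, client_secret = load_certificate(client_secret_file)

301

server_public, _ = load_certificate(server_public_file)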

302

303

client = context.socket(zmq.REQ)

304

client.set(zmq.CURVE_SECRETKEY, client_secret)

305

client.set(zmq.CURVE_PUBLICKEY, client_public)

306

client.set(zmq.CURVE_SERVERKEY, server_public)

307

308

client.connect("tcp://localhost:5555")

309

310

try:

311

client.send_string("Hello encrypted server")

312

reply = client.recv_string()

313

print(f"Encrypted reply: {reply}")

314

finally:

315

client.close()

316

context.term()

317

```

318

319

### Context Manager Usage

320

321

```python

322

import zmq

323

from zmq.auth.thread import ThreadAuthenticator

324

325

context = zmq.Context()

326

327

# Use authenticator as context manager

328

with ThreadAuthenticator(context) as auth:

329

# Configure authentication

330

passwords = {'client': 'secret'}

331

auth.configure_plain(domain='*', passwords=passwords)

332

333

# Create server

334

server = context.socket(zmq.REP)

335

server.set(zmq.PLAIN_SERVER, 1)

336

server.bind("tcp://*:5555")

337

338

try:

339

message = server.recv_string()

340

server.send_string(f"Authenticated: {message}")

341

finally:

342

server.close()

343

# Authenticator automatically stopped when leaving context

344

345

context.term()

346

```

347

348

### AsyncIO Authentication

349

350

```python

351

import asyncio

352

import zmq.asyncio

353

from zmq.auth.asyncio import AsyncioAuthenticator

354

355

async def authenticated_server():

356

context = zmq.asyncio.Context()

357

358

async with AsyncioAuthenticator(context) as auth:

359

# Configure authentication

360

passwords = {'async_client': 'async_secret'}

361

auth.configure_plain(domain='*', passwords=passwords)

362

363

# Create server socket

364

server = context.socket(zmq.REP)

365

server.set(zmq.PLAIN_SERVER, 1)

366

server.bind("tcp://*:5555")

367

368

try:

369

while True:

370

message = await server.recv_string()

371

print(f"Async authenticated: {message}")

372

await server.send_string(f"Async echo: {message}")

373

except KeyboardInterrupt:

374

pass

375

finally:

376

server.close()

377

378

context.term()

379

380

# Run async server

381

asyncio.run(authenticated_server())

382

```

383

384

### IP Address Filtering

385

386

```python

387

import zmq

388

from zmq.auth.thread import ThreadAuthenticator

389

390

context = zmq.Context()

391

392

with ThreadAuthenticator(context) as auth:

393

# Allow specific IP addresses

394

auth.allow('127.0.0.1', '192.168.1.0/24')

395

396

# Deny specific IP addresses

397

auth.deny('192.168.1.100')

398

399

# Configure PLAIN authentication

400

passwords = {'user': 'pass'}

401

auth.configure_plain(domain='*', passwords=passwords)

402

403

server = context.socket(zmq.REP)

404

server.set(zmq.PLAIN_SERVER, 1)

405

server.bind("tcp://*:5555")

406

407

try:

408

while True:

409

message = server.recv_string()

410

server.send_string(f"Filtered and authenticated: {message}")

411

except KeyboardInterrupt:

412

pass

413

finally:

414

server.close()

415

416

context.term()

417

```

418

419

### CURVE with Callback Authentication

420

421

```python

422

import zmq

423

from zmq.auth.thread import ThreadAuthenticator

424

425

class KeyValidator:

426

    """Credentials provider; pyzmq calls callback(domain, key) for each CURVE client."""

427

    def callback(self, domain, key):

428

        allowed_keys = load_allowed_keys() # Your implementation of the custom validation lookup

429

        return key in allowed_keys

430

431

context = zmq.Context()

432

433

with ThreadAuthenticator(context) as auth:

434

# Configure CURVE with a credentials provider object

435

auth.configure_curve_callback(domain='*', credentials_provider=KeyValidator())

436

437

server = context.socket(zmq.REP)

438

server.set(zmq.CURVE_SERVER, 1)

439

server.set(zmq.CURVE_SECRETKEY, server_secret_key)

440

server.bind("tcp://*:5555")

441

442

try:

443

while True:

444

message = server.recv_string()

445

server.send_string(f"Callback authenticated: {message}")

446

except KeyboardInterrupt:

447

pass

448

finally:

449

server.close()

450

451

context.term()

452

```

453

454

### Certificate Management

455

456

```python

457

import zmq

458

from zmq.auth.certs import create_certificates, load_certificate

459

import os

460

461

# Create certificates directory

462

certs_dir = "auth_certificates"

463

os.makedirs(certs_dir, exist_ok=True)

464

465

# Generate multiple certificates

466

server_pub, server_sec = create_certificates(certs_dir, "server")

467

client1_pub, client1_sec = create_certificates(certs_dir, "client1")

468

client2_pub, client2_sec = create_certificates(certs_dir, "client2")

469

470

print(f"Server certificate: {server_pub}")

471

print(f"Client certificates: {client1_pub}, {client2_pub}")

472

473

# Load certificate for use

474

public_key, secret_key = load_certificate(server_sec)

475

print(f"Server keys loaded: {len(public_key)} + {len(secret_key)} bytes")

476

477

# Generate keypair programmatically

478

public, secret = zmq.curve_keypair()

479

print(f"Generated keypair: {len(public)} + {len(secret)} bytes")

480

481

# Derive public key from secret

482

derived_public = zmq.curve_public(secret)

483

assert public == derived_public

484

print("Key derivation verified")

485

```

486

487

### Domain-Based Authentication

488

489

```python

490

import zmq

491

from zmq.auth.thread import ThreadAuthenticator

492

493

context = zmq.Context()

494

495

with ThreadAuthenticator(context) as auth:

496

# Configure different authentication for different domains

497

admin_passwords = {'admin': 'admin_secret'}

498

user_passwords = {'user1': 'user_secret', 'user2': 'user_secret'}

499

500

auth.configure_plain(domain='admin', passwords=admin_passwords)

501

auth.configure_plain(domain='users', passwords=user_passwords)

502

503

# Create servers for different domains

504

admin_server = context.socket(zmq.REP)

505

admin_server.set(zmq.PLAIN_SERVER, 1)

506

admin_server.set_string(zmq.ZAP_DOMAIN, 'admin')

507

admin_server.bind("tcp://*:5555")

508

509

user_server = context.socket(zmq.REP)

510

user_server.set(zmq.PLAIN_SERVER, 1)

511

user_server.set_string(zmq.ZAP_DOMAIN, 'users')

512

user_server.bind("tcp://*:5556")

513

514

# Handle requests (simplified example)

515

poller = zmq.Poller()

516

poller.register(admin_server, zmq.POLLIN)

517

poller.register(user_server, zmq.POLLIN)

518

519

try:

520

while True:

521

events = poller.poll(1000)

522

for socket, event in events:

523

if socket is admin_server:

524

message = admin_server.recv_string()

525

admin_server.send_string(f"Admin: {message}")

526

elif socket is user_server:

527

message = user_server.recv_string()

528

user_server.send_string(f"User: {message}")

529

except KeyboardInterrupt:

530

pass

531

finally:

532

admin_server.close()

533

user_server.close()

534

535

context.term()

536

```

537

538

## Security Considerations

539

540

### Key Management

541

542

```python

543

import zmq

544

import os

545

546

# Generate secure random keys

547

public_key, secret_key = zmq.curve_keypair()

548

549

# Store keys securely (restrict file permissions)

550

os.umask(0o077) # Only owner can read/write

551

552

with open('server.key', 'wb') as f:

553

f.write(secret_key)

554

555

with open('server.pub', 'wb') as f:

556

f.write(public_key)

557

558

# Verify permissions

559

stat = os.stat('server.key')

560

assert stat.st_mode & 0o077 == 0, "Secret key file has insecure permissions"

561

```

562

563

### Authentication Logging

564

565

```python

566

import zmq

567

from zmq.auth.thread import ThreadAuthenticator

568

import logging

569

570

# Configure logging

571

logging.basicConfig(level=logging.INFO)

572

logger = logging.getLogger('auth')

573

574

context = zmq.Context()

575

576

with ThreadAuthenticator(context, log=logger) as auth:

577

# Authentication events will be logged

578

passwords = {'user': 'secret'}

579

auth.configure_plain(domain='*', passwords=passwords)

580

581

server = context.socket(zmq.REP)

582

server.set(zmq.PLAIN_SERVER, 1)

583

server.bind("tcp://*:5555")

584

585

# Authentication successes/failures logged automatically

586

try:

587

message = server.recv_string()

588

server.send_string("Authenticated")

589

finally:

590

server.close()

591

592

context.term()

593

```

594

595

## Types

596

597

```python { .api }

598

from typing import Dict, List, Optional, Callable, Tuple, Any, Union

599

import logging

600

601

# Authentication types

602

Passwords = Dict[str, str] # username -> password mapping

603

PublicKey = bytes

604

SecretKey = bytes

605

Keypair = Tuple[PublicKey, SecretKey]

606

607

# Address types

608

IPAddress = str

609

AddressList = List[IPAddress]

610

611

# Callback types

612

KeyValidationCallback = Callable[[PublicKey], bool]

613

AuthContext = Union[Context, None]

614

AuthLogger = Union[logging.Logger, Any, None]

615

616

# Certificate types

617

CertificatePath = str

618

CertificateFiles = Tuple[CertificatePath, CertificatePath] # (public_file, secret_file)

619

620

# Domain types

621

AuthDomain = str

622

```