or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

factory-connection.md · index.md · pubsub.md · pushpull.md · reqrep.md · router-dealer.md

router-dealer.mddocs/

0

# Router-Dealer Messaging

1

2

Advanced routing patterns providing raw ROUTER and DEALER socket access for building custom messaging topologies and complex routing scenarios. These patterns offer the most flexibility for creating sophisticated distributed messaging architectures, custom load balancing, and multi-hop routing systems.

3

4

## Capabilities

5

6

### Base Connection Class for Advanced Patterns

7

8

Foundation class providing uniform interface for ROUTER and DEALER socket types with consistent message handling methods.

9

10

```python { .api }

11

class ZmqBase(ZmqConnection):
    """
    Common base for the advanced ZMQ connection types.

    Exposes one uniform wrapper API on top of the underlying socket types
    while leaving room for connection-specific implementations. Differences
    between socket types are not hidden; the interaction pattern is simply
    made more consistent.
    """

    def sendMsg(self, message):
        """
        Send a message made of a single part.

        The default implementation delegates to sendMultipart([message]);
        subclasses may override it with connection-specific behavior.

        Args:
            message (bytes): Message content to send
        """

    def sendMultipart(self, parts):
        """
        Send a message made of several parts.

        The default implementation delegates to the underlying
        ZmqConnection.send(); subclasses may override it with
        connection-specific routing logic.

        Args:
            parts (list): List of message parts (bytes)
        """

    def messageReceived(self, message):
        """
        Receive an incoming message and hand it over to gotMessage.

        Args:
            message (list): List of message parts from ZeroMQ
        """

    def gotMessage(self, *args, **kwargs):
        """
        Abstract handler for received messages.

        Subclasses must implement it, using a signature that fits their
        socket type.
        """

56

```

57

58

### Router Connection

59

60

Routes messages between multiple peers with explicit addressing. Can send messages to specific recipients and receive messages with sender identification.

61

62

```python { .api }

63

class ZmqRouterConnection(ZmqBase):
    """
    Connection for advanced message routing.

    Backed by the ZeroMQ ROUTER socket type: outgoing messages are addressed
    to a specific recipient and incoming messages carry the identity of
    their sender. Serves as the foundation for custom routing topologies.
    """

    socketType = constants.ROUTER

    def sendMsg(self, recipientId, message):
        """
        Send a single-part message to one specific recipient.

        Args:
            recipientId (bytes): Identity of the target recipient
            message (bytes): Message content to send
        """

    def sendMultipart(self, recipientId, parts):
        """
        Send a multipart message to one specific recipient.

        Args:
            recipientId (bytes): Identity of the target recipient
            parts (list): List of message parts (bytes)
        """

    def gotMessage(self, sender_id, *message):
        """
        Abstract callback invoked when a message is received.

        Subclasses must implement it to handle incoming messages.

        Args:
            sender_id (bytes): Identity of the message sender
            *message: Variable number of message parts (bytes)
        """

102

```

103

104

#### Router Usage Example

105

106

```python

107

from twisted.internet import reactor

108

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqRouterConnection

109

import json

110

import time

111

112

class MessageBroker(ZmqRouterConnection):
    """Central message broker using ROUTER socket.

    Each incoming message is expected to be a JSON object in its first
    part; the object's ``type`` field selects the handler. Peers are tracked
    in ``self.clients`` keyed by their raw ZeroMQ identity (bytes).
    """

    def __init__(self, factory, endpoint):
        super().__init__(factory, endpoint)
        self.clients = {}  # identity (bytes) -> bookkeeping dict
        self.message_stats = {'received': 0, 'sent': 0, 'errors': 0}
        print("Message broker started")

    @staticmethod
    def _identity_text(identity):
        """Return a printable, JSON-safe rendering of a peer identity.

        Identities are arbitrary bytes: a ROUTER socket generates a binary
        identity for any peer that did not set one, so a strict UTF-8
        decode can fail. Non-text identities are rendered as hex instead.
        """
        if not identity:
            return 'unknown'
        try:
            text = identity.decode('utf-8')
        except UnicodeDecodeError:
            return identity.hex()
        return text if text.isprintable() else identity.hex()

    def gotMessage(self, sender_id, *message):
        """Handle incoming message from any client.

        Args:
            sender_id (bytes): Identity of the sending peer.
            *message: Message parts; only the first one is parsed (as JSON).
        """
        self.message_stats['received'] += 1
        sender_str = self._identity_text(sender_id)

        try:
            # Parse before touching the registry so that an unparseable
            # message does not create a client entry.
            msg_data = json.loads(message[0].decode('utf-8'))
            msg_type = msg_data.get('type')

            print(f"Received {msg_type} from {sender_str}")

            # Update client registry
            now = time.time()
            client = self.clients.setdefault(sender_id, {
                'first_seen': now,
                'last_seen': now,
                'message_count': 0,
            })
            client['last_seen'] = now
            client['message_count'] += 1

            # Route message based on type
            if msg_type == 'register':
                self.handle_registration(sender_id, msg_data)
            elif msg_type == 'direct_message':
                self.handle_direct_message(sender_id, msg_data)
            elif msg_type == 'broadcast':
                self.handle_broadcast(sender_id, msg_data)
            elif msg_type == 'list_clients':
                self.handle_list_clients(sender_id)
            elif msg_type == 'ping':
                self.handle_ping(sender_id, msg_data)
            else:
                self.send_error(sender_id, f"Unknown message type: {msg_type}")

        except Exception as e:
            # Boundary handler: a bad message from one peer must not take
            # the broker down; report it back to the sender instead.
            print(f"Error processing message from {sender_str}: {e}")
            self.message_stats['errors'] += 1
            self.send_error(sender_id, f"Message processing error: {e}")

    def handle_registration(self, client_id, msg_data):
        """Handle client registration.

        Stores the requested name on the client's registry entry (which
        gotMessage has already created) and acknowledges to the client.
        """
        client_name = msg_data.get('name', 'unnamed')
        self.clients[client_id]['name'] = client_name

        response = {
            'type': 'registration_ack',
            'status': 'success',
            'client_id': self._identity_text(client_id),
            'message': f'Registered as {client_name}',
        }
        self.send_response(client_id, response)

    def handle_direct_message(self, sender_id, msg_data):
        """Route message to specific recipient, looked up by registered name."""
        target_name = msg_data.get('target')
        content = msg_data.get('content')

        # Find target client by name (first match wins).
        target_id = next(
            (cid for cid, info in self.clients.items()
             if info.get('name') == target_name),
            None,
        )

        # Compare against None explicitly: an identity is bytes and an
        # empty one must not be mistaken for "not found".
        if target_id is None:
            self.send_error(sender_id, f"Target client '{target_name}' not found")
            return

        # Forward message to target
        forward_msg = {
            'type': 'direct_message',
            'from': self.clients[sender_id].get('name', 'unknown'),
            'content': content,
            'timestamp': time.time(),
        }
        self.send_response(target_id, forward_msg)

        # Confirm to sender
        confirm_msg = {
            'type': 'delivery_confirmation',
            'target': target_name,
            'status': 'delivered',
        }
        self.send_response(sender_id, confirm_msg)

    def handle_broadcast(self, sender_id, msg_data):
        """Broadcast message to all connected clients except sender."""
        content = msg_data.get('content')
        sender_name = self.clients[sender_id].get('name', 'unknown')

        broadcast_msg = {
            'type': 'broadcast',
            'from': sender_name,
            'content': content,
            'timestamp': time.time(),
        }

        recipients = [cid for cid in self.clients if cid != sender_id]
        for client_id in recipients:
            self.send_response(client_id, broadcast_msg)

        # Confirm to sender
        confirm_msg = {
            'type': 'broadcast_confirmation',
            'recipients': len(recipients),
        }
        self.send_response(sender_id, confirm_msg)

    def handle_list_clients(self, requester_id):
        """Send list of connected clients."""
        client_list = [
            {
                'name': info.get('name', 'unnamed'),
                'connected_since': info['first_seen'],
                'last_activity': info['last_seen'],
                'message_count': info['message_count'],
            }
            for info in self.clients.values()
        ]

        response = {
            'type': 'client_list',
            'clients': client_list,
            'total': len(client_list),
        }
        self.send_response(requester_id, response)

    def handle_ping(self, client_id, msg_data):
        """Respond to ping with pong carrying the broker's message counters."""
        response = {
            'type': 'pong',
            'timestamp': time.time(),
            'stats': self.message_stats,
        }
        self.send_response(client_id, response)

    def send_response(self, client_id, response_data):
        """Send response to specific client as a single JSON-encoded part."""
        message = json.dumps(response_data).encode('utf-8')
        self.sendMsg(client_id, message)
        self.message_stats['sent'] += 1

    def send_error(self, client_id, error_message):
        """Send error response to client."""
        error_response = {
            'type': 'error',
            'message': error_message,
            'timestamp': time.time(),
        }
        self.send_response(client_id, error_response)

274

275

# Start message broker

276

# Describe the listening endpoint first; building it has no side effects.
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")

# The factory is registered for shutdown before any connection is created.
factory = ZmqFactory()
factory.registerForShutdown()

broker = MessageBroker(factory, endpoint)

print("Message broker listening on tcp://*:5555")

# Hand control to the Twisted reactor.
reactor.run()

284

```

285

286

### Dealer Connection

287

288

Provides raw DEALER socket functionality for peer-to-peer communication and custom routing scenarios.

289

290

```python { .api }

291

class ZmqDealerConnection(ZmqBase):
    """
    Connection for peer-to-peer messaging.

    Backed by the ZeroMQ DEALER socket type. Gives raw socket access with
    no built-in message correlation, which makes it suitable for custom
    protocols and advanced messaging patterns.
    """

    socketType = constants.DEALER

    def sendMsg(self, message):
        """
        Send a message made of a single part.

        Args:
            message (bytes): Message content to send
        """

    def sendMultipart(self, parts):
        """
        Send a message made of several parts.

        Args:
            parts (list): List of message parts (bytes)
        """

    def gotMessage(self, *args):
        """
        Abstract callback invoked when a message is received.

        Subclasses must implement it to handle incoming messages.

        Args:
            *args: Variable number of message parts (bytes)
        """

327

```

328

329

#### Dealer Usage Example

330

331

```python

332

from twisted.internet import reactor, defer

333

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqDealerConnection

334

import json

335

import uuid

336

337

class BrokerClient(ZmqDealerConnection):
    """Client connecting to message broker using DEALER socket.

    Outgoing requests and incoming replies are JSON objects carried in a
    single message part; the object's ``type`` field identifies the message.
    """

    def __init__(self, factory, endpoint, client_name):
        super().__init__(factory, endpoint)
        self.client_name = client_name
        self.pending_requests = {}  # Track pending requests for correlation
        self.is_registered = False

        # Incoming message type -> handler; every handler takes the decoded
        # JSON object as its only argument.
        self._handlers = {
            'registration_ack': self.handle_registration_ack,
            'direct_message': self.handle_direct_message,
            'broadcast': self.handle_broadcast,
            'client_list': self.handle_client_list,
            'pong': self.handle_pong,
            'delivery_confirmation': self.handle_delivery_confirmation,
            'broadcast_confirmation': self.handle_broadcast_confirmation,
            'error': self.handle_error,
        }

        # Register with broker once the reactor is running
        reactor.callWhenRunning(self.register)

    def register(self):
        """Register with the message broker."""
        self.send_message({
            'type': 'register',
            'name': self.client_name,
        })

    def send_direct_message(self, target_name, content):
        """Send direct message to another client, addressed by name."""
        self.send_message({
            'type': 'direct_message',
            'target': target_name,
            'content': content,
        })

    def send_broadcast(self, content):
        """Broadcast message to all clients."""
        self.send_message({
            'type': 'broadcast',
            'content': content,
        })

    def list_clients(self):
        """Request list of connected clients."""
        self.send_message({'type': 'list_clients'})

    def ping_broker(self):
        """Send ping to broker."""
        self.send_message({'type': 'ping'})

    def send_message(self, msg_data):
        """Send message to broker as a single JSON-encoded part."""
        message = json.dumps(msg_data).encode('utf-8')
        self.sendMsg(message)

    def gotMessage(self, *message):
        """Handle incoming message from broker.

        Only the first message part is parsed. Any failure while parsing or
        handling is reported on stdout rather than propagated.
        """
        try:
            msg_data = json.loads(message[0].decode('utf-8'))
            msg_type = msg_data.get('type')

            handler = self._handlers.get(msg_type)
            if handler is None:
                print(f"Unknown message type: {msg_type}")
            else:
                handler(msg_data)

        except Exception as e:
            # Boundary handler: one malformed reply must not break the client.
            print(f"Error processing message: {e}")

    def handle_registration_ack(self, msg_data):
        """Handle registration acknowledgment."""
        self.is_registered = True
        print(f"✓ {msg_data['message']}")

    def handle_direct_message(self, msg_data):
        """Handle incoming direct message."""
        sender = msg_data['from']
        content = msg_data['content']
        print(f"💬 Direct from {sender}: {content}")

    def handle_broadcast(self, msg_data):
        """Handle incoming broadcast message."""
        sender = msg_data['from']
        content = msg_data['content']
        print(f"📢 Broadcast from {sender}: {content}")

    def handle_client_list(self, msg_data):
        """Handle client list response."""
        clients = msg_data['clients']
        print(f"👥 Connected clients ({msg_data['total']}):")
        for client in clients:
            print(f" - {client['name']} (messages: {client['message_count']})")

    def handle_pong(self, msg_data):
        """Handle pong response."""
        stats = msg_data['stats']
        print(f"🏓 Pong! Broker stats: {stats}")

    def handle_delivery_confirmation(self, msg_data):
        """Handle message delivery confirmation."""
        target = msg_data['target']
        status = msg_data['status']
        print(f"✓ Message to {target}: {status}")

    def handle_broadcast_confirmation(self, msg_data):
        """Handle broadcast confirmation."""
        count = msg_data['recipients']
        print(f"✓ Broadcast sent to {count} recipients")

    def handle_error(self, msg_data):
        """Handle error message from broker."""
        error = msg_data['message']
        print(f"❌ Error: {error}")

462

463

# Interactive client example

464

def create_interactive_client(client_name, factory=None):
    """Create an interactive client for testing.

    Args:
        client_name (str): Name the client registers under.
        factory (ZmqFactory, optional): Factory to create the connection
            with. When omitted, a new factory is created and registered
            for shutdown.

    Returns:
        BrokerClient: The connected client. It sends a batch of test
        messages once registered and repeats the batch every 10 seconds.
    """
    if factory is None:
        # Without shutdown registration the factory's resources would not
        # be released when the reactor stops.
        factory = ZmqFactory()
        factory.registerForShutdown()

    endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
    client = BrokerClient(factory, endpoint, client_name)

    def send_test_messages():
        # Registration is asynchronous: poll once a second until the broker
        # has acknowledged it.
        if not client.is_registered:
            reactor.callLater(1.0, send_test_messages)
            return

        # Send various test messages
        client.list_clients()
        client.ping_broker()
        client.send_broadcast(f"Hello from {client_name}!")

        # Schedule periodic messages
        reactor.callLater(10.0, send_test_messages)

    reactor.callLater(2.0, send_test_messages)
    return client


# Usage: Create multiple clients sharing one factory that is registered
# for shutdown.
factory = ZmqFactory()
factory.registerForShutdown()

client1 = create_interactive_client("Alice", factory)
client2 = create_interactive_client("Bob", factory)
client3 = create_interactive_client("Charlie", factory)

print("Started 3 clients: Alice, Bob, Charlie")
print("They will automatically interact with the broker")
reactor.run()

497

```

498

499

### Advanced Routing Patterns

500

501

Complex routing topologies using ROUTER-DEALER combinations for building scalable distributed systems.

502

503

#### Multi-Hop Routing

504

505

```python

506

class RoutingNode(ZmqRouterConnection):
    """Intermediate routing node for multi-hop messaging.

    Downstream peers talk to this node's ROUTER socket. Messages are JSON
    objects whose ``destination`` field is looked up in ``routing_table``.
    The mapped next hop is either the literal string ``'upstream'``
    (forward through the DEALER connections to upstream nodes) or the
    identity of a locally connected peer.
    """

    def __init__(self, factory, bind_endpoint, node_id, upstream_endpoints=None):
        """
        Args:
            factory: ZmqFactory used for this node and its upstream links.
            bind_endpoint: Endpoint this node's ROUTER socket uses.
            node_id (str): Name of this node; also a valid ``destination``.
            upstream_endpoints (list, optional): Endpoints of upstream
                nodes to connect to with DEALER sockets.
        """
        super().__init__(factory, bind_endpoint)
        self.node_id = node_id
        self.routing_table = {}  # destination -> next_hop
        self.upstream_connections = []

        # Connect to upstream nodes
        for endpoint in upstream_endpoints or ():
            dealer = ZmqDealerConnection(factory, endpoint)
            # Deliver raw upstream messages to this node instead of the
            # dealer's own gotMessage.
            dealer.messageReceived = self.handle_upstream_message
            self.upstream_connections.append(dealer)

        print(f"Routing node {node_id} started")

    @staticmethod
    def _identity_text(identity):
        """Return a JSON-safe text form of a peer identity.

        Identities are arbitrary bytes (a ROUTER socket generates binary
        ones for peers that set none), so non-UTF-8 identities are rendered
        as hex rather than raising.
        """
        try:
            return identity.decode('utf-8')
        except UnicodeDecodeError:
            return identity.hex()

    def gotMessage(self, sender_id, *message):
        """Handle message from downstream clients."""
        try:
            msg_data = json.loads(message[0].decode('utf-8'))
            destination = msg_data.get('destination')

            if destination == self.node_id:
                # Message for this node
                self.handle_local_message(sender_id, msg_data)
            else:
                # Route message based on routing table
                self.route_message(sender_id, destination, msg_data)

        except Exception as e:
            # Boundary handler: a bad message must not stop the node.
            print(f"Routing error: {e}")

    def handle_local_message(self, sender_id, msg_data):
        """Handle a message addressed to this node itself.

        Default implementation only logs the message; subclasses override
        this hook to act on it.
        """
        print(f"Node {self.node_id} received local message: {msg_data}")

    def _send_routing_error(self, sender_id, text, msg_data):
        """Report a routing failure back to the peer that sent the message."""
        error_msg = {
            'type': 'routing_error',
            'message': text,
            'original_message': msg_data,
        }
        self.sendMsg(sender_id, json.dumps(error_msg).encode('utf-8'))

    def route_message(self, sender_id, destination, msg_data):
        """Route message to appropriate next hop.

        Unknown destinations, and 'upstream' routes on a node that has no
        upstream connection, are answered with a ``routing_error`` message
        instead of being dropped.
        """
        if destination not in self.routing_table:
            # Unknown destination - send error back
            self._send_routing_error(
                sender_id, f'Unknown destination: {destination}', msg_data)
            return

        next_hop = self.routing_table[destination]
        if next_hop == 'upstream' and not self.upstream_connections:
            self._send_routing_error(
                sender_id,
                f'No upstream connection for destination: {destination}',
                msg_data)
            return

        # Add routing information
        msg_data['route_path'] = msg_data.get('route_path', []) + [self.node_id]
        msg_data['original_sender'] = self._identity_text(sender_id)

        # Forward to next hop
        forwarded_msg = json.dumps(msg_data).encode('utf-8')
        if next_hop == 'upstream':
            # Send via every upstream connection
            for conn in self.upstream_connections:
                conn.sendMsg(forwarded_msg)
        else:
            # Send to local client
            self.sendMsg(next_hop.encode('utf-8'), forwarded_msg)

    def handle_upstream_message(self, message):
        """Handle message from upstream routing node.

        Args:
            message (list): Raw message parts; the first part is the JSON
                message and is forwarded unchanged to the local peer.
        """
        try:
            msg_data = json.loads(message[0].decode('utf-8'))
            destination = msg_data.get('destination')

            next_hop = self.routing_table.get(destination)
            # An 'upstream' route is not a local peer: forwarding it here
            # would address a client literally named "upstream".
            if next_hop is None or next_hop == 'upstream':
                print(f"Cannot route upstream message to {destination}")
            else:
                # Route locally
                self.sendMsg(next_hop.encode('utf-8'), message[0])

        except Exception as e:
            print(f"Upstream message handling error: {e}")

    def update_routing_table(self, destination, next_hop):
        """Update routing table entry."""
        self.routing_table[destination] = next_hop
        print(f"Route updated: {destination} -> {next_hop}")

586

587

# Create hierarchical routing topology

588

factory = ZmqFactory()

# Both regional routers connect upstream to the top-level router's address.
top_router = RoutingNode(
    factory,
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),
    "top_router",
)

east_router = RoutingNode(
    factory,
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5556"),
    "east_router",
    upstream_endpoints=[
        ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
    ],
)

west_router = RoutingNode(
    factory,
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5557"),
    "west_router",
    upstream_endpoints=[
        ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
    ],
)

# Configure routing tables
for region in ("east_region", "west_region"):
    top_router.update_routing_table(region, "upstream")

print("Multi-hop routing topology created")

617

```

618

619

### Custom Protocol Implementation

620

621

Building custom messaging protocols using ROUTER-DEALER patterns.

622

623

```python

624

class CustomProtocolRouter(ZmqRouterConnection):
    """Custom protocol implementation with message versioning and compression.

    Every protocol message has two parts: a JSON header (``version``,
    ``type``, ``id``, ``timestamp``, ``compressed`` and optionally
    ``reply_to``) followed by a payload that is zlib-compressed when the
    header says so.
    """

    PROTOCOL_VERSION = "1.0"

    # Upper bound for a decompressed payload. Payloads come from remote
    # peers, so decompression must not be allowed to grow without limit.
    MAX_PAYLOAD_SIZE = 10 * 1024 * 1024

    def __init__(self, factory, endpoint):
        super().__init__(factory, endpoint)
        self.session_store = {}  # client_id -> session_info
        self.message_handlers = {
            'HELLO': self.handle_hello,
            'DATA': self.handle_data,
            'PING': self.handle_ping,
            'BYE': self.handle_bye,
        }

    @staticmethod
    def _identity_text(identity):
        """Return a JSON-safe text form of a peer identity.

        Identities are arbitrary bytes (a ROUTER socket generates binary
        ones for peers that set none); non-UTF-8 identities become hex.
        """
        try:
            return identity.decode('utf-8')
        except UnicodeDecodeError:
            return identity.hex()

    def _decompress(self, payload):
        """Decompress a zlib payload, refusing oversized or truncated data.

        Raises:
            ValueError: If the result would exceed MAX_PAYLOAD_SIZE or the
                compressed stream is incomplete.
            zlib.error: If the data is not a valid zlib stream.
        """
        import zlib

        decompressor = zlib.decompressobj()
        # Ask for one byte more than the limit so overflow is detectable.
        data = decompressor.decompress(payload, self.MAX_PAYLOAD_SIZE + 1)
        if len(data) > self.MAX_PAYLOAD_SIZE:
            raise ValueError("Decompressed payload exceeds size limit")
        if not decompressor.eof:
            raise ValueError("Truncated compressed payload")
        return data

    def gotMessage(self, sender_id, *message):
        """Handle incoming protocol message.

        Validates the frame count and protocol version, decompresses the
        payload if needed and dispatches on the header's ``type``. Every
        failure is answered with an ERROR message.
        """
        msg_id = None  # known only once the header has been parsed
        try:
            # Parse protocol header
            if len(message) < 2:
                self.send_error(sender_id, "Invalid message format")
                return

            header = json.loads(message[0].decode('utf-8'))
            payload = message[1]

            # Validate protocol version
            if header.get('version') != self.PROTOCOL_VERSION:
                self.send_error(sender_id, "Unsupported protocol version")
                return

            # Extract message info
            msg_type = header.get('type')
            msg_id = header.get('id')

            # Decompress payload if needed
            if header.get('compressed', False):
                payload = self._decompress(payload)

            # Route to handler
            handler = self.message_handlers.get(msg_type)
            if handler is None:
                self.send_error(
                    sender_id, f"Unknown message type: {msg_type}", msg_id)
            else:
                handler(sender_id, header, payload, msg_id)

        except Exception as e:
            # Boundary handler: report the failure to the peer.
            print(f"Protocol error: {e}")
            self.send_error(sender_id, f"Protocol error: {e}", msg_id)

    def handle_hello(self, sender_id, header, payload, msg_id):
        """Handle client hello/handshake by creating a session."""
        try:
            hello_data = json.loads(payload.decode('utf-8'))
            client_name = hello_data.get('name', 'unknown')

            # Create session
            now = time.time()
            self.session_store[sender_id] = {
                'name': client_name,
                'connected_at': now,
                'last_ping': now,
                'last_activity': now,
                'message_count': 0,
            }

            # Send welcome response
            welcome_data = {
                'status': 'welcome',
                'server_time': time.time(),
                'session_id': self._identity_text(sender_id),
            }

            self.send_protocol_message(sender_id, 'WELCOME', welcome_data, msg_id)
            print(f"Client {client_name} connected")

        except Exception as e:
            self.send_error(sender_id, f"Hello processing error: {e}", msg_id)

    def handle_data(self, sender_id, header, payload, msg_id):
        """Handle data message; requires a session created by HELLO."""
        session = self.session_store.get(sender_id)
        if session is None:
            self.send_error(sender_id, "Not authenticated", msg_id)
            return

        # Update session
        session['message_count'] += 1
        session['last_activity'] = time.time()

        # Process data (echo back with processing info)
        processed_data = {
            'original_size': len(payload),
            'processed_at': time.time(),
            'message_number': session['message_count'],
            # First 100 chars; the payload is arbitrary bytes, so undecodable
            # sequences are replaced rather than failing the whole message.
            'echo': payload.decode('utf-8', errors='replace')[:100],
        }

        self.send_protocol_message(sender_id, 'DATA_ACK', processed_data, msg_id)

    def handle_ping(self, sender_id, header, payload, msg_id):
        """Handle ping message; answered even without a session."""
        if sender_id in self.session_store:
            self.session_store[sender_id]['last_ping'] = time.time()

        pong_data = {
            'server_time': time.time(),
            'client_count': len(self.session_store),
        }

        self.send_protocol_message(sender_id, 'PONG', pong_data, msg_id)

    def handle_bye(self, sender_id, header, payload, msg_id):
        """Handle client disconnect by dropping its session, if any."""
        session = self.session_store.pop(sender_id, None)
        if session is not None:
            print(f"Client {session.get('name', 'unknown')} disconnected")

        bye_data = {'status': 'goodbye'}
        self.send_protocol_message(sender_id, 'BYE_ACK', bye_data, msg_id)

    def send_protocol_message(self, client_id, msg_type, data, reply_to_id=None):
        """Send message using custom protocol format.

        Args:
            client_id (bytes): Identity of the recipient.
            msg_type (str): Protocol message type for the header.
            data: JSON-serializable payload.
            reply_to_id: Id of the request being answered, if any; copied
                into the header's ``reply_to`` field.
        """
        import uuid

        # Create protocol header
        header = {
            'version': self.PROTOCOL_VERSION,
            'type': msg_type,
            'id': str(uuid.uuid4()),
            'timestamp': time.time(),
            'compressed': False,
        }

        if reply_to_id is not None:
            header['reply_to'] = reply_to_id

        # Serialize payload
        payload = json.dumps(data).encode('utf-8')

        # Optional compression for large messages
        if len(payload) > 1024:
            import zlib
            payload = zlib.compress(payload)
            header['compressed'] = True

        # Send as multipart message
        header_bytes = json.dumps(header).encode('utf-8')
        self.sendMultipart(client_id, [header_bytes, payload])

    def send_error(self, client_id, error_message, reply_to_id=None):
        """Send error message to client.

        Args:
            client_id (bytes): Identity of the recipient.
            error_message (str): Human-readable description of the failure.
            reply_to_id: Id of the request that failed, when known.
        """
        error_data = {'error': error_message}
        self.send_protocol_message(client_id, 'ERROR', error_data, reply_to_id)

776

777

# Custom protocol client

778

class CustomProtocolClient(ZmqDealerConnection):
    """Client implementing custom protocol.

    Messages have two parts: a JSON header and a JSON payload, the latter
    zlib-compressed when the header's ``compressed`` flag is set.
    """

    PROTOCOL_VERSION = "1.0"

    def __init__(self, factory, endpoint, client_name):
        super().__init__(factory, endpoint)
        self.client_name = client_name
        self.connected = False

        # Send hello once the reactor is running
        reactor.callWhenRunning(self.send_hello)

    def send_hello(self):
        """Send hello message to server."""
        hello_data = {'name': self.client_name}
        self.send_protocol_message('HELLO', hello_data)

    def send_protocol_message(self, msg_type, data):
        """Send message using custom protocol.

        Args:
            msg_type (str): Protocol message type for the header.
            data: JSON-serializable payload.
        """
        import uuid

        header = {
            'version': self.PROTOCOL_VERSION,
            'type': msg_type,
            'id': str(uuid.uuid4()),
            'timestamp': time.time(),
            'compressed': False,
        }

        payload = json.dumps(data).encode('utf-8')
        header_bytes = json.dumps(header).encode('utf-8')

        self.sendMultipart([header_bytes, payload])

    def gotMessage(self, *message):
        """Handle incoming protocol message.

        Failures while parsing or handling are reported on stdout rather
        than propagated.
        """
        try:
            if len(message) < 2:
                raise ValueError("Invalid message format")

            header = json.loads(message[0].decode('utf-8'))
            raw_payload = message[1]

            # The sender compresses large payloads and flags them in the
            # header; they must be inflated before JSON decoding.
            if header.get('compressed', False):
                import zlib
                raw_payload = zlib.decompress(raw_payload)

            payload = json.loads(raw_payload.decode('utf-8'))

            msg_type = header['type']

            if msg_type == 'WELCOME':
                self.connected = True
                print(f"✓ Connected as {self.client_name}")
            elif msg_type == 'DATA_ACK':
                print(f"Data processed: {payload}")
            elif msg_type == 'PONG':
                print(f"Pong: {payload}")
            elif msg_type == 'ERROR':
                print(f"Error: {payload['error']}")

        except Exception as e:
            print(f"Client protocol error: {e}")

831

832

# Usage

833

factory = ZmqFactory()

# Start custom protocol server
server_endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
server = CustomProtocolRouter(factory, server_endpoint)

# Create clients; each one connects to the server's address
client1 = CustomProtocolClient(
    factory,
    endpoint=ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
    client_name="TestClient1",
)

print("Custom protocol server and client started")

847

```