or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

factory-connection.mdindex.mdpubsub.mdpushpull.mdreqrep.mdrouter-dealer.md

index.mddocs/

0

# txZMQ

1

2

txZMQ provides Twisted bindings for ZeroMQ, enabling seamless integration between ZeroMQ messaging library and Twisted's asynchronous networking framework. It allows developers to use ZeroMQ sockets within Twisted's event loop (reactor) for building high-performance, message-oriented applications that combine Twisted's networking capabilities with ZeroMQ's efficient messaging patterns.

3

4

## Package Information

5

6

- **Package Name**: txZMQ

7

- **Language**: Python

8

- **Installation**: `pip install txZMQ`

9

- **Dependencies**: `Twisted>=10.0`, `pyzmq>=13`

10

- **Compatibility**: Python 2/3, CPython/PyPy, ZeroMQ 2.2.x/3.2.x

11

12

## Core Imports

13

14

```python

15

import txzmq

16

```

17

18

Common imports for typical usage:

19

20

```python

21

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType

22

from txzmq import ZmqPubConnection, ZmqSubConnection

23

from txzmq import ZmqPushConnection, ZmqPullConnection

24

from txzmq import ZmqREQConnection, ZmqREPConnection

25

from txzmq import ZmqRouterConnection, ZmqDealerConnection

26

```

27

28

## Basic Usage

29

30

```python

31

from twisted.internet import reactor

32

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType

33

from txzmq import ZmqPubConnection, ZmqSubConnection

34

35

# Create factory for managing ZeroMQ context

36

factory = ZmqFactory()

37

factory.registerForShutdown()

38

39

# Publisher example

40

endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")

41

publisher = ZmqPubConnection(factory, endpoint)

42

publisher.publish(b"Hello World", b"topic1")

43

44

# Subscriber example

45

class MySubscriber(ZmqSubConnection):

46

def gotMessage(self, message, tag):

47

print(f"Received: {message} with tag: {tag}")

48

49

endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

50

subscriber = MySubscriber(factory, endpoint)

51

subscriber.subscribe(b"topic1")

52

53

reactor.run()

54

```

55

56

## Architecture

57

58

txZMQ follows a factory pattern combined with Twisted's descriptor interfaces:

59

60

- **ZmqFactory**: Manages ZeroMQ context and connection lifecycle, integrates with Twisted reactor

61

- **ZmqConnection**: Base class implementing Twisted's IReadDescriptor and IFileDescriptor interfaces

62

- **Pattern-Specific Connections**: Specialized classes for each ZeroMQ socket type (PUB/SUB, PUSH/PULL, REQ/REP, ROUTER/DEALER)

63

- **ZmqEndpoint**: Represents connection endpoints with bind/connect semantics

64

- **Integration**: Uses Twisted Deferred objects for asynchronous operations (REQ/REP pattern)

65

66

This design provides non-blocking message sending/receiving, proper integration with Twisted's event loop, and supports all major ZeroMQ messaging patterns within Twisted applications.

67

68

## Capabilities

69

70

### Factory and Connection Management

71

72

Core factory and connection infrastructure for managing ZeroMQ context, creating connections, and handling connection lifecycle within Twisted's reactor pattern.

73

74

```python { .api }

75

class ZmqFactory(object):

76

reactor = reactor

77

ioThreads = 1

78

lingerPeriod = 100

79

80

def __init__(self): ...

81

def shutdown(self): ...

82

def registerForShutdown(self): ...

83

84

class ZmqEndpoint(namedtuple('ZmqEndpoint', ['type', 'address'])): ...

85

86

class ZmqEndpointType(object):

87

bind = "bind"

88

connect = "connect"

89

90

class ZmqConnection(object):

91

def __init__(self, factory, endpoint=None, identity=None): ...

92

def addEndpoints(self, endpoints): ...

93

def shutdown(self): ...

94

def send(self, message): ...

95

def messageReceived(self, message): ...

96

```

97

98

[Factory and Connection Management](./factory-connection.md)

99

100

### Publish-Subscribe Messaging

101

102

Publisher-subscriber pattern for one-to-many broadcast messaging with topic-based filtering and subscription management.

103

104

```python { .api }

105

class ZmqPubConnection(ZmqConnection):

106

socketType = constants.PUB

107

108

def publish(self, message, tag=b''): ...

109

110

class ZmqSubConnection(ZmqConnection):

111

socketType = constants.SUB

112

113

def subscribe(self, tag): ...

114

def unsubscribe(self, tag): ...

115

def gotMessage(self, message, tag): ...

116

```

117

118

[Publish-Subscribe Messaging](./pubsub.md)

119

120

### Push-Pull Messaging

121

122

Push-pull pattern for load-balanced work distribution and result collection in distributed processing scenarios.

123

124

```python { .api }

125

class ZmqPushConnection(ZmqConnection):

126

socketType = constants.PUSH

127

128

def push(self, message): ...

129

130

class ZmqPullConnection(ZmqConnection):

131

socketType = constants.PULL

132

133

def onPull(self, message): ...

134

```

135

136

[Push-Pull Messaging](./pushpull.md)

137

138

### Request-Reply Messaging

139

140

Request-reply pattern with timeout support and Twisted Deferred integration for building client-server applications with asynchronous response handling.

141

142

```python { .api }

143

class ZmqRequestTimeoutError(Exception): ...

144

145

class ZmqREQConnection(ZmqConnection):

146

socketType = constants.DEALER

147

defaultRequestTimeout = None

148

149

def sendMsg(self, *messageParts, **kwargs): ...

150

151

class ZmqREPConnection(ZmqConnection):

152

socketType = constants.ROUTER

153

154

def reply(self, messageId, *messageParts): ...

155

def gotMessage(self, messageId, *messageParts): ...

156

```

157

158

[Request-Reply Messaging](./reqrep.md)

159

160

### Router-Dealer Messaging

161

162

Advanced routing patterns providing raw ROUTER and DEALER socket access for building custom messaging topologies and complex routing scenarios.

163

164

```python { .api }

165

class ZmqRouterConnection(ZmqConnection):

166

socketType = constants.ROUTER

167

168

def sendMsg(self, recipientId, message): ...

169

def sendMultipart(self, recipientId, parts): ...

170

def gotMessage(self, sender_id, *message): ...

171

172

class ZmqDealerConnection(ZmqConnection):

173

socketType = constants.DEALER

174

175

def sendMsg(self, message): ...

176

def sendMultipart(self, parts): ...

177

def gotMessage(self, *args): ...

178

```

179

180

[Router-Dealer Messaging](./router-dealer.md)

181

182

## Common Types

183

184

```python { .api }

185

from collections import namedtuple

186

from typing import List, Union

187

188

# Endpoint types

189

class ZmqEndpointType:

190

bind = "bind" # Bind and listen for connections

191

connect = "connect" # Connect to existing endpoint

192

193

# Endpoint specification

194

ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])

195

# Usage: ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")

196

197

# Message formats (type hints)

198

Message = Union[bytes, List[bytes]] # Single part or multipart message

199

Tag = bytes # Topic tag for pub/sub

200

MessageId = bytes # Unique identifier for req/rep

201

RecipientId = bytes # Recipient identifier for router

202

```