or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

factory-connection.mdindex.mdpubsub.mdpushpull.mdreqrep.mdrouter-dealer.md

factory-connection.mddocs/

0

# Factory and Connection Management

1

2

Core infrastructure for managing ZeroMQ context, creating connections, and handling connection lifecycle within Twisted's reactor pattern. The factory manages the global ZeroMQ context while connections handle individual socket operations.

3

4

## Capabilities

5

6

### ZeroMQ Factory

7

8

Manages ZeroMQ context and connection lifecycle, providing centralized context management and reactor integration.

9

10

```python { .api }

11

class ZmqFactory(object):

12

"""

13

Factory for creating and managing ZeroMQ connections.

14

15

Attributes:

16

reactor: Twisted reactor reference (default: twisted.internet.reactor)

17

ioThreads (int): Number of I/O threads for ZeroMQ context (default: 1)

18

lingerPeriod (int): Linger period in milliseconds for socket closure (default: 100)

19

connections (set): Set of active ZmqConnection instances

20

context: ZeroMQ context instance

21

"""

22

23

reactor = reactor

24

ioThreads = 1

25

lingerPeriod = 100

26

27

def __init__(self):

28

"""Create ZeroMQ context with specified I/O threads."""

29

30

def shutdown(self):

31

"""

32

Shutdown all connections and terminate ZeroMQ context.

33

Cleans up reactor triggers and closes all managed connections.

34

"""

35

36

def registerForShutdown(self):

37

"""

38

Register factory for automatic shutdown when reactor shuts down.

39

Recommended to call on any created factory.

40

"""

41

```

42

43

#### Usage Example

44

45

```python

46

from twisted.internet import reactor

47

from txzmq import ZmqFactory

48

49

# Create factory

50

factory = ZmqFactory()

51

factory.registerForShutdown() # Auto-cleanup on reactor shutdown

52

53

# Configure factory settings

54

factory.ioThreads = 2 # Use 2 I/O threads

55

factory.lingerPeriod = 500 # Wait 500ms for pending messages

56

57

# Factory will automatically manage context and connections

58

# When reactor shuts down, all connections are cleaned up

59

```

60

61

### Connection Endpoints

62

63

Endpoint specification for ZeroMQ connection addressing and binding/connecting semantics.

64

65

```python { .api }

66

class ZmqEndpointType(object):

67

"""Constants for endpoint connection types."""

68

69

bind = "bind" # Bind and listen for incoming connections

70

connect = "connect" # Connect to existing endpoint

71

72

ZmqEndpoint = namedtuple('ZmqEndpoint', ['type', 'address'])

73

"""

74

Named tuple representing a ZeroMQ endpoint.

75

76

Fields:

77

type (str): Either ZmqEndpointType.bind or ZmqEndpointType.connect

78

address (str): ZeroMQ address (e.g., "tcp://127.0.0.1:5555", "ipc:///tmp/socket")

79

"""

80

```

81

82

#### Usage Example

83

84

```python

85

from txzmq import ZmqEndpoint, ZmqEndpointType

86

87

# Create endpoints for different protocols

88

tcp_bind = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")

89

tcp_connect = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")

90

91

ipc_bind = ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/my_socket")

92

inproc_connect = ZmqEndpoint(ZmqEndpointType.connect, "inproc://internal")

93

94

# Multiple endpoints can be used with single connection

95

endpoints = [

96

ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),

97

ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/backup")

98

]

99

```

100

101

### Base Connection Class

102

103

Abstract base class for all ZeroMQ connections, implementing Twisted descriptor interfaces and providing common connection functionality.

104

105

```python { .api }

106

class ZmqConnection(object):

107

"""

108

Base class for ZeroMQ connections with Twisted integration.

109

110

Implements IReadDescriptor and IFileDescriptor interfaces.

111

Should not be used directly - use pattern-specific subclasses.

112

113

Class Attributes:

114

socketType: ZeroMQ socket type constant (must be set by subclasses)

115

allowLoopbackMulticast (bool): Allow loopback multicast (default: False)

116

multicastRate (int): Multicast rate in kbps (default: 100)

117

highWaterMark (int): High water mark for message queuing (default: 0)

118

tcpKeepalive (int): TCP keepalive setting (default: 0)

119

tcpKeepaliveCount (int): TCP keepalive count (default: 0)

120

tcpKeepaliveIdle (int): TCP keepalive idle time (default: 0)

121

tcpKeepaliveInterval (int): TCP keepalive interval (default: 0)

122

reconnectInterval (int): Reconnection interval in ms (default: 100)

123

reconnectIntervalMax (int): Maximum reconnection interval (default: 0)

124

125

Instance Attributes:

126

factory (ZmqFactory): Associated factory instance

127

endpoints (list): List of ZmqEndpoint objects

128

identity (bytes): Socket identity for routing

129

socket: Underlying ZeroMQ socket

130

fd (int): File descriptor for reactor integration

131

"""

132

133

socketType = None # Must be overridden by subclasses

134

allowLoopbackMulticast = False

135

multicastRate = 100

136

highWaterMark = 0

137

tcpKeepalive = 0

138

tcpKeepaliveCount = 0

139

tcpKeepaliveIdle = 0

140

tcpKeepaliveInterval = 0

141

reconnectInterval = 100

142

reconnectIntervalMax = 0

143

144

def __init__(self, factory, endpoint=None, identity=None):

145

"""

146

Initialize connection.

147

148

Args:

149

factory (ZmqFactory): Factory managing this connection

150

endpoint (ZmqEndpoint, optional): Initial endpoint to connect/bind

151

identity (bytes, optional): Socket identity for routing

152

"""

153

154

def addEndpoints(self, endpoints):

155

"""

156

Add connection endpoints after initialization.

157

158

Args:

159

endpoints (list): List of ZmqEndpoint objects to add

160

"""

161

162

def shutdown(self):

163

"""

164

Shutdown connection and close socket.

165

Removes from reactor and cleans up resources.

166

"""

167

168

def send(self, message):

169

"""

170

Send message via ZeroMQ socket.

171

172

Args:

173

message (bytes or list): Message data - single part (bytes) or

174

multipart (list of bytes)

175

176

Raises:

177

ZMQError: If sending fails (e.g., EAGAIN when HWM reached)

178

"""

179

180

def messageReceived(self, message):

181

"""

182

Abstract method called when message is received.

183

184

Must be implemented by subclasses to handle incoming messages.

185

186

Args:

187

message (list): List of message parts (bytes)

188

"""

189

190

def fileno(self):

191

"""

192

Get file descriptor for Twisted reactor integration.

193

194

Returns:

195

int: Platform file descriptor number

196

"""

197

198

def connectionLost(self, reason):

199

"""

200

Handle connection loss (Twisted interface).

201

202

Args:

203

reason: Reason for connection loss

204

"""

205

206

def doRead(self):

207

"""

208

Handle read events from reactor (Twisted interface).

209

Processes incoming ZeroMQ messages.

210

"""

211

212

def logPrefix(self):

213

"""

214

Get log prefix for Twisted logging.

215

216

Returns:

217

str: Log prefix ("ZMQ")

218

"""

219

```

220

221

#### Usage Example

222

223

```python

224

from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqPubConnection

225

226

factory = ZmqFactory()

227

228

# Create connection with single endpoint

229

endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://127.0.0.1:5555")

230

pub = ZmqPubConnection(factory, endpoint)

231

232

# Add additional endpoints

233

additional_endpoints = [

234

ZmqEndpoint(ZmqEndpointType.bind, "ipc:///tmp/pub_socket"),

235

ZmqEndpoint(ZmqEndpointType.connect, "tcp://remote-server:5556")

236

]

237

pub.addEndpoints(additional_endpoints)

238

239

# Configure connection-specific settings

240

pub.highWaterMark = 1000 # Limit queued messages

241

pub.multicastRate = 200 # Increase multicast rate

242

243

# Connection automatically integrates with Twisted reactor

244

# Messages are processed asynchronously through doRead()

245

```

246

247

### Connection Configuration

248

249

Advanced configuration options for fine-tuning connection behavior, network settings, and performance characteristics.

250

251

```python { .api }

252

# Network and Performance Settings

253

class ZmqConnection:

254

allowLoopbackMulticast = False # Enable loopback multicast

255

multicastRate = 100 # Multicast rate in kbps

256

highWaterMark = 0 # Message queue limit (0 = unlimited)

257

258

# TCP Keepalive (ZeroMQ 3.x only)

259

tcpKeepalive = 0 # Enable TCP keepalive

260

tcpKeepaliveCount = 0 # Keepalive probe count

261

tcpKeepaliveIdle = 0 # Keepalive idle time

262

tcpKeepaliveInterval = 0 # Keepalive probe interval

263

264

# Reconnection Settings (ZeroMQ 3.x only)

265

reconnectInterval = 100 # Initial reconnect interval (ms)

266

reconnectIntervalMax = 0 # Maximum reconnect interval (ms)

267

```

268

269

#### Configuration Example

270

271

```python

272

from txzmq import ZmqFactory, ZmqPubConnection, ZmqEndpoint, ZmqEndpointType

273

274

class HighPerformancePub(ZmqPubConnection):

275

# Configure for high-throughput publishing

276

highWaterMark = 10000 # Queue up to 10k messages

277

multicastRate = 1000 # High multicast rate

278

279

# Enable TCP keepalive for reliable connections

280

tcpKeepalive = 1

281

tcpKeepaliveIdle = 600 # 10 minutes idle

282

tcpKeepaliveInterval = 60 # 1 minute between probes

283

tcpKeepaliveCount = 3 # 3 failed probes = disconnect

284

285

# Aggressive reconnection

286

reconnectInterval = 50 # Start at 50ms

287

reconnectIntervalMax = 5000 # Cap at 5 seconds

288

289

factory = ZmqFactory()

290

endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")

291

pub = HighPerformancePub(factory, endpoint)

292

```