An asynchronous networking framework written in Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Implementations of various network protocols including FTP, IRC, XMPP, AMP (Asynchronous Messaging Protocol), and utilities for building custom protocols. Twisted provides a rich ecosystem of protocol implementations.
Fundamental protocol classes for building line-oriented and length-prefixed protocols.
class basic.LineReceiver:
"""
Protocol that receives data line by line.
Attributes:
- delimiter: Line delimiter (default: b'\\r\\n')
- MAX_LENGTH: Maximum line length
"""
delimiter = b'\r\n'
MAX_LENGTH = 16384
def clearLineBuffer(self):
"""
Clear buffered data.
Returns:
bytes: All of the cleared buffered data
"""
def lineReceived(self, line):
"""
Called when a complete line is received.
Args:
line (bytes): Received line (without delimiter)
"""
def lineLengthExceeded(self, line):
"""
Called when line exceeds MAX_LENGTH.
Args:
line (bytes): Partial line data
"""
def sendLine(self, line):
"""
Send a line with delimiter.
Args:
line (bytes): Line to send
"""
def setLineMode(self, extra=b''):
"""
Switch to line-based receiving mode.
Args:
extra (bytes): Extra data to process
"""
def setRawMode(self):
"""Switch to raw data receiving mode."""
class basic.IntNStringReceiver:
"""
Protocol that receives length-prefixed strings.
Attributes:
- structFormat: Struct format for length field
- prefixLength: Length of length prefix
- MAX_LENGTH: Maximum string length
"""
structFormat = '!I' # 32-bit big-endian unsigned int
prefixLength = 4
MAX_LENGTH = 99999
def stringReceived(self, string):
"""
Called when a complete string is received.
Args:
string (bytes): Received string
"""
def lengthLimitExceeded(self, length):
"""
Called when string length exceeds MAX_LENGTH.
Args:
length (int): Attempted string length
"""
def sendString(self, string):
"""
Send a length-prefixed string.
Args:
string (bytes): String to send
"""
class basic.Int32StringReceiver(basic.IntNStringReceiver):
"""32-bit length-prefixed string receiver."""
structFormat = '!I'
prefixLength = 4
class basic.Int16StringReceiver(basic.IntNStringReceiver):
"""16-bit length-prefixed string receiver."""
structFormat = '!H'
prefixLength = 2
class basic.Int8StringReceiver(basic.IntNStringReceiver):
"""8-bit length-prefixed string receiver."""
structFormat = '!B'
prefixLength = 1
class basic.NetstringReceiver:
"""
Protocol that receives netstring-encoded data.
Netstring format: <length>:<data>,
"""
MAX_LENGTH = 99999
def stringReceived(self, string):
"""
Called when a complete netstring is received.
Args:
string (bytes): Received string data
"""
def lengthLimitExceeded(self, length):
"""
Called when netstring length exceeds MAX_LENGTH.
Args:
length (int): Attempted string length
"""
def sendString(self, string):
"""
Send a netstring.
Args:
string (bytes): String to send
"""
class basic.StatefulStringProtocol:
"""
Protocol with multiple states for complex parsing.
States are methods named 'proto_<STATE>' that return next state.
"""
def proto_init(self):
"""
Initial protocol state.
Returns:
str: Next state name
"""

Basic Protocol Usage Example:
from twisted.protocols import basic
from twisted.internet import protocol, reactor, endpoints
class ChatProtocol(basic.LineReceiver):
    """Line-based chat protocol that relays each line to every other client."""

    delimiter = b'\n'

    def connectionMade(self):
        """Greet the new client and register it with the factory."""
        self.sendLine(b"Welcome to chat server!")
        self.factory.clients.append(self)

    def connectionLost(self, reason):
        """Unregister this client when its connection goes away."""
        if self in self.factory.clients:
            self.factory.clients.remove(self)

    def lineReceived(self, line):
        """
        Broadcast a received line to all other connected clients.

        Args:
            line (bytes): Received line (without delimiter)
        """
        # Peers can send arbitrary bytes; replace undecodable sequences rather
        # than letting UnicodeDecodeError escape from lineReceived.
        text = line.decode(errors="replace")
        # Encode once instead of once per recipient.
        payload = f"User says: {text}".encode()
        for client in self.factory.clients:
            if client is not self:
                client.sendLine(payload)
class ChatFactory(protocol.ServerFactory):
    """Server factory that keeps the list of connected ChatProtocol instances."""

    def __init__(self):
        # Shared roster that ChatProtocol appends to, removes from and
        # iterates over when broadcasting.
        self.clients = []

    def buildProtocol(self, addr):
        """
        Create a ChatProtocol attached to this factory.

        Args:
            addr: Connection address (unused)
        Returns:
            ChatProtocol: Protocol instance with ``factory`` set to this factory
        """
        # Named ``proto`` so the imported ``protocol`` module is not shadowed.
        proto = ChatProtocol()
        proto.factory = self
        return proto
# Start server
endpoint = endpoints.TCP4ServerEndpoint(reactor, 8080)
endpoint.listen(ChatFactory())
reactor.run()

High-level RPC protocol with structured commands and responses.
class amp.AMP:
"""
Asynchronous Messaging Protocol implementation.
Provides structured command/response messaging over TCP.
"""
def __init__(self, boxReceiver=None, locator=None):
"""
Args:
boxReceiver: Custom box receiver
locator: Command locator
"""
def callRemote(self, command, **kwargs):
"""
Call remote command.
Args:
command: AMP command class
**kwargs: Command arguments
Returns:
Deferred: Response from remote
"""
def callRemoteString(self, commandName, **kwargs):
"""
Call remote command by name.
Args:
commandName (str): Command name
**kwargs: Command arguments
Returns:
Deferred: Response from remote
"""
class amp.Command:
"""
Base class for AMP commands.
Attributes:
- commandName: Command name (bytes)
- arguments: List of command arguments
- response: List of response arguments
- errors: Dict of error mappings
- requiresAnswer: Whether command expects response
"""
commandName = None
arguments = []
response = []
errors = {}
requiresAnswer = True
@classmethod
def makeArguments(cls, objects, proto):
"""
Serialize command arguments.
Args:
objects (dict): Argument values
proto: Protocol instance
Returns:
dict: Serialized arguments
"""
@classmethod
def parseResponse(cls, strings, proto):
"""
Parse command response.
Args:
strings (dict): Response data
proto: Protocol instance
Returns:
dict: Parsed response
"""
# AMP argument types
class amp.Argument:
"""Base class for AMP argument types."""
def toString(self, inObject):
"""
Serialize argument to string.
Args:
inObject: Python object
Returns:
bytes: Serialized form
"""
def fromString(self, inString):
"""
Deserialize argument from string.
Args:
inString (bytes): Serialized form
Returns:
Python object
"""
class amp.Integer(amp.Argument):
"""Integer argument type."""
class amp.String(amp.Argument):
"""String argument type."""
class amp.Unicode(amp.Argument):
"""Unicode string argument type."""
class amp.Boolean(amp.Argument):
"""Boolean argument type."""
class amp.Float(amp.Argument):
"""Float argument type."""
class amp.Decimal(amp.Argument):
"""Decimal argument type."""
class amp.DateTime(amp.Argument):
"""DateTime argument type."""
class amp.ListOf(amp.Argument):
"""List argument type."""
def __init__(self, elementType):
"""
Args:
elementType: Type of list elements
"""

AMP Usage Example:
from twisted.protocols import amp
from twisted.internet import reactor, defer
# Define commands
class Sum(amp.Command):
    """AMP command taking integers ``a`` and ``b`` and answering with ``result``."""

    commandName = b'sum'
    arguments = [(b'a', amp.Integer()), (b'b', amp.Integer())]
    response = [(b'result', amp.Integer())]
class Greeting(amp.Command):
    """AMP command taking a unicode ``name`` and answering with ``message``."""

    commandName = b'greeting'
    arguments = [(b'name', amp.Unicode())]
    response = [(b'message', amp.Unicode())]
# Server implementation
class MathServer(amp.AMP):
    """AMP server that answers the Sum and Greeting commands."""

    # A method only handles a command once it is registered as that
    # command's responder; without the decorator it is never dispatched.
    @Sum.responder
    def sum(self, a, b):
        """Return the sum of ``a`` and ``b`` under the ``result`` key."""
        return {'result': a + b}

    @Greeting.responder
    def greeting(self, name):
        """Return a greeting for ``name`` under the ``message`` key."""
        return {'message': f'Hello, {name}!'}
# Client usage
class MathClient(amp.AMP):
def connectionMade(self):
amp.AMP.connectionMade(self)
# Call remote commands
d1 = self.callRemote(Sum, a=5, b=3)
d1.addCallback(lambda result: print(f"Sum: {result['result']}"))
d2 = self.callRemote(Greeting, name="Alice")
d2.addCallback(lambda result: print(f"Greeting: {result['message']}"))File Transfer Protocol client and server implementations.
class ftp.FTP:
"""
FTP server protocol implementation.
Attributes:
- shell: Avatar object providing file system interface
- workingDirectory: Current working directory
"""
shell = None
workingDirectory = None
def ftp_USER(self, params):
"""Handle USER command."""
def ftp_PASS(self, params):
"""Handle PASS command."""
def ftp_LIST(self, params):
"""Handle LIST command."""
def ftp_RETR(self, params):
"""Handle RETR command."""
def ftp_STOR(self, params):
"""Handle STOR command."""
class ftp.FTPFactory:
"""
FTP server factory.
"""
def __init__(self):
self.timeOut = 600 # Session timeout
self.welcomeMessage = "Welcome to FTP server"
class ftp.FTPClient:
"""
FTP client protocol implementation.
"""
def __init__(self, username='anonymous', password='twisted@twistedmatrix.com', passive=1):
"""
Args:
username (str): FTP username
password (str): FTP password
passive (bool): Use passive mode
"""
def retrieveFile(self, filename, callback, offset=0):
"""
Retrieve file from server.
Args:
filename (str): Remote filename
callback: Function to receive file data
offset (int): Start offset
Returns:
Deferred: Completion indicator
"""
def storeFile(self, filename, file, offset=0):
"""
Store file on server.
Args:
filename (str): Remote filename
file: File-like object to upload
offset (int): Start offset
Returns:
Deferred: Completion indicator
"""
def list(self, path, callback):
"""
List directory contents.
Args:
path (str): Directory path
callback: Function to receive listing data
Returns:
Deferred: Completion indicator
"""
def makeDirectory(self, path):
"""
Create directory.
Args:
path (str): Directory path
Returns:
Deferred: Completion indicator
"""
def removeFile(self, path):
"""
Remove file.
Args:
path (str): File path
Returns:
Deferred: Completion indicator
"""
class ftp.FTPClientBasic:
"""
Basic FTP client with simple interface.
"""
def __init__(self, username='anonymous', password='twisted@twistedmatrix.com'):
"""
Args:
username (str): FTP username
password (str): FTP password
"""
def connectTo(self, host, port=21):
"""
Connect to FTP server.
Args:
host (str): Server hostname
port (int): Server port
Returns:
Deferred[FTPClient]: Connected client
"""

Wrappers and policies for controlling protocol behavior.
class policies.ProtocolWrapper:
"""
Base class for protocol wrappers.
"""
def __init__(self, protocol, factory):
"""
Args:
protocol: Wrapped protocol
factory: Wrapper factory
"""
class policies.WrappingFactory:
"""
Base factory for protocol wrappers.
"""
def __init__(self, wrappedFactory):
"""
Args:
wrappedFactory: Factory being wrapped
"""
def buildProtocol(self, addr):
"""
Build wrapped protocol.
Args:
addr: Connection address
Returns:
ProtocolWrapper: Wrapped protocol
"""
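class policies.ThrottlingFactory:
"""
Factory that throttles bandwidth and limits the total number of connections.
For per-peer connection limits see policies.LimitConnectionsByPeer.
"""
def __init__(self, wrappedFactory, maxConnectionCount=sys.maxsize, readLimit=None, writeLimit=None):
"""
Args:
wrappedFactory: Factory being wrapped
maxConnectionCount (int): Maximum number of simultaneous connections
readLimit (int): Maximum read rate in bytes per second (None for unlimited)
writeLimit (int): Maximum write rate in bytes per second (None for unlimited)
"""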
class policies.TimeoutFactory:
"""
Factory that enforces connection timeouts.
"""
def __init__(self, wrappedFactory, timeoutPeriod=30*60):
"""
Args:
wrappedFactory: Factory being wrapped
timeoutPeriod (int): Timeout in seconds
"""
class policies.TrafficLoggingFactory:
"""
Factory that logs all network traffic.
"""
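def __init__(self, wrappedFactory, logfilePrefix, lengthLimit=None):
"""
Args:
wrappedFactory: Factory being wrapped
logfilePrefix (str): Prefix of the log file names (one log file per connection)
lengthLimit (int): Maximum length of each logged chunk of data (None for no limit)
"""

Utilities for testing protocols without network connections.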
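def loopback.loopbackAsync(server, client, pumpPolicy=identityPumpPolicy):
"""
Connect two protocols in memory for testing, without real sockets.
Args:
server: Server protocol instance
client: Client protocol instance
pumpPolicy: Callable controlling how queued data is delivered to the receiving protocol
Returns:
Deferred: Fires when the connection has been closed and both sides have been notified
"""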
def loopback.loopbackTCP(server, client, port=0):
"""
Connect protocols via TCP loopback.
Args:
server: Server protocol instance
client: Client protocol instance
port (int): Port number (0 for random)
Returns:
Deferred: Fires when connection established
"""
def loopback.loopbackUNIX(server, client):
"""
Connect protocols via Unix socket loopback.
Args:
server: Server protocol instance
client: Client protocol instance
Returns:
Deferred: Fires when connection established
"""

Loopback Testing Example:
from twisted.protocols import basic, loopback
from twisted.test import proto_helpers
from twisted.trial import unittest
class EchoProtocol(basic.LineReceiver):
    """Line protocol that sends every received line straight back."""

    def lineReceived(self, line):
        # Echo the received line back to the peer.
        self.sendLine(line)
class TestEcho(unittest.TestCase):
def test_echo_loopback(self):
server = EchoProtocol()
client = EchoProtocol()
# Set up data collection
client_data = []
def collect_data(line):
client_data.append(line)
client.lineReceived = collect_data
# Run loopback test
d = loopback.loopback(server, client)
# Send test data
client.sendLine(b"Hello")
client.sendLine(b"World")
# Verify echo
def check_result(_):
self.assertEqual(client_data, [b"Hello", b"World"])
d.addCallback(check_result)
return dBrief overview of other protocol implementations available in Twisted.
# IRC Protocol (twisted.words.protocols.irc)
class irc.IRCClient:
"""IRC client protocol with event handlers."""
def joined(self, channel):
"""Called when joined a channel."""
def privmsg(self, user, channel, msg):
"""Called when a message is received from a user, addressed either to this client or to a channel."""
# IDENT Protocol (twisted.protocols.ident)
class ident.IdentServer:
"""Ident server protocol (RFC 1413)."""
class ident.IdentClient:
"""Ident client protocol."""
# Memcached Protocol (twisted.protocols.memcache)
class memcache.MemCacheProtocol:
"""Memcached protocol client."""
def get(self, key, withIdentifier=False):
"""Get the value for a single key (use getMultiple for several keys)."""
def set(self, key, value, flags=0, expireTime=0):
"""Set key-value pair."""
# SOCKS Protocol (twisted.protocols.socks)
class socks.SOCKSv4:
"""SOCKS version 4 proxy protocol."""
class socks.SOCKSv4Factory:
"""SOCKS v4 factory."""

Install with Tessl CLI
npx tessl i tessl/pypi-twisted