CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-irc

IRC (Internet Relay Chat) protocol library for Python

Pending
Overview
Eval results
Files

docs/protocol-extensions.md

Protocol Extensions

The library supports several IRC protocol extensions: CTCP (Client-To-Client Protocol), DCC (Direct Client-to-Client), IRCv3 message tags, and server capability detection (ISUPPORT). These extensions add functionality beyond the basic IRC protocol.

Capabilities

CTCP (Client-To-Client Protocol)

CTCP enables direct client-to-client communication through specially formatted messages embedded in PRIVMSG and NOTICE commands.

# CTCP constants
LOW_LEVEL_QUOTE = "\x10"  # escape character used by low-level quoting
LEVEL_QUOTE = "\\"        # CTCP-level quote character (a single backslash)
DELIMITER = "\x01"        # marks the start and end of a CTCP payload

# Low-level quoting table: raw character -> quoted two-character sequence.
# Every quoted form is the low-level quote character followed by one
# printable character.
low_level_mapping = {
    LOW_LEVEL_QUOTE: LOW_LEVEL_QUOTE * 2,  # quote -> quote quote
    "\x00": LOW_LEVEL_QUOTE + "0",         # NUL   -> quote "0"
    "\n": LOW_LEVEL_QUOTE + "n",           # LF    -> quote "n"
    "\r": LOW_LEVEL_QUOTE + "r",           # CR    -> quote "r"
}

def dequote(message: str) -> list:
    """
    Dequote a CTCP message according to the CTCP specification.

    Undoes CTCP quoting and extracts the CTCP commands embedded in an
    IRC message.

    Parameters:
    - message: str, raw message text that still carries CTCP quoting

    Returns:
    list, the dequoted CTCP commands found in the message

    NOTE(review): upstream describes the list items as plain strings for
    ordinary text and 1- or 2-element tuples (tag, data) for CTCP-tagged
    parts -- confirm against the installed library version.
    """

DCC (Direct Client-to-Client)

DCC provides direct TCP connections between IRC clients, bypassing the IRC server for file transfers and chat.

class DCCConnection:
    """Direct client-to-client connection for file transfers and chat."""
    
    def connect(self, address: str, port: int):
        """
        Connect to a DCC peer.
        
        Parameters:
        - address: str, host name or IP address of the peer (the port is
          passed separately, so this is not a (host, port) tuple)
        - port: int, port number to connect to
        """
    
    def listen(self, addr=None):
        """
        Listen for an incoming DCC connection.
        
        Parameters:
        - addr: tuple, optional bind address (default: all interfaces)
        """
    
    def disconnect(self, message: str = ""):
        """
        Disconnect the DCC connection.
        
        Parameters:
        - message: str, optional disconnect message
        """
    
    def privmsg(self, text: str):
        """
        Send a private message over DCC chat.
        
        Unlike a server connection's privmsg, no target is given: the
        message goes to the connected peer.
        
        Parameters:
        - text: str, message text to send
        """
    
    def send_bytes(self, bytes_data: bytes):
        """
        Send raw bytes over the DCC connection.
        
        Parameters:
        - bytes_data: bytes, raw data to send
        """
    
    @property
    def connected(self) -> bool:
        """Whether the DCC connection is established."""
    
    @property
    def socket(self):
        """Underlying socket object."""

Server Features (ISUPPORT)

Server capability detection and management based on IRC ISUPPORT (005) messages.

class FeatureSet:
    """Manages IRC server features and capabilities from ISUPPORT."""
    
    def __init__(self):
        """
        Initialize the feature set.

        NOTE(review): described here as empty; upstream appears to
        pre-install the RFC 1459 default PREFIX -- confirm.
        """
    
    def set(self, name: str, value=True):
        """
        Set a server feature value.
        
        Parameters:
        - name: str, feature name (e.g., "CHANTYPES", "NICKLEN")
        - value: feature value (True for boolean features, str/int for valued features)
        """
    
    def remove(self, feature_name: str):
        """
        Remove a server feature.
        
        Parameters:
        - feature_name: str, name of the feature to remove
        """
    
    def load(self, arguments: list):
        """
        Load features from the arguments of an ISUPPORT (005) message.
        
        Parameters:
        - arguments: list, ISUPPORT message arguments
        """
    
    def load_feature(self, feature: str):
        """
        Load a single feature token.
        
        Parameters:
        - feature: str, feature string (e.g., "CHANTYPES=#&", "NICKLEN=30")
        """
    
    # NOTE(review): the _parse_* helpers below are private. They are listed
    # without a value parameter here; upstream appears to define them as
    # static methods that receive the raw feature value -- confirm before
    # relying on these signatures.
    def _parse_PREFIX(self) -> dict:
        """Parse PREFIX feature (channel user modes)."""
    
    def _parse_CHANMODES(self) -> dict:
        """Parse CHANMODES feature (channel mode types)."""
    
    def _parse_TARGMAX(self) -> dict:
        """Parse TARGMAX feature (maximum targets per command)."""
    
    def _parse_CHANLIMIT(self) -> dict:
        """Parse CHANLIMIT feature (channel limits by type)."""
    
    def _parse_MAXLIST(self) -> dict:
        """Parse MAXLIST feature (maximum list entries)."""
    
    def _parse_other(self) -> dict:
        """Parse other miscellaneous features."""

def string_int_pair(target: str, sep: str = ":") -> tuple:
    """
    Parse a "string<sep>integer" pair taken from a server feature value.
    
    Parameters:
    - target: str, string to parse (e.g., "#:120")
    - sep: str, separator between the two halves (default ":")
    
    Returns:
    tuple, (string, integer) pair, e.g. ("#", 120) for "#:120"
    """

IRCv3 Message Tags

Support for IRCv3 message tags that provide metadata and extended functionality.

class Tag:
    """IRCv3 message tag parsing and handling."""
    
    @staticmethod
    def parse(item: str) -> dict:
        """
        Parse an IRCv3 tag string into key-value pairs.
        
        Parameters:
        - item: str, tag string (e.g., "key=value;key2=value2")
        
        Returns:
        dict, parsed tags

        NOTE(review): upstream appears to parse ONE "key=value" item per
        call and return {"key": ..., "value": ...}; splitting on ";" is
        done by from_group -- confirm before passing a full tag list here.
        """
    
    @classmethod
    def from_group(cls, group):
        """
        Create tags from the tag portion of a regex match.
        
        Parameters:
        - group: regex match group containing the tag data
        
        Returns:
        Tag instance

        NOTE(review): upstream appears to return a list with one parsed
        item per tag (or None when there are no tags) -- confirm.
        """

# Tag unescaping for IRCv3 compliance.
# Maps each escape sequence that can appear in a tag value (a backslash
# followed by one character) to the literal character it stands for.
_TAG_UNESCAPE_MAP = {
    "\\\\": "\\",    # escaped backslash -> backslash
    "\\_": "_",      # escaped underscore -> underscore
                     # NOTE(review): not one of the escapes listed in the
                     # IRCv3 message-tags spec -- confirm it is intended
    "\\:": ";",      # backslash-colon -> semicolon (";" separates tags)
    "\\s": " ",      # backslash-s -> space
    "\\r": "\r",     # backslash-r -> carriage return
    "\\n": "\n"      # backslash-n -> line feed
}

Message Parsing

IRC message parsing utilities for handling protocol messages and arguments.

class Arguments(list):
    """IRC command arguments with special parsing rules."""
    
    @staticmethod
    def from_group(group) -> list:
        """
        Parse IRC command arguments from the argument portion of a regex match.
        
        Parameters:
        - group: regex match group containing the arguments
        
        Returns:
        list, parsed arguments

        NOTE(review): presumably everything after " :" is kept as one
        trailing argument, as in the IRC wire format -- confirm.
        """

Usage Examples

CTCP Handler

import irc.client
import irc.ctcp

def handle_ctcp(connection, event):
    """Handle CTCP queries and provide standard responses.

    Parameters:
    - connection: server connection; replies are sent with ``ctcp_reply``
    - event: "ctcp" event; ``arguments[0]`` is the CTCP command and
      ``source.nick`` is the user who sent the query

    The library splits a CTCP query into its tag and its data, so
    ``PING 12345`` arrives as ``arguments == ["PING", "12345"]``. The PING
    payload is therefore read from ``arguments[1]`` when present; a single
    combined ``"PING 12345"`` argument is still accepted.
    """
    ctcp_command = event.arguments[0]
    nick = event.source.nick

    if ctcp_command == "VERSION":
        version_info = "Python IRC Bot 1.0 using python-irc library"
        connection.ctcp_reply(nick, f"VERSION {version_info}")

    elif ctcp_command == "TIME":
        import datetime
        current_time = datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y")
        connection.ctcp_reply(nick, f"TIME {current_time}")

    elif ctcp_command.startswith("PING"):
        # Echo the ping data back unchanged so the sender can measure lag.
        if len(event.arguments) > 1:
            ping_data = event.arguments[1]
        else:
            # Fallback: data embedded after "PING " in a single argument.
            ping_data = ctcp_command[5:]
        connection.ctcp_reply(nick, f"PING {ping_data}")

    elif ctcp_command == "CLIENTINFO":
        supported_commands = "VERSION TIME PING CLIENTINFO"
        connection.ctcp_reply(nick, f"CLIENTINFO {supported_commands}")

    else:
        print(f"Unknown CTCP command from {nick}: {ctcp_command}")

def handle_ctcp_reply(connection, event):
    """Print a received CTCP reply together with the nick that sent it.

    Parameters:
    - connection: server connection the reply arrived on (unused)
    - event: "ctcpreply" event; reads ``arguments[0]`` and ``source.nick``
    """
    payload, sender = event.arguments[0], event.source.nick
    print(f"CTCP reply from {sender}: {payload}")

def on_connect(connection, event):
    """Join the test channel on the given connection (the event is unused)."""
    channel = "#test"
    connection.join(channel)

# Set up CTCP handling: create the client and register one global handler
# per event type on its server connection.
client = irc.client.SimpleIRCClient()
client.connection.add_global_handler("welcome", on_connect)  # joins "#test"
client.connection.add_global_handler("ctcp", handle_ctcp)  # incoming CTCP queries
client.connection.add_global_handler("ctcpreply", handle_ctcp_reply)  # CTCP replies

# Connect to the server as "ctcpbot", then start the client.
client.connect("irc.libera.chat", 6667, "ctcpbot")
client.start()

DCC Chat Example

import irc.client
import threading

class DCCChatBot:
    """Bot that offers a DCC chat session to any user who says ``!dcc``.

    The bot joins ``#dcctest`` and, on request, opens a listening DCC
    socket and advertises it to the requester with a CTCP ``DCC CHAT``
    message. Everything received over the DCC session is echoed back.
    """

    def __init__(self):
        self.client = irc.client.SimpleIRCClient()
        # nick -> DCC connection that was offered to that nick
        self.dcc_connections = {}
        self.setup_handlers()

    @staticmethod
    def _peer_name(event):
        """Return a printable name for the peer behind a DCC event.

        DCC events carry the peer address (a plain string) as their
        source, not a nick mask, so ``event.source.nick`` would raise
        AttributeError. Use ``.nick`` when it exists, else the source
        itself, else "unknown".
        """
        source = event.source
        return getattr(source, "nick", source) or "unknown"

    @staticmethod
    def _message_text(event):
        """Return the first event argument as text.

        DCC payloads are delivered as raw bytes; decode them so they are
        not rendered as ``b'...'``. Text arguments pass through unchanged.
        """
        payload = event.arguments[0]
        if isinstance(payload, bytes):
            return payload.decode("utf-8", errors="replace")
        return payload

    def setup_handlers(self):
        """Set up IRC and DCC event handlers."""
        def on_connect(connection, event):
            connection.join("#dcctest")

        def on_pubmsg(connection, event):
            message = event.arguments[0]
            nick = event.source.nick

            if not message.startswith("!dcc"):
                return

            # Offer DCC chat: open a listening socket first so the port
            # is known before it is advertised.
            dcc = self.client.dcc("chat")
            dcc.listen()

            # Get listening address and port
            address = dcc.socket.getsockname()
            host_ip = "127.0.0.1"  # Use actual external IP
            port = address[1]

            # Send DCC CHAT request (the IP travels as a 32-bit integer)
            dcc_msg = f"\x01DCC CHAT chat {self._ip_to_int(host_ip)} {port}\x01"
            connection.privmsg(nick, dcc_msg)

            self.dcc_connections[nick] = dcc

        def on_dcc_connect(connection, event):
            """Handle DCC connection established."""
            print(f"DCC chat connected with {self._peer_name(event)}")

            # Send welcome message (DCC privmsg takes no target)
            connection.privmsg("Welcome to DCC chat!")

        def on_dccmsg(connection, event):
            """Handle DCC chat messages."""
            message = self._message_text(event)
            print(f"DCC <{self._peer_name(event)}> {message}")

            # Echo message back
            connection.privmsg(f"Echo: {message}")

        def on_dcc_disconnect(connection, event):
            """Handle DCC disconnection."""
            print("DCC chat disconnected")

        register = self.client.connection.add_global_handler
        register("welcome", on_connect)
        register("pubmsg", on_pubmsg)
        register("dcc_connect", on_dcc_connect)
        register("dccmsg", on_dccmsg)
        register("dcc_disconnect", on_dcc_disconnect)

    def _ip_to_int(self, ip_str):
        """Convert a dotted-quad IPv4 address string to its integer value.

        Raises:
        ValueError (ipaddress.AddressValueError) if ``ip_str`` is not a
        valid IPv4 address.
        """
        import ipaddress

        return int(ipaddress.IPv4Address(ip_str))

    def start(self):
        """Connect to the server and start the client."""
        self.client.connect("irc.libera.chat", 6667, "dccbot")
        self.client.start()

# Usage: constructing the bot registers its handlers; start() then connects
# to the server and starts the client.
bot = DCCChatBot()
bot.start()

Server Features Detection

import irc.client

class FeatureAwareBot:
    """Bot that reports the features a server announces via ISUPPORT (005).

    Channel commands:
    - ``!features``: list every announced feature
    - ``!limits``: list the announced length limits
    """

    # (feature name, label) pairs printed by analyze_features, in order.
    _REPORTED_FEATURES = (
        ("NICKLEN", "Maximum nickname length"),
        ("CHANTYPES", "Supported channel types"),
        ("CHANMODES", "Channel modes"),
        ("PREFIX", "User prefix modes"),
        ("TARGMAX", "Target maximums"),
    )

    # (feature name, label) pairs reported by show_limits, in order.
    _LENGTH_LIMITS = (
        ("NICKLEN", "Nick"),
        ("CHANNELLEN", "Channel"),
        ("TOPICLEN", "Topic"),
        ("KICKLEN", "Kick"),
    )

    def __init__(self):
        self.client = irc.client.SimpleIRCClient()
        # Feature object of the server connection; None until connected.
        self.server_features = None
        self.setup_handlers()

    def setup_handlers(self):
        """Set up event handlers."""
        def on_connect(connection, event):
            self.server_features = connection.features
            self.analyze_features()
            connection.join("#test")

        def on_featurelist(connection, event):
            """Handle server feature announcements (numeric 005)."""
            features = event.arguments
            print(f"Server features: {features}")

            # Features are automatically parsed into connection.features
            self.server_features = connection.features
            self.analyze_features()

        def on_pubmsg(connection, event):
            message = event.arguments[0]
            channel = event.target

            if message == "!features":
                self.show_features(connection, channel)
            elif message == "!limits":
                self.show_limits(connection, channel)

        register = self.client.connection.add_global_handler
        register("welcome", on_connect)
        # The library dispatches numeric 005 (ISUPPORT) under the event
        # name "featurelist"; a handler registered as "isupport" is never
        # called.
        register("featurelist", on_featurelist)
        register("pubmsg", on_pubmsg)

    def analyze_features(self):
        """Print the well-known features the server has announced."""
        features = self.server_features
        if not features:
            return

        for name, label in self._REPORTED_FEATURES:
            if hasattr(features, name):
                print(f"{label}: {getattr(features, name)}")

    def show_features(self, connection, channel):
        """Show server features in channel."""
        if not self.server_features:
            connection.privmsg(channel, "Server features not available")
            return

        features = []
        for attr in dir(self.server_features):
            if attr.startswith('_'):
                continue
            value = getattr(self.server_features, attr)
            # dir() also lists the object's methods (set, load, ...);
            # those are API, not server features.
            if value is None or callable(value):
                continue
            features.append(f"{attr}={value}")

        # Send features in chunks to avoid message length limits
        chunk_size = 5
        for i in range(0, len(features), chunk_size):
            chunk = features[i:i+chunk_size]
            connection.privmsg(channel, " | ".join(chunk))

    def show_limits(self, connection, channel):
        """Show the length limits announced by the server."""
        limits = [
            f"{label}: {getattr(self.server_features, name)}"
            for name, label in self._LENGTH_LIMITS
            if hasattr(self.server_features, name)
        ]

        if limits:
            connection.privmsg(channel, f"Server limits: {' | '.join(limits)}")
        else:
            connection.privmsg(channel, "No length limits announced")

    def start(self):
        """Connect to the server and start the client."""
        self.client.connect("irc.libera.chat", 6667, "featurebot")
        self.client.start()

# Usage: constructing the bot registers its handlers; start() then connects
# to the server and starts the client.
bot = FeatureAwareBot()
bot.start()

IRCv3 Tags Handler

import irc.client
import irc.message

class IRCv3Bot:
    """Bot that negotiates IRCv3 capabilities and inspects message tags.

    Capability flow: ``CAP LS 302`` -> collect the advertised capabilities
    -> ``CAP REQ`` for the desired ones -> ``CAP END`` once the server has
    answered with ACK or NAK (or immediately when nothing is requested).
    """

    # Capabilities requested when the server offers them, in request order.
    DESIRED_CAPS = ("message-tags", "server-time", "account-tag", "batch")

    def __init__(self):
        self.client = irc.client.SimpleIRCClient()
        # Capabilities the server has acknowledged.
        self.capabilities = set()
        # Capability names advertised so far by a (possibly multi-line) CAP LS.
        self._available_caps = []
        self.setup_handlers()

    def setup_handlers(self):
        """Set up event handlers."""
        def on_connect(connection, event):
            """Handle connection."""
            # NOTE: "welcome" fires after registration has completed, so
            # this negotiation happens post-registration.
            connection.send_raw("CAP LS 302")
            connection.join("#ircv3test")

        register = self.client.connection.add_global_handler
        register("cap", self.handle_cap)
        register("welcome", on_connect)
        # Public and private messages are processed identically.
        register("pubmsg", self.handle_tagged_message)
        register("privmsg", self.handle_tagged_message)

    def on_cap_ls(self, connection, event):
        """Handle a CAP LS reply and request the desired capabilities.

        With ``CAP LS 302`` the server may split the list over several
        lines (every line but the last has "*" before the list) and may
        attach values (``sasl=PLAIN``). The capability list is always the
        final argument.
        """
        params = event.arguments[1:]
        tokens = params[-1].split() if params else []
        # Keep only the capability name; 302 values are not needed here.
        self._available_caps.extend(token.partition("=")[0] for token in tokens)

        more_coming = len(params) > 1 and params[0] == "*"
        if more_coming:
            return

        available_caps, self._available_caps = self._available_caps, []
        print(f"Available capabilities: {available_caps}")

        # Request desired capabilities
        to_request = [cap for cap in self.DESIRED_CAPS if cap in available_caps]
        if to_request:
            # CAP END is sent once the server answers with ACK or NAK.
            connection.send_raw(f"CAP REQ :{' '.join(to_request)}")
        else:
            connection.send_raw("CAP END")

    def on_cap_ack(self, connection, event):
        """Handle capability acknowledgment and finish negotiation."""
        acked_caps = event.arguments[-1].split()
        self.capabilities.update(acked_caps)
        print(f"Enabled capabilities: {acked_caps}")
        connection.send_raw("CAP END")

    def handle_cap(self, connection, event):
        """Handle all CAP subcommands."""
        cap_command = event.arguments[0]

        if cap_command == "LS":
            self.on_cap_ls(connection, event)
        elif cap_command == "ACK":
            self.on_cap_ack(connection, event)
        elif cap_command == "NAK":
            rejected_caps = event.arguments[-1].split()
            print(f"Rejected capabilities: {rejected_caps}")
            connection.send_raw("CAP END")

    @staticmethod
    def _tags_as_dict(tags):
        """Normalize event tags to a ``{key: value}`` mapping.

        The library delivers tags as a list of ``{"key": ..., "value": ...}``
        dicts; a plain mapping is accepted as well.
        """
        if not tags:
            return {}
        if isinstance(tags, dict):
            return tags
        return {tag["key"]: tag["value"] for tag in tags}

    def handle_tagged_message(self, connection, event):
        """Handle messages with IRCv3 tags."""
        tags = self._tags_as_dict(event.tags)
        nick = event.source.nick if event.source else "server"
        message = event.arguments[0] if event.arguments else ""

        print(f"<{nick}> {message}")

        # Process specific tags
        if "account" in tags:
            print(f"  Account: {tags['account']}")

        if "server-time" in tags:
            print(f"  Time: {tags['server-time']}")

        if "batch" in tags:
            print(f"  Batch: {tags['batch']}")

        if "reply" in tags:
            print(f"  Reply to: {tags['reply']}")

        # Respond to tagged commands
        if message.startswith("!tag"):
            response_tags = {}
            if "msgid" in tags:
                response_tags["reply"] = tags["msgid"]

            # Send response with tags (if server supports message-tags)
            if "message-tags" in self.capabilities and response_tags:
                tag_string = ";".join(f"{k}={v}" for k, v in response_tags.items())
                connection.send_raw(f"@{tag_string} PRIVMSG {event.target} :Tagged response!")
            else:
                connection.privmsg(event.target, "Tagged response!")

    def start(self):
        """Connect to the server and start the client."""
        self.client.connect("irc.libera.chat", 6667, "ircv3bot")
        self.client.start()

# Usage: constructing the bot registers its handlers; start() then connects
# to the server and starts the client.
bot = IRCv3Bot()
bot.start()

Multi-Protocol Extension Bot

import irc.client
import irc.ctcp
import time

class ExtensionBot:
    """Demo bot combining CTCP, DCC chat, IRCv3 batches and message tags.

    Channel commands:
    - ``!dcc``: offer the sender a DCC chat session
    - ``!ctcp <target> <command>``: send a CTCP query to ``target``
    - ``!extensions``: list the supported protocol extensions
    """

    def __init__(self):
        self.client = irc.client.SimpleIRCClient()
        # nick -> DCC connection offered to that nick
        self.dcc_connections = {}
        # batch reference -> {"type": str, "messages": [events]}
        self.batch_buffer = {}
        self.setup_handlers()

    @staticmethod
    def _dcc_text(event):
        """Return the first event argument as text.

        DCC payloads are delivered as raw bytes; without decoding, the
        comparisons against "help", "time", ... would never match.
        """
        payload = event.arguments[0]
        if isinstance(payload, bytes):
            return payload.decode("utf-8", errors="replace")
        return payload

    @staticmethod
    def _tags_as_dict(tags):
        """Normalize event tags to a ``{key: value}`` mapping.

        The library delivers tags as a list of ``{"key": ..., "value": ...}``
        dicts; a plain mapping is accepted as well.
        """
        if not tags:
            return {}
        if isinstance(tags, dict):
            return tags
        return {tag["key"]: tag["value"] for tag in tags}

    @staticmethod
    def _batch_fields(event):
        """Return ``(reference, batch_type)`` for a BATCH event.

        The library moves the first protocol parameter (the "+ref" or
        "-ref" token) into ``event.target``, leaving the batch type as
        ``arguments[0]``. A reference left in ``arguments[0]`` is accepted
        too.
        """
        params = list(event.arguments or [])
        target = getattr(event, "target", None)
        if isinstance(target, str) and target[:1] in ("+", "-"):
            params.insert(0, target)
        reference = params[0] if params else ""
        batch_type = params[1] if len(params) > 1 else "unknown"
        return reference, batch_type

    @staticmethod
    def _ip_to_int(ip_str):
        """Convert a dotted-quad IPv4 address string to its integer value."""
        import ipaddress

        return int(ipaddress.IPv4Address(ip_str))

    def setup_handlers(self):
        """Set up handlers for all protocol extensions."""
        # Basic connection
        def on_connect(connection, event):
            connection.send_raw("CAP LS 302")  # Request IRCv3 capabilities
            connection.join("#extensions")

        # DCC handling
        def on_dcc_connect(connection, event):
            print("DCC connection established")
            connection.privmsg("Welcome to DCC chat! Type 'help' for commands.")

        def on_dccmsg(connection, event):
            message = self._dcc_text(event).strip()
            if message == "help":
                connection.privmsg("Commands: time, quit, echo <text>")
            elif message == "time":
                connection.privmsg(f"Current time: {time.ctime()}")
            elif message == "quit":
                connection.disconnect("Goodbye!")
            elif message.startswith("echo "):
                connection.privmsg(f"Echo: {message[5:]}")

        # IRCv3 batch handling
        def on_batch(connection, event):
            reference, batch_type = self._batch_fields(event)
            batch_id = reference[1:]

            if reference.startswith("+"):
                # Start of batch
                self.batch_buffer[batch_id] = {"type": batch_type, "messages": []}
                print(f"Started batch {batch_id} of type {batch_type}")
            elif reference.startswith("-"):
                # End of batch
                batch_data = self.batch_buffer.pop(batch_id, None)
                if batch_data:
                    print(f"Completed batch {batch_id} with {len(batch_data['messages'])} messages")

        # Main message handler
        def on_pubmsg(connection, event):
            message = event.arguments[0]
            channel = event.target
            nick = event.source.nick

            # Handle batch messages
            batch_id = self._tags_as_dict(event.tags).get("batch")
            if batch_id in self.batch_buffer:
                self.batch_buffer[batch_id]["messages"].append(event)
                return  # Don't process batched messages immediately

            # Regular message processing
            if message.startswith("!dcc"):
                self.offer_dcc_chat(connection, nick)
            elif message.startswith("!ctcp "):
                tokens = message.split()
                if len(tokens) < 3:
                    # Without a target and a command there is nothing to send.
                    connection.privmsg(channel, "Usage: !ctcp <target> <command>")
                    return
                target = tokens[1]
                ctcp_cmd = " ".join(tokens[2:])
                connection.send_raw(f"PRIVMSG {target} :\x01{ctcp_cmd}\x01")
            elif message.startswith("!extensions"):
                self.show_extensions(connection, channel)

        # Set up all handlers
        register = self.client.connection.add_global_handler
        register("welcome", on_connect)
        register("ctcp", self.handle_ctcp)
        register("dcc_connect", on_dcc_connect)
        register("dccmsg", on_dccmsg)
        register("batch", on_batch)
        register("pubmsg", on_pubmsg)

    def handle_ctcp(self, connection, event):
        """Handle CTCP queries.

        The library splits a CTCP query into tag and data, so the PING
        payload is read from ``arguments[1]`` when present; a single
        combined ``"PING <data>"`` argument is still accepted.
        """
        ctcp_command = event.arguments[0]
        nick = event.source.nick

        responses = {
            "VERSION": "ExtensionBot 1.0 - IRC Protocol Extension Demo",
            "TIME": time.ctime(),
            "CLIENTINFO": "VERSION TIME PING CLIENTINFO SOURCE"
        }

        if ctcp_command in responses:
            connection.ctcp_reply(nick, f"{ctcp_command} {responses[ctcp_command]}")
        elif ctcp_command.startswith("PING"):
            if len(event.arguments) > 1:
                ping_data = event.arguments[1]
            else:
                ping_data = ctcp_command[5:]
            connection.ctcp_reply(nick, f"PING {ping_data}")
        elif ctcp_command == "SOURCE":
            connection.ctcp_reply(nick, "SOURCE https://github.com/jaraco/irc")

    def offer_dcc_chat(self, connection, nick):
        """Offer DCC chat to user."""
        try:
            dcc = self.client.dcc("chat")
            dcc.listen()

            # Get connection details
            address = dcc.socket.getsockname()
            host_ip = "127.0.0.1"  # Use actual external IP in real implementation
            port = address[1]

            # Convert IP to integer format
            ip_int = self._ip_to_int(host_ip)

            # Send DCC CHAT offer
            dcc_msg = f"\x01DCC CHAT chat {ip_int} {port}\x01"
            connection.privmsg(nick, dcc_msg)

            self.dcc_connections[nick] = dcc
            print(f"Offered DCC chat to {nick} on port {port}")

        except Exception as e:
            # Deliberately broad: any failure to set up the listener is
            # reported to the user instead of crashing the bot.
            print(f"Failed to offer DCC chat: {e}")
            connection.privmsg(nick, "Sorry, DCC chat is not available right now.")

    def show_extensions(self, connection, channel):
        """Show supported protocol extensions."""
        extensions = [
            "CTCP (Client-To-Client Protocol)",
            "DCC Chat (Direct Client-to-Client)",
            "IRCv3 Message Tags",
            "IRCv3 Batches",
            "Server Feature Detection (ISUPPORT)"
        ]

        connection.privmsg(channel, "Supported extensions:")
        for ext in extensions:
            connection.privmsg(channel, f"  • {ext}")

    def start(self):
        """Connect to the server and start the client."""
        self.client.connect("irc.libera.chat", 6667, "extensionbot")
        self.client.start()

# Usage: constructing the bot registers its handlers; start() then connects
# to the server and starts the client.
bot = ExtensionBot()
bot.start()

Install with Tessl CLI

npx tessl i tessl/pypi-irc

docs

asynchronous-client.md

bot-framework.md

connection-management.md

event-system.md

index.md

protocol-extensions.md

synchronous-client.md

utilities.md

tile.json