CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-i3ipc

Python library for controlling the i3 window manager and sway compositor through their IPC interface

Pending
Overview
Eval results
Files

async.mddocs/

Async API

Complete asynchronous interface for non-blocking i3/sway IPC communication. Provides async versions of all functionality with proper asyncio integration and concurrent event handling.

Capabilities

Async Connection

from i3ipc.aio import Connection

class Connection:
    def __init__(self, socket_path: Optional[str] = None, auto_reconnect: bool = False):
        """
        Create an async connection to i3/sway IPC socket.
        
        Parameters:
        - socket_path: Optional[str], path to IPC socket (auto-detected if None)
        - auto_reconnect: bool, whether to reconnect automatically on disconnect
        """
    
    async def connect(self) -> Connection:
        """
        Establish the async connection to i3/sway.
        
        Returns:
        Connection: connected instance ready for async operations
        """
    
    @property
    def socket_path(self) -> str:
        """Get the IPC socket path."""
    
    @property 
    def auto_reconect(self) -> bool:
        """Get the auto-reconnect setting."""

Async Query Methods

async def command(self, cmd: str) -> List[CommandReply]:
    """
    Execute i3/sway commands asynchronously.
    
    Parameters:
    - cmd: str, command string to execute
    
    Returns:
    List[CommandReply]: command execution results
    """

async def get_version(self) -> VersionReply:
    """
    Get i3/sway version information.
    
    Returns:
    VersionReply: version details and configuration
    """

async def get_bar_config_list(self) -> List[str]:
    """
    Get list of bar configuration IDs.
    
    Returns:
    List[str]: bar ID strings
    """

async def get_bar_config(self, bar_id=None) -> Optional[BarConfigReply]:
    """
    Get bar configuration details.
    
    Parameters:
    - bar_id: Optional[str], specific bar ID (first bar if None)
    
    Returns:
    Optional[BarConfigReply]: bar configuration
    """

async def get_outputs(self) -> List[OutputReply]:
    """
    Get output/monitor information.
    
    Returns:
    List[OutputReply]: output details
    """

async def get_workspaces(self) -> List[WorkspaceReply]:
    """
    Get workspace information.
    
    Returns:
    List[WorkspaceReply]: workspace details
    """

async def get_tree(self) -> Con:
    """
    Get the complete container tree.
    
    Returns:
    Con: root container with full hierarchy
    """

async def get_marks(self) -> List[str]:
    """
    Get all container marks.
    
    Returns:
    List[str]: mark names
    """

async def get_binding_modes(self) -> List[str]:
    """
    Get available binding modes.
    
    Returns:
    List[str]: binding mode names
    """

async def get_config(self) -> ConfigReply:
    """
    Get configuration file contents.
    
    Returns:
    ConfigReply: configuration data
    """

async def send_tick(self, payload: str = "") -> TickReply:
    """
    Send tick event with payload.
    
    Parameters:
    - payload: str, optional tick data
    
    Returns:
    TickReply: tick processing confirmation
    """

async def get_inputs(self) -> List[InputReply]:
    """
    Get input device information (sway only).
    
    Returns:
    List[InputReply]: input device details
    """

async def get_seats(self) -> List[SeatReply]:
    """
    Get seat information (sway only).
    
    Returns:
    List[SeatReply]: seat details
    """

Async Event Handling

async def subscribe(self, events: Union[List[Event], List[str]], force: bool = False) -> None:
    """
    Subscribe to events asynchronously.
    
    Parameters:
    - events: Union[List[Event], List[str]], events to subscribe to
    - force: bool, whether to force resubscription
    """

def on(self, event: Union[Event, str], handler: Callable[[Connection, IpcBaseEvent], None]) -> None:
    """
    Register event handler (supports both sync and async handlers).
    
    Parameters:
    - event: Union[Event, str], event type to handle
    - handler: Callable, handler function (can be sync or async)
    """

def off(self, handler: Callable[[Connection, IpcBaseEvent], None]) -> None:
    """
    Remove event handler from all subscriptions.
    
    Parameters:
    - handler: Callable, handler function to remove
    """

async def main(self) -> None:
    """
    Start the async event loop for processing events.
    """

def main_quit(self, _error=None) -> None:
    """
    Stop the async event loop.
    
    Parameters:
    - _error: Optional exception that caused the quit
    """

Container Async Methods

# Available on Con objects in async context
async def command(self, command: str) -> List[CommandReply]:
    """
    Execute command on container asynchronously.
    
    Parameters:
    - command: str, command to execute on this container
    
    Returns:
    List[CommandReply]: command execution results
    """

async def command_children(self, command: str) -> List[CommandReply]:
    """
    Execute command on child containers asynchronously.
    
    Parameters:
    - command: str, command to execute on each child
    
    Returns:
    List[CommandReply]: results for each child container
    """

Basic Async Usage

import asyncio
from i3ipc.aio import Connection
from i3ipc import Event

async def main():
    # Connect to i3/sway
    i3 = await Connection().connect()
    
    # Perform async queries
    workspaces = await i3.get_workspaces()
    print(f"Found {len(workspaces)} workspaces")
    
    tree = await i3.get_tree()
    focused = tree.find_focused()
    if focused:
        print(f"Focused window: {focused.name}")
    
    # Execute commands
    await i3.command('workspace 2')
    await i3.command('exec firefox')
    
    # Event handling with async handlers
    async def on_window_focus(i3, event):
        print(f"Window focused: {event.container.name}")
        # Can perform async operations here
        await i3.command(f'title_format "{event.container.name} - Focused"')
    
    # Mix sync and async handlers
    def on_workspace_change(i3, event):
        print(f"Workspace changed: {event.change}")
    
    i3.on(Event.WINDOW_FOCUS, on_window_focus)  # async handler
    i3.on(Event.WORKSPACE, on_workspace_change)  # sync handler
    
    # Start event processing
    await i3.main()

# Run the async application
asyncio.run(main())

Advanced Async Patterns

import asyncio
from i3ipc.aio import Connection
from i3ipc import Event, WorkspaceEvent, WindowEvent

async def monitor_system():
    """Demonstrate concurrent async operations."""
    i3 = await Connection(auto_reconnect=True).connect()
    
    # Concurrent tasks
    async def workspace_monitor():
        """Monitor workspace changes."""
        def on_workspace(i3, event: WorkspaceEvent):
            if event.change == 'focus':
                print(f"Switched to: {event.current.name}")
        
        i3.on(Event.WORKSPACE_FOCUS, on_workspace)
    
    async def window_monitor():
        """Monitor window events."""
        async def on_window(i3, event: WindowEvent):
            if event.change == 'new':
                print(f"New window: {event.container.name}")
                # Async operation in event handler
                await i3.command(f'[con_id={event.container.id}] mark new_window')
        
        i3.on(Event.WINDOW_NEW, on_window)
    
    async def periodic_status():
        """Periodically report system status."""
        while True:
            await asyncio.sleep(10)
            workspaces = await i3.get_workspaces()
            active_count = sum(1 for ws in workspaces if ws.visible)
            print(f"Status: {active_count} active workspaces")
    
    # Start all monitoring tasks concurrently
    await asyncio.gather(
        workspace_monitor(),
        window_monitor(), 
        periodic_status(),
        i3.main()  # Event loop
    )

# Handle connection errors gracefully
async def robust_connection():
    """Demonstrate error handling with auto-reconnect."""
    try:
        i3 = await Connection(auto_reconnect=True).connect()
        
        def on_error(i3, event):
            print("Connection error, will auto-reconnect...")
        
        # Monitor connection health
        while True:
            try:
                version = await i3.get_version()
                print(f"Connected to {version.human_readable}")
                await asyncio.sleep(30)
            except Exception as e:
                print(f"Connection check failed: {e}")
                await asyncio.sleep(5)
                
    except Exception as e:
        print(f"Failed to establish connection: {e}")

asyncio.run(robust_connection())

Install with Tessl CLI

npx tessl i tessl/pypi-i3ipc

docs

async.md

commands.md

connection.md

containers.md

events.md

index.md

outputs.md

workspaces.md

tile.json