Python library for controlling the i3 window manager and sway compositor through their IPC interface
—
Complete asynchronous interface for non-blocking i3/sway IPC communication. Provides async versions of all functionality with proper asyncio integration and concurrent event handling.
from i3ipc.aio import Connection
class Connection:
    """Asynchronous connection to the i3/sway IPC socket.

    Signature-level reference: every member documents its contract and the
    method bodies are intentionally omitted.
    """

    def __init__(self, socket_path: Optional[str] = None, auto_reconnect: bool = False):
        """
        Create an async connection to i3/sway IPC socket.

        Parameters:
        - socket_path: Optional[str], path to IPC socket (auto-detected if None)
        - auto_reconnect: bool, whether to reconnect automatically on disconnect
        """

    # The class name is quoted in annotations inside the class body because
    # ``Connection`` is not bound yet while the body is being evaluated.
    async def connect(self) -> 'Connection':
        """
        Establish the async connection to i3/sway.

        Returns:
        Connection: connected instance ready for async operations
        """

    @property
    def socket_path(self) -> str:
        """Get the IPC socket path."""

    @property
    def auto_reconnect(self) -> bool:
        """Get the auto-reconnect setting."""

    async def command(self, cmd: str) -> List[CommandReply]:
        """
        Execute i3/sway commands asynchronously.

        Parameters:
        - cmd: str, command string to execute

        Returns:
        List[CommandReply]: command execution results
        """

    async def get_version(self) -> VersionReply:
        """
        Get i3/sway version information.

        Returns:
        VersionReply: version details and configuration
        """

    async def get_bar_config_list(self) -> List[str]:
        """
        Get list of bar configuration IDs.

        Returns:
        List[str]: bar ID strings
        """

    async def get_bar_config(self, bar_id: Optional[str] = None) -> Optional[BarConfigReply]:
        """
        Get bar configuration details.

        Parameters:
        - bar_id: Optional[str], specific bar ID (first bar if None)

        Returns:
        Optional[BarConfigReply]: bar configuration
        """

    async def get_outputs(self) -> List[OutputReply]:
        """
        Get output/monitor information.

        Returns:
        List[OutputReply]: output details
        """

    async def get_workspaces(self) -> List[WorkspaceReply]:
        """
        Get workspace information.

        Returns:
        List[WorkspaceReply]: workspace details
        """

    async def get_tree(self) -> Con:
        """
        Get the complete container tree.

        Returns:
        Con: root container with full hierarchy
        """

    async def get_marks(self) -> List[str]:
        """
        Get all container marks.

        Returns:
        List[str]: mark names
        """

    async def get_binding_modes(self) -> List[str]:
        """
        Get available binding modes.

        Returns:
        List[str]: binding mode names
        """

    async def get_config(self) -> ConfigReply:
        """
        Get configuration file contents.

        Returns:
        ConfigReply: configuration data
        """

    async def send_tick(self, payload: str = "") -> TickReply:
        """
        Send tick event with payload.

        Parameters:
        - payload: str, optional tick data

        Returns:
        TickReply: tick processing confirmation
        """

    async def get_inputs(self) -> List[InputReply]:
        """
        Get input device information (sway only).

        Returns:
        List[InputReply]: input device details
        """

    async def get_seats(self) -> List[SeatReply]:
        """
        Get seat information (sway only).

        Returns:
        List[SeatReply]: seat details
        """

    async def subscribe(self, events: Union[List[Event], List[str]], force: bool = False) -> None:
        """
        Subscribe to events asynchronously.

        Parameters:
        - events: Union[List[Event], List[str]], events to subscribe to
        - force: bool, whether to force resubscription
        """

    def on(self, event: Union[Event, str], handler: Callable[['Connection', IpcBaseEvent], None]) -> None:
        """
        Register event handler (supports both sync and async handlers).

        Parameters:
        - event: Union[Event, str], event type to handle
        - handler: Callable, handler function (can be sync or async)
        """

    def off(self, handler: Callable[['Connection', IpcBaseEvent], None]) -> None:
        """
        Remove event handler from all subscriptions.

        Parameters:
        - handler: Callable, handler function to remove
        """

    async def main(self) -> None:
        """
        Start the async event loop for processing events.
        """

    def main_quit(self, _error=None) -> None:
        """
        Stop the async event loop.

        Parameters:
        - _error: Optional exception that caused the quit
        """


# Available on Con objects in async context
async def command(self, command: str) -> List[CommandReply]:
    """
    Execute command on container asynchronously.

    Signature-level reference for a method available on Con objects in an
    async context; the body is intentionally omitted.

    Parameters:
    - command: str, command to execute on this container

    Returns:
    List[CommandReply]: command execution results
    """
async def command_children(self, command: str) -> List[CommandReply]:
    """
    Execute command on child containers asynchronously.

    Signature-level reference; the body is intentionally omitted.

    Parameters:
    - command: str, command to execute on each child

    Returns:
    List[CommandReply]: results for each child container
    """


import asyncio
from i3ipc.aio import Connection
from i3ipc import Event
async def main():
    """Basic async usage: query state, run commands, then process events.

    Connects to i3/sway, prints the workspace count and the focused window
    (if any), issues two commands, registers one async and one sync event
    handler, and finally awaits the connection's event loop.
    """
    # Connect to i3/sway
    i3 = await Connection().connect()

    # Perform async queries
    workspaces = await i3.get_workspaces()
    print(f"Found {len(workspaces)} workspaces")

    tree = await i3.get_tree()
    focused = tree.find_focused()
    if focused:
        print(f"Focused window: {focused.name}")

    # Execute commands
    await i3.command('workspace 2')
    await i3.command('exec firefox')

    # Event handling with async handlers
    async def on_window_focus(i3, event):
        print(f"Window focused: {event.container.name}")
        # Can perform async operations here
        await i3.command(f'title_format "{event.container.name} - Focused"')

    # Mix sync and async handlers
    def on_workspace_change(i3, event):
        print(f"Workspace changed: {event.change}")

    i3.on(Event.WINDOW_FOCUS, on_window_focus)  # async handler
    i3.on(Event.WORKSPACE, on_workspace_change)  # sync handler

    # Start event processing
    await i3.main()
# Run the async application
asyncio.run(main())


import asyncio
from i3ipc.aio import Connection
from i3ipc import Event, WorkspaceEvent, WindowEvent
async def monitor_system():
    """Demonstrate concurrent async operations.

    Connects with ``auto_reconnect=True``, then runs two handler-registration
    coroutines, a periodic status reporter and the connection's event loop
    together via ``asyncio.gather``.
    """
    i3 = await Connection(auto_reconnect=True).connect()

    # Concurrent tasks
    async def workspace_monitor():
        """Monitor workspace changes."""
        def on_workspace(i3, event: WorkspaceEvent):
            if event.change == 'focus':
                print(f"Switched to: {event.current.name}")

        i3.on(Event.WORKSPACE_FOCUS, on_workspace)

    async def window_monitor():
        """Monitor window events."""
        async def on_window(i3, event: WindowEvent):
            if event.change == 'new':
                print(f"New window: {event.container.name}")
                # Async operation in event handler
                await i3.command(f'[con_id={event.container.id}] mark new_window')

        i3.on(Event.WINDOW_NEW, on_window)

    async def periodic_status():
        """Periodically report system status."""
        # Loops forever: reports every 10 seconds and has no exit condition.
        while True:
            await asyncio.sleep(10)
            workspaces = await i3.get_workspaces()
            active_count = sum(1 for ws in workspaces if ws.visible)
            print(f"Status: {active_count} active workspaces")

    # Start all monitoring tasks concurrently
    await asyncio.gather(
        workspace_monitor(),
        window_monitor(),
        periodic_status(),
        i3.main(),  # Event loop
    )
# Handle connection errors gracefully
async def robust_connection():
    """Demonstrate error handling with auto-reconnect.

    Connects with ``auto_reconnect=True``. If the connection cannot be
    established, the error is printed and the coroutine returns. Otherwise it
    polls the version forever: every 30 seconds on success, and 5 seconds
    after printing the error when a check fails.
    """
    # Only the connect call is guarded here, so a failure later in the
    # polling loop is never misreported as a connection-establishment error.
    try:
        i3 = await Connection(auto_reconnect=True).connect()
    except Exception as e:
        print(f"Failed to establish connection: {e}")
        return

    # Monitor connection health
    while True:
        try:
            version = await i3.get_version()
            print(f"Connected to {version.human_readable}")
            await asyncio.sleep(30)
        except Exception as e:
            # Best-effort health check: report the failure and retry sooner.
            print(f"Connection check failed: {e}")
            await asyncio.sleep(5)
asyncio.run(robust_connection())

Install with Tessl CLI

npx tessl i tessl/pypi-i3ipc