0
# Async API
1
2
The `i3ipc.aio` module provides a complete asynchronous interface for non-blocking i3/sway IPC communication. It offers async versions of all functionality, with proper asyncio integration and concurrent event handling.
3
4
## Capabilities
5
6
### Async Connection
7
8
```python { .api }
9
from i3ipc.aio import Connection
10
11
class Connection:
12
def __init__(self, socket_path: Optional[str] = None, auto_reconnect: bool = False):
13
"""
14
Create an async connection to i3/sway IPC socket.
15
16
Parameters:
17
- socket_path: Optional[str], path to IPC socket (auto-detected if None)
18
- auto_reconnect: bool, whether to reconnect automatically on disconnect
19
"""
20
21
async def connect(self) -> Connection:
22
"""
23
Establish the async connection to i3/sway.
24
25
Returns:
26
Connection: connected instance ready for async operations
27
"""
28
29
@property
30
def socket_path(self) -> str:
31
"""Get the IPC socket path."""
32
33
@property
34
def auto_reconect(self) -> bool:
35
"""Get the auto-reconnect setting."""
36
```
37
38
### Async Query Methods
39
40
```python { .api }
41
async def command(self, cmd: str) -> List[CommandReply]:
42
"""
43
Execute i3/sway commands asynchronously.
44
45
Parameters:
46
- cmd: str, command string to execute
47
48
Returns:
49
List[CommandReply]: command execution results
50
"""
51
52
async def get_version(self) -> VersionReply:
53
"""
54
Get i3/sway version information.
55
56
Returns:
57
VersionReply: version details and configuration
58
"""
59
60
async def get_bar_config_list(self) -> List[str]:
61
"""
62
Get list of bar configuration IDs.
63
64
Returns:
65
List[str]: bar ID strings
66
"""
67
68
async def get_bar_config(self, bar_id=None) -> Optional[BarConfigReply]:
69
"""
70
Get bar configuration details.
71
72
Parameters:
73
- bar_id: Optional[str], specific bar ID (first bar if None)
74
75
Returns:
76
Optional[BarConfigReply]: bar configuration
77
"""
78
79
async def get_outputs(self) -> List[OutputReply]:
80
"""
81
Get output/monitor information.
82
83
Returns:
84
List[OutputReply]: output details
85
"""
86
87
async def get_workspaces(self) -> List[WorkspaceReply]:
88
"""
89
Get workspace information.
90
91
Returns:
92
List[WorkspaceReply]: workspace details
93
"""
94
95
async def get_tree(self) -> Con:
96
"""
97
Get the complete container tree.
98
99
Returns:
100
Con: root container with full hierarchy
101
"""
102
103
async def get_marks(self) -> List[str]:
104
"""
105
Get all container marks.
106
107
Returns:
108
List[str]: mark names
109
"""
110
111
async def get_binding_modes(self) -> List[str]:
112
"""
113
Get available binding modes.
114
115
Returns:
116
List[str]: binding mode names
117
"""
118
119
async def get_config(self) -> ConfigReply:
120
"""
121
Get configuration file contents.
122
123
Returns:
124
ConfigReply: configuration data
125
"""
126
127
async def send_tick(self, payload: str = "") -> TickReply:
128
"""
129
Send tick event with payload.
130
131
Parameters:
132
- payload: str, optional tick data
133
134
Returns:
135
TickReply: tick processing confirmation
136
"""
137
138
async def get_inputs(self) -> List[InputReply]:
139
"""
140
Get input device information (sway only).
141
142
Returns:
143
List[InputReply]: input device details
144
"""
145
146
async def get_seats(self) -> List[SeatReply]:
147
"""
148
Get seat information (sway only).
149
150
Returns:
151
List[SeatReply]: seat details
152
"""
153
```
154
155
### Async Event Handling
156
157
```python { .api }
158
async def subscribe(self, events: Union[List[Event], List[str]], force: bool = False) -> None:
159
"""
160
Subscribe to events asynchronously.
161
162
Parameters:
163
- events: Union[List[Event], List[str]], events to subscribe to
164
- force: bool, whether to force resubscription
165
"""
166
167
def on(self, event: Union[Event, str], handler: Callable[[Connection, IpcBaseEvent], None]) -> None:
168
"""
169
Register event handler (supports both sync and async handlers).
170
171
Parameters:
172
- event: Union[Event, str], event type to handle
173
- handler: Callable, handler function (can be sync or async)
174
"""
175
176
def off(self, handler: Callable[[Connection, IpcBaseEvent], None]) -> None:
177
"""
178
Remove event handler from all subscriptions.
179
180
Parameters:
181
- handler: Callable, handler function to remove
182
"""
183
184
async def main(self) -> None:
185
"""
186
Run the connection's event-processing loop; await this to keep dispatching events to registered handlers until main_quit() is called.
187
"""
188
189
def main_quit(self, _error=None) -> None:
190
"""
191
Stop the event-processing loop run by main() (this does not stop the asyncio event loop itself).
192
193
Parameters:
194
- _error: Optional exception that caused the quit
195
"""
196
```
197
198
### Container Async Methods
199
200
```python { .api }
201
# Available on Con objects obtained through an i3ipc.aio Connection (e.g. from `await i3.get_tree()`)
202
async def command(self, command: str) -> List[CommandReply]:
203
"""
204
Execute command on container asynchronously.
205
206
Parameters:
207
- command: str, command to execute on this container
208
209
Returns:
210
List[CommandReply]: command execution results
211
"""
212
213
async def command_children(self, command: str) -> List[CommandReply]:
214
"""
215
Execute command on child containers asynchronously.
216
217
Parameters:
218
- command: str, command to execute on each child
219
220
Returns:
221
List[CommandReply]: results for each child container
222
"""
223
```
224
225
### Basic Async Usage
226
227
```python
228
import asyncio
229
from i3ipc.aio import Connection
230
from i3ipc import Event
231
232
async def main():
233
# Connect to i3/sway
234
i3 = await Connection().connect()
235
236
# Perform async queries
237
workspaces = await i3.get_workspaces()
238
print(f"Found {len(workspaces)} workspaces")
239
240
tree = await i3.get_tree()
241
focused = tree.find_focused()
242
if focused:
243
print(f"Focused window: {focused.name}")
244
245
# Execute commands
246
await i3.command('workspace 2')
247
await i3.command('exec firefox')
248
249
# Event handling with async handlers
250
async def on_window_focus(i3, event):
251
print(f"Window focused: {event.container.name}")
252
# Can perform async operations here
253
await i3.command(f'title_format "{event.container.name} - Focused"')
254
255
# Mix sync and async handlers
256
def on_workspace_change(i3, event):
257
print(f"Workspace changed: {event.change}")
258
259
i3.on(Event.WINDOW_FOCUS, on_window_focus) # async handler
260
i3.on(Event.WORKSPACE, on_workspace_change) # sync handler
261
262
# Start event processing
263
await i3.main()
264
265
# Run the async application
266
asyncio.run(main())
267
```
268
269
### Advanced Async Patterns
270
271
```python
272
import asyncio
273
from i3ipc.aio import Connection
274
from i3ipc import Event, WorkspaceEvent, WindowEvent
275
276
async def monitor_system():
277
"""Demonstrate concurrent async operations."""
278
i3 = await Connection(auto_reconnect=True).connect()
279
280
# Concurrent tasks
281
async def workspace_monitor():
282
"""Monitor workspace changes."""
283
def on_workspace(i3, event: WorkspaceEvent):
284
if event.change == 'focus':
285
print(f"Switched to: {event.current.name}")
286
287
i3.on(Event.WORKSPACE_FOCUS, on_workspace)
288
289
async def window_monitor():
290
"""Monitor window events."""
291
async def on_window(i3, event: WindowEvent):
292
if event.change == 'new':
293
print(f"New window: {event.container.name}")
294
# Async operation in event handler
295
await i3.command(f'[con_id={event.container.id}] mark new_window')
296
297
i3.on(Event.WINDOW_NEW, on_window)
298
299
async def periodic_status():
300
"""Periodically report system status."""
301
while True:
302
await asyncio.sleep(10)
303
workspaces = await i3.get_workspaces()
304
active_count = sum(1 for ws in workspaces if ws.visible)
305
print(f"Status: {active_count} active workspaces")
306
307
# Start all monitoring tasks concurrently
308
await asyncio.gather(
309
workspace_monitor(),
310
window_monitor(),
311
periodic_status(),
312
i3.main() # Event loop
313
)
314
315
# Handle connection errors gracefully
316
async def robust_connection():
317
"""Demonstrate error handling with auto-reconnect."""
318
try:
319
i3 = await Connection(auto_reconnect=True).connect()
320
321
def on_error(i3, event):
322
print("Connection error, will auto-reconnect...")
323
324
# Monitor connection health
325
while True:
326
try:
327
version = await i3.get_version()
328
print(f"Connected to {version.human_readable}")
329
await asyncio.sleep(30)
330
except Exception as e:
331
print(f"Connection check failed: {e}")
332
await asyncio.sleep(5)
333
334
except Exception as e:
335
print(f"Failed to establish connection: {e}")
336
337
asyncio.run(robust_connection())
338
```