or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md commands.md connection.md containers.md events.md index.md outputs.md workspaces.md

async.md docs/

0

# Async API

1

2

The `i3ipc.aio` module provides a complete asynchronous interface for non-blocking i3/sway IPC communication. It offers async versions of the connection, query, command, and event-handling APIs, integrates with asyncio, and supports concurrent event handling.

3

4

## Capabilities

5

6

### Async Connection

7

8

```python { .api }

9

from i3ipc.aio import Connection

10

11

class Connection:

12

def __init__(self, socket_path: Optional[str] = None, auto_reconnect: bool = False):

13

"""

14

Create an async connection to i3/sway IPC socket.

15

16

Parameters:

17

- socket_path: Optional[str], path to IPC socket (auto-detected if None)

18

- auto_reconnect: bool, whether to reconnect automatically on disconnect

19

"""

20

21

async def connect(self) -> Connection:

22

"""

23

Establish the async connection to i3/sway.

24

25

Returns:

26

Connection: connected instance ready for async operations

27

"""

28

29

@property

30

def socket_path(self) -> str:

31

"""Get the IPC socket path."""

32

33

@property

34

def auto_reconect(self) -> bool:

35

"""Get the auto-reconnect setting."""

36

```

37

38

### Async Query Methods

39

40

```python { .api }

41

async def command(self, cmd: str) -> List[CommandReply]:

42

"""

43

Execute i3/sway commands asynchronously.

44

45

Parameters:

46

- cmd: str, command string to execute

47

48

Returns:

49

List[CommandReply]: command execution results

50

"""

51

52

async def get_version(self) -> VersionReply:

53

"""

54

Get i3/sway version information.

55

56

Returns:

57

VersionReply: version details and configuration

58

"""

59

60

async def get_bar_config_list(self) -> List[str]:

61

"""

62

Get list of bar configuration IDs.

63

64

Returns:

65

List[str]: bar ID strings

66

"""

67

68

async def get_bar_config(self, bar_id=None) -> Optional[BarConfigReply]:

69

"""

70

Get bar configuration details.

71

72

Parameters:

73

- bar_id: Optional[str], specific bar ID (first bar if None)

74

75

Returns:

76

Optional[BarConfigReply]: bar configuration

77

"""

78

79

async def get_outputs(self) -> List[OutputReply]:

80

"""

81

Get output/monitor information.

82

83

Returns:

84

List[OutputReply]: output details

85

"""

86

87

async def get_workspaces(self) -> List[WorkspaceReply]:

88

"""

89

Get workspace information.

90

91

Returns:

92

List[WorkspaceReply]: workspace details

93

"""

94

95

async def get_tree(self) -> Con:

96

"""

97

Get the complete container tree.

98

99

Returns:

100

Con: root container with full hierarchy

101

"""

102

103

async def get_marks(self) -> List[str]:

104

"""

105

Get all container marks.

106

107

Returns:

108

List[str]: mark names

109

"""

110

111

async def get_binding_modes(self) -> List[str]:

112

"""

113

Get available binding modes.

114

115

Returns:

116

List[str]: binding mode names

117

"""

118

119

async def get_config(self) -> ConfigReply:

120

"""

121

Get configuration file contents.

122

123

Returns:

124

ConfigReply: configuration data

125

"""

126

127

async def send_tick(self, payload: str = "") -> TickReply:

128

"""

129

Send tick event with payload.

130

131

Parameters:

132

- payload: str, optional tick data

133

134

Returns:

135

TickReply: tick processing confirmation

136

"""

137

138

async def get_inputs(self) -> List[InputReply]:

139

"""

140

Get input device information (sway only).

141

142

Returns:

143

List[InputReply]: input device details

144

"""

145

146

async def get_seats(self) -> List[SeatReply]:

147

"""

148

Get seat information (sway only).

149

150

Returns:

151

List[SeatReply]: seat details

152

"""

153

```

154

155

### Async Event Handling

156

157

```python { .api }

158

async def subscribe(self, events: Union[List[Event], List[str]], force: bool = False) -> None:

159

"""

160

Subscribe to events asynchronously.

161

162

Parameters:

163

- events: Union[List[Event], List[str]], events to subscribe to

164

- force: bool, whether to force resubscription

165

"""

166

167

def on(self, event: Union[Event, str], handler: Callable[[Connection, IpcBaseEvent], None]) -> None:

168

"""

169

Register event handler (supports both sync and async handlers).

170

171

Parameters:

172

- event: Union[Event, str], event type to handle

173

- handler: Callable, handler function (can be sync or async)

174

"""

175

176

def off(self, handler: Callable[[Connection, IpcBaseEvent], None]) -> None:

177

"""

178

Remove event handler from all subscriptions.

179

180

Parameters:

181

- handler: Callable, handler function to remove

182

"""

183

184

async def main(self) -> None:

185

"""

186

Start the async event loop for processing events.

187

"""

188

189

def main_quit(self, _error=None) -> None:

190

"""

191

Stop the async event loop.

192

193

Parameters:

194

- _error: Optional exception that caused the quit

195

"""

196

```

197

198

### Container Async Methods

199

200

```python { .api }

201

# Available on Con objects in async context

202

async def command(self, command: str) -> List[CommandReply]:

203

"""

204

Execute command on container asynchronously.

205

206

Parameters:

207

- command: str, command to execute on this container

208

209

Returns:

210

List[CommandReply]: command execution results

211

"""

212

213

async def command_children(self, command: str) -> List[CommandReply]:

214

"""

215

Execute command on child containers asynchronously.

216

217

Parameters:

218

- command: str, command to execute on each child

219

220

Returns:

221

List[CommandReply]: results for each child container

222

"""

223

```

224

225

### Basic Async Usage

226

227

```python

228

import asyncio

229

from i3ipc.aio import Connection

230

from i3ipc import Event

231

232

async def main():

233

# Connect to i3/sway

234

i3 = await Connection().connect()

235

236

# Perform async queries

237

workspaces = await i3.get_workspaces()

238

print(f"Found {len(workspaces)} workspaces")

239

240

tree = await i3.get_tree()

241

focused = tree.find_focused()

242

if focused:

243

print(f"Focused window: {focused.name}")

244

245

# Execute commands

246

await i3.command('workspace 2')

247

await i3.command('exec firefox')

248

249

# Event handling with async handlers

250

async def on_window_focus(i3, event):

251

print(f"Window focused: {event.container.name}")

252

# Can perform async operations here

253

await i3.command(f'title_format "{event.container.name} - Focused"')

254

255

# Mix sync and async handlers

256

def on_workspace_change(i3, event):

257

print(f"Workspace changed: {event.change}")

258

259

i3.on(Event.WINDOW_FOCUS, on_window_focus) # async handler

260

i3.on(Event.WORKSPACE, on_workspace_change) # sync handler

261

262

# Start event processing

263

await i3.main()

264

265

# Run the async application

266

asyncio.run(main())

267

```

268

269

### Advanced Async Patterns

270

271

```python

272

import asyncio

273

from i3ipc.aio import Connection

274

from i3ipc import Event, WorkspaceEvent, WindowEvent

275

276

async def monitor_system():

277

"""Demonstrate concurrent async operations."""

278

i3 = await Connection(auto_reconnect=True).connect()

279

280

# Concurrent tasks

281

async def workspace_monitor():

282

"""Monitor workspace changes."""

283

def on_workspace(i3, event: WorkspaceEvent):

284

if event.change == 'focus':

285

print(f"Switched to: {event.current.name}")

286

287

i3.on(Event.WORKSPACE_FOCUS, on_workspace)

288

289

async def window_monitor():

290

"""Monitor window events."""

291

async def on_window(i3, event: WindowEvent):

292

if event.change == 'new':

293

print(f"New window: {event.container.name}")

294

# Async operation in event handler

295

await i3.command(f'[con_id={event.container.id}] mark new_window')

296

297

i3.on(Event.WINDOW_NEW, on_window)

298

299

async def periodic_status():

300

"""Periodically report system status."""

301

while True:

302

await asyncio.sleep(10)

303

workspaces = await i3.get_workspaces()

304

active_count = sum(1 for ws in workspaces if ws.visible)

305

print(f"Status: {active_count} active workspaces")

306

307

# Start all monitoring tasks concurrently

308

await asyncio.gather(

309

workspace_monitor(),

310

window_monitor(),

311

periodic_status(),

312

i3.main() # Event loop

313

)

314

315

# Handle connection errors gracefully

316

async def robust_connection():

317

"""Demonstrate error handling with auto-reconnect."""

318

try:

319

i3 = await Connection(auto_reconnect=True).connect()

320

321

def on_error(i3, event):

322

print("Connection error, will auto-reconnect...")

323

324

# Monitor connection health

325

while True:

326

try:

327

version = await i3.get_version()

328

print(f"Connected to {version.human_readable}")

329

await asyncio.sleep(30)

330

except Exception as e:

331

print(f"Connection check failed: {e}")

332

await asyncio.sleep(5)

333

334

except Exception as e:

335

print(f"Failed to establish connection: {e}")

336

337

asyncio.run(robust_connection())

338

```