
# Asynchronous Decorators


AsyncIO-compatible decorators for thread- and process-based execution that return `asyncio.Future` objects. They integrate directly with async/await code, allowing concurrent execution to be mixed seamlessly into AsyncIO applications.
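
As a minimal sketch of the core pattern (with a hypothetical `blocking_work` function): decorate a synchronous function, call it from a coroutine, and await the returned `asyncio.Future`.

```python
import asyncio
import time

from pebble.asynchronous import thread

@thread
def blocking_work(seconds):
    # Plain synchronous code; it runs in a worker thread
    time.sleep(seconds)
    return f"slept {seconds}s"

async def main():
    # The call returns an asyncio.Future, so it can be awaited directly
    print(await blocking_work(1))

asyncio.run(main())
```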


## Capabilities


### AsyncIO Thread Decorator


Executes the decorated function in a separate thread and returns an `asyncio.Future` object. Ideal for I/O-bound tasks in AsyncIO applications where you need to call synchronous functions without blocking the event loop.


```python { .api }
def thread(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    pool: Optional[ThreadPool] = None
) -> Callable[..., asyncio.Future]:
    """
    AsyncIO decorator for thread-based concurrent execution.

    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Thread name for identification and debugging
    - daemon: Whether thread runs as daemon (doesn't prevent program exit)
    - pool: Existing ThreadPool instance to use instead of creating new thread

    Returns:
    Decorated function that returns asyncio.Future when called
    """
```


#### Usage Examples


```python
import asyncio

import requests

from pebble.asynchronous import thread

# Simple usage for blocking I/O
@thread
def fetch_data(url):
    # Synchronous I/O that would block the event loop
    response = requests.get(url)
    return response.json()

# Usage with parameters
@thread(name="file-processor", daemon=False)
def process_file(filename):
    with open(filename, 'r') as f:
        return len(f.read())

# Using an existing pool
from pebble import ThreadPool

pool = ThreadPool(max_workers=4)

@thread(pool=pool)
def cpu_task(n):
    return sum(i ** 2 for i in range(n))

# AsyncIO application
async def main():
    # Schedule multiple concurrent operations
    tasks = [
        fetch_data("https://api.example.com/data1"),
        fetch_data("https://api.example.com/data2"),
        process_file("large_file.txt"),
        cpu_task(1000)
    ]

    # Wait for all to complete
    results = await asyncio.gather(*tasks)
    print(f"Results: {results}")

    # Or process results as they complete (an alternative to gather)
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Completed: {result}")

# Run the AsyncIO application
asyncio.run(main())
```
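
Note that each decorated call starts its work as soon as it is invoked, returning an `asyncio.Future` immediately; `await` only collects the result. A small timing sketch (with a hypothetical `wait_one` helper) illustrates the resulting concurrency:

```python
import asyncio
import time

from pebble.asynchronous import thread

@thread
def wait_one():
    time.sleep(1)
    return "done"

async def main():
    start = time.monotonic()
    futures = [wait_one() for _ in range(4)]  # all four threads start here
    await asyncio.gather(*futures)
    # Roughly 1 second elapsed, not 4: the calls ran concurrently
    print(f"elapsed: {time.monotonic() - start:.1f}s")

asyncio.run(main())
```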


### AsyncIO Process Decorator


Executes the decorated function in a separate process and returns an `asyncio.Future` object. Perfect for CPU-intensive tasks in AsyncIO applications that need true parallelism without blocking the event loop.


```python { .api }
def process(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    timeout: Optional[float] = None,
    mp_context: Optional[multiprocessing.context.BaseContext] = None,
    pool: Optional[ProcessPool] = None
) -> Callable[..., asyncio.Future]:
    """
    AsyncIO decorator for process-based concurrent execution.

    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Process name for identification and debugging
    - daemon: Whether process runs as daemon (doesn't prevent program exit)
    - timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)
    - mp_context: Multiprocessing context for process creation
    - pool: Existing ProcessPool instance to use instead of creating new process

    Returns:
    Decorated function that returns asyncio.Future when called
    """
```
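
Because the decorated function runs in a separate process, the usual multiprocessing caveats apply (a general Python multiprocessing assumption, not pebble-specific documentation): define the function at module level, keep arguments and return values picklable, and guard the script entry point, since the `spawn` start method re-imports the main module in each worker. A minimal sketch:

```python
import asyncio

from pebble.asynchronous import process

# Defined at module level so it can be pickled for the worker;
# arguments and return values must be picklable as well
@process
def square(n):
    return n * n

async def main():
    print(await square(12))

if __name__ == "__main__":
    # Guarding the entry point avoids re-running it when a 'spawn'
    # worker re-imports this module
    asyncio.run(main())
```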


#### Usage Examples


```python
import asyncio
import multiprocessing

from pebble.asynchronous import process

# CPU-intensive task
@process
def heavy_computation(data):
    # Simulate heavy CPU work
    result = 0
    for item in data:
        result += item ** 3
    return result

# With timeout for long-running tasks
@process(timeout=30.0)
def data_analysis(dataset):
    # Simulate data analysis that might take a while
    import time
    time.sleep(5)  # Simulate processing
    return {"mean": sum(dataset) / len(dataset), "size": len(dataset)}

# Using a custom multiprocessing context
ctx = multiprocessing.get_context('spawn')

@process(mp_context=ctx, name="isolated-worker")
def isolated_task(config):
    # Task that needs process isolation
    return config["value"] * 2

# AsyncIO application with CPU-intensive tasks
async def data_pipeline():
    # Generate data
    datasets = [
        list(range(1000 * i, 1000 * (i + 1)))
        for i in range(5)
    ]

    # Process datasets concurrently
    computation_tasks = [
        heavy_computation(dataset)
        for dataset in datasets
    ]
    analysis_tasks = [
        data_analysis(dataset)
        for dataset in datasets
    ]

    # Wait for computations
    print("Starting heavy computations...")
    computation_results = await asyncio.gather(*computation_tasks)

    # Wait for analysis
    print("Starting data analysis...")
    analysis_results = await asyncio.gather(*analysis_tasks)

    print(f"Computation results: {computation_results}")
    print(f"Analysis results: {analysis_results}")

    # Mix with other async operations
    isolated_result = await isolated_task({"value": 42})
    print(f"Isolated result: {isolated_result}")

# Run the pipeline; the main guard is required with the 'spawn'
# context, which re-imports this module in each worker process
if __name__ == "__main__":
    asyncio.run(data_pipeline())
```


### Integration with AsyncIO Patterns


The asynchronous decorators integrate seamlessly with AsyncIO patterns and utilities:


```python
import asyncio

from pebble.asynchronous import thread, process

@thread
def sync_io_operation(url):
    import requests
    return requests.get(url).json()

@process
def cpu_bound_task(n):
    return sum(i * i for i in range(n))

# Defined at module level: process-decorated functions must be
# picklable for the worker process (nested functions are not)
@process(timeout=2.0)
def might_timeout():
    import time
    time.sleep(5)  # Will exceed the 2-second timeout
    return "Done"

@thread
def might_fail():
    raise ValueError("Something went wrong")

async def advanced_patterns():
    # Using asyncio.wait with timeout
    tasks = [
        sync_io_operation("https://api1.example.com"),
        sync_io_operation("https://api2.example.com"),
        cpu_bound_task(1000000)
    ]

    done, pending = await asyncio.wait(
        tasks,
        timeout=10.0,
        return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel pending tasks
    for task in pending:
        task.cancel()

    # Process completed tasks
    for task in done:
        try:
            result = await task
            print(f"Completed: {result}")
        except Exception as e:
            print(f"Failed: {e}")

# Error handling with AsyncIO
async def error_handling_example():
    try:
        result1 = await might_timeout()
    except asyncio.TimeoutError:
        print("Process timed out")

    try:
        result2 = await might_fail()
    except ValueError as e:
        print(f"Thread failed: {e}")

if __name__ == "__main__":
    asyncio.run(advanced_patterns())
    asyncio.run(error_handling_example())
```
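
Since the decorated call returns an ordinary `asyncio.Future`, it also composes with `asyncio.shield` when a worker should run to completion even if the awaiting coroutine gives up. The following is a sketch under that assumption, with a hypothetical `must_finish` function:

```python
import asyncio
import time

from pebble.asynchronous import thread

@thread
def must_finish(record):
    time.sleep(2)  # stands in for a write that should not be abandoned
    return f"committed {record}"

async def main():
    future = must_finish("row-1")
    try:
        # wait_for cancels the shield wrapper on timeout,
        # leaving the underlying future running
        result = await asyncio.wait_for(asyncio.shield(future), timeout=1.0)
    except asyncio.TimeoutError:
        # The worker is still running, so its result can still be awaited
        result = await future
    print(result)

asyncio.run(main())
```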


### AsyncIO Context Management


Using asynchronous decorators with AsyncIO context managers and resource management:


```python
import asyncio
from contextlib import asynccontextmanager

from pebble.asynchronous import thread

@thread
def database_query(query):
    # Simulate database query
    import time
    time.sleep(1)
    return f"Result for: {query}"

@asynccontextmanager
async def database_session():
    print("Opening database session")
    try:
        yield "session"
    finally:
        print("Closing database session")

async def database_operations():
    async with database_session() as session:
        # Execute multiple queries concurrently
        queries = [
            database_query("SELECT * FROM users"),
            database_query("SELECT * FROM orders"),
            database_query("SELECT * FROM products")
        ]

        results = await asyncio.gather(*queries)
        return results

# Resource cleanup with asyncio
async def resource_management():
    tasks = []

    try:
        # Start multiple background tasks
        for i in range(5):
            task = database_query(f"Query {i}")
            tasks.append(task)

        # Wait for all with timeout
        results = await asyncio.wait_for(
            asyncio.gather(*tasks),
            timeout=10.0
        )
        return results

    except asyncio.TimeoutError:
        print("Operations timed out, cleaning up...")
        # Tasks are automatically cancelled by wait_for
        return None

asyncio.run(database_operations())
asyncio.run(resource_management())
```
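
Concurrency can also be throttled with an `asyncio.Semaphore`: because a worker only starts when the decorated function is called, acquiring the semaphore first caps the number of simultaneous workers. A sketch with a hypothetical `fetch` function, limited to three worker threads at a time:

```python
import asyncio
import time

from pebble.asynchronous import thread

@thread
def fetch(i):
    time.sleep(0.5)  # stand-in for blocking I/O
    return i

async def bounded_fetch(sem, i):
    # The worker thread only starts once the semaphore is acquired
    async with sem:
        return await fetch(i)

async def main():
    sem = asyncio.Semaphore(3)  # at most three worker threads at a time
    results = await asyncio.gather(*(bounded_fetch(sem, i) for i in range(10)))
    print(results)

asyncio.run(main())
```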