
# Concurrent Decorators

Synchronous decorators for thread and process-based execution that return `concurrent.futures.Future` objects. These decorators provide the simplest way to execute functions concurrently with minimal code changes.
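
As a quick orientation (a minimal sketch; `fetch` and the URL are illustrative), a decorated function is called exactly like the original but returns a future instead of the result:

```python
from pebble.concurrent import thread

@thread
def fetch(url):
    # Runs in a separate thread; the caller gets a Future immediately
    return f"data from {url}"

future = fetch("https://example.com")  # does not block
print(future.result())                 # blocks until the thread finishes
```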

## Capabilities

### Thread Decorator

Executes the decorated function in a separate thread and returns a `concurrent.futures.Future` object. Ideal for I/O-bound tasks that can benefit from concurrent execution without the overhead of process creation.

```python { .api }
def thread(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    pool: Optional[ThreadPool] = None
) -> Callable[..., Future]:
    """
    Decorator for thread-based concurrent execution.

    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Thread name for identification and debugging
    - daemon: Whether thread runs as daemon (doesn't prevent program exit)
    - pool: Existing ThreadPool instance to use instead of creating new thread

    Returns:
    Decorated function that returns concurrent.futures.Future when called
    """
```

#### Usage Examples

```python
from pebble.concurrent import thread
import time

# Simple usage without parameters
@thread
def io_task(url):
    # Simulate I/O operation
    time.sleep(1)
    return f"Data from {url}"

# Usage with parameters
@thread(name="background-worker", daemon=False)
def background_task(data):
    # Process data in background
    return len(data) * 2

# Using with existing pool
from pebble import ThreadPool

pool = ThreadPool(max_workers=4)

@thread(pool=pool)
def pooled_task(x):
    return x ** 2

# Calling decorated functions
future1 = io_task("https://api.example.com")
future2 = background_task([1, 2, 3, 4])
future3 = pooled_task(5)

# Get results
result1 = future1.result()  # Blocks until complete
result2 = future2.result(timeout=10)  # With timeout
result3 = future3.result()

print(f"Results: {result1}, {result2}, {result3}")
```
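
Since the returned objects are plain `concurrent.futures.Future` instances, they compose with the standard library helpers such as `as_completed`. A small sketch, reusing the `io_task` defined above (the URLs are placeholders):

```python
from concurrent.futures import as_completed

# Launch several I/O tasks; each call returns a Future immediately.
futures = [io_task(f"https://api.example.com/item/{i}") for i in range(5)]

# Iterate over results in completion order rather than submission order.
for future in as_completed(futures):
    print(future.result())
```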

### Process Decorator

Executes the decorated function in a separate process and returns a `ProcessFuture` object. Perfect for CPU-intensive tasks that can benefit from true parallelism by bypassing Python's Global Interpreter Lock (GIL).

```python { .api }
def process(
    func: Callable = None,
    *,
    name: Optional[str] = None,
    daemon: bool = True,
    timeout: Optional[float] = None,
    mp_context: Optional[multiprocessing.context.BaseContext] = None,
    pool: Optional[ProcessPool] = None
) -> Callable[..., ProcessFuture]:
    """
    Decorator for process-based concurrent execution.

    Parameters:
    - func: Function to decorate (when used without parameters)
    - name: Process name for identification and debugging
    - daemon: Whether process runs as daemon (doesn't prevent program exit)
    - timeout: Maximum execution time in seconds (raises TimeoutError if exceeded)
    - mp_context: Multiprocessing context for process creation (spawn, fork, forkserver)
    - pool: Existing ProcessPool instance to use instead of creating new process

    Returns:
    Decorated function that returns ProcessFuture when called
    """
```

#### Usage Examples

```python
from pebble.concurrent import process
from concurrent.futures import TimeoutError  # not the builtin on Python < 3.11
import multiprocessing
import time

# Simple usage for CPU-intensive task
@process
def cpu_intensive(n):
    total = 0
    for i in range(n):
        total += i ** 2
    return total

# Usage with timeout
@process(timeout=5.0)
def timed_task(duration):
    time.sleep(duration)
    return "Completed"

# Usage with custom multiprocessing context
ctx = multiprocessing.get_context('spawn')

@process(mp_context=ctx, name="spawned-worker")
def spawned_task(data):
    return sum(data)

# Using with existing pool
from pebble import ProcessPool

pool = ProcessPool(max_workers=2)

@process(pool=pool)
def pooled_cpu_task(x, y):
    return x * y + (x ** y)

# Calling decorated functions
future1 = cpu_intensive(1000000)
future2 = timed_task(2)  # Will complete within timeout
future3 = spawned_task([1, 2, 3, 4, 5])
future4 = pooled_cpu_task(3, 4)

try:
    # Get results
    result1 = future1.result()
    result2 = future2.result()
    result3 = future3.result()
    result4 = future4.result()
    print(f"Results: {result1}, {result2}, {result3}, {result4}")
except TimeoutError:
    print("Task exceeded timeout")
except Exception as e:
    print(f"Task failed: {e}")
```
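
One practical caveat (not specific to pebble): when the `spawn` start method is in effect, which is the default on Windows and macOS, the child process re-imports the main module, so code that actually schedules work should sit behind the usual `__main__` guard. A hedged sketch of that pattern:

```python
from pebble.concurrent import process

@process
def heavy(n):
    # CPU-bound work runs in a worker process
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    # Only the parent executes this block; re-imported children skip it.
    future = heavy(1_000_000)
    print(future.result())
```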

### Error Handling

Both decorators preserve exception tracebacks. An exception raised inside the decorated function is re-raised, with its original traceback, when `result()` is called on the returned future:

```python
from pebble.concurrent import process
from pebble import ProcessExpired
from concurrent.futures import TimeoutError  # not the builtin on Python < 3.11

@process(timeout=1.0)
def failing_task():
    raise ValueError("Something went wrong")

@process
def timeout_task():
    import time
    time.sleep(10)  # Sleeps longer than the result() timeout below
    return "Never reached"

# Handle various error conditions
future1 = failing_task()
future2 = timeout_task()

try:
    result1 = future1.result()
except ValueError as e:
    print(f"Function error: {e}")

try:
    result2 = future2.result(timeout=2.0)
except TimeoutError:
    print("Task timed out")
except ProcessExpired as e:
    print(f"Process died unexpectedly: {e}")
```
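
When blocking on `result()` is not desirable, the standard `Future.add_done_callback` interface offers a non-blocking alternative. A sketch reusing `failing_task` from above (the callback name is illustrative):

```python
def report(future):
    # Invoked once the future is done; exception() is None on success.
    error = future.exception()
    if error is not None:
        print(f"Task failed: {error!r}")
    else:
        print(f"Task succeeded: {future.result()}")

future = failing_task()
future.add_done_callback(report)
```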

### Function Requirements

Functions decorated with concurrent decorators must be:

1. **Serializable**: Pickleable, for the process decorator (not required for the thread decorator)
2. **Pure or thread-safe**: Should not depend on global state that could cause race conditions
3. **Import-accessible**: For the process decorator, the function must be importable from the main module

```python
# Good: Pure function, easily serializable
# (iterative on purpose: a recursive call would go through the decorator
#  and return a Future instead of a number)
@process
def calculate_fibonacci(n):
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a

# Avoid: Depends on global state. The update happens in a child process,
# so the parent's global_counter is never actually changed.
global_counter = 0

@process
def increment_global():
    global global_counter
    global_counter += 1
    return global_counter
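
A pattern that stays within these requirements, sketched below with an illustrative `increment` worker, is to pass state in as an argument and return the updated value so the caller owns it:

```python
from pebble.concurrent import process

# State travels through arguments and return values, so the worker
# never relies on module-level globals.
@process
def increment(counter):
    return counter + 1

if __name__ == "__main__":
    counter = 0
    counter = increment(counter).result()
    counter = increment(counter).result()
    print(counter)  # 2
```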