# Monitoring and Control

Functions for monitoring task execution, waiting for completion, and controlling the execution lifecycle. These provide essential synchronization and shutdown capabilities.

## Capabilities

### Task Synchronization

Wait for all background tasks to complete before continuing program execution.

```python { .api }
def wait_for_tasks(sleep: float = 0) -> bool:
    """
    Block until all background tasks complete execution.

    This is the primary synchronization mechanism. It prevents new
    tasks from being created and waits for existing ones to finish.
    Essential for ensuring all work is done before program exit.

    Args:
        sleep: Seconds to sleep between checks. 0 means busy-wait.
               Higher values reduce CPU usage but increase latency.

    Returns:
        Always returns True when all tasks are complete

    Note:
        Sets KILL_RECEIVED=True during execution to prevent new tasks,
        then resets it to False when done.
    """
```

**Usage Example:**

```python
import multitasking
import time

@multitasking.task
def background_work(task_id, duration):
    print(f"Task {task_id} starting...")
    time.sleep(duration)
    print(f"Task {task_id} completed!")

# Start multiple tasks
for i in range(5):
    background_work(i, i + 1)

print("All tasks started, waiting for completion...")

# Wait for all tasks (with CPU-friendly polling)
multitasking.wait_for_tasks(sleep=0.1)

print("All tasks completed, continuing program...")
```
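The blocking behavior described in the `wait_for_tasks` docstring can be modeled with the standard library alone. The sketch below is not the library's implementation: `TaskRegistry`, `submit`, `active`, and `wait` are hypothetical names, and the `kill_received` attribute only imitates the `KILL_RECEIVED` flag mentioned in the note above. It illustrates how a polling loop trades CPU usage for latency through the `sleep` argument.

```python
import threading
import time


class TaskRegistry:
    """Hypothetical stand-in for a task list; not part of multitasking."""

    def __init__(self):
        self.tasks = []
        self.kill_received = False  # imitates the KILL_RECEIVED flag

    def submit(self, func, *args):
        # Refuse new work while a wait is in progress.
        if self.kill_received:
            return None
        thread = threading.Thread(target=func, args=args, daemon=True)
        self.tasks.append(thread)
        thread.start()
        return thread

    def active(self):
        # Only threads that are still running.
        return [t for t in self.tasks if t.is_alive()]

    def wait(self, sleep=0.0):
        # Block until every submitted thread has finished.
        self.kill_received = True
        try:
            while self.active():
                if sleep > 0:
                    time.sleep(sleep)  # sleep=0 falls through to a busy-wait
        finally:
            self.kill_received = False
        return True
```

In this model, a larger `sleep` means fewer checks per second and therefore less CPU time spent polling, at the cost of noticing completion up to `sleep` seconds late.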

### Active Task Monitoring

Monitor the number and status of currently running tasks.

```python { .api }
def get_active_tasks() -> List[Union[Thread, Process]]:
    """
    Retrieve only the currently running tasks.

    Filters the complete task list to show only tasks that are still
    executing. This is more useful than get_list_of_tasks() for
    monitoring current system load.

    Returns:
        List of Thread/Process objects that are still running
    """
```

**Progress Monitoring Example:**

```python
import multitasking
import time

@multitasking.task
def data_processing(batch_id, size):
    print(f"Processing batch {batch_id} ({size} items)...")
    time.sleep(size * 0.5)  # Simulate variable processing time
    print(f"Batch {batch_id} complete")

# Start tasks with different processing times
for i in range(10):
    data_processing(i, i + 1)

# Monitor progress with detailed reporting
start_time = time.time()
while multitasking.get_active_tasks():
    active = multitasking.get_active_tasks()
    total = multitasking.get_list_of_tasks()
    completed = len(total) - len(active)

    progress = (completed / len(total)) * 100
    elapsed = time.time() - start_time

    print(f"Progress: {completed}/{len(total)} ({progress:.1f}%) - "
          f"Active: {len(active)} - Elapsed: {elapsed:.1f}s")

    time.sleep(1)

print("All processing complete!")
```
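The progress arithmetic in the example above can be factored into a small pure function, which also makes the edge case of an empty task list explicit. This is a minimal sketch; `summarize_progress` is a hypothetical helper, not part of the library, and it takes plain counts such as `len(multitasking.get_list_of_tasks())` and `len(multitasking.get_active_tasks())`.

```python
def summarize_progress(total_count: int, active_count: int) -> dict:
    """Derive completion figures from the total and active task counts."""
    if total_count < 0 or active_count < 0 or active_count > total_count:
        raise ValueError("counts must satisfy 0 <= active_count <= total_count")

    completed = total_count - active_count
    # Avoid dividing by zero when no task has been created yet.
    percent = completed * 100 / total_count if total_count else 0.0

    return {
        "total": total_count,
        "active": active_count,
        "completed": completed,
        "percent": percent,
    }
```

Keeping the calculation separate from the polling loop makes it easy to test without starting any tasks.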

### Emergency Shutdown

Terminate the program immediately, without waiting for running tasks. Intended for emergencies only.

```python { .api }
def killall(self: Any = None, cls: Any = None) -> None:
    """
    Emergency shutdown function that terminates the entire program.

    This is a last-resort function that immediately exits the program,
    potentially leaving tasks in an inconsistent state. It tries
    sys.exit() first, then os._exit() as a final measure.

    Args:
        self: Unused parameter kept for backward compatibility
        cls: Unused parameter kept for backward compatibility

    Warning:
        This function does NOT wait for tasks to complete cleanly.
        Use wait_for_tasks() for graceful shutdown instead.
    """
```

**Signal Handling Example:**

```python
import multitasking
import signal
import sys
import time

# Option 1: Emergency shutdown (immediate termination)
signal.signal(signal.SIGINT, multitasking.killall)

# Option 2: Graceful shutdown (recommended)
def graceful_shutdown(signum, frame):
    print("\nShutting down gracefully...")
    multitasking.wait_for_tasks()
    print("All tasks completed. Exiting.")
    sys.exit(0)

# Registering a second SIGINT handler replaces the one from Option 1;
# in a real program, pick one of the two options.
signal.signal(signal.SIGINT, graceful_shutdown)

@multitasking.task
def long_running_task(task_id):
    for i in range(10):
        print(f"Task {task_id}: step {i + 1}/10")
        time.sleep(1)

# Start some long-running tasks
for i in range(3):
    long_running_task(i)

print("Tasks started. Press Ctrl+C to test shutdown...")
multitasking.wait_for_tasks()
```
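The two-step exit described in the `killall` docstring (try `sys.exit()`, then fall back to `os._exit()`) can be illustrated as follows. This is a rough model under stated assumptions, not the library's source: `emergency_exit` and its `soft_exit` and `hard_exit` parameters are hypothetical, and they are injectable only so the control flow can be exercised without ending the process.

```python
import os
import sys


def emergency_exit(code=0, soft_exit=sys.exit, hard_exit=os._exit):
    """Attempt a normal interpreter exit, then force termination."""
    try:
        # sys.exit() raises SystemExit, which normally unwinds the stack
        # and runs cleanup handlers.
        soft_exit(code)
    except SystemExit:
        # Intercept the exception and terminate right away. os._exit()
        # skips cleanup handlers and does not flush buffered output.
        hard_exit(code)
```

Because the fallback skips interpreter cleanup, a function of this kind is suited to last-resort use only, which matches the warning in the docstring above.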

## Advanced Monitoring Patterns

### Task Lifecycle Tracking

Monitor task creation, execution, and completion:

```python
import multitasking
import time
from threading import Lock

# Thread-safe counters
stats_lock = Lock()
task_stats = {"created": 0, "completed": 0, "failed": 0}

def update_stats(key):
    with stats_lock:
        task_stats[key] += 1

@multitasking.task
def monitored_task(task_id):
    update_stats("created")
    try:
        # Simulate work
        time.sleep(1)
        print(f"Task {task_id} succeeded")
        update_stats("completed")
    except Exception as e:
        print(f"Task {task_id} failed: {e}")
        update_stats("failed")

# Start tasks and monitor
for i in range(20):
    monitored_task(i)

# Real-time monitoring
while multitasking.get_active_tasks():
    with stats_lock:
        print(f"Created: {task_stats['created']}, "
              f"Completed: {task_stats['completed']}, "
              f"Failed: {task_stats['failed']}, "
              f"Active: {len(multitasking.get_active_tasks())}")
    time.sleep(0.5)

print(f"Final stats: {task_stats}")
```
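The counting logic above can also be moved out of the task body into a reusable decorator. The sketch below uses only the standard library; `TaskStats` and `tracked` are hypothetical names introduced for illustration. Stacking `@multitasking.task` on top of `@tracked(stats)` should presumably work, but that combination is an assumption and is not exercised here.

```python
import functools
import threading


class TaskStats:
    """Thread-safe counters for started, completed, and failed calls."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counts = {"started": 0, "completed": 0, "failed": 0}

    def bump(self, key):
        with self._lock:
            self._counts[key] += 1

    def snapshot(self):
        # Return a copy so callers never read a half-updated dict.
        with self._lock:
            return dict(self._counts)


def tracked(stats):
    """Decorator factory that records the outcome of each call in stats."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            stats.bump("started")
            try:
                result = func(*args, **kwargs)
            except Exception:
                stats.bump("failed")
                raise  # keep the original error visible to the caller
            stats.bump("completed")
            return result
        return wrapper
    return decorator
```

Reading through `snapshot()` gives a consistent view of all three counters at once, which the monitoring loop above achieves by holding `stats_lock` while printing.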

### Resource Usage Monitoring

Track system resource usage during task execution:

```python
import multitasking
import psutil
import time

def monitor_resources():
    """Monitor CPU and memory usage during task execution."""
    print(f"CPU: {psutil.cpu_percent():.1f}%, "
          f"Memory: {psutil.virtual_memory().percent:.1f}%, "
          f"Active tasks: {len(multitasking.get_active_tasks())}")

@multitasking.task
def cpu_intensive_task(task_id):
    # Simulate CPU-intensive work
    total = 0
    for i in range(1000000):
        total += i * i
    print(f"Task {task_id} result: {total}")

@multitasking.task
def resource_monitor():
    # The monitor is itself a task, so it always counts as one active task.
    # Stop once it is the only task left; otherwise the loop never ends.
    while len(multitasking.get_active_tasks()) > 1:
        monitor_resources()
        time.sleep(2)

# Start CPU-intensive tasks first so the monitor has work to observe
for i in range(5):
    cpu_intensive_task(i)

# Start the monitor
resource_monitor()

multitasking.wait_for_tasks()
print("All tasks and monitoring complete!")
```
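An alternative to running the monitor as a task is to run it on a separate thread and stop it explicitly, so it never appears in the active task list. The sketch below is one possible design using only the standard library; `PeriodicSampler` and its methods are hypothetical names, and the `sample_fn` callback could wrap a call such as `psutil.cpu_percent()`.

```python
import threading


class PeriodicSampler:
    """Call sample_fn at a fixed interval until stop() is called."""

    def __init__(self, sample_fn, interval=1.0):
        self._sample_fn = sample_fn
        self._interval = interval
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.samples = []

    def _run(self):
        while True:
            self.samples.append(self._sample_fn())
            # Event.wait() returns True as soon as stop() sets the event,
            # so shutdown does not have to wait out a full interval.
            if self._stop_event.wait(self._interval):
                break

    def start(self):
        self._thread.start()
        return self

    def stop(self):
        self._stop_event.set()
        self._thread.join()
        return self.samples
```

A typical flow would be to call `start()` before launching tasks, wait for the tasks to finish, and then call `stop()` to collect the recorded samples.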