# Monitoring and Control

Functions for monitoring task execution, waiting for completion, and controlling the execution lifecycle. These provide essential synchronization and shutdown capabilities.

## Capabilities

### Task Synchronization

Wait for all background tasks to complete before continuing program execution.

```python { .api }
def wait_for_tasks(sleep: float = 0) -> bool:
    """
    Block until all background tasks complete execution.

    This is the primary synchronization mechanism. It prevents new
    tasks from being created and waits for existing ones to finish.
    Essential for ensuring all work is done before program exit.

    Args:
        sleep: Seconds to sleep between checks. 0 means busy-wait.
            Higher values reduce CPU usage but increase latency.

    Returns:
        Always returns True when all tasks are complete

    Note:
        Sets KILL_RECEIVED=True during execution to prevent new tasks,
        then resets it to False when done.
    """
```

**Usage Example:**

```python
import multitasking
import time

@multitasking.task
def background_work(task_id, duration):
    print(f"Task {task_id} starting...")
    time.sleep(duration)
    print(f"Task {task_id} completed!")

# Start multiple tasks
for i in range(5):
    background_work(i, i + 1)

print("All tasks started, waiting for completion...")

# Wait for all tasks (with CPU-friendly polling)
multitasking.wait_for_tasks(sleep=0.1)

print("All tasks completed, continuing program...")
```
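
The docstring above describes a polling loop: check which tasks are still alive, optionally sleep, and repeat. The sketch below reproduces that pattern with plain `threading.Thread` objects. `wait_for_threads` and `worker` are hypothetical names written for illustration. This is not the library's implementation, and it leaves out the `KILL_RECEIVED` flag handling.

```python
import threading
import time


def wait_for_threads(threads, sleep=0.0):
    """Hypothetical stand-in for a polling wait; not the library's code."""
    # Keep checking until no thread in the list is still running.
    while any(t.is_alive() for t in threads):
        if sleep > 0:
            # Yield the CPU between checks instead of busy-waiting.
            time.sleep(sleep)
    return True


def worker(results, index):
    time.sleep(0.05)       # simulate a short piece of work
    results.append(index)  # list.append is thread-safe in CPython


results = []
threads = [threading.Thread(target=worker, args=(results, i)) for i in range(3)]
for t in threads:
    t.start()

finished = wait_for_threads(threads, sleep=0.01)
```

With `sleep=0` the loop spins without pausing, which matches the busy-wait trade-off mentioned in the `sleep` argument description.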

### Active Task Monitoring

Monitor the number and status of currently running tasks.

```python { .api }
def get_active_tasks() -> List[Union[Thread, Process]]:
    """
    Retrieve only the currently running tasks.

    Filters the complete task list to show only tasks that are still
    executing. This is more useful than get_list_of_tasks() for
    monitoring current system load.

    Returns:
        List of Thread/Process objects that are still running
    """
```

**Progress Monitoring Example:**

```python
import multitasking
import time

@multitasking.task
def data_processing(batch_id, size):
    print(f"Processing batch {batch_id} ({size} items)...")
    time.sleep(size * 0.5)  # Simulate variable processing time
    print(f"Batch {batch_id} complete")

# Start tasks with different processing times
for i in range(10):
    data_processing(i, i + 1)

# Monitor progress with detailed reporting
start_time = time.time()
while multitasking.get_active_tasks():
    active = multitasking.get_active_tasks()
    total = multitasking.get_list_of_tasks()
    completed = len(total) - len(active)

    progress = (completed / len(total)) * 100
    elapsed = time.time() - start_time

    print(f"Progress: {completed}/{len(total)} ({progress:.1f}%) - "
          f"Active: {len(active)} - Elapsed: {elapsed:.1f}s")

    time.sleep(1)

print("All processing complete!")
```
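
The filtering that `get_active_tasks()` is described as doing can be pictured with the standard `Thread.is_alive()` method. In this sketch, `active_only` is a hypothetical helper and the task list is an ordinary Python list. Both are assumed for illustration rather than taken from the library.

```python
import threading


def active_only(tasks):
    """Hypothetical helper: keep only the tasks that are still running."""
    return [t for t in tasks if t.is_alive()]


release = threading.Event()

# One thread blocks until released; the other returns immediately.
blocked = threading.Thread(target=release.wait)
quick = threading.Thread(target=lambda: None)
all_tasks = [blocked, quick]

for t in all_tasks:
    t.start()
quick.join()  # make sure the quick thread has finished

still_running = active_only(all_tasks)  # only the blocked thread remains

release.set()   # let the blocked thread finish
blocked.join()
none_running = active_only(all_tasks)   # nothing is alive any more
```

The full list keeps finished threads around, which is why the progress loop above can compute a completed count as the difference between the two lists.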

### Emergency Shutdown

Immediately terminate the program, typically used for emergency situations.

```python { .api }
def killall(self: Any = None, cls: Any = None) -> None:
    """
    Emergency shutdown function that terminates the entire program.

    This is a last-resort function that immediately exits the program,
    potentially leaving tasks in an inconsistent state. It tries
    sys.exit() first, then os._exit() as a final measure.

    Args:
        self: Unused parameter kept for backward compatibility
        cls: Unused parameter kept for backward compatibility

    Warning:
        This function does NOT wait for tasks to complete cleanly.
        Use wait_for_tasks() for graceful shutdown instead.
    """
```

**Signal Handling Example:**

```python
import multitasking
import signal
import sys
import time

# Option 1: Emergency shutdown (immediate termination)
signal.signal(signal.SIGINT, multitasking.killall)

# Option 2: Graceful shutdown (recommended).
# Registering this handler replaces the Option 1 handler above;
# in real code, pick one of the two.
def graceful_shutdown(signum, frame):
    print("\nShutting down gracefully...")
    multitasking.wait_for_tasks()
    print("All tasks completed. Exiting.")
    sys.exit(0)

signal.signal(signal.SIGINT, graceful_shutdown)

@multitasking.task
def long_running_task(task_id):
    for i in range(10):
        print(f"Task {task_id}: step {i + 1}/10")
        time.sleep(1)

# Start some long-running tasks
for i in range(3):
    long_running_task(i)

print("Tasks started. Press Ctrl+C to test shutdown...")
multitasking.wait_for_tasks()
```
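
One plausible reason for keeping a hard exit as a fallback: `sys.exit()` works by raising `SystemExit`, an exception that surrounding code can catch, whereas `os._exit()` ends the process without running cleanup handlers. The sketch below demonstrates only the first half. `attempt_exit` is a hypothetical name, and `os._exit()` is deliberately not called because it would stop the interpreter.

```python
import sys


def attempt_exit(code):
    """Hypothetical demo: show that sys.exit() can be intercepted."""
    try:
        sys.exit(code)
    except SystemExit as exc:
        # sys.exit() raised SystemExit instead of stopping the process,
        # so execution continues here with the requested exit code.
        return exc.code


intercepted_code = attempt_exit(3)
```

Because the exit request is an ordinary exception, a broad `except BaseException` block anywhere up the call stack can swallow it, so a handler that must terminate cannot rely on `sys.exit()` alone.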

## Advanced Monitoring Patterns

### Task Lifecycle Tracking

Monitor task creation, execution, and completion:

```python
import multitasking
import time
from threading import Lock

# Thread-safe counters
stats_lock = Lock()
task_stats = {"created": 0, "completed": 0, "failed": 0}

def update_stats(key):
    with stats_lock:
        task_stats[key] += 1

@multitasking.task
def monitored_task(task_id):
    update_stats("created")
    try:
        # Simulate work
        time.sleep(1)
        print(f"Task {task_id} succeeded")
        update_stats("completed")
    except Exception as e:
        print(f"Task {task_id} failed: {e}")
        update_stats("failed")

# Start tasks and monitor
for i in range(20):
    monitored_task(i)

# Real-time monitoring
while multitasking.get_active_tasks():
    with stats_lock:
        print(f"Created: {task_stats['created']}, "
              f"Completed: {task_stats['completed']}, "
              f"Failed: {task_stats['failed']}, "
              f"Active: {len(multitasking.get_active_tasks())}")
    time.sleep(0.5)

print(f"Final stats: {task_stats}")
```
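
The counting logic above could also be factored into a reusable wrapper so that each task body stays free of bookkeeping. This is a sketch using plain threads. `tracked`, `bump`, `stats`, and `run_quietly` are hypothetical names, and with the library one would presumably stack `@multitasking.task` on top of the wrapper.

```python
import functools
import threading

stats_lock = threading.Lock()
stats = {"started": 0, "completed": 0, "failed": 0}


def bump(key):
    with stats_lock:
        stats[key] += 1


def tracked(func):
    """Hypothetical decorator: count starts, successes, and failures."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bump("started")
        try:
            result = func(*args, **kwargs)
        except Exception:
            bump("failed")
            raise  # re-raise so the failure is not hidden
        bump("completed")
        return result
    return wrapper


@tracked
def divide(a, b):
    return a / b


def run_quietly(a, b):
    # Swallow the error at the thread boundary; it is already counted.
    try:
        divide(a, b)
    except ZeroDivisionError:
        pass


threads = [threading.Thread(target=run_quietly, args=(1, b)) for b in (1, 2, 0)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Re-raising inside the wrapper keeps the counter accurate without changing what the caller sees, which is a design choice rather than a requirement.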

### Resource Usage Monitoring

Track system resource usage during task execution:

```python
import multitasking
import psutil
import time

def monitor_resources():
    """Monitor CPU and memory usage during task execution."""
    print(f"CPU: {psutil.cpu_percent():.1f}%, "
          f"Memory: {psutil.virtual_memory().percent:.1f}%, "
          f"Active tasks: {len(multitasking.get_active_tasks())}")

@multitasking.task
def cpu_intensive_task(task_id):
    # Simulate CPU-intensive work
    total = 0
    for i in range(1000000):
        total += i * i
    print(f"Task {task_id} result: {total}")

@multitasking.task
def resource_monitor():
    # The monitor runs as a task too, so it appears in the active list.
    # Stop once it is the only task left; otherwise the loop never ends.
    while len(multitasking.get_active_tasks()) > 1:
        monitor_resources()
        time.sleep(2)

# Start CPU-intensive tasks first so the monitor has work to observe
for i in range(5):
    cpu_intensive_task(i)

# Start the monitor
resource_monitor()

multitasking.wait_for_tasks()
print("All tasks and monitoring complete!")
```