# Asynchronous Processing
Pylama provides asynchronous file checking built on multiprocessing: files are analyzed in parallel, which significantly improves performance on large codebases with many files.
## Capabilities
### Parallel File Checking
Process multiple files concurrently using a process pool for maximum performance.
```python { .api }
def check_async(
    paths: List[str],
    code: str = None,
    options: Namespace = None,
    rootdir: Path = None
) -> List[Error]:
    """
    Check files asynchronously using a process pool.

    Args:
        paths: List of file paths to check
        code: Source code string (if checking a single file with custom code)
        options: Configuration options containing concurrency settings
        rootdir: Root directory for path resolution

    Returns:
        List[Error]: All errors found across all files

    Performance considerations:
    - Uses ProcessPoolExecutor for true parallelism
    - Automatically determines the optimal worker count based on CPU cores
    - Distributes files across workers for load balancing
    - Aggregates results from all workers
    - Significantly faster than sequential checking for multiple files
    """
```
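The distribute-and-aggregate pattern described above can be sketched with plain Python. Note that `chunk_paths` and `flatten` are illustrative helpers written for this sketch, not pylama APIs:

```python
def chunk_paths(paths, workers):
    """Distribute paths across workers round-robin for load balancing."""
    chunks = [[] for _ in range(workers)]
    for i, path in enumerate(paths):
        chunks[i % workers].append(path)
    return [chunk for chunk in chunks if chunk]  # drop empty chunks

def flatten(results):
    """Aggregate per-worker error lists into one flat list."""
    errors = []
    for worker_errors in results:
        errors.extend(worker_errors)
    return errors

chunks = chunk_paths(['a.py', 'b.py', 'c.py', 'd.py', 'e.py'], workers=2)
# chunks == [['a.py', 'c.py', 'e.py'], ['b.py', 'd.py']]
```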
### Worker Function
The worker function that processes a single file in a separate process.
```python { .api }
def worker(params):
    """
    Worker function for parallel file processing.

    Args:
        params: Tuple containing (path, code, options, rootdir)

    Returns:
        List[Error]: Errors found in the processed file

    This function runs in a separate process and:
    - Receives serialized parameters
    - Imports pylama.core.run in the worker process
    - Processes a single file
    - Returns serialized results
    """
```
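The worker's contract — a `(path, code, options, rootdir)` tuple in, a list of findings out — can be illustrated without pylama at all. The `toy_worker` below is a stand-in for demonstration, not the real implementation:

```python
def toy_worker(params):
    """Stand-in for the worker contract: unpack the serialized
    parameter tuple, check the code, return a list of findings."""
    path, code, options, rootdir = params
    findings = []
    # A trivial "lint" check standing in for a real linter run
    if code is not None and "\t" in code:
        findings.append((path, "W191 indentation contains tabs"))
    return findings

results = toy_worker(('example.py', "def f():\n\tpass\n", None, None))
```

Because the tuple and the returned list are both picklable, the same shape works when the function is dispatched to a process pool.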
## Configuration
### CPU Detection
Automatic detection of available CPU cores for optimal performance.
```python { .api }
CPU_COUNT: int
"""
Number of available CPU cores for parallel processing.

Automatically detected using multiprocessing.cpu_count().
Falls back to 1 if multiprocessing is not available or fails.
Used to determine the optimal worker pool size.
"""
```
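The detect-with-fallback behavior described above amounts to a guarded call; the exact exception handling here is an assumption about how the guard is written, but the detected-or-1 result matches the documented behavior:

```python
try:
    import multiprocessing
    # Detect the number of CPU cores available for the worker pool
    CPU_COUNT = multiprocessing.cpu_count()
except (ImportError, NotImplementedError):
    # multiprocessing unavailable or detection failed: run one worker
    CPU_COUNT = 1
```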
### Enabling Async Processing
Async processing can be enabled through configuration:
- **Command line**: `--async` or `--concurrent` flags
- **Configuration file**: `async = 1` or `concurrent = 1`
- **Programmatic**: Set `options.concurrent = True`
## Usage Examples
### Basic Async Usage
```python
from pylama.main import check_paths
from pylama.config import parse_options

# Enable async processing via command line options
options = parse_options(['--async', 'src/', 'tests/'])
errors = check_paths(None, options)  # Uses async processing

print(f"Found {len(errors)} issues across all files")
```
### Programmatic Async Control
```python
from pathlib import Path

from pylama.check_async import check_async
from pylama.config import parse_options

# Get list of Python files to check
files = [
    'src/module1.py',
    'src/module2.py',
    'src/package/__init__.py',
    'src/package/core.py',
    'tests/test_module1.py',
    'tests/test_module2.py',
]

# Configure options
options = parse_options(['--linters=pycodestyle,pyflakes'])

# Run async checking
errors = check_async(
    paths=files,
    options=options,
    rootdir=Path.cwd(),
)

# Process results
for error in errors:
    print(f"{error.filename}:{error.lnum} - {error.message}")
```
### Performance Comparison
```python
import time

from pylama.main import check_paths
from pylama.config import parse_options

# Large list of files
files = [f'src/module{i}.py' for i in range(100)]

# Sequential processing
start_time = time.time()
options_seq = parse_options(['--linters=pycodestyle,pyflakes'])
options_seq.concurrent = False
errors_seq = check_paths(files, options_seq)
seq_time = time.time() - start_time

# Async processing
start_time = time.time()
options_async = parse_options(['--async', '--linters=pycodestyle,pyflakes'])
errors_async = check_paths(files, options_async)
async_time = time.time() - start_time

print(f"Sequential: {seq_time:.2f}s, Async: {async_time:.2f}s")
print(f"Speedup: {seq_time / async_time:.1f}x")
```
### Custom Worker Pool Size
```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path

from pylama.check_async import worker

def custom_async_check(files, options, max_workers=None):
    """Custom async checking with a configurable worker count."""
    if max_workers is None:
        max_workers = multiprocessing.cpu_count()

    # Prepare parameters for workers
    params_list = [
        (file_path, None, options, Path.cwd())
        for file_path in files
    ]

    # Run with a custom worker pool
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(worker, params_list))

    # Flatten per-file error lists into a single result
    all_errors = []
    for error_list in results:
        all_errors.extend(error_list)

    return all_errors
```
### Error Handling in Async Mode
```python
from pylama.check_async import check_async
from pylama.config import parse_options
from pylama.main import check_paths

# Define inputs before the try block so the fallback can reuse them
files = ['src/valid.py', 'src/invalid_syntax.py']
options = parse_options(['--linters=pyflakes'])

try:
    errors = check_async(files, options=options)

    # Separate syntax errors from style issues
    syntax_errors = [e for e in errors if 'SyntaxError' in e.message]
    style_errors = [e for e in errors if 'SyntaxError' not in e.message]

    print(f"Syntax errors: {len(syntax_errors)}")
    print(f"Style issues: {len(style_errors)}")

except Exception as e:
    print(f"Async processing failed: {e}")
    # Fall back to sequential processing
    options.concurrent = False
    errors = check_paths(files, options)
```
### Monitoring Progress
```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

from pylama.check_async import worker

def check_with_progress(files, options):
    """Async checking with progress monitoring."""
    params_list = [
        (file_path, None, options, Path.cwd())
        for file_path in files
    ]

    all_errors = []

    with ProcessPoolExecutor() as executor:
        # Submit all tasks
        future_to_file = {
            executor.submit(worker, params): params[0]
            for params in params_list
        }

        # Process tasks as they complete
        completed = 0
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                errors = future.result()
                all_errors.extend(errors)
                completed += 1
                print(f"Processed {completed}/{len(files)}: {file_path}")
            except Exception as e:
                print(f"Error processing {file_path}: {e}")

    return all_errors
```
### Integration with Configuration
```python
from pylama.config import parse_options
from pylama.main import check_paths

# Configuration file with async settings
config_content = """
[pylama]
async = 1
linters = pycodestyle,pyflakes,mccabe
paths = src/,tests/
"""

# Write config file
with open('pylama.ini', 'w') as f:
    f.write(config_content)

# Load configuration (async will be enabled automatically)
options = parse_options([])
print(f"Async enabled: {options.concurrent}")

# Check files (will use async processing)
errors = check_paths(None, options)
```
### Memory Considerations
```python
import os

import psutil

from pylama.check_async import check_async

def check_with_memory_monitoring(files, options):
    """Monitor memory usage during async processing."""
    process = psutil.Process(os.getpid())
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB

    print(f"Initial memory usage: {initial_memory:.1f} MB")

    # Run async checking
    errors = check_async(files, options=options)

    final_memory = process.memory_info().rss / 1024 / 1024  # MB
    print(f"Final memory usage: {final_memory:.1f} MB")
    print(f"Memory increase: {final_memory - initial_memory:.1f} MB")

    return errors
```
## Performance Guidelines
### When to Use Async Processing
**Recommended for:**
- Projects with 50+ Python files
- CI/CD pipelines with time constraints
- Large codebases (>10,000 lines of code)
- Multiple linters enabled simultaneously
**Not recommended for:**
- Single file checking
- Very small projects (<10 files)
- Memory-constrained environments
- Systems with limited CPU cores
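The guidelines above can be folded into a small decision helper. The thresholds and the `should_use_async` function are illustrative choices, not part of pylama:

```python
import multiprocessing

def should_use_async(file_count, min_files=10, min_cores=2):
    """Heuristic: enable concurrency only when there is enough work
    and enough hardware to amortize the process-pool startup cost."""
    cores = multiprocessing.cpu_count()
    return file_count >= min_files and cores >= min_cores

# Example: a 100-file project is worth parallelizing on a multi-core machine
enable = should_use_async(100)
```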
### Optimization Tips
```python
from pylama.config import parse_options

# A configuration tuned for async processing
recommended_options = [
    '--async',                         # Enable async processing
    '--linters=pycodestyle,pyflakes',  # Use fast linters
    '--ignore=E501',                   # Ignore non-critical issues
    '--skip=migrations/*,build/*',     # Skip non-essential directories
]

options = parse_options(recommended_options)
```
### Troubleshooting Async Issues
```python
import multiprocessing
from pathlib import Path

from pylama.check_async import CPU_COUNT, worker
from pylama.config import parse_options

print(f"Detected CPU cores: {CPU_COUNT}")
print(f"Multiprocessing available: {multiprocessing.cpu_count()}")

# Test the worker function on a small in-memory snippet
test_params = ('test_file.py', 'print("test")', parse_options([]), Path.cwd())
try:
    result = worker(test_params)
    print(f"Worker test successful: {len(result)} errors")
except Exception as e:
    print(f"Worker test failed: {e}")
```