# Utility Classes

Modal provides essential utility classes for error handling and file pattern matching. These utilities support robust error management and flexible file filtering capabilities across Modal applications.

## Capabilities

### Package Version

Package version string for the Modal client library.

```python { .api }
__version__: str  # Current package version (e.g., "1.1.4")
```

### Error - Base Exception Class

Base exception class for all Modal-specific errors, providing a hierarchy of specialized error types for different failure scenarios.

```python { .api }
class Error(Exception):
    """Base class for all Modal errors"""

# Specialized error types
class AlreadyExistsError(Error):
    """Raised when a resource creation conflicts with an existing resource"""

class RemoteError(Error):
    """Raised when an error occurs on the Modal server"""

class TimeoutError(Error):
    """Base class for Modal timeouts"""

class FunctionTimeoutError(TimeoutError):
    """Raised when a Function exceeds its execution duration limit"""

class SandboxTimeoutError(TimeoutError):
    """Raised when a Sandbox exceeds its execution duration limit"""

class VolumeUploadTimeoutError(TimeoutError):
    """Raised when a Volume upload times out"""

class MountUploadTimeoutError(TimeoutError):
    """Raised when a Mount upload times out"""

class InteractiveTimeoutError(TimeoutError):
    """Raised when interactive frontends time out while connecting"""

class OutputExpiredError(TimeoutError):
    """Raised when the Output exceeds expiration"""

class AuthError(Error):
    """Raised when a client has missing or invalid authentication"""

class ConnectionError(Error):
    """Raised when an issue occurs while connecting to Modal servers"""

class InvalidError(Error):
    """Raised when user does something invalid"""

class VersionError(Error):
    """Raised when the current client version is unsupported"""

class NotFoundError(Error):
    """Raised when a requested resource was not found"""

class ExecutionError(Error):
    """Raised when something unexpected happened during runtime"""

class DeserializationError(Error):
    """Raised when an error is encountered during deserialization"""

class SerializationError(Error):
    """Raised when an error is encountered during serialization"""
```

#### Usage Examples

```python
import modal

app = modal.App("error-handling")
@app.function()
def error_prone_function(data: dict):
    """Function demonstrating Modal error handling.

    Handlers are ordered from most to least specific so each error
    category maps to a distinct structured result.
    """
    try:
        # Operation that might fail with different error types
        # (`risky_operation` is a placeholder for application logic).
        result = risky_operation(data)
        return {"success": True, "result": result}

    except modal.FunctionTimeoutError as e:
        print(f"Function timed out: {e}")
        return {"error": "timeout", "message": str(e)}

    except modal.DeserializationError as e:
        print(f"Data deserialization failed: {e}")
        return {"error": "invalid_data", "message": str(e)}

    except modal.RemoteError as e:
        print(f"Server error occurred: {e}")
        return {"error": "server_error", "message": str(e)}

    except modal.Error as e:
        # Catch any Modal-specific error
        print(f"Modal error: {e}")
        return {"error": "modal_error", "type": type(e).__name__, "message": str(e)}

    except Exception as e:
        # Non-Modal errors
        print(f"Unexpected error: {e}")
        return {"error": "unexpected", "message": str(e)}
@app.function()
def resource_creation_function(resource_name: str):
    """Function that handles resource creation errors."""
    try:
        # Attempt to create a new resource. `modal.Volume.persist` is
        # deprecated; `from_name(..., create_if_missing=True)` is the
        # supported way to create a Volume that may not exist yet.
        volume = modal.Volume.from_name(resource_name, create_if_missing=True)
        return {"success": True, "resource": resource_name}

    except modal.AlreadyExistsError:
        # Resource already exists - use existing one
        print(f"Volume {resource_name} already exists, using existing volume")
        volume = modal.Volume.from_name(resource_name)
        return {"success": True, "resource": resource_name, "status": "existing"}

    except modal.AuthError as e:
        print(f"Authentication failed: {e}")
        return {"error": "auth_failed", "message": str(e)}

    except modal.InvalidError as e:
        print(f"Invalid resource name: {e}")
        return {"error": "invalid_name", "message": str(e)}
@app.function()
def upload_with_timeout_handling(local_path: str, remote_path: str):
    """Function that handles upload timeout errors."""
    volume = modal.Volume.from_name("upload-volume")

    try:
        volume.put_file(local_path, remote_path)
        return {"success": True, "uploaded": remote_path}

    except modal.VolumeUploadTimeoutError as e:
        print(f"Volume upload timed out: {e}")
        # Implement retry logic or alternative approach
        return {"error": "upload_timeout", "retry_recommended": True}

    except modal.NotFoundError as e:
        print(f"File not found: {e}")
        return {"error": "file_not_found", "path": local_path}
# Custom error handling with context
def handle_modal_errors(func):
    """Decorator for comprehensive Modal error handling.

    Maps each category of Modal error to a structured ``{"error": ...}``
    dict so callers always receive a result instead of an exception.
    """
    import functools  # local import keeps this snippet self-contained

    @functools.wraps(func)  # preserve the wrapped function's name/docstring
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except modal.TimeoutError as e:
            return {"error": "timeout", "type": type(e).__name__, "message": str(e)}
        except modal.AuthError as e:
            return {"error": "authentication", "message": str(e)}
        except modal.ConnectionError as e:
            return {"error": "connection", "message": str(e)}
        except modal.Error as e:
            return {"error": "modal", "type": type(e).__name__, "message": str(e)}
        except Exception as e:
            return {"error": "unknown", "message": str(e)}

    return wrapper
@app.function()
@handle_modal_errors
def decorated_function(data: str):
    """Function with decorator-based error handling."""
    # `process_data_safely` is a placeholder for application logic.
    return process_data_safely(data)
@app.local_entrypoint()
def main():
    """Exercise each error-handling example against the deployed app."""
    # Test error handling
    result1 = error_prone_function.remote({"test": "data"})
    print("Error handling result:", result1)

    # Test resource creation
    result2 = resource_creation_function.remote("test-volume")
    print("Resource creation result:", result2)

    # Test upload timeout handling
    result3 = upload_with_timeout_handling.remote("local_file.txt", "/remote/path.txt")
    print("Upload result:", result3)
```

### FilePatternMatcher - File Pattern Matching

Pattern matching utility for filtering files based on glob patterns, similar to Docker's pattern matching with support for exclusion and complex patterns.

```python { .api }
class FilePatternMatcher:
    def __init__(self, *patterns: str) -> None:
        """Create a file pattern matcher from glob patterns"""

    def __call__(self, path: Path) -> bool:
        """Check if a path matches any of the patterns"""

    def __invert__(self) -> "FilePatternMatcher":
        """Invert the matcher to exclude matching patterns"""

    def can_prune_directories(self) -> bool:
        """Check if directory pruning is safe for optimization"""
```

#### Usage Examples

```python
import modal
from pathlib import Path

app = modal.App("pattern-matching")
# Basic pattern matching
python_matcher = modal.FilePatternMatcher("*.py")
assert python_matcher(Path("script.py"))
assert not python_matcher(Path("README.md"))

# Multiple patterns
code_matcher = modal.FilePatternMatcher("*.py", "*.js", "*.ts")
assert code_matcher(Path("app.js"))
assert code_matcher(Path("component.ts"))
assert not code_matcher(Path("config.json"))

# Complex patterns with wildcards
deep_matcher = modal.FilePatternMatcher("**/*.py", "src/**/*.js")
assert deep_matcher(Path("deep/nested/script.py"))
assert deep_matcher(Path("src/components/app.js"))

# Inverted patterns (exclusion)
non_python_matcher = ~modal.FilePatternMatcher("*.py")
assert not non_python_matcher(Path("script.py"))
assert non_python_matcher(Path("README.md"))
@app.function()
def filter_files_function(file_paths: list[str]) -> dict:
    """Function that filters files using pattern matching.

    Buckets each path into source/config/doc/other, after dropping
    anything caught by the exclusion matcher.
    """
    # Create different matchers for different file types
    source_files = modal.FilePatternMatcher("*.py", "*.js", "*.ts", "*.java")
    config_files = modal.FilePatternMatcher("*.json", "*.yaml", "*.yml", "*.toml")
    doc_files = modal.FilePatternMatcher("*.md", "*.txt", "*.rst")

    # Exclude certain patterns
    exclude_matcher = modal.FilePatternMatcher(
        "**/__pycache__/**",
        "**/node_modules/**",
        "**/.git/**",
        "*.pyc",
        "*.tmp",
    )

    results = {
        "source_files": [],
        "config_files": [],
        "doc_files": [],
        "other_files": [],
        "excluded_files": [],
    }

    for file_path_str in file_paths:
        file_path = Path(file_path_str)

        # Check if file should be excluded
        if exclude_matcher(file_path):
            results["excluded_files"].append(file_path_str)
            continue

        # Categorize files by type; first matching bucket wins
        if source_files(file_path):
            results["source_files"].append(file_path_str)
        elif config_files(file_path):
            results["config_files"].append(file_path_str)
        elif doc_files(file_path):
            results["doc_files"].append(file_path_str)
        else:
            results["other_files"].append(file_path_str)

    return results
@app.function()
def advanced_pattern_matching(directory_paths: list[str]) -> dict:
    """Advanced pattern matching with multiple criteria.

    Walks each directory recursively and classifies every regular file
    as test / build artifact / hidden / source.
    """
    # Different matchers for different purposes
    test_files = modal.FilePatternMatcher("**/test_*.py", "**/*_test.py", "**/tests/**/*.py")
    build_artifacts = modal.FilePatternMatcher(
        "**/build/**",
        "**/dist/**",
        "**/*.egg-info/**",
    )
    hidden_files = modal.FilePatternMatcher(".*", "**/.*/**")

    # Inverted matchers for inclusion patterns
    include_source = ~modal.FilePatternMatcher(
        "**/__pycache__/**",
        "**/.*/**",
        "**/node_modules/**",
    )

    analysis = {
        "test_files": [],
        "build_artifacts": [],
        "hidden_files": [],
        "source_files": [],
        "total_processed": 0,
    }

    for dir_path_str in directory_paths:
        dir_path = Path(dir_path_str)

        # Walk through directory structure (silently skip missing paths)
        if not (dir_path.exists() and dir_path.is_dir()):
            continue
        for file_path in dir_path.rglob("*"):
            if not file_path.is_file():
                continue
            analysis["total_processed"] += 1
            relative_path = file_path.relative_to(dir_path)

            # Categorize files; first matching category wins
            if test_files(relative_path):
                analysis["test_files"].append(str(relative_path))
            elif build_artifacts(relative_path):
                analysis["build_artifacts"].append(str(relative_path))
            elif hidden_files(relative_path):
                analysis["hidden_files"].append(str(relative_path))
            elif include_source(relative_path):
                analysis["source_files"].append(str(relative_path))

    return analysis
@app.function()
def custom_file_processing():
    """Demonstrate custom pattern matching logic"""
    # FilePatternMatcher supports inversion (~) but does not define `&`,
    # so combine matchers with an explicit predicate instead.
    python_files = modal.FilePatternMatcher("*.py")
    test_patterns = modal.FilePatternMatcher("test_*.py", "*_test.py")

    def python_no_tests(path: Path) -> bool:
        # Matcher for Python files, excluding tests
        return python_files(path) and not test_patterns(path)

    # Process files with custom logic
    files_to_process = [
        "app.py",
        "test_app.py",
        "utils.py",
        "config_test.py",
        "main.py",
    ]

    results = []
    for filename in files_to_process:
        file_path = Path(filename)

        if python_no_tests(file_path):
            # Custom processing for source files
            result = {
                "file": filename,
                "type": "source",
                "processed": True,
                "action": "analyze_code",
            }
        else:
            result = {
                "file": filename,
                "type": "test" if "test" in filename else "other",
                "processed": False,
                "action": "skip",
            }

        results.append(result)

    return results
# Utility function for complex pattern combinations
def create_project_filter():
    """Create a comprehensive project file filter.

    Returns a predicate that accepts a ``Path`` and is true for source
    files that are not build artifacts or temporary files.
    """
    # Include source files
    include_patterns = modal.FilePatternMatcher(
        "**/*.py",
        "**/*.js",
        "**/*.ts",
        "**/*.json",
        "**/*.yaml",
        "**/*.yml",
        "**/*.md",
    )

    # Exclude build artifacts and temporary files
    exclude_patterns = modal.FilePatternMatcher(
        "**/__pycache__/**",
        "**/node_modules/**",
        "**/.git/**",
        "**/build/**",
        "**/dist/**",
        "**/*.pyc",
        "**/*.pyo",
        "**/*.egg-info/**",
        "**/coverage/**",
        "**/.pytest_cache/**",
    )

    # Combine patterns: include source files but exclude artifacts
    return lambda path: include_patterns(path) and not exclude_patterns(path)
@app.local_entrypoint()
def main():
    """Run the pattern-matching examples end to end."""
    # Test file filtering
    sample_files = [
        "src/app.py",
        "src/utils.py",
        "tests/test_app.py",
        "config.json",
        "README.md",
        "build/output.js",
        "__pycache__/app.cpython-39.pyc",
        "node_modules/package/index.js",
    ]

    filter_result = filter_files_function.remote(sample_files)
    print("File filtering result:", filter_result)

    # Test advanced pattern matching
    directories = ["./src", "./tests", "./config"]
    advanced_result = advanced_pattern_matching.remote(directories)
    print("Advanced pattern matching:", advanced_result)

    # Test custom processing
    custom_result = custom_file_processing.remote()
    print("Custom processing result:", custom_result)
```

## Advanced Utility Patterns

### Comprehensive Error Recovery System

```python
import random
import time

import modal

app = modal.App("error-recovery")
class RetryableError(modal.Error):
    """Custom error type for retryable operations"""
def with_error_recovery(max_retries: int = 3, base_delay: float = 1.0):
    """Decorator factory for automatic error recovery.

    Retries the wrapped callable on Modal timeout/connection errors and
    ``RetryableError``, with a different backoff strategy per category.
    Non-retryable Modal errors and unknown exceptions propagate immediately.

    Args:
        max_retries: Number of retries after the initial attempt.
        base_delay: Base delay in seconds used by the backoff formulas.
    """
    import functools  # local import keeps this snippet self-contained

    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        def wrapper(*args, **kwargs):
            last_error = None

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)

                except modal.TimeoutError as e:
                    last_error = e
                    if attempt < max_retries:
                        delay = base_delay * (2 ** attempt)  # Exponential backoff
                        print(f"Timeout on attempt {attempt + 1}, retrying in {delay}s")
                        time.sleep(delay)
                    else:
                        print(f"All {max_retries + 1} attempts failed due to timeout")

                except modal.ConnectionError as e:
                    last_error = e
                    if attempt < max_retries:
                        delay = base_delay * (1.5 ** attempt)
                        print(f"Connection error on attempt {attempt + 1}, retrying in {delay}s")
                        time.sleep(delay)
                    else:
                        print(f"All {max_retries + 1} attempts failed due to connection issues")

                except RetryableError as e:
                    last_error = e
                    if attempt < max_retries:
                        delay = base_delay + random.uniform(0, 1)  # Jittered delay
                        print(f"Retryable error on attempt {attempt + 1}, retrying in {delay:.2f}s")
                        time.sleep(delay)
                    else:
                        print(f"All {max_retries + 1} attempts failed due to retryable errors")

                except modal.Error as e:
                    # Non-retryable Modal errors
                    print(f"Non-retryable Modal error: {type(e).__name__}: {e}")
                    raise

                except Exception as e:
                    # Unknown errors
                    print(f"Unknown error: {type(e).__name__}: {e}")
                    raise

            # All retries exhausted
            raise last_error

        return wrapper

    return decorator
@app.function()
@with_error_recovery(max_retries=5, base_delay=2.0)
def resilient_operation(data: dict):
    """Operation with comprehensive error recovery.

    ``data["failure_mode"]`` selects a simulated failure so each retry
    path of the decorator can be exercised.
    """
    failure_mode = data.get("failure_mode", "none")

    if failure_mode == "timeout":
        raise modal.FunctionTimeoutError("Simulated timeout")
    elif failure_mode == "connection":
        raise modal.ConnectionError("Simulated connection error")
    elif failure_mode == "retryable":
        raise RetryableError("Simulated retryable error")
    elif failure_mode == "auth":
        raise modal.AuthError("Simulated auth error")  # Non-retryable

    # Success case
    return {"status": "success", "data": data}
@app.local_entrypoint()
def main():
    """Drive resilient_operation through each simulated failure mode."""
    # Test different error scenarios
    test_cases = [
        {"data": {"value": 1}, "expected": "success"},
        {"data": {"value": 2, "failure_mode": "timeout"}, "expected": "retry_success"},
        {"data": {"value": 3, "failure_mode": "connection"}, "expected": "retry_success"},
        {"data": {"value": 4, "failure_mode": "retryable"}, "expected": "retry_success"},
        {"data": {"value": 5, "failure_mode": "auth"}, "expected": "immediate_failure"},
    ]

    for i, test_case in enumerate(test_cases):
        print(f"\nTest case {i + 1}: {test_case['expected']}")
        try:
            result = resilient_operation.remote(test_case["data"])
            print(f"Result: {result}")
        except Exception as e:
            print(f"Final error: {type(e).__name__}: {e}")
```

### Intelligent File Processing Pipeline

```python
import json
import time  # used by batch_file_analysis below; was missing from this example
from pathlib import Path

import modal

app = modal.App("file-processing-pipeline")
class FileProcessor:
    """Advanced file processing with pattern matching.

    Routes each file to a processor based on glob patterns; lower
    priority numbers win when several matchers match the same file.
    """

    def __init__(self):
        # Define processing rules based on file patterns
        self.processors = {
            "python": {
                "matcher": modal.FilePatternMatcher("*.py"),
                "action": self.process_python_file,
                "priority": 1,
            },
            "javascript": {
                "matcher": modal.FilePatternMatcher("*.js", "*.ts"),
                "action": self.process_js_file,
                "priority": 1,
            },
            "config": {
                "matcher": modal.FilePatternMatcher("*.json", "*.yaml", "*.yml"),
                "action": self.process_config_file,
                "priority": 2,
            },
            "documentation": {
                "matcher": modal.FilePatternMatcher("*.md", "*.rst", "*.txt"),
                "action": self.process_doc_file,
                "priority": 3,
            },
            "tests": {
                "matcher": modal.FilePatternMatcher("**/test_*.py", "**/*_test.py", "**/tests/**/*.py"),
                "action": self.process_test_file,
                "priority": 0,  # Highest priority
            },
        }

        # Files to exclude from processing
        self.exclude_matcher = modal.FilePatternMatcher(
            "**/__pycache__/**",
            "**/node_modules/**",
            "**/.git/**",
            "**/venv/**",
            "**/*.pyc",
        )

    def process_python_file(self, file_path: Path) -> dict:
        """Process Python source files"""
        return {
            "type": "python",
            "analysis": "syntax_check_passed",
            "metrics": {"lines": 100, "functions": 5},
            "issues": [],
        }

    def process_js_file(self, file_path: Path) -> dict:
        """Process JavaScript/TypeScript files"""
        return {
            "type": "javascript",
            "analysis": "linting_passed",
            "metrics": {"lines": 75, "functions": 3},
            "issues": [],
        }

    def process_config_file(self, file_path: Path) -> dict:
        """Process configuration files"""
        return {
            "type": "config",
            "analysis": "valid_format",
            "validation": "schema_compliant",
            "issues": [],
        }

    def process_doc_file(self, file_path: Path) -> dict:
        """Process documentation files"""
        return {
            "type": "documentation",
            "analysis": "spell_check_passed",
            "metrics": {"word_count": 500},
            "issues": [],
        }

    def process_test_file(self, file_path: Path) -> dict:
        """Process test files"""
        return {
            "type": "test",
            "analysis": "test_discovery_complete",
            "metrics": {"test_count": 10, "coverage": 85.5},
            "issues": [],
        }

    def classify_file(self, file_path: Path) -> tuple[str, dict]:
        """Classify file and return appropriate processor"""
        # Check exclusions first
        if self.exclude_matcher(file_path):
            return "excluded", {}

        # Collect matching processors
        matches = [
            (processor["priority"], name, processor)
            for name, processor in self.processors.items()
            if processor["matcher"](file_path)
        ]

        if matches:
            # Pick the best match (lower number = higher priority)
            _, name, processor = min(matches, key=lambda m: m[0])
            return name, processor

        return "unknown", {}
@app.function()
def intelligent_file_processing(file_paths: list[str]) -> dict:
    """Process files using intelligent pattern matching.

    Classifies every path with ``FileProcessor`` and runs the matched
    processor's action; failures are recorded per file rather than
    aborting the batch.
    """
    processor = FileProcessor()
    results = {
        "processed": {},
        "excluded": [],
        "unknown": [],
        "errors": [],
        "statistics": {
            "total_files": len(file_paths),
            "processed_count": 0,
            "excluded_count": 0,
            "unknown_count": 0,
            "error_count": 0,
        },
    }

    for file_path_str in file_paths:
        try:
            file_path = Path(file_path_str)
            file_type, processor_info = processor.classify_file(file_path)

            if file_type == "excluded":
                results["excluded"].append(file_path_str)
                results["statistics"]["excluded_count"] += 1

            elif file_type == "unknown":
                results["unknown"].append(file_path_str)
                results["statistics"]["unknown_count"] += 1

            else:
                # Run the matched processor's action on the file
                process_result = processor_info["action"](file_path)
                results["processed"][file_path_str] = {
                    "file_type": file_type,
                    "priority": processor_info["priority"],
                    "result": process_result,
                }
                results["statistics"]["processed_count"] += 1

        except Exception as e:
            # Record per-file failures without stopping the batch
            error_info = {
                "file": file_path_str,
                "error": str(e),
                "error_type": type(e).__name__,
            }
            results["errors"].append(error_info)
            results["statistics"]["error_count"] += 1

    return results
@app.function()
def batch_file_analysis(directory_paths: list[str]) -> dict:
    """Analyze entire directories with pattern-based filtering."""
    import time  # fix: `time` was used here but never imported in this example

    processor = FileProcessor()
    analysis = {
        "directories": {},
        "summary": {
            "total_directories": len(directory_paths),
            "total_files": 0,
            "file_type_counts": {},
            "processing_time": 0,
        },
    }

    start_time = time.time()

    for dir_path_str in directory_paths:
        dir_path = Path(dir_path_str)
        dir_analysis = {
            "files": [],
            "file_counts": {},
            "excluded_files": [],
        }

        if dir_path.exists() and dir_path.is_dir():
            for file_path in dir_path.rglob("*"):
                if not file_path.is_file():
                    continue
                analysis["summary"]["total_files"] += 1
                relative_path = file_path.relative_to(dir_path)

                file_type, processor_info = processor.classify_file(relative_path)

                if file_type == "excluded":
                    dir_analysis["excluded_files"].append(str(relative_path))
                else:
                    dir_analysis["files"].append({
                        "path": str(relative_path),
                        "type": file_type,
                        "priority": processor_info.get("priority", 999),
                    })

                    # Update counts (excluded files are not counted)
                    dir_analysis["file_counts"][file_type] = dir_analysis["file_counts"].get(file_type, 0) + 1
                    analysis["summary"]["file_type_counts"][file_type] = analysis["summary"]["file_type_counts"].get(file_type, 0) + 1

        analysis["directories"][dir_path_str] = dir_analysis

    analysis["summary"]["processing_time"] = time.time() - start_time
    return analysis
@app.local_entrypoint()
def main():
    """Run the intelligent processing and batch analysis examples."""
    # Test intelligent file processing
    sample_files = [
        "src/main.py",
        "src/utils.py",
        "tests/test_main.py",
        "config/settings.json",
        "docs/README.md",
        "package.json",
        "node_modules/lib/index.js",  # Should be excluded
        "__pycache__/main.cpython-39.pyc",  # Should be excluded
        "data/sample.csv",  # Unknown type
        "Dockerfile",  # Unknown type
    ]

    processing_result = intelligent_file_processing.remote(sample_files)
    print("Intelligent processing result:")
    print(json.dumps(processing_result, indent=2))

    # Test batch directory analysis
    directories = ["./src", "./tests", "./config"]
    analysis_result = batch_file_analysis.remote(directories)
    print("\nBatch analysis result:")
    print(json.dumps(analysis_result, indent=2))
```