# Logging and Utilities

Logging configuration, utility functions, and helper methods for string manipulation, data processing, and common operations across the Prowler ecosystem. This module provides centralized logging management, debugging support, and essential utility functions for security assessment workflows.

## Capabilities

### Logging Configuration

Comprehensive logging system with configurable levels, file output, and formatting options.

```python { .api }
def set_logging_config(
    log_level: str,
    log_file: str = None,
    only_logs: bool = False
):
    """
    Configure logging for Prowler operations.

    Sets up the global logging configuration with the specified level,
    optional file output, and formatting preferences for consistent
    logging across all Prowler components.

    Parameters:
    - log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    - log_file: Optional path to a log file for persistent logging
    - only_logs: Whether to suppress banner and progress output (quiet mode)

    Returns:
        None (configures the global logger)

    Raises:
        ProwlerException: On an invalid log level or file access errors
    """

logger: Logger
"""
Global logger instance for Prowler operations.

Pre-configured logger object used throughout Prowler for consistent
logging behavior across all modules and components.
"""

logging_levels: Dict[str, int] = {
    "DEBUG": 10,
    "INFO": 20,
    "WARNING": 30,
    "ERROR": 40,
    "CRITICAL": 50
}
"""
Dictionary mapping log level names to logging constants.

Provides a standardized mapping between string level names and
Python logging module constants for configuration purposes.
"""
```
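As an illustration only, a configuration helper with this shape could be sketched on top of the standard `logging` module. The format string, handler choices, and use of `basicConfig(force=True)` here are assumptions for the sketch, not Prowler's actual implementation, and the banner-suppression side of `only_logs` is omitted:

```python
import logging
import sys


def set_logging_config(log_level: str, log_file: str = None, only_logs: bool = False):
    """Minimal sketch of a global logging configurator (illustrative, not Prowler's code)."""
    handlers = [logging.StreamHandler(sys.stdout)]
    if log_file:
        # Persist logs to a file in addition to stdout
        handlers.append(logging.FileHandler(log_file))
    logging.basicConfig(
        level=getattr(logging, log_level.upper()),
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=handlers,
        force=True,  # reconfigure even if basicConfig already ran (Python 3.8+)
    )
```

With `force=True`, repeated calls reconfigure the root logger, which mirrors the documented behavior of switching levels or adding a log file at runtime.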

### String Utilities

Helper functions for string manipulation and formatting commonly used in security assessments.

```python { .api }
def strip_ansi_codes(text: str) -> str:
    """
    Remove ANSI color codes from text strings.

    Strips ANSI escape sequences used for terminal coloring
    to produce clean text suitable for file output or processing.

    Parameters:
    - text: Input string potentially containing ANSI codes

    Returns:
        Clean string with ANSI codes removed
    """

def format_json_output(data: dict, indent: int = 2) -> str:
    """
    Format dictionary data as pretty-printed JSON.

    Converts Python dictionaries to formatted JSON strings
    with proper indentation and sorting for readable output.

    Parameters:
    - data: Dictionary to convert to JSON
    - indent: Number of spaces for indentation (default: 2)

    Returns:
        Formatted JSON string

    Raises:
        TypeError: On non-serializable data types
    """

def normalize_resource_name(name: str) -> str:
    """
    Normalize resource names for consistent identification.

    Standardizes resource name formatting by removing special
    characters, normalizing case, and applying consistent patterns.

    Parameters:
    - name: Raw resource name string

    Returns:
        Normalized resource name suitable for identification
    """

def generate_finding_uid(
    check_id: str,
    resource_uid: str,
    region: str = "",
    account_id: str = ""
) -> str:
    """
    Generate unique identifier for security findings.

    Creates deterministic unique identifiers for findings based on
    check ID, resource identifier, and optional context information.

    Parameters:
    - check_id: Security check identifier
    - resource_uid: Resource unique identifier
    - region: Optional region information
    - account_id: Optional account identifier

    Returns:
        Unique finding identifier string
    """
```
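To make the contracts above concrete, here is a minimal sketch of two of these helpers. The SGR-only regex and the SHA-256-over-joined-fields UID scheme are assumptions chosen for the sketch; Prowler's actual implementations may differ:

```python
import hashlib
import re

# Matches SGR (color/style) escape sequences such as "\x1b[31m" or "\x1b[0m".
# A broader pattern would be needed for cursor-movement CSI sequences.
ANSI_SGR_PATTERN = re.compile(r"\x1b\[[0-9;]*m")


def strip_ansi_codes(text: str) -> str:
    """Remove ANSI color codes from a string (sketch)."""
    return ANSI_SGR_PATTERN.sub("", text)


def generate_finding_uid(
    check_id: str,
    resource_uid: str,
    region: str = "",
    account_id: str = "",
) -> str:
    """Deterministic finding UID: SHA-256 of the joined components (illustrative scheme)."""
    raw = "|".join([check_id, resource_uid, region, account_id])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```

Because the UID is a pure function of its inputs, re-running a check against the same resource reproduces the same identifier, which is what makes findings deduplicable across scans.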

### Data Processing Utilities

Functions for processing and manipulating data structures commonly used in security assessments.

```python { .api }
def flatten_dict(
    nested_dict: dict,
    separator: str = ".",
    prefix: str = ""
) -> dict:
    """
    Flatten nested dictionary structures.

    Converts nested dictionaries to flat structures with
    concatenated keys for easier processing and analysis.

    Parameters:
    - nested_dict: Dictionary with nested structures
    - separator: Character to use for key concatenation (default: ".")
    - prefix: Optional prefix for all keys

    Returns:
        Flattened dictionary with concatenated keys
    """

def merge_dicts(*dicts: dict) -> dict:
    """
    Merge multiple dictionaries with conflict resolution.

    Combines multiple dictionaries into a single dictionary,
    handling key conflicts by preferring later values.

    Parameters:
    - *dicts: Variable number of dictionaries to merge

    Returns:
        Merged dictionary containing all key-value pairs
    """

def filter_empty_values(data: dict) -> dict:
    """
    Remove empty values from dictionary structures.

    Recursively removes None, empty strings, empty lists,
    and empty dictionaries from nested data structures.

    Parameters:
    - data: Dictionary to filter

    Returns:
        Dictionary with empty values removed
    """

def chunk_list(input_list: List[Any], chunk_size: int) -> List[List[Any]]:
    """
    Split list into smaller chunks of specified size.

    Divides large lists into smaller manageable chunks for
    batch processing and parallel execution scenarios.

    Parameters:
    - input_list: List to split into chunks
    - chunk_size: Maximum size of each chunk

    Returns:
        List of lists containing chunked data
    """
```
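Two of these operations are compact enough to sketch directly from their docstrings. This is an illustrative implementation, not Prowler's source:

```python
from typing import Any, List


def flatten_dict(nested_dict: dict, separator: str = ".", prefix: str = "") -> dict:
    """Flatten nested dicts into dotted keys, e.g. {"a": {"b": 1}} -> {"a.b": 1}."""
    flat = {}
    for key, value in nested_dict.items():
        full_key = f"{prefix}{separator}{key}" if prefix else key
        if isinstance(value, dict) and value:
            # Recurse into non-empty nested dicts, carrying the accumulated key
            flat.update(flatten_dict(value, separator, full_key))
        else:
            flat[full_key] = value
    return flat


def chunk_list(input_list: List[Any], chunk_size: int) -> List[List[Any]]:
    """Split a list into consecutive chunks of at most chunk_size items."""
    return [
        input_list[i : i + chunk_size]
        for i in range(0, len(input_list), chunk_size)
    ]
```

Note that the last chunk may be shorter than `chunk_size`, which callers doing batch processing should expect.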

### File and Path Utilities

Helper functions for file system operations and path management.

```python { .api }
def ensure_directory_exists(directory_path: str) -> str:
    """
    Ensure directory exists, creating it if necessary.

    Creates the directory structure if it doesn't exist,
    handling permissions and parent directory creation.

    Parameters:
    - directory_path: Path to directory

    Returns:
        Absolute path to the directory

    Raises:
        ProwlerException: On directory creation errors or permission issues
    """

def get_file_extension(filename: str) -> str:
    """
    Extract file extension from filename.

    Returns the file extension (without dot) from a filename
    or path, handling edge cases and special characters.

    Parameters:
    - filename: Filename or path string

    Returns:
        File extension string (without leading dot)
    """

def sanitize_filename(filename: str) -> str:
    """
    Sanitize filename for cross-platform compatibility.

    Removes or replaces characters that are invalid in filenames
    across different operating systems and file systems.

    Parameters:
    - filename: Raw filename string

    Returns:
        Sanitized filename safe for file system operations
    """

def calculate_file_hash(file_path: str, algorithm: str = "sha256") -> str:
    """
    Calculate cryptographic hash of file contents.

    Computes hash values for files using the specified algorithm
    for integrity verification and duplicate detection.

    Parameters:
    - file_path: Path to file for hashing
    - algorithm: Hash algorithm (md5, sha1, sha256, sha512)

    Returns:
        Hexadecimal hash string

    Raises:
        ProwlerException: On file access errors or invalid algorithm
    """
```
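A minimal sketch of the directory and hashing helpers using only the standard library. Error handling is simplified here (exceptions propagate as-is rather than being wrapped in `ProwlerException`):

```python
import hashlib
import os


def ensure_directory_exists(directory_path: str) -> str:
    """Create the directory (and parents) if missing; return its absolute path."""
    os.makedirs(directory_path, exist_ok=True)
    return os.path.abspath(directory_path)


def calculate_file_hash(file_path: str, algorithm: str = "sha256") -> str:
    """Hash a file in fixed-size chunks so large files don't load into memory."""
    hasher = hashlib.new(algorithm)  # raises ValueError on an unknown algorithm
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()
```

Reading in 64 KiB chunks keeps memory use constant regardless of file size, which matters when hashing large report artifacts.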

### Time and Date Utilities

Functions for handling timestamps, date formatting, and time zone operations.

```python { .api }
def get_iso_timestamp() -> str:
    """
    Get current timestamp in ISO 8601 format.

    Returns the current UTC timestamp in ISO 8601 format,
    suitable for standardized logging and finding metadata.

    Returns:
        ISO 8601 formatted timestamp string (UTC)
    """

def parse_timestamp(timestamp_str: str) -> datetime:
    """
    Parse timestamp string to datetime object.

    Handles multiple timestamp formats commonly found in
    cloud APIs and log files, with automatic format detection.

    Parameters:
    - timestamp_str: Timestamp string in various formats

    Returns:
        Python datetime object

    Raises:
        ValueError: On unparseable timestamp formats
    """

def format_duration(seconds: float) -> str:
    """
    Format duration in seconds to human-readable string.

    Converts numeric duration values to human-readable format
    with appropriate units (seconds, minutes, hours).

    Parameters:
    - seconds: Duration in seconds (float for sub-second precision)

    Returns:
        Human-readable duration string (e.g., "2m 30s", "1h 15m")
    """
```
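The timestamp and duration helpers can be sketched as follows. The exact output formats (ISO with offset, truncation of sub-minute detail at the hour scale) are assumptions consistent with the docstring examples, not guaranteed to match Prowler's output byte-for-byte:

```python
from datetime import datetime, timezone


def get_iso_timestamp() -> str:
    """Current UTC time in ISO 8601, e.g. '2024-01-01T12:00:00+00:00'."""
    return datetime.now(timezone.utc).isoformat()


def format_duration(seconds: float) -> str:
    """Render a duration as 's', 'Xm Ys', or 'Xh Ym' depending on magnitude."""
    if seconds < 60:
        return f"{seconds:.0f}s"
    minutes, secs = divmod(int(seconds), 60)
    if minutes < 60:
        return f"{minutes}m {secs}s"
    hours, minutes = divmod(minutes, 60)
    return f"{hours}h {minutes}m"
```

Using a timezone-aware `datetime.now(timezone.utc)` rather than the deprecated `datetime.utcnow()` keeps the offset explicit in the emitted string.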

### Validation Utilities

Functions for validating data formats, URLs, and security-related patterns.

```python { .api }
def validate_email_address(email: str) -> bool:
    """
    Validate email address format.

    Checks whether a string matches valid email address patterns
    using regex validation suitable for security contexts.

    Parameters:
    - email: Email address string to validate

    Returns:
        True if valid email format, False otherwise
    """

def validate_url(url: str) -> bool:
    """
    Validate URL format and accessibility.

    Checks URL format and optionally verifies accessibility
    for security-related URL validation scenarios.

    Parameters:
    - url: URL string to validate

    Returns:
        True if valid URL format, False otherwise
    """

def validate_ip_address(ip: str) -> bool:
    """
    Validate IP address format (IPv4 and IPv6).

    Checks whether a string represents a valid IPv4 or IPv6 address
    using standard validation patterns.

    Parameters:
    - ip: IP address string to validate

    Returns:
        True if valid IP address, False otherwise
    """

def validate_arn(arn: str) -> bool:
    """
    Validate AWS ARN (Amazon Resource Name) format.

    Checks whether a string matches the valid AWS ARN pattern with
    proper partition, service, region, and resource components.

    Parameters:
    - arn: ARN string to validate

    Returns:
        True if valid ARN format, False otherwise
    """
```
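Two of these validators have natural standard-library sketches. The `ipaddress` module handles both IPv4 and IPv6; the ARN regex below is an assumption that covers the common `arn:partition:service:region:account:resource` shape and may be looser or stricter than Prowler's actual pattern:

```python
import ipaddress
import re

# arn : partition : service : region (may be empty) : account (0-12 digits) : resource
ARN_PATTERN = re.compile(
    r"^arn:aws[a-zA-Z-]*:[a-z0-9-]+:[a-z0-9-]*:\d{0,12}:.+$"
)


def validate_ip_address(ip: str) -> bool:
    """True for any syntactically valid IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(ip)
        return True
    except ValueError:
        return False


def validate_arn(arn: str) -> bool:
    """True if the string matches the sketched ARN pattern above."""
    return bool(ARN_PATTERN.match(arn))
```

Delegating IP parsing to `ipaddress` avoids the classic regex pitfalls (octets above 255, compressed IPv6 forms) that hand-rolled patterns tend to miss.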

## Usage Examples

### Basic Logging Setup

```python
from prowler.lib.logger import set_logging_config, logger

# Configure logging with INFO level
set_logging_config("INFO")

# Log messages
logger.info("Starting security assessment")
logger.warning("Non-critical issue detected")
logger.error("Failed to authenticate with provider")

# Configure with file output
set_logging_config(
    log_level="DEBUG",
    log_file="/var/log/prowler.log",
    only_logs=True  # Quiet mode
)

logger.debug("Detailed debug information")
```

### String Processing

```python
from prowler.lib.utils.utils import (
    strip_ansi_codes,
    format_json_output,
    normalize_resource_name,
    generate_finding_uid
)

# Clean ANSI codes from terminal output
colored_text = "\033[31mERROR\033[0m: Failed check"
clean_text = strip_ansi_codes(colored_text)
print(f"Clean text: {clean_text}")

# Format JSON output
data = {"status": "FAIL", "resources": ["r1", "r2"]}
json_output = format_json_output(data, indent=4)
print(json_output)

# Normalize resource names
raw_name = "My Test Resource (Production)"
normalized = normalize_resource_name(raw_name)
print(f"Normalized: {normalized}")

# Generate finding UIDs
finding_uid = generate_finding_uid(
    check_id="iam_user_mfa_enabled",
    resource_uid="arn:aws:iam::123456789012:user/testuser",
    region="us-east-1",
    account_id="123456789012"
)
print(f"Finding UID: {finding_uid}")
```

### Data Processing

```python
from prowler.lib.utils.utils import (
    flatten_dict,
    merge_dicts,
    filter_empty_values,
    chunk_list
)

# Flatten nested configuration
config = {
    "aws": {
        "regions": ["us-east-1", "us-west-2"],
        "services": {"ec2": True, "s3": True}
    }
}
flat_config = flatten_dict(config)
print(f"Flattened: {flat_config}")

# Merge multiple configurations
base_config = {"timeout": 30, "retries": 3}
env_config = {"timeout": 60, "debug": True}
merged = merge_dicts(base_config, env_config)
print(f"Merged: {merged}")

# Filter empty values
data_with_empties = {
    "name": "test",
    "description": "",
    "tags": {},
    "items": [],
    "config": None
}
filtered = filter_empty_values(data_with_empties)
print(f"Filtered: {filtered}")

# Process large lists in chunks
large_list = list(range(1000))
chunks = chunk_list(large_list, chunk_size=100)
print(f"Created {len(chunks)} chunks of max size 100")
```
482
483
### File Operations
484
485
```python
486
from prowler.lib.utils.utils import (
487
ensure_directory_exists,
488
sanitize_filename,
489
calculate_file_hash
490
)
491
import os
492
493
# Ensure output directory exists
494
output_dir = ensure_directory_exists("/tmp/prowler-output")
495
print(f"Output directory: {output_dir}")
496
497
# Sanitize filename for cross-platform compatibility
498
raw_filename = "Prowler Report: AWS/IAM Analysis (2024-01-01).json"
499
safe_filename = sanitize_filename(raw_filename)
500
print(f"Safe filename: {safe_filename}")
501
502
# Calculate file hash for integrity verification
503
if os.path.exists("/tmp/test-file.json"):
504
file_hash = calculate_file_hash("/tmp/test-file.json", "sha256")
505
print(f"File hash: {file_hash}")
506
```

### Time and Date Operations

```python
from prowler.lib.utils.utils import (
    get_iso_timestamp,
    parse_timestamp,
    format_duration
)
from datetime import datetime

# Get current timestamp
current_time = get_iso_timestamp()
print(f"Current time: {current_time}")

# Parse various timestamp formats
timestamp_formats = [
    "2024-01-01T12:00:00Z",
    "2024-01-01 12:00:00",
    "1704110400"  # Unix timestamp
]

for ts in timestamp_formats:
    try:
        parsed = parse_timestamp(ts)
        print(f"{ts} -> {parsed}")
    except ValueError as e:
        print(f"Failed to parse {ts}: {e}")

# Format execution duration
start_time = datetime.now()
# ... perform operations ...
duration = (datetime.now() - start_time).total_seconds()
formatted_duration = format_duration(duration)
print(f"Execution time: {formatted_duration}")
```

### Validation Functions

```python
from prowler.lib.utils.utils import (
    validate_email_address,
    validate_url,
    validate_ip_address,
    validate_arn
)

# Validate different data formats
test_data = {
    "emails": ["user@example.com", "invalid-email", "test@domain.co.uk"],
    "urls": ["https://example.com", "ftp://server.com", "invalid-url"],
    "ips": ["192.168.1.1", "2001:db8::1", "999.999.999.999"],
    "arns": [
        "arn:aws:iam::123456789012:user/testuser",
        "arn:aws:s3:::my-bucket",
        "invalid-arn-format"
    ]
}

# Validate emails
for email in test_data["emails"]:
    is_valid = validate_email_address(email)
    print(f"Email {email}: {'valid' if is_valid else 'invalid'}")

# Validate URLs
for url in test_data["urls"]:
    is_valid = validate_url(url)
    print(f"URL {url}: {'valid' if is_valid else 'invalid'}")

# Validate IP addresses
for ip in test_data["ips"]:
    is_valid = validate_ip_address(ip)
    print(f"IP {ip}: {'valid' if is_valid else 'invalid'}")

# Validate ARNs
for arn in test_data["arns"]:
    is_valid = validate_arn(arn)
    print(f"ARN {arn}: {'valid' if is_valid else 'invalid'}")
```

### Advanced Logging Configuration

```python
from prowler.lib.logger import set_logging_config, logger, logging_levels
import logging

# Configure detailed logging with file output
set_logging_config(
    log_level="DEBUG",
    log_file="/var/log/prowler-detailed.log"
)

# Create a custom logger for a specific component
component_logger = logging.getLogger("prowler.aws.iam")
component_logger.setLevel(logging.DEBUG)

# Log structured data
logger.info("Check execution started", extra={
    "check_id": "iam_user_mfa_enabled",
    "provider": "aws",
    "account": "123456789012"
})

# Log at each severity level
for level_name, level_value in logging_levels.items():
    logger.log(level_value, f"Test message at {level_name} level")
```