Open source cloud security assessment tool for AWS, Azure, GCP, and Kubernetes with hundreds of compliance checks.
—
Logging configuration, utility functions, and helper methods for string manipulation, data processing, and common operations across the Prowler ecosystem. This module provides centralized logging management, debugging support, and essential utility functions for security assessment workflows.
Comprehensive logging system with configurable levels, file output, and formatting options.
def set_logging_config(
    log_level: str,
    log_file: str = None,
    only_logs: bool = False,
):
    """
    Configure global logging for Prowler operations.

    Sets up the root logger with the requested level, an optional file
    handler for persistent logging, and a console handler, so logging
    behaves consistently across all Prowler components.

    Parameters:
    - log_level: Logging level name (DEBUG, INFO, WARNING, ERROR, CRITICAL),
      case-insensitive.
    - log_file: Optional path to a log file for persistent logging.
    - only_logs: Quiet mode; uses a terse message-only format
      (NOTE(review): upstream this also suppresses the banner — confirm).

    Returns:
        None (configures the global root logger).

    Raises:
        ValueError: On an unrecognized log level name.
        OSError: If the log file cannot be opened for writing.
    """
    import logging

    # Same mapping as the module-level `logging_levels` table; duplicated
    # locally so this function is self-contained.
    levels = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}
    level = levels.get(log_level.upper())
    if level is None:
        raise ValueError(f"Invalid log level: {log_level}")

    # Terse format in quiet mode; timestamped, named format otherwise.
    fmt = (
        "%(levelname)s: %(message)s"
        if only_logs
        else "%(asctime)s [%(name)s] %(levelname)s: %(message)s"
    )

    handlers = [logging.StreamHandler()]
    if log_file:
        handlers.append(logging.FileHandler(log_file))

    # force=True (Python 3.8+) replaces any previously configured handlers
    # so repeated calls reconfigure cleanly.
    logging.basicConfig(level=level, format=fmt, handlers=handlers, force=True)
# Module-level logger shared across Prowler modules; `Logger` refers to
# logging.Logger. NOTE(review): only the annotation is visible in this view —
# the actual instantiation (e.g. logging.getLogger(...)) happens elsewhere;
# confirm where the instance is created before relying on its configuration.
logger: Logger
"""
Global logger instance for Prowler operations.
Pre-configured logger object used throughout Prowler for consistent
logging behavior across all modules and components.
"""
# Maps log level names to the stdlib logging module's numeric constants
# (logging.DEBUG == 10 ... logging.CRITICAL == 50). Used to translate
# user-supplied level strings into values accepted by the logging API.
logging_levels: Dict[str, int] = {
    "DEBUG": 10,
    "INFO": 20,
    "WARNING": 30,
    "ERROR": 40,
    "CRITICAL": 50,
}

# --- Helper functions for string manipulation and formatting commonly used
# --- in security assessments.
def strip_ansi_codes(text: str) -> str:
    """
    Remove ANSI escape sequences from a text string.

    Strips ANSI escape sequences (terminal coloring and cursor control)
    to produce clean text suitable for file output or processing.

    Parameters:
    - text: Input string potentially containing ANSI codes.

    Returns:
        Clean string with ANSI escape sequences removed.
    """
    import re

    # CSI sequences (ESC [ ... final byte) and two-character escapes.
    ansi_pattern = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
    return ansi_pattern.sub("", text)
def format_json_output(data: dict, indent: int = 2) -> str:
    """
    Format dictionary data as pretty-printed JSON.

    Converts a Python dictionary to a formatted JSON string with the
    requested indentation and sorted keys for stable, readable output.

    Parameters:
    - data: Dictionary to convert to JSON.
    - indent: Number of spaces for indentation (default: 2).

    Returns:
        Formatted JSON string.

    Raises:
        TypeError: On non-serializable data types.
    """
    import json

    # sort_keys gives deterministic output; ensure_ascii=False keeps
    # non-ASCII resource names readable instead of \u-escaped.
    return json.dumps(data, indent=indent, sort_keys=True, ensure_ascii=False)
def normalize_resource_name(name: str) -> str:
    """
    Normalize a resource name for consistent identification.

    Lowercases the name and collapses every run of non-alphanumeric
    characters into a single hyphen, trimming hyphens from both ends,
    so equivalent raw names map to the same identifier.

    Parameters:
    - name: Raw resource name string.

    Returns:
        Normalized resource name suitable for identification
        (may be empty if the input contains no alphanumerics).
    """
    import re

    collapsed = re.sub(r"[^a-z0-9]+", "-", name.strip().lower())
    return collapsed.strip("-")
def generate_finding_uid(
    check_id: str,
    resource_uid: str,
    region: str = "",
    account_id: str = "",
) -> str:
    """
    Generate a deterministic unique identifier for a security finding.

    Joins the check ID, resource identifier, and optional context with
    hyphens under a fixed "prowler" prefix. The same inputs always
    produce the same UID; empty optional components are skipped.

    Parameters:
    - check_id: Security check identifier.
    - resource_uid: Resource unique identifier.
    - region: Optional region information.
    - account_id: Optional account identifier.

    Returns:
        Unique finding identifier string.
    """
    components = ["prowler", check_id, account_id, region, resource_uid]
    # Drop empty parts so UIDs stay compact when context is omitted.
    return "-".join(part for part in components if part)

# --- Functions for processing and manipulating data structures commonly used
# --- in security assessments.
def flatten_dict(
    nested_dict: dict,
    separator: str = ".",
    prefix: str = "",
) -> dict:
    """
    Flatten nested dictionary structures.

    Recursively converts nested dictionaries into a single flat dictionary
    whose keys are the original key paths joined with `separator`.

    Parameters:
    - nested_dict: Dictionary with nested structures.
    - separator: String used to join key path components (default: ".").
    - prefix: Optional prefix prepended (with separator) to every key.

    Returns:
        Flattened dictionary with concatenated keys. Empty nested dicts
        are kept as values since they have no keys to flatten.
    """
    flat = {}
    for key, value in nested_dict.items():
        compound_key = f"{prefix}{separator}{key}" if prefix else str(key)
        if isinstance(value, dict) and value:
            flat.update(flatten_dict(value, separator, compound_key))
        else:
            flat[compound_key] = value
    return flat
def merge_dicts(*dicts: dict) -> dict:
    """
    Merge multiple dictionaries with later-wins conflict resolution.

    Combines the given dictionaries into a new dictionary; when the same
    key appears in several inputs, the value from the later dictionary
    is kept. The inputs themselves are not modified.

    Parameters:
    *dicts: Variable number of dictionaries to merge.

    Returns:
        New merged dictionary containing all key-value pairs
        (empty dict when called with no arguments).
    """
    merged: dict = {}
    for source in dicts:
        merged.update(source)
    return merged
def filter_empty_values(data: dict) -> dict:
    """
    Remove empty values from dictionary structures.

    Recursively removes None, empty strings, empty lists, and empty
    dictionaries from nested data. Falsy-but-meaningful values such as
    0 and False are preserved. Nested dicts that become empty after
    filtering are also removed.

    Parameters:
    - data: Dictionary to filter.

    Returns:
        New dictionary with empty values removed.
    """
    filtered = {}
    for key, value in data.items():
        if isinstance(value, dict):
            value = filter_empty_values(value)
        # Drop None, "", [], {} only — keep 0 and False.
        if value is None or (isinstance(value, (str, list, dict)) and len(value) == 0):
            continue
        filtered[key] = value
    return filtered
def chunk_list(input_list: List[Any], chunk_size: int) -> List[List[Any]]:
    """
    Split a list into smaller chunks of specified maximum size.

    Divides large lists into smaller manageable chunks for batch
    processing and parallel execution scenarios. The final chunk may be
    shorter than `chunk_size`.

    Parameters:
    - input_list: List to split into chunks.
    - chunk_size: Maximum size of each chunk (must be positive).

    Returns:
        List of lists containing the chunked data
        (empty list for an empty input).

    Raises:
        ValueError: If chunk_size is not a positive integer.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    return [
        input_list[start : start + chunk_size]
        for start in range(0, len(input_list), chunk_size)
    ]

# --- Helper functions for file system operations and path management.
def ensure_directory_exists(directory_path: str) -> str:
    """
    Ensure a directory exists, creating it if necessary.

    Creates the directory and any missing parent directories; an already
    existing directory is not an error.

    Parameters:
    - directory_path: Path to the directory.

    Returns:
        Absolute path to the directory.

    Raises:
        OSError: On creation failure or permission issues.
        NOTE(review): upstream docs mention ProwlerException — confirm
        whether a caller wraps OSError into it.
    """
    import os

    absolute_path = os.path.abspath(directory_path)
    # exist_ok=True makes this idempotent for existing directories.
    os.makedirs(absolute_path, exist_ok=True)
    return absolute_path
def get_file_extension(filename: str) -> str:
    """
    Extract the file extension from a filename.

    Returns the extension (without the leading dot) of a filename or
    path. Dotfiles like ".bashrc" and names without an extension yield
    an empty string; only the last suffix of multi-part extensions is
    returned (e.g. "tar.gz" -> "gz").

    Parameters:
    - filename: Filename or path string.

    Returns:
        File extension string without the leading dot ("" if none).
    """
    import os

    _, extension = os.path.splitext(filename)
    return extension[1:] if extension else ""
def sanitize_filename(filename: str) -> str:
    """
    Sanitize a filename for cross-platform compatibility.

    Replaces characters invalid on common file systems
    (< > : " / \\ | ? * and ASCII control characters) with underscores
    and trims trailing spaces/dots, which Windows disallows.

    Parameters:
    - filename: Raw filename string.

    Returns:
        Sanitized filename safe for file system operations
        ("_" if nothing valid remains).
    """
    import re

    sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", filename)
    sanitized = sanitized.strip(" .")
    # Never return an empty name — fall back to a single underscore.
    return sanitized or "_"
def calculate_file_hash(file_path: str, algorithm: str = "sha256") -> str:
    """
    Calculate the cryptographic hash of a file's contents.

    Reads the file in fixed-size chunks so arbitrarily large files can
    be hashed without loading them fully into memory.

    Parameters:
    - file_path: Path to the file to hash.
    - algorithm: Hash algorithm name (md5, sha1, sha256, sha512, or any
      name supported by hashlib).

    Returns:
        Hexadecimal hash digest string.

    Raises:
        ValueError: On an unsupported algorithm name.
        OSError: On file access errors.
    """
    import hashlib

    try:
        hasher = hashlib.new(algorithm)
    except ValueError:
        raise ValueError(f"Unsupported hash algorithm: {algorithm}")

    with open(file_path, "rb") as file_handle:
        # 64 KiB chunks keep memory bounded for large files.
        for chunk in iter(lambda: file_handle.read(65536), b""):
            hasher.update(chunk)
    return hasher.hexdigest()

# --- Functions for handling timestamps, date formatting, and time zone
# --- operations.
def get_iso_timestamp() -> str:
    """
    Get the current timestamp in ISO 8601 format.

    Returns the current UTC time as a timezone-aware ISO 8601 string
    suitable for standardized logging and finding metadata.

    Returns:
        ISO 8601 formatted timestamp string with a +00:00 UTC offset.
    """
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).isoformat()
def parse_timestamp(timestamp_str: str) -> datetime:
    """
    Parse a timestamp string into a datetime object.

    Handles multiple formats commonly found in cloud APIs and log files:
    Unix epoch seconds (returned as UTC-aware), ISO 8601 (including a
    trailing "Z"), and several common strptime patterns (returned naive,
    as no zone information is present).

    Parameters:
    - timestamp_str: Timestamp string in one of the supported formats.

    Returns:
        Python datetime object.

    Raises:
        ValueError: On unparseable timestamp formats.
    """
    from datetime import datetime, timezone

    value = timestamp_str.strip()

    # Unix epoch seconds (all digits, optional fractional part).
    if value.replace(".", "", 1).isdigit():
        return datetime.fromtimestamp(float(value), tz=timezone.utc)

    # ISO 8601; fromisoformat() before Python 3.11 rejects a trailing "Z".
    iso_value = value[:-1] + "+00:00" if value.endswith("Z") else value
    try:
        return datetime.fromisoformat(iso_value)
    except ValueError:
        pass

    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%d"):
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue

    raise ValueError(f"Unable to parse timestamp: {timestamp_str}")
def format_duration(seconds: float) -> str:
    """
    Format a duration in seconds as a human-readable string.

    Uses the two most significant units: hours and minutes above one
    hour ("1h 15m"), minutes and seconds above one minute ("2m 30s"),
    whole seconds below that ("45s"), and milliseconds for sub-second
    durations ("500ms").

    Parameters:
    - seconds: Duration in seconds (float for sub-second precision).

    Returns:
        Human-readable duration string (e.g. "2m 30s", "1h 15m").
    """
    whole_seconds = int(seconds)
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)

    if hours:
        return f"{hours}h {minutes}m"
    if minutes:
        return f"{minutes}m {secs}s"
    if 0 < seconds < 1:
        return f"{seconds * 1000:.0f}ms"
    return f"{secs}s"

# --- Functions for validating data formats, URLs, and security-related
# --- patterns.
def validate_email_address(email: str) -> bool:
    """
    Validate email address format.

    Checks the string against a conservative regex pattern: local part
    of common email characters, an "@", and a dotted domain ending in a
    TLD of at least two letters. This is format validation only — no
    deliverability or DNS checks.

    Parameters:
    - email: Email address string to validate.

    Returns:
        True if the string looks like a valid email address, else False.
    """
    import re

    pattern = r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"
    return re.fullmatch(pattern, email) is not None
def validate_url(url: str) -> bool:
    """
    Validate URL format.

    Parses the string with urllib and requires both a scheme (http,
    https, ftp, ...) and a network location. Format validation only —
    the URL's accessibility is NOT checked here.

    Parameters:
    - url: URL string to validate.

    Returns:
        True if the string is a structurally valid URL, else False.
    """
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse raises for e.g. malformed IPv6 literals.
        return False
    return bool(parsed.scheme and parsed.netloc)
def validate_ip_address(ip: str) -> bool:
    """
    Validate IP address format (IPv4 and IPv6).

    Delegates to the stdlib ipaddress module, which accepts any valid
    IPv4 or IPv6 textual representation.

    Parameters:
    - ip: IP address string to validate.

    Returns:
        True if the string is a valid IPv4 or IPv6 address, else False.
    """
    import ipaddress

    try:
        ipaddress.ip_address(ip)
    except ValueError:
        return False
    return True
def validate_arn(arn: str) -> bool:
    """
    Validate AWS ARN (Amazon Resource Name) format.

    Checks the general ARN shape
    arn:partition:service:region:account-id:resource — region and
    account may be empty (as for S3 and IAM ARNs), the account must be
    at most 12 digits, and a non-empty resource part is required.

    Parameters:
    - arn: ARN string to validate.

    Returns:
        True if the string matches the ARN format, else False.
    """
    import re

    pattern = r"arn:[a-z0-9-]+:[a-z0-9-]+:[a-z0-9-]*:\d{0,12}:.+"
    return re.fullmatch(pattern, arn) is not None
# ---------------------------------------------------------------------------
# Usage examples
# ---------------------------------------------------------------------------

# --- Logging configuration ---
from prowler.lib.logger import set_logging_config, logger

# Configure logging with INFO level
set_logging_config("INFO")

# Log messages
logger.info("Starting security assessment")
logger.warning("Non-critical issue detected")
logger.error("Failed to authenticate with provider")

# Configure with file output
set_logging_config(
    log_level="DEBUG",
    log_file="/var/log/prowler.log",
    only_logs=True,  # Quiet mode
)
logger.debug("Detailed debug information")

# --- String manipulation helpers ---
from prowler.lib.utils.utils import (
    strip_ansi_codes,
    format_json_output,
    normalize_resource_name,
    generate_finding_uid,
)

# Clean ANSI codes from terminal output
colored_text = "\033[31mERROR\033[0m: Failed check"
clean_text = strip_ansi_codes(colored_text)
print(f"Clean text: {clean_text}")

# Format JSON output
data = {"status": "FAIL", "resources": ["r1", "r2"]}
json_output = format_json_output(data, indent=4)
print(json_output)

# Normalize resource names
raw_name = "My Test Resource (Production)"
normalized = normalize_resource_name(raw_name)
print(f"Normalized: {normalized}")

# Generate finding UIDs
finding_uid = generate_finding_uid(
    check_id="iam_user_mfa_enabled",
    resource_uid="arn:aws:iam::123456789012:user/testuser",
    region="us-east-1",
    account_id="123456789012",
)
print(f"Finding UID: {finding_uid}")

# --- Data structure processing ---
from prowler.lib.utils.utils import (
    flatten_dict,
    merge_dicts,
    filter_empty_values,
    chunk_list,
)

# Flatten nested configuration
config = {
    "aws": {
        "regions": ["us-east-1", "us-west-2"],
        "services": {"ec2": True, "s3": True},
    }
}
flat_config = flatten_dict(config)
print(f"Flattened: {flat_config}")

# Merge multiple configurations
base_config = {"timeout": 30, "retries": 3}
env_config = {"timeout": 60, "debug": True}
merged = merge_dicts(base_config, env_config)
print(f"Merged: {merged}")

# Filter empty values
data_with_empties = {
    "name": "test",
    "description": "",
    "tags": {},
    "items": [],
    "config": None,
}
filtered = filter_empty_values(data_with_empties)
print(f"Filtered: {filtered}")

# Process large lists in chunks
large_list = list(range(1000))
chunks = chunk_list(large_list, chunk_size=100)
print(f"Created {len(chunks)} chunks of max size 100")

# --- File system helpers ---
from prowler.lib.utils.utils import (
    ensure_directory_exists,
    sanitize_filename,
    calculate_file_hash,
)
import os

# Ensure output directory exists
output_dir = ensure_directory_exists("/tmp/prowler-output")
print(f"Output directory: {output_dir}")

# Sanitize filename for cross-platform compatibility
raw_filename = "Prowler Report: AWS/IAM Analysis (2024-01-01).json"
safe_filename = sanitize_filename(raw_filename)
print(f"Safe filename: {safe_filename}")

# Calculate file hash for integrity verification
if os.path.exists("/tmp/test-file.json"):
    file_hash = calculate_file_hash("/tmp/test-file.json", "sha256")
    print(f"File hash: {file_hash}")

# --- Timestamps and durations ---
from prowler.lib.utils.utils import (
    get_iso_timestamp,
    parse_timestamp,
    format_duration,
)
from datetime import datetime

# Get current timestamp
current_time = get_iso_timestamp()
print(f"Current time: {current_time}")

# Parse various timestamp formats
timestamp_formats = [
    "2024-01-01T12:00:00Z",
    "2024-01-01 12:00:00",
    "1704110400",  # Unix timestamp
]
for ts in timestamp_formats:
    try:
        parsed = parse_timestamp(ts)
        print(f"{ts} -> {parsed}")
    except ValueError as e:
        print(f"Failed to parse {ts}: {e}")

# Format execution duration
start_time = datetime.now()
# ... perform operations ...
duration = (datetime.now() - start_time).total_seconds()
formatted_duration = format_duration(duration)
print(f"Execution time: {formatted_duration}")

# --- Validation helpers ---
from prowler.lib.utils.utils import (
    validate_email_address,
    validate_url,
    validate_ip_address,
    validate_arn,
)

# Validate different data formats
test_data = {
    "emails": ["user@example.com", "invalid-email", "test@domain.co.uk"],
    "urls": ["https://example.com", "ftp://server.com", "invalid-url"],
    "ips": ["192.168.1.1", "2001:db8::1", "999.999.999.999"],
    "arns": [
        "arn:aws:iam::123456789012:user/testuser",
        "arn:aws:s3:::my-bucket",
        "invalid-arn-format",
    ],
}

# Validate emails
for email in test_data["emails"]:
    is_valid = validate_email_address(email)
    print(f"Email {email}: {'valid' if is_valid else 'invalid'}")

# Validate URLs
for url in test_data["urls"]:
    is_valid = validate_url(url)
    print(f"URL {url}: {'valid' if is_valid else 'invalid'}")

# Validate IP addresses
for ip in test_data["ips"]:
    is_valid = validate_ip_address(ip)
    print(f"IP {ip}: {'valid' if is_valid else 'invalid'}")

# Validate ARNs
for arn in test_data["arns"]:
    is_valid = validate_arn(arn)
    print(f"ARN {arn}: {'valid' if is_valid else 'invalid'}")

# --- Advanced logging ---
from prowler.lib.logger import set_logging_config, logger, logging_levels
import logging

# Configure detailed logging with custom format
set_logging_config(
    log_level="DEBUG",
    log_file="/var/log/prowler-detailed.log",
)

# Create custom logger for specific component
component_logger = logging.getLogger("prowler.aws.iam")
component_logger.setLevel(logging.DEBUG)

# Log structured data
logger.info(
    "Check execution started",
    extra={
        "check_id": "iam_user_mfa_enabled",
        "provider": "aws",
        "account": "123456789012",
    },
)

# Log with different severity levels
for level_name, level_value in logging_levels.items():
    logger.log(level_value, f"Test message at {level_name} level")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-prowler