
tessl/pypi-prowler

Open source cloud security assessment tool for AWS, Azure, GCP, and Kubernetes with hundreds of compliance checks.



Logging and Utilities

Logging configuration and helper functions for string manipulation, data processing, file handling, timestamps, and validation used across Prowler. The module provides centralized logging management, debugging support, and the utility functions that security assessment workflows rely on.

Capabilities

Logging Configuration

Logging setup with a configurable level, optional file output, and a quiet mode that suppresses banner and progress output.

def set_logging_config(
    log_level: str,
    log_file: str = None,
    only_logs: bool = False
):
    """
    Configure logging for Prowler operations.
    
    Sets up the global logging configuration with specified level,
    optional file output, and formatting preferences for consistent
    logging across all Prowler components.
    
    Parameters:
    - log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    - log_file: Optional path to log file for persistent logging
    - only_logs: Whether to suppress banner and progress output (quiet mode)
    
    Returns:
    None (configures global logger)
    
    Raises:
    ProwlerException: On invalid log level or file access errors
    """

logger: Logger
"""
Global logger instance for Prowler operations.

Pre-configured logger object used throughout Prowler for consistent
logging behavior across all modules and components.
"""

logging_levels: Dict[str, int] = {
    "DEBUG": 10,
    "INFO": 20,
    "WARNING": 30,
    "ERROR": 40,
    "CRITICAL": 50
}
"""
Dictionary mapping log level names to logging constants.

Provides standardized mapping between string level names and
Python logging module constants for configuration purposes.
"""

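To make the behaviour described above concrete, here is a minimal sketch of a level-plus-optional-file setup built only on the standard library. The function name `configure_logging_sketch`, the format string, and the use of `ValueError` are assumptions made for illustration; this is not Prowler's implementation of `set_logging_config`.

```python
import logging
import sys
from typing import List, Optional

# Same name-to-number mapping as the logging_levels dictionary above.
LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}


def configure_logging_sketch(log_level: str, log_file: Optional[str] = None) -> logging.Logger:
    """Hypothetical stand-in for a logging setup function (standard library only)."""
    if log_level not in LEVELS:
        raise ValueError(f"Unknown log level: {log_level!r}")

    handlers: List[logging.Handler] = [logging.StreamHandler(sys.stderr)]
    if log_file is not None:
        # Only create the file handler when a path was actually supplied.
        handlers.append(logging.FileHandler(log_file))

    # force=True (Python 3.8+) replaces handlers left over from an earlier
    # call, so configuring twice does not duplicate every message.
    logging.basicConfig(
        level=LEVELS[log_level],
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        handlers=handlers,
        force=True,
    )
    return logging.getLogger()
```

Calling `configure_logging_sketch("WARNING")` leaves the root logger at level 30, so `INFO` and `DEBUG` messages are dropped while warnings and errors pass through.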
String Utilities

Helper functions for string manipulation and formatting commonly used in security assessments.

def strip_ansi_codes(text: str) -> str:
    """
    Remove ANSI color codes from text strings.
    
    Strips ANSI escape sequences used for terminal coloring
    to produce clean text suitable for file output or processing.
    
    Parameters:
    - text: Input string potentially containing ANSI codes
    
    Returns:
    Clean string with ANSI codes removed
    """

def format_json_output(data: dict, indent: int = 2) -> str:
    """
    Format dictionary data as pretty-printed JSON.
    
    Converts Python dictionaries to formatted JSON strings
    with proper indentation and sorting for readable output.
    
    Parameters:
    - data: Dictionary to convert to JSON
    - indent: Number of spaces for indentation (default: 2)
    
    Returns:
    Formatted JSON string
    
    Raises:
    TypeError: On non-serializable data types
    """

def normalize_resource_name(name: str) -> str:
    """
    Normalize resource names for consistent identification.
    
    Standardizes resource name formatting by removing special
    characters, normalizing case, and applying consistent patterns.
    
    Parameters:
    - name: Raw resource name string
    
    Returns:
    Normalized resource name suitable for identification
    """

def generate_finding_uid(
    check_id: str,
    resource_uid: str,
    region: str = "",
    account_id: str = ""
) -> str:
    """
    Generate unique identifier for security findings.
    
    Creates deterministic unique identifiers for findings based on
    check ID, resource identifier, and optional context information.
    
    Parameters:
    - check_id: Security check identifier
    - resource_uid: Resource unique identifier
    - region: Optional region information
    - account_id: Optional account identifier
    
    Returns:
    Unique finding identifier string
    """

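ANSI stripping of the kind described for `strip_ansi_codes` is usually done with one regular expression. The sketch below assumes only CSI sequences (the family used for terminal colors) need to be removed; the name `strip_ansi_sketch` is hypothetical and the pattern may differ from the one Prowler uses.

```python
import re

# A CSI sequence is ESC [ followed by parameter bytes (0x30-0x3F),
# intermediate bytes (0x20-0x2F) and one final byte (0x40-0x7E).
# Examples: "\x1b[31m" (red foreground) and "\x1b[0m" (reset).
ANSI_CSI_PATTERN = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")


def strip_ansi_sketch(text: str) -> str:
    """Remove CSI escape sequences from text; other escape families are left alone."""
    return ANSI_CSI_PATTERN.sub("", text)


print(strip_ansi_sketch("\033[31mERROR\033[0m: Failed check"))  # ERROR: Failed check
```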
Data Processing Utilities

Functions for processing and manipulating data structures commonly used in security assessments.

def flatten_dict(
    nested_dict: dict,
    separator: str = ".",
    prefix: str = ""
) -> dict:
    """
    Flatten nested dictionary structures.
    
    Converts nested dictionaries to flat structures with
    concatenated keys for easier processing and analysis.
    
    Parameters:
    - nested_dict: Dictionary with nested structures
    - separator: Character to use for key concatenation (default: ".")
    - prefix: Optional prefix for all keys
    
    Returns:
    Flattened dictionary with concatenated keys
    """

def merge_dicts(*dicts: dict) -> dict:
    """
    Merge multiple dictionaries with conflict resolution.
    
    Combines multiple dictionaries into a single dictionary,
    handling key conflicts by preferring later values.
    
    Parameters:
    *dicts: Variable number of dictionaries to merge
    
    Returns:
    Merged dictionary containing all key-value pairs
    """

def filter_empty_values(data: dict) -> dict:
    """
    Remove empty values from dictionary structures.
    
    Recursively removes None, empty strings, empty lists,
    and empty dictionaries from nested data structures.
    
    Parameters:
    - data: Dictionary to filter
    
    Returns:
    Dictionary with empty values removed
    """

def chunk_list(input_list: List[Any], chunk_size: int) -> List[List[Any]]:
    """
    Split list into smaller chunks of specified size.
    
    Divides large lists into smaller manageable chunks for
    batch processing and parallel execution scenarios.
    
    Parameters:
    - input_list: List to split into chunks
    - chunk_size: Maximum size of each chunk
    
    Returns:
    List of lists containing chunked data
    """

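The flattening and chunking behaviour described above can be sketched in a few lines. The names `flatten_sketch` and `chunk_sketch` are hypothetical, and two details are assumptions rather than documented behaviour: the prefix is joined to keys with the separator, and lists are kept as leaf values instead of being expanded by index.

```python
from typing import Any, Dict, List


def flatten_sketch(nested: Dict[str, Any], separator: str = ".", prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dictionaries; lists and scalars are kept as leaf values."""
    flat: Dict[str, Any] = {}
    for key, value in nested.items():
        full_key = f"{prefix}{separator}{key}" if prefix else str(key)
        if isinstance(value, dict) and value:
            # Recurse, carrying the key path built so far as the new prefix.
            flat.update(flatten_sketch(value, separator, full_key))
        else:
            flat[full_key] = value
    return flat


def chunk_sketch(items: List[Any], chunk_size: int) -> List[List[Any]]:
    """Split items into consecutive slices of at most chunk_size elements."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]
```

With this sketch, `{"aws": {"services": {"ec2": True}}}` flattens to `{"aws.services.ec2": True}`, and a seven-element list chunked by three yields slices of length 3, 3, and 1.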
File and Path Utilities

Helper functions for file system operations and path management.

def ensure_directory_exists(directory_path: str) -> str:
    """
    Ensure directory exists, creating if necessary.
    
    Creates directory structure if it doesn't exist,
    handling permissions and parent directory creation.
    
    Parameters:
    - directory_path: Path to directory
    
    Returns:
    Absolute path to the directory
    
    Raises:
    ProwlerException: On directory creation errors or permission issues
    """

def get_file_extension(filename: str) -> str:
    """
    Extract file extension from filename.
    
    Returns the file extension (without dot) from a filename
    or path, handling edge cases and special characters.
    
    Parameters:
    - filename: Filename or path string
    
    Returns:
    File extension string (without leading dot)
    """

def sanitize_filename(filename: str) -> str:
    """
    Sanitize filename for cross-platform compatibility.
    
    Removes or replaces characters that are invalid in filenames
    across different operating systems and file systems.
    
    Parameters:
    - filename: Raw filename string
    
    Returns:
    Sanitized filename safe for file system operations
    """

def calculate_file_hash(file_path: str, algorithm: str = "sha256") -> str:
    """
    Calculate cryptographic hash of file contents.
    
    Computes hash values for files using specified algorithm
    for integrity verification and duplicate detection.
    
    Parameters:
    - file_path: Path to file for hashing
    - algorithm: Hash algorithm (md5, sha1, sha256, sha512)
    
    Returns:
    Hexadecimal hash string
    
    Raises:
    ProwlerException: On file access errors or invalid algorithm
    """

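File hashing as described for `calculate_file_hash` is typically done by streaming the file through `hashlib`. The following is a sketch under assumptions: the name `file_hash_sketch`, the `block_size` parameter, and the use of `ValueError` are illustrative and not taken from Prowler.

```python
import hashlib

# The four algorithms listed in the parameter description above.
SUPPORTED_ALGORITHMS = ("md5", "sha1", "sha256", "sha512")


def file_hash_sketch(file_path: str, algorithm: str = "sha256", block_size: int = 65536) -> str:
    """Hash a file block by block so large files are never loaded into memory at once."""
    if algorithm not in SUPPORTED_ALGORITHMS:
        raise ValueError(f"Unsupported hash algorithm: {algorithm!r}")
    digest = hashlib.new(algorithm)
    with open(file_path, "rb") as handle:
        # iter() with a sentinel keeps reading until read() returns b"" at end of file.
        for block in iter(lambda: handle.read(block_size), b""):
            digest.update(block)
    return digest.hexdigest()
```

For integrity checks, `sha256` or `sha512` is the safer choice; `md5` and `sha1` are only adequate for spotting accidental duplicates.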
Time and Date Utilities

Functions for handling timestamps, date formatting, and time zone operations.

def get_iso_timestamp() -> str:
    """
    Get current timestamp in ISO 8601 format.
    
    Returns current UTC timestamp in ISO 8601 format
    suitable for standardized logging and finding metadata.
    
    Returns:
    ISO 8601 formatted timestamp string (UTC)
    """

def parse_timestamp(timestamp_str: str) -> datetime:
    """
    Parse timestamp string to datetime object.
    
    Handles multiple timestamp formats commonly found in
    cloud APIs and log files, with automatic format detection.
    
    Parameters:
    - timestamp_str: Timestamp string in various formats
    
    Returns:
    Python datetime object
    
    Raises:
    ValueError: On unparseable timestamp formats
    """

def format_duration(seconds: float) -> str:
    """
    Format duration in seconds to human-readable string.
    
    Converts numeric duration values to human-readable format
    with appropriate units (seconds, minutes, hours).
    
    Parameters:
    - seconds: Duration in seconds (float for sub-second precision)
    
    Returns:
    Human-readable duration string (e.g., "2m 30s", "1h 15m")
    """

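The output shapes quoted above ("2m 30s", "1h 15m") suggest that a duration is shown with its two most significant units. The sketch below follows that reading; the name `format_duration_sketch` and the one-decimal rendering for sub-minute values are assumptions.

```python
def format_duration_sketch(seconds: float) -> str:
    """Render a duration using at most its two most significant units."""
    if seconds < 0:
        raise ValueError("seconds must be non-negative")

    # Work in tenths of a second so a value such as 59.96 rolls over to "1m 0s"
    # instead of being displayed as "60.0s".
    tenths = round(seconds * 10)
    if tenths < 600:
        return f"{tenths / 10:.1f}s"

    total = int(round(seconds))
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours}h {minutes}m"
    return f"{minutes}m {secs}s"


print(format_duration_sketch(150))   # 2m 30s
print(format_duration_sketch(4500))  # 1h 15m
```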
Validation Utilities

Functions for validating data formats, URLs, and security-related patterns.

def validate_email_address(email: str) -> bool:
    """
    Validate email address format.
    
    Checks if string matches valid email address patterns
    using regex validation suitable for security contexts.
    
    Parameters:
    - email: Email address string to validate
    
    Returns:
    True if valid email format, False otherwise
    """

def validate_url(url: str) -> bool:
    """
    Validate URL format and accessibility.
    
    Checks URL format and optionally verifies accessibility
    for security-related URL validation scenarios.
    
    Parameters:
    - url: URL string to validate
    
    Returns:
    True if valid URL format, False otherwise
    """

def validate_ip_address(ip: str) -> bool:
    """
    Validate IP address format (IPv4 and IPv6).
    
    Checks if string represents a valid IPv4 or IPv6 address
    using standard validation patterns.
    
    Parameters:
    - ip: IP address string to validate
    
    Returns:
    True if valid IP address, False otherwise
    """

def validate_arn(arn: str) -> bool:
    """
    Validate AWS ARN (Amazon Resource Name) format.
    
    Checks if string matches valid AWS ARN pattern with
    proper partition, service, region, and resource components.
    
    Parameters:
    - arn: ARN string to validate
    
    Returns:
    True if valid ARN format, False otherwise
    """

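Two of the validators above map directly onto well-known techniques: IP addresses can be checked with the standard `ipaddress` module, and ARNs with a pattern over their colon-separated fields. This is a sketch, not Prowler's code. The names `is_ip_address` and `is_arn` are hypothetical, and the ARN pattern is a simplification that accepts only the `aws`, `aws-cn`, and `aws-us-gov` partitions.

```python
import ipaddress
import re

# Assumed shape: arn:partition:service:region:account-id:resource.
# Region and account ID may be empty (S3 bucket ARNs), and the account
# field may be the literal "aws" (AWS managed IAM policies).
ARN_PATTERN = re.compile(
    r"arn:(aws|aws-cn|aws-us-gov):[a-z0-9-]+:[a-z0-9-]*:(\d{12}|aws)?:.+"
)


def is_ip_address(value: str) -> bool:
    """Return True for any string that parses as an IPv4 or IPv6 address."""
    try:
        ipaddress.ip_address(value)
    except ValueError:
        return False
    return True


def is_arn(value: str) -> bool:
    """Return True when the whole string matches the simplified ARN pattern."""
    return ARN_PATTERN.fullmatch(value) is not None
```

Delegating to `ipaddress` avoids hand-written octet checks, so `999.999.999.999` is rejected without a custom regular expression.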
Usage Examples

Basic Logging Setup

from prowler.lib.logger import set_logging_config, logger

# Configure logging with INFO level
set_logging_config("INFO")

# Log messages
logger.info("Starting security assessment")
logger.warning("Non-critical issue detected")
logger.error("Failed to authenticate with provider")

# Configure with file output
set_logging_config(
    log_level="DEBUG",
    log_file="/var/log/prowler.log",
    only_logs=True  # Quiet mode
)

logger.debug("Detailed debug information")

String Processing

from prowler.lib.utils.utils import (
    strip_ansi_codes,
    format_json_output,
    normalize_resource_name,
    generate_finding_uid
)

# Clean ANSI codes from terminal output
colored_text = "\033[31mERROR\033[0m: Failed check"
clean_text = strip_ansi_codes(colored_text)
print(f"Clean text: {clean_text}")

# Format JSON output
data = {"status": "FAIL", "resources": ["r1", "r2"]}
json_output = format_json_output(data, indent=4)
print(json_output)

# Normalize resource names
raw_name = "My Test Resource (Production)"
normalized = normalize_resource_name(raw_name)
print(f"Normalized: {normalized}")

# Generate finding UIDs
finding_uid = generate_finding_uid(
    check_id="iam_user_mfa_enabled",
    resource_uid="arn:aws:iam::123456789012:user/testuser",
    region="us-east-1",
    account_id="123456789012"
)
print(f"Finding UID: {finding_uid}")
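"Deterministic" in the description of `generate_finding_uid` means the same inputs always give the same identifier. One common way to get that property is to hash the identifying fields, as in the sketch below. The function name, the separator, and the choice of SHA-256 are assumptions; the format of the identifiers Prowler actually produces is not specified here.

```python
import hashlib


def finding_uid_sketch(check_id: str, resource_uid: str, region: str = "", account_id: str = "") -> str:
    """Derive a stable identifier by hashing the fields that define a finding."""
    # The ASCII unit separator is unlikely to appear in any field, which keeps
    # ("ab", "c") and ("a", "bc") from producing the same joined payload.
    payload = "\x1f".join([check_id, resource_uid, region, account_id])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


uid_a = finding_uid_sketch("iam_user_mfa_enabled", "arn:aws:iam::123456789012:user/testuser", "us-east-1")
uid_b = finding_uid_sketch("iam_user_mfa_enabled", "arn:aws:iam::123456789012:user/testuser", "us-east-1")
print(uid_a == uid_b)  # True
```

Because the identifier depends only on its inputs, re-running a scan produces the same UID for the same finding, which makes it possible to compare results across runs.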

Data Processing

from prowler.lib.utils.utils import (
    flatten_dict,
    merge_dicts,
    filter_empty_values,
    chunk_list
)

# Flatten nested configuration
config = {
    "aws": {
        "regions": ["us-east-1", "us-west-2"],
        "services": {"ec2": True, "s3": True}
    }
}
flat_config = flatten_dict(config)
print(f"Flattened: {flat_config}")

# Merge multiple configurations
base_config = {"timeout": 30, "retries": 3}
env_config = {"timeout": 60, "debug": True}
merged = merge_dicts(base_config, env_config)
print(f"Merged: {merged}")

# Filter empty values
data_with_empties = {
    "name": "test",
    "description": "",
    "tags": {},
    "items": [],
    "config": None
}
filtered = filter_empty_values(data_with_empties)
print(f"Filtered: {filtered}")

# Process large lists in chunks
large_list = list(range(1000))
chunks = chunk_list(large_list, chunk_size=100)
print(f"Created {len(chunks)} chunks of max size 100")
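The merge and filter steps in the example above can be sketched with plain dictionary operations. The names `merge_sketch` and `drop_empty_sketch` are hypothetical. The sketch assumes a shallow merge, and it recurses into nested dictionaries only, not into dictionaries stored inside lists.

```python
from typing import Any, Dict


def merge_sketch(*dicts: Dict[str, Any]) -> Dict[str, Any]:
    """Shallow merge; on duplicate keys the right-most dictionary wins."""
    merged: Dict[str, Any] = {}
    for current in dicts:
        merged.update(current)
    return merged


def drop_empty_sketch(data: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively drop None, "", [] and {} while keeping 0 and False."""
    cleaned: Dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, dict):
            # Clean nested dictionaries first; they may become empty themselves.
            value = drop_empty_sketch(value)
        if value is None or value == "" or value == [] or value == {}:
            continue
        cleaned[key] = value
    return cleaned
```

Comparing against the four empty values explicitly, rather than testing truthiness, matters here: a plain `if not value` check would also discard legitimate values such as `0` and `False`.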

File Operations

import os

from prowler.lib.utils.utils import (
    ensure_directory_exists,
    sanitize_filename,
    calculate_file_hash
)

# Ensure output directory exists
output_dir = ensure_directory_exists("/tmp/prowler-output")
print(f"Output directory: {output_dir}")

# Sanitize filename for cross-platform compatibility
raw_filename = "Prowler Report: AWS/IAM Analysis (2024-01-01).json"
safe_filename = sanitize_filename(raw_filename)
print(f"Safe filename: {safe_filename}")

# Calculate file hash for integrity verification
if os.path.exists("/tmp/test-file.json"):
    file_hash = calculate_file_hash("/tmp/test-file.json", "sha256")
    print(f"File hash: {file_hash}")
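Filename sanitization of the kind shown above usually replaces the characters that Windows rejects, since that set is the strictest of the common platforms. The sketch below takes that approach; the name `sanitize_filename_sketch` and the underscore replacement are assumptions, and reserved Windows device names such as `CON` or `NUL` are not handled.

```python
import re

# Characters rejected by Windows file systems, which includes both path
# separators, plus the ASCII control characters.
INVALID_FILENAME_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]')


def sanitize_filename_sketch(filename: str, replacement: str = "_") -> str:
    """Replace characters that are invalid on at least one common platform."""
    cleaned = INVALID_FILENAME_CHARS.sub(replacement, filename)
    # Windows also rejects names that end in a space or a dot.
    cleaned = cleaned.rstrip(" .")
    # Never return an empty name.
    return cleaned or replacement


print(sanitize_filename_sketch("Prowler Report: AWS/IAM Analysis (2024-01-01).json"))
# Prowler Report_ AWS_IAM Analysis (2024-01-01).json
```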

Time and Date Operations

from datetime import datetime

from prowler.lib.utils.utils import (
    get_iso_timestamp,
    parse_timestamp,
    format_duration
)

# Get current timestamp
current_time = get_iso_timestamp()
print(f"Current time: {current_time}")

# Parse various timestamp formats
timestamp_formats = [
    "2024-01-01T12:00:00Z",
    "2024-01-01 12:00:00",
    "1704110400"  # Unix timestamp
]

for ts in timestamp_formats:
    try:
        parsed = parse_timestamp(ts)
        print(f"{ts} -> {parsed}")
    except ValueError as e:
        print(f"Failed to parse {ts}: {e}")

# Format execution duration
start_time = datetime.now()
# ... perform operations ...
duration = (datetime.now() - start_time).total_seconds()
formatted_duration = format_duration(duration)
print(f"Execution time: {formatted_duration}")
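"Automatic format detection" for the three inputs in the example above can be approximated by trying each known format in turn. The sketch below does that with the standard library. The name `parse_timestamp_sketch`, the list of formats, and the choice to treat offset-less inputs as UTC are all assumptions.

```python
from datetime import datetime, timezone

# Explicit formats tried in order; extend this tuple to accept more inputs.
KNOWN_FORMATS = ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%d %H:%M:%S")


def parse_timestamp_sketch(value: str) -> datetime:
    """Parse epoch seconds or one of KNOWN_FORMATS into a UTC-aware datetime."""
    text = value.strip()
    if text.isdigit():
        # A purely numeric string is interpreted as a Unix timestamp in seconds.
        return datetime.fromtimestamp(int(text), tz=timezone.utc)
    for fmt in KNOWN_FORMATS:
        try:
            # Assumption: inputs without an explicit offset are treated as UTC.
            return datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    raise ValueError(f"Unrecognized timestamp format: {value!r}")


print(parse_timestamp_sketch("1704110400"))  # 2024-01-01 12:00:00+00:00
```

Under these assumptions, all three inputs from the example resolve to the same instant, 12:00 UTC on 1 January 2024.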

Validation Functions

from prowler.lib.utils.utils import (
    validate_email_address,
    validate_url,
    validate_ip_address,
    validate_arn
)

# Validate different data formats
test_data = {
    "emails": ["user@example.com", "invalid-email", "test@domain.co.uk"],
    "urls": ["https://example.com", "ftp://server.com", "invalid-url"],
    "ips": ["192.168.1.1", "2001:db8::1", "999.999.999.999"],
    "arns": [
        "arn:aws:iam::123456789012:user/testuser",
        "arn:aws:s3:::my-bucket",
        "invalid-arn-format"
    ]
}

# Validate emails
for email in test_data["emails"]:
    is_valid = validate_email_address(email)
    print(f"Email {email}: {'valid' if is_valid else 'invalid'}")

# Validate URLs
for url in test_data["urls"]:
    is_valid = validate_url(url)
    print(f"URL {url}: {'valid' if is_valid else 'invalid'}")

# Validate IP addresses
for ip in test_data["ips"]:
    is_valid = validate_ip_address(ip)
    print(f"IP {ip}: {'valid' if is_valid else 'invalid'}")

# Validate ARNs
for arn in test_data["arns"]:
    is_valid = validate_arn(arn)
    print(f"ARN {arn}: {'valid' if is_valid else 'invalid'}")
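A purely syntactic URL check can be built on `urllib.parse`, as sketched below. Whether `validate_url` accepts schemes such as `ftp` is not stated above, so the scheme allow-list here is an assumption, as is the name `is_url_sketch`. The sketch makes no network request.

```python
from typing import Tuple
from urllib.parse import urlparse


def is_url_sketch(value: str, allowed_schemes: Tuple[str, ...] = ("http", "https")) -> bool:
    """Return True when value has an allowed scheme and a non-empty network location."""
    try:
        parsed = urlparse(value)
    except ValueError:
        # urlparse raises ValueError for some malformed inputs, such as bad IPv6 brackets.
        return False
    return parsed.scheme in allowed_schemes and bool(parsed.netloc)


for candidate in ["https://example.com", "ftp://server.com", "invalid-url"]:
    print(candidate, is_url_sketch(candidate))
```

Passing `allowed_schemes=("http", "https", "ftp")` would accept the second candidate as well; the strict default suits contexts where only web endpoints are expected.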

Advanced Logging Configuration

import logging

from prowler.lib.logger import set_logging_config, logger, logging_levels

# Configure detailed logging with file output
set_logging_config(
    log_level="DEBUG",
    log_file="/var/log/prowler-detailed.log"
)

# Create custom logger for specific component
component_logger = logging.getLogger("prowler.aws.iam")
component_logger.setLevel(logging.DEBUG)

# Log structured data (extra fields show up in output only if the formatter references them)
logger.info("Check execution started", extra={
    "check_id": "iam_user_mfa_enabled",
    "provider": "aws",
    "account": "123456789012"
})

# Log with different severity levels
for level_name, level_value in logging_levels.items():
    logger.log(level_value, "Test message at %s level", level_name)
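Fields passed through `extra=` are attached to the log record but are not printed unless the handler's format string names them. The standalone sketch below shows this with the standard library only; the logger name `extra_fields_demo` and the format string are hypothetical and unrelated to Prowler's own formatter.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
# Keys passed through extra= become attributes of the log record,
# so the format string can reference them by name.
handler.setFormatter(logging.Formatter("%(levelname)s %(check_id)s %(message)s"))

demo_logger = logging.getLogger("extra_fields_demo")  # hypothetical logger name
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo output away from the root logger

demo_logger.info("Check execution started", extra={"check_id": "iam_user_mfa_enabled"})
output = stream.getvalue()
print(output, end="")  # INFO iam_user_mfa_enabled Check execution started
```

A format string that references `%(check_id)s` requires every record sent to that handler to carry the field; records logged without it fail to format, so such a formatter is best attached to a dedicated logger.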

