CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-prowler

Open source cloud security assessment tool for AWS, Azure, GCP, and Kubernetes with hundreds of compliance checks.

Pending
Overview
Eval results
Files

finding-management.mddocs/

Finding Management and Output

Comprehensive finding representation and output generation supporting multiple formats including JSON, CSV, HTML, ASFF (AWS Security Finding Format), and OCSF (Open Cybersecurity Schema Framework). This module provides standardized finding processing, compliance reporting, and integration capabilities for security assessments.

Capabilities

Finding Data Model

Core model representing security assessment findings with comprehensive metadata and compliance information.

class Finding(BaseModel):
    """
    Pydantic model representing a security finding across different providers.
    
    This class encapsulates the details of a finding and supports
    serialization to various formats such as CSV. It serves as the base
    model for storing and managing finding information for every provider.
    
    Attributes:
    - auth_method: str - Authentication method used for the scan
    - timestamp: Union[int, datetime] - Finding generation timestamp
    - account_uid: str - Account/subscription unique identifier
    - account_name: Optional[str] - Account/subscription name
    - account_email: Optional[str] - Account contact email
    - account_organization_uid: Optional[str] - Organization identifier
    - account_organization_name: Optional[str] - Organization name
    - metadata: CheckMetadata - Associated check metadata
    - account_tags: dict - Account-level tags and metadata
    - uid: str - Unique finding identifier
    - status: Status - Finding status (PASS/FAIL/MANUAL)
    - status_extended: str - Extended status information and details
    - muted: bool - Whether finding is muted/suppressed
    - resource_uid: str - Resource unique identifier (ARN, ID, etc.)
    - resource_metadata: dict - Resource-specific metadata
    - resource_name: str - Resource name or identifier
    - resource_details: str - Additional resource details and context
    - resource_tags: dict - Resource-level tags
    - partition: Optional[str] - Cloud partition (aws, aws-cn, aws-us-gov, etc.)
    - region: str - Cloud region where resource is located
    - compliance: dict - Compliance framework mappings and requirements
    - prowler_version: str - Version of Prowler that generated the finding
    - raw: dict - Raw finding data for additional context
    """
    
    auth_method: str
    timestamp: Union[int, datetime]
    account_uid: str
    account_name: Optional[str] = None
    account_email: Optional[str] = None
    account_organization_uid: Optional[str] = None
    account_organization_name: Optional[str] = None
    metadata: CheckMetadata
    account_tags: dict = Field(default_factory=dict)
    uid: str
    status: Status
    status_extended: str
    muted: bool = False
    resource_uid: str
    resource_metadata: dict = Field(default_factory=dict)
    resource_name: str
    resource_details: str
    resource_tags: dict = Field(default_factory=dict)
    partition: Optional[str] = None
    region: str
    compliance: dict = Field(default_factory=dict)
    prowler_version: str = prowler_version
    raw: dict = Field(default_factory=dict)

    # Properties
    @property
    def provider(self) -> str:
        """
        Returns the provider from the finding check's metadata.
        
        Returns:
        str: Provider name (aws, azure, gcp, etc.)
        """

    @property
    def check_id(self) -> str:
        """
        Returns the ID from the finding check's metadata.
        
        Returns:
        str: Check unique identifier
        """

    @property
    def severity(self) -> str:
        """
        Returns the severity from the finding check's metadata.
        
        Returns:
        str: Severity level (critical, high, medium, low, informational)
        """

    @property
    def resource_type(self) -> str:
        """
        Returns the resource type from the finding check's metadata.
        
        Returns:
        str: Resource type being assessed
        """

    @property
    def service_name(self) -> str:
        """
        Returns the service name from the finding check's metadata.
        
        Returns:
        str: Cloud service name
        """

    # Instance Methods
    def get_metadata(self) -> dict:
        """
        Retrieves the metadata of the object and returns it as a dictionary with all keys in lowercase.
        
        Returns:
        dict: A dictionary containing the metadata with keys converted to lowercase
        """

    # Class Methods
    @classmethod
    def generate_output(
        cls, 
        provider: Provider, 
        check_output: Check_Report, 
        output_options
    ) -> "Finding":
        """
        Generates the output for a finding based on the provider and output options.

        Args:
        - provider: Provider - The provider object
        - check_output: Check_Report - The check output object
        - output_options: Provider-specific output options object

        Returns:
        Finding: The finding output object
        """

    @classmethod
    def transform_api_finding(cls, finding, provider) -> "Finding":
        """
        Transform a FindingModel instance into an API-friendly Finding object.

        This class method extracts data from a FindingModel instance and maps its
        properties to a new Finding object. The transformation populates various
        fields including authentication details, timestamp, account information,
        check metadata, as well as resource-specific data.

        Args:
        - finding: API Finding instance containing data from the database
        - provider: Provider - The provider object

        Returns:
        Finding: A new Finding instance populated with data from the provided model
        """

    @staticmethod
    def _transform_findings_stats(scan_summaries: list[dict]) -> dict:
        """
        Aggregate and transform scan summary data into findings statistics.

        This function processes a list of scan summary objects and calculates overall
        metrics such as the total number of passed and failed findings (including muted counts),
        as well as a breakdown of results by severity levels.

        Args:
        - scan_summaries: list[dict] - A list of scan summary objects

        Returns:
        dict: A dictionary containing aggregated findings statistics with metrics like:
            - total_pass: Total number of passed findings
            - total_fail: Total number of failed findings
            - total_muted_pass: Total number of muted passed findings
            - total_muted_fail: Total number of muted failed findings
            - resources_count: The unique resource count
            - findings_count: Total number of findings
            - Severity breakdowns (critical, high, medium, low)
            - all_fails_are_muted: Boolean indicating if all failing findings are muted
        """

Finding Status Enumeration

Standardized status values for security findings.

class Status(str, Enum):
    """
    Finding status enumeration.
    
    Standardized status values for security check results
    aligned with industry security assessment frameworks.
    """
    
    PASS = "PASS"      # Security check passed successfully
    FAIL = "FAIL"      # Security check failed, issue identified
    MANUAL = "MANUAL"  # Manual review required, cannot be automated
    MUTED = "MUTED"    # Finding has been muted/suppressed

Common Finding Functions

Utility functions for populating and processing finding data.

def fill_common_finding_data(
    finding: Finding,
    provider: Provider,
    check_metadata: CheckMetadata
) -> Finding:
    """
    Populate common finding fields from provider and metadata.
    
    Fills standard finding attributes that are consistent across
    all findings for a given provider and check execution.
    
    Parameters:
    - finding: Finding object to populate
    - provider: Provider instance with account/session information
    - check_metadata: CheckMetadata with check information
    
    Returns:
    Updated Finding object with common fields populated
    """

Output Format Handlers

Specialized classes for generating different output formats.

class CSV:
    """
    CSV output format handler.
    
    Generates comma-separated value files suitable for spreadsheet
    analysis and data processing workflows.
    """
    
    def __init__(self, findings: List[Finding], output_directory: str):
        """
        Initialize CSV output handler.
        
        Parameters:
        - findings: List of findings to export
        - output_directory: Target directory for CSV files
        """
    
    def generate_output(self) -> str:
        """
        Generate CSV output file.
        
        Returns:
        Path to generated CSV file
        """

class ASFF:
    """
    AWS Security Finding Format (ASFF) output handler.
    
    Generates ASFF-compliant JSON for integration with AWS Security Hub
    and other AWS security services.
    """
    
    def __init__(self, findings: List[Finding], output_directory: str):
        """
        Initialize ASFF output handler.
        
        Parameters:
        - findings: List of findings to convert
        - output_directory: Target directory for ASFF files
        """
    
    def generate_output(self) -> str:
        """
        Generate ASFF-compliant JSON output.
        
        Returns:
        Path to generated ASFF JSON file
        """

class OCSF:
    """
    Open Cybersecurity Schema Framework (OCSF) output handler.
    
    Generates OCSF-compliant JSON for standardized security
    event representation and SIEM integration.
    """
    
    def __init__(self, findings: List[Finding], output_directory: str):
        """
        Initialize OCSF output handler.
        
        Parameters:
        - findings: List of findings to convert
        - output_directory: Target directory for OCSF files
        """
    
    def generate_output(self) -> str:
        """
        Generate OCSF-compliant JSON output.
        
        Returns:
        Path to generated OCSF JSON file
        """

class HTML:
    """
    HTML report output handler.
    
    Generates comprehensive HTML reports with interactive features,
    charts, and detailed finding analysis.
    """
    
    def __init__(self, findings: List[Finding], output_directory: str):
        """
        Initialize HTML output handler.
        
        Parameters:
        - findings: List of findings for report generation
        - output_directory: Target directory for HTML files
        """
    
    def generate_output(self) -> str:
        """
        Generate interactive HTML report.
        
        Returns:
        Path to generated HTML report file
        """

class Slack:
    """
    Slack integration output handler.
    
    Sends finding summaries and alerts to Slack channels
    for real-time security notifications.
    """
    
    def __init__(
        self,
        findings: List[Finding],
        slack_token: str,
        channel: str
    ):
        """
        Initialize Slack output handler.
        
        Parameters:
        - findings: List of findings to send
        - slack_token: Slack API token
        - channel: Target Slack channel
        """
    
    def send_findings(self) -> dict:
        """
        Send findings to Slack channel.
        
        Returns:
        Dictionary with send results and message IDs
        """

Usage Examples

Basic Finding Processing

from prowler.lib.outputs.finding import Finding, generate_output
from prowler.lib.outputs.common import Status, fill_common_finding_data
from prowler.lib.check.models import CheckMetadata, Severity
from datetime import datetime

# Create a finding
finding = Finding(
    auth_method="iam_user_access_key",
    timestamp=datetime.utcnow(),
    account_uid="123456789012",
    account_name="production-account",
    region="us-east-1",
    finding_uid="prowler-finding-12345",
    provider="aws",
    check_metadata=check_metadata,
    status=Status.FAIL,
    resource_uid="arn:aws:iam::123456789012:user/test-user",
    resource_name="test-user",
    compliance={"CIS": ["1.4"], "NIST": ["AC-2"]},
    muted=False
)

# Fill common data from provider
finding = fill_common_finding_data(finding, provider, check_metadata)

# Generate output in multiple formats
json_file = generate_output([finding], "json", "/tmp/output")
csv_file = generate_output([finding], "csv", "/tmp/output")
html_file = generate_output([finding], "html", "/tmp/output")

Filtering and Processing Findings

from prowler.lib.outputs.common import Status

# Filter findings by status
failed_findings = [f for f in findings if f.status == Status.FAIL]
passed_findings = [f for f in findings if f.status == Status.PASS]

# Filter by severity
critical_findings = [
    f for f in findings 
    if f.check_metadata.Severity == Severity.critical
]

# Filter by compliance framework
cis_findings = [
    f for f in findings 
    if "CIS" in f.compliance
]

# Group findings by service
service_findings = {}
for finding in findings:
    service = finding.check_metadata.ServiceName
    if service not in service_findings:
        service_findings[service] = []
    service_findings[service].append(finding)

AWS Security Hub Integration

from prowler.lib.outputs.asff.asff import ASFF

# Generate ASFF output for Security Hub
asff_handler = ASFF(findings, "/tmp/security-hub-output")
asff_file = asff_handler.generate_output()

print(f"ASFF file generated: {asff_file}")

# The generated ASFF file can be imported into AWS Security Hub
# using the BatchImportFindings API

SIEM Integration with OCSF

from prowler.lib.outputs.ocsf.ocsf import OCSF

# Generate OCSF output for SIEM integration
ocsf_handler = OCSF(findings, "/tmp/siem-output")
ocsf_file = ocsf_handler.generate_output()

print(f"OCSF file generated: {ocsf_file}")

# The OCSF format provides standardized security events
# compatible with major SIEM platforms

Slack Notifications

from prowler.lib.outputs.slack.slack import Slack

# Filter critical and high severity findings for alerts
alert_findings = [
    f for f in findings
    if f.status == Status.FAIL and 
    f.check_metadata.Severity in [Severity.critical, Severity.high]
]

# Send to Slack
slack_handler = Slack(
    findings=alert_findings,
    slack_token="xoxb-your-token",
    channel="#security-alerts"
)

results = slack_handler.send_findings()
print(f"Sent {len(alert_findings)} findings to Slack")

Custom Finding Analysis

from collections import Counter
from datetime import datetime, timedelta

# Analyze findings by provider and region
provider_stats = Counter(f.provider for f in findings)
region_stats = Counter(f.region for f in findings)

# Analyze findings by severity
severity_stats = Counter(
    f.check_metadata.Severity.value for f in findings
)

# Find recent findings (last 24 hours)
recent_threshold = datetime.utcnow() - timedelta(hours=24)
recent_findings = [
    f for f in findings
    if f.timestamp > recent_threshold
]

# Generate summary report
summary = {
    "total_findings": len(findings),
    "failed_findings": len([f for f in findings if f.status == Status.FAIL]),
    "providers": dict(provider_stats),
    "regions": dict(region_stats),
    "severity_breakdown": dict(severity_stats),
    "recent_findings": len(recent_findings)
}

print(f"Assessment Summary: {summary}")

Compliance Reporting

# Generate compliance-specific reports
def generate_compliance_report(findings, framework):
    compliance_findings = [
        f for f in findings
        if framework in f.compliance
    ]
    
    # Group by compliance controls
    control_findings = {}
    for finding in compliance_findings:
        for control in finding.compliance[framework]:
            if control not in control_findings:
                control_findings[control] = []
            control_findings[control].append(finding)
    
    # Calculate pass/fail rates per control
    control_stats = {}
    for control, control_findings_list in control_findings.items():
        passed = len([f for f in control_findings_list if f.status == Status.PASS])
        failed = len([f for f in control_findings_list if f.status == Status.FAIL])
        control_stats[control] = {
            "passed": passed,
            "failed": failed,
            "total": passed + failed,
            "pass_rate": passed / (passed + failed) if (passed + failed) > 0 else 0
        }
    
    return control_stats

# Generate CIS compliance report
cis_report = generate_compliance_report(findings, "CIS")
print(f"CIS Compliance Report: {cis_report}")

Install with Tessl CLI

npx tessl i tessl/pypi-prowler

docs

check-management.md

check-models.md

cli-interface.md

configuration.md

finding-management.md

index.md

logging-utilities.md

provider-framework.md

tile.json