Open source cloud security assessment tool for AWS, Azure, GCP, and Kubernetes with hundreds of compliance checks.
—
Comprehensive finding representation and output generation supporting multiple formats including JSON, CSV, HTML, ASFF (AWS Security Finding Format), and OCSF (Open Cybersecurity Schema Framework). This module provides standardized finding processing, compliance reporting, and integration capabilities for security assessments.
Core model representing security assessment findings with comprehensive metadata and compliance information.
class Finding(BaseModel):
    """
    Pydantic model representing a security finding across different providers.

    This class encapsulates the details of a finding and supports
    serialization to various formats such as CSV. It serves as the base
    model for storing and managing finding information for every provider.

    Note: this listing is an API reference. Only the field declarations and
    the method signatures are shown; method bodies are omitted.

    Attributes:
        - auth_method: str - Authentication method used for the scan
        - timestamp: Union[int, datetime] - Finding generation timestamp
        - account_uid: str - Account/subscription unique identifier
        - account_name: Optional[str] - Account/subscription name
        - account_email: Optional[str] - Account contact email
        - account_organization_uid: Optional[str] - Organization identifier
        - account_organization_name: Optional[str] - Organization name
        - metadata: CheckMetadata - Associated check metadata
        - account_tags: dict - Account-level tags and metadata
        - uid: str - Unique finding identifier
        - status: Status - Finding status (a Status enum value)
        - status_extended: str - Extended status information and details
        - muted: bool - Whether finding is muted/suppressed
        - resource_uid: str - Resource unique identifier (ARN, ID, etc.)
        - resource_metadata: dict - Resource-specific metadata
        - resource_name: str - Resource name or identifier
        - resource_details: str - Additional resource details and context
        - resource_tags: dict - Resource-level tags
        - partition: Optional[str] - Cloud partition (aws, aws-cn, aws-us-gov, etc.)
        - region: str - Cloud region where resource is located
        - compliance: dict - Compliance framework mappings and requirements
        - prowler_version: str - Version of Prowler that generated the finding
        - raw: dict - Raw finding data for additional context
    """

    # Fields without a default are required when constructing a Finding.
    auth_method: str
    timestamp: Union[int, datetime]
    account_uid: str
    account_name: Optional[str] = None
    account_email: Optional[str] = None
    account_organization_uid: Optional[str] = None
    account_organization_name: Optional[str] = None
    metadata: CheckMetadata
    # default_factory gives every instance its own empty dict
    account_tags: dict = Field(default_factory=dict)
    uid: str
    status: Status
    status_extended: str
    muted: bool = False
    resource_uid: str
    resource_metadata: dict = Field(default_factory=dict)
    resource_name: str
    resource_details: str
    resource_tags: dict = Field(default_factory=dict)
    partition: Optional[str] = None
    region: str
    compliance: dict = Field(default_factory=dict)
    # The default is the `prowler_version` name from the enclosing scope
    # (presumably imported at module level; the import is not shown here).
    prowler_version: str = prowler_version
    raw: dict = Field(default_factory=dict)

    # Properties
    @property
    def provider(self) -> str:
        """
        Returns the provider from the finding check's metadata.

        Returns:
            str: Provider name (aws, azure, gcp, etc.)
        """

    @property
    def check_id(self) -> str:
        """
        Returns the ID from the finding check's metadata.

        Returns:
            str: Check unique identifier
        """

    @property
    def severity(self) -> str:
        """
        Returns the severity from the finding check's metadata.

        Returns:
            str: Severity level (critical, high, medium, low, informational)
        """

    @property
    def resource_type(self) -> str:
        """
        Returns the resource type from the finding check's metadata.

        Returns:
            str: Resource type being assessed
        """

    @property
    def service_name(self) -> str:
        """
        Returns the service name from the finding check's metadata.

        Returns:
            str: Cloud service name
        """

    # Instance Methods
    def get_metadata(self) -> dict:
        """
        Retrieves the metadata of the object and returns it as a dictionary
        with all keys in lowercase.

        Returns:
            dict: A dictionary containing the metadata with keys converted to lowercase
        """

    # Class Methods
    @classmethod
    def generate_output(
        cls,
        provider: Provider,
        check_output: Check_Report,
        output_options
    ) -> "Finding":
        """
        Generates the output for a finding based on the provider and output options.

        Args:
            - provider: Provider - The provider object
            - check_output: Check_Report - The check output object
            - output_options: Provider-specific output options object

        Returns:
            Finding: The finding output object
        """

    @classmethod
    def transform_api_finding(cls, finding, provider) -> "Finding":
        """
        Transform a FindingModel instance into an API-friendly Finding object.

        This class method extracts data from a FindingModel instance and maps its
        properties to a new Finding object. The transformation populates various
        fields including authentication details, timestamp, account information,
        check metadata, as well as resource-specific data.

        Args:
            - finding: API Finding instance containing data from the database
            - provider: Provider - The provider object

        Returns:
            Finding: A new Finding instance populated with data from the provided model
        """

    @staticmethod
    def _transform_findings_stats(scan_summaries: list[dict]) -> dict:
        """
        Aggregate and transform scan summary data into findings statistics.

        This function processes a list of scan summary objects and calculates overall
        metrics such as the total number of passed and failed findings (including
        muted counts), as well as a breakdown of results by severity levels.

        Args:
            - scan_summaries: list[dict] - A list of scan summary objects

        Returns:
            dict: A dictionary containing aggregated findings statistics with metrics like:
                - total_pass: Total number of passed findings
                - total_fail: Total number of failed findings
                - total_muted_pass: Total number of muted passed findings
                - total_muted_fail: Total number of muted failed findings
                - resources_count: The unique resource count
                - findings_count: Total number of findings
                - Severity breakdowns (critical, high, medium, low)
                - all_fails_are_muted: Boolean indicating if all failing findings are muted
        """


Standardized status values for security findings.
class Status(str, Enum):
    """
    Finding status enumeration.

    Standardized status values for security check results
    aligned with industry security assessment frameworks.

    Members also subclass ``str``, so each member compares equal to its
    plain string value.
    """

    PASS = "PASS"  # Security check passed successfully
    FAIL = "FAIL"  # Security check failed, issue identified
    MANUAL = "MANUAL"  # Manual review required, cannot be automated
    MUTED = "MUTED"  # Finding has been muted/suppressed


# Utility functions for populating and processing finding data.
def fill_common_finding_data(
    finding: Finding,
    provider: Provider,
    check_metadata: CheckMetadata
) -> Finding:
    """
    Populate common finding fields from provider and metadata.

    Fills standard finding attributes that are consistent across
    all findings for a given provider and check execution.

    Note: signature-only reference; the implementation is not shown here.

    Parameters:
    - finding: Finding object to populate
    - provider: Provider instance with account/session information
    - check_metadata: CheckMetadata with check information

    Returns:
    Updated Finding object with common fields populated
    """


Specialized classes for generating different output formats.
class CSV:
    """
    CSV output format handler.

    Generates comma-separated value files suitable for spreadsheet
    analysis and data processing workflows.

    Note: signature-only reference; method bodies are not shown here.
    """

    def __init__(self, findings: List[Finding], output_directory: str):
        """
        Initialize CSV output handler.

        Parameters:
        - findings: List of findings to export
        - output_directory: Target directory for CSV files
        """

    def generate_output(self) -> str:
        """
        Generate CSV output file.

        Returns:
        Path to generated CSV file
        """
class ASFF:
    """
    AWS Security Finding Format (ASFF) output handler.

    Generates ASFF-compliant JSON for integration with AWS Security Hub
    and other AWS security services.

    Note: signature-only reference; method bodies are not shown here.
    """

    def __init__(self, findings: List[Finding], output_directory: str):
        """
        Initialize ASFF output handler.

        Parameters:
        - findings: List of findings to convert
        - output_directory: Target directory for ASFF files
        """

    def generate_output(self) -> str:
        """
        Generate ASFF-compliant JSON output.

        Returns:
        Path to generated ASFF JSON file
        """
class OCSF:
    """
    Open Cybersecurity Schema Framework (OCSF) output handler.

    Generates OCSF-compliant JSON for standardized security
    event representation and SIEM integration.

    Note: signature-only reference; method bodies are not shown here.
    """

    def __init__(self, findings: List[Finding], output_directory: str):
        """
        Initialize OCSF output handler.

        Parameters:
        - findings: List of findings to convert
        - output_directory: Target directory for OCSF files
        """

    def generate_output(self) -> str:
        """
        Generate OCSF-compliant JSON output.

        Returns:
        Path to generated OCSF JSON file
        """
class HTML:
    """
    HTML report output handler.

    Generates comprehensive HTML reports with interactive features,
    charts, and detailed finding analysis.

    Note: signature-only reference; method bodies are not shown here.
    """

    def __init__(self, findings: List[Finding], output_directory: str):
        """
        Initialize HTML output handler.

        Parameters:
        - findings: List of findings for report generation
        - output_directory: Target directory for HTML files
        """

    def generate_output(self) -> str:
        """
        Generate interactive HTML report.

        Returns:
        Path to generated HTML report file
        """
class Slack:
    """
    Slack integration output handler.

    Sends finding summaries and alerts to Slack channels
    for real-time security notifications.

    Note: signature-only reference; method bodies are not shown here.
    """

    def __init__(
        self,
        findings: List[Finding],
        slack_token: str,
        channel: str
    ):
        """
        Initialize Slack output handler.

        Parameters:
        - findings: List of findings to send
        - slack_token: Slack API token
        - channel: Target Slack channel
        """

    def send_findings(self) -> dict:
        """
        Send findings to Slack channel.

        Returns:
        Dictionary with send results and message IDs
        """


from prowler.lib.outputs.finding import Finding, generate_output
from prowler.lib.outputs.common import Status, fill_common_finding_data
from prowler.lib.check.models import CheckMetadata, Severity
from datetime import datetime
# Create a finding.
# `check_metadata` (a CheckMetadata instance) and `provider` (a Provider
# instance) are assumed to have been created earlier; they are not defined
# in this snippet.
#
# The keyword arguments follow the Finding model's field names: the
# identifier field is `uid` and the check metadata field is `metadata`.
# `provider` is not passed because Finding exposes it as a property derived
# from the check metadata. `status_extended` and `resource_details` have no
# default on the model, so they must be supplied.
finding = Finding(
    auth_method="iam_user_access_key",
    timestamp=datetime.utcnow(),
    account_uid="123456789012",
    account_name="production-account",
    region="us-east-1",
    uid="prowler-finding-12345",
    metadata=check_metadata,
    status=Status.FAIL,
    status_extended="IAM user test-user does not meet the check requirements.",
    resource_uid="arn:aws:iam::123456789012:user/test-user",
    resource_name="test-user",
    resource_details="",
    compliance={"CIS": ["1.4"], "NIST": ["AC-2"]},
    muted=False,
)

# Fill common data from provider
finding = fill_common_finding_data(finding, provider, check_metadata)

# Generate output in multiple formats
# NOTE(review): the reference above documents `generate_output` only as the
# `Finding.generate_output(provider, check_output, output_options)`
# classmethod; confirm that a module-level helper with this
# (findings, format, directory) signature actually exists.
json_file = generate_output([finding], "json", "/tmp/output")
csv_file = generate_output([finding], "csv", "/tmp/output")
html_file = generate_output([finding], "html", "/tmp/output")

from prowler.lib.outputs.common import Status
# `findings` is assumed to be an iterable of Finding objects and `Severity`
# to be imported from prowler.lib.check.models; neither is defined here.

# Filter findings by status
failed_findings = [f for f in findings if f.status == Status.FAIL]
passed_findings = [f for f in findings if f.status == Status.PASS]

# Filter by severity.
# The check metadata lives in the Finding's `metadata` field.
critical_findings = [
    f for f in findings
    if f.metadata.Severity == Severity.critical
]

# Filter by compliance framework
cis_findings = [f for f in findings if "CIS" in f.compliance]

# Group findings by service
service_findings = {}
for finding in findings:
    service = finding.metadata.ServiceName
    # setdefault creates the per-service list on first sight of the service
    service_findings.setdefault(service, []).append(finding)

from prowler.lib.outputs.asff.asff import ASFF
# Generate ASFF output for Security Hub.
# `findings` is assumed to be a list of Finding objects built earlier.
asff_handler = ASFF(findings, "/tmp/security-hub-output")
asff_file = asff_handler.generate_output()  # path of the generated ASFF JSON file
print(f"ASFF file generated: {asff_file}")
# The generated ASFF file can be imported into AWS Security Hub
# using the BatchImportFindings API
from prowler.lib.outputs.ocsf.ocsf import OCSF
# Generate OCSF output for SIEM integration.
# `findings` is assumed to be a list of Finding objects built earlier.
ocsf_handler = OCSF(findings, "/tmp/siem-output")
ocsf_file = ocsf_handler.generate_output()  # path of the generated OCSF JSON file
print(f"OCSF file generated: {ocsf_file}")
# The OCSF format provides standardized security events
# compatible with major SIEM platforms
from prowler.lib.outputs.slack.slack import Slack
# Filter critical and high severity findings for alerts.
# `findings`, `Status` and `Severity` are assumed to be available from the
# earlier snippets. The check metadata lives in the Finding's `metadata`
# field.
alert_findings = [
    f for f in findings
    if f.status == Status.FAIL
    and f.metadata.Severity in (Severity.critical, Severity.high)
]

# Send to Slack
slack_handler = Slack(
    findings=alert_findings,
    # Placeholder value: load the real token from a secret store or the
    # environment instead of hard-coding it.
    slack_token="xoxb-your-token",
    channel="#security-alerts",
)
results = slack_handler.send_findings()
print(f"Sent {len(alert_findings)} findings to Slack")

from collections import Counter
from datetime import datetime, timedelta
# `findings` and `Status` are assumed to be available from the earlier
# snippets; they are not defined here.

# Analyze findings by provider and region
provider_stats = Counter(f.provider for f in findings)
region_stats = Counter(f.region for f in findings)

# Analyze findings by severity.
# The check metadata lives in the Finding's `metadata` field.
severity_stats = Counter(f.metadata.Severity.value for f in findings)

# Find recent findings (last 24 hours).
# `Finding.timestamp` is declared as Union[int, datetime]; comparing an int
# with a datetime raises TypeError, so only datetime timestamps are compared.
# Integer timestamps are left out because their unit is not specified here.
# NOTE(review): the threshold is a naive UTC datetime, so this assumes the
# stored datetimes are naive UTC as well; confirm.
recent_threshold = datetime.utcnow() - timedelta(hours=24)
recent_findings = [
    f for f in findings
    if isinstance(f.timestamp, datetime) and f.timestamp > recent_threshold
]

# Generate summary report
summary = {
    "total_findings": len(findings),
    "failed_findings": sum(1 for f in findings if f.status == Status.FAIL),
    "providers": dict(provider_stats),
    "regions": dict(region_stats),
    "severity_breakdown": dict(severity_stats),
    "recent_findings": len(recent_findings),
}
print(f"Assessment Summary: {summary}")

# Generate compliance-specific reports
def generate_compliance_report(findings, framework):
    """
    Build per-control pass/fail statistics for one compliance framework.

    Only findings whose ``compliance`` mapping contains ``framework`` are
    considered. Each of those findings is counted once for every control
    listed under that framework.

    Args:
        - findings: iterable of objects exposing ``compliance`` (a mapping of
          framework name to a list of controls) and ``status``
        - framework: framework key to report on, e.g. "CIS"

    Returns:
        dict: one entry per control, each holding:
            - passed: number of findings with status ``Status.PASS``
            - failed: number of findings with status ``Status.FAIL``
            - total: passed + failed (other statuses are not counted)
            - pass_rate: passed / total, or 0 when total is 0
    """
    # Group the findings by the controls they map to in this framework.
    control_findings = {}
    for finding in findings:
        if framework not in finding.compliance:
            continue
        for control in finding.compliance[framework]:
            control_findings.setdefault(control, []).append(finding)

    # Calculate pass/fail rates per control.
    control_stats = {}
    for control, grouped in control_findings.items():
        passed = sum(1 for f in grouped if f.status == Status.PASS)
        failed = sum(1 for f in grouped if f.status == Status.FAIL)
        total = passed + failed
        control_stats[control] = {
            "passed": passed,
            "failed": failed,
            "total": total,
            # guard against division by zero when nothing passed or failed
            "pass_rate": passed / total if total > 0 else 0,
        }
    return control_stats
# Generate CIS compliance report
cis_report = generate_compliance_report(findings, "CIS")
print(f"CIS Compliance Report: {cis_report}")
Install with Tessl CLI
npx tessl i tessl/pypi-prowler