0
# Finding Management and Output
1
2
Comprehensive finding representation and output generation supporting multiple formats including JSON, CSV, HTML, ASFF (AWS Security Finding Format), and OCSF (Open Cybersecurity Schema Framework). This module provides standardized finding processing, compliance reporting, and integration capabilities for security assessments.
3
4
## Capabilities
5
6
### Finding Data Model
7
8
Core model representing security assessment findings with comprehensive metadata and compliance information.
9
10
```python { .api }
11
class Finding(BaseModel):
12
"""
13
Pydantic model representing a security finding across different providers.
14
15
This class encapsulates the details of a finding and supports
16
serialization to various formats such as CSV. It serves as the base
17
model for storing and managing finding information for every provider.
18
19
Attributes:
20
- auth_method: str - Authentication method used for the scan
21
- timestamp: Union[int, datetime] - Finding generation timestamp
22
- account_uid: str - Account/subscription unique identifier
23
- account_name: Optional[str] - Account/subscription name
24
- account_email: Optional[str] - Account contact email
25
- account_organization_uid: Optional[str] - Organization identifier
26
- account_organization_name: Optional[str] - Organization name
27
- metadata: CheckMetadata - Associated check metadata
28
- account_tags: dict - Account-level tags and metadata
29
- uid: str - Unique finding identifier
30
- status: Status - Finding status (PASS/FAIL/MANUAL)
31
- status_extended: str - Extended status information and details
32
- muted: bool - Whether finding is muted/suppressed
33
- resource_uid: str - Resource unique identifier (ARN, ID, etc.)
34
- resource_metadata: dict - Resource-specific metadata
35
- resource_name: str - Resource name or identifier
36
- resource_details: str - Additional resource details and context
37
- resource_tags: dict - Resource-level tags
38
- partition: Optional[str] - Cloud partition (aws, aws-cn, aws-us-gov, etc.)
39
- region: str - Cloud region where resource is located
40
- compliance: dict - Compliance framework mappings and requirements
41
- prowler_version: str - Version of Prowler that generated the finding
42
- raw: dict - Raw finding data for additional context
43
"""
44
45
auth_method: str
46
timestamp: Union[int, datetime]
47
account_uid: str
48
account_name: Optional[str] = None
49
account_email: Optional[str] = None
50
account_organization_uid: Optional[str] = None
51
account_organization_name: Optional[str] = None
52
metadata: CheckMetadata
53
account_tags: dict = Field(default_factory=dict)
54
uid: str
55
status: Status
56
status_extended: str
57
muted: bool = False
58
resource_uid: str
59
resource_metadata: dict = Field(default_factory=dict)
60
resource_name: str
61
resource_details: str
62
resource_tags: dict = Field(default_factory=dict)
63
partition: Optional[str] = None
64
region: str
65
compliance: dict = Field(default_factory=dict)
66
prowler_version: str = prowler_version
67
raw: dict = Field(default_factory=dict)
68
69
# Properties
70
@property
71
def provider(self) -> str:
72
"""
73
Returns the provider from the finding check's metadata.
74
75
Returns:
76
str: Provider name (aws, azure, gcp, etc.)
77
"""
78
79
@property
80
def check_id(self) -> str:
81
"""
82
Returns the ID from the finding check's metadata.
83
84
Returns:
85
str: Check unique identifier
86
"""
87
88
@property
89
def severity(self) -> str:
90
"""
91
Returns the severity from the finding check's metadata.
92
93
Returns:
94
str: Severity level (critical, high, medium, low, informational)
95
"""
96
97
@property
98
def resource_type(self) -> str:
99
"""
100
Returns the resource type from the finding check's metadata.
101
102
Returns:
103
str: Resource type being assessed
104
"""
105
106
@property
107
def service_name(self) -> str:
108
"""
109
Returns the service name from the finding check's metadata.
110
111
Returns:
112
str: Cloud service name
113
"""
114
115
# Instance Methods
116
def get_metadata(self) -> dict:
117
"""
118
Retrieves the metadata of the object and returns it as a dictionary with all keys in lowercase.
119
120
Returns:
121
dict: A dictionary containing the metadata with keys converted to lowercase
122
"""
123
124
# Class Methods
125
@classmethod
126
def generate_output(
127
cls,
128
provider: Provider,
129
check_output: Check_Report,
130
output_options
131
) -> "Finding":
132
"""
133
Generates the output for a finding based on the provider and output options.
134
135
Args:
136
- provider: Provider - The provider object
137
- check_output: Check_Report - The check output object
138
- output_options: Provider-specific output options object
139
140
Returns:
141
Finding: The finding output object
142
"""
143
144
@classmethod
145
def transform_api_finding(cls, finding, provider) -> "Finding":
146
"""
147
Transform a FindingModel instance into an API-friendly Finding object.
148
149
This class method extracts data from a FindingModel instance and maps its
150
properties to a new Finding object. The transformation populates various
151
fields including authentication details, timestamp, account information,
152
check metadata, as well as resource-specific data.
153
154
Args:
155
- finding: API Finding instance containing data from the database
156
- provider: Provider - The provider object
157
158
Returns:
159
Finding: A new Finding instance populated with data from the provided model
160
"""
161
162
@staticmethod
163
def _transform_findings_stats(scan_summaries: list[dict]) -> dict:
164
"""
165
Aggregate and transform scan summary data into findings statistics.
166
167
This function processes a list of scan summary objects and calculates overall
168
metrics such as the total number of passed and failed findings (including muted counts),
169
as well as a breakdown of results by severity levels.
170
171
Args:
172
- scan_summaries: list[dict] - A list of scan summary objects
173
174
Returns:
175
dict: A dictionary containing aggregated findings statistics with metrics like:
176
- total_pass: Total number of passed findings
177
- total_fail: Total number of failed findings
178
- total_muted_pass: Total number of muted passed findings
179
- total_muted_fail: Total number of muted failed findings
180
- resources_count: The unique resource count
181
- findings_count: Total number of findings
182
- Severity breakdowns (critical, high, medium, low)
183
- all_fails_are_muted: Boolean indicating if all failing findings are muted
184
"""
185
```
186
187
### Finding Status Enumeration
188
189
Standardized status values for security findings.
190
191
```python { .api }
192
class Status(str, Enum):
193
"""
194
Finding status enumeration.
195
196
Standardized status values for security check results
197
aligned with industry security assessment frameworks.
198
"""
199
200
PASS = "PASS" # Security check passed successfully
201
FAIL = "FAIL" # Security check failed, issue identified
202
MANUAL = "MANUAL" # Manual review required, cannot be automated
203
MUTED = "MUTED" # Finding has been muted/suppressed
204
```
205
206
### Common Finding Functions
207
208
Utility functions for populating and processing finding data.
209
210
```python { .api }
211
def fill_common_finding_data(
212
finding: Finding,
213
provider: Provider,
214
check_metadata: CheckMetadata
215
) -> Finding:
216
"""
217
Populate common finding fields from provider and metadata.
218
219
Fills standard finding attributes that are consistent across
220
all findings for a given provider and check execution.
221
222
Parameters:
223
- finding: Finding object to populate
224
- provider: Provider instance with account/session information
225
- check_metadata: CheckMetadata with check information
226
227
Returns:
228
Updated Finding object with common fields populated
229
"""
230
```
231
232
### Output Format Handlers
233
234
Specialized classes for generating different output formats.
235
236
```python { .api }
237
class CSV:
238
"""
239
CSV output format handler.
240
241
Generates comma-separated value files suitable for spreadsheet
242
analysis and data processing workflows.
243
"""
244
245
def __init__(self, findings: List[Finding], output_directory: str):
246
"""
247
Initialize CSV output handler.
248
249
Parameters:
250
- findings: List of findings to export
251
- output_directory: Target directory for CSV files
252
"""
253
254
def generate_output(self) -> str:
255
"""
256
Generate CSV output file.
257
258
Returns:
259
Path to generated CSV file
260
"""
261
262
class ASFF:
263
"""
264
AWS Security Finding Format (ASFF) output handler.
265
266
Generates ASFF-compliant JSON for integration with AWS Security Hub
267
and other AWS security services.
268
"""
269
270
def __init__(self, findings: List[Finding], output_directory: str):
271
"""
272
Initialize ASFF output handler.
273
274
Parameters:
275
- findings: List of findings to convert
276
- output_directory: Target directory for ASFF files
277
"""
278
279
def generate_output(self) -> str:
280
"""
281
Generate ASFF-compliant JSON output.
282
283
Returns:
284
Path to generated ASFF JSON file
285
"""
286
287
class OCSF:
288
"""
289
Open Cybersecurity Schema Framework (OCSF) output handler.
290
291
Generates OCSF-compliant JSON for standardized security
292
event representation and SIEM integration.
293
"""
294
295
def __init__(self, findings: List[Finding], output_directory: str):
296
"""
297
Initialize OCSF output handler.
298
299
Parameters:
300
- findings: List of findings to convert
301
- output_directory: Target directory for OCSF files
302
"""
303
304
def generate_output(self) -> str:
305
"""
306
Generate OCSF-compliant JSON output.
307
308
Returns:
309
Path to generated OCSF JSON file
310
"""
311
312
class HTML:
313
"""
314
HTML report output handler.
315
316
Generates comprehensive HTML reports with interactive features,
317
charts, and detailed finding analysis.
318
"""
319
320
def __init__(self, findings: List[Finding], output_directory: str):
321
"""
322
Initialize HTML output handler.
323
324
Parameters:
325
- findings: List of findings for report generation
326
- output_directory: Target directory for HTML files
327
"""
328
329
def generate_output(self) -> str:
330
"""
331
Generate interactive HTML report.
332
333
Returns:
334
Path to generated HTML report file
335
"""
336
337
class Slack:
338
"""
339
Slack integration output handler.
340
341
Sends finding summaries and alerts to Slack channels
342
for real-time security notifications.
343
"""
344
345
def __init__(
346
self,
347
findings: List[Finding],
348
slack_token: str,
349
channel: str
350
):
351
"""
352
Initialize Slack output handler.
353
354
Parameters:
355
- findings: List of findings to send
356
- slack_token: Slack API token
357
- channel: Target Slack channel
358
"""
359
360
def send_findings(self) -> dict:
361
"""
362
Send findings to Slack channel.
363
364
Returns:
365
Dictionary with send results and message IDs
366
"""
367
```
368
369
## Usage Examples
370
371
### Basic Finding Processing
372
373
```python
374
from prowler.lib.outputs.finding import Finding, generate_output
375
from prowler.lib.outputs.common import Status, fill_common_finding_data
376
from prowler.lib.check.models import CheckMetadata, Severity
377
from datetime import datetime
378
379
# Create a finding
380
finding = Finding(
381
auth_method="iam_user_access_key",
382
timestamp=datetime.utcnow(),
383
account_uid="123456789012",
384
account_name="production-account",
385
region="us-east-1",
386
finding_uid="prowler-finding-12345",
387
provider="aws",
388
check_metadata=check_metadata,
389
status=Status.FAIL,
390
resource_uid="arn:aws:iam::123456789012:user/test-user",
391
resource_name="test-user",
392
compliance={"CIS": ["1.4"], "NIST": ["AC-2"]},
393
muted=False
394
)
395
396
# Fill common data from provider
397
finding = fill_common_finding_data(finding, provider, check_metadata)
398
399
# Generate output in multiple formats
400
json_file = generate_output([finding], "json", "/tmp/output")
401
csv_file = generate_output([finding], "csv", "/tmp/output")
402
html_file = generate_output([finding], "html", "/tmp/output")
403
```
404
405
### Filtering and Processing Findings
406
407
```python
408
from prowler.lib.outputs.common import Status
409
410
# Filter findings by status
411
failed_findings = [f for f in findings if f.status == Status.FAIL]
412
passed_findings = [f for f in findings if f.status == Status.PASS]
413
414
# Filter by severity
415
critical_findings = [
416
f for f in findings
417
if f.check_metadata.Severity == Severity.critical
418
]
419
420
# Filter by compliance framework
421
cis_findings = [
422
f for f in findings
423
if "CIS" in f.compliance
424
]
425
426
# Group findings by service
427
service_findings = {}
428
for finding in findings:
429
service = finding.check_metadata.ServiceName
430
if service not in service_findings:
431
service_findings[service] = []
432
service_findings[service].append(finding)
433
```
434
435
### AWS Security Hub Integration
436
437
```python
438
from prowler.lib.outputs.asff.asff import ASFF
439
440
# Generate ASFF output for Security Hub
441
asff_handler = ASFF(findings, "/tmp/security-hub-output")
442
asff_file = asff_handler.generate_output()
443
444
print(f"ASFF file generated: {asff_file}")
445
446
# The generated ASFF file can be imported into AWS Security Hub
447
# using the BatchImportFindings API
448
```
449
450
### SIEM Integration with OCSF
451
452
```python
453
from prowler.lib.outputs.ocsf.ocsf import OCSF
454
455
# Generate OCSF output for SIEM integration
456
ocsf_handler = OCSF(findings, "/tmp/siem-output")
457
ocsf_file = ocsf_handler.generate_output()
458
459
print(f"OCSF file generated: {ocsf_file}")
460
461
# The OCSF format provides standardized security events
462
# compatible with major SIEM platforms
463
```
464
465
### Slack Notifications
466
467
```python
468
from prowler.lib.outputs.slack.slack import Slack
469
470
# Filter critical and high severity findings for alerts
471
alert_findings = [
472
f for f in findings
473
if f.status == Status.FAIL and
474
f.check_metadata.Severity in [Severity.critical, Severity.high]
475
]
476
477
# Send to Slack
478
slack_handler = Slack(
479
findings=alert_findings,
480
slack_token="xoxb-your-token",
481
channel="#security-alerts"
482
)
483
484
results = slack_handler.send_findings()
485
print(f"Sent {len(alert_findings)} findings to Slack")
486
```
487
488
### Custom Finding Analysis
489
490
```python
491
from collections import Counter
492
from datetime import datetime, timedelta
493
494
# Analyze findings by provider and region
495
provider_stats = Counter(f.provider for f in findings)
496
region_stats = Counter(f.region for f in findings)
497
498
# Analyze findings by severity
499
severity_stats = Counter(
500
f.check_metadata.Severity.value for f in findings
501
)
502
503
# Find recent findings (last 24 hours)
504
recent_threshold = datetime.utcnow() - timedelta(hours=24)
505
recent_findings = [
506
f for f in findings
507
if f.timestamp > recent_threshold
508
]
509
510
# Generate summary report
511
summary = {
512
"total_findings": len(findings),
513
"failed_findings": len([f for f in findings if f.status == Status.FAIL]),
514
"providers": dict(provider_stats),
515
"regions": dict(region_stats),
516
"severity_breakdown": dict(severity_stats),
517
"recent_findings": len(recent_findings)
518
}
519
520
print(f"Assessment Summary: {summary}")
521
```
522
523
### Compliance Reporting
524
525
```python
526
# Generate compliance-specific reports
527
def generate_compliance_report(findings, framework):
528
compliance_findings = [
529
f for f in findings
530
if framework in f.compliance
531
]
532
533
# Group by compliance controls
534
control_findings = {}
535
for finding in compliance_findings:
536
for control in finding.compliance[framework]:
537
if control not in control_findings:
538
control_findings[control] = []
539
control_findings[control].append(finding)
540
541
# Calculate pass/fail rates per control
542
control_stats = {}
543
for control, control_findings_list in control_findings.items():
544
passed = len([f for f in control_findings_list if f.status == Status.PASS])
545
failed = len([f for f in control_findings_list if f.status == Status.FAIL])
546
control_stats[control] = {
547
"passed": passed,
548
"failed": failed,
549
"total": passed + failed,
550
"pass_rate": passed / (passed + failed) if (passed + failed) > 0 else 0
551
}
552
553
return control_stats
554
555
# Generate CIS compliance report
556
cis_report = generate_compliance_report(findings, "CIS")
557
print(f"CIS Compliance Report: {cis_report}")
558
```