0
# Programmatic API
1
2
Safety CLI provides comprehensive programmatic access to vulnerability scanning, authentication, and security analysis capabilities. This enables developers to integrate Safety's security features directly into applications, CI/CD pipelines, and custom tooling.
3
4
## Core Programmatic Interfaces
5
6
### Main CLI Entry Point { .api }
7
8
**Import Statements:**
9
10
```python
11
from safety.cli import cli
12
from safety.scan.main import process_files
13
from safety.scan.finder import FileFinder
14
from safety.auth.cli_utils import build_client_session
15
from safety.formatter import SafetyFormatter
16
from safety.models import (
17
Vulnerability, Package, SafetyRequirement,
18
CVE, Severity, Fix, SafetyEncoder
19
)
20
```
21
22
#### CLI Function Access { .api }
23
24
**Description**: Invoke Safety CLI commands from Python by driving the `cli` Click entry point with Click's `CliRunner`, which captures the command's output and exit code.
25
26
```python
27
from safety.cli import cli
28
import click
29
from click.testing import CliRunner
30
31
from typing import List  # required by the signature annotation below


def run_safety_command(args: List[str]) -> click.testing.Result:
    """
    Execute a Safety CLI command programmatically.

    The command is run in-process through Click's ``CliRunner`` against the
    imported ``cli`` entry point.

    Args:
        args (List[str]): Command line arguments, e.g. ``['scan', '--output', 'json']``

    Returns:
        click.testing.Result: Whatever ``CliRunner.invoke`` returns; the example
        below reads its ``output`` and ``exit_code`` attributes
    """
    runner = CliRunner()
    return runner.invoke(cli, args)
43
44
# Example usage
import json  # needed to parse the JSON output below

result = run_safety_command(['scan', '--output', 'json'])
if result.exit_code == 0:
    print("Scan completed successfully")
    # The scan was requested with --output json, so the captured output is parsed as JSON
    scan_data = json.loads(result.output)
else:
    print(f"Scan failed with exit code: {result.exit_code}")
    print(f"Error: {result.output}")
52
```
53
54
### Vulnerability Scanning API { .api }
55
56
#### Direct Scanning Interface { .api }
57
58
```python
59
from safety.scan.main import process_files
60
from safety.scan.finder import FileFinder
61
from safety_schemas.models import Ecosystem, ConfigModel
62
from pathlib import Path
63
from typing import Dict, Set, List, Tuple
64
65
def scan_project_programmatically(
    target_path: Path,
    ecosystems: List[Ecosystem] = None,
    config: ConfigModel = None,
    max_depth: int = 10,
    exclude_patterns: List[str] = None
) -> List[Tuple[Path, 'InspectableFile']]:
    """
    Discover dependency files under a directory and run them through process_files.

    Args:
        target_path (Path): Root directory to scan
        ecosystems (List[Ecosystem]): Target ecosystems (default: [Ecosystem.PYTHON])
        config (ConfigModel): Scan configuration (default: ConfigModel())
        max_depth (int): Passed to FileFinder as ``max_level``
        exclude_patterns (List[str]): Passed to FileFinder as ``exclude``
            (default: '.git', '__pycache__', '.venv', 'node_modules')

    Returns:
        List[Tuple[Path, InspectableFile]]: Every (path, file) pair yielded by
        process_files, in the order produced

    Note:
        Nothing is caught here; any exception raised by FileFinder or
        process_files propagates to the caller.
    """
    # Resolve defaults without using mutable default arguments
    ecosystems = [Ecosystem.PYTHON] if ecosystems is None else ecosystems
    config = ConfigModel() if config is None else config
    if exclude_patterns is None:
        exclude_patterns = ['.git', '__pycache__', '.venv', 'node_modules']

    # Walk the target directory for dependency files
    finder = FileFinder(
        max_level=max_depth,
        ecosystems=ecosystems,
        target=target_path,
        exclude=exclude_patterns,
    )
    _, discovered_files = finder.process_directory(str(target_path))

    # Materialize the (path, file) pairs produced for the discovered files
    return [
        (path, inspectable)
        for path, inspectable in process_files(
            paths=discovered_files,
            config=config,
            target=target_path,
        )
    ]
118
119
# Example usage
from safety_schemas.models import ConfigModel, Ecosystem
from pathlib import Path

# Scan settings, reused for discovery and for per-file inspection
config = ConfigModel(ignore_cvss_severity_below=7, detailed_output=True)

# Run the scan against the project directory
results = scan_project_programmatically(
    target_path=Path("./my-project"),
    ecosystems=[Ecosystem.PYTHON],
    config=config,
    exclude_patterns=['.git', '.venv', 'tests'],
)

# Collect the vulnerabilities reported for each inspected file
vulnerabilities = []
for file_path, inspectable_file in results:
    analysis = inspectable_file.inspect(config=config)
    if not analysis.vulnerabilities:
        continue
    vulnerabilities.extend(analysis.vulnerabilities)

print(f"Found {len(vulnerabilities)} vulnerabilities across {len(results)} files")
145
```
146
147
#### Vulnerability Analysis { .api }
148
149
```python
150
from safety.models import Vulnerability, Package
151
from typing import List, Dict, Any
152
153
def analyze_vulnerabilities(
    vulnerabilities: List[Vulnerability],
    severity_threshold: float = 7.0
) -> Dict[str, Any]:
    """
    Summarize scan findings by severity, package, and remediation status.

    Ignored vulnerabilities are only counted in ``ignored_count`` (and in
    ``total_vulnerabilities``); they are excluded from every other statistic.
    A vulnerability whose severity or CVSS v3 score is missing or falsy is
    counted as ``unknown``.

    Args:
        vulnerabilities (List[Vulnerability]): Found vulnerabilities
        severity_threshold (float): Minimum CVSS v3 score for a finding to be
            listed under ``high_priority``

    Returns:
        Dict[str, Any]: Summary with keys ``total_vulnerabilities``,
        ``by_severity``, ``by_package``, ``ignored_count``, ``high_priority``,
        ``remediation_available`` and ``transitive_dependencies``
    """
    # Lower bounds for each named band, checked from most to least severe;
    # anything below the last bound is 'low'
    bands = (('critical', 9.0), ('high', 7.0), ('medium', 4.0))

    total = len(vulnerabilities)
    severity_counts = {'critical': 0, 'high': 0, 'medium': 0, 'low': 0, 'unknown': 0}
    package_counts = {}
    urgent = []
    ignored = 0
    fixable = 0
    transitive = 0

    for item in vulnerabilities:
        if item.ignored:
            ignored += 1
            continue

        score = item.severity.cvssv3 if item.severity else None
        if not score:
            severity_counts['unknown'] += 1
        else:
            for label, floor in bands:
                if score >= floor:
                    break
            else:
                label = 'low'
            severity_counts[label] += 1

            if score >= severity_threshold:
                urgent.append({
                    'id': item.vulnerability_id,
                    'package': item.package_name,
                    'version': item.analyzed_version,
                    'cvss': score,
                    'advisory': item.get_advisory(),
                })

        name = item.package_name
        package_counts[name] = package_counts.get(name, 0) + 1

        if item.fixed_versions:
            fixable += 1

        if item.is_transitive:
            transitive += 1

    return {
        'total_vulnerabilities': total,
        'by_severity': severity_counts,
        'by_package': package_counts,
        'ignored_count': ignored,
        'high_priority': urgent,
        'remediation_available': fixable,
        'transitive_dependencies': transitive,
    }
223
224
# Example usage
vulnerabilities = []  # From scan results
analysis = analyze_vulnerabilities(vulnerabilities, severity_threshold=7.0)

# Build the summary first, then emit it one line at a time
summary_lines = [
    f"Security Analysis Summary:",
    f"Total vulnerabilities: {analysis['total_vulnerabilities']}",
    f"High priority issues: {len(analysis['high_priority'])}",
    f"Critical: {analysis['by_severity']['critical']}",
    f"High: {analysis['by_severity']['high']}",
    f"Remediation available: {analysis['remediation_available']}",
]
for line in summary_lines:
    print(line)
234
```
235
236
### Authentication API { .api }
237
238
#### Programmatic Authentication { .api }
239
240
```python
241
from safety.auth.cli_utils import build_client_session
242
from safety.auth.models import Auth, Organization
243
from safety.auth.utils import SafetyAuthSession
244
from safety_schemas.models import Stage
245
246
def create_authenticated_session(
    api_key: str = None,
    organization_id: str = None,
    stage: Stage = Stage.PRODUCTION,
    proxy_config: Dict[str, Any] = None
) -> SafetyAuthSession:
    """
    Build a Safety session and make sure it carries authentication credentials.

    Args:
        api_key (str): Safety API key; forwarded only when truthy
        organization_id (str): Target organization ID; when truthy it is wrapped
            in an ``Organization`` with an empty name
        stage (Stage): Environment stage, always forwarded
        proxy_config (Dict): Extra keyword arguments merged into the session
            options last, so they override same-named keys

    Returns:
        SafetyAuthSession: The session returned by ``build_client_session``

    Raises:
        ValueError: If the session reports it is not using auth credentials
    """
    # Assemble keyword arguments for build_client_session
    options = {}
    if api_key:
        options['api_key'] = api_key
    if organization_id:
        options['organization'] = Organization(id=organization_id, name="")
    options['stage'] = stage
    if proxy_config:
        options.update(proxy_config)

    client = build_client_session(**options)

    # Hand the session back only when it actually has credentials attached
    if client.is_using_auth_credentials():
        return client
    raise ValueError("No authentication credentials configured")
292
293
def get_user_profile(session: SafetyAuthSession) -> Dict[str, Any]:
    """
    Fetch the profile of the authenticated user.

    Issues ``GET /user/profile`` on the session and decodes the JSON body.

    Args:
        session (SafetyAuthSession): Authenticated session

    Returns:
        Dict[str, Any]: Decoded JSON response

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status.
    """
    reply = session.get("/user/profile")
    reply.raise_for_status()  # surface HTTP errors before decoding
    profile = reply.json()
    return profile
306
307
def list_organizations(session: SafetyAuthSession) -> List[Dict[str, Any]]:
    """
    Fetch the organizations visible to the authenticated user.

    Issues ``GET /organizations`` on the session and decodes the JSON body.

    Args:
        session (SafetyAuthSession): Authenticated session

    Returns:
        List[Dict[str, Any]]: Decoded JSON response

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error status.
    """
    reply = session.get("/organizations")
    reply.raise_for_status()  # surface HTTP errors before decoding
    organizations = reply.json()
    return organizations
320
321
# Example usage
import os

# Build a session from the API key stored in the environment
session = create_authenticated_session(
    api_key=os.environ.get("SAFETY_API_KEY"),
    stage=Stage.PRODUCTION,
)

# Query the account; any failure is reported rather than raised
try:
    profile = get_user_profile(session)
    print(f"Authenticated as: {profile.get('email')}")

    orgs = list_organizations(session)
    print(f"Available organizations: {len(orgs)}")
except Exception as e:
    print(f"Authentication failed: {e}")
340
```
341
342
### Report Generation API { .api }
343
344
#### Programmatic Report Creation { .api }
345
346
```python
347
from safety.formatter import SafetyFormatter
348
from safety.models import SafetyEncoder
349
import json
350
from typing import Dict, List, Any
351
352
def generate_security_report(
    vulnerabilities: List[Vulnerability],
    packages: List[Package],
    output_format: str = 'json',
    detailed: bool = True,
    save_path: str = None
) -> str:
    """
    Render scan results with SafetyFormatter and optionally save them.

    Args:
        vulnerabilities (List[Vulnerability]): Scan results
        packages (List[Package]): Analyzed packages
        output_format (str): Passed to ``SafetyFormatter(output=...)``; the
            examples in this document use 'json' and 'html'
        detailed (bool): Passed to the formatter as ``full``
        save_path (str): When truthy, the report is also written to this path
            as UTF-8 text

    Returns:
        str: The rendered report content
    """
    renderer = SafetyFormatter(output=output_format)

    # Announcements, remediations and fixes are not collected here, so the
    # formatter receives empty placeholders for them
    rendered = renderer.render_vulnerabilities(
        announcements=[],
        vulnerabilities=vulnerabilities,
        remediations={},
        full=detailed,
        packages=packages,
        fixes=(),
    )

    if save_path:
        with open(save_path, 'w', encoding='utf-8') as handle:
            handle.write(rendered)

    return rendered
397
398
def create_custom_report(
    scan_results: Dict[str, Any],
    template: Dict[str, Any] = None
) -> Dict[str, Any]:
    """
    Create custom report format with additional metadata.

    Args:
        scan_results (Dict[str, Any]): Raw scan results; the ``vulnerabilities``
            and ``packages`` keys are read, each defaulting to an empty list.
            Vulnerabilities are expected to be dict-like (accessed with ``.get``).
        template (Dict[str, Any]): Custom report template. When omitted, a
            default template with ``report_metadata`` is used. The template is
            shallow-copied; its ``scan_summary``, ``findings`` and
            ``recommendations`` keys are replaced in the copy.

    Returns:
        Dict[str, Any]: Custom formatted report
    """
    from datetime import datetime

    def _cvss_score(vuln):
        # A present-but-null 'severity' or 'cvssv3' must count as 0, not crash
        severity = vuln.get('severity') or {}
        return severity.get('cvssv3') or 0

    # Default template structure
    if template is None:
        template = {
            'report_metadata': {
                'generated_at': datetime.now().isoformat(),
                'safety_version': '3.6.1',
                'report_format_version': '1.0'
            },
            'scan_summary': {},
            'findings': {},
            'recommendations': []
        }

    # Populate a copy so the caller's template keys are not rebound
    report = template.copy()

    vulnerabilities = scan_results.get('vulnerabilities', [])
    packages = scan_results.get('packages', [])
    scores = [_cvss_score(v) for v in vulnerabilities]

    # Add scan summary
    report['scan_summary'] = {
        'total_packages': len(packages),
        'total_vulnerabilities': len(vulnerabilities),
        'critical_vulnerabilities': sum(1 for s in scores if s >= 9.0),
        'high_vulnerabilities': sum(1 for s in scores if 7.0 <= s < 9.0)
    }

    # Add findings
    report['findings'] = {
        'vulnerabilities': vulnerabilities,
        'packages': packages,
        'ignored_vulnerabilities': [v for v in vulnerabilities if v.get('ignored', False)]
    }

    # Add recommendations
    report['recommendations'] = generate_recommendations(vulnerabilities)

    return report
453
454
def generate_recommendations(vulnerabilities: List[Dict]) -> List[Dict[str, str]]:
    """
    Generate actionable recommendations based on vulnerabilities.

    Args:
        vulnerabilities (List[Dict]): Vulnerability dicts. ``severity.cvssv3``
            and ``package_name`` are read; a missing or null severity or score
            is treated as 0.

    Returns:
        List[Dict[str, str]]: At most one 'high' priority entry (present when any
        score is >= 7.0), followed by one 'medium' entry per package that has
        more than one vulnerability, in first-seen package order.
    """

    def _cvss_score(vuln):
        # A present-but-null 'severity' or 'cvssv3' must count as 0, not crash
        severity = vuln.get('severity') or {}
        return severity.get('cvssv3') or 0

    recommendations = []

    # High severity recommendations
    high_severity = [v for v in vulnerabilities if _cvss_score(v) >= 7.0]
    if high_severity:
        recommendations.append({
            'priority': 'high',
            'action': 'Update critical dependencies',
            'description': f'Update {len(high_severity)} high-severity vulnerable packages immediately'
        })

    # Package-specific recommendations; entries without a package name are skipped
    package_counts = {}
    for vuln in vulnerabilities:
        pkg = vuln.get('package_name')
        if pkg:
            package_counts[pkg] = package_counts.get(pkg, 0) + 1

    for pkg, count in package_counts.items():
        if count > 1:
            recommendations.append({
                'priority': 'medium',
                'action': f'Review {pkg} package',
                'description': f'Package {pkg} has {count} vulnerabilities - consider alternatives'
            })

    return recommendations
486
487
# Example usage
packages = []  # Analyzed packages - populate from your scan results

# Generate JSON report
json_report = generate_security_report(
    vulnerabilities=vulnerabilities,
    packages=packages,
    output_format='json',
    detailed=True,
    save_path='security_report.json'
)

# Generate HTML report for web viewing
html_report = generate_security_report(
    vulnerabilities=vulnerabilities,
    packages=packages,
    output_format='html',
    save_path='security_report.html'
)

# Create custom executive summary
executive_report = create_custom_report({
    'vulnerabilities': vulnerabilities,
    'packages': packages
})

# SafetyEncoder is supplied as the JSON encoder class for the report contents
print(json.dumps(executive_report, cls=SafetyEncoder, indent=2))
512
```
513
514
### Configuration API { .api }
515
516
#### Dynamic Configuration Management { .api }
517
518
```python
519
from safety_schemas.models import ConfigModel
520
from pathlib import Path
521
from typing import Dict, Any
522
523
def create_scan_configuration(
    severity_threshold: float = 7.0,
    ignore_unpinned: bool = True,
    auto_fix: bool = False,
    max_fixes: int = 5,
    continue_on_error: bool = False,
    exclude_patterns: List[str] = None
) -> ConfigModel:
    """
    Build a ConfigModel from plain Python arguments.

    Args:
        severity_threshold (float): Stored as ``ignore_cvss_severity_below``
        ignore_unpinned (bool): Stored as ``ignore_unpinned_requirements``
        auto_fix (bool): Stored as ``auto_remediation_enabled``
        max_fixes (int): Stored as ``auto_remediation_limit``, but only when
            ``auto_fix`` is true; otherwise the limit is None
        continue_on_error (bool): Stored as ``continue_on_vulnerability_error``
        exclude_patterns (List[str]): When non-empty, assigned to
            ``config.exclude_patterns`` after construction

    Returns:
        ConfigModel: Configured scan settings
    """
    # The fix limit only applies when auto-remediation is switched on
    fix_limit = None
    if auto_fix:
        fix_limit = max_fixes

    settings = ConfigModel(
        ignore_cvss_severity_below=severity_threshold,
        ignore_unpinned_requirements=ignore_unpinned,
        continue_on_vulnerability_error=continue_on_error,
        auto_remediation_enabled=auto_fix,
        auto_remediation_limit=fix_limit,
    )

    if exclude_patterns:
        settings.exclude_patterns = exclude_patterns

    return settings
558
559
def load_configuration_from_dict(config_dict: Dict[str, Any]) -> ConfigModel:
    """
    Build a ConfigModel from a dictionary (e.g., parsed from JSON/YAML).

    Only the ``security`` section is read. Missing options fall back to 0 for
    ``ignore-cvss-severity-below`` and False for the boolean flags.

    Args:
        config_dict (Dict[str, Any]): Configuration dictionary

    Returns:
        ConfigModel: Parsed configuration. When ``ignore-vulnerabilities`` is
        non-empty, its keys are stored as ``ignore_vulnerabilities``; the
        per-entry details (reason, expiry) are not carried over.
    """
    security = config_dict.get('security', {})

    def option(name, default):
        # Look up one hyphenated option from the security section
        return security.get(name, default)

    parsed = ConfigModel(
        ignore_cvss_severity_below=option('ignore-cvss-severity-below', 0),
        ignore_unpinned_requirements=option('ignore-unpinned-requirements', False),
        continue_on_vulnerability_error=option('continue-on-vulnerability-error', False),
        ignore_cvss_unknown_severity=option('ignore-cvss-unknown-severity', False),
    )

    # Keep only the IDs of the explicitly ignored vulnerabilities
    ignored = option('ignore-vulnerabilities', {})
    if ignored:
        parsed.ignore_vulnerabilities = list(ignored.keys())

    return parsed
586
587
# Example usage
# CI/CD profile: report from CVSS 7.0 up and keep the build going on errors
ci_config = create_scan_configuration(
    severity_threshold=7.0,
    ignore_unpinned=True,
    continue_on_error=True,  # Don't fail builds
    exclude_patterns=['tests/', '.venv/', 'docs/'],
)

# Production profile: lower threshold, pinned versions, auto-remediation
prod_config = create_scan_configuration(
    severity_threshold=4.0,   # Stricter threshold
    ignore_unpinned=False,    # Require pinned versions
    auto_fix=True,            # Enable auto-remediation
    max_fixes=10,
    continue_on_error=False,  # Fail on any vulnerabilities
)

# Load from external configuration
ignored_vulnerabilities = {
    '12345': {'reason': 'False positive', 'expires': '2024-12-31'},
    '67890': {'reason': 'Mitigated by firewall'},
}
external_config = {
    'security': {
        'ignore-cvss-severity-below': 6,
        'ignore-unpinned-requirements': True,
        'continue-on-vulnerability-error': False,
        'ignore-vulnerabilities': ignored_vulnerabilities,
    }
}

loaded_config = load_configuration_from_dict(external_config)
619
```
620
621
## Integration Examples
622
623
### FastAPI Integration { .api }
624
625
```python
626
from fastapi import FastAPI, HTTPException, BackgroundTasks
627
from pydantic import BaseModel
628
from typing import List, Dict, Any
629
import asyncio
630
from pathlib import Path
631
632
# Application instance that the @app.post / @app.get route decorators below register on
app = FastAPI(title="Security Scanning API")
633
634
class ScanRequest(BaseModel):
    # Request body accepted by POST /scan
    project_path: str                 # forwarded to perform_scan as the scan target
    severity_threshold: float = 7.0   # forwarded to perform_scan as the threshold
    output_format: str = 'json'       # forwarded to perform_scan as the report format
638
639
class ScanResult(BaseModel):
    # Response model shared by POST /scan and GET /scan/{scan_id}
    scan_id: str
    status: str                  # 'running', 'completed' or 'failed' in this example
    vulnerabilities_count: int
    # NOTE(review): annotated as str but defaults to None, and no endpoint in
    # this example ever sets it - confirm whether Optional[str] is intended
    report_url: str = None
644
645
# In-memory storage for demo (use database in production)
646
# Keyed by scan_id; each value is the mutable status record updated by the scan task
scan_results: Dict[str, Dict[str, Any]] = {}
647
648
@app.post("/scan", response_model=ScanResult)
async def start_security_scan(
    request: ScanRequest,
    background_tasks: BackgroundTasks
):
    """Start asynchronous security scan."""
    import uuid

    scan_id = str(uuid.uuid4())

    # Register the scan as running before the background task is queued,
    # so the task always finds its record in scan_results
    initial_state = {'status': 'running', 'vulnerabilities_count': 0}
    scan_results[scan_id] = initial_state

    # Queue the scan; it runs after the response has been sent
    background_tasks.add_task(
        perform_scan,
        scan_id,
        request.project_path,
        request.severity_threshold,
        request.output_format,
    )

    # The response mirrors the record that was just stored
    return ScanResult(scan_id=scan_id, **initial_state)
678
679
async def perform_scan(
    scan_id: str,
    project_path: str,
    threshold: float,
    output_format: str
):
    """
    Perform the actual security scan and record its outcome in scan_results.

    The scan pipeline is synchronous, so it is run in the event loop's default
    executor; awaiting it directly inside this coroutine would block every
    other request while the scan runs.

    Args:
        scan_id (str): Key of the record in ``scan_results`` to update
        project_path (str): Directory to scan
        threshold (float): Severity threshold passed to create_scan_configuration
        output_format (str): Report format passed to generate_security_report

    Returns:
        None. On success the record gets status 'completed', the vulnerability
        count and the report; on any exception it gets status 'failed' and the
        error message.
    """

    def _scan_blocking():
        # Synchronous pipeline: configure, discover, inspect, render
        config = create_scan_configuration(
            severity_threshold=threshold,
            continue_on_error=True
        )

        results = scan_project_programmatically(
            target_path=Path(project_path),
            config=config
        )

        vulnerabilities = []
        for _file_path, inspectable_file in results:
            analysis = inspectable_file.inspect(config=config)
            # Skip files whose inspection reports no vulnerabilities (empty or None)
            if analysis.vulnerabilities:
                vulnerabilities.extend(analysis.vulnerabilities)

        report = generate_security_report(
            vulnerabilities=vulnerabilities,
            packages=[],  # Extract from results
            output_format=output_format
        )
        return len(vulnerabilities), report

    try:
        loop = asyncio.get_running_loop()
        vulnerabilities_count, report = await loop.run_in_executor(None, _scan_blocking)

        # Update scan result
        scan_results[scan_id].update({
            'status': 'completed',
            'vulnerabilities_count': vulnerabilities_count,
            'report': report
        })

    except Exception as e:
        # Top-level boundary of the background task: record the failure so
        # GET /scan/{scan_id} can report it instead of losing the error
        scan_results[scan_id].update({
            'status': 'failed',
            'error': str(e)
        })
725
726
@app.get("/scan/{scan_id}", response_model=ScanResult)
async def get_scan_result(scan_id: str):
    """Get scan result by ID."""
    # Records are always dicts, so None can only mean an unknown scan ID
    record = scan_results.get(scan_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Scan not found")

    # The count may be absent from a record, so it falls back to 0
    found = record.get('vulnerabilities_count', 0)
    return ScanResult(
        scan_id=scan_id,
        status=record['status'],
        vulnerabilities_count=found,
    )
739
740
@app.get("/scan/{scan_id}/report")
async def get_scan_report(scan_id: str):
    """Get scan report content."""
    # Records are always dicts, so None can only mean an unknown scan ID
    record = scan_results.get(scan_id)
    if record is None:
        raise HTTPException(status_code=404, detail="Scan not found")

    # Running and failed scans are both rejected: only 'completed' has a report
    if record['status'] == 'completed':
        return {'report': record.get('report', '')}

    raise HTTPException(status_code=400, detail="Scan not completed")
752
```
753
754
### Django Integration { .api }
755
756
```python
757
# Django management command for security scanning
758
from django.core.management.base import BaseCommand
759
from django.conf import settings
760
from pathlib import Path
761
import json
762
763
class Command(BaseCommand):
    """Management command that scans a path and prints or saves the report."""

    help = 'Run security vulnerability scan'

    def add_arguments(self, parser):
        """Register the --path, --threshold, --output and --save options."""
        parser.add_argument('--path', type=str, default='.',
                            help='Path to scan (default: current directory)')
        parser.add_argument('--threshold', type=float, default=7.0,
                            help='CVSS severity threshold')
        parser.add_argument('--output', type=str, default='json',
                            choices=['json', 'text', 'html'],
                            help='Output format')
        parser.add_argument('--save', type=str,
                            help='Save report to file')

    def handle(self, *args, **options):
        """
        Execute security scan.

        Writes the report to the ``--save`` file when given, otherwise to
        stdout, then prints a one-line summary. Any exception is reported in
        error style and re-raised.
        """
        self.stdout.write('Starting security scan...')

        # Configure scan
        config = create_scan_configuration(
            severity_threshold=options['threshold'],
            continue_on_error=True
        )

        # Perform scan
        try:
            results = scan_project_programmatically(
                target_path=Path(options['path']),
                config=config
            )

            # Process results
            vulnerabilities = []
            for _file_path, inspectable_file in results:
                analysis = inspectable_file.inspect(config=config)
                # Skip files whose inspection reports no vulnerabilities (empty or None)
                if analysis.vulnerabilities:
                    vulnerabilities.extend(analysis.vulnerabilities)

            # Generate report
            report = generate_security_report(
                vulnerabilities=vulnerabilities,
                packages=[],
                output_format=options['output']
            )

            # Save or display report
            if options['save']:
                # Explicit UTF-8 so the file does not depend on the platform's
                # default encoding (matches generate_security_report's save path)
                with open(options['save'], 'w', encoding='utf-8') as f:
                    f.write(report)
                self.stdout.write(f"Report saved to: {options['save']}")
            else:
                self.stdout.write(report)

            # Summary
            vuln_count = len(vulnerabilities)
            if vuln_count > 0:
                self.stdout.write(
                    self.style.WARNING(f'Found {vuln_count} vulnerabilities')
                )
            else:
                self.stdout.write(
                    self.style.SUCCESS('No vulnerabilities found')
                )

        except Exception as e:
            # Report the failure, then re-raise so the command still fails
            self.stdout.write(
                self.style.ERROR(f'Scan failed: {e}')
            )
            raise
832
833
# Usage: python manage.py security_scan --path ./myproject --threshold 6.0 --save security_report.json
834
```
835
836
This comprehensive programmatic API documentation provides developers with all the tools needed to integrate Safety CLI's security scanning capabilities into their applications, services, and automation workflows.