Scan dependencies for known vulnerabilities and licenses.
Overall
score
61%
Safety CLI provides comprehensive programmatic access to vulnerability scanning, authentication, and security analysis capabilities. This enables developers to integrate Safety's security features directly into applications, CI/CD pipelines, and custom tooling.
Import Statements:
from safety.cli import cli
from safety.scan.main import process_files
from safety.scan.finder import FileFinder
from safety.auth.cli_utils import build_client_session
from safety.formatter import SafetyFormatter
from safety.models import (
Vulnerability, Package, SafetyRequirement,
CVE, Severity, Fix, SafetyEncoder
)Description: Direct access to CLI functionality through Python functions.
from safety.cli import cli
import click
from click.testing import CliRunner
def run_safety_command(args: List[str]) -> click.testing.Result:
"""
Execute Safety CLI command programmatically.
Args:
args (List[str]): Command line arguments
Returns:
click.testing.Result: Command execution result with output and exit code
"""
runner = CliRunner()
return runner.invoke(cli, args)
# Example usage
result = run_safety_command(['scan', '--output', 'json'])
if result.exit_code == 0:
print("Scan completed successfully")
scan_data = json.loads(result.output)
else:
print(f"Scan failed with exit code: {result.exit_code}")
print(f"Error: {result.output}")from safety.scan.main import process_files
from safety.scan.finder import FileFinder
from safety_schemas.models import Ecosystem, ConfigModel
from pathlib import Path
from typing import Dict, Set, List, Tuple
def scan_project_programmatically(
target_path: Path,
ecosystems: List[Ecosystem] = None,
config: ConfigModel = None,
max_depth: int = 10,
exclude_patterns: List[str] = None
) -> List[Tuple[Path, 'InspectableFile']]:
"""
Perform programmatic vulnerability scan on project.
Args:
target_path (Path): Root directory to scan
ecosystems (List[Ecosystem]): Target ecosystems (default: [Python])
config (ConfigModel): Scan configuration
max_depth (int): Maximum directory traversal depth
exclude_patterns (List[str]): File/directory exclusion patterns
Returns:
List[Tuple[Path, InspectableFile]]: Discovered files and analysis results
Raises:
SafetyError: If scan fails or configuration is invalid
"""
# Set defaults
if ecosystems is None:
ecosystems = [Ecosystem.PYTHON]
if config is None:
config = ConfigModel()
if exclude_patterns is None:
exclude_patterns = ['.git', '__pycache__', '.venv', 'node_modules']
# Initialize file finder
finder = FileFinder(
max_level=max_depth,
ecosystems=ecosystems,
target=target_path,
exclude=exclude_patterns
)
# Discover dependency files
_, discovered_files = finder.process_directory(str(target_path))
# Process and analyze files
results = []
for file_path, inspectable_file in process_files(
paths=discovered_files,
config=config,
target=target_path
):
results.append((file_path, inspectable_file))
return results
# Example usage
from safety_schemas.models import ConfigModel, Ecosystem
from pathlib import Path
# Configure scan
config = ConfigModel(
ignore_cvss_severity_below=7,
detailed_output=True
)
# Scan project
results = scan_project_programmatically(
target_path=Path("./my-project"),
ecosystems=[Ecosystem.PYTHON],
config=config,
exclude_patterns=['.git', '.venv', 'tests']
)
# Process results
vulnerabilities = []
for file_path, inspectable_file in results:
analysis = inspectable_file.inspect(config=config)
if analysis.vulnerabilities:
vulnerabilities.extend(analysis.vulnerabilities)
print(f"Found {len(vulnerabilities)} vulnerabilities across {len(results)} files")from safety.models import Vulnerability, Package
from typing import List, Dict, Any
def analyze_vulnerabilities(
vulnerabilities: List[Vulnerability],
severity_threshold: float = 7.0
) -> Dict[str, Any]:
"""
Analyze vulnerability scan results with categorization and statistics.
Args:
vulnerabilities (List[Vulnerability]): Found vulnerabilities
severity_threshold (float): CVSS threshold for high-priority issues
Returns:
Dict[str, Any]: Analysis summary with statistics and categorization
"""
analysis = {
'total_vulnerabilities': len(vulnerabilities),
'by_severity': {'critical': 0, 'high': 0, 'medium': 0, 'low': 0, 'unknown': 0},
'by_package': {},
'ignored_count': 0,
'high_priority': [],
'remediation_available': 0,
'transitive_dependencies': 0
}
for vuln in vulnerabilities:
# Count ignored vulnerabilities
if vuln.ignored:
analysis['ignored_count'] += 1
continue
# Categorize by severity
if vuln.severity and vuln.severity.cvssv3:
score = vuln.severity.cvssv3
if score >= 9.0:
analysis['by_severity']['critical'] += 1
elif score >= 7.0:
analysis['by_severity']['high'] += 1
elif score >= 4.0:
analysis['by_severity']['medium'] += 1
else:
analysis['by_severity']['low'] += 1
# High priority vulnerabilities
if score >= severity_threshold:
analysis['high_priority'].append({
'id': vuln.vulnerability_id,
'package': vuln.package_name,
'version': vuln.analyzed_version,
'cvss': score,
'advisory': vuln.get_advisory()
})
else:
analysis['by_severity']['unknown'] += 1
# Count by package
pkg_name = vuln.package_name
if pkg_name not in analysis['by_package']:
analysis['by_package'][pkg_name] = 0
analysis['by_package'][pkg_name] += 1
# Remediation availability
if vuln.fixed_versions:
analysis['remediation_available'] += 1
# Transitive dependency tracking
if vuln.is_transitive:
analysis['transitive_dependencies'] += 1
return analysis
# Example usage
vulnerabilities = [] # From scan results
analysis = analyze_vulnerabilities(vulnerabilities, severity_threshold=7.0)
print(f"Security Analysis Summary:")
print(f"Total vulnerabilities: {analysis['total_vulnerabilities']}")
print(f"High priority issues: {len(analysis['high_priority'])}")
print(f"Critical: {analysis['by_severity']['critical']}")
print(f"High: {analysis['by_severity']['high']}")
print(f"Remediation available: {analysis['remediation_available']}")from safety.auth.cli_utils import build_client_session
from safety.auth.models import Auth, Organization
from safety.auth.utils import SafetyAuthSession
from safety_schemas.models import Stage
def create_authenticated_session(
api_key: str = None,
organization_id: str = None,
stage: Stage = Stage.PRODUCTION,
proxy_config: Dict[str, Any] = None
) -> SafetyAuthSession:
"""
Create authenticated Safety session for programmatic access.
Args:
api_key (str): Safety API key (overrides environment/stored credentials)
organization_id (str): Target organization ID
stage (Stage): Environment stage
proxy_config (Dict): Proxy configuration
Returns:
SafetyAuthSession: Authenticated session for API calls
Raises:
InvalidCredentialError: If authentication fails
"""
# Build session with configuration
session_kwargs = {}
if api_key:
session_kwargs['api_key'] = api_key
if organization_id:
org = Organization(id=organization_id, name="")
session_kwargs['organization'] = org
session_kwargs['stage'] = stage
# Add proxy configuration
if proxy_config:
session_kwargs.update(proxy_config)
# Create authenticated session
session = build_client_session(**session_kwargs)
# Verify authentication
if not session.is_using_auth_credentials():
raise ValueError("No authentication credentials configured")
return session
def get_user_profile(session: SafetyAuthSession) -> Dict[str, Any]:
"""
Get authenticated user profile information.
Args:
session (SafetyAuthSession): Authenticated session
Returns:
Dict[str, Any]: User profile data
"""
response = session.get("/user/profile")
response.raise_for_status()
return response.json()
def list_organizations(session: SafetyAuthSession) -> List[Dict[str, Any]]:
"""
List organizations accessible to authenticated user.
Args:
session (SafetyAuthSession): Authenticated session
Returns:
List[Dict[str, Any]]: Organization list
"""
response = session.get("/organizations")
response.raise_for_status()
return response.json()
# Example usage
import os
# Create authenticated session
session = create_authenticated_session(
api_key=os.getenv("SAFETY_API_KEY"),
stage=Stage.PRODUCTION
)
# Get user information
try:
profile = get_user_profile(session)
print(f"Authenticated as: {profile.get('email')}")
orgs = list_organizations(session)
print(f"Available organizations: {len(orgs)}")
except Exception as e:
print(f"Authentication failed: {e}")from safety.formatter import SafetyFormatter
from safety.models import SafetyEncoder
import json
from typing import Dict, List, Any
def generate_security_report(
vulnerabilities: List[Vulnerability],
packages: List[Package],
output_format: str = 'json',
detailed: bool = True,
save_path: str = None
) -> str:
"""
Generate security report in specified format.
Args:
vulnerabilities (List[Vulnerability]): Scan results
packages (List[Package]): Analyzed packages
output_format (str): Output format (json, html, text, screen)
detailed (bool): Include detailed vulnerability information
save_path (str): Optional file path to save report
Returns:
str: Generated report content
"""
# Prepare data for formatter
announcements = [] # Platform announcements
remediations = {} # Remediation suggestions
fixes = () # Applied fixes
# Create formatter
formatter = SafetyFormatter(output=output_format)
# Generate report
report_content = formatter.render_vulnerabilities(
announcements=announcements,
vulnerabilities=vulnerabilities,
remediations=remediations,
full=detailed,
packages=packages,
fixes=fixes
)
# Save to file if path provided
if save_path:
with open(save_path, 'w', encoding='utf-8') as f:
f.write(report_content)
return report_content
def create_custom_report(
scan_results: Dict[str, Any],
template: Dict[str, Any] = None
) -> Dict[str, Any]:
"""
Create custom report format with additional metadata.
Args:
scan_results (Dict[str, Any]): Raw scan results
template (Dict[str, Any]): Custom report template
Returns:
Dict[str, Any]: Custom formatted report
"""
from datetime import datetime
# Default template structure
if template is None:
template = {
'report_metadata': {
'generated_at': datetime.now().isoformat(),
'safety_version': '3.6.1',
'report_format_version': '1.0'
},
'scan_summary': {},
'findings': {},
'recommendations': []
}
# Populate template with scan data
report = template.copy()
# Add scan summary
vulnerabilities = scan_results.get('vulnerabilities', [])
report['scan_summary'] = {
'total_packages': len(scan_results.get('packages', [])),
'total_vulnerabilities': len(vulnerabilities),
'critical_vulnerabilities': len([v for v in vulnerabilities
if v.get('severity', {}).get('cvssv3', 0) >= 9.0]),
'high_vulnerabilities': len([v for v in vulnerabilities
if 7.0 <= v.get('severity', {}).get('cvssv3', 0) < 9.0])
}
# Add findings
report['findings'] = {
'vulnerabilities': vulnerabilities,
'packages': scan_results.get('packages', []),
'ignored_vulnerabilities': [v for v in vulnerabilities if v.get('ignored', False)]
}
# Add recommendations
report['recommendations'] = generate_recommendations(vulnerabilities)
return report
def generate_recommendations(vulnerabilities: List[Dict]) -> List[Dict[str, str]]:
"""Generate actionable recommendations based on vulnerabilities."""
recommendations = []
# High severity recommendations
high_severity = [v for v in vulnerabilities
if v.get('severity', {}).get('cvssv3', 0) >= 7.0]
if high_severity:
recommendations.append({
'priority': 'high',
'action': 'Update critical dependencies',
'description': f'Update {len(high_severity)} high-severity vulnerable packages immediately'
})
# Package-specific recommendations
package_counts = {}
for vuln in vulnerabilities:
pkg = vuln.get('package_name')
if pkg:
package_counts[pkg] = package_counts.get(pkg, 0) + 1
for pkg, count in package_counts.items():
if count > 1:
recommendations.append({
'priority': 'medium',
'action': f'Review {pkg} package',
'description': f'Package {pkg} has {count} vulnerabilities - consider alternatives'
})
return recommendations
# Example usage
# Generate JSON report
json_report = generate_security_report(
vulnerabilities=vulnerabilities,
packages=packages,
output_format='json',
detailed=True,
save_path='security_report.json'
)
# Generate HTML report for web viewing
html_report = generate_security_report(
vulnerabilities=vulnerabilities,
packages=packages,
output_format='html',
save_path='security_report.html'
)
# Create custom executive summary
executive_report = create_custom_report({
'vulnerabilities': vulnerabilities,
'packages': packages
})
print(json.dumps(executive_report, cls=SafetyEncoder, indent=2))from safety_schemas.models import ConfigModel
from pathlib import Path
from typing import Dict, Any
def create_scan_configuration(
severity_threshold: float = 7.0,
ignore_unpinned: bool = True,
auto_fix: bool = False,
max_fixes: int = 5,
continue_on_error: bool = False,
exclude_patterns: List[str] = None
) -> ConfigModel:
"""
Create scan configuration programmatically.
Args:
severity_threshold (float): Minimum CVSS score to report
ignore_unpinned (bool): Skip unpinned requirements
auto_fix (bool): Enable automatic remediation
max_fixes (int): Maximum number of fixes to apply
continue_on_error (bool): Continue scan despite errors
exclude_patterns (List[str]): File exclusion patterns
Returns:
ConfigModel: Configured scan settings
"""
config = ConfigModel(
ignore_cvss_severity_below=severity_threshold,
ignore_unpinned_requirements=ignore_unpinned,
continue_on_vulnerability_error=continue_on_error,
auto_remediation_enabled=auto_fix,
auto_remediation_limit=max_fixes if auto_fix else None
)
if exclude_patterns:
config.exclude_patterns = exclude_patterns
return config
def load_configuration_from_dict(config_dict: Dict[str, Any]) -> ConfigModel:
"""
Load configuration from dictionary (e.g., from JSON/YAML).
Args:
config_dict (Dict[str, Any]): Configuration dictionary
Returns:
ConfigModel: Parsed configuration
"""
# Extract security settings
security_config = config_dict.get('security', {})
config = ConfigModel(
ignore_cvss_severity_below=security_config.get('ignore-cvss-severity-below', 0),
ignore_unpinned_requirements=security_config.get('ignore-unpinned-requirements', False),
continue_on_vulnerability_error=security_config.get('continue-on-vulnerability-error', False),
ignore_cvss_unknown_severity=security_config.get('ignore-cvss-unknown-severity', False)
)
# Add specific vulnerability ignores
ignore_vulns = security_config.get('ignore-vulnerabilities', {})
if ignore_vulns:
config.ignore_vulnerabilities = list(ignore_vulns.keys())
return config
# Example usage
# Create configuration for CI/CD
ci_config = create_scan_configuration(
severity_threshold=7.0,
ignore_unpinned=True,
continue_on_error=True, # Don't fail builds
exclude_patterns=['tests/', '.venv/', 'docs/']
)
# Create configuration for production deployment
prod_config = create_scan_configuration(
severity_threshold=4.0, # Stricter threshold
ignore_unpinned=False, # Require pinned versions
auto_fix=True, # Enable auto-remediation
max_fixes=10,
continue_on_error=False # Fail on any vulnerabilities
)
# Load from external configuration
external_config = {
'security': {
'ignore-cvss-severity-below': 6,
'ignore-unpinned-requirements': True,
'continue-on-vulnerability-error': False,
'ignore-vulnerabilities': {
'12345': {'reason': 'False positive', 'expires': '2024-12-31'},
'67890': {'reason': 'Mitigated by firewall'}
}
}
}
loaded_config = load_configuration_from_dict(external_config)from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Dict, Any
import asyncio
from pathlib import Path
app = FastAPI(title="Security Scanning API")
class ScanRequest(BaseModel):
project_path: str
severity_threshold: float = 7.0
output_format: str = 'json'
class ScanResult(BaseModel):
scan_id: str
status: str
vulnerabilities_count: int
report_url: str = None
# In-memory storage for demo (use database in production)
scan_results = {}
@app.post("/scan", response_model=ScanResult)
async def start_security_scan(
request: ScanRequest,
background_tasks: BackgroundTasks
):
"""Start asynchronous security scan."""
import uuid
scan_id = str(uuid.uuid4())
# Initialize scan result
scan_results[scan_id] = {
'status': 'running',
'vulnerabilities_count': 0
}
# Start background scan
background_tasks.add_task(
perform_scan,
scan_id,
request.project_path,
request.severity_threshold,
request.output_format
)
return ScanResult(
scan_id=scan_id,
status='running',
vulnerabilities_count=0
)
async def perform_scan(
scan_id: str,
project_path: str,
threshold: float,
output_format: str
):
"""Perform the actual security scan."""
try:
# Configure scan
config = create_scan_configuration(
severity_threshold=threshold,
continue_on_error=True
)
# Perform scan
results = scan_project_programmatically(
target_path=Path(project_path),
config=config
)
# Analyze results
vulnerabilities = []
for file_path, inspectable_file in results:
analysis = inspectable_file.inspect(config=config)
vulnerabilities.extend(analysis.vulnerabilities)
# Generate report
report = generate_security_report(
vulnerabilities=vulnerabilities,
packages=[], # Extract from results
output_format=output_format
)
# Update scan result
scan_results[scan_id].update({
'status': 'completed',
'vulnerabilities_count': len(vulnerabilities),
'report': report
})
except Exception as e:
scan_results[scan_id].update({
'status': 'failed',
'error': str(e)
})
@app.get("/scan/{scan_id}", response_model=ScanResult)
async def get_scan_result(scan_id: str):
"""Get scan result by ID."""
if scan_id not in scan_results:
raise HTTPException(status_code=404, detail="Scan not found")
result = scan_results[scan_id]
return ScanResult(
scan_id=scan_id,
status=result['status'],
vulnerabilities_count=result.get('vulnerabilities_count', 0)
)
@app.get("/scan/{scan_id}/report")
async def get_scan_report(scan_id: str):
"""Get scan report content."""
if scan_id not in scan_results:
raise HTTPException(status_code=404, detail="Scan not found")
result = scan_results[scan_id]
if result['status'] != 'completed':
raise HTTPException(status_code=400, detail="Scan not completed")
return {'report': result.get('report', '')}# Django management command for security scanning
from django.core.management.base import BaseCommand
from django.conf import settings
from pathlib import Path
import json
class Command(BaseCommand):
help = 'Run security vulnerability scan'
def add_arguments(self, parser):
parser.add_argument('--path', type=str, default='.',
help='Path to scan (default: current directory)')
parser.add_argument('--threshold', type=float, default=7.0,
help='CVSS severity threshold')
parser.add_argument('--output', type=str, default='json',
choices=['json', 'text', 'html'],
help='Output format')
parser.add_argument('--save', type=str,
help='Save report to file')
def handle(self, *args, **options):
"""Execute security scan."""
self.stdout.write('Starting security scan...')
# Configure scan
config = create_scan_configuration(
severity_threshold=options['threshold'],
continue_on_error=True
)
# Perform scan
try:
results = scan_project_programmatically(
target_path=Path(options['path']),
config=config
)
# Process results
vulnerabilities = []
for file_path, inspectable_file in results:
analysis = inspectable_file.inspect(config=config)
vulnerabilities.extend(analysis.vulnerabilities)
# Generate report
report = generate_security_report(
vulnerabilities=vulnerabilities,
packages=[],
output_format=options['output']
)
# Save or display report
if options['save']:
with open(options['save'], 'w') as f:
f.write(report)
self.stdout.write(f"Report saved to: {options['save']}")
else:
self.stdout.write(report)
# Summary
vuln_count = len(vulnerabilities)
if vuln_count > 0:
self.stdout.write(
self.style.WARNING(f'Found {vuln_count} vulnerabilities')
)
else:
self.stdout.write(
self.style.SUCCESS('No vulnerabilities found')
)
except Exception as e:
self.stdout.write(
self.style.ERROR(f'Scan failed: {e}')
)
raise
# Usage: python manage.py security_scan --path ./myproject --threshold 6.0 --save security_report.jsonThis comprehensive programmatic API documentation provides developers with all the tools needed to integrate Safety CLI's security scanning capabilities into their applications, services, and automation workflows.
Install with Tessl CLI
npx tessl i tessl/pypi-safetydocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10