CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-safety

Scan dependencies for known vulnerabilities and licenses.

Overall
score

61%

Overview
Eval results
Files

docs/programmatic.md

Programmatic API

Safety CLI provides comprehensive programmatic access to vulnerability scanning, authentication, and security analysis capabilities. This enables developers to integrate Safety's security features directly into applications, CI/CD pipelines, and custom tooling.

Core Programmatic Interfaces

Main CLI Entry Point { .api }

Import Statements:

from safety.cli import cli
from safety.scan.main import process_files
from safety.scan.finder import FileFinder
from safety.auth.cli_utils import build_client_session
from safety.formatter import SafetyFormatter
from safety.models import (
    Vulnerability, Package, SafetyRequirement,
    CVE, Severity, Fix, SafetyEncoder
)

CLI Function Access { .api }

Description: Direct access to CLI functionality through Python functions.

from typing import List

import click
from click.testing import CliRunner
from safety.cli import cli

def run_safety_command(args: List[str]) -> click.testing.Result:
    """
    Invoke the Safety click command group in-process.

    A fresh CliRunner is created for every call, and whatever
    ``CliRunner.invoke`` returns is handed back untouched.

    Args:
        args (List[str]): Command line arguments passed to ``cli``

    Returns:
        click.testing.Result: Result object produced by the runner
        (the example below reads its ``exit_code`` and ``output``)
    """
    return CliRunner().invoke(cli, args)

# Example usage
# json is needed to decode the captured CLI output on the success path.
import json

result = run_safety_command(['scan', '--output', 'json'])
if result.exit_code == 0:
    print("Scan completed successfully")
    # The scan was asked for JSON output, so the captured text is parsed as JSON.
    scan_data = json.loads(result.output)
else:
    print(f"Scan failed with exit code: {result.exit_code}")
    print(f"Error: {result.output}")

Vulnerability Scanning API { .api }

Direct Scanning Interface { .api }

from safety.scan.main import process_files
from safety.scan.finder import FileFinder
from safety_schemas.models import Ecosystem, ConfigModel
from pathlib import Path
from typing import Dict, Set, List, Tuple

def scan_project_programmatically(
    target_path: Path,
    ecosystems: List[Ecosystem] = None,
    config: ConfigModel = None,
    max_depth: int = 10,
    exclude_patterns: List[str] = None
) -> List[Tuple[Path, 'InspectableFile']]:
    """
    Discover dependency files under a directory and feed them to process_files.

    Args:
        target_path (Path): Root directory to scan
        ecosystems (List[Ecosystem]): Ecosystems to look for; ``None`` means
            ``[Ecosystem.PYTHON]``
        config (ConfigModel): Scan configuration; ``None`` means a default
            ``ConfigModel()``
        max_depth (int): Passed to FileFinder as ``max_level``
        exclude_patterns (List[str]): Passed to FileFinder as ``exclude``;
            ``None`` means ``.git``, ``__pycache__``, ``.venv`` and
            ``node_modules``

    Returns:
        List[Tuple[Path, InspectableFile]]: Every (path, file) pair yielded by
        process_files, in the order it yields them

    NOTE(review): no exception is raised by this function itself; anything
    raised by FileFinder or process_files propagates unchanged - confirm the
    exact exception types against the installed Safety version.
    """
    # Fill in defaults without mutating shared default objects.
    ecosystems = [Ecosystem.PYTHON] if ecosystems is None else ecosystems
    config = ConfigModel() if config is None else config
    exclude_patterns = (
        ['.git', '__pycache__', '.venv', 'node_modules']
        if exclude_patterns is None
        else exclude_patterns
    )

    finder = FileFinder(
        max_level=max_depth,
        ecosystems=ecosystems,
        target=target_path,
        exclude=exclude_patterns
    )

    # Only the second element of the finder's result is needed here.
    _unused, discovered_files = finder.process_directory(str(target_path))

    processed = process_files(
        paths=discovered_files,
        config=config,
        target=target_path
    )
    return [(file_path, inspectable_file) for file_path, inspectable_file in processed]

# Example usage
from safety_schemas.models import ConfigModel, Ecosystem
from pathlib import Path

# Build the scan configuration used for both discovery and inspection
config = ConfigModel(
    detailed_output=True,
    ignore_cvss_severity_below=7
)

# Run the scan against the project directory
results = scan_project_programmatically(
    config=config,
    ecosystems=[Ecosystem.PYTHON],
    exclude_patterns=['.git', '.venv', 'tests'],
    target_path=Path("./my-project")
)

# Collect the vulnerabilities reported for each inspected file
vulnerabilities = []
for file_path, inspectable_file in results:
    analysis = inspectable_file.inspect(config=config)
    if not analysis.vulnerabilities:
        continue
    vulnerabilities.extend(analysis.vulnerabilities)

print(f"Found {len(vulnerabilities)} vulnerabilities across {len(results)} files")

Vulnerability Analysis { .api }

from safety.models import Vulnerability, Package
from typing import List, Dict, Any

def analyze_vulnerabilities(
    vulnerabilities: List[Vulnerability],
    severity_threshold: float = 7.0
) -> Dict[str, Any]:
    """
    Summarize vulnerability scan results with categorization and statistics.

    Ignored vulnerabilities are counted in ``total_vulnerabilities`` and
    ``ignored_count`` but are skipped for every other statistic.

    Args:
        vulnerabilities (List[Vulnerability]): Found vulnerabilities. Each item
            must expose ``ignored``, ``severity``, ``package_name``,
            ``fixed_versions`` and ``is_transitive``; items that reach the
            high-priority list also need ``vulnerability_id``,
            ``analyzed_version`` and ``get_advisory()``.
        severity_threshold (float): CVSS threshold for high-priority issues

    Returns:
        Dict[str, Any]: Summary with the keys ``total_vulnerabilities``,
        ``by_severity``, ``by_package``, ``ignored_count``, ``high_priority``,
        ``remediation_available`` and ``transitive_dependencies``
    """

    def _cvss_score(vuln):
        """Return the CVSS v3 score as a float, or None when it is unknown."""
        severity = vuln.severity
        raw = severity.cvssv3 if severity else None
        # cvssv3 may be a plain number or a mapping that carries the number
        # under 'base_score'; comparing a mapping with a float would raise.
        if isinstance(raw, dict):
            raw = raw.get('base_score')
        if isinstance(raw, bool) or not isinstance(raw, (int, float)):
            return None
        # A falsy score (0) is reported as unknown, as a missing score is.
        return float(raw) if raw else None

    # Lower bounds of each band, checked from the most severe downwards.
    severity_bands = (('critical', 9.0), ('high', 7.0), ('medium', 4.0))

    analysis = {
        'total_vulnerabilities': len(vulnerabilities),
        'by_severity': {'critical': 0, 'high': 0, 'medium': 0, 'low': 0, 'unknown': 0},
        'by_package': {},
        'ignored_count': 0,
        'high_priority': [],
        'remediation_available': 0,
        'transitive_dependencies': 0
    }

    for vuln in vulnerabilities:
        # Ignored vulnerabilities are only counted, never categorized
        if vuln.ignored:
            analysis['ignored_count'] += 1
            continue

        # Categorize by severity
        score = _cvss_score(vuln)
        if score is None:
            analysis['by_severity']['unknown'] += 1
        else:
            band = next(
                (name for name, floor in severity_bands if score >= floor),
                'low'
            )
            analysis['by_severity'][band] += 1

            # High priority vulnerabilities
            if score >= severity_threshold:
                analysis['high_priority'].append({
                    'id': vuln.vulnerability_id,
                    'package': vuln.package_name,
                    'version': vuln.analyzed_version,
                    'cvss': score,
                    'advisory': vuln.get_advisory()
                })

        # Count by package
        pkg_name = vuln.package_name
        analysis['by_package'][pkg_name] = analysis['by_package'].get(pkg_name, 0) + 1

        # Remediation availability
        if vuln.fixed_versions:
            analysis['remediation_available'] += 1

        # Transitive dependency tracking
        if vuln.is_transitive:
            analysis['transitive_dependencies'] += 1

    return analysis

# Example usage
vulnerabilities = []  # From scan results
analysis = analyze_vulnerabilities(vulnerabilities, severity_threshold=7.0)

# Keep the severity counters in one name so the prints below stay short.
severity_counts = analysis['by_severity']

print("Security Analysis Summary:")
print(f"Total vulnerabilities: {analysis['total_vulnerabilities']}")
print(f"High priority issues: {len(analysis['high_priority'])}")
print(f"Critical: {severity_counts['critical']}")
print(f"High: {severity_counts['high']}")
print(f"Remediation available: {analysis['remediation_available']}")

Authentication API { .api }

Programmatic Authentication { .api }

from safety.auth.cli_utils import build_client_session
from safety.auth.models import Auth, Organization
from safety.auth.utils import SafetyAuthSession
from safety_schemas.models import Stage

def create_authenticated_session(
    api_key: str = None,
    organization_id: str = None,
    stage: Stage = Stage.PRODUCTION,
    proxy_config: Dict[str, Any] = None
) -> SafetyAuthSession:
    """
    Build a Safety session and make sure it carries credentials.

    Keyword arguments for build_client_session are assembled in this order:
    ``api_key`` (when truthy), ``organization`` (when ``organization_id`` is
    truthy), ``stage`` (always), then every entry of ``proxy_config``, which
    can therefore override the earlier keys.

    Args:
        api_key (str): Safety API key
        organization_id (str): Target organization ID; wrapped in an
            Organization with an empty name
        stage (Stage): Environment stage
        proxy_config (Dict): Extra keyword arguments merged in last

    Returns:
        SafetyAuthSession: The object returned by build_client_session

    Raises:
        ValueError: If ``session.is_using_auth_credentials()`` is falsy

    NOTE(review): confirm that build_client_session accepts these keyword
    arguments and returns the session object directly in the installed
    Safety version.
    """
    options = {}

    if api_key:
        options['api_key'] = api_key

    if organization_id:
        options['organization'] = Organization(id=organization_id, name="")

    options['stage'] = stage

    # Proxy settings are merged last, exactly as given by the caller.
    if proxy_config:
        options.update(proxy_config)

    session = build_client_session(**options)

    # Hand the session back only when it reports usable credentials.
    if session.is_using_auth_credentials():
        return session

    raise ValueError("No authentication credentials configured")

def get_user_profile(session: SafetyAuthSession) -> Dict[str, Any]:
    """
    Fetch ``/user/profile`` through the given session.

    Args:
        session (SafetyAuthSession): Authenticated session

    Returns:
        Dict[str, Any]: Decoded JSON body of the response

    Any error raised by ``raise_for_status()`` on the response propagates
    to the caller.
    """
    profile_response = session.get("/user/profile")
    profile_response.raise_for_status()
    payload = profile_response.json()
    return payload

def list_organizations(session: SafetyAuthSession) -> List[Dict[str, Any]]:
    """
    Fetch ``/organizations`` through the given session.

    Args:
        session (SafetyAuthSession): Authenticated session

    Returns:
        List[Dict[str, Any]]: Decoded JSON body of the response

    Any error raised by ``raise_for_status()`` on the response propagates
    to the caller.
    """
    orgs_response = session.get("/organizations")
    orgs_response.raise_for_status()
    payload = orgs_response.json()
    return payload

# Example usage
import os

# Build a session from the API key found in the environment
session = create_authenticated_session(
    stage=Stage.PRODUCTION,
    api_key=os.environ.get("SAFETY_API_KEY")
)

# Query the profile and the organization list; any failure is reported
# with the same message.
try:
    profile = get_user_profile(session)
    print(f"Authenticated as: {profile.get('email')}")

    orgs = list_organizations(session)
    print(f"Available organizations: {len(orgs)}")

except Exception as e:
    print(f"Authentication failed: {e}")

Report Generation API { .api }

Programmatic Report Creation { .api }

from safety.formatter import SafetyFormatter
from safety.models import SafetyEncoder
import json
from typing import Dict, List, Any

def generate_security_report(
    vulnerabilities: List[Vulnerability],
    packages: List[Package],
    output_format: str = 'json',
    detailed: bool = True,
    save_path: str = None
) -> str:
    """
    Render vulnerabilities with SafetyFormatter and optionally save the result.

    The formatter is always called with no announcements, no remediations
    and no fixes.

    Args:
        vulnerabilities (List[Vulnerability]): Scan results
        packages (List[Package]): Analyzed packages
        output_format (str): Passed to SafetyFormatter as ``output``
        detailed (bool): Passed to the formatter as ``full``
        save_path (str): When truthy, the report is also written to this
            path as UTF-8 text

    Returns:
        str: Report content returned by the formatter
    """
    formatter = SafetyFormatter(output=output_format)

    report_content = formatter.render_vulnerabilities(
        announcements=[],   # Platform announcements
        vulnerabilities=vulnerabilities,
        remediations={},    # Remediation suggestions
        full=detailed,
        packages=packages,
        fixes=()            # Applied fixes
    )

    # Nothing to persist: return the rendered text directly.
    if not save_path:
        return report_content

    with open(save_path, 'w', encoding='utf-8') as report_file:
        report_file.write(report_content)

    return report_content

def _get_field(item, name, default=None):
    """Read *name* from a dict-style record or from an object attribute."""
    if isinstance(item, dict):
        return item.get(name, default)
    return getattr(item, name, default)


def _cvss_score(vuln) -> float:
    """Return the CVSS v3 score of *vuln*, or 0.0 when none is available."""
    severity = _get_field(vuln, 'severity')
    raw = _get_field(severity, 'cvssv3')
    # cvssv3 may be a plain number or a mapping holding it under 'base_score'.
    if isinstance(raw, dict):
        raw = raw.get('base_score')
    if isinstance(raw, bool) or not isinstance(raw, (int, float)):
        return 0.0
    return float(raw)


def create_custom_report(
    scan_results: Dict[str, Any],
    template: Dict[str, Any] = None
) -> Dict[str, Any]:
    """
    Create custom report format with additional metadata.

    Vulnerabilities may be dictionaries or objects exposing the same names as
    attributes. A missing, ``None`` or non-numeric severity counts as score 0
    instead of raising.

    Args:
        scan_results (Dict[str, Any]): Raw scan results; the keys
            ``vulnerabilities`` and ``packages`` are read, both optional
        template (Dict[str, Any]): Custom report template. It is copied
            (shallowly) and its ``scan_summary``, ``findings`` and
            ``recommendations`` entries are replaced in the copy.

    Returns:
        Dict[str, Any]: Custom formatted report
    """

    from datetime import datetime

    # Default template structure
    if template is None:
        template = {
            'report_metadata': {
                'generated_at': datetime.now().isoformat(),
                # NOTE(review): hard-coded version string; confirm it matches
                # the installed Safety release.
                'safety_version': '3.6.1',
                'report_format_version': '1.0'
            },
            'scan_summary': {},
            'findings': {},
            'recommendations': []
        }

    # Work on a copy so the caller's template keeps its own top-level keys
    report = dict(template)

    vulnerabilities = scan_results.get('vulnerabilities') or []
    packages = scan_results.get('packages') or []
    scores = [_cvss_score(v) for v in vulnerabilities]

    # Add scan summary
    report['scan_summary'] = {
        'total_packages': len(packages),
        'total_vulnerabilities': len(vulnerabilities),
        'critical_vulnerabilities': sum(1 for s in scores if s >= 9.0),
        'high_vulnerabilities': sum(1 for s in scores if 7.0 <= s < 9.0)
    }

    # Add findings
    report['findings'] = {
        'vulnerabilities': vulnerabilities,
        'packages': packages,
        'ignored_vulnerabilities': [
            v for v in vulnerabilities if _get_field(v, 'ignored', False)
        ]
    }

    # Add recommendations
    report['recommendations'] = generate_recommendations(vulnerabilities)

    return report


def generate_recommendations(vulnerabilities: List[Dict]) -> List[Dict[str, str]]:
    """
    Generate actionable recommendations based on vulnerabilities.

    One 'high' priority entry is produced when at least one vulnerability has
    a CVSS v3 score of 7.0 or more, followed by one 'medium' entry for every
    package that appears more than once, in first-seen order.

    Args:
        vulnerabilities (List[Dict]): Vulnerabilities as dictionaries, or
            objects exposing ``severity`` and ``package_name`` attributes

    Returns:
        List[Dict[str, str]]: Entries with ``priority``, ``action`` and
        ``description`` keys
    """

    recommendations = []

    # High severity recommendations
    high_severity = [v for v in vulnerabilities if _cvss_score(v) >= 7.0]

    if high_severity:
        recommendations.append({
            'priority': 'high',
            'action': 'Update critical dependencies',
            'description': f'Update {len(high_severity)} high-severity vulnerable packages immediately'
        })

    # Package-specific recommendations
    package_counts = {}
    for vuln in vulnerabilities:
        pkg = _get_field(vuln, 'package_name')
        if pkg:
            package_counts[pkg] = package_counts.get(pkg, 0) + 1

    for pkg, count in package_counts.items():
        if count > 1:
            recommendations.append({
                'priority': 'medium',
                'action': f'Review {pkg} package',
                'description': f'Package {pkg} has {count} vulnerabilities - consider alternatives'
            })

    return recommendations

# Example usage
# `vulnerabilities` is the list collected from scan results earlier.
packages = []  # Analyzed packages, from scan results

# Generate JSON report
json_report = generate_security_report(
    vulnerabilities=vulnerabilities,
    packages=packages,
    output_format='json',
    detailed=True,
    save_path='security_report.json'
)

# Generate HTML report for web viewing
html_report = generate_security_report(
    vulnerabilities=vulnerabilities,
    packages=packages,
    output_format='html',
    save_path='security_report.html'
)

# Create custom executive summary
executive_report = create_custom_report({
    'vulnerabilities': vulnerabilities,
    'packages': packages
})

# SafetyEncoder is passed so json.dumps can serialize Safety model objects
# held inside the report.
print(json.dumps(executive_report, cls=SafetyEncoder, indent=2))

Configuration API { .api }

Dynamic Configuration Management { .api }

from safety_schemas.models import ConfigModel
from pathlib import Path
from typing import Dict, Any

def create_scan_configuration(
    severity_threshold: float = 7.0,
    ignore_unpinned: bool = True,
    auto_fix: bool = False,
    max_fixes: int = 5,
    continue_on_error: bool = False,
    exclude_patterns: List[str] = None
) -> ConfigModel:
    """
    Build a ConfigModel from plain keyword settings.

    Args:
        severity_threshold (float): Stored as ``ignore_cvss_severity_below``
        ignore_unpinned (bool): Stored as ``ignore_unpinned_requirements``
        auto_fix (bool): Stored as ``auto_remediation_enabled``
        max_fixes (int): Stored as ``auto_remediation_limit``, but only when
            ``auto_fix`` is truthy; otherwise the limit is ``None``
        continue_on_error (bool): Stored as ``continue_on_vulnerability_error``
        exclude_patterns (List[str]): When truthy, assigned to
            ``config.exclude_patterns`` after construction

    Returns:
        ConfigModel: Configured scan settings

    NOTE(review): the ConfigModel field names are used as given here; verify
    them against the installed safety_schemas version.
    """
    # A fix limit only makes sense when auto-remediation is switched on.
    remediation_limit = max_fixes if auto_fix else None

    settings = {
        'ignore_cvss_severity_below': severity_threshold,
        'ignore_unpinned_requirements': ignore_unpinned,
        'continue_on_vulnerability_error': continue_on_error,
        'auto_remediation_enabled': auto_fix,
        'auto_remediation_limit': remediation_limit,
    }
    config = ConfigModel(**settings)

    if exclude_patterns:
        config.exclude_patterns = exclude_patterns

    return config

def load_configuration_from_dict(config_dict: Dict[str, Any]) -> ConfigModel:
    """
    Build a ConfigModel from the ``security`` section of a dictionary.

    Args:
        config_dict (Dict[str, Any]): Configuration dictionary (e.g. parsed
            from JSON/YAML); a missing ``security`` key is treated as empty

    Returns:
        ConfigModel: Parsed configuration. When the section has a non-empty
        ``ignore-vulnerabilities`` mapping, its keys are assigned to
        ``config.ignore_vulnerabilities`` as a list.

    NOTE(review): the ConfigModel field names are used as given here; verify
    them against the installed safety_schemas version.
    """
    security = config_dict.get('security', {})

    # (ConfigModel field, key in the security section, fallback value)
    option_table = (
        ('ignore_cvss_severity_below', 'ignore-cvss-severity-below', 0),
        ('ignore_unpinned_requirements', 'ignore-unpinned-requirements', False),
        ('continue_on_vulnerability_error', 'continue-on-vulnerability-error', False),
        ('ignore_cvss_unknown_severity', 'ignore-cvss-unknown-severity', False),
    )
    config = ConfigModel(**{
        field: security.get(key, fallback)
        for field, key, fallback in option_table
    })

    # Only the vulnerability IDs are kept; the per-ID details are not used.
    ignored = security.get('ignore-vulnerabilities', {})
    if ignored:
        config.ignore_vulnerabilities = [*ignored.keys()]

    return config

# Example usage
# Configuration for a CI/CD pipeline
ci_config = create_scan_configuration(
    exclude_patterns=['tests/', '.venv/', 'docs/'],
    continue_on_error=True,  # Don't fail builds
    ignore_unpinned=True,
    severity_threshold=7.0
)

# Configuration for a production deployment
prod_config = create_scan_configuration(
    continue_on_error=False,  # Fail on any vulnerabilities
    max_fixes=10,
    auto_fix=True,            # Enable auto-remediation
    ignore_unpinned=False,    # Require pinned versions
    severity_threshold=4.0    # Stricter threshold
)

# Settings as they might arrive from an external JSON/YAML document
ignored_vulnerabilities = {
    '12345': {'reason': 'False positive', 'expires': '2024-12-31'},
    '67890': {'reason': 'Mitigated by firewall'}
}
external_config = {
    'security': {
        'ignore-cvss-severity-below': 6,
        'ignore-unpinned-requirements': True,
        'continue-on-vulnerability-error': False,
        'ignore-vulnerabilities': ignored_vulnerabilities
    }
}

loaded_config = load_configuration_from_dict(external_config)

Integration Examples

FastAPI Integration { .api }

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Dict, Any
import asyncio
from pathlib import Path

# Application object; the route decorators further down register handlers on it.
app = FastAPI(title="Security Scanning API")

# Request body for POST /scan.
class ScanRequest(BaseModel):
    # Directory to scan; converted to a Path by perform_scan.
    project_path: str
    # Forwarded to create_scan_configuration as severity_threshold.
    severity_threshold: float = 7.0
    # Forwarded to generate_security_report as output_format.
    output_format: str = 'json'

# Response body returned by POST /scan and GET /scan/{scan_id}.
class ScanResult(BaseModel):
    # Identifier generated by start_security_scan (a UUID4 string).
    scan_id: str
    # Set to 'running', 'completed' or 'failed' by the handlers in this example.
    status: str
    vulnerabilities_count: int
    # Never populated by the handlers shown here.
    # NOTE(review): annotated as str but defaults to None - consider Optional[str].
    report_url: str = None

# In-memory storage for demo (use database in production)
# Maps scan_id -> dict with 'status' and 'vulnerabilities_count', plus
# 'report' once a scan completes or 'error' when it fails.
scan_results = {}

@app.post("/scan", response_model=ScanResult)
async def start_security_scan(
    request: ScanRequest,
    background_tasks: BackgroundTasks
):
    """Start asynchronous security scan."""
    import uuid

    scan_id = str(uuid.uuid4())
    initial_status = 'running'

    # Record the scan before it is scheduled so lookups by ID can find it.
    scan_results[scan_id] = {'status': initial_status, 'vulnerabilities_count': 0}

    # Hand the actual work to the background task queue.
    background_tasks.add_task(
        perform_scan,
        scan_id,
        request.project_path,
        request.severity_threshold,
        request.output_format
    )

    return ScanResult(scan_id=scan_id, status=initial_status, vulnerabilities_count=0)

async def perform_scan(
    scan_id: str,
    project_path: str,
    threshold: float,
    output_format: str
):
    """
    Perform the actual security scan and record its outcome.

    The scan, inspection and report rendering are synchronous calls, so they
    run in the event loop's default executor and do not stall other requests
    while a scan is in progress.

    Args:
        scan_id (str): Key of the entry in ``scan_results`` to update
        project_path (str): Directory to scan
        threshold (float): Severity threshold for the scan configuration
        output_format (str): Report format passed to generate_security_report

    Returns:
        None. On success the entry gets status 'completed', the vulnerability
        count and the report; on any exception it gets status 'failed' and
        the error text.

    NOTE(review): assumes the Safety scanning calls may run in a worker
    thread - confirm.
    """

    def _scan_blocking():
        """Run the synchronous scan pipeline; return (vulnerabilities, report)."""
        # Configure scan
        config = create_scan_configuration(
            severity_threshold=threshold,
            continue_on_error=True
        )

        # Perform scan
        results = scan_project_programmatically(
            target_path=Path(project_path),
            config=config
        )

        # Analyze results
        vulnerabilities = []
        for _file_path, inspectable_file in results:
            analysis = inspectable_file.inspect(config=config)
            vulnerabilities.extend(analysis.vulnerabilities)

        # Generate report
        report = generate_security_report(
            vulnerabilities=vulnerabilities,
            packages=[],  # Extract from results
            output_format=output_format
        )
        return vulnerabilities, report

    try:
        loop = asyncio.get_running_loop()
        vulnerabilities, report = await loop.run_in_executor(None, _scan_blocking)

        # Update scan result
        scan_results[scan_id].update({
            'status': 'completed',
            'vulnerabilities_count': len(vulnerabilities),
            'report': report
        })

    except Exception as e:
        # Background-task boundary: record the failure instead of losing it.
        scan_results[scan_id].update({
            'status': 'failed',
            'error': str(e)
        })

@app.get("/scan/{scan_id}", response_model=ScanResult)
async def get_scan_result(scan_id: str):
    """Get scan result by ID."""
    try:
        stored = scan_results[scan_id]
    except KeyError:
        # Unknown ID: report it as a missing resource.
        raise HTTPException(status_code=404, detail="Scan not found") from None

    return ScanResult(
        scan_id=scan_id,
        status=stored['status'],
        vulnerabilities_count=stored.get('vulnerabilities_count', 0)
    )

@app.get("/scan/{scan_id}/report")
async def get_scan_report(scan_id: str):
    """Get scan report content."""
    try:
        stored = scan_results[scan_id]
    except KeyError:
        # Unknown ID: report it as a missing resource.
        raise HTTPException(status_code=404, detail="Scan not found") from None

    # A report is only served once the scan has finished successfully.
    if stored['status'] != 'completed':
        raise HTTPException(status_code=400, detail="Scan not completed")

    report_text = stored.get('report', '')
    return {'report': report_text}

Django Integration { .api }

# Django management command for security scanning
from django.core.management.base import BaseCommand
from django.conf import settings
from pathlib import Path
import json

# Management command that scans a path and prints or saves the report.
class Command(BaseCommand):
    help = 'Run security vulnerability scan'

    def add_arguments(self, parser):
        """Register the --path, --threshold, --output and --save options."""
        parser.add_argument('--path', type=str, default='.',
                          help='Path to scan (default: current directory)')
        parser.add_argument('--threshold', type=float, default=7.0,
                          help='CVSS severity threshold')
        parser.add_argument('--output', type=str, default='json',
                          choices=['json', 'text', 'html'],
                          help='Output format')
        parser.add_argument('--save', type=str,
                          help='Save report to file')

    def handle(self, *args, **options):
        """
        Execute security scan.

        Scans ``options['path']``, renders a report in ``options['output']``
        format and either writes it to ``options['save']`` or prints it,
        followed by a one-line summary. Any exception is reported and then
        re-raised.
        """

        self.stdout.write('Starting security scan...')

        # Configure scan
        config = create_scan_configuration(
            severity_threshold=options['threshold'],
            continue_on_error=True
        )

        # Perform scan
        try:
            results = scan_project_programmatically(
                target_path=Path(options['path']),
                config=config
            )

            # Process results
            vulnerabilities = []
            for file_path, inspectable_file in results:
                analysis = inspectable_file.inspect(config=config)
                vulnerabilities.extend(analysis.vulnerabilities)

            # Generate report
            report = generate_security_report(
                vulnerabilities=vulnerabilities,
                packages=[],
                output_format=options['output']
            )

            # Save or display report
            if options['save']:
                # Explicit UTF-8 so the write does not depend on the platform
                # default encoding; generate_security_report saves its
                # reports the same way.
                with open(options['save'], 'w', encoding='utf-8') as f:
                    f.write(report)
                self.stdout.write(f"Report saved to: {options['save']}")
            else:
                self.stdout.write(report)

            # Summary
            vuln_count = len(vulnerabilities)
            if vuln_count > 0:
                self.stdout.write(
                    self.style.WARNING(f'Found {vuln_count} vulnerabilities')
                )
            else:
                self.stdout.write(
                    self.style.SUCCESS('No vulnerabilities found')
                )

        except Exception as e:
            # Report the failure, then re-raise so the command still fails.
            self.stdout.write(
                self.style.ERROR(f'Scan failed: {e}')
            )
            raise

# Usage: python manage.py security_scan --path ./myproject --threshold 6.0 --save security_report.json

This comprehensive programmatic API documentation provides developers with all the tools needed to integrate Safety CLI's security scanning capabilities into their applications, services, and automation workflows.

Install with Tessl CLI

npx tessl i tessl/pypi-safety

docs

authentication.md

cli-commands.md

configuration.md

errors.md

formatters.md

index.md

models.md

programmatic.md

scanning.md

tile.json