A tool for scanning Python environments for known vulnerabilities
—
Functionality for resolving and applying fixes to vulnerable dependencies. This module handles the process of determining appropriate fix versions and coordinating upgrades.
Base classes representing fix versions and their resolution status.
@dataclass(frozen=True)
class FixVersion:
    """
    Represents an abstract dependency fix version.

    This base class is not meant to be constructed directly; concrete
    results are expressed through its subclasses.
    """

    dep: ResolvedDependency
    """
    The dependency that needs to be fixed.
    """

    def is_skipped(self) -> bool:
        """
        Check whether the FixVersion was unable to be resolved.

        Returns:
            True if this is a SkippedFixVersion, False otherwise
        """
        # The name is looked up at call time, so it is fine that
        # SkippedFixVersion is declared further down the module.
        return isinstance(self, SkippedFixVersion)


# Represents a successfully resolved fix version.
@dataclass(frozen=True)
class ResolvedFixVersion(FixVersion):
    """
    Represents a dependency fix version that was successfully resolved.

    Adds the target ``version`` to the fields inherited from FixVersion.
    """

    version: Version
    """
    The resolved fix version to upgrade to.
    """


# Represents a fix version that could not be resolved.
@dataclass(frozen=True)
class SkippedFixVersion(FixVersion):
    """
    Represents a dependency fix version that was unable to be resolved.

    Adds a human-readable ``skip_reason`` to the fields inherited from
    FixVersion.
    """

    skip_reason: str
    """
    The reason why this fix version could not be resolved.
    """


# Exception classes related to fix resolution.
class FixResolutionImpossible(Exception):
    """
    Raised when resolve_fix_versions fails to find a fix version without
    known vulnerabilities.
    """


# Functions for resolving fix versions from vulnerability results.
def resolve_fix_versions(
    dependency_results: Iterator[tuple[Dependency, list[VulnerabilityResult]]],
    service: VulnerabilityService,
    state: AuditState = AuditState(),
) -> Iterator[FixVersion]:
    """
    Resolve fix versions for vulnerabilities.

    Takes dependency audit results and determines appropriate fix versions
    for each vulnerable dependency.

    Parameters:
    - dependency_results: Iterator of (dependency, vulnerabilities) tuples
    - service: VulnerabilityService to query for additional information
    - state: AuditState for progress tracking. The default instance is
      created once, when the function is defined, and is therefore shared
      by every call that does not pass its own state.

    Returns:
        Iterator of FixVersion objects (either ResolvedFixVersion or
        SkippedFixVersion)
    """


from pip_audit._fix import resolve_fix_versions
from pip_audit._audit import Auditor
from pip_audit._dependency_source import PipSource
from pip_audit._service import PyPIService
# Perform audit and resolve fixes
service = PyPIService()
source = PipSource()
auditor = Auditor(service=service)

# Get audit results
audit_results = auditor.audit(source)

# Resolve fix versions
fix_versions = resolve_fix_versions(audit_results, service)

# Report each outcome: a skipped fix carries a reason, a resolved one
# carries the version to upgrade to.
for fix_version in fix_versions:
    if fix_version.is_skipped():
        print(f"Cannot fix {fix_version.dep.name}: {fix_version.skip_reason}")
    else:
        print(
            f"Fix {fix_version.dep.name} v{fix_version.dep.version} "
            f"-> v{fix_version.version}"
        )


from pip_audit._fix import resolve_fix_versions
from pip_audit._audit import Auditor
from pip_audit._dependency_source import RequirementSource
from pip_audit._service import PyPIService
from pathlib import Path

# Setup components
service = PyPIService()
# RequirementSource takes a list of requirements-file paths rather than a
# bare string, so wrap the single file in a list of Path objects.
source = RequirementSource([Path("requirements.txt")])
auditor = Auditor(service=service)

# Get vulnerabilities and resolve fixes
audit_results = list(auditor.audit(source))
vulnerable_results = [(dep, vulns) for dep, vulns in audit_results if vulns]

if vulnerable_results:
    print(f"Found {len(vulnerable_results)} packages with vulnerabilities")

    # Resolve fix versions (materialized up front so that every fix is
    # resolved before any of them is applied)
    fix_versions = list(resolve_fix_versions(iter(vulnerable_results), service))

    # Apply fixes
    for fix_version in fix_versions:
        if not fix_version.is_skipped():
            try:
                source.fix(fix_version)
                print(f"Fixed {fix_version.dep.name} -> v{fix_version.version}")
            except Exception as e:
                # Report the failure and carry on with the remaining fixes.
                print(f"Failed to fix {fix_version.dep.name}: {e}")
        else:
            print(f"Skipped {fix_version.dep.name}: {fix_version.skip_reason}")


from pip_audit._fix import resolve_fix_versions
from pip_audit._audit import Auditor
from pip_audit._dependency_source import PyProjectSource
from pip_audit._service import OsvService
from pip_audit._state import AuditState
# Custom state handler for progress tracking
class ProgressTracker:
def update_state(self, message: str, logs: str | None = None):
print(f"Progress: {message}")
def initialize(self):
print("Starting fix resolution...")
def finalize(self):
print("Fix resolution completed.")
from pathlib import Path

# Setup with state tracking
tracker = ProgressTracker()
state = AuditState(members=[tracker])
service = OsvService()
# PyProjectSource expects a filesystem path object, not a bare string.
source = PyProjectSource(Path("pyproject.toml"))
auditor = Auditor(service=service)

resolved_count = 0
skipped_count = 0

# Perform audit with state tracking
state.initialize()
try:
    audit_results = auditor.audit(source)

    # Resolve fixes with progress updates
    fix_versions = resolve_fix_versions(audit_results, service, state=state)
    for fix_version in fix_versions:
        if fix_version.is_skipped():
            skipped_count += 1
        else:
            resolved_count += 1
finally:
    # Always let the state members finish, even if the audit or the
    # fix resolution raised part-way through.
    state.finalize()

print(f"Summary: {resolved_count} fixes resolved, {skipped_count} skipped")


from pip_audit._fix import resolve_fix_versions
from pip_audit._audit import Auditor
from pip_audit._dependency_source import PipSource
from pip_audit._service import PyPIService
from packaging.version import Version
def choose_best_fix_version(vulnerabilities, available_versions):
    """
    Custom logic to choose the best fix version from available options.

    A version qualifies only if it is listed in the ``fix_versions`` of
    every vulnerability; the smallest qualifying version is returned.

    Parameters:
    - vulnerabilities: iterable of objects exposing a ``fix_versions``
      collection of hashable, mutually comparable versions. It is consumed
      in a single pass, so a one-shot iterator works as well as a list.
    - available_versions: currently unused; kept so existing callers that
      pass it keep working.

    Returns:
        The minimum version that fixes all vulnerabilities, or None when
        there are no vulnerabilities or no version fixes all of them.
    """
    common_fixes = None
    for vuln in vulnerabilities:
        fixes = set(vuln.fix_versions)
        # Narrow the candidates to versions shared with every vulnerability
        # seen so far.
        common_fixes = fixes if common_fixes is None else common_fixes & fixes
        if not common_fixes:
            # Nothing can satisfy all vulnerabilities any more.
            return None

    # Choose the minimum version that fixes all issues
    return min(common_fixes) if common_fixes else None
# Example usage
service = PyPIService()
source = PipSource()
auditor = Auditor(service=service)

# Get audit results and apply custom fix resolution
for dependency, vulnerabilities in auditor.audit(source):
    if vulnerabilities:
        # Use custom logic to determine best fix
        best_fix = choose_best_fix_version(vulnerabilities, [])
        # choose_best_fix_version signals "no common fix" with None
        if best_fix is not None:
            print(f"Recommended fix for {dependency.name}: v{best_fix}")
        else:
            print(f"No suitable fix found for {dependency.name}")


# Install with Tessl CLI
npx tessl i tessl/pypi-pip-audit