# Vulnerability Services

Services that provide vulnerability information for Python packages, supporting multiple backends including PyPI and OSV (Open Source Vulnerabilities).

## Capabilities

### Base Interface

Abstract base class for all vulnerability services.

```python { .api }
class VulnerabilityService(ABC):
    """
    Represents an abstract vulnerability service.

    Individual concrete vulnerability services are expected to subclass
    VulnerabilityService and implement it in their terms.
    """

    @abstractmethod
    def query(self, spec: Dependency) -> tuple[Dependency, list[VulnerabilityResult]]:
        """
        Query the service for vulnerabilities in the given dependency.

        Parameters:
        - spec: Dependency, the dependency to query for vulnerabilities

        Returns:
        Tuple of (dependency, list of vulnerability results)
        """

    def query_all(
        self, specs: Iterator[Dependency]
    ) -> Iterator[tuple[Dependency, list[VulnerabilityResult]]]:
        """
        Query the vulnerability service for information on multiple dependencies.

        VulnerabilityService implementations can override this with a more optimized
        implementation if they support batched or bulk requests.

        Parameters:
        - specs: Iterator[Dependency], iterator of dependencies to query

        Returns:
        Iterator of (dependency, vulnerabilities) tuples
        """
```
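
To illustrate how a concrete service might plug into this interface, the following sketch implements a service backed by an in-memory dictionary. So that it runs without pip-audit installed, it declares simplified stand-ins for `Dependency`, `VulnerabilityResult`, and `VulnerabilityService`. `StaticService`, its `advisories` mapping, and the `DEMO-0001` ID are hypothetical names used only for illustration, and the default `query_all` shown here (one `query` call per dependency) is an assumption about the base class's fallback behavior.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass


# Simplified stand-ins for pip-audit's types (assumptions, not the real classes).
@dataclass(frozen=True)
class Dependency:
    name: str
    version: str


@dataclass(frozen=True)
class VulnerabilityResult:
    id: str
    description: str
    fix_versions: tuple[str, ...] = ()


class VulnerabilityService(ABC):
    @abstractmethod
    def query(self, spec: Dependency) -> tuple[Dependency, list[VulnerabilityResult]]:
        ...

    def query_all(
        self, specs: Iterator[Dependency]
    ) -> Iterator[tuple[Dependency, list[VulnerabilityResult]]]:
        # Assumed default: one query() call per dependency.
        for spec in specs:
            yield self.query(spec)


class StaticService(VulnerabilityService):
    """Hypothetical service that answers from an in-memory mapping."""

    def __init__(self, advisories: dict[tuple[str, str], list[VulnerabilityResult]]):
        self._advisories = advisories

    def query(self, spec: Dependency) -> tuple[Dependency, list[VulnerabilityResult]]:
        # Unknown (name, version) pairs simply yield an empty result list.
        return spec, self._advisories.get((spec.name, spec.version), [])


service = StaticService(
    {("requests", "2.25.0"): [VulnerabilityResult("DEMO-0001", "Example advisory", ("2.31.0",))]}
)
deps = [Dependency("requests", "2.25.0"), Dependency("flask", "1.0.0")]

# query_all is inherited, so only query() had to be implemented.
results = list(service.query_all(iter(deps)))
for dep, vulns in results:
    print(f"{dep.name}=={dep.version}: {[v.id for v in vulns]}")
```

A service with a bulk endpoint could override `query_all` to send one batched request instead of relying on the per-dependency loop.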

### PyPI Service

Vulnerability service that uses the PyPI JSON API to query for known vulnerabilities.

```python { .api }
class PyPIService(VulnerabilityService):
    """
    An implementation of VulnerabilityService that uses PyPI to provide Python
    package vulnerability information.
    """

    def __init__(self, cache_dir: Path | None = None, timeout: int | None = None):
        """
        Create a new PyPIService.

        Parameters:
        - cache_dir: Path | None, optional cache directory for PyPI API requests
        - timeout: int | None, timeout in seconds for network requests
        """

    def query(self, spec: Dependency) -> tuple[Dependency, list[VulnerabilityResult]]:
        """
        Query PyPI for vulnerabilities in the given package.

        Parameters:
        - spec: Dependency, the package to query

        Returns:
        Tuple of (dependency, vulnerabilities)
        """
```

### OSV Service

Vulnerability service that uses the OSV (Open Source Vulnerabilities) database.

```python { .api }
class OsvService(VulnerabilityService):
    """
    An implementation of VulnerabilityService that uses OSV to provide vulnerability
    information.
    """

    def __init__(self, cache_dir: Path | None = None, timeout: int | None = None):
        """
        Create a new OsvService.

        Parameters:
        - cache_dir: Path | None, optional cache directory for OSV API requests
        - timeout: int | None, timeout in seconds for network requests
        """

    def query(self, spec: Dependency) -> tuple[Dependency, list[VulnerabilityResult]]:
        """
        Query OSV for vulnerabilities in the given package.

        Parameters:
        - spec: Dependency, the package to query

        Returns:
        Tuple of (dependency, vulnerabilities)
        """
```

### Exceptions

Exceptions raised by vulnerability services.

```python { .api }
class ServiceError(Exception):
    """
    Base exception for vulnerability service errors.
    """

class ConnectionError(ServiceError):
    """
    Raised when a vulnerability service fails to connect or times out.
    """
```
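
As a sketch of how callers might handle these exceptions, the example below tries a primary service and falls back to a secondary one when the first raises `ConnectionError`. The exception classes are re-declared locally so the snippet runs standalone, and `query_with_fallback`, `FlakyService`, and `StubService` are hypothetical helpers, not part of the documented API. Note that this `ConnectionError` shadows Python's builtin exception of the same name, so importing it under an alias may avoid confusion in real code.

```python
class ServiceError(Exception):
    """Local stand-in for the documented base exception."""


class ConnectionError(ServiceError):  # shadows the builtin, mirroring the documented name
    """Local stand-in for the documented connection failure."""


class FlakyService:
    """Hypothetical service whose backend is unreachable."""

    def query(self, spec):
        raise ConnectionError("backend unreachable")


class StubService:
    """Hypothetical service that always reports no vulnerabilities."""

    def query(self, spec):
        return spec, []


def query_with_fallback(spec, services):
    """Return the first successful result, trying services in order."""
    last_error = None
    for service in services:
        try:
            return service.query(spec)
        except ConnectionError as exc:
            # Connectivity problems are worth retrying against another backend.
            last_error = exc
    # Every service failed to connect: surface that as a ServiceError.
    raise ServiceError("all services failed") from last_error


dep, vulns = query_with_fallback("requests==2.25.0", [FlakyService(), StubService()])
print(dep, vulns)
```

Catching only `ConnectionError` in the loop lets other `ServiceError` subclasses propagate immediately, which seems the safer default when the failure is not network-related.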

## Usage Examples

### Using PyPI Service

```python
from pip_audit._service import PyPIService
from pip_audit._service.interface import ResolvedDependency
from packaging.version import Version

# Create service
service = PyPIService()

# Query for vulnerabilities
dependency = ResolvedDependency(name="requests", version=Version("2.25.0"))
dep, vulnerabilities = service.query(dependency)

if vulnerabilities:
    print(f"Found {len(vulnerabilities)} vulnerabilities in {dep.name}")
    for vuln in vulnerabilities:
        print(f" {vuln.id}: {vuln.description}")
        if vuln.fix_versions:
            print(f" Fix versions: {[str(v) for v in vuln.fix_versions]}")
```

### Using OSV Service

```python
from pip_audit._service import OsvService
from pip_audit._service.interface import ResolvedDependency
from packaging.version import Version

# Create service with custom timeout
service = OsvService(timeout=30)

# Query for vulnerabilities
dependency = ResolvedDependency(name="django", version=Version("3.0.0"))
dep, vulnerabilities = service.query(dependency)

for vuln in vulnerabilities:
    print(f"Vulnerability: {vuln.id}")
    print(f"Description: {vuln.description}")
    print(f"Aliases: {vuln.aliases}")
```

### Service Comparison

```python
from pip_audit._service import PyPIService, OsvService
from pip_audit._service.interface import ResolvedDependency
from packaging.version import Version

# Compare results from different services
dependency = ResolvedDependency(name="flask", version=Version("1.0.0"))

pypi_service = PyPIService()
osv_service = OsvService()

_, pypi_vulns = pypi_service.query(dependency)
_, osv_vulns = osv_service.query(dependency)

print(f"PyPI found {len(pypi_vulns)} vulnerabilities")
print(f"OSV found {len(osv_vulns)} vulnerabilities")

# Combine results
all_vulns = pypi_vulns + osv_vulns
# Counts distinct IDs only; the same issue may appear under different IDs
# across services (compare vuln.aliases), so treat this as an upper bound.
unique_ids = {vuln.id for vuln in all_vulns}
print(f"Total unique vulnerabilities: {len(unique_ids)}")
```
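
Because different services may report the same vulnerability under different identifiers, counting distinct `id` values can overstate the total. The sketch below groups results whose IDs or aliases overlap. It uses a hypothetical `Vuln` stand-in with only `id` and `aliases` fields, `merge_by_alias` is an illustrative helper rather than a library function, and the advisory IDs are made up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Vuln:
    """Hypothetical stand-in exposing only the fields used here."""

    id: str
    aliases: frozenset = frozenset()


def merge_by_alias(vulns):
    """Group results whose id/alias sets overlap; return one ID set per group."""
    groups = []
    for vuln in vulns:
        ids = {vuln.id, *vuln.aliases}
        # Absorb every existing group that shares at least one identifier.
        overlapping = [group for group in groups if group & ids]
        for group in overlapping:
            ids |= group
            groups.remove(group)
        groups.append(ids)
    return groups


# Made-up results: the first two share a CVE alias, the third is unrelated.
pypi_vulns = [Vuln("PYSEC-0000-1", frozenset({"CVE-0000-0001"}))]
osv_vulns = [
    Vuln("GHSA-xxxx-xxxx-xxxx", frozenset({"CVE-0000-0001"})),
    Vuln("PYSEC-0000-2"),
]

all_vulns = pypi_vulns + osv_vulns
merged = merge_by_alias(all_vulns)
print(f"Distinct IDs: {len({v.id for v in all_vulns})}")
print(f"Distinct vulnerabilities after alias merge: {len(merged)}")
```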