0
# Security & Authentication
1
2
Built-in security handlers for common authentication methods with support for Bearer tokens, Basic auth, API keys, and OAuth2. Connexion provides automatic security validation based on OpenAPI security schemes.
3
4
## Capabilities
5
6
### Built-in Security Handlers
7
8
Pre-implemented security handlers for common authentication patterns.
9
10
```python { .api }
11
def bearer_auth(token: str) -> dict:
12
"""
13
Bearer token authentication handler.
14
15
Parameters:
16
- token: Bearer token from Authorization header
17
18
Returns:
19
dict: Token information or user context
20
21
Raises:
22
UnauthorizedProblem: If token is invalid
23
"""
24
25
def basic_auth(username: str, password: str) -> dict:
26
"""
27
HTTP Basic authentication handler.
28
29
Parameters:
30
- username: Username from Authorization header
31
- password: Password from Authorization header
32
33
Returns:
34
dict: User information or authentication context
35
36
Raises:
37
UnauthorizedProblem: If credentials are invalid
38
"""
39
40
def api_key_auth(api_key: str, required_scopes: list = None) -> dict:
41
"""
42
API key authentication handler.
43
44
Parameters:
45
- api_key: API key from header, query, or cookie
46
- required_scopes: List of required scopes for the operation
47
48
Returns:
49
dict: API key information or user context
50
51
Raises:
52
UnauthorizedProblem: If API key is invalid
53
ForbiddenProblem: If required scopes are not met
54
"""
55
56
def oauth2_auth(token: str, required_scopes: list) -> dict:
57
"""
58
OAuth2 authentication handler.
59
60
Parameters:
61
- token: OAuth2 access token
62
- required_scopes: List of required OAuth2 scopes
63
64
Returns:
65
dict: Token information with user context
66
67
Raises:
68
UnauthorizedProblem: If token is invalid
69
ForbiddenProblem: If required scopes are not met
70
"""
71
```
72
73
### Security Configuration
74
75
Configure security handlers for different authentication schemes.
76
77
```python { .api }
78
# Security handler mapping
79
SECURITY_HANDLERS = {
80
'bearerAuth': bearer_auth,
81
'basicAuth': basic_auth,
82
'apiKeyAuth': api_key_auth,
83
'oauth2': oauth2_auth
84
}
85
86
def configure_security(app, security_map: dict = None):
87
"""
88
Configure security handlers for the application.
89
90
Parameters:
91
- app: Connexion application instance
92
- security_map: Custom security handler mapping
93
"""
94
```
95
96
### Authentication Context
97
98
Access authentication information in endpoint functions.
99
100
```python { .api }
101
from connexion.context import request
102
103
# Access authenticated user information
104
user_info = request.context.get('user')
105
token_info = request.context.get('token_info')
106
```
107
108
### Security Exceptions
109
110
Exceptions for authentication and authorization failures.
111
112
```python { .api }
113
class UnauthorizedProblem(ProblemException):
114
"""401 Unauthorized - Authentication required or failed"""
115
def __init__(self, title: str = "Unauthorized", **kwargs):
116
super().__init__(status=401, title=title, **kwargs)
117
118
class ForbiddenProblem(ProblemException):
119
"""403 Forbidden - Insufficient permissions"""
120
def __init__(self, title: str = "Forbidden", **kwargs):
121
super().__init__(status=403, title=title, **kwargs)
122
```
123
124
## Usage Examples
125
126
### Bearer Token Authentication
127
128
```python
129
def bearer_auth(token):
130
"""Validate bearer token and return user info"""
131
# Validate token (e.g., JWT verification)
132
try:
133
user_id = verify_jwt_token(token)
134
user = get_user_by_id(user_id)
135
136
if not user:
137
raise UnauthorizedProblem("Invalid token")
138
139
return {
140
'user_id': user.id,
141
'username': user.username,
142
'roles': user.roles
143
}
144
except InvalidTokenError:
145
raise UnauthorizedProblem("Invalid or expired token")
146
147
# OpenAPI security scheme:
148
# securitySchemes:
149
# bearerAuth:
150
# type: http
151
# scheme: bearer
152
# bearerFormat: JWT
153
154
# Operation security:
155
# security:
156
# - bearerAuth: []
157
158
# Endpoint implementation
159
def get_protected_resource():
160
# Access authenticated user
161
user = request.context['user']
162
return {"message": f"Hello {user['username']}"}
163
```
164
165
### API Key Authentication
166
167
```python
168
def api_key_auth(api_key, required_scopes=None):
169
"""Validate API key and check scopes"""
170
# Look up API key in database
171
key_info = get_api_key_info(api_key)
172
173
if not key_info or not key_info.active:
174
raise UnauthorizedProblem("Invalid API key")
175
176
# Check if key has expired
177
if key_info.expires_at and key_info.expires_at < datetime.utcnow():
178
raise UnauthorizedProblem("API key has expired")
179
180
# Check required scopes
181
if required_scopes:
182
if not set(required_scopes).issubset(set(key_info.scopes)):
183
raise ForbiddenProblem("Insufficient API key permissions")
184
185
return {
186
'api_key_id': key_info.id,
187
'scopes': key_info.scopes,
188
'client_name': key_info.client_name
189
}
190
191
# OpenAPI security scheme:
192
# securitySchemes:
193
# apiKey:
194
# type: apiKey
195
# in: header
196
# name: X-API-Key
197
198
# Usage in endpoints
199
def create_resource():
200
# Access API key info
201
api_info = request.context['token_info']
202
client_name = api_info['client_name']
203
204
# Create resource...
205
return {"created_by": client_name}, 201
206
```
207
208
### OAuth2 Authentication
209
210
```python
211
def oauth2_auth(token, required_scopes):
212
"""Validate OAuth2 token and check scopes"""
213
# Verify token with OAuth2 provider
214
try:
215
token_info = verify_oauth2_token(token)
216
217
# Check if token has required scopes
218
token_scopes = set(token_info.get('scope', '').split())
219
required_scopes_set = set(required_scopes)
220
221
if not required_scopes_set.issubset(token_scopes):
222
raise ForbiddenProblem(
223
"Insufficient scope",
224
detail=f"Required: {required_scopes}, Available: {list(token_scopes)}"
225
)
226
227
return {
228
'user_id': token_info['sub'],
229
'scopes': list(token_scopes),
230
'client_id': token_info['client_id']
231
}
232
233
except TokenVerificationError:
234
raise UnauthorizedProblem("Invalid OAuth2 token")
235
236
# OpenAPI security scheme:
237
# securitySchemes:
238
# oauth2:
239
# type: oauth2
240
# flows:
241
# authorizationCode:
242
# authorizationUrl: https://auth.example.com/oauth/authorize
243
# tokenUrl: https://auth.example.com/oauth/token
244
# scopes:
245
# read: Read access
246
# write: Write access
247
# admin: Admin access
248
```
249
250
### Basic Authentication
251
252
```python
253
def basic_auth(username, password):
254
"""Validate username/password credentials"""
255
# Authenticate user
256
user = authenticate_user(username, password)
257
258
if not user:
259
raise UnauthorizedProblem("Invalid credentials")
260
261
if not user.active:
262
raise ForbiddenProblem("Account disabled")
263
264
return {
265
'user_id': user.id,
266
'username': user.username,
267
'roles': user.roles
268
}
269
270
def authenticate_user(username, password):
271
"""Authenticate user credentials"""
272
user = get_user_by_username(username)
273
if user and verify_password(password, user.password_hash):
274
return user
275
return None
276
277
# OpenAPI security scheme:
278
# securitySchemes:
279
# basicAuth:
280
# type: http
281
# scheme: basic
282
```
283
284
### Custom Security Handler
285
286
```python
287
def custom_jwt_auth(token):
288
"""Custom JWT authentication with additional claims"""
289
try:
290
# Decode JWT with custom logic
291
payload = jwt.decode(
292
token,
293
JWT_SECRET,
294
algorithms=['HS256'],
295
options={'verify_exp': True}
296
)
297
298
# Check custom claims
299
if payload.get('iss') != 'your-app':
300
raise UnauthorizedProblem("Invalid token issuer")
301
302
# Check user permissions
303
user_id = payload['sub']
304
permissions = get_user_permissions(user_id)
305
306
return {
307
'user_id': user_id,
308
'permissions': permissions,
309
'token_claims': payload
310
}
311
312
except jwt.ExpiredSignatureError:
313
raise UnauthorizedProblem("Token has expired")
314
except jwt.InvalidTokenError:
315
raise UnauthorizedProblem("Invalid token")
316
317
# Register custom handler
318
security_map = {
319
'customJWT': custom_jwt_auth
320
}
321
322
app.add_api('api.yaml', security_map=security_map)
323
```
324
325
### Role-Based Access Control
326
327
```python
328
def require_role(required_role):
329
"""Decorator for role-based access control"""
330
def decorator(func):
331
def wrapper(*args, **kwargs):
332
user = request.context.get('user', {})
333
user_roles = user.get('roles', [])
334
335
if required_role not in user_roles:
336
raise ForbiddenProblem(
337
f"Role '{required_role}' required"
338
)
339
340
return func(*args, **kwargs)
341
return wrapper
342
return decorator
343
344
# Usage in endpoints
345
@require_role('admin')
346
def delete_user(user_id):
347
# Only admin users can delete users
348
delete_user_by_id(user_id)
349
return NoContent, 204
350
351
# Alternative: Check in function
352
def update_user(user_id):
353
user = request.context['user']
354
355
# Users can only update their own profile unless they're admin
356
if user['user_id'] != user_id and 'admin' not in user['roles']:
357
raise ForbiddenProblem("Can only update your own profile")
358
359
# Update user...
360
return {"status": "updated"}
361
```
362
363
### Multiple Authentication Methods
364
365
```python
366
# Support multiple authentication methods in OpenAPI
367
# security:
368
# - bearerAuth: []
369
# - apiKey: []
370
# - basicAuth: []
371
372
def flexible_auth_handler():
373
"""Handle multiple authentication methods"""
374
auth_header = request.headers.get('Authorization', '')
375
376
if auth_header.startswith('Bearer '):
377
token = auth_header[7:]
378
return bearer_auth(token)
379
elif auth_header.startswith('Basic '):
380
# Extract credentials
381
import base64
382
credentials = base64.b64decode(auth_header[6:]).decode()
383
username, password = credentials.split(':', 1)
384
return basic_auth(username, password)
385
elif 'X-API-Key' in request.headers:
386
api_key = request.headers['X-API-Key']
387
return api_key_auth(api_key)
388
else:
389
raise UnauthorizedProblem("Authentication required")
390
391
# Configure multiple security handlers
392
security_map = {
393
'bearerAuth': bearer_auth,
394
'apiKey': api_key_auth,
395
'basicAuth': basic_auth,
396
'flexibleAuth': flexible_auth_handler
397
}
398
399
app.add_api('api.yaml', security_map=security_map)
400
```
401
402
### Security Middleware
403
404
```python
405
from connexion.middleware import SecurityMiddleware
406
407
class CustomSecurityMiddleware(SecurityMiddleware):
408
"""Custom security middleware with additional features"""
409
410
async def __call__(self, scope, receive, send):
411
# Add custom security headers
412
async def send_wrapper(message):
413
if message['type'] == 'http.response.start':
414
headers = dict(message.get('headers', []))
415
headers[b'x-frame-options'] = b'DENY'
416
headers[b'x-content-type-options'] = b'nosniff'
417
message['headers'] = list(headers.items())
418
await send(message)
419
420
await super().__call__(scope, receive, send_wrapper)
421
422
# Use custom middleware
423
app = AsyncApp(
424
__name__,
425
middlewares=[CustomSecurityMiddleware()]
426
)
427
```