# Authentication & Authorization

BlackSheep provides comprehensive authentication and authorization features through integration with the `guardpost` library, supporting JWT tokens, cookie-based authentication, and flexible authorization policies.

## Authentication Overview

Authentication in BlackSheep is handled through authentication strategies that validate user credentials from sources such as JWT tokens, cookies, or custom authentication mechanisms.

### Authentication Setup

```python { .api }
from typing import Optional

from blacksheep import Application, Request
from blacksheep.server.authentication import AuthenticationStrategy
from guardpost import Identity

# Basic authentication setup. The object returned by use_authentication()
# is kept because authentication handlers are registered on it below.
app = Application()
auth_strategy = app.use_authentication()


# Custom authentication handler
class CustomAuthHandler:
    """Authenticate requests from the raw ``Authorization`` header."""

    async def authenticate(self, context: Request) -> Optional[Identity]:
        """Return the caller's identity, or ``None`` when unauthenticated.

        ``validate_token`` is an application-supplied helper that is not
        defined in this snippet. It receives the decoded header value and a
        truthy result is used as the user id.
        """
        auth_header = context.get_first_header(b"Authorization")
        if not auth_header:
            return None

        # NOTE: the whole header value is passed on, including any scheme
        # prefix such as "Bearer "; validate_token has to account for that.
        user_id = validate_token(auth_header.decode())
        if not user_id:
            return None

        return Identity({"sub": user_id, "name": f"User {user_id}"})


auth_strategy.add(CustomAuthHandler())
```

### Authentication Decorators

```python { .api }
from blacksheep import allow_anonymous, auth, json


# Require authentication
@app.get("/protected")
@auth()  # Requires authenticated user
async def protected_endpoint():
    """Respond only to authenticated callers."""
    return json({"message": "You are authenticated"})


# Allow anonymous access (overrides global auth requirements)
@app.get("/public")
@allow_anonymous()  # call form: allow_anonymous is used as a decorator factory
async def public_endpoint():
    """Respond to any caller, authenticated or not."""
    return json({"message": "Public access"})


# Require specific authentication schemes
@app.get("/jwt-only")
@auth(authentication_schemes=["JWT Bearer"])
async def jwt_only_endpoint():
    """Respond only to callers authenticated through the "JWT Bearer" scheme."""
    return json({"message": "JWT authenticated"})
```

## JWT Bearer Authentication

BlackSheep provides built-in support for JWT Bearer token authentication with OIDC integration.

### JWT Configuration

```python { .api }
from blacksheep.server.authentication.jwt import JWTBearerAuthentication

# Basic JWT setup with OIDC discovery
jwt_auth = JWTBearerAuthentication(
    valid_audiences=["api", "myapp"],
    authority="https://auth.example.com",  # OIDC authority for key discovery
    require_kid=True,  # Require key ID in JWT header
)

auth_strategy = app.use_authentication()
auth_strategy.add(jwt_auth)

# Manual key configuration (an alternative to the OIDC discovery above).
# Rebinding ``jwt_auth`` does not register the new handler: call
# ``auth_strategy.add(jwt_auth)`` again when using this variant.
jwt_auth = JWTBearerAuthentication(
    valid_audiences=["api"],
    valid_issuers=["https://auth.example.com"],
    keys_url="https://auth.example.com/.well-known/jwks.json",
    cache_time=3600,  # Cache keys for 1 hour
    auth_mode="JWT Bearer",
)
```

### JWT Claims Access

```python { .api }
from guardpost import Identity


@app.get("/profile")
@auth()
async def get_profile(request: Request):
    """Return selected claims of the authenticated caller as JSON."""
    identity: Identity = request.identity
    claims = identity.claims  # All claims dict

    return json({
        "user_id": identity.id,  # Subject (sub claim)
        "email": claims.get("email"),
        "roles": claims.get("roles", []),
        # "scope" is read as one whitespace-separated string and split
        "scopes": claims.get("scope", "").split(),
    })
```

### Custom JWT Key Providers

```python { .api }
from typing import Any, Dict, Optional


class CustomKeysProvider:
    """Custom key provider for JWT validation"""

    # NOTE(review): this returns a JWKS-style document ({"keys": [...]});
    # confirm the return shape the installed BlackSheep/guardpost version
    # expects from a keys provider before using it as-is.
    async def get_keys(self) -> Dict[str, Any]:
        """Return JWKS keys for token validation"""
        # Fetch keys from custom source
        return {
            "keys": [
                {
                    "kty": "RSA",
                    "kid": "key1",
                    "n": "...",  # RSA modulus
                    "e": "AQAB",  # RSA exponent
                }
            ]
        }


# Use custom key provider
jwt_auth = JWTBearerAuthentication(
    valid_audiences=["api"],
    keys_provider=CustomKeysProvider(),
    require_kid=True,
)
```

## Cookie Authentication

Cookie-based authentication for web applications with secure session management.

### Cookie Authentication Setup

```python { .api }
from blacksheep.server.authentication.cookie import CookieAuthentication
from blacksheep.sessions.crypto import Serializer

# Basic cookie authentication
# The key below is a placeholder: load real keys from configuration or a
# secret store instead of committing them to source control.
cookie_auth = CookieAuthentication(
    cookie_name="auth_session",
    secret_keys=["your-secret-key-here"],
    auth_scheme="Cookie",
)

auth_strategy = app.use_authentication()
auth_strategy.add(cookie_auth)

# Advanced cookie authentication with custom serializer
# NOTE(review): confirm that the installed BlackSheep version exposes
# ``Serializer`` with these arguments and that ``CookieAuthentication``
# accepts ``serializer=``.
custom_serializer = Serializer(
    secret_key="encryption-key",
    signing_key="signing-key",
)

cookie_auth = CookieAuthentication(
    cookie_name="secure_session",
    serializer=custom_serializer,
    auth_scheme="SecureCookie",
)
```

### Cookie Authentication Usage

```python { .api }
from datetime import datetime, timedelta

from blacksheep import FromJSON, json
from blacksheep.cookies import Cookie
from blacksheep.exceptions import Unauthorized


# Login endpoint that sets authentication cookie
@app.post("/login")
async def login(credentials: FromJSON[dict]):
    """Validate the posted credentials and set the authentication cookie.

    ``authenticate_user`` is an application-supplied helper that is not
    defined in this snippet; a falsy result is treated as a failed login.

    Raises:
        Unauthorized: when the credentials are rejected.
    """
    payload = credentials.value

    # Validate credentials
    user = await authenticate_user(
        payload.get("username"),
        payload.get("password"),
    )
    if not user:
        raise Unauthorized("Invalid credentials")

    # Create response
    response = json({"message": "Logged in successfully"})

    # Set authentication cookie
    auth_data = {"user_id": user.id, "username": user.username}
    cookie_auth.set_cookie(auth_data, response, secure=True)

    return response


# Logout endpoint that removes cookie
@app.post("/logout")
@auth()
async def logout():
    """Build the logout response and unset the authentication cookie on it."""
    response = json({"message": "Logged out successfully"})
    cookie_auth.unset_cookie(response)
    return response


# Access authenticated user data
@app.get("/me")
@auth()
async def get_current_user(request: Request):
    """Return the id and claims of the authenticated caller as JSON."""
    # User identity is automatically available
    identity = request.identity
    return json({
        "user_id": identity.id,
        "claims": identity.claims,
    })
```

## Authorization

Authorization in BlackSheep is policy-based, allowing fine-grained access control through roles, permissions, and custom requirements.

### Authorization Setup

```python { .api }
from blacksheep.server.authorization import AuthorizationStrategy
from guardpost.authorization import Policy, Requirement
from guardpost.common import AuthenticatedRequirement

# Basic authorization setup
app.use_authentication()  # Authentication required first
authz_strategy = app.use_authorization()

# Define authorization policies
# NOTE(review): ``Requirement`` is instantiated here with a "kind:value"
# string; confirm the installed guardpost version supports that, or replace
# these with concrete Requirement subclasses.
admin_policy = Policy("admin", Requirement("role:admin"))
moderator_policy = Policy("moderator", Requirement("role:moderator"))
api_access_policy = Policy("api_access", Requirement("scope:api.read"))

authz_strategy.add(admin_policy)
authz_strategy.add(moderator_policy)
authz_strategy.add(api_access_policy)

# Default policy (just requires authentication)
default_policy = Policy("authenticated", AuthenticatedRequirement())
authz_strategy.default_policy = default_policy
```

### Authorization Decorators

```python { .api }
# Require specific policy
@app.get("/admin/users")
@auth("admin")  # Requires "admin" policy
async def admin_users():
    """Respond only when the "admin" policy succeeds."""
    return json({"admin_access": True})


# Require multiple policies (all must pass)
# NOTE(review): confirm that ``auth`` accepts a list of policy names in the
# installed BlackSheep version; otherwise define one combined policy.
@app.get("/moderator/posts")
@auth(["moderator", "api_access"])
async def moderate_posts():
    """Respond only when both listed policies succeed."""
    return json({"moderator_access": True})


# Default authenticated policy
@app.get("/profile")
@auth()  # Uses default "authenticated" policy
async def user_profile():
    """Respond to any authenticated caller."""
    return json({"user_access": True})


# Allow anonymous (bypass authorization)
@app.get("/public")
@allow_anonymous()  # call form: allow_anonymous is used as a decorator factory
async def public_access():
    """Respond to any caller, authenticated or not."""
    return json({"public": True})
```

### Custom Requirements

```python { .api }
from guardpost import Identity
from guardpost.authorization import AsyncRequirement, Requirement


class RoleRequirement(Requirement):
    """Require specific role"""

    def __init__(self, required_role: str):
        self.required_role = required_role

    # NOTE(review): confirm the ``handle`` signature expected by the
    # installed guardpost version (identity vs. authorization context).
    def handle(self, identity: Identity) -> bool:
        """Return True when the ``roles`` claim contains the required role."""
        user_roles = identity.claims.get("roles") or []
        if isinstance(user_roles, str):
            # A single role issued as a plain string must be compared as a
            # whole; ``in`` on a str would match substrings
            # ("admin" in "superadmin").
            user_roles = [user_roles]
        return self.required_role in user_roles


class PermissionRequirement(AsyncRequirement):
    """Require specific permission (async validation)"""

    def __init__(self, permission: str):
        self.permission = permission

    async def handle(self, identity: Identity) -> bool:
        """Return the result of the permission lookup for this identity.

        ``check_user_permission`` is an application-supplied coroutine that
        is not defined in this snippet.
        """
        # Check permission in database
        return await check_user_permission(identity.id, self.permission)


# Use custom requirements
admin_role_policy = Policy("admin_role", RoleRequirement("admin"))
write_permission_policy = Policy("can_write", PermissionRequirement("posts:write"))

authz_strategy.add(admin_role_policy)
authz_strategy.add(write_permission_policy)


@app.post("/posts")
@auth("can_write")  # Uses async permission check
async def create_post(post_data: FromJSON[dict]):
    """Acknowledge post creation; ``post_data`` is not used in this example."""
    return json({"post_created": True})
```

### Resource-Based Authorization

```python { .api }
class ResourceOwnerRequirement(AsyncRequirement):
    """Check if user owns the resource"""

    async def handle(self, identity: Identity, resource_id: Optional[str] = None) -> bool:
        """Return True only when the resource exists and the identity owns it.

        ``get_resource`` is an application-supplied coroutine that is not
        defined in this snippet.
        """
        if not resource_id:
            return False

        resource = await get_resource(resource_id)
        # bool(...) keeps the declared return type: a bare ``resource and ...``
        # would return the falsy resource itself (e.g. None), not False.
        return bool(resource) and resource.owner_id == identity.id


# Resource-specific authorization
@app.get("/posts/{post_id:int}")
@auth()
async def get_post(post_id: FromRoute[int], request: Request):
    """Return a post, restricting private posts to their owner.

    ``get_post_by_id`` is an application-supplied coroutine that is not
    defined in this snippet.

    Raises:
        NotFound: when the lookup yields no post.
        Forbidden: when the post is private and owned by someone else.
    """
    # Imported locally so this snippet does not depend on earlier ones.
    from blacksheep.exceptions import Forbidden, NotFound

    identity = request.identity
    post = await get_post_by_id(post_id.value)

    # Guard against a missing post (assumes the helper returns None for
    # unknown ids) before reading attributes from it.
    if post is None:
        raise NotFound()

    # Manual authorization check
    if post.is_private and post.owner_id != identity.id:
        raise Forbidden("Access denied to private post")

    return json(post.to_dict())
```

## Authentication Challenges

Handle authentication challenges and provide appropriate responses for different authentication failures.

### Challenge Handling

```python { .api }
from blacksheep.server.authentication import AuthenticateChallenge


# Custom challenge handler
async def custom_auth_challenge_handler(
    app: Application,
    request: Request,
    exception: AuthenticateChallenge,
):
    """Handle authentication challenges"""
    if request.path.startswith("/api/"):
        # API endpoints get JSON response
        response = Response(401, content=JSONContent({
            "error": "authentication_required",
            "message": "Valid authentication required",
            "scheme": exception.scheme,
        }))
    else:
        # Web endpoints redirect to login
        response = redirect("/login")

    # Get WWW-Authenticate header and attach it to whichever response was built
    auth_header = exception.get_header()
    response.headers.add(*auth_header)
    return response


# Register challenge handler
app.exception_handler(AuthenticateChallenge)(custom_auth_challenge_handler)
```

## Error Handling

Comprehensive error handling for authentication and authorization failures.

### Authentication Errors

```python { .api }
from blacksheep.exceptions import Forbidden, HTTPException, Unauthorized
from blacksheep.server.authorization import ForbiddenError
from guardpost.authorization import UnauthorizedError


# Handle 401 Unauthorized
@app.exception_handler(401)
async def handle_unauthorized(app: Application, request: Request, exc: HTTPException):
    """Answer 401 errors with JSON for "/api/" paths, a login redirect otherwise."""
    if not request.path.startswith("/api/"):
        return redirect("/login")

    return Response(401, content=JSONContent({
        "error": "unauthorized",
        "message": "Authentication required",
    }))


# Handle 403 Forbidden
@app.exception_handler(403)
async def handle_forbidden(app: Application, request: Request, exc: HTTPException):
    """Answer 403 errors with a JSON body."""
    return Response(403, content=JSONContent({
        "error": "forbidden",
        "message": "Insufficient permissions",
    }))


# Handle authorization errors
@app.exception_handler(UnauthorizedError)
async def handle_authz_error(app: Application, request: Request, exc: UnauthorizedError):
    """Map authorization errors to a 403 (ForbiddenError) or 401 JSON response."""
    if isinstance(exc, ForbiddenError):
        status, message = 403, "Access forbidden"
    else:
        status, message = 401, "Authentication required"

    return Response(status, content=JSONContent({
        "error": "authorization_failed",
        "message": message,
    }))
```

## Session Management

Secure session management with encryption and signing for web applications.

### Session Configuration

```python { .api }
from blacksheep.sessions import SessionMiddleware
from blacksheep.sessions.crypto import Encryptor, Signer

# Basic session setup
# The keys in this snippet are placeholders: load real keys from
# configuration or a secret store instead of committing them.
app.use_sessions(
    secret_key="your-secret-key-here",
    session_cookie="session",
    session_max_age=3600,  # 1 hour
)

# Advanced session configuration
custom_encryptor = Encryptor("encryption-key")
custom_signer = Signer("signing-key")

session_middleware = SessionMiddleware(
    secret_key="session-secret",
    session_cookie="secure_session",
    encryptor=custom_encryptor,
    signer=custom_signer,
    session_max_age=86400,  # 24 hours
)

app.middlewares.append(session_middleware)
```

### Session Usage

```python { .api }
from blacksheep.sessions import Session


# Access session in handlers
@app.get("/dashboard")
async def dashboard(request: Request):
    """Return the session's user data, or redirect to the login page."""
    session: Session = request.session

    # Get session data
    user_id = session.get("user_id")
    if not user_id:
        return redirect("/login")

    return json({
        "user_id": user_id,
        "preferences": session.get("preferences", {}),
    })


# Set session data
@app.post("/login")
async def login(credentials: FromJSON[dict], request: Request):
    """Authenticate the caller and store their details in the session.

    ``request`` is declared as a parameter because the body reads
    ``request.session``. ``authenticate_user`` is an application-supplied
    coroutine that is not defined in this snippet.

    Raises:
        Unauthorized: when the credentials are rejected.
    """
    from datetime import datetime, timezone

    # Authenticate user
    user = await authenticate_user(credentials.value)
    if not user:
        raise Unauthorized("Invalid credentials")

    # Set session data
    session = request.session
    session["user_id"] = user.id
    session["username"] = user.username
    # Timezone-aware UTC timestamp, so the stored ISO string is unambiguous.
    session["login_time"] = datetime.now(timezone.utc).isoformat()

    return json({"message": "Login successful"})


# Clear session
@app.post("/logout")
async def logout(request: Request):
    """Clear the caller's session and confirm the logout."""
    request.session.clear()  # Clear all session data
    return json({"message": "Logged out"})
```

## Complete Authentication Example

Here's a complete example showing authentication and authorization in a BlackSheep application:

```python { .api }
from blacksheep import Application, FromJSON, Request, json
from blacksheep import allow_anonymous, auth
from blacksheep.server.authentication.cookie import CookieAuthentication
from blacksheep.server.authentication.jwt import JWTBearerAuthentication
from guardpost import Identity
from guardpost.authorization import Policy, Requirement

app = Application()

# Setup authentication strategies
auth_strategy = app.use_authentication()

# JWT for API access
jwt_auth = JWTBearerAuthentication(
    valid_audiences=["api"],
    authority="https://auth.example.com",
)
auth_strategy.add(jwt_auth)

# Cookie auth for web interface
# The key below is a placeholder: load real keys from configuration.
cookie_auth = CookieAuthentication(
    cookie_name="webapp_session",
    secret_keys=["webapp-secret-key"],
)
auth_strategy.add(cookie_auth)

# Setup authorization
# NOTE(review): ``Requirement`` is instantiated here with a "kind:value"
# string; confirm the installed guardpost version supports that.
authz_strategy = app.use_authorization()
authz_strategy.add(Policy("admin", Requirement("role:admin")))
authz_strategy.add(Policy("user", Requirement("role:user")))


# Public endpoints
@app.get("/")
@allow_anonymous()  # call form: allow_anonymous is used as a decorator factory
async def home():
    """Respond to any caller, authenticated or not."""
    return json({"message": "Welcome to BlackSheep API"})


# User endpoints (require authentication)
@app.get("/profile")
@auth()
async def get_profile(request: Request):
    """Return the id and claims of the authenticated caller."""
    identity: Identity = request.identity
    return json({
        "user_id": identity.id,
        "claims": identity.claims,
    })


# Admin endpoints (require admin role)
@app.get("/admin/users")
@auth("admin")
async def list_all_users():
    """List users; ``get_all_users`` is an application-supplied coroutine."""
    # Only admins can access this
    users = await get_all_users()
    return json({"users": users})


# API endpoints (JWT only)
@app.get("/api/data")
@auth(authentication_schemes=["JWT Bearer"])
async def api_data():
    """Respond only to callers authenticated through the "JWT Bearer" scheme."""
    return json({"data": "API response"})


# Run with: uvicorn main:app
```

This authentication and authorization system provides secure, flexible access control for both API and web application use cases, with support for multiple authentication methods and fine-grained authorization policies.