0
# Request Validation
1
2
Comprehensive request validation interfaces for OAuth 1.0 and OAuth 2.0 flows. Provides extensible validation framework for implementing custom authentication logic, security policies, and token management.
3
4
## Capabilities
5
6
### OAuth 2.0 Request Validator
7
8
Base request validation interface that must be implemented to provide custom authentication and authorization logic for OAuth 2.0 servers.
9
10
```python { .api }
11
class RequestValidator:
12
def __init__(self): ...
13
14
# Client Authentication
15
def client_authentication_required(
16
self,
17
request,
18
*args,
19
**kwargs,
20
) -> bool:
21
"""
22
Determine if client authentication is required.
23
24
Returns:
25
True if client must authenticate, False otherwise
26
"""
27
28
def authenticate_client(self, request, *args, **kwargs) -> bool:
29
"""
30
Authenticate client credentials.
31
32
Parameters:
33
- request: OAuth request object with client credentials
34
35
Returns:
36
True if client is authenticated, False otherwise
37
"""
38
39
def authenticate_client_id(
40
self,
41
client_id: str,
42
request,
43
*args,
44
**kwargs,
45
) -> bool:
46
"""
47
Authenticate client by ID only (for public clients).
48
49
Parameters:
50
- client_id: Client identifier
51
- request: OAuth request object
52
53
Returns:
54
True if client ID is valid, False otherwise
55
"""
56
57
# Client Validation
58
def validate_client_id(
59
self,
60
client_id: str,
61
request,
62
*args,
63
**kwargs,
64
) -> bool:
65
"""
66
Validate client identifier.
67
68
Parameters:
69
- client_id: Client identifier to validate
70
- request: OAuth request object
71
72
Returns:
73
True if client ID is valid, False otherwise
74
"""
75
76
def validate_redirect_uri(
77
self,
78
client_id: str,
79
redirect_uri: str,
80
request,
81
*args,
82
**kwargs,
83
) -> bool:
84
"""
85
Validate redirect URI for client.
86
87
Parameters:
88
- client_id: Client identifier
89
- redirect_uri: Redirect URI to validate
90
- request: OAuth request object
91
92
Returns:
93
True if redirect URI is valid for client, False otherwise
94
"""
95
96
def get_default_redirect_uri(
97
self,
98
client_id: str,
99
request,
100
*args,
101
**kwargs,
102
) -> str:
103
"""
104
Get default redirect URI for client.
105
106
Parameters:
107
- client_id: Client identifier
108
- request: OAuth request object
109
110
Returns:
111
Default redirect URI for client
112
"""
113
114
# Scope Validation
115
def validate_scopes(
116
self,
117
client_id: str,
118
scopes: list[str],
119
client,
120
request,
121
*args,
122
**kwargs,
123
) -> bool:
124
"""
125
Validate requested scopes for client.
126
127
Parameters:
128
- client_id: Client identifier
129
- scopes: List of requested scopes
130
- client: Client object
131
- request: OAuth request object
132
133
Returns:
134
True if scopes are valid for client, False otherwise
135
"""
136
137
def get_default_scopes(
138
self,
139
client_id: str,
140
request,
141
*args,
142
**kwargs,
143
) -> list[str]:
144
"""
145
Get default scopes for client.
146
147
Parameters:
148
- client_id: Client identifier
149
- request: OAuth request object
150
151
Returns:
152
List of default scopes for client
153
"""
154
155
# Response Type Validation
156
def validate_response_type(
157
self,
158
client_id: str,
159
response_type: str,
160
client,
161
request,
162
*args,
163
**kwargs,
164
) -> bool:
165
"""
166
Validate response type for client.
167
168
Parameters:
169
- client_id: Client identifier
170
- response_type: Response type (code, token)
171
- client: Client object
172
- request: OAuth request object
173
174
Returns:
175
True if response type is valid for client, False otherwise
176
"""
177
178
# Authorization Code Validation
179
def validate_code(
180
self,
181
client_id: str,
182
code: str,
183
client,
184
request,
185
*args,
186
**kwargs,
187
) -> bool:
188
"""
189
Validate authorization code.
190
191
Parameters:
192
- client_id: Client identifier
193
- code: Authorization code to validate
194
- client: Client object
195
- request: OAuth request object
196
197
Returns:
198
True if code is valid, False otherwise
199
"""
200
201
def confirm_redirect_uri(
202
self,
203
client_id: str,
204
code: str,
205
redirect_uri: str,
206
client,
207
request,
208
*args,
209
**kwargs,
210
) -> bool:
211
"""
212
Confirm redirect URI matches original authorization request.
213
214
Parameters:
215
- client_id: Client identifier
216
- code: Authorization code
217
- redirect_uri: Redirect URI to confirm
218
- client: Client object
219
- request: OAuth request object
220
221
Returns:
222
True if redirect URI matches, False otherwise
223
"""
224
225
def save_authorization_code(
226
self,
227
client_id: str,
228
code: dict,
229
request,
230
*args,
231
**kwargs,
232
) -> None:
233
"""
234
Store authorization code for later validation.
235
236
Parameters:
237
- client_id: Client identifier
238
- code: Authorization code data
239
- request: OAuth request object
240
"""
241
242
def invalidate_authorization_code(
243
self,
244
client_id: str,
245
code: str,
246
request,
247
*args,
248
**kwargs,
249
) -> None:
250
"""
251
Invalidate used authorization code.
252
253
Parameters:
254
- client_id: Client identifier
255
- code: Authorization code to invalidate
256
- request: OAuth request object
257
"""
258
259
# Grant Type Validation
260
def validate_grant_type(
261
self,
262
client_id: str,
263
grant_type: str,
264
client,
265
request,
266
*args,
267
**kwargs,
268
) -> bool:
269
"""
270
Validate grant type for client.
271
272
Parameters:
273
- client_id: Client identifier
274
- grant_type: Grant type to validate
275
- client: Client object
276
- request: OAuth request object
277
278
Returns:
279
True if grant type is valid for client, False otherwise
280
"""
281
282
# User Credentials Validation (Password Grant)
283
def validate_user(
284
self,
285
username: str,
286
password: str,
287
client,
288
request,
289
*args,
290
**kwargs,
291
) -> bool:
292
"""
293
Validate user credentials for password grant.
294
295
Parameters:
296
- username: User's username
297
- password: User's password
298
- client: Client object
299
- request: OAuth request object
300
301
Returns:
302
True if credentials are valid, False otherwise
303
"""
304
305
# Token Validation
306
def validate_bearer_token(
307
self,
308
token: str,
309
scopes: list[str],
310
request,
311
) -> bool:
312
"""
313
Validate bearer token and required scopes.
314
315
Parameters:
316
- token: Bearer token to validate
317
- scopes: Required scopes for resource access
318
- request: OAuth request object
319
320
Returns:
321
True if token is valid and has required scopes, False otherwise
322
"""
323
324
def validate_refresh_token(
325
self,
326
refresh_token: str,
327
client,
328
request,
329
*args,
330
**kwargs,
331
) -> bool:
332
"""
333
Validate refresh token.
334
335
Parameters:
336
- refresh_token: Refresh token to validate
337
- client: Client object
338
- request: OAuth request object
339
340
Returns:
341
True if refresh token is valid, False otherwise
342
"""
343
344
def get_original_scopes(
345
self,
346
refresh_token: str,
347
request,
348
*args,
349
**kwargs,
350
) -> list[str]:
351
"""
352
Get original scopes associated with refresh token.
353
354
Parameters:
355
- refresh_token: Refresh token
356
- request: OAuth request object
357
358
Returns:
359
List of original scopes
360
"""
361
362
def is_within_original_scope(
363
self,
364
request_scopes: list[str],
365
refresh_token: str,
366
request,
367
*args,
368
**kwargs,
369
) -> bool:
370
"""
371
Check if requested scopes are within original scope.
372
373
Parameters:
374
- request_scopes: Requested scopes
375
- refresh_token: Refresh token
376
- request: OAuth request object
377
378
Returns:
379
True if requested scopes are subset of original, False otherwise
380
"""
381
382
# Token Storage
383
def save_token(
384
self,
385
token: dict,
386
request,
387
*args,
388
**kwargs,
389
) -> None:
390
"""
391
Store access token.
392
393
Parameters:
394
- token: Token data to store
395
- request: OAuth request object
396
"""
397
398
def save_bearer_token(
399
self,
400
token: dict,
401
request,
402
*args,
403
**kwargs,
404
) -> None:
405
"""
406
Store bearer token.
407
408
Parameters:
409
- token: Bearer token data to store
410
- request: OAuth request object
411
"""
412
413
# Token Management
414
def revoke_token(
415
self,
416
token: str,
417
token_type_hint: str,
418
request,
419
*args,
420
**kwargs,
421
) -> None:
422
"""
423
Revoke access or refresh token.
424
425
Parameters:
426
- token: Token to revoke
427
- token_type_hint: Type of token (access_token, refresh_token)
428
- request: OAuth request object
429
"""
430
431
def rotate_refresh_token(self, request) -> bool:
432
"""
433
Determine if refresh token should be rotated.
434
435
Parameters:
436
- request: OAuth request object
437
438
Returns:
439
True if refresh token should be rotated, False otherwise
440
"""
441
442
def introspect_token(
443
self,
444
token: str,
445
token_type_hint: str,
446
request,
447
*args,
448
**kwargs,
449
) -> dict | None:
450
"""
451
Get token metadata for introspection.
452
453
Parameters:
454
- token: Token to introspect
455
- token_type_hint: Type of token
456
- request: OAuth request object
457
458
Returns:
459
Dictionary with token metadata or None if invalid
460
"""
461
462
# PKCE Support
463
def is_pkce_required(self, client_id: str, request) -> bool:
464
"""
465
Determine if PKCE is required for client.
466
467
Parameters:
468
- client_id: Client identifier
469
- request: OAuth request object
470
471
Returns:
472
True if PKCE is required, False otherwise
473
"""
474
475
def get_code_challenge(self, code: str, request) -> str:
476
"""
477
Get PKCE code challenge for authorization code.
478
479
Parameters:
480
- code: Authorization code
481
- request: OAuth request object
482
483
Returns:
484
PKCE code challenge
485
"""
486
487
def get_code_challenge_method(self, code: str, request) -> str:
488
"""
489
Get PKCE code challenge method for authorization code.
490
491
Parameters:
492
- code: Authorization code
493
- request: OAuth request object
494
495
Returns:
496
PKCE code challenge method (plain, S256)
497
"""
498
499
# CORS Support
500
def is_origin_allowed(
501
self,
502
client_id: str,
503
origin: str,
504
request,
505
*args,
506
**kwargs,
507
) -> bool:
508
"""
509
Check if origin is allowed for CORS requests.
510
511
Parameters:
512
- client_id: Client identifier
513
- origin: Request origin
514
- request: OAuth request object
515
516
Returns:
517
True if origin is allowed, False otherwise
518
"""
519
```
520
521
## Implementation Example
522
523
```python
524
from oauthlib.oauth2 import RequestValidator
525
import hashlib
526
import secrets
527
from datetime import datetime, timedelta
528
529
class DatabaseRequestValidator(RequestValidator):
530
"""Complete request validator implementation using database storage."""
531
532
def __init__(self, db_session):
533
self.db = db_session
534
535
# Client Management
536
def validate_client_id(self, client_id, request, *args, **kwargs):
537
client = self.db.query(Client).filter_by(client_id=client_id).first()
538
return client is not None and client.is_active
539
540
def authenticate_client(self, request, *args, **kwargs):
541
client_id = getattr(request, 'client_id', None)
542
client_secret = getattr(request, 'client_secret', None)
543
544
if not client_id or not client_secret:
545
return False
546
547
client = self.db.query(Client).filter_by(client_id=client_id).first()
548
if not client or not client.is_active:
549
return False
550
551
# Use constant-time comparison for security
552
return secrets.compare_digest(client.client_secret, client_secret)
553
554
def client_authentication_required(self, request, *args, **kwargs):
555
# Require authentication for confidential clients
556
client = self.db.query(Client).filter_by(
557
client_id=request.client_id
558
).first()
559
return client and client.client_type == 'confidential'
560
561
# Redirect URI Validation
562
def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs):
563
client = self.db.query(Client).filter_by(client_id=client_id).first()
564
if not client:
565
return False
566
567
return redirect_uri in [uri.uri for uri in client.redirect_uris]
568
569
def get_default_redirect_uri(self, client_id, request, *args, **kwargs):
570
client = self.db.query(Client).filter_by(client_id=client_id).first()
571
if client and client.redirect_uris:
572
return client.redirect_uris[0].uri
573
return None
574
575
# Scope Management
576
def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
577
client_obj = self.db.query(Client).filter_by(client_id=client_id).first()
578
if not client_obj:
579
return False
580
581
allowed_scopes = {scope.name for scope in client_obj.allowed_scopes}
582
return all(scope in allowed_scopes for scope in scopes)
583
584
def get_default_scopes(self, client_id, request, *args, **kwargs):
585
client = self.db.query(Client).filter_by(client_id=client_id).first()
586
if client:
587
return [scope.name for scope in client.default_scopes]
588
return []
589
590
# Authorization Code Management
591
def save_authorization_code(self, client_id, code, request, *args, **kwargs):
592
auth_code = AuthorizationCode(
593
client_id=client_id,
594
user_id=request.user.id,
595
code=code['code'],
596
redirect_uri=request.redirect_uri,
597
scopes=' '.join(request.scopes),
598
code_challenge=getattr(request, 'code_challenge', None),
599
code_challenge_method=getattr(request, 'code_challenge_method', None),
600
expires_at=datetime.utcnow() + timedelta(minutes=10)
601
)
602
self.db.add(auth_code)
603
self.db.commit()
604
605
def validate_code(self, client_id, code, client, request, *args, **kwargs):
606
auth_code = self.db.query(AuthorizationCode).filter_by(
607
client_id=client_id,
608
code=code
609
).first()
610
611
if not auth_code:
612
return False
613
614
if auth_code.expires_at < datetime.utcnow():
615
return False
616
617
# Validate PKCE if present
618
if auth_code.code_challenge:
619
code_verifier = getattr(request, 'code_verifier', None)
620
if not code_verifier:
621
return False
622
623
if auth_code.code_challenge_method == 'S256':
624
challenge = hashlib.sha256(code_verifier.encode()).hexdigest()
625
else:
626
challenge = code_verifier
627
628
if not secrets.compare_digest(auth_code.code_challenge, challenge):
629
return False
630
631
return True
632
633
def confirm_redirect_uri(self, client_id, code, redirect_uri, client, request, *args, **kwargs):
634
auth_code = self.db.query(AuthorizationCode).filter_by(
635
client_id=client_id,
636
code=code
637
).first()
638
639
return auth_code and auth_code.redirect_uri == redirect_uri
640
641
def invalidate_authorization_code(self, client_id, code, request, *args, **kwargs):
642
auth_code = self.db.query(AuthorizationCode).filter_by(
643
client_id=client_id,
644
code=code
645
).first()
646
647
if auth_code:
648
self.db.delete(auth_code)
649
self.db.commit()
650
651
# Token Management
652
def save_bearer_token(self, token, request, *args, **kwargs):
653
access_token = AccessToken(
654
client_id=request.client_id,
655
user_id=getattr(request, 'user_id', None),
656
token=token['access_token'],
657
scopes=token.get('scope', ''),
658
expires_at=datetime.utcnow() + timedelta(seconds=token['expires_in'])
659
)
660
self.db.add(access_token)
661
662
if 'refresh_token' in token:
663
refresh_token = RefreshToken(
664
client_id=request.client_id,
665
user_id=getattr(request, 'user_id', None),
666
token=token['refresh_token'],
667
scopes=token.get('scope', ''),
668
access_token=access_token
669
)
670
self.db.add(refresh_token)
671
672
self.db.commit()
673
674
def validate_bearer_token(self, token, scopes, request):
675
access_token = self.db.query(AccessToken).filter_by(
676
token=token
677
).first()
678
679
if not access_token:
680
return False
681
682
if access_token.expires_at < datetime.utcnow():
683
return False
684
685
token_scopes = access_token.scopes.split()
686
if not all(scope in token_scopes for scope in scopes):
687
return False
688
689
# Add user info to request
690
request.user_id = access_token.user_id
691
request.client_id = access_token.client_id
692
request.scopes = token_scopes
693
694
return True
695
696
def validate_refresh_token(self, refresh_token, client, request, *args, **kwargs):
697
token = self.db.query(RefreshToken).filter_by(
698
token=refresh_token,
699
client_id=request.client_id
700
).first()
701
702
return token is not None
703
704
# User Authentication (Password Grant)
705
def validate_user(self, username, password, client, request, *args, **kwargs):
706
user = self.db.query(User).filter_by(username=username).first()
707
if not user or not user.is_active:
708
return False
709
710
# Use secure password verification
711
if verify_password(password, user.password_hash):
712
request.user_id = user.id
713
return True
714
715
return False
716
717
# Grant Type Validation
718
def validate_grant_type(self, client_id, grant_type, client, request, *args, **kwargs):
719
client_obj = self.db.query(Client).filter_by(client_id=client_id).first()
720
if not client_obj:
721
return False
722
723
allowed_grants = {grant.grant_type for grant in client_obj.allowed_grants}
724
return grant_type in allowed_grants
725
726
# PKCE Support
727
def is_pkce_required(self, client_id, request):
728
client = self.db.query(Client).filter_by(client_id=client_id).first()
729
return client and client.require_pkce
730
731
# Token Introspection
732
def introspect_token(self, token, token_type_hint, request, *args, **kwargs):
733
if token_type_hint == 'refresh_token':
734
token_obj = self.db.query(RefreshToken).filter_by(token=token).first()
735
else:
736
token_obj = self.db.query(AccessToken).filter_by(token=token).first()
737
738
if not token_obj:
739
return {'active': False}
740
741
if hasattr(token_obj, 'expires_at') and token_obj.expires_at < datetime.utcnow():
742
return {'active': False}
743
744
return {
745
'active': True,
746
'client_id': token_obj.client_id,
747
'username': token_obj.user.username if token_obj.user else None,
748
'scope': token_obj.scopes,
749
'exp': int(token_obj.expires_at.timestamp()) if hasattr(token_obj, 'expires_at') else None,
750
'token_type': 'Bearer' if isinstance(token_obj, AccessToken) else 'refresh_token'
751
}
752
```