or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

common-utilities.md device-flow.md error-handling.md index.md oauth1.md oauth2-clients.md oauth2-servers.md openid-connect.md request-validation.md token-management.md

docs/request-validation.md

0

# Request Validation

1

2

Comprehensive request validation interfaces for OAuth 1.0 and OAuth 2.0 flows. Provides an extensible validation framework for implementing custom authentication logic, security policies, and token management.

3

4

## Capabilities

5

6

### OAuth 2.0 Request Validator

7

8

Base request validation interface that must be implemented to provide custom authentication and authorization logic for OAuth 2.0 servers.

9

10

```python { .api }

11

class RequestValidator:

12

def __init__(self): ...

13

14

# Client Authentication

15

def client_authentication_required(

16

self,

17

request,

18

*args,

19

**kwargs,

20

) -> bool:

21

"""

22

Determine if client authentication is required.

23

24

Returns:

25

True if client must authenticate, False otherwise

26

"""

27

28

def authenticate_client(self, request, *args, **kwargs) -> bool:

29

"""

30

Authenticate client credentials.

31

32

Parameters:

33

- request: OAuth request object with client credentials

34

35

Returns:

36

True if client is authenticated, False otherwise

37

"""

38

39

def authenticate_client_id(

40

self,

41

client_id: str,

42

request,

43

*args,

44

**kwargs,

45

) -> bool:

46

"""

47

Authenticate client by ID only (for public clients).

48

49

Parameters:

50

- client_id: Client identifier

51

- request: OAuth request object

52

53

Returns:

54

True if client ID is valid, False otherwise

55

"""

56

57

# Client Validation

58

def validate_client_id(

59

self,

60

client_id: str,

61

request,

62

*args,

63

**kwargs,

64

) -> bool:

65

"""

66

Validate client identifier.

67

68

Parameters:

69

- client_id: Client identifier to validate

70

- request: OAuth request object

71

72

Returns:

73

True if client ID is valid, False otherwise

74

"""

75

76

def validate_redirect_uri(

77

self,

78

client_id: str,

79

redirect_uri: str,

80

request,

81

*args,

82

**kwargs,

83

) -> bool:

84

"""

85

Validate redirect URI for client.

86

87

Parameters:

88

- client_id: Client identifier

89

- redirect_uri: Redirect URI to validate

90

- request: OAuth request object

91

92

Returns:

93

True if redirect URI is valid for client, False otherwise

94

"""

95

96

def get_default_redirect_uri(

97

self,

98

client_id: str,

99

request,

100

*args,

101

**kwargs,

102

) -> str:

103

"""

104

Get default redirect URI for client.

105

106

Parameters:

107

- client_id: Client identifier

108

- request: OAuth request object

109

110

Returns:

111

Default redirect URI for client

112

"""

113

114

# Scope Validation

115

def validate_scopes(

116

self,

117

client_id: str,

118

scopes: list[str],

119

client,

120

request,

121

*args,

122

**kwargs,

123

) -> bool:

124

"""

125

Validate requested scopes for client.

126

127

Parameters:

128

- client_id: Client identifier

129

- scopes: List of requested scopes

130

- client: Client object

131

- request: OAuth request object

132

133

Returns:

134

True if scopes are valid for client, False otherwise

135

"""

136

137

def get_default_scopes(

138

self,

139

client_id: str,

140

request,

141

*args,

142

**kwargs,

143

) -> list[str]:

144

"""

145

Get default scopes for client.

146

147

Parameters:

148

- client_id: Client identifier

149

- request: OAuth request object

150

151

Returns:

152

List of default scopes for client

153

"""

154

155

# Response Type Validation

156

def validate_response_type(

157

self,

158

client_id: str,

159

response_type: str,

160

client,

161

request,

162

*args,

163

**kwargs,

164

) -> bool:

165

"""

166

Validate response type for client.

167

168

Parameters:

169

- client_id: Client identifier

170

- response_type: Response type (code, token)

171

- client: Client object

172

- request: OAuth request object

173

174

Returns:

175

True if response type is valid for client, False otherwise

176

"""

177

178

# Authorization Code Validation

179

def validate_code(

180

self,

181

client_id: str,

182

code: str,

183

client,

184

request,

185

*args,

186

**kwargs,

187

) -> bool:

188

"""

189

Validate authorization code.

190

191

Parameters:

192

- client_id: Client identifier

193

- code: Authorization code to validate

194

- client: Client object

195

- request: OAuth request object

196

197

Returns:

198

True if code is valid, False otherwise

199

"""

200

201

def confirm_redirect_uri(

202

self,

203

client_id: str,

204

code: str,

205

redirect_uri: str,

206

client,

207

request,

208

*args,

209

**kwargs,

210

) -> bool:

211

"""

212

Confirm redirect URI matches original authorization request.

213

214

Parameters:

215

- client_id: Client identifier

216

- code: Authorization code

217

- redirect_uri: Redirect URI to confirm

218

- client: Client object

219

- request: OAuth request object

220

221

Returns:

222

True if redirect URI matches, False otherwise

223

"""

224

225

def save_authorization_code(

226

self,

227

client_id: str,

228

code: dict,

229

request,

230

*args,

231

**kwargs,

232

) -> None:

233

"""

234

Store authorization code for later validation.

235

236

Parameters:

237

- client_id: Client identifier

238

- code: Authorization code data

239

- request: OAuth request object

240

"""

241

242

def invalidate_authorization_code(

243

self,

244

client_id: str,

245

code: str,

246

request,

247

*args,

248

**kwargs,

249

) -> None:

250

"""

251

Invalidate used authorization code.

252

253

Parameters:

254

- client_id: Client identifier

255

- code: Authorization code to invalidate

256

- request: OAuth request object

257

"""

258

259

# Grant Type Validation

260

def validate_grant_type(

261

self,

262

client_id: str,

263

grant_type: str,

264

client,

265

request,

266

*args,

267

**kwargs,

268

) -> bool:

269

"""

270

Validate grant type for client.

271

272

Parameters:

273

- client_id: Client identifier

274

- grant_type: Grant type to validate

275

- client: Client object

276

- request: OAuth request object

277

278

Returns:

279

True if grant type is valid for client, False otherwise

280

"""

281

282

# User Credentials Validation (Password Grant)

283

def validate_user(

284

self,

285

username: str,

286

password: str,

287

client,

288

request,

289

*args,

290

**kwargs,

291

) -> bool:

292

"""

293

Validate user credentials for password grant.

294

295

Parameters:

296

- username: User's username

297

- password: User's password

298

- client: Client object

299

- request: OAuth request object

300

301

Returns:

302

True if credentials are valid, False otherwise

303

"""

304

305

# Token Validation

306

def validate_bearer_token(

307

self,

308

token: str,

309

scopes: list[str],

310

request,

311

) -> bool:

312

"""

313

Validate bearer token and required scopes.

314

315

Parameters:

316

- token: Bearer token to validate

317

- scopes: Required scopes for resource access

318

- request: OAuth request object

319

320

Returns:

321

True if token is valid and has required scopes, False otherwise

322

"""

323

324

def validate_refresh_token(

325

self,

326

refresh_token: str,

327

client,

328

request,

329

*args,

330

**kwargs,

331

) -> bool:

332

"""

333

Validate refresh token.

334

335

Parameters:

336

- refresh_token: Refresh token to validate

337

- client: Client object

338

- request: OAuth request object

339

340

Returns:

341

True if refresh token is valid, False otherwise

342

"""

343

344

def get_original_scopes(

345

self,

346

refresh_token: str,

347

request,

348

*args,

349

**kwargs,

350

) -> list[str]:

351

"""

352

Get original scopes associated with refresh token.

353

354

Parameters:

355

- refresh_token: Refresh token

356

- request: OAuth request object

357

358

Returns:

359

List of original scopes

360

"""

361

362

def is_within_original_scope(

363

self,

364

request_scopes: list[str],

365

refresh_token: str,

366

request,

367

*args,

368

**kwargs,

369

) -> bool:

370

"""

371

Check if requested scopes are within original scope.

372

373

Parameters:

374

- request_scopes: Requested scopes

375

- refresh_token: Refresh token

376

- request: OAuth request object

377

378

Returns:

379

True if requested scopes are subset of original, False otherwise

380

"""

381

382

# Token Storage

383

def save_token(

384

self,

385

token: dict,

386

request,

387

*args,

388

**kwargs,

389

) -> None:

390

"""

391

Store access token.

392

393

Parameters:

394

- token: Token data to store

395

- request: OAuth request object

396

"""

397

398

def save_bearer_token(

399

self,

400

token: dict,

401

request,

402

*args,

403

**kwargs,

404

) -> None:

405

"""

406

Store bearer token.

407

408

Parameters:

409

- token: Bearer token data to store

410

- request: OAuth request object

411

"""

412

413

# Token Management

414

def revoke_token(

415

self,

416

token: str,

417

token_type_hint: str,

418

request,

419

*args,

420

**kwargs,

421

) -> None:

422

"""

423

Revoke access or refresh token.

424

425

Parameters:

426

- token: Token to revoke

427

- token_type_hint: Type of token (access_token, refresh_token)

428

- request: OAuth request object

429

"""

430

431

def rotate_refresh_token(self, request) -> bool:

432

"""

433

Determine if refresh token should be rotated.

434

435

Parameters:

436

- request: OAuth request object

437

438

Returns:

439

True if refresh token should be rotated, False otherwise

440

"""

441

442

def introspect_token(

443

self,

444

token: str,

445

token_type_hint: str,

446

request,

447

*args,

448

**kwargs,

449

) -> dict | None:

450

"""

451

Get token metadata for introspection.

452

453

Parameters:

454

- token: Token to introspect

455

- token_type_hint: Type of token

456

- request: OAuth request object

457

458

Returns:

459

Dictionary with token metadata or None if invalid

460

"""

461

462

# PKCE Support

463

def is_pkce_required(self, client_id: str, request) -> bool:

464

"""

465

Determine if PKCE is required for client.

466

467

Parameters:

468

- client_id: Client identifier

469

- request: OAuth request object

470

471

Returns:

472

True if PKCE is required, False otherwise

473

"""

474

475

def get_code_challenge(self, code: str, request) -> str:

476

"""

477

Get PKCE code challenge for authorization code.

478

479

Parameters:

480

- code: Authorization code

481

- request: OAuth request object

482

483

Returns:

484

PKCE code challenge

485

"""

486

487

def get_code_challenge_method(self, code: str, request) -> str:

488

"""

489

Get PKCE code challenge method for authorization code.

490

491

Parameters:

492

- code: Authorization code

493

- request: OAuth request object

494

495

Returns:

496

PKCE code challenge method (plain, S256)

497

"""

498

499

# CORS Support

500

def is_origin_allowed(

501

self,

502

client_id: str,

503

origin: str,

504

request,

505

*args,

506

**kwargs,

507

) -> bool:

508

"""

509

Check if origin is allowed for CORS requests.

510

511

Parameters:

512

- client_id: Client identifier

513

- origin: Request origin

514

- request: OAuth request object

515

516

Returns:

517

True if origin is allowed, False otherwise

518

"""

519

```

520

521

## Implementation Example

522

523

```python

524

import base64
import hashlib
import secrets
from datetime import datetime, timedelta

from oauthlib.oauth2 import RequestValidator

528

529

class DatabaseRequestValidator(RequestValidator):
    """Complete request validator implementation using database storage.

    The ``Client``, ``AuthorizationCode``, ``AccessToken``, ``RefreshToken``
    and ``User`` models and the ``verify_password`` helper are not defined in
    this example; the application is expected to provide them. Expiry
    timestamps are stored and compared as naive UTC datetimes.
    """

    def __init__(self, db_session):
        """Keep the database session that every lookup below goes through."""
        self.db = db_session

    # Shared lookups
    def _get_client(self, client_id):
        """Return the Client row for ``client_id``, or None if there is none."""
        return self.db.query(Client).filter_by(client_id=client_id).first()

    def _get_authorization_code(self, client_id, code):
        """Return the stored code row for this client/code pair, or None."""
        return self.db.query(AuthorizationCode).filter_by(
            client_id=client_id,
            code=code
        ).first()

    # Client Management
    def validate_client_id(self, client_id, request, *args, **kwargs):
        """Return True if the client exists and is active."""
        client = self._get_client(client_id)
        return client is not None and bool(client.is_active)

    def authenticate_client(self, request, *args, **kwargs):
        """Authenticate the client from ``request.client_id``/``client_secret``.

        On success the client row is attached as ``request.client``.
        NOTE(review): oauthlib's grant handlers are expected to read
        ``request.client.client_id`` afterwards -- confirm against the
        oauthlib version in use.

        Returns:
            True if both credentials are present, the client is active and
            the secret matches; False otherwise.
        """
        client_id = getattr(request, 'client_id', None)
        client_secret = getattr(request, 'client_secret', None)

        if not client_id or not client_secret:
            return False

        client = self._get_client(client_id)
        if not client or not client.is_active:
            return False

        # Constant-time comparison. Compare bytes: compare_digest raises
        # TypeError for str arguments containing non-ASCII characters.
        if not secrets.compare_digest(
            client.client_secret.encode('utf-8'),
            client_secret.encode('utf-8'),
        ):
            return False

        request.client = client
        return True

    def client_authentication_required(self, request, *args, **kwargs):
        """Return True only for known clients of type 'confidential'."""
        client = self._get_client(request.client_id)
        # Always answer with a real bool, never None or the client row.
        return client is not None and client.client_type == 'confidential'

    # Redirect URI Validation
    def validate_redirect_uri(self, client_id, redirect_uri, request, *args, **kwargs):
        """Return True if ``redirect_uri`` is one of the client's registered URIs."""
        client = self._get_client(client_id)
        if not client:
            return False

        return any(entry.uri == redirect_uri for entry in client.redirect_uris)

    def get_default_redirect_uri(self, client_id, request, *args, **kwargs):
        """Return the client's first registered redirect URI, or None."""
        client = self._get_client(client_id)
        if client and client.redirect_uris:
            return client.redirect_uris[0].uri
        return None

    # Scope Management
    def validate_scopes(self, client_id, scopes, client, request, *args, **kwargs):
        """Return True if every requested scope is allowed for the client."""
        client_obj = self._get_client(client_id)
        if not client_obj:
            return False

        allowed_scopes = {scope.name for scope in client_obj.allowed_scopes}
        return all(scope in allowed_scopes for scope in scopes)

    def get_default_scopes(self, client_id, request, *args, **kwargs):
        """Return the names of the client's default scopes ([] if unknown)."""
        client = self._get_client(client_id)
        if client:
            return [scope.name for scope in client.default_scopes]
        return []

    # Authorization Code Management
    def save_authorization_code(self, client_id, code, request, *args, **kwargs):
        """Store the issued code with its PKCE data and a 10-minute lifetime."""
        auth_code = AuthorizationCode(
            client_id=client_id,
            user_id=request.user.id,
            code=code['code'],
            redirect_uri=request.redirect_uri,
            scopes=' '.join(request.scopes),
            code_challenge=getattr(request, 'code_challenge', None),
            code_challenge_method=getattr(request, 'code_challenge_method', None),
            expires_at=datetime.utcnow() + timedelta(minutes=10)
        )
        self.db.add(auth_code)
        self.db.commit()

    def validate_code(self, client_id, code, client, request, *args, **kwargs):
        """Validate an authorization code, including its PKCE challenge.

        On success the code's user and scopes are copied onto the request so
        that ``save_bearer_token`` stores the token for the right user.

        Returns:
            True if the code exists for this client, has not expired and, when
            a code challenge was stored, ``request.code_verifier`` matches it.
        """
        auth_code = self._get_authorization_code(client_id, code)
        if not auth_code:
            return False

        if auth_code.expires_at < datetime.utcnow():
            return False

        # Validate PKCE if present
        if auth_code.code_challenge:
            code_verifier = getattr(request, 'code_verifier', None)
            if not code_verifier:
                return False

            verifier_bytes = code_verifier.encode('utf-8')
            if auth_code.code_challenge_method == 'S256':
                # RFC 7636: the challenge is the base64url encoding of the
                # SHA-256 digest with '=' padding removed -- not a hex digest.
                digest = hashlib.sha256(verifier_bytes).digest()
                expected = base64.urlsafe_b64encode(digest).rstrip(b'=')
            else:
                expected = verifier_bytes

            stored = auth_code.code_challenge.encode('utf-8')
            if not secrets.compare_digest(stored, expected):
                return False

        request.user_id = auth_code.user_id
        request.scopes = auth_code.scopes.split()
        return True

    def confirm_redirect_uri(self, client_id, code, redirect_uri, client, request, *args, **kwargs):
        """Return True if ``redirect_uri`` equals the one stored with the code."""
        auth_code = self._get_authorization_code(client_id, code)
        return auth_code is not None and auth_code.redirect_uri == redirect_uri

    def invalidate_authorization_code(self, client_id, code, request, *args, **kwargs):
        """Delete the stored code, if any, so it cannot be redeemed twice."""
        auth_code = self._get_authorization_code(client_id, code)
        if auth_code:
            self.db.delete(auth_code)
            self.db.commit()

    # Token Management
    def save_bearer_token(self, token, request, *args, **kwargs):
        """Store the access token and, when one was issued, its refresh token."""
        user_id = getattr(request, 'user_id', None)
        scope = token.get('scope', '')

        access_token = AccessToken(
            client_id=request.client_id,
            user_id=user_id,
            token=token['access_token'],
            scopes=scope,
            expires_at=datetime.utcnow() + timedelta(seconds=token['expires_in'])
        )
        self.db.add(access_token)

        if 'refresh_token' in token:
            refresh_token = RefreshToken(
                client_id=request.client_id,
                user_id=user_id,
                token=token['refresh_token'],
                scopes=scope,
                access_token=access_token
            )
            self.db.add(refresh_token)

        self.db.commit()

    def validate_bearer_token(self, token, scopes, request):
        """Validate an access token and the scopes required by the resource.

        On success ``request.user_id``, ``request.client_id`` and
        ``request.scopes`` are filled in from the stored token.
        """
        # A request without a token must never reach the database lookup.
        if not token:
            return False

        access_token = self.db.query(AccessToken).filter_by(
            token=token
        ).first()

        if not access_token:
            return False

        if access_token.expires_at < datetime.utcnow():
            return False

        token_scopes = access_token.scopes.split()
        if not all(scope in token_scopes for scope in scopes):
            return False

        # Add user info to request
        request.user_id = access_token.user_id
        request.client_id = access_token.client_id
        request.scopes = token_scopes

        return True

    def validate_refresh_token(self, refresh_token, client, request, *args, **kwargs):
        """Return True if the refresh token exists for the requesting client.

        On success the token's user is copied to ``request.user_id`` so that
        the replacement token saved by ``save_bearer_token`` keeps its owner.
        """
        token = self.db.query(RefreshToken).filter_by(
            token=refresh_token,
            client_id=request.client_id
        ).first()

        if token is None:
            return False

        request.user_id = token.user_id
        return True

    # User Authentication (Password Grant)
    def validate_user(self, username, password, client, request, *args, **kwargs):
        """Verify the user's password and record ``request.user_id`` on success."""
        user = self.db.query(User).filter_by(username=username).first()
        if not user or not user.is_active:
            return False

        # Use secure password verification
        if not verify_password(password, user.password_hash):
            return False

        request.user_id = user.id
        return True

    # Grant Type Validation
    def validate_grant_type(self, client_id, grant_type, client, request, *args, **kwargs):
        """Return True if ``grant_type`` is among the client's allowed grants."""
        client_obj = self._get_client(client_id)
        if not client_obj:
            return False

        allowed_grants = {grant.grant_type for grant in client_obj.allowed_grants}
        return grant_type in allowed_grants

    # PKCE Support
    def is_pkce_required(self, client_id, request):
        """Return True if the client exists and has ``require_pkce`` set."""
        client = self._get_client(client_id)
        return client is not None and bool(client.require_pkce)

    # Token Introspection
    def introspect_token(self, token, token_type_hint, request, *args, **kwargs):
        """Return introspection claims for ``token``, or None if it is invalid.

        None (rather than ``{'active': False}``) is returned for unknown or
        expired tokens, matching the documented interface contract.
        NOTE(review): oauthlib's introspection endpoint is believed to report
        any non-None result as active -- confirm against the installed version.
        """
        if token_type_hint == 'refresh_token':
            token_obj = self.db.query(RefreshToken).filter_by(token=token).first()
        else:
            token_obj = self.db.query(AccessToken).filter_by(token=token).first()

        if not token_obj:
            return None

        # Refresh tokens are stored without an expiry, so it may be missing
        # or None; only compare when there is an actual value.
        expires_at = getattr(token_obj, 'expires_at', None)
        if expires_at is not None and expires_at < datetime.utcnow():
            return None

        claims = {
            'active': True,
            'client_id': token_obj.client_id,
            'username': token_obj.user.username if token_obj.user else None,
            'scope': token_obj.scopes,
            'token_type': 'Bearer' if isinstance(token_obj, AccessToken) else 'refresh_token'
        }
        if expires_at is not None:
            # expires_at is naive UTC; .timestamp() would read it as local
            # time, so compute seconds since the epoch explicitly.
            claims['exp'] = int((expires_at - datetime(1970, 1, 1)).total_seconds())
        return claims

752

```