or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-routing.mdconfiguration.mddto.mdexceptions.mdhttp-handlers.mdindex.mdmiddleware.mdopenapi.mdplugins.mdrequest-response.mdsecurity.mdtesting.mdwebsocket.md

security.mddocs/

0

# Security and Authentication

1

2

Authentication and authorization systems including JWT authentication, session-based auth, and OAuth2 support. Litestar provides comprehensive security primitives for protecting routes and managing user sessions.

3

4

## Capabilities

5

6

### JWT Authentication

7

8

JSON Web Token-based authentication with support for various token storage methods and OAuth2 flows.

9

10

```python { .api }

11

class JWTAuth(BaseJWTAuth):

12

def __init__(

13

self,

14

token_secret: str,

15

retrieve_user_handler: UserHandlerProtocol,

16

*,

17

algorithm: str = "HS256",

18

auth_header: str = "Authorization",

19

default_token_expiration: timedelta = timedelta(days=1),

20

openapi_security_scheme_name: str = "BearerToken",

21

description: str = "JWT api-key authentication and authorization.",

22

authentication_middleware_class: type[AbstractAuthenticationMiddleware] = JWTAuthenticationMiddleware,

23

guards: Sequence[Guard] | None = None,

24

exclude: str | list[str] | None = None,

25

exclude_opt_key: str = "exclude_from_auth",

26

scopes: Scopes | None = None,

27

route_handlers: Sequence[BaseRouteHandler] | None = None,

28

dependencies: Dependencies | None = None,

29

middleware: Sequence[Middleware] | None = None,

30

):

31

"""

32

JWT Authentication configuration.

33

34

Parameters:

35

- token_secret: Secret key for signing tokens

36

- retrieve_user_handler: Function to retrieve user from token payload

37

- algorithm: JWT signing algorithm (HS256, RS256, etc.)

38

- auth_header: HTTP header containing the token

39

- default_token_expiration: Default token expiration time

40

- openapi_security_scheme_name: Name for OpenAPI security scheme

41

- description: Description for OpenAPI documentation

42

- authentication_middleware_class: Middleware class to use

43

- guards: Authorization guards

44

- exclude: Paths to exclude from authentication

45

- exclude_opt_key: Key for excluding specific routes

46

- scopes: OAuth2 scopes configuration

47

- route_handlers: Route handlers requiring authentication

48

- dependencies: Additional dependencies

49

- middleware: Additional middleware

50

"""

51

52

def create_token(

53

self,

54

identifier: str,

55

*,

56

exp: datetime | None = None,

57

sub: str | None = None,

58

aud: str | list[str] | None = None,

59

iss: str | None = None,

60

jti: str | None = None,

61

**kwargs: Any,

62

) -> str:

63

"""

64

Create a JWT token.

65

66

Parameters:

67

- identifier: Unique user identifier

68

- exp: Token expiration time

69

- sub: Token subject

70

- aud: Token audience

71

- iss: Token issuer

72

- jti: JWT ID

73

- **kwargs: Additional claims

74

75

Returns:

76

Encoded JWT token string

77

"""

78

79

def decode_token(self, encoded_token: str) -> dict[str, Any]:

80

"""

81

Decode and validate a JWT token.

82

83

Parameters:

84

- encoded_token: JWT token string

85

86

Returns:

87

Decoded token payload

88

"""

89

90

class JWTCookieAuth(JWTAuth):

91

def __init__(

92

self,

93

token_secret: str,

94

retrieve_user_handler: UserHandlerProtocol,

95

*,

96

key: str = "token",

97

path: str = "/",

98

domain: str | None = None,

99

secure: bool = True,

100

httponly: bool = True,

101

samesite: Literal["lax", "strict", "none"] = "lax",

102

**kwargs: Any,

103

):

104

"""

105

Cookie-based JWT authentication.

106

107

Parameters:

108

- token_secret: Secret key for signing tokens

109

- retrieve_user_handler: Function to retrieve user from token payload

110

- key: Cookie name for storing the token

111

- path: Cookie path

112

- domain: Cookie domain

113

- secure: Use secure cookies (HTTPS only)

114

- httponly: Use HTTP-only cookies

115

- samesite: SameSite cookie attribute

116

"""

117

118

def create_response_with_token(

119

self,

120

identifier: str,

121

response: Response | None = None,

122

**kwargs: Any,

123

) -> Response:

124

"""

125

Create a response with JWT token set as cookie.

126

127

Parameters:

128

- identifier: User identifier for token

129

- response: Existing response to modify (creates new if None)

130

- **kwargs: Additional token claims

131

132

Returns:

133

Response with token cookie set

134

"""

135

136

def create_logout_response(self, response: Response | None = None) -> Response:

137

"""

138

Create a response that clears the authentication cookie.

139

140

Parameters:

141

- response: Existing response to modify

142

143

Returns:

144

Response with cleared authentication cookie

145

"""

146

```

147

148

### OAuth2 Authentication

149

150

OAuth2 authentication flows including password bearer and authorization code flows.

151

152

```python { .api }

153

class OAuth2PasswordBearerAuth(JWTAuth):

154

def __init__(

155

self,

156

token_url: str,

157

token_secret: str,

158

retrieve_user_handler: UserHandlerProtocol,

159

*,

160

scopes: dict[str, str] | None = None,

161

**kwargs: Any,

162

):

163

"""

164

OAuth2 password bearer authentication.

165

166

Parameters:

167

- token_url: URL endpoint for token requests

168

- token_secret: Secret for signing tokens

169

- retrieve_user_handler: Function to retrieve user

170

- scopes: OAuth2 scopes mapping

171

"""

172

173

class OAuth2Login:

174

def __init__(

175

self,

176

*,

177

identifier: str,

178

exp: datetime | None = None,

179

sub: str | None = None,

180

aud: str | list[str] | None = None,

181

iss: str | None = None,

182

jti: str | None = None,

183

**kwargs: Any,

184

):

185

"""

186

OAuth2 login token configuration.

187

188

Parameters:

189

- identifier: User identifier

190

- exp: Expiration time

191

- sub: Subject

192

- aud: Audience

193

- iss: Issuer

194

- jti: JWT ID

195

"""

196

```

197

198

### Session Authentication

199

200

Session-based authentication using server-side session storage.

201

202

```python { .api }

203

class SessionAuth:

204

def __init__(

205

self,

206

retrieve_user_handler: UserHandlerProtocol,

207

session_backend: BaseSessionBackend,

208

*,

209

exclude: str | list[str] | None = None,

210

exclude_opt_key: str = "exclude_from_auth",

211

guards: Sequence[Guard] | None = None,

212

route_handlers: Sequence[BaseRouteHandler] | None = None,

213

dependencies: Dependencies | None = None,

214

middleware: Sequence[Middleware] | None = None,

215

):

216

"""

217

Session-based authentication.

218

219

Parameters:

220

- retrieve_user_handler: Function to retrieve user from session

221

- session_backend: Session storage backend

222

- exclude: Paths to exclude from authentication

223

- exclude_opt_key: Key for excluding specific routes

224

- guards: Authorization guards

225

- route_handlers: Route handlers requiring authentication

226

- dependencies: Additional dependencies

227

- middleware: Additional middleware

228

"""

229

230

async def login(

231

self,

232

identifier: str,

233

request: Request,

234

*,

235

response: Response | None = None,

236

**kwargs: Any,

237

) -> Response:

238

"""

239

Log in a user by creating a session.

240

241

Parameters:

242

- identifier: User identifier

243

- request: HTTP request object

244

- response: Existing response to modify

245

- **kwargs: Additional session data

246

247

Returns:

248

Response with session established

249

"""

250

251

async def logout(

252

self,

253

request: Request,

254

response: Response | None = None,

255

) -> Response:

256

"""

257

Log out a user by destroying the session.

258

259

Parameters:

260

- request: HTTP request object

261

- response: Existing response to modify

262

263

Returns:

264

Response with session cleared

265

"""

266

```

267

268

### Authentication Middleware

269

270

Base classes for implementing custom authentication middleware.

271

272

```python { .api }

273

class AbstractAuthenticationMiddleware(AbstractMiddleware):

274

def __init__(

275

self,

276

app: ASGIApp,

277

exclude: str | list[str] | None = None,

278

exclude_opt_key: str = "exclude_from_auth",

279

scopes: Scopes | None = None,

280

):

281

"""

282

Base authentication middleware.

283

284

Parameters:

285

- app: ASGI application

286

- exclude: Paths to exclude from authentication

287

- exclude_opt_key: Key for excluding specific routes

288

- scopes: OAuth2 scopes configuration

289

"""

290

291

async def authenticate_request(self, connection: ASGIConnection) -> AuthenticationResult:

292

"""

293

Authenticate an incoming request.

294

295

Parameters:

296

- connection: ASGI connection object

297

298

Returns:

299

Authentication result with user and auth information

300

"""

301

302

class JWTAuthenticationMiddleware(AbstractAuthenticationMiddleware):

303

"""JWT authentication middleware implementation."""

304

305

class JWTCookieAuthenticationMiddleware(AbstractAuthenticationMiddleware):

306

"""JWT cookie authentication middleware implementation."""

307

308

class SessionAuthMiddleware(AbstractAuthenticationMiddleware):

309

"""Session authentication middleware implementation."""

310

```

311

312

### Authorization and Guards

313

314

Authorization guards for protecting routes based on user permissions and roles.

315

316

```python { .api }

317

def guards(*guards: Guard) -> Callable[[T], T]:

318

"""

319

Decorator to apply authorization guards to route handlers.

320

321

Parameters:

322

- *guards: Guard functions to apply

323

324

Returns:

325

Decorator function

326

"""

327

328

# Common guard implementations

329

async def user_required_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:

330

"""Guard that requires an authenticated user."""

331

if not connection.user:

332

raise NotAuthorizedException("Authentication required")

333

334

async def admin_required_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:

335

"""Guard that requires admin privileges."""

336

if not connection.user or not getattr(connection.user, "is_admin", False):

337

raise PermissionDeniedException("Admin access required")

338

339

def role_required_guard(*required_roles: str) -> Guard:

340

"""

341

Create a guard that requires specific roles.

342

343

Parameters:

344

- *required_roles: Required role names

345

346

Returns:

347

Guard function

348

"""

349

async def guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:

350

if not connection.user:

351

raise NotAuthorizedException("Authentication required")

352

353

user_roles = getattr(connection.user, "roles", [])

354

if not any(role in user_roles for role in required_roles):

355

raise PermissionDeniedException("Insufficient permissions")

356

357

return guard

358

```

359

360

### Authentication Result

361

362

Data structures for authentication results and user information.

363

364

```python { .api }

365

class AuthenticationResult:

366

def __init__(self, user: Any | None = None, auth: Any | None = None):

367

"""

368

Authentication result.

369

370

Parameters:

371

- user: Authenticated user object

372

- auth: Authentication information

373

"""

374

self.user = user

375

self.auth = auth

376

377

class Token:

378

def __init__(

379

self,

380

token: str,

381

*,

382

exp: datetime | None = None,

383

sub: str | None = None,

384

aud: str | list[str] | None = None,

385

iss: str | None = None,

386

jti: str | None = None,

387

**claims: Any,

388

):

389

"""

390

JWT token representation.

391

392

Parameters:

393

- token: Encoded token string

394

- exp: Expiration time

395

- sub: Subject

396

- aud: Audience

397

- iss: Issuer

398

- jti: JWT ID

399

- **claims: Additional token claims

400

"""

401

```

402

403

## Usage Examples

404

405

### Basic JWT Authentication

406

407

```python

408

from litestar import Litestar, get, post

409

from litestar.security.jwt import JWTAuth, Token

410

from litestar.connection import ASGIConnection

411

from dataclasses import dataclass

412

from datetime import datetime, timedelta

413

414

@dataclass

415

class User:

416

id: int

417

username: str

418

email: str

419

420

# User retrieval function

421

async def retrieve_user_handler(token: Token, connection: ASGIConnection) -> User | None:

422

# In a real app, you'd query your database

423

if token.sub == "123":

424

return User(id=123, username="alice", email="alice@example.com")

425

return None

426

427

# JWT configuration

428

jwt_auth = JWTAuth(

429

token_secret="your-secret-key",

430

retrieve_user_handler=retrieve_user_handler,

431

default_token_expiration=timedelta(hours=24),

432

)

433

434

@post("/login", exclude_from_auth=True)

435

async def login_handler(data: dict) -> dict:

436

# Validate credentials (simplified)

437

if data.get("username") == "alice" and data.get("password") == "secret":

438

token = jwt_auth.create_token(identifier="123")

439

return {"access_token": token, "token_type": "bearer"}

440

441

raise NotAuthorizedException("Invalid credentials")

442

443

@get("/profile")

444

async def get_profile(request: Request) -> dict:

445

# request.user is automatically populated by JWT middleware

446

user = request.user

447

return {

448

"id": user.id,

449

"username": user.username,

450

"email": user.email

451

}

452

453

app = Litestar(

454

route_handlers=[login_handler, get_profile],

455

on_app_init=[jwt_auth.on_app_init],

456

)

457

```

458

459

### Cookie-Based JWT Authentication

460

461

```python

462

from litestar.security.jwt import JWTCookieAuth

463

from litestar.response import Response

464

465

jwt_cookie_auth = JWTCookieAuth(

466

token_secret="your-secret-key",

467

retrieve_user_handler=retrieve_user_handler,

468

key="access_token",

469

httponly=True,

470

secure=True,

471

samesite="strict",

472

)

473

474

@post("/login", exclude_from_auth=True)

475

async def login_with_cookie(data: dict) -> Response:

476

if data.get("username") == "alice" and data.get("password") == "secret":

477

response = Response({"status": "logged in"})

478

return jwt_cookie_auth.create_response_with_token(

479

identifier="123",

480

response=response

481

)

482

483

raise NotAuthorizedException("Invalid credentials")

484

485

@post("/logout")

486

async def logout() -> Response:

487

response = Response({"status": "logged out"})

488

return jwt_cookie_auth.create_logout_response(response)

489

```

490

491

### Session Authentication

492

493

```python

494

from litestar.security.session_auth import SessionAuth

495

from litestar.middleware.session import SessionMiddleware

496

from litestar.stores.memory import MemoryStore

497

498

# Session storage

499

session_store = MemoryStore()

500

501

# Session middleware

502

session_middleware = SessionMiddleware(

503

store=session_store,

504

secret="session-secret-key",

505

)

506

507

# Session auth

508

session_auth = SessionAuth(

509

retrieve_user_handler=retrieve_user_handler,

510

session_backend=session_store,

511

)

512

513

@post("/login", exclude_from_auth=True)

514

async def session_login(request: Request, data: dict) -> Response:

515

if data.get("username") == "alice" and data.get("password") == "secret":

516

return await session_auth.login(

517

identifier="123",

518

request=request,

519

response=Response({"status": "logged in"})

520

)

521

522

raise NotAuthorizedException("Invalid credentials")

523

524

@post("/logout")

525

async def session_logout(request: Request) -> Response:

526

return await session_auth.logout(

527

request=request,

528

response=Response({"status": "logged out"})

529

)

530

531

app = Litestar(

532

route_handlers=[session_login, session_logout, get_profile],

533

middleware=[session_middleware],

534

on_app_init=[session_auth.on_app_init],

535

)

536

```

537

538

### OAuth2 Password Bearer

539

540

```python

541

from litestar.security.jwt import OAuth2PasswordBearerAuth

542

543

oauth2_auth = OAuth2PasswordBearerAuth(

544

token_url="/token",

545

token_secret="your-secret-key",

546

retrieve_user_handler=retrieve_user_handler,

547

scopes={

548

"read": "Read access",

549

"write": "Write access",

550

"admin": "Administrative access"

551

}

552

)

553

554

@post("/token", exclude_from_auth=True)

555

async def get_token(data: dict) -> dict:

556

# OAuth2 token endpoint

557

if data.get("username") == "alice" and data.get("password") == "secret":

558

token = oauth2_auth.create_token(

559

identifier="123",

560

scopes=["read", "write"]

561

)

562

return {

563

"access_token": token,

564

"token_type": "bearer",

565

"expires_in": 3600,

566

"scope": "read write"

567

}

568

569

raise NotAuthorizedException("Invalid credentials")

570

```

571

572

### Authorization Guards

573

574

```python

575

from litestar.security import guards

576

577

@dataclass

578

class User:

579

id: int

580

username: str

581

roles: list[str]

582

is_active: bool = True

583

584

async def admin_guard(connection: ASGIConnection, _: BaseRouteHandler) -> None:

585

user = connection.user

586

if not user or "admin" not in user.roles:

587

raise PermissionDeniedException("Admin access required")

588

589

@get("/admin/users", guards=[admin_guard])

590

async def list_all_users() -> list[dict]:

591

# Only accessible by admins

592

return [{"id": 1, "username": "alice"}]

593

594

@get("/profile", guards=[user_required_guard])

595

async def get_user_profile(request: Request) -> dict:

596

# Requires any authenticated user

597

return {"user": request.user.username}

598

599

# Custom role-based guard

600

editor_guard = role_required_guard("editor", "admin")

601

602

@post("/content", guards=[editor_guard])

603

async def create_content(data: dict) -> dict:

604

# Requires editor or admin role

605

return {"status": "created", "content": data}

606

```

607

608

### Custom Authentication Middleware

609

610

```python

611

from litestar.middleware.authentication import AbstractAuthenticationMiddleware, AuthenticationResult

612

613

class APIKeyAuthMiddleware(AbstractAuthenticationMiddleware):

614

def __init__(self, app: ASGIApp, api_keys: dict[str, dict]):

615

super().__init__(app)

616

self.api_keys = api_keys # Map API keys to user data

617

618

async def authenticate_request(self, connection: ASGIConnection) -> AuthenticationResult:

619

api_key = connection.headers.get("X-API-Key")

620

621

if not api_key:

622

return AuthenticationResult()

623

624

user_data = self.api_keys.get(api_key)

625

if user_data:

626

user = User(**user_data)

627

return AuthenticationResult(user=user, auth={"api_key": api_key})

628

629

return AuthenticationResult()

630

631

# Usage

632

api_keys = {

633

"abc123": {"id": 1, "username": "service_user", "roles": ["api"]},

634

"def456": {"id": 2, "username": "admin_user", "roles": ["api", "admin"]},

635

}

636

637

app = Litestar(

638

route_handlers=[...],

639

middleware=[APIKeyAuthMiddleware(api_keys=api_keys)],

640

)

641

```

642

643

## Types

644

645

```python { .api }

646

# User handler protocol

647

class UserHandlerProtocol(Protocol):

648

async def __call__(self, token: Token, connection: ASGIConnection) -> Any | None:

649

"""Retrieve user from token and connection."""

650

651

# Guard type

652

Guard = Callable[[ASGIConnection, BaseRouteHandler], bool | None | Awaitable[bool | None]]

653

654

# Scopes type

655

Scopes = dict[str, str]

656

657

# JWT algorithm types

658

JWTAlgorithm = Literal["HS256", "HS384", "HS512", "RS256", "RS384", "RS512", "ES256", "ES384", "ES512"]

659

660

# Authentication result types

661

AuthType = Any

662

UserType = Any

663

```