or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

additional.mdauth.mdclient.mdcore-server.mdindex.mdrequest-response.mdtesting.mdwebsockets.md

auth.mddocs/

0

# Authentication & Authorization

1

2

BlackSheep provides comprehensive authentication and authorization features through integration with the `guardpost` library, supporting JWT tokens, cookie-based authentication, and flexible authorization policies.

3

4

## Authentication Overview

5

6

Authentication in BlackSheep is handled through authentication strategies that can validate user credentials from various sources like JWT tokens, cookies, or custom authentication mechanisms.

7

8

### Authentication Setup

9

10

```python { .api }

11

from blacksheep import Application

12

from blacksheep.server.authentication import AuthenticationStrategy

13

from guardpost import Identity

14

15

# Basic authentication setup

16

app = Application()

17

auth_strategy = app.use_authentication()

18

19

# Custom authentication handler

20

class CustomAuthHandler:

21

async def authenticate(self, context: Request) -> Optional[Identity]:

22

# Extract and validate credentials

23

auth_header = context.get_first_header(b"Authorization")

24

if auth_header:

25

# Validate token/credentials

26

user_id = validate_token(auth_header.decode())

27

if user_id:

28

return Identity({"sub": user_id, "name": f"User {user_id}"})

29

return None

30

31

auth_strategy.add(CustomAuthHandler())

32

```

33

34

### Authentication Decorators

35

36

```python { .api }

37

from blacksheep import auth, allow_anonymous

38

39

# Require authentication

40

@app.get("/protected")

41

@auth() # Requires authenticated user

42

async def protected_endpoint():

43

return json({"message": "You are authenticated"})

44

45

# Allow anonymous access (overrides global auth requirements)

46

@app.get("/public")

47

@allow_anonymous

48

async def public_endpoint():

49

return json({"message": "Public access"})

50

51

# Require specific authentication schemes

52

@app.get("/jwt-only")

53

@auth(authentication_schemes=["JWT Bearer"])

54

async def jwt_only_endpoint():

55

return json({"message": "JWT authenticated"})

56

```

57

58

## JWT Bearer Authentication

59

60

BlackSheep provides built-in support for JWT Bearer token authentication with OIDC integration.

61

62

### JWT Configuration

63

64

```python { .api }

65

from blacksheep.server.authentication.jwt import JWTBearerAuthentication

66

67

# Basic JWT setup with OIDC discovery

68

jwt_auth = JWTBearerAuthentication(

69

valid_audiences=["api", "myapp"],

70

authority="https://auth.example.com", # OIDC authority for key discovery

71

require_kid=True # Require key ID in JWT header

72

)

73

74

auth_strategy = app.use_authentication()

75

auth_strategy.add(jwt_auth)

76

77

# Manual key configuration

78

jwt_auth = JWTBearerAuthentication(

79

valid_audiences=["api"],

80

valid_issuers=["https://auth.example.com"],

81

keys_url="https://auth.example.com/.well-known/jwks.json",

82

cache_time=3600, # Cache keys for 1 hour

83

auth_mode="JWT Bearer"

84

)

85

```

86

87

### JWT Claims Access

88

89

```python { .api }

90

from guardpost import Identity

91

92

@app.get("/profile")

93

@auth()

94

async def get_profile(request: Request):

95

identity: Identity = request.identity

96

97

# Standard JWT claims

98

user_id = identity.id # Subject (sub claim)

99

claims = identity.claims # All claims dict

100

101

# Access specific claims

102

email = claims.get("email")

103

roles = claims.get("roles", [])

104

scopes = claims.get("scope", "").split()

105

106

return json({

107

"user_id": user_id,

108

"email": email,

109

"roles": roles,

110

"scopes": scopes

111

})

112

```

113

114

### Custom JWT Key Providers

115

116

```python { .api }

117

from typing import Dict, Any, Optional

118

119

class CustomKeysProvider:

120

"""Custom key provider for JWT validation"""

121

122

async def get_keys(self) -> Dict[str, Any]:

123

"""Return JWKS keys for token validation"""

124

# Fetch keys from custom source

125

return {

126

"keys": [

127

{

128

"kty": "RSA",

129

"kid": "key1",

130

"n": "...", # RSA modulus

131

"e": "AQAB" # RSA exponent

132

}

133

]

134

}

135

136

# Use custom key provider

137

jwt_auth = JWTBearerAuthentication(

138

valid_audiences=["api"],

139

keys_provider=CustomKeysProvider(),

140

require_kid=True

141

)

142

```

143

144

## Cookie Authentication

145

146

Cookie-based authentication for web applications with secure session management.

147

148

### Cookie Authentication Setup

149

150

```python { .api }

151

from blacksheep.server.authentication.cookie import CookieAuthentication

152

from blacksheep.sessions.crypto import Serializer

153

154

# Basic cookie authentication

155

cookie_auth = CookieAuthentication(

156

cookie_name="auth_session",

157

secret_keys=["your-secret-key-here"],

158

auth_scheme="Cookie"

159

)

160

161

auth_strategy = app.use_authentication()

162

auth_strategy.add(cookie_auth)

163

164

# Advanced cookie authentication with custom serializer

165

custom_serializer = Serializer(

166

secret_key="encryption-key",

167

signing_key="signing-key"

168

)

169

170

cookie_auth = CookieAuthentication(

171

cookie_name="secure_session",

172

serializer=custom_serializer,

173

auth_scheme="SecureCookie"

174

)

175

```

176

177

### Cookie Authentication Usage

178

179

```python { .api }

180

from blacksheep.cookies import Cookie

181

from datetime import datetime, timedelta

182

183

# Login endpoint that sets authentication cookie

184

@app.post("/login")

185

async def login(credentials: FromJSON[dict]):

186

username = credentials.value.get("username")

187

password = credentials.value.get("password")

188

189

# Validate credentials

190

user = await authenticate_user(username, password)

191

if not user:

192

raise Unauthorized("Invalid credentials")

193

194

# Create response

195

response = json({"message": "Logged in successfully"})

196

197

# Set authentication cookie

198

auth_data = {"user_id": user.id, "username": user.username}

199

cookie_auth.set_cookie(auth_data, response, secure=True)

200

201

return response

202

203

# Logout endpoint that removes cookie

204

@app.post("/logout")

205

@auth()

206

async def logout():

207

response = json({"message": "Logged out successfully"})

208

cookie_auth.unset_cookie(response)

209

return response

210

211

# Access authenticated user data

212

@app.get("/me")

213

@auth()

214

async def get_current_user(request: Request):

215

# User identity is automatically available

216

identity = request.identity

217

return json({

218

"user_id": identity.id,

219

"claims": identity.claims

220

})

221

```

222

223

## Authorization

224

225

Authorization in BlackSheep is policy-based, allowing fine-grained access control through roles, permissions, and custom requirements.

226

227

### Authorization Setup

228

229

```python { .api }

230

from blacksheep.server.authorization import AuthorizationStrategy

231

from guardpost.authorization import Policy, Requirement

232

233

# Basic authorization setup

234

app.use_authentication() # Authentication required first

235

authz_strategy = app.use_authorization()

236

237

# Define authorization policies

238

admin_policy = Policy("admin", Requirement("role:admin"))

239

moderator_policy = Policy("moderator", Requirement("role:moderator"))

240

api_access_policy = Policy("api_access", Requirement("scope:api.read"))

241

242

authz_strategy.add(admin_policy)

243

authz_strategy.add(moderator_policy)

244

authz_strategy.add(api_access_policy)

245

246

# Default policy (just requires authentication)

247

default_policy = Policy("authenticated", AuthenticatedRequirement())

248

authz_strategy.default_policy = default_policy

249

```

250

251

### Authorization Decorators

252

253

```python { .api }

254

# Require specific policy

255

@app.get("/admin/users")

256

@auth("admin") # Requires "admin" policy

257

async def admin_users():

258

return json({"admin_access": True})

259

260

# Require multiple policies (all must pass)

261

@app.get("/moderator/posts")

262

@auth(["moderator", "api_access"])

263

async def moderate_posts():

264

return json({"moderator_access": True})

265

266

# Default authenticated policy

267

@app.get("/profile")

268

@auth() # Uses default "authenticated" policy

269

async def user_profile():

270

return json({"user_access": True})

271

272

# Allow anonymous (bypass authorization)

273

@app.get("/public")

274

@allow_anonymous

275

async def public_access():

276

return json({"public": True})

277

```

278

279

### Custom Requirements

280

281

```python { .api }

282

from guardpost.authorization import Requirement, AsyncRequirement

283

from guardpost import Identity

284

285

class RoleRequirement(Requirement):

286

"""Require specific role"""

287

288

def __init__(self, required_role: str):

289

self.required_role = required_role

290

291

def handle(self, identity: Identity) -> bool:

292

user_roles = identity.claims.get("roles", [])

293

return self.required_role in user_roles

294

295

class PermissionRequirement(AsyncRequirement):

296

"""Require specific permission (async validation)"""

297

298

def __init__(self, permission: str):

299

self.permission = permission

300

301

async def handle(self, identity: Identity) -> bool:

302

user_id = identity.id

303

# Check permission in database

304

has_permission = await check_user_permission(user_id, self.permission)

305

return has_permission

306

307

# Use custom requirements

308

admin_role_policy = Policy("admin_role", RoleRequirement("admin"))

309

write_permission_policy = Policy("can_write", PermissionRequirement("posts:write"))

310

311

authz_strategy.add(admin_role_policy)

312

authz_strategy.add(write_permission_policy)

313

314

@app.post("/posts")

315

@auth("can_write") # Uses async permission check

316

async def create_post(post_data: FromJSON[dict]):

317

return json({"post_created": True})

318

```

319

320

### Resource-Based Authorization

321

322

```python { .api }

323

class ResourceOwnerRequirement(AsyncRequirement):

324

"""Check if user owns the resource"""

325

326

async def handle(self, identity: Identity, resource_id: str = None) -> bool:

327

if not resource_id:

328

return False

329

330

user_id = identity.id

331

resource = await get_resource(resource_id)

332

return resource and resource.owner_id == user_id

333

334

# Resource-specific authorization

335

@app.get("/posts/{post_id:int}")

336

@auth()

337

async def get_post(post_id: FromRoute[int], request: Request):

338

# Manual authorization check

339

identity = request.identity

340

post = await get_post_by_id(post_id.value)

341

342

if post.is_private and post.owner_id != identity.id:

343

raise Forbidden("Access denied to private post")

344

345

return json(post.to_dict())

346

```

347

348

## Authentication Challenges

349

350

Handle authentication challenges and provide appropriate responses for different authentication failures.

351

352

### Challenge Handling

353

354

```python { .api }

355

from blacksheep.server.authentication import AuthenticateChallenge

356

357

# Custom challenge handler

358

async def custom_auth_challenge_handler(app: Application, request: Request, exception: AuthenticateChallenge):

359

"""Handle authentication challenges"""

360

361

# Get WWW-Authenticate header

362

auth_header = exception.get_header()

363

364

# Custom challenge response

365

if request.path.startswith("/api/"):

366

# API endpoints get JSON response

367

response = Response(401, content=JSONContent({

368

"error": "authentication_required",

369

"message": "Valid authentication required",

370

"scheme": exception.scheme

371

}))

372

else:

373

# Web endpoints redirect to login

374

response = redirect("/login")

375

376

response.headers.add(*auth_header)

377

return response

378

379

# Register challenge handler

380

app.exception_handler(AuthenticateChallenge)(custom_auth_challenge_handler)

381

```

382

383

## Error Handling

384

385

Comprehensive error handling for authentication and authorization failures.

386

387

### Authentication Errors

388

389

```python { .api }

390

from blacksheep.exceptions import Unauthorized, Forbidden

391

from blacksheep.server.authorization import ForbiddenError

392

from guardpost.authorization import UnauthorizedError

393

394

# Handle 401 Unauthorized

395

@app.exception_handler(401)

396

async def handle_unauthorized(app: Application, request: Request, exc: HTTPException):

397

if request.path.startswith("/api/"):

398

return Response(401, content=JSONContent({

399

"error": "unauthorized",

400

"message": "Authentication required"

401

}))

402

else:

403

return redirect("/login")

404

405

# Handle 403 Forbidden

406

@app.exception_handler(403)

407

async def handle_forbidden(app: Application, request: Request, exc: HTTPException):

408

return Response(403, content=JSONContent({

409

"error": "forbidden",

410

"message": "Insufficient permissions"

411

}))

412

413

# Handle authorization errors

414

@app.exception_handler(UnauthorizedError)

415

async def handle_authz_error(app: Application, request: Request, exc: UnauthorizedError):

416

if isinstance(exc, ForbiddenError):

417

status = 403

418

message = "Access forbidden"

419

else:

420

status = 401

421

message = "Authentication required"

422

423

return Response(status, content=JSONContent({

424

"error": "authorization_failed",

425

"message": message

426

}))

427

```

428

429

## Session Management

430

431

Secure session management with encryption and signing for web applications.

432

433

### Session Configuration

434

435

```python { .api }

436

from blacksheep.sessions import SessionMiddleware

437

from blacksheep.sessions.crypto import Encryptor, Signer

438

439

# Basic session setup

440

app.use_sessions(

441

secret_key="your-secret-key-here",

442

session_cookie="session",

443

session_max_age=3600 # 1 hour

444

)

445

446

# Advanced session configuration

447

custom_encryptor = Encryptor("encryption-key")

448

custom_signer = Signer("signing-key")

449

450

session_middleware = SessionMiddleware(

451

secret_key="session-secret",

452

session_cookie="secure_session",

453

encryptor=custom_encryptor,

454

signer=custom_signer,

455

session_max_age=86400 # 24 hours

456

)

457

458

app.middlewares.append(session_middleware)

459

```

460

461

### Session Usage

462

463

```python { .api }

464

from blacksheep.sessions import Session

465

466

# Access session in handlers

467

@app.get("/dashboard")

468

async def dashboard(request: Request):

469

session: Session = request.session

470

471

# Get session data

472

user_id = session.get("user_id")

473

preferences = session.get("preferences", {})

474

475

if not user_id:

476

return redirect("/login")

477

478

return json({

479

"user_id": user_id,

480

"preferences": preferences

481

})

482

483

# Set session data

484

@app.post("/login")

485

async def login(credentials: FromJSON[dict]):

486

# Authenticate user

487

user = await authenticate_user(credentials.value)

488

if not user:

489

raise Unauthorized("Invalid credentials")

490

491

# Set session data

492

session = request.session

493

session["user_id"] = user.id

494

session["username"] = user.username

495

session["login_time"] = datetime.utcnow().isoformat()

496

497

return json({"message": "Login successful"})

498

499

# Clear session

500

@app.post("/logout")

501

async def logout(request: Request):

502

session = request.session

503

session.clear() # Clear all session data

504

return json({"message": "Logged out"})

505

```

506

507

## Complete Authentication Example

508

509

Here's a complete example showing authentication and authorization in a BlackSheep application:

510

511

```python { .api }

512

from blacksheep import Application, json, Request, FromJSON

513

from blacksheep.server.authentication.jwt import JWTBearerAuthentication

514

from blacksheep.server.authentication.cookie import CookieAuthentication

515

from blacksheep import auth, allow_anonymous

516

from guardpost.authorization import Policy, Requirement

517

from guardpost import Identity

518

519

app = Application()

520

521

# Setup authentication strategies

522

auth_strategy = app.use_authentication()

523

524

# JWT for API access

525

jwt_auth = JWTBearerAuthentication(

526

valid_audiences=["api"],

527

authority="https://auth.example.com"

528

)

529

auth_strategy.add(jwt_auth)

530

531

# Cookie auth for web interface

532

cookie_auth = CookieAuthentication(

533

cookie_name="webapp_session",

534

secret_keys=["webapp-secret-key"]

535

)

536

auth_strategy.add(cookie_auth)

537

538

# Setup authorization

539

authz_strategy = app.use_authorization()

540

authz_strategy.add(Policy("admin", Requirement("role:admin")))

541

authz_strategy.add(Policy("user", Requirement("role:user")))

542

543

# Public endpoints

544

@app.get("/")

545

@allow_anonymous

546

async def home():

547

return json({"message": "Welcome to BlackSheep API"})

548

549

# User endpoints (require authentication)

550

@app.get("/profile")

551

@auth()

552

async def get_profile(request: Request):

553

identity: Identity = request.identity

554

return json({

555

"user_id": identity.id,

556

"claims": identity.claims

557

})

558

559

# Admin endpoints (require admin role)

560

@app.get("/admin/users")

561

@auth("admin")

562

async def list_all_users():

563

# Only admins can access this

564

users = await get_all_users()

565

return json({"users": users})

566

567

# API endpoints (JWT only)

568

@app.get("/api/data")

569

@auth(authentication_schemes=["JWT Bearer"])

570

async def api_data():

571

return json({"data": "API response"})

572

573

# Run with: uvicorn main:app

574

```

575

576

This authentication and authorization system provides secure, flexible access control for both API and web application use cases with support for multiple authentication methods and fine-grained authorization policies.