or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md · core-app.md · exceptions.md · index.md · mcp.md · openapi.md · request-response.md · status-codes.md · templating.md · websocket.md

docs/authentication.md

0

# Authentication

1

2

Pluggable authentication system with bearer token support, custom authentication handlers, and identity management for securing routes and WebSocket connections.

3

4

## Capabilities

5

6

### Authentication Handler

7

8

Abstract base class for implementing custom authentication logic.

9

10

```python { .api }

11

class AuthenticationHandler:
    """Abstract base class for implementing custom authentication logic."""

    def __init__(self, token_getter: TokenGetter):
        """
        Initialize authentication handler.

        Args:
            token_getter: TokenGetter instance for extracting tokens from requests
        """

    def authenticate(self, request: Request) -> Optional[Identity]:
        """
        Authenticate a request and return user identity.

        Args:
            request: HTTP request object

        Returns:
            Identity object if authentication successful, None otherwise

        Note:
            This is an abstract method that must be implemented by subclasses
        """

    @property
    def unauthorized_response(self) -> Response:
        """
        Default response for unauthorized requests.

        Returns:
            Response object with 401 status and unauthorized message
        """

42

```

43

44

### Bearer Token Authentication

45

46

Built-in token getter for extracting Bearer tokens from Authorization headers.

47

48

```python { .api }

49

class BearerGetter:
    """Built-in token getter for Bearer tokens in the Authorization header."""

    def get_token(self, request: Request) -> Optional[str]:
        """
        Extract Bearer token from Authorization header.

        Args:
            request: HTTP request object

        Returns:
            Token string if found, None otherwise

        Example:
            For header "Authorization: Bearer abc123", returns "abc123"
        """

    def set_token(self, request: Request, token: str):
        """
        Set Bearer token in request (for testing/internal use).

        Args:
            request: HTTP request object
            token: Token string to set
        """

72

```

73

74

### Identity Object

75

76

Represents an authenticated user's identity and claims.

77

78

```python { .api }

79

@dataclass
class Identity:
    """
    User identity containing authentication claims.

    Attributes:
        claims: Dictionary of user claims (e.g., user_id, email, roles)
    """

    claims: dict[str, str]

89

```

90

91

## Usage Examples

92

93

### Basic Authentication Handler

94

95

```python

96

from robyn import Robyn, AuthenticationHandler, BearerGetter, Identity

97

import jwt

98

99

from typing import Optional


class JWTAuthenticationHandler(AuthenticationHandler):
    """Authenticate requests that carry an HS256-signed JWT as a Bearer token."""

    def __init__(self, secret_key: str):
        """
        Args:
            secret_key: Key passed to ``jwt.decode`` to verify token signatures.
        """
        self.secret_key = secret_key
        super().__init__(BearerGetter())

    def authenticate(self, request) -> Optional[Identity]:
        """
        Decode the request's Bearer token and build an Identity from its payload.

        Args:
            request: HTTP request object

        Returns:
            Identity with ``user_id``, ``email`` and ``role`` claims, or None when
            the token is absent, fails ``jwt.decode``, or has no ``user_id``.
        """
        # Get token using the bearer token getter
        token = self.token_getter.get_token(request)
        if not token:
            return None

        # Keep the try body to the one call that can raise InvalidTokenError.
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            return None

        # Claims are declared as dict[str, str], so never store None and always
        # stringify. A token without a user_id cannot identify anyone.
        user_id = payload.get("user_id")
        if user_id is None:
            return None

        return Identity(claims={
            "user_id": str(user_id),
            "email": str(payload.get("email") or ""),
            "role": str(payload.get("role") or "user"),
        })

126

127

app = Robyn(__file__)

128

129

# Configure authentication

130

auth_handler = JWTAuthenticationHandler("your-secret-key")

131

app.configure_authentication(auth_handler)

132

133

# Public route (no authentication required)

134

@app.get("/")
def public_route(request):
    """Public endpoint: registered without ``auth_required``."""
    return {"message": "This is a public endpoint"}

137

138

# Protected route (authentication required)

139

@app.get("/profile", auth_required=True)
def get_profile(request):
    """Return the authenticated caller's ``user_id``, ``email`` and ``role`` claims."""
    # Access authenticated user's identity
    claims = request.identity.claims
    return {
        "user_id": claims["user_id"],
        "email": claims["email"],
        "role": claims["role"],
    }

148

149

# Protected route with role checking

150

from robyn import Response


@app.get("/admin", auth_required=True)
def admin_only(request):
    """Greet admins; answer 403 when the caller's ``role`` claim is not ``admin``."""
    user_identity = request.identity

    if user_identity.claims.get("role") != "admin":
        return Response(
            status_code=403,
            headers={},
            description="Forbidden: Admin access required",
        )

    return {"message": "Welcome, admin!"}

162

163

app.start()

164

```

165

166

### API Key Authentication

167

168

```python

169

from robyn import Robyn, AuthenticationHandler, Identity

170

import hashlib

171

172

class APIKeyGetter:
    """Token getter that reads and writes the ``X-API-Key`` request header."""

    def get_token(self, request):
        """Return the value of the ``X-API-Key`` header as given by ``headers.get``."""
        # Get API key from X-API-Key header
        return request.headers.get("X-API-Key")

    def set_token(self, request, token):
        """Store ``token`` in the request's ``X-API-Key`` header."""
        request.headers.set("X-API-Key", token)

179

180

from typing import Optional


class APIKeyAuthenticationHandler(AuthenticationHandler):
    """Authenticate requests by looking up their API key in a fixed mapping."""

    def __init__(self, valid_api_keys: dict):
        """
        Args:
            valid_api_keys: api_key -> user_info mapping; the user_info dict
                becomes the claims of the authenticated Identity.
        """
        self.valid_api_keys = valid_api_keys
        super().__init__(APIKeyGetter())

    def authenticate(self, request) -> Optional[Identity]:
        """
        Resolve the request's API key to an Identity.

        Returns:
            Identity built from the matching user_info, or None when the key is
            missing or not present in ``valid_api_keys``.
        """
        api_key = self.token_getter.get_token(request)
        if not api_key:
            return None

        # Single lookup instead of a membership test followed by indexing.
        user_info = self.valid_api_keys.get(api_key)
        if user_info is None:
            return None

        # Copy so the identity's claims never alias the shared key table.
        return Identity(claims=dict(user_info))

193

194

app = Robyn(__file__)

195

196

# Configure API key authentication

197

valid_keys = {

198

"api_key_123": {"user_id": "user1", "name": "Alice", "tier": "premium"},

199

"api_key_456": {"user_id": "user2", "name": "Bob", "tier": "basic"}

200

}

201

202

auth_handler = APIKeyAuthenticationHandler(valid_keys)

203

app.configure_authentication(auth_handler)

204

205

@app.get("/api/data", auth_required=True)
def get_data(request):
    """Return premium or basic data depending on the caller's ``tier`` claim."""
    # Callers without a tier claim are treated as basic.
    tier = request.identity.claims.get("tier", "basic")

    if tier == "premium":
        return {"data": "Premium data with extra features"}
    return {"data": "Basic data"}

214

215

app.start()

216

```

217

218

### Database-Based Authentication

219

220

```python

221

from robyn import Robyn, AuthenticationHandler, BearerGetter, Identity

222

import sqlite3

223

import bcrypt

224

225

import secrets
from contextlib import closing
from datetime import datetime, timedelta, timezone
from typing import Optional


class DatabaseAuthenticationHandler(AuthenticationHandler):
    """Authenticate Bearer session tokens against ``users``/``sessions`` tables in SQLite."""

    # How long a session created by create_session() remains valid.
    SESSION_LIFETIME = timedelta(hours=24)

    def __init__(self, db_path: str):
        """
        Args:
            db_path: Path handed to ``sqlite3.connect``; tables are created if absent.
        """
        self.db_path = db_path
        super().__init__(BearerGetter())
        self._init_db()

    def _init_db(self):
        """Create the ``users`` and ``sessions`` tables when they do not exist."""
        # closing() guarantees the connection is released even if a statement raises.
        with closing(sqlite3.connect(self.db_path)) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY,
                    username TEXT UNIQUE,
                    email TEXT UNIQUE,
                    password_hash TEXT,
                    role TEXT DEFAULT 'user'
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS sessions (
                    token TEXT PRIMARY KEY,
                    user_id INTEGER,
                    expires_at DATETIME,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            conn.commit()

    def authenticate(self, request) -> Optional[Identity]:
        """
        Look up the request's Bearer token among unexpired sessions.

        Args:
            request: HTTP request object

        Returns:
            Identity with ``user_id``, ``username``, ``email`` and ``role`` claims,
            or None when no token is sent or no unexpired session matches it.
        """
        token = self.token_getter.get_token(request)
        if not token:
            return None

        # Check if token is valid and not expired
        with closing(sqlite3.connect(self.db_path)) as conn:
            row = conn.execute('''
                SELECT u.id, u.username, u.email, u.role
                FROM users u
                JOIN sessions s ON u.id = s.user_id
                WHERE s.token = ? AND s.expires_at > datetime('now')
            ''', (token,)).fetchone()

        if row is None:
            return None

        user_id, username, email, role = row
        # Columns are nullable; claims are string-valued, so substitute defaults.
        return Identity(claims={
            "user_id": str(user_id),
            "username": username or "",
            "email": email or "",
            "role": role or "user",
        })

    def create_user(self, username: str, email: str, password: str, role: str = "user"):
        """
        Helper method to create a new user.

        Returns:
            The new row id, or None when the insert violates a uniqueness
            constraint (``sqlite3.IntegrityError``).
        """
        password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())

        with closing(sqlite3.connect(self.db_path)) as conn:
            try:
                cursor = conn.execute('''
                    INSERT INTO users (username, email, password_hash, role)
                    VALUES (?, ?, ?, ?)
                ''', (username, email, password_hash, role))
            except sqlite3.IntegrityError:
                return None
            conn.commit()
            return cursor.lastrowid

    def create_session(self, username: str, password: str) -> Optional[str]:
        """
        Helper method to create a session token.

        Returns:
            A new URL-safe token valid for ``SESSION_LIFETIME``, or None when the
            credentials are missing, the user is unknown, or the password is wrong.
        """
        # Missing credentials would otherwise crash on password.encode().
        if not username or not password:
            return None

        with closing(sqlite3.connect(self.db_path)) as conn:
            # Verify user credentials
            row = conn.execute('''
                SELECT id, password_hash FROM users WHERE username = ?
            ''', (username,)).fetchone()
            if row is None:
                return None

            user_id, stored_hash = row
            if not bcrypt.checkpw(password.encode('utf-8'), stored_hash):
                return None

            # Create session token
            token = secrets.token_urlsafe(32)
            # authenticate() compares against SQLite's datetime('now'), which is
            # UTC in 'YYYY-MM-DD HH:MM:SS' form, so store the expiry the same way.
            expires_at = (
                datetime.now(timezone.utc) + self.SESSION_LIFETIME
            ).strftime("%Y-%m-%d %H:%M:%S")

            conn.execute('''
                INSERT INTO sessions (token, user_id, expires_at)
                VALUES (?, ?, ?)
            ''', (token, user_id, expires_at))
            conn.commit()

        return token

345

346

app = Robyn(__file__)

347

348

# Configure database authentication

349

auth_handler = DatabaseAuthenticationHandler("users.db")

350

app.configure_authentication(auth_handler)

351

352

# Login endpoint

353

import json

from robyn import Response


@app.post("/login")
def login(request):
    """Exchange a JSON ``username``/``password`` body for a session token, or answer 401."""
    data = request.json()
    username = data.get("username")
    password = data.get("password")

    # Skip the lookup when a field is missing; it cannot succeed.
    token = None
    if username and password:
        token = auth_handler.create_session(username, password)

    if token:
        return {"token": token, "message": "Login successful"}

    # Serialize the error body to a JSON string for the response description.
    return Response(
        status_code=401,
        headers={"Content-Type": "application/json"},
        description=json.dumps({"error": "Invalid credentials"}),
    )

365

366

# Registration endpoint

367

import json

from robyn import Response


@app.post("/register")
def register(request):
    """Create a user from a JSON ``username``/``email``/``password`` body, or answer 400."""
    data = request.json()
    username = data.get("username")
    email = data.get("email")
    password = data.get("password")

    # create_user() encodes the password, so a missing one must be rejected here.
    if not username or not password:
        return Response(
            status_code=400,
            headers={"Content-Type": "application/json"},
            description=json.dumps({"error": "username and password are required"}),
        )

    user_id = auth_handler.create_user(username, email, password)
    if user_id:
        return {"message": "User created successfully", "user_id": user_id}

    return Response(
        status_code=400,
        headers={"Content-Type": "application/json"},
        description=json.dumps({"error": "User already exists"}),
    )

380

381

# Protected endpoints

382

@app.get("/profile", auth_required=True)
def get_profile(request):
    """Return all claims of the authenticated caller under the ``user`` key."""
    return {"user": request.identity.claims}

385

386

import json

from robyn import Response


@app.get("/admin/users", auth_required=True)
def list_users(request):
    """Return a placeholder user list; answer 403 unless the ``role`` claim is ``admin``."""
    if request.identity.claims.get("role") != "admin":
        return Response(
            status_code=403,
            headers={"Content-Type": "application/json"},
            description=json.dumps({"error": "Admin access required"}),
        )

    # Return list of users (implementation depends on your needs)
    return {"users": ["user1", "user2"]}

393

394

app.start()

395

```

396

397

### Custom Token Extraction

398

399

```python

400

from robyn import Robyn, AuthenticationHandler, Identity

401

402

from typing import Optional


class CookieTokenGetter:
    """Token getter that reads the token from a named cookie in the ``Cookie`` header."""

    def __init__(self, cookie_name: str = "auth_token"):
        """
        Args:
            cookie_name: Name of the cookie whose value is the token.
        """
        self.cookie_name = cookie_name

    def get_token(self, request) -> Optional[str]:
        """
        Extract the token from the request's ``Cookie`` header.

        Returns:
            The value of the cookie named ``cookie_name``, or None when the
            header or that cookie is absent. If the name repeats, the last
            occurrence wins.
        """
        cookie_header = request.headers.get("Cookie")
        if not cookie_header:
            return None

        # Simple cookie parsing (in production, use a proper cookie parser)
        cookies = {}
        for pair in cookie_header.split(";"):
            # Split on the first '=' only; values may themselves contain '='.
            name, separator, value = pair.partition("=")
            if separator:
                cookies[name.strip()] = value.strip()

        return cookies.get(self.cookie_name)

    def set_token(self, request, token):
        """Intentionally a no-op; kept for parity with other token getters."""
        # This would typically be used in tests or internal processing
        pass

424

425

from typing import Optional


class SessionAuthenticationHandler(AuthenticationHandler):
    """Authenticate requests via a ``session_id`` cookie and an in-memory session map."""

    def __init__(self):
        # Use custom cookie token getter
        super().__init__(CookieTokenGetter("session_id"))
        self.sessions = {}  # In production, use Redis or database

    def authenticate(self, request) -> Optional[Identity]:
        """
        Resolve the request's session cookie to an Identity.

        Returns:
            Identity built from the stored session data, or None when the
            cookie is missing or names an unknown session.
        """
        session_id = self.token_getter.get_token(request)
        if not session_id:
            return None

        user_data = self.sessions.get(session_id)
        if user_data is None:
            return None

        return Identity(claims=user_data)

    def create_session(self, user_data: dict) -> str:
        """
        Store ``user_data`` under a fresh session id and return that id.

        Args:
            user_data: Claims to attach to the session.
        """
        import secrets

        # Session ids are bearer credentials; use the secrets module for them.
        session_id = secrets.token_urlsafe(32)
        # Copy so later mutation of the caller's dict cannot alter the session.
        self.sessions[session_id] = dict(user_data)
        return session_id

445

446

app = Robyn(__file__)

447

448

auth_handler = SessionAuthenticationHandler()

449

app.configure_authentication(auth_handler)

450

451

import json

from robyn import Response


@app.post("/login")
def login(request):
    """Start a session for the posted ``username`` and return it in a ``session_id`` cookie."""
    # Simulate login validation
    data = request.json()
    username = data.get("username")

    if not username:  # Simplified validation
        return Response(
            status_code=401,
            headers={"Content-Type": "application/json"},
            description=json.dumps({"error": "Invalid login"}),
        )

    session_id = auth_handler.create_session({
        "username": username,
        "user_id": "123",
        "role": "user",
    })

    # Set session cookie in response
    response = Response(
        status_code=200,
        headers={"Content-Type": "application/json"},
        description=json.dumps({"message": "Login successful"}),
    )
    response.set_cookie("session_id", session_id)
    return response

470

471

@app.get("/dashboard", auth_required=True)
def dashboard(request):
    """Greet the authenticated caller using the ``username`` claim."""
    username = request.identity.claims["username"]
    return {"message": f"Welcome to your dashboard, {username}!"}

475

476

app.start()

477

```