0
# Authentication
1
2
Pluggable authentication system with bearer token support, custom authentication handlers, and identity management for securing routes and WebSocket connections.
3
4
## Capabilities
5
6
### Authentication Handler
7
8
Abstract base class for implementing custom authentication logic.
9
10
```python { .api }
11
class AuthenticationHandler:
12
def __init__(self, token_getter: TokenGetter):
13
"""
14
Initialize authentication handler.
15
16
Args:
17
token_getter: TokenGetter instance for extracting tokens from requests
18
"""
19
20
def authenticate(self, request: Request) -> Optional[Identity]:
21
"""
22
Authenticate a request and return user identity.
23
24
Args:
25
request: HTTP request object
26
27
Returns:
28
Identity object if authentication successful, None otherwise
29
30
Note:
31
This is an abstract method that must be implemented by subclasses
32
"""
33
34
@property
35
def unauthorized_response(self) -> Response:
36
"""
37
Default response for unauthorized requests.
38
39
Returns:
40
Response object with 401 status and unauthorized message
41
"""
42
```
43
44
### Bearer Token Authentication
45
46
Built-in token getter for extracting Bearer tokens from Authorization headers.
47
48
```python { .api }
49
class BearerGetter:
50
def get_token(self, request: Request) -> Optional[str]:
51
"""
52
Extract Bearer token from Authorization header.
53
54
Args:
55
request: HTTP request object
56
57
Returns:
58
Token string if found, None otherwise
59
60
Example:
61
For header "Authorization: Bearer abc123", returns "abc123"
62
"""
63
64
def set_token(self, request: Request, token: str):
65
"""
66
Set Bearer token in request (for testing/internal use).
67
68
Args:
69
request: HTTP request object
70
token: Token string to set
71
"""
72
```
73
74
### Identity Object
75
76
Represents an authenticated user's identity and claims.
77
78
```python { .api }
79
@dataclass
80
class Identity:
81
claims: dict[str, str]
82
83
"""
84
User identity containing authentication claims.
85
86
Attributes:
87
claims: Dictionary of user claims (e.g., user_id, email, roles)
88
"""
89
```
90
91
## Usage Examples
92
93
### Basic Authentication Handler
94
95
```python
96
from robyn import Robyn, AuthenticationHandler, BearerGetter, Identity
97
import jwt
98
99
class JWTAuthenticationHandler(AuthenticationHandler):
100
def __init__(self, secret_key: str):
101
self.secret_key = secret_key
102
super().__init__(BearerGetter())
103
104
def authenticate(self, request) -> Optional[Identity]:
105
# Get token using the bearer token getter
106
token = self.token_getter.get_token(request)
107
108
if not token:
109
return None
110
111
try:
112
# Decode JWT token
113
payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
114
115
# Create identity from token payload
116
identity = Identity(claims={
117
"user_id": payload.get("user_id"),
118
"email": payload.get("email"),
119
"role": payload.get("role", "user")
120
})
121
122
return identity
123
124
except jwt.InvalidTokenError:
125
return None
126
127
app = Robyn(__file__)
128
129
# Configure authentication
130
auth_handler = JWTAuthenticationHandler("your-secret-key")
131
app.configure_authentication(auth_handler)
132
133
# Public route (no authentication required)
134
@app.get("/")
135
def public_route(request):
136
return {"message": "This is a public endpoint"}
137
138
# Protected route (authentication required)
139
@app.get("/profile", auth_required=True)
140
def get_profile(request):
141
# Access authenticated user's identity
142
user_identity = request.identity
143
return {
144
"user_id": user_identity.claims["user_id"],
145
"email": user_identity.claims["email"],
146
"role": user_identity.claims["role"]
147
}
148
149
# Protected route with role checking
150
@app.get("/admin", auth_required=True)
151
def admin_only(request):
152
user_identity = request.identity
153
154
if user_identity.claims.get("role") != "admin":
155
return Response(
156
status_code=403,
157
headers={},
158
description="Forbidden: Admin access required"
159
)
160
161
return {"message": "Welcome, admin!"}
162
163
app.start()
164
```
165
166
### API Key Authentication
167
168
```python
169
from robyn import Robyn, AuthenticationHandler, Identity
170
import hashlib
171
172
class APIKeyGetter:
173
def get_token(self, request):
174
# Get API key from X-API-Key header
175
return request.headers.get("X-API-Key")
176
177
def set_token(self, request, token):
178
request.headers.set("X-API-Key", token)
179
180
class APIKeyAuthenticationHandler(AuthenticationHandler):
181
def __init__(self, valid_api_keys: dict):
182
self.valid_api_keys = valid_api_keys # api_key -> user_info mapping
183
super().__init__(APIKeyGetter())
184
185
def authenticate(self, request) -> Optional[Identity]:
186
api_key = self.token_getter.get_token(request)
187
188
if not api_key or api_key not in self.valid_api_keys:
189
return None
190
191
user_info = self.valid_api_keys[api_key]
192
return Identity(claims=user_info)
193
194
app = Robyn(__file__)
195
196
# Configure API key authentication
197
valid_keys = {
198
"api_key_123": {"user_id": "user1", "name": "Alice", "tier": "premium"},
199
"api_key_456": {"user_id": "user2", "name": "Bob", "tier": "basic"}
200
}
201
202
auth_handler = APIKeyAuthenticationHandler(valid_keys)
203
app.configure_authentication(auth_handler)
204
205
@app.get("/api/data", auth_required=True)
206
def get_data(request):
207
user_identity = request.identity
208
tier = user_identity.claims.get("tier", "basic")
209
210
if tier == "premium":
211
return {"data": "Premium data with extra features"}
212
else:
213
return {"data": "Basic data"}
214
215
app.start()
216
```
217
218
### Database-Based Authentication
219
220
```python
221
from robyn import Robyn, AuthenticationHandler, BearerGetter, Identity
222
import sqlite3
223
import bcrypt
224
225
class DatabaseAuthenticationHandler(AuthenticationHandler):
226
def __init__(self, db_path: str):
227
self.db_path = db_path
228
super().__init__(BearerGetter())
229
self._init_db()
230
231
def _init_db(self):
232
# Initialize database with users and sessions tables
233
conn = sqlite3.connect(self.db_path)
234
cursor = conn.cursor()
235
236
cursor.execute('''
237
CREATE TABLE IF NOT EXISTS users (
238
id INTEGER PRIMARY KEY,
239
username TEXT UNIQUE,
240
email TEXT UNIQUE,
241
password_hash TEXT,
242
role TEXT DEFAULT 'user'
243
)
244
''')
245
246
cursor.execute('''
247
CREATE TABLE IF NOT EXISTS sessions (
248
token TEXT PRIMARY KEY,
249
user_id INTEGER,
250
expires_at DATETIME,
251
FOREIGN KEY (user_id) REFERENCES users (id)
252
)
253
''')
254
255
conn.commit()
256
conn.close()
257
258
def authenticate(self, request) -> Optional[Identity]:
259
token = self.token_getter.get_token(request)
260
261
if not token:
262
return None
263
264
conn = sqlite3.connect(self.db_path)
265
cursor = conn.cursor()
266
267
# Check if token is valid and not expired
268
cursor.execute('''
269
SELECT u.id, u.username, u.email, u.role
270
FROM users u
271
JOIN sessions s ON u.id = s.user_id
272
WHERE s.token = ? AND s.expires_at > datetime('now')
273
''', (token,))
274
275
result = cursor.fetchone()
276
conn.close()
277
278
if result:
279
user_id, username, email, role = result
280
return Identity(claims={
281
"user_id": str(user_id),
282
"username": username,
283
"email": email,
284
"role": role
285
})
286
287
return None
288
289
def create_user(self, username: str, email: str, password: str, role: str = "user"):
290
"""Helper method to create a new user"""
291
password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
292
293
conn = sqlite3.connect(self.db_path)
294
cursor = conn.cursor()
295
296
try:
297
cursor.execute('''
298
INSERT INTO users (username, email, password_hash, role)
299
VALUES (?, ?, ?, ?)
300
''', (username, email, password_hash, role))
301
conn.commit()
302
return cursor.lastrowid
303
except sqlite3.IntegrityError:
304
return None
305
finally:
306
conn.close()
307
308
def create_session(self, username: str, password: str) -> Optional[str]:
309
"""Helper method to create a session token"""
310
import secrets
311
from datetime import datetime, timedelta
312
313
conn = sqlite3.connect(self.db_path)
314
cursor = conn.cursor()
315
316
# Verify user credentials
317
cursor.execute('''
318
SELECT id, password_hash FROM users WHERE username = ?
319
''', (username,))
320
321
result = cursor.fetchone()
322
if not result:
323
conn.close()
324
return None
325
326
user_id, stored_hash = result
327
328
if not bcrypt.checkpw(password.encode('utf-8'), stored_hash):
329
conn.close()
330
return None
331
332
# Create session token
333
token = secrets.token_urlsafe(32)
334
expires_at = datetime.now() + timedelta(hours=24)
335
336
cursor.execute('''
337
INSERT INTO sessions (token, user_id, expires_at)
338
VALUES (?, ?, ?)
339
''', (token, user_id, expires_at))
340
341
conn.commit()
342
conn.close()
343
344
return token
345
346
app = Robyn(__file__)
347
348
# Configure database authentication
349
auth_handler = DatabaseAuthenticationHandler("users.db")
350
app.configure_authentication(auth_handler)
351
352
# Login endpoint
353
@app.post("/login")
354
def login(request):
355
data = request.json()
356
username = data.get("username")
357
password = data.get("password")
358
359
token = auth_handler.create_session(username, password)
360
361
if token:
362
return {"token": token, "message": "Login successful"}
363
else:
364
return Response(401, {}, {"error": "Invalid credentials"})
365
366
# Registration endpoint
367
@app.post("/register")
368
def register(request):
369
data = request.json()
370
username = data.get("username")
371
email = data.get("email")
372
password = data.get("password")
373
374
user_id = auth_handler.create_user(username, email, password)
375
376
if user_id:
377
return {"message": "User created successfully", "user_id": user_id}
378
else:
379
return Response(400, {}, {"error": "User already exists"})
380
381
# Protected endpoints
382
@app.get("/profile", auth_required=True)
383
def get_profile(request):
384
return {"user": request.identity.claims}
385
386
@app.get("/admin/users", auth_required=True)
387
def list_users(request):
388
if request.identity.claims.get("role") != "admin":
389
return Response(403, {}, {"error": "Admin access required"})
390
391
# Return list of users (implementation depends on your needs)
392
return {"users": ["user1", "user2"]}
393
394
app.start()
395
```
396
397
### Custom Token Extraction
398
399
```python
400
from robyn import Robyn, AuthenticationHandler, Identity
401
402
class CookieTokenGetter:
403
def __init__(self, cookie_name: str = "auth_token"):
404
self.cookie_name = cookie_name
405
406
def get_token(self, request) -> Optional[str]:
407
# Extract token from cookie
408
cookie_header = request.headers.get("Cookie")
409
if not cookie_header:
410
return None
411
412
# Simple cookie parsing (in production, use a proper cookie parser)
413
cookies = {}
414
for cookie in cookie_header.split(';'):
415
if '=' in cookie:
416
name, value = cookie.strip().split('=', 1)
417
cookies[name] = value
418
419
return cookies.get(self.cookie_name)
420
421
def set_token(self, request, token):
422
# This would typically be used in tests or internal processing
423
pass
424
425
class SessionAuthenticationHandler(AuthenticationHandler):
426
def __init__(self):
427
# Use custom cookie token getter
428
super().__init__(CookieTokenGetter("session_id"))
429
self.sessions = {} # In production, use Redis or database
430
431
def authenticate(self, request) -> Optional[Identity]:
432
session_id = self.token_getter.get_token(request)
433
434
if session_id and session_id in self.sessions:
435
user_data = self.sessions[session_id]
436
return Identity(claims=user_data)
437
438
return None
439
440
def create_session(self, user_data: dict) -> str:
441
import uuid
442
session_id = str(uuid.uuid4())
443
self.sessions[session_id] = user_data
444
return session_id
445
446
app = Robyn(__file__)
447
448
auth_handler = SessionAuthenticationHandler()
449
app.configure_authentication(auth_handler)
450
451
@app.post("/login")
452
def login(request):
453
# Simulate login validation
454
data = request.json()
455
username = data.get("username")
456
457
if username: # Simplified validation
458
session_id = auth_handler.create_session({
459
"username": username,
460
"user_id": "123",
461
"role": "user"
462
})
463
464
# Set session cookie in response
465
response = Response(200, {}, {"message": "Login successful"})
466
response.set_cookie("session_id", session_id)
467
return response
468
469
return Response(401, {}, {"error": "Invalid login"})
470
471
@app.get("/dashboard", auth_required=True)
472
def dashboard(request):
473
username = request.identity.claims["username"]
474
return {"message": f"Welcome to your dashboard, {username}!"}
475
476
app.start()
477
```