0
# JWT Operations
1
2
JSON Web Token parsing, payload extraction, validation, and signature verification for AT Protocol authentication and authorization. These operations handle JWT lifecycle management including parsing, validation, and verification of tokens used in AT Protocol sessions.
3
4
## Capabilities
5
6
### JWT Payload Structure
7
8
AT Protocol JWT payloads follow RFC 7519 standards with additional AT Protocol-specific fields.
9
10
```python { .api }
11
class JwtPayload:
12
"""
13
JWT payload structure based on RFC 7519 with AT Protocol extensions.
14
15
Attributes:
16
iss (Optional[str]): Issuer (DID) - who issued the token
17
sub (Optional[str]): Subject (DID) - who the token is about
18
aud (Optional[Union[str, List[str]]]): Audience (DID) - intended recipients
19
jti (Optional[str]): JWT ID - unique identifier for this token
20
nbf (Optional[int]): Not Before - earliest time token is valid (Unix timestamp)
21
exp (Optional[int]): Expiration Time - when token expires (Unix timestamp)
22
iat (Optional[int]): Issued At - when token was issued (Unix timestamp)
23
scope (Optional[str]): ATProto-specific scope for permissions
24
"""
25
iss: Optional[str]
26
sub: Optional[str]
27
aud: Optional[Union[str, List[str]]]
28
jti: Optional[str]
29
nbf: Optional[int]
30
exp: Optional[int]
31
iat: Optional[int]
32
scope: Optional[str]
33
34
def is_expired(self, current_time: Optional[int] = None) -> bool:
35
"""
36
Check if token is expired.
37
38
Args:
39
current_time (int, optional): Current Unix timestamp (default: now)
40
41
Returns:
42
bool: True if token is expired
43
"""
44
45
def is_valid_at_time(self, timestamp: int) -> bool:
46
"""
47
Check if token is valid at specific timestamp.
48
49
Args:
50
timestamp (int): Unix timestamp to check
51
52
Returns:
53
bool: True if token is valid at given time
54
"""
55
56
def time_until_expiration(self) -> Optional[int]:
57
"""
58
Get seconds until token expires.
59
60
Returns:
61
Optional[int]: Seconds until expiration or None if no expiration
62
"""
63
64
def validate_audience(self, expected_audience: Union[str, List[str]]) -> bool:
65
"""
66
Validate token audience against expected values.
67
68
Args:
69
expected_audience (Union[str, List[str]]): Expected audience values
70
71
Returns:
72
bool: True if audience matches expectations
73
"""
74
```
75
76
### JWT Parsing
77
78
Parse JWT tokens into their component parts for inspection and validation.
79
80
```python { .api }
81
def parse_jwt(jwt: Union[str, bytes]) -> Tuple[bytes, bytes, Dict[str, Any], bytes]:
82
"""
83
Parse JWT into components without verification.
84
85
Args:
86
jwt (Union[str, bytes]): JWT token to parse
87
88
Returns:
89
Tuple[bytes, bytes, Dict[str, Any], bytes]:
90
(payload_bytes, signing_input, header_dict, signature_bytes)
91
92
Raises:
93
JwtParsingError: If JWT format is invalid
94
"""
95
96
def get_jwt_payload(jwt: Union[str, bytes]) -> JwtPayload:
97
"""
98
Extract and validate JWT payload.
99
100
Args:
101
jwt (Union[str, bytes]): JWT token
102
103
Returns:
104
JwtPayload: Decoded and validated payload
105
106
Raises:
107
JwtParsingError: If JWT format is invalid
108
JwtValidationError: If payload validation fails
109
"""
110
111
def decode_jwt_payload(jwt: Union[str, bytes]) -> JwtPayload:
112
"""
113
Decode JWT payload without validation.
114
115
Args:
116
jwt (Union[str, bytes]): JWT token
117
118
Returns:
119
JwtPayload: Decoded payload (may be invalid/expired)
120
121
Raises:
122
JwtParsingError: If JWT format is invalid
123
"""
124
```
125
126
Usage examples:
127
128
```python
129
from atproto import parse_jwt, get_jwt_payload, decode_jwt_payload, JwtPayload, JwtValidationError
130
131
# Example JWT token (access token from AT Protocol)
132
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ.eyJpc3MiOiJkaWQ6cGxjOmFsaWNlMTIzIiwic3ViIjoiZGlkOnBsYzphbGljZTEyMyIsImF1ZCI6ImRpZDp3ZWI6YnNreS5zb2NpYWwiLCJpYXQiOjE2ODAwMDAwMDAsImV4cCI6MTY4MDAwMzYwMCwic2NvcGUiOiJjb20uYXRwcm90by5hY2Nlc3MifQ.signature"
133
134
# Parse JWT components
135
payload_bytes, signing_input, header, signature = parse_jwt(jwt_token)
136
print(f"Header: {header}")
137
print(f"Payload bytes length: {len(payload_bytes)}")
138
print(f"Signing input length: {len(signing_input)}")
139
140
# Get validated payload
141
try:
142
payload = get_jwt_payload(jwt_token)
143
print(f"Issuer: {payload.iss}")
144
print(f"Subject: {payload.sub}")
145
print(f"Audience: {payload.aud}")
146
print(f"Scope: {payload.scope}")
147
print(f"Expires at: {payload.exp}")
148
149
# Check expiration
150
if payload.is_expired():
151
print("⚠️ Token is expired")
152
else:
153
print(f"✓ Token valid for {payload.time_until_expiration()} more seconds")
154
155
except JwtValidationError as e:
156
print(f"JWT validation failed: {e}")
157
158
# Decode without validation (useful for debugging)
159
payload_unvalidated = decode_jwt_payload(jwt_token)
160
print(f"Raw payload (unvalidated): {payload_unvalidated}")
161
```
162
163
### JWT Validation
164
165
Validate JWT payload claims according to AT Protocol requirements.
166
167
```python { .api }
168
def validate_jwt_payload(payload: JwtPayload,
169
expected_audience: Optional[Union[str, List[str]]] = None,
170
expected_issuer: Optional[str] = None,
171
current_time: Optional[int] = None) -> bool:
172
"""
173
Validate JWT payload claims.
174
175
Args:
176
payload (JwtPayload): Payload to validate
177
expected_audience (Union[str, List[str]], optional): Expected audience
178
expected_issuer (str, optional): Expected issuer DID
179
current_time (int, optional): Current timestamp for time validation
180
181
Returns:
182
bool: True if payload is valid
183
184
Raises:
185
JwtValidationError: If validation fails with details
186
"""
187
188
def validate_access_token_payload(payload: JwtPayload, pds_did: str) -> bool:
189
"""
190
Validate access token payload for AT Protocol.
191
192
Args:
193
payload (JwtPayload): Access token payload
194
pds_did (str): Expected PDS DID as audience
195
196
Returns:
197
bool: True if valid access token
198
199
Raises:
200
JwtValidationError: If access token validation fails
201
"""
202
203
def validate_refresh_token_payload(payload: JwtPayload) -> bool:
204
"""
205
Validate refresh token payload for AT Protocol.
206
207
Args:
208
payload (JwtPayload): Refresh token payload
209
210
Returns:
211
bool: True if valid refresh token
212
213
Raises:
214
JwtValidationError: If refresh token validation fails
215
"""
216
```
217
218
Usage examples:
219
220
```python
221
from atproto import (
222
validate_jwt_payload, validate_access_token_payload,
223
validate_refresh_token_payload, get_jwt_payload, JwtValidationError
224
)
225
import time
226
227
# Validate general JWT payload
228
jwt_token = "..." # Your JWT token
229
payload = get_jwt_payload(jwt_token)
230
231
# Basic validation
232
try:
233
is_valid = validate_jwt_payload(
234
payload,
235
expected_audience="did:web:bsky.social",
236
expected_issuer="did:plc:alice123",
237
current_time=int(time.time())
238
)
239
print(f"JWT is valid: {is_valid}")
240
except JwtValidationError as e:
241
print(f"Validation failed: {e}")
242
243
# Validate AT Protocol access token
244
access_token = "..." # Access token JWT
245
access_payload = get_jwt_payload(access_token)
246
247
try:
248
is_valid_access = validate_access_token_payload(
249
access_payload,
250
pds_did="did:web:alice.pds.example.com"
251
)
252
print(f"Access token is valid: {is_valid_access}")
253
except JwtValidationError as e:
254
print(f"Access token validation failed: {e}")
255
256
# Validate refresh token
257
refresh_token = "..." # Refresh token JWT
258
refresh_payload = get_jwt_payload(refresh_token)
259
260
try:
261
is_valid_refresh = validate_refresh_token_payload(refresh_payload)
262
print(f"Refresh token is valid: {is_valid_refresh}")
263
except JwtValidationError as e:
264
print(f"Refresh token validation failed: {e}")
265
```
266
267
### JWT Signature Verification
268
269
Verify JWT signatures using cryptographic keys to ensure token authenticity.
270
271
```python { .api }
272
def verify_jwt(jwt: Union[str, bytes], signing_key: str) -> bool:
273
"""
274
Verify JWT signature synchronously.
275
276
Args:
277
jwt (Union[str, bytes]): JWT token to verify
278
signing_key (str): DID key or public key for verification
279
280
Returns:
281
bool: True if signature is valid
282
283
Raises:
284
JwtVerificationError: If signature verification fails
285
UnsupportedAlgorithmError: If JWT algorithm is not supported
286
"""
287
288
async def verify_jwt_async(jwt: Union[str, bytes], signing_key: str) -> bool:
289
"""
290
Verify JWT signature asynchronously.
291
292
Args:
293
jwt (Union[str, bytes]): JWT token to verify
294
signing_key (str): DID key or public key for verification
295
296
Returns:
297
bool: True if signature is valid
298
299
Raises:
300
JwtVerificationError: If signature verification fails
301
"""
302
```
303
304
Usage examples:
305
306
```python
307
from atproto import verify_jwt, verify_jwt_async, get_jwt_payload, JwtVerificationError, UnsupportedAlgorithmError
308
import asyncio
309
310
# Synchronous JWT verification
311
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
312
signing_key = "did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK"
313
314
try:
315
is_signature_valid = verify_jwt(jwt_token, signing_key)
316
if is_signature_valid:
317
print("✓ JWT signature is valid")
318
319
# Now safe to trust the payload
320
payload = get_jwt_payload(jwt_token)
321
print(f"Verified token from: {payload.iss}")
322
else:
323
print("✗ JWT signature is invalid")
324
325
except JwtVerificationError as e:
326
print(f"Signature verification failed: {e}")
327
except UnsupportedAlgorithmError as e:
328
print(f"Unsupported algorithm: {e}")
329
330
# Asynchronous JWT verification
331
async def verify_jwt_async_example():
332
try:
333
is_valid = await verify_jwt_async(jwt_token, signing_key)
334
if is_valid:
335
print("✓ Async JWT signature verification successful")
336
else:
337
print("✗ Async JWT signature verification failed")
338
except Exception as e:
339
print(f"Async verification error: {e}")
340
341
asyncio.run(verify_jwt_async_example())
342
```
343
344
### Complete JWT Verification Workflow
345
346
Comprehensive JWT verification combining parsing, validation, and signature verification.
347
348
```python { .api }
349
def verify_and_validate_jwt(jwt: Union[str, bytes],
350
signing_key: str,
351
expected_audience: Optional[Union[str, List[str]]] = None,
352
expected_issuer: Optional[str] = None) -> JwtPayload:
353
"""
354
Complete JWT verification and validation workflow.
355
356
Args:
357
jwt (Union[str, bytes]): JWT token
358
signing_key (str): DID key for signature verification
359
expected_audience (Union[str, List[str]], optional): Expected audience
360
expected_issuer (str, optional): Expected issuer
361
362
Returns:
363
JwtPayload: Verified and validated payload
364
365
Raises:
366
JwtVerificationError: If signature verification fails
367
JwtValidationError: If payload validation fails
368
JwtParsingError: If JWT parsing fails
369
"""
370
371
def verify_at_protocol_session_token(access_token: Union[str, bytes],
372
user_did: str,
373
pds_did: str,
374
signing_key: str) -> JwtPayload:
375
"""
376
Verify AT Protocol session access token.
377
378
Args:
379
access_token (Union[str, bytes]): Access token to verify
380
user_did (str): Expected user DID (subject)
381
pds_did (str): Expected PDS DID (audience)
382
signing_key (str): User's signing key for verification
383
384
Returns:
385
JwtPayload: Verified access token payload
386
387
Raises:
388
JwtVerificationError: If token verification fails
389
"""
390
```
391
392
Usage examples:
393
394
```python
395
from atproto import (
396
verify_and_validate_jwt, verify_at_protocol_session_token,
397
JwtVerificationError, JwtValidationError
398
)
399
400
# Complete JWT verification workflow
401
def authenticate_request(jwt_token, user_signing_key):
402
"""Example request authentication using JWT."""
403
try:
404
# Verify signature and validate claims
405
payload = verify_and_validate_jwt(
406
jwt_token,
407
signing_key=user_signing_key,
408
expected_audience="did:web:bsky.social",
409
expected_issuer="did:plc:alice123"
410
)
411
412
print(f"✓ Authenticated user: {payload.sub}")
413
print(f"✓ Token scope: {payload.scope}")
414
print(f"✓ Valid until: {payload.exp}")
415
416
return {'authenticated': True, 'user_did': payload.sub, 'scope': payload.scope}
417
418
except JwtVerificationError:
419
print("✗ JWT signature verification failed")
420
return {'authenticated': False, 'error': 'invalid_signature'}
421
422
except JwtValidationError as e:
423
print(f"✗ JWT validation failed: {e}")
424
return {'authenticated': False, 'error': 'invalid_claims'}
425
426
except Exception as e:
427
print(f"✗ Authentication error: {e}")
428
return {'authenticated': False, 'error': 'authentication_failed'}
429
430
# AT Protocol session verification
431
def verify_session_token(access_token, user_did, pds_did, signing_key):
432
"""Verify AT Protocol session access token."""
433
try:
434
payload = verify_at_protocol_session_token(
435
access_token=access_token,
436
user_did=user_did,
437
pds_did=pds_did,
438
signing_key=signing_key
439
)
440
441
print(f"✓ Valid session for user: {payload.sub}")
442
print(f"✓ Session scope: {payload.scope}")
443
444
return payload
445
446
except JwtVerificationError as e:
447
print(f"✗ Session token verification failed: {e}")
448
raise
449
450
# Example usage
451
jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
452
user_signing_key = "did:key:z6MkhaXgBZDv..."
453
454
# Authenticate request
455
auth_result = authenticate_request(jwt_token, user_signing_key)
456
if auth_result['authenticated']:
457
print(f"User {auth_result['user_did']} authenticated with scope {auth_result['scope']}")
458
459
# Verify session token
460
try:
461
session_payload = verify_session_token(
462
access_token=jwt_token,
463
user_did="did:plc:alice123",
464
pds_did="did:web:alice.pds.example.com",
465
signing_key=user_signing_key
466
)
467
print("Session verification successful")
468
except Exception as e:
469
print(f"Session verification failed: {e}")
470
```
471
472
### JWT Utilities
473
474
Utility functions for working with JWT tokens in AT Protocol contexts.
475
476
```python { .api }
477
def extract_did_from_jwt(jwt: Union[str, bytes]) -> Optional[str]:
478
"""
479
Extract DID from JWT issuer claim.
480
481
Args:
482
jwt (Union[str, bytes]): JWT token
483
484
Returns:
485
Optional[str]: Issuer DID or None if not found
486
"""
487
488
def get_jwt_algorithm(jwt: Union[str, bytes]) -> str:
489
"""
490
Get algorithm from JWT header.
491
492
Args:
493
jwt (Union[str, bytes]): JWT token
494
495
Returns:
496
str: Algorithm identifier (e.g., 'ES256K', 'EdDSA')
497
498
Raises:
499
JwtParsingError: If header cannot be parsed
500
"""
501
502
def is_access_token(jwt: Union[str, bytes]) -> bool:
503
"""
504
Check if JWT is an AT Protocol access token.
505
506
Args:
507
jwt (Union[str, bytes]): JWT token
508
509
Returns:
510
bool: True if appears to be access token
511
"""
512
513
def is_refresh_token(jwt: Union[str, bytes]) -> bool:
514
"""
515
Check if JWT is an AT Protocol refresh token.
516
517
Args:
518
jwt (Union[str, bytes]): JWT token
519
520
Returns:
521
bool: True if appears to be refresh token
522
"""
523
524
def get_token_expiration_time(jwt: Union[str, bytes]) -> Optional[int]:
525
"""
526
Get token expiration timestamp.
527
528
Args:
529
jwt (Union[str, bytes]): JWT token
530
531
Returns:
532
Optional[int]: Unix timestamp of expiration or None
533
"""
534
```
535
536
Usage examples:
537
538
```python
539
from atproto import (
540
extract_did_from_jwt, get_jwt_algorithm, is_access_token,
541
is_refresh_token, get_token_expiration_time
542
)
543
from datetime import datetime
544
545
# JWT utility functions
546
access_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
547
refresh_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."
548
549
# Extract information from tokens
550
issuer_did = extract_did_from_jwt(access_token)
551
print(f"Token issued by: {issuer_did}")
552
553
algorithm = get_jwt_algorithm(access_token)
554
print(f"Token algorithm: {algorithm}")
555
556
# Check token types
557
print(f"Is access token: {is_access_token(access_token)}")
558
print(f"Is refresh token: {is_refresh_token(access_token)}")
559
560
# Get expiration time
561
exp_timestamp = get_token_expiration_time(access_token)
562
if exp_timestamp is not None:
563
exp_datetime = datetime.fromtimestamp(exp_timestamp)
564
print(f"Token expires: {exp_datetime}")
565
```
566
567
### Error Handling
568
569
```python { .api }
570
class JwtError(Exception):
571
"""Base exception for JWT operations."""
572
573
class JwtParsingError(JwtError):
574
"""Raised when JWT parsing fails."""
575
576
class JwtValidationError(JwtError):
577
"""Raised when JWT validation fails."""
578
579
class JwtVerificationError(JwtError):
580
"""Raised when JWT signature verification fails."""
581
582
class UnsupportedAlgorithmError(JwtError):
583
"""Raised when JWT uses unsupported algorithm."""
584
585
class ExpiredTokenError(JwtValidationError):
586
"""Raised when JWT is expired."""
587
588
class InvalidAudienceError(JwtValidationError):
589
"""Raised when JWT audience is invalid."""
590
591
class InvalidIssuerError(JwtValidationError):
592
"""Raised when JWT issuer is invalid."""
593
```
594
595
Comprehensive error handling example:
596
597
```python
598
from atproto import (
599
verify_and_validate_jwt,
600
JwtParsingError, JwtValidationError, JwtVerificationError,
601
ExpiredTokenError, InvalidAudienceError, UnsupportedAlgorithmError
602
)
603
604
def robust_jwt_verification(jwt_token, signing_key, expected_audience=None):
605
"""Robust JWT verification with detailed error handling."""
606
try:
607
payload = verify_and_validate_jwt(
608
jwt_token,
609
signing_key,
610
expected_audience=expected_audience
611
)
612
613
return {
614
'success': True,
615
'payload': payload,
616
'user_did': payload.sub,
617
'expires_at': payload.exp
618
}
619
620
except JwtParsingError as e:
621
return {
622
'success': False,
623
'error': 'malformed_token',
624
'message': f'Token parsing failed: {e}'
625
}
626
627
except ExpiredTokenError as e:
628
return {
629
'success': False,
630
'error': 'token_expired',
631
'message': 'Token has expired'
632
}
633
634
except InvalidAudienceError as e:
635
return {
636
'success': False,
637
'error': 'invalid_audience',
638
'message': f'Token audience mismatch: {e}'
639
}
640
641
except JwtVerificationError as e:
642
return {
643
'success': False,
644
'error': 'signature_invalid',
645
'message': 'Token signature verification failed'
646
}
647
648
except UnsupportedAlgorithmError as e:
649
return {
650
'success': False,
651
'error': 'unsupported_algorithm',
652
'message': f'Token uses unsupported algorithm: {e}'
653
}
654
655
except JwtValidationError as e:
656
return {
657
'success': False,
658
'error': 'validation_failed',
659
'message': f'Token validation failed: {e}'
660
}
661
662
except Exception as e:
663
return {
664
'success': False,
665
'error': 'unknown_error',
666
'message': f'Unexpected error: {e}'
667
}
668
669
# Usage with error handling
670
jwt_token = "..."
671
signing_key = "did:key:..."
672
673
result = robust_jwt_verification(jwt_token, signing_key, "did:web:bsky.social")
674
675
if result['success']:
676
print(f"✓ JWT verified for user: {result['user_did']}")
677
else:
678
print(f"✗ JWT verification failed: {result['error']} - {result['message']}")
679
```