or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-operations.md core-functionality.md cryptographic-operations.md identity-resolution.md index.md jwt-operations.md real-time-streaming.md

docs/jwt-operations.md

0

# JWT Operations

1

2

JSON Web Token parsing, payload extraction, claim validation, and signature verification for AT Protocol authentication and authorization. These operations cover the consuming side of the JWT lifecycle: inspecting, validating, and verifying the tokens used in AT Protocol sessions.

3

4

## Capabilities

5

6

### JWT Payload Structure

7

8

AT Protocol JWT payloads use the registered claims defined in RFC 7519 (`iss`, `sub`, `aud`, `jti`, `nbf`, `exp`, `iat`) plus an AT Protocol-specific `scope` claim.

9

10

```python { .api }

11

class JwtPayload:

12

"""

13

JWT payload structure based on RFC 7519 with AT Protocol extensions.

14

15

Attributes:

16

iss (Optional[str]): Issuer (DID) - who issued the token

17

sub (Optional[str]): Subject (DID) - who the token is about

18

aud (Optional[Union[str, List[str]]]): Audience (DID) - intended recipients

19

jti (Optional[str]): JWT ID - unique identifier for this token

20

nbf (Optional[int]): Not Before - earliest time token is valid (Unix timestamp)

21

exp (Optional[int]): Expiration Time - when token expires (Unix timestamp)

22

iat (Optional[int]): Issued At - when token was issued (Unix timestamp)

23

scope (Optional[str]): ATProto-specific scope for permissions

24

"""

25

iss: Optional[str]

26

sub: Optional[str]

27

aud: Optional[Union[str, List[str]]]

28

jti: Optional[str]

29

nbf: Optional[int]

30

exp: Optional[int]

31

iat: Optional[int]

32

scope: Optional[str]

33

34

def is_expired(self, current_time: Optional[int] = None) -> bool:

35

"""

36

Check if token is expired.

37

38

Args:

39

current_time (int, optional): Current Unix timestamp (default: now)

40

41

Returns:

42

bool: True if token is expired

43

"""

44

45

def is_valid_at_time(self, timestamp: int) -> bool:

46

"""

47

Check if token is valid at specific timestamp.

48

49

Args:

50

timestamp (int): Unix timestamp to check

51

52

Returns:

53

bool: True if token is valid at given time

54

"""

55

56

def time_until_expiration(self) -> Optional[int]:

57

"""

58

Get seconds until token expires.

59

60

Returns:

61

Optional[int]: Seconds until expiration or None if no expiration

62

"""

63

64

def validate_audience(self, expected_audience: Union[str, List[str]]) -> bool:

65

"""

66

Validate token audience against expected values.

67

68

Args:

69

expected_audience (Union[str, List[str]]): Expected audience values

70

71

Returns:

72

bool: True if audience matches expectations

73

"""

74

```

75

76

### JWT Parsing

77

78

Parse JWT tokens into their component parts for inspection and validation.

79

80

```python { .api }

81

def parse_jwt(jwt: Union[str, bytes]) -> Tuple[bytes, bytes, Dict[str, Any], bytes]:

82

"""

83

Parse JWT into components without verification.

84

85

Args:

86

jwt (Union[str, bytes]): JWT token to parse

87

88

Returns:

89

Tuple[bytes, bytes, Dict[str, Any], bytes]:

90

(payload_bytes, signing_input, header_dict, signature_bytes)

91

92

Raises:

93

JwtParsingError: If JWT format is invalid

94

"""

95

96

def get_jwt_payload(jwt: Union[str, bytes]) -> JwtPayload:

97

"""

98

Extract and validate JWT payload.

99

100

Args:

101

jwt (Union[str, bytes]): JWT token

102

103

Returns:

104

JwtPayload: Decoded and validated payload

105

106

Raises:

107

JwtParsingError: If JWT format is invalid

108

JwtValidationError: If payload validation fails

109

"""

110

111

def decode_jwt_payload(jwt: Union[str, bytes]) -> JwtPayload:

112

"""

113

Decode JWT payload without validation.

114

115

Args:

116

jwt (Union[str, bytes]): JWT token

117

118

Returns:

119

JwtPayload: Decoded payload (may be invalid/expired)

120

121

Raises:

122

JwtParsingError: If JWT format is invalid

123

"""

124

```

125

126

Usage examples:

127

128

```python

129

from atproto import parse_jwt, get_jwt_payload, decode_jwt_payload, JwtPayload, JwtValidationError

130

131

# Example JWT token (access token from AT Protocol)

132

jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ.eyJpc3MiOiJkaWQ6cGxjOmFsaWNlMTIzIiwic3ViIjoiZGlkOnBsYzphbGljZTEyMyIsImF1ZCI6ImRpZDp3ZWI6YnNreS5zb2NpYWwiLCJpYXQiOjE2ODAwMDAwMDAsImV4cCI6MTY4MDAwMzYwMCwic2NvcGUiOiJjb20uYXRwcm90by5hY2Nlc3MifQ.signature"

133

134

# Parse JWT components

135

payload_bytes, signing_input, header, signature = parse_jwt(jwt_token)

136

print(f"Header: {header}")

137

print(f"Payload bytes length: {len(payload_bytes)}")

138

print(f"Signing input length: {len(signing_input)}")

139

140

# Get validated payload

141

try:

142

payload = get_jwt_payload(jwt_token)

143

print(f"Issuer: {payload.iss}")

144

print(f"Subject: {payload.sub}")

145

print(f"Audience: {payload.aud}")

146

print(f"Scope: {payload.scope}")

147

print(f"Expires at: {payload.exp}")

148

149

# Check expiration

150

if payload.is_expired():

151

print("⚠️ Token is expired")

152

else:

153

print(f"✓ Token valid for {payload.time_until_expiration()} more seconds")

154

155

except JwtValidationError as e:

156

print(f"JWT validation failed: {e}")

157

158

# Decode without validation (useful for debugging)

159

payload_unvalidated = decode_jwt_payload(jwt_token)

160

print(f"Raw payload (unvalidated): {payload_unvalidated}")

161

```

162

163

### JWT Validation

164

165

Validate JWT payload claims according to AT Protocol requirements.

166

167

```python { .api }

168

def validate_jwt_payload(payload: JwtPayload,

169

expected_audience: Optional[Union[str, List[str]]] = None,

170

expected_issuer: Optional[str] = None,

171

current_time: Optional[int] = None) -> bool:

172

"""

173

Validate JWT payload claims.

174

175

Args:

176

payload (JwtPayload): Payload to validate

177

expected_audience (Union[str, List[str]], optional): Expected audience

178

expected_issuer (str, optional): Expected issuer DID

179

current_time (int, optional): Current timestamp for time validation

180

181

Returns:

182

bool: True if payload is valid

183

184

Raises:

185

JwtValidationError: If validation fails with details

186

"""

187

188

def validate_access_token_payload(payload: JwtPayload, pds_did: str) -> bool:

189

"""

190

Validate access token payload for AT Protocol.

191

192

Args:

193

payload (JwtPayload): Access token payload

194

pds_did (str): Expected PDS DID as audience

195

196

Returns:

197

bool: True if valid access token

198

199

Raises:

200

JwtValidationError: If access token validation fails

201

"""

202

203

def validate_refresh_token_payload(payload: JwtPayload) -> bool:

204

"""

205

Validate refresh token payload for AT Protocol.

206

207

Args:

208

payload (JwtPayload): Refresh token payload

209

210

Returns:

211

bool: True if valid refresh token

212

213

Raises:

214

JwtValidationError: If refresh token validation fails

215

"""

216

```

217

218

Usage examples:

219

220

```python

221

from atproto import (

222

validate_jwt_payload, validate_access_token_payload,

223

validate_refresh_token_payload, get_jwt_payload, JwtValidationError

224

)

225

import time

226

227

# Validate general JWT payload

228

jwt_token = "..." # Your JWT token

229

payload = get_jwt_payload(jwt_token)

230

231

# Basic validation

232

try:

233

is_valid = validate_jwt_payload(

234

payload,

235

expected_audience="did:web:bsky.social",

236

expected_issuer="did:plc:alice123",

237

current_time=int(time.time())

238

)

239

print(f"JWT is valid: {is_valid}")

240

except JwtValidationError as e:

241

print(f"Validation failed: {e}")

242

243

# Validate AT Protocol access token

244

access_token = "..." # Access token JWT

245

access_payload = get_jwt_payload(access_token)

246

247

try:

248

is_valid_access = validate_access_token_payload(

249

access_payload,

250

pds_did="did:web:alice.pds.example.com"

251

)

252

print(f"Access token is valid: {is_valid_access}")

253

except JwtValidationError as e:

254

print(f"Access token validation failed: {e}")

255

256

# Validate refresh token

257

refresh_token = "..." # Refresh token JWT

258

refresh_payload = get_jwt_payload(refresh_token)

259

260

try:

261

is_valid_refresh = validate_refresh_token_payload(refresh_payload)

262

print(f"Refresh token is valid: {is_valid_refresh}")

263

except JwtValidationError as e:

264

print(f"Refresh token validation failed: {e}")

265

```

266

267

### JWT Signature Verification

268

269

Verify JWT signatures using cryptographic keys to ensure token authenticity.

270

271

```python { .api }

272

def verify_jwt(jwt: Union[str, bytes], signing_key: str) -> bool:

273

"""

274

Verify JWT signature synchronously.

275

276

Args:

277

jwt (Union[str, bytes]): JWT token to verify

278

signing_key (str): DID key or public key for verification

279

280

Returns:

281

bool: True if signature is valid

282

283

Raises:

284

JwtVerificationError: If signature verification fails

285

UnsupportedAlgorithmError: If JWT algorithm is not supported

286

"""

287

288

async def verify_jwt_async(jwt: Union[str, bytes], signing_key: str) -> bool:

289

"""

290

Verify JWT signature asynchronously.

291

292

Args:

293

jwt (Union[str, bytes]): JWT token to verify

294

signing_key (str): DID key or public key for verification

295

296

Returns:

297

bool: True if signature is valid

298

299

Raises:

300

JwtVerificationError: If signature verification fails

301

"""

302

```

303

304

Usage examples:

305

306

```python

307

from atproto import verify_jwt, verify_jwt_async, get_jwt_payload, JwtVerificationError, UnsupportedAlgorithmError

308

import asyncio

309

310

# Synchronous JWT verification

311

jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."

312

signing_key = "did:key:z6MkhaXgBZDvotDkL5257faiztiGiC2QtKLGpbnnEGta2doK"

313

314

try:

315

is_signature_valid = verify_jwt(jwt_token, signing_key)

316

if is_signature_valid:

317

print("✓ JWT signature is valid")

318

319

# Now safe to trust the payload

320

payload = get_jwt_payload(jwt_token)

321

print(f"Verified token from: {payload.iss}")

322

else:

323

print("✗ JWT signature is invalid")

324

325

except JwtVerificationError as e:

326

print(f"Signature verification failed: {e}")

327

except UnsupportedAlgorithmError as e:

328

print(f"Unsupported algorithm: {e}")

329

330

# Asynchronous JWT verification

331

async def verify_jwt_async_example():

332

try:

333

is_valid = await verify_jwt_async(jwt_token, signing_key)

334

if is_valid:

335

print("✓ Async JWT signature verification successful")

336

else:

337

print("✗ Async JWT signature verification failed")

338

except Exception as e:

339

print(f"Async verification error: {e}")

340

341

asyncio.run(verify_jwt_async_example())

342

```

343

344

### Complete JWT Verification Workflow

345

346

Comprehensive JWT verification combining parsing, validation, and signature verification.

347

348

```python { .api }

349

def verify_and_validate_jwt(jwt: Union[str, bytes],

350

signing_key: str,

351

expected_audience: Optional[Union[str, List[str]]] = None,

352

expected_issuer: Optional[str] = None) -> JwtPayload:

353

"""

354

Complete JWT verification and validation workflow.

355

356

Args:

357

jwt (Union[str, bytes]): JWT token

358

signing_key (str): DID key for signature verification

359

expected_audience (Union[str, List[str]], optional): Expected audience

360

expected_issuer (str, optional): Expected issuer

361

362

Returns:

363

JwtPayload: Verified and validated payload

364

365

Raises:

366

JwtVerificationError: If signature verification fails

367

JwtValidationError: If payload validation fails

368

JwtParsingError: If JWT parsing fails

369

"""

370

371

def verify_at_protocol_session_token(access_token: Union[str, bytes],

372

user_did: str,

373

pds_did: str,

374

signing_key: str) -> JwtPayload:

375

"""

376

Verify AT Protocol session access token.

377

378

Args:

379

access_token (Union[str, bytes]): Access token to verify

380

user_did (str): Expected user DID (subject)

381

pds_did (str): Expected PDS DID (audience)

382

signing_key (str): User's signing key for verification

383

384

Returns:

385

JwtPayload: Verified access token payload

386

387

Raises:

388

JwtVerificationError: If token verification fails

389

"""

390

```

391

392

Usage examples:

393

394

```python

395

from atproto import (

396

verify_and_validate_jwt, verify_at_protocol_session_token,

397

JwtVerificationError, JwtValidationError

398

)

399

400

# Complete JWT verification workflow

401

def authenticate_request(jwt_token, user_signing_key):

402

"""Example request authentication using JWT."""

403

try:

404

# Verify signature and validate claims

405

payload = verify_and_validate_jwt(

406

jwt_token,

407

signing_key=user_signing_key,

408

expected_audience="did:web:bsky.social",

409

expected_issuer="did:plc:alice123"

410

)

411

412

print(f"✓ Authenticated user: {payload.sub}")

413

print(f"✓ Token scope: {payload.scope}")

414

print(f"✓ Valid until: {payload.exp}")

415

416

return {'authenticated': True, 'user_did': payload.sub, 'scope': payload.scope}

417

418

except JwtVerificationError:

419

print("✗ JWT signature verification failed")

420

return {'authenticated': False, 'error': 'invalid_signature'}

421

422

except JwtValidationError as e:

423

print(f"✗ JWT validation failed: {e}")

424

return {'authenticated': False, 'error': 'invalid_claims'}

425

426

except Exception as e:

427

print(f"✗ Authentication error: {e}")

428

return {'authenticated': False, 'error': 'authentication_failed'}

429

430

# AT Protocol session verification

431

def verify_session_token(access_token, user_did, pds_did, signing_key):

432

"""Verify AT Protocol session access token."""

433

try:

434

payload = verify_at_protocol_session_token(

435

access_token=access_token,

436

user_did=user_did,

437

pds_did=pds_did,

438

signing_key=signing_key

439

)

440

441

print(f"✓ Valid session for user: {payload.sub}")

442

print(f"✓ Session scope: {payload.scope}")

443

444

return payload

445

446

except JwtVerificationError as e:

447

print(f"✗ Session token verification failed: {e}")

448

raise

449

450

# Example usage

451

jwt_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."

452

user_signing_key = "did:key:z6MkhaXgBZDv..."

453

454

# Authenticate request

455

auth_result = authenticate_request(jwt_token, user_signing_key)

456

if auth_result['authenticated']:

457

print(f"User {auth_result['user_did']} authenticated with scope {auth_result['scope']}")

458

459

# Verify session token

460

try:

461

session_payload = verify_session_token(

462

access_token=jwt_token,

463

user_did="did:plc:alice123",

464

pds_did="did:web:alice.pds.example.com",

465

signing_key=user_signing_key

466

)

467

print("Session verification successful")

468

except Exception as e:

469

print(f"Session verification failed: {e}")

470

```

471

472

### JWT Utilities

473

474

Utility functions for working with JWT tokens in AT Protocol contexts.

475

476

```python { .api }

477

def extract_did_from_jwt(jwt: Union[str, bytes]) -> Optional[str]:

478

"""

479

Extract DID from JWT issuer claim.

480

481

Args:

482

jwt (Union[str, bytes]): JWT token

483

484

Returns:

485

Optional[str]: Issuer DID or None if not found

486

"""

487

488

def get_jwt_algorithm(jwt: Union[str, bytes]) -> str:

489

"""

490

Get algorithm from JWT header.

491

492

Args:

493

jwt (Union[str, bytes]): JWT token

494

495

Returns:

496

str: Algorithm identifier (e.g., 'ES256K', 'EdDSA')

497

498

Raises:

499

JwtParsingError: If header cannot be parsed

500

"""

501

502

def is_access_token(jwt: Union[str, bytes]) -> bool:

503

"""

504

Check if JWT is an AT Protocol access token.

505

506

Args:

507

jwt (Union[str, bytes]): JWT token

508

509

Returns:

510

bool: True if appears to be access token

511

"""

512

513

def is_refresh_token(jwt: Union[str, bytes]) -> bool:

514

"""

515

Check if JWT is an AT Protocol refresh token.

516

517

Args:

518

jwt (Union[str, bytes]): JWT token

519

520

Returns:

521

bool: True if appears to be refresh token

522

"""

523

524

def get_token_expiration_time(jwt: Union[str, bytes]) -> Optional[int]:

525

"""

526

Get token expiration timestamp.

527

528

Args:

529

jwt (Union[str, bytes]): JWT token

530

531

Returns:

532

Optional[int]: Unix timestamp of expiration or None

533

"""

534

```

535

536

Usage examples:

537

538

```python

539

from atproto import (

540

extract_did_from_jwt, get_jwt_algorithm, is_access_token,

541

is_refresh_token, get_token_expiration_time

542

)

543

from datetime import datetime

544

545

# JWT utility functions

546

access_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."

547

refresh_token = "eyJhbGciOiJFUzI1NksiLCJ0eXAiOiJKV1QifQ..."

548

549

# Extract information from tokens

550

issuer_did = extract_did_from_jwt(access_token)

551

print(f"Token issued by: {issuer_did}")

552

553

algorithm = get_jwt_algorithm(access_token)

554

print(f"Token algorithm: {algorithm}")

555

556

# Check token types

557

print(f"Is access token: {is_access_token(access_token)}")

558

print(f"Is refresh token: {is_refresh_token(access_token)}")

559

560

# Get expiration time

561

exp_timestamp = get_token_expiration_time(access_token)

562

if exp_timestamp:

563

exp_datetime = datetime.fromtimestamp(exp_timestamp)

564

print(f"Token expires: {exp_datetime}")

565

```

566

567

### Error Handling

568

569

```python { .api }

570

class JwtError(Exception):

571

"""Base exception for JWT operations."""

572

573

class JwtParsingError(JwtError):

574

"""Raised when JWT parsing fails."""

575

576

class JwtValidationError(JwtError):

577

"""Raised when JWT validation fails."""

578

579

class JwtVerificationError(JwtError):

580

"""Raised when JWT signature verification fails."""

581

582

class UnsupportedAlgorithmError(JwtError):

583

"""Raised when JWT uses unsupported algorithm."""

584

585

class ExpiredTokenError(JwtValidationError):

586

"""Raised when JWT is expired."""

587

588

class InvalidAudienceError(JwtValidationError):

589

"""Raised when JWT audience is invalid."""

590

591

class InvalidIssuerError(JwtValidationError):

592

"""Raised when JWT issuer is invalid."""

593

```

594

595

Comprehensive error handling example:

596

597

```python

598

from atproto import (

599

verify_and_validate_jwt,

600

JwtParsingError, JwtValidationError, JwtVerificationError,

601

ExpiredTokenError, InvalidAudienceError, UnsupportedAlgorithmError

602

)

603

604

def robust_jwt_verification(jwt_token, signing_key, expected_audience=None):

605

"""Robust JWT verification with detailed error handling."""

606

try:

607

payload = verify_and_validate_jwt(

608

jwt_token,

609

signing_key,

610

expected_audience=expected_audience

611

)

612

613

return {

614

'success': True,

615

'payload': payload,

616

'user_did': payload.sub,

617

'expires_at': payload.exp

618

}

619

620

except JwtParsingError as e:

621

return {

622

'success': False,

623

'error': 'malformed_token',

624

'message': f'Token parsing failed: {e}'

625

}

626

627

except ExpiredTokenError as e:

628

return {

629

'success': False,

630

'error': 'token_expired',

631

'message': 'Token has expired'

632

}

633

634

except InvalidAudienceError as e:

635

return {

636

'success': False,

637

'error': 'invalid_audience',

638

'message': f'Token audience mismatch: {e}'

639

}

640

641

except JwtVerificationError as e:

642

return {

643

'success': False,

644

'error': 'signature_invalid',

645

'message': 'Token signature verification failed'

646

}

647

648

except UnsupportedAlgorithmError as e:

649

return {

650

'success': False,

651

'error': 'unsupported_algorithm',

652

'message': f'Token uses unsupported algorithm: {e}'

653

}

654

655

except JwtValidationError as e:

656

return {

657

'success': False,

658

'error': 'validation_failed',

659

'message': f'Token validation failed: {e}'

660

}

661

662

except Exception as e:

663

return {

664

'success': False,

665

'error': 'unknown_error',

666

'message': f'Unexpected error: {e}'

667

}

668

669

# Usage with error handling

670

jwt_token = "..."

671

signing_key = "did:key:..."

672

673

result = robust_jwt_verification(jwt_token, signing_key, "did:web:bsky.social")

674

675

if result['success']:

676

print(f"✓ JWT verified for user: {result['user_did']}")

677

else:

678

print(f"✗ JWT verification failed: {result['error']} - {result['message']}")

679

```