0
# HTTP Client Integrations
1
2
Seamless integration with popular HTTP clients including Requests and HTTPX for both synchronous and asynchronous OAuth operations. Provides automatic token management, request signing, and support for all OAuth flows with minimal configuration.
3
4
## Capabilities
5
6
### Requests Integration
7
8
Complete OAuth 1.0 and OAuth 2.0 integration with the popular Requests library.
9
10
```python { .api }
11
class OAuth1Session:
12
"""Requests OAuth 1.0 session."""
13
14
def __init__(self, client_key: str, client_secret: str = None, resource_owner_key: str = None, resource_owner_secret: str = None, callback_uri: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, client_class=None, force_include_body: bool = False, **request_kwargs) -> None:
15
"""
16
Initialize OAuth 1.0 session.
17
18
Args:
19
client_key: Client identifier
20
client_secret: Client secret
21
resource_owner_key: Resource owner token
22
resource_owner_secret: Resource owner token secret
23
callback_uri: Callback URI for authorization
24
signature_method: Signature method (HMAC-SHA1, RSA-SHA1, PLAINTEXT)
25
signature_type: Signature type (AUTH_HEADER, QUERY, BODY)
26
rsa_key: RSA private key for RSA-SHA1
27
verifier: OAuth verifier
28
client_class: Custom OAuth1Client class
29
force_include_body: Force include body in signature
30
**request_kwargs: Additional requests session arguments
31
"""
32
33
def fetch_request_token(self, request_token_url: str, realm: str = None) -> dict:
34
"""
35
Fetch request token from authorization server.
36
37
Args:
38
request_token_url: Request token endpoint URL
39
realm: Authorization realm
40
41
Returns:
42
Dictionary with oauth_token and oauth_token_secret
43
"""
44
45
def parse_authorization_response_url(self, authorization_response_url: str) -> dict:
46
"""
47
Parse authorization callback response.
48
49
Args:
50
authorization_response_url: Full callback URL with parameters
51
52
Returns:
53
Dictionary with parsed parameters
54
"""
55
56
def fetch_access_token(self, access_token_url: str, verifier: str = None) -> dict:
57
"""
58
Exchange authorization verifier for access token.
59
60
Args:
61
access_token_url: Access token endpoint URL
62
verifier: OAuth verifier from callback
63
64
Returns:
65
Dictionary with oauth_token and oauth_token_secret
66
"""
67
68
class OAuth1Auth:
69
"""Requests OAuth 1.0 authentication handler."""
70
71
def __init__(self, client_key: str, client_secret: str = None, resource_owner_key: str = None, resource_owner_secret: str = None, callback_uri: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, **kwargs) -> None:
72
"""
73
Initialize OAuth 1.0 auth handler.
74
75
Args:
76
client_key: Client identifier
77
client_secret: Client secret
78
resource_owner_key: Resource owner token
79
resource_owner_secret: Resource owner token secret
80
callback_uri: Callback URI
81
signature_method: Signature method
82
signature_type: Signature type
83
rsa_key: RSA private key
84
verifier: OAuth verifier
85
"""
86
87
def __call__(self, request: PreparedRequest) -> PreparedRequest:
88
"""
89
Apply OAuth 1.0 signature to request.
90
91
Args:
92
request: Prepared request object
93
94
Returns:
95
Request with OAuth signature applied
96
"""
97
98
class OAuth2Session(requests.Session):
99
"""Requests OAuth 2.0 session."""
100
101
def __init__(self, client_id: str = None, client: OAuth2Client = None, auto_refresh_url: str = None, auto_refresh_kwargs: dict = None, token_updater: callable = None, **kwargs) -> None:
102
"""
103
Initialize OAuth 2.0 session.
104
105
Args:
106
client_id: Client identifier
107
client: OAuth2Client instance
108
auto_refresh_url: URL for automatic token refresh
109
auto_refresh_kwargs: Arguments for token refresh
110
token_updater: Callback for token updates
111
**kwargs: Additional session arguments
112
"""
113
114
def new_state(self) -> str:
115
"""
116
Generate new state parameter for CSRF protection.
117
118
Returns:
119
Random state string
120
"""
121
122
def authorization_url(self, authorization_endpoint: str, state: str = None, **kwargs) -> tuple:
123
"""
124
Generate authorization URL.
125
126
Args:
127
authorization_endpoint: Authorization server's authorization endpoint
128
state: CSRF protection state (generated if None)
129
**kwargs: Additional authorization parameters
130
131
Returns:
132
Tuple of (authorization_url, state)
133
"""
134
135
def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, method: str = 'POST', timeout: int = None, headers: dict = None, verify: bool = True, proxies: dict = None, include_client_id: bool = None, **kwargs) -> dict:
136
"""
137
Fetch access token from authorization server.
138
139
Args:
140
token_endpoint: Token endpoint URL
141
code: Authorization code
142
authorization_response: Full authorization response URL
143
body: Additional request body
144
auth: HTTP basic auth tuple
145
username: Username for password grant
146
password: Password for password grant
147
method: HTTP method
148
timeout: Request timeout
149
headers: HTTP headers
150
verify: SSL verification
151
proxies: HTTP proxies
152
include_client_id: Include client ID in request
153
**kwargs: Additional token parameters
154
155
Returns:
156
Token dictionary
157
"""
158
159
def token_from_fragment(self, authorization_response_url: str) -> dict:
160
"""
161
Extract token from URL fragment (implicit flow).
162
163
Args:
164
authorization_response_url: Authorization response URL with fragment
165
166
Returns:
167
Token dictionary
168
"""
169
170
def refresh_token(self, token_url: str, refresh_token: str = None, body: str = '', auth: tuple = None, timeout: int = None, headers: dict = None, verify: bool = True, **kwargs) -> dict:
171
"""
172
Refresh access token.
173
174
Args:
175
token_url: Token endpoint URL
176
refresh_token: Refresh token (uses stored if None)
177
body: Additional request body
178
auth: HTTP basic auth tuple
179
timeout: Request timeout
180
headers: HTTP headers
181
verify: SSL verification
182
**kwargs: Additional parameters
183
184
Returns:
185
New token dictionary
186
"""
187
188
def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, timeout: int = None, headers: dict = None, verify: bool = True, **kwargs) -> dict:
189
"""
190
Revoke access or refresh token.
191
192
Args:
193
revocation_endpoint: Revocation endpoint URL
194
token: Token to revoke
195
token_type_hint: Hint about token type
196
body: Additional request body
197
auth: HTTP basic auth tuple
198
timeout: Request timeout
199
headers: HTTP headers
200
verify: SSL verification
201
**kwargs: Additional parameters
202
203
Returns:
204
Response dictionary
205
"""
206
207
class OAuth2Auth:
208
"""Requests OAuth 2.0 authentication handler."""
209
210
def __init__(self, client_id: str = None, client: OAuth2Client = None, token: dict = None) -> None:
211
"""
212
Initialize OAuth 2.0 auth handler.
213
214
Args:
215
client_id: Client identifier
216
client: OAuth2Client instance
217
token: Access token dictionary
218
"""
219
220
def __call__(self, request: PreparedRequest) -> PreparedRequest:
221
"""
222
Apply OAuth 2.0 token to request.
223
224
Args:
225
request: Prepared request object
226
227
Returns:
228
Request with OAuth token applied
229
"""
230
231
class AssertionSession:
232
"""JWT assertion session for OAuth 2.0."""
233
234
def __init__(self, grant_type: str, assertion: str, issuer: str, audience: str, subject: str = None, **kwargs) -> None:
235
"""
236
Initialize JWT assertion session.
237
238
Args:
239
grant_type: Assertion grant type
240
assertion: JWT assertion string
241
issuer: Token issuer
242
audience: Token audience
243
subject: Token subject
244
**kwargs: Additional session arguments
245
"""
246
247
def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
248
"""
249
Fetch token using JWT assertion.
250
251
Args:
252
token_endpoint: Token endpoint URL
253
**kwargs: Additional parameters
254
255
Returns:
256
Token dictionary
257
"""
258
```
259
260
### HTTPX Integration
261
262
Complete OAuth 1.0 and OAuth 2.0 integration with HTTPX for both sync and async operations.
263
264
```python { .api }
265
class OAuth1Client(httpx.Client):
266
"""HTTPX OAuth 1.0 client."""
267
268
def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, redirect_uri: str = None, rsa_key=None, verifier: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', force_include_body: bool = False, **kwargs) -> None:
269
"""
270
Initialize HTTPX OAuth 1.0 client.
271
272
Args:
273
client_key: Client identifier
274
client_secret: Client secret
275
token: OAuth token
276
token_secret: OAuth token secret
277
redirect_uri: Redirect URI
278
rsa_key: RSA private key
279
verifier: OAuth verifier
280
signature_method: Signature method
281
signature_type: Signature type
282
force_include_body: Force include body in signature
283
**kwargs: Additional HTTPX client arguments
284
"""
285
286
def fetch_request_token(self, url: str, realm: str = None, **kwargs) -> dict:
287
"""
288
Fetch request token.
289
290
Args:
291
url: Request token endpoint URL
292
realm: Authorization realm
293
**kwargs: Additional request arguments
294
295
Returns:
296
Dictionary with oauth_token and oauth_token_secret
297
"""
298
299
def create_authorization_url(self, url: str, **kwargs) -> str:
300
"""
301
Create authorization URL.
302
303
Args:
304
url: Authorization endpoint URL
305
**kwargs: Additional authorization parameters
306
307
Returns:
308
Authorization URL
309
"""
310
311
def fetch_access_token(self, url: str, verifier: str = None, **kwargs) -> dict:
312
"""
313
Fetch access token.
314
315
Args:
316
url: Access token endpoint URL
317
verifier: OAuth verifier
318
**kwargs: Additional request arguments
319
320
Returns:
321
Dictionary with oauth_token and oauth_token_secret
322
"""
323
324
class AsyncOAuth1Client(httpx.AsyncClient):
325
"""HTTPX async OAuth 1.0 client."""
326
327
def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, redirect_uri: str = None, rsa_key=None, verifier: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', force_include_body: bool = False, **kwargs) -> None:
328
"""Initialize async OAuth 1.0 client with same parameters as sync version."""
329
330
async def fetch_request_token(self, url: str, realm: str = None, **kwargs) -> dict:
331
"""Async version of fetch_request_token."""
332
333
async def fetch_access_token(self, url: str, verifier: str = None, **kwargs) -> dict:
334
"""Async version of fetch_access_token."""
335
336
class OAuth1Auth:
337
"""HTTPX OAuth 1.0 authentication handler."""
338
339
def __init__(self, client_key: str, client_secret: str = None, token: str = None, token_secret: str = None, signature_method: str = 'HMAC-SHA1', signature_type: str = 'AUTH_HEADER', rsa_key=None, verifier: str = None, **kwargs) -> None:
340
"""
341
Initialize OAuth 1.0 auth handler.
342
343
Args:
344
client_key: Client identifier
345
client_secret: Client secret
346
token: OAuth token
347
token_secret: OAuth token secret
348
signature_method: Signature method
349
signature_type: Signature type
350
rsa_key: RSA private key
351
verifier: OAuth verifier
352
"""
353
354
def auth_flow(self, request: httpx.Request) -> httpx.Request:
355
"""
356
Apply OAuth 1.0 signature to HTTPX request.
357
358
Args:
359
request: HTTPX request object
360
361
Returns:
362
Request with OAuth signature applied
363
"""
364
365
class OAuth2Client(httpx.Client):
366
"""HTTPX OAuth 2.0 client."""
367
368
def __init__(self, client_id: str = None, client_secret: str = None, token_endpoint_auth_method: str = 'client_secret_basic', revocation_endpoint_auth_method: str = None, scope: str = None, redirect_uri: str = None, token: dict = None, token_placement: str = 'header', update_token: callable = None, **kwargs) -> None:
369
"""
370
Initialize HTTPX OAuth 2.0 client.
371
372
Args:
373
client_id: Client identifier
374
client_secret: Client secret
375
token_endpoint_auth_method: Token endpoint auth method
376
revocation_endpoint_auth_method: Revocation endpoint auth method
377
scope: Default scope
378
redirect_uri: Default redirect URI
379
token: Access token dictionary
380
token_placement: Token placement (header, body, uri)
381
update_token: Token update callback
382
**kwargs: Additional HTTPX client arguments
383
"""
384
385
def create_authorization_url(self, authorization_endpoint: str, state: str = None, code_challenge: str = None, code_challenge_method: str = None, **kwargs) -> tuple:
386
"""
387
Create authorization URL.
388
389
Args:
390
authorization_endpoint: Authorization endpoint URL
391
state: CSRF state parameter
392
code_challenge: PKCE code challenge
393
code_challenge_method: PKCE challenge method
394
**kwargs: Additional authorization parameters
395
396
Returns:
397
Tuple of (authorization_url, state)
398
"""
399
400
def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, **kwargs) -> dict:
401
"""
402
Fetch access token.
403
404
Args:
405
token_endpoint: Token endpoint URL
406
code: Authorization code
407
authorization_response: Authorization response URL
408
body: Additional request body
409
auth: HTTP basic auth tuple
410
username: Username for password grant
411
password: Password for password grant
412
**kwargs: Additional token parameters
413
414
Returns:
415
Token dictionary
416
"""
417
418
def refresh_token(self, token_endpoint: str, refresh_token: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
419
"""
420
Refresh access token.
421
422
Args:
423
token_endpoint: Token endpoint URL
424
refresh_token: Refresh token
425
body: Additional request body
426
auth: HTTP basic auth tuple
427
**kwargs: Additional parameters
428
429
Returns:
430
New token dictionary
431
"""
432
433
def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
434
"""
435
Revoke token.
436
437
Args:
438
revocation_endpoint: Revocation endpoint URL
439
token: Token to revoke
440
token_type_hint: Token type hint
441
body: Additional request body
442
auth: HTTP basic auth tuple
443
**kwargs: Additional parameters
444
445
Returns:
446
Response dictionary
447
"""
448
449
class AsyncOAuth2Client(httpx.AsyncClient):
450
"""HTTPX async OAuth 2.0 client."""
451
452
def __init__(self, client_id: str = None, client_secret: str = None, token_endpoint_auth_method: str = 'client_secret_basic', revocation_endpoint_auth_method: str = None, scope: str = None, redirect_uri: str = None, token: dict = None, token_placement: str = 'header', update_token: callable = None, **kwargs) -> None:
453
"""Initialize async OAuth 2.0 client with same parameters as sync version."""
454
455
async def fetch_token(self, token_endpoint: str, code: str = None, authorization_response: str = None, body: str = '', auth: tuple = None, username: str = None, password: str = None, **kwargs) -> dict:
456
"""Async version of fetch_token."""
457
458
async def refresh_token(self, token_endpoint: str, refresh_token: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
459
"""Async version of refresh_token."""
460
461
async def revoke_token(self, revocation_endpoint: str, token: str = None, token_type_hint: str = None, body: str = '', auth: tuple = None, **kwargs) -> dict:
462
"""Async version of revoke_token."""
463
464
class OAuth2Auth:
465
"""HTTPX OAuth 2.0 authentication handler."""
466
467
def __init__(self, client_id: str = None, client_secret: str = None, token: dict = None, token_placement: str = 'header', **kwargs) -> None:
468
"""
469
Initialize OAuth 2.0 auth handler.
470
471
Args:
472
client_id: Client identifier
473
client_secret: Client secret
474
token: Access token dictionary
475
token_placement: Token placement (header, body, uri)
476
"""
477
478
def auth_flow(self, request: httpx.Request) -> httpx.Request:
479
"""
480
Apply OAuth 2.0 token to HTTPX request.
481
482
Args:
483
request: HTTPX request object
484
485
Returns:
486
Request with OAuth token applied
487
"""
488
489
class OAuth2ClientAuth:
490
"""HTTPX OAuth 2.0 client authentication."""
491
492
def __init__(self, client_id: str, client_secret: str = None, method: str = 'client_secret_basic') -> None:
493
"""
494
Initialize client authentication.
495
496
Args:
497
client_id: Client identifier
498
client_secret: Client secret
499
method: Authentication method
500
"""
501
502
def auth_flow(self, request: httpx.Request) -> httpx.Request:
503
"""
504
Apply client authentication to request.
505
506
Args:
507
request: HTTPX request object
508
509
Returns:
510
Request with client authentication applied
511
"""
512
513
class AssertionClient(httpx.Client):
514
"""HTTPX JWT assertion client."""
515
516
def __init__(self, grant_type: str, assertion: str, **kwargs) -> None:
517
"""
518
Initialize JWT assertion client.
519
520
Args:
521
grant_type: Assertion grant type
522
assertion: JWT assertion string
523
**kwargs: Additional client arguments
524
"""
525
526
def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
527
"""
528
Fetch token using JWT assertion.
529
530
Args:
531
token_endpoint: Token endpoint URL
532
**kwargs: Additional parameters
533
534
Returns:
535
Token dictionary
536
"""
537
538
class AsyncAssertionClient(httpx.AsyncClient):
539
"""HTTPX async JWT assertion client."""
540
541
def __init__(self, grant_type: str, assertion: str, **kwargs) -> None:
542
"""Initialize async assertion client with same parameters as sync version."""
543
544
async def fetch_token(self, token_endpoint: str, **kwargs) -> dict:
545
"""Async version of fetch_token."""
546
```
547
548
## Usage Examples
549
550
### Requests OAuth 2.0 Client
551
552
```python
553
from authlib.integrations.requests_client import OAuth2Session
554
import requests
555
556
# Basic OAuth 2.0 flow
557
client = OAuth2Session(
558
client_id='your-client-id',
559
redirect_uri='https://your-app.com/callback'
560
)
561
562
# Step 1: Get authorization URL
563
authorization_url, state = client.authorization_url(
564
'https://provider.com/authorize',
565
scope='read write'
566
)
567
print(f'Please visit: {authorization_url}')
568
569
# Step 2: Exchange authorization code for token
570
authorization_response = input('Paste the full redirect URL here:')
571
token = client.fetch_token(
572
'https://provider.com/token',
573
authorization_response=authorization_response,
574
client_secret='your-client-secret'
575
)
576
577
# Step 3: Make authenticated requests
578
response = client.get('https://api.provider.com/user')
579
user_data = response.json()
580
581
# Automatic token refresh
582
def token_saver(token):
583
# Save token to database or file
584
save_token_to_storage(token)
585
586
client = OAuth2Session(
587
client_id='your-client-id',
588
token=stored_token,
589
auto_refresh_url='https://provider.com/token',
590
auto_refresh_kwargs={'client_id': 'your-client-id', 'client_secret': 'your-client-secret'},
591
token_updater=token_saver
592
)
593
594
# Token will be automatically refreshed if expired
595
response = client.get('https://api.provider.com/user')
596
```
597
598
### Requests OAuth 1.0 Client
599
600
```python
601
from authlib.integrations.requests_client import OAuth1Session
602
603
# OAuth 1.0 flow
604
client = OAuth1Session(
605
client_key='your-client-key',
606
client_secret='your-client-secret'
607
)
608
609
# Step 1: Fetch request token
610
request_token = client.fetch_request_token('https://provider.com/oauth/request_token')
611
612
# Step 2: Get authorization URL
613
authorization_url = f"https://provider.com/oauth/authorize?oauth_token={request_token['oauth_token']}"
614
print(f'Please visit: {authorization_url}')
615
616
# Step 3: Get verifier from user
617
verifier = input('Please enter the verifier:')
618
client.verifier = verifier
619
620
# Step 4: Fetch access token
621
access_token = client.fetch_access_token('https://provider.com/oauth/access_token')
622
623
# Step 5: Make authenticated requests
624
response = client.get('https://api.provider.com/user')
625
user_data = response.json()
626
```
627
628
### HTTPX OAuth 2.0 Client
629
630
```python
631
from authlib.integrations.httpx_client import OAuth2Client
632
import httpx
633
634
# Synchronous OAuth 2.0 client
635
with OAuth2Client(
636
client_id='your-client-id',
637
client_secret='your-client-secret'
638
) as client:
639
640
# Authorization code flow
641
authorization_url, state = client.create_authorization_url(
642
'https://provider.com/authorize',
643
scope='read write'
644
)
645
646
# After user authorization
647
token = client.fetch_token(
648
'https://provider.com/token',
649
code='authorization-code-from-callback'
650
)
651
652
# Make authenticated requests
653
response = client.get('https://api.provider.com/user')
654
user_data = response.json()
655
656
# Asynchronous OAuth 2.0 client
657
import asyncio
658
659
async def oauth_flow():
660
async with AsyncOAuth2Client(
661
client_id='your-client-id',
662
client_secret='your-client-secret'
663
) as client:
664
665
# Fetch token using client credentials
666
token = await client.fetch_token(
667
'https://provider.com/token',
668
grant_type='client_credentials',
669
scope='api:read'
670
)
671
672
# Make authenticated requests
673
response = await client.get('https://api.provider.com/data')
674
data = response.json()
675
return data
676
677
# Run async function
678
data = asyncio.run(oauth_flow())
679
```
680
681
### HTTPX OAuth 1.0 Client
682
683
```python
684
from authlib.integrations.httpx_client import OAuth1Client, AsyncOAuth1Client
685
686
# Synchronous OAuth 1.0
687
with OAuth1Client(
688
client_key='your-client-key',
689
client_secret='your-client-secret'
690
) as client:
691
692
# Complete OAuth 1.0 flow
693
request_token = client.fetch_request_token('https://provider.com/oauth/request_token')
694
695
auth_url = client.create_authorization_url(
696
'https://provider.com/oauth/authorize'
697
)
698
699
# After user authorization
700
client.verifier = 'user-provided-verifier'
701
access_token = client.fetch_access_token('https://provider.com/oauth/access_token')
702
703
# Make authenticated requests
704
response = client.get('https://api.provider.com/user')
705
706
# Asynchronous OAuth 1.0
707
async def async_oauth1_flow():
708
async with AsyncOAuth1Client(
709
client_key='your-client-key',
710
client_secret='your-client-secret'
711
) as client:
712
713
request_token = await client.fetch_request_token(
714
'https://provider.com/oauth/request_token'
715
)
716
717
# Handle authorization...
718
719
access_token = await client.fetch_access_token(
720
'https://provider.com/oauth/access_token',
721
verifier='user-verifier'
722
)
723
724
response = await client.get('https://api.provider.com/user')
725
return response.json()
726
727
user_data = asyncio.run(async_oauth1_flow())
728
```
729
730
### Custom Authentication Handlers
731
732
```python
733
import requests
734
from authlib.integrations.requests_client import OAuth2Auth
735
736
# Using OAuth2Auth with regular requests
737
auth = OAuth2Auth(token={'access_token': 'your-access-token', 'token_type': 'Bearer'})
738
739
response = requests.get('https://api.provider.com/user', auth=auth)
740
741
# Custom token placement
742
auth = OAuth2Auth(
743
token={'access_token': 'your-access-token', 'token_type': 'Bearer'},
744
token_placement='uri' # Add token as query parameter
745
)
746
747
response = requests.get('https://api.provider.com/user', auth=auth)
748
```
749
750
### JWT Assertion Flow
751
752
```python
753
from authlib.integrations.requests_client import AssertionSession
754
from authlib.jose import JsonWebToken
755
import time
756
757
# Create JWT assertion
758
jwt = JsonWebToken(['RS256'])
759
header = {'alg': 'RS256'}
760
payload = {
761
'iss': 'your-client-id',
762
'sub': 'your-client-id',
763
'aud': 'https://provider.com/token',
764
'iat': int(time.time()),
765
'exp': int(time.time()) + 3600
766
}
767
assertion = jwt.encode(header, payload, private_key)
768
769
# Use assertion to get token
770
session = AssertionSession(
771
grant_type='urn:ietf:params:oauth:grant-type:jwt-bearer',
772
assertion=assertion,
773
issuer='your-client-id',
774
audience='https://provider.com/token'
775
)
776
777
token = session.fetch_token('https://provider.com/token')
778
779
# Make authenticated requests
780
response = session.get('https://api.provider.com/user')
781
```
782
783
### Error Handling
784
785
```python
786
from authlib.integrations.requests_client import OAuth2Session
787
from authlib.common.errors import AuthlibHTTPError
788
from requests.exceptions import HTTPError
789
790
client = OAuth2Session(client_id='your-client-id')
791
792
try:
793
token = client.fetch_token(
794
'https://provider.com/token',
795
code='invalid-code',
796
client_secret='your-client-secret'
797
)
798
except AuthlibHTTPError as error:
799
print(f'OAuth error: {error.error}')
800
print(f'Description: {error.description}')
801
print(f'Status code: {error.status_code}')
802
except HTTPError as error:
803
print(f'HTTP error: {error.response.status_code}')
804
```
805
806
### Token Management
807
808
```python
809
import json
810
from authlib.integrations.requests_client import OAuth2Session
811
812
def load_token():
813
try:
814
with open('token.json', 'r') as f:
815
return json.load(f)
816
except FileNotFoundError:
817
return None
818
819
def save_token(token):
820
with open('token.json', 'w') as f:
821
json.dump(token, f)
822
823
# Load existing token
824
token = load_token()
825
826
client = OAuth2Session(
827
client_id='your-client-id',
828
token=token,
829
auto_refresh_url='https://provider.com/token',
830
auto_refresh_kwargs={'client_secret': 'your-client-secret'},
831
token_updater=save_token
832
)
833
834
# Token will be automatically saved when refreshed
835
response = client.get('https://api.provider.com/user')
836
```