# Authentication and OAuth

Channel access token management, OAuth flows, and authentication handling. The authentication module provides secure token lifecycle management for bot operations and integrates with LINE's OAuth system.
3
4
## Capabilities
5
6
### Channel Access Token Management
7
8
Core token operations including issuance, verification, and revocation for secure API access.
9
10
```python { .api }
11
class ChannelAccessToken:
    """Client for issuing, verifying, and revoking channel access tokens."""

    def issue_channel_token(
        self,
        grant_type: str = "client_credentials",
        client_assertion_type: Optional[str] = None,
        client_assertion: Optional[str] = None
    ) -> IssueChannelAccessTokenResponse:
        """
        Issue a new channel access token using client credentials.

        Args:
            grant_type: OAuth grant type (default: "client_credentials")
            client_assertion_type: JWT assertion type for advanced authentication
            client_assertion: JWT assertion for advanced authentication

        Returns:
            IssueChannelAccessTokenResponse: New access token and metadata
        """

    def revoke_channel_token(self, access_token: str) -> dict:
        """
        Revoke an existing channel access token.

        Args:
            access_token: Token to revoke

        Returns:
            dict: Empty response on success
        """

    def verify_channel_token(self, access_token: str) -> VerifyChannelAccessTokenResponse:
        """
        Verify the validity and get metadata of a channel access token.

        Args:
            access_token: Token to verify

        Returns:
            VerifyChannelAccessTokenResponse: Token validity status and metadata
        """
51
```
52
53
### Short-Lived Token Management
54
55
Manage short-lived tokens for enhanced security in specific use cases.
56
57
```python { .api }
58
class ChannelAccessToken:
    """Client methods for short-lived channel access tokens."""

    def issue_short_lived_channel_token(
        self,
        client_assertion: str,
        client_assertion_type: str = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        grant_type: str = "client_credentials"
    ) -> IssueShortLivedChannelAccessTokenResponse:
        """
        Issue a short-lived channel access token with limited validity period.

        Args:
            client_assertion: JWT assertion for authentication
            client_assertion_type: JWT assertion type specification
            grant_type: OAuth grant type

        Returns:
            IssueShortLivedChannelAccessTokenResponse: Short-lived token and expiration info
        """
76
```
77
78
### Stateless Token Operations
79
80
Handle stateless authentication tokens for specific integration scenarios.
81
82
```python { .api }
83
class ChannelAccessToken:
    """Client methods for stateless channel access tokens."""

    def issue_stateless_channel_token(
        self,
        client_assertion: str,
        client_assertion_type: str = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        grant_type: str = "client_credentials"
    ) -> IssueStatelessChannelAccessTokenResponse:
        """
        Issue a stateless channel access token for specific use cases.

        Args:
            client_assertion: JWT assertion for authentication
            client_assertion_type: JWT assertion type specification
            grant_type: OAuth grant type

        Returns:
            IssueStatelessChannelAccessTokenResponse: Stateless token information
        """
101
```
102
103
### Token Key Management
104
105
Manage and validate token signing keys for enhanced security.
106
107
```python { .api }
108
class ChannelAccessToken:
    """Client methods for inspecting token signing keys."""

    def get_valid_channel_access_token_key_ids(self, client_assertion: str) -> ChannelAccessTokenKeyIdsResponse:
        """
        Get list of valid key IDs for channel access token verification.

        Args:
            client_assertion: JWT assertion for authentication

        Returns:
            ChannelAccessTokenKeyIdsResponse: List of valid key identifiers
        """
119
```
120
121
## Authentication Models
122
123
### Token Response Models
124
125
```python { .api }
126
class IssueChannelAccessTokenResponse:
    """Result of ``ChannelAccessToken.issue_channel_token``."""

    # The issued channel access token.
    access_token: str
    # Remaining lifetime of the token, in seconds.
    expires_in: int
    token_type: str
    # Key identifier; optional, defaults to None when not provided.
    key_id: Optional[str] = None
131
132
class IssueShortLivedChannelAccessTokenResponse:
    """Result of ``ChannelAccessToken.issue_short_lived_channel_token``."""

    # The issued short-lived channel access token.
    access_token: str
    # Remaining lifetime of the token, in seconds.
    expires_in: int
    token_type: str
136
137
class IssueStatelessChannelAccessTokenResponse:
    """Result of ``ChannelAccessToken.issue_stateless_channel_token``."""

    # The issued stateless channel access token.
    access_token: str
    # Remaining lifetime of the token — presumably seconds, as for the
    # other token responses; confirm against the SDK.
    expires_in: int
    token_type: str
141
142
class VerifyChannelAccessTokenResponse:
    """Result of ``ChannelAccessToken.verify_channel_token``."""

    # Client (channel) the verified token belongs to.
    client_id: str
    # Remaining lifetime of the token, in seconds.
    expires_in: int
    scope: str
146
147
class ChannelAccessTokenKeyIdsResponse:
    """Result of ``ChannelAccessToken.get_valid_channel_access_token_key_ids``."""

    # Identifiers of the currently valid keys.
    key_ids: List[str]
149
```
150
151
### Configuration Models
152
153
```python { .api }
154
class Configuration:
    def __init__(
        self,
        access_token: Optional[str] = None,
        host: str = "https://api.line.me",
        username: Optional[str] = None,
        password: Optional[str] = None,
        discard_unknown_keys: bool = False,
        disabled_client_side_validations: str = "",
        server_index: Optional[int] = None,
        server_variables: Optional[dict] = None,
        server_operation_index: Optional[dict] = None,
        server_operation_variables: Optional[dict] = None
    ):
        """
        API client configuration including authentication credentials.

        Args:
            access_token: Channel access token for authentication
            host: API endpoint base URL
            username: Optional username for basic authentication
            password: Optional password for basic authentication
            discard_unknown_keys: Whether to ignore unknown response fields

        NOTE(review): disabled_client_side_validations, server_index,
        server_variables, server_operation_index and
        server_operation_variables are accepted but not described by this
        reference. They are presumably the standard generated-client
        server-selection and validation options; confirm against the SDK
        source before relying on them.
        """
178
```
179
180
## Usage Examples
181
182
### Basic Token Management
183
184
```python
185
from linebot.v3.oauth import ChannelAccessToken, Configuration
186
187
# Initialize OAuth client
oauth_config = Configuration(
    # No access token needed for OAuth operations
)
oauth_client = ChannelAccessToken(oauth_config)

# Issue a new channel access token
token_response = oauth_client.issue_channel_token()
access_token = token_response.access_token
expires_in = token_response.expires_in

# NOTE: the token is a credential; it is printed here for demonstration
# only. Do not write live tokens to logs in production.
print(f"New access token: {access_token}")
print(f"Expires in: {expires_in} seconds")

# Use the token in messaging API
from linebot.v3.messaging import Configuration as MessagingConfig, MessagingApi

messaging_config = MessagingConfig(access_token=access_token)
messaging_api = MessagingApi(messaging_config)
206
```
207
208
### Token Verification
209
210
```python
211
# Verify token validity
try:
    verification = oauth_client.verify_channel_token(access_token)
    print(f"Token is valid for client: {verification.client_id}")
    print(f"Expires in: {verification.expires_in} seconds")
    print(f"Scope: {verification.scope}")
except Exception as e:
    # Any failure (invalid token, transport error) is reported, not raised.
    print(f"Token verification failed: {e}")
219
```
220
221
### Advanced Authentication with JWT
222
223
```python
224
import jwt
225
import time
226
from datetime import datetime, timedelta
227
228
def create_jwt_assertion(channel_id: str, private_key: str, key_id: str) -> str:
    """
    Create JWT assertion for advanced authentication.

    The assertion is signed with RS256 and carries the key ID in the
    ``kid`` header. The channel ID is used as both issuer and subject.

    Args:
        channel_id: LINE channel ID
        private_key: Private key for JWT signing
        key_id: Key identifier

    Returns:
        JWT assertion string
    """
    now = int(time.time())
    payload = {
        'iss': channel_id,
        'sub': channel_id,
        'aud': 'https://api.line.me/',
        'exp': now + 1800,  # the assertion itself expires after 30 minutes
        'iat': now,
        'token_exp': 2592000,  # requested token lifetime: 30 days in seconds
    }

    return jwt.encode(payload, private_key, algorithm='RS256', headers={'kid': key_id})
251
252
# Use JWT assertion for token issuance
private_key = """-----BEGIN PRIVATE KEY-----
YOUR_PRIVATE_KEY_HERE
-----END PRIVATE KEY-----"""

jwt_assertion = create_jwt_assertion(
    channel_id="YOUR_CHANNEL_ID",
    private_key=private_key,
    key_id="YOUR_KEY_ID"
)

# Issue short-lived token with JWT
short_lived_response = oauth_client.issue_short_lived_channel_token(
    client_assertion=jwt_assertion
)

print(f"Short-lived token: {short_lived_response.access_token}")
print(f"Expires in: {short_lived_response.expires_in} seconds")
270
```
271
272
### Token Lifecycle Management
273
274
```python
275
class TokenManager:
    """Cache one channel access token and re-issue it shortly before expiry.

    Expiry is tracked as a timezone-aware UTC ``datetime`` so that
    daylight-saving or wall-clock changes cannot make a token look valid
    for longer (or shorter) than it really is.
    """

    def __init__(self, oauth_client: "ChannelAccessToken"):
        """Store the OAuth client; no token is issued until first use."""
        self.oauth_client = oauth_client
        self.current_token = None
        self.token_expires_at = None

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as an aware UTC datetime."""
        # Local import: the snippet header only imports datetime/timedelta.
        from datetime import timezone
        return datetime.now(timezone.utc)

    @staticmethod
    def _to_utc(moment: datetime) -> datetime:
        """Normalize *moment* to UTC; a naive value is taken as local time."""
        from datetime import timezone
        return moment.astimezone(timezone.utc)

    def get_valid_token(self) -> str:
        """Get a valid access token, refreshing if necessary."""
        if self.needs_refresh():
            self.refresh_token()
        return self.current_token

    def needs_refresh(self) -> bool:
        """Check if token needs to be refreshed."""
        if not self.current_token or not self.token_expires_at:
            return True

        # Refresh if token expires within 5 minutes
        expires_at = self._to_utc(self.token_expires_at)
        return self._utc_now() + timedelta(minutes=5) >= expires_at

    def refresh_token(self):
        """Issue a new access token.

        Raises:
            Exception: whatever the OAuth client raised; it is reported
                and then re-raised unchanged.
        """
        try:
            response = self.oauth_client.issue_channel_token()
            self.current_token = response.access_token
            self.token_expires_at = self._utc_now() + timedelta(seconds=response.expires_in)
            print(f"Token refreshed, expires at: {self.token_expires_at}")
        except Exception as e:
            print(f"Token refresh failed: {e}")
            raise

    def revoke_current_token(self):
        """Revoke the current access token.

        Best effort: a failed revocation is reported but not raised, and
        the cached token is kept in that case.
        """
        if self.current_token:
            try:
                self.oauth_client.revoke_channel_token(self.current_token)
                print("Token revoked successfully")
                self.current_token = None
                self.token_expires_at = None
            except Exception as e:
                print(f"Token revocation failed: {e}")
316
317
# Usage: the first call issues a token; later calls reuse it until it
# is close to expiry.
token_manager = TokenManager(oauth_client)
valid_token = token_manager.get_valid_token()
320
```
321
322
### Secure Configuration Management
323
324
```python
325
import os
326
from typing import Optional
327
328
class SecureConfig:
    """Load LINE channel credentials from environment variables.

    Reads LINE_CHANNEL_SECRET, LINE_CHANNEL_ID, LINE_PRIVATE_KEY and
    LINE_KEY_ID. The first two are mandatory; the key pair is only
    needed for JWT-based authentication.
    """

    def __init__(self):
        """Read the environment.

        Raises:
            ValueError: if the channel secret or channel ID is missing or empty.
        """
        self.channel_secret = os.getenv('LINE_CHANNEL_SECRET')
        self.channel_id = os.getenv('LINE_CHANNEL_ID')
        self.private_key = os.getenv('LINE_PRIVATE_KEY')
        self.key_id = os.getenv('LINE_KEY_ID')

        if not all([self.channel_secret, self.channel_id]):
            raise ValueError("Missing required LINE channel credentials")

    def create_oauth_client(self) -> "ChannelAccessToken":
        """Create OAuth client with secure configuration."""
        config = Configuration()
        return ChannelAccessToken(config)

    def create_messaging_client(self, access_token: str) -> "MessagingApi":
        """Create messaging client with access token."""
        config = MessagingConfig(access_token=access_token)
        return MessagingApi(config)

    def get_jwt_assertion(self) -> Optional[str]:
        """Create JWT assertion if private key is available.

        Returns:
            The signed assertion, or None when either the private key or
            the key ID is not configured.
        """
        if not self.private_key or not self.key_id:
            return None

        return create_jwt_assertion(
            channel_id=self.channel_id,
            private_key=self.private_key,
            key_id=self.key_id
        )
358
359
# Environment-based configuration
secure_config = SecureConfig()
oauth_client = secure_config.create_oauth_client()

# Use JWT if available, otherwise basic token
jwt_assertion = secure_config.get_jwt_assertion()
if jwt_assertion:
    token_response = oauth_client.issue_short_lived_channel_token(jwt_assertion)
else:
    token_response = oauth_client.issue_channel_token()

messaging_api = secure_config.create_messaging_client(token_response.access_token)
371
```
372
373
### Error Handling and Retry Logic
374
375
```python
376
import time
377
from random import uniform
378
379
class AuthenticationError(Exception):
    """Raised when a token could not be issued after all retry attempts."""


class TokenManager:
    """Issue and verify channel access tokens with retry and fallback."""

    def __init__(self, oauth_client: "ChannelAccessToken", max_retries: int = 3):
        """
        Args:
            oauth_client: Client used to issue and verify tokens
            max_retries: Total number of issuance attempts (default: 3)
        """
        self.oauth_client = oauth_client
        self.max_retries = max_retries

    def issue_token_with_retry(self) -> "IssueChannelAccessTokenResponse":
        """Issue token with exponential backoff retry logic.

        Returns:
            The response of the first successful issuance attempt.

        Raises:
            AuthenticationError: if every attempt fails (the last failure
                is attached as ``__cause__``), or if ``max_retries`` is
                less than 1 so that no attempt can be made.
        """
        for attempt in range(self.max_retries):
            try:
                return self.oauth_client.issue_channel_token()
            except Exception as e:
                if attempt == self.max_retries - 1:
                    raise AuthenticationError(
                        f"Failed to issue token after {self.max_retries} attempts: {e}"
                    ) from e

                # Exponential backoff with jitter
                delay = (2 ** attempt) + uniform(0, 1)
                print(f"Token issuance failed (attempt {attempt + 1}), retrying in {delay:.2f}s...")
                time.sleep(delay)

        # Only reachable when the loop never ran; without this the method
        # would silently return None instead of a token response.
        raise AuthenticationError(
            f"Failed to issue token: max_retries must be at least 1, got {self.max_retries}"
        )

    def verify_token_with_fallback(self, access_token: str) -> bool:
        """Verify token with fallback behavior.

        Returns:
            True only when verification succeeds and the token has more
            than 300 seconds left; False otherwise, including when
            verification itself fails.
        """
        try:
            verification = self.oauth_client.verify_channel_token(access_token)
            return verification.expires_in > 300  # Token valid for more than 5 minutes
        except Exception as e:
            print(f"Token verification failed: {e}")
            return False
409
```
410
411
### Integration with Web Framework
412
413
```python
414
from flask import Flask, request, session
415
from functools import wraps
416
417
app = Flask(__name__)
# Placeholder only: the secret key signs the session cookie, so a real
# deployment must load a long random value from configuration instead
# of committing it to source.
app.secret_key = 'your-secret-key'
419
420
def require_valid_token(f):
    """Decorator to ensure valid LINE access token.

    Before the wrapped view runs, the session is checked for a token and
    its expiry; a new token is issued when either is missing or the
    token has expired.

    The expiry is stored as epoch seconds rather than a ``datetime``:
    Flask's session serializer returns datetimes as timezone-aware
    values, and comparing those with a naive ``datetime.now()`` raises
    ``TypeError`` on the next request.

    NOTE(review): Flask's default session is a signed but unencrypted
    cookie, so the access token stored here is readable by the browser.
    Prefer a server-side session or cache for production use.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        token = session.get('line_access_token')
        expires_at = session.get('token_expires_at')
        now = datetime.now().timestamp()

        # A non-numeric expiry (e.g. a value written by an older version
        # of this decorator) is treated as expired.
        expired = not isinstance(expires_at, (int, float)) or now >= expires_at

        if not token or expired:
            # Refresh token
            oauth_client = ChannelAccessToken(Configuration())
            response = oauth_client.issue_channel_token()

            session['line_access_token'] = response.access_token
            session['token_expires_at'] = now + response.expires_in

        return f(*args, **kwargs)
    return decorated_function
437
438
@app.route('/send-message')
@require_valid_token
def send_message():
    """Send message using automatically managed token.

    The decorator guarantees that ``session['line_access_token']`` is
    populated before this view runs.
    """
    access_token = session['line_access_token']
    config = MessagingConfig(access_token=access_token)
    api = MessagingApi(config)

    # Use messaging API...
    return "Message sent successfully"
448
```