LINE Messaging API SDK for Python with comprehensive bot development capabilities including messaging, webhooks, and platform integrations.
—
Channel access token management, OAuth flows, and authentication handling. The authentication module provides secure token lifecycle management for bot operations and integrations with LINE's OAuth system.
Core token operations including issuance, verification, and revocation for secure API access.
class ChannelAccessToken:
def issue_channel_token(
self,
grant_type: str = "client_credentials",
client_assertion_type: Optional[str] = None,
client_assertion: Optional[str] = None
) -> IssueChannelAccessTokenResponse:
"""
Issue a new channel access token using client credentials.
Args:
grant_type: OAuth grant type (default: "client_credentials")
client_assertion_type: JWT assertion type for advanced authentication
client_assertion: JWT assertion for advanced authentication
Returns:
IssueChannelAccessTokenResponse: New access token and metadata
"""
def revoke_channel_token(self, access_token: str) -> dict:
"""
Revoke an existing channel access token.
Args:
access_token: Token to revoke
Returns:
dict: Empty response on success
"""
def verify_channel_token(self, access_token: str) -> VerifyChannelAccessTokenResponse:
"""
Verify the validity and get metadata of a channel access token.
Args:
access_token: Token to verify
Returns:
VerifyChannelAccessTokenResponse: Token validity status and metadata
"""Manage short-lived tokens for enhanced security in specific use cases.
class ChannelAccessToken:
def issue_short_lived_channel_token(
self,
client_assertion: str,
client_assertion_type: str = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
grant_type: str = "client_credentials"
) -> IssueShortLivedChannelAccessTokenResponse:
"""
Issue a short-lived channel access token with limited validity period.
Args:
client_assertion: JWT assertion for authentication
client_assertion_type: JWT assertion type specification
grant_type: OAuth grant type
Returns:
IssueShortLivedChannelAccessTokenResponse: Short-lived token and expiration info
"""Handle stateless authentication tokens for specific integration scenarios.
class ChannelAccessToken:
def issue_stateless_channel_token(
self,
client_assertion: str,
client_assertion_type: str = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
grant_type: str = "client_credentials"
) -> IssueStatelessChannelAccessTokenResponse:
"""
Issue a stateless channel access token for specific use cases.
Args:
client_assertion: JWT assertion for authentication
client_assertion_type: JWT assertion type specification
grant_type: OAuth grant type
Returns:
IssueStatelessChannelAccessTokenResponse: Stateless token information
"""Manage and validate token signing keys for enhanced security.
class ChannelAccessToken:
def get_valid_channel_access_token_key_ids(self, client_assertion: str) -> ChannelAccessTokenKeyIdsResponse:
"""
Get list of valid key IDs for channel access token verification.
Args:
client_assertion: JWT assertion for authentication
Returns:
ChannelAccessTokenKeyIdsResponse: List of valid key identifiers
"""class IssueChannelAccessTokenResponse:
access_token: str
expires_in: int
token_type: str
key_id: Optional[str] = None
class IssueShortLivedChannelAccessTokenResponse:
access_token: str
expires_in: int
token_type: str
class IssueStatelessChannelAccessTokenResponse:
access_token: str
expires_in: int
token_type: str
class VerifyChannelAccessTokenResponse:
client_id: str
expires_in: int
scope: str
class ChannelAccessTokenKeyIdsResponse:
key_ids: List[str]class Configuration:
def __init__(
self,
access_token: Optional[str] = None,
host: str = "https://api.line.me",
username: Optional[str] = None,
password: Optional[str] = None,
discard_unknown_keys: bool = False,
disabled_client_side_validations: str = "",
server_index: Optional[int] = None,
server_variables: Optional[dict] = None,
server_operation_index: Optional[dict] = None,
server_operation_variables: Optional[dict] = None
):
"""
API client configuration including authentication credentials.
Args:
access_token: Channel access token for authentication
host: API endpoint base URL
username: Optional username for basic authentication
password: Optional password for basic authentication
discard_unknown_keys: Whether to ignore unknown response fields
"""from linebot.v3.oauth import ChannelAccessToken, Configuration
# Initialize OAuth client
oauth_config = Configuration(
# No access token needed for OAuth operations
)
oauth_client = ChannelAccessToken(oauth_config)
# Issue a new channel access token
token_response = oauth_client.issue_channel_token()
access_token = token_response.access_token
expires_in = token_response.expires_in
print(f"New access token: {access_token}")
print(f"Expires in: {expires_in} seconds")
# Use the token in messaging API
from linebot.v3.messaging import Configuration as MessagingConfig, MessagingApi
messaging_config = MessagingConfig(access_token=access_token)
messaging_api = MessagingApi(messaging_config)# Verify token validity
try:
verification = oauth_client.verify_channel_token(access_token)
print(f"Token is valid for client: {verification.client_id}")
print(f"Expires in: {verification.expires_in} seconds")
print(f"Scope: {verification.scope}")
except Exception as e:
print(f"Token verification failed: {e}")import jwt
import time
from datetime import datetime, timedelta
def create_jwt_assertion(channel_id: str, private_key: str, key_id: str) -> str:
"""
Create JWT assertion for advanced authentication.
Args:
channel_id: LINE channel ID
private_key: Private key for JWT signing
key_id: Key identifier
Returns:
JWT assertion string
"""
now = int(time.time())
payload = {
'iss': channel_id,
'sub': channel_id,
'aud': 'https://api.line.me/',
'exp': now + 1800, # 30 minutes
'iat': now,
'token_exp': 2592000 # 30 days in seconds
}
return jwt.encode(payload, private_key, algorithm='RS256', headers={'kid': key_id})
# Use JWT assertion for token issuance
private_key = """-----BEGIN PRIVATE KEY-----
YOUR_PRIVATE_KEY_HERE
-----END PRIVATE KEY-----"""
jwt_assertion = create_jwt_assertion(
channel_id="YOUR_CHANNEL_ID",
private_key=private_key,
key_id="YOUR_KEY_ID"
)
# Issue short-lived token with JWT
short_lived_response = oauth_client.issue_short_lived_channel_token(
client_assertion=jwt_assertion
)
print(f"Short-lived token: {short_lived_response.access_token}")
print(f"Expires in: {short_lived_response.expires_in} seconds")class TokenManager:
def __init__(self, oauth_client: ChannelAccessToken):
self.oauth_client = oauth_client
self.current_token = None
self.token_expires_at = None
def get_valid_token(self) -> str:
"""Get a valid access token, refreshing if necessary."""
if self.needs_refresh():
self.refresh_token()
return self.current_token
def needs_refresh(self) -> bool:
"""Check if token needs to be refreshed."""
if not self.current_token or not self.token_expires_at:
return True
# Refresh if token expires within 5 minutes
return datetime.now() + timedelta(minutes=5) >= self.token_expires_at
def refresh_token(self):
"""Issue a new access token."""
try:
response = self.oauth_client.issue_channel_token()
self.current_token = response.access_token
self.token_expires_at = datetime.now() + timedelta(seconds=response.expires_in)
print(f"Token refreshed, expires at: {self.token_expires_at}")
except Exception as e:
print(f"Token refresh failed: {e}")
raise
def revoke_current_token(self):
"""Revoke the current access token."""
if self.current_token:
try:
self.oauth_client.revoke_channel_token(self.current_token)
print("Token revoked successfully")
self.current_token = None
self.token_expires_at = None
except Exception as e:
print(f"Token revocation failed: {e}")
# Usage
token_manager = TokenManager(oauth_client)
valid_token = token_manager.get_valid_token()import os
from typing import Optional
class SecureConfig:
def __init__(self):
self.channel_secret = os.getenv('LINE_CHANNEL_SECRET')
self.channel_id = os.getenv('LINE_CHANNEL_ID')
self.private_key = os.getenv('LINE_PRIVATE_KEY')
self.key_id = os.getenv('LINE_KEY_ID')
if not all([self.channel_secret, self.channel_id]):
raise ValueError("Missing required LINE channel credentials")
def create_oauth_client(self) -> ChannelAccessToken:
"""Create OAuth client with secure configuration."""
config = Configuration()
return ChannelAccessToken(config)
def create_messaging_client(self, access_token: str) -> MessagingApi:
"""Create messaging client with access token."""
config = MessagingConfig(access_token=access_token)
return MessagingApi(config)
def get_jwt_assertion(self) -> Optional[str]:
"""Create JWT assertion if private key is available."""
if not self.private_key or not self.key_id:
return None
return create_jwt_assertion(
channel_id=self.channel_id,
private_key=self.private_key,
key_id=self.key_id
)
# Environment-based configuration
secure_config = SecureConfig()
oauth_client = secure_config.create_oauth_client()
# Use JWT if available, otherwise basic token
jwt_assertion = secure_config.get_jwt_assertion()
if jwt_assertion:
token_response = oauth_client.issue_short_lived_channel_token(jwt_assertion)
else:
token_response = oauth_client.issue_channel_token()
messaging_api = secure_config.create_messaging_client(token_response.access_token)import time
from random import uniform
class AuthenticationError(Exception):
pass
class TokenManager:
def __init__(self, oauth_client: ChannelAccessToken, max_retries: int = 3):
self.oauth_client = oauth_client
self.max_retries = max_retries
def issue_token_with_retry(self) -> IssueChannelAccessTokenResponse:
"""Issue token with exponential backoff retry logic."""
for attempt in range(self.max_retries):
try:
return self.oauth_client.issue_channel_token()
except Exception as e:
if attempt == self.max_retries - 1:
raise AuthenticationError(f"Failed to issue token after {self.max_retries} attempts: {e}")
# Exponential backoff with jitter
delay = (2 ** attempt) + uniform(0, 1)
print(f"Token issuance failed (attempt {attempt + 1}), retrying in {delay:.2f}s...")
time.sleep(delay)
def verify_token_with_fallback(self, access_token: str) -> bool:
"""Verify token with fallback behavior."""
try:
verification = self.oauth_client.verify_channel_token(access_token)
return verification.expires_in > 300 # Token valid for more than 5 minutes
except Exception as e:
print(f"Token verification failed: {e}")
return Falsefrom flask import Flask, request, session
from functools import wraps
app = Flask(__name__)
app.secret_key = 'your-secret-key'
def require_valid_token(f):
"""Decorator to ensure valid LINE access token."""
@wraps(f)
def decorated_function(*args, **kwargs):
token = session.get('line_access_token')
expires_at = session.get('token_expires_at')
if not token or not expires_at or datetime.now() >= expires_at:
# Refresh token
oauth_client = ChannelAccessToken(Configuration())
response = oauth_client.issue_channel_token()
session['line_access_token'] = response.access_token
session['token_expires_at'] = datetime.now() + timedelta(seconds=response.expires_in)
return f(*args, **kwargs)
return decorated_function
@app.route('/send-message')
@require_valid_token
def send_message():
"""Send message using automatically managed token."""
access_token = session['line_access_token']
config = MessagingConfig(access_token=access_token)
api = MessagingApi(config)
# Use messaging API...
return "Message sent successfully"Install with Tessl CLI
npx tessl i tessl/pypi-line-bot-sdk