Microsoft Authentication Library for Python enabling OAuth2/OIDC authentication with Microsoft identity platform
—
MSAL Python provides advanced security features including Proof-of-Possession (PoP) tokens for enhanced security, certificate-based authentication, custom authentication schemes, comprehensive error handling, and various utility functions for certificate management and JWT token processing.
PoP tokens provide enhanced security by cryptographically binding access tokens to specific HTTP requests, preventing token replay attacks and improving overall security posture.
class PopAuthScheme:
# HTTP method constants
HTTP_GET = "GET"
HTTP_POST = "POST"
HTTP_PUT = "PUT"
HTTP_DELETE = "DELETE"
HTTP_PATCH = "PATCH"
def __init__(
self,
http_method: str = None,
url: str = None,
nonce: str = None
):
"""
Create Proof-of-Possession authentication scheme.
Parameters:
- http_method: HTTP method (GET, POST, PUT, DELETE, PATCH)
- url: Full URL to be signed
- nonce: Nonce from resource server challenge
All parameters are required for PoP token creation.
"""

Usage example:
import msal
# Create PoP auth scheme for specific HTTP request
pop_scheme = msal.PopAuthScheme(
http_method=msal.PopAuthScheme.HTTP_GET,
url="https://graph.microsoft.com/v1.0/me",
nonce="nonce-from-resource-server"
)
# Note: PoP tokens are currently only available through broker
app = msal.PublicClientApplication(
client_id="your-client-id",
authority="https://login.microsoftonline.com/common",
enable_broker_on_windows=True # PoP requires broker
)
# Acquire PoP token (broker-enabled apps only)
result = app.acquire_token_interactive(
scopes=["User.Read"],
auth_scheme=pop_scheme # Include PoP scheme
)
if "access_token" in result:
print("PoP token acquired successfully!")
pop_token = result["access_token"]
# Use PoP token in HTTP request
import requests
headers = {
"Authorization": f"PoP {pop_token}",
"Content-Type": "application/json"
}
response = requests.get("https://graph.microsoft.com/v1.0/me", headers=headers)
else:
print(f"PoP token acquisition failed: {result.get('error_description')}")

Utilities for handling X.509 certificates in various formats for certificate-based authentication.
def extract_certs(public_cert_content: str) -> list:
"""
Parse public certificate content and extract certificate strings.
Parameters:
- public_cert_content: Raw certificate content (PEM format or base64)
Returns:
List of certificate strings suitable for x5c JWT header
Raises:
ValueError: If private key content is detected instead of public certificate
"""

Usage example:
import msal
# Load certificate from file
with open("certificate.pem", "r") as f:
cert_content = f.read()
# Extract certificates for JWT header
try:
certificates = msal.extract_certs(cert_content)
print(f"Extracted {len(certificates)} certificates")
# Use in confidential client application
client_credential = {
"private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----",
"thumbprint": "A1B2C3D4E5F6...",
"public_certificate": cert_content # For Subject Name/Issuer auth
}
app = msal.ConfidentialClientApplication(
client_id="your-client-id",
client_credential=client_credential
)
except ValueError as e:
print(f"Certificate parsing error: {e}")

import msal
# Certificate with private key and thumbprint
client_credential = {
"private_key": """-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC...
-----END PRIVATE KEY-----""",
"thumbprint": "A1B2C3D4E5F6789012345678901234567890ABCD"
}
app = msal.ConfidentialClientApplication(
client_id="your-client-id",
client_credential=client_credential,
authority="https://login.microsoftonline.com/your-tenant-id"
)
result = app.acquire_token_for_client(
scopes=["https://graph.microsoft.com/.default"]
)

import msal
# Encrypted private key with passphrase
client_credential = {
"private_key": """-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIFHDBOBgkqhkiG9w0BBQ0wQTApBgkqhkiG9w0BBQwwHAQI...
-----END ENCRYPTED PRIVATE KEY-----""",
"thumbprint": "A1B2C3D4E5F6789012345678901234567890ABCD",
"passphrase": "your-certificate-passphrase"
}
app = msal.ConfidentialClientApplication(
client_id="your-client-id",
client_credential=client_credential
)

import msal
# SNI authentication for certificate auto-rotation
client_credential = {
"private_key": """-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC...
-----END PRIVATE KEY-----""",
"thumbprint": "A1B2C3D4E5F6789012345678901234567890ABCD",
"public_certificate": """-----BEGIN CERTIFICATE-----
MIIDXTCCAkWgAwIBAgIJAKhwU2Y4bkFoMA0GCSqGSIb3DQEBCwUA...
-----END CERTIFICATE-----"""
}
app = msal.ConfidentialClientApplication(
client_id="your-client-id",
client_credential=client_credential
)
# SNI allows certificate rotation without updating application registration
result = app.acquire_token_for_client(
scopes=["https://graph.microsoft.com/.default"]
)

OIDC prompt parameter constants for controlling authentication UI behavior.
class Prompt:
NONE = "none" # No UI, fail if interaction required
LOGIN = "login" # Force re-authentication
CONSENT = "consent" # Force consent screen
SELECT_ACCOUNT = "select_account" # Show account picker
CREATE = "create" # Show account creation option

Usage example:
import msal
app = msal.PublicClientApplication(
client_id="your-client-id",
authority="https://login.microsoftonline.com/common"
)
# Force account selection
result = app.acquire_token_interactive(
scopes=["User.Read"],
prompt=msal.Prompt.SELECT_ACCOUNT
)
# Force re-authentication (ignore SSO)
result = app.acquire_token_interactive(
scopes=["User.Read"],
prompt=msal.Prompt.LOGIN
)
# Force consent (even if previously granted)
result = app.acquire_token_interactive(
scopes=["User.Read", "Mail.Read"],
prompt=msal.Prompt.CONSENT
)
# Fail if any interaction required
result = app.acquire_token_interactive(
scopes=["User.Read"],
prompt=msal.Prompt.NONE
)

Utilities for decoding and validating JWT tokens, particularly ID tokens.
def decode_part(raw, encoding="utf-8"):
    """
    Decode a part of JWT token.

    Parameters:
    - raw: Base64-encoded JWT part
    - encoding: Character encoding (default: utf-8), use None for binary output

    Returns:
    Decoded string or binary data based on encoding parameter.

    NOTE(review): the result is documented as text/bytes, not a parsed
    object - presumably a decoded header or payload still has to go through
    ``json.loads`` before individual claims can be read; confirm against
    the library.
    """
def decode_id_token(
id_token,
client_id=None,
issuer=None,
nonce=None,
now=None
):
"""
Decode and validate ID token.
Parameters:
- id_token: JWT ID token string
- client_id: Expected audience (client ID)
- issuer: Expected issuer
- nonce: Expected nonce value
- now: Current time for expiration checking
Returns:
Dictionary of ID token claims
Raises:
IdTokenError: If token is invalid or validation fails
"""

Usage example:
import json
import time

import msal

app = msal.PublicClientApplication(
    client_id="your-client-id",
    authority="https://login.microsoftonline.com/common"
)

# Acquire token with ID token
result = app.acquire_token_interactive(
    scopes=["openid", "profile", "User.Read"]
)

if "id_token" in result:
    id_token = result["id_token"]

    try:
        # Decode and validate ID token
        id_token_claims = msal.decode_id_token(
            id_token=id_token,
            client_id="your-client-id",
            now=int(time.time())
        )
        print("ID Token Claims:")
        print(f" Subject: {id_token_claims.get('sub')}")
        print(f" Name: {id_token_claims.get('name')}")
        print(f" Email: {id_token_claims.get('preferred_username')}")
        print(f" Issued at: {id_token_claims.get('iat')}")
        print(f" Expires at: {id_token_claims.get('exp')}")
    except msal.IdTokenError as e:
        print(f"ID token validation failed: {e}")

    # Decode individual JWT parts
    try:
        # Split JWT into parts (the signature segment is not inspected here)
        header, payload, signature = id_token.split('.')

        # decode_part() is documented to return the decoded *text* of a
        # segment, so it must be parsed as JSON before claims can be looked
        # up; calling .get() on the raw string would always fail.
        header_claims = json.loads(msal.decode_part(header))
        print(f"JWT Algorithm: {header_claims.get('alg')}")
        print(f"Token Type: {header_claims.get('typ')}")

        # Decode payload
        payload_claims = json.loads(msal.decode_part(payload))
        print(f"Issuer: {payload_claims.get('iss')}")
        print(f"Audience: {payload_claims.get('aud')}")
    except Exception as e:
        # Best-effort diagnostics: a malformed token (wrong segment count,
        # bad base64, invalid JSON) is reported instead of crashing the demo.
        print(f"JWT decoding error: {e}")

import msal
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Custom HTTP client with retry logic
class RetryHTTPClient:
    """HTTP client wrapper that retries transient failures.

    Holds a ``requests.Session`` whose ``http://`` and ``https://`` adapters
    share one urllib3 ``Retry`` policy, and exposes ``post``/``get`` methods
    that delegate straight to that session.
    """

    def __init__(self):
        self.session = requests.Session()

        # Configure retry strategy
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            # By default urllib3 only retries idempotent methods, which
            # excludes POST - the verb OAuth 2.0 token requests use - so
            # status_forcelist would never apply to them. Opt POST in
            # explicitly (requires urllib3 >= 1.26).
            allowed_methods=frozenset({"GET", "POST"}),
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def post(self, url, **kwargs):
        """Send a POST through the retrying session and return its response."""
        return self.session.post(url, **kwargs)

    def get(self, url, **kwargs):
        """Send a GET through the retrying session and return its response."""
        return self.session.get(url, **kwargs)
# Use custom HTTP client
custom_http_client = RetryHTTPClient()
app = msal.ConfidentialClientApplication(
client_id="your-client-id",
client_credential="your-client-secret",
http_client=custom_http_client
)

import msal
# Use Azure regional endpoints for better performance
app = msal.ConfidentialClientApplication(
client_id="your-client-id",
client_credential="your-client-secret",
authority="https://login.microsoftonline.com/your-tenant-id",
azure_region="eastus" # Specify region
)
# Auto-detect region in Azure environment
app = msal.ConfidentialClientApplication(
client_id="your-client-id",
client_credential="your-client-secret",
azure_region=msal.ClientApplication.ATTEMPT_REGION_DISCOVERY
)
# Check if PoP is supported
if app.is_pop_supported():
print("Proof-of-Possession tokens are supported")
# Use PoP authentication scheme
else:
print("PoP not supported without broker")

import msal
import json
app = msal.PublicClientApplication(
client_id="your-client-id",
authority="https://login.microsoftonline.com/your-tenant-id"
)
# Handle claims challenge from Conditional Access
def handle_claims_challenge(claims_challenge_header):
    """Extract the claims challenge from a ``WWW-Authenticate`` header value.

    Expected header format:
        Bearer authorization_uri="...", error="insufficient_claims", claims="..."

    The quoted ``claims`` value is accepted either as literal JSON or as
    base64 / base64url-encoded JSON (with or without ``=`` padding).

    Parameters:
    - claims_challenge_header: Raw ``WWW-Authenticate`` header string; may be
      empty or None.

    Returns:
    The claims challenge as a JSON string, or None when the header carries no
    quoted ``claims`` parameter or its value is not valid JSON in either form.
    """
    import base64
    import binascii
    import re

    if not claims_challenge_header:
        return None

    # Require the quoted form. The previous find()-based slicing produced
    # garbage when "claims=" was present without an opening quote.
    match = re.search(r'\bclaims="([^"]*)"', claims_challenge_header)
    if match is None:
        return None
    raw_claims = match.group(1)

    # Literal JSON: hand it back unchanged.
    try:
        json.loads(raw_claims)
        return raw_claims
    except json.JSONDecodeError:
        pass

    # Otherwise treat it as base64: map the URL-safe alphabet onto the
    # standard one and restore any padding the server stripped.
    normalized = raw_claims.replace("-", "+").replace("_", "/")
    normalized += "=" * (-len(normalized) % 4)
    try:
        decoded = base64.b64decode(normalized, validate=True).decode("utf-8")
        json.loads(decoded)
    except (binascii.Error, UnicodeDecodeError, json.JSONDecodeError):
        return None
    return decoded
# Initial token acquisition
result = app.acquire_token_interactive(scopes=["User.Read"])
if "access_token" in result:
access_token = result["access_token"]
# Call API that may require additional claims
import requests
headers = {"Authorization": f"Bearer {access_token}"}
response = requests.get("https://graph.microsoft.com/v1.0/me", headers=headers)
if response.status_code == 401:
# Check for claims challenge
www_auth_header = response.headers.get("WWW-Authenticate", "")
claims_challenge = handle_claims_challenge(www_auth_header)
if claims_challenge:
print("Claims challenge detected, re-authenticating...")
# Re-authenticate with claims challenge
result = app.acquire_token_interactive(
scopes=["User.Read"],
claims_challenge=claims_challenge
)
if "access_token" in result:
# Retry API call with new token
new_token = result["access_token"]
headers = {"Authorization": f"Bearer {new_token}"}
response = requests.get("https://graph.microsoft.com/v1.0/me", headers=headers)

Advanced error handling patterns for robust applications:
import msal
import logging
import time
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MSALAuthHandler:
    """Acquire tokens from a public client app, retrying transient failures.

    Attributes:
        app: The ``msal.PublicClientApplication`` used for every request.
        max_retries: Maximum number of acquisition attempts per call.
        retry_delay: Initial pause, in seconds, before a retry. Each call to
            ``acquire_token_with_retry`` starts its backoff from this value.
    """

    def __init__(self, client_id, authority):
        self.app = msal.PublicClientApplication(
            client_id=client_id,
            authority=authority
        )
        self.max_retries = 3
        self.retry_delay = 1  # seconds

    def acquire_token_with_retry(self, scopes, **kwargs):
        """Acquire token with automatic retry logic.

        Tries the first cached account silently, then falls back to the
        interactive flow. Server-side errors, browser timeouts and unexpected
        exceptions are retried up to ``max_retries`` attempts.

        Parameters:
        - scopes: Scopes to request.
        - **kwargs: Extra keyword arguments forwarded to
          ``acquire_token_interactive`` only.

        Returns:
        The MSAL result dict on success or on a non-retryable error, or a
        synthetic ``{"error": ..., "error_description": ...}`` dict when the
        attempts are exhausted by a timeout or an unexpected exception.
        """
        # Keep the backoff local so one call's doubling does not inflate the
        # starting delay of every later call on this handler.
        delay = self.retry_delay

        for attempt in range(self.max_retries):
            can_retry = attempt < self.max_retries - 1
            try:
                # Try silent first if we have accounts
                accounts = self.app.get_accounts()
                if accounts:
                    result = self.app.acquire_token_silent(
                        scopes=scopes,
                        account=accounts[0]
                    )
                    # acquire_token_silent() returns None when the cache has
                    # no usable token, so guard before inspecting the result.
                    if result and "access_token" in result:
                        logger.info("Silent authentication successful")
                        return result

                    error = result.get("error") if result else None
                    if error == "interaction_required":
                        logger.info("Interaction required, falling back to interactive")
                    elif error in ("invalid_grant", "token_expired"):
                        logger.warning(f"Token issue: {error}, removing account")
                        self.app.remove_account(accounts[0])

                # Fall back to interactive
                result = self.app.acquire_token_interactive(scopes=scopes, **kwargs)
                if "access_token" in result:
                    logger.info("Interactive authentication successful")
                    return result

                # Handle specific errors
                error = result.get("error")
                error_description = result.get("error_description", "")

                if error == "access_denied":
                    logger.error("User denied consent")
                    return result
                if error == "invalid_scope":
                    logger.error(f"Invalid scope: {error_description}")
                    return result
                if error in ("server_error", "temporarily_unavailable") and can_retry:
                    logger.warning(f"Server error, retrying in {delay}s...")
                    time.sleep(delay)
                    delay *= 2  # Exponential backoff
                    continue

                # Any other error, or a server error on the final attempt.
                logger.error(f"Authentication failed: {error} - {error_description}")
                return result

            except msal.BrowserInteractionTimeoutError:
                logger.error("Browser interaction timed out")
                if can_retry:
                    logger.info("Retrying authentication...")
                    continue
                return {"error": "timeout", "error_description": "Browser interaction timed out"}

            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                if can_retry:
                    logger.info("Retrying after unexpected error...")
                    time.sleep(delay)
                    continue
                return {"error": "unexpected_error", "error_description": str(e)}

        return {"error": "max_retries_exceeded", "error_description": "Failed after maximum retry attempts"}
# Usage
auth_handler = MSALAuthHandler(
client_id="your-client-id",
authority="https://login.microsoftonline.com/common"
)
result = auth_handler.acquire_token_with_retry(
scopes=["User.Read", "Mail.Read"],
timeout=120
)
if "access_token" in result:
print("Authentication successful with retry logic!")
else:
print(f"Authentication ultimately failed: {result.get('error_description')}")

import msal
import keyring
import json
class SecureTokenCache(msal.SerializableTokenCache):
    """Serializable token cache persisted through ``keyring``.

    The serialized cache is stored as the keyring password for the
    ``(service_name, username)`` pair. It is read once at construction and
    written back at interpreter exit if the cache state changed. Failures on
    either side are reported as printed warnings rather than raised.
    """

    def __init__(self, service_name, username):
        import atexit

        super().__init__()
        self.service_name = service_name
        self.username = username

        # Load from secure storage; a missing or unreadable entry simply
        # leaves the cache empty.
        try:
            stored = keyring.get_password(service_name, username)
            if stored:
                self.deserialize(stored)
        except Exception as exc:
            print(f"Warning: Could not load secure cache: {exc}")

        # Persist on interpreter shutdown.
        atexit.register(self._save_cache)

    def _save_cache(self):
        """Write the cache to the keyring, but only if it has changed."""
        if not self.has_state_changed:
            return
        try:
            keyring.set_password(self.service_name, self.username, self.serialize())
        except Exception as exc:
            print(f"Warning: Could not save secure cache: {exc}")
# Usage with secure storage
secure_cache = SecureTokenCache("MyApp", "TokenCache")
app = msal.PublicClientApplication(
client_id="your-client-id",
token_cache=secure_cache
)

import msal
import os
from typing import Optional
def create_msal_app(
    client_type: str = "public",
    enable_logging: bool = False
) -> "Optional[msal.ClientApplication]":
    """Create MSAL application with environment-based configuration.

    Environment variables read:
    - AZURE_CLIENT_ID: required.
    - AZURE_AUTHORITY: optional, defaults to
      https://login.microsoftonline.com/common.
    - AZURE_CLIENT_SECRET, or AZURE_CERTIFICATE_PATH together with
      AZURE_CERTIFICATE_THUMBPRINT: required for confidential clients; the
      secret takes precedence when both are configured.

    Parameters:
    - client_type: "public" or "confidential".
    - enable_logging: When true, configures DEBUG-level logging and passes
      ``enable_pii_log=True`` to the application.

    Returns:
    A ``PublicClientApplication`` or ``ConfidentialClientApplication``.

    Raises:
    ValueError: If AZURE_CLIENT_ID is missing, a confidential client has no
    credential configured, or ``client_type`` is not recognised.
    """
    # Imported here because this snippet's import list does not include
    # logging; without it enable_logging=True raised NameError.
    import logging

    # Required environment variables
    client_id = os.environ.get("AZURE_CLIENT_ID")
    if not client_id:
        raise ValueError("AZURE_CLIENT_ID environment variable required")

    # Optional configuration
    authority = os.environ.get("AZURE_AUTHORITY", "https://login.microsoftonline.com/common")

    # Enable PII logging if requested
    if enable_logging:
        logging.basicConfig(level=logging.DEBUG)
    enable_pii_log = bool(enable_logging)

    if client_type == "public":
        return msal.PublicClientApplication(
            client_id=client_id,
            authority=authority,
            enable_pii_log=enable_pii_log
        )

    if client_type == "confidential":
        # Confidential client requires credential
        client_secret = os.environ.get("AZURE_CLIENT_SECRET")
        cert_path = os.environ.get("AZURE_CERTIFICATE_PATH")
        cert_thumbprint = os.environ.get("AZURE_CERTIFICATE_THUMBPRINT")

        if client_secret:
            client_credential = client_secret
        elif cert_path and cert_thumbprint:
            with open(cert_path, 'r') as f:
                private_key = f.read()
            client_credential = {
                "private_key": private_key,
                "thumbprint": cert_thumbprint
            }
        else:
            raise ValueError("Either AZURE_CLIENT_SECRET or certificate configuration required")

        return msal.ConfidentialClientApplication(
            client_id=client_id,
            client_credential=client_credential,
            authority=authority,
            enable_pii_log=enable_pii_log
        )

    raise ValueError("client_type must be 'public' or 'confidential'")
# Usage
try:
app = create_msal_app(client_type="public", enable_logging=True)
result = app.acquire_token_interactive(scopes=["User.Read"])
except ValueError as e:
print(f"Configuration error: {e}")

Install with Tessl CLI
npx tessl i tessl/pypi-msal