Azure Active Directory Authentication Library for Python that provides OAuth2/OpenID Connect authentication flows and token management for accessing Azure AD protected resources
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Thread-safe token storage and management with serialization capabilities for persistent caching across application sessions. The TokenCache class provides efficient token lookup, storage, removal, and automatic cleanup functionality.
Creates a new token cache instance with optional deserialization from previously saved state.
def __init__(self, state=None):
"""
Initialize token cache.
Parameters:
- state (str, optional): Serialized cache state from previous session
"""

Usage Example:
import adal
# Create empty cache
cache = adal.TokenCache()
# Create cache from previously saved state
with open('token_cache.json', 'r') as f:
cache_state = f.read()
cache = adal.TokenCache(state=cache_state)
# Use cache with authentication context
context = adal.AuthenticationContext(
authority='https://login.microsoftonline.com/tenant-id',
cache=cache
)

Searches the cache for tokens matching specified criteria. Returns a list of matching token entries.
def find(self, query):
"""
Find tokens in cache matching query criteria.
Parameters:
- query (dict): Search criteria with optional keys:
- '_clientId' (str): OAuth client ID (note the underscore prefix)
- 'userId' (str): User identifier
- 'isMRRT' (bool): Whether to match multi-resource refresh tokens
Returns:
list: List of matching authentication result dictionaries
Note: Any parameter set to None acts as a wildcard (matches all).
All specified criteria must match (AND logic).
"""

Usage Example:
# Find all tokens for a specific user
user_tokens = cache.find({'userId': 'user@tenant.com'})
# Find tokens for specific client and user
app_tokens = cache.find({
'_clientId': 'your-client-id',
'userId': 'user@tenant.com'
})
# Find multi-resource refresh tokens for a user
mrrt_tokens = cache.find({
'userId': 'user@tenant.com',
'isMRRT': True
})
# Find all cached tokens
all_tokens = cache.find({})
for token in user_tokens:
print(f"Resource: {token.get('resource')}")
print(f"Expires: {token.get('expiresOn')}")

Adds new token entries to the cache. Automatically handles deduplication and updates existing entries.
def add(self, entries):
"""
Add token entries to cache.
Parameters:
- entries (list or dict): Single token entry or list of token entries to add
Each entry should be an authentication result dictionary
"""

Usage Example:
# Tokens are typically added automatically by AuthenticationContext
# But you can add tokens manually if needed
token_entry = {
'accessToken': 'eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIs...',
'refreshToken': 'AQABAAAAAAD--DLA3VO7QrddgJg7WevrAg...',
'tokenType': 'Bearer',
'expiresIn': 3600,
'expiresOn': '2023-10-01T12:00:00Z',
'resource': 'https://management.azure.com/',
'userId': 'user@tenant.com',
'clientId': 'your-client-id'
}
cache.add(token_entry)
print(f"Cache has changes: {cache.has_state_changed}")

Removes token entries from the cache based on matching criteria.
def remove(self, entries):
"""
Remove token entries from cache.
Parameters:
- entries (list or dict): Single token entry or list of token entries to remove
Entries are matched based on authority, resource, client ID, and user ID
"""

Usage Example:
# Remove specific tokens (typically done automatically on expiration)
expired_tokens = cache.find({'userId': 'user@tenant.com'})
cache.remove(expired_tokens)
# Remove single token entry
cache.remove(token_entry)

Serializes the entire cache to a JSON string for persistent storage across application sessions.
def serialize(self):
"""
Serialize cache to JSON string.
Returns:
str: JSON representation of cache contents
"""

Usage Example:
# Save cache to file
cache_state = cache.serialize()
with open('token_cache.json', 'w') as f:
f.write(cache_state)
# Save cache to database or other storage
import base64
encoded_cache = base64.b64encode(cache_state.encode()).decode()
# Store encoded_cache in database

Loads cache contents from a previously serialized JSON string.
def deserialize(self, state):
"""
Load cache contents from serialized state.
Parameters:
- state (str): JSON string from previous serialize() call
"""

Usage Example:
# Load cache from file
with open('token_cache.json', 'r') as f:
cache_state = f.read()
cache = adal.TokenCache()
cache.deserialize(cache_state)
# Load from database
# encoded_cache = get_from_database()
# cache_state = base64.b64decode(encoded_cache).decode()
# cache.deserialize(cache_state)

Returns all cache entries as key-value pairs for inspection and debugging.
def read_items(self):
"""
Get all cache entries as key-value pairs.
Returns:
list: List of (TokenCacheKey, authentication_result) tuples
"""

Usage Example:
# Inspect cache contents
items = cache.read_items()
print(f"Cache contains {len(items)} entries:")
for cache_key, token_data in items:
print(f"User: {cache_key.user_id}")
print(f"Resource: {cache_key.resource}")
print(f"Client: {cache_key.client_id}")
print(f"Expires: {token_data.get('expiresOn')}")
print("---")

has_state_changed: bool  # Read-only property indicating if cache has been modified

Usage Example:
# Check if cache needs to be saved
if cache.has_state_changed:
cache_state = cache.serialize()
save_cache_to_storage(cache_state)
print("Cache saved")
else:
print("No changes to save")

import adal
import os
import json
class PersistentTokenCache:
    """Keep an ``adal.TokenCache`` in sync with a JSON file on disk.

    The cache is loaded from ``cache_file`` when the object is created and
    written back on demand with :meth:`save_cache`.
    """

    def __init__(self, cache_file='adal_cache.json'):
        """Create the manager and load any previously saved cache.

        Parameters:
        - cache_file (str): Path of the file used to persist the cache.
        """
        # Path of the file the serialized cache is read from / written to.
        self.cache_file = cache_file
        # The adal.TokenCache instance handed to authentication contexts.
        self.cache = self._load_cache()

    def _load_cache(self):
        """Load cache from file if it exists.

        Returns:
        adal.TokenCache: Cache restored from ``cache_file``, or an empty
        cache when the file is missing or cannot be loaded.
        """
        # EAFP: just try to open the file instead of checking for existence
        # first, so the file cannot disappear between the check and the open.
        try:
            with open(self.cache_file, 'r') as f:
                cache_state = f.read()
            return adal.TokenCache(state=cache_state)
        except FileNotFoundError:
            # No saved cache yet (e.g. first run): start empty, silently.
            pass
        except Exception as e:
            # Best effort: an unreadable or corrupt cache file must not stop
            # the application, it only costs a fresh authentication.
            print(f"Failed to load cache: {e}")
        return adal.TokenCache()

    def save_cache(self):
        """Save cache to file if it has changed.

        Failures are reported on stdout and otherwise ignored.
        """
        if not self.cache.has_state_changed:
            return
        try:
            cache_state = self.cache.serialize()
            # The serialized cache holds access and refresh tokens, so a
            # newly created file is made readable/writable by the owner only.
            # The mode is applied only on creation; an existing file keeps
            # its current permissions.
            fd = os.open(
                self.cache_file,
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(fd, 'w') as f:
                f.write(cache_state)
            print(f"Cache saved to {self.cache_file}")
        except Exception as e:
            print(f"Failed to save cache: {e}")

    def get_context(self, authority):
        """Get authentication context with persistent cache.

        Parameters:
        - authority (str): Authority URL passed to adal.AuthenticationContext.

        Returns:
        adal.AuthenticationContext: Context that uses this manager's cache.
        """
        return adal.AuthenticationContext(authority, cache=self.cache)

    def clear_cache(self):
        """Clear all cached tokens and persist the (now empty) cache."""
        all_tokens = self.cache.find({})
        if all_tokens:
            self.cache.remove(all_tokens)
        self.save_cache()
        print("Cache cleared")
# Usage example
def main():
    """Demonstrate client-credential authentication backed by a file cache.

    Builds a PersistentTokenCache, requests a token for the Azure management
    resource, saves the cache on success, and finally reports how many
    entries the cache holds.
    """
    # Create persistent cache manager
    manager = PersistentTokenCache('my_app_cache.json')

    # Get authentication context with cache
    context = manager.get_context('https://login.microsoftonline.com/tenant-id')

    request = {
        'resource': 'https://management.azure.com/',
        'client_id': 'your-client-id',
        'client_secret': 'your-client-secret',
    }
    try:
        # Try to get token (will use cache if available)
        result = context.acquire_token_with_client_credentials(**request)
        print("Authentication successful!")
        expires_on = result.get('expiresOn')
        print(f"Token expires: {expires_on}")
        # Save cache for next time
        manager.save_cache()
    except adal.AdalError as e:
        print(f"Authentication failed: {e}")

    # Inspect cache contents
    entry_count = len(manager.cache.find({}))
    print(f"Cache contains {entry_count} tokens")
if __name__ == '__main__':
main()

TokenCache is thread-safe and can be used safely across multiple threads. All operations use internal locking to prevent race conditions.
import threading
import adal
# Shared cache across threads
shared_cache = adal.TokenCache()
def worker_thread(thread_id):
    """Acquire a token via the module-level ``shared_cache`` and report it.

    Parameters:
    - thread_id: Identifier included in the completion message.
    """
    authority_url = 'https://login.microsoftonline.com/tenant-id'
    context = adal.AuthenticationContext(authority_url, cache=shared_cache)

    # Safe to call from multiple threads
    request = {
        'resource': 'https://management.azure.com/',
        'client_id': 'your-client-id',
        'client_secret': 'your-client-secret',
    }
    context.acquire_token_with_client_credentials(**request)
    print(f"Thread {thread_id} got token")
# Create multiple threads using same cache
threads = []
for i in range(5):
t = threading.Thread(target=worker_thread, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()

Install with Tessl CLI
npx tessl i tessl/pypi-adal