A Python class that integrates Boto3's Cognito client so that it is easy to log in users with SRP support.
—
Low-level Secure Remote Password (SRP) protocol implementation for AWS Cognito User Pools. SRP provides cryptographically secure authentication without transmitting passwords over the network, forming the foundation for secure user authentication in pycognito.
The `AWSSRP` class is the core SRP implementation; it handles the complete SRP authentication flow with AWS Cognito.
class AWSSRP:
"""AWS SRP authentication protocol implementation."""
# Challenge type constants
SMS_MFA_CHALLENGE = "SMS_MFA"
SOFTWARE_TOKEN_MFA_CHALLENGE = "SOFTWARE_TOKEN_MFA"
NEW_PASSWORD_REQUIRED_CHALLENGE = "NEW_PASSWORD_REQUIRED"
PASSWORD_VERIFIER_CHALLENGE = "PASSWORD_VERIFIER"
DEVICE_SRP_CHALLENGE = "DEVICE_SRP_AUTH"
DEVICE_PASSWORD_VERIFIER_CHALLENGE = "DEVICE_PASSWORD_VERIFIER"
def __init__(self, username: str, password: str, pool_id: str, client_id: str,
pool_region: str = None, client=None, client_secret: str = None,
device_key: str = None, device_group_key: str = None,
device_password: str = None):
"""
Initialize AWSSRP authentication client.
Args:
username (str): User's username
password (str): User's password
pool_id (str): Cognito User Pool ID
client_id (str): Cognito Client ID
pool_region (str, optional): AWS region (extracted from pool_id if not provided)
client (boto3.client, optional): Pre-configured boto3 cognito-idp client
client_secret (str, optional): Client secret if app client requires it
device_key (str, optional): Device key for device authentication
device_group_key (str, optional): Device group key for device authentication
device_password (str, optional): Device password for device authentication
Raises:
ValueError: If pool_region and client are both specified, or if device parameters are partially provided
Note:
Device authentication requires all three device parameters or none at all.
"""Usage Example:
from pycognito.aws_srp import AWSSRP

# Basic SRP authentication
aws_srp = AWSSRP(
    username='user@example.com',
    password='user-password',
    pool_id='us-east-1_example123',
    client_id='your-client-id',
)

# With client secret
aws_srp = AWSSRP(
    username='user@example.com',
    password='user-password',
    pool_id='us-east-1_example123',
    client_id='your-client-id',
    client_secret='your-client-secret',
)

# With device authentication (all three device parameters are supplied together)
aws_srp = AWSSRP(
    username='user@example.com',
    password='user-password',
    pool_id='us-east-1_example123',
    client_id='your-client-id',
    device_key='device-key-123',
    device_group_key='device-group-456',
    device_password='device-password-789',
)

# Execute the complete SRP authentication flow with challenge handling.
def authenticate_user(self, client=None, client_metadata: dict = None) -> dict:
"""
Authenticate user using SRP protocol.
Args:
client (boto3.client, optional): Override the default boto3 client
client_metadata (dict, optional): Custom workflow metadata
Returns:
dict: Authentication result with tokens
{
'AuthenticationResult': {
'AccessToken': str,
'IdToken': str,
'RefreshToken': str,
'TokenType': str,
'ExpiresIn': int
},
'ChallengeName': str, # If additional challenges required
'Session': str # Challenge session token
}
Raises:
ForceChangePasswordException: When password change is required
SoftwareTokenMFAChallengeException: When TOTP MFA is required
SMSMFAChallengeException: When SMS MFA is required
NotImplementedError: For unsupported challenge types
Flow:
1. Initiate SRP authentication with public key
2. Process password verifier challenge
3. Handle device challenges if device keys provided
4. Return tokens or raise MFA/password challenges
"""Usage Example:
from pycognito.exceptions import (
    ForceChangePasswordException,
    SoftwareTokenMFAChallengeException,
    SMSMFAChallengeException,
)

try:
    # Perform SRP authentication
    tokens = aws_srp.authenticate_user()
except ForceChangePasswordException:
    print("Password change required")
    # Handle new password challenge
except SoftwareTokenMFAChallengeException as e:
    print("TOTP MFA required")
    # Handle software token MFA
    mfa_tokens = e.get_tokens()
except SMSMFAChallengeException as e:
    print("SMS MFA required")
    # Handle SMS MFA
    mfa_tokens = e.get_tokens()
else:
    # Success - tokens contain access, ID, and refresh tokens
    print("Authentication successful!")
    print(f"Access Token: {tokens['AuthenticationResult']['AccessToken']}")

# Handle the new password required challenge when users must set a new password.
def set_new_password_challenge(self, new_password: str, client=None) -> dict:
    """
    Handle new password required challenge.

    Args:
        new_password (str): New password to set
        client (boto3.client, optional): Override the default boto3 client

    Returns:
        dict: Authentication result after password is set

    Use Cases:
        - User logging in with temporary password (admin created users)
        - Password expiration policies requiring new password
        - Security policies forcing password updates

    Flow:
        1. Initiate SRP authentication
        2. Process password verifier challenge
        3. Handle NEW_PASSWORD_REQUIRED challenge with new password
        4. Return final authentication tokens
    """


# Usage Example:
from getpass import getpass

# Handle new password requirement
try:
    tokens = aws_srp.authenticate_user()
except ForceChangePasswordException:
    print("Setting new password...")
    # getpass reads the password without echoing it to the terminal.
    new_password = getpass("Enter new password: ")
    tokens = aws_srp.set_new_password_challenge(new_password)
    print("New password set successfully!")
    print(f"Access Token: {tokens['AuthenticationResult']['AccessToken']}")

# Generate authentication parameters for SRP initiation.
def get_auth_params(self) -> dict:
    """
    Get authentication parameters for SRP initiation.

    Returns:
        dict: Parameters for initiate_auth call
            {
                'USERNAME': str,
                'SRP_A': str,        # Public key for SRP
                'SECRET_HASH': str,  # If client secret is configured
                'DEVICE_KEY': str    # If device authentication enabled
            }

    Note:
        These parameters are used internally but can be useful for
        custom authentication flows or debugging.
    """


# Process SRP authentication challenges with password verification.
def process_challenge(self, challenge_parameters: dict, request_parameters: dict) -> dict:
    """
    Process password verifier challenge.

    Args:
        challenge_parameters (dict): Challenge data from AWS Cognito
        request_parameters (dict): Original request parameters

    Returns:
        dict: Challenge response parameters
            {
                'TIMESTAMP': str,
                'USERNAME': str,
                'PASSWORD_CLAIM_SECRET_BLOCK': str,
                'PASSWORD_CLAIM_SIGNATURE': str,
                'SECRET_HASH': str,  # If client secret configured
                'DEVICE_KEY': str    # If device authentication enabled
            }

    Internal Flow:
        1. Extract challenge parameters (USER_ID_FOR_SRP, SALT, SRP_B, SECRET_BLOCK)
        2. Compute password authentication key using SRP protocol
        3. Generate HMAC signature with secret block and timestamp
        4. Return challenge response for AWS verification
    """
def process_device_challenge(self, challenge_parameters: dict) -> dict:
    """
    Process device password verifier challenge.

    Args:
        challenge_parameters (dict): Device challenge data from AWS Cognito

    Returns:
        dict: Device challenge response parameters

    Note:
        Similar to process_challenge but uses device-specific credentials
        and authentication keys for device verification.
    """


# Static methods for SRP cryptographic operations.
@staticmethod
def get_secret_hash(username: str, client_id: str, client_secret: str) -> str:
    """
    Calculate secret hash for client secret authentication.

    Args:
        username (str): Username
        client_id (str): Cognito client ID
        client_secret (str): Cognito client secret

    Returns:
        str: Base64-encoded HMAC-SHA256 hash

    Formula:
        HMAC-SHA256(client_secret, username + client_id)
    """
@staticmethod
def get_cognito_formatted_timestamp(input_datetime: datetime) -> str:
    """
    Format datetime for Cognito authentication.

    Args:
        input_datetime (datetime): Datetime to format

    Returns:
        str: Formatted timestamp string (e.g., "Mon Jan 2 15:04:05 UTC 2006")

    Note:
        Uses specific format required by AWS Cognito SRP protocol.
    """


# Usage Example:
from datetime import datetime, timezone

# Calculate secret hash
secret_hash = AWSSRP.get_secret_hash(
    username='user@example.com',
    client_id='your-client-id',
    client_secret='your-client-secret',
)

# Format timestamp for Cognito.
# datetime.utcnow() is deprecated since Python 3.12; use an aware UTC datetime.
timestamp = AWSSRP.get_cognito_formatted_timestamp(datetime.now(timezone.utc))
print(f"Cognito timestamp: {timestamp}")

# Low-level cryptographic functions that implement the SRP protocol mathematics.
def hash_sha256(buf: bytes) -> str:
"""Hash buffer using SHA256 and return hex digest."""
def hex_hash(hex_string: str) -> str:
"""Hash hex string using SHA256."""
def hex_to_long(hex_string: str) -> int:
"""Convert hex string to long integer."""
def long_to_hex(long_num: int) -> str:
"""Convert long integer to hex string."""
def get_random(nbytes: int) -> int:
"""Generate random long integer from n bytes."""
def pad_hex(long_int: int | str) -> str:
"""Pad hex string for consistent hashing."""
def compute_hkdf(ikm: bytes, salt: bytes) -> bytes:
"""Compute HKDF (HMAC-based Key Derivation Function)."""
def calculate_u(big_a: int, big_b: int) -> int:
"""Calculate SRP U value from client and server public keys."""
def generate_hash_device(device_group_key: str, device_key: str) -> tuple:
    """Generate device hash and verifier for device authentication."""


def direct_srp_authentication(username, password, pool_id, client_id):
    """Direct SRP authentication without Cognito wrapper.

    Returns a dict with 'access_token', 'id_token' and 'refresh_token' on
    success, or None when authentication raises.
    """
    # Initialize SRP client
    aws_srp = AWSSRP(
        username=username,
        password=password,
        pool_id=pool_id,
        client_id=client_id
    )

    try:
        # Perform authentication
        tokens = aws_srp.authenticate_user()

        # Extract tokens
        auth_result = tokens['AuthenticationResult']
        access_token = auth_result['AccessToken']
        id_token = auth_result['IdToken']
        # RefreshToken is read with .get(), so it may be None in the result.
        refresh_token = auth_result.get('RefreshToken')

        print("SRP Authentication successful!")
        return {
            'access_token': access_token,
            'id_token': id_token,
            'refresh_token': refresh_token
        }
    except Exception as e:
        # Example-level catch-all: report the failure and signal it with None.
        print(f"SRP Authentication failed: {e}")
        return None


# Usage
tokens = direct_srp_authentication(
    'user@example.com',
    'password',
    'us-east-1_example123',
    'your-client-id'
)


def custom_srp_flow_with_challenges(username, password, pool_id, client_id):
    """Custom SRP flow with manual challenge handling.

    Returns the raw challenge response when a further challenge is required,
    the 'AuthenticationResult' dict when authentication completes, or None
    when the first challenge is not PASSWORD_VERIFIER.
    """
    aws_srp = AWSSRP(username, password, pool_id, client_id)

    # Step 1: Get authentication parameters
    auth_params = aws_srp.get_auth_params()
    print(f"SRP_A (public key): {auth_params['SRP_A']}")

    # Step 2: Initiate auth with AWS
    response = aws_srp.client.initiate_auth(
        AuthFlow='USER_SRP_AUTH',
        AuthParameters=auth_params,
        ClientId=client_id
    )

    # Step 3: Handle password verifier challenge.
    # .get() avoids a KeyError when the response carries no challenge at all.
    challenge_name = response.get('ChallengeName')
    if challenge_name != 'PASSWORD_VERIFIER':
        print(f"Unexpected challenge: {challenge_name}")
        return None

    print("Processing password verifier challenge...")

    # Process challenge
    challenge_response = aws_srp.process_challenge(
        response['ChallengeParameters'],
        auth_params
    )

    # Respond to challenge
    tokens = aws_srp.client.respond_to_auth_challenge(
        ClientId=client_id,
        ChallengeName='PASSWORD_VERIFIER',
        ChallengeResponses=challenge_response
    )

    # Check for additional challenges
    if 'ChallengeName' in tokens:
        print(f"Additional challenge required: {tokens['ChallengeName']}")
        return tokens  # Return for further processing

    print("Authentication completed!")
    return tokens['AuthenticationResult']


# Usage
result = custom_srp_flow_with_challenges(
    'user@example.com',
    'password',
    'us-east-1_example123',
    'your-client-id'
)


def srp_with_custom_boto3():
    """SRP authentication with custom boto3 configuration."""
    import boto3
    from botocore.config import Config

    # Custom boto3 configuration
    config = Config(
        region_name='us-east-1',
        retries={'max_attempts': 3},
        read_timeout=30,
        connect_timeout=10
    )

    # Create custom cognito client
    cognito_client = boto3.client('cognito-idp', config=config)

    # Initialize SRP with custom client
    aws_srp = AWSSRP(
        username='user@example.com',
        password='password',
        pool_id='us-east-1_example123',
        client_id='your-client-id',
        client=cognito_client  # Use custom client
    )

    # Authenticate
    tokens = aws_srp.authenticate_user()
    return tokens


# Usage
tokens = srp_with_custom_boto3()


def demonstrate_srp_mathematics():
    """Demonstrate the mathematical operations in SRP protocol."""
    from pycognito.aws_srp import (
        hex_to_long, long_to_hex, pad_hex,
        calculate_u, compute_hkdf, N_HEX, G_HEX
    )

    # SRP protocol constants
    N = hex_to_long(N_HEX)  # Large prime number
    g = hex_to_long(G_HEX)  # Generator (2)

    print(f"SRP Prime N: {N}")
    print(f"SRP Generator g: {g}")

    # Simulate SRP key generation
    a = 12345  # Client private key (normally random)
    A = pow(g, a, N)  # Client public key: g^a mod N

    print(f"Client private key a: {a}")
    print(f"Client public key A: {A}")
    print(f"A (hex): {long_to_hex(A)}")
    print(f"A (padded): {pad_hex(A)}")

    # Simulate server public key
    B = 67890  # Server public key (normally calculated)

    # Calculate U value (hash of A and B)
    u = calculate_u(A, B)
    print(f"U value: {u}")

    # Demonstrate HKDF
    ikm = b"input key material"
    salt = b"salt value"
    derived_key = compute_hkdf(ikm, salt)
    print(f"HKDF result: {derived_key.hex()}")


# Usage
demonstrate_srp_mathematics()


class ProductionSRPClient:
    """Production-ready SRP client with error handling and logging."""

    def __init__(self, pool_id, client_id, client_secret=None):
        self.pool_id = pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def authenticate(self, username, password, retry_count=3):
        """Authenticate with retry logic and comprehensive error handling.

        Returns a dict whose 'success' key is True (with 'tokens' and
        'attempt') or False (with an 'error' code and, depending on the
        failure, 'message' or 'mfa_tokens').
        """
        for attempt in range(retry_count):
            try:
                print(f"Authentication attempt {attempt + 1}")

                aws_srp = AWSSRP(
                    username=username,
                    password=password,
                    pool_id=self.pool_id,
                    client_id=self.client_id,
                    client_secret=self.client_secret
                )

                tokens = aws_srp.authenticate_user()

                print("Authentication successful!")
                return {
                    'success': True,
                    'tokens': tokens,
                    'attempt': attempt + 1
                }

            # Password and MFA challenges return immediately; they are not retried.
            except ForceChangePasswordException:
                return {
                    'success': False,
                    'error': 'PASSWORD_CHANGE_REQUIRED',
                    'message': 'User must change password'
                }

            except SoftwareTokenMFAChallengeException as e:
                return {
                    'success': False,
                    'error': 'MFA_REQUIRED_TOTP',
                    'mfa_tokens': e.get_tokens()
                }

            except SMSMFAChallengeException as e:
                return {
                    'success': False,
                    'error': 'MFA_REQUIRED_SMS',
                    'mfa_tokens': e.get_tokens()
                }

            except Exception as e:
                # Any other failure is retried until the last attempt.
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt == retry_count - 1:
                    return {
                        'success': False,
                        'error': 'AUTHENTICATION_FAILED',
                        'message': str(e)
                    }

        # Only reached when the loop body never runs (retry_count <= 0).
        return {
            'success': False,
            'error': 'MAX_RETRIES_EXCEEDED'
        }


# Usage
srp_client = ProductionSRPClient(
    pool_id='us-east-1_example123',
    client_id='your-client-id',
    client_secret='your-client-secret'
)

result = srp_client.authenticate('user@example.com', 'password')

if result['success']:
    print("Authentication successful!")
    tokens = result['tokens']
else:
    print(f"Authentication failed: {result['error']}")
    if 'mfa_tokens' in result:
        print("MFA required - handle accordingly")

# Install with Tessl CLI
npx tessl i tessl/pypi-pycognito