Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines
—
Security and authentication mechanisms for secure connections to Kafka brokers in Faust applications. Provides SSL, SASL, and GSSAPI credential management with support for various authentication protocols, certificate handling, and secure communication configuration.
SSL/TLS authentication credentials for secure broker connections. Provides certificate-based authentication with support for custom SSL contexts, certificate files, and certificate authority validation.
class SSLCredentials:
def __init__(
self,
*,
context: ssl.SSLContext = None,
purpose: ssl.Purpose = None,
cafile: str = None,
capath: str = None,
cadata: str = None,
certfile: str = None,
keyfile: str = None,
password: str = None,
ciphers: str = None,
**kwargs
):
"""
Create SSL credentials for secure broker connections.
Args:
context: Custom SSL context
purpose: SSL purpose (SERVER_AUTH, CLIENT_AUTH)
cafile: Certificate authority file path
capath: Certificate authority directory path
cadata: Certificate authority data string
certfile: Client certificate file path
keyfile: Client private key file path
password: Private key password
ciphers: Allowed cipher suites
"""
def load_verify_locations(
self,
cafile: str = None,
capath: str = None,
cadata: str = None
) -> None:
"""
Load certificate authority verification locations.
Args:
cafile: CA certificate file
capath: CA certificate directory
cadata: CA certificate data
"""
def load_cert_chain(
self,
certfile: str,
keyfile: str = None,
password: str = None
) -> None:
"""
Load client certificate chain.
Args:
certfile: Certificate file path
keyfile: Private key file path
password: Private key password
"""
def set_ciphers(self, ciphers: str) -> None:
"""
Set allowed cipher suites.
Args:
ciphers: Cipher suite specification
"""
@property
def context(self) -> ssl.SSLContext:
"""SSL context object."""
@property
def cafile(self) -> str:
"""Certificate authority file path."""
@property
def certfile(self) -> str:
"""Client certificate file path."""
@property
def keyfile(self) -> str:
"""Client private key file path."""

Simple Authentication and Security Layer (SASL) credentials for broker authentication. Supports multiple SASL mechanisms including PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, and OAUTHBEARER.
class SASLCredentials:
    """Interface stub for SASL broker-authentication credentials.

    Only signatures are declared here: each method and property body
    consists solely of its docstring, so calling it returns ``None``.
    """

    def __init__(
        self,
        *,
        mechanism: str = None,
        username: str = None,
        password: str = None,
        ssl_context: ssl.SSLContext = None,
        **kwargs
    ):
        """
        Create SASL credentials for broker authentication.

        All parameters are keyword-only.

        Args:
            mechanism: SASL mechanism ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'OAUTHBEARER')
            username: Authentication username
            password: Authentication password
            ssl_context: SSL context for secure connections
            **kwargs: Extra keyword arguments; accepted by the signature,
                their use is not shown in this stub.
        """

    # NOTE(review): `callable` is the builtin predicate, not a type;
    # `typing.Callable` is presumably what is meant here.
    def create_authenticator(self) -> callable:
        """
        Create authenticator function for SASL mechanism.

        Returns:
            Authenticator function compatible with Kafka client
        """

    @property
    def mechanism(self) -> str:
        """SASL mechanism name."""

    @property
    def username(self) -> str:
        """Authentication username."""

    @property
    def password(self) -> str:
        """Authentication password."""

    @property
    def ssl_context(self) -> ssl.SSLContext:
        """SSL context for secure transport."""
class PlainCredentials(SASLCredentials):
    """SASL PLAIN mechanism credentials."""

    def __init__(self, *, username: str, password: str, **kwargs):
        """Initialise the base class with the mechanism fixed to 'PLAIN'.

        Args:
            username: Authentication username (required, keyword-only)
            password: Authentication password (required, keyword-only)
            **kwargs: Forwarded unchanged to ``SASLCredentials.__init__``.
                Passing ``mechanism`` here collides with the fixed value
                and raises ``TypeError``.
        """
        super().__init__(mechanism='PLAIN', username=username, password=password, **kwargs)
class ScramCredentials(SASLCredentials):
"""SASL SCRAM mechanism credentials."""
def __init__(
self,
*,
username: str,
password: str,
mechanism: str = 'SCRAM-SHA-256',
**kwargs
):
super().__init__(mechanism=mechanism, username=username, password=password, **kwargs)

Generic Security Services Application Program Interface (GSSAPI) credentials for Kerberos authentication. Provides integration with existing Kerberos infrastructure and ticket-based authentication.
class GSSAPICredentials:
def __init__(
self,
*,
kerberos_service_name: str = 'kafka',
kerberos_domain_name: str = None,
principal: str = None,
kinit_cmd: str = None,
ticket_renew_window_factor: float = 0.8,
**kwargs
):
"""
Create GSSAPI credentials for Kerberos authentication.
Args:
kerberos_service_name: Kerberos service name (default: 'kafka')
kerberos_domain_name: Kerberos domain name
principal: Kerberos principal name
kinit_cmd: Custom kinit command
ticket_renew_window_factor: Ticket renewal threshold (0.0-1.0)
"""
def acquire_credentials(self) -> None:
"""
Acquire Kerberos credentials (TGT).
Raises:
AuthenticationError: If credential acquisition fails
"""
def renew_credentials(self) -> bool:
"""
Renew Kerberos credentials if needed.
Returns:
True if credentials were renewed
"""
def check_credentials(self) -> bool:
"""
Check if credentials are valid and not expired.
Returns:
True if credentials are valid
"""
@property
def service_name(self) -> str:
"""Kerberos service name."""
@property
def domain_name(self) -> str:
"""Kerberos domain name."""
@property
def principal(self) -> str:
"""Kerberos principal name."""

OAuth 2.0 credentials for modern authentication workflows with token-based authentication and automatic token refresh capabilities.
class OAuthCredentials:
def __init__(
self,
*,
token_url: str,
client_id: str,
client_secret: str = None,
scope: str = None,
audience: str = None,
grant_type: str = 'client_credentials',
**kwargs
):
"""
Create OAuth credentials for token-based authentication.
Args:
token_url: OAuth token endpoint URL
client_id: OAuth client identifier
client_secret: OAuth client secret
scope: OAuth scope string
audience: OAuth audience
grant_type: OAuth grant type
"""
async def get_token(self) -> str:
"""
Get valid access token.
Returns:
Access token string
Raises:
AuthenticationError: If token acquisition fails
"""
async def refresh_token(self) -> str:
"""
Refresh access token.
Returns:
New access token string
"""
def is_token_expired(self) -> bool:
"""
Check if current token is expired.
Returns:
True if token needs refresh
"""
@property
def client_id(self) -> str:
"""OAuth client identifier."""
@property
def token_url(self) -> str:
"""OAuth token endpoint URL."""

Utilities for configuring authentication at the application level with support for multiple credential types and broker-specific settings.
def configure_ssl(
    app: App,
    *,
    cafile: str = None,
    certfile: str = None,
    keyfile: str = None,
    password: str = None,
    context: ssl.SSLContext = None,
    **kwargs
) -> None:
    """
    Configure SSL authentication for application.

    Signature stub: the body consists only of this docstring. Everything
    after ``app`` is keyword-only.

    Args:
        app: Faust application
        cafile: Certificate authority file
        certfile: Client certificate file
        keyfile: Client private key file
        password: Private key password
        context: Custom SSL context
        **kwargs: Extra keyword arguments; accepted by the signature,
            their use is not shown in this stub.
    """
def configure_sasl(
    app: App,
    *,
    mechanism: str,
    username: str,
    password: str,
    **kwargs
) -> None:
    """
    Configure SASL authentication for application.

    Signature stub: the body consists only of this docstring.
    ``mechanism``, ``username`` and ``password`` are required and
    keyword-only.

    Args:
        app: Faust application
        mechanism: SASL mechanism
        username: Authentication username
        password: Authentication password
        **kwargs: Extra keyword arguments; accepted by the signature,
            their use is not shown in this stub.
    """
def configure_gssapi(
    app: App,
    *,
    service_name: str = 'kafka',
    domain_name: str = None,
    **kwargs
) -> None:
    """
    Configure GSSAPI authentication for application.

    Signature stub: the body consists only of this docstring.

    Args:
        app: Faust application
        service_name: Kerberos service name (defaults to 'kafka')
        domain_name: Kerberos domain name
        **kwargs: Extra keyword arguments; accepted by the signature,
            their use is not shown in this stub.
    """
class AuthenticationError(Exception):
    """Raised when authentication fails."""
class CredentialsError(Exception):
"""Raised when credential validation fails."""
pass

import faust
import ssl
# Build SSL credentials from PEM files on disk.
ssl_creds = faust.SSLCredentials(
    cafile='/path/to/ca-cert.pem',
    certfile='/path/to/client-cert.pem',
    keyfile='/path/to/client-key.pem',
    password='key-password'
)

# Hand the credentials object to the application.
app = faust.App(
    'secure-app',
    broker='kafka://secure-broker:9093',
    ssl_credentials=ssl_creds
)

# Alternative: pass a ready-made ssl.SSLContext instead of credentials.
# WARNING: the two assignments below switch off hostname checking and
# certificate verification, so the broker's identity is not validated.
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
app = faust.App(
    'secure-app',
    broker='kafka://secure-broker:9093',
    ssl_context=ssl_context
)

# SASL PLAIN authentication
# Generic SASL credentials with the mechanism named explicitly.
sasl_creds = faust.SASLCredentials(
    mechanism='PLAIN',
    username='kafka-user',
    password='kafka-password'
)
app = faust.App(
    'sasl-app',
    broker='kafka://broker:9092',
    sasl_credentials=sasl_creds
)

# SCRAM-SHA-256 authentication through the dedicated credentials class.
scram_creds = faust.ScramCredentials(
    username='kafka-user',
    password='kafka-password',
    mechanism='SCRAM-SHA-256'
)
app = faust.App(
    'scram-app',
    broker='kafka://broker:9092',
    sasl_credentials=scram_creds
)

# GSSAPI/Kerberos authentication
# Kerberos credentials: service name, domain and client principal.
gssapi_creds = faust.GSSAPICredentials(
    kerberos_service_name='kafka',
    kerberos_domain_name='EXAMPLE.COM',
    principal='kafka-client@EXAMPLE.COM'
)

# Application wired to the Kerberos credentials above.
app = faust.App(
    'kerberos-app',
    broker='kafka://broker:9092',
    gssapi_credentials=gssapi_creds
)
# Acquire credentials before starting
@app.on_startup.connect
async def acquire_kerberos_ticket():
    """Startup hook: acquire Kerberos credentials, then report success.

    Any exception raised by ``acquire_credentials()`` propagates to the
    caller; nothing is caught here.
    """
    gssapi_creds.acquire_credentials()
    print("Kerberos ticket acquired")
# Periodic ticket renewal
@app.timer(interval=3600.0)  # Renew every hour
async def renew_kerberos_ticket():
    """Timer task: attempt a renewal and report only when one happened."""
    # renew_credentials() returns a truthy value when a renewal took place.
    if gssapi_creds.renew_credentials():
        print("Kerberos ticket renewed")

# SSL transport with SASL authentication
# Transport security: default context with hostname checking set explicitly.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = True

# SCRAM-SHA-512 credentials that carry the same SSL context.
sasl_creds = faust.SASLCredentials(
    mechanism='SCRAM-SHA-512',
    username='secure-user',
    password='secure-password',
    ssl_context=ssl_context
)

# The app receives both the context and the SASL credentials.
app = faust.App(
    'secure-sasl-app',
    broker='kafka://secure-broker:9093',
    ssl_context=ssl_context,
    sasl_credentials=sasl_creds
)

# OAuth 2.0 authentication
# Client-credentials style OAuth configuration.
oauth_creds = faust.OAuthCredentials(
    token_url='https://auth.example.com/oauth/token',
    client_id='kafka-client',
    client_secret='client-secret',
    scope='kafka:read kafka:write'
)

# Application wired to the OAuth credentials above.
app = faust.App(
    'oauth-app',
    broker='kafka://broker:9092',
    oauth_credentials=oauth_creds
)
@app.on_startup.connect
async def get_initial_token():
    """Startup hook: fetch an access token and print a short prefix of it."""
    token = await oauth_creds.get_token()
    # Only the first ten characters are printed, not the whole token.
    print(f"Initial token acquired: {token[:10]}...")
# Automatic token refresh
@app.timer(interval=1800.0)  # Refresh every 30 minutes
async def refresh_oauth_token():
    """Timer task: refresh the OAuth token only when it has expired."""
    if oauth_creds.is_token_expired():
        # The returned token string is not needed here, so it is not bound.
        await oauth_creds.refresh_token()
        print("OAuth token refreshed")


import os
def create_app_with_auth():
    """Build the Faust app for the auth scheme named by ``KAFKA_AUTH_METHOD``.

    Environment variables read:
        KAFKA_AUTH_METHOD: 'ssl', 'sasl', or anything else for no
            authentication (default 'none'). Compared case-insensitively
            with surrounding whitespace ignored.
        KAFKA_BROKER: Broker URL; 'kafka://localhost:9092' when unset,
            for every auth method.
        KAFKA_CA_FILE, KAFKA_CERT_FILE, KAFKA_KEY_FILE, KAFKA_KEY_PASSWORD:
            SSL settings, passed through as-is (``None`` when unset).
        KAFKA_SASL_MECHANISM: SASL mechanism (default 'PLAIN').
        KAFKA_USERNAME, KAFKA_PASSWORD: SASL login, passed through as-is.

    Returns:
        A ``faust.App`` named 'env-app' configured for the chosen method.
    """
    auth_method = os.getenv('KAFKA_AUTH_METHOD', 'none').strip().lower()
    # Resolve the broker once so all branches share the same fallback;
    # previously only the unauthenticated branch had a default.
    broker = os.getenv('KAFKA_BROKER', 'kafka://localhost:9092')

    if auth_method == 'ssl':
        credentials = faust.SSLCredentials(
            cafile=os.getenv('KAFKA_CA_FILE'),
            certfile=os.getenv('KAFKA_CERT_FILE'),
            keyfile=os.getenv('KAFKA_KEY_FILE'),
            password=os.getenv('KAFKA_KEY_PASSWORD')
        )
        return faust.App(
            'env-app',
            broker=broker,
            ssl_credentials=credentials
        )

    if auth_method == 'sasl':
        credentials = faust.SASLCredentials(
            mechanism=os.getenv('KAFKA_SASL_MECHANISM', 'PLAIN'),
            username=os.getenv('KAFKA_USERNAME'),
            password=os.getenv('KAFKA_PASSWORD')
        )
        return faust.App(
            'env-app',
            broker=broker,
            sasl_credentials=credentials
        )

    # Any other value: plain, unauthenticated connection.
    return faust.App(
        'env-app',
        broker=broker
    )
app = create_app_with_auth()

from faust import AuthenticationError, CredentialsError
@app.on_startup.connect
async def validate_credentials():
    """Startup hook: check configured credentials before the app runs.

    SSL credentials, when present, have their CA locations loaded.
    GSSAPI credentials, when present and reported invalid, are re-acquired.

    Raises:
        CredentialsError: Re-raised after printing a message.
        AuthenticationError: Re-raised after printing a message.
    """
    try:
        # Validate credentials before starting.
        # getattr with a None default also covers an attribute that exists
        # but is unset, which a bare hasattr() check would let through.
        ssl_credentials = getattr(app, 'ssl_credentials', None)
        if ssl_credentials is not None:
            ssl_credentials.load_verify_locations()

        gssapi_credentials = getattr(app, 'gssapi_credentials', None)
        if gssapi_credentials is not None:
            if not gssapi_credentials.check_credentials():
                gssapi_credentials.acquire_credentials()
    except CredentialsError as e:
        print(f"Credential validation failed: {e}")
        raise
    except AuthenticationError as e:
        print(f"Authentication failed: {e}")
        raise


from typing import Protocol, Optional
import ssl
from typing import Any, Callable
class CredentialsT(Protocol):
    """Base type interface for all credentials."""
class SSLCredentialsT(CredentialsT, Protocol):
    """Type interface for SSL credentials."""

    # Attributes an SSL credentials object exposes; only ``context`` is
    # declared non-optional.
    context: ssl.SSLContext
    cafile: Optional[str]
    certfile: Optional[str]
    keyfile: Optional[str]

    def load_verify_locations(self, **kwargs) -> None: ...

    def load_cert_chain(self, certfile: str, **kwargs) -> None: ...
class SASLCredentialsT(CredentialsT, Protocol):
    """Type interface for SASL credentials."""

    mechanism: str
    username: str
    password: str
    ssl_context: Optional[ssl.SSLContext]

    # Callable[..., Any] replaces the builtin `callable`, which is a
    # predicate function rather than a type usable in annotations.
    def create_authenticator(self) -> Callable[..., Any]: ...
class GSSAPICredentialsT(CredentialsT, Protocol):
"""Type interface for GSSAPI credentials."""
service_name: str
domain_name: Optional[str]
principal: Optional[str]
def acquire_credentials(self) -> None: ...
def renew_credentials(self) -> bool: ...
def check_credentials(self) -> bool: ...

Install with Tessl CLI
npx tessl i tessl/pypi-faust