CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-faust

Python stream processing library that ports Kafka Streams to Python for building distributed systems and real-time data pipelines

Pending
Overview
Eval results
Files

docs/authentication.md

Authentication

Authentication mechanisms for securing connections to Kafka brokers in Faust applications. Provides SSL, SASL, GSSAPI, and OAuth credential management, with support for multiple authentication protocols, certificate handling, and secure communication configuration.

Capabilities

SSL Credentials

SSL/TLS authentication credentials for secure broker connections. Provides certificate-based authentication with support for custom SSL contexts, certificate files, and certificate authority validation.

class SSLCredentials:
    """SSL/TLS credentials for broker connections.

    Every constructor argument is keyword-only and optional.
    """

    def __init__(
        self,
        *,
        context: Optional[ssl.SSLContext] = None,
        purpose: Optional[ssl.Purpose] = None,
        cafile: Optional[str] = None,
        capath: Optional[str] = None,
        cadata: Optional[str] = None,
        certfile: Optional[str] = None,
        keyfile: Optional[str] = None,
        password: Optional[str] = None,
        ciphers: Optional[str] = None,
        **kwargs
    ):
        """
        Create SSL credentials for secure broker connections.

        Args:
            context: Custom SSL context
            purpose: SSL purpose (SERVER_AUTH, CLIENT_AUTH)
            cafile: Certificate authority file path
            capath: Certificate authority directory path
            cadata: Certificate authority data string
            certfile: Client certificate file path
            keyfile: Client private key file path
            password: Private key password
            ciphers: Allowed cipher suites
            **kwargs: Additional keyword arguments; their handling is not
                described in this reference.
        """

    def load_verify_locations(
        self,
        cafile: Optional[str] = None,
        capath: Optional[str] = None,
        cadata: Optional[str] = None
    ) -> None:
        """
        Load certificate authority verification locations.

        All three arguments default to None.

        Args:
            cafile: CA certificate file
            capath: CA certificate directory
            cadata: CA certificate data
        """

    def load_cert_chain(
        self,
        certfile: str,
        keyfile: Optional[str] = None,
        password: Optional[str] = None
    ) -> None:
        """
        Load client certificate chain.

        Args:
            certfile: Certificate file path (required)
            keyfile: Private key file path
            password: Private key password
        """

    def set_ciphers(self, ciphers: str) -> None:
        """
        Set allowed cipher suites.

        Args:
            ciphers: Cipher suite specification
        """

    @property
    def context(self) -> ssl.SSLContext:
        """SSL context object."""

    @property
    def cafile(self) -> Optional[str]:
        """Certificate authority file path."""

    @property
    def certfile(self) -> Optional[str]:
        """Client certificate file path."""

    @property
    def keyfile(self) -> Optional[str]:
        """Client private key file path."""

SASL Credentials

Simple Authentication and Security Layer (SASL) credentials for broker authentication. Supports multiple SASL mechanisms including PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, and OAUTHBEARER.

class SASLCredentials:
    """SASL credentials for broker authentication.

    Every constructor argument is keyword-only and defaults to None.
    """

    def __init__(
        self,
        *,
        mechanism: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        ssl_context: Optional[ssl.SSLContext] = None,
        **kwargs
    ):
        """
        Create SASL credentials for broker authentication.

        Args:
            mechanism: SASL mechanism ('PLAIN', 'SCRAM-SHA-256', 'SCRAM-SHA-512', 'OAUTHBEARER')
            username: Authentication username
            password: Authentication password
            ssl_context: SSL context for secure connections
            **kwargs: Additional keyword arguments; their handling is not
                described in this reference.
        """

    def create_authenticator(self) -> Callable[..., Any]:
        """
        Create authenticator function for SASL mechanism.

        Returns:
            Authenticator function compatible with Kafka client
        """

    @property
    def mechanism(self) -> str:
        """SASL mechanism name."""

    @property
    def username(self) -> str:
        """Authentication username."""

    @property
    def password(self) -> str:
        """Authentication password."""

    @property
    def ssl_context(self) -> Optional[ssl.SSLContext]:
        """SSL context for secure transport."""

class PlainCredentials(SASLCredentials):
    """SASL credentials with the mechanism fixed to 'PLAIN'."""

    def __init__(self, *, username: str, password: str, **kwargs):
        """Forward the login pair to SASLCredentials using mechanism 'PLAIN'.

        Args:
            username: Authentication username (keyword-only, required)
            password: Authentication password (keyword-only, required)
            **kwargs: Passed through unchanged to SASLCredentials
        """
        super().__init__(
            mechanism='PLAIN',
            username=username,
            password=password,
            **kwargs
        )

class ScramCredentials(SASLCredentials):
    """SASL credentials for the SCRAM family of mechanisms."""

    def __init__(
        self,
        *,
        username: str,
        password: str,
        mechanism: str = 'SCRAM-SHA-256',
        **kwargs
    ):
        """Forward the login pair and mechanism to SASLCredentials.

        Args:
            username: Authentication username (keyword-only, required)
            password: Authentication password (keyword-only, required)
            mechanism: Mechanism name; defaults to 'SCRAM-SHA-256'
            **kwargs: Passed through unchanged to SASLCredentials
        """
        super().__init__(
            mechanism=mechanism,
            username=username,
            password=password,
            **kwargs
        )

GSSAPI Credentials

Generic Security Services Application Program Interface (GSSAPI) credentials for Kerberos authentication. Provides integration with existing Kerberos infrastructure and ticket-based authentication.

class GSSAPICredentials:
    """GSSAPI (Kerberos) credentials for broker authentication.

    Every constructor argument is keyword-only and has a default.
    """

    def __init__(
        self,
        *,
        kerberos_service_name: str = 'kafka',
        kerberos_domain_name: Optional[str] = None,
        principal: Optional[str] = None,
        kinit_cmd: Optional[str] = None,
        ticket_renew_window_factor: float = 0.8,
        **kwargs
    ):
        """
        Create GSSAPI credentials for Kerberos authentication.

        Args:
            kerberos_service_name: Kerberos service name (default: 'kafka')
            kerberos_domain_name: Kerberos domain name
            principal: Kerberos principal name
            kinit_cmd: Custom kinit command
            ticket_renew_window_factor: Ticket renewal threshold (0.0-1.0, default: 0.8)
            **kwargs: Additional keyword arguments; their handling is not
                described in this reference.
        """

    def acquire_credentials(self) -> None:
        """
        Acquire Kerberos credentials (TGT).

        Raises:
            AuthenticationError: If credential acquisition fails
        """

    def renew_credentials(self) -> bool:
        """
        Renew Kerberos credentials if needed.

        Returns:
            True if credentials were renewed
        """

    def check_credentials(self) -> bool:
        """
        Check if credentials are valid and not expired.

        Returns:
            True if credentials are valid
        """

    @property
    def service_name(self) -> str:
        """Kerberos service name."""

    @property
    def domain_name(self) -> Optional[str]:
        """Kerberos domain name."""

    @property
    def principal(self) -> Optional[str]:
        """Kerberos principal name."""

OAuth Credentials

OAuth 2.0 credentials for modern authentication workflows with token-based authentication and automatic token refresh capabilities.

class OAuthCredentials:
    """OAuth 2.0 credentials for token-based broker authentication.

    `token_url` and `client_id` are required; every argument is keyword-only.
    """

    def __init__(
        self,
        *,
        token_url: str,
        client_id: str,
        client_secret: Optional[str] = None,
        scope: Optional[str] = None,
        audience: Optional[str] = None,
        grant_type: str = 'client_credentials',
        **kwargs
    ):
        """
        Create OAuth credentials for token-based authentication.

        Args:
            token_url: OAuth token endpoint URL
            client_id: OAuth client identifier
            client_secret: OAuth client secret
            scope: OAuth scope string
            audience: OAuth audience
            grant_type: OAuth grant type (default: 'client_credentials')
            **kwargs: Additional keyword arguments; their handling is not
                described in this reference.
        """

    async def get_token(self) -> str:
        """
        Get valid access token.

        This is a coroutine and must be awaited.

        Returns:
            Access token string

        Raises:
            AuthenticationError: If token acquisition fails
        """

    async def refresh_token(self) -> str:
        """
        Refresh access token.

        This is a coroutine and must be awaited.

        Returns:
            New access token string
        """

    def is_token_expired(self) -> bool:
        """
        Check if current token is expired.

        Returns:
            True if token needs refresh
        """

    @property
    def client_id(self) -> str:
        """OAuth client identifier."""

    @property
    def token_url(self) -> str:
        """OAuth token endpoint URL."""

Authentication Configuration

Utilities for configuring authentication at the application level with support for multiple credential types and broker-specific settings.

def configure_ssl(
    app: App,
    *,
    cafile: Optional[str] = None,
    certfile: Optional[str] = None,
    keyfile: Optional[str] = None,
    password: Optional[str] = None,
    context: Optional[ssl.SSLContext] = None,
    **kwargs
) -> None:
    """
    Configure SSL authentication for application.

    All options are keyword-only and default to None.

    Args:
        app: Faust application
        cafile: Certificate authority file
        certfile: Client certificate file
        keyfile: Client private key file
        password: Private key password
        context: Custom SSL context
        **kwargs: Additional keyword arguments; their handling is not
            described in this reference.
    """

def configure_sasl(
    app: App,
    *,
    mechanism: str,
    username: str,
    password: str,
    **kwargs
) -> None:
    """
    Configure SASL authentication for application.

    `mechanism`, `username` and `password` are keyword-only and required.

    Args:
        app: Faust application
        mechanism: SASL mechanism
        username: Authentication username
        password: Authentication password
        **kwargs: Additional keyword arguments; their handling is not
            described in this reference.
    """

def configure_gssapi(
    app: App,
    *,
    service_name: str = 'kafka',
    domain_name: Optional[str] = None,
    **kwargs
) -> None:
    """
    Configure GSSAPI authentication for application.

    Args:
        app: Faust application
        service_name: Kerberos service name (default: 'kafka')
        domain_name: Kerberos domain name
        **kwargs: Additional keyword arguments; their handling is not
            described in this reference.
    """

class AuthenticationError(Exception):
    """Signals that an authentication attempt failed."""

class CredentialsError(Exception):
    """Signals that credential validation failed."""

Usage Examples

SSL Authentication

import faust
import ssl

# Create SSL credentials from a CA file, a client certificate, the client's
# private key, and the password protecting that key.
# NOTE(review): the key password is hard-coded here for illustration only;
# the environment-based example on this page reads it from KAFKA_KEY_PASSWORD.
ssl_creds = faust.SSLCredentials(
    cafile='/path/to/ca-cert.pem',
    certfile='/path/to/client-cert.pem',
    keyfile='/path/to/client-key.pem',
    password='key-password'
)

# Application with SSL authentication
# NOTE(review): upstream Faust documents `broker_credentials=` as the setting
# for broker authentication — confirm `ssl_credentials=` is accepted.
app = faust.App(
    'secure-app',
    broker='kafka://secure-broker:9093',
    ssl_credentials=ssl_creds
)

# Alternative: Configure SSL context directly.
# create_default_context(Purpose.SERVER_AUTH) enables hostname checking and
# requires a valid broker certificate. Both stay enabled here: turning them
# off (check_hostname = False, verify_mode = ssl.CERT_NONE) would let anyone
# impersonate the broker, so it must not appear in a "secure" example.
ssl_context = ssl.create_default_context(
    ssl.Purpose.SERVER_AUTH,
    cafile='/path/to/ca-cert.pem'
)

app = faust.App(
    'secure-app',
    broker='kafka://secure-broker:9093',
    ssl_context=ssl_context
)

SASL Authentication

# SASL PLAIN authentication
# NOTE(review): PLAIN transmits the password without protecting it unless the
# connection itself is encrypted; no ssl_context is supplied here — see the
# "Combined SSL + SASL" example for the TLS-wrapped variant.
sasl_creds = faust.SASLCredentials(
    mechanism='PLAIN',
    username='kafka-user',
    password='kafka-password'
)

# NOTE(review): upstream Faust documents `broker_credentials=` as the setting
# for broker authentication — confirm `sasl_credentials=` is accepted.
app = faust.App(
    'sasl-app',
    broker='kafka://broker:9092',
    sasl_credentials=sasl_creds
)

# SCRAM-SHA-256 authentication
# ('SCRAM-SHA-256' is also the default mechanism of ScramCredentials, so the
# explicit argument below is for clarity.)
scram_creds = faust.ScramCredentials(
    username='kafka-user',
    password='kafka-password',
    mechanism='SCRAM-SHA-256'
)

app = faust.App(
    'scram-app',
    broker='kafka://broker:9092',
    sasl_credentials=scram_creds
)

Kerberos Authentication

# GSSAPI/Kerberos authentication
# ('kafka' is also the default kerberos_service_name.)
gssapi_creds = faust.GSSAPICredentials(
    kerberos_service_name='kafka',
    kerberos_domain_name='EXAMPLE.COM',
    principal='kafka-client@EXAMPLE.COM'
)

app = faust.App(
    'kerberos-app',
    broker='kafka://broker:9092',
    gssapi_credentials=gssapi_creds
)

# Acquire credentials before starting
# NOTE(review): the handler declares no parameters — confirm the signal calls
# its receivers without a sender argument.
# NOTE(review): acquire_credentials() is a synchronous call inside an async
# handler; confirm it does not block the event loop for long.
@app.on_startup.connect
async def acquire_kerberos_ticket():
    # Raises AuthenticationError if the ticket cannot be obtained.
    gssapi_creds.acquire_credentials()
    print("Kerberos ticket acquired")

# Periodic ticket renewal
@app.timer(interval=3600.0)  # Renew every hour
async def renew_kerberos_ticket():
    # renew_credentials() returns True only when a renewal actually happened.
    if gssapi_creds.renew_credentials():
        print("Kerberos ticket renewed")

Combined SSL + SASL

# SSL transport with SASL authentication
# (requires `import ssl`, as in the SSL example.)
ssl_context = ssl.create_default_context()
# create_default_context() already enables hostname checking; the assignment
# below only makes that explicit.
ssl_context.check_hostname = True

# The same context is handed to the credentials and, below, to the app.
sasl_creds = faust.SASLCredentials(
    mechanism='SCRAM-SHA-512',
    username='secure-user',
    password='secure-password',
    ssl_context=ssl_context
)

app = faust.App(
    'secure-sasl-app',
    broker='kafka://secure-broker:9093',
    ssl_context=ssl_context,
    sasl_credentials=sasl_creds
)

OAuth Authentication

# OAuth 2.0 authentication
oauth_creds = faust.OAuthCredentials(
    token_url='https://auth.example.com/oauth/token',
    client_id='kafka-client',
    client_secret='client-secret',
    scope='kafka:read kafka:write'
)

app = faust.App(
    'oauth-app',
    broker='kafka://broker:9092',
    oauth_credentials=oauth_creds
)

@app.on_startup.connect
async def get_initial_token():
    # Fetch a token up front; get_token() raises AuthenticationError when the
    # token cannot be acquired, so a misconfiguration surfaces at startup.
    await oauth_creds.get_token()
    # Never print any part of an access token: even a prefix ends up in logs
    # and weakens the secret.
    print("Initial token acquired")

# Automatic token refresh
@app.timer(interval=1800.0)  # Refresh every 30 minutes
async def refresh_oauth_token():
    if oauth_creds.is_token_expired():
        # The new token is kept by the credentials object; the return value
        # is not needed here.
        await oauth_creds.refresh_token()
        print("OAuth token refreshed")

Environment-based Configuration

import os

def create_app_with_auth():
    """Build the Faust app using the auth method named by KAFKA_AUTH_METHOD.

    Environment variables read:
        KAFKA_AUTH_METHOD: 'ssl', 'sasl' or 'none' (default 'none'),
            matched case-insensitively.
        KAFKA_BROKER: Broker URL (default 'kafka://localhost:9092'),
            applied to every auth method.
        KAFKA_CA_FILE, KAFKA_CERT_FILE, KAFKA_KEY_FILE, KAFKA_KEY_PASSWORD:
            Used when the method is 'ssl'.
        KAFKA_SASL_MECHANISM (default 'PLAIN'), KAFKA_USERNAME,
        KAFKA_PASSWORD: Used when the method is 'sasl'.

    Returns:
        The configured faust.App.

    Raises:
        ValueError: If KAFKA_AUTH_METHOD names an unsupported method.
    """
    auth_method = os.getenv('KAFKA_AUTH_METHOD', 'none').strip().lower()
    if auth_method not in ('ssl', 'sasl', 'none'):
        # Fail closed: a typo in the auth method must not silently produce
        # an unauthenticated connection.
        raise ValueError(
            f"Unsupported KAFKA_AUTH_METHOD {auth_method!r}; "
            "expected 'ssl', 'sasl' or 'none'"
        )

    # Resolve the broker once so every branch shares the same default.
    broker = os.getenv('KAFKA_BROKER', 'kafka://localhost:9092')

    if auth_method == 'ssl':
        credentials = faust.SSLCredentials(
            cafile=os.getenv('KAFKA_CA_FILE'),
            certfile=os.getenv('KAFKA_CERT_FILE'),
            keyfile=os.getenv('KAFKA_KEY_FILE'),
            password=os.getenv('KAFKA_KEY_PASSWORD')
        )
        return faust.App(
            'env-app',
            broker=broker,
            ssl_credentials=credentials
        )

    if auth_method == 'sasl':
        credentials = faust.SASLCredentials(
            mechanism=os.getenv('KAFKA_SASL_MECHANISM', 'PLAIN'),
            username=os.getenv('KAFKA_USERNAME'),
            password=os.getenv('KAFKA_PASSWORD')
        )
        return faust.App(
            'env-app',
            broker=broker,
            sasl_credentials=credentials
        )

    # auth_method == 'none': plain, unauthenticated connection.
    return faust.App(
        'env-app',
        broker=broker
    )

app = create_app_with_auth()

Error Handling

from faust import AuthenticationError, CredentialsError

@app.on_startup.connect
async def validate_credentials():
    """Validate the app's configured credentials before it starts.

    Credentials that are absent or set to None are skipped.

    Raises:
        CredentialsError: Re-raised after logging when validation fails.
        AuthenticationError: Re-raised after logging when acquiring
            Kerberos credentials fails.
    """
    try:
        # Validate credentials before starting.
        # getattr(..., None) also covers an attribute that exists but is
        # None, which a bare hasattr() check would let through.
        ssl_credentials = getattr(app, 'ssl_credentials', None)
        if ssl_credentials is not None:
            ssl_credentials.load_verify_locations()

        gssapi_credentials = getattr(app, 'gssapi_credentials', None)
        if gssapi_credentials is not None:
            if not gssapi_credentials.check_credentials():
                gssapi_credentials.acquire_credentials()

    except CredentialsError as e:
        print(f"Credential validation failed: {e}")
        raise

    except AuthenticationError as e:
        print(f"Authentication failed: {e}")
        raise

Type Interfaces

import ssl
from typing import Any, Callable, Optional, Protocol

class CredentialsT(Protocol):
    """Base type interface for all credentials."""

class SSLCredentialsT(CredentialsT, Protocol):
    """Type interface for SSL credentials."""

    # SSL context object.
    context: ssl.SSLContext
    # File paths; None when not configured.
    cafile: Optional[str]
    certfile: Optional[str]
    keyfile: Optional[str]

    def load_verify_locations(self, **kwargs) -> None: ...
    def load_cert_chain(self, certfile: str, **kwargs) -> None: ...

class SASLCredentialsT(CredentialsT, Protocol):
    """Type interface for SASL credentials."""

    mechanism: str
    username: str
    password: str
    # None when no SSL context is attached.
    ssl_context: Optional[ssl.SSLContext]

    # `callable` is a builtin function, not a type; Callable[..., Any] is the
    # annotation type checkers understand.
    def create_authenticator(self) -> Callable[..., Any]: ...

class GSSAPICredentialsT(CredentialsT, Protocol):
    """Type interface for GSSAPI credentials."""

    service_name: str
    domain_name: Optional[str]
    principal: Optional[str]

    def acquire_credentials(self) -> None: ...
    def renew_credentials(self) -> bool: ...
    def check_credentials(self) -> bool: ...

Install with Tessl CLI

npx tessl i tessl/pypi-faust

docs

authentication.md

cli-framework.md

core-application.md

data-management.md

index.md

monitoring.md

serialization.md

stream-processing.md

topics-channels.md

windowing.md

worker-management.md

tile.json