CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-home-assistant-chip-clusters

Python APIs and tools for Matter (Project CHIP) protocol implementation, specifically the chip clusters functionality used by Home Assistant for Matter device control and communication

Pending
Overview
Eval results
Files

docs/crypto-credentials.md

Crypto and Credentials

Security and cryptographic functionality including certificate management, fabric credentials, and secure communication setup.

Capabilities

Certificate Authority Management

Manage certificate authorities and certificate generation for Matter fabrics.

from typing import Optional


class CertificateAuthority:
    """Certificate Authority for Matter fabric certificate management."""

    def __init__(
        self,
        ca_cert: Optional[bytes] = None,
        ca_key: Optional[bytes] = None
    ) -> None:
        """
        Initialize Certificate Authority.

        Parameters:
        - ca_cert: CA certificate in DER format (optional, generates new if None)
        - ca_key: CA private key in DER format (optional, generates new if None)

        NOTE(review): the behavior when only one of ca_cert / ca_key is
        supplied is not specified here - confirm before relying on it.
        """
        ...

    def generate_noc_chain(
        self,
        csr: bytes,
        fabric_id: int,
        node_id: int,
        cat_tags: Optional[list] = None
    ) -> dict:
        """
        Generate Node Operational Certificate chain.

        Parameters:
        - csr: Certificate Signing Request in DER format
        - fabric_id: Target fabric ID
        - node_id: Target node ID
        - cat_tags: CASE Authenticated Tags (optional, None means no tags given)

        Returns:
        Dictionary containing 'noc', 'icac', and 'rcac' certificates in DER format
        """
        ...

    def get_root_cert(self) -> bytes:
        """
        Get the root certificate.

        Returns:
        Root certificate in DER format
        """
        ...

    def get_intermediate_cert(self) -> Optional[bytes]:
        """
        Get the intermediate certificate.

        Returns:
        Intermediate certificate in DER format, or None when there is none
        """
        ...

    def verify_certificate(self, cert: bytes) -> bool:
        """
        Verify a certificate against this CA.

        Parameters:
        - cert: Certificate to verify in DER format

        Returns:
        True if certificate is valid
        """
        ...

    def revoke_certificate(self, cert: bytes) -> bool:
        """
        Revoke a certificate.

        Parameters:
        - cert: Certificate to revoke in DER format

        Returns:
        True if revocation successful
        """
        ...

    def get_crl(self) -> bytes:
        """
        Get Certificate Revocation List.

        Returns:
        CRL in DER format
        """
        ...

Fabric Administration

Manage fabric credentials and administrative operations.

from typing import Optional


class FabricAdmin:
    """Fabric administration and credential management."""

    def __init__(
        self,
        fabricId: int,
        vendorId: int = 0xFFF1,
        caStorage: Optional[dict] = None
    ) -> None:
        """
        Initialize Fabric Administrator.

        Parameters:
        - fabricId: Unique fabric identifier
        - vendorId: Vendor ID for this fabric (defaults to 0xFFF1; presumably a
          test vendor ID - confirm before production use)
        - caStorage: Certificate Authority storage configuration (optional)
        """
        ...

    def generate_controller_noc_chain(
        self,
        node_id: int,
        cat_tags: Optional[list] = None
    ) -> dict:
        """
        Generate NOC chain for a controller.

        Parameters:
        - node_id: Controller node ID
        - cat_tags: CASE Authenticated Tags (optional)

        Returns:
        Dictionary with 'noc', 'icac', 'rcac', and 'private_key'
        """
        ...

    def get_fabric_id(self) -> int:
        """
        Get the fabric ID.

        Returns:
        Fabric ID
        """
        ...

    def get_vendor_id(self) -> int:
        """
        Get the vendor ID.

        Returns:
        Vendor ID
        """
        ...

    def get_root_certificate(self) -> bytes:
        """
        Get the fabric root certificate.

        Returns:
        Root certificate in DER format
        """
        ...

    def get_intermediate_certificate(self) -> Optional[bytes]:
        """
        Get the fabric intermediate certificate.

        Returns:
        Intermediate certificate in DER format, or None when there is none
        """
        ...

    def set_root_certificate(self, cert: bytes) -> None:
        """
        Set the fabric root certificate.

        Parameters:
        - cert: Root certificate in DER format
        """
        ...

    def set_intermediate_certificate(self, cert: bytes) -> None:
        """
        Set the fabric intermediate certificate.

        Parameters:
        - cert: Intermediate certificate in DER format
        """
        ...

    def create_controller_credentials(
        self,
        controller_node_id: int,
        cat_tags: Optional[list] = None
    ) -> dict:
        """
        Create complete credentials for a controller.

        Parameters:
        - controller_node_id: Node ID for the controller
        - cat_tags: CASE Authenticated Tags (optional)

        Returns:
        Dictionary with all credential components

        NOTE(review): the exact keys are not listed here; presumably the same
        as generate_controller_noc_chain() - confirm.
        """
        ...

Cryptographic Operations

Low-level cryptographic operations for Matter protocol.

class ChipCrypto:
    """
    Cryptographic operations for CHIP protocol.

    Every operation is a static method, so no instance is required.
    """

    @staticmethod
    def generate_keypair() -> tuple:
        """
        Generate an EC P-256 key pair.

        Returns:
        Tuple of (private_key_bytes, public_key_bytes)
        """
        ...

    @staticmethod
    def sign_message(private_key: bytes, message: bytes) -> bytes:
        """
        Sign a message using ECDSA.

        Parameters:
        - private_key: Private key in DER format
          (presumably the encoding generate_keypair() returns - TODO confirm)
        - message: Message to sign

        Returns:
        Signature bytes
        """
        ...

    @staticmethod
    def verify_signature(
        public_key: bytes,
        message: bytes,
        signature: bytes
    ) -> bool:
        """
        Verify an ECDSA signature.

        Parameters:
        - public_key: Public key in DER format
        - message: Original message
        - signature: Signature to verify

        Returns:
        True if signature is valid
        """
        ...

    @staticmethod
    def encrypt_message(key: bytes, nonce: bytes, plaintext: bytes) -> bytes:
        """
        Encrypt a message using AES-CCM.

        Parameters:
        - key: Encryption key (16 bytes)
        - nonce: Nonce for encryption (13 bytes)
        - plaintext: Data to encrypt

        Returns:
        Encrypted data with authentication tag

        NOTE(review): no exception is documented for wrong key/nonce lengths;
        confirm how invalid sizes are reported.
        """
        ...

    @staticmethod
    def decrypt_message(key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
        """
        Decrypt a message using AES-CCM.

        Parameters:
        - key: Decryption key (16 bytes)
        - nonce: Nonce used for encryption (13 bytes)
        - ciphertext: Encrypted data with authentication tag

        Returns:
        Decrypted plaintext

        Raises:
        ChipStackException: If decryption fails or authentication fails
        """
        ...

    @staticmethod
    def derive_key(shared_secret: bytes, salt: bytes, info: bytes) -> bytes:
        """
        Derive key using HKDF.

        Parameters:
        - shared_secret: Shared secret from key exchange
        - salt: Salt value
        - info: Context information

        Returns:
        Derived key bytes

        NOTE(review): the output length and underlying hash are not stated
        here - confirm before sizing buffers against the result.
        """
        ...

    @staticmethod
    def hash_message(message: bytes) -> bytes:
        """
        Hash a message using SHA-256.

        Parameters:
        - message: Data to hash

        Returns:
        SHA-256 hash bytes (32 bytes)
        """
        ...

    @staticmethod
    def generate_random(length: int) -> bytes:
        """
        Generate cryptographically secure random bytes.

        Parameters:
        - length: Number of random bytes to generate

        Returns:
        Random bytes
        """
        ...

Certificate Validation

Validate and process Matter certificates.

from typing import Optional


class CertificateValidator:
    """Certificate validation for Matter protocol."""

    def __init__(self, paa_trust_store: Optional[list] = None) -> None:
        """
        Initialize certificate validator.

        Parameters:
        - paa_trust_store: List of trusted PAA certificates in DER format
          (optional; None means no trust store was supplied)
        """
        ...

    def validate_device_attestation_certificate(
        self,
        dac: bytes,
        pai: bytes,
        paa: Optional[bytes] = None
    ) -> bool:
        """
        Validate Device Attestation Certificate chain.

        Parameters:
        - dac: Device Attestation Certificate in DER format
        - pai: Product Attestation Intermediate certificate in DER format
        - paa: Product Attestation Authority certificate in DER format (optional;
          presumably the trust store is consulted when omitted - TODO confirm)

        Returns:
        True if certificate chain is valid
        """
        ...

    def validate_operational_certificate(
        self,
        noc: bytes,
        icac: Optional[bytes] = None,
        rcac: Optional[bytes] = None
    ) -> bool:
        """
        Validate Node Operational Certificate chain.

        Parameters:
        - noc: Node Operational Certificate in DER format
        - icac: Intermediate CA certificate in DER format (optional)
        - rcac: Root CA certificate in DER format (optional)

        Returns:
        True if certificate chain is valid
        """
        ...

    def extract_fabric_id(self, noc: bytes) -> Optional[int]:
        """
        Extract fabric ID from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
        Fabric ID or None if not found
        """
        ...

    def extract_node_id(self, noc: bytes) -> Optional[int]:
        """
        Extract node ID from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
        Node ID or None if not found
        """
        ...

    def extract_cat_tags(self, noc: bytes) -> list:
        """
        Extract CASE Authenticated Tags from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
        List of CAT tag values
        """
        ...

    def is_certificate_expired(self, cert: bytes) -> bool:
        """
        Check if a certificate is expired.

        Parameters:
        - cert: Certificate in DER format

        Returns:
        True if certificate is expired
        """
        ...

    def get_certificate_expiry(self, cert: bytes) -> int:
        """
        Get certificate expiration timestamp.

        Parameters:
        - cert: Certificate in DER format

        Returns:
        Unix timestamp of expiration
        """
        ...

Secure Session Management

Manage secure sessions and encryption keys.

class SecureSessionManager:
    """Secure session management for Matter communications."""

    def __init__(self) -> None:
        """Initialize secure session manager."""
        ...

    def establish_case_session(
        self,
        peer_node_id: int,
        fabric_id: int,
        credentials: dict
    ) -> dict:
        """
        Establish CASE (Certificate Authenticated Session Establishment) session.

        Parameters:
        - peer_node_id: Peer device node ID
        - fabric_id: Fabric ID
        - credentials: Dictionary with 'noc', 'icac', 'rcac', 'private_key'

        Returns:
        Session information dictionary
        (the usage example reads a 'session_id' entry from it)
        """
        ...

    def establish_pase_session(
        self,
        setup_pin_code: int,
        discriminator: int
    ) -> dict:
        """
        Establish PASE (Password Authenticated Session Establishment) session.

        Parameters:
        - setup_pin_code: Device setup PIN code
        - discriminator: Device discriminator

        Returns:
        Session information dictionary
        (the usage example reads a 'session_id' entry from it)
        """
        ...

    def get_session_keys(self, session_id: int) -> dict:
        """
        Get encryption keys for a session.

        Parameters:
        - session_id: Session identifier
          (presumably the 'session_id' of an establish_*_session() result)

        Returns:
        Dictionary with encryption and decryption keys

        NOTE(review): the key names in the returned dictionary are not listed
        here - confirm before indexing into it.
        """
        ...

    def close_session(self, session_id: int) -> bool:
        """
        Close a secure session.

        Parameters:
        - session_id: Session identifier

        Returns:
        True if session closed successfully
        """
        ...

    def is_session_active(self, session_id: int) -> bool:
        """
        Check if a session is active.

        Parameters:
        - session_id: Session identifier

        Returns:
        True if session is active
        """
        ...

    def refresh_session(self, session_id: int) -> bool:
        """
        Refresh an existing session.

        Parameters:
        - session_id: Session identifier

        Returns:
        True if refresh successful
        """
        ...

Usage Examples

Certificate Authority Setup

from chip.CertificateAuthority import CertificateAuthority
from chip.FabricAdmin import FabricAdmin

# Node ID shared by the credential request and the controller below.
CONTROLLER_NODE_ID = 12345

# Stand up a fresh Certificate Authority (no existing cert/key supplied).
ca = CertificateAuthority()

# The root certificate is what gets distributed to peers.
root_cert = ca.get_root_cert()
print(f"Root certificate length: {len(root_cert)} bytes")

# Fabric administrator for fabric 1.
fabric_admin = FabricAdmin(fabricId=1, vendorId=0x1234)

# Ask the fabric administrator for the controller's NOC chain.
example_cat_tags = [0x00010001, 0x00020002]  # Example CAT tags
controller_creds = fabric_admin.generate_controller_noc_chain(
    node_id=CONTROLLER_NODE_ID,
    cat_tags=example_cat_tags,
)

print("Controller credentials generated:")
print(f"  NOC length: {len(controller_creds['noc'])} bytes")
print(f"  Private key length: {len(controller_creds['private_key'])} bytes")
print(f"  Root cert length: {len(controller_creds['rcac'])} bytes")

# Hand the fabric administrator to a device controller with the same node ID.
from chip.ChipDeviceCtrl import ChipDeviceController

controller = ChipDeviceController(
    controllerNodeId=CONTROLLER_NODE_ID,
    fabricAdmin=fabric_admin,
)

Device Attestation Validation

from chip.crypto import CertificateValidator

# Initialize validator with trusted PAA certificates.
# Each file is read inside a `with` block so its handle is closed right away
# instead of being left open until garbage collection.
paa_trust_store = []
for paa_path in ("/path/to/paa1.der", "/path/to/paa2.der"):
    with open(paa_path, "rb") as paa_file:
        paa_trust_store.append(paa_file.read())

validator = CertificateValidator(paa_trust_store=paa_trust_store)

# Validate device attestation during commissioning
# (These would typically come from the device during commissioning)
device_dac = b"..."  # Device Attestation Certificate
product_pai = b"..."  # Product Attestation Intermediate
product_paa = b"..."  # Product Attestation Authority

is_valid = validator.validate_device_attestation_certificate(
    dac=device_dac,
    pai=product_pai,
    paa=product_paa
)

if is_valid:
    print("Device attestation certificate is valid")

    # Extract information from certificates.
    # NOTE(review): the extract_* helpers are documented as taking a Node
    # Operational Certificate (NOC), but a DAC is passed here, so the fabric
    # and node IDs may come back as None - confirm the intended input.
    fabric_id = validator.extract_fabric_id(device_dac)
    node_id = validator.extract_node_id(device_dac)
    cat_tags = validator.extract_cat_tags(device_dac)

    print(f"Fabric ID: {fabric_id}")
    print(f"Node ID: {node_id}")
    print(f"CAT Tags: {cat_tags}")

    # Check expiration; the expiry value is a Unix timestamp.
    if validator.is_certificate_expired(device_dac):
        print("WARNING: Device certificate is expired")
    else:
        expiry = validator.get_certificate_expiry(device_dac)
        print(f"Certificate expires: {expiry}")
else:
    print("Device attestation certificate validation failed")

Cryptographic Operations

from chip.crypto import ChipCrypto

# --- Key pair generation ---
keypair = ChipCrypto.generate_keypair()
private_key, public_key = keypair
print(f"Generated key pair - Private: {len(private_key)} bytes, Public: {len(public_key)} bytes")

# --- Sign a message, then verify the signature over the same bytes ---
message = b"Hello, Matter!"
signature = ChipCrypto.sign_message(private_key, message)
print(f"Signature length: {len(signature)} bytes")

is_valid = ChipCrypto.verify_signature(public_key, message, signature)
print(f"Signature valid: {is_valid}")

# --- Encrypt / decrypt round trip ---
plaintext = b"Secret device configuration data"
encryption_key = ChipCrypto.generate_random(16)  # 128-bit key
nonce = ChipCrypto.generate_random(13)  # 104-bit nonce

encrypted = ChipCrypto.encrypt_message(encryption_key, nonce, plaintext)
print(f"Encrypted data length: {len(encrypted)} bytes")

# The same key and nonce are required to recover the plaintext.
decrypted = ChipCrypto.decrypt_message(encryption_key, nonce, encrypted)
print(f"Decrypted: {decrypted}")
print(f"Decryption successful: {decrypted == plaintext}")

# --- Key derivation from a (here: random) shared secret ---
info = b"Matter Key Derivation"
shared_secret = ChipCrypto.generate_random(32)
salt = ChipCrypto.generate_random(16)

derived_key = ChipCrypto.derive_key(shared_secret, salt, info)
print(f"Derived key length: {len(derived_key)} bytes")

# --- Hash the message that was signed earlier ---
message_hash = ChipCrypto.hash_message(message)
print(f"SHA-256 hash: {message_hash.hex()}")

Secure Session Establishment

from chip.crypto import SecureSessionManager

# Initialize session manager
session_mgr = SecureSessionManager()

# CASE session (for operational devices).
# `controller_creds` is not defined in this snippet; it is expected to be the
# dictionary returned by FabricAdmin.generate_controller_noc_chain().
# 'icac' is read with .get() because an intermediate certificate is optional.
credentials = dict(
    noc=controller_creds['noc'],
    icac=controller_creds.get('icac'),
    rcac=controller_creds['rcac'],
    private_key=controller_creds['private_key'],
)

case_session = session_mgr.establish_case_session(
    peer_node_id=1,
    fabric_id=1,
    credentials=credentials,
)

if case_session:
    print(f"CASE session established: {case_session['session_id']}")
    case_session_id = case_session['session_id']

    # Keys negotiated for this session
    session_keys = session_mgr.get_session_keys(case_session_id)
    print(f"Session keys available: {list(session_keys.keys())}")

    # Liveness check
    is_active = session_mgr.is_session_active(case_session_id)
    print(f"Session active: {is_active}")

# PASE session (for commissioning)
pase_session = session_mgr.establish_pase_session(
    setup_pin_code=20202021,
    discriminator=3840,
)

if pase_session:
    print(f"PASE session established: {pase_session['session_id']}")
    pase_session_id = pase_session['session_id']

    # Commissioning operations would run over the PASE session here;
    # once they are done the session is closed.
    session_mgr.close_session(pase_session_id)
    print("PASE session closed")

Complete Fabric and Device Setup

# End-to-end walkthrough: create a CA, build a fabric, create a controller,
# commission one device, then validate the device's operational certificate.
from chip.CertificateAuthority import CertificateAuthority
from chip.FabricAdmin import FabricAdmin
from chip.ChipDeviceCtrl import ChipDeviceController
from chip.crypto import CertificateValidator

# 1. Set up Certificate Authority
ca = CertificateAuthority()
root_cert = ca.get_root_cert()

# 2. Create Fabric Administrator
fabric_admin = FabricAdmin(fabricId=1, vendorId=0x1234)
# Hand the CA's root certificate to the fabric administrator.
fabric_admin.set_root_certificate(root_cert)

# 3. Generate controller credentials
# (the returned dictionary is not used again in this snippet)
controller_creds = fabric_admin.generate_controller_noc_chain(
    node_id=12345,
    cat_tags=[0x00010001]  # Admin access tag
)

# 4. Initialize Device Controller
# The controller node ID matches the node_id used for the credentials above.
controller = ChipDeviceController(
    controllerNodeId=12345,
    fabricAdmin=fabric_admin
)

# 5. Set up certificate validator
# No PAA trust store is passed; only operational certificates are checked below.
validator = CertificateValidator()

try:
    # 6. Commission a device
    success = controller.CommissionOnNetwork(
        nodeId=1,
        setupPinCode=20202021
    )
    
    if success:
        print("Device commissioned successfully")
        
        # 7. Validate the operational certificate created during commissioning
        # (This would be available from the device controller after commissioning)
        device_noc = controller.GetDeviceCredentials(nodeId=1)['noc']
        
        # Only the NOC and root certificate are supplied; icac is left at its default.
        is_valid = validator.validate_operational_certificate(
            noc=device_noc,
            rcac=root_cert
        )
        
        if is_valid:
            print("Device operational certificate is valid")
            
            # Extract device information
            device_fabric_id = validator.extract_fabric_id(device_noc)
            device_node_id = validator.extract_node_id(device_noc)
            
            print(f"Device is in fabric {device_fabric_id} with node ID {device_node_id}")
        else:
            print("WARNING: Device operational certificate validation failed")

finally:
    # Runs even if commissioning or validation raises.
    controller.Shutdown()

Install with Tessl CLI

npx tessl i tessl/pypi-home-assistant-chip-clusters

docs

ble-discovery.md

clusters.md

crypto-credentials.md

device-controller.md

index.md

stack-management.md

storage.md

tlv-data.md

tile.json