Python APIs and tools for Matter (Project CHIP) protocol implementation, specifically the chip clusters functionality used by Home Assistant for Matter device control and communication
—
Security and cryptographic functionality including certificate management, fabric credentials, and secure communication setup.
Manage certificate authorities and certificate generation for Matter fabrics.
from typing import Optional


class CertificateAuthority:
    """Certificate Authority for Matter fabric certificate management."""

    def __init__(
        self,
        ca_cert: Optional[bytes] = None,
        ca_key: Optional[bytes] = None,
    ):
        """
        Initialize Certificate Authority.

        Parameters:
        - ca_cert: CA certificate in DER format (optional, generates new if None)
        - ca_key: CA private key in DER format (optional, generates new if None)
        """
        ...

    def generate_noc_chain(
        self,
        csr: bytes,
        fabric_id: int,
        node_id: int,
        cat_tags: Optional[list] = None,
    ) -> dict:
        """
        Generate Node Operational Certificate chain.

        Parameters:
        - csr: Certificate Signing Request in DER format
        - fabric_id: Target fabric ID
        - node_id: Target node ID
        - cat_tags: CASE Authenticated Tags (optional)

        Returns:
        Dictionary containing 'noc', 'icac', and 'rcac' certificates in DER format
        """
        ...

    def get_root_cert(self) -> bytes:
        """
        Get the root certificate.

        Returns:
        Root certificate in DER format
        """
        ...

    def get_intermediate_cert(self) -> Optional[bytes]:
        """
        Get the intermediate certificate.

        Returns:
        Intermediate certificate in DER format (may be None)
        """
        ...

    def verify_certificate(self, cert: bytes) -> bool:
        """
        Verify a certificate against this CA.

        Parameters:
        - cert: Certificate to verify in DER format

        Returns:
        True if certificate is valid
        """
        ...

    def revoke_certificate(self, cert: bytes) -> bool:
        """
        Revoke a certificate.

        Parameters:
        - cert: Certificate to revoke in DER format

        Returns:
        True if revocation successful
        """
        ...

    def get_crl(self) -> bytes:
        """
        Get Certificate Revocation List.

        Returns:
        CRL in DER format
        """
        ...

Manage fabric credentials and administrative operations.
from typing import Optional


class FabricAdmin:
    """Fabric administration and credential management."""

    def __init__(
        self,
        fabricId: int,
        vendorId: int = 0xFFF1,
        caStorage: Optional[dict] = None,
    ):
        """
        Initialize Fabric Administrator.

        Parameters:
        - fabricId: Unique fabric identifier
        - vendorId: Vendor ID for this fabric
        - caStorage: Certificate Authority storage configuration
        """
        ...

    def generate_controller_noc_chain(
        self,
        node_id: int,
        cat_tags: Optional[list] = None,
    ) -> dict:
        """
        Generate NOC chain for a controller.

        Parameters:
        - node_id: Controller node ID
        - cat_tags: CASE Authenticated Tags (optional)

        Returns:
        Dictionary with 'noc', 'icac', 'rcac', and 'private_key'
        """
        ...

    def get_fabric_id(self) -> int:
        """
        Get the fabric ID.

        Returns:
        Fabric ID
        """
        ...

    def get_vendor_id(self) -> int:
        """
        Get the vendor ID.

        Returns:
        Vendor ID
        """
        ...

    def get_root_certificate(self) -> bytes:
        """
        Get the fabric root certificate.

        Returns:
        Root certificate in DER format
        """
        ...

    def get_intermediate_certificate(self) -> Optional[bytes]:
        """
        Get the fabric intermediate certificate.

        Returns:
        Intermediate certificate in DER format (may be None)
        """
        ...

    def set_root_certificate(self, cert: bytes) -> None:
        """
        Set the fabric root certificate.

        Parameters:
        - cert: Root certificate in DER format
        """
        ...

    def set_intermediate_certificate(self, cert: bytes) -> None:
        """
        Set the fabric intermediate certificate.

        Parameters:
        - cert: Intermediate certificate in DER format
        """
        ...

    def create_controller_credentials(
        self,
        controller_node_id: int,
        cat_tags: Optional[list] = None,
    ) -> dict:
        """
        Create complete credentials for a controller.

        Parameters:
        - controller_node_id: Node ID for the controller
        - cat_tags: CASE Authenticated Tags (optional)

        Returns:
        Dictionary with all credential components
        """
        ...

Low-level cryptographic operations for Matter protocol.
class ChipCrypto:
    """Cryptographic operations for CHIP protocol."""

    @staticmethod
    def generate_keypair() -> tuple:
        """
        Generate an EC P-256 key pair.

        Returns:
        Tuple of (private_key_bytes, public_key_bytes)
        """
        ...

    @staticmethod
    def sign_message(private_key: bytes, message: bytes) -> bytes:
        """
        Sign a message using ECDSA.

        Parameters:
        - private_key: Private key in DER format
        - message: Message to sign

        Returns:
        Signature bytes
        """
        ...

    @staticmethod
    def verify_signature(
        public_key: bytes,
        message: bytes,
        signature: bytes,
    ) -> bool:
        """
        Verify an ECDSA signature.

        Parameters:
        - public_key: Public key in DER format
        - message: Original message
        - signature: Signature to verify

        Returns:
        True if signature is valid
        """
        ...

    @staticmethod
    def encrypt_message(key: bytes, nonce: bytes, plaintext: bytes) -> bytes:
        """
        Encrypt a message using AES-CCM.

        Parameters:
        - key: Encryption key (16 bytes)
        - nonce: Nonce for encryption (13 bytes)
        - plaintext: Data to encrypt

        Returns:
        Encrypted data with authentication tag
        """
        ...

    @staticmethod
    def decrypt_message(key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
        """
        Decrypt a message using AES-CCM.

        Parameters:
        - key: Decryption key (16 bytes)
        - nonce: Nonce used for encryption (13 bytes)
        - ciphertext: Encrypted data with authentication tag

        Returns:
        Decrypted plaintext

        Raises:
        ChipStackException: If decryption fails or authentication fails
        """
        ...

    @staticmethod
    def derive_key(shared_secret: bytes, salt: bytes, info: bytes) -> bytes:
        """
        Derive key using HKDF.

        Parameters:
        - shared_secret: Shared secret from key exchange
        - salt: Salt value
        - info: Context information

        Returns:
        Derived key bytes
        """
        ...

    @staticmethod
    def hash_message(message: bytes) -> bytes:
        """
        Hash a message using SHA-256.

        Parameters:
        - message: Data to hash

        Returns:
        SHA-256 hash bytes (32 bytes)
        """
        ...

    @staticmethod
    def generate_random(length: int) -> bytes:
        """
        Generate cryptographically secure random bytes.

        Parameters:
        - length: Number of random bytes to generate

        Returns:
        Random bytes
        """
        ...

Validate and process Matter certificates.
from typing import Optional


class CertificateValidator:
    """Certificate validation for Matter protocol."""

    def __init__(self, paa_trust_store: Optional[list] = None):
        """
        Initialize certificate validator.

        Parameters:
        - paa_trust_store: List of trusted PAA certificates in DER format
        """
        ...

    def validate_device_attestation_certificate(
        self,
        dac: bytes,
        pai: bytes,
        paa: Optional[bytes] = None,
    ) -> bool:
        """
        Validate Device Attestation Certificate chain.

        Parameters:
        - dac: Device Attestation Certificate in DER format
        - pai: Product Attestation Intermediate certificate in DER format
        - paa: Product Attestation Authority certificate in DER format (optional)

        Returns:
        True if certificate chain is valid
        """
        ...

    def validate_operational_certificate(
        self,
        noc: bytes,
        icac: Optional[bytes] = None,
        rcac: Optional[bytes] = None,
    ) -> bool:
        """
        Validate Node Operational Certificate chain.

        Parameters:
        - noc: Node Operational Certificate in DER format
        - icac: Intermediate CA certificate in DER format (optional)
        - rcac: Root CA certificate in DER format (optional)

        Returns:
        True if certificate chain is valid
        """
        ...

    def extract_fabric_id(self, noc: bytes) -> Optional[int]:
        """
        Extract fabric ID from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
        Fabric ID or None if not found
        """
        ...

    def extract_node_id(self, noc: bytes) -> Optional[int]:
        """
        Extract node ID from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
        Node ID or None if not found
        """
        ...

    def extract_cat_tags(self, noc: bytes) -> list:
        """
        Extract CASE Authenticated Tags from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
        List of CAT tag values
        """
        ...

    def is_certificate_expired(self, cert: bytes) -> bool:
        """
        Check if a certificate is expired.

        Parameters:
        - cert: Certificate in DER format

        Returns:
        True if certificate is expired
        """
        ...

    def get_certificate_expiry(self, cert: bytes) -> int:
        """
        Get certificate expiration timestamp.

        Parameters:
        - cert: Certificate in DER format

        Returns:
        Unix timestamp of expiration
        """
        ...

Manage secure sessions and encryption keys.
class SecureSessionManager:
    """Secure session management for Matter communications."""

    def __init__(self):
        """Initialize secure session manager."""
        ...

    def establish_case_session(
        self,
        peer_node_id: int,
        fabric_id: int,
        credentials: dict,
    ) -> dict:
        """
        Establish CASE (Certificate Authenticated Session Establishment) session.

        Parameters:
        - peer_node_id: Peer device node ID
        - fabric_id: Fabric ID
        - credentials: Dictionary with 'noc', 'icac', 'rcac', 'private_key'

        Returns:
        Session information dictionary
        """
        ...

    def establish_pase_session(
        self,
        setup_pin_code: int,
        discriminator: int,
    ) -> dict:
        """
        Establish PASE (Password Authenticated Session Establishment) session.

        Parameters:
        - setup_pin_code: Device setup PIN code
        - discriminator: Device discriminator

        Returns:
        Session information dictionary
        """
        ...

    def get_session_keys(self, session_id: int) -> dict:
        """
        Get encryption keys for a session.

        Parameters:
        - session_id: Session identifier

        Returns:
        Dictionary with encryption and decryption keys
        """
        ...

    def close_session(self, session_id: int) -> bool:
        """
        Close a secure session.

        Parameters:
        - session_id: Session identifier

        Returns:
        True if session closed successfully
        """
        ...

    def is_session_active(self, session_id: int) -> bool:
        """
        Check if a session is active.

        Parameters:
        - session_id: Session identifier

        Returns:
        True if session is active
        """
        ...

    def refresh_session(self, session_id: int) -> bool:
        """
        Refresh an existing session.

        Parameters:
        - session_id: Session identifier

        Returns:
        True if refresh successful
        """
        ...

from chip.CertificateAuthority import CertificateAuthority
from chip.FabricAdmin import FabricAdmin

# Create a new Certificate Authority
ca = CertificateAuthority()

# Get the root certificate for distribution
root_cert = ca.get_root_cert()
print(f"Root certificate length: {len(root_cert)} bytes")

# Create a fabric administrator
fabric_admin = FabricAdmin(fabricId=1, vendorId=0x1234)

# Generate controller credentials
controller_creds = fabric_admin.generate_controller_noc_chain(
    node_id=12345,
    cat_tags=[0x00010001, 0x00020002],  # Example CAT tags
)
print("Controller credentials generated:")
print(f" NOC length: {len(controller_creds['noc'])} bytes")
print(f" Private key length: {len(controller_creds['private_key'])} bytes")
print(f" Root cert length: {len(controller_creds['rcac'])} bytes")

# Use credentials with a device controller
from chip.ChipDeviceCtrl import ChipDeviceController

controller = ChipDeviceController(
    controllerNodeId=12345,
    fabricAdmin=fabric_admin,
)


from chip.crypto import CertificateValidator
# Initialize validator with trusted PAA certificates
# Load PAA certificates from trust store; each file is opened in a
# ``with`` block so the handle is closed as soon as it has been read.
paa_trust_store = []
for paa_path in ("/path/to/paa1.der", "/path/to/paa2.der"):
    with open(paa_path, "rb") as paa_file:
        paa_trust_store.append(paa_file.read())

validator = CertificateValidator(paa_trust_store=paa_trust_store)

# Validate device attestation during commissioning
# (These would typically come from the device during commissioning)
device_dac = b"..."  # Device Attestation Certificate
product_pai = b"..."  # Product Attestation Intermediate
product_paa = b"..."  # Product Attestation Authority

is_valid = validator.validate_device_attestation_certificate(
    dac=device_dac,
    pai=product_pai,
    paa=product_paa,
)

if is_valid:
    print("Device attestation certificate is valid")

    # Extract information from certificates
    # NOTE(review): the extract_* helpers are documented as taking a NOC;
    # passing the DAC here looks like a placeholder -- confirm.
    fabric_id = validator.extract_fabric_id(device_dac)
    node_id = validator.extract_node_id(device_dac)
    cat_tags = validator.extract_cat_tags(device_dac)
    print(f"Fabric ID: {fabric_id}")
    print(f"Node ID: {node_id}")
    print(f"CAT Tags: {cat_tags}")

    # Check expiration
    if validator.is_certificate_expired(device_dac):
        print("WARNING: Device certificate is expired")
    else:
        expiry = validator.get_certificate_expiry(device_dac)
        print(f"Certificate expires: {expiry}")
else:
    print("Device attestation certificate validation failed")


from chip.crypto import ChipCrypto
# Generate a key pair
private_key, public_key = ChipCrypto.generate_keypair()
print(f"Generated key pair - Private: {len(private_key)} bytes, Public: {len(public_key)} bytes")

# Sign and verify a message
message = b"Hello, Matter!"
signature = ChipCrypto.sign_message(private_key, message)
print(f"Signature length: {len(signature)} bytes")
is_valid = ChipCrypto.verify_signature(public_key, message, signature)
print(f"Signature valid: {is_valid}")

# Encrypt and decrypt data
encryption_key = ChipCrypto.generate_random(16)  # 128-bit key
nonce = ChipCrypto.generate_random(13)  # 104-bit nonce
plaintext = b"Secret device configuration data"
encrypted = ChipCrypto.encrypt_message(encryption_key, nonce, plaintext)
print(f"Encrypted data length: {len(encrypted)} bytes")
decrypted = ChipCrypto.decrypt_message(encryption_key, nonce, encrypted)
print(f"Decrypted: {decrypted}")
print(f"Decryption successful: {decrypted == plaintext}")

# Key derivation
shared_secret = ChipCrypto.generate_random(32)
salt = ChipCrypto.generate_random(16)
info = b"Matter Key Derivation"
derived_key = ChipCrypto.derive_key(shared_secret, salt, info)
print(f"Derived key length: {len(derived_key)} bytes")

# Hash a message
message_hash = ChipCrypto.hash_message(message)
print(f"SHA-256 hash: {message_hash.hex()}")


from chip.crypto import SecureSessionManager
# Initialize session manager
session_mgr = SecureSessionManager()

# Establish CASE session (for operational devices)
# `controller_creds` is the dictionary produced by
# `fabric_admin.generate_controller_noc_chain(...)` in the earlier example.
credentials = {
    'noc': controller_creds['noc'],
    'icac': controller_creds.get('icac'),
    'rcac': controller_creds['rcac'],
    'private_key': controller_creds['private_key'],
}
case_session = session_mgr.establish_case_session(
    peer_node_id=1,
    fabric_id=1,
    credentials=credentials,
)
if case_session:
    print(f"CASE session established: {case_session['session_id']}")

    # Get session keys
    session_keys = session_mgr.get_session_keys(case_session['session_id'])
    print(f"Session keys available: {list(session_keys.keys())}")

    # Check if session is active
    is_active = session_mgr.is_session_active(case_session['session_id'])
    print(f"Session active: {is_active}")

# Establish PASE session (for commissioning)
pase_session = session_mgr.establish_pase_session(
    setup_pin_code=20202021,
    discriminator=3840,
)
if pase_session:
    print(f"PASE session established: {pase_session['session_id']}")

    # Use PASE session for commissioning operations
    # After commissioning is complete, close the session
    session_mgr.close_session(pase_session['session_id'])
    print("PASE session closed")


from chip.CertificateAuthority import CertificateAuthority
from chip.FabricAdmin import FabricAdmin
from chip.ChipDeviceCtrl import ChipDeviceController
from chip.crypto import CertificateValidator
# 1. Set up Certificate Authority
ca = CertificateAuthority()
root_cert = ca.get_root_cert()
# 2. Create Fabric Administrator
fabric_admin = FabricAdmin(fabricId=1, vendorId=0x1234)
fabric_admin.set_root_certificate(root_cert)
# 3. Generate controller credentials
controller_creds = fabric_admin.generate_controller_noc_chain(
node_id=12345,
cat_tags=[0x00010001] # Admin access tag
)
# 4. Initialize Device Controller
controller = ChipDeviceController(
controllerNodeId=12345,
fabricAdmin=fabric_admin
)
# 5. Set up certificate validator
validator = CertificateValidator()
try:
# 6. Commission a device
success = controller.CommissionOnNetwork(
nodeId=1,
setupPinCode=20202021
)
if success:
print("Device commissioned successfully")
# 7. Validate the operational certificate created during commissioning
# (This would be available from the device controller after commissioning)
device_noc = controller.GetDeviceCredentials(nodeId=1)['noc']
is_valid = validator.validate_operational_certificate(
noc=device_noc,
rcac=root_cert
)
if is_valid:
print("Device operational certificate is valid")
# Extract device information
device_fabric_id = validator.extract_fabric_id(device_noc)
device_node_id = validator.extract_node_id(device_noc)
print(f"Device is in fabric {device_fabric_id} with node ID {device_node_id}")
else:
print("WARNING: Device operational certificate validation failed")
finally:
controller.Shutdown()Install with Tessl CLI
npx tessl i tessl/pypi-home-assistant-chip-clusters