Python APIs and tools for Matter (Project CHIP) protocol implementation, specifically the chip clusters functionality used by Home Assistant for Matter device control and communication
—
Core Matter stack initialization, configuration, and lifecycle management providing the foundation for all Matter protocol operations.
Initialize and manage the core Matter protocol stack.
class ChipStack:
"""Core CHIP stack for Matter protocol operations."""
def __init__(
self,
persistentStoragePath: str = None,
enableServerInteractions: bool = True,
bluetoothAdapter: int = None,
**kwargs
):
"""
Initialize the CHIP stack.
Parameters:
- persistentStoragePath: Path for persistent storage (None for in-memory)
- enableServerInteractions: Enable server-side interaction handling
- bluetoothAdapter: Bluetooth adapter ID (platform-specific)
- kwargs: Additional stack configuration options
"""
...
def Call(self, f):
"""
Execute a function on the CHIP stack thread.
Parameters:
- f: Function to execute
Returns:
Function return value
"""
...
def CallAsync(self, f):
"""
Execute an async function on the CHIP stack thread.
Parameters:
- f: Async function to execute
Returns:
Awaitable result
"""
...
def Shutdown(self):
"""
Clean shutdown of the CHIP stack.
Stops all operations and releases resources.
"""
...
def GetStorageManager(self):
"""
Get the storage manager instance.
Returns:
Storage manager object
"""
...
def GetFabricTable(self):
"""
Get the fabric table instance.
Returns:
Fabric table object
"""
...
def GetCASESessionManager(self):
"""
Get the CASE session manager.
Returns:
CASE session manager object
"""
...
def GetPASESessionManager(self):
"""
Get the PASE session manager.
Returns:
PASE session manager object
"""
...Manage device proxy objects for communicating with Matter devices.
class DeviceProxyWrapper:
"""Wrapper for device proxy objects."""
def __init__(self, device):
"""
Initialize device proxy wrapper.
Parameters:
- device: Native device proxy object
"""
...
def GetDevice(self):
"""
Get the underlying device proxy.
Returns:
Native device proxy object
"""
...
def GetDeviceId(self) -> int:
"""
Get the device node ID.
Returns:
Device node ID
"""
...
def IsConnected(self) -> bool:
"""
Check if device is currently connected.
Returns:
True if device is connected
"""
...
def IsSecurelyConnected(self) -> bool:
"""
Check if device has a secure connection.
Returns:
True if securely connected
"""
...
def GetLastError(self) -> int:
"""
Get the last error code for this device.
Returns:
Error code or 0 if no error
"""
...
def ResetErrorCode(self):
"""Reset the device error code to 0."""
...Configure various aspects of the Matter stack behavior.
class ChipStackConfiguration:
"""Configuration settings for the CHIP stack."""
def __init__(self):
"""Initialize with default configuration."""
...
def set_ble_platform_config(self, config: dict):
"""
Set BLE platform-specific configuration.
Parameters:
- config: BLE configuration dictionary
"""
...
def set_wifi_platform_config(self, config: dict):
"""
Set WiFi platform-specific configuration.
Parameters:
- config: WiFi configuration dictionary
"""
...
def set_thread_platform_config(self, config: dict):
"""
Set Thread platform-specific configuration.
Parameters:
- config: Thread configuration dictionary
"""
...
def set_commissioning_config(self, config: dict):
"""
Set commissioning-related configuration.
Parameters:
- config: Commissioning configuration dictionary
"""
...
def set_security_config(self, config: dict):
"""
Set security and cryptographic configuration.
Parameters:
- config: Security configuration dictionary
"""
...
def get_config(self) -> dict:
"""
Get current stack configuration.
Returns:
Complete configuration dictionary
"""
...Integrate the Matter stack with application event loops.
class ChipEventLoop:
"""Event loop integration for CHIP stack."""
def __init__(self, stack: ChipStack):
"""
Initialize event loop integration.
Parameters:
- stack: CHIP stack instance
"""
...
def start(self):
"""Start the event loop processing."""
...
def stop(self):
"""Stop the event loop processing."""
...
def run_until_complete(self, coro):
"""
Run until the given coroutine completes.
Parameters:
- coro: Coroutine to run
Returns:
Coroutine result
"""
...
def schedule_callback(self, callback, delay: float = 0):
"""
Schedule a callback to run after a delay.
Parameters:
- callback: Function to call
- delay: Delay in seconds
"""
...
def is_running(self) -> bool:
"""
Check if event loop is running.
Returns:
True if event loop is running
"""
...Manage threading for Matter stack operations.
class ChipThreadManager:
"""Thread management for CHIP operations."""
def __init__(self, stack: ChipStack):
"""
Initialize thread manager.
Parameters:
- stack: CHIP stack instance
"""
...
def start_thread(self, target, args: tuple = (), name: str = None):
"""
Start a new thread for CHIP operations.
Parameters:
- target: Function to run in thread
- args: Arguments for target function
- name: Thread name (optional)
Returns:
Thread object
"""
...
def run_on_chip_thread(self, func, *args, **kwargs):
"""
Run function on the main CHIP thread.
Parameters:
- func: Function to execute
- args: Positional arguments
- kwargs: Keyword arguments
Returns:
Function result
"""
...
def run_on_chip_thread_async(self, func, *args, **kwargs):
"""
Run async function on the main CHIP thread.
Parameters:
- func: Async function to execute
- args: Positional arguments
- kwargs: Keyword arguments
Returns:
Awaitable result
"""
...
def is_chip_thread(self) -> bool:
"""
Check if current thread is the CHIP thread.
Returns:
True if running on CHIP thread
"""
...
def join_all_threads(self, timeout: float = None):
"""
Wait for all CHIP threads to complete.
Parameters:
- timeout: Maximum wait time in seconds
"""
...Monitor and manage the overall state of the Matter stack.
class ChipStackState:
"""State monitoring for CHIP stack."""
def __init__(self, stack: ChipStack):
"""
Initialize stack state monitor.
Parameters:
- stack: CHIP stack instance
"""
...
def is_initialized(self) -> bool:
"""
Check if stack is fully initialized.
Returns:
True if stack is initialized
"""
...
def is_running(self) -> bool:
"""
Check if stack is currently running.
Returns:
True if stack is running
"""
...
def is_shutting_down(self) -> bool:
"""
Check if stack is in shutdown process.
Returns:
True if stack is shutting down
"""
...
def get_active_connections(self) -> int:
"""
Get number of active device connections.
Returns:
Number of active connections
"""
...
def get_memory_usage(self) -> dict:
"""
Get memory usage statistics.
Returns:
Dictionary with memory usage information
"""
...
def get_performance_metrics(self) -> dict:
"""
Get performance metrics.
Returns:
Dictionary with performance statistics
"""
...
def reset_metrics(self):
"""Reset performance metrics counters."""
...from chip.ChipStack import ChipStack
# Initialize stack with persistent storage
stack = ChipStack(
persistentStoragePath="/tmp/chip_storage",
enableServerInteractions=True
)
try:
# Perform stack operations
print("Stack initialized successfully")
# Execute functions on stack thread
def get_fabric_count():
fabric_table = stack.GetFabricTable()
return fabric_table.GetFabricCount()
fabric_count = stack.Call(get_fabric_count)
print(f"Number of fabrics: {fabric_count}")
finally:
# Always shutdown cleanly
stack.Shutdown()from chip.ChipStack import ChipStack, ChipStackConfiguration
# Create custom configuration
config = ChipStackConfiguration()
# Configure BLE settings
config.set_ble_platform_config({
'adapter_id': 0,
'scan_timeout': 30,
'connection_timeout': 15
})
# Configure WiFi settings
config.set_wifi_platform_config({
'max_networks': 10,
'scan_interval': 60
})
# Configure commissioning
config.set_commissioning_config({
'enable_enhanced_commissioning': True,
'commissioning_timeout': 300,
'max_concurrent_commissions': 5
})
# Initialize stack with custom config
stack = ChipStack(
persistentStoragePath="/var/lib/chip",
enableServerInteractions=True,
configuration=config
)
# Monitor stack state
from chip.ChipStack import ChipStackState
state_monitor = ChipStackState(stack)
if state_monitor.is_initialized():
print("Stack is ready")
print(f"Active connections: {state_monitor.get_active_connections()}")
# Get performance metrics
metrics = state_monitor.get_performance_metrics()
print(f"Commands processed: {metrics.get('commands_processed', 0)}")
print(f"Attributes read: {metrics.get('attributes_read', 0)}")
stack.Shutdown()import asyncio
from chip.ChipStack import ChipStack, ChipEventLoop
async def main():
# Initialize stack
stack = ChipStack(persistentStoragePath="/tmp/chip_storage")
# Create event loop integration
chip_loop = ChipEventLoop(stack)
chip_loop.start()
try:
# Run async operations on CHIP stack
async def async_operation():
# Simulate some async work
await asyncio.sleep(1)
return "Operation completed"
result = await chip_loop.run_until_complete(async_operation())
print(result)
# Schedule periodic callback
def periodic_task():
print(f"Periodic task - Active connections: {stack.GetActiveConnections()}")
chip_loop.schedule_callback(periodic_task, delay=5.0)
# Wait a bit for the callback
await asyncio.sleep(6)
finally:
chip_loop.stop()
stack.Shutdown()
# Run the async main function
asyncio.run(main())import threading
import time
from chip.ChipStack import ChipStack, ChipThreadManager
stack = ChipStack(persistentStoragePath="/tmp/chip_storage")
thread_mgr = ChipThreadManager(stack)
try:
# Function to run on CHIP thread
def chip_work():
print(f"Running on CHIP thread: {thread_mgr.is_chip_thread()}")
return "CHIP work completed"
# Execute on CHIP thread
result = thread_mgr.run_on_chip_thread(chip_work)
print(result)
# Start background thread for monitoring
def monitor_thread():
while not stop_monitoring:
if thread_mgr.is_chip_thread():
print("Monitor running on CHIP thread")
else:
print("Monitor running on separate thread")
time.sleep(2)
stop_monitoring = False
monitor = thread_mgr.start_thread(
target=monitor_thread,
name="StackMonitor"
)
# Let it run for a while
time.sleep(10)
# Stop monitoring
stop_monitoring = True
# Wait for all threads to complete
thread_mgr.join_all_threads(timeout=5.0)
finally:
stack.Shutdown()from chip.ChipStack import ChipStack, DeviceProxyWrapper
from chip.ChipDeviceCtrl import ChipDeviceController
# Initialize stack and controller
stack = ChipStack(persistentStoragePath="/tmp/chip_storage")
controller = ChipDeviceController(controllerNodeId=12345)
try:
# Commission a device first...
# (commissioning code omitted for brevity)
# Get device proxy
device_proxy = controller.GetConnectedDevice(nodeId=1)
if device_proxy:
# Wrap the device proxy
wrapped_device = DeviceProxyWrapper(device_proxy)
print(f"Device ID: {wrapped_device.GetDeviceId()}")
print(f"Connected: {wrapped_device.IsConnected()}")
print(f"Secure: {wrapped_device.IsSecurelyConnected()}")
# Check for errors
error_code = wrapped_device.GetLastError()
if error_code != 0:
print(f"Device error: {error_code}")
wrapped_device.ResetErrorCode()
finally:
controller.Shutdown()
stack.Shutdown()Install with Tessl CLI
npx tessl i tessl/pypi-home-assistant-chip-clusters