# Crypto and Credentials

Security and cryptographic functionality, including certificate management, fabric credentials, and secure communication setup.

## Capabilities

### Certificate Authority Management

Manage certificate authorities and certificate generation for Matter fabrics.

```python { .api }
from typing import Optional


class CertificateAuthority:
    """Certificate Authority for Matter fabric certificate management."""

    def __init__(
        self,
        ca_cert: Optional[bytes] = None,
        ca_key: Optional[bytes] = None,
    ):
        """
        Initialize Certificate Authority.

        Parameters:
        - ca_cert: CA certificate in DER format (optional, generates new if None)
        - ca_key: CA private key in DER format (optional, generates new if None)
        """
        ...

    def generate_noc_chain(
        self,
        csr: bytes,
        fabric_id: int,
        node_id: int,
        cat_tags: Optional[list] = None,
    ) -> dict:
        """
        Generate Node Operational Certificate chain.

        Parameters:
        - csr: Certificate Signing Request in DER format
        - fabric_id: Target fabric ID
        - node_id: Target node ID
        - cat_tags: CASE Authenticated Tags (optional)

        Returns:
            Dictionary containing 'noc', 'icac', and 'rcac' certificates in DER format
        """
        ...

    def get_root_cert(self) -> bytes:
        """
        Get the root certificate.

        Returns:
            Root certificate in DER format
        """
        ...

    def get_intermediate_cert(self) -> Optional[bytes]:
        """
        Get the intermediate certificate.

        Returns:
            Intermediate certificate in DER format (may be None)
        """
        ...

    def verify_certificate(self, cert: bytes) -> bool:
        """
        Verify a certificate against this CA.

        Parameters:
        - cert: Certificate to verify in DER format

        Returns:
            True if certificate is valid
        """
        ...

    def revoke_certificate(self, cert: bytes) -> bool:
        """
        Revoke a certificate.

        Parameters:
        - cert: Certificate to revoke in DER format

        Returns:
            True if revocation successful
        """
        ...

    def get_crl(self) -> bytes:
        """
        Get Certificate Revocation List.

        Returns:
            CRL in DER format
        """
        ...
```

### Fabric Administration

Manage fabric credentials and administrative operations.

```python { .api }
from typing import Optional


class FabricAdmin:
    """Fabric administration and credential management."""

    def __init__(
        self,
        fabricId: int,
        vendorId: int = 0xFFF1,
        caStorage: Optional[dict] = None,
    ):
        """
        Initialize Fabric Administrator.

        Parameters:
        - fabricId: Unique fabric identifier
        - vendorId: Vendor ID for this fabric
        - caStorage: Certificate Authority storage configuration
        """
        ...

    def generate_controller_noc_chain(
        self,
        node_id: int,
        cat_tags: Optional[list] = None,
    ) -> dict:
        """
        Generate NOC chain for a controller.

        Parameters:
        - node_id: Controller node ID
        - cat_tags: CASE Authenticated Tags (optional)

        Returns:
            Dictionary with 'noc', 'icac', 'rcac', and 'private_key'
        """
        ...

    def get_fabric_id(self) -> int:
        """
        Get the fabric ID.

        Returns:
            Fabric ID
        """
        ...

    def get_vendor_id(self) -> int:
        """
        Get the vendor ID.

        Returns:
            Vendor ID
        """
        ...

    def get_root_certificate(self) -> bytes:
        """
        Get the fabric root certificate.

        Returns:
            Root certificate in DER format
        """
        ...

    def get_intermediate_certificate(self) -> Optional[bytes]:
        """
        Get the fabric intermediate certificate.

        Returns:
            Intermediate certificate in DER format (may be None)
        """
        ...

    def set_root_certificate(self, cert: bytes) -> None:
        """
        Set the fabric root certificate.

        Parameters:
        - cert: Root certificate in DER format
        """
        ...

    def set_intermediate_certificate(self, cert: bytes) -> None:
        """
        Set the fabric intermediate certificate.

        Parameters:
        - cert: Intermediate certificate in DER format
        """
        ...

    def create_controller_credentials(
        self,
        controller_node_id: int,
        cat_tags: Optional[list] = None,
    ) -> dict:
        """
        Create complete credentials for a controller.

        Parameters:
        - controller_node_id: Node ID for the controller
        - cat_tags: CASE Authenticated Tags (optional)

        Returns:
            Dictionary with all credential components
        """
        ...
```

### Cryptographic Operations

Low-level cryptographic operations for the Matter protocol.

```python { .api }
from typing import Tuple


class ChipCrypto:
    """Cryptographic operations for CHIP protocol."""

    @staticmethod
    def generate_keypair() -> Tuple[bytes, bytes]:
        """
        Generate an EC P-256 key pair.

        Returns:
            Tuple of (private_key_bytes, public_key_bytes)
        """
        ...

    @staticmethod
    def sign_message(private_key: bytes, message: bytes) -> bytes:
        """
        Sign a message using ECDSA.

        Parameters:
        - private_key: Private key in DER format
        - message: Message to sign

        Returns:
            Signature bytes
        """
        ...

    @staticmethod
    def verify_signature(
        public_key: bytes,
        message: bytes,
        signature: bytes,
    ) -> bool:
        """
        Verify an ECDSA signature.

        Parameters:
        - public_key: Public key in DER format
        - message: Original message
        - signature: Signature to verify

        Returns:
            True if signature is valid
        """
        ...

    @staticmethod
    def encrypt_message(key: bytes, nonce: bytes, plaintext: bytes) -> bytes:
        """
        Encrypt a message using AES-CCM.

        Parameters:
        - key: Encryption key (16 bytes)
        - nonce: Nonce for encryption (13 bytes)
        - plaintext: Data to encrypt

        Returns:
            Encrypted data with authentication tag
        """
        ...

    @staticmethod
    def decrypt_message(key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
        """
        Decrypt a message using AES-CCM.

        Parameters:
        - key: Decryption key (16 bytes)
        - nonce: Nonce used for encryption (13 bytes)
        - ciphertext: Encrypted data with authentication tag

        Returns:
            Decrypted plaintext

        Raises:
            ChipStackException: If decryption fails or authentication fails
        """
        ...

    @staticmethod
    def derive_key(shared_secret: bytes, salt: bytes, info: bytes) -> bytes:
        """
        Derive key using HKDF.

        Parameters:
        - shared_secret: Shared secret from key exchange
        - salt: Salt value
        - info: Context information

        Returns:
            Derived key bytes
        """
        ...

    @staticmethod
    def hash_message(message: bytes) -> bytes:
        """
        Hash a message using SHA-256.

        Parameters:
        - message: Data to hash

        Returns:
            SHA-256 hash bytes (32 bytes)
        """
        ...

    @staticmethod
    def generate_random(length: int) -> bytes:
        """
        Generate cryptographically secure random bytes.

        Parameters:
        - length: Number of random bytes to generate

        Returns:
            Random bytes
        """
        ...
```

### Certificate Validation

Validate and process Matter certificates.

```python { .api }
from typing import Optional


class CertificateValidator:
    """Certificate validation for Matter protocol."""

    def __init__(self, paa_trust_store: Optional[list] = None):
        """
        Initialize certificate validator.

        Parameters:
        - paa_trust_store: List of trusted PAA certificates in DER format
        """
        ...

    def validate_device_attestation_certificate(
        self,
        dac: bytes,
        pai: bytes,
        paa: Optional[bytes] = None,
    ) -> bool:
        """
        Validate Device Attestation Certificate chain.

        Parameters:
        - dac: Device Attestation Certificate in DER format
        - pai: Product Attestation Intermediate certificate in DER format
        - paa: Product Attestation Authority certificate in DER format (optional)

        Returns:
            True if certificate chain is valid
        """
        ...

    def validate_operational_certificate(
        self,
        noc: bytes,
        icac: Optional[bytes] = None,
        rcac: Optional[bytes] = None,
    ) -> bool:
        """
        Validate Node Operational Certificate chain.

        Parameters:
        - noc: Node Operational Certificate in DER format
        - icac: Intermediate CA certificate in DER format (optional)
        - rcac: Root CA certificate in DER format (optional)

        Returns:
            True if certificate chain is valid
        """
        ...

    def extract_fabric_id(self, noc: bytes) -> Optional[int]:
        """
        Extract fabric ID from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
            Fabric ID or None if not found
        """
        ...

    def extract_node_id(self, noc: bytes) -> Optional[int]:
        """
        Extract node ID from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
            Node ID or None if not found
        """
        ...

    def extract_cat_tags(self, noc: bytes) -> list:
        """
        Extract CASE Authenticated Tags from NOC certificate.

        Parameters:
        - noc: Node Operational Certificate in DER format

        Returns:
            List of CAT tag values
        """
        ...

    def is_certificate_expired(self, cert: bytes) -> bool:
        """
        Check if a certificate is expired.

        Parameters:
        - cert: Certificate in DER format

        Returns:
            True if certificate is expired
        """
        ...

    def get_certificate_expiry(self, cert: bytes) -> int:
        """
        Get certificate expiration timestamp.

        Parameters:
        - cert: Certificate in DER format

        Returns:
            Unix timestamp of expiration
        """
        ...
```

### Secure Session Management

Manage secure sessions and encryption keys.

```python { .api }
class SecureSessionManager:
    """Secure session management for Matter communications."""

    def __init__(self):
        """Initialize secure session manager."""
        ...

    def establish_case_session(
        self,
        peer_node_id: int,
        fabric_id: int,
        credentials: dict,
    ) -> dict:
        """
        Establish CASE (Certificate Authenticated Session Establishment) session.

        Parameters:
        - peer_node_id: Peer device node ID
        - fabric_id: Fabric ID
        - credentials: Dictionary with 'noc', 'icac', 'rcac', 'private_key'

        Returns:
            Session information dictionary
        """
        ...

    def establish_pase_session(
        self,
        setup_pin_code: int,
        discriminator: int,
    ) -> dict:
        """
        Establish PASE (Password Authenticated Session Establishment) session.

        Parameters:
        - setup_pin_code: Device setup PIN code
        - discriminator: Device discriminator

        Returns:
            Session information dictionary
        """
        ...

    def get_session_keys(self, session_id: int) -> dict:
        """
        Get encryption keys for a session.

        Parameters:
        - session_id: Session identifier

        Returns:
            Dictionary with encryption and decryption keys
        """
        ...

    def close_session(self, session_id: int) -> bool:
        """
        Close a secure session.

        Parameters:
        - session_id: Session identifier

        Returns:
            True if session closed successfully
        """
        ...

    def is_session_active(self, session_id: int) -> bool:
        """
        Check if a session is active.

        Parameters:
        - session_id: Session identifier

        Returns:
            True if session is active
        """
        ...

    def refresh_session(self, session_id: int) -> bool:
        """
        Refresh an existing session.

        Parameters:
        - session_id: Session identifier

        Returns:
            True if refresh successful
        """
        ...
```

## Usage Examples

### Certificate Authority Setup

```python
from chip.CertificateAuthority import CertificateAuthority
from chip.ChipDeviceCtrl import ChipDeviceController
from chip.FabricAdmin import FabricAdmin

# Create a new Certificate Authority
ca = CertificateAuthority()

# Get the root certificate for distribution
root_cert = ca.get_root_cert()
print(f"Root certificate length: {len(root_cert)} bytes")

# Create a fabric administrator
fabric_admin = FabricAdmin(fabricId=1, vendorId=0x1234)

# Generate controller credentials
controller_creds = fabric_admin.generate_controller_noc_chain(
    node_id=12345,
    cat_tags=[0x00010001, 0x00020002],  # Example CAT tags
)

print("Controller credentials generated:")
print(f" NOC length: {len(controller_creds['noc'])} bytes")
print(f" Private key length: {len(controller_creds['private_key'])} bytes")
print(f" Root cert length: {len(controller_creds['rcac'])} bytes")

# Use credentials with a device controller
controller = ChipDeviceController(
    controllerNodeId=12345,
    fabricAdmin=fabric_admin,
)
```

### Device Attestation Validation

```python
from pathlib import Path

from chip.crypto import CertificateValidator

# Initialize validator with trusted PAA certificates.
# Path.read_bytes() closes each file after reading, so no handles are leaked.
paa_trust_store = [
    # Load PAA certificates from trust store
    Path("/path/to/paa1.der").read_bytes(),
    Path("/path/to/paa2.der").read_bytes(),
]

validator = CertificateValidator(paa_trust_store=paa_trust_store)

# Validate device attestation during commissioning
# (These would typically come from the device during commissioning)
device_dac = b"..."  # Device Attestation Certificate
product_pai = b"..."  # Product Attestation Intermediate
product_paa = b"..."  # Product Attestation Authority

is_valid = validator.validate_device_attestation_certificate(
    dac=device_dac,
    pai=product_pai,
    paa=product_paa,
)

if is_valid:
    print("Device attestation certificate is valid")

    # Extract information from certificates
    # NOTE(review): the extract_* methods are documented as taking a NOC;
    # confirm they return meaningful values when given a DAC.
    fabric_id = validator.extract_fabric_id(device_dac)
    node_id = validator.extract_node_id(device_dac)
    cat_tags = validator.extract_cat_tags(device_dac)

    print(f"Fabric ID: {fabric_id}")
    print(f"Node ID: {node_id}")
    print(f"CAT Tags: {cat_tags}")

    # Check expiration
    if validator.is_certificate_expired(device_dac):
        print("WARNING: Device certificate is expired")
    else:
        expiry = validator.get_certificate_expiry(device_dac)
        print(f"Certificate expires: {expiry}")
else:
    print("Device attestation certificate validation failed")
```

### Cryptographic Operations

```python
from chip.crypto import ChipCrypto

# Generate a key pair
private_key, public_key = ChipCrypto.generate_keypair()
print(f"Generated key pair - Private: {len(private_key)} bytes, Public: {len(public_key)} bytes")

# Sign and verify a message
message = b"Hello, Matter!"
signature = ChipCrypto.sign_message(private_key, message)
print(f"Signature length: {len(signature)} bytes")

is_valid = ChipCrypto.verify_signature(public_key, message, signature)
print(f"Signature valid: {is_valid}")

# Encrypt and decrypt data
encryption_key = ChipCrypto.generate_random(16)  # 128-bit key
nonce = ChipCrypto.generate_random(13)  # 104-bit nonce
plaintext = b"Secret device configuration data"

encrypted = ChipCrypto.encrypt_message(encryption_key, nonce, plaintext)
print(f"Encrypted data length: {len(encrypted)} bytes")

decrypted = ChipCrypto.decrypt_message(encryption_key, nonce, encrypted)
print(f"Decrypted: {decrypted}")
print(f"Decryption successful: {decrypted == plaintext}")

# Key derivation
shared_secret = ChipCrypto.generate_random(32)
salt = ChipCrypto.generate_random(16)
info = b"Matter Key Derivation"

derived_key = ChipCrypto.derive_key(shared_secret, salt, info)
print(f"Derived key length: {len(derived_key)} bytes")

# Hash a message
message_hash = ChipCrypto.hash_message(message)
print(f"SHA-256 hash: {message_hash.hex()}")
```

### Secure Session Establishment

```python
from chip.crypto import SecureSessionManager

# Initialize session manager
session_mgr = SecureSessionManager()

# Establish CASE session (for operational devices)
# `controller_creds` is not defined in this snippet; it is the dictionary
# returned by FabricAdmin.generate_controller_noc_chain() in the
# "Certificate Authority Setup" example.
credentials = {
    'noc': controller_creds['noc'],
    'icac': controller_creds.get('icac'),
    'rcac': controller_creds['rcac'],
    'private_key': controller_creds['private_key'],
}

case_session = session_mgr.establish_case_session(
    peer_node_id=1,
    fabric_id=1,
    credentials=credentials,
)

if case_session:
    print(f"CASE session established: {case_session['session_id']}")

    # Get session keys
    session_keys = session_mgr.get_session_keys(case_session['session_id'])
    print(f"Session keys available: {list(session_keys.keys())}")

    # Check if session is active
    is_active = session_mgr.is_session_active(case_session['session_id'])
    print(f"Session active: {is_active}")

# Establish PASE session (for commissioning)
pase_session = session_mgr.establish_pase_session(
    setup_pin_code=20202021,
    discriminator=3840,
)

if pase_session:
    print(f"PASE session established: {pase_session['session_id']}")

    # Use PASE session for commissioning operations
    # After commissioning is complete, close the session
    session_mgr.close_session(pase_session['session_id'])
    print("PASE session closed")
```

### Complete Fabric and Device Setup

```python
from chip.CertificateAuthority import CertificateAuthority
from chip.FabricAdmin import FabricAdmin
from chip.ChipDeviceCtrl import ChipDeviceController
from chip.crypto import CertificateValidator

# 1. Set up Certificate Authority
ca = CertificateAuthority()
root_cert = ca.get_root_cert()

# 2. Create Fabric Administrator
fabric_admin = FabricAdmin(fabricId=1, vendorId=0x1234)
fabric_admin.set_root_certificate(root_cert)

# 3. Generate controller credentials
controller_creds = fabric_admin.generate_controller_noc_chain(
    node_id=12345,
    cat_tags=[0x00010001],  # Admin access tag
)

# 4. Initialize Device Controller
controller = ChipDeviceController(
    controllerNodeId=12345,
    fabricAdmin=fabric_admin,
)

# 5. Set up certificate validator
validator = CertificateValidator()

try:
    # 6. Commission a device
    success = controller.CommissionOnNetwork(
        nodeId=1,
        setupPinCode=20202021,
    )

    if success:
        print("Device commissioned successfully")

        # 7. Validate the operational certificate created during commissioning
        # (This would be available from the device controller after commissioning)
        device_noc = controller.GetDeviceCredentials(nodeId=1)['noc']

        is_valid = validator.validate_operational_certificate(
            noc=device_noc,
            rcac=root_cert,
        )

        if is_valid:
            print("Device operational certificate is valid")

            # Extract device information
            device_fabric_id = validator.extract_fabric_id(device_noc)
            device_node_id = validator.extract_node_id(device_noc)

            print(f"Device is in fabric {device_fabric_id} with node ID {device_node_id}")
        else:
            print("WARNING: Device operational certificate validation failed")

finally:
    # Always release the controller, even if commissioning or validation raised.
    controller.Shutdown()
```