0
# Greengrass Core IPC
1
2
Complete Greengrass Core Inter-Process Communication client providing access to all Greengrass Core services including component management, configuration, local deployment, pub/sub messaging, device shadow operations, security services, secrets management, and metrics collection.
3
4
## Capabilities
5
6
### Connection Setup
7
8
#### Connect Function
9
10
```python { .api }
11
def connect(*, ipc_socket=None, authtoken=None, lifecycle_handler=None, timeout=10.0):
    """
    Create IPC connection to Greengrass Core.

    All parameters are keyword-only.

    Parameters:
    - ipc_socket (str): IPC socket path (optional, defaults to environment variable)
    - authtoken (str): Authentication token (optional, defaults to environment variable)
    - lifecycle_handler: Lifecycle event handler (optional)
    - timeout (float): Connection timeout in seconds (optional)

    Returns:
    GreengrassCoreIPCClient: Connected IPC client
    """
24
```
25
26
Usage example:
27
28
```python
29
from awsiot.greengrasscoreipc import connect
30
31
# Connect using environment variables (typical for Greengrass components)
32
ipc_client = connect()
33
34
# Or specify connection parameters explicitly
35
ipc_client = connect(
36
ipc_socket="/tmp/greengrass_ipc.sock",
37
authtoken="your-auth-token"
38
)
39
```
40
41
### V1 IPC Client
42
43
#### Client Class
44
45
```python { .api }
46
class GreengrassCoreIPCClient:
47
"""
48
V1 client for Greengrass Core IPC operations with callback-based interface.
49
"""
50
def __init__(self, connection): ...
51
```
52
53
#### Component Management Operations
54
55
```python { .api }
56
def new_get_component_details(self):
57
"""Create operation to get component details."""
58
59
def new_list_components(self):
60
"""Create operation to list components."""
61
62
def new_restart_component(self):
63
"""Create operation to restart a component."""
64
65
def new_stop_component(self):
66
"""Create operation to stop a component."""
67
68
def new_pause_component(self):
69
"""Create operation to pause a component."""
70
71
def new_resume_component(self):
72
"""Create operation to resume a component."""
73
```
74
75
#### Configuration Management Operations
76
77
```python { .api }
78
def new_get_configuration(self):
79
"""Create operation to get configuration."""
80
81
def new_update_configuration(self):
82
"""Create operation to update configuration."""
83
84
def new_subscribe_to_configuration_update(self):
85
"""Create streaming operation to subscribe to configuration updates."""
86
```
87
88
#### Deployment Management Operations
89
90
```python { .api }
91
def new_create_local_deployment(self):
92
"""Create operation to create local deployment."""
93
94
def new_get_local_deployment_status(self):
95
"""Create operation to get local deployment status."""
96
97
def new_list_local_deployments(self):
98
"""Create operation to list local deployments."""
99
100
def new_cancel_local_deployment(self):
101
"""Create operation to cancel local deployment."""
102
```
103
104
#### IoT Core Integration Operations
105
106
```python { .api }
107
def new_publish_to_iot_core(self):
108
"""Create operation to publish message to IoT Core."""
109
110
def new_subscribe_to_iot_core(self):
111
"""Create streaming operation to subscribe to IoT Core topics."""
112
```
113
114
#### Local Pub/Sub Operations
115
116
```python { .api }
117
def new_publish_to_topic(self):
118
"""Create operation to publish to local topic."""
119
120
def new_subscribe_to_topic(self):
121
"""Create streaming operation to subscribe to local topics."""
122
```
123
124
#### Device Shadow Operations
125
126
```python { .api }
127
def new_get_thing_shadow(self):
128
"""Create operation to get thing shadow."""
129
130
def new_update_thing_shadow(self):
131
"""Create operation to update thing shadow."""
132
133
def new_delete_thing_shadow(self):
134
"""Create operation to delete thing shadow."""
135
136
def new_list_named_shadows_for_thing(self):
137
"""Create operation to list named shadows for thing."""
138
```
139
140
#### Security Operations
141
142
```python { .api }
143
def new_get_client_device_auth_token(self):
144
"""Create operation to get client device auth token."""
145
146
def new_verify_client_device_identity(self):
147
"""Create operation to verify client device identity."""
148
149
def new_authorize_client_device_action(self):
150
"""Create operation to authorize client device action."""
151
```
152
153
#### Secrets Management Operations
154
155
```python { .api }
156
def new_get_secret_value(self):
157
"""Create operation to get secret value."""
158
```
159
160
#### Metrics Operations
161
162
```python { .api }
163
def new_put_component_metric(self):
164
"""Create operation to put component metric."""
165
```
166
167
#### State Management Operations
168
169
```python { .api }
170
def new_update_state(self):
171
"""Create operation to update component state."""
172
173
def new_defer_component_update(self):
174
"""Create operation to defer component update."""
175
```
176
177
#### Certificate Management Operations
178
179
```python { .api }
180
def new_subscribe_to_certificate_updates(self):
181
"""Create streaming operation to subscribe to certificate updates."""
182
```
183
184
### V2 IPC Client
185
186
#### Client Class
187
188
```python { .api }
189
class GreengrassCoreIPCClientV2:
190
"""
191
V2 client for Greengrass Core IPC operations with Future-based interface.
192
"""
193
def __init__(self, connection): ...
194
```
195
196
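The V2 client exposes the same operations as V1 as direct method calls instead of operation objects: each operation has a blocking method (for example `publish_to_topic(...)`) and an `_async` variant (for example `publish_to_topic_async(...)`) that returns a `concurrent.futures.Future`.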
197
198
### Data Model Classes
199
200
#### Component Management
201
202
```python { .api }
203
@dataclass
204
class GetComponentDetailsRequest:
205
"""Request to get component details."""
206
component_name: str
207
208
@dataclass
209
class GetComponentDetailsResponse:
210
"""Response containing component details."""
211
component_details: Optional[ComponentDetails] = None
212
213
@dataclass
214
class ComponentDetails:
215
"""Details about a Greengrass component."""
216
component_name: Optional[str] = None
217
version: Optional[str] = None
218
state: Optional[str] = None # LifecycleState
219
configuration: Optional[Dict[str, Any]] = None
220
221
@dataclass
222
class ListComponentsRequest:
223
"""Request to list components."""
224
pass
225
226
@dataclass
227
class ListComponentsResponse:
228
"""Response containing component list."""
229
components: Optional[List[ComponentDetails]] = None
230
231
@dataclass
232
class RestartComponentRequest:
233
"""Request to restart component."""
234
component_name: str
235
236
@dataclass
237
class RestartComponentResponse:
238
"""Response from restart component operation."""
239
restart_status: Optional[str] = None # RequestStatus
240
message: Optional[str] = None
241
```
242
243
#### Configuration Management
244
245
```python { .api }
246
@dataclass
247
class GetConfigurationRequest:
248
"""Request to get configuration."""
249
component_name: Optional[str] = None
250
key_path: Optional[List[str]] = None
251
252
@dataclass
253
class GetConfigurationResponse:
254
"""Response containing configuration."""
255
component_name: Optional[str] = None
256
value: Optional[Dict[str, Any]] = None
257
258
@dataclass
259
class UpdateConfigurationRequest:
260
"""Request to update configuration."""
261
key_path: Optional[List[str]] = None
262
timestamp: Optional[datetime.datetime] = None
263
value_to_merge: Optional[Dict[str, Any]] = None
264
265
@dataclass
266
class UpdateConfigurationResponse:
267
"""Response from configuration update."""
268
pass
269
270
@dataclass
271
class ConfigurationUpdateEvent:
272
"""Event for configuration updates."""
273
component_name: Optional[str] = None
274
key_path: Optional[List[str]] = None
275
```
276
277
#### Deployment Management
278
279
```python { .api }
280
@dataclass
281
class CreateLocalDeploymentRequest:
282
"""Request to create local deployment."""
283
group_name: str
284
deployment_id: Optional[str] = None
285
components_to_add: Optional[Dict[str, ComponentDeploymentSpecification]] = None
286
components_to_remove: Optional[List[str]] = None
287
components_to_merge: Optional[Dict[str, ComponentDeploymentSpecification]] = None
288
failure_handling_policy: Optional[str] = None # FailureHandlingPolicy
289
recipe_directory_path: Optional[str] = None
290
artifacts_directory_path: Optional[str] = None
291
292
@dataclass
293
class CreateLocalDeploymentResponse:
294
"""Response from create local deployment."""
295
deployment_id: Optional[str] = None
296
297
@dataclass
298
class GetLocalDeploymentStatusRequest:
299
"""Request to get deployment status."""
300
deployment_id: str
301
302
@dataclass
303
class GetLocalDeploymentStatusResponse:
304
"""Response containing deployment status."""
305
deployment: Optional[LocalDeployment] = None
306
```
307
308
#### Pub/Sub Operations
309
310
```python { .api }
311
@dataclass
312
class PublishToTopicRequest:
313
"""Request to publish to local topic."""
314
topic: str
315
publish_message: Optional[PublishMessage] = None
316
317
@dataclass
318
class PublishToTopicResponse:
319
"""Response from publish to topic."""
320
pass
321
322
@dataclass
323
class PublishMessage:
324
"""Message to publish."""
325
json_message: Optional[Dict[str, Any]] = None
326
binary_message: Optional[BinaryMessage] = None
327
328
@dataclass
329
class SubscribeToTopicRequest:
330
"""Request to subscribe to local topic."""
331
topic: str
332
333
@dataclass
334
class SubscriptionResponseMessage:
335
"""Message received from subscription."""
336
json_message: Optional[Dict[str, Any]] = None
337
binary_message: Optional[BinaryMessage] = None
338
context: Optional[MessageContext] = None
339
```
340
341
#### IoT Core Integration
342
343
```python { .api }
344
@dataclass
345
class PublishToIoTCoreRequest:
346
"""Request to publish to IoT Core."""
347
topic_name: str
348
qos: Optional[str] = None # QOS
349
payload: Optional[bytes] = None
350
retain: Optional[bool] = None
351
user_properties: Optional[List[UserProperty]] = None
352
message_expiry_interval_seconds: Optional[int] = None
353
correlation_data: Optional[bytes] = None
354
response_topic: Optional[str] = None
355
payload_format: Optional[str] = None # PayloadFormat
356
content_type: Optional[str] = None
357
358
@dataclass
359
class PublishToIoTCoreResponse:
360
"""Response from publish to IoT Core."""
361
pass
362
363
@dataclass
364
class SubscribeToIoTCoreRequest:
365
"""Request to subscribe to IoT Core topic."""
366
topic_name: str
367
qos: Optional[str] = None # QOS
368
369
@dataclass
370
class IoTCoreMessage:
371
"""Message from IoT Core."""
372
message: Optional[MQTTMessage] = None
373
```
374
375
#### Device Shadow Operations
376
377
```python { .api }
378
@dataclass
379
class GetThingShadowRequest:
380
"""Request to get thing shadow."""
381
thing_name: str
382
shadow_name: Optional[str] = None
383
384
@dataclass
385
class GetThingShadowResponse:
386
"""Response containing thing shadow."""
387
payload: Optional[bytes] = None
388
389
@dataclass
390
class UpdateThingShadowRequest:
391
"""Request to update thing shadow."""
392
thing_name: str
393
payload: bytes
394
shadow_name: Optional[str] = None
395
396
@dataclass
397
class UpdateThingShadowResponse:
398
"""Response from update thing shadow."""
399
payload: Optional[bytes] = None
400
401
@dataclass
402
class DeleteThingShadowRequest:
403
"""Request to delete thing shadow."""
404
thing_name: str
405
shadow_name: Optional[str] = None
406
407
@dataclass
408
class DeleteThingShadowResponse:
409
"""Response from delete thing shadow."""
410
payload: Optional[bytes] = None
411
```
412
413
#### Security Operations
414
415
```python { .api }
416
@dataclass
417
class GetClientDeviceAuthTokenRequest:
418
"""Request to get client device auth token."""
419
credential: Optional[CredentialDocument] = None
420
421
@dataclass
422
class GetClientDeviceAuthTokenResponse:
423
"""Response containing auth token."""
424
client_device_auth_token: Optional[str] = None
425
426
@dataclass
427
class VerifyClientDeviceIdentityRequest:
428
"""Request to verify client device identity."""
429
credential: Optional[CredentialDocument] = None
430
431
@dataclass
432
class VerifyClientDeviceIdentityResponse:
433
"""Response from identity verification."""
434
is_valid_client_device: Optional[bool] = None
435
436
@dataclass
437
class AuthorizeClientDeviceActionRequest:
438
"""Request to authorize client device action."""
439
client_device_auth_token: str
440
operation: str
441
resource: str
442
443
@dataclass
444
class AuthorizeClientDeviceActionResponse:
445
"""Response from authorization check."""
446
is_authorized: Optional[bool] = None
447
```
448
449
#### Secrets Management
450
451
```python { .api }
452
@dataclass
453
class GetSecretValueRequest:
454
"""Request to get secret value."""
455
secret_id: str
456
version_id: Optional[str] = None
457
version_stage: Optional[str] = None
458
459
@dataclass
460
class GetSecretValueResponse:
461
"""Response containing secret value."""
462
secret_id: Optional[str] = None
463
version_id: Optional[str] = None
464
secret_binary: Optional[bytes] = None
465
secret_string: Optional[str] = None
466
```
467
468
#### Metrics
469
470
```python { .api }
471
@dataclass
472
class PutComponentMetricRequest:
473
"""Request to put component metric."""
474
metrics: List[Metric]
475
476
@dataclass
477
class PutComponentMetricResponse:
478
"""Response from put metric operation."""
479
pass
480
481
@dataclass
482
class Metric:
483
"""Component metric data."""
484
name: str
485
unit: Optional[str] = None # MetricUnitType
486
value: Optional[float] = None
487
timestamp: Optional[datetime.datetime] = None
488
```
489
490
### Constants
491
492
#### Lifecycle States
493
494
```python { .api }
495
class LifecycleState:
496
"""Component lifecycle state constants."""
497
NEW = "NEW"
498
INSTALLED = "INSTALLED"
499
STARTING = "STARTING"
500
RUNNING = "RUNNING"
501
STOPPING = "STOPPING"
502
ERRORED = "ERRORED"
503
BROKEN = "BROKEN"
504
FINISHED = "FINISHED"
505
```
506
507
#### Request Status
508
509
```python { .api }
510
class RequestStatus:
511
"""Request status constants."""
512
SUCCEEDED = "SUCCEEDED"
513
FAILED = "FAILED"
514
```
515
516
#### QoS Levels
517
518
```python { .api }
519
class QOS:
520
"""MQTT QoS level constants."""
521
AT_MOST_ONCE = "AT_MOST_ONCE"
522
AT_LEAST_ONCE = "AT_LEAST_ONCE"
523
```
524
525
## Usage Examples
526
527
### Component Management
528
529
```python
530
from awsiot.greengrasscoreipc import connect
531
import json
532
533
# Connect to Greengrass Core
534
ipc_client = connect()
535
536
def list_all_components():
    """List all Greengrass components and print a short summary of each.

    Returns:
        list: The component details reported by the core. An empty list is
        returned when the core reports no components or the request fails.
    """
    # Local import keeps this snippet self-contained.
    from awsiot.greengrasscoreipc.model import ListComponentsRequest

    try:
        operation = ipc_client.new_list_components()

        # activate() serializes a request shape, so a plain dict is not accepted.
        operation.activate(ListComponentsRequest())

        # Block until the core answers.
        result = operation.get_response().result()

        # `components` is optional in the response; normalize None to [].
        components = result.components or []

        print("Greengrass Components:")
        for component in components:
            print(f" Name: {component.component_name}")
            print(f" Version: {component.version}")
            print(f" State: {component.state}")
            print(" ---")

        return components

    except Exception as e:
        print(f"Failed to list components: {e}")
        return []
560
561
def get_component_details(component_name):
    """Get and print details for a specific component.

    Args:
        component_name: Name of the component to look up.

    Returns:
        The component details object, or None when the core returns no
        details or the request fails.
    """
    # Local import keeps this snippet self-contained.
    from awsiot.greengrasscoreipc.model import GetComponentDetailsRequest

    try:
        operation = ipc_client.new_get_component_details()

        # The request must be a model shape with snake_case fields, not a dict.
        request = GetComponentDetailsRequest(component_name=component_name)

        operation.activate(request)
        result = operation.get_response().result()

        details = result.component_details
        if details is None:
            # The field is optional; report it instead of failing on attribute access.
            print(f"Failed to get component details: no details returned for {component_name}")
            return None

        print(f"Component: {details.component_name}")
        print(f"Version: {details.version}")
        print(f"State: {details.state}")
        print(f"Configuration: {json.dumps(details.configuration, indent=2)}")

        return details

    except Exception as e:
        print(f"Failed to get component details: {e}")
        return None
584
585
def restart_component(component_name):
    """Restart a Greengrass component.

    Args:
        component_name: Name of the component to restart.

    Returns:
        bool: True when the core reports the restart status "SUCCEEDED",
        False otherwise or when the request fails.
    """
    # Local import keeps this snippet self-contained.
    from awsiot.greengrasscoreipc.model import RestartComponentRequest

    try:
        operation = ipc_client.new_restart_component()

        # The request must be a model shape with snake_case fields, not a dict.
        request = RestartComponentRequest(component_name=component_name)

        operation.activate(request)
        result = operation.get_response().result()

        print(f"Restart status: {result.restart_status}")
        if result.message:
            print(f"Message: {result.message}")

        return result.restart_status == "SUCCEEDED"

    except Exception as e:
        print(f"Failed to restart component: {e}")
        return False
606
607
# Usage
components = list_all_components()
if components:
    # Look up the first reported component in more detail.
    details = get_component_details(components[0].component_name)

# Restart a component (be careful with this!)
# restart_component("YourComponentName")
614
```
615
616
### Local Pub/Sub
617
618
```python
619
from awsiot.greengrasscoreipc import connect
620
import json
621
import time
622
import threading
623
624
ipc_client = connect()
625
626
def publish_to_local_topic(topic, message):
    """Publish a JSON message to a local Greengrass topic.

    Args:
        topic: Local pub/sub topic to publish to.
        message: JSON-serializable dict to send as the message body.

    Returns:
        bool: True once the core acknowledges the publish, False on failure.
    """
    # Local import keeps this snippet self-contained.
    from awsiot.greengrasscoreipc.model import (
        JsonMessage,
        PublishMessage,
        PublishToTopicRequest,
    )

    try:
        operation = ipc_client.new_publish_to_topic()

        # Requests are model shapes; the JSON body is wrapped in a JsonMessage.
        request = PublishToTopicRequest(
            topic=topic,
            publish_message=PublishMessage(
                json_message=JsonMessage(message=message),
            ),
        )

        operation.activate(request)
        # Block until the publish is acknowledged; the response carries no data.
        operation.get_response().result()

        print(f"Published to topic {topic}: {message}")
        return True

    except Exception as e:
        print(f"Failed to publish: {e}")
        return False
647
648
def subscribe_to_local_topic(topic, callback):
    """Subscribe to a local Greengrass topic and block until the stream closes.

    Messages are delivered through a stream handler passed to the operation,
    not by polling the response. The earlier polling loop also stopped after
    the first second without a message.

    Args:
        topic: Local pub/sub topic to subscribe to.
        callback: Callable invoked as ``callback(topic, payload)`` for every
            message; ``payload`` is the JSON body or the binary message bytes.
    """
    # Local imports keep this snippet self-contained.
    from awsiot.greengrasscoreipc.client import SubscribeToTopicStreamHandler
    from awsiot.greengrasscoreipc.model import SubscribeToTopicRequest

    # Set when the stream ends so the blocking wait below can return.
    stream_closed = threading.Event()

    class _TopicHandler(SubscribeToTopicStreamHandler):
        """Forward subscription events to the user callback."""

        def on_stream_event(self, event):
            try:
                if event.json_message:
                    callback(topic, event.json_message.message)
                elif event.binary_message:
                    callback(topic, event.binary_message.message)
            except Exception as e:
                # A failing callback must not tear down the subscription.
                print(f"Error handling message: {e}")

        def on_stream_error(self, error):
            print(f"Subscription error on {topic}: {error}")
            stream_closed.set()
            return True  # True asks the SDK to close the stream.

        def on_stream_closed(self):
            stream_closed.set()

    try:
        operation = ipc_client.new_subscribe_to_topic(_TopicHandler())

        # Activate subscription and wait for the core to confirm it.
        operation.activate(SubscribeToTopicRequest(topic=topic))
        operation.get_response().result()
        print(f"Subscription to {topic} established")

        # Keep the operation referenced and the subscription alive until it closes.
        stream_closed.wait()

    except Exception as e:
        print(f"Failed to subscribe to topic: {e}")
687
688
def message_handler(topic, message):
    """Print a message received on a subscribed topic.

    Args:
        topic: Topic the message arrived on.
        message: Message payload to display.
    """
    print(f"Received on {topic}: {message}")
691
692
# Example usage
693
def pub_sub_example():
    """Run a local pub/sub round trip: one background subscriber, five publishes."""

    # Start subscriber in background thread; daemon so it never blocks exit.
    subscribe_thread = threading.Thread(
        target=subscribe_to_local_topic,
        args=("sensor/temperature", message_handler),
        daemon=True,
    )
    subscribe_thread.start()

    # Give subscriber time to connect
    time.sleep(1)

    # Publish some messages
    for i in range(5):
        message = {
            "temperature": 20.0 + i,
            "humidity": 45.0 + i * 2,
            "timestamp": time.time(),
        }

        publish_to_local_topic("sensor/temperature", message)
        time.sleep(2)
717
718
# Run example
719
pub_sub_example()
720
```
721
722
### Configuration Management
723
724
```python
725
from awsiot.greengrasscoreipc import connect
726
import json
727
728
ipc_client = connect()
729
730
def get_component_configuration(component_name, key_path=None):
    """Get and print a component's configuration.

    Args:
        component_name: Name of the component whose configuration is read.
        key_path: Optional list of keys selecting a sub-tree of the
            configuration; the whole configuration is requested when omitted.

    Returns:
        The configuration value from the response, or None on failure.
    """
    # Local import keeps this snippet self-contained.
    from awsiot.greengrasscoreipc.model import GetConfigurationRequest

    try:
        operation = ipc_client.new_get_configuration()

        # The request must be a model shape, not a dict.
        # An empty key path is treated the same as no key path.
        request = GetConfigurationRequest(
            component_name=component_name,
            key_path=key_path if key_path else None,
        )

        operation.activate(request)
        result = operation.get_response().result()

        print(f"Configuration for {component_name}:")
        print(json.dumps(result.value, indent=2))

        return result.value

    except Exception as e:
        print(f"Failed to get configuration: {e}")
        return None
753
754
def update_component_configuration(key_path, value_to_merge):
    """Merge a value into this component's configuration.

    Args:
        key_path: List of keys identifying the configuration node to update.
        value_to_merge: Dict merged into the node at ``key_path``.

    Returns:
        bool: True when the update is acknowledged, False on failure.
    """
    # Local imports keep this snippet self-contained.
    import datetime

    from awsiot.greengrasscoreipc.model import UpdateConfigurationRequest

    try:
        operation = ipc_client.new_update_configuration()

        # The request must be a model shape, not a dict. The timestamp is sent
        # so the core can order concurrent updates to the same key.
        request = UpdateConfigurationRequest(
            key_path=key_path,
            timestamp=datetime.datetime.now(tz=datetime.timezone.utc),
            value_to_merge=value_to_merge,
        )

        operation.activate(request)
        # Block until acknowledged; the response carries no data.
        operation.get_response().result()

        print("Configuration updated successfully")
        return True

    except Exception as e:
        print(f"Failed to update configuration: {e}")
        return False
773
774
def subscribe_to_config_updates():
    """Subscribe to configuration update events and block until the stream closes.

    Events are delivered through a stream handler passed to the operation,
    not by polling the response. The earlier polling loop also exited
    silently after 30 seconds without an event.
    """
    # Local imports keep this snippet self-contained.
    import threading

    from awsiot.greengrasscoreipc.client import (
        SubscribeToConfigurationUpdateStreamHandler,
    )
    from awsiot.greengrasscoreipc.model import SubscribeToConfigurationUpdateRequest

    # Set when the stream ends so the blocking wait below can return.
    stream_closed = threading.Event()

    class _ConfigUpdateHandler(SubscribeToConfigurationUpdateStreamHandler):
        """Print every configuration update event."""

        def on_stream_event(self, event):
            # The update details are nested one level inside the stream event.
            update = event.configuration_update_event
            if update is None:
                return
            print("Configuration updated:")
            print(f" Component: {update.component_name}")
            print(f" Key path: {update.key_path}")

        def on_stream_error(self, error):
            print(f"Configuration subscription error: {error}")
            stream_closed.set()
            return True  # True asks the SDK to close the stream.

        def on_stream_closed(self):
            stream_closed.set()

    try:
        operation = ipc_client.new_subscribe_to_configuration_update(
            _ConfigUpdateHandler()
        )

        # An empty request sets neither a component name nor a key path.
        operation.activate(SubscribeToConfigurationUpdateRequest())
        operation.get_response().result()

        # Keep the operation referenced and the subscription alive until it closes.
        stream_closed.wait()

    except Exception as e:
        print(f"Failed to subscribe to config updates: {e}")
800
801
# Usage examples
config = get_component_configuration("MyComponent")

# Update configuration
update_component_configuration(
    ["mqtt", "timeout"],
    {"connectionTimeout": 30},
)
809
```
810
811
### Device Shadow Operations
812
813
```python
814
from awsiot.greengrasscoreipc import connect
815
import json
816
817
ipc_client = connect()
818
819
def get_device_shadow(thing_name, shadow_name=None):
    """Get a device shadow document and print it.

    Args:
        thing_name: Name of the thing whose shadow is read.
        shadow_name: Optional named shadow; left out of the request when not given.

    Returns:
        dict: The parsed shadow document, or None on failure.
    """
    # Local import keeps this snippet self-contained.
    from awsiot.greengrasscoreipc.model import GetThingShadowRequest

    try:
        operation = ipc_client.new_get_thing_shadow()

        # The request must be a model shape, not a dict.
        request = GetThingShadowRequest(
            thing_name=thing_name,
            shadow_name=shadow_name if shadow_name else None,
        )

        operation.activate(request)
        result = operation.get_response().result()

        if result.payload is None:
            # The payload is optional; report it instead of failing on decode().
            print(f"Failed to get shadow: empty payload for {thing_name}")
            return None

        # Parse shadow payload (JSON document delivered as bytes)
        shadow_data = json.loads(result.payload.decode())
        print(f"Shadow for {thing_name}:")
        print(json.dumps(shadow_data, indent=2))

        return shadow_data

    except Exception as e:
        print(f"Failed to get shadow: {e}")
        return None
844
845
def update_device_shadow(thing_name, shadow_state, shadow_name=None):
    """Update a device shadow and print the service's response document.

    Args:
        thing_name: Name of the thing whose shadow is updated.
        shadow_state: Dict placed under the ``"state"`` key of the shadow
            document (for example ``{"reported": {...}}``).
        shadow_name: Optional named shadow; left out of the request when not given.

    Returns:
        dict: The parsed response document, or None on failure.
    """
    # Local import keeps this snippet self-contained.
    from awsiot.greengrasscoreipc.model import UpdateThingShadowRequest

    try:
        operation = ipc_client.new_update_thing_shadow()

        shadow_document = {"state": shadow_state}

        # The request must be a model shape; the document travels as JSON bytes.
        request = UpdateThingShadowRequest(
            thing_name=thing_name,
            payload=json.dumps(shadow_document).encode(),
            shadow_name=shadow_name if shadow_name else None,
        )

        operation.activate(request)
        result = operation.get_response().result()

        if result.payload is None:
            # The payload is optional; report it instead of failing on decode().
            print(f"Failed to update shadow: empty response payload for {thing_name}")
            return None

        # Parse response
        response_data = json.loads(result.payload.decode())
        print(f"Shadow updated for {thing_name}:")
        print(json.dumps(response_data, indent=2))

        return response_data

    except Exception as e:
        print(f"Failed to update shadow: {e}")
        return None
875
876
# Usage
shadow = get_device_shadow("MyGreengrassDevice")

# Update shadow
updated_shadow = update_device_shadow(
    "MyGreengrassDevice",
    {
        "reported": {
            "temperature": 23.5,
            "status": "online",
            "last_update": "2023-12-07T10:30:00Z",
        }
    },
)
890
```
891
892
### Secrets Management
893
894
```python
895
from awsiot.greengrasscoreipc import connect
896
897
ipc_client = connect()
898
899
def get_secret(secret_id, version_id=None, version_stage=None):
    """Get a secret value through the Greengrass IPC secrets operation.

    Args:
        secret_id: Identifier of the secret to retrieve.
        version_id: Optional version ID; left out of the request when not given.
        version_stage: Optional version stage; left out of the request when
            not given.

    Returns:
        The secret string if present, otherwise the secret binary, otherwise
        None. None is also returned when the request fails.
    """
    # Local import keeps this snippet self-contained.
    from awsiot.greengrasscoreipc.model import GetSecretValueRequest

    try:
        operation = ipc_client.new_get_secret_value()

        # The request must be a model shape, not a dict.
        request = GetSecretValueRequest(
            secret_id=secret_id,
            version_id=version_id if version_id else None,
            version_stage=version_stage if version_stage else None,
        )

        operation.activate(request)
        result = operation.get_response().result()

        print(f"Retrieved secret: {result.secret_id}")

        # The value is nested in `secret_value`, not on the response itself.
        secret = result.secret_value
        if secret is None:
            return None

        # Compare against None so an empty-string secret is still returned.
        if secret.secret_string is not None:
            return secret.secret_string
        if secret.secret_binary is not None:
            return secret.secret_binary
        return None

    except Exception as e:
        print(f"Failed to get secret: {e}")
        return None
928
929
# Usage
database_password = get_secret("prod/database/password")
if database_password:
    print("Database password retrieved successfully")
    # Use the password in your application
934
```