or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

device-shadow.md, fleet-provisioning.md, greengrass-discovery.md, greengrass-ipc.md, index.md, iot-jobs.md, mqtt-connections.md

docs/greengrass-ipc.md

0

# Greengrass Core IPC

1

2

A complete Greengrass Core Inter-Process Communication (IPC) client. It provides access to all Greengrass Core services, including component management, configuration, local deployment, pub/sub messaging, device shadow operations, security services, secrets management, and metrics collection.

3

4

## Capabilities

5

6

### Connection Setup

7

8

#### Connect Function

9

10

```python { .api }

11

def connect(**kwargs):

12

"""

13

Create IPC connection to Greengrass Core.

14

15

Parameters:

16

- ipc_socket (str): IPC socket path (optional, defaults to environment variable)

17

- authtoken (str): Authentication token (optional, defaults to environment variable)

18

- lifecycle_handler: Lifecycle event handler (optional)

19

- timeout (float): Connection timeout in seconds (optional)

20

21

Returns:

22

GreengrassCoreIPCClient: Connected IPC client

23

"""

24

```

25

26

Usage example:

27

28

```python

29

from awsiot.greengrasscoreipc import connect

30

31

# Connect using environment variables (typical for Greengrass components)

32

ipc_client = connect()

33

34

# Or specify connection parameters explicitly

35

ipc_client = connect(

36

ipc_socket="/tmp/greengrass_ipc.sock",

37

authtoken="your-auth-token"

38

)

39

```

40

41

### V1 IPC Client

42

43

#### Client Class

44

45

```python { .api }

46

class GreengrassCoreIPCClient:

47

"""

48

V1 client for Greengrass Core IPC operations with callback-based interface.

49

"""

50

def __init__(self, connection): ...

51

```

52

53

#### Component Management Operations

54

55

```python { .api }

56

def new_get_component_details(self):

57

"""Create operation to get component details."""

58

59

def new_list_components(self):

60

"""Create operation to list components."""

61

62

def new_restart_component(self):

63

"""Create operation to restart a component."""

64

65

def new_stop_component(self):

66

"""Create operation to stop a component."""

67

68

def new_pause_component(self):

69

"""Create operation to pause a component."""

70

71

def new_resume_component(self):

72

"""Create operation to resume a component."""

73

```

74

75

#### Configuration Management Operations

76

77

```python { .api }

78

def new_get_configuration(self):

79

"""Create operation to get configuration."""

80

81

def new_update_configuration(self):

82

"""Create operation to update configuration."""

83

84

def new_subscribe_to_configuration_update(self):

85

"""Create streaming operation to subscribe to configuration updates."""

86

```

87

88

#### Deployment Management Operations

89

90

```python { .api }

91

def new_create_local_deployment(self):

92

"""Create operation to create local deployment."""

93

94

def new_get_local_deployment_status(self):

95

"""Create operation to get local deployment status."""

96

97

def new_list_local_deployments(self):

98

"""Create operation to list local deployments."""

99

100

def new_cancel_local_deployment(self):

101

"""Create operation to cancel local deployment."""

102

```

103

104

#### IoT Core Integration Operations

105

106

```python { .api }

107

def new_publish_to_iot_core(self):

108

"""Create operation to publish message to IoT Core."""

109

110

def new_subscribe_to_iot_core(self):

111

"""Create streaming operation to subscribe to IoT Core topics."""

112

```

113

114

#### Local Pub/Sub Operations

115

116

```python { .api }

117

def new_publish_to_topic(self):

118

"""Create operation to publish to local topic."""

119

120

def new_subscribe_to_topic(self):

121

"""Create streaming operation to subscribe to local topics."""

122

```

123

124

#### Device Shadow Operations

125

126

```python { .api }

127

def new_get_thing_shadow(self):

128

"""Create operation to get thing shadow."""

129

130

def new_update_thing_shadow(self):

131

"""Create operation to update thing shadow."""

132

133

def new_delete_thing_shadow(self):

134

"""Create operation to delete thing shadow."""

135

136

def new_list_named_shadows_for_thing(self):

137

"""Create operation to list named shadows for thing."""

138

```

139

140

#### Security Operations

141

142

```python { .api }

143

def new_get_client_device_auth_token(self):

144

"""Create operation to get client device auth token."""

145

146

def new_verify_client_device_identity(self):

147

"""Create operation to verify client device identity."""

148

149

def new_authorize_client_device_action(self):

150

"""Create operation to authorize client device action."""

151

```

152

153

#### Secrets Management Operations

154

155

```python { .api }

156

def new_get_secret_value(self):

157

"""Create operation to get secret value."""

158

```

159

160

#### Metrics Operations

161

162

```python { .api }

163

def new_put_component_metric(self):

164

"""Create operation to put component metric."""

165

```

166

167

#### State Management Operations

168

169

```python { .api }

170

def new_update_state(self):

171

"""Create operation to update component state."""

172

173

def new_defer_component_update(self):

174

"""Create operation to defer component update."""

175

```

176

177

#### Certificate Management Operations

178

179

```python { .api }

180

def new_subscribe_to_certificate_updates(self):

181

"""Create streaming operation to subscribe to certificate updates."""

182

```

183

184

### V2 IPC Client

185

186

#### Client Class

187

188

```python { .api }

189

class GreengrassCoreIPCClientV2:

190

"""

191

V2 client for Greengrass Core IPC operations with Future-based interface.

192

"""

193

def __init__(self, connection): ...

194

```

195

196

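The V2 client provides the same operations as V1, but exposes each one as a direct method call instead of a `new_*` operation object. Each operation has a blocking form that returns the response and an `_async` form that returns a `concurrent.futures.Future`, which makes asynchronous use simpler (wrap the future with `asyncio.wrap_future` to `await` it).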

197

198

### Data Model Classes

199

200

#### Component Management

201

202

```python { .api }

203

@dataclass

204

class GetComponentDetailsRequest:

205

"""Request to get component details."""

206

component_name: str

207

208

@dataclass

209

class GetComponentDetailsResponse:

210

"""Response containing component details."""

211

component_details: Optional[ComponentDetails] = None

212

213

@dataclass

214

class ComponentDetails:

215

"""Details about a Greengrass component."""

216

component_name: Optional[str] = None

217

version: Optional[str] = None

218

state: Optional[str] = None # LifecycleState

219

configuration: Optional[Dict[str, Any]] = None

220

221

@dataclass

222

class ListComponentsRequest:

223

"""Request to list components."""

224

pass

225

226

@dataclass

227

class ListComponentsResponse:

228

"""Response containing component list."""

229

components: Optional[List[ComponentDetails]] = None

230

231

@dataclass

232

class RestartComponentRequest:

233

"""Request to restart component."""

234

component_name: str

235

236

@dataclass

237

class RestartComponentResponse:

238

"""Response from restart component operation."""

239

restart_status: Optional[str] = None # RequestStatus

240

message: Optional[str] = None

241

```

242

243

#### Configuration Management

244

245

```python { .api }

246

@dataclass

247

class GetConfigurationRequest:

248

"""Request to get configuration."""

249

component_name: Optional[str] = None

250

key_path: Optional[List[str]] = None

251

252

@dataclass

253

class GetConfigurationResponse:

254

"""Response containing configuration."""

255

component_name: Optional[str] = None

256

value: Optional[Dict[str, Any]] = None

257

258

@dataclass

259

class UpdateConfigurationRequest:

260

"""Request to update configuration."""

261

key_path: Optional[List[str]] = None

262

timestamp: Optional[datetime.datetime] = None

263

value_to_merge: Optional[Dict[str, Any]] = None

264

265

@dataclass

266

class UpdateConfigurationResponse:

267

"""Response from configuration update."""

268

pass

269

270

@dataclass

271

class ConfigurationUpdateEvent:

272

"""Event for configuration updates."""

273

component_name: Optional[str] = None

274

key_path: Optional[List[str]] = None

275

```

276

277

#### Deployment Management

278

279

```python { .api }

280

@dataclass

281

class CreateLocalDeploymentRequest:

282

"""Request to create local deployment."""

283

group_name: str

284

deployment_id: Optional[str] = None

285

components_to_add: Optional[Dict[str, ComponentDeploymentSpecification]] = None

286

components_to_remove: Optional[List[str]] = None

287

components_to_merge: Optional[Dict[str, ComponentDeploymentSpecification]] = None

288

failure_handling_policy: Optional[str] = None # FailureHandlingPolicy

289

recipe_directory_path: Optional[str] = None

290

artifacts_directory_path: Optional[str] = None

291

292

@dataclass

293

class CreateLocalDeploymentResponse:

294

"""Response from create local deployment."""

295

deployment_id: Optional[str] = None

296

297

@dataclass

298

class GetLocalDeploymentStatusRequest:

299

"""Request to get deployment status."""

300

deployment_id: str

301

302

@dataclass

303

class GetLocalDeploymentStatusResponse:

304

"""Response containing deployment status."""

305

deployment: Optional[LocalDeployment] = None

306

```

307

308

#### Pub/Sub Operations

309

310

```python { .api }

311

@dataclass

312

class PublishToTopicRequest:

313

"""Request to publish to local topic."""

314

topic: str

315

publish_message: Optional[PublishMessage] = None

316

317

@dataclass

318

class PublishToTopicResponse:

319

"""Response from publish to topic."""

320

pass

321

322

@dataclass

323

class PublishMessage:

324

"""Message to publish."""

325

json_message: Optional[Dict[str, Any]] = None

326

binary_message: Optional[BinaryMessage] = None

327

328

@dataclass

329

class SubscribeToTopicRequest:

330

"""Request to subscribe to local topic."""

331

topic: str

332

333

@dataclass

334

class SubscriptionResponseMessage:

335

"""Message received from subscription."""

336

json_message: Optional[Dict[str, Any]] = None

337

binary_message: Optional[BinaryMessage] = None

338

context: Optional[MessageContext] = None

339

```

340

341

#### IoT Core Integration

342

343

```python { .api }

344

@dataclass

345

class PublishToIoTCoreRequest:

346

"""Request to publish to IoT Core."""

347

topic_name: str

348

qos: Optional[str] = None # QOS

349

payload: Optional[bytes] = None

350

retain: Optional[bool] = None

351

user_properties: Optional[List[UserProperty]] = None

352

message_expiry_interval_seconds: Optional[int] = None

353

correlation_data: Optional[bytes] = None

354

response_topic: Optional[str] = None

355

payload_format: Optional[str] = None # PayloadFormat

356

content_type: Optional[str] = None

357

358

@dataclass

359

class PublishToIoTCoreResponse:

360

"""Response from publish to IoT Core."""

361

pass

362

363

@dataclass

364

class SubscribeToIoTCoreRequest:

365

"""Request to subscribe to IoT Core topic."""

366

topic_name: str

367

qos: Optional[str] = None # QOS

368

369

@dataclass

370

class IoTCoreMessage:

371

"""Message from IoT Core."""

372

message: Optional[MQTTMessage] = None

373

```

374

375

#### Device Shadow Operations

376

377

```python { .api }

378

@dataclass

379

class GetThingShadowRequest:

380

"""Request to get thing shadow."""

381

thing_name: str

382

shadow_name: Optional[str] = None

383

384

@dataclass

385

class GetThingShadowResponse:

386

"""Response containing thing shadow."""

387

payload: Optional[bytes] = None

388

389

@dataclass

390

class UpdateThingShadowRequest:

391

"""Request to update thing shadow."""

392

thing_name: str

393

payload: bytes

394

shadow_name: Optional[str] = None

395

396

@dataclass

397

class UpdateThingShadowResponse:

398

"""Response from update thing shadow."""

399

payload: Optional[bytes] = None

400

401

@dataclass

402

class DeleteThingShadowRequest:

403

"""Request to delete thing shadow."""

404

thing_name: str

405

shadow_name: Optional[str] = None

406

407

@dataclass

408

class DeleteThingShadowResponse:

409

"""Response from delete thing shadow."""

410

payload: Optional[bytes] = None

411

```

412

413

#### Security Operations

414

415

```python { .api }

416

@dataclass

417

class GetClientDeviceAuthTokenRequest:

418

"""Request to get client device auth token."""

419

credential: Optional[CredentialDocument] = None

420

421

@dataclass

422

class GetClientDeviceAuthTokenResponse:

423

"""Response containing auth token."""

424

client_device_auth_token: Optional[str] = None

425

426

@dataclass

427

class VerifyClientDeviceIdentityRequest:

428

"""Request to verify client device identity."""

429

credential: Optional[CredentialDocument] = None

430

431

@dataclass

432

class VerifyClientDeviceIdentityResponse:

433

"""Response from identity verification."""

434

is_valid_client_device: Optional[bool] = None

435

436

@dataclass

437

class AuthorizeClientDeviceActionRequest:

438

"""Request to authorize client device action."""

439

client_device_auth_token: str

440

operation: str

441

resource: str

442

443

@dataclass

444

class AuthorizeClientDeviceActionResponse:

445

"""Response from authorization check."""

446

is_authorized: Optional[bool] = None

447

```

448

449

#### Secrets Management

450

451

```python { .api }

452

@dataclass

453

class GetSecretValueRequest:

454

"""Request to get secret value."""

455

secret_id: str

456

version_id: Optional[str] = None

457

version_stage: Optional[str] = None

458

459

@dataclass

460

class GetSecretValueResponse:

461

"""Response containing secret value."""

462

secret_id: Optional[str] = None

463

version_id: Optional[str] = None

464

secret_binary: Optional[bytes] = None

465

secret_string: Optional[str] = None

466

```

467

468

#### Metrics

469

470

```python { .api }

471

@dataclass

472

class PutComponentMetricRequest:

473

"""Request to put component metric."""

474

metrics: List[Metric]

475

476

@dataclass

477

class PutComponentMetricResponse:

478

"""Response from put metric operation."""

479

pass

480

481

@dataclass

482

class Metric:

483

"""Component metric data."""

484

name: str

485

unit: Optional[str] = None # MetricUnitType

486

value: Optional[float] = None

487

timestamp: Optional[datetime.datetime] = None

488

```

489

490

### Constants

491

492

#### Lifecycle States

493

494

```python { .api }

495

class LifecycleState:

496

"""Component lifecycle state constants."""

497

NEW = "NEW"

498

INSTALLED = "INSTALLED"

499

STARTING = "STARTING"

500

RUNNING = "RUNNING"

501

STOPPING = "STOPPING"

502

ERRORED = "ERRORED"

503

BROKEN = "BROKEN"

504

FINISHED = "FINISHED"

505

```

506

507

#### Request Status

508

509

```python { .api }

510

class RequestStatus:

511

"""Request status constants."""

512

SUCCEEDED = "SUCCEEDED"

513

FAILED = "FAILED"

514

```

515

516

#### QoS Levels

517

518

```python { .api }

519

class QOS:

520

"""MQTT QoS level constants."""

521

AT_MOST_ONCE = "AT_MOST_ONCE"

522

AT_LEAST_ONCE = "AT_LEAST_ONCE"

523

```

524

525

## Usage Examples

526

527

### Component Management

528

529

```python

530

from awsiot.greengrasscoreipc import connect

531

import json

532

533

# Connect to Greengrass Core

534

ipc_client = connect()

535

536

def list_all_components():

537

"""List all Greengrass components."""

538

try:

539

# Create operation

540

operation = ipc_client.new_list_components()

541

542

# Activate operation

543

operation.activate({})

544

545

# Get result

546

result = operation.get_response().result()

547

548

print("Greengrass Components:")

549

for component in result.components:

550

print(f" Name: {component.component_name}")

551

print(f" Version: {component.version}")

552

print(f" State: {component.state}")

553

print(" ---")

554

555

return result.components

556

557

except Exception as e:

558

print(f"Failed to list components: {e}")

559

return []

560

561

def get_component_details(component_name):

562

"""Get details for a specific component."""

563

try:

564

operation = ipc_client.new_get_component_details()

565

566

request = {

567

"componentName": component_name

568

}

569

570

operation.activate(request)

571

result = operation.get_response().result()

572

573

details = result.component_details

574

print(f"Component: {details.component_name}")

575

print(f"Version: {details.version}")

576

print(f"State: {details.state}")

577

print(f"Configuration: {json.dumps(details.configuration, indent=2)}")

578

579

return details

580

581

except Exception as e:

582

print(f"Failed to get component details: {e}")

583

return None

584

585

def restart_component(component_name):

586

"""Restart a Greengrass component."""

587

try:

588

operation = ipc_client.new_restart_component()

589

590

request = {

591

"componentName": component_name

592

}

593

594

operation.activate(request)

595

result = operation.get_response().result()

596

597

print(f"Restart status: {result.restart_status}")

598

if result.message:

599

print(f"Message: {result.message}")

600

601

return result.restart_status == "SUCCEEDED"

602

603

except Exception as e:

604

print(f"Failed to restart component: {e}")

605

return False

606

607

# Usage

608

components = list_all_components()

609

if components:

610

details = get_component_details(components[0].component_name)

611

612

# Restart a component (be careful with this!)

613

# restart_component("YourComponentName")

614

```

615

616

### Local Pub/Sub

617

618

```python

619

from awsiot.greengrasscoreipc import connect

620

import json

621

import time

622

import threading

623

624

ipc_client = connect()

625

626

def publish_to_local_topic(topic, message):

627

"""Publish message to local Greengrass topic."""

628

try:

629

operation = ipc_client.new_publish_to_topic()

630

631

request = {

632

"topic": topic,

633

"publishMessage": {

634

"jsonMessage": message

635

}

636

}

637

638

operation.activate(request)

639

result = operation.get_response().result()

640

641

print(f"Published to topic {topic}: {message}")

642

return True

643

644

except Exception as e:

645

print(f"Failed to publish: {e}")

646

return False

647

648

def subscribe_to_local_topic(topic, callback):

649

"""Subscribe to local Greengrass topic."""

650

try:

651

operation = ipc_client.new_subscribe_to_topic()

652

653

request = {

654

"topic": topic

655

}

656

657

def stream_handler(event):

658

"""Handle incoming messages."""

659

try:

660

if hasattr(event, 'json_message') and event.json_message:

661

callback(topic, event.json_message)

662

elif hasattr(event, 'binary_message') and event.binary_message:

663

callback(topic, event.binary_message.message)

664

except Exception as e:

665

print(f"Error handling message: {e}")

666

667

# Activate subscription

668

operation.activate(request)

669

670

# Set up message handler

671

operation.get_response().add_done_callback(

672

lambda future: print(f"Subscription to {topic} established")

673

)

674

675

# Handle stream events

676

stream = operation.get_response().result()

677

while True:

678

try:

679

event = stream.get_next_message().result(timeout=1.0)

680

stream_handler(event)

681

except Exception:

682

# Timeout or stream closed

683

break

684

685

except Exception as e:

686

print(f"Failed to subscribe to topic: {e}")

687

688

def message_handler(topic, message):

689

"""Handle received messages."""

690

print(f"Received on {topic}: {message}")

691

692

# Example usage

693

def pub_sub_example():

694

"""Example of local pub/sub communication."""

695

696

# Start subscriber in background thread

697

subscribe_thread = threading.Thread(

698

target=subscribe_to_local_topic,

699

args=("sensor/temperature", message_handler)

700

)

701

subscribe_thread.daemon = True

702

subscribe_thread.start()

703

704

# Give subscriber time to connect

705

time.sleep(1)

706

707

# Publish some messages

708

for i in range(5):

709

message = {

710

"temperature": 20.0 + i,

711

"humidity": 45.0 + i * 2,

712

"timestamp": time.time()

713

}

714

715

publish_to_local_topic("sensor/temperature", message)

716

time.sleep(2)

717

718

# Run example

719

pub_sub_example()

720

```

721

722

### Configuration Management

723

724

```python

725

from awsiot.greengrasscoreipc import connect

726

import json

727

728

ipc_client = connect()

729

730

def get_component_configuration(component_name, key_path=None):

731

"""Get component configuration."""

732

try:

733

operation = ipc_client.new_get_configuration()

734

735

request = {

736

"componentName": component_name

737

}

738

739

if key_path:

740

request["keyPath"] = key_path

741

742

operation.activate(request)

743

result = operation.get_response().result()

744

745

print(f"Configuration for {component_name}:")

746

print(json.dumps(result.value, indent=2))

747

748

return result.value

749

750

except Exception as e:

751

print(f"Failed to get configuration: {e}")

752

return None

753

754

def update_component_configuration(key_path, value_to_merge):

755

"""Update component configuration."""

756

try:

757

operation = ipc_client.new_update_configuration()

758

759

request = {

760

"keyPath": key_path,

761

"valueToMerge": value_to_merge

762

}

763

764

operation.activate(request)

765

result = operation.get_response().result()

766

767

print("Configuration updated successfully")

768

return True

769

770

except Exception as e:

771

print(f"Failed to update configuration: {e}")

772

return False

773

774

def subscribe_to_config_updates():

775

"""Subscribe to configuration update events."""

776

try:

777

operation = ipc_client.new_subscribe_to_configuration_update()

778

779

request = {}

780

781

def config_update_handler(event):

782

"""Handle configuration update events."""

783

print(f"Configuration updated:")

784

print(f" Component: {event.component_name}")

785

print(f" Key path: {event.key_path}")

786

787

operation.activate(request)

788

789

# Handle stream events

790

stream = operation.get_response().result()

791

while True:

792

try:

793

event = stream.get_next_message().result(timeout=30.0)

794

config_update_handler(event)

795

except Exception:

796

break

797

798

except Exception as e:

799

print(f"Failed to subscribe to config updates: {e}")

800

801

# Usage examples

802

config = get_component_configuration("MyComponent")

803

804

# Update configuration

805

update_component_configuration(

806

["mqtt", "timeout"],

807

{"connectionTimeout": 30}

808

)

809

```

810

811

### Device Shadow Operations

812

813

```python

814

from awsiot.greengrasscoreipc import connect

815

import json

816

817

ipc_client = connect()

818

819

def get_device_shadow(thing_name, shadow_name=None):

820

"""Get device shadow."""

821

try:

822

operation = ipc_client.new_get_thing_shadow()

823

824

request = {

825

"thingName": thing_name

826

}

827

828

if shadow_name:

829

request["shadowName"] = shadow_name

830

831

operation.activate(request)

832

result = operation.get_response().result()

833

834

# Parse shadow payload

835

shadow_data = json.loads(result.payload.decode())

836

print(f"Shadow for {thing_name}:")

837

print(json.dumps(shadow_data, indent=2))

838

839

return shadow_data

840

841

except Exception as e:

842

print(f"Failed to get shadow: {e}")

843

return None

844

845

def update_device_shadow(thing_name, shadow_state, shadow_name=None):

846

"""Update device shadow."""

847

try:

848

operation = ipc_client.new_update_thing_shadow()

849

850

shadow_document = {

851

"state": shadow_state

852

}

853

854

request = {

855

"thingName": thing_name,

856

"payload": json.dumps(shadow_document).encode()

857

}

858

859

if shadow_name:

860

request["shadowName"] = shadow_name

861

862

operation.activate(request)

863

result = operation.get_response().result()

864

865

# Parse response

866

response_data = json.loads(result.payload.decode())

867

print(f"Shadow updated for {thing_name}:")

868

print(json.dumps(response_data, indent=2))

869

870

return response_data

871

872

except Exception as e:

873

print(f"Failed to update shadow: {e}")

874

return None

875

876

# Usage

877

shadow = get_device_shadow("MyGreengrassDevice")

878

879

# Update shadow

880

updated_shadow = update_device_shadow(

881

"MyGreengrassDevice",

882

{

883

"reported": {

884

"temperature": 23.5,

885

"status": "online",

886

"last_update": "2023-12-07T10:30:00Z"

887

}

888

}

889

)

890

```

891

892

### Secrets Management

893

894

```python

895

from awsiot.greengrasscoreipc import connect

896

897

ipc_client = connect()

898

899

def get_secret(secret_id, version_id=None, version_stage=None):

900

"""Get secret value from AWS Secrets Manager."""

901

try:

902

operation = ipc_client.new_get_secret_value()

903

904

request = {

905

"secretId": secret_id

906

}

907

908

if version_id:

909

request["versionId"] = version_id

910

if version_stage:

911

request["versionStage"] = version_stage

912

913

operation.activate(request)

914

result = operation.get_response().result()

915

916

print(f"Retrieved secret: {result.secret_id}")

917

918

if result.secret_string:

919

return result.secret_string

920

elif result.secret_binary:

921

return result.secret_binary

922

else:

923

return None

924

925

except Exception as e:

926

print(f"Failed to get secret: {e}")

927

return None

928

929

# Usage

930

database_password = get_secret("prod/database/password")

931

if database_password:

932

print("Database password retrieved successfully")

933

# Use the password in your application

934

```