or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-features.md · aws-integration.md · circuits.md · devices-simulation.md · index.md · quantum-information.md

aws-integration.mddocs/

0

# AWS Integration

1

2

Complete AWS cloud integration for quantum computing on Amazon Braket, including device access, task management, sessions, jobs, and cost tracking.

3

4

## Core Imports

5

6

```python { .api }

7

from braket.aws import (

8

AwsDevice, AwsDeviceType, AwsQuantumTask, AwsQuantumTaskBatch,

9

AwsQuantumJob, AwsSession, DirectReservation

10

)

11

from braket.jobs import (

12

hybrid_job, CheckpointConfig, InstanceConfig, OutputDataConfig,

13

S3DataSourceConfig, StoppingCondition, LocalQuantumJob,

14

load_job_checkpoint, load_job_result, save_job_checkpoint, save_job_result,

15

get_checkpoint_dir, get_hyperparameters, get_input_data_dir,

16

get_job_device_arn, get_job_name, get_results_dir,

17

Framework, retrieve_image

18

)

19

from braket.tracking import Tracker

20

from braket.tasks import QuantumTask, QuantumTaskBatch

21

```

22

23

## AWS Device Management

24

25

### Device Discovery and Access

26

27

```python { .api }

28

from braket.aws import AwsDevice, AwsDeviceType

29

from enum import Enum

30

31

class AwsDeviceType(Enum):
    """Kinds of device exposed through AWS Braket.

    Each member's value is the upper-case string of its own name.
    """

    # Classical simulation backend.
    SIMULATOR = "SIMULATOR"

    # Quantum processing unit (physical hardware).
    QPU = "QPU"

35

36

class AwsDevice:
    """Amazon Braket implementation of quantum devices.

    Reference listing of the public surface: apart from ``__init__``, every
    member body below is a ``pass`` placeholder.
    """

    def __init__(self, arn: str, aws_session: 'AwsSession' = None):
        """
        Initialize AWS quantum device.

        Args:
            arn: AWS device ARN
            aws_session: Optional AWS session, creates default if None
        """
        # In this listing both arguments are simply stored as given.
        self.arn = arn
        self.aws_session = aws_session

    @staticmethod
    def get_devices(
        arns: list[str] = None,
        names: list[str] = None,
        types: list[AwsDeviceType] = None,
        statuses: list[str] = None,
        provider_names: list[str] = None,
        aws_session: 'AwsSession' = None
    ) -> list['AwsDevice']:
        """
        Get AWS Braket devices matching criteria.

        Every filter defaults to None.

        Args:
            arns: Device ARNs to filter by
            names: Device names to filter by
            types: Device types to filter by
            statuses: Device statuses to filter by
            provider_names: Provider names to filter by
            aws_session: AWS session for API calls

        Returns:
            list[AwsDevice]: Matching devices
        """
        pass

    def run(
        self,
        task_specification,
        shots: int = 1000,
        inputs: dict = None,
        gate_definitions: dict = None,
        reservation_arn: str = None,
        tags: dict = None
    ) -> 'AwsQuantumTask':
        """
        Execute quantum task on AWS device.

        Args:
            task_specification: Circuit, AHS program, or annealing problem
            shots: Number of measurement shots
            inputs: Input parameters for parameterized programs
            gate_definitions: Custom gate definitions
            reservation_arn: Device reservation ARN
            tags: AWS resource tags

        Returns:
            AwsQuantumTask: Quantum task execution handle
        """
        pass

    def run_batch(
        self,
        task_specifications: list,
        shots: int = 1000,
        max_parallel: int = 10,
        inputs: list[dict] = None,
        tags: dict = None
    ) -> 'AwsQuantumTaskBatch':
        """
        Execute batch of quantum tasks.

        Args:
            task_specifications: List of circuits/programs
            shots: Number of measurement shots per task
            max_parallel: Maximum parallel executions
            inputs: Input parameters for each task
            tags: AWS resource tags

        Returns:
            AwsQuantumTaskBatch: Batch execution handle
        """
        pass

    @property
    def name(self) -> str:
        """Device name."""
        pass

    @property
    def status(self) -> str:
        """Current device status."""
        pass

    # NOTE: this property name shadows the builtin `type` inside the class body.
    @property
    def type(self) -> AwsDeviceType:
        """Device type (QPU or Simulator)."""
        pass

    @property
    def provider_name(self) -> str:
        """Device provider name."""
        pass

    # NOTE(review): annotated as a plain dict here; the examples in this
    # document rely on dict-style access (`.get`, `in`) — confirm against the SDK.
    @property
    def properties(self) -> dict:
        """Device capabilities and properties."""
        pass

147

148

# Device discovery examples

149

def discover_aws_devices() -> dict[str, list[AwsDevice]]:
    """
    Build a catalogue of AWS Braket devices grouped by type and by provider.

    One ``AwsDevice.get_devices`` query is issued per category, so a device
    can appear under several keys (for example both 'qpus' and 'ionq').

    Returns:
        dict: Device lists keyed by 'simulators', 'qpus', 'ionq', 'rigetti',
            'oqc' and 'quera'
    """
    type_categories = (
        ('simulators', AwsDeviceType.SIMULATOR),
        ('qpus', AwsDeviceType.QPU),
    )
    provider_categories = (
        ('ionq', 'IonQ'),
        ('rigetti', 'Rigetti'),
        ('oqc', 'Oxford Quantum Computing'),
        ('quera', 'QuEra'),
    )

    catalogue = {}
    for label, device_type in type_categories:
        catalogue[label] = AwsDevice.get_devices(types=[device_type])
    for label, provider in provider_categories:
        catalogue[label] = AwsDevice.get_devices(provider_names=[provider])

    return catalogue

166

167

def select_optimal_device(circuit, requirements: dict) -> AwsDevice:
    """
    Select optimal device based on circuit and requirements.

    Candidates are the devices returned by ``AwsDevice.get_devices()`` whose
    status is 'ONLINE' and whose 'qubitCount' property is at least
    ``circuit.qubit_count``. Among those, the first match for the requested
    preference is returned, falling back to the first candidate overall.

    Args:
        circuit: Quantum circuit to execute (must expose ``qubit_count``)
        requirements: Performance and cost requirements. Recognised keys:
            'cost_priority' (prefer simulators) and 'real_hardware'
            (prefer QPUs); 'cost_priority' wins when both are set.
            None is treated as an empty dict.

    Returns:
        AwsDevice: Selected optimal device

    Raises:
        IndexError: If no online device is large enough for the circuit.
            The exception type matches what the bare list indexing raised
            before; only the message is new.
    """
    requirements = requirements or {}

    # Filter by availability and qubit count
    candidates = [
        device
        for device in AwsDevice.get_devices()
        if device.status == 'ONLINE'
        and device.properties.get('qubitCount', 0) >= circuit.qubit_count
    ]
    if not candidates:
        raise IndexError(
            f"No ONLINE device offers at least {circuit.qubit_count} qubits"
        )

    # Select based on requirements
    if requirements.get('cost_priority', False):
        # Prefer simulators for cost
        preferred_type = AwsDeviceType.SIMULATOR
    elif requirements.get('real_hardware', False):
        # Prefer QPUs for real quantum effects
        preferred_type = AwsDeviceType.QPU
    else:
        # Default selection
        return candidates[0]

    # First candidate of the preferred type, else the first candidate overall.
    return next(
        (device for device in candidates if device.type == preferred_type),
        candidates[0],
    )

201

```

202

203

### Device Properties and Capabilities

204

205

```python { .api }

206

def analyze_device_capabilities(device: AwsDevice) -> dict:
    """
    Summarise what a device reports about itself.

    Args:
        device: AWS device to analyze

    Returns:
        dict: Report with the sections 'basic_info', 'quantum_specs',
            'gate_set', 'connectivity', 'timing_info' and 'error_rates'.
            A section stays an empty dict when the matching property key
            is absent.
    """
    props = device.properties

    report = {
        'basic_info': {
            'name': device.name,
            'provider': device.provider_name,
            'type': device.type.value,
            'status': device.status
        },
        'quantum_specs': {},
        'gate_set': {},
        'connectivity': {},
        'timing_info': {},
        'error_rates': {}
    }

    # The qubit count is nested inside its section rather than replacing it.
    if 'qubitCount' in props:
        report['quantum_specs']['qubit_count'] = props['qubitCount']

    # The remaining sections are copied wholesale from one property key each:
    # gate set, connectivity graph, timing constraints, error characteristics.
    section_sources = (
        ('gate_set', 'supportedOperations'),
        ('connectivity', 'connectivity'),
        ('timing_info', 'timing'),
        ('error_rates', 'fidelity'),
    )
    for section, property_key in section_sources:
        if property_key in props:
            report[section] = props[property_key]

    return report

253

254

def _qubits_are_coupled(graph: dict, first, second) -> bool:
    """Return True if `graph` lists an edge between the two qubits in either direction."""
    # Normalise keys and neighbours to strings so int and str indices both match.
    adjacency = {
        str(node): {str(neighbour) for neighbour in neighbours}
        for node, neighbours in graph.items()
    }
    a, b = str(int(first)), str(int(second))
    return b in adjacency.get(a, ()) or a in adjacency.get(b, ())


def check_circuit_compatibility(circuit, device: AwsDevice) -> dict:
    """
    Check circuit compatibility with device.

    Three checks are made against ``device.properties``:

    * qubit count: a circuit larger than 'qubitCount' is an incompatibility;
    * gate set: gates missing from 'supportedOperations' produce one warning
      per gate type (names are compared case-insensitively);
    * connectivity (QPUs only): two-qubit gates on uncoupled qubits produce
      a warning.

    Args:
        circuit: Quantum circuit to validate (must expose ``qubit_count``
            and ``instructions``)
        device: Target AWS device

    Returns:
        dict: Compatibility analysis results with keys 'is_compatible',
            'issues', 'warnings' and 'recommendations'
    """
    properties = device.properties

    compatibility = {
        'is_compatible': True,
        'issues': [],
        'warnings': [],
        'recommendations': []
    }

    # Check qubit count
    device_qubits = properties.get('qubitCount', 0)
    if circuit.qubit_count > device_qubits:
        compatibility['is_compatible'] = False
        compatibility['issues'].append(
            f"Circuit requires {circuit.qubit_count} qubits, device has {device_qubits}"
        )

    # Check gate support. Gate class names are CamelCase ("CNot") while
    # operation names are presumably lower-case ("cnot"), so compare
    # case-insensitively.
    supported_gates = {
        str(name).casefold() for name in properties.get('supportedOperations', [])
    }
    already_flagged = set()
    for instruction in circuit.instructions:
        gate_name = instruction.operator.__class__.__name__
        if gate_name.casefold() in supported_gates or gate_name in already_flagged:
            continue
        # Warn once per gate type instead of once per instruction.
        already_flagged.add(gate_name)
        compatibility['warnings'].append(f"Gate {gate_name} may not be natively supported")

    # Check connectivity (for QPUs)
    if device.type == AwsDeviceType.QPU and 'connectivity' in properties:
        connectivity = properties['connectivity']
        fully_connected = connectivity.get('fullyConnected', False)

        # A fully connected device never needs SWAPs. 'fullyConnected' is
        # normally a boolean, so `pair in fully_connected` would raise
        # TypeError; an explicit collection of pairs is still honoured.
        if fully_connected is not True:
            explicit_pairs = (
                fully_connected
                if isinstance(fully_connected, (list, tuple, set, frozenset))
                else ()
            )
            coupling_graph = connectivity.get('connectivityGraph', {})

            for instruction in circuit.instructions:
                if len(instruction.target) != 2:  # Only two-qubit gates matter
                    continue
                qubit_pair = tuple(instruction.target)
                if qubit_pair in explicit_pairs:
                    continue
                if _qubits_are_coupled(coupling_graph, *qubit_pair):
                    continue
                compatibility['warnings'].append(
                    f"Two-qubit gate on {qubit_pair} may require SWAP operations"
                )

    return compatibility

296

```

297

298

## Quantum Task Management

299

300

### Task Execution and Monitoring

301

302

```python { .api }

303

from braket.aws import AwsQuantumTask, AwsQuantumTaskBatch

304

from braket.tasks import QuantumTask

305

306

class AwsQuantumTask(QuantumTask):
    """AWS quantum task execution interface.

    Reference listing: apart from ``__init__``, every member body is a
    ``pass`` placeholder.
    """

    def __init__(self, arn: str, aws_session: 'AwsSession' = None):
        """
        Initialize AWS quantum task.

        Args:
            arn: Task ARN
            aws_session: AWS session for API calls
        """
        # Both arguments are stored unchanged.
        self.arn = arn
        self.aws_session = aws_session

    def result(self) -> 'TaskResult':
        """
        Get task execution results.

        Returns:
            TaskResult: Quantum task results
        """
        pass

    def state(self) -> str:
        """
        Get current task state.

        Returns:
            str: Task state (QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED)
        """
        pass

    def cancel(self) -> None:
        """Cancel running task."""
        pass

    def metadata(self) -> dict:
        """
        Get task metadata.

        Returns:
            dict: Task execution metadata
        """
        pass

    # NOTE: this property name shadows the builtin `id` inside the class body.
    @property
    def id(self) -> str:
        """Task ID."""
        pass

    @staticmethod
    def get(arn: str, aws_session: 'AwsSession' = None) -> 'AwsQuantumTask':
        """
        Get existing task by ARN.

        Args:
            arn: Task ARN
            aws_session: AWS session

        Returns:
            AwsQuantumTask: Task instance
        """
        pass

369

370

class AwsQuantumTaskBatch:
    """Batch execution of quantum tasks.

    Reference listing: apart from ``__init__``, every member body is a
    ``pass`` placeholder.
    """

    def __init__(self, tasks: list[AwsQuantumTask]):
        """
        Initialize task batch.

        Args:
            tasks: List of quantum tasks
        """
        # The list is stored by reference, not copied.
        self.tasks = tasks

    def results(self) -> list:
        """
        Get results from all tasks in batch.

        Returns:
            list: Results from completed tasks
        """
        pass

    def unsuccessful(self) -> list[AwsQuantumTask]:
        """
        Get unsuccessful tasks.

        Returns:
            list[AwsQuantumTask]: Failed or cancelled tasks
        """
        pass

    def retry_unsuccessful_tasks(self) -> 'AwsQuantumTaskBatch':
        """
        Retry failed tasks.

        Returns:
            AwsQuantumTaskBatch: New batch with retried tasks
        """
        pass

408

409

# Task execution examples

410

def execute_quantum_task_with_monitoring(
    circuit,
    device: AwsDevice,
    poll_interval: float = 5.0,
    timeout: float = None
) -> dict:
    """
    Execute quantum task with comprehensive monitoring.

    The task is submitted with 1000 shots and polled until it reaches a
    terminal state (COMPLETED, FAILED or CANCELLED) or the optional timeout
    expires. Each poll calls ``task.state()`` exactly once, so the state
    that is recorded is the same state that is tested.

    Args:
        circuit: Quantum circuit to execute
        device: Target AWS device
        poll_interval: Seconds to wait between status checks
        timeout: Maximum seconds to wait for a terminal state; None (the
            default) waits indefinitely. The task is not cancelled on timeout.

    Returns:
        dict: Task results and execution information. Always contains
            'task_arn', 'submission_time', 'device_arn', 'status_history'
            (non-terminal states seen while waiting), 'final_status' and
            'timed_out'. Adds 'completion_time' and 'results' on success,
            otherwise 'error_info' holding ``task.metadata()``.
    """
    import time

    terminal_states = ('COMPLETED', 'FAILED', 'CANCELLED')

    # Submit task
    task = device.run(circuit, shots=1000)

    execution_info = {
        'task_arn': task.arn,
        'submission_time': time.time(),
        'device_arn': device.arn,
        'status_history': [],
        'timed_out': False
    }

    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout

    # Monitor execution
    status = task.state()
    while status not in terminal_states:
        execution_info['status_history'].append({
            'status': status,
            'timestamp': time.time()
        })
        print(f"Task status: {status}")

        if deadline is not None and time.monotonic() >= deadline:
            execution_info['timed_out'] = True
            break

        time.sleep(poll_interval)
        status = task.state()

    execution_info['final_status'] = status

    # Get final results
    if status == 'COMPLETED':
        result = task.result()
        execution_info['completion_time'] = time.time()
        execution_info['results'] = {
            'measurement_counts': result.measurement_counts,
            'task_metadata': result.task_metadata
        }
    else:
        execution_info['error_info'] = task.metadata()

    return execution_info

456

457

def execute_parameter_sweep(circuit_template, parameter_ranges: dict, device: AwsDevice) -> dict:
    """
    Execute parameter sweep across quantum circuit parameters.

    The Cartesian product of all parameter ranges is formed, the template is
    bound once per combination, and the bound circuits are submitted as one
    batch with 1000 shots each.

    Args:
        circuit_template: Parameterized circuit template; it is not mutated
        parameter_ranges: Mapping of free-parameter name to the iterable of
            values to sweep for that parameter
        device: Target AWS device

    Returns:
        dict: Parameter sweep results with keys 'parameter_combinations'
            (one dict per combination, in product order), 'results' (as
            returned by the batch) and 'analysis' (keyed by
            ``str(combination)``)
    """
    import itertools

    # Generate parameter combinations
    param_names = list(parameter_ranges)
    combinations = [
        dict(zip(param_names, values))
        for values in itertools.product(*parameter_ranges.values())
    ]

    # Bind parameters. `make_bound_circuit` returns a new circuit and leaves
    # the template untouched, so no explicit copy is needed.
    bound_circuits = [
        circuit_template.make_bound_circuit(params) for params in combinations
    ]

    # Execute batch
    batch = device.run_batch(bound_circuits, shots=1000)

    # Collect results
    results = batch.results()
    sweep_results = {
        'parameter_combinations': combinations,
        'results': results,
        'analysis': {}
    }

    # Analyze results
    for result, params in zip(results, combinations):
        sweep_results['analysis'][str(params)] = {
            'measurement_counts': result.measurement_counts,
            'expectation_values': getattr(result, 'values', [])
        }

    return sweep_results

509

```

510

511

## AWS Session Management

512

513

### Authentication and Configuration

514

515

```python { .api }

516

from braket.aws import AwsSession

517

import boto3

518

519

class AwsSession:
    """AWS authentication and session management.

    Reference listing: apart from ``__init__``, every member body is a
    ``pass`` placeholder.
    """

    def __init__(
        self,
        boto_session: boto3.Session = None,
        braket_user_agents: list[str] = None,
        config: dict = None
    ):
        """
        Initialize AWS session for Braket.

        Args:
            boto_session: Pre-configured boto3 session
            braket_user_agents: Custom user agent strings
            config: Session configuration options
        """
        self.boto_session = boto_session
        self.braket_user_agents = braket_user_agents
        # A missing (or otherwise falsy) config becomes a fresh empty dict.
        self.config = config or {}

    @property
    def region(self) -> str:
        """Current AWS region."""
        pass

    @property
    def account_id(self) -> str:
        """AWS account ID."""
        pass

    def get_device(self, arn: str) -> AwsDevice:
        """
        Get device using this session.

        Args:
            arn: Device ARN

        Returns:
            AwsDevice: Device with this session
        """
        pass

561

562

def create_aws_session_with_profile(profile_name: str, region: str = 'us-east-1') -> AwsSession:
    """
    Build a Braket session from a named AWS credential profile.

    Args:
        profile_name: AWS credential profile name
        region: AWS region for Braket services

    Returns:
        AwsSession: Session wrapping a boto3 session for that profile/region
    """
    session_options = {
        'profile_name': profile_name,
        'region_name': region,
    }
    return AwsSession(boto_session=boto3.Session(**session_options))

575

576

def create_aws_session_with_keys(access_key: str, secret_key: str, region: str = 'us-east-1') -> AwsSession:
    """
    Build a Braket session from an explicit access key pair.

    Args:
        access_key: AWS access key ID
        secret_key: AWS secret access key
        region: AWS region for Braket services

    Returns:
        AwsSession: Session wrapping a boto3 session for those credentials
    """
    credentials = {
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_key,
        'region_name': region,
    }
    return AwsSession(boto_session=boto3.Session(**credentials))

594

595

def validate_aws_session(session: AwsSession) -> dict:
    """
    Validate AWS session configuration and permissions.

    Two probes run in order and neither lets an exception escape: first the
    session's region and account ID are resolved, then a device listing is
    attempted with the session. If the first probe fails, the second is
    skipped.

    Args:
        session: AWS session to validate

    Returns:
        dict: Validation results and recommendations with keys 'is_valid',
            'region', 'account_id' (None when they could not be resolved),
            'permissions', 'recommendations' and, on failure, 'error'
    """
    validation = {
        'is_valid': True,
        'region': None,
        'account_id': None,
        'permissions': {},
        'recommendations': []
    }

    # Resolving identity can itself fail (e.g. bad credentials), so it must
    # be part of the validation rather than a precondition for it.
    try:
        validation['region'] = session.region
        validation['account_id'] = session.account_id
    except Exception as e:  # validation boundary: report instead of raising
        validation['is_valid'] = False
        validation['error'] = str(e)
        validation['recommendations'].append("Check AWS credentials and region configuration")
        return validation

    try:
        # Test basic Braket permissions
        devices = AwsDevice.get_devices(aws_session=session)
    except Exception as e:  # validation boundary: report instead of raising
        validation['is_valid'] = False
        validation['permissions']['device_access'] = False
        validation['error'] = str(e)
        validation['recommendations'].append("Check Braket service permissions")
    else:
        validation['permissions']['device_access'] = True
        validation['permissions']['accessible_devices'] = len(devices)

    # Latency hint for sessions outside us-east-1
    if validation['region'] != 'us-east-1':
        validation['recommendations'].append("Consider us-east-1 for lowest latency to Braket services")

    return validation

630

```

631

632

## Device Reservations

633

634

### Direct Reservations

635

636

```python { .api }

637

from braket.aws import DirectReservation

638

639

class DirectReservation:
    """Context manager for using reserved quantum devices on AWS Braket.

    Reference listing: apart from ``__init__``, every member body is a
    ``pass`` placeholder.
    """

    def __init__(
        self,
        device,
        reservation_arn: str = None
    ):
        """
        Initialize direct reservation context.

        Args:
            device: Braket device or device ARN with reservation
            reservation_arn: AWS Braket Direct reservation ARN
        """
        # Both arguments are stored unchanged.
        self.device = device
        self.reservation_arn = reservation_arn

    def __enter__(self) -> 'DirectReservation':
        """
        Enter reservation context and apply reservation to quantum tasks.

        Returns:
            DirectReservation: Self for context management
        """
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit reservation context and restore normal device access."""
        pass

    def start(self) -> 'DirectReservation':
        """
        Start reservation without context manager.

        Returns:
            DirectReservation: Self for method chaining
        """
        pass

    def stop(self) -> 'DirectReservation':
        """
        Stop reservation manually.

        Returns:
            DirectReservation: Self for method chaining
        """
        pass

687

688

# Reservation management examples

689

def use_device_reservation(device_arn: str, reservation_arn: str, circuits: list) -> dict:
    """
    Run a list of circuits on a device inside a reservation context.

    Circuits are executed one after another with 1000 shots each, and each
    result is awaited before the next circuit is submitted.

    Args:
        device_arn: AWS device ARN with reservation
        reservation_arn: Reservation ARN for exclusive access
        circuits: List of quantum circuits to execute

    Returns:
        dict: Execution results with reservation metadata. 'results' holds
            one entry per circuit; 'cost_analysis' is left empty.
    """
    from braket.aws import AwsDevice

    device = AwsDevice(device_arn)
    circuit_total = len(circuits)
    per_circuit = []

    # Execute circuits within reservation context
    with DirectReservation(device=device, reservation_arn=reservation_arn):
        for index, circuit in enumerate(circuits):
            task = device.run(circuit, shots=1000)
            outcome = task.result()
            duration = outcome.task_metadata.get('executionDuration', 'unknown')
            per_circuit.append({
                'circuit_index': index,
                'task_arn': task.arn,
                'measurement_counts': outcome.measurement_counts,
                'execution_duration': duration
            })

    # Assemble the report, including the reservation utilization summary
    return {
        'reservation_arn': reservation_arn,
        'device_arn': device_arn,
        'circuits_executed': circuit_total,
        'results': per_circuit,
        'cost_analysis': {},
        'reservation_utilization': {
            'total_tasks': circuit_total,
            'estimated_time_savings': 'No queue wait time',
            'dedicated_access': True,
            'billing_model': 'Reservation-based'
        }
    }

735

736

def manage_reservation_lifecycle(device_arn: str) -> dict:
    """
    Describe the reservation lifecycle as a static workflow guide.

    The returned structure is the same for every device; ``device_arn`` is
    accepted but not consulted.

    Args:
        device_arn: AWS device ARN for reservation

    Returns:
        dict: Reservation management workflow with the ordered 'phases' and
            an 'implementation_guide' covering planning, optimization and
            monitoring
    """
    # Reservation planning phase
    planning = {
        'optimal_duration': 'Based on workload analysis',
        'cost_estimation': {
            'reserved_vs_on_demand': 'Calculate savings for extended usage',
            'minimum_duration': '1 hour minimum for most devices',
            'cancellation_policy': 'Check device-specific policies'
        },
        'scheduling_considerations': [
            'Device availability windows',
            'Time zone coordination for team access',
            'Buffer time for job completion'
        ]
    }

    # Usage optimization
    optimization = {
        'batch_execution': 'Group circuits for efficient execution',
        'parallel_jobs': 'Use multiple job slots if available',
        'monitoring_tools': 'Track utilization during reservation',
        'fallback_strategy': 'Plan for unexpected issues'
    }

    # Monitoring guidance
    monitoring = {
        'key_metrics': ['utilization_rate', 'cost_per_shot', 'queue_time_saved'],
        'alerting': 'Set up notifications for underutilization'
    }

    return {
        'phases': [
            'reservation_planning',
            'reservation_creation',
            'reservation_usage',
            'reservation_monitoring',
            'reservation_completion'
        ],
        'implementation_guide': {
            'planning': planning,
            'optimization': optimization,
            'monitoring': monitoring
        }
    }

790

791

def calculate_reservation_cost_benefit(usage_pattern: dict, device_pricing: dict) -> dict:
    """
    Calculate cost benefits of device reservations vs on-demand usage.

    On-demand cost is ``shots_per_month * per_shot``; reservation cost is
    ``hours_per_month * reservation_hourly``. Reservations are recommended
    only when they are strictly cheaper.

    Args:
        usage_pattern: Expected usage patterns. Keys read: 'shots_per_month'
            (default 100000) and 'hours_per_month' (default 40)
        device_pricing: Device pricing information. Keys read: 'per_shot'
            (default 0.00075) and 'reservation_hourly' (default 1.50).
            Both defaults are example figures, not published prices.

    Returns:
        dict: Cost-benefit analysis for reservations with keys 'usage_input',
            'pricing_input', 'cost_comparison' and 'recommendation'.
            'break_even_shots' is ``float('inf')`` when the per-shot rate
            is zero.
    """
    analysis = {
        'usage_input': usage_pattern,
        'pricing_input': device_pricing,
        'cost_comparison': {},
        'recommendation': {}
    }

    # Calculate on-demand costs
    shots_per_month = usage_pattern.get('shots_per_month', 100000)
    on_demand_rate = device_pricing.get('per_shot', 0.00075)  # Example pricing
    on_demand_monthly_cost = shots_per_month * on_demand_rate

    # Calculate reservation costs (example)
    reservation_hourly_rate = device_pricing.get('reservation_hourly', 1.50)
    hours_per_month = usage_pattern.get('hours_per_month', 40)
    reservation_monthly_cost = hours_per_month * reservation_hourly_rate

    # Cost comparison
    monthly_savings = on_demand_monthly_cost - reservation_monthly_cost

    # With a zero per-shot rate no shot volume ever pays for a reservation,
    # so report an infinite break-even instead of dividing by zero.
    if on_demand_rate != 0:
        break_even_shots = reservation_monthly_cost / on_demand_rate
    else:
        break_even_shots = float('inf')

    if on_demand_monthly_cost > 0:
        savings_percentage = (monthly_savings / on_demand_monthly_cost) * 100
    else:
        savings_percentage = 0

    analysis['cost_comparison'] = {
        'on_demand_monthly': on_demand_monthly_cost,
        'reservation_monthly': reservation_monthly_cost,
        'monthly_savings': monthly_savings,
        'savings_percentage': savings_percentage,
        'break_even_shots': break_even_shots
    }

    # Recommendation
    if monthly_savings > 0:
        analysis['recommendation'] = {
            'use_reservations': True,
            'reason': f'Save ${monthly_savings:.2f} per month ({savings_percentage:.1f}%)',
            'optimal_reservation_hours': hours_per_month
        }
    else:
        if break_even_shots == float('inf'):
            threshold_usage = 'Reservations never break even at a zero per-shot rate'
        else:
            threshold_usage = f'Consider reservations above {break_even_shots:.0f} shots/month'
        analysis['recommendation'] = {
            'use_reservations': False,
            'reason': 'On-demand pricing more cost-effective for current usage',
            'threshold_usage': threshold_usage
        }

    return analysis

848

```

849

850

## Hybrid Quantum Jobs

851

852

### Job Definition and Execution

853

854

```python { .api }

855

from braket.jobs import hybrid_job, InstanceConfig, CheckpointConfig, OutputDataConfig

856

857

# `device` names the quantum device the job targets, not the classical
# instance type: the value comes back from get_job_device_arn() below and is
# passed to AwsDevice. The SV1 simulator ARN is used here as an example.
@hybrid_job(device="arn:aws:braket:::device/quantum-simulator/amazon/sv1")
def basic_hybrid_algorithm():
    """
    Basic hybrid quantum-classical algorithm.

    This job demonstrates the structure of hybrid algorithms
    combining quantum circuit execution with classical optimization:
    a two-parameter variational circuit is evaluated on the job's device
    and its <Z⊗Z> expectation value is minimised with COBYLA (at most 50
    iterations). The optimum is stored through ``save_job_result``.
    """
    from braket.aws import AwsDevice
    from braket.jobs import get_job_device_arn, save_job_result
    import numpy as np

    # Get quantum device for this job
    device_arn = get_job_device_arn()
    device = AwsDevice(device_arn)

    def quantum_cost_function(params: np.ndarray) -> float:
        """Quantum cost function using variational circuit."""
        from braket.circuits import Circuit
        from braket.circuits.observables import Z

        # One RY rotation per parameter, qubit index == parameter index.
        circuit = Circuit()
        for qubit, angle in enumerate(params):
            circuit.ry(qubit, angle)

        circuit.cnot(0, 1)
        circuit.expectation(observable=Z() @ Z(), target=[0, 1])

        task = device.run(circuit, shots=1000)
        # Plain float so the optimizer never receives a library scalar type.
        return float(task.result().values[0])

    # Classical optimization
    from scipy.optimize import minimize
    initial_params = np.random.random(2) * np.pi

    optimum = minimize(
        quantum_cost_function,
        initial_params,
        method='COBYLA',
        options={'maxiter': 50}
    )

    # Save results (converted to builtin types so they serialize cleanly)
    save_job_result({
        'optimal_parameters': optimum.x.tolist(),
        'optimal_value': float(optimum.fun),
        'optimization_success': bool(optimum.success)
    })

907

908

class InstanceConfig:
    """Settings for the compute instance(s) a job runs on."""

    def __init__(
        self,
        instance_type: str,
        instance_count: int = 1,
        volume_size_gb: int = 30
    ):
        """
        Record the instance settings on the object.

        Args:
            instance_type: EC2 instance type (e.g., 'ml.m5.large')
            instance_count: Number of instances
            volume_size_gb: EBS volume size in GB
        """
        (
            self.instance_type,
            self.instance_count,
            self.volume_size_gb,
        ) = (instance_type, instance_count, volume_size_gb)

928

929

class CheckpointConfig:
    """Where a job keeps its checkpoints, remotely and locally."""

    def __init__(
        self,
        s3_uri: str,
        local_path: str = "/opt/braket/checkpoints"
    ):
        """
        Record the checkpoint locations on the object.

        Args:
            s3_uri: S3 URI for checkpoint storage
            local_path: Local checkpoint directory
        """
        self.s3_uri, self.local_path = s3_uri, local_path

946

947

class OutputDataConfig:
    """Where a job writes its output and which key encrypts it."""

    def __init__(
        self,
        s3_path: str,
        kms_key_id: str = None
    ):
        """
        Record the output settings on the object.

        Args:
            s3_path: S3 path for output data
            kms_key_id: KMS key for encryption
        """
        self.s3_path, self.kms_key_id = s3_path, kms_key_id

964

965

# Advanced job configuration

966

@hybrid_job(
    # Quantum device for the job; the classical GPU instance is selected
    # through instance_config instead. SV1 is used here as an example.
    device="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
    instance_config=InstanceConfig("ml.p3.2xlarge", instance_count=1),  # GPU instance for ML
    checkpoint_config=CheckpointConfig("s3://my-bucket/checkpoints/"),
    output_data_config=OutputDataConfig("s3://my-bucket/results/")
)
def advanced_variational_algorithm():
    """
    Advanced hybrid algorithm with ML and checkpointing.

    Demonstrates quantum machine learning with classical neural networks
    and comprehensive checkpoint management: a small classical network
    produces two rotation angles for a variational circuit, training state
    is checkpointed every 10 epochs, and a restarted job resumes from the
    epoch after the last checkpoint.

    Hyperparameters read: 'learning_rate' (default 0.01) and 'epochs'
    (default 100).
    """
    import torch
    import torch.nn as nn
    from braket.aws import AwsDevice
    from braket.jobs import (
        get_hyperparameters, get_job_device_arn,
        load_job_checkpoint, save_job_checkpoint, save_job_result
    )

    # Get job hyperparameters. Values may arrive as strings (TODO confirm),
    # so cast explicitly before numeric use.
    hyperparams = get_hyperparameters()
    learning_rate = float(hyperparams.get('learning_rate', 0.01))
    epochs = int(hyperparams.get('epochs', 100))

    # Resolve the device once instead of on every forward pass.
    device = AwsDevice(get_job_device_arn())

    # Define hybrid model
    class HybridQuantumClassicalModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.classical_layers = nn.Sequential(
                nn.Linear(4, 16),
                nn.ReLU(),
                nn.Linear(16, 2)  # 2 quantum parameters
            )

        def quantum_layer(self, params):
            """Quantum variational layer."""
            from braket.circuits import Circuit
            from braket.circuits.observables import Z

            circuit = Circuit()
            circuit.ry(0, params[0])
            circuit.ry(1, params[1])
            circuit.cnot(0, 1)
            circuit.expectation(observable=Z() @ Z(), target=[0, 1])

            task = device.run(circuit, shots=1000)
            return task.result().values[0]

        def forward(self, x):
            classical_out = self.classical_layers(x)
            quantum_out = self.quantum_layer(classical_out)
            return quantum_out

    # Training loop with checkpointing
    model = HybridQuantumClassicalModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Load checkpoint if exists. Restoring stays best-effort, but only
    # ordinary errors are absorbed (KeyboardInterrupt/SystemExit propagate)
    # and the reason for a fresh start is reported.
    start_epoch = 0
    last_loss = 0.0
    try:
        checkpoint = load_job_checkpoint()
        model.load_state_dict(checkpoint['model_state'])
        optimizer.load_state_dict(checkpoint['optimizer_state'])
        # The checkpointed epoch already ran; continue with the next one.
        start_epoch = checkpoint['epoch'] + 1
        last_loss = float(checkpoint.get('loss', last_loss))
    except Exception as exc:
        print(f"No usable checkpoint ({exc!r}); starting from epoch 0")
        start_epoch = 0

    # NOTE(review): state_dict() values are tensors; confirm the checkpoint
    # and result serialization format accepts them (a pickled format may be
    # required).

    # Training loop (simplified)
    for epoch in range(start_epoch, epochs):
        # Training step would go here
        loss = torch.randn(1)  # Placeholder
        last_loss = float(loss)

        # Save checkpoint every 10 epochs
        if epoch % 10 == 0:
            save_job_checkpoint({
                'model_state': model.state_dict(),
                'optimizer_state': optimizer.state_dict(),
                'epoch': epoch,
                'loss': last_loss
            })

    # Save final results. `last_loss` is defined even when the loop body
    # never runs (e.g. resuming a job whose last epoch was checkpointed).
    save_job_result({
        'final_model_state': model.state_dict(),
        'training_loss': last_loss,
        'total_epochs': epochs
    })

1056

```

1057

1058

### Job Environment and Utilities

1059

1060

```python { .api }

1061

from braket.jobs import (

1062

get_checkpoint_dir, get_hyperparameters, get_input_data_dir,

1063

get_job_device_arn, get_job_name, get_results_dir

1064

)

1065

1066

def get_job_device_arn() -> str:
    """
    Get quantum device ARN for current job.

    Signature reference only: the body in this listing is a placeholder.

    Returns:
        str: Device ARN assigned to job
    """
    pass

1074

1075

def get_job_name() -> str:
    """
    Get current job name.

    Signature reference only: the body in this listing is a placeholder.

    Returns:
        str: Job name
    """
    pass

1083

1084

def get_hyperparameters() -> dict:

1085

"""

1086

Get job hyperparameters.

1087

1088

Returns:

1089

dict: Hyperparameter dictionary

1090

"""

1091

pass

1092

1093

def get_input_data_dir(channel: str = "training") -> str:

1094

"""

1095

Get input data directory path.

1096

1097

Args:

1098

channel: Data channel name

1099

1100

Returns:

1101

str: Input data directory path

1102

"""

1103

pass

1104

1105

def get_results_dir() -> str:

1106

"""

1107

Get results output directory path.

1108

1109

Returns:

1110

str: Results directory path

1111

"""

1112

pass

1113

1114

def get_checkpoint_dir() -> str:

1115

"""

1116

Get checkpoint directory path.

1117

1118

Returns:

1119

str: Checkpoint directory path

1120

"""

1121

pass

1122

1123

def save_job_checkpoint(checkpoint_data: dict) -> None:

1124

"""

1125

Save job checkpoint data.

1126

1127

Args:

1128

checkpoint_data: Data to checkpoint

1129

"""

1130

pass

1131

1132

def load_job_checkpoint() -> dict:

1133

"""

1134

Load job checkpoint data.

1135

1136

Returns:

1137

dict: Loaded checkpoint data

1138

"""

1139

pass

1140

1141

def save_job_result(result_data: dict) -> None:

1142

"""

1143

Save job result data.

1144

1145

Args:

1146

result_data: Results to save

1147

"""

1148

pass

1149

1150

def load_job_result(job_arn: str) -> dict:

1151

"""

1152

Load results from completed job.

1153

1154

Args:

1155

job_arn: Job ARN

1156

1157

Returns:

1158

dict: Job results

1159

"""

1160

pass

1161

1162

# Job utilities example

1163

def comprehensive_job_setup() -> dict:

1164

"""

1165

Set up the job environment: collect job metadata and directories, and configure logging.

1166

1167

Returns:

1168

dict: Job environment information

1169

"""

1170

job_info = {

1171

'job_name': get_job_name(),

1172

'device_arn': get_job_device_arn(),

1173

'hyperparameters': get_hyperparameters(),

1174

'directories': {

1175

'input_data': get_input_data_dir(),

1176

'results': get_results_dir(),

1177

'checkpoints': get_checkpoint_dir()

1178

}

1179

}

1180

1181

# Set up logging

1182

import logging

1183

logging.basicConfig(

1184

level=logging.INFO,

1185

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

1186

handlers=[

1187

logging.FileHandler(f"{get_results_dir()}/job.log"),

1188

logging.StreamHandler()

1189

]

1190

)

1191

1192

logger = logging.getLogger('QuantumJob')

1193

logger.info(f"Starting job: {job_info['job_name']}")

1194

logger.info(f"Using device: {job_info['device_arn']}")

1195

1196

return job_info

1197

```

1198

1199

## Cost and Usage Tracking

1200

1201

### Resource Tracking

1202

1203

```python { .api }

1204

from braket.tracking import Tracker

1205

1206

class Tracker:

1207

"""Tracks quantum computing costs and resource usage."""

1208

1209

def __init__(self):

1210

"""Initialize cost tracker."""

1211

pass

1212

1213

def __enter__(self) -> 'Tracker':

1214

"""Context manager entry."""

1215

return self

1216

1217

def __exit__(self, exc_type, exc_val, exc_tb) -> None:

1218

"""Context manager exit."""

1219

pass

1220

1221

def quantum_tasks_cost(self) -> float:

1222

"""

1223

Get total cost for quantum tasks.

1224

1225

Returns:

1226

float: Total cost in USD

1227

"""

1228

pass

1229

1230

def simulator_tasks_cost(self) -> float:

1231

"""

1232

Get total cost for simulator tasks.

1233

1234

Returns:

1235

float: Total cost in USD

1236

"""

1237

pass

1238

1239

def quantum_task_statistics(self) -> dict:

1240

"""

1241

Get detailed quantum task statistics.

1242

1243

Returns:

1244

dict: Task execution statistics

1245

"""

1246

pass

1247

1248

def simulator_task_statistics(self) -> dict:

1249

"""

1250

Get detailed simulator task statistics.

1251

1252

Returns:

1253

dict: Simulator execution statistics

1254

"""

1255

pass

1256

1257

def track_quantum_experiment_costs(experiment_function: callable, *args, **kwargs) -> dict:

1258

"""

1259

Track costs for quantum experiment execution.

1260

1261

Args:

1262

experiment_function: Function that runs quantum experiments

1263

*args: Arguments for experiment function

1264

**kwargs: Keyword arguments for experiment function

1265

1266

Returns:

1267

dict: Cost analysis and experiment results

1268

"""

1269

with Tracker() as tracker:

1270

# Run experiment

1271

results = experiment_function(*args, **kwargs)

1272

1273

# Collect cost information

1274

cost_analysis = {

1275

'total_cost': tracker.quantum_tasks_cost() + tracker.simulator_tasks_cost(),

1276

'quantum_cost': tracker.quantum_tasks_cost(),

1277

'simulator_cost': tracker.simulator_tasks_cost(),

1278

'quantum_stats': tracker.quantum_task_statistics(),

1279

'simulator_stats': tracker.simulator_task_statistics(),

1280

'cost_per_shot': 0.0,

1281

'results': results

1282

}

1283

1284

# Calculate cost per shot

1285

total_shots = (

1286

cost_analysis['quantum_stats'].get('shots', 0) +

1287

cost_analysis['simulator_stats'].get('shots', 0)

1288

)

1289

1290

if total_shots > 0:

1291

cost_analysis['cost_per_shot'] = cost_analysis['total_cost'] / total_shots

1292

1293

return cost_analysis

1294

1295

def optimize_experiment_for_cost(circuits: list, target_accuracy: float = 0.01) -> dict:

1296

"""

1297

Optimize experiment execution for cost while maintaining accuracy.

1298

1299

Args:

1300

circuits: List of quantum circuits to execute

1301

target_accuracy: Target measurement accuracy

1302

1303

Returns:

1304

dict: Optimized execution plan with cost estimates

1305

"""

1306

optimization_plan = {

1307

'execution_strategy': {},

1308

'estimated_costs': {},

1309

'recommendations': []

1310

}

1311

1312

# Analyze circuits for optimization opportunities

1313

for i, circuit in enumerate(circuits):

1314

circuit_analysis = {

1315

'qubit_count': circuit.qubit_count,

1316

'depth': circuit.depth,

1317

'gate_count': len(circuit.instructions)

1318

}

1319

1320

# Recommend device based on circuit characteristics

1321

if circuit.qubit_count <= 10 and circuit.depth <= 20:

1322

# Small circuit - use simulator

1323

recommended_device = "Local Simulator"

1324

estimated_cost = 0.0

1325

optimization_plan['recommendations'].append(

1326

f"Circuit {i}: Use local simulator (free) - small circuit"

1327

)

1328

elif circuit.qubit_count <= 20:

1329

# Medium circuit - use cloud simulator

1330

recommended_device = "AWS SV1"

1331

estimated_cost = 0.075 * (2 ** min(circuit.qubit_count, 17))

1332

optimization_plan['recommendations'].append(

1333

f"Circuit {i}: Use SV1 simulator - cost effective for {circuit.qubit_count} qubits"

1334

)

1335

else:

1336

# Large circuit - may need QPU or advanced simulator

1337

recommended_device = "AWS QPU or TN1"

1338

estimated_cost = 0.30 + (1000 * 0.0003) # Task fee + shots

1339

optimization_plan['recommendations'].append(

1340

f"Circuit {i}: Consider QPU for {circuit.qubit_count} qubits"

1341

)

1342

1343

optimization_plan['execution_strategy'][f'circuit_{i}'] = {

1344

'recommended_device': recommended_device,

1345

'estimated_cost': estimated_cost,

1346

'analysis': circuit_analysis

1347

}

1348

1349

# Calculate total estimated cost

1350

total_cost = sum(

1351

strategy['estimated_cost']

1352

for strategy in optimization_plan['execution_strategy'].values()

1353

)

1354

optimization_plan['estimated_costs']['total'] = total_cost

1355

1356

return optimization_plan

1357

```

1358

1359

## Device Reservations

1360

1361

### Direct Device Access

1362

1363

```python { .api }

1364

from braket.aws import DirectReservation

1365

1366

class DirectReservation:

1367

"""Direct quantum device reservation interface."""

1368

1369

def __init__(self, arn: str, aws_session: 'AwsSession' = None):

1370

"""

1371

Initialize device reservation.

1372

1373

Args:

1374

arn: Reservation ARN

1375

aws_session: AWS session for API calls

1376

"""

1377

self.arn = arn

1378

self.aws_session = aws_session

1379

1380

@property

1381

def device_arn(self) -> str:

1382

"""Reserved device ARN."""

1383

pass

1384

1385

@property

1386

def start_time(self) -> str:

1387

"""Reservation start time."""

1388

pass

1389

1390

@property

1391

def end_time(self) -> str:

1392

"""Reservation end time."""

1393

pass

1394

1395

@property

1396

def status(self) -> str:

1397

"""Reservation status."""

1398

pass

1399

1400

def cancel(self) -> None:

1401

"""Cancel reservation."""

1402

pass

1403

1404

def create_device_reservation(device_arn: str, start_time: str, duration_minutes: int) -> DirectReservation:

1405

"""

1406

Create direct device reservation.

1407

1408

Args:

1409

device_arn: Target device ARN

1410

start_time: ISO format start time

1411

duration_minutes: Reservation duration in minutes

1412

1413

Returns:

1414

DirectReservation: Device reservation handle

1415

"""

1416

# This is a placeholder - actual implementation would use AWS API

1417

pass

1418

1419

def execute_with_reservation(reservation: DirectReservation, circuits: list) -> dict:

1420

"""

1421

Execute circuits using device reservation.

1422

1423

Args:

1424

reservation: Active device reservation

1425

circuits: Circuits to execute during reservation

1426

1427

Returns:

1428

dict: Execution results and reservation utilization

1429

"""

1430

device = AwsDevice(reservation.device_arn)

1431

1432

results = []

1433

for circuit in circuits:

1434

task = device.run(circuit, shots=1000, reservation_arn=reservation.arn)

1435

results.append(task.result())

1436

1437

return {

1438

'results': results,

1439

'reservation_info': {

1440

'device_arn': reservation.device_arn,

1441

'start_time': reservation.start_time,

1442

'end_time': reservation.end_time,

1443

'utilization': len(circuits)

1444

}

1445

}

1446

```

1447

1448

This AWS integration documentation covers cloud quantum computing with Amazon Braket, including device management, task execution, hybrid job orchestration, cost optimization, cost and resource tracking, and device reservations.