0
# AWS Integration
1
2
Complete AWS cloud integration for quantum computing on Amazon Braket, including device access, task management, sessions, jobs, and cost tracking.
3
4
## Core Imports
5
6
```python { .api }
7
from braket.aws import (
8
AwsDevice, AwsDeviceType, AwsQuantumTask, AwsQuantumTaskBatch,
9
AwsQuantumJob, AwsSession, DirectReservation
10
)
11
from braket.jobs import (
12
hybrid_job, CheckpointConfig, InstanceConfig, OutputDataConfig,
13
S3DataSourceConfig, StoppingCondition, LocalQuantumJob,
14
load_job_checkpoint, load_job_result, save_job_checkpoint, save_job_result,
15
get_checkpoint_dir, get_hyperparameters, get_input_data_dir,
16
get_job_device_arn, get_job_name, get_results_dir,
17
Framework, retrieve_image
18
)
19
from braket.tracking import Tracker
20
from braket.tasks import QuantumTask, QuantumTaskBatch
21
```
22
23
## AWS Device Management
24
25
### Device Discovery and Access
26
27
```python { .api }
28
from braket.aws import AwsDevice, AwsDeviceType
29
from enum import Enum
30
31
class AwsDeviceType(Enum):
32
"""AWS Braket device type enumeration."""
33
SIMULATOR = "SIMULATOR"
34
QPU = "QPU"
35
36
class AwsDevice:
37
"""Amazon Braket implementation of quantum devices."""
38
39
def __init__(self, arn: str, aws_session: 'AwsSession' = None):
40
"""
41
Initialize AWS quantum device.
42
43
Args:
44
arn: AWS device ARN
45
aws_session: Optional AWS session, creates default if None
46
"""
47
self.arn = arn
48
self.aws_session = aws_session
49
50
@staticmethod
51
def get_devices(
52
arns: list[str] = None,
53
names: list[str] = None,
54
types: list[AwsDeviceType] = None,
55
statuses: list[str] = None,
56
provider_names: list[str] = None,
57
aws_session: 'AwsSession' = None
58
) -> list['AwsDevice']:
59
"""
60
Get AWS Braket devices matching criteria.
61
62
Args:
63
arns: Device ARNs to filter by
64
names: Device names to filter by
65
types: Device types to filter by
66
statuses: Device statuses to filter by
67
provider_names: Provider names to filter by
68
aws_session: AWS session for API calls
69
70
Returns:
71
list[AwsDevice]: Matching devices
72
"""
73
pass
74
75
def run(
76
self,
77
task_specification,
78
shots: int = 1000,
79
inputs: dict = None,
80
gate_definitions: dict = None,
81
reservation_arn: str = None,
82
tags: dict = None
83
) -> 'AwsQuantumTask':
84
"""
85
Execute quantum task on AWS device.
86
87
Args:
88
task_specification: Circuit, AHS program, or annealing problem
89
shots: Number of measurement shots
90
inputs: Input parameters for parameterized programs
91
gate_definitions: Custom gate definitions
92
reservation_arn: Device reservation ARN
93
tags: AWS resource tags
94
95
Returns:
96
AwsQuantumTask: Quantum task execution handle
97
"""
98
pass
99
100
def run_batch(
101
self,
102
task_specifications: list,
103
shots: int = 1000,
104
max_parallel: int = 10,
105
inputs: list[dict] = None,
106
tags: dict = None
107
) -> 'AwsQuantumTaskBatch':
108
"""
109
Execute batch of quantum tasks.
110
111
Args:
112
task_specifications: List of circuits/programs
113
shots: Number of measurement shots per task
114
max_parallel: Maximum parallel executions
115
inputs: Input parameters for each task
116
tags: AWS resource tags
117
118
Returns:
119
AwsQuantumTaskBatch: Batch execution handle
120
"""
121
pass
122
123
@property
124
def name(self) -> str:
125
"""Device name."""
126
pass
127
128
@property
129
def status(self) -> str:
130
"""Current device status."""
131
pass
132
133
@property
134
def type(self) -> AwsDeviceType:
135
"""Device type (QPU or Simulator)."""
136
pass
137
138
@property
139
def provider_name(self) -> str:
140
"""Device provider name."""
141
pass
142
143
@property
144
def properties(self) -> dict:
145
"""Device capabilities and properties."""
146
pass
147
148
# Device discovery examples
149
def discover_aws_devices() -> dict[str, list[AwsDevice]]:
150
"""
151
Discover and categorize available AWS Braket devices.
152
153
Returns:
154
dict: Categorized devices by provider and type
155
"""
156
devices = {
157
'simulators': AwsDevice.get_devices(types=[AwsDeviceType.SIMULATOR]),
158
'qpus': AwsDevice.get_devices(types=[AwsDeviceType.QPU]),
159
'ionq': AwsDevice.get_devices(provider_names=['IonQ']),
160
'rigetti': AwsDevice.get_devices(provider_names=['Rigetti']),
161
'oqc': AwsDevice.get_devices(provider_names=['Oxford Quantum Computing']),
162
'quera': AwsDevice.get_devices(provider_names=['QuEra'])
163
}
164
165
return devices
166
167
def select_optimal_device(circuit, requirements: dict) -> AwsDevice:
168
"""
169
Select optimal device based on circuit and requirements.
170
171
Args:
172
circuit: Quantum circuit to execute
173
requirements: Performance and cost requirements
174
175
Returns:
176
AwsDevice: Selected optimal device
177
"""
178
all_devices = AwsDevice.get_devices()
179
180
# Filter by availability and qubit count
181
available_devices = []
182
for device in all_devices:
183
if (device.status == 'ONLINE' and
184
device.properties.get('qubitCount', 0) >= circuit.qubit_count):
185
available_devices.append(device)
186
187
# Select based on requirements
188
if requirements.get('cost_priority', False):
189
# Prefer simulators for cost
190
simulators = [d for d in available_devices if d.type == AwsDeviceType.SIMULATOR]
191
return simulators[0] if simulators else available_devices[0]
192
193
elif requirements.get('real_hardware', False):
194
# Prefer QPUs for real quantum effects
195
qpus = [d for d in available_devices if d.type == AwsDeviceType.QPU]
196
return qpus[0] if qpus else available_devices[0]
197
198
else:
199
# Default selection
200
return available_devices[0]
201
```
202
203
### Device Properties and Capabilities
204
205
```python { .api }
206
def analyze_device_capabilities(device: AwsDevice) -> dict:
207
"""
208
Analyze device capabilities and constraints.
209
210
Args:
211
device: AWS device to analyze
212
213
Returns:
214
dict: Comprehensive device capability analysis
215
"""
216
properties = device.properties
217
218
capabilities = {
219
'basic_info': {
220
'name': device.name,
221
'provider': device.provider_name,
222
'type': device.type.value,
223
'status': device.status
224
},
225
'quantum_specs': {},
226
'gate_set': {},
227
'connectivity': {},
228
'timing_info': {},
229
'error_rates': {}
230
}
231
232
# Quantum specifications
233
if 'qubitCount' in properties:
234
capabilities['quantum_specs']['qubit_count'] = properties['qubitCount']
235
236
# Gate set analysis
237
if 'supportedOperations' in properties:
238
capabilities['gate_set'] = properties['supportedOperations']
239
240
# Connectivity graph
241
if 'connectivity' in properties:
242
capabilities['connectivity'] = properties['connectivity']
243
244
# Timing constraints
245
if 'timing' in properties:
246
capabilities['timing_info'] = properties['timing']
247
248
# Error characteristics
249
if 'fidelity' in properties:
250
capabilities['error_rates'] = properties['fidelity']
251
252
return capabilities
253
254
def check_circuit_compatibility(circuit, device: AwsDevice) -> dict:
255
"""
256
Check circuit compatibility with device.
257
258
Args:
259
circuit: Quantum circuit to validate
260
device: Target AWS device
261
262
Returns:
263
dict: Compatibility analysis results
264
"""
265
properties = device.properties
266
267
compatibility = {
268
'is_compatible': True,
269
'issues': [],
270
'warnings': [],
271
'recommendations': []
272
}
273
274
# Check qubit count
275
if circuit.qubit_count > properties.get('qubitCount', 0):
276
compatibility['is_compatible'] = False
277
compatibility['issues'].append(f"Circuit requires {circuit.qubit_count} qubits, device has {properties.get('qubitCount', 0)}")
278
279
# Check gate support
280
supported_gates = properties.get('supportedOperations', [])
281
for instruction in circuit.instructions:
282
gate_name = instruction.operator.__class__.__name__
283
if gate_name not in supported_gates:
284
compatibility['warnings'].append(f"Gate {gate_name} may not be natively supported")
285
286
# Check connectivity (for QPUs)
287
if device.type == AwsDeviceType.QPU and 'connectivity' in properties:
288
connectivity_graph = properties['connectivity']
289
for instruction in circuit.instructions:
290
if len(instruction.target) == 2: # Two-qubit gate
291
qubit_pair = tuple(instruction.target)
292
if qubit_pair not in connectivity_graph.get('fullyConnected', []):
293
compatibility['warnings'].append(f"Two-qubit gate on {qubit_pair} may require SWAP operations")
294
295
return compatibility
296
```
297
298
## Quantum Task Management
299
300
### Task Execution and Monitoring
301
302
```python { .api }
303
from braket.aws import AwsQuantumTask, AwsQuantumTaskBatch
304
from braket.tasks import QuantumTask
305
306
class AwsQuantumTask(QuantumTask):
307
"""AWS quantum task execution interface."""
308
309
def __init__(self, arn: str, aws_session: 'AwsSession' = None):
310
"""
311
Initialize AWS quantum task.
312
313
Args:
314
arn: Task ARN
315
aws_session: AWS session for API calls
316
"""
317
self.arn = arn
318
self.aws_session = aws_session
319
320
def result(self) -> 'TaskResult':
321
"""
322
Get task execution results.
323
324
Returns:
325
TaskResult: Quantum task results
326
"""
327
pass
328
329
def state(self) -> str:
330
"""
331
Get current task state.
332
333
Returns:
334
str: Task state (QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED)
335
"""
336
pass
337
338
def cancel(self) -> None:
339
"""Cancel running task."""
340
pass
341
342
def metadata(self) -> dict:
343
"""
344
Get task metadata.
345
346
Returns:
347
dict: Task execution metadata
348
"""
349
pass
350
351
@property
352
def id(self) -> str:
353
"""Task ID."""
354
pass
355
356
@staticmethod
357
def get(arn: str, aws_session: 'AwsSession' = None) -> 'AwsQuantumTask':
358
"""
359
Get existing task by ARN.
360
361
Args:
362
arn: Task ARN
363
aws_session: AWS session
364
365
Returns:
366
AwsQuantumTask: Task instance
367
"""
368
pass
369
370
class AwsQuantumTaskBatch:
371
"""Batch execution of quantum tasks."""
372
373
def __init__(self, tasks: list[AwsQuantumTask]):
374
"""
375
Initialize task batch.
376
377
Args:
378
tasks: List of quantum tasks
379
"""
380
self.tasks = tasks
381
382
def results(self) -> list:
383
"""
384
Get results from all tasks in batch.
385
386
Returns:
387
list: Results from completed tasks
388
"""
389
pass
390
391
def unsuccessful(self) -> list[AwsQuantumTask]:
392
"""
393
Get unsuccessful tasks.
394
395
Returns:
396
list[AwsQuantumTask]: Failed or cancelled tasks
397
"""
398
pass
399
400
def retry_unsuccessful_tasks(self) -> 'AwsQuantumTaskBatch':
401
"""
402
Retry failed tasks.
403
404
Returns:
405
AwsQuantumTaskBatch: New batch with retried tasks
406
"""
407
pass
408
409
# Task execution examples
410
def execute_quantum_task_with_monitoring(circuit, device: AwsDevice) -> dict:
411
"""
412
Execute quantum task with comprehensive monitoring.
413
414
Args:
415
circuit: Quantum circuit to execute
416
device: Target AWS device
417
418
Returns:
419
dict: Task results and execution information
420
"""
421
import time
422
423
# Submit task
424
task = device.run(circuit, shots=1000)
425
426
execution_info = {
427
'task_arn': task.arn,
428
'submission_time': time.time(),
429
'device_arn': device.arn,
430
'status_history': []
431
}
432
433
# Monitor execution
434
while task.state() not in ['COMPLETED', 'FAILED', 'CANCELLED']:
435
status = task.state()
436
execution_info['status_history'].append({
437
'status': status,
438
'timestamp': time.time()
439
})
440
441
print(f"Task status: {status}")
442
time.sleep(5) # Check every 5 seconds
443
444
# Get final results
445
if task.state() == 'COMPLETED':
446
result = task.result()
447
execution_info['completion_time'] = time.time()
448
execution_info['results'] = {
449
'measurement_counts': result.measurement_counts,
450
'task_metadata': result.task_metadata
451
}
452
else:
453
execution_info['error_info'] = task.metadata()
454
455
return execution_info
456
457
def execute_parameter_sweep(circuit_template, parameter_ranges: dict, device: AwsDevice) -> dict:
458
"""
459
Execute parameter sweep across quantum circuit parameters.
460
461
Args:
462
circuit_template: Parameterized circuit template
463
parameter_ranges: Parameter ranges to sweep
464
device: Target AWS device
465
466
Returns:
467
dict: Parameter sweep results
468
"""
469
import itertools
470
from braket.parametric import FreeParameter
471
472
# Generate parameter combinations
473
param_names = list(parameter_ranges.keys())
474
param_values = list(parameter_ranges.values())
475
param_combinations = list(itertools.product(*param_values))
476
477
# Create circuits for each parameter combination
478
circuits = []
479
for combination in param_combinations:
480
param_dict = dict(zip(param_names, combination))
481
circuit = circuit_template.copy()
482
483
# Bind parameters
484
for param_name, value in param_dict.items():
485
circuit = circuit.substitute_free_parameter(FreeParameter(param_name), value)
486
487
circuits.append((circuit, param_dict))
488
489
# Execute batch
490
task_specs = [circuit for circuit, _ in circuits]
491
batch = device.run_batch(task_specs, shots=1000)
492
493
# Collect results
494
results = batch.results()
495
sweep_results = {
496
'parameter_combinations': [params for _, params in circuits],
497
'results': results,
498
'analysis': {}
499
}
500
501
# Analyze results
502
for i, (result, params) in enumerate(zip(results, sweep_results['parameter_combinations'])):
503
sweep_results['analysis'][str(params)] = {
504
'measurement_counts': result.measurement_counts,
505
'expectation_values': getattr(result, 'values', [])
506
}
507
508
return sweep_results
509
```
510
511
## AWS Session Management
512
513
### Authentication and Configuration
514
515
```python { .api }
516
from braket.aws import AwsSession
517
import boto3
518
519
class AwsSession:
520
"""AWS authentication and session management."""
521
522
def __init__(
523
self,
524
boto_session: boto3.Session = None,
525
braket_user_agents: list[str] = None,
526
config: dict = None
527
):
528
"""
529
Initialize AWS session for Braket.
530
531
Args:
532
boto_session: Pre-configured boto3 session
533
braket_user_agents: Custom user agent strings
534
config: Session configuration options
535
"""
536
self.boto_session = boto_session
537
self.braket_user_agents = braket_user_agents
538
self.config = config or {}
539
540
@property
541
def region(self) -> str:
542
"""Current AWS region."""
543
pass
544
545
@property
546
def account_id(self) -> str:
547
"""AWS account ID."""
548
pass
549
550
def get_device(self, arn: str) -> AwsDevice:
551
"""
552
Get device using this session.
553
554
Args:
555
arn: Device ARN
556
557
Returns:
558
AwsDevice: Device with this session
559
"""
560
pass
561
562
def create_aws_session_with_profile(profile_name: str, region: str = 'us-east-1') -> AwsSession:
563
"""
564
Create AWS session using named profile.
565
566
Args:
567
profile_name: AWS credential profile name
568
region: AWS region for Braket services
569
570
Returns:
571
AwsSession: Configured session
572
"""
573
boto_session = boto3.Session(profile_name=profile_name, region_name=region)
574
return AwsSession(boto_session=boto_session)
575
576
def create_aws_session_with_keys(access_key: str, secret_key: str, region: str = 'us-east-1') -> AwsSession:
577
"""
578
Create AWS session using access keys.
579
580
Args:
581
access_key: AWS access key ID
582
secret_key: AWS secret access key
583
region: AWS region for Braket services
584
585
Returns:
586
AwsSession: Configured session
587
"""
588
boto_session = boto3.Session(
589
aws_access_key_id=access_key,
590
aws_secret_access_key=secret_key,
591
region_name=region
592
)
593
return AwsSession(boto_session=boto_session)
594
595
def validate_aws_session(session: AwsSession) -> dict:
596
"""
597
Validate AWS session configuration and permissions.
598
599
Args:
600
session: AWS session to validate
601
602
Returns:
603
dict: Validation results and recommendations
604
"""
605
validation = {
606
'is_valid': True,
607
'region': session.region,
608
'account_id': session.account_id,
609
'permissions': {},
610
'recommendations': []
611
}
612
613
try:
614
# Test basic Braket permissions
615
devices = AwsDevice.get_devices(aws_session=session)
616
validation['permissions']['device_access'] = True
617
validation['permissions']['accessible_devices'] = len(devices)
618
619
except Exception as e:
620
validation['is_valid'] = False
621
validation['permissions']['device_access'] = False
622
validation['error'] = str(e)
623
validation['recommendations'].append("Check Braket service permissions")
624
625
# Check for cost optimization
626
if validation['region'] != 'us-east-1':
627
validation['recommendations'].append("Consider us-east-1 for lowest latency to Braket services")
628
629
return validation
630
```
631
632
## Device Reservations
633
634
### Direct Reservations
635
636
```python { .api }
637
from braket.aws import DirectReservation
638
639
class DirectReservation:
640
"""Context manager for using reserved quantum devices on AWS Braket."""
641
642
def __init__(
643
self,
644
device,
645
reservation_arn: str = None
646
):
647
"""
648
Initialize direct reservation context.
649
650
Args:
651
device: Braket device or device ARN with reservation
652
reservation_arn: AWS Braket Direct reservation ARN
653
"""
654
self.device = device
655
self.reservation_arn = reservation_arn
656
657
def __enter__(self) -> 'DirectReservation':
658
"""
659
Enter reservation context and apply reservation to quantum tasks.
660
661
Returns:
662
DirectReservation: Self for context management
663
"""
664
pass
665
666
def __exit__(self, exc_type, exc_val, exc_tb):
667
"""Exit reservation context and restore normal device access."""
668
pass
669
670
def start(self) -> 'DirectReservation':
671
"""
672
Start reservation without context manager.
673
674
Returns:
675
DirectReservation: Self for method chaining
676
"""
677
pass
678
679
def stop(self) -> 'DirectReservation':
680
"""
681
Stop reservation manually.
682
683
Returns:
684
DirectReservation: Self for method chaining
685
"""
686
pass
687
688
# Reservation management examples
689
def use_device_reservation(device_arn: str, reservation_arn: str, circuits: list) -> dict:
690
"""
691
Execute circuits using a reserved quantum device.
692
693
Args:
694
device_arn: AWS device ARN with reservation
695
reservation_arn: Reservation ARN for exclusive access
696
circuits: List of quantum circuits to execute
697
698
Returns:
699
dict: Execution results with reservation metadata
700
"""
701
from braket.aws import AwsDevice
702
703
device = AwsDevice(device_arn)
704
reservation_results = {
705
'reservation_arn': reservation_arn,
706
'device_arn': device_arn,
707
'circuits_executed': len(circuits),
708
'results': [],
709
'cost_analysis': {},
710
'reservation_utilization': {}
711
}
712
713
# Execute circuits within reservation context
714
with DirectReservation(device=device, reservation_arn=reservation_arn):
715
for i, circuit in enumerate(circuits):
716
task = device.run(circuit, shots=1000)
717
result = task.result()
718
719
reservation_results['results'].append({
720
'circuit_index': i,
721
'task_arn': task.arn,
722
'measurement_counts': result.measurement_counts,
723
'execution_duration': result.task_metadata.get('executionDuration', 'unknown')
724
})
725
726
# Analyze reservation utilization
727
reservation_results['reservation_utilization'] = {
728
'total_tasks': len(circuits),
729
'estimated_time_savings': 'No queue wait time',
730
'dedicated_access': True,
731
'billing_model': 'Reservation-based'
732
}
733
734
return reservation_results
735
736
def manage_reservation_lifecycle(device_arn: str) -> dict:
737
"""
738
Demonstrate reservation lifecycle management.
739
740
Args:
741
device_arn: AWS device ARN for reservation
742
743
Returns:
744
dict: Reservation management workflow
745
"""
746
reservation_workflow = {
747
'phases': [
748
'reservation_planning',
749
'reservation_creation',
750
'reservation_usage',
751
'reservation_monitoring',
752
'reservation_completion'
753
],
754
'implementation_guide': {}
755
}
756
757
# Reservation planning phase
758
planning_recommendations = {
759
'optimal_duration': 'Based on workload analysis',
760
'cost_estimation': {
761
'reserved_vs_on_demand': 'Calculate savings for extended usage',
762
'minimum_duration': '1 hour minimum for most devices',
763
'cancellation_policy': 'Check device-specific policies'
764
},
765
'scheduling_considerations': [
766
'Device availability windows',
767
'Time zone coordination for team access',
768
'Buffer time for job completion'
769
]
770
}
771
772
# Usage optimization
773
usage_optimization = {
774
'batch_execution': 'Group circuits for efficient execution',
775
'parallel_jobs': 'Use multiple job slots if available',
776
'monitoring_tools': 'Track utilization during reservation',
777
'fallback_strategy': 'Plan for unexpected issues'
778
}
779
780
reservation_workflow['implementation_guide'] = {
781
'planning': planning_recommendations,
782
'optimization': usage_optimization,
783
'monitoring': {
784
'key_metrics': ['utilization_rate', 'cost_per_shot', 'queue_time_saved'],
785
'alerting': 'Set up notifications for underutilization'
786
}
787
}
788
789
return reservation_workflow
790
791
def calculate_reservation_cost_benefit(usage_pattern: dict, device_pricing: dict) -> dict:
792
"""
793
Calculate cost benefits of device reservations vs on-demand usage.
794
795
Args:
796
usage_pattern: Expected usage patterns
797
device_pricing: Device pricing information
798
799
Returns:
800
dict: Cost-benefit analysis for reservations
801
"""
802
analysis = {
803
'usage_input': usage_pattern,
804
'pricing_input': device_pricing,
805
'cost_comparison': {},
806
'recommendation': {}
807
}
808
809
# Calculate on-demand costs
810
shots_per_month = usage_pattern.get('shots_per_month', 100000)
811
on_demand_rate = device_pricing.get('per_shot', 0.00075) # Example pricing
812
813
on_demand_monthly_cost = shots_per_month * on_demand_rate
814
815
# Calculate reservation costs (example)
816
reservation_hourly_rate = device_pricing.get('reservation_hourly', 1.50)
817
hours_per_month = usage_pattern.get('hours_per_month', 40)
818
819
reservation_monthly_cost = hours_per_month * reservation_hourly_rate
820
821
# Cost comparison
822
monthly_savings = on_demand_monthly_cost - reservation_monthly_cost
823
break_even_shots = reservation_monthly_cost / on_demand_rate
824
825
analysis['cost_comparison'] = {
826
'on_demand_monthly': on_demand_monthly_cost,
827
'reservation_monthly': reservation_monthly_cost,
828
'monthly_savings': monthly_savings,
829
'savings_percentage': (monthly_savings / on_demand_monthly_cost) * 100 if on_demand_monthly_cost > 0 else 0,
830
'break_even_shots': break_even_shots
831
}
832
833
# Recommendation
834
if monthly_savings > 0:
835
analysis['recommendation'] = {
836
'use_reservations': True,
837
'reason': f'Save ${monthly_savings:.2f} per month ({analysis["cost_comparison"]["savings_percentage"]:.1f}%)',
838
'optimal_reservation_hours': hours_per_month
839
}
840
else:
841
analysis['recommendation'] = {
842
'use_reservations': False,
843
'reason': 'On-demand pricing more cost-effective for current usage',
844
'threshold_usage': f'Consider reservations above {break_even_shots:.0f} shots/month'
845
}
846
847
return analysis
848
```
849
850
## Hybrid Quantum Jobs
851
852
### Job Definition and Execution
853
854
```python { .api }
855
from braket.jobs import hybrid_job, InstanceConfig, CheckpointConfig, OutputDataConfig
856
857
@hybrid_job(device="ml.m5.large")
858
def basic_hybrid_algorithm():
859
"""
860
Basic hybrid quantum-classical algorithm.
861
862
This job demonstrates the structure of hybrid algorithms
863
combining quantum circuit execution with classical optimization.
864
"""
865
from braket.aws import AwsDevice
866
from braket.jobs import get_job_device_arn, save_job_result
867
import numpy as np
868
869
# Get quantum device for this job
870
device_arn = get_job_device_arn()
871
device = AwsDevice(device_arn)
872
873
def quantum_cost_function(params: np.ndarray) -> float:
874
"""Quantum cost function using variational circuit."""
875
from braket.circuits import Circuit
876
from braket.circuits.gates import Ry, CNot
877
from braket.circuits.observables import Z
878
879
circuit = Circuit()
880
for i, param in enumerate(params):
881
circuit.ry(i, param)
882
883
circuit.cnot(0, 1)
884
circuit.expectation(observable=Z() @ Z(), target=[0, 1])
885
886
task = device.run(circuit, shots=1000)
887
result = task.result()
888
return result.values[0]
889
890
# Classical optimization
891
from scipy.optimize import minimize
892
initial_params = np.random.random(2) * np.pi
893
894
result = minimize(
895
quantum_cost_function,
896
initial_params,
897
method='COBYLA',
898
options={'maxiter': 50}
899
)
900
901
# Save results
902
save_job_result({
903
'optimal_parameters': result.x.tolist(),
904
'optimal_value': float(result.fun),
905
'optimization_success': result.success
906
})
907
908
class InstanceConfig:
909
"""Configuration for job compute instance."""
910
911
def __init__(
912
self,
913
instance_type: str,
914
instance_count: int = 1,
915
volume_size_gb: int = 30
916
):
917
"""
918
Initialize instance configuration.
919
920
Args:
921
instance_type: EC2 instance type (e.g., 'ml.m5.large')
922
instance_count: Number of instances
923
volume_size_gb: EBS volume size in GB
924
"""
925
self.instance_type = instance_type
926
self.instance_count = instance_count
927
self.volume_size_gb = volume_size_gb
928
929
class CheckpointConfig:
930
"""Configuration for job checkpointing."""
931
932
def __init__(
933
self,
934
s3_uri: str,
935
local_path: str = "/opt/braket/checkpoints"
936
):
937
"""
938
Initialize checkpoint configuration.
939
940
Args:
941
s3_uri: S3 URI for checkpoint storage
942
local_path: Local checkpoint directory
943
"""
944
self.s3_uri = s3_uri
945
self.local_path = local_path
946
947
class OutputDataConfig:
948
"""Configuration for job output data."""
949
950
def __init__(
951
self,
952
s3_path: str,
953
kms_key_id: str = None
954
):
955
"""
956
Initialize output data configuration.
957
958
Args:
959
s3_path: S3 path for output data
960
kms_key_id: KMS key for encryption
961
"""
962
self.s3_path = s3_path
963
self.kms_key_id = kms_key_id
964
965
# Advanced job configuration
966
@hybrid_job(
967
device="ml.p3.2xlarge", # GPU instance for ML
968
instance_config=InstanceConfig("ml.p3.2xlarge", instance_count=1),
969
checkpoint_config=CheckpointConfig("s3://my-bucket/checkpoints/"),
970
output_data_config=OutputDataConfig("s3://my-bucket/results/")
971
)
972
def advanced_variational_algorithm():
973
"""
974
Advanced hybrid algorithm with ML and checkpointing.
975
976
Demonstrates quantum machine learning with classical neural networks
977
and comprehensive checkpoint management.
978
"""
979
import torch
980
import torch.nn as nn
981
from braket.jobs import (
982
save_job_checkpoint, load_job_checkpoint,
983
get_hyperparameters, save_job_result
984
)
985
986
# Get job hyperparameters
987
hyperparams = get_hyperparameters()
988
learning_rate = hyperparams.get('learning_rate', 0.01)
989
epochs = hyperparams.get('epochs', 100)
990
991
# Define hybrid model
992
class HybridQuantumClassicalModel(nn.Module):
993
def __init__(self):
994
super().__init__()
995
self.classical_layers = nn.Sequential(
996
nn.Linear(4, 16),
997
nn.ReLU(),
998
nn.Linear(16, 2) # 2 quantum parameters
999
)
1000
1001
def quantum_layer(self, params):
1002
"""Quantum variational layer."""
1003
from braket.circuits import Circuit
1004
from braket.circuits.gates import Ry, CNot
1005
from braket.circuits.observables import Z
1006
1007
circuit = Circuit()
1008
circuit.ry(0, params[0])
1009
circuit.ry(1, params[1])
1010
circuit.cnot(0, 1)
1011
circuit.expectation(observable=Z() @ Z(), target=[0, 1])
1012
1013
device_arn = get_job_device_arn()
1014
device = AwsDevice(device_arn)
1015
task = device.run(circuit, shots=1000)
1016
return task.result().values[0]
1017
1018
def forward(self, x):
1019
classical_out = self.classical_layers(x)
1020
quantum_out = self.quantum_layer(classical_out)
1021
return quantum_out
1022
1023
# Training loop with checkpointing
1024
model = HybridQuantumClassicalModel()
1025
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
1026
1027
# Load checkpoint if exists
1028
try:
1029
checkpoint = load_job_checkpoint()
1030
model.load_state_dict(checkpoint['model_state'])
1031
optimizer.load_state_dict(checkpoint['optimizer_state'])
1032
start_epoch = checkpoint['epoch']
1033
except:
1034
start_epoch = 0
1035
1036
# Training loop (simplified)
1037
for epoch in range(start_epoch, epochs):
1038
# Training step would go here
1039
loss = torch.randn(1) # Placeholder
1040
1041
# Save checkpoint every 10 epochs
1042
if epoch % 10 == 0:
1043
save_job_checkpoint({
1044
'model_state': model.state_dict(),
1045
'optimizer_state': optimizer.state_dict(),
1046
'epoch': epoch,
1047
'loss': float(loss)
1048
})
1049
1050
# Save final results
1051
save_job_result({
1052
'final_model_state': model.state_dict(),
1053
'training_loss': float(loss),
1054
'total_epochs': epochs
1055
})
1056
```
1057
1058
### Job Environment and Utilities
1059
1060
```python { .api }
1061
from braket.jobs import (
1062
get_checkpoint_dir, get_hyperparameters, get_input_data_dir,
1063
get_job_device_arn, get_job_name, get_results_dir
1064
)
1065
1066
def get_job_device_arn() -> str:
1067
"""
1068
Get quantum device ARN for current job.
1069
1070
Returns:
1071
str: Device ARN assigned to job
1072
"""
1073
pass
1074
1075
def get_job_name() -> str:
1076
"""
1077
Get current job name.
1078
1079
Returns:
1080
str: Job name
1081
"""
1082
pass
1083
1084
def get_hyperparameters() -> dict:
1085
"""
1086
Get job hyperparameters.
1087
1088
Returns:
1089
dict: Hyperparameter dictionary
1090
"""
1091
pass
1092
1093
def get_input_data_dir(channel: str = "training") -> str:
1094
"""
1095
Get input data directory path.
1096
1097
Args:
1098
channel: Data channel name
1099
1100
Returns:
1101
str: Input data directory path
1102
"""
1103
pass
1104
1105
def get_results_dir() -> str:
1106
"""
1107
Get results output directory path.
1108
1109
Returns:
1110
str: Results directory path
1111
"""
1112
pass
1113
1114
def get_checkpoint_dir() -> str:
1115
"""
1116
Get checkpoint directory path.
1117
1118
Returns:
1119
str: Checkpoint directory path
1120
"""
1121
pass
1122
1123
def save_job_checkpoint(checkpoint_data: dict) -> None:
1124
"""
1125
Save job checkpoint data.
1126
1127
Args:
1128
checkpoint_data: Data to checkpoint
1129
"""
1130
pass
1131
1132
def load_job_checkpoint() -> dict:
1133
"""
1134
Load job checkpoint data.
1135
1136
Returns:
1137
dict: Loaded checkpoint data
1138
"""
1139
pass
1140
1141
def save_job_result(result_data: dict) -> None:
1142
"""
1143
Save job result data.
1144
1145
Args:
1146
result_data: Results to save
1147
"""
1148
pass
1149
1150
def load_job_result(job_arn: str) -> dict:
1151
"""
1152
Load results from completed job.
1153
1154
Args:
1155
job_arn: Job ARN
1156
1157
Returns:
1158
dict: Job results
1159
"""
1160
pass
1161
1162
# Job utilities example
1163
def comprehensive_job_setup() -> dict:
1164
"""
1165
Setup comprehensive job environment and configuration.
1166
1167
Returns:
1168
dict: Job environment information
1169
"""
1170
job_info = {
1171
'job_name': get_job_name(),
1172
'device_arn': get_job_device_arn(),
1173
'hyperparameters': get_hyperparameters(),
1174
'directories': {
1175
'input_data': get_input_data_dir(),
1176
'results': get_results_dir(),
1177
'checkpoints': get_checkpoint_dir()
1178
}
1179
}
1180
1181
# Setup logging
1182
import logging
1183
logging.basicConfig(
1184
level=logging.INFO,
1185
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
1186
handlers=[
1187
logging.FileHandler(f"{get_results_dir()}/job.log"),
1188
logging.StreamHandler()
1189
]
1190
)
1191
1192
logger = logging.getLogger('QuantumJob')
1193
logger.info(f"Starting job: {job_info['job_name']}")
1194
logger.info(f"Using device: {job_info['device_arn']}")
1195
1196
return job_info
1197
```
1198
1199
## Cost and Usage Tracking
1200
1201
### Resource Tracking
1202
1203
```python { .api }
1204
from braket.tracking import Tracker
1205
1206
class Tracker:
1207
"""Tracks quantum computing costs and resource usage."""
1208
1209
def __init__(self):
1210
"""Initialize cost tracker."""
1211
pass
1212
1213
def __enter__(self) -> 'Tracker':
1214
"""Context manager entry."""
1215
return self
1216
1217
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
1218
"""Context manager exit."""
1219
pass
1220
1221
def quantum_tasks_cost(self) -> float:
1222
"""
1223
Get total cost for quantum tasks.
1224
1225
Returns:
1226
float: Total cost in USD
1227
"""
1228
pass
1229
1230
def simulator_tasks_cost(self) -> float:
1231
"""
1232
Get total cost for simulator tasks.
1233
1234
Returns:
1235
float: Total cost in USD
1236
"""
1237
pass
1238
1239
def quantum_task_statistics(self) -> dict:
1240
"""
1241
Get detailed quantum task statistics.
1242
1243
Returns:
1244
dict: Task execution statistics
1245
"""
1246
pass
1247
1248
def simulator_task_statistics(self) -> dict:
1249
"""
1250
Get detailed simulator task statistics.
1251
1252
Returns:
1253
dict: Simulator execution statistics
1254
"""
1255
pass
1256
1257
def track_quantum_experiment_costs(experiment_function: callable, *args, **kwargs) -> dict:
1258
"""
1259
Track costs for quantum experiment execution.
1260
1261
Args:
1262
experiment_function: Function that runs quantum experiments
1263
*args: Arguments for experiment function
1264
**kwargs: Keyword arguments for experiment function
1265
1266
Returns:
1267
dict: Cost analysis and experiment results
1268
"""
1269
with Tracker() as tracker:
1270
# Run experiment
1271
results = experiment_function(*args, **kwargs)
1272
1273
# Collect cost information
1274
cost_analysis = {
1275
'total_cost': tracker.quantum_tasks_cost() + tracker.simulator_tasks_cost(),
1276
'quantum_cost': tracker.quantum_tasks_cost(),
1277
'simulator_cost': tracker.simulator_tasks_cost(),
1278
'quantum_stats': tracker.quantum_task_statistics(),
1279
'simulator_stats': tracker.simulator_task_statistics(),
1280
'cost_per_shot': 0.0,
1281
'results': results
1282
}
1283
1284
# Calculate cost per shot
1285
total_shots = (
1286
cost_analysis['quantum_stats'].get('shots', 0) +
1287
cost_analysis['simulator_stats'].get('shots', 0)
1288
)
1289
1290
if total_shots > 0:
1291
cost_analysis['cost_per_shot'] = cost_analysis['total_cost'] / total_shots
1292
1293
return cost_analysis
1294
1295
def optimize_experiment_for_cost(circuits: list, target_accuracy: float = 0.01) -> dict:
1296
"""
1297
Optimize experiment execution for cost while maintaining accuracy.
1298
1299
Args:
1300
circuits: List of quantum circuits to execute
1301
target_accuracy: Target measurement accuracy
1302
1303
Returns:
1304
dict: Optimized execution plan with cost estimates
1305
"""
1306
optimization_plan = {
1307
'execution_strategy': {},
1308
'estimated_costs': {},
1309
'recommendations': []
1310
}
1311
1312
# Analyze circuits for optimization opportunities
1313
for i, circuit in enumerate(circuits):
1314
circuit_analysis = {
1315
'qubit_count': circuit.qubit_count,
1316
'depth': circuit.depth,
1317
'gate_count': len(circuit.instructions)
1318
}
1319
1320
# Recommend device based on circuit characteristics
1321
if circuit.qubit_count <= 10 and circuit.depth <= 20:
1322
# Small circuit - use simulator
1323
recommended_device = "Local Simulator"
1324
estimated_cost = 0.0
1325
optimization_plan['recommendations'].append(
1326
f"Circuit {i}: Use local simulator (free) - small circuit"
1327
)
1328
elif circuit.qubit_count <= 20:
1329
# Medium circuit - use cloud simulator
1330
recommended_device = "AWS SV1"
1331
estimated_cost = 0.075 * (2 ** min(circuit.qubit_count, 17))
1332
optimization_plan['recommendations'].append(
1333
f"Circuit {i}: Use SV1 simulator - cost effective for {circuit.qubit_count} qubits"
1334
)
1335
else:
1336
# Large circuit - may need QPU or advanced simulator
1337
recommended_device = "AWS QPU or TN1"
1338
estimated_cost = 0.30 + (1000 * 0.0003) # Task fee + shots
1339
optimization_plan['recommendations'].append(
1340
f"Circuit {i}: Consider QPU for {circuit.qubit_count} qubits"
1341
)
1342
1343
optimization_plan['execution_strategy'][f'circuit_{i}'] = {
1344
'recommended_device': recommended_device,
1345
'estimated_cost': estimated_cost,
1346
'analysis': circuit_analysis
1347
}
1348
1349
# Calculate total estimated cost
1350
total_cost = sum(
1351
strategy['estimated_cost']
1352
for strategy in optimization_plan['execution_strategy'].values()
1353
)
1354
optimization_plan['estimated_costs']['total'] = total_cost
1355
1356
return optimization_plan
1357
```
1358
1359
## Device Reservations
1360
1361
### Direct Device Access
1362
1363
```python { .api }
1364
from braket.aws import DirectReservation
1365
1366
class DirectReservation:
1367
"""Direct quantum device reservation interface."""
1368
1369
def __init__(self, arn: str, aws_session: 'AwsSession' = None):
1370
"""
1371
Initialize device reservation.
1372
1373
Args:
1374
arn: Reservation ARN
1375
aws_session: AWS session for API calls
1376
"""
1377
self.arn = arn
1378
self.aws_session = aws_session
1379
1380
@property
1381
def device_arn(self) -> str:
1382
"""Reserved device ARN."""
1383
pass
1384
1385
@property
1386
def start_time(self) -> str:
1387
"""Reservation start time."""
1388
pass
1389
1390
@property
1391
def end_time(self) -> str:
1392
"""Reservation end time."""
1393
pass
1394
1395
@property
1396
def status(self) -> str:
1397
"""Reservation status."""
1398
pass
1399
1400
def cancel(self) -> None:
1401
"""Cancel reservation."""
1402
pass
1403
1404
def create_device_reservation(device_arn: str, start_time: str, duration_minutes: int) -> DirectReservation:
1405
"""
1406
Create direct device reservation.
1407
1408
Args:
1409
device_arn: Target device ARN
1410
start_time: ISO format start time
1411
duration_minutes: Reservation duration in minutes
1412
1413
Returns:
1414
DirectReservation: Device reservation handle
1415
"""
1416
# This is a placeholder - actual implementation would use AWS API
1417
pass
1418
1419
def execute_with_reservation(reservation: DirectReservation, circuits: list) -> dict:
1420
"""
1421
Execute circuits using device reservation.
1422
1423
Args:
1424
reservation: Active device reservation
1425
circuits: Circuits to execute during reservation
1426
1427
Returns:
1428
dict: Execution results and reservation utilization
1429
"""
1430
device = AwsDevice(reservation.device_arn)
1431
1432
results = []
1433
for circuit in circuits:
1434
task = device.run(circuit, shots=1000, reservation_arn=reservation.arn)
1435
results.append(task.result())
1436
1437
return {
1438
'results': results,
1439
'reservation_info': {
1440
'device_arn': reservation.device_arn,
1441
'start_time': reservation.start_time,
1442
'end_time': reservation.end_time,
1443
'utilization': len(circuits)
1444
}
1445
}
1446
```
1447
1448
This comprehensive AWS integration documentation covers all aspects of cloud quantum computing with Amazon Braket, including device management, task execution, job orchestration, cost optimization, and resource tracking.