An open source library for interacting with quantum computing devices on Amazon Braket
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete AWS cloud integration for quantum computing on Amazon Braket, including device access, task management, sessions, jobs, and cost tracking.
from braket.aws import (
AwsDevice, AwsDeviceType, AwsQuantumTask, AwsQuantumTaskBatch,
AwsQuantumJob, AwsSession, DirectReservation
)
from braket.jobs import (
hybrid_job, CheckpointConfig, InstanceConfig, OutputDataConfig,
S3DataSourceConfig, StoppingCondition, LocalQuantumJob,
load_job_checkpoint, load_job_result, save_job_checkpoint, save_job_result,
get_checkpoint_dir, get_hyperparameters, get_input_data_dir,
get_job_device_arn, get_job_name, get_results_dir,
Framework, retrieve_image
)
from braket.tracking import Tracker
from braket.tasks import QuantumTask, QuantumTaskBatch

from braket.aws import AwsDevice, AwsDeviceType
from enum import Enum
class AwsDeviceType(Enum):
    """Kinds of device exposed through Amazon Braket.

    Each member's value is the upper-case string matching its own name.
    """

    SIMULATOR = "SIMULATOR"
    QPU = "QPU"
class AwsDevice:
    """Handle for a quantum device hosted on Amazon Braket.

    Only the constructor carries logic here; every other member is a
    signature placeholder whose body does nothing and returns None.
    """

    def __init__(self, arn: str, aws_session: 'AwsSession' = None):
        """
        Record which device this object refers to.

        Args:
            arn: ARN of the device
            aws_session: Session used for API calls; stored unchanged, so it
                stays None when omitted
        """
        self.arn, self.aws_session = arn, aws_session

    @staticmethod
    def get_devices(
        arns: list[str] = None,
        names: list[str] = None,
        types: list[AwsDeviceType] = None,
        statuses: list[str] = None,
        provider_names: list[str] = None,
        aws_session: 'AwsSession' = None
    ) -> list['AwsDevice']:
        """
        Look up Braket devices that satisfy the given filters.

        Args:
            arns: Restrict to these device ARNs
            names: Restrict to these device names
            types: Restrict to these device types
            statuses: Restrict to these device statuses
            provider_names: Restrict to these provider names
            aws_session: Session used for the lookup
        Returns:
            list[AwsDevice]: Devices matching the criteria
        """
        ...

    def run(
        self,
        task_specification,
        shots: int = 1000,
        inputs: dict = None,
        gate_definitions: dict = None,
        reservation_arn: str = None,
        tags: dict = None
    ) -> 'AwsQuantumTask':
        """
        Submit one quantum task to this device.

        Args:
            task_specification: Circuit, AHS program, or annealing problem
            shots: How many measurement shots to take
            inputs: Values for the parameters of a parameterized program
            gate_definitions: Custom gate definitions
            reservation_arn: ARN of a device reservation to run under
            tags: AWS resource tags
        Returns:
            AwsQuantumTask: Handle to the submitted task
        """
        ...

    def run_batch(
        self,
        task_specifications: list,
        shots: int = 1000,
        max_parallel: int = 10,
        inputs: list[dict] = None,
        tags: dict = None
    ) -> 'AwsQuantumTaskBatch':
        """
        Submit several quantum tasks as one batch.

        Args:
            task_specifications: Circuits/programs to run
            shots: Measurement shots for each task
            max_parallel: Upper bound on tasks executing at once
            inputs: Parameter values, one entry per task
            tags: AWS resource tags
        Returns:
            AwsQuantumTaskBatch: Handle to the submitted batch
        """
        ...

    @property
    def name(self) -> str:
        """Name of the device."""
        ...

    @property
    def status(self) -> str:
        """Status the device currently reports."""
        ...

    @property
    def type(self) -> AwsDeviceType:
        """Whether the device is a QPU or a simulator."""
        ...

    @property
    def provider_name(self) -> str:
        """Name of the device's provider."""
        ...

    @property
    def properties(self) -> dict:
        """Capabilities and properties of the device."""
        ...
# Device discovery examples
def discover_aws_devices() -> dict[str, list[AwsDevice]]:
    """
    Query Braket once per category and gather the resulting device lists.

    Returns:
        dict: Keys 'simulators', 'qpus', 'ionq', 'rigetti', 'oqc' and
        'quera', each holding what ``AwsDevice.get_devices`` returned for
        that filter
    """
    type_filters = {
        'simulators': AwsDeviceType.SIMULATOR,
        'qpus': AwsDeviceType.QPU,
    }
    provider_filters = {
        'ionq': 'IonQ',
        'rigetti': 'Rigetti',
        'oqc': 'Oxford Quantum Computing',
        'quera': 'QuEra',
    }

    catalog = {}
    for label, device_type in type_filters.items():
        catalog[label] = AwsDevice.get_devices(types=[device_type])
    for label, provider in provider_filters.items():
        catalog[label] = AwsDevice.get_devices(provider_names=[provider])
    return catalog
def select_optimal_device(circuit, requirements: dict) -> AwsDevice:
    """
    Pick a device that can run the circuit and suits the caller's preference.

    A device qualifies when its status is 'ONLINE' and its 'qubitCount'
    property (treated as 0 when absent) is at least ``circuit.qubit_count``.

    Args:
        circuit: Quantum circuit to execute; only ``qubit_count`` is read
        requirements: Preference flags. A truthy 'cost_priority' favours
            simulators; otherwise a truthy 'real_hardware' favours QPUs
    Returns:
        AwsDevice: First qualifying device of the preferred type, or the
        first qualifying device when no preference applies or none matches
    Raises:
        IndexError: No device qualifies
    """
    needed_qubits = circuit.qubit_count
    candidates = [
        device
        for device in AwsDevice.get_devices()
        if device.status == 'ONLINE'
        and device.properties.get('qubitCount', 0) >= needed_qubits
    ]
    if not candidates:
        # Indexing the empty list used to raise an IndexError with no
        # context; keep the exception type but say what went wrong.
        raise IndexError(
            f"No ONLINE device offers at least {needed_qubits} qubits"
        )

    if requirements.get('cost_priority', False):
        preferred_type = AwsDeviceType.SIMULATOR
    elif requirements.get('real_hardware', False):
        preferred_type = AwsDeviceType.QPU
    else:
        return candidates[0]

    # Fall back to the first qualifying device when the preferred kind
    # is not available.
    return next(
        (device for device in candidates if device.type == preferred_type),
        candidates[0],
    )


def analyze_device_capabilities(device: AwsDevice) -> dict:
    """
    Summarise a device's identity and selected entries of its properties.

    Args:
        device: AWS device to analyze
    Returns:
        dict: 'basic_info' (name, provider, type value, status) plus
        'quantum_specs', 'gate_set', 'connectivity', 'timing_info' and
        'error_rates'; each of the latter stays an empty dict when the
        matching property key is absent
    """
    properties = device.properties
    capabilities = {
        'basic_info': {
            'name': device.name,
            'provider': device.provider_name,
            'type': device.type.value,
            'status': device.status
        },
        'quantum_specs': {},
        'gate_set': {},
        'connectivity': {},
        'timing_info': {},
        'error_rates': {}
    }

    if 'qubitCount' in properties:
        capabilities['quantum_specs']['qubit_count'] = properties['qubitCount']

    # (report section, property key) pairs that are copied over verbatim.
    copied_sections = (
        ('gate_set', 'supportedOperations'),
        ('connectivity', 'connectivity'),
        ('timing_info', 'timing'),
        ('error_rates', 'fidelity'),
    )
    for section, property_key in copied_sections:
        if property_key in properties:
            capabilities[section] = properties[property_key]

    return capabilities
def check_circuit_compatibility(circuit, device: AwsDevice) -> dict:
    """
    Check a circuit against a device's qubit count, gates and connectivity.

    Args:
        circuit: Quantum circuit to validate; ``qubit_count`` and
            ``instructions`` are read
        device: Target AWS device; ``properties`` and ``type`` are read
    Returns:
        dict: 'is_compatible' (False only when the circuit needs more qubits
        than the device reports), 'issues', 'warnings' and 'recommendations'
        (the last is always returned empty)
    """
    properties = device.properties
    compatibility = {
        'is_compatible': True,
        'issues': [],
        'warnings': [],
        'recommendations': []
    }

    # Check qubit count (a missing 'qubitCount' is treated as 0)
    available_qubits = properties.get('qubitCount', 0)
    if circuit.qubit_count > available_qubits:
        compatibility['is_compatible'] = False
        compatibility['issues'].append(
            f"Circuit requires {circuit.qubit_count} qubits, device has {available_qubits}"
        )

    # Check gate support
    supported_gates = properties.get('supportedOperations', [])
    for instruction in circuit.instructions:
        gate_name = type(instruction.operator).__name__
        if gate_name not in supported_gates:
            compatibility['warnings'].append(
                f"Gate {gate_name} may not be natively supported"
            )

    # Check connectivity (for QPUs)
    if device.type == AwsDeviceType.QPU and 'connectivity' in properties:
        coupled = properties['connectivity'].get('fullyConnected', [])
        # NOTE(review): 'fullyConnected' is handled both as a boolean flag
        # and as a collection of qubit pairs - confirm which shape the
        # caller actually supplies.
        if coupled is not True:
            # Pairs are normalised to tuples so list-encoded pairs such as
            # [0, 1] compare equal to tuple(instruction.target); a False
            # flag yields an empty set, i.e. no pair is known to be coupled.
            allowed_pairs = {tuple(pair) for pair in (coupled or ())}
            for instruction in circuit.instructions:
                if len(instruction.target) == 2:  # Two-qubit gate
                    qubit_pair = tuple(instruction.target)
                    if qubit_pair not in allowed_pairs:
                        compatibility['warnings'].append(
                            f"Two-qubit gate on {qubit_pair} may require SWAP operations"
                        )

    return compatibility


from braket.aws import AwsQuantumTask, AwsQuantumTaskBatch
from braket.tasks import QuantumTask
class AwsQuantumTask(QuantumTask):
    """A quantum task submitted to Amazon Braket.

    Apart from the constructor, the members are signature placeholders
    that return None.
    """

    def __init__(self, arn: str, aws_session: 'AwsSession' = None):
        """
        Record which task this object refers to.

        Args:
            arn: ARN of the task
            aws_session: Session used for API calls; stored unchanged
        """
        self.arn, self.aws_session = arn, aws_session

    def result(self) -> 'TaskResult':
        """
        Fetch the results of the task.

        Returns:
            TaskResult: Quantum task results
        """
        ...

    def state(self) -> str:
        """
        Report where the task is in its lifecycle.

        Returns:
            str: One of QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED
        """
        ...

    def cancel(self) -> None:
        """Cancel the task while it is running."""
        ...

    def metadata(self) -> dict:
        """
        Fetch the task's metadata.

        Returns:
            dict: Task execution metadata
        """
        ...

    @property
    def id(self) -> str:
        """Identifier of the task."""
        ...

    @staticmethod
    def get(arn: str, aws_session: 'AwsSession' = None) -> 'AwsQuantumTask':
        """
        Obtain a handle to a task that already exists.

        Args:
            arn: ARN of the task
            aws_session: Session used for the lookup
        Returns:
            AwsQuantumTask: Handle to the task
        """
        ...
class AwsQuantumTaskBatch:
    """A group of quantum tasks handled together.

    Apart from the constructor, the members are signature placeholders
    that return None.
    """

    def __init__(self, tasks: list[AwsQuantumTask]):
        """
        Wrap an existing list of tasks.

        Args:
            tasks: Quantum tasks forming the batch; the list is stored as
                given, not copied
        """
        self.tasks = tasks

    def results(self) -> list:
        """
        Collect the results of every task in the batch.

        Returns:
            list: Results from completed tasks
        """
        ...

    def unsuccessful(self) -> list[AwsQuantumTask]:
        """
        List the tasks that did not succeed.

        Returns:
            list[AwsQuantumTask]: Failed or cancelled tasks
        """
        ...

    def retry_unsuccessful_tasks(self) -> 'AwsQuantumTaskBatch':
        """
        Submit the failed tasks again.

        Returns:
            AwsQuantumTaskBatch: New batch containing the retried tasks
        """
        ...
# Task execution examples
def execute_quantum_task_with_monitoring(
    circuit, device: AwsDevice, poll_interval: float = 5
) -> dict:
    """
    Run a circuit with 1000 shots and poll the task until it finishes.

    The task state is queried exactly once per polling round, so the state
    recorded in the history is the same one used to decide whether to keep
    waiting and, at the end, which branch of the result handling runs.

    Args:
        circuit: Quantum circuit to execute
        device: Target AWS device
        poll_interval: Seconds to sleep between state checks (default 5)
    Returns:
        dict: 'task_arn', 'submission_time', 'device_arn' and
        'status_history' (one entry per non-terminal state observed); plus
        'completion_time' and 'results' when the task completed, or
        'error_info' (the task metadata) when it failed or was cancelled
    """
    import time

    terminal_states = ('COMPLETED', 'FAILED', 'CANCELLED')

    # Submit task
    task = device.run(circuit, shots=1000)
    execution_info = {
        'task_arn': task.arn,
        'submission_time': time.time(),
        'device_arn': device.arn,
        'status_history': []
    }

    # Monitor execution
    status = task.state()
    while status not in terminal_states:
        execution_info['status_history'].append({
            'status': status,
            'timestamp': time.time()
        })
        print(f"Task status: {status}")
        time.sleep(poll_interval)
        status = task.state()

    # Get final results
    if status == 'COMPLETED':
        result = task.result()
        execution_info['completion_time'] = time.time()
        execution_info['results'] = {
            'measurement_counts': result.measurement_counts,
            'task_metadata': result.task_metadata
        }
    else:
        execution_info['error_info'] = task.metadata()

    return execution_info
def execute_parameter_sweep(circuit_template, parameter_ranges: dict, device: AwsDevice) -> dict:
    """
    Run a parameterized circuit for every combination of parameter values.

    Args:
        circuit_template: Parameterized circuit; it is bound, not modified
        parameter_ranges: Maps each parameter name to the values to try;
            the Cartesian product of all value lists is executed
        device: Target AWS device
    Returns:
        dict: 'parameter_combinations' (one name->value dict per run, in
        execution order), 'results' (what the batch returned) and
        'analysis' (keyed by ``str(params)``, holding each result's
        'measurement_counts' and 'expectation_values')
    """
    import itertools

    # Generate parameter combinations
    param_names = list(parameter_ranges)
    combinations = [
        dict(zip(param_names, values))
        for values in itertools.product(*parameter_ranges.values())
    ]

    # Bind each combination by parameter name; make_bound_circuit returns a
    # new circuit, so the template can be reused for every combination.
    task_specs = [
        circuit_template.make_bound_circuit(params) for params in combinations
    ]

    # Execute batch
    batch = device.run_batch(task_specs, shots=1000)

    # Collect results
    results = batch.results()
    sweep_results = {
        'parameter_combinations': combinations,
        'results': results,
        'analysis': {}
    }

    # Analyze results
    for result, params in zip(results, combinations):
        sweep_results['analysis'][str(params)] = {
            'measurement_counts': result.measurement_counts,
            'expectation_values': getattr(result, 'values', [])
        }

    return sweep_results


from braket.aws import AwsSession
import boto3
class AwsSession:
    """Holds the AWS credentials/session settings used for Braket calls.

    Apart from the constructor, the members are signature placeholders
    that return None.
    """

    def __init__(
        self,
        boto_session: boto3.Session = None,
        braket_user_agents: list[str] = None,
        config: dict = None
    ):
        """
        Store the session settings.

        Args:
            boto_session: Pre-configured boto3 session; stored unchanged
            braket_user_agents: Custom user agent strings; stored unchanged
            config: Session configuration options; a falsy value is
                replaced by a new empty dict
        """
        self.boto_session = boto_session
        self.braket_user_agents = braket_user_agents
        self.config = config if config else {}

    @property
    def region(self) -> str:
        """AWS region the session targets."""
        ...

    @property
    def account_id(self) -> str:
        """ID of the AWS account."""
        ...

    def get_device(self, arn: str) -> AwsDevice:
        """
        Obtain a device handle tied to this session.

        Args:
            arn: ARN of the device
        Returns:
            AwsDevice: Device with this session
        """
        ...
def create_aws_session_with_profile(profile_name: str, region: str = 'us-east-1') -> AwsSession:
    """
    Build an AwsSession from a named credential profile.

    Args:
        profile_name: Name of the AWS credential profile
        region: AWS region handed to boto3 as ``region_name``
    Returns:
        AwsSession: Session wrapping the new boto3 session
    """
    return AwsSession(
        boto_session=boto3.Session(profile_name=profile_name, region_name=region)
    )
def create_aws_session_with_keys(access_key: str, secret_key: str, region: str = 'us-east-1') -> AwsSession:
    """
    Build an AwsSession from an explicit access key pair.

    Args:
        access_key: AWS access key ID
        secret_key: AWS secret access key
        region: AWS region handed to boto3 as ``region_name``
    Returns:
        AwsSession: Session wrapping the new boto3 session
    """
    session_kwargs = {
        'aws_access_key_id': access_key,
        'aws_secret_access_key': secret_key,
        'region_name': region,
    }
    return AwsSession(boto_session=boto3.Session(**session_kwargs))
def validate_aws_session(session: AwsSession) -> dict:
    """
    Probe a session by listing devices and report what was found.

    The region and account ID are read before the guarded section, so an
    error raised while reading them propagates to the caller.

    Args:
        session: AWS session to validate
    Returns:
        dict: 'is_valid', 'region', 'account_id', 'permissions' and
        'recommendations'; an 'error' entry is added when the device
        listing (or counting its result) raised
    """
    report = {
        'is_valid': True,
        'region': session.region,
        'account_id': session.account_id,
        'permissions': {},
        'recommendations': []
    }
    permissions = report['permissions']
    advice = report['recommendations']

    try:
        # Listing devices doubles as a basic Braket permission test
        visible_devices = AwsDevice.get_devices(aws_session=session)
        permissions['device_access'] = True
        permissions['accessible_devices'] = len(visible_devices)
    except Exception as exc:
        report['is_valid'] = False
        permissions['device_access'] = False
        report['error'] = str(exc)
        advice.append("Check Braket service permissions")

    # Suggest the us-east-1 region when another one is in use
    if report['region'] != 'us-east-1':
        advice.append("Consider us-east-1 for lowest latency to Braket services")

    return report


from braket.aws import DirectReservation
class DirectReservation:
    """Context manager for running tasks under a Braket device reservation.

    Apart from the constructor, the members are signature placeholders
    that return None.
    """

    def __init__(
        self,
        device,
        reservation_arn: str = None
    ):
        """
        Remember the device and the reservation to use with it.

        Args:
            device: Braket device, or device ARN, that has the reservation
            reservation_arn: ARN of the AWS Braket Direct reservation
        """
        self.device, self.reservation_arn = device, reservation_arn

    def __enter__(self) -> 'DirectReservation':
        """
        Begin applying the reservation to quantum tasks.

        Returns:
            DirectReservation: Self for context management
        """
        ...

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the reservation context and restore normal device access."""
        ...

    def start(self) -> 'DirectReservation':
        """
        Begin the reservation without a ``with`` block.

        Returns:
            DirectReservation: Self for method chaining
        """
        ...

    def stop(self) -> 'DirectReservation':
        """
        End a reservation that was started manually.

        Returns:
            DirectReservation: Self for method chaining
        """
        ...
# Reservation management examples
def use_device_reservation(device_arn: str, reservation_arn: str, circuits: list) -> dict:
    """
    Run circuits one after another inside a reservation context.

    Each circuit is submitted with 1000 shots and its result is awaited
    before the next circuit is submitted.

    Args:
        device_arn: ARN of the device that has the reservation
        reservation_arn: ARN of the reservation
        circuits: Quantum circuits to execute
    Returns:
        dict: 'reservation_arn', 'device_arn', 'circuits_executed',
        'results' (one record per circuit), 'cost_analysis' (returned
        empty) and 'reservation_utilization'
    """
    from braket.aws import AwsDevice

    device = AwsDevice(device_arn)
    task_records = []
    reservation_results = {
        'reservation_arn': reservation_arn,
        'device_arn': device_arn,
        'circuits_executed': len(circuits),
        'results': task_records,
        'cost_analysis': {},
        'reservation_utilization': {}
    }

    # Everything submitted inside this block runs under the reservation
    with DirectReservation(device=device, reservation_arn=reservation_arn):
        for index, circuit in enumerate(circuits):
            task = device.run(circuit, shots=1000)
            outcome = task.result()
            task_records.append({
                'circuit_index': index,
                'task_arn': task.arn,
                'measurement_counts': outcome.measurement_counts,
                'execution_duration': outcome.task_metadata.get('executionDuration', 'unknown')
            })

    # Static summary of what running under a reservation implies
    reservation_results['reservation_utilization'] = {
        'total_tasks': len(circuits),
        'estimated_time_savings': 'No queue wait time',
        'dedicated_access': True,
        'billing_model': 'Reservation-based'
    }
    return reservation_results
def manage_reservation_lifecycle(device_arn: str) -> dict:
    """
    Return a static description of the reservation lifecycle.

    The result is built entirely from constants; ``device_arn`` is accepted
    but not consulted.

    Args:
        device_arn: AWS device ARN for reservation
    Returns:
        dict: 'phases' (ordered phase names) and 'implementation_guide'
        with 'planning', 'optimization' and 'monitoring' sections
    """
    # Guidance for the planning phase
    planning = {
        'optimal_duration': 'Based on workload analysis',
        'cost_estimation': {
            'reserved_vs_on_demand': 'Calculate savings for extended usage',
            'minimum_duration': '1 hour minimum for most devices',
            'cancellation_policy': 'Check device-specific policies'
        },
        'scheduling_considerations': [
            'Device availability windows',
            'Time zone coordination for team access',
            'Buffer time for job completion'
        ]
    }

    # Guidance for getting the most out of the reserved time
    optimization = {
        'batch_execution': 'Group circuits for efficient execution',
        'parallel_jobs': 'Use multiple job slots if available',
        'monitoring_tools': 'Track utilization during reservation',
        'fallback_strategy': 'Plan for unexpected issues'
    }

    monitoring = {
        'key_metrics': ['utilization_rate', 'cost_per_shot', 'queue_time_saved'],
        'alerting': 'Set up notifications for underutilization'
    }

    return {
        'phases': [
            'reservation_planning',
            'reservation_creation',
            'reservation_usage',
            'reservation_monitoring',
            'reservation_completion'
        ],
        'implementation_guide': {
            'planning': planning,
            'optimization': optimization,
            'monitoring': monitoring
        }
    }
def calculate_reservation_cost_benefit(usage_pattern: dict, device_pricing: dict) -> dict:
    """
    Compare the monthly cost of on-demand shots with reserved hours.

    Args:
        usage_pattern: Expected usage; reads 'shots_per_month' (default
            100000) and 'hours_per_month' (default 40)
        device_pricing: Pricing; reads 'per_shot' (default 0.00075) and
            'reservation_hourly' (default 1.50)
    Returns:
        dict: 'usage_input' and 'pricing_input' (the arguments as given),
        'cost_comparison' (both monthly costs, savings, savings percentage
        and break-even shot count) and 'recommendation'. The break-even
        count is infinity when the per-shot rate is 0, because a
        reservation can then never pay for itself
    """
    # Calculate on-demand costs
    shots_per_month = usage_pattern.get('shots_per_month', 100000)
    on_demand_rate = device_pricing.get('per_shot', 0.00075)  # Example pricing
    on_demand_monthly_cost = shots_per_month * on_demand_rate

    # Calculate reservation costs (example)
    reservation_hourly_rate = device_pricing.get('reservation_hourly', 1.50)
    hours_per_month = usage_pattern.get('hours_per_month', 40)
    reservation_monthly_cost = hours_per_month * reservation_hourly_rate

    # Cost comparison
    monthly_savings = on_demand_monthly_cost - reservation_monthly_cost
    if on_demand_rate == 0:
        # Free shots: dividing by the rate would raise ZeroDivisionError.
        break_even_shots = float('inf')
    else:
        break_even_shots = reservation_monthly_cost / on_demand_rate
    if on_demand_monthly_cost > 0:
        savings_percentage = (monthly_savings / on_demand_monthly_cost) * 100
    else:
        savings_percentage = 0

    # Recommendation
    if monthly_savings > 0:
        recommendation = {
            'use_reservations': True,
            'reason': f'Save ${monthly_savings:.2f} per month ({savings_percentage:.1f}%)',
            'optimal_reservation_hours': hours_per_month
        }
    else:
        recommendation = {
            'use_reservations': False,
            'reason': 'On-demand pricing more cost-effective for current usage',
            'threshold_usage': f'Consider reservations above {break_even_shots:.0f} shots/month'
        }

    return {
        'usage_input': usage_pattern,
        'pricing_input': device_pricing,
        'cost_comparison': {
            'on_demand_monthly': on_demand_monthly_cost,
            'reservation_monthly': reservation_monthly_cost,
            'monthly_savings': monthly_savings,
            'savings_percentage': savings_percentage,
            'break_even_shots': break_even_shots
        },
        'recommendation': recommendation
    }


from braket.jobs import hybrid_job, InstanceConfig, CheckpointConfig, OutputDataConfig
# The `device` argument names the quantum device the job runs against; the
# previous value "ml.m5.large" is a compute instance type, which
# AwsDevice(get_job_device_arn()) below cannot resolve.
@hybrid_job(device="arn:aws:braket:::device/quantum-simulator/amazon/sv1")
def basic_hybrid_algorithm():
    """
    Basic hybrid quantum-classical algorithm.

    Minimises the <Z(0) Z(1)> expectation value of a two-qubit circuit
    (an Ry rotation per parameter followed by a CNOT) with COBYLA, starting
    from two random angles in [0, pi) and stopping after at most 50
    iterations, then saves the optimum as the job result.
    """
    import numpy as np
    from scipy.optimize import minimize

    from braket.aws import AwsDevice
    from braket.circuits import Circuit
    from braket.circuits.observables import Z
    from braket.jobs import get_job_device_arn, save_job_result

    # Get quantum device for this job
    device = AwsDevice(get_job_device_arn())

    def quantum_cost_function(params: np.ndarray) -> float:
        """Return <Z Z> measured on the variational circuit for ``params``."""
        circuit = Circuit()
        for qubit, angle in enumerate(params):
            circuit.ry(qubit, angle)
        circuit.cnot(0, 1)
        circuit.expectation(observable=Z() @ Z(), target=[0, 1])
        task = device.run(circuit, shots=1000)
        return task.result().values[0]

    # Classical optimization
    initial_params = np.random.random(2) * np.pi
    result = minimize(
        quantum_cost_function,
        initial_params,
        method='COBYLA',
        options={'maxiter': 50}
    )

    # Save results; coerce to builtin types so the payload holds no
    # NumPy scalars
    save_job_result({
        'optimal_parameters': result.x.tolist(),
        'optimal_value': float(result.fun),
        'optimization_success': bool(result.success)
    })
class InstanceConfig:
    """Describes the compute instances a job runs on."""

    def __init__(
        self,
        instance_type: str,
        instance_count: int = 1,
        volume_size_gb: int = 30
    ):
        """
        Store the instance settings unchanged.

        Args:
            instance_type: EC2 instance type (e.g., 'ml.m5.large')
            instance_count: How many instances to use
            volume_size_gb: Size of the EBS volume in GB
        """
        self.instance_type, self.instance_count, self.volume_size_gb = (
            instance_type,
            instance_count,
            volume_size_gb,
        )
class CheckpointConfig:
    """Describes where a job keeps its checkpoints."""

    def __init__(
        self,
        s3_uri: str,
        local_path: str = "/opt/braket/checkpoints"
    ):
        """
        Store the checkpoint locations unchanged.

        Args:
            s3_uri: S3 URI under which checkpoints are stored
            local_path: Directory used for checkpoints locally
        """
        self.s3_uri, self.local_path = s3_uri, local_path
class OutputDataConfig:
    """Describes where a job writes its output data."""

    def __init__(
        self,
        s3_path: str,
        kms_key_id: str = None
    ):
        """
        Store the output settings unchanged.

        Args:
            s3_path: S3 path that receives the output data
            kms_key_id: KMS key used for encryption; None when omitted
        """
        self.s3_path, self.kms_key_id = s3_path, kms_key_id
# Advanced job configuration
# `device` names the quantum device the job runs against; the compute
# instance type is carried by instance_config.
@hybrid_job(
    device="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
    instance_config=InstanceConfig("ml.p3.2xlarge", instance_count=1),  # GPU instance for ML
    checkpoint_config=CheckpointConfig("s3://my-bucket/checkpoints/"),
    output_data_config=OutputDataConfig("s3://my-bucket/results/")
)
def advanced_variational_algorithm():
    """
    Advanced hybrid algorithm with ML and checkpointing.

    Builds a small torch network whose two outputs parameterise a two-qubit
    circuit, resumes from a checkpoint when one can be loaded, runs a
    placeholder training loop that checkpoints every 10th epoch, and saves
    the final model state and loss as the job result.
    """
    import torch
    import torch.nn as nn

    from braket.aws import AwsDevice
    from braket.circuits import Circuit
    from braket.circuits.observables import Z
    from braket.jobs import (
        get_hyperparameters, get_job_device_arn, load_job_checkpoint,
        save_job_checkpoint, save_job_result
    )

    # Get job hyperparameters. Values may arrive as strings, so coerce them
    # before they reach the optimizer and range().
    hyperparams = get_hyperparameters()
    learning_rate = float(hyperparams.get('learning_rate', 0.01))
    epochs = int(hyperparams.get('epochs', 100))

    # Define hybrid model
    class HybridQuantumClassicalModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.classical_layers = nn.Sequential(
                nn.Linear(4, 16),
                nn.ReLU(),
                nn.Linear(16, 2)  # 2 quantum parameters
            )

        def quantum_layer(self, params):
            """Quantum variational layer."""
            circuit = Circuit()
            circuit.ry(0, params[0])
            circuit.ry(1, params[1])
            circuit.cnot(0, 1)
            circuit.expectation(observable=Z() @ Z(), target=[0, 1])
            device = AwsDevice(get_job_device_arn())
            task = device.run(circuit, shots=1000)
            return task.result().values[0]

        def forward(self, x):
            classical_out = self.classical_layers(x)
            return self.quantum_layer(classical_out)

    # Training loop with checkpointing
    model = HybridQuantumClassicalModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Load checkpoint if exists. Any failure to load means "start fresh",
    # but KeyboardInterrupt/SystemExit are no longer swallowed.
    try:
        checkpoint = load_job_checkpoint()
        model.load_state_dict(checkpoint['model_state'])
        optimizer.load_state_dict(checkpoint['optimizer_state'])
        # The stored epoch has already been processed; resume after it.
        start_epoch = checkpoint['epoch'] + 1
    except Exception:
        start_epoch = 0

    # Training loop (simplified)
    loss = None  # stays None when there is no epoch left to run
    for epoch in range(start_epoch, epochs):
        # Training step would go here
        loss = torch.randn(1)  # Placeholder

        # Save checkpoint every 10 epochs
        if epoch % 10 == 0:
            save_job_checkpoint({
                'model_state': model.state_dict(),
                'optimizer_state': optimizer.state_dict(),
                'epoch': epoch,
                'loss': float(loss)
            })

    # Save final results
    save_job_result({
        'final_model_state': model.state_dict(),
        'training_loss': float(loss) if loss is not None else None,
        'total_epochs': epochs
    })


from braket.jobs import (
    get_checkpoint_dir, get_hyperparameters, get_input_data_dir,
    get_job_device_arn, get_job_name, get_results_dir
)
def get_job_device_arn() -> str:
    """
    Report the quantum device assigned to the current job.

    Signature placeholder: the body does nothing and returns None.

    Returns:
        str: Device ARN assigned to job
    """
    ...
def get_job_name() -> str:
    """
    Report the name of the current job.

    Signature placeholder: the body does nothing and returns None.

    Returns:
        str: Job name
    """
    ...
def get_hyperparameters() -> dict:
    """
    Fetch the hyperparameters supplied to the current job.

    Signature placeholder: the body does nothing and returns None.

    Returns:
        dict: Hyperparameter dictionary
    """
    ...
def get_input_data_dir(channel: str = "training") -> str:
    """
    Locate the directory holding a channel's input data.

    Signature placeholder: the body does nothing and returns None.

    Args:
        channel: Name of the data channel
    Returns:
        str: Input data directory path
    """
    ...
def get_results_dir() -> str:
    """
    Locate the directory that receives job results.

    Signature placeholder: the body does nothing and returns None.

    Returns:
        str: Results directory path
    """
    ...
def get_checkpoint_dir() -> str:
    """
    Locate the directory used for job checkpoints.

    Signature placeholder: the body does nothing and returns None.

    Returns:
        str: Checkpoint directory path
    """
    ...
def save_job_checkpoint(checkpoint_data: dict) -> None:
    """
    Persist checkpoint data for the current job.

    Signature placeholder: the body does nothing.

    Args:
        checkpoint_data: Data to store in the checkpoint
    """
    ...
def load_job_checkpoint() -> dict:
    """
    Read back previously saved checkpoint data.

    Signature placeholder: the body does nothing and returns None.

    Returns:
        dict: Loaded checkpoint data
    """
    ...
def save_job_result(result_data: dict) -> None:
    """
    Persist the results of the current job.

    Signature placeholder: the body does nothing.

    Args:
        result_data: Results to store
    """
    ...
def load_job_result(job_arn: str) -> dict:
    """
    Fetch the results of a job that has completed.

    Signature placeholder: the body does nothing and returns None.

    Args:
        job_arn: ARN of the job
    Returns:
        dict: Job results
    """
    ...
# Job utilities example
def comprehensive_job_setup() -> dict:
    """
    Gather the job's environment details and configure logging.

    Logging is set up through ``logging.basicConfig`` at INFO level with a
    file handler writing ``job.log`` inside the results directory and a
    stream handler; two INFO records announce the job name and device.

    Returns:
        dict: 'job_name', 'device_arn', 'hyperparameters' and
        'directories' ('input_data', 'results', 'checkpoints')
    """
    import logging

    job_info = {
        'job_name': get_job_name(),
        'device_arn': get_job_device_arn(),
        'hyperparameters': get_hyperparameters(),
        'directories': {
            'input_data': get_input_data_dir(),
            'results': get_results_dir(),
            'checkpoints': get_checkpoint_dir()
        }
    }

    # Setup logging; reuse the results directory gathered above instead of
    # looking it up a second time.
    results_dir = job_info['directories']['results']
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(f"{results_dir}/job.log"),
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger('QuantumJob')
    # %-style arguments defer formatting until the record is emitted
    logger.info("Starting job: %s", job_info['job_name'])
    logger.info("Using device: %s", job_info['device_arn'])

    return job_info


from braket.tracking import Tracker
class Tracker:
    """Collects cost and usage figures for quantum workloads.

    ``__enter__`` returns the tracker itself; every other member is a
    signature placeholder that returns None.
    """

    # NOTE(review): the published braket.tracking.Tracker appears to expose
    # qpu_tasks_cost() and quantum_tasks_statistics() rather than some of
    # the names below - confirm against the SDK before relying on them.

    def __init__(self):
        """Create a tracker; nothing is initialised here."""
        ...

    def __enter__(self) -> 'Tracker':
        """Enter the ``with`` block, yielding this tracker."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Leave the ``with`` block; exceptions are not suppressed."""
        ...

    def quantum_tasks_cost(self) -> float:
        """
        Total spend on quantum tasks.

        Returns:
            float: Total cost in USD
        """
        ...

    def simulator_tasks_cost(self) -> float:
        """
        Total spend on simulator tasks.

        Returns:
            float: Total cost in USD
        """
        ...

    def quantum_task_statistics(self) -> dict:
        """
        Detailed figures for quantum tasks.

        Returns:
            dict: Task execution statistics
        """
        ...

    def simulator_task_statistics(self) -> dict:
        """
        Detailed figures for simulator tasks.

        Returns:
            dict: Simulator execution statistics
        """
        ...
def track_quantum_experiment_costs(experiment_function: callable, *args, **kwargs) -> dict:
    """
    Run an experiment under a Tracker and report what it cost.

    Each cost figure is read from the tracker once, so the reported total
    is always exactly the sum of the reported quantum and simulator costs.

    Args:
        experiment_function: Function that runs quantum experiments
        *args: Positional arguments forwarded to the experiment function
        **kwargs: Keyword arguments forwarded to the experiment function
    Returns:
        dict: 'total_cost', 'quantum_cost', 'simulator_cost',
        'quantum_stats', 'simulator_stats', 'cost_per_shot' (0.0 when no
        shots were recorded) and 'results' (the experiment's return value)
    """
    with Tracker() as tracker:
        # Run experiment
        results = experiment_function(*args, **kwargs)

        # Collect cost information while the tracker is still active
        quantum_cost = tracker.quantum_tasks_cost()
        simulator_cost = tracker.simulator_tasks_cost()
        quantum_stats = tracker.quantum_task_statistics()
        simulator_stats = tracker.simulator_task_statistics()

    total_cost = quantum_cost + simulator_cost

    # Calculate cost per shot, leaving it at 0.0 when nothing was measured
    total_shots = quantum_stats.get('shots', 0) + simulator_stats.get('shots', 0)
    cost_per_shot = total_cost / total_shots if total_shots > 0 else 0.0

    return {
        'total_cost': total_cost,
        'quantum_cost': quantum_cost,
        'simulator_cost': simulator_cost,
        'quantum_stats': quantum_stats,
        'simulator_stats': simulator_stats,
        'cost_per_shot': cost_per_shot,
        'results': results
    }
def optimize_experiment_for_cost(circuits: list, target_accuracy: float = 0.01) -> dict:
    """
    Suggest a device and a cost estimate for each circuit.

    Circuits with at most 10 qubits and depth at most 20 go to the local
    simulator, other circuits with at most 20 qubits go to SV1, and
    everything larger is pointed at a QPU or TN1. ``target_accuracy`` is
    accepted but not consulted.

    Args:
        circuits: Quantum circuits to execute
        target_accuracy: Target measurement accuracy
    Returns:
        dict: 'execution_strategy' (per-circuit device, cost and analysis,
        keyed 'circuit_<index>'), 'estimated_costs' (with the 'total') and
        'recommendations' (one message per circuit)
    """
    strategies = {}
    recommendations = []

    for index, circuit in enumerate(circuits):
        qubits = circuit.qubit_count
        depth = circuit.depth
        analysis = {
            'qubit_count': qubits,
            'depth': depth,
            'gate_count': len(circuit.instructions)
        }

        if qubits <= 10 and depth <= 20:
            # Small circuit - use simulator
            device_name, cost = "Local Simulator", 0.0
            note = f"Circuit {index}: Use local simulator (free) - small circuit"
        elif qubits <= 20:
            # Medium circuit - use cloud simulator
            device_name = "AWS SV1"
            cost = 0.075 * (2 ** min(qubits, 17))
            note = f"Circuit {index}: Use SV1 simulator - cost effective for {qubits} qubits"
        else:
            # Large circuit - may need QPU or advanced simulator
            device_name = "AWS QPU or TN1"
            cost = 0.30 + (1000 * 0.0003)  # Task fee + shots
            note = f"Circuit {index}: Consider QPU for {qubits} qubits"

        recommendations.append(note)
        strategies[f'circuit_{index}'] = {
            'recommended_device': device_name,
            'estimated_cost': cost,
            'analysis': analysis
        }

    # Total of the per-circuit estimates
    total_cost = sum(entry['estimated_cost'] for entry in strategies.values())

    return {
        'execution_strategy': strategies,
        'estimated_costs': {'total': total_cost},
        'recommendations': recommendations
    }


from braket.aws import DirectReservation
class DirectReservation:
    """Handle for a direct reservation of a quantum device.

    Apart from the constructor, the members are signature placeholders
    that return None.
    """

    # NOTE(review): a context-manager class with this same name is defined
    # earlier in the file; this later definition replaces it at module
    # level, and use_device_reservation() constructs
    # DirectReservation(device=..., reservation_arn=...), which this
    # constructor does not accept.

    def __init__(self, arn: str, aws_session: 'AwsSession' = None):
        """
        Record which reservation this object refers to.

        Args:
            arn: ARN of the reservation
            aws_session: Session used for API calls; stored unchanged
        """
        self.arn, self.aws_session = arn, aws_session

    @property
    def device_arn(self) -> str:
        """ARN of the device that is reserved."""
        ...

    @property
    def start_time(self) -> str:
        """When the reservation begins."""
        ...

    @property
    def end_time(self) -> str:
        """When the reservation ends."""
        ...

    @property
    def status(self) -> str:
        """Status of the reservation."""
        ...

    def cancel(self) -> None:
        """Cancel the reservation."""
        ...
def create_device_reservation(device_arn: str, start_time: str, duration_minutes: int) -> DirectReservation:
    """
    Reserve a device for a fixed window.

    Placeholder only: the body does nothing and returns None; a real
    implementation would call the AWS API.

    Args:
        device_arn: ARN of the device to reserve
        start_time: Start of the window, in ISO format
        duration_minutes: Length of the window in minutes
    Returns:
        DirectReservation: Device reservation handle
    """
    ...
def execute_with_reservation(reservation: DirectReservation, circuits: list) -> dict:
    """
    Run circuits on the reserved device, one after another.

    Each circuit is submitted with 1000 shots under the reservation's ARN
    and its result is awaited before the next circuit is submitted.

    Args:
        reservation: Active device reservation
        circuits: Circuits to execute during the reservation
    Returns:
        dict: 'results' (one task result per circuit) and
        'reservation_info' (device ARN, start and end time, and
        'utilization', which is the number of circuits)
    """
    device = AwsDevice(reservation.device_arn)
    task_results = [
        device.run(circuit, shots=1000, reservation_arn=reservation.arn).result()
        for circuit in circuits
    ]
    reservation_info = {
        'device_arn': reservation.device_arn,
        'start_time': reservation.start_time,
        'end_time': reservation.end_time,
        'utilization': len(circuits)
    }
    return {'results': task_results, 'reservation_info': reservation_info}


# This comprehensive AWS integration documentation covers all aspects of cloud quantum computing with Amazon Braket, including device management, task execution, job orchestration, cost optimization, and resource tracking.
Install with Tessl CLI
npx tessl i tessl/pypi-amazon-braket-sdk