CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-amazon-braket-sdk

An open source library for interacting with quantum computing devices on Amazon Braket

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

aws-integration.mddocs/

AWS Integration

Complete AWS cloud integration for quantum computing on Amazon Braket, including device access, task management, sessions, jobs, and cost tracking.

Core Imports

from braket.aws import (
    AwsDevice, AwsDeviceType, AwsQuantumTask, AwsQuantumTaskBatch, 
    AwsQuantumJob, AwsSession, DirectReservation
)
from braket.jobs import (
    hybrid_job, CheckpointConfig, InstanceConfig, OutputDataConfig,
    S3DataSourceConfig, StoppingCondition, LocalQuantumJob,
    load_job_checkpoint, load_job_result, save_job_checkpoint, save_job_result,
    get_checkpoint_dir, get_hyperparameters, get_input_data_dir, 
    get_job_device_arn, get_job_name, get_results_dir,
    Framework, retrieve_image
)
from braket.tracking import Tracker
from braket.tasks import QuantumTask, QuantumTaskBatch

AWS Device Management

Device Discovery and Access

from braket.aws import AwsDevice, AwsDeviceType
from enum import Enum

class AwsDeviceType(Enum):
    """AWS Braket device type enumeration."""
    SIMULATOR = "SIMULATOR"
    QPU = "QPU"

class AwsDevice:
    """Amazon Braket implementation of quantum devices."""
    
    def __init__(self, arn: str, aws_session: 'AwsSession' = None):
        """
        Initialize AWS quantum device.
        
        Args:
            arn: AWS device ARN
            aws_session: Optional AWS session, creates default if None
        """
        self.arn = arn
        self.aws_session = aws_session
    
    @staticmethod
    def get_devices(
        arns: list[str] = None, 
        names: list[str] = None,
        types: list[AwsDeviceType] = None,
        statuses: list[str] = None,
        provider_names: list[str] = None,
        aws_session: 'AwsSession' = None
    ) -> list['AwsDevice']:
        """
        Get AWS Braket devices matching criteria.
        
        Args:
            arns: Device ARNs to filter by
            names: Device names to filter by  
            types: Device types to filter by
            statuses: Device statuses to filter by
            provider_names: Provider names to filter by
            aws_session: AWS session for API calls
            
        Returns:
            list[AwsDevice]: Matching devices
        """
        pass
    
    def run(
        self, 
        task_specification, 
        shots: int = 1000,
        inputs: dict = None,
        gate_definitions: dict = None,
        reservation_arn: str = None,
        tags: dict = None
    ) -> 'AwsQuantumTask':
        """
        Execute quantum task on AWS device.
        
        Args:
            task_specification: Circuit, AHS program, or annealing problem
            shots: Number of measurement shots
            inputs: Input parameters for parameterized programs
            gate_definitions: Custom gate definitions
            reservation_arn: Device reservation ARN
            tags: AWS resource tags
            
        Returns:
            AwsQuantumTask: Quantum task execution handle
        """
        pass
    
    def run_batch(
        self,
        task_specifications: list,
        shots: int = 1000,
        max_parallel: int = 10,
        inputs: list[dict] = None,
        tags: dict = None
    ) -> 'AwsQuantumTaskBatch':
        """
        Execute batch of quantum tasks.
        
        Args:
            task_specifications: List of circuits/programs
            shots: Number of measurement shots per task
            max_parallel: Maximum parallel executions
            inputs: Input parameters for each task
            tags: AWS resource tags
            
        Returns:
            AwsQuantumTaskBatch: Batch execution handle
        """
        pass
    
    @property
    def name(self) -> str:
        """Device name."""
        pass
    
    @property
    def status(self) -> str:
        """Current device status."""
        pass
    
    @property
    def type(self) -> AwsDeviceType:
        """Device type (QPU or Simulator)."""
        pass
    
    @property
    def provider_name(self) -> str:
        """Device provider name."""
        pass
    
    @property
    def properties(self) -> dict:
        """Device capabilities and properties."""
        pass

# Device discovery examples
def discover_aws_devices() -> dict[str, list[AwsDevice]]:
    """
    Discover and categorize available AWS Braket devices.
    
    Returns:
        dict: Categorized devices by provider and type
    """
    devices = {
        'simulators': AwsDevice.get_devices(types=[AwsDeviceType.SIMULATOR]),
        'qpus': AwsDevice.get_devices(types=[AwsDeviceType.QPU]),
        'ionq': AwsDevice.get_devices(provider_names=['IonQ']),
        'rigetti': AwsDevice.get_devices(provider_names=['Rigetti']),
        'oqc': AwsDevice.get_devices(provider_names=['Oxford Quantum Computing']),
        'quera': AwsDevice.get_devices(provider_names=['QuEra'])
    }
    
    return devices

def select_optimal_device(circuit, requirements: dict) -> AwsDevice:
    """
    Select optimal device based on circuit and requirements.
    
    Args:
        circuit: Quantum circuit to execute
        requirements: Performance and cost requirements
        
    Returns:
        AwsDevice: Selected optimal device
    """
    all_devices = AwsDevice.get_devices()
    
    # Filter by availability and qubit count
    available_devices = []
    for device in all_devices:
        if (device.status == 'ONLINE' and 
            device.properties.get('qubitCount', 0) >= circuit.qubit_count):
            available_devices.append(device)
    
    # Select based on requirements
    if requirements.get('cost_priority', False):
        # Prefer simulators for cost
        simulators = [d for d in available_devices if d.type == AwsDeviceType.SIMULATOR]
        return simulators[0] if simulators else available_devices[0]
    
    elif requirements.get('real_hardware', False):
        # Prefer QPUs for real quantum effects
        qpus = [d for d in available_devices if d.type == AwsDeviceType.QPU]
        return qpus[0] if qpus else available_devices[0]
    
    else:
        # Default selection
        return available_devices[0]

Device Properties and Capabilities

def analyze_device_capabilities(device: AwsDevice) -> dict:
    """
    Analyze device capabilities and constraints.
    
    Args:
        device: AWS device to analyze
        
    Returns:
        dict: Comprehensive device capability analysis
    """
    properties = device.properties
    
    capabilities = {
        'basic_info': {
            'name': device.name,
            'provider': device.provider_name,
            'type': device.type.value,
            'status': device.status
        },
        'quantum_specs': {},
        'gate_set': {},
        'connectivity': {},
        'timing_info': {},
        'error_rates': {}
    }
    
    # Quantum specifications
    if 'qubitCount' in properties:
        capabilities['quantum_specs']['qubit_count'] = properties['qubitCount']
    
    # Gate set analysis
    if 'supportedOperations' in properties:
        capabilities['gate_set'] = properties['supportedOperations']
    
    # Connectivity graph
    if 'connectivity' in properties:
        capabilities['connectivity'] = properties['connectivity']
    
    # Timing constraints
    if 'timing' in properties:
        capabilities['timing_info'] = properties['timing']
    
    # Error characteristics
    if 'fidelity' in properties:
        capabilities['error_rates'] = properties['fidelity']
    
    return capabilities

def check_circuit_compatibility(circuit, device: AwsDevice) -> dict:
    """
    Check circuit compatibility with device.
    
    Args:
        circuit: Quantum circuit to validate
        device: Target AWS device
        
    Returns:
        dict: Compatibility analysis results
    """
    properties = device.properties
    
    compatibility = {
        'is_compatible': True,
        'issues': [],
        'warnings': [],
        'recommendations': []
    }
    
    # Check qubit count
    if circuit.qubit_count > properties.get('qubitCount', 0):
        compatibility['is_compatible'] = False
        compatibility['issues'].append(f"Circuit requires {circuit.qubit_count} qubits, device has {properties.get('qubitCount', 0)}")
    
    # Check gate support
    supported_gates = properties.get('supportedOperations', [])
    for instruction in circuit.instructions:
        gate_name = instruction.operator.__class__.__name__
        if gate_name not in supported_gates:
            compatibility['warnings'].append(f"Gate {gate_name} may not be natively supported")
    
    # Check connectivity (for QPUs)
    if device.type == AwsDeviceType.QPU and 'connectivity' in properties:
        connectivity_graph = properties['connectivity']
        for instruction in circuit.instructions:
            if len(instruction.target) == 2:  # Two-qubit gate
                qubit_pair = tuple(instruction.target)
                if qubit_pair not in connectivity_graph.get('fullyConnected', []):
                    compatibility['warnings'].append(f"Two-qubit gate on {qubit_pair} may require SWAP operations")
    
    return compatibility

Quantum Task Management

Task Execution and Monitoring

from braket.aws import AwsQuantumTask, AwsQuantumTaskBatch
from braket.tasks import QuantumTask

class AwsQuantumTask(QuantumTask):
    """AWS quantum task execution interface."""
    
    def __init__(self, arn: str, aws_session: 'AwsSession' = None):
        """
        Initialize AWS quantum task.
        
        Args:
            arn: Task ARN
            aws_session: AWS session for API calls
        """
        self.arn = arn
        self.aws_session = aws_session
    
    def result(self) -> 'TaskResult':
        """
        Get task execution results.
        
        Returns:
            TaskResult: Quantum task results
        """
        pass
    
    def state(self) -> str:
        """
        Get current task state.
        
        Returns:
            str: Task state (QUEUED, RUNNING, COMPLETED, FAILED, CANCELLED)
        """
        pass
    
    def cancel(self) -> None:
        """Cancel running task."""
        pass
    
    def metadata(self) -> dict:
        """
        Get task metadata.
        
        Returns:
            dict: Task execution metadata
        """
        pass
    
    @property
    def id(self) -> str:
        """Task ID."""
        pass
    
    @staticmethod
    def get(arn: str, aws_session: 'AwsSession' = None) -> 'AwsQuantumTask':
        """
        Get existing task by ARN.
        
        Args:
            arn: Task ARN
            aws_session: AWS session
            
        Returns:
            AwsQuantumTask: Task instance
        """
        pass

class AwsQuantumTaskBatch:
    """Batch execution of quantum tasks."""
    
    def __init__(self, tasks: list[AwsQuantumTask]):
        """
        Initialize task batch.
        
        Args:
            tasks: List of quantum tasks
        """
        self.tasks = tasks
    
    def results(self) -> list:
        """
        Get results from all tasks in batch.
        
        Returns:
            list: Results from completed tasks
        """
        pass
    
    def unsuccessful(self) -> list[AwsQuantumTask]:
        """
        Get unsuccessful tasks.
        
        Returns:
            list[AwsQuantumTask]: Failed or cancelled tasks
        """
        pass
    
    def retry_unsuccessful_tasks(self) -> 'AwsQuantumTaskBatch':
        """
        Retry failed tasks.
        
        Returns:
            AwsQuantumTaskBatch: New batch with retried tasks
        """
        pass

# Task execution examples
def execute_quantum_task_with_monitoring(circuit, device: AwsDevice) -> dict:
    """
    Execute quantum task with comprehensive monitoring.
    
    Args:
        circuit: Quantum circuit to execute
        device: Target AWS device
        
    Returns:
        dict: Task results and execution information
    """
    import time
    
    # Submit task
    task = device.run(circuit, shots=1000)
    
    execution_info = {
        'task_arn': task.arn,
        'submission_time': time.time(),
        'device_arn': device.arn,
        'status_history': []
    }
    
    # Monitor execution
    while task.state() not in ['COMPLETED', 'FAILED', 'CANCELLED']:
        status = task.state()
        execution_info['status_history'].append({
            'status': status,
            'timestamp': time.time()
        })
        
        print(f"Task status: {status}")
        time.sleep(5)  # Check every 5 seconds
    
    # Get final results
    if task.state() == 'COMPLETED':
        result = task.result()
        execution_info['completion_time'] = time.time()
        execution_info['results'] = {
            'measurement_counts': result.measurement_counts,
            'task_metadata': result.task_metadata
        }
    else:
        execution_info['error_info'] = task.metadata()
    
    return execution_info

def execute_parameter_sweep(circuit_template, parameter_ranges: dict, device: AwsDevice) -> dict:
    """
    Execute parameter sweep across quantum circuit parameters.
    
    Args:
        circuit_template: Parameterized circuit template
        parameter_ranges: Parameter ranges to sweep
        device: Target AWS device
        
    Returns:
        dict: Parameter sweep results
    """
    import itertools
    from braket.parametric import FreeParameter
    
    # Generate parameter combinations
    param_names = list(parameter_ranges.keys())
    param_values = list(parameter_ranges.values())
    param_combinations = list(itertools.product(*param_values))
    
    # Create circuits for each parameter combination
    circuits = []
    for combination in param_combinations:
        param_dict = dict(zip(param_names, combination))
        circuit = circuit_template.copy()
        
        # Bind parameters
        for param_name, value in param_dict.items():
            circuit = circuit.substitute_free_parameter(FreeParameter(param_name), value)
        
        circuits.append((circuit, param_dict))
    
    # Execute batch
    task_specs = [circuit for circuit, _ in circuits]
    batch = device.run_batch(task_specs, shots=1000)
    
    # Collect results
    results = batch.results()
    sweep_results = {
        'parameter_combinations': [params for _, params in circuits],
        'results': results,
        'analysis': {}
    }
    
    # Analyze results
    for i, (result, params) in enumerate(zip(results, sweep_results['parameter_combinations'])):
        sweep_results['analysis'][str(params)] = {
            'measurement_counts': result.measurement_counts,
            'expectation_values': getattr(result, 'values', [])
        }
    
    return sweep_results

AWS Session Management

Authentication and Configuration

from braket.aws import AwsSession
import boto3

class AwsSession:
    """AWS authentication and session management."""
    
    def __init__(
        self,
        boto_session: boto3.Session = None,
        braket_user_agents: list[str] = None,
        config: dict = None
    ):
        """
        Initialize AWS session for Braket.
        
        Args:
            boto_session: Pre-configured boto3 session
            braket_user_agents: Custom user agent strings
            config: Session configuration options
        """
        self.boto_session = boto_session
        self.braket_user_agents = braket_user_agents
        self.config = config or {}
    
    @property
    def region(self) -> str:
        """Current AWS region."""
        pass
    
    @property
    def account_id(self) -> str:
        """AWS account ID."""
        pass
    
    def get_device(self, arn: str) -> AwsDevice:
        """
        Get device using this session.
        
        Args:
            arn: Device ARN
            
        Returns:
            AwsDevice: Device with this session
        """
        pass

def create_aws_session_with_profile(profile_name: str, region: str = 'us-east-1') -> AwsSession:
    """
    Create AWS session using named profile.
    
    Args:
        profile_name: AWS credential profile name
        region: AWS region for Braket services
        
    Returns:
        AwsSession: Configured session
    """
    boto_session = boto3.Session(profile_name=profile_name, region_name=region)
    return AwsSession(boto_session=boto_session)

def create_aws_session_with_keys(access_key: str, secret_key: str, region: str = 'us-east-1') -> AwsSession:
    """
    Create AWS session using access keys.
    
    Args:
        access_key: AWS access key ID
        secret_key: AWS secret access key  
        region: AWS region for Braket services
        
    Returns:
        AwsSession: Configured session
    """
    boto_session = boto3.Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region
    )
    return AwsSession(boto_session=boto_session)

def validate_aws_session(session: AwsSession) -> dict:
    """
    Validate AWS session configuration and permissions.
    
    Args:
        session: AWS session to validate
        
    Returns:
        dict: Validation results and recommendations
    """
    validation = {
        'is_valid': True,
        'region': session.region,
        'account_id': session.account_id,
        'permissions': {},
        'recommendations': []
    }
    
    try:
        # Test basic Braket permissions
        devices = AwsDevice.get_devices(aws_session=session)
        validation['permissions']['device_access'] = True
        validation['permissions']['accessible_devices'] = len(devices)
        
    except Exception as e:
        validation['is_valid'] = False
        validation['permissions']['device_access'] = False
        validation['error'] = str(e)
        validation['recommendations'].append("Check Braket service permissions")
    
    # Check for cost optimization
    if validation['region'] != 'us-east-1':
        validation['recommendations'].append("Consider us-east-1 for lowest latency to Braket services")
    
    return validation

Device Reservations

Direct Reservations

from braket.aws import DirectReservation

class DirectReservation:
    """Context manager for using reserved quantum devices on AWS Braket."""
    
    def __init__(
        self,
        device,
        reservation_arn: str = None
    ):
        """
        Initialize direct reservation context.
        
        Args:
            device: Braket device or device ARN with reservation
            reservation_arn: AWS Braket Direct reservation ARN
        """
        self.device = device
        self.reservation_arn = reservation_arn
    
    def __enter__(self) -> 'DirectReservation':
        """
        Enter reservation context and apply reservation to quantum tasks.
        
        Returns:
            DirectReservation: Self for context management
        """
        pass
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit reservation context and restore normal device access."""
        pass
    
    def start(self) -> 'DirectReservation':
        """
        Start reservation without context manager.
        
        Returns:
            DirectReservation: Self for method chaining
        """
        pass
    
    def stop(self) -> 'DirectReservation':
        """
        Stop reservation manually.
        
        Returns:
            DirectReservation: Self for method chaining
        """
        pass

# Reservation management examples
def use_device_reservation(device_arn: str, reservation_arn: str, circuits: list) -> dict:
    """
    Execute circuits using a reserved quantum device.
    
    Args:
        device_arn: AWS device ARN with reservation
        reservation_arn: Reservation ARN for exclusive access
        circuits: List of quantum circuits to execute
        
    Returns:
        dict: Execution results with reservation metadata
    """
    from braket.aws import AwsDevice
    
    device = AwsDevice(device_arn)
    reservation_results = {
        'reservation_arn': reservation_arn,
        'device_arn': device_arn,
        'circuits_executed': len(circuits),
        'results': [],
        'cost_analysis': {},
        'reservation_utilization': {}
    }
    
    # Execute circuits within reservation context
    with DirectReservation(device=device, reservation_arn=reservation_arn):
        for i, circuit in enumerate(circuits):
            task = device.run(circuit, shots=1000)
            result = task.result()
            
            reservation_results['results'].append({
                'circuit_index': i,
                'task_arn': task.arn,
                'measurement_counts': result.measurement_counts,
                'execution_duration': result.task_metadata.get('executionDuration', 'unknown')
            })
    
    # Analyze reservation utilization
    reservation_results['reservation_utilization'] = {
        'total_tasks': len(circuits),
        'estimated_time_savings': 'No queue wait time',
        'dedicated_access': True,
        'billing_model': 'Reservation-based'
    }
    
    return reservation_results

def manage_reservation_lifecycle(device_arn: str) -> dict:
    """
    Demonstrate reservation lifecycle management.
    
    Args:
        device_arn: AWS device ARN for reservation
        
    Returns:
        dict: Reservation management workflow
    """
    reservation_workflow = {
        'phases': [
            'reservation_planning',
            'reservation_creation', 
            'reservation_usage',
            'reservation_monitoring',
            'reservation_completion'
        ],
        'implementation_guide': {}
    }
    
    # Reservation planning phase
    planning_recommendations = {
        'optimal_duration': 'Based on workload analysis',
        'cost_estimation': {
            'reserved_vs_on_demand': 'Calculate savings for extended usage',
            'minimum_duration': '1 hour minimum for most devices',
            'cancellation_policy': 'Check device-specific policies'  
        },
        'scheduling_considerations': [
            'Device availability windows',
            'Time zone coordination for team access',
            'Buffer time for job completion'
        ]
    }
    
    # Usage optimization
    usage_optimization = {
        'batch_execution': 'Group circuits for efficient execution',
        'parallel_jobs': 'Use multiple job slots if available',
        'monitoring_tools': 'Track utilization during reservation',
        'fallback_strategy': 'Plan for unexpected issues'
    }
    
    reservation_workflow['implementation_guide'] = {
        'planning': planning_recommendations,
        'optimization': usage_optimization,
        'monitoring': {
            'key_metrics': ['utilization_rate', 'cost_per_shot', 'queue_time_saved'],
            'alerting': 'Set up notifications for underutilization'
        }
    }
    
    return reservation_workflow

def calculate_reservation_cost_benefit(usage_pattern: dict, device_pricing: dict) -> dict:
    """
    Calculate cost benefits of device reservations vs on-demand usage.
    
    Args:
        usage_pattern: Expected usage patterns
        device_pricing: Device pricing information
        
    Returns:
        dict: Cost-benefit analysis for reservations
    """
    analysis = {
        'usage_input': usage_pattern,
        'pricing_input': device_pricing,
        'cost_comparison': {},
        'recommendation': {}
    }
    
    # Calculate on-demand costs
    shots_per_month = usage_pattern.get('shots_per_month', 100000)
    on_demand_rate = device_pricing.get('per_shot', 0.00075)  # Example pricing
    
    on_demand_monthly_cost = shots_per_month * on_demand_rate
    
    # Calculate reservation costs (example)
    reservation_hourly_rate = device_pricing.get('reservation_hourly', 1.50)
    hours_per_month = usage_pattern.get('hours_per_month', 40)
    
    reservation_monthly_cost = hours_per_month * reservation_hourly_rate
    
    # Cost comparison
    monthly_savings = on_demand_monthly_cost - reservation_monthly_cost
    break_even_shots = reservation_monthly_cost / on_demand_rate
    
    analysis['cost_comparison'] = {
        'on_demand_monthly': on_demand_monthly_cost,
        'reservation_monthly': reservation_monthly_cost,
        'monthly_savings': monthly_savings,
        'savings_percentage': (monthly_savings / on_demand_monthly_cost) * 100 if on_demand_monthly_cost > 0 else 0,
        'break_even_shots': break_even_shots
    }
    
    # Recommendation
    if monthly_savings > 0:
        analysis['recommendation'] = {
            'use_reservations': True,
            'reason': f'Save ${monthly_savings:.2f} per month ({analysis["cost_comparison"]["savings_percentage"]:.1f}%)',
            'optimal_reservation_hours': hours_per_month
        }
    else:
        analysis['recommendation'] = {
            'use_reservations': False,
            'reason': 'On-demand pricing more cost-effective for current usage',
            'threshold_usage': f'Consider reservations above {break_even_shots:.0f} shots/month'
        }
    
    return analysis

Hybrid Quantum Jobs

Job Definition and Execution

from braket.jobs import hybrid_job, InstanceConfig, CheckpointConfig, OutputDataConfig

@hybrid_job(device="ml.m5.large")
def basic_hybrid_algorithm():
    """
    Basic hybrid quantum-classical algorithm.
    
    This job demonstrates the structure of hybrid algorithms
    combining quantum circuit execution with classical optimization.
    """
    from braket.aws import AwsDevice
    from braket.jobs import get_job_device_arn, save_job_result
    import numpy as np
    
    # Get quantum device for this job
    device_arn = get_job_device_arn()
    device = AwsDevice(device_arn)
    
    def quantum_cost_function(params: np.ndarray) -> float:
        """Quantum cost function using variational circuit."""
        from braket.circuits import Circuit
        from braket.circuits.gates import Ry, CNot
        from braket.circuits.observables import Z
        
        circuit = Circuit()
        for i, param in enumerate(params):
            circuit.ry(i, param)
        
        circuit.cnot(0, 1)
        circuit.expectation(observable=Z() @ Z(), target=[0, 1])
        
        task = device.run(circuit, shots=1000)
        result = task.result()
        return result.values[0]
    
    # Classical optimization
    from scipy.optimize import minimize
    initial_params = np.random.random(2) * np.pi
    
    result = minimize(
        quantum_cost_function,
        initial_params,
        method='COBYLA',
        options={'maxiter': 50}
    )
    
    # Save results
    save_job_result({
        'optimal_parameters': result.x.tolist(),
        'optimal_value': float(result.fun),
        'optimization_success': result.success
    })

class InstanceConfig:
    """Configuration for job compute instance."""
    
    def __init__(
        self, 
        instance_type: str, 
        instance_count: int = 1,
        volume_size_gb: int = 30
    ):
        """
        Initialize instance configuration.
        
        Args:
            instance_type: EC2 instance type (e.g., 'ml.m5.large')
            instance_count: Number of instances
            volume_size_gb: EBS volume size in GB
        """
        self.instance_type = instance_type
        self.instance_count = instance_count
        self.volume_size_gb = volume_size_gb

class CheckpointConfig:
    """Configuration for job checkpointing."""
    
    def __init__(
        self,
        s3_uri: str,
        local_path: str = "/opt/braket/checkpoints"
    ):
        """
        Initialize checkpoint configuration.
        
        Args:
            s3_uri: S3 URI for checkpoint storage
            local_path: Local checkpoint directory
        """
        self.s3_uri = s3_uri
        self.local_path = local_path

class OutputDataConfig:
    """Configuration for job output data."""
    
    def __init__(
        self,
        s3_path: str,
        kms_key_id: str = None
    ):
        """
        Initialize output data configuration.
        
        Args:
            s3_path: S3 path for output data
            kms_key_id: KMS key for encryption
        """
        self.s3_path = s3_path
        self.kms_key_id = kms_key_id

# Advanced job configuration
@hybrid_job(
    device="ml.p3.2xlarge",  # GPU instance for ML
    instance_config=InstanceConfig("ml.p3.2xlarge", instance_count=1),
    checkpoint_config=CheckpointConfig("s3://my-bucket/checkpoints/"),
    output_data_config=OutputDataConfig("s3://my-bucket/results/")
)
def advanced_variational_algorithm():
    """
    Advanced hybrid algorithm with ML and checkpointing.
    
    Demonstrates quantum machine learning with classical neural networks
    and comprehensive checkpoint management.
    """
    import torch
    import torch.nn as nn
    from braket.jobs import (
        save_job_checkpoint, load_job_checkpoint,
        get_hyperparameters, save_job_result
    )
    
    # Get job hyperparameters
    hyperparams = get_hyperparameters()
    learning_rate = hyperparams.get('learning_rate', 0.01)
    epochs = hyperparams.get('epochs', 100)
    
    # Define hybrid model
    class HybridQuantumClassicalModel(nn.Module):
        def __init__(self):
            super().__init__()
            self.classical_layers = nn.Sequential(
                nn.Linear(4, 16),
                nn.ReLU(),
                nn.Linear(16, 2)  # 2 quantum parameters
            )
            
        def quantum_layer(self, params):
            """Quantum variational layer."""
            from braket.circuits import Circuit
            from braket.circuits.gates import Ry, CNot
            from braket.circuits.observables import Z
            
            circuit = Circuit()
            circuit.ry(0, params[0])
            circuit.ry(1, params[1])
            circuit.cnot(0, 1)
            circuit.expectation(observable=Z() @ Z(), target=[0, 1])
            
            device_arn = get_job_device_arn()
            device = AwsDevice(device_arn)
            task = device.run(circuit, shots=1000)
            return task.result().values[0]
            
        def forward(self, x):
            classical_out = self.classical_layers(x)
            quantum_out = self.quantum_layer(classical_out)
            return quantum_out
    
    # Training loop with checkpointing
    model = HybridQuantumClassicalModel()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    
    # Load checkpoint if exists
    try:
        checkpoint = load_job_checkpoint()
        model.load_state_dict(checkpoint['model_state'])
        optimizer.load_state_dict(checkpoint['optimizer_state'])
        start_epoch = checkpoint['epoch']
    except:
        start_epoch = 0
    
    # Training loop (simplified)
    for epoch in range(start_epoch, epochs):
        # Training step would go here
        loss = torch.randn(1)  # Placeholder
        
        # Save checkpoint every 10 epochs
        if epoch % 10 == 0:
            save_job_checkpoint({
                'model_state': model.state_dict(),
                'optimizer_state': optimizer.state_dict(),
                'epoch': epoch,
                'loss': float(loss)
            })
    
    # Save final results
    save_job_result({
        'final_model_state': model.state_dict(),
        'training_loss': float(loss),
        'total_epochs': epochs
    })

Job Environment and Utilities

from braket.jobs import (
    get_checkpoint_dir, get_hyperparameters, get_input_data_dir,
    get_job_device_arn, get_job_name, get_results_dir
)

def get_job_device_arn() -> str:
    """
    Get quantum device ARN for current job.
    
    Returns:
        str: Device ARN assigned to job
    """
    pass

def get_job_name() -> str:
    """
    Get current job name.
    
    Returns:
        str: Job name
    """
    pass

def get_hyperparameters() -> dict:
    """
    Get job hyperparameters.
    
    Returns:
        dict: Hyperparameter dictionary
    """
    pass

def get_input_data_dir(channel: str = "training") -> str:
    """
    Get input data directory path.
    
    Args:
        channel: Data channel name
        
    Returns:
        str: Input data directory path
    """
    pass

def get_results_dir() -> str:
    """
    Get results output directory path.
    
    Returns:
        str: Results directory path
    """
    pass

def get_checkpoint_dir() -> str:
    """
    Get checkpoint directory path.
    
    Returns:
        str: Checkpoint directory path
    """
    pass

def save_job_checkpoint(checkpoint_data: dict) -> None:
    """
    Save job checkpoint data.
    
    Args:
        checkpoint_data: Data to checkpoint
    """
    pass

def load_job_checkpoint() -> dict:
    """
    Load job checkpoint data.
    
    Returns:
        dict: Loaded checkpoint data
    """
    pass

def save_job_result(result_data: dict) -> None:
    """
    Save job result data.
    
    Args:
        result_data: Results to save
    """
    pass

def load_job_result(job_arn: str) -> dict:
    """
    Load results from completed job.
    
    Args:
        job_arn: Job ARN
        
    Returns:
        dict: Job results
    """
    pass

# Job utilities example
def comprehensive_job_setup() -> dict:
    """
    Setup comprehensive job environment and configuration.
    
    Returns:
        dict: Job environment information
    """
    job_info = {
        'job_name': get_job_name(),
        'device_arn': get_job_device_arn(),
        'hyperparameters': get_hyperparameters(),
        'directories': {
            'input_data': get_input_data_dir(),
            'results': get_results_dir(),
            'checkpoints': get_checkpoint_dir()
        }
    }
    
    # Setup logging
    import logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(f"{get_results_dir()}/job.log"),
            logging.StreamHandler()
        ]
    )
    
    logger = logging.getLogger('QuantumJob')
    logger.info(f"Starting job: {job_info['job_name']}")
    logger.info(f"Using device: {job_info['device_arn']}")
    
    return job_info

Cost and Usage Tracking

Resource Tracking

from braket.tracking import Tracker

class Tracker:
    """Tracks quantum computing costs and resource usage."""
    
    def __init__(self):
        """Initialize cost tracker."""
        pass
    
    def __enter__(self) -> 'Tracker':
        """Context manager entry."""
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit."""
        pass
    
    def quantum_tasks_cost(self) -> float:
        """
        Get total cost for quantum tasks.
        
        Returns:
            float: Total cost in USD
        """
        pass
    
    def simulator_tasks_cost(self) -> float:
        """
        Get total cost for simulator tasks.
        
        Returns:
            float: Total cost in USD
        """
        pass
    
    def quantum_task_statistics(self) -> dict:
        """
        Get detailed quantum task statistics.
        
        Returns:
            dict: Task execution statistics
        """
        pass
    
    def simulator_task_statistics(self) -> dict:
        """
        Get detailed simulator task statistics.
        
        Returns:
            dict: Simulator execution statistics
        """
        pass

def track_quantum_experiment_costs(experiment_function: callable, *args, **kwargs) -> dict:
    """
    Track costs for quantum experiment execution.
    
    Args:
        experiment_function: Function that runs quantum experiments
        *args: Arguments for experiment function
        **kwargs: Keyword arguments for experiment function
        
    Returns:
        dict: Cost analysis and experiment results
    """
    with Tracker() as tracker:
        # Run experiment
        results = experiment_function(*args, **kwargs)
        
        # Collect cost information
        cost_analysis = {
            'total_cost': tracker.quantum_tasks_cost() + tracker.simulator_tasks_cost(),
            'quantum_cost': tracker.quantum_tasks_cost(),
            'simulator_cost': tracker.simulator_tasks_cost(),
            'quantum_stats': tracker.quantum_task_statistics(),
            'simulator_stats': tracker.simulator_task_statistics(),
            'cost_per_shot': 0.0,
            'results': results
        }
        
        # Calculate cost per shot
        total_shots = (
            cost_analysis['quantum_stats'].get('shots', 0) +
            cost_analysis['simulator_stats'].get('shots', 0)
        )
        
        if total_shots > 0:
            cost_analysis['cost_per_shot'] = cost_analysis['total_cost'] / total_shots
        
    return cost_analysis

def optimize_experiment_for_cost(circuits: list, target_accuracy: float = 0.01) -> dict:
    """
    Optimize experiment execution for cost while maintaining accuracy.
    
    Args:
        circuits: List of quantum circuits to execute
        target_accuracy: Target measurement accuracy
        
    Returns:
        dict: Optimized execution plan with cost estimates
    """
    optimization_plan = {
        'execution_strategy': {},
        'estimated_costs': {},
        'recommendations': []
    }
    
    # Analyze circuits for optimization opportunities
    for i, circuit in enumerate(circuits):
        circuit_analysis = {
            'qubit_count': circuit.qubit_count,
            'depth': circuit.depth,
            'gate_count': len(circuit.instructions)
        }
        
        # Recommend device based on circuit characteristics
        if circuit.qubit_count <= 10 and circuit.depth <= 20:
            # Small circuit - use simulator
            recommended_device = "Local Simulator"
            estimated_cost = 0.0
            optimization_plan['recommendations'].append(
                f"Circuit {i}: Use local simulator (free) - small circuit"
            )
        elif circuit.qubit_count <= 20:
            # Medium circuit - use cloud simulator
            recommended_device = "AWS SV1"
            estimated_cost = 0.075 * (2 ** min(circuit.qubit_count, 17))
            optimization_plan['recommendations'].append(
                f"Circuit {i}: Use SV1 simulator - cost effective for {circuit.qubit_count} qubits"
            )
        else:
            # Large circuit - may need QPU or advanced simulator
            recommended_device = "AWS QPU or TN1"
            estimated_cost = 0.30 + (1000 * 0.0003)  # Task fee + shots
            optimization_plan['recommendations'].append(
                f"Circuit {i}: Consider QPU for {circuit.qubit_count} qubits"
            )
        
        optimization_plan['execution_strategy'][f'circuit_{i}'] = {
            'recommended_device': recommended_device,
            'estimated_cost': estimated_cost,
            'analysis': circuit_analysis
        }
    
    # Calculate total estimated cost
    total_cost = sum(
        strategy['estimated_cost'] 
        for strategy in optimization_plan['execution_strategy'].values()
    )
    optimization_plan['estimated_costs']['total'] = total_cost
    
    return optimization_plan

Device Reservations

Direct Device Access

from braket.aws import DirectReservation

class DirectReservation:
    """Direct quantum device reservation interface."""
    
    def __init__(self, arn: str, aws_session: 'AwsSession' = None):
        """
        Initialize device reservation.
        
        Args:
            arn: Reservation ARN
            aws_session: AWS session for API calls
        """
        self.arn = arn
        self.aws_session = aws_session
    
    @property
    def device_arn(self) -> str:
        """Reserved device ARN."""
        pass
    
    @property
    def start_time(self) -> str:
        """Reservation start time."""
        pass
    
    @property
    def end_time(self) -> str:
        """Reservation end time."""
        pass
    
    @property
    def status(self) -> str:
        """Reservation status."""
        pass
    
    def cancel(self) -> None:
        """Cancel reservation."""
        pass

def create_device_reservation(device_arn: str, start_time: str, duration_minutes: int) -> DirectReservation:
    """
    Create direct device reservation.
    
    Args:
        device_arn: Target device ARN
        start_time: ISO format start time
        duration_minutes: Reservation duration in minutes
        
    Returns:
        DirectReservation: Device reservation handle
    """
    # This is a placeholder - actual implementation would use AWS API
    pass

def execute_with_reservation(reservation: DirectReservation, circuits: list) -> dict:
    """
    Execute circuits using device reservation.
    
    Args:
        reservation: Active device reservation
        circuits: Circuits to execute during reservation
        
    Returns:
        dict: Execution results and reservation utilization
    """
    device = AwsDevice(reservation.device_arn)
    
    results = []
    for circuit in circuits:
        task = device.run(circuit, shots=1000, reservation_arn=reservation.arn)
        results.append(task.result())
    
    return {
        'results': results,
        'reservation_info': {
            'device_arn': reservation.device_arn,
            'start_time': reservation.start_time,
            'end_time': reservation.end_time,
            'utilization': len(circuits)
        }
    }

This comprehensive AWS integration documentation covers all aspects of cloud quantum computing with Amazon Braket, including device management, task execution, job orchestration, cost optimization, and resource tracking.

Install with Tessl CLI

npx tessl i tessl/pypi-amazon-braket-sdk

docs

advanced-features.md

aws-integration.md

circuits.md

devices-simulation.md

index.md

quantum-information.md

tile.json