or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/python-libmaas@0.6.x

docs

examples

edge-cases.mdreal-world-scenarios.md
index.md
tile.json

tessl/pypi-python-libmaas

tessl install tessl/pypi-python-libmaas@0.6.0

Python client library for MAAS 2.0+ with sync/async support, providing machine provisioning, network management, and storage configuration.

edge-cases.mddocs/examples/

Edge Cases & Advanced Usage

Advanced scenarios, edge cases, and troubleshooting for python-libmaas.

Edge Case: Handling Transient Failures

Implement retry logic for transient failures.

from maas.client import connect
from maas.client.errors import MAASException, PowerError
import time

client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def retry_operation(func, max_attempts=3, delay=5):
    """Retry operation with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return func()
        except (MAASException, PowerError) as e:
            if attempt < max_attempts - 1:
                wait_time = delay * (2 ** attempt)
                print(f"Attempt {attempt + 1} failed: {e}")
                print(f"Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"All {max_attempts} attempts failed")
                raise

# Usage
def power_on_with_retry(machine):
    return retry_operation(
        lambda: machine.power_on(wait=True),
        max_attempts=3,
        delay=5
    )

machine = client.machines.get('system_id')
power_on_with_retry(machine)

Edge Case: Partial Deployment Failures

Handle scenarios where deployment partially succeeds.

from maas.client import connect
from maas.client.enum import NodeStatus
from maas.client.viscera.machines import FailedDeployment

client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def deploy_with_recovery(machine, max_retries=2):
    """Deploy with automatic recovery on failure."""
    for attempt in range(max_retries + 1):
        try:
            print(f"Deployment attempt {attempt + 1}...")
            machine.deploy(distro_series='jammy', wait=True)
            print(f"✓ Deployment successful")
            return True
            
        except FailedDeployment as e:
            print(f"✗ Deployment failed: {e}")
            
            # Check if machine is in failed state
            machine.refresh()
            if machine.status in [NodeStatus.FAILED_DEPLOYMENT, NodeStatus.BROKEN]:
                print(f"  Machine status: {machine.status}")
                
                if attempt < max_retries:
                    print(f"  Attempting recovery...")
                    
                    # Mark as fixed if broken
                    if machine.status == NodeStatus.BROKEN:
                        machine.mark_fixed(comment='Auto-recovery attempt')
                    
                    # Release and try again
                    machine.release(quick_erase=True, wait=True)
                    print(f"  Released machine, retrying...")
                else:
                    print(f"  Max retries reached, giving up")
                    raise
    
    return False

machine = client.machines.allocate(cpu_count=4)
deploy_with_recovery(machine)

Edge Case: Network Configuration Conflicts

Handle IP address conflicts and network configuration issues.

from maas.client import connect
from maas.client.enum import LinkMode
from maas.client.errors import MAASException

client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def configure_ip_with_fallback(interface, subnet, preferred_ip, ip_range):
    """Configure IP with fallback to available addresses."""
    
    # Try preferred IP first
    try:
        link = interface.links.create(
            interface,
            LinkMode.STATIC,
            subnet=subnet,
            ip_address=preferred_ip
        )
        print(f"✓ Configured preferred IP: {preferred_ip}")
        return link
    except MAASException as e:
        print(f"✗ Preferred IP unavailable: {e}")
    
    # Try fallback IPs
    for ip in ip_range:
        try:
            link = interface.links.create(
                interface,
                LinkMode.STATIC,
                subnet=subnet,
                ip_address=ip
            )
            print(f"✓ Configured fallback IP: {ip}")
            return link
        except MAASException:
            continue
    
    # Fall back to AUTO mode
    print("⚠ No static IPs available, using AUTO mode")
    link = interface.links.create(
        interface,
        LinkMode.AUTO,
        subnet=subnet
    )
    return link

# Usage
machine = client.machines.get('system_id')
interface = machine.interfaces.get_by_name('eth0')
subnet = client.subnets.get(cidr='10.0.0.0/24')

# Try IPs in range
ip_range = [f'10.0.0.{i}' for i in range(100, 110)]
link = configure_ip_with_fallback(interface, subnet, '10.0.0.100', ip_range)

Edge Case: Storage Device Detection Issues

Handle scenarios where expected storage devices aren't found.

from maas.client import connect

client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def find_storage_device(machine, preferred_names, min_size_gb=100):
    """Find storage device with fallback options."""
    
    # Try preferred device names
    for name in preferred_names:
        try:
            device = machine.block_devices.get_by_name(name)
            size_gb = device.size / (1024**3)
            
            if size_gb >= min_size_gb:
                print(f"✓ Found {name}: {size_gb:.2f} GB")
                return device
            else:
                print(f"⚠ {name} too small: {size_gb:.2f} GB < {min_size_gb} GB")
        except KeyError:
            print(f"✗ Device {name} not found")
    
    # Find any suitable device
    print(f"Searching for any device >= {min_size_gb} GB...")
    for device in machine.block_devices:
        size_gb = device.size / (1024**3)
        if size_gb >= min_size_gb and device.type == 'physical':
            print(f"✓ Found alternative: {device.name} ({size_gb:.2f} GB)")
            return device
    
    raise ValueError(f"No suitable storage device found (>= {min_size_gb} GB)")

# Usage
machine = client.machines.get('system_id')
device = find_storage_device(
    machine,
    preferred_names=['nvme0n1', 'sda', 'vda'],
    min_size_gb=100
)

Edge Case: Race Conditions in Allocation

Handle concurrent allocation attempts.

from maas.client import connect
from maas.client.viscera.machines import MachineNotFound
import time

client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def allocate_with_retry(requirements, max_attempts=5, delay=2):
    """Allocate machine with retry on race conditions."""
    
    for attempt in range(max_attempts):
        try:
            machine = client.machines.allocate(**requirements)
            print(f"✓ Allocated: {machine.hostname}")
            return machine
            
        except MachineNotFound as e:
            if attempt < max_attempts - 1:
                print(f"⚠ No machine available (attempt {attempt + 1})")
                print(f"  Waiting {delay}s for machines to become available...")
                time.sleep(delay)
            else:
                print(f"✗ No machines available after {max_attempts} attempts")
                raise
    
    return None

# Usage
machine = allocate_with_retry({
    'cpu_count': 8,
    'mem': 16384,
    'tags': ['ssd']
}, max_attempts=5)

Edge Case: Async Context Management

Properly handle async resources and cleanup.

import asyncio
from maas.client import connect

class MachinePool:
    """Async context manager for machine pool."""
    
    def __init__(self, url, apikey):
        self.url = url
        self.apikey = apikey
        self.client = None
        self.allocated_machines = []
    
    async def __aenter__(self):
        self.client = await connect(self.url, apikey=self.apikey)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Clean up allocated machines
        print(f"Cleaning up {len(self.allocated_machines)} machines...")
        for machine in self.allocated_machines:
            try:
                await machine.release(quick_erase=True)
                print(f"  ✓ Released: {machine.hostname}")
            except Exception as e:
                print(f"  ✗ Failed to release {machine.hostname}: {e}")
    
    async def allocate(self, **requirements):
        """Allocate machine and track for cleanup."""
        machine = await self.client.machines.allocate(**requirements)
        self.allocated_machines.append(machine)
        return machine

# Usage
async def main():
    async with MachinePool('http://maas.example.com:5240/MAAS/', 'key') as pool:
        # Allocate machines
        web = await pool.allocate(cpu_count=4, mem=8192)
        db = await pool.allocate(cpu_count=8, mem=32768)
        
        # Use machines
        await web.deploy(distro_series='jammy')
        await db.deploy(distro_series='jammy')
        
        # Machines automatically released on exit

asyncio.run(main())

Edge Case: Complex Storage Layouts

Handle complex storage scenarios with multiple device types.

from maas.client import connect
from maas.client.enum import RaidLevel, CacheMode

client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def configure_tiered_storage(machine):
    """Configure multi-tier storage: NVMe + RAID + Bcache."""
    
    # Identify devices by type
    nvme_devices = [d for d in machine.block_devices if 'nvme' in d.name]
    ssd_devices = [d for d in machine.block_devices if 'ssd' in d.tags]
    hdd_devices = [d for d in machine.block_devices if d.type == 'physical' 
                   and 'nvme' not in d.name and 'ssd' not in d.tags]
    
    print(f"Found: {len(nvme_devices)} NVMe, {len(ssd_devices)} SSD, {len(hdd_devices)} HDD")
    
    # Tier 1: NVMe for OS and hot data
    if nvme_devices:
        nvme = nvme_devices[0]
        
        # Partition NVMe
        os_part = nvme.partitions.create(nvme, size=50 * 1024**3)  # 50GB OS
        os_part.format('ext4')
        os_part.mount('/')
        
        hot_part = nvme.partitions.create(nvme, size=nvme.available_size)
        hot_part.format('xfs')
        hot_part.mount('/hot')
        
        print(f"✓ Tier 1 (NVMe): OS + hot data on {nvme.name}")
    
    # Tier 2: SSD RAID for warm data
    if len(ssd_devices) >= 2:
        raid = machine.raids.create(
            machine,
            RaidLevel.RAID_1,
            devices=ssd_devices[:2],
            name='md-ssd'
        )
        
        raid.virtual_device.format('xfs')
        raid.virtual_device.mount('/warm')
        
        print(f"✓ Tier 2 (SSD RAID): warm data on {len(ssd_devices[:2])} SSDs")
    
    # Tier 3: Bcache for cold data (HDD + SSD cache)
    if hdd_devices and len(ssd_devices) > 2:
        # Use remaining SSD for cache
        cache_ssd = ssd_devices[2] if len(ssd_devices) > 2 else ssd_devices[0]
        cache_set = machine.bcache_cache_sets.create(machine, cache_ssd)
        
        for i, hdd in enumerate(hdd_devices):
            bcache = machine.bcaches.create(
                machine,
                name=f'bcache{i}',
                backing_device=hdd,
                cache_set=cache_set,
                cache_mode=CacheMode.WRITETHROUGH
            )
            
            bcache.virtual_device.format('xfs')
            bcache.virtual_device.mount(f'/cold{i}')
        
        print(f"✓ Tier 3 (Bcache): cold data on {len(hdd_devices)} HDDs with SSD cache")

# Usage
machine = client.machines.allocate(tags=['storage'])
configure_tiered_storage(machine)
machine.deploy(distro_series='jammy', wait=True)

Troubleshooting: Connection Issues

Debug connection problems systematically.

from maas.client import connect
from maas.client.bones.helpers import ConnectError, RemoteError
import socket

def diagnose_connection(url, apikey):
    """Diagnose connection issues."""
    
    print(f"Diagnosing connection to: {url}")
    
    # Parse URL
    from urllib.parse import urlparse
    parsed = urlparse(url)
    host = parsed.hostname
    port = parsed.port or 5240
    
    # Check DNS resolution
    try:
        ip = socket.gethostbyname(host)
        print(f"✓ DNS resolution: {host} -> {ip}")
    except socket.gaierror as e:
        print(f"✗ DNS resolution failed: {e}")
        return
    
    # Check port connectivity
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        result = sock.connect_ex((host, port))
        sock.close()
        
        if result == 0:
            print(f"✓ Port {port} is reachable")
        else:
            print(f"✗ Port {port} is not reachable")
            return
    except Exception as e:
        print(f"✗ Port check failed: {e}")
        return
    
    # Try connection
    try:
        client = connect(url, apikey=apikey)
        print(f"✓ MAAS connection successful")
        
        # Test API call
        version = client.version.get()
        print(f"✓ MAAS version: {version.version}")
        
    except ConnectError as e:
        print(f"✗ Connection error: {e}")
    except RemoteError as e:
        print(f"✗ Remote error (status {e.status}): {e.content}")
    except Exception as e:
        print(f"✗ Unexpected error: {e}")

# Usage
diagnose_connection('http://maas.example.com:5240/MAAS/', 'key:token:secret')

Troubleshooting: State Transition Issues

Handle invalid state transitions gracefully.

from maas.client import connect
from maas.client.enum import NodeStatus
from maas.client.errors import OperationNotAllowed

client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def safe_deploy(machine):
    """Deploy machine with state validation."""
    
    machine.refresh()
    status = machine.status
    
    print(f"Machine status: {status}")
    
    # Handle different states
    if status == NodeStatus.DEPLOYED:
        print("⚠ Machine already deployed")
        return True
    
    elif status == NodeStatus.DEPLOYING:
        print("⚠ Machine is deploying, waiting...")
        # Could implement polling here
        return False
    
    elif status == NodeStatus.ALLOCATED:
        print("✓ Machine allocated, deploying...")
        try:
            machine.deploy(distro_series='jammy', wait=True)
            return True
        except OperationNotAllowed as e:
            print(f"✗ Deployment not allowed: {e}")
            return False
    
    elif status == NodeStatus.READY:
        print("⚠ Machine not allocated, allocating first...")
        # This shouldn't happen if you have the machine object from allocation
        # but handle it anyway
        return False
    
    elif status in [NodeStatus.BROKEN, NodeStatus.FAILED_DEPLOYMENT]:
        print(f"✗ Machine in failed state: {status}")
        print("  Attempting to mark as fixed...")
        try:
            machine.mark_fixed(comment='Auto-recovery')
            machine.refresh()
            return safe_deploy(machine)  # Retry
        except Exception as e:
            print(f"  ✗ Recovery failed: {e}")
            return False
    
    else:
        print(f"✗ Unexpected state: {status}")
        return False

# Usage
machine = client.machines.get('system_id')
success = safe_deploy(machine)

Next Steps

  • Real-World Scenarios
  • Quick Start Guide