tessl install tessl/pypi-python-libmaas@0.6.0

Python client library for MAAS 2.0+ with sync/async support, providing machine provisioning, network management, and storage configuration.
Advanced scenarios, edge cases, and troubleshooting for python-libmaas.
Implement retry logic for transient failures.
from maas.client import connect
from maas.client.errors import MAASException, PowerError
import time
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def retry_operation(func, max_attempts=3, delay=5, retry_on=None):
    """Call ``func`` and retry it with exponential backoff when it fails.

    Args:
        func: Zero-argument callable to invoke.
        max_attempts: Total number of calls to make before giving up; must
            be at least 1.
        delay: Base wait in seconds. The wait after failed attempt ``k``
            (0-based) is ``delay * 2 ** k``.
        retry_on: Exception class, or tuple of classes, that triggers a
            retry. Defaults to ``(MAASException, PowerError)``.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        Exception: The exception from the final attempt is re-raised once
            every attempt has failed. Exceptions that do not match
            ``retry_on`` propagate immediately without a retry.
    """
    if max_attempts < 1:
        # range(0) would otherwise skip the loop and silently return None
        # without ever calling func.
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")
    if retry_on is None:
        retry_on = (MAASException, PowerError)

    for attempt in range(max_attempts):
        try:
            return func()
        except retry_on as e:
            if attempt == max_attempts - 1:
                print(f"All {max_attempts} attempts failed")
                raise
            wait_time = delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed: {e}")
            print(f"Retrying in {wait_time}s...")
            time.sleep(wait_time)
# Usage
def power_on_with_retry(machine):
    """Power on ``machine`` through ``retry_operation``.

    The power-on call is made with ``wait=True`` and is attempted up to
    3 times with a base delay of 5 seconds. Returns whatever
    ``retry_operation`` returns.
    """
    def _power_on():
        return machine.power_on(wait=True)

    return retry_operation(_power_on, max_attempts=3, delay=5)
machine = client.machines.get('system_id')
power_on_with_retry(machine)

Handle scenarios where deployment partially succeeds.
from maas.client import connect
from maas.client.enum import NodeStatus
from maas.client.viscera.machines import FailedDeployment
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def deploy_with_recovery(machine, max_retries=2, distro_series='jammy'):
    """Deploy ``machine``, releasing and retrying after a failed deployment.

    Args:
        machine: Machine object to deploy; must provide ``deploy``,
            ``refresh``, ``status``, ``mark_fixed`` and ``release``.
        max_retries: Number of retries after the first attempt, so up to
            ``max_retries + 1`` deployments are started.
        distro_series: Release passed to ``machine.deploy``.

    Returns:
        True once a deployment succeeds. False if the attempts run out
        while the machine is in neither FAILED_DEPLOYMENT nor BROKEN
        (no recovery is attempted in that case).

    Raises:
        FailedDeployment: Re-raised when the last attempt fails and the
            machine is in FAILED_DEPLOYMENT or BROKEN.
    """
    for attempt in range(max_retries + 1):
        try:
            print(f"Deployment attempt {attempt + 1}...")
            machine.deploy(distro_series=distro_series, wait=True)
            print("✓ Deployment successful")
            return True
        except FailedDeployment as e:
            print(f"✗ Deployment failed: {e}")
            # Re-read the machine so the status check sees the post-failure state.
            machine.refresh()
            if machine.status in [NodeStatus.FAILED_DEPLOYMENT, NodeStatus.BROKEN]:
                print(f" Machine status: {machine.status}")
                if attempt < max_retries:
                    print(" Attempting recovery...")
                    # Mark as fixed if broken
                    if machine.status == NodeStatus.BROKEN:
                        machine.mark_fixed(comment='Auto-recovery attempt')
                    # Release and try again
                    machine.release(quick_erase=True, wait=True)
                    print(" Released machine, retrying...")
                else:
                    print(" Max retries reached, giving up")
                    raise
    return False
machine = client.machines.allocate(cpu_count=4)
deploy_with_recovery(machine)

Handle IP address conflicts and network configuration issues.
from maas.client import connect
from maas.client.enum import LinkMode
from maas.client.errors import MAASException
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def configure_ip_with_fallback(interface, subnet, preferred_ip, ip_range):
    """Link ``interface`` to ``subnet``, preferring a specific static IP.

    Tries ``preferred_ip`` first, then each address in ``ip_range`` in
    order, and finally creates an AUTO-mode link when every static
    attempt raised ``MAASException``.

    Returns:
        The link object returned by ``interface.links.create``.
    """
    def _link_static(address):
        # Single place that issues the STATIC link request.
        return interface.links.create(
            interface,
            LinkMode.STATIC,
            subnet=subnet,
            ip_address=address
        )

    try:
        preferred_link = _link_static(preferred_ip)
    except MAASException as err:
        print(f"✗ Preferred IP unavailable: {err}")
    else:
        print(f"✓ Configured preferred IP: {preferred_ip}")
        return preferred_link

    for candidate in ip_range:
        try:
            fallback_link = _link_static(candidate)
        except MAASException:
            # This address could not be used; move on to the next one.
            continue
        print(f"✓ Configured fallback IP: {candidate}")
        return fallback_link

    print("⚠ No static IPs available, using AUTO mode")
    return interface.links.create(
        interface,
        LinkMode.AUTO,
        subnet=subnet
    )
# Usage
machine = client.machines.get('system_id')
interface = machine.interfaces.get_by_name('eth0')
subnet = client.subnets.get(cidr='10.0.0.0/24')
# Try IPs in range
ip_range = [f'10.0.0.{i}' for i in range(100, 110)]
link = configure_ip_with_fallback(interface, subnet, '10.0.0.100', ip_range)

Handle scenarios where expected storage devices aren't found.
from maas.client import connect
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def find_storage_device(machine, preferred_names, min_size_gb=100):
    """Return a block device of at least ``min_size_gb`` GiB.

    Devices named in ``preferred_names`` are checked first, in order. If
    none qualifies, the first physical device in ``machine.block_devices``
    that is large enough is returned.

    Args:
        machine: Object exposing ``block_devices`` (iterable, with a
            ``get_by_name`` method).
        preferred_names: Device names to try first.
        min_size_gb: Minimum size, in units of 1024**3 bytes.

    Returns:
        The first matching block device.

    Raises:
        ValueError: If no device satisfies the size requirement.
    """
    bytes_per_gb = 1024 ** 3

    # Try preferred device names
    for name in preferred_names:
        try:
            device = machine.block_devices.get_by_name(name)
        except KeyError:
            device = None
        # NOTE(review): a missing name is assumed to surface as KeyError; a
        # None result is treated the same way in case the lookup returns it.
        if device is None:
            print(f"✗ Device {name} not found")
            continue
        size_gb = device.size / bytes_per_gb
        if size_gb >= min_size_gb:
            print(f"✓ Found {name}: {size_gb:.2f} GB")
            return device
        print(f"⚠ {name} too small: {size_gb:.2f} GB < {min_size_gb} GB")

    # Find any suitable device
    print(f"Searching for any device >= {min_size_gb} GB...")
    for device in machine.block_devices:
        size_gb = device.size / bytes_per_gb
        # Compare by value so an enum-typed ``type`` matches as well as a
        # plain string; an Enum member never equals the bare string.
        # NOTE(review): python-libmaas appears to expose ``type`` as a
        # BlockDeviceType enum — confirm.
        if (size_gb >= min_size_gb
                and getattr(device.type, 'value', device.type) == 'physical'):
            print(f"✓ Found alternative: {device.name} ({size_gb:.2f} GB)")
            return device

    raise ValueError(f"No suitable storage device found (>= {min_size_gb} GB)")
# Usage
machine = client.machines.get('system_id')
device = find_storage_device(
machine,
preferred_names=['nvme0n1', 'sda', 'vda'],
min_size_gb=100
)

Handle concurrent allocation attempts.
from maas.client import connect
from maas.client.viscera.machines import MachineNotFound
import time
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def allocate_with_retry(requirements, max_attempts=5, delay=2):
    """Allocate a machine via the module-level ``client``, retrying on misses.

    ``requirements`` is expanded into keyword arguments for
    ``client.machines.allocate``. When that raises ``MachineNotFound`` the
    call is repeated after ``delay`` seconds, up to ``max_attempts`` calls
    in total; the exception from the final attempt is re-raised.

    Returns:
        The allocated machine, or None when ``max_attempts`` allows no
        attempt at all.
    """
    final_attempt = max_attempts - 1
    for attempt_index in range(max_attempts):
        try:
            allocated = client.machines.allocate(**requirements)
            print(f"✓ Allocated: {allocated.hostname}")
            return allocated
        except MachineNotFound:
            if attempt_index >= final_attempt:
                print(f"✗ No machines available after {max_attempts} attempts")
                raise
            print(f"⚠ No machine available (attempt {attempt_index + 1})")
            print(f" Waiting {delay}s for machines to become available...")
            time.sleep(delay)
    return None
# Usage
machine = allocate_with_retry({
'cpu_count': 8,
'mem': 16384,
'tags': ['ssd']
}, max_attempts=5)

Properly handle async resources and cleanup.
import asyncio
from maas.client import connect
class MachinePool:
    """Async context manager that tracks allocated machines and releases them on exit.

    Entering the context connects to MAAS; leaving it releases every
    machine obtained through ``allocate``. Machines whose release failed
    stay in ``allocated_machines`` so the caller can inspect or retry them.
    """

    def __init__(self, url, apikey):
        self.url = url
        self.apikey = apikey
        # Set by __aenter__; None until the context is entered.
        self.client = None
        # Machines handed out by allocate() that still need releasing.
        self.allocated_machines = []

    async def __aenter__(self):
        """Connect to MAAS and return the pool itself."""
        self.client = await connect(self.url, apikey=self.apikey)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release tracked machines; exceptions from the body are not suppressed."""
        # Clean up allocated machines
        print(f"Cleaning up {len(self.allocated_machines)} machines...")
        still_allocated = []
        for machine in self.allocated_machines:
            try:
                await machine.release(quick_erase=True)
                print(f" ✓ Released: {machine.hostname}")
            except Exception as e:
                # Best effort: one failed release must not block the rest.
                print(f" ✗ Failed to release {machine.hostname}: {e}")
                still_allocated.append(machine)
        # Drop released machines so re-entering the pool does not release
        # them a second time; mutate in place to keep list identity.
        self.allocated_machines[:] = still_allocated

    async def allocate(self, **requirements):
        """Allocate machine and track for cleanup.

        Raises:
            RuntimeError: If called before the pool has been entered.
        """
        if self.client is None:
            raise RuntimeError(
                "MachinePool is not connected; use it in 'async with' before allocating"
            )
        machine = await self.client.machines.allocate(**requirements)
        self.allocated_machines.append(machine)
        return machine
# Usage
async def main():
    """Allocate a web and a database machine from a pool and deploy both."""
    pool_context = MachinePool('http://maas.example.com:5240/MAAS/', 'key')
    async with pool_context as pool:
        # Allocate machines
        frontend = await pool.allocate(cpu_count=4, mem=8192)
        database = await pool.allocate(cpu_count=8, mem=32768)
        # Use machines: deploy one after the other, web first
        for node in (frontend, database):
            await node.deploy(distro_series='jammy')
    # Machines automatically released on exit
asyncio.run(main())

Handle complex storage scenarios with multiple device types.
from maas.client import connect
from maas.client.enum import RaidLevel, CacheMode
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def configure_tiered_storage(machine):
    """Configure multi-tier storage: NVMe + RAID + Bcache.

    Each block device is placed in exactly one tier:

    * NVMe: name contains ``'nvme'``. The first one gets a 50 GiB ext4 root
      partition plus an xfs ``/hot`` partition sized from ``available_size``.
    * SSD: tagged ``'ssd'`` and not NVMe. The first two form a RAID 1
      mounted at ``/warm``.
    * HDD: remaining physical devices. With at least three SSDs, the third
      SSD becomes a bcache cache set in front of every HDD, mounted at
      ``/cold<i>``.

    Args:
        machine: Machine exposing ``block_devices``, ``raids``,
            ``bcache_cache_sets`` and ``bcaches``.

    Returns:
        None. Progress is reported with ``print``.
    """
    # Identify devices by type. Classifying each device once keeps an NVMe
    # drive tagged 'ssd' out of the SSD tier, where it would be partitioned
    # and then reused as a RAID member.
    nvme_devices, ssd_devices, hdd_devices = [], [], []
    for device in machine.block_devices:
        if 'nvme' in device.name:
            nvme_devices.append(device)
        elif 'ssd' in device.tags:
            ssd_devices.append(device)
        # Compare by value so an enum-typed ``type`` matches as well as a
        # plain string. NOTE(review): confirm what type libmaas returns here.
        elif getattr(device.type, 'value', device.type) == 'physical':
            hdd_devices.append(device)
    print(f"Found: {len(nvme_devices)} NVMe, {len(ssd_devices)} SSD, {len(hdd_devices)} HDD")

    # Tier 1: NVMe for OS and hot data
    if nvme_devices:
        nvme = nvme_devices[0]
        # Partition NVMe
        os_part = nvme.partitions.create(nvme, size=50 * 1024**3)  # 50GB OS
        os_part.format('ext4')
        os_part.mount('/')
        hot_part = nvme.partitions.create(nvme, size=nvme.available_size)
        hot_part.format('xfs')
        hot_part.mount('/hot')
        print(f"✓ Tier 1 (NVMe): OS + hot data on {nvme.name}")

    # Tier 2: SSD RAID for warm data
    raid_members = ssd_devices[:2]
    if len(raid_members) == 2:
        raid = machine.raids.create(
            machine,
            RaidLevel.RAID_1,
            devices=raid_members,
            name='md-ssd'
        )
        raid.virtual_device.format('xfs')
        raid.virtual_device.mount('/warm')
        print(f"✓ Tier 2 (SSD RAID): warm data on {len(raid_members)} SSDs")

    # Tier 3: Bcache for cold data (HDD + SSD cache)
    if hdd_devices and len(ssd_devices) > 2:
        # The first two SSDs belong to the RAID; the third is the cache.
        cache_ssd = ssd_devices[2]
        cache_set = machine.bcache_cache_sets.create(machine, cache_ssd)
        for i, hdd in enumerate(hdd_devices):
            bcache = machine.bcaches.create(
                machine,
                name=f'bcache{i}',
                backing_device=hdd,
                cache_set=cache_set,
                cache_mode=CacheMode.WRITETHROUGH
            )
            bcache.virtual_device.format('xfs')
            bcache.virtual_device.mount(f'/cold{i}')
        print(f"✓ Tier 3 (Bcache): cold data on {len(hdd_devices)} HDDs with SSD cache")
# Usage
machine = client.machines.allocate(tags=['storage'])
configure_tiered_storage(machine)
machine.deploy(distro_series='jammy', wait=True)

Debug connection problems systematically.
from maas.client import connect
from maas.client.bones.helpers import ConnectError, RemoteError
import socket
def diagnose_connection(url, apikey):
    """Diagnose connection issues.

    Runs these checks in order, printing the outcome of each and stopping
    at the first failure: URL parsing, DNS resolution, a TCP connect to the
    target port, then a MAAS connection followed by a version request.

    Args:
        url: MAAS API URL, e.g. ``http://host:5240/MAAS/``.
        apikey: API key passed to ``connect``.

    Returns:
        None. Results are reported with ``print`` only.
    """
    from urllib.parse import urlparse

    print(f"Diagnosing connection to: {url}")

    # Parse URL
    parsed = urlparse(url)
    host = parsed.hostname
    if not host:
        # Without a scheme and netloc urlparse yields no hostname, and
        # gethostbyname(None) would fail with an uncaught TypeError.
        print(f"✗ Could not determine host from URL: {url!r}")
        return
    try:
        # With no explicit port the HTTP client uses the scheme default,
        # so probe that port rather than a fixed one.
        port = parsed.port or (443 if parsed.scheme == 'https' else 80)
    except ValueError as e:
        # urlparse raises ValueError for a non-numeric or out-of-range port.
        print(f"✗ Invalid port in URL: {e}")
        return

    # Check DNS resolution
    try:
        ip = socket.gethostbyname(host)
        print(f"✓ DNS resolution: {host} -> {ip}")
    except socket.gaierror as e:
        print(f"✗ DNS resolution failed: {e}")
        return

    # Check port connectivity; the context manager closes the socket even
    # when connect_ex raises.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5)
            result = sock.connect_ex((host, port))
    except OSError as e:
        print(f"✗ Port check failed: {e}")
        return
    if result != 0:
        print(f"✗ Port {port} is not reachable")
        return
    print(f"✓ Port {port} is reachable")

    # Try connection
    try:
        client = connect(url, apikey=apikey)
        print("✓ MAAS connection successful")
        # Test API call
        version = client.version.get()
        print(f"✓ MAAS version: {version.version}")
    except ConnectError as e:
        print(f"✗ Connection error: {e}")
    except RemoteError as e:
        print(f"✗ Remote error (status {e.status}): {e.content}")
    except Exception as e:
        # Diagnostic boundary: report anything unexpected instead of crashing.
        print(f"✗ Unexpected error: {e}")
# Usage
diagnose_connection('http://maas.example.com:5240/MAAS/', 'key:token:secret')

Handle invalid state transitions gracefully.
from maas.client import connect
from maas.client.enum import NodeStatus
from maas.client.errors import OperationNotAllowed
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def safe_deploy(machine, max_recoveries=1):
    """Deploy machine with state validation.

    Refreshes ``machine`` and acts on its status:

    * DEPLOYED: nothing to do, returns True.
    * DEPLOYING, READY or any other status: returns False without acting.
    * ALLOCATED: deploys and returns True, or False when the deployment
      raises ``OperationNotAllowed``.
    * BROKEN / FAILED_DEPLOYMENT: marks the machine fixed and re-evaluates,
      at most ``max_recoveries`` times.

    Args:
        machine: Machine object to deploy.
        max_recoveries: How many mark-fixed-and-retry cycles may run. The
            limit stops the retry from recursing without bound when the
            status stays failed after ``mark_fixed``.

    Returns:
        True if the machine is, or ends up, deployed; otherwise False.
    """
    machine.refresh()
    status = machine.status
    print(f"Machine status: {status}")

    # Handle different states
    if status == NodeStatus.DEPLOYED:
        print("⚠ Machine already deployed")
        return True
    elif status == NodeStatus.DEPLOYING:
        print("⚠ Machine is deploying, waiting...")
        # Could implement polling here
        return False
    elif status == NodeStatus.ALLOCATED:
        print("✓ Machine allocated, deploying...")
        try:
            machine.deploy(distro_series='jammy', wait=True)
            return True
        except OperationNotAllowed as e:
            print(f"✗ Deployment not allowed: {e}")
            return False
    elif status == NodeStatus.READY:
        print("⚠ Machine not allocated, allocating first...")
        # This shouldn't happen if you have the machine object from allocation
        # but handle it anyway
        return False
    elif status in [NodeStatus.BROKEN, NodeStatus.FAILED_DEPLOYMENT]:
        print(f"✗ Machine in failed state: {status}")
        if max_recoveries < 1:
            print(" ✗ Recovery attempts exhausted")
            return False
        print(" Attempting to mark as fixed...")
        try:
            machine.mark_fixed(comment='Auto-recovery')
            machine.refresh()
            return safe_deploy(machine, max_recoveries - 1)  # Retry
        except Exception as e:
            # Any recovery error is reported as a failed deploy, not raised.
            print(f" ✗ Recovery failed: {e}")
            return False
    else:
        print(f"✗ Unexpected state: {status}")
        return False
# Usage
machine = client.machines.get('system_id')
success = safe_deploy(machine)