or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/python-libmaas@0.6.x

docs

examples

edge-cases.md
real-world-scenarios.md
index.md
tile.json

tessl/pypi-python-libmaas

tessl install tessl/pypi-python-libmaas@0.6.0

Python client library for MAAS 2.0+ with sync/async support, providing machine provisioning, network management, and storage configuration.

real-world-scenarios.md — docs/examples/

Real-World Scenarios

Complete examples of common MAAS workflows and use cases.

Scenario 1: Automated Data Center Provisioning

Deploy multiple machines with specific configurations for different roles.

from maas.client import connect
from maas.client.enum import NodeStatus

# Connect to the MAAS region API; 'key' is a placeholder for a real API key.
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def provision_web_servers(count=3):
    """Allocate and deploy a cluster of nginx web servers.

    Each machine is drawn from the 'dmz' zone with a 4-CPU / 8 GB profile
    and deployed with a cloud-init payload that installs and enables nginx.

    Args:
        count: number of web servers to provision (default 3).

    Returns:
        List of deployed machine objects.
    """
    deployed = []

    for _ in range(count):
        # Reserve a machine matching the web-tier hardware profile.
        server = client.machines.allocate(
            cpu_count=4,
            mem=8192,
            tags=['ssd', 'web'],
            zone='dmz'
        )

        # Deploy synchronously so each server is ready before the next starts.
        server.deploy(
            distro_series='jammy',
            wait=True,
            user_data="""#cloud-config
packages:
  - nginx
  - certbot
runcmd:
  - systemctl enable nginx
  - systemctl start nginx
"""
        )

        deployed.append(server)
        print(f"Deployed web server: {server.hostname} at {server.ip_addresses}")

    return deployed

def provision_database_servers(count=2):
    """Provision database cluster with RAID.

    Allocates high-memory machines from the 'internal' zone, mirrors the
    sdb/sdc disks into a RAID-1 array mounted at /var/lib/postgresql, then
    deploys Ubuntu jammy with PostgreSQL 14 installed via cloud-init.

    Args:
        count: number of database servers to provision (default 2).

    Returns:
        List of deployed machine objects.
    """
    # Hoisted out of the loop: the enum import is loop-invariant.
    from maas.client.enum import RaidLevel

    db_servers = []

    for _ in range(count):
        # Allocate high-memory machine
        machine = client.machines.allocate(
            cpu_count=8,
            mem=32768,
            tags=['database', 'raid'],
            zone='internal'
        )

        # Mirror the two data disks before deployment.
        sdb = machine.block_devices.get_by_name('sdb')
        sdc = machine.block_devices.get_by_name('sdc')

        raid = machine.raids.create(
            machine,
            RaidLevel.RAID_1,
            devices=[sdb, sdc],
            name='md0'
        )

        # Format and mount the resulting RAID virtual device.
        virtual_device = raid.virtual_device
        virtual_device.format('ext4')
        virtual_device.mount('/var/lib/postgresql')

        # Deploy with PostgreSQL preinstalled.
        machine.deploy(
            distro_series='jammy',
            wait=True,
            user_data="""#cloud-config
packages:
  - postgresql-14
"""
        )

        db_servers.append(machine)
        print(f"Deployed DB server: {machine.hostname} with RAID at {machine.ip_addresses}")

    return db_servers

# Provision infrastructure
# Build the full stack: three web servers, then two database servers.
web_servers = provision_web_servers(3)
db_servers = provision_database_servers(2)

print(f"\nProvisioned {len(web_servers)} web servers and {len(db_servers)} database servers")

Scenario 2: Network Segmentation

Configure machines in different network segments with proper isolation.

from maas.client import connect
from maas.client.enum import LinkMode

# Connect to the MAAS region API; 'key' is a placeholder for a real API key.
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def configure_dmz_machine(machine):
    """Attach a DMZ host to both its public and internal subnets.

    eth0 receives a static DMZ address and carries the default gateway;
    eth1 receives a static address on the internal subnet.
    """
    # Public leg: static address on the DMZ subnet, used as default route.
    dmz_subnet = client.subnets.get(cidr='203.0.113.0/24')
    eth0 = machine.interfaces.get_by_name('eth0')

    public_link = eth0.links.create(
        eth0,
        LinkMode.STATIC,
        subnet=dmz_subnet,
        ip_address='203.0.113.10',
        default_gateway=True
    )

    # Private leg: static address on the internal subnet, no gateway.
    internal_subnet = client.subnets.get(cidr='10.0.1.0/24')
    eth1 = machine.interfaces.get_by_name('eth1')

    private_link = eth1.links.create(
        eth1,
        LinkMode.STATIC,
        subnet=internal_subnet,
        ip_address='10.0.1.10'
    )

    print(f"Configured {machine.hostname}:")
    print(f"  Public: {public_link.ip_address}")
    print(f"  Private: {private_link.ip_address}")

def configure_internal_machine(machine):
    """Give a machine a single static address on the internal-only subnet."""
    subnet = client.subnets.get(cidr='10.0.2.0/24')
    nic = machine.interfaces.get_by_name('eth0')

    # The sole interface also carries the default route.
    link = nic.links.create(
        nic,
        LinkMode.STATIC,
        subnet=subnet,
        ip_address='10.0.2.20',
        default_gateway=True
    )

    print(f"Configured {machine.hostname}: {link.ip_address}")

# Allocate and configure machines
# Each allocation picks a ready machine carrying the matching tag.
dmz_machine = client.machines.allocate(tags=['dmz'])
configure_dmz_machine(dmz_machine)

internal_machine = client.machines.allocate(tags=['internal'])
configure_internal_machine(internal_machine)

Scenario 3: Storage Configuration with LVM

Set up complex storage layouts with LVM for flexibility.

from maas.client import connect

# Connect to the MAAS region API; 'key' is a placeholder for a real API key.
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def _create_mounted_lv(vg, name, size, tags, fs_type, mount_point):
    """Create one logical volume on *vg*, format it, and mount it.

    Returns the created logical-volume object.
    """
    lv = vg.logical_volumes.create(vg, name=name, size=size, tags=tags)
    lv.format(fs_type)
    lv.mount(mount_point)
    return lv

def configure_lvm_storage(machine):
    """Configure LVM with multiple logical volumes.

    Builds a 'vg-data' volume group from sdb+sdc, then carves it into
    application (200 GB, ext4), database (300 GB, xfs), log (100 GB, ext4)
    and backup (all remaining space, ext4) volumes, each formatted and
    mounted.  Prints a summary as it goes.
    """
    # Pool both data disks into a single volume group.
    sdb = machine.block_devices.get_by_name('sdb')
    sdc = machine.block_devices.get_by_name('sdc')

    vg = machine.volume_groups.create(
        machine,
        name='vg-data',
        devices=[sdb, sdc]
    )

    print(f"Created volume group: {vg.name}")
    print(f"  Total size: {vg.size / (1024**3):.2f} GB")
    print(f"  Available: {vg.available_size / (1024**3):.2f} GB")

    # Fixed-size volumes first; the backup volume takes whatever is left,
    # so it must be created last (available_size shrinks with each create).
    _create_mounted_lv(vg, 'lv-app', 200 * 1024**3, ['application'], 'ext4', '/opt/app')
    _create_mounted_lv(vg, 'lv-db', 300 * 1024**3, ['database'], 'xfs', '/var/lib/mysql')
    _create_mounted_lv(vg, 'lv-logs', 100 * 1024**3, ['logs'], 'ext4', '/var/log/app')
    _create_mounted_lv(vg, 'lv-backup', vg.available_size, ['backup'], 'ext4', '/backup')

    print("\nCreated logical volumes:")
    for lv in vg.logical_volumes:
        fs = lv.filesystem
        print(f"  {lv.name}: {lv.size / (1024**3):.2f} GB -> {fs.mount_point}")

# Allocate machine and configure storage
# Storage layout is set up while the machine is still allocated (not deployed).
machine = client.machines.allocate(tags=['storage'])
configure_lvm_storage(machine)

# Deploy with storage configured
machine.deploy(distro_series='jammy', wait=True)

Scenario 4: High-Performance Storage with Bcache

Configure SSD caching for improved I/O performance.

from maas.client import connect
from maas.client.enum import CacheMode

# Connect to the MAAS region API; 'key' is a placeholder for a real API key.
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def configure_bcache(machine):
    """Set up writeback bcache: one NVMe cache set fronting three HDDs.

    Each HDD becomes a bcacheN device formatted as xfs and mounted at
    /dataN.  Returns the list of created bcache objects.
    """
    # A single NVMe drive serves as the shared cache set.
    nvme = machine.block_devices.get_by_name('nvme0n1')
    cache_set = machine.bcache_cache_sets.create(machine, nvme)

    print(f"Created cache set from {nvme.name}")

    devices = []
    for index, disk_name in enumerate(['sdb', 'sdc', 'sdd']):
        backing = machine.block_devices.get_by_name(disk_name)

        # Writeback favours throughput over safety on cache failure.
        cached = machine.bcaches.create(
            machine,
            name=f'bcache{index}',
            backing_device=backing,
            cache_set=cache_set,
            cache_mode=CacheMode.WRITEBACK  # Maximum performance
        )

        # Format and mount the composite device, one mount point per disk.
        vdev = cached.virtual_device
        vdev.format('xfs')
        vdev.mount(f'/data{index}')

        devices.append(cached)
        print(f"  Configured {cached.name}: {backing.name} cached by {nvme.name}")

    return devices

# Allocate machine with NVMe
# Both tags must match: machine needs an NVMe cache device and bulk storage.
machine = client.machines.allocate(tags=['nvme', 'storage'])
bcache_devices = configure_bcache(machine)

# Deploy
machine.deploy(distro_series='jammy', wait=True)
print(f"\nDeployed {machine.hostname} with {len(bcache_devices)} bcache devices")

Scenario 5: Automated Machine Lifecycle Management

Complete lifecycle management with monitoring and cleanup.

import time
from maas.client import connect
from maas.client.enum import NodeStatus, PowerState

# Connect to the MAAS region API; 'key' is a placeholder for a real API key.
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

class MachineManager:
    """Manage machine lifecycle: provision, health-check, decommission.

    Wraps an already-connected MAAS client with high-level operations that
    print progress as they go.  Failures are printed and then re-raised so
    callers can still handle them.
    """

    def __init__(self, client):
        # Keep a reference to the connected MAAS client.
        self.client = client

    def provision(self, requirements):
        """Allocate, commission (if new), power on, and deploy a machine.

        Args:
            requirements: dict of keyword arguments forwarded verbatim to
                machines.allocate() (e.g. cpu_count, mem, tags).

        Returns:
            The deployed machine object.

        Raises:
            Exception: whatever the underlying MAAS call raised, after the
                error has been printed.
        """
        print(f"Provisioning machine with: {requirements}")

        try:
            # Allocate a machine matching the requested constraints.
            machine = self.client.machines.allocate(**requirements)
            print(f"  Allocated: {machine.hostname}")

            # Commission first if the machine has never been enlisted.
            if machine.status == NodeStatus.NEW:
                print("  Commissioning...")
                machine.commission(wait=True)

            # Ensure power before deploying.
            if machine.power_state != PowerState.ON:
                print("  Powering on...")
                machine.power_on(wait=True)

            # Deploy synchronously so the caller receives a ready machine.
            print("  Deploying...")
            machine.deploy(distro_series='jammy', wait=True)

            print(f"  ✓ Deployed: {machine.hostname} at {machine.ip_addresses}")
            return machine

        except Exception as e:
            print(f"  ✗ Provisioning failed: {e}")
            raise

    def decommission(self, machine, secure_erase=True):
        """Power off and release a machine back to the pool.

        Args:
            machine: the machine object to release.
            secure_erase: wipe disks during release (default True).

        Raises:
            Exception: whatever the underlying MAAS call raised, after the
                error has been printed.
        """
        print(f"Decommissioning: {machine.hostname}")

        try:
            # Power off before releasing.
            if machine.power_state == PowerState.ON:
                print("  Powering off...")
                machine.power_off(wait=True)

            # Release with secure erase and an audit comment.
            print("  Releasing...")
            machine.release(
                secure_erase=secure_erase,
                comment='Automated decommission',
                wait=True
            )

            print(f"  ✓ Released: {machine.hostname}")

        except Exception as e:
            print(f"  ✗ Decommission failed: {e}")
            raise

    def health_check(self, machine):
        """Report deployment, power, and network health.

        Returns:
            True only when all three checks (deployed, powered on,
            at least one IP address) pass.
        """
        print(f"Health check: {machine.hostname}")

        # Pull fresh state from MAAS before evaluating.
        machine.refresh()

        checks = {
            'status': machine.status == NodeStatus.DEPLOYED,
            'power': machine.power_state == PowerState.ON,
            'network': len(machine.ip_addresses) > 0,
        }

        for check, passed in checks.items():
            status = "✓" if passed else "✗"
            print(f"  {status} {check}")

        return all(checks.values())

# Usage
manager = MachineManager(client)

# Provision machines
# Requirement dicts are forwarded verbatim to machines.allocate().
web_server = manager.provision({
    'cpu_count': 4,
    'mem': 8192,
    'tags': ['web']
})

db_server = manager.provision({
    'cpu_count': 8,
    'mem': 32768,
    'tags': ['database']
})

# Monitor
time.sleep(60)  # Wait for machines to stabilize

# Health checks
manager.health_check(web_server)
manager.health_check(db_server)

# Decommission when done
# manager.decommission(web_server)
# manager.decommission(db_server)

Scenario 6: Bulk Operations

Manage multiple machines efficiently.

from maas.client import connect
from maas.client.enum import NodeStatus
import concurrent.futures

# Connect to the MAAS region API; 'key' is a placeholder for a real API key.
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

def deploy_machine(machine_id, config):
    """Deploy one machine; return a result dict instead of raising.

    On success the dict carries hostname and IPs; on failure it carries
    the system id and the error text.
    """
    try:
        target = client.machines.get(machine_id)
        target.deploy(**config, wait=True)
        return {'success': True, 'machine': target.hostname, 'ips': target.ip_addresses}
    except Exception as exc:
        return {'success': False, 'machine': machine_id, 'error': str(exc)}

def bulk_deploy(machine_ids, config):
    """Deploy every machine in *machine_ids* concurrently (5 workers).

    Returns a list of per-machine result dicts as produced by
    deploy_machine(), in completion order.
    """
    print(f"Deploying {len(machine_ids)} machines...")

    completed = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        pending = {
            pool.submit(deploy_machine, system_id, config): system_id
            for system_id in machine_ids
        }

        for done in concurrent.futures.as_completed(pending):
            outcome = done.result()
            completed.append(outcome)

            marker = "✓" if outcome['success'] else "✗"
            detail = outcome['ips'] if outcome['success'] else outcome['error']
            print(f"  {marker} {outcome['machine']}: {detail}")

        return completed

def bulk_release(machine_ids):
    """Release each machine in *machine_ids*, continuing past failures."""
    print(f"Releasing {len(machine_ids)} machines...")

    for system_id in machine_ids:
        try:
            target = client.machines.get(system_id)
            target.release(comment='Bulk release')
            print(f"  ✓ Released: {target.hostname}")
        except Exception as exc:
            print(f"  ✗ Failed to release {system_id}: {exc}")

# Get all ready machines
# NOTE(review): this lists every machine and filters client-side — fine for
# examples, but a server-side filter may exist for large fleets; confirm.
ready_machines = [
    m.system_id for m in client.machines.list() 
    if m.status == NodeStatus.READY
]

# Deploy configuration
# Shared cloud-init payload: install Docker on every deployed machine.
deploy_config = {
    'distro_series': 'jammy',
    'user_data': """#cloud-config
packages:
  - docker.io
"""
}

# Bulk deploy
# Cap the batch at the first 10 ready machines.
results = bulk_deploy(ready_machines[:10], deploy_config)

# Summary
successful = sum(1 for r in results if r['success'])
failed = len(results) - successful
print(f"\nDeployment complete: {successful} successful, {failed} failed")

Scenario 7: Tag-Based Machine Management

Organize and manage machines using tags.

from maas.client import connect

# Connect to the MAAS region API; 'key' is a placeholder for a real API key.
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')

# Create tags for organization
# NOTE(review): tags.create presumably fails if a tag with the same name
# already exists — confirm before re-running this script on a live MAAS.
tags = {
    'production': client.tags.create(
        name='production',
        comment='Production environment machines'
    ),
    'staging': client.tags.create(
        name='staging',
        comment='Staging environment machines'
    ),
    'gpu': client.tags.create(
        name='gpu',
        # XPath over hardware info: auto-applies to nodes with NVIDIA display hardware.
        definition='//node[@class="display"]/vendor[contains(text(), "NVIDIA")]',
        comment='Machines with NVIDIA GPUs'
    ),
}

# Tag machines
# Substring match on hostname decides the environment tag.
for machine in client.machines.list():
    if 'prod' in machine.hostname:
        machine.tags.add(tags['production'])
    elif 'staging' in machine.hostname:
        machine.tags.add(tags['staging'])

# Allocate by tags
gpu_machine = client.machines.allocate(tags=['gpu', 'production'])
print(f"Allocated GPU machine: {gpu_machine.hostname}")

# Query by tags
# NOTE(review): assumes membership test by tag *name* works on m.tags —
# verify the collection supports `in` with a plain string.
prod_machines = [
    m for m in client.machines.list()
    if 'production' in m.tags
]
print(f"Production machines: {len(prod_machines)}")

Next Steps

  • Edge Cases & Advanced Usage
  • API Reference
  • Quick Start Guide