tessl install tessl/pypi-python-libmaas@0.6.0Python client library for MAAS 2.0+ with sync/async support, providing machine provisioning, network management, and storage configuration.
Complete examples of common MAAS workflows and use cases.
Deploy multiple machines with specific configurations for different roles.
from maas.client import connect
from maas.client.enum import NodeStatus
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def provision_web_servers(count=3):
"""Provision web server cluster."""
web_servers = []
for i in range(count):
# Allocate machine
machine = client.machines.allocate(
cpu_count=4,
mem=8192,
tags=['ssd', 'web'],
zone='dmz'
)
# Deploy with specific configuration
machine.deploy(
distro_series='jammy',
wait=True,
user_data="""#cloud-config
packages:
- nginx
- certbot
runcmd:
- systemctl enable nginx
- systemctl start nginx
"""
)
web_servers.append(machine)
print(f"Deployed web server: {machine.hostname} at {machine.ip_addresses}")
return web_servers
def provision_database_servers(count=2):
"""Provision database cluster with RAID."""
db_servers = []
for i in range(count):
# Allocate high-memory machine
machine = client.machines.allocate(
cpu_count=8,
mem=32768,
tags=['database', 'raid'],
zone='internal'
)
# Configure RAID before deployment
from maas.client.enum import RaidLevel
sdb = machine.block_devices.get_by_name('sdb')
sdc = machine.block_devices.get_by_name('sdc')
raid = machine.raids.create(
machine,
RaidLevel.RAID_1,
devices=[sdb, sdc],
name='md0'
)
# Format and mount RAID
virtual_device = raid.virtual_device
virtual_device.format('ext4')
virtual_device.mount('/var/lib/postgresql')
# Deploy
machine.deploy(
distro_series='jammy',
wait=True,
user_data="""#cloud-config
packages:
- postgresql-14
"""
)
db_servers.append(machine)
print(f"Deployed DB server: {machine.hostname} with RAID at {machine.ip_addresses}")
return db_servers
# Provision infrastructure
web_servers = provision_web_servers(3)
db_servers = provision_database_servers(2)
print(f"\nProvisioned {len(web_servers)} web servers and {len(db_servers)} database servers")Configure machines in different network segments with proper isolation.
from maas.client import connect
from maas.client.enum import LinkMode
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def configure_dmz_machine(machine):
"""Configure machine in DMZ with public and private interfaces."""
# Public interface (eth0) - DMZ subnet
eth0 = machine.interfaces.get_by_name('eth0')
dmz_subnet = client.subnets.get(cidr='203.0.113.0/24')
public_link = eth0.links.create(
eth0,
LinkMode.STATIC,
subnet=dmz_subnet,
ip_address='203.0.113.10',
default_gateway=True
)
# Private interface (eth1) - Internal subnet
eth1 = machine.interfaces.get_by_name('eth1')
internal_subnet = client.subnets.get(cidr='10.0.1.0/24')
private_link = eth1.links.create(
eth1,
LinkMode.STATIC,
subnet=internal_subnet,
ip_address='10.0.1.10'
)
print(f"Configured {machine.hostname}:")
print(f" Public: {public_link.ip_address}")
print(f" Private: {private_link.ip_address}")
def configure_internal_machine(machine):
"""Configure machine in internal network only."""
eth0 = machine.interfaces.get_by_name('eth0')
internal_subnet = client.subnets.get(cidr='10.0.2.0/24')
link = eth0.links.create(
eth0,
LinkMode.STATIC,
subnet=internal_subnet,
ip_address='10.0.2.20',
default_gateway=True
)
print(f"Configured {machine.hostname}: {link.ip_address}")
# Allocate and configure machines
dmz_machine = client.machines.allocate(tags=['dmz'])
configure_dmz_machine(dmz_machine)
internal_machine = client.machines.allocate(tags=['internal'])
configure_internal_machine(internal_machine)Set up complex storage layouts with LVM for flexibility.
from maas.client import connect
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def configure_lvm_storage(machine):
"""Configure LVM with multiple logical volumes."""
# Get block devices
sdb = machine.block_devices.get_by_name('sdb')
sdc = machine.block_devices.get_by_name('sdc')
# Create volume group from multiple disks
vg = machine.volume_groups.create(
machine,
name='vg-data',
devices=[sdb, sdc]
)
print(f"Created volume group: {vg.name}")
print(f" Total size: {vg.size / (1024**3):.2f} GB")
print(f" Available: {vg.available_size / (1024**3):.2f} GB")
# Create logical volumes for different purposes
# Application data (200GB)
lv_app = vg.logical_volumes.create(
vg,
name='lv-app',
size=200 * 1024**3,
tags=['application']
)
lv_app.format('ext4')
lv_app.mount('/opt/app')
# Database (300GB)
lv_db = vg.logical_volumes.create(
vg,
name='lv-db',
size=300 * 1024**3,
tags=['database']
)
lv_db.format('xfs')
lv_db.mount('/var/lib/mysql')
# Logs (100GB)
lv_logs = vg.logical_volumes.create(
vg,
name='lv-logs',
size=100 * 1024**3,
tags=['logs']
)
lv_logs.format('ext4')
lv_logs.mount('/var/log/app')
# Backups (remaining space)
lv_backup = vg.logical_volumes.create(
vg,
name='lv-backup',
size=vg.available_size,
tags=['backup']
)
lv_backup.format('ext4')
lv_backup.mount('/backup')
print(f"\nCreated logical volumes:")
for lv in vg.logical_volumes:
fs = lv.filesystem
print(f" {lv.name}: {lv.size / (1024**3):.2f} GB -> {fs.mount_point}")
# Allocate machine and configure storage
machine = client.machines.allocate(tags=['storage'])
configure_lvm_storage(machine)
# Deploy with storage configured
machine.deploy(distro_series='jammy', wait=True)Configure SSD caching for improved I/O performance.
from maas.client import connect
from maas.client.enum import CacheMode
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def configure_bcache(machine):
"""Configure Bcache with NVMe SSD cache and HDD backing."""
# Create cache set from NVMe SSD
nvme = machine.block_devices.get_by_name('nvme0n1')
cache_set = machine.bcache_cache_sets.create(machine, nvme)
print(f"Created cache set from {nvme.name}")
# Configure bcache for each HDD
hdds = ['sdb', 'sdc', 'sdd']
bcache_devices = []
for i, hdd_name in enumerate(hdds):
hdd = machine.block_devices.get_by_name(hdd_name)
bcache = machine.bcaches.create(
machine,
name=f'bcache{i}',
backing_device=hdd,
cache_set=cache_set,
cache_mode=CacheMode.WRITEBACK # Maximum performance
)
# Format and mount
virtual_device = bcache.virtual_device
virtual_device.format('xfs')
virtual_device.mount(f'/data{i}')
bcache_devices.append(bcache)
print(f" Configured {bcache.name}: {hdd.name} cached by {nvme.name}")
return bcache_devices
# Allocate machine with NVMe
machine = client.machines.allocate(tags=['nvme', 'storage'])
bcache_devices = configure_bcache(machine)
# Deploy
machine.deploy(distro_series='jammy', wait=True)
print(f"\nDeployed {machine.hostname} with {len(bcache_devices)} bcache devices")Complete lifecycle management with monitoring and cleanup.
import time
from maas.client import connect
from maas.client.enum import NodeStatus, PowerState
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
class MachineManager:
"""Manage machine lifecycle."""
def __init__(self, client):
self.client = client
def provision(self, requirements):
"""Provision machine with requirements."""
print(f"Provisioning machine with: {requirements}")
try:
# Allocate
machine = self.client.machines.allocate(**requirements)
print(f" Allocated: {machine.hostname}")
# Commission if needed
if machine.status == NodeStatus.NEW:
print(f" Commissioning...")
machine.commission(wait=True)
# Power on
if machine.power_state != PowerState.ON:
print(f" Powering on...")
machine.power_on(wait=True)
# Deploy
print(f" Deploying...")
machine.deploy(distro_series='jammy', wait=True)
print(f" ✓ Deployed: {machine.hostname} at {machine.ip_addresses}")
return machine
except Exception as e:
print(f" ✗ Provisioning failed: {e}")
raise
def decommission(self, machine, secure_erase=True):
"""Decommission and release machine."""
print(f"Decommissioning: {machine.hostname}")
try:
# Power off
if machine.power_state == PowerState.ON:
print(f" Powering off...")
machine.power_off(wait=True)
# Release with secure erase
print(f" Releasing...")
machine.release(
secure_erase=secure_erase,
comment='Automated decommission',
wait=True
)
print(f" ✓ Released: {machine.hostname}")
except Exception as e:
print(f" ✗ Decommission failed: {e}")
raise
def health_check(self, machine):
"""Check machine health."""
print(f"Health check: {machine.hostname}")
# Refresh state
machine.refresh()
checks = {
'status': machine.status == NodeStatus.DEPLOYED,
'power': machine.power_state == PowerState.ON,
'network': len(machine.ip_addresses) > 0,
}
for check, passed in checks.items():
status = "✓" if passed else "✗"
print(f" {status} {check}")
return all(checks.values())
# Usage
manager = MachineManager(client)
# Provision machines
web_server = manager.provision({
'cpu_count': 4,
'mem': 8192,
'tags': ['web']
})
db_server = manager.provision({
'cpu_count': 8,
'mem': 32768,
'tags': ['database']
})
# Monitor
time.sleep(60) # Wait for machines to stabilize
# Health checks
manager.health_check(web_server)
manager.health_check(db_server)
# Decommission when done
# manager.decommission(web_server)
# manager.decommission(db_server)Manage multiple machines efficiently.
from maas.client import connect
from maas.client.enum import NodeStatus
import concurrent.futures
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
def deploy_machine(machine_id, config):
"""Deploy a single machine."""
try:
machine = client.machines.get(machine_id)
machine.deploy(**config, wait=True)
return {'success': True, 'machine': machine.hostname, 'ips': machine.ip_addresses}
except Exception as e:
return {'success': False, 'machine': machine_id, 'error': str(e)}
def bulk_deploy(machine_ids, config):
"""Deploy multiple machines in parallel."""
print(f"Deploying {len(machine_ids)} machines...")
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {
executor.submit(deploy_machine, mid, config): mid
for mid in machine_ids
}
results = []
for future in concurrent.futures.as_completed(futures):
result = future.result()
results.append(result)
if result['success']:
print(f" ✓ {result['machine']}: {result['ips']}")
else:
print(f" ✗ {result['machine']}: {result['error']}")
return results
def bulk_release(machine_ids):
"""Release multiple machines."""
print(f"Releasing {len(machine_ids)} machines...")
for machine_id in machine_ids:
try:
machine = client.machines.get(machine_id)
machine.release(comment='Bulk release')
print(f" ✓ Released: {machine.hostname}")
except Exception as e:
print(f" ✗ Failed to release {machine_id}: {e}")
# Get all ready machines
ready_machines = [
m.system_id for m in client.machines.list()
if m.status == NodeStatus.READY
]
# Deploy configuration
deploy_config = {
'distro_series': 'jammy',
'user_data': """#cloud-config
packages:
- docker.io
"""
}
# Bulk deploy
results = bulk_deploy(ready_machines[:10], deploy_config)
# Summary
successful = sum(1 for r in results if r['success'])
failed = len(results) - successful
print(f"\nDeployment complete: {successful} successful, {failed} failed")Organize and manage machines using tags.
from maas.client import connect
client = connect('http://maas.example.com:5240/MAAS/', apikey='key')
# Create tags for organization
tags = {
'production': client.tags.create(
name='production',
comment='Production environment machines'
),
'staging': client.tags.create(
name='staging',
comment='Staging environment machines'
),
'gpu': client.tags.create(
name='gpu',
definition='//node[@class="display"]/vendor[contains(text(), "NVIDIA")]',
comment='Machines with NVIDIA GPUs'
),
}
# Tag machines
for machine in client.machines.list():
if 'prod' in machine.hostname:
machine.tags.add(tags['production'])
elif 'staging' in machine.hostname:
machine.tags.add(tags['staging'])
# Allocate by tags
gpu_machine = client.machines.allocate(tags=['gpu', 'production'])
print(f"Allocated GPU machine: {gpu_machine.hostname}")
# Query by tags
prod_machines = [
m for m in client.machines.list()
if 'production' in m.tags
]
print(f"Production machines: {len(prod_machines)}")