A Docker client for Python, designed to be fun and intuitive!
—
Docker Swarm operations for container orchestration including nodes, services, secrets, and configs. Docker Swarm provides native clustering and orchestration capabilities for Docker containers across multiple hosts.
Initialize, manage, and configure Docker Swarm clusters.
def init(
*,
advertise_addr: Optional[str] = None,
listen_addr: str = "0.0.0.0:2377",
force_new_cluster: bool = False,
cert_expiry: str = "2160h0m0s",
dispatcher_heartbeat: str = "5s",
task_history_limit: int = 5,
snapshot_interval: int = 10000,
log_entries_for_slow_followers: int = 500,
election_tick: int = 10,
heartbeat_tick: int = 1,
max_snapshots: int = 0,
snapshot_keep_old_snapshots: int = 0,
data_path_addr: Optional[str] = None,
data_path_port: Optional[int] = None,
default_addr_pool: Optional[List[str]] = None,
default_addr_pool_mask_length: Optional[int] = None,
external_ca: Optional[List[str]] = None,
autolock: bool = False
) -> str:
"""
Initialize a new swarm cluster.
Parameters:
- advertise_addr: Advertised address (format: <ip|interface>[:port])
- listen_addr: Listen address (format: <ip|interface>[:port])
- force_new_cluster: Force create new cluster from current state
- cert_expiry: Validity period for node certificates
- dispatcher_heartbeat: Dispatcher heartbeat period
- task_history_limit: Task history retention limit
- snapshot_interval: Number of log entries between snapshots
- log_entries_for_slow_followers: Number of log entries to keep for slow followers
- election_tick: Number of ticks between elections
- heartbeat_tick: Number of ticks between heartbeats
- max_snapshots: Number of additional snapshots to retain
- snapshot_keep_old_snapshots: Number of old snapshots to preserve
- data_path_addr: Address or interface to use for data path traffic
- data_path_port: Port number to use for data path traffic
- default_addr_pool: Default address pool in CIDR format
- default_addr_pool_mask_length: Default address pool subnet mask length
- external_ca: Specifications of one or more certificate signing endpoints
- autolock: Enable manager autolocking
Returns:
- Swarm initialization result
"""
def join(
token: str,
manager_addr: str,
*,
listen_addr: str = "0.0.0.0:2377",
advertise_addr: Optional[str] = None,
data_path_addr: Optional[str] = None
) -> None:
"""
Join an existing swarm as a node.
Parameters:
- token: Token for joining swarm
- manager_addr: Address of manager node
- listen_addr: Listen address for swarm traffic
- advertise_addr: Advertised address
- data_path_addr: Address for data path traffic
"""
def leave(force: bool = False) -> None:
"""
Leave the swarm.
Parameters:
- force: Force leave swarm (use on manager nodes)
"""
def update(
*,
cert_expiry: Optional[str] = None,
dispatcher_heartbeat: Optional[str] = None,
task_history_limit: Optional[int] = None,
snapshot_interval: Optional[int] = None,
log_entries_for_slow_followers: Optional[int] = None,
election_tick: Optional[int] = None,
heartbeat_tick: Optional[int] = None,
max_snapshots: Optional[int] = None,
snapshot_keep_old_snapshots: Optional[int] = None,
autolock: Optional[bool] = None
) -> None:
"""
Update swarm configuration.
Parameters: Same as init() but all optional for updating existing settings
"""Manage join tokens for worker and manager nodes.
def join_token(
role: str,
*,
rotate: bool = False,
quiet: bool = False
) -> str:
"""
Manage join tokens for swarm.
Parameters:
- role: Node role (worker, manager)
- rotate: Rotate the join token
- quiet: Only display token
Returns:
- Join token string
"""Manage swarm encryption and security features.
def unlock() -> None:
"""
Unlock swarm with unlock key.
"""
def unlock_key(
*,
rotate: bool = False,
quiet: bool = False
) -> str:
"""
Manage unlock key for locked swarm.
Parameters:
- rotate: Rotate unlock key
- quiet: Only display key
Returns:
- Unlock key string
"""Manage swarm nodes including promotion, demotion, and removal.
class NodeCLI:
def demote(self, nodes: Union[str, List[str]]) -> None:
"""
Demote manager nodes to worker nodes.
Parameters:
- nodes: Node ID(s) or name(s) to demote
"""
def inspect(self, nodes: Union[str, List[str]]) -> List[Node]:
"""
Get detailed information about nodes.
Parameters:
- nodes: Node ID(s) or name(s) to inspect
Returns:
- List of Node objects with full details
"""
def list(
self,
*,
filters: Optional[Dict[str, str]] = None,
quiet: bool = False,
format: Optional[str] = None
) -> List[Node]:
"""
List swarm nodes.
Parameters:
- filters: Filters to apply (id, label, membership, name, role)
- quiet: Only show node IDs
- format: Output format
Returns:
- List of Node objects
"""
def promote(self, nodes: Union[str, List[str]]) -> None:
"""
Promote worker nodes to manager nodes.
Parameters:
- nodes: Node ID(s) or name(s) to promote
"""
def remove(
self,
nodes: Union[str, List[str]],
*,
force: bool = False
) -> None:
"""
Remove nodes from swarm.
Parameters:
- nodes: Node ID(s) or name(s) to remove
- force: Force remove node from swarm
"""
def update(
self,
node: str,
*,
availability: Optional[str] = None,
labels_add: Optional[Dict[str, str]] = None,
labels_remove: Optional[List[str]] = None,
role: Optional[str] = None
) -> None:
"""
Update node configuration.
Parameters:
- node: Node ID or name
- availability: Node availability (active, pause, drain)
- labels_add: Labels to add
- labels_remove: Labels to remove
- role: Node role (worker, manager)
"""Create and manage swarm services for container orchestration.
class ServiceCLI:
def create(
self,
image: str,
*,
name: Optional[str] = None,
command: Optional[List[str]] = None,
replicas: Optional[int] = None,
mode: str = "replicated",
update_config: Optional[Dict[str, Any]] = None,
rollback_config: Optional[Dict[str, Any]] = None,
networks: Optional[List[str]] = None,
ports: Optional[List[str]] = None,
mounts: Optional[List[str]] = None,
secrets: Optional[List[str]] = None,
configs: Optional[List[str]] = None,
env: Optional[Dict[str, str]] = None,
labels: Optional[Dict[str, str]] = None,
constraints: Optional[List[str]] = None,
placement_prefs: Optional[List[str]] = None,
resources: Optional[Dict[str, Any]] = None,
restart_condition: Optional[str] = None,
restart_delay: Optional[str] = None,
restart_max_attempts: Optional[int] = None,
restart_window: Optional[str] = None,
stop_grace_period: Optional[str] = None,
health_cmd: Optional[str] = None,
health_interval: Optional[str] = None,
health_retries: Optional[int] = None,
health_start_period: Optional[str] = None,
health_timeout: Optional[str] = None,
hostname: Optional[str] = None,
isolation: Optional[str] = None,
limit_cpu: Optional[str] = None,
limit_memory: Optional[str] = None,
log_driver: Optional[str] = None,
log_opts: Optional[Dict[str, str]] = None,
reserve_cpu: Optional[str] = None,
reserve_memory: Optional[str] = None,
user: Optional[str] = None,
workdir: Optional[str] = None,
tty: bool = False,
replicas_max_per_node: Optional[int] = None,
stop_signal: Optional[str] = None,
read_only: bool = False
) -> Service:
"""
Create a new service.
Parameters:
- image: Service image
- name: Service name
- command: Command to run
- replicas: Number of replicas
- mode: Service mode (replicated, global)
- update_config: Update configuration
- rollback_config: Rollback configuration
- networks: Networks to attach
- ports: Port mappings
- mounts: Volume/bind mounts
- secrets: Secrets to attach
- configs: Configs to attach
- env: Environment variables
- labels: Service labels
- constraints: Placement constraints
- placement_prefs: Placement preferences
- resources: Resource requirements
- restart_condition: Restart condition (none, on-failure, any)
- restart_delay: Restart delay
- restart_max_attempts: Maximum restart attempts
- restart_window: Restart window
- stop_grace_period: Grace period before force kill
- health_cmd: Health check command
- health_interval: Health check interval
- health_retries: Health check retries
- health_start_period: Health check start period
- health_timeout: Health check timeout
- hostname: Container hostname
- isolation: Container isolation technology
- limit_cpu: CPU limit
- limit_memory: Memory limit
- log_driver: Logging driver
- log_opts: Logging driver options
- reserve_cpu: CPU reservation
- reserve_memory: Memory reservation
- user: User to run as
- workdir: Working directory
- tty: Allocate pseudo-TTY
- replicas_max_per_node: Maximum replicas per node
- stop_signal: Signal to stop container
- read_only: Mount root filesystem as read-only
Returns:
- Service object
"""
def update(
self,
service: str,
*,
image: Optional[str] = None,
command: Optional[List[str]] = None,
replicas: Optional[int] = None,
force: bool = False,
**kwargs
) -> None:
"""
Update service configuration.
Parameters:
- service: Service name or ID
- image: New service image
- command: New command
- replicas: New replica count
- force: Force update even if no changes
- **kwargs: Same options as create()
"""
def scale(self, services: Dict[str, int]) -> None:
"""
Scale services to specified replica counts.
Parameters:
- services: Dictionary mapping service names to replica counts
"""
def rollback(self, service: str) -> None:
"""
Rollback service to previous version.
Parameters:
- service: Service name or ID
"""from python_on_whales import docker
# Initialize swarm on manager node
init_result = docker.swarm.init(
advertise_addr="192.168.1.10:2377",
listen_addr="0.0.0.0:2377"
)
print(f"Swarm initialized: {init_result}")
# Get join tokens
worker_token = docker.swarm.join_token("worker", quiet=True)
manager_token = docker.swarm.join_token("manager", quiet=True)
print(f"Worker join token: {worker_token}")
print(f"Manager join token: {manager_token}")
# On worker nodes, join the swarm
# docker.swarm.join(worker_token, "192.168.1.10:2377")# Create a web service
web_service = docker.service.create(
"nginx:alpine",
name="web",
replicas=3,
ports=["80:80"],
labels={"service": "web"},
update_config={
"parallelism": 1,
"delay": "10s"
},
restart_condition="on-failure"
)
# Scale the service
docker.service.scale({"web": 5})
# Update service with new image
docker.service.update(
"web",
image="nginx:1.21-alpine",
force=True
)
# List all services
services = docker.service.list()
for service in services:
print(f"Service: {service.name} - Replicas: {service.replicas}")# List all nodes
nodes = docker.node.list()
for node in nodes:
print(f"Node: {node.hostname} - Role: {node.role} - Status: {node.status}")
# Promote worker to manager
docker.node.promote("worker-node-1")
# Drain node for maintenance
docker.node.update(
"worker-node-2",
availability="drain"
)
# Add labels to node
docker.node.update(
"worker-node-2",
labels_add={"environment": "production", "zone": "us-west-1a"}
)# Initialize swarm with advanced settings
docker.swarm.init(
advertise_addr="192.168.1.10:2377",
cert_expiry="8760h", # 1 year
task_history_limit=10,
autolock=True # Enable autolock for security
)
# Get unlock key for secure cluster
unlock_key = docker.swarm.unlock_key(quiet=True)
print(f"Store this unlock key securely: {unlock_key}")
# Create production service with placement constraints
prod_service = docker.service.create(
"myapp:production",
name="production-api",
replicas=5,
constraints=["node.labels.environment==production"],
placement_prefs=["spread=node.labels.zone"],
resources={
"limits": {"cpu": "1.0", "memory": "512M"},
"reservations": {"cpu": "0.5", "memory": "256M"}
},
update_config={
"parallelism": 2,
"delay": "30s",
"failure_action": "rollback",
"monitor": "60s"
},
rollback_config={
"parallelism": 1,
"delay": "30s",
"monitor": "60s"
}
)class Node:
id: str
version: Dict[str, Any]
created_at: str
updated_at: str
spec: Dict[str, Any]
description: Dict[str, Any]
status: Dict[str, Any]
manager_status: Optional[Dict[str, Any]]
hostname: str
role: str
availability: str
addr: str
engine_version: str
def demote(self) -> None: ...
def promote(self) -> None: ...
def remove(self, force: bool = False) -> None: ...
def update(self, **kwargs) -> None: ...
class Service:
id: str
version: Dict[str, Any]
created_at: str
updated_at: str
spec: Dict[str, Any]
endpoint: Dict[str, Any]
update_status: Optional[Dict[str, Any]]
name: str
image: str
replicas: int
mode: str
def remove(self) -> None: ...
def update(self, **kwargs) -> None: ...
def scale(self, replicas: int) -> None: ...
def rollback(self) -> None: ...
def logs(self, **kwargs) -> str: ...
def ps(self) -> List[Task]: ...
class Task:
id: str
version: Dict[str, Any]
created_at: str
updated_at: str
name: str
labels: Dict[str, str]
spec: Dict[str, Any]
service_id: str
slot: Optional[int]
node_id: str
assigned_generic_resources: List[Dict[str, Any]]
status: Dict[str, Any]
desired_state: strInstall with Tessl CLI
npx tessl i tessl/pypi-python-on-whales