A Python library for the Docker Engine API.
—
Retrieve Docker daemon system information, monitor resource usage, and stream events in real time for comprehensive observability of a Docker environment.
def df(self):
    """
    Get Docker system disk usage information.

    Returns:
        dict: Disk usage data with the following structure:

            - LayersSize (int): Total size of image layers in bytes
            - Images (list): List of images with size information
            - Containers (list): List of containers with size information
            - Volumes (list): List of volumes with size information
            - BuildCache (list): Build cache usage information
    """
def info(self):
    """
    Get Docker system information.

    Returns:
        dict: System information including:

            - ServerVersion (str): Docker server version
            - Driver (str): Storage driver name
            - KernelVersion (str): Host kernel version
            - Architecture (str): Host architecture
            - MemTotal (int): Total system memory
            - NCPU (int): Number of CPUs
            - DockerRootDir (str): Docker root directory
            - Swarm (dict): Swarm configuration if enabled
    """
def ping(self):
    """
    Ping the Docker daemon to verify connectivity.

    Returns:
        bool: True if daemon is reachable

    Raises:
        APIError: If daemon is unreachable
    """
def version(self, api_version=True):
    """
    Get Docker version information.

    Args:
        api_version (bool): Send the request to the API-versioned
            endpoint when True; query the unversioned ``/version`` URL
            when False (default: True)

    Returns:
        dict: Version information including:

            - Version (str): Docker version
            - ApiVersion (str): API version
            - MinAPIVersion (str): Minimum supported API version
            - GitCommit (str): Git commit hash
            - GoVersion (str): Go version used to build Docker
            - Os (str): Operating system
            - Arch (str): Architecture
            - BuildTime (str): Build timestamp
    """


def events(self, since=None, until=None, filters=None, decode=None):
    """
    Stream Docker events from the daemon in real-time.

    Args:
        since (datetime, int, or str): Show events since this time
        until (datetime, int, or str): Show events until this time
        filters (dict): Filter events by type, container, image, etc.
        decode (bool): Decode JSON events automatically (default: None)

    Yields:
        dict: Event objects (when ``decode=True``; otherwise the raw,
        undecoded JSON payload) with the following structure:

            - Type (str): Event type ('container', 'image', 'network', 'volume', 'daemon')
            - Action (str): Event action ('create', 'start', 'stop', 'destroy', etc.)
            - Actor (dict): Object that generated the event

                - ID (str): Object ID
                - Attributes (dict): Object attributes

            - time (int): Event timestamp
            - timeNano (int): Event timestamp in nanoseconds

    Common filter keys:
        - type: Event type filter
        - container: Container name or ID
        - image: Image name or ID
        - event: Specific event action
        - label: Filter by labels
        - network: Network name or ID
        - volume: Volume name
        - daemon: Daemon events
    """


import docker
from datetime import datetime

client = docker.from_env()

# Get system information
info = client.info()
print(f"Docker Version: {info['ServerVersion']}")
print(f"Storage Driver: {info['Driver']}")
print(f"Total Memory: {info['MemTotal'] / (1024**3):.1f} GB")
print(f"CPUs: {info['NCPU']}")

# Check if Swarm is enabled. The lookups are chained through .get() so a
# response without Swarm details is reported as "not active" instead of
# aborting the script with a KeyError/TypeError.
if (info.get('Swarm') or {}).get('LocalNodeState') == 'active':
    print("Swarm mode is active")

# Get resource usage
usage = client.df()
# `or []` guards against a category being present but null in the response.
images = usage['Images'] or []
containers = usage['Containers'] or []
volumes = usage['Volumes'] or []
print("\nDisk Usage:")
print(f"Images: {len(images)} ({usage['LayersSize'] / (1024**2):.1f} MB)")
print(f"Containers: {len(containers)}")
print(f"Volumes: {len(volumes)}")
if 'BuildCache' in usage:
    cache_size = sum(item.get('Size', 0) for item in usage['BuildCache'] or [])
    print(f"Build Cache: {cache_size / (1024**2):.1f} MB")

# Version information
version = client.version()
print("\nVersion Details:")
print(f"API Version: {version['ApiVersion']}")
print(f"Git Commit: {version['GitCommit']}")
print(f"Go Version: {version['GoVersion']}")


import docker
from datetime import datetime, timedelta

client = docker.from_env()

# Monitor all events
print("Monitoring all Docker events (Ctrl+C to stop):")
try:
    for event in client.events(decode=True):
        timestamp = datetime.fromtimestamp(event['time'])
        actor_name = event['Actor']['Attributes'].get('name', 'unknown')
        print(f"[{timestamp}] {event['Type']}: {event['Action']} - {actor_name}")
except KeyboardInterrupt:
    print("Event monitoring stopped")

# Monitor specific events with filters
print("\nMonitoring container start/stop events:")
filters = {
    'type': 'container',
    'event': ['start', 'stop', 'create', 'destroy'],
}
# This stream is open-ended too, so catch Ctrl+C here as well; otherwise
# the interrupt would terminate the script before the next section runs.
try:
    for event in client.events(filters=filters, decode=True):
        container_name = event['Actor']['Attributes'].get('name', event['Actor']['ID'][:12])
        action = event['Action']
        timestamp = datetime.fromtimestamp(event['time'])
        print(f"[{timestamp}] Container {container_name}: {action}")
except KeyboardInterrupt:
    print("Event monitoring stopped")

# Historical events
# Pass Unix timestamps instead of naive local datetimes (docker-py documents
# datetime arguments as UTC), and bound the query with `until` so the
# generator finishes once the past hour has been replayed instead of
# waiting for new events indefinitely.
now = datetime.now()
since_time = int((now - timedelta(hours=1)).timestamp())
until_time = int(now.timestamp())
print("\nEvents from the last hour:")
for event in client.events(since=since_time, until=until_time, decode=True):
    if event['Type'] == 'container':
        container_name = event['Actor']['Attributes'].get('name', 'unknown')
        print(f"{event['Action']}: {container_name} at {datetime.fromtimestamp(event['time'])}")


import docker
import json

client = docker.from_env()

# Each stream below is open-ended. Ctrl+C ends the current section and
# moves on to the next one instead of terminating the whole script.

# Filter by specific container
container_filters = {
    'container': ['my-app', 'nginx-proxy'],
}
print("Monitoring specific containers:")
try:
    for event in client.events(filters=container_filters, decode=True):
        container_name = event['Actor']['Attributes'].get('name')
        print(f"Container {container_name}: {event['Action']}")
except KeyboardInterrupt:
    pass

# Filter by labels
label_filters = {
    'type': 'container',
    'label': ['environment=production', 'service=web'],
}
print("\nMonitoring production web containers:")
try:
    for event in client.events(filters=label_filters, decode=True):
        attrs = event['Actor']['Attributes']
        container_name = attrs.get('name', 'unknown')
        environment = attrs.get('environment', 'unknown')
        service = attrs.get('service', 'unknown')
        print(f"[{environment}/{service}] {container_name}: {event['Action']}")
except KeyboardInterrupt:
    pass

# Process image events
image_filters = {'type': 'image'}
print("\nMonitoring image events:")
try:
    for event in client.events(filters=image_filters, decode=True):
        image_name = event['Actor']['Attributes'].get('name', event['Actor']['ID'][:12])
        print(f"Image {image_name}: {event['Action']}")
        # Log detailed info for pull events
        if event['Action'] == 'pull':
            print(f" Full event data: {json.dumps(event, indent=2)}")
except KeyboardInterrupt:
    pass
# Custom event processor
def process_events():
    """Custom event processor with statistics.

    Streams decoded events from the module-level ``client``, counts them
    per ``"<Type>.<Action>"`` key, and prints a summary after every 10th
    event. Runs until interrupted with Ctrl+C, at which point a final
    summary is printed.

    Returns:
        None
    """
    # Function-scope import keeps this snippet self-contained.
    from collections import Counter

    event_counts = Counter()
    total_events = 0  # running total, so the counts are not re-summed per event
    try:
        for event in client.events(decode=True):
            event_counts[f"{event['Type']}.{event['Action']}"] += 1
            total_events += 1
            # Print summary every 10 events
            if total_events % 10 == 0:
                print(f"\nEvent Summary (last {total_events} events):")
                for event_type, count in sorted(event_counts.items()):
                    print(f" {event_type}: {count}")
    except KeyboardInterrupt:
        print("\nFinal Event Summary:")
        for event_type, count in sorted(event_counts.items()):
            print(f" {event_type}: {count}")


# Run custom processor
# process_events()


import docker
import time
from datetime import datetime

client = docker.from_env()


def monitor_system_health(interval=30):
    """Monitor Docker system health metrics.

    Loops forever: pings the daemon, gathers ``info()`` and ``df()`` data,
    prints a short report, then sleeps before the next check. Stop it with
    Ctrl+C (``KeyboardInterrupt`` is not caught by the handler below).

    Args:
        interval (int or float): Seconds to wait between checks
            (default: 30).
    """
    while True:
        try:
            # Test connectivity
            client.ping()

            # Get system stats
            info = client.info()
            usage = client.df()

            # Calculate metrics
            total_containers = len(usage['Containers'])
            # containers.list() returns only running containers by default
            running_containers = len(client.containers.list())
            total_images = len(usage['Images'])
            layer_size_gb = usage['LayersSize'] / (1024**3)

            print(f"[{datetime.now()}] System Health:")
            print(f" Containers: {running_containers}/{total_containers} running")
            print(f" Images: {total_images} ({layer_size_gb:.2f} GB)")
            print(f" Memory: {info['MemTotal'] / (1024**3):.1f} GB total")
            print(f" Storage Driver: {info['Driver']}")
            print(" Status: Healthy\n")
        except Exception as e:
            # Deliberately broad: any failure in one check is reported and
            # the monitor keeps running rather than crashing.
            print(f"[{datetime.now()}] System Health: ERROR - {e}\n")
        time.sleep(interval)  # Wait before the next check


# monitor_system_health()