Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Low-level Docker API client for direct Docker daemon interactions, connection management, and custom Docker operations. The DockerHook provides a thin wrapper around the docker.APIClient for advanced Docker daemon operations not covered by the operators.
Interact with Docker Daemon and Container Registry through the Docker API.
class DockerHook(BaseHook):
conn_name_attr = "docker_conn_id"
default_conn_name = "docker_default"
conn_type = "docker"
hook_name = "Docker"
def __init__(
self,
docker_conn_id: str | None = "docker_default",
base_url: str | list[str] | None = None,
version: str | None = None,
tls: TLSConfig | bool | None = None,
timeout: int = 60
) -> None
Parameters:
docker_conn_id: Docker connection ID for authentication (default: "docker_default")
base_url: URL or list of URLs to Docker server (required)
version: API version to use ("auto" for auto-detection, None for latest)
tls: TLS configuration (TLSConfig object, True for default TLS, False for no TLS)
timeout: Default timeout for API calls in seconds
def get_conn(self) -> APIClient:
"""Get connection to Docker API."""
@property
def api_client(self) -> APIClient:
"""Get Docker API client instance."""
@property
def client_created(self) -> bool:
"""Check if client was successfully created."""
@staticmethod
def construct_tls_config(
ca_cert: str | None = None,
client_cert: str | None = None,
client_key: str | None = None,
verify: bool = True,
assert_hostname: str | bool | None = None,
ssl_version: str | None = None
) -> TLSConfig | bool:
"""
Construct TLS configuration for secure Docker connections.
Args:
ca_cert: Path to PEM-encoded CA certificate file
client_cert: Path to PEM-encoded certificate file
client_key: Path to PEM-encoded key file
verify: Verify certificate validity
assert_hostname: Hostname to match against server certificate
ssl_version: SSL version to use
Returns:
TLSConfig object or boolean for TLS enablement
"""
@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""Get connection form widgets for Airflow UI."""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""Get UI field behavior configuration."""
from airflow.providers.docker.hooks.docker import DockerHook
# Initialize hook with default connection
hook = DockerHook(docker_conn_id='docker_default')
# Get API client
client = hook.get_conn()
# Use client for Docker operations
containers = client.containers()
images = client.images()
# Connect to custom Docker daemon
custom_hook = DockerHook(
base_url=['tcp://docker.example.com:2376'],
version='1.41',
timeout=120
)
client = custom_hook.api_client
# Check daemon info
info = client.info()
version = client.version()
from docker import TLSConfig
# Configure TLS for secure connection
tls_config = DockerHook.construct_tls_config(
ca_cert='/path/to/ca.pem',
client_cert='/path/to/cert.pem',
client_key='/path/to/key.pem',
verify=True,
assert_hostname='docker.example.com'
)
secure_hook = DockerHook(
base_url=['https://docker.example.com:2376'],
tls=tls_config,
docker_conn_id='secure_docker'
)
client = secure_hook.get_conn()
# Use hook for direct container operations
hook = DockerHook()
client = hook.api_client
# Create container
container = client.create_container(
image='alpine:latest',
command=['echo', 'Hello World'],
name='test_container'
)
# Start container
client.start(container['Id'])
# Get logs
logs = client.logs(container['Id'])
# Stop and remove container
client.stop(container['Id'])
client.remove_container(container['Id'])
# Image operations using hook
hook = DockerHook()
client = hook.api_client
# Pull image
for line in client.pull('python:3.9', stream=True, decode=True):
print(line)
# List images
images = client.images()
# Build image from Dockerfile
buildlogs = client.build(
path='/path/to/dockerfile/dir',
tag='myapp:latest',
rm=True
)
for log in buildlogs:
print(log)
# Network management
hook = DockerHook()
client = hook.api_client
# Create network
network = client.create_network(
name='custom_network',
driver='bridge',
options={
'com.docker.network.bridge.name': 'custom0'
}
)
# List networks
networks = client.networks()
# Connect container to network
client.connect_container_to_network(
container='container_name',
net_id=network['Id']
)
# Remove network
client.remove_network(network['Id'])
# Volume operations
hook = DockerHook()
client = hook.api_client
# Create volume
volume = client.create_volume(
name='data_volume',
driver='local',
driver_opts={
'type': 'nfs',
'o': 'addr=192.168.1.100,rw',
'device': ':/path/to/dir'
}
)
# List volumes
volumes = client.volumes()
# Remove volume
client.remove_volume('data_volume')
# Authenticate with container registry
hook = DockerHook(docker_conn_id='registry_connection')
client = hook.api_client
# Login handled automatically via connection
# Push image to registry
push_logs = client.push(
repository='registry.example.com/myapp',
tag='v1.0.0',
stream=True,
decode=True
)
for log in push_logs:
print(log.get('status', ''))
# Swarm service operations
hook = DockerHook()
client = hook.api_client
# List services (Swarm mode)
services = client.services()
# Get service details
service_info = client.inspect_service('service_id')
# Update service
client.update_service(
service='service_id',
version=service_info['Version']['Index'],
image='myapp:v2.0.0'
)
Configure Docker connections in Airflow UI:
Local Docker daemon:
Connection Type: Docker
Host: unix:///var/run/docker.sock
Remote Docker daemon (non-TLS):
Connection Type: Docker
Host: tcp://docker.example.com:2375
Remote Docker daemon (TLS):
Connection Type: Docker
Host: tcp://docker.example.com:2376
Extra: {"tls": true, "ca_cert": "/path/to/ca.pem"}
Container registry authentication:
Connection Type: Docker
Host: tcp://localhost:2376
Login: username
Password: password_or_token
The hook handles various Docker API errors:
Common exceptions:
docker.errors.APIError: Docker API errors
docker.errors.DockerException: General Docker client errors
airflow.exceptions.AirflowException: Airflow-specific errors
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-docker