CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-docker

Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docker-api.mddocs/

Docker API Client

Low-level Docker API client for direct Docker daemon interactions, connection management, and custom Docker operations. The DockerHook provides a thin wrapper around the docker.APIClient for advanced Docker daemon operations not covered by the operators.

Capabilities

DockerHook

Interact with Docker Daemon and Container Registry through the Docker API.

class DockerHook(BaseHook):
    conn_name_attr = "docker_conn_id"
    default_conn_name = "docker_default"
    conn_type = "docker"
    hook_name = "Docker"
    
    def __init__(
        self,
        docker_conn_id: str | None = "docker_default",
        base_url: str | list[str] | None = None,
        version: str | None = None,
        tls: TLSConfig | bool | None = None,
        timeout: int = 60
    ) -> None

Parameters:

  • docker_conn_id: Docker connection ID for authentication (default: "docker_default")
  • base_url: URL or list of URLs to Docker server (required)
  • version: API version to use ("auto" for auto-detection, None for latest)
  • tls: TLS configuration (TLSConfig object, True for default TLS, False for no TLS)
  • timeout: Default timeout for API calls in seconds

Connection Methods

def get_conn(self) -> APIClient:
    """Get connection to Docker API."""

@property
def api_client(self) -> APIClient:
    """Get Docker API client instance."""

@property  
def client_created(self) -> bool:
    """Check if client was successfully created."""

TLS Configuration

@staticmethod
def construct_tls_config(
    ca_cert: str | None = None,
    client_cert: str | None = None, 
    client_key: str | None = None,
    verify: bool = True,
    assert_hostname: str | bool | None = None,
    ssl_version: str | None = None
) -> TLSConfig | bool:
    """
    Construct TLS configuration for secure Docker connections.
    
    Args:
        ca_cert: Path to PEM-encoded CA certificate file
        client_cert: Path to PEM-encoded certificate file
        client_key: Path to PEM-encoded key file
        verify: Verify certificate validity
        assert_hostname: Hostname to match against server certificate
        ssl_version: SSL version to use
        
    Returns:
        TLSConfig object or boolean for TLS enablement
    """

Connection Form Configuration

@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
    """Get connection form widgets for Airflow UI."""

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """Get UI field behavior configuration."""

Usage Examples

Basic Hook Usage

from airflow.providers.docker.hooks.docker import DockerHook

# Initialize hook with default connection
hook = DockerHook(docker_conn_id='docker_default')

# Get API client
client = hook.get_conn()

# Use client for Docker operations
containers = client.containers()
images = client.images()

Custom Docker Daemon Connection

# Connect to custom Docker daemon
custom_hook = DockerHook(
    base_url=['tcp://docker.example.com:2376'],
    version='1.41',
    timeout=120
)

client = custom_hook.api_client

# Check daemon info
info = client.info()
version = client.version()

TLS Secure Connection

from docker import TLSConfig

# Configure TLS for secure connection
tls_config = DockerHook.construct_tls_config(
    ca_cert='/path/to/ca.pem',
    client_cert='/path/to/cert.pem', 
    client_key='/path/to/key.pem',
    verify=True,
    assert_hostname='docker.example.com'
)

secure_hook = DockerHook(
    base_url=['https://docker.example.com:2376'],
    tls=tls_config,
    docker_conn_id='secure_docker'
)

client = secure_hook.get_conn()

Container Management

# Use hook for direct container operations
hook = DockerHook()
client = hook.api_client

# Create container
container = client.create_container(
    image='alpine:latest',
    command=['echo', 'Hello World'],
    name='test_container'
)

# Start container
client.start(container['Id'])

# Get logs
logs = client.logs(container['Id'])

# Stop and remove container
client.stop(container['Id'])
client.remove_container(container['Id'])

Image Management

# Image operations using hook
hook = DockerHook()
client = hook.api_client

# Pull image
for line in client.pull('python:3.9', stream=True, decode=True):
    print(line)

# List images
images = client.images()

# Build image from Dockerfile
buildlogs = client.build(
    path='/path/to/dockerfile/dir',
    tag='myapp:latest',
    rm=True
)

for log in buildlogs:
    print(log)

Network Operations

# Network management
hook = DockerHook()
client = hook.api_client

# Create network
network = client.create_network(
    name='custom_network',
    driver='bridge',
    options={
        'com.docker.network.bridge.name': 'custom0'
    }
)

# List networks
networks = client.networks()

# Connect container to network
client.connect_container_to_network(
    container='container_name',
    net_id=network['Id']
)

# Remove network
client.remove_network(network['Id'])

Volume Management

# Volume operations
hook = DockerHook()
client = hook.api_client

# Create volume
volume = client.create_volume(
    name='data_volume',
    driver='local',
    driver_opts={
        'type': 'nfs',
        'o': 'addr=192.168.1.100,rw',
        'device': ':/path/to/dir'
    }
)

# List volumes
volumes = client.volumes()

# Remove volume
client.remove_volume('data_volume')

Registry Authentication

# Authenticate with container registry
hook = DockerHook(docker_conn_id='registry_connection')
client = hook.api_client

# Login handled automatically via connection
# Push image to registry
push_logs = client.push(
    repository='registry.example.com/myapp',
    tag='v1.0.0',
    stream=True,
    decode=True
)

for log in push_logs:
    print(log.get('status', ''))

Service Discovery

# Swarm service operations
hook = DockerHook()
client = hook.api_client

# List services (Swarm mode)
services = client.services()

# Get service details
service_info = client.inspect_service('service_id')

# Update service
client.update_service(
    service='service_id',
    version=service_info['Version']['Index'],
    image='myapp:v2.0.0'
)

Connection Configuration

Airflow Connection Setup

Configure Docker connections in Airflow UI:

  • Connection Type: Docker
  • Host: Docker daemon hostname/IP
  • Port: Docker daemon port (usually 2376 for TLS, 2375 for non-TLS)
  • Schema: Protocol (tcp, unix, ssh)
  • Login: Username for registry authentication
  • Password: Password or token for registry authentication

Connection Examples

Local Docker daemon:

Connection Type: Docker
Host: unix:///var/run/docker.sock

Remote Docker daemon (non-TLS):

Connection Type: Docker  
Host: tcp://docker.example.com:2375

Remote Docker daemon (TLS):

Connection Type: Docker
Host: tcp://docker.example.com:2376
Extra: {"tls": true, "ca_cert": "/path/to/ca.pem"}

Container registry authentication:

Connection Type: Docker
Host: tcp://localhost:2376
Login: username
Password: password_or_token

Error Handling

The hook handles various Docker API errors:

  • Connection failures: Network connectivity issues
  • Authentication errors: Invalid credentials or certificates
  • API errors: Docker daemon errors and responses
  • Timeout errors: API call timeouts

Common exceptions:

  • docker.errors.APIError: Docker API errors
  • docker.errors.DockerException: General Docker client errors
  • airflow.exceptions.AirflowException: Airflow-specific errors

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-docker

docs

docker-api.md

docker-decorators.md

docker-operations.md

docker-swarm.md

error-handling.md

index.md

tile.json