tessl/pypi-apache-airflow-providers-docker

Docker integration provider for Apache Airflow workflows, enabling containerized task execution and Docker Swarm orchestration.

# Docker Swarm Orchestration

Deploy and manage Docker Swarm services for distributed containerized workloads. The DockerSwarmOperator extends DockerOperator to orchestrate multi-container applications, with service discovery, load balancing, and scaling handled by the Swarm cluster.

## Capabilities

### DockerSwarmOperator

Execute commands as Docker Swarm services with distributed orchestration capabilities.

```python
class DockerSwarmOperator(DockerOperator):
    def __init__(
        self,
        *,
        image: str,
        enable_logging: bool = True,
        configs: list | None = None,
        secrets: list | None = None,
        mode: dict | None = None,
        networks: list | None = None,
        endpoint_spec: dict | None = None,
        **kwargs
    ) -> None: ...
```

**Additional parameters (beyond `DockerOperator`):**

- `image`: Docker image for the Swarm service
- `enable_logging`: stream the service's logs into the Airflow task log
- `configs`: list of Docker configs to attach to the service
- `secrets`: list of Docker secrets to attach to the service
- `mode`: service mode configuration (replicated, global, etc.)
- `networks`: list of networks to attach the service to
- `endpoint_spec`: service endpoint specification for port publishing
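The `mode` and similar parameters in the examples below use the Docker Engine API's raw dict shapes. As a minimal sketch, the hypothetical helpers below (the names are illustrative, not part of the provider) build those dicts; the docker SDK's `docker.types.ServiceMode` is a typed alternative for the same structure:

```python
# Hypothetical helpers (not part of the provider) that build the
# Docker Engine API dict shapes used in the examples below.

def replicated_mode(replicas: int) -> dict:
    """Service mode dict for a fixed number of replicas."""
    return {"Replicated": {"Replicas": replicas}}


def global_mode() -> dict:
    """Service mode dict for one task per Swarm node."""
    return {"Global": {}}
```

With these, `mode=replicated_mode(3)` is equivalent to the literal `mode={'Replicated': {'Replicas': 3}}` used below.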

### Execution Methods

```python
def execute(self, context: Context) -> None:
    """Execute the Docker Swarm service."""

def on_kill(self) -> None:
    """Handle task cancellation by removing the Swarm service."""
```

### Utility Methods

```python
@staticmethod
def format_args(args: list[str] | str | None) -> list[str] | None:
    """Format service arguments for Swarm deployment."""
```
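Only the signature of `format_args` is documented here. A plausible sketch of that contract (assuming strings are split shell-style, lists are passed through, and `None` is preserved; this is not the provider's actual implementation) might look like:

```python
import shlex


def format_args(args):
    """Normalize service arguments: split strings shell-style,
    pass lists through unchanged, preserve None."""
    if args is None:
        return None
    if isinstance(args, str):
        return shlex.split(args)
    return list(args)
```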

## Usage Examples

### Basic Swarm Service

```python
from airflow.providers.docker.operators.docker_swarm import DockerSwarmOperator

# Simple Swarm service
basic_service = DockerSwarmOperator(
    task_id='swarm_hello',
    image='alpine:latest',
    command=['echo', 'Hello from Docker Swarm!']
)
```

### Replicated Service with Multiple Instances

```python
# Multi-replica service
replicated_service = DockerSwarmOperator(
    task_id='data_processing_service',
    image='myapp:latest',
    command=['python', '/app/worker.py'],
    mode={
        'Replicated': {
            'Replicas': 3
        }
    },
    environment={
        'WORKER_TYPE': 'processor',
        'CONCURRENCY': '4'
    }
)
```

### Service with Secrets and Configs

```python
# Service using Docker secrets and configs
secure_service = DockerSwarmOperator(
    task_id='secure_web_service',
    image='nginx:alpine',
    configs=[
        {
            'ConfigID': 'nginx_config',
            'ConfigName': 'nginx.conf',
            'File': {
                'Name': '/etc/nginx/nginx.conf',
                'UID': '0',
                'GID': '0',
                'Mode': 0o644
            }
        }
    ],
    secrets=[
        {
            'SecretID': 'ssl_cert',
            'SecretName': 'server.crt',
            'File': {
                'Name': '/etc/ssl/certs/server.crt',
                'UID': '0',
                'GID': '0',
                'Mode': 0o600
            }
        },
        {
            'SecretID': 'ssl_key',
            'SecretName': 'server.key',
            'File': {
                'Name': '/etc/ssl/private/server.key',
                'UID': '0',
                'GID': '0',
                'Mode': 0o600
            }
        }
    ]
)
```
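The config and secret entries above repeat the same Engine-API shape. As a sketch, a hypothetical builder (illustrative only, not part of the provider) can factor that out:

```python
def swarm_file_ref(ref_id: str, name: str, container_path: str,
                   uid: str = "0", gid: str = "0", mode: int = 0o444,
                   kind: str = "config") -> dict:
    """Build the ConfigReference/SecretReference dict shape used above.

    kind: "config" or "secret" selects the key prefix
    (ConfigID/ConfigName vs. SecretID/SecretName).
    """
    key = "Config" if kind == "config" else "Secret"
    return {
        f"{key}ID": ref_id,
        f"{key}Name": name,
        "File": {"Name": container_path, "UID": uid, "GID": gid, "Mode": mode},
    }
```

For example, `swarm_file_ref('ssl_key', 'server.key', '/etc/ssl/private/server.key', mode=0o600, kind='secret')` reproduces the second secret entry above.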

### Service with Custom Networks

```python
# Service with overlay network configuration
networked_service = DockerSwarmOperator(
    task_id='microservice',
    image='myapp:v1.2.0',
    command=['./start-server.sh'],
    networks=[
        {
            'Target': 'backend_network',
            'Aliases': ['api-service']
        },
        {
            'Target': 'monitoring_network',
            'Aliases': ['app-metrics']
        }
    ],
    endpoint_spec={
        'Ports': [
            {
                'Protocol': 'tcp',
                'TargetPort': 8080,
                'PublishedPort': 80,
                'PublishMode': 'ingress'
            }
        ]
    }
)
```

### Global Service Mode

```python
# Global service (one task per node)
monitoring_agent = DockerSwarmOperator(
    task_id='node_monitoring',
    image='monitoring/agent:latest',
    command=['./monitor.sh'],
    mode={
        'Global': {}
    },
    mounts=[
        {
            'Type': 'bind',
            'Source': '/var/run/docker.sock',
            'Target': '/var/run/docker.sock',
            'ReadOnly': True
        },
        {
            'Type': 'bind',
            'Source': '/proc',
            'Target': '/host/proc',
            'ReadOnly': True
        }
    ],
    privileged=True
)
```

### Service with Resource Constraints

```python
# Service with CPU and memory limits
constrained_service = DockerSwarmOperator(
    task_id='batch_processor',
    image='processor:latest',
    command=['python', '/app/batch_process.py'],
    mode={
        'Replicated': {
            'Replicas': 2
        }
    },
    mem_limit='1g',
    cpus=1.5,
    environment={
        'MAX_WORKERS': '8',
        'BATCH_SIZE': '1000'
    }
)
```
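Behind the scenes, the Docker Engine API expresses CPU limits for Swarm services as NanoCPUs (units of 10^-9 CPU) and memory limits as bytes. A sketch of the conversions implied by `cpus=1.5` and `mem_limit='1g'` above (hypothetical helper names, not the provider's code):

```python
_UNITS = {"b": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def to_nano_cpus(cpus: float) -> int:
    """Convert a fractional CPU count to Engine-API NanoCPUs
    (1 CPU == 1_000_000_000 NanoCPUs)."""
    return int(cpus * 1_000_000_000)


def parse_mem_limit(value: str) -> int:
    """Parse strings like '1g' or '512m' into bytes."""
    unit = value[-1].lower()
    if unit in _UNITS:
        return int(value[:-1]) * _UNITS[unit]
    return int(value)
```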

### Service with Health Checks

```python
# Service with custom health check
web_service = DockerSwarmOperator(
    task_id='web_application',
    image='webapp:latest',
    endpoint_spec={
        'Ports': [
            {
                'Protocol': 'tcp',
                'TargetPort': 3000,
                'PublishedPort': 3000
            }
        ]
    },
    # Health check configured via Docker image or service update
    mode={
        'Replicated': {
            'Replicas': 2
        }
    },
    labels={
        'service.type': 'web',
        'monitoring.enabled': 'true'
    }
)
```

### Service Update Strategy

```python
# Service with rolling update configuration
updating_service = DockerSwarmOperator(
    task_id='rolling_update_service',
    image='myapp:v2.0.0',
    command=['./start.sh'],
    mode={
        'Replicated': {
            'Replicas': 4
        }
    },
    # Update configuration handled by Swarm
    labels={
        'update.strategy': 'rolling',
        'update.parallelism': '2'
    }
)
```

## Service Management

### Service Lifecycle

The DockerSwarmOperator handles the complete service lifecycle:

1. **Service Creation**: creates a new Swarm service with the specified configuration
2. **Task Monitoring**: monitors service tasks for completion or failure
3. **Log Streaming**: streams service logs when `enable_logging=True`
4. **Service Cleanup**: removes the service when the task completes or is cancelled
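The task-monitoring step above can be sketched as a poll loop. Here the Swarm API call is abstracted behind a `fetch_states` callable so the control flow is visible; the real operator queries the Swarm API for its service's task states directly:

```python
import time

# Terminal Swarm task states (the engine reports lowercase state names).
TERMINAL_STATES = {"complete", "failed", "shutdown", "rejected", "orphaned"}


def wait_for_service(fetch_states, poll_interval=1.0, sleep=time.sleep):
    """Poll until every service task reaches a terminal state.

    fetch_states: callable returning the current list of task states,
    e.g. ["running", "complete"].
    Returns True if all tasks completed successfully, False otherwise.
    """
    while True:
        states = fetch_states()
        if states and all(s in TERMINAL_STATES for s in states):
            return all(s == "complete" for s in states)
        sleep(poll_interval)
```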

### Service States

Services progress through these states:

- `NEW`: service created but not yet running
- `RUNNING`: service tasks are executing
- `COMPLETE`: all service tasks completed successfully
- `FAILED`: service tasks failed or errored

### Error Handling

The operator handles various failure scenarios:

- Service creation failures
- Task execution failures
- Network connectivity issues
- Resource constraint violations

Failed services are automatically cleaned up, and detailed error information is provided in task logs.

## Docker Swarm Prerequisites

To use DockerSwarmOperator, you need:

1. **Docker Swarm Mode**: the Docker daemon must be running in Swarm mode
2. **Swarm Manager Access**: the Airflow worker must be able to reach a Swarm manager node
3. **Network Connectivity**: proper network configuration for service communication
4. **Resource Availability**: sufficient cluster resources for the service's requirements
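The first prerequisite can be checked programmatically: `docker info` reports `Swarm.LocalNodeState`. A minimal sketch that inspects that payload (obtainable as a dict from `docker.from_env().info()` with the docker SDK; the helper name is illustrative):

```python
def swarm_is_active(info: dict) -> bool:
    """Return True if the daemon described by `info` (the `docker info`
    payload) is in Swarm mode, i.e. Swarm.LocalNodeState == "active"."""
    return info.get("Swarm", {}).get("LocalNodeState") == "active"
```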

Initialize Docker Swarm:

```shell
docker swarm init
```

Join additional nodes:

```shell
docker swarm join --token <token> <manager-ip>:2377
```

## Install with Tessl CLI

```shell
npx tessl i tessl/pypi-apache-airflow-providers-docker
```
