Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Connect to Kubernetes clusters with comprehensive API access supporting both synchronous and asynchronous operations. The hooks provide low-level access to Kubernetes resources and enable custom integrations.
Synchronous hook for interacting with the Kubernetes API, supporting cluster connections, resource management, and pod operations.
class KubernetesHook(BaseHook):
    """
    Synchronous Kubernetes API hook.

    Creates a Kubernetes API connection with multiple configuration options and
    exposes helpers for pods, jobs, custom objects and YAML-defined resources.

    Args:
        conn_id (str): Airflow connection ID. Default: 'kubernetes_default'
        client_configuration (Configuration, optional): Kubernetes client configuration
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        disable_verify_ssl (bool): Disable SSL verification. Default: False
        disable_tcp_keepalive (bool): Disable TCP keepalive. Default: False
    """

    def __init__(
        self,
        conn_id: str = "kubernetes_default",
        client_configuration: Configuration | None = None,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        disable_verify_ssl: bool = False,
        disable_tcp_keepalive: bool = False,
        **kwargs,
    ): ...

    # --- Connection -------------------------------------------------------

    def get_conn(self) -> ApiClient:
        """Return a Kubernetes API client session."""
        ...

    def get_namespace(self) -> str:
        """Return the namespace defined in the connection."""
        ...

    def test_connection(self) -> tuple[bool, str]:
        """Test the connection and return a ``(status, message)`` tuple."""
        ...

    # --- Pods -------------------------------------------------------------

    def get_pod(self, name: str, namespace: str) -> V1Pod:
        """Read a pod object from the Kubernetes API."""
        ...

    def get_pod_logs(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ) -> str:
        """Retrieve a container's log from the specified pod."""
        ...

    def get_pod_log_stream(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ) -> tuple[watch.Watch, Generator[str, None, None]]:
        """
        Retrieve a log stream for a container in a Kubernetes pod.

        Returns:
            A ``(watcher, lines)`` tuple; iterate ``lines`` to consume the log.
        """
        ...

    def get_namespaced_pod_list(
        self,
        label_selector: str | None = "",
        namespace: str | None = None,
        **kwargs,
    ) -> list[V1Pod]:
        """Get the list of pods in a namespace, optionally filtered by label selector."""
        ...

    # --- Jobs -------------------------------------------------------------

    def create_job(self, job: V1Job, **kwargs) -> V1Job:
        """Create (run) a Kubernetes Job."""
        ...

    def get_job(self, job_name: str, namespace: str) -> V1Job:
        """Get the Job with the specified name and namespace."""
        ...

    def get_job_status(self, job_name: str, namespace: str) -> V1JobStatus:
        """Get the status of the specified Job."""
        ...

    def wait_until_job_complete(
        self,
        job_name: str,
        namespace: str,
        timeout: int = 3600,
    ) -> V1Job:
        """Block until the Job is complete or failed, then return it."""
        ...

    def is_job_complete(self, job: V1Job) -> bool:
        """Check whether the Job has finished."""
        ...

    def is_job_failed(self, job: V1Job) -> bool:
        """Check whether the Job failed."""
        ...

    def is_job_successful(self, job: V1Job) -> bool:
        """Check whether the Job completed successfully."""
        ...

    # --- Custom objects ---------------------------------------------------

    def create_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        body: dict,
        namespace: str | None = None,
    ) -> dict:
        """Create a custom object identified by ``group``/``version``/``plural``."""
        ...

    def get_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        name: str,
        namespace: str | None = None,
    ) -> dict:
        """Get the named custom object identified by ``group``/``version``/``plural``."""
        ...

    def delete_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        name: str,
        namespace: str | None = None,
        body: V1DeleteOptions | None = None,
    ) -> dict:
        """Delete the named custom object identified by ``group``/``version``/``plural``."""
        ...

    # --- YAML resources ---------------------------------------------------

    def apply_from_yaml_file(
        self,
        yaml_file: str | None = None,
        yaml_objects: list[dict] | None = None,
        verbose: bool = False,
        namespace: str = "default",
    ) -> list[dict]:
        """Apply resources from a YAML file path or from already-parsed YAML objects."""
        ...


# Asynchronous hook for non-blocking Kubernetes API operations, enabling
# efficient concurrent processing, resource management and monitoring.
# (Previously documented as two separate `AsyncKubernetesHook` classes; a second
# `class` statement silently replaces the first, so they are merged here.)
class AsyncKubernetesHook(KubernetesHook):
    """
    Hook to use the Kubernetes SDK asynchronously.

    Inherits all configuration parameters and synchronous helpers from
    KubernetesHook and adds coroutine-based pod, log and job operations.

    Args:
        config_dict (dict, optional): Configuration dictionary
        All other parameters are inherited from KubernetesHook.
    """

    def __init__(self, config_dict: dict | None = None, *args, **kwargs): ...

    @asynccontextmanager
    async def get_conn(self) -> AsyncGenerator[ApiClient, None]:
        """Async context manager yielding an API client (``async with hook.get_conn() as client``)."""
        ...

    async def get_pod(self, name: str, namespace: str) -> V1Pod:
        """Get a pod object asynchronously."""
        ...

    async def delete_pod(self, name: str, namespace: str) -> V1Status:
        """Delete a pod asynchronously."""
        ...

    async def read_logs(
        self,
        name: str,
        namespace: str,
        container: str | None = None,
        follow: bool = False,
        **kwargs,
    ) -> str:
        """Read logs from inside the pod."""
        ...

    async def get_job_status(self, job_name: str, namespace: str) -> V1JobStatus:
        """Get a Job's status object asynchronously."""
        ...

    async def wait_until_job_complete(
        self,
        job_name: str,
        namespace: str,
        timeout: int = 3600,
        check_interval: int = 10,
    ) -> V1Job:
        """Wait asynchronously until the Job is complete, polling every ``check_interval`` seconds."""
        ...

    async def wait_until_container_complete(
        self,
        pod_name: str,
        namespace: str,
        container_name: str | None = None,
        timeout: int = 3600,
    ) -> None:
        """Wait for the container to complete."""
        ...

    async def wait_until_container_started(
        self,
        pod_name: str,
        namespace: str,
        container_name: str | None = None,
        timeout: int = 120,
    ) -> None:
        """Wait for the container to start."""
        ...


# Constants used by the Kubernetes hooks for configuration and status checking.
# Resource loading constant
LOADING_KUBE_CONFIG_FILE_RESOURCE: str
# Job status condition types (bare annotations: the values are not shown here).
# NOTE(review): presumably the "final" list is the subset of condition types
# that end a Job — confirm against the provider source.
JOB_FINAL_STATUS_CONDITION_TYPES: list[str]
JOB_STATUS_CONDITION_TYPES: list[str]

# --- Usage example: basic connection ---
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
# Connect to a Kubernetes cluster through an explicit kubeconfig file
hook = KubernetesHook(
    conn_id='kubernetes_default',
    in_cluster=False,
    config_file='/path/to/kubeconfig',
)
# Obtain the underlying Kubernetes API client
api_client = hook.get_conn()
# Check that the cluster is reachable; test_connection returns (status, message)
is_connected, message = hook.test_connection()
print(f"Connection status: {is_connected}, Message: {message}")

# Get pod information
pod = hook.get_pod(name='my-pod', namespace='default')
print(f"Pod status: {pod.status.phase}")
# Get pod logs
logs = hook.get_pod_logs(
    pod_name='my-pod',
    namespace='default',
    container='main-container',
)
print(logs)
# Stream pod logs. get_pod_log_stream returns a (watch, generator) tuple and
# accepts no `follow` argument, so unpack the tuple and iterate the generator.
watcher, log_stream = hook.get_pod_log_stream(
    pod_name='my-pod',
    namespace='default',
)
for log_line in log_stream:
    print(log_line)

from kubernetes.client import V1Job, V1JobSpec, V1PodTemplateSpec, V1PodSpec, V1Container
from kubernetes.client import V1ObjectMeta

# Create job specification. Metadata uses the typed V1ObjectMeta model (like the
# rest of the spec) instead of a plain dict, so attribute access such as
# `job_spec.metadata.namespace` works.
job_spec = V1Job(
    api_version='batch/v1',
    kind='Job',
    metadata=V1ObjectMeta(name='my-job', namespace='default'),
    spec=V1JobSpec(
        template=V1PodTemplateSpec(
            spec=V1PodSpec(
                containers=[
                    V1Container(
                        name='worker',
                        image='busybox:latest',
                        command=['sh', '-c', 'echo "Job completed"'],
                    )
                ],
                restart_policy='Never',
            )
        )
    ),
)
# Create and monitor job
created_job = hook.create_job(job_spec)
print(f"Job created: {created_job.metadata.name}")
# Wait for completion, reusing the name/namespace of the created Job rather
# than repeating the literals
completed_job = hook.wait_until_job_complete(
    job_name=created_job.metadata.name,
    namespace=created_job.metadata.namespace,
    timeout=600,
)
# Check job status
if hook.is_job_successful(completed_job):
    print("Job completed successfully")
elif hook.is_job_failed(completed_job):
    print("Job failed")

# Create custom resource
custom_resource = {
    'apiVersion': 'apps.example.com/v1',
    'kind': 'MyApp',
    'metadata': {
        'name': 'my-app-instance',
        'namespace': 'default',
    },
    'spec': {
        'replicas': 3,
        'image': 'my-app:latest',
    },
}
# Coordinates shared by every custom-object call below
cr_coordinates = {
    'group': 'apps.example.com',
    'version': 'v1',
    'plural': 'myapps',
    'namespace': 'default',
}
instance_name = custom_resource['metadata']['name']

# Create the custom resource
created_cr = hook.create_custom_object(body=custom_resource, **cr_coordinates)
# Get custom resource
cr = hook.get_custom_object(name=instance_name, **cr_coordinates)
# Delete custom resource
hook.delete_custom_object(name=instance_name, **cr_coordinates)

# Apply resources from YAML file
applied_resources = hook.apply_from_yaml_file(
    yaml_file='/path/to/resources.yaml',
    namespace='default',
    verbose=True,
)
# Apply resources from already-parsed YAML objects (no file needed)
yaml_objects = [
    {
        'apiVersion': 'v1',
        'kind': 'ConfigMap',
        'metadata': {'name': 'my-config', 'namespace': 'default'},
        'data': {'key': 'value'},
    }
]
hook.apply_from_yaml_file(
    yaml_objects=yaml_objects,
    namespace='default',
)

# --- Usage example: async hook ---
import asyncio
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook


async def async_operations():
    """Demonstrate the coroutine-based AsyncKubernetesHook helpers."""
    # Create async hook
    async_hook = AsyncKubernetesHook(
        conn_id='kubernetes_default',
        in_cluster=True,
    )
    # The helpers below take no client argument, so there is no need to open
    # get_conn() here. get_conn() is an async context manager for code that
    # needs the raw client: `async with async_hook.get_conn() as client: ...`

    # Wait for the container to start before reading its logs
    await async_hook.wait_until_container_started(
        pod_name='my-pod',
        namespace='default',
        container_name='main',
        timeout=120,
    )
    # Get pod asynchronously
    pod = await async_hook.get_pod(
        name='my-pod',
        namespace='default',
    )
    # Read logs asynchronously
    logs = await async_hook.read_logs(
        name='my-pod',
        namespace='default',
        container='main',
        follow=False,
    )
    # Monitor job completion
    completed_job = await async_hook.wait_until_job_complete(
        job_name='my-job',
        namespace='default',
        timeout=600,
        check_interval=5,
    )
    return pod, logs, completed_job


# Run async operations
asyncio.run(async_operations())

# Connect to different clusters
# Both cluster hooks read contexts from the same kubeconfig file
kubeconfig_path = '/home/user/.kube/config'

prod_hook = KubernetesHook(
    conn_id='k8s_prod',
    cluster_context='production-cluster',
    config_file=kubeconfig_path,
)
staging_hook = KubernetesHook(
    conn_id='k8s_staging',
    cluster_context='staging-cluster',
    config_file=kubeconfig_path,
)

# In-cluster configuration: no kubeconfig path or context needed
in_cluster_hook = KubernetesHook(
    conn_id='k8s_in_cluster',
    in_cluster=True,
)

# Test connection with detailed output
is_connected, message = hook.test_connection()
if is_connected:
    print("Successfully connected to Kubernetes cluster")
else:
    print(f"Connection failed: {message}")
# Get namespace from connection
namespace = hook.get_namespace()
print(f"Default namespace: {namespace}")
# Disable SSL verification for local testing ONLY. This turns off TLS
# certificate verification, exposing the connection to man-in-the-middle
# attacks; never use it against production clusters.
test_hook = KubernetesHook(
    conn_id='k8s_test',
    disable_verify_ssl=True,
    disable_tcp_keepalive=True,
)

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes