Apache Airflow provider package for HashiCorp Vault integration, enabling secret management and authentication within Airflow workflows.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Direct interaction with HashiCorp Vault's Key-Value secret engine for comprehensive secret management operations. The VaultHook provides methods for reading, writing, and managing secrets with full support for both KV version 1 and 2 engines.
Main hook class for HashiCorp Vault operations with configurable authentication and engine version support.
class VaultHook(BaseHook):
def __init__(
self,
vault_conn_id: str = "vault_default",
auth_type: str | None = None,
auth_mount_point: str | None = None,
kv_engine_version: int | None = None,
role_id: str | None = None,
region: str | None = None,
kubernetes_role: str | None = None,
kubernetes_jwt_path: str | None = None,
token_path: str | None = None,
gcp_key_path: str | None = None,
gcp_scopes: str | None = None,
azure_tenant_id: str | None = None,
azure_resource: str | None = None,
radius_host: str | None = None,
radius_port: int | None = None,
**kwargs
):
"""
Initialize VaultHook with connection and authentication parameters.
Parameters:
- vault_conn_id: Connection ID for Vault configuration
- auth_type: Authentication method ('token', 'approle', 'kubernetes', etc.)
- auth_mount_point: Mount point for authentication method
- kv_engine_version: KV engine version (1 or 2, default: 2)
- role_id: Role ID for AppRole or AWS IAM authentication
- region: AWS region for STS API calls
- kubernetes_role: Role for Kubernetes authentication
- kubernetes_jwt_path: Path to Kubernetes JWT token file
- token_path: Path to authentication token file
- gcp_key_path: Path to GCP service account key file
- gcp_scopes: OAuth2 scopes for GCP authentication
- azure_tenant_id: Azure AD tenant ID
- azure_resource: Azure application URL
- radius_host: RADIUS server host
- radius_secret: RADIUS shared secret
- radius_port: RADIUS server port
"""Establish and retrieve connections to HashiCorp Vault with automatic authentication handling.
def get_conn(self) -> hvac.Client:
"""
Retrieve authenticated connection to Vault.
Returns:
hvac.Client: Authenticated Vault client instance
"""Comprehensive secret management operations supporting both read and write operations across KV engine versions.
def get_secret(self, secret_path: str, secret_version: int | None = None) -> dict | None:
"""
Get secret value from the Vault KV engine.
Parameters:
- secret_path: Path to the secret in Vault
- secret_version: Specific version to retrieve (KV v2 only)
Returns:
dict | None: Secret data as dictionary or None if not found
"""
def create_or_update_secret(
self,
secret_path: str,
secret: dict,
method: str | None = None,
cas: int | None = None
) -> Response:
"""
Create or update a secret in Vault.
Parameters:
- secret_path: Path where the secret will be stored
- secret: Secret data as dictionary
- method: HTTP method ('POST' for create, 'PUT' for update, KV v1 only)
- cas: Check-And-Set value for version control (KV v2 only)
Returns:
Response: HTTP response object from the operation
"""Access secret metadata and version information for KV version 2 engines.
def get_secret_metadata(self, secret_path: str) -> dict | None:
"""
Read secret metadata including all versions (KV v2 only).
Parameters:
- secret_path: Path to the secret
Returns:
dict | None: Metadata including version information or None if not found
"""
def get_secret_including_metadata(
self,
secret_path: str,
secret_version: int | None = None
) -> dict | None:
"""
Read secret with metadata in a single response (KV v2 only).
Parameters:
- secret_path: Path to the secret
- secret_version: Specific version to retrieve
Returns:
dict | None: Dictionary with 'data' and 'metadata' keys or None if not found
"""Methods for integrating with Airflow's web UI connection management.
@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
"""
Return connection form widgets for Airflow UI.
Returns:
dict[str, Any]: Dictionary of form widget configurations
"""
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
"""
Return UI field behavior configuration.
Returns:
dict[str, Any]: UI field behavior settings
"""
def test_connection(self) -> tuple[bool, str]:
"""
Test the Vault connection.
Returns:
tuple[bool, str]: Success status and descriptive message
"""# External types used in API signatures
import hvac
from requests import Response
from airflow.providers.hashicorp.version_compat import BaseHook
from typing import Anyfrom airflow.providers.hashicorp.hooks.vault import VaultHook
# Initialize hook with default connection
hook = VaultHook(vault_conn_id='vault_default')
# Get a secret
secret = hook.get_secret('secret/myapp/database')
if secret:
db_password = secret.get('password')
db_user = secret.get('username')# Get specific version of a secret
secret_v2 = hook.get_secret('secret/myapp/api-key', secret_version=2)
# Get secret with metadata
full_data = hook.get_secret_including_metadata('secret/myapp/api-key')
secret_data = full_data['data']
secret_metadata = full_data['metadata']
version = secret_metadata['version']
# Get just metadata
metadata = hook.get_secret_metadata('secret/myapp/api-key')
all_versions = metadata['versions']# Create a new secret
new_secret = {
'username': 'admin',
'password': 'secure123',
'api_key': 'abc-123-def'
}
response = hook.create_or_update_secret('secret/myapp/credentials', new_secret)
# Update with check-and-set (KV v2)
updated_secret = {'password': 'newsecure456'}
response = hook.create_or_update_secret(
'secret/myapp/credentials',
updated_secret,
cas=3 # Only update if current version is 3
)# Using AppRole authentication
hook = VaultHook(
vault_conn_id='vault_approle',
auth_type='approle',
role_id='my-role-id'
)
# Using Kubernetes authentication
hook = VaultHook(
vault_conn_id='vault_k8s',
auth_type='kubernetes',
kubernetes_role='airflow-role'
)Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-hashicorp