Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
The Databricks provider offers comprehensive Git repository management capabilities through Databricks Repos, enabling version-controlled deployment of notebooks, libraries, and code across different environments. This includes creating, updating, and deleting repositories with full Git integration.
Create new Git repositories in Databricks Repos for version-controlled code deployment.
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator
class DatabricksReposCreateOperator(BaseOperator):
def __init__(
self,
*,
git_url: str,
git_provider: str | None = None,
repo_path: str | None = None,
databricks_conn_id: str = "databricks_default",
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
**kwargs
) -> None:
"""
Create a new Git repository in Databricks Repos.
Args:
git_url: URL of the Git repository to clone
git_provider: Git provider - "gitHub", "bitbucketCloud", "gitLab",
"azureDevOpsServices", "gitHubEnterprise", "bitbucketServer",
"gitLabEnterpriseEdition". If None, provider is auto-detected
repo_path: Path where repository will be created in Databricks workspace
If None, uses /Repos/{username}/{repo_name}
databricks_conn_id: Airflow connection ID for Databricks
databricks_retry_limit: Number of retries for API calls
databricks_retry_delay: Seconds between retries
"""Update existing repositories to different branches, tags, or commits.
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator
class DatabricksReposUpdateOperator(BaseOperator):
def __init__(
self,
*,
repo_id: int | None = None,
repo_path: str | None = None,
branch: str | None = None,
tag: str | None = None,
databricks_conn_id: str = "databricks_default",
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
**kwargs
) -> None:
"""
Update an existing Git repository in Databricks Repos.
Args:
repo_id: Repository ID in Databricks (alternative to repo_path)
repo_path: Path to repository in Databricks workspace
branch: Git branch to checkout (mutually exclusive with tag)
tag: Git tag to checkout (mutually exclusive with branch)
databricks_conn_id: Airflow connection ID for Databricks
databricks_retry_limit: Number of retries for API calls
databricks_retry_delay: Seconds between retries
"""Delete repositories from Databricks Repos when they're no longer needed.
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposDeleteOperator
class DatabricksReposDeleteOperator(BaseOperator):
def __init__(
self,
*,
repo_id: int | None = None,
repo_path: str | None = None,
databricks_conn_id: str = "databricks_default",
databricks_retry_limit: int = 3,
databricks_retry_delay: int = 1,
**kwargs
) -> None:
"""
Delete a Git repository from Databricks Repos.
Args:
repo_id: Repository ID in Databricks (alternative to repo_path)
repo_path: Path to repository in Databricks workspace
databricks_conn_id: Airflow connection ID for Databricks
databricks_retry_limit: Number of retries for API calls
databricks_retry_delay: Seconds between retries
"""Create a repository from a public GitHub repository:
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator
create_analytics_repo = DatabricksReposCreateOperator(
task_id='create_analytics_repo',
git_url='https://github.com/company/analytics-notebooks.git',
git_provider='gitHub',
repo_path='/Repos/production/analytics-notebooks',
databricks_conn_id='databricks_production'
)Create a repository from a private Git provider:
# For private repos, authentication is handled through the Databricks connection
# Configure personal access token or SSH key in the Databricks workspace settings
create_private_repo = DatabricksReposCreateOperator(
task_id='create_private_ml_repo',
git_url='https://github.com/company/ml-models-private.git',
git_provider='gitHub',
repo_path='/Repos/{{ params.environment }}/ml-models',
databricks_conn_id='databricks_{{ params.environment }}'
)Create repositories across different environments:
from airflow.utils.task_group import TaskGroup
def create_repo_for_environment(env_name: str):
return DatabricksReposCreateOperator(
task_id=f'create_repo_{env_name}',
git_url='https://github.com/company/data-pipelines.git',
git_provider='gitHub',
repo_path=f'/Repos/{env_name}/data-pipelines',
databricks_conn_id=f'databricks_{env_name}'
)
with TaskGroup(group_id='setup_repositories') as repo_setup:
dev_repo = create_repo_for_environment('development')
staging_repo = create_repo_for_environment('staging')
prod_repo = create_repo_for_environment('production')
dev_repo >> staging_repo >> prod_repoUpdate repositories to specific branches or tags for deployments:
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator
# Update to development branch
update_to_dev = DatabricksReposUpdateOperator(
task_id='update_to_dev_branch',
repo_path='/Repos/development/analytics',
branch='develop',
databricks_conn_id='databricks_dev'
)
# Update to release tag for production
update_to_release = DatabricksReposUpdateOperator(
task_id='update_to_release_tag',
repo_path='/Repos/production/analytics',
tag='v{{ params.release_version }}',
databricks_conn_id='databricks_production'
)
# Update to specific commit for hotfix
update_to_commit = DatabricksReposUpdateOperator(
task_id='hotfix_deployment',
repo_path='/Repos/production/analytics',
branch='hotfix-{{ params.ticket_number }}',
databricks_conn_id='databricks_production'
)Integrate repository operations with CI/CD workflows:
from airflow.providers.databricks.operators.databricks_repos import (
DatabricksReposCreateOperator,
DatabricksReposUpdateOperator,
DatabricksReposDeleteOperator
)
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.operators.python import PythonOperator
def validate_deployment(**context):
"""Validate that deployment was successful."""
# Custom validation logic
repo_path = context['params']['repo_path']
print(f"Validating deployment at {repo_path}")
return True
def cleanup_old_deployments(**context):
"""Clean up old deployment artifacts."""
# Cleanup logic for old branches or temporary repositories
print("Cleaning up old deployment artifacts")
# CI/CD Pipeline DAG
with DAG('cicd_pipeline', schedule_interval=None, catchup=False) as dag:
# Create temporary deployment repository
create_temp_repo = DatabricksReposCreateOperator(
task_id='create_temp_deployment_repo',
git_url='{{ params.git_url }}',
git_provider='gitHub',
repo_path='/Repos/temp/deployment-{{ run_id }}',
databricks_conn_id='databricks_staging'
)
# Update to specific commit for testing
checkout_commit = DatabricksReposUpdateOperator(
task_id='checkout_test_commit',
repo_path='/Repos/temp/deployment-{{ run_id }}',
branch='{{ params.test_branch }}',
databricks_conn_id='databricks_staging'
)
# Run tests on the checked out code
run_tests = DatabricksNotebookOperator(
task_id='run_integration_tests',
notebook_path='/Repos/temp/deployment-{{ run_id }}/tests/integration_tests',
existing_cluster_id='test-cluster-001',
base_parameters={
'test_env': 'staging',
'git_commit': '{{ params.git_commit }}'
},
databricks_conn_id='databricks_staging'
)
# Validate deployment
validate = PythonOperator(
task_id='validate_deployment',
python_callable=validate_deployment,
params={'repo_path': '/Repos/temp/deployment-{{ run_id }}'}
)
# Deploy to production if tests pass
deploy_to_prod = DatabricksReposUpdateOperator(
task_id='deploy_to_production',
repo_path='/Repos/production/analytics',
branch='{{ params.production_branch }}',
databricks_conn_id='databricks_production'
)
# Clean up temporary repository
cleanup_temp = DatabricksReposDeleteOperator(
task_id='cleanup_temp_repo',
repo_path='/Repos/temp/deployment-{{ run_id }}',
databricks_conn_id='databricks_staging',
trigger_rule='none_failed_min_one_success' # Run cleanup regardless of test outcome
)
# Cleanup old deployments
cleanup_old = PythonOperator(
task_id='cleanup_old_deployments',
python_callable=cleanup_old_deployments
)
create_temp_repo >> checkout_commit >> run_tests >> validate >> deploy_to_prod
[run_tests, validate, deploy_to_prod] >> cleanup_temp >> cleanup_oldHandle repositories from different Git providers:
# GitHub Enterprise repository
github_enterprise_repo = DatabricksReposCreateOperator(
task_id='create_github_enterprise_repo',
git_url='https://git.company.com/data-team/analytics.git',
git_provider='gitHubEnterprise',
repo_path='/Repos/enterprise/analytics',
databricks_conn_id='databricks_enterprise'
)
# Azure DevOps repository
azure_devops_repo = DatabricksReposCreateOperator(
task_id='create_azure_devops_repo',
git_url='https://dev.azure.com/company/DataProject/_git/ml-models',
git_provider='azureDevOpsServices',
repo_path='/Repos/azure/ml-models',
databricks_conn_id='databricks_azure'
)
# GitLab repository
gitlab_repo = DatabricksReposCreateOperator(
task_id='create_gitlab_repo',
git_url='https://gitlab.company.com/analytics/dashboards.git',
git_provider='gitLab',
repo_path='/Repos/gitlab/dashboards',
databricks_conn_id='databricks_gitlab'
)
# Bitbucket Cloud repository
bitbucket_repo = DatabricksReposCreateOperator(
task_id='create_bitbucket_repo',
git_url='https://bitbucket.org/company/data-science.git',
git_provider='bitbucketCloud',
repo_path='/Repos/bitbucket/data-science',
databricks_conn_id='databricks_bitbucket'
)Dynamically manage repositories based on external triggers:
from airflow.operators.python import BranchPythonOperator
def determine_repo_action(**context):
"""Determine what action to take based on trigger parameters."""
action = context['params'].get('action', 'create')
if action == 'create':
return 'create_repository'
elif action == 'update':
return 'update_repository'
elif action == 'delete':
return 'delete_repository'
else:
raise ValueError(f"Unknown action: {action}")
def get_repo_info(**context):
"""Extract repository information from webhook or external trigger."""
return {
'git_url': context['params'].get('git_url'),
'branch': context['params'].get('branch', 'main'),
'repo_path': context['params'].get('repo_path')
}
# Dynamic repository management workflow
branch_action = BranchPythonOperator(
task_id='determine_action',
python_callable=determine_repo_action
)
create_repo = DatabricksReposCreateOperator(
task_id='create_repository',
git_url='{{ params.git_url }}',
git_provider='{{ params.git_provider }}',
repo_path='{{ params.repo_path }}',
databricks_conn_id='{{ params.databricks_conn_id }}'
)
update_repo = DatabricksReposUpdateOperator(
task_id='update_repository',
repo_path='{{ params.repo_path }}',
branch='{{ params.branch }}',
databricks_conn_id='{{ params.databricks_conn_id }}'
)
delete_repo = DatabricksReposDeleteOperator(
task_id='delete_repository',
repo_path='{{ params.repo_path }}',
databricks_conn_id='{{ params.databricks_conn_id }}'
)
branch_action >> [create_repo, update_repo, delete_repo]Keep repositories synchronized across multiple environments:
def sync_repositories_across_environments():
"""Synchronize repository state across dev, staging, and production."""
environments = ['development', 'staging', 'production']
branches = {
'development': 'develop',
'staging': 'release-candidate',
'production': 'main'
}
sync_tasks = []
for env in environments:
sync_task = DatabricksReposUpdateOperator(
task_id=f'sync_{env}_repo',
repo_path=f'/Repos/{env}/data-pipelines',
branch=branches[env],
databricks_conn_id=f'databricks_{env}'
)
sync_tasks.append(sync_task)
# Create dependencies: dev -> staging -> production
for i in range(len(sync_tasks) - 1):
sync_tasks[i] >> sync_tasks[i + 1]
return sync_tasks
# Use in DAG
sync_tasks = sync_repositories_across_environments()Validate repository operations with custom checks:
from airflow.providers.databricks.hooks.databricks import DatabricksHook
def validate_repository_creation(**context):
"""Validate that repository was created successfully."""
repo_path = context['params']['repo_path']
hook = DatabricksHook(databricks_conn_id='databricks_default')
try:
# Check if repository exists and is accessible
repo_info = hook._do_api_call(
('GET', f'api/2.0/repos/{repo_path}'),
{}
)
if repo_info.get('path') == repo_path:
print(f"Repository {repo_path} created successfully")
return True
else:
raise ValueError(f"Repository validation failed for {repo_path}")
except Exception as e:
print(f"Repository validation error: {str(e)}")
raise
# Repository creation with validation
create_validated_repo = DatabricksReposCreateOperator(
task_id='create_repo',
git_url='https://github.com/company/analytics.git',
repo_path='/Repos/production/analytics'
) >> PythonOperator(
task_id='validate_repo_creation',
python_callable=validate_repository_creation,
params={'repo_path': '/Repos/production/analytics'}
)Configure robust retry mechanisms for repository operations:
robust_repo_update = DatabricksReposUpdateOperator(
task_id='robust_repo_update',
repo_path='/Repos/production/critical-pipeline',
branch='hotfix-urgent',
databricks_conn_id='databricks_production',
databricks_retry_limit=5,
databricks_retry_delay=10,
retries=2,
retry_delay=timedelta(minutes=2)
)Implement automated cleanup for temporary repositories:
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def cleanup_old_temp_repos(**context):
"""Clean up temporary repositories older than specified days."""
hook = DatabricksHook(databricks_conn_id='databricks_staging')
# List all repositories
repos = hook._do_api_call(('GET', 'api/2.0/repos'), {})
cutoff_date = datetime.now() - timedelta(days=7)
for repo in repos.get('repos', []):
repo_path = repo.get('path', '')
# Check if it's a temporary repository
if '/temp/' in repo_path:
# Check creation date (you might need to track this separately)
# For demo, we'll use a naming convention with timestamps
if 'temp-deploy-' in repo_path:
try:
# Extract timestamp from path if following naming convention
timestamp_str = repo_path.split('temp-deploy-')[1].split('/')[0]
repo_date = datetime.strptime(timestamp_str, '%Y%m%d-%H%M%S')
if repo_date < cutoff_date:
print(f"Cleaning up old repository: {repo_path}")
hook._do_api_call(
('DELETE', f"api/2.0/repos/{repo['id']}"),
{}
)
except (ValueError, IndexError) as e:
print(f"Could not parse date from {repo_path}: {e}")
cleanup_task = PythonOperator(
task_id='cleanup_old_repositories',
python_callable=cleanup_old_temp_repos,
schedule_interval='@daily'
)The repository management operators provide comprehensive Git integration for Databricks Repos, enabling version-controlled deployment and management of notebooks, libraries, and code across different environments with robust error handling and automation capabilities.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-databricks