tessl/pypi-apache-airflow-providers-databricks

Comprehensive Databricks integration for Apache Airflow with operators, hooks, sensors, and triggers for orchestrating data workflows


Repository Management

The Databricks provider manages Git repositories through Databricks Repos, enabling version-controlled deployment of notebooks, libraries, and code across environments. The operators below cover creating, updating, and deleting repositories with full Git integration.

Core Operators

DatabricksReposCreateOperator

Create new Git repositories in Databricks Repos for version-controlled code deployment.

from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

class DatabricksReposCreateOperator(BaseOperator):
    def __init__(
        self,
        *,
        git_url: str,
        git_provider: str | None = None,
        repo_path: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Create a new Git repository in Databricks Repos.
        
        Args:
            git_url: URL of the Git repository to clone
            git_provider: Git provider - "gitHub", "bitbucketCloud", "gitLab", 
                         "azureDevOpsServices", "gitHubEnterprise", "bitbucketServer", 
                         "gitLabEnterpriseEdition". If None, provider is auto-detected
            repo_path: Path where repository will be created in Databricks workspace
                      If None, uses /Repos/{username}/{repo_name}
            databricks_conn_id: Airflow connection ID for Databricks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
        """

DatabricksReposUpdateOperator

Update existing repositories to different branches, tags, or commits.

from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

class DatabricksReposUpdateOperator(BaseOperator):
    def __init__(
        self,
        *,
        repo_id: int | None = None,
        repo_path: str | None = None,
        branch: str | None = None,
        tag: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Update an existing Git repository in Databricks Repos.
        
        Args:
            repo_id: Repository ID in Databricks (alternative to repo_path)
            repo_path: Path to repository in Databricks workspace
            branch: Git branch to checkout (mutually exclusive with tag)
            tag: Git tag to checkout (mutually exclusive with branch)
            databricks_conn_id: Airflow connection ID for Databricks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
        """

DatabricksReposDeleteOperator

Delete repositories from Databricks Repos when they're no longer needed.

from airflow.providers.databricks.operators.databricks_repos import DatabricksReposDeleteOperator

class DatabricksReposDeleteOperator(BaseOperator):
    def __init__(
        self,
        *,
        repo_id: int | None = None,
        repo_path: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Delete a Git repository from Databricks Repos.
        
        Args:
            repo_id: Repository ID in Databricks (alternative to repo_path)
            repo_path: Path to repository in Databricks workspace
            databricks_conn_id: Airflow connection ID for Databricks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
        """

Usage Examples

Basic Repository Creation

Create a repository from a public GitHub repository:

from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

create_analytics_repo = DatabricksReposCreateOperator(
    task_id='create_analytics_repo',
    git_url='https://github.com/company/analytics-notebooks.git',
    git_provider='gitHub',
    repo_path='/Repos/production/analytics-notebooks',
    databricks_conn_id='databricks_production'
)

Private Repository with Authentication

Create a repository from a private Git provider:

# For private repos, authentication is handled through the Databricks connection
# Configure personal access token or SSH key in the Databricks workspace settings
create_private_repo = DatabricksReposCreateOperator(
    task_id='create_private_ml_repo',
    git_url='https://github.com/company/ml-models-private.git',
    git_provider='gitHub',
    repo_path='/Repos/{{ params.environment }}/ml-models',
    databricks_conn_id='databricks_{{ params.environment }}'
)

Multi-Environment Repository Setup

Create repositories across different environments:

from airflow.utils.task_group import TaskGroup

def create_repo_for_environment(env_name: str):
    return DatabricksReposCreateOperator(
        task_id=f'create_repo_{env_name}',
        git_url='https://github.com/company/data-pipelines.git',
        git_provider='gitHub', 
        repo_path=f'/Repos/{env_name}/data-pipelines',
        databricks_conn_id=f'databricks_{env_name}'
    )

with TaskGroup(group_id='setup_repositories') as repo_setup:
    dev_repo = create_repo_for_environment('development')
    staging_repo = create_repo_for_environment('staging') 
    prod_repo = create_repo_for_environment('production')
    
    dev_repo >> staging_repo >> prod_repo
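The per-environment paths above all follow one convention; factoring it into a small helper keeps the task definitions consistent (`repo_path_for` is a hypothetical helper, not part of the provider):

```python
ENVIRONMENTS = ("development", "staging", "production")

def repo_path_for(env: str, repo_name: str) -> str:
    """Build the /Repos/<env>/<repo> path used by create_repo_for_environment."""
    if env not in ENVIRONMENTS:
        raise ValueError(f"Unknown environment: {env}")
    return f"/Repos/{env}/{repo_name}"

print(repo_path_for("staging", "data-pipelines"))  # /Repos/staging/data-pipelines
```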

Branch and Tag Management

Update repositories to specific branches or tags for deployments:

from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

# Update to development branch
update_to_dev = DatabricksReposUpdateOperator(
    task_id='update_to_dev_branch',
    repo_path='/Repos/development/analytics',
    branch='develop',
    databricks_conn_id='databricks_dev'
)

# Update to release tag for production
update_to_release = DatabricksReposUpdateOperator(
    task_id='update_to_release_tag',
    repo_path='/Repos/production/analytics',
    tag='v{{ params.release_version }}',
    databricks_conn_id='databricks_production'
)

# Update to a hotfix branch (the operator checks out branches or tags, not raw commit SHAs)
update_to_hotfix = DatabricksReposUpdateOperator(
    task_id='hotfix_deployment',
    repo_path='/Repos/production/analytics',
    branch='hotfix-{{ params.ticket_number }}',
    databricks_conn_id='databricks_production'
)

CI/CD Pipeline Integration

Integrate repository operations with CI/CD workflows:

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposUpdateOperator,
    DatabricksReposDeleteOperator
)
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.operators.python import PythonOperator

def validate_deployment(**context):
    """Validate that deployment was successful."""
    # Custom validation logic
    repo_path = context['params']['repo_path']
    print(f"Validating deployment at {repo_path}")
    return True

def cleanup_old_deployments(**context):
    """Clean up old deployment artifacts."""
    # Cleanup logic for old branches or temporary repositories
    print("Cleaning up old deployment artifacts")

# CI/CD Pipeline DAG
with DAG(
    'cicd_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    
    # Create temporary deployment repository
    create_temp_repo = DatabricksReposCreateOperator(
        task_id='create_temp_deployment_repo',
        git_url='{{ params.git_url }}',
        git_provider='gitHub',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        databricks_conn_id='databricks_staging'
    )
    
    # Update to specific commit for testing
    checkout_commit = DatabricksReposUpdateOperator(
        task_id='checkout_test_commit',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        branch='{{ params.test_branch }}',
        databricks_conn_id='databricks_staging'
    )
    
    # Run tests on the checked-out code
    run_tests = DatabricksNotebookOperator(
        task_id='run_integration_tests',
        notebook_path='/Repos/temp/deployment-{{ run_id }}/tests/integration_tests',
        source='WORKSPACE',
        existing_cluster_id='test-cluster-001',
        notebook_params={
            'test_env': 'staging',
            'git_commit': '{{ params.git_commit }}'
        },
        databricks_conn_id='databricks_staging'
    )
    
    # Validate deployment
    validate = PythonOperator(
        task_id='validate_deployment',
        python_callable=validate_deployment,
        params={'repo_path': '/Repos/temp/deployment-{{ run_id }}'}
    )
    
    # Deploy to production if tests pass
    deploy_to_prod = DatabricksReposUpdateOperator(
        task_id='deploy_to_production',
        repo_path='/Repos/production/analytics',
        branch='{{ params.production_branch }}',
        databricks_conn_id='databricks_production'
    )
    
    # Clean up temporary repository
    cleanup_temp = DatabricksReposDeleteOperator(
        task_id='cleanup_temp_repo',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        databricks_conn_id='databricks_staging',
        trigger_rule='all_done'  # Run cleanup regardless of test outcome
    )
    
    # Cleanup old deployments
    cleanup_old = PythonOperator(
        task_id='cleanup_old_deployments',
        python_callable=cleanup_old_deployments
    )
    
    create_temp_repo >> checkout_commit >> run_tests >> validate >> deploy_to_prod
    [run_tests, validate, deploy_to_prod] >> cleanup_temp >> cleanup_old

Advanced Repository Management

Multi-Provider Repository Setup

Handle repositories from different Git providers:

# GitHub Enterprise repository
github_enterprise_repo = DatabricksReposCreateOperator(
    task_id='create_github_enterprise_repo',
    git_url='https://git.company.com/data-team/analytics.git',
    git_provider='gitHubEnterprise',
    repo_path='/Repos/enterprise/analytics',
    databricks_conn_id='databricks_enterprise'
)

# Azure DevOps repository  
azure_devops_repo = DatabricksReposCreateOperator(
    task_id='create_azure_devops_repo',
    git_url='https://dev.azure.com/company/DataProject/_git/ml-models',
    git_provider='azureDevOpsServices',
    repo_path='/Repos/azure/ml-models',
    databricks_conn_id='databricks_azure'
)

# GitLab repository
gitlab_repo = DatabricksReposCreateOperator(
    task_id='create_gitlab_repo',
    git_url='https://gitlab.company.com/analytics/dashboards.git',
    git_provider='gitLab',
    repo_path='/Repos/gitlab/dashboards',
    databricks_conn_id='databricks_gitlab'
)

# Bitbucket Cloud repository
bitbucket_repo = DatabricksReposCreateOperator(
    task_id='create_bitbucket_repo',
    git_url='https://bitbucket.org/company/data-science.git',
    git_provider='bitbucketCloud',
    repo_path='/Repos/bitbucket/data-science',
    databricks_conn_id='databricks_bitbucket'
)

Dynamic Repository Management

Dynamically manage repositories based on external triggers:

from airflow.operators.python import BranchPythonOperator

def determine_repo_action(**context):
    """Determine what action to take based on trigger parameters."""
    action = context['params'].get('action', 'create')
    if action == 'create':
        return 'create_repository'
    elif action == 'update':
        return 'update_repository'
    elif action == 'delete':
        return 'delete_repository'
    else:
        raise ValueError(f"Unknown action: {action}")

def get_repo_info(**context):
    """Extract repository information from a webhook or external trigger.

    Not wired into the branch workflow below; shown as a template for
    parsing trigger payloads.
    """
    return {
        'git_url': context['params'].get('git_url'),
        'branch': context['params'].get('branch', 'main'),
        'repo_path': context['params'].get('repo_path')
    }

# Dynamic repository management workflow
branch_action = BranchPythonOperator(
    task_id='determine_action',
    python_callable=determine_repo_action
)

create_repo = DatabricksReposCreateOperator(
    task_id='create_repository',
    git_url='{{ params.git_url }}',
    git_provider='{{ params.git_provider }}',
    repo_path='{{ params.repo_path }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

update_repo = DatabricksReposUpdateOperator(
    task_id='update_repository',
    repo_path='{{ params.repo_path }}',
    branch='{{ params.branch }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

delete_repo = DatabricksReposDeleteOperator(
    task_id='delete_repository',
    repo_path='{{ params.repo_path }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

branch_action >> [create_repo, update_repo, delete_repo]
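Because the branching callable is plain Python, it can be smoke-tested without a scheduler. A table-driven variant of `determine_repo_action` (equivalent logic, hypothetical refactor):

```python
# Maps the 'action' DAG param to the downstream task_id to follow.
ACTION_TO_TASK = {
    "create": "create_repository",
    "update": "update_repository",
    "delete": "delete_repository",
}

def determine_repo_action(**context):
    """Return the task_id for the requested action, defaulting to create."""
    action = context["params"].get("action", "create")
    try:
        return ACTION_TO_TASK[action]
    except KeyError:
        raise ValueError(f"Unknown action: {action}")

print(determine_repo_action(params={"action": "delete"}))  # delete_repository
```

Keeping the mapping in a dict means adding a new action is a one-line change, and the function stays trivially unit-testable.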

Repository Synchronization

Keep repositories synchronized across multiple environments:

def sync_repositories_across_environments():
    """Synchronize repository state across dev, staging, and production."""
    
    environments = ['development', 'staging', 'production']
    branches = {
        'development': 'develop',
        'staging': 'release-candidate', 
        'production': 'main'
    }
    
    sync_tasks = []
    
    for env in environments:
        sync_task = DatabricksReposUpdateOperator(
            task_id=f'sync_{env}_repo',
            repo_path=f'/Repos/{env}/data-pipelines',
            branch=branches[env],
            databricks_conn_id=f'databricks_{env}'
        )
        sync_tasks.append(sync_task)
    
    # Create dependencies: dev -> staging -> production
    for i in range(len(sync_tasks) - 1):
        sync_tasks[i] >> sync_tasks[i + 1]
    
    return sync_tasks

# Use in DAG
sync_tasks = sync_repositories_across_environments()

Error Handling and Best Practices

Repository Validation

Validate repository operations with custom checks:

from airflow.providers.databricks.hooks.databricks import DatabricksHook

def validate_repository_creation(**context):
    """Validate that repository was created successfully."""
    repo_path = context['params']['repo_path']
    
    hook = DatabricksHook(databricks_conn_id='databricks_default')
    
    try:
        # The Repos API looks repositories up by numeric ID; to resolve a path,
        # list repos filtered by path prefix. (_do_api_call is a private hook
        # helper and may change between provider versions.)
        response = hook._do_api_call(
            ('GET', 'api/2.0/repos'),
            {'path_prefix': repo_path}
        )
        matches = [r for r in response.get('repos', []) if r.get('path') == repo_path]

        if matches:
            print(f"Repository {repo_path} created successfully")
            return True
        else:
            raise ValueError(f"Repository validation failed for {repo_path}")
            
    except Exception as e:
        print(f"Repository validation error: {str(e)}")
        raise

# Repository creation followed by validation. Note that `a >> b` returns `b`,
# so chaining inside a single assignment would bind the wrong task; define the
# tasks separately and set the dependency afterwards.
from airflow.operators.python import PythonOperator

create_repo = DatabricksReposCreateOperator(
    task_id='create_repo',
    git_url='https://github.com/company/analytics.git',
    repo_path='/Repos/production/analytics'
)

validate_repo = PythonOperator(
    task_id='validate_repo_creation',
    python_callable=validate_repository_creation,
    params={'repo_path': '/Repos/production/analytics'}
)

create_repo >> validate_repo

Retry Configuration

Configure robust retry mechanisms for repository operations:

from datetime import timedelta

robust_repo_update = DatabricksReposUpdateOperator(
    task_id='robust_repo_update',
    repo_path='/Repos/production/critical-pipeline',
    branch='hotfix-urgent',
    databricks_conn_id='databricks_production',
    databricks_retry_limit=5,      # API-level retries inside the hook
    databricks_retry_delay=10,     # seconds between API retries
    retries=2,                     # Airflow task-level retries
    retry_delay=timedelta(minutes=2)
)
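The two retry layers compound: `databricks_retry_*` retries the Databricks API call inside the hook, while `retries`/`retry_delay` re-run the whole task. A quick sanity check of the worst case (hypothetical helper, assuming up to `databricks_retry_limit` API attempts per task try):

```python
def worst_case_api_attempts(task_retries: int, api_retry_limit: int) -> int:
    """Each task run (1 initial + task_retries) may attempt the API api_retry_limit times."""
    return (1 + task_retries) * api_retry_limit

# For the configuration above: (1 + 2) * 5 attempts in the worst case.
print(worst_case_api_attempts(task_retries=2, api_retry_limit=5))  # 15
```

Keep this product in mind when the operation is not idempotent or when API rate limits apply.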

Repository Cleanup Strategies

Implement automated cleanup for temporary repositories:

from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook
from datetime import datetime, timedelta

def cleanup_old_temp_repos(**context):
    """Clean up temporary repositories older than specified days."""
    hook = DatabricksHook(databricks_conn_id='databricks_staging')
    
    # List all repositories
    repos = hook._do_api_call(('GET', 'api/2.0/repos'), {})
    
    cutoff_date = datetime.now() - timedelta(days=7)
    
    for repo in repos.get('repos', []):
        repo_path = repo.get('path', '')
        
        # Check if it's a temporary repository
        if '/temp/' in repo_path:
            # Check creation date (you might need to track this separately)
            # For demo, we'll use a naming convention with timestamps
            if 'temp-deploy-' in repo_path:
                try:
                    # Extract timestamp from path if following naming convention
                    timestamp_str = repo_path.split('temp-deploy-')[1].split('/')[0]
                    repo_date = datetime.strptime(timestamp_str, '%Y%m%d-%H%M%S')
                    
                    if repo_date < cutoff_date:
                        print(f"Cleaning up old repository: {repo_path}")
                        hook._do_api_call(
                            ('DELETE', f"api/2.0/repos/{repo['id']}"),
                            {}
                        )
                except (ValueError, IndexError) as e:
                    print(f"Could not parse date from {repo_path}: {e}")

# Scheduling belongs on the DAG, not the task: place this operator in a DAG
# with schedule_interval='@daily' rather than passing a schedule here.
cleanup_task = PythonOperator(
    task_id='cleanup_old_repositories',
    python_callable=cleanup_old_temp_repos
)

The repository management operators integrate Databricks Repos with Git, enabling version-controlled deployment of notebooks, libraries, and code across environments, with retry handling and hooks for CI/CD automation.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-databricks
