# Repository Management

The Databricks provider offers comprehensive Git repository management through Databricks Repos, enabling version-controlled deployment of notebooks, libraries, and code across environments. The operators below cover creating, updating, and deleting repositories with full Git integration.

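As a quick orientation before the operator reference, here is a minimal sketch wiring the three operators into a single DAG. It assumes a configured `databricks_default` connection; the repository URL and workspace path are placeholders.

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposDeleteOperator,
    DatabricksReposUpdateOperator,
)

with DAG(
    "repos_quickstart",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Clone the repository into the workspace (placeholder URL and path)
    create = DatabricksReposCreateOperator(
        task_id="create_repo",
        git_url="https://github.com/example-org/example-repo.git",
        repo_path="/Repos/example/example-repo",
    )

    # Check out the branch to deploy
    checkout = DatabricksReposUpdateOperator(
        task_id="checkout_main",
        repo_path="/Repos/example/example-repo",
        branch="main",
    )

    # Remove the repository once it is no longer needed
    delete = DatabricksReposDeleteOperator(
        task_id="delete_repo",
        repo_path="/Repos/example/example-repo",
    )

    create >> checkout >> delete
```
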
## Core Operators

### DatabricksReposCreateOperator

Create new Git repositories in Databricks Repos for version-controlled code deployment.

```python { .api }
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

class DatabricksReposCreateOperator(BaseOperator):
    def __init__(
        self,
        *,
        git_url: str,
        git_provider: str | None = None,
        repo_path: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs,
    ) -> None:
        """
        Create a new Git repository in Databricks Repos.

        Args:
            git_url: URL of the Git repository to clone
            git_provider: Git provider - "gitHub", "bitbucketCloud", "gitLab",
                "azureDevOpsServices", "gitHubEnterprise", "bitbucketServer",
                "gitLabEnterpriseEdition". If None, the provider is auto-detected
            repo_path: Path where the repository will be created in the Databricks
                workspace. If None, uses /Repos/{username}/{repo_name}
            databricks_conn_id: Airflow connection ID for Databricks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
        """
```

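Both `git_provider` and `repo_path` are optional, as documented above. A brief sketch relying on provider auto-detection and the default repository path (the URL is a placeholder):

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

# git_provider is detected from the github.com URL, and repo_path defaults to
# /Repos/{username}/minimal-repo for the authenticated user.
create_with_defaults = DatabricksReposCreateOperator(
    task_id="create_repo_with_defaults",
    git_url="https://github.com/example-org/minimal-repo.git",
)
```
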
### DatabricksReposUpdateOperator

Update existing repositories to a different branch or tag.

```python { .api }
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

class DatabricksReposUpdateOperator(BaseOperator):
    def __init__(
        self,
        *,
        repo_id: int | None = None,
        repo_path: str | None = None,
        branch: str | None = None,
        tag: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs,
    ) -> None:
        """
        Update an existing Git repository in Databricks Repos.

        Args:
            repo_id: Repository ID in Databricks (alternative to repo_path)
            repo_path: Path to the repository in the Databricks workspace
            branch: Git branch to check out (mutually exclusive with tag)
            tag: Git tag to check out (mutually exclusive with branch)
            databricks_conn_id: Airflow connection ID for Databricks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
        """
```

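Either `repo_path` or `repo_id` can identify the repository. A brief sketch using `repo_id`, the numeric ID assigned by Databricks when the repository is created (the value here is a placeholder):

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

# Target the repository by its Databricks-assigned ID instead of its workspace path
update_by_id = DatabricksReposUpdateOperator(
    task_id="update_repo_by_id",
    repo_id=1234567890,  # placeholder repository ID
    branch="main",
    databricks_conn_id="databricks_default",
)
```
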
### DatabricksReposDeleteOperator

Delete repositories from Databricks Repos when they're no longer needed.

```python { .api }
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposDeleteOperator

class DatabricksReposDeleteOperator(BaseOperator):
    def __init__(
        self,
        *,
        repo_id: int | None = None,
        repo_path: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs,
    ) -> None:
        """
        Delete a Git repository from Databricks Repos.

        Args:
            repo_id: Repository ID in Databricks (alternative to repo_path)
            repo_path: Path to the repository in the Databricks workspace
            databricks_conn_id: Airflow connection ID for Databricks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
        """
```

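Deletion is usually targeted by path, mirroring how the repository was created; `repo_id` works as well. A minimal sketch with a placeholder path:

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposDeleteOperator

# Remove a repository by its workspace path (repo_id could be used instead)
delete_feature_repo = DatabricksReposDeleteOperator(
    task_id="delete_feature_repo",
    repo_path="/Repos/example/feature-experiment",
    databricks_conn_id="databricks_default",
)
```
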
## Usage Examples

### Basic Repository Creation

Create a repository from a public GitHub repository:

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

create_analytics_repo = DatabricksReposCreateOperator(
    task_id='create_analytics_repo',
    git_url='https://github.com/company/analytics-notebooks.git',
    git_provider='gitHub',
    repo_path='/Repos/production/analytics-notebooks',
    databricks_conn_id='databricks_production'
)
```

### Private Repository with Authentication

Create a repository from a private Git provider:

```python
# For private repos, Git authentication is handled inside the Databricks workspace:
# configure a personal access token or SSH key in the workspace Git integration settings.
create_private_repo = DatabricksReposCreateOperator(
    task_id='create_private_ml_repo',
    git_url='https://github.com/company/ml-models-private.git',
    git_provider='gitHub',
    repo_path='/Repos/{{ params.environment }}/ml-models',
    databricks_conn_id='databricks_{{ params.environment }}'
)
```

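If you prefer to register the personal access token programmatically instead of through the workspace UI, below is a hedged sketch using the Databricks Git Credentials REST API via the provider's hook. The endpoint and field names should be verified against the current Databricks API documentation; the username and the `github_pat` Airflow Variable are placeholders.

```python
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook


def register_git_credential(**context):
    """Register a GitHub personal access token for the calling Databricks user."""
    hook = DatabricksHook(databricks_conn_id="databricks_production")
    # POST api/2.0/git-credentials creates a Git credential for the calling user
    # (Databricks Git Credentials API; verify the payload against current docs).
    hook._do_api_call(
        ("POST", "api/2.0/git-credentials"),
        {
            "git_provider": "gitHub",
            "git_username": "service-account",  # placeholder username
            "personal_access_token": Variable.get("github_pat"),  # placeholder Variable
        },
    )


register_credential = PythonOperator(
    task_id="register_git_credential",
    python_callable=register_git_credential,
)
```
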
### Multi-Environment Repository Setup

Create repositories across different environments:

```python
from airflow.utils.task_group import TaskGroup

def create_repo_for_environment(env_name: str):
    return DatabricksReposCreateOperator(
        task_id=f'create_repo_{env_name}',
        git_url='https://github.com/company/data-pipelines.git',
        git_provider='gitHub',
        repo_path=f'/Repos/{env_name}/data-pipelines',
        databricks_conn_id=f'databricks_{env_name}'
    )

with TaskGroup(group_id='setup_repositories') as repo_setup:
    dev_repo = create_repo_for_environment('development')
    staging_repo = create_repo_for_environment('staging')
    prod_repo = create_repo_for_environment('production')

    dev_repo >> staging_repo >> prod_repo
```

### Branch and Tag Management

Update repositories to specific branches or tags for deployments:

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

# Update to the development branch
update_to_dev = DatabricksReposUpdateOperator(
    task_id='update_to_dev_branch',
    repo_path='/Repos/development/analytics',
    branch='develop',
    databricks_conn_id='databricks_dev'
)

# Update to a release tag for production
update_to_release = DatabricksReposUpdateOperator(
    task_id='update_to_release_tag',
    repo_path='/Repos/production/analytics',
    tag='v{{ params.release_version }}',
    databricks_conn_id='databricks_production'
)

# Update to a hotfix branch for an urgent fix
update_to_hotfix = DatabricksReposUpdateOperator(
    task_id='hotfix_deployment',
    repo_path='/Repos/production/analytics',
    branch='hotfix-{{ params.ticket_number }}',
    databricks_conn_id='databricks_production'
)
```

### CI/CD Pipeline Integration

Integrate repository operations with CI/CD workflows:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposUpdateOperator,
    DatabricksReposDeleteOperator
)

def validate_deployment(**context):
    """Validate that deployment was successful."""
    # Custom validation logic
    repo_path = context['params']['repo_path']
    print(f"Validating deployment at {repo_path}")
    return True

def cleanup_old_deployments(**context):
    """Clean up old deployment artifacts."""
    # Cleanup logic for old branches or temporary repositories
    print("Cleaning up old deployment artifacts")

# CI/CD Pipeline DAG
with DAG(
    'cicd_pipeline',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:

    # Create a temporary deployment repository
    create_temp_repo = DatabricksReposCreateOperator(
        task_id='create_temp_deployment_repo',
        git_url='{{ params.git_url }}',
        git_provider='gitHub',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        databricks_conn_id='databricks_staging'
    )

    # Check out the branch under test
    checkout_branch = DatabricksReposUpdateOperator(
        task_id='checkout_test_branch',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        branch='{{ params.test_branch }}',
        databricks_conn_id='databricks_staging'
    )

    # Run tests on the checked-out code
    run_tests = DatabricksNotebookOperator(
        task_id='run_integration_tests',
        notebook_path='/Repos/temp/deployment-{{ run_id }}/tests/integration_tests',
        source='WORKSPACE',
        existing_cluster_id='test-cluster-001',
        notebook_params={
            'test_env': 'staging',
            'git_commit': '{{ params.git_commit }}'
        },
        databricks_conn_id='databricks_staging'
    )

    # Validate deployment
    validate = PythonOperator(
        task_id='validate_deployment',
        python_callable=validate_deployment,
        params={'repo_path': '/Repos/temp/deployment-{{ run_id }}'}
    )

    # Deploy to production if tests pass
    deploy_to_prod = DatabricksReposUpdateOperator(
        task_id='deploy_to_production',
        repo_path='/Repos/production/analytics',
        branch='{{ params.production_branch }}',
        databricks_conn_id='databricks_production'
    )

    # Clean up the temporary repository
    cleanup_temp = DatabricksReposDeleteOperator(
        task_id='cleanup_temp_repo',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        databricks_conn_id='databricks_staging',
        trigger_rule='all_done'  # Run cleanup regardless of test outcome
    )

    # Clean up old deployments
    cleanup_old = PythonOperator(
        task_id='cleanup_old_deployments',
        python_callable=cleanup_old_deployments
    )

    create_temp_repo >> checkout_branch >> run_tests >> validate >> deploy_to_prod
    [run_tests, validate, deploy_to_prod] >> cleanup_temp >> cleanup_old
```

## Advanced Repository Management

### Multi-Provider Repository Setup

Handle repositories from different Git providers:

```python
# GitHub Enterprise repository
github_enterprise_repo = DatabricksReposCreateOperator(
    task_id='create_github_enterprise_repo',
    git_url='https://git.company.com/data-team/analytics.git',
    git_provider='gitHubEnterprise',
    repo_path='/Repos/enterprise/analytics',
    databricks_conn_id='databricks_enterprise'
)

# Azure DevOps repository
azure_devops_repo = DatabricksReposCreateOperator(
    task_id='create_azure_devops_repo',
    git_url='https://dev.azure.com/company/DataProject/_git/ml-models',
    git_provider='azureDevOpsServices',
    repo_path='/Repos/azure/ml-models',
    databricks_conn_id='databricks_azure'
)

# GitLab repository
gitlab_repo = DatabricksReposCreateOperator(
    task_id='create_gitlab_repo',
    git_url='https://gitlab.company.com/analytics/dashboards.git',
    git_provider='gitLab',
    repo_path='/Repos/gitlab/dashboards',
    databricks_conn_id='databricks_gitlab'
)

# Bitbucket Cloud repository
bitbucket_repo = DatabricksReposCreateOperator(
    task_id='create_bitbucket_repo',
    git_url='https://bitbucket.org/company/data-science.git',
    git_provider='bitbucketCloud',
    repo_path='/Repos/bitbucket/data-science',
    databricks_conn_id='databricks_bitbucket'
)
```

### Dynamic Repository Management

Dynamically manage repositories based on external triggers:

```python
from airflow.operators.python import BranchPythonOperator

def determine_repo_action(**context):
    """Determine what action to take based on trigger parameters."""
    action = context['params'].get('action', 'create')
    if action == 'create':
        return 'create_repository'
    elif action == 'update':
        return 'update_repository'
    elif action == 'delete':
        return 'delete_repository'
    else:
        raise ValueError(f"Unknown action: {action}")

def get_repo_info(**context):
    """Extract repository information from a webhook or external trigger."""
    return {
        'git_url': context['params'].get('git_url'),
        'branch': context['params'].get('branch', 'main'),
        'repo_path': context['params'].get('repo_path')
    }

# Dynamic repository management workflow
branch_action = BranchPythonOperator(
    task_id='determine_action',
    python_callable=determine_repo_action
)

create_repo = DatabricksReposCreateOperator(
    task_id='create_repository',
    git_url='{{ params.git_url }}',
    git_provider='{{ params.git_provider }}',
    repo_path='{{ params.repo_path }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

update_repo = DatabricksReposUpdateOperator(
    task_id='update_repository',
    repo_path='{{ params.repo_path }}',
    branch='{{ params.branch }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

delete_repo = DatabricksReposDeleteOperator(
    task_id='delete_repository',
    repo_path='{{ params.repo_path }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

branch_action >> [create_repo, update_repo, delete_repo]
```

### Repository Synchronization

Keep repositories synchronized across multiple environments:

```python
def sync_repositories_across_environments():
    """Synchronize repository state across dev, staging, and production."""

    environments = ['development', 'staging', 'production']
    branches = {
        'development': 'develop',
        'staging': 'release-candidate',
        'production': 'main'
    }

    sync_tasks = []

    for env in environments:
        sync_task = DatabricksReposUpdateOperator(
            task_id=f'sync_{env}_repo',
            repo_path=f'/Repos/{env}/data-pipelines',
            branch=branches[env],
            databricks_conn_id=f'databricks_{env}'
        )
        sync_tasks.append(sync_task)

    # Create dependencies: dev -> staging -> production
    for i in range(len(sync_tasks) - 1):
        sync_tasks[i] >> sync_tasks[i + 1]

    return sync_tasks

# Use in a DAG
sync_tasks = sync_repositories_across_environments()
```

## Error Handling and Best Practices

### Repository Validation

Validate repository operations with custom checks:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook

def validate_repository_creation(**context):
    """Validate that the repository was created successfully."""
    repo_path = context['params']['repo_path']

    hook = DatabricksHook(databricks_conn_id='databricks_default')

    try:
        # The Repos API addresses repositories by numeric ID, so look the repo up
        # by listing repositories under the path prefix and matching the exact path.
        response = hook._do_api_call(('GET', 'api/2.0/repos'), {'path_prefix': repo_path})
        paths = [repo.get('path') for repo in response.get('repos', [])]

        if repo_path in paths:
            print(f"Repository {repo_path} created successfully")
            return True
        raise ValueError(f"Repository validation failed for {repo_path}")

    except Exception as e:
        print(f"Repository validation error: {e}")
        raise

# Repository creation followed by validation
create_repo = DatabricksReposCreateOperator(
    task_id='create_repo',
    git_url='https://github.com/company/analytics.git',
    repo_path='/Repos/production/analytics'
)

validate_repo_creation = PythonOperator(
    task_id='validate_repo_creation',
    python_callable=validate_repository_creation,
    params={'repo_path': '/Repos/production/analytics'}
)

create_repo >> validate_repo_creation
```

### Retry Configuration

Configure robust retry mechanisms for repository operations. The `databricks_retry_*` arguments control retries of the underlying Databricks API calls, while `retries` and `retry_delay` are the standard Airflow task-level retries:

```python
from datetime import timedelta

robust_repo_update = DatabricksReposUpdateOperator(
    task_id='robust_repo_update',
    repo_path='/Repos/production/critical-pipeline',
    branch='hotfix-urgent',
    databricks_conn_id='databricks_production',
    databricks_retry_limit=5,
    databricks_retry_delay=10,
    retries=2,
    retry_delay=timedelta(minutes=2)
)
```

### Repository Cleanup Strategies

Implement automated cleanup for temporary repositories:

```python
from datetime import datetime, timedelta

from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook

def cleanup_old_temp_repos(**context):
    """Clean up temporary repositories older than a cutoff."""
    hook = DatabricksHook(databricks_conn_id='databricks_staging')

    # List all repositories
    repos = hook._do_api_call(('GET', 'api/2.0/repos'), {})

    cutoff_date = datetime.now() - timedelta(days=7)

    for repo in repos.get('repos', []):
        repo_path = repo.get('path', '')

        # Only consider temporary repositories
        if '/temp/' in repo_path:
            # Creation dates are not tracked here, so this demo relies on a
            # naming convention that embeds a timestamp in the path
            if 'temp-deploy-' in repo_path:
                try:
                    # Extract the timestamp from the path
                    timestamp_str = repo_path.split('temp-deploy-')[1].split('/')[0]
                    repo_date = datetime.strptime(timestamp_str, '%Y%m%d-%H%M%S')

                    if repo_date < cutoff_date:
                        print(f"Cleaning up old repository: {repo_path}")
                        hook._do_api_call(
                            ('DELETE', f"api/2.0/repos/{repo['id']}"),
                            {}
                        )
                except (ValueError, IndexError) as e:
                    print(f"Could not parse date from {repo_path}: {e}")

# Run this task from a daily maintenance DAG; scheduling is configured on the DAG,
# not on the operator.
cleanup_task = PythonOperator(
    task_id='cleanup_old_repositories',
    python_callable=cleanup_old_temp_repos
)
```

The repository management operators provide comprehensive Git integration for Databricks Repos, enabling version-controlled deployment and management of notebooks, libraries, and code across different environments with robust error handling and automation capabilities.