
# Repository Management

The Databricks provider offers comprehensive Git repository management capabilities through Databricks Repos, enabling version-controlled deployment of notebooks, libraries, and code across different environments. This includes creating, updating, and deleting repositories with full Git integration.

## Core Operators

### DatabricksReposCreateOperator

Create new Git repositories in Databricks Repos for version-controlled code deployment.

```python { .api }
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

class DatabricksReposCreateOperator(BaseOperator):
    def __init__(
        self,
        *,
        git_url: str,
        git_provider: str | None = None,
        repo_path: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Create a new Git repository in Databricks Repos.

        Args:
            git_url: URL of the Git repository to clone
            git_provider: Git provider - "gitHub", "bitbucketCloud", "gitLab",
                "azureDevOpsServices", "gitHubEnterprise", "bitbucketServer",
                "gitLabEnterpriseEdition". If None, provider is auto-detected
            repo_path: Path where repository will be created in Databricks workspace.
                If None, uses /Repos/{username}/{repo_name}
            databricks_conn_id: Airflow connection ID for Databricks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
        """
```
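
Since `git_provider` and `repo_path` are optional, the smallest useful call only needs `git_url`. A minimal sketch, assuming a repository hosted on github.com so the provider can be auto-detected (the URL is illustrative):

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

# git_provider omitted: auto-detected from the github.com URL.
# repo_path omitted: defaults to /Repos/{username}/{repo_name}.
create_repo_minimal = DatabricksReposCreateOperator(
    task_id='create_repo_minimal',
    git_url='https://github.com/company/analytics-notebooks.git',  # illustrative URL
    databricks_conn_id='databricks_default'
)
```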

40

41

### DatabricksReposUpdateOperator

42

43

Update existing repositories to different branches, tags, or commits.

44

45

```python { .api }

46

from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

47

48

class DatabricksReposUpdateOperator(BaseOperator):

49

def __init__(

50

self,

51

*,

52

repo_id: int | None = None,

53

repo_path: str | None = None,

54

branch: str | None = None,

55

tag: str | None = None,

56

databricks_conn_id: str = "databricks_default",

57

databricks_retry_limit: int = 3,

58

databricks_retry_delay: int = 1,

59

**kwargs

60

) -> None:

61

"""

62

Update an existing Git repository in Databricks Repos.

63

64

Args:

65

repo_id: Repository ID in Databricks (alternative to repo_path)

66

repo_path: Path to repository in Databricks workspace

67

branch: Git branch to checkout (mutually exclusive with tag)

68

tag: Git tag to checkout (mutually exclusive with branch)

69

databricks_conn_id: Airflow connection ID for Databricks

70

databricks_retry_limit: Number of retries for API calls

71

databricks_retry_delay: Seconds between retries

72

"""

73

```
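
The repository is addressed either by `repo_path` or by `repo_id`, and since `branch` and `tag` are mutually exclusive, supply exactly one of them per update. A brief sketch with illustrative paths and IDs:

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

# Pin a repository (identified by workspace path) to a branch.
pin_branch = DatabricksReposUpdateOperator(
    task_id='pin_to_branch',
    repo_path='/Repos/staging/analytics-notebooks',  # illustrative path
    branch='main'
)

# Pin a repository (identified by numeric ID) to a release tag.
pin_tag = DatabricksReposUpdateOperator(
    task_id='pin_to_tag',
    repo_id=123456789,  # illustrative repository ID
    tag='v1.2.0'
)
```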

### DatabricksReposDeleteOperator

Delete repositories from Databricks Repos when they're no longer needed.

```python { .api }
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposDeleteOperator

class DatabricksReposDeleteOperator(BaseOperator):
    def __init__(
        self,
        *,
        repo_id: int | None = None,
        repo_path: str | None = None,
        databricks_conn_id: str = "databricks_default",
        databricks_retry_limit: int = 3,
        databricks_retry_delay: int = 1,
        **kwargs
    ) -> None:
        """
        Delete a Git repository from Databricks Repos.

        Args:
            repo_id: Repository ID in Databricks (alternative to repo_path)
            repo_path: Path to repository in Databricks workspace
            databricks_conn_id: Airflow connection ID for Databricks
            databricks_retry_limit: Number of retries for API calls
            databricks_retry_delay: Seconds between retries
        """
```
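
A minimal deletion sketch (the path is illustrative); as with the update operator, the repository can be addressed by `repo_path` or `repo_id`:

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposDeleteOperator

# Remove a repository by its workspace path.
delete_temp_repo = DatabricksReposDeleteOperator(
    task_id='delete_temp_repo',
    repo_path='/Repos/temp/old-deployment',  # illustrative path
    databricks_conn_id='databricks_default'
)
```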

## Usage Examples

### Basic Repository Creation

Create a repository from a public GitHub repository:

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

create_analytics_repo = DatabricksReposCreateOperator(
    task_id='create_analytics_repo',
    git_url='https://github.com/company/analytics-notebooks.git',
    git_provider='gitHub',
    repo_path='/Repos/production/analytics-notebooks',
    databricks_conn_id='databricks_production'
)
```

### Private Repository with Authentication

Create a repository from a private Git provider:

```python
# For private repos, Git authentication is not configured on the operator itself:
# configure a personal access token or SSH key (Git credentials) in the Databricks
# workspace for the user behind the Databricks connection.
create_private_repo = DatabricksReposCreateOperator(
    task_id='create_private_ml_repo',
    git_url='https://github.com/company/ml-models-private.git',
    git_provider='gitHub',
    repo_path='/Repos/{{ params.environment }}/ml-models',
    databricks_conn_id='databricks_{{ params.environment }}'
)
```
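
The Git credential can also be registered programmatically through the Databricks Git Credentials REST API (`/api/2.0/git-credentials`). The sketch below is one way to do this before creating the repo; it assumes the token is stored in an Airflow Variable named `github_pat`, uses an illustrative Git username, and reuses the hook's internal `_do_api_call` helper as the other examples in this document do:

```python
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook

def register_git_credential(**context):
    """Register a GitHub personal access token as a workspace Git credential."""
    hook = DatabricksHook(databricks_conn_id='databricks_production')
    hook._do_api_call(
        ('POST', 'api/2.0/git-credentials'),
        {
            'git_provider': 'gitHub',
            'git_username': 'ci-bot',  # illustrative username
            'personal_access_token': Variable.get('github_pat'),  # assumed Variable
        },
    )

register_credential = PythonOperator(
    task_id='register_git_credential',
    python_callable=register_git_credential,
)

register_credential >> create_private_repo
```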

### Multi-Environment Repository Setup

Create repositories across different environments:

```python
from airflow.utils.task_group import TaskGroup

def create_repo_for_environment(env_name: str):
    return DatabricksReposCreateOperator(
        task_id=f'create_repo_{env_name}',
        git_url='https://github.com/company/data-pipelines.git',
        git_provider='gitHub',
        repo_path=f'/Repos/{env_name}/data-pipelines',
        databricks_conn_id=f'databricks_{env_name}'
    )

with TaskGroup(group_id='setup_repositories') as repo_setup:
    dev_repo = create_repo_for_environment('development')
    staging_repo = create_repo_for_environment('staging')
    prod_repo = create_repo_for_environment('production')

    dev_repo >> staging_repo >> prod_repo
```

### Branch and Tag Management

Update repositories to specific branches or tags for deployments:

```python
from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

# Update to development branch
update_to_dev = DatabricksReposUpdateOperator(
    task_id='update_to_dev_branch',
    repo_path='/Repos/development/analytics',
    branch='develop',
    databricks_conn_id='databricks_dev'
)

# Update to release tag for production
update_to_release = DatabricksReposUpdateOperator(
    task_id='update_to_release_tag',
    repo_path='/Repos/production/analytics',
    tag='v{{ params.release_version }}',
    databricks_conn_id='databricks_production'
)

# Update to a hotfix branch for an urgent production fix
update_to_hotfix = DatabricksReposUpdateOperator(
    task_id='hotfix_deployment',
    repo_path='/Repos/production/analytics',
    branch='hotfix-{{ params.ticket_number }}',
    databricks_conn_id='databricks_production'
)
```

### CI/CD Pipeline Integration

Integrate repository operations with CI/CD workflows:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks_repos import (
    DatabricksReposCreateOperator,
    DatabricksReposUpdateOperator,
    DatabricksReposDeleteOperator,
)

def validate_deployment(**context):
    """Validate that deployment was successful."""
    # Custom validation logic
    repo_path = context['params']['repo_path']
    print(f"Validating deployment at {repo_path}")
    return True

def cleanup_old_deployments(**context):
    """Clean up old deployment artifacts."""
    # Cleanup logic for old branches or temporary repositories
    print("Cleaning up old deployment artifacts")

# CI/CD Pipeline DAG
with DAG(
    'cicd_pipeline',
    start_date=datetime(2024, 1, 1),  # required so task instances can be scheduled
    schedule_interval=None,
    catchup=False,
) as dag:

    # Create temporary deployment repository
    create_temp_repo = DatabricksReposCreateOperator(
        task_id='create_temp_deployment_repo',
        git_url='{{ params.git_url }}',
        git_provider='gitHub',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        databricks_conn_id='databricks_staging'
    )

    # Check out the branch under test
    checkout_commit = DatabricksReposUpdateOperator(
        task_id='checkout_test_commit',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        branch='{{ params.test_branch }}',
        databricks_conn_id='databricks_staging'
    )

    # Run tests on the checked out code
    run_tests = DatabricksNotebookOperator(
        task_id='run_integration_tests',
        notebook_path='/Repos/temp/deployment-{{ run_id }}/tests/integration_tests',
        existing_cluster_id='test-cluster-001',
        base_parameters={
            'test_env': 'staging',
            'git_commit': '{{ params.git_commit }}'
        },
        databricks_conn_id='databricks_staging'
    )

    # Validate deployment
    validate = PythonOperator(
        task_id='validate_deployment',
        python_callable=validate_deployment,
        params={'repo_path': '/Repos/temp/deployment-{{ run_id }}'}
    )

    # Deploy to production if tests pass
    deploy_to_prod = DatabricksReposUpdateOperator(
        task_id='deploy_to_production',
        repo_path='/Repos/production/analytics',
        branch='{{ params.production_branch }}',
        databricks_conn_id='databricks_production'
    )

    # Clean up temporary repository
    cleanup_temp = DatabricksReposDeleteOperator(
        task_id='cleanup_temp_repo',
        repo_path='/Repos/temp/deployment-{{ run_id }}',
        databricks_conn_id='databricks_staging',
        trigger_rule='all_done'  # Run cleanup regardless of test outcome
    )

    # Cleanup old deployments
    cleanup_old = PythonOperator(
        task_id='cleanup_old_deployments',
        python_callable=cleanup_old_deployments
    )

    create_temp_repo >> checkout_commit >> run_tests >> validate >> deploy_to_prod
    [run_tests, validate, deploy_to_prod] >> cleanup_temp >> cleanup_old
```

## Advanced Repository Management

### Multi-Provider Repository Setup

Handle repositories from different Git providers:

```python
# GitHub Enterprise repository
github_enterprise_repo = DatabricksReposCreateOperator(
    task_id='create_github_enterprise_repo',
    git_url='https://git.company.com/data-team/analytics.git',
    git_provider='gitHubEnterprise',
    repo_path='/Repos/enterprise/analytics',
    databricks_conn_id='databricks_enterprise'
)

# Azure DevOps repository
azure_devops_repo = DatabricksReposCreateOperator(
    task_id='create_azure_devops_repo',
    git_url='https://dev.azure.com/company/DataProject/_git/ml-models',
    git_provider='azureDevOpsServices',
    repo_path='/Repos/azure/ml-models',
    databricks_conn_id='databricks_azure'
)

# GitLab repository
gitlab_repo = DatabricksReposCreateOperator(
    task_id='create_gitlab_repo',
    git_url='https://gitlab.company.com/analytics/dashboards.git',
    git_provider='gitLab',
    repo_path='/Repos/gitlab/dashboards',
    databricks_conn_id='databricks_gitlab'
)

# Bitbucket Cloud repository
bitbucket_repo = DatabricksReposCreateOperator(
    task_id='create_bitbucket_repo',
    git_url='https://bitbucket.org/company/data-science.git',
    git_provider='bitbucketCloud',
    repo_path='/Repos/bitbucket/data-science',
    databricks_conn_id='databricks_bitbucket'
)
```

### Dynamic Repository Management

Dynamically manage repositories based on external triggers:

```python
from airflow.operators.python import BranchPythonOperator

def determine_repo_action(**context):
    """Determine what action to take based on trigger parameters."""
    action = context['params'].get('action', 'create')
    if action == 'create':
        return 'create_repository'
    elif action == 'update':
        return 'update_repository'
    elif action == 'delete':
        return 'delete_repository'
    else:
        raise ValueError(f"Unknown action: {action}")

def get_repo_info(**context):
    """Extract repository information from webhook or external trigger."""
    return {
        'git_url': context['params'].get('git_url'),
        'branch': context['params'].get('branch', 'main'),
        'repo_path': context['params'].get('repo_path')
    }

# Dynamic repository management workflow
branch_action = BranchPythonOperator(
    task_id='determine_action',
    python_callable=determine_repo_action
)

create_repo = DatabricksReposCreateOperator(
    task_id='create_repository',
    git_url='{{ params.git_url }}',
    git_provider='{{ params.git_provider }}',
    repo_path='{{ params.repo_path }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

update_repo = DatabricksReposUpdateOperator(
    task_id='update_repository',
    repo_path='{{ params.repo_path }}',
    branch='{{ params.branch }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

delete_repo = DatabricksReposDeleteOperator(
    task_id='delete_repository',
    repo_path='{{ params.repo_path }}',
    databricks_conn_id='{{ params.databricks_conn_id }}'
)

branch_action >> [create_repo, update_repo, delete_repo]
```
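
For the templated fields above to resolve, the surrounding DAG needs matching `params` defaults, which individual runs can then override via the trigger's configuration. A brief sketch with illustrative defaults (DAG ID and values are assumptions):

```python
from datetime import datetime

from airflow import DAG

# Default parameter values; a triggered run can override them through the
# "Trigger DAG w/ config" dialog or the trigger's conf payload.
with DAG(
    'dynamic_repo_management',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
    params={
        'action': 'create',
        'git_url': 'https://github.com/company/data-pipelines.git',
        'git_provider': 'gitHub',
        'repo_path': '/Repos/dynamic/data-pipelines',
        'branch': 'main',
        'databricks_conn_id': 'databricks_default',
    },
) as dag:
    ...  # branch_action and the three repo tasks from the example above
```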

385

386

### Repository Synchronization

387

388

Keep repositories synchronized across multiple environments:

389

390

```python

391

def sync_repositories_across_environments():

392

"""Synchronize repository state across dev, staging, and production."""

393

394

environments = ['development', 'staging', 'production']

395

branches = {

396

'development': 'develop',

397

'staging': 'release-candidate',

398

'production': 'main'

399

}

400

401

sync_tasks = []

402

403

for env in environments:

404

sync_task = DatabricksReposUpdateOperator(

405

task_id=f'sync_{env}_repo',

406

repo_path=f'/Repos/{env}/data-pipelines',

407

branch=branches[env],

408

databricks_conn_id=f'databricks_{env}'

409

)

410

sync_tasks.append(sync_task)

411

412

# Create dependencies: dev -> staging -> production

413

for i in range(len(sync_tasks) - 1):

414

sync_tasks[i] >> sync_tasks[i + 1]

415

416

return sync_tasks

417

418

# Use in DAG

419

sync_tasks = sync_repositories_across_environments()

420

```

## Error Handling and Best Practices

### Repository Validation

Validate repository operations with custom checks:

```python
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.hooks.databricks import DatabricksHook

def validate_repository_creation(**context):
    """Validate that the repository was created successfully."""
    repo_path = context['params']['repo_path']

    hook = DatabricksHook(databricks_conn_id='databricks_default')

    try:
        # The Repos API addresses individual repos by numeric ID, so list
        # repositories filtered by path prefix and confirm an exact match.
        response = hook._do_api_call(
            ('GET', 'api/2.0/repos'),
            {'path_prefix': repo_path}
        )
        paths = [repo.get('path') for repo in response.get('repos', [])]

        if repo_path in paths:
            print(f"Repository {repo_path} created successfully")
            return True
        else:
            raise ValueError(f"Repository validation failed for {repo_path}")

    except Exception as e:
        print(f"Repository validation error: {str(e)}")
        raise

# Repository creation with validation
create_repo = DatabricksReposCreateOperator(
    task_id='create_repo',
    git_url='https://github.com/company/analytics.git',
    repo_path='/Repos/production/analytics'
)

validate_repo_creation = PythonOperator(
    task_id='validate_repo_creation',
    python_callable=validate_repository_creation,
    params={'repo_path': '/Repos/production/analytics'}
)

create_repo >> validate_repo_creation
```

### Retry Configuration

Configure robust retry mechanisms for repository operations. The `databricks_retry_*` arguments retry the underlying Databricks API calls within a single task attempt, while Airflow's `retries` and `retry_delay` re-run the whole task if it still fails:

```python
from datetime import timedelta

robust_repo_update = DatabricksReposUpdateOperator(
    task_id='robust_repo_update',
    repo_path='/Repos/production/critical-pipeline',
    branch='hotfix-urgent',
    databricks_conn_id='databricks_production',
    databricks_retry_limit=5,
    databricks_retry_delay=10,
    retries=2,
    retry_delay=timedelta(minutes=2)
)
```

482

483

### Repository Cleanup Strategies

484

485

Implement automated cleanup for temporary repositories:

486

487

```python

488

from airflow.operators.python import PythonOperator

489

from datetime import datetime, timedelta

490

491

def cleanup_old_temp_repos(**context):

492

"""Clean up temporary repositories older than specified days."""

493

hook = DatabricksHook(databricks_conn_id='databricks_staging')

494

495

# List all repositories

496

repos = hook._do_api_call(('GET', 'api/2.0/repos'), {})

497

498

cutoff_date = datetime.now() - timedelta(days=7)

499

500

for repo in repos.get('repos', []):

501

repo_path = repo.get('path', '')

502

503

# Check if it's a temporary repository

504

if '/temp/' in repo_path:

505

# Check creation date (you might need to track this separately)

506

# For demo, we'll use a naming convention with timestamps

507

if 'temp-deploy-' in repo_path:

508

try:

509

# Extract timestamp from path if following naming convention

510

timestamp_str = repo_path.split('temp-deploy-')[1].split('/')[0]

511

repo_date = datetime.strptime(timestamp_str, '%Y%m%d-%H%M%S')

512

513

if repo_date < cutoff_date:

514

print(f"Cleaning up old repository: {repo_path}")

515

hook._do_api_call(

516

('DELETE', f"api/2.0/repos/{repo['id']}"),

517

{}

518

)

519

except (ValueError, IndexError) as e:

520

print(f"Could not parse date from {repo_path}: {e}")

521

522

cleanup_task = PythonOperator(

523

task_id='cleanup_old_repositories',

524

python_callable=cleanup_old_temp_repos,

525

schedule_interval='@daily'

526

)

527

```

The repository management operators provide comprehensive Git integration for Databricks Repos, enabling version-controlled deployment and management of notebooks, libraries, and code across different environments with robust error handling and automation capabilities.