# Resource Management

Configurable resources for Databricks client management with support for multiple authentication methods and automatic credential handling. These resources provide standardized ways to configure and access Databricks clients across different Dagster components.

## Capabilities

### DatabricksClientResource

Primary configurable resource for managing Databricks client instances with comprehensive authentication options and validation.

```python { .api }
class DatabricksClientResource(ConfigurableResource, IAttachDifferentObjectToOpContext):
    """Resource which provides a Python client for interacting with Databricks within an
    op or asset."""

    host: Optional[str] = None
    token: Optional[str] = None
    oauth_credentials: Optional[OauthCredentials] = None
    azure_credentials: Optional[AzureServicePrincipalCredentials] = None
    workspace_id: Optional[str] = None

    def get_client(self) -> DatabricksClient:
        """
        Create and return a configured DatabricksClient instance.

        Returns:
            DatabricksClient: Configured client with specified authentication
        """
```
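
The `get_client()` accessor can also be used directly when the resource is supplied as a typed parameter to an asset or op. The sketch below is a minimal illustration, not part of the API reference: the asset name and the cluster-listing call are placeholders, and it assumes a recent Dagster version that supports Pythonic resource parameters. The `workspace_client` attribute it uses is the same one shown in the usage examples later in this document.

```python
from dagster import Definitions, asset
from dagster_databricks import DatabricksClientResource


@asset
def cluster_inventory(databricks: DatabricksClientResource) -> list:
    # get_client() returns a DatabricksClient; its workspace_client attribute
    # exposes the underlying databricks-sdk WorkspaceClient.
    client = databricks.get_client()
    return [c.cluster_name for c in client.workspace_client.clusters.list()]


defs = Definitions(
    assets=[cluster_inventory],
    resources={
        "databricks": DatabricksClientResource(
            host="https://your-workspace.cloud.databricks.com",
            token="dapi1234567890abcdef",
        )
    },
)
```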

### Legacy Resource Function

Traditional resource function for backward compatibility and simple use cases.

```python { .api }
def databricks_client(init_context) -> DatabricksClient:
    """
    Create a DatabricksClient from resource context configuration.

    Parameters:
    - init_context: Dagster resource initialization context

    Returns:
        DatabricksClient: Configured Databricks client
    """
```

### Authentication Credential Classes

Configuration classes for different authentication methods.

```python { .api }
class OauthCredentials:
    """OAuth credentials for Databricks service principal authentication."""

    client_id: str
    client_secret: str


class AzureServicePrincipalCredentials:
    """Azure service principal credentials for Azure Databricks."""

    azure_client_id: str
    azure_client_secret: str
    azure_tenant_id: str
```

## Authentication Methods

### Personal Access Token (PAT)

Most common authentication method using a personal access token.

```python
from dagster import EnvVar, job
from dagster_databricks import DatabricksClientResource

# Direct token configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token="dapi1234567890abcdef"
)

# Environment variable configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    token=EnvVar("DATABRICKS_TOKEN")
)

@job(resource_defs={"databricks": databricks_resource})
def my_job():
    my_op()
```

### OAuth Service Principal

Secure authentication using Databricks OAuth service principal credentials.

```python
from dagster import EnvVar
from dagster_databricks import DatabricksClientResource, OauthCredentials

# Direct credential configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id="your-client-id",
        client_secret="your-client-secret"
    )
)

# Environment variable configuration
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id=EnvVar("DATABRICKS_CLIENT_ID"),
        client_secret=EnvVar("DATABRICKS_CLIENT_SECRET")
    )
)
```

### Azure Service Principal

Authentication for Azure Databricks using Azure service principal credentials.

```python
from dagster import EnvVar
from dagster_databricks import DatabricksClientResource, AzureServicePrincipalCredentials

databricks_resource = DatabricksClientResource(
    host="https://your-workspace.azuredatabricks.net",
    azure_credentials=AzureServicePrincipalCredentials(
        azure_client_id=EnvVar("AZURE_CLIENT_ID"),
        azure_client_secret=EnvVar("AZURE_CLIENT_SECRET"),
        azure_tenant_id=EnvVar("AZURE_TENANT_ID")
    )
)
```

### Default Credentials

Automatic credential resolution from environment variables or configuration files.

```python
from dagster_databricks import DatabricksClientResource

# No explicit credentials - will read from environment or ~/.databrickscfg
databricks_resource = DatabricksClientResource(
    host="https://your-workspace.cloud.databricks.com"
)

# Or let it auto-detect the host as well
databricks_resource = DatabricksClientResource()
```
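
Default resolution follows the Databricks SDK's standard credential chain. The sketch below is an illustration of one common setup, assuming the standard `DATABRICKS_HOST` and `DATABRICKS_TOKEN` environment variables are provided by the runtime environment (for example, injected by a secrets manager) rather than by Dagster configuration:

```python
import os

from dagster_databricks import DatabricksClientResource

# Hypothetical deployment setup: host and token are supplied by the environment.
os.environ.setdefault("DATABRICKS_HOST", "https://your-workspace.cloud.databricks.com")
os.environ.setdefault("DATABRICKS_TOKEN", "dapi1234567890abcdef")

# With no explicit fields set, the underlying Databricks SDK resolves
# credentials from the environment (or from ~/.databrickscfg).
databricks_resource = DatabricksClientResource()
```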

## Configuration Validation

The resource includes comprehensive validation to ensure proper credential configuration:

### Single Authentication Method

Only one authentication method can be specified at a time:

```python
# Valid - only token specified
DatabricksClientResource(
    host="https://workspace.cloud.databricks.com",
    token="your-token"
)

# Invalid - multiple auth methods (will raise ValueError)
DatabricksClientResource(
    host="https://workspace.cloud.databricks.com",
    token="your-token",
    oauth_credentials=OauthCredentials(client_id="id", client_secret="secret")
)
```

### Required Credential Components

Each authentication method requires all necessary components:

```python
# Valid OAuth configuration
oauth_credentials=OauthCredentials(
    client_id="your-client-id",
    client_secret="your-client-secret"
)

# Invalid - missing client_secret (will raise ValueError)
oauth_credentials=OauthCredentials(
    client_id="your-client-id"
)
```
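
These misconfigurations raise `ValueError`; the exact point at which the error surfaces (resource construction versus client creation) may vary by version. A minimal test sketch, assuming the error is raised no later than `get_client()`:

```python
import pytest

from dagster_databricks import DatabricksClientResource, OauthCredentials


def test_rejects_multiple_auth_methods():
    # Assumption: the misconfiguration surfaces as a ValueError by the time
    # get_client() is called; pydantic validation errors also subclass ValueError.
    with pytest.raises(ValueError):
        resource = DatabricksClientResource(
            host="https://workspace.cloud.databricks.com",
            token="your-token",
            oauth_credentials=OauthCredentials(client_id="id", client_secret="secret"),
        )
        resource.get_client()
```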

## Usage Examples

### Basic Resource Setup

```python
from dagster import EnvVar, job, op
from dagster_databricks import DatabricksClientResource

@op(required_resource_keys={"databricks"})
def process_data(context):
    # Access the Databricks client
    databricks_client = context.resources.databricks

    # Use the client for operations
    workspace_client = databricks_client.workspace_client
    jobs_api = workspace_client.jobs

    # Submit a job
    run_result = jobs_api.submit(
        tasks=[{
            "task_key": "process",
            "existing_cluster_id": "cluster-id",
            "notebook_task": {
                "notebook_path": "/path/to/notebook"
            }
        }]
    )

    return run_result.run_id

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN")
        )
    }
)
def data_processing_job():
    process_data()
```
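
When code is organized around `Definitions` rather than per-job `resource_defs`, the same resource can be bound once and shared by every job and asset. A minimal sketch under that assumption; the job name is hypothetical and it reuses the `process_data` op from the example above:

```python
from dagster import Definitions, EnvVar, job
from dagster_databricks import DatabricksClientResource

@job
def databricks_processing_job():
    # The op's "databricks" resource requirement is satisfied by the
    # Definitions-level resources below.
    process_data()

defs = Definitions(
    jobs=[databricks_processing_job],
    resources={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN"),
        )
    },
)
```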

### Multi-Environment Configuration

```python
from dagster import job, EnvVar
from dagster_databricks import DatabricksClientResource, OauthCredentials

# Development environment configuration
dev_databricks = DatabricksClientResource(
    host="https://dev-workspace.cloud.databricks.com",
    token=EnvVar("DEV_DATABRICKS_TOKEN")
)

# Production environment configuration
prod_databricks = DatabricksClientResource(
    host="https://prod-workspace.cloud.databricks.com",
    oauth_credentials=OauthCredentials(
        client_id=EnvVar("PROD_DATABRICKS_CLIENT_ID"),
        client_secret=EnvVar("PROD_DATABRICKS_CLIENT_SECRET")
    )
)

# Job definition with environment-specific resources
@job(
    resource_defs={
        "databricks": dev_databricks  # Switch based on deployment
    }
)
def my_pipeline():
    extract_data()
    transform_data()
    load_data()
```
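
The "switch based on deployment" comment can be made explicit by selecting the resource from an environment variable. The sketch below is an alternative definition of the same job; the `DAGSTER_DEPLOYMENT` variable name is a hypothetical convention, and `dev_databricks` / `prod_databricks` are the resources defined above:

```python
import os

# Hypothetical deployment flag, e.g. "dev" or "prod".
deployment = os.getenv("DAGSTER_DEPLOYMENT", "dev")

databricks_by_deployment = {
    "dev": dev_databricks,
    "prod": prod_databricks,
}

@job(resource_defs={"databricks": databricks_by_deployment[deployment]})
def my_pipeline():
    extract_data()
    transform_data()
    load_data()
```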

### Integration with Other Resources

```python
from dagster import EnvVar, job, op, resource
from dagster_databricks import DatabricksClientResource

@resource
def data_lake_config():
    return {
        "bucket": "my-data-lake",
        "prefix": "processed-data/"
    }

@op(required_resource_keys={"databricks", "data_lake"})
def etl_operation(context):
    databricks = context.resources.databricks
    data_lake = context.resources.data_lake

    # Use both resources together
    workspace_client = databricks.workspace_client

    # Submit job with data lake configuration
    run_result = workspace_client.jobs.submit(
        tasks=[{
            "task_key": "etl",
            "existing_cluster_id": "etl-cluster",
            "spark_python_task": {
                "python_file": "s3://scripts/etl.py",
                "parameters": [
                    "--bucket", data_lake["bucket"],
                    "--prefix", data_lake["prefix"]
                ]
            }
        }]
    )

    return run_result.run_id

@job(
    resource_defs={
        "databricks": DatabricksClientResource(
            host=EnvVar("DATABRICKS_HOST"),
            token=EnvVar("DATABRICKS_TOKEN")
        ),
        "data_lake": data_lake_config
    }
)
def etl_pipeline():
    etl_operation()
```

### Legacy Resource Usage

For backwards compatibility, the traditional resource function is still available:

```python
from dagster import job, op
from dagster_databricks import databricks_client

@op(required_resource_keys={"databricks"})
def legacy_op(context):
    client = context.resources.databricks
    return client.get_run_state(12345)

@job(
    resource_defs={
        "databricks": databricks_client.configured({
            "host": {"env": "DATABRICKS_HOST"},
            "token": {"env": "DATABRICKS_TOKEN"}
        })
    }
)
def legacy_job():
    legacy_op()
```

### Resource Testing

```python
from dagster import build_op_context
from dagster_databricks import DatabricksClientResource

def test_databricks_operation():
    # Create resource for testing
    databricks_resource = DatabricksClientResource(
        host="https://test-workspace.cloud.databricks.com",
        token="test-token"
    )

    # Build context with resource
    context = build_op_context(
        resources={"databricks": databricks_resource}
    )

    # Invoke the op directly (process_data is the op from the Basic Resource Setup example)
    result = process_data(context)
    assert result is not None
```
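
Invoking the op against a real workspace URL will attempt live API calls. For fully offline unit tests, one option is to hand `build_op_context` a stand-in object, since it accepts plain Python values as resources. The sketch below is one possible approach using `unittest.mock`; the run ID is a made-up value and `process_data` is again the op from the Basic Resource Setup example:

```python
from unittest import mock

from dagster import build_op_context

def test_databricks_operation_with_mock_client():
    # Stand-in for the DatabricksClient the op expects.
    mock_client = mock.MagicMock()
    mock_client.workspace_client.jobs.submit.return_value.run_id = 1234

    # build_op_context accepts plain objects as resource values, so the op's
    # context.resources.databricks access resolves to the mock.
    context = build_op_context(resources={"databricks": mock_client})

    result = process_data(context)

    assert result == 1234
    mock_client.workspace_client.jobs.submit.assert_called_once()
```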