# Core Client API

Low-level Databricks REST API client providing authentication, job management, file operations, and run monitoring. The `DatabricksClient` serves as the foundation for all Databricks interactions and supports multiple authentication methods.

## Capabilities

### DatabricksClient

The primary client class wraps the Databricks REST API with authentication and essential operations for job management and file handling.

```python { .api }
class DatabricksClient:
    """A thin wrapper over the Databricks REST API."""

    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
    ):
        """
        Initialize the Databricks client with authentication credentials.

        Parameters:
        - host: Databricks workspace URL (e.g., https://your-workspace.cloud.databricks.com)
        - token: Personal access token for authentication
        - oauth_client_id: OAuth client ID for service principal authentication
        - oauth_client_secret: OAuth client secret for service principal authentication
        - azure_client_id: Azure service principal client ID
        - azure_client_secret: Azure service principal client secret
        - azure_tenant_id: Azure tenant ID
        - workspace_id: Databricks workspace ID (deprecated)
        """
```

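Since every credential parameter is optional, one convenient pattern is to assemble the keyword arguments from the environment and pass only the values that are set. The helper below is an illustrative sketch, not part of the library; the environment variable names follow the Databricks SDK convention (`DATABRICKS_HOST`, `DATABRICKS_TOKEN`, `DATABRICKS_CLIENT_ID`, `DATABRICKS_CLIENT_SECRET`), but adjust them to your deployment.

```python
import os


def client_kwargs_from_env() -> dict:
    """Collect DatabricksClient keyword arguments from environment variables.

    Hypothetical helper: only variables that are actually set end up in the
    returned dict, so unset credentials fall back to the constructor defaults.
    """
    mapping = {
        "host": "DATABRICKS_HOST",
        "token": "DATABRICKS_TOKEN",
        "oauth_client_id": "DATABRICKS_CLIENT_ID",
        "oauth_client_secret": "DATABRICKS_CLIENT_SECRET",
    }
    return {kw: os.environ[var] for kw, var in mapping.items() if var in os.environ}
```

The result can be splatted directly into the constructor: `DatabricksClient(**client_kwargs_from_env())`.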
### Workspace Client Access

Access to the underlying Databricks SDK `WorkspaceClient` for advanced operations.

```python { .api }
@property
def workspace_client(self) -> WorkspaceClient:
    """
    Retrieve a reference to the underlying Databricks SDK WorkspaceClient.

    Returns:
        WorkspaceClient: The authenticated Databricks SDK WorkspaceClient
    """
```

### File Operations

DBFS (Databricks File System) operations for reading and writing files to Databricks storage.

```python { .api }
def read_file(self, dbfs_path: str, block_size: int = 1024**2) -> bytes:
    """
    Read a file from DBFS to a byte string.

    Parameters:
    - dbfs_path: Path to file in DBFS (can include 'dbfs://' prefix)
    - block_size: Block size for reading in bytes (default 1MB)

    Returns:
        bytes: File contents as byte string
    """

def put_file(
    self,
    file_obj: IO,
    dbfs_path: str,
    overwrite: bool = False,
    block_size: int = 1024**2,
) -> None:
    """
    Upload an arbitrarily large file to DBFS.

    Parameters:
    - file_obj: File-like object to upload
    - dbfs_path: Destination path in DBFS (can include 'dbfs://' prefix)
    - overwrite: Whether to overwrite an existing file
    - block_size: Block size for uploading in bytes (default 1MB)
    """
```

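The `block_size` parameter controls how the transfer is chunked: rather than moving the whole file in one request, the client presumably streams it in blocks of at most `block_size` bytes. The standalone sketch below illustrates that chunking behavior with an in-memory file; `iter_blocks` is our own illustrative function, not part of the library.

```python
import io


def iter_blocks(file_obj, block_size: int = 1024**2):
    """Yield successive chunks of at most block_size bytes, mirroring how a
    large upload is split into per-request blocks."""
    while True:
        chunk = file_obj.read(block_size)
        if not chunk:  # empty read means end of file
            break
        yield chunk


# A 2.5 MB in-memory file splits into three blocks: 1 MB, 1 MB, 0.5 MB.
data = io.BytesIO(b"x" * (2 * 1024**2 + 512 * 1024))
sizes = [len(block) for block in iter_blocks(data)]
```

Raising `block_size` reduces the number of round trips at the cost of larger request bodies; the 1 MB default is a reasonable middle ground.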
### Run State Management

Methods for monitoring and polling Databricks job run states.

```python { .api }
def get_run_state(self, databricks_run_id: int) -> DatabricksRunState:
    """
    Get the state of a run by Databricks run ID.

    Parameters:
    - databricks_run_id: ID of the Databricks run

    Returns:
        DatabricksRunState: Object containing lifecycle state, result state, and message
    """

def poll_run_state(
    self,
    logger: logging.Logger,
    start_poll_time: float,
    databricks_run_id: int,
    max_wait_time_sec: float,
    verbose_logs: bool = True,
) -> bool:
    """
    Poll the state of a run once and return whether it completed successfully.

    Parameters:
    - logger: Logger for status messages
    - start_poll_time: Start time for timeout calculation
    - databricks_run_id: ID of the Databricks run
    - max_wait_time_sec: Maximum time to wait before timing out
    - verbose_logs: Whether to log detailed status messages

    Returns:
        bool: True if the run completed successfully, False if it is still running

    Raises:
        DatabricksError: If the run failed or timed out
    """

def wait_for_run_to_complete(
    self,
    logger: logging.Logger,
    databricks_run_id: int,
    poll_interval_sec: float,
    max_wait_time_sec: float,
    verbose_logs: bool = True,
) -> None:
    """
    Wait for a Databricks run to complete by polling its state.

    Parameters:
    - logger: Logger for status messages
    - databricks_run_id: ID of the Databricks run
    - poll_interval_sec: Time between polling attempts
    - max_wait_time_sec: Maximum time to wait before timing out
    - verbose_logs: Whether to log detailed status messages

    Raises:
        DatabricksError: If the run failed or timed out
    """
```

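Conceptually, `wait_for_run_to_complete` is a loop over `poll_run_state`: poll once, return if the run finished, raise if the deadline has passed, otherwise sleep and try again. The generic sketch below captures that loop shape with an injected `poll` callable so it can run without a Databricks workspace; `wait_until` and its parameters are illustrative, not part of the library.

```python
import time


def wait_until(poll, poll_interval_sec, max_wait_time_sec,
               clock=time.monotonic, sleep=time.sleep):
    """Illustrative poll-until-done loop.

    poll(start_time) should return True once the run has completed
    successfully (mirroring poll_run_state's return value).
    """
    start = clock()
    while True:
        if poll(start):
            return  # run completed successfully
        if clock() - start > max_wait_time_sec:
            raise TimeoutError("run did not complete within max_wait_time_sec")
        sleep(poll_interval_sec)  # back off before polling again
```

In the real client, `poll_run_state` also raises `DatabricksError` on failure or timeout, so the failure path surfaces from inside the poll call rather than from the loop itself.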
## Usage Examples

### Basic Client Setup

```python
from dagster_databricks import DatabricksClient

# Using a personal access token
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

# Using an OAuth service principal
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    oauth_client_id="your-client-id",
    oauth_client_secret="your-client-secret",
)

# Using an Azure service principal
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    azure_client_id="your-azure-client-id",
    azure_client_secret="your-azure-client-secret",
    azure_tenant_id="your-azure-tenant-id",
)
```

### File Operations

```python
# Read a file from DBFS
file_contents = client.read_file("dbfs://path/to/file.txt")
text_content = file_contents.decode("utf-8")

# Upload a file to DBFS
with open("local_file.txt", "rb") as f:
    client.put_file(f, "dbfs://path/to/destination.txt", overwrite=True)
```

### Run Monitoring

```python
import logging

logger = logging.getLogger(__name__)

# Get the current state of a run
run_state = client.get_run_state(12345)
print(f"Run state: {run_state.life_cycle_state}")
print(f"Result: {run_state.result_state}")

# Wait for the run to complete
client.wait_for_run_to_complete(
    logger=logger,
    databricks_run_id=12345,
    poll_interval_sec=10,
    max_wait_time_sec=3600,  # 1 hour timeout
    verbose_logs=True,
)
```