# Core Client API

Low-level Databricks REST API client providing authentication, job management, file operations, and run monitoring capabilities. The DatabricksClient serves as the foundation for all Databricks interactions and supports multiple authentication methods.

## Capabilities

### DatabricksClient

The primary client class that wraps the Databricks REST API with authentication and essential operations for job management and file handling.

```python { .api }
class DatabricksClient:
    """A thin wrapper over the Databricks REST API."""

    def __init__(
        self,
        host: Optional[str] = None,
        token: Optional[str] = None,
        oauth_client_id: Optional[str] = None,
        oauth_client_secret: Optional[str] = None,
        azure_client_id: Optional[str] = None,
        azure_client_secret: Optional[str] = None,
        azure_tenant_id: Optional[str] = None,
        workspace_id: Optional[str] = None,
    ):
        """
        Initialize the Databricks client with authentication credentials.

        Parameters:
        - host: Databricks workspace URL (e.g., https://your-workspace.cloud.databricks.com)
        - token: Personal access token for authentication
        - oauth_client_id: OAuth client ID for service principal authentication
        - oauth_client_secret: OAuth client secret for service principal authentication
        - azure_client_id: Azure service principal client ID
        - azure_client_secret: Azure service principal client secret
        - azure_tenant_id: Azure tenant ID
        - workspace_id: Databricks workspace ID (deprecated)
        """
```

### Workspace Client Access

Access to the underlying Databricks SDK WorkspaceClient for advanced operations.

```python { .api }
@property
def workspace_client(self) -> WorkspaceClient:
    """
    Retrieve a reference to the underlying Databricks workspace client.

    Returns:
        WorkspaceClient: The authenticated Databricks SDK workspace client
    """
```
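The `workspace_client` property hands back the full databricks-sdk `WorkspaceClient`, so anything the SDK exposes is reachable from an authenticated `DatabricksClient`. As a minimal sketch, here is a hypothetical helper (not part of this package) that collects job names; it assumes the SDK's `jobs.list()` iterator and the `settings.name` field, which belong to databricks-sdk rather than to this wrapper:

```python
# Hypothetical helper, not part of dagster-databricks: gather the names of all
# jobs visible to a workspace client. In the databricks-sdk, jobs.list() yields
# job objects whose name lives under job.settings.name.
def job_names(workspace_client) -> list:
    """Return the names of all jobs visible to the workspace client."""
    return [job.settings.name for job in workspace_client.jobs.list()]


# Typical call, assuming an authenticated DatabricksClient named `client`:
# names = job_names(client.workspace_client)
```

Any object with the same `jobs.list()` shape works here, which also makes the helper easy to exercise against a stub in tests.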

### File Operations

DBFS (Databricks File System) operations for reading and writing files in Databricks storage.

```python { .api }
def read_file(self, dbfs_path: str, block_size: int = 1024**2) -> bytes:
    """
    Read a file from DBFS into a byte string.

    Parameters:
    - dbfs_path: Path to the file in DBFS (may include the 'dbfs://' prefix)
    - block_size: Block size for reading, in bytes (default 1 MB)

    Returns:
        bytes: File contents as a byte string
    """

def put_file(
    self,
    file_obj: IO,
    dbfs_path: str,
    overwrite: bool = False,
    block_size: int = 1024**2,
) -> None:
    """
    Upload an arbitrarily large file to DBFS.

    Parameters:
    - file_obj: File-like object to upload
    - dbfs_path: Destination path in DBFS (may include the 'dbfs://' prefix)
    - overwrite: Whether to overwrite an existing file
    - block_size: Block size for uploading, in bytes (default 1 MB)
    """
```
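Because `put_file` accepts any binary file-like object, small in-memory payloads can be round-tripped without touching the local filesystem. A minimal sketch (`put_json` and `get_json` are hypothetical helpers, not part of the client):

```python
import io
import json


def put_json(client, obj, dbfs_path: str) -> None:
    """Hypothetical helper: serialize `obj` as JSON and upload it with put_file.

    put_file takes any binary file-like object, so an in-memory BytesIO buffer
    works in place of a file on disk.
    """
    buf = io.BytesIO(json.dumps(obj).encode("utf-8"))
    client.put_file(buf, dbfs_path, overwrite=True)


def get_json(client, dbfs_path: str):
    """Hypothetical helper: download a DBFS file with read_file and parse it as JSON."""
    return json.loads(client.read_file(dbfs_path).decode("utf-8"))
```

The same pattern works for any serialization format that can produce bytes.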

### Run State Management

Methods for monitoring and polling Databricks job run states.

```python { .api }
def get_run_state(self, databricks_run_id: int) -> DatabricksRunState:
    """
    Get the state of a run by its Databricks run ID.

    Parameters:
    - databricks_run_id: ID of the Databricks run

    Returns:
        DatabricksRunState: Object containing the lifecycle state, result state, and message
    """

def poll_run_state(
    self,
    logger: logging.Logger,
    start_poll_time: float,
    databricks_run_id: int,
    max_wait_time_sec: float,
    verbose_logs: bool = True,
) -> bool:
    """
    Poll the state of a run once and return whether it completed successfully.

    Parameters:
    - logger: Logger for status messages
    - start_poll_time: Start time for timeout calculation
    - databricks_run_id: ID of the Databricks run
    - max_wait_time_sec: Maximum time to wait before timing out
    - verbose_logs: Whether to log detailed status messages

    Returns:
        bool: True if the run completed successfully, False if it is still running

    Raises:
        DatabricksError: If the run failed or timed out
    """

def wait_for_run_to_complete(
    self,
    logger: logging.Logger,
    databricks_run_id: int,
    poll_interval_sec: float,
    max_wait_time_sec: float,
    verbose_logs: bool = True,
) -> None:
    """
    Wait for a Databricks run to complete by polling its state.

    Parameters:
    - logger: Logger for status messages
    - databricks_run_id: ID of the Databricks run
    - poll_interval_sec: Time between polling attempts, in seconds
    - max_wait_time_sec: Maximum time to wait before timing out
    - verbose_logs: Whether to log detailed status messages

    Raises:
        DatabricksError: If the run failed or timed out
    """
```
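`wait_for_run_to_complete` blocks until the run finishes, but `poll_run_state` can also drive a custom loop when work needs to happen between polls. A minimal sketch (`wait_with_heartbeat` is a hypothetical helper, not part of the client):

```python
import logging
import time


def wait_with_heartbeat(client, run_id: int,
                        poll_interval_sec: float = 10.0,
                        max_wait_time_sec: float = 3600.0) -> None:
    """Hypothetical helper: poll a run to completion, leaving room for custom
    per-iteration work (heartbeats, metrics) between polls."""
    logger = logging.getLogger(__name__)
    start = time.time()
    # poll_run_state returns True once the run has completed successfully and
    # raises DatabricksError if the run failed or timed out.
    while not client.poll_run_state(
        logger=logger,
        start_poll_time=start,
        databricks_run_id=run_id,
        max_wait_time_sec=max_wait_time_sec,
    ):
        # Custom per-iteration work (e.g. emitting a heartbeat) goes here.
        time.sleep(poll_interval_sec)
```

This mirrors what `wait_for_run_to_complete` does internally, with the loop body exposed to the caller.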

## Usage Examples

### Basic Client Setup

```python
from dagster_databricks import DatabricksClient

# Using a personal access token
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    token="your-access-token",
)

# Using an OAuth service principal
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    oauth_client_id="your-client-id",
    oauth_client_secret="your-client-secret",
)

# Using an Azure service principal
client = DatabricksClient(
    host="https://your-workspace.cloud.databricks.com",
    azure_client_id="your-azure-client-id",
    azure_client_secret="your-azure-client-secret",
    azure_tenant_id="your-azure-tenant-id",
)
```

### File Operations

```python
# Read a file from DBFS
file_contents = client.read_file("/path/to/file.txt")
text_content = file_contents.decode("utf-8")

# Upload a file to DBFS
with open("local_file.txt", "rb") as f:
    client.put_file(f, "/dbfs/path/to/destination.txt", overwrite=True)
```

### Run Monitoring

```python
import logging

logger = logging.getLogger(__name__)

# Get the current state of a run
run_state = client.get_run_state(databricks_run_id=12345)
print(f"Run state: {run_state.life_cycle_state}")
print(f"Result: {run_state.result_state}")

# Wait for the run to complete
client.wait_for_run_to_complete(
    logger=logger,
    databricks_run_id=12345,
    poll_interval_sec=10,
    max_wait_time_sec=3600,  # 1 hour timeout
    verbose_logs=True,
)
```