or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md

index.mddocs/

0

# Dagster SSH

1

2

SSH and SFTP integration for Dagster data orchestration workflows. This package provides secure remote connections to servers with support for multiple authentication methods, file transfer operations, and SSH tunneling capabilities designed to seamlessly integrate with Dagster jobs and assets.

3

4

## Package Information

5

6

- **Package Name**: dagster-ssh

7

- **Language**: Python

8

- **Installation**: `pip install dagster-ssh`

9

- **Dependencies**: dagster==1.11.9, sshtunnel, paramiko

10

11

## Core Imports

12

13

```python

14

from dagster_ssh import SSHResource, ssh_resource

15

```

16

17

Additional imports:

18

19

```python

20

from dagster_ssh import __version__

21

# or

22

from dagster_ssh.version import __version__

23

```

24

25

## Basic Usage

26

27

```python

28

from dagster_ssh import SSHResource

29

from dagster import Definitions, asset

30

31

# Create SSH resource with key-based authentication

32

ssh_resource = SSHResource(

33

remote_host="example.com",

34

username="myuser",

35

key_file="/path/to/private/key"

36

)

37

38

# Or with password authentication

39

ssh_resource_pwd = SSHResource(

40

remote_host="example.com",

41

username="myuser",

42

password="my_secure_password"

43

)

44

45

@asset

46

def download_remote_file(ssh: SSHResource):

47

# Download file from remote server

48

local_file = ssh.sftp_get("/remote/path/data.csv", "/local/path/data.csv")

49

return local_file

50

51

@asset

52

def upload_processed_file(ssh: SSHResource):

53

# Upload file to remote server

54

ssh.sftp_put("/remote/path/output.csv", "/local/path/processed.csv")

55

56

defs = Definitions(

57

assets=[download_remote_file, upload_processed_file],

58

resources={"ssh": ssh_resource}

59

)

60

```

61

62

## Capabilities

63

64

### SSH Resource Class

65

66

Modern Dagster resource using Pydantic configuration for SSH connections and file operations.

67

68

```python { .api }

69

class SSHResource(ConfigurableResource):

70

"""

71

A Dagster resource for establishing SSH connections and file operations.

72

73

Args:

74

remote_host (str): Hostname or IP address of remote server

75

remote_port (Optional[int]): SSH port (default: None, uses 22)

76

username (Optional[str]): Username for authentication (default: None, uses system user)

77

password (Optional[str]): Password for authentication (default: None)

78

key_file (Optional[str]): Path to SSH private key file (default: None)

79

key_string (Optional[str]): SSH private key as string (default: None)

80

timeout (int): Connection timeout in seconds (default: 10)

81

keepalive_interval (int): Keepalive packet interval (default: 30)

82

compress (bool): Enable transport compression (default: True)

83

no_host_key_check (bool): Disable host key verification (default: True)

84

allow_host_key_change (bool): Allow changed host keys (default: False)

85

"""

86

87

def setup_for_execution(self, context: InitResourceContext) -> None:

88

"""

89

Initialize resource for execution context.

90

91

Sets up the logger, creates RSA key from string if provided,

92

auto-detects username if not specified, and configures SSH proxy

93

from ~/.ssh/config if available.

94

95

Args:

96

context (InitResourceContext): Dagster resource initialization context

97

"""

98

99

def set_logger(self, logger: logging.Logger) -> None:

100

"""Set logger instance for the resource."""

101

102

def get_connection(self) -> SSHClient:

103

"""

104

Open SSH connection to remote host.

105

106

Returns:

107

paramiko.client.SSHClient: Connected SSH client

108

"""

109

110

def get_tunnel(

111

self,

112

remote_port,

113

remote_host="localhost",

114

local_port=None

115

) -> SSHTunnelForwarder:

116

"""

117

Create SSH tunnel forwarder.

118

119

Args:

120

remote_port: Remote port to forward

121

remote_host: Remote host to connect to (default: "localhost")

122

local_port: Local port to bind (default: None, auto-assign)

123

124

Returns:

125

SSHTunnelForwarder: Configured tunnel forwarder

126

"""

127

128

def sftp_get(self, remote_filepath: str, local_filepath: str) -> str:

129

"""

130

Download file from remote server via SFTP.

131

132

Args:

133

remote_filepath (str): Path to remote file

134

local_filepath (str): Local destination path

135

136

Returns:

137

str: Path to downloaded local file

138

"""

139

140

def sftp_put(

141

self,

142

remote_filepath: str,

143

local_filepath: str,

144

confirm: bool = True

145

) -> str:

146

"""

147

Upload file to remote server via SFTP.

148

149

Args:

150

remote_filepath (str): Remote destination path

151

local_filepath (str): Path to local file

152

confirm (bool): Confirm file transfer (default: True)

153

154

Returns:

155

str: Path to uploaded local file

156

"""

157

158

@property

159

def log(self) -> logging.Logger:

160

"""Logger instance for the resource."""

161

```

162

163

### SSH Resource Factory Function

164

165

Legacy Dagster resource function for creating SSH resources from configuration.

166

167

```python { .api }

168

@resource

169

def ssh_resource(init_context: InitResourceContext) -> SSHResource:

170

"""

171

Factory function for creating SSHResource from Dagster context.

172

173

Args:

174

init_context (InitResourceContext): Dagster resource initialization context

175

176

Returns:

177

SSHResource: Configured SSH resource instance

178

179

Configuration Schema:

180

remote_host (StringSource): Remote host to connect to (required)

181

remote_port (IntSource): SSH port (default: 22)

182

username (StringSource): Username for connection (optional)

183

password (StringSource): Password for authentication (optional)

184

key_file (StringSource): Key file path for authentication (optional)

185

key_string (StringSource): Key string for authentication (optional)

186

timeout (IntSource): Connection timeout (default: 10)

187

keepalive_interval (IntSource): Keepalive interval (default: 30)

188

compress (BoolSource): Enable compression (default: True)

189

no_host_key_check (BoolSource): Disable host key check (default: True)

190

allow_host_key_change (BoolSource): Allow host key changes (default: False)

191

"""

192

```

193

194

### Utility Functions

195

196

Helper functions for SSH key management.

197

198

```python { .api }

199

def key_from_str(key_str: str) -> RSAKey:

200

"""

201

Create paramiko SSH key from string representation.

202

203

Args:

204

key_str (str): String containing private key data

205

206

Returns:

207

RSAKey: RSA key object

208

209

Raises:

210

ValueError: If key string is invalid or cannot be parsed

211

"""

212

```

213

214

### Version Information

215

216

Access to package version information.

217

218

```python { .api }

219

__version__: str

220

# Version string for the dagster-ssh package (e.g., "0.27.9")

221

```

222

223

## Types

224

225

```python { .api }

226

from paramiko.client import SSHClient

227

from paramiko import RSAKey

228

from sshtunnel import SSHTunnelForwarder

229

from dagster._core.execution.context.init import InitResourceContext

230

import logging

231

```

232

233

## Advanced Usage Examples

234

235

### SSH Tunneling

236

237

```python

238

from dagster_ssh import SSHResource

239

from dagster import asset

240

241

@asset

242

def connect_through_tunnel(ssh: SSHResource):

243

# Create SSH tunnel to access remote database

244

tunnel = ssh.get_tunnel(remote_port=5432, local_port=5433)

245

tunnel.start()

246

247

try:

248

# Connect to database through tunnel using localhost:5433

249

# Database connection code here

250

pass

251

finally:

252

tunnel.stop()

253

```

254

255

### Multiple Authentication Methods

256

257

```python

258

# Key-based authentication with file

259

ssh_key_file = SSHResource(

260

remote_host="server.example.com",

261

username="deploy_user",

262

key_file="/home/user/.ssh/id_rsa"

263

)

264

265

# Key-based authentication with string

266

ssh_key_string = SSHResource(

267

remote_host="server.example.com",

268

username="deploy_user",

269

key_string="""-----BEGIN RSA PRIVATE KEY-----

270

MIIEpAIBAAKCAQEA...

271

-----END RSA PRIVATE KEY-----"""

272

)

273

274

# Password authentication

275

ssh_password = SSHResource(

276

remote_host="server.example.com",

277

username="deploy_user",

278

password="secure_password"

279

)

280

```

281

282

### File Transfer Operations

283

284

```python

285

from dagster import asset, Definitions

286

from dagster_ssh import SSHResource

287

288

@asset

289

def batch_file_transfer(ssh: SSHResource):

290

# Download multiple files

291

files = [

292

("/remote/data1.csv", "/local/data1.csv"),

293

("/remote/data2.csv", "/local/data2.csv"),

294

("/remote/config.json", "/local/config.json")

295

]

296

297

downloaded_files = []

298

for remote_path, local_path in files:

299

downloaded_file = ssh.sftp_get(remote_path, local_path)

300

downloaded_files.append(downloaded_file)

301

302

return downloaded_files

303

304

@asset

305

def upload_results(ssh: SSHResource):

306

# Upload processed results

307

ssh.sftp_put("/remote/results/output.csv", "/local/processed_data.csv")

308

ssh.sftp_put("/remote/logs/process.log", "/local/processing.log")

309

```

310

311

### Legacy Resource Configuration

312

313

```python

314

from dagster import Definitions, job, op

315

from dagster_ssh import ssh_resource

316

317

@op

318

def transfer_files(context, ssh):

319

# Use SSH resource in operation

320

ssh.sftp_get("/remote/file.txt", "/local/file.txt")

321

322

@job(resource_defs={"ssh": ssh_resource})

323

def ssh_transfer_job():

324

transfer_files()

325

326

defs = Definitions(

327

jobs=[ssh_transfer_job],

328

resources={

329

"ssh": ssh_resource.configured({

330

"remote_host": "example.com",

331

"username": "myuser",

332

"key_file": "/path/to/key"

333

})

334

}

335

)

336

```

337

338

## Authentication Methods

339

340

The SSH resource supports multiple authentication approaches:

341

342

1. **SSH Key File**: Most secure, specify `key_file` path

343

2. **SSH Key String**: Secure, embed key content in `key_string`

344

3. **Password**: Less secure, use `password` parameter

345

4. **SSH Config**: Automatic detection from `~/.ssh/config`

346

5. **System Keys**: Automatic key discovery via paramiko

347

348

## Security Considerations

349

350

- **Host Key Verification**: Default configuration disables host key checking (`no_host_key_check=True`)

351

- **Key-based Authentication**: Preferred over password authentication

352

- **Private Key Security**: Store keys securely, avoid hardcoding in source

353

- **Network Security**: Use appropriate network isolation and firewall rules

354

- **Connection Timeout**: Configure appropriate timeout values to prevent hanging connections

355

356

## Error Handling

357

358

Common exceptions and error patterns:

359

360

- **Connection Errors**: Network connectivity, authentication failures

361

- **File Transfer Errors**: Permission issues, disk space, invalid paths

362

- **Key Format Errors**: Invalid private key format in `key_from_str`

363

- **Timeout Errors**: Connection or operation timeouts