# Dagster SSH

SSH and SFTP integration for Dagster data orchestration workflows. This package provides a Dagster resource that opens SSH connections to remote servers, transfers files over SFTP, and creates SSH tunnels. It supports key-file, key-string, and password authentication, and it can be used from both Dagster assets and op-based jobs.

## Package Information

- **Package Name**: dagster-ssh
- **Language**: Python
- **Installation**: `pip install dagster-ssh`
- **Dependencies**: dagster==1.11.9, sshtunnel, paramiko

## Core Imports

```python
from dagster_ssh import SSHResource, ssh_resource
```

Additional imports:

```python
from dagster_ssh import __version__
# or
from dagster_ssh.version import __version__
```

## Basic Usage

```python
from dagster import Definitions, asset
from dagster_ssh import SSHResource

# SSH resource with key-based authentication
ssh_key_auth = SSHResource(
    remote_host="example.com",
    username="myuser",
    key_file="/path/to/private/key",
)

# Alternatively, password authentication
ssh_password_auth = SSHResource(
    remote_host="example.com",
    username="myuser",
    password="my_secure_password",
)


@asset
def download_remote_file(ssh: SSHResource) -> str:
    # Download a file from the remote server; missing local parent
    # directories are created and the local path is returned
    return ssh.sftp_get("/remote/path/data.csv", "/local/path/data.csv")


@asset
def upload_processed_file(ssh: SSHResource) -> None:
    # Upload /local/path/processed.csv to /remote/path/output.csv
    # (the remote path comes first, then the local path)
    ssh.sftp_put("/remote/path/output.csv", "/local/path/processed.csv")


defs = Definitions(
    assets=[download_remote_file, upload_processed_file],
    resources={"ssh": ssh_key_auth},
)
```

## Capabilities

### SSH Resource Class

Pythonic Dagster resource (a `ConfigurableResource` with Pydantic-validated configuration) for SSH connections and SFTP file operations.

```python { .api }
class SSHResource(ConfigurableResource):
    """
    A Dagster resource for establishing SSH connections and file operations.

    Args:
        remote_host (str): Hostname or IP address of remote server
        remote_port (Optional[int]): SSH port (default: None, uses 22)
        username (Optional[str]): Username for authentication (default: None, uses the current system user)
        password (Optional[str]): Password for authentication (default: None)
        key_file (Optional[str]): Path to SSH private key file (default: None)
        key_string (Optional[str]): SSH private key as string (default: None)
        timeout (int): Connection timeout in seconds (default: 10)
        keepalive_interval (int): Interval in seconds between keepalive packets (default: 30)
        compress (bool): Enable transport compression (default: True)
        no_host_key_check (bool): Disable host key verification (default: True)
        allow_host_key_change (bool): Allow changed host keys (default: False)
    """

    def setup_for_execution(self, context: InitResourceContext) -> None:
        """
        Initialize resource for execution context.

        Sets up the logger, creates an RSA key from key_string if provided,
        auto-detects the username if not specified, and configures an SSH
        proxy from ~/.ssh/config if available.

        Args:
            context (InitResourceContext): Dagster resource initialization context
        """

    def set_logger(self, logger: logging.Logger) -> None:
        """Set logger instance for the resource."""

    def get_connection(self) -> SSHClient:
        """
        Open SSH connection to remote host.

        Returns:
            paramiko.client.SSHClient: Connected SSH client
        """

    def get_tunnel(
        self,
        remote_port,
        remote_host="localhost",
        local_port=None
    ) -> SSHTunnelForwarder:
        """
        Create SSH tunnel forwarder.

        Args:
            remote_port: Remote port to forward
            remote_host: Host to forward to, as seen from the SSH server
                (default: "localhost", i.e. the SSH server itself)
            local_port: Local port to bind (default: None, auto-assign)

        Returns:
            SSHTunnelForwarder: Configured tunnel forwarder; call start() to open it
        """

    def sftp_get(self, remote_filepath: str, local_filepath: str) -> str:
        """
        Download file from remote server via SFTP.

        Missing local parent directories are created.

        Args:
            remote_filepath (str): Path to remote file
            local_filepath (str): Local destination path

        Returns:
            str: The local file path (local_filepath)
        """

    def sftp_put(
        self,
        remote_filepath: str,
        local_filepath: str,
        confirm: bool = True
    ) -> str:
        """
        Upload file to remote server via SFTP.

        Args:
            remote_filepath (str): Remote destination path
            local_filepath (str): Path to local file
            confirm (bool): If True, stat the remote file after the upload to
                verify its size (passed to paramiko's SFTPClient.put) (default: True)

        Returns:
            str: The local file path (local_filepath)
        """

    @property
    def log(self) -> logging.Logger:
        """Logger instance for the resource."""
```
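The `~/.ssh/config` handling that `setup_for_execution` performs can be pictured with the simplified sketch below. It is a hypothetical, standard-library approximation written for illustration, not dagster-ssh code: the resource delegates parsing to `paramiko.SSHConfig`, which also handles `Match` blocks, negated host patterns, and token expansion such as `%h`, none of which this sketch implements. It also models an assumed fallback in which the first `IdentityFile` becomes `key_file` when neither `password` nor `key_file` is given. The names `parse_ssh_config`, `lookup`, and `resolve_connection_options` are invented here.

```python
import fnmatch
import re
from typing import Any, Dict, List, Optional, Tuple

# A block is (host patterns, [(keyword, value), ...])
Block = Tuple[List[str], List[Tuple[str, str]]]
# Keyword, then whitespace or "=", then the value (the value may itself contain "=")
_LINE_RE = re.compile(r"(\S+?)\s*[=\s]\s*(.*)")


def parse_ssh_config(text: str) -> List[Block]:
    """Split ssh_config-style text into (host patterns, options) blocks (hypothetical)."""
    blocks: List[Block] = [(["*"], [])]  # options before any Host line apply to every host
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        match = _LINE_RE.match(line)
        if match is None:
            continue  # keyword without a value
        key, value = match.group(1).lower(), match.group(2).strip()
        if key == "host":
            blocks.append((value.split(), []))
        else:
            blocks[-1][1].append((key, value))
    return blocks


def lookup(blocks: List[Block], host: str) -> Dict[str, Any]:
    """Collect options for host: the first value wins, IdentityFile values accumulate."""
    result: Dict[str, Any] = {}
    for patterns, options in blocks:
        if not any(fnmatch.fnmatch(host.lower(), p.lower()) for p in patterns):
            continue
        for key, value in options:
            if key == "identityfile":
                result.setdefault("identityfile", []).append(value)
            else:
                result.setdefault(key, value)
    return result


def resolve_connection_options(
    config_text: str,
    remote_host: str,
    password: Optional[str] = None,
    key_file: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Return (proxy_command, key_file) for remote_host (simplified model)."""
    info = lookup(parse_ssh_config(config_text), remote_host)
    proxy_command = info.get("proxycommand")
    # Assumed fallback: use the first IdentityFile only if no password or key_file was given
    if not (password or key_file) and info.get("identityfile"):
        key_file = info["identityfile"][0]
    return proxy_command, key_file
```

For a config containing `Host db-*` with a `ProxyCommand` and an `IdentityFile`, calling `resolve_connection_options(config, "db-prod")` returns that proxy command together with the first matching identity file, while an explicit `password` or `key_file` leaves the key file untouched.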

### SSH Resource Factory Function

Legacy function-style Dagster resource that creates an `SSHResource` from resource configuration.

```python { .api }
@resource
def ssh_resource(init_context: InitResourceContext) -> SSHResource:
    """
    Factory function for creating SSHResource from Dagster context.

    Args:
        init_context (InitResourceContext): Dagster resource initialization context

    Returns:
        SSHResource: Configured SSH resource instance

    Configuration Schema:
        remote_host (StringSource): Remote host to connect to (required)
        remote_port (IntSource): SSH port (default: 22)
        username (StringSource): Username for connection (optional)
        password (StringSource): Password for authentication (optional)
        key_file (StringSource): Key file path for authentication (optional)
        key_string (StringSource): Key string for authentication (optional)
        timeout (IntSource): Connection timeout in seconds (default: 10)
        keepalive_interval (IntSource): Keepalive interval in seconds (default: 30)
        compress (BoolSource): Enable compression (default: True)
        no_host_key_check (BoolSource): Disable host key check (default: True)
        allow_host_key_change (BoolSource): Allow host key changes (default: False)
    """
```

### Utility Functions

Helper function for SSH key management, located in `dagster_ssh.resources`.

```python { .api }
def key_from_str(key_str: str) -> RSAKey:
    """
    Create a paramiko RSA key from its string representation.

    Args:
        key_str (str): String containing RSA private key data

    Returns:
        RSAKey: RSA key object

    Raises:
        paramiko.SSHException: If the string is not a valid RSA private key
    """
```

### Version Information

Access to package version information.

```python { .api }
__version__: str
# Version string for the dagster-ssh package (e.g., "0.27.9")
```

## Types

```python { .api }
import logging

from dagster import InitResourceContext
from paramiko import RSAKey
from paramiko.client import SSHClient
from sshtunnel import SSHTunnelForwarder
```

## Advanced Usage Examples

### SSH Tunneling

```python
from dagster import asset
from dagster_ssh import SSHResource


@asset
def connect_through_tunnel(ssh: SSHResource):
    # Forward local port 5433 to port 5432 on the SSH server itself
    # (remote_host defaults to "localhost", as seen from the SSH server)
    tunnel = ssh.get_tunnel(remote_port=5432, local_port=5433)
    tunnel.start()

    try:
        # While the tunnel is open, connect to the database at localhost:5433
        ...  # database connection code here
    finally:
        tunnel.stop()
```
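With `local_port=None` the tunnel binds a free local port chosen by the operating system, and after `start()` the chosen port can be read from the forwarder's `local_bind_port` attribute. The standard-library sketch below shows the underlying mechanism, binding to port 0, for cases where you want to pick a free port number yourself, for example to pass it explicitly as `local_port`. The helper name `find_free_port` is hypothetical and not part of dagster-ssh.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0 (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 lets the kernel pick a free ephemeral port
        return sock.getsockname()[1]
```

The port is released when the socket closes, so another process could claim it before the tunnel binds it; letting the tunnel auto-assign with `local_port=None` avoids that race.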

### Multiple Authentication Methods

```python
from dagster_ssh import SSHResource

# Key-based authentication with a key file
ssh_key_file = SSHResource(
    remote_host="server.example.com",
    username="deploy_user",
    key_file="/home/user/.ssh/id_rsa",
)

# Key-based authentication with a key string (must be an RSA private key)
ssh_key_string = SSHResource(
    remote_host="server.example.com",
    username="deploy_user",
    key_string="""-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA...
-----END RSA PRIVATE KEY-----""",
)

# Password authentication
ssh_password = SSHResource(
    remote_host="server.example.com",
    username="deploy_user",
    password="secure_password",
)
```
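Key strings are often supplied through environment variables or secret stores, where line breaks are sometimes stored as literal `\n` sequences; a key in that form will not parse. The hypothetical helpers below, which are not part of dagster-ssh, restore real newlines before the value is passed as `key_string`. The environment variable name `SSH_PRIVATE_KEY` is an assumption.

```python
import os
from typing import Optional


def normalize_key_string(raw: str) -> str:
    """Turn a single-line, escaped private key back into multi-line PEM text (hypothetical)."""
    text = raw.strip()
    # Undo common escaping: literal backslash sequences and Windows line endings
    text = text.replace("\\r\\n", "\n").replace("\\n", "\n").replace("\r\n", "\n")
    if not text.endswith("\n"):
        text += "\n"
    return text


def key_string_from_env(var: str = "SSH_PRIVATE_KEY") -> Optional[str]:
    """Read and normalize a key from an environment variable, or return None if unset."""
    raw = os.environ.get(var)
    return normalize_key_string(raw) if raw else None
```

The result can be passed as `SSHResource(remote_host=..., username=..., key_string=key_string_from_env())`. Note that the variable is read when the definitions are loaded, not at run time.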

### File Transfer Operations

```python
from dagster import asset
from dagster_ssh import SSHResource


@asset
def batch_file_transfer(ssh: SSHResource):
    # (remote path, local path) pairs to download
    files = [
        ("/remote/data1.csv", "/local/data1.csv"),
        ("/remote/data2.csv", "/local/data2.csv"),
        ("/remote/config.json", "/local/config.json"),
    ]

    downloaded_files = []
    for remote_path, local_path in files:
        # Each call opens its own SSH connection and returns the local path
        downloaded_files.append(ssh.sftp_get(remote_path, local_path))

    return downloaded_files


@asset
def upload_results(ssh: SSHResource):
    # Upload processed results: sftp_put(remote_path, local_path)
    ssh.sftp_put("/remote/results/output.csv", "/local/processed_data.csv")
    ssh.sftp_put("/remote/logs/process.log", "/local/processing.log")
```
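When many files share a remote directory, the (remote, local) pairs can be built programmatically instead of being written out by hand. The hypothetical helper below uses `posixpath` for the remote side, since SFTP paths are `/`-separated, and `pathlib` for the local side, so it also behaves correctly when Dagster runs on Windows. The name `plan_downloads` is invented for this sketch.

```python
import posixpath
from pathlib import Path
from typing import Iterable, List, Tuple


def plan_downloads(
    remote_dir: str, filenames: Iterable[str], local_dir: str
) -> List[Tuple[str, str]]:
    """Pair each remote file with a destination path under local_dir (hypothetical)."""
    pairs = []
    for name in filenames:
        # Reject names that could escape the target directories
        if posixpath.basename(name) != name or name in ("", ".", ".."):
            raise ValueError(f"expected a bare file name, got {name!r}")
        remote_path = posixpath.join(remote_dir, name)
        local_path = str(Path(local_dir) / name)
        pairs.append((remote_path, local_path))
    return pairs
```

The returned list has the same shape as `files` in `batch_file_transfer`, so it can replace the hard-coded list there.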

### Legacy Resource Configuration

```python
from dagster import Definitions, job, op
from dagster_ssh import ssh_resource


@op(required_resource_keys={"ssh"})
def transfer_files(context):
    # Function-style resources are accessed through context.resources
    context.resources.ssh.sftp_get("/remote/file.txt", "/local/file.txt")


@job
def ssh_transfer_job():
    transfer_files()


defs = Definitions(
    jobs=[ssh_transfer_job],
    resources={
        # Bind the configured legacy resource under the "ssh" key
        "ssh": ssh_resource.configured({
            "remote_host": "example.com",
            "username": "myuser",
            "key_file": "/path/to/key",
        })
    },
)
```
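
## Authentication Methods

The SSH resource supports several authentication approaches:

1. **SSH key file**: set `key_file` to the path of a private key file.
2. **SSH key string**: pass the private key contents in `key_string`. The string is parsed with `key_from_str`, so it must be an RSA key.
3. **Password**: set `password`. When a non-empty password is set, paramiko's automatic key discovery is turned off. Passwords are generally weaker than keys.
4. **SSH config**: if `~/.ssh/config` exists, a `ProxyCommand` configured for `remote_host` is used, and the host's `IdentityFile` is used as `key_file` when neither `password` nor `key_file` is set.
5. **System keys**: when no password is set, paramiko can also discover default key files in `~/.ssh` automatically.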
348
## Security Considerations
349
350
- **Host Key Verification**: Default configuration disables host key checking (`no_host_key_check=True`)
351
- **Key-based Authentication**: Preferred over password authentication
352
- **Private Key Security**: Store keys securely, avoid hardcoding in source
353
- **Network Security**: Use appropriate network isolation and firewall rules
354
- **Connection Timeout**: Configure appropriate timeout values to prevent hanging connections
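Before switching to `no_host_key_check=False`, it can help to confirm that the server already has an entry in `known_hosts`, since unknown hosts will then be rejected. The standard-library sketch below is a hypothetical helper, not part of dagster-ssh or paramiko. It checks a known_hosts file for a host, including OpenSSH's hashed `|1|salt|hash` entries, where the hash is HMAC-SHA1 of the host name keyed with the salt. Hosts on non-default ports are written as `[host]:port`. Wildcard patterns and `@cert-authority`/`@revoked` lines are not handled.

```python
import base64
import hashlib
import hmac
from pathlib import Path
from typing import Iterable


def _host_token(host: str, port: int) -> str:
    # known_hosts stores non-default ports as "[host]:port"
    return host if port == 22 else f"[{host}]:{port}"


def _entry_matches(field: str, token: str) -> bool:
    if field.startswith("|1|"):
        # Hashed entry: |1|base64(salt)|base64(HMAC-SHA1(salt, host))
        try:
            salt_b64, hash_b64 = field[3:].split("|", 1)
            salt = base64.b64decode(salt_b64)
            expected = base64.b64decode(hash_b64)
        except ValueError:  # also covers binascii.Error
            return False
        digest = hmac.new(salt, token.encode(), hashlib.sha1).digest()
        return hmac.compare_digest(digest, expected)
    # Plain entry: comma-separated list of host names
    return token in field.split(",")


def host_is_known(lines: Iterable[str], host: str, port: int = 22) -> bool:
    """Return True if any known_hosts line lists host (hypothetical helper)."""
    token = _host_token(host, port)
    for line in lines:
        line = line.strip()
        if not line or line.startswith(("#", "@")):
            continue  # skip comments and marker lines such as @cert-authority
        if _entry_matches(line.split()[0], token):
            return True
    return False


def host_in_known_hosts(host: str, port: int = 22, path: str = "~/.ssh/known_hosts") -> bool:
    file = Path(path).expanduser()
    return file.is_file() and host_is_known(file.read_text().splitlines(), host, port)
```

Running `host_in_known_hosts("server.example.com")` in a deployment check, or adding the key with `ssh-keyscan` beforehand, avoids surprise connection failures after host key checking is enabled.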

## Error Handling

Common exceptions and error patterns:

- **Connection errors**: network connectivity problems and authentication failures raised by paramiko when the connection is opened
- **File transfer errors**: permission issues, insufficient disk space, or invalid paths
- **Key format errors**: invalid private key data passed to `key_from_str` (for example through `key_string`)
- **Timeout errors**: connection or operation timeouts; the `timeout` setting only bounds the connection attempt
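Connection and timeout errors are often transient, while authentication and key-format errors are not. One way to handle the former is to retry with exponential backoff. The generic sketch below is a hypothetical standard-library helper, and which exceptions count as retryable is an assumption to adapt. For example, paramiko's `NoValidConnectionsError` and `socket.timeout` are `OSError` subclasses, whereas `paramiko.AuthenticationException` is not, so retrying only `OSError` skips authentication failures. SFTP errors such as a missing remote file are also `OSError`s, so narrow the tuple if those should fail immediately.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry(
    func: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...] = (OSError,),
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying retryable exceptions with exponential backoff (hypothetical)."""
    for attempt in range(attempts):
        try:
            return func()
        except retryable:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ... with the defaults
    raise ValueError("attempts must be at least 1")
```

Inside an asset this might be used as `retry(lambda: ssh.sftp_get("/remote/data.csv", "/local/data.csv"))`; the `sleep` parameter exists so tests can record delays instead of waiting.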