SSH and SFTP integration for Dagster data orchestration workflows
npx @tessl/cli install tessl/pypi-dagster-ssh@0.27.0

SSH and SFTP integration for Dagster data orchestration workflows. This package provides secure remote connections to servers with support for multiple authentication methods, file transfer operations, and SSH tunneling capabilities designed to integrate seamlessly with Dagster jobs and assets.
pip install dagster-ssh

from dagster_ssh import SSHResource, ssh_resource

Additional imports:
from dagster_ssh import __version__
# or
from dagster_ssh.version import __version__

from dagster_ssh import SSHResource
from dagster import Definitions, asset

# Create SSH resource with key-based authentication.
# (This module-level name is just an SSHResource instance; it is unrelated to
# the legacy ``dagster_ssh.ssh_resource`` factory function.)
ssh_resource = SSHResource(
    remote_host="example.com",
    username="myuser",
    key_file="/path/to/private/key",
)

# Or with password authentication.
# NOTE: avoid hard-coding credentials in real code; load them from the
# environment or a secrets manager instead.
ssh_resource_pwd = SSHResource(
    remote_host="example.com",
    username="myuser",
    password="my_secure_password",
)
@asset
def download_remote_file(ssh: SSHResource):
    """Fetch ``/remote/path/data.csv`` over SFTP and return the local copy's path."""
    # sftp_get returns the local destination path, so hand it straight back.
    return ssh.sftp_get("/remote/path/data.csv", "/local/path/data.csv")
@asset
def upload_processed_file(ssh: SSHResource):
    """Push the locally processed CSV to the remote server over SFTP."""
    # Keyword arguments make the argument order explicit: sftp_put takes the
    # *remote* destination first and the local source second.
    ssh.sftp_put(
        remote_filepath="/remote/path/output.csv",
        local_filepath="/local/path/processed.csv",
    )
defs = Definitions(
    assets=[download_remote_file, upload_processed_file],
    # The "ssh" key matches the ``ssh`` parameter name of the assets, which is
    # how Dagster injects the resource into them.
    resources={"ssh": ssh_resource},
)

# Modern Dagster resource using Pydantic configuration for SSH connections and file operations.
class SSHResource(ConfigurableResource):
    """
    A Dagster resource for establishing SSH connections and file operations.

    Args:
        remote_host (str): Hostname or IP address of remote server
        remote_port (Optional[int]): SSH port (default: None, uses 22)
        username (Optional[str]): Username for authentication
            (default: None, uses system user)
        password (Optional[str]): Password for authentication (default: None)
        key_file (Optional[str]): Path to SSH private key file (default: None)
        key_string (Optional[str]): SSH private key as string (default: None).
            It is converted to an RSA key during setup, so it must contain
            RSA private key data.
        timeout (int): Connection timeout in seconds (default: 10)
        keepalive_interval (int): Keepalive packet interval (default: 30)
        compress (bool): Enable transport compression (default: True)
        no_host_key_check (bool): Disable host key verification (default: True).
            Warning: with this default the server's identity is not verified,
            which leaves connections open to man-in-the-middle attacks.
            Consider setting it to False for production hosts.
        allow_host_key_change (bool): Allow changed host keys (default: False)
    """

    def setup_for_execution(self, context: InitResourceContext) -> None:
        """
        Initialize the resource for an execution context.

        Sets up the logger, creates an RSA key from ``key_string`` if provided,
        auto-detects the username if not specified, and configures an SSH
        proxy from ``~/.ssh/config`` if available.

        Args:
            context (InitResourceContext): Dagster resource initialization context
        """

    def set_logger(self, logger: logging.Logger) -> None:
        """Set the logger instance used by the resource."""

    def get_connection(self) -> SSHClient:
        """
        Open an SSH connection to the remote host.

        Returns:
            paramiko.client.SSHClient: Connected SSH client.
            NOTE(review): presumably the caller is responsible for closing it
            (``client.close()``); confirm.
        """

    def get_tunnel(
        self,
        remote_port,
        remote_host="localhost",
        local_port=None,
    ) -> SSHTunnelForwarder:
        """
        Create an SSH tunnel forwarder.

        The returned forwarder must be started with ``start()`` before use and
        shut down with ``stop()`` afterwards, ideally in a ``finally`` block.

        Args:
            remote_port: Remote port to forward
            remote_host: Host to connect to, as seen from the SSH server
                (default: "localhost", i.e. the SSH server itself)
            local_port: Local port to bind (default: None, auto-assign)

        Returns:
            SSHTunnelForwarder: Configured tunnel forwarder
        """

    def sftp_get(self, remote_filepath: str, local_filepath: str) -> str:
        """
        Download a file from the remote server via SFTP.

        Args:
            remote_filepath (str): Path to remote file
            local_filepath (str): Local destination path

        Returns:
            str: Path to the downloaded local file
        """

    def sftp_put(
        self,
        remote_filepath: str,
        local_filepath: str,
        confirm: bool = True,
    ) -> str:
        """
        Upload a file to the remote server via SFTP.

        The argument order is (remote, local), the same as ``sftp_get``, even
        though data flows from local to remote here.

        Args:
            remote_filepath (str): Remote destination path
            local_filepath (str): Path to local file to upload
            confirm (bool): Confirm file transfer (default: True).
                NOTE(review): presumably forwarded to paramiko's
                ``SFTPClient.put(confirm=...)``, which stats the remote file
                after the upload to check its size. Confirm.

        Returns:
            str: Path to the uploaded local file
        """

    @property
    def log(self) -> logging.Logger:
        """Logger instance for the resource."""


# Legacy Dagster resource function for creating SSH resources from configuration.
@resource
def ssh_resource(init_context: InitResourceContext) -> SSHResource:
    """
    Legacy factory for creating an SSHResource from Dagster resource config.

    This function-style resource is for jobs that configure resources through
    ``.configured()`` or run config. New code can construct ``SSHResource``
    directly instead.

    Args:
        init_context (InitResourceContext): Dagster resource initialization
            context whose config follows the schema below

    Returns:
        SSHResource: Configured SSH resource instance

    Configuration Schema:
        remote_host (StringSource): Remote host to connect to (required)
        remote_port (IntSource): SSH port (default: 22)
        username (StringSource): Username for connection (optional)
        password (StringSource): Password for authentication (optional)
        key_file (StringSource): Key file path for authentication (optional)
        key_string (StringSource): Key string for authentication (optional)
        timeout (IntSource): Connection timeout (default: 10)
        keepalive_interval (IntSource): Keepalive interval (default: 30)
        compress (BoolSource): Enable compression (default: True)
        no_host_key_check (BoolSource): Disable host key check (default: True).
            With the default the server's identity is not verified, which
            exposes connections to man-in-the-middle attacks.
        allow_host_key_change (BoolSource): Allow host key changes (default: False)
    """


# Helper functions for SSH key management.
def key_from_str(key_str: str) -> RSAKey:
    """
    Create a paramiko RSA private key from its string representation.

    Only RSA keys are produced (the return type is ``RSAKey``). Key text of
    another type (e.g. Ed25519, ECDSA) presumably fails to parse.

    Args:
        key_str (str): String containing private key data, e.g. text
            starting with ``-----BEGIN RSA PRIVATE KEY-----``

    Returns:
        RSAKey: RSA key object

    Raises:
        ValueError: If key string is invalid or cannot be parsed.
            NOTE(review): paramiko's ``RSAKey.from_private_key`` reports
            unparseable keys with ``paramiko.SSHException``. Confirm whether
            this helper converts that to ``ValueError`` or lets it propagate.
    """


# Access to package version information.
__version__: str
# Version string for the dagster-ssh package (e.g., "0.27.9")

# Types referenced by the API signatures in this reference.
from paramiko.client import SSHClient
from paramiko import RSAKey
from sshtunnel import SSHTunnelForwarder
# Public export; avoid the private ``dagster._core`` module path.
from dagster import InitResourceContext
import logging

from dagster_ssh import SSHResource
from dagster import asset
@asset
def connect_through_tunnel(ssh: SSHResource):
    """Reach a remote database through an SSH tunnel.

    Remote port 5432 (on the SSH server itself, since ``remote_host``
    defaults to "localhost") is forwarded to local port 5433.
    """
    tunnel = ssh.get_tunnel(remote_port=5432, local_port=5433)
    tunnel.start()
    # Stop the tunnel even if the database work below raises.
    try:
        # Connect to database through tunnel using localhost:5433
        # Database connection code here
        pass
    finally:
        tunnel.stop()


# Key-based authentication with file
ssh_key_file = SSHResource(
    remote_host="server.example.com",
    username="deploy_user",
    key_file="/home/user/.ssh/id_rsa",
)

# Key-based authentication with string.
# key_string is converted to an RSA key, so it must hold RSA private key text.
# NOTE: in real code, load key material from a secret store rather than
# embedding it in source.
ssh_key_string = SSHResource(
    remote_host="server.example.com",
    username="deploy_user",
    key_string="""-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA...
-----END RSA PRIVATE KEY-----""",
)

# Password authentication (avoid hard-coding real passwords).
ssh_password = SSHResource(
    remote_host="server.example.com",
    username="deploy_user",
    password="secure_password",
)

from dagster import asset, Definitions
from dagster_ssh import SSHResource
@asset
def batch_file_transfer(ssh: SSHResource):
    """Download several remote files over SFTP and return their local paths.

    Returns:
        list[str]: Local paths, in the same order as ``files``.
    """
    # (remote source, local destination) pairs; sftp_get takes remote first.
    files = [
        ("/remote/data1.csv", "/local/data1.csv"),
        ("/remote/data2.csv", "/local/data2.csv"),
        ("/remote/config.json", "/local/config.json"),
    ]
    return [
        ssh.sftp_get(remote_path, local_path)
        for remote_path, local_path in files
    ]
@asset
def upload_results(ssh: SSHResource):
    """Upload the processed output and its log file to the remote server."""
    # sftp_put takes (remote destination, local source).
    ssh.sftp_put("/remote/results/output.csv", "/local/processed_data.csv")
    ssh.sftp_put("/remote/logs/process.log", "/local/processing.log")


from dagster import Definitions, job, op
from dagster_ssh import ssh_resource
@op(required_resource_keys={"ssh"})
def transfer_files(context):
    """Download ``/remote/file.txt`` using the job's ``ssh`` resource.

    ``ssh_resource`` is a legacy function-style resource. The op therefore
    declares it in ``required_resource_keys`` and reads it from
    ``context.resources``. An un-annotated ``ssh`` parameter would be treated
    by Dagster as an op input rather than a resource, and nothing supplies
    that input when the job calls ``transfer_files()``.
    """
    context.resources.ssh.sftp_get("/remote/file.txt", "/local/file.txt")
@job
def ssh_transfer_job():
    """Run ``transfer_files``, using the ``ssh`` resource bound in ``Definitions``.

    No ``resource_defs`` are declared here on purpose. Job-level resource
    definitions take precedence over ``Definitions(resources=...)``, so
    binding the unconfigured ``ssh_resource`` here would shadow the
    configured one and leave the required ``remote_host`` config unset.
    """
    transfer_files()
defs = Definitions(
    jobs=[ssh_transfer_job],
    resources={
        # Legacy function-style resource: its config is supplied via .configured().
        "ssh": ssh_resource.configured(
            {
                "remote_host": "example.com",
                "username": "myuser",
                "key_file": "/path/to/key",
            }
        ),
    },
)

# The SSH resource supports multiple authentication approaches:
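- Key file: `key_file` path
- Key string: `key_string`
- Password: `password` parameter
- SSH config: proxy settings from `~/.ssh/config`
- Host key checking: disabled by default (`no_host_key_check=True`)

Common exceptions and error patterns: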
key_from_str