tessl/pypi-dagster-ssh

SSH and SFTP integration for Dagster data orchestration workflows

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/dagster-ssh@0.27.x

To install, run

npx @tessl/cli install tessl/pypi-dagster-ssh@0.27.0


Dagster SSH

SSH and SFTP integration for Dagster data orchestration workflows. This package provides secure remote connections to servers with support for multiple authentication methods, file transfer operations, and SSH tunneling capabilities designed to seamlessly integrate with Dagster jobs and assets.

Package Information

  • Package Name: dagster-ssh
  • Language: Python
  • Installation: pip install dagster-ssh
  • Dependencies: dagster==1.11.9, sshtunnel, paramiko

Core Imports

from dagster_ssh import SSHResource, ssh_resource

Additional imports:

from dagster_ssh import __version__
# or
from dagster_ssh.version import __version__

Basic Usage

from dagster_ssh import SSHResource
from dagster import Definitions, asset

# Create SSH resource with key-based authentication
ssh_resource = SSHResource(
    remote_host="example.com",
    username="myuser", 
    key_file="/path/to/private/key"
)

# Or with password authentication
ssh_resource_pwd = SSHResource(
    remote_host="example.com",
    username="myuser",
    password="my_secure_password"
)

@asset
def download_remote_file(ssh: SSHResource):
    # Download file from remote server
    local_file = ssh.sftp_get("/remote/path/data.csv", "/local/path/data.csv")
    return local_file

@asset  
def upload_processed_file(ssh: SSHResource):
    # Upload file to remote server
    ssh.sftp_put("/remote/path/output.csv", "/local/path/processed.csv")

defs = Definitions(
    assets=[download_remote_file, upload_processed_file],
    resources={"ssh": ssh_resource}
)

Capabilities

SSH Resource Class

Modern Dagster resource using Pydantic configuration for SSH connections and file operations.

class SSHResource(ConfigurableResource):
    """
    A Dagster resource for establishing SSH connections and file operations.
    
    Args:
        remote_host (str): Hostname or IP address of remote server
        remote_port (Optional[int]): SSH port (default: None, uses 22)
        username (Optional[str]): Username for authentication (default: None, uses system user)
        password (Optional[str]): Password for authentication (default: None)
        key_file (Optional[str]): Path to SSH private key file (default: None)
        key_string (Optional[str]): SSH private key as string (default: None)
        timeout (int): Connection timeout in seconds (default: 10)
        keepalive_interval (int): Interval in seconds between keepalive packets (default: 30)
        compress (bool): Enable transport compression (default: True)
        no_host_key_check (bool): Disable host key verification (default: True)
        allow_host_key_change (bool): Allow changed host keys (default: False)
    """
    
    def setup_for_execution(self, context: InitResourceContext) -> None:
        """
        Initialize resource for execution context.
        
        Sets up the logger, creates RSA key from string if provided,
        auto-detects username if not specified, and configures SSH proxy
        from ~/.ssh/config if available.
        
        Args:
            context (InitResourceContext): Dagster resource initialization context
        """
    
    def set_logger(self, logger: logging.Logger) -> None:
        """Set logger instance for the resource."""
    
    def get_connection(self) -> SSHClient:
        """
        Open a new SSH connection to the remote host. The caller is
        responsible for closing the returned client.
        
        Returns:
            paramiko.client.SSHClient: Connected SSH client
        """
    
    def get_tunnel(
        self, 
        remote_port, 
        remote_host="localhost", 
        local_port=None
    ) -> SSHTunnelForwarder:
        """
        Create SSH tunnel forwarder.
        
        Args:
            remote_port: Remote port to forward
            remote_host: Remote host to connect to (default: "localhost")
            local_port: Local port to bind (default: None, auto-assign)
            
        Returns:
            SSHTunnelForwarder: Configured tunnel forwarder
        """
    
    def sftp_get(self, remote_filepath: str, local_filepath: str) -> str:
        """
        Download file from remote server via SFTP.
        
        Args:
            remote_filepath (str): Path to remote file
            local_filepath (str): Local destination path
            
        Returns:
            str: Path to downloaded local file
        """
    
    def sftp_put(
        self, 
        remote_filepath: str, 
        local_filepath: str, 
        confirm: bool = True
    ) -> str:
        """
        Upload file to remote server via SFTP.
        
        Args:
            remote_filepath (str): Remote destination path
            local_filepath (str): Path to local file
            confirm (bool): If True, stat the remote file after upload to
                confirm its size matches the local file (default: True)
            
        Returns:
            str: The local_filepath that was uploaded
        """

    @property
    def log(self) -> logging.Logger:
        """Logger instance for the resource."""
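setup_for_execution reads ~/.ssh/config to pick up a proxy for remote_host. The resource presumably delegates the parsing to paramiko's SSHConfig. The stdlib-only sketch below shows roughly how host-specific options such as ProxyCommand get matched, which can help when debugging why a proxy is or is not applied. The function lookup_ssh_config is hypothetical and not part of dagster_ssh. It ignores negated patterns, Match blocks, and %-token expansion.

```python
import fnmatch
import re
from typing import Dict, List


def lookup_ssh_config(config_text: str, host: str) -> Dict[str, List[str]]:
    """Hypothetical helper: collect ~/.ssh/config options that apply to `host`.

    Each lowercased option name maps to every value seen, in file order; for most
    options OpenSSH uses the first value. Negated patterns, Match blocks and
    %-token expansion are not handled.
    """
    options: Dict[str, List[str]] = {}
    applies = True  # options before the first Host line apply to every host
    for raw_line in config_text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # Options are written as "Key value" or "Key=value"
        parts = re.split(r"\s*=\s*|\s+", line, maxsplit=1)
        if len(parts) != 2:
            continue
        key, value = parts[0].lower(), parts[1].strip()
        if key == "host":
            # A Host line starts a new block; it applies if any pattern matches
            applies = any(fnmatch.fnmatch(host, pattern) for pattern in value.split())
        elif applies:
            options.setdefault(key, []).append(value)
    return options
```

For example, `lookup_ssh_config(text, "db.internal").get("proxycommand", [None])[0]` gives the proxy command the resource would be expected to use for that host.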

SSH Resource Factory Function

Legacy Dagster resource function for creating SSH resources from configuration.

@resource
def ssh_resource(init_context: InitResourceContext) -> SSHResource:
    """
    Factory function for creating SSHResource from Dagster context.
    
    Args:
        init_context (InitResourceContext): Dagster resource initialization context
        
    Returns:
        SSHResource: Configured SSH resource instance
        
    Configuration Schema:
        remote_host (StringSource): Remote host to connect to (required)
        remote_port (IntSource): SSH port (default: 22)
        username (StringSource): Username for connection (optional)
        password (StringSource): Password for authentication (optional)
        key_file (StringSource): Key file path for authentication (optional)
        key_string (StringSource): Key string for authentication (optional)
        timeout (IntSource): Connection timeout (default: 10)
        keepalive_interval (IntSource): Keepalive interval (default: 30)
        compress (BoolSource): Enable compression (default: True)
        no_host_key_check (BoolSource): Disable host key check (default: True)
        allow_host_key_change (BoolSource): Allow host key changes (default: False)
    """
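When ssh_resource is bound without .configured(...), the schema above is supplied at launch through run config. The fields are StringSource, IntSource, and BoolSource, so a value such as a private key can be read from an environment variable with {"env": ...} instead of being written into the config. The environment variable names below are placeholders.

```python
# Run config for a job whose "ssh" resource is the unconfigured ssh_resource.
# SSH_HOST and SSH_PRIVATE_KEY are placeholder environment variable names.
run_config = {
    "resources": {
        "ssh": {
            "config": {
                "remote_host": {"env": "SSH_HOST"},
                "username": "deploy_user",
                # StringSource: the private key is read from the environment at launch
                "key_string": {"env": "SSH_PRIVATE_KEY"},
                # Opt in to host key verification (the schema default is True)
                "no_host_key_check": False,
                "timeout": 30,
            }
        }
    }
}
```

The same dictionary can be passed as `run_config=` to a job's `execute_in_process`, or entered as YAML in the Dagster UI launchpad.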

Utility Functions

Helper functions for SSH key management.

def key_from_str(key_str: str) -> RSAKey:
    """
    Create a paramiko RSA private key from its string representation.
    
    Args:
        key_str (str): String containing RSA private key data
        
    Returns:
        RSAKey: RSA key object
        
    Raises:
        paramiko.SSHException: If the key string is invalid or cannot be parsed
    """
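key_from_str expects PEM-armored RSA key data with real line breaks. Keys copied into environment variables or secret stores often arrive on a single line with literal "\n" sequences, and then parsing fails. The hypothetical helper below (not part of dagster_ssh) undoes that mangling before the string is passed as key_string. It only checks the PEM armor and leaves real validation to paramiko.

```python
def normalize_key_string(raw: str) -> str:
    """Hypothetical helper: repair a PEM private key that was flattened to one line."""
    key = raw.strip()
    # Secret stores and .env files often keep the key on one line with literal "\n"
    if "\\n" in key and "\n" not in key:
        key = key.replace("\\n", "\n")
    # Cheap sanity check on the armor; paramiko still does the real parsing
    if not (key.startswith("-----BEGIN ") and key.endswith("PRIVATE KEY-----")):
        raise ValueError("key string does not look like a PEM-armored private key")
    return key + "\n"
```

The result can be passed as `SSHResource(..., key_string=normalize_key_string(raw))`.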

Version Information

Access to package version information.

__version__: str
# Version string for the dagster-ssh package (e.g., "0.27.9")
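Dagster integration libraries are released in lockstep with Dagster core. This page pairs dagster-ssh 0.27.9 with dagster==1.11.9, which fits the convention 0.X.Y to 1.(X-16).Y. The hypothetical helper below assumes that convention holds and uses it to predict the matching core version. It does not handle pre-release suffixes.

```python
def companion_dagster_version(library_version: str) -> str:
    """Hypothetical helper: map a dagster-ssh version to the matching dagster core version.

    Assumes the lockstep convention 0.X.Y <-> 1.(X-16).Y used by Dagster integration
    libraries (e.g. dagster-ssh 0.27.9 pins dagster==1.11.9). Pre-release suffixes
    such as "rc0" are not handled.
    """
    major, minor, patch = (int(part) for part in library_version.split(".")[:3])
    if major != 0 or minor < 16:
        raise ValueError(f"unexpected integration library version: {library_version}")
    return f"1.{minor - 16}.{patch}"
```

At startup, the predicted value can be compared with `dagster.__version__` to catch a mismatched install early.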

Types

from paramiko.client import SSHClient
from paramiko import RSAKey
from sshtunnel import SSHTunnelForwarder
from dagster import InitResourceContext
import logging

Advanced Usage Examples

SSH Tunneling

from dagster_ssh import SSHResource
from dagster import asset

@asset
def connect_through_tunnel(ssh: SSHResource):
    # Create SSH tunnel to access remote database
    tunnel = ssh.get_tunnel(remote_port=5432, local_port=5433)
    tunnel.start()
    
    try:
        # Connect to database through tunnel using localhost:5433
        # Database connection code here
        pass
    finally:
        tunnel.stop()
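The try/finally above can be packaged as a context manager. That way the tunnel is always stopped, and the caller learns which local address was actually bound, which matters when local_port is left as None. This is a hypothetical helper, not part of dagster_ssh. It relies on the local_bind_host and local_bind_port attributes of sshtunnel's SSHTunnelForwarder. The forwarder can also be used directly in a with statement.

```python
from contextlib import contextmanager
from typing import Any, Iterator, Optional, Tuple


@contextmanager
def forwarded_port(
    ssh: Any,
    remote_port: int,
    remote_host: str = "localhost",
    local_port: Optional[int] = None,
) -> Iterator[Tuple[str, int]]:
    """Hypothetical helper: yield the (host, port) of a started tunnel, then stop it.

    `ssh` is expected to be an SSHResource (anything with a compatible get_tunnel).
    """
    tunnel = ssh.get_tunnel(remote_port, remote_host=remote_host, local_port=local_port)
    tunnel.start()
    try:
        # sshtunnel reports the address it actually bound, even when auto-assigned
        yield tunnel.local_bind_host, tunnel.local_bind_port
    finally:
        tunnel.stop()
```

Inside an asset, `with forwarded_port(ssh, remote_port=5432) as (host, port):` gives the host and port to put in the database connection string.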

Multiple Authentication Methods

from dagster_ssh import SSHResource

# Key-based authentication with file
ssh_key_file = SSHResource(
    remote_host="server.example.com",
    username="deploy_user",
    key_file="/home/user/.ssh/id_rsa"
)

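# Key-based authentication with string (parsed as an RSA key by key_from_str);
# in practice, load the key from a secret store instead of hardcoding it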
ssh_key_string = SSHResource(
    remote_host="server.example.com", 
    username="deploy_user",
    key_string="""-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA...
-----END RSA PRIVATE KEY-----"""
)

# Password authentication
ssh_password = SSHResource(
    remote_host="server.example.com",
    username="deploy_user",
    password="secure_password"
)

File Transfer Operations

from dagster import asset
from dagster_ssh import SSHResource

@asset
def batch_file_transfer(ssh: SSHResource):
    # Download multiple files
    files = [
        ("/remote/data1.csv", "/local/data1.csv"),
        ("/remote/data2.csv", "/local/data2.csv"),
        ("/remote/config.json", "/local/config.json")
    ]
    
    downloaded_files = []
    for remote_path, local_path in files:
        downloaded_file = ssh.sftp_get(remote_path, local_path)
        downloaded_files.append(downloaded_file)
    
    return downloaded_files

@asset
def upload_results(ssh: SSHResource):
    # Upload processed results
    ssh.sftp_put("/remote/results/output.csv", "/local/processed_data.csv")
    ssh.sftp_put("/remote/logs/process.log", "/local/processing.log")
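The (remote, local) pairs in batch_file_transfer are hardcoded. When only the file names vary, a small hypothetical helper can build them. SFTP paths on the server use "/" separators regardless of the local OS, so the remote side uses posixpath and the local side uses pathlib.

```python
import posixpath
from pathlib import Path
from typing import Iterable, List, Tuple


def transfer_plan(
    remote_dir: str, local_dir: str, filenames: Iterable[str]
) -> List[Tuple[str, str]]:
    """Hypothetical helper: build (remote_path, local_path) pairs for sftp_get."""
    pairs: List[Tuple[str, str]] = []
    for name in filenames:
        # Reject anything that is not a plain file name to avoid escaping the dirs
        if not name or name in (".", "..") or "/" in name or "\\" in name:
            raise ValueError(f"expected a bare file name, got {name!r}")
        remote_path = posixpath.join(remote_dir, name)  # server side: always "/"
        local_path = str(Path(local_dir) / name)  # local side: host OS conventions
        pairs.append((remote_path, local_path))
    return pairs
```

The loop in batch_file_transfer then becomes `for remote_path, local_path in transfer_plan("/remote", "/local", ["data1.csv", "data2.csv", "config.json"]):`.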

Legacy Resource Configuration

from dagster import Definitions, job, op
from dagster_ssh import ssh_resource

# Legacy resources are requested by key and accessed through context.resources
@op(required_resource_keys={"ssh"})
def transfer_files(context):
    context.resources.ssh.sftp_get("/remote/file.txt", "/local/file.txt")

# No resource_defs here: Definitions binds the configured resource below
@job
def ssh_transfer_job():
    transfer_files()

defs = Definitions(
    jobs=[ssh_transfer_job],
    resources={
        "ssh": ssh_resource.configured({
            "remote_host": "example.com",
            "username": "myuser",
            "key_file": "/path/to/key"
        })
    }
)

Authentication Methods

The SSH resource supports multiple authentication approaches:

  1. SSH Key File: Preferred; set key_file to the path of a private key
  2. SSH Key String: Set key_string to the private key contents, which key_from_str parses as an RSA key
  3. Password: Least preferred; set the password parameter
  4. SSH Config: Host settings such as ProxyCommand are read from ~/.ssh/config during setup_for_execution
  5. System Keys: paramiko can also try default keys it discovers in ~/.ssh (for example id_rsa)

Security Considerations

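  • Host Key Verification: The default no_host_key_check=True accepts unknown host keys without verification, which does not protect against man-in-the-middle attacks; set it to False for production hosts and make sure their keys are in the system known_hosts file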
  • Key-based Authentication: Preferred over password authentication
  • Private Key Security: Store keys securely, avoid hardcoding in source
  • Network Security: Use appropriate network isolation and firewall rules
  • Connection Timeout: timeout (default 10 seconds) bounds the connection attempt, and keepalive_interval (default 30 seconds) keeps idle connections from being dropped
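Dagster's EnvVar (for example `SSHResource(password=EnvVar("SSH_PASSWORD"))`) is the idiomatic way to keep secrets out of source and resolve them at runtime. As an alternative, the stdlib sketch below assembles SSHResource keyword arguments from environment variables. It also turns host key verification on unless it is explicitly disabled. The helper and the variable names (SSH_HOST, SSH_PORT, and so on) are hypothetical.

```python
import os
from typing import Any, Dict, Mapping, Optional


def ssh_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Hypothetical helper: build SSHResource keyword arguments from the environment.

    Expected (assumed) variables: SSH_HOST (required), SSH_PORT, SSH_USER,
    SSH_KEY_FILE, SSH_PASSWORD, SSH_NO_HOST_KEY_CHECK.
    """
    env = os.environ if env is None else env
    kwargs: Dict[str, Any] = {"remote_host": env["SSH_HOST"]}  # KeyError if missing
    if env.get("SSH_PORT"):
        kwargs["remote_port"] = int(env["SSH_PORT"])
    for field, var in (
        ("username", "SSH_USER"),
        ("key_file", "SSH_KEY_FILE"),
        ("password", "SSH_PASSWORD"),
    ):
        if env.get(var):
            kwargs[field] = env[var]
    # Verify host keys unless explicitly disabled, reversing the resource default
    kwargs["no_host_key_check"] = env.get("SSH_NO_HOST_KEY_CHECK", "").lower() in {
        "1",
        "true",
        "yes",
    }
    return kwargs
```

Usage: `SSHResource(**ssh_kwargs_from_env())`.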

Error Handling

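Common exceptions and error patterns (exception classes come from paramiko and the Python standard library):

  • Connection Errors: paramiko.AuthenticationException when credentials are rejected; paramiko.ssh_exception.NoValidConnectionsError when the host or port is unreachable
  • File Transfer Errors: SFTP failures surface as IOError subclasses, such as FileNotFoundError for a missing remote path or PermissionError for denied access; local disk problems raise OSError
  • Key Format Errors: paramiko.SSHException from key_from_str when the private key string cannot be parsed
  • Timeout Errors: socket.timeout when the connection is not established within timeout seconds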
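Transient network failures such as timeouts and refused connections are often worth retrying. Authentication and missing-file errors are not. The hypothetical helper below retries a callable with exponential backoff, but only for an explicit tuple of exception types. It is not part of dagster_ssh. For whole assets or ops, Dagster's RetryPolicy is an alternative.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    *,
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: call fn, retrying only the listed exception types."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

For example, `with_retries(lambda: ssh.sftp_get(remote, local), retry_on=(socket.timeout, NoValidConnectionsError))`, with NoValidConnectionsError imported from paramiko.ssh_exception, retries unreachable-host and timeout failures but lets authentication errors through immediately.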