Service Provider Interfaces (SPIs) that define extensible contracts for CDAP runtime functionality, enabling pluggable implementations of cluster provisioning, job lifecycle management, and infrastructure integration
—
Comprehensive SSH utilities for secure remote operations including session management, command execution, file transfer, and port forwarding. The SSH system is essential for cluster initialization, remote job management, and secure communication with provisioned infrastructure across different cloud platforms and environments.
Main entry point for SSH operations providing key management and session creation capabilities.
/**
* Entry point for SSH operations.
* Generates and holds SSH key pairs and opens {@link SSHSession}s to remote hosts.
*/
interface SSHContext {
/**
* Generates a 2048-bit RSA key pair for the specified user.
*
* @param user username to associate with the key pair
* @return the generated SSH key pair
* @throws KeyException if key generation fails
*/
SSHKeyPair generate(String user) throws KeyException;
/**
* Generates an RSA key pair with the specified key size.
*
* @param user username to associate with the key pair
* @param keySize key size in bits (e.g., 2048, 4096)
* @return the generated SSH key pair
* @throws KeyException if key generation fails
*/
SSHKeyPair generate(String user, int keySize) throws KeyException;
/**
* Sets the SSH key pair to use for connections.
*
* @param keyPair SSH key pair to use
*/
void setSSHKeyPair(SSHKeyPair keyPair);
/**
* Returns the currently configured SSH key pair.
*
* @return an {@link Optional} holding the key pair if one is set, an empty {@link Optional} otherwise
*/
Optional<SSHKeyPair> getSSHKeyPair();
/**
* Creates an SSH session to the specified host using the configured key pair.
* NOTE(review): presumably a key pair must have been set via
* {@link #setSSHKeyPair(SSHKeyPair)} first; confirm the behavior when none is configured.
*
* @param host target host address
* @return a new SSH session; the caller is responsible for closing it
* @throws IOException if the connection fails
*/
SSHSession createSSHSession(String host) throws IOException;
/**
* Creates an SSH session to the specified host using an explicit key pair.
*
* @param keyPair SSH key pair to use for authentication
* @param host target host address
* @return a new SSH session; the caller is responsible for closing it
* @throws IOException if the connection fails
*/
SSHSession createSSHSession(SSHKeyPair keyPair, String host) throws IOException;
/**
* Creates an SSH session with full control over the connection settings.
*
* @param user username for the connection
* @param privateKeySupplier supplier of the private key bytes
* @param host target host address
* @param port SSH port number
* @param configs additional SSH configuration properties
* @return a new SSH session; the caller is responsible for closing it
* @throws IOException if the connection fails
*/
SSHSession createSSHSession(String user,
Supplier<byte[]> privateKeySupplier,
String host,
int port,
Map<String, String> configs) throws IOException;
}
Usage Example:
// Generate a key pair and register it as the one to use for new connections
SSHContext sshContext = // ... obtained from context
SSHKeyPair keyPair = sshContext.generate("cdap-user", 2048);
sshContext.setSSHKeyPair(keyPair);
// Open a session; try-with-resources closes it when the block exits
try (SSHSession session = sshContext.createSSHSession("cluster-master.example.com")) {
// Run a command remotely and wait for it to complete before reading the exit code
SSHProcess process = session.executeAndWait("echo", "Hello from remote host");
System.out.println("Exit code: " + process.exitValue());
}
Interface for managing SSH connections and performing remote operations.
/**
* An SSH session used to run remote commands, copy files, and set up port forwarding.
* Must be closed to release connection resources.
*/
interface SSHSession extends Closeable {
/**
* Checks whether the SSH session is still alive.
*
* @return true if the session is connected
*/
boolean isAlive();
/**
* Returns the remote address of this session.
*
* @return socket address of the remote host
*/
InetSocketAddress getAddress();
/**
* Returns the username used for this session.
*
* @return the username
*/
String getUsername();
/**
* Executes a command asynchronously.
*
* @param commands command arguments to execute
* @return handle to the launched remote process
* @throws Exception if execution fails
*/
SSHProcess execute(String... commands) throws Exception;
/**
* Executes a command asynchronously; list-based variant of {@link #execute(String...)}.
*
* @param commands list of command arguments
* @return handle to the launched remote process
* @throws Exception if execution fails
*/
SSHProcess execute(List<String> commands) throws Exception;
/**
* Executes a command and waits for it to complete.
*
* @param commands command arguments to execute
* @return handle to the remote process, which has already completed
* @throws Exception if execution fails
*/
SSHProcess executeAndWait(String... commands) throws Exception;
/**
* Executes a command and waits for it to complete; list-based variant of
* {@link #executeAndWait(String...)}.
*
* @param commands list of command arguments
* @return handle to the remote process, which has already completed
* @throws Exception if execution fails
*/
SSHProcess executeAndWait(List<String> commands) throws Exception;
/**
* Copies a local file to the remote host.
*
* @param localFile local file path
* @param remotePath remote destination path
* @throws Exception if the copy fails
*/
void copy(Path localFile, String remotePath) throws Exception;
/**
* Copies the content of an input stream to a remote file, with full control over
* the file attributes.
*
* @param input input stream to copy from
* @param targetFile remote target file path
* @param targetName remote target file name
* @param length number of bytes to copy
* @param permission file permissions (octal)
* @param lastModifiedTime last modified timestamp (optional)
* @param lastAccessTime last access timestamp (optional)
* @throws Exception if the copy fails
*/
void copy(InputStream input,
String targetFile,
String targetName,
long length,
int permission,
Long lastModifiedTime,
Long lastAccessTime) throws Exception;
/**
* Creates a local port forwarding tunnel.
*
* @param remoteHost remote host to forward to
* @param remotePort remote port to forward to
* @param localPort local port to listen on
* @param dataConsumer consumer that receives the forwarded data
* @return the port forwarding channel; close it to stop forwarding
* @throws Exception if the forwarding setup fails
*/
PortForwarding createLocalPortForward(String remoteHost,
int remotePort,
int localPort,
PortForwarding.DataConsumer dataConsumer) throws Exception;
/**
* Creates a remote port forwarding tunnel: the remote side listens on
* {@code remotePort} and traffic is forwarded to {@code localPort}.
*
* @param remotePort remote port to listen on
* @param localPort local port to forward to
* @return handle for the remote port forwarding; close it to stop forwarding
* @throws Exception if the forwarding setup fails
*/
RemotePortForwarding createRemotePortForward(int remotePort, int localPort) throws Exception;
/** Closes the SSH session and releases its resources. */
void close() throws IOException;
}
Usage Examples:
// Execute a command, copy a file, and open a tunnel over a single session
try (SSHSession session = sshContext.createSSHSession("worker-1.example.com")) {
    // Simple command execution: executeAndWait returns once the remote command has completed
    SSHProcess process = session.executeAndWait("ls", "-la", "/opt/cdap");
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(process.getInputStream()))) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    }
    // File copy operations
    Path localConfig = Paths.get("/local/config/app.properties");
    session.copy(localConfig, "/opt/cdap/conf/app.properties");
    // Port forwarding for web UI access.
    // PortForwarding is a closeable channel, so try-with-resources makes sure the
    // tunnel is closed (and not leaked) before the enclosing session is closed.
    try (PortForwarding webTunnel = session.createLocalPortForward(
            "localhost", 8080, 18080,
            new PortForwarding.DataConsumer() {
                @Override
                public void received(ByteBuffer data) {
                    // Handle received data
                }
                @Override
                public void flushed() {
                    // Handle flush events
                }
                @Override
                public void finished() {
                    // Handle completion
                }
            })) {
        // Send data through the tunnel with webTunnel.write(...) and webTunnel.flush()
    }
}
Interface representing a remote process launched via SSH.
/**
* Represents a remote process launched via SSH.
* Provides access to the process's standard streams and its completion status.
*/
interface SSHProcess {
/**
* Returns the stream used to write to the process's stdin.
*
* @return output stream connected to the process
*/
OutputStream getOutputStream();
/**
* Returns the stream used to read the process's stdout.
*
* @return input stream carrying the process's standard output
*/
InputStream getInputStream();
/**
* Returns the stream used to read the process's stderr.
*
* @return input stream carrying the process's standard error
*/
InputStream getErrorStream();
/**
* Waits for the process to complete.
*
* @return the process exit code
* @throws InterruptedException if the wait is interrupted
*/
int waitFor() throws InterruptedException;
/**
* Waits for the process to complete, up to the given timeout.
*
* @param timeout maximum time to wait
* @param unit time unit of {@code timeout}
* @return true if the process completed, false if the timeout elapsed first
* @throws InterruptedException if the wait is interrupted
*/
boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException;
/**
* Returns the process exit code. Only valid after the process has completed.
*
* @return the exit code
* @throws IllegalThreadStateException if the process has not completed
*/
int exitValue();
/**
* Stops the remote process.
*/
void destroy();
}
Usage Example:
// Interactive process execution: execute() returns immediately with a process handle
SSHProcess process = session.execute("python3", "/opt/scripts/data_processor.py");
// Send input to process; closing the writer also closes the process's stdin stream
try (PrintWriter writer = new PrintWriter(process.getOutputStream())) {
writer.println("process_data /input/data.csv");
writer.flush();
}
// Read process output; both readers are closed when the block exits
try (BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));
BufferedReader stderr = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
// Wait for completion with timeout
// NOTE(review): output is only read after waitFor returns. If the remote process
// writes more than the transport buffers before exiting it may block until the
// timeout. Confirm, and consider draining stdout/stderr while waiting.
boolean completed = process.waitFor(30, TimeUnit.MINUTES);
if (completed) {
// exitValue() is only valid once the process has completed
int exitCode = process.exitValue();
System.out.println("Process completed with exit code: " + exitCode);
// Read any remaining output
String line;
while ((line = stdout.readLine()) != null) {
System.out.println("STDOUT: " + line);
}
while ((line = stderr.readLine()) != null) {
System.out.println("STDERR: " + line);
}
} else {
// Timed out: stop the remote process instead of leaving it running
System.out.println("Process timed out, destroying...");
process.destroy();
}
}
Interface for SSH port forwarding channels with data handling capabilities.
/**
* SSH port forwarding channel for tunneling network traffic.
* Extends {@link WritableByteChannel} for sending data and {@link Flushable} for
* flushing it; data coming back through the tunnel is delivered to a {@link DataConsumer}.
*/
interface PortForwarding extends WritableByteChannel, Flushable {
/**
* Writes data to the forwarding channel.
*
* @param src buffer containing the data to write
* @return number of bytes written
* @throws IOException if the write fails
*/
int write(ByteBuffer src) throws IOException;
/**
* Flushes any buffered data.
*
* @throws IOException if the flush fails
*/
void flush() throws IOException;
/**
* Checks whether the channel is open.
*
* @return true if the channel is open
*/
boolean isOpen();
/** Closes the forwarding channel. */
void close() throws IOException;
/**
* Abstract consumer for handling forwarded data.
* An instance is supplied when the forwarding channel is created.
*/
abstract class DataConsumer {
/**
* Called when data is received through the tunnel.
*
* @param data buffer containing the received data
*/
public abstract void received(ByteBuffer data);
/**
* Called when data has been flushed.
*/
public abstract void flushed();
/**
* Called when forwarding is finished.
*/
public abstract void finished();
}
}
Interface for managing remote port forwarding tunnels.
/**
* Handle for an SSH remote port forwarding.
* Must be closed to stop forwarding.
*/
interface RemotePortForwarding extends Closeable {
/**
* Returns the remote port being forwarded.
*
* @return the remote port number
*/
int getRemotePort();
/** Stops the remote port forwarding. */
void close() throws IOException;
}
Port Forwarding Usage Example:
// Expose a service running on this machine to the remote host.
// With remote port forwarding the REMOTE side listens on port 5432 and every
// connection made to it is forwarded back to port 15432 on the local machine.
// (To reach a remote service from the local machine, use
// session.createLocalPortForward(...) instead.)
// RemotePortForwarding is Closeable, so try-with-resources stops the forwarding
// even if the work inside the block throws.
try (RemotePortForwarding callbackTunnel = session.createRemotePortForward(5432, 15432)) {
    // Processes on the remote host can now reach the local service on this port
    int listeningPort = callbackTunnel.getRemotePort();
    System.out.println("Remote port " + listeningPort + " is forwarded to local port 15432");
    // Run remote work that calls back into the local service...
}
Data classes for managing SSH key pairs and public keys.
/**
* Public-private key pair used for SSH authentication.
* NOTE(review): this listing shows method signatures only (API summary); the
* method bodies are not part of this page.
*/
class SSHKeyPair {
/**
* Returns the public key information.
*
* @return the SSH public key
*/
SSHPublicKey getPublicKey();
/**
* Returns a supplier for the private key bytes.
*
* @return supplier providing the private key data
*/
Supplier<byte[]> getPrivateKeySupplier();
}
/**
* SSH public key information: the key itself plus the user it belongs to.
*/
class SSHPublicKey {
/**
* Returns the username associated with this key.
*
* @return the username
*/
String getUser();
/**
* Returns the public key in RFC-4253 format.
*
* @return the public key string (e.g., "ssh-rsa AAAAB3Nza...")
*/
String getKey();
}
Key Management Usage Example:
// Generate a new 4096-bit key pair for the service user
SSHKeyPair keyPair = sshContext.generate("cdap-service", 4096);
// Extract the public key and build an entry of the form "<key> <user>@cdap-cluster"
SSHPublicKey publicKey = keyPair.getPublicKey();
String authorizedKeyEntry = publicKey.getKey() + " " + publicKey.getUser() + "@cdap-cluster";
// Deploy public key to remote hosts (pseudo-code: deployPublicKeyToCluster is not part of the SPI)
deployPublicKeyToCluster(authorizedKeyEntry);
// Register the key pair so that sessions created below authenticate with it
sshContext.setSSHKeyPair(keyPair);
try (SSHSession session = sshContext.createSSHSession("cluster-node.example.com")) {
// Authenticated session ready for use; closed automatically when the block exits
}
Install with Tessl CLI
npx tessl i tessl/maven-io-cdap-cdap--cdap-runtime-spi