tessl/maven-org-apache-flink--flink-clients

Apache Flink client library providing programmatic APIs and command-line interfaces for submitting, managing, and monitoring Flink jobs.


Artifact Management

Flexible system for fetching and managing job artifacts from various sources including local files, HTTP endpoints, and distributed file systems with pluggable fetcher implementations and comprehensive error handling.

Capabilities

Artifact Fetch Manager

Central manager for artifact fetching operations, providing a unified interface to multiple fetcher implementations.

/**
 * Manager for artifact fetching operations
 */
public class ArtifactFetchManager {
    /**
     * Create artifact fetch manager from configuration
     * @param configuration Flink configuration
     * @return Configured artifact fetch manager
     */
    public static ArtifactFetchManager fromConfiguration(Configuration configuration);
    
    /**
     * Fetch artifact from URI to target directory
     * @param uri URI of the artifact to fetch
     * @param targetDir Target directory for the artifact
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    public CompletableFuture<File> fetchArtifact(URI uri, File targetDir) throws Exception;
    
    /**
     * Fetch artifact with custom filename
     * @param uri URI of the artifact to fetch
     * @param targetDir Target directory for the artifact
     * @param filename Custom filename for the artifact
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    public CompletableFuture<File> fetchArtifact(
        URI uri,
        File targetDir,
        @Nullable String filename
    ) throws Exception;
    
    /**
     * Get registered artifact fetchers
     * @return Collection of registered fetchers
     */
    public Collection<ArtifactFetcher> getRegisteredFetchers();
    
    /**
     * Register custom artifact fetcher
     * @param fetcher Artifact fetcher to register
     */
    public void registerFetcher(ArtifactFetcher fetcher);
}

Usage Examples:

import org.apache.flink.client.program.artifact.ArtifactFetchManager;
import org.apache.flink.configuration.Configuration;
import java.io.File;
import java.net.URI;
import java.util.concurrent.CompletableFuture;

// Create an artifact fetch manager from a Flink configuration
Configuration config = new Configuration();
ArtifactFetchManager fetchManager = ArtifactFetchManager.fromConfiguration(config);

// Fetch an artifact over HTTP(S) into a local target directory
URI httpUri = new URI("https://example.com/path/to/job.jar");
File targetDir = new File("/tmp/artifacts");
targetDir.mkdirs();

CompletableFuture<File> fetchResult = fetchManager.fetchArtifact(httpUri, targetDir);
File fetchedFile = fetchResult.get(); // blocks until the fetch completes
System.out.println("Fetched artifact: " + fetchedFile.getAbsolutePath());

// Fetch the same artifact under a custom filename
CompletableFuture<File> customFetch = fetchManager.fetchArtifact(
    httpUri,
    targetDir,
    "my-job.jar"
);
File customFile = customFetch.get();

Artifact Fetcher Interface

Interface for implementing artifact fetcher strategies supporting different protocols and storage systems.

/**
 * Interface for artifact fetching implementations
 */
public interface ArtifactFetcher {
    /**
     * Fetch artifact from URI
     * @param uri URI of the artifact
     * @param flinkConf Flink configuration
     * @param targetDir Target directory
     * @param filename Optional custom filename
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    CompletableFuture<File> fetch(
        URI uri,
        Configuration flinkConf,
        File targetDir,
        @Nullable String filename
    ) throws Exception;
    
    /**
     * Check if this fetcher supports the given URI scheme
     * @param uri URI to check
     * @return true if supported
     */
    boolean supportsScheme(URI uri);
}
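The interface implies that a manager picks a fetcher by asking each registered implementation whether it supports a URI's scheme. The sketch below illustrates that dispatch idea with JDK types only. `FetcherDispatchSketch`, `SchemeAwareFetcher` and `SchemeFetcher` are hypothetical names, "first match in registration order" is an assumed policy, and treating scheme-less URIs as local paths is an assumption rather than confirmed library behavior.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch: route a URI to the first registered fetcher that supports it. */
public class FetcherDispatchSketch {

    /** Simplified stand-in for ArtifactFetcher that keeps only the scheme check. */
    interface SchemeAwareFetcher {
        String name();

        boolean supportsScheme(URI uri);
    }

    /** A fetcher whose scheme check is supplied as a predicate. */
    static final class SchemeFetcher implements SchemeAwareFetcher {
        private final String name;
        private final Predicate<URI> accepts;

        SchemeFetcher(String name, Predicate<URI> accepts) {
            this.name = name;
            this.accepts = accepts;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public boolean supportsScheme(URI uri) {
            return accepts.test(uri);
        }
    }

    private final List<SchemeAwareFetcher> fetchers = new ArrayList<>();

    void register(SchemeAwareFetcher fetcher) {
        fetchers.add(fetcher);
    }

    /** Returns the first fetcher, in registration order, whose supportsScheme(uri) is true. */
    Optional<SchemeAwareFetcher> select(URI uri) {
        for (SchemeAwareFetcher fetcher : fetchers) {
            if (fetcher.supportsScheme(uri)) {
                return Optional.of(fetcher);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        FetcherDispatchSketch registry = new FetcherDispatchSketch();
        // Assumption: URIs without a scheme are treated as local paths
        registry.register(new SchemeFetcher("local",
            uri -> uri.getScheme() == null || "file".equalsIgnoreCase(uri.getScheme())));
        registry.register(new SchemeFetcher("http",
            uri -> "http".equalsIgnoreCase(uri.getScheme()) || "https".equalsIgnoreCase(uri.getScheme())));

        for (String s : List.of("/tmp/job.jar", "file:///tmp/job.jar",
                                "https://example.com/job.jar", "s3://bucket/job.jar")) {
            String chosen = registry.select(URI.create(s))
                .map(SchemeAwareFetcher::name)
                .orElse("no fetcher");
            System.out.println(s + " -> " + chosen);
        }
    }
}
```

Returning an empty `Optional` for an unsupported scheme leaves the caller to decide whether that is a configuration error.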

HTTP Artifact Fetcher

Implementation for fetching artifacts from HTTP and HTTPS endpoints with support for authentication and custom headers.

/**
 * Fetcher for HTTP-based artifacts
 */
public class HttpArtifactFetcher implements ArtifactFetcher {
    /**
     * Create HTTP artifact fetcher with default configuration
     */
    public HttpArtifactFetcher();
    
    /**
     * Create HTTP artifact fetcher with connection timeout
     * @param connectionTimeoutMs Connection timeout in milliseconds
     * @param readTimeoutMs Read timeout in milliseconds
     */
    public HttpArtifactFetcher(int connectionTimeoutMs, int readTimeoutMs);
    
    @Override
    public CompletableFuture<File> fetch(
        URI uri,
        Configuration flinkConf,
        File targetDir,
        @Nullable String filename
    ) throws Exception;
    
    @Override
    public boolean supportsScheme(URI uri);
}
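The two constructor parameters correspond to the standard connect and read timeouts of a `java.net.URLConnection`. The following is a minimal sketch of a timeout-aware download under that assumption, not the actual `HttpArtifactFetcher` implementation. `TimeoutDownloadSketch` is a hypothetical name, and falling back to the last URI path segment when no filename is given is an assumed policy. It accepts any URL the JDK can open; for `http`/`https`, an HTTP error status surfaces as an `IOException` from `getInputStream()`.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URLConnection;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of a download with connect/read timeouts; not Flink's HttpArtifactFetcher. */
public final class TimeoutDownloadSketch {

    static File download(URI uri, File targetDir, String filename,
                         int connectionTimeoutMs, int readTimeoutMs) throws IOException {
        // Use the custom filename if given, otherwise the last segment of the URI path
        String name = filename;
        if (name == null) {
            String path = uri.getPath();
            name = (path == null) ? "" : path.substring(path.lastIndexOf('/') + 1);
        }
        if (name.isEmpty()) {
            throw new IOException("Cannot derive a filename from " + uri);
        }

        URLConnection connection = uri.toURL().openConnection();
        connection.setConnectTimeout(connectionTimeoutMs); // time allowed to establish the connection
        connection.setReadTimeout(readTimeoutMs);           // max time a single read may block

        Path target = Files.createDirectories(targetDir.toPath()).resolve(name);
        try (InputStream in = connection.getInputStream()) {
            // Stream straight to disk, replacing any previous copy
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return target.toFile();
    }
}
```

For `file:` URLs the timeouts have no effect, which makes the sketch easy to try offline, for example `TimeoutDownloadSketch.download(localJar.toUri(), targetDir, null, 5000, 30000)`.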

File System Artifact Fetcher

Implementation for fetching artifacts from distributed file systems including HDFS, S3, and other Flink-supported file systems.

/**
 * Fetcher for file system-based artifacts
 */
public class FsArtifactFetcher implements ArtifactFetcher {
    /**
     * Create file system artifact fetcher
     */
    public FsArtifactFetcher();
    
    @Override
    public CompletableFuture<File> fetch(
        URI uri,
        Configuration flinkConf,
        File targetDir,
        @Nullable String filename
    ) throws Exception;
    
    @Override
    public boolean supportsScheme(URI uri);
}

Local Artifact Fetcher

Implementation for handling local file system artifacts with support for symbolic links and file validation.

/**
 * Fetcher for local file system artifacts
 */
public class LocalArtifactFetcher implements ArtifactFetcher {
    /**
     * Create local artifact fetcher
     */
    public LocalArtifactFetcher();
    
    @Override
    public CompletableFuture<File> fetch(
        URI uri,
        Configuration flinkConf,
        File targetDir,
        @Nullable String filename
    ) throws Exception;
    
    @Override
    public boolean supportsScheme(URI uri);
    
    /**
     * Copy local file to target directory
     * @param sourceFile Source file to copy
     * @param targetDir Target directory
     * @param targetFileName Target filename
     * @return Copied file
     * @throws IOException if copy fails
     */
    protected File copyLocalFile(File sourceFile, File targetDir, String targetFileName) 
        throws IOException;
}
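The protected `copyLocalFile` step can be pictured with `java.nio.file` alone. The sketch below is a hypothetical, static stand-in for that method (the documented one is an instance method), and its checks are assumptions about what "file validation" might cover: the source must be a regular file (symbolic links are followed), and the target name must not escape the target directory.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of the copy step behind LocalArtifactFetcher#copyLocalFile. */
public final class LocalCopySketch {

    static File copyLocalFile(File sourceFile, File targetDir, String targetFileName) throws IOException {
        Path source = sourceFile.toPath();
        // The source must exist and be a regular file; symbolic links are followed
        if (!Files.isRegularFile(source)) {
            throw new IOException("Not a regular file: " + sourceFile);
        }

        Path dir = targetDir.toPath().toAbsolutePath().normalize();
        Files.createDirectories(dir);
        Path target = dir.resolve(targetFileName).normalize();
        // Reject names such as "../x.jar" or "sub/x.jar" that leave the target directory
        if (!dir.equals(target.getParent())) {
            throw new IOException("Invalid target filename: " + targetFileName);
        }

        // Copying a file onto itself is treated as a no-op rather than an error
        if (Files.exists(target) && Files.isSameFile(source, target)) {
            return target.toFile();
        }
        // Without NOFOLLOW_LINKS, Files.copy copies the content a symbolic link points to
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        return target.toFile();
    }
}
```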

Artifact Utilities

Utility functions for common artifact operations including validation, metadata extraction, and path manipulation.

/**
 * Utilities for artifact operations
 */
public class ArtifactUtils {
    /**
     * Extract filename from URI
     * @param uri URI to extract filename from
     * @return Extracted filename or null
     */
    @Nullable
    public static String extractFilenameFromUri(URI uri);
    
    /**
     * Validate artifact file
     * @param file File to validate
     * @return true if valid artifact
     */
    public static boolean isValidArtifact(File file);
    
    /**
     * Get file extension from filename
     * @param filename Filename to analyze
     * @return File extension or empty string
     */
    public static String getFileExtension(String filename);
    
    /**
     * Create unique filename in directory
     * @param targetDir Target directory
     * @param baseFilename Base filename
     * @return Unique filename
     */
    public static String createUniqueFilename(File targetDir, String baseFilename);
    
    /**
     * Calculate file checksum
     * @param file File to checksum
     * @param algorithm Hash algorithm (MD5, SHA-1, SHA-256)
     * @return Hex-encoded checksum
     * @throws IOException if calculation fails
     */
    public static String calculateChecksum(File file, String algorithm) throws IOException;
    
    /**
     * Verify file checksum
     * @param file File to verify
     * @param expectedChecksum Expected checksum
     * @param algorithm Hash algorithm
     * @return true if checksum matches
     * @throws IOException if verification fails
     */
    public static boolean verifyChecksum(File file, String expectedChecksum, String algorithm)
        throws IOException;
}
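The filename helpers are simple enough to sketch with the JDK. The class below is a hypothetical illustration of the documented contracts, not the library's code; returning the extension without its leading dot and appending `-1`, `-2`, ... before the extension for unique names are assumptions. Note that `createUniqueFilename` only checks for existing files, so another process could still claim the name before it is used.

```java
import java.io.File;
import java.net.URI;

/** Hypothetical sketch of the filename helpers described for ArtifactUtils. */
public final class ArtifactFilenameSketch {

    /** Last path segment of the URI, or null if the path is missing, empty, or ends in '/'. */
    static String extractFilenameFromUri(URI uri) {
        String path = uri.getPath(); // excludes the query string, e.g. "?token=..."
        if (path == null || path.isEmpty() || path.endsWith("/")) {
            return null;
        }
        return path.substring(path.lastIndexOf('/') + 1);
    }

    /** Extension without the dot ("job.jar" gives "jar"); "" if there is none. */
    static String getFileExtension(String filename) {
        int dot = filename.lastIndexOf('.');
        // No dot, a leading dot only (".bashrc"), or a trailing dot means no extension
        if (dot <= 0 || dot == filename.length() - 1) {
            return "";
        }
        return filename.substring(dot + 1);
    }

    /** Returns baseFilename if unused in targetDir, else the first free "name-N.ext". */
    static String createUniqueFilename(File targetDir, String baseFilename) {
        if (!new File(targetDir, baseFilename).exists()) {
            return baseFilename;
        }
        String ext = getFileExtension(baseFilename);
        String stem = ext.isEmpty()
            ? baseFilename
            : baseFilename.substring(0, baseFilename.length() - ext.length() - 1);
        String suffix = ext.isEmpty() ? "" : "." + ext;
        for (int i = 1; ; i++) {
            String candidate = stem + "-" + i + suffix;
            if (!new File(targetDir, candidate).exists()) {
                return candidate;
            }
        }
    }
}
```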

CLI Artifact Fetch Options

Command-line options for artifact fetching operations integrated with the CLI frontend.

/**
 * Options for artifact fetching operations
 */
public class ArtifactFetchOptions extends CommandLineOptions {
    /**
     * Get artifact URI from options
     * @return Artifact URI
     */
    public String getArtifactUri();
    
    /**
     * Get target directory from options
     * @return Target directory path
     */
    public String getTargetDirectory();
    
    /**
     * Get custom filename from options
     * @return Custom filename or null
     */
    @Nullable
    public String getCustomFilename();
    
    /**
     * Get connection timeout from options
     * @return Connection timeout in milliseconds
     */
    public int getConnectionTimeout();
    
    /**
     * Get read timeout from options
     * @return Read timeout in milliseconds
     */
    public int getReadTimeout();
    
    /**
     * Check if checksum verification is enabled
     * @return true if verification enabled
     */
    public boolean isChecksumVerificationEnabled();
    
    /**
     * Get expected checksum from options
     * @return Expected checksum or null
     */
    @Nullable
    public String getExpectedChecksum();
    
    /**
     * Get checksum algorithm from options
     * @return Checksum algorithm (default: SHA-256)
     */
    public String getChecksumAlgorithm();
}

Usage Patterns

Basic Artifact Fetching

// Configure the manager and fetch from multiple sources
Configuration config = new Configuration();
ArtifactFetchManager manager = ArtifactFetchManager.fromConfiguration(config);
File targetDir = new File("/tmp/artifacts");
targetDir.mkdirs();

// Fetch from HTTP
URI httpArtifact = new URI("https://repo.example.com/artifacts/job-1.0.jar");
File httpResult = manager.fetchArtifact(httpArtifact, targetDir).get();

// Fetch from HDFS (the Hadoop file system must be available to Flink)
URI hdfsArtifact = new URI("hdfs://namenode:9000/artifacts/job-1.0.jar");
File hdfsResult = manager.fetchArtifact(hdfsArtifact, targetDir).get();

// Fetch from the local file system
URI localArtifact = new URI("file:///path/to/local/job.jar");
File localResult = manager.fetchArtifact(localArtifact, targetDir).get();
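Each call above waits on `get()` before starting the next fetch. Because `fetchArtifact` returns a `CompletableFuture`, the fetches could instead be started together and awaited once, assuming the fetchers complete their futures asynchronously. The helper below is a hypothetical JDK-only sketch of that pattern; `ParallelFetchSketch` and `fetchAll` are illustrative names, not part of the library.

```java
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical sketch: start several fetches, then wait for all of them once. */
public final class ParallelFetchSketch {

    static List<File> fetchAll(List<URI> uris, Function<URI, CompletableFuture<File>> fetch) {
        // Start every fetch before waiting on any of them
        List<CompletableFuture<File>> futures = uris.stream()
            .map(fetch)
            .collect(Collectors.toList());
        // allOf completes once all futures are done; join() throws a
        // CompletionException if any of them failed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        // Every future is complete here, so these joins return immediately
        return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }
}
```

Since `fetchArtifact` declares `throws Exception`, it has to be adapted to a `Function`, for example `uri -> { try { return manager.fetchArtifact(uri, targetDir); } catch (Exception e) { return CompletableFuture.failedFuture(e); } }` (`CompletableFuture.failedFuture` requires Java 9 or later).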

Custom Fetcher Registration

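import java.io.File;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;

// Create a custom fetcher for a specific protocol.
// s3/s3a are used for illustration only: FsArtifactFetcher already covers
// Flink-supported file systems such as S3.
public class S3ArtifactFetcher implements ArtifactFetcher {
    @Override
    public CompletableFuture<File> fetch(
        URI uri, Configuration flinkConf, File targetDir, @Nullable String filename
    ) throws Exception {
        // Run the download asynchronously so the caller is not blocked
        return CompletableFuture.supplyAsync(() -> downloadFromS3(uri, targetDir, filename));
    }

    @Override
    public boolean supportsScheme(URI uri) {
        return "s3".equals(uri.getScheme()) || "s3a".equals(uri.getScheme());
    }

    // Placeholder for the actual S3 download logic. It runs inside supplyAsync,
    // so it must not throw checked exceptions (wrap them, e.g. in UncheckedIOException).
    private File downloadFromS3(URI uri, File targetDir, @Nullable String filename) {
        throw new UnsupportedOperationException("S3 download not implemented");
    }
}

// Register the custom fetcher
ArtifactFetchManager manager = ArtifactFetchManager.fromConfiguration(config);
manager.registerFetcher(new S3ArtifactFetcher());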

Artifact Validation and Checksums

// Fetch the artifact, then verify its integrity
URI artifactUri = new URI("https://repo.example.com/job.jar");
File fetchedFile = manager.fetchArtifact(artifactUri, targetDir).get();

// Verify artifact integrity
String expectedChecksum = "a1b2c3d4e5f6...";
boolean isValid = ArtifactUtils.verifyChecksum(
    fetchedFile, 
    expectedChecksum, 
    "SHA-256"
);

if (!isValid) {
    throw new RuntimeException("Artifact checksum verification failed");
}

// Validate artifact format
if (!ArtifactUtils.isValidArtifact(fetchedFile)) {
    throw new RuntimeException("Invalid artifact format");
}
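The checks above delegate to `ArtifactUtils.calculateChecksum` and `verifyChecksum`. To show what such a check involves, here is a JDK-only sketch built on `java.security.MessageDigest`. `ChecksumSketch` is a hypothetical name; reporting an unknown algorithm as an `IOException` (to match the documented `throws IOException` signatures) and comparing hex digests case-insensitively are assumptions, not confirmed library behavior.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of checksum calculation and verification for artifact files. */
public final class ChecksumSketch {

    /** Streams the file through a MessageDigest and returns the lowercase hex digest. */
    static String calculateChecksum(File file, String algorithm) throws IOException {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance(algorithm); // e.g. "SHA-256", "SHA-1", "MD5"
        } catch (NoSuchAlgorithmException e) {
            throw new IOException("Unsupported checksum algorithm: " + algorithm, e);
        }
        byte[] buffer = new byte[8192];
        try (InputStream in = Files.newInputStream(file.toPath())) {
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    /** True if the file's digest matches the expected hex string, ignoring case and surrounding whitespace. */
    static boolean verifyChecksum(File file, String expectedChecksum, String algorithm) throws IOException {
        return calculateChecksum(file, algorithm).equalsIgnoreCase(expectedChecksum.trim());
    }
}
```

Streaming in fixed-size chunks keeps memory use constant even for large job JARs.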

Error Handling

Artifact management operations handle various error conditions:

  • Network Errors: Connection failures, timeouts, DNS resolution issues
  • Authentication Errors: Invalid credentials, permission denied
  • File System Errors: Disk space, permission issues, path not found
  • Validation Errors: Checksum mismatches, corrupted files, invalid formats
  • Configuration Errors: Invalid URIs, missing required parameters

Error Handling Patterns:

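import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

try {
    CompletableFuture<File> fetchFuture = manager.fetchArtifact(uri, targetDir);

    File result = fetchFuture.handle((file, throwable) -> {
        if (throwable == null) {
            return file;
        }
        // Failures from asynchronous stages usually arrive wrapped in a CompletionException
        Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
            ? throwable.getCause()
            : throwable;
        if (cause instanceof IOException) {
            // I/O errors (network, file system)
            System.err.println("I/O error: " + cause.getMessage());
        } else if (cause instanceof SecurityException) {
            // Authentication/authorization errors
            System.err.println("Security error: " + cause.getMessage());
        } else {
            System.err.println("Unexpected error: " + cause.getMessage());
        }
        return null; // signal failure to the caller
    }).get();

    if (result != null) {
        System.out.println("Artifact fetched successfully: " + result.getPath());
    }

} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
    System.err.println("Interrupted while fetching artifact");
} catch (Exception e) {
    // fetchArtifact declares 'throws Exception', e.g. for failures before the future is created
    System.err.println("Failed to fetch artifact: " + e.getMessage());
}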
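Several of the error categories listed above can be told apart by JDK exception type once the `CompletionException`/`ExecutionException` wrappers are removed. The classifier below is a hypothetical sketch of that idea; `FetchErrorClassifier` and its `Category` values are illustrative names, and which bucket a given exception belongs to is a judgment call. For example, local `AccessDeniedException`s land in `FILE_SYSTEM`, HTTP 401/403 responses typically surface only as a generic `IOException`, and checksum failures raised as `RuntimeException` (as in the validation example) end up as `UNKNOWN` unless a dedicated exception type is introduced.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.file.FileSystemException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

/** Hypothetical sketch: map a fetch failure onto broad error categories. */
public final class FetchErrorClassifier {

    enum Category { NETWORK, AUTHENTICATION, FILE_SYSTEM, CONFIGURATION, OTHER_IO, UNKNOWN }

    /** Strips the CompletionException/ExecutionException wrappers added by CompletableFuture. */
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static Category classify(Throwable t) {
        Throwable cause = unwrap(t);
        if (cause instanceof UnknownHostException        // DNS resolution failed
                || cause instanceof ConnectException      // connection refused
                || cause instanceof SocketTimeoutException) {
            return Category.NETWORK;
        }
        if (cause instanceof SecurityException) {
            return Category.AUTHENTICATION;
        }
        if (cause instanceof FileSystemException) {       // includes NoSuchFileException, AccessDeniedException
            return Category.FILE_SYSTEM;
        }
        if (cause instanceof URISyntaxException || cause instanceof IllegalArgumentException) {
            return Category.CONFIGURATION;
        }
        if (cause instanceof IOException) {
            return Category.OTHER_IO;                     // e.g. HTTP error status or disk full
        }
        return Category.UNKNOWN;
    }
}
```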

The artifact management system gives Flink a pluggable way to fetch job JARs, dependencies, and other resources from HTTP endpoints, distributed file systems, and local paths, with consistent error handling and validation; custom fetchers can be registered for additional URI schemes.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-clients
