CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-tika--tika-core

Apache Tika Core provides the foundational APIs for detecting and extracting metadata and structured text content from various document formats.

Pending
Overview
Eval results
Files

docs/pipes.md

Batch Processing (Pipes)

Batch processing framework for high-throughput document processing with configurable fetchers, emitters, and processing pipelines supporting parallel execution and error handling.

Capabilities

Core Pipeline Components

FetchEmitTuple

Core data structure representing a document processing operation with fetch source, emit target, and processing metadata.

/**
 * Tuple representing a document processing operation from fetch to emit.
 * Pairs a fetcher name/key (document source) with an emitter name/key
 * (output destination). Mutable: metadata and properties may be replaced
 * after construction via their setters.
 */
public class FetchEmitTuple {
    /**
     * Creates FetchEmitTuple with fetch and emit identifiers
     * @param fetcherName Name of fetcher to retrieve document
     * @param fetchKey Key/path for document retrieval
     * @param emitterName Name of emitter for output
     * @param emitKey Key/path for document output
     */
    public FetchEmitTuple(String fetcherName, String fetchKey, String emitterName, String emitKey);
    
    /**
     * Creates FetchEmitTuple with metadata
     * @param fetcherName Fetcher identifier
     * @param fetchKey Document fetch key
     * @param emitterName Emitter identifier  
     * @param emitKey Document emit key
     * @param metadata Processing metadata
     */
    public FetchEmitTuple(String fetcherName, String fetchKey, String emitterName, String emitKey, Metadata metadata);
    
    /**
     * Gets the fetcher name for document retrieval
     * @return String identifying which fetcher to use
     */
    public String getFetcherName();
    
    /**
     * Gets the fetch key for document location
     * @return String key/path identifying document to fetch
     */
    public String getFetchKey();
    
    /**
     * Gets the emitter name for output
     * @return String identifying which emitter to use for output
     */
    public String getEmitterName();
    
    /**
     * Gets the emit key for output location
     * @return String key/path for output destination
     */
    public String getEmitKey();
    
    /**
     * Gets processing metadata
     * @return Metadata object with processing parameters and hints
     */
    public Metadata getMetadata();
    
    /**
     * Sets processing metadata
     * @param metadata Metadata to associate with processing
     */
    public void setMetadata(Metadata metadata);
    
    /**
     * Gets processing parameters as properties
     * @return Properties containing processing configuration
     */
    public Properties getProperties();
    
    /**
     * Sets processing parameters
     * @param properties Processing configuration properties
     */
    public void setProperties(Properties properties);
    
    /**
     * Gets unique identifier for this tuple.
     * NOTE(review): no setter for the id is declared, so it is presumably
     * assigned at construction time — confirm the generation scheme.
     * @return String uniquely identifying this processing operation
     */
    public String getId();
}

HandlerConfig

Configuration for content handlers used during document processing pipelines.

/**
 * Configuration for content handlers in processing pipeline.
 * Holds a handler class name plus a string-to-string parameter map,
 * and acts as a factory for configured ContentHandler instances.
 */
public class HandlerConfig {
    /**
     * Creates HandlerConfig with handler class name
     * @param handlerClass Fully qualified class name of handler
     */
    public HandlerConfig(String handlerClass);
    
    /**
     * Creates HandlerConfig with class and parameters
     * @param handlerClass Handler class name
     * @param params Configuration parameters for handler
     */
    public HandlerConfig(String handlerClass, Map<String, String> params);
    
    /**
     * Gets handler class name
     * @return Fully qualified class name of content handler
     */
    public String getHandlerClass();
    
    /**
     * Gets configuration parameters
     * @return Map of parameter names to values
     */
    public Map<String, String> getParams();
    
    /**
     * Sets configuration parameter
     * @param name Parameter name
     * @param value Parameter value
     */
    public void setParam(String name, String value);
    
    /**
     * Gets parameter value
     * @param name Parameter name
     * @return Parameter value or null if not set
     */
    public String getParam(String name);
    
    /**
     * Creates handler instance from configuration.
     * NOTE(review): presumably instantiates handlerClass reflectively and
     * applies the parameter map — confirm against the implementation.
     * @return ContentHandler instance configured with parameters
     * @throws TikaException if handler cannot be created
     */
    public ContentHandler createHandler() throws TikaException;
}

Client-Server Architecture

PipesClient

Client interface for submitting document processing requests to Tika pipes server.

/**
 * Client for submitting document processing requests to Tika pipes server.
 * Supports synchronous single/batch processing and asynchronous submission
 * with a callback. Holds network resources: callers must invoke close().
 */
public class PipesClient {
    /**
     * Creates PipesClient with server URL
     * @param serverUrl URL of Tika pipes server
     */
    public PipesClient(String serverUrl);
    
    /**
     * Creates PipesClient with configuration
     * @param config Client configuration properties
     */
    public PipesClient(PipesConfig config);
    
    /**
     * Submits single document for processing
     * @param tuple FetchEmitTuple defining processing operation
     * @return PipesResult containing processing outcome
     * @throws IOException if communication with server fails
     * @throws TikaException if processing fails
     */
    public PipesResult process(FetchEmitTuple tuple) throws IOException, TikaException;
    
    /**
     * Submits batch of documents for processing
     * @param tuples List of FetchEmitTuple objects to process
     * @return List of PipesResult objects with processing outcomes
     * @throws IOException if communication fails
     * @throws TikaException if batch processing fails
     */
    public List<PipesResult> processBatch(List<FetchEmitTuple> tuples) throws IOException, TikaException;
    
    /**
     * Submits documents for asynchronous processing.
     * NOTE(review): the threading model is not specified here — confirm
     * which thread invokes the callback before relying on it for UI or
     * lock-sensitive work.
     * @param tuples Documents to process asynchronously
     * @param callback Callback for processing results
     * @throws IOException if submission fails
     */
    public void processAsync(List<FetchEmitTuple> tuples, PipesCallback callback) throws IOException;
    
    /**
     * Gets server status and capabilities
     * @return Map containing server status information
     * @throws IOException if server communication fails
     */
    public Map<String, Object> getServerStatus() throws IOException;
    
    /**
     * Checks if server is available and responsive
     * @return true if server is ready to accept requests
     */
    public boolean isServerAvailable();
    
    /**
     * Sets request timeout
     * @param timeoutMs Timeout in milliseconds
     */
    public void setTimeout(int timeoutMs);
    
    /**
     * Gets current request timeout
     * @return Timeout in milliseconds
     */
    public int getTimeout();
    
    /**
     * Closes client and releases resources.
     * NOTE(review): signature matches java.io.Closeable — confirm the class
     * implements it so callers can use try-with-resources.
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}

PipesServer

Server implementation for handling document processing requests with configurable concurrency and resource management.

/**
 * Server for handling batch document processing requests.
 * Lifecycle: construct with a PipesConfig, start(), then stop() for a
 * graceful shutdown.
 */
public class PipesServer {
    /**
     * Creates PipesServer with configuration
     * @param config Server configuration
     */
    public PipesServer(PipesConfig config);
    
    /**
     * Starts server and begins accepting requests
     * @throws IOException if server cannot be started
     * @throws TikaException if configuration is invalid
     */
    public void start() throws IOException, TikaException;
    
    /**
     * Stops server gracefully
     * @throws IOException if shutdown fails
     */
    public void stop() throws IOException;
    
    /**
     * Checks if server is running
     * @return true if server is started and accepting requests
     */
    public boolean isRunning();
    
    /**
     * Gets server port number
     * @return Port number server is listening on
     */
    public int getPort();
    
    /**
     * Gets server configuration
     * @return PipesConfig used by this server
     */
    public PipesConfig getConfig();
    
    /**
     * Gets current server statistics
     * @return Map containing processing statistics
     */
    public Map<String, Object> getStatistics();
    
    /**
     * Processes single document request
     * @param tuple Document processing request
     * @return PipesResult with processing outcome
     * @throws TikaException if processing fails
     * @throws IOException if I/O error occurs
     */
    public PipesResult process(FetchEmitTuple tuple) throws TikaException, IOException;
    
    /**
     * Sets maximum concurrent processing threads.
     * NOTE(review): PipesConfig exposes the same setting — confirm which
     * value wins when both are set, and whether this can be changed while
     * the server is running.
     * @param maxThreads Maximum number of processing threads
     */
    public void setMaxConcurrentRequests(int maxThreads);
    
    /**
     * Gets maximum concurrent requests
     * @return Maximum number of concurrent processing requests
     */
    public int getMaxConcurrentRequests();
}

PipesResult

Result object containing outcome of document processing operation with status, metadata, and error information.

/**
 * Result of document processing operation in pipes framework.
 * Carries a STATUS, timing, extracted metadata/content, and (on failure)
 * exception details. Mutable: exception, metadata and content have setters.
 */
public class PipesResult {
    /**
     * Processing status enumeration
     */
    public enum STATUS {
        SUCCESS,           // Processing completed successfully
        PARSE_EXCEPTION,   // Parse error occurred
        TIMEOUT,          // Processing timed out
        OOM,              // Out of memory error
        NO_SUCH_FILE,     // Input file not found
        EMIT_EXCEPTION,   // Error during emit phase
        FETCHER_INITIALIZATION_EXCEPTION, // Fetcher setup failed
        EMITTER_INITIALIZATION_EXCEPTION, // Emitter setup failed
        INTERRUPTED_EXCEPTION             // Processing was interrupted
    }
    
    /**
     * Creates PipesResult with status
     * @param status Processing status
     */
    public PipesResult(STATUS status);
    
    /**
     * Creates PipesResult with status and processing time
     * @param status Processing status
     * @param processTimeMs Processing time in milliseconds
     */
    public PipesResult(STATUS status, long processTimeMs);
    
    /**
     * Gets processing status
     * @return STATUS indicating processing outcome
     */
    public STATUS getStatus();
    
    /**
     * Gets processing time
     * @return Processing duration in milliseconds
     */
    public long getProcessTimeMs();
    
    /**
     * Gets exception that occurred during processing
     * @return Exception object or null if no error
     */
    public Exception getException();
    
    /**
     * Sets exception information
     * @param exception Exception that occurred
     */
    public void setException(Exception exception);
    
    /**
     * Gets extracted metadata
     * @return Metadata extracted during processing
     */
    public Metadata getMetadata();
    
    /**
     * Sets extracted metadata
     * @param metadata Metadata from processing
     */
    public void setMetadata(Metadata metadata);
    
    /**
     * Gets extracted content
     * @return Text content extracted from document
     */
    public String getContent();
    
    /**
     * Sets extracted content
     * @param content Text content from processing
     */
    public void setContent(String content);
    
    /**
     * Checks if processing was successful
     * @return true if status is SUCCESS
     */
    public boolean isSuccess();
    
    /**
     * Gets error message if processing failed.
     * NOTE(review): presumably derived from getException() and/or the
     * failure STATUS — confirm before relying on its format.
     * @return Error message or null if successful
     */
    public String getErrorMessage();
    
    /**
     * Gets processing statistics
     * @return Map containing detailed processing metrics
     */
    public Map<String, Object> getStatistics();
}

Configuration Classes

PipesConfig

Main configuration class for pipes framework with settings for fetchers, emitters, processing, and server options.

/**
 * Configuration for Tika pipes batch processing framework.
 * Extends PipesConfigBase, so generic property access (getProperty /
 * setProperty / validate) is inherited; this class adds typed accessors
 * for server, concurrency, timeout, fetcher/emitter manager and TikaConfig
 * settings.
 */
public class PipesConfig extends PipesConfigBase {
    /**
     * Creates default PipesConfig
     */
    public PipesConfig();
    
    /**
     * Creates PipesConfig from properties file
     * @param configFile Properties file containing configuration
     * @throws IOException if config file cannot be read
     */
    public PipesConfig(File configFile) throws IOException;
    
    /**
     * Creates PipesConfig from input stream
     * @param configStream Stream containing configuration properties
     * @throws IOException if stream cannot be read
     */
    public PipesConfig(InputStream configStream) throws IOException;
    
    /**
     * Gets server port number
     * @return Port number for pipes server
     */
    public int getServerPort();
    
    /**
     * Sets server port number
     * @param port Port number for server to listen on
     */
    public void setServerPort(int port);
    
    /**
     * Gets maximum concurrent processing threads
     * @return Maximum number of concurrent requests
     */
    public int getMaxConcurrentRequests();
    
    /**
     * Sets maximum concurrent processing threads.
     * NOTE(review): PipesServer exposes the same setter — confirm which
     * value takes precedence when both are used.
     * @param maxThreads Maximum concurrent requests
     */
    public void setMaxConcurrentRequests(int maxThreads);
    
    /**
     * Gets processing timeout in milliseconds
     * @return Timeout for individual document processing
     */
    public long getTimeoutMs();
    
    /**
     * Sets processing timeout
     * @param timeoutMs Timeout in milliseconds
     */
    public void setTimeoutMs(long timeoutMs);
    
    /**
     * Gets fetcher manager configuration
     * @return FetcherManager for document retrieval
     */
    public FetcherManager getFetcherManager();
    
    /**
     * Sets fetcher manager
     * @param fetcherManager Manager for document fetchers
     */
    public void setFetcherManager(FetcherManager fetcherManager);
    
    /**
     * Gets emitter manager configuration
     * @return EmitterManager for document output
     */
    public EmitterManager getEmitterManager();
    
    /**
     * Sets emitter manager
     * @param emitterManager Manager for document emitters
     */
    public void setEmitterManager(EmitterManager emitterManager);
    
    /**
     * Gets TikaConfig for parsing configuration
     * @return TikaConfig with parser and detector settings
     */
    public TikaConfig getTikaConfig();
    
    /**
     * Sets TikaConfig for parsing
     * @param tikaConfig Tika configuration for parsers
     */
    public void setTikaConfig(TikaConfig tikaConfig);
}

PipesConfigBase

Base configuration class with common settings and initialization patterns.

/**
 * Base configuration class for pipes components.
 * Provides generic string-keyed property storage plus initialize/validate
 * hooks for subclasses such as PipesConfig.
 */
public abstract class PipesConfigBase {
    /**
     * Initializes configuration from properties
     * @param properties Configuration properties
     * @throws TikaException if initialization fails
     */
    public void initialize(Properties properties) throws TikaException;
    
    /**
     * Gets configuration property value
     * @param key Property key
     * @return Property value or null if not set
     */
    public String getProperty(String key);
    
    /**
     * Gets configuration property with default value
     * @param key Property key
     * @param defaultValue Default value if property not set
     * @return Property value or default if not set
     */
    public String getProperty(String key, String defaultValue);
    
    /**
     * Sets configuration property
     * @param key Property key
     * @param value Property value
     */
    public void setProperty(String key, String value);
    
    /**
     * Gets all configuration properties
     * @return Properties object with all settings
     */
    public Properties getProperties();
    
    /**
     * Validates configuration settings
     * @throws TikaException if configuration is invalid
     */
    public void validate() throws TikaException;
}

Fetcher Framework

Fetcher Interface

Interface for document retrieval implementations supporting various data sources and protocols.

/**
 * Interface for fetching documents from various sources.
 * Extends Initializable, so implementations also receive configuration
 * parameters before first use (see AbstractFetcher).
 */
public interface Fetcher extends Initializable {
    /**
     * Fetches document from source.
     * NOTE(review): ownership of the returned stream is not specified here —
     * presumably the caller must close it; confirm.
     * @param fetchKey Key identifying document to fetch
     * @param metadata Metadata for fetch operation
     * @return InputStream containing document data
     * @throws IOException if fetch operation fails
     * @throws TikaException if fetcher encounters error
     */
    InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException;
    
    /**
     * Gets name/identifier for this fetcher
     * @return String identifying this fetcher instance
     */
    String getName();
    
    /**
     * Sets name/identifier for this fetcher
     * @param name Identifier for this fetcher
     */
    void setName(String name);
    
    /**
     * Checks if fetcher supports specific fetch key pattern
     * @param fetchKey Key to check support for
     * @return true if this fetcher can handle the key
     */
    boolean supports(String fetchKey);
}

AbstractFetcher

Base implementation providing common fetcher functionality and configuration patterns.

/**
 * Abstract base class for fetcher implementations.
 * Supplies name handling, initialization plumbing, and a permissive
 * default supports(); subclasses only need to implement fetch().
 */
public abstract class AbstractFetcher implements Fetcher {
    /**
     * Creates AbstractFetcher with default settings
     */
    public AbstractFetcher();
    
    /**
     * Gets fetcher name
     * @return String identifier for this fetcher
     */
    @Override
    public String getName();
    
    /**
     * Sets fetcher name
     * @param name Identifier for this fetcher
     */
    @Override
    public void setName(String name);
    
    /**
     * Initializes fetcher with configuration parameters
     * @param params Configuration parameters
     * @throws TikaConfigException if initialization fails
     */
    @Override
    public void initialize(Map<String, Param> params) throws TikaConfigException;
    
    /**
     * Checks initialization problems
     * @param handler Problem handler for reporting issues
     */
    @Override
    public void checkInitialization(InitializableProblemHandler handler);
    
    /**
     * Default implementation returns true for all keys
     * @param fetchKey Key to check support for
     * @return true (subclasses should override for specific logic)
     */
    @Override
    public boolean supports(String fetchKey);
    
    /**
     * Template method for actual fetch implementation
     * @param fetchKey Document key to fetch
     * @param metadata Fetch metadata
     * @return InputStream with document data
     * @throws IOException if fetch fails
     * @throws TikaException if processing error occurs
     */
    @Override
    public abstract InputStream fetch(String fetchKey, Metadata metadata) 
            throws IOException, TikaException;
}

FetcherManager

Manager for multiple fetcher instances with routing and lifecycle management.

/**
 * Manager for multiple fetcher instances with routing capabilities.
 * Registry keyed by fetcher name; fetch() routes a request to the named
 * fetcher. NOTE(review): thread-safety of registration vs. concurrent
 * fetch() calls is not specified here — confirm before sharing across
 * threads.
 */
public class FetcherManager {
    /**
     * Creates FetcherManager with default configuration
     */
    public FetcherManager();
    
    /**
     * Creates FetcherManager from configuration
     * @param config Configuration properties for fetchers
     * @throws TikaException if configuration is invalid
     */
    public FetcherManager(Properties config) throws TikaException;
    
    /**
     * Registers fetcher instance
     * @param name Fetcher identifier
     * @param fetcher Fetcher implementation to register
     */
    public void addFetcher(String name, Fetcher fetcher);
    
    /**
     * Gets fetcher by name
     * @param name Fetcher identifier
     * @return Fetcher instance or null if not found
     */
    public Fetcher getFetcher(String name);
    
    /**
     * Gets all registered fetcher names
     * @return Set of fetcher identifiers
     */
    public Set<String> getFetcherNames();
    
    /**
     * Removes fetcher by name
     * @param name Fetcher identifier to remove
     * @return Removed fetcher or null if not found
     */
    public Fetcher removeFetcher(String name);
    
    /**
     * Fetches document using appropriate fetcher
     * @param fetcherName Name of fetcher to use
     * @param fetchKey Document key to fetch
     * @param metadata Fetch metadata
     * @return InputStream containing document data
     * @throws IOException if fetch fails
     * @throws TikaException if no suitable fetcher found
     */
    public InputStream fetch(String fetcherName, String fetchKey, Metadata metadata) 
            throws IOException, TikaException;
    
    /**
     * Initializes all registered fetchers
     * @throws TikaException if any fetcher initialization fails
     */
    public void initialize() throws TikaException;
    
    /**
     * Closes all fetchers and releases resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}

Emitter Framework

Emitter Interface

Interface for document output implementations supporting various destinations and formats.

/**
 * Interface for emitting processed documents to various destinations.
 * Extends Initializable, so implementations receive configuration
 * parameters before first use (see AbstractEmitter).
 */
public interface Emitter extends Initializable {
    /**
     * Emits processed document to destination.
     * NOTE(review): an OutputStream is normally a sink, yet the parameter is
     * documented as "containing" the data — confirm whether implementations
     * are expected to read back buffered bytes (e.g. from an in-memory
     * stream) or whether this parameter should really be an InputStream.
     * @param emitKey Key identifying output destination
     * @param metadata Document metadata
     * @param outputStream Stream containing processed document data
     * @throws IOException if emit operation fails
     * @throws TikaException if emitter encounters error
     */
    void emit(String emitKey, Metadata metadata, OutputStream outputStream) 
            throws IOException, TikaException;
    
    /**
     * Emits document content as string
     * @param emitKey Output destination key
     * @param metadata Document metadata
     * @param content Processed document content
     * @throws IOException if emit fails
     * @throws TikaException if processing error occurs
     */
    void emit(String emitKey, Metadata metadata, String content) 
            throws IOException, TikaException;
    
    /**
     * Gets name/identifier for this emitter
     * @return String identifying this emitter instance
     */
    String getName();
    
    /**
     * Sets name/identifier for this emitter
     * @param name Identifier for this emitter
     */
    void setName(String name);
    
    /**
     * Checks if emitter supports specific emit key pattern
     * @param emitKey Key to check support for
     * @return true if this emitter can handle the key
     */
    boolean supports(String emitKey);
}

AbstractEmitter

Base implementation providing common emitter functionality and configuration support.

/**
 * Abstract base class for emitter implementations.
 * Supplies name handling, initialization plumbing, a permissive default
 * supports(), and a String-based emit that delegates to the stream
 * variant; subclasses only need to implement the OutputStream emit().
 */
public abstract class AbstractEmitter implements Emitter {
    /**
     * Creates AbstractEmitter with default settings
     */
    public AbstractEmitter();
    
    /**
     * Gets emitter name
     * @return String identifier for this emitter
     */
    @Override
    public String getName();
    
    /**
     * Sets emitter name
     * @param name Identifier for this emitter
     */
    @Override
    public void setName(String name);
    
    /**
     * Initializes emitter with configuration parameters
     * @param params Configuration parameters
     * @throws TikaConfigException if initialization fails
     */
    @Override
    public void initialize(Map<String, Param> params) throws TikaConfigException;
    
    /**
     * Checks initialization problems
     * @param handler Problem handler for reporting issues
     */
    @Override
    public void checkInitialization(InitializableProblemHandler handler);
    
    /**
     * Default implementation returns true for all keys
     * @param emitKey Key to check support for
     * @return true (subclasses should override for specific logic)
     */
    @Override
    public boolean supports(String emitKey);
    
    /**
     * Default string emit implementation using OutputStream version
     * @param emitKey Output destination key
     * @param metadata Document metadata
     * @param content Document content as string
     * @throws IOException if emit fails
     * @throws TikaException if processing error occurs
     */
    @Override
    public void emit(String emitKey, Metadata metadata, String content) 
            throws IOException, TikaException;
    
    /**
     * Template method for actual emit implementation
     * @param emitKey Output destination key
     * @param metadata Document metadata
     * @param outputStream Stream containing document data
     * @throws IOException if emit fails
     * @throws TikaException if processing error occurs
     */
    @Override
    public abstract void emit(String emitKey, Metadata metadata, OutputStream outputStream) 
            throws IOException, TikaException;
}

EmitterManager

Manager for multiple emitter instances with routing and lifecycle management.

/**
 * Manager for multiple emitter instances with routing capabilities.
 * Registry keyed by emitter name; emit() routes output to the named
 * emitter. NOTE(review): thread-safety of registration vs. concurrent
 * emit() calls is not specified here — confirm before sharing across
 * threads.
 */
public class EmitterManager {
    /**
     * Creates EmitterManager with default configuration
     */
    public EmitterManager();
    
    /**
     * Creates EmitterManager from configuration
     * @param config Configuration properties for emitters
     * @throws TikaException if configuration is invalid
     */
    public EmitterManager(Properties config) throws TikaException;
    
    /**
     * Registers emitter instance
     * @param name Emitter identifier
     * @param emitter Emitter implementation to register
     */
    public void addEmitter(String name, Emitter emitter);
    
    /**
     * Gets emitter by name
     * @param name Emitter identifier
     * @return Emitter instance or null if not found
     */
    public Emitter getEmitter(String name);
    
    /**
     * Gets all registered emitter names
     * @return Set of emitter identifiers
     */
    public Set<String> getEmitterNames();
    
    /**
     * Removes emitter by name
     * @param name Emitter identifier to remove
     * @return Removed emitter or null if not found
     */
    public Emitter removeEmitter(String name);
    
    /**
     * Emits document using appropriate emitter
     * @param emitterName Name of emitter to use
     * @param emitKey Output destination key
     * @param metadata Document metadata
     * @param outputStream Document data stream
     * @throws IOException if emit fails
     * @throws TikaException if no suitable emitter found
     */
    public void emit(String emitterName, String emitKey, Metadata metadata, OutputStream outputStream) 
            throws IOException, TikaException;
    
    /**
     * Initializes all registered emitters
     * @throws TikaException if any emitter initialization fails
     */
    public void initialize() throws TikaException;
    
    /**
     * Closes all emitters and releases resources
     * @throws IOException if cleanup fails
     */
    public void close() throws IOException;
}

Usage Examples

Basic Pipes Processing

// Configure and start pipes server
PipesConfig config = new PipesConfig();
config.setServerPort(9998);
config.setMaxConcurrentRequests(10);
config.setTimeoutMs(30000);

PipesServer server = new PipesServer(config);
server.start();

// Create client and submit processing request
PipesClient client = new PipesClient("http://localhost:9998");

FetchEmitTuple tuple = new FetchEmitTuple(
    "file-fetcher", "/path/to/document.pdf",
    "text-emitter", "/output/document.txt"
);

try {
    PipesResult result = client.process(tuple);
    
    if (result.isSuccess()) {
        System.out.println("Processing successful");
        System.out.println("Time: " + result.getProcessTimeMs() + "ms");
        System.out.println("Content: " + result.getContent());
    } else {
        System.err.println("Processing failed: " + result.getErrorMessage());
    }
    
} catch (IOException | TikaException e) {
    System.err.println("Request failed: " + e.getMessage());
    
} finally {
    // close() and stop() both declare IOException; handle each
    // independently so a failure closing the client does not leave
    // the server running (and the exceptions do not escape uncaught).
    try {
        client.close();
    } catch (IOException e) {
        System.err.println("Client close failed: " + e.getMessage());
    }
    try {
        server.stop();
    } catch (IOException e) {
        System.err.println("Server stop failed: " + e.getMessage());
    }
}

Batch Document Processing

// Prepare batch of documents for processing
List<FetchEmitTuple> batch = new ArrayList<>();

for (int i = 1; i <= 100; i++) {
    FetchEmitTuple tuple = new FetchEmitTuple(
        "file-fetcher", "/docs/doc" + i + ".pdf",
        "search-emitter", "doc-" + i
    );
    batch.add(tuple);
}

// Submit batch for processing
PipesClient client = new PipesClient("http://localhost:9998");

try {
    List<PipesResult> results = client.processBatch(batch);
    
    int successful = 0;
    int failed = 0;
    long totalTime = 0;
    
    for (PipesResult result : results) {
        totalTime += result.getProcessTimeMs();
        
        if (result.isSuccess()) {
            successful++;
        } else {
            failed++;
            System.err.println("Failed: " + result.getErrorMessage());
        }
    }
    
    System.out.println("Batch completed:");
    System.out.println("  Successful: " + successful);
    System.out.println("  Failed: " + failed);
    System.out.println("  Total time: " + totalTime + "ms");
    // Guard the average: an empty result list would otherwise divide by zero.
    if (!results.isEmpty()) {
        System.out.println("  Average: " + (totalTime / results.size()) + "ms per doc");
    }
    
} catch (IOException | TikaException e) {
    System.err.println("Batch processing failed: " + e.getMessage());
} finally {
    // Release client resources whether or not the batch succeeded.
    try {
        client.close();
    } catch (IOException e) {
        System.err.println("Client close failed: " + e.getMessage());
    }
}

Custom Fetcher Implementation

// Custom fetcher for database documents
public class DatabaseFetcher extends AbstractFetcher {
    
    private DataSource dataSource;
    private String queryTemplate;
    
    @Override
    public void initialize(Map<String, Param> params) throws TikaConfigException {
        super.initialize(params);
        
        Param dsParam = params.get("dataSource");
        if (dsParam != null) {
            this.dataSource = (DataSource) dsParam.getValue();
        }
        
        Param queryParam = params.get("query");
        if (queryParam != null) {
            this.queryTemplate = queryParam.getValue().toString();
        }
    }
    
    @Override
    public boolean supports(String fetchKey) {
        // Support numeric document IDs
        return fetchKey.matches("\\d+");
    }
    
    @Override
    public InputStream fetch(String fetchKey, Metadata metadata) 
            throws IOException, TikaException {
        
        try (Connection conn = dataSource.getConnection()) {
            String query = queryTemplate.replace("{id}", fetchKey);
            
            try (PreparedStatement stmt = conn.prepareStatement(query);
                 ResultSet rs = stmt.executeQuery()) {
                
                if (rs.next()) {
                    byte[] data = rs.getBytes("document_data");
                    String filename = rs.getString("filename");
                    String mimeType = rs.getString("mime_type");
                    
                    // Set metadata from database
                    metadata.set(Metadata.RESOURCE_NAME_KEY, filename);
                    metadata.set(Metadata.CONTENT_TYPE, mimeType);
                    
                    return new ByteArrayInputStream(data);
                } else {
                    throw new TikaException("Document not found: " + fetchKey);
                }
            }
            
        } catch (SQLException e) {
            throw new TikaException("Database error fetching document", e);
        }
    }
}

Custom Emitter Implementation

// Custom emitter for search index
public class SearchIndexEmitter extends AbstractEmitter {

    private SearchClient searchClient;
    private String indexName;

    /**
     * Reads the {@code searchClient} and {@code indexName} params used to
     * target the index operations.
     */
    @Override
    public void initialize(Map<String, Param> params) throws TikaConfigException {
        super.initialize(params);

        Param clientParam = params.get("searchClient");
        if (clientParam != null) {
            this.searchClient = (SearchClient) clientParam.getValue();
        }

        Param indexParam = params.get("indexName");
        if (indexParam != null) {
            this.indexName = indexParam.getValue().toString();
        }
    }

    /**
     * Indexes the extracted content plus all metadata fields as a single
     * search document whose id is the emit key.
     *
     * @param emitKey  document id in the search index
     * @param metadata metadata fields to copy onto the document
     * @param content  extracted text content
     * @throws TikaException if the index call fails, wrapping the cause
     */
    @Override
    public void emit(String emitKey, Metadata metadata, String content)
            throws IOException, TikaException {

        try {
            SearchDocument doc = new SearchDocument();
            doc.setId(emitKey);
            doc.setContent(content);

            // Single-valued metadata goes in as a scalar; multi-valued as a list.
            for (String name : metadata.names()) {
                String[] values = metadata.getValues(name);
                if (values.length == 1) {
                    doc.addField(name, values[0]);
                } else {
                    doc.addField(name, Arrays.asList(values));
                }
            }

            searchClient.index(indexName, doc);

        } catch (Exception e) {
            throw new TikaException("Failed to index document: " + emitKey, e);
        }
    }

    /**
     * Stream variant: decodes already-buffered output bytes as UTF-8 and
     * delegates to the String overload.
     *
     * <p>An {@link OutputStream} cannot be read from, so this emitter only
     * supports {@link ByteArrayOutputStream}, whose buffered contents are
     * accessible. (The previous implementation attempted
     * {@code IOUtils.toString(outputStream, "UTF-8")}, which is not possible
     * on an OutputStream.)
     *
     * @throws IOException if the stream is not a ByteArrayOutputStream
     */
    @Override
    public void emit(String emitKey, Metadata metadata, OutputStream outputStream)
            throws IOException, TikaException {

        if (!(outputStream instanceof ByteArrayOutputStream)) {
            throw new IOException("SearchIndexEmitter requires a ByteArrayOutputStream; got "
                    + outputStream.getClass().getName());
        }
        String content = ((ByteArrayOutputStream) outputStream).toString("UTF-8");
        emit(emitKey, metadata, content);
    }
}

Asynchronous Processing

// Asynchronous batch processing with callback
/**
 * Submits a batch of fetch/emit tuples to a pipes server asynchronously,
 * reporting per-document and per-batch progress through a callback.
 */
public class AsyncProcessor {

    /**
     * Submits the batch for asynchronous processing and returns immediately;
     * results are delivered via {@link PipesCallback}.
     *
     * @param documents tuples to process
     * @throws IOException if submission to the pipes server fails
     */
    public void processDocumentsAsync(List<FetchEmitTuple> documents) throws IOException {
        PipesClient client = new PipesClient("http://localhost:9998");
        client.processAsync(documents, newLoggingCallback());
        // Caller is free to do other work while processing happens.
    }

    /** Builds a callback that logs progress to stdout/stderr. */
    private PipesCallback newLoggingCallback() {
        return new PipesCallback() {

            @Override
            public void onResult(FetchEmitTuple tuple, PipesResult result) {
                String key = tuple.getFetchKey();
                if (result.isSuccess()) {
                    String line = "Completed: " + key
                            + " (" + result.getProcessTimeMs() + "ms)";
                    System.out.println(line);
                } else {
                    String line = "Failed: " + key
                            + " - " + result.getErrorMessage();
                    System.err.println(line);
                }
            }

            @Override
            public void onBatchComplete(List<PipesResult> results) {
                // Sum the per-document processing times for the batch report.
                long elapsed = 0L;
                for (PipesResult r : results) {
                    elapsed += r.getProcessTimeMs();
                }
                System.out.println("Batch completed in " + elapsed + "ms");
            }

            @Override
            public void onError(Exception error) {
                System.err.println("Batch error: " + error.getMessage());
            }
        };
    }
}

/**
 * Callback contract for asynchronous pipes processing.
 *
 * <p>Implementations receive one {@link #onResult} call per processed tuple,
 * a single {@link #onBatchComplete} when the whole batch finishes, and
 * {@link #onError} for batch-level failures.
 */
interface PipesCallback {
    /** Called once per tuple with its processing outcome. */
    void onResult(FetchEmitTuple tuple, PipesResult result);
    /** Called after all tuples in the batch have been processed. */
    void onBatchComplete(List<PipesResult> results);
    /** Called when the batch itself fails (not a single document). */
    void onError(Exception error);
}

Configuration and Management

// Complete pipes configuration setup
/**
 * End-to-end pipes configuration: server settings, fetcher/emitter managers,
 * Tika parse configuration, and server lifecycle with a shutdown hook and a
 * periodic statistics monitor.
 */
public class PipesSetup {

    /**
     * Builds a complete {@link PipesConfig} with default fetchers/emitters.
     *
     * @return configured PipesConfig ready to pass to {@link #startServer}
     * @throws TikaException if Tika configuration fails to load
     */
    public PipesConfig createConfiguration() throws IOException, TikaException {
        PipesConfig config = new PipesConfig();

        // Server settings
        config.setServerPort(9998);
        config.setMaxConcurrentRequests(20);
        config.setTimeoutMs(60000);

        // Register fetchers by scheme-like name.
        FetcherManager fetcherManager = new FetcherManager();
        fetcherManager.addFetcher("file", new FileFetcher());
        fetcherManager.addFetcher("http", new HttpFetcher());
        fetcherManager.addFetcher("s3", new S3Fetcher());
        fetcherManager.addFetcher("database", new DatabaseFetcher());

        config.setFetcherManager(fetcherManager);

        // Register emitters by name.
        EmitterManager emitterManager = new EmitterManager();
        emitterManager.addEmitter("file", new FileEmitter());
        emitterManager.addEmitter("search", new SearchIndexEmitter());
        emitterManager.addEmitter("database", new DatabaseEmitter());

        config.setEmitterManager(emitterManager);

        // Use Tika's default parse configuration.
        TikaConfig tikaConfig = TikaConfig.getDefaultConfig();
        config.setTikaConfig(tikaConfig);

        return config;
    }

    /**
     * Starts the pipes server, installs a shutdown hook to stop it cleanly,
     * and spawns a daemon thread that logs server statistics every 30s.
     *
     * @param config configuration built by {@link #createConfiguration}
     * @throws IOException if the server fails to start
     */
    public void startServer(PipesConfig config) throws IOException, TikaException {
        PipesServer server = new PipesServer(config);

        // Stop the server cleanly on JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                System.out.println("Shutting down pipes server...");
                server.stop();
            } catch (IOException e) {
                System.err.println("Error during shutdown: " + e.getMessage());
            }
        }, "pipes-shutdown-hook"));

        server.start();
        System.out.println("Pipes server started on port " + server.getPort());

        // Periodically log server statistics. The monitor MUST be a daemon
        // thread: a non-daemon monitor would keep the JVM alive after main
        // returns, and the shutdown hook above would never fire.
        Thread monitor = new Thread(() -> {
            while (server.isRunning()) {
                try {
                    Thread.sleep(30000); // Every 30 seconds

                    Map<String, Object> stats = server.getStatistics();
                    System.out.println("Server stats: " + stats);

                } catch (InterruptedException e) {
                    // Restore interrupt status and stop monitoring.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "pipes-stats-monitor");
        monitor.setDaemon(true);
        monitor.start();
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-tika--tika-core

docs

configuration.md

content-processing.md

detection.md

embedded-extraction.md

embedding.md

exceptions.md

index.md

io-utilities.md

language.md

metadata.md

mime-types.md

parsing.md

pipes.md

process-forking.md

rendering.md

tile.json