Apache Tika Core provides the foundational APIs for detecting and extracting metadata and structured text content from various document formats.
—
Batch processing framework for high-throughput document processing with configurable fetchers, emitters, and processing pipelines supporting parallel execution and error handling.
Core data structure representing a document processing operation with fetch source, emit target, and processing metadata.
/**
 * Tuple representing a document processing operation from fetch to emit.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted in
 * this reference. The setters (setMetadata/setProperties) make instances
 * mutable; presumably not safe to share across threads — confirm against
 * the implementation.
 */
public class FetchEmitTuple {
/**
 * Creates FetchEmitTuple with fetch and emit identifiers
 * @param fetcherName Name of fetcher to retrieve document
 * @param fetchKey Key/path for document retrieval
 * @param emitterName Name of emitter for output
 * @param emitKey Key/path for document output
 */
public FetchEmitTuple(String fetcherName, String fetchKey, String emitterName, String emitKey);
/**
 * Creates FetchEmitTuple with metadata
 * @param fetcherName Fetcher identifier
 * @param fetchKey Document fetch key
 * @param emitterName Emitter identifier
 * @param emitKey Document emit key
 * @param metadata Processing metadata
 */
public FetchEmitTuple(String fetcherName, String fetchKey, String emitterName, String emitKey, Metadata metadata);
/**
 * Gets the fetcher name for document retrieval
 * @return String identifying which fetcher to use
 */
public String getFetcherName();
/**
 * Gets the fetch key for document location
 * @return String key/path identifying document to fetch
 */
public String getFetchKey();
/**
 * Gets the emitter name for output
 * @return String identifying which emitter to use for output
 */
public String getEmitterName();
/**
 * Gets the emit key for output location
 * @return String key/path for output destination
 */
public String getEmitKey();
/**
 * Gets processing metadata
 * @return Metadata object with processing parameters and hints
 */
public Metadata getMetadata();
/**
 * Sets processing metadata
 * @param metadata Metadata to associate with processing
 */
public void setMetadata(Metadata metadata);
/**
 * Gets processing parameters as properties
 * @return Properties containing processing configuration
 */
public Properties getProperties();
/**
 * Sets processing parameters
 * @param properties Processing configuration properties
 */
public void setProperties(Properties properties);
/**
 * Gets unique identifier for this tuple
 * @return String uniquely identifying this processing operation
 */
public String getId();
}Configuration for content handlers used during document processing pipelines.
/**
 * Configuration for content handlers in processing pipeline.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * The handler class is instantiated reflectively from its name via
 * createHandler(); parameter semantics depend on the handler class.
 */
public class HandlerConfig {
/**
 * Creates HandlerConfig with handler class name
 * @param handlerClass Fully qualified class name of handler
 */
public HandlerConfig(String handlerClass);
/**
 * Creates HandlerConfig with class and parameters
 * @param handlerClass Handler class name
 * @param params Configuration parameters for handler
 */
public HandlerConfig(String handlerClass, Map<String, String> params);
/**
 * Gets handler class name
 * @return Fully qualified class name of content handler
 */
public String getHandlerClass();
/**
 * Gets configuration parameters
 * @return Map of parameter names to values
 */
public Map<String, String> getParams();
/**
 * Sets configuration parameter
 * @param name Parameter name
 * @param value Parameter value
 */
public void setParam(String name, String value);
/**
 * Gets parameter value
 * @param name Parameter name
 * @return Parameter value or null if not set
 */
public String getParam(String name);
/**
 * Creates handler instance from configuration
 * @return ContentHandler instance configured with parameters
 * @throws TikaException if handler cannot be created
 */
public ContentHandler createHandler() throws TikaException;
}Client interface for submitting document processing requests to Tika pipes server.
/**
 * Client for submitting document processing requests to Tika pipes server.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Holds network resources: callers must invoke close() when done
 * (see close() below). Thread-safety is not documented — confirm before
 * sharing a client across threads.
 */
public class PipesClient {
/**
 * Creates PipesClient with server URL
 * @param serverUrl URL of Tika pipes server
 */
public PipesClient(String serverUrl);
/**
 * Creates PipesClient with configuration
 * @param config Client configuration properties
 */
public PipesClient(PipesConfig config);
/**
 * Submits single document for processing
 * @param tuple FetchEmitTuple defining processing operation
 * @return PipesResult containing processing outcome
 * @throws IOException if communication with server fails
 * @throws TikaException if processing fails
 */
public PipesResult process(FetchEmitTuple tuple) throws IOException, TikaException;
/**
 * Submits batch of documents for processing
 * @param tuples List of FetchEmitTuple objects to process
 * @return List of PipesResult objects with processing outcomes
 * @throws IOException if communication fails
 * @throws TikaException if batch processing fails
 */
public List<PipesResult> processBatch(List<FetchEmitTuple> tuples) throws IOException, TikaException;
/**
 * Submits documents for asynchronous processing
 * @param tuples Documents to process asynchronously
 * @param callback Callback for processing results
 * @throws IOException if submission fails
 */
public void processAsync(List<FetchEmitTuple> tuples, PipesCallback callback) throws IOException;
/**
 * Gets server status and capabilities
 * @return Map containing server status information
 * @throws IOException if server communication fails
 */
public Map<String, Object> getServerStatus() throws IOException;
/**
 * Checks if server is available and responsive
 * @return true if server is ready to accept requests
 */
public boolean isServerAvailable();
/**
 * Sets request timeout
 * @param timeoutMs Timeout in milliseconds
 */
public void setTimeout(int timeoutMs);
/**
 * Gets current request timeout
 * @return Timeout in milliseconds
 */
public int getTimeout();
/**
 * Closes client and releases resources
 * @throws IOException if cleanup fails
 */
public void close() throws IOException;
}Server implementation for handling document processing requests with configurable concurrency and resource management.
/**
 * Server for handling batch document processing requests.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Lifecycle: construct with config, then start(); stop() for graceful
 * shutdown. Concurrency is bounded by setMaxConcurrentRequests().
 */
public class PipesServer {
/**
 * Creates PipesServer with configuration
 * @param config Server configuration
 */
public PipesServer(PipesConfig config);
/**
 * Starts server and begins accepting requests
 * @throws IOException if server cannot be started
 * @throws TikaException if configuration is invalid
 */
public void start() throws IOException, TikaException;
/**
 * Stops server gracefully
 * @throws IOException if shutdown fails
 */
public void stop() throws IOException;
/**
 * Checks if server is running
 * @return true if server is started and accepting requests
 */
public boolean isRunning();
/**
 * Gets server port number
 * @return Port number server is listening on
 */
public int getPort();
/**
 * Gets server configuration
 * @return PipesConfig used by this server
 */
public PipesConfig getConfig();
/**
 * Gets current server statistics
 * @return Map containing processing statistics
 */
public Map<String, Object> getStatistics();
/**
 * Processes single document request
 * @param tuple Document processing request
 * @return PipesResult with processing outcome
 * @throws TikaException if processing fails
 * @throws IOException if I/O error occurs
 */
public PipesResult process(FetchEmitTuple tuple) throws TikaException, IOException;
/**
 * Sets maximum concurrent processing threads
 * @param maxThreads Maximum number of processing threads
 */
public void setMaxConcurrentRequests(int maxThreads);
/**
 * Gets maximum concurrent requests
 * @return Maximum number of concurrent processing requests
 */
public int getMaxConcurrentRequests();
}Result object containing outcome of document processing operation with status, metadata, and error information.
/**
 * Result of document processing operation in pipes framework.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * The STATUS values cover failures across all phases (fetcher/emitter
 * initialization, fetch, parse, emit); isSuccess() is true only for
 * STATUS.SUCCESS, and getErrorMessage() returns null on success.
 */
public class PipesResult {
/**
 * Processing status enumeration
 */
public enum STATUS {
SUCCESS, // Processing completed successfully
PARSE_EXCEPTION, // Parse error occurred
TIMEOUT, // Processing timed out
OOM, // Out of memory error
NO_SUCH_FILE, // Input file not found
EMIT_EXCEPTION, // Error during emit phase
FETCHER_INITIALIZATION_EXCEPTION, // Fetcher setup failed
EMITTER_INITIALIZATION_EXCEPTION, // Emitter setup failed
INTERRUPTED_EXCEPTION // Processing was interrupted
}
/**
 * Creates PipesResult with status
 * @param status Processing status
 */
public PipesResult(STATUS status);
/**
 * Creates PipesResult with status and processing time
 * @param status Processing status
 * @param processTimeMs Processing time in milliseconds
 */
public PipesResult(STATUS status, long processTimeMs);
/**
 * Gets processing status
 * @return STATUS indicating processing outcome
 */
public STATUS getStatus();
/**
 * Gets processing time
 * @return Processing duration in milliseconds
 */
public long getProcessTimeMs();
/**
 * Gets exception that occurred during processing
 * @return Exception object or null if no error
 */
public Exception getException();
/**
 * Sets exception information
 * @param exception Exception that occurred
 */
public void setException(Exception exception);
/**
 * Gets extracted metadata
 * @return Metadata extracted during processing
 */
public Metadata getMetadata();
/**
 * Sets extracted metadata
 * @param metadata Metadata from processing
 */
public void setMetadata(Metadata metadata);
/**
 * Gets extracted content
 * @return Text content extracted from document
 */
public String getContent();
/**
 * Sets extracted content
 * @param content Text content from processing
 */
public void setContent(String content);
/**
 * Checks if processing was successful
 * @return true if status is SUCCESS
 */
public boolean isSuccess();
/**
 * Gets error message if processing failed
 * @return Error message or null if successful
 */
public String getErrorMessage();
/**
 * Gets processing statistics
 * @return Map containing detailed processing metrics
 */
public Map<String, Object> getStatistics();
}Main configuration class for pipes framework with settings for fetchers, emitters, processing, and server options.
/**
 * Configuration for Tika pipes batch processing framework.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Aggregates server settings (port, concurrency, timeout), the fetcher
 * and emitter managers, and the TikaConfig used for parsing. Generic
 * key/value access is inherited from PipesConfigBase.
 */
public class PipesConfig extends PipesConfigBase {
/**
 * Creates default PipesConfig
 */
public PipesConfig();
/**
 * Creates PipesConfig from properties file
 * @param configFile Properties file containing configuration
 * @throws IOException if config file cannot be read
 */
public PipesConfig(File configFile) throws IOException;
/**
 * Creates PipesConfig from input stream
 * @param configStream Stream containing configuration properties
 * @throws IOException if stream cannot be read
 */
public PipesConfig(InputStream configStream) throws IOException;
/**
 * Gets server port number
 * @return Port number for pipes server
 */
public int getServerPort();
/**
 * Sets server port number
 * @param port Port number for server to listen on
 */
public void setServerPort(int port);
/**
 * Gets maximum concurrent processing threads
 * @return Maximum number of concurrent requests
 */
public int getMaxConcurrentRequests();
/**
 * Sets maximum concurrent processing threads
 * @param maxThreads Maximum concurrent requests
 */
public void setMaxConcurrentRequests(int maxThreads);
/**
 * Gets processing timeout in milliseconds
 * @return Timeout for individual document processing
 */
public long getTimeoutMs();
/**
 * Sets processing timeout
 * @param timeoutMs Timeout in milliseconds
 */
public void setTimeoutMs(long timeoutMs);
/**
 * Gets fetcher manager configuration
 * @return FetcherManager for document retrieval
 */
public FetcherManager getFetcherManager();
/**
 * Sets fetcher manager
 * @param fetcherManager Manager for document fetchers
 */
public void setFetcherManager(FetcherManager fetcherManager);
/**
 * Gets emitter manager configuration
 * @return EmitterManager for document output
 */
public EmitterManager getEmitterManager();
/**
 * Sets emitter manager
 * @param emitterManager Manager for document emitters
 */
public void setEmitterManager(EmitterManager emitterManager);
/**
 * Gets TikaConfig for parsing configuration
 * @return TikaConfig with parser and detector settings
 */
public TikaConfig getTikaConfig();
/**
 * Sets TikaConfig for parsing
 * @param tikaConfig Tika configuration for parsers
 */
public void setTikaConfig(TikaConfig tikaConfig);
}Base configuration class with common settings and initialization patterns.
/**
 * Base configuration class for pipes components.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Provides generic string-keyed property storage plus validate() for
 * subclasses to enforce their invariants.
 */
public abstract class PipesConfigBase {
/**
 * Initializes configuration from properties
 * @param properties Configuration properties
 * @throws TikaException if initialization fails
 */
public void initialize(Properties properties) throws TikaException;
/**
 * Gets configuration property value
 * @param key Property key
 * @return Property value or null if not set
 */
public String getProperty(String key);
/**
 * Gets configuration property with default value
 * @param key Property key
 * @param defaultValue Default value if property not set
 * @return Property value or default if not set
 */
public String getProperty(String key, String defaultValue);
/**
 * Sets configuration property
 * @param key Property key
 * @param value Property value
 */
public void setProperty(String key, String value);
/**
 * Gets all configuration properties
 * @return Properties object with all settings
 */
public Properties getProperties();
/**
 * Validates configuration settings
 * @throws TikaException if configuration is invalid
 */
public void validate() throws TikaException;
}Interface for document retrieval implementations supporting various data sources and protocols.
/**
 * Interface for fetching documents from various sources.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Presumably the caller owns (and must close) the InputStream returned by
 * fetch() — confirm against implementations.
 */
public interface Fetcher extends Initializable {
/**
 * Fetches document from source
 * @param fetchKey Key identifying document to fetch
 * @param metadata Metadata for fetch operation
 * @return InputStream containing document data
 * @throws IOException if fetch operation fails
 * @throws TikaException if fetcher encounters error
 */
InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException;
/**
 * Gets name/identifier for this fetcher
 * @return String identifying this fetcher instance
 */
String getName();
/**
 * Sets name/identifier for this fetcher
 * @param name Identifier for this fetcher
 */
void setName(String name);
/**
 * Checks if fetcher supports specific fetch key pattern
 * @param fetchKey Key to check support for
 * @return true if this fetcher can handle the key
 */
boolean supports(String fetchKey);
}Base implementation providing common fetcher functionality and configuration patterns.
/**
 * Abstract base class for fetcher implementations.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Supplies name handling, initialization plumbing, and a permissive
 * default supports(); subclasses implement fetch() and usually narrow
 * supports().
 */
public abstract class AbstractFetcher implements Fetcher {
/**
 * Creates AbstractFetcher with default settings
 */
public AbstractFetcher();
/**
 * Gets fetcher name
 * @return String identifier for this fetcher
 */
@Override
public String getName();
/**
 * Sets fetcher name
 * @param name Identifier for this fetcher
 */
@Override
public void setName(String name);
/**
 * Initializes fetcher with configuration parameters
 * @param params Configuration parameters
 * @throws TikaConfigException if initialization fails
 */
@Override
public void initialize(Map<String, Param> params) throws TikaConfigException;
/**
 * Checks initialization problems
 * @param handler Problem handler for reporting issues
 */
@Override
public void checkInitialization(InitializableProblemHandler handler);
/**
 * Default implementation returns true for all keys
 * @param fetchKey Key to check support for
 * @return true (subclasses should override for specific logic)
 */
@Override
public boolean supports(String fetchKey);
/**
 * Template method for actual fetch implementation
 * @param fetchKey Document key to fetch
 * @param metadata Fetch metadata
 * @return InputStream with document data
 * @throws IOException if fetch fails
 * @throws TikaException if processing error occurs
 */
@Override
public abstract InputStream fetch(String fetchKey, Metadata metadata)
throws IOException, TikaException;
}Manager for multiple fetcher instances with routing and lifecycle management.
/**
 * Manager for multiple fetcher instances with routing capabilities.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Routes fetch(fetcherName, ...) calls to the registered fetcher of that
 * name; owns fetcher lifecycle via initialize()/close().
 */
public class FetcherManager {
/**
 * Creates FetcherManager with default configuration
 */
public FetcherManager();
/**
 * Creates FetcherManager from configuration
 * @param config Configuration properties for fetchers
 * @throws TikaException if configuration is invalid
 */
public FetcherManager(Properties config) throws TikaException;
/**
 * Registers fetcher instance
 * @param name Fetcher identifier
 * @param fetcher Fetcher implementation to register
 */
public void addFetcher(String name, Fetcher fetcher);
/**
 * Gets fetcher by name
 * @param name Fetcher identifier
 * @return Fetcher instance or null if not found
 */
public Fetcher getFetcher(String name);
/**
 * Gets all registered fetcher names
 * @return Set of fetcher identifiers
 */
public Set<String> getFetcherNames();
/**
 * Removes fetcher by name
 * @param name Fetcher identifier to remove
 * @return Removed fetcher or null if not found
 */
public Fetcher removeFetcher(String name);
/**
 * Fetches document using appropriate fetcher
 * @param fetcherName Name of fetcher to use
 * @param fetchKey Document key to fetch
 * @param metadata Fetch metadata
 * @return InputStream containing document data
 * @throws IOException if fetch fails
 * @throws TikaException if no suitable fetcher found
 */
public InputStream fetch(String fetcherName, String fetchKey, Metadata metadata)
throws IOException, TikaException;
/**
 * Initializes all registered fetchers
 * @throws TikaException if any fetcher initialization fails
 */
public void initialize() throws TikaException;
/**
 * Closes all fetchers and releases resources
 * @throws IOException if cleanup fails
 */
public void close() throws IOException;
}Interface for document output implementations supporting various destinations and formats.
/**
 * Interface for emitting processed documents to various destinations.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * NOTE(review): the stream overload takes an OutputStream described as
 * "containing ... data" — an OutputStream cannot be read from, so this
 * presumably means an in-memory buffer (e.g. ByteArrayOutputStream);
 * confirm against implementations.
 */
public interface Emitter extends Initializable {
/**
 * Emits processed document to destination
 * @param emitKey Key identifying output destination
 * @param metadata Document metadata
 * @param outputStream Stream containing processed document data
 * @throws IOException if emit operation fails
 * @throws TikaException if emitter encounters error
 */
void emit(String emitKey, Metadata metadata, OutputStream outputStream)
throws IOException, TikaException;
/**
 * Emits document content as string
 * @param emitKey Output destination key
 * @param metadata Document metadata
 * @param content Processed document content
 * @throws IOException if emit fails
 * @throws TikaException if processing error occurs
 */
void emit(String emitKey, Metadata metadata, String content)
throws IOException, TikaException;
/**
 * Gets name/identifier for this emitter
 * @return String identifying this emitter instance
 */
String getName();
/**
 * Sets name/identifier for this emitter
 * @param name Identifier for this emitter
 */
void setName(String name);
/**
 * Checks if emitter supports specific emit key pattern
 * @param emitKey Key to check support for
 * @return true if this emitter can handle the key
 */
boolean supports(String emitKey);
}Base implementation providing common emitter functionality and configuration support.
/**
 * Abstract base class for emitter implementations.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Supplies name handling, initialization plumbing, a permissive default
 * supports(), and a default String emit() that delegates to the abstract
 * stream-based emit().
 */
public abstract class AbstractEmitter implements Emitter {
/**
 * Creates AbstractEmitter with default settings
 */
public AbstractEmitter();
/**
 * Gets emitter name
 * @return String identifier for this emitter
 */
@Override
public String getName();
/**
 * Sets emitter name
 * @param name Identifier for this emitter
 */
@Override
public void setName(String name);
/**
 * Initializes emitter with configuration parameters
 * @param params Configuration parameters
 * @throws TikaConfigException if initialization fails
 */
@Override
public void initialize(Map<String, Param> params) throws TikaConfigException;
/**
 * Checks initialization problems
 * @param handler Problem handler for reporting issues
 */
@Override
public void checkInitialization(InitializableProblemHandler handler);
/**
 * Default implementation returns true for all keys
 * @param emitKey Key to check support for
 * @return true (subclasses should override for specific logic)
 */
@Override
public boolean supports(String emitKey);
/**
 * Default string emit implementation using OutputStream version
 * @param emitKey Output destination key
 * @param metadata Document metadata
 * @param content Document content as string
 * @throws IOException if emit fails
 * @throws TikaException if processing error occurs
 */
@Override
public void emit(String emitKey, Metadata metadata, String content)
throws IOException, TikaException;
/**
 * Template method for actual emit implementation
 * @param emitKey Output destination key
 * @param metadata Document metadata
 * @param outputStream Stream containing document data
 * @throws IOException if emit fails
 * @throws TikaException if processing error occurs
 */
@Override
public abstract void emit(String emitKey, Metadata metadata, OutputStream outputStream)
throws IOException, TikaException;
}Manager for multiple emitter instances with routing and lifecycle management.
/**
 * Manager for multiple emitter instances with routing capabilities.
 * <p>
 * NOTE(review): signature-only API listing — method bodies are omitted.
 * Mirrors FetcherManager: routes emit(emitterName, ...) to the registered
 * emitter of that name and owns emitter lifecycle via initialize()/close().
 */
public class EmitterManager {
/**
 * Creates EmitterManager with default configuration
 */
public EmitterManager();
/**
 * Creates EmitterManager from configuration
 * @param config Configuration properties for emitters
 * @throws TikaException if configuration is invalid
 */
public EmitterManager(Properties config) throws TikaException;
/**
 * Registers emitter instance
 * @param name Emitter identifier
 * @param emitter Emitter implementation to register
 */
public void addEmitter(String name, Emitter emitter);
/**
 * Gets emitter by name
 * @param name Emitter identifier
 * @return Emitter instance or null if not found
 */
public Emitter getEmitter(String name);
/**
 * Gets all registered emitter names
 * @return Set of emitter identifiers
 */
public Set<String> getEmitterNames();
/**
 * Removes emitter by name
 * @param name Emitter identifier to remove
 * @return Removed emitter or null if not found
 */
public Emitter removeEmitter(String name);
/**
 * Emits document using appropriate emitter
 * @param emitterName Name of emitter to use
 * @param emitKey Output destination key
 * @param metadata Document metadata
 * @param outputStream Document data stream
 * @throws IOException if emit fails
 * @throws TikaException if no suitable emitter found
 */
public void emit(String emitterName, String emitKey, Metadata metadata, OutputStream outputStream)
throws IOException, TikaException;
/**
 * Initializes all registered emitters
 * @throws TikaException if any emitter initialization fails
 */
public void initialize() throws TikaException;
/**
 * Closes all emitters and releases resources
 * @throws IOException if cleanup fails
 */
public void close() throws IOException;
}// Configure and start pipes server
// Build the server configuration: port, concurrency bound, per-document timeout.
PipesConfig config = new PipesConfig();
config.setServerPort(9998);
config.setMaxConcurrentRequests(10);
config.setTimeoutMs(30000);
PipesServer server = new PipesServer(config);
server.start();
// Create client and submit processing request
PipesClient client = new PipesClient("http://localhost:9998");
FetchEmitTuple tuple = new FetchEmitTuple(
"file-fetcher", "/path/to/document.pdf",
"text-emitter", "/output/document.txt"
);
try {
PipesResult result = client.process(tuple);
if (result.isSuccess()) {
System.out.println("Processing successful");
System.out.println("Time: " + result.getProcessTimeMs() + "ms");
System.out.println("Content: " + result.getContent());
} else {
System.err.println("Processing failed: " + result.getErrorMessage());
}
} catch (IOException | TikaException e) {
System.err.println("Request failed: " + e.getMessage());
} finally {
// FIX: close() may throw IOException; if it did, server.stop() was skipped
// and the server leaked. Contain the close failure so the server always stops.
try {
client.close();
} catch (IOException e) {
System.err.println("Client close failed: " + e.getMessage());
}
server.stop();
}// Prepare batch of documents for processing
// Build a batch of 100 fetch/emit operations targeting the search emitter.
List<FetchEmitTuple> batch = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
FetchEmitTuple tuple = new FetchEmitTuple(
"file-fetcher", "/docs/doc" + i + ".pdf",
"search-emitter", "doc-" + i
);
batch.add(tuple);
}
// Submit batch for processing
PipesClient client = new PipesClient("http://localhost:9998");
try {
List<PipesResult> results = client.processBatch(batch);
int successful = 0;
int failed = 0;
long totalTime = 0;
for (PipesResult result : results) {
totalTime += result.getProcessTimeMs();
if (result.isSuccess()) {
successful++;
} else {
failed++;
System.err.println("Failed: " + result.getErrorMessage());
}
}
System.out.println("Batch completed:");
System.out.println(" Successful: " + successful);
System.out.println(" Failed: " + failed);
System.out.println(" Total time: " + totalTime + "ms");
// FIX: guard the average against division by zero if the server returns no results.
long average = results.isEmpty() ? 0 : totalTime / results.size();
System.out.println(" Average: " + average + "ms per doc");
} catch (IOException | TikaException e) {
System.err.println("Batch processing failed: " + e.getMessage());
} finally {
// FIX: the original never closed the client; release its resources even on failure.
try {
client.close();
} catch (IOException e) {
System.err.println("Client close failed: " + e.getMessage());
}
}// Custom fetcher for database documents
public class DatabaseFetcher extends AbstractFetcher {
// JDBC data source supplying document rows; injected via the "dataSource" param.
private DataSource dataSource;
// SQL template containing an "{id}" placeholder for the numeric document id.
private String queryTemplate;
/**
 * Reads the "dataSource" and "query" configuration parameters.
 * @param params Configuration parameters
 * @throws TikaConfigException if initialization fails
 */
@Override
public void initialize(Map<String, Param> params) throws TikaConfigException {
super.initialize(params);
Param dsParam = params.get("dataSource");
if (dsParam != null) {
this.dataSource = (DataSource) dsParam.getValue();
}
Param queryParam = params.get("query");
if (queryParam != null) {
this.queryTemplate = queryParam.getValue().toString();
}
}
/**
 * @param fetchKey Key to check support for
 * @return true only for purely numeric document IDs (null-safe)
 */
@Override
public boolean supports(String fetchKey) {
// Support numeric document IDs; FIX: null-safe instead of NPE-ing on null keys.
return fetchKey != null && fetchKey.matches("\\d+");
}
/**
 * Loads a document row by id and exposes its bytes as a stream.
 * @param fetchKey Numeric document id substituted into the query template
 * @param metadata Receives filename and mime type read from the row
 * @return InputStream over the "document_data" column
 * @throws IOException if fetch fails
 * @throws TikaException if the id is invalid, the row is missing, or a SQL error occurs
 */
@Override
public InputStream fetch(String fetchKey, Metadata metadata)
throws IOException, TikaException {
// SECURITY FIX: fetch() can be called directly, bypassing supports(), and the
// key is string-substituted into the SQL template — re-validate here so a
// non-numeric key can never become a SQL injection vector.
if (fetchKey == null || !fetchKey.matches("\\d+")) {
throw new TikaException("Invalid document id: " + fetchKey);
}
try (Connection conn = dataSource.getConnection()) {
String query = queryTemplate.replace("{id}", fetchKey);
try (PreparedStatement stmt = conn.prepareStatement(query);
ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
byte[] data = rs.getBytes("document_data");
String filename = rs.getString("filename");
String mimeType = rs.getString("mime_type");
// Set metadata from database
metadata.set(Metadata.RESOURCE_NAME_KEY, filename);
metadata.set(Metadata.CONTENT_TYPE, mimeType);
return new ByteArrayInputStream(data);
} else {
throw new TikaException("Document not found: " + fetchKey);
}
}
} catch (SQLException e) {
throw new TikaException("Database error fetching document", e);
}
}
}// Custom emitter for search index
public class SearchIndexEmitter extends AbstractEmitter {
// Client for the target search service; injected via the "searchClient" param.
private SearchClient searchClient;
// Name of the index documents are written to; injected via the "indexName" param.
private String indexName;
/**
 * Reads the "searchClient" and "indexName" configuration parameters.
 * @param params Configuration parameters
 * @throws TikaConfigException if initialization fails
 */
@Override
public void initialize(Map<String, Param> params) throws TikaConfigException {
super.initialize(params);
Param clientParam = params.get("searchClient");
if (clientParam != null) {
this.searchClient = (SearchClient) clientParam.getValue();
}
Param indexParam = params.get("indexName");
if (indexParam != null) {
this.indexName = indexParam.getValue().toString();
}
}
/**
 * Indexes the document content and metadata under the given key.
 * @param emitKey Used as the search document id
 * @param metadata Each metadata field is copied onto the search document
 * @param content Extracted text content
 * @throws IOException if emit fails
 * @throws TikaException if indexing fails (original cause is preserved)
 */
@Override
public void emit(String emitKey, Metadata metadata, String content)
throws IOException, TikaException {
try {
// Create search document
SearchDocument doc = new SearchDocument();
doc.setId(emitKey);
doc.setContent(content);
// Add metadata fields: single values stay scalar, multi-values become lists
for (String name : metadata.names()) {
String[] values = metadata.getValues(name);
if (values.length == 1) {
doc.addField(name, values[0]);
} else {
doc.addField(name, Arrays.asList(values));
}
}
// Index document
searchClient.index(indexName, doc);
} catch (Exception e) {
throw new TikaException("Failed to index document: " + emitKey, e);
}
}
/**
 * Stream variant; recovers the buffered bytes and delegates to the String overload.
 * @throws TikaException if the stream's bytes cannot be recovered
 */
@Override
public void emit(String emitKey, Metadata metadata, OutputStream outputStream)
throws IOException, TikaException {
// BUG FIX: the original called IOUtils.toString(outputStream, "UTF-8"), but an
// OutputStream cannot be read from (IOUtils.toString takes an InputStream).
// The bytes are only recoverable when the stream is an in-memory buffer.
if (outputStream instanceof ByteArrayOutputStream) {
String content = ((ByteArrayOutputStream) outputStream).toString("UTF-8");
emit(emitKey, metadata, content);
} else {
throw new TikaException("SearchIndexEmitter requires an in-memory buffer; got "
+ outputStream.getClass().getName());
}
}
}// Asynchronous batch processing with callback
// Demonstrates fire-and-forget batch submission with per-result callbacks.
public class AsyncProcessor {
// Submits the documents asynchronously and returns immediately; results are
// delivered to the callback as processing completes.
// NOTE(review): the client is never closed here — processing continues after
// this method returns, so closing immediately would be wrong, but the example
// leaks the client. Callers should close it after onBatchComplete/onError fires.
public void processDocumentsAsync(List<FetchEmitTuple> documents) throws IOException {
PipesClient client = new PipesClient("http://localhost:9998");
// Create callback for handling results
PipesCallback callback = new PipesCallback() {
// Invoked once per document as its processing finishes.
@Override
public void onResult(FetchEmitTuple tuple, PipesResult result) {
if (result.isSuccess()) {
System.out.println("Completed: " + tuple.getFetchKey() +
" (" + result.getProcessTimeMs() + "ms)");
} else {
System.err.println("Failed: " + tuple.getFetchKey() +
" - " + result.getErrorMessage());
}
}
// Invoked after every document in the batch has a result.
@Override
public void onBatchComplete(List<PipesResult> results) {
long totalTime = results.stream()
.mapToLong(PipesResult::getProcessTimeMs)
.sum();
System.out.println("Batch completed in " + totalTime + "ms");
}
// Invoked on a batch-level failure (e.g. lost server connection).
@Override
public void onError(Exception error) {
System.err.println("Batch error: " + error.getMessage());
}
};
// Submit for asynchronous processing
client.processAsync(documents, callback);
// Continue with other work while processing happens...
}
}
// Callback contract for asynchronous pipes processing: one onResult per
// document, then exactly one of onBatchComplete (all done) or onError
// (batch-level failure) — presumably; confirm against PipesClient.processAsync.
interface PipesCallback {
// Called once per processed document with its outcome.
void onResult(FetchEmitTuple tuple, PipesResult result);
// Called after all documents in the batch have results.
void onBatchComplete(List<PipesResult> results);
// Called when the batch as a whole fails.
void onError(Exception error);
}// Complete pipes configuration setup
public class PipesSetup {
/**
 * Builds a complete pipes configuration: server settings, fetcher and
 * emitter managers, and the default Tika parsing configuration.
 * @return Fully populated PipesConfig
 * @throws IOException if configuration resources cannot be read
 * @throws TikaException if configuration is invalid
 */
public PipesConfig createConfiguration() throws IOException, TikaException {
PipesConfig config = new PipesConfig();
// Server settings
config.setServerPort(9998);
config.setMaxConcurrentRequests(20);
config.setTimeoutMs(60000);
// Setup fetcher manager
FetcherManager fetcherManager = new FetcherManager();
fetcherManager.addFetcher("file", new FileFetcher());
fetcherManager.addFetcher("http", new HttpFetcher());
fetcherManager.addFetcher("s3", new S3Fetcher());
fetcherManager.addFetcher("database", new DatabaseFetcher());
config.setFetcherManager(fetcherManager);
// Setup emitter manager
EmitterManager emitterManager = new EmitterManager();
emitterManager.addEmitter("file", new FileEmitter());
emitterManager.addEmitter("search", new SearchIndexEmitter());
emitterManager.addEmitter("database", new DatabaseEmitter());
config.setEmitterManager(emitterManager);
// Configure Tika parsing
TikaConfig tikaConfig = TikaConfig.getDefaultConfig();
config.setTikaConfig(tikaConfig);
return config;
}
/**
 * Starts the server, registers a shutdown hook for graceful stop, and
 * launches a daemon thread that logs server statistics every 30 seconds.
 * @param config Server configuration
 * @throws IOException if the server cannot be started
 * @throws TikaException if configuration is invalid
 */
public void startServer(PipesConfig config) throws IOException, TikaException {
PipesServer server = new PipesServer(config);
// Add shutdown hook so the server is stopped gracefully on JVM exit
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
System.out.println("Shutting down pipes server...");
server.stop();
} catch (IOException e) {
System.err.println("Error during shutdown: " + e.getMessage());
}
}));
server.start();
System.out.println("Pipes server started on port " + server.getPort());
// Monitor server statistics. FIX: the original thread was non-daemon and
// could keep the JVM alive for up to 30s (mid-sleep) after the server
// stopped; mark it daemon and give it a name for diagnostics.
Thread monitor = new Thread(() -> {
while (server.isRunning()) {
try {
Thread.sleep(30000); // Every 30 seconds
Map<String, Object> stats = server.getStatistics();
System.out.println("Server stats: " + stats);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "pipes-stats-monitor");
monitor.setDaemon(true);
monitor.start();
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-tika--tika-core