# Batch Processing (Pipes)

Batch processing framework for high-throughput document processing with configurable fetchers, emitters, and processing pipelines, supporting parallel execution and error handling.

## Capabilities

### Core Pipeline Components

#### FetchEmitTuple

Core data structure representing a single document processing operation: a fetch source, an emit target, and processing metadata.

```java { .api }
/**
14
* Tuple representing a document processing operation from fetch to emit
15
*/
16
public class FetchEmitTuple {
17
/**
18
* Creates FetchEmitTuple with fetch and emit identifiers
19
* @param fetcherName Name of fetcher to retrieve document
20
* @param fetchKey Key/path for document retrieval
21
* @param emitterName Name of emitter for output
22
* @param emitKey Key/path for document output
23
*/
24
public FetchEmitTuple(String fetcherName, String fetchKey, String emitterName, String emitKey);
25
26
/**
27
* Creates FetchEmitTuple with metadata
28
* @param fetcherName Fetcher identifier
29
* @param fetchKey Document fetch key
30
* @param emitterName Emitter identifier
31
* @param emitKey Document emit key
32
* @param metadata Processing metadata
33
*/
34
public FetchEmitTuple(String fetcherName, String fetchKey, String emitterName, String emitKey, Metadata metadata);
35
36
/**
37
* Gets the fetcher name for document retrieval
38
* @return String identifying which fetcher to use
39
*/
40
public String getFetcherName();
41
42
/**
43
* Gets the fetch key for document location
44
* @return String key/path identifying document to fetch
45
*/
46
public String getFetchKey();
47
48
/**
49
* Gets the emitter name for output
50
* @return String identifying which emitter to use for output
51
*/
52
public String getEmitterName();
53
54
/**
55
* Gets the emit key for output location
56
* @return String key/path for output destination
57
*/
58
public String getEmitKey();
59
60
/**
61
* Gets processing metadata
62
* @return Metadata object with processing parameters and hints
63
*/
64
public Metadata getMetadata();
65
66
/**
67
* Sets processing metadata
68
* @param metadata Metadata to associate with processing
69
*/
70
public void setMetadata(Metadata metadata);
71
72
/**
73
* Gets processing parameters as properties
74
* @return Properties containing processing configuration
75
*/
76
public Properties getProperties();
77
78
/**
79
* Sets processing parameters
80
* @param properties Processing configuration properties
81
*/
82
public void setProperties(Properties properties);
83
84
/**
85
* Gets unique identifier for this tuple
86
* @return String uniquely identifying this processing operation
87
*/
88
public String getId();
89
}
```

#### HandlerConfig

Configuration for content handlers used in document processing pipelines.

```java { .api }
/**
98
* Configuration for content handlers in processing pipeline
99
*/
100
public class HandlerConfig {
101
/**
102
* Creates HandlerConfig with handler class name
103
* @param handlerClass Fully qualified class name of handler
104
*/
105
public HandlerConfig(String handlerClass);
106
107
/**
108
* Creates HandlerConfig with class and parameters
109
* @param handlerClass Handler class name
110
* @param params Configuration parameters for handler
111
*/
112
public HandlerConfig(String handlerClass, Map<String, String> params);
113
114
/**
115
* Gets handler class name
116
* @return Fully qualified class name of content handler
117
*/
118
public String getHandlerClass();
119
120
/**
121
* Gets configuration parameters
122
* @return Map of parameter names to values
123
*/
124
public Map<String, String> getParams();
125
126
/**
127
* Sets configuration parameter
128
* @param name Parameter name
129
* @param value Parameter value
130
*/
131
public void setParam(String name, String value);
132
133
/**
134
* Gets parameter value
135
* @param name Parameter name
136
* @return Parameter value or null if not set
137
*/
138
public String getParam(String name);
139
140
/**
141
* Creates handler instance from configuration
142
* @return ContentHandler instance configured with parameters
143
* @throws TikaException if handler cannot be created
144
*/
145
public ContentHandler createHandler() throws TikaException;
146
}
```

### Client-Server Architecture

#### PipesClient

Client class for submitting document processing requests to a Tika Pipes server.

```java { .api }
/**
157
* Client for submitting document processing requests to Tika pipes server
158
*/
159
public class PipesClient {
160
/**
161
* Creates PipesClient with server URL
162
* @param serverUrl URL of Tika pipes server
163
*/
164
public PipesClient(String serverUrl);
165
166
/**
167
* Creates PipesClient with configuration
168
* @param config Client configuration properties
169
*/
170
public PipesClient(PipesConfig config);
171
172
/**
173
* Submits single document for processing
174
* @param tuple FetchEmitTuple defining processing operation
175
* @return PipesResult containing processing outcome
176
* @throws IOException if communication with server fails
177
* @throws TikaException if processing fails
178
*/
179
public PipesResult process(FetchEmitTuple tuple) throws IOException, TikaException;
180
181
/**
182
* Submits batch of documents for processing
183
* @param tuples List of FetchEmitTuple objects to process
184
* @return List of PipesResult objects with processing outcomes
185
* @throws IOException if communication fails
186
* @throws TikaException if batch processing fails
187
*/
188
public List<PipesResult> processBatch(List<FetchEmitTuple> tuples) throws IOException, TikaException;
189
190
/**
191
* Submits documents for asynchronous processing
192
* @param tuples Documents to process asynchronously
193
* @param callback Callback for processing results
194
* @throws IOException if submission fails
195
*/
196
public void processAsync(List<FetchEmitTuple> tuples, PipesCallback callback) throws IOException;
197
198
/**
199
* Gets server status and capabilities
200
* @return Map containing server status information
201
* @throws IOException if server communication fails
202
*/
203
public Map<String, Object> getServerStatus() throws IOException;
204
205
/**
206
* Checks if server is available and responsive
207
* @return true if server is ready to accept requests
208
*/
209
public boolean isServerAvailable();
210
211
/**
212
* Sets request timeout
213
* @param timeoutMs Timeout in milliseconds
214
*/
215
public void setTimeout(int timeoutMs);
216
217
/**
218
* Gets current request timeout
219
* @return Timeout in milliseconds
220
*/
221
public int getTimeout();
222
223
/**
224
* Closes client and releases resources
225
* @throws IOException if cleanup fails
226
*/
227
public void close() throws IOException;
228
}
```

#### PipesServer

Server implementation for handling document processing requests, with configurable concurrency and resource management.

```java { .api }
/**
237
* Server for handling batch document processing requests
238
*/
239
public class PipesServer {
240
/**
241
* Creates PipesServer with configuration
242
* @param config Server configuration
243
*/
244
public PipesServer(PipesConfig config);
245
246
/**
247
* Starts server and begins accepting requests
248
* @throws IOException if server cannot be started
249
* @throws TikaException if configuration is invalid
250
*/
251
public void start() throws IOException, TikaException;
252
253
/**
254
* Stops server gracefully
255
* @throws IOException if shutdown fails
256
*/
257
public void stop() throws IOException;
258
259
/**
260
* Checks if server is running
261
* @return true if server is started and accepting requests
262
*/
263
public boolean isRunning();
264
265
/**
266
* Gets server port number
267
* @return Port number server is listening on
268
*/
269
public int getPort();
270
271
/**
272
* Gets server configuration
273
* @return PipesConfig used by this server
274
*/
275
public PipesConfig getConfig();
276
277
/**
278
* Gets current server statistics
279
* @return Map containing processing statistics
280
*/
281
public Map<String, Object> getStatistics();
282
283
/**
284
* Processes single document request
285
* @param tuple Document processing request
286
* @return PipesResult with processing outcome
287
* @throws TikaException if processing fails
288
* @throws IOException if I/O error occurs
289
*/
290
public PipesResult process(FetchEmitTuple tuple) throws TikaException, IOException;
291
292
/**
293
* Sets maximum concurrent processing threads
294
* @param maxThreads Maximum number of processing threads
295
*/
296
public void setMaxConcurrentRequests(int maxThreads);
297
298
/**
299
* Gets maximum concurrent requests
300
* @return Maximum number of concurrent processing requests
301
*/
302
public int getMaxConcurrentRequests();
303
}
```

#### PipesResult

Result object describing the outcome of a document processing operation, including status, metadata, and error information.

```java { .api }
/**
312
* Result of document processing operation in pipes framework
313
*/
314
public class PipesResult {
315
/**
316
* Processing status enumeration
317
*/
318
public enum STATUS {
319
SUCCESS, // Processing completed successfully
320
PARSE_EXCEPTION, // Parse error occurred
321
TIMEOUT, // Processing timed out
322
OOM, // Out of memory error
323
NO_SUCH_FILE, // Input file not found
324
EMIT_EXCEPTION, // Error during emit phase
325
FETCHER_INITIALIZATION_EXCEPTION, // Fetcher setup failed
326
EMITTER_INITIALIZATION_EXCEPTION, // Emitter setup failed
327
INTERRUPTED_EXCEPTION // Processing was interrupted
328
}
329
330
/**
331
* Creates PipesResult with status
332
* @param status Processing status
333
*/
334
public PipesResult(STATUS status);
335
336
/**
337
* Creates PipesResult with status and processing time
338
* @param status Processing status
339
* @param processTimeMs Processing time in milliseconds
340
*/
341
public PipesResult(STATUS status, long processTimeMs);
342
343
/**
344
* Gets processing status
345
* @return STATUS indicating processing outcome
346
*/
347
public STATUS getStatus();
348
349
/**
350
* Gets processing time
351
* @return Processing duration in milliseconds
352
*/
353
public long getProcessTimeMs();
354
355
/**
356
* Gets exception that occurred during processing
357
* @return Exception object or null if no error
358
*/
359
public Exception getException();
360
361
/**
362
* Sets exception information
363
* @param exception Exception that occurred
364
*/
365
public void setException(Exception exception);
366
367
/**
368
* Gets extracted metadata
369
* @return Metadata extracted during processing
370
*/
371
public Metadata getMetadata();
372
373
/**
374
* Sets extracted metadata
375
* @param metadata Metadata from processing
376
*/
377
public void setMetadata(Metadata metadata);
378
379
/**
380
* Gets extracted content
381
* @return Text content extracted from document
382
*/
383
public String getContent();
384
385
/**
386
* Sets extracted content
387
* @param content Text content from processing
388
*/
389
public void setContent(String content);
390
391
/**
392
* Checks if processing was successful
393
* @return true if status is SUCCESS
394
*/
395
public boolean isSuccess();
396
397
/**
398
* Gets error message if processing failed
399
* @return Error message or null if successful
400
*/
401
public String getErrorMessage();
402
403
/**
404
* Gets processing statistics
405
* @return Map containing detailed processing metrics
406
*/
407
public Map<String, Object> getStatistics();
408
}
```

### Configuration Classes

#### PipesConfig

Main configuration class for the pipes framework, with settings for fetchers, emitters, processing, and server options.

```java { .api }
/**
419
* Configuration for Tika pipes batch processing framework
420
*/
421
public class PipesConfig extends PipesConfigBase {
422
/**
423
* Creates default PipesConfig
424
*/
425
public PipesConfig();
426
427
/**
428
* Creates PipesConfig from properties file
429
* @param configFile Properties file containing configuration
430
* @throws IOException if config file cannot be read
431
*/
432
public PipesConfig(File configFile) throws IOException;
433
434
/**
435
* Creates PipesConfig from input stream
436
* @param configStream Stream containing configuration properties
437
* @throws IOException if stream cannot be read
438
*/
439
public PipesConfig(InputStream configStream) throws IOException;
440
441
/**
442
* Gets server port number
443
* @return Port number for pipes server
444
*/
445
public int getServerPort();
446
447
/**
448
* Sets server port number
449
* @param port Port number for server to listen on
450
*/
451
public void setServerPort(int port);
452
453
/**
454
* Gets maximum concurrent processing threads
455
* @return Maximum number of concurrent requests
456
*/
457
public int getMaxConcurrentRequests();
458
459
/**
460
* Sets maximum concurrent processing threads
461
* @param maxThreads Maximum concurrent requests
462
*/
463
public void setMaxConcurrentRequests(int maxThreads);
464
465
/**
466
* Gets processing timeout in milliseconds
467
* @return Timeout for individual document processing
468
*/
469
public long getTimeoutMs();
470
471
/**
472
* Sets processing timeout
473
* @param timeoutMs Timeout in milliseconds
474
*/
475
public void setTimeoutMs(long timeoutMs);
476
477
/**
478
* Gets fetcher manager configuration
479
* @return FetcherManager for document retrieval
480
*/
481
public FetcherManager getFetcherManager();
482
483
/**
484
* Sets fetcher manager
485
* @param fetcherManager Manager for document fetchers
486
*/
487
public void setFetcherManager(FetcherManager fetcherManager);
488
489
/**
490
* Gets emitter manager configuration
491
* @return EmitterManager for document output
492
*/
493
public EmitterManager getEmitterManager();
494
495
/**
496
* Sets emitter manager
497
* @param emitterManager Manager for document emitters
498
*/
499
public void setEmitterManager(EmitterManager emitterManager);
500
501
/**
502
* Gets TikaConfig for parsing configuration
503
* @return TikaConfig with parser and detector settings
504
*/
505
public TikaConfig getTikaConfig();
506
507
/**
508
* Sets TikaConfig for parsing
509
* @param tikaConfig Tika configuration for parsers
510
*/
511
public void setTikaConfig(TikaConfig tikaConfig);
512
}
```

#### PipesConfigBase

Base configuration class with common settings and initialization patterns.

```java { .api }
/**
521
* Base configuration class for pipes components
522
*/
523
public abstract class PipesConfigBase {
524
/**
525
* Initializes configuration from properties
526
* @param properties Configuration properties
527
* @throws TikaException if initialization fails
528
*/
529
public void initialize(Properties properties) throws TikaException;
530
531
/**
532
* Gets configuration property value
533
* @param key Property key
534
* @return Property value or null if not set
535
*/
536
public String getProperty(String key);
537
538
/**
539
* Gets configuration property with default value
540
* @param key Property key
541
* @param defaultValue Default value if property not set
542
* @return Property value or default if not set
543
*/
544
public String getProperty(String key, String defaultValue);
545
546
/**
547
* Sets configuration property
548
* @param key Property key
549
* @param value Property value
550
*/
551
public void setProperty(String key, String value);
552
553
/**
554
* Gets all configuration properties
555
* @return Properties object with all settings
556
*/
557
public Properties getProperties();
558
559
/**
560
* Validates configuration settings
561
* @throws TikaException if configuration is invalid
562
*/
563
public void validate() throws TikaException;
564
}
```

## Fetcher Framework

### Fetcher Interface

Interface for document retrieval implementations supporting various data sources and protocols.

```java { .api }
/**
575
* Interface for fetching documents from various sources
576
*/
577
public interface Fetcher extends Initializable {
578
/**
579
* Fetches document from source
580
* @param fetchKey Key identifying document to fetch
581
* @param metadata Metadata for fetch operation
582
* @return InputStream containing document data
583
* @throws IOException if fetch operation fails
584
* @throws TikaException if fetcher encounters error
585
*/
586
InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException;
587
588
/**
589
* Gets name/identifier for this fetcher
590
* @return String identifying this fetcher instance
591
*/
592
String getName();
593
594
/**
595
* Sets name/identifier for this fetcher
596
* @param name Identifier for this fetcher
597
*/
598
void setName(String name);
599
600
/**
601
* Checks if fetcher supports specific fetch key pattern
602
* @param fetchKey Key to check support for
603
* @return true if this fetcher can handle the key
604
*/
605
boolean supports(String fetchKey);
606
}
```

### AbstractFetcher

Base implementation providing common fetcher functionality and configuration patterns.

```java { .api }
/**
615
* Abstract base class for fetcher implementations
616
*/
617
public abstract class AbstractFetcher implements Fetcher {
618
/**
619
* Creates AbstractFetcher with default settings
620
*/
621
public AbstractFetcher();
622
623
/**
624
* Gets fetcher name
625
* @return String identifier for this fetcher
626
*/
627
@Override
628
public String getName();
629
630
/**
631
* Sets fetcher name
632
* @param name Identifier for this fetcher
633
*/
634
@Override
635
public void setName(String name);
636
637
/**
638
* Initializes fetcher with configuration parameters
639
* @param params Configuration parameters
640
* @throws TikaConfigException if initialization fails
641
*/
642
@Override
643
public void initialize(Map<String, Param> params) throws TikaConfigException;
644
645
/**
646
* Checks initialization problems
647
* @param handler Problem handler for reporting issues
648
*/
649
@Override
650
public void checkInitialization(InitializableProblemHandler handler);
651
652
/**
653
* Default implementation returns true for all keys
654
* @param fetchKey Key to check support for
655
* @return true (subclasses should override for specific logic)
656
*/
657
@Override
658
public boolean supports(String fetchKey);
659
660
/**
661
* Template method for actual fetch implementation
662
* @param fetchKey Document key to fetch
663
* @param metadata Fetch metadata
664
* @return InputStream with document data
665
* @throws IOException if fetch fails
666
* @throws TikaException if processing error occurs
667
*/
668
@Override
669
public abstract InputStream fetch(String fetchKey, Metadata metadata)
670
throws IOException, TikaException;
671
}
```

### FetcherManager

Manager for multiple fetcher instances, with routing and lifecycle management.

```java { .api }
/**
680
* Manager for multiple fetcher instances with routing capabilities
681
*/
682
public class FetcherManager {
683
/**
684
* Creates FetcherManager with default configuration
685
*/
686
public FetcherManager();
687
688
/**
689
* Creates FetcherManager from configuration
690
* @param config Configuration properties for fetchers
691
* @throws TikaException if configuration is invalid
692
*/
693
public FetcherManager(Properties config) throws TikaException;
694
695
/**
696
* Registers fetcher instance
697
* @param name Fetcher identifier
698
* @param fetcher Fetcher implementation to register
699
*/
700
public void addFetcher(String name, Fetcher fetcher);
701
702
/**
703
* Gets fetcher by name
704
* @param name Fetcher identifier
705
* @return Fetcher instance or null if not found
706
*/
707
public Fetcher getFetcher(String name);
708
709
/**
710
* Gets all registered fetcher names
711
* @return Set of fetcher identifiers
712
*/
713
public Set<String> getFetcherNames();
714
715
/**
716
* Removes fetcher by name
717
* @param name Fetcher identifier to remove
718
* @return Removed fetcher or null if not found
719
*/
720
public Fetcher removeFetcher(String name);
721
722
/**
723
* Fetches document using appropriate fetcher
724
* @param fetcherName Name of fetcher to use
725
* @param fetchKey Document key to fetch
726
* @param metadata Fetch metadata
727
* @return InputStream containing document data
728
* @throws IOException if fetch fails
729
* @throws TikaException if no suitable fetcher found
730
*/
731
public InputStream fetch(String fetcherName, String fetchKey, Metadata metadata)
732
throws IOException, TikaException;
733
734
/**
735
* Initializes all registered fetchers
736
* @throws TikaException if any fetcher initialization fails
737
*/
738
public void initialize() throws TikaException;
739
740
/**
741
* Closes all fetchers and releases resources
742
* @throws IOException if cleanup fails
743
*/
744
public void close() throws IOException;
745
}
```

## Emitter Framework

### Emitter Interface

Interface for document output implementations supporting various destinations and formats.

```java { .api }
/**
756
* Interface for emitting processed documents to various destinations
757
*/
758
public interface Emitter extends Initializable {
759
/**
760
* Emits processed document to destination
761
* @param emitKey Key identifying output destination
762
* @param metadata Document metadata
763
* @param outputStream Stream containing processed document data
764
* @throws IOException if emit operation fails
765
* @throws TikaException if emitter encounters error
766
*/
767
void emit(String emitKey, Metadata metadata, OutputStream outputStream)
768
throws IOException, TikaException;
769
770
/**
771
* Emits document content as string
772
* @param emitKey Output destination key
773
* @param metadata Document metadata
774
* @param content Processed document content
775
* @throws IOException if emit fails
776
* @throws TikaException if processing error occurs
777
*/
778
void emit(String emitKey, Metadata metadata, String content)
779
throws IOException, TikaException;
780
781
/**
782
* Gets name/identifier for this emitter
783
* @return String identifying this emitter instance
784
*/
785
String getName();
786
787
/**
788
* Sets name/identifier for this emitter
789
* @param name Identifier for this emitter
790
*/
791
void setName(String name);
792
793
/**
794
* Checks if emitter supports specific emit key pattern
795
* @param emitKey Key to check support for
796
* @return true if this emitter can handle the key
797
*/
798
boolean supports(String emitKey);
799
}
```

### AbstractEmitter

Base implementation providing common emitter functionality and configuration support.

```java { .api }
/**
808
* Abstract base class for emitter implementations
809
*/
810
public abstract class AbstractEmitter implements Emitter {
811
/**
812
* Creates AbstractEmitter with default settings
813
*/
814
public AbstractEmitter();
815
816
/**
817
* Gets emitter name
818
* @return String identifier for this emitter
819
*/
820
@Override
821
public String getName();
822
823
/**
824
* Sets emitter name
825
* @param name Identifier for this emitter
826
*/
827
@Override
828
public void setName(String name);
829
830
/**
831
* Initializes emitter with configuration parameters
832
* @param params Configuration parameters
833
* @throws TikaConfigException if initialization fails
834
*/
835
@Override
836
public void initialize(Map<String, Param> params) throws TikaConfigException;
837
838
/**
839
* Checks initialization problems
840
* @param handler Problem handler for reporting issues
841
*/
842
@Override
843
public void checkInitialization(InitializableProblemHandler handler);
844
845
/**
846
* Default implementation returns true for all keys
847
* @param emitKey Key to check support for
848
* @return true (subclasses should override for specific logic)
849
*/
850
@Override
851
public boolean supports(String emitKey);
852
853
/**
854
* Default string emit implementation using OutputStream version
855
* @param emitKey Output destination key
856
* @param metadata Document metadata
857
* @param content Document content as string
858
* @throws IOException if emit fails
859
* @throws TikaException if processing error occurs
860
*/
861
@Override
862
public void emit(String emitKey, Metadata metadata, String content)
863
throws IOException, TikaException;
864
865
/**
866
* Template method for actual emit implementation
867
* @param emitKey Output destination key
868
* @param metadata Document metadata
869
* @param outputStream Stream containing document data
870
* @throws IOException if emit fails
871
* @throws TikaException if processing error occurs
872
*/
873
@Override
874
public abstract void emit(String emitKey, Metadata metadata, OutputStream outputStream)
875
throws IOException, TikaException;
876
}
```

### EmitterManager

Manager for multiple emitter instances, with routing and lifecycle management.

```java { .api }
/**
885
* Manager for multiple emitter instances with routing capabilities
886
*/
887
public class EmitterManager {
888
/**
889
* Creates EmitterManager with default configuration
890
*/
891
public EmitterManager();
892
893
/**
894
* Creates EmitterManager from configuration
895
* @param config Configuration properties for emitters
896
* @throws TikaException if configuration is invalid
897
*/
898
public EmitterManager(Properties config) throws TikaException;
899
900
/**
901
* Registers emitter instance
902
* @param name Emitter identifier
903
* @param emitter Emitter implementation to register
904
*/
905
public void addEmitter(String name, Emitter emitter);
906
907
/**
908
* Gets emitter by name
909
* @param name Emitter identifier
910
* @return Emitter instance or null if not found
911
*/
912
public Emitter getEmitter(String name);
913
914
/**
915
* Gets all registered emitter names
916
* @return Set of emitter identifiers
917
*/
918
public Set<String> getEmitterNames();
919
920
/**
921
* Removes emitter by name
922
* @param name Emitter identifier to remove
923
* @return Removed emitter or null if not found
924
*/
925
public Emitter removeEmitter(String name);
926
927
/**
928
* Emits document using appropriate emitter
929
* @param emitterName Name of emitter to use
930
* @param emitKey Output destination key
931
* @param metadata Document metadata
932
* @param outputStream Document data stream
933
* @throws IOException if emit fails
934
* @throws TikaException if no suitable emitter found
935
*/
936
public void emit(String emitterName, String emitKey, Metadata metadata, OutputStream outputStream)
937
throws IOException, TikaException;
938
939
/**
940
* Initializes all registered emitters
941
* @throws TikaException if any emitter initialization fails
942
*/
943
public void initialize() throws TikaException;
944
945
/**
946
* Closes all emitters and releases resources
947
* @throws IOException if cleanup fails
948
*/
949
public void close() throws IOException;
950
}
```

## Usage Examples

### Basic Pipes Processing

```java { .api }
// Configure and start pipes server
959
PipesConfig config = new PipesConfig();
960
config.setServerPort(9998);
961
config.setMaxConcurrentRequests(10);
962
config.setTimeoutMs(30000);
963
964
PipesServer server = new PipesServer(config);
965
server.start();
966
967
// Create client and submit processing request
968
PipesClient client = new PipesClient("http://localhost:9998");
969
970
FetchEmitTuple tuple = new FetchEmitTuple(
971
"file-fetcher", "/path/to/document.pdf",
972
"text-emitter", "/output/document.txt"
973
);
974
975
try {
976
PipesResult result = client.process(tuple);
977
978
if (result.isSuccess()) {
979
System.out.println("Processing successful");
980
System.out.println("Time: " + result.getProcessTimeMs() + "ms");
981
System.out.println("Content: " + result.getContent());
982
} else {
983
System.err.println("Processing failed: " + result.getErrorMessage());
984
}
985
986
} catch (IOException | TikaException e) {
987
System.err.println("Request failed: " + e.getMessage());
988
989
} finally {
990
client.close();
991
server.stop();
992
}
```

### Batch Document Processing

```java { .api }
// Prepare batch of documents for processing
999
List<FetchEmitTuple> batch = new ArrayList<>();
1000
1001
for (int i = 1; i <= 100; i++) {
1002
FetchEmitTuple tuple = new FetchEmitTuple(
1003
"file-fetcher", "/docs/doc" + i + ".pdf",
1004
"search-emitter", "doc-" + i
1005
);
1006
batch.add(tuple);
1007
}
1008
1009
// Submit batch for processing
1010
PipesClient client = new PipesClient("http://localhost:9998");
1011
1012
try {
1013
List<PipesResult> results = client.processBatch(batch);
1014
1015
int successful = 0;
1016
int failed = 0;
1017
long totalTime = 0;
1018
1019
for (PipesResult result : results) {
1020
totalTime += result.getProcessTimeMs();
1021
1022
if (result.isSuccess()) {
1023
successful++;
1024
} else {
1025
failed++;
1026
System.err.println("Failed: " + result.getErrorMessage());
1027
}
1028
}
1029
1030
System.out.println("Batch completed:");
1031
System.out.println(" Successful: " + successful);
1032
System.out.println(" Failed: " + failed);
1033
System.out.println(" Total time: " + totalTime + "ms");
1034
System.out.println(" Average: " + (totalTime / results.size()) + "ms per doc");
1035
1036
} catch (IOException | TikaException e) {
1037
System.err.println("Batch processing failed: " + e.getMessage());
1038
}
```

### Custom Fetcher Implementation

```java { .api }
// Custom fetcher for database documents
1045
public class DatabaseFetcher extends AbstractFetcher {
1046
1047
private DataSource dataSource;
1048
private String queryTemplate;
1049
1050
@Override
1051
public void initialize(Map<String, Param> params) throws TikaConfigException {
1052
super.initialize(params);
1053
1054
Param dsParam = params.get("dataSource");
1055
if (dsParam != null) {
1056
this.dataSource = (DataSource) dsParam.getValue();
1057
}
1058
1059
Param queryParam = params.get("query");
1060
if (queryParam != null) {
1061
this.queryTemplate = queryParam.getValue().toString();
1062
}
1063
}
1064
1065
@Override
1066
public boolean supports(String fetchKey) {
1067
// Support numeric document IDs
1068
return fetchKey.matches("\\d+");
1069
}
1070
1071
@Override
1072
public InputStream fetch(String fetchKey, Metadata metadata)
1073
throws IOException, TikaException {
1074
1075
try (Connection conn = dataSource.getConnection()) {
1076
String query = queryTemplate.replace("{id}", fetchKey);
1077
1078
try (PreparedStatement stmt = conn.prepareStatement(query);
1079
ResultSet rs = stmt.executeQuery()) {
1080
1081
if (rs.next()) {
1082
byte[] data = rs.getBytes("document_data");
1083
String filename = rs.getString("filename");
1084
String mimeType = rs.getString("mime_type");
1085
1086
// Set metadata from database
1087
metadata.set(Metadata.RESOURCE_NAME_KEY, filename);
1088
metadata.set(Metadata.CONTENT_TYPE, mimeType);
1089
1090
return new ByteArrayInputStream(data);
1091
} else {
1092
throw new TikaException("Document not found: " + fetchKey);
1093
}
1094
}
1095
1096
} catch (SQLException e) {
1097
throw new TikaException("Database error fetching document", e);
1098
}
1099
}
1100
}
```

### Custom Emitter Implementation

```java { .api }
// Custom emitter for search index
1107
public class SearchIndexEmitter extends AbstractEmitter {
1108
1109
private SearchClient searchClient;
1110
private String indexName;
1111
1112
@Override
1113
public void initialize(Map<String, Param> params) throws TikaConfigException {
1114
super.initialize(params);
1115
1116
Param clientParam = params.get("searchClient");
1117
if (clientParam != null) {
1118
this.searchClient = (SearchClient) clientParam.getValue();
1119
}
1120
1121
Param indexParam = params.get("indexName");
1122
if (indexParam != null) {
1123
this.indexName = indexParam.getValue().toString();
1124
}
1125
}
1126
1127
@Override
1128
public void emit(String emitKey, Metadata metadata, String content)
1129
throws IOException, TikaException {
1130
1131
try {
1132
// Create search document
1133
SearchDocument doc = new SearchDocument();
1134
doc.setId(emitKey);
1135
doc.setContent(content);
1136
1137
// Add metadata fields
1138
for (String name : metadata.names()) {
1139
String[] values = metadata.getValues(name);
1140
if (values.length == 1) {
1141
doc.addField(name, values[0]);
1142
} else {
1143
doc.addField(name, Arrays.asList(values));
1144
}
1145
}
1146
1147
// Index document
1148
searchClient.index(indexName, doc);
1149
1150
} catch (Exception e) {
1151
throw new TikaException("Failed to index document: " + emitKey, e);
1152
}
1153
}
1154
1155
@Override
1156
public void emit(String emitKey, Metadata metadata, OutputStream outputStream)
1157
throws IOException, TikaException {
1158
1159
// Convert stream to string and delegate
1160
String content = IOUtils.toString(outputStream, "UTF-8");
1161
emit(emitKey, metadata, content);
1162
}
1163
}
```

### Asynchronous Processing

```java { .api }
// Asynchronous batch processing with callback
1170
public class AsyncProcessor {
1171
1172
public void processDocumentsAsync(List<FetchEmitTuple> documents) throws IOException {
1173
PipesClient client = new PipesClient("http://localhost:9998");
1174
1175
// Create callback for handling results
1176
PipesCallback callback = new PipesCallback() {
1177
@Override
1178
public void onResult(FetchEmitTuple tuple, PipesResult result) {
1179
if (result.isSuccess()) {
1180
System.out.println("Completed: " + tuple.getFetchKey() +
1181
" (" + result.getProcessTimeMs() + "ms)");
1182
} else {
1183
System.err.println("Failed: " + tuple.getFetchKey() +
1184
" - " + result.getErrorMessage());
1185
}
1186
}
1187
1188
@Override
1189
public void onBatchComplete(List<PipesResult> results) {
1190
long totalTime = results.stream()
1191
.mapToLong(PipesResult::getProcessTimeMs)
1192
.sum();
1193
1194
System.out.println("Batch completed in " + totalTime + "ms");
1195
}
1196
1197
@Override
1198
public void onError(Exception error) {
1199
System.err.println("Batch error: " + error.getMessage());
1200
}
1201
};
1202
1203
// Submit for asynchronous processing
1204
client.processAsync(documents, callback);
1205
1206
// Continue with other work while processing happens...
1207
}
1208
}
1209
1210
interface PipesCallback {
1211
void onResult(FetchEmitTuple tuple, PipesResult result);
1212
void onBatchComplete(List<PipesResult> results);
1213
void onError(Exception error);
1214
}
1215
```

### Configuration and Management

```java { .api }
// Complete pipes configuration setup
1221
public class PipesSetup {
1222
1223
public PipesConfig createConfiguration() throws IOException, TikaException {
1224
PipesConfig config = new PipesConfig();
1225
1226
// Server settings
1227
config.setServerPort(9998);
1228
config.setMaxConcurrentRequests(20);
1229
config.setTimeoutMs(60000);
1230
1231
// Setup fetcher manager
1232
FetcherManager fetcherManager = new FetcherManager();
1233
fetcherManager.addFetcher("file", new FileFetcher());
1234
fetcherManager.addFetcher("http", new HttpFetcher());
1235
fetcherManager.addFetcher("s3", new S3Fetcher());
1236
fetcherManager.addFetcher("database", new DatabaseFetcher());
1237
1238
config.setFetcherManager(fetcherManager);
1239
1240
// Setup emitter manager
1241
EmitterManager emitterManager = new EmitterManager();
1242
emitterManager.addEmitter("file", new FileEmitter());
1243
emitterManager.addEmitter("search", new SearchIndexEmitter());
1244
emitterManager.addEmitter("database", new DatabaseEmitter());
1245
1246
config.setEmitterManager(emitterManager);
1247
1248
// Configure Tika parsing
1249
TikaConfig tikaConfig = TikaConfig.getDefaultConfig();
1250
config.setTikaConfig(tikaConfig);
1251
1252
return config;
1253
}
1254
1255
public void startServer(PipesConfig config) throws IOException, TikaException {
1256
PipesServer server = new PipesServer(config);
1257
1258
// Add shutdown hook
1259
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
1260
try {
1261
System.out.println("Shutting down pipes server...");
1262
server.stop();
1263
} catch (IOException e) {
1264
System.err.println("Error during shutdown: " + e.getMessage());
1265
}
1266
}));
1267
1268
server.start();
1269
System.out.println("Pipes server started on port " + server.getPort());
1270
1271
// Monitor server statistics
1272
new Thread(() -> {
1273
while (server.isRunning()) {
1274
try {
1275
Thread.sleep(30000); // Every 30 seconds
1276
1277
Map<String, Object> stats = server.getStatistics();
1278
System.out.println("Server stats: " + stats);
1279
1280
} catch (InterruptedException e) {
1281
Thread.currentThread().interrupt();
1282
break;
1283
}
1284
}
1285
}).start();
1286
}
1287
}
1288
```