0
# Process Forking Infrastructure
1
2
Infrastructure for running document parsing operations in separate JVM processes to provide isolation, memory management, and fault tolerance. It makes document processing more robust by isolating potentially problematic parsers, limiting memory through the forked JVM's command-line arguments, and applying timeout controls, while still integrating with the main Tika parsing pipeline through the standard `Parser` interface.
3
4
## Capabilities
5
6
### Fork Parser
7
8
Main parser implementation that delegates parsing operations to separate JVM processes, drawn from a configurable pool of forked processes, with settings for health-check pulses and timeouts.
9
10
```java { .api }
11
/**
12
* Parser that runs actual parsing in separate JVM processes for isolation
13
*/
14
public class ForkParser implements Parser, Closeable {
15
/**
16
* Creates fork parser with default configuration
17
*/
18
public ForkParser();
19
20
/**
21
* Creates fork parser with specific class loader and delegate parser
22
* @param loader class loader for parser resources
23
* @param parser delegate parser to run in forked process
24
*/
25
public ForkParser(ClassLoader loader, Parser parser);
26
27
/**
28
* Creates fork parser with Tika binary path and parser factory
29
* @param tikaBin path to Tika binary/jars directory
30
* @param parserFactoryFactory factory for creating parser instances
31
*/
32
public ForkParser(Path tikaBin, ParserFactoryFactory parserFactoryFactory);
33
34
/**
35
* Gets Java command line arguments for forked processes
36
* @return list of JVM arguments and options
37
*/
38
public List<String> getJava();
39
40
/**
41
* Sets Java command line arguments for forked processes
42
* @param java JVM command line arguments
43
*/
44
public void setJava(List<String> java);
45
46
/**
47
* Gets connection pool size for forked processes
48
* @return number of processes in pool
49
*/
50
public int getPoolSize();
51
52
/**
53
* Sets connection pool size for forked processes
54
* @param poolSize number of processes to maintain in pool
55
*/
56
public void setPoolSize(int poolSize);
57
58
/**
59
* Gets server pulse interval for health checks
60
* @return pulse interval in milliseconds
61
*/
62
public long getServerPulseMillis();
63
64
/**
65
* Sets server pulse interval for health checks
66
* @param serverPulseMillis pulse interval in milliseconds
67
*/
68
public void setServerPulseMillis(long serverPulseMillis);
69
70
/**
71
* Gets parsing timeout for forked operations
72
* @return parsing timeout in milliseconds
73
*/
74
public long getServerParseTimeoutMillis();
75
76
/**
77
* Sets parsing timeout for forked operations
78
* @param serverParseTimeoutMillis timeout in milliseconds
79
*/
80
public void setServerParseTimeoutMillis(long serverParseTimeoutMillis);
81
82
/**
83
* Gets wait timeout for process communication
84
* @return wait timeout in milliseconds
85
*/
86
public long getServerWaitTimeoutMillis();
87
88
/**
89
* Sets wait timeout for process communication
90
* @param serverWaitTimeoutMillis timeout in milliseconds
91
*/
92
public void setServerWaitTimeoutMillis(long serverWaitTimeoutMillis);
93
}
94
```
95
96
### Fork Server and Client
97
98
Internal components that manage the client-server communication between the main process and the forked processes.
99
100
```java { .api }
101
/**
102
* Server running in forked process to handle parsing requests
103
*/
104
class ForkServer implements Runnable {
105
/** Protocol constants for client-server communication */
106
public static final byte ERROR = -1;
107
public static final byte DONE = 0;
108
public static final byte CALL = 1;
109
public static final byte PING = 2;
110
public static final byte RESOURCE = 3;
111
public static final byte READY = 4;
112
public static final byte FAILED_TO_START = 5;
113
public static final byte INIT_PARSER_FACTORY_FACTORY = 6;
114
public static final byte INIT_LOADER_PARSER = 7;
115
public static final byte INIT_PARSER_FACTORY_FACTORY_LOADER = 8;
116
}
117
118
/**
119
* Client in main process for communicating with forked server processes
120
*/
121
class ForkClient {
122
/**
123
* Acquires fork client from pool or creates new one
124
* @return fork client instance for parsing operations
125
*/
126
public static ForkClient acquire();
127
128
/**
129
* Releases fork client back to pool
130
* @param client client to release
131
*/
132
public static void release(ForkClient client);
133
134
/**
135
* Executes parsing operation in forked process
136
* @param resource fork resource representing the operation
137
* @return result of the parsing operation
138
*/
139
public Object execute(ForkResource resource);
140
}
141
```
142
143
### Fork Resources and Proxies
144
145
Interfaces and implementations for resource management and proxy objects in forked processes.
146
147
```java { .api }
148
/**
149
* Interface for resources that can be processed in forked processes
150
*/
151
public interface ForkResource {
152
/**
153
* Processes resource in forked process context
154
* @param input data input stream from main process
155
* @param output data output stream to main process
156
* @return throwable if processing fails, null on success
157
* @throws IOException if I/O communication fails
158
*/
159
Throwable process(DataInputStream input, DataOutputStream output) throws IOException;
160
}
161
162
/**
163
* Interface for proxy objects that can communicate across process boundaries
164
*/
165
public interface ForkProxy extends Serializable {
166
/**
167
* Initializes proxy with communication streams
168
* @param input input stream for receiving data
169
* @param output output stream for sending data
170
*/
171
void init(DataInputStream input, DataOutputStream output);
172
}
173
174
/**
175
* Proxy for content handlers in forked processes
176
*/
177
public class ContentHandlerProxy implements ForkProxy {
178
/**
179
* Creates content handler proxy
180
* @param handler target content handler
181
*/
182
public ContentHandlerProxy(ContentHandler handler);
183
}
184
185
/**
186
* Proxy for input streams in forked processes
187
*/
188
public class InputStreamProxy implements ForkProxy {
189
/**
190
* Creates input stream proxy
191
* @param stream target input stream
192
*/
193
public InputStreamProxy(InputStream stream);
194
}
195
196
/**
197
* Proxy for class loaders in forked processes
198
*/
199
public class ClassLoaderProxy implements ForkProxy {
200
/**
201
* Creates class loader proxy
202
* @param loader target class loader
203
*/
204
public ClassLoaderProxy(ClassLoader loader);
205
}
206
```
207
208
### Timeout and Configuration Management
209
210
Classes for managing timeouts and configuration in the forking infrastructure.
211
212
```java { .api }
213
/**
214
* Timeout configuration for forked operations
215
*/
216
class TimeoutLimits {
217
/**
218
* Creates timeout limits configuration
219
* @param pulseMS pulse interval for health checks
220
* @param parseTimeoutMS timeout for parsing operations
221
* @param waitTimeoutMS timeout for process communication
222
*/
223
TimeoutLimits(long pulseMS, long parseTimeoutMS, long waitTimeoutMS);
224
225
/** @return pulse interval in milliseconds */
226
public long getPulseMS();
227
228
/** @return parse timeout in milliseconds */
229
public long getParseTimeoutMS();
230
231
/** @return wait timeout in milliseconds */
232
public long getWaitTimeoutMS();
233
}
234
235
/**
236
* Factory interface for creating parser factories in forked processes
237
*/
238
public interface ParserFactoryFactory {
239
/**
240
* Creates parser factory instance
241
* @return parser factory for creating parsers
242
*/
243
ParserFactory create();
244
}
245
```
246
247
### Memory and Resource Management
248
249
Specialized components for memory management and resource handling in forked processes.
250
251
```java { .api }
252
/**
253
* URL connection for memory-based resources in forked processes
254
*/
255
public class MemoryURLConnection {
256
/**
257
* Creates memory URL connection
258
* @return connection for memory-based resources
259
*/
260
public static MemoryURLConnection create();
261
}
262
263
/**
264
* URL stream handler for memory resources
265
*/
266
public class MemoryURLStreamHandler {
267
/**
268
* Creates memory URL stream handler
269
*/
270
public MemoryURLStreamHandler();
271
}
272
273
/**
274
* Factory for memory URL stream handlers
275
*/
276
public class MemoryURLStreamHandlerFactory {
277
/**
278
* Creates stream handler factory
279
*/
280
public MemoryURLStreamHandlerFactory();
281
}
282
283
/**
284
* Record for memory URL stream data
285
*/
286
public class MemoryURLStreamRecord {
287
/**
288
* Creates memory URL stream record
289
* @param url URL for the resource
290
* @param data resource data
291
*/
292
public MemoryURLStreamRecord(URL url, byte[] data);
293
}
294
```
295
296
## Usage Examples
297
298
**Basic Fork Parser Usage:**
299
300
```java
301
import org.apache.tika.fork.ForkParser;
302
import org.apache.tika.parser.AutoDetectParser;
303
import org.apache.tika.metadata.Metadata;
304
import org.apache.tika.sax.BodyContentHandler;
305
import java.io.FileInputStream;
306
import java.io.InputStream;
307
import java.util.Arrays;
308
309
// Create fork parser with default configuration (memory-limited JVM settings are applied below)
310
ForkParser forkParser = new ForkParser();
311
312
// Configure JVM arguments for forked processes
313
forkParser.setJava(Arrays.asList(
314
"java",
315
"-Xmx512m", // Limit memory to 512MB
316
"-Djava.awt.headless=true", // Headless mode
317
"-XX:+UseG1GC", // Use G1 garbage collector
318
"-XX:MaxGCPauseMillis=200" // Limit GC pause time
319
));
320
321
// Configure process pool and timeouts
322
forkParser.setPoolSize(3); // 3 processes in pool
323
forkParser.setServerParseTimeoutMillis(120000); // 2-minute parse timeout
324
forkParser.setServerWaitTimeoutMillis(30000); // 30-second wait timeout
325
forkParser.setServerPulseMillis(5000); // 5-second pulse interval
326
327
// Parse document in isolated process
328
try (InputStream stream = new FileInputStream("large_document.pdf")) {
329
BodyContentHandler handler = new BodyContentHandler(-1);
330
Metadata metadata = new Metadata();
331
332
forkParser.parse(stream, handler, metadata, new ParseContext());
333
334
System.out.println("Content: " + handler.toString());
335
System.out.println("Title: " + metadata.get(TikaCoreProperties.TITLE));
336
} finally {
337
// Close fork parser and clean up processes
338
forkParser.close();
339
}
340
```
341
342
**Custom Parser in Forked Process:**
343
344
```java
345
import org.apache.tika.fork.ForkParser;
346
import org.apache.tika.parser.Parser;
347
import org.apache.tika.config.TikaConfig;
348
import java.nio.file.Paths;
349
350
// Custom parser factory for forked processes
351
public class CustomParserFactoryFactory implements ParserFactoryFactory {
352
@Override
353
public ParserFactory create() {
354
return new CustomParserFactory();
355
}
356
}
357
358
// Create fork parser with custom parser factory
359
Path tikaBinPath = Paths.get("/path/to/tika/jars");
360
CustomParserFactoryFactory factory = new CustomParserFactoryFactory();
361
ForkParser customForkParser = new ForkParser(tikaBinPath, factory);
362
363
// Configure for high-security parsing environment
364
customForkParser.setJava(Arrays.asList(
365
"java",
366
"-Xmx256m", // Very limited memory
367
"-Djava.awt.headless=true",
368
"-Djava.security.manager", // Enable security manager
369
"-Djava.security.policy=tika.policy", // Custom security policy
370
"-XX:+DisableAttachMechanism" // Disable JVM attach
371
));
372
373
customForkParser.setPoolSize(1); // Single process for security
374
customForkParser.setServerParseTimeoutMillis(60000); // Short timeout
375
376
// Use for parsing untrusted documents
377
try (InputStream stream = new FileInputStream("untrusted_document.doc")) {
378
BodyContentHandler handler = new BodyContentHandler(1000000); // 1MB limit
379
Metadata metadata = new Metadata();
380
381
customForkParser.parse(stream, handler, metadata, new ParseContext());
382
383
// Process extracted content safely
384
String content = handler.toString();
385
if (content.length() > 0) {
386
System.out.println("Successfully parsed untrusted document");
387
}
388
} finally {
389
customForkParser.close();
390
}
391
```
392
393
**Batch Processing with Fork Parser:**
394
395
```java
396
import org.apache.tika.fork.ForkParser;
397
import java.io.File;
398
import java.util.List;
399
import java.util.concurrent.ExecutorService;
400
import java.util.concurrent.Executors;
401
import java.util.concurrent.Future;
402
403
// Configure fork parser for batch processing
404
ForkParser batchForkParser = new ForkParser();
405
batchForkParser.setPoolSize(5); // Pool of 5 processes
406
batchForkParser.setServerParseTimeoutMillis(300000); // 5-minute timeout
407
batchForkParser.setJava(Arrays.asList(
408
"java",
409
"-Xmx1g", // 1GB per process
410
"-XX:+UseParallelGC", // Parallel GC for throughput
411
"-Djava.awt.headless=true"
412
));
413
414
// Process multiple documents concurrently
415
ExecutorService executor = Executors.newFixedThreadPool(10);
416
List<File> documentsToProcess = getDocumentList();
417
List<Future<String>> futures = new ArrayList<>();
418
419
for (File document : documentsToProcess) {
420
Future<String> future = executor.submit(() -> {
421
try (InputStream stream = new FileInputStream(document)) {
422
BodyContentHandler handler = new BodyContentHandler();
423
Metadata metadata = new Metadata();
424
425
// Each parsing operation is delegated to a forked process from the pool, isolated from the main JVM
426
batchForkParser.parse(stream, handler, metadata, new ParseContext());
427
428
return "Processed: " + document.getName() +
429
" (Title: " + metadata.get(TikaCoreProperties.TITLE) + ")";
430
} catch (Exception e) {
431
return "Failed: " + document.getName() + " - " + e.getMessage();
432
}
433
});
434
futures.add(future);
435
}
436
437
// Collect results
438
for (Future<String> future : futures) {
439
try {
440
String result = future.get();
441
System.out.println(result);
442
} catch (Exception e) {
443
System.err.println("Processing error: " + e.getMessage());
444
}
445
}
446
447
executor.shutdown();
448
batchForkParser.close();
449
```
450
451
**Error Handling and Recovery:**
452
453
```java
454
import org.apache.tika.fork.ForkParser;
455
import org.apache.tika.exception.TikaException;
456
457
// Create resilient fork parser configuration
458
ForkParser resilientParser = new ForkParser();
459
resilientParser.setPoolSize(2);
460
resilientParser.setServerParseTimeoutMillis(60000); // 1-minute timeout
461
resilientParser.setServerWaitTimeoutMillis(10000); // 10-second wait
462
463
try (InputStream stream = new FileInputStream("problematic_document.pdf")) {
464
BodyContentHandler handler = new BodyContentHandler();
465
Metadata metadata = new Metadata();
466
467
try {
468
resilientParser.parse(stream, handler, metadata, new ParseContext());
469
System.out.println("Successfully parsed document");
470
} catch (TikaException e) {
471
if (e.getMessage().contains("timeout")) {
472
System.err.println("Document parsing timed out - document may be corrupted");
473
// Fork parser automatically kills hung processes and creates new ones
474
} else if (e.getMessage().contains("OutOfMemoryError")) {
475
System.err.println("Document too large for current memory settings");
476
// Process was isolated, main JVM unaffected
477
} else {
478
System.err.println("Parsing failed: " + e.getMessage());
479
// Other parsing errors handled gracefully
480
}
481
}
482
} finally {
483
resilientParser.close();
484
}
485
```
486
487
The process forking infrastructure isolates document parsing in separate JVM processes so that potentially problematic documents can be processed safely. Parse and wait timeouts bound how long an operation may run, per-process JVM arguments bound how much memory it may use, and a failure in a forked process leaves the main JVM unaffected, which keeps the overall system stable.