or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md content-processing.md detection.md embedded-extraction.md embedding.md exceptions.md index.md io-utilities.md language.md metadata.md mime-types.md parsing.md pipes.md process-forking.md rendering.md

pipes.md docs/

0

# Batch Processing (Pipes)

1

2

Batch processing framework for high-throughput document processing with configurable fetchers, emitters, and processing pipelines supporting parallel execution and error handling.

3

4

## Capabilities

5

6

### Core Pipeline Components

7

8

#### FetchEmitTuple

9

10

Core data structure representing a document processing operation with fetch source, emit target, and processing metadata.

11

12

```java { .api }

13

/**

14

* Tuple representing a document processing operation from fetch to emit

15

*/

16

public class FetchEmitTuple {

17

/**

18

* Creates FetchEmitTuple with fetch and emit identifiers

19

* @param fetcherName Name of fetcher to retrieve document

20

* @param fetchKey Key/path for document retrieval

21

* @param emitterName Name of emitter for output

22

* @param emitKey Key/path for document output

23

*/

24

public FetchEmitTuple(String fetcherName, String fetchKey, String emitterName, String emitKey);

25

26

/**

27

* Creates FetchEmitTuple with metadata

28

* @param fetcherName Fetcher identifier

29

* @param fetchKey Document fetch key

30

* @param emitterName Emitter identifier

31

* @param emitKey Document emit key

32

* @param metadata Processing metadata

33

*/

34

public FetchEmitTuple(String fetcherName, String fetchKey, String emitterName, String emitKey, Metadata metadata);

35

36

/**

37

* Gets the fetcher name for document retrieval

38

* @return String identifying which fetcher to use

39

*/

40

public String getFetcherName();

41

42

/**

43

* Gets the fetch key for document location

44

* @return String key/path identifying document to fetch

45

*/

46

public String getFetchKey();

47

48

/**

49

* Gets the emitter name for output

50

* @return String identifying which emitter to use for output

51

*/

52

public String getEmitterName();

53

54

/**

55

* Gets the emit key for output location

56

* @return String key/path for output destination

57

*/

58

public String getEmitKey();

59

60

/**

61

* Gets processing metadata

62

* @return Metadata object with processing parameters and hints

63

*/

64

public Metadata getMetadata();

65

66

/**

67

* Sets processing metadata

68

* @param metadata Metadata to associate with processing

69

*/

70

public void setMetadata(Metadata metadata);

71

72

/**

73

* Gets processing parameters as properties

74

* @return Properties containing processing configuration

75

*/

76

public Properties getProperties();

77

78

/**

79

* Sets processing parameters

80

* @param properties Processing configuration properties

81

*/

82

public void setProperties(Properties properties);

83

84

/**

85

* Gets unique identifier for this tuple

86

* @return String uniquely identifying this processing operation

87

*/

88

public String getId();

89

}

90

```

91

92

#### HandlerConfig

93

94

Configuration for content handlers used during document processing pipelines.

95

96

```java { .api }

97

/**

98

* Configuration for content handlers in processing pipeline

99

*/

100

public class HandlerConfig {

101

/**

102

* Creates HandlerConfig with handler class name

103

* @param handlerClass Fully qualified class name of handler

104

*/

105

public HandlerConfig(String handlerClass);

106

107

/**

108

* Creates HandlerConfig with class and parameters

109

* @param handlerClass Handler class name

110

* @param params Configuration parameters for handler

111

*/

112

public HandlerConfig(String handlerClass, Map<String, String> params);

113

114

/**

115

* Gets handler class name

116

* @return Fully qualified class name of content handler

117

*/

118

public String getHandlerClass();

119

120

/**

121

* Gets configuration parameters

122

* @return Map of parameter names to values

123

*/

124

public Map<String, String> getParams();

125

126

/**

127

* Sets configuration parameter

128

* @param name Parameter name

129

* @param value Parameter value

130

*/

131

public void setParam(String name, String value);

132

133

/**

134

* Gets parameter value

135

* @param name Parameter name

136

* @return Parameter value or null if not set

137

*/

138

public String getParam(String name);

139

140

/**

141

* Creates handler instance from configuration

142

* @return ContentHandler instance configured with parameters

143

* @throws TikaException if handler cannot be created

144

*/

145

public ContentHandler createHandler() throws TikaException;

146

}

147

```

148

149

### Client-Server Architecture

150

151

#### PipesClient

152

153

Client interface for submitting document processing requests to Tika pipes server.

154

155

```java { .api }

156

/**

157

* Client for submitting document processing requests to Tika pipes server

158

*/

159

public class PipesClient {

160

/**

161

* Creates PipesClient with server URL

162

* @param serverUrl URL of Tika pipes server

163

*/

164

public PipesClient(String serverUrl);

165

166

/**

167

* Creates PipesClient with configuration

168

* @param config Client configuration properties

169

*/

170

public PipesClient(PipesConfig config);

171

172

/**

173

* Submits single document for processing

174

* @param tuple FetchEmitTuple defining processing operation

175

* @return PipesResult containing processing outcome

176

* @throws IOException if communication with server fails

177

* @throws TikaException if processing fails

178

*/

179

public PipesResult process(FetchEmitTuple tuple) throws IOException, TikaException;

180

181

/**

182

* Submits batch of documents for processing

183

* @param tuples List of FetchEmitTuple objects to process

184

* @return List of PipesResult objects with processing outcomes

185

* @throws IOException if communication fails

186

* @throws TikaException if batch processing fails

187

*/

188

public List<PipesResult> processBatch(List<FetchEmitTuple> tuples) throws IOException, TikaException;

189

190

/**

191

* Submits documents for asynchronous processing

192

* @param tuples Documents to process asynchronously

193

* @param callback Callback for processing results

194

* @throws IOException if submission fails

195

*/

196

public void processAsync(List<FetchEmitTuple> tuples, PipesCallback callback) throws IOException;

197

198

/**

199

* Gets server status and capabilities

200

* @return Map containing server status information

201

* @throws IOException if server communication fails

202

*/

203

public Map<String, Object> getServerStatus() throws IOException;

204

205

/**

206

* Checks if server is available and responsive

207

* @return true if server is ready to accept requests

208

*/

209

public boolean isServerAvailable();

210

211

/**

212

* Sets request timeout

213

* @param timeoutMs Timeout in milliseconds

214

*/

215

public void setTimeout(int timeoutMs);

216

217

/**

218

* Gets current request timeout

219

* @return Timeout in milliseconds

220

*/

221

public int getTimeout();

222

223

/**

224

* Closes client and releases resources

225

* @throws IOException if cleanup fails

226

*/

227

public void close() throws IOException;

228

}

229

```

230

231

#### PipesServer

232

233

Server implementation for handling document processing requests with configurable concurrency and resource management.

234

235

```java { .api }

236

/**

237

* Server for handling batch document processing requests

238

*/

239

public class PipesServer {

240

/**

241

* Creates PipesServer with configuration

242

* @param config Server configuration

243

*/

244

public PipesServer(PipesConfig config);

245

246

/**

247

* Starts server and begins accepting requests

248

* @throws IOException if server cannot be started

249

* @throws TikaException if configuration is invalid

250

*/

251

public void start() throws IOException, TikaException;

252

253

/**

254

* Stops server gracefully

255

* @throws IOException if shutdown fails

256

*/

257

public void stop() throws IOException;

258

259

/**

260

* Checks if server is running

261

* @return true if server is started and accepting requests

262

*/

263

public boolean isRunning();

264

265

/**

266

* Gets server port number

267

* @return Port number server is listening on

268

*/

269

public int getPort();

270

271

/**

272

* Gets server configuration

273

* @return PipesConfig used by this server

274

*/

275

public PipesConfig getConfig();

276

277

/**

278

* Gets current server statistics

279

* @return Map containing processing statistics

280

*/

281

public Map<String, Object> getStatistics();

282

283

/**

284

* Processes single document request

285

* @param tuple Document processing request

286

* @return PipesResult with processing outcome

287

* @throws TikaException if processing fails

288

* @throws IOException if I/O error occurs

289

*/

290

public PipesResult process(FetchEmitTuple tuple) throws TikaException, IOException;

291

292

/**

293

* Sets maximum concurrent processing threads

294

* @param maxThreads Maximum number of processing threads

295

*/

296

public void setMaxConcurrentRequests(int maxThreads);

297

298

/**

299

* Gets maximum concurrent requests

300

* @return Maximum number of concurrent processing requests

301

*/

302

public int getMaxConcurrentRequests();

303

}

304

```

305

306

#### PipesResult

307

308

Result object containing outcome of document processing operation with status, metadata, and error information.

309

310

```java { .api }

311

/**

312

* Result of document processing operation in pipes framework

313

*/

314

public class PipesResult {

315

/**

316

* Processing status enumeration

317

*/

318

public enum STATUS {

319

SUCCESS, // Processing completed successfully

320

PARSE_EXCEPTION, // Parse error occurred

321

TIMEOUT, // Processing timed out

322

OOM, // Out of memory error

323

NO_SUCH_FILE, // Input file not found

324

EMIT_EXCEPTION, // Error during emit phase

325

FETCHER_INITIALIZATION_EXCEPTION, // Fetcher setup failed

326

EMITTER_INITIALIZATION_EXCEPTION, // Emitter setup failed

327

INTERRUPTED_EXCEPTION // Processing was interrupted

328

}

329

330

/**

331

* Creates PipesResult with status

332

* @param status Processing status

333

*/

334

public PipesResult(STATUS status);

335

336

/**

337

* Creates PipesResult with status and processing time

338

* @param status Processing status

339

* @param processTimeMs Processing time in milliseconds

340

*/

341

public PipesResult(STATUS status, long processTimeMs);

342

343

/**

344

* Gets processing status

345

* @return STATUS indicating processing outcome

346

*/

347

public STATUS getStatus();

348

349

/**

350

* Gets processing time

351

* @return Processing duration in milliseconds

352

*/

353

public long getProcessTimeMs();

354

355

/**

356

* Gets exception that occurred during processing

357

* @return Exception object or null if no error

358

*/

359

public Exception getException();

360

361

/**

362

* Sets exception information

363

* @param exception Exception that occurred

364

*/

365

public void setException(Exception exception);

366

367

/**

368

* Gets extracted metadata

369

* @return Metadata extracted during processing

370

*/

371

public Metadata getMetadata();

372

373

/**

374

* Sets extracted metadata

375

* @param metadata Metadata from processing

376

*/

377

public void setMetadata(Metadata metadata);

378

379

/**

380

* Gets extracted content

381

* @return Text content extracted from document

382

*/

383

public String getContent();

384

385

/**

386

* Sets extracted content

387

* @param content Text content from processing

388

*/

389

public void setContent(String content);

390

391

/**

392

* Checks if processing was successful

393

* @return true if status is SUCCESS

394

*/

395

public boolean isSuccess();

396

397

/**

398

* Gets error message if processing failed

399

* @return Error message or null if successful

400

*/

401

public String getErrorMessage();

402

403

/**

404

* Gets processing statistics

405

* @return Map containing detailed processing metrics

406

*/

407

public Map<String, Object> getStatistics();

408

}

409

```

410

411

### Configuration Classes

412

413

#### PipesConfig

414

415

Main configuration class for pipes framework with settings for fetchers, emitters, processing, and server options.

416

417

```java { .api }

418

/**

419

* Configuration for Tika pipes batch processing framework

420

*/

421

public class PipesConfig extends PipesConfigBase {

422

/**

423

* Creates default PipesConfig

424

*/

425

public PipesConfig();

426

427

/**

428

* Creates PipesConfig from properties file

429

* @param configFile Properties file containing configuration

430

* @throws IOException if config file cannot be read

431

*/

432

public PipesConfig(File configFile) throws IOException;

433

434

/**

435

* Creates PipesConfig from input stream

436

* @param configStream Stream containing configuration properties

437

* @throws IOException if stream cannot be read

438

*/

439

public PipesConfig(InputStream configStream) throws IOException;

440

441

/**

442

* Gets server port number

443

* @return Port number for pipes server

444

*/

445

public int getServerPort();

446

447

/**

448

* Sets server port number

449

* @param port Port number for server to listen on

450

*/

451

public void setServerPort(int port);

452

453

/**

454

* Gets maximum number of concurrent processing requests

455

* @return Maximum number of concurrent requests

456

*/

457

public int getMaxConcurrentRequests();

458

459

/**

460

* Sets maximum concurrent processing threads

461

* @param maxThreads Maximum concurrent requests

462

*/

463

public void setMaxConcurrentRequests(int maxThreads);

464

465

/**

466

* Gets processing timeout in milliseconds

467

* @return Timeout for individual document processing

468

*/

469

public long getTimeoutMs();

470

471

/**

472

* Sets processing timeout

473

* @param timeoutMs Timeout in milliseconds

474

*/

475

public void setTimeoutMs(long timeoutMs);

476

477

/**

478

* Gets fetcher manager configuration

479

* @return FetcherManager for document retrieval

480

*/

481

public FetcherManager getFetcherManager();

482

483

/**

484

* Sets fetcher manager

485

* @param fetcherManager Manager for document fetchers

486

*/

487

public void setFetcherManager(FetcherManager fetcherManager);

488

489

/**

490

* Gets emitter manager configuration

491

* @return EmitterManager for document output

492

*/

493

public EmitterManager getEmitterManager();

494

495

/**

496

* Sets emitter manager

497

* @param emitterManager Manager for document emitters

498

*/

499

public void setEmitterManager(EmitterManager emitterManager);

500

501

/**

502

* Gets TikaConfig for parsing configuration

503

* @return TikaConfig with parser and detector settings

504

*/

505

public TikaConfig getTikaConfig();

506

507

/**

508

* Sets TikaConfig for parsing

509

* @param tikaConfig Tika configuration for parsers

510

*/

511

public void setTikaConfig(TikaConfig tikaConfig);

512

}

513

```

514

515

#### PipesConfigBase

516

517

Base configuration class with common settings and initialization patterns.

518

519

```java { .api }

520

/**

521

* Base configuration class for pipes components

522

*/

523

public abstract class PipesConfigBase {

524

/**

525

* Initializes configuration from properties

526

* @param properties Configuration properties

527

* @throws TikaException if initialization fails

528

*/

529

public void initialize(Properties properties) throws TikaException;

530

531

/**

532

* Gets configuration property value

533

* @param key Property key

534

* @return Property value or null if not set

535

*/

536

public String getProperty(String key);

537

538

/**

539

* Gets configuration property with default value

540

* @param key Property key

541

* @param defaultValue Default value if property not set

542

* @return Property value or default if not set

543

*/

544

public String getProperty(String key, String defaultValue);

545

546

/**

547

* Sets configuration property

548

* @param key Property key

549

* @param value Property value

550

*/

551

public void setProperty(String key, String value);

552

553

/**

554

* Gets all configuration properties

555

* @return Properties object with all settings

556

*/

557

public Properties getProperties();

558

559

/**

560

* Validates configuration settings

561

* @throws TikaException if configuration is invalid

562

*/

563

public void validate() throws TikaException;

564

}

565

```

566

567

## Fetcher Framework

568

569

### Fetcher Interface

570

571

Interface for document retrieval implementations supporting various data sources and protocols.

572

573

```java { .api }

574

/**

575

* Interface for fetching documents from various sources

576

*/

577

public interface Fetcher extends Initializable {

578

/**

579

* Fetches document from source

580

* @param fetchKey Key identifying document to fetch

581

* @param metadata Metadata for fetch operation

582

* @return InputStream containing document data

583

* @throws IOException if fetch operation fails

584

* @throws TikaException if fetcher encounters error

585

*/

586

InputStream fetch(String fetchKey, Metadata metadata) throws IOException, TikaException;

587

588

/**

589

* Gets name/identifier for this fetcher

590

* @return String identifying this fetcher instance

591

*/

592

String getName();

593

594

/**

595

* Sets name/identifier for this fetcher

596

* @param name Identifier for this fetcher

597

*/

598

void setName(String name);

599

600

/**

601

* Checks if fetcher supports specific fetch key pattern

602

* @param fetchKey Key to check support for

603

* @return true if this fetcher can handle the key

604

*/

605

boolean supports(String fetchKey);

606

}

607

```

608

609

### AbstractFetcher

610

611

Base implementation providing common fetcher functionality and configuration patterns.

612

613

```java { .api }

614

/**

615

* Abstract base class for fetcher implementations

616

*/

617

public abstract class AbstractFetcher implements Fetcher {

618

/**

619

* Creates AbstractFetcher with default settings

620

*/

621

public AbstractFetcher();

622

623

/**

624

* Gets fetcher name

625

* @return String identifier for this fetcher

626

*/

627

@Override

628

public String getName();

629

630

/**

631

* Sets fetcher name

632

* @param name Identifier for this fetcher

633

*/

634

@Override

635

public void setName(String name);

636

637

/**

638

* Initializes fetcher with configuration parameters

639

* @param params Configuration parameters

640

* @throws TikaConfigException if initialization fails

641

*/

642

@Override

643

public void initialize(Map<String, Param> params) throws TikaConfigException;

644

645

/**

646

* Checks initialization problems

647

* @param handler Problem handler for reporting issues

648

*/

649

@Override

650

public void checkInitialization(InitializableProblemHandler handler);

651

652

/**

653

* Default implementation returns true for all keys

654

* @param fetchKey Key to check support for

655

* @return true (subclasses should override for specific logic)

656

*/

657

@Override

658

public boolean supports(String fetchKey);

659

660

/**

661

* Template method for actual fetch implementation

662

* @param fetchKey Document key to fetch

663

* @param metadata Fetch metadata

664

* @return InputStream with document data

665

* @throws IOException if fetch fails

666

* @throws TikaException if processing error occurs

667

*/

668

@Override

669

public abstract InputStream fetch(String fetchKey, Metadata metadata)

670

throws IOException, TikaException;

671

}

672

```

673

674

### FetcherManager

675

676

Manager for multiple fetcher instances with routing and lifecycle management.

677

678

```java { .api }

679

/**

680

* Manager for multiple fetcher instances with routing capabilities

681

*/

682

public class FetcherManager {

683

/**

684

* Creates FetcherManager with default configuration

685

*/

686

public FetcherManager();

687

688

/**

689

* Creates FetcherManager from configuration

690

* @param config Configuration properties for fetchers

691

* @throws TikaException if configuration is invalid

692

*/

693

public FetcherManager(Properties config) throws TikaException;

694

695

/**

696

* Registers fetcher instance

697

* @param name Fetcher identifier

698

* @param fetcher Fetcher implementation to register

699

*/

700

public void addFetcher(String name, Fetcher fetcher);

701

702

/**

703

* Gets fetcher by name

704

* @param name Fetcher identifier

705

* @return Fetcher instance or null if not found

706

*/

707

public Fetcher getFetcher(String name);

708

709

/**

710

* Gets all registered fetcher names

711

* @return Set of fetcher identifiers

712

*/

713

public Set<String> getFetcherNames();

714

715

/**

716

* Removes fetcher by name

717

* @param name Fetcher identifier to remove

718

* @return Removed fetcher or null if not found

719

*/

720

public Fetcher removeFetcher(String name);

721

722

/**

723

* Fetches document using appropriate fetcher

724

* @param fetcherName Name of fetcher to use

725

* @param fetchKey Document key to fetch

726

* @param metadata Fetch metadata

727

* @return InputStream containing document data

728

* @throws IOException if fetch fails

729

* @throws TikaException if no suitable fetcher found

730

*/

731

public InputStream fetch(String fetcherName, String fetchKey, Metadata metadata)

732

throws IOException, TikaException;

733

734

/**

735

* Initializes all registered fetchers

736

* @throws TikaException if any fetcher initialization fails

737

*/

738

public void initialize() throws TikaException;

739

740

/**

741

* Closes all fetchers and releases resources

742

* @throws IOException if cleanup fails

743

*/

744

public void close() throws IOException;

745

}

746

```

747

748

## Emitter Framework

749

750

### Emitter Interface

751

752

Interface for document output implementations supporting various destinations and formats.

753

754

```java { .api }

755

/**

756

* Interface for emitting processed documents to various destinations

757

*/

758

public interface Emitter extends Initializable {

759

/**

760

* Emits processed document to destination

761

* @param emitKey Key identifying output destination

762

* @param metadata Document metadata

763

* @param outputStream Stream containing processed document data

764

* @throws IOException if emit operation fails

765

* @throws TikaException if emitter encounters error

766

*/

767

void emit(String emitKey, Metadata metadata, OutputStream outputStream)

768

throws IOException, TikaException;

769

770

/**

771

* Emits document content as string

772

* @param emitKey Output destination key

773

* @param metadata Document metadata

774

* @param content Processed document content

775

* @throws IOException if emit fails

776

* @throws TikaException if processing error occurs

777

*/

778

void emit(String emitKey, Metadata metadata, String content)

779

throws IOException, TikaException;

780

781

/**

782

* Gets name/identifier for this emitter

783

* @return String identifying this emitter instance

784

*/

785

String getName();

786

787

/**

788

* Sets name/identifier for this emitter

789

* @param name Identifier for this emitter

790

*/

791

void setName(String name);

792

793

/**

794

* Checks if emitter supports specific emit key pattern

795

* @param emitKey Key to check support for

796

* @return true if this emitter can handle the key

797

*/

798

boolean supports(String emitKey);

799

}

800

```

801

802

### AbstractEmitter

803

804

Base implementation providing common emitter functionality and configuration support.

805

806

```java { .api }

807

/**

808

* Abstract base class for emitter implementations

809

*/

810

public abstract class AbstractEmitter implements Emitter {

811

/**

812

* Creates AbstractEmitter with default settings

813

*/

814

public AbstractEmitter();

815

816

/**

817

* Gets emitter name

818

* @return String identifier for this emitter

819

*/

820

@Override

821

public String getName();

822

823

/**

824

* Sets emitter name

825

* @param name Identifier for this emitter

826

*/

827

@Override

828

public void setName(String name);

829

830

/**

831

* Initializes emitter with configuration parameters

832

* @param params Configuration parameters

833

* @throws TikaConfigException if initialization fails

834

*/

835

@Override

836

public void initialize(Map<String, Param> params) throws TikaConfigException;

837

838

/**

839

* Checks initialization problems

840

* @param handler Problem handler for reporting issues

841

*/

842

@Override

843

public void checkInitialization(InitializableProblemHandler handler);

844

845

/**

846

* Default implementation returns true for all keys

847

* @param emitKey Key to check support for

848

* @return true (subclasses should override for specific logic)

849

*/

850

@Override

851

public boolean supports(String emitKey);

852

853

/**

854

* Default string emit implementation using OutputStream version

855

* @param emitKey Output destination key

856

* @param metadata Document metadata

857

* @param content Document content as string

858

* @throws IOException if emit fails

859

* @throws TikaException if processing error occurs

860

*/

861

@Override

862

public void emit(String emitKey, Metadata metadata, String content)

863

throws IOException, TikaException;

864

865

/**

866

* Template method for actual emit implementation

867

* @param emitKey Output destination key

868

* @param metadata Document metadata

869

* @param outputStream Stream containing document data

870

* @throws IOException if emit fails

871

* @throws TikaException if processing error occurs

872

*/

873

@Override

874

public abstract void emit(String emitKey, Metadata metadata, OutputStream outputStream)

875

throws IOException, TikaException;

876

}

877

```

878

879

### EmitterManager

880

881

Manager for multiple emitter instances with routing and lifecycle management.

882

883

```java { .api }

884

/**

885

* Manager for multiple emitter instances with routing capabilities

886

*/

887

public class EmitterManager {

888

/**

889

* Creates EmitterManager with default configuration

890

*/

891

public EmitterManager();

892

893

/**

894

* Creates EmitterManager from configuration

895

* @param config Configuration properties for emitters

896

* @throws TikaException if configuration is invalid

897

*/

898

public EmitterManager(Properties config) throws TikaException;

899

900

/**

901

* Registers emitter instance

902

* @param name Emitter identifier

903

* @param emitter Emitter implementation to register

904

*/

905

public void addEmitter(String name, Emitter emitter);

906

907

/**

908

* Gets emitter by name

909

* @param name Emitter identifier

910

* @return Emitter instance or null if not found

911

*/

912

public Emitter getEmitter(String name);

913

914

/**

915

* Gets all registered emitter names

916

* @return Set of emitter identifiers

917

*/

918

public Set<String> getEmitterNames();

919

920

/**

921

* Removes emitter by name

922

* @param name Emitter identifier to remove

923

* @return Removed emitter or null if not found

924

*/

925

public Emitter removeEmitter(String name);

926

927

/**

928

* Emits document using appropriate emitter

929

* @param emitterName Name of emitter to use

930

* @param emitKey Output destination key

931

* @param metadata Document metadata

932

* @param outputStream Document data stream

933

* @throws IOException if emit fails

934

* @throws TikaException if no suitable emitter found

935

*/

936

public void emit(String emitterName, String emitKey, Metadata metadata, OutputStream outputStream)

937

throws IOException, TikaException;

938

939

/**

940

* Initializes all registered emitters

941

* @throws TikaException if any emitter initialization fails

942

*/

943

public void initialize() throws TikaException;

944

945

/**

946

* Closes all emitters and releases resources

947

* @throws IOException if cleanup fails

948

*/

949

public void close() throws IOException;

950

}

951

```

952

953

## Usage Examples

954

955

### Basic Pipes Processing

956

957

```java { .api }

958

// Configure and start pipes server

959

PipesConfig config = new PipesConfig();

960

config.setServerPort(9998);

961

config.setMaxConcurrentRequests(10);

962

config.setTimeoutMs(30000);

963

964

PipesServer server = new PipesServer(config);

965

server.start();

966

967

// Create client and submit processing request

968

PipesClient client = new PipesClient("http://localhost:9998");

969

970

FetchEmitTuple tuple = new FetchEmitTuple(

971

"file-fetcher", "/path/to/document.pdf",

972

"text-emitter", "/output/document.txt"

973

);

974

975

try {

976

PipesResult result = client.process(tuple);

977

978

if (result.isSuccess()) {

979

System.out.println("Processing successful");

980

System.out.println("Time: " + result.getProcessTimeMs() + "ms");

981

System.out.println("Content: " + result.getContent());

982

} else {

983

System.err.println("Processing failed: " + result.getErrorMessage());

984

}

985

986

} catch (IOException | TikaException e) {

987

System.err.println("Request failed: " + e.getMessage());

988

989

} finally {

990

client.close();

991

server.stop();

992

}

993

```

994

995

### Batch Document Processing

996

997

```java { .api }

998

// Prepare batch of documents for processing

999

List<FetchEmitTuple> batch = new ArrayList<>();

1000

1001

for (int i = 1; i <= 100; i++) {

1002

FetchEmitTuple tuple = new FetchEmitTuple(

1003

"file-fetcher", "/docs/doc" + i + ".pdf",

1004

"search-emitter", "doc-" + i

1005

);

1006

batch.add(tuple);

1007

}

1008

1009

// Submit batch for processing

1010

PipesClient client = new PipesClient("http://localhost:9998");

1011

1012

try {

1013

List<PipesResult> results = client.processBatch(batch);

1014

1015

int successful = 0;

1016

int failed = 0;

1017

long totalTime = 0;

1018

1019

for (PipesResult result : results) {

1020

totalTime += result.getProcessTimeMs();

1021

1022

if (result.isSuccess()) {

1023

successful++;

1024

} else {

1025

failed++;

1026

System.err.println("Failed: " + result.getErrorMessage());

1027

}

1028

}

1029

1030

System.out.println("Batch completed:");

1031

System.out.println(" Successful: " + successful);

1032

System.out.println(" Failed: " + failed);

1033

System.out.println(" Total time: " + totalTime + "ms");

1034

System.out.println(" Average: " + (totalTime / results.size()) + "ms per doc");

1035

1036

} catch (IOException | TikaException e) {

1037

System.err.println("Batch processing failed: " + e.getMessage());

1038

}

1039

```

1040

1041

### Custom Fetcher Implementation

1042

1043

```java { .api }

1044

// Custom fetcher for database documents

1045

public class DatabaseFetcher extends AbstractFetcher {

1046

1047

private DataSource dataSource;

1048

private String queryTemplate;

1049

1050

@Override

1051

public void initialize(Map<String, Param> params) throws TikaConfigException {

1052

super.initialize(params);

1053

1054

Param dsParam = params.get("dataSource");

1055

if (dsParam != null) {

1056

this.dataSource = (DataSource) dsParam.getValue();

1057

}

1058

1059

Param queryParam = params.get("query");

1060

if (queryParam != null) {

1061

this.queryTemplate = queryParam.getValue().toString();

1062

}

1063

}

1064

1065

@Override

1066

public boolean supports(String fetchKey) {

1067

// Support numeric document IDs

1068

return fetchKey.matches("\\d+");

1069

}

1070

1071

@Override

1072

public InputStream fetch(String fetchKey, Metadata metadata)

1073

throws IOException, TikaException {

1074

1075

try (Connection conn = dataSource.getConnection()) {

1076

String query = queryTemplate.replace("{id}", fetchKey);

1077

1078

try (PreparedStatement stmt = conn.prepareStatement(query);

1079

ResultSet rs = stmt.executeQuery()) {

1080

1081

if (rs.next()) {

1082

byte[] data = rs.getBytes("document_data");

1083

String filename = rs.getString("filename");

1084

String mimeType = rs.getString("mime_type");

1085

1086

// Set metadata from database

1087

metadata.set(Metadata.RESOURCE_NAME_KEY, filename);

1088

metadata.set(Metadata.CONTENT_TYPE, mimeType);

1089

1090

return new ByteArrayInputStream(data);

1091

} else {

1092

throw new TikaException("Document not found: " + fetchKey);

1093

}

1094

}

1095

1096

} catch (SQLException e) {

1097

throw new TikaException("Database error fetching document", e);

1098

}

1099

}

1100

}

1101

```

1102

1103

### Custom Emitter Implementation

1104

1105

```java { .api }

1106

// Custom emitter for search index

1107

public class SearchIndexEmitter extends AbstractEmitter {

1108

1109

private SearchClient searchClient;

1110

private String indexName;

1111

1112

@Override

1113

public void initialize(Map<String, Param> params) throws TikaConfigException {

1114

super.initialize(params);

1115

1116

Param clientParam = params.get("searchClient");

1117

if (clientParam != null) {

1118

this.searchClient = (SearchClient) clientParam.getValue();

1119

}

1120

1121

Param indexParam = params.get("indexName");

1122

if (indexParam != null) {

1123

this.indexName = indexParam.getValue().toString();

1124

}

1125

}

1126

1127

@Override

1128

public void emit(String emitKey, Metadata metadata, String content)

1129

throws IOException, TikaException {

1130

1131

try {

1132

// Create search document

1133

SearchDocument doc = new SearchDocument();

1134

doc.setId(emitKey);

1135

doc.setContent(content);

1136

1137

// Add metadata fields

1138

for (String name : metadata.names()) {

1139

String[] values = metadata.getValues(name);

1140

if (values.length == 1) {

1141

doc.addField(name, values[0]);

1142

} else {

1143

doc.addField(name, Arrays.asList(values));

1144

}

1145

}

1146

1147

// Index document

1148

searchClient.index(indexName, doc);

1149

1150

} catch (Exception e) {

1151

throw new TikaException("Failed to index document: " + emitKey, e);

1152

}

1153

}

1154

1155

@Override

1156

public void emit(String emitKey, Metadata metadata, OutputStream outputStream)

1157

throws IOException, TikaException {

1158

1159

// Convert stream to string and delegate

1160

String content = IOUtils.toString(outputStream, "UTF-8");

1161

emit(emitKey, metadata, content);

1162

}

1163

}

1164

```

1165

1166

### Asynchronous Processing

1167

1168

```java { .api }

1169

// Asynchronous batch processing with callback

1170

public class AsyncProcessor {

1171

1172

public void processDocumentsAsync(List<FetchEmitTuple> documents) throws IOException {

1173

PipesClient client = new PipesClient("http://localhost:9998");

1174

1175

// Create callback for handling results

1176

PipesCallback callback = new PipesCallback() {

1177

@Override

1178

public void onResult(FetchEmitTuple tuple, PipesResult result) {

1179

if (result.isSuccess()) {

1180

System.out.println("Completed: " + tuple.getFetchKey() +

1181

" (" + result.getProcessTimeMs() + "ms)");

1182

} else {

1183

System.err.println("Failed: " + tuple.getFetchKey() +

1184

" - " + result.getErrorMessage());

1185

}

1186

}

1187

1188

@Override

1189

public void onBatchComplete(List<PipesResult> results) {

1190

long totalTime = results.stream()

1191

.mapToLong(PipesResult::getProcessTimeMs)

1192

.sum();

1193

1194

System.out.println("Batch completed in " + totalTime + "ms");

1195

}

1196

1197

@Override

1198

public void onError(Exception error) {

1199

System.err.println("Batch error: " + error.getMessage());

1200

}

1201

};

1202

1203

// Submit for asynchronous processing

1204

client.processAsync(documents, callback);

1205

1206

// Continue with other work while processing happens...

1207

}

1208

}

1209

1210

interface PipesCallback {

1211

void onResult(FetchEmitTuple tuple, PipesResult result);

1212

void onBatchComplete(List<PipesResult> results);

1213

void onError(Exception error);

1214

}

1215

```

1216

1217

### Configuration and Management

1218

1219

```java { .api }

1220

// Complete pipes configuration setup

1221

public class PipesSetup {

1222

1223

public PipesConfig createConfiguration() throws IOException, TikaException {

1224

PipesConfig config = new PipesConfig();

1225

1226

// Server settings

1227

config.setServerPort(9998);

1228

config.setMaxConcurrentRequests(20);

1229

config.setTimeoutMs(60000);

1230

1231

// Setup fetcher manager

1232

FetcherManager fetcherManager = new FetcherManager();

1233

fetcherManager.addFetcher("file", new FileFetcher());

1234

fetcherManager.addFetcher("http", new HttpFetcher());

1235

fetcherManager.addFetcher("s3", new S3Fetcher());

1236

fetcherManager.addFetcher("database", new DatabaseFetcher());

1237

1238

config.setFetcherManager(fetcherManager);

1239

1240

// Setup emitter manager

1241

EmitterManager emitterManager = new EmitterManager();

1242

emitterManager.addEmitter("file", new FileEmitter());

1243

emitterManager.addEmitter("search", new SearchIndexEmitter());

1244

emitterManager.addEmitter("database", new DatabaseEmitter());

1245

1246

config.setEmitterManager(emitterManager);

1247

1248

// Configure Tika parsing

1249

TikaConfig tikaConfig = TikaConfig.getDefaultConfig();

1250

config.setTikaConfig(tikaConfig);

1251

1252

return config;

1253

}

1254

1255

public void startServer(PipesConfig config) throws IOException, TikaException {

1256

PipesServer server = new PipesServer(config);

1257

1258

// Add shutdown hook

1259

Runtime.getRuntime().addShutdownHook(new Thread(() -> {

1260

try {

1261

System.out.println("Shutting down pipes server...");

1262

server.stop();

1263

} catch (IOException e) {

1264

System.err.println("Error during shutdown: " + e.getMessage());

1265

}

1266

}));

1267

1268

server.start();

1269

System.out.println("Pipes server started on port " + server.getPort());

1270

1271

// Monitor server statistics

1272

new Thread(() -> {

1273

while (server.isRunning()) {

1274

try {

1275

Thread.sleep(30000); // Every 30 seconds

1276

1277

Map<String, Object> stats = server.getStatistics();

1278

System.out.println("Server stats: " + stats);

1279

1280

} catch (InterruptedException e) {

1281

Thread.currentThread().interrupt();

1282

break;

1283

}

1284

}

1285

}).start();

1286

}

1287

}

1288

```