High-level utilities for common Elasticsearch operations including simplified search, bulk operations, scrolling, and specialized data processing helpers that provide more convenient APIs than the raw client methods.
Execute search queries and return only document data without Elasticsearch metadata.
/**
 * Simplified search returning only documents
 *
 * Unlike the raw client search API, this helper strips the Elasticsearch
 * response envelope (took, shards, hits metadata) and resolves directly
 * with the matching documents, each augmented with its `_id`.
 *
 * @param params - Standard search parameters
 * @param options - Transport request options
 * @returns Promise with array of documents including IDs
 */
search<TDocument>(
params: SearchRequest,
options?: TransportRequestOptions
): Promise<Array<TDocument & { _id: Id }>>;
interface SearchRequest {
index?: Indices;
query?: QueryDslQueryContainer;
sort?: SortCombinations[];
_source?: boolean | Fields;
from?: number;
size?: number;
// ... other search parameters
}

Usage Examples:
// Get documents directly without response metadata
const products = await client.helpers.search<Product>({
index: "products",
query: {
match: { category: "electronics" }
},
size: 100
});
// Each item has document data plus _id
products.forEach(product => {
console.log(`Product ${product._id}: ${product.name} - $${product.price}`);
});

Iterate through large result sets using async iteration pattern.
/**
 * Scroll through search results with async iteration
 *
 * Each iteration yields one batch; the batch exposes the raw response
 * plus a `documents` array and a `clear()` method for releasing the
 * scroll context early (see ScrollSearchResponse).
 *
 * @param params - Search parameters with scroll configuration
 * @param options - Scroll-specific options
 * @returns AsyncIterable yielding scroll response batches
 */
scrollSearch<TDocument>(
params: SearchRequest,
options?: ScrollSearchOptions
): AsyncIterable<ScrollSearchResponse<TDocument>>;
/**
 * Options accepted by the scroll helpers, extending the standard
 * transport options with scroll-specific tuning.
 */
interface ScrollSearchOptions extends TransportRequestOptions {
/** Wait time between scroll requests — presumably milliseconds; confirm against client docs */
wait?: number;
/** Scroll context duration (e.g. "1m") */
scroll?: Duration;
}
interface ScrollSearchResponse<TDocument> extends TransportResult<SearchResponse<TDocument>, unknown> {
/** Clear scroll context manually */
clear: () => Promise<void>;
/** Documents from current batch */
documents: TDocument[];
}

Usage Examples:
// Iterate through all documents
for await (const response of client.helpers.scrollSearch<LogEntry>({
index: "logs-*",
query: { range: { timestamp: { gte: "now-1d" } } },
sort: [{ timestamp: "desc" }],
scroll: "1m"
})) {
console.log(`Processing ${response.documents.length} log entries`);
response.documents.forEach(log => {
processLogEntry(log);
});
// Optional early termination
if (shouldStop()) {
await response.clear();
break;
}
}

Iterate through documents without response metadata for maximum simplicity.
/**
* Scroll through documents only (no response metadata)
* @param params - Search parameters
* @param options - Scroll options
* @returns AsyncIterable yielding individual documents
*/
scrollDocuments<TDocument>(
params: SearchRequest,
options?: ScrollSearchOptions
): AsyncIterable<TDocument>;

Usage Examples:
// Process individual documents
for await (const document of client.helpers.scrollDocuments<User>({
index: "users",
query: { term: { status: "active" } }
})) {
await updateUserStatus(document);
}

Perform bulk operations with automatic batching, retry logic, and progress tracking.
/**
 * Bulk operations helper with advanced features
 *
 * Returns immediately with a BulkHelper: a thenable that resolves with
 * the final BulkStats, while also exposing `stats` for live progress
 * and `abort()` for cancellation (see BulkHelper).
 *
 * @param params - Bulk helper configuration
 * @param options - Transport request options
 * @returns BulkHelper promise with operation management
 */
bulk<TDocument>(
params: BulkHelperOptions<TDocument>,
options?: TransportRequestOptions
): BulkHelper<TDocument>;
/**
 * Configuration for the bulk helper: the data source, the per-document
 * operation mapping, and tuning knobs for batching, concurrency, and
 * retry behavior.
 */
interface BulkHelperOptions<TDocument> {
/** Data source (array, async iterable, or readable stream) */
datasource: TDocument[] | AsyncIterable<TDocument> | Readable;
/** Transform each document into bulk operation */
onDocument: (doc: TDocument) => BulkOperationContainer | BulkOperationContainer[];
/** Operations per bulk request */
operations?: number;
/** Flush interval in milliseconds */
flushInterval?: number;
/** Maximum concurrent bulk requests */
concurrency?: number;
/** Retry attempts for failed operations */
retries?: number;
/** Wait time between retries */
wait?: number;
/** Callback for documents that could not be indexed and were dropped after retries were exhausted */
onDrop?: (doc: TDocument) => void;
/** Callback for failed operations */
onError?: (error: BulkError<TDocument>) => void;
/** Enable operation statistics */
enableStats?: boolean;
/** Refresh policy */
refresh?: Refresh;
/** Default index */
index?: IndexName;
/** Default routing */
routing?: Routing;
/** Pipeline */
pipeline?: string;
}
/**
 * Thenable handle for an in-flight bulk job: awaiting it yields the
 * final BulkStats, while `stats` can be polled for live progress.
 */
interface BulkHelper<TDocument> extends Promise<BulkStats> {
/** Abort bulk operations (returns the same helper for chaining) */
abort: () => BulkHelper<TDocument>;
/** Current operation statistics */
stats: BulkStats;
}
/** Aggregate counters for a bulk helper run. */
interface BulkStats {
/** Total documents processed */
total: number;
/** Documents that ultimately failed */
failed: number;
/** Operations that were retried */
retry: number;
/** Documents indexed successfully */
successful: number;
/** Update operations that resulted in no change */
noop: number;
/** Elapsed time — presumably milliseconds; confirm against client docs */
time: number;
/** Payload size — presumably bytes sent; confirm against client docs */
bytes: number;
/** Whether the run was terminated via abort() */
aborted: boolean;
}
interface BulkError<TDocument> {
status: number;
error: any;
operation: BulkOperationContainer;
document: TDocument;
}

Usage Examples:
// Bulk index documents with progress tracking
const bulkResponse = client.helpers.bulk({
datasource: products, // array of products
operations: 1000, // batch size
flushInterval: 5000, // flush every 5 seconds
concurrency: 5, // 5 concurrent requests
onDocument: (product) => ({
index: { _index: "products", _id: product.id }
}),
onError: (error) => {
console.error("Failed to index product:", error.document.id);
}
});
// Wait for completion and get stats
const stats = await bulkResponse;
console.log(`Indexed ${stats.successful} products, ${stats.failed} failed`);
// Stream-based bulk operations
const stream = fs.createReadStream("large-dataset.jsonl")
.pipe(split()) // split by lines
.pipe(through2.obj((chunk, enc, callback) => {
callback(null, JSON.parse(chunk));
}));
const bulkOp = client.helpers.bulk({
datasource: stream,
operations: 5000,
onDocument: (doc) => [
{ index: { _index: "dataset", _id: doc.id } },
doc
]
});
// Monitor progress
setInterval(() => {
console.log(`Progress: ${bulkOp.stats.successful} completed`);
}, 1000);

Execute multiple searches with automatic batching and concurrency control.
/**
 * Multi-search helper with batching and concurrency
 *
 * Individual searches are queued through the returned helper's
 * `search()` method and transparently grouped into multi-search
 * requests according to the batching options.
 *
 * @param params - Multi-search helper options
 * @param options - Transport request options
 * @returns MsearchHelper promise with search management
 */
msearch(
params: MsearchHelperOptions,
options?: TransportRequestOptions
): MsearchHelper;
/**
 * Tuning options for the msearch helper; the batching/retry knobs
 * mirror the bulk helper and are layered on top of a standard
 * multi-search request.
 */
interface MsearchHelperOptions extends MsearchRequest {
/** Searches per multi-search request */
operations?: number;
/** Flush interval in milliseconds */
flushInterval?: number;
/** Maximum concurrent multi-search requests */
concurrency?: number;
/** Retry attempts for failed searches */
retries?: number;
/** Wait time between retries */
wait?: number;
}
/**
 * Thenable search queue. Queue searches with search(); call stop() when
 * no more searches will be added — NOTE(review): the promise presumably
 * settles once queued batches finish; confirm against client docs.
 */
interface MsearchHelper extends Promise<void> {
/** Stop accepting new searches */
stop: (error?: Error | null) => void;
/** Add search to queue */
search: <TDocument = unknown>(
header: MsearchMultisearchHeader,
body: SearchRequest
) => Promise<MsearchHelperResponse<TDocument>>;
}
interface MsearchHelperResponse<TDocument> {
body: SearchResponse<TDocument>;
documents: TDocument[];
status: number;
}

Usage Examples:
// Multi-search with queuing
const msearchHelper = client.helpers.msearch({
operations: 10, // batch 10 searches
concurrency: 3,
flushInterval: 1000
});
// Queue multiple searches
const promises = [];
for (const userId of userIds) {
promises.push(
msearchHelper.search(
{ index: "user-data", routing: userId },
{ query: { term: { user_id: userId } } }
)
);
}
// Wait for all searches and stop helper
const results = await Promise.all(promises);
msearchHelper.stop();
// Process results
results.forEach((result, index) => {
console.log(`User ${userIds[index]}: ${result.documents.length} documents`);
});

Execute ES|QL queries with format conversion utilities.
/**
 * ES|QL query helper with format conversion
 *
 * Returns an EsqlHelper whose conversion methods (toRecords,
 * toArrowTable, toArrowReader) run the query and shape the result
 * into the requested representation.
 *
 * @param params - ES|QL query parameters
 * @param options - Transport request options
 * @returns EsqlHelper with format conversion methods
 */
esql(
params: EsqlQueryRequest,
options?: TransportRequestOptions
): EsqlHelper;
/** Parameters for an ES|QL query submitted through the helper. */
interface EsqlQueryRequest {
/** ES|QL query string */
query: string;
/** Named query parameters, referenced as ?name inside the query */
params?: Record<string, any>;
/** Response format */
format?: 'csv' | 'json' | 'tsv' | 'txt' | 'yaml' | 'cbor' | 'smile';
/** Locale for formatting */
locale?: string;
/** Field delimiter for CSV/TSV */
delimiter?: string;
/** Time zone for date formatting */
time_zone?: string;
}
/** Conversion methods for consuming an ES|QL query's results. */
interface EsqlHelper {
/** Convert to record format - pivots results into array of row objects */
toRecords<TDocument = Record<string, any>>(): Promise<EsqlToRecords<TDocument>>;
/** Convert to Apache Arrow Table format for advanced data processing */
toArrowTable(): Promise<Table<TypeMap>>;
/** Convert to Apache Arrow streaming reader for large datasets */
toArrowReader(): Promise<AsyncRecordBatchStreamReader>;
}
/** Result of EsqlHelper.toRecords: row objects plus column metadata. */
interface EsqlToRecords<TDocument = Record<string, any>> {
/** Query results pivoted into one object per row */
records: TDocument[];
/** Name and ES|QL type of each result column */
columns: EsqlColumn[];
}
interface EsqlColumn {
name: string;
type: string;
}

Usage Examples:
// ES|QL query with record conversion
const result = await client.helpers.esql({
query: `
FROM products
| WHERE category == "electronics"
| STATS avg_price = AVG(price), count = COUNT(*) BY brand
| SORT avg_price DESC
`
}).toRecords<{brand: string, avg_price: number, count: number}>();
result.records.forEach(row => {
console.log(`${row.brand}: avg $${row.avg_price}, ${row.count} products`);
});
// ES|QL with parameters
const salesData = await client.helpers.esql({
query: `
FROM sales
| WHERE date >= ?start_date AND date <= ?end_date
| STATS total_sales = SUM(amount) BY DATE_TRUNC(?interval, date)
`,
params: {
start_date: "2024-01-01",
end_date: "2024-12-31",
interval: "1 month"
}
}).toRecords();
// Use Apache Arrow for high-performance analytics
const arrowTable = await client.helpers.esql({
query: "FROM metrics | STATS avg(cpu), max(memory) BY host"
}).toArrowTable();
// Process data with Arrow
const avgCpuColumn = arrowTable.getChild('avg(cpu)');
console.log(`Min CPU: ${avgCpuColumn.min()}, Max CPU: ${avgCpuColumn.max()}`);
// Stream large datasets with Arrow reader
const reader = await client.helpers.esql({
query: "FROM logs | WHERE timestamp > NOW() - 1d"
}).toArrowReader();
for await (const batch of reader) {
console.log(`Processing batch of ${batch.numRows} rows`);
// Process batch data
}

Configure helper behavior and performance characteristics.
/**
* Helper function configuration options
*/
interface HelpersOptions {
/** Client instance to use */
client: Client;
/** Meta header for telemetry */
metaHeader: string | null;
/** Maximum retry attempts */
maxRetries: number;
}

Usage Examples:
// Access helper configuration
console.log("Max retries:", client.helpers.maxRetries);
// Helper operations inherit client configuration
const client = new Client({
node: "https://localhost:9200",
maxRetries: 5,
requestTimeout: 30000
});
// All helper operations use these settings
const documents = await client.helpers.search({
index: "my-index",
query: { match_all: {} }
});

import { errors } from "@elastic/elasticsearch";
// Bulk operation error handling
const bulkResponse = client.helpers.bulk({
datasource: documents,
onDocument: (doc) => ({ index: { _index: "test", _id: doc.id } }),
onError: (error) => {
if (error.status === 429) {
// Rate limited - could implement backoff
console.log("Rate limited, will retry");
} else {
// Log permanent failures
console.error("Permanent error:", error.error);
}
}
});
// Scroll operation error handling
try {
for await (const response of client.helpers.scrollSearch({
index: "large-index"
})) {
// Process documents
}
} catch (error) {
if (error instanceof errors.ResponseError) {
console.log("Search error:", error.body);
}
}

type Id = string | number;
/** One or more index names or patterns */
type Indices = string | string[];
/** One or more field names */
type Fields = string | string[];
/** Elasticsearch duration string, e.g. "1m", "30s" */
type Duration = string;
/** Refresh policy; the string forms mirror the REST API's "true"/"false" */
type Refresh = boolean | 'wait_for' | 'false' | 'true';
/** Single index name */
type IndexName = string;
/** Custom routing value */
type Routing = string;
/**
 * Per-request transport overrides (timeouts, retries, headers, etc.)
 * applied to a single helper call.
 */
interface TransportRequestOptions {
/** HTTP status codes not to be treated as errors — TODO confirm semantics against client docs */
ignore?: number[];
requestTimeout?: number;
maxRetries?: number;
compression?: boolean;
headers?: Record<string, string>;
querystring?: Record<string, any>;
// ... other transport options
}
/**
 * Bulk action wrapper: exactly one operation key is expected to be set
 * per container, mirroring the action line of the _bulk API.
 */
interface BulkOperationContainer {
index?: BulkIndexOperation;
create?: BulkCreateOperation;
update?: BulkUpdateOperation;
delete?: BulkDeleteOperation;
}