Essential CRUD operations for managing documents in Elasticsearch, covering single-document, query-based, and bulk operations, including versioning, routing, refresh control, and existence checking.
Create or update a document in an index with automatic ID generation or explicit ID specification.
/**
* Index a document in Elasticsearch, creating it or updating the document
* already stored under the same ID
* @typeParam TDocument - Shape of the document body being indexed
* @param params - Document indexing parameters
* @returns Promise with indexing result including document metadata
*/
function index<TDocument = Record<string, any>>(params: {
/** Target index name */
index: IndexName;
/** Document ID (optional, auto-generated if not provided) */
id?: Id;
/** Document content to index */
document: TDocument;
/** Control when document changes are visible to search */
refresh?: Refresh;
/** Custom routing value for shard selection */
routing?: Routing;
/** Document version for optimistic concurrency control (interpreted according to version_type) */
version?: VersionNumber;
/** Version type for version checking */
version_type?: VersionType;
/** Operation type: 'index' or 'create' */
op_type?: OpType;
/** Ingest pipeline to execute */
pipeline?: string;
/** Request timeout */
timeout?: Duration;
/** Wait for active shards before proceeding */
wait_for_active_shards?: WaitForActiveShards;
/** Require the write target to be an index alias */
require_alias?: boolean;
/** Require the write target to be a data stream */
require_data_stream?: boolean;
}): Promise<TransportResult<IndexResponse, unknown>>;
/** Body returned by a single-document index request. */
interface IndexResponse {
  /** Outcome reported for the write */
  result: 'created' | 'updated' | 'noop';
  /** Index that holds the document */
  _index: IndexName;
  /** ID of the document */
  _id: Id;
  /** Document version after the write */
  _version: VersionNumber;
  /** Sequence number of the write */
  _seq_no: SequenceNumber;
  /** Primary term of the write */
  _primary_term: number;
  /** Shard-level statistics for the write */
  _shards: ShardStatistics;
}

Usage Examples:
// Let Elasticsearch generate the document ID
const sampleDocument = {
  title: "Sample Document",
  content: "This is a test document",
  timestamp: new Date().toISOString()
};
const response = await client.index({ index: "my-index", document: sampleDocument });

// Supply the document ID explicitly
const laptop = { name: "Laptop", price: 999.99, category: "electronics" };
await client.index({
  index: "products",
  id: "product-123",
  document: laptop,
  refresh: "wait_for"
});

// Route by user and run an ingest pipeline
const loginEvent = { user_id: "user123", action: "login" };
await client.index({
  index: "user-logs",
  id: "log-456",
  document: loginEvent,
  routing: "user123",
  pipeline: "log-processing-pipeline"
});

Create a new document, failing if a document with the same ID already exists.
/**
* Create a new document (fails if document already exists)
* @typeParam TDocument - Shape of the document body being created
* @param params - Document creation parameters
* @returns Promise with creation result
*/
function create<TDocument = Record<string, any>>(params: {
/** Target index name */
index: IndexName;
/** Document ID (required for create operation) */
id: Id;
/** Document content to create */
document: TDocument;
/** Control when document changes are visible to search */
refresh?: Refresh;
/** Custom routing value for shard selection */
routing?: Routing;
/** Ingest pipeline to execute */
pipeline?: string;
/** Request timeout */
timeout?: Duration;
/** Wait for active shards before proceeding */
wait_for_active_shards?: WaitForActiveShards;
/** Require the write target to be a data stream */
require_data_stream?: boolean;
}): Promise<TransportResult<CreateResponse, unknown>>;
/** Body returned by a successful create request. */
interface CreateResponse {
  /** Always 'created' for this operation */
  result: 'created';
  /** Index that holds the new document */
  _index: IndexName;
  /** ID of the new document */
  _id: Id;
  /** Version of the new document */
  _version: VersionNumber;
  /** Sequence number of the write */
  _seq_no: SequenceNumber;
  /** Primary term of the write */
  _primary_term: number;
  /** Shard-level statistics for the write */
  _shards: ShardStatistics;
}

Usage Examples:
// Create a document under an explicit ID
const newUser = {
  username: "john_doe",
  email: "john@example.com",
  created_at: new Date().toISOString()
};
await client.create({ index: "users", id: "user-789", document: newUser });

Retrieve a document by its ID with optional source filtering and routing.
/**
* Retrieve a document by ID
* @typeParam TDocument - Expected shape of the returned _source
* @param params - Document retrieval parameters
* @returns Promise with document data
*/
function get<TDocument = Record<string, any>>(params: {
/** Source index name */
index: IndexName;
/** Document ID */
id: Id;
/** Whether to return _source (boolean), or the source fields to return */
_source?: boolean | Fields;
/** Exclude specific source fields */
_source_excludes?: Fields;
/** Include only specific source fields */
_source_includes?: Fields;
/** Custom routing value */
routing?: Routing;
/** Preference for which shard replicas to query */
preference?: string;
/** Realtime retrieval (bypass refresh) */
realtime?: boolean;
/** Refresh before retrieving */
refresh?: boolean;
/** Document version for retrieval */
version?: VersionNumber;
/** Version type for version checking */
version_type?: VersionType;
/** Return stored fields */
stored_fields?: Fields;
}): Promise<TransportResult<GetResponse<TDocument>, unknown>>;
/**
 * Body returned when fetching a document by ID.
 * NOTE(review): when `found` is false the version/sequence fields are
 * presumably absent from the payload; confirm before relying on them.
 */
interface GetResponse<TDocument = Record<string, any>> {
  /** Whether the document exists */
  found: boolean;
  /** Index that was queried */
  _index: IndexName;
  /** ID of the document */
  _id: Id;
  /** Current document version */
  _version: VersionNumber;
  /** Sequence number of the last write to the document */
  _seq_no: SequenceNumber;
  /** Primary term of the last write to the document */
  _primary_term: number;
  /** Routing value stored with the document, if any */
  _routing?: Routing;
  /** Document body, typed by the caller */
  _source?: TDocument;
  /** Additional fields returned with the document */
  fields?: Record<string, any>;
}

Usage Examples:
// Get full document
const response = await client.get({
  index: "products",
  id: "product-123"
});
if (response.body.found) {
  console.log("Product:", response.body._source);
}

// Get document with source filtering
// (distinct variable names: the three examples share one scope, so reusing
// `const response` would be a redeclaration error)
const filteredResponse = await client.get({
  index: "users",
  id: "user-456",
  _source: ["name", "email"], // only include these fields
  _source_excludes: ["password"] // exclude sensitive fields
});

// Get with routing
const routedResponse = await client.get({
  index: "user-logs",
  id: "log-789",
  routing: "user123"
});

Update an existing document with partial updates or scripts.
/**
* Update an existing document
* @typeParam TDocument - Shape of the full document (used for upsert and the response)
* @typeParam TPartialDocument - Shape of the partial document passed in doc
* @param params - Document update parameters
* @returns Promise with update result
*/
function update<TDocument = Record<string, any>, TPartialDocument = Partial<TDocument>>(params: {
/** Target index name */
index: IndexName;
/** Document ID */
id: Id;
/** Partial document for merging */
doc?: TPartialDocument;
/** Script for programmatic updates */
script?: Script;
/** Upsert document if not exists */
upsert?: TDocument;
/** Control when changes are visible to search */
refresh?: Refresh;
/** Custom routing value */
routing?: Routing;
/** Retry on version conflict */
retry_on_conflict?: number;
/** Request timeout */
timeout?: Duration;
/** Wait for active shards */
wait_for_active_shards?: WaitForActiveShards;
/** Sequence number for optimistic concurrency control */
if_seq_no?: SequenceNumber;
/** Primary term for optimistic concurrency control */
if_primary_term?: number;
/** Whether to return the updated document source (boolean), or which source fields to return */
_source?: boolean | Fields;
/** Detection of noop updates */
detect_noop?: boolean;
}): Promise<TransportResult<UpdateResponse<TDocument>, unknown>>;
/** Body returned by an update request. */
interface UpdateResponse<TDocument = Record<string, any>> {
  /** Index that holds the document */
  _index: IndexName;
  /** ID of the document */
  _id: Id;
  /** Document version after the update */
  _version: VersionNumber;
  /**
   * Outcome of the request. 'created' is reported when the document did not
   * exist and the `upsert` document was inserted instead.
   */
  result: 'created' | 'updated' | 'noop';
  /** Shard-level statistics for the write */
  _shards: ShardStatistics;
  /** Sequence number of the write */
  _seq_no: SequenceNumber;
  /** Primary term of the write */
  _primary_term: number;
  /** Document data returned alongside the update result, when present */
  get?: GetResponse<TDocument>;
}
/** Inline script definition used by update-style operations. */
interface Script {
  /** Language the script is written in (default: painless) */
  lang?: string;
  /** Source code of the script */
  source: string;
  /** Named values made available to the script */
  params?: Record<string, any>;
}

Usage Examples:
// Merge a partial document into the stored one
const priceChange = {
  price: 899.99,
  last_updated: new Date().toISOString()
};
await client.update({ index: "products", id: "product-123", doc: priceChange });

// Apply a parameterized script
const incrementViews = {
  source: "ctx._source.views += params.increment",
  params: { increment: 1 }
};
await client.update({ index: "products", id: "product-123", script: incrementViews });

// Script update with an upsert document
const incrementCount = { source: "ctx._source.count += 1" };
await client.update({
  index: "counters",
  id: "page-views",
  script: incrementCount,
  upsert: { count: 1 }
});

Remove a document from an index by ID.
/**
* Delete a document by ID
*
* Invoked as a client method (`client.delete(...)`); the bare name is shown
* here only to document the signature.
* @param params - Document deletion parameters
* @returns Promise with deletion result
*/
function delete(params: {
/** Source index name */
index: IndexName;
/** Document ID */
id: Id;
/** Control when changes are visible to search */
refresh?: Refresh;
/** Custom routing value */
routing?: Routing;
/** Request timeout */
timeout?: Duration;
/** Version for optimistic concurrency control */
version?: VersionNumber;
/** Version type for version checking */
version_type?: VersionType;
/** Wait for active shards */
wait_for_active_shards?: WaitForActiveShards;
}): Promise<TransportResult<DeleteResponse, unknown>>;
/** Body returned by a delete-by-ID request. */
interface DeleteResponse {
  /** Whether a document was removed or none was found */
  result: 'deleted' | 'not_found';
  /** Index the request targeted */
  _index: IndexName;
  /** ID of the document */
  _id: Id;
  /** Document version after the operation */
  _version: VersionNumber;
  /** Sequence number of the operation */
  _seq_no: SequenceNumber;
  /** Primary term of the operation */
  _primary_term: number;
  /** Shard-level statistics for the operation */
  _shards: ShardStatistics;
}

Usage Examples:
// Remove a document and inspect the outcome
const response = await client.delete({ index: "products", id: "product-123" });
if (response.body.result === 'deleted') {
  console.log("Document deleted successfully");
}

// Remove a document stored under a custom routing value
await client.delete({ index: "user-logs", id: "log-456", routing: "user123" });

Updates documents that match a specified query without retrieving them first.
/**
* Update documents matching a query
* @param params - Update by query parameters
* @returns Promise with update operation results
*/
function updateByQuery(params: {
/** Target index name or pattern */
index: Indices;
/** Query to match documents for updating */
query?: QueryDslQueryContainer;
/** Script to apply to matching documents */
script?: Script;
/** Maximum number of documents to update */
max_docs?: number;
/** How to handle version conflicts (e.g. "proceed") */
conflicts?: Conflicts;
/** Control when changes are visible to search */
refresh?: boolean;
/** Throttling rate for the operation */
requests_per_second?: number;
/** Scroll context keep-alive time */
scroll?: Duration;
/** Number of parallel slices */
slices?: Slices;
/** Request timeout */
timeout?: Duration;
/** Wait for completion or return task ID */
wait_for_completion?: boolean;
/** Wait for active shards */
wait_for_active_shards?: WaitForActiveShards;
}): Promise<TransportResult<UpdateByQueryResponse, unknown>>;
/** Summary returned by an update-by-query request. */
interface UpdateByQueryResponse {
  /** Duration of the whole operation */
  took: number;
  /** Whether the operation timed out */
  timed_out: boolean;
  /** Number of documents processed */
  total: number;
  /** Number of documents updated */
  updated: number;
  /** Number of documents deleted */
  deleted: number;
  /** Number of documents left unchanged */
  noops: number;
  /** Number of version conflicts encountered */
  version_conflicts: number;
  /** Number of batches executed */
  batches: number;
  /** Retry counts, split by bulk and search actions */
  retries: { bulk: number; search: number };
  /** Time spent throttled, in milliseconds */
  throttled_millis: number;
  /** Throttle-until value, in milliseconds */
  throttled_until_millis: number;
  /** Requests-per-second rate reported for the operation */
  requests_per_second: number;
  /** Failures recorded during the operation */
  failures: BulkIndexByScrollFailure[];
}

Usage Examples:
// Stamp every document in an index
const stampScript = {
  source: "ctx._source.last_updated = params.timestamp",
  params: { timestamp: new Date().toISOString() }
};
await client.updateByQuery({ index: "my-index", script: stampScript, refresh: true });

// Discount only the documents matched by a query
const discountScript = {
  source: "ctx._source.price *= params.discount",
  params: { discount: 0.9 }
};
await client.updateByQuery({
  index: "products",
  query: { term: { category: "electronics" } },
  script: discountScript,
  conflicts: "proceed",
  requests_per_second: 100
});

Deletes documents that match a specified query in bulk.
/**
* Delete documents matching a query
* @param params - Delete by query parameters
* @returns Promise with delete operation results
*/
function deleteByQuery(params: {
/** Target index name or pattern */
index: Indices;
/** Query to match documents for deletion */
query?: QueryDslQueryContainer;
/** Maximum number of documents to delete */
max_docs?: number;
/** How to handle version conflicts (e.g. "proceed") */
conflicts?: Conflicts;
/** Control when changes are visible to search */
refresh?: boolean;
/** Throttling rate for the operation */
requests_per_second?: number;
/** Scroll context keep-alive time */
scroll?: Duration;
/** Number of parallel slices */
slices?: Slices;
/** Request timeout */
timeout?: Duration;
/** Wait for completion or return task ID */
wait_for_completion?: boolean;
/** Wait for active shards */
wait_for_active_shards?: WaitForActiveShards;
}): Promise<TransportResult<DeleteByQueryResponse, unknown>>;
/** Summary returned by a delete-by-query request. */
interface DeleteByQueryResponse {
  /** Duration of the whole operation */
  took: number;
  /** Whether the operation timed out */
  timed_out: boolean;
  /** Number of documents processed */
  total: number;
  /** Number of documents deleted */
  deleted: number;
  /** Number of documents left unchanged */
  noops: number;
  /** Number of version conflicts encountered */
  version_conflicts: number;
  /** Number of batches executed */
  batches: number;
  /** Retry counts, split by bulk and search actions */
  retries: { bulk: number; search: number };
  /** Time spent throttled, in milliseconds */
  throttled_millis: number;
  /** Throttle-until value, in milliseconds */
  throttled_until_millis: number;
  /** Requests-per-second rate reported for the operation */
  requests_per_second: number;
  /** Failures recorded during the operation */
  failures: BulkIndexByScrollFailure[];
}

Usage Examples:
// Purge log documents older than 30 days
const olderThan30Days = { range: { timestamp: { lt: "now-30d" } } };
await client.deleteByQuery({ index: "logs-*", query: olderThan30Days, refresh: true });

// Throttled delete that proceeds past version conflicts
const everything = { match_all: {} };
await client.deleteByQuery({
  index: "temp-data",
  query: everything,
  conflicts: "proceed",
  requests_per_second: 50,
  wait_for_completion: false
});

Copies documents from a source index to a destination index with optional transformation.
/**
* Copy documents from source to destination
* @param params - Reindex operation parameters
* @returns Promise with reindex operation results
*/
function reindex(params: {
/** Source configuration */
source: {
/** Source index name or pattern */
index: Indices;
/** Query to filter source documents */
query?: QueryDslQueryContainer;
/** Remote cluster configuration */
remote?: {
/** Address of the remote cluster (e.g. "https://remote-cluster:9200") */
host: string;
/** User name for the remote cluster */
username?: string;
/** Password for the remote cluster */
password?: string;
/** Socket timeout for the remote connection */
socket_timeout?: Duration;
/** Connect timeout for the remote connection */
connect_timeout?: Duration;
};
/** Source fields to copy */
_source?: Fields | boolean;
/** Sort order for source documents */
sort?: SortCombinations[];
};
/** Destination configuration */
dest: {
/** Destination index name */
index: IndexName;
/** Document routing */
routing?: Routing;
/** Operation type */
op_type?: OpType;
/** Version type */
version_type?: VersionType;
/** Pipeline to apply to documents */
pipeline?: string;
};
/** Script to transform documents */
script?: Script;
/** Maximum number of documents to reindex */
max_docs?: number;
/** How to handle version conflicts (e.g. "proceed") */
conflicts?: Conflicts;
/** Control when changes are visible to search */
refresh?: boolean;
/** Throttling rate for the operation */
requests_per_second?: number;
/** Number of parallel slices */
slices?: Slices;
/** Request timeout */
timeout?: Duration;
/** Wait for completion or return task ID */
wait_for_completion?: boolean;
/** Wait for active shards */
wait_for_active_shards?: WaitForActiveShards;
/** Require alias for destination writes */
require_alias?: boolean;
}): Promise<TransportResult<ReindexResponse, unknown>>;
/** Summary returned by a reindex request. */
interface ReindexResponse {
  /** Duration of the whole operation */
  took: number;
  /** Whether the operation timed out */
  timed_out: boolean;
  /** Number of documents processed */
  total: number;
  /** Number of documents created in the destination */
  created: number;
  /** Number of documents updated in the destination */
  updated: number;
  /** Number of documents deleted */
  deleted: number;
  /** Number of documents left unchanged */
  noops: number;
  /** Number of version conflicts encountered */
  version_conflicts: number;
  /** Number of batches executed */
  batches: number;
  /** Retry counts, split by bulk and search actions */
  retries: { bulk: number; search: number };
  /** Time spent throttled, in milliseconds */
  throttled_millis: number;
  /** Throttle-until value, in milliseconds */
  throttled_until_millis: number;
  /** Requests-per-second rate reported for the operation */
  requests_per_second: number;
  /** Failures recorded during the operation */
  failures: BulkIndexByScrollFailure[];
}

Usage Examples:
// Copy one index into another
await client.reindex({
  source: { index: "old-index" },
  dest: { index: "new-index" },
  refresh: true
});

// Filter, enrich through a pipeline, and transform with a script
const since2023 = { range: { timestamp: { gte: "2023-01-01" } } };
const markProcessed = {
  source: "ctx._source.processed_at = System.currentTimeMillis()"
};
await client.reindex({
  source: { index: "source-logs", query: since2023 },
  dest: { index: "processed-logs", pipeline: "log-enrichment" },
  script: markProcessed,
  max_docs: 10000,
  requests_per_second: 200
});

// Pull documents from a remote cluster
const remoteCluster = {
  host: "https://remote-cluster:9200",
  username: "elastic",
  password: "password"
};
await client.reindex({
  source: { remote: remoteCluster, index: "remote-index" },
  dest: { index: "local-copy" },
  conflicts: "proceed"
});

Perform multiple index, create, update, or delete operations in a single request.
/**
* Perform multiple document operations in a single request
* @typeParam TDocument - Shape of the documents being indexed or created
* @typeParam TPartialDocument - Shape of partial documents used by update actions
* @param params - Bulk operation parameters
* @returns Promise with bulk operation results
*/
function bulk<TDocument = Record<string, any>, TPartialDocument = Partial<TDocument>>(params: {
/** Default index for operations without index specified */
index?: IndexName;
/** Flat array of action entries, each followed by its document or update body where the action takes one */
operations?: (BulkOperationContainer | BulkUpdateAction<TDocument, TPartialDocument> | TDocument)[];
/** Control when changes are visible to search */
refresh?: Refresh;
/** Default routing value */
routing?: Routing;
/** Request timeout */
timeout?: Duration;
/** Wait for active shards */
wait_for_active_shards?: WaitForActiveShards;
/** Default pipeline for operations */
pipeline?: string;
/** Require alias for write operations */
require_alias?: boolean;
/** Include source in error responses */
include_source_on_error?: boolean;
/** List executed pipelines in response */
list_executed_pipelines?: boolean;
/** Source field control */
_source?: SearchSourceConfigParam;
/** Exclude source fields */
_source_excludes?: Fields;
/** Include specific source fields */
_source_includes?: Fields;
/** Require data stream target */
require_data_stream?: boolean;
}): Promise<TransportResult<BulkResponse, unknown>>;
/** Action entry of a bulk request; the usage examples set one action key per entry. */
interface BulkOperationContainer {
/** Index operation: create or update document */
index?: BulkIndexOperation;
/** Create operation: create document (fail if exists) */
create?: BulkCreateOperation;
/** Update operation: partial document update */
update?: BulkUpdateOperation;
/** Delete operation: remove document */
delete?: BulkDeleteOperation;
}
/** Metadata for an `index` action inside a bulk request. */
interface BulkIndexOperation {
  /** Target index (falls back to the request-level default index) */
  _index?: IndexName;
  /** Document ID */
  _id?: Id;
  /** Routing value for this action */
  routing?: Routing;
  /**
   * @deprecated Legacy spelling retained so existing callers keep
   * type-checking; current Elasticsearch versions expect `routing`.
   */
  _routing?: Routing;
  /** Document version for optimistic concurrency control */
  version?: VersionNumber;
  /** Version type for version checking */
  version_type?: VersionType;
}
/** Body returned by a bulk request. */
interface BulkResponse {
/** Duration of the bulk request */
took: number;
/** Error flag for the request; inspect items for per-action results */
errors: boolean;
/** Per-action results */
items: BulkResponseItem[];
}
/** Result of one bulk action, keyed by the kind of action that was executed. */
interface BulkResponseItem {
  /** Result of a create action */
  create?: BulkResponseItemBase;
  /** Result of an index action */
  index?: BulkResponseItemBase;
  /** Result of an update action */
  update?: BulkResponseItemBase;
  /** Result of a delete action */
  delete?: BulkResponseItemBase;
}

Usage Examples:
// Index two products and delete a third in one request
await client.bulk({
  operations: [
    { index: { _index: "products", _id: "1" } }, { name: "Product 1", price: 10.99 },
    { index: { _index: "products", _id: "2" } }, { name: "Product 2", price: 15.99 },
    { delete: { _index: "products", _id: "old-product" } }
  ]
});

// Combine create, update, and delete actions
const lastLogin = { last_login: new Date().toISOString() };
await client.bulk({
  operations: [
    { create: { _index: "users", _id: "user1" } }, { name: "John", email: "john@example.com" },
    { update: { _index: "users", _id: "user2" } }, { doc: lastLogin },
    { delete: { _index: "users", _id: "inactive-user" } }
  ],
  refresh: "wait_for"
});

Retrieve multiple documents by their IDs in a single request.
/**
* Retrieve multiple documents by their IDs
* @typeParam TDocument - Expected shape of each returned _source
* @param params - Multi-get parameters
* @returns Promise with multiple document results
*/
function mget<TDocument = Record<string, any>>(params: {
/** Default index name */
index?: IndexName;
/** Array of document specifications */
docs: MgetOperation[];
/** Whether to return _source (boolean), or the source fields to return */
_source?: boolean | Fields;
/** Exclude source fields */
_source_excludes?: Fields;
/** Include only specific source fields */
_source_includes?: Fields;
/** Stored fields to return */
stored_fields?: Fields;
/** Query preference */
preference?: string;
/** Realtime retrieval */
realtime?: boolean;
/** Refresh before retrieval */
refresh?: boolean;
/** Default routing */
routing?: Routing;
}): Promise<TransportResult<MgetResponse<TDocument>, unknown>>;
/** One entry of the `docs` array of an mget request. */
interface MgetOperation {
  /** Index containing the document (falls back to the request-level index) */
  _index?: IndexName;
  /** Document ID */
  _id: Id;
  /** Whether to return _source (boolean), or the source fields to return */
  _source?: boolean | Fields;
  /** Routing value for this document */
  routing?: Routing;
  /**
   * @deprecated Legacy spelling retained so existing callers keep
   * type-checking; current Elasticsearch versions expect `routing`.
   */
  _routing?: Routing;
  /** Stored fields to return */
  stored_fields?: Fields;
}
/** Body returned by an mget request: one entry per requested document. */
interface MgetResponse<TDocument = Record<string, any>> {
  docs: Array<GetResponse<TDocument> | MgetResponseItem<TDocument>>;
}

Usage Examples:
// Multi-get from same index
const response = await client.mget({
  index: "products",
  docs: [
    { _id: "product-1" },
    { _id: "product-2" },
    { _id: "product-3" }
  ]
});

// Multi-get from different indices
// (named differently from the first example so both can share one scope)
const multiIndexResponse = await client.mget({
  docs: [
    { _index: "products", _id: "product-1" },
    { _index: "users", _id: "user-1", routing: "user-group-1" },
    { _index: "orders", _id: "order-1", _source: ["status", "total"] }
  ]
});

Check if documents exist without retrieving their content.
/**
* Check if a document exists
* @param params - Document existence check parameters
* @returns Promise with boolean result (not wrapped in a TransportResult)
*/
function exists(params: {
/** Source index name */
index: IndexName;
/** Document ID */
id: Id;
/** Custom routing value */
routing?: Routing;
/** Query preference */
preference?: string;
/** Realtime check */
realtime?: boolean;
/** Refresh before check */
refresh?: boolean;
/** Document version */
version?: VersionNumber;
/** Version type */
version_type?: VersionType;
}): Promise<boolean>;
/**
* Check if document source exists
* @param params - Document source existence parameters
* @returns Promise with boolean result
*/
function existsSource(params: {
  /** Source index name */
  index: IndexName;
  /** Document ID */
  id: Id;
  /** Source fields to exclude */
  _source_excludes?: Fields;
  /** Source fields to include */
  _source_includes?: Fields;
  /** Custom routing value */
  routing?: Routing;
  /** Query preference */
  preference?: string;
  /** Realtime check */
  realtime?: boolean;
  /** Refresh before check */
  refresh?: boolean;
  /** Document version */
  version?: VersionNumber;
  /** Version type */
  version_type?: VersionType;
}): Promise<boolean>;

Usage Examples:
// Test for a document by ID
const exists = await client.exists({ index: "products", id: "product-123" });
if (exists) {
  console.log("Document exists");
}

// Test for a document's source, using a routing value
const sourceExists = await client.existsSource({
  index: "users",
  id: "user-456",
  routing: "user-group-1"
});

Retrieve only the source content of a document without metadata.
/**
* Get document source only (without metadata)
* @typeParam TDocument - Expected shape of the returned source
* @param params - Source retrieval parameters
* @returns Promise with document source
*/
function getSource<TDocument = Record<string, any>>(params: {
  /** Source index name */
  index: IndexName;
  /** Document ID */
  id: Id;
  /** Exclude source fields */
  _source_excludes?: Fields;
  /** Include source fields */
  _source_includes?: Fields;
  /** Custom routing value */
  routing?: Routing;
  /** Query preference */
  preference?: string;
  /** Realtime retrieval */
  realtime?: boolean;
  /** Refresh before retrieval */
  refresh?: boolean;
  /** Document version */
  version?: VersionNumber;
  /** Version type */
  version_type?: VersionType;
}): Promise<TransportResult<TDocument, unknown>>;

Usage Examples:
// Fetch just the document source
const source = await client.getSource({ index: "products", id: "product-123" });
console.log("Product data:", source.body);

// Fetch the source restricted to selected fields
const visibleFields = ["name", "email"];
const hiddenFields = ["password", "internal_data"];
const filteredSource = await client.getSource({
  index: "users",
  id: "user-456",
  _source_includes: visibleFields,
  _source_excludes: hiddenFields
});

type IndexName = string;
/** Document identifier */
type Id = string | number;
/** Routing value used for shard selection */
type Routing = string;
/** A single field name or a list of field names */
type Fields = string | string[];
/** Refresh setting: a boolean, its string form, or 'wait_for' */
type Refresh = boolean | 'wait_for' | 'false' | 'true';
/** Time value expressed as a string */
type Duration = string;
/** Document version number */
type VersionNumber = number;
/** Versioning scheme used for version checks */
type VersionType = 'internal' | 'external' | 'external_gte' | 'force';
/** Write operation type */
type OpType = 'index' | 'create';
/** Number of active shards to wait for, or 'all' */
type WaitForActiveShards = number | 'all';
/** Sequence number of a write, used for optimistic concurrency control */
type SequenceNumber = number;
/** Shard-level counters reported in the _shards field of write responses. */
interface ShardStatistics {
/** Total number of shards involved */
total: number;
/** Number of shards on which the operation succeeded */
successful: number;
/** Number of shards on which the operation failed */
failed: number;
/** Number of shards skipped, when reported */
skipped?: number;
}