FlagEmbedding - BGE: One-Stop Retrieval Toolkit For Search and RAG
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run
Abstract base classes that define the core interface contracts for embedders and rerankers. These classes provide multi-device support, consistent API patterns, and form the foundation for all concrete implementations in FlagEmbedding.
Base class for all embedding models providing a unified interface for encoding text into vector representations. Supports multi-device processing and consistent batch handling across different model architectures.
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import torch
class AbsEmbedder:
    """
    Abstract base class for embedding models.

    Provides a unified interface for encoding text into vector
    representations, with multi-device support and consistent batch
    handling across different model architectures. Concrete embedders
    implement ``encode_single_device``; the other methods define the
    shared public contract.
    """

    def __init__(
        self,
        model_name_or_path: str,
        normalize_embeddings: bool = True,
        use_fp16: bool = True,
        query_instruction_for_retrieval: Optional[str] = None,
        query_instruction_format: str = "{}{}",
        devices: Optional[Union[str, List[str]]] = None,
        batch_size: int = 256,
        query_max_length: int = 512,
        passage_max_length: int = 512,
        convert_to_numpy: bool = True,
        **kwargs
    ):
        """
        Initialize abstract embedder base class.

        Args:
            model_name_or_path: Path to model or HuggingFace model name
            normalize_embeddings: Whether to normalize output embeddings
            use_fp16: Use half precision for inference
            query_instruction_for_retrieval: Instruction prepended to queries
            query_instruction_format: Format string for instructions
            devices: List of devices for multi-GPU inference
            batch_size: Default batch size for encoding
            query_max_length: Maximum query token length
            passage_max_length: Maximum passage token length
            convert_to_numpy: Convert outputs to numpy arrays
            **kwargs: Additional model-specific parameters
        """

    def encode_queries(
        self,
        queries: Union[str, List[str]],
        batch_size: Optional[int] = None,
        max_length: Optional[int] = None,
        convert_to_numpy: Optional[bool] = None,
        **kwargs
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        Encode queries for retrieval tasks.

        Args:
            queries: Single query string or list of query strings
            batch_size: Batch size for processing (overrides default)
            max_length: Maximum sequence length (overrides query_max_length)
            convert_to_numpy: Convert output to numpy (overrides default)
            **kwargs: Additional encoding parameters

        Returns:
            Query embeddings as tensor or numpy array
        """

    def encode_corpus(
        self,
        corpus: Union[str, List[str]],
        batch_size: Optional[int] = None,
        max_length: Optional[int] = None,
        convert_to_numpy: Optional[bool] = None,
        **kwargs
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        Encode corpus documents for retrieval tasks.

        Args:
            corpus: Single document string or list of document strings
            batch_size: Batch size for processing (overrides default)
            max_length: Maximum sequence length (overrides passage_max_length)
            convert_to_numpy: Convert output to numpy (overrides default)
            **kwargs: Additional encoding parameters

        Returns:
            Corpus embeddings as tensor or numpy array
        """

    def encode(
        self,
        sentences: Union[str, List[str]],
        batch_size: Optional[int] = None,
        max_length: Optional[int] = None,
        convert_to_numpy: Optional[bool] = None,
        instruction: Optional[str] = None,
        instruction_format: Optional[str] = None,
        **kwargs
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        General-purpose encoding method for any text.

        Args:
            sentences: Single sentence or list of sentences to encode
            batch_size: Batch size for processing
            max_length: Maximum sequence length
            convert_to_numpy: Convert output to numpy
            instruction: Instruction to prepend to sentences
            instruction_format: Format string for instruction
            **kwargs: Additional encoding parameters

        Returns:
            Text embeddings as tensor or numpy array
        """

    def encode_single_device(
        self,
        sentences: List[str],
        batch_size: int = 256,
        max_length: int = 512,
        convert_to_numpy: bool = True,
        device: Optional[str] = None,
        **kwargs
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        Abstract method for single-device encoding (implemented by subclasses).

        Args:
            sentences: List of sentences to encode
            batch_size: Batch size for processing
            max_length: Maximum sequence length
            convert_to_numpy: Convert output to numpy
            device: Specific device for processing
            **kwargs: Additional encoding parameters

        Returns:
            Embeddings from single device
        """

    def start_multi_process_pool(
        self,
        process_target_func: Callable
    ) -> Dict[str, Any]:
        """
        Start multi-process pool for parallel processing.

        Args:
            process_target_func: Function to execute in parallel

        Returns:
            Process pool information
        """

    @staticmethod
    def stop_multi_process_pool(pool: Dict[str, Any]) -> None:
        """
        Stop multi-process pool and clean up resources.

        Args:
            pool: Process pool to terminate
        """


# Base class for all reranking models providing a unified interface for
# scoring query-document pairs. Supports multi-device processing and
# flexible instruction formatting.
class AbsReranker:
    """
    Abstract base class for reranking models.

    Provides a unified interface for scoring query-document pairs, with
    multi-device support and flexible instruction formatting. Concrete
    rerankers implement ``compute_score_single_gpu``.
    """

    def __init__(
        self,
        model_name_or_path: str,
        use_fp16: bool = False,
        query_instruction_for_rerank: Optional[str] = None,
        query_instruction_format: str = "{}{}",
        passage_instruction_for_rerank: Optional[str] = None,
        passage_instruction_format: str = "{}{}",
        devices: Optional[Union[str, List[str]]] = None,
        batch_size: int = 128,
        query_max_length: Optional[int] = None,
        max_length: int = 512,
        normalize: bool = False,
        **kwargs
    ):
        """
        Initialize abstract reranker base class.

        Args:
            model_name_or_path: Path to model or HuggingFace model name
            use_fp16: Use half precision for inference
            query_instruction_for_rerank: Instruction prepended to queries
            query_instruction_format: Format string for query instructions
            passage_instruction_for_rerank: Instruction prepended to passages
            passage_instruction_format: Format string for passage instructions
            devices: List of devices for multi-GPU inference
            batch_size: Default batch size for scoring
            query_max_length: Maximum query token length
            max_length: Maximum total sequence length
            normalize: Whether to normalize output scores
            **kwargs: Additional model-specific parameters
        """

    def compute_score(
        self,
        sentence_pairs: List[Tuple[str, str]],
        **kwargs
    ) -> np.ndarray:
        """
        Compute relevance scores for query-document pairs.

        Args:
            sentence_pairs: List of (query, document) tuples
            **kwargs: Additional scoring parameters

        Returns:
            Array of relevance scores (higher = more relevant)
        """

    def compute_score_single_gpu(
        self,
        sentence_pairs: List[Tuple[str, str]],
        batch_size: int = 256,
        query_max_length: Optional[int] = None,
        max_length: int = 512,
        normalize: bool = False,
        device: Optional[str] = None,
        **kwargs
    ) -> np.ndarray:
        """
        Abstract method for single-GPU scoring (implemented by subclasses).

        Args:
            sentence_pairs: List of (query, document) tuples
            batch_size: Batch size for processing
            query_max_length: Maximum query token length
            max_length: Maximum total sequence length
            normalize: Whether to normalize scores
            device: Specific device for processing
            **kwargs: Additional scoring parameters

        Returns:
            Relevance scores from single GPU
        """


from FlagEmbedding import FlagModel, FlagReranker
# All concrete embedders inherit from AbsEmbedder
embedder = FlagModel('bge-base-en-v1.5')
assert isinstance(embedder, AbsEmbedder)  # True

# All concrete rerankers inherit from AbsReranker
reranker = FlagReranker('bge-reranker-base')
assert isinstance(reranker, AbsReranker)  # True

# Base class methods are available on all implementations
queries = ["What is machine learning?"]
embeddings = embedder.encode_queries(queries)  # AbsEmbedder method
pairs = [("query", "document")]
scores = reranker.compute_score(pairs)  # AbsReranker method

from FlagEmbedding import FlagModel
# Base class handles multi-device distribution automatically
embedder = FlagModel(
    'bge-large-en-v1.5',
    devices=['cuda:0', 'cuda:1', 'cuda:2'],  # Multiple GPUs
    batch_size=256
)

# Large corpus processing - base class manages device distribution
large_corpus = [f"Document {i}" for i in range(50000)]
embeddings = embedder.encode_corpus(
    large_corpus,
    batch_size=512,  # Override default batch size
    convert_to_numpy=True
)
print(f"Processed {len(large_corpus)} documents across {len(embedder.devices)} devices")

from FlagEmbedding import FlagModel, FlagReranker
# Embedder with custom query instructions
embedder = FlagModel(
    'bge-base-en-v1.5',
    query_instruction_for_retrieval="Search for: ",
    query_instruction_format="{}{}"  # Base class handles formatting
)

# Reranker with separate query and passage instructions
reranker = FlagReranker(
    'bge-reranker-base',
    query_instruction_for_rerank="Query: ",
    passage_instruction_for_rerank="Document: ",
    query_instruction_format="{}{}",
    passage_instruction_format="{}{}"
)

# Instructions are automatically applied by base class methods
queries = ["machine learning concepts"]
embeddings = embedder.encode_queries(queries)  # "Search for: machine learning concepts"
pairs = [("AI research", "Machine learning is a branch of AI")]
scores = reranker.compute_score(pairs)  # Instructions applied to both query and passage

from FlagEmbedding import FlagModel
embedder = FlagModel('bge-base-en-v1.5')

# Different encoding methods for different use cases
queries = ["How do neural networks work?"]
documents = ["Neural networks are computing systems inspired by biology"]
general_text = ["Some general text to embed"]

# Specialized methods with optimized settings
query_embeddings = embedder.encode_queries(queries, max_length=256)
doc_embeddings = embedder.encode_corpus(documents, max_length=512)

# General-purpose method with custom instruction
general_embeddings = embedder.encode(
    general_text,
    instruction="Encode this text: ",
    instruction_format="{}{}",
    max_length=384
)

from FlagEmbedding import FlagModel
import multiprocessing as mp


def encode_chunk(chunk_data):
    """Encode one (embedder, texts) work item; used as the pool target."""
    embedder, text_chunk = chunk_data
    return embedder.encode_corpus(text_chunk)


# Initialize embedder
embedder = FlagModel('bge-base-en-v1.5')

# Large dataset to process
large_dataset = [f"Document {i}" for i in range(100000)]
chunk_size = 1000
chunks = [large_dataset[i:i + chunk_size] for i in range(0, len(large_dataset), chunk_size)]

# Start multi-process pool using base class method
pool_info = embedder.start_multi_process_pool(encode_chunk)
try:
    # Process chunks in parallel
    chunk_data = [(embedder, chunk) for chunk in chunks]
    results = pool_info['pool'].map(encode_chunk, chunk_data)
    # Combine results
    import numpy as np
    all_embeddings = np.vstack(results)
finally:
    # Clean up pool using base class static method
    AbsEmbedder.stop_multi_process_pool(pool_info)

from FlagEmbedding import AbsEmbedder, AbsReranker, FlagAutoModel, FlagAutoReranker
# Factory methods return base class instances
embedder = FlagAutoModel.from_finetuned('bge-base-en-v1.5')
reranker = FlagAutoReranker.from_finetuned('bge-reranker-base')

# Type checking
if isinstance(embedder, AbsEmbedder):
    # Can use all embedder interface methods
    queries = ["test query"]
    embeddings = embedder.encode_queries(queries)
    print(f"Embedding shape: {embeddings.shape}")

if isinstance(reranker, AbsReranker):
    # Can use all reranker interface methods
    pairs = [("query", "document")]
    scores = reranker.compute_score(pairs)
    print(f"Relevance score: {scores[0]}")

from FlagEmbedding import FlagModel
try:
    # Invalid device specification
    embedder = FlagModel('bge-base-en-v1.5', devices=['invalid:0'])
except RuntimeError as e:
    print(f"Device error handled by base class: {e}")

try:
    # Invalid batch size
    embedder = FlagModel('bge-base-en-v1.5')
    embeddings = embedder.encode_queries(["test"], batch_size=-1)
except ValueError as e:
    print(f"Parameter validation by base class: {e}")

from typing import Union, List, Optional, Dict, Any, Callable, Tuple
import torch
import numpy as np

# Base class types
EmbedderInput = Union[str, List[str]]
EmbedderOutput = Union[torch.Tensor, np.ndarray]
RerankerInput = List[Tuple[str, str]]
RerankerOutput = np.ndarray

# Configuration types
DeviceList = Optional[List[str]]
InstructionFormat = str
ProcessPoolInfo = Dict[str, Any]
ProcessTarget = Callable[[Any], Any]

# Abstract base class references
AbstractEmbedder = AbsEmbedder
AbstractReranker = AbsReranker

# Install with Tessl CLI
#   npx tessl i tessl/pypi-flagembedding