tessl/pypi-flagembedding

FlagEmbedding - BGE: One-Stop Retrieval Toolkit For Search and RAG


docs/base-classes.md

Base Classes and Interfaces

Abstract base classes that define the core interface contracts for embedders and rerankers. These classes provide multi-device support, consistent API patterns, and form the foundation for all concrete implementations in FlagEmbedding.

Capabilities

AbsEmbedder (Abstract Embedder Base)

Base class for all embedding models providing a unified interface for encoding text into vector representations. Supports multi-device processing and consistent batch handling across different model architectures.

from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import torch

class AbsEmbedder:
    def __init__(
        self,
        model_name_or_path: str,
        normalize_embeddings: bool = True,
        use_fp16: bool = True,
        query_instruction_for_retrieval: Optional[str] = None,
        query_instruction_format: str = "{}{}",
        devices: Optional[Union[str, List[str]]] = None,
        batch_size: int = 256,
        query_max_length: int = 512,
        passage_max_length: int = 512,
        convert_to_numpy: bool = True,
        **kwargs
    ):
        """
        Initialize abstract embedder base class.
        
        Args:
            model_name_or_path: Path to model or HuggingFace model name
            normalize_embeddings: Whether to normalize output embeddings
            use_fp16: Use half precision for inference
            query_instruction_for_retrieval: Instruction prepended to queries
            query_instruction_format: Format string for instructions
            devices: List of devices for multi-GPU inference
            batch_size: Default batch size for encoding
            query_max_length: Maximum query token length
            passage_max_length: Maximum passage token length
            convert_to_numpy: Convert outputs to numpy arrays
            **kwargs: Additional model-specific parameters
        """
        
    def encode_queries(
        self,
        queries: Union[str, List[str]],
        batch_size: Optional[int] = None,
        max_length: Optional[int] = None,
        convert_to_numpy: Optional[bool] = None,
        **kwargs
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        Encode queries for retrieval tasks.
        
        Args:
            queries: Single query string or list of query strings
            batch_size: Batch size for processing (overrides default)
            max_length: Maximum sequence length (overrides query_max_length)
            convert_to_numpy: Convert output to numpy (overrides default)
            **kwargs: Additional encoding parameters
            
        Returns:
            Query embeddings as tensor or numpy array
        """
        
    def encode_corpus(
        self,
        corpus: Union[str, List[str]],
        batch_size: Optional[int] = None,
        max_length: Optional[int] = None,
        convert_to_numpy: Optional[bool] = None,
        **kwargs
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        Encode corpus documents for retrieval tasks.
        
        Args:
            corpus: Single document string or list of document strings
            batch_size: Batch size for processing (overrides default)
            max_length: Maximum sequence length (overrides passage_max_length)
            convert_to_numpy: Convert output to numpy (overrides default)
            **kwargs: Additional encoding parameters
            
        Returns:
            Corpus embeddings as tensor or numpy array
        """
        
    def encode(
        self,
        sentences: Union[str, List[str]],
        batch_size: Optional[int] = None,
        max_length: Optional[int] = None,
        convert_to_numpy: Optional[bool] = None,
        instruction: Optional[str] = None,
        instruction_format: Optional[str] = None,
        **kwargs
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        General-purpose encoding method for any text.
        
        Args:
            sentences: Single sentence or list of sentences to encode
            batch_size: Batch size for processing
            max_length: Maximum sequence length
            convert_to_numpy: Convert output to numpy
            instruction: Instruction to prepend to sentences
            instruction_format: Format string for instruction
            **kwargs: Additional encoding parameters
            
        Returns:
            Text embeddings as tensor or numpy array
        """
        
    def encode_single_device(
        self,
        sentences: List[str],
        batch_size: int = 256,
        max_length: int = 512,
        convert_to_numpy: bool = True,
        device: Optional[str] = None,
        **kwargs
    ) -> Union[torch.Tensor, np.ndarray]:
        """
        Abstract method for single-device encoding (implemented by subclasses).
        
        Args:
            sentences: List of sentences to encode
            batch_size: Batch size for processing
            max_length: Maximum sequence length
            convert_to_numpy: Convert output to numpy
            device: Specific device for processing
            **kwargs: Additional encoding parameters
            
        Returns:
            Embeddings from single device
        """
        
    def start_multi_process_pool(
        self,
        process_target_func: Callable
    ) -> Dict[str, Any]:
        """
        Start multi-process pool for parallel processing.
        
        Args:
            process_target_func: Function to execute in parallel
            
        Returns:
            Process pool information
        """
        
    @staticmethod
    def stop_multi_process_pool(pool: Dict[str, Any]) -> None:
        """
        Stop multi-process pool and clean up resources.
        
        Args:
            pool: Process pool to terminate
        """

AbsReranker (Abstract Reranker Base)

Base class for all reranking models providing a unified interface for scoring query-document pairs. Supports multi-device processing and flexible instruction formatting.

class AbsReranker:
    def __init__(
        self,
        model_name_or_path: str,
        use_fp16: bool = False,
        query_instruction_for_rerank: Optional[str] = None,
        query_instruction_format: str = "{}{}",
        passage_instruction_for_rerank: Optional[str] = None,
        passage_instruction_format: str = "{}{}",
        devices: Optional[Union[str, List[str]]] = None,
        batch_size: int = 128,
        query_max_length: Optional[int] = None,
        max_length: int = 512,
        normalize: bool = False,
        **kwargs
    ):
        """
        Initialize abstract reranker base class.
        
        Args:
            model_name_or_path: Path to model or HuggingFace model name
            use_fp16: Use half precision for inference
            query_instruction_for_rerank: Instruction prepended to queries
            query_instruction_format: Format string for query instructions
            passage_instruction_for_rerank: Instruction prepended to passages
            passage_instruction_format: Format string for passage instructions
            devices: List of devices for multi-GPU inference
            batch_size: Default batch size for scoring
            query_max_length: Maximum query token length
            max_length: Maximum total sequence length
            normalize: Whether to normalize output scores
            **kwargs: Additional model-specific parameters
        """
        
    def compute_score(
        self,
        sentence_pairs: List[Tuple[str, str]],
        **kwargs
    ) -> np.ndarray:
        """
        Compute relevance scores for query-document pairs.
        
        Args:
            sentence_pairs: List of (query, document) tuples
            **kwargs: Additional scoring parameters
            
        Returns:
            Array of relevance scores (higher = more relevant)
        """
        
    def compute_score_single_gpu(
        self,
        sentence_pairs: List[Tuple[str, str]],
        batch_size: int = 256,
        query_max_length: Optional[int] = None,
        max_length: int = 512,
        normalize: bool = False,
        device: Optional[str] = None,
        **kwargs
    ) -> np.ndarray:
        """
        Abstract method for single-GPU scoring (implemented by subclasses).
        
        Args:
            sentence_pairs: List of (query, document) tuples
            batch_size: Batch size for processing
            query_max_length: Maximum query token length
            max_length: Maximum total sequence length
            normalize: Whether to normalize scores
            device: Specific device for processing
            **kwargs: Additional scoring parameters
            
        Returns:
            Relevance scores from single GPU
        """
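
The `normalize` flag maps raw relevance logits into the 0-1 range; per the FlagEmbedding README this is a sigmoid mapping. A minimal sketch of that mapping in plain Python (no model required; the logit values below are illustrative, not model output):

```python
import math

def sigmoid(x: float) -> float:
    """Map a raw relevance logit to the (0, 1) range."""
    return 1.0 / (1.0 + math.exp(-x))

# Illustrative raw cross-encoder logits (not real model output)
raw_scores = [-5.6, 0.0, 4.2]
normalized = [sigmoid(s) for s in raw_scores]

# Sigmoid is monotonic, so the ranking is unchanged by normalization
assert normalized == sorted(normalized)
print(normalized)
```

Because the mapping is monotonic, normalization changes the score scale but never the ranking of documents.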

Usage Examples

Understanding the Base Class Interface

from FlagEmbedding import AbsEmbedder, AbsReranker, FlagModel, FlagReranker

# All concrete embedders inherit from AbsEmbedder
embedder = FlagModel('BAAI/bge-base-en-v1.5')
assert isinstance(embedder, AbsEmbedder)  # True

# All concrete rerankers inherit from AbsReranker
reranker = FlagReranker('BAAI/bge-reranker-base')
assert isinstance(reranker, AbsReranker)  # True

# Base class methods are available on all implementations
queries = ["What is machine learning?"]
embeddings = embedder.encode_queries(queries)  # AbsEmbedder method

pairs = [("query", "document")]
scores = reranker.compute_score(pairs)  # AbsReranker method

Multi-Device Processing with Base Classes

from FlagEmbedding import FlagModel

# Base class handles multi-device distribution automatically
embedder = FlagModel(
    'BAAI/bge-large-en-v1.5',
    devices=['cuda:0', 'cuda:1', 'cuda:2'],  # Multiple GPUs
    batch_size=256
)

# Large corpus processing - base class manages device distribution
large_corpus = [f"Document {i}" for i in range(50000)]
embeddings = embedder.encode_corpus(
    large_corpus,
    batch_size=512,  # Override default batch size
    convert_to_numpy=True
)

print(f"Processed {len(large_corpus)} documents across {len(embedder.devices)} devices")
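
Conceptually, multi-device encoding amounts to splitting the input into per-device shards before dispatching. The following is a rough, hypothetical sketch of such a split; the function name and contiguous-shard strategy are illustrative, not FlagEmbedding's actual implementation:

```python
from typing import List

def shard_for_devices(sentences: List[str], devices: List[str]) -> List[List[str]]:
    """Split inputs into contiguous shards, one per device (illustrative only)."""
    n = len(devices)
    shard_size = (len(sentences) + n - 1) // n  # ceiling division
    return [sentences[i * shard_size:(i + 1) * shard_size] for i in range(n)]

corpus = [f"Document {i}" for i in range(10)]
shards = shard_for_devices(corpus, ['cuda:0', 'cuda:1', 'cuda:2'])

# Every document lands in exactly one shard
assert sum(len(s) for s in shards) == len(corpus)
print([len(s) for s in shards])  # [4, 4, 2]
```

Each shard is then encoded on its own device and the per-device results are concatenated back in input order.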

Custom Instruction Handling

from FlagEmbedding import FlagModel, FlagReranker

# Embedder with custom query instructions
embedder = FlagModel(
    'BAAI/bge-base-en-v1.5',
    query_instruction_for_retrieval="Search for: ",
    query_instruction_format="{}{}"  # Base class handles formatting
)

# Reranker with separate query and passage instructions
reranker = FlagReranker(
    'BAAI/bge-reranker-base',
    query_instruction_for_rerank="Query: ",
    passage_instruction_for_rerank="Document: ",
    query_instruction_format="{}{}",
    passage_instruction_format="{}{}"
)

# Instructions are automatically applied by base class methods
queries = ["machine learning concepts"]
embeddings = embedder.encode_queries(queries)  # "Search for: machine learning concepts"

pairs = [("AI research", "Machine learning is a branch of AI")]
scores = reranker.compute_score(pairs)  # Instructions applied to both query and passage
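
The instruction format strings above are ordinary Python format templates: the first placeholder receives the instruction and the second receives the text. A minimal, library-independent illustration (the helper name is ours, not FlagEmbedding's):

```python
def apply_instruction(instruction_format: str, instruction: str, text: str) -> str:
    """Compose an instruction and a text using a '{}{}'-style template."""
    return instruction_format.format(instruction, text)

query = apply_instruction("{}{}", "Search for: ", "machine learning concepts")
assert query == "Search for: machine learning concepts"

# A template can also place a separator between instruction and text
query2 = apply_instruction("{} | {}", "Query:", "AI research")
print(query2)  # Query: | AI research
```

This is why the default `"{}{}"` format simply prepends the instruction with no separator.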

Flexible Encoding Methods

from FlagEmbedding import FlagModel

embedder = FlagModel('BAAI/bge-base-en-v1.5')

# Different encoding methods for different use cases
queries = ["How do neural networks work?"]
documents = ["Neural networks are computing systems inspired by biology"]
general_text = ["Some general text to embed"]

# Specialized methods with optimized settings
query_embeddings = embedder.encode_queries(queries, max_length=256)
doc_embeddings = embedder.encode_corpus(documents, max_length=512)

# General-purpose method with custom instruction
general_embeddings = embedder.encode(
    general_text,
    instruction="Encode this text: ",
    instruction_format="{}{}",
    max_length=384
)
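
Because embeddings are L2-normalized by default (`normalize_embeddings=True`), retrieval reduces to a dot product, which equals cosine similarity for unit vectors. A pure-Python sketch with toy 3-d vectors standing in for model output:

```python
import math

def l2_normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

# Toy 3-d "embeddings" standing in for real model output
query_emb = l2_normalize([0.2, 0.9, 0.1])
doc_embs = [l2_normalize([0.1, 0.8, 0.3]),   # similar direction to the query
            l2_normalize([0.9, 0.1, 0.0])]   # mostly orthogonal to the query

scores = [dot(query_emb, d) for d in doc_embs]
best = max(range(len(scores)), key=lambda i: scores[i])
assert best == 0  # the toy vector closest in direction wins
print(scores)
```

With real embeddings the same dot-product ranking applies, typically via `embeddings @ doc_embeddings.T` on the numpy arrays the encoder returns.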

Process Pool Management

from FlagEmbedding import AbsEmbedder, FlagModel
import numpy as np

def encode_chunk(chunk_data):
    embedder, text_chunk = chunk_data
    return embedder.encode_corpus(text_chunk)

# Initialize embedder
embedder = FlagModel('BAAI/bge-base-en-v1.5')

# Large dataset to process
large_dataset = [f"Document {i}" for i in range(100000)]
chunk_size = 1000
chunks = [large_dataset[i:i+chunk_size] for i in range(0, len(large_dataset), chunk_size)]

# Start multi-process pool using base class method
pool_info = embedder.start_multi_process_pool(encode_chunk)

try:
    # Process chunks in parallel
    chunk_data = [(embedder, chunk) for chunk in chunks]
    results = pool_info['pool'].map(encode_chunk, chunk_data)

    # Combine per-chunk embeddings into a single array
    all_embeddings = np.vstack(results)
finally:
    # Clean up pool using base class static method
    AbsEmbedder.stop_multi_process_pool(pool_info)

Type Checking and Interface Validation

from FlagEmbedding import AbsEmbedder, AbsReranker, FlagAutoModel, FlagAutoReranker

# Factory methods return base class instances
embedder = FlagAutoModel.from_finetuned('BAAI/bge-base-en-v1.5')
reranker = FlagAutoReranker.from_finetuned('BAAI/bge-reranker-base')

# Type checking
if isinstance(embedder, AbsEmbedder):
    # Can use all embedder interface methods
    queries = ["test query"]
    embeddings = embedder.encode_queries(queries)
    print(f"Embedding shape: {embeddings.shape}")

if isinstance(reranker, AbsReranker):
    # Can use all reranker interface methods
    pairs = [("query", "document")]
    scores = reranker.compute_score(pairs)
    print(f"Relevance score: {scores[0]}")

Error Handling in Base Classes

from FlagEmbedding import FlagModel

try:
    # Invalid device specification
    embedder = FlagModel('BAAI/bge-base-en-v1.5', devices=['invalid:0'])
except RuntimeError as e:
    print(f"Device error handled by base class: {e}")

try:
    # Invalid batch size
    embedder = FlagModel('BAAI/bge-base-en-v1.5')
    embeddings = embedder.encode_queries(["test"], batch_size=-1)
except ValueError as e:
    print(f"Parameter validation by base class: {e}")

Base Class Benefits

Consistent Interface

  • All embedders provide the same methods regardless of underlying architecture
  • All rerankers follow the same scoring interface
  • Uniform parameter handling across implementations

Multi-Device Support

  • Automatic workload distribution across multiple GPUs
  • Consistent performance scaling
  • Built-in device management and error handling

Flexible Configuration

  • Standardized instruction formatting
  • Consistent batch processing options
  • Unified parameter validation

Extensibility

  • Clear interface contracts for new implementations
  • Abstract methods guide proper implementation
  • Consistent behavior across model types
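
To make the extensibility contract concrete, here is a toy, library-independent sketch of the pattern: the base class owns batching while the subclass supplies only the per-batch encode, mirroring how `encode_single_device` is left abstract. All names and the trivial "embedding" are illustrative, not FlagEmbedding's actual implementation:

```python
from abc import ABC, abstractmethod
from typing import List

class ToyEmbedderBase(ABC):
    """Minimal stand-in for an AbsEmbedder-style interface contract."""

    def __init__(self, batch_size: int = 2):
        self.batch_size = batch_size

    def encode(self, sentences: List[str]) -> List[List[float]]:
        # The base class owns batching; subclasses only encode one batch.
        out: List[List[float]] = []
        for i in range(0, len(sentences), self.batch_size):
            out.extend(self.encode_batch(sentences[i:i + self.batch_size]))
        return out

    @abstractmethod
    def encode_batch(self, batch: List[str]) -> List[List[float]]:
        """Device-level encoding supplied by each concrete subclass."""

class LengthEmbedder(ToyEmbedderBase):
    def encode_batch(self, batch: List[str]) -> List[List[float]]:
        # Trivial "embedding": character count and word count per sentence.
        return [[float(len(s)), float(len(s.split()))] for s in batch]

emb = LengthEmbedder(batch_size=2)
vectors = emb.encode(["hello world", "abc", "one two three"])
assert len(vectors) == 3 and vectors[1] == [3.0, 1.0]
print(vectors)
```

Concrete FlagEmbedding classes follow the same shape: they override the abstract single-device method and inherit batching, instruction handling, and multi-device dispatch from the base class.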

Types

from typing import Union, List, Optional, Dict, Any, Callable, Tuple
import torch
import numpy as np

from FlagEmbedding import AbsEmbedder, AbsReranker

# Base class types
EmbedderInput = Union[str, List[str]]
EmbedderOutput = Union[torch.Tensor, np.ndarray]
RerankerInput = List[Tuple[str, str]]
RerankerOutput = np.ndarray

# Configuration types
DeviceList = Optional[List[str]]
InstructionFormat = str
ProcessPoolInfo = Dict[str, Any]
ProcessTarget = Callable[[Any], Any]

# Abstract base class references
AbstractEmbedder = AbsEmbedder
AbstractReranker = AbsReranker

Install with Tessl CLI

npx tessl i tessl/pypi-flagembedding
