tessl install tessl/pypi-chonkie@1.5.0

The lightweight ingestion library for fast, efficient and robust RAG pipelines.
Fluent API for building composable text processing workflows with component registration, recipe support, and end-to-end CHOMP pipeline orchestration.
Main class providing a fluent API for building and executing CHOMP pipelines.
from typing import Optional, Union, Any
class Pipeline:
    """
    Fluent API for building and executing CHOMP pipelines.

    Pipelines chain together:
    - Fetchers: Load data from sources
    - Chefs: Parse and preprocess text
    - Chunkers: Split text into chunks
    - Refineries: Post-process chunks
    - Porters: Export chunks to files/datasets
    - Handshakes: Send chunks to vector databases

    Note:
        Return annotations that name ``Pipeline`` or ``Document`` are written
        as strings so they are not evaluated while the class body is still
        being executed (the names are not bound at that point).
    """

    def __init__(self): ...

    def fetch_from(self, source_type: str, **kwargs: Any) -> "Pipeline":
        """
        Adds a fetcher step to the pipeline.

        Args:
            source_type: Fetcher alias (e.g., 'file')
            **kwargs: Arguments for the fetcher

        Returns:
            Self for method chaining
        """
        ...

    def process_with(self, chef_type: str, **kwargs: Any) -> "Pipeline":
        """
        Adds a chef (preprocessing) step to the pipeline.

        Args:
            chef_type: Chef alias (e.g., 'text', 'markdown', 'table')
            **kwargs: Arguments for the chef

        Returns:
            Self for method chaining
        """
        ...

    def chunk_with(self, chunker_type: str, **kwargs: Any) -> "Pipeline":
        """
        Adds a chunker step to the pipeline.

        Args:
            chunker_type: Chunker alias (e.g., 'token', 'sentence', 'recursive', 'semantic')
            **kwargs: Arguments for the chunker

        Returns:
            Self for method chaining
        """
        ...

    def refine_with(self, refinery_type: str, **kwargs: Any) -> "Pipeline":
        """
        Adds a refinery (post-processing) step to the pipeline.

        Args:
            refinery_type: Refinery alias (e.g., 'overlap', 'embeddings')
            **kwargs: Arguments for the refinery

        Returns:
            Self for method chaining
        """
        ...

    def export_with(self, porter_type: str, **kwargs: Any) -> "Pipeline":
        """
        Adds a porter (export) step to the pipeline.

        Args:
            porter_type: Porter alias (e.g., 'json', 'datasets')
            **kwargs: Arguments for the porter

        Returns:
            Self for method chaining
        """
        ...

    def store_in(self, handshake_type: str, **kwargs: Any) -> "Pipeline":
        """
        Adds a handshake (vector database) step to the pipeline.

        Args:
            handshake_type: Handshake alias (e.g., 'chroma', 'qdrant', 'pinecone')
            **kwargs: Arguments for the handshake

        Returns:
            Self for method chaining
        """
        ...

    def run(
        self,
        texts: Optional[Union[str, list[str]]] = None
    ) -> "Union[Document, list[Document]]":
        """
        Executes the pipeline.

        Args:
            texts: Optional input text(s). If None, pipeline must start with fetch_from()

        Returns:
            Processed Document or list of Documents
        """
        ...

    def reset(self) -> "Pipeline":
        """
        Clears all pipeline steps.

        Returns:
            Self for method chaining
        """
        ...

    def to_config(self, path: Optional[str] = None) -> list[dict[str, Any]]:
        """
        Exports pipeline configuration.

        Args:
            path: Optional file path to save configuration as JSON

        Returns:
            List of component configurations
        """
        ...

    def describe(self) -> str:
        """
        Returns human-readable pipeline description.

        Returns:
            String describing all pipeline steps
        """
        ...

    @classmethod
    def from_recipe(cls, name: str, path: Optional[str] = None) -> "Pipeline":
        """
        Creates pipeline from a predefined recipe.

        Args:
            name: Recipe name
            path: Optional path to custom recipe file

        Returns:
            Configured Pipeline instance
        """
        ...

    @classmethod
    def from_config(
        cls,
        config: Union[str, list[Union[tuple[Any, ...], dict[str, Any]]]]
    ) -> "Pipeline":
        """
        Creates pipeline from configuration.

        Args:
            config: Configuration as JSON file path or list of component configs.
                Each config item can be:
                - Tuple: (step_type, component_name, kwargs_dict)
                - Dict: {'type': step_type, 'component': component_name, **kwargs}
                step_type: 'fetch', 'process', 'chunk', 'refine', 'export', 'write'

        Returns:
            Configured Pipeline instance

        Example:
            Pipeline.from_config([
                ('chunk', 'token', {'chunk_size': 512}),
                ('refine', 'overlap', {'context_size': 50})
            ])
        """
        ...


# Usage examples:
from chonkie import Pipeline
# Basic pipeline
pipe = (
    Pipeline()
    .chunk_with("recursive", chunk_size=512)
    .refine_with("embeddings", embedding_model="all-MiniLM-L6-v2")
)
doc = pipe.run("Your text here...")

# Complex pipeline with multiple steps
pipe = (
    Pipeline()
    .fetch_from("file", dir="./documents", ext=[".txt", ".md"])
    .process_with("markdown")
    .chunk_with("recursive", recipe="markdown", chunk_size=1024)
    .chunk_with("semantic", threshold=0.75)
    .refine_with("overlap", context_size=128)
    .refine_with("embeddings", embedding_model="openai/text-embedding-3-small")
    .store_in("chroma", collection_name="my_docs")
    .export_with("json", file="output.jsonl")
)
# No texts are passed here: the pipeline starts with fetch_from()
docs = pipe.run()

# Save configuration
pipe.to_config("pipeline_config.json")

# Load from configuration
new_pipe = Pipeline.from_config("pipeline_config.json")

# Load from recipe
recipe_pipe = Pipeline.from_recipe("default")

# Describe pipeline
print(pipe.describe())

# Dataclass representing metadata about a pipeline component.
from dataclasses import dataclass
from typing import Any
@dataclass
class Component:
    """
    Metadata about a pipeline component.

    Attributes:
        name: Full class name of the component
        alias: Short alias used in pipeline methods
        component_class: The actual component class
        component_type: Type of component (fetcher, chef, chunker, etc.)
    """

    name: str
    alias: str
    component_class: type[Any]
    # Quoted forward reference: ComponentType is declared after this class,
    # so an unquoted annotation would fail when the class body is executed.
    component_type: "ComponentType"


# Enum defining the types of pipeline components.
from enum import Enum
class ComponentType(Enum):
    """
    Enum of pipeline component types.

    Each member's value is the lowercase name of the component kind.
    """

    FETCHER = "fetcher"
    CHEF = "chef"
    CHUNKER = "chunker"
    REFINERY = "refinery"
    PORTER = "porter"
    HANDSHAKE = "handshake"


# Global singleton registry for managing and discovering pipeline components.
# ComponentRegistry is a pre-instantiated object, not a class - use it directly
# without instantiation.
class ComponentRegistry:
    """
    Global singleton registry for managing pipeline components.

    This is a pre-instantiated object - use directly without calling ComponentRegistry().
    """

    @staticmethod
    def register_component(component: Component) -> None:
        """
        Registers a component in the registry.

        Args:
            component: Component to register
        """
        ...

    @staticmethod
    def get_fetcher(alias: str) -> Component:
        """
        Gets fetcher component by alias.

        Args:
            alias: Fetcher alias

        Returns:
            Component instance

        Raises:
            ValueError: If alias not found
        """
        ...

    @staticmethod
    def get_chef(alias: str) -> Component:
        """
        Gets chef component by alias.

        Args:
            alias: Chef alias

        Returns:
            Component instance

        Raises:
            ValueError: If alias not found
        """
        ...

    @staticmethod
    def get_chunker(alias: str) -> Component:
        """
        Gets chunker component by alias.

        Args:
            alias: Chunker alias

        Returns:
            Component instance

        Raises:
            ValueError: If alias not found
        """
        ...

    @staticmethod
    def get_refinery(alias: str) -> Component:
        """
        Gets refinery component by alias.

        Args:
            alias: Refinery alias

        Returns:
            Component instance

        Raises:
            ValueError: If alias not found
        """
        ...

    @staticmethod
    def get_porter(alias: str) -> Component:
        """
        Gets porter component by alias.

        Args:
            alias: Porter alias

        Returns:
            Component instance

        Raises:
            ValueError: If alias not found
        """
        ...

    @staticmethod
    def get_handshake(alias: str) -> Component:
        """
        Gets handshake component by alias.

        Args:
            alias: Handshake alias

        Returns:
            Component instance

        Raises:
            ValueError: If alias not found
        """
        ...

    @staticmethod
    def list_fetchers() -> list[str]:
        """
        Lists all registered fetcher aliases.

        Returns:
            List of fetcher aliases
        """
        ...

    @staticmethod
    def list_chefs() -> list[str]:
        """
        Lists all registered chef aliases.

        Returns:
            List of chef aliases
        """
        ...

    @staticmethod
    def list_chunkers() -> list[str]:
        """
        Lists all registered chunker aliases.

        Returns:
            List of chunker aliases
        """
        ...

    @staticmethod
    def list_refineries() -> list[str]:
        """
        Lists all registered refinery aliases.

        Returns:
            List of refinery aliases
        """
        ...

    @staticmethod
    def list_porters() -> list[str]:
        """
        Lists all registered porter aliases.

        Returns:
            List of porter aliases
        """
        ...

    @staticmethod
    def list_handshakes() -> list[str]:
        """
        Lists all registered handshake aliases.

        Returns:
            List of handshake aliases
        """
        ...


# Usage example:
from chonkie.pipeline import ComponentRegistry
# List available components
chunkers = ComponentRegistry.list_chunkers()
print(chunkers)  # ['token', 'sentence', 'recursive', 'semantic', ...]

refineries = ComponentRegistry.list_refineries()
print(refineries)  # ['overlap', 'embeddings']

# Get component info
component = ComponentRegistry.get_chunker("recursive")
print(component.name)  # 'RecursiveChunker'
print(component.component_type)  # ComponentType.CHUNKER

# Decorators for registering custom components with the pipeline system.
from typing import Callable, TypeVar
# Type variable for the decorated class: each decorator returns the same
# class type it receives.
T = TypeVar('T')


def pipeline_component(
    alias: str,
    component_type: ComponentType
) -> Callable[[type[T]], type[T]]:
    """
    Generic decorator for registering pipeline components.

    Args:
        alias: Short alias for the component
        component_type: Type of component

    Returns:
        Decorator function
    """
    ...
def fetcher(alias: str) -> Callable[[type[T]], type[T]]:
    """
    Decorator for registering a fetcher.

    Args:
        alias: Fetcher alias

    Returns:
        Decorator function
    """
    ...
def chef(alias: str) -> Callable[[type[T]], type[T]]:
    """
    Decorator for registering a chef.

    Args:
        alias: Chef alias

    Returns:
        Decorator function
    """
    ...
def chunker(alias: str) -> Callable[[type[T]], type[T]]:
    """
    Decorator for registering a chunker.

    Args:
        alias: Chunker alias

    Returns:
        Decorator function
    """
    ...
def refinery(alias: str) -> Callable[[type[T]], type[T]]:
    """
    Decorator for registering a refinery.

    Args:
        alias: Refinery alias

    Returns:
        Decorator function
    """
    ...
def porter(alias: str) -> Callable[[type[T]], type[T]]:
    """
    Decorator for registering a porter.

    Args:
        alias: Porter alias

    Returns:
        Decorator function
    """
    ...
def handshake(alias: str) -> Callable[[type[T]], type[T]]:
    """
    Decorator for registering a handshake.

    Args:
        alias: Handshake alias

    Returns:
        Decorator function
    """
    ...


# Usage example:
# Chunk is imported as well because the method annotations below refer to it.
from chonkie import BaseChunker, BaseRefinery, Chunk
from chonkie.pipeline import chunker, refinery


@chunker("custom")
class CustomChunker(BaseChunker):
    """Example chunker registered under the 'custom' alias."""

    def __init__(self, my_param: int = 10):
        super().__init__()
        self.my_param = my_param

    def chunk(self, text: str) -> list[Chunk]:
        # Custom chunking logic
        return []


@refinery("custom")
class CustomRefinery(BaseRefinery):
    """Example refinery registered under the 'custom' alias."""

    def refine(self, chunks: list[Chunk]) -> list[Chunk]:
        # Custom refinement logic
        return chunks


# Use in pipeline
from chonkie import Pipeline

pipe = (
    Pipeline()
    .chunk_with("custom", my_param=20)  # keyword arguments are passed on to the chunker
    .refine_with("custom")
)

# Pipeline components are available from the main package:
from chonkie import Pipeline
from chonkie.pipeline import (
Component,
ComponentType,
ComponentRegistry,
pipeline_component,
fetcher,
chef,
chunker,
refinery,
porter,
handshake,
)

file - FileFetcher

text - TextChef
markdown - MarkdownChef
table - TableChef

token - TokenChunker
sentence - SentenceChunker
recursive - RecursiveChunker
semantic - SemanticChunker
code - CodeChunker
late - LateChunker
slumber - SlumberChunker
neural - NeuralChunker
fast - FastChunker

overlap - OverlapRefinery
embeddings - EmbeddingsRefinery

json - JSONPorter
datasets - DatasetsPorter

chroma - ChromaHandshake
qdrant - QdrantHandshake
pinecone - PineconeHandshake
weaviate - WeaviateHandshake
milvus - MilvusHandshake
elastic - ElasticHandshake
mongodb - MongoDBHandshake
pgvector - PgvectorHandshake
turbopuffer - TurbopufferHandshake

from chonkie import Pipeline
# Coarse-to-fine chunking strategy
pipe = (
    Pipeline()
    .chunk_with("recursive", chunk_size=4096)  # Large chunks first
    .chunk_with("semantic", chunk_size=512)  # Then semantic split
)

from chonkie import Pipeline
# Different processing for different file types
markdown_pipe = (
    Pipeline()
    .process_with("markdown")
    .chunk_with("recursive", recipe="markdown")
)

text_pipe = (
    Pipeline()
    .process_with("text")
    .chunk_with("sentence")
)

from chonkie import Pipeline
# Export to multiple destinations
pipe = (
    Pipeline()
    .chunk_with("recursive")
    .refine_with("embeddings")
    .store_in("chroma", collection_name="docs")
    .store_in("qdrant", collection_name="docs")
    .export_with("json", file="backup.jsonl")
)

from chonkie import Pipeline
# Load the pre-configured pipeline from the "default" recipe
pipe = Pipeline.from_recipe("default")

# Customize after loading: further steps can be appended to the loaded pipeline
pipe.refine_with(
    "embeddings",
    embedding_model="openai/text-embedding-3-large",
)