Python wrapper for the Tavily API with search, extract, crawl, and map capabilities.
Combine Tavily's web search with local vector database queries for enhanced RAG applications. The TavilyHybridClient merges fresh web results from the Tavily API with relevant context from your existing document collections, supporting custom embedding generation, result ranking, and automatic storage of web results in your database.
class TavilyHybridClient:
    """Hybrid RAG client combining Tavily web search with a local vector database."""

    def __init__(
        self,
        api_key: Union[str, None],
        db_provider: Literal['mongodb'],
        collection,
        index: str,
        embeddings_field: str = 'embeddings',
        content_field: str = 'content',
        embedding_function: Optional[callable] = None,
        ranking_function: Optional[callable] = None
    ):
        """
        Initialize hybrid RAG client combining Tavily API with local database.

        Parameters:
        - api_key: Tavily API key (or None to use TAVILY_API_KEY env var)
        - db_provider: Database provider ("mongodb" only currently supported)
        - collection: MongoDB collection object for local search
        - index: Name of the vector search index in the collection
        - embeddings_field: Field name containing embeddings (default: 'embeddings')
        - content_field: Field name containing text content (default: 'content')
        - embedding_function: Custom embedding function (defaults to Cohere)
        - ranking_function: Custom ranking function (defaults to Cohere rerank)
        """

    # Perform combined searches across both local database and web sources
    # with intelligent ranking and optional result storage.
    def search(
        self,
        query: str,
        max_results: int = 10,
        max_local: int = None,
        max_foreign: int = None,
        save_foreign: bool = False,
        **kwargs
    ) -> list:
        """
        Perform hybrid search combining local database and Tavily API results.

        Parameters:
        - query: Search query string
        - max_results: Maximum number of final ranked results to return
        - max_local: Maximum results from local database (defaults to max_results)
        - max_foreign: Maximum results from Tavily API (defaults to max_results)
        - save_foreign: Whether to save Tavily results to local database
            - True: Save results as-is with content and embeddings
            - callable: Transform function to process results before saving
            - False: Don't save results
        - **kwargs: Additional parameters passed to Tavily search

        Returns:
        List of ranked search results containing:
        - content: Content text
        - score: Relevance score
        - origin: Source ("local" or "foreign")
        """

# Configure MongoDB with vector search capabilities:
import pymongo
from tavily import TavilyHybridClient

# Connect to MongoDB
client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["my_rag_database"]
collection = db["documents"]

# Create vector search index (run once)
collection.create_search_index({
    "name": "vector_index",
    "definition": {
        "fields": [
            {
                "type": "vector",
                "path": "embeddings",
                "numDimensions": 1024,  # Adjust based on your embedding model
                "similarity": "cosine"
            }
        ]
    }
})

# Initialize hybrid client
hybrid_client = TavilyHybridClient(
    api_key="tvly-YOUR_API_KEY",
    db_provider="mongodb",
    collection=collection,
    index="vector_index"
)

# Use custom embedding functions instead of the default Cohere integration:
from sentence_transformers import SentenceTransformer
import numpy as np

# Custom embedding function using sentence-transformers
model = SentenceTransformer('all-MiniLM-L6-v2')

def custom_embed_function(texts, input_type):
    """
    Custom embedding function compatible with TavilyHybridClient.

    Args:
        texts: List of text strings to embed
        input_type: 'search_query' or 'search_document'

    Returns:
        List of embedding vectors
    """
    # input_type is accepted for interface compatibility; this model
    # encodes queries and documents the same way.
    return model.encode(texts).tolist()
# Custom ranking function
def custom_ranking_function(query, documents, top_n):
    """
    Custom ranking function for result reordering.

    Args:
        query: Search query string
        documents: List of document dicts with 'content' field
        top_n: Number of top results to return

    Returns:
        List of reranked documents with 'score' field added
    """
    # Simple keyword-based scoring (replace with your ranking logic)
    terms = set(query.lower().split())

    def with_score(doc):
        scored = dict(doc)
        words = set(doc['content'].lower().split())
        scored['score'] = len(terms & words) / len(terms) if terms else 0
        return scored

    # Highest-overlap documents first (stable on ties), truncated to top_n
    ranked = sorted((with_score(d) for d in documents),
                    key=lambda d: d['score'], reverse=True)
    return ranked[:top_n]
# Initialize with custom functions
hybrid_client = TavilyHybridClient(
    api_key="tvly-YOUR_API_KEY",
    db_provider="mongodb",
    collection=collection,
    index="vector_index",
    embedding_function=custom_embed_function,
    ranking_function=custom_ranking_function
)

# Combine local knowledge with web search:
# Initialize hybrid client
hybrid_client = TavilyHybridClient(
    api_key="tvly-YOUR_API_KEY",
    db_provider="mongodb",
    collection=collection,
    index="vector_index"
)

# Perform hybrid search
results = hybrid_client.search(
    query="latest developments in quantum computing",
    max_results=10,
    max_local=5,    # Get up to 5 local results
    max_foreign=5   # Get up to 5 web results
)

# Process combined results
for result in results:
    print(f"Source: {result['origin']}")
    print(f"Score: {result['score']:.3f}")
    print(f"Content: {result['content'][:200]}...")
    print("---")

# Automatically expand your local knowledge base with relevant web content:
# Search and save web results to local database
results = hybrid_client.search(
    query="machine learning best practices",
    max_results=8,
    save_foreign=True,        # Save web results to database
    search_depth="advanced",
    topic="general"
)

print(f"Found {len(results)} total results")
# Counting booleans avoids building throwaway lists
local_count = sum(r['origin'] == 'local' for r in results)
foreign_count = sum(r['origin'] == 'foreign' for r in results)
print(f"Local: {local_count}, Web: {foreign_count}")

# Transform web results before saving to database:
from datetime import datetime, timezone

def process_web_result(result):
    """
    Custom function to process web results before saving to database.

    Args:
        result: Web search result dict with 'content', 'embeddings', etc.

    Returns:
        Dict to save to database, or None to skip saving
    """
    content = result['content']

    # Skip very short content before building any metadata
    if len(content) < 100:
        return None

    # Add metadata; timezone-aware timestamp (datetime.utcnow() is deprecated)
    return {
        'content': content,
        'embeddings': result['embeddings'],
        'source_url': result.get('url', ''),
        'added_date': datetime.now(timezone.utc),
        'content_type': 'web_search',
        'content_length': len(content)
    }
# Use custom processing
results = hybrid_client.search(
    query="renewable energy technologies",
    save_foreign=process_web_result,  # Use custom processing function
    max_results=10
)

# Create specialized RAG systems for specific domains:
# Medical RAG with domain filtering
medical_results = hybrid_client.search(
    query="treatment options for type 2 diabetes",
    max_results=12,
    max_local=8,      # Prioritize local medical knowledge
    max_foreign=4,    # Limited web results
    include_domains=[  # Focus on medical sources
        "pubmed.ncbi.nlm.nih.gov",
        "mayoclinic.org",
        "nejm.org",
        "bmj.com"
    ],
    save_foreign=True,
    search_depth="advanced"
)

# Legal RAG with case law focus
legal_results = hybrid_client.search(
    query="precedent for intellectual property disputes",
    max_results=10,
    include_domains=[
        "law.cornell.edu",
        "justia.com",
        "findlaw.com"
    ],
    # NOTE: process_legal_content is a user-defined processor (not shown here),
    # analogous to process_web_result above.
    save_foreign=process_legal_content,
    topic="general"
)

# Keep your knowledge base current with fresh web content:
def update_knowledge_base():
    """Periodically update knowledge base with fresh web content."""
    # Define topics of interest
    topics = [
        "artificial intelligence developments",
        "climate change research",
        "medical breakthroughs",
        "technology innovations"
    ]

    for topic in topics:
        print(f"Updating knowledge for: {topic}")

        # Search with time constraints to get recent content
        results = hybrid_client.search(
            query=topic,
            max_results=5,
            max_foreign=5,           # Only get web results
            max_local=0,             # Skip local results for updates
            time_range="week",       # Recent content only
            save_foreign=True,       # Save to expand knowledge base
            search_depth="advanced"
        )

        added = sum(r['origin'] == 'foreign' for r in results)
        print(f"Added {added} new documents")

# Run periodically (e.g., daily via cron job)
update_knowledge_base()

# Combine different types of content in your RAG system:
from datetime import datetime, timezone

def enhanced_document_processor(result):
    """
    Process web results with enhanced metadata extraction.

    Args:
        result: Web search result dict with 'content', 'embeddings', etc.

    Returns:
        Dict with 'content', 'embeddings', and a 'metadata' sub-dict.
    """
    content = result['content']

    # Basic content analysis
    word_count = len(content.split())
    has_code = '```' in content or 'def ' in content or 'class ' in content
    has_data = any(word in content.lower() for word in ['data', 'statistics', 'metrics'])

    return {
        'content': content,
        'embeddings': result['embeddings'],
        'metadata': {
            'source_url': result.get('url', ''),
            'word_count': word_count,
            # 'code' takes precedence when both code and data markers appear
            'content_type': 'code' if has_code else 'data' if has_data else 'text',
            # Timezone-aware timestamp (datetime.utcnow() is deprecated)
            'processed_date': datetime.now(timezone.utc),
            'embedding_model': 'cohere-embed-english-v3.0'
        }
    }
# Search with enhanced processing
results = hybrid_client.search(
    query="Python web scraping techniques",
    save_foreign=enhanced_document_processor,
    max_results=10,
    include_raw_content="markdown"  # Preserve formatting for code examples
)

# Handle errors in hybrid RAG operations:
from tavily import TavilyHybridClient, InvalidAPIKeyError
import pymongo.errors

try:
    # Initialize client
    hybrid_client = TavilyHybridClient(
        api_key="tvly-YOUR_API_KEY",
        db_provider="mongodb",
        collection=collection,
        index="vector_index"
    )

    # Perform search with error handling
    results = hybrid_client.search(
        query="example query",
        max_results=10,
        save_foreign=True
    )
except ValueError as e:
    # Handle database configuration errors
    print(f"Database configuration error: {e}")
except InvalidAPIKeyError:
    # Handle Tavily API key errors
    print("Invalid Tavily API key")
except pymongo.errors.PyMongoError as e:
    # Handle MongoDB errors
    print(f"Database error: {e}")
except Exception as e:
    # Handle unexpected errors
    print(f"Unexpected error: {e}")

# Optimize hybrid RAG performance:
# Balanced search configuration
optimized_results = hybrid_client.search(
    query="query",
    max_results=10,        # Reasonable result count
    max_local=7,           # Favor local results (faster)
    max_foreign=5,         # Limit web requests
    timeout=30,            # Reasonable timeout
    search_depth="basic"   # Faster web search
)

# Batch processing for multiple queries
queries = ["query1", "query2", "query3"]
all_results = []

for query in queries:
    try:
        results = hybrid_client.search(
            query=query,
            max_results=5,       # Smaller batches
            save_foreign=False   # Skip saving for batch processing
        )
        all_results.extend(results)
    except Exception as e:
        # Best-effort batch: report the failure and continue with remaining queries
        print(f"Failed to process query '{query}': {e}")
        continue

print(f"Processed {len(all_results)} total results from {len(queries)} queries")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-tavily-python