Python wrapper for the Tavily API with search, extract, crawl, and map capabilities.
The library offers full async/await support for every Tavily API operation, enabling high-performance concurrent requests and seamless integration with async frameworks such as FastAPI, aiohttp, and other asyncio-based applications.
The AsyncTavilyClient provides async versions of all synchronous operations with identical functionality and parameters.
class AsyncTavilyClient:
    """Asynchronous client for the Tavily API (search, extract, crawl, map).

    All methods are coroutines; parameters mirror the synchronous client.
    Parameters defaulting to ``None`` fall back to the API's server-side
    defaults, so they are annotated ``Optional[...]`` (the original stubs
    annotated them as bare ``Literal``/``int`` while defaulting to ``None``,
    which is a typing error).
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        company_info_tags: Sequence[str] = ("news", "general", "finance"),
        proxies: Optional[dict[str, str]] = None,
        api_base_url: Optional[str] = None
    ): ...

    # All search operations support async/await patterns for non-blocking execution.

    async def search(
        self,
        query: str,
        search_depth: Optional[Literal["basic", "advanced"]] = None,
        topic: Optional[Literal["general", "news", "finance"]] = None,
        time_range: Optional[Literal["day", "week", "month", "year"]] = None,
        max_results: Optional[int] = None,
        **kwargs
    ) -> dict: ...

    async def get_search_context(
        self,
        query: str,
        max_tokens: int = 4000,
        **kwargs
    ) -> str: ...

    async def qna_search(
        self,
        query: str,
        search_depth: Literal["basic", "advanced"] = "advanced",
        **kwargs
    ) -> str: ...

    async def get_company_info(
        self,
        query: str,
        search_depth: Literal["basic", "advanced"] = "advanced",
        max_results: int = 5,
        **kwargs
    ) -> Sequence[dict]: ...

    # Async versions of extract and crawl operations for efficient content processing.

    async def extract(
        self,
        urls: Union[List[str], str],
        extract_depth: Optional[Literal["basic", "advanced"]] = None,
        format: Optional[Literal["markdown", "text"]] = None,
        **kwargs
    ) -> dict: ...

    async def crawl(
        self,
        url: str,
        max_depth: Optional[int] = None,
        max_breadth: Optional[int] = None,
        instructions: Optional[str] = None,
        **kwargs
    ) -> dict: ...

    async def map(
        self,
        url: str,
        max_depth: Optional[int] = None,
        max_breadth: Optional[int] = None,
        instructions: Optional[str] = None,
        **kwargs
    ) -> dict: ...

import asyncio
from tavily import AsyncTavilyClient

async def basic_async_search():
    tavily = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    # Search without blocking the event loop.
    search_response = await tavily.search("What is machine learning?")
    print(search_response)

    # Extract page content asynchronously as well.
    extract_response = await tavily.extract(["https://example.com/article"])
    print(extract_response)

# Drive the coroutine from synchronous code.
asyncio.run(basic_async_search())

# Leverage async for concurrent API calls to improve performance:
import asyncio
from tavily import AsyncTavilyClient

async def concurrent_searches():
    client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")
    queries = [
        "artificial intelligence trends",
        "climate change solutions",
        "quantum computing advances",
        "renewable energy technology",
    ]

    # Fire off every search at once and wait for all of them together.
    results = await asyncio.gather(*(client.search(q) for q in queries))

    # Report each query alongside its result count.
    for query, result in zip(queries, results):
        print(f"Query: {query}")
        print(f"Results: {len(result.get('results', []))}")
        print("---")

asyncio.run(concurrent_searches())

# Process large batches of requests with controlled concurrency:
import asyncio
from tavily import AsyncTavilyClient

async def batch_extract_with_limit():
    client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        # ... many more URLs
    ]

    # A semaphore caps how many extract calls are in flight at once.
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests

    async def extract_with_limit(url):
        # Failures are converted into error dicts so one bad URL
        # never aborts the whole batch.
        async with semaphore:
            try:
                return await client.extract(url)
            except Exception as e:
                return {"error": str(e), "url": url}

    outcomes = await asyncio.gather(
        *(extract_with_limit(u) for u in urls),
        return_exceptions=True,
    )

    # Split outcomes into successes and failures.
    ok, bad = [], []
    for outcome in outcomes:
        if isinstance(outcome, Exception) or "error" in outcome:
            bad.append(outcome)
        else:
            ok.append(outcome)
    print(f"Successful extractions: {len(ok)}")
    print(f"Failed extractions: {len(bad)}")

asyncio.run(batch_extract_with_limit())

# Create processing pipelines with async operations:
import asyncio
from tavily import AsyncTavilyClient

async def search_extract_pipeline():
    client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    # Stage 1: search for relevant pages.
    search_result = await client.search(
        query="best practices for API design",
        max_results=10,
        search_depth="advanced",
    )

    # Stage 2: collect the URLs the search surfaced.
    urls = [hit['url'] for hit in search_result.get('results', [])]

    # Stage 3: fetch content for the top five URLs in parallel.
    extraction_results = await asyncio.gather(
        *(client.extract(u, format="markdown") for u in urls[:5]),
        return_exceptions=True,
    )

    # Stage 4: summarize what was extracted, skipping failures.
    processed_content = []
    for outcome in extraction_results:
        if not isinstance(outcome, dict) or 'results' not in outcome:
            continue
        for page in outcome['results']:
            processed_content.append({
                'url': page['url'],
                'title': page.get('title', 'No title'),
                'content_length': len(page.get('content', '')),
            })
    return processed_content

# Run pipeline
content_summary = asyncio.run(search_extract_pipeline())
for item in content_summary:
    print(f"Title: {item['title']}")
    print(f"URL: {item['url']}")
    print(f"Content Length: {item['content_length']} characters")
    print("---")

# Integrate AsyncTavilyClient with FastAPI applications:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from tavily import AsyncTavilyClient

app = FastAPI()
# One shared client: its connection pool is reused across requests.
client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

class SearchRequest(BaseModel):
    query: str
    max_results: int = 5

class ExtractRequest(BaseModel):
    urls: list[str]
    format: str = "markdown"

@app.post("/search")
async def search_endpoint(request: SearchRequest):
    # Surface any Tavily failure as a 500 with the error text.
    try:
        return await client.search(
            query=request.query,
            max_results=request.max_results,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/extract")
async def extract_endpoint(request: ExtractRequest):
    try:
        return await client.extract(
            urls=request.urls,
            format=request.format,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/company/{company_name}")
async def company_info_endpoint(company_name: str):
    try:
        return await client.get_company_info(company_name)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Use with aiohttp web applications:
from aiohttp import web
import json
from tavily import AsyncTavilyClient

# Handlers read the shared client back out of the application object.
async def search_handler(request):
    client = request.app['tavily_client']
    data = await request.json()
    try:
        return web.json_response(await client.search(data['query']))
    except Exception as e:
        return web.json_response({'error': str(e)}, status=500)

async def extract_handler(request):
    client = request.app['tavily_client']
    data = await request.json()
    try:
        return web.json_response(await client.extract(data['urls']))
    except Exception as e:
        return web.json_response({'error': str(e)}, status=500)

async def init_app():
    # Stash one shared client on the app so every handler reuses it.
    app = web.Application()
    app['tavily_client'] = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")
    app.router.add_post('/search', search_handler)
    app.router.add_post('/extract', extract_handler)
    return app

if __name__ == '__main__':
    web.run_app(init_app(), host='localhost', port=8080)

# Proper async error handling patterns:
import asyncio
from tavily import AsyncTavilyClient, InvalidAPIKeyError, UsageLimitExceededError, TimeoutError

async def robust_async_operations():
    client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    # Single call: map each Tavily error type to a distinct message,
    # most specific handlers first.
    try:
        result = await client.search("test query", timeout=30)
        print("Search successful")
    except TimeoutError:
        print("Search timed out")
    except UsageLimitExceededError:
        print("API usage limit exceeded")
    except InvalidAPIKeyError:
        print("Invalid API key")
    except Exception as e:
        print(f"Unexpected error: {e}")

    # Fan-out: return_exceptions=True hands failures back as values
    # instead of raising out of gather().
    queries = ["query1", "query2", "query3"]
    outcomes = await asyncio.gather(
        *(client.search(q) for q in queries),
        return_exceptions=True,
    )
    for index, outcome in enumerate(outcomes, start=1):
        if isinstance(outcome, Exception):
            print(f"Query {index} failed: {outcome}")
        else:
            print(f"Query {index} succeeded: {len(outcome.get('results', []))} results")

asyncio.run(robust_async_operations())

# The AsyncTavilyClient automatically manages HTTP connections efficiently:
# The client handles connection pooling internally
client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

# Multiple concurrent requests reuse connections
async def optimized_requests():
    # All requests share connection pool
    results = await asyncio.gather(
        *(client.search(f"query {i}") for i in range(10))
    )
    return results

# Handle large-scale async operations efficiently:
async def memory_efficient_processing():
    client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    # Process in chunks to manage memory
    urls = ["url1", "url2", "url3"]  # Imagine many URLs
    chunk_size = 10

    all_results = []
    for start in range(0, len(urls), chunk_size):
        batch = urls[start:start + chunk_size]
        batch_results = await asyncio.gather(
            *(client.extract(u) for u in batch),
            return_exceptions=True,
        )
        all_results.extend(batch_results)
        # Brief pause so consecutive chunks don't hammer the API.
        await asyncio.sleep(0.1)
    return all_results

# Use async context managers for proper resource cleanup:
import asyncio  # required by asyncio.run below; was missing from this snippet
from contextlib import asynccontextmanager
from tavily import AsyncTavilyClient

@asynccontextmanager
async def tavily_client_context(api_key):
    """Yield an AsyncTavilyClient, with a hook for explicit teardown.

    The finally block is where cleanup would go; the client currently
    manages its own connections, so nothing is needed there.
    """
    client = AsyncTavilyClient(api_key=api_key)
    try:
        yield client
    finally:
        # Cleanup if needed (client handles this automatically)
        pass

async def main():
    async with tavily_client_context("tvly-YOUR_API_KEY") as client:
        result = await client.search("example query")
        print(result)

asyncio.run(main())

# Install with Tessl CLI
npx tessl i tessl/pypi-tavily-python