CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-tavily-python

Python wrapper for the Tavily API with search, extract, crawl, and map capabilities

Overview
Eval results
Files

docs/async.md

Async Operations

Full async/await support for all Tavily API operations, enabling high-performance concurrent requests and seamless integration with async frameworks like FastAPI, aiohttp, and asyncio-based applications.

Capabilities

Async Client

The AsyncTavilyClient provides async versions of all synchronous operations with identical functionality and parameters.

class AsyncTavilyClient:
    """Asynchronous Tavily API client.

    Offers awaitable counterparts of the synchronous client's operations,
    taking the same parameters.
    """

    def __init__(
        self,
        # API key; the usage examples pass a "tvly-..." string.
        # NOTE(review): behaviour when left as None is not shown here —
        # presumably read from the environment; confirm.
        api_key: Optional[str] = None,
        # NOTE(review): presumably the topics queried by get_company_info — confirm.
        company_info_tags: Sequence[str] = ("news", "general", "finance"),
        # Optional proxy mapping. NOTE(review): key format (e.g. URL scheme)
        # is not shown here — confirm.
        proxies: Optional[dict[str, str]] = None,
        # Optional override of the API base URL.
        api_base_url: Optional[str] = None
    ): ...

Async Search Operations

All search operations support async/await patterns for non-blocking execution.

# Awaitable search for `query`; resolves to the API response as a dict.
# The usage examples read the hits from the response's 'results' list.
# NOTE(review): arguments left as None are presumably omitted from the request
# so that server-side defaults apply — confirm against the client source.
async def search(
    self,
    query: str,
    search_depth: Optional[Literal["basic", "advanced"]] = None,
    topic: Optional[Literal["general", "news", "finance"]] = None,
    time_range: Optional[Literal["day", "week", "month", "year"]] = None,
    max_results: Optional[int] = None,
    **kwargs
) -> dict: ...

# Awaitable; resolves to a string for `query`.
# NOTE(review): presumably search results condensed to roughly `max_tokens`
# tokens for use as LLM context — confirm.
async def get_search_context(
    self,
    query: str,
    max_tokens: int = 4000,
    **kwargs
) -> str: ...

# Awaitable; resolves to a string for `query`, using "advanced" search depth
# unless told otherwise.
# NOTE(review): presumably the string is a direct answer to the query — confirm.
async def qna_search(
    self,
    query: str,
    search_depth: Literal["basic", "advanced"] = "advanced",
    **kwargs
) -> str: ...

# Awaitable; resolves to a sequence of result dicts for `query`, with
# "advanced" depth and a limit of 5 results by default.
# NOTE(review): presumably searches the topics given as `company_info_tags`
# on the constructor — confirm.
async def get_company_info(
    self,
    query: str,
    search_depth: Literal["basic", "advanced"] = "advanced",
    max_results: int = 5,
    **kwargs
) -> Sequence[dict]: ...

Async Content Operations

Async versions of the extract, crawl, and map operations for efficient content processing.

# Awaitable content extraction; resolves to the API response as a dict.
# `urls` may be a single URL string or a list of URLs (the usage examples
# pass both forms).
# NOTE(review): arguments left as None are presumably omitted from the request
# so that server-side defaults apply — confirm.
async def extract(
    self,
    urls: Union[List[str], str],
    extract_depth: Optional[Literal["basic", "advanced"]] = None,
    format: Optional[Literal["markdown", "text"]] = None,
    **kwargs
) -> dict: ...

# Awaitable crawl starting from `url`; resolves to the API response as a dict.
# NOTE(review): `max_depth` / `max_breadth` presumably bound how far and how
# wide links are followed, and `instructions` is presumably natural-language
# guidance for the crawler — confirm against the Tavily API reference.
async def crawl(
    self,
    url: str,
    max_depth: Optional[int] = None,
    max_breadth: Optional[int] = None,
    instructions: Optional[str] = None,
    **kwargs
) -> dict: ...

# Awaitable site mapping starting from `url`; resolves to the API response as
# a dict. Takes the same parameters as `crawl`.
# NOTE(review): presumably returns discovered URLs rather than page content —
# confirm against the Tavily API reference.
async def map(
    self,
    url: str,
    max_depth: Optional[int] = None,
    max_breadth: Optional[int] = None,
    instructions: Optional[str] = None,
    **kwargs
) -> dict: ...

Basic Async Usage

Simple Async Operations

import asyncio
from tavily import AsyncTavilyClient

async def basic_async_search():
    """Run one search and then one extraction, printing each response."""
    tavily = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    # Each await suspends this coroutine until the response has arrived.
    search_response = await tavily.search("What is machine learning?")
    print(search_response)

    article_urls = ["https://example.com/article"]
    extract_response = await tavily.extract(article_urls)
    print(extract_response)

# Run the async function
asyncio.run(basic_async_search())

Concurrent Operations

Leverage async for concurrent API calls to improve performance:

import asyncio
from tavily import AsyncTavilyClient

async def concurrent_searches():
    """Issue several searches at once and print a result count per query."""
    tavily = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    topics = [
        "artificial intelligence trends",
        "climate change solutions",
        "quantum computing advances",
        "renewable energy technology",
    ]

    # gather() runs the coroutines concurrently and returns the responses in
    # the same order as the inputs, so they can be paired back with zip().
    responses = await asyncio.gather(*(tavily.search(topic) for topic in topics))

    for topic, response in zip(topics, responses):
        hits = response.get('results', [])
        print(f"Query: {topic}")
        print(f"Results: {len(hits)}")
        print("---")

asyncio.run(concurrent_searches())

Advanced Async Patterns

Batch Processing with Rate Limiting

Process large batches of requests with controlled concurrency:

import asyncio
from tavily import AsyncTavilyClient

async def batch_extract_with_limit():
    """Extract a list of URLs with at most five requests in flight at once.

    Every URL gets its own ``extract`` call. A call that raises is turned
    into an ``{"error": ..., "url": ...}`` record so that one bad URL does not
    abort the whole run. Prints how many extractions succeeded and failed.
    """
    client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        # ... many more URLs
    ]

    # Limit concurrent requests to avoid overwhelming the API
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests

    async def extract_with_limit(url):
        """Extract one URL while holding a semaphore slot."""
        async with semaphore:
            try:
                return await client.extract(url)
            except Exception as e:
                return {"error": str(e), "url": url}

    # All tasks are scheduled together; the semaphore is what throttles them.
    tasks = [extract_with_limit(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Ordinary failures already arrive as error dicts. With
    # return_exceptions=True a cancelled task is still returned as a
    # CancelledError, which is a BaseException rather than an Exception, so
    # anything that is not a plain success dict is counted as a failure.
    successful = []
    failed = []
    for outcome in results:
        if isinstance(outcome, dict) and "error" not in outcome:
            successful.append(outcome)
        else:
            failed.append(outcome)

    print(f"Successful extractions: {len(successful)}")
    print(f"Failed extractions: {len(failed)}")

asyncio.run(batch_extract_with_limit())

Pipeline Processing

Create processing pipelines with async operations:

import asyncio
from tavily import AsyncTavilyClient

async def search_extract_pipeline():
    """Search for pages, extract the top five, and summarise what came back.

    Returns:
        A list of dicts with the keys ``'url'``, ``'title'`` and
        ``'content_length'`` (number of characters extracted), one per
        successfully extracted page.
    """
    client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    # Stage 1: Search for relevant URLs
    search_result = await client.search(
        query="best practices for API design",
        max_results=10,
        search_depth="advanced"
    )
    hits = search_result.get('results', [])

    # Stage 2: Collect URLs, remembering each title. Extraction results do
    # not carry a title, so it has to come from the search hit.
    urls = [hit['url'] for hit in hits if hit.get('url')]
    titles_by_url = {hit['url']: hit.get('title') for hit in hits if hit.get('url')}

    # Stage 3: Extract content from URLs concurrently
    extraction_tasks = [client.extract(url, format="markdown") for url in urls[:5]]
    extraction_results = await asyncio.gather(*extraction_tasks, return_exceptions=True)

    # Stage 4: Process extracted content. Failed calls come back as exception
    # objects (return_exceptions=True) and are skipped by the dict check.
    processed_content = []
    for result in extraction_results:
        if isinstance(result, dict) and 'results' in result:
            for content in result['results']:
                url = content['url']
                # The extract API returns the page text under 'raw_content';
                # 'content' is kept as a fallback for other response shapes.
                body = content.get('raw_content') or content.get('content') or ''
                processed_content.append({
                    'url': url,
                    'title': content.get('title') or titles_by_url.get(url) or 'No title',
                    'content_length': len(body)
                })

    return processed_content

# Run pipeline
# asyncio.run() drives the coroutine to completion and returns its list of
# summary dicts, each carrying 'title', 'url' and 'content_length'.
content_summary = asyncio.run(search_extract_pipeline())
for item in content_summary:
    print(f"Title: {item['title']}")
    print(f"URL: {item['url']}")
    print(f"Content Length: {item['content_length']} characters")
    print("---")

Framework Integration

FastAPI Integration

Integrate AsyncTavilyClient with FastAPI applications:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from tavily import AsyncTavilyClient

app = FastAPI()
client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

class SearchRequest(BaseModel):
    """Request body accepted by the POST /search endpoint."""

    # Search text forwarded to the Tavily search call.
    query: str
    # Forwarded as max_results; 5 when the caller omits it.
    max_results: int = 5

class ExtractRequest(BaseModel):
    """Request body accepted by the POST /extract endpoint."""

    # URLs whose content should be extracted.
    urls: list[str]
    # Forwarded as the extract format; the extract signature lists
    # "markdown" and "text" as the accepted values.
    format: str = "markdown"

@app.post("/search")
async def search_endpoint(request: SearchRequest):
    """Forward the search request to Tavily and return its response.

    Any failure of the search call is reported as HTTP 500 with the
    exception text as the detail.
    """
    try:
        return await client.search(
            query=request.query,
            max_results=request.max_results,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

@app.post("/extract")
async def extract_endpoint(request: ExtractRequest):
    """Forward the extraction request to Tavily and return its response.

    Any failure of the extract call is reported as HTTP 500 with the
    exception text as the detail.
    """
    try:
        return await client.extract(
            urls=request.urls,
            format=request.format,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

@app.get("/company/{company_name}")
async def company_info_endpoint(company_name: str):
    """Look up company information for the name given in the URL path.

    Any failure of the lookup is reported as HTTP 500 with the exception
    text as the detail.
    """
    try:
        return await client.get_company_info(company_name)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

aiohttp Integration

Use with aiohttp web applications:

from aiohttp import web
import json
from tavily import AsyncTavilyClient

async def init_app():
    """Build the aiohttp application: one shared Tavily client plus routes."""
    application = web.Application()

    # Stored on the app so every handler reuses the same client instance.
    application['tavily_client'] = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    post_routes = (
        ('/search', search_handler),
        ('/extract', extract_handler),
    )
    for path, handler in post_routes:
        application.router.add_post(path, handler)

    return application

async def search_handler(request):
    """Handle ``POST /search``: run a Tavily search for the posted query.

    Expects a JSON object body with a non-empty ``query`` entry.

    Returns:
        The search response as JSON on success; a 400 JSON error when the
        body is not valid JSON or lacks ``query``; a 500 JSON error when the
        search itself fails.
    """
    client = request.app['tavily_client']

    # Validate input first so that client mistakes are answered with 400
    # instead of being reported as server errors.
    try:
        data = await request.json()
    except json.JSONDecodeError:
        return web.json_response(
            {'error': 'Request body must be valid JSON'},
            status=400
        )
    if not isinstance(data, dict) or not data.get('query'):
        return web.json_response(
            {'error': "Missing required field: 'query'"},
            status=400
        )

    try:
        result = await client.search(data['query'])
        return web.json_response(result)
    except Exception as e:
        return web.json_response(
            {'error': str(e)},
            status=500
        )

async def extract_handler(request):
    """Handle ``POST /extract``: extract content for the posted URLs.

    Expects a JSON object body with a non-empty ``urls`` entry (a URL string
    or a list of URLs).

    Returns:
        The extract response as JSON on success; a 400 JSON error when the
        body is not valid JSON or lacks ``urls``; a 500 JSON error when the
        extraction itself fails.
    """
    client = request.app['tavily_client']

    # Validate input first so that client mistakes are answered with 400
    # instead of being reported as server errors.
    try:
        data = await request.json()
    except json.JSONDecodeError:
        return web.json_response(
            {'error': 'Request body must be valid JSON'},
            status=400
        )
    if not isinstance(data, dict) or not data.get('urls'):
        return web.json_response(
            {'error': "Missing required field: 'urls'"},
            status=400
        )

    try:
        result = await client.extract(data['urls'])
        return web.json_response(result)
    except Exception as e:
        return web.json_response(
            {'error': str(e)},
            status=500
        )

# Start the server only when this file is executed as a script. The
# init_app() coroutine is handed to run_app, which serves on localhost:8080.
if __name__ == '__main__':
    web.run_app(init_app(), host='localhost', port=8080)

Error Handling in Async Code

Proper async error handling patterns:

import asyncio
from tavily import AsyncTavilyClient, InvalidAPIKeyError, UsageLimitExceededError, TimeoutError

async def robust_async_operations():
    """Show error handling for a single call and for a concurrent batch.

    The single search maps the specific Tavily errors to messages, with a
    catch-all last. The batch uses ``gather(return_exceptions=True)`` so that
    every query reports its own outcome instead of the first failure
    aborting the rest.
    """
    client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    # Handle individual operation errors, most specific handlers first.
    try:
        result = await client.search("test query", timeout=30)
        print("Search successful")
    except TimeoutError:
        print("Search timed out")
    except UsageLimitExceededError:
        print("API usage limit exceeded")
    except InvalidAPIKeyError:
        print("Invalid API key")
    except Exception as e:
        print(f"Unexpected error: {e}")

    # Handle concurrent operation errors
    queries = ["query1", "query2", "query3"]
    tasks = [client.search(query) for query in queries]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    for number, result in enumerate(results, start=1):
        # Test against BaseException: a cancelled task is returned as a
        # CancelledError, which does not derive from Exception and would
        # otherwise be treated as a successful response.
        if isinstance(result, BaseException):
            print(f"Query {number} failed: {result}")
        else:
            print(f"Query {number} succeeded: {len(result.get('results', []))} results")

asyncio.run(robust_async_operations())

Performance Optimization

Connection Pooling

The AsyncTavilyClient automatically manages HTTP connections efficiently:

# The client handles connection pooling internally
client = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

# Multiple concurrent requests reuse connections
async def optimized_requests():
    """Run ten searches concurrently through the module-level client.

    Returns the list of responses in query order.
    """
    pending = [client.search(f"query {n}") for n in range(10)]

    # Every request goes through the same shared client instance.
    return await asyncio.gather(*pending)

Memory Management

Handle large-scale async operations efficiently:

async def memory_efficient_processing():
    """Extract URLs one chunk at a time, pausing briefly after each chunk.

    Only ``chunk_size`` requests are in flight at once. Returns every
    outcome in URL order; failed calls appear as exception objects.
    """
    tavily = AsyncTavilyClient(api_key="tvly-YOUR_API_KEY")

    urls = ["url1", "url2", "url3"]  # Imagine many URLs
    chunk_size = 10

    collected = []
    start = 0
    while start < len(urls):
        batch = urls[start:start + chunk_size]
        outcomes = await asyncio.gather(
            *(tavily.extract(url) for url in batch),
            return_exceptions=True,
        )
        collected += outcomes

        # Short pause before the next chunk is started.
        await asyncio.sleep(0.1)
        start += chunk_size

    return collected

Context Managers and Cleanup

Use async context managers for proper resource cleanup:

import asyncio
from contextlib import asynccontextmanager

from tavily import AsyncTavilyClient

@asynccontextmanager
async def tavily_client_context(api_key):
    """Yield an AsyncTavilyClient built from ``api_key`` for ``async with`` use."""
    tavily = AsyncTavilyClient(api_key=api_key)
    try:
        yield tavily
    finally:
        # No teardown is performed here; this is the place to add it if the
        # client ever needs explicit cleanup.
        pass

async def main():
    """Run one search with a client from tavily_client_context and print it."""
    async with tavily_client_context("tvly-YOUR_API_KEY") as tavily:
        response = await tavily.search("example query")
        print(response)

asyncio.run(main())

Install with Tessl CLI

npx tessl i tessl/pypi-tavily-python

docs

async.md

content.md

hybrid-rag.md

index.md

mapping.md

search.md

tile.json