The official Python library for the Anthropic API
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Client configuration options, HTTP settings, timeout management, retry policies, and utility functions provide fine-grained control over SDK behavior and enable customization for different deployment scenarios.
Comprehensive configuration options for customizing client behavior, HTTP settings, and request handling.
class Anthropic:
    """Synchronous client for the Anthropic API (signature summary).

    Every constructor argument is keyword-only and optional.

    Args:
        api_key: API key used to authenticate requests.
        base_url: Override for the API host.
        timeout: Request timeout, given either as a number of seconds
            (for example ``30.0``) or as a ``Timeout`` object.
        max_retries: Retry budget for failed requests; ``0`` disables retries.
        default_headers: Headers added to every request.
        default_query: Query parameters added to every request.
        http_client: Pre-configured ``httpx.Client`` to send requests with.
    """

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        # Widened from Optional[Timeout]: a bare float is accepted too.
        timeout: Union[float, Timeout, None] = None,
        max_retries: Optional[int] = None,
        default_headers: Optional[Mapping[str, str]] = None,
        default_query: Optional[Mapping[str, object]] = None,
        http_client: Optional[httpx.Client] = None,
        **kwargs
    ): ...

    # Accessors used by the raw-response and streaming-response examples.
    @property
    def with_raw_response(self) -> AnthropicWithRawResponse: ...

    @property
    def with_streaming_response(self) -> AnthropicWithStreamedResponse: ...
class AsyncAnthropic:
    """Asynchronous client for the Anthropic API (signature summary).

    Mirrors the synchronous client's keyword-only constructor, except that
    ``http_client`` takes an ``httpx.AsyncClient``.

    Args:
        api_key: API key used to authenticate requests.
        base_url: Override for the API host.
        timeout: Request timeout, given either as a number of seconds or as
            a ``Timeout`` object.
        max_retries: Retry budget for failed requests; ``0`` disables retries.
        default_headers: Headers added to every request.
        default_query: Query parameters added to every request.
        http_client: Pre-configured ``httpx.AsyncClient`` to send requests with.
    """

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        # Widened from Optional[Timeout]: a bare float is accepted too.
        timeout: Union[float, Timeout, None] = None,
        max_retries: Optional[int] = None,
        default_headers: Optional[Mapping[str, str]] = None,
        default_query: Optional[Mapping[str, object]] = None,
        http_client: Optional[httpx.AsyncClient] = None,
        **kwargs
    ): ...

    # The async client exposes the Async* wrapper types, not the sync ones.
    @property
    def with_raw_response(self) -> AsyncAnthropicWithRawResponse: ...

    @property
    def with_streaming_response(self) -> AsyncAnthropicWithStreamedResponse: ...


# Default HTTP client implementations with different backends and customization options.
class DefaultHttpxClient:
    """Default synchronous HTTP client using httpx.

    Args:
        limits: Connection-pool limits.
        timeout: Timeout configuration.
        proxies: Legacy proxy mapping (see the comment on the parameter).
        proxy: Single proxy URL.
    """

    def __init__(
        self,
        *,
        limits: Optional[httpx.Limits] = None,
        timeout: Optional[httpx.Timeout] = None,
        # Legacy keyword: deprecated in httpx 0.26 and removed in httpx 0.28.
        proxies: Optional[httpx._types.ProxiesTypes] = None,
        # Keyword accepted by current httpx releases.
        proxy: Optional[str] = None,
        **kwargs
    ): ...
class DefaultAsyncHttpxClient:
    """Default asynchronous HTTP client using httpx.

    Args:
        limits: Connection-pool limits.
        timeout: Timeout configuration.
        proxies: Legacy proxy mapping (see the comment on the parameter).
        proxy: Single proxy URL.
    """

    def __init__(
        self,
        *,
        limits: Optional[httpx.Limits] = None,
        timeout: Optional[httpx.Timeout] = None,
        # Legacy keyword: deprecated in httpx 0.26 and removed in httpx 0.28.
        proxies: Optional[httpx._types.ProxiesTypes] = None,
        # Keyword accepted by current httpx releases.
        proxy: Optional[str] = None,
        **kwargs
    ): ...
class DefaultAioHttpClient:
    """Alternative async HTTP client using aiohttp.

    NOTE(review): the SDK's aiohttp-backed client may actually take
    httpx-style options (e.g. ``httpx.Timeout``) rather than
    ``aiohttp.ClientTimeout`` -- confirm against the installed SDK version.
    """

    def __init__(
        self,
        *,
        timeout: Optional[aiohttp.ClientTimeout] = None,
        connector: Optional[aiohttp.BaseConnector] = None,
        **kwargs
    ): ...


# Types for configuring individual requests and global client settings.
class RequestOptions(TypedDict, total=False):
    """Per-request overrides; ``total=False`` makes every key optional."""

    # Extra HTTP headers for this request.
    headers: Optional[Mapping[str, str]]
    # Retry budget for this request.
    max_retries: Optional[int]
    # Timeout for this request.
    timeout: Optional[Timeout]
    # Extra query parameters.
    params: Optional[Mapping[str, object]]
    # Extra JSON fields.
    extra_json: Optional[Mapping[str, object]]
class Timeout:
    """Timeout configuration for requests.

    The first value may be passed positionally; the per-phase values
    (``connect``, ``read``, ``write``, ``pool``) are keyword-only.

    NOTE(review): ``anthropic.Timeout`` appears to be a re-export of
    ``httpx.Timeout``, whose first parameter is named ``timeout`` rather
    than ``total`` -- pass it positionally until this is confirmed.
    """

    def __init__(
        self,
        total: Optional[float] = None,
        *,
        connect: Optional[float] = None,
        read: Optional[float] = None,
        write: Optional[float] = None,
        pool: Optional[float] = None
    ): ...
class Transport:
    """HTTP transport configuration (placeholder type with no members)."""
class ProxiesTypes:
    """Proxy configuration types (placeholder type with no members)."""


# Helper functions for file handling and request preparation.
def file_from_path(path: Union[str, Path]) -> Any:
    """Create a file object for uploads from a filesystem path.

    Args:
        path: Location of the file, as a string or a ``Path``.

    Returns:
        The file object to pass to an upload parameter (signature stub only).
    """
    ...


# Default configuration values and SDK constants.
# The SDK's default timeout is an httpx.Timeout object, not a bare float.
DEFAULT_TIMEOUT: httpx.Timeout
DEFAULT_MAX_RETRIES: int
DEFAULT_CONNECTION_LIMITS: httpx.Limits
HUMAN_PROMPT: str  # Legacy prompt marker
AI_PROMPT: str  # Legacy prompt marker
import os
from anthropic import Anthropic

# Basic configuration with API key
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)

# Configuration with custom base URL.
# The SDK appends the versioned path (e.g. "/v1/messages") itself, so the
# base URL must be the host root; a trailing "/v1" would double the prefix.
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    base_url="https://api.anthropic.com",  # Custom endpoint (host root only)
)

# Configuration with timeout settings
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=30.0,  # 30 second timeout
)
from anthropic import Anthropic, Timeout
import httpx

# Detailed timeout configuration.
# The first value is passed positionally: httpx.Timeout names it `timeout`
# (there is no `total` keyword), and it is the default for any phase not
# listed explicitly rather than a cap on the whole request.
timeout_config = Timeout(
    60.0,          # Default for phases not set below
    connect=10.0,  # Connection timeout
    read=30.0,     # Read timeout
    write=30.0,    # Write timeout
    pool=5.0,      # Pool timeout
)
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=timeout_config,
)

# Using httpx.Timeout directly
httpx_timeout = httpx.Timeout(
    timeout=45.0,
    connect=5.0,
    read=20.0,
    write=20.0,
    pool=2.0,
)
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    timeout=httpx_timeout,
)
# Configure retry behavior
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    max_retries=5,  # Retry up to 5 times on failure
)

# Disable retries
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    max_retries=0,
)

# Per-request retry override.
# `messages.create` has no `extra_options` parameter; `with_options` returns
# a copy of the client carrying the override for this call only.
message = client.with_options(max_retries=2).messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello!"}],
)
# Add custom headers
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    default_headers={
        "User-Agent": "MyApp/1.0",
        "X-Custom-Header": "custom-value",
    },
)

# Per-request headers.
# These go in the `extra_headers` keyword of the request method;
# `messages.create` has no `extra_options` parameter.
message = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello!"}],
    extra_headers={
        "X-Request-ID": "req-12345",
        "X-User-ID": "user-67890",
    },
)
import httpx
from anthropic import Anthropic, DefaultHttpxClient

# Option 1: the SDK's own httpx wrapper, tuned with explicit pool limits.
custom_limits = httpx.Limits(
    max_keepalive_connections=10,
    max_connections=20,
    keepalive_expiry=30.0,
)
http_client = DefaultHttpxClient(limits=custom_limits, timeout=httpx.Timeout(30.0))
client = Anthropic(
    http_client=http_client,
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)

# Option 2: a plain httpx.Client handed straight to the SDK.
custom_httpx_client = httpx.Client(
    headers={"User-Agent": "MyCustomApp/2.0"},
    timeout=httpx.Timeout(45.0),
    limits=httpx.Limits(max_connections=50),
)
client = Anthropic(
    http_client=custom_httpx_client,
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)
import httpx
from anthropic import Anthropic

# HTTP proxy, routed per URL scheme.
# httpx deprecated the `proxies=` keyword in 0.26 and removed it in 0.28;
# per-scheme routing is now expressed with `mounts=`, one transport per prefix.
proxies = {
    "http://": "http://proxy.example.com:8080",
    "https://": "https://proxy.example.com:8080",
}
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    http_client=httpx.Client(
        mounts={
            scheme: httpx.HTTPTransport(proxy=proxy_url)
            for scheme, proxy_url in proxies.items()
        }
    ),
)

# SOCKS proxy (requires the optional extra: `pip install "httpx[socks]"`)
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    http_client=httpx.Client(
        proxy="socks5://proxy.example.com:1080"
    ),
)

# Proxy with authentication
proxy_with_auth = "http://user:pass@proxy.example.com:8080"
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    http_client=httpx.Client(proxy=proxy_with_auth),
)
import asyncio
import httpx
from anthropic import AsyncAnthropic, DefaultAsyncHttpxClient, DefaultAioHttpClient

# Basic async client
async_client = AsyncAnthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY")
)

# Custom async httpx client
custom_async_client = DefaultAsyncHttpxClient(
    limits=httpx.Limits(max_connections=25),
    timeout=httpx.Timeout(60.0),
)
async_client = AsyncAnthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    http_client=custom_async_client,
)

# Using the aiohttp backend (requires `pip install "anthropic[aiohttp]"`).
# The timeout is an httpx.Timeout: `aiohttp` is never imported in this
# example, and the aiohttp-backed client takes httpx-style options.
aiohttp_client = DefaultAioHttpClient(
    timeout=httpx.Timeout(60.0)
)
async_client = AsyncAnthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
    http_client=aiohttp_client,
)
import os
from anthropic import Anthropic


class AnthropicConfig:
    """Configuration management for Anthropic client.

    Reads ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL, ANTHROPIC_TIMEOUT,
    ANTHROPIC_MAX_RETRIES and HTTP_PROXY from the environment once, when the
    object is constructed.

    Raises:
        ValueError: If ANTHROPIC_TIMEOUT or ANTHROPIC_MAX_RETRIES is set to
            a value that does not parse as a number.
    """

    def __init__(self):
        self.api_key = os.environ.get("ANTHROPIC_API_KEY")
        self.base_url = os.environ.get("ANTHROPIC_BASE_URL", "https://api.anthropic.com")
        self.timeout = float(os.environ.get("ANTHROPIC_TIMEOUT", "30"))
        self.max_retries = int(os.environ.get("ANTHROPIC_MAX_RETRIES", "3"))
        self.proxy = os.environ.get("HTTP_PROXY")

    def create_client(self) -> Anthropic:
        """Create configured Anthropic client.

        Returns:
            An ``Anthropic`` client built from the stored settings. A custom
            ``httpx.Client`` is attached only when a proxy URL was found.
        """
        # Build HTTP client with proxy if specified
        http_client = None
        if self.proxy:
            # Local import keeps this snippet self-contained.
            import httpx

            # `proxy=` replaces the `proxies=` keyword removed in httpx 0.28.
            http_client = httpx.Client(proxy=self.proxy)
        return Anthropic(
            api_key=self.api_key,
            base_url=self.base_url,
            timeout=self.timeout,
            max_retries=self.max_retries,
            http_client=http_client,
        )


# Usage with environment variables
config = AnthropicConfig()
client = config.create_client()
from anthropic import file_from_path
from pathlib import Path
import base64

# Create file object for upload
image_file = file_from_path("./image.jpg")
document_file = file_from_path(Path("./document.pdf"))

# An inline image block needs base64-encoded *text* in "data". The object
# returned by file_from_path is meant for upload parameters and cannot be
# embedded here, so the bytes are read and encoded explicitly.
image_data = base64.standard_b64encode(Path("./image.jpg").read_bytes()).decode("utf-8")

# Use in message with image
message = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": "image/jpeg",
                        "data": image_data,  # base64 string, not a file object
                    },
                },
                {
                    "type": "text",
                    "text": "What's in this image?",
                },
            ],
        }
    ],
)
import logging
from anthropic import Anthropic
import httpx


def create_production_client() -> Anthropic:
    """Create a production-ready Anthropic client.

    Side effect: calls ``logging.basicConfig(level=logging.INFO)``.

    Returns:
        An ``Anthropic`` client with 5 retries, explicit connection limits
        and per-phase timeouts, and production identification headers.
    """
    # Local import keeps this snippet self-contained.
    import os

    # Configure logging
    logging.basicConfig(level=logging.INFO)

    # Production HTTP client settings
    limits = httpx.Limits(
        max_keepalive_connections=20,  # Keep connections alive
        max_connections=100,           # Allow more concurrent connections
        keepalive_expiry=60.0,         # Keep connections for 60 seconds
    )
    # httpx.Timeout has no `total` keyword (passing one raises TypeError);
    # the first positional value is the default for phases not set below.
    timeout = httpx.Timeout(
        120.0,         # Default for any phase not listed explicitly
        connect=10.0,  # Quick connection timeout
        read=60.0,     # Allow time for long responses
        write=30.0,    # Reasonable write timeout
        pool=5.0,      # Quick pool timeout
    )

    # Create client with production settings
    client = Anthropic(
        api_key=os.environ.get("ANTHROPIC_API_KEY"),
        max_retries=5,  # More retries in production
        timeout=timeout,
        http_client=httpx.Client(
            limits=limits,
            timeout=timeout,
        ),
        default_headers={
            "User-Agent": "MyApp-Production/1.0",
            "X-Environment": "production",
        },
    )
    return client


# Usage
production_client = create_production_client()
import os
from anthropic import Anthropic


def create_anthropic_client(environment: str = "development") -> Anthropic:
    """Build an Anthropic client whose settings depend on the deployment tier.

    Args:
        environment: One of "development", "staging" or "production".

    Returns:
        A client with that tier's timeout, retry count and default headers.

    Raises:
        ValueError: If ``environment`` is not one of the three known tiers.
    """
    api_key = os.environ.get("ANTHROPIC_API_KEY")

    # One entry per tier; development favours fast feedback, production
    # favours resilience.
    tier_settings = {
        "development": {
            "timeout": 10.0,
            "max_retries": 1,
            "default_headers": {
                "X-Environment": "development",
                "X-Debug": "true",
            },
        },
        "staging": {
            "timeout": 30.0,
            "max_retries": 3,
            "default_headers": {
                "X-Environment": "staging",
            },
        },
        "production": {
            "timeout": 60.0,
            "max_retries": 5,
            "default_headers": {
                "X-Environment": "production",
                "User-Agent": "MyApp/1.0",
            },
        },
    }

    chosen = tier_settings.get(environment)
    if chosen is None:
        raise ValueError(f"Unknown environment: {environment}")
    return Anthropic(api_key=api_key, **chosen)


# Usage
env = os.environ.get("APP_ENV", "development")
client = create_anthropic_client(env)
import httpx
from anthropic import Anthropic

# One httpx.Client -- and therefore one connection pool -- serves every
# Anthropic instance created below.
pool_limits = httpx.Limits(
    max_keepalive_connections=50,
    max_connections=100,
)
shared_http_client = httpx.Client(limits=pool_limits, timeout=httpx.Timeout(30.0))

# Each client has its own API key but reuses the shared pool.
client1 = Anthropic(
    http_client=shared_http_client,
    api_key=os.environ.get("ANTHROPIC_API_KEY_1"),
)
client2 = Anthropic(
    http_client=shared_http_client,
    api_key=os.environ.get("ANTHROPIC_API_KEY_2"),
)

# The shared client is closed explicitly once all work is finished.
try:
    # `...` stands in for the real request arguments.
    response1 = client1.messages.create(...)
    response2 = client2.messages.create(...)
finally:
    shared_http_client.close()
# Access raw HTTP responses with headers and status codes for advanced debugging and response processing.
from anthropic import Anthropic

client = Anthropic()

# Same request as a normal call, sent through the raw-response accessor so
# the HTTP status code and headers are available alongside the body.
request_args = {
    "model": "claude-sonnet-4-20250514",
    "max_tokens": 1024,
    "messages": [{"role": "user", "content": "Hello!"}],
}
raw_response = client.with_raw_response.messages.create(**request_args)

# Inspect the HTTP-level details
print(f"Status Code: {raw_response.status_code}")
print(f"Headers: {raw_response.headers}")
print(f"Content: {raw_response.content}")

# Convert the raw response into the usual Message object
message = raw_response.parse()
# Access streaming responses with raw response details for advanced stream processing.
# Streaming with raw response access
with client.with_streaming_response.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a story"}],
    stream=True,
) as stream_response:
    # Access stream metadata
    print(f"Status: {stream_response.status_code}")
    print(f"Headers: {stream_response.headers}")
    # Process streaming content
    for chunk in stream_response.iter_lines():
        if chunk:
            print(chunk)

# Async streaming with raw response access.
# `async with` / `async for` are only valid inside a coroutine and need the
# asynchronous client, so the example is wrapped in a function and run on an
# event loop.
import asyncio

from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()


async def stream_story_async() -> None:
    """Stream one response through the async client, printing each non-empty line."""
    async with async_client.with_streaming_response.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a story"}],
        stream=True,
    ) as async_stream_response:
        async for chunk in async_stream_response.iter_lines():
            if chunk:
                print(chunk)


asyncio.run(stream_story_async())