The official Python library for the Anthropic API

Review status:
- Does it follow best practices? — Pending
- Impact — Pending (no eval scenarios have been run)
- The risk profile of this skill — Pending
The official Python library for the Anthropic API, providing convenient access to Claude AI models. This library offers both synchronous and asynchronous clients with comprehensive type definitions, streaming responses, message batching, tool use capabilities, and specialized integrations for AWS Bedrock and Google Vertex AI.
Installation:

pip install anthropic
pip install anthropic[aiohttp] for aiohttp support
pip install anthropic[bedrock] for AWS Bedrock
pip install anthropic[vertex] for Google Vertex AI

Basic imports:

from anthropic import Anthropic, AsyncAnthropic

For specialized integrations:

from anthropic import AnthropicBedrock, AsyncAnthropicBedrock
from anthropic import AnthropicVertex, AsyncAnthropicVertex

Synchronous usage example:

import os
from anthropic import Anthropic

# Synchronous client. Reading the key from the environment keeps the
# secret out of source code.
client = Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)

message = client.messages.create(
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": "Hello, Claude",
        }
    ],
    model="claude-sonnet-4-20250514",
)
print(message.content)

Asynchronous usage example:

import os
import asyncio
from anthropic import AsyncAnthropic

# Asynchronous client; every request method must be awaited.
client = AsyncAnthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)


async def main():
    message = await client.messages.create(
        max_tokens=1024,
        messages=[
            {
                "role": "user",
                "content": "Hello, Claude",
            }
        ],
        model="claude-sonnet-4-20250514",
    )
    print(message.content)
asyncio.run(main())

The SDK follows a resource-based API design pattern:

- A client object (Anthropic, AsyncAnthropic) that manages authentication and HTTP configuration
- Resource namespaces (messages, completions, models, beta)

Core conversational interface for interacting with Claude models, supporting multi-turn conversations, system prompts, tool use, streaming responses, and message batching.
# messages.create — synchronous variant. Creates a Message from a
# multi-turn conversation. max_tokens, messages, and model are required;
# everything after * is keyword-only.
def create(
    max_tokens: int,
    messages: List[MessageParam],
    model: str,
    *,
    metadata: Optional[MetadataParam] = None,
    service_tier: Optional[Literal["auto", "standard_only"]] = None,
    stop_sequences: Optional[List[str]] = None,
    stream: Optional[bool] = None,
    system: Optional[str] = None,
    temperature: Optional[float] = None,
    thinking: Optional[ThinkingConfigParam] = None,
    tool_choice: Optional[ToolChoiceParam] = None,
    tools: Optional[List[ToolParam]] = None,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
    **kwargs
) -> Message: ...
# messages.create — asynchronous variant; identical parameter shape to
# the synchronous form, but must be awaited.
async def create(
    max_tokens: int,
    messages: List[MessageParam],
    model: str,
    *,
    metadata: Optional[MetadataParam] = None,
    service_tier: Optional[Literal["auto", "standard_only"]] = None,
    stop_sequences: Optional[List[str]] = None,
    stream: Optional[bool] = None,
    system: Optional[str] = None,
    temperature: Optional[float] = None,
    thinking: Optional[ThinkingConfigParam] = None,
    tool_choice: Optional[ToolChoiceParam] = None,
    tools: Optional[List[ToolParam]] = None,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
    **kwargs
) -> Message: ...
def count_tokens(
messages: List[MessageParam],
model: str,
*,
system: Optional[str] = None,
tool_choice: Optional[ToolChoiceParam] = None,
tools: Optional[List[ToolParam]] = None,
**kwargs
) -> MessageTokensCount: ...Direct text completion interface for generating text from prompts, primarily used for specific use cases requiring the legacy completion format.
def create(
max_tokens_to_sample: int,
model: str,
prompt: str,
*,
metadata: Optional[MetadataParam] = None,
stop_sequences: Optional[List[str]] = None,
stream: Optional[bool] = None,
temperature: Optional[float] = None,
top_k: Optional[int] = None,
top_p: Optional[float] = None,
**kwargs
) -> Completion: ...Real-time message streaming with event handlers for processing partial responses, tool use events, and completion updates as they arrive.
class MessageStream:
    """Synchronous streaming wrapper.

    Used as a context manager; iterating yields MessageStreamEvent
    items. Handler registrars return a MessageStream (per the
    annotations), so registration calls can presumably be chained.
    """

    def __enter__(self) -> MessageStream: ...

    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

    def __iter__(self) -> Iterator[MessageStreamEvent]: ...

    def on_text(self, handler: Callable[[TextEvent], None]) -> MessageStream: ...

    def on_input_json(self, handler: Callable[[InputJsonEvent], None]) -> MessageStream: ...

    def on_message_stop(self, handler: Callable[[MessageStopEvent], None]) -> MessageStream: ...

    # Returns the accumulated final Message once the stream completes —
    # TODO confirm blocking semantics against the SDK docs.
    def get_final_message(self) -> Message: ...
class AsyncMessageStream:
async def __aenter__(self) -> AsyncMessageStream: ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
def __aiter__(self) -> AsyncIterator[MessageStreamEvent]: ...
def on_text(self, handler: Callable[[TextEvent], Awaitable[None]]) -> AsyncMessageStream: ...
def on_input_json(self, handler: Callable[[InputJsonEvent], Awaitable[None]]) -> AsyncMessageStream: ...
def on_message_stop(self, handler: Callable[[MessageStopEvent], Awaitable[None]]) -> AsyncMessageStream: ...
async def get_final_message(self) -> Message: ...Access to available Claude models, including model information, capabilities, and metadata for selecting appropriate models for different use cases.
def list(**kwargs) -> List[Model]: ...
async def list(**kwargs) -> List[Model]: ...Integration system for connecting Claude with external functions and APIs, enabling Claude to use tools, call functions, and interact with external systems.
class ToolParam(TypedDict):
    # Definition of a callable tool exposed to the model; input_schema
    # is a JSON-schema dict describing the expected input.
    name: str
    description: str
    input_schema: Dict[str, Any]


class ToolChoiceParam(TypedDict, total=False):
    # total=False: both keys optional. `name` is presumably only
    # meaningful when type == "tool" — TODO confirm against SDK docs.
    type: Literal["auto", "any", "none", "tool"]
    name: Optional[str]


class ToolUseBlock(TypedDict):
    # A tool invocation emitted by the model; `id` pairs it with the
    # eventual tool_result block.
    type: Literal["tool_use"]
    id: str
    name: str
    input: Dict[str, Any]


class ToolResultBlockParam(TypedDict):
    # Caller-supplied result for a prior tool_use block.
    type: Literal["tool_result"]
    tool_use_id: str
    content: Union[str, List[ContentBlockParam]]
    is_error: Optional[bool]

Efficient processing of multiple message requests in batches, providing cost optimization and throughput improvements for high-volume applications.
def create(
requests: List[Dict[str, Any]],
**kwargs
) -> Any: ...
def retrieve(batch_id: str, **kwargs) -> Any: ...
def list(**kwargs) -> Any: ...
def cancel(batch_id: str, **kwargs) -> Any: ...Experimental and preview features including advanced capabilities, new model features, and cutting-edge functionality.
class Beta:
messages: BetaMessages
models: BetaModels
files: BetaFiles
class AsyncBeta:
messages: AsyncBetaMessages
models: AsyncBetaModels
files: AsyncBetaFilesSpecialized client for accessing Claude models through Amazon Bedrock, with AWS authentication and Bedrock-specific configurations.
class AnthropicBedrock:
def __init__(
self,
*,
aws_access_key: Optional[str] = None,
aws_secret_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
aws_region: Optional[str] = None,
**kwargs
): ...
messages: Messages
completions: Completions
class AsyncAnthropicBedrock:
def __init__(
self,
*,
aws_access_key: Optional[str] = None,
aws_secret_key: Optional[str] = None,
aws_session_token: Optional[str] = None,
aws_region: Optional[str] = None,
**kwargs
): ...
messages: AsyncMessages
completions: AsyncCompletionsSpecialized client for accessing Claude models through Google Cloud Vertex AI, with Google Cloud authentication and Vertex-specific configurations.
class AnthropicVertex:
def __init__(
self,
*,
project_id: Optional[str] = None,
region: Optional[str] = None,
**kwargs
): ...
messages: Messages
completions: Completions
class AsyncAnthropicVertex:
def __init__(
self,
*,
project_id: Optional[str] = None,
region: Optional[str] = None,
**kwargs
): ...
messages: AsyncMessages
completions: AsyncCompletionsComprehensive exception hierarchy for handling API errors, network issues, authentication problems, and service-specific errors.
# Exception hierarchy. Everything derives from AnthropicError;
# APIStatusError is the base for the HTTP-status-specific errors below.
class AnthropicError(Exception): ...
class APIError(AnthropicError): ...
class APIStatusError(APIError): ...
class APITimeoutError(APIError): ...
class APIConnectionError(APIError): ...
class APIResponseValidationError(APIError): ...
# Status-specific subclasses (names presumably map to the usual HTTP
# codes: 400, 401, 403, 404, 409, 422, 429 — confirm against SDK docs).
class BadRequestError(APIStatusError): ...
class AuthenticationError(APIStatusError): ...
class PermissionDeniedError(APIStatusError): ...
class NotFoundError(APIStatusError): ...
class ConflictError(APIStatusError): ...
class UnprocessableEntityError(APIStatusError): ...
class RateLimitError(APIStatusError): ...
class InternalServerError(APIStatusError): ...

Client configuration options, HTTP settings, timeout management, retry policies, and utility functions for file handling and request customization.
class Anthropic:
    """Top-level synchronous client.

    All constructor parameters are keyword-only and optional; they
    configure authentication, base URL, timeouts, retries, and the
    underlying HTTP transport.
    """

    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        timeout: Optional[Timeout] = None,
        max_retries: Optional[int] = None,
        default_headers: Optional[Mapping[str, str]] = None,
        default_query: Optional[Mapping[str, object]] = None,
        http_client: Optional[httpx.Client] = None,
        **kwargs
    ): ...

    # Accessor variants exposing the raw HTTP response and the
    # streamed-response wrapper, per the return annotations.
    @property
    def with_raw_response(self) -> AnthropicWithRawResponse: ...

    @property
    def with_streaming_response(self) -> AnthropicWithStreamedResponse: ...


# Utility: builds an upload-ready object from a filesystem path —
# return type is opaque (Any) in this reference.
def file_from_path(path: Union[str, Path]) -> Any: ...