Asynchronous client for interacting with the LangSmith API. The AsyncClient provides async/await versions of most Client operations, enabling high-performance concurrent operations for applications using asyncio.
class AsyncClient:
    """Asynchronous client for interacting with the LangSmith API."""

    def __init__(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout_ms: Optional[Union[int, tuple[Optional[int], Optional[int], Optional[int], Optional[int]]]] = None,
        retry_config: Optional[Mapping[str, Any]] = None,
        web_url: Optional[str] = None,
        cache: Union[AsyncCache, bool] = False,
    ):
        """
        Create an asynchronous LangSmith client.

        Every parameter is optional.

        Parameters:
        - api_url: URL of the LangSmith API (defaults to LANGSMITH_ENDPOINT env var)
        - api_key: API key for authentication (defaults to LANGSMITH_API_KEY env var)
        - timeout_ms: Timeout in milliseconds as a single int, or a 4-tuple of
          optional ints.
          NOTE(review): the meaning of each tuple position is not stated in
          this reference; confirm against the client implementation.
        - retry_config: Mapping that configures retry behavior
        - web_url: URL of the LangSmith web UI
        - cache: AsyncCache instance, or True for the default cache (default: False)
        """

    async def __aenter__(self) -> "AsyncClient":
        """
        Enter async context manager (``async with AsyncClient() as client:``).

        Returns:
        Self
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit async context manager and cleanup resources.

        Parameters:
        - exc_type: Exception type supplied by the ``async with`` protocol
        - exc_val: Exception instance supplied by the ``async with`` protocol
        - exc_tb: Traceback supplied by the ``async with`` protocol
        """

    async def aclose(self) -> None:
        """
        Close the client and cleanup resources.

        Call this explicitly when the client is not used through ``async with``.
        """

from langsmith import AsyncClient
async def main():
    # The async context manager opens the client and cleans it up on exit.
    async with AsyncClient() as client:
        # Create a run
        await client.create_run(
            name="my-run",
            run_type="chain",
            inputs={"query": "test"}
        )
        # List the runs of a project (list_runs is consumed with `async for`)
        runs = [run async for run in client.list_runs(project_name="my-project")]
# Alternative without a context manager: the client must be closed explicitly.
# NOTE: `await` is only valid in an async context, so run this inside a coroutine.
client = AsyncClient()
try:
    await client.create_run(...)
finally:
    # Release the client's resources even if the call above raised.
    await client.aclose()

Asynchronous operations for managing runs (trace spans).
async def create_run(
    self,
    name: str,
    run_type: str,
    inputs: dict,
    *,
    execution_order: Optional[int] = None,
    child_runs: Optional[Sequence[Union[dict, RunTree]]] = None,
    parent_run_id: Optional[Union[str, UUID]] = None,
    project_name: Optional[str] = None,
    revision_id: Optional[Union[str, UUID]] = None,
    outputs: Optional[dict] = None,
    error: Optional[str] = None,
    reference_example_id: Optional[Union[str, UUID]] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    tags: Optional[list[str]] = None,
    extra: Optional[dict] = None,
    trace_id: Optional[Union[str, UUID]] = None,
    dotted_order: Optional[str] = None,
    run_id: Optional[Union[str, UUID]] = None,
    **kwargs
) -> None:
    """
    Asynchronously create a new run.

    name, run_type and inputs are required; every other parameter is
    keyword-only and optional.

    Parameters:
    - name: Name of the run
    - run_type: Type of run (e.g., "chain", "llm", "tool")
    - inputs: Input data for the run
    - execution_order: Order of execution in parent
    - child_runs: Child runs to include (dicts or RunTree objects)
    - parent_run_id: ID of parent run
    - project_name: Project/session to log to
    - revision_id: Revision/version identifier
    - outputs: Output data
    - error: Error message if run failed
    - reference_example_id: Dataset example ID for evaluation
    - start_time: When the run started
    - end_time: When the run ended
    - tags: List of tags
    - extra: Extra metadata
    - trace_id: Root trace ID
    - dotted_order: Dotted order string
    - run_id: Preset run ID
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    None
    """
async def update_run(
    self,
    run_id: Union[str, UUID],
    *,
    end_time: Optional[datetime] = None,
    error: Optional[str] = None,
    outputs: Optional[dict] = None,
    events: Optional[Sequence[dict]] = None,
    tags: Optional[list[str]] = None,
    extra: Optional[dict] = None,
    input_attachments: Optional[dict[str, Attachment]] = None,
    output_attachments: Optional[dict[str, Attachment]] = None,
    **kwargs
) -> None:
    """
    Asynchronously update an existing run.

    Only run_id is required; every other parameter is keyword-only and
    optional.

    Parameters:
    - run_id: ID of the run to update (str or UUID)
    - end_time: When the run ended
    - error: Error message if run failed
    - outputs: Output data
    - events: List of events
    - tags: Tags to add
    - extra: Extra metadata
    - input_attachments: Input file attachments, keyed by name
    - output_attachments: Output file attachments, keyed by name
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    None
    """
async def read_run(
    self,
    run_id: Union[str, UUID]
) -> Run:
    """
    Asynchronously read a run by ID.

    Parameters:
    - run_id: ID of the run (str or UUID)

    Returns:
    Run object
    """
async def list_runs(
    self,
    *,
    project_name: Optional[str] = None,
    project_id: Optional[Union[str, UUID]] = None,
    run_type: Optional[str] = None,
    dataset_name: Optional[str] = None,
    dataset_id: Optional[Union[str, UUID]] = None,
    reference_example_id: Optional[Union[str, UUID]] = None,
    trace_id: Optional[Union[str, UUID]] = None,
    parent_run_id: Optional[Union[str, UUID]] = None,
    start_time: Optional[datetime] = None,
    error: Optional[bool] = None,
    run_ids: Optional[list[Union[str, UUID]]] = None,
    filter: Optional[str] = None,
    trace_filter: Optional[str] = None,
    is_root: Optional[bool] = None,
    select: Optional[list[str]] = None,
    limit: Optional[int] = None,
    **kwargs
) -> AsyncIterator[Run]:
    """
    Asynchronously list runs with filtering.

    All parameters are keyword-only and optional.

    Parameters:
    - project_name: Filter by project name
    - project_id: Filter by project ID
    - run_type: Filter by run type
    - dataset_name: Filter by associated dataset
    - dataset_id: Filter by dataset ID
    - reference_example_id: Filter by example ID
    - trace_id: Filter by trace ID
    - parent_run_id: Filter by parent run
    - start_time: Filter runs after this time
    - error: Filter by error status
    - run_ids: Filter by specific run IDs
    - filter: Advanced filter string
    - trace_filter: Filter for root trace properties
    - is_root: Filter for root runs only
    - select: Fields to include in response
    - limit: Maximum number of runs to return
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    AsyncIterator of Run objects (consume with ``async for``)
    """
async def share_run(
    self,
    run_id: Union[str, UUID],
    *,
    share_id: Optional[Union[str, UUID]] = None
) -> str:
    """
    Asynchronously share a run.

    Parameters:
    - run_id: ID of the run to share (str or UUID)
    - share_id: Custom share ID (keyword-only, optional)

    Returns:
    Shareable URL
    """
async def run_is_shared(
    self,
    run_id: Union[str, UUID]
) -> bool:
    """
    Asynchronously check if a run is shared.

    Parameters:
    - run_id: ID of the run (str or UUID)

    Returns:
    True if shared, False otherwise
    """
async def read_run_shared_link(
    self,
    run_id: Union[str, UUID]
) -> Optional[str]:
    """
    Asynchronously get the shared link for a run.

    Parameters:
    - run_id: ID of the run (str or UUID)

    Returns:
    Shared URL or None if not shared
    """
async def unshare_run(
    self,
    run_id: Union[str, UUID]
) -> None:
    """
    Asynchronously unshare a run.

    Parameters:
    - run_id: ID of the run to unshare (str or UUID)

    Returns:
    None
    """

Asynchronous operations for managing projects.
async def create_project(
    self,
    project_name: str,
    *,
    description: Optional[str] = None,
    metadata: Optional[dict] = None,
    tags: Optional[list[str]] = None,
    reference_dataset_id: Optional[Union[str, UUID]] = None,
) -> TracerSession:
    """
    Asynchronously create a new project.

    Only project_name is required; every other parameter is keyword-only
    and optional.

    Parameters:
    - project_name: Name of the project
    - description: Project description
    - metadata: Project metadata
    - tags: Project tags
    - reference_dataset_id: Associated dataset ID

    Returns:
    Created TracerSession object
    """
async def read_project(
    self,
    project_name: Optional[str] = None,
    project_id: Optional[Union[str, UUID]] = None
) -> TracerSession:
    """
    Asynchronously read a project by name or ID.

    NOTE(review): both parameters default to None; presumably at least one
    of them must be supplied. Confirm against the implementation.

    Parameters:
    - project_name: Name of the project
    - project_id: ID of the project (alternative to name)

    Returns:
    TracerSession object
    """
async def list_projects(
    self,
    *,
    project_ids: Optional[list[Union[str, UUID]]] = None,
    name: Optional[str] = None,
    name_contains: Optional[str] = None,
    reference_dataset_id: Optional[Union[str, UUID]] = None,
    reference_dataset_name: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> AsyncIterator[TracerSession]:
    """
    Asynchronously list all projects.

    All parameters are keyword-only and optional.

    Parameters:
    - project_ids: Filter by specific project IDs
    - name: Filter by exact name
    - name_contains: Filter by name substring
    - reference_dataset_id: Filter by reference dataset ID
    - reference_dataset_name: Filter by reference dataset name
    - limit: Maximum number of projects
    - offset: Number of projects to skip

    Returns:
    AsyncIterator of TracerSession objects (consume with ``async for``)
    """
async def delete_project(
    self,
    project_name: Optional[str] = None,
    project_id: Optional[Union[str, UUID]] = None
) -> None:
    """
    Asynchronously delete a project.

    NOTE(review): both parameters default to None; presumably at least one
    of them must be supplied. Confirm against the implementation.

    Parameters:
    - project_name: Name of the project
    - project_id: ID of the project (alternative to name)

    Returns:
    None
    """
async def update_project(
    self,
    project_id: Union[str, UUID],
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    metadata: Optional[dict] = None,
    tags: Optional[list[str]] = None,
    end_time: Optional[datetime] = None,
) -> TracerSession:
    """
    Asynchronously update a project.

    Only project_id is required; every other parameter is keyword-only
    and optional.

    Parameters:
    - project_id: ID of the project (str or UUID)
    - name: New name
    - description: New description
    - metadata: New metadata
    - tags: New tags
    - end_time: Mark project as ended

    Returns:
    Updated TracerSession object
    """

Asynchronous operations for managing datasets.
async def create_dataset(
    self,
    dataset_name: str,
    *,
    description: Optional[str] = None,
    data_type: Optional[DataType] = None,
    inputs_schema: Optional[dict] = None,
    outputs_schema: Optional[dict] = None,
    metadata: Optional[dict] = None,
) -> Dataset:
    """
    Asynchronously create a new dataset.

    Only dataset_name is required; every other parameter is keyword-only
    and optional.

    Parameters:
    - dataset_name: Name of the dataset
    - description: Dataset description
    - data_type: Type of data (e.g., "kv", "llm", "chat"), given as a DataType
    - inputs_schema: JSON schema for inputs
    - outputs_schema: JSON schema for outputs
    - metadata: Dataset metadata

    Returns:
    Created Dataset object
    """
async def read_dataset(
    self,
    dataset_name: Optional[str] = None,
    dataset_id: Optional[Union[str, UUID]] = None
) -> Dataset:
    """
    Asynchronously read a dataset by name or ID.

    NOTE(review): both parameters default to None; presumably at least one
    of them must be supplied. Confirm against the implementation.

    Parameters:
    - dataset_name: Name of the dataset
    - dataset_id: ID of the dataset (alternative to name)

    Returns:
    Dataset object
    """
async def list_datasets(
    self,
    *,
    dataset_ids: Optional[list[Union[str, UUID]]] = None,
    dataset_name: Optional[str] = None,
    dataset_name_contains: Optional[str] = None,
    data_type: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> AsyncIterator[Dataset]:
    """
    Asynchronously list all datasets.

    All parameters are keyword-only and optional.

    Parameters:
    - dataset_ids: Filter by specific dataset IDs
    - dataset_name: Filter by exact name
    - dataset_name_contains: Filter by name substring
    - data_type: Filter by data type (a string in this signature)
    - limit: Maximum number of datasets
    - offset: Number of datasets to skip

    Returns:
    AsyncIterator of Dataset objects (consume with ``async for``)
    """
async def delete_dataset(
    self,
    dataset_id: Union[str, UUID]
) -> None:
    """
    Asynchronously delete a dataset.

    Parameters:
    - dataset_id: ID of the dataset (str or UUID)

    Returns:
    None
    """

Asynchronous operations for managing examples.
async def create_example(
    self,
    inputs: dict,
    *,
    dataset_id: Optional[Union[str, UUID]] = None,
    dataset_name: Optional[str] = None,
    outputs: Optional[dict] = None,
    metadata: Optional[dict] = None,
    split: Optional[Union[str, list[str]]] = None,
    example_id: Optional[Union[str, UUID]] = None,
    created_at: Optional[datetime] = None,
) -> Example:
    """
    Asynchronously create an example.

    Only inputs is required; every other parameter is keyword-only and
    optional.

    NOTE(review): dataset_id and dataset_name both default to None;
    presumably one of them must identify the target dataset. Confirm.

    Parameters:
    - inputs: Input data
    - dataset_id: ID of the dataset
    - dataset_name: Name of the dataset (alternative to ID)
    - outputs: Expected output data
    - metadata: Example metadata
    - split: Dataset split(s), as one string or a list of strings
    - example_id: Preset example ID
    - created_at: Creation timestamp

    Returns:
    Created Example object
    """
async def read_example(
    self,
    example_id: Union[str, UUID]
) -> Example:
    """
    Asynchronously read an example by ID.

    Parameters:
    - example_id: ID of the example (str or UUID)

    Returns:
    Example object
    """
async def list_examples(
    self,
    *,
    dataset_id: Optional[Union[str, UUID]] = None,
    dataset_name: Optional[str] = None,
    example_ids: Optional[list[Union[str, UUID]]] = None,
    splits: Optional[list[str]] = None,
    metadata: Optional[dict] = None,
    filter: Optional[str] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[Example]:
    """
    Asynchronously list examples with filtering.

    All parameters are keyword-only and optional.

    Parameters:
    - dataset_id: Filter by dataset ID
    - dataset_name: Filter by dataset name
    - example_ids: Filter by specific example IDs
    - splits: Filter by splits
    - metadata: Filter by metadata
    - filter: Advanced filter string
    - offset: Number of examples to skip
    - limit: Maximum number of examples

    Returns:
    AsyncIterator of Example objects (consume with ``async for``)
    """
async def update_example(
    self,
    example_id: Union[str, UUID],
    *,
    inputs: Optional[dict] = None,
    outputs: Optional[dict] = None,
    metadata: Optional[dict] = None,
    split: Optional[Union[str, list[str]]] = None,
) -> Example:
    """
    Asynchronously update an example.

    Only example_id is required; every other parameter is keyword-only
    and optional.

    Parameters:
    - example_id: ID of the example (str or UUID)
    - inputs: New input data
    - outputs: New output data
    - metadata: New metadata
    - split: New split assignment, as one string or a list of strings

    Returns:
    Updated Example object
    """
async def delete_example(
    self,
    example_id: Union[str, UUID]
) -> None:
    """
    Asynchronously delete an example.

    Parameters:
    - example_id: ID of the example (str or UUID)

    Returns:
    None
    """

Asynchronous operations for managing feedback.
async def create_feedback(
    self,
    run_id: Union[str, UUID],
    key: str,
    *,
    score: Optional[Union[int, float, bool]] = None,
    value: Optional[Union[int, float, bool, str, dict, list]] = None,
    correction: Optional[dict] = None,
    comment: Optional[str] = None,
    source_info: Optional[dict] = None,
    feedback_source_type: Optional[FeedbackSourceType] = None,
    source_run_id: Optional[Union[str, UUID]] = None,
    feedback_id: Optional[Union[str, UUID]] = None,
    **kwargs
) -> Feedback:
    """
    Asynchronously create feedback for a run.

    run_id and key are required; every other parameter is keyword-only
    and optional.

    Parameters:
    - run_id: ID of the run to give feedback on (str or UUID)
    - key: Feedback key/metric name
    - score: Numeric score (int, float or bool)
    - value: Non-numeric value
    - correction: Correction data
    - comment: Text comment
    - source_info: Information about feedback source
    - feedback_source_type: Type of feedback source
    - source_run_id: ID of evaluator run
    - feedback_id: Preset feedback ID
    - **kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    Created Feedback object
    """
async def read_feedback(
    self,
    feedback_id: Union[str, UUID]
) -> Feedback:
    """
    Asynchronously read feedback by ID.

    Parameters:
    - feedback_id: ID of the feedback (str or UUID)

    Returns:
    Feedback object
    """
async def list_feedback(
    self,
    *,
    run_ids: Optional[list[Union[str, UUID]]] = None,
    feedback_keys: Optional[list[str]] = None,
    feedback_source_types: Optional[list[FeedbackSourceType]] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> AsyncIterator[Feedback]:
    """
    Asynchronously list feedback with filtering.

    All parameters are keyword-only and optional.

    Parameters:
    - run_ids: Filter by run IDs
    - feedback_keys: Filter by feedback keys
    - feedback_source_types: Filter by source types
    - limit: Maximum number of feedback items
    - offset: Number of feedback items to skip

    Returns:
    AsyncIterator of Feedback objects (consume with ``async for``)
    """
async def delete_feedback(
    self,
    feedback_id: Union[str, UUID]
) -> None:
    """
    Asynchronously delete feedback.

    Parameters:
    - feedback_id: ID of the feedback (str or UUID)

    Returns:
    None
    """
async def create_presigned_feedback_token(
    self,
    run_id: Union[str, UUID],
    feedback_key: str,
    *,
    expiration: Optional[datetime] = None,
    feedback_config: Optional[FeedbackConfig] = None,
) -> FeedbackIngestToken:
    """
    Asynchronously create a presigned feedback token.

    run_id and feedback_key are required; expiration and feedback_config
    are keyword-only and optional.

    Parameters:
    - run_id: ID of the run (str or UUID)
    - feedback_key: Feedback key/metric name
    - expiration: When the token expires
    - feedback_config: Feedback configuration

    Returns:
    FeedbackIngestToken object
    """
async def create_feedback_from_token(
    self,
    token: Union[str, UUID],
    *,
    score: Optional[Union[int, float, bool]] = None,
    value: Optional[Union[int, float, bool, str, dict, list]] = None,
    correction: Optional[dict] = None,
    comment: Optional[str] = None,
) -> None:
    """
    Asynchronously create feedback using a presigned token.

    Only token is required; every other parameter is keyword-only and
    optional.

    Parameters:
    - token: Presigned feedback token (str or UUID)
    - score: Numeric score (int, float or bool)
    - value: Non-numeric value
    - correction: Correction data
    - comment: Text comment

    Returns:
    None
    """

Asynchronous operations for managing annotation queues.
async def list_annotation_queues(
    self,
    *,
    queue_ids: Optional[list[Union[str, UUID]]] = None,
    name: Optional[str] = None,
    name_contains: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> AsyncIterator[AnnotationQueue]:
    """
    Asynchronously list annotation queues.

    All parameters are keyword-only and optional.

    Parameters:
    - queue_ids: Filter by queue IDs
    - name: Filter by exact name
    - name_contains: Filter by name substring
    - limit: Maximum number of queues
    - offset: Number of queues to skip

    Returns:
    AsyncIterator of AnnotationQueue objects (consume with ``async for``)
    """
async def create_annotation_queue(
    self,
    *,
    name: str,
    description: Optional[str] = None,
    queue_id: Optional[Union[str, UUID]] = None,
) -> AnnotationQueue:
    """
    Asynchronously create an annotation queue.

    All parameters are keyword-only; name is required.

    Parameters:
    - name: Name of the queue
    - description: Queue description
    - queue_id: Preset queue ID

    Returns:
    Created AnnotationQueue object
    """
async def read_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    *,
    queue_name: Optional[str] = None,
) -> AnnotationQueue:
    """
    Asynchronously read an annotation queue.

    NOTE(review): queue_id is a required parameter in this signature even
    though queue_name is described as an alternative. Confirm how the two
    interact.

    Parameters:
    - queue_id: ID of the queue (str or UUID)
    - queue_name: Name of the queue (alternative); keyword-only, optional

    Returns:
    AnnotationQueue object
    """
async def update_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
) -> AnnotationQueue:
    """
    Asynchronously update an annotation queue.

    Only queue_id is required; name and description are keyword-only and
    optional.

    Parameters:
    - queue_id: ID of the queue (str or UUID)
    - name: New name
    - description: New description

    Returns:
    Updated AnnotationQueue object
    """
async def delete_annotation_queue(
    self,
    queue_id: Union[str, UUID]
) -> None:
    """
    Asynchronously delete an annotation queue.

    Parameters:
    - queue_id: ID of the queue (str or UUID)

    Returns:
    None
    """
async def add_runs_to_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    run_ids: list[Union[str, UUID]]
) -> None:
    """
    Asynchronously add runs to an annotation queue.

    Parameters:
    - queue_id: ID of the queue (str or UUID)
    - run_ids: List of run IDs to add

    Returns:
    None
    """
async def delete_run_from_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    queue_run_id: Union[str, UUID]
) -> None:
    """
    Asynchronously remove run from annotation queue.

    Parameters:
    - queue_id: ID of the queue (str or UUID)
    - queue_run_id: Queue run ID to remove (str or UUID)

    Returns:
    None
    """
async def get_run_from_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    *,
    offset: int = 0,
) -> RunWithAnnotationQueueInfo:
    """
    Asynchronously get the next run from an annotation queue.

    Parameters:
    - queue_id: ID of the queue (str or UUID)
    - offset: Offset in the queue (keyword-only, default: 0)

    Returns:
    RunWithAnnotationQueueInfo object
    """

Asynchronous operations for dataset similarity search.
async def index_dataset(
    self,
    dataset_id: Union[str, UUID],
    *,
    tag: Optional[str] = None,
) -> None:
    """
    Asynchronously index a dataset for similarity search.

    Parameters:
    - dataset_id: ID of the dataset (str or UUID)
    - tag: Version tag to index (keyword-only, optional)

    Returns:
    None
    """
async def sync_indexed_dataset(
    self,
    dataset_id: Union[str, UUID],
    *,
    tag: Optional[str] = None,
) -> None:
    """
    Asynchronously sync indexed dataset with latest changes.

    Parameters:
    - dataset_id: ID of the dataset (str or UUID)
    - tag: Version tag to sync (keyword-only, optional)

    Returns:
    None
    """
async def similar_examples(
    self,
    inputs: dict,
    *,
    dataset_id: Union[str, UUID],
    limit: int = 5,
    filter: Optional[str] = None,
) -> list[Example]:
    """
    Asynchronously find similar examples in a dataset.

    inputs and dataset_id are required; dataset_id, limit and filter are
    keyword-only.

    Parameters:
    - inputs: Input to find similar examples for
    - dataset_id: ID of the dataset (str or UUID)
    - limit: Maximum number of similar examples (default: 5)
    - filter: Filter string for examples

    Returns:
    List of similar Example objects
    """

Asynchronous operations for managing prompts.
async def list_prompts(
    self,
    *,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    query: Optional[str] = None,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = None,
    sort_field: Optional[str] = None,
    sort_direction: Optional[str] = None,
) -> list[Prompt]:
    """
    Asynchronously list available prompts.

    All parameters are keyword-only and optional. Unlike the other list_*
    methods in this reference, the annotated return type is a list rather
    than an AsyncIterator.

    Parameters:
    - limit: Maximum number of prompts
    - offset: Number of prompts to skip
    - query: Search query
    - is_public: Filter by public status
    - is_archived: Filter by archived status
    - sort_field: Field to sort by
    - sort_direction: Sort direction

    Returns:
    List of Prompt objects
    """
async def get_prompt(
    self,
    prompt_identifier: str
) -> Prompt:
    """
    Asynchronously get a specific prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier

    Returns:
    Prompt object
    """
async def create_prompt(
    self,
    prompt_name: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[list[str]] = None,
    is_public: Optional[bool] = None,
) -> Prompt:
    """
    Asynchronously create a new prompt.

    Only prompt_name is required; every other parameter is keyword-only
    and optional.

    Parameters:
    - prompt_name: Name of the prompt
    - description: Prompt description
    - readme: Markdown readme
    - tags: Prompt tags
    - is_public: Whether prompt is public

    Returns:
    Created Prompt object
    """
async def create_commit(
    self,
    prompt_identifier: str,
    *,
    object: Any,
    parent_commit: Optional[str] = None,
) -> dict:
    """
    Asynchronously create a prompt commit.

    prompt_identifier and object are required; object and parent_commit
    are keyword-only.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - object: Prompt object to commit
    - parent_commit: Parent commit hash (optional)

    Returns:
    Commit info dictionary
    """
async def update_prompt(
    self,
    prompt_identifier: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[list[str]] = None,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = None,
) -> Prompt:
    """
    Asynchronously update a prompt.

    Only prompt_identifier is required; every other parameter is
    keyword-only and optional.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - description: New description
    - readme: New readme
    - tags: New tags
    - is_public: New public status
    - is_archived: New archived status

    Returns:
    Updated Prompt object
    """
async def delete_prompt(
    self,
    prompt_identifier: str
) -> None:
    """
    Asynchronously delete a prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier

    Returns:
    None
    """
async def like_prompt(
    self,
    prompt_identifier: str
) -> None:
    """
    Asynchronously like a prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier

    Returns:
    None
    """
async def unlike_prompt(
    self,
    prompt_identifier: str
) -> None:
    """
    Asynchronously unlike a prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier

    Returns:
    None
    """
async def pull_prompt_commit(
    self,
    prompt_identifier: str,
    *,
    commit_hash: str,
) -> dict:
    """
    Asynchronously pull a specific prompt commit.

    Both parameters are required; commit_hash is keyword-only.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - commit_hash: Commit hash to pull

    Returns:
    Prompt commit data (a dict)
    """
async def list_prompt_commits(
    self,
    prompt_identifier: str,
    *,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> list[dict]:
    """
    Asynchronously list prompt commits.

    Only prompt_identifier is required; limit and offset are keyword-only
    and optional.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - limit: Maximum number of commits
    - offset: Number of commits to skip

    Returns:
    List of commit info dictionaries
    """
async def pull_prompt(
    self,
    prompt_identifier: str,
    *,
    include_model: bool = False,
) -> Any:
    """
    Asynchronously pull a prompt from the hub.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - include_model: Whether to include model config (keyword-only,
      default: False)

    Returns:
    Prompt object (annotated as Any in this signature)
    """
async def push_prompt(
    self,
    prompt_identifier: str,
    *,
    object: Any,
    parent_commit: Optional[str] = None,
    is_public: Optional[bool] = None,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[list[str]] = None,
) -> str:
    """
    Asynchronously push a prompt to the hub.

    prompt_identifier and object are required; every parameter after
    prompt_identifier is keyword-only, and all but object are optional.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - object: Prompt object to push
    - parent_commit: Parent commit hash
    - is_public: Whether prompt is public
    - description: Prompt description
    - readme: Markdown readme
    - tags: Prompt tags

    Returns:
    Prompt URL
    """