or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-client.md caching.md client.md evaluation.md index.md run-management.md schemas.md testing.md tracing.md utilities.md
README.md tile.json

docs/async-client.md

Async Client API

Asynchronous client for interacting with the LangSmith API. The AsyncClient provides async/await versions of most Client operations, enabling high-performance concurrent operations for applications using asyncio.

AsyncClient Class

class AsyncClient:
    """
    Asynchronous client for interacting with the LangSmith API.

    Can be used as an async context manager (``async with AsyncClient() as
    client``) or closed explicitly with ``aclose()``.
    """
    def __init__(
        self,
        api_url: Optional[str] = None,
        api_key: Optional[str] = None,
        timeout_ms: Optional[Union[int, tuple[Optional[int], Optional[int], Optional[int], Optional[int]]]] = None,
        retry_config: Optional[Mapping[str, Any]] = None,
        web_url: Optional[str] = None,
        cache: Union[AsyncCache, bool] = False,
    ):
        """
        Create an asynchronous LangSmith client.

        Every argument is optional.

        Parameters:
        - api_url: URL of the LangSmith API (defaults to LANGSMITH_ENDPOINT env var)
        - api_key: API key for authentication (defaults to LANGSMITH_API_KEY env var)
        - timeout_ms: Timeout in milliseconds, given either as a single int or
          as a 4-tuple of optional ints (the meaning of each tuple slot is not
          stated in this reference; TODO confirm against the implementation)
        - retry_config: Mapping of settings that configure retry behavior
        - web_url: URL of the LangSmith web UI
        - cache: AsyncCache instance, or True for the default cache
          (defaults to False; presumably this disables caching, TODO confirm)
        """

    async def __aenter__(self) -> "AsyncClient":
        """
        Enter the async context manager.

        Returns:
        The client itself, so it can be bound with ``async with ... as client``
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit the async context manager and clean up resources.

        Parameters:
        - exc_type, exc_val, exc_tb: Exception details supplied by the
          ``async with`` statement
        """

    async def aclose(self) -> None:
        """
        Close the client and clean up resources.

        Call this explicitly when the client is not used via ``async with``.
        """

Usage Example

from langsmith import AsyncClient

async def main():
    """Create a run, then list a project's runs, using the context-manager form."""
    # Leaving the `async with` block cleans up the client's resources.
    async with AsyncClient() as client:
        # Create a run
        await client.create_run(
            name="my-run",
            run_type="chain",
            inputs={"query": "test"}
        )

        # List the runs in a project; list_runs is consumed with `async for`
        runs = [run async for run in client.list_runs(project_name="my-project")]


# Alternative without context manager
async def main_without_context_manager():
    """Use the client without `async with`, closing it explicitly."""
    # `await` is only valid inside a coroutine, so this variant lives in its
    # own async function instead of at module level.
    client = AsyncClient()
    try:
        await client.create_run(...)
    finally:
        # Always release the client's resources, even if create_run fails.
        await client.aclose()

Run Operations

Asynchronous operations for managing runs (trace spans).

async def create_run(
    self,
    name: str,
    run_type: str,
    inputs: dict,
    *,
    execution_order: Optional[int] = None,
    child_runs: Optional[Sequence[Union[dict, RunTree]]] = None,
    parent_run_id: Optional[Union[str, UUID]] = None,
    project_name: Optional[str] = None,
    revision_id: Optional[Union[str, UUID]] = None,
    outputs: Optional[dict] = None,
    error: Optional[str] = None,
    reference_example_id: Optional[Union[str, UUID]] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    tags: Optional[list[str]] = None,
    extra: Optional[dict] = None,
    trace_id: Optional[Union[str, UUID]] = None,
    dotted_order: Optional[str] = None,
    run_id: Optional[Union[str, UUID]] = None,
    **kwargs
) -> None:
    """
    Asynchronously create a new run.

    Only name, run_type and inputs are required. Every other named
    parameter is keyword-only and defaults to None.

    Parameters:
    - name: Name of the run
    - run_type: Type of run (e.g., "chain", "llm", "tool")
    - inputs: Input data for the run
    - execution_order: Order of execution in parent
    - child_runs: Child runs to include (dicts or RunTree objects)
    - parent_run_id: ID of parent run
    - project_name: Project/session to log to
    - revision_id: Revision/version identifier
    - outputs: Output data
    - error: Error message if run failed
    - reference_example_id: Dataset example ID for evaluation
    - start_time: When the run started
    - end_time: When the run ended
    - tags: List of tags
    - extra: Extra metadata
    - trace_id: Root trace ID
    - dotted_order: Dotted order string
    - run_id: Preset run ID
    - kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    None
    """

async def update_run(
    self,
    run_id: Union[str, UUID],
    *,
    end_time: Optional[datetime] = None,
    error: Optional[str] = None,
    outputs: Optional[dict] = None,
    events: Optional[Sequence[dict]] = None,
    tags: Optional[list[str]] = None,
    extra: Optional[dict] = None,
    input_attachments: Optional[dict[str, Attachment]] = None,
    output_attachments: Optional[dict[str, Attachment]] = None,
    **kwargs
) -> None:
    """
    Asynchronously update an existing run.

    Only run_id is required. Every other named parameter is keyword-only
    and defaults to None.

    Parameters:
    - run_id: ID of the run to update
    - end_time: When the run ended
    - error: Error message if run failed
    - outputs: Output data
    - events: List of events
    - tags: Tags to add
    - extra: Extra metadata
    - input_attachments: Input file attachments, keyed by name
    - output_attachments: Output file attachments, keyed by name
    - kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    None
    """

async def read_run(
    self,
    run_id: Union[str, UUID]
) -> Run:
    """
    Asynchronously read a single run by its ID.

    Parameters:
    - run_id: ID of the run (string or UUID)

    Returns:
    Run object
    """

async def list_runs(
    self,
    *,
    project_name: Optional[str] = None,
    project_id: Optional[Union[str, UUID]] = None,
    run_type: Optional[str] = None,
    dataset_name: Optional[str] = None,
    dataset_id: Optional[Union[str, UUID]] = None,
    reference_example_id: Optional[Union[str, UUID]] = None,
    trace_id: Optional[Union[str, UUID]] = None,
    parent_run_id: Optional[Union[str, UUID]] = None,
    start_time: Optional[datetime] = None,
    error: Optional[bool] = None,
    run_ids: Optional[list[Union[str, UUID]]] = None,
    filter: Optional[str] = None,
    trace_filter: Optional[str] = None,
    is_root: Optional[bool] = None,
    select: Optional[list[str]] = None,
    limit: Optional[int] = None,
    **kwargs
) -> AsyncIterator[Run]:
    """
    Asynchronously list runs with filtering.

    All parameters are keyword-only and default to None. Consume the
    result with ``async for``, e.g.
    ``[run async for run in client.list_runs(project_name="my-project")]``.

    Parameters:
    - project_name: Filter by project name
    - project_id: Filter by project ID
    - run_type: Filter by run type
    - dataset_name: Filter by associated dataset
    - dataset_id: Filter by dataset ID
    - reference_example_id: Filter by example ID
    - trace_id: Filter by trace ID
    - parent_run_id: Filter by parent run
    - start_time: Filter runs after this time
    - error: Filter by error status
    - run_ids: Filter by specific run IDs
    - filter: Advanced filter string
    - trace_filter: Filter for root trace properties
    - is_root: Filter for root runs only
    - select: Fields to include in response
    - limit: Maximum number of runs to return
    - kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    AsyncIterator of Run objects
    """

async def share_run(
    self,
    run_id: Union[str, UUID],
    *,
    share_id: Optional[Union[str, UUID]] = None
) -> str:
    """
    Asynchronously share a run.

    Parameters:
    - run_id: ID of the run to share
    - share_id: Custom share ID (keyword-only, defaults to None)

    Returns:
    Shareable URL as a string
    """

async def run_is_shared(
    self,
    run_id: Union[str, UUID]
) -> bool:
    """
    Asynchronously check whether a run is shared.

    Parameters:
    - run_id: ID of the run (string or UUID)

    Returns:
    True if shared, False otherwise
    """

async def read_run_shared_link(
    self,
    run_id: Union[str, UUID]
) -> Optional[str]:
    """
    Asynchronously get the shared link for a run.

    Parameters:
    - run_id: ID of the run (string or UUID)

    Returns:
    Shared URL, or None if the run is not shared
    """

async def unshare_run(
    self,
    run_id: Union[str, UUID]
) -> None:
    """
    Asynchronously unshare a run.

    Parameters:
    - run_id: ID of the run to unshare (string or UUID)

    Returns:
    None
    """

Project Operations

Asynchronous operations for managing projects.

async def create_project(
    self,
    project_name: str,
    *,
    description: Optional[str] = None,
    metadata: Optional[dict] = None,
    tags: Optional[list[str]] = None,
    reference_dataset_id: Optional[Union[str, UUID]] = None,
) -> TracerSession:
    """
    Asynchronously create a new project.

    Only project_name is required. The remaining parameters are
    keyword-only and default to None.

    Parameters:
    - project_name: Name of the project
    - description: Project description
    - metadata: Project metadata
    - tags: Project tags
    - reference_dataset_id: Associated dataset ID

    Returns:
    Created TracerSession object
    """

async def read_project(
    self,
    project_name: Optional[str] = None,
    project_id: Optional[Union[str, UUID]] = None
) -> TracerSession:
    """
    Asynchronously read a project by name or ID.

    Both arguments default to None; presumably at least one must be
    supplied (TODO confirm against the implementation).

    Parameters:
    - project_name: Name of the project
    - project_id: ID of the project (alternative to name)

    Returns:
    TracerSession object
    """

async def list_projects(
    self,
    *,
    project_ids: Optional[list[Union[str, UUID]]] = None,
    name: Optional[str] = None,
    name_contains: Optional[str] = None,
    reference_dataset_id: Optional[Union[str, UUID]] = None,
    reference_dataset_name: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> AsyncIterator[TracerSession]:
    """
    Asynchronously list projects, optionally filtered.

    All parameters are keyword-only and default to None.

    Parameters:
    - project_ids: Filter by specific project IDs
    - name: Filter by exact name
    - name_contains: Filter by name substring
    - reference_dataset_id: Filter by reference dataset ID
    - reference_dataset_name: Filter by reference dataset name
    - limit: Maximum number of projects
    - offset: Number of projects to skip

    Returns:
    AsyncIterator of TracerSession objects
    """

async def delete_project(
    self,
    project_name: Optional[str] = None,
    project_id: Optional[Union[str, UUID]] = None
) -> None:
    """
    Asynchronously delete a project identified by name or ID.

    Both arguments default to None; presumably at least one must be
    supplied (TODO confirm against the implementation).

    Parameters:
    - project_name: Name of the project
    - project_id: ID of the project (alternative to name)

    Returns:
    None
    """

async def update_project(
    self,
    project_id: Union[str, UUID],
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    metadata: Optional[dict] = None,
    tags: Optional[list[str]] = None,
    end_time: Optional[datetime] = None,
) -> TracerSession:
    """
    Asynchronously update a project.

    The project is identified by ID only. The fields to change are
    keyword-only and default to None.

    Parameters:
    - project_id: ID of the project
    - name: New name
    - description: New description
    - metadata: New metadata
    - tags: New tags
    - end_time: Mark project as ended

    Returns:
    Updated TracerSession object
    """

Dataset Operations

Asynchronous operations for managing datasets.

async def create_dataset(
    self,
    dataset_name: str,
    *,
    description: Optional[str] = None,
    data_type: Optional[DataType] = None,
    inputs_schema: Optional[dict] = None,
    outputs_schema: Optional[dict] = None,
    metadata: Optional[dict] = None,
) -> Dataset:
    """
    Asynchronously create a new dataset.

    Only dataset_name is required. The remaining parameters are
    keyword-only and default to None.

    Parameters:
    - dataset_name: Name of the dataset
    - description: Dataset description
    - data_type: Type of data as a DataType (e.g., "kv", "llm", "chat")
    - inputs_schema: JSON schema for inputs
    - outputs_schema: JSON schema for outputs
    - metadata: Dataset metadata

    Returns:
    Created Dataset object
    """

async def read_dataset(
    self,
    dataset_name: Optional[str] = None,
    dataset_id: Optional[Union[str, UUID]] = None
) -> Dataset:
    """
    Asynchronously read a dataset by name or ID.

    Both arguments default to None; presumably at least one must be
    supplied (TODO confirm against the implementation).

    Parameters:
    - dataset_name: Name of the dataset
    - dataset_id: ID of the dataset (alternative to name)

    Returns:
    Dataset object
    """

async def list_datasets(
    self,
    *,
    dataset_ids: Optional[list[Union[str, UUID]]] = None,
    dataset_name: Optional[str] = None,
    dataset_name_contains: Optional[str] = None,
    data_type: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> AsyncIterator[Dataset]:
    """
    Asynchronously list datasets, optionally filtered.

    All parameters are keyword-only and default to None.

    Parameters:
    - dataset_ids: Filter by specific dataset IDs
    - dataset_name: Filter by exact name
    - dataset_name_contains: Filter by name substring
    - data_type: Filter by data type (annotated as a plain string here)
    - limit: Maximum number of datasets
    - offset: Number of datasets to skip

    Returns:
    AsyncIterator of Dataset objects
    """

async def delete_dataset(
    self,
    dataset_id: Union[str, UUID]
) -> None:
    """
    Asynchronously delete a dataset.

    Parameters:
    - dataset_id: ID of the dataset (string or UUID)

    Returns:
    None
    """

Example Operations

Asynchronous operations for managing examples.

async def create_example(
    self,
    inputs: dict,
    *,
    dataset_id: Optional[Union[str, UUID]] = None,
    dataset_name: Optional[str] = None,
    outputs: Optional[dict] = None,
    metadata: Optional[dict] = None,
    split: Optional[Union[str, list[str]]] = None,
    example_id: Optional[Union[str, UUID]] = None,
    created_at: Optional[datetime] = None,
) -> Example:
    """
    Asynchronously create an example.

    Only inputs is required. The remaining parameters are keyword-only
    and default to None. The target dataset is presumably selected via
    dataset_id or dataset_name (TODO confirm whether one is mandatory).

    Parameters:
    - inputs: Input data
    - dataset_id: ID of the dataset
    - dataset_name: Name of the dataset (alternative to ID)
    - outputs: Expected output data
    - metadata: Example metadata
    - split: Dataset split(s), a single name or a list of names
    - example_id: Preset example ID
    - created_at: Creation timestamp

    Returns:
    Created Example object
    """

async def read_example(
    self,
    example_id: Union[str, UUID]
) -> Example:
    """
    Asynchronously read a single example by its ID.

    Parameters:
    - example_id: ID of the example (string or UUID)

    Returns:
    Example object
    """

async def list_examples(
    self,
    *,
    dataset_id: Optional[Union[str, UUID]] = None,
    dataset_name: Optional[str] = None,
    example_ids: Optional[list[Union[str, UUID]]] = None,
    splits: Optional[list[str]] = None,
    metadata: Optional[dict] = None,
    filter: Optional[str] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[Example]:
    """
    Asynchronously list examples with filtering.

    All parameters are keyword-only and default to None.

    Parameters:
    - dataset_id: Filter by dataset ID
    - dataset_name: Filter by dataset name
    - example_ids: Filter by specific example IDs
    - splits: Filter by splits
    - metadata: Filter by metadata
    - filter: Advanced filter string
    - offset: Number of examples to skip
    - limit: Maximum number of examples

    Returns:
    AsyncIterator of Example objects
    """

async def update_example(
    self,
    example_id: Union[str, UUID],
    *,
    inputs: Optional[dict] = None,
    outputs: Optional[dict] = None,
    metadata: Optional[dict] = None,
    split: Optional[Union[str, list[str]]] = None,
) -> Example:
    """
    Asynchronously update an example.

    The fields to change are keyword-only and default to None.

    Parameters:
    - example_id: ID of the example
    - inputs: New input data
    - outputs: New output data
    - metadata: New metadata
    - split: New split assignment, a single name or a list of names

    Returns:
    Updated Example object
    """

async def delete_example(
    self,
    example_id: Union[str, UUID]
) -> None:
    """
    Asynchronously delete an example.

    Parameters:
    - example_id: ID of the example (string or UUID)

    Returns:
    None
    """

Feedback Operations

Asynchronous operations for managing feedback.

async def create_feedback(
    self,
    run_id: Union[str, UUID],
    key: str,
    *,
    score: Optional[Union[int, float, bool]] = None,
    value: Optional[Union[int, float, bool, str, dict, list]] = None,
    correction: Optional[dict] = None,
    comment: Optional[str] = None,
    source_info: Optional[dict] = None,
    feedback_source_type: Optional[FeedbackSourceType] = None,
    source_run_id: Optional[Union[str, UUID]] = None,
    feedback_id: Optional[Union[str, UUID]] = None,
    **kwargs
) -> Feedback:
    """
    Asynchronously create feedback for a run.

    run_id and key are required. Every other named parameter is
    keyword-only and defaults to None.

    Parameters:
    - run_id: ID of the run to give feedback on
    - key: Feedback key/metric name
    - score: Score, given as an int, float or bool
    - value: Feedback value; the annotation accepts int, float, bool,
      str, dict or list
    - correction: Correction data
    - comment: Text comment
    - source_info: Information about feedback source
    - feedback_source_type: Type of feedback source
    - source_run_id: ID of evaluator run
    - feedback_id: Preset feedback ID
    - kwargs: Additional keyword arguments (not described in this reference)

    Returns:
    Created Feedback object
    """

async def read_feedback(
    self,
    feedback_id: Union[str, UUID]
) -> Feedback:
    """
    Asynchronously read a single feedback item by its ID.

    Parameters:
    - feedback_id: ID of the feedback (string or UUID)

    Returns:
    Feedback object
    """

async def list_feedback(
    self,
    *,
    run_ids: Optional[list[Union[str, UUID]]] = None,
    feedback_keys: Optional[list[str]] = None,
    feedback_source_types: Optional[list[FeedbackSourceType]] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> AsyncIterator[Feedback]:
    """
    Asynchronously list feedback with filtering.

    All parameters are keyword-only and default to None.

    Parameters:
    - run_ids: Filter by run IDs
    - feedback_keys: Filter by feedback keys
    - feedback_source_types: Filter by source types
    - limit: Maximum number of feedback items
    - offset: Number of feedback items to skip

    Returns:
    AsyncIterator of Feedback objects
    """

async def delete_feedback(
    self,
    feedback_id: Union[str, UUID]
) -> None:
    """
    Asynchronously delete feedback.

    Parameters:
    - feedback_id: ID of the feedback (string or UUID)

    Returns:
    None
    """

async def create_presigned_feedback_token(
    self,
    run_id: Union[str, UUID],
    feedback_key: str,
    *,
    expiration: Optional[datetime] = None,
    feedback_config: Optional[FeedbackConfig] = None,
) -> FeedbackIngestToken:
    """
    Asynchronously create a presigned feedback token.

    run_id and feedback_key are required. The remaining parameters are
    keyword-only and default to None.

    Parameters:
    - run_id: ID of the run
    - feedback_key: Feedback key/metric name
    - expiration: When the token expires
    - feedback_config: Feedback configuration

    Returns:
    FeedbackIngestToken object
    """

async def create_feedback_from_token(
    self,
    token: Union[str, UUID],
    *,
    score: Optional[Union[int, float, bool]] = None,
    value: Optional[Union[int, float, bool, str, dict, list]] = None,
    correction: Optional[dict] = None,
    comment: Optional[str] = None,
) -> None:
    """
    Asynchronously create feedback using a presigned token.

    Only token is required. The remaining parameters are keyword-only
    and default to None.

    Parameters:
    - token: Presigned feedback token (string or UUID)
    - score: Score, given as an int, float or bool
    - value: Feedback value; the annotation accepts int, float, bool,
      str, dict or list
    - correction: Correction data
    - comment: Text comment

    Returns:
    None
    """

Annotation Queue Operations

Asynchronous operations for managing annotation queues.

async def list_annotation_queues(
    self,
    *,
    queue_ids: Optional[list[Union[str, UUID]]] = None,
    name: Optional[str] = None,
    name_contains: Optional[str] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> AsyncIterator[AnnotationQueue]:
    """
    Asynchronously list annotation queues, optionally filtered.

    All parameters are keyword-only and default to None.

    Parameters:
    - queue_ids: Filter by queue IDs
    - name: Filter by exact name
    - name_contains: Filter by name substring
    - limit: Maximum number of queues
    - offset: Number of queues to skip

    Returns:
    AsyncIterator of AnnotationQueue objects
    """

async def create_annotation_queue(
    self,
    *,
    name: str,
    description: Optional[str] = None,
    queue_id: Optional[Union[str, UUID]] = None,
) -> AnnotationQueue:
    """
    Asynchronously create an annotation queue.

    All parameters are keyword-only, including the required name.

    Parameters:
    - name: Name of the queue (required; must be passed by keyword)
    - description: Queue description
    - queue_id: Preset queue ID

    Returns:
    Created AnnotationQueue object
    """

async def read_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    *,
    queue_name: Optional[str] = None,
) -> AnnotationQueue:
    """
    Asynchronously read an annotation queue.

    Parameters:
    - queue_id: ID of the queue (required)
    - queue_name: Name of the queue (alternative; keyword-only).
      NOTE(review): queue_id is a required positional argument, so it is
      unclear how the name can act as an alternative; confirm against
      the implementation.

    Returns:
    AnnotationQueue object
    """

async def update_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
) -> AnnotationQueue:
    """
    Asynchronously update an annotation queue.

    The fields to change are keyword-only and default to None.

    Parameters:
    - queue_id: ID of the queue
    - name: New name
    - description: New description

    Returns:
    Updated AnnotationQueue object
    """

async def delete_annotation_queue(
    self,
    queue_id: Union[str, UUID]
) -> None:
    """
    Asynchronously delete an annotation queue.

    Parameters:
    - queue_id: ID of the queue (string or UUID)

    Returns:
    None
    """

async def add_runs_to_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    run_ids: list[Union[str, UUID]]
) -> None:
    """
    Asynchronously add runs to an annotation queue.

    Parameters:
    - queue_id: ID of the queue
    - run_ids: List of run IDs to add (strings or UUIDs)

    Returns:
    None
    """

async def delete_run_from_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    queue_run_id: Union[str, UUID]
) -> None:
    """
    Asynchronously remove a run from an annotation queue.

    Parameters:
    - queue_id: ID of the queue
    - queue_run_id: Queue run ID to remove

    Returns:
    None
    """

async def get_run_from_annotation_queue(
    self,
    queue_id: Union[str, UUID],
    *,
    offset: int = 0,
) -> RunWithAnnotationQueueInfo:
    """
    Asynchronously get the next run from an annotation queue.

    Parameters:
    - queue_id: ID of the queue
    - offset: Offset in the queue (keyword-only, defaults to 0)

    Returns:
    RunWithAnnotationQueueInfo object
    """

Similarity Search Operations

Asynchronous operations for dataset similarity search.

async def index_dataset(
    self,
    dataset_id: Union[str, UUID],
    *,
    tag: Optional[str] = None,
) -> None:
    """
    Asynchronously index a dataset for similarity search.

    Parameters:
    - dataset_id: ID of the dataset
    - tag: Version tag to index (keyword-only, defaults to None)

    Returns:
    None
    """

async def sync_indexed_dataset(
    self,
    dataset_id: Union[str, UUID],
    *,
    tag: Optional[str] = None,
) -> None:
    """
    Asynchronously sync an indexed dataset with the latest changes.

    Parameters:
    - dataset_id: ID of the dataset
    - tag: Version tag to sync (keyword-only, defaults to None)

    Returns:
    None
    """

async def similar_examples(
    self,
    inputs: dict,
    *,
    dataset_id: Union[str, UUID],
    limit: int = 5,
    filter: Optional[str] = None,
) -> list[Example]:
    """
    Asynchronously find similar examples in a dataset.

    Everything after inputs is keyword-only; dataset_id is required.

    Parameters:
    - inputs: Input to find similar examples for
    - dataset_id: ID of the dataset (required; must be passed by keyword)
    - limit: Maximum number of similar examples (defaults to 5)
    - filter: Filter string for examples

    Returns:
    List of similar Example objects
    """

Prompt Management

Asynchronous operations for managing prompts.

async def list_prompts(
    self,
    *,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
    query: Optional[str] = None,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = None,
    sort_field: Optional[str] = None,
    sort_direction: Optional[str] = None,
) -> list[Prompt]:
    """
    Asynchronously list available prompts.

    All parameters are keyword-only and default to None.

    Parameters:
    - limit: Maximum number of prompts
    - offset: Number of prompts to skip
    - query: Search query
    - is_public: Filter by public status
    - is_archived: Filter by archived status
    - sort_field: Field to sort by
    - sort_direction: Sort direction

    Returns:
    List of Prompt objects (annotated as a list, not an AsyncIterator)
    """

async def get_prompt(
    self,
    prompt_identifier: str
) -> Prompt:
    """
    Asynchronously get a specific prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier

    Returns:
    Prompt object
    """

async def create_prompt(
    self,
    prompt_name: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[list[str]] = None,
    is_public: Optional[bool] = None,
) -> Prompt:
    """
    Asynchronously create a new prompt.

    Only prompt_name is required. The remaining parameters are
    keyword-only and default to None.

    Parameters:
    - prompt_name: Name of the prompt
    - description: Prompt description
    - readme: Markdown readme
    - tags: Prompt tags
    - is_public: Whether prompt is public

    Returns:
    Created Prompt object
    """

async def create_commit(
    self,
    prompt_identifier: str,
    *,
    object: Any,
    parent_commit: Optional[str] = None,
) -> dict:
    """
    Asynchronously create a prompt commit.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - object: Prompt object to commit (required; must be passed by keyword)
    - parent_commit: Parent commit hash (keyword-only, defaults to None)

    Returns:
    Commit info dictionary
    """

async def update_prompt(
    self,
    prompt_identifier: str,
    *,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[list[str]] = None,
    is_public: Optional[bool] = None,
    is_archived: Optional[bool] = None,
) -> Prompt:
    """
    Asynchronously update a prompt.

    The fields to change are keyword-only and default to None.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - description: New description
    - readme: New readme
    - tags: New tags
    - is_public: New public status
    - is_archived: New archived status

    Returns:
    Updated Prompt object
    """

async def delete_prompt(
    self,
    prompt_identifier: str
) -> None:
    """
    Asynchronously delete a prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier

    Returns:
    None
    """

async def like_prompt(
    self,
    prompt_identifier: str
) -> None:
    """
    Asynchronously like a prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier

    Returns:
    None
    """

async def unlike_prompt(
    self,
    prompt_identifier: str
) -> None:
    """
    Asynchronously unlike a prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier

    Returns:
    None
    """

async def pull_prompt_commit(
    self,
    prompt_identifier: str,
    *,
    commit_hash: str,
) -> dict:
    """
    Asynchronously pull a specific prompt commit.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - commit_hash: Commit hash to pull (required; must be passed by keyword)

    Returns:
    Prompt commit data as a dictionary
    """

async def list_prompt_commits(
    self,
    prompt_identifier: str,
    *,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> list[dict]:
    """
    Asynchronously list the commits of a prompt.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - limit: Maximum number of commits (keyword-only, defaults to None)
    - offset: Number of commits to skip (keyword-only, defaults to None)

    Returns:
    List of commit info dictionaries
    """

async def pull_prompt(
    self,
    prompt_identifier: str,
    *,
    include_model: bool = False,
) -> Any:
    """
    Asynchronously pull a prompt from the hub.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - include_model: Whether to include model config (keyword-only,
      defaults to False)

    Returns:
    Prompt object (the return annotation is Any, so the concrete type is
    not specified here)
    """

async def push_prompt(
    self,
    prompt_identifier: str,
    *,
    object: Any,
    parent_commit: Optional[str] = None,
    is_public: Optional[bool] = None,
    description: Optional[str] = None,
    readme: Optional[str] = None,
    tags: Optional[list[str]] = None,
) -> str:
    """
    Asynchronously push a prompt to the hub.

    Everything after prompt_identifier is keyword-only. object is
    required; the rest default to None.

    Parameters:
    - prompt_identifier: Prompt name or identifier
    - object: Prompt object to push (required; must be passed by keyword)
    - parent_commit: Parent commit hash
    - is_public: Whether prompt is public
    - description: Prompt description
    - readme: Markdown readme
    - tags: Prompt tags

    Returns:
    Prompt URL as a string
    """