or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration.md destinations.md helpers.md incremental.md index.md pipeline.md schema.md source-filesystem.md source-rest-api.md source-sql-database.md sources-resources.md
tile.json

source-rest-api.mddocs/

REST API Source

Declaratively load data from REST APIs with automatic pagination, authentication, and response processing.

Capabilities

REST API Source Function

def rest_api(
    client: dict = None,
    resources: List[Union[str, dict, Any]] = None,
    resource_defaults: dict = None
) -> List[DltResource]:
    """
    Creates REST API resources from declarative configuration.

    The three parameters mirror the top-level keys of a ``RESTAPIConfig``
    dict. A complete config dict therefore has to be unpacked
    (``rest_api(**config)``); passing it as a single positional argument
    would bind the whole dict to ``client`` and leave ``resources`` unset.

    Args:
        client: HTTP client configuration (base_url, auth, headers, etc.)
        resources: List of endpoint resource definitions (name, endpoint, params, etc.)
        resource_defaults: Default settings applied to all resources

    Returns:
        List of DltResource instances

    Example:
        pipeline.run(
            rest_api(
                client={"base_url": "https://api.example.com"},
                resources=[
                    {"name": "users", "endpoint": "/users"},
                    {"name": "posts", "endpoint": "/posts"}
                ]
            )
        )
    """
def rest_api_source(
    config: dict,
    name: str = None,
    section: str = None,
    max_table_nesting: int = None,
    root_key: bool = None,
    schema: Schema = None,
    schema_contract: Any = None,
    parallelized: bool = False
) -> DltSource:
    """
    Creates a REST API source with additional source-level configuration.

    Unlike ``rest_api``, which takes ``client``/``resources``/
    ``resource_defaults`` as separate arguments, this function accepts the
    whole configuration as one dict in its first parameter.

    Args:
        config: REST API configuration with client, resources, resource_defaults
        name: Source name
        section: Configuration section name
        max_table_nesting: Maximum nesting level for flattening
        root_key: Add root key to nested data
        schema: Custom schema
        schema_contract: Schema evolution rules
        parallelized: Enable parallel resource execution

    Returns:
        DltSource with endpoint resources

    Example:
        source = rest_api_source(
            {
                "client": {"base_url": "https://api.example.com"},
                "resources": [{"name": "users", "endpoint": "/users"}],
            },
            name="example_api",
        )
        pipeline.run(source)
    """
def rest_api_resources(config: dict) -> List[DltResource]:
    """
    Creates list of resources from config without wrapping in source.

    Takes the same single configuration dict as ``rest_api_source`` but
    returns the individual resources instead of a DltSource.

    Args:
        config: REST API configuration (client, resources, resource_defaults)

    Returns:
        List of DltResource instances
    """

Configuration Helper

def check_connection(
    base_url: str,
    auth: dict = None,
    headers: dict = None
) -> bool:
    """
    Tests REST API connectivity.

    NOTE(review): dlt's shipped helper appears to be
    ``check_connection(source, *resource_names)`` returning a
    ``(bool, str)`` tuple rather than the signature shown here -- confirm
    against the installed dlt version before relying on this form.

    Args:
        base_url: API base URL
        auth: Authentication configuration
        headers: HTTP headers

    Returns:
        True if connection successful

    Example:
        # `config` is a RESTAPIConfig-shaped dict; it is unpacked because
        # rest_api() takes client/resources as separate arguments.
        if check_connection("https://api.example.com", auth={"token": "..."}):
            pipeline.run(rest_api(**config))
    """

Configuration Types

# Top-level REST API configuration; total=False makes every key optional.
class RESTAPIConfig(TypedDict, total=False):
    client: "ClientConfig"  # shared HTTP client settings
    resources: List["EndpointResource"]  # endpoint resource definitions
    resource_defaults: dict  # Optional defaults for all resources

# HTTP client settings shared by all resources; every key is optional.
class ClientConfig(TypedDict, total=False):
    base_url: str
    auth: "AuthConfig"
    headers: dict
    params: dict
    paginator: "BasePaginator"
    session: Any

# One loadable resource; `endpoint` is either a path string or an Endpoint dict.
class EndpointResource(TypedDict, total=False):
    name: str
    endpoint: Union[str, "Endpoint"]
    write_disposition: str
    primary_key: Union[str, List[str]]
    processing_steps: List["ProcessingSteps"]
    selected: bool

# Detailed endpoint definition used when a plain path string is not enough.
class Endpoint(TypedDict, total=False):
    path: str
    method: str
    params: dict
    json: dict
    paginator: "BasePaginator"
    data_selector: str
    response_actions: List[dict]
    incremental: "Incremental"

# Authentication settings; which keys apply depends on the value of `type`.
class AuthConfig(TypedDict, total=False):
    type: str  # "bearer", "api_key", "http_basic", "oauth2"
    token: str
    api_key: str
    name: str
    location: str  # "header" or "query"
    username: str
    password: str
    client_id: str
    client_secret: str
    token_url: str
    scopes: List[str]

# A single processing step: a `filter` callable and/or a `map` callable.
class ProcessingSteps(TypedDict, total=False):
    filter: Callable
    map: Callable

Usage Examples

Basic REST API Source

import dlt
from dlt.sources.rest_api import rest_api

# Minimal configuration: one client and a single endpoint resource.
config = {
    "client": {
        "base_url": "https://api.example.com"
    },
    "resources": [
        {
            "name": "users",
            "endpoint": "/users"
        }
    ]
}

pipeline = dlt.pipeline(destination="duckdb", dataset_name="api_data")
# rest_api() takes client/resources/resource_defaults as separate arguments,
# so the config dict is unpacked; passed positionally it would all land in
# `client` and no resources would be defined.
pipeline.run(rest_api(**config))

With Authentication

# Bearer token authentication; the token is read from dlt secrets rather
# than being hard-coded in the config.
config = {
    "client": {
        "base_url": "https://api.example.com",
        "auth": {
            "type": "bearer",
            "token": dlt.secrets["api_token"]
        }
    },
    "resources": [
        {"name": "users", "endpoint": "/users"}
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

API Key Authentication

# API key authentication: the key is sent under `name` in the given
# `location` ("header" or "query").
config = {
    "client": {
        "base_url": "https://api.example.com",
        "auth": {
            "type": "api_key",
            "api_key": dlt.secrets["api_key"],
            "name": "X-API-Key",
            "location": "header"
        }
    },
    "resources": [
        {"name": "data", "endpoint": "/data"}
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

OAuth2 Authentication

# OAuth2 authentication; client credentials come from dlt secrets.
config = {
    "client": {
        "base_url": "https://api.example.com",
        "auth": {
            "type": "oauth2",
            "client_id": dlt.secrets["client_id"],
            "client_secret": dlt.secrets["client_secret"],
            "token_url": "https://api.example.com/oauth/token",
            "scopes": ["read:data"]
        }
    },
    "resources": [
        {"name": "protected_data", "endpoint": "/protected"}
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

With Pagination

from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator

# A paginator set on the client applies to every resource below.
config = {
    "client": {
        "base_url": "https://api.example.com",
        "paginator": JSONResponsePaginator(next_url_path="pagination.next")
    },
    "resources": [
        {
            "name": "paginated_data",
            "endpoint": "/data"
        }
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Data Selector

# Extract data from nested response
config = {
    "client": {
        "base_url": "https://api.example.com"
    },
    "resources": [
        {
            "name": "users",
            # Dict form of `endpoint` allows options beyond the path.
            "endpoint": {
                "path": "/users",
                "data_selector": "data.users"  # Extract from response.data.users
            }
        }
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Multiple Resources

# Several resources sharing one client, each with its own write disposition.
config = {
    "client": {
        "base_url": "https://api.example.com",
        "auth": {"type": "bearer", "token": dlt.secrets["token"]}
    },
    "resources": [
        {
            "name": "users",
            "endpoint": "/users",
            "write_disposition": "merge",
            "primary_key": "id"
        },
        {
            "name": "orders",
            "endpoint": "/orders",
            "write_disposition": "append"
        },
        {
            "name": "products",
            "endpoint": "/products",
            "write_disposition": "replace"
        }
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Incremental Loading

from dlt.sources import incremental

# NOTE(review): the "{last_value}" placeholder and passing an `incremental`
# object under the endpoint are shown as documented here; confirm the exact
# incremental syntax against the installed dlt version.
config = {
    "client": {
        "base_url": "https://api.example.com"
    },
    "resources": [
        {
            "name": "events",
            "endpoint": {
                "path": "/events",
                "params": {
                    "since": "{last_value}"  # Template for incremental
                },
                "incremental": incremental("updated_at")
            },
            "write_disposition": "append"
        }
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Request Parameters

# Client-level params are sent with every request; endpoint-level params
# are added on top for that resource only.
config = {
    "client": {
        "base_url": "https://api.example.com",
        "params": {
            "format": "json",
            "limit": 100
        }
    },
    "resources": [
        {
            "name": "filtered_data",
            "endpoint": {
                "path": "/data",
                "params": {
                    "category": "sales",  # Merged with client params
                    "year": 2024
                }
            }
        }
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Custom Headers

# Custom HTTP headers configured once on the client.
config = {
    "client": {
        "base_url": "https://api.example.com",
        "headers": {
            "User-Agent": "MyApp/1.0",
            "Accept": "application/json"
        }
    },
    "resources": [
        {"name": "data", "endpoint": "/data"}
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

POST Requests

# POST endpoint: `method` selects the HTTP verb and `json` is the request body.
config = {
    "client": {
        "base_url": "https://api.example.com"
    },
    "resources": [
        {
            "name": "query_results",
            "endpoint": {
                "path": "/query",
                "method": "POST",
                "json": {
                    "query": "SELECT * FROM data",
                    "format": "json"
                }
            }
        }
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Processing Steps

# Needed by the `map` step below.
from datetime import datetime

# Steps are listed in order: first drop inactive users, then stamp each
# remaining item with the processing time.
config = {
    "client": {
        "base_url": "https://api.example.com"
    },
    "resources": [
        {
            "name": "active_users",
            "endpoint": "/users",
            "processing_steps": [
                {
                    "filter": lambda item: item["status"] == "active"
                },
                {
                    "map": lambda item: {
                        **item,
                        "processed_at": datetime.now().isoformat()
                    }
                }
            ]
        }
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Resource Defaults

# resource_defaults supplies settings that every resource inherits.
config = {
    "client": {
        "base_url": "https://api.example.com"
    },
    "resource_defaults": {
        "write_disposition": "merge",
        "primary_key": "id"
    },
    "resources": [
        {"name": "users", "endpoint": "/users"},
        {"name": "orders", "endpoint": "/orders"}
        # Both use merge disposition and id as primary key
    ]
}

# Unpacking also forwards resource_defaults as its own keyword argument.
pipeline.run(rest_api(**config))

Response Actions

# Per-status-code handling for an endpoint.
# NOTE(review): the "retry" action and "retry_after" key are shown as
# documented here; confirm the supported actions against the installed dlt
# version.
config = {
    "client": {
        "base_url": "https://api.example.com"
    },
    "resources": [
        {
            "name": "data",
            "endpoint": {
                "path": "/data",
                "response_actions": [
                    {
                        "status_code": 429,  # Rate limit
                        "action": "retry",
                        "retry_after": 60
                    },
                    {
                        "status_code": 404,
                        "action": "ignore"
                    }
                ]
            }
        }
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Complex Pagination Example

# Only OffsetPaginator is used below; the other two are imported to show
# the alternative paginator classes.
from dlt.sources.helpers.rest_client.paginators import (
    OffsetPaginator,
    HeaderLinkPaginator,
    PageNumberPaginator
)

# Offset-based pagination
config = {
    "client": {
        "base_url": "https://api.example.com",
        "paginator": OffsetPaginator(
            limit=100,
            offset_param="offset",
            limit_param="limit",
            total_path="total"
        )
    },
    "resources": [
        {"name": "data", "endpoint": "/data"}
    ]
}

# Unpack the dict: rest_api() expects client/resources as keyword arguments.
pipeline.run(rest_api(**config))

Check Connection Before Loading

from dlt.sources.rest_api import rest_api, check_connection

# Placeholder: substitute a full configuration dict with "client" and
# "resources" keys before running this snippet.
config = {...}

# Test connection first
if check_connection(
    config["client"]["base_url"],
    auth=config["client"].get("auth")
):
    # Unpack the dict: rest_api() expects client/resources as keyword arguments.
    pipeline.run(rest_api(**config))
else:
    print("Connection failed")

Authentication Types

  • Bearer Token: {"type": "bearer", "token": "..."}
  • API Key: {"type": "api_key", "api_key": "...", "name": "...", "location": "header|query"}
  • HTTP Basic: {"type": "http_basic", "username": "...", "password": "..."}
  • OAuth2: {"type": "oauth2", "client_id": "...", "client_secret": "...", "token_url": "..."}

Pagination Strategies

  • JSON Response: Extract next URL from JSON response
  • Header Link: Use Link header (RFC 5988)
  • Offset: Offset/limit-based pagination
  • Page Number: Page number-based pagination
  • Cursor: Cursor-based pagination

Best Practices

  1. Store credentials in secrets: Use dlt.secrets for tokens and keys
  2. Use incremental loading: For time-based data
  3. Set write_disposition: Choose append/merge/replace based on data
  4. Handle rate limits: Configure response actions
  5. Test connections: Use check_connection() before loading
  6. Use data selectors: Extract nested data correctly
  7. Apply filters early: Use processing_steps to reduce data volume