Declaratively load data from REST APIs with automatic pagination, authentication, and response processing.
def rest_api(
client: dict = None,
resources: List[Union[str, dict, Any]] = None,
resource_defaults: dict = None
) -> List[DltResource]:
"""
Creates REST API resources from declarative configuration.
Args:
client: HTTP client configuration (base_url, auth, headers, etc.)
resources: List of endpoint resource definitions (name, endpoint, params, etc.)
resource_defaults: Default settings applied to all resources
Returns:
List of DltResource instances
Example:
pipeline.run(
rest_api(
client={"base_url": "https://api.example.com"},
resources=[
{"name": "users", "endpoint": "/users"},
{"name": "posts", "endpoint": "/posts"}
]
)
)
"""

def rest_api_source(
config: dict,
name: str = None,
section: str = None,
max_table_nesting: int = None,
root_key: bool = None,
schema: Schema = None,
schema_contract: Any = None,
parallelized: bool = False
) -> DltSource:
"""
Creates a REST API source with additional source-level configuration.
Args:
config: REST API configuration with client, resources, resource_defaults
name: Source name
section: Configuration section name
max_table_nesting: Maximum nesting level for flattening
root_key: Add root key to nested data
schema: Custom schema
schema_contract: Schema evolution rules
parallelized: Enable parallel resource execution
Returns:
DltSource with endpoint resources
"""

def rest_api_resources(config: dict) -> List[DltResource]:
"""
Creates list of resources from config without wrapping in source.
Args:
config: REST API configuration
Returns:
List of DltResource instances
"""

def check_connection(
base_url: str,
auth: dict = None,
headers: dict = None
) -> bool:
"""
Tests REST API connectivity.
Args:
base_url: API base URL
auth: Authentication configuration
headers: HTTP headers
Returns:
True if connection successful
Example:
if check_connection("https://api.example.com", auth={"token": "..."}):
pipeline.run(rest_api(config))
"""

RESTAPIConfig = TypedDict("RESTAPIConfig", {
"client": "ClientConfig",
"resources": List["EndpointResource"],
"resource_defaults": dict, # Optional defaults for all resources
}, total=False)
class ClientConfig(TypedDict, total=False):
    """Settings for the `client` section of a REST API config.

    Declared with ``total=False``, so every key is optional.
    """

    base_url: str  # API base URL
    auth: "AuthConfig"  # authentication configuration
    headers: dict  # HTTP headers
    params: dict  # query parameters; endpoint-level params are merged with these
    paginator: "BasePaginator"  # paginator instance (see the paginator examples)
    session: Any  # presumably a custom HTTP session object -- TODO confirm
class EndpointResource(TypedDict, total=False):
    """One entry of the `resources` list in a REST API config.

    Declared with ``total=False``, so every key is optional.
    """

    name: str  # resource name
    endpoint: Union[str, "Endpoint"]  # bare path string or a full Endpoint mapping
    write_disposition: str  # the examples use "merge", "append" and "replace"
    primary_key: Union[str, List[str]]  # a single column name or a list of names
    processing_steps: List["ProcessingSteps"]  # filter / map steps
    selected: bool  # presumably toggles whether the resource is loaded -- TODO confirm
class Endpoint(TypedDict, total=False):
    """Detailed endpoint definition, used in place of a bare path string.

    Declared with ``total=False``, so every key is optional.
    """

    path: str  # endpoint path, e.g. "/users"
    method: str  # HTTP method; the examples use "POST"
    params: dict  # query parameters for this endpoint
    json: dict  # JSON request body (see the POST example)
    paginator: "BasePaginator"  # paginator instance for this endpoint
    data_selector: str  # dotted path to the records inside the response, e.g. "data.users"
    response_actions: List[dict]  # per-status-code actions such as "retry" or "ignore"
    incremental: "Incremental"  # incremental-loading configuration
class AuthConfig(TypedDict, total=False):
    """Authentication settings for the client.

    Declared with ``total=False``, so every key is optional; the examples
    supply different keys depending on the value of ``type``.
    """

    type: str  # "bearer", "api_key", "http_basic", "oauth2"
    token: str
    api_key: str
    name: str
    location: str  # "header" or "query"
    username: str
    password: str
    client_id: str
    client_secret: str
    token_url: str
    scopes: List[str]
ProcessingSteps = TypedDict("ProcessingSteps", {
"filter": Callable,
"map": Callable,
}, total=False)

import dlt
from dlt.sources.rest_api import rest_api
config = {
"client": {
"base_url": "https://api.example.com"
},
"resources": [
{
"name": "users",
"endpoint": "/users"
}
]
}
pipeline = dlt.pipeline(destination="duckdb", dataset_name="api_data")
pipeline.run(rest_api(config))

# Bearer token authentication
config = {
"client": {
"base_url": "https://api.example.com",
"auth": {
"type": "bearer",
"token": dlt.secrets["api_token"]
}
},
"resources": [
{"name": "users", "endpoint": "/users"}
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com",
"auth": {
"type": "api_key",
"api_key": dlt.secrets["api_key"],
"name": "X-API-Key",
"location": "header"
}
},
"resources": [
{"name": "data", "endpoint": "/data"}
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com",
"auth": {
"type": "oauth2",
"client_id": dlt.secrets["client_id"],
"client_secret": dlt.secrets["client_secret"],
"token_url": "https://api.example.com/oauth/token",
"scopes": ["read:data"]
}
},
"resources": [
{"name": "protected_data", "endpoint": "/protected"}
]
}
pipeline.run(rest_api(config))

from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator
config = {
"client": {
"base_url": "https://api.example.com",
"paginator": JSONResponsePaginator(next_url_path="pagination.next")
},
"resources": [
{
"name": "paginated_data",
"endpoint": "/data"
}
]
}
pipeline.run(rest_api(config))

# Extract data from nested response
config = {
"client": {
"base_url": "https://api.example.com"
},
"resources": [
{
"name": "users",
"endpoint": {
"path": "/users",
"data_selector": "data.users" # Extract from response.data.users
}
}
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com",
"auth": {"type": "bearer", "token": dlt.secrets["token"]}
},
"resources": [
{
"name": "users",
"endpoint": "/users",
"write_disposition": "merge",
"primary_key": "id"
},
{
"name": "orders",
"endpoint": "/orders",
"write_disposition": "append"
},
{
"name": "products",
"endpoint": "/products",
"write_disposition": "replace"
}
]
}
pipeline.run(rest_api(config))

from dlt.sources import incremental
config = {
"client": {
"base_url": "https://api.example.com"
},
"resources": [
{
"name": "events",
"endpoint": {
"path": "/events",
"params": {
"since": "{last_value}" # Template for incremental
},
"incremental": incremental("updated_at")
},
"write_disposition": "append"
}
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com",
"params": {
"format": "json",
"limit": 100
}
},
"resources": [
{
"name": "filtered_data",
"endpoint": {
"path": "/data",
"params": {
"category": "sales", # Merged with client params
"year": 2024
}
}
}
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com",
"headers": {
"User-Agent": "MyApp/1.0",
"Accept": "application/json"
}
},
"resources": [
{"name": "data", "endpoint": "/data"}
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com"
},
"resources": [
{
"name": "query_results",
"endpoint": {
"path": "/query",
"method": "POST",
"json": {
"query": "SELECT * FROM data",
"format": "json"
}
}
}
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com"
},
"resources": [
{
"name": "active_users",
"endpoint": "/users",
"processing_steps": [
{
"filter": lambda item: item["status"] == "active"
},
{
"map": lambda item: {
**item,
"processed_at": datetime.now().isoformat()
}
}
]
}
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com"
},
"resource_defaults": {
"write_disposition": "merge",
"primary_key": "id"
},
"resources": [
{"name": "users", "endpoint": "/users"},
{"name": "orders", "endpoint": "/orders"}
# Both use merge disposition and id as primary key
]
}
pipeline.run(rest_api(config))

config = {
"client": {
"base_url": "https://api.example.com"
},
"resources": [
{
"name": "data",
"endpoint": {
"path": "/data",
"response_actions": [
{
"status_code": 429, # Rate limit
"action": "retry",
"retry_after": 60
},
{
"status_code": 404,
"action": "ignore"
}
]
}
}
]
}
pipeline.run(rest_api(config))

from dlt.sources.helpers.rest_client.paginators import (
OffsetPaginator,
HeaderLinkPaginator,
PageNumberPaginator
)
# Offset-based pagination
config = {
"client": {
"base_url": "https://api.example.com",
"paginator": OffsetPaginator(
limit=100,
offset_param="offset",
limit_param="limit",
total_path="total"
)
},
"resources": [
{"name": "data", "endpoint": "/data"}
]
}
pipeline.run(rest_api(config))

from dlt.sources.rest_api import rest_api, check_connection
config = {...}
# Test connection first
if check_connection(
config["client"]["base_url"],
auth=config["client"].get("auth")
):
pipeline.run(rest_api(config))
else:
print("Connection failed")

{"type": "bearer", "token": "..."}
{"type": "api_key", "api_key": "...", "name": "...", "location": "header|query"}
{"type": "http_basic", "username": "...", "password": "..."}
{"type": "oauth2", "client_id": "...", "client_secret": "...", "token_url": "..."}

- Use dlt.secrets for tokens and keys
- Call check_connection() before loading