Load files from local filesystem or cloud storage (S3, GCS, Azure, SFTP) with automatic file listing, filtering, and content reading.
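
A minimal usage sketch (bucket name and dataset name are placeholders; credentials are assumed to come from secrets.toml; full examples follow below):

import dlt
from dlt.sources.filesystem import filesystem

# sketch: list file metadata from an S3 prefix into DuckDB
pipeline = dlt.pipeline(destination="duckdb", dataset_name="file_metadata")
pipeline.run(filesystem(bucket_url="s3://my-bucket/data", file_glob="*.json"))
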
def filesystem(
bucket_url: str,
credentials: Any = None,
file_glob: str = "*",
files_per_page: int = 100,
extract_content: bool = False,
kwargs: dict = None,
client_kwargs: dict = None,
incremental: Incremental = None
) -> DltResource:
"""
Lists and optionally loads files from filesystem or cloud storage.
Args:
bucket_url: Filesystem path or cloud bucket URL:
- Local: "/path/to/files" or "file:///path"
- S3: "s3://bucket-name/path"
- GCS: "gs://bucket-name/path"
- Azure: "az://container-name/path" or "abfss://container@account.dfs.core.windows.net/path"
- SFTP: "sftp://hostname/path"
credentials: Cloud credentials or fsspec AbstractFileSystem instance:
- AWS: AwsCredentials
- GCS: GcpCredentials
- Azure: AzureCredentials
file_glob: Glob pattern for filtering files (default: "*")
- "*.json": All JSON files
- "data/**/*.csv": All CSV files in data directory (recursive)
- "2024-*": Files starting with "2024-"
files_per_page: Number of files to list per page (default: 100)
extract_content: Whether to read file contents (default: False, only metadata)
kwargs: Additional arguments passed to fsspec constructor (e.g., dict(use_ssl=True) for s3fs)
client_kwargs: Arguments for fsspec native client (e.g., dict(verify="public.crt") for botocore)
incremental: Incremental cursor on listed files (commonly modification_date)
Returns:
DltResource yielding FileItem objects
Example:
# List S3 files
pipeline.run(filesystem("s3://my-bucket/data", file_glob="*.json"))
# Load local files with incremental by modification date
pipeline.run(filesystem("/data/files", extract_content=True,
incremental=dlt.sources.incremental("modification_date")))
"""def readers(
bucket_url: str,
credentials: Any = None,
file_glob: str = "*",
kwargs: dict = None,
client_kwargs: dict = None,
incremental: Incremental = None
) -> Tuple[DltResource, ...]:
"""
Creates a source with multiple reader transformer resources for different file formats.
Args:
bucket_url: Filesystem path or cloud bucket URL
credentials: Cloud credentials or fsspec AbstractFileSystem instance
file_glob: Glob pattern for file filtering
kwargs: Additional arguments passed to fsspec constructor
client_kwargs: Arguments for fsspec native client
incremental: Incremental cursor on listed files (commonly modification_date)
Returns:
Tuple of DltResource reader transformers (read_csv, read_jsonl, read_parquet, read_csv_duckdb)
Example:
source = readers("s3://bucket/data", file_glob="*.csv")
pipeline.run(source.with_resources("read_csv"))
"""@dlt.transformer
def read_csv(
items: Iterator[FileItem],
chunksize: int = 10000,
**pandas_kwargs
) -> Iterator[Any]:
"""
Reads CSV files using pandas.
Args:
items: Iterator of FileItem objects
chunksize: Rows per chunk
**pandas_kwargs: Additional pandas.read_csv arguments
Yields:
Data rows from CSV files
Example:
@dlt.transformer(data_from=filesystem_resource)
def my_csv_reader(items):
yield from read_csv(items, chunksize=5000, sep=";")
"""@dlt.transformer
def read_jsonl(
items: Iterator[FileItem],
chunksize: int = 1000
) -> Iterator[dict]:
"""
Reads JSON Lines files.
Args:
items: Iterator of FileItem objects
chunksize: Lines per chunk
Yields:
Parsed JSON objects
Example:
yield from read_jsonl(filesystem_items)
"""@dlt.transformer
def read_parquet(
items: Iterator[FileItem],
**pyarrow_kwargs
) -> Iterator[Any]:
"""
Reads Parquet files using PyArrow.
Args:
items: Iterator of FileItem objects
**pyarrow_kwargs: Additional PyArrow arguments
Yields:
Data rows from Parquet files
Example:
yield from read_parquet(filesystem_items)
"""@dlt.transformer
def read_csv_duckdb(
items: Iterator[FileItem],
chunksize: int = 10000,
**duckdb_kwargs
) -> Iterator[Any]:
"""
Reads CSV files using DuckDB (faster for large files).
Args:
items: Iterator of FileItem objects
chunksize: Rows per chunk
**duckdb_kwargs: Additional DuckDB arguments
Yields:
Data rows from CSV files
Example:
yield from read_csv_duckdb(filesystem_items)
"""class FileItem(TypedDict):
"""
File metadata structure.
Fields:
file_name: Name of the file
file_url: Full URL/path to file
mime_type: MIME type of file
size_in_bytes: File size
modification_date: Last modified timestamp
file_content: File content (if extract_content=True)
"""
file_name: str
file_url: str
mime_type: str
size_in_bytes: int
modification_date: str
file_content: Optional[bytes]
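
Because every listed file arrives as a FileItem, its metadata can be used to filter before loading. A sketch using dlt's generic add_filter on the resource (bucket path is a placeholder):

import dlt
from dlt.sources.filesystem import filesystem

# sketch: skip empty files by filtering on FileItem metadata
files = filesystem("s3://my-bucket/data", file_glob="**/*.json")
files.add_filter(lambda item: item["size_in_bytes"] > 0)
pipeline = dlt.pipeline(destination="duckdb", dataset_name="files")
pipeline.run(files)
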
def fsspec_filesystem(
bucket_url: str,
credentials: Any = None
) -> AbstractFileSystem:
"""
Creates an fsspec filesystem instance.
Args:
bucket_url: Filesystem URL
credentials: Credentials
Returns:
fsspec AbstractFileSystem instance
Example:
fs = fsspec_filesystem("s3://bucket")
files = fs.ls("/path")
"""def glob_files(
fs: AbstractFileSystem,
bucket_url: str,
file_glob: str = "*"
) -> Iterator[str]:
"""
Globs files matching pattern.
Args:
fs: fsspec filesystem
bucket_url: Base path
file_glob: Glob pattern
Yields:
File paths matching pattern
Example:
for file_path in glob_files(fs, "s3://bucket/data", "*.json"):
print(file_path)
"""import dlt
from dlt.sources.credentials import AwsCredentials
# Configure AWS credentials
credentials = AwsCredentials(
aws_access_key_id="AKIA...",
aws_secret_access_key="secret...",
region_name="us-east-1"
)
# List all JSON files
pipeline = dlt.pipeline(destination="duckdb", dataset_name="files")
source = filesystem(
bucket_url="s3://my-bucket/data",
credentials=credentials,
file_glob="**/*.json" # Recursive glob
)
pipeline.run(source)
# Query file metadata
with pipeline.sql_client() as client:
    files = client.execute_sql("SELECT file_name, size_in_bytes FROM filesystem")

from dlt.sources.filesystem import filesystem, read_csv
# List and read CSV files
source = filesystem(
bucket_url="s3://bucket/data",
file_glob="*.csv",
extract_content=True # Load file contents
)
# Transform to parse CSV
@dlt.transformer(data_from=source)
def csv_data(items):
yield from read_csv(items, chunksize=10000)
pipeline.run(csv_data)
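
read_jsonl can be wired up the same way; a sketch mirroring the CSV example above (bucket path is a placeholder, `pipeline` is the pipeline object from the earlier examples):

import dlt
from dlt.sources.filesystem import filesystem, read_jsonl

# sketch: list JSONL files and parse them line by line
jsonl_source = filesystem(bucket_url="s3://bucket/events", file_glob="*.jsonl")

@dlt.transformer(data_from=jsonl_source)
def jsonl_data(items):
    yield from read_jsonl(items, chunksize=1000)

pipeline.run(jsonl_data)
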
from dlt.sources.filesystem import readers

# Create source with built-in readers
source = readers(
bucket_url="gs://my-bucket/exports",
file_glob="*.csv",
credentials=gcp_credentials
)
# Use CSV reader
pipeline.run(source.with_resources("read_csv"))

from dlt.sources.credentials import GcpServiceAccountCredentials
credentials = GcpServiceAccountCredentials(
project_id="my-project",
private_key="-----BEGIN PRIVATE KEY-----...",
client_email="sa@project.iam.gserviceaccount.com"
)
# Load only recent files
source = filesystem(
bucket_url="gs://bucket/logs",
credentials=credentials,
file_glob="2024-12-*.log"
)
pipeline.run(source)

from dlt.sources.credentials import AzureCredentials
credentials = AzureCredentials(
azure_storage_account_name="mystorageaccount",
azure_storage_account_key="key..."
)
source = filesystem(
bucket_url="az://container-name/path",
credentials=credentials,
file_glob="*.parquet"
)
pipeline.run(source)
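
SFTP is listed among the supported protocols but has no example; a sketch, assuming the connection options (username, password) can be passed to the fsspec SFTP filesystem via kwargs:

from dlt.sources.filesystem import filesystem

# sketch: list CSV files over SFTP; hostname, username and password are placeholders,
# and routing them through `kwargs` to the fsspec constructor is an assumption
source = filesystem(
    bucket_url="sftp://example.com/exports",
    file_glob="*.csv",
    kwargs={"username": "user", "password": "..."},
)
pipeline.run(source)
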
# No credentials needed for local files
source = filesystem(
bucket_url="/data/exports",
file_glob="**/*.json", # Recursive
extract_content=True
)
pipeline.run(source)
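
As noted in the filesystem() reference, credentials may also be a pre-configured fsspec AbstractFileSystem instance; a sketch (bucket path is a placeholder, s3fs must be installed):

import fsspec
from dlt.sources.filesystem import filesystem

# sketch: pass an existing fsspec filesystem object as credentials
fs = fsspec.filesystem("s3", anon=False)
source = filesystem(bucket_url="s3://my-bucket/data", credentials=fs, file_glob="*.json")
pipeline.run(source)
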
source = filesystem("s3://bucket/data", file_glob="*.json", extract_content=True)
@dlt.transformer(data_from=source, write_disposition="append")
def process_files(file_item):
"""
Custom file processing logic.
"""
import json
# Access file metadata
file_name = file_item["file_name"]
file_size = file_item["size_in_bytes"]
# Parse file content
content = json.loads(file_item["file_content"])
# Transform and yield data
for record in content["records"]:
yield {
"source_file": file_name,
"file_size": file_size,
**record
}
pipeline.run(process_files)

from dlt.sources import incremental
source = filesystem(
bucket_url="s3://bucket/logs",
file_glob="*.log"
)
@dlt.transformer(data_from=source, primary_key="file_url")
def incremental_files(
file_item,
modified=incremental("modification_date")
):
"""
Only load new or modified files.
"""
# Filter based on modification date
if file_item["modification_date"] > modified.last_value:
yield file_item
pipeline.run(incremental_files)

from dlt.sources.filesystem import filesystem, read_parquet
source = filesystem(
bucket_url="s3://data-lake/tables",
file_glob="*.parquet"
)
@dlt.transformer(data_from=source)
def parquet_tables(items):
yield from read_parquet(items)
pipeline.run(parquet_tables)
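
read_csv_duckdb follows the same pattern as read_csv and can be faster for large files; a sketch (bucket path is a placeholder, `pipeline` as in the earlier examples):

import dlt
from dlt.sources.filesystem import filesystem, read_csv_duckdb

# sketch: parse large CSV files with DuckDB instead of pandas
csv_source = filesystem(bucket_url="s3://data-lake/exports", file_glob="*.csv")

@dlt.transformer(data_from=csv_source)
def csv_via_duckdb(items):
    yield from read_csv_duckdb(items, chunksize=50000)

pipeline.run(csv_via_duckdb)
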
from dlt.sources.filesystem import readers

# Load different file types
source = readers("s3://bucket/data")
# Select multiple readers
pipeline.run(
source.with_resources("read_csv", "read_jsonl", "read_parquet")
)

Supported bucket_url protocols:
- file:///path or /path (local filesystem)
- s3://bucket/path (AWS S3)
- gs://bucket/path (Google Cloud Storage)
- az://container/path or abfss://... (Azure)
- sftp://hostname/path (SFTP)
- http:// or https:// (read-only)

Credentials can be provided via:
- secrets.toml file
- explicitly in code (as shown in the examples above)

Example secrets.toml:
[sources.filesystem.credentials]
aws_access_key_id = "AKIA..."
aws_secret_access_key = "secret..."
region_name = "us-east-1"