Filesystem Source

Load files from the local filesystem or from cloud storage (S3, GCS, Azure Blob Storage, SFTP), with automatic file listing, glob-based filtering, and optional content reading.
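
A minimal end-to-end sketch, assuming a local /data/files directory and the duckdb destination, that lists file metadata into a pipeline:

import dlt
from dlt.sources.filesystem import filesystem

# List file metadata (no contents) from a local directory
pipeline = dlt.pipeline(destination="duckdb", dataset_name="files")
pipeline.run(filesystem(bucket_url="/data/files", file_glob="*.csv"))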

Capabilities

Filesystem Source Function

def filesystem(
    bucket_url: str,
    credentials: Any = None,
    file_glob: str = "*",
    files_per_page: int = None,
    extract_content: bool = False,
    kwargs: dict = None,
    client_kwargs: dict = None,
    incremental: Incremental = None
) -> DltResource:
    """
    Lists and optionally loads files from filesystem or cloud storage.

    Args:
        bucket_url: Filesystem path or cloud bucket URL:
            - Local: "/path/to/files" or "file:///path"
            - S3: "s3://bucket-name/path"
            - GCS: "gs://bucket-name/path"
            - Azure: "az://container-name/path" or "abfss://container@account.dfs.core.windows.net/path"
            - SFTP: "sftp://hostname/path"
        credentials: Cloud credentials or fsspec AbstractFileSystem instance:
            - AWS: AwsCredentials
            - GCS: GcpCredentials
            - Azure: AzureCredentials
        file_glob: Glob pattern for filtering files (default: "*")
            - "*.json": All JSON files
            - "data/**/*.csv": All CSV files in data directory (recursive)
            - "2024-*": Files starting with "2024-"
        files_per_page: Number of files to list per page (defaults to 100 when not set)
        extract_content: Whether to read file contents (default: False, only metadata)
        kwargs: Additional arguments passed to fsspec constructor (e.g., dict(use_ssl=True) for s3fs)
        client_kwargs: Arguments for fsspec native client (e.g., dict(verify="public.crt") for botocore)
        incremental: Incremental cursor on listed files (commonly modification_date)

    Returns:
        DltResource yielding FileItem objects

    Example:
        # List S3 files
        pipeline.run(filesystem("s3://my-bucket/data", file_glob="*.json"))

        # Load local files with incremental by modification date
        pipeline.run(filesystem("/data/files", extract_content=True,
                               incremental=dlt.sources.incremental("modification_date")))
    """

Readers Source

def readers(
    bucket_url: str,
    credentials: Any = None,
    file_glob: str = "*",
    kwargs: dict = None,
    client_kwargs: dict = None,
    incremental: Incremental = None
) -> DltSource:
    """
    Creates a source with multiple reader transformer resources for different file formats.

    Args:
        bucket_url: Filesystem path or cloud bucket URL
        credentials: Cloud credentials or fsspec AbstractFileSystem instance
        file_glob: Glob pattern for file filtering
        kwargs: Additional arguments passed to fsspec constructor
        client_kwargs: Arguments for fsspec native client
        incremental: Incremental cursor on listed files (commonly modification_date)

    Returns:
        DltSource containing reader transformer resources (read_csv, read_jsonl, read_parquet, read_csv_duckdb), selectable via .with_resources()

    Example:
        source = readers("s3://bucket/data", file_glob="*.csv")
        pipeline.run(source.with_resources("read_csv"))
    """

File Reader Transformers

@dlt.transformer
def read_csv(
    items: Iterator[FileItem],
    chunksize: int = 10000,
    **pandas_kwargs
) -> Iterator[Any]:
    """
    Reads CSV files using pandas.

    Args:
        items: Iterator of FileItem objects
        chunksize: Rows per chunk
        **pandas_kwargs: Additional pandas.read_csv arguments

    Yields:
        Data rows from CSV files

    Example:
        @dlt.transformer(data_from=filesystem_resource)
        def my_csv_reader(items):
            yield from read_csv(items, chunksize=5000, sep=";")
    """

@dlt.transformer
def read_jsonl(
    items: Iterator[FileItem],
    chunksize: int = 1000
) -> Iterator[dict]:
    """
    Reads JSON Lines files.

    Args:
        items: Iterator of FileItem objects
        chunksize: Lines per chunk

    Yields:
        Parsed JSON objects

    Example:
        yield from read_jsonl(filesystem_items)
    """

@dlt.transformer
def read_parquet(
    items: Iterator[FileItem],
    **pyarrow_kwargs
) -> Iterator[Any]:
    """
    Reads Parquet files using PyArrow.

    Args:
        items: Iterator of FileItem objects
        **pyarrow_kwargs: Additional PyArrow arguments

    Yields:
        Data rows from Parquet files

    Example:
        yield from read_parquet(filesystem_items)
    """

@dlt.transformer
def read_csv_duckdb(
    items: Iterator[FileItem],
    chunksize: int = 10000,
    **duckdb_kwargs
) -> Iterator[Any]:
    """
    Reads CSV files using DuckDB (faster for large files).

    Args:
        items: Iterator of FileItem objects
        chunksize: Rows per chunk
        **duckdb_kwargs: Additional DuckDB arguments

    Yields:
        Data rows from CSV files

    Example:
        yield from read_csv_duckdb(filesystem_items)
    """

FileItem Type

class FileItem(TypedDict):
    """
    File metadata structure.

    Fields:
        file_name: Name of the file
        file_url: Full URL/path to file
        mime_type: MIME type of file
        size_in_bytes: File size
        modification_date: Last modified timestamp
        file_content: File content (if extract_content=True)
    """
    file_name: str
    file_url: str
    mime_type: str
    size_in_bytes: int
    modification_date: str
    file_content: Optional[bytes]
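
As a sketch of how the listed metadata can be used before any content is read: the hypothetical transformer below keeps only small CSV files. It assumes each call receives a single FileItem dict, as in the Custom File Processing example later in this document, and that CSV files are reported with the text/csv MIME type.

source = filesystem("s3://bucket/data")

@dlt.transformer(data_from=source)
def small_csv_files(file_item):
    # Keep only CSV files under 10 MB, using the FileItem metadata fields
    if file_item["mime_type"] == "text/csv" and file_item["size_in_bytes"] < 10 * 1024 * 1024:
        yield file_item

pipeline.run(small_csv_files)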

Utility Functions

def fsspec_filesystem(
    bucket_url: str,
    credentials: Any = None
) -> AbstractFileSystem:
    """
    Creates an fsspec filesystem instance.

    Args:
        bucket_url: Filesystem URL
        credentials: Credentials

    Returns:
        fsspec AbstractFileSystem instance

    Example:
        fs = fsspec_filesystem("s3://bucket")
        files = fs.ls("/path")
    """

def glob_files(
    fs: AbstractFileSystem,
    bucket_url: str,
    file_glob: str = "*"
) -> Iterator[str]:
    """
    Globs files matching pattern.

    Args:
        fs: fsspec filesystem
        bucket_url: Base path
        file_glob: Glob pattern

    Yields:
        File paths matching pattern

    Example:
        for file_path in glob_files(fs, "s3://bucket/data", "*.json"):
            print(file_path)
    """

Usage Examples

List Files from S3

import dlt
from dlt.sources.credentials import AwsCredentials
from dlt.sources.filesystem import filesystem

# Configure AWS credentials
credentials = AwsCredentials(
    aws_access_key_id="AKIA...",
    aws_secret_access_key="secret...",
    region_name="us-east-1"
)

# List all JSON files
pipeline = dlt.pipeline(destination="duckdb", dataset_name="files")
source = filesystem(
    bucket_url="s3://my-bucket/data",
    credentials=credentials,
    file_glob="**/*.json"  # Recursive glob
)
pipeline.run(source)

# Query file metadata
with pipeline.sql_client() as client:
    files = client.execute_sql("SELECT file_name, size_in_bytes FROM filesystem")

Load and Parse CSV Files

from dlt.sources.filesystem import filesystem, read_csv

# List and read CSV files
source = filesystem(
    bucket_url="s3://bucket/data",
    file_glob="*.csv",
    extract_content=True  # Load file contents
)

# Transform to parse CSV
@dlt.transformer(data_from=source)
def csv_data(items):
    yield from read_csv(items, chunksize=10000)

pipeline.run(csv_data)
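
For large CSV files, the same pattern works with read_csv_duckdb, which this document describes as the faster option. A sketch reusing the source defined above:

from dlt.sources.filesystem import read_csv_duckdb

@dlt.transformer(data_from=source)
def csv_data_duckdb(items):
    yield from read_csv_duckdb(items, chunksize=10000)

pipeline.run(csv_data_duckdb)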

Using Readers Source

from dlt.sources.filesystem import readers

# Create source with built-in readers
source = readers(
    bucket_url="gs://my-bucket/exports",
    file_glob="*.csv",
    credentials=gcp_credentials  # a GcpServiceAccountCredentials instance (see the GCS example below)
)

# Use CSV reader
pipeline.run(source.with_resources("read_csv"))

Load from GCS with Filtering

from dlt.sources.credentials import GcpServiceAccountCredentials

credentials = GcpServiceAccountCredentials(
    project_id="my-project",
    private_key="-----BEGIN PRIVATE KEY-----...",
    client_email="sa@project.iam.gserviceaccount.com"
)

# Load only recent files
source = filesystem(
    bucket_url="gs://bucket/logs",
    credentials=credentials,
    file_glob="2024-12-*.log"
)

pipeline.run(source)

Load from Azure Blob Storage

from dlt.sources.credentials import AzureCredentials

credentials = AzureCredentials(
    azure_storage_account_name="mystorageaccount",
    azure_storage_account_key="key..."
)

source = filesystem(
    bucket_url="az://container-name/path",
    credentials=credentials,
    file_glob="*.parquet"
)

pipeline.run(source)

Local Filesystem

# No credentials needed for local files
source = filesystem(
    bucket_url="/data/exports",
    file_glob="**/*.json",  # Recursive
    extract_content=True
)

pipeline.run(source)

Custom File Processing

from dlt.sources.filesystem import filesystem

source = filesystem("s3://bucket/data", file_glob="*.json", extract_content=True)

@dlt.transformer(data_from=source, write_disposition="append")
def process_files(file_item):
    """
    Custom file processing logic.
    """
    import json

    # Access file metadata
    file_name = file_item["file_name"]
    file_size = file_item["size_in_bytes"]

    # Parse file content
    content = json.loads(file_item["file_content"])

    # Transform and yield data
    for record in content["records"]:
        yield {
            "source_file": file_name,
            "file_size": file_size,
            **record
        }

pipeline.run(process_files)

Incremental File Loading

from dlt.sources import incremental

source = filesystem(
    bucket_url="s3://bucket/logs",
    file_glob="*.log"
)

@dlt.transformer(data_from=source, primary_key="file_url")
def incremental_files(
    file_item,
    modified=incremental("modification_date")
):
    """
    Only load new or modified files.
    """
    # Yield everything on the first run, then only files modified since the last run
    if modified.last_value is None or file_item["modification_date"] > modified.last_value:
        yield file_item

pipeline.run(incremental_files)
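
Alternatively, the incremental parameter on filesystem itself (see the function signature above) lets the source track modification_date directly, without a manual filter. A minimal sketch reusing the incremental import from the example above:

source = filesystem(
    bucket_url="s3://bucket/logs",
    file_glob="*.log",
    incremental=incremental("modification_date")
)

pipeline.run(source)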

Reading Parquet Files

from dlt.sources.filesystem import filesystem, read_parquet

source = filesystem(
    bucket_url="s3://data-lake/tables",
    file_glob="*.parquet"
)

@dlt.transformer(data_from=source)
def parquet_tables(items):
    yield from read_parquet(items)

pipeline.run(parquet_tables)

Multiple File Types

from dlt.sources.filesystem import readers

# Load different file types
source = readers("s3://bucket/data")

# Select multiple readers
pipeline.run(
    source.with_resources("read_csv", "read_jsonl", "read_parquet")
)

Supported Storage Systems

  • Local Filesystem: file:///path or /path
  • Amazon S3: s3://bucket/path
  • Google Cloud Storage: gs://bucket/path
  • Azure Blob Storage: az://container/path or abfss://...
  • SFTP: sftp://hostname/path (see the sketch below)
  • HTTP/HTTPS: http:// or https:// (read-only)
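
SFTP is listed above but not shown in the usage examples. A hypothetical sketch, assuming connection details are forwarded to the fsspec SFTP implementation through the kwargs pass-through (parameter names follow fsspec/paramiko and are not confirmed by this document):

source = filesystem(
    bucket_url="sftp://hostname/path",
    file_glob="*.csv",
    kwargs=dict(username="user", password="password", port=22)  # assumed fsspec SFTP options
)

pipeline.run(source)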

Credential Configuration

Credentials can be provided via:

  1. Function arguments
  2. secrets.toml file
  3. Environment variables
  4. dlt configuration system

Example secrets.toml:

[sources.filesystem.credentials]
aws_access_key_id = "AKIA..."
aws_secret_access_key = "secret..."
region_name = "us-east-1"