CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-quilt3

Quilt manages data like code with packages, repositories, browsing and revision history for machine learning and data-driven domains

Pending
Overview
Eval results
Files

docs/bucket-operations.md

S3 Bucket Operations

Direct S3 bucket interface for file operations, listing, searching, and SQL queries. Provides high-level abstractions over AWS S3 operations.

Type Imports

from typing import Union

Capabilities

Bucket Initialization

Create bucket interface for S3 operations.

class Bucket:
    def __init__(self, bucket_uri: str):
        """
        Initialize a Bucket interface for a single S3 bucket.

        Parameters:
        - bucket_uri: Target bucket URI; must begin with 's3://'

        Returns:
        A new Bucket object

        Raises:
        QuiltException: When bucket_uri is not an S3 URI, or when it
            includes a path or a version ID
        """

File Upload Operations

Upload files and directories to S3 buckets.

class Bucket:
    def put_file(self, key: str, path: str):
        """
        Upload the local file at *path* to *key* in this bucket.

        Parameters:
        - key: Destination key within the bucket for the file
        - path: String giving the local filesystem path of the file

        Raises:
        - FileNotFoundError: When nothing exists at path
        - Exception: When the copy operation fails
        """

    def put_dir(self, key: str, directory: str):
        """
        Upload every file found in *directory* under the prefix *key*.

        Parameters:
        - key: Prefix under which the files are stored in the bucket
        - directory: Local directory whose files are uploaded

        Raises:
        - QuiltException: When the given directory does not exist
        - Exception: When writing to the bucket fails
        """

File Download Operations

Download files and directories from S3 buckets.

class Bucket:
    def fetch(self, key: str, path: str):
        """
        Download the object (or objects) at *key* to the local *path*.

        Parameters:
        - key: Key or prefix within the bucket to download
        - path: Local destination where the files are saved

        Returns:
        Local path where files were saved

        Note:
        When key names a prefix (directory), every object under that
        prefix is downloaded
        """

Bucket Listing and Discovery

List and discover objects in S3 buckets.

class Bucket:
    def keys(self) -> list:
        """
        Lists all keys in the bucket.

        Returns:
        List of all object keys in the bucket
        """

    # NOTE(review): the default is None, so path is effectively
    # Optional[str] despite the bare `str` annotation.
    def ls(self, path: str = None, recursive: bool = False) -> tuple:
        """
        List data from the specified path.

        Parameters:
        - path: Path prefix to list (None for root)
        - recursive: Whether to list recursively

        Returns:
        Tuple of objects and prefixes in the specified path
        """

Object Deletion

Delete objects and directories from S3 buckets.

class Bucket:
    def delete(self, key: str):
        """
        Remove a single key from the bucket.

        Parameters:
        - key: The key to remove from the bucket

        Note:
        The object is permanently deleted from S3
        """

    def delete_dir(self, path: str):
        """
        Recursively remove a directory and everything inside it.

        Parameters:
        - path: Directory path whose contents are deleted recursively

        Note:
        Every object under the given prefix is permanently deleted
        """

S3 Select and Search

Query data in S3 objects using SQL and search functionality.

class Bucket:
    def select(self, key: str, query: str, raw: bool = False):
        """
        Run a SQL query against a single S3 object via S3 Select.

        Parameters:
        - key: Key of the S3 object to query
        - query: SQL query string to execute
        - raw: Whether the raw query results should be returned

        Returns:
        Query results (format depends on raw parameter and query)

        Supported formats:
        - CSV, JSON, Parquet files with SQL SELECT statements
        - Compression formats: GZIP, BZIP2
        """

    def search(self, query: Union[str, dict], limit: int = 10) -> list:
        """
        Run a search query against the configured search endpoint.

        Parameters:
        - query: Either a query string or a DSL query body (dict)
        - limit: Upper bound on the number of results returned

        Returns:
        List of search results

        Query Syntax:
        - String: Elasticsearch Query String Query syntax
        - Dict: Elasticsearch Query DSL
        """

Usage Examples

Basic Bucket Operations

import quilt3

# Create bucket interface (the URI must start with "s3://")
bucket = quilt3.Bucket("s3://my-data-bucket")

# Upload a single file
bucket.put_file("data/measurements.csv", "local/path/measurements.csv")

# Upload entire directory
bucket.put_dir("experiment_results/", "local/results/")

# List bucket contents
all_keys = bucket.keys()
print(f"Total objects: {len(all_keys)}")

# List with path prefix.
# Bucket.ls returns a tuple of (objects, prefixes), so unpack it —
# iterating the tuple itself would yield the two collections, not the items.
objects, prefixes = bucket.ls("data/", recursive=True)
for item in objects:
    print(f"Found: {item}")

File Download and Retrieval

# Download specific file
bucket.fetch("data/measurements.csv", "downloaded_measurements.csv")

# Download entire directory (everything under the prefix is fetched)
bucket.fetch("experiment_results/", "local_results/")

# List directory contents first.
# Bucket.ls returns a tuple of (objects, prefixes) — unpack before iterating.
objects, prefixes = bucket.ls("data/")
for item in objects:
    print(f"Available: {item}")

# Download multiple files
for key in ["data/file1.csv", "data/file2.csv", "data/file3.csv"]:
    local_name = key.replace("/", "_")
    bucket.fetch(key, f"downloads/{local_name}")

S3 Select Queries

# Query CSV data with S3 Select (the SQL runs server-side in S3,
# so only matching rows are transferred)
csv_query = """
SELECT customer_id, purchase_amount, purchase_date 
FROM S3Object[*] 
WHERE purchase_amount > 100 
LIMIT 1000
"""

results = bucket.select("sales/transactions.csv", csv_query)
print(f"Found {len(results)} high-value transactions")

# Query JSON data: S3Object[*].events[*] iterates the nested
# "events" array of each document
json_query = """
SELECT s.user.name, s.event.type, s.timestamp
FROM S3Object[*].events[*] s
WHERE s.event.type = 'purchase'
"""

events = bucket.select("logs/user_events.json", json_query)
for event in events:
    # NOTE(review): results are indexed like dict records keyed by the
    # selected column names here — confirm against select()'s return format
    print(f"Purchase by {event['name']} at {event['timestamp']}")

# Query Parquet data
# NOTE(review): the S3 Select SQL reference does not list GROUP BY as a
# supported clause — verify this query actually runs before relying on it
parquet_query = """
SELECT product_category, AVG(price) as avg_price
FROM S3Object
WHERE date >= '2024-01-01'
GROUP BY product_category
"""

analytics = bucket.select("analytics/sales.parquet", parquet_query)
for row in analytics:
    print(f"{row['product_category']}: ${row['avg_price']:.2f} average")

Search Operations

# Simple text search (Elasticsearch Query String Query syntax)
search_results = bucket.search("experiment temperature", limit=50)
for result in search_results:
    # Each hit follows the Elasticsearch hit shape: the document
    # itself lives under '_source'
    print(f"Found in: {result['_source']['key']}")

# Advanced search with Elasticsearch DSL (dict query body)
complex_query = {
    "query": {
        "bool": {
            # 'must' clauses are required and contribute to relevance scoring
            "must": [
                {"term": {"file_type": "csv"}},
                {"range": {"file_size": {"gte": 1000000}}}
            ],
            # 'filter' clauses are required but do not affect scoring
            "filter": [
                {"term": {"tags": "experiment"}}
            ]
        }
    },
    "sort": [{"modified_date": {"order": "desc"}}]
}

advanced_results = bucket.search(complex_query, limit=20)
for result in advanced_results:
    source = result['_source']
    print(f"Large CSV: {source['key']} ({source['file_size']} bytes)")

Bucket Management

# Delete specific files
bucket.delete("temp/old_data.csv")
bucket.delete("cache/expired_results.json")

# Delete entire directory (all objects under the prefix)
bucket.delete_dir("temp/")

# Batch operations: (local_path, s3_key) pairs
files_to_upload = [
    ("local/data1.csv", "processed/data1.csv"),
    ("local/data2.csv", "processed/data2.csv"),
    ("local/data3.csv", "processed/data3.csv")
]

for local_path, s3_key in files_to_upload:
    bucket.put_file(s3_key, local_path)
    print(f"Uploaded {local_path} to {s3_key}")

# Verify uploads.
# Bucket.ls returns a tuple of (objects, prefixes); count the listed
# objects, not the tuple itself (len of the tuple is always 2).
uploaded_objects, _prefixes = bucket.ls("processed/")
print(f"Successfully uploaded {len(uploaded_objects)} files")

Working with Large Objects

# Handle large file uploads with progress tracking
import os

def upload_large_file(local_path, s3_key):
    """Upload one file, reporting its size before and its key after."""
    file_size = os.path.getsize(local_path)
    print(f"Uploading {file_size} byte file...")

    bucket.put_file(s3_key, local_path)
    print(f"Upload complete: {s3_key}")

# Stream large query results
large_query = """
SELECT * FROM S3Object
WHERE date BETWEEN '2023-01-01' AND '2023-12-31'
"""

# S3 Select SQL supports LIMIT but not OFFSET, so offset-based pagination
# is not possible. Run the query once and process the result set in
# fixed-size slices to bound per-batch work.
batch_size = 1000
results = bucket.select("large_dataset.csv", large_query)

for start in range(0, len(results), batch_size):
    process_batch(results[start:start + batch_size])
    print(f"Processed {min(start + batch_size, len(results))} records...")

Install with Tessl CLI

npx tessl i tessl/pypi-quilt3

docs

admin.md

bucket-operations.md

config-session.md

data-access.md

hooks.md

index.md

package-management.md

registry-operations.md

tile.json