Quilt manages data like code, with packages, repositories, browsing, and revision history for machine-learning and data-driven domains.

Direct S3 bucket interface for file operations, listing, searching, and SQL queries. Provides high-level abstractions over AWS S3 operations.
from typing import Union

# Create bucket interface for S3 operations.
class Bucket:
    def __init__(self, bucket_uri: str):
        """
        Creates a Bucket object.

        Parameters:
        - bucket_uri: URI of bucket to target. Must start with 's3://'

        Returns:
        A new Bucket object

        Raises:
        - QuiltException: If bucket_uri is not an S3 URI or contains a path or version ID
        """
# Upload files and directories to S3 buckets.
class Bucket:
    def put_file(self, key: str, path: str):
        """
        Stores file at path to key in bucket.

        Parameters:
        - key: Key in bucket to store file at
        - path: String representing local path to file

        Raises:
        - FileNotFoundError: If no file exists at path
        - Exception: If copy fails
        """

    def put_dir(self, key: str, directory: str):
        """
        Stores all files in the directory under the prefix key.

        Parameters:
        - key: Prefix to store files under in bucket
        - directory: Path to directory to grab files from

        Raises:
        - QuiltException: If provided directory does not exist
        - Exception: If writing to bucket fails
        """
# Download files and directories from S3 buckets.
class Bucket:
    def fetch(self, key: str, path: str):
        """
        Fetches file (or files) at key to path.

        Parameters:
        - key: Key or prefix in bucket to fetch
        - path: Local path where files will be saved

        Returns:
        Local path where files were saved

        Note:
        If key refers to a directory (prefix), all files under that prefix are fetched
        """
# List and discover objects in S3 buckets.
class Bucket:
    def keys(self) -> list:
        """
        Lists all keys in the bucket.

        Returns:
        List of all object keys in the bucket
        """

    def ls(self, path: str = None, recursive: bool = False) -> tuple:
        """
        List data from the specified path.

        Parameters:
        - path: Path prefix to list (None for root)
        - recursive: Whether to list recursively

        Returns:
        Tuple of objects and prefixes in the specified path
        """
# Delete objects and directories from S3 buckets.
class Bucket:
    def delete(self, key: str):
        """
        Deletes a key from the bucket.

        Parameters:
        - key: Key to delete from bucket

        Note:
        This permanently deletes the object from S3
        """

    def delete_dir(self, path: str):
        """
        Delete a directory and all of its contents from the bucket.

        Parameters:
        - path: Directory path to delete recursively

        Note:
        This permanently deletes all objects under the specified prefix
        """
# Query data in S3 objects using SQL and search functionality.
class Bucket:
    def select(self, key: str, query: str, raw: bool = False):
        """
        Selects data from an S3 object using SQL query.

        Parameters:
        - key: S3 object key to query
        - query: SQL query string
        - raw: Whether to return raw results

        Returns:
        Query results (format depends on raw parameter and query)

        Supported formats:
        - CSV, JSON, Parquet files with SQL SELECT statements
        - Compression formats: GZIP, BZIP2
        """

    def search(self, query: Union[str, dict], limit: int = 10) -> list:
        """
        Execute a search against the configured search endpoint.

        Parameters:
        - query: Query string or DSL query body
        - limit: Maximum number of results to return

        Returns:
        List of search results

        Query Syntax:
        - String: Elasticsearch Query String Query syntax
        - Dict: Elasticsearch Query DSL
        """
# Example usage
import quilt3

# Create bucket interface
bucket = quilt3.Bucket("s3://my-data-bucket")

# Upload a single file
bucket.put_file("data/measurements.csv", "local/path/measurements.csv")

# Upload entire directory
bucket.put_dir("experiment_results/", "local/results/")

# List bucket contents
all_keys = bucket.keys()
print(f"Total objects: {len(all_keys)}")

# List with path prefix
data_files = bucket.ls("data/", recursive=True)
for item in data_files:
    print(f"Found: {item}")
# Download specific file
bucket.fetch("data/measurements.csv", "downloaded_measurements.csv")

# Download entire directory
bucket.fetch("experiment_results/", "local_results/")

# List directory contents first
contents = bucket.ls("data/")
for item in contents:
    print(f"Available: {item}")

# Download multiple files, flattening each key into a single local filename
for key in ["data/file1.csv", "data/file2.csv", "data/file3.csv"]:
    local_name = key.replace("/", "_")
    bucket.fetch(key, f"downloads/{local_name}")
# Query CSV data
csv_query = """
SELECT customer_id, purchase_amount, purchase_date
FROM S3Object[*]
WHERE purchase_amount > 100
LIMIT 1000
"""
results = bucket.select("sales/transactions.csv", csv_query)
print(f"Found {len(results)} high-value transactions")

# Query JSON data
json_query = """
SELECT s.user.name, s.event.type, s.timestamp
FROM S3Object[*].events[*] s
WHERE s.event.type = 'purchase'
"""
events = bucket.select("logs/user_events.json", json_query)
for event in events:
    print(f"Purchase by {event['name']} at {event['timestamp']}")

# Query Parquet data
parquet_query = """
SELECT product_category, AVG(price) as avg_price
FROM S3Object
WHERE date >= '2024-01-01'
GROUP BY product_category
"""
analytics = bucket.select("analytics/sales.parquet", parquet_query)
for row in analytics:
    print(f"{row['product_category']}: ${row['avg_price']:.2f} average")
# Simple text search
search_results = bucket.search("experiment temperature", limit=50)
for result in search_results:
    print(f"Found in: {result['_source']['key']}")

# Advanced search with Elasticsearch DSL
complex_query = {
    "query": {
        "bool": {
            "must": [
                {"term": {"file_type": "csv"}},
                {"range": {"file_size": {"gte": 1000000}}},
            ],
            "filter": [
                {"term": {"tags": "experiment"}},
            ],
        }
    },
    "sort": [{"modified_date": {"order": "desc"}}],
}
advanced_results = bucket.search(complex_query, limit=20)
for result in advanced_results:
    source = result['_source']
    print(f"Large CSV: {source['key']} ({source['file_size']} bytes)")
# Delete specific files
bucket.delete("temp/old_data.csv")
bucket.delete("cache/expired_results.json")

# Delete entire directory
bucket.delete_dir("temp/")

# Batch operations: (local_path, s3_key) pairs to upload
files_to_upload = [
    ("local/data1.csv", "processed/data1.csv"),
    ("local/data2.csv", "processed/data2.csv"),
    ("local/data3.csv", "processed/data3.csv"),
]
for local_path, s3_key in files_to_upload:
    bucket.put_file(s3_key, local_path)
    print(f"Uploaded {local_path} to {s3_key}")

# Verify uploads
uploaded_files = bucket.ls("processed/")
print(f"Successfully uploaded {len(uploaded_files)} files")
# Handle large file uploads with progress tracking
import os

def upload_large_file(local_path, s3_key):
    """Upload one file to s3_key, reporting its size before the transfer."""
    file_size = os.path.getsize(local_path)
    print(f"Uploading {file_size} byte file...")
    bucket.put_file(s3_key, local_path)
    print(f"Upload complete: {s3_key}")

# Stream large query results
large_query = """
SELECT * FROM S3Object
WHERE date BETWEEN '2023-01-01' AND '2023-12-31'
"""

# Process results in batches to manage memory.
# NOTE(review): S3 Select does not document OFFSET support — confirm this
# LIMIT/OFFSET pagination actually works against the select() backend.
batch_size = 1000
offset = 0
while True:
    paginated_query = f"{large_query} LIMIT {batch_size} OFFSET {offset}"
    batch = bucket.select("large_dataset.csv", paginated_query)
    if not batch:
        break
    process_batch(batch)  # assumed to be defined by the caller
    offset += batch_size
    print(f"Processed {offset} records...")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-quilt3