MinIO Python SDK for Amazon S3 Compatible Cloud Storage
—
Advanced S3 features including presigned URLs for secure temporary access, SQL SELECT queries for object content filtering, bucket notifications for event-driven workflows, and sophisticated object composition operations.
Generate temporary URLs for secure access to objects without exposing credentials. Useful for web applications, direct uploads, and temporary sharing.
def presigned_get_object(
    self,
    bucket_name: str,
    object_name: str,
    expires: datetime.timedelta = datetime.timedelta(days=7),
    response_headers: dict[str, str] | None = None,
    request_date: datetime.datetime | None = None,
    version_id: str | None = None,
    extra_query_params: dict[str, str] | None = None
) -> str:
    """
    Generate a presigned URL for GET operations.

    The returned URL grants temporary download access to the object
    without exposing credentials.

    Args:
        bucket_name: Name of the bucket
        object_name: Name of the object
        expires: Expiration time for the URL (defaults to 7 days; max 7 days)
        response_headers: Headers to override in response
        request_date: Date for the request (defaults to current time)
        version_id: Specific version to access
        extra_query_params: Additional query parameters

    Returns:
        Presigned URL string

    Raises:
        S3Error: If URL generation fails
    """
def presigned_put_object(
    self,
    bucket_name: str,
    object_name: str,
    expires: datetime.timedelta = datetime.timedelta(days=7),
    response_headers: dict[str, str] | None = None,
    request_date: datetime.datetime | None = None
) -> str:
    """
    Generate a presigned URL for PUT operations.

    The returned URL grants temporary upload access for the named object
    without exposing credentials.

    Args:
        bucket_name: Name of the bucket
        object_name: Name of the object
        expires: Expiration time for the URL (defaults to 7 days; max 7 days)
        response_headers: Headers to override in response
        request_date: Date for the request (defaults to current time)

    Returns:
        Presigned URL string

    Raises:
        S3Error: If URL generation fails
    """
def presigned_post_policy(self, policy: PostPolicy) -> dict[str, str]:
    """
    Generate presigned POST policy for browser-based uploads.

    Args:
        policy: PostPolicy object with upload constraints

    Returns:
        Dictionary containing form fields for HTML form

    Raises:
        S3Error: If policy generation fails
    """

# Query object content using SQL expressions without downloading entire
# objects. Supports CSV, JSON, and Parquet formats.
def select_object_content(
    self,
    bucket_name: str,
    object_name: str,
    request: SelectRequest
) -> SelectObjectReader:
    """
    Select object content using SQL expressions.

    Args:
        bucket_name: Name of the bucket
        object_name: Name of the object to query
        request: SelectRequest with SQL expression and format specifications

    Returns:
        SelectObjectReader for streaming results

    Raises:
        S3Error: If select operation fails
    """

# Listen for real-time bucket events such as object creation, deletion, and
# modifications. Essential for event-driven architectures.
def listen_bucket_notification(
    self,
    bucket_name: str,
    prefix: str = "",
    suffix: str = "",
    # Immutable tuple default avoids the shared-mutable-default pitfall
    # of `events: list[str] = [...]`; callers may still pass a list.
    events: tuple[str, ...] | list[str] = ("s3:ObjectCreated:*", "s3:ObjectRemoved:*")
) -> EventIterable:
    """
    Listen for bucket notifications in real-time.

    Args:
        bucket_name: Name of the bucket to monitor
        prefix: Filter events by object name prefix
        suffix: Filter events by object name suffix
        events: S3 event types to monitor (defaults to object-created and
            object-removed events)

    Returns:
        EventIterable for streaming notification events

    Raises:
        S3Error: If notification setup fails
    """
def set_bucket_notification(
    self,
    bucket_name: str,
    config: NotificationConfig
) -> None:
    """
    Set bucket notification configuration.

    Applies the given notification configuration to the bucket.

    Args:
        bucket_name: Name of the bucket
        config: NotificationConfig with destination settings

    Raises:
        S3Error: If configuration fails
    """
def get_bucket_notification(self, bucket_name: str) -> NotificationConfig:
    """
    Get current bucket notification configuration.

    Args:
        bucket_name: Name of the bucket

    Returns:
        NotificationConfig with current settings

    Raises:
        S3Error: If retrieval fails
    """
def delete_bucket_notification(self, bucket_name: str) -> None:
    """
    Remove bucket notification configuration.

    Args:
        bucket_name: Name of the bucket

    Raises:
        S3Error: If removal fails
    """

# Configure client behavior including endpoints, tracing, and advanced features.
def set_app_info(self, app_name: str, app_version: str) -> None:
    """
    Record the calling application's name and version.

    Both values become part of the client's user agent string so the
    server can identify the application behind each request.

    Args:
        app_name: Name of the application
        app_version: Version of the application
    """
def trace_on(self, stream: TextIO) -> None:
    """
    Enable HTTP request/response tracing.

    Args:
        stream: Text stream to write trace output to (e.g. sys.stdout)
    """
def trace_off(self) -> None:
    """
    Disable HTTP request/response tracing previously enabled via trace_on.
    """
def enable_accelerate_endpoint(self) -> None:
    """
    Enable S3 transfer acceleration endpoint.

    Undone by disable_accelerate_endpoint.
    """
def disable_accelerate_endpoint(self) -> None:
    """
    Disable S3 transfer acceleration endpoint.

    Counterpart of enable_accelerate_endpoint.
    """
def enable_dualstack_endpoint(self) -> None:
    """
    Enable dual-stack (IPv4/IPv6) endpoints.

    Undone by disable_dualstack_endpoint.
    """
def disable_dualstack_endpoint(self) -> None:
    """
    Disable dual-stack (IPv4/IPv6) endpoints.

    Counterpart of enable_dualstack_endpoint.
    """
def enable_virtual_style_endpoint(self) -> None:
    """
    Enable virtual-style endpoint addressing.

    Undone by disable_virtual_style_endpoint.
    """
def disable_virtual_style_endpoint(self) -> None:
    """
    Disable virtual-style endpoint addressing.
    """

# Efficient batch operations for handling multiple objects simultaneously.
def upload_snowball_objects(
self,
bucket_name: str,
object_list: Iterable[SnowballObject],
metadata: dict[str, str] | None = None,
sse: Sse | None = None,
tags: Tags | None = None,
retention: Retention | None = None,
legal_hold: bool = False,
staging_filename: str | None = None,
compression: bool = True
) -> ObjectWriteResult:
"""
Upload multiple objects in a single compressed archive.
Args:
bucket_name: Name of the bucket
object_list: Iterable of SnowballObject specifications
metadata: Common metadata for all objects
sse: Server-side encryption
tags: Common tags for all objects
retention: Retention configuration
legal_hold: Enable legal hold
staging_filename: Temporary file for staging
compression: Enable compression of archive
Returns:
ObjectWriteResult for the uploaded archive
Raises:
S3Error: If batch upload fails
"""class PostPolicy:
"""Policy configuration for presigned POST uploads."""
def set_bucket_name(self, bucket_name: str) -> None:
    """Restrict uploads under this policy to the given bucket."""
def set_bucket_name_starts_with(self, bucket_name_starts_with: str) -> None:
    """Set bucket name prefix constraint: uploads are limited to buckets
    whose name starts with the given prefix."""
def set_content_type(self, content_type: str) -> None:
    """Set the content type allowed for uploads under this policy."""
def set_content_length_range(self, lower_limit: int, upper_limit: int) -> None:
    """Set file size constraints (lower and upper bounds, in bytes)."""
def set_expires(self, expires: datetime.datetime) -> None:
    """Set the time at which this policy expires."""
def set_key(self, object_name: str) -> None:
    """Set the exact object name uploads must use."""
def set_key_starts_with(self, key_starts_with: str) -> None:
"""Set object name prefix constraint."""class SelectRequest:
"""SQL select request configuration."""
def __init__(
    self,
    expression: str,
    input_serialization: InputSerialization,
    output_serialization: OutputSerialization,
    request_progress: bool = False
) -> None: ...
# SQL expression to evaluate against the object
# (e.g. "SELECT name, age FROM S3Object WHERE age > 25").
expression: str
# Format specification for the source object (CSV/JSON/Parquet).
input_serialization: InputSerialization
# Format specification for the returned records.
output_serialization: OutputSerialization
# Whether to request progress reporting from the server.
request_progress: bool
class SelectObjectReader:
    """Reader for streaming select query results."""

    def stream(self) -> Iterator[bytes]:
        """Stream selected records as bytes."""

    def stats(self) -> dict[str, Any]:
        """Get selection statistics (e.g. bytes scanned / bytes processed)."""
class CSVInputSerialization:
    """CSV input format specification."""

    def __init__(
        self,
        file_header_info: str = "USE",  # header-row handling mode
        record_delimiter: str = "\n",
        field_delimiter: str = ",",
        quote_character: str = '"',
        quote_escape_character: str = '"',
        comments: str = "#",  # marker for comment lines in the source CSV
        allow_quoted_record_delimiter: bool = False
    ) -> None: ...
class JSONInputSerialization:
    """Input format specification for JSON-encoded objects."""

    def __init__(self, json_type: str = "DOCUMENT") -> None: ...
class ParquetInputSerialization:
    """Parquet input format specification (takes no options)."""

    def __init__(self) -> None: ...
class CSVOutputSerialization:
    """CSV output format specification."""

    def __init__(
        self,
        record_delimiter: str = "\n",
        field_delimiter: str = ",",
        quote_character: str = '"',
        quote_escape_character: str = '"',
        quote_fields: str = "ASNEEDED"  # field-quoting mode
    ) -> None: ...
class JSONOutputSerialization:
"""JSON output format specification."""
def __init__(self, record_delimiter: str = "\n") -> None: ...class EventIterable:
"""Iterator for streaming bucket notification events."""
def __iter__(self) -> Iterator[dict[str, Any]]:
    """Iterate over notification events; each event is a dict."""
class NotificationConfig:
"""Bucket notification configuration."""
def __init__(
self,
cloud_func_config_list: list[CloudFunctionConfig] | None = None,
queue_config_list: list[QueueConfig] | None = None,
topic_config_list: list[TopicConfig] | None = None
) -> None: ...class SnowballObject:
"""Specification for batch upload objects."""
def __init__(
    self,
    object_name: str,
    filename: str | None = None,
    data: io.IOBase | None = None,
    length: int = -1,  # -1 presumably means "length unknown" — TODO confirm
    metadata: dict[str, str] | None = None
) -> None: ...
# Target object name inside the bucket.
object_name: str
# Local file path to upload; NOTE(review): looks like exactly one of
# filename/data is expected — verify against the SDK.
filename: str | None
# In-memory stream to upload instead of a file.
data: io.IOBase | None
# Number of bytes to read from data.
length: int
# Per-object metadata.
metadata: dict[str, str] | None

import datetime
from minio import Minio
from minio.datatypes import PostPolicy
client = Minio("localhost:9000", "minio", "minio123")

# Generate presigned GET URL (valid for 1 hour)
url = client.presigned_get_object(
    "my-bucket",
    "my-object.jpg",
    expires=datetime.timedelta(hours=1)
)
print(f"Download URL: {url}")

# Generate presigned PUT URL for uploads
upload_url = client.presigned_put_object(
    "my-bucket",
    "uploads/new-file.txt",
    expires=datetime.timedelta(minutes=30)
)
print(f"Upload URL: {upload_url}")

# Generate POST policy for browser uploads
policy = PostPolicy()
policy.set_bucket_name("my-bucket")
policy.set_key_starts_with("uploads/")
policy.set_content_length_range(1024, 10*1024*1024)  # 1KB to 10MB
# datetime.utcnow() is deprecated and returns a naive timestamp;
# use a timezone-aware UTC timestamp instead.
policy.set_expires(
    datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=1)
)
form_data = client.presigned_post_policy(policy)
print("Form fields for HTML upload:")
for key, value in form_data.items():
    print(f"{key}: {value}")

from minio.select import (
    SelectRequest,
    CSVInputSerialization,
    CSVOutputSerialization
)
# Query CSV data with SQL
input_serialization = CSVInputSerialization(
    file_header_info="USE",
    record_delimiter="\n",
    field_delimiter=","
)
output_serialization = CSVOutputSerialization(
    record_delimiter="\n",
    field_delimiter=","
)
request = SelectRequest(
    expression="SELECT name, age FROM S3Object WHERE age > 25",
    input_serialization=input_serialization,
    output_serialization=output_serialization,
    request_progress=True
)
try:
    reader = client.select_object_content("data-bucket", "people.csv", request)
    # Stream results
    for data in reader.stream():
        print(data.decode('utf-8'), end='')
    # Get statistics
    stats = reader.stats()
    print(f"Bytes scanned: {stats.get('bytes_scanned', 0)}")
    print(f"Bytes processed: {stats.get('bytes_processed', 0)}")
except Exception as e:
    print(f"Select query failed: {e}")

# Listen for bucket events
try:
    events = client.listen_bucket_notification(
        "my-bucket",
        prefix="uploads/",
        events=["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]
    )
    for event in events:
        for record in event.get("Records", []):
            event_name = record.get("eventName")
            bucket = record["s3"]["bucket"]["name"]
            obj_key = record["s3"]["object"]["key"]
            print(f"Event: {event_name} - {bucket}/{obj_key}")
except KeyboardInterrupt:
    print("Stopped listening for events")
except Exception as e:
    print(f"Notification error: {e}")

from minio.commonconfig import SnowballObject
# Batch upload multiple small objects efficiently
objects_to_upload = [
    SnowballObject("file1.txt", "/path/to/file1.txt"),
    SnowballObject("file2.txt", "/path/to/file2.txt"),
    SnowballObject("data/file3.txt", "/path/to/file3.txt"),
]
try:
    result = client.upload_snowball_objects(
        "my-bucket",
        objects_to_upload,
        compression=True
    )
    print(f"Batch upload completed: {result.etag}")
except Exception as e:
    print(f"Batch upload failed: {e}")

# Install with Tessl CLI
npx tessl i tessl/pypi-minio