Microsoft Azure File DataLake Storage Client Library for Python
Overall score: 92%

File-specific operations for uploading, downloading, appending data, and managing file properties and metadata. The DataLakeFileClient provides comprehensive file management capabilities, including streaming operations and query functionality.

DataLakeFileClient is a client for interacting with a specific file. It provides operations for data upload/download, append operations, and file management, and inherits path-based operations from the underlying PathClient.
class DataLakeFileClient:
    """A client to interact with a specific file in Azure Data Lake Storage Gen2.

    NOTE(review): interface stub — method bodies are documentation
    placeholders with no implementation.

    Attributes:
        url (str): The full endpoint URL to the file, including SAS token if used.
        primary_endpoint (str): The full primary endpoint URL.
        primary_hostname (str): The hostname of the primary endpoint.
        file_system_name (str): Name of the file system.
        path_name (str): Path to the file.
    """

    def __init__(
        self,
        account_url: str,
        file_system_name: str,
        file_path: str,
        credential=None,
        **kwargs
    ):
        """Initialize the DataLakeFileClient.

        Args:
            account_url (str): The URL to the DataLake storage account.
            file_system_name (str): Name of the file system.
            file_path (str): Path to the file.
            credential: Authentication credential.
            **kwargs: Additional client configuration options.
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        file_system_name: str,
        file_path: str,
        credential=None,
        **kwargs
    ) -> "DataLakeFileClient":
        """Create a DataLakeFileClient from a connection string.

        Args:
            conn_str (str): Connection string for the storage account.
            file_system_name (str): Name of the file system.
            file_path (str): Path to the file.
            credential: Optional credential to override connection string auth.
            **kwargs: Additional client configuration options.

        Returns:
            DataLakeFileClient: The file client instance.
        """
# Example: constructing DataLakeFileClient instances.
from azure.storage.filedatalake import DataLakeFileClient

# Create client directly
file_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/analytics/results.json",
    credential="<account_key>",
)

# From connection string
file_client = DataLakeFileClient.from_connection_string(
    "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=<key>",
    file_system_name="myfilesystem",
    file_path="data/analytics/results.json",
)

# Core operations for creating, deleting, and managing the file itself.
def create_file(self, **kwargs) -> "Dict[str, Any]":
    """Create the file.

    Args:
        content_settings (ContentSettings, optional): Content settings for the file.
        metadata (dict, optional): Metadata key-value pairs.
        permissions (str, optional): POSIX permissions in octal format.
        umask (str, optional): POSIX umask for permission calculation.
        **kwargs: Additional options including conditions and CPK.

    Returns:
        dict: File creation response headers including etag and last_modified.
    """

def delete_file(self, **kwargs) -> None:
    """Delete the file.

    Args:
        **kwargs: Additional options including conditions.
    """

def exists(self, **kwargs) -> bool:
    """Check if the file exists.

    Args:
        **kwargs: Additional options.

    Returns:
        bool: True if the file exists, False otherwise.
    """

def get_file_properties(self, **kwargs) -> "FileProperties":
    """Get file properties and metadata.

    Args:
        **kwargs: Additional options including conditions and user principal names.

    Returns:
        FileProperties: Properties of the file including size, metadata, etag,
            and permissions.
    """

def rename_file(
    self,
    new_name: str,
    **kwargs
) -> "DataLakeFileClient":
    """Rename the file.

    Args:
        new_name (str): New name/path for the file.
        content_settings (ContentSettings, optional): Content settings for the
            renamed file.
        metadata (dict, optional): Metadata for the renamed file.
        **kwargs: Additional options including conditions.

    Returns:
        DataLakeFileClient: Client for the renamed file.
    """

# Operations for uploading and writing data to files.
def upload_data(
    self,
    data,
    length: int = None,
    overwrite: bool = False,
    **kwargs
) -> "Dict[str, Any]":
    """Upload data to the file, creating it if it doesn't exist.

    Args:
        data: Data to upload (bytes, str, or file-like object).
        length (int, optional): Length of the data in bytes.
        overwrite (bool): Whether to overwrite an existing file.
        **kwargs: Additional options including content settings, metadata,
            and conditions.

    Returns:
        dict: Upload response headers including etag and last_modified.
    """

def append_data(
    self,
    data,
    offset: int,
    length: int = None,
    **kwargs
) -> "Dict[str, Any]":
    """Append data to the file at the specified offset.

    Appended data is staged only; it must be committed with flush_data().

    Args:
        data: Data to append (bytes, str, or file-like object).
        offset (int): Byte offset where data should be appended.
        length (int, optional): Length of the data in bytes.
        **kwargs: Additional options including validate_content and lease
            conditions.

    Returns:
        dict: Append response headers.
    """

# NOTE(review): the source document declared flush_data twice with slightly
# different wording; the duplicate (which would shadow the first definition)
# has been merged into this single definition.
def flush_data(
    self,
    offset: int,
    retain_uncommitted_data: bool = False,
    **kwargs
) -> "Dict[str, Any]":
    """Flush (commit) previously appended data to the file.

    Args:
        offset (int): Byte offset to flush up to — equal to the length of
            the file after committing the appended data.
        retain_uncommitted_data (bool): Whether to retain uncommitted data
            beyond the flushed offset.
        **kwargs: Additional options including content settings and conditions.

    Returns:
        dict: Flush response headers including etag and last_modified.
    """

# Operations for downloading and reading data from files.
def download_file(self, **kwargs) -> "StorageStreamDownloader":
    """Download the file content as a stream.

    Args:
        offset (int, optional): Start position for the download.
        length (int, optional): Number of bytes to download.
        **kwargs: Additional options including conditions and CPK.

    Returns:
        StorageStreamDownloader: Stream downloader for reading file content.
    """

def read_file(self, **kwargs) -> bytes:
    """Download and return the entire file content as bytes.

    Args:
        offset (int, optional): Start position for the download.
        length (int, optional): Number of bytes to download.
        **kwargs: Additional options including conditions and CPK.

    Returns:
        bytes: Complete file content.
    """

# Operations for querying structured data within files using SQL-like syntax.
def query_file(
    self,
    query_expression: str,
    **kwargs
) -> "DataLakeFileQueryReader":
    """Query file content using SQL-like expressions.

    Args:
        query_expression (str): SQL-like query expression
            (e.g., "SELECT * FROM BlobStorage").
        file_format (QuickQueryDialect, optional): File format
            (CSV, JSON, Arrow, Parquet).
        on_error (Callable, optional): Function to handle query errors.
        **kwargs: Additional options including input/output serialization
            settings.

    Returns:
        DataLakeFileQueryReader: Query reader for streaming results.
    """

# Operations for managing file-level access control and permissions.
def get_access_control(self, **kwargs) -> "Dict[str, Any]":
    """Get access control properties for the file.

    Args:
        upn (bool, optional): Return user principal names instead of object IDs.
        **kwargs: Additional options including conditions.

    Returns:
        dict: Access control information including ACL, group, owner,
            and permissions.
    """

def set_access_control(
    self,
    owner: str = None,
    group: str = None,
    permissions: str = None,
    acl: str = None,
    **kwargs
) -> "Dict[str, Any]":
    """Set access control properties for the file.

    Args:
        owner (str, optional): Owner user ID or principal name.
        group (str, optional): Owning group ID or principal name.
        permissions (str, optional): POSIX permissions in octal format.
        acl (str, optional): Access control list in POSIX format.
        **kwargs: Additional options including conditions.

    Returns:
        dict: Response headers including etag and last_modified.
    """

# Streaming downloader for efficiently handling large file downloads and
# query results. Returned by download_file() and query_file() operations.
class StorageStreamDownloader:
    """A streaming object to download from Azure Storage.

    NOTE(review): interface stub — method bodies are documentation
    placeholders. The source document declared read() twice; the duplicate
    (which would shadow the first definition) has been merged below.

    Attributes:
        name (str): The name of the file being downloaded.
        properties (FileProperties): The properties of the file being downloaded.
        size (int): The size of the total data in the stream.
    """

    def readall(self) -> bytes:
        """Download the entire contents of this file.

        This operation blocks until all data is downloaded.

        Returns:
            bytes: The contents of the specified file.
        """

    def readinto(self, stream) -> int:
        """Download the contents of this file into a writable stream.

        Args:
            stream: The stream to download to. This can be an open file-handle,
                or any writable stream. The stream must be seekable if the
                download uses more than one parallel connection.

        Returns:
            int: The number of bytes read.
        """

    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes from the stream and return them.

        Args:
            size (int): The number of bytes to download from the stream.
                Leave unspecified or set to -1 to download all bytes.

        Returns:
            bytes: The requested data as bytes. An empty return value means
                there is no more data to read.
        """

    def chunks(self):
        """Iterate over chunks in the download stream.

        Note: the returned iterator covers the entire download content,
        regardless of any data that was previously read.

        Returns:
            Iterator[bytes]: An iterator over the chunks in the download stream.
        """

    def __len__(self) -> int:
        """Return the size of the download stream in bytes."""

    def __iter__(self):
        """Iterate over the content in chunks."""

    def __enter__(self) -> "StorageStreamDownloader":
        """Context manager entry."""

    def __exit__(self, *args) -> None:
        """Context manager exit."""

# Usage Examples:
# End-to-end usage examples: upload, download, append, and query.
import json
from azure.storage.filedatalake import DataLakeFileClient, ContentSettings

# Create a file client
file_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/results.json",
    credential="<account_key>",
)

# Upload JSON data
data = {"results": [1, 2, 3], "timestamp": "2023-01-01T00:00:00Z"}
json_data = json.dumps(data)
file_client.upload_data(
    json_data,
    overwrite=True,
    content_settings=ContentSettings(content_type="application/json"),
    metadata={"format": "json", "version": "1.0"},
)

# Download the file
download_stream = file_client.download_file()
content = download_stream.readall()
downloaded_data = json.loads(content.decode())
print(f"Downloaded: {downloaded_data}")

# Append data to an existing file
log_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="logs/app.log",
    credential="<account_key>",
)

# Get current file size for the append offset
properties = log_client.get_file_properties()
current_size = properties.size

# Append a new log entry: append_data stages the bytes, flush_data commits
# them at the final file length.
new_entry = "\n2023-01-01 12:00:00 INFO: Application started"
log_client.append_data(new_entry.encode(), offset=current_size)
log_client.flush_data(offset=current_size + len(new_entry.encode()))

# Query CSV file data
csv_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/sales.csv",
    credential="<account_key>",
)

# Query for specific records
query_result = csv_client.query_file(
    "SELECT * FROM BlobStorage WHERE amount > 1000"
)

# Process query results
with query_result as stream:
    for chunk in stream:
        print(chunk.decode())

# Install with Tessl CLI
npx tessl i tessl/pypi-azure-storage-file-datalake

Docs
Evals: scenario-1 · scenario-2 · scenario-3 · scenario-4 · scenario-5 · scenario-6 · scenario-7 · scenario-8 · scenario-9 · scenario-10