CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-azure-storage-file-datalake

Microsoft Azure File DataLake Storage Client Library for Python

Overall
score

92%

Overview
Eval results
Files

docs/file-operations.md

File Operations

File-specific operations for uploading, downloading, appending data, and managing file properties and metadata. The DataLakeFileClient provides comprehensive file management capabilities including streaming operations and query functionality.

Capabilities

DataLakeFileClient

Client to interact with a specific file, providing operations for data upload/download, append operations, and file management. Inherits path-based operations from the underlying PathClient.

class DataLakeFileClient:
    """
    A client for interacting with a single file in Azure Data Lake
    Storage Gen2.

    Attributes:
        url (str): Full endpoint URL for the file, including any SAS token in use.
        primary_endpoint (str): The full primary endpoint URL.
        primary_hostname (str): Hostname of the primary endpoint.
        file_system_name (str): The file system that contains the file.
        path_name (str): The file's path within the file system.
    """

    def __init__(
        self,
        account_url: str,
        file_system_name: str,
        file_path: str,
        credential=None,
        **kwargs
    ):
        """
        Build a client for the given account, file system, and file path.

        Args:
            account_url (str): URL of the Data Lake storage account.
            file_system_name (str): File system containing the file.
            file_path (str): Path of the file within the file system.
            credential: Credential used to authenticate requests.
            **kwargs: Additional client configuration options.
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        file_system_name: str,
        file_path: str,
        credential=None,
        **kwargs
    ) -> 'DataLakeFileClient':
        """
        Build a client from a storage-account connection string.

        Args:
            conn_str (str): Connection string for the storage account.
            file_system_name (str): File system containing the file.
            file_path (str): Path of the file within the file system.
            credential: Optional credential overriding the authentication
                embedded in the connection string.
            **kwargs: Additional client configuration options.

        Returns:
            DataLakeFileClient: The configured file client.
        """

Usage Examples:

from azure.storage.filedatalake import DataLakeFileClient

# Create client directly
file_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/analytics/results.json",
    credential="<account_key>"
)

# From connection string
file_client = DataLakeFileClient.from_connection_string(
    "DefaultEndpointsProtocol=https;AccountName=mystorageaccount;AccountKey=<key>",
    file_system_name="myfilesystem",
    file_path="data/analytics/results.json"
)

File Management

Core operations for creating, deleting, and managing the file itself.

def create_file(self, **kwargs) -> Dict[str, Any]:
    """
    Create a new file at this client's path.

    Args:
        content_settings (ContentSettings, optional): HTTP content settings
            (content type, encoding, etc.) applied to the new file
        metadata (dict, optional): Name-value pairs stored with the file
        permissions (str, optional): POSIX access permissions in octal notation
        umask (str, optional): POSIX umask applied when deriving permissions
        **kwargs: Further options such as access conditions and CPK

    Returns:
        dict: Response headers from the create operation, including etag
        and last_modified
    """

def delete_file(self, **kwargs) -> None:
    """
    Remove this file from the file system.

    Args:
        **kwargs: Optional parameters such as access conditions
    """

def exists(self, **kwargs) -> bool:
    """
    Determine whether this file is present in the file system.

    Args:
        **kwargs: Optional request parameters

    Returns:
        bool: True when the file exists, otherwise False
    """

def get_file_properties(self, **kwargs) -> FileProperties:
    """
    Retrieve the file's system properties and user-defined metadata.

    Args:
        **kwargs: Optional parameters, e.g. access conditions or a flag to
            return user principal names instead of object IDs

    Returns:
        FileProperties: The file's properties — size, metadata, etag,
        permissions, and related attributes
    """

def rename_file(
    self,
    new_name: str,
    **kwargs
) -> DataLakeFileClient:
    """
    Rename the file.
    
    Args:
        new_name (str): New name/path for the file.
            NOTE(review): the Azure SDK expects this value to be the full
            path including the file system name (e.g.
            "myfilesystem/newdir/newfile") — confirm against the installed
            library version.
        content_settings (ContentSettings, optional): Content settings for renamed file
        metadata (dict, optional): Metadata for renamed file
        **kwargs: Additional options including conditions
        
    Returns:
        DataLakeFileClient: Client for the renamed file
    """

Data Upload Operations

Operations for uploading and writing data to files.

def upload_data(
    self,
    data,
    length: int = None,
    overwrite: bool = False,
    **kwargs
) -> Dict[str, Any]:
    """
    Write data to the file, creating the file first if it does not exist.

    Args:
        data: Content to write — bytes, str, or a readable file-like object
        length (int, optional): Size of the content in bytes, when known
        overwrite (bool): Replace an existing file when True
        **kwargs: Further options such as content settings, metadata, and
            access conditions

    Returns:
        dict: Response headers from the upload, including etag and
        last_modified
    """

def append_data(
    self,
    data,
    offset: int,
    length: int = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Stage data to the file starting at the given byte offset.

    Staged data is not visible until committed with flush_data.

    Args:
        data: Content to append — bytes, str, or a readable file-like object
        offset (int): Position in the file, in bytes, at which to append
        length (int, optional): Size of the content in bytes, when known
        **kwargs: Further options such as validate_content and lease
            conditions

    Returns:
        dict: Response headers from the append operation
    """

def flush_data(
    self,
    offset: int,
    retain_uncommitted_data: bool = False,
    **kwargs
) -> Dict[str, Any]:
    """
    Commit data previously staged with append_data.

    Args:
        offset (int): The resulting length of the file, in bytes, once the
            staged data has been committed
        retain_uncommitted_data (bool): When True, staged data beyond the
            flushed range is kept rather than discarded
        **kwargs: Further options such as access conditions

    Returns:
        dict: Response headers from the flush, including etag and
        last_modified
    """

# NOTE(review): this is a duplicate definition — flush_data already appears
# above with a slightly different docstring. In real Python the later def
# would silently shadow the earlier one; keep only one copy.
def flush_data(
    self,
    offset: int,
    retain_uncommitted_data: bool = False,
    **kwargs
) -> Dict[str, Any]:
    """
    Flush (commit) previously appended data to the file.
    
    Args:
        offset (int): Byte offset to flush up to
        retain_uncommitted_data (bool): Whether to retain uncommitted data beyond offset
        **kwargs: Additional options including content settings, conditions
        
    Returns:
        dict: Flush response headers including etag and last_modified
    """

Data Download Operations

Operations for downloading and reading data from files.

def download_file(self, **kwargs) -> StorageStreamDownloader:
    """
    Open a streaming download of the file's contents.

    Args:
        offset (int, optional): Byte position at which to start reading
        length (int, optional): Maximum number of bytes to read
        **kwargs: Further options such as access conditions and CPK

    Returns:
        StorageStreamDownloader: A stream object for reading the content
    """

def read_file(self, **kwargs) -> bytes:
    """
    Download the whole file (or a range of it) and return it as bytes.

    Args:
        offset (int, optional): Byte position at which to start reading
        length (int, optional): Maximum number of bytes to read
        **kwargs: Further options such as access conditions and CPK

    Returns:
        bytes: The downloaded content
    """

Query Operations

Operations for querying structured data within files using SQL-like syntax.

def query_file(
    self,
    query_expression: str,
    **kwargs
) -> DataLakeFileQueryReader:
    """
    Run a SQL-like query against the file's contents.

    Args:
        query_expression (str): Query text, e.g. "SELECT * FROM BlobStorage"
        file_format (QuickQueryDialect, optional): Input format of the file
            (CSV, JSON, Arrow, Parquet)
        on_error (Callable, optional): Callback invoked for query errors
        **kwargs: Further options such as input/output serialization
            settings

    Returns:
        DataLakeFileQueryReader: A reader that streams the query results
    """

Access Control Management

Operations for managing file-level access control and permissions.

def get_access_control(self, **kwargs) -> Dict[str, Any]:
    """
    Fetch the POSIX access control settings of the file.

    Args:
        upn (bool, optional): When True, return user principal names rather
            than object IDs
        **kwargs: Further options such as access conditions

    Returns:
        dict: Access control details — owner, owning group, permissions,
        and the ACL
    """

def set_access_control(
    self,
    owner: str = None,
    group: str = None,
    permissions: str = None,
    acl: str = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Update the POSIX access control settings of the file.

    Args:
        owner (str, optional): User ID or principal name to set as owner
        group (str, optional): Group ID or principal name to set as the
            owning group
        permissions (str, optional): POSIX permissions in octal notation
        acl (str, optional): POSIX-format access control list
        **kwargs: Further options such as access conditions

    Returns:
        dict: Response headers, including etag and last_modified
    """

StorageStreamDownloader

Streaming downloader for efficiently handling large file downloads and query results. Returned by download_file() and query_file() operations.

class StorageStreamDownloader:
    """
    A streaming object to download from Azure Storage.

    Attributes:
        name (str): The name of the file being downloaded
        properties (FileProperties): The properties of the file being downloaded
        size (int): The size of the total data in the stream
    """

    def readall(self) -> bytes:
        """
        Download the entire contents of this file.

        This operation blocks until all data is downloaded.

        Returns:
            bytes: The contents of the specified file
        """

    def readinto(self, stream) -> int:
        """
        Download the contents of this file into a writable stream.

        Args:
            stream: The stream to download to. This can be an open
                file-handle or any writable stream. The stream must be
                seekable if the download uses more than one parallel
                connection.

        Returns:
            int: The number of bytes read
        """

    # NOTE(review): the original listing declared `read` twice with the same
    # signature; in Python the second definition silently shadows the first,
    # so the duplicate has been removed and the two docstrings merged here.
    def read(self, size: int = -1) -> bytes:
        """
        Read up to size bytes from the stream and return them.

        Args:
            size (int): The number of bytes to download from the stream.
                Leave unspecified or set to -1 to download all bytes.

        Returns:
            bytes: The requested data as bytes. An empty return value means
            there is no more data to read.
        """

    def chunks(self):
        """
        Iterate over chunks in the download stream.

        Note: the returned iterator covers the entire download content,
        regardless of any data that was previously read.

        Returns:
            Iterator[bytes]: An iterator containing the chunks in the stream
        """

    def __len__(self) -> int:
        """
        Return the size of the download stream.

        Returns:
            int: The size of the stream
        """

    def __iter__(self):
        """Iterate over the content in chunks."""

    def __enter__(self) -> 'StorageStreamDownloader':
        """Context manager entry."""

    def __exit__(self, *args) -> None:
        """Context manager exit."""

Usage Examples:

import json
from azure.storage.filedatalake import DataLakeFileClient, ContentSettings

# Create a file client
file_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/results.json",
    credential="<account_key>"
)

# Upload JSON data
data = {"results": [1, 2, 3], "timestamp": "2023-01-01T00:00:00Z"}
json_data = json.dumps(data)

file_client.upload_data(
    json_data,
    overwrite=True,
    content_settings=ContentSettings(content_type="application/json"),
    metadata={"format": "json", "version": "1.0"}
)

# Download the file
download_stream = file_client.download_file()
content = download_stream.readall()
downloaded_data = json.loads(content.decode())
print(f"Downloaded: {downloaded_data}")

# Append data to an existing file
log_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem", 
    file_path="logs/app.log",
    credential="<account_key>"
)

# Get current file size for append offset
properties = log_client.get_file_properties()
current_size = properties.size

# Append new log entry
new_entry = "\n2023-01-01 12:00:00 INFO: Application started"
log_client.append_data(new_entry.encode(), offset=current_size)
log_client.flush_data(offset=current_size + len(new_entry.encode()))

# Query CSV file data
csv_client = DataLakeFileClient(
    account_url="https://mystorageaccount.dfs.core.windows.net",
    file_system_name="myfilesystem",
    file_path="data/sales.csv", 
    credential="<account_key>"
)

# Query for specific records
query_result = csv_client.query_file(
    "SELECT * FROM BlobStorage WHERE amount > 1000"
)

# Process query results
with query_result as stream:
    for chunk in stream:
        print(chunk.decode())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-storage-file-datalake

docs

access-control-security.md

directory-operations.md

file-operations.md

file-system-operations.md

index.md

models-types.md

service-operations.md

tile.json