Microsoft Azure Batch Client Library for Python providing comprehensive APIs for managing batch computing workloads in Azure cloud
91
File management capabilities for accessing, downloading, and managing files on compute nodes and task outputs. This includes retrieving task output files, log files, and other files created during task execution.
Manage files associated with specific tasks including output files, logs, and working directory contents.
def list_from_task(job_id, task_id, file_list_from_task_options=None, custom_headers=None, raw=False, **operation_config):
"""
List files associated with the specified task.
Args:
job_id: ID of the job containing the task
task_id: ID of the task
file_list_from_task_options: Additional options for listing including recursive
Returns:
ItemPaged[FileProperties]: Paginated list of files
"""
def get_from_task(job_id, task_id, file_path, file_get_from_task_options=None, custom_headers=None, raw=False, **operation_config):
"""
Get the content of the specified task file.
Args:
job_id: ID of the job containing the task
task_id: ID of the task
file_path: Path to the file relative to task working directory
file_get_from_task_options: Additional options including byte range
Returns:
Stream: File content stream
"""
def get_properties_from_task(job_id, task_id, file_path, file_get_properties_from_task_options=None, custom_headers=None, raw=False, **operation_config):
"""
Get properties of the specified task file.
Args:
job_id: ID of the job containing the task
task_id: ID of the task
file_path: Path to the file relative to task working directory
file_get_properties_from_task_options: Additional options
Returns:
None (properties returned in response headers)
"""
def delete_from_task(job_id, task_id, file_path, file_delete_from_task_options=None, custom_headers=None, raw=False, **operation_config):
"""
Delete the specified task file.
Args:
job_id: ID of the job containing the task
task_id: ID of the task
file_path: Path to the file relative to task working directory
file_delete_from_task_options: Additional options
Returns:
None
"""Manage files on compute nodes including system files, application files, and shared data.
def list_from_compute_node(pool_id, node_id, file_list_from_compute_node_options=None, custom_headers=None, raw=False, **operation_config):
"""
List files on the specified compute node.
Args:
pool_id: ID of the pool containing the node
node_id: ID of the compute node
file_list_from_compute_node_options: Additional options for listing including recursive
Returns:
ItemPaged[FileProperties]: Paginated list of files
"""
def get_from_compute_node(pool_id, node_id, file_path, file_get_from_compute_node_options=None, custom_headers=None, raw=False, **operation_config):
"""
Get the content of the specified file from a compute node.
Args:
pool_id: ID of the pool containing the node
node_id: ID of the compute node
file_path: Path to the file on the compute node
file_get_from_compute_node_options: Additional options including byte range
Returns:
Stream: File content stream
"""
def get_properties_from_compute_node(pool_id, node_id, file_path, file_get_properties_from_compute_node_options=None, custom_headers=None, raw=False, **operation_config):
"""
Get properties of the specified file on a compute node.
Args:
pool_id: ID of the pool containing the node
node_id: ID of the compute node
file_path: Path to the file on the compute node
file_get_properties_from_compute_node_options: Additional options
Returns:
None (properties returned in response headers)
"""
def delete_from_compute_node(pool_id, node_id, file_path, file_delete_from_compute_node_options=None, custom_headers=None, raw=False, **operation_config):
"""
Delete the specified file from a compute node.
Args:
pool_id: ID of the pool containing the node
node_id: ID of the compute node
file_path: Path to the file on the compute node
file_delete_from_compute_node_options: Additional options
Returns:
None
"""# List all files for a task
files = client.file.list_from_task("my-job", "task-001")
for file_info in files:
print(f"File: {file_info.name}")
print(f" Size: {file_info.properties.content_length} bytes")
print(f" Modified: {file_info.properties.last_modified}")
print(f" Type: {'Directory' if file_info.is_directory else 'File'}")
# List files recursively in subdirectories
from azure.batch.models import FileListFromTaskOptions
list_options = FileListFromTaskOptions(recursive=True)
files = client.file.list_from_task("my-job", "task-001", list_options)
# Download stdout and stderr files
stdout_content = client.file.get_from_task("my-job", "task-001", "stdout.txt")
with open("local_stdout.txt", "wb") as f:
for chunk in stdout_content:
f.write(chunk)
stderr_content = client.file.get_from_task("my-job", "task-001", "stderr.txt")
with open("local_stderr.txt", "wb") as f:
for chunk in stderr_content:
f.write(chunk)
# Download a specific output file
output_file = client.file.get_from_task("my-job", "task-001", "results/output.json")
with open("output.json", "wb") as f:
for chunk in output_file:
f.write(chunk)from azure.batch.models import FileGetFromTaskOptions
# Get file properties without downloading content
client.file.get_properties_from_task("my-job", "task-001", "large_output.txt")
# Download only part of a large file (first 1KB)
get_options = FileGetFromTaskOptions(
ocp_range="bytes=0-1023" # First 1024 bytes
)
partial_content = client.file.get_from_task(
"my-job", "task-001", "large_output.txt", get_options
)
with open("partial_output.txt", "wb") as f:
for chunk in partial_content:
f.write(chunk)
# Get last 500 bytes of a log file
get_options = FileGetFromTaskOptions(
ocp_range="bytes=-500" # Last 500 bytes
)
log_tail = client.file.get_from_task(
"my-job", "task-001", "application.log", get_options
)# List files on a compute node
node_files = client.file.list_from_compute_node("my-pool", "tvm-123456789")
for file_info in node_files:
if not file_info.is_directory:
print(f"Node file: {file_info.name} ({file_info.properties.content_length} bytes)")
# Download a file from the compute node
node_file = client.file.get_from_compute_node(
"my-pool", "tvm-123456789", "shared/data/input.txt"
)
with open("downloaded_input.txt", "wb") as f:
for chunk in node_file:
f.write(chunk)
# List batch service logs on node
from azure.batch.models import FileListFromComputeNodeOptions
list_options = FileListFromComputeNodeOptions(recursive=True)
log_files = client.file.list_from_compute_node(
"my-pool", "tvm-123456789", list_options
)
for file_info in log_files:
if "startup" in file_info.name.lower() or "stdout" in file_info.name.lower():
print(f"Log file: {file_info.name}")# Delete temporary files from task working directory
try:
client.file.delete_from_task("my-job", "task-001", "temp/intermediate.dat")
print("Temporary file deleted successfully")
except Exception as e:
print(f"Failed to delete file: {e}")
# Delete files from compute node
client.file.delete_from_compute_node("my-pool", "tvm-123456789", "temp/cache.tmp")
# Batch delete multiple task files
temp_files = ["temp1.txt", "temp2.txt", "cache/temp.dat"]
for temp_file in temp_files:
try:
client.file.delete_from_task("my-job", "task-001", temp_file)
except Exception as e:
print(f"Could not delete {temp_file}: {e}")import os
def download_task_outputs(job_id, task_id, local_dir):
"""Download all output files from a task to local directory."""
os.makedirs(local_dir, exist_ok=True)
files = client.file.list_from_task(job_id, task_id,
FileListFromTaskOptions(recursive=True))
for file_info in files:
if not file_info.is_directory and not file_info.name.startswith("wd/"):
# Skip working directory files, download outputs only
local_path = os.path.join(local_dir, file_info.name.replace("/", "_"))
try:
content = client.file.get_from_task(job_id, task_id, file_info.name)
with open(local_path, "wb") as f:
for chunk in content:
f.write(chunk)
print(f"Downloaded: {file_info.name} -> {local_path}")
except Exception as e:
print(f"Failed to download {file_info.name}: {e}")
# Usage
download_task_outputs("my-job", "task-001", "./task_outputs")
def get_task_logs(job_id, task_id):
"""Get stdout and stderr content as strings."""
logs = {}
for log_file in ["stdout.txt", "stderr.txt"]:
try:
content = client.file.get_from_task(job_id, task_id, log_file)
logs[log_file] = b"".join(content).decode('utf-8')
except Exception as e:
logs[log_file] = f"Error reading {log_file}: {e}"
return logs
# Usage
task_logs = get_task_logs("my-job", "task-001")
print("STDOUT:")
print(task_logs["stdout.txt"])
print("\nSTDERR:")
print(task_logs["stderr.txt"])class FileProperties:
"""File properties and metadata."""
def __init__(self):
self.name: str
self.url: str
self.is_directory: bool
self.properties: FilePropertiesDetail
class FilePropertiesDetail:
"""Detailed file properties."""
def __init__(self):
self.content_length: int
self.content_type: str
self.creation_time: datetime.datetime
self.last_modified: datetime.datetime
self.file_mode: strclass FileListFromTaskOptions:
"""Options for listing files from task."""
def __init__(self):
self.filter: str
self.recursive: bool
self.max_results: int
self.timeout: int
class FileListFromComputeNodeOptions:
"""Options for listing files from compute node."""
def __init__(self):
self.filter: str
self.recursive: bool
self.max_results: int
self.timeout: int
class FileGetFromTaskOptions:
"""Options for getting file from task."""
def __init__(self):
self.ocp_range: str # Byte range like "bytes=0-1023"
self.if_modified_since: datetime.datetime
self.if_unmodified_since: datetime.datetime
self.timeout: int
class FileGetFromComputeNodeOptions:
"""Options for getting file from compute node."""
def __init__(self):
self.ocp_range: str # Byte range like "bytes=0-1023"
self.if_modified_since: datetime.datetime
self.if_unmodified_since: datetime.datetime
self.timeout: int
class FileDeleteFromTaskOptions:
"""Options for deleting file from task."""
def __init__(self):
self.recursive: bool
self.timeout: int
class FileDeleteFromComputeNodeOptions:
"""Options for deleting file from compute node."""
def __init__(self):
self.recursive: bool
self.timeout: int
class FileGetPropertiesFromTaskOptions:
"""Options for getting file properties from task."""
def __init__(self):
self.if_modified_since: datetime.datetime
self.if_unmodified_since: datetime.datetime
self.timeout: int
class FileGetPropertiesFromComputeNodeOptions:
"""Options for getting file properties from compute node."""
def __init__(self):
self.if_modified_since: datetime.datetime
self.if_unmodified_since: datetime.datetime
self.timeout: intInstall with Tessl CLI
npx tessl i tessl/pypi-azure-batchdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10