Microsoft Azure Developer LoadTesting Client Library for Python providing programmatic access to Azure's load testing platform.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Execute load tests and monitor performance through the LoadTestRunClient. This includes starting and stopping test runs, monitoring execution progress, collecting comprehensive performance metrics, and accessing detailed results and logs from both client-side and server-side perspectives.
Create, monitor, and control test run execution. Test runs execute the load test configuration against target endpoints and collect comprehensive performance metrics.
def begin_test_run(
    test_run_id: str,
    body: Union[JSON, IO],
    *,
    old_test_run_id: Optional[str] = None,
    **kwargs
) -> LROPoller[JSON]:
    """Create and start a new test run (long-running operation).

    Args:
        test_run_id: Unique identifier for the test run.
        body: Test run configuration, given as JSON or an IO stream.
        old_test_run_id: Optional ID of a previous test run, used for
            comparison.
        **kwargs: Additional keyword arguments; not described in this
            reference.

    Returns:
        A poller for the test run execution status.
    """
def get_test_run(test_run_id: str, **kwargs) -> JSON:
    """Get test run details and current status.

    Args:
        test_run_id: Unique test run identifier.
        **kwargs: Additional keyword arguments; not described in this
            reference.

    Returns:
        Test run details including status, start/end times, and statistics.
    """
def delete_test_run(test_run_id: str, **kwargs) -> None:
    """Delete a test run and its results.

    Args:
        test_run_id: Unique test run identifier.
        **kwargs: Additional keyword arguments; not described in this
            reference.
    """
def stop_test_run(test_run_id: str, **kwargs) -> JSON:
    """Stop a running test before completion.

    Args:
        test_run_id: Unique test run identifier.
        **kwargs: Additional keyword arguments; not described in this
            reference.

    Returns:
        The updated test run status.
    """
def list_test_runs(
*,
orderby: Optional[str] = None,
search: Optional[str] = None,
test_id: Optional[str] = None,
execution_from: Optional[datetime] = None,
execution_to: Optional[datetime] = None,
status: Optional[str] = None,
**kwargs
) -> Iterable[JSON]:
"""
List test runs with optional filtering.
Parameters:
- orderby (str, optional): Sort order ("lastModifiedDateTime asc/desc", "createdDateTime asc/desc")
- search (str, optional): Search by displayName or createdBy
- test_id (str, optional): Filter by test ID
- execution_from (datetime, optional): Filter by execution start time
- execution_to (datetime, optional): Filter by execution end time
- status (str, optional): Filter by status ("ACCEPTED", "NOTSTARTED", "PROVISIONING", "PROVISIONED", "CONFIGURING", "CONFIGURED", "EXECUTING", "EXECUTED", "DEPROVISIONING", "DEPROVISIONED", "DONE", "CANCELLING", "CANCELLED", "FAILED")
Returns:
Iterable[JSON]: Paginated list of test runs
"""from azure.core.credentials import DefaultAzureCredential
from azure.developer.loadtesting import LoadTestRunClient
import time
credential = DefaultAzureCredential()
client = LoadTestRunClient(
endpoint="https://your-resource.loadtest.azure.com",
credential=credential
)
# Define test run configuration
test_run_config = {
"testId": "my-load-test",
"displayName": "Production Load Test Run",
"description": "Load test against production API",
"loadTestConfiguration": {
"engineInstances": 2
},
"environmentVariables": {
"BASE_URL": "https://api.myapp.com",
"USERS_PER_ENGINE": "50"
},
"secrets": {
"API_KEY": {
"value": "https://my-vault.vault.azure.net/secrets/api-key",
"type": "AKV_SECRET_URI"
}
}
}
with client:
# Start the test run
print("Starting test run...")
run_poller = client.begin_test_run("prod-run-001", test_run_config)
# Monitor progress
while not run_poller.done():
test_run = client.get_test_run("prod-run-001")
print(f"Status: {test_run['status']} - {test_run.get('statusMessage', '')}")
time.sleep(30)
# Get final results
final_result = run_poller.result()
print(f"Test completed with status: {final_result['status']}")
print(f"Test result: {final_result['testResult']}")Access test run output files including logs, results, and artifacts generated during test execution.
def get_test_run_file(test_run_id: str, file_name: str, **kwargs) -> JSON:
"""
Get test run file details and download URL.
Parameters:
- test_run_id (str): Unique test run identifier
- file_name (str): Name of the file to retrieve
Returns:
JSON: File metadata including download URL and expiration
"""with client:
# Get list of available files for a test run
test_run = client.get_test_run("prod-run-001")
# Download test results
results_file = client.get_test_run_file("prod-run-001", "results.xml")
download_url = results_file['url']
# Download and process results
import requests
response = requests.get(download_url)
with open("test-results.xml", "wb") as f:
f.write(response.content)Retrieve comprehensive performance metrics collected during test execution, including client-side metrics (response times, throughput, error rates) and server-side Azure resource metrics.
def get_metric_namespaces(test_run_id: str, **kwargs) -> JSON:
    """Get available metric namespaces for a test run.

    Args:
        test_run_id: Unique test run identifier.
        **kwargs: Additional keyword arguments; not described in this
            reference.

    Returns:
        Available metric namespaces, including LoadTestRunMetrics and Azure
        resource namespaces.
    """
def get_metric_definitions(test_run_id: str, *, metric_namespace: str, **kwargs) -> JSON:
    """Get metric definitions for a specific namespace.

    Args:
        test_run_id: Unique test run identifier.
        metric_namespace: Metric namespace (e.g., "LoadTestRunMetrics").
            Keyword-only and required.
        **kwargs: Additional keyword arguments; not described in this
            reference.

    Returns:
        Available metrics and their definitions for the namespace.
    """
def list_metrics(
    test_run_id: str,
    body: Optional[Union[JSON, IO]] = None,
    *,
    metric_namespace: str,
    metric_name: str,
    time_interval: str,
    interval: Optional[str] = None,
    aggregation: Optional[str] = None,
    **kwargs
) -> Iterable[JSON]:
    """List metric values for a test run.

    Args:
        test_run_id: Unique test run identifier.
        body: Optional request body for metric filtering.
        metric_namespace: Metric namespace (e.g., "LoadTestRunMetrics").
        metric_name: Specific metric name (e.g., "response_time_ms",
            "requests_per_sec").
        time_interval: Time range in ISO 8601 format (required).
        interval: Optional aggregation interval (e.g., "PT1M" for 1 minute).
        aggregation: Optional aggregation type ("Average", "Count",
            "Maximum", "Minimum", "Total").
        **kwargs: Additional keyword arguments; not described in this
            reference.

    Returns:
        Iterable of time series metric data points.
    """
def list_metric_dimension_values(
test_run_id: str,
name: str,
*,
metric_namespace: str,
metric_name: str,
time_interval: str,
interval: Optional[str] = None,
**kwargs
) -> Iterable[str]:
"""
List metric dimension values (e.g., different samplers or URLs).
Parameters:
- test_run_id (str): Unique test run identifier
- name (str): Dimension name (e.g., "SamplerName", "RequestName")
- metric_namespace (str): Metric namespace
- metric_name (str): Metric name
- time_interval (str): Time range filter (required)
- interval (str, optional): Aggregation interval
Returns:
Iterable[str]: Available dimension values
"""import json
with client:
test_run_id = "prod-run-001"
# Get available metric namespaces
namespaces = client.get_metric_namespaces(test_run_id)
print("Available namespaces:")
for ns in namespaces['value']:
print(f"- {ns['name']}")
# Get client-side metrics definitions
metrics_def = client.get_metric_definitions(
test_run_id,
metric_namespace="LoadTestRunMetrics"
)
print("\nAvailable client metrics:")
for metric in metrics_def['value']:
print(f"- {metric['name']['value']}: {metric['displayDescription']}")
# Get response time metrics
response_times = list(client.list_metrics(
test_run_id,
metric_namespace="LoadTestRunMetrics",
metric_name="response_time_ms",
time_interval="PT30M", # Last 30 minutes
aggregation="Average",
interval="PT1M" # 1 minute intervals
))
print(f"\nResponse time data points: {len(response_times)}")
for point in response_times[:5]: # Show first 5 points
timestamp = point['timestamp']
value = point['average']
print(f" {timestamp}: {value:.2f}ms")
# Get throughput metrics
throughput = list(client.list_metrics(
test_run_id,
metric_namespace="LoadTestRunMetrics",
metric_name="requests_per_sec",
time_interval="PT30M",
aggregation="Average"
))
avg_throughput = sum(p['average'] for p in throughput) / len(throughput)
print(f"\nAverage throughput: {avg_throughput:.2f} requests/sec")
# Get error rate
errors = list(client.list_metrics(
test_run_id,
metric_namespace="LoadTestRunMetrics",
metric_name="error",
time_interval="PT30M",
aggregation="Percentage"
))
if errors:
error_rate = errors[-1]['average'] # Latest error rate
print(f"Final error rate: {error_rate:.2f}%")Configure and retrieve application components monitoring for specific test runs, enabling server-side metrics collection during test execution.
def create_or_update_app_components(test_run_id: str, body: Union[JSON, IO], **kwargs) -> JSON:
    """Create or update app components configuration for a test run.

    Args:
        test_run_id: Unique test run identifier.
        body: App components configuration, given as JSON or an IO stream.
        **kwargs: Additional keyword arguments; not described in this
            reference.

    Returns:
        App components configuration for the test run.
    """
def get_app_components(test_run_id: str, **kwargs) -> JSON:
"""
Get app components configuration for a test run.
Parameters:
- test_run_id (str): Unique test run identifier
Returns:
JSON: App components configuration
"""Configure and retrieve server-side metrics collection for specific test runs, enabling monitoring of Azure resource performance during load testing.
def create_or_update_server_metrics_config(test_run_id: str, body: Union[JSON, IO], **kwargs) -> JSON:
    """Create or update server metrics configuration for a test run.

    Args:
        test_run_id: Unique test run identifier.
        body: Server metrics configuration, given as JSON or an IO stream.
        **kwargs: Additional keyword arguments; not described in this
            reference.

    Returns:
        Server metrics configuration for the test run.
    """
def get_server_metrics_config(test_run_id: str, **kwargs) -> JSON:
"""
Get server metrics configuration for a test run.
Parameters:
- test_run_id (str): Unique test run identifier
Returns:
JSON: Server metrics configuration
"""with client:
test_run_id = "prod-run-001"
# Configure server metrics for this specific test run
server_metrics = {
"metrics": {
"webapp-cpu": {
"resourceId": "/subscriptions/.../providers/Microsoft.Web/sites/my-app",
"metricNamespace": "Microsoft.Web/sites",
"name": "CpuPercentage",
"aggregation": "Average"
}
}
}
# Apply configuration to test run
client.create_or_update_server_metrics_config(test_run_id, server_metrics)
# After test completion, get server metrics
server_cpu_data = list(client.list_metrics(
test_run_id,
metric_namespace="Microsoft.Web/sites",
metric_name="CpuPercentage",
time_interval="PT30M",
aggregation="Average"
))
max_cpu = max(point['average'] for point in server_cpu_data)
print(f"Peak server CPU usage: {max_cpu:.1f}%")All test execution operations have async equivalents in azure.developer.loadtesting.aio.LoadTestRunClient:
from azure.developer.loadtesting.aio import LoadTestRunClient
from azure.core.credentials_async import DefaultAzureCredential
import asyncio
async def run_load_test():
credential = DefaultAzureCredential()
client = LoadTestRunClient(
endpoint="https://your-resource.loadtest.azure.com",
credential=credential
)
async with client:
# Start test run
run_poller = await client.begin_test_run("async-run-001", test_config)
# Monitor progress
while not run_poller.done():
await asyncio.sleep(30)
test_run = await client.get_test_run("async-run-001")
print(f"Status: {test_run['status']}")
# Get results
result = await run_poller.result()
# Collect metrics
metrics = []
async for metric_point in client.list_metrics(
"async-run-001",
metric_namespace="LoadTestRunMetrics",
metric_name="response_time_ms",
time_interval="PT30M"
):
metrics.append(metric_point)
return result, metrics
# Run the async function
result, metrics = asyncio.run(run_load_test())Install with Tessl CLI
npx tessl i tessl/pypi-azure-developer-loadtesting