Microsoft Azure Synapse Management Client Library for Python
npx @tessl/cli install tessl/pypi-azure-mgmt-synapse@2.0.0

Microsoft Azure Synapse Management Client Library provides comprehensive programmatic access to Azure Synapse Analytics resources. This client library enables automation of workspace management, SQL pool operations, Apache Spark pool configuration, integration runtime provisioning, security policy management, and all aspects of Azure Synapse Analytics infrastructure.
pip install azure-mgmt-synapse

from azure.mgmt.synapse import SynapseManagementClient

For async operations:

from azure.mgmt.synapse.aio import SynapseManagementClient

Import models and types:
from azure.mgmt.synapse.models import (
    Workspace, SqlPool, BigDataPoolResourceInfo,
    WorkspacePatchInfo, SqlPoolPatchInfo,
    IntegrationRuntimeResource, PrivateLinkHub, Key,
    IpFirewallRuleInfo, PrivateEndpointConnection,
    SqlPoolBlobAuditingPolicy, LibraryResource
)

from azure.identity import DefaultAzureCredential
from azure.mgmt.synapse import SynapseManagementClient
# Initialize client with Azure credentials
credential = DefaultAzureCredential()
subscription_id = "your-subscription-id"
client = SynapseManagementClient(credential, subscription_id)
# List all Synapse workspaces in a resource group
resource_group = "my-resource-group"
workspaces = client.workspaces.list_by_resource_group(resource_group)
for workspace in workspaces:
    print(f"Workspace: {workspace.name}, Location: {workspace.location}")
# Get a specific workspace
workspace_name = "my-synapse-workspace"
workspace = client.workspaces.get(resource_group, workspace_name)
print(f"Workspace URL: {workspace.connectivity_endpoints.web}")
# List SQL pools in the workspace
sql_pools = client.sql_pools.list_by_workspace(resource_group, workspace_name)
for pool in sql_pools:
    print(f"SQL Pool: {pool.name}, Status: {pool.status}")

Azure Synapse Management Client follows Azure Resource Manager (ARM) patterns:
- SynapseManagementClient serves as the main entry point with operation groups as properties
- LROPoller objects for tracking async operations
- ItemPaged objects for handling large result sets

The client supports both synchronous and asynchronous operations through separate client implementations.
Core Synapse workspace lifecycle management including creation, configuration, monitoring, and administrative operations.
# Primary workspace operations
def get(resource_group_name: str, workspace_name: str) -> Workspace
def create_or_update(resource_group_name: str, workspace_name: str, workspace_info: Workspace) -> LROPoller[Workspace]
def delete(resource_group_name: str, workspace_name: str) -> LROPoller[object]
def list_by_resource_group(resource_group_name: str) -> ItemPaged[Workspace]
def list_by_subscription() -> ItemPaged[Workspace]
def update(resource_group_name: str, workspace_name: str, workspace_patch_info: WorkspacePatchInfo) -> LROPoller[Workspace]

Dedicated SQL pool lifecycle management, configuration, scaling, security, and monitoring operations.
# Core SQL pool operations
def get(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> SqlPool
def create(resource_group_name: str, workspace_name: str, sql_pool_name: str, sql_pool_info: SqlPool) -> LROPoller[SqlPool]
def delete(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> LROPoller[object]
def pause(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> LROPoller[object]
def resume(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> LROPoller[object]
def list_by_workspace(resource_group_name: str, workspace_name: str) -> ItemPaged[SqlPool]

Big data pool (Apache Spark) configuration, auto-scaling, library management, and lifecycle operations.
# Big data pool operations
def get(resource_group_name: str, workspace_name: str, big_data_pool_name: str) -> BigDataPoolResourceInfo
def create_or_update(resource_group_name: str, workspace_name: str, big_data_pool_name: str, big_data_pool_info: BigDataPoolResourceInfo) -> LROPoller[BigDataPoolResourceInfo]
def delete(resource_group_name: str, workspace_name: str, big_data_pool_name: str) -> LROPoller[object]
def list_by_workspace(resource_group_name: str, workspace_name: str) -> ItemPaged[BigDataPoolResourceInfo]

Comprehensive security management including auditing, vulnerability assessments, data classification, masking, and threat protection.
# Security operations
def get_blob_auditing_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> SqlPoolBlobAuditingPolicy
def create_or_update_blob_auditing_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, parameters: SqlPoolBlobAuditingPolicy) -> SqlPoolBlobAuditingPolicy
def get_vulnerability_assessment(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> SqlPoolVulnerabilityAssessment
def get_security_alert_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> SqlPoolSecurityAlertPolicy

Integration runtime provisioning, configuration, monitoring, and credential management for data integration scenarios.
# Integration runtime operations
def get(resource_group_name: str, workspace_name: str, integration_runtime_name: str) -> IntegrationRuntimeResource
def create(resource_group_name: str, workspace_name: str, integration_runtime_name: str, integration_runtime: IntegrationRuntimeResource) -> LROPoller[IntegrationRuntimeResource]
def delete(resource_group_name: str, workspace_name: str, integration_runtime_name: str) -> LROPoller[object]
def list_by_workspace(resource_group_name: str, workspace_name: str) -> ItemPaged[IntegrationRuntimeResource]
def start(resource_group_name: str, workspace_name: str, integration_runtime_name: str) -> LROPoller[IntegrationRuntimeStatusResponse]
def stop(resource_group_name: str, workspace_name: str, integration_runtime_name: str) -> LROPoller[object]

Integration Runtime Management
Limited library inspection capabilities for workspace libraries. Note: This SDK provides read-only access to libraries - creation, upload, and management operations are not available through this management client.
# Library operations (read-only)
def get(resource_group_name: str, workspace_name: str, library_name: str) -> LibraryResource
def list_by_workspace(resource_group_name: str, workspace_name: str) -> ItemPaged[LibraryListResponse]

Private Link Hub creation, configuration, and management for enabling secure connectivity to Synapse resources through private endpoints.
# Private Link Hub operations
def get(resource_group_name: str, private_link_hub_name: str) -> PrivateLinkHub
def create_or_update(resource_group_name: str, private_link_hub_name: str, private_link_hub: PrivateLinkHub) -> PrivateLinkHub
def update(resource_group_name: str, private_link_hub_name: str, private_link_hub_patch_info: PrivateLinkHubPatchInfo) -> PrivateLinkHub
def delete(resource_group_name: str, private_link_hub_name: str) -> LROPoller[object]
def list_by_resource_group(resource_group_name: str) -> ItemPaged[PrivateLinkHub]
def list() -> ItemPaged[PrivateLinkHub]

Workspace key lifecycle management including customer-managed keys for encryption and access control.
# Key operations
def get(resource_group_name: str, workspace_name: str, key_name: str) -> Key
def create_or_update(resource_group_name: str, workspace_name: str, key_name: str, key_properties: Key) -> Key
def delete(resource_group_name: str, workspace_name: str, key_name: str) -> Key
def list_by_workspace(resource_group_name: str, workspace_name: str) -> ItemPaged[Key]

Network isolation, private endpoints, firewall rules, and connectivity management for secure Synapse deployments.
# Network security operations
def get_ip_firewall_rule(resource_group_name: str, workspace_name: str, rule_name: str) -> IpFirewallRuleInfo
def create_or_update_ip_firewall_rule(resource_group_name: str, workspace_name: str, rule_name: str, ip_firewall_rule_info: IpFirewallRuleInfo) -> LROPoller[IpFirewallRuleInfo]
def get_private_endpoint_connection(resource_group_name: str, workspace_name: str, private_endpoint_connection_name: str) -> PrivateEndpointConnection
def create_private_endpoint_connection(resource_group_name: str, workspace_name: str, private_endpoint_connection_name: str, request: PrivateEndpointConnection) -> LROPoller[PrivateEndpointConnection]

Database schema, table, column, and metadata operations for SQL pool data structures.
# Schema management operations
def list_schemas(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[SqlPoolSchema]
def list_tables(resource_group_name: str, workspace_name: str, sql_pool_name: str, schema_name: str) -> ItemPaged[SqlPoolTable]
def list_table_columns(resource_group_name: str, workspace_name: str, sql_pool_name: str, schema_name: str, table_name: str) -> ItemPaged[SqlPoolColumn]
def get_column(resource_group_name: str, workspace_name: str, sql_pool_name: str, schema_name: str, table_name: str, column_name: str) -> SqlPoolColumn

Operation monitoring, usage metrics, restore points, and maintenance management across all Synapse resources.
# Monitoring operations
def list_sql_pool_operations(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[SqlPoolOperation]
def list_usages(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[SqlPoolUsage]
def list_restore_points(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[RestorePoint]
def create_restore_point(resource_group_name: str, workspace_name: str, sql_pool_name: str, parameters: CreateSqlPoolRestorePointDefinition) -> LROPoller[RestorePoint]

All operations may raise Azure-specific exceptions that should be handled appropriately:
from azure.core.exceptions import (
    ClientAuthenticationError,
    HttpResponseError,
    ResourceExistsError,
    ResourceNotFoundError
)
from azure.mgmt.core.exceptions import ARMErrorFormat
try:
    workspace = client.workspaces.get(resource_group, workspace_name)
except ResourceNotFoundError:
    print("Workspace not found")
except ClientAuthenticationError:
    print("Authentication failed")
except HttpResponseError as e:
    print(f"HTTP error: {e.status_code} - {e.message}")

For async operations, use the aio client with the same interface:
import asyncio
from azure.identity.aio import DefaultAzureCredential
from azure.mgmt.synapse.aio import SynapseManagementClient
async def list_workspaces():
    credential = DefaultAzureCredential()
    async with SynapseManagementClient(credential, subscription_id) as client:
        workspaces = client.workspaces.list_by_resource_group(resource_group)
        async for workspace in workspaces:
            print(f"Workspace: {workspace.name}")

asyncio.run(list_workspaces())

Essential types and models used throughout the Azure Synapse Management Client:
# Core resource types
class SynapseManagementClient:
    """Main client for Azure Synapse Management operations"""
    def __init__(self, credential: TokenCredential, subscription_id: str, base_url: Optional[str] = None) -> None: ...
    def close(self) -> None: ...

class Workspace(TrackedResource):
    """Synapse workspace resource"""
    identity: Optional[ManagedIdentity]
    default_data_lake_storage: Optional[DataLakeStorageAccountDetails]
    sql_administrator_login_password: Optional[str]
    managed_resource_group_name: Optional[str]
    provisioning_state: Optional[str]
    sql_administrator_login: Optional[str]
    virtual_network_profile: Optional[VirtualNetworkProfile]
    connectivity_endpoints: Optional[Dict[str, str]]
    managed_virtual_network: Optional[str]
    private_endpoint_connections: Optional[List[PrivateEndpointConnection]]
    encryption: Optional[EncryptionDetails]
    workspace_uid: Optional[str]
    extra_properties: Optional[Dict[str, Any]]
    managed_virtual_network_settings: Optional[ManagedVirtualNetworkSettings]
    workspace_repository_configuration: Optional[WorkspaceRepositoryConfiguration]
    purview_configuration: Optional[PurviewConfiguration]
class SqlPool(TrackedResource):
    """SQL pool resource"""
    sku: Optional[Sku]
    max_size_bytes: Optional[int]
    collation: Optional[str]
    source_database_id: Optional[str]
    recoverable_database_id: Optional[str]
    provisioning_state: Optional[str]
    status: Optional[str]
    restore_point_in_time: Optional[datetime]
    create_mode: Optional[str]
    creation_date: Optional[datetime]
    storage_account_type: Optional[StorageAccountType]

class BigDataPoolResourceInfo(TrackedResource):
    """Big Data (Spark) pool resource"""
    provisioning_state: Optional[str]
    auto_scale: Optional[AutoScaleProperties]
    creation_date: Optional[datetime]
    auto_pause: Optional[AutoPauseProperties]
    is_compute_isolation_enabled: Optional[bool]
    have_library_requirements_changed: Optional[bool]
    session_level_packages_enabled: Optional[bool]
    cache_size: Optional[int]
    dynamic_executor_allocation: Optional[DynamicExecutorAllocation]
    spark_events_folder: Optional[str]
    node_count: Optional[int]
    library_requirements: Optional[LibraryRequirements]
    custom_libraries: Optional[List[LibraryInfo]]
    spark_config_properties: Optional[Dict[str, str]]
    spark_version: Optional[str]
    default_spark_log_folder: Optional[str]
    node_size: Optional[NodeSize]
    node_size_family: Optional[NodeSizeFamily]
class IntegrationRuntimeResource(SubResource):
    """Integration runtime resource"""
    properties: Optional[IntegrationRuntime]
    name: Optional[str]
    type: Optional[str]
    etag: Optional[str]

class PrivateLinkHub(Resource):
    """Private Link Hub resource"""
    name: Optional[str]
    id: Optional[str]
    type: Optional[str]
    tags: Optional[Dict[str, str]]
    location: Optional[str]
    provisioning_state: Optional[str]
    private_endpoint_connections: Optional[List[PrivateEndpointConnectionForPrivateLinkHubBasic]]

class Key:
    """Workspace key resource"""
    is_active_cmk: Optional[bool]
    key_vault_url: Optional[str]
    name: Optional[str]

# Common operation result types
class LROPoller(Generic[T]):
    """Long-running operation poller"""
    def result(self, timeout: Optional[int] = None) -> T: ...
    def wait(self, timeout: Optional[int] = None) -> None: ...
    def done(self) -> bool: ...
    def status(self) -> str: ...

class ItemPaged(Generic[T]):
    """Paged result iterator"""
    def __iter__(self) -> Iterator[T]: ...
# Security and auditing types
class SqlPoolBlobAuditingPolicy(ProxyResource):
    """SQL pool blob auditing policy"""
    kind: Optional[str]
    state: Optional[BlobAuditingPolicyState]
    storage_endpoint: Optional[str]
    storage_account_access_key: Optional[str]
    retention_days: Optional[int]
    audit_actions_and_groups: Optional[List[str]]
    storage_account_subscription_id: Optional[str]
    is_storage_secondary_key_in_use: Optional[bool]
    is_azure_monitor_target_enabled: Optional[bool]

class SqlPoolVulnerabilityAssessment(ProxyResource):
    """SQL pool vulnerability assessment"""
    storage_container_path: Optional[str]
    storage_container_sas_key: Optional[str]
    storage_account_access_key: Optional[str]
    recurring_scans: Optional[VulnerabilityAssessmentRecurringScansProperties]

# Common enums
class BlobAuditingPolicyState(str, Enum):
    ENABLED = "Enabled"
    DISABLED = "Disabled"

class NodeSize(str, Enum):
    NONE = "None"
    SMALL = "Small"
    MEDIUM = "Medium"
    LARGE = "Large"
    XLARGE = "XLarge"
    XXLARGE = "XXLarge"
    XXXLARGE = "XXXLarge"

class IntegrationRuntimeState(str, Enum):
    INITIAL = "Initial"
    STOPPED = "Stopped"
    STARTED = "Started"
    STARTING = "Starting"
    STOPPING = "Stopping"
    NEED_REGISTRATION = "NeedRegistration"
    ONLINE = "Online"
    LIMITED = "Limited"
    OFFLINE = "Offline"
    ACCESS_DENIED = "AccessDenied"

class ProvisioningState(str, Enum):
    PROVISIONING = "Provisioning"
    SUCCEEDED = "Succeeded"
    DELETING = "Deleting"
    FAILED = "Failed"
    DELETE_ERROR = "DeleteError"