Microsoft Azure Synapse Management Client Library for Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive security management for SQL pools, including auditing, vulnerability assessments, data classification (sensitivity labels), dynamic data masking, and threat protection (security alert policies). These capabilities provide enterprise-grade security controls for SQL pools within Azure Synapse Analytics.
Configure blob auditing to track database activities and write audit logs to Azure Storage.
def get_blob_auditing_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, blob_auditing_policy_name: str) -> SqlPoolBlobAuditingPolicy:
"""
Get a blob auditing policy.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- blob_auditing_policy_name (str): Name of the blob auditing policy
Returns:
SqlPoolBlobAuditingPolicy: Blob auditing policy
"""
def create_or_update_blob_auditing_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, blob_auditing_policy_name: str, parameters: SqlPoolBlobAuditingPolicy) -> SqlPoolBlobAuditingPolicy:
"""
Create or update blob auditing policy.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- blob_auditing_policy_name (str): Name of the blob auditing policy
- parameters (SqlPoolBlobAuditingPolicy): Auditing policy configuration
Returns:
SqlPoolBlobAuditingPolicy: Updated auditing policy
"""
def list_by_sql_pool(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[SqlPoolBlobAuditingPolicy]:
"""
List blob auditing policies for a SQL pool.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
Returns:
ItemPaged[SqlPoolBlobAuditingPolicy]: Paged collection of auditing policies
"""

Configure extended blob auditing with additional settings and retention policies.
def get_extended_blob_auditing_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, blob_auditing_policy_name: str) -> ExtendedSqlPoolBlobAuditingPolicy:
"""
Get extended blob auditing policy.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- blob_auditing_policy_name (str): Name of the blob auditing policy
Returns:
ExtendedSqlPoolBlobAuditingPolicy: Extended blob auditing policy
"""
def create_or_update_extended_blob_auditing_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, blob_auditing_policy_name: str, parameters: ExtendedSqlPoolBlobAuditingPolicy) -> ExtendedSqlPoolBlobAuditingPolicy:
"""
Create or update extended blob auditing policy.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- blob_auditing_policy_name (str): Name of the blob auditing policy
- parameters (ExtendedSqlPoolBlobAuditingPolicy): Extended auditing policy configuration
Returns:
ExtendedSqlPoolBlobAuditingPolicy: Updated extended auditing policy
"""
def list_by_sql_pool_extended(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[ExtendedSqlPoolBlobAuditingPolicy]:
"""
List extended blob auditing policies for a SQL pool.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
Returns:
ItemPaged[ExtendedSqlPoolBlobAuditingPolicy]: Paged collection of extended auditing policies
"""

Configure threat detection and security alert policies to monitor suspicious activities.
def get_security_alert_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, security_alert_policy_name: str) -> SqlPoolSecurityAlertPolicy:
"""
Get security alert policy.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- security_alert_policy_name (str): Name of the security alert policy
Returns:
SqlPoolSecurityAlertPolicy: Security alert policy
"""
def create_or_update_security_alert_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, security_alert_policy_name: str, parameters: SqlPoolSecurityAlertPolicy) -> SqlPoolSecurityAlertPolicy:
"""
Create or update security alert policy.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- security_alert_policy_name (str): Name of the security alert policy
- parameters (SqlPoolSecurityAlertPolicy): Security alert policy configuration
Returns:
SqlPoolSecurityAlertPolicy: Updated security alert policy
"""
def list_security_alert_policies(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[SqlPoolSecurityAlertPolicy]:
"""
List security alert policies for a SQL pool.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
Returns:
ItemPaged[SqlPoolSecurityAlertPolicy]: Paged collection of security alert policies
"""

Configure and manage vulnerability assessments to identify security risks.
def get_vulnerability_assessment(resource_group_name: str, workspace_name: str, sql_pool_name: str, vulnerability_assessment_name: str) -> SqlPoolVulnerabilityAssessment:
"""
Get vulnerability assessment settings.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- vulnerability_assessment_name (str): Name of the vulnerability assessment
Returns:
SqlPoolVulnerabilityAssessment: Vulnerability assessment settings
"""
def create_or_update_vulnerability_assessment(resource_group_name: str, workspace_name: str, sql_pool_name: str, vulnerability_assessment_name: str, parameters: SqlPoolVulnerabilityAssessment) -> SqlPoolVulnerabilityAssessment:
"""
Create or update vulnerability assessment settings.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- vulnerability_assessment_name (str): Name of the vulnerability assessment
- parameters (SqlPoolVulnerabilityAssessment): Vulnerability assessment configuration
Returns:
SqlPoolVulnerabilityAssessment: Updated vulnerability assessment settings
"""
def delete_vulnerability_assessment(resource_group_name: str, workspace_name: str, sql_pool_name: str, vulnerability_assessment_name: str) -> None:
"""
Remove vulnerability assessment settings.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- vulnerability_assessment_name (str): Name of the vulnerability assessment
"""
def list_vulnerability_assessments(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[SqlPoolVulnerabilityAssessment]:
"""
List vulnerability assessments for a SQL pool.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
Returns:
ItemPaged[SqlPoolVulnerabilityAssessment]: Paged collection of vulnerability assessments
"""

Execute and manage vulnerability assessment scans.
def list_scans(resource_group_name: str, workspace_name: str, sql_pool_name: str, vulnerability_assessment_name: str) -> ItemPaged[VulnerabilityAssessmentScanRecord]:
"""
List vulnerability assessment scans.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- vulnerability_assessment_name (str): Name of the vulnerability assessment
Returns:
ItemPaged[VulnerabilityAssessmentScanRecord]: Paged collection of scan records
"""
def get_scan(resource_group_name: str, workspace_name: str, sql_pool_name: str, vulnerability_assessment_name: str, scan_id: str) -> VulnerabilityAssessmentScanRecord:
"""
Get vulnerability assessment scan.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- vulnerability_assessment_name (str): Name of the vulnerability assessment
- scan_id (str): Scan ID
Returns:
VulnerabilityAssessmentScanRecord: Scan record
"""
def initiate_scan(resource_group_name: str, workspace_name: str, sql_pool_name: str, vulnerability_assessment_name: str, scan_id: str) -> LROPoller[VulnerabilityAssessmentScanRecord]:
"""
Initiate a vulnerability assessment scan.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- vulnerability_assessment_name (str): Name of the vulnerability assessment
- scan_id (str): Scan ID
Returns:
LROPoller[VulnerabilityAssessmentScanRecord]: Long-running operation poller
"""

Manage data classification and sensitivity labels for compliance and governance.
def get_sensitivity_label(resource_group_name: str, workspace_name: str, sql_pool_name: str, schema_name: str, table_name: str, column_name: str, sensitivity_label_source: str) -> SensitivityLabel:
"""
Get sensitivity label for a column.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- schema_name (str): Schema name
- table_name (str): Table name
- column_name (str): Column name
- sensitivity_label_source (str): Sensitivity label source
Returns:
SensitivityLabel: Sensitivity label
"""
def create_or_update_sensitivity_label(resource_group_name: str, workspace_name: str, sql_pool_name: str, schema_name: str, table_name: str, column_name: str, sensitivity_label_source: str, parameters: SensitivityLabel) -> SensitivityLabel:
"""
Create or update sensitivity label.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- schema_name (str): Schema name
- table_name (str): Table name
- column_name (str): Column name
- sensitivity_label_source (str): Sensitivity label source
- parameters (SensitivityLabel): Sensitivity label configuration
Returns:
SensitivityLabel: Updated sensitivity label
"""
def delete_sensitivity_label(resource_group_name: str, workspace_name: str, sql_pool_name: str, schema_name: str, table_name: str, column_name: str, sensitivity_label_source: str) -> None:
"""
Delete sensitivity label.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- schema_name (str): Schema name
- table_name (str): Table name
- column_name (str): Column name
- sensitivity_label_source (str): Sensitivity label source
"""
def list_current_sensitivity_labels(resource_group_name: str, workspace_name: str, sql_pool_name: str, **kwargs) -> ItemPaged[SensitivityLabel]:
"""
List current sensitivity labels for a SQL pool.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- **kwargs: Additional filtering parameters
Returns:
ItemPaged[SensitivityLabel]: Paged collection of sensitivity labels
"""

Configure dynamic data masking to protect sensitive data.
def get_data_masking_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, data_masking_policy_name: str) -> DataMaskingPolicy:
"""
Get data masking policy.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- data_masking_policy_name (str): Name of the data masking policy
Returns:
DataMaskingPolicy: Data masking policy
"""
def create_or_update_data_masking_policy(resource_group_name: str, workspace_name: str, sql_pool_name: str, parameters: DataMaskingPolicy) -> DataMaskingPolicy:
"""
Create or update data masking policy.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- parameters (DataMaskingPolicy): Data masking policy configuration
Returns:
DataMaskingPolicy: Updated data masking policy
"""
def get_data_masking_rule(resource_group_name: str, workspace_name: str, sql_pool_name: str, data_masking_rule_name: str) -> DataMaskingRule:
"""
Get data masking rule.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- data_masking_rule_name (str): Name of the data masking rule
Returns:
DataMaskingRule: Data masking rule
"""
def create_or_update_data_masking_rule(resource_group_name: str, workspace_name: str, sql_pool_name: str, data_masking_rule_name: str, parameters: DataMaskingRule) -> DataMaskingRule:
"""
Create or update data masking rule.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
- data_masking_rule_name (str): Name of the data masking rule
- parameters (DataMaskingRule): Data masking rule configuration
Returns:
DataMaskingRule: Updated data masking rule
"""
def list_data_masking_rules(resource_group_name: str, workspace_name: str, sql_pool_name: str) -> ItemPaged[DataMaskingRule]:
"""
List data masking rules for a SQL pool.
Parameters:
- resource_group_name (str): Name of the resource group
- workspace_name (str): Name of the workspace
- sql_pool_name (str): Name of the SQL pool
Returns:
ItemPaged[DataMaskingRule]: Paged collection of data masking rules
"""

class SqlPoolBlobAuditingPolicy:
"""
A blob auditing policy.
Attributes:
- id (str): Resource ID
- name (str): Resource name
- type (str): Resource type
- kind (str): Resource kind
- state (str): Policy state
- storage_endpoint (str): Storage endpoint
- storage_account_access_key (str): Storage account access key
- retention_days (int): Retention days
- audit_actions_and_groups (list): Actions and groups to audit
- storage_account_subscription_id (str): Storage account subscription ID
- is_storage_secondary_key_in_use (bool): Use secondary storage key
- is_azure_monitor_target_enabled (bool): Enable Azure Monitor target
- queue_delay_ms (int): Queue delay in milliseconds
"""

class SqlPoolSecurityAlertPolicy:
"""
A security alert policy.
Attributes:
- id (str): Resource ID
- name (str): Resource name
- type (str): Resource type
- state (str): Policy state
- disabled_alerts (list): Disabled alert types
- email_addresses (list): Email addresses for alerts
- email_account_admins (bool): Email account administrators
- storage_endpoint (str): Storage endpoint
- storage_account_access_key (str): Storage account access key
- retention_days (int): Retention days
- creation_time (datetime): Creation time
"""

class SqlPoolVulnerabilityAssessment:
"""
A vulnerability assessment.
Attributes:
- id (str): Resource ID
- name (str): Resource name
- type (str): Resource type
- storage_container_path (str): Storage container path
- storage_container_sas_key (str): Storage container SAS key
- storage_account_access_key (str): Storage account access key
- recurring_scans (VulnerabilityAssessmentRecurringScansProperties): Recurring scan settings
"""

class SensitivityLabel:
"""
A sensitivity label.
Attributes:
- id (str): Resource ID
- name (str): Resource name
- type (str): Resource type
- managed_by (str): Managed by
- schema_name (str): Schema name
- table_name (str): Table name
- column_name (str): Column name
- label_name (str): Label name
- label_id (str): Label ID
- information_type (str): Information type
- information_type_id (str): Information type ID
- is_disabled (bool): Is disabled
- rank (str): Sensitivity rank
"""

class DataMaskingPolicy:
"""
A data masking policy.
Attributes:
- id (str): Resource ID
- name (str): Resource name
- type (str): Resource type
- location (str): Resource location
- kind (str): Resource kind
- managed_by (str): Managed by
- data_masking_state (str): Data masking state
- exempt_principals (str): Exempt principals
- application_principals (str): Application principals
- masking_level (str): Masking level
"""

class DataMaskingRule:
"""
A data masking rule.
Attributes:
- id (str): Resource ID
- name (str): Resource name
- type (str): Resource type
- location (str): Resource location
- kind (str): Resource kind
- id_properties_id (str): Rule ID
- alias_name (str): Alias name
- rule_state (str): Rule state
- schema_name (str): Schema name
- table_name (str): Table name
- column_name (str): Column name
- masking_function (str): Masking function
- number_from (str): Number from (for number range masking)
- number_to (str): Number to (for number range masking)
- prefix_size (str): Prefix size
- suffix_size (str): Suffix size
- replacement_string (str): Replacement string
"""

from azure.mgmt.synapse.models import SqlPoolBlobAuditingPolicy
auditing_policy = SqlPoolBlobAuditingPolicy(
state="Enabled",
storage_endpoint="https://mystorageaccount.blob.core.windows.net",
storage_account_access_key="storage-access-key",
retention_days=90,
audit_actions_and_groups=[
"SUCCESSFUL_DATABASE_AUTHENTICATION_GROUP",
"FAILED_DATABASE_AUTHENTICATION_GROUP",
"BATCH_COMPLETED_GROUP"
],
is_azure_monitor_target_enabled=True
)
policy = client.sql_pool_blob_auditing_policies.create_or_update_blob_auditing_policy(
resource_group_name="my-resource-group",
workspace_name="my-synapse-workspace",
sql_pool_name="my-sql-pool",
blob_auditing_policy_name="default",
parameters=auditing_policy
)
print("Configured blob auditing policy")

from azure.mgmt.synapse.models import SqlPoolSecurityAlertPolicy
security_policy = SqlPoolSecurityAlertPolicy(
state="Enabled",
email_addresses=["admin@company.com", "security@company.com"],
email_account_admins=True,
disabled_alerts=[],
retention_days=30,
storage_endpoint="https://mystorageaccount.blob.core.windows.net",
storage_account_access_key="storage-access-key"
)
alert_policy = client.sql_pool_security_alert_policies.create_or_update_security_alert_policy(
resource_group_name="my-resource-group",
workspace_name="my-synapse-workspace",
sql_pool_name="my-sql-pool",
security_alert_policy_name="default",
parameters=security_policy
)
print("Configured security alert policy")

from azure.mgmt.synapse.models import DataMaskingPolicy, DataMaskingRule
# Enable data masking
masking_policy = DataMaskingPolicy(
data_masking_state="Enabled"
)
policy = client.data_masking_policies.create_or_update_data_masking_policy(
resource_group_name="my-resource-group",
workspace_name="my-synapse-workspace",
sql_pool_name="my-sql-pool",
parameters=masking_policy
)
# Add masking rule for credit card column
masking_rule = DataMaskingRule(
schema_name="dbo",
table_name="customers",
column_name="credit_card_number",
masking_function="CreditCardNumber"
)
rule = client.data_masking_rules.create_or_update_data_masking_rule(
resource_group_name="my-resource-group",
workspace_name="my-synapse-workspace",
sql_pool_name="my-sql-pool",
data_masking_rule_name="CreditCardRule",
parameters=masking_rule
)
print("Configured data masking for credit card numbers")

Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-synapse