Provider package for Microsoft Azure integrations with Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive Power BI integration for managing datasets, triggering refreshes, and monitoring Power BI workspace operations. This provider enables automation of Power BI dataset refresh operations and workspace management through Microsoft Graph API.
Async hook for connecting to and interacting with Power BI services through Microsoft Graph API.
class PowerBIHook(KiotaRequestAdapterHook):
"""
Hook for Power BI operations via Microsoft Graph API.
Provides async methods for dataset management, refresh operations,
and workspace interactions.
"""
async def get_refresh_history(
self,
dataset_id: str,
group_id: str,
top: int | None = None
) -> list[dict[str, Any]]: ...
async def get_refresh_details_by_refresh_id(
self,
dataset_id: str,
group_id: str,
dataset_refresh_id: str
) -> dict[str, str]: ...
async def trigger_dataset_refresh(
self,
dataset_id: str,
group_id: str,
request_body: dict[str, Any] | None = None
) -> str: ...
async def get_workspace_list(self) -> list[str]: ...
async def get_dataset_list(self, *, group_id: str) -> list[str]: ...
async def cancel_dataset_refresh(
self,
dataset_id: str,
group_id: str,
dataset_refresh_id: str
) -> None: ...Operators for triggering and monitoring Power BI dataset refresh operations with support for asynchronous waits and status monitoring.
class PowerBIDatasetRefreshOperator(BaseOperator):
"""
Refreshes a Power BI dataset.
Parameters:
- dataset_id: The dataset id
- group_id: The workspace id
- conn_id: Connection ID for Power BI authentication
- timeout: Time in seconds to wait for terminal status
- check_interval: Seconds between refresh status checks
- request_body: Additional refresh parameters
"""
def __init__(
self,
*,
dataset_id: str,
group_id: str,
conn_id: str = "powerbi_default",
timeout: float = 60 * 60 * 24 * 7,
check_interval: int = 60,
request_body: dict[str, Any] | None = None,
**kwargs,
): ...
def execute(self, context: Context) -> None: ...Sensors for monitoring Power BI dataset refresh status and workspace operations.
class PowerBIDatasetRefreshSensor(BaseSensorOperator):
"""
Sensor for monitoring Power BI dataset refresh completion.
Monitors dataset refresh status until it reaches a terminal state
(completed, failed, or cancelled).
"""
def __init__(
self,
*,
dataset_id: str,
group_id: str,
dataset_refresh_id: str,
conn_id: str = "powerbi_default",
**kwargs,
): ...
def poke(self, context: Context) -> bool: ...Deferrable triggers for async monitoring of Power BI operations.
class PowerBITrigger(BaseTrigger):
    """
    Base trigger for Power BI async operations.

    Only the constructor signature is listed here; the body is elided
    (``...``). ``proxies`` and ``api_version`` are optional and default
    to ``None``.
    """

    def __init__(
        self,
        conn_id: str,
        timeout: float,
        proxies: dict | None = None,
        api_version: str | None = None,
        **kwargs,
    ): ...
class PowerBIDatasetListTrigger(PowerBITrigger):
    """Trigger for monitoring dataset list operations."""
class PowerBIWorkspaceListTrigger(PowerBITrigger):
    """Trigger for monitoring workspace list operations."""


from airflow import DAG
from airflow.providers.microsoft.azure.operators.powerbi import PowerBIDatasetRefreshOperator
from datetime import datetime, timedelta
# Example DAG that runs once a day. ``schedule`` replaces the deprecated
# ``schedule_interval`` argument (deprecated in Airflow 2.4, removed in 3.0).
dag = DAG(
    'powerbi_refresh_example',
    default_args={'owner': 'data-team'},
    description='Refresh Power BI dataset',
    schedule=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,  # do not backfill runs between start_date and today
)
# Basic example: start a refresh of one dataset in one workspace.
refresh_dataset = PowerBIDatasetRefreshOperator(
    task_id="refresh_sales_dataset",
    dataset_id="12345678-1234-1234-1234-123456789012",
    group_id="87654321-4321-4321-4321-210987654321",
    conn_id="powerbi_connection",
    timeout=3600,  # give up after one hour
    check_interval=300,  # poll the refresh status every five minutes
    dag=dag,
)

# Refresh with specific tables and enhanced refresh type
advanced_refresh = PowerBIDatasetRefreshOperator(
    task_id='enhanced_dataset_refresh',
    dataset_id='12345678-1234-1234-1234-123456789012',
    group_id='87654321-4321-4321-4321-210987654321',
    # Passed through as the operator's ``request_body`` argument: restrict
    # the refresh to the two listed tables.
    request_body={
        "type": "full",
        "commitMode": "transactional",
        "objects": [
            {"table": "SalesData"},
            {"table": "CustomerData"},
        ],
    },
    conn_id='powerbi_production',
    timeout=7200,  # 2 hour timeout
    dag=dag,
)

from airflow.providers.microsoft.azure.sensors.powerbi import PowerBIDatasetRefreshSensor
# Block downstream tasks until the refresh started by ``refresh_sales_dataset``
# finishes; the refresh id is pulled from that task's XCom.
wait_for_refresh = PowerBIDatasetRefreshSensor(
    task_id="wait_for_refresh_completion",
    dataset_id="12345678-1234-1234-1234-123456789012",
    group_id="87654321-4321-4321-4321-210987654321",
    dataset_refresh_id='{{ ti.xcom_pull("refresh_sales_dataset") }}',
    conn_id="powerbi_connection",
    timeout=3600,
    poke_interval=300,
    dag=dag,
)
refresh_dataset >> wait_for_refresh

Power BI integration uses Microsoft Graph API authentication methods:
Connection configuration requires appropriate Microsoft Graph API permissions for Power BI operations including Dataset.ReadWrite.All and Workspace.Read.All.
class PowerBIDatasetRefreshStatus:
    """Power BI dataset refresh status constants."""

    # Individual status strings.
    IN_PROGRESS: str = "In Progress"
    FAILED: str = "Failed"
    COMPLETED: str = "Completed"
    DISABLED: str = "Disabled"

    # Groupings built from the constants above.
    # NOTE(review): DISABLED counts as a failure status but is not listed
    # as terminal -- confirm this is intended.
    TERMINAL_STATUSES: set[str] = {FAILED, COMPLETED}
    FAILURE_STATUSES: set[str] = {FAILED, DISABLED}
class PowerBIDatasetRefreshFields:
    """Power BI refresh dataset detail field names."""

    REQUEST_ID: str = "request_id"
    STATUS: str = "status"
    ERROR: str = "error"
class PowerBIDatasetRefreshException(AirflowException):
"""Exception raised when Power BI dataset refresh fails."""Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure