tessl/pypi-apache-airflow-providers-dbt-cloud

Provider package for integrating Apache Airflow with dbt Cloud for data transformation workflow orchestration

Describes: pkg:pypi/apache-airflow-providers-dbt-cloud@4.4.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-dbt-cloud@4.4.0

# Apache Airflow Providers dbt Cloud

An Apache Airflow provider package that integrates dbt Cloud for orchestrating data transformation workflows. It lets teams trigger dbt Cloud jobs, monitor their status, retrieve run artifacts, and feed data lineage systems, all from within Airflow DAGs.

## Package Information

- **Package Name**: apache-airflow-providers-dbt-cloud
- **Package Type**: pip
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-dbt-cloud`
- **Minimum Airflow Version**: 2.10.0

## Core Imports

```python
from airflow.providers.dbt.cloud.hooks.dbt import (
    DbtCloudHook,
    DbtCloudJobRunStatus,
    DbtCloudJobRunException,
    DbtCloudResourceLookupError,
    JobRunInfo,
)
from airflow.providers.dbt.cloud.operators.dbt import (
    DbtCloudRunJobOperator,
    DbtCloudGetJobRunArtifactOperator,
    DbtCloudListJobsOperator,
)
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger
```

## Basic Usage

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# Define default arguments
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG (`schedule` replaces the deprecated `schedule_interval`)
dag = DAG(
    'dbt_cloud_workflow',
    default_args=default_args,
    description='Run dbt Cloud transformation job',
    schedule=timedelta(hours=6),
    catchup=False,
)

# Trigger a dbt Cloud job; the run ID is pushed to XCom.
# wait_for_termination=False hands monitoring off to the sensor below
# (by default the operator itself waits, which would make the sensor redundant).
run_dbt_job = DbtCloudRunJobOperator(
    task_id='run_dbt_models',
    dbt_cloud_conn_id='dbt_cloud_default',
    job_id=12345,
    wait_for_termination=False,
    dag=dag,
)

# Monitor job completion with a sensor, reading the run ID from XCom
wait_for_completion = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    timeout=3600,
    poke_interval=60,
    dag=dag,
)

run_dbt_job >> wait_for_completion
```

## Architecture

The dbt Cloud provider follows Airflow's standard provider pattern with four main component types:

- **Hooks**: Low-level interface for direct dbt Cloud API communication
- **Operators**: High-level task implementations for common dbt Cloud operations
- **Sensors**: Monitoring tasks that wait for specific dbt Cloud job states
- **Triggers**: Async components for deferrable operations and efficient resource usage

The provider integrates with Airflow's connection system for secure credential management and supports both synchronous and asynchronous (deferrable) task execution patterns.
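As a minimal sketch of the deferrable pattern, using the operator's `deferrable` flag to hand polling to `DbtCloudRunJobTrigger` on the triggerer instead of occupying a worker slot (the job ID is a hypothetical example):

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Deferrable variant: the task frees its worker slot while the triggerer
# polls dbt Cloud asynchronously.
run_dbt_job_deferred = DbtCloudRunJobOperator(
    task_id='run_dbt_models_deferred',
    dbt_cloud_conn_id='dbt_cloud_default',
    job_id=12345,
    deferrable=True,
)
```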

## Capabilities

### dbt Cloud Hook

Low-level interface for comprehensive dbt Cloud API interaction, providing methods for account management, project operations, job execution, and artifact retrieval.

```python { .api }
class DbtCloudHook:
    def __init__(self, dbt_cloud_conn_id: str = "dbt_cloud_default"): ...
    def get_conn(self) -> Session: ...
    def test_connection(self) -> tuple[bool, str]: ...
    def trigger_job_run(self, job_id: int, cause: str, **kwargs) -> Response: ...
    def get_job_run_status(self, run_id: int, **kwargs) -> int: ...
    def wait_for_job_run_status(self, run_id: int, **kwargs) -> bool: ...
```

[dbt Cloud Hook](./hooks.md)
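A minimal usage sketch, assuming a configured `dbt_cloud_default` connection; the job ID and the v2 API response envelope are assumptions:

```python
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, DbtCloudJobRunStatus

hook = DbtCloudHook(dbt_cloud_conn_id='dbt_cloud_default')

# Trigger a run; the new run's ID sits in the response payload
# (assuming the dbt Cloud v2 API's {"data": {...}} envelope).
response = hook.trigger_job_run(job_id=12345, cause='Triggered from Airflow')
run_id = response.json()['data']['id']

# Poll the raw integer status code and test it against the enum
status = hook.get_job_run_status(run_id=run_id)
if DbtCloudJobRunStatus.is_terminal(status):
    print(f'Run {run_id} finished with status code {status}')
```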

### Job Execution Operators

Operators for triggering and managing dbt Cloud job executions with comprehensive configuration options and integration capabilities.

```python { .api }
class DbtCloudRunJobOperator:
    def __init__(
        self,
        job_id: int | None = None,
        project_name: str | None = None,
        environment_name: str | None = None,
        job_name: str | None = None,
        **kwargs
    ): ...

class DbtCloudGetJobRunArtifactOperator:
    def __init__(self, run_id: int, path: str, **kwargs): ...

class DbtCloudListJobsOperator:
    def __init__(self, account_id: int | None = None, **kwargs): ...
```

[Job Execution Operators](./operators.md)
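For example, a sketch that downloads the `run_results.json` artifact after a job run (the XCom template assumes the `run_dbt_models` task from Basic Usage):

```python
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudGetJobRunArtifactOperator

# Fetch a run artifact; the operator writes it to a local file and
# pushes the output file name to XCom for downstream tasks.
get_run_results = DbtCloudGetJobRunArtifactOperator(
    task_id='get_run_results',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    path='run_results.json',
)
```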

### Job Monitoring Sensor

Sensor for monitoring dbt Cloud job run status with support for both polling and deferrable execution modes.

```python { .api }
class DbtCloudJobRunSensor:
    def __init__(
        self,
        run_id: int,
        account_id: int | None = None,
        deferrable: bool = False,
        **kwargs
    ): ...
```

[Job Monitoring Sensor](./sensors.md)
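A deferrable variant of the Basic Usage sensor, as a sketch (the XCom template again assumes the upstream `run_dbt_models` task):

```python
from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

# With deferrable=True the sensor hands polling to DbtCloudRunJobTrigger
# and frees its worker slot between status checks.
wait_for_completion = DbtCloudJobRunSensor(
    task_id='wait_for_dbt_completion',
    dbt_cloud_conn_id='dbt_cloud_default',
    run_id="{{ task_instance.xcom_pull(task_ids='run_dbt_models') }}",
    deferrable=True,
    timeout=3600,
)
```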

### Async Job Trigger

Async trigger for efficient monitoring of dbt Cloud job status in deferrable tasks.

```python { .api }
class DbtCloudRunJobTrigger:
    def __init__(
        self,
        conn_id: str,
        run_id: int,
        end_time: float,
        poll_interval: float,
        account_id: int | None
    ): ...
```

[Async Job Trigger](./triggers.md)
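A sketch of how a custom deferrable operator might hand off to this trigger; the operator class, its `execute_complete` callback name, and the event payload shape are illustrative assumptions:

```python
import time

from airflow.models import BaseOperator
from airflow.providers.dbt.cloud.triggers.dbt import DbtCloudRunJobTrigger


class WaitForDbtRunOperator(BaseOperator):
    """Hypothetical operator that defers until a dbt Cloud run finishes."""

    def __init__(self, run_id: int, **kwargs):
        super().__init__(**kwargs)
        self.run_id = run_id

    def execute(self, context):
        # Suspend the task and let the triggerer poll asynchronously.
        self.defer(
            trigger=DbtCloudRunJobTrigger(
                conn_id='dbt_cloud_default',
                run_id=self.run_id,
                end_time=time.time() + 3600,  # give up after one hour
                poll_interval=60.0,
                account_id=None,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event):
        # Called once the trigger fires with a terminal-status event.
        if event.get('status') != 'success':
            raise RuntimeError(f'dbt Cloud run did not succeed: {event}')
```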

### OpenLineage Integration

Automatic data lineage tracking integration with OpenLineage for comprehensive visibility across dbt transformations and Airflow workflows.

```python { .api }
def generate_openlineage_events_from_dbt_cloud_run(
    operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor,
    task_instance: TaskInstance
) -> OperatorLineage: ...
```

[OpenLineage Integration](./openlineage.md)

## Connection Configuration

Create a dbt Cloud connection in Airflow with the following parameters:

- **Connection Id**: `dbt_cloud_default` (or a custom name)
- **Connection Type**: `dbt Cloud`
- **Host**: your dbt Cloud instance URL (defaults to `cloud.getdbt.com`)
- **Login**: account ID (optional; used as a fallback when `account_id` is not passed to individual methods)
- **Password**: your dbt Cloud API token (required)
- **Extra**: JSON configuration for additional options:

```json
{
  "account_id": 12345,
  "proxies": {
    "http": "http://proxy.company.com:8080",
    "https": "https://proxy.company.com:8080"
  }
}
```

### Connection Form UI Fields

The connection form in the Airflow UI relabels the standard connection fields as follows:

- **Account ID** (the `Login` field): default account ID
- **API Token** (the `Password` field): dbt Cloud API token
- **Tenant** (the `Host` field): dbt Cloud instance URL
- **Extra**: optional JSON-formatted configuration
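Connections can also be supplied without the UI. A sketch using Airflow's JSON-serialized connection environment variables (supported since Airflow 2.3; all values here are placeholders):

```python
import json
import os

# Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables as connections.
os.environ['AIRFLOW_CONN_DBT_CLOUD_DEFAULT'] = json.dumps({
    'conn_type': 'dbt_cloud',
    'host': 'cloud.getdbt.com',
    'login': '12345',                  # account ID (optional)
    'password': 'my-dbt-cloud-token',  # placeholder API token
    'extra': {'account_id': 12345},
})
```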

## Types

```python { .api }
from enum import Enum
from typing import TypedDict, Sequence
from airflow.exceptions import AirflowException
from requests import PreparedRequest

class DbtCloudJobRunStatus(Enum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELLED = 30
    NON_TERMINAL_STATUSES = (QUEUED, STARTING, RUNNING)
    TERMINAL_STATUSES = (SUCCESS, ERROR, CANCELLED)

    @classmethod
    def check_is_valid(cls, statuses: int | Sequence[int] | set[int]) -> None: ...

    @classmethod
    def is_terminal(cls, status: int) -> bool: ...

class JobRunInfo(TypedDict):
    account_id: int | None
    run_id: int

class DbtCloudJobRunException(AirflowException):
    """Exception raised when a dbt Cloud job run fails."""

class DbtCloudResourceLookupError(AirflowException):
    """Exception raised when a dbt Cloud resource cannot be found."""

class TokenAuth:
    """Helper class for Auth when executing requests."""
    def __init__(self, token: str): ...
    def __call__(self, request: PreparedRequest) -> PreparedRequest: ...
```
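A short illustration of the status enum against the raw integer codes returned by `DbtCloudHook.get_job_run_status` (a sketch; values come from the enum above):

```python
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudJobRunStatus

# Terminal vs. non-terminal status codes
assert DbtCloudJobRunStatus.is_terminal(DbtCloudJobRunStatus.SUCCESS.value)      # 10
assert not DbtCloudJobRunStatus.is_terminal(DbtCloudJobRunStatus.RUNNING.value)  # 3

# check_is_valid passes silently for known codes and raises for unknown ones
DbtCloudJobRunStatus.check_is_valid(10)
```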