# BigQuery Integration

Comprehensive BigQuery integration for Dagster, providing data warehousing capabilities: I/O managers for reading and writing BigQuery tables, ops for loading and querying data, and resources for BigQuery client management with full authentication support.

## Capabilities

### BigQuery Resource

Configurable resource for BigQuery client management with project specification and authentication.

```python { .api }
class BigQueryResource(ConfigurableResource):
    """Resource for BigQuery client management."""

    project: Optional[str]          # GCP project ID
    location: Optional[str]         # Default location for jobs/datasets/tables
    gcp_credentials: Optional[str]  # Base64-encoded service account credentials

    def get_client(self) -> Iterator[bigquery.Client]:
        """Context manager yielding authenticated BigQuery client."""
```

Legacy resource factory:

```python { .api }
@resource(
    config_schema=BigQueryResource.to_config_schema(),
    description="Dagster resource for connecting to BigQuery",
)
def bigquery_resource(context) -> Iterator[bigquery.Client]:
    """Legacy BigQuery resource factory that yields a BigQuery client."""
```
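
A minimal sketch of wiring the legacy factory into a job; the config keys passed to `configured` assume the schema mirrors the `BigQueryResource` fields above, and the query op comes from the `bq_op_for_queries` helper documented below:

```python
from dagster import job
from dagster_gcp import bigquery_resource, bq_op_for_queries

# Assumes the legacy config schema accepts the same fields as BigQueryResource.
configured_bq = bigquery_resource.configured(
    {"project": "my-gcp-project", "location": "US"}
)

# Op that runs a trivial query against the configured client.
run_queries = bq_op_for_queries(["SELECT 1"])

@job(resource_defs={"bigquery": configured_bq})
def legacy_bq_job():
    run_queries()
```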

### I/O Manager

Configurable I/O manager factory for BigQuery table storage and retrieval.

```python { .api }
class BigQueryIOManager(ConfigurableIOManagerFactory):
    """Base class for BigQuery I/O managers."""

    project: str                         # GCP project ID
    dataset: Optional[str]               # Default BigQuery dataset
    location: Optional[str]              # GCP location
    gcp_credentials: Optional[str]       # Base64-encoded credentials
    temporary_gcs_bucket: Optional[str]  # Temporary GCS bucket for large operations
    timeout: Optional[float]             # Query timeout for Pandas operations

    def type_handlers(self) -> Sequence[DbTypeHandler]:
        """Abstract method to define type handlers."""

    def default_load_type(self) -> Optional[type]:
        """Default type for loading data."""

    def create_io_manager(self, context) -> Generator:
        """Creates the actual I/O manager instance."""

def build_bigquery_io_manager(
    type_handlers: Sequence[DbTypeHandler],
    default_load_type: Optional[type] = None,
) -> IOManagerDefinition:
    """Factory function for creating BigQuery I/O manager definitions."""
```
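
As a sketch of the factory path: the snippet below assumes the Pandas type handler (`BigQueryPandasTypeHandler`) shipped in the companion `dagster_gcp_pandas` package, and configures the resulting definition with the fields listed above.

```python
import pandas as pd
from dagster_gcp import build_bigquery_io_manager
from dagster_gcp_pandas import BigQueryPandasTypeHandler  # assumed companion package

# Build an IOManagerDefinition that stores/loads Pandas DataFrames as BigQuery tables,
# then supply the project/dataset config it needs.
bigquery_pandas_io_manager = build_bigquery_io_manager(
    [BigQueryPandasTypeHandler()],
    default_load_type=pd.DataFrame,
).configured({"project": "my-gcp-project", "dataset": "analytics"})
```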

### Data Loading Operations

Operations for importing data from various sources into BigQuery.

```python { .api }
@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_load_config(),
)
def import_df_to_bq(context, df: DataFrame) -> None:
    """Import Pandas DataFrame to BigQuery table."""

@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_load_config(),
)
def import_file_to_bq(context, path: str) -> None:
    """Import local file to BigQuery table."""

@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_load_config(),
)
def import_gcs_paths_to_bq(context, paths: List[str]) -> None:
    """Import GCS files to BigQuery table."""
```

### Query Operations

Operations for executing SQL queries against BigQuery.

```python { .api }
def bq_op_for_queries(sql_queries: List[str]) -> OpDefinition:
    """
    Creates an op that executes BigQuery SQL queries.

    Parameters:
    - sql_queries: List of SQL queries to execute

    Returns:
        An op definition that, when executed, returns List[DataFrame]
        (one result per query)
    """
```
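
A minimal sketch of using the generated op inside a job (the project and table names are illustrative):

```python
from dagster import job
from dagster_gcp import BigQueryResource, bq_op_for_queries

# Create an op that runs two queries and returns their results as DataFrames.
daily_metrics = bq_op_for_queries(
    [
        "SELECT COUNT(*) AS n FROM `my-gcp-project.analytics.orders`",
        "SELECT MAX(updated_at) AS latest FROM `my-gcp-project.analytics.orders`",
    ]
)

@job(resource_defs={"bigquery": BigQueryResource(project="my-gcp-project")})
def metrics_job():
    daily_metrics()
```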

### Dataset Management Operations

Operations for BigQuery dataset lifecycle management.

```python { .api }
@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_create_dataset_config(),
)
def bq_create_dataset(context) -> None:
    """
    Create BigQuery dataset.

    Config:
    - dataset: str - Dataset identifier
    - exists_ok: bool - Whether to ignore "already exists" errors
    """

@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_delete_dataset_config(),
)
def bq_delete_dataset(context) -> None:
    """
    Delete BigQuery dataset.

    Config:
    - dataset: str - Dataset identifier
    - delete_contents: bool - Whether to delete tables in dataset
    - not_found_ok: bool - Whether to ignore "not found" errors
    """
```
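
A minimal sketch of running `bq_create_dataset` with the config keys listed above (the dataset name is illustrative):

```python
from dagster import job
from dagster_gcp import BigQueryResource, bq_create_dataset

@job(resource_defs={"bigquery": BigQueryResource(project="my-gcp-project")})
def create_dataset_job():
    bq_create_dataset()

# Config keys follow the docstring above; the dataset name is a placeholder.
result = create_dataset_job.execute_in_process(
    run_config={
        "ops": {
            "bq_create_dataset": {
                "config": {"dataset": "analytics", "exists_ok": True}
            }
        }
    }
)
```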

### Utility Functions

Helper functions for BigQuery operations.

```python { .api }
def fetch_last_updated_timestamps(
    client: bigquery.Client,
    dataset_id: str,
    table_ids: Sequence[str],
) -> Mapping[str, datetime]:
    """
    Get last updated timestamps for BigQuery tables.

    Parameters:
    - client: BigQuery client
    - dataset_id: Dataset ID
    - table_ids: List of table IDs

    Returns:
        Mapping of table ID to timestamp
    """
```
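
A sketch combining the helper with `BigQueryResource` (the dataset and table names are illustrative):

```python
from dagster import asset
from dagster_gcp import BigQueryResource, fetch_last_updated_timestamps

@asset
def freshness_report(bigquery: BigQueryResource) -> dict:
    # Dataset and table names are placeholders.
    with bigquery.get_client() as client:
        timestamps = fetch_last_updated_timestamps(
            client, "analytics", ["orders", "customers"]
        )
    return {table: ts.isoformat() for table, ts in timestamps.items()}
```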

### Types and Configuration

BigQuery-specific types, enums, and configuration schemas.

```python { .api }
class BigQueryError(Exception):
    """Exception class for BigQuery-related errors."""

class BigQueryLoadSource(Enum):
    """Enum for BigQuery load sources."""
    DataFrame = "DataFrame"
    GCS = "GCS"
    File = "File"

class BQCreateDisposition(Enum):
    """Table creation behavior."""
    CREATE_IF_NEEDED = "CREATE_IF_NEEDED"
    CREATE_NEVER = "CREATE_NEVER"

class BQWriteDisposition(Enum):
    """Write behavior for existing tables."""
    WRITE_TRUNCATE = "WRITE_TRUNCATE"
    WRITE_APPEND = "WRITE_APPEND"
    WRITE_EMPTY = "WRITE_EMPTY"

class BQSchemaUpdateOption(Enum):
    """Schema update options."""
    ALLOW_FIELD_ADDITION = "ALLOW_FIELD_ADDITION"
    ALLOW_FIELD_RELAXATION = "ALLOW_FIELD_RELAXATION"

class BQPriority(Enum):
    """Query priority levels."""
    BATCH = "BATCH"
    INTERACTIVE = "INTERACTIVE"

class BQEncoding(Enum):
    """File encoding options."""
    UTF_8 = "UTF-8"
    ISO_8859_1 = "ISO-8859-1"

class BQSourceFormat(Enum):
    """Source file format options."""
    CSV = "CSV"
    NEWLINE_DELIMITED_JSON = "NEWLINE_DELIMITED_JSON"
    AVRO = "AVRO"
    PARQUET = "PARQUET"
    DATASTORE_BACKUP = "DATASTORE_BACKUP"
```
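
These enum values are the string constants the BigQuery API expects. As a heavily hedged illustration only, the exact keys exposed by `define_bigquery_load_config` are not documented here, so treat the key names and import path below as assumptions:

```python
from dagster_gcp.bigquery.types import BQSourceFormat, BQWriteDisposition  # import path assumed

# Hypothetical run-config fragment for a load op; the key names are assumptions,
# but the string values come directly from the enums above.
load_op_config = {
    "destination": "analytics.orders_staging",
    "load_job_config": {
        "write_disposition": BQWriteDisposition.WRITE_APPEND.value,  # "WRITE_APPEND"
        "source_format": BQSourceFormat.PARQUET.value,               # "PARQUET"
    },
}
```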

### Configuration Scalars

Validation scalars for BigQuery identifiers.

```python { .api }
def Table(table_name: str) -> str:
    """Validates BigQuery table identifiers."""

def Dataset(dataset_name: str) -> str:
    """Validates BigQuery dataset identifiers."""
```

### Configuration Functions

Functions that define configuration schemas for BigQuery operations.

```python { .api }
def define_bigquery_query_config() -> ConfigSchema:
    """Configuration for query operations."""

def define_bigquery_load_config() -> ConfigSchema:
    """Configuration for load operations."""

def define_bigquery_create_dataset_config() -> ConfigSchema:
    """Configuration for dataset creation."""

def define_bigquery_delete_dataset_config() -> ConfigSchema:
    """Configuration for dataset deletion."""
```
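
These schemas can also be attached to custom ops so they accept the same run config as the built-in ops; a minimal sketch (the import path is an assumption):

```python
from dagster import op
from dagster_gcp.bigquery.configs import define_bigquery_query_config  # import path assumed

@op(required_resource_keys={"bigquery"}, config_schema=define_bigquery_query_config())
def my_custom_query_op(context):
    # The validated query config is available on the op context.
    context.log.info(f"Query config: {context.op_config}")
```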

## Usage Examples

### Basic Resource Usage

```python
from dagster import asset, Definitions
from dagster_gcp import BigQueryResource

@asset
def customer_data(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        query = """
            SELECT customer_id, order_count, total_spent
            FROM `project.analytics.customer_summary`
            WHERE last_order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
        """
        return client.query(query).to_dataframe()

defs = Definitions(
    assets=[customer_data],
    resources={
        "bigquery": BigQueryResource(
            project="my-gcp-project",
            location="US",
        )
    },
)
```
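
When application-default credentials are not available, the `gcp_credentials` field accepts base64-encoded service account credentials; a sketch reading them from an environment variable (the variable name is illustrative):

```python
from dagster import EnvVar
from dagster_gcp import BigQueryResource

bigquery = BigQueryResource(
    project="my-gcp-project",
    # Base64-encoded service account JSON; the env var name is a placeholder.
    gcp_credentials=EnvVar("GCP_CREDENTIALS_BASE64"),
)
```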

### I/O Manager Usage

```python
from datetime import datetime

import pandas as pd

from dagster import asset, Definitions
# BigQueryIOManager (above) is an abstract base class; use a concrete subclass such
# as the Pandas-backed BigQueryPandasIOManager from dagster_gcp_pandas.
from dagster_gcp_pandas import BigQueryPandasIOManager

@asset
def processed_orders() -> pd.DataFrame:
    # This DataFrame will be stored as a BigQuery table by the I/O manager.
    return pd.DataFrame({
        "order_id": [1, 2, 3],
        "amount": [100.0, 250.0, 75.0],
        "processed_at": [datetime.now()] * 3,
    })

defs = Definitions(
    assets=[processed_orders],
    resources={
        "io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",
            dataset="analytics",
        )
    },
)
```
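
Downstream assets can then load the stored table back through the same I/O manager by declaring the upstream asset as an input:

```python
import pandas as pd
from dagster import asset

@asset
def order_totals(processed_orders: pd.DataFrame) -> pd.DataFrame:
    # processed_orders is read back from BigQuery by the I/O manager.
    return processed_orders.groupby("order_id", as_index=False)["amount"].sum()
```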

### Data Loading Operations

```python
from dagster import job, op
from dagster_gcp import import_df_to_bq, BigQueryResource
import pandas as pd

@op
def create_sample_data():
    return pd.DataFrame({
        "id": [1, 2, 3],
        "value": ["a", "b", "c"],
    })

@job(
    resource_defs={
        "bigquery": BigQueryResource(project="my-project")
    }
)
def load_data_job():
    df = create_sample_data()
    import_df_to_bq(df)
```
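
`import_df_to_bq` takes its destination and load settings from run config (see `define_bigquery_load_config` above). The key names below are assumptions used for illustration; the destination value follows the table identifier format described earlier:

```python
# The "destination" key is an assumption about the load config schema.
result = load_data_job.execute_in_process(
    run_config={
        "ops": {
            "import_df_to_bq": {
                "config": {"destination": "my-project.analytics.sample_data"}
            }
        }
    }
)
```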