# BigQuery Integration

Comprehensive BigQuery integration for Dagster, providing data warehousing capabilities, I/O managers for reading and writing BigQuery tables, operations for data loading and querying, and resources for BigQuery client management with full authentication support.

## Capabilities

### BigQuery Resource

Configurable resource for BigQuery client management with project specification and authentication.

```python { .api }
class BigQueryResource(ConfigurableResource):
    """Resource for BigQuery client management."""
    project: Optional[str]  # GCP project ID
    location: Optional[str]  # Default location for jobs/datasets/tables
    gcp_credentials: Optional[str]  # Base64-encoded service account credentials

    def get_client(self) -> Iterator[bigquery.Client]:
        """Context manager yielding authenticated BigQuery client."""
```

Legacy resource factory:

```python { .api }
@resource(
    config_schema=BigQueryResource.to_config_schema(),
    description="Dagster resource for connecting to BigQuery"
)
def bigquery_resource(context) -> Iterator[bigquery.Client]:
    """Legacy BigQuery resource factory that yields a BigQuery client."""
```
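
For example, a job can attach the legacy resource under the `bigquery` resource key. A minimal sketch, assuming default credentials; the project, table name, and query are illustrative:

```python
from dagster import job, op
from dagster_gcp import bigquery_resource

@op(required_resource_keys={"bigquery"})
def row_count(context) -> int:
    # context.resources.bigquery is the google.cloud.bigquery.Client yielded by the factory
    rows = context.resources.bigquery.query(
        "SELECT COUNT(*) AS n FROM `my-gcp-project.analytics.events`"
    ).result()
    return next(iter(rows))["n"]

@job(resource_defs={"bigquery": bigquery_resource.configured({"project": "my-gcp-project"})})
def legacy_resource_job():
    row_count()
```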

### I/O Manager

Configurable I/O manager factory for BigQuery table storage and retrieval.

```python { .api }
class BigQueryIOManager(ConfigurableIOManagerFactory):
    """Base class for BigQuery I/O managers."""
    project: str  # GCP project ID
    dataset: Optional[str]  # Default BigQuery dataset
    location: Optional[str]  # GCP location
    gcp_credentials: Optional[str]  # Base64-encoded credentials
    temporary_gcs_bucket: Optional[str]  # Temporary GCS bucket for large operations
    timeout: Optional[float]  # Query timeout for Pandas operations

    def type_handlers(self) -> Sequence[DbTypeHandler]:
        """Abstract method to define type handlers."""

    def default_load_type(self) -> Optional[type]:
        """Default type for loading data."""

    def create_io_manager(self, context) -> Generator:
        """Creates the actual I/O manager instance."""


def build_bigquery_io_manager(
    type_handlers: Sequence[DbTypeHandler],
    default_load_type: Optional[type] = None
) -> IOManagerDefinition:
    """Factory function for creating BigQuery I/O manager definitions."""
```
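
As a sketch of the factory path, `build_bigquery_io_manager` is given the type handlers that back the I/O manager. The `BigQueryPandasTypeHandler` import below assumes the `dagster_gcp_pandas` extra is installed (that package also ships a ready-made `BigQueryPandasIOManager` subclass):

```python
import pandas as pd

from dagster import Definitions, asset
from dagster_gcp import build_bigquery_io_manager
from dagster_gcp_pandas import BigQueryPandasTypeHandler  # assumes the Pandas extra is installed

# An IOManagerDefinition that stores/loads Pandas DataFrames as BigQuery tables.
bigquery_io_manager = build_bigquery_io_manager([BigQueryPandasTypeHandler()])

@asset
def daily_totals() -> pd.DataFrame:
    # Written to BigQuery by the I/O manager (placeholder data)
    return pd.DataFrame({"day": ["2024-01-01"], "total": [100.0]})

defs = Definitions(
    assets=[daily_totals],
    resources={
        "io_manager": bigquery_io_manager.configured(
            {"project": "my-gcp-project", "dataset": "analytics"}
        )
    },
)
```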

### Data Loading Operations

Operations for importing data from various sources into BigQuery.

```python { .api }
@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_load_config()
)
def import_df_to_bq(context, df: DataFrame) -> None:
    """Import Pandas DataFrame to BigQuery table."""


@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_load_config()
)
def import_file_to_bq(context, path: str) -> None:
    """Import local file to BigQuery table."""


@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_load_config()
)
def import_gcs_paths_to_bq(context, paths: List[str]) -> None:
    """Import GCS files to BigQuery table."""
```
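
For example, the GCS variant is wired into a job like the other load ops; the destination table and load options come from the op's config (the schema returned by `define_bigquery_load_config()`) at launch time. A minimal sketch with placeholder project and GCS paths:

```python
from typing import List

from dagster import job, op
from dagster_gcp import BigQueryResource, import_gcs_paths_to_bq

@op
def staged_paths() -> List[str]:
    # Paths produced by an upstream export step (placeholder values)
    return ["gs://my-bucket/exports/part-000.parquet"]

@job(resource_defs={"bigquery": BigQueryResource(project="my-gcp-project")})
def load_from_gcs_job():
    import_gcs_paths_to_bq(staged_paths())
```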

### Query Operations

Operations for executing SQL queries against BigQuery.

```python { .api }
def bq_op_for_queries(sql_queries: List[str]) -> OpDefinition:
    """
    Creates an op that executes BigQuery SQL queries.

    Parameters:
    - sql_queries: List of SQL queries to execute

    Returns:
    Op function that returns List[DataFrame]
    """
```
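
A minimal sketch of using the generated op in a job, assuming it consumes the same `bigquery` resource key as the other ops here; the queries and project are placeholders:

```python
from dagster import job
from dagster_gcp import BigQueryResource, bq_op_for_queries

# One op that runs both queries and returns a DataFrame per query.
daily_metrics = bq_op_for_queries([
    "SELECT CURRENT_DATE() AS day",
    "SELECT COUNT(*) AS n FROM `my-gcp-project.analytics.events`",
])

@job(resource_defs={"bigquery": BigQueryResource(project="my-gcp-project")})
def metrics_job():
    daily_metrics()
```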

### Dataset Management Operations

Operations for BigQuery dataset lifecycle management.

```python { .api }
@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_create_dataset_config()
)
def bq_create_dataset(context) -> None:
    """
    Create BigQuery dataset.

    Config:
    - dataset: str - Dataset identifier
    - exists_ok: bool - Whether to ignore "already exists" errors
    """


@op(
    required_resource_keys={"bigquery"},
    config_schema=define_bigquery_delete_dataset_config()
)
def bq_delete_dataset(context) -> None:
    """
    Delete BigQuery dataset.

    Config:
    - dataset: str - Dataset identifier
    - delete_contents: bool - Whether to delete tables in dataset
    - not_found_ok: bool - Whether to ignore "not found" errors
    """
```
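
A sketch of running the create op in a job, supplying the config keys documented above (project and dataset names are placeholders):

```python
from dagster import job
from dagster_gcp import BigQueryResource, bq_create_dataset

@job(resource_defs={"bigquery": BigQueryResource(project="my-gcp-project")})
def create_dataset_job():
    bq_create_dataset()

# Config keys mirror the schema above: the dataset identifier plus exists_ok.
create_dataset_job.execute_in_process(
    run_config={"ops": {"bq_create_dataset": {"config": {"dataset": "analytics", "exists_ok": True}}}}
)
```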

### Utility Functions

Helper functions for BigQuery operations.

```python { .api }
def fetch_last_updated_timestamps(
    client: bigquery.Client,
    dataset_id: str,
    table_ids: Sequence[str]
) -> Mapping[str, datetime]:
    """
    Get last updated timestamps for BigQuery tables.

    Parameters:
    - client: BigQuery client
    - dataset_id: Dataset ID
    - table_ids: List of table IDs

    Returns:
    Mapping of table ID to timestamp
    """
```
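
This pairs naturally with `BigQueryResource.get_client()`, for example to check table freshness; the dataset and table names below are placeholders:

```python
from dagster_gcp import BigQueryResource, fetch_last_updated_timestamps

bigquery = BigQueryResource(project="my-gcp-project")

with bigquery.get_client() as client:
    last_updated = fetch_last_updated_timestamps(
        client=client,
        dataset_id="analytics",
        table_ids=["customer_summary", "orders"],
    )
    for table_id, ts in last_updated.items():
        print(f"{table_id} last modified at {ts.isoformat()}")
```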

### Types and Configuration

BigQuery-specific types, enums, and configuration schemas.

```python { .api }
class BigQueryError(Exception):
    """Exception class for BigQuery-related errors."""


class BigQueryLoadSource(Enum):
    """Enum for BigQuery load sources."""
    DataFrame = "DataFrame"
    GCS = "GCS"
    File = "File"


class BQCreateDisposition(Enum):
    """Table creation behavior."""
    CREATE_IF_NEEDED = "CREATE_IF_NEEDED"
    CREATE_NEVER = "CREATE_NEVER"


class BQWriteDisposition(Enum):
    """Write behavior for existing tables."""
    WRITE_TRUNCATE = "WRITE_TRUNCATE"
    WRITE_APPEND = "WRITE_APPEND"
    WRITE_EMPTY = "WRITE_EMPTY"


class BQSchemaUpdateOption(Enum):
    """Schema update options."""
    ALLOW_FIELD_ADDITION = "ALLOW_FIELD_ADDITION"
    ALLOW_FIELD_RELAXATION = "ALLOW_FIELD_RELAXATION"


class BQPriority(Enum):
    """Query priority levels."""
    BATCH = "BATCH"
    INTERACTIVE = "INTERACTIVE"


class BQEncoding(Enum):
    """File encoding options."""
    UTF_8 = "UTF-8"
    ISO_8859_1 = "ISO-8859-1"


class BQSourceFormat(Enum):
    """Source file format options."""
    CSV = "CSV"
    NEWLINE_DELIMITED_JSON = "NEWLINE_DELIMITED_JSON"
    AVRO = "AVRO"
    PARQUET = "PARQUET"
    DATASTORE_BACKUP = "DATASTORE_BACKUP"
```
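
These enums mirror the string values accepted by the BigQuery API, so their `.value` can be used wherever a load or query config expects the corresponding string. The import path below is an assumption; check where your `dagster_gcp` version exposes the types:

```python
from dagster_gcp.bigquery.types import BQSourceFormat, BQWriteDisposition  # assumed import path

# Enum members carry the exact strings the BigQuery load/query job configs expect.
assert BQWriteDisposition.WRITE_APPEND.value == "WRITE_APPEND"
assert BQSourceFormat.PARQUET.value == "PARQUET"
```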

### Configuration Scalars

Validation scalars for BigQuery identifiers.

```python { .api }
def Table(table_name: str) -> str:
    """Validates BigQuery table identifiers."""


def Dataset(dataset_name: str) -> str:
    """Validates BigQuery dataset identifiers."""
```

### Configuration Functions

Functions that define configuration schemas for BigQuery operations.

```python { .api }
def define_bigquery_query_config() -> ConfigSchema:
    """Configuration for query operations."""


def define_bigquery_load_config() -> ConfigSchema:
    """Configuration for load operations."""


def define_bigquery_create_dataset_config() -> ConfigSchema:
    """Configuration for dataset creation."""


def define_bigquery_delete_dataset_config() -> ConfigSchema:
    """Configuration for dataset deletion."""
```
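
These schemas can also be attached to custom ops, mirroring the built-in ops above. A sketch, assuming the config functions are importable from `dagster_gcp.bigquery.configs` (import path not confirmed by this doc) and with an illustrative op body:

```python
from dagster import op
from dagster_gcp.bigquery.configs import define_bigquery_query_config  # assumed import path

@op(required_resource_keys={"bigquery"}, config_schema=define_bigquery_query_config())
def run_configured_query(context):
    # The resolved op config carries the query job options defined by the schema.
    context.log.info(f"Query config: {context.op_config}")
```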

240

241

## Usage Examples

242

243

### Basic Resource Usage

244

245

```python

246

from dagster import asset, Definitions

247

from dagster_gcp import BigQueryResource

248

249

@asset

250

def customer_data(bigquery: BigQueryResource):

251

with bigquery.get_client() as client:

252

query = """

253

SELECT customer_id, order_count, total_spent

254

FROM `project.analytics.customer_summary`

255

WHERE last_order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)

256

"""

257

return client.query(query).to_dataframe()

258

259

defs = Definitions(

260

assets=[customer_data],

261

resources={

262

"bigquery": BigQueryResource(

263

project="my-gcp-project",

264

location="US"

265

)

266

}

267

)

268

```

### I/O Manager Usage

```python
from datetime import datetime

import pandas as pd

from dagster import asset, Definitions
# BigQueryIOManager is an abstract base; use a concrete subclass such as
# BigQueryPandasIOManager from the dagster_gcp_pandas package.
from dagster_gcp_pandas import BigQueryPandasIOManager

@asset
def processed_orders() -> pd.DataFrame:
    # This will be stored in BigQuery
    return pd.DataFrame({
        'order_id': [1, 2, 3],
        'amount': [100.0, 250.0, 75.0],
        'processed_at': [datetime.now()] * 3
    })

defs = Definitions(
    assets=[processed_orders],
    resources={
        "io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",
            dataset="analytics"
        )
    }
)
```

### Data Loading Operations

```python
import pandas as pd

from dagster import job, op
from dagster_gcp import BigQueryResource, import_df_to_bq

@op
def create_sample_data() -> pd.DataFrame:
    return pd.DataFrame({
        'id': [1, 2, 3],
        'value': ['a', 'b', 'c']
    })

@job(
    resource_defs={
        "bigquery": BigQueryResource(project="my-project")
    }
)
def load_data_job():
    df = create_sample_data()
    # Destination table and load options are supplied at launch via the op's config
    # (see define_bigquery_load_config above).
    import_df_to_bq(df)
```