
# Dagster GCP

Google Cloud Platform integration components for the Dagster data orchestration framework. This package provides resources, I/O managers, operations, and utilities for building data pipelines that leverage GCP services, including BigQuery, Google Cloud Storage, Dataproc, and Pipes for external process communication.

## Package Information

- **Package Name**: dagster-gcp
- **Language**: Python
- **Installation**: `pip install dagster-gcp`

## Core Imports

```python
import dagster_gcp
```

Common specific imports:

```python
from dagster_gcp import (
    BigQueryResource, BigQueryIOManager,
    GCSResource, GCSPickleIOManager,
    DataprocResource, configurable_dataproc_op,
)
```


Pipes components (imported from submodule):

```python
from dagster_gcp.pipes import (
    PipesDataprocJobClient,
    PipesGCSContextInjector,
    PipesGCSMessageReader,
    PipesGCSLogReader,
)
```


## Basic Usage

```python
from dagster import asset, Definitions
from dagster_gcp import BigQueryResource, GCSResource, GCSPickleIOManager

@asset
def my_data_asset(bigquery: BigQueryResource):
    with bigquery.get_client() as client:
        query = "SELECT * FROM `project.dataset.table` LIMIT 100"
        df = client.query(query).to_dataframe()
    return df

defs = Definitions(
    assets=[my_data_asset],
    resources={
        "bigquery": BigQueryResource(project="my-gcp-project"),
        "gcs": GCSResource(project="my-gcp-project"),
        "io_manager": GCSPickleIOManager(
            gcs_bucket="my-data-bucket",
            gcs=GCSResource(project="my-gcp-project"),
        ),
    },
)
```


## Architecture

The dagster-gcp package follows Dagster's resource and I/O manager patterns while providing GCP-specific implementations:

- **Resources**: Configurable connections to GCP services (BigQuery, GCS, Dataproc)
- **I/O Managers**: Data storage and retrieval using GCP storage services
- **Operations**: Pre-built ops for common GCP tasks (data loading, job execution)
- **Pipes Integration**: External process communication through GCP services

Each integration supports both the modern ConfigurableResource pattern and legacy resource factory functions for backward compatibility.


## Capabilities


### BigQuery Integration

Data warehousing operations, including I/O managers for BigQuery tables, operations for data loading and querying, and resources for BigQuery client management with authentication support.

```python { .api }
class BigQueryResource(ConfigurableResource):
    project: Optional[str]
    location: Optional[str]
    gcp_credentials: Optional[str]

    def get_client(self) -> Iterator[bigquery.Client]: ...

class BigQueryIOManager(ConfigurableIOManagerFactory):
    project: str
    dataset: Optional[str]
    location: Optional[str]
    gcp_credentials: Optional[str]
    temporary_gcs_bucket: Optional[str]
    timeout: Optional[float]
```

[BigQuery Integration](./bigquery.md)

### Google Cloud Storage (GCS)

File storage and management, including I/O managers for pickled objects, file managers for direct GCS operations, compute log management, and sensor utilities for GCS-based data processing.

```python { .api }
class GCSResource(ConfigurableResource):
    project: Optional[str]

    def get_client(self) -> storage.Client: ...

class GCSPickleIOManager(ConfigurableIOManager):
    gcs: ResourceDependency[GCSResource]
    gcs_bucket: str
    gcs_prefix: str = "dagster"

    def load_input(self, context) -> Any: ...
    def handle_output(self, context, obj) -> None: ...
```

[Google Cloud Storage](./gcs.md)
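The `gcs_prefix` field namespaces everything the I/O manager writes into the bucket. A hedged sketch of how an asset key could map to an object path under that prefix (`asset_object_key` is hypothetical; the library's exact path layout may include additional components):

```python
# Hypothetical helper: join the configured prefix with the asset key parts
# to form a GCS object path. The real layout is an assumption here.
def asset_object_key(gcs_prefix: str, asset_key_parts: list[str]) -> str:
    return "/".join([gcs_prefix, *asset_key_parts])

key = asset_object_key("dagster", ["my_data_asset"])
```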

### Dataproc Integration

Apache Spark cluster management and job execution, including resources for Dataproc cluster lifecycle management, operations for submitting and monitoring Spark jobs, and comprehensive configuration support for cluster and job parameters.

```python { .api }
class DataprocResource(ConfigurableResource):
    project_id: str
    region: str
    cluster_name: str
    labels: Optional[dict[str, str]]
    cluster_config_yaml_path: Optional[str]
    cluster_config_json_path: Optional[str]
    cluster_config_dict: Optional[dict]

    def get_client(self) -> DataprocClient: ...

@op
def configurable_dataproc_op(
    dataproc: DataprocResource,
    config: DataprocOpConfig
) -> Any: ...
```

[Dataproc Integration](./dataproc.md)

### Pipes Integration

External process communication through GCP services, including clients for running workloads on Dataproc, context injectors for passing data via GCS, and message readers for collecting results and logs from external processes.

```python { .api }
class PipesDataprocJobClient(PipesClient):
    client: JobControllerClient
    context_injector: PipesContextInjector
    message_reader: PipesMessageReader
    forward_termination: bool = True
    poll_interval: float = 5.0

    def run(self, context, submit_job_params, extras) -> PipesClientCompletedInvocation: ...

class PipesGCSContextInjector(PipesContextInjector):
    bucket: str
    client: GCSClient
    key_prefix: Optional[str]
```

[Pipes Integration](./pipes.md)

## Types


### Core Types


```python { .api }
# File handle for GCS objects
class GCSFileHandle(FileHandle):
    @property
    def gcs_bucket(self) -> str: ...

    @property
    def gcs_key(self) -> str: ...

    @property
    def gcs_path(self) -> str: ...

    @property
    def path_desc(self) -> str: ...

# BigQuery error handling
class BigQueryError(Exception): ...

# Dataproc error handling
class DataprocError(Exception): ...
```
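A minimal sketch of how the path properties above relate, assuming `gcs_path` takes the usual `gs://<bucket>/<key>` form (`FileHandleSketch` is a stand-in, not the library class):

```python
class FileHandleSketch:
    """Illustrative stand-in for GCSFileHandle's path properties."""

    def __init__(self, gcs_bucket: str, gcs_key: str):
        self.gcs_bucket = gcs_bucket
        self.gcs_key = gcs_key

    @property
    def gcs_path(self) -> str:
        # Assumed form: the bucket and key joined into a gs:// URI.
        return f"gs://{self.gcs_bucket}/{self.gcs_key}"

handle = FileHandleSketch("my-data-bucket", "dagster/my_data_asset")
```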