# Redshift Data Warehouse

Amazon Redshift integration for data warehouse operations including cluster management, SQL execution, and data loading. Supports both traditional Redshift connections and the modern Redshift Data API for serverless SQL execution.

## Capabilities

### Redshift SQL Hook

Hook for executing SQL operations against Redshift clusters using traditional database connections.

```python { .api }
class RedshiftSqlHook(AwsBaseHook):
    def __init__(self, redshift_conn_id: str = 'redshift_default', **kwargs):
        """
        Initialize Redshift SQL Hook.

        Parameters:
        - redshift_conn_id: Redshift connection ID
        """

    def run(self, sql: str, autocommit: bool = False, parameters: dict = None, handler: callable = None) -> Any:
        """
        Execute SQL statement.

        Parameters:
        - sql: SQL statement to execute
        - autocommit: Enable autocommit mode
        - parameters: Query parameters
        - handler: Result handler function

        Returns:
        Query results
        """

    def get_records(self, sql: str, parameters: dict = None) -> list:
        """
        Execute SQL and return records.

        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters

        Returns:
        List of result records
        """

    def get_first(self, sql: str, parameters: dict = None) -> Any:
        """
        Execute SQL and return first result.

        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters

        Returns:
        First result record
        """

    def get_pandas_df(self, sql: str, parameters: dict = None, **kwargs) -> Any:
        """
        Execute SQL and return pandas DataFrame.

        Parameters:
        - sql: SQL query to execute
        - parameters: Query parameters

        Returns:
        pandas DataFrame with results
        """

    def test_connection(self) -> tuple:
        """
        Test Redshift connection.

        Returns:
        Connection test result tuple (success, message)
        """
```
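
The hook can also be called directly inside a Python task when the SQL operators are too coarse. The sketch below is illustrative only: the import path (`airflow.providers.amazon.aws.hooks.redshift_sql`), the `redshift_reporting` connection ID, and the pyformat parameter style are assumptions; the method calls follow the spec above.

```python
# Minimal sketch: querying Redshift from a TaskFlow task.
# Assumptions: the import path below, the 'redshift_reporting' connection ID,
# and pyformat (%(name)s) query parameters.
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSqlHook


@task
def top_regions_by_revenue(limit: int = 5) -> list:
    hook = RedshiftSqlHook(redshift_conn_id='redshift_reporting')

    # Fail fast if the connection is misconfigured
    ok, message = hook.test_connection()
    if not ok:
        raise RuntimeError(f"Redshift connection failed: {message}")

    # Fetch a small result set as a list of rows
    rows = hook.get_records(
        sql="""
            SELECT region, SUM(quantity * price) AS revenue
            FROM fact_sales
            GROUP BY region
            ORDER BY revenue DESC
            LIMIT %(limit)s;
        """,
        parameters={'limit': limit},
    )
    return [list(row) for row in rows]
```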

### Redshift Data Hook

Hook for serverless SQL execution using Redshift Data API.

```python { .api }
class RedshiftDataHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Redshift Data Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        """

    def execute_query(self, database: str, sql: str, cluster_identifier: str = None, db_user: str = None, parameters: list = None, secret_arn: str = None, statement_name: str = None, with_event: bool = False, wait_for_completion: bool = True, poll_interval: int = 10) -> str:
        """
        Execute SQL using Redshift Data API.

        Parameters:
        - database: Database name
        - sql: SQL statement to execute
        - cluster_identifier: Redshift cluster identifier
        - db_user: Database user name
        - parameters: SQL parameters
        - secret_arn: AWS Secrets Manager ARN for credentials
        - statement_name: Statement name for identification
        - with_event: Enable event notifications
        - wait_for_completion: Wait for query completion
        - poll_interval: Polling interval in seconds

        Returns:
        Statement ID
        """

    def describe_statement(self, id: str) -> dict:
        """
        Get statement execution details.

        Parameters:
        - id: Statement ID

        Returns:
        Statement details
        """

    def get_statement_result(self, id: str, next_token: str = None) -> dict:
        """
        Get statement execution results.

        Parameters:
        - id: Statement ID
        - next_token: Pagination token

        Returns:
        Query results
        """

    def cancel_statement(self, id: str) -> bool:
        """
        Cancel running statement.

        Parameters:
        - id: Statement ID

        Returns:
        Cancellation success status
        """

    def list_statements(self, status: str = None, statement_name: str = None, max_items: int = 100, next_token: str = None) -> dict:
        """
        List executed statements.

        Parameters:
        - status: Filter by statement status
        - statement_name: Filter by statement name
        - max_items: Maximum items to return
        - next_token: Pagination token

        Returns:
        List of statements
        """
```
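
Because the Data API is asynchronous, a typical pattern is to submit a statement, wait for completion, and then read the results. This is a sketch against the spec above; the import path, cluster identifier, database, and user names are illustrative.

```python
# Sketch: synchronous-style use of the Data API (import path and identifiers are illustrative).
from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook

hook = RedshiftDataHook(aws_conn_id='aws_default')

# Submit a statement and block until it finishes
statement_id = hook.execute_query(
    database='analytics',
    cluster_identifier='analytics-cluster',
    db_user='analytics_ro',
    sql='SELECT region, COUNT(*) AS n FROM fact_sales GROUP BY region;',
    statement_name='region_counts',
    wait_for_completion=True,
    poll_interval=5,
)

# Inspect execution metadata, then read the result set
details = hook.describe_statement(id=statement_id)
result = hook.get_statement_result(id=statement_id)
for record in result.get('Records', []):
    print(record)
```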

### Redshift Cluster Hook

Hook for Redshift cluster lifecycle management.

```python { .api }
class RedshiftHook(AwsBaseHook):
    def __init__(self, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Initialize Redshift Cluster Hook.

        Parameters:
        - aws_conn_id: AWS connection ID
        """

    def create_cluster(self, cluster_identifier: str, node_type: str, master_username: str, master_user_password: str, **kwargs) -> dict:
        """
        Create Redshift cluster.

        Parameters:
        - cluster_identifier: Unique cluster identifier
        - node_type: Node type (e.g., 'dc2.large')
        - master_username: Master username
        - master_user_password: Master password

        Returns:
        Cluster configuration
        """

    def delete_cluster(self, cluster_identifier: str, skip_final_cluster_snapshot: bool = False, final_cluster_snapshot_identifier: str = None) -> dict:
        """
        Delete Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - skip_final_cluster_snapshot: Skip final snapshot
        - final_cluster_snapshot_identifier: Final snapshot identifier

        Returns:
        Deletion response
        """

    def describe_clusters(self, cluster_identifier: str = None) -> dict:
        """
        Describe Redshift clusters.

        Parameters:
        - cluster_identifier: Specific cluster identifier

        Returns:
        Cluster descriptions
        """

    def pause_cluster(self, cluster_identifier: str) -> dict:
        """
        Pause Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier

        Returns:
        Pause response
        """

    def resume_cluster(self, cluster_identifier: str) -> dict:
        """
        Resume paused Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier

        Returns:
        Resume response
        """

    def get_cluster_status(self, cluster_identifier: str) -> str:
        """
        Get cluster status.

        Parameters:
        - cluster_identifier: Cluster identifier

        Returns:
        Current cluster status
        """
```
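
A common cost-control pattern is to check the cluster status before pausing or resuming it. The snippet below is a sketch against the spec above; the import path and cluster identifier are illustrative.

```python
# Sketch: pause an idle cluster outside business hours (identifier is illustrative).
from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook

hook = RedshiftHook(aws_conn_id='aws_default')
cluster_id = 'analytics-cluster'

status = hook.get_cluster_status(cluster_identifier=cluster_id)
if status == 'available':
    # Only an available cluster can be paused
    hook.pause_cluster(cluster_identifier=cluster_id)
elif status == 'paused':
    print('Cluster is already paused')
else:
    print(f'Cluster is in a transitional state: {status}')
```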

### Redshift Operators

Task implementations for Redshift operations.

```python { .api }
class RedshiftSqlOperator(BaseOperator):
    def __init__(self, sql: str, redshift_conn_id: str = 'redshift_default', parameters: dict = None, autocommit: bool = True, **kwargs):
        """
        Execute SQL on Redshift cluster.

        Parameters:
        - sql: SQL statement to execute
        - redshift_conn_id: Redshift connection ID
        - parameters: SQL parameters
        - autocommit: Enable autocommit mode
        """

class RedshiftDataOperator(BaseOperator):
    def __init__(self, database: str, sql: str, cluster_identifier: str = None, db_user: str = None, secret_arn: str = None, statement_name: str = None, parameters: list = None, poll_interval: int = 10, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Execute SQL using Redshift Data API.

        Parameters:
        - database: Database name
        - sql: SQL statement to execute
        - cluster_identifier: Redshift cluster identifier
        - db_user: Database user name
        - secret_arn: AWS Secrets Manager ARN for credentials
        - statement_name: Statement name
        - parameters: SQL parameters
        - poll_interval: Polling interval in seconds
        - aws_conn_id: AWS connection ID
        """

class RedshiftCreateClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, node_type: str, master_username: str, master_user_password: str, publicly_accessible: bool = True, port: int = 5439, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Create Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - node_type: Node type
        - master_username: Master username
        - master_user_password: Master password
        - publicly_accessible: Public accessibility
        - port: Database port
        - aws_conn_id: AWS connection ID
        """

class RedshiftDeleteClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, skip_final_cluster_snapshot: bool = False, final_cluster_snapshot_identifier: str = None, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Delete Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - skip_final_cluster_snapshot: Skip final snapshot
        - final_cluster_snapshot_identifier: Final snapshot identifier
        - aws_conn_id: AWS connection ID
        """

class RedshiftPauseClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Pause Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - aws_conn_id: AWS connection ID
        """

class RedshiftResumeClusterOperator(BaseOperator):
    def __init__(self, cluster_identifier: str, aws_conn_id: str = 'aws_default', **kwargs):
        """
        Resume Redshift cluster.

        Parameters:
        - cluster_identifier: Cluster identifier
        - aws_conn_id: AWS connection ID
        """
```

### Redshift Sensors

Monitoring tasks for Redshift cluster states and query execution.

```python { .api }
class RedshiftClusterSensor(BaseSensorOperator):
    def __init__(self, cluster_identifier: str, target_status: str = 'available', aws_conn_id: str = 'aws_default', **kwargs):
        """
        Wait for Redshift cluster to reach target status.

        Parameters:
        - cluster_identifier: Cluster identifier
        - target_status: Target cluster status
        - aws_conn_id: AWS connection ID
        """
```
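
Besides waiting for `available` (as in the lifecycle example further below), the same sensor can confirm that a pause has taken effect before a DAG run ends. The task ID and cluster identifier here are illustrative.

```python
# Sketch: verify that a pause request has completed (values are illustrative).
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor

wait_for_paused = RedshiftClusterSensor(
    task_id='wait_for_paused',
    cluster_identifier='analytics-cluster',
    target_status='paused',
    poke_interval=60,   # check once per minute
    timeout=1800,       # give up after 30 minutes
    dag=dag
)
```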

## Usage Examples

### Data Warehouse Operations

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSqlOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

dag = DAG('redshift_analytics', start_date=datetime(2023, 1, 1))

# Create staging tables
create_staging = RedshiftSqlOperator(
    task_id='create_staging_tables',
    redshift_conn_id='redshift_prod',
    sql="""
        CREATE TABLE IF NOT EXISTS staging.sales_data (
            transaction_id VARCHAR(50),
            customer_id INTEGER,
            product_id INTEGER,
            quantity INTEGER,
            price DECIMAL(10,2),
            transaction_date DATE,
            region VARCHAR(50)
        );

        TRUNCATE TABLE staging.sales_data;
    """,
    dag=dag
)

# Load data from S3
load_data = S3ToRedshiftOperator(
    task_id='load_from_s3',
    schema='staging',
    table='sales_data',
    s3_bucket='data-warehouse-staging',
    s3_key='sales/{{ ds }}/',
    redshift_conn_id='redshift_prod',
    copy_options=[
        "CSV",
        "IGNOREHEADER 1",
        "DATEFORMAT 'YYYY-MM-DD'",
        "TRUNCATECOLUMNS"
    ],
    dag=dag
)

# Transform and load to production tables
transform_data = RedshiftSqlOperator(
    task_id='transform_and_load',
    redshift_conn_id='redshift_prod',
    sql="""
        -- Update dimension tables
        INSERT INTO dim_customers (customer_id, region)
        SELECT DISTINCT customer_id, region
        FROM staging.sales_data s
        WHERE NOT EXISTS (
            SELECT 1 FROM dim_customers d
            WHERE d.customer_id = s.customer_id
        );

        -- Insert fact data
        INSERT INTO fact_sales (
            transaction_id, customer_id, product_id,
            quantity, price, transaction_date
        )
        SELECT
            transaction_id, customer_id, product_id,
            quantity, price, transaction_date
        FROM staging.sales_data;

        -- Update statistics
        ANALYZE fact_sales;
        ANALYZE dim_customers;
    """,
    dag=dag
)

create_staging >> load_data >> transform_data
```

### Redshift Data API Usage

```python
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator

# Execute query using Data API
analyze_sales = RedshiftDataOperator(
    task_id='analyze_sales_data',
    database='analytics',
    cluster_identifier='analytics-cluster',
    sql="""
        SELECT
            region,
            DATE_TRUNC('month', transaction_date) as month,
            SUM(quantity * price) as revenue,
            COUNT(*) as transaction_count
        FROM fact_sales
        WHERE transaction_date >= CURRENT_DATE - INTERVAL '3 months'
        GROUP BY region, month
        ORDER BY region, month;
    """,
    statement_name='monthly_sales_analysis',
    aws_conn_id='aws_default',
    dag=dag
)
```

### Cluster Lifecycle Management

```python
from airflow.providers.amazon.aws.operators.redshift_cluster import (
    RedshiftCreateClusterOperator,
    RedshiftPauseClusterOperator,
    RedshiftResumeClusterOperator
)
from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor

# Resume cluster for processing
resume_cluster = RedshiftResumeClusterOperator(
    task_id='resume_cluster',
    cluster_identifier='analytics-cluster',
    dag=dag
)

# Wait for cluster to be available
wait_for_cluster = RedshiftClusterSensor(
    task_id='wait_for_available',
    cluster_identifier='analytics-cluster',
    target_status='available',
    timeout=1800,  # 30 minutes
    dag=dag
)

# Run analytics workload
run_analytics = RedshiftSqlOperator(
    task_id='run_analytics',
    sql='CALL analytics.run_monthly_reports();',
    redshift_conn_id='redshift_prod',
    dag=dag
)

# Pause cluster to save costs
pause_cluster = RedshiftPauseClusterOperator(
    task_id='pause_cluster',
    cluster_identifier='analytics-cluster',
    dag=dag
)

resume_cluster >> wait_for_cluster >> run_analytics >> pause_cluster
```

## Types

```python { .api }
# Redshift cluster states
class RedshiftClusterState:
    AVAILABLE = 'available'
    CREATING = 'creating'
    DELETING = 'deleting'
    FINAL_SNAPSHOT = 'final-snapshot'
    HARDWARE_FAILURE = 'hardware-failure'
    INCOMPATIBLE_HSM = 'incompatible-hsm'
    INCOMPATIBLE_NETWORK = 'incompatible-network'
    INCOMPATIBLE_PARAMETERS = 'incompatible-parameters'
    INCOMPATIBLE_RESTORE = 'incompatible-restore'
    MODIFYING = 'modifying'
    PAUSED = 'paused'
    REBOOTING = 'rebooting'
    RENAMING = 'renaming'
    RESIZING = 'resizing'
    ROTATING_KEYS = 'rotating-keys'
    STORAGE_FULL = 'storage-full'
    UPDATING_HSM = 'updating-hsm'

# Redshift node types
class RedshiftNodeType:
    DC2_LARGE = 'dc2.large'
    DC2_8XLARGE = 'dc2.8xlarge'
    DS2_XLARGE = 'ds2.xlarge'
    DS2_8XLARGE = 'ds2.8xlarge'
    RA3_XLPLUS = 'ra3.xlplus'
    RA3_4XLARGE = 'ra3.4xlarge'
    RA3_16XLARGE = 'ra3.16xlarge'

# Statement status for Data API
class StatementStatus:
    SUBMITTED = 'SUBMITTED'
    PICKED = 'PICKED'
    STARTED = 'STARTED'
    FINISHED = 'FINISHED'
    ABORTED = 'ABORTED'
    FAILED = 'FAILED'

# Cluster configuration
class RedshiftClusterConfig:
    cluster_identifier: str
    node_type: str
    master_username: str
    db_name: str = None
    port: int = 5439
    cluster_type: str = 'multi-node'
    number_of_nodes: int = 1
    publicly_accessible: bool = True
    encrypted: bool = False
    hsm_client_certificate_identifier: str = None
    hsm_configuration_identifier: str = None
    elastic_ip: str = None
    cluster_parameter_group_name: str = None
    cluster_subnet_group_name: str = None
    availability_zone: str = None
    preferred_maintenance_window: str = None
    cluster_version: str = None
    allow_version_upgrade: bool = True
    automated_snapshot_retention_period: int = 1
    manual_snapshot_retention_period: int = None
    snapshot_identifier: str = None
    snapshot_cluster_identifier: str = None
    owner_account: str = None
    additional_info: str = None
    kms_key_id: str = None
    enhanced_vpc_routing: bool = False
    iam_roles: list = None
    maintenance_track_name: str = None
    snapshot_schedule_identifier: str = None
    aqua_configuration_status: str = None
    default_iam_role_arn: str = None
    tags: list = None
```
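
The Data API statuses above are useful when polling a statement manually, for example after calling `execute_query` with `wait_for_completion=False`. The helper below is an illustrative sketch, not part of the provider; it assumes the `describe_statement` response exposes a `Status` field.

```python
# Sketch: manual polling with StatementStatus (helper and the 'Status' key are assumptions).
import time


def wait_for_statement(hook, statement_id: str, poll_interval: int = 10) -> dict:
    """Poll describe_statement until the statement reaches a terminal status."""
    terminal = {StatementStatus.FINISHED, StatementStatus.ABORTED, StatementStatus.FAILED}
    while True:
        details = hook.describe_statement(id=statement_id)
        if details.get('Status') in terminal:
            return details
        time.sleep(poll_interval)
```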