# Amazon Athena Analytics

Amazon Athena provides serverless SQL query capabilities for data stored in Amazon S3, enabling interactive analytics and data processing through standard SQL syntax without managing infrastructure.

## Capabilities

### SQL Query Execution

Execute Trino/Presto SQL queries against data in S3 with comprehensive result management and monitoring.

```python { .api }
class AthenaOperator(AwsBaseOperator):
    """
    Submit a Trino/Presto query to Amazon Athena.

    Parameters:
    - query: str - Trino/Presto query to be run on Amazon Athena
    - database: str - database to select
    - catalog: str - catalog to select
    - output_location: str - S3 path to write query results
    - client_request_token: str - unique token to avoid duplicate executions
    - workgroup: str - Athena workgroup for query execution (default: 'primary')
    - query_execution_context: dict - context for query execution
    - result_configuration: dict - configuration for results storage and encryption
    - sleep_time: int - time in seconds between status checks (default: 30)
    - max_polling_attempts: int - number of polling attempts before timeout
    - log_query: bool - whether to log query and execution parameters (default: True)
    - deferrable: bool - run operator in deferrable mode
    - poll_interval: int - polling interval for deferrable mode
    - aws_conn_id: str - Airflow connection for AWS credentials

    Returns:
        str: Query execution ID
    """
    def __init__(
        self,
        *,
        query: str,
        database: str,
        catalog: str = None,
        output_location: str = None,
        client_request_token: str = None,
        workgroup: str = "primary",
        query_execution_context: dict[str, str] = None,
        result_configuration: dict = None,
        sleep_time: int = 30,
        max_polling_attempts: int = None,
        log_query: bool = True,
        deferrable: bool = False,
        poll_interval: int = 30,
        **kwargs
    ): ...
```

### Query Status Monitoring

Monitor Athena query execution status with configurable polling and timeout settings.

```python { .api }
class AthenaSensor(BaseSensorOperator):
    """
    Wait for an Amazon Athena query to complete.

    Parameters:
    - query_execution_id: str - Athena query execution ID to monitor
    - max_retries: int - maximum number of status check retries
    - aws_conn_id: str - Airflow connection for AWS credentials
    - sleep_time: int - time between status checks
    - poke_interval: int - sensor poke interval
    - timeout: int - maximum time to wait for completion

    Returns:
        bool: True when query completes successfully
    """
    def __init__(
        self,
        query_execution_id: str,
        max_retries: int = None,
        aws_conn_id: str = 'aws_default',
        sleep_time: int = 30,
        **kwargs
    ): ...
```
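
The sensor is useful when a query execution ID is produced elsewhere (for example by an upstream task via XCom) and a separate task should wait on it. A minimal sketch based on the interface above; the upstream `task_id`, retry count, and connection ID are illustrative.

```python
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

# Wait for a query whose execution ID was pushed to XCom by an upstream task.
wait_for_query = AthenaSensor(
    task_id='wait_for_sales_analysis',
    query_execution_id="{{ task_instance.xcom_pull(task_ids='run_sales_analysis') }}",
    max_retries=120,     # stop checking after 120 attempts
    sleep_time=30,       # seconds between status checks
    aws_conn_id='aws_default',
)
```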

### Athena Service Hook

Low-level Athena service operations for query management and result retrieval.

```python { .api }
class AthenaHook(AwsBaseHook):
    """
    Hook for Amazon Athena service operations.

    Parameters:
    - aws_conn_id: str - Airflow connection for AWS credentials
    - region_name: str - AWS region name
    - verify: bool - whether to verify SSL certificates
    - botocore_config: dict - botocore client configuration
    """
    def __init__(
        self,
        aws_conn_id: str = 'aws_default',
        region_name: str = None,
        verify: bool = None,
        botocore_config: dict = None,
        **kwargs
    ): ...

    def run_query(
        self,
        query: str,
        query_context: dict,
        result_configuration: dict,
        client_request_token: str = None,
        workgroup: str = 'primary'
    ) -> str:
        """
        Run a query on Amazon Athena.

        Parameters:
        - query: str - SQL query to execute
        - query_context: dict - query execution context
        - result_configuration: dict - result storage configuration
        - client_request_token: str - unique request token
        - workgroup: str - Athena workgroup name

        Returns:
            str: Query execution ID
        """
        ...

    def check_query_status(self, query_execution_id: str) -> str:
        """
        Check the status of a submitted query.

        Parameters:
        - query_execution_id: str - query execution ID

        Returns:
            str: Query status ('QUEUED', 'RUNNING', 'SUCCEEDED', 'FAILED', 'CANCELLED')
        """
        ...

    def get_query_results(
        self,
        query_execution_id: str,
        next_token_id: str = None,
        max_results: int = 1000
    ) -> dict:
        """
        Get query results from Amazon Athena.

        Parameters:
        - query_execution_id: str - query execution ID
        - next_token_id: str - pagination token
        - max_results: int - maximum number of results to return

        Returns:
            dict: Query results with metadata
        """
        ...

    def get_query_results_paginator(
        self,
        query_execution_id: str,
        max_items: int = None,
        page_size: int = None
    ):
        """
        Get paginated query results.

        Parameters:
        - query_execution_id: str - query execution ID
        - max_items: int - maximum items to return
        - page_size: int - page size for pagination

        Returns:
            Iterator of result pages
        """
        ...

    def stop_query(self, query_execution_id: str) -> dict:
        """
        Stop/cancel a running query.

        Parameters:
        - query_execution_id: str - query execution ID to cancel

        Returns:
            dict: Cancellation response
        """
        ...

    def get_output_location(self, query_execution_id: str) -> str:
        """
        Get the S3 output location for query results.

        Parameters:
        - query_execution_id: str - query execution ID

        Returns:
            str: S3 URI of query results
        """
        ...
```
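
Where the operator and sensor abstractions are too coarse, the hook can drive the query lifecycle directly, for example from a `PythonOperator` callable. A minimal sketch using only the methods documented above; the database, bucket, workgroup, and polling cadence are illustrative.

```python
import time

from airflow.providers.amazon.aws.hooks.athena import AthenaHook


def run_and_wait(sql: str) -> str:
    """Submit a query via AthenaHook and block until it finishes."""
    hook = AthenaHook(aws_conn_id='aws_default')

    # Submit the query; run_query returns the execution ID immediately.
    query_execution_id = hook.run_query(
        query=sql,
        query_context={'Database': 'analytics_db'},
        result_configuration={'OutputLocation': 's3://my-results-bucket/athena-results/'},
        workgroup='primary',
    )

    # Poll until the query leaves the QUEUED/RUNNING states.
    status = hook.check_query_status(query_execution_id)
    while status in ('QUEUED', 'RUNNING'):
        time.sleep(10)
        status = hook.check_query_status(query_execution_id)

    if status != 'SUCCEEDED':
        hook.stop_query(query_execution_id)  # best-effort cleanup
        raise RuntimeError(f"Athena query {query_execution_id} finished with status {status}")

    # Results are written to S3; the exact location is available from the hook.
    print("Results at:", hook.get_output_location(query_execution_id))
    return query_execution_id
```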

### Data Catalog Integration

Query and manage AWS Glue Data Catalog resources through the Athena SQL interface.

```python { .api }
class AthenaCreateDataCatalogOperator(AwsBaseOperator):
    """
    Create a data catalog in Amazon Athena.

    Parameters:
    - catalog_name: str - name of the data catalog
    - catalog_type: str - type of catalog ('HIVE' or 'GLUE')
    - description: str - description of the catalog
    - parameters: dict - catalog configuration parameters
    - tags: dict - tags to apply to the catalog
    - aws_conn_id: str - Airflow connection for AWS credentials

    Returns:
        str: Data catalog ARN
    """
    def __init__(
        self,
        catalog_name: str,
        catalog_type: str,
        description: str = None,
        parameters: dict = None,
        tags: dict = None,
        **kwargs
    ): ...
```
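
A minimal sketch of registering a Glue-backed catalog with the interface documented above; the catalog name, owning account ID, parameter keys, and tags are illustrative.

```python
# Register a Glue Data Catalog owned by another account so it can be queried from Athena.
create_catalog = AthenaCreateDataCatalogOperator(
    task_id='register_shared_catalog',
    catalog_name='shared_analytics_catalog',
    catalog_type='GLUE',
    description='Glue catalog shared by the central analytics account',
    parameters={'catalog-id': '123456789012'},  # account that owns the Glue catalog
    tags={'team': 'analytics'},
    aws_conn_id='aws_default',
)
```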

## Usage Examples

### Basic Query Execution

```python
from airflow.providers.amazon.aws.operators.athena import AthenaOperator

# Execute a simple analytics query
analytics_query = AthenaOperator(
    task_id='run_sales_analysis',
    query="""
        SELECT
            region,
            SUM(sales_amount) as total_sales,
            COUNT(*) as transaction_count
        FROM sales_data
        WHERE date_column >= date('2023-01-01')
        GROUP BY region
        ORDER BY total_sales DESC
    """,
    database='analytics_db',
    catalog='AwsDataCatalog',
    output_location='s3://my-results-bucket/athena-results/',
    workgroup='analytics-workgroup',
    sleep_time=10,
    max_polling_attempts=100,
    aws_conn_id='aws_default'
)
```

### Data Transformation Pipeline

```python
# Transform and prepare data for analytics
data_transform = AthenaOperator(
    task_id='transform_customer_data',
    query="""
        CREATE TABLE analytics_db.customer_metrics AS
        SELECT
            customer_id,
            customer_tier,
            date_trunc('month', order_date) as month,
            SUM(order_value) as monthly_spend,
            COUNT(order_id) as monthly_orders,
            AVG(order_value) as avg_order_value
        FROM raw_data.orders o
        JOIN raw_data.customers c ON o.customer_id = c.id
        WHERE order_date >= date('2023-01-01')
        GROUP BY customer_id, customer_tier, date_trunc('month', order_date)
    """,
    database='analytics_db',
    output_location='s3://analytics-bucket/transformed-data/',
    result_configuration={
        'OutputLocation': 's3://analytics-bucket/query-results/',
        'EncryptionConfiguration': {
            'EncryptionOption': 'SSE_S3'
        }
    },
    workgroup='data-processing',
    log_query=True,
    aws_conn_id='aws_default'
)
```

### Parameterized Query with Template

```python
# Use templated queries with Airflow variables
parameterized_query = AthenaOperator(
    task_id='daily_metrics_report',
    query="""
        SELECT
            '{{ ds }}' as report_date,
            product_category,
            SUM(revenue) as daily_revenue,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM sales_fact
        WHERE date_column = '{{ ds }}'
        GROUP BY product_category
    """,
    database='reporting',
    output_location='s3://reports-bucket/daily-metrics/{{ ds }}/',
    workgroup='reporting-workgroup',
    query_execution_context={
        'Catalog': 'AwsDataCatalog',
        'Database': 'reporting'
    },
    client_request_token='daily-report-{{ ds_nodash }}',
    aws_conn_id='aws_default'
)
```

### Large Dataset Processing with Deferrable Mode

```python
# Process large datasets efficiently using deferrable execution
large_data_processing = AthenaOperator(
    task_id='process_large_dataset',
    query="""
        CREATE TABLE processed_data.yearly_aggregates AS
        SELECT
            year(transaction_date) as year,
            month(transaction_date) as month,
            store_id,
            product_category,
            SUM(amount) as total_amount,
            COUNT(*) as transaction_count,
            AVG(amount) as avg_amount
        FROM raw_data.transactions
        WHERE transaction_date >= date('2020-01-01')
        GROUP BY year(transaction_date), month(transaction_date), store_id, product_category
    """,
    database='processed_data',
    output_location='s3://processed-data-bucket/yearly-aggregates/',
    workgroup='heavy-processing',
    deferrable=True,   # Use deferrable mode for long-running queries
    poll_interval=60,  # Check status every minute
    result_configuration={
        'OutputLocation': 's3://processed-data-bucket/query-results/',
        'EncryptionConfiguration': {
            'EncryptionOption': 'SSE_KMS',
            'KmsKey': 'arn:aws:kms:us-west-2:123456789012:key/12345678-1234-1234-1234-123456789012'
        }
    },
    aws_conn_id='aws_default'
)
```

### Query Result Processing

```python
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.athena import AthenaHook

def process_athena_results(**context):
    """Custom function to process Athena query results."""
    athena_hook = AthenaHook(aws_conn_id='aws_default')

    # Get query execution ID from the upstream AthenaOperator task
    query_execution_id = context['task_instance'].xcom_pull(task_ids='run_sales_analysis')

    # Get query results
    results = athena_hook.get_query_results(query_execution_id=query_execution_id)

    # Process results
    for row in results['ResultSet']['Rows'][1:]:  # Skip header row
        data = [col.get('VarCharValue', '') for col in row['Data']]
        # Process each data row
        print(f"Processing row: {data}")

    return f"Processed {len(results['ResultSet']['Rows']) - 1} rows"

# Use with PythonOperator
process_results = PythonOperator(
    task_id='process_results',
    python_callable=process_athena_results
)

analytics_query >> process_results
```
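
For result sets that exceed a single `get_query_results` response, the paginator documented above can stream pages instead of loading everything at once. A minimal sketch; the page size and row handling are illustrative.

```python
from airflow.providers.amazon.aws.hooks.athena import AthenaHook


def stream_athena_results(query_execution_id: str) -> int:
    """Iterate over Athena results page by page instead of one large response."""
    athena_hook = AthenaHook(aws_conn_id='aws_default')

    paginator = athena_hook.get_query_results_paginator(
        query_execution_id=query_execution_id,
        page_size=500,
    )

    total_rows = 0
    for page in paginator:
        # Each page mirrors the GetQueryResults response shape.
        rows = page['ResultSet']['Rows']
        total_rows += len(rows)
        for row in rows:
            data = [col.get('VarCharValue', '') for col in row['Data']]
            print(f"Processing row: {data}")

    return total_rows
```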

## Import Statements

```python
from airflow.providers.amazon.aws.operators.athena import (
    AthenaOperator,
    AthenaCreateDataCatalogOperator
)
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from airflow.providers.amazon.aws.hooks.athena import AthenaHook
```