or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

athena-queries.md, cloudwatch-logging.md, ecr-integration.md, ecs-orchestration.md, emr-processing.md, index.md, parameter-store.md, pipes-orchestration.md, rds-operations.md, redshift-integration.md, s3-storage.md, secrets-management.md

docs/athena-queries.md

0

# Athena Query Service

1

2

Execute serverless SQL queries against data in S3 using Amazon Athena with result management, query optimization, and comprehensive error handling.

3

4

## Capabilities

5

6

### Athena Resource

7

8

```python { .api }

9

class AthenaResource(ResourceWithBoto3Configuration):

10

"""

11

Resource for executing queries with Amazon Athena.

12

"""

13

work_group: str = "primary"

14

s3_staging_dir: str

15

16

def execute_query(

17

self,

18

query: str,

19

result_configuration: Optional[Dict] = None

20

) -> str:

21

"""

22

Execute SQL query using Athena.

23

24

Parameters:

25

query: SQL query to execute

26

result_configuration: Query result configuration

27

28

Returns:

29

str: Query execution ID

30

"""

31

32

def get_query_results(self, execution_id: str) -> Dict:

33

"""

34

Retrieve results for a completed query.

35

36

Parameters:

37

execution_id: Query execution ID

38

39

Returns:

40

Dict: Query results and metadata

41

"""

42

43

def wait_for_query_completion(

44

self,

45

execution_id: str,

46

timeout_seconds: int = 300

47

) -> bool:

48

"""

49

Wait for query to complete execution.

50

51

Parameters:

52

execution_id: Query execution ID

53

timeout_seconds: Maximum wait time

54

55

Returns:

56

bool: True if completed successfully

57

"""

58

59

def athena_resource(**kwargs) -> AthenaResource:

60

"""

61

Factory function for Athena resource.

62

"""

63

```

64

65

### Athena Client

66

67

```python { .api }

68

class AthenaClient:

69

"""

70

Direct Athena client wrapper.

71

"""

72

73

def start_query_execution(self, **kwargs): ...

74

def get_query_execution(self, execution_id: str): ...

75

def get_query_results(self, execution_id: str): ...

76

77

class AthenaClientResource(AthenaClient, ConfigurableResource): ...

78

```

79

80

### Error Handling

81

82

```python { .api }

83

class AthenaError(Exception):

84

"""

85

Exception for Athena-related errors.

86

"""

87

88

class AthenaTimeout(Exception):

89

"""

90

Exception for Athena query timeouts.

91

"""

92

```

93

94

### Testing Utilities

95

96

```python { .api }

97

class FakeAthenaClient: ...

98

class FakeAthenaResource: ...

99

def fake_athena_resource(**kwargs): ...

100

```

101

102

## Usage Examples

103

104

```python

105

from dagster import op, job, Definitions

106

from dagster_aws.athena import AthenaResource, AthenaTimeout

107

108

@op(required_resource_keys={"athena"})

109

def analyze_web_logs(context):

110

athena = context.resources.athena

111

112

query = """

113

SELECT

114

date(timestamp) as date,

115

count(*) as page_views,

116

count(distinct user_id) as unique_visitors

117

FROM web_logs

118

WHERE timestamp >= current_date - interval '7' day

119

GROUP BY date(timestamp)

120

ORDER BY date

121

"""

122

123

try:

124

execution_id = athena.execute_query(query)

125

126

if athena.wait_for_query_completion(execution_id, timeout_seconds=120):

127

results = athena.get_query_results(execution_id)

128

return results['ResultSet']['Rows']

129

else:

130

raise AthenaTimeout("Query execution timed out")

131

132

except Exception as e:

133

context.log.error(f"Athena query failed: {e}")

134

raise

135

136

@job(

137

resource_defs={

138

"athena": AthenaResource(

139

s3_staging_dir="s3://my-athena-results/",

140

work_group="analytics"

141

)

142

}

143

)

144

def web_analytics_job():

145

analyze_web_logs()

146

147

defs = Definitions(jobs=[web_analytics_job])

148

```