or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-management.mdexception-handling.mdimmutable-state.mdindex.mdstate-querying.md

state-querying.mddocs/

0

# State Querying

1

2

Asynchronous state querying capabilities for accessing different types of keyed state from running Flink jobs. All state queries return immutable, read-only state objects.

3

4

## Capabilities

5

6

### Basic State Query

7

8

Query keyed state using TypeInformation for type safety.

9

10

```java { .api }

11

public <K, S extends State, V> CompletableFuture<S> getKvState(

12

JobID jobId,

13

String queryableStateName,

14

K key,

15

TypeInformation<K> keyTypeInfo,

16

StateDescriptor<S, V> stateDescriptor

17

);

18

```

19

20

**Type Parameters:**

21

- `K` - Type of the key

22

- `S` - Type of the state (extends State)

23

- `V` - Type of the value stored in the state

24

25

**Parameters:**

26

- `jobId` - JobID of the job containing the queryable state

27

- `queryableStateName` - Name under which the state is queryable (set via StateDescriptor.setQueryable())

28

- `key` - The key to query for

29

- `keyTypeInfo` - TypeInformation for the key type

30

- `stateDescriptor` - StateDescriptor matching the state configuration in the Flink job

31

32

**Returns:**

33

- `CompletableFuture<S>` - Future containing an immutable state object

34

35

**Usage Example:**

36

```java

37

import org.apache.flink.api.common.state.ValueStateDescriptor;

38

import org.apache.flink.api.common.typeinfo.TypeInformation;

39

40

QueryableStateClient client = new QueryableStateClient("localhost", 9069);

41

42

JobID jobId = JobID.fromHexString("a1b2c3d4e5f6789012345678901234567890abcd");

43

String stateName = "userProfiles";

44

String userId = "user123";

45

46

ValueStateDescriptor<UserProfile> descriptor =

47

new ValueStateDescriptor<>("userProfiles", UserProfile.class);

48

49

CompletableFuture<ValueState<UserProfile>> future = client.getKvState(

50

jobId,

51

stateName,

52

userId,

53

TypeInformation.of(String.class),

54

descriptor

55

);

56

57

future.thenAccept(state -> {

58

UserProfile profile = state.value();

59

System.out.println("User profile: " + profile);

60

});

61

```

62

63

### Type Hint-Based Query

64

65

Query keyed state using TypeHint for generic type inference.

66

67

```java { .api }

68

public <K, S extends State, V> CompletableFuture<S> getKvState(

69

JobID jobId,

70

String queryableStateName,

71

K key,

72

TypeHint<K> keyTypeHint,

73

StateDescriptor<S, V> stateDescriptor

74

);

75

```

76

77

**Parameters:**

78

- Same as basic query, but uses `TypeHint<K>` instead of `TypeInformation<K>`

79

80

**Usage Example:**

81

```java

82

import org.apache.flink.api.common.typeinfo.TypeHint;

83

84

CompletableFuture<ValueState<String>> future = client.getKvState(

85

jobId,

86

stateName,

87

"myKey",

88

new TypeHint<String>() {}, // Type hint for String key

89

new ValueStateDescriptor<>("state", String.class)

90

);

91

```

92

93

### Supported State Types

94

95

The client supports querying all Flink state types:

96

97

#### ValueState

98

```java

99

ValueStateDescriptor<String> descriptor =

100

new ValueStateDescriptor<>("myValue", String.class);

101

CompletableFuture<ValueState<String>> future = client.getKvState(/*...*/);

102

```

103

104

#### ListState

105

```java

106

ListStateDescriptor<Integer> descriptor =

107

new ListStateDescriptor<>("myList", Integer.class);

108

CompletableFuture<ListState<Integer>> future = client.getKvState(/*...*/);

109

```

110

111

#### MapState

112

```java

113

MapStateDescriptor<String, Long> descriptor =

114

new MapStateDescriptor<>("myMap", String.class, Long.class);

115

CompletableFuture<MapState<String, Long>> future = client.getKvState(/*...*/);

116

```

117

118

#### ReducingState

119

```java

120

ReducingStateDescriptor<Integer> descriptor =

121

new ReducingStateDescriptor<>("myReducing", Integer::sum, Integer.class);

122

CompletableFuture<ReducingState<Integer>> future = client.getKvState(/*...*/);

123

```

124

125

#### AggregatingState

126

```java

127

AggregatingStateDescriptor<Integer, Long, Double> descriptor =

128

new AggregatingStateDescriptor<>("myAggregating", aggregateFunction, Long.class);

129

CompletableFuture<AggregatingState<Integer, Double>> future = client.getKvState(/*...*/);

130

```

131

132

## Query Process

133

134

The state querying process involves several steps:

135

136

1. **Key Serialization**: The key and namespace are serialized using the provided type serializers

137

2. **Location Resolution**: The client proxy contacts the JobManager to resolve the location of the key group

138

3. **State Request**: A KvStateRequest is sent to the appropriate TaskManager

139

4. **State Retrieval**: The TaskManager retrieves the serialized state data

140

5. **Response Processing**: The serialized state is deserialized into an immutable state wrapper

141

6. **Future Completion**: The CompletableFuture is completed with the immutable state object

142

143

## Error Handling

144

145

State queries can fail for various reasons. Common exceptions include:

146

147

- `UnknownKeyOrNamespaceException` - No state exists for the given key/namespace

148

- `UnknownKvStateIdException` - The state ID is not recognized

149

- `UnknownLocationException` - State location cannot be resolved

150

- `IOException` - Network or serialization errors

151

- `FlinkRuntimeException` - General runtime errors

152

153

**Error Handling Example:**

154

```java

155

future.whenComplete((state, throwable) -> {

156

if (throwable != null) {

157

if (throwable instanceof UnknownKeyOrNamespaceException) {

158

System.out.println("No state found for key: " + key);

159

} else if (throwable instanceof UnknownLocationException) {

160

System.err.println("Could not resolve state location");

161

} else {

162

System.err.println("Query failed: " + throwable.getMessage());

163

}

164

} else {

165

// Process successful result

166

processState(state);

167

}

168

});

169

```

170

171

## Performance Considerations

172

173

- **Location Caching**: Resolved state locations are cached to improve performance

174

- **Asynchronous Operations**: All queries are non-blocking and return CompletableFuture

175

- **Serialization Overhead**: Consider the cost of key and state serialization/deserialization

176

- **Network Communication**: Each query involves network communication with the Flink cluster

177

- **Connection Reuse**: The client maintains persistent connections to minimize connection overhead