or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

common-utilities.mddata-transfers.mdfirebase.mdgcp-services.mdgoogle-ads.mdgoogle-workspace.mdindex.mdleveldb.mdmarketing-platform.md

leveldb.mddocs/

0

# Google LevelDB Integration

1

2

Google LevelDB integration provides a high-performance, embedded key-value database interface through Apache Airflow. LevelDB is a fast, ordered key-value storage library that provides persistent storage for applications requiring high-performance local data access.

3

4

## Capabilities

5

6

### LevelDB Hook

7

8

Core hook for connecting to and interacting with LevelDB databases using the Plyvel Python wrapper.

9

10

```python { .api }

11

class LevelDBHook(BaseHook):

12

"""

13

Plyvel Wrapper to Interact With LevelDB Database.

14

15

Provides connectivity and database operations for LevelDB through

16

the Plyvel library wrapper.

17

"""

18

def __init__(self, leveldb_conn_id: str = "leveldb_default"): ...

19

20

def get_conn(

21

self,

22

name: str = "/tmp/testdb/",

23

create_if_missing: bool = False,

24

**kwargs

25

) -> plyvel.DB:

26

"""

27

Create Plyvel DB connection.

28

29

Args:

30

name: Path to create database (e.g. '/tmp/testdb/')

31

create_if_missing: Whether a new database should be created if needed

32

kwargs: Other options for plyvel.DB creation

33

34

Returns:

35

plyvel.DB: Database connection object

36

"""

37

38

def close_conn(self) -> None:

39

"""Close database connection."""

40

41

def run(

42

self,

43

command: str,

44

key: bytes,

45

value: bytes | None = None,

46

keys: list[bytes] | None = None,

47

values: list[bytes] | None = None,

48

) -> bytes | None:

49

"""

50

Execute operation with LevelDB.

51

52

Args:

53

command: Command ('put', 'get', 'delete', 'write_batch')

54

key: Key for operation (bytes)

55

value: Value for put operation (bytes)

56

keys: Keys for write_batch operation (list[bytes])

57

values: Values for write_batch operation (list[bytes])

58

59

Returns:

60

bytes | None: Value from get operation or None

61

"""

62

63

def put(self, key: bytes, value: bytes) -> None:

64

"""

65

Put a single value into LevelDB by key.

66

67

Args:

68

key: Key for put operation

69

value: Value for put operation

70

"""

71

72

def get(self, key: bytes) -> bytes:

73

"""

74

Get a single value from LevelDB by key.

75

76

Args:

77

key: Key for get operation

78

79

Returns:

80

bytes: Value associated with key

81

"""

82

83

def delete(self, key: bytes) -> None:

84

"""

85

Delete a single value in LevelDB by key.

86

87

Args:

88

key: Key for delete operation

89

"""

90

91

def write_batch(self, keys: list[bytes], values: list[bytes]) -> None:

92

"""

93

Write batch of values in LevelDB by keys.

94

95

Args:

96

keys: Keys for batch write operation

97

values: Values for batch write operation

98

"""

99

```

100

101

### LevelDB Operator

102

103

Operator for executing commands in LevelDB databases within Airflow DAGs.

104

105

```python { .api }

106

class LevelDBOperator(BaseOperator):

107

"""

108

Execute command in LevelDB.

109

110

Performs database operations using LevelDB through the Plyvel wrapper,

111

supporting put, get, delete, and write_batch operations.

112

"""

113

def __init__(

114

self,

115

*,

116

command: str,

117

key: bytes,

118

value: bytes | None = None,

119

keys: list[bytes] | None = None,

120

values: list[bytes] | None = None,

121

leveldb_conn_id: str = "leveldb_default",

122

name: str = "/tmp/testdb/",

123

create_if_missing: bool = True,

124

create_db_extra_options: dict[str, Any] | None = None,

125

**kwargs,

126

):

127

"""

128

Initialize LevelDB operator.

129

130

Args:

131

command: LevelDB command ('put', 'get', 'delete', 'write_batch')

132

key: Key for operation (bytes)

133

value: Value for put operation (bytes, optional)

134

keys: Keys for write_batch operation (list[bytes], optional)

135

values: Values for write_batch operation (list[bytes], optional)

136

leveldb_conn_id: Airflow connection ID for LevelDB

137

name: Database path

138

create_if_missing: Whether to create database if it doesn't exist

139

create_db_extra_options: Extra options for database creation

140

"""

141

142

def execute(self, context: Context) -> str | None:

143

"""

144

Execute LevelDB command.

145

146

Returns:

147

str | None: Value from get operation (decoded to string) or None

148

"""

149

```

150

151

### Exception Classes

152

153

Custom exception handling for LevelDB operations.

154

155

```python { .api }

156

class LevelDBHookException(AirflowException):

157

"""Exception specific for LevelDB operations."""

158

```

159

160

## Usage Examples

161

162

### Basic LevelDB Operations

163

164

```python

165

from airflow import DAG

166

from airflow.providers.google.leveldb.operators.leveldb import LevelDBOperator

167

from datetime import datetime

168

169

dag = DAG(

170

'leveldb_example',

171

default_args={'start_date': datetime(2023, 1, 1)},

172

schedule_interval='@daily',

173

catchup=False

174

)

175

176

# Put a value

177

put_data = LevelDBOperator(

178

task_id='put_data',

179

command='put',

180

key=b'user:123',

181

value=b'{"name": "John", "age": 30}',

182

name='/tmp/mydb/',

183

create_if_missing=True,

184

dag=dag

185

)

186

187

# Get a value

188

get_data = LevelDBOperator(

189

task_id='get_data',

190

command='get',

191

key=b'user:123',

192

name='/tmp/mydb/',

193

dag=dag

194

)

195

196

# Write batch data

197

batch_write = LevelDBOperator(

198

task_id='batch_write',

199

command='write_batch',

200

keys=[b'user:124', b'user:125'],

201

values=[b'{"name": "Jane", "age": 25}', b'{"name": "Bob", "age": 35}'],

202

name='/tmp/mydb/',

203

dag=dag

204

)

205

206

put_data >> get_data >> batch_write

207

```

208

209

### Using LevelDB Hook Directly

210

211

```python

212

from airflow.providers.google.leveldb.hooks.leveldb import LevelDBHook

213

214

def process_leveldb_data():

215

hook = LevelDBHook(leveldb_conn_id='my_leveldb_conn')

216

217

# Connect to database

218

db = hook.get_conn(name='/path/to/db/', create_if_missing=True)

219

220

try:

221

# Put data

222

hook.put(b'key1', b'value1')

223

224

# Get data

225

value = hook.get(b'key1')

226

print(f"Retrieved: {value.decode()}")

227

228

# Batch operations

229

hook.write_batch(

230

keys=[b'batch1', b'batch2'],

231

values=[b'data1', b'data2']

232

)

233

234

finally:

235

# Always close connection

236

hook.close_conn()

237

```

238

239

## Types

240

241

```python { .api }

242

from typing import Any

243

import plyvel

244

245

# LevelDB specific types

246

LevelDBConnection = plyvel.DB

247

DatabasePath = str

248

DatabaseKey = bytes

249

DatabaseValue = bytes

250

BatchKeys = list[bytes]

251

BatchValues = list[bytes]

252

CreateOptions = dict[str, Any]

253

```