0
# Google LevelDB Integration
1
2
The Google LevelDB integration exposes LevelDB to Apache Airflow as a high-performance, embedded key-value database interface. LevelDB is a fast, ordered key-value storage library that provides persistent storage for applications that need high-performance local data access.
3
4
## Capabilities
5
6
### LevelDB Hook
7
8
Core hook for connecting to and interacting with LevelDB databases using the Plyvel Python wrapper.
9
10
```python { .api }
11
class LevelDBHook(BaseHook):
12
"""
13
Plyvel Wrapper to Interact With LevelDB Database.
14
15
Provides connectivity and database operations for LevelDB through
16
the Plyvel library wrapper.
17
"""
18
def __init__(self, leveldb_conn_id: str = "leveldb_default"): ...
19
20
def get_conn(
21
self,
22
name: str = "/tmp/testdb/",
23
create_if_missing: bool = False,
24
**kwargs
25
) -> plyvel.DB:
26
"""
27
Create Plyvel DB connection.
28
29
Args:
30
name: Filesystem path of the database directory to open or create (e.g. '/tmp/testdb/')
31
create_if_missing: Whether a new database should be created if needed
32
kwargs: Other options for plyvel.DB creation
33
34
Returns:
35
plyvel.DB: Database connection object
36
"""
37
38
def close_conn(self) -> None:
39
"""Close database connection."""
40
41
def run(
42
self,
43
command: str,
44
key: bytes,
45
value: bytes | None = None,
46
keys: list[bytes] | None = None,
47
values: list[bytes] | None = None,
48
) -> bytes | None:
49
"""
50
Execute operation with LevelDB.
51
52
Args:
53
command: Command ('put', 'get', 'delete', 'write_batch')
54
key: Key for the 'put', 'get', and 'delete' commands (bytes); the signature requires it for every command
55
value: Value for put operation (bytes)
56
keys: Keys for write_batch operation (list[bytes])
57
values: Values for write_batch operation (list[bytes])
58
59
Returns:
60
bytes | None: Value returned by the 'get' command; None for 'put', 'delete', and 'write_batch'
61
"""
62
63
def put(self, key: bytes, value: bytes) -> None:
64
"""
65
Put a single value into LevelDB by key.
66
67
Args:
68
key: Key for put operation
69
value: Value for put operation
70
"""
71
72
def get(self, key: bytes) -> bytes:
73
"""
74
Get a single value from LevelDB by key.
75
76
Args:
77
key: Key for get operation
78
79
Returns:
80
bytes: Value associated with key
81
"""
82
83
def delete(self, key: bytes) -> None:
84
"""
85
Delete a single value in LevelDB by key.
86
87
Args:
88
key: Key for delete operation
89
"""
90
91
def write_batch(self, keys: list[bytes], values: list[bytes]) -> None:
92
"""
93
Write batch of values in LevelDB by keys.
94
95
Args:
96
keys: Keys for batch write operation
97
values: Values for batch write operation
98
"""
99
```
100
101
### LevelDB Operator
102
103
Operator for executing commands in LevelDB databases within Airflow DAGs.
104
105
```python { .api }
106
class LevelDBOperator(BaseOperator):
107
"""
108
Execute command in LevelDB.
109
110
Performs database operations using LevelDB through the Plyvel wrapper,
111
supporting put, get, delete, and write_batch operations.
112
"""
113
def __init__(
114
self,
115
*,
116
command: str,
117
key: bytes,
118
value: bytes | None = None,
119
keys: list[bytes] | None = None,
120
values: list[bytes] | None = None,
121
leveldb_conn_id: str = "leveldb_default",
122
name: str = "/tmp/testdb/",
123
create_if_missing: bool = True,
124
create_db_extra_options: dict[str, Any] | None = None,
125
**kwargs,
126
):
127
"""
128
Initialize LevelDB operator.
129
130
Args:
131
command: LevelDB command ('put', 'get', 'delete', 'write_batch')
132
key: Key for operation (bytes); required for every command, including 'write_batch'
133
value: Value for put operation (bytes, optional)
134
keys: Keys for write_batch operation (list[bytes], optional)
135
values: Values for write_batch operation (list[bytes], optional)
136
leveldb_conn_id: Airflow connection ID for LevelDB
137
name: Database path
138
create_if_missing: Whether to create database if it doesn't exist
139
create_db_extra_options: Extra options for database creation
140
"""
141
142
def execute(self, context: Context) -> str | None:
143
"""
144
Execute LevelDB command.
145
146
Returns:
147
str | None: Value from get operation (decoded to string) or None
148
"""
149
```
150
151
### Exception Classes
152
153
Custom exception type for errors specific to LevelDB operations.
154
155
```python { .api }
156
class LevelDBHookException(AirflowException):
157
"""Exception specific for LevelDB operations."""
158
```
159
160
## Usage Examples
161
162
### Basic LevelDB Operations
163
164
```python
165
from airflow import DAG
166
from airflow.providers.google.leveldb.operators.leveldb import LevelDBOperator
167
from datetime import datetime
168
169
dag = DAG(
170
'leveldb_example',
171
default_args={'start_date': datetime(2023, 1, 1)},
172
schedule_interval='@daily',
173
catchup=False
174
)
175
176
# Put a value
177
put_data = LevelDBOperator(
178
task_id='put_data',
179
command='put',
180
key=b'user:123',
181
value=b'{"name": "John", "age": 30}',
182
name='/tmp/mydb/',
183
create_if_missing=True,
184
dag=dag
185
)
186
187
# Get a value
188
get_data = LevelDBOperator(
189
task_id='get_data',
190
command='get',
191
key=b'user:123',
192
name='/tmp/mydb/',
193
dag=dag
194
)
195
196
# Write batch data
197
batch_write = LevelDBOperator(
198
task_id='batch_write',
199
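command='write_batch',
key=b'user:124',  # `key` has no default in the LevelDBOperator signature, so it must be passed even for write_batch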
200
keys=[b'user:124', b'user:125'],
201
values=[b'{"name": "Jane", "age": 25}', b'{"name": "Bob", "age": 35}'],
202
name='/tmp/mydb/',
203
dag=dag
204
)
205
206
put_data >> get_data >> batch_write
207
```
208
209
### Using LevelDB Hook Directly
210
211
```python
212
from airflow.providers.google.leveldb.hooks.leveldb import LevelDBHook
213
214
def process_leveldb_data():
215
hook = LevelDBHook(leveldb_conn_id='my_leveldb_conn')
216
217
# Connect to database
218
db = hook.get_conn(name='/path/to/db/', create_if_missing=True)
219
220
try:
221
# Put data
222
hook.put(b'key1', b'value1')
223
224
# Get data
225
value = hook.get(b'key1')
226
print(f"Retrieved: {value.decode()}")
227
228
# Batch operations
229
hook.write_batch(
230
keys=[b'batch1', b'batch2'],
231
values=[b'data1', b'data2']
232
)
233
234
finally:
235
# Always close connection
236
hook.close_conn()
237
```
238
239
## Types
240
241
```python { .api }
242
from typing import Any
243
import plyvel
244
245
# LevelDB specific types
246
LevelDBConnection = plyvel.DB
247
DatabasePath = str
248
DatabaseKey = bytes
249
DatabaseValue = bytes
250
BatchKeys = list[bytes]
251
BatchValues = list[bytes]
252
CreateOptions = dict[str, Any]
253
```