Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace
—
Quality: Pending — this package has not yet been reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Google LevelDB integration provides a high-performance, embedded key-value database interface through Apache Airflow. LevelDB is a fast, ordered key-value storage library that provides persistent storage for applications requiring high-performance local data access.
Core hook for connecting to and interacting with LevelDB databases using the Plyvel Python wrapper.
class LevelDBHook(BaseHook):
"""
Plyvel Wrapper to Interact With LevelDB Database.
Provides connectivity and database operations for LevelDB through
the Plyvel library wrapper.
"""
def __init__(self, leveldb_conn_id: str = "leveldb_default"): ...
def get_conn(
self,
name: str = "/tmp/testdb/",
create_if_missing: bool = False,
**kwargs
) -> plyvel.DB:
"""
Create Plyvel DB connection.
Args:
name: Path to create database (e.g. '/tmp/testdb/')
create_if_missing: Whether a new database should be created if needed
kwargs: Other options for plyvel.DB creation
Returns:
plyvel.DB: Database connection object
"""
def close_conn(self) -> None:
"""Close database connection."""
def run(
self,
command: str,
key: bytes,
value: bytes | None = None,
keys: list[bytes] | None = None,
values: list[bytes] | None = None,
) -> bytes | None:
"""
Execute operation with LevelDB.
Args:
command: Command ('put', 'get', 'delete', 'write_batch')
key: Key for operation (bytes)
value: Value for put operation (bytes)
keys: Keys for write_batch operation (list[bytes])
values: Values for write_batch operation (list[bytes])
Returns:
bytes | None: Value from get operation or None
"""
def put(self, key: bytes, value: bytes) -> None:
"""
Put a single value into LevelDB by key.
Args:
key: Key for put operation
value: Value for put operation
"""
def get(self, key: bytes) -> bytes:
"""
Get a single value from LevelDB by key.
Args:
key: Key for get operation
Returns:
bytes: Value associated with key
"""
def delete(self, key: bytes) -> None:
"""
Delete a single value in LevelDB by key.
Args:
key: Key for delete operation
"""
def write_batch(self, keys: list[bytes], values: list[bytes]) -> None:
"""
Write batch of values in LevelDB by keys.
Args:
keys: Keys for batch write operation
values: Values for batch write operation
"""Operator for executing commands in LevelDB databases within Airflow DAGs.
class LevelDBOperator(BaseOperator):
"""
Execute command in LevelDB.
Performs database operations using LevelDB through the Plyvel wrapper,
supporting put, get, delete, and write_batch operations.
"""
def __init__(
self,
*,
command: str,
key: bytes,
value: bytes | None = None,
keys: list[bytes] | None = None,
values: list[bytes] | None = None,
leveldb_conn_id: str = "leveldb_default",
name: str = "/tmp/testdb/",
create_if_missing: bool = True,
create_db_extra_options: dict[str, Any] | None = None,
**kwargs,
):
"""
Initialize LevelDB operator.
Args:
command: LevelDB command ('put', 'get', 'delete', 'write_batch')
key: Key for operation (bytes)
value: Value for put operation (bytes, optional)
keys: Keys for write_batch operation (list[bytes], optional)
values: Values for write_batch operation (list[bytes], optional)
leveldb_conn_id: Airflow connection ID for LevelDB
name: Database path
create_if_missing: Whether to create database if it doesn't exist
create_db_extra_options: Extra options for database creation
"""
def execute(self, context: Context) -> str | None:
"""
Execute LevelDB command.
Returns:
str | None: Value from get operation (decoded to string) or None
"""Custom exception handling for LevelDB operations.
class LevelDBHookException(AirflowException):
    """Exception specific for LevelDB operations."""


# Example usage: a DAG exercising the LevelDB operator.
from airflow import DAG
from airflow.providers.google.leveldb.operators.leveldb import LevelDBOperator
from datetime import datetime

# NOTE(review): `schedule_interval` is the legacy parameter name; newer
# Airflow versions use `schedule` — confirm against the target Airflow version.
dag = DAG(
    'leveldb_example',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False,
)

# Put a value
put_data = LevelDBOperator(
    task_id='put_data',
    command='put',
    key=b'user:123',
    value=b'{"name": "John", "age": 30}',
    name='/tmp/mydb/',
    create_if_missing=True,
    dag=dag,
)

# Get a value
get_data = LevelDBOperator(
    task_id='get_data',
    command='get',
    key=b'user:123',
    name='/tmp/mydb/',
    dag=dag,
)

# Write batch data
batch_write = LevelDBOperator(
    task_id='batch_write',
    command='write_batch',
    keys=[b'user:124', b'user:125'],
    values=[b'{"name": "Jane", "age": 25}', b'{"name": "Bob", "age": 35}'],
    name='/tmp/mydb/',
    dag=dag,
)

put_data >> get_data >> batch_write

# Example usage: working with the hook directly.
from airflow.providers.google.leveldb.hooks.leveldb import LevelDBHook
def process_leveldb_data():
    """Demonstrate direct LevelDBHook usage: put, get, and a batch write."""
    hook = LevelDBHook(leveldb_conn_id='my_leveldb_conn')
    # Connect to database
    # NOTE(review): the returned db handle is unused here; the hook's
    # convenience methods operate on the connection opened by get_conn.
    db = hook.get_conn(name='/path/to/db/', create_if_missing=True)
    try:
        # Put data
        hook.put(b'key1', b'value1')
        # Get data
        value = hook.get(b'key1')
        print(f"Retrieved: {value.decode()}")
        # Batch operations
        hook.write_batch(
            keys=[b'batch1', b'batch2'],
            values=[b'data1', b'data2'],
        )
    finally:
        # Always close connection
        hook.close_conn()


from typing import Any
import plyvel
# LevelDB specific types
LevelDBConnection = plyvel.DB
DatabasePath = str
DatabaseKey = bytes
DatabaseValue = bytes
BatchKeys = list[bytes]
BatchValues = list[bytes]
CreateOptions = dict[str, Any]Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-google