CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-google

Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/leveldb.md

Google LevelDB Integration

The Google LevelDB integration exposes LevelDB, a fast, ordered, embedded key-value storage library, to Apache Airflow through a hook and an operator. It is intended for workflows that need persistent, high-performance local key-value storage.

Capabilities

LevelDB Hook

Core hook for connecting to and interacting with LevelDB databases using the Plyvel Python wrapper.

class LevelDBHook(BaseHook):
    """
    Plyvel Wrapper to Interact With LevelDB Database.

    Provides connectivity and database operations for LevelDB through
    the Plyvel library wrapper. The single-purpose methods (``put``,
    ``get``, ``delete``, ``write_batch``) carry the same names as the
    commands accepted by ``run``.
    """
    # leveldb_conn_id: Airflow connection ID; defaults to "leveldb_default".
    def __init__(self, leveldb_conn_id: str = "leveldb_default"): ...

    def get_conn(
        self,
        name: str = "/tmp/testdb/",
        create_if_missing: bool = False,
        **kwargs
    ) -> plyvel.DB:
        """
        Create Plyvel DB connection.

        Args:
            name: Path to the database (e.g. '/tmp/testdb/'). Defaults to
                '/tmp/testdb/'.
            create_if_missing: Whether a new database should be created if
                needed. Defaults to False.
            kwargs: Other options for plyvel.DB creation

        Returns:
            plyvel.DB: Database connection object
        """

    def close_conn(self) -> None:
        """Close database connection."""

    def run(
        self,
        command: str,
        key: bytes,
        value: bytes | None = None,
        keys: list[bytes] | None = None,
        values: list[bytes] | None = None,
    ) -> bytes | None:
        """
        Execute operation with LevelDB.

        Args:
            command: Command ('put', 'get', 'delete', 'write_batch')
            key: Key for operation (bytes). Has no default, so it must be
                supplied for every command.
            value: Value for put operation (bytes). Defaults to None.
            keys: Keys for write_batch operation (list[bytes]). Defaults
                to None.
            values: Values for write_batch operation (list[bytes]).
                Defaults to None.

        Returns:
            bytes | None: Value from get operation or None
        """

    def put(self, key: bytes, value: bytes) -> None:
        """
        Put a single value into LevelDB by key.

        Args:
            key: Key for put operation
            value: Value for put operation
        """

    def get(self, key: bytes) -> bytes:
        """
        Get a single value from LevelDB by key.

        Args:
            key: Key for get operation

        Returns:
            bytes: Value associated with key
        """

    def delete(self, key: bytes) -> None:
        """
        Delete a single value in LevelDB by key.

        Args:
            key: Key for delete operation
        """

    def write_batch(self, keys: list[bytes], values: list[bytes]) -> None:
        """
        Write batch of values in LevelDB by keys.

        Args:
            keys: Keys for batch write operation
            values: Values for batch write operation (presumably paired
                with ``keys`` by position; confirm against the
                implementation)
        """

LevelDB Operator

Operator for executing commands in LevelDB databases within Airflow DAGs.

class LevelDBOperator(BaseOperator):
    """
    Execute command in LevelDB.

    Performs database operations using LevelDB through the Plyvel wrapper,
    supporting put, get, delete, and write_batch operations.
    """
    def __init__(
        self,
        *,
        command: str,
        key: bytes,
        value: bytes | None = None,
        keys: list[bytes] | None = None,
        values: list[bytes] | None = None,
        leveldb_conn_id: str = "leveldb_default",
        name: str = "/tmp/testdb/",
        create_if_missing: bool = True,
        create_db_extra_options: dict[str, Any] | None = None,
        **kwargs,
    ):
        """
        Initialize LevelDB operator.

        All parameters are keyword-only.

        Args:
            command: LevelDB command ('put', 'get', 'delete', 'write_batch')
            key: Key for operation (bytes). Has no default, so it must be
                supplied for every command, including 'write_batch'.
            value: Value for put operation (bytes, optional)
            keys: Keys for write_batch operation (list[bytes], optional)
            values: Values for write_batch operation (list[bytes], optional)
            leveldb_conn_id: Airflow connection ID for LevelDB. Defaults to
                "leveldb_default".
            name: Database path. Defaults to "/tmp/testdb/".
            create_if_missing: Whether to create database if it doesn't
                exist. Defaults to True (LevelDBHook.get_conn defaults to
                False).
            create_db_extra_options: Extra options for database creation
            kwargs: Remaining keyword arguments (presumably forwarded to
                BaseOperator; confirm against the implementation)
        """

    def execute(self, context: Context) -> str | None:
        """
        Execute LevelDB command.

        Args:
            context: Airflow task context passed to the operator

        Returns:
            str | None: Value from get operation (decoded to string) or None
        """

Exception Classes

Custom exception class raised for errors in LevelDB operations.

class LevelDBHookException(AirflowException):
    """Exception specific to LevelDB operations; subclass of AirflowException."""

Usage Examples

Basic LevelDB Operations

from airflow import DAG
from airflow.providers.google.leveldb.operators.leveldb import LevelDBOperator
from datetime import datetime

# Example DAG that writes one key, reads it back, then writes a batch.
# NOTE: `schedule_interval` is deprecated since Airflow 2.4 in favor of
# `schedule`; it is kept here for compatibility with older Airflow versions.
dag = DAG(
    'leveldb_example',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
)

# Put a value
put_data = LevelDBOperator(
    task_id='put_data',
    command='put',
    key=b'user:123',
    value=b'{"name": "John", "age": 30}',
    name='/tmp/mydb/',
    create_if_missing=True,
    dag=dag
)

# Get a value
get_data = LevelDBOperator(
    task_id='get_data',
    command='get',
    key=b'user:123',
    name='/tmp/mydb/',
    dag=dag
)

# Write batch data
# `key` is a required keyword argument of LevelDBOperator (it has no
# default), so it must be passed even for write_batch, which operates on
# `keys`/`values`. Omitting it raises TypeError when the DAG is parsed.
batch_write = LevelDBOperator(
    task_id='batch_write',
    command='write_batch',
    key=b'',  # placeholder; presumably unused by write_batch (verify against the hook)
    keys=[b'user:124', b'user:125'],
    values=[b'{"name": "Jane", "age": 25}', b'{"name": "Bob", "age": 35}'],
    name='/tmp/mydb/',
    dag=dag
)

# Run order: single put, then read it back, then the batch write.
put_data >> get_data >> batch_write

Using LevelDB Hook Directly

from airflow.providers.google.leveldb.hooks.leveldb import LevelDBHook

def process_leveldb_data():
    """Write, read back, and batch-write a few keys using LevelDBHook directly.

    The connection is opened first and is always closed in the ``finally``
    clause, whether or not the operations succeed.
    """
    hook = LevelDBHook(leveldb_conn_id='my_leveldb_conn')

    # Connect to database. The returned plyvel.DB handle is not bound to a
    # name because every operation below goes through the hook's methods.
    hook.get_conn(name='/path/to/db/', create_if_missing=True)

    try:
        # Put data
        hook.put(b'key1', b'value1')

        # Get data (bytes), decoded only for display
        value = hook.get(b'key1')
        print(f"Retrieved: {value.decode()}")

        # Batch operations
        hook.write_batch(
            keys=[b'batch1', b'batch2'],
            values=[b'data1', b'data2']
        )

    finally:
        # Always close connection
        hook.close_conn()

Types

from typing import Any
import plyvel

# LevelDB specific types
# These are plain aliases of existing types; they introduce no new classes.
LevelDBConnection = plyvel.DB  # connection object returned by LevelDBHook.get_conn
DatabasePath = str  # database path, e.g. '/tmp/testdb/'
DatabaseKey = bytes  # key for put/get/delete
DatabaseValue = bytes  # value stored under a key
BatchKeys = list[bytes]  # keys argument of write_batch
BatchValues = list[bytes]  # values argument of write_batch
CreateOptions = dict[str, Any]  # extra options for database creation

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-google

docs

common-utilities.md

data-transfers.md

firebase.md

gcp-services.md

google-ads.md

google-workspace.md

index.md

leveldb.md

marketing-platform.md

tile.json