CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-fakeredis

Python implementation of redis API, can be used for testing purposes

Pending
Overview
Eval results
Files

lua-scripting.mddocs/

Lua Scripting

Redis Lua scripting support with script caching and execution. Lua scripts provide atomic operations, server-side computation, and complex logic execution with full access to Redis data structures and commands, enabling powerful custom operations that maintain data consistency.

Capabilities

Script Execution

Core functions for executing Lua scripts with Redis data access.

def eval(
    self,
    script: str,
    numkeys: int,
    *keys_and_args: KeyT
) -> Any: ...

def evalsha(
    self,
    sha: str,
    numkeys: int,
    *keys_and_args: KeyT
) -> Any: ...

Script Management

Functions for managing script cache and script lifecycle.

def script_load(self, script: str) -> str: ...

def script_exists(self, *args: str) -> List[bool]: ...

def script_flush(self, sync_type: Optional[str] = None) -> bool: ...

def script_kill(self) -> bool: ...

Script Debugging

Redis 3.2+ debugging capabilities for Lua script development.

def script_debug(self, *args) -> str: ...

Usage Examples

Basic Lua Scripts

import fakeredis

client = fakeredis.FakeRedis()

# Simple script that increments a counter and returns the new value
increment_script = """
local key = KEYS[1]
local increment = tonumber(ARGV[1]) or 1

local current = redis.call('GET', key)
if current == false then
    current = 0
else
    current = tonumber(current)
end

local new_value = current + increment
redis.call('SET', key, new_value)
return new_value
"""

# Execute script directly
result1 = client.eval(increment_script, 1, 'counter', '5')
print(f"First increment: {result1}")  # 5

result2 = client.eval(increment_script, 1, 'counter', '3')
print(f"Second increment: {result2}")  # 8

# Verify the value
current_value = client.get('counter')
print(f"Current counter value: {current_value.decode()}")  # 8

Script Caching with SHA

import fakeredis

client = fakeredis.FakeRedis()

# Load script and get SHA hash
multi_key_script = """
-- Get multiple keys and return them as a table
local keys = KEYS
local result = {}

for i, key in ipairs(keys) do
    local value = redis.call('GET', key)
    if value ~= false then
        result[key] = value
    end
end

return result
"""

# Load script into Redis cache
script_sha = client.script_load(multi_key_script)
print(f"Script loaded with SHA: {script_sha}")

# Set some test data
client.mset({
    'user:1:name': 'Alice',
    'user:1:email': 'alice@example.com', 
    'user:1:age': '30',
    'nonexistent:key': 'wont_be_set'
})

# Execute using SHA (more efficient for repeated executions)
result = client.evalsha(
    script_sha, 
    3,  # number of keys
    'user:1:name', 'user:1:email', 'user:1:age'  # keys
)

print("Script result:")
for key, value in result.items():
    print(f"  {key}: {value}")

# Check if script exists in cache
exists = client.script_exists(script_sha)
print(f"Script exists in cache: {exists}")  # [True]

Atomic Operations with Lua

import fakeredis

client = fakeredis.FakeRedis()

# Atomic transfer between accounts
transfer_script = """
local from_account = KEYS[1]
local to_account = KEYS[2]
local amount = tonumber(ARGV[1])

-- Get current balances
local from_balance = tonumber(redis.call('GET', from_account) or 0)
local to_balance = tonumber(redis.call('GET', to_account) or 0)

-- Check if sufficient funds
if from_balance < amount then
    return {err = 'Insufficient funds'}
end

-- Perform transfer
redis.call('SET', from_account, from_balance - amount)
redis.call('SET', to_account, to_balance + amount)

-- Return new balances
return {
    from_balance = from_balance - amount,
    to_balance = to_balance + amount,
    transferred = amount
}
"""

# Setup initial balances
client.set('account:alice', '1000')
client.set('account:bob', '500')

print("Initial balances:")
print(f"Alice: ${client.get('account:alice').decode()}")
print(f"Bob: ${client.get('account:bob').decode()}")

# Execute transfer
result = client.eval(
    transfer_script,
    2,  # 2 keys
    'account:alice', 'account:bob',  # from, to
    '150'  # amount
)

print(f"\nTransfer result: {result}")

print("\nFinal balances:")
print(f"Alice: ${client.get('account:alice').decode()}")
print(f"Bob: ${client.get('account:bob').decode()}")

# Try transfer with insufficient funds
print("\n--- Testing insufficient funds ---")
result2 = client.eval(
    transfer_script,
    2,
    'account:alice', 'account:bob',
    '2000'  # More than Alice has
)

print(f"Insufficient funds result: {result2}")

Complex Data Structure Operations

import fakeredis
import json

client = fakeredis.FakeRedis()

# Script for managing a shopping cart with inventory checking
shopping_cart_script = """
local user_id = ARGV[1]
local action = ARGV[2]
local product_id = ARGV[3]
local quantity = tonumber(ARGV[4]) or 0

local cart_key = 'cart:' .. user_id
local product_key = 'product:' .. product_id
local inventory_key = 'inventory:' .. product_id

if action == 'add' then
    -- Check inventory
    local available = tonumber(redis.call('GET', inventory_key) or 0)
    local current_in_cart = tonumber(redis.call('HGET', cart_key, product_id) or 0)
    
    if available < (current_in_cart + quantity) then
        return {error = 'Insufficient inventory', available = available, requested = current_in_cart + quantity}
    end
    
    -- Add to cart
    redis.call('HINCRBY', cart_key, product_id, quantity)
    local new_quantity = tonumber(redis.call('HGET', cart_key, product_id))
    
    return {success = true, product = product_id, quantity = new_quantity}
    
elseif action == 'remove' then
    -- Remove from cart
    local current = tonumber(redis.call('HGET', cart_key, product_id) or 0)
    local to_remove = math.min(quantity, current)
    
    if to_remove > 0 then
        redis.call('HINCRBY', cart_key, product_id, -to_remove)
        local remaining = tonumber(redis.call('HGET', cart_key, product_id))
        
        -- Remove if quantity is 0
        if remaining <= 0 then
            redis.call('HDEL', cart_key, product_id)
            remaining = 0
        end
        
        return {success = true, product = product_id, quantity = remaining, removed = to_remove}
    else
        return {error = 'Product not in cart'}
    end
    
elseif action == 'get' then
    -- Get cart contents
    local cart = redis.call('HGETALL', cart_key)
    local result = {}
    
    for i = 1, #cart, 2 do
        local prod_id = cart[i]
        local qty = tonumber(cart[i + 1])
        result[prod_id] = qty
    end
    
    return {cart = result}
    
elseif action == 'checkout' then
    -- Atomic checkout process
    local cart = redis.call('HGETALL', cart_key)
    local total_cost = 0
    local items = {}
    
    -- Check all items and calculate cost
    for i = 1, #cart, 2 do
        local prod_id = cart[i]
        local qty = tonumber(cart[i + 1])
        
        -- Check inventory
        local available = tonumber(redis.call('GET', inventory_key:gsub(product_id, prod_id)) or 0)
        if available < qty then
            return {error = 'Insufficient inventory for ' .. prod_id, available = available, needed = qty}
        end
        
        -- Get product price
        local price = tonumber(redis.call('HGET', 'product:' .. prod_id, 'price') or 0)
        local item_cost = price * qty
        total_cost = total_cost + item_cost
        
        table.insert(items, {product = prod_id, quantity = qty, price = price, total = item_cost})
    end
    
    -- Deduct from inventory and clear cart
    for _, item in ipairs(items) do
        redis.call('DECRBY', 'inventory:' .. item.product, item.quantity)
    end
    
    redis.call('DEL', cart_key)
    
    -- Create order
    local order_id = 'order:' .. redis.call('INCR', 'order_counter')
    redis.call('HSET', order_id, 'user_id', user_id, 'total_cost', total_cost, 'timestamp', redis.call('TIME')[1])
    
    return {success = true, order_id = order_id, total_cost = total_cost, items = items}
end

return {error = 'Invalid action'}
"""

# Setup products and inventory
products = {
    'laptop': {'price': 999.99, 'inventory': 5},
    'mouse': {'price': 29.99, 'inventory': 50},
    'keyboard': {'price': 89.99, 'inventory': 20}
}

for product_id, data in products.items():
    client.hset(f'product:{product_id}', 'price', str(data['price']))
    client.set(f'inventory:{product_id}', str(data['inventory']))

# Test shopping cart operations
user_id = 'user123'

# Add items to cart
print("=== Adding Items to Cart ===")
result1 = client.eval(shopping_cart_script, 0, user_id, 'add', 'laptop', '2')
print(f"Add laptop: {result1}")

result2 = client.eval(shopping_cart_script, 0, user_id, 'add', 'mouse', '3')
print(f"Add mouse: {result2}")

# Try to add more than available inventory
result3 = client.eval(shopping_cart_script, 0, user_id, 'add', 'laptop', '10')
print(f"Add too many laptops: {result3}")

# Get cart contents
cart_contents = client.eval(shopping_cart_script, 0, user_id, 'get')
print(f"Cart contents: {cart_contents}")

# Remove some items
result4 = client.eval(shopping_cart_script, 0, user_id, 'remove', 'laptop', '1')
print(f"Remove laptop: {result4}")

# Checkout
print("\n=== Checkout Process ===")
checkout_result = client.eval(shopping_cart_script, 0, user_id, 'checkout')
print(f"Checkout result: {checkout_result}")

# Check remaining inventory
print(f"\nRemaining inventory:")
for product_id in products:
    remaining = client.get(f'inventory:{product_id}').decode()
    print(f"  {product_id}: {remaining}")

Script Error Handling

import fakeredis

client = fakeredis.FakeRedis()

# Script with error handling
safe_script = """
-- Safe division with error handling
local dividend = tonumber(ARGV[1])
local divisor = tonumber(ARGV[2])

-- Input validation
if not dividend or not divisor then
    return {error = 'Invalid input: arguments must be numbers'}
end

if divisor == 0 then
    return {error = 'Division by zero is not allowed'}
end

-- Perform calculation
local result = dividend / divisor

-- Store result with timestamp
local result_key = KEYS[1]
redis.call('HSET', result_key, 'result', result, 'timestamp', redis.call('TIME')[1])

return {success = true, result = result}
"""

# Test valid operations
print("=== Testing Valid Operations ===")
result1 = client.eval(safe_script, 1, 'calc:result1', '10', '2')
print(f"10 / 2 = {result1}")

result2 = client.eval(safe_script, 1, 'calc:result2', '7.5', '1.5')
print(f"7.5 / 1.5 = {result2}")

# Test error conditions
print("\n=== Testing Error Conditions ===")
result3 = client.eval(safe_script, 1, 'calc:result3', '10', '0')
print(f"10 / 0 = {result3}")

result4 = client.eval(safe_script, 1, 'calc:result4', 'abc', '5')
print(f"'abc' / 5 = {result4}")

# Check stored results
print("\n=== Stored Results ===")
for i in [1, 2]:
    stored = client.hgetall(f'calc:result{i}')
    if stored:
        result_val = stored[b'result'].decode()
        timestamp = stored[b'timestamp'].decode()
        print(f"Result {i}: {result_val} (calculated at {timestamp})")

Advanced Lua Features

import fakeredis
import time

client = fakeredis.FakeRedis()

# Advanced script using multiple Redis data types
analytics_script = """
-- Advanced analytics script
local event_type = ARGV[1]
local user_id = ARGV[2]
local timestamp = tonumber(ARGV[3])
local metadata = cjson.decode(ARGV[4])

-- Keys for different data structures
local daily_events_key = 'analytics:daily:' .. os.date('%Y-%m-%d', timestamp)
local user_events_key = 'analytics:user:' .. user_id
local event_stream_key = 'analytics:stream:' .. event_type
local leaderboard_key = 'analytics:leaderboard:' .. event_type

-- 1. Increment daily event counter (Hash)
redis.call('HINCRBY', daily_events_key, event_type, 1)

-- 2. Add to user's event list (List with capping)
local event_data = cjson.encode({
    type = event_type,
    timestamp = timestamp,
    metadata = metadata
})
redis.call('LPUSH', user_events_key, event_data)
redis.call('LTRIM', user_events_key, 0, 99)  -- Keep only last 100 events

-- 3. Add to event stream (Stream)
local stream_id = redis.call('XADD', event_stream_key, '*', 
    'user_id', user_id,
    'timestamp', timestamp,
    'metadata', ARGV[4]
)

-- 4. Update user score in leaderboard (Sorted Set)
local score_increment = metadata.score or 1
redis.call('ZINCRBY', leaderboard_key, score_increment, user_id)

-- 5. Set key expiration for cleanup
redis.call('EXPIRE', daily_events_key, 86400 * 30)  -- 30 days
redis.call('EXPIRE', user_events_key, 86400 * 7)    -- 7 days

-- 6. Get current statistics
local daily_stats = redis.call('HGETALL', daily_events_key)
local user_score = redis.call('ZSCORE', leaderboard_key, user_id)
local top_users = redis.call('ZREVRANGE', leaderboard_key, 0, 4, 'WITHSCORES')

-- Format response
local stats = {}
for i = 1, #daily_stats, 2 do
    stats[daily_stats[i]] = tonumber(daily_stats[i + 1])
end

local leaderboard = {}
for i = 1, #top_users, 2 do
    table.insert(leaderboard, {
        user = top_users[i],
        score = tonumber(top_users[i + 1])
    })
end

return {
    stream_id = stream_id,
    user_score = tonumber(user_score),
    daily_stats = stats,
    top_users = leaderboard
}
"""

# Test the analytics script
print("=== Advanced Analytics Test ===")

# Simulate multiple events
events = [
    ('page_view', 'user1', {'score': 1, 'page': '/home'}),
    ('purchase', 'user1', {'score': 10, 'amount': 99.99}),
    ('page_view', 'user2', {'score': 1, 'page': '/products'}),
    ('signup', 'user3', {'score': 5, 'referrer': 'google'}),
    ('purchase', 'user2', {'score': 15, 'amount': 149.99}),
    ('page_view', 'user1', {'score': 1, 'page': '/checkout'})
]

for event_type, user_id, metadata in events:
    result = client.eval(
        analytics_script,
        0,  # No KEYS needed
        event_type,
        user_id,
        str(int(time.time())),
        json.dumps(metadata)
    )
    
    print(f"\n{event_type} by {user_id}:")
    print(f"  Stream ID: {result['stream_id']}")
    print(f"  User Score: {result['user_score']}")
    print(f"  Daily Stats: {result['daily_stats']}")
    print(f"  Top Users: {result['top_users']}")

Pattern: Rate Limiting with Lua

import fakeredis
import time
import threading

class LuaRateLimiter:
    def __init__(self, client: fakeredis.FakeRedis):
        self.client = client
        
        # Sliding window rate limiter script
        self.rate_limit_script = """
        local key = KEYS[1]
        local window = tonumber(ARGV[1])
        local limit = tonumber(ARGV[2])
        local current_time = tonumber(ARGV[3])
        
        -- Remove expired entries
        redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window)
        
        -- Count current requests
        local current_requests = redis.call('ZCARD', key)
        
        if current_requests < limit then
            -- Add current request
            redis.call('ZADD', key, current_time, current_time)
            redis.call('EXPIRE', key, math.ceil(window))
            return {allowed = true, remaining = limit - current_requests - 1}
        else
            return {allowed = false, remaining = 0, retry_after = redis.call('ZRANGE', key, 0, 0)[1] + window - current_time}
        end
        """
        
        self.script_sha = self.client.script_load(self.rate_limit_script)
    
    def is_allowed(self, identifier: str, window_seconds: int, max_requests: int) -> dict:
        """Check if request is allowed under rate limit"""
        key = f"rate_limit:{identifier}"
        current_time = time.time()
        
        result = self.client.evalsha(
            self.script_sha,
            1,
            key,
            str(window_seconds),
            str(max_requests),
            str(current_time)
        )
        
        return result

# Usage example
client = fakeredis.FakeRedis()
rate_limiter = LuaRateLimiter(client)

def simulate_api_requests(user_id: str, num_requests: int):
    """Simulate API requests from a user"""
    print(f"\n=== User {user_id} making {num_requests} requests ===")
    
    for i in range(num_requests):
        # Rate limit: 5 requests per 10 seconds
        result = rate_limiter.is_allowed(user_id, window_seconds=10, max_requests=5)
        
        if result['allowed']:
            print(f"Request {i+1}: ✅ Allowed (remaining: {result['remaining']})")
        else:
            retry_after = result.get('retry_after', 0)
            print(f"Request {i+1}: ❌ Rate limited (retry after: {retry_after:.1f}s)")
        
        time.sleep(0.5)  # Small delay between requests

# Test rate limiting
simulate_api_requests('user1', 8)  # Should hit rate limit

print("\n=== Testing Multiple Users ===")

def concurrent_requests(user_id: str):
    """Concurrent requests from different users"""
    for i in range(3):
        result = rate_limiter.is_allowed(user_id, window_seconds=10, max_requests=5)
        status = "✅" if result['allowed'] else "❌"
        print(f"User {user_id} Request {i+1}: {status}")
        time.sleep(0.2)

# Test concurrent access
users = ['userA', 'userB', 'userC']
threads = []

for user_id in users:
    thread = threading.Thread(target=concurrent_requests, args=(user_id,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

Pattern: Custom Data Structures

import fakeredis

client = fakeredis.FakeRedis()

# Implement a Lua-based priority queue
priority_queue_script = """
local action = ARGV[1]
local queue_key = KEYS[1]

if action == 'enqueue' then
    local item = ARGV[2]
    local priority = tonumber(ARGV[3])
    
    -- Add item with priority as score (higher score = higher priority)
    redis.call('ZADD', queue_key, priority, item)
    
    return redis.call('ZCARD', queue_key)
    
elseif action == 'dequeue' then
    -- Get highest priority item
    local items = redis.call('ZREVRANGE', queue_key, 0, 0, 'WITHSCORES')
    
    if #items == 0 then
        return nil
    end
    
    local item = items[1]
    local priority = tonumber(items[2])
    
    -- Remove the item
    redis.call('ZREM', queue_key, item)
    
    return {item = item, priority = priority}
    
elseif action == 'peek' then
    -- Get highest priority item without removing
    local items = redis.call('ZREVRANGE', queue_key, 0, 0, 'WITHSCORES')
    
    if #items == 0 then
        return nil
    end
    
    return {item = items[1], priority = tonumber(items[2])}
    
elseif action == 'size' then
    return redis.call('ZCARD', queue_key)
    
elseif action == 'list' then
    -- Get all items sorted by priority
    local items = redis.call('ZREVRANGE', queue_key, 0, -1, 'WITHSCORES')
    local result = {}
    
    for i = 1, #items, 2 do
        table.insert(result, {
            item = items[i],
            priority = tonumber(items[i + 1])
        })
    end
    
    return result
end

return {error = 'Invalid action'}
"""

class LuaPriorityQueue:
    def __init__(self, client: fakeredis.FakeRedis, queue_name: str):
        self.client = client
        self.queue_key = f"priority_queue:{queue_name}"
        self.script_sha = self.client.script_load(priority_queue_script)
    
    def enqueue(self, item: str, priority: int) -> int:
        """Add item with priority to queue"""
        return self.client.evalsha(self.script_sha, 1, self.queue_key, 'enqueue', item, str(priority))
    
    def dequeue(self):
        """Remove and return highest priority item"""
        return self.client.evalsha(self.script_sha, 1, self.queue_key, 'dequeue')
    
    def peek(self):
        """Get highest priority item without removing"""
        return self.client.evalsha(self.script_sha, 1, self.queue_key, 'peek')
    
    def size(self) -> int:
        """Get queue size"""
        return self.client.evalsha(self.script_sha, 1, self.queue_key, 'size')
    
    def list_all(self):
        """Get all items sorted by priority"""
        return self.client.evalsha(self.script_sha, 1, self.queue_key, 'list')

# Test the priority queue
print("=== Priority Queue Test ===")

pq = LuaPriorityQueue(client, 'tasks')

# Add tasks with different priorities
tasks = [
    ('Send email', 3),
    ('Critical bug fix', 10),
    ('Update documentation', 2),
    ('Deploy to production', 9),
    ('Code review', 5),
    ('Security patch', 10)
]

print("Enqueuing tasks:")
for task, priority in tasks:
    size = pq.enqueue(task, priority)
    print(f"  Added '{task}' (priority {priority}) - Queue size: {size}")

print(f"\nQueue contents (sorted by priority):")
all_tasks = pq.list_all()
for task_info in all_tasks:
    print(f"  Priority {task_info['priority']}: {task_info['item']}")

print(f"\nProcessing tasks:")
while pq.size() > 0:
    # Peek at next task
    next_task = pq.peek()
    print(f"  Next: {next_task['item']} (priority {next_task['priority']})")
    
    # Dequeue and process
    processed = pq.dequeue()
    print(f"  Processed: {processed['item']}")
    
    if pq.size() > 0:
        print(f"  Remaining tasks: {pq.size()}")

print("All tasks completed!")

Install with Tessl CLI

npx tessl i tessl/pypi-fakeredis

docs

bitmap-operations.md

core-clients.md

generic-operations.md

geospatial-operations.md

hash-operations.md

index.md

list-operations.md

lua-scripting.md

pubsub-operations.md

server-management.md

server-operations.md

set-operations.md

sorted-set-operations.md

stack-extensions.md

stream-operations.md

string-operations.md

transaction-operations.md

valkey-support.md

tile.json