Python implementation of redis API, can be used for testing purposes
—
Redis Lua scripting support with script caching and execution. Lua scripts provide atomic operations, server-side computation, and complex logic execution with full access to Redis data structures and commands, enabling powerful custom operations that maintain data consistency.
Core functions for executing Lua scripts with Redis data access.
def eval(
self,
script: str,
numkeys: int,
*keys_and_args: KeyT
) -> Any: ...
def evalsha(
self,
sha: str,
numkeys: int,
*keys_and_args: KeyT
) -> Any: ...Functions for managing script cache and script lifecycle.
def script_load(self, script: str) -> str: ...
def script_exists(self, *args: str) -> List[bool]: ...
def script_flush(self, sync_type: Optional[str] = None) -> bool: ...
def script_kill(self) -> bool: ...Redis 3.2+ debugging capabilities for Lua script development.
def script_debug(self, *args) -> str: ...import fakeredis
client = fakeredis.FakeRedis()
# Simple script that increments a counter and returns the new value
increment_script = """
local key = KEYS[1]
local increment = tonumber(ARGV[1]) or 1
local current = redis.call('GET', key)
if current == false then
current = 0
else
current = tonumber(current)
end
local new_value = current + increment
redis.call('SET', key, new_value)
return new_value
"""
# Execute script directly
result1 = client.eval(increment_script, 1, 'counter', '5')
print(f"First increment: {result1}") # 5
result2 = client.eval(increment_script, 1, 'counter', '3')
print(f"Second increment: {result2}") # 8
# Verify the value
current_value = client.get('counter')
print(f"Current counter value: {current_value.decode()}") # 8import fakeredis
client = fakeredis.FakeRedis()
# Load script and get SHA hash
multi_key_script = """
-- Get multiple keys and return them as a table
local keys = KEYS
local result = {}
for i, key in ipairs(keys) do
local value = redis.call('GET', key)
if value ~= false then
result[key] = value
end
end
return result
"""
# Load script into Redis cache
script_sha = client.script_load(multi_key_script)
print(f"Script loaded with SHA: {script_sha}")
# Set some test data
client.mset({
'user:1:name': 'Alice',
'user:1:email': 'alice@example.com',
'user:1:age': '30',
'nonexistent:key': 'wont_be_set'
})
# Execute using SHA (more efficient for repeated executions)
result = client.evalsha(
script_sha,
3, # number of keys
'user:1:name', 'user:1:email', 'user:1:age' # keys
)
print("Script result:")
for key, value in result.items():
print(f" {key}: {value}")
# Check if script exists in cache
exists = client.script_exists(script_sha)
print(f"Script exists in cache: {exists}") # [True]import fakeredis
client = fakeredis.FakeRedis()
# Atomic transfer between accounts
transfer_script = """
local from_account = KEYS[1]
local to_account = KEYS[2]
local amount = tonumber(ARGV[1])
-- Get current balances
local from_balance = tonumber(redis.call('GET', from_account) or 0)
local to_balance = tonumber(redis.call('GET', to_account) or 0)
-- Check if sufficient funds
if from_balance < amount then
return {err = 'Insufficient funds'}
end
-- Perform transfer
redis.call('SET', from_account, from_balance - amount)
redis.call('SET', to_account, to_balance + amount)
-- Return new balances
return {
from_balance = from_balance - amount,
to_balance = to_balance + amount,
transferred = amount
}
"""
# Setup initial balances
client.set('account:alice', '1000')
client.set('account:bob', '500')
print("Initial balances:")
print(f"Alice: ${client.get('account:alice').decode()}")
print(f"Bob: ${client.get('account:bob').decode()}")
# Execute transfer
result = client.eval(
transfer_script,
2, # 2 keys
'account:alice', 'account:bob', # from, to
'150' # amount
)
print(f"\nTransfer result: {result}")
print("\nFinal balances:")
print(f"Alice: ${client.get('account:alice').decode()}")
print(f"Bob: ${client.get('account:bob').decode()}")
# Try transfer with insufficient funds
print("\n--- Testing insufficient funds ---")
result2 = client.eval(
transfer_script,
2,
'account:alice', 'account:bob',
'2000' # More than Alice has
)
print(f"Insufficient funds result: {result2}")import fakeredis
import json
client = fakeredis.FakeRedis()
# Script for managing a shopping cart with inventory checking
shopping_cart_script = """
local user_id = ARGV[1]
local action = ARGV[2]
local product_id = ARGV[3]
local quantity = tonumber(ARGV[4]) or 0
local cart_key = 'cart:' .. user_id
local product_key = 'product:' .. product_id
local inventory_key = 'inventory:' .. product_id
if action == 'add' then
-- Check inventory
local available = tonumber(redis.call('GET', inventory_key) or 0)
local current_in_cart = tonumber(redis.call('HGET', cart_key, product_id) or 0)
if available < (current_in_cart + quantity) then
return {error = 'Insufficient inventory', available = available, requested = current_in_cart + quantity}
end
-- Add to cart
redis.call('HINCRBY', cart_key, product_id, quantity)
local new_quantity = tonumber(redis.call('HGET', cart_key, product_id))
return {success = true, product = product_id, quantity = new_quantity}
elseif action == 'remove' then
-- Remove from cart
local current = tonumber(redis.call('HGET', cart_key, product_id) or 0)
local to_remove = math.min(quantity, current)
if to_remove > 0 then
redis.call('HINCRBY', cart_key, product_id, -to_remove)
local remaining = tonumber(redis.call('HGET', cart_key, product_id))
-- Remove if quantity is 0
if remaining <= 0 then
redis.call('HDEL', cart_key, product_id)
remaining = 0
end
return {success = true, product = product_id, quantity = remaining, removed = to_remove}
else
return {error = 'Product not in cart'}
end
elseif action == 'get' then
-- Get cart contents
local cart = redis.call('HGETALL', cart_key)
local result = {}
for i = 1, #cart, 2 do
local prod_id = cart[i]
local qty = tonumber(cart[i + 1])
result[prod_id] = qty
end
return {cart = result}
elseif action == 'checkout' then
-- Atomic checkout process
local cart = redis.call('HGETALL', cart_key)
local total_cost = 0
local items = {}
-- Check all items and calculate cost
for i = 1, #cart, 2 do
local prod_id = cart[i]
local qty = tonumber(cart[i + 1])
-- Check inventory
local available = tonumber(redis.call('GET', inventory_key:gsub(product_id, prod_id)) or 0)
if available < qty then
return {error = 'Insufficient inventory for ' .. prod_id, available = available, needed = qty}
end
-- Get product price
local price = tonumber(redis.call('HGET', 'product:' .. prod_id, 'price') or 0)
local item_cost = price * qty
total_cost = total_cost + item_cost
table.insert(items, {product = prod_id, quantity = qty, price = price, total = item_cost})
end
-- Deduct from inventory and clear cart
for _, item in ipairs(items) do
redis.call('DECRBY', 'inventory:' .. item.product, item.quantity)
end
redis.call('DEL', cart_key)
-- Create order
local order_id = 'order:' .. redis.call('INCR', 'order_counter')
redis.call('HSET', order_id, 'user_id', user_id, 'total_cost', total_cost, 'timestamp', redis.call('TIME')[1])
return {success = true, order_id = order_id, total_cost = total_cost, items = items}
end
return {error = 'Invalid action'}
"""
# Setup products and inventory
products = {
'laptop': {'price': 999.99, 'inventory': 5},
'mouse': {'price': 29.99, 'inventory': 50},
'keyboard': {'price': 89.99, 'inventory': 20}
}
for product_id, data in products.items():
client.hset(f'product:{product_id}', 'price', str(data['price']))
client.set(f'inventory:{product_id}', str(data['inventory']))
# Test shopping cart operations
user_id = 'user123'
# Add items to cart
print("=== Adding Items to Cart ===")
result1 = client.eval(shopping_cart_script, 0, user_id, 'add', 'laptop', '2')
print(f"Add laptop: {result1}")
result2 = client.eval(shopping_cart_script, 0, user_id, 'add', 'mouse', '3')
print(f"Add mouse: {result2}")
# Try to add more than available inventory
result3 = client.eval(shopping_cart_script, 0, user_id, 'add', 'laptop', '10')
print(f"Add too many laptops: {result3}")
# Get cart contents
cart_contents = client.eval(shopping_cart_script, 0, user_id, 'get')
print(f"Cart contents: {cart_contents}")
# Remove some items
result4 = client.eval(shopping_cart_script, 0, user_id, 'remove', 'laptop', '1')
print(f"Remove laptop: {result4}")
# Checkout
print("\n=== Checkout Process ===")
checkout_result = client.eval(shopping_cart_script, 0, user_id, 'checkout')
print(f"Checkout result: {checkout_result}")
# Check remaining inventory
print(f"\nRemaining inventory:")
for product_id in products:
remaining = client.get(f'inventory:{product_id}').decode()
print(f" {product_id}: {remaining}")import fakeredis
client = fakeredis.FakeRedis()
# Script with error handling
safe_script = """
-- Safe division with error handling
local dividend = tonumber(ARGV[1])
local divisor = tonumber(ARGV[2])
-- Input validation
if not dividend or not divisor then
return {error = 'Invalid input: arguments must be numbers'}
end
if divisor == 0 then
return {error = 'Division by zero is not allowed'}
end
-- Perform calculation
local result = dividend / divisor
-- Store result with timestamp
local result_key = KEYS[1]
redis.call('HSET', result_key, 'result', result, 'timestamp', redis.call('TIME')[1])
return {success = true, result = result}
"""
# Test valid operations
print("=== Testing Valid Operations ===")
result1 = client.eval(safe_script, 1, 'calc:result1', '10', '2')
print(f"10 / 2 = {result1}")
result2 = client.eval(safe_script, 1, 'calc:result2', '7.5', '1.5')
print(f"7.5 / 1.5 = {result2}")
# Test error conditions
print("\n=== Testing Error Conditions ===")
result3 = client.eval(safe_script, 1, 'calc:result3', '10', '0')
print(f"10 / 0 = {result3}")
result4 = client.eval(safe_script, 1, 'calc:result4', 'abc', '5')
print(f"'abc' / 5 = {result4}")
# Check stored results
print("\n=== Stored Results ===")
for i in [1, 2]:
stored = client.hgetall(f'calc:result{i}')
if stored:
result_val = stored[b'result'].decode()
timestamp = stored[b'timestamp'].decode()
print(f"Result {i}: {result_val} (calculated at {timestamp})")import fakeredis
import time
client = fakeredis.FakeRedis()
# Advanced script using multiple Redis data types
analytics_script = """
-- Advanced analytics script
local event_type = ARGV[1]
local user_id = ARGV[2]
local timestamp = tonumber(ARGV[3])
local metadata = cjson.decode(ARGV[4])
-- Keys for different data structures
local daily_events_key = 'analytics:daily:' .. os.date('%Y-%m-%d', timestamp)
local user_events_key = 'analytics:user:' .. user_id
local event_stream_key = 'analytics:stream:' .. event_type
local leaderboard_key = 'analytics:leaderboard:' .. event_type
-- 1. Increment daily event counter (Hash)
redis.call('HINCRBY', daily_events_key, event_type, 1)
-- 2. Add to user's event list (List with capping)
local event_data = cjson.encode({
type = event_type,
timestamp = timestamp,
metadata = metadata
})
redis.call('LPUSH', user_events_key, event_data)
redis.call('LTRIM', user_events_key, 0, 99) -- Keep only last 100 events
-- 3. Add to event stream (Stream)
local stream_id = redis.call('XADD', event_stream_key, '*',
'user_id', user_id,
'timestamp', timestamp,
'metadata', ARGV[4]
)
-- 4. Update user score in leaderboard (Sorted Set)
local score_increment = metadata.score or 1
redis.call('ZINCRBY', leaderboard_key, score_increment, user_id)
-- 5. Set key expiration for cleanup
redis.call('EXPIRE', daily_events_key, 86400 * 30) -- 30 days
redis.call('EXPIRE', user_events_key, 86400 * 7) -- 7 days
-- 6. Get current statistics
local daily_stats = redis.call('HGETALL', daily_events_key)
local user_score = redis.call('ZSCORE', leaderboard_key, user_id)
local top_users = redis.call('ZREVRANGE', leaderboard_key, 0, 4, 'WITHSCORES')
-- Format response
local stats = {}
for i = 1, #daily_stats, 2 do
stats[daily_stats[i]] = tonumber(daily_stats[i + 1])
end
local leaderboard = {}
for i = 1, #top_users, 2 do
table.insert(leaderboard, {
user = top_users[i],
score = tonumber(top_users[i + 1])
})
end
return {
stream_id = stream_id,
user_score = tonumber(user_score),
daily_stats = stats,
top_users = leaderboard
}
"""
# Test the analytics script
print("=== Advanced Analytics Test ===")
# Simulate multiple events
events = [
('page_view', 'user1', {'score': 1, 'page': '/home'}),
('purchase', 'user1', {'score': 10, 'amount': 99.99}),
('page_view', 'user2', {'score': 1, 'page': '/products'}),
('signup', 'user3', {'score': 5, 'referrer': 'google'}),
('purchase', 'user2', {'score': 15, 'amount': 149.99}),
('page_view', 'user1', {'score': 1, 'page': '/checkout'})
]
for event_type, user_id, metadata in events:
result = client.eval(
analytics_script,
0, # No KEYS needed
event_type,
user_id,
str(int(time.time())),
json.dumps(metadata)
)
print(f"\n{event_type} by {user_id}:")
print(f" Stream ID: {result['stream_id']}")
print(f" User Score: {result['user_score']}")
print(f" Daily Stats: {result['daily_stats']}")
print(f" Top Users: {result['top_users']}")import fakeredis
import time
import threading
class LuaRateLimiter:
def __init__(self, client: fakeredis.FakeRedis):
self.client = client
# Sliding window rate limiter script
self.rate_limit_script = """
local key = KEYS[1]
local window = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
-- Remove expired entries
redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window)
-- Count current requests
local current_requests = redis.call('ZCARD', key)
if current_requests < limit then
-- Add current request
redis.call('ZADD', key, current_time, current_time)
redis.call('EXPIRE', key, math.ceil(window))
return {allowed = true, remaining = limit - current_requests - 1}
else
return {allowed = false, remaining = 0, retry_after = redis.call('ZRANGE', key, 0, 0)[1] + window - current_time}
end
"""
self.script_sha = self.client.script_load(self.rate_limit_script)
def is_allowed(self, identifier: str, window_seconds: int, max_requests: int) -> dict:
"""Check if request is allowed under rate limit"""
key = f"rate_limit:{identifier}"
current_time = time.time()
result = self.client.evalsha(
self.script_sha,
1,
key,
str(window_seconds),
str(max_requests),
str(current_time)
)
return result
# Usage example
client = fakeredis.FakeRedis()
rate_limiter = LuaRateLimiter(client)
def simulate_api_requests(user_id: str, num_requests: int):
"""Simulate API requests from a user"""
print(f"\n=== User {user_id} making {num_requests} requests ===")
for i in range(num_requests):
# Rate limit: 5 requests per 10 seconds
result = rate_limiter.is_allowed(user_id, window_seconds=10, max_requests=5)
if result['allowed']:
print(f"Request {i+1}: ✅ Allowed (remaining: {result['remaining']})")
else:
retry_after = result.get('retry_after', 0)
print(f"Request {i+1}: ❌ Rate limited (retry after: {retry_after:.1f}s)")
time.sleep(0.5) # Small delay between requests
# Test rate limiting
simulate_api_requests('user1', 8) # Should hit rate limit
print("\n=== Testing Multiple Users ===")
def concurrent_requests(user_id: str):
"""Concurrent requests from different users"""
for i in range(3):
result = rate_limiter.is_allowed(user_id, window_seconds=10, max_requests=5)
status = "✅" if result['allowed'] else "❌"
print(f"User {user_id} Request {i+1}: {status}")
time.sleep(0.2)
# Test concurrent access
users = ['userA', 'userB', 'userC']
threads = []
for user_id in users:
thread = threading.Thread(target=concurrent_requests, args=(user_id,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()import fakeredis
client = fakeredis.FakeRedis()
# Implement a Lua-based priority queue
priority_queue_script = """
local action = ARGV[1]
local queue_key = KEYS[1]
if action == 'enqueue' then
local item = ARGV[2]
local priority = tonumber(ARGV[3])
-- Add item with priority as score (higher score = higher priority)
redis.call('ZADD', queue_key, priority, item)
return redis.call('ZCARD', queue_key)
elseif action == 'dequeue' then
-- Get highest priority item
local items = redis.call('ZREVRANGE', queue_key, 0, 0, 'WITHSCORES')
if #items == 0 then
return nil
end
local item = items[1]
local priority = tonumber(items[2])
-- Remove the item
redis.call('ZREM', queue_key, item)
return {item = item, priority = priority}
elseif action == 'peek' then
-- Get highest priority item without removing
local items = redis.call('ZREVRANGE', queue_key, 0, 0, 'WITHSCORES')
if #items == 0 then
return nil
end
return {item = items[1], priority = tonumber(items[2])}
elseif action == 'size' then
return redis.call('ZCARD', queue_key)
elseif action == 'list' then
-- Get all items sorted by priority
local items = redis.call('ZREVRANGE', queue_key, 0, -1, 'WITHSCORES')
local result = {}
for i = 1, #items, 2 do
table.insert(result, {
item = items[i],
priority = tonumber(items[i + 1])
})
end
return result
end
return {error = 'Invalid action'}
"""
class LuaPriorityQueue:
def __init__(self, client: fakeredis.FakeRedis, queue_name: str):
self.client = client
self.queue_key = f"priority_queue:{queue_name}"
self.script_sha = self.client.script_load(priority_queue_script)
def enqueue(self, item: str, priority: int) -> int:
"""Add item with priority to queue"""
return self.client.evalsha(self.script_sha, 1, self.queue_key, 'enqueue', item, str(priority))
def dequeue(self):
"""Remove and return highest priority item"""
return self.client.evalsha(self.script_sha, 1, self.queue_key, 'dequeue')
def peek(self):
"""Get highest priority item without removing"""
return self.client.evalsha(self.script_sha, 1, self.queue_key, 'peek')
def size(self) -> int:
"""Get queue size"""
return self.client.evalsha(self.script_sha, 1, self.queue_key, 'size')
def list_all(self):
"""Get all items sorted by priority"""
return self.client.evalsha(self.script_sha, 1, self.queue_key, 'list')
# Test the priority queue
print("=== Priority Queue Test ===")
pq = LuaPriorityQueue(client, 'tasks')
# Add tasks with different priorities
tasks = [
('Send email', 3),
('Critical bug fix', 10),
('Update documentation', 2),
('Deploy to production', 9),
('Code review', 5),
('Security patch', 10)
]
print("Enqueuing tasks:")
for task, priority in tasks:
size = pq.enqueue(task, priority)
print(f" Added '{task}' (priority {priority}) - Queue size: {size}")
print(f"\nQueue contents (sorted by priority):")
all_tasks = pq.list_all()
for task_info in all_tasks:
print(f" Priority {task_info['priority']}: {task_info['item']}")
print(f"\nProcessing tasks:")
while pq.size() > 0:
# Peek at next task
next_task = pq.peek()
print(f" Next: {next_task['item']} (priority {next_task['priority']})")
# Dequeue and process
processed = pq.dequeue()
print(f" Processed: {processed['item']}")
if pq.size() > 0:
print(f" Remaining tasks: {pq.size()}")
print("All tasks completed!")Install with Tessl CLI
npx tessl i tessl/pypi-fakeredisdocs