CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-redis

Python client for Redis database and key-value store

Pending
Overview
Eval results
Files

docs/modules/redis-timeseries.md

RedisTimeSeries

RedisTimeSeries provides time series data structure support for Redis, enabling efficient storage, querying, and analysis of time-stamped data. It supports downsampling, aggregations, and real-time analytics for monitoring and IoT applications.

Capabilities

Time Series Creation and Management

Create and configure time series with retention policies and labels.

# Create a time series stored at `key`.
# Optional settings: retention_msecs (retention period, milliseconds per the
# parameter name), uncompressed, labels (string key/value pairs) and
# duplicate_policy.
# NOTE(review): the usage examples on this page call this operation as
# r.ts().create(...); confirm how the ts_-prefixed name maps onto the client.
def ts_create(
    self,
    key: str,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> str: ...

# Delete samples of `key` that fall between from_timestamp and to_timestamp.
# Returns an int -- presumably the number of samples removed (TODO confirm,
# including whether the range bounds are inclusive).
def ts_del(
    self,
    key: str,
    from_timestamp: int,
    to_timestamp: int
) -> int: ...

# Change the configuration of the existing series `key`: any of
# retention_msecs, labels and duplicate_policy may be supplied.
# NOTE(review): presumably arguments left as None keep their current
# setting -- confirm.
def ts_alter(
    self,
    key: str,
    retention_msecs: Optional[int] = None,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> str: ...

# Return metadata about the series `key`, annotated here as a dict.
# NOTE(review): the downsampling example on this page reads a 'rules' entry
# from the result of r.ts().info(...); confirm the actual return type.
def ts_info(self, key: str) -> Dict[str, Any]: ...

Data Ingestion

Add time series data points with timestamps and values.

# Append one sample (timestamp, value) to the series `key`.
# The usage examples pass either an epoch timestamp in milliseconds or the
# string "*" as `timestamp`, and treat the int return value as the timestamp
# of the stored sample.
# retention_msecs / uncompressed / labels / duplicate_policy mirror the
# creation options -- presumably applied when the series does not exist yet
# (TODO confirm).
def ts_add(
    self,
    key: str,
    timestamp: Union[int, str],
    value: float,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> int: ...

# Add several samples in one call; each argument is a
# (key, timestamp, value) tuple. Returns a list of ints, presumably one
# per submitted sample (TODO confirm).
# NOTE(review): redis-py's TimeSeries.madd is believed to take a single
# list of such tuples rather than variadic arguments -- confirm against
# the client before relying on this signature.
def ts_madd(
    self,
    *args: Tuple[str, Union[int, str], float]
) -> List[int]: ...

# Increment-style write to the series `key` by `value`, optionally at an
# explicit `timestamp`.
# NOTE(review): presumably adds a new sample equal to the latest value plus
# `value`; retention_msecs / uncompressed / labels look like creation
# options -- confirm both.
def ts_incrby(
    self,
    key: str,
    value: float,
    timestamp: Optional[Union[int, str]] = None,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None
) -> int: ...

# Decrement-style write to the series `key` by `value`, optionally at an
# explicit `timestamp`. Same parameter list as ts_incrby.
# NOTE(review): presumably adds a new sample equal to the latest value minus
# `value` -- confirm.
def ts_decrby(
    self,
    key: str,
    value: float,
    timestamp: Optional[Union[int, str]] = None,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None
) -> int: ...

Data Retrieval and Querying

Query time series data with range queries and aggregations.

# Return the latest sample of `key` as a (timestamp, value) pair.
# The Optional return suggests None when there is no sample (the usage
# example guards with `if latest_temp:`) -- confirm the empty-series case.
def ts_get(self, key: str) -> Optional[Tuple[int, float]]: ...

# Fetch the latest sample of every series matching `filters`
# (label expressions such as "location=server_room" in the usage example).
# with_labels=True asks for each series' labels to be included.
# NOTE(review): the exact shape of the returned dicts is not established
# here -- confirm against the client.
def ts_mget(
    self,
    filters: List[str],
    with_labels: bool = False
) -> List[Dict[str, Any]]: ...

# Query samples of `key` between from_timestamp and to_timestamp, returned
# as (timestamp, value) pairs.
# count caps the number of results; aggregation_type + bucket_size_msec
# request per-bucket aggregates (the examples use "AVG", "MIN", "MAX" with
# bucket sizes in milliseconds).
# filter_by_ts / filter_by_min_value / filter_by_max_value presumably
# restrict results to given timestamps / a value range (TODO confirm).
# NOTE(review): with_labels on a single-key range looks unusual -- confirm
# it is accepted by the client.
def ts_range(
    self,
    key: str,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None
) -> List[Tuple[int, float]]: ...

# Same parameters and return annotation as ts_range.
# NOTE(review): per the name, presumably yields the samples in reverse
# (newest-first) order -- confirm.
def ts_revrange(
    self,
    key: str,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None
) -> List[Tuple[int, float]]: ...

# Range query across every series matching `filters` (label expressions),
# between from_timestamp and to_timestamp.
# Aggregation, count, with_labels and filter_by_* options parallel the
# single-series range query; groupby / reduce presumably group the matching
# series by a label and combine them with a reducer (TODO confirm).
# NOTE(review): the exact shape of the returned dicts is not established
# here -- confirm against the client.
def ts_mrange(
    self,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    filters: List[str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None,
    groupby: Optional[str] = None,
    reduce: Optional[str] = None
) -> List[Dict[str, Any]]: ...

# Same parameters and return annotation as ts_mrange.
# NOTE(review): per the name, presumably yields each series' samples in
# reverse (newest-first) order -- confirm.
def ts_mrevrange(
    self,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    filters: List[str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None,
    groupby: Optional[str] = None,
    reduce: Optional[str] = None
) -> List[Dict[str, Any]]: ...

Rules and Downsampling

Create compaction rules for automatic downsampling and aggregation.

# Create a compaction (downsampling) rule from source_key into dest_key,
# aggregating with aggregation_type over buckets of bucket_size_msec
# milliseconds. The usage example creates dest_key beforehand and uses
# "AVG" with 1-hour and 24-hour buckets.
def ts_createrule(
    self,
    source_key: str,
    dest_key: str,
    aggregation_type: str,
    bucket_size_msec: int
) -> str: ...

# Remove the compaction rule that links source_key to dest_key.
# NOTE(review): presumably leaves the data already written to dest_key in
# place -- confirm.
def ts_deleterule(
    self,
    source_key: str,
    dest_key: str
) -> str: ...

Query and Metadata Operations

Query time series metadata and perform administrative operations.

# Return a list of strings for the given label `filters` -- presumably the
# keys of all series whose labels match (TODO confirm).
def ts_queryindex(self, filters: List[str]) -> List[str]: ...

Usage Examples

Basic Time Series Operations

import redis
import time
from datetime import datetime, timedelta

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Create time series for temperature monitoring
def create_temperature_series():
    """Create the temperature and humidity series for sensor1.

    Both series use the same 24-hour retention and carry sensor_id,
    location and type labels so they can be found by label filters.
    """
    one_day_ms = 86400000  # 24 hours retention

    temperature_labels = {
        "sensor_id": "sensor1",
        "location": "server_room",
        "type": "temperature",
    }
    outcome = r.ts().create(
        "temperature:sensor1",
        retention_msecs=one_day_ms,
        labels=temperature_labels,
    )
    print(f"Created temperature series: {outcome}")

    # Humidity series: same sensor and location, different type label
    humidity_labels = {
        "sensor_id": "sensor1",
        "location": "server_room",
        "type": "humidity",
    }
    r.ts().create(
        "humidity:sensor1",
        retention_msecs=one_day_ms,
        labels=humidity_labels,
    )
    print("Created humidity series")

create_temperature_series()

# Add data points
def add_sensor_data():
    """Write sample readings to the temperature and humidity series.

    Two readings are added one at a time, then four more are written in a
    single batch with madd().
    """
    current_time = int(time.time() * 1000)  # Current timestamp in milliseconds

    # Add individual data points; add() returns the timestamp that was stored
    temp_timestamp = r.ts().add("temperature:sensor1", current_time, 23.5)
    humidity_timestamp = r.ts().add("humidity:sensor1", current_time, 65.2)

    print(f"Added temperature reading at {temp_timestamp}")
    print(f"Added humidity reading at {humidity_timestamp}")

    # Add multiple data points at once: (key, timestamp, value) tuples
    sensor_data = [
        ("temperature:sensor1", current_time + 1000, 23.7),
        ("humidity:sensor1", current_time + 1000, 64.8),
        ("temperature:sensor1", current_time + 2000, 24.1),
        ("humidity:sensor1", current_time + 2000, 66.1),
    ]

    # madd() takes ONE list of tuples; unpacking it with * would pass four
    # positional arguments and raise TypeError.
    timestamps = r.ts().madd(sensor_data)
    print(f"Batch added data points: {timestamps}")

add_sensor_data()

Time Series Queries and Aggregations

import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def query_sensor_data():
    """Print the newest temperature/humidity samples and the last hour of
    temperature readings."""

    def as_datetime(ms):
        # Samples carry epoch milliseconds; datetime wants seconds
        return datetime.fromtimestamp(ms / 1000)

    # Newest sample of each series
    latest_temp = r.ts().get("temperature:sensor1")
    latest_humidity = r.ts().get("humidity:sensor1")

    if latest_temp:
        dt = as_datetime(latest_temp[0])
        value = latest_temp[1]
        print(f"Latest temperature: {value}°C at {dt}")

    if latest_humidity:
        dt = as_datetime(latest_humidity[0])
        value = latest_humidity[1]
        print(f"Latest humidity: {value}% at {dt}")

    # Window covering the past hour
    one_hour_ms = 60 * 60 * 1000
    end_time = int(time.time() * 1000)
    start_time = end_time - one_hour_ms

    temp_data = r.ts().range("temperature:sensor1", start_time, end_time, count=100)

    print(f"Temperature readings in last hour: {len(temp_data)} points")
    tail = temp_data[-5:]  # Last 5 of the returned readings
    for timestamp, value in tail:
        dt = as_datetime(timestamp)
        print(f"  {dt}: {value}°C")

query_sensor_data()

Advanced Aggregation Queries

import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def advanced_analytics():
    """Print hourly average temperatures and the min/max over the last 24 hours.

    The min/max queries use 24-hour aggregation buckets. Buckets are aligned
    to fixed boundaries rather than to the query's start time, so a rolling
    24-hour window usually overlaps two buckets. The overall min/max is
    therefore reduced across every returned bucket instead of reading only
    the first one.
    """
    end_time = int(time.time() * 1000)
    start_time = end_time - (24 * 60 * 60 * 1000)  # 24 hours ago

    # Get average temperature per hour
    hourly_avg = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        aggregation_type="AVG",
        bucket_size_msec=3600000  # 1 hour buckets
    )

    print("Hourly average temperatures:")
    for timestamp, avg_temp in hourly_avg:
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f"  {dt.strftime('%Y-%m-%d %H:00')}: {avg_temp:.2f}°C")

    # Get min/max temperature in last 24 hours
    min_temp = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        aggregation_type="MIN",
        bucket_size_msec=24 * 3600000  # 24 hour bucket
    )

    max_temp = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        aggregation_type="MAX",
        bucket_size_msec=24 * 3600000
    )

    if min_temp and max_temp:
        # Combine the per-bucket aggregates: the window may span two buckets
        lowest = min(value for _, value in min_temp)
        highest = max(value for _, value in max_temp)
        print(f"24h Min temperature: {lowest:.2f}°C")
        print(f"24h Max temperature: {highest:.2f}°C")

advanced_analytics()

Multi-Series Queries

import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def multi_sensor_queries():
    """Print the latest sample and recent 10-minute averages for every
    series labelled location=server_room.

    With the default (RESP2) protocol, redis-py returns multi-series results
    as a list of single-entry dicts keyed by series name:

        mget()   -> [{key: [labels, timestamp, value]}, ...]
        mrange() -> [{key: [labels, [(timestamp, value), ...]]}, ...]

    A series without samples appears in mget() with timestamp and value
    set to None. Connections opened with protocol=3 return a different
    structure and are not handled here.
    """
    # Get latest values from all sensors
    latest_readings = r.ts().mget(
        filters=["location=server_room"],
        with_labels=True
    )

    print("Latest readings from all server room sensors:")
    for reading in latest_readings:
        for labels, timestamp, value in reading.values():
            if timestamp is None:
                continue  # series exists but holds no samples yet
            dt = datetime.fromtimestamp(timestamp / 1000)
            sensor_type = labels.get('type', 'unknown')
            print(f"  {sensor_type}: {value} at {dt}")

    # Query range data from multiple series
    end_time = int(time.time() * 1000)
    start_time = end_time - (3600000)  # 1 hour ago

    multi_range_data = r.ts().mrange(
        start_time,
        end_time,
        filters=["location=server_room"],
        aggregation_type="AVG",
        bucket_size_msec=600000,  # 10-minute buckets
        with_labels=True
    )

    print("\nAverage readings per 10 minutes (last hour):")
    for series in multi_range_data:
        for key, (labels, values) in series.items():
            sensor_type = labels.get('type', 'unknown')

            print(f"\n{sensor_type} ({key}):")
            for timestamp, value in values[-3:]:  # Last 3 readings
                dt = datetime.fromtimestamp(timestamp / 1000)
                print(f"  {dt.strftime('%H:%M')}: {value:.2f}")

multi_sensor_queries()

Automatic Downsampling with Rules

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def setup_downsampling():
    """Create hourly and daily destination series and chain compaction rules.

    Raw readings are averaged into the hourly series, and the hourly series
    is averaged into the daily series. The destination series are created
    before the rules that target them.
    """
    # Create destination series for downsampled data
    r.ts().create(
        "temperature:sensor1:hourly",
        retention_msecs=30 * 24 * 3600000,  # 30 days retention
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "temperature",
            "resolution": "hourly"
        }
    )

    r.ts().create(
        "temperature:sensor1:daily",
        retention_msecs=365 * 24 * 3600000,  # 1 year retention
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "temperature",
            "resolution": "daily"
        }
    )

    # Create downsampling rules
    # Raw data -> Hourly averages
    r.ts().createrule(
        "temperature:sensor1",
        "temperature:sensor1:hourly",
        "AVG",
        3600000  # 1 hour
    )

    # Hourly data -> Daily averages
    r.ts().createrule(
        "temperature:sensor1:hourly",
        "temperature:sensor1:daily",
        "AVG",
        24 * 3600000  # 24 hours
    )

    print("Created downsampling rules")

    # Verify rules were created.
    # info() returns a TSInfo object whose fields are attributes; its get()
    # takes no default argument, so read the attribute directly.
    info = r.ts().info("temperature:sensor1")
    print(f"Rules for sensor1: {info.rules}")

setup_downsampling()

def query_downsampled_data():
    """Print the downsampled temperature series at daily and hourly resolution."""
    hour_ms = 3600000
    now_ms = int(time.time() * 1000)

    # Daily averages covering the past 7 days
    week_ago_ms = now_ms - (7 * 24 * hour_ms)
    daily_points = r.ts().range("temperature:sensor1:daily", week_ago_ms, now_ms)

    print("Daily temperature averages (last 7 days):")
    for stamp, mean_temp in daily_points:
        day = datetime.fromtimestamp(stamp / 1000)
        print(f"  {day.strftime('%Y-%m-%d')}: {mean_temp:.2f}°C")

    # Hourly averages covering the past 24 hours
    day_ago_ms = now_ms - (24 * hour_ms)
    hourly_points = r.ts().range("temperature:sensor1:hourly", day_ago_ms, now_ms)

    print("\nHourly temperature averages (last 24 hours):")
    recent_hours = hourly_points[-6:]  # Last 6 hours
    for stamp, mean_temp in recent_hours:
        moment = datetime.fromtimestamp(stamp / 1000)
        print(f"  {moment.strftime('%H:00')}: {mean_temp:.2f}°C")

# Wait a moment for rules to process, then query
# NOTE(review): a one-second pause may not be enough for downsampled samples
# to appear -- compaction rules presumably only write a bucket to the
# destination series once that bucket (1 hour / 24 hours here) has closed.
# Expect empty output on a fresh data set; confirm against the
# TS.CREATERULE documentation.
time.sleep(1)
query_downsampled_data()

Real-time Monitoring and Alerting

import redis
import time
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

class TemperatureMonitor:
    """Record temperature readings for sensor1 and raise threshold alerts.

    Readings go to the "temperature:sensor1" series. Readings above
    alert_threshold_high or below alert_threshold_low are printed and also
    recorded in the "alerts:temperature:sensor1" series.
    """

    def __init__(self, redis_client):
        """Store the client used for all time series calls and set thresholds."""
        self.r = redis_client
        self.running = False
        self.alert_threshold_high = 25.0
        self.alert_threshold_low = 20.0

    def add_reading(self, temperature):
        """Add a temperature reading and check for alerts.

        Returns the timestamp reported by the server for the stored sample.
        """
        # Use the injected client, not a module-level one, so the monitor
        # works with whatever connection it was constructed with.
        # "*" lets the server assign the timestamp.
        timestamp = self.r.ts().add("temperature:sensor1", "*", temperature)

        # Check for alerts
        if temperature > self.alert_threshold_high:
            self.trigger_alert("HIGH", temperature, timestamp)
        elif temperature < self.alert_threshold_low:
            self.trigger_alert("LOW", temperature, timestamp)

        return timestamp

    def trigger_alert(self, alert_type, temperature, timestamp):
        """Print an alert and record it in the alert series.

        alert_type is "HIGH" or "LOW"; timestamp is in epoch milliseconds.
        """
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f"🚨 ALERT: {alert_type} temperature {temperature}°C at {dt}")

        # Store alert in separate time series, created on first use
        alert_series = "alerts:temperature:sensor1"
        if not self.series_exists(alert_series):
            self.r.ts().create(
                alert_series,
                labels={
                    "sensor_id": "sensor1",
                    "type": "alert",
                    "metric": "temperature"
                }
            )

        # Store alert with severity as value (2 = HIGH, 1 = anything else)
        severity = 2 if alert_type == "HIGH" else 1
        self.r.ts().add(alert_series, timestamp, severity)

    def series_exists(self, key):
        """Return True if info() succeeds for key, False if the server
        rejects the request (e.g. the key does not exist).

        Only server-side command errors are treated as "missing";
        connection failures and programming errors propagate.
        """
        try:
            self.r.ts().info(key)
        except redis.ResponseError:
            return False
        return True

    def get_recent_stats(self, minutes=60):
        """Return count/min/max/avg/latest of the readings from the last
        `minutes` minutes, or None when there are no readings."""
        end_time = int(time.time() * 1000)
        start_time = end_time - (minutes * 60 * 1000)

        # Get recent data
        data = self.r.ts().range("temperature:sensor1", start_time, end_time)

        if not data:
            return None

        temperatures = [value for _, value in data]

        return {
            "count": len(temperatures),
            "min": min(temperatures),
            "max": max(temperatures),
            "avg": sum(temperatures) / len(temperatures),
            "latest": temperatures[-1]
        }

# Usage example: feed a fixed sequence of readings through the monitor
monitor = TemperatureMonitor(r)

# Sequence chosen to cross both the high and the low alert thresholds
test_temperatures = [21.5, 22.1, 23.8, 24.5, 25.8, 26.2, 24.1, 22.9, 19.5, 18.2, 21.0]

print("Simulating temperature readings...")
for reading in test_temperatures:
    monitor.add_reading(reading)
    time.sleep(0.1)

# Summarize what was recorded in the last five minutes
stats = monitor.get_recent_stats(minutes=5)
if stats:
    reading_count = stats['count']
    lowest = stats['min']
    highest = stats['max']
    mean = stats['avg']
    newest = stats['latest']
    print("\nRecent temperature statistics:")
    print(f"  Count: {reading_count} readings")
    print(f"  Min: {lowest:.1f}°C")
    print(f"  Max: {highest:.1f}°C")
    print(f"  Average: {mean:.1f}°C")
    print(f"  Latest: {newest:.1f}°C")

Time Series Data Export and Analysis

import redis
import csv
import json
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def export_timeseries_data():
    """Dump the last 24 hours of temperature and humidity samples to
    /tmp/sensor_data.csv and /tmp/sensor_data.json."""

    def iso(ms):
        # Epoch milliseconds -> ISO-8601 text
        return datetime.fromtimestamp(ms / 1000).isoformat()

    # Fetch both series for the export window
    end_time = int(time.time() * 1000)
    start_time = end_time - (24 * 3600000)  # Last 24 hours

    temp_data = r.ts().range("temperature:sensor1", start_time, end_time)
    humidity_data = r.ts().range("humidity:sensor1", start_time, end_time)

    # CSV: one row per timestamp seen in either series
    with open('/tmp/sensor_data.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['timestamp', 'datetime', 'temperature', 'humidity'])

        # Index each series by timestamp so the two can be joined
        temp_by_ts = dict(temp_data)
        humidity_by_ts = dict(humidity_data)

        for stamp in sorted(temp_by_ts.keys() | humidity_by_ts.keys()):
            # A series with no sample at this timestamp gets an empty cell
            writer.writerow([
                stamp,
                iso(stamp),
                temp_by_ts.get(stamp, ''),
                humidity_by_ts.get(stamp, ''),
            ])

    print("Exported data to /tmp/sensor_data.csv")

    # JSON: metadata header plus the raw samples of each series
    metadata = {
        "export_time": datetime.now().isoformat(),
        "start_time": iso(start_time),
        "end_time": iso(end_time),
        "sensor_id": "sensor1",
        "location": "server_room"
    }
    temperature_rows = [{"timestamp": ts, "value": temp} for ts, temp in temp_data]
    humidity_rows = [{"timestamp": ts, "value": humidity} for ts, humidity in humidity_data]
    export_data = {
        "metadata": metadata,
        "data": {
            "temperature": temperature_rows,
            "humidity": humidity_rows
        }
    }

    with open('/tmp/sensor_data.json', 'w') as jsonfile:
        json.dump(export_data, jsonfile, indent=2)

    print("Exported data to /tmp/sensor_data.json")

def analyze_sensor_patterns():
    """Print average and range of temperature per hour of day, based on
    hourly averages from the last 7 days."""
    now_ms = int(time.time() * 1000)
    week_ago_ms = now_ms - (7 * 24 * 3600000)  # Last 7 days

    # Hourly averages are the input for the pattern analysis
    hourly_temps = r.ts().range(
        "temperature:sensor1",
        week_ago_ms,
        now_ms,
        aggregation_type="AVG",
        bucket_size_msec=3600000,  # 1 hour
    )

    if not hourly_temps:
        print("No data available for analysis")
        return

    # Bucket the averages by the hour of day they fall in
    by_hour = {}
    for stamp, reading in hourly_temps:
        hour_of_day = datetime.fromtimestamp(stamp / 1000).hour
        by_hour.setdefault(hour_of_day, []).append(reading)

    print("Temperature patterns by hour of day:")
    for hour, temps in sorted(by_hour.items()):
        avg_temp = sum(temps) / len(temps)
        min_temp = min(temps)
        max_temp = max(temps)
        print(f"  {hour:02d}:00 - Avg: {avg_temp:.1f}°C, Range: {min_temp:.1f}-{max_temp:.1f}°C")

export_timeseries_data()
analyze_sensor_patterns()

Install with Tessl CLI

npx tessl i tessl/pypi-redis

docs

modules

redis-bloom.md

redis-json.md

redis-search.md

redis-timeseries.md

async-support.md

cluster-support.md

connection-management.md

core-client.md

distributed-locking.md

error-handling.md

high-availability.md

index.md

pipelines-transactions.md

pubsub-messaging.md

tile.json