Python client for the Redis database and key-value store.
—
RedisTimeSeries provides time series data structure support for Redis, enabling efficient storage, querying, and analysis of time-stamped data. It supports downsampling, aggregations, and real-time analytics for monitoring and IoT applications.
Create and configure time series with retention policies and labels.
def ts_create(
    self,
    key: str,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> str:
    """Create a new time series at *key* (TS.CREATE), optionally with a
    retention window (ms), compression mode, labels, and duplicate policy."""
    ...
def ts_del(
    self,
    key: str,
    from_timestamp: int,
    to_timestamp: int
) -> int:
    """Delete samples of *key* between the two timestamps inclusive (TS.DEL);
    returns the number of samples removed."""
    ...
def ts_alter(
    self,
    key: str,
    retention_msecs: Optional[int] = None,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> str:
    """Update retention, labels, or duplicate policy of an existing series (TS.ALTER)."""
    ...
def ts_info(self, key: str) -> Dict[str, Any]:
    """Return metadata about the series at *key* (TS.INFO): retention, labels, rules, etc."""
    ...
# Add time series data points with timestamps and values.
def ts_add(
    self,
    key: str,
    timestamp: Union[int, str],
    value: float,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None,
    duplicate_policy: Optional[str] = None
) -> int:
    """Append one sample to *key* (TS.ADD); timestamp may be an int (ms) or "*"
    for server time. Returns the timestamp of the inserted sample."""
    ...
def ts_madd(
    self,
    *args: Tuple[str, Union[int, str], float]
) -> List[int]:
    """Append samples to multiple series in one call (TS.MADD); each arg is a
    (key, timestamp, value) tuple. Returns the inserted timestamps."""
    ...
def ts_incrby(
    self,
    key: str,
    value: float,
    timestamp: Optional[Union[int, str]] = None,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None
) -> int:
    """Increase the latest sample's value by *value* (TS.INCRBY), creating the
    series if needed. Returns the timestamp of the updated sample."""
    ...
def ts_decrby(
    self,
    key: str,
    value: float,
    timestamp: Optional[Union[int, str]] = None,
    retention_msecs: Optional[int] = None,
    uncompressed: bool = False,
    labels: Optional[Dict[str, str]] = None
) -> int:
    """Decrease the latest sample's value by *value* (TS.DECRBY), creating the
    series if needed. Returns the timestamp of the updated sample."""
    ...
# Query time series data with range queries and aggregations.
def ts_get(self, key: str) -> Optional[Tuple[int, float]]:
    """Return the latest (timestamp, value) sample of *key* (TS.GET), or None if empty."""
    ...
def ts_mget(
    self,
    filters: List[str],
    with_labels: bool = False
) -> List[Dict[str, Any]]:
    """Return the latest sample of every series matching *filters* (TS.MGET),
    optionally including each series' labels."""
    ...
def ts_range(
    self,
    key: str,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None
) -> List[Tuple[int, float]]:
    """Query samples of *key* in ascending time order (TS.RANGE), with optional
    server-side aggregation buckets, result limit, and timestamp/value filters."""
    ...
def ts_revrange(
    self,
    key: str,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None
) -> List[Tuple[int, float]]:
    """Query samples of *key* in descending time order (TS.REVRANGE); same
    options as ts_range."""
    ...
def ts_mrange(
    self,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    filters: List[str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None,
    groupby: Optional[str] = None,
    reduce: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Range-query every series matching *filters*, ascending (TS.MRANGE);
    supports aggregation buckets and label-based groupby/reduce."""
    ...
def ts_mrevrange(
    self,
    from_timestamp: Union[int, str],
    to_timestamp: Union[int, str],
    filters: List[str],
    count: Optional[int] = None,
    aggregation_type: Optional[str] = None,
    bucket_size_msec: Optional[int] = None,
    with_labels: bool = False,
    filter_by_ts: Optional[List[int]] = None,
    filter_by_min_value: Optional[float] = None,
    filter_by_max_value: Optional[float] = None,
    groupby: Optional[str] = None,
    reduce: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Range-query every series matching *filters*, descending (TS.MREVRANGE);
    same options as ts_mrange."""
    ...
# Create compaction rules for automatic downsampling and aggregation.
def ts_createrule(
    self,
    source_key: str,
    dest_key: str,
    aggregation_type: str,
    bucket_size_msec: int
) -> str:
    """Create a compaction rule (TS.CREATERULE): aggregate *source_key* into
    *dest_key* using *aggregation_type* over fixed-size time buckets."""
    ...
def ts_deleterule(
    self,
    source_key: str,
    dest_key: str
) -> str:
    """Remove the compaction rule from *source_key* to *dest_key* (TS.DELETERULE)."""
    ...
# Query time series metadata and perform administrative operations.
def ts_queryindex(self, filters: List[str]) -> List[str]:
    """Return the keys of every series whose labels match *filters* (TS.QUERYINDEX)."""
    ...


import redis
import time
from datetime import datetime, timedelta
# decode_responses=True makes replies come back as str instead of bytes.
r = redis.Redis(host='localhost', port=6379, decode_responses=True)


# Create time series for temperature monitoring
def create_temperature_series():
    """Create temperature and humidity series with 24h retention and
    sensor/location labels (labels enable TS.MGET/TS.MRANGE filtering)."""
    # Create time series with retention and labels
    result = r.ts().create(
        "temperature:sensor1",
        retention_msecs=86400000,  # 24 hours retention
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "temperature"
        }
    )
    print(f"Created temperature series: {result}")
    # Create series for humidity
    r.ts().create(
        "humidity:sensor1",
        retention_msecs=86400000,
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "humidity"
        }
    )
    print("Created humidity series")


create_temperature_series()
# Add data points
def add_sensor_data():
    """Insert individual samples with TS.ADD and a batch with TS.MADD,
    using explicit millisecond timestamps."""
    current_time = int(time.time() * 1000)  # Current timestamp in milliseconds
    # Add individual data points
    temp_timestamp = r.ts().add("temperature:sensor1", current_time, 23.5)
    humidity_timestamp = r.ts().add("humidity:sensor1", current_time, 65.2)
    print(f"Added temperature reading at {temp_timestamp}")
    print(f"Added humidity reading at {humidity_timestamp}")
    # Add multiple data points at once
    sensor_data = [
        ("temperature:sensor1", current_time + 1000, 23.7),
        ("humidity:sensor1", current_time + 1000, 64.8),
        ("temperature:sensor1", current_time + 2000, 24.1),
        ("humidity:sensor1", current_time + 2000, 66.1)
    ]
    timestamps = r.ts().madd(*sensor_data)
    print(f"Batch added data points: {timestamps}")


add_sensor_data()
import redis
import time
from datetime import datetime  # was missing: datetime.fromtimestamp is used below

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def query_sensor_data():
    """Print the latest temperature/humidity samples and up to 100 readings
    from the last hour."""
    # Get latest values
    latest_temp = r.ts().get("temperature:sensor1")
    latest_humidity = r.ts().get("humidity:sensor1")
    if latest_temp:
        timestamp, value = latest_temp
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f"Latest temperature: {value}°C at {dt}")
    if latest_humidity:
        timestamp, value = latest_humidity
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f"Latest humidity: {value}% at {dt}")
    # Query data range (last hour)
    end_time = int(time.time() * 1000)
    start_time = end_time - (60 * 60 * 1000)  # 1 hour ago
    temp_data = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        count=100
    )
    print(f"Temperature readings in last hour: {len(temp_data)} points")
    for timestamp, value in temp_data[-5:]:  # Last 5 readings
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f" {dt}: {value}°C")


query_sensor_data()
import redis
import time
from datetime import datetime  # was missing: datetime.fromtimestamp is used below

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def advanced_analytics():
    """Report hourly averages plus 24h min/max temperature using server-side
    aggregation (AVG/MIN/MAX buckets) instead of fetching raw samples."""
    end_time = int(time.time() * 1000)
    start_time = end_time - (24 * 60 * 60 * 1000)  # 24 hours ago
    # Get average temperature per hour
    hourly_avg = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        aggregation_type="AVG",
        bucket_size_msec=3600000  # 1 hour buckets
    )
    print("Hourly average temperatures:")
    for timestamp, avg_temp in hourly_avg:
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f" {dt.strftime('%Y-%m-%d %H:00')}: {avg_temp:.2f}°C")
    # Get min/max temperature in last 24 hours
    min_temp = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        aggregation_type="MIN",
        bucket_size_msec=24 * 3600000  # 24 hour bucket
    )
    max_temp = r.ts().range(
        "temperature:sensor1",
        start_time,
        end_time,
        aggregation_type="MAX",
        bucket_size_msec=24 * 3600000
    )
    if min_temp and max_temp:
        print(f"24h Min temperature: {min_temp[0][1]:.2f}°C")
        print(f"24h Max temperature: {max_temp[0][1]:.2f}°C")


advanced_analytics()
import redis
import time
from datetime import datetime  # was missing: datetime.fromtimestamp is used below

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def multi_sensor_queries():
    """Query several series at once by label filter: latest values via TS.MGET
    and 10-minute AVG buckets over the last hour via TS.MRANGE."""
    # Get latest values from all sensors
    latest_readings = r.ts().mget(
        filters=["location=server_room"],
        with_labels=True
    )
    print("Latest readings from all server room sensors:")
    for reading in latest_readings:
        key = reading.get('key', 'unknown')
        labels = reading.get('labels', {})
        value_timestamp = reading.get('value', [None, None])
        if value_timestamp[0] is not None:
            timestamp, value = value_timestamp
            dt = datetime.fromtimestamp(timestamp / 1000)
            sensor_type = labels.get('type', 'unknown')
            print(f" {sensor_type}: {value} at {dt}")
    # Query range data from multiple series
    end_time = int(time.time() * 1000)
    start_time = end_time - (3600000)  # 1 hour ago
    multi_range_data = r.ts().mrange(
        start_time,
        end_time,
        filters=["location=server_room"],
        aggregation_type="AVG",
        bucket_size_msec=600000,  # 10-minute buckets
        with_labels=True
    )
    print("\nAverage readings per 10 minutes (last hour):")
    for series in multi_range_data:
        key = series.get('key', 'unknown')
        labels = series.get('labels', {})
        values = series.get('values', [])
        sensor_type = labels.get('type', 'unknown')
        print(f"\n{sensor_type} ({key}):")
        for timestamp, value in values[-3:]:  # Last 3 readings
            dt = datetime.fromtimestamp(timestamp / 1000)
            print(f" {dt.strftime('%H:%M')}: {value:.2f}")


multi_sensor_queries()
import redis
r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def setup_downsampling():
    """Create hourly (30d retention) and daily (1y retention) destination
    series, then TS.CREATERULE compaction rules: raw -> hourly -> daily AVG."""
    # Create destination series for downsampled data
    r.ts().create(
        "temperature:sensor1:hourly",
        retention_msecs=30 * 24 * 3600000,  # 30 days retention
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "temperature",
            "resolution": "hourly"
        }
    )
    r.ts().create(
        "temperature:sensor1:daily",
        retention_msecs=365 * 24 * 3600000,  # 1 year retention
        labels={
            "sensor_id": "sensor1",
            "location": "server_room",
            "type": "temperature",
            "resolution": "daily"
        }
    )
    # Create downsampling rules
    # Raw data -> Hourly averages
    r.ts().createrule(
        "temperature:sensor1",
        "temperature:sensor1:hourly",
        "AVG",
        3600000  # 1 hour
    )
    # Hourly data -> Daily averages
    r.ts().createrule(
        "temperature:sensor1:hourly",
        "temperature:sensor1:daily",
        "AVG",
        24 * 3600000  # 24 hours
    )
    print("Created downsampling rules")
    # Verify rules were created
    info = r.ts().info("temperature:sensor1")
    print(f"Rules for sensor1: {info.get('rules', [])}")


setup_downsampling()
import time  # was missing in this section: time.time()/time.sleep() are used below
from datetime import datetime  # was missing: datetime.fromtimestamp is used below


def query_downsampled_data():
    """Read back the compacted series: last 7 days of daily averages and
    last 24 hours of hourly averages."""
    # Query different resolution data
    end_time = int(time.time() * 1000)
    # Last 7 days of daily averages
    start_time = end_time - (7 * 24 * 3600000)
    daily_data = r.ts().range("temperature:sensor1:daily", start_time, end_time)
    print("Daily temperature averages (last 7 days):")
    for timestamp, avg_temp in daily_data:
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f" {dt.strftime('%Y-%m-%d')}: {avg_temp:.2f}°C")
    # Last 24 hours of hourly averages
    start_time = end_time - (24 * 3600000)
    hourly_data = r.ts().range("temperature:sensor1:hourly", start_time, end_time)
    print("\nHourly temperature averages (last 24 hours):")
    for timestamp, avg_temp in hourly_data[-6:]:  # Last 6 hours
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f" {dt.strftime('%H:00')}: {avg_temp:.2f}°C")


# Wait a moment for rules to process, then query
time.sleep(1)
query_downsampled_data()
import redis
import time
import threading
from datetime import datetime  # was missing: trigger_alert formats timestamps

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


class TemperatureMonitor:
    """Record temperature readings in RedisTimeSeries and raise alerts when a
    reading crosses the configured high/low thresholds."""

    def __init__(self, redis_client):
        self.r = redis_client
        self.running = False
        self.alert_threshold_high = 25.0
        self.alert_threshold_low = 20.0

    def add_reading(self, temperature):
        """Add a temperature reading (server-assigned timestamp via "*") and
        check it against the alert thresholds. Returns the sample timestamp."""
        # Fixed: use the injected client (self.r), not the module-level `r`.
        timestamp = self.r.ts().add("temperature:sensor1", "*", temperature)
        # Check for alerts
        if temperature > self.alert_threshold_high:
            self.trigger_alert("HIGH", temperature, timestamp)
        elif temperature < self.alert_threshold_low:
            self.trigger_alert("LOW", temperature, timestamp)
        return timestamp

    def trigger_alert(self, alert_type, temperature, timestamp):
        """Print an alert and record it (severity 2=HIGH, 1=LOW) in a
        dedicated alert series, creating that series on first use."""
        dt = datetime.fromtimestamp(timestamp / 1000)
        print(f"🚨 ALERT: {alert_type} temperature {temperature}°C at {dt}")
        # Store alert in separate time series
        alert_series = "alerts:temperature:sensor1"  # was an f-string with no placeholders
        if not self.series_exists(alert_series):
            self.r.ts().create(
                alert_series,
                labels={
                    "sensor_id": "sensor1",
                    "type": "alert",
                    "metric": "temperature"
                }
            )
        # Store alert with severity as value
        severity = 2 if alert_type == "HIGH" else 1
        self.r.ts().add(alert_series, timestamp, severity)

    def series_exists(self, key):
        """Return True if a time series exists at *key* (TS.INFO succeeds)."""
        try:
            self.r.ts().info(key)
            return True
        except Exception:  # narrowed from bare except: keep SystemExit/KeyboardInterrupt alive
            return False

    def get_recent_stats(self, minutes=60):
        """Return count/min/max/avg/latest over the last *minutes* of data,
        or None when the window is empty."""
        end_time = int(time.time() * 1000)
        start_time = end_time - (minutes * 60 * 1000)
        # Get recent data
        data = self.r.ts().range("temperature:sensor1", start_time, end_time)
        if not data:
            return None
        temperatures = [value for _, value in data]
        return {
            "count": len(temperatures),
            "min": min(temperatures),
            "max": max(temperatures),
            "avg": sum(temperatures) / len(temperatures),
            "latest": temperatures[-1]
        }
# Usage example
monitor = TemperatureMonitor(r)
# Simulate temperature readings with some alerts
test_temperatures = [21.5, 22.1, 23.8, 24.5, 25.8, 26.2, 24.1, 22.9, 19.5, 18.2, 21.0]
print("Simulating temperature readings...")
for temp in test_temperatures:
    monitor.add_reading(temp)
    time.sleep(0.1)  # brief pause so "*" timestamps stay distinct
# Get statistics
stats = monitor.get_recent_stats(minutes=5)
if stats:
    print(f"\nRecent temperature statistics:")
    print(f" Count: {stats['count']} readings")
    print(f" Min: {stats['min']:.1f}°C")
    print(f" Max: {stats['max']:.1f}°C")
    print(f" Average: {stats['avg']:.1f}°C")
    print(f" Latest: {stats['latest']:.1f}°C")
import redis
import csv
import json
import time  # was missing in this section: time.time() is used below
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


def export_timeseries_data():
    """Export the last 24h of temperature and humidity samples to
    /tmp/sensor_data.csv (joined on timestamp) and /tmp/sensor_data.json."""
    # Get data for export
    end_time = int(time.time() * 1000)
    start_time = end_time - (24 * 3600000)  # Last 24 hours
    temp_data = r.ts().range("temperature:sensor1", start_time, end_time)
    humidity_data = r.ts().range("humidity:sensor1", start_time, end_time)
    # Export to CSV
    with open('/tmp/sensor_data.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['timestamp', 'datetime', 'temperature', 'humidity'])
        # Combine data by timestamp
        temp_dict = {ts: temp for ts, temp in temp_data}
        humidity_dict = {ts: humidity for ts, humidity in humidity_data}
        all_timestamps = sorted(set(temp_dict.keys()) | set(humidity_dict.keys()))
        for timestamp in all_timestamps:
            dt = datetime.fromtimestamp(timestamp / 1000)
            temp = temp_dict.get(timestamp, '')  # blank cell when a side is missing
            humidity = humidity_dict.get(timestamp, '')
            writer.writerow([timestamp, dt.isoformat(), temp, humidity])
    print("Exported data to /tmp/sensor_data.csv")
    # Export to JSON
    export_data = {
        "metadata": {
            "export_time": datetime.now().isoformat(),
            "start_time": datetime.fromtimestamp(start_time / 1000).isoformat(),
            "end_time": datetime.fromtimestamp(end_time / 1000).isoformat(),
            "sensor_id": "sensor1",
            "location": "server_room"
        },
        "data": {
            "temperature": [
                {"timestamp": ts, "value": temp} for ts, temp in temp_data
            ],
            "humidity": [
                {"timestamp": ts, "value": humidity} for ts, humidity in humidity_data
            ]
        }
    }
    with open('/tmp/sensor_data.json', 'w') as jsonfile:
        json.dump(export_data, jsonfile, indent=2)
    print("Exported data to /tmp/sensor_data.json")
def analyze_sensor_patterns():
"""Analyze patterns in sensor data"""
end_time = int(time.time() * 1000)
start_time = end_time - (7 * 24 * 3600000) # Last 7 days
# Get hourly averages for pattern analysis
hourly_temps = r.ts().range(
"temperature:sensor1",
start_time,
end_time,
aggregation_type="AVG",
bucket_size_msec=3600000 # 1 hour
)
if not hourly_temps:
print("No data available for analysis")
return
# Analyze by hour of day
hourly_patterns = {}
for timestamp, temp in hourly_temps:
dt = datetime.fromtimestamp(timestamp / 1000)
hour = dt.hour
if hour not in hourly_patterns:
hourly_patterns[hour] = []
hourly_patterns[hour].append(temp)
print("Temperature patterns by hour of day:")
for hour in sorted(hourly_patterns.keys()):
temps = hourly_patterns[hour]
avg_temp = sum(temps) / len(temps)
min_temp = min(temps)
max_temp = max(temps)
print(f" {hour:02d}:00 - Avg: {avg_temp:.1f}°C, Range: {min_temp:.1f}-{max_temp:.1f}°C")
export_timeseries_data()
analyze_sensor_patterns()Install with Tessl CLI
npx tessl i tessl/pypi-redis