
tessl/pypi-aioredis

asyncio (PEP 3156) Redis support

Overall score: 98%


evals/scenario-5/task.md

Redis Response Parser

A custom response parser for Redis commands that handles type conversion and custom formatting of replies.

Requirements

Implement a Redis client wrapper that customizes command response parsing. The wrapper should:

  1. Allow registering custom parsers for specific commands via a callback registration method
  2. Apply registered parsers when executing commands
  3. Support configuring automatic response decoding (bytes to strings)
  4. Fall back to default behavior when no custom parser is registered
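
Requirement 3 can be sketched independently of any Redis connection. The helper below is a minimal illustration, not part of the aioredis API: the names `decode_reply` and `maybe_decode` are hypothetical, and UTF-8 is an assumed encoding. It decodes nested replies as well, since commands such as MGET return a list of values.

```python
from typing import Any


def decode_reply(value: Any, encoding: str = "utf-8") -> Any:
    """Recursively decode bytes to str; leave every other type untouched."""
    if isinstance(value, bytes):
        return value.decode(encoding)
    if isinstance(value, (list, tuple)):
        # Multi-value replies (e.g. from MGET) arrive as sequences of items.
        return [decode_reply(item, encoding) for item in value]
    return value  # ints, None, and already-decoded strings pass through


def maybe_decode(value: Any, decode_responses: bool) -> Any:
    """Apply decoding only when the client was configured to do so."""
    return decode_reply(value) if decode_responses else value
```

With `decode_responses=False` the reply is returned exactly as received, which is the behavior the fourth test case checks for.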

Test Cases

  • Setting a JSON string in Redis and getting it back should return a parsed Python dict when using a custom GET parser. @test
  • With a custom parser registered, MGET should return a dict mapping each requested key to its value rather than a plain list. @test
  • When decode_responses is True, string responses should be decoded from bytes to str. @test
  • When decode_responses is False, responses should remain as bytes. @test
  • Commands without registered custom parsers should use default behavior. @test
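
The first two test cases call for parsers along the following lines. This is a sketch under stated assumptions: `parse_json_get` and `make_mget_parser` are hypothetical names, and the MGET parser captures the requested keys in a closure because the raw MGET reply contains only values.

```python
import json
from typing import Any, Callable, Dict, List, Optional, Sequence


def parse_json_get(raw: Optional[Any]) -> Any:
    """Parse a GET reply holding a JSON document; a missing key stays None."""
    if raw is None:
        return None
    return json.loads(raw)  # json.loads accepts both str and bytes


def make_mget_parser(keys: Sequence[str]) -> Callable[[List[Any]], Dict[str, Any]]:
    """Build a parser that pairs each requested key with its value."""
    def parse(raw: List[Any]) -> Dict[str, Any]:
        # Redis returns MGET values in the same order as the requested keys.
        return dict(zip(keys, raw))
    return parse
```

Because the callback signature in the API below receives only the raw response, a closure like this would have to be registered again for each distinct key list. An implementation could instead pass the command arguments to the callback, but that changes the stated signature, so it is a design decision for the implementer.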

Implementation

@generates

API

import aioredis
from typing import Optional, Dict, List, Any, Callable

class CustomRedisClient:
    """
    A Redis client wrapper that supports custom response parsing and decoding configuration.

    This client allows registering custom parsers for specific Redis commands and
    configuring automatic response decoding behavior.
    """

    def __init__(self, redis_url: str, decode_responses: bool = True):
        """
        Initialize the custom Redis client.

        Args:
            redis_url: Redis connection URL (e.g., "redis://localhost")
            decode_responses: Whether to automatically decode byte responses to strings
        """
        pass

    async def connect(self):
        """Establish connection to Redis server."""
        pass

    async def close(self):
        """Close the Redis connection."""
        pass

    def set_response_callback(self, command: str, callback: Callable[[Any], Any]):
        """
        Register a custom response parser for a specific Redis command.

        Args:
            command: The Redis command name (e.g., "GET", "HGETALL")
            callback: A function that takes the raw response and returns the parsed result
        """
        pass

    async def execute_command(self, command: str, *args) -> Any:
        """
        Execute a Redis command with custom response parsing.

        Args:
            command: The Redis command to execute
            *args: Arguments for the command

        Returns:
            The response, processed by any registered custom parser
        """
        pass
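
To illustrate how registration and fallback might fit together, here is a minimal sketch that runs without a Redis server. Everything in it is an assumption made for illustration: `SketchClient` and `fake_transport` are hypothetical, the transport stands in for the real aioredis connection, and decoding is applied before the callback, so callbacks see `str` when `decode_responses` is True.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical transport: an async callable that sends a command and
# returns the raw reply. A real client would delegate to aioredis here.
Transport = Callable[..., Awaitable[Any]]


class SketchClient:
    def __init__(self, transport: Transport, decode_responses: bool = True):
        self._transport = transport
        self._decode = decode_responses
        self._callbacks: Dict[str, Callable[[Any], Any]] = {}

    def set_response_callback(self, command: str, callback: Callable[[Any], Any]) -> None:
        # Normalize the name so "get" and "GET" share one parser.
        self._callbacks[command.upper()] = callback

    async def execute_command(self, command: str, *args: Any) -> Any:
        raw = await self._transport(command, *args)
        if self._decode and isinstance(raw, bytes):
            raw = raw.decode("utf-8")
        callback = self._callbacks.get(command.upper())
        # No registered parser: fall back to the (possibly decoded) reply.
        return raw if callback is None else callback(raw)


async def fake_transport(command: str, *args: Any) -> bytes:
    """In-memory stand-in for a server: always replies with b'42'."""
    return b"42"


async def demo():
    client = SketchClient(fake_transport)
    default = await client.execute_command("GET", "answer")
    client.set_response_callback("GET", int)
    parsed = await client.execute_command("GET", "answer")
    return default, parsed


result = asyncio.run(demo())
```

Here `result` is `("42", 42)`: the first call uses the default path, and the second goes through the registered `int` parser.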

Dependencies { .dependencies }

aioredis { .dependency }

Provides asyncio Redis client support.

Install with Tessl CLI

npx tessl i tessl/pypi-aioredis
