or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/aioredis@2.0.x
tile.json

tessl/pypi-aioredis

tessl install tessl/pypi-aioredis@2.0.0

asyncio (PEP 3156) Redis support

Agent Success

Agent success rate when using this tile

98%

Improvement

Agent success rate improvement when using this tile compared to baseline

1.01x

Baseline

Agent success rate without this tile

97%

task.mdevals/scenario-5/

Real-Time Notification Service

Build a real-time notification service that implements a publish-subscribe messaging system using Redis. The service should handle multiple notification channels and support both exact channel matching and pattern-based subscriptions.

Functional Requirements { .functional-requirements }

Notification Publisher

Implement a NotificationPublisher class that:

  • Connects to Redis asynchronously
  • Publishes messages to specific notification channels
  • Supports closing the connection when done

The publisher should have:

  • An async publish(channel: str, message: str) -> int method that sends a message to a channel and returns the number of subscribers that received it
  • An async close() method for cleanup

Notification Subscriber

Implement a NotificationSubscriber class that:

  • Connects to Redis asynchronously
  • Subscribes to one or more specific channels
  • Subscribes to channel patterns (e.g., "alerts.*" to match "alerts.urgent", "alerts.info", etc.)
  • Receives and processes messages from subscribed channels
  • Unsubscribes from channels when needed
  • Supports closing the connection when done

The subscriber should have:

  • An async subscribe(*channels: str) method to subscribe to specific channels
  • An async psubscribe(*patterns: str) method to subscribe to channel patterns
  • An async get_message(timeout: float = None) -> dict method that retrieves the next message (returns None on timeout)
  • An async unsubscribe(*channels: str) method to unsubscribe from specific channels
  • An async close() method for cleanup

Message Format

Messages received should be dictionaries with at least:

  • type: The message type (e.g., "message", "pmessage", "subscribe", "psubscribe")
  • channel: The channel name (for exact matches) or matched channel (for patterns)
  • data: The message content (for actual messages)
  • pattern: The subscription pattern (for pattern-based messages only)

Dependencies { .dependencies }

aioredis { .dependency }

Provides async Redis client functionality for publish-subscribe messaging.

Testing Requirements { .testing }

Create test file test_notifications.py with the following test cases:

Test: Basic publish and subscribe @test

async def test_basic_pubsub():
    """Test basic message publishing and subscribing"""
    publisher = NotificationPublisher()
    subscriber = NotificationSubscriber()

    await subscriber.subscribe("alerts")

    # Publish a message
    count = await publisher.publish("alerts", "Server is down")
    assert count == 1  # One subscriber received it

    # Receive the message
    msg = await subscriber.get_message(timeout=1.0)
    assert msg is not None
    assert msg["type"] == "message"
    assert msg["channel"] == "alerts"
    assert msg["data"] == "Server is down"

    await subscriber.close()
    await publisher.close()

Test: Pattern-based subscription @test

async def test_pattern_subscription():
    """Test subscribing to channel patterns"""
    publisher = NotificationPublisher()
    subscriber = NotificationSubscriber()

    await subscriber.psubscribe("alerts.*")

    # Publish to matching channels
    await publisher.publish("alerts.urgent", "Critical error")
    await publisher.publish("alerts.info", "System update")

    # Receive first message
    msg1 = await subscriber.get_message(timeout=1.0)
    assert msg1 is not None
    assert msg1["type"] == "pmessage"
    assert msg1["pattern"] == "alerts.*"
    assert msg1["channel"] in ["alerts.urgent", "alerts.info"]

    # Receive second message
    msg2 = await subscriber.get_message(timeout=1.0)
    assert msg2 is not None
    assert msg2["type"] == "pmessage"

    await subscriber.close()
    await publisher.close()

Test: Unsubscribe functionality @test

async def test_unsubscribe():
    """Test unsubscribing from channels"""
    publisher = NotificationPublisher()
    subscriber = NotificationSubscriber()

    await subscriber.subscribe("news", "updates")
    await subscriber.unsubscribe("news")

    # Publish to unsubscribed channel
    count = await publisher.publish("news", "Breaking news")
    assert count == 0  # No subscribers

    # Publish to still-subscribed channel
    count = await publisher.publish("updates", "New version")
    assert count == 1

    msg = await subscriber.get_message(timeout=1.0)
    assert msg["channel"] == "updates"

    await subscriber.close()
    await publisher.close()

Implementation Notes

  • Use async/await syntax throughout
  • Handle connection lifecycle properly (connect, use, close)
  • Ensure proper cleanup of resources
  • The solution should work with Redis server running on localhost:6379
  • Messages should be encoded/decoded as strings (UTF-8)