tessl install tessl/pypi-aioredis@2.0.0asyncio (PEP 3156) Redis support
Agent Success
Agent success rate when using this tile
98%
Improvement
Agent success rate improvement when using this tile compared to baseline
1.01x
Baseline
Agent success rate without this tile
97%
Build a real-time notification service that implements a publish-subscribe messaging system using Redis. The service should handle multiple notification channels and support both exact channel matching and pattern-based subscriptions.
Implement a NotificationPublisher class that:
The publisher should have:
publish(channel: str, message: str) -> int method that sends a message to a channel and returns the number of subscribers that received itclose() method for cleanupImplement a NotificationSubscriber class that:
The subscriber should have:
subscribe(*channels: str) method to subscribe to specific channelspsubscribe(*patterns: str) method to subscribe to channel patternsget_message(timeout: float = None) -> dict method that retrieves the next message (returns None on timeout)unsubscribe(*channels: str) method to unsubscribe from specific channelsclose() method for cleanupMessages received should be dictionaries with at least:
type: The message type (e.g., "message", "pmessage", "subscribe", "psubscribe")channel: The channel name (for exact matches) or matched channel (for patterns)data: The message content (for actual messages)pattern: The subscription pattern (for pattern-based messages only)Provides async Redis client functionality for publish-subscribe messaging.
Create test file test_notifications.py with the following test cases:
async def test_basic_pubsub():
"""Test basic message publishing and subscribing"""
publisher = NotificationPublisher()
subscriber = NotificationSubscriber()
await subscriber.subscribe("alerts")
# Publish a message
count = await publisher.publish("alerts", "Server is down")
assert count == 1 # One subscriber received it
# Receive the message
msg = await subscriber.get_message(timeout=1.0)
assert msg is not None
assert msg["type"] == "message"
assert msg["channel"] == "alerts"
assert msg["data"] == "Server is down"
await subscriber.close()
await publisher.close()async def test_pattern_subscription():
"""Test subscribing to channel patterns"""
publisher = NotificationPublisher()
subscriber = NotificationSubscriber()
await subscriber.psubscribe("alerts.*")
# Publish to matching channels
await publisher.publish("alerts.urgent", "Critical error")
await publisher.publish("alerts.info", "System update")
# Receive first message
msg1 = await subscriber.get_message(timeout=1.0)
assert msg1 is not None
assert msg1["type"] == "pmessage"
assert msg1["pattern"] == "alerts.*"
assert msg1["channel"] in ["alerts.urgent", "alerts.info"]
# Receive second message
msg2 = await subscriber.get_message(timeout=1.0)
assert msg2 is not None
assert msg2["type"] == "pmessage"
await subscriber.close()
await publisher.close()async def test_unsubscribe():
"""Test unsubscribing from channels"""
publisher = NotificationPublisher()
subscriber = NotificationSubscriber()
await subscriber.subscribe("news", "updates")
await subscriber.unsubscribe("news")
# Publish to unsubscribed channel
count = await publisher.publish("news", "Breaking news")
assert count == 0 # No subscribers
# Publish to still-subscribed channel
count = await publisher.publish("updates", "New version")
assert count == 1
msg = await subscriber.get_message(timeout=1.0)
assert msg["channel"] == "updates"
await subscriber.close()
await publisher.close()