Python implementation of the Circuit Breaker pattern for handling failing subsystems gracefully
—
PyBreaker provides optional support for protecting asynchronous operations using the Tornado web framework. This allows circuit breaker functionality to be applied to async functions and coroutines.
Async support requires the tornado package to be installed:
pip install tornadoYou can check if async support is available:
import pybreaker
if pybreaker.HAS_TORNADO_SUPPORT:
print("Async support is available")
else:
print("Install tornado for async support")Protects asynchronous functions and coroutines with circuit breaker logic using Tornado's coroutine system.
def call_async(self, func, *args, **kwargs):
"""
Call async func with circuit breaker protection.
Args:
func: The async function/coroutine to call
*args: Positional arguments for the function
**kwargs: Keyword arguments for the function
Returns:
Tornado Future/coroutine result
Raises:
ImportError: If tornado package is not available
CircuitBreakerError: When circuit is open (if throw_new_error_on_trip=True)
"""Using the circuit breaker as a decorator for async functions with special parameter.
def __call__(self, *call_args: Any, **call_kwargs: bool) -> Callable:
"""
Decorator interface with async support.
Args:
*call_args: Function to decorate (when used as @decorator)
**call_kwargs: Keyword arguments, set __pybreaker_call_async=True for async protection
Returns:
Decorator that applies circuit breaker protection to async functions
"""import pybreaker
from tornado import gen
import tornado.ioloop
@gen.coroutine
def async_database_call():
# Simulate async database operation
yield gen.sleep(0.1)
# Potentially failing operation
raise gen.Return("database_result")
# Create circuit breaker
breaker = pybreaker.CircuitBreaker(fail_max=3, reset_timeout=30)
@gen.coroutine
def main():
try:
# Protect async call
result = yield breaker.call_async(async_database_call)
print(f"Result: {result}")
except Exception as e:
print(f"Failed: {e}")
# Run with Tornado
tornado.ioloop.IOLoop.current().run_sync(main)import pybreaker
from tornado import gen
breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)
# Use decorator with async flag
@breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_api_call(endpoint, data):
"""Make async API call with circuit breaker protection."""
# Async HTTP request logic
yield gen.sleep(0.2) # Simulate network delay
raise gen.Return({"status": "success", "data": data})
@gen.coroutine
def main():
try:
result = yield async_api_call("/users", {"name": "Alice"})
print(result)
except pybreaker.CircuitBreakerError:
print("Circuit breaker is open!")
except Exception as e:
print(f"API call failed: {e}")
tornado.ioloop.IOLoop.current().run_sync(main)import pybreaker
from tornado import gen
from tornado.httpclient import AsyncHTTPClient
import tornado.ioloop
class AsyncServiceClient:
def __init__(self):
self.http_client = AsyncHTTPClient()
self.breaker = pybreaker.CircuitBreaker(
fail_max=3,
reset_timeout=30,
name="external_service"
)
@gen.coroutine
def fetch_data(self, url):
"""Fetch data with circuit breaker protection."""
try:
response = yield self.breaker.call_async(
self.http_client.fetch,
url,
request_timeout=10
)
raise gen.Return(response.body)
except pybreaker.CircuitBreakerError:
# Circuit is open, return cached data or error
raise gen.Return(None)
client = AsyncServiceClient()
@gen.coroutine
def main():
data = yield client.fetch_data("http://api.example.com/data")
if data:
print(f"Received data: {data}")
else:
print("Service unavailable")
tornado.ioloop.IOLoop.current().run_sync(main)import pybreaker
from tornado import gen
import logging
logger = logging.getLogger(__name__)
class AsyncMetricsListener(pybreaker.CircuitBreakerListener):
def __init__(self):
self.async_call_count = 0
self.async_failure_count = 0
def before_call(self, cb, func, *args, **kwargs):
if hasattr(func, '__name__') and 'async' in func.__name__:
self.async_call_count += 1
logger.info(f"Async call #{self.async_call_count} to {cb.name}")
def failure(self, cb, exc):
self.async_failure_count += 1
logger.warning(f"Async failure #{self.async_failure_count} in {cb.name}: {exc}")
def state_change(self, cb, old_state, new_state):
logger.info(f"Async circuit {cb.name} state: {old_state} -> {new_state}")
breaker = pybreaker.CircuitBreaker(name="async_service")
breaker.add_listener(AsyncMetricsListener())
@breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_operation():
yield gen.sleep(0.1)
# Simulate occasional failures
import random
if random.random() < 0.3:
raise Exception("Random failure")
raise gen.Return("success")
@gen.coroutine
def main():
for i in range(10):
try:
result = yield async_operation()
print(f"Call {i}: {result}")
except Exception as e:
print(f"Call {i} failed: {e}")
yield gen.sleep(0.5)
tornado.ioloop.IOLoop.current().run_sync(main)import pybreaker
from tornado import gen
import redis
import tornado.ioloop
# Setup Redis storage for distributed async operations
redis_client = redis.Redis(host='localhost', port=6379, db=0)
storage = pybreaker.CircuitRedisStorage(
state=pybreaker.STATE_CLOSED,
redis_object=redis_client,
namespace="async_services"
)
breaker = pybreaker.CircuitBreaker(
fail_max=5,
reset_timeout=60,
state_storage=storage,
name="distributed_async_service"
)
@gen.coroutine
def async_distributed_operation(data):
"""Async operation that shares state across processes."""
yield gen.sleep(0.2)
# Process data
raise gen.Return(f"processed: {data}")
@gen.coroutine
def main():
operations = ["data1", "data2", "data3", "data4", "data5"]
# Run multiple async operations
results = yield [
breaker.call_async(async_distributed_operation, data)
for data in operations
]
for i, result in enumerate(results):
print(f"Operation {i}: {result}")
tornado.ioloop.IOLoop.current().run_sync(main)import pybreaker
from tornado import gen
import tornado.ioloop
breaker = pybreaker.CircuitBreaker(
fail_max=2,
reset_timeout=10,
throw_new_error_on_trip=True
)
@gen.coroutine
def failing_async_operation():
yield gen.sleep(0.1)
raise Exception("Service temporarily unavailable")
@gen.coroutine
def main():
for attempt in range(5):
try:
result = yield breaker.call_async(failing_async_operation)
print(f"Attempt {attempt}: Success - {result}")
except pybreaker.CircuitBreakerError:
print(f"Attempt {attempt}: Circuit breaker is open")
except Exception as e:
print(f"Attempt {attempt}: Operation failed - {e}")
# Check circuit state
print(f"Circuit state: {breaker.current_state}, Failures: {breaker.fail_counter}")
yield gen.sleep(1)
tornado.ioloop.IOLoop.current().run_sync(main)HAS_TORNADO_SUPPORT flag in pybreaker indicates availability@gen.coroutine decoratorsInstall with Tessl CLI
npx tessl i tessl/pypi-pybreaker