CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pybreaker

Python implementation of the Circuit Breaker pattern for handling failing subsystems gracefully

Pending
Overview
Eval results
Files

async.mddocs/

Async Support

PyBreaker provides optional support for protecting asynchronous operations using the Tornado web framework. This allows circuit breaker functionality to be applied to async functions and coroutines.

Requirements

Async support requires the tornado package to be installed:

pip install tornado

You can check if async support is available:

import pybreaker

# HAS_TORNADO_SUPPORT is the flag pybreaker exposes for async availability.
status = (
    "Async support is available"
    if pybreaker.HAS_TORNADO_SUPPORT
    else "Install tornado for async support"
)
print(status)

Capabilities

Async Function Protection

Protects asynchronous functions and coroutines with circuit breaker logic using Tornado's coroutine system.

def call_async(self, func, *args, **kwargs):
    """
    Call an async func with circuit breaker protection.

    The returned object is meant to be yielded from inside a Tornado
    coroutine, e.g. ``result = yield breaker.call_async(func, arg)``.

    Args:
        func: The async function/coroutine to call
        *args: Positional arguments forwarded to func
        **kwargs: Keyword arguments forwarded to func

    Returns:
        Tornado Future/coroutine result

    Raises:
        ImportError: If tornado package is not available
        CircuitBreakerError: When the circuit is open. NOTE(review): per
            the pybreaker README, throw_new_error_on_trip only controls
            whether the call that trips the breaker raises this error or
            the original exception -- confirm against the installed
            version.
    """

Async Decorator Usage

Use the circuit breaker as a decorator for async functions by passing the special keyword argument `__pybreaker_call_async=True`.

def __call__(self, *call_args: Any, **call_kwargs: bool) -> Callable:
    """
    Decorator interface with async support.

    For async protection the breaker is called with the keyword flag and
    the result is used as the decorator::

        @breaker(__pybreaker_call_async=True)
        @gen.coroutine
        def fetch(): ...

    Args:
        *call_args: Function to decorate (when used as @decorator)
        **call_kwargs: Keyword arguments, set __pybreaker_call_async=True for async protection

    Returns:
        Decorator that applies circuit breaker protection to async functions
    """

Usage Examples

Basic Async Protection

import pybreaker
import tornado.ioloop
from tornado import gen

# Breaker shared by every protected call in this example.
breaker = pybreaker.CircuitBreaker(fail_max=3, reset_timeout=30)


@gen.coroutine
def async_database_call():
    """Stand-in for an asynchronous database query."""
    yield gen.sleep(0.1)  # pretend to wait on the database
    # A real query could raise here; this one always succeeds.
    raise gen.Return("database_result")


@gen.coroutine
def main():
    """Route the query through the breaker and print the outcome."""
    try:
        outcome = yield breaker.call_async(async_database_call)
        print(f"Result: {outcome}")
    except Exception as err:
        print(f"Failed: {err}")


# Drive the coroutine to completion on Tornado's IOLoop.
io_loop = tornado.ioloop.IOLoop.current()
io_loop.run_sync(main)

Async Decorator Usage

import pybreaker
import tornado.ioloop  # required: the last line references tornado.ioloop
from tornado import gen

breaker = pybreaker.CircuitBreaker(fail_max=5, reset_timeout=60)


# The breaker decorator goes outermost so it wraps the Tornado coroutine.
@breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_api_call(endpoint, data):
    """Make async API call with circuit breaker protection."""
    # Async HTTP request logic
    yield gen.sleep(0.2)  # Simulate network delay
    raise gen.Return({"status": "success", "data": data})


@gen.coroutine
def main():
    """Call the protected coroutine and report success or failure."""
    try:
        result = yield async_api_call("/users", {"name": "Alice"})
        print(result)
    except pybreaker.CircuitBreakerError:
        # Handled separately from ordinary failures of the call itself.
        print("Circuit breaker is open!")
    except Exception as e:
        print(f"API call failed: {e}")


tornado.ioloop.IOLoop.current().run_sync(main)

Async with HTTP Client

import pybreaker
import tornado.ioloop
from tornado import gen
from tornado.httpclient import AsyncHTTPClient


class AsyncServiceClient:
    """HTTP client whose requests are guarded by a circuit breaker."""

    def __init__(self):
        self.http_client = AsyncHTTPClient()
        self.breaker = pybreaker.CircuitBreaker(
            fail_max=3,
            reset_timeout=30,
            name="external_service",
        )

    @gen.coroutine
    def fetch_data(self, url):
        """Fetch data with circuit breaker protection.

        Returns the response body, or None when the breaker raises
        CircuitBreakerError. Any other exception raised by the fetch
        propagates to the caller.
        """
        try:
            response = yield self.breaker.call_async(
                self.http_client.fetch,
                url,
                request_timeout=10,
            )
        except pybreaker.CircuitBreakerError:
            # Circuit is open, return cached data or error
            raise gen.Return(None)
        raise gen.Return(response.body)


client = AsyncServiceClient()


@gen.coroutine
def main():
    """Fetch one URL and report whether the service answered."""
    data = yield client.fetch_data("http://api.example.com/data")
    # Compare against None explicitly: an empty response body is falsy but
    # still means the service answered.
    if data is not None:
        print(f"Received data: {data}")
    else:
        print("Service unavailable")


tornado.ioloop.IOLoop.current().run_sync(main)

Async with Event Listeners

import logging
import random

import pybreaker
import tornado.ioloop  # required: the last line references tornado.ioloop
from tornado import gen

logger = logging.getLogger(__name__)


class AsyncMetricsListener(pybreaker.CircuitBreakerListener):
    """Listener that counts and logs calls, failures and state changes."""

    def __init__(self):
        self.async_call_count = 0
        self.async_failure_count = 0

    def before_call(self, cb, func, *args, **kwargs):
        """Count and log the call when the function name contains 'async'."""
        if hasattr(func, '__name__') and 'async' in func.__name__:
            self.async_call_count += 1
            logger.info(f"Async call #{self.async_call_count} to {cb.name}")

    def failure(self, cb, exc):
        """Count and log every failure reported by the breaker."""
        self.async_failure_count += 1
        logger.warning(f"Async failure #{self.async_failure_count} in {cb.name}: {exc}")

    def state_change(self, cb, old_state, new_state):
        """Log each state transition of the breaker."""
        logger.info(f"Async circuit {cb.name} state: {old_state} -> {new_state}")


breaker = pybreaker.CircuitBreaker(name="async_service")
breaker.add_listener(AsyncMetricsListener())


@breaker(__pybreaker_call_async=True)
@gen.coroutine
def async_operation():
    """Sleep briefly, then fail about 30% of the time."""
    yield gen.sleep(0.1)
    # Simulate occasional failures
    if random.random() < 0.3:
        raise Exception("Random failure")
    raise gen.Return("success")


@gen.coroutine
def main():
    """Run the protected operation ten times, pausing between calls."""
    for i in range(10):
        try:
            result = yield async_operation()
            print(f"Call {i}: {result}")
        except Exception as e:
            print(f"Call {i} failed: {e}")
        yield gen.sleep(0.5)


tornado.ioloop.IOLoop.current().run_sync(main)

Async with Redis Storage

import pybreaker
import redis
import tornado.ioloop
from tornado import gen

# Breaker state is kept in Redis under the "async_services" namespace.
redis_client = redis.Redis(host='localhost', port=6379, db=0)
storage = pybreaker.CircuitRedisStorage(
    state=pybreaker.STATE_CLOSED,
    redis_object=redis_client,
    namespace="async_services",
)

breaker = pybreaker.CircuitBreaker(
    fail_max=5,
    reset_timeout=60,
    state_storage=storage,
    name="distributed_async_service",
)


@gen.coroutine
def async_distributed_operation(data):
    """Async operation that shares state across processes."""
    yield gen.sleep(0.2)
    processed = f"processed: {data}"
    raise gen.Return(processed)


@gen.coroutine
def main():
    """Start one protected call per payload and print the results in order."""
    payloads = ["data1", "data2", "data3", "data4", "data5"]

    # Start every call first, then wait for the whole batch with one yield.
    pending = [
        breaker.call_async(async_distributed_operation, payload)
        for payload in payloads
    ]
    results = yield pending

    for index, outcome in enumerate(results):
        print(f"Operation {index}: {outcome}")


io_loop = tornado.ioloop.IOLoop.current()
io_loop.run_sync(main)

Error Handling in Async Context

import pybreaker
import tornado.ioloop
from tornado import gen

breaker = pybreaker.CircuitBreaker(
    fail_max=2,
    reset_timeout=10,
    throw_new_error_on_trip=True,
)


@gen.coroutine
def failing_async_operation():
    """Coroutine that always raises after a short delay."""
    yield gen.sleep(0.1)
    raise Exception("Service temporarily unavailable")


@gen.coroutine
def main():
    """Make five protected calls, printing each outcome and the breaker state."""
    for attempt in range(5):
        try:
            outcome = yield breaker.call_async(failing_async_operation)
            print(f"Attempt {attempt}: Success - {outcome}")
        except pybreaker.CircuitBreakerError:
            print(f"Attempt {attempt}: Circuit breaker is open")
        except Exception as err:
            print(f"Attempt {attempt}: Operation failed - {err}")

        # Report the breaker's state and failure count after every attempt.
        state = breaker.current_state
        failures = breaker.fail_counter
        print(f"Circuit state: {state}, Failures: {failures}")
        yield gen.sleep(1)


io_loop = tornado.ioloop.IOLoop.current()
io_loop.run_sync(main)

Important Notes

Tornado Dependency

  • Async support requires Tornado to be installed
  • An ImportError is raised if async features are used when Tornado is not installed
  • The HAS_TORNADO_SUPPORT flag in pybreaker indicates availability

Compatibility

  • Works with Tornado's @gen.coroutine decorators
  • Compatible with Tornado's Future and IOLoop systems
  • Can be used in Tornado web applications and standalone async applications

Performance

  • Async circuit breakers share the same thread-safe locking as synchronous versions
  • Redis storage works well with async operations for distributed state management
  • Event listeners work normally with async operations

Best Practices

  • Use meaningful names for async circuit breakers to distinguish them in monitoring
  • Consider using Redis storage for async operations that need to coordinate across processes
  • Implement proper error handling for both CircuitBreakerError and original exceptions
  • Use event listeners to monitor async circuit breaker behavior in production

Install with Tessl CLI

npx tessl i tessl/pypi-pybreaker

docs

async.md

circuit-breaker.md

index.md

listeners.md

storage.md

tile.json