Fast, simple object-to-object and broadcast signaling
npx @tessl/cli install tessl/pypi-blinker@1.9.0

Fast, simple object-to-object and broadcast signaling system that allows any number of interested parties to subscribe to events, or "signals". Blinker provides a lightweight, thread-safe implementation of the observer pattern for Python applications, enabling decoupled communication between different parts of an application.
pip install blinker

import blinker

Common usage imports:
from blinker import signal, Signal, ANY

Import all public components:
from blinker import signal, Signal, NamedSignal, Namespace, default_namespace, ANY

Complete typing imports for advanced usage:
import typing as t
import collections.abc as c
import weakref
from contextlib import contextmanager
from functools import cached_property
from typing import Any, Callable, ClassVar, Coroutine, Generator, Hashable, TypeVar
# Type variables and aliases
F = t.TypeVar("F", bound=c.Callable[..., t.Any])
T = t.TypeVar("T")

from blinker import signal
# Create a named signal
started = signal('round-started')


# Define receivers
def each_round(sender, **kwargs):
    print(f"Round {kwargs.get('round_num', '?')}")


def round_two_only(sender, **kwargs):
    print("This is round two.")


# Connect receivers
started.connect(each_round)
started.connect(round_two_only, sender=2)  # Only for sender=2

# Send signals
for round_num in range(1, 4):
    started.send(round_num, round_num=round_num)

# Output:
# Round 1
# Round 2
# This is round two.
# Round 3

Blinker's core components (Signal, NamedSignal, Namespace, and the ANY sender symbol) work together to provide flexible signal dispatching.
The library supports both anonymous signals (Signal instances) and named signals (managed by Namespace), with automatic cleanup via weak references to prevent memory leaks.
Create and manage signal instances for event dispatching. Supports both anonymous signals and named signals organized in namespaces.
class Signal:
    """A notification emitter."""

    def __init__(self, doc: str | None = None): ...
def signal(name: str, doc: str | None = None) -> NamedSignal:
    """Return a NamedSignal in default_namespace with the given name."""
class NamedSignal(Signal):
    """A named generic notification emitter."""

    def __init__(self, name: str, doc: str | None = None): ...

    name: str  # The name of this signal
class Namespace(dict[str, NamedSignal]):
"""A dict mapping names to signals."""
def signal(self, name: str, doc: str | None = None) -> NamedSignal:
"""Return the NamedSignal for the given name, creating it if required."""

Connect callable receivers to signals with flexible sender filtering and automatic cleanup options.
F = TypeVar("F", bound=Callable[..., Any])


def connect(self, receiver: F, sender: Any = ANY, weak: bool = True) -> F:
    """
    Connect receiver to be called when the signal is sent by sender.

    Parameters:
    - receiver: The callable to call when send() is called with the given sender,
      passing sender as a positional argument along with any extra keyword arguments
    - sender: Any object or ANY. receiver will only be called when send() is called
      with this sender. If ANY, the receiver will be called for any sender
    - weak: Track the receiver with a weakref. The receiver will be automatically
      disconnected when it is garbage collected. When connecting a receiver defined
      within a function, set to False, otherwise it will be disconnected when the
      function scope ends

    Returns:
    The receiver function (for decorator chaining)
    """
def connect_via(self, sender: Any, weak: bool = False) -> Callable[[F], F]:
    """
    Connect the decorated function to be called when the signal is sent by sender.

    The decorated function will be called when send() is called with the given sender,
    passing sender as a positional argument along with any extra keyword arguments.

    Parameters:
    - sender: Any object or ANY. receiver will only be called when send() is called
      with this sender. If ANY, the receiver will be called for any sender
    - weak: Track the receiver with a weakref. The receiver will be automatically
      disconnected when it is garbage collected. When connecting a receiver defined
      within a function, set to False, otherwise it will be disconnected when the
      function scope ends

    Returns:
    Decorator function
    """
@contextmanager
def connected_to(self, receiver: Callable[..., Any], sender: Any = ANY) -> Generator[None, None, None]:
"""
A context manager that temporarily connects receiver to the signal while a with
block executes. When the block exits, the receiver is disconnected. Useful for tests.
Parameters:
- receiver: The callable to call when send() is called with the given sender,
passing sender as a positional argument along with any extra keyword arguments
- sender: Any object or ANY. receiver will only be called when send() is called
with this sender. If ANY, the receiver will be called for any sender
Usage:
with signal.connected_to(my_receiver):
# receiver is connected
signal.send("test")
# receiver is automatically disconnected
"""

Send signals to connected receivers with support for both synchronous and asynchronous execution patterns.
def send(
    self,
    sender: Any | None = None,
    /,
    *,
    _async_wrapper: Callable[
        [Callable[..., Coroutine[Any, Any, Any]]], Callable[..., Any]
    ] | None = None,
    **kwargs: Any,
) -> list[tuple[Callable[..., Any], Any]]:
    """
    Call all receivers that are connected to the given sender or ANY. Each receiver
    is called with sender as a positional argument along with any extra keyword
    arguments. Return a list of (receiver, return value) tuples.

    The order receivers are called is undefined, but can be influenced by setting
    set_class.

    If a receiver raises an exception, that exception will propagate up. This makes
    debugging straightforward, with an assumption that correctly implemented receivers
    will not raise.

    Parameters:
    - sender: Call receivers connected to this sender, in addition to those connected to ANY
    - _async_wrapper: Will be called on any receivers that are async coroutines to turn
      them into sync callables. For example, could run the receiver with an event loop
    - **kwargs: Extra keyword arguments to pass to each receiver

    Returns:
    List of (receiver, return_value) tuples
    """
async def send_async(
self,
sender: Any | None = None,
/,
*,
_sync_wrapper: Callable[
[Callable[..., Any]], Callable[..., Coroutine[Any, Any, Any]]
] | None = None,
**kwargs: Any,
) -> list[tuple[Callable[..., Any], Any]]:
"""
Await all receivers that are connected to the given sender or ANY. Each receiver
is called with sender as a positional argument along with any extra keyword
arguments. Return a list of (receiver, return value) tuples.
The order receivers are called is undefined, but can be influenced by setting
set_class.
If a receiver raises an exception, that exception will propagate up. This makes
debugging straightforward, with an assumption that correctly implemented receivers
will not raise.
Parameters:
- sender: Call receivers connected to this sender, in addition to those connected to ANY
- _sync_wrapper: Will be called on any receivers that are sync callables to turn them
into async coroutines. For example, could call the receiver in a thread
- **kwargs: Extra keyword arguments to pass to each receiver
Returns:
List of (receiver, return_value) tuples
"""

Query and manage connected receivers with support for weak reference cleanup and batch disconnection.
def has_receivers_for(self, sender: Any) -> bool:
    """
    Check if there is at least one receiver that will be called with the given sender.
    A receiver connected to ANY will always be called, regardless of sender. Does not
    check if weakly referenced receivers are still live. See receivers_for for a
    stronger search.

    Parameters:
    - sender: Check for receivers connected to this sender, in addition to those
      connected to ANY

    Returns:
    True if receivers exist for sender or ANY
    """
def receivers_for(self, sender: Any) -> Generator[Callable[..., Any], None, None]:
    """
    Yield each receiver to be called for sender, in addition to those to be called
    for ANY. Weakly referenced receivers that are not live will be disconnected and
    skipped.

    Parameters:
    - sender: Yield receivers connected to this sender, in addition to those
      connected to ANY

    Yields:
    Callable receivers (weak references resolved automatically)
    """
def disconnect(self, receiver: Callable[..., Any], sender: Any = ANY) -> None:
"""
Disconnect receiver from being called when the signal is sent by sender.
Parameters:
- receiver: A connected receiver callable
- sender: Disconnect from only this sender. By default, disconnect from all senders
"""

Control signal behavior and inspect signal state for debugging and testing purposes.
@contextmanager
def muted(self) -> Generator[None, None, None]:
    """
    A context manager that temporarily disables the signal. No receivers will be
    called if the signal is sent, until the with block exits. Useful for tests.

    Usage:
        with signal.muted():
            signal.send("test")  # No receivers called
    """
# Instance attributes for introspection
receivers: dict[Any, weakref.ref[Callable[..., Any]] | Callable[..., Any]]
"""The map of connected receivers. Useful to quickly check if any receivers are
connected to the signal: if s.receivers:. The structure and data is not part of
the public API, but checking its boolean value is."""
is_muted: bool
"""Whether signal is currently muted."""
@cached_property
def receiver_connected(self) -> Signal:
    """Emitted at the end of each connect() call.

    The signal sender is the signal instance, and the connect() arguments are passed
    through: receiver, sender, and weak.
    """
@cached_property
def receiver_disconnected(self) -> Signal:
    """Emitted at the end of each disconnect() call.

    The sender is the signal instance, and the disconnect() arguments are passed
    through: receiver and sender.

    This signal is emitted only when disconnect() is called explicitly. This signal
    cannot be emitted by an automatic disconnect when a weakly referenced receiver or
    sender goes out of scope, as the instance is no longer available to be used as
    the sender for this signal.
    """
# Class attributes for customization
set_class: type[set[Any]] = set
"""The set class to use for tracking connected receivers and senders. Python's set
is unordered. If receivers must be dispatched in the order they were connected, an
ordered set implementation can be used."""
ANY = ANY
"""An alias for the ANY sender symbol."""
# Testing and cleanup methods
def _clear_state(self) -> None:
    """Disconnect all receivers and senders. Useful for tests."""
def _cleanup_bookkeeping(self) -> None:
    """Prune unused sender/receiver bookkeeping. Not threadsafe.

    Connecting & disconnecting leaves behind a small amount of bookkeeping data.
    Typical workloads using Blinker, for example in most web apps, Flask, CLI scripts,
    etc., are not adversely affected by this bookkeeping.

    With a long-running process performing dynamic signal routing with high volume,
    e.g. connecting to function closures, senders are all unique object instances.
    Doing all of this over and over may cause memory usage to grow due to extraneous
    bookkeeping. (An empty set for each stale sender/receiver pair.)

    This method will prune that bookkeeping away, with the caveat that such pruning
    is not threadsafe. The risk is that cleanup of a fully disconnected receiver/sender
    pair occurs while another thread is connecting that same pair.
    """


ANY: Symbol
"""Symbol for 'any sender' - receivers connected to ANY are called for all senders."""
default_namespace: Namespace
"""Default Namespace instance for creating named signals."""
class Symbol:
    """
    A constant symbol, nicer than object(). Repeated calls return the same instance.

    Usage:
    >>> Symbol('foo') is Symbol('foo')
    True
    >>> Symbol('foo')
    foo
    """

    symbols: ClassVar[dict[str, Symbol]] = {}

    def __new__(cls, name: str) -> Symbol: ...
    def __init__(self, name: str) -> None: ...
    def __repr__(self) -> str: ...
    def __getnewargs__(self) -> tuple[Any, ...]: ...

    name: str
def make_id(obj: object) -> Hashable:
    """
    Get a stable identifier for a receiver or sender, to be used as a dict key
    or in a set.

    For bound methods, uses the id of the unbound function and instance.
    For strings and ints, returns the value directly (stable hash).
    For other types, assumes they are not hashable but will be the same instance.
    """
def make_ref(
    obj: T, callback: Callable[[weakref.ref[T]], None] | None = None
) -> weakref.ref[T]:
    """
    Create a weak reference to obj with optional callback.

    For methods, uses WeakMethod for proper cleanup.
    For other objects, uses standard weakref.ref.

    Parameters:
    - obj: Object to create weak reference to
    - callback: Optional callback when reference is garbage collected

    Returns:
    Weak reference to the object
    """


from blinker import signal, Namespace
# Using default namespace
user_logged_in = signal('user-logged-in')
user_logged_out = signal('user-logged-out')

# Using custom namespace
app_signals = Namespace()
request_started = app_signals.signal('request-started')
request_finished = app_signals.signal('request-finished')


@request_started.connect
def log_request_start(sender, **kwargs):
    print(f"Request started: {kwargs}")


from blinker import Signal
sig = Signal()


# Weak reference (default) - automatically disconnected when receiver is garbage collected
def temp_handler(sender, **kwargs):
    print("Temporary handler")


sig.connect(temp_handler, weak=True)  # weak=True is default

# Strong reference - receiver stays connected until explicitly disconnected
sig.connect(temp_handler, weak=False)

from blinker import Signal
# Signal with multiple senders
data_changed = Signal()


class Model:
    def __init__(self, name):
        self.name = name

    def update(self, **kwargs):
        # Send with self as sender
        data_changed.send(self, model=self.name, **kwargs)


# Connect receiver for specific sender
model1 = Model("users")
model2 = Model("products")


@data_changed.connect_via(model1)
def handle_user_changes(sender, **kwargs):
    print(f"User model updated: {kwargs}")


@data_changed.connect  # Receives from ANY sender
def handle_all_changes(sender, **kwargs):
    print(f"Model {sender.name} updated: {kwargs}")


import asyncio
from blinker import Signal

async_signal = Signal()


async def async_handler(sender, **kwargs):
    await asyncio.sleep(0.1)
    return f"Processed {kwargs}"


def sync_handler(sender, **kwargs):
    return f"Sync processed {kwargs}"


async_signal.connect(async_handler)
async_signal.connect(sync_handler)


# Send to async receivers
async def send_async_example():
    # Wrap sync receivers so send_async() can await them like coroutines.
    def sync_to_async(func):
        async def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper

    results = await async_signal.send_async(
        "test_sender",
        _sync_wrapper=sync_to_async,
        data="example",
    )
    for receiver, result in results:
        print(f"{receiver.__name__}: {result}")


# asyncio.run(send_async_example())

from blinker import signal
# Signal for testing
test_signal = signal('test-event')


def test_receiver(sender, **kwargs):
    print(f"Test received: {kwargs}")


# Temporary connection for testing
with test_signal.connected_to(test_receiver):
    test_signal.send("test_sender", message="hello")

# Muted signal for testing
with test_signal.muted():
    test_signal.send("test_sender", message="ignored")  # No output

from blinker import Signal, signal
import asyncio

# Error propagation in signal sending
error_signal = Signal()


def failing_receiver(sender, **kwargs):
    raise ValueError("Something went wrong")


def safe_receiver(sender, **kwargs):
    print("Safe receiver called")


error_signal.connect(failing_receiver)
error_signal.connect(safe_receiver)

try:
    # The exception propagates out of send(). Receiver call order is undefined,
    # so safe_receiver runs only if it happens to be called before failing_receiver.
    error_signal.send("test")
except ValueError as e:
    print(f"Caught error: {e}")


# Handle async receiver errors
async def failing_async_receiver(sender, **kwargs):
    raise RuntimeError("Async error")


async_signal = Signal()
async_signal.connect(failing_async_receiver)


# Mixing sync and async receivers
def sync_wrapper(func):
    async def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper


async def test_async_error():
    await async_signal.send_async("test", _sync_wrapper=sync_wrapper)


# The coroutine must actually be run for the receiver's exception to surface;
# merely defining it inside a try block can never raise RuntimeError.
try:
    asyncio.run(test_async_error())
except RuntimeError as e:
    print(f"Async error: {e}")


# Weak reference cleanup
def create_temporary_receiver():
    def temp_receiver(sender, **kwargs):
        print("Temporary receiver")

    return temp_receiver


cleanup_signal = Signal()
temp_func = create_temporary_receiver()
cleanup_signal.connect(temp_func, weak=True)

# After temp_func goes out of scope, it will be automatically disconnected
del temp_func
# Signal will automatically clean up the weak reference

from blinker import Signal, Namespace
# Event routing with multiple namespaces
user_events = Namespace()
system_events = Namespace()

user_login = user_events.signal('login')
user_logout = user_events.signal('logout')
system_startup = system_events.signal('startup')


# Global event handler
def global_event_handler(sender, **kwargs):
    print(f"Global: {sender} sent {kwargs}")


# Connect to multiple signals
for sig in [user_login, user_logout, system_startup]:
    sig.connect(global_event_handler)


# Conditional receivers
class EventFilter:
    def __init__(self, allowed_senders):
        self.allowed_senders = set(allowed_senders)

    def filtered_receiver(self, sender, **kwargs):
        if sender in self.allowed_senders:
            print(f"Filtered: {sender} -> {kwargs}")
        # Else ignore


filter_handler = EventFilter(['admin', 'system'])
user_login.connect(filter_handler.filtered_receiver)

# Send events
user_login.send('admin', action='login')  # Processed by the filter
# Ignored by the filter; global_event_handler is also connected and still runs
user_login.send('guest', action='login')

# Signal chaining - one signal triggers another
chain_start = Signal()
chain_middle = Signal()
chain_end = Signal()


@chain_start.connect
def start_to_middle(sender, **kwargs):
    chain_middle.send(sender, **kwargs)


@chain_middle.connect
def middle_to_end(sender, **kwargs):
    chain_end.send(sender, **kwargs)


@chain_end.connect
def end_handler(sender, **kwargs):
    print(f"Chain completed: {kwargs}")


# Trigger the chain
chain_start.send("initiator", data="test")