tessl/pypi-blinker

Fast, simple object-to-object and broadcast signaling

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/blinker@1.9.x

To install, run

npx @tessl/cli install tessl/pypi-blinker@1.9.0


Blinker

Blinker is a fast, simple object-to-object and broadcast signaling system that lets any number of interested parties subscribe to events, or "signals". It provides a lightweight, thread-safe implementation of the observer pattern for Python, so that different parts of an application can communicate without depending on each other directly.

Package Information

  • Package Name: blinker
  • Language: Python
  • Installation: pip install blinker

Core Imports

import blinker

Common usage imports:

from blinker import signal, Signal, ANY

Import all public components:

from blinker import signal, Signal, NamedSignal, Namespace, default_namespace, ANY

Complete typing imports for advanced usage:

import typing as t
import collections.abc as c
import weakref
from contextlib import contextmanager
from functools import cached_property
from typing import Any, Callable, ClassVar, Coroutine, Generator, Hashable, TypeVar

# Type variables and aliases
F = t.TypeVar("F", bound=c.Callable[..., t.Any])
T = t.TypeVar("T")

Basic Usage

from blinker import signal

# Create a named signal
started = signal('round-started')

# Define receivers
def each_round(sender, **kwargs):
    print(f"Round {kwargs.get('round_num', '?')}")

def round_two_only(sender, **kwargs):
    print("This is round two.")

# Connect receivers
started.connect(each_round)
started.connect(round_two_only, sender=2)  # Only for sender=2

# Send signals
for round_num in range(1, 4):
    started.send(round_num, round_num=round_num)
# Output (the relative order of the two round-2 lines is not guaranteed):
# Round 1
# Round 2
# This is round two.
# Round 3

Architecture

Blinker's core components work together to provide flexible signal dispatching:

  • Signal: Core signal emitter that manages receivers and sends notifications
  • NamedSignal: Signal with an assigned name for use in namespaces
  • Namespace: Dictionary-like container for organizing related named signals
  • Weak References: Automatic cleanup of receivers when they go out of scope
  • Threading: Thread-safe operations for concurrent applications

The library supports both anonymous signals (Signal instances) and named signals (managed by Namespace), with automatic cleanup via weak references to prevent memory leaks.

Capabilities

Signal Creation and Management

Create and manage signal instances for event dispatching. Supports both anonymous signals and named signals organized in namespaces.

class Signal:
    """A notification emitter."""
    
    def __init__(self, doc: str | None = None): ...

def signal(name: str, doc: str | None = None) -> NamedSignal:
    """Return a NamedSignal in default_namespace with the given name."""

class NamedSignal(Signal):
    """A named generic notification emitter."""
    
    def __init__(self, name: str, doc: str | None = None): ...
    
    name: str  # The name of this signal

class Namespace(dict[str, NamedSignal]):
    """A dict mapping names to signals."""
    
    def signal(self, name: str, doc: str | None = None) -> NamedSignal:
        """Return the NamedSignal for the given name, creating it if required."""

Receiver Connection

Connect callable receivers to signals with flexible sender filtering and automatic cleanup options.

F = TypeVar("F", bound=Callable[..., Any])

def connect(self, receiver: F, sender: Any = ANY, weak: bool = True) -> F:
    """
    Connect receiver to be called when the signal is sent by sender.
    
    Parameters:
    - receiver: The callable to call when send() is called with the given sender,
        passing sender as a positional argument along with any extra keyword arguments
    - sender: Any object or ANY. receiver will only be called when send() is called
        with this sender. If ANY, the receiver will be called for any sender
    - weak: Track the receiver with a weakref. The receiver will be automatically
        disconnected when it is garbage collected. When connecting a receiver defined
        within a function, set to False, otherwise it will be disconnected when the
        function scope ends
    
    Returns:
    The receiver function (for decorator chaining)
    """

def connect_via(self, sender: Any, weak: bool = False) -> Callable[[F], F]:
    """
    Connect the decorated function to be called when the signal is sent by sender.
    
    The decorated function will be called when send() is called with the given sender,
    passing sender as a positional argument along with any extra keyword arguments.
    
    Parameters:
    - sender: Any object or ANY. receiver will only be called when send() is called
        with this sender. If ANY, the receiver will be called for any sender
    - weak: Track the receiver with a weakref. The receiver will be automatically
        disconnected when it is garbage collected. When connecting a receiver defined
        within a function, set to False, otherwise it will be disconnected when the
        function scope ends
    
    Returns:
    Decorator function
    """

@contextmanager
def connected_to(self, receiver: Callable[..., Any], sender: Any = ANY) -> Generator[None, None, None]:
    """
    A context manager that temporarily connects receiver to the signal while a with
    block executes. When the block exits, the receiver is disconnected. Useful for tests.
    
    Parameters:
    - receiver: The callable to call when send() is called with the given sender,
        passing sender as a positional argument along with any extra keyword arguments
    - sender: Any object or ANY. receiver will only be called when send() is called
        with this sender. If ANY, the receiver will be called for any sender
    
    Usage:
    with signal.connected_to(my_receiver):
        # receiver is connected
        signal.send("test")
    # receiver is automatically disconnected
    """

Signal Sending

Send signals to connected receivers with support for both synchronous and asynchronous execution patterns.

def send(
    self,
    sender: Any | None = None,
    /,
    *,
    _async_wrapper: Callable[
        [Callable[..., Coroutine[Any, Any, Any]]], Callable[..., Any]
    ] | None = None,
    **kwargs: Any,
) -> list[tuple[Callable[..., Any], Any]]:
    """
    Call all receivers that are connected to the given sender or ANY. Each receiver
    is called with sender as a positional argument along with any extra keyword
    arguments. Return a list of (receiver, return value) tuples.
    
    The order receivers are called is undefined, but can be influenced by setting
    set_class.
    
    If a receiver raises an exception, that exception will propagate up. This makes
    debugging straightforward, with an assumption that correctly implemented receivers
    will not raise.
    
    Parameters:
    - sender: Call receivers connected to this sender, in addition to those connected to ANY
    - _async_wrapper: Will be called on any receivers that are async coroutines to turn
        them into sync callables. For example, could run the receiver with an event loop
    - **kwargs: Extra keyword arguments to pass to each receiver
    
    Returns:
    List of (receiver, return_value) tuples
    """

async def send_async(
    self,
    sender: Any | None = None,
    /,
    *,
    _sync_wrapper: Callable[
        [Callable[..., Any]], Callable[..., Coroutine[Any, Any, Any]]
    ] | None = None,
    **kwargs: Any,
) -> list[tuple[Callable[..., Any], Any]]:
    """
    Await all receivers that are connected to the given sender or ANY. Each receiver
    is called with sender as a positional argument along with any extra keyword
    arguments. Return a list of (receiver, return value) tuples.
    
    The order receivers are called is undefined, but can be influenced by setting
    set_class.
    
    If a receiver raises an exception, that exception will propagate up. This makes
    debugging straightforward, with an assumption that correctly implemented receivers
    will not raise.
    
    Parameters:
    - sender: Call receivers connected to this sender, in addition to those connected to ANY
    - _sync_wrapper: Will be called on any receivers that are sync callables to turn them
        into async coroutines. For example, could call the receiver in a thread
    - **kwargs: Extra keyword arguments to pass to each receiver
    
    Returns:
    List of (receiver, return_value) tuples
    """

Receiver Discovery and Disconnection

Query and manage connected receivers with support for weak reference cleanup and batch disconnection.

def has_receivers_for(self, sender: Any) -> bool:
    """
    Check if there is at least one receiver that will be called with the given sender.
    A receiver connected to ANY will always be called, regardless of sender. Does not
    check if weakly referenced receivers are still live. See receivers_for for a
    stronger search.
    
    Parameters:
    - sender: Check for receivers connected to this sender, in addition to those
        connected to ANY
    
    Returns:
    True if receivers exist for sender or ANY
    """

def receivers_for(self, sender: Any) -> Generator[Callable[..., Any], None, None]:
    """
    Yield each receiver to be called for sender, in addition to those to be called
    for ANY. Weakly referenced receivers that are not live will be disconnected and
    skipped.
    
    Parameters:
    - sender: Yield receivers connected to this sender, in addition to those
        connected to ANY
    
    Yields:
    Callable receivers (weak references resolved automatically)
    """

def disconnect(self, receiver: Callable[..., Any], sender: Any = ANY) -> None:
    """
    Disconnect receiver from being called when the signal is sent by sender.
    
    Parameters:
    - receiver: A connected receiver callable
    - sender: Disconnect from only this sender. By default, disconnect from all senders
    """

Signal Control and Introspection

Control signal behavior and inspect signal state for debugging and testing purposes.

@contextmanager
def muted(self) -> Generator[None, None, None]:
    """
    A context manager that temporarily disables the signal. No receivers will be
    called if the signal is sent, until the with block exits. Useful for tests.
    
    Usage:
    with signal.muted():
        signal.send("test")  # No receivers called
    """

# Instance attributes for introspection
receivers: dict[Any, weakref.ref[Callable[..., Any]] | Callable[..., Any]]
"""The map of connected receivers. Useful to quickly check if any receivers are
connected to the signal: if s.receivers:. The structure and data is not part of
the public API, but checking its boolean value is."""

is_muted: bool
"""Whether signal is currently muted."""

@cached_property
def receiver_connected(self) -> Signal:
    """Emitted at the end of each connect() call.
    
    The signal sender is the signal instance, and the connect() arguments are passed
    through: receiver, sender, and weak.
    """

@cached_property  
def receiver_disconnected(self) -> Signal:
    """Emitted at the end of each disconnect() call.
    
    The sender is the signal instance, and the disconnect() arguments are passed
    through: receiver and sender.
    
    This signal is emitted only when disconnect() is called explicitly. This signal
    cannot be emitted by an automatic disconnect when a weakly referenced receiver or
    sender goes out of scope, as the instance is no longer available to be used as
    the sender for this signal.
    """

# Class attributes for customization
set_class: type[set[Any]] = set
"""The set class to use for tracking connected receivers and senders. Python's set
is unordered. If receivers must be dispatched in the order they were connected, an
ordered set implementation can be used."""

ANY = ANY
"""An alias for the ANY sender symbol."""

# Testing and cleanup methods
def _clear_state(self) -> None:
    """Disconnect all receivers and senders. Useful for tests."""

def _cleanup_bookkeeping(self) -> None:
    """Prune unused sender/receiver bookkeeping. Not threadsafe.
    
    Connecting & disconnecting leaves behind a small amount of bookkeeping data.
    Typical workloads using Blinker, for example in most web apps, Flask, CLI scripts,
    etc., are not adversely affected by this bookkeeping.
    
    With a long-running process performing dynamic signal routing with high volume,
    e.g. connecting to function closures, senders are all unique object instances.
    Doing all of this over and over may cause memory usage to grow due to extraneous
    bookkeeping. (An empty set for each stale sender/receiver pair.)
    
    This method will prune that bookkeeping away, with the caveat that such pruning
    is not threadsafe. The risk is that cleanup of a fully disconnected receiver/sender
    pair occurs while another thread is connecting that same pair.
    """

Constants and Utilities

ANY: Symbol
"""Symbol for 'any sender' - receivers connected to ANY are called for all senders."""

default_namespace: Namespace  
"""Default Namespace instance for creating named signals."""

class Symbol:
    """
    A constant symbol, nicer than object(). Repeated calls return the same instance.
    
    Usage:
    >>> Symbol('foo') is Symbol('foo')
    True
    >>> Symbol('foo')
    foo
    """
    
    symbols: ClassVar[dict[str, Symbol]] = {}
    
    def __new__(cls, name: str) -> Symbol: ...
    def __init__(self, name: str) -> None: ...
    def __repr__(self) -> str: ...
    def __getnewargs__(self) -> tuple[Any, ...]: ...
    
    name: str

def make_id(obj: object) -> Hashable:
    """
    Get a stable identifier for a receiver or sender, to be used as a dict key
    or in a set.
    
    For bound methods, uses the id of the unbound function and instance.
    For strings and ints, returns the value directly (stable hash).
    For other types, assumes they are not hashable but will be the same instance.
    """

def make_ref(obj: T, callback: Callable[[weakref.ref[T]], None] | None = None) -> weakref.ref[T]:
    """
    Create a weak reference to obj with optional callback.
    
    For methods, uses WeakMethod for proper cleanup.
    For other objects, uses standard weakref.ref.
    
    Parameters:
    - obj: Object to create weak reference to
    - callback: Optional callback when reference is garbage collected
    
    Returns:
    Weak reference to the object
    """

Usage Examples

Named Signals with Namespaces

from blinker import signal, Namespace

# Using default namespace
user_logged_in = signal('user-logged-in')
user_logged_out = signal('user-logged-out')

# Using custom namespace
app_signals = Namespace()
request_started = app_signals.signal('request-started')
request_finished = app_signals.signal('request-finished')

@request_started.connect
def log_request_start(sender, **kwargs):
    print(f"Request started: {kwargs}")

Weak vs Strong References

from blinker import Signal

sig = Signal()

# Weak reference (default): the receiver is disconnected automatically once it is garbage collected
def temp_handler(sender, **kwargs):
    print("Temporary handler")

sig.connect(temp_handler, weak=True)  # weak=True is the default

# Strong reference: the signal keeps the receiver alive until it is explicitly disconnected
def permanent_handler(sender, **kwargs):
    print("Permanent handler")

sig.connect(permanent_handler, weak=False)

Advanced Signal Patterns

from blinker import Signal

# Signal with multiple senders
data_changed = Signal()

class Model:
    def __init__(self, name):
        self.name = name
    
    def update(self, **kwargs):
        # Send with self as sender
        data_changed.send(self, model=self.name, **kwargs)

# Connect receiver for specific sender
model1 = Model("users")
model2 = Model("products")

@data_changed.connect_via(model1)
def handle_user_changes(sender, **kwargs):
    print(f"User model updated: {kwargs}")

@data_changed.connect  # Receives from ANY sender
def handle_all_changes(sender, **kwargs):
    print(f"Model {sender.name} updated: {kwargs}")

Async Signal Handling

import asyncio
from blinker import Signal

async_signal = Signal()

async def async_handler(sender, **kwargs):
    await asyncio.sleep(0.1)
    return f"Processed {kwargs}"

def sync_handler(sender, **kwargs):
    return f"Sync processed {kwargs}"

async_signal.connect(async_handler)
async_signal.connect(sync_handler)

# Send to async receivers
async def send_async_example():
    def sync_to_async(func):
        async def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    
    results = await async_signal.send_async(
        "test_sender", 
        _sync_wrapper=sync_to_async,
        data="example"
    )
    
    for receiver, result in results:
        print(f"{receiver.__name__}: {result}")

asyncio.run(send_async_example())

Testing with Context Managers

from blinker import signal

# Signal for testing
test_signal = signal('test-event')

def test_receiver(sender, **kwargs):
    print(f"Test received: {kwargs}")

# Temporary connection for testing
with test_signal.connected_to(test_receiver):
    test_signal.send("test_sender", message="hello")

# Muted signal for testing
with test_signal.muted():
    test_signal.send("test_sender", message="ignored")  # No output

Error Handling and Edge Cases

from blinker import Signal
import asyncio

# Error propagation in signal sending
error_signal = Signal()

def failing_receiver(sender, **kwargs):
    raise ValueError("Something went wrong")

def safe_receiver(sender, **kwargs):
    print("Safe receiver called")

error_signal.connect(failing_receiver)
error_signal.connect(safe_receiver)

try:
    # failing_receiver's exception propagates out of send(). Receiver order is undefined,
    # so safe_receiver may or may not have run before the error.
    error_signal.send("test")
except ValueError as e:
    print(f"Caught error: {e}")

# Handle async receiver errors
async def failing_async_receiver(sender, **kwargs):
    raise RuntimeError("Async error")

async_signal = Signal()
async_signal.connect(failing_async_receiver)

# Wrapper that lets send_async() await sync receivers
def sync_wrapper(func):
    async def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

async def test_async_error():
    # The try/except must wrap the awaited call, not the function definition
    try:
        await async_signal.send_async("test", _sync_wrapper=sync_wrapper)
    except RuntimeError as e:
        print(f"Async error: {e}")

asyncio.run(test_async_error())

# Weak reference cleanup
def create_temporary_receiver():
    def temp_receiver(sender, **kwargs):
        print("Temporary receiver")
    return temp_receiver

cleanup_signal = Signal()
temp_func = create_temporary_receiver()
cleanup_signal.connect(temp_func, weak=True)

# del drops the last strong reference to the receiver
del temp_func
# Once the function object is garbage collected, the signal disconnects it automatically

Advanced Signal Routing Patterns

from blinker import Signal, Namespace

# Event routing with multiple namespaces
user_events = Namespace()
system_events = Namespace()

user_login = user_events.signal('login')
user_logout = user_events.signal('logout')
system_startup = system_events.signal('startup')

# Global event handler
def global_event_handler(sender, **kwargs):
    print(f"Global: {sender} sent {kwargs}")

# Connect to multiple signals
for sig in [user_login, user_logout, system_startup]:
    sig.connect(global_event_handler)

# Conditional receivers
class EventFilter:
    def __init__(self, allowed_senders):
        self.allowed_senders = set(allowed_senders)
    
    def filtered_receiver(self, sender, **kwargs):
        if sender in self.allowed_senders:
            print(f"Filtered: {sender} -> {kwargs}")
        # Else ignore

filter_handler = EventFilter(['admin', 'system'])
user_login.connect(filter_handler.filtered_receiver)

# Send events
user_login.send('admin', action='login')  # Passes the filter
user_login.send('guest', action='login')  # Skipped by the filter (global_event_handler still runs)

# Signal chaining - one signal triggers another
chain_start = Signal()
chain_middle = Signal()
chain_end = Signal()

@chain_start.connect
def start_to_middle(sender, **kwargs):
    chain_middle.send(sender, **kwargs)

@chain_middle.connect  
def middle_to_end(sender, **kwargs):
    chain_end.send(sender, **kwargs)

@chain_end.connect
def end_handler(sender, **kwargs):
    print(f"Chain completed: {kwargs}")

# Trigger the chain
chain_start.send("initiator", data="test")