Python promises library providing specialized promise-based asynchronous operations and lazy evaluation
—
Synchronization primitives for coordinating multiple promises and waiting for all to complete. The barrier class provides a way to execute a callback only after a collection of promises have been fulfilled.
Create synchronization barriers for coordinating multiple promises.
class barrier:
def __init__(self, promises=None, args=None, kwargs=None, callback=None, size=None):
"""
Create a synchronization barrier.
Parameters:
- promises: iterable, collection of promises to wait for
- args: tuple, arguments to pass to callback when all promises complete
- kwargs: dict, keyword arguments to pass to callback
- callback: callable or promise, function to execute when barrier completes
- size: int, expected number of promises (auto-calculated if promises provided)
"""Usage Examples:
from vine import barrier, promise
# Create barrier with existing promises
p1 = promise(lambda: "task1")
p2 = promise(lambda: "task2")
p3 = promise(lambda: "task3")
b = barrier([p1, p2, p3], callback=lambda: print("All tasks complete!"))
# Fulfill promises in any order
p2()
p1()
p3() # Prints: All tasks complete!Add promises to the barrier before finalization.
def add(self, promise):
"""
Add a promise to the barrier and increment expected size.
Parameters:
- promise: promise object to add to the barrier
Raises:
- ValueError: if barrier is already fulfilled
"""
def add_noincr(self, promise):
"""
Add promise to barrier without incrementing expected size.
Parameters:
- promise: promise object to add to the barrier
Raises:
- ValueError: if barrier is already fulfilled
"""Usage Examples:
# Start with empty barrier
b = barrier()
# Add promises dynamically
p1 = promise(fetch_data, args=("url1",))
p2 = promise(fetch_data, args=("url2",))
b.add(p1)
b.add(p2)
# Set completion callback
b.then(lambda: print("All data fetched!"))
# Add more promises before finalization
p3 = promise(fetch_data, args=("url3",))
b.add(p3)
b.finalize() # No more promises can be addedControl when the barrier considers itself complete.
def __call__(self, *args, **kwargs):
"""
Mark one promise as complete. Called automatically by fulfilled promises.
Internal method - promises call this when they complete.
"""
def finalize(self):
"""
Finalize the barrier, preventing addition of more promises.
If current promise count meets expected size, triggers completion immediately.
"""Usage Examples:
# Barrier with predetermined size
b = barrier(size=3)
b.then(lambda: print("3 promises completed!"))
# Add promises up to the size
b.add(promise1)
b.add(promise2)
b.add(promise3)
# Must finalize before promises can trigger completion
b.finalize()
# Now fulfill promises
promise1()
promise2()
promise3() # Triggers barrier completionChain callbacks to execute when the barrier completes.
def then(self, callback, errback=None):
"""
Chain callback to execute when barrier completes.
Parameters:
- callback: callable or promise, function to execute on completion
- errback: callable or promise, error handler for exceptions
Returns:
None (delegates to internal promise)
"""Usage Examples:
# Chain multiple callbacks
b = barrier([p1, p2, p3])
b.then(lambda: print("First callback"))
b.then(lambda: print("Second callback"))
b.then(lambda: send_notification("All complete"))
# Chain with error handling
b.then(
success_callback,
errback=lambda exc: print(f"Barrier failed: {exc}")
)Handle exceptions that occur in barrier promises.
def throw(self, *args, **kwargs):
"""
Throw exception through the barrier.
Parameters:
- *args: arguments to pass to underlying promise throw()
- **kwargs: keyword arguments to pass to underlying promise throw()
"""
# throw1 is an alias for throw
throw1 = throwUsage Examples:
# Handle errors in barrier promises
def error_handler(exc):
print(f"Barrier failed with: {exc}")
b = barrier([p1, p2, p3])
b.then(success_callback, errback=error_handler)
# If any promise fails, error handler is called
p1.throw(RuntimeError("Network error"))Cancel the barrier and stop waiting for promises.
def cancel(self):
"""
Cancel the barrier and underlying promise.
Stops the barrier from completing even if all promises fulfill.
"""Usage Examples:
# Cancel barrier if taking too long
import threading
b = barrier([long_running_promise1, long_running_promise2])
b.then(lambda: print("Tasks completed"))
# Cancel after timeout
def timeout_cancel():
print("Barrier timed out, cancelling...")
b.cancel()
timer = threading.Timer(30.0, timeout_cancel)
timer.start()Access barrier state and information.
@property
def ready(self) -> bool:
"""True when barrier has completed (all promises fulfilled)."""
@property
def finalized(self) -> bool:
"""True when barrier is finalized (no more promises can be added)."""
@property
def cancelled(self) -> bool:
"""True when barrier has been cancelled."""
@property
def size(self) -> int:
"""Expected number of promises for barrier completion."""
@property
def failed(self) -> bool:
"""True when barrier failed due to promise exception."""
@property
def reason(self) -> Exception:
"""Exception that caused barrier failure."""Usage Examples:
b = barrier([p1, p2, p3])
print(f"Waiting for {b.size} promises")
print(f"Finalized: {b.finalized}") # True
print(f"Ready: {b.ready}") # False
# Fulfill promises
p1()
p2()
print(f"Ready: {b.ready}") # False (still waiting for p3)
p3()
print(f"Ready: {b.ready}") # True# Barrier that grows dynamically
b = barrier()
def add_more_work():
if more_work_needed():
new_promise = promise(do_additional_work)
b.add(new_promise)
new_promise.then(add_more_work) # Check for even more work
# Only finalize when we know no more work is needed
if no_more_work():
b.finalize()
# Start the process
initial_promise = promise(initial_work)
b.add(initial_promise)
initial_promise.then(add_more_work)
initial_promise()# Collect results from all promises
results = []
def collect_result(result):
results.append(result)
def all_complete():
print(f"All results: {results}")
p1 = promise(lambda: "result1")
p2 = promise(lambda: "result2")
p3 = promise(lambda: "result3")
# Chain each promise to collect its result
p1.then(collect_result)
p2.then(collect_result)
p3.then(collect_result)
# Use barrier to know when all are complete
b = barrier([p1, p2, p3], callback=all_complete)
# Execute promises
p1()
p2()
p3() # Prints: All results: ['result1', 'result2', 'result3']Install with Tessl CLI
npx tessl i tessl/pypi-vine