CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-vine

Python promises library providing specialized promise-based asynchronous operations and lazy evaluation

Pending
Overview
Eval results
Files

docs/synchronization.md

Synchronization

Synchronization primitives for coordinating multiple promises and waiting for all of them to complete. The barrier class executes a callback only after every promise in a collection has been fulfilled.

Capabilities

Barrier Creation

Create synchronization barriers for coordinating multiple promises.

class barrier:
    """Synchronization primitive that runs a callback once all of its promises are fulfilled."""

    def __init__(self, promises=None, args=None, kwargs=None, callback=None, size=None):
        """
        Create a synchronization barrier.

        Parameters:
        - promises: iterable, collection of promises to wait for; may be omitted
          to start with an empty barrier and register promises later with add()
        - args: tuple, positional arguments to pass to callback when all promises complete
        - kwargs: dict, keyword arguments to pass to callback when all promises complete
        - callback: callable or promise, function to execute when barrier completes;
          further callbacks can be attached afterwards with then()
        - size: int, expected number of promises (auto-calculated if promises provided)
        """

Usage Examples:

from vine import barrier, promise

# Create barrier with existing promises
p1 = promise(lambda: "task1")
p2 = promise(lambda: "task2")
p3 = promise(lambda: "task3")

# The expected size is derived from the three promises passed in;
# the callback runs only after the last of them has been fulfilled.
b = barrier([p1, p2, p3], callback=lambda: print("All tasks complete!"))

# Fulfill promises in any order (a promise is fulfilled by calling it)
p2()
p1()
p3()  # Prints: All tasks complete!

Adding Promises

Add promises to the barrier before finalization.

def add(self, promise):
    """
    Add a promise to the barrier and increment expected size.

    Intended for barriers that grow dynamically. When the expected size
    already accounts for the promise, use add_noincr() instead so the
    promise is not counted twice.

    Parameters:
    - promise: promise object to add to the barrier

    Raises:
    - ValueError: if barrier is already fulfilled
    """

def add_noincr(self, promise):
    """
    Add promise to barrier without incrementing expected size.

    Use this when the expected size already includes the promise, for
    example on a barrier created with an explicit size.

    Parameters:
    - promise: promise object to add to the barrier

    Raises:
    - ValueError: if barrier is already fulfilled
    """

Usage Examples:

# Start with empty barrier
b = barrier()

# Add promises dynamically
# (fetch_data is not defined in this snippet; it stands in for application code)
p1 = promise(fetch_data, args=("url1",))
p2 = promise(fetch_data, args=("url2",))

# Each add() also raises the expected size by one
b.add(p1)
b.add(p2)

# Set completion callback
b.then(lambda: print("All data fetched!"))

# Add more promises before finalization
p3 = promise(fetch_data, args=("url3",))
b.add(p3)

b.finalize()  # No more promises can be added
# Nothing has been fulfilled yet: the callback runs once p1, p2 and p3 are all called.

Barrier Completion

Control when the barrier considers itself complete.

def __call__(self, *args, **kwargs):
    """
    Mark one promise as complete. Called automatically by fulfilled promises.

    Internal method - promises call this when they complete, so application
    code normally fulfills the promises themselves rather than calling the
    barrier directly.

    Parameters:
    - *args, **kwargs: whatever the fulfilled promise passes to its callbacks
      (NOTE(review): presumably unused by the barrier itself - confirm)
    """

def finalize(self):
    """
    Finalize the barrier, preventing addition of more promises.

    If the number of promises completed so far already meets the expected
    size, triggers completion immediately. Otherwise the barrier completes
    later, when the remaining promises are fulfilled.
    """

Usage Examples:

# Barrier with predetermined size
b = barrier(size=3)
b.then(lambda: print("3 promises completed!"))

# The expected size is already fixed at 3, so register the promises
# WITHOUT growing it. Using add() here would raise the expected size to 6
# and the barrier would never complete after only three fulfillments.
b.add_noincr(promise1)
b.add_noincr(promise2)
b.add_noincr(promise3)

# Explicitly close the barrier to further additions
b.finalize()

# Now fulfill promises
promise1()
promise2()
promise3()  # Triggers barrier completion

Barrier Chaining

Chain callbacks to execute when the barrier completes.

def then(self, callback, errback=None):
    """
    Chain callback to execute when barrier completes.

    May be called more than once to attach several callbacks.

    Parameters:
    - callback: callable or promise, function to execute on completion
    - errback: callable or promise, error handler for exceptions; it is
      passed the exception as its argument

    Returns:
    None (delegates to internal promise), so then() calls on a barrier
    cannot be chained fluently - call then() once per callback
    """

Usage Examples:

# Chain multiple callbacks
# (p1, p2, p3, success_callback and send_notification are assumed to be defined elsewhere)
b = barrier([p1, p2, p3])

# then() returns None, so each callback is attached with its own call
b.then(lambda: print("First callback"))
b.then(lambda: print("Second callback"))
b.then(lambda: send_notification("All complete"))

# Chain with error handling: the errback is passed the exception
b.then(
    success_callback,
    errback=lambda exc: print(f"Barrier failed: {exc}")
)

Error Handling

Handle exceptions that occur in barrier promises.

def throw(self, *args, **kwargs):
    """
    Throw exception through the barrier.

    The call is forwarded to the underlying promise's throw(). Error
    handlers are attached with then(callback, errback=...).

    Parameters:
    - *args: arguments to pass to underlying promise throw()
    - **kwargs: keyword arguments to pass to underlying promise throw()
    """

# throw1 is an alias for throw
throw1 = throw

Usage Examples:

# Handle errors in barrier promises
def error_handler(exc):
    """Report the exception that made the barrier fail."""
    print(f"Barrier failed with: {exc}")

# (p1, p2, p3 and success_callback are assumed to be defined elsewhere)
b = barrier([p1, p2, p3])
b.then(success_callback, errback=error_handler)

# If any promise fails, error handler is called
p1.throw(RuntimeError("Network error"))

Barrier Cancellation

Cancel the barrier and stop waiting for promises.

def cancel(self):
    """
    Cancel the barrier and underlying promise.

    Stops the barrier from completing even if all promises fulfill.
    Afterwards the `cancelled` property reports True.
    """

Usage Examples:

# Cancel barrier if taking too long
import threading

# (long_running_promise1/2 are assumed to be defined elsewhere)
b = barrier([long_running_promise1, long_running_promise2])
b.then(lambda: print("Tasks completed"))

# Cancel after timeout
def timeout_cancel():
    """Give up on the barrier; invoked by the timer below."""
    print("Barrier timed out, cancelling...")
    b.cancel()

# threading.Timer calls timeout_cancel from a separate thread after 30 seconds
timer = threading.Timer(30.0, timeout_cancel)
timer.start()

Barrier Properties

Access barrier state and information.

@property
def ready(self) -> bool:
    """True when barrier has completed (all promises fulfilled).

    Stays False while any expected promise is still pending.
    """

@property
def finalized(self) -> bool:
    """True when barrier is finalized (no more promises can be added).

    Set by finalize(); also True straight after construction when the
    promises are passed to the constructor.
    """

@property
def cancelled(self) -> bool:
    """True when barrier has been cancelled (see cancel())."""

@property
def size(self) -> int:
    """Expected number of promises for barrier completion.

    Comes from the constructor (explicit size, or derived from the promises
    passed in). Each add() raises it by one; add_noincr() leaves it unchanged.
    """

@property
def failed(self) -> bool:
    """True when barrier failed due to promise exception.

    NOTE(review): which call sets this flag is not visible here - confirm
    before relying on it.
    """

@property
def reason(self) -> Exception:
    """Exception that caused barrier failure.

    NOTE(review): presumably None while the barrier has not failed - confirm.
    """

Usage Examples:

# (p1, p2 and p3 are assumed to be unfulfilled promises defined elsewhere)
b = barrier([p1, p2, p3])

# size is derived from the promises passed to the constructor
print(f"Waiting for {b.size} promises")
print(f"Finalized: {b.finalized}")  # True
print(f"Ready: {b.ready}")          # False

# Fulfill promises
p1()
p2()

print(f"Ready: {b.ready}")          # False (still waiting for p3)

p3()

print(f"Ready: {b.ready}")          # True

Advanced Usage

Dynamic Barrier Management

# Barrier that grows dynamically
b = barrier()

def add_more_work(*_result):
    """
    Queue follow-up work on the barrier, or finalize it when none is left.

    Registered with promise.then(), so it is invoked with the result of the
    promise that just completed; the result is accepted and ignored (a
    zero-argument signature would fail when that result is passed in).
    more_work_needed(), no_more_work() and do_additional_work are
    application-defined placeholders.
    """
    if more_work_needed():
        new_promise = promise(do_additional_work)
        b.add(new_promise)
        new_promise.then(add_more_work)  # Check for even more work
        # new_promise still has to be fulfilled (called) by whatever runs the work

    # Only finalize when we know no more work is needed
    if no_more_work():
        b.finalize()

# Start the process
initial_promise = promise(initial_work)
b.add(initial_promise)
initial_promise.then(add_more_work)
initial_promise()

Barrier with Results Collection

# Collect results from all promises
results = []

def collect_result(result):
    """Record one promise's result; used as a then() callback."""
    results.append(result)

def all_complete():
    """Barrier callback: report everything that was collected."""
    print(f"All results: {results}")

p1 = promise(lambda: "result1")
p2 = promise(lambda: "result2")
p3 = promise(lambda: "result3")

# Chain each promise to collect its result
# The collectors are registered BEFORE the barrier is created so that
# (assuming callbacks run in registration order) each result is recorded
# before the barrier is notified and all_complete fires.
p1.then(collect_result)
p2.then(collect_result)
p3.then(collect_result)

# Use barrier to know when all are complete
b = barrier([p1, p2, p3], callback=all_complete)

# Execute promises
p1()
p2()
p3()  # Prints: All results: ['result1', 'result2', 'result3']

Install with Tessl CLI

npx tessl i tessl/pypi-vine

docs

abstract.md

index.md

promises.md

synchronization.md

utilities.md

tile.json