Publisher-subscriber pattern implementation and middleware system for reactive programming. These utilities enable event-driven architectures and asynchronous data processing pipelines commonly used in form applications.
Complete publisher-subscriber implementation with filtering and notification control.
/**
* Publisher-subscriber class for reactive programming patterns
* Manages subscriptions, notifications, and filtering
*/
class Subscribable<Payload = any> {
/** Internal storage for subscribers and subscription configuration */
subscribers: {
index?: number;
[key: number]: Subscriber<Payload>;
};
subscription: Subscription<Payload>;
/**
* Subscribe to notifications
* @param callback - Function to call when notifications are sent
* @returns Subscription index for unsubscribing
*/
subscribe(callback?: Subscriber<Payload>): number | undefined;
/**
* Unsubscribe from notifications
* @param index - Subscription index to remove, or undefined to clear all
*/
unsubscribe(index?: number): void;
/**
* Notify all subscribers with payload
* @param payload - Data to send to subscribers
* @param silent - If true, skip notification (still runs subscription hooks)
*/
notify(payload?: Payload, silent?: boolean): void;
}
type Subscriber<S> = (payload: S) => void;
interface Subscription<S> {
/** Hook called before notifying subscribers, can prevent notification by returning false */
notify?: (payload: S) => void | boolean;
/** Filter function to transform payload before subscribers receive it */
filter?: (payload: S) => any;
}Usage Examples:
import { Subscribable } from "@formily/shared";
// Basic publisher-subscriber
const eventBus = new Subscribable<string>();
// Subscribe to events
const subscription1 = eventBus.subscribe((message) => {
console.log(`Subscriber 1: ${message}`);
});
const subscription2 = eventBus.subscribe((message) => {
console.log(`Subscriber 2: ${message.toUpperCase()}`);
});
// Notify subscribers
eventBus.notify("Hello World!");
// Output:
// Subscriber 1: Hello World!
// Subscriber 2: HELLO WORLD!
// Unsubscribe specific subscriber
eventBus.unsubscribe(subscription1);
eventBus.notify("Still listening");
// Output:
// Subscriber 2: STILL LISTENING
// Clear all subscriptions
eventBus.unsubscribe();
// Form field change events
interface FieldChangeEvent {
fieldName: string;
value: any;
oldValue: any;
}
const formEvents = new Subscribable<FieldChangeEvent>();
// Subscribe to field changes
formEvents.subscribe((event) => {
console.log(`Field ${event.fieldName} changed from ${event.oldValue} to ${event.value}`);
});
// Trigger field change
formEvents.notify({
fieldName: "email",
value: "user@example.com",
oldValue: ""
});import { Subscribable } from "@formily/shared";
// Custom subscription with filtering and notification control
const dataStream = new Subscribable<number>();
// Set up subscription configuration
dataStream.subscription = {
// Pre-notification hook - can prevent notification
notify: (value) => {
console.log(`Processing value: ${value}`);
if (value < 0) {
console.log("Negative values are not allowed");
return false; // Prevents notification to subscribers
}
return true; // Allow notification
},
// Filter payload before sending to subscribers
filter: (value) => {
return Math.round(value * 100) / 100; // Round to 2 decimal places
}
};
// Subscribe to filtered stream
dataStream.subscribe((value) => {
console.log(`Received filtered value: ${value}`);
});
// Test notifications
dataStream.notify(3.14159); // Will be rounded to 3.14
dataStream.notify(-5); // Will be blocked by notify hook
dataStream.notify(2.71828); // Will be rounded to 2.72
// Output:
// Processing value: 3.14159
// Received filtered value: 3.14
// Processing value: -5
// Negative values are not allowed
// Processing value: 2.71828
// Received filtered value: 2.72import { Subscribable, isEqual } from "@formily/shared";
interface FormState {
values: Record<string, any>;
errors: Record<string, string>;
touched: Record<string, boolean>;
}
class FormStateManager {
private stateStream = new Subscribable<FormState>();
private currentState: FormState = {
values: {},
errors: {},
touched: {}
};
constructor() {
// Set up state change filtering
this.stateStream.subscription = {
notify: (newState) => {
// Only notify if state actually changed
if (isEqual(this.currentState, newState)) {
return false; // Skip notification for identical state
}
this.currentState = { ...newState };
return true;
}
};
}
// Subscribe to state changes
subscribe(callback: (state: FormState) => void): number {
return this.stateStream.subscribe(callback);
}
// Update form state
updateState(newState: Partial<FormState>): void {
const updatedState = {
...this.currentState,
...newState
};
this.stateStream.notify(updatedState);
}
// Update specific field
setFieldValue(fieldName: string, value: any): void {
this.updateState({
values: { ...this.currentState.values, [fieldName]: value },
touched: { ...this.currentState.touched, [fieldName]: true }
});
}
// Set field error
setFieldError(fieldName: string, error: string): void {
this.updateState({
errors: { ...this.currentState.errors, [fieldName]: error }
});
}
}
// Usage
const formManager = new FormStateManager();
// Subscribe to form changes
formManager.subscribe((state) => {
console.log("Form state changed:", state);
// Update UI based on state
updateFormUI(state);
});
// Simulate form interactions
formManager.setFieldValue("email", "user@example.com");
formManager.setFieldValue("password", "secret123");
formManager.setFieldError("email", "Invalid email format");Composable middleware pattern for processing data through a chain of functions.
/**
* Middleware function interface
* @param payload - Data being processed
* @param next - Function to call the next middleware in chain
* @returns Promise resolving to processed result
*/
interface IMiddleware<Payload = any, Result = any> {
(payload: Payload, next: (payload?: Payload) => Result): Result;
}
/**
* Apply middleware chain to payload
* @param payload - Initial data to process
* @param fns - Array of middleware functions
* @returns Promise resolving to final processed result
*/
function applyMiddleware(payload: any, fns?: IMiddleware[]): Promise<any>;Usage Examples:
import { applyMiddleware, IMiddleware } from "@formily/shared";
// Form validation middleware
const requiredFieldsMiddleware: IMiddleware = (data, next) => {
const requiredFields = ["name", "email"];
const missing = requiredFields.filter(field => !data[field]);
if (missing.length > 0) {
throw new Error(`Missing required fields: ${missing.join(", ")}`);
}
return next(data);
};
const emailValidationMiddleware: IMiddleware = (data, next) => {
if (data.email && !data.email.includes("@")) {
throw new Error("Invalid email format");
}
return next(data);
};
const sanitizationMiddleware: IMiddleware = (data, next) => {
const sanitized = { ...data };
// Trim string values
for (const [key, value] of Object.entries(sanitized)) {
if (typeof value === "string") {
sanitized[key] = value.trim();
}
}
return next(sanitized);
};
// Apply middleware chain
async function processFormData(formData: Record<string, any>) {
try {
const result = await applyMiddleware(formData, [
sanitizationMiddleware,
requiredFieldsMiddleware,
emailValidationMiddleware
]);
console.log("Form data processed successfully:", result);
return result;
} catch (error) {
console.error("Form validation failed:", error.message);
throw error;
}
}
// Usage
processFormData({
name: " John Doe ",
email: "john@example.com",
age: 30
});
// Output: Form data processed successfully: { name: "John Doe", email: "john@example.com", age: 30 }
processFormData({
name: "Jane",
email: "invalid-email"
});
// Output: Form validation failed: Invalid email formatimport { applyMiddleware, IMiddleware } from "@formily/shared";
// Logging middleware
const loggingMiddleware: IMiddleware = (data, next) => {
console.log("Processing:", data);
const startTime = Date.now();
const result = next(data);
const endTime = Date.now();
console.log(`Processing completed in ${endTime - startTime}ms`);
return result;
};
// Transformation middleware
const transformationMiddleware: IMiddleware = (data, next) => {
const transformed = {
...data,
processedAt: new Date().toISOString(),
id: Math.random().toString(36).substr(2, 9)
};
return next(transformed);
};
// Async validation middleware
const asyncValidationMiddleware: IMiddleware = async (data, next) => {
if (data.email) {
// Simulate async email validation
const isEmailValid = await validateEmailAsync(data.email);
if (!isEmailValid) {
throw new Error("Email address is already in use");
}
}
return next(data);
};
async function validateEmailAsync(email: string): Promise<boolean> {
// Simulate API call
return new Promise(resolve => {
setTimeout(() => {
resolve(!email.includes("spam"));
}, 100);
});
}
// API data processing pipeline
async function processApiData(rawData: any) {
return await applyMiddleware(rawData, [
loggingMiddleware,
transformationMiddleware,
asyncValidationMiddleware
]);
}
// Error handling in middleware
const errorHandlingMiddleware: IMiddleware = (data, next) => {
try {
return next(data);
} catch (error) {
console.error("Middleware error:", error);
// Add error info to result
return {
...data,
error: error.message,
success: false
};
}
};
// Conditional middleware
const conditionalMiddleware: IMiddleware = (data, next) => {
if (data.skipValidation) {
// Skip to end of chain
return data;
}
return next(data);
};import { Subscribable, applyMiddleware, IMiddleware } from "@formily/shared";
// Combined reactive form system
class ReactiveForm {
private stateStream = new Subscribable<any>();
private validationMiddleware: IMiddleware[] = [];
// Add validation middleware
addValidation(middleware: IMiddleware): void {
this.validationMiddleware.push(middleware);
}
// Subscribe to form changes
onChange(callback: (state: any) => void): number {
return this.stateStream.subscribe(callback);
}
// Update form with middleware processing
async updateField(fieldName: string, value: any): Promise<void> {
const currentState = this.getCurrentState();
const newState = { ...currentState, [fieldName]: value };
try {
// Process through validation middleware
const validatedState = await applyMiddleware(newState, this.validationMiddleware);
// Notify subscribers of valid state
this.stateStream.notify(validatedState);
} catch (error) {
// Notify subscribers of error state
this.stateStream.notify({
...newState,
errors: { [fieldName]: error.message }
});
}
}
private getCurrentState(): any {
// Implementation would track current state
return {};
}
}The reactive patterns in @formily/shared provide a foundation for building complex, event-driven form applications with clean separation of concerns and flexible data processing pipelines.