or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

architect.md, build-context.md, builder-creation.md, index.md, job-system.md, node-integration.md, progress-reporting.md, testing.md
tile.json

docs/job-system.md

Job System

Lower-level job scheduling and execution infrastructure with observables for complex build workflows. The job system provides the foundation for the builder architecture, enabling advanced patterns like job dependencies, custom schedulers, and message-based communication.

Capabilities

Job Interface

Core interface representing a running job with observable streams for communication.

/**
 * Handle to a scheduled job. Exposes the job's argument and state, plus
 * Observable/Observer endpoints for exchanging values and raw messages with it.
 *
 * All three type parameters are constrained to `JsonValue` and default to it.
 *
 * @typeParam ArgumentT - Type of the argument the job was scheduled with
 * @typeParam InputT - Type of the values that can be sent to the job
 * @typeParam OutputT - Type of the values the job emits
 */
interface Job<
  ArgumentT extends JsonValue = JsonValue,
  InputT extends JsonValue = JsonValue,
  OutputT extends JsonValue = JsonValue
> {
  /**
   * The job's description, delivered through an Observable rather than
   * being available synchronously
   */
  readonly description: Observable<JobDescription>;
  
  /**
   * The argument value this job was scheduled with
   */
  readonly argument: ArgumentT;
  
  /**
   * Observer used to push input values to the job (e.g. `job.input.next(value)`)
   */
  readonly input: Observer<InputT>;
  
  /**
   * Observable of the output values produced by the job
   */
  readonly output: Observable<OutputT>;
  
  /**
   * The job's execution state, as a `JobState` value
   */
  readonly state: JobState;
  
  /**
   * Get a named channel for communication beyond the main output stream
   * @typeParam T - Type of the values carried by the channel
   * @param name - Channel name
   * @param schema - Optional JSON schema for validation
   * @returns Observable stream for the channel
   */
  getChannel<T extends JsonValue>(name: string, schema?: schema.JsonSchema): Observable<T>;
  
  /**
   * Ping the job and wait for its response
   * @returns Observable that emits no values (`Observable<never>`) and
   *          completes when the pong is received
   */
  ping(): Observable<never>;
  
  /**
   * Request that the job stop executing
   */
  stop(): void;
  
  /**
   * Raw inbound bus: accepts `JobInboundMessage` objects addressed to the job
   */
  readonly inboundBus: Observer<JobInboundMessage<InputT>>;
  
  /**
   * Raw outbound bus: emits the `JobOutboundMessage` objects sent by the job
   */
  readonly outboundBus: Observable<JobOutboundMessage<OutputT>>;
}

/**
 * Execution states a job can report through `Job.state`.
 *
 * String-valued enum: each member's runtime value is its name in lowercase.
 * NOTE(review): the members appear to be listed in lifecycle order
 * (queued, ready, started, then ended or errored) — confirm against the scheduler.
 */
enum JobState {
  Queued = "queued",
  Ready = "ready",
  Started = "started", 
  Ended = "ended",
  Errored = "errored"
}

/**
 * Name identifying a job. A plain string alias, used wherever a job is
 * referred to by name (for example `Scheduler.schedule` and `JobDescription.name`).
 */
type JobName = string;

Usage Examples:

import { jobs } from "@angular-devkit/architect";

// `scheduler` is assumed to be an existing Scheduler instance.

// Schedule "my-job" with its argument
const job = scheduler.schedule("my-job", { config: "value" });

// Inspect the job's current state
console.log(`Job state: ${job.state}`);

// Push an input value into the job
const inputMessage = { command: "process", data: [1, 2, 3] };
job.input.next(inputMessage);

// Log every value the job emits
job.output.subscribe({
  next: (output) => {
    console.log("Job output:", output);
  },
});

// Listen on a named side channel carrying progress updates
type Progress = { current: number; total: number };
job.getChannel<Progress>("progress").subscribe(({ current, total }) => {
  console.log(`Progress: ${current}/${total}`);
});

// Liveness check: the ping observable completes once the job answers
job.ping().subscribe({
  complete() {
    console.log("Job responded to ping");
  },
});

// Ask the job to stop once it is no longer needed
job.stop();

Scheduler Interface

Interface for scheduling and managing jobs.

/**
 * Contract for looking up, scheduling and pausing jobs.
 *
 * The three type parameters are the minimum (base) types that every job
 * scheduled through this scheduler must extend; each defaults to `JsonValue`.
 *
 * @typeParam MinimumArgumentValueT - Base type for job arguments
 * @typeParam MinimumInputValueT - Base type for job input values
 * @typeParam MinimumOutputValueT - Base type for job output values
 */
interface Scheduler<
  MinimumArgumentValueT extends JsonValue = JsonValue,
  MinimumInputValueT extends JsonValue = JsonValue,
  MinimumOutputValueT extends JsonValue = JsonValue
> {
  /**
   * Get the description of a named job
   * @param name - Job name to describe
   * @returns Observable emitting the job description, or `null`
   *          (presumably when the name is unknown — confirm)
   */
  getDescription(name: JobName): Observable<JobDescription | null>;
  
  /**
   * Check whether a job name is registered
   * @param name - Job name to check
   * @returns Observable emitting `true` if the job exists, `false` otherwise
   */
  has(name: JobName): Observable<boolean>;
  
  /**
   * Pause job scheduling temporarily
   * @returns Function to call in order to resume scheduling
   */
  pause(): () => void;
  
  /**
   * Schedule a job for execution
   *
   * Only `A` appears in the parameter list, so `I` and `O` cannot be inferred
   * from the call arguments; pass them explicitly when a specific input or
   * output type is needed.
   *
   * @typeParam A - Argument type; must extend `MinimumArgumentValueT`
   * @typeParam I - Input type; must extend `MinimumInputValueT`
   * @typeParam O - Output type; must extend `MinimumOutputValueT`
   * @param name - Job name to schedule
   * @param argument - Argument to pass to the job
   * @param options - Additional scheduling options (see `ScheduleJobOptions`)
   * @returns Handle to the scheduled job
   */
  schedule<
    A extends MinimumArgumentValueT,
    I extends MinimumInputValueT,
    O extends MinimumOutputValueT
  >(
    name: JobName,
    argument: A,
    options?: ScheduleJobOptions
  ): Job<A, I, O>;
}

/**
 * Optional settings accepted by `Scheduler.schedule`
 */
interface ScheduleJobOptions {
  /**
   * Jobs that must complete before this job starts.
   * Accepts either a single job or an array of jobs.
   */
  dependencies?: Job | Job[];
}

Usage Examples:

import { jobs } from "@angular-devkit/architect";

// Create a scheduler backed by an in-memory registry
const registry = new jobs.SimpleJobRegistry();
const scheduler = new jobs.SimpleScheduler(registry);

// Check if job exists
const hasJob = await scheduler.has("my-job").toPromise();
if (!hasJob) {
  console.log("Job not found");
}

// Get job description (null is possible, hence the guard)
const description = await scheduler.getDescription("my-job").toPromise();
if (description) {
  console.log(`Job: ${description.name}`);
}

// Schedule with dependencies: "main-job" is given "setup-job" as a dependency
const dependency = scheduler.schedule("setup-job", {});
const mainJob = scheduler.schedule("main-job", { config: true }, {
  dependencies: [dependency]
});

// Pause and resume scheduling
const resume = scheduler.pause();
// ... do other work
resume(); // Resume scheduling

// Schedule multiple related jobs.
// The array is named `scheduledJobs`, not `jobs`: redeclaring `jobs` would
// conflict with the `jobs` namespace imported at the top of this example.
const scheduledJobs = [
  scheduler.schedule("job-1", { step: 1 }),
  scheduler.schedule("job-2", { step: 2 }),
  scheduler.schedule("job-3", { step: 3 })
];

// Wait for all to complete. The promise is awaited so that failures surface
// here instead of becoming unhandled rejections.
// NOTE: `last` is the RxJS operator and must be imported from "rxjs/operators".
await Promise.all(scheduledJobs.map(job => job.output.pipe(last()).toPromise()));

Job Handler

Function interface for implementing job logic.

/**
 * A job implementation: a callable object that also carries job metadata.
 *
 * @typeParam ArgT - Type of the argument the job is scheduled with
 * @typeParam InputT - Type of the input values the job accepts
 * @typeParam OutputT - Type of the output values the job produces
 */
interface JobHandler<
  ArgT extends JsonValue,
  InputT extends JsonValue,
  OutputT extends JsonValue
> {
  /**
   * Call signature: the job implementation itself
   * @param argument - Argument passed when scheduling
   * @param context - Job execution context
   * @returns Observable stream of outbound messages (not bare output values)
   */
  (
    argument: ArgT,
    context: JobHandlerContext<ArgT, InputT, OutputT>
  ): Observable<JobOutboundMessage<OutputT>>;
  
  /**
   * Job metadata attached to the handler. Typed as `Partial<JobDescription>`,
   * so any of the description fields may be omitted.
   */
  jobDescription: Partial<JobDescription>;
}

/**
 * Context object passed as the second argument to a `JobHandler`.
 *
 * Of the three type parameters, only `MinimumInputValueT` is used by the
 * members declared here (it types `inboundBus`).
 */
interface JobHandlerContext<
  MinimumArgumentValueT extends JsonValue = JsonValue,
  MinimumInputValueT extends JsonValue = JsonValue,
  MinimumOutputValueT extends JsonValue = JsonValue
> {
  /**
   * Description of the job being run
   */
  readonly description: JobDescription;
  
  /**
   * Scheduler instance the handler can use to schedule further jobs
   */
  readonly scheduler: Scheduler<JsonValue, JsonValue, JsonValue>;
  
  /**
   * Jobs this job depends on
   * (presumably the `dependencies` given in `ScheduleJobOptions` — confirm)
   */
  readonly dependencies: Job<JsonValue, JsonValue, JsonValue>[];
  
  /**
   * Observable of the raw inbound messages sent to this job
   */
  readonly inboundBus: Observable<JobInboundMessage<MinimumInputValueT>>;
}

Job Description

Metadata interface describing job capabilities and schemas.

/**
 * Metadata describing a job: its name plus the JSON schemas of its
 * argument, input and output. Extends `JsonObject`, and every field is
 * read-only (the schemas deeply so, via `DeepReadonly`).
 */
interface JobDescription extends JsonObject {
  /**
   * Name identifying the job
   */
  readonly name: JobName;
  
  /**
   * JSON schema for the job's argument
   */
  readonly argument: DeepReadonly<schema.JsonSchema>;
  
  /**
   * JSON schema for the job's input messages
   */
  readonly input: DeepReadonly<schema.JsonSchema>;
  
  /**
   * JSON schema for the job's output messages
   */
  readonly output: DeepReadonly<schema.JsonSchema>;
}

Create Job Handler

Utility functions for creating job handlers from simple functions.

/**
 * Signature of a plain function that `createJobHandler` can wrap.
 *
 * The function may answer synchronously (`O`), with a `Promise<O>`, or with
 * an `Observable<O>`.
 *
 * Note: despite its name, the first parameter `input` is typed `A` — it is
 * the job's argument. Input values are typed `I` and are reached through the
 * context (`SimpleJobHandlerContext.input`).
 */
type SimpleJobHandlerFn<A extends JsonValue, I extends JsonValue, O extends JsonValue> = (
  input: A,
  context: SimpleJobHandlerContext<A, I, O>
) => O | Promise<O> | Observable<O>;

/**
 * Context passed to a `SimpleJobHandlerFn`: everything in `JobHandlerContext`
 * plus the helpers below.
 */
interface SimpleJobHandlerContext<A extends JsonValue, I extends JsonValue, O extends JsonValue> 
  extends JobHandlerContext<A, I, O> {
  /** Creates a channel with the given name and returns an Observer for writing JSON values to it */
  createChannel: (name: string) => Observer<JsonValue>;
  /** Observable of the input values (type `I`) sent to the job */
  input: Observable<I>;
  /**
   * Registers a teardown callback, which may be synchronous or return a Promise
   * (presumably invoked when the job ends or is stopped — confirm)
   */
  addTeardown(teardown: () => Promise<void> | void): void;
}

/**
 * Wrap a plain function as a `JobHandler`
 * @param fn - Function to wrap; may return a value, a Promise or an Observable
 *             (see `SimpleJobHandlerFn`)
 * @param options - Optional job metadata (any subset of `JobDescription`)
 * @returns JobHandler instance
 */
function createJobHandler<A extends JsonValue, I extends JsonValue, O extends JsonValue>(
  fn: SimpleJobHandlerFn<A, I, O>,
  options?: Partial<JobDescription>
): JobHandler<A, I, O>;

/**
 * Create a `JobHandler` whose real implementation is obtained asynchronously
 * @param loader - Function returning a Promise of the actual job handler
 * @param options - Job metadata (any subset of `JobDescription`); required
 *                  here, unlike the optional `options` of `createJobHandler`
 * @returns JobHandler standing in for the loaded handler
 */
function createJobFactory<A extends JsonValue, I extends JsonValue, O extends JsonValue>(
  loader: () => Promise<JobHandler<A, I, O>>,
  options: Partial<JobDescription>
): JobHandler<A, I, O>;

/**
 * Create a logger job that forwards log entries to a channel
 *
 * NOTE(review): verify this declaration against the installed
 * `@angular-devkit/architect` typings before relying on it — the published
 * `createLoggerJob` is believed to take the job handler to wrap and a logger
 * (`createLoggerJob(job, logger)`) rather than a metadata object.
 *
 * @param options - Job metadata options
 * @returns JobHandler for logging
 */
function createLoggerJob<A extends JsonValue, I extends JsonValue, O extends JsonValue>(
  options: Partial<JobDescription>
): JobHandler<A, I, O>;

Usage Examples:

import { jobs } from "@angular-devkit/architect";

// Minimal handler: logs the message it is given, then resolves with a result object
const simpleJob = jobs.createJobHandler(
  ({ message }: { message: string }) => {
    console.log(message);
    return Promise.resolve({ result: "done" });
  },
  {
    name: "simple-job",
    // Schema of the argument: an object with a string `message`
    argument: { type: "object", properties: { message: { type: "string" } } },
    // `false` as a schema: no input value validates
    input: false,
    // Schema of the output: an object with a string `result`
    output: { type: "object", properties: { result: { type: "string" } } }
  }
);

// Streaming handler: emits one { step, total } object per step, then completes
const complexJob = jobs.createJobHandler(
  ({ count }: { count: number }, _context) => {
    return new Observable(subscriber => {
      let step = 0;
      while (step < count) {
        subscriber.next({ step, total: count });
        step++;
      }
      subscriber.complete();
    });
  },
  {
    name: "complex-job",
    // Schema of the argument: an object with a numeric `count`
    argument: { type: "object", properties: { count: { type: "number" } } },
    // Schema of each emitted value
    output: {
      type: "object",
      properties: {
        step: { type: "number" },
        total: { type: "number" }
      }
    }
  }
);

Registry Interface

Interface for job registration and resolution.

/**
 * Contract for resolving job handlers by name.
 *
 * The type parameters are the minimum (base) types that handlers returned
 * by this registry must extend; each defaults to `JsonValue`.
 */
interface Registry<
  MinimumArgumentValueT extends JsonValue = JsonValue,
  MinimumInputValueT extends JsonValue = JsonValue,
  MinimumOutputValueT extends JsonValue = JsonValue
> {
  /**
   * Look up a job handler by name
   * @typeParam A - Argument type of the requested handler
   * @typeParam I - Input type of the requested handler
   * @typeParam O - Output type of the requested handler
   * @param name - Job name to retrieve
   * @returns Observable emitting the job handler, or `null`
   *          (presumably when no handler has that name — confirm)
   */
  get<A extends MinimumArgumentValueT, I extends MinimumInputValueT, O extends MinimumOutputValueT>(
    name: JobName
  ): Observable<JobHandler<A, I, O> | null>;
}

Built-in Registry Implementations

Pre-built registry implementations for common use cases.

/**
 * Simple in-memory job registry. Implements `Registry`, and adds `register`
 * for populating it.
 */
class SimpleJobRegistry implements Registry {
  /**
   * Register a job handler
   * @param handler - Job handler to register
   *                  (presumably stored under the name in its `jobDescription` — confirm)
   */
  register<A extends JsonValue, I extends JsonValue, O extends JsonValue>(
    handler: JobHandler<A, I, O>
  ): void;
}

/**
 * Registry that delegates to a list of other registries, trying them in order
 */
class FallbackRegistry implements Registry {
  /**
   * Create a registry with a fallback chain
   * @param registries - Registries to try, in the order given
   */
  constructor(registries: Registry[]);
}

/**
 * Simple `Scheduler` implementation that resolves jobs through a `Registry`
 */
class SimpleScheduler implements Scheduler {
  /**
   * Create a scheduler
   * @param registry - Job registry used to resolve jobs by name
   * @param schemaRegistry - Optional schema registry used for validation
   */
  constructor(
    registry: Registry,
    schemaRegistry?: schema.SchemaRegistry
  );
}

Usage Examples:

import { jobs } from "@angular-devkit/architect";

// `simpleJob`, `complexJob` and `additionalRegistry` are assumed to exist already.

// In-memory registry holding both handlers
const registry = new jobs.SimpleJobRegistry();
registry.register(simpleJob);
registry.register(complexJob);

// Chain of registries, tried in the order listed
const registryChain = [registry, additionalRegistry];
const fallbackRegistry = new jobs.FallbackRegistry(registryChain);

// Scheduler resolving jobs through the fallback chain
const scheduler = new jobs.SimpleScheduler(fallbackRegistry);

// Schedule a job and log each value it emits
const job = scheduler.schedule("simple-job", { message: "Hello" });
job.output.subscribe({
  next: (output) => {
    console.log("Job completed:", output);
  },
});

Job Strategies

Advanced job execution strategies for controlling job behavior.

/**
 * A job strategy: a function that takes a job handler (and optional metadata)
 * and returns a job handler with the same argument, input and output types.
 *
 * @typeParam A - Argument type of the handler
 * @typeParam I - Input type of the handler
 * @typeParam O - Output type of the handler
 */
type JobStrategy<
  A extends JsonValue = JsonValue,
  I extends JsonValue = JsonValue,
  O extends JsonValue = JsonValue
> = (
  handler: JobHandler<A, I, O>,
  options?: Partial<Readonly<JobDescription>>
) => JobHandler<A, I, O>;

/**
 * Create a strategy that serializes job execution.
 * The returned strategy is then applied to a handler, e.g.
 * `jobs.strategy.serialize()(handler)`.
 * @returns JobStrategy that runs jobs one at a time
 */
function serialize<A extends JsonValue, I extends JsonValue, O extends JsonValue>(): JobStrategy<A, I, O>;

/**
 * Create a strategy that reuses running jobs.
 * The returned strategy is then applied to a handler, e.g.
 * `jobs.strategy.reuse(true)(handler)`.
 * @param replayMessages - Whether to replay all messages when reusing
 *                         (optional; the default is not shown here)
 * @returns JobStrategy that reuses active jobs
 */
function reuse<A extends JsonValue, I extends JsonValue, O extends JsonValue>(
  replayMessages?: boolean
): JobStrategy<A, I, O>;

/**
 * Create a strategy that memoizes job results.
 * The returned strategy is then applied to a handler, e.g.
 * `jobs.strategy.memoize()(handler)`.
 * @returns JobStrategy that caches job outputs
 *          (the cache key is not shown here — presumably the job argument; confirm)
 */
function memoize<A extends JsonValue, I extends JsonValue, O extends JsonValue>(): JobStrategy<A, I, O>;

Usage Examples:

import { jobs } from "@angular-devkit/architect";

// `originalHandler`, `expensiveHandler`, `longRunningHandler` and `registry`
// are assumed to exist already.

// Each factory returns a strategy, which is then applied to a handler.

// Serialization
const serializeStrategy = jobs.strategy.serialize();
const serializedHandler = serializeStrategy(originalHandler);

// Memoization
const memoizeStrategy = jobs.strategy.memoize();
const memoizedHandler = memoizeStrategy(expensiveHandler);

// Reuse, with message replay enabled
const reuseStrategy = jobs.strategy.reuse(true);
const reusableHandler = reuseStrategy(longRunningHandler);

// Register the wrapped handlers with the registry
registry.register(serializedHandler);
registry.register(memoizedHandler);

Message Types

Message interfaces for job communication.

/**
 * Kinds of messages that can be sent to a job.
 *
 * The runtime values are short string codes, all prefixed with "i";
 * compare against the enum members rather than the literal codes.
 */
enum JobInboundMessageKind {
  Ping = "ip",
  Stop = "is", 
  Input = "in"
}

/**
 * Kinds of messages that a job can send out.
 *
 * The runtime values are short string codes; the four channel-related kinds
 * all start with "c". Note that `OnReady` is "c", not "r". Compare against
 * the enum members rather than the literal codes.
 */
enum JobOutboundMessageKind {
  OnReady = "c",
  Start = "s",
  End = "e",
  Pong = "p",
  Output = "o",
  ChannelCreate = "cn",
  ChannelMessage = "cm",
  ChannelError = "ce", 
  ChannelComplete = "cc"
}

/**
 * Inbound message carrying an input value for a job.
 * `kind` is fixed to `JobInboundMessageKind.Input`, so it can serve as the
 * discriminant when narrowing a message union.
 *
 * @typeParam InputT - Type of the input value carried
 */
interface JobInboundMessageInput<InputT extends JsonValue> {
  readonly kind: JobInboundMessageKind.Input;
  readonly value: InputT;
}

/**
 * Outbound message carrying an output value from a job.
 * `kind` is fixed to `JobOutboundMessageKind.Output`, so it can serve as the
 * discriminant when narrowing a message union.
 *
 * @typeParam OutputT - Type of the output value carried
 */
interface JobOutboundMessageOutput<OutputT extends JsonValue> {
  readonly kind: JobOutboundMessageKind.Output;
  readonly value: OutputT;
}