Orchestrator

The orchestrator is Agentend's multi-step workflow engine. It executes workflows as directed acyclic graphs (DAGs), running independent steps in parallel and dependent steps in sequence. It supports checkpointing, retry logic, and human-in-the-loop interrupts.

DAG workflow engine

The DAGExecutor takes a Workflow definition and executes it step by step. Steps are grouped into parallel groups based on their dependency declarations — steps with no unmet dependencies run concurrently.

from agentend.orchestrator.dag import DAGExecutor
from agentend.orchestrator.workflow import Workflow, Step

workflow = Workflow(name="invoice_pipeline")

# Steps are added with dependency declarations
workflow.add_step(Step(name="extract", worker=extract_fn))
workflow.add_step(Step(name="verify", worker=verify_fn, depends_on=["extract"]))
workflow.add_step(Step(name="summarize", worker=summarize_fn, depends_on=["extract"]))
workflow.add_step(Step(name="notify", worker=notify_fn, depends_on=["verify", "summarize"]))

executor = DAGExecutor()
results = await executor.execute(workflow)

In this example, extract runs first. Once it completes, verify and summarize run in parallel. Finally, notify runs after both have finished.

Step definitions

Each step in a workflow is a Step object with the following properties:

Property          Type             Description
name              str              Unique step identifier
worker            Callable         Async function to execute
depends_on        List[str]        Steps that must complete before this one runs
input             Dict             Static input data for the step
timeout_seconds   float            Maximum execution time before timeout
retry_config      RetryConfig      Retry policy with max retries, backoff factor, and max backoff
interrupt_policy  InterruptPolicy  When to pause for human approval

Dependency outputs are automatically injected into downstream steps. If step extract produces a result, the verify step receives it as extract_result in its input dict.
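The injection convention can be sketched in plain Python. This is an illustrative stand-in, not the executor's internals; `inject_outputs` is a hypothetical helper named here only to show how the `<name>_result` keys reach a worker:

```python
# Sketch of the injection convention: each upstream step's output is merged
# into the downstream step's input dict under "<name>_result".
def inject_outputs(step_input, completed):
    merged = dict(step_input)
    for name, result in completed.items():
        merged[f"{name}_result"] = result
    return merged

async def verify_fn(input):
    # "extract" ran upstream, so its output appears as extract_result
    invoice = input["extract_result"]
    return {"verified": invoice["total"] > 0}
```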

Parallel execution

The DAGExecutor groups steps into parallel batches using topological ordering. Within each batch, all steps run concurrently via asyncio.gather(). Between batches, the executor checkpoints results before starting the next group.

# Execution order for the invoice pipeline:
#
# Group 1: [extract]          — runs alone (no dependencies)
# Group 2: [verify, summarize] — runs in parallel (both depend on extract)
# Group 3: [notify]           — runs alone (depends on verify + summarize)
#
# Total time ≈ extract + max(verify, summarize) + notify
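The batching itself is standard Kahn-style topological grouping: repeatedly take every step whose dependencies are all satisfied, run that set, mark it done. A minimal standalone sketch (not the library's implementation):

```python
def parallel_groups(deps):
    """Group step names into batches whose dependencies are all satisfied.

    deps maps step name -> list of upstream step names.
    """
    done, groups = set(), []
    remaining = dict(deps)
    while remaining:
        # Every step whose dependencies have all completed can run now.
        ready = [s for s, d in remaining.items() if set(d) <= done]
        if not ready:
            raise ValueError("cycle detected in workflow")
        groups.append(sorted(ready))
        done.update(ready)
        for s in ready:
            del remaining[s]
    return groups

# Invoice pipeline from above:
deps = {
    "extract": [],
    "verify": ["extract"],
    "summarize": ["extract"],
    "notify": ["verify", "summarize"],
}
# parallel_groups(deps) -> [["extract"], ["summarize", "verify"], ["notify"]]
```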

Checkpointing and resumption

After each parallel group completes, the executor saves a checkpoint with all step results. If a workflow fails partway through, it can be resumed from the last checkpoint:

# Resume a failed workflow from the "verify" step
results = await executor.execute(
    workflow,
    execution_id="invoice_pipeline:1711152000",
    resume_from="verify",
)

Retry logic

Each step can opt into retries with exponential backoff. The executor retries only the error types listed in retry_on (here, transient errors and timeouts), up to the configured maximum:

Step(
    name="verify",
    worker=verify_fn,
    depends_on=["extract"],
    timeout_seconds=45,
    retry_config=RetryConfig(
        max_retries=3,
        backoff_factor=2.0,   # 2s, 4s, 8s
        backoff_max=30.0,
        retry_on=["transient_error", "timeout"],
    ),
)
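The delays in the comment follow the usual exponential-backoff formula, capped at backoff_max. A quick sketch of the schedule computation (illustrative, not the library's code):

```python
def backoff_schedule(max_retries, backoff_factor, backoff_max):
    # Delay before retry n (1-based) is backoff_factor ** n, capped at backoff_max.
    return [min(backoff_factor ** n, backoff_max) for n in range(1, max_retries + 1)]

# backoff_schedule(3, 2.0, 30.0) -> [2.0, 4.0, 8.0]   (matches the comment above)
# backoff_schedule(6, 2.0, 30.0) -> [2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```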

Human-in-the-loop interrupts

Steps can be configured to pause execution and request human input before proceeding. The interrupt is surfaced to the frontend as an interrupt event. Three interrupt types are supported:

Type       action_required  Use Case
Approval   approve          User must approve before a step executes (e.g., issuing a refund)
Selection  select           User must choose from a set of options
Input      input            User must provide free-form text input

If the human rejects the step, it is marked as interrupted and skipped. Downstream steps that depend on it will not run.
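The skip propagation on rejection can be sketched as a transitive walk over the dependency graph (an illustration of the rule above, not the executor's implementation):

```python
def mark_skipped(deps, interrupted):
    # Any step that transitively depends on an interrupted step is skipped.
    # deps maps step name -> list of upstream step names.
    skipped = set()
    changed = True
    while changed:
        changed = False
        for step, upstream in deps.items():
            if step in skipped or step == interrupted:
                continue
            if any(u == interrupted or u in skipped for u in upstream):
                skipped.add(step)
                changed = True
    return skipped

deps = {
    "extract": [],
    "verify": ["extract"],
    "summarize": ["extract"],
    "notify": ["verify", "summarize"],
}
# If "verify" is rejected, "notify" is skipped; "summarize" still runs.
```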

Step results and status

Each step returns a StepResult with the execution status, output, duration, and retry count:

Status       Meaning
success      Step completed successfully
failed       Step failed after exhausting all retries
interrupted  Step was rejected by human-in-the-loop
skipped      Step was skipped due to an upstream failure or interruption

The overall workflow status is computed from step results: success if all steps succeeded, failed if any step failed, or partial_success if some steps were interrupted.
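That rollup takes only a few lines; a minimal sketch using the status strings from the table above (skipped steps caused by an interruption are covered by the interrupted check, and skips caused by a failure already yield failed):

```python
def workflow_status(step_statuses):
    # failed wins over everything; any interruption yields partial_success.
    if any(s == "failed" for s in step_statuses):
        return "failed"
    if any(s == "interrupted" for s in step_statuses):
        return "partial_success"
    return "success"

# workflow_status(["success", "success"])                  -> "success"
# workflow_status(["success", "interrupted", "skipped"])   -> "partial_success"
# workflow_status(["success", "failed", "skipped"])        -> "failed"
```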