Orchestrator
The orchestrator is Agentend's multi-step workflow engine. It executes workflows as directed acyclic graphs (DAGs), running independent steps in parallel and dependent steps in sequence. It supports checkpointing, retry logic, and human-in-the-loop interrupts.
DAG workflow engine
The DAGExecutor takes a Workflow definition and executes it in batches. Steps are grouped by their declared dependencies: steps whose dependencies have all completed run concurrently, and the rest wait for a later batch.
from agentend.orchestrator.dag import DAGExecutor
from agentend.orchestrator.workflow import Workflow, Step
workflow = Workflow(name="invoice_pipeline")
# extract_fn, verify_fn, summarize_fn, and notify_fn are async worker functions defined elsewhere.
# Steps are added with dependency declarations
workflow.add_step(Step(name="extract", worker=extract_fn))
workflow.add_step(Step(name="verify", worker=verify_fn, depends_on=["extract"]))
workflow.add_step(Step(name="summarize", worker=summarize_fn, depends_on=["extract"]))
workflow.add_step(Step(name="notify", worker=notify_fn, depends_on=["verify", "summarize"]))
executor = DAGExecutor()
results = await executor.execute(workflow)
In this example, extract runs first. Once it completes, verify and summarize run in parallel. Finally, notify runs after both have finished.
Step definitions
Each step in a workflow is a Step object with the following properties:
| Property | Type | Description |
|---|---|---|
| name | str | Unique step identifier |
| worker | Callable | Async function to execute |
| depends_on | List[str] | Steps that must complete before this one runs |
| input | Dict | Static input data for the step |
| timeout_seconds | float | Maximum execution time before timeout |
| retry_config | RetryConfig | Retry policy with max retries, backoff factor, and max backoff |
| interrupt_policy | InterruptPolicy | When to pause for human approval |
Dependency outputs are automatically injected into downstream steps. If step extract produces a result, the verify step receives it as extract_result in its input dict.
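The naming convention for injected outputs can be illustrated with a small sketch. The helper `build_step_input` below is hypothetical and not part of Agentend; it only assumes, as described above, that each dependency's output is added to the step's input dict under the key `<step name>_result`, alongside any static input.

```python
from typing import Any, Dict


def build_step_input(
    static_input: Dict[str, Any],
    dependency_outputs: Dict[str, Any],
) -> Dict[str, Any]:
    """Merge static input with upstream outputs keyed as '<step>_result'.

    Hypothetical helper for illustration; the real executor may differ.
    """
    merged = dict(static_input)  # copy so the caller's dict is not mutated
    for step_name, output in dependency_outputs.items():
        merged[f"{step_name}_result"] = output
    return merged


# What the verify step might receive after extract has finished
verify_input = build_step_input(
    {"currency": "USD"},
    {"extract": {"invoice_total": 120.5}},
)
```

Under this assumption, a worker for verify would read the upstream output with `input["extract_result"]`.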
Parallel execution
The DAGExecutor groups steps into parallel batches using topological ordering. Within each batch, all steps run concurrently via asyncio.gather(). Between batches, the executor checkpoints results before starting the next group.
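As a rough illustration of the batching idea, the sketch below groups step names into levels and runs each level with asyncio.gather(). It is not the DAGExecutor implementation: `group_into_batches` and `run_in_batches` are hypothetical names, workers are assumed to be zero-argument async callables, and checkpointing, retries, and input injection are left out.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Set


def group_into_batches(depends_on: Dict[str, List[str]]) -> List[List[str]]:
    """Group steps so that each batch depends only on earlier batches."""
    remaining = dict(depends_on)
    done: Set[str] = set()
    batches: List[List[str]] = []
    while remaining:
        # A step is ready once every dependency has already completed
        ready = [name for name, deps in remaining.items()
                 if all(dep in done for dep in deps)]
        if not ready:
            raise ValueError("cycle or unknown dependency among: "
                             + ", ".join(remaining))
        batches.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return batches


async def run_in_batches(
    depends_on: Dict[str, List[str]],
    workers: Dict[str, Callable[[], Awaitable[Any]]],
) -> Dict[str, Any]:
    """Run batches in order; steps inside a batch run concurrently."""
    results: Dict[str, Any] = {}
    for batch in group_into_batches(depends_on):
        outputs = await asyncio.gather(*(workers[name]() for name in batch))
        results.update(zip(batch, outputs))
    return results


# Dependency map for the invoice pipeline
INVOICE_DEPS = {
    "extract": [],
    "verify": ["extract"],
    "summarize": ["extract"],
    "notify": ["verify", "summarize"],
}
```

Because each batch waits for its slowest step, the wall-clock time of a batch is the maximum of its step durations rather than their sum.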
# Execution order for the invoice pipeline:
#
# Group 1: [extract] — runs alone (no dependencies)
# Group 2: [verify, summarize] — runs in parallel (both depend on extract)
# Group 3: [notify] — runs alone (depends on verify + summarize)
#
# Total time ≈ extract + max(verify, summarize) + notify
Checkpointing and resumption
After each parallel group completes, the executor saves a checkpoint with all step results. If a workflow fails partway through, it can be resumed from the last checkpoint:
# Resume a failed workflow from the "verify" step
results = await executor.execute(
    workflow,
    execution_id="invoice_pipeline:1711152000",
    resume_from="verify",
)
Retry logic
Each step can configure retry behavior with exponential backoff. The executor retries on transient errors and timeouts, up to the configured maximum:
Step(
    name="verify",
    worker=verify_fn,
    depends_on=["extract"],
    timeout_seconds=45,
    retry_config=RetryConfig(
        max_retries=3,
        backoff_factor=2.0,  # 2s, 4s, 8s
        backoff_max=30.0,
        retry_on=["transient_error", "timeout"],
    ),
)
Human-in-the-loop interrupts
Steps can be configured to pause execution and request human input before proceeding. The interrupt is surfaced to the frontend as an interrupt event. Three interrupt types are supported:
| Type | action_required | Use Case |
|---|---|---|
| Approval | approve | User must approve before a step executes (e.g., issuing a refund) |
| Selection | select | User must choose from a set of options |
| Input | input | User must provide free-form text input |
If the human rejects the step, it is marked as interrupted and does not execute. Downstream steps that depend on it, directly or through other steps, are skipped.
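To make the propagation concrete, here is a minimal sketch of how the set of skipped steps could be derived from the dependency declarations. The function `steps_to_skip` is hypothetical and only assumes that skipping is transitive: a step is skipped if any of its dependencies was rejected or was itself skipped.

```python
from typing import Dict, List, Set


def steps_to_skip(depends_on: Dict[str, List[str]], blocked: str) -> Set[str]:
    """Return every step that depends on `blocked`, directly or transitively."""
    skipped: Set[str] = set()
    changed = True
    while changed:  # repeat until no new step is added
        changed = False
        for name, deps in depends_on.items():
            if name == blocked or name in skipped:
                continue
            if any(dep == blocked or dep in skipped for dep in deps):
                skipped.add(name)
                changed = True
    return skipped


INVOICE_DEPS = {
    "extract": [],
    "verify": ["extract"],
    "summarize": ["extract"],
    "notify": ["verify", "summarize"],
}
```

In the invoice pipeline, rejecting verify would skip only notify, while summarize could still run because it depends on extract alone.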
Step results and status
Each step returns a StepResult with the execution status, output, duration, and retry count:
| Status | Meaning |
|---|---|
| success | Step completed successfully |
| failed | Step failed after exhausting all retries |
| interrupted | Step was rejected by human-in-the-loop |
| skipped | Step was skipped due to an upstream failure or interruption |
The overall workflow status is computed from step results: success if all steps succeeded, failed if any step failed, or partial_success if some steps were interrupted.
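The rule above can be written as a small function. This is a sketch, not Agentend's code: `workflow_status` is a hypothetical name, and the precedence when a workflow contains both failed and interrupted steps is an assumption (failed wins here), since the text does not specify it.

```python
from typing import Iterable


def workflow_status(step_statuses: Iterable[str]) -> str:
    """Derive the overall workflow status from individual step statuses."""
    statuses = list(step_statuses)
    # Assumption: a failure outranks an interruption
    if "failed" in statuses:
        return "failed"
    if "interrupted" in statuses:
        return "partial_success"
    if all(status == "success" for status in statuses):
        return "success"
    # For example, a skipped step with no failed or interrupted upstream step
    raise ValueError(f"unexpected combination of statuses: {statuses}")
```

Skipped steps do not influence the result on their own in this sketch, because a step is only skipped when an upstream step has already failed or been interrupted.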