Version: 2.21-unstable

Pipeline Loops

Learn how loops work in Haystack pipelines, how they terminate, and how to use them for feedback and self-correction.

Haystack pipelines support loops: cycles in the component graph where the output of a later component is fed back into an earlier one. This enables feedback flows such as self-correction, validation, or iterative refinement, as well as more advanced agentic behavior.

At runtime, the pipeline re-runs a component whenever all of its required inputs are ready again. You control when loops stop either by designing your graph and routing logic carefully or by using built-in safety limits.

Multiple Runs of the Same Component

If a component participates in a loop, it can be run multiple times within a single Pipeline.run() call. The pipeline keeps an internal visit counter for each component:

  • Each time the component runs, its visit count increases by 1.
  • You can use this visit count in debugging tools like breakpoints to inspect specific iterations of a loop.

In the final pipeline result:

  • For each component that ran, the pipeline returns only the last-produced output.
  • To capture outputs from intermediate components (for example, a validator or a router) in the final result dictionary, use the include_outputs_from argument of Pipeline.run().
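
The bookkeeping described above can be pictured with a small, library-free sketch. It is a toy model written for illustration, not Haystack's implementation: `run_toy_loop` and the `incrementer` component name are hypothetical. It shows a per-component visit counter growing with each run while the result keeps only the most recent output.

```python
from collections import Counter


def run_toy_loop(iterations: int) -> tuple[dict, Counter]:
    """Toy stand-in for one pipeline run in which a single component loops."""
    visits: Counter = Counter()  # per-component visit counter
    outputs: dict = {}           # latest output per component
    value = 0
    for _ in range(iterations):
        visits["incrementer"] += 1                 # each run adds 1 to the visit count
        value += 1
        outputs["incrementer"] = {"value": value}  # overwrites the earlier output
    return outputs, visits


outputs, visits = run_toy_loop(3)
print(visits["incrementer"])   # 3
print(outputs["incrementer"])  # {'value': 3}
```

The outputs of the first two iterations are gone from the result, which is why a real pipeline needs tracing or breakpoints when you want to inspect earlier iterations.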

Loop Termination and Safety Limits

Loops must eventually stop so that a pipeline run can complete. There are two main ways a loop ends:

  1. Natural completion: No more components are runnable
    The pipeline finishes when the work queue is empty and no component can run again (for example, the router stops feeding inputs back into the loop).

  2. Reaching the maximum run count
    Every pipeline has a per-component run limit, controlled by the max_runs_per_component parameter of the Pipeline (or AsyncPipeline) constructor, which is 100 by default. If any component exceeds this limit, Haystack raises a PipelineMaxComponentRuns error.

    You can set this limit to a lower value:

    python
    from haystack import Pipeline

    pipe = Pipeline(max_runs_per_component=5)

    The limit is checked before each execution, so a component with a limit of 3 will complete 3 runs successfully before the error is raised on the 4th attempt.

    This safeguard is especially important when experimenting with new loops or complex routing logic. If your loop condition is wrong or never satisfied, the error prevents the pipeline from running indefinitely.
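
To make the "checked before each execution" rule concrete, here is a minimal sketch in plain Python. It assumes nothing about Haystack internals: `MaxRunsExceeded` is a hypothetical stand-in for PipelineMaxComponentRuns, and `run_component_in_loop` models a loop whose exit condition is never met.

```python
class MaxRunsExceeded(RuntimeError):
    """Hypothetical stand-in for PipelineMaxComponentRuns."""


def run_component_in_loop(max_runs: int, completed: list[int]) -> None:
    visits = 0
    while True:  # models a loop whose exit condition is never satisfied
        if visits >= max_runs:  # the limit is checked before the component executes
            raise MaxRunsExceeded(f"run limit of {max_runs} reached")
        visits += 1
        completed.append(visits)  # record each run that finished


completed: list[int] = []
try:
    run_component_in_loop(3, completed)
except MaxRunsExceeded as err:
    print(f"stopped: {err}")  # stopped: run limit of 3 reached

print(completed)  # [1, 2, 3]
```

With a limit of 3, three runs finish and the fourth attempt raises, matching the behavior described above.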

Example: Feedback Loop for Self-Correction

The following example shows a simple feedback loop where:

  • A ChatPromptBuilder creates a prompt that includes previous incorrect replies.
  • An OpenAIChatGenerator produces an answer.
  • A ConditionalRouter checks if the answer is correct:
    • If correct, it sends the answer to final_answer and the loop ends.
    • If incorrect, it sends the answer back to the ChatPromptBuilder, which triggers another iteration.

python
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.routers import ConditionalRouter
from haystack.dataclasses import ChatMessage

template = [
    ChatMessage.from_system("Answer the following question concisely with just the answer, no punctuation."),
    ChatMessage.from_user(
        "{% if previous_replies %}"
        "Previously you replied incorrectly: {{ previous_replies[0].text }}\n"
        "{% endif %}"
        "Question: {{ query }}"
    ),
]

prompt_builder = ChatPromptBuilder(template=template, required_variables=["query"])
generator = OpenAIChatGenerator()

router = ConditionalRouter(
    routes=[
        {
            # End the loop when the answer is correct
            "condition": "{{ 'Rome' in replies[0].text }}",
            "output": "{{ replies }}",
            "output_name": "final_answer",
            "output_type": list[ChatMessage],
        },
        {
            # Loop back when the answer is incorrect
            "condition": "{{ 'Rome' not in replies[0].text }}",
            "output": "{{ replies }}",
            "output_name": "previous_replies",
            "output_type": list[ChatMessage],
        },
    ],
    unsafe=True,  # Required to handle ChatMessage objects
)

pipe = Pipeline(max_runs_per_component=3)

pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("generator", generator)
pipe.add_component("router", router)

pipe.connect("prompt_builder.prompt", "generator.messages")
pipe.connect("generator.replies", "router.replies")
pipe.connect("router.previous_replies", "prompt_builder.previous_replies")

result = pipe.run(
    {
        "prompt_builder": {
            "query": "What is the capital of Italy? If the statement 'Previously you replied incorrectly:' is missing "
            "above then answer with Milan.",
        }
    },
    include_outputs_from={"router", "prompt_builder"},
)

print(result["prompt_builder"]["prompt"][1].text)  # Shows the last prompt used
print(result["router"]["final_answer"][0].text)  # Rome

What Happens During This Loop

  1. First iteration

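    • prompt_builder runs with the query and no previous replies. The query in this example tells the model to answer "Milan" when the "Previously you replied incorrectly:" line is missing, so the first answer is deliberately wrong.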
    • generator returns a ChatMessage with the LLM's answer.
    • The router evaluates its conditions and checks if "Rome" is in the reply.
    • If the answer is incorrect, previous_replies is fed back into prompt_builder.previous_replies.
  2. Subsequent iterations (if needed)

    • prompt_builder runs again, now including the previous incorrect reply in the user message.
    • generator produces a new answer with the additional context.
    • The router checks again whether the answer contains "Rome".
  3. Termination

    • When the router routes to final_answer, no more inputs are fed back into the loop.
    • The queue empties and the pipeline run finishes successfully.

Because we used max_runs_per_component=3, any unexpected behavior that causes the loop to continue would raise a PipelineMaxComponentRuns error instead of looping forever.
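
The same control flow can be traced without an API key using a plain-Python simulation. This is only a sketch of the iteration logic: `fake_generator` is a hypothetical stub that replaces the LLM (wrong on the first try, correct once it sees a previous reply), and `run_feedback_loop` stands in for the prompt builder, generator, and router together.

```python
from typing import Optional


def fake_generator(previous_reply: Optional[str]) -> str:
    """Hypothetical LLM stub: wrong on the first try, correct once it sees feedback."""
    return "Milan" if previous_reply is None else "Rome"


def run_feedback_loop(max_runs: int = 3) -> tuple[str, int]:
    """Return the accepted answer and the number of iterations it took."""
    previous_reply: Optional[str] = None
    for iteration in range(1, max_runs + 1):
        reply = fake_generator(previous_reply)  # prompt_builder + generator
        if "Rome" in reply:
            return reply, iteration             # router -> final_answer, loop ends
        previous_reply = reply                  # router -> previous_replies, loop continues
    raise RuntimeError(f"no correct answer after {max_runs} runs")


answer, iterations = run_feedback_loop()
print(answer, iterations)  # Rome 2
```

The first iteration yields "Milan" and loops back; the second yields "Rome" and exits, so the loop finishes well inside the limit of 3.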

Components for Building Loops

Two components are particularly useful for building loops:

  • ConditionalRouter: Routes data to different outputs based on conditions. Use it to decide whether to exit the loop or continue iterating. The example above uses this pattern.

  • BranchJoiner: Merges inputs from multiple sources into a single output. Use it when a component inside the loop needs to receive both the initial input (on the first iteration) and looped-back values (on subsequent iterations). For example, you might use BranchJoiner to feed both user input and validation errors into the same Generator. See the BranchJoiner documentation for a complete loop example.

Greedy vs. Lazy Variadic Sockets in Loops

Some components support variadic inputs that can receive multiple values on a single socket. In loops, variadic behavior controls how inputs are consumed across iterations.

  • Greedy variadic sockets
    Consume exactly one value at a time and remove it after the component runs. This includes user-provided inputs, which prevents them from retriggering the component indefinitely. Most variadic sockets are greedy by default.

  • Lazy variadic sockets
    Accumulate all values received from predecessors across iterations. Useful when you need to collect multiple partial results over time (for example, gathering outputs from several loop iterations before proceeding).

For most loop scenarios it's sufficient to just connect components as usual and use max_runs_per_component to protect against mistakes.
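
The difference between the two consumption styles can be sketched with two plain buffers. This is a toy model of the description above, not Haystack's socket implementation: `consume_greedy` and `consume_lazy` are hypothetical helpers, and the sketch deliberately leaves out when a lazy socket is cleared.

```python
from collections import deque


def consume_greedy(buffer: deque) -> list:
    """Take exactly one waiting value and remove it from the buffer."""
    return [buffer.popleft()] if buffer else []


def consume_lazy(buffer: deque) -> list:
    """Hand over every value that has accumulated so far."""
    return list(buffer)


greedy_buffer = deque(["draft 1", "draft 2", "draft 3"])
lazy_buffer = deque(["draft 1", "draft 2", "draft 3"])

first_greedy = consume_greedy(greedy_buffer)
all_lazy = consume_lazy(lazy_buffer)

print(first_greedy)         # ['draft 1']
print(list(greedy_buffer))  # ['draft 2', 'draft 3']
print(all_lazy)             # ['draft 1', 'draft 2', 'draft 3']
```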

Troubleshooting Loops

If your pipeline seems stuck or runs longer than expected, here are common causes and how to debug them.

Common Causes of Infinite Loops

  1. Condition never satisfied: Your exit condition (for example, "Rome" in reply) might never be true due to LLM behavior or data issues. Always set a reasonable max_runs_per_component as a safety net.

  2. Relying on optional outputs: When a component has multiple output sockets but only returns some of them, the unreturned outputs don't trigger their downstream connections. This can cause confusion in loops.

    For example, this pattern can be problematic:

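    python
    from typing import Optional

    from haystack import component

    def is_valid(text: str) -> bool:
        # Hypothetical placeholder check; replace with your own validation logic
        return bool(text.strip())

    @component
    class Validator:
        @component.output_types(valid=str, invalid=Optional[str])
        def run(self, text: str):
            if is_valid(text):
                return {"valid": text}  # "invalid" is never returned
            else:
                return {"invalid": text}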

    If you connect invalid back to an upstream component for retry, but also have other connections that keep the loop alive, you might get unexpected behavior.

    Instead, use a ConditionalRouter with explicit, mutually exclusive conditions:

    python
    router = ConditionalRouter(
        routes=[
            {"condition": "{{ is_valid }}", "output": "{{ text }}", "output_name": "valid", ...},
            {"condition": "{{ not is_valid }}", "output": "{{ text }}", "output_name": "invalid", ...},
        ]
    )

  3. User inputs retriggering the loop: If a user-provided input is connected to a socket inside the loop, it might cause the loop to restart unexpectedly.

    python
    # Problematic: user input goes directly to a component inside the loop
    result = pipe.run({
        "generator": {"prompt": query},  # This input persists and may retrigger the loop
    })

    # Better: use an entry-point component outside the loop
    result = pipe.run({
        "prompt_builder": {"query": query},  # Entry point feeds into the loop once
    })

    See Greedy vs. Lazy Variadic Sockets for details on how inputs are consumed.

  4. Multiple paths feeding the same component: If a component inside the loop receives inputs from multiple sources, it runs whenever any path provides input.

    python
    # Component receives from two sources – runs when either provides input
    pipe.connect("source_a.output", "processor.input")
    pipe.connect("source_b.output", "processor.input") # Variadic input

    Ensure you understand when each path produces output, or use BranchJoiner to explicitly control the merge point.
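
One way to sanity-check the "mutually exclusive conditions" advice from item 2 before wiring a router is to run the conditions over a few sample inputs and confirm that exactly one matches each time. The sketch below uses plain Python predicates rather than Jinja conditions; the `routes` mapping and `matching_routes` helper are hypothetical and exist only for this illustration.

```python
from typing import Callable

# Hypothetical route predicates, written as plain functions instead of Jinja conditions
routes: dict[str, Callable[[str], bool]] = {
    "valid": lambda text: bool(text.strip()),
    "invalid": lambda text: not text.strip(),
}


def matching_routes(text: str) -> list[str]:
    """Return the names of all routes whose condition holds for this input."""
    return [name for name, condition in routes.items() if condition(text)]


samples = ["Rome", "", "   "]
matches = {sample: matching_routes(sample) for sample in samples}
print(matches)  # {'Rome': ['valid'], '': ['invalid'], '   ': ['invalid']}
```

If any sample matched zero routes or more than one, the loop could stall or take an unintended path, so a check like this is cheap insurance while designing the routing logic.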

Debugging Tips

  1. Start with a low limit: When developing loops, set max_runs_per_component=3 or similar. This helps you catch issues early with a clear error instead of waiting for a timeout.

  2. Use include_outputs_from: Add intermediate components (like your router) to see what's happening at each step:

    python
    result = pipe.run(data, include_outputs_from={"router", "validator"})

  3. Enable tracing: Use tracing to see every component execution, including inputs and outputs. This makes it easy to follow each iteration of the loop. For quick debugging, use LoggingTracer (setup instructions). For deeper analysis, integrate with tools like Langfuse or other tracing backends.

  4. Visualize the pipeline: Use pipe.draw() or pipe.show() to see the graph structure and verify your connections are correct. See the Pipeline Visualization documentation for details.

  5. Use breakpoints: Set a Breakpoint on a specific component and visit count to inspect the state at that iteration. See Pipeline Breakpoints for details.

  6. Check for blocked pipelines: If you see a PipelineComponentsBlockedError, it means no components can run. This typically indicates a missing connection or a circular dependency. Check that all required inputs are provided.

By combining careful graph design, per-component run limits, and these debugging tools, you can build robust feedback loops in your Haystack pipelines.