Pipeline Loops
Learn how loops work in Haystack pipelines, how they terminate, and how to use them for feedback and self-correction.
Haystack pipelines support loops: cycles in the component graph where the output of a later component is fed back into an earlier one. This enables feedback flows such as self-correction, validation, or iterative refinement, as well as more advanced agentic behavior.
At runtime, the pipeline re-runs a component whenever all of its required inputs are ready again. You control when loops stop either by designing your graph and routing logic carefully or by using built-in safety limits.
Multiple Runs of the Same Component
If a component participates in a loop, it can be run multiple times within a single Pipeline.run() call.
The pipeline keeps an internal visit counter for each component:
- Each time the component runs, its visit count increases by 1.
- You can use this visit count in debugging tools like breakpoints to inspect specific iterations of a loop.
In the final pipeline result:
- For each component that ran, the pipeline returns only the last-produced output.
- To capture outputs from intermediate components (for example, a validator or a router) in the final result dictionary, use the
include_outputs_fromargument ofPipeline.run().
Loop Termination and Safety Limits
Loops must eventually stop so that a pipeline run can complete. There are two main ways a loop ends:
-
Natural completion: No more components are runnable
The pipeline finishes when the work queue is empty and no component can run again (for example, the router stops feeding inputs back into the loop). -
Reaching the maximum run count
Every pipeline has a per-component run limit, controlled by themax_runs_per_componentparameter of thePipeline(orAsyncPipeline) constructor, which is100by default. If any component exceeds this limit, Haystack raises aPipelineMaxComponentRunserror.You can set this limit to a lower value:
pythonfrom haystack import Pipeline
pipe = Pipeline(max_runs_per_component=5)The limit is checked before each execution, so a component with a limit of 3 will complete 3 runs successfully before the error is raised on the 4th attempt.
This safeguard is especially important when experimenting with new loops or complex routing logic. If your loop condition is wrong or never satisfied, the error prevents the pipeline from running indefinitely.
Example: Feedback Loop for Self-Correction
The following example shows a simple feedback loop where:
- A
ChatPromptBuildercreates a prompt that includes previous incorrect replies. - An
OpenAIChatGeneratorproduces an answer. - A
ConditionalRouterchecks if the answer is correct:- If correct, it sends the answer to
final_answerand the loop ends. - If incorrect, it sends the answer back to the
ChatPromptBuilder, which triggers another iteration.
- If correct, it sends the answer to
from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.routers import ConditionalRouter
from haystack.dataclasses import ChatMessage
template = [
ChatMessage.from_system("Answer the following question concisely with just the answer, no punctuation."),
ChatMessage.from_user(
"{% if previous_replies %}"
"Previously you replied incorrectly: {{ previous_replies[0].text }}\n"
"{% endif %}"
"Question: {{ query }}"
),
]
prompt_builder = ChatPromptBuilder(template=template, required_variables=["query"])
generator = OpenAIChatGenerator()
router = ConditionalRouter(
routes=[
{
# End the loop when the answer is correct
"condition": "{{ 'Rome' in replies[0].text }}",
"output": "{{ replies }}",
"output_name": "final_answer",
"output_type": list[ChatMessage],
},
{
# Loop back when the answer is incorrect
"condition": "{{ 'Rome' not in replies[0].text }}",
"output": "{{ replies }}",
"output_name": "previous_replies",
"output_type": list[ChatMessage],
},
],
unsafe=True, # Required to handle ChatMessage objects
)
pipe = Pipeline(max_runs_per_component=3)
pipe.add_component("prompt_builder", prompt_builder)
pipe.add_component("generator", generator)
pipe.add_component("router", router)
pipe.connect("prompt_builder.prompt", "generator.messages")
pipe.connect("generator.replies", "router.replies")
pipe.connect("router.previous_replies", "prompt_builder.previous_replies")
result = pipe.run(
{
"prompt_builder": {
"query": "What is the capital of Italy? If the statement 'Previously you replied incorrectly:' is missing "
"above then answer with Milan.",
}
},
include_outputs_from={"router", "prompt_builder"},
)
print(result["prompt_builder"]["prompt"][1].text) # Shows the last prompt used
print(result["router"]["final_answer"][0].text) # Rome
What Happens During This Loop
-
First iteration
prompt_builderruns withquery="What is the capital of Italy?"and no previous replies.generatorreturns aChatMessagewith the LLM's answer.- The router evaluates its conditions and checks if
"Rome"is in the reply. - If the answer is incorrect,
previous_repliesis fed back intoprompt_builder.previous_replies.
-
Subsequent iterations (if needed)
prompt_builderruns again, now including the previous incorrect reply in the user message.generatorproduces a new answer with the additional context.- The router checks again whether the answer contains
"Rome".
-
Termination
- When the router routes to
final_answer, no more inputs are fed back into the loop. - The queue empties and the pipeline run finishes successfully.
- When the router routes to
Because we used max_runs_per_component=3, any unexpected behavior that causes the loop to continue would raise a PipelineMaxComponentRuns error instead of looping forever.
Components for Building Loops
Two components are particularly useful for building loops:
-
ConditionalRouter: Routes data to different outputs based on conditions. Use it to decide whether to exit the loop or continue iterating. The example above uses this pattern. -
BranchJoiner: Merges inputs from multiple sources into a single output. Use it when a component inside the loop needs to receive both the initial input (on the first iteration) and looped-back values (on subsequent iterations). For example, you might useBranchJoinerto feed both user input and validation errors into the same Generator. See the BranchJoiner documentation for a complete loop example.
Greedy vs. Lazy Variadic Sockets in Loops
Some components support variadic inputs that can receive multiple values on a single socket. In loops, variadic behavior controls how inputs are consumed across iterations.
-
Greedy variadic sockets
Consume exactly one value at a time and remove it after the component runs. This includes user-provided inputs, which prevents them from retriggering the component indefinitely. Most variadic sockets are greedy by default. -
Lazy variadic sockets
Accumulate all values received from predecessors across iterations. Useful when you need to collect multiple partial results over time (for example, gathering outputs from several loop iterations before proceeding).
For most loop scenarios it's sufficient to just connect components as usual and use max_runs_per_component to protect against mistakes.
Troubleshooting Loops
If your pipeline seems stuck or runs longer than expected, here are common causes and how to debug them.
Common Causes of Infinite Loops
-
Condition never satisfied: Your exit condition (for example,
"Rome" in reply) might never be true due to LLM behavior or data issues. Always set a reasonablemax_runs_per_componentas a safety net. -
Relying on optional outputs: When a component has multiple output sockets but only returns some of them, the unreturned outputs don't trigger their downstream connections. This can cause confusion in loops.
For example, this pattern can be problematic:
python@component
class Validator:
@component.output_types(valid=str, invalid=Optional[str])
def run(self, text: str):
if is_valid(text):
return {"valid": text} # "invalid" is never returned
else:
return {"invalid": text}If you connect
invalidback to an upstream component for retry, but also have other connections that keep the loop alive, you might get unexpected behavior.Instead, use a
ConditionalRouterwith explicit, mutually exclusive conditions:pythonrouter = ConditionalRouter(
routes=[
{"condition": "{{ is_valid }}", "output": "{{ text }}", "output_name": "valid", ...},
{"condition": "{{ not is_valid }}", "output": "{{ text }}", "output_name": "invalid", ...},
]
) -
User inputs retriggering the loop: If a user-provided input is connected to a socket inside the loop, it might cause the loop to restart unexpectedly.
python# Problematic: user input goes directly to a component inside the loop
result = pipe.run({
"generator": {"prompt": query}, # This input persists and may retrigger the loop
})
# Better: use an entry-point component outside the loop
result = pipe.run({
"prompt_builder": {"query": query}, # Entry point feeds into the loop once
})See Greedy vs. Lazy Variadic Sockets for details on how inputs are consumed.
-
Multiple paths feeding the same component: If a component inside the loop receives inputs from multiple sources, it runs whenever any path provides input.
python# Component receives from two sources – runs when either provides input
pipe.connect("source_a.output", "processor.input")
pipe.connect("source_b.output", "processor.input") # Variadic inputEnsure you understand when each path produces output, or use
BranchJoinerto explicitly control the merge point.
Debugging Tips
-
Start with a low limit: When developing loops, set
max_runs_per_component=3or similar. This helps you catch issues early with a clear error instead of waiting for a timeout. -
Use
include_outputs_from: Add intermediate components (like your router) to see what's happening at each step: -
Enable tracing: Use tracing to see every component execution, including inputs and outputs. This makes it easy to follow each iteration of the loop. For quick debugging, use
LoggingTracer(setup instructions). For deeper analysis, integrate with tools like Langfuse or other tracing backends. -
Visualize the pipeline: Use
pipe.draw()orpipe.show()to see the graph structure and verify your connections are correct. See the Pipeline Visualization documentation for details. -
Use breakpoints: Set a
Breakpointon a specific component and visit count to inspect the state at that iteration. See Pipeline Breakpoints for details. -
Check for blocked pipelines: If you see a
PipelineComponentsBlockedError, it means no components can run. This typically indicates a missing connection or a circular dependency. Check that all required inputs are provided.
By combining careful graph design, per-component run limits, and these debugging tools, you can build robust feedback loops in your Haystack pipelines.