API Reference

Pipeline

Haystack Pipeline API with experimental features.

Module haystack_experimental.core.pipeline.pipeline

Pipeline

Synchronous version of the orchestration engine.

Orchestrates component execution according to the execution graph, one after the other.

Pipeline.run

def run(data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        resume_state: Optional[Dict[str, Any]] = None,
        debug_path: Optional[Union[str, Path]] = None) -> Dict[str, Any]

Runs the Pipeline with given input data.

Usage:

from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))

rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print(results["llm"]["replies"])
# ['Jean lives in Paris']

Arguments:

  • data: A dictionary of inputs for the pipeline's components. Each key is a component name and its value is a dictionary of that component's input parameters:
data = {
    "comp1": {"input1": 1, "input2": 2},
}

For convenience, this format is also supported when input names are unique:

data = {
    "input1": 1, "input2": 2,
}
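The convenience format works because each input name can be mapped back to exactly one component socket. A minimal, dependency-free sketch of how such a flat dictionary could be resolved to per-component inputs (an illustration of the idea only, not Haystack's actual implementation; all names here are hypothetical):

```python
# Illustrative sketch: resolve a flat input dict to per-component inputs,
# assuming each input name is unique across all components.
# This is NOT Haystack internals, just a conceptual model.

def resolve_flat_inputs(flat_data, component_inputs):
    """component_inputs maps component name -> set of its input socket names."""
    resolved = {}
    for input_name, value in flat_data.items():
        # Find every component exposing a socket with this name
        owners = [comp for comp, sockets in component_inputs.items()
                  if input_name in sockets]
        if len(owners) != 1:
            raise ValueError(f"Input '{input_name}' is not unique: {owners}")
        resolved.setdefault(owners[0], {})[input_name] = value
    return resolved

sockets = {"retriever": {"query"}, "prompt_builder": {"question", "documents"}}
question = "Who lives in Paris?"
print(resolve_flat_inputs({"query": question, "question": question}, sockets))
```

If two components exposed an input with the same name, the flat format would be ambiguous, which is why the nested per-component format remains the general-purpose one.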
  • include_outputs_from: Set of component names whose individual outputs are to be included in the pipeline's output. For components that are invoked multiple times (in a loop), only the last-produced output is included.
  • break_point: A breakpoint that can be used to debug the pipeline execution, either a Breakpoint for a regular component or an AgentBreakpoint for an Agent component.
  • resume_state: A dictionary containing the state of a previously saved pipeline execution.
  • debug_path: Path to the directory where the pipeline state should be saved.
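The three debugging arguments work together: when a breakpoint fires, the pipeline's intermediate state can be written under debug_path and later passed back as resume_state to continue from where execution stopped. A rough, framework-free sketch of that save-and-resume pattern (the state layout and function names here are invented for illustration and do not match Haystack's actual serialization format):

```python
import json
import tempfile
from pathlib import Path

# Conceptual sketch of breakpoint/resume semantics, NOT Haystack's real
# state format. Components are plain functions run in order; hitting the
# "breakpoint" saves intermediate state to debug_path, and a later run
# resumes from that saved state instead of starting over.

def run_pipeline(steps, data, break_at=None, resume_state=None, debug_path=None):
    state = dict(resume_state or {"next_step": 0, "value": data})
    for i in range(state["next_step"], len(steps)):
        name, fn = steps[i]
        if name == break_at:
            # Pause before running this component and persist the state
            state["next_step"] = i
            if debug_path:
                Path(debug_path, "state.json").write_text(json.dumps(state))
            return {"paused_at": name, "state": state}
        state["value"] = fn(state["value"])
    return {"result": state["value"]}

steps = [("double", lambda x: x * 2), ("inc", lambda x: x + 1)]
with tempfile.TemporaryDirectory() as d:
    paused = run_pipeline(steps, 5, break_at="inc", debug_path=d)
    saved = json.loads(Path(d, "state.json").read_text())
    resumed = run_pipeline(steps, None, resume_state=saved)
print(paused["paused_at"], resumed)  # inc {'result': 11}
```

The second call skips the already-executed "double" step entirely, which is the point of resuming: expensive upstream components (retrieval, LLM calls) are not re-run.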

Raises:

  • ValueError: If invalid inputs are provided to the pipeline.
  • PipelineRuntimeError: If the Pipeline contains cycles with unsupported connections that would cause it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
  • PipelineMaxComponentRuns: If a Component reaches the maximum number of times it can be run in this Pipeline.
  • PipelineBreakpointException: When a break_point is triggered. Contains the component name, the pipeline state, and partial results.

Returns:

A dictionary where each entry corresponds to a component name and its output. If include_outputs_from is None, this dictionary will only contain the outputs of leaf components, i.e., components without outgoing connections.
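A leaf component is simply a node in the execution graph with no outgoing connections, which can be determined from the connection list alone. A small standalone sketch of that check (function name hypothetical, not part of the Pipeline API):

```python
# Find leaf components (no outgoing connections) of a pipeline graph,
# given its connections as (sender, receiver) pairs. Standalone sketch,
# not Haystack internals.

def leaf_components(components, connections):
    senders = {sender for sender, _receiver in connections}
    return sorted(set(components) - senders)

components = ["retriever", "prompt_builder", "llm"]
connections = [("retriever", "prompt_builder"), ("prompt_builder", "llm")]
print(leaf_components(components, connections))  # ['llm']
```

In the RAG example above, only "llm" has no outgoing connection, so by default the run result contains only the llm entry; passing include_outputs_from={"retriever"} would additionally surface the retrieved documents.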

Pipeline.inject_resume_state_into_graph

def inject_resume_state_into_graph(resume_state)

Injects the state of a previously saved pipeline execution (resume_state) into the pipeline graph.