API Reference

Pipeline

Haystack Pipeline API with experimental features.

Module haystack_experimental.core.pipeline.pipeline

Pipeline

Synchronous version of the orchestration engine.

Orchestrates component execution according to the execution graph, one after the other.

Pipeline.run

def run(data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        breakpoints: Optional[Set[Tuple[str, Optional[int]]]] = None,
        resume_state: Optional[Dict[str, Any]] = None,
        debug_path: Optional[Union[str, Path]] = None) -> Dict[str, Any]

Runs the Pipeline with the given input data.

Usage:

from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))

rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print(results["llm"]["replies"])
# Jean lives in Paris

Arguments:

  • data: A dictionary of inputs for the pipeline's components. Each key is a component name and its value is a dictionary of that component's input parameters:
data = {
    "comp1": {"input1": 1, "input2": 2},
}

For convenience, this format is also supported when input names are unique:

data = {
    "input1": 1, "input2": 2,
}
  • include_outputs_from: Set of component names whose individual outputs are to be included in the pipeline's output. For components that are invoked multiple times (in a loop), only the last-produced output is included.
  • breakpoints: Set of tuples of component names and visit counts at which the pipeline should break execution. If the visit count is not given, it is assumed to be 0, i.e., the pipeline breaks on the first visit to that component.
  • resume_state: A dictionary containing the state of a previously saved pipeline execution.
  • debug_path: Path to the directory where the pipeline state should be saved.
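The breakpoint semantics above (break at a given component once its visit count reaches the given value, defaulting to 0) can be sketched with a small visit-counting loop. This is an illustrative simulation only, not the Pipeline's internal scheduler; `BreakpointHit` and `run_schedule` are hypothetical names standing in for `PipelineBreakpointException` and the real execution engine:

```python
class BreakpointHit(Exception):
    """Raised when a (component, visit) breakpoint is reached.

    Stand-in for PipelineBreakpointException, which additionally carries
    the pipeline state and partial results.
    """

    def __init__(self, component, visit):
        super().__init__(f"breakpoint at {component!r}, visit {visit}")
        self.component = component
        self.visit = visit


def run_schedule(schedule, breakpoints):
    """Execute a linear schedule of component names, counting visits.

    breakpoints is a set of (name, visit_count) tuples; a count of None
    is treated as 0, i.e. break on the first visit to that component.
    """
    normalized = {(name, count if count is not None else 0)
                  for name, count in breakpoints}
    visits = {}
    for name in schedule:
        visit = visits.get(name, 0)  # how many times this component already ran
        if (name, visit) in normalized:
            raise BreakpointHit(name, visit)
        visits[name] = visit + 1
    return visits
```

For example, `run_schedule(["retriever", "prompt_builder", "llm"], {("llm", None)})` raises before the `llm` component runs for the first time, mirroring a `breakpoints={("llm", None)}` argument to `Pipeline.run`.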

Raises:

  • ValueError: If invalid inputs are provided to the pipeline.
  • PipelineRuntimeError: If the Pipeline contains cycles with unsupported connections that would cause it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
  • PipelineMaxComponentRuns: If a Component reaches the maximum number of times it can be run in this Pipeline.
  • PipelineBreakpointException: When a breakpoint is triggered. Contains the component name, state, and partial results.

Returns:

A dictionary where each entry corresponds to a component name and its output. If include_outputs_from is None, this dictionary will only contain the outputs of leaf components, i.e., components without outgoing connections.

Pipeline.inject_resume_state_into_graph

def inject_resume_state_into_graph() -> Tuple[Dict[str, int], Dict[str, Any]]

Loads the resume state from a file and injects it into the pipeline graph.

Pipeline.transform_json_structure

@staticmethod
def transform_json_structure(
        data: Union[Dict[str, Any], List[Any], Any]) -> Any

Transforms a JSON structure by removing the 'sender' key and moving the 'value' to the top level.

For example: "key": [{"sender": null, "value": "some value"}] -> "key": "some value"
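The transformation described above can be sketched as a small recursive function that collapses lists of `{"sender": ..., "value": ...}` dicts to the bare value. This is an illustrative re-implementation based on the documented example, not the module's actual code:

```python
def transform_json_structure(data):
    """Collapse [{"sender": ..., "value": v}] wrappers to v, recursively."""
    if isinstance(data, list):
        # A list whose items all carry a "value" key collapses to the
        # first item's value (recursively transformed).
        if data and all(isinstance(i, dict) and "value" in i for i in data):
            return transform_json_structure(data[0]["value"])
        return [transform_json_structure(i) for i in data]
    if isinstance(data, dict):
        return {k: transform_json_structure(v) for k, v in data.items()}
    return data
```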

Pipeline.save_state

def save_state(
        inputs: Dict[str, Any],
        component_name: str,
        component_visits: Dict[str, int],
        callback_fun: Optional[Callable[..., Any]] = None) -> Dict[str, Any]

Saves the state of the pipeline at a given component visit count.

Returns:

The saved state dictionary.

Pipeline.load_state

@staticmethod
def load_state(file_path: Union[str, Path]) -> Dict[str, Any]

Load a saved pipeline state.

Arguments:

  • file_path: Path to the state file.

Returns:

Dict containing the loaded state.
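The save/load pair above suggests a simple round trip through a state file. The sketch below assumes the state is serialized as JSON and uses a hypothetical `<component>_<visit>.json` filename scheme; the actual on-disk format and naming are defined by the experimental module:

```python
import json
from pathlib import Path


def save_state_to_file(state, directory, component_name, visit):
    """Write a pipeline-state dict to <directory>/<component>_<visit>.json.

    Illustrative counterpart to Pipeline.save_state; the filename scheme
    is an assumption for this sketch.
    """
    path = Path(directory) / f"{component_name}_{visit}.json"
    path.write_text(json.dumps(state))
    return path


def load_state_from_file(file_path):
    """Illustrative counterpart to Pipeline.load_state: read the dict back."""
    return json.loads(Path(file_path).read_text())
```

A round trip then looks like saving at a breakpoint and later passing the loaded dict back to `Pipeline.run` as `resume_state`.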