Haystack Pipeline API with experimental features.
Module haystack_experimental.core.pipeline.pipeline
Pipeline
Synchronous version of the orchestration engine.
Orchestrates component execution according to the execution graph, one after the other.
Pipeline.run
def run(data: Dict[str, Any],
        include_outputs_from: Optional[Set[str]] = None,
        break_point: Optional[Union[Breakpoint, AgentBreakpoint]] = None,
        resume_state: Optional[Dict[str, Any]] = None,
        debug_path: Optional[Union[str, Path]] = None) -> Dict[str, Any]
Runs the Pipeline with given input data.
Usage:
from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder

# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="My name is Jean and I live in Paris."),
    Document(content="My name is Mark and I live in Berlin."),
    Document(content="My name is Giorgio and I live in Rome.")
])

prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
    {{ doc.content }}
{% endfor %}
Question: {{ question }}
Answer:
"""

retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
llm = OpenAIGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY"))

rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
    {
        "retriever": {"query": question},
        "prompt_builder": {"question": question},
    }
)

print(results["llm"]["replies"])
# Jean lives in Paris
Arguments:
data
: A dictionary of inputs for the pipeline's components. Each key is a component name and its value is a dictionary of that component's input parameters:
data = {
    "comp1": {"input1": 1, "input2": 2},
}
For convenience, this format is also supported when input names are unique:
data = {
    "input1": 1, "input2": 2,
}
include_outputs_from
: Set of component names whose individual outputs are to be included in the pipeline's output. For components that are invoked multiple times (in a loop), only the last-produced output is included.
break_point
: A set of breakpoints that can be used to debug the pipeline execution.
resume_state
: A dictionary containing the state of a previously saved pipeline execution.
debug_path
: Path to the directory where the pipeline state should be saved.
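The convenience format only works when each input name belongs to exactly one component. The routing it implies can be sketched as follows (an illustrative helper, not Haystack's actual implementation; `route_flat_inputs` and `component_inputs` are hypothetical names):

```python
from typing import Any, Dict, Set

def route_flat_inputs(flat: Dict[str, Any],
                      component_inputs: Dict[str, Set[str]]) -> Dict[str, Dict[str, Any]]:
    """Map a flat {input_name: value} dict onto {component: {input_name: value}}.

    Raises ValueError when an input name is ambiguous or unknown,
    mirroring the validation Pipeline.run performs on its `data` argument.
    """
    routed: Dict[str, Dict[str, Any]] = {}
    for name, value in flat.items():
        owners = [comp for comp, inputs in component_inputs.items() if name in inputs]
        if len(owners) != 1:
            raise ValueError(f"Input '{name}' is ambiguous or unknown: {owners}")
        routed.setdefault(owners[0], {})[name] = value
    return routed

# For the RAG example: 'query' belongs to the retriever, 'question' to the prompt builder
components = {"retriever": {"query"}, "prompt_builder": {"question", "documents"}}
question = "Who lives in Paris?"
routed = route_flat_inputs({"query": question, "question": question}, components)
```

When the same input name exists on two components (e.g. two retrievers both accepting `query`), the flat format is rejected and the nested per-component format must be used instead.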
Raises:
ValueError
: If invalid inputs are provided to the pipeline.
PipelineRuntimeError
: If the Pipeline contains cycles with unsupported connections that would cause it to get stuck and fail running, or if a Component fails or returns output of an unsupported type.
PipelineMaxComponentRuns
: If a Component reaches the maximum number of times it can be run in this Pipeline.
PipelineBreakpointException
: When a break_point is triggered. Contains the component name, state, and partial results.
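Together, break_point, resume_state, and debug_path enable a save-and-resume debugging loop: a triggered breakpoint snapshots the pipeline state to debug_path, and a later run picks it up via resume_state. The underlying pattern can be sketched like this (a minimal sketch assuming a JSON-serializable state dict; the actual on-disk format and `save_state`/`load_state` names are illustrative, not part of haystack_experimental's API):

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

def save_state(state: Dict[str, Any], debug_path: Path, component: str) -> Path:
    """Persist a pipeline-state snapshot so a later run can resume from it."""
    debug_path.mkdir(parents=True, exist_ok=True)
    out = debug_path / f"{component}_state.json"
    out.write_text(json.dumps(state))
    return out

def load_state(path: Path) -> Dict[str, Any]:
    """Load a previously saved snapshot, suitable to pass as resume_state."""
    return json.loads(path.read_text())

# Simulate a breakpoint firing before the 'llm' component runs
debug_dir = Path(tempfile.mkdtemp())
snapshot = save_state(
    {"component": "llm", "inputs": {"question": "Who lives in Paris?"}},
    debug_dir, "llm",
)
resume_state = load_state(snapshot)
```

In the real API, the snapshot is produced for you when the PipelineBreakpointException fires; you only load it and pass it back as resume_state on the next call to run.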
Returns:
A dictionary where each entry corresponds to a component name and its output. If include_outputs_from is None, this dictionary will only contain the outputs of leaf components, i.e., components without outgoing connections.
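"Leaf components" here are simply the nodes of the execution graph with no outgoing connections. A small sketch of that selection, using a plain edge list rather than Haystack's internal graph (the `leaf_components` helper is illustrative only):

```python
from typing import List, Set, Tuple

def leaf_components(components: Set[str],
                    connections: List[Tuple[str, str]]) -> Set[str]:
    """Return components with no outgoing connections; by default,
    only their outputs appear in Pipeline.run's return value."""
    senders = {sender for sender, _ in connections}
    return components - senders

# For the RAG pipeline above: retriever -> prompt_builder -> llm
comps = {"retriever", "prompt_builder", "llm"}
edges = [("retriever", "prompt_builder"), ("prompt_builder", "llm")]
leaves = leaf_components(comps, edges)  # {'llm'}
```

This is why the RAG example reads its answer from results["llm"]: the llm component is the only leaf, so its output is the only entry returned unless include_outputs_from requests more.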
Pipeline.inject_resume_state_into_graph
def inject_resume_state_into_graph(resume_state)
Loads the resume state from a file and injects it into the pipeline graph.