Haystack Pipeline API with experimental features.
Module haystack_experimental.core.pipeline.pipeline
Pipeline
Synchronous version of the orchestration engine.
Orchestrates component execution according to the execution graph, one after the other.
Pipeline.run
def run(data: Dict[str, Any],
include_outputs_from: Optional[Set[str]] = None,
breakpoints: Optional[Set[Tuple[str, Optional[int]]]] = None,
resume_state: Optional[Dict[str, Any]] = None,
debug_path: Optional[Union[str, Path]] = None) -> Dict[str, Any]
Runs the Pipeline with given input data.
Usage:
from haystack import Pipeline, Document
from haystack.utils import Secret
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
# Write documents to InMemoryDocumentStore
document_store = InMemoryDocumentStore()
document_store.write_documents([
Document(content="My name is Jean and I live in Paris."),
Document(content="My name is Mark and I live in Berlin."),
Document(content="My name is Giorgio and I live in Rome.")
])
prompt_template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{question}}
Answer:
"""
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(template=prompt_template)
llm = OpenAIGenerator(api_key=Secret.from_token(api_key))
rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", llm)
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
# Ask a question
question = "Who lives in Paris?"
results = rag_pipeline.run(
{
"retriever": {"query": question},
"prompt_builder": {"question": question},
}
)
print(results["llm"]["replies"])
# Jean lives in Paris
Arguments:
data
: A dictionary of inputs for the pipeline's components. Each key is a component name and its value is a dictionary of that component's input parameters:
data = {
"comp1": {"input1": 1, "input2": 2},
}
For convenience, this format is also supported when input names are unique:
data = {
"input1": 1, "input2": 2,
}
include_outputs_from
: Set of component names whose individual outputs are to be included in the pipeline's output. For components that are invoked multiple times (in a loop), only the last-produced output is included.breakpoints
: Set of tuples of component names and visit counts at which the pipeline should break execution. If the visit count is not given, it is assumed to be 0, it will break on the first visit.resume_state
: A dictionary containing the state of a previously saved pipeline execution.debug_path
: Path to the directory where the pipeline state should be saved.
Raises:
ValueError
: If invalid inputs are provided to the pipeline.PipelineRuntimeError
: If the Pipeline contains cycles with unsupported connections that would cause it to get stuck and fail running. Or if a Component fails or returns output in an unsupported type.PipelineMaxComponentRuns
: If a Component reaches the maximum number of times it can be run in this Pipeline.PipelineBreakpointException
: When a breakpoint is triggered. Contains the component name, state, and partial results.
Returns:
A dictionary where each entry corresponds to a component name
and its output. If include_outputs_from
is None
, this dictionary
will only contain the outputs of leaf components, i.e., components
without outgoing connections.
Pipeline.inject_resume_state_into_graph
def inject_resume_state_into_graph() -> Tuple[Dict[str, int], Dict[str, Any]]
Loads the resume state from a file and injects it into the pipeline graph.
Pipeline.transform_json_structure
@staticmethod
def transform_json_structure(
data: Union[Dict[str, Any], List[Any], Any]) -> Any
Transforms a JSON structure by removing the 'sender' key and moving the 'value' to the top level.
For example: "key": [{"sender": null, "value": "some value"}] -> "key": "some value"
Pipeline.save_state
def save_state(
inputs: Dict[str, Any],
component_name: str,
component_visits: Dict[str, int],
callback_fun: Optional[Callable[..., Any]] = None) -> Dict[str, Any]
Saves the state of the pipeline at a given component visit count.
Returns:
The saved state dictionary
Pipeline.load_state
@staticmethod
def load_state(file_path: Union[str, Path]) -> Dict[str, Any]
Load a saved pipeline state.
Arguments:
file_path
: Path to the state file
Returns:
Dict containing the loaded state