
Pipelines

To build modern search pipelines, you need two things: powerful nodes and an easy way to put them together. The Haystack pipeline is built for this purpose and enables many search scenarios beyond question answering.

Flexibility powered by DAGs

The core idea is to build a Directed Acyclic Graph (DAG) where each node can be a model like a Reader, Retriever or Generator.
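To make the DAG idea concrete, here is a minimal sketch that does not depend on Haystack. The `edges` mapping and the `execution_order` helper are hypothetical and do not reflect how Haystack stores its graph internally. They only illustrate that a DAG always yields an order in which every node runs after the nodes it takes input from.

```python
from graphlib import TopologicalSorter

# Hypothetical graph: each key is a node, each value lists the nodes it takes input from.
edges = {
    "Retriever": ["Query"],
    "Reader": ["Retriever"],
}


def execution_order(graph):
    """Return one valid order in which the nodes of the graph can run."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(edges)
print(order)
```

If the mapping contained a cycle, `TopologicalSorter` would raise `graphlib.CycleError`, which is why the graph has to stay acyclic.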

Querying Pipelines

Querying pipelines perform searches on a set of documents that have been indexed into a DocumentStore. The first node in a querying pipeline always takes query as input. To find the right answer, the pipeline uses Documents stored in a DocumentStore.

Here's an example of a querying pipeline for open domain question answering:

from haystack import Pipeline

p = Pipeline()
p.add_node(component=retriever, name="ESRetriever1", inputs=["Query"])
p.add_node(component=reader, name="QAReader", inputs=["ESRetriever1"])
res = p.run(query="What did Einstein work on?")

Indexing Pipelines

Indexing pipelines prepare your files for search. Their main objective is to convert your files into Haystack Documents, so that they can be saved in a DocumentStore.

They begin with either a FileConverter, or a FileTypeClassifier followed by FileConverters. The last node in an indexing pipeline is always a DocumentStore.

When you add nodes into the middle of an indexing pipeline, they can also perform extra steps of processing on Documents before they are saved into the DocumentStore. For example, a PreProcessor can be added to perform Document splitting and cleaning. A DocumentClassifier in an indexing pipeline attaches a classification label to each Document.

Here's an example of an indexing pipeline that preprocesses and classifies each document:

from haystack.pipelines import Pipeline

p = Pipeline()
p.add_node(component=text_converter, name="TextConverter", inputs=["File"])
p.add_node(component=preprocessor, name="PreProcessor", inputs=["TextConverter"])
p.add_node(component=document_classifier, name="DocumentClassifier", inputs=["PreProcessor"])
p.add_node(component=document_store, name="DocumentStore", inputs=["DocumentClassifier"])

p.run(file_paths=["filename.txt"])

Tutorial

For a more detailed example of this kind of pipeline, have a look at the Document Classification at Indexing Tutorial.

Initialize a Pipeline

To start building your custom pipeline, initialize an object of the base Pipeline class:

from haystack import Pipeline

pipeline = Pipeline()

By default, a query pipeline receives a root node called Query and an indexing pipeline receives a root node called File as the entry point to the pipeline graph. Then, you manually add other nodes to the pipeline to define the flow of information.

Add Nodes to a Pipeline

Use the add_node() method to add new components to the pipeline graph. You can initialize the modules either before or during the call to add_node(). When you add a node to the pipeline, give it a name and a list of inputs containing one or more items. Note how the default Query node acts as the input to the first explicitly defined node.

pipeline.add_node(component=retriever, name='Retriever', inputs=['Query'])

Here's an example of a node with several input sources:

pipeline.add_node(component=JoinNode(), name='Joiner',
                  inputs=['Retriever1', 'Retriever2'])

If the predecessor node has more than one output, you’ll need to specify the output number in the inputs list. For example:

pipeline.add_node(component=Branch1(), name='Branch1',
                  inputs=['TopicClassifier.output_1'])
pipeline.add_node(component=Branch2(), name='Branch2',
                  inputs=['TopicClassifier.output_2'])

Under the hood, the nodes are placed in a queue and executed one by one when the run() or run_batch() method is invoked. The output of the last node in the queue is the output of the entire pipeline.
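The queue-based execution described above can be sketched in plain Python. This is an illustration only, not Haystack's implementation: `toy_retriever`, `toy_reader`, and `run_in_order` are hypothetical names, and the nodes are simple functions that take and return a dictionary.

```python
from collections import deque


def toy_retriever(data):
    # Toy stand-in for a Retriever: attaches two fake documents.
    return {**data, "documents": ["doc A", "doc B"]}


def toy_reader(data):
    # Toy stand-in for a Reader: "answers" with the first document.
    return {**data, "answers": [data["documents"][0]]}


def run_in_order(nodes, query):
    """Run (name, function) pairs one by one, feeding each output to the next node."""
    queue = deque(nodes)
    data = {"query": query}
    while queue:
        name, node = queue.popleft()
        data = node(data)
        data["node_id"] = name  # remember which node ran last
    return data


result = run_in_order(
    [("Retriever", toy_retriever), ("Reader", toy_reader)],
    "What did Einstein work on?",
)
print(result["answers"], result["node_id"])
```

The dictionary returned by the last function in the queue is what the caller receives, mirroring the statement that the output of the last node is the output of the whole pipeline.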

When you create a custom pipeline, take extra care that each node's output is compatible with the input of the next node in the chain. Otherwise, your system will throw an error at runtime.
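One way to catch such mismatches before running anything is to compare what a node produces with what the next node requires. The sketch below is not a Haystack feature. `missing_inputs`, `toy_retriever`, and `toy_reader` are hypothetical names, and the check relies only on Python's `inspect` module.

```python
import inspect


def missing_inputs(run_method, available_keys):
    """Return the required parameters of run_method that are absent from available_keys."""
    missing = []
    for name, param in inspect.signature(run_method).parameters.items():
        if name == "self":
            continue
        if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
            continue  # *args and **kwargs never count as required
        if param.default is inspect.Parameter.empty and name not in available_keys:
            missing.append(name)
    return missing


# Hypothetical node functions used only for this illustration.
def toy_retriever(query, top_k=10):
    return {"documents": [f"document about {query}"][:top_k]}


def toy_reader(query, documents, top_k=3):
    return {"answers": documents[:top_k]}


# Keys available to the reader: the query plus whatever the retriever returns.
available = {"query"} | set(toy_retriever("Einstein"))
print(missing_inputs(toy_reader, available))
print(missing_inputs(toy_reader, {"query"}))
```

The second call reports `documents` as missing, which is the kind of gap that would otherwise only surface as a runtime error.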

Arguments

Each node defines the arguments of its own run() and run_batch() methods. You supply values for these arguments through the Pipeline's run() or run_batch() method.

The Pipeline class takes care of passing relevant arguments to the nodes. In addition to mandatory inputs like query and queries, run() and run_batch() accept optional node parameters, such as the number of documents to return (top_k), through the params argument. For instance, params={"top_k": 5} sets the top_k of all nodes to 5. To target a specific node, specify the node name explicitly, for example: params={"Retriever": {"top_k": 5}}.

This example sets the number of Documents the Retriever node should return to 5 and the number of Documents the Reader node should return to 3:

res = pipeline.run(
    query="What did Einstein work on?",
    params={"Retriever": {"top_k": 5}, "Reader": {"top_k": 3}}
)
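The difference between global and node-specific parameters can be sketched as follows. `params_for_node` is a hypothetical helper, not part of Haystack, and the rule that a node-specific value overrides a global one is an assumption of this sketch.

```python
def params_for_node(node_name, all_node_names, params):
    """Pick the parameters that apply to one node.

    Keys that match a node name hold node-specific values.
    All other keys are treated as global and apply to every node.
    """
    resolved = {key: value for key, value in params.items() if key not in all_node_names}
    # Assumption of this sketch: node-specific values win over global ones.
    resolved.update(params.get(node_name, {}))
    return resolved


node_names = {"Retriever", "Reader"}

global_only = {"top_k": 5}
targeted = {"Retriever": {"top_k": 5}, "Reader": {"top_k": 3}}

print(params_for_node("Retriever", node_names, global_only))
print(params_for_node("Reader", node_names, targeted))
```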

Run a Pipeline

The run() method is the single command that triggers the execution of the entire pipeline. The arguments that the Pipeline's run() method accepts are the ones defined by each node's run() or run_batch() method.

For querying pipelines, provide query as an argument:

query = "What's the history of Quidditch?"
pipeline.run(query=query)

For indexing pipelines, you will need to provide file_paths as an argument:

file_paths = ["filename1.txt", "filename2.pdf"]
pipeline.run(file_paths=file_paths)

For querying pipelines, there's an additional function run_batch() that you can use to provide multiple queries as input:

queries = ["What's the history of Quidditch?", "Where did Quidditch originate?", "When did the first Quidditch match take place?"]
pipeline.run_batch(queries=queries)

Every node has its own run() method, and the pipeline run() call invokes each node, one after the other. When you run() a pipeline, all the function arguments are propagated to every node in the graph. The same is true for the run_batch() method.

For example, top_k is recognized by both the Retriever and the Ranker. By addressing each node by the name it was given in add_node(), you can modify these parameters dynamically in each call to the pipeline:

pipeline.run(query=query, params={"Retriever": {"top_k": 28}, "Ranker": {"top_k": 9}})

To learn more about the run_batch() method, see also the PR on GitHub.

Inspect a Pipeline

Using draw()

The pipeline.draw() method generates a sketch of your pipeline. By looking at a drawing of your pipeline, it's easier to check if it's structured as you intended. This is especially true for customized graphs that may branch out at some point.

Figure: A Retriever-Reader pipeline.

Accessing Pipeline Nodes

If your custom pipeline is not working as intended, try running your nodes in isolation. You may access any pipeline node by using the get_node() method and specifying the component's name:

retriever_node = pipeline.get_node('Retriever')

Returning Debugging Information

Nodes in a pipeline can provide debug information that gets propagated to the final output of a pipeline. For instance, a QueryClassifier node can provide the decision it made on a given query. A Retriever can provide the input query and arguments it received as well as the candidate documents it retrieved, even if it is not the final node in your pipeline.

In Haystack, you can see the input and output of a node by using the debug argument. If you create a custom node, it has this debugging functionality as long as it inherits from BaseComponent.

In all Haystack nodes, you can enable debugging in a few ways:

  • You can set the debugging attribute of a node:
    es_retriever = BM25Retriever(document_store)
    es_retriever.debug = True
  • You can provide the debugging attribute as a parameter when running your pipeline:
    result = pipeline.run(
        query="Who is the father of Arya Stark?",
        params={
            "ESRetriever": {
                "debug": True
            },
            "DPRRetriever": {
                "debug": True
            },
            "Reader": {
                "debug": True
            }
        }
    )
  • You can provide debug as a global parameter when running your pipeline:
    result = p_classifier.run(
        query="Who is the father of Arya Stark?",
        params={
            "debug": True
        }
    )

If you enable debugging, the _debug key of the dictionary returned by the pipeline stores the debugging information.

result = pipeline.run(...)
result["_debug"]

The returned dictionary has a top-level key for each node name. Under each node name, there is an input key and an output key.

{'ESRetriever': {'input': {'debug': True,
                           'query': 'Who is the father of Arya Stark?',
                           'root_node': 'Query',
                           'top_k': 1},
                 'output': {'documents': [<Document: {'content': "\n===In the Riverlands===\nThe Stark army reaches the Twins, a bridge strong", ...}>]
                            ...}
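To get a quick overview of a large debug dictionary, you can reduce it to the keys each node recorded. The helper below is a sketch. `summarize_debug` is a hypothetical name, and `sample_debug` is a hand-written stand-in shaped like the output above, with a placeholder string instead of a real Document.

```python
def summarize_debug(debug):
    """Map each node name to the sorted input and output keys it recorded."""
    return {
        node: {
            "input_keys": sorted(info.get("input", {})),
            "output_keys": sorted(info.get("output", {})),
        }
        for node, info in debug.items()
    }


# Hand-written sample that mimics the structure of result["_debug"].
sample_debug = {
    "ESRetriever": {
        "input": {
            "debug": True,
            "query": "Who is the father of Arya Stark?",
            "root_node": "Query",
            "top_k": 1,
        },
        "output": {"documents": ["<Document ...>"]},
    }
}

summary = summarize_debug(sample_debug)
print(summary)
```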

Custom Debugging Functionality

Nodes can also return custom debugging information. To return debug data from a node, add a _debug key in the output dictionary. The value should be a dictionary. For instance:

def run(self, query: str):
    # Return a tuple: the output dictionary and the name of the outgoing edge.
    if "?" in query:
        return {"_debug": "The query contained a question mark"}, "output_1"
    else:
        return {"_debug": "The query did not contain a question mark"}, "output_2"

This _debug gets inserted into the "global" _debug dictionary storing debug data for each node, under the runtime key. The final output may look as follows:

{
    "answers": ...,
    "_debug": {
        "node_a": {
            "input": { ... },
            "output": { ... },
            "runtime": "The query contained a question mark"
        },
        "node_b": {
            "input": { ... },
            "output": { ... },
            "runtime": "Some other debug info"
        }
    }
}
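The way a node's own _debug value ends up under the runtime key can be sketched like this. `merge_node_debug` is a hypothetical helper written for illustration. Whether Haystack also removes _debug from the recorded output is an assumption of this sketch.

```python
def merge_node_debug(global_debug, node_name, node_input, node_output):
    """Record a node's input and output, moving its own _debug value under "runtime"."""
    output = dict(node_output)  # copy so the caller's dictionary is left untouched
    custom = output.pop("_debug", None)
    entry = {"input": node_input, "output": output}
    if custom is not None:
        entry["runtime"] = custom
    global_debug[node_name] = entry
    return global_debug


global_debug = {}
merge_node_debug(
    global_debug,
    "node_a",
    {"query": "Who?"},
    {"_debug": "The query contained a question mark", "documents": []},
)
merge_node_debug(global_debug, "node_b", {"query": "Who?"}, {"answers": []})
print(global_debug)
```

In this sketch, a node that returns no _debug value simply gets no runtime key.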

A node in a pipeline can access the global _debug from preceding nodes by adding a parameter called _debug in its run() method:

def run(self, query: str, _debug: dict):
    debug_info = _debug["PrecedingNodeA"]
    ...

This is also true for the run_batch() method.

YAML File Definitions

You can define and load pipelines in YAML files. This is particularly useful when you move between experimentation and production environments. Just export the YAML from your notebook or IDE using Pipeline.save_to_yaml() and import it into your production environment. Having your pipeline in a YAML file helps with version control, makes it easy to share your pipelines with colleagues, and lets you configure pipeline parameters in production.

For example, you can define a simple Retriever-Reader pipeline and save it to a file called sample.haystack-pipeline.yml:

version: "1.2.0"

components: # define all the building-blocks for Pipeline
  - name: MyReader # custom-name for the component; helpful for visualization & debugging
    type: FARMReader # Haystack Class name for the component
    params:
      no_ans_boost: -10
      model_name_or_path: deepset/roberta-base-squad2
  - name: MyESRetriever
    type: BM25Retriever
    params:
      document_store: MyDocumentStore # params can reference other components defined in the YAML
      custom_query: null
  - name: MyDocumentStore
    type: ElasticsearchDocumentStore
    params:
      index: haystack_test

pipelines: # multiple Pipelines can be defined using the components from above
  - name: my_query_pipeline # a simple extractive-qa Pipeline
    nodes:
      - name: MyESRetriever
        inputs: [Query]
      - name: MyReader
        inputs: [MyESRetriever]

The components section defines the pipeline nodes. The pipelines section defines how these nodes are added into the pipeline.

The above example is the equivalent of the following Python code:

# MyESRetriever and MyReader are component instances configured as in the YAML.
pipeline = Pipeline()
pipeline.add_node(component=MyESRetriever, name="MyESRetriever", inputs=["Query"])
pipeline.add_node(component=MyReader, name="MyReader", inputs=["MyESRetriever"])
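The relationship between the components and pipelines sections can be checked mechanically. The sketch below works on a plain dictionary that mirrors the YAML structure, so it needs no YAML parser. `find_wiring_problems`, `ROOT_NODES`, and the sample `config` are hypothetical, and this is not the validation Haystack performs.

```python
ROOT_NODES = {"Query", "File"}

# Dictionary that mirrors the structure of the YAML file (params omitted).
config = {
    "components": [
        {"name": "MyReader", "type": "FARMReader"},
        {"name": "MyESRetriever", "type": "BM25Retriever"},
        {"name": "MyDocumentStore", "type": "ElasticsearchDocumentStore"},
    ],
    "pipelines": [
        {
            "name": "my_query_pipeline",
            "nodes": [
                {"name": "MyESRetriever", "inputs": ["Query"]},
                {"name": "MyReader", "inputs": ["MyESRetriever"]},
            ],
        }
    ],
}


def find_wiring_problems(config, pipeline_name):
    """Return a list of human-readable problems found in one pipeline definition."""
    defined = {component["name"] for component in config["components"]}
    pipeline = next(p for p in config["pipelines"] if p["name"] == pipeline_name)
    problems = []
    seen = set(ROOT_NODES)
    for node in pipeline["nodes"]:
        if node["name"] not in defined:
            problems.append(f"{node['name']} is not defined in components")
        for source in node["inputs"]:
            base = source.split(".")[0]  # strip suffixes such as ".output_1"
            if base not in seen:
                problems.append(f"{node['name']} reads from unknown input {source}")
        seen.add(node["name"])
    return problems


print(find_wiring_problems(config, "my_query_pipeline"))
```

An empty list means every node refers to a defined component and every input refers to a root node or to a node listed earlier.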

To load, simply call:

from pathlib import Path
pipeline = Pipeline.load_from_yaml(Path("sample.haystack-pipeline.yml"))

You can also define indexing pipelines via YAML. Nodes such as the PreProcessor, DocumentClassifier, EntityExtractor, FileTypeClassifier, and the Converters are all compatible.

pipelines:
- name: indexing
  nodes:
  - inputs:
    - File
    name: FileTypeClassifier
  - inputs:
    - FileTypeClassifier
    name: PdfConverter
  - inputs:
    - PdfConverter
    name: Preprocessor
  - inputs:
    - Preprocessor
    name: DocumentClassifier
  - inputs:
    - DocumentClassifier
    name: DocumentStore

For another example YAML configuration, check out Pipelines on GitHub.

Validating Your YAML

To validate your file manually, run:

from pathlib import Path
from haystack.pipelines.config import validate_yaml
validate_yaml(Path("path_to_your_file"))

You can also use your IDE to validate the file. The *.haystack-pipeline.yml suffix tells your IDE that this YAML contains Haystack pipeline configuration. It makes it possible for your IDE to show you feedback and autocompletion features, assuming that you have the relevant plugins like YAML for VSCode.

You can find the schema used for validation in SchemaStore and in our repository.

An update to the Haystack version might require small updates to the YAML files. You can set version to unstable in the pipeline YAML to circumvent validation or set it to the latest Haystack version if the components and parameters that you use are compatible with the latest version.

Ready-Made Pipelines

We added some ready-made pipelines that you can use to run standard patterns with very few lines of code. To learn more about these, see the Ready-Made Pipelines and Pipelines API.

Examples:

from haystack.pipelines import DocumentSearchPipeline, ExtractiveQAPipeline, FAQPipeline, GenerativeQAPipeline
from haystack.nodes import JoinDocuments
from haystack import Pipeline

# Extractive QA
qa_pipe = ExtractiveQAPipeline(reader=reader, retriever=retriever)
res = qa_pipe.run(query="When was Kant born?", params={"Retriever": {"top_k": 3}, "Reader": {"top_k": 5}})

# Document Search
doc_pipe = DocumentSearchPipeline(retriever=retriever)
res = doc_pipe.run(query="Physics Einstein", params={"Retriever": {"top_k": 3}})

# Generative QA
gen_pipe = GenerativeQAPipeline(generator=rag_generator, retriever=retriever)
res = gen_pipe.run(query="Physics Einstein", params={"Retriever": {"top_k": 3}})

# FAQ based QA
faq_pipe = FAQPipeline(retriever=retriever)
res = faq_pipe.run(query="How can I change my address?", params={"Retriever": {"top_k": 3}})

Example: Multiple Retrievers

You can also use multiple Retrievers and join their results:

from haystack import Pipeline
from haystack.nodes import JoinDocuments

p = Pipeline()
p.add_node(component=es_retriever, name="ESRetriever", inputs=["Query"])
p.add_node(component=dpr_retriever, name="DPRRetriever", inputs=["Query"])
p.add_node(component=JoinDocuments(join_mode="concatenate"), name="JoinResults", inputs=["ESRetriever", "DPRRetriever"])
p.add_node(component=reader, name="QAReader", inputs=["JoinResults"])
res = p.run(query="What did Einstein work on?", params={"ESRetriever": {"top_k": 1}, "DPRRetriever": {"top_k": 3}})

Figure: A pipeline with multiple Retrievers.
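Conceptually, joining in concatenate mode merges the result lists of several Retrievers into one. The sketch below is not the JoinDocuments implementation. `concatenate_results` is a hypothetical helper, documents are modelled as plain dictionaries with an `id` key, and keeping the first copy of a document returned by more than one Retriever is an assumption of this sketch.

```python
def concatenate_results(*result_lists):
    """Merge several lists of documents, keeping the first copy of each id."""
    merged = {}
    for results in result_lists:
        for doc in results:
            merged.setdefault(doc["id"], doc)  # later duplicates are ignored
    return list(merged.values())


# Toy results: both retrievers return document "1".
es_results = [{"id": "1", "content": "Einstein worked on relativity."}]
dpr_results = [
    {"id": "1", "content": "Einstein worked on relativity."},
    {"id": "2", "content": "Einstein studied the photoelectric effect."},
    {"id": "3", "content": "Einstein was born in Ulm."},
]

joined = concatenate_results(es_results, dpr_results)
print([doc["id"] for doc in joined])
```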

Example: Creating a Retriever-Ranker-Summarizer Pipeline

In this example, we'll look at how to establish a custom Retriever-Ranker-Summarizer pipeline. It's useful to add a Ranker to a summarization pipeline because the output of the Summarizer depends on the order of the documents that it receives.

from haystack import Pipeline

pipeline = Pipeline()

To create new pipeline nodes, we initialize the modules first. For our use case, we need a Retriever, a Ranker, and a Summarizer. We tell the Summarizer to return a single summary per query (instead of one summary for each Document), and that its length should be somewhere between ten and 300 words:

from haystack.nodes import BM25Retriever, SentenceTransformersRanker, TransformersSummarizer

retriever = BM25Retriever(document_store, top_k=10)
ranker = SentenceTransformersRanker(model_name_or_path="cross-encoder/ms-marco-MiniLM-L-12-v2", top_k=10)
summarizer = TransformersSummarizer(model_name_or_path='t5-large', min_length=10, max_length=300, generate_single_summary=True)

We add the nodes to the pipeline:

pipeline.add_node(component=retriever, name='Retriever', inputs=['Query'])
pipeline.add_node(component=ranker, name='Ranker', inputs=['Retriever'])
pipeline.add_node(component=summarizer, name='Summarizer', inputs=['Ranker'])

Let's now run our custom pipeline on the Harry Potter Wiki dataset. A typical application for this pipeline is a situation where we want high-level information about our corpus that is not necessarily contained within one document. We retrieve multiple documents, rank them, and let the Summarizer return a single summary of all the texts.

query = "What's the history of Quidditch?"
result = pipeline.run(query=query)

The pipeline returns a dictionary that contains the query, the name of the last node, and a list of Documents:

result.keys()

>>> dict_keys(['documents', 'query', 'node_id'])

Because we requested a single summary of all the texts passed to the Summarizer, the list of Documents contains only one item. We access the summary through the text attribute:

result['documents'][0].text

>>> "the first record of a primitive form of Quidditch (''Kwidditch'') dates to c. 1050. the first known reference to wizards using broomsticks as a means of conveyance dates to A.D. 963. a variant of the game, Quodpot, was invented in the eighteenth century. in the middle of the 14th century it was made a protected species by the wizards council."

Distributed Pipelines with Ray

Ray is a framework for distributed computing. With Ray, you can distribute a pipeline's components across a cluster of machines and scale each component independently. For instance, an extractive QA pipeline deployment can have three replicas of the Reader and a single replica of the Retriever. Scaling components horizontally in this way makes efficient use of resources.

To set the number of replicas, add num_replicas under serve_deployment_kwargs in the YAML config for the node in a pipeline:

version: 1.12.2
extras: ray

components:
    ...

pipelines:
  - name: query_pipeline
    nodes:
      - name: ESRetriever
        inputs: [ Query ]
        serve_deployment_kwargs:
            num_replicas: 2  # number of replicas to create on the Ray cluster

A RayPipeline can only be created with a YAML Pipeline configuration:

from haystack.pipelines import RayPipeline

pipeline = RayPipeline.load_from_yaml(path="my_pipelines.haystack-pipeline.yml", pipeline_name="query_pipeline")
pipeline.run(query="What is the capital of Germany?")

By default, a RayPipeline creates a local instance of Ray Serve. To connect to an existing Ray instance, set the address parameter when creating the RayPipeline instance.