DocumentationAPI Reference📓 Tutorials🧑‍🍳 Cookbook🤝 Integrations💜 Discord🎨 Studio
Documentation

AsyncPipeline

Use AsyncPipeline to run multiple Haystack components at the same time for faster processing.

The AsyncPipeline in Haystack introduces asynchronous execution capabilities, enabling concurrent component execution when dependencies allow. This optimizes performance, particularly in complex pipelines where multiple independent components can run in parallel.

The AsyncPipeline provides significant performance improvements in scenarios such as:

  • Hybrid retrieval pipelines, where multiple Retrievers can run in parallel,
  • Multiple LLM calls that can be executed concurrently,
  • Complex pipelines with independent branches of execution,
  • I/O-bound operations that benefit from asynchronous execution.

Key Features

Concurrent Execution

The AsyncPipeline schedules components based on input readiness and dependency resolution, ensuring efficient parallel execution when possible. For example, in a hybrid retrieval scenario, multiple Retrievers can run simultaneously if they do not depend on each other.

Execution Methods

The AsyncPipeline offers three ways to run your pipeline:

Synchronous Run (run)

Executes the pipeline synchronously with the provided input data. This method is blocking, making it suitable for environments where asynchronous execution is not possible or desired. Although components execute concurrently internally, the method blocks until the pipeline completes.

Asynchronous Run (run_async)

Executes the pipeline in an asynchronous manner, allowing non-blocking execution. This method is ideal when integrating the pipeline into an async workflow, enabling smooth operation within larger async applications or services.

Asynchronous Generator (run_async_generator)

Allows step-by-step execution by yielding partial outputs as components complete their tasks. This is particularly useful for monitoring progress, debugging, and handling outputs incrementally. It differs from run_async, which executes the pipeline in a single async call.

In an AsyncPipeline, components such as A and B will run in parallel only if they have no shared dependencies and can process inputs independently.

Concurrency Control

You can control the maximum number of components that run simultaneously using the concurrency_limit parameter to ensure controlled resource usage.

You can find more details in our API Reference, or directly in the pipeline's GitHub code.

Example

import asyncio
from haystack import AsyncPipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever, InMemoryBM25Retriever
from haystack.components.joiners import DocumentJoiner
from haystack.component.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator

hybrid_rag_retrieval = AsyncPipeline()
hybrid_rag_retrieval.add_component("text_embedder", SentenceTransformersTextEmbedder())
hybrid_rag_retrieval.add_component("embedding_retriever", InMemoryEmbeddingRetriever(document_store=document_store))
hybrid_rag_retrieval.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=document_store))
hybrid_rag_retrieval.add_component("document_joiner", DocumentJoiner())
hybrid_rag_retrieval.add_component("prompt_builder", ChatPromptBuilder(template=prompt_template))
hybrid_rag_retrieval.add_component("llm", OpenAIChatGenerator())

hybrid_rag_retrieval.connect("text_embedder", "embedding_retriever")
hybrid_rag_retrieval.connect("bm25_retriever", "document_joiner")
hybrid_rag_retrieval.connect("embedding_retriever", "document_joiner")
hybrid_rag_retrieval.connect("document_joiner", "prompt_builder.documents")
hybrid_rag_retrieval.connect("prompt_builder", "llm")

async def process_results():
    async for partial_output in hybrid_rag_retrieval.run_async_generator(
            data=data,
            include_outputs_from={"document_joiner", "llm"}
    ):
        # Each partial_output contains the results from a completed component
        if "retriever" in partial_output:
            print("Retrieved documents:", len(partial_output["document_joiner"]["documents"]))
        if "llm" in partial_output:
            print("Generated answer:", partial_output["llm"]["replies"][0])

asyncio.run(process_results())