AsyncPipeline
Use AsyncPipeline to run multiple Haystack components at the same time for faster processing.
The AsyncPipeline in Haystack introduces asynchronous execution capabilities, enabling concurrent component execution when dependencies allow. This optimizes performance, particularly in complex pipelines where multiple independent components can run in parallel.
The AsyncPipeline provides significant performance improvements in scenarios such as:
- Hybrid retrieval pipelines, where multiple Retrievers can run in parallel,
- Multiple LLM calls that can be executed concurrently,
- Complex pipelines with independent branches of execution,
- I/O-bound operations that benefit from asynchronous execution.
Key Features
Concurrent Execution
The AsyncPipeline schedules components based on input readiness and dependency resolution, ensuring efficient parallel execution when possible. For example, in a hybrid retrieval scenario, multiple Retrievers can run simultaneously if they do not depend on each other.
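The scheduling idea can be illustrated with plain asyncio. The following is a minimal sketch, not Haystack's implementation: the run_component helper and the sleep delays are hypothetical stand-ins for real components. The two retrievers start together because neither depends on the other, while the joiner waits until both have finished.

```python
import asyncio


async def run_component(name: str, delay: float, events: list) -> str:
    """Hypothetical stand-in for a component; sleeps to simulate I/O-bound work."""
    events.append(f"start {name}")
    await asyncio.sleep(delay)
    events.append(f"end {name}")
    return name


async def run_hybrid_retrieval() -> list:
    events: list = []
    # The two retrievers are independent, so they are scheduled concurrently.
    await asyncio.gather(
        run_component("bm25_retriever", 0.05, events),
        run_component("embedding_retriever", 0.05, events),
    )
    # The joiner needs both retriever outputs, so it runs only after both finish.
    await run_component("document_joiner", 0.0, events)
    return events


if __name__ == "__main__":
    for event in asyncio.run(run_hybrid_retrieval()):
        print(event)
```

Both "start" events for the retrievers are recorded before either retriever ends, which is the overlap that a sequential pipeline would not give you.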
Execution Methods
The AsyncPipeline offers three ways to run your pipeline:
Synchronous Run (run)
Executes the pipeline synchronously with the provided input data. This method is blocking, making it suitable for environments where asynchronous execution is not possible or desired. Although components execute concurrently internally, the method blocks until the pipeline completes.
Asynchronous Run (run_async)
Executes the pipeline in an asynchronous manner, allowing non-blocking execution. This method is ideal when integrating the pipeline into an async workflow, enabling smooth operation within larger async applications or services.
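The difference between a blocking and a non-blocking entry point can be sketched with plain asyncio. This is an illustration under assumptions, not the AsyncPipeline API: run_pipeline and run_pipeline_async are hypothetical stand-ins, and the echoed reply is made up for the example.

```python
import asyncio


async def run_pipeline_async(data: dict) -> dict:
    """Hypothetical non-blocking run: can be awaited from async code."""
    await asyncio.sleep(0.01)  # simulates I/O-bound component work
    return {"llm": {"replies": [f"echo: {data['query']}"]}}


def run_pipeline(data: dict) -> dict:
    """Hypothetical blocking run: starts an event loop and waits for the result."""
    return asyncio.run(run_pipeline_async(data))


async def serve_two_requests() -> list:
    # Inside an async application, two runs can overlap instead of queueing.
    return await asyncio.gather(
        run_pipeline_async({"query": "a"}),
        run_pipeline_async({"query": "b"}),
    )


if __name__ == "__main__":
    print(run_pipeline({"query": "hello"}))
    print(asyncio.run(serve_two_requests()))
```

In this sketch the blocking wrapper calls asyncio.run, which cannot be used from code that is already running inside an event loop. That is one reason an awaitable entry point is the better fit inside async services.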
Asynchronous Generator (run_async_generator)
Allows step-by-step execution by yielding partial outputs as components complete their tasks. This is particularly useful for monitoring progress, debugging, and handling outputs incrementally. It differs from run_async, which executes the pipeline in a single async call.
In an AsyncPipeline, components such as A and B will run in parallel only if they have no shared dependencies and can process inputs independently.
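Yielding partial outputs as components finish can be sketched with an async generator built on asyncio.as_completed. This is a simplified illustration, not how run_async_generator is implemented: the run_component helper, the delays, and the output shape are assumptions made for the example.

```python
import asyncio


async def run_component(name: str, delay: float) -> dict:
    """Hypothetical stand-in for a component; returns its output keyed by name."""
    await asyncio.sleep(delay)
    return {name: {"done": True}}


async def run_as_generator():
    # Both components are independent, so both tasks start immediately.
    tasks = [
        asyncio.create_task(run_component("embedding_retriever", 0.05)),
        asyncio.create_task(run_component("bm25_retriever", 0.01)),
    ]
    # Yield each output as soon as its component completes,
    # in completion order rather than submission order.
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def collect() -> list:
    return [partial async for partial in run_as_generator()]


if __name__ == "__main__":
    for partial_output in asyncio.run(collect()):
        print(partial_output)
```

The faster component is reported first even though it was submitted second, which is what makes this style useful for progress monitoring.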
Concurrency Control
You can control the maximum number of components that run simultaneously using the concurrency_limit parameter to ensure controlled resource usage.
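A common way to cap concurrency in asyncio code is a semaphore. The sketch below shows that general technique; it is an assumption for illustration and not a description of how AsyncPipeline enforces concurrency_limit internally. The run_limited helper and the component names are hypothetical.

```python
import asyncio


async def run_limited(names: list, concurrency_limit: int) -> tuple:
    """Run hypothetical components with at most `concurrency_limit` active at once."""
    semaphore = asyncio.Semaphore(concurrency_limit)
    running = 0  # components currently executing
    peak = 0     # highest number observed executing at the same time

    async def run_component(name: str) -> str:
        nonlocal running, peak
        async with semaphore:  # waits here while the limit is reached
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulates I/O-bound work
            running -= 1
        return name

    results = await asyncio.gather(*(run_component(n) for n in names))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_limited(["a", "b", "c", "d", "e"], 2))
    print(results, "peak concurrency:", peak)
```

A lower limit trades throughput for a smaller footprint, which matters when components hit rate-limited APIs or share limited memory.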
You can find more details in our API Reference, or directly in the pipeline's GitHub code.
Example
import asyncio

from haystack import AsyncPipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers import InMemoryEmbeddingRetriever, InMemoryBM25Retriever

# NOTE: document_store, prompt_template, and data are not defined in this snippet.
# It assumes a document store that already holds embedded documents, a chat prompt
# template that uses a `documents` variable, and a `data` dict with the inputs for
# the text embedder, the BM25 retriever, and the prompt builder.

hybrid_rag_retrieval = AsyncPipeline()
hybrid_rag_retrieval.add_component("text_embedder", SentenceTransformersTextEmbedder())
hybrid_rag_retrieval.add_component("embedding_retriever", InMemoryEmbeddingRetriever(document_store=document_store))
hybrid_rag_retrieval.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=document_store))
hybrid_rag_retrieval.add_component("document_joiner", DocumentJoiner())
hybrid_rag_retrieval.add_component("prompt_builder", ChatPromptBuilder(template=prompt_template))
hybrid_rag_retrieval.add_component("llm", OpenAIChatGenerator())

# The two retrievers do not depend on each other, so they can run in parallel.
hybrid_rag_retrieval.connect("text_embedder", "embedding_retriever")
hybrid_rag_retrieval.connect("bm25_retriever", "document_joiner")
hybrid_rag_retrieval.connect("embedding_retriever", "document_joiner")
hybrid_rag_retrieval.connect("document_joiner", "prompt_builder.documents")
hybrid_rag_retrieval.connect("prompt_builder", "llm")


async def process_results():
    async for partial_output in hybrid_rag_retrieval.run_async_generator(
        data=data,
        include_outputs_from={"document_joiner", "llm"},
    ):
        # Each partial_output contains the results from a completed component
        if "document_joiner" in partial_output:
            print("Retrieved documents:", len(partial_output["document_joiner"]["documents"]))
        if "llm" in partial_output:
            print("Generated answer:", partial_output["llm"]["replies"][0])


asyncio.run(process_results())