Multiplexer
Use this component to facilitate connections in loops and other complex pipelines by letting many components send a value to the same input or distributing a single value to many components.
Name | Multiplexer |
Folder Path | /others/ |
Most common Position in a Pipeline | Flexible: for example, at the start of the Pipeline, at the start of loops, and so on. |
Mandatory Input variables | Defined at initialization |
Output variables | The same as the input |
Overview
Multiplexer is a component that accepts any number of input connections and distributes the first value that it receives to all the components that are connected to its output. Multiplexer’s main role is to facilitate the connection of other components together in non-trivial Pipelines.
All input connections of a Multiplexer must share a single type, which is defined at initialization time. The output is of the same type as the input. Note that when used in isolation, due to its variadic nature, Multiplexer always expects the input value to be wrapped in a list. As an example:
from haystack.components.others import Multiplexer

# an example where input and output are strings
mp = Multiplexer(str)
mp.run(value=["hello"])
# >> {"value": "hello"}

# an example where input and output are integers
mp = Multiplexer(int)
mp.run(value=[3])
# >> {"value": 3}
This component won't handle several inputs at the same time: it expects exactly one at any given time. If more than one input value is received when run is invoked, the component will raise an error:
from typing import Optional

from haystack.components.others import Multiplexer

mp = Multiplexer(int)
mp.run(value=[3, 4, 5])
# >> ValueError: Multiplexer expects only one input, but 3 were received.

mp = Multiplexer(Optional[int])
mp.run(value=[None, 4])
# >> ValueError: Multiplexer expects only one input, but 2 were received.
Multiplexer behaves differently than other Haystack components and will try to run as soon as any value is received.
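The single-value rule described above can be modeled in a few lines of plain Python. This is a simplified stand-in, not Haystack's implementation: `ToyMultiplexer` is a hypothetical class that mimics only the behavior documented here (one wrapped value in, the same value out, an error otherwise).

```python
from typing import Any, Dict, List


class ToyMultiplexer:
    """Hypothetical stand-in that mimics the single-value rule described above."""

    def run(self, value: List[Any]) -> Dict[str, Any]:
        # The variadic input arrives as a list holding every value received.
        if len(value) != 1:
            raise ValueError(
                f"Multiplexer expects only one input, but {len(value)} were received."
            )
        # Forward the single value unchanged.
        return {"value": value[0]}


mp = ToyMultiplexer()
single = mp.run(value=["hello"])

try:
    mp.run(value=[3, 4, 5])
    error_message = None
except ValueError as err:
    error_message = str(err)
```

Note that in this sketch, as in the examples above, `None` counts as a received value: a list such as `[None, 4]` still holds two inputs.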
Usage
Multiplexer is very flexible and covers several use cases, which we will explore further.
Distribute One Value to Many Components
As the Pipeline grows, different components may need the same value from the input, for example, query. This means that the pipeline.run() statement needs to distribute this value to many components, making the data parameter of the run() method very verbose.
For example, here is a hybrid retrieval generative Pipeline:
pipe = Pipeline()
pipe.add_component(instance=OpenAITextEmbedder(), name="query_embedder")
pipe.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
pipe.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
pipe.add_component(instance=DocumentJoiner(sort_by_score=False), name="doc_joiner")
pipe.add_component(instance=TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=10), name="ranker")
pipe.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
pipe.add_component(instance=OpenAIGenerator(), name="llm")
pipe.add_component(instance=AnswerBuilder(), name="answer_builder")
pipe.connect("query_embedder", "embedding_retriever.query_embedding")
pipe.connect("embedding_retriever", "doc_joiner.documents")
pipe.connect("bm25_retriever", "doc_joiner.documents")
pipe.connect("doc_joiner", "ranker.documents")
pipe.connect("ranker", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
pipe.connect("doc_joiner", "answer_builder.documents")
This Pipeline includes:
- A BM25 Retriever,
- An Embedder,
- A Ranker,
- A Prompt Builder,
- An Answer Builder.
All of these need the query in order to operate. This means that the pipeline.run() statement will be long and repetitive:
question = "Where does Mark live?"
result = pipe.run(
    {
        "query_embedder": {"text": question},
        "bm25_retriever": {"query": question},
        "ranker": {"query": question},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    }
)
In such a case, a Multiplexer can be added at the start of the Pipeline to simplify the pipe.run() call significantly. We can put a Multiplexer at the top and connect all the components that need the query value to it. The Pipeline code will get a bit longer:
pipe = Pipeline()
# Add a Multiplexer to the pipeline
pipe.add_component(instance=Multiplexer(str), name="multiplexer")
pipe.add_component(instance=OpenAITextEmbedder(), name="query_embedder")
pipe.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
pipe.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
pipe.add_component(instance=DocumentJoiner(sort_by_score=False), name="doc_joiner")
pipe.add_component(instance=TransformersSimilarityRanker(model="intfloat/simlm-msmarco-reranker", top_k=10), name="ranker")
pipe.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
pipe.add_component(instance=OpenAIGenerator(), name="llm")
pipe.add_component(instance=AnswerBuilder(), name="answer_builder")
# Connect the Multiplexer to all the components that need the query
pipe.connect("multiplexer.value", "query_embedder.text")
pipe.connect("multiplexer.value", "bm25_retriever.query")
pipe.connect("multiplexer.value", "ranker.query")
pipe.connect("multiplexer.value", "prompt_builder.question")
pipe.connect("multiplexer.value", "answer_builder.query")
pipe.connect("query_embedder", "embedding_retriever.query_embedding")
pipe.connect("embedding_retriever", "doc_joiner.documents")
pipe.connect("bm25_retriever", "doc_joiner.documents")
pipe.connect("doc_joiner", "ranker.documents")
pipe.connect("ranker", "prompt_builder.documents")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "answer_builder.replies")
pipe.connect("llm.meta", "answer_builder.meta")
pipe.connect("doc_joiner", "answer_builder.documents")
However, the user only needs to provide the query once, to the Multiplexer, which then passes it to every component that needs it, according to the Pipeline connections.
result = pipe.run({"multiplexer": {"value": "Where does Mark live?"}})
In this case, it is not important for the Multiplexer to accept variadic input. Instead, you can think of it as a redistributor of input values.
If you have multiple values to redistribute in this manner, you can use multiple Multiplexers.
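To make the redistribution idea concrete, here is a minimal plain-Python sketch of what one or more Multiplexers achieve at the start of a Pipeline. It does not use Haystack: `FAN_OUT` and `distribute` are hypothetical names, and the `top_k` entry is an assumption added only to show a second redistributed value.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical wiring table: each key plays the role of one Multiplexer,
# each entry is a (component name, input name) pair connected to its output.
FAN_OUT: Dict[str, List[Tuple[str, str]]] = {
    "query": [
        ("query_embedder", "text"),
        ("bm25_retriever", "query"),
        ("ranker", "query"),
        ("prompt_builder", "question"),
        ("answer_builder", "query"),
    ],
    # Assumed second value, for illustration only.
    "top_k": [
        ("bm25_retriever", "top_k"),
        ("embedding_retriever", "top_k"),
    ],
}


def distribute(values: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Expand one value per source into the per-component inputs it reaches."""
    data: Dict[str, Dict[str, Any]] = {}
    for source, value in values.items():
        for component, input_name in FAN_OUT[source]:
            data.setdefault(component, {})[input_name] = value
    return data


inputs = distribute({"query": "Where does Mark live?", "top_k": 5})
```

The caller supplies each value once, and the wiring decides where it ends up, which is the same division of labor that the Pipeline connections give a Multiplexer.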
Enabling Loops
Multiplexer can be used to transform a regular input connection into a variadic one, which is often necessary to close a loop.
For instance, when building a Pipeline with an error correction loop between a Generator and a custom validation component, the Multiplexer lets the validation component ask the LLM to correct its own mistakes if the Generator provides an incorrect answer.
Writing code for this Pipeline is not obvious: the LLM might be getting its prompt either from a PromptBuilder or from the validation component. However, the Generator has a single prompt input, which makes it impossible to connect both components.
pipe = Pipeline()
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator())
pipe.add_component("unwrapper", PromptBuilder("{% for reply in replies %}{{ reply }} {% endfor %}"))
pipe.add_component("checker", HallucinationChecker())
pipe.add_component("correction_prompt_builder", PromptBuilder(template=correction_template))
pipe.connect("retriever", "prompt_builder")
pipe.connect("prompt_builder", "llm")
pipe.connect("llm.replies", "unwrapper.replies")
pipe.connect("unwrapper.prompt", "checker.statement")
pipe.connect("retriever", "checker.documents")
pipe.connect("checker.hallucination", "correction_prompt_builder.hallucination")
pipe.connect("checker.contraddicting_documents", "correction_prompt_builder.documents")
# This connection will fail!
pipe.connect("correction_prompt_builder", "llm")
# >> PipelineConnectError: Cannot connect 'correction_prompt_builder.prompt'
# with 'llm.prompt': llm.prompt is already connected to ['prompt_builder'].
Here is where the variadic input of the Multiplexer comes to the rescue. By placing a Multiplexer on the prompt input, it's now possible to connect both the PromptBuilder and the validation node, and the Multiplexer will forward the value to the Generator in the way the Generator expects.
pipe = Pipeline(max_loops_allowed=5)
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=document_store, top_k=3))
pipe.add_component("prompt_builder", PromptBuilder(template=template))
pipe.add_component("llm", OpenAIGenerator())
pipe.add_component("unwrapper", PromptBuilder("{% for reply in replies %}{{ reply }} {% endfor %}"))
pipe.add_component("checker", HallucinationChecker())
pipe.add_component("correction_prompt_builder", PromptBuilder(template=correction_template))
pipe.add_component("multiplexer", Multiplexer(str))
pipe.connect("retriever", "prompt_builder")
pipe.connect("prompt_builder", "multiplexer")
pipe.connect("multiplexer", "llm")
pipe.connect("llm.replies", "unwrapper.replies")
pipe.connect("unwrapper.prompt", "checker.statement")
pipe.connect("retriever", "checker.documents")
pipe.connect("checker.hallucination", "correction_prompt_builder.hallucination")
pipe.connect("checker.contraddicting_documents", "correction_prompt_builder.documents")
pipe.connect("correction_prompt_builder", "multiplexer")
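The control flow of such a correction loop can be sketched without Haystack. The snippet below is an illustration under stated assumptions, not the Pipeline's actual execution logic: `fake_llm`, `fake_checker`, and `run_with_correction` are hypothetical stand-ins, and raising `RuntimeError` once the loop budget is spent is an assumed behavior. The point is that the generator has a single prompt entry point that is fed by one of two producers on each pass, never both.

```python
from typing import List, Optional, Tuple


def fake_llm(prompt: str) -> str:
    """Hypothetical generator: answers wrongly until asked for a correction."""
    return "Mark lives in Berlin." if "Correct" in prompt else "Mark lives in Rome."


def fake_checker(statement: str) -> Optional[str]:
    """Hypothetical validator: returns the offending statement, or None if it passes."""
    return None if "Berlin" in statement else statement


def run_with_correction(question: str, max_loops: int = 5) -> Tuple[str, List[str]]:
    prompts_seen: List[str] = []
    # First pass: the prompt comes from the regular prompt builder.
    prompt = f"Answer the question: {question}"
    for _ in range(max_loops):
        # Single entry point to the LLM, whichever branch produced the prompt.
        prompts_seen.append(prompt)
        reply = fake_llm(prompt)
        hallucination = fake_checker(reply)
        if hallucination is None:
            return reply, prompts_seen
        # Later passes: the prompt comes from the correction prompt builder.
        prompt = f"Correct this answer: {hallucination}\nQuestion: {question}"
    raise RuntimeError(f"No valid answer after {max_loops} loops")


answer, prompts = run_with_correction("Where does Mark live?")
```

Because only one of the two prompt sources is active on any given pass, the single-input rule of the Multiplexer is never violated in this shape of loop.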
Common Pitfall
In all of these Pipelines, the Multiplexer can never receive more than one value at a time. However, if your Pipeline gets more complex, you have to make sure this assumption still holds, as Multiplexer will raise an exception if it receives multiple values at the same time.
For example, a pipeline like this will fail:
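from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.others import Multiplexer
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=InMemoryDocumentStore()))
pipeline.add_component("prompt_builder_a", PromptBuilder("Docs A: {{ docs }}"))
pipeline.add_component("prompt_builder_b", PromptBuilder("Docs B: {{ docs }}"))
pipeline.add_component("multiplexer", Multiplexer(str))

pipeline.connect("retriever", "prompt_builder_a")
pipeline.connect("retriever", "prompt_builder_b")
# Both prompt builders send their prompt to the same Multiplexer
pipeline.connect("prompt_builder_a", "multiplexer")
pipeline.connect("prompt_builder_b", "multiplexer")

# The templates only use `docs`, so the run is driven by the retriever's query
results = pipeline.run({"retriever": {"query": "a?"}})
# >> ValueError: Multiplexer expects only one input, but 2 were received.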
Updated 10 months ago
See the parameter details in our API reference: