BranchJoiner
Use this component to join different branches of a pipeline into a single output.
Most common position in a pipeline | Flexible: Can appear at the beginning of a pipeline or at the start of loops. |
Mandatory init variables | "type_": The type of data expected from preceding components |
Mandatory run variables | "**kwargs": Any input data type defined at initialization. This input is variadic, meaning you can connect a variable number of components to it. |
Output variables | "value": The first value received from the connected components. |
API reference | Joiners |
GitHub link | https://github.com/deepset-ai/haystack/blob/main/haystack/components/joiners/branch.py |
Overview
BranchJoiner joins multiple branches in a pipeline, allowing their outputs to be reconciled into a single branch. This is especially useful in pipelines with multiple branches that need to be unified before moving to the single component that comes next.
BranchJoiner receives multiple data connections of the same type from other components and passes the first value it receives to its single output. This makes it essential for closing loops in pipelines or reconciling multiple branches from a decision component.
BranchJoiner accepts a single input of a single data type, declared in the __init__ method. It ensures that the data type remains consistent across the pipeline branches. If the input receives more than one value when run is invoked, the component raises an error:
from haystack.components.joiners import BranchJoiner
bj = BranchJoiner(int)
bj.run(value=[3, 4, 5])
>>> ValueError: BranchJoiner expects only one input, but 3 were received.
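The single-input rule can be modelled in a few lines of plain Python. The sketch below is a simplified stand-in written for illustration, not Haystack's implementation: join_single is a hypothetical name, and it only mimics the behavior described above (take the list of received values, return the only one, and raise a ValueError otherwise).

```python
from typing import Any, Dict, List


def join_single(values: List[Any]) -> Dict[str, Any]:
    """Hypothetical stand-in that mimics the documented BranchJoiner behavior."""
    # The variadic input arrives as a list of everything received in this run.
    if len(values) != 1:
        raise ValueError(
            f"BranchJoiner expects only one input, but {len(values)} were received."
        )
    # Exactly one value was received: forward it on the single "value" output.
    return {"value": values[0]}


print(join_single(["hello"]))

try:
    join_single([3, 4, 5])
except ValueError as err:
    print(err)
```

Note that an empty list is rejected as well in this sketch, since the check is for exactly one value.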
Usage
On its own
Although only one input value is allowed per run, BranchJoiner still expects a list because its input is variadic. For example:
from haystack.components.joiners import BranchJoiner
# an example where input and output are strings
bj = BranchJoiner(str)
bj.run(value=["hello"])
>>> {"value": "hello"}
# an example where input and output are integers
bj = BranchJoiner(int)
bj.run(value=[3])
>>> {"value": 3}
In a pipeline
Enabling loops
Below is an example where BranchJoiner is used for closing a loop. In this example, BranchJoiner receives a looped-back list of ChatMessage objects from the JsonSchemaValidator and sends it down to the OpenAIChatGenerator for re-generation.
import json
from typing import List
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage
person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}
# Initialize a pipeline
pipe = Pipeline()
# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage]))
# Connect components
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")
result = pipe.run(data={
    "fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
    "adapter": {"chat_message": [ChatMessage.from_user("Create json object from Peter Parker")]}
})
print(json.loads(result["validator"]["validated"][0].content))
# Output:
# {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
# 'Superhero', 'age': 23, 'location': 'New York City'}
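The control flow of this loop can be sketched without any Haystack components. The following is an illustrative model only: fake_generate, validate, and run_loop are hypothetical helpers, the "generator" is a stub that returns an incomplete answer on the first attempt and a valid one on the second, and the validation is a simple required-keys check rather than full JSON Schema validation. It shows that the joiner position receives either the initial input or the looped-back error, never both in the same iteration.

```python
import json
from typing import List, Optional, Tuple

REQUIRED_KEYS = {"first_name", "last_name", "nationality"}


def fake_generate(messages: List[str], attempt: int) -> str:
    """Hypothetical stub standing in for an LLM call."""
    if attempt == 0:
        # First attempt: deliberately incomplete so the loop is exercised.
        return json.dumps({"first_name": "Peter"})
    return json.dumps(
        {"first_name": "Peter", "last_name": "Parker", "nationality": "American"}
    )


def validate(reply: str) -> Tuple[Optional[dict], Optional[List[str]]]:
    """Return (validated, None) on success or (None, error_messages) on failure."""
    data = json.loads(reply)
    missing = sorted(REQUIRED_KEYS - set(data))
    if missing:
        return None, [reply, f"Missing keys: {missing}. Please fix the JSON."]
    return data, None


def run_loop(initial: List[str], max_attempts: int = 3) -> dict:
    # First iteration: the joiner's single value comes from the initial branch.
    joiner_input = initial
    for attempt in range(max_attempts):
        reply = fake_generate(joiner_input, attempt)
        validated, error = validate(reply)
        if validated is not None:
            return validated
        # Later iterations: the joiner's single value comes from the validator.
        joiner_input = error
    raise RuntimeError("No valid reply within the attempt limit")


print(run_loop(["Create json object from Peter Parker"]))
```

The attempt limit is an assumption of this sketch, added so that the loop cannot run forever if the stub never produces a valid reply.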
Reconciling branches
In this example, the TextLanguageRouter component directs the query to one of three language-specific Retrievers. The next component would be a PromptBuilder, but we cannot connect multiple Retrievers to a single PromptBuilder directly. Instead, we connect all the Retrievers to the BranchJoiner component. The BranchJoiner then takes the output from the Retriever that was actually called and passes it as a single list of documents to the PromptBuilder. The BranchJoiner ensures that the pipeline can handle multiple languages seamlessly by consolidating different outputs from the Retrievers into a unified connection for further processing.
from haystack import Document, Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.joiners import BranchJoiner
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.routers import TextLanguageRouter
prompt_template = """
Answer the question based on the given reviews.
Reviews:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ query }}
Answer:
"""
documents = [
    Document(
        content="Super appartement. Juste au dessus de plusieurs bars qui ferment très tard. A savoir à l'avance. (Bouchons d'oreilles fournis !)"
    ),
    Document(
        content="El apartamento estaba genial y muy céntrico, todo a mano. Al lado de la librería Lello y De la Torre de los clérigos. Está situado en una zona de marcha, así que si vais en fin de semana , habrá ruido, aunque a nosotros no nos molestaba para dormir"
    ),
    Document(
        content="The keypad with a code is convenient and the location is convenient. Basically everything else, very noisy, wi-fi didn't work, check-in person didn't explain anything about facilities, shower head was broken, there's no cleaning and everything else one may need is charged."
    ),
    Document(
        content="It is very central and appartement has a nice appearance (even though a lot IKEA stuff), *W A R N I N G** the appartement presents itself as a elegant and as a place to relax, very wrong place to relax - you cannot sleep in this appartement, even the beds are vibrating from the bass of the clubs in the same building - you get ear plugs from the hotel."
    ),
    Document(
        content="Céntrico. Muy cómodo para moverse y ver Oporto. Edificio con terraza propia en la última planta. Todo reformado y nuevo. Te traen un estupendo desayuno todas las mañanas al apartamento. Solo que se puede escuchar algo de ruido de la calle a primeras horas de la noche. Es un zona de ocio nocturno. Pero respetan los horarios."
    ),
]
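en_document_store = InMemoryDocumentStore()
fr_document_store = InMemoryDocumentStore()
es_document_store = InMemoryDocumentStore()
# Write each review to the store for its language so the retrievers have documents to return
en_document_store.write_documents([documents[2], documents[3]])
fr_document_store.write_documents([documents[0]])
es_document_store.write_documents([documents[1], documents[4]])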
rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=TextLanguageRouter(["en", "fr", "es"]), name="router")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=en_document_store), name="en_retriever")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=fr_document_store), name="fr_retriever")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=es_document_store), name="es_retriever")
rag_pipeline.add_component(instance=BranchJoiner(type_=list[Document]), name="joiner")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")
rag_pipeline.connect("router.en", "en_retriever.query")
rag_pipeline.connect("router.fr", "fr_retriever.query")
rag_pipeline.connect("router.es", "es_retriever.query")
rag_pipeline.connect("en_retriever", "joiner")
rag_pipeline.connect("fr_retriever", "joiner")
rag_pipeline.connect("es_retriever", "joiner")
rag_pipeline.connect("joiner", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
en_question = "Does this apartment have a noise problem?"
result = rag_pipeline.run({"router": {"text": en_question}, "prompt_builder": {"query": en_question}})
print(result["llm"]["replies"][0])
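The reconciliation step can also be modelled in plain Python. This is a rough sketch under stated assumptions, not how Haystack wires components internally: RETRIEVERS and retrieve_for_language are hypothetical names, the "retrievers" return placeholder strings instead of Document objects, and the language is passed in directly rather than detected. It illustrates that only the selected branch produces a value, so the joining step sees exactly one input and forwards it.

```python
from typing import Callable, Dict, List

# Hypothetical stand-ins for the three language-specific retrievers.
# Each takes a query and returns placeholder "documents" (plain strings here).
RETRIEVERS: Dict[str, Callable[[str], List[str]]] = {
    "en": lambda query: [f"en result for: {query}"],
    "fr": lambda query: [f"fr result for: {query}"],
    "es": lambda query: [f"es result for: {query}"],
}


def retrieve_for_language(language: str, query: str) -> List[str]:
    """Run only the branch chosen by the (assumed) router, then join."""
    # Router step: only the retriever matching the given language runs.
    received = [
        retrieve(query) for lang, retrieve in RETRIEVERS.items() if lang == language
    ]
    # Joiner step: exactly one branch should have produced a value.
    if len(received) != 1:
        raise ValueError(f"Expected one input, but {len(received)} were received.")
    return received[0]


print(retrieve_for_language("es", "ruido"))
```

In this sketch, a language with no matching retriever leaves the joining step with zero inputs, which is reported as an error rather than silently returning an empty list.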