DocumentationAPI Reference📓 Tutorials🧑‍🍳 Cookbook🤝 Integrations💜 Discord🎨 Studio
Documentation

BranchJoiner

Use this component to join different branches of a pipeline into a single output.

NameBranchJoiner
Folder path/joiners/
Most common position in a pipelineFlexible: Can appear at the beginning of a pipeline or at the start of loops.
Mandatory input variables“**kwargs”: Any input data type defined at the initialization. This input is variadic, meaning you can connect a variable number of components to it.
Output variables“value”: The first value received from the connected components.

Overview

BranchJoiner joins multiple branches in a pipeline, allowing their outputs to be reconciled into a single branch. This is especially useful in pipelines with multiple branches that need to be unified before moving to the single component that comes next.

BranchJoiner receives multiple data connections of the same type from other components and passes the first value it receives to its single output. This makes it essential for closing loops in pipelines or reconciling multiple branches from a decision component.

BranchJoiner can handle only one input of one data type, declared in the __init__ function. It ensures that the data type remains consistent across the pipeline branches. If more than one value is received for the input when run is invoked, the component will raise an error:

from haystack.components.joiners import BranchJoiner

bj = BranchJoiner(int)
bj.run(value=[3, 4, 5])

>>> ValueError: BranchJoiner expects only one input, but 3 were received.

Usage

On its own

Although only one input value is allowed at every run, due to its variadic nature BranchJoiner still expects a list. As an example:

from haystack.components.joiners import BranchJoiner

# an example where input and output are strings
bj = BranchJoiner(str)
bj.run(value=["hello"])
>>> {"value" : "hello"}

# an example where input and output are integers
bj = BranchJoiner(int)
bj.run(value=[3])
>>> {"value": 3}

In a pipeline

Enabling loops

Below is an example where BranchJoiner is used for closing a loop. In this example, BranchJoiner receives a looped-back list of ChatMessage objects from the JsonSchemaValidator and sends it down to the OpenAIChatGenerator for re-generation.

import json
from typing import List

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-3.5-turbo-0125"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage]))

# Connect components
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(data={
    "fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
    "adapter": {"chat_message": [ChatMessage.from_user("Create json object from Peter Parker")]}
})

print(json.loads(result["validator"]["validated"][0].content))

# Output:
# {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
# 'Superhero', 'age': 23, 'location': 'New York City'}

Expand to see the pipeline graph

Reconciling branches

In this example, the TextLanguageRouter component directs the query to one of three language-specific Retrievers. The next component would be a PromptBuilder, but we cannot connect multiple Retrievers to a single PromptBuilder directly. Instead, we connect all the Retrievers to the BranchJoiner component. The BranchJoiner then takes the output from the Retriever that was actually called and passes it as a single list of documents to the PromptBuilder. The BranchJoiner ensures that the pipeline can handle multiple languages seamlessly by consolidating different outputs from the Retrievers into a unified connection for further processing.

from haystack import Document, Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.joiners import BranchJoiner
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.routers import TextLanguageRouter

prompt_template = """
Answer the quesiton based on the given reviews.
Reviews:
  {% for doc in documents %}
    {{ doc.content }}
  {% endfor %}
Question: {{ query}}
Answer:
"""

documents = [
    Document(
        content="Super appartement. Juste au dessus de plusieurs bars qui ferment très tard. A savoir à l'avance. (Bouchons d'oreilles fournis !)"
    ),
    Document(
        content="El apartamento estaba genial y muy céntrico, todo a mano. Al lado de la librería Lello y De la Torre de los clérigos. Está situado en una zona de marcha, así que si vais en fin de semana , habrá ruido, aunque a nosotros no nos molestaba para dormir"
    ),
    Document(
        content="The keypad with a code is convenient and the location is convenient. Basically everything else, very noisy, wi-fi didn't work, check-in person didn't explain anything about facilities, shower head was broken, there's no cleaning and everything else one may need is charged."
    ),
    Document(
        content="It is very central and appartement has a nice appearance (even though a lot IKEA stuff), *W A R N I N G** the appartement presents itself as a elegant and as a place to relax, very wrong place to relax - you cannot sleep in this appartement, even the beds are vibrating from the bass of the clubs in the same building - you get ear plugs from the hotel."
    ),
    Document(
        content="Céntrico. Muy cómodo para moverse y ver Oporto. Edificio con terraza propia en la última planta. Todo reformado y nuevo. Te traen un estupendo desayuno todas las mañanas al apartamento. Solo que se puede escuchar algo de ruido de la calle a primeras horas de la noche. Es un zona de ocio nocturno. Pero respetan los horarios."
    ),
]

en_document_store = InMemoryDocumentStore()
fr_document_store = InMemoryDocumentStore()
es_document_store = InMemoryDocumentStore()

rag_pipeline = Pipeline()
rag_pipeline.add_component(instance=TextLanguageRouter(["en", "fr", "es"]), name="router")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=en_document_store), name="en_retriever")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=fr_document_store), name="fr_retriever")
rag_pipeline.add_component(instance=InMemoryBM25Retriever(document_store=es_document_store), name="es_retriever")
rag_pipeline.add_component(instance=BranchJoiner(type_=list[Document]), name="joiner")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(), name="llm")

rag_pipeline.connect("router.en", "en_retriever.query")
rag_pipeline.connect("router.fr", "fr_retriever.query")
rag_pipeline.connect("router.es", "es_retriever.query")
rag_pipeline.connect("en_retriever", "joiner")
rag_pipeline.connect("fr_retriever", "joiner")
rag_pipeline.connect("es_retriever", "joiner")
rag_pipeline.connect("joiner", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")

en_question = "Does this apartment has a noise problem?"

result = rag_pipeline.run({"router": {"text": en_question}, "prompt_builder": {"query": en_question}})

print(result["llm"]["replies"][0])

Expand to see the pipeline graph

Related Links

See the parameters details in our API reference: