DocumentationAPI Reference📓 Tutorials🧑‍🍳 Cookbook🤝 Integrations💜 Discord🎨 Studio (Waitlist)
API Reference

Components that join list of different objects

Module document_joiner

JoinMode

Enum for join mode.

JoinMode.from_str

@staticmethod
def from_str(string: str) -> "JoinMode"

Convert a string to a JoinMode enum.

DocumentJoiner

Joins multiple lists of documents into a single list.

It supports different join modes:

  • concatenate: Keeps the highest-scored document in case of duplicates.
  • merge: Calculates a weighted sum of scores for duplicates and merges them.
  • reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
  • distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.

Usage example:

document_store = InMemoryDocumentStore()
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
        instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
        name="text_embedder",
    )
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
p.run(data={"query": query})

DocumentJoiner.__init__

def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
             weights: Optional[List[float]] = None,
             top_k: Optional[int] = None,
             sort_by_score: bool = True)

Creates a DocumentJoiner component.

Arguments:

  • join_mode: Specifies the join mode to use. Available modes:
  • concatenate: Keeps the highest-scored document in case of duplicates.
  • merge: Calculates a weighted sum of scores for duplicates and merges them.
  • reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
  • distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.
  • weights: Assign importance to each list of documents to influence how they're joined. This parameter is ignored for concatenate or distribution_based_rank_fusion join modes. Weight for each list of documents must match the number of inputs.
  • top_k: The maximum number of documents to return.
  • sort_by_score: If True, sorts the documents by score in descending order. If a document has no score, it is handled as if its score is -infinity.

DocumentJoiner.run

@component.output_types(documents=List[Document])
def run(documents: Variadic[List[Document]], top_k: Optional[int] = None)

Joins multiple lists of Documents into a single list depending on the join_mode parameter.

Arguments:

  • documents: List of list of documents to be merged.
  • top_k: The maximum number of documents to return. Overrides the instance's top_k if provided.

Returns:

A dictionary with the following keys:

  • documents: Merged list of Documents

DocumentJoiner.to_dict

def to_dict() -> Dict[str, Any]

Serializes the component to a dictionary.

Returns:

Dictionary with serialized data.

DocumentJoiner.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "DocumentJoiner"

Deserializes the component from a dictionary.

Arguments:

  • data: The dictionary to deserialize from.

Returns:

The deserialized component.

Module branch

BranchJoiner

A component to join different branches of a pipeline into one single output.

BranchJoiner receives multiple data connections of the same type from other components and passes the first value coming to its single output, possibly distributing it to various other components.

BranchJoiner is fundamental to close loops in a pipeline, where the two branches it joins are the ones coming from the previous component and one coming back from a loop. For example, BranchJoiner could be used to send data to a component evaluating errors. BranchJoiner would receive two connections, one to get the original data and another one to get modified data in case there was an error. In both cases, BranchJoiner would send (or re-send in case of a loop) data to the component evaluating errors. See "Usage example" below.

Another use case with a need for BranchJoiner is to reconcile multiple branches coming out of a decision or Classifier component. For example, in a RAG pipeline, there might be a "query language classifier" component sending the query to different retrievers, selecting one specifically according to the detected language. After the retrieval step the pipeline would ideally continue with a PromptBuilder, and since we don't know in advance the language of the query, all the retrievers should be ideally connected to the single PromptBuilder. Since the PromptBuilder won't accept more than one connection in input, we would connect all the retrievers to a BranchJoiner component and reconcile them in a single output that can be connected to the PromptBuilder downstream.

Usage example:

import json
from typing import List

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-3.5-turbo-0125"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage])),
# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
                        "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}})

print(json.loads(result["validator"]["validated"][0].content))


>> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
>> 'Superhero', 'age': 23, 'location': 'New York City'}

Note that BranchJoiner can manage only one data type at a time. In this case, BranchJoiner is created for passing List[ChatMessage]. This determines the type of data that BranchJoiner will receive from the upstream connected components and also the type of data that BranchJoiner will send through its output.

In the code example, BranchJoiner receives a looped back List[ChatMessage] from the JsonSchemaValidator and sends it down to the OpenAIChatGenerator for re-generation. We can have multiple loopback connections in the pipeline. In this instance, the downstream component is only one (the OpenAIChatGenerator), but the pipeline might have more than one downstream component.

BranchJoiner.__init__

def __init__(type_: Type)

Create a BranchJoiner component.

Arguments:

  • type_: The type of data that the BranchJoiner will receive from the upstream connected components and distribute to the downstream connected components.

BranchJoiner.to_dict

def to_dict()

Serializes the component to a dictionary.

Returns:

Dictionary with serialized data.

BranchJoiner.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BranchJoiner"

Deserializes the component from a dictionary.

Arguments:

  • data: Dictionary to deserialize from.

Returns:

Deserialized component.

BranchJoiner.run

def run(**kwargs)

The run method of the BranchJoiner component.

Multiplexes the input data from the upstream connected components and distributes it to the downstream connected components.

Arguments:

  • **kwargs: The input data. Must be of the type declared in __init__.

Returns:

A dictionary with the following keys:

  • value: The input data.

Module answer_joiner

JoinMode

Enum for AnswerJoiner join modes.

JoinMode.from_str

@staticmethod
def from_str(string: str) -> "JoinMode"

Convert a string to a JoinMode enum.

AnswerJoiner

Merges multiple lists of Answer objects into a single list.

Use this component to combine answers from different Generators into a single list. Currently, the component supports only one join mode: CONCATENATE. This mode concatenates multiple lists of answers into a single list.

Usage example

In this example, AnswerJoiner merges answers from two different Generators:

from haystack.components.builders import AnswerBuilder
from haystack.components.joiners import AnswerJoiner

from haystack.core.pipeline import Pipeline

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage


query = "What's Natural Language Processing?"
messages = [ChatMessage.from_system("You are a helpful, respectful and honest assistant. Be super concise."),
            ChatMessage.from_user(query)]

pipe = Pipeline()
pipe.add_component("gpt-4o", OpenAIChatGenerator(model="gpt-4o"))
pipe.add_component("llama", OpenAIChatGenerator(model="gpt-3.5-turbo"))
pipe.add_component("aba", AnswerBuilder())
pipe.add_component("abb", AnswerBuilder())
pipe.add_component("joiner", AnswerJoiner())

pipe.connect("gpt-4o.replies", "aba")
pipe.connect("llama.replies", "abb")
pipe.connect("aba.answers", "joiner")
pipe.connect("abb.answers", "joiner")

results = pipe.run(data={"gpt-4o": {"messages": messages},
                            "llama": {"messages": messages},
                            "aba": {"query": query},
                            "abb": {"query": query}})

AnswerJoiner.__init__

def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
             top_k: Optional[int] = None,
             sort_by_score: bool = False)

Creates an AnswerJoiner component.

Arguments:

  • join_mode: Specifies the join mode to use. Available modes:
  • concatenate: Concatenates multiple lists of Answers into a single list.
  • top_k: The maximum number of Answers to return.
  • sort_by_score: If True, sorts the documents by score in descending order. If a document has no score, it is handled as if its score is -infinity.

AnswerJoiner.run

@component.output_types(answers=List[AnswerType])
def run(answers: Variadic[List[AnswerType]], top_k: Optional[int] = None)

Joins multiple lists of Answers into a single list depending on the join_mode parameter.

Arguments:

  • answers: Nested list of Answers to be merged.
  • top_k: The maximum number of Answers to return. Overrides the instance's top_k if provided.

Returns:

A dictionary with the following keys:

  • answers: Merged list of Answers

AnswerJoiner.to_dict

def to_dict() -> Dict[str, Any]

Serializes the component to a dictionary.

Returns:

Dictionary with serialized data.

AnswerJoiner.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AnswerJoiner"

Deserializes the component from a dictionary.

Arguments:

  • data: The dictionary to deserialize from.

Returns:

The deserialized component.