API Reference

Components that join lists of different objects

Module document_joiner

DocumentJoiner

A component that joins multiple lists of Documents into a single list.

It supports different join modes:

  • concatenate: Keeps the highest-scored Document in case of duplicates.
  • merge: Merges the lists and calculates a weighted sum of the scores of duplicate Documents.
  • reciprocal_rank_fusion: Merges the lists and assigns scores based on reciprocal rank fusion.
  • distribution_based_rank_fusion: Merges the lists and assigns scores based on the score distribution of each retriever.
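
To make the reciprocal_rank_fusion mode more concrete, the following is a minimal sketch of the general technique in plain Python. It is not the DocumentJoiner implementation: the function name, the use of bare document ids, and the smoothing constant k=60 are assumptions chosen for illustration, and the library may use a different constant or rescale the scores.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(ranked_lists: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Fuse several ranked lists of document ids into one score per id.

    Every id gains 1 / (k + rank) for each list it appears in, with rank
    starting at 1. `k` is an illustrative smoothing constant (assumption).
    """
    scores: Dict[str, float] = defaultdict(float)
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return dict(scores)


# Hypothetical rankings, best match first, as two retrievers might return them.
bm25_ranking = ["doc_a", "doc_b", "doc_c"]
embedding_ranking = ["doc_b", "doc_d", "doc_a"]

fused = reciprocal_rank_fusion([bm25_ranking, embedding_ranking])
ranking = sorted(fused, key=fused.get, reverse=True)
print(ranking)
```

Documents that appear near the top of several lists accumulate the largest fused score, so only ranks matter and the retrievers' raw scores do not need to be comparable.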

Usage example:

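from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
    instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
    name="text_embedder",
)
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
# Both retrievers feed their Documents into the joiner
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
# Pass the query to each entry component under its own input name
p.run(data={"bm25_retriever": {"query": query}, "text_embedder": {"text": query}})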

DocumentJoiner.__init__

def __init__(join_mode: str = "concatenate",
             weights: Optional[List[float]] = None,
             top_k: Optional[int] = None,
             sort_by_score: bool = True)

Create a DocumentJoiner component.

Arguments:

  • join_mode: Specifies the join mode to use. Available modes:
      • concatenate
      • merge
      • reciprocal_rank_fusion
      • distribution_based_rank_fusion
  • weights: Weight for each list of Documents received. It must have the same length as the number of inputs. If join_mode is concatenate or distribution_based_rank_fusion, this parameter is ignored.
  • top_k: The maximum number of Documents to return.
  • sort_by_score: If True, sorts the Documents by score in descending order. A Document without a score is handled as if its score were -infinity.
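
As an illustration of how weights relate to the merge mode, here is a small sketch of a weighted sum over per-input scores. It is plain Python rather than the component's code: the function name, the dictionaries of id to score, and the equal-weight default are assumptions made for this example.

```python
from typing import Dict, List, Optional


def merge_weighted(score_lists: List[Dict[str, float]],
                   weights: Optional[List[float]] = None) -> Dict[str, float]:
    """Combine per-input scores into one weighted sum per document id."""
    if not score_lists:
        return {}
    if weights is None:
        # Assumption for this sketch: every input list counts equally.
        weights = [1.0 / len(score_lists)] * len(score_lists)
    if len(weights) != len(score_lists):
        raise ValueError("weights must have one entry per input list")

    merged: Dict[str, float] = {}
    for scores, weight in zip(score_lists, weights):
        for doc_id, score in scores.items():
            # A duplicate id accumulates the weighted scores from every list.
            merged[doc_id] = merged.get(doc_id, 0.0) + weight * score
    return merged


# Hypothetical scores from two retrievers; doc_b appears in both.
bm25_scores = {"doc_a": 0.8, "doc_b": 0.4}
embedding_scores = {"doc_b": 0.6, "doc_c": 0.5}

merged = merge_weighted([bm25_scores, embedding_scores], weights=[0.25, 0.75])
print(merged)
```

The length check mirrors the requirement above that weights has one entry per input.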

DocumentJoiner.run

@component.output_types(documents=List[Document])
def run(documents: Variadic[List[Document]], top_k: Optional[int] = None)

Joins multiple lists of Documents into a single list depending on the join_mode parameter.

Arguments:

  • documents: List of lists of Documents to be merged.
  • top_k: The maximum number of Documents to return. Overrides the instance's top_k if provided.

Returns:

A dictionary with the following keys:

  • documents: Merged list of Documents.
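
The interplay between sort_by_score, the instance's top_k, and the top_k passed to run can be sketched as follows. This is an illustration under stated assumptions, not the library code: Doc is a hypothetical stand-in for Document, and finalize is a made-up helper name.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Doc:
    """Hypothetical stand-in for a Document: an id and an optional score."""
    id: str
    score: Optional[float] = None


def finalize(docs: List[Doc],
             instance_top_k: Optional[int] = None,
             run_top_k: Optional[int] = None,
             sort_by_score: bool = True) -> List[Doc]:
    """Sort by score (missing scores last) and truncate to the effective top_k."""
    if sort_by_score:
        # A Document without a score is treated as if its score were -infinity.
        docs = sorted(
            docs,
            key=lambda d: d.score if d.score is not None else float("-inf"),
            reverse=True,
        )
    # The value passed at run time takes precedence over the instance value.
    top_k = run_top_k if run_top_k is not None else instance_top_k
    return docs[:top_k] if top_k is not None else docs


docs = [Doc("a", 0.2), Doc("b"), Doc("c", 0.9)]
all_ids = [d.id for d in finalize(docs)]
top_ids = [d.id for d in finalize(docs, instance_top_k=3, run_top_k=2)]
print(all_ids, top_ids)
```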

Module branch

BranchJoiner

A component to join different branches of a pipeline into one single output.

BranchJoiner receives multiple data connections of the same type from other components and passes the first value it receives to its single output, possibly distributing it to various other components.

BranchJoiner is fundamental for closing loops in a pipeline, where the two branches it joins are the one coming from the previous component and the one coming back from the loop. For example, BranchJoiner could be used to send data to a component evaluating errors. BranchJoiner would receive two connections, one carrying the original data and another carrying modified data in case there was an error. In both cases, BranchJoiner would send (or re-send, in case of a loop) the data to the component evaluating errors. See "Usage example" below.

Another use case for BranchJoiner is reconciling multiple branches coming out of a decision or Classifier component. For example, in a RAG pipeline, there might be a "query language classifier" component that sends the query to one of several retrievers, selected according to the detected language. After the retrieval step, the pipeline would ideally continue with a PromptBuilder, and since we don't know the language of the query in advance, all the retrievers should be connected to that single PromptBuilder. Since the PromptBuilder won't accept more than one input connection, we would connect all the retrievers to a BranchJoiner component and reconcile them into a single output that can be connected to the PromptBuilder downstream.

Usage example:

import json
from typing import List

from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage

person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}

# Initialize a pipeline
pipe = Pipeline()

# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-3.5-turbo-0125"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage]))
# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")

result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
                        "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}})

print(json.loads(result["validator"]["validated"][0].content))


>> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
>> 'Superhero', 'age': 23, 'location': 'New York City'}

Note that BranchJoiner can manage only one data type at a time. In this case, BranchJoiner is created for passing List[ChatMessage]. This determines the type of data that BranchJoiner will receive from the upstream connected components and also the type of data that BranchJoiner will send through its output.

In the code example, BranchJoiner receives a looped-back List[ChatMessage] from the JsonSchemaValidator and sends it down to the OpenAIChatGenerator for re-generation. A pipeline can have multiple loopback connections. In this instance there is only one downstream component (the OpenAIChatGenerator), but the pipeline might have more than one.
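
The control flow of such a loop can be mimicked without Haystack or an LLM. The sketch below is only an analogy under stated assumptions: fake_generate, validate, run_loop, and max_rounds are hypothetical names, and the validation is a simple required-keys check rather than a JSON Schema validation.

```python
import json
from typing import Callable, List, Optional, Tuple

REQUIRED_KEYS = {"first_name", "last_name", "nationality"}


def validate(reply: str) -> Tuple[Optional[dict], Optional[str]]:
    """Return (data, None) on success or (None, error_message) on failure."""
    try:
        data = json.loads(reply)
    except json.JSONDecodeError as exc:
        return None, f"invalid JSON: {exc.msg}"
    if not isinstance(data, dict):
        return None, "expected a JSON object"
    missing = REQUIRED_KEYS - set(data)
    if missing:
        return None, f"missing keys: {sorted(missing)}"
    return data, None


def run_loop(prompt: str, generate: Callable[[str], str], max_rounds: int = 3) -> dict:
    # `message` plays the role of the joiner's single output: it is either the
    # original prompt (first branch) or the looped-back error (second branch).
    message = prompt
    for _ in range(max_rounds):
        validated, error = validate(generate(message))
        if validated is not None:
            return validated
        message = f"{prompt}\nPrevious attempt failed: {error}"
    raise RuntimeError("no valid reply within max_rounds")


# A fake generator that fails once and then succeeds (stands in for an LLM).
replies = iter([
    '{"first_name": "Peter"}',
    '{"first_name": "Peter", "last_name": "Parker", "nationality": "American"}',
])
seen_messages: List[str] = []


def fake_generate(message: str) -> str:
    seen_messages.append(message)
    return next(replies)


result = run_loop("Create json from Peter Parker", fake_generate)
print(result)
```

The generator only ever sees one message per round, regardless of which branch produced it, which is the property the joiner provides in the pipeline.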

BranchJoiner.__init__

def __init__(type_: Type)

Create a BranchJoiner component.

Arguments:

  • type_: The type of data that the BranchJoiner will receive from the upstream connected components and distribute to the downstream connected components.

BranchJoiner.to_dict

def to_dict()

Serializes the component to a dictionary.

Returns:

Dictionary with serialized data.

BranchJoiner.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BranchJoiner"

Deserializes the component from a dictionary.

Arguments:

  • data: Dictionary to deserialize from.

Returns:

Deserialized component.

BranchJoiner.run

def run(**kwargs)

The run method of the BranchJoiner component.

Multiplexes the input data from the upstream connected components and distributes it to the downstream connected components.

Arguments:

  • **kwargs: The input data. Must be of the type declared in __init__.

Returns:

A dictionary with the following keys:

  • value: The input data.
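
The contract described above (one declared type, the first received value forwarded under the value key) can be sketched with a small stand-in class. FirstValueJoiner is hypothetical and is not the Haystack component: how inputs are named in kwargs, the isinstance check, and the error types are assumptions for this example.

```python
from typing import Any, Dict


class FirstValueJoiner:
    """Hypothetical stand-in that forwards the first value it receives."""

    def __init__(self, type_: type):
        # Plain runtime types only (e.g. list); generics such as List[str]
        # cannot be used with isinstance, so this sketch does not accept them.
        self.type_ = type_

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        # Assumption: each keyword is one upstream branch, and branches that
        # did not fire are passed as None. Keyword order is preserved.
        for value in kwargs.values():
            if value is None:
                continue
            if not isinstance(value, self.type_):
                raise TypeError(f"expected {self.type_.__name__}, got {type(value).__name__}")
            return {"value": value}
        raise ValueError("no input value was received")


joiner = FirstValueJoiner(list)
output = joiner.run(original=None, looped_back=["please retry"])
print(output)
```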