Components that join lists of different objects
Module document_joiner
JoinMode
Enum for join mode.
JoinMode.from_str
@staticmethod
def from_str(string: str) -> "JoinMode"
Convert a string to a JoinMode enum.
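A minimal sketch of how such a string-to-enum conversion could look. The class name `JoinModeSketch`, its member values, and the error message are assumptions inferred from the join modes listed in this reference, not the library's source.

```python
from enum import Enum


class JoinModeSketch(Enum):
    # Assumed members: one per join mode named in this reference.
    CONCATENATE = "concatenate"
    MERGE = "merge"
    RECIPROCAL_RANK_FUSION = "reciprocal_rank_fusion"
    DISTRIBUTION_BASED_RANK_FUSION = "distribution_based_rank_fusion"

    @staticmethod
    def from_str(string: str) -> "JoinModeSketch":
        # Look the string up among the enum values and fail loudly on unknown modes.
        modes = {mode.value: mode for mode in JoinModeSketch}
        if string not in modes:
            raise ValueError(f"Unknown join mode '{string}'. Supported modes: {list(modes)}")
        return modes[string]


mode = JoinModeSketch.from_str("merge")
```

Accepting both a string and an enum member in a constructor then reduces to calling `from_str` whenever the argument is a `str`.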
DocumentJoiner
Joins multiple lists of documents into a single list.
It supports different join modes:
- concatenate: Keeps the highest-scored document in case of duplicates.
- merge: Calculates a weighted sum of scores for duplicates and merges them.
- reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
- distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.
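As a rough illustration of the reciprocal rank fusion idea, the sketch below scores each document by summing 1 / (k + rank) over every ranked list in which it appears. The function name, the use of plain document IDs, and the constant k = 60 are assumptions made for illustration. DocumentJoiner's own constant, weighting, and score scaling may differ.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(ranked_lists: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Score each document ID by summing 1 / (k + rank) over every list it appears in."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        # Ranks start at 1 for the top result of each list.
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return dict(scores)


bm25_ranking = ["doc_a", "doc_b", "doc_c"]
embedding_ranking = ["doc_b", "doc_d", "doc_a"]
fused = reciprocal_rank_fusion([bm25_ranking, embedding_ranking])
# Order the document IDs from highest to lowest fused score.
ranked_ids = sorted(fused, key=fused.get, reverse=True)
```

Because only ranks enter the formula, this kind of fusion does not require the input lists to use comparable score scales.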
Usage example:
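from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
document_store = InMemoryDocumentStore()
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
    instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
    name="text_embedder",
)
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
# Each component receives the query under its own input name.
p.run(data={"bm25_retriever": {"query": query}, "text_embedder": {"text": query}})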
DocumentJoiner.__init__
def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
weights: Optional[List[float]] = None,
top_k: Optional[int] = None,
sort_by_score: bool = True)
Creates a DocumentJoiner component.
Arguments:
join_mode
: Specifies the join mode to use. Available modes:
- concatenate: Keeps the highest-scored document in case of duplicates.
- merge: Calculates a weighted sum of scores for duplicates and merges them.
- reciprocal_rank_fusion: Merges and assigns scores based on reciprocal rank fusion.
- distribution_based_rank_fusion: Merges and assigns scores based on scores distribution in each Retriever.
weights
: Assigns importance to each list of documents to influence how they're joined. This parameter is ignored for the concatenate and distribution_based_rank_fusion join modes. The number of weights must match the number of inputs.
top_k
: The maximum number of documents to return.
sort_by_score
: If True, sorts the documents by score in descending order. If a document has no score, it is handled as if its score is -infinity.
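To make the role of weights in the merge mode more concrete, here is a minimal sketch of a weighted score sum over duplicate documents. The `ScoredDoc` tuple, the function name, the equal default weights, and the treatment of a missing score as zero are all assumptions for illustration. The component's actual behavior may differ.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

ScoredDoc = Tuple[str, Optional[float]]  # (document id, score); hypothetical stand-in for a Document


def weighted_merge(doc_lists: List[List[ScoredDoc]],
                   weights: Optional[List[float]] = None) -> Dict[str, float]:
    """Sum each document's scores across lists, scaling every list by its weight."""
    if not doc_lists:
        return {}
    if weights is None:
        # Assumption: equal weights when none are given.
        weights = [1.0 / len(doc_lists)] * len(doc_lists)
    if len(weights) != len(doc_lists):
        raise ValueError("Provide exactly one weight per input list.")
    merged: Dict[str, float] = defaultdict(float)
    for docs, weight in zip(doc_lists, weights):
        for doc_id, score in docs:
            # Assumption: a missing score contributes nothing to the sum.
            merged[doc_id] += (score or 0.0) * weight
    return dict(merged)


bm25_docs = [("doc_a", 0.8), ("doc_b", 0.4)]
embedding_docs = [("doc_b", 0.9), ("doc_c", None)]
merged_scores = weighted_merge([bm25_docs, embedding_docs], weights=[0.25, 0.75])
```

In this sketch a document that appears in both lists (`doc_b`) accumulates a contribution from each, so a higher weight on one list pulls the final ordering toward that list's scores.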
DocumentJoiner.run
@component.output_types(documents=List[Document])
def run(documents: Variadic[List[Document]], top_k: Optional[int] = None)
Joins multiple lists of Documents into a single list depending on the join_mode parameter.
Arguments:
documents
: List of lists of documents to be merged.
top_k
: The maximum number of documents to return. Overrides the instance's top_k if provided.
Returns:
A dictionary with the following keys:
documents
: Merged list of Documents
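The following sketch walks through one plausible reading of a run in concatenate mode: flatten the input lists, keep the highest-scored copy of each duplicate, sort with missing scores treated as -infinity, then truncate to top_k. `ScoredDoc` and `join_concatenate` are hypothetical names, and matching duplicates by ID is an assumption.

```python
from typing import Dict, List, Optional, Tuple

ScoredDoc = Tuple[str, Optional[float]]  # (document id, score); hypothetical stand-in for a Document


def join_concatenate(doc_lists: List[List[ScoredDoc]], top_k: Optional[int] = None,
                     sort_by_score: bool = True) -> Dict[str, List[ScoredDoc]]:
    """Flatten the lists, keep the best-scored copy of each duplicate, then sort and truncate."""

    def score_key(doc: ScoredDoc) -> float:
        # A missing score ranks as -infinity, as described for sort_by_score.
        return doc[1] if doc[1] is not None else float("-inf")

    best: Dict[str, ScoredDoc] = {}
    for docs in doc_lists:
        for doc in docs:
            current = best.get(doc[0])
            if current is None or score_key(doc) > score_key(current):
                best[doc[0]] = doc

    joined = list(best.values())
    if sort_by_score:
        joined.sort(key=score_key, reverse=True)
    if top_k is not None:
        joined = joined[:top_k]
    return {"documents": joined}


result = join_concatenate(
    [[("doc_a", 0.2), ("doc_b", None)], [("doc_a", 0.7), ("doc_c", 0.5)]],
    top_k=2,
)
```

Here the unscored `doc_b` sorts last and is dropped by `top_k=2`, while `doc_a` keeps its higher score from the second list.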
DocumentJoiner.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
DocumentJoiner.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "DocumentJoiner"
Deserializes the component from a dictionary.
Arguments:
data
: The dictionary to deserialize from.
Returns:
The deserialized component.
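A serialization round trip can be pictured with the sketch below. `JoinerSketch` is a hypothetical class, and the dictionary layout (a `type` entry plus the constructor arguments under `init_parameters`) is an assumption used for illustration, not a statement of the library's exact format.

```python
from typing import Any, Dict, Optional


class JoinerSketch:
    """Hypothetical component used only to illustrate a serialization round trip."""

    def __init__(self, join_mode: str = "concatenate", top_k: Optional[int] = None,
                 sort_by_score: bool = True):
        self.join_mode = join_mode
        self.top_k = top_k
        self.sort_by_score = sort_by_score

    def to_dict(self) -> Dict[str, Any]:
        # Store the class name plus the arguments needed to rebuild the instance.
        return {
            "type": type(self).__name__,
            "init_parameters": {
                "join_mode": self.join_mode,
                "top_k": self.top_k,
                "sort_by_score": self.sort_by_score,
            },
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "JoinerSketch":
        # Rebuild the instance from the stored constructor arguments.
        return cls(**data["init_parameters"])


original = JoinerSketch(join_mode="merge", top_k=5)
restored = JoinerSketch.from_dict(original.to_dict())
```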
Module branch
BranchJoiner
A component that joins different branches of a pipeline into a single output.
BranchJoiner receives multiple data connections of the same type from other components and passes the first value it receives to its single output, possibly distributing it to several downstream components.
BranchJoiner is essential for closing loops in a pipeline, where the two branches it joins are the one coming from the previous component and the one coming back from the loop. For example, BranchJoiner could be used to send data to a component that evaluates errors. BranchJoiner would receive two connections: one carrying the original data and another carrying the modified data in case there was an error. In both cases, BranchJoiner would send (or, in a loop, re-send) the data to the component evaluating errors. See "Usage example" below.
Another use case for BranchJoiner is reconciling multiple branches coming out of a decision or Classifier component. For example, in a RAG pipeline, a "query language classifier" component might send the query to different retrievers, selecting one according to the detected language. After the retrieval step, the pipeline would ideally continue with a PromptBuilder. Since the language of the query is not known in advance, all the retrievers should be connected to that single PromptBuilder. Because the PromptBuilder won't accept more than one input connection, we connect all the retrievers to a BranchJoiner and reconcile them into a single output that can be connected to the PromptBuilder downstream.
Usage example:
import json
from typing import List
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage
person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}
# Initialize a pipeline
pipe = Pipeline()
# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-4o-mini"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage]))
# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")
result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
                        "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}})
print(json.loads(result["validator"]["validated"][0].content))
# >> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
# >> 'Superhero', 'age': 23, 'location': 'New York City'}
Note that BranchJoiner can manage only one data type at a time. In this case, BranchJoiner is created for passing List[ChatMessage]. This determines the type of data that BranchJoiner receives from the upstream connected components and also the type of data that it sends through its output.
In the code example, BranchJoiner receives a looped-back List[ChatMessage] from the JsonSchemaValidator and sends it down to the OpenAIChatGenerator for re-generation. A pipeline can have multiple loopback connections. In this instance there is only one downstream component (the OpenAIChatGenerator), but a pipeline might have more than one.
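The loop described above can be simulated without any pipeline machinery. In this sketch, the variable `joined` plays the part of the joiner's single output: on the first pass it carries the original input, and on later passes it carries the looped-back error. All function names and the toy generator and validator are hypothetical.

```python
from typing import Callable, List, Optional


def run_with_loopback(initial: List[str], generate: Callable[[List[str]], str],
                      validate: Callable[[str], Optional[str]], max_attempts: int = 3) -> str:
    """Feed either the original input or the looped-back error into the same generator."""
    joined = initial  # first pass: the joiner forwards the original data
    for _ in range(max_attempts):
        reply = generate(joined)
        error = validate(reply)
        if error is None:
            return reply
        # Loop back: the joiner now forwards the validator's message instead.
        joined = [error]
    raise RuntimeError("No valid reply within the attempt limit.")


def toy_generate(messages: List[str]) -> str:
    # Hypothetical generator: only produces JSON after it has seen an error message.
    return '{"first_name": "Peter"}' if messages[0].startswith("error:") else "Peter"


def toy_validate(reply: str) -> Optional[str]:
    # Hypothetical validator: returns None on success, an error message otherwise.
    return None if reply.startswith("{") else f"error: '{reply}' is not JSON"


final_reply = run_with_loopback(["Create json from Peter Parker"], toy_generate, toy_validate)
```

The generator never needs to know whether its input came from the start of the pipeline or from the loop, which is the reason a single joining point is useful.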
BranchJoiner.__init__
def __init__(type_: Type)
Creates a BranchJoiner component.
Arguments:
type_
: The type of data that the BranchJoiner will receive from the upstream connected components and distribute to the downstream connected components.
BranchJoiner.to_dict
def to_dict()
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
BranchJoiner.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BranchJoiner"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
BranchJoiner.run
def run(**kwargs)
The run method of the BranchJoiner component.
Multiplexes the input data from the upstream connected components and distributes it to the downstream connected components.
Arguments:
**kwargs
: The input data. Must be of the type declared in __init__.
Returns:
A dictionary with the following keys:
value
: The input data.
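A minimal sketch of the forwarding behavior, under the assumption that the values arriving on the incoming connections are collected into a list and that exactly one branch delivers a value per invocation. The function name and the error handling are hypothetical.

```python
from typing import Any, Dict, List


def branch_joiner_run(value: List[Any]) -> Dict[str, Any]:
    """Forward the single value that arrived on any of the incoming connections."""
    # Assumption: exactly one upstream branch delivers a value per invocation.
    if len(value) != 1:
        raise ValueError(f"Expected exactly one input value, got {len(value)}.")
    return {"value": value[0]}


# The one delivered value is itself a list of messages, matching the declared type.
output = branch_joiner_run([["looped-back message"]])
```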
Module answer_joiner
JoinMode
Enum for AnswerJoiner join modes.
JoinMode.from_str
@staticmethod
def from_str(string: str) -> "JoinMode"
Convert a string to a JoinMode enum.
AnswerJoiner
Merges multiple lists of Answer objects into a single list.
Use this component to combine answers from different Generators into a single list.
Currently, the component supports only one join mode: CONCATENATE.
This mode concatenates multiple lists of answers into a single list.
Usage example
In this example, AnswerJoiner merges answers from two different Generators:
from haystack.components.builders import AnswerBuilder
from haystack.components.joiners import AnswerJoiner
from haystack.core.pipeline import Pipeline
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
query = "What's Natural Language Processing?"
messages = [ChatMessage.from_system("You are a helpful, respectful and honest assistant. Be super concise."),
            ChatMessage.from_user(query)]
pipe = Pipeline()
pipe.add_component("gpt-4o", OpenAIChatGenerator(model="gpt-4o"))
pipe.add_component("llama", OpenAIChatGenerator(model="gpt-3.5-turbo"))
pipe.add_component("aba", AnswerBuilder())
pipe.add_component("abb", AnswerBuilder())
pipe.add_component("joiner", AnswerJoiner())
pipe.connect("gpt-4o.replies", "aba")
pipe.connect("llama.replies", "abb")
pipe.connect("aba.answers", "joiner")
pipe.connect("abb.answers", "joiner")
results = pipe.run(data={"gpt-4o": {"messages": messages},
                         "llama": {"messages": messages},
                         "aba": {"query": query},
                         "abb": {"query": query}})
AnswerJoiner.__init__
def __init__(join_mode: Union[str, JoinMode] = JoinMode.CONCATENATE,
top_k: Optional[int] = None,
sort_by_score: bool = False)
Creates an AnswerJoiner component.
Arguments:
join_mode
: Specifies the join mode to use. Available modes:
- concatenate: Concatenates multiple lists of Answers into a single list.
top_k
: The maximum number of Answers to return.
sort_by_score
: If True, sorts the answers by score in descending order. If an answer has no score, it is handled as if its score is -infinity.
AnswerJoiner.run
@component.output_types(answers=List[AnswerType])
def run(answers: Variadic[List[AnswerType]], top_k: Optional[int] = None)
Joins multiple lists of Answers into a single list depending on the join_mode parameter.
Arguments:
answers
: Nested list of Answers to be merged.
top_k
: The maximum number of Answers to return. Overrides the instance's top_k if provided.
Returns:
A dictionary with the following keys:
answers
: Merged list of Answers
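The concatenate mode for answers can be approximated as a flatten step followed by optional sorting and truncation. In the sketch below, `ScoredAnswer` and `join_answers` are hypothetical names, and answers are reduced to (text, score) tuples for brevity.

```python
from itertools import chain
from typing import Dict, List, Optional, Tuple

ScoredAnswer = Tuple[str, Optional[float]]  # (answer text, score); hypothetical stand-in for an Answer


def join_answers(answer_lists: List[List[ScoredAnswer]], top_k: Optional[int] = None,
                 sort_by_score: bool = False) -> Dict[str, List[ScoredAnswer]]:
    """Concatenate the input lists, then optionally sort by score and truncate."""
    merged = list(chain.from_iterable(answer_lists))
    if sort_by_score:
        # A missing score ranks as -infinity, so unscored answers end up last.
        merged.sort(key=lambda a: a[1] if a[1] is not None else float("-inf"), reverse=True)
    if top_k is not None:
        merged = merged[:top_k]
    return {"answers": merged}


first = [("NLP is a field of AI.", 0.6)]
second = [("NLP studies language processing.", None), ("NLP means natural language processing.", 0.9)]
unsorted_result = join_answers([first, second])
sorted_result = join_answers([first, second], top_k=2, sort_by_score=True)
```

With the default `sort_by_score=False`, the answers keep the order in which the input lists were supplied.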
AnswerJoiner.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
AnswerJoiner.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "AnswerJoiner"
Deserializes the component from a dictionary.
Arguments:
data
: The dictionary to deserialize from.
Returns:
The deserialized component.