Components that join lists of different objects
Module document_joiner
DocumentJoiner
A component that joins multiple lists of Documents into a single list.
It supports different join modes:
- concatenate: Keeps the highest-scored Document in case of duplicates.
- merge: Merges the lists and calculates a weighted sum of the scores of duplicate Documents.
- reciprocal_rank_fusion: Merges the lists and assigns scores based on reciprocal rank fusion.
- distribution_based_rank_fusion: Merges the lists and assigns scores based on the score distribution of each retriever.
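As a rough illustration of the reciprocal_rank_fusion mode listed above, the sketch below fuses two ranked lists of document ids in plain Python. It is not DocumentJoiner's implementation: the function name reciprocal_rank_fusion, the use of bare string ids instead of Document objects, and the smoothing constant k=60 are all assumptions made for this example.

```python
from collections import defaultdict
from typing import Dict, List


def reciprocal_rank_fusion(rankings: List[List[str]], k: int = 60) -> Dict[str, float]:
    """Fuse several ranked lists of document ids into one score per id.

    `k` is a smoothing constant. The value 60 is a common choice and an
    assumption here, not necessarily what DocumentJoiner uses.
    """
    scores: Dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            # Each list contributes 1 / (k + rank), so a document ranked
            # highly by several retrievers accumulates the largest score.
            scores[doc_id] += 1.0 / (k + rank)
    return dict(scores)


# Hypothetical rankings from two retrievers (best match first).
bm25_ranking = ["a", "b", "c"]
embedding_ranking = ["b", "d", "a"]

fused = reciprocal_rank_fusion([bm25_ranking, embedding_ranking])
ordered = sorted(fused, key=fused.get, reverse=True)
print(ordered)
```

Documents "a" and "b" appear in both lists, so they outrank "c" and "d", which appear in only one; only the rank positions matter, not the retrievers' raw scores.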
Usage example:
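from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

document_store = InMemoryDocumentStore()
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
    instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
    name="text_embedder",
)
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
# Both retrievers feed the joiner; the embedder feeds the embedding retriever
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
# The BM25 retriever takes the raw query, the text embedder takes it as text
p.run(data={"bm25_retriever": {"query": query}, "text_embedder": {"text": query}})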
DocumentJoiner.__init__
def __init__(join_mode: str = "concatenate",
weights: Optional[List[float]] = None,
top_k: Optional[int] = None,
sort_by_score: bool = True)
Create a DocumentJoiner component.
Arguments:
join_mode
: Specifies the join mode to use. Available modes: concatenate, merge, reciprocal_rank_fusion, distribution_based_rank_fusion.
weights
: Weight for each list of Documents received; must have the same length as the number of inputs. If join_mode is concatenate or distribution_based_rank_fusion, this parameter is ignored.
top_k
: The maximum number of Documents to return.
sort_by_score
: If True, sorts the Documents by score in descending order. If a Document has no score, it is handled as if its score is -infinity.
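To make the interplay of weights, top_k and sort_by_score more concrete, here is a minimal sketch in plain Python. It is not the library's code: the helper names weighted_merge and sort_and_truncate, the (id, score) tuples standing in for Documents, the equal-weight default, and the choice to treat a missing score as 0 inside the weighted sum are all assumptions made for this example. Only the "missing score sorts as -infinity" rule comes from the description above.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Stand-in for a Document: (document_id, score), where score may be None.
ScoredDoc = Tuple[str, Optional[float]]


def weighted_merge(inputs: List[List[ScoredDoc]],
                   weights: Optional[List[float]] = None) -> List[ScoredDoc]:
    """Sum the weighted scores of duplicate ids across all input lists."""
    if weights is None:
        # Assumption: equal weights when none are given.
        weights = [1.0 / len(inputs)] * len(inputs)
    if len(weights) != len(inputs):
        raise ValueError("weights must have one entry per input list")
    merged: Dict[str, float] = defaultdict(float)
    for docs, weight in zip(inputs, weights):
        for doc_id, score in docs:
            # Assumption: a missing score contributes nothing to the sum.
            merged[doc_id] += weight * (score if score is not None else 0.0)
    return list(merged.items())


def sort_and_truncate(docs: List[ScoredDoc],
                      top_k: Optional[int] = None) -> List[ScoredDoc]:
    """Sort by score descending (None counts as -infinity), keep top_k."""
    ranked = sorted(
        docs,
        key=lambda doc: doc[1] if doc[1] is not None else float("-inf"),
        reverse=True,
    )
    return ranked[:top_k] if top_k is not None else ranked


# Hypothetical retriever outputs; "b" is returned by both.
bm25_docs = [("a", 0.5), ("b", 1.0)]
embedding_docs = [("b", 0.5), ("c", 0.8)]

merged = weighted_merge([bm25_docs, embedding_docs], weights=[0.25, 0.75])
top_two = sort_and_truncate(merged, top_k=2)
unscored_last = sort_and_truncate([("x", None), ("y", 0.1), ("z", 0.9)])
print(top_two)
print(unscored_last)
```

Here "b" wins because it collects a weighted contribution from both lists, and the unscored document "x" ends up last because it is compared as -infinity.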
DocumentJoiner.run
@component.output_types(documents=List[Document])
def run(documents: Variadic[List[Document]], top_k: Optional[int] = None)
Joins multiple lists of Documents into a single list depending on the join_mode parameter.
Arguments:
documents
: List of lists of Documents to be merged.
top_k
: The maximum number of Documents to return. Overrides the instance's top_k if provided.
Returns:
A dictionary with the following keys:
documents
: Merged list of Documents
Module branch
BranchJoiner
A component to join different branches of a pipeline into one single output.
BranchJoiner receives multiple data connections of the same type from other components and passes the first value it receives to its single output, possibly distributing it to various other components.
BranchJoiner is fundamental for closing loops in a pipeline, where the two branches it joins are the one coming from the previous component and the one coming back from a loop. For example, BranchJoiner could be used to send data to a component evaluating errors. BranchJoiner would receive two connections, one to get the original data and another one to get modified data in case there was an error. In both cases, BranchJoiner would send (or re-send in case of a loop) data to the component evaluating errors. See "Usage example" below.
Another use case for BranchJoiner is to reconcile multiple branches coming out of a decision or Classifier component. For example, in a RAG pipeline, there might be a "query language classifier" component sending the query to different retrievers, selecting one according to the detected language. After the retrieval step, the pipeline would ideally continue with a PromptBuilder, and since we don't know the language of the query in advance, all the retrievers should ideally be connected to the single PromptBuilder. Since the PromptBuilder won't accept more than one input connection, we would connect all the retrievers to a BranchJoiner component and reconcile them into a single output that can be connected to the PromptBuilder downstream.
Usage example:
import json
from typing import List
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.joiners import BranchJoiner
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage
person_schema = {
    "type": "object",
    "properties": {
        "first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
        "nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
    },
    "required": ["first_name", "last_name", "nationality"]
}
# Initialize a pipeline
pipe = Pipeline()
# Add components to the pipeline
pipe.add_component('joiner', BranchJoiner(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-3.5-turbo-0125"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage]))
# And connect them
pipe.connect("adapter", "joiner")
pipe.connect("joiner", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "joiner")
result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
                        "adapter": {"chat_message": [ChatMessage.from_user("Create json from Peter Parker")]}})
print(json.loads(result["validator"]["validated"][0].content))
# >> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
# >> 'Superhero', 'age': 23, 'location': 'New York City'}
Note that BranchJoiner can manage only one data type at a time. In this case, BranchJoiner is created for passing List[ChatMessage]. This determines the type of data that BranchJoiner will receive from the upstream connected components and also the type of data that BranchJoiner will send through its output.
In the code example, BranchJoiner receives a looped-back List[ChatMessage] from the JsonSchemaValidator and sends it down to the OpenAIChatGenerator for re-generation. We can have multiple loopback connections in the pipeline. In this instance, there is only one downstream component (the OpenAIChatGenerator), but the pipeline might have more than one downstream component.
BranchJoiner.__init__
def __init__(type_: Type)
Create a BranchJoiner component.
Arguments:
type_
: The type of data that the BranchJoiner will receive from the upstream connected components and distribute to the downstream connected components.
BranchJoiner.to_dict
def to_dict()
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
BranchJoiner.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BranchJoiner"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
BranchJoiner.run
def run(**kwargs)
The run method of the BranchJoiner component.
Multiplexes the input data from the upstream connected components and distributes it to the downstream connected components.
Arguments:
**kwargs
: The input data. Must be of the type declared in __init__.
Returns:
A dictionary with the following keys:
value
: The input data.
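The forwarding behaviour described for run can be pictured with a small stand-in written in plain Python. This is a sketch, not the Haystack class: ToyBranchJoiner, generate and is_valid are hypothetical names, and the assumptions that incoming data arrives as a list under the value key and that exactly one entry is expected per step are made for this example only.

```python
from typing import Any, Dict


class ToyBranchJoiner:
    """Hypothetical stand-in that forwards a single incoming value."""

    def __init__(self, type_: type):
        self.type_ = type_

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        # Assumption: inputs arrive as a list under "value", one entry per
        # upstream connection that produced data in this step.
        values = kwargs.get("value", [])
        if len(values) != 1:
            raise ValueError(f"expected exactly one input, got {len(values)}")
        value = values[0]
        if not isinstance(value, self.type_):
            raise TypeError(f"expected {self.type_.__name__}, got {type(value).__name__}")
        return {"value": value}


def generate(prompt: str) -> str:
    # Hypothetical generator: produces a valid reply only after a retry hint.
    return "Peter Parker" if "retry" in prompt else "peter parker"


def is_valid(text: str) -> bool:
    # Hypothetical validator: every word must start with a capital letter.
    return text.istitle()


joiner = ToyBranchJoiner(str)
prompt = "Create a name"
attempts = 0
while True:
    attempts += 1
    # The joiner sees either the original prompt or the looped-back one.
    forwarded = joiner.run(value=[prompt])["value"]
    reply = generate(forwarded)
    if is_valid(reply):
        break
    prompt = f"retry: {forwarded}"
print(reply, attempts)
```

The first pass forwards the original prompt, the validator rejects the reply, and the second pass forwards the looped-back prompt, which mirrors the two branches (original data and modified data) that the joiner reconciles in a loop.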