DocumentJoiner
Use this component in hybrid retrieval pipelines or indexing pipelines with multiple file converters to join lists of documents.
Most common position in a pipeline | In indexing and query pipelines, after components that return a list of documents such as multiple Retrievers or multiple Converters |
Mandatory run variables | “documents”: A list of documents. This input is variadic , meaning you can connect a variable number of components to it. |
Output variables | “documents”: A list of documents |
API reference | Joiners |
GitHub link | https://github.com/deepset-ai/haystack/blob/main/haystack/components/joiners/document_joiner.py |
Overview
DocumentJoiner
joins input lists of documents from multiple connections and outputs them as one list. You can choose how you want the lists to be joined by specifying the join_mode
. There are three options available:
concatenate
- Combines document from multiple components, discarding any duplicates. documents get their scores from the last component in the pipeline that assigns scores. This mode doesn’t influence document scores.merge
- Merges the scores of duplicate documents coming from multiple components. You can also assign a weight to the scores to influence how they’re merged and set the top_k limit to specify how many documents you wantDocumentJoiner
to return.reciprocal_rank_fusion
- Combines documents into a single list based on their ranking received from multiple components. It then calculates a new score based on the ranks of documents in the input lists. If the same Document appears in more than one list (was returned by multiple components), it gets a higher score.distribution_based_rank_fusion
– Combines rankings from multiple sources into a single, unified ranking. It analyzes how scores are spread out and normalizes them, ensuring that each component's scoring method is taken into account. This normalization helps to balance the influence of each component, resulting in a more robust and fair combined ranking. If a document appears in multiple lists, its final score is adjusted based on the distribution of scores from all lists.
Usage
On its own
Below is an example where we are using the DocumentJoiner
to merge two lists of documents. We run the DocumentJoiner
and provide the documents. It returns a list of documents ranked by combined scores. By default, equal weight is given to each Retriever score. You could also use custom weights by setting the weights parameter to a list of floats with one weight per input component.
from haystack import Document
from haystack.components.joiners.document_joiner import DocumentJoiner
docs_1 = [Document(content="Paris is the capital of France.", score=0.5), Document(content="Berlin is the capital of Germany.", score=0.4)]
docs_2 = [Document(content="Paris is the capital of France.", score=0.6), Document(content="Rome is the capital of Italy.", score=0.5)]
joiner = DocumentJoiner(join_mode="merge")
joiner.run(documents=[docs_1, docs_2])
# {'documents': [Document(id=0f5beda04153dbfc462c8b31f8536749e43654709ecf0cfe22c6d009c9912214, content: 'Paris is the capital of France.', score: 0.55), Document(id=424beed8b549a359239ab000f33ca3b1ddb0f30a988bbef2a46597b9c27e42f2, content: 'Rome is the capital of Italy.', score: 0.25), Document(id=312b465e77e25c11512ee76ae699ce2eb201f34c8c51384003bb367e24fb6cf8, content: 'Berlin is the capital of Germany.', score: 0.2)]}
In a pipeline
Hybrid Retrieval
Below is an example of a hybrid retrieval pipeline that retrieves documents from an InMemoryDocumentStore
based on keyword search (using InMemoryBM25Retriever
) and embedding search (using InMemoryEmbeddingRetriever
). It then uses the DocumentJoiner
with its default join mode to concatenate the retrieved documents into one list. The Document Store must contain documents with embeddings, otherwise the InMemoryEmbeddingRetriever
will not return any documents.
from haystack.components.joiners.document_joiner import DocumentJoiner
from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.components.embedders import SentenceTransformersTextEmbedder
document_store = InMemoryDocumentStore()
p = Pipeline()
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="bm25_retriever")
p.add_component(
instance=SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2"),
name="text_embedder",
)
p.add_component(instance=InMemoryEmbeddingRetriever(document_store=document_store), name="embedding_retriever")
p.add_component(instance=DocumentJoiner(), name="joiner")
p.connect("bm25_retriever", "joiner")
p.connect("embedding_retriever", "joiner")
p.connect("text_embedder", "embedding_retriever")
query = "What is the capital of France?"
p.run(data={"bm25_retriever": {"query": query},
"text_embedder": {"text": query}})
Indexing
Here's an example of an indexing pipeline that uses DocumentJoiner
to compile all files into a single list of documents that can be fed through the rest of the indexing pipeline as one.
from haystack.components.writers import DocumentWriter
from haystack.components.converters import MarkdownToDocument, PyPDFToDocument, TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter, DocumentCleaner
from haystack.components.routers import FileTypeRouter
from haystack.components.joiners import DocumentJoiner
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack import Pipeline
from haystack.document_stores.in_memory import InMemoryDocumentStore
from pathlib import Path
document_store = InMemoryDocumentStore()
file_type_router = FileTypeRouter(mime_types=["text/plain", "application/pdf", "text/markdown"])
text_file_converter = TextFileToDocument()
markdown_converter = MarkdownToDocument()
pdf_converter = PyPDFToDocument()
document_joiner = DocumentJoiner()
document_cleaner = DocumentCleaner()
document_splitter = DocumentSplitter(split_by="word", split_length=150, split_overlap=50)
document_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
document_writer = DocumentWriter(document_store)
preprocessing_pipeline = Pipeline()
preprocessing_pipeline.add_component(instance=file_type_router, name="file_type_router")
preprocessing_pipeline.add_component(instance=text_file_converter, name="text_file_converter")
preprocessing_pipeline.add_component(instance=markdown_converter, name="markdown_converter")
preprocessing_pipeline.add_component(instance=pdf_converter, name="pypdf_converter")
preprocessing_pipeline.add_component(instance=document_joiner, name="document_joiner")
preprocessing_pipeline.add_component(instance=document_cleaner, name="document_cleaner")
preprocessing_pipeline.add_component(instance=document_splitter, name="document_splitter")
preprocessing_pipeline.add_component(instance=document_embedder, name="document_embedder")
preprocessing_pipeline.add_component(instance=document_writer, name="document_writer")
preprocessing_pipeline.connect("file_type_router.text/plain", "text_file_converter.sources")
preprocessing_pipeline.connect("file_type_router.application/pdf", "pypdf_converter.sources")
preprocessing_pipeline.connect("file_type_router.text/markdown", "markdown_converter.sources")
preprocessing_pipeline.connect("text_file_converter", "document_joiner")
preprocessing_pipeline.connect("pypdf_converter", "document_joiner")
preprocessing_pipeline.connect("markdown_converter", "document_joiner")
preprocessing_pipeline.connect("document_joiner", "document_cleaner")
preprocessing_pipeline.connect("document_cleaner", "document_splitter")
preprocessing_pipeline.connect("document_splitter", "document_embedder")
preprocessing_pipeline.connect("document_embedder", "document_writer")
preprocessing_pipeline.run({"file_type_router": {"sources": list(Path(output_dir).glob("**/*"))}})
Additional References
📓 Tutorial: Preprocessing Different File Types
Updated about 1 month ago