Routers are a group of components that route queries or Documents to other components that can handle them best.
Module conditional_router
NoRouteSelectedException
Exception raised when no route is selected in ConditionalRouter.
RouteConditionException
Exception raised when there is an error parsing or evaluating the condition expression in ConditionalRouter.
ConditionalRouter
Routes data based on specific conditions.
You define these conditions in a list of dictionaries called routes.
Each dictionary in this list represents a single route. Each route has these four elements:
condition
: A Jinja2 string expression that determines if the route is selected.
output
: A Jinja2 expression defining the route's output value.
output_type
: The type of the output data (for example, str or List[int]).
output_name
: The name you want to use to publish output. This name is used to connect the router to other components in the pipeline.
Usage example
from typing import List
from haystack.components.routers import ConditionalRouter
routes = [
    {
        "condition": "{{streams|length > 2}}",
        "output": "{{streams}}",
        "output_name": "enough_streams",
        "output_type": List[int],
    },
    {
        "condition": "{{streams|length <= 2}}",
        "output": "{{streams}}",
        "output_name": "insufficient_streams",
        "output_type": List[int],
    },
]
router = ConditionalRouter(routes)
# When 'streams' has more than 2 items, 'enough_streams' output will activate, emitting the list [1, 2, 3]
kwargs = {"streams": [1, 2, 3], "query": "Haystack"}
result = router.run(**kwargs)
assert result == {"enough_streams": [1, 2, 3]}
In this example, we configure two routes. The first route sends the 'streams' value to 'enough_streams' if the stream count exceeds two. The second route directs 'streams' to 'insufficient_streams' if there are two or fewer streams.
In the pipeline setup, the Router connects to other components using the output names. For example, 'enough_streams' might connect to a component that processes streams, while 'insufficient_streams' might connect to a component that fetches more streams.
Here is a pipeline that uses ConditionalRouter and routes the fetched ByteStreams to different components depending on the number of streams fetched:
from typing import List
from haystack import Pipeline
from haystack.dataclasses import ByteStream
from haystack.components.routers import ConditionalRouter
routes = [
    {
        "condition": "{{streams|length > 2}}",
        "output": "{{streams}}",
        "output_name": "enough_streams",
        "output_type": List[ByteStream],
    },
    {
        "condition": "{{streams|length <= 2}}",
        "output": "{{streams}}",
        "output_name": "insufficient_streams",
        "output_type": List[ByteStream],
    },
]
router = ConditionalRouter(routes)
pipe = Pipeline()
pipe.add_component("router", router)
...
pipe.connect("router.enough_streams", "some_component_a.streams")
pipe.connect("router.insufficient_streams", "some_component_b.streams_or_some_other_input")
...
ConditionalRouter.__init__
def __init__(routes: List[Dict],
custom_filters: Optional[Dict[str, Callable]] = None,
unsafe: bool = False,
validate_output_type: bool = False)
Initializes the ConditionalRouter with a list of routes detailing the conditions for routing.
Arguments:
routes
: A list of dictionaries, each defining a route. Each route has these four elements:
condition
: A Jinja2 string expression that determines if the route is selected.
output
: A Jinja2 expression defining the route's output value.
output_type
: The type of the output data (for example, str or List[int]).
output_name
: The name you want to use to publish output. This name is used to connect the router to other components in the pipeline.
custom_filters
: A dictionary of custom Jinja2 filters used in the condition expressions. For example, if you pass {"my_filter": my_filter_fcn}, where my_filter is the name of the custom filter and my_filter_fcn is a callable that takes my_var: str and returns my_var[:3], you can then use {{ my_var|my_filter }} inside a route condition expression: "condition": "{{ my_var|my_filter == 'foo' }}".
unsafe
: Enables execution of arbitrary code in the Jinja template. Use this only if you trust the source of the template, as it can lead to remote code execution.
validate_output_type
: Enables validation of the routes' output. If a route's output doesn't match the declared output_type, a ValueError is raised at run time.
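Below is a minimal sketch of custom_filters and validate_output_type; the first_three filter, the route names, and the query input are illustrative, not part of the API:
from haystack.components.routers import ConditionalRouter

# Hypothetical custom filter: keep only the first three characters of a string
def first_three(my_var: str) -> str:
    return my_var[:3]

routes = [
    {
        "condition": "{{ query|first_three == 'foo' }}",
        "output": "{{ query }}",
        "output_name": "foo_query",
        "output_type": str,
    },
    {
        "condition": "{{ query|first_three != 'foo' }}",
        "output": "{{ query }}",
        "output_name": "other_query",
        "output_type": str,
    },
]

router = ConditionalRouter(
    routes,
    custom_filters={"first_three": first_three},
    validate_output_type=True,  # raise ValueError if an output doesn't match its output_type
)

result = router.run(query="foobar")
print(result)  # expected: {'foo_query': 'foobar'}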
ConditionalRouter.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
ConditionalRouter.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ConditionalRouter"
Deserializes the component from a dictionary.
Arguments:
data
: The dictionary to deserialize from.
Returns:
The deserialized component.
ConditionalRouter.run
def run(**kwargs)
Executes the routing logic.
Executes the routing logic by evaluating the specified boolean condition expressions for each route in the order they are listed. The method directs the flow of data to the output specified in the first route whose condition is True.
Arguments:
kwargs
: All variables used in the condition expressions of the routes. When the component is used in a pipeline, these variables are passed from the previous component's output.
Raises:
NoRouteSelectedException
: If no condition in the routes is True.
RouteConditionException
: If there is an error parsing or evaluating the condition expression in the routes.
ValueError
: If type validation is enabled and the route's output type doesn't match the actual value's type.
Returns:
A dictionary where the key is the output_name of the selected route and the value is the output of the selected route.
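A small sketch of the exception behavior when no route matches; the single route and the streams input mirror the earlier example:
from typing import List
from haystack.components.routers.conditional_router import ConditionalRouter, NoRouteSelectedException

routes = [
    {
        "condition": "{{streams|length > 2}}",
        "output": "{{streams}}",
        "output_name": "enough_streams",
        "output_type": List[int],
    },
]
router = ConditionalRouter(routes)

try:
    router.run(streams=[1])  # no condition evaluates to True
except NoRouteSelectedException as e:
    print(e)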
Module file_type_router
FileTypeRouter
Categorizes files or byte streams by their MIME types, helping in context-based routing.
FileTypeRouter supports both exact MIME type matching and regex patterns.
For file paths, MIME types come from extensions, while byte streams use metadata.
You can use regex patterns in the mime_types parameter to set broad categories (such as 'audio/' or 'text/') or specific types.
MIME types without regex patterns are treated as exact matches.
Usage example
from haystack.components.routers import FileTypeRouter
from pathlib import Path
# For exact MIME type matching
router = FileTypeRouter(mime_types=["text/plain", "application/pdf"])
# For flexible matching using regex, to handle all audio types
router_with_regex = FileTypeRouter(mime_types=[r"audio/.*", r"text/plain"])
sources = [Path("file.txt"), Path("document.pdf"), Path("song.mp3")]
print(router.run(sources=sources))
print(router_with_regex.run(sources=sources))
# Expected output:
# {'text/plain': [PosixPath('file.txt')], 'application/pdf': [PosixPath('document.pdf')],
#  'unclassified': [PosixPath('song.mp3')]}
# {'audio/.*': [PosixPath('song.mp3')], 'text/plain': [PosixPath('file.txt')],
#  'unclassified': [PosixPath('document.pdf')]}
FileTypeRouter.__init__
def __init__(mime_types: List[str],
additional_mimetypes: Optional[Dict[str, str]] = None)
Initialize the FileTypeRouter component.
Arguments:
mime_types
: A list of MIME types or regex patterns to classify the input files or byte streams (for example: ["text/plain", "audio/x-wav", "image/jpeg"]).
additional_mimetypes
: A dictionary of MIME types to add to the mimetypes package, to prevent unsupported or non-native file types from being routed to "unclassified" (for example: {"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx"}).
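For instance, here is a sketch of registering the .docx MIME type so such files are classified instead of landing in "unclassified"; the mapping shown mirrors the example above:
from haystack.components.routers import FileTypeRouter

docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

# Register .docx with the mimetypes package so .docx files are matched by their MIME type
router = FileTypeRouter(
    mime_types=["text/plain", docx_mime],
    additional_mimetypes={docx_mime: ".docx"},
)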
FileTypeRouter.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
FileTypeRouter.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "FileTypeRouter"
Deserializes the component from a dictionary.
Arguments:
data
: The dictionary to deserialize from.
Returns:
The deserialized component.
FileTypeRouter.run
def run(
sources: List[Union[str, Path, ByteStream]],
meta: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None
) -> Dict[str, List[Union[ByteStream, Path]]]
Categorize files or byte streams according to their MIME types.
Arguments:
sources
: A list of file paths or byte streams to categorize.
meta
: Optional metadata to attach to the sources. When provided, the sources are internally converted to ByteStream objects and the metadata is added. This value can be a list of dictionaries or a single dictionary. If it's a single dictionary, its content is added to the metadata of all ByteStream objects. If it's a list, its length must match the number of sources, as they are zipped together.
Returns:
A dictionary where the keys are MIME types (or "unclassified") and the values are lists of data sources.
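A brief sketch of the meta parameter; the file name and metadata values are illustrative, and the file is assumed to exist locally:
from pathlib import Path
from haystack.components.routers import FileTypeRouter

router = FileTypeRouter(mime_types=["text/plain"])

# A single dictionary is attached to every source; the paths are converted to ByteStream objects
result = router.run(sources=[Path("notes.txt")], meta={"project": "demo"})
# result["text/plain"] holds a ByteStream whose meta includes {"project": "demo"}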
Module metadata_router
MetadataRouter
Routes documents to different connections based on their metadata fields.
Specify the routing rules in the init method.
If a document does not match any of the rules, it's routed to a connection named "unmatched".
Usage example
from haystack import Document
from haystack.components.routers import MetadataRouter
docs = [Document(content="Paris is the capital of France.", meta={"language": "en"}),
        Document(content="Berlin ist die Haupststadt von Deutschland.", meta={"language": "de"})]
router = MetadataRouter(rules={"en": {"field": "meta.language", "operator": "==", "value": "en"}})
print(router.run(documents=docs))
# {'en': [Document(id=..., content: 'Paris is the capital of France.', meta: {'language': 'en'})],
# 'unmatched': [Document(id=..., content: 'Berlin ist die Haupststadt von Deutschland.', meta: {'language': 'de'})]}
MetadataRouter.__init__
def __init__(rules: Dict[str, Dict])
Initializes the MetadataRouter component.
Arguments:
rules
: A dictionary defining how to route documents to output connections based on their metadata. Keys are output connection names, and values are dictionaries of filtering expressions in Haystack. For example:
{
    "edge_1": {
        "operator": "AND",
        "conditions": [
            {"field": "meta.created_at", "operator": ">=", "value": "2023-01-01"},
            {"field": "meta.created_at", "operator": "<", "value": "2023-04-01"},
        ],
    },
    "edge_2": {
        "operator": "AND",
        "conditions": [
            {"field": "meta.created_at", "operator": ">=", "value": "2023-04-01"},
            {"field": "meta.created_at", "operator": "<", "value": "2023-07-01"},
        ],
    },
    "edge_3": {
        "operator": "AND",
        "conditions": [
            {"field": "meta.created_at", "operator": ">=", "value": "2023-07-01"},
            {"field": "meta.created_at", "operator": "<", "value": "2023-10-01"},
        ],
    },
    "edge_4": {
        "operator": "AND",
        "conditions": [
            {"field": "meta.created_at", "operator": ">=", "value": "2023-10-01"},
            {"field": "meta.created_at", "operator": "<", "value": "2024-01-01"},
        ],
    },
}
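With rules defined this way, initialization is straightforward; this is a sketch assuming the dictionary above is bound to a rules variable:
from haystack.components.routers import MetadataRouter

router = MetadataRouter(rules=rules)
# Documents whose meta.created_at falls in Q1 2023 are emitted on the "edge_1" output,
# Q2 2023 on "edge_2", and so on; documents matching no rule go to "unmatched".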
MetadataRouter.run
def run(documents: List[Document])
Routes the documents.
If a document does not match any of the rules, it's routed to a connection named "unmatched".
Arguments:
documents
: A list of documents to route.
Returns:
A dictionary where the keys are the names of the output connections (including "unmatched") and the values are lists of routed documents.
Module text_language_router
TextLanguageRouter
Routes text strings to different output connections based on their language.
Provide a list of languages during initialization. If the text doesn't match any of the specified languages, it's routed to an output connection named "unmatched". To route documents based on their language, use the DocumentLanguageClassifier component followed by the MetadataRouter.
Usage example
from haystack import Pipeline, Document
from haystack.components.routers import TextLanguageRouter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
document_store = InMemoryDocumentStore()
document_store.write_documents([Document(content="Elvis Presley was an American singer and actor.")])
p = Pipeline()
p.add_component(instance=TextLanguageRouter(languages=["en"]), name="text_language_router")
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="retriever")
p.connect("text_language_router.en", "retriever.query")
result = p.run({"text_language_router": {"text": "Who was Elvis Presley?"}})
assert result["retriever"]["documents"][0].content == "Elvis Presley was an American singer and actor."
result = p.run({"text_language_router": {"text": "ένα ελληνικό κείμενο"}})
assert result["text_language_router"]["unmatched"] == "ένα ελληνικό κείμενο"
TextLanguageRouter.__init__
def __init__(languages: Optional[List[str]] = None)
Initialize the TextLanguageRouter component.
Arguments:
languages
: A list of ISO language codes. See the supported languages in the langdetect documentation. If not specified, defaults to ["en"].
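A standalone sketch with two languages; langdetect results on very short strings can vary, so the inputs here are only illustrative:
from haystack.components.routers import TextLanguageRouter

# Route English and German texts to separate outputs; anything else goes to "unmatched"
router = TextLanguageRouter(languages=["en", "de"])

print(router.run(text="Berlin is worth a visit in the summer."))
# expected: {'en': 'Berlin is worth a visit in the summer.'}

print(router.run(text="Wie ist das Wetter heute in Berlin?"))
# expected: {'de': 'Wie ist das Wetter heute in Berlin?'}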
TextLanguageRouter.run
def run(text: str) -> Dict[str, str]
Routes the text strings to different output connections based on their language.
If the text doesn't match any of the specified languages, it's routed to the "unmatched" output.
Arguments:
text
: A text string to route.
Raises:
TypeError
: If the input is not a string.
Returns:
A dictionary in which the key is the language (or "unmatched") and the value is the text.
Module transformers_text_router
TransformersTextRouter
Routes the text strings to different connections based on a category label.
The labels are specific to each model and can be found in its description on Hugging Face.
Usage example
from haystack.core.pipeline import Pipeline
from haystack.components.routers import TransformersTextRouter
from haystack.components.builders import PromptBuilder
from haystack.components.generators import HuggingFaceLocalGenerator
p = Pipeline()
p.add_component(
    instance=TransformersTextRouter(model="papluca/xlm-roberta-base-language-detection"),
    name="text_router"
)
p.add_component(
    instance=PromptBuilder(template="Answer the question: {{query}}\nAnswer:"),
    name="english_prompt_builder"
)
p.add_component(
    instance=PromptBuilder(template="Beantworte die Frage: {{query}}\nAntwort:"),
    name="german_prompt_builder"
)
p.add_component(
    instance=HuggingFaceLocalGenerator(model="DiscoResearch/Llama3-DiscoLeo-Instruct-8B-v0.1"),
    name="german_llm"
)
p.add_component(
    instance=HuggingFaceLocalGenerator(model="microsoft/Phi-3-mini-4k-instruct"),
    name="english_llm"
)
p.connect("text_router.en", "english_prompt_builder.query")
p.connect("text_router.de", "german_prompt_builder.query")
p.connect("english_prompt_builder.prompt", "english_llm.prompt")
p.connect("german_prompt_builder.prompt", "german_llm.prompt")
# English Example
print(p.run({"text_router": {"text": "What is the capital of Germany?"}}))
# German Example
print(p.run({"text_router": {"text": "Was ist die Hauptstadt von Deutschland?"}}))
TransformersTextRouter.__init__
def __init__(model: str,
labels: Optional[List[str]] = None,
device: Optional[ComponentDevice] = None,
token: Optional[Secret] = Secret.from_env_var(
["HF_API_TOKEN", "HF_TOKEN"], strict=False),
huggingface_pipeline_kwargs: Optional[Dict[str, Any]] = None)
Initializes the TransformersTextRouter component.
Arguments:
model
: The name or path of a Hugging Face model for text classification.
labels
: The list of labels. If not provided, the component fetches the labels from the model configuration file hosted on the Hugging Face Hub using transformers.AutoConfig.from_pretrained.
device
: The device for loading the model. If None, automatically selects the default device. If a device or device map is specified in huggingface_pipeline_kwargs, it overrides this parameter.
token
: The API token used to download private models from Hugging Face. If True, uses either the HF_API_TOKEN or HF_TOKEN environment variable. To generate these tokens, run transformers-cli login.
huggingface_pipeline_kwargs
: A dictionary of keyword arguments for initializing the Hugging Face text classification pipeline.
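A standalone sketch outside a pipeline; warm_up() must be called before run(), and the model is downloaded from the Hugging Face Hub on first use:
from haystack.components.routers import TransformersTextRouter

router = TransformersTextRouter(model="papluca/xlm-roberta-base-language-detection")
router.warm_up()  # loads the text classification pipeline

print(router.run(text="What is the capital of Germany?"))
# expected: {'en': 'What is the capital of Germany?'}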
TransformersTextRouter.warm_up
def warm_up()
Initializes the component.
TransformersTextRouter.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
TransformersTextRouter.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TransformersTextRouter"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
TransformersTextRouter.run
def run(text: str) -> Dict[str, str]
Routes the text strings to different connections based on a category label.
Arguments:
text
: A string of text to route.
Raises:
TypeError
: If the input is not a str.
RuntimeError
: If the pipeline has not been loaded because warm_up() was not called before.
Returns:
A dictionary with the label as key and the text as value.
Module zero_shot_text_router
TransformersZeroShotTextRouter
Routes the text strings to different connections based on a category label.
Specify the set of labels for categorization when initializing the component.
Usage example
from haystack import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.core.pipeline import Pipeline
from haystack.components.routers import TransformersZeroShotTextRouter
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever
document_store = InMemoryDocumentStore()
doc_embedder = SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2")
doc_embedder.warm_up()
docs = [
    Document(
        content="Germany, officially the Federal Republic of Germany, is a country in the western region of "
        "Central Europe. The nation's capital and most populous city is Berlin and its main financial centre "
        "is Frankfurt; the largest urban area is the Ruhr."
    ),
    Document(
        content="France, officially the French Republic, is a country located primarily in Western Europe. "
        "France is a unitary semi-presidential republic with its capital in Paris, the country's largest city "
        "and main cultural and commercial centre; other major urban areas include Marseille, Lyon, Toulouse, "
        "Lille, Bordeaux, Strasbourg, Nantes and Nice."
    )
]
docs_with_embeddings = doc_embedder.run(docs)
document_store.write_documents(docs_with_embeddings["documents"])
p = Pipeline()
p.add_component(instance=TransformersZeroShotTextRouter(labels=["passage", "query"]), name="text_router")
p.add_component(
    instance=SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", prefix="passage: "),
    name="passage_embedder"
)
p.add_component(
    instance=SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", prefix="query: "),
    name="query_embedder"
)
p.add_component(
    instance=InMemoryEmbeddingRetriever(document_store=document_store),
    name="query_retriever"
)
p.add_component(
    instance=InMemoryEmbeddingRetriever(document_store=document_store),
    name="passage_retriever"
)
p.connect("text_router.passage", "passage_embedder.text")
p.connect("passage_embedder.embedding", "passage_retriever.query_embedding")
p.connect("text_router.query", "query_embedder.text")
p.connect("query_embedder.embedding", "query_retriever.query_embedding")
# Query Example
p.run({"text_router": {"text": "What is the capital of Germany?"}})
# Passage Example
p.run({
    "text_router": {
        "text": "The United Kingdom of Great Britain and Northern Ireland, commonly known as the "
        "United Kingdom (UK) or Britain, is a country in Northwestern Europe, off the north-western coast of "
        "the continental mainland."
    }
})
TransformersZeroShotTextRouter.__init__
def __init__(labels: List[str],
multi_label: bool = False,
model: str = "MoritzLaurer/deberta-v3-base-zeroshot-v1.1-all-33",
device: Optional[ComponentDevice] = None,
token: Optional[Secret] = Secret.from_env_var(
["HF_API_TOKEN", "HF_TOKEN"], strict=False),
huggingface_pipeline_kwargs: Optional[Dict[str, Any]] = None)
Initializes the TransformersZeroShotTextRouter component.
Arguments:
labels
: The set of labels to use for classification. Can be a single label, a string of comma-separated labels, or a list of labels.
multi_label
: Indicates if multiple labels can be true. If False, label scores are normalized so their sum equals 1 for each sequence. If True, the labels are considered independent and probabilities are normalized for each candidate by doing a softmax of the entailment score vs. the contradiction score.
model
: The name or path of a Hugging Face model for zero-shot text classification.
device
: The device for loading the model. If None, automatically selects the default device. If a device or device map is specified in huggingface_pipeline_kwargs, it overrides this parameter.
token
: The API token used to download private models from Hugging Face. If True, uses either the HF_API_TOKEN or HF_TOKEN environment variable. To generate these tokens, run transformers-cli login.
huggingface_pipeline_kwargs
: A dictionary of keyword arguments for initializing the Hugging Face zero-shot text classification pipeline.
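A standalone sketch; warm_up() must be called before run(), the default zero-shot model is downloaded on first use, and the text is routed to the top-scoring label:
from haystack.components.routers import TransformersZeroShotTextRouter

router = TransformersZeroShotTextRouter(labels=["query", "passage"])
router.warm_up()  # loads the zero-shot classification pipeline

print(router.run(text="What is the capital of Germany?"))
# expected: {'query': 'What is the capital of Germany?'}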
TransformersZeroShotTextRouter.warm_up
def warm_up()
Initializes the component.
TransformersZeroShotTextRouter.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
TransformersZeroShotTextRouter.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TransformersZeroShotTextRouter"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
TransformersZeroShotTextRouter.run
def run(text: str) -> Dict[str, str]
Routes the text strings to different connections based on a category label.
Arguments:
text
: A string of text to route.
Raises:
TypeError
: If the input is not a str.
RuntimeError
: If the pipeline has not been loaded because warm_up() was not called before.
Returns:
A dictionary with the label as key and the text as value.