Routers is a group of components that route queries or Documents to other components that can handle them best.
Module conditional_router
NoRouteSelectedException
Exception raised when no route is selected in ConditionalRouter.
RouteConditionException
Exception raised when there is an error parsing or evaluating the condition expression in ConditionalRouter.
ConditionalRouter
ConditionalRouter
allows data routing based on specific conditions.
This is achieved by defining a list named routes
. Each element in this list is a dictionary representing a
single route.
A route dictionary comprises four key elements:
condition
: A Jinja2 string expression that determines if the route is selected.output
: A Jinja2 expression defining the route's output value.output_type
: The type of the output data (e.g.,str
,List[int]
).output_name
: The name under which theoutput
value of the route is published. This name is used to connect the router to other components in the pipeline.
Usage example:
from typing import List
from haystack.components.routers import ConditionalRouter
routes = [
{
"condition": "{{streams|length > 2}}",
"output": "{{streams}}",
"output_name": "enough_streams",
"output_type": List[int],
},
{
"condition": "{{streams|length <= 2}}",
"output": "{{streams}}",
"output_name": "insufficient_streams",
"output_type": List[int],
},
]
router = ConditionalRouter(routes)
# When 'streams' has more than 2 items, 'enough_streams' output will activate, emitting the list [1, 2, 3]
kwargs = {"streams": [1, 2, 3], "query": "Haystack"}
result = router.run(**kwargs)
assert result == {"enough_streams": [1, 2, 3]}
In this example, we configure two routes. The first route sends the 'streams' value to 'enough_streams' if the stream count exceeds two. Conversely, the second route directs 'streams' to 'insufficient_streams' when there are two or fewer streams.
In the pipeline setup, the router is connected to other components using the output names. For example, the 'enough_streams' output might be connected to another component that processes the streams, while the 'insufficient_streams' output might be connected to a component that fetches more streams, and so on.
Here is a pseudocode example of a pipeline that uses the ConditionalRouter
and routes fetched ByteStreams
to
different components depending on the number of streams fetched:
from typing import List
from haystack import Pipeline
from haystack.dataclasses import ByteStream
from haystack.components.routers import ConditionalRouter
routes = [
{
"condition": "{{streams|length > 2}}",
"output": "{{streams}}",
"output_name": "enough_streams",
"output_type": List[ByteStream],
},
{
"condition": "{{streams|length <= 2}}",
"output": "{{streams}}",
"output_name": "insufficient_streams",
"output_type": List[ByteStream],
},
]
pipe = Pipeline()
pipe.add_component("router", router)
...
pipe.connect("router.enough_streams", "some_component_a.streams")
pipe.connect("router.insufficient_streams", "some_component_b.streams_or_some_other_input")
...
ConditionalRouter.__init__
def __init__(routes: List[Dict],
custom_filters: Optional[Dict[str, Callable]] = None)
Initializes the ConditionalRouter
with a list of routes detailing the conditions for routing.
Arguments:
routes
: A list of dictionaries, each defining a route. A route dictionary comprises four key elements:condition
: A Jinja2 string expression that determines if the route is selected.output
: A Jinja2 expression defining the route's output value.output_type
: The type of the output data (e.g., str, List[int]).output_name
: The name under which theoutput
value of the route is published. This name is used to connect the router to other components in the pipeline.custom_filters
: A dictionary of custom Jinja2 filters to be used in the condition expressions. For example, passing{"my_filter": my_filter_fcn}
where:my_filter
is the name of the custom filter.my_filter_fcn
is a callable that takesmy_var:str
and returnsmy_var[:3]
.{{ my_var|my_filter }}
can then be used inside a route condition expression like so:"condition": "{{ my_var|my_filter == 'foo' }}"
.
ConditionalRouter.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
ConditionalRouter.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ConditionalRouter"
Deserializes the component from a dictionary.
Arguments:
data
: The dictionary to deserialize from.
Returns:
The deserialized component.
ConditionalRouter.run
def run(**kwargs)
Executes the routing logic.
Executes the routing logic by evaluating the specified boolean condition expressions for each route in the
order they are listed. The method directs the flow of data to the output specified in the first route whose
condition
is True.
Arguments:
kwargs
: All variables used in thecondition
expressed in the routes. When the component is used in a pipeline, these variables are passed from the previous component's output.
Raises:
NoRouteSelectedException
: If nocondition' in the routes is
True`.RouteConditionException
: If there is an error parsing or evaluating thecondition
expression in the routes.
Returns:
A dictionary where the key is the output_name
of the selected route and the value is the output
of the selected route.
Module file_type_router
FileTypeRouter
Groups a list of data sources by their MIME types.
FileTypeRouter groups a list of data sources (file paths or byte streams) by their MIME types, allowing for flexible routing of files to different components based on their content type. It supports both exact MIME type matching and pattern matching using regular expressions.
For file paths, MIME types are inferred from their extensions, while for byte streams, MIME types are determined from the provided metadata. This enables the router to classify a diverse collection of files and data streams for specialized processing.
The router's flexibility is enhanced by the support for regex patterns in the mime_types
parameter, allowing users
to specify broad categories (e.g., 'audio/' or 'text/') or more specific types with regex patterns. This feature
is designed to be backward compatible, treating MIME types without regex patterns as exact matches.
Usage example:
from haystack.components.routers import FileTypeRouter
from pathlib import Path
# For exact MIME type matching
router = FileTypeRouter(mime_types=["text/plain", "application/pdf"])
# For flexible matching using regex, to handle all audio types
router_with_regex = FileTypeRouter(mime_types=[r"audio/.*", r"text/plain"])
sources = [Path("file.txt"), Path("document.pdf"), Path("song.mp3")]
print(router.run(sources=sources))
print(router_with_regex.run(sources=sources))
# Expected output:
# {'text/plain': [
# PosixPath('file.txt')], 'application/pdf': [PosixPath('document.pdf')], 'unclassified': [PosixPath('song.mp3')
# ]}
# {'audio/.*': [
# PosixPath('song.mp3')], 'text/plain': [PosixPath('file.txt')], 'unclassified': [PosixPath('document.pdf')
# ]}
Arguments:
mime_types
: A list of MIME types or regex patterns to classify the incoming files or data streams.
FileTypeRouter.__init__
def __init__(mime_types: List[str])
Initialize the FileTypeRouter component.
Arguments:
mime_types
: A list of file mime types to consider when routing files (e.g.["text/plain", "audio/x-wav", "image/jpeg"]
).
FileTypeRouter.run
def run(
sources: List[Union[str, Path, ByteStream]]
) -> Dict[str, List[Union[ByteStream, Path]]]
Categorizes the provided data sources by their MIME types.
Arguments:
sources
: A list of file paths or byte streams to categorize.
Returns:
A dictionary where the keys are MIME types (or "unclassified"
) and the values are lists of data
sources.
Module metadata_router
MetadataRouter
A component that routes documents to different connections based on the content of their metadata fields.
Usage example:
from haystack import Document
from haystack.components.routers import MetadataRouter
docs = [Document(content="Paris is the capital of France.", meta={"language": "en"}),
Document(content="Berlin ist die Haupststadt von Deutschland.", meta={"language": "de"})]
router = MetadataRouter(rules={"en": {"field": "meta.language", "operator": "==", "value": "en"}})
print(router.run(documents=docs))
# {'en': [Document(id=..., content: 'Paris is the capital of France.', meta: {'language': 'en'})],
# 'unmatched': [Document(id=..., content: 'Berlin ist die Haupststadt von Deutschland.', meta: {'language': 'de'})]}
MetadataRouter.__init__
def __init__(rules: Dict[str, Dict])
Initialize the MetadataRouter.
Arguments:
rules
: A dictionary of rules that specify which output connection to route a document to based on its metadata. The keys of the dictionary are the names of the output connections, and the values are dictionaries that follow the format of filtering expressions in Haystack. For example:
{
"edge_1": {
"operator": "AND",
"conditions": [
{"field": "meta.created_at", "operator": ">=", "value": "2023-01-01"},
{"field": "meta.created_at", "operator": "<", "value": "2023-04-01"},
],
},
"edge_2": {
"operator": "AND",
"conditions": [
{"field": "meta.created_at", "operator": ">=", "value": "2023-04-01"},
{"field": "meta.created_at", "operator": "<", "value": "2023-07-01"},
],
},
"edge_3": {
"operator": "AND",
"conditions": [
{"field": "meta.created_at", "operator": ">=", "value": "2023-07-01"},
{"field": "meta.created_at", "operator": "<", "value": "2023-10-01"},
],
},
"edge_4": {
"operator": "AND",
"conditions": [
{"field": "meta.created_at", "operator": ">=", "value": "2023-10-01"},
{"field": "meta.created_at", "operator": "<", "value": "2024-01-01"},
],
},
}
MetadataRouter.run
def run(documents: List[Document])
Route the documents.
Route the documents to different edges based on their fields content and the rules specified during initialization. If a document does not match any of the rules, it is routed to a connection named "unmatched".
Arguments:
documents
: A list of documents to route to different edges.
Returns:
A dictionary where the keys are the names of the output connections (including "unmatched"
)
and the values are lists of routed documents.
Module text_language_router
TextLanguageRouter
Routes a text input onto one of different output connections depending on its language.
The set of supported languages can be specified.
For routing Documents based on their language use the DocumentLanguageClassifier
component to first
classify the documents and then the MetaDataRouter
to route them.
Usage example in a retrieval pipeline that passes only English language queries to the retriever:
from haystack import Pipeline, Document
from haystack.components.routers import TextLanguageRouter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
document_store = InMemoryDocumentStore()
document_store.write_documents([Document(content="Elvis Presley was an American singer and actor.")])
p = Pipeline()
p.add_component(instance=TextLanguageRouter(languages=["en"]), name="text_language_router")
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="retriever")
p.connect("text_language_router.en", "retriever.query")
result = p.run({"text_language_router": {"text": "Who was Elvis Presley?"}})
assert result["retriever"]["documents"][0].content == "Elvis Presley was an American singer and actor."
result = p.run({"text_language_router": {"text": "ένα ελληνικό κείμενο"}})
assert result["text_language_router"]["unmatched"] == "ένα ελληνικό κείμενο"
TextLanguageRouter.__init__
def __init__(languages: Optional[List[str]] = None)
Initialize the TextLanguageRouter component.
Arguments:
languages
: A list of languages in ISO code, each corresponding to a different output connection. For supported languages, see thelangdetect
documentation. If not specified, the default is["en"]
.
TextLanguageRouter.run
def run(text: str) -> Dict[str, str]
Route the text to one of different output connections based on its language.
If the text does not match any of the languages specified at initialization, it is routed to a connection named "unmatched".
Arguments:
text
: A string to route to different edges based on its language.
Raises:
TypeError
: If the input is not a string.
Returns:
A dictionary of length one in which the key is the language (or "unmatched"
)
and the value is the text.
Module transformers_text_router
TransformersTextRouter
Routes a text input onto different output connections depending on which label it has been categorized into.
This is useful for routing queries to different models in a pipeline depending on their categorization.
The set of labels to be used for categorization are provided by the selected model.
Example usage in a query pipeline that routes english queries to a text generator optimized for english text and
german queries to a text generator optimized for german text.
```python
from haystack.core.pipeline import Pipeline
from haystack.components.routers import TransformersTextRouter
from haystack.components.builders import PromptBuilder
from haystack.components.generators import HuggingFaceLocalGenerator
p = Pipeline()
p.add_component(
instance=TransformersTextRouter(model="papluca/xlm-roberta-base-language-detection"),
name="text_router"
)
p.add_component(
instance=PromptBuilder(template="Answer the question: {{query}}
Answer:"), name="english_prompt_builder" ) p.add_component( instance=PromptBuilder(template="Beantworte die Frage: {{query}} Antwort:"), name="german_prompt_builder" )
p.add_component(
instance=HuggingFaceLocalGenerator(model="DiscoResearch/Llama3-DiscoLeo-Instruct-8B-v0.1"),
name="german_llm"
)
p.add_component(
instance=HuggingFaceLocalGenerator(model="microsoft/Phi-3-mini-4k-instruct"),
name="english_llm"
)
p.connect("text_router.en", "english_prompt_builder.query")
p.connect("text_router.de", "german_prompt_builder.query")
p.connect("english_prompt_builder.prompt", "english_llm.prompt")
p.connect("german_prompt_builder.prompt", "german_llm.prompt")
# English Example
print(p.run({"text_router": {"text": "What is the capital of Germany?"}}))
# German Example
print(p.run({"text_router": {"text": "Was ist die Hauptstadt von Deutschland?"}}))
```
TransformersTextRouter.__init__
def __init__(model: str,
labels: Optional[List[str]] = None,
device: Optional[ComponentDevice] = None,
token: Optional[Secret] = Secret.from_env_var(
["HF_API_TOKEN", "HF_TOKEN"], strict=False),
huggingface_pipeline_kwargs: Optional[Dict[str, Any]] = None)
Initializes the TransformersTextRouter.
Arguments:
model
: The name or path of a Hugging Face model for text classification.labels
: The list of labels that the model has been trained to predict. If not provided, the labels are fetched from the model configuration file hosted on the HuggingFace Hub usingtransformers.AutoConfig.from_pretrained
.device
: The device on which the model is loaded. IfNone
, the default device is automatically selected. If a device/device map is specified inhuggingface_pipeline_kwargs
, it overrides this parameter.token
: The API token used to download private models from Hugging Face. Iftoken
is set toTrue
, the token generated when runningtransformers-cli login
(stored in ~/.huggingface) is used.huggingface_pipeline_kwargs
: Dictionary containing keyword arguments used to initialize the Hugging Face pipeline for text classification.
TransformersTextRouter.warm_up
def warm_up()
Initializes the component.
TransformersTextRouter.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
TransformersTextRouter.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TransformersTextRouter"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
TransformersTextRouter.run
@component.output_types(documents=Dict[str, str])
def run(text: str)
Run the TransformersTextRouter.
This method routes the text to one of the different edges based on which label it has been categorized into.
Arguments:
text
: A str to route to one of the different edges.
Raises:
TypeError
: If the input is not a str.RuntimeError
: If the pipeline has not been loaded because warm_up() was not called before.
Returns:
A dictionary with the label as key and the text as value.
Module zero_shot_text_router
TransformersZeroShotTextRouter
Routes a text input onto different output connections depending on which label it has been categorized into.
This is useful for routing queries to different models in a pipeline depending on their categorization. The set of labels to be used for categorization can be specified.
Example usage in a retrieval pipeline that passes question-like queries to a text embedder optimized for query-passage retrieval and passage-like queries to a text embedder optimized for passage-passage retrieval.
from haystack import Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.core.pipeline import Pipeline
from haystack.components.routers import TransformersZeroShotTextRouter
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.retrievers import InMemoryEmbeddingRetriever
document_store = InMemoryDocumentStore()
doc_embedder = SentenceTransformersDocumentEmbedder(model="intfloat/e5-base-v2")
doc_embedder.warm_up()
docs = [
Document(
content="Germany, officially the Federal Republic of Germany, is a country in the western region of "
"Central Europe. The nation's capital and most populous city is Berlin and its main financial centre "
"is Frankfurt; the largest urban area is the Ruhr."
),
Document(
content="France, officially the French Republic, is a country located primarily in Western Europe. "
"France is a unitary semi-presidential republic with its capital in Paris, the country's largest city "
"and main cultural and commercial centre; other major urban areas include Marseille, Lyon, Toulouse, "
"Lille, Bordeaux, Strasbourg, Nantes and Nice."
)
]
docs_with_embeddings = doc_embedder.run(docs)
document_store.write_documents(docs_with_embeddings["documents"])
p = Pipeline()
p.add_component(instance=TransformersZeroShotTextRouter(labels=["passage", "query"]), name="text_router")
p.add_component(
instance=SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", prefix="passage: "),
name="passage_embedder"
)
p.add_component(
instance=SentenceTransformersTextEmbedder(model="intfloat/e5-base-v2", prefix="query: "),
name="query_embedder"
)
p.add_component(
instance=InMemoryEmbeddingRetriever(document_store=document_store),
name="query_retriever"
)
p.add_component(
instance=InMemoryEmbeddingRetriever(document_store=document_store),
name="passage_retriever"
)
p.connect("text_router.passage", "passage_embedder.text")
p.connect("passage_embedder.embedding", "passage_retriever.query_embedding")
p.connect("text_router.query", "query_embedder.text")
p.connect("query_embedder.embedding", "query_retriever.query_embedding")
# Query Example
p.run({"text_router": {"text": "What is the capital of Germany?"}})
# Passage Example
p.run({
"text_router":{
"text": "The United Kingdom of Great Britain and Northern Ireland, commonly known as the " "United Kingdom (UK) or Britain, is a country in Northwestern Europe, off the north-western coast of " "the continental mainland."
}
})
TransformersZeroShotTextRouter.__init__
def __init__(labels: List[str],
multi_label: bool = False,
model: str = "MoritzLaurer/deberta-v3-base-zeroshot-v1.1-all-33",
device: Optional[ComponentDevice] = None,
token: Optional[Secret] = Secret.from_env_var(
["HF_API_TOKEN", "HF_TOKEN"], strict=False),
huggingface_pipeline_kwargs: Optional[Dict[str, Any]] = None)
Initializes the TransformersZeroShotTextRouter.
Arguments:
labels
: The set of possible class labels to classify each sequence into. Can be a single label, a string of comma-separated labels, or a list of labels.multi_label
: Whether or not multiple candidate labels can be true. If False, the scores are normalized such that the sum of the label likelihoods for each sequence is 1. If True, the labels are considered independent and probabilities are normalized for each candidate by doing a softmax of the entailment score vs. the contradiction score.model
: The name or path of a Hugging Face model for zero-shot text classification.device
: The device on which the model is loaded. IfNone
, the default device is automatically selected. If a device/device map is specified inhuggingface_pipeline_kwargs
, it overrides this parameter.token
: The API token used to download private models from Hugging Face. Iftoken
is set toTrue
, the token generated when runningtransformers-cli login
(stored in ~/.huggingface) is used.huggingface_pipeline_kwargs
: Dictionary containing keyword arguments used to initialize the Hugging Face pipeline for zero shot text classification.
TransformersZeroShotTextRouter.warm_up
def warm_up()
Initializes the component.
TransformersZeroShotTextRouter.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
TransformersZeroShotTextRouter.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TransformersZeroShotTextRouter"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
TransformersZeroShotTextRouter.run
@component.output_types(documents=Dict[str, str])
def run(text: str)
Run the TransformersZeroShotTextRouter.
This method routes the text to one of the different edges based on which label it has been categorized into.
Arguments:
text
: A str to route to one of the different edges.
Raises:
TypeError
: If the input is not a str.RuntimeError
: If the pipeline has not been loaded because warm_up() was not called before.
Returns:
A dictionary with the label as key and the text as value.