
Routers are a group of components that route queries or Documents to the components best suited to handle them.

Module conditional_router

NoRouteSelectedException

class NoRouteSelectedException(Exception)

Exception raised when no route is selected in ConditionalRouter.

RouteConditionException

class RouteConditionException(Exception)

Exception raised when there is an error parsing or evaluating the condition expression in ConditionalRouter.

ConditionalRouter

@component
class ConditionalRouter()

ConditionalRouter allows data routing based on specific conditions.

This is achieved by defining a list named routes. Each element in this list is a dictionary representing a single route. A route dictionary comprises four key elements:

  • condition: A Jinja2 string expression that determines if the route is selected.
  • output: A Jinja2 expression defining the route's output value.
  • output_type: The type of the output data (e.g., str, List[int]).
  • output_name: The name under which the output value of the route is published. This name is used to connect the router to other components in the pipeline.

Usage example:

from typing import List
from haystack.components.routers import ConditionalRouter

routes = [
    {
        "condition": "{{streams|length > 2}}",
        "output": "{{streams}}",
        "output_name": "enough_streams",
        "output_type": List[int],
    },
    {
        "condition": "{{streams|length <= 2}}",
        "output": "{{streams}}",
        "output_name": "insufficient_streams",
        "output_type": List[int],
    },
]
router = ConditionalRouter(routes)
# When 'streams' has more than 2 items, 'enough_streams' output will activate, emitting the list [1, 2, 3]
kwargs = {"streams": [1, 2, 3], "query": "Haystack"}
result = router.run(**kwargs)
assert result == {"enough_streams": [1, 2, 3]}

In this example, we configure two routes. The first route sends the 'streams' value to 'enough_streams' if the stream count exceeds two. Conversely, the second route directs 'streams' to 'insufficient_streams' when there are two or fewer streams.

In the pipeline setup, the router is connected to other components using the output names. For example, the 'enough_streams' output might be connected to another component that processes the streams, while the 'insufficient_streams' output might be connected to a component that fetches more streams, and so on.

Here is a pseudocode example of a pipeline that uses the ConditionalRouter and routes fetched ByteStreams to different components depending on the number of streams fetched:

from typing import List
from haystack import Pipeline
from haystack.dataclasses import ByteStream
from haystack.components.routers import ConditionalRouter

routes = [
    {
        "condition": "{{streams|length > 2}}",
        "output": "{{streams}}",
        "output_name": "enough_streams",
        "output_type": List[ByteStream],
    },
    {
        "condition": "{{streams|length <= 2}}",
        "output": "{{streams}}",
        "output_name": "insufficient_streams",
        "output_type": List[ByteStream],
    },
]

# Build the router from the routes defined above
router = ConditionalRouter(routes)

pipe = Pipeline()
pipe.add_component("router", router)
...
pipe.connect("router.enough_streams", "some_component_a.streams")
pipe.connect("router.insufficient_streams", "some_component_b.streams_or_some_other_input")
...

ConditionalRouter.__init__

def __init__(routes: List[Dict])

Initializes the ConditionalRouter with a list of routes detailing the conditions for routing.

Arguments:

  • routes: A list of dictionaries, each defining a route. A route dictionary comprises four key elements:
      • condition: A Jinja2 string expression that determines if the route is selected.
      • output: A Jinja2 expression defining the route's output value.
      • output_type: The type of the output data (e.g., str, List[int]).
      • output_name: The name under which the output value of the route is published. This name is used to connect the router to other components in the pipeline.

ConditionalRouter.to_dict

def to_dict() -> Dict[str, Any]

Serializes the component to a dictionary.

Returns:

Dictionary with serialized data.

ConditionalRouter.from_dict

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "ConditionalRouter"

Deserializes the component from a dictionary.

Arguments:

  • data: The dictionary to deserialize from.

Returns:

The deserialized component.

ConditionalRouter.run

def run(**kwargs)

Executes the routing logic by evaluating the specified boolean condition expressions for each route in the order they are listed.

The method directs the flow of data to the output specified in the first route whose condition is True.

Arguments:

  • kwargs: All variables used in the condition expressions of the routes. When the component is used in a pipeline, these variables are passed from the previous component's output.

Raises:

  • NoRouteSelectedException: If no condition in the routes is True.
  • RouteConditionException: If there is an error parsing or evaluating the condition expression in the routes.

Returns:

A dictionary where the key is the output_name of the selected route and the value is the output of the selected route.
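
The first-match behaviour described for run can be illustrated with a small standalone sketch. It is not the ConditionalRouter implementation: the conditions and outputs are plain Python callables instead of Jinja2 expressions, and select_route and NoRouteSelected are hypothetical names introduced only for this illustration.

```python
from typing import Any, Callable, Dict, List


class NoRouteSelected(Exception):
    """Hypothetical stand-in for NoRouteSelectedException."""


def select_route(routes: List[Dict[str, Any]], **kwargs: Any) -> Dict[str, Any]:
    # Check the routes in the order they are listed; the first match wins.
    for route in routes:
        condition: Callable[..., bool] = route["condition"]
        if condition(**kwargs):
            output: Callable[..., Any] = route["output"]
            # One-entry dictionary: output_name -> output value
            return {route["output_name"]: output(**kwargs)}
    raise NoRouteSelected(f"No route matched for inputs: {sorted(kwargs)}")


# Assumption: callables replace the Jinja2 strings used by the real component.
routes = [
    {
        "condition": lambda streams, **_: len(streams) > 2,
        "output": lambda streams, **_: streams,
        "output_name": "enough_streams",
    },
    {
        "condition": lambda streams, **_: len(streams) <= 2,
        "output": lambda streams, **_: streams,
        "output_name": "insufficient_streams",
    },
]

result = select_route(routes, streams=[1, 2, 3], query="Haystack")
print(result)  # {'enough_streams': [1, 2, 3]}
```

Because evaluation stops at the first match, the order of the routes matters whenever two conditions can be true for the same input.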

Module file_type_router

FileTypeRouter

@component
class FileTypeRouter()

FileTypeRouter takes a list of data sources (file paths or byte streams) and groups them by their corresponding MIME types.

For file paths, MIME types are inferred from their extensions, while for byte streams, MIME types are determined from the provided metadata.

The set of MIME types to consider is specified during the initialization of the component.

This component is useful when you need to classify a large collection of files or data streams according to their MIME types and route them to different components for further processing.

Usage example:

from haystack.components.routers import FileTypeRouter

router = FileTypeRouter(mime_types=["text/plain"])

print(router.run(sources=["text_file.txt", "pdf_file.pdf"]))

# defaultdict(<class 'list'>, {'text/plain': [PosixPath('text_file.txt')],
#                              'unclassified': [PosixPath('pdf_file.pdf')]})

FileTypeRouter.__init__

def __init__(mime_types: List[str])

Arguments:

  • mime_types: A list of MIME types to consider when routing files (e.g. ["text/plain", "audio/x-wav", "image/jpeg"]).

FileTypeRouter.run

def run(
    sources: List[Union[str, Path, ByteStream]]
) -> Dict[str, List[Union[ByteStream, Path]]]

Categorizes the provided data sources by their MIME types.

Arguments:

  • sources: A list of file paths or byte streams to categorize.

Returns:

A dictionary where the keys are MIME types (or "unclassified") and the values are lists of data sources.
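
As a rough illustration of grouping by MIME type, the sketch below infers types from file extensions with Python's standard mimetypes module. It handles file paths only and is not the FileTypeRouter implementation; group_by_mime_type is a hypothetical helper, and the way the real component resolves MIME types may differ.

```python
import mimetypes
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Union


def group_by_mime_type(
    sources: List[Union[str, Path]], mime_types: List[str]
) -> Dict[str, List[Path]]:
    groups: Dict[str, List[Path]] = defaultdict(list)
    for source in sources:
        path = Path(source)
        # guess_type infers the MIME type from the file extension only
        mime_type, _ = mimetypes.guess_type(path.as_posix())
        # Anything outside the configured list goes to "unclassified"
        key = mime_type if mime_type in mime_types else "unclassified"
        groups[key].append(path)
    return dict(groups)


groups = group_by_mime_type(["text_file.txt", "pdf_file.pdf"], ["text/plain"])
print(sorted(groups))  # ['text/plain', 'unclassified']
```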

Module metadata_router

MetadataRouter

@component
class MetadataRouter()

A component that routes documents to different connections based on the content of their metadata fields.

Usage example:

from haystack import Document
from haystack.components.routers import MetadataRouter

docs = [Document(content="Paris is the capital of France.", meta={"language": "en"}),
        Document(content="Berlin ist die Hauptstadt von Deutschland.", meta={"language": "de"})]

router = MetadataRouter(rules={"en": {"field": "meta.language", "operator": "==", "value": "en"}})

print(router.run(documents=docs))

# {'en': [Document(id=..., content: 'Paris is the capital of France.', meta: {'language': 'en'})],
#  'unmatched': [Document(id=..., content: 'Berlin ist die Hauptstadt von Deutschland.', meta: {'language': 'de'})]}

MetadataRouter.__init__

def __init__(rules: Dict[str, Dict])

Initialize the MetadataRouter.

Arguments:

  • rules: A dictionary of rules that specify which output connection to route a document to based on its metadata. The keys of the dictionary are the names of the output connections, and the values are dictionaries that follow the format of filtering expressions in Haystack. For example:
{
"edge_1": {
    "operator": "AND",
    "conditions": [
        {"field": "meta.created_at", "operator": ">=", "value": "2023-01-01"},
        {"field": "meta.created_at", "operator": "<", "value": "2023-04-01"},
    ],
},
"edge_2": {
    "operator": "AND",
    "conditions": [
        {"field": "meta.created_at", "operator": ">=", "value": "2023-04-01"},
        {"field": "meta.created_at", "operator": "<", "value": "2023-07-01"},
    ],
},
"edge_3": {
    "operator": "AND",
    "conditions": [
        {"field": "meta.created_at", "operator": ">=", "value": "2023-07-01"},
        {"field": "meta.created_at", "operator": "<", "value": "2023-10-01"},
    ],
},
"edge_4": {
    "operator": "AND",
    "conditions": [
        {"field": "meta.created_at", "operator": ">=", "value": "2023-10-01"},
        {"field": "meta.created_at", "operator": "<", "value": "2024-01-01"},
    ],
},
}

MetadataRouter.run

def run(documents: List[Document])

Routes the documents to different edges based on the content of their metadata fields and the rules specified during initialization.

If a document does not match any of the rules, it is routed to a connection named "unmatched".

Arguments:

  • documents: A list of documents to route to different edges.

Returns:

A dictionary where the keys are the names of the output connections (including "unmatched") and the values are lists of routed documents.
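
To make the rule format more concrete, here is a minimal sketch of how such rules could be evaluated against document metadata. It is an assumption-laden simplification, not Haystack's filter engine: documents are modelled as plain dictionaries, dates are compared as ISO-formatted strings, only a few operators are covered, and matches and route_by_metadata are hypothetical helpers.

```python
import operator
from collections import defaultdict
from typing import Any, Dict, List

# Comparison operators supported by this sketch (a subset, by assumption)
COMPARATORS = {
    "==": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def matches(rule: Dict[str, Any], meta: Dict[str, Any]) -> bool:
    if "conditions" in rule:
        # Logical rule: combine the nested conditions with AND / OR
        results = [matches(cond, meta) for cond in rule["conditions"]]
        if rule["operator"] == "AND":
            return all(results)
        if rule["operator"] == "OR":
            return any(results)
        raise ValueError(f"Unsupported logical operator: {rule['operator']}")
    # Comparison rule: strip the "meta." prefix and compare the field value
    field = rule["field"]
    if field.startswith("meta."):
        field = field[len("meta."):]
    if field not in meta:
        return False
    return COMPARATORS[rule["operator"]](meta[field], rule["value"])


def route_by_metadata(
    documents: List[Dict[str, Any]], rules: Dict[str, Dict[str, Any]]
) -> Dict[str, List[Dict[str, Any]]]:
    routed: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for doc in documents:
        matched = False
        for name, rule in rules.items():
            if matches(rule, doc["meta"]):
                routed[name].append(doc)
                matched = True
        if not matched:
            routed["unmatched"].append(doc)
    return dict(routed)


rules = {
    "edge_1": {
        "operator": "AND",
        "conditions": [
            {"field": "meta.created_at", "operator": ">=", "value": "2023-01-01"},
            {"field": "meta.created_at", "operator": "<", "value": "2023-04-01"},
        ],
    },
    "edge_2": {
        "operator": "AND",
        "conditions": [
            {"field": "meta.created_at", "operator": ">=", "value": "2023-04-01"},
            {"field": "meta.created_at", "operator": "<", "value": "2023-07-01"},
        ],
    },
}
documents = [
    {"id": "a", "meta": {"created_at": "2023-02-15"}},
    {"id": "b", "meta": {"created_at": "2023-05-01"}},
    {"id": "c", "meta": {"created_at": "2024-06-01"}},
]

routed = route_by_metadata(documents, rules)
print({name: [d["id"] for d in group] for name, group in routed.items()})
# {'edge_1': ['a'], 'edge_2': ['b'], 'unmatched': ['c']}
```

In this sketch a document that satisfies several rules is appended to every matching edge; whether that mirrors the real component in all cases is an assumption.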

Module text_language_router

TextLanguageRouter

@component
class TextLanguageRouter()

Routes a text input to one of several output connections depending on its language.

The set of supported languages can be specified. To route Documents based on their language, first classify them with the DocumentLanguageClassifier component and then route them with the MetadataRouter.

Usage example in a retrieval pipeline that passes only English language queries to the retriever:

from haystack import Pipeline, Document
from haystack.components.routers import TextLanguageRouter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever

document_store = InMemoryDocumentStore()
document_store.write_documents([Document(content="Elvis Presley was an American singer and actor.")])

p = Pipeline()
p.add_component(instance=TextLanguageRouter(languages=["en"]), name="text_language_router")
p.add_component(instance=InMemoryBM25Retriever(document_store=document_store), name="retriever")
p.connect("text_language_router.en", "retriever.query")

result = p.run({"text_language_router": {"text": "Who was Elvis Presley?"}})
assert result["retriever"]["documents"][0].content == "Elvis Presley was an American singer and actor."

result = p.run({"text_language_router": {"text": "ένα ελληνικό κείμενο"}})
assert result["text_language_router"]["unmatched"] == "ένα ελληνικό κείμενο"

TextLanguageRouter.__init__

def __init__(languages: Optional[List[str]] = None)

Arguments:

  • languages: A list of ISO language codes, each corresponding to a different output connection. For supported languages, see the langdetect documentation. If not specified, the default is ["en"].

TextLanguageRouter.run

def run(text: str) -> Dict[str, str]

Routes the text to one of several output connections based on its language.

If the text does not match any of the languages specified at initialization, it is routed to a connection named "unmatched".

Arguments:

  • text: A string to route to different edges based on its language.

Raises:

  • TypeError: If the input is not a string.

Returns:

A dictionary of length one in which the key is the language (or "unmatched") and the value is the text.
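
The routing contract described above (type check, single-entry result, "unmatched" fallback) can be sketched without any language-detection dependency. The detector below is a deliberately crude, hypothetical stand-in that only distinguishes Greek script from everything else; the real component relies on langdetect, and route_text and toy_detect are names invented for this illustration.

```python
import unicodedata
from typing import Callable, Dict, List, Optional


def toy_detect(text: str) -> str:
    """Hypothetical detector: 'el' if any Greek letter appears, else 'en'."""
    for char in text:
        if char.isalpha() and unicodedata.name(char, "").startswith("GREEK"):
            return "el"
    return "en"


def route_text(
    text: str,
    languages: Optional[List[str]] = None,
    detect: Callable[[str], str] = toy_detect,
) -> Dict[str, str]:
    if not isinstance(text, str):
        raise TypeError("route_text expects a string as input.")
    languages = languages or ["en"]  # same default as documented above
    detected = detect(text)
    # Exactly one key: the detected language, or "unmatched"
    key = detected if detected in languages else "unmatched"
    return {key: text}


print(route_text("Who was Elvis Presley?", ["en"]))
# {'en': 'Who was Elvis Presley?'}
print(route_text("ένα ελληνικό κείμενο", ["en"]))
# {'unmatched': 'ένα ελληνικό κείμενο'}
```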