Other utility components for Haystack Pipelines.
Module multiplexer
Multiplexer
@component(is_greedy=True)
class Multiplexer()
Multiplexer
offers the ability to both receive data connections from multiple other
components and to distribute it to various other components, enhancing the functionality of complex data
processing pipelines.
Multiplexer
is important for spreading outputs from a single source like a Large Language Model (LLM) across
different branches of a pipeline. It is especially valuable in error correction loops by rerouting data for
reevaluation if errors are detected. For instance, in an example pipeline below, Multiplexer
helps create
a schema valid JSON object (given a person's data) with the help of an OpenAIChatGenerator
and a JsonSchemaValidator
.
In case the generated JSON object fails schema validation, JsonSchemaValidator
starts a correction loop, sending
the data back through the Multiplexer
to the OpenAIChatGenerator
until it passes schema validation. If we didn't
have Multiplexer
, we wouldn't be able to loop back the data to OpenAIChatGenerator
for re-generation, as components
accept only one input connection for the declared run method parameters.
Usage example:
import json
from typing import List
from haystack import Pipeline
from haystack.components.converters import OutputAdapter
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.components.others import Multiplexer
from haystack.components.validators import JsonSchemaValidator
from haystack.dataclasses import ChatMessage
person_schema = {
"type": "object",
"properties": {
"first_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
"last_name": {"type": "string", "pattern": "^[A-Z][a-z]+$"},
"nationality": {"type": "string", "enum": ["Italian", "Portuguese", "American"]},
},
"required": ["first_name", "last_name", "nationality"]
}
# Initialize a pipeline
pipe = Pipeline()
# Add components to the pipeline
pipe.add_component('mx', Multiplexer(List[ChatMessage]))
pipe.add_component('fc_llm', OpenAIChatGenerator(model="gpt-3.5-turbo-0125"))
pipe.add_component('validator', JsonSchemaValidator(json_schema=person_schema))
pipe.add_component('adapter', OutputAdapter("{{chat_message}}", List[ChatMessage])),
# And connect them
pipe.connect("adapter", "mx")
pipe.connect("mx", "fc_llm")
pipe.connect("fc_llm.replies", "validator.messages")
pipe.connect("validator.validation_error", "mx")
result = pipe.run(data={"fc_llm": {"generation_kwargs": {"response_format": {"type": "json_object"}}},
"adapter": {"chat_message": [ChatMessage.from_user("Create json object from Peter Parker")]}})
print(json.loads(result["validator"]["validated"][0].content))
>> {'first_name': 'Peter', 'last_name': 'Parker', 'nationality': 'American', 'name': 'Spider-Man', 'occupation':
>> 'Superhero', 'age': 23, 'location': 'New York City'}
Note that Multiplexer
is created with a single type parameter. This determines the
type of data that Multiplexer
will receive from the upstream connected components and also the
type of data that Multiplexer
will distribute to the downstream connected components. In the example
above, the Multiplexer
is created with the type List[ChatMessage]
. This means Multiplexer
will receive
a list of ChatMessage
objects from the upstream connected components and also distribute a list of ChatMessage
objects to the downstream connected components.
In the code example, Multiplexer
receives a looped back List[ChatMessage]
from the JsonSchemaValidator
and
sends it down to the OpenAIChatGenerator
for re-generation. We can have multiple loop back connections in the
pipeline. In this instance, the downstream component is only one – the OpenAIChatGenerator
– but the pipeline can have more
than one downstream component.
Multiplexer.__init__
def __init__(type_: TypeAlias)
Create a Multiplexer
component.
Arguments:
type_
: The type of data that theMultiplexer
will receive from the upstream connected components and distribute to the downstream connected components.
Multiplexer.to_dict
def to_dict()
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
Multiplexer.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Multiplexer"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
Multiplexer.run
def run(**kwargs)
Multiplexes the input data from the upstream connected components and distributes it to the downstream connected
components.
Arguments:
**kwargs
: The input data. Must be of the type declared in__init__
.
Returns:
A dictionary with the following keys:
value
: The input data.