API Reference

Arranges components and integrations in flow.

Module pipeline

Pipeline

class Pipeline()

Components orchestration engine.

Builds a graph of components and orchestrates their execution according to the execution graph.

Pipeline.__init__

def __init__(metadata: Optional[Dict[str, Any]] = None,
             max_loops_allowed: int = 100,
             debug_path: Union[Path, str] = Path(".haystack_debug/"))

Creates the Pipeline.

Arguments:

  • metadata: Arbitrary dictionary to store metadata about this pipeline. Make sure all the values contained in this dictionary can be serialized and deserialized if you wish to save this pipeline to file with save_pipelines()/load_pipelines().
  • max_loops_allowed: How many times the pipeline can run the same node before throwing an exception.
  • debug_path: When debug is enabled in run(), where to save the debug data.
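The max_loops_allowed limit can be pictured as a per-node visit counter. The following is a minimal sketch of that idea, not the pipeline's actual implementation: run_with_loop_guard and visit_order are hypothetical names, and a plain RuntimeError stands in for whatever exception the pipeline raises.

```python
from collections import Counter
from typing import Iterable, List


def run_with_loop_guard(visit_order: Iterable[str], max_loops_allowed: int = 100) -> List[str]:
    """Replay a sequence of node visits, failing once a node exceeds the limit.

    Hypothetical helper: it only counts visits, it does not execute components.
    """
    visits: Counter = Counter()
    executed: List[str] = []
    for node in visit_order:
        visits[node] += 1
        if visits[node] > max_loops_allowed:
            # Stand-in exception; the real pipeline may raise a different type.
            raise RuntimeError(
                f"Node '{node}' ran more than {max_loops_allowed} times; possible infinite loop."
            )
        executed.append(node)
    return executed


# Two visits to "a" are allowed when the limit is 2.
allowed = run_with_loop_guard(["a", "b", "a"], max_loops_allowed=2)
```

A third visit to the same node with max_loops_allowed=2 would raise in this sketch.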

Pipeline.__eq__

def __eq__(other) -> bool

Two pipelines are equal if they share the same metadata, nodes, and edges. They are not required to use the same node instances, so a pipeline that is saved and then loaded back is equal to the original.
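As an illustration of equality by structure rather than by instance, here is a toy sketch. TinyPipeline, Greeter, and build are hypothetical stand-ins written for this example; they assume that comparing serialized descriptions is a reasonable way to model the behavior, which this reference does not confirm.

```python
from typing import Any, Dict, List, Optional, Tuple


class Greeter:
    """Hypothetical component whose state is just its constructor argument."""

    def __init__(self, greeting: str = "Hello") -> None:
        self.greeting = greeting


class TinyPipeline:
    """Toy stand-in for a pipeline: metadata, named nodes, and edges."""

    def __init__(self, metadata: Optional[Dict[str, Any]] = None) -> None:
        self.metadata = metadata or {}
        self.nodes: Dict[str, Greeter] = {}
        self.edges: List[Tuple[str, str]] = []

    def to_dict(self) -> Dict[str, Any]:
        # Describe each node by type and parameters, never by object identity.
        return {
            "metadata": dict(self.metadata),
            "nodes": {
                name: {"type": type(node).__name__, "params": dict(vars(node))}
                for name, node in self.nodes.items()
            },
            "edges": sorted(self.edges),
        }

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TinyPipeline):
            return NotImplemented
        return self.to_dict() == other.to_dict()


def build(greeting: str) -> TinyPipeline:
    """Build a two-node pipeline with freshly created Greeter instances."""
    pipeline = TinyPipeline(metadata={"owner": "docs"})
    pipeline.nodes["first"] = Greeter(greeting)
    pipeline.nodes["second"] = Greeter(greeting)
    pipeline.edges.append(("first", "second"))
    return pipeline


original = build("Hello")
rebuilt = build("Hello")      # same structure, brand-new Greeter instances
different = build("Goodbye")  # same structure, different node parameters
```

In this sketch original == rebuilt holds even though no node instance is shared, while original != different.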

Pipeline.__repr__

def __repr__() -> str

Returns a text representation of the Pipeline. If this runs in a Jupyter notebook, it will instead display the Pipeline image.

Pipeline.to_dict

def to_dict() -> Dict[str, Any]

Serializes the pipeline to a dictionary.

This is meant to be an intermediate representation, but it can also be used to save a pipeline to file.

Returns:

Dictionary with serialized data.

Pipeline.from_dict

@classmethod
def from_dict(cls: Type[T], data: Dict[str, Any], **kwargs) -> T

Deserializes the pipeline from a dictionary.

Arguments:

  • data: Dictionary to deserialize from.
  • kwargs: Optional keyword arguments. Pass components, a dictionary of {name: instance}, to reuse existing component instances instead of creating new ones.

Returns:

The deserialized pipeline.

Pipeline.dumps

def dumps(marshaller: Marshaller = DEFAULT_MARSHALLER) -> str

Returns the string representation of this pipeline according to the format dictated by the Marshaller in use.

Arguments:

  • marshaller: The Marshaller used to create the string representation. Defaults to YamlMarshaller.

Returns:

A string representing the pipeline.
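To illustrate the marshaller parameter, here is a sketch of a JSON-based alternative to the default. It assumes a marshaller only needs a marshal method (dictionary to string) and an unmarshal method (string or bytes to dictionary). Those method names, the JsonMarshaller class, and the keys of the sample dictionary are assumptions made for this example and are not taken from this reference.

```python
import json
from typing import Any, Dict, Union


class JsonMarshaller:
    """Hypothetical marshaller that emits JSON instead of YAML."""

    def marshal(self, dict_: Dict[str, Any]) -> str:
        # sort_keys keeps the output stable across runs.
        return json.dumps(dict_, indent=2, sort_keys=True)

    def unmarshal(self, data_: Union[str, bytes, bytearray]) -> Dict[str, Any]:
        # json.loads accepts str, bytes, and bytearray.
        return json.loads(data_)


# Illustrative pipeline dictionary; the key names are placeholders.
pipeline_dict = {"metadata": {"owner": "docs"}, "components": {}, "connections": []}

marshaller = JsonMarshaller()
serialized = marshaller.marshal(pipeline_dict)
restored = marshaller.unmarshal(serialized)
restored_from_bytes = marshaller.unmarshal(serialized.encode("utf-8"))
```

If the assumption about the protocol holds, an instance could be passed as the marshaller argument of dumps(), dump(), loads(), and load().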

Pipeline.dump

def dump(fp: TextIO, marshaller: Marshaller = DEFAULT_MARSHALLER)

Writes the string representation of this pipeline to the file-like object passed in the fp argument.

Arguments:

  • fp: A file-like object ready to be written to.
  • marshaller: The Marshaller used to create the string representation. Defaults to YamlMarshaller.

Pipeline.loads

@classmethod
def loads(cls,
          data: Union[str, bytes, bytearray],
          marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline"

Creates a Pipeline object from the string representation passed in the data argument.

Arguments:

  • data: The string representation of the pipeline. Can be str, bytes, or bytearray.
  • marshaller: The Marshaller used to parse the string representation. Defaults to YamlMarshaller.

Returns:

A Pipeline object.

Pipeline.load

@classmethod
def load(cls,
         fp: TextIO,
         marshaller: Marshaller = DEFAULT_MARSHALLER) -> "Pipeline"

Creates a Pipeline object from the string representation read from the file-like object passed in the fp argument.

Arguments:

  • fp: A file-like object ready to be read from.
  • marshaller: The Marshaller used to parse the string representation. Defaults to YamlMarshaller.

Returns:

A Pipeline object.

Pipeline.add_component

def add_component(name: str, instance: Component) -> None

Add the given component to the pipeline.

Components are not connected to anything by default: use Pipeline.connect() to connect components together. Component names must be unique, but component instances can be reused if needed.

Arguments:

  • name: The name of the component to add.
  • instance: The component instance to add.

Raises:

  • ValueError: If a component with the same name already exists.
  • PipelineValidationError: If the given instance is not a Canals component.

Pipeline.connect

def connect(sender: str, receiver: str) -> "Pipeline"

Connects two components together.

All components to connect must exist in the pipeline. If a component has several input or output connections, specify the connection name as 'component_name.connection_name'.

Arguments:

  • sender: The component that delivers the value. This can be either just a component name or can be in the format component_name.connection_name if the component has multiple outputs.
  • receiver: The component that receives the value. This can be either just a component name or can be in the format component_name.connection_name if the component has multiple inputs.

Raises:

  • PipelineConnectError: If the two components cannot be connected (for example if one of the components is not present in the pipeline, or the connections don't match by type, and so on).

Returns:

The Pipeline instance.

Pipeline.get_component

def get_component(name: str) -> Component

Get the component with the specified name from the pipeline.

Arguments:

  • name: The name of the component.

Raises:

  • ValueError: If a component with that name is not present in the pipeline.

Returns:

The instance of that component.

Pipeline.get_component_name

def get_component_name(instance: Component) -> str

Returns the name of the Component instance if it has been added to this Pipeline or an empty string otherwise.

Arguments:

  • instance: The Component instance to look for.

Returns:

The name of the Component instance.
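The lookup described above can be sketched as a reverse search over a name-to-instance mapping. This is only an illustration: find_name is a hypothetical helper, and matching by object identity (rather than equality) is an assumption.

```python
from typing import Any, Dict


def find_name(components: Dict[str, Any], instance: Any) -> str:
    """Return the name registered for `instance`, or "" if it is not registered."""
    for name, candidate in components.items():
        # Assumption: instances are matched by identity, not by value.
        if candidate is instance:
            return name
    return ""


registered = object()
unknown = object()
components = {"hello": registered}

found = find_name(components, registered)
missing = find_name(components, unknown)
```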

Pipeline.inputs

def inputs() -> Dict[str, Dict[str, Any]]

Returns a dictionary containing the inputs of a pipeline. Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes the input sockets of that component, including their types and whether they are optional.

Returns:

A dictionary where each key is a pipeline component name and each value is a dictionary of input sockets of that component.
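To give a feel for the nested shape of such a dictionary, the sketch below derives a similar structure from the signature of each component's run method. It is not how the pipeline computes its inputs: describe_inputs is a hypothetical helper, the "type" and "is_optional" keys are placeholder names, and the punctuation parameter is added only to show an optional input.

```python
import inspect
from typing import Any, Dict


class Hello:
    """Plain-Python stand-in for a component with one required and one optional input."""

    def run(self, word: str, punctuation: str = "!"):
        return {"output": f"Hello, {word}{punctuation}"}


def describe_inputs(components: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Map each component name to a description of its run() parameters."""
    described: Dict[str, Dict[str, Any]] = {}
    for name, instance in components.items():
        sockets: Dict[str, Any] = {}
        # Inspecting the bound method skips `self` automatically.
        for param in inspect.signature(instance.run).parameters.values():
            sockets[param.name] = {
                "type": param.annotation,
                "is_optional": param.default is not inspect.Parameter.empty,
            }
        described[name] = sockets
    return described


described = describe_inputs({"hello": Hello()})
```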

Pipeline.outputs

def outputs() -> Dict[str, Dict[str, Any]]

Returns a dictionary containing the outputs of a pipeline. Each key in the dictionary corresponds to a component name, and its value is another dictionary that describes the output sockets of that component.

Returns:

A dictionary where each key is a pipeline component name and each value is a dictionary of output sockets of that component.

Pipeline.show

def show() -> None

If running in a Jupyter notebook, display an image representing this Pipeline.

Pipeline.draw

def draw(path: Path) -> None

Save an image representing this Pipeline to path.

Arguments:

  • path: The path to save the image to.

Pipeline.walk

def walk() -> Iterator[Tuple[str, Component]]

Visits each component in the pipeline exactly once and yields its name and instance.

No guarantees are provided on the visiting order.

Returns:

An iterator of tuples of component name and component instance.

Pipeline.warm_up

def warm_up()

Make sure all nodes are warm.

It's the node's responsibility to make sure this method can be called at every Pipeline.run() without re-initializing everything.

Pipeline.run

def run(data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]

Runs the pipeline with given input data.

Arguments:

  • data: A dictionary of inputs for the pipeline's components. Each key is a component name and its value is a dictionary of that component's input parameters.
  • debug: Set to True to collect and return debug information.

Raises:

  • PipelineRuntimeError: If a component fails or returns unexpected output.

Example a - Using named components: Consider a 'Hello' component that takes a 'word' input and outputs a greeting.

from haystack import Pipeline, component

@component
class Hello:
    @component.output_types(output=str)
    def run(self, word: str):
        return {"output": f"Hello, {word}!"}

Create a pipeline with two 'Hello' components connected together:

pipeline = Pipeline()
pipeline.add_component("hello", Hello())
pipeline.add_component("hello2", Hello())
pipeline.connect("hello.output", "hello2.word")
result = pipeline.run(data={"hello": {"word": "world"}})

This runs the pipeline with the specified input for 'hello', yielding {'hello2': {'output': 'Hello, Hello, world!!'}}.
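To see where the doubled greeting comes from, the same data flow can be traced by hand without a pipeline. This sketch uses a plain-Python stand-in for the 'Hello' component (no @component decorator) and wires the two calls manually; it only mirrors the connection from "hello.output" to "hello2.word".

```python
class Hello:
    """Plain-Python stand-in for the 'Hello' component."""

    def run(self, word: str):
        return {"output": f"Hello, {word}!"}


hello, hello2 = Hello(), Hello()

# Step 1: 'hello' receives the user input.
first = hello.run(word="world")

# Step 2: 'hello2' receives the output of 'hello' as its 'word' input.
second = hello2.run(word=first["output"])

# Only the last component's output is left unconsumed, so it forms the result.
result = {"hello2": second}
```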

Example b - Using flat inputs: You can also pass inputs directly without specifying component names:

result = pipeline.run(data={"word": "world"})

The pipeline resolves inputs to the correct components, returning {'hello2': {'output': 'Hello, Hello, world!!'}}.
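One way to picture this resolution is as a lookup from input names to the components that still have an unconnected input of that name. The sketch below is an assumption about the general idea, not the pipeline's actual logic: resolve_flat_inputs and open_inputs are hypothetical names, and it assumes that only inputs not already fed by a connection are eligible.

```python
from typing import Any, Dict, Set


def resolve_flat_inputs(
    data: Dict[str, Any], open_inputs: Dict[str, Set[str]]
) -> Dict[str, Dict[str, Any]]:
    """Turn {"input_name": value} into {"component": {"input_name": value}}."""
    resolved: Dict[str, Dict[str, Any]] = {}
    for input_name, value in data.items():
        for component_name, socket_names in open_inputs.items():
            if input_name in socket_names:
                resolved.setdefault(component_name, {})[input_name] = value
    return resolved


# 'hello2.word' is fed by 'hello.output', so only 'hello' has an open 'word' input.
open_inputs = {"hello": {"word"}, "hello2": set()}
nested = resolve_flat_inputs({"word": "world"}, open_inputs)
```

Under these assumptions the flat input becomes the same nested dictionary used in Example a.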

Returns:

A dictionary containing the pipeline's output.

Pipeline.from_template

@classmethod
def from_template(
        cls,
        predefined_pipeline: PredefinedPipeline,
        template_params: Optional[Dict[str, Any]] = None) -> "Pipeline"

Create a Pipeline from a predefined template. See PredefinedPipeline for available options.

Arguments:

  • predefined_pipeline: The predefined pipeline to use.
  • template_params: An optional dictionary of parameters to use when rendering the pipeline template.

Returns:

An instance of Pipeline.

parse_connect_string

def parse_connect_string(connection: str) -> Tuple[str, Optional[str]]

Returns component-connection pairs from a connect_to/from string.

Arguments:

  • connection: The connection string.

Returns:

A tuple containing the component name and the connection name. The connection name is None if the string does not specify one.
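A minimal sketch of this kind of parsing follows. It assumes the string is split on the first dot only and that a string without a dot names just a component; split_connection is a hypothetical name used so the sketch is not mistaken for the library function.

```python
from typing import Optional, Tuple


def split_connection(connection: str) -> Tuple[str, Optional[str]]:
    """Split 'component_name.connection_name' into its two parts."""
    if "." in connection:
        # Split once, so any further dots stay in the connection name.
        component_name, connection_name = connection.split(".", maxsplit=1)
        return component_name, connection_name
    return connection, None


with_socket = split_connection("hello.output")
without_socket = split_connection("hello2")
```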