DocumentationAPI Reference📓 Tutorials🧑‍🍳 Cookbook🤝 Integrations💜 Discord🎨 Studio (Waitlist)
Documentation

Serializing Pipelines

Save your pipelines into a custom format and explore the serialization options.

Serialization means converting a pipeline to a format that you can save on your disk and load later.

📘

Serialization formats

Haystack 2.0 only supports YAML format at this time. We will be rolling out more formats gradually.

Converting a Pipeline to YAML

Use the dumps() method to convert a Pipeline object to YAML:

from haystack import Pipeline

pipe = Pipeline()
print(pipe.dumps())

# Prints:
#
# components: {}
# connections: []
# max_loops_allowed: 100
# metadata: {}

You can also use dump() method to save the YAML representation of a pipeline in a file:

with open("/content/test.yml", "w") as file:
  pipe.dump(file)

Converting a Pipeline Back to Python

You can convert a YAML pipeline back into Python. Use the loads() method to convert a string representation of a pipeline (str, bytes or bytearray) or the load() method to convert a pipeline represented in a file-like object into a corresponding Python object.

Both loading methods support callbacks that let you modify components during the deserialization process.

Here is an example script:

from haystack import Pipeline
from haystack.core.serialization import DeserializationCallbacks
from typing import Type, Dict, Any


# This is the YAML you want to convert to Python:
pipeline_yaml = """
components:
  cleaner:
    init_parameters:
      remove_empty_lines: true
      remove_extra_whitespaces: true
      remove_regex: null
      remove_repeated_substrings: false
      remove_substrings: null
    type: haystack.components.preprocessors.document_cleaner.DocumentCleaner
  converter:
    init_parameters:
      encoding: utf-8
    type: haystack.components.converters.txt.TextFileToDocument
connections:
- receiver: cleaner.documents
  sender: converter.documents
max_loops_allowed: 100
metadata: {}
"""

def component_pre_init_callback(component_name: str, component_cls: Type, init_params: Dict[str, Any]):
   # This function gets called every time a component is deserialized.
   if component_name == "cleaner":
      assert "DocumentCleaner" in component_cls.__name__
      # Modify the init parameters. The modified parameters are passed to
      # the init method of the component during deserialization.
      init_params["remove_empty_lines"] = False
      print("Modified 'remove_empty_lines' to False in 'cleaner' component")
   else:
      print(f"Not modifying component {component_name} of class {component_cls}")

pipe = Pipeline.loads(pipeline_yaml, callbacks=DeserializationCallbacks(component_pre_init_callback))

Performing Custom Serialization

Pipelines and components in Haystack can serialize simple components, including custom ones, out of the box. Code like this just works:

from haystack import component

@component
class RepeatWordComponent:
    def __init__(self, times: int):
        self.times = times

    @component.output_types(result=str)
    def run(self, word: str):
        return word * self.times

On the other hand, this code doesn't work if the final format is JSON, as the set type is not JSON-serializable:

from haystack import component

@component
class SetIntersector:
    def __init__(self, intersect_with: set):
        self.intersect_with = intersect_with

    @component.output_types(result=set)
    def run(self, data: set):
        return data.intersection(self.intersect_with)

In such cases, you can provide your own implementation from_dict and to_dict to components:

from haystack import component, default_from_dict, default_to_dict

class SetIntersector:
		def __init__(self, intersect_with: set):
	      self.intersect_with = intersect_with
		
    @component.output_types(result=set)
	  def run(self, data: set):
        return data.intersect(self.intersect_with)

    def to_dict(self):
        return default_to_dict(self, intersect_with=list(self.intersect_with))

    @classmethod
    def from_dict(cls, data):
        # convert the set into a list for the dict representation,
        # so it can be converted to JSON
        data["intersect_with"] = set(data["intersect_with"])
        return default_from_dict(cls, data)

Saving a Pipeline to a Custom Format

Once a pipeline is available in its dictionary format, the last step of serialization is to convert that dictionary into a format you can store or send over the wire. Haystack supports YAML out of the box, but if you need a different format, you can write a custom Marshaller.

A Marshaller is a Python class responsible for converting text to a dictionary and a dictionary to text according to a certain format. Marshallers must respect the Marshaller protocol, providing the methods marshal and unmarshal.

This is the code for a custom TOML marshaller that relies on the rtoml library:

# This code requires a `pip install rtoml`
from typing import Dict, Any, Union
import rtoml

class TomlMarshaller:
    def marshal(self, dict_: Dict[str, Any]) -> str:
        return rtoml.dumps(dict_)

    def unmarshal(self, data_: Union[str, bytes]) -> Dict[str, Any]:
        return dict(rtoml.loads(data_))

You can then pass a Marshaller instance to the methods dump, dumps, load, and loads:

from haystack import Pipeline
from my_custom_marshallers import TomlMarshaller

pipe = Pipeline()
pipe.dumps(TomlMarshaller())
# prints:
# 'max_loops_allowed = 100\nconnections = []\n\n[metadata]\n\n[components]\n'

Additional References

📓 Tutorial: Serializing LLM Pipelines