Serializing Pipelines
Save your pipelines into a custom format and explore the serialization options.
Serialization means converting a pipeline to a format that you can save to disk and load later.
Serialization formats
Haystack 2.0 currently supports only the YAML format. More formats will be rolled out gradually.
Converting a Pipeline to YAML
Use the dumps() method to convert a Pipeline object to YAML:
from haystack import Pipeline
pipe = Pipeline()
print(pipe.dumps())
# Prints:
#
# components: {}
# connections: []
# max_loops_allowed: 100
# metadata: {}
Converting a Pipeline Back to Python
You can convert a YAML pipeline back into Python. Use the loads() method to convert a string representation of a pipeline (str, bytes, or bytearray), or the load() method to convert a pipeline represented in a file-like object, into the corresponding Python object.
Both loading methods support callbacks that let you modify components during the deserialization process.
Here is an example script:
from haystack import Pipeline
from haystack.core.serialization import DeserializationCallbacks
from typing import Type, Dict, Any
# This is the YAML you want to convert to Python:
pipeline_yaml = """
components:
  cleaner:
    init_parameters:
      remove_empty_lines: true
      remove_extra_whitespaces: true
      remove_regex: null
      remove_repeated_substrings: false
      remove_substrings: null
    type: haystack.components.preprocessors.document_cleaner.DocumentCleaner
  converter:
    init_parameters:
      encoding: utf-8
    type: haystack.components.converters.txt.TextFileToDocument
connections:
- receiver: cleaner.documents
  sender: converter.documents
max_loops_allowed: 100
metadata: {}
"""
def component_pre_init_callback(component_name: str, component_cls: Type, init_params: Dict[str, Any]):
    # This function gets called every time a component is deserialized.
    if component_name == "cleaner":
        assert "DocumentCleaner" in component_cls.__name__
        # Modify the init parameters. The modified parameters are passed to
        # the init method of the component during deserialization.
        init_params["remove_empty_lines"] = False
        print("Modified 'remove_empty_lines' to False in 'cleaner' component")
    else:
        print(f"Not modifying component {component_name} of class {component_cls}")
pipe = Pipeline.loads(pipeline_yaml, callbacks=DeserializationCallbacks(component_pre_init_callback))
Performing Custom Serialization
Haystack can serialize pipelines built from simple components, including custom ones, out of the box. Code like this just works:
from haystack import component

@component
class RepeatWordComponent:
    def __init__(self, times: int):
        self.times = times

    @component.output_types(result=str)
    def run(self, word: str):
        # Components return a dictionary keyed by the declared output names.
        return {"result": word * self.times}
On the other hand, this code doesn't work if the final format is JSON, as the set type is not JSON-serializable:
from haystack import component

@component
class SetIntersector:
    def __init__(self, intersect_with: set):
        self.intersect_with = intersect_with

    @component.output_types(result=set)
    def run(self, data: set):
        # Components return a dictionary keyed by the declared output names.
        return {"result": data.intersection(self.intersect_with)}
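The failure is easy to reproduce with the standard library alone. The sketch below assumes the component's init parameters end up in a plain dictionary (the `init_params` name is illustrative, not a Haystack API) and shows that `json.dumps` rejects a set but accepts the same values as a list.

```python
import json

# Hypothetical init parameters of a component that stores a set.
init_params = {"intersect_with": {1, 2, 3}}

try:
    json.dumps(init_params)
    serializable = True
except TypeError as err:
    # The json module has no encoding for Python sets, so it raises TypeError.
    serializable = False
    print(f"Cannot serialize: {err}")

# Converting the set to a sorted list makes the same data serializable.
as_json = json.dumps({"intersect_with": sorted(init_params["intersect_with"])})
print(as_json)
```

Sorting is optional; it only makes the serialized output stable, since sets have no defined order.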
In such cases, you can provide your own implementations of from_dict and to_dict for the component:
from haystack import component, default_from_dict, default_to_dict

@component
class SetIntersector:
    def __init__(self, intersect_with: set):
        self.intersect_with = intersect_with

    @component.output_types(result=set)
    def run(self, data: set):
        return {"result": data.intersection(self.intersect_with)}

    def to_dict(self):
        # Convert the set into a list for the dict representation,
        # so it can be converted to JSON.
        return default_to_dict(self, intersect_with=list(self.intersect_with))

    @classmethod
    def from_dict(cls, data):
        # Convert the list back into a set before the component is initialized.
        init_params = data["init_parameters"]
        init_params["intersect_with"] = set(init_params["intersect_with"])
        return default_from_dict(cls, data)
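To see the round trip end to end without installing Haystack, the sketch below uses a plain stand-in class (`SetHolder`, a hypothetical name) whose `to_dict` and `from_dict` mirror the `type` / `init_parameters` layout seen in the YAML earlier. It is not the library's implementation, only an illustration of the list-to-set conversion surviving a JSON round trip.

```python
import json
from typing import Any, Dict


class SetHolder:
    """Stand-in for a component; not a Haystack class."""

    def __init__(self, intersect_with: set):
        self.intersect_with = intersect_with

    def to_dict(self) -> Dict[str, Any]:
        # Store the set as a sorted list so that JSON can encode it.
        return {
            "type": "SetHolder",
            "init_parameters": {"intersect_with": sorted(self.intersect_with)},
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SetHolder":
        # Turn the list back into a set before calling __init__.
        params = dict(data["init_parameters"])
        params["intersect_with"] = set(params["intersect_with"])
        return cls(**params)


original = SetHolder({3, 1, 2})
text = json.dumps(original.to_dict())
restored = SetHolder.from_dict(json.loads(text))
print(restored.intersect_with == original.intersect_with)
```

The conversion lives in the two methods, so the rest of the serialization code never needs to know that the component stores a set.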
Saving a Pipeline to a Custom Format
Once a pipeline is available in its dictionary format, the last step of serialization is to convert that dictionary into a format you can store or send over the wire. Haystack supports YAML out of the box, but if you need a different format, you can write a custom Marshaller.
A Marshaller is a Python class responsible for converting text to a dictionary and a dictionary to text according to a certain format. Marshallers must respect the Marshaller protocol, providing the methods marshal and unmarshal.
This is the code for a custom TOML marshaller that relies on the rtoml library:
# This code requires a `pip install rtoml`
from typing import Dict, Any, Union
import rtoml
class TomlMarshaller:
    def marshal(self, dict_: Dict[str, Any]) -> str:
        return rtoml.dumps(dict_)

    def unmarshal(self, data_: Union[str, bytes]) -> Dict[str, Any]:
        return dict(rtoml.loads(data_))
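The same two-method shape works for other formats. As a comparison, here is a minimal sketch of a JSON marshaller built only on the standard library. `JsonMarshaller` is a hypothetical name, not something shipped with Haystack, and the sample dictionary only imitates the layout of an empty pipeline.

```python
import json
from typing import Any, Dict, Union


class JsonMarshaller:
    """Hypothetical marshaller: dictionary <-> JSON text."""

    def marshal(self, dict_: Dict[str, Any]) -> str:
        # sort_keys gives a stable, diff-friendly output.
        return json.dumps(dict_, indent=2, sort_keys=True)

    def unmarshal(self, data_: Union[str, bytes, bytearray]) -> Dict[str, Any]:
        # json.loads accepts str, bytes, and bytearray.
        return json.loads(data_)


marshaller = JsonMarshaller()
pipeline_dict = {
    "components": {},
    "connections": [],
    "max_loops_allowed": 100,
    "metadata": {},
}
text = marshaller.marshal(pipeline_dict)
print(text)

# Round trip: text (or its UTF-8 bytes) converts back to the same dictionary.
assert marshaller.unmarshal(text) == pipeline_dict
assert marshaller.unmarshal(text.encode("utf-8")) == pipeline_dict
```

A marshaller only handles the dictionary-to-text step, so any values it cannot encode (such as sets) still need to be converted in the component's to_dict.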
You can then pass a Marshaller instance to the methods dump, dumps, load, and loads:
from haystack import Pipeline
from my_custom_marshallers import TomlMarshaller
pipe = Pipeline()
pipe.dumps(TomlMarshaller())
# returns:
# 'max_loops_allowed = 100\nconnections = []\n\n[metadata]\n\n[components]\n'