ValkeyDocumentStore
| | |
| --- | --- |
| API reference | Valkey |
| GitHub link | https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/valkey |
Valkey is a high-performance, in-memory data structure store that you can use in Haystack pipelines with the ValkeyDocumentStore. Valkey operates in-memory by default for maximum performance, but can be configured with persistence options for data durability.
The ValkeyDocumentStore connects to a Valkey server with the search module running and supports vector similarity search for RAG and other retrieval use cases. For a detailed overview of all the available methods and settings, visit the API Reference.
Installation
You can install the Valkey Haystack integration with:
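```bash
# Package name assumed from the repo's standard <integration>-haystack naming convention
pip install valkey-haystack
```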
Initialization
To use Valkey as your data storage for Haystack pipelines, you need a Valkey server with the search module running. Initialize a ValkeyDocumentStore like this:
```python
from haystack_integrations.document_stores.valkey import ValkeyDocumentStore

document_store = ValkeyDocumentStore(
    nodes_list=[("localhost", 6379)],
    index_name="my_documents",
    embedding_dim=768,
    distance_metric="cosine",
)
```
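Note that `embedding_dim` must match the output dimension of the embedding model you use. The examples below pass an explicit 768-dimensional model (`sentence-transformers/all-mpnet-base-v2`) to the Sentence Transformers embedders for this reason.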
Running Valkey locally
For development and testing, you can start a Valkey server with Docker.
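One option is the valkey/valkey-bundle image, which ships the server together with the search module (the exact image and tag are an assumption; check the Valkey docs for alternatives):

```bash
# valkey/valkey-bundle bundles the search module that ValkeyDocumentStore requires
docker run -d --name valkey -p 6379:6379 valkey/valkey-bundle:latest
```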
Then connect with the same initialization code as above, using `nodes_list=[("localhost", 6379)]`.
For more advanced configurations and clustering setups, refer to the Valkey documentation.
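As a sketch, a clustered deployment would pass several `(host, port)` tuples in `nodes_list`; this is inferred from the parameter's shape, and the hostnames below are hypothetical:

```python
# Hypothetical cluster nodes; adjust hostnames and ports to your deployment
document_store = ValkeyDocumentStore(
    nodes_list=[("valkey-node-1", 6379), ("valkey-node-2", 6379), ("valkey-node-3", 6379)],
    index_name="my_documents",
    embedding_dim=768,
)
```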
Writing documents
To write documents to your ValkeyDocumentStore, create an indexing pipeline or use the write_documents() method. You can use Converters, PreProcessors, and other integrations to fetch and prepare data. Below is an example that indexes Markdown files into Valkey.
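For a quick test without a pipeline, you can call write_documents() directly. A minimal sketch, using the document_store from the initialization above and a dummy 768-dimensional embedding in place of a real one:

```python
from haystack import Document

# Dummy embedding; in practice this comes from an embedder component
doc = Document(content="Valkey is an in-memory data store.", embedding=[0.1] * 768)
document_store.write_documents([doc])
print(document_store.count_documents())  # -> 1
```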
Indexing pipeline
```python
from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument
from haystack.components.writers import DocumentWriter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.preprocessors import DocumentSplitter
from haystack_integrations.document_stores.valkey import ValkeyDocumentStore

document_store = ValkeyDocumentStore(
    nodes_list=[("localhost", 6379)],
    index_name="my_documents",
    embedding_dim=768,
    distance_metric="cosine",
)

indexing = Pipeline()
indexing.add_component("converter", MarkdownToDocument())
indexing.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=2))
# Use a 768-dimensional model to match the document store's embedding_dim
# (the embedder's default model outputs 384-dimensional vectors)
indexing.add_component("embedder", SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-mpnet-base-v2"))
indexing.add_component("writer", DocumentWriter(document_store))
indexing.connect("converter", "splitter")
indexing.connect("splitter", "embedder")
indexing.connect("embedder", "writer")

indexing.run({"converter": {"sources": ["filename.md"]}})
```
Using Valkey in a RAG pipeline
Once documents are in your ValkeyDocumentStore, you can use ValkeyEmbeddingRetriever to retrieve them. The following example builds a RAG pipeline with a custom prompt:
```python
from haystack import Pipeline
from haystack.utils import Secret
from haystack.dataclasses import ChatMessage
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack_integrations.document_stores.valkey import ValkeyDocumentStore
from haystack_integrations.components.retrievers.valkey import ValkeyEmbeddingRetriever

document_store = ValkeyDocumentStore(
    nodes_list=[("localhost", 6379)],
    index_name="my_documents",
    embedding_dim=768,
    distance_metric="cosine",
)

prompt_template = [
    ChatMessage.from_system(
        "Answer the question based on the provided context. "
        "If the context does not include an answer, reply with 'I don't know'."
    ),
    ChatMessage.from_user(
        "Query: {{query}}\n"
        "Documents:\n{% for doc in documents %}{{ doc.content }}\n{% endfor %}\n"
        "Answer:"
    ),
]

query_pipeline = Pipeline()
# Use a 768-dimensional model to match the document store's embedding_dim
query_pipeline.add_component("text_embedder", SentenceTransformersTextEmbedder(model="sentence-transformers/all-mpnet-base-v2"))
query_pipeline.add_component("retriever", ValkeyEmbeddingRetriever(document_store=document_store))
query_pipeline.add_component("prompt_builder", ChatPromptBuilder(template=prompt_template, required_variables=["query", "documents"]))
query_pipeline.add_component("generator", OpenAIChatGenerator(api_key=Secret.from_token("YOUR_OPENAI_API_KEY"), model="gpt-4o"))
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
query_pipeline.connect("retriever.documents", "prompt_builder.documents")
query_pipeline.connect("prompt_builder.messages", "generator.messages")

query = "What is Valkey?"
results = query_pipeline.run(
    {
        "text_embedder": {"text": query},
        "prompt_builder": {"query": query},
    }
)
```
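The generator's replies are ChatMessage objects; to inspect the generated answer:

```python
print(results["generator"]["replies"][0].text)
```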
For more examples, see the examples folder in the repository.
Performance benefits
- In-memory storage: Fast read and write operations.
- High throughput: Handles many operations per second.
- Low latency: Minimal response times for document operations.
- Scalability: Supports clustering for horizontal scaling.
Supported retrievers
- ValkeyEmbeddingRetriever: Compares the query and document embeddings and fetches the documents most relevant to the query from the ValkeyDocumentStore.
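You can also run the retriever on its own, outside a pipeline. A minimal sketch, using the document_store from above, a dummy query embedding, and the top_k parameter common to Haystack embedding retrievers (assumed here):

```python
from haystack_integrations.components.retrievers.valkey import ValkeyEmbeddingRetriever

retriever = ValkeyEmbeddingRetriever(document_store=document_store, top_k=5)  # top_k is assumed
result = retriever.run(query_embedding=[0.1] * 768)  # dummy 768-dim query embedding
print(result["documents"])
```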