OpenSearchSQLRetriever
Executes raw OpenSearch SQL queries against an OpenSearch Document Store and returns the raw JSON response.
| Most common position in a pipeline | Standalone, or anywhere you need to fetch metadata, aggregations, or other structured data |
| Mandatory init variables | document_store: An instance of OpenSearchDocumentStore |
| Mandatory run variables | query: An OpenSearch SQL query string |
| Output variables | result: A dictionary with the raw JSON response from the OpenSearch SQL API |
| API reference | OpenSearch |
| GitHub link | https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/opensearch |
| Package name | opensearch-haystack |
Overview
OpenSearchSQLRetriever lets you run OpenSearch SQL queries directly against an OpenSearchDocumentStore. Instead of matching a query against documents like the OpenSearchBM25Retriever or OpenSearchEmbeddingRetriever, it executes a SQL statement and returns the raw JSON response from the OpenSearch SQL API.
This is useful when you need structured access to your index at runtime, for example to fetch specific fields, filter on metadata, or compute aggregations such as counts and averages.
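As a rough illustration of the three use cases above, the sketch below builds one SQL string per case. The helper name example_queries and the fields category and rating are hypothetical; they are assumed to exist in your index mapping and are not part of the integration.

```python
from typing import Dict

INDEX = "my_index"  # assumption: the index name used by your document store


def example_queries(index: str = INDEX) -> Dict[str, str]:
    """Return one illustrative SQL statement per use case."""
    return {
        # Fetch a specific field instead of whole documents
        "fields": f"SELECT content FROM {index} LIMIT 5",
        # Filter on a metadata field (hypothetical field: category)
        "filter": f"SELECT content FROM {index} WHERE category = 'science'",
        # Compute aggregations (hypothetical numeric field: rating)
        "aggregate": f"SELECT COUNT(*) AS doc_count, AVG(rating) AS avg_rating FROM {index}",
    }


for name, sql in example_queries().items():
    print(f"{name}: {sql}")
```

Each string can be passed as the query argument when running the component.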
Unlike the other OpenSearch retrievers, this component does not return a list of Document objects. The output is a single result dictionary, where result["result"] holds the raw OpenSearch response. Depending on the query:
- For regular queries, result["result"]["hits"]["hits"] contains the matching documents.
- For aggregate queries, result["result"]["aggregations"] contains the aggregations.
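Because the response shape depends on the query, downstream code may want to read both locations defensively. The following is a minimal sketch, assuming the layout described above; split_sql_output is a hypothetical helper, and the sample dictionary is hand-written rather than real server output.

```python
from typing import Any, Dict, List, Tuple


def split_sql_output(output: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """Return (rows, aggregations) from the component output.

    Assumes output["result"] is the raw response, with documents under
    hits.hits and aggregations under aggregations.
    """
    raw = output.get("result") or {}
    hits = (raw.get("hits") or {}).get("hits") or []
    # Keep only the stored fields of each hit
    rows = [hit.get("_source", {}) for hit in hits]
    aggregations = raw.get("aggregations") or {}
    return rows, aggregations


# Hand-written sample mimicking a regular query response
sample = {"result": {"hits": {"hits": [{"_source": {"content": "hello"}}]}}}
rows, aggregations = split_sql_output(sample)
print(rows)
print(aggregations)
```

Missing keys fall back to an empty list or dictionary, so the same helper can be used for regular queries, aggregate queries, and empty results.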
The component accepts two optional parameters at initialization:
- raise_on_failure: if True (the default), an exception is raised when the SQL API call fails. If False, the error is logged as a warning and the result is empty.
- fetch_size: the number of results to fetch per page. If not set, the default fetch size configured in OpenSearch is used.
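The two optional parameters can be passed together at initialization. This is a small sketch, assuming the parameter names listed above; build_sql_retriever is a hypothetical wrapper, and the import is done inside the function so the helper can be defined even where the integration is not installed.

```python
from typing import Any, Optional


def build_sql_retriever(document_store: Any, fetch_size: Optional[int] = None, strict: bool = True):
    """Create an OpenSearchSQLRetriever with explicit failure and paging settings."""
    # Lazy import: requires the opensearch-haystack package at call time
    from haystack_integrations.components.retrievers.opensearch import (
        OpenSearchSQLRetriever,
    )

    return OpenSearchSQLRetriever(
        document_store=document_store,
        raise_on_failure=strict,  # False logs a warning instead of raising
        fetch_size=fetch_size,  # None keeps the OpenSearch default
    )
```

For example, build_sql_retriever(document_store, fetch_size=50, strict=False) would request 50 results per page and tolerate failed queries.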
Installation
Install OpenSearch and then start an instance.
If you have Docker set up, we recommend pulling the Docker image and running it.
As an alternative, you can go to the OpenSearch integration GitHub repository and start a Docker container running OpenSearch using the provided docker-compose.yml.
Once you have a running OpenSearch instance, install the opensearch-haystack integration with pip install opensearch-haystack.
Usage
On its own
Write a few documents to an index, then run a SQL query against it. The example below selects the content field from the index and reads the returned hits:
from haystack import Document
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.components.retrievers.opensearch import (
    OpenSearchSQLRetriever,
)
from haystack_integrations.document_stores.opensearch import (
    OpenSearchDocumentStore,
)

document_store = OpenSearchDocumentStore(hosts="http://localhost:9200", index="my_index")

documents = [
    Document(content="There are over 7,000 languages spoken around the world today."),
    Document(
        content="Elephants have been observed to behave in a way that indicates a high level of self-awareness, such as recognizing themselves in mirrors.",
    ),
    Document(
        content="In certain parts of the world, like the Maldives, Puerto Rico, and San Diego, you can witness the phenomenon of bioluminescent waves.",
    ),
]

# DuplicatePolicy.SKIP is optional, but it lets you run the script multiple times without errors
document_store.write_documents(documents=documents, policy=DuplicatePolicy.SKIP)

retriever = OpenSearchSQLRetriever(document_store=document_store)
output = retriever.run(query="SELECT content FROM my_index LIMIT 10")
result = output["result"]

# Regular queries return matching documents under hits
for hit in result["hits"]["hits"]:
    print(hit["_source"])
Running an aggregation query
Because the component returns the raw SQL response, you can use it for aggregations that the document-based retrievers don't support, such as counting documents:
retriever = OpenSearchSQLRetriever(document_store=document_store)
output = retriever.run(query="SELECT COUNT(*) AS doc_count FROM my_index")
result = output["result"]
# Aggregate queries expose their results under aggregations
print(result["aggregations"])
To avoid raising an exception on a malformed or failing query, initialize the component with raise_on_failure=False. In that case, a failed query logs a warning and returns an empty result instead.
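With raise_on_failure=False, calling code has to check for the empty result itself. The sketch below shows one way to do that; usable_result is a hypothetical helper, and it assumes that an empty result is falsy (for example {} or None).

```python
from typing import Any, Dict, Optional


def usable_result(output: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the raw response, or None when the query failed and nothing came back."""
    result = output.get("result")
    # Assumption: an "empty result" is falsy (for example {} or None)
    return result if result else None


# Hand-written stand-in for the output of a failed query
failed_output = {"result": {}}
if usable_result(failed_output) is None:
    print("Query failed; check the logged warning for details.")
```

Checking once at the call site keeps later lookups such as result["hits"]["hits"] from failing with a KeyError on an empty response.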