OAuthTokenResolver
Resolves an OAuth access token at pipeline runtime and emits it for downstream components such as the SharePoint and Google Drive retrievers and fetchers.
| | |
|---|---|
| Most common position in a pipeline | At the start of a pipeline, feeding access_token into downstream components such as MSSharePointRetriever or GoogleDriveRetriever |
| Mandatory init variables | token_source: The strategy that resolves the access token, for example OAuthRefreshTokenSource |
| Mandatory run variables | None for config-only sources. subject_token: a controller-injected per-request credential, mandatory only when the source requires it (for example OAuthTokenExchangeSource) |
| Output variables | access_token: A bearer token string |
| API reference | OAuth |
| GitHub link | https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/oauth |
| Package name | oauth-haystack |
Overview
OAuthTokenResolver resolves an OAuth access token when the pipeline runs and emits it on the access_token output socket. Downstream components – such as MSSharePointRetriever, MSSharePointFetcher, GoogleDriveRetriever, and GoogleDriveFetcher – consume the token through a normal connection and never need to know how it was obtained.
The resolver itself is a thin wrapper. The actual work of getting a token is delegated to a pluggable token source that decides where the token comes from. This separation lets you swap authentication strategies (refresh-token grant, per-request token exchange, or a static long-lived token) without changing the rest of your pipeline.
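The delegation described above can be pictured with a minimal sketch. The class and method names here (TokenSource, StaticSource, Resolver, resolve) are hypothetical stand-ins chosen for illustration, not the integration's actual classes or signatures:

```python
from dataclasses import dataclass
from typing import Protocol


class TokenSource(Protocol):
    """Hypothetical interface: anything that can produce an access token."""

    def resolve(self) -> str: ...


@dataclass
class StaticSource:
    """Hypothetical source that returns a fixed, pre-issued token."""

    token: str

    def resolve(self) -> str:
        return self.token


@dataclass
class Resolver:
    """Thin wrapper: holds a source and forwards the call to it."""

    token_source: TokenSource

    def run(self) -> dict[str, str]:
        # The wrapper adds no logic of its own; it only shapes the output.
        return {"access_token": self.token_source.resolve()}


resolver = Resolver(token_source=StaticSource(token="example-token"))
result = resolver.run()
```

Because the wrapper depends only on the interface, swapping in a different source changes how the token is obtained without touching anything downstream.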
Token sources
You pass a token source to the resolver through the token_source parameter. All sources are importable from haystack_integrations.utils.oauth.
| Source | Use it when | Per-request input |
|---|---|---|
| OAuthRefreshTokenSource | You have a single, fixed identity backed by a stored refresh token and want the source to exchange it for short-lived access tokens and cache them. | None |
| OAuthTokenExchangeSource | You serve multiple users (or run multiple replicas) and want to exchange an incoming per-request user assertion for a downstream token, with no persistent storage. Implements RFC 8693 token exchange and Microsoft's on-behalf-of flow. | subject_token |
| OAuthStaticTokenSource | Your provider issues a non-expiring token that you manage out of band (for example Slack or Notion). | None |
When the configured source needs a per-request credential (OAuthTokenExchangeSource sets requires_subject_token = True), the resolver declares a mandatory subject_token run input. This is a controller-injected credential – for example an incoming user assertion – not a value chosen by an end user. For config-only sources (OAuthRefreshTokenSource, OAuthStaticTokenSource), the resolver declares no run input and acts as a source node.
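The rule above, where a per-request credential is mandatory only for sources that ask for one, can be sketched as follows. Everything except the requires_subject_token flag is a hypothetical name, and this sketch checks the credential at call time, whereas the real resolver declares the input when it is constructed:

```python
from typing import Optional


class ConfigOnlySource:
    """Hypothetical source that needs no per-request input."""

    requires_subject_token = False

    def resolve(self, subject_token: Optional[str] = None) -> str:
        return "token-from-config"


class ExchangeSource:
    """Hypothetical source that trades a per-request credential for a token."""

    requires_subject_token = True

    def resolve(self, subject_token: Optional[str] = None) -> str:
        # A real source would call the identity provider here.
        return f"exchanged({subject_token})"


def resolve_token(source, subject_token: Optional[str] = None) -> dict:
    """Reject a missing credential only when the source asks for one."""
    if source.requires_subject_token and subject_token is None:
        raise ValueError("this token source requires a subject_token")
    return {"access_token": source.resolve(subject_token)}


no_input = resolve_token(ConfigOnlySource())
per_request = resolve_token(ExchangeSource(), subject_token="user-assertion")
```

Calling resolve_token(ExchangeSource()) without a subject_token raises ValueError in this sketch, which mirrors the idea of a mandatory run input.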
The OAuth scopes you request depend on the downstream service. For Microsoft Graph, that means scopes such as https://graph.microsoft.com/Files.Read.All; for Google Drive, scopes such as https://www.googleapis.com/auth/drive.readonly. Always consult your identity provider's documentation for the exact scope values.
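On the wire, OAuth 2.0 sends the requested scopes as one space-delimited string. The sketch below builds the form-encoded body of a standard refresh-token request (RFC 6749, section 6) to show that shape. The helper name build_refresh_request is hypothetical, and the sketch does not claim to reproduce how OAuthRefreshTokenSource builds its request internally:

```python
from urllib.parse import parse_qs, urlencode


def build_refresh_request(client_id: str, refresh_token: str, scopes: list[str]) -> str:
    """Build the form-encoded body of an RFC 6749 refresh-token request."""
    return urlencode(
        {
            "grant_type": "refresh_token",
            "client_id": client_id,
            "refresh_token": refresh_token,
            # OAuth 2.0 sends scopes as a single space-delimited string.
            "scope": " ".join(scopes),
        }
    )


body = build_refresh_request(
    client_id="aaa-bbb-ccc",
    refresh_token="example-refresh-token",
    scopes=["https://graph.microsoft.com/Files.Read.All", "offline_access"],
)
# Decode the body again to inspect the individual fields.
fields = parse_qs(body)
```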
Installation
Install the OAuth integration with pip:
pip install oauth-haystack
Usage
On its own
Resolve a token with a stored refresh token using OAuthRefreshTokenSource. The refresh token is read from an environment variable through the Secret API:
from haystack.utils import Secret
from haystack_integrations.components.connectors.oauth import OAuthTokenResolver
from haystack_integrations.utils.oauth import OAuthRefreshTokenSource

resolver = OAuthTokenResolver(
    token_source=OAuthRefreshTokenSource(
        token_url="https://login.microsoftonline.com/common/oauth2/v2.0/token",
        client_id="aaa-bbb-ccc",
        refresh_token=Secret.from_env_var("MS_REFRESH_TOKEN"),
        scopes=[
            "https://graph.microsoft.com/Files.Read.All",
            "offline_access",
        ],
    ),
)

access_token = resolver.run()["access_token"]
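The caching that a refresh-token source performs can be illustrated with a small, self-contained sketch. CachedToken, its fetch callback, and the 60-second safety margin are all assumptions made for this example; the integration's real cache may work differently:

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Hypothetical cache: reuse a token until shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        skew: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch  # returns (access_token, lifetime_in_seconds)
        self._skew = skew  # refresh this many seconds before expiry
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Fetch on first use, or once the token is within `skew` of expiring.
        if self._token is None or now >= self._expires_at - self._skew:
            self._token, lifetime = self._fetch()
            self._expires_at = now + lifetime
        return self._token


calls = []
now = [0.0]  # fake clock so the example needs no real waiting


def fake_fetch() -> Tuple[str, float]:
    calls.append(1)
    return f"token-{len(calls)}", 3600.0


cache = CachedToken(fake_fetch, clock=lambda: now[0])
first = cache.get()  # no token yet, so this fetches
second = cache.get()  # still fresh, so the cached token is reused
now[0] = 3600.0 - 30  # move to 30 seconds before expiry, inside the margin
third = cache.get()  # close to expiry, so this fetches again
```

Refreshing slightly early avoids handing a downstream component a token that expires mid-request.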
For a provider that issues long-lived, non-expiring tokens, use OAuthStaticTokenSource instead:
from haystack.utils import Secret
from haystack_integrations.components.connectors.oauth import OAuthTokenResolver
from haystack_integrations.utils.oauth import OAuthStaticTokenSource

resolver = OAuthTokenResolver(
    token_source=OAuthStaticTokenSource(token=Secret.from_env_var("SERVICE_TOKEN")),
)

access_token = resolver.run()["access_token"]
For multi-user backends, use OAuthTokenExchangeSource. The resolver then requires a per-request subject_token:
from haystack_integrations.components.connectors.oauth import OAuthTokenResolver
from haystack_integrations.utils.oauth import OAuthTokenExchangeSource

resolver = OAuthTokenResolver(
    token_source=OAuthTokenExchangeSource(
        token_url="https://login.microsoftonline.com/<tenant>/oauth2/v2.0/token",
        client_id="aaa-bbb-ccc",
        subject_token_param="assertion",
        grant_type="urn:ietf:params:oauth:grant-type:jwt-bearer",
        scopes=["https://graph.microsoft.com/Files.Read.All"],
        extra_token_params={"requested_token_use": "on_behalf_of"},
    ),
)

# `subject_token` is the incoming per-request user assertion, injected by your application.
access_token = resolver.run(subject_token="<incoming-user-assertion>")["access_token"]
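To see how the configuration values above could end up in a token request, the sketch below assembles a form-encoded body from them. The helper build_exchange_body is hypothetical, and the mapping (the subject token placed under the name given by subject_token_param, extra parameters merged last) is an assumption about how such a source might work rather than a description of the library's internals:

```python
from urllib.parse import parse_qs, urlencode


def build_exchange_body(
    subject_token: str,
    client_id: str,
    grant_type: str,
    subject_token_param: str,
    scopes: list[str],
    extra_token_params: dict[str, str],
) -> str:
    """Assemble a form-encoded token request from exchange-style settings."""
    params = {
        "grant_type": grant_type,
        "client_id": client_id,
        # The per-request credential goes under a configurable field name.
        subject_token_param: subject_token,
        "scope": " ".join(scopes),
    }
    # Provider-specific extras are merged in last.
    params.update(extra_token_params)
    return urlencode(params)


body = build_exchange_body(
    subject_token="incoming-user-assertion",
    client_id="aaa-bbb-ccc",
    grant_type="urn:ietf:params:oauth:grant-type:jwt-bearer",
    subject_token_param="assertion",
    scopes=["https://graph.microsoft.com/Files.Read.All"],
    extra_token_params={"requested_token_use": "on_behalf_of"},
)
# Decode the body again to inspect the individual fields.
fields = parse_qs(body)
```

A real on-behalf-of request to Microsoft also carries client authentication, such as a client secret or a certificate assertion, which this sketch leaves out.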
In a pipeline
In a pipeline, connect the resolver's access_token output to the access_token input of one or more downstream components. The example below wires the resolver into a MSSharePointRetriever so that searching SharePoint requires only a query at runtime:
from haystack import Pipeline
from haystack.utils import Secret
from haystack_integrations.components.connectors.oauth import OAuthTokenResolver
from haystack_integrations.utils.oauth import OAuthRefreshTokenSource
from haystack_integrations.components.retrievers.microsoft_sharepoint import (
    MSSharePointRetriever,
)

pipeline = Pipeline()
pipeline.add_component(
    "resolver",
    OAuthTokenResolver(
        token_source=OAuthRefreshTokenSource(
            token_url="https://login.microsoftonline.com/common/oauth2/v2.0/token",
            client_id="aaa-bbb-ccc",
            refresh_token=Secret.from_env_var("MS_REFRESH_TOKEN"),
            scopes=[
                "https://graph.microsoft.com/Files.Read.All",
                "https://graph.microsoft.com/Sites.Read.All",
                "offline_access",
            ],
        ),
    ),
)
pipeline.add_component("retriever", MSSharePointRetriever(top_k=5))
pipeline.connect("resolver.access_token", "retriever.access_token")

result = pipeline.run({"retriever": {"query": "quarterly roadmap"}})
documents = result["retriever"]["documents"]
A single access_token output can be connected to several downstream inputs. For a full retrieve-then-fetch pipeline that feeds the same token to both a retriever and a fetcher, see the MSSharePointFetcher and GoogleDriveFetcher pages.