Tool-using agents with provider-agnostic chat model support.
Module haystack_experimental.components.agents.agent
Agent
A Haystack component that implements a tool-using agent with provider-agnostic chat model support.
NOTE: This class extends Haystack's Agent component to add support for human-in-the-loop confirmation strategies.
The component processes messages and executes tools until an exit condition is met. An exit condition can be a direct text response or the invocation of a designated tool; multiple exit conditions can be specified.
When called without tools, the Agent acts as a ChatGenerator: it produces one response and then exits.
Usage example
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools.tool import Tool
from haystack_experimental.components.agents import Agent
from haystack_experimental.components.agents.human_in_the_loop import (
    HumanInTheLoopStrategy,
    AlwaysAskPolicy,
    NeverAskPolicy,
    SimpleConsoleUI,
)

calculator_tool = Tool(name="calculator", description="A tool for performing mathematical calculations.", ...)
search_tool = Tool(name="search", description="A tool for searching the web.", ...)

agent = Agent(
    chat_generator=OpenAIChatGenerator(),
    tools=[calculator_tool, search_tool],
    confirmation_strategies={
        calculator_tool.name: HumanInTheLoopStrategy(
            confirmation_policy=NeverAskPolicy(), confirmation_ui=SimpleConsoleUI()
        ),
        search_tool.name: HumanInTheLoopStrategy(
            confirmation_policy=AlwaysAskPolicy(), confirmation_ui=SimpleConsoleUI()
        ),
    },
)

# Run the agent
result = agent.run(
    messages=[ChatMessage.from_user("Find information about Haystack")]
)

assert "messages" in result  # Contains conversation history
Agent.__init__
def __init__(*,
chat_generator: ChatGenerator,
tools: Optional[Union[list[Tool], Toolset]] = None,
system_prompt: Optional[str] = None,
exit_conditions: Optional[list[str]] = None,
state_schema: Optional[dict[str, Any]] = None,
max_agent_steps: int = 100,
streaming_callback: Optional[StreamingCallbackT] = None,
raise_on_tool_invocation_failure: bool = False,
confirmation_strategies: Optional[dict[str, ConfirmationStrategy]] = None,
tool_invoker_kwargs: Optional[dict[str, Any]] = None) -> None
Initialize the agent component.
Arguments:
chat_generator
: An instance of the chat generator that your agent should use. It must support tools.
tools
: List of Tool objects or a Toolset that the agent can use.
system_prompt
: System prompt for the agent.
exit_conditions
: List of conditions that will cause the agent to return. Can include "text" if the agent should return when it generates a message without tool calls, or tool names that will cause the agent to return once the tool was executed. Defaults to ["text"].
state_schema
: The schema for the runtime state used by the tools.
max_agent_steps
: Maximum number of steps the agent will run before stopping. Defaults to 100. If the agent exceeds this number of steps, it stops and returns the current state.
streaming_callback
: A callback that will be invoked when a response is streamed from the LLM. The same callback can be configured to emit tool results when a tool is called.
raise_on_tool_invocation_failure
: Whether the agent should raise an exception when a tool invocation fails. If set to False, the exception is turned into a chat message and passed to the LLM.
confirmation_strategies
: A dictionary mapping tool names to ConfirmationStrategy instances that control human-in-the-loop confirmation for those tools.
tool_invoker_kwargs
: Additional keyword arguments to pass to the ToolInvoker.
Raises:
TypeError
: If the chat_generator does not support the tools parameter in its run method.
ValueError
: If the exit_conditions are not valid.
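For reference, a minimal construction sketch. The add tool, its JSON schema, and the chosen exit_conditions are illustrative, not part of the API; OPENAI_API_KEY is assumed to be set.

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.tools import Tool
from haystack_experimental.components.agents import Agent

def add(a: int, b: int) -> int:
    return a + b

# Illustrative tool; the JSON schema must describe the function's parameters.
add_tool = Tool(
    name="add",
    description="Add two integers.",
    parameters={
        "type": "object",
        "properties": {"a": {"type": "integer"}, "b": {"type": "integer"}},
        "required": ["a", "b"],
    },
    function=add,
)

agent = Agent(
    chat_generator=OpenAIChatGenerator(),
    tools=[add_tool],
    system_prompt="You are a helpful assistant.",
    exit_conditions=["text", "add"],  # stop on a plain text reply or right after `add` runs
    max_agent_steps=20,
)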
Agent.run
def run(messages: list[ChatMessage],
streaming_callback: Optional[StreamingCallbackT] = None,
*,
break_point: Optional[AgentBreakpoint] = None,
snapshot: Optional[AgentSnapshot] = None,
system_prompt: Optional[str] = None,
tools: Optional[Union[list[Tool], Toolset, list[str]]] = None,
**kwargs: Any) -> dict[str, Any]
Process messages and execute tools until an exit condition is met.
Arguments:
messages
: List of Haystack ChatMessage objects to process.
streaming_callback
: A callback that will be invoked when a response is streamed from the LLM. The same callback can be configured to emit tool results when a tool is called.
break_point
: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint for "tool_invoker".
snapshot
: An AgentSnapshot of a previously saved agent execution. It contains the relevant information to restart the Agent execution from where it left off.
system_prompt
: System prompt for the agent. If provided, it overrides the default system prompt.
tools
: Optional list of Tool objects, a Toolset, or list of tool names to use for this run. When passing tool names, tools are selected from the Agent's originally configured tools.
kwargs
: Additional data to pass to the State schema used by the Agent. The keys must match the schema defined in the Agent's state_schema.
Raises:
RuntimeError
: If the Agent component wasn't warmed up before calling run().
BreakpointException
: If an agent breakpoint is triggered.
Returns:
A dictionary with the following keys:
- "messages": List of all messages exchanged during the agent's run.
- "last_message": The last message exchanged during the agent's run.
- Any additional keys defined in the state_schema.
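A minimal call sketch, assuming the agent constructed in the __init__ example above and an available OPENAI_API_KEY:

from haystack.dataclasses import ChatMessage

agent.warm_up()  # required before run() when the Agent is used outside a Pipeline
result = agent.run(messages=[ChatMessage.from_user("What is 2 + 3?")])

print(result["last_message"].text)  # final assistant reply
for message in result["messages"]:  # full conversation, including tool calls and tool results
    print(message.role, message.text)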
Agent.run_async
async def run_async(messages: list[ChatMessage],
streaming_callback: Optional[StreamingCallbackT] = None,
*,
break_point: Optional[AgentBreakpoint] = None,
snapshot: Optional[AgentSnapshot] = None,
system_prompt: Optional[str] = None,
tools: Optional[Union[list[Tool], Toolset, list[str]]] = None,
**kwargs: Any) -> dict[str, Any]
Asynchronously process messages and execute tools until the exit condition is met.
This is the asynchronous version of the run method. It follows the same logic but uses asynchronous operations where possible, such as calling the run_async method of the ChatGenerator if available.
Arguments:
messages
: List of Haystack ChatMessage objects to process.
streaming_callback
: An asynchronous callback that will be invoked when a response is streamed from the LLM. The same callback can be configured to emit tool results when a tool is called.
break_point
: An AgentBreakpoint, can be a Breakpoint for the "chat_generator" or a ToolBreakpoint for "tool_invoker".
snapshot
: An AgentSnapshot of a previously saved agent execution. It contains the relevant information to restart the Agent execution from where it left off.
system_prompt
: System prompt for the agent. If provided, it overrides the default system prompt.
tools
: Optional list of Tool objects, a Toolset, or list of tool names to use for this run.
kwargs
: Additional data to pass to the State schema used by the Agent. The keys must match the schema defined in the Agent's state_schema.
Raises:
RuntimeError
: If the Agent component wasn't warmed up before calling run_async().
BreakpointException
: If an agent breakpoint is triggered.
Returns:
A dictionary with the following keys:
- "messages": List of all messages exchanged during the agent's run.
- "last_message": The last message exchanged during the agent's run.
- Any additional keys defined in the state_schema.
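A sketch of the asynchronous variant, under the same assumptions as the run() example above:

import asyncio

from haystack.dataclasses import ChatMessage

async def main():
    agent.warm_up()  # same requirement as for run()
    result = await agent.run_async(
        messages=[ChatMessage.from_user("Find information about Haystack")]
    )
    print(result["last_message"].text)

asyncio.run(main())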
Agent.to_dict
def to_dict() -> dict[str, Any]
Serialize the component to a dictionary.
Returns:
Dictionary with serialized data
Agent.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Agent"
Deserialize the agent from a dictionary.
Arguments:
data
: Dictionary to deserialize from
Returns:
Deserialized agent
Module haystack_experimental.components.agents.human_in_the_loop.breakpoint
get_tool_calls_and_descriptions_from_snapshot
def get_tool_calls_and_descriptions_from_snapshot(
agent_snapshot: AgentSnapshot,
breakpoint_tool_only: bool = True
) -> tuple[list[dict], dict[str, str]]
Extract tool calls and tool descriptions from an AgentSnapshot.
By default, only the tool call that caused the breakpoint is processed and its arguments are reconstructed. This is useful for scenarios where you want to present the relevant tool call and its description to a human for confirmation before execution.
Arguments:
agent_snapshot
: The AgentSnapshot from which to extract tool calls and descriptions.
breakpoint_tool_only
: If True, only the tool call that caused the breakpoint is returned. If False, all tool calls are returned.
Returns:
A tuple containing a list of tool call dictionaries and a dictionary of tool descriptions
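A usage sketch, assuming agent_snapshot is an AgentSnapshot you restored from the snapshot file written when the breakpoint fired; the dictionary keys shown follow Haystack's ToolCall fields and are illustrative:

from haystack_experimental.components.agents.human_in_the_loop.breakpoint import (
    get_tool_calls_and_descriptions_from_snapshot,
)

# `agent_snapshot` is assumed to be an AgentSnapshot restored elsewhere.
tool_calls, tool_descriptions = get_tool_calls_and_descriptions_from_snapshot(
    agent_snapshot=agent_snapshot, breakpoint_tool_only=True
)

for call in tool_calls:
    name = call.get("tool_name")  # key names are illustrative
    print(f"Tool: {name}")
    print(f"Description: {tool_descriptions.get(name, '')}")
    print(f"Arguments: {call.get('arguments')}")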
Module haystack_experimental.components.agents.human_in_the_loop.dataclasses
ConfirmationUIResult
Result of the confirmation UI interaction.
Arguments:
action
: The action taken by the user, such as "confirm", "reject", or "modify". The action type is not enforced, which allows custom actions to be implemented.
feedback
: Optional feedback message from the user. For example, if the user rejects the tool execution, they might provide a reason for the rejection.
new_tool_params
: Optional set of new parameters for the tool. For example, if the user chooses to modify the tool parameters, they can provide a new set of parameters here.
action
: One of "confirm", "reject", or "modify" (custom actions are also possible).
ToolExecutionDecision
Decision made regarding tool execution.
Arguments:
tool_name
: The name of the tool to be executed.
execute
: A boolean indicating whether to execute the tool with the provided parameters.
tool_call_id
: Optional unique identifier for the tool call. This can be used to track and correlate the decision with a specific tool invocation.
feedback
: Optional feedback message. For example, if the tool execution is rejected, this can contain the reason. Or if the tool parameters were modified, this can contain the modification details.
final_tool_params
: Optional final parameters for the tool if execution is confirmed or modified.
ToolExecutionDecision.to_dict
def to_dict() -> dict[str, Any]
Convert the ToolExecutionDecision to a dictionary representation.
Returns:
A dictionary containing the tool execution decision details.
ToolExecutionDecision.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ToolExecutionDecision"
Populate the ToolExecutionDecision from a dictionary representation.
Arguments:
data
: A dictionary containing the tool execution decision details.
Returns:
An instance of ToolExecutionDecision.
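A round-trip sketch with illustrative values:

from haystack_experimental.components.agents.human_in_the_loop.dataclasses import (
    ToolExecutionDecision,
)

decision = ToolExecutionDecision(
    tool_name="search",
    execute=True,
    tool_call_id="call_123",  # illustrative id
    final_tool_params={"query": "Haystack agents"},
)

data = decision.to_dict()  # plain dict, e.g. for persisting alongside a snapshot
restored = ToolExecutionDecision.from_dict(data)
assert restored.tool_name == decision.tool_name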
Module haystack_experimental.components.agents.human_in_the_loop.errors
HITLBreakpointException
Exception raised when a tool execution is paused by a ConfirmationStrategy (e.g. BreakpointConfirmationStrategy).
HITLBreakpointException.__init__
def __init__(message: str,
tool_name: str,
snapshot_file_path: str,
tool_call_id: Optional[str] = None) -> None
Initialize the HITLBreakpointException.
Arguments:
message
: The exception message.
tool_name
: The name of the tool whose execution is paused.
snapshot_file_path
: The file path to the saved pipeline snapshot.
tool_call_id
: Optional unique identifier for the tool call. This can be used to track and correlate the decision with a specific tool invocation.
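For illustration, constructing the exception as a custom confirmation strategy might (all argument values are made up):

from haystack_experimental.components.agents.human_in_the_loop.errors import (
    HITLBreakpointException,
)

exc = HITLBreakpointException(
    message="Confirmation required before running 'search'.",
    tool_name="search",
    snapshot_file_path="snapshots/agent_snapshot.json",  # illustrative path
    tool_call_id="call_123",
)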
Module haystack_experimental.components.agents.human_in_the_loop.policies
AlwaysAskPolicy
Always ask for confirmation.
AlwaysAskPolicy.should_ask
def should_ask(tool_name: str, tool_description: str,
tool_params: dict[str, Any]) -> bool
Always ask for confirmation before executing the tool.
Arguments:
tool_name
: The name of the tool to be executed.
tool_description
: The description of the tool.
tool_params
: The parameters to be passed to the tool.
Returns:
Always returns True, indicating confirmation is needed.
NeverAskPolicy
Never ask for confirmation.
NeverAskPolicy.should_ask
def should_ask(tool_name: str, tool_description: str,
tool_params: dict[str, Any]) -> bool
Never ask for confirmation, always proceed with tool execution.
Arguments:
tool_name
: The name of the tool to be executed.
tool_description
: The description of the tool.
tool_params
: The parameters to be passed to the tool.
Returns:
Always returns False, indicating no confirmation is needed.
AskOncePolicy
Ask only once per tool with specific parameters.
AskOncePolicy.should_ask
def should_ask(tool_name: str, tool_description: str,
tool_params: dict[str, Any]) -> bool
Ask for confirmation only once per tool with specific parameters.
Arguments:
tool_name
: The name of the tool to be executed.
tool_description
: The description of the tool.
tool_params
: The parameters to be passed to the tool.
Returns:
True if confirmation is needed, False if already asked with the same parameters.
AskOncePolicy.update_after_confirmation
def update_after_confirmation(
tool_name: str, tool_description: str, tool_params: dict[str, Any],
confirmation_result: ConfirmationUIResult) -> None
Store the tool and parameters if the action was "confirm" to avoid asking again.
This method updates the internal state to remember that the user has already confirmed the execution of the tool with the given parameters.
Arguments:
tool_name
: The name of the tool that was executed.
tool_description
: The description of the tool.
tool_params
: The parameters that were passed to the tool.
confirmation_result
: The result from the confirmation UI.
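A sketch of the documented behavior: the policy asks the first time and, after a confirmed execution is recorded, stops asking for the same tool and parameters. The tool name, description, and parameters are illustrative.

from haystack_experimental.components.agents.human_in_the_loop.dataclasses import (
    ConfirmationUIResult,
)
from haystack_experimental.components.agents.human_in_the_loop.policies import AskOncePolicy

policy = AskOncePolicy()
params = {"query": "Haystack agents"}

print(policy.should_ask("search", "A web search tool.", params))  # True: not seen yet
policy.update_after_confirmation(
    "search", "A web search tool.", params, ConfirmationUIResult(action="confirm")
)
print(policy.should_ask("search", "A web search tool.", params))  # False: already confirmed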
Module haystack_experimental.components.agents.human_in_the_loop.strategies
BlockingConfirmationStrategy
Confirmation strategy that blocks execution to gather user feedback.
BlockingConfirmationStrategy.__init__
def __init__(confirmation_policy: ConfirmationPolicy,
confirmation_ui: ConfirmationUI) -> None
Initialize the BlockingConfirmationStrategy with a confirmation policy and UI.
Arguments:
confirmation_policy
: The confirmation policy to determine when to ask for user confirmation.
confirmation_ui
: The user interface to interact with the user for confirmation.
BlockingConfirmationStrategy.run
def run(tool_name: str,
tool_description: str,
tool_params: dict[str, Any],
tool_call_id: Optional[str] = None) -> ToolExecutionDecision
Run the human-in-the-loop strategy for a given tool and its parameters.
Arguments:
tool_name
: The name of the tool to be executed.
tool_description
: The description of the tool.
tool_params
: The parameters to be passed to the tool.
tool_call_id
: Optional unique identifier for the tool call. This can be used to track and correlate the decision with a specific tool invocation.
Returns:
A ToolExecutionDecision indicating whether to execute the tool with the given parameters, or a feedback message if rejected.
BlockingConfirmationStrategy.to_dict
def to_dict() -> dict[str, Any]
Serializes the BlockingConfirmationStrategy to a dictionary.
Returns:
Dictionary with serialized data.
BlockingConfirmationStrategy.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "BlockingConfirmationStrategy"
Deserializes the BlockingConfirmationStrategy from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized BlockingConfirmationStrategy.
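A direct-use sketch; the tool name, description, and parameters are illustrative. Inside an Agent the same strategy is normally passed via confirmation_strategies rather than called by hand:

from haystack_experimental.components.agents.human_in_the_loop.policies import AlwaysAskPolicy
from haystack_experimental.components.agents.human_in_the_loop.strategies import (
    BlockingConfirmationStrategy,
)
from haystack_experimental.components.agents.human_in_the_loop.user_interfaces import (
    SimpleConsoleUI,
)

strategy = BlockingConfirmationStrategy(
    confirmation_policy=AlwaysAskPolicy(), confirmation_ui=SimpleConsoleUI()
)

# Blocks on a console prompt and returns a ToolExecutionDecision.
decision = strategy.run(
    tool_name="search",
    tool_description="A tool for searching the web.",
    tool_params={"query": "Haystack agents"},
    tool_call_id="call_123",  # illustrative id
)
print(decision.execute, decision.final_tool_params, decision.feedback)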
BreakpointConfirmationStrategy
Confirmation strategy that raises a tool breakpoint exception to pause execution and gather user feedback.
This strategy is designed for scenarios where immediate user interaction is not possible.
When a tool execution requires confirmation, it raises an HITLBreakpointException, which is caught by the Agent. The Agent then serializes its current state, including the tool call details. This information can then be used to notify a user to review and confirm the tool execution.
BreakpointConfirmationStrategy.__init__
def __init__(snapshot_file_path: str) -> None
Initialize the BreakpointConfirmationStrategy.
Arguments:
snapshot_file_path
: The path to the directory where the snapshot should be saved.
BreakpointConfirmationStrategy.run
def run(tool_name: str,
tool_description: str,
tool_params: dict[str, Any],
tool_call_id: Optional[str] = None) -> ToolExecutionDecision
Run the breakpoint confirmation strategy for a given tool and its parameters.
Arguments:
tool_name
: The name of the tool to be executed.
tool_description
: The description of the tool.
tool_params
: The parameters to be passed to the tool.
tool_call_id
: Optional unique identifier for the tool call. This can be used to track and correlate the decision with a specific tool invocation.
Raises:
HITLBreakpointException
: Always raised, to signal that user confirmation is required.
Returns:
This method does not return; it always raises an exception.
BreakpointConfirmationStrategy.to_dict
def to_dict() -> dict[str, Any]
Serializes the BreakpointConfirmationStrategy to a dictionary.
BreakpointConfirmationStrategy.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "BreakpointConfirmationStrategy"
Deserializes the BreakpointConfirmationStrategy from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized BreakpointConfirmationStrategy.
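A wiring sketch, assuming search_tool is a Tool defined elsewhere; the snapshot directory name is illustrative:

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack_experimental.components.agents import Agent
from haystack_experimental.components.agents.human_in_the_loop.strategies import (
    BreakpointConfirmationStrategy,
)

agent = Agent(
    chat_generator=OpenAIChatGenerator(),
    tools=[search_tool],  # assumes `search_tool` is defined elsewhere
    confirmation_strategies={
        # Instead of blocking for input, pause the run and write a snapshot
        # into the given directory whenever `search` is about to execute.
        search_tool.name: BreakpointConfirmationStrategy(snapshot_file_path="snapshots"),
    },
)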
Module haystack_experimental.components.agents.human_in_the_loop.types
ConfirmationUI
Base class for confirmation UIs.
ConfirmationUI.get_user_confirmation
def get_user_confirmation(tool_name: str, tool_description: str,
tool_params: dict[str, Any]) -> ConfirmationUIResult
Get user confirmation for tool execution.
ConfirmationUI.to_dict
def to_dict() -> dict[str, Any]
Serialize the UI to a dictionary.
ConfirmationUI.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ConfirmationUI"
Deserialize the ConfirmationUI from a dictionary.
ConfirmationPolicy
Base class for confirmation policies.
ConfirmationPolicy.should_ask
def should_ask(tool_name: str, tool_description: str,
tool_params: dict[str, Any]) -> bool
Determine whether to ask for confirmation.
ConfirmationPolicy.update_after_confirmation
def update_after_confirmation(
tool_name: str, tool_description: str, tool_params: dict[str, Any],
confirmation_result: ConfirmationUIResult) -> None
Update the policy based on the confirmation UI result.
ConfirmationPolicy.to_dict
def to_dict() -> dict[str, Any]
Serialize the policy to a dictionary.
ConfirmationPolicy.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ConfirmationPolicy"
Deserialize the policy from a dictionary.
ConfirmationStrategy
Base class for confirmation strategies.
ConfirmationStrategy.run
def run(tool_name: str,
tool_description: str,
tool_params: dict[str, Any],
tool_call_id: Optional[str] = None) -> ToolExecutionDecision
Run the confirmation strategy for a given tool and its parameters.
Arguments:
tool_name
: The name of the tool to be executed.
tool_description
: The description of the tool.
tool_params
: The parameters to be passed to the tool.
tool_call_id
: Optional unique identifier for the tool call. This can be used to track and correlate the decision with a specific tool invocation.
Returns:
The result of the confirmation strategy (e.g., tool output, rejection message, etc.).
ConfirmationStrategy.to_dict
def to_dict() -> dict[str, Any]
Serialize the strategy to a dictionary.
ConfirmationStrategy.from_dict
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "ConfirmationStrategy"
Deserialize the strategy from a dictionary.
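A sketch of a custom strategy that implements this interface; the auto-approve behavior and the serialization format are illustrative choices, not part of the library:

from typing import Any, Optional

from haystack_experimental.components.agents.human_in_the_loop.dataclasses import (
    ToolExecutionDecision,
)
from haystack_experimental.components.agents.human_in_the_loop.types import ConfirmationStrategy

class AutoApproveStrategy(ConfirmationStrategy):
    """Approves every tool call without asking the user (illustrative only)."""

    def run(
        self,
        tool_name: str,
        tool_description: str,
        tool_params: dict[str, Any],
        tool_call_id: Optional[str] = None,
    ) -> ToolExecutionDecision:
        # Approve execution with the parameters proposed by the LLM.
        return ToolExecutionDecision(
            tool_name=tool_name,
            execute=True,
            tool_call_id=tool_call_id,
            final_tool_params=tool_params,
        )

    def to_dict(self) -> dict[str, Any]:
        # Minimal serialization format chosen for this sketch.
        return {"type": f"{type(self).__module__}.{type(self).__name__}", "init_parameters": {}}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "AutoApproveStrategy":
        return cls()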
Module haystack_experimental.components.agents.human_in_the_loop.user_interfaces
RichConsoleUI
Rich console interface for user interaction.
RichConsoleUI.get_user_confirmation
def get_user_confirmation(tool_name: str, tool_description: str,
tool_params: dict[str, Any]) -> ConfirmationUIResult
Get user confirmation for tool execution via rich console prompts.
Arguments:
tool_name
: The name of the tool to be executed.
tool_description
: The description of the tool.
tool_params
: The parameters to be passed to the tool.
Returns:
ConfirmationUIResult based on user input.
RichConsoleUI.to_dict
def to_dict() -> dict[str, Any]
Serializes the RichConsoleUI to a dictionary.
Returns:
Dictionary with serialized data.
SimpleConsoleUI
Simple console interface using standard input/output.
SimpleConsoleUI.get_user_confirmation
def get_user_confirmation(tool_name: str, tool_description: str,
tool_params: dict[str, Any]) -> ConfirmationUIResult
Get user confirmation for tool execution via simple console prompts.
Arguments:
tool_name
: The name of the tool to be executed.
tool_description
: The description of the tool.
tool_params
: The parameters to be passed to the tool.
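A direct-use sketch with illustrative tool details; inside an Agent the UI is normally driven by a BlockingConfirmationStrategy rather than called directly:

from haystack_experimental.components.agents.human_in_the_loop.user_interfaces import (
    SimpleConsoleUI,
)

ui = SimpleConsoleUI()
# Prompts on stdin/stdout and returns a ConfirmationUIResult.
result = ui.get_user_confirmation(
    tool_name="search",
    tool_description="A tool for searching the web.",
    tool_params={"query": "Haystack agents"},
)
print(result.action, result.feedback, result.new_tool_params)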