Cohere integration for Haystack
Module haystack_integrations.components.embedders.cohere.document_embedder
CohereDocumentEmbedder
A component for computing Document embeddings using Cohere models.
The embedding of each Document is stored in the `embedding` field of the Document.
Usage example:
from haystack import Document
from haystack_integrations.components.embedders.cohere import CohereDocumentEmbedder
doc = Document(content="I love pizza!")
document_embedder = CohereDocumentEmbedder()
result = document_embedder.run([doc])
print(result['documents'][0].embedding)
# [-0.453125, 1.2236328, 2.0058594, ...]
CohereDocumentEmbedder.__init__
def __init__(api_key: Secret = Secret.from_env_var(
["COHERE_API_KEY", "CO_API_KEY"]),
model: str = "embed-english-v2.0",
input_type: str = "search_document",
api_base_url: str = "https://api.cohere.com",
truncate: str = "END",
use_async_client: bool = False,
timeout: int = 120,
batch_size: int = 32,
progress_bar: bool = True,
meta_fields_to_embed: Optional[List[str]] = None,
embedding_separator: str = "\n",
embedding_type: Optional[EmbeddingTypes] = None)
Arguments:
- `api_key`: The Cohere API key.
- `model`: The name of the model to use. Supported models are: "embed-english-v3.0", "embed-english-light-v3.0", "embed-multilingual-v3.0", "embed-multilingual-light-v3.0", "embed-english-v2.0", "embed-english-light-v2.0", "embed-multilingual-v2.0". The full list of supported models can be found in the Cohere model documentation.
- `input_type`: Specifies the type of input you're giving to the model. Supported values are "search_document", "search_query", "classification", and "clustering". Optional for the v2 embedding models; required for v3 models and newer.
- `api_base_url`: The Cohere API base URL.
- `truncate`: How to handle inputs longer than the maximum token length, one of "NONE", "START", or "END". "START" discards the beginning of the input and "END" discards the end; in both cases, input is discarded until the remainder fits the model's maximum input token length exactly. With "NONE", an input that exceeds the maximum input token length returns an error.
- `use_async_client`: Flag to select the AsyncClient. Using the AsyncClient is recommended for applications with many concurrent calls.
- `timeout`: Request timeout in seconds.
- `batch_size`: Number of Documents to encode at once.
- `progress_bar`: Whether to show a progress bar. Disabling it in production deployments helps keep the logs clean.
- `meta_fields_to_embed`: List of meta fields that should be embedded along with the Document text (see the sketch after this list).
- `embedding_separator`: Separator used to concatenate the meta fields to the Document text.
- `embedding_type`: The type of embeddings to return. Defaults to float embeddings. Note that int8, uint8, binary, and ubinary embeddings are only valid for v3 models.
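For illustration, here is a minimal sketch of configuring the embedder for a v3 model with metadata embedding; the model name and the "title" meta field are examples, not requirements:
from haystack import Document
from haystack_integrations.components.embedders.cohere import CohereDocumentEmbedder
# Assumes COHERE_API_KEY is set in the environment.
embedder = CohereDocumentEmbedder(
    model="embed-english-v3.0",
    input_type="search_document",
    meta_fields_to_embed=["title"],
    embedding_separator="\n",
    batch_size=16,
)
doc = Document(content="Pizza is made with dough and cheese.", meta={"title": "Pizza"})
result = embedder.run([doc])
print(len(result["documents"][0].embedding))  # embedding dimensionality of the chosen model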
CohereDocumentEmbedder.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
CohereDocumentEmbedder.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "CohereDocumentEmbedder"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
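As a quick illustration, a minimal round trip through to_dict and from_dict (the exact dictionary layout is an implementation detail):
from haystack_integrations.components.embedders.cohere import CohereDocumentEmbedder
embedder = CohereDocumentEmbedder(model="embed-english-v3.0")
data = embedder.to_dict()  # plain dict, safe to persist as JSON or YAML
restored = CohereDocumentEmbedder.from_dict(data)
assert restored.to_dict() == data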
CohereDocumentEmbedder.run
@component.output_types(documents=List[Document], meta=Dict[str, Any])
def run(documents: List[Document])
Embed a list of Documents.
Arguments:
- `documents`: Documents to embed.
Raises:
- `TypeError`: If the input is not a list of Documents.
Returns:
A dictionary with the following keys:
- `documents`: Documents with the `embedding` field set.
- `meta`: Metadata about the embedding process.
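A common place for run is inside an indexing pipeline. The sketch below assumes the in-memory document store shipped with Haystack and writes embedded Documents into it:
from haystack import Document, Pipeline
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack_integrations.components.embedders.cohere import CohereDocumentEmbedder
document_store = InMemoryDocumentStore()
indexing = Pipeline()
indexing.add_component("embedder", CohereDocumentEmbedder())
indexing.add_component("writer", DocumentWriter(document_store=document_store))
indexing.connect("embedder.documents", "writer.documents")
indexing.run({"embedder": {"documents": [Document(content="I love pizza!")]}})
print(document_store.count_documents())  # 1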
Module haystack_integrations.components.embedders.cohere.text_embedder
CohereTextEmbedder
A component for embedding strings using Cohere models.
Usage example:
from haystack_integrations.components.embedders.cohere import CohereTextEmbedder
text_to_embed = "I love pizza!"
text_embedder = CohereTextEmbedder()
print(text_embedder.run(text_to_embed))
# {'embedding': [-0.453125, 1.2236328, 2.0058594, ...],
# 'meta': {'api_version': {'version': '1'}, 'billed_units': {'input_tokens': 4}}}
CohereTextEmbedder.__init__
def __init__(api_key: Secret = Secret.from_env_var(
["COHERE_API_KEY", "CO_API_KEY"]),
model: str = "embed-english-v2.0",
input_type: str = "search_query",
api_base_url: str = "https://api.cohere.com",
truncate: str = "END",
use_async_client: bool = False,
timeout: int = 120,
embedding_type: Optional[EmbeddingTypes] = None)
Arguments:
- `api_key`: The Cohere API key.
- `model`: The name of the model to use. Supported models are: "embed-english-v3.0", "embed-english-light-v3.0", "embed-multilingual-v3.0", "embed-multilingual-light-v3.0", "embed-english-v2.0", "embed-english-light-v2.0", "embed-multilingual-v2.0". The full list of supported models can be found in the Cohere model documentation.
- `input_type`: Specifies the type of input you're giving to the model. Supported values are "search_document", "search_query", "classification", and "clustering". Optional for the v2 embedding models; required for v3 models and newer (a query-pipeline sketch follows this list).
- `api_base_url`: The Cohere API base URL.
- `truncate`: How to handle inputs longer than the maximum token length, one of "NONE", "START", or "END". "START" discards the beginning of the input and "END" discards the end; in both cases, input is discarded until the remainder fits the model's maximum input token length exactly. With "NONE", an input that exceeds the maximum input token length returns an error.
- `use_async_client`: Flag to select the AsyncClient. Using the AsyncClient is recommended for applications with many concurrent calls.
- `timeout`: Request timeout in seconds.
- `embedding_type`: The type of embeddings to return. Defaults to float embeddings. Note that int8, uint8, binary, and ubinary embeddings are only valid for v3 models.
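The text embedder is typically paired with a CohereDocumentEmbedder that uses the same model: documents get input_type="search_document" at indexing time and queries get input_type="search_query" at query time. A sketch of the query side, assuming an InMemoryDocumentStore already populated with Cohere-embedded Documents:
from haystack import Pipeline
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack_integrations.components.embedders.cohere import CohereTextEmbedder
document_store = InMemoryDocumentStore()  # assumed to hold Cohere-embedded Documents
query_pipeline = Pipeline()
query_pipeline.add_component("text_embedder", CohereTextEmbedder())
query_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
results = query_pipeline.run({"text_embedder": {"text": "What do I love?"}})
print(results["retriever"]["documents"])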
CohereTextEmbedder.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
CohereTextEmbedder.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "CohereTextEmbedder"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
CohereTextEmbedder.run
@component.output_types(embedding=List[float], meta=Dict[str, Any])
def run(text: str)
Embed text.
Arguments:
- `text`: The text to embed.
Raises:
- `TypeError`: If the input is not a string.
Returns:
A dictionary with the following keys:
- `embedding`: The embedding of the text.
- `meta`: Metadata about the request.
Module haystack_integrations.components.embedders.cohere.utils
get_async_response
async def get_async_response(cohere_async_client: AsyncClientV2,
texts: List[str],
model_name,
input_type,
truncate,
embedding_type: Optional[EmbeddingTypes] = None)
Embeds a list of texts asynchronously using the Cohere API.
Arguments:
- `cohere_async_client`: The Cohere `AsyncClientV2`.
- `texts`: The texts to embed.
- `model_name`: The name of the model to use.
- `input_type`: One of "classification", "clustering", "search_document", "search_query". The type of input text provided to embed.
- `truncate`: One of "NONE", "START", "END". How the API handles text longer than the maximum token length.
- `embedding_type`: The type of embeddings to return. Defaults to float embeddings.
Raises:
ValueError
: If an error occurs while querying the Cohere API.
Returns:
A tuple of the embeddings and metadata.
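For illustration, a minimal sketch of calling get_async_response directly; it assumes cohere.AsyncClientV2 from the official Cohere SDK and an API key in the environment:
import asyncio
import os
import cohere
from haystack_integrations.components.embedders.cohere.utils import get_async_response

async def main():
    client = cohere.AsyncClientV2(api_key=os.environ["COHERE_API_KEY"])
    embeddings, meta = await get_async_response(
        cohere_async_client=client,
        texts=["I love pizza!"],
        model_name="embed-english-v3.0",
        input_type="search_document",
        truncate="END",
    )
    print(len(embeddings), meta)

asyncio.run(main())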
get_response
def get_response(
cohere_client: ClientV2,
texts: List[str],
model_name,
input_type,
truncate,
batch_size=32,
progress_bar=False,
embedding_type: Optional[EmbeddingTypes] = None
) -> Tuple[List[List[float]], Dict[str, Any]]
Embeds a list of texts using the Cohere API.
Arguments:
- `cohere_client`: The Cohere `ClientV2`.
- `texts`: The texts to embed.
- `model_name`: The name of the model to use.
- `input_type`: One of "classification", "clustering", "search_document", "search_query". The type of input text provided to embed.
- `truncate`: One of "NONE", "START", "END". How the API handles text longer than the maximum token length.
- `batch_size`: The batch size to use.
- `progress_bar`: If `True`, show a progress bar.
- `embedding_type`: The type of embeddings to return. Defaults to float embeddings.
Raises:
ValueError
: If an error occurs while querying the Cohere API.
Returns:
A tuple of the embeddings and metadata.
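The synchronous variant works the same way; a sketch assuming cohere.ClientV2 from the official Cohere SDK:
import os
import cohere
from haystack_integrations.components.embedders.cohere.utils import get_response

client = cohere.ClientV2(api_key=os.environ["COHERE_API_KEY"])
embeddings, meta = get_response(
    cohere_client=client,
    texts=["I love pizza!", "I love pasta!"],
    model_name="embed-english-v3.0",
    input_type="search_document",
    truncate="END",
    batch_size=32,
    progress_bar=True,
)
print(len(embeddings), meta)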
Module haystack_integrations.components.generators.cohere.generator
CohereGenerator
Generates text using Cohere's models through Cohere's `generate` endpoint.
NOTE: Cohere discontinued the `generate` API, so this generator is a mere wrapper around `CohereChatGenerator`, provided for backward compatibility.
Usage example
from haystack.utils import Secret
from haystack_integrations.components.generators.cohere import CohereGenerator
generator = CohereGenerator(api_key=Secret.from_token("test-api-key"))
generator.run(prompt="What's the capital of France?")
CohereGenerator.__init__
def __init__(api_key: Secret = Secret.from_env_var(
["COHERE_API_KEY", "CO_API_KEY"]),
model: str = "command-r",
streaming_callback: Optional[Callable] = None,
api_base_url: Optional[str] = None,
**kwargs)
Instantiates a `CohereGenerator` component.
Arguments:
- `api_key`: Cohere API key.
- `model`: Cohere model to use for generation.
- `streaming_callback`: Callback function called when a new token is received from the stream. The callback function accepts `StreamingChunk` as an argument (see the sketch after this list).
- `api_base_url`: Cohere base URL.
- `**kwargs`: Additional arguments passed to the model. These arguments are model-specific; you can check them in the model's documentation.
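As an illustration of streaming_callback, a sketch that prints tokens as they arrive, assuming StreamingChunk.content holds the token text as in Haystack's dataclass:
from haystack.dataclasses import StreamingChunk
from haystack_integrations.components.generators.cohere import CohereGenerator

def print_chunk(chunk: StreamingChunk) -> None:
    # Print each streamed token as soon as it arrives.
    print(chunk.content, end="", flush=True)

generator = CohereGenerator(model="command-r", streaming_callback=print_chunk)
result = generator.run(prompt="What's the capital of France?")
print()
print(result["replies"])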
CohereGenerator.run
@component.output_types(replies=List[str], meta=List[Dict[str, Any]])
def run(prompt: str)
Queries the LLM with the prompts to produce replies.
Arguments:
- `prompt`: The prompt to be sent to the generative model.
Returns:
A dictionary with the following keys:
- `replies`: A list of replies generated by the model.
- `meta`: Information about the request.
Module haystack_integrations.components.generators.cohere.chat.chat_generator
CohereChatGenerator
Completes chats using Cohere's models through the `cohere.ClientV2` chat endpoint.
You can customize how the chat response is generated by passing parameters to the Cohere API through the `generation_kwargs` parameter. You can do this when initializing or running the component. Any parameter that works with `cohere.ClientV2.chat` will work here too.
For details, see the Cohere API documentation.
Below is an example of how to use the component:
Simple example
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret
from haystack_integrations.components.generators.cohere import CohereChatGenerator
client = CohereChatGenerator(model="command-r", api_key=Secret.from_env_var("COHERE_API_KEY"))
messages = [ChatMessage.from_user("What's Natural Language Processing?")]
client.run(messages)
# Output: {'replies': [ChatMessage(_role=<ChatRole.ASSISTANT: 'assistant'>,
# _content=[TextContent(text='Natural Language Processing (NLP) is an interdisciplinary...
Advanced example
CohereChatGenerator can be integrated into pipelines and supports Haystack's tooling architecture, enabling tools to be invoked seamlessly across various generators.
from haystack import Pipeline
from haystack.dataclasses import ChatMessage
from haystack.components.tools import ToolInvoker
from haystack.tools import Tool
from haystack_integrations.components.generators.cohere import CohereChatGenerator
# Create a weather tool
def weather(city: str) -> str:
return f"The weather in {city} is sunny and 32°C"
weather_tool = Tool(
name="weather",
description="useful to determine the weather in a given location",
parameters={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "The name of the city to get weather for, e.g. Paris, London",
}
},
"required": ["city"],
},
function=weather,
)
# Create and set up the pipeline
pipeline = Pipeline()
pipeline.add_component("generator", CohereChatGenerator(model="command-r", tools=[weather_tool]))
pipeline.add_component("tool_invoker", ToolInvoker(tools=[weather_tool]))
pipeline.connect("generator", "tool_invoker")
# Run the pipeline with a weather query
results = pipeline.run(
data={"generator": {"messages": [ChatMessage.from_user("What's the weather like in Paris?")]}}
)
# The tool result will be available in the pipeline output
print(results["tool_invoker"]["tool_messages"][0].tool_call_result.result)
# Output: "The weather in Paris is sunny and 32°C"
CohereChatGenerator.__init__
def __init__(api_key: Secret = Secret.from_env_var(
["COHERE_API_KEY", "CO_API_KEY"]),
model: str = "command-r",
streaming_callback: Optional[Callable[[StreamingChunk],
None]] = None,
api_base_url: Optional[str] = None,
generation_kwargs: Optional[Dict[str, Any]] = None,
tools: Optional[List[Tool]] = None,
**kwargs)
Initialize the CohereChatGenerator instance.
Arguments:
- `api_key`: The API key for the Cohere API.
- `model`: The name of the model to use. You can use models from the `command` family.
- `streaming_callback`: A callback function called when a new token is received from the stream. The callback function accepts `StreamingChunk` as an argument.
- `api_base_url`: The base URL of the Cohere API.
- `generation_kwargs`: Other parameters to use for the model during generation. For a list of parameters, see the Cohere Chat endpoint documentation (a sketch follows this list). Some of the parameters are:
  - 'messages': A list of messages between the user and the model, meant to give the model conversational context for responding to the user's message.
  - 'system_message': When specified, adds a system message at the beginning of the conversation.
  - 'citation_quality': Defaults to "accurate". Dictates the approach taken to generating citations as part of the RAG flow by letting the user choose between "accurate" results and "fast" results.
  - 'temperature': A non-negative float that tunes the degree of randomness in generation. Lower temperatures mean less random generations.
- `tools`: A list of Tool objects that the model can use. Each tool must have a unique name.
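A short sketch of passing generation_kwargs at initialization and overriding one of them at run time; the parameter names follow the Cohere Chat API, and the values are examples:
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.generators.cohere import CohereChatGenerator

# Defaults set at initialization ...
generator = CohereChatGenerator(
    model="command-r",
    generation_kwargs={"temperature": 0.3, "max_tokens": 256},
)
# ... can be overridden per call.
result = generator.run(
    messages=[ChatMessage.from_user("Summarize NLP in one sentence.")],
    generation_kwargs={"temperature": 0.0},
)
print(result["replies"][0].text)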
CohereChatGenerator.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
CohereChatGenerator.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "CohereChatGenerator"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
CohereChatGenerator.run
@component.output_types(replies=List[ChatMessage])
def run(messages: List[ChatMessage],
generation_kwargs: Optional[Dict[str, Any]] = None,
tools: Optional[List[Tool]] = None)
Invoke the chat endpoint based on the provided messages and generation parameters.
Arguments:
- `messages`: List of `ChatMessage` instances representing the input messages.
- `generation_kwargs`: Additional keyword arguments for chat generation. These parameters can override the parameters passed in the init method. For more details on the parameters supported by the Cohere API, refer to the Cohere documentation.
- `tools`: A list of tools for which the model can prepare calls. If set, it overrides the `tools` parameter set during component initialization.
Returns:
A dictionary with the following keys:
- `replies`: A list of `ChatMessage` instances representing the generated responses.
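To continue a conversation after a tool call, the tool result can be fed back as messages. A self-contained sketch mirroring the weather tool from the advanced example above; the exact message bookkeeping may differ in your application:
from haystack.components.tools import ToolInvoker
from haystack.dataclasses import ChatMessage
from haystack.tools import Tool
from haystack_integrations.components.generators.cohere import CohereChatGenerator

def weather(city: str) -> str:
    return f"The weather in {city} is sunny and 32°C"

weather_tool = Tool(
    name="weather",
    description="useful to determine the weather in a given location",
    parameters={
        "type": "object",
        "properties": {"city": {"type": "string", "description": "The name of the city"}},
        "required": ["city"],
    },
    function=weather,
)
generator = CohereChatGenerator(model="command-r", tools=[weather_tool])
tool_invoker = ToolInvoker(tools=[weather_tool])
messages = [ChatMessage.from_user("What's the weather like in Paris?")]
replies = generator.run(messages=messages)["replies"]  # the model emits a tool call
tool_messages = tool_invoker.run(messages=replies)["tool_messages"]
# Second pass: the model sees the tool result and produces a final answer.
final = generator.run(messages=messages + replies + tool_messages)["replies"]
print(final[0].text)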
Module haystack_integrations.components.rankers.cohere.ranker
CohereRanker
Ranks Documents based on their similarity to the query using Cohere models.
The returned Documents are ordered from most to least semantically relevant to the query.
Usage example:
from haystack import Document
from haystack_integrations.components.rankers.cohere import CohereRanker
ranker = CohereRanker(model="rerank-english-v2.0", top_k=2)
docs = [Document(content="Paris"), Document(content="Berlin")]
query = "What is the capital of Germany?"
output = ranker.run(query=query, documents=docs)
docs = output["documents"]
CohereRanker.__init__
def __init__(model: str = "rerank-english-v2.0",
top_k: int = 10,
api_key: Secret = Secret.from_env_var(
["COHERE_API_KEY", "CO_API_KEY"]),
api_base_url: str = "https://api.cohere.com",
max_chunks_per_doc: Optional[int] = None,
meta_fields_to_embed: Optional[List[str]] = None,
meta_data_separator: str = "\n",
max_tokens_per_doc: int = 4096)
Creates an instance of `CohereRanker`.
Arguments:
- `model`: Cohere model name. Check the list of supported models in the Cohere documentation.
- `top_k`: The maximum number of documents to return.
- `api_key`: Cohere API key.
- `api_base_url`: The base URL of the Cohere API.
- `max_chunks_per_doc`: If your document exceeds 512 tokens, this determines the maximum number of chunks a document can be split into. If `None`, the default of 10 is used. For example, if your document is 6,000 tokens, with the default of 10 the document is split into 10 chunks of 512 tokens each and the last 880 tokens are disregarded. Check the Cohere docs for more information.
- `meta_fields_to_embed`: List of meta fields that should be concatenated with the document content for reranking (see the sketch after this list).
- `meta_data_separator`: Separator used to concatenate the meta fields to the Document content.
- `max_tokens_per_doc`: The maximum number of tokens to embed for each document. Defaults to 4096.
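As an illustration of meta_fields_to_embed, a sketch that reranks on a "title" meta field plus the content; the field name is an example:
from haystack import Document
from haystack_integrations.components.rankers.cohere import CohereRanker

ranker = CohereRanker(
    model="rerank-english-v2.0",
    top_k=3,
    meta_fields_to_embed=["title"],  # prepend the "title" meta field to each document's content
    meta_data_separator="\n",
)
docs = [
    Document(content="The capital is Berlin.", meta={"title": "Germany"}),
    Document(content="The capital is Paris.", meta={"title": "France"}),
]
print(ranker.run(query="capital of Germany", documents=docs)["documents"][0].meta["title"])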
CohereRanker.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
CohereRanker.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "CohereRanker"
Deserializes the component from a dictionary.
Arguments:
data
: The dictionary to deserialize from.
Returns:
The deserialized component.
CohereRanker.run
@component.output_types(documents=List[Document])
def run(query: str, documents: List[Document], top_k: Optional[int] = None)
Use the Cohere Reranker to re-rank the list of documents based on the query.
Arguments:
- `query`: Query string.
- `documents`: List of Documents.
- `top_k`: The maximum number of Documents you want the Ranker to return.
Raises:
- `ValueError`: If `top_k` is not > 0.
Returns:
A dictionary with the following keys:
- `documents`: List of Documents most similar to the given query, in descending order of similarity.
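A typical placement for run is after a first-stage retriever in a query pipeline. A sketch using Haystack's in-memory BM25 retriever as the first stage:
from haystack import Document, Pipeline
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack_integrations.components.rankers.cohere import CohereRanker

document_store = InMemoryDocumentStore()
document_store.write_documents([Document(content="Paris"), Document(content="Berlin")])
pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("ranker", CohereRanker(model="rerank-english-v2.0", top_k=2))
pipeline.connect("retriever.documents", "ranker.documents")
query = "What is the capital of Germany?"
results = pipeline.run({"retriever": {"query": query}, "ranker": {"query": query}})
print(results["ranker"]["documents"])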