Langfuse integration for Haystack
Module haystack_integrations.components.connectors.langfuse.langfuse_connector
LangfuseConnector
LangfuseConnector connects the Haystack LLM framework with Langfuse to enable tracing of operations and data flow within the components of a pipeline.
Simply add this component to your pipeline; do not connect it to any other component. The LangfuseConnector will automatically trace the operations and data flow within the pipeline.
Note that you need to set the LANGFUSE_SECRET_KEY and LANGFUSE_PUBLIC_KEY environment variables to use this component. These are the secret and public keys provided by Langfuse; you can get them by signing up for an account on the Langfuse website.
In addition, you need to set the HAYSTACK_CONTENT_TRACING_ENABLED environment variable to true to enable Haystack tracing in your pipeline.
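For example, the variables can be set from Python before the pipeline is built (the key values below are placeholders; real keys come from your Langfuse project settings):

```python
import os

# Placeholder values; substitute the keys from your Langfuse project settings.
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."

# Set this before importing Haystack, as in the full example below,
# so that content tracing is active when the pipeline runs.
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
```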
Lastly, you may disable flushing the data after each component by setting the HAYSTACK_LANGFUSE_ENFORCE_FLUSH environment variable to false. By default, the data is flushed after each component, blocking the thread until the data is sent to Langfuse. Caution: disabling this feature may result in data loss if the program crashes before the data is sent to Langfuse. Make sure to call langfuse.flush() explicitly before the program exits.
E.g. by using tracer.actual_tracer.flush():

```python
from haystack.tracing import tracer

try:
    ...  # your code here
finally:
    tracer.actual_tracer.flush()
```
or, in FastAPI, by defining a shutdown event handler:

```python
from haystack.tracing import tracer

# ...

@app.on_event("shutdown")
async def shutdown_event():
    tracer.actual_tracer.flush()
```
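Another option (a sketch, not from the Langfuse docs) is registering the flush call with the standard library's atexit module; a stand-in client is used below so the snippet runs without Haystack installed:

```python
import atexit

# Stand-in for `tracer.actual_tracer` (a langfuse.Langfuse instance) so the
# snippet is self-contained; in a real app you would register
# `tracer.actual_tracer.flush` directly.
class StandInClient:
    def __init__(self) -> None:
        self.flush_calls = 0

    def flush(self) -> None:
        self.flush_calls += 1

client = StandInClient()
atexit.register(client.flush)  # runs on normal interpreter shutdown

# For demonstration, trigger the handler manually instead of at exit:
atexit.unregister(client.flush)
client.flush()
print(client.flush_calls)  # 1
```

Note that atexit handlers do not run on hard crashes (e.g. SIGKILL), so flushing after each component remains the safer default.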
Here is an example of how to use it:

```python
import os

os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.connectors.langfuse import (
    LangfuseConnector,
)

if __name__ == "__main__":
    pipe = Pipeline()
    pipe.add_component("tracer", LangfuseConnector("Chat example"))
    pipe.add_component("prompt_builder", ChatPromptBuilder())
    pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))
    pipe.connect("prompt_builder.prompt", "llm.messages")

    messages = [
        ChatMessage.from_system(
            "Always respond in German even if some input data is in other languages."
        ),
        ChatMessage.from_user("Tell me about {{location}}"),
    ]

    response = pipe.run(
        data={
            "prompt_builder": {
                "template_variables": {"location": "Berlin"},
                "template": messages,
            }
        }
    )
    print(response["llm"]["replies"][0])
    print(response["tracer"]["trace_url"])
```
LangfuseConnector.__init__
def __init__(name: str, public: bool = False)
Initialize the LangfuseConnector component.
Arguments:

- `name`: The name of the pipeline or component. This name will be used to identify the tracing run on the Langfuse dashboard.
- `public`: Whether the tracing data should be public or private. If set to `True`, the tracing data will be publicly accessible to anyone with the tracing URL. If set to `False`, the tracing data will be private and only accessible to the Langfuse account owner. The default is `False`.
LangfuseConnector.run
@component.output_types(name=str, trace_url=str)
def run(invocation_context: Optional[Dict[str, Any]] = None)
Runs the LangfuseConnector component.
Arguments:

- `invocation_context`: A dictionary with additional context for the invocation. This parameter is useful when users want to mark this particular invocation with additional information, e.g. a run id from their own execution framework, a user id, etc. These key-value pairs are then visible in the Langfuse traces.

Returns:

A dictionary with the following keys:

- `name`: The name of the tracing component.
- `trace_url`: The URL to the tracing data.
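For instance (a sketch; `invocation_context` is the documented parameter, while the key names and the component name "tracer" are illustrative), the context is passed to the connector through the pipeline's run data:

```python
# Keys inside invocation_context are user-defined; they show up as
# key-value pairs on the Langfuse trace.
invocation_context = {
    "run_id": "eval-2024-06-01-07",  # hypothetical id from your own framework
    "user_id": "user-1234",          # hypothetical user identifier
}

# Routed to the connector under the name it was registered with
# ("tracer" in the example above):
run_data = {"tracer": {"invocation_context": invocation_context}}
print(run_data["tracer"]["invocation_context"]["run_id"])  # eval-2024-06-01-07
```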
Module haystack_integrations.tracing.langfuse.tracer
LangfuseSpan
Internal class representing a bridge between the Haystack span tracing API and Langfuse.
LangfuseSpan.__init__
def __init__(
span:
"Union[langfuse.client.StatefulSpanClient, langfuse.client.StatefulTraceClient]"
) -> None
Initialize a LangfuseSpan instance.
Arguments:

- `span`: The span instance managed by Langfuse.
LangfuseSpan.set_tag
def set_tag(key: str, value: Any) -> None
Set a generic tag for this span.
Arguments:

- `key`: The tag key.
- `value`: The tag value.
LangfuseSpan.set_content_tag
def set_content_tag(key: str, value: Any) -> None
Set a content-specific tag for this span.
Arguments:

- `key`: The content tag key.
- `value`: The content tag value.
LangfuseSpan.raw_span
def raw_span() -> Any
Return the underlying span instance.
Returns:
The Langfuse span instance.
LangfuseTracer
Internal class representing a bridge between the Haystack tracer and Langfuse.
LangfuseTracer.__init__
def __init__(tracer: "langfuse.Langfuse",
name: str = "Haystack",
public: bool = False) -> None
Initialize a LangfuseTracer instance.
Arguments:

- `tracer`: The Langfuse tracer instance.
- `name`: The name of the pipeline or component. This name will be used to identify the tracing run on the Langfuse dashboard.
- `public`: Whether the tracing data should be public or private. If set to `True`, the tracing data will be publicly accessible to anyone with the tracing URL. If set to `False`, the tracing data will be private and only accessible to the Langfuse account owner.
LangfuseTracer.trace
@contextlib.contextmanager
def trace(operation_name: str,
tags: Optional[Dict[str, Any]] = None) -> Iterator[Span]
Start and manage a new trace span.
Arguments:

- `operation_name`: The name of the operation.
- `tags`: A dictionary of tags to attach to the span.
Returns:
A context manager yielding the span.
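The context-manager pattern can be sketched with a stand-in span (no Langfuse dependency; the dict-based span is purely illustrative) to show that the span is always closed, even if the traced operation raises:

```python
import contextlib
from typing import Any, Dict, Iterator, Optional

@contextlib.contextmanager
def trace(operation_name: str,
          tags: Optional[Dict[str, Any]] = None) -> Iterator[Dict[str, Any]]:
    # Stand-in span: records the operation name, tags, and completion state.
    span = {"name": operation_name, "tags": dict(tags or {}), "closed": False}
    try:
        yield span
    finally:
        span["closed"] = True  # mirrors ending the Langfuse span in all cases

with trace("pipeline.run", tags={"component": "llm"}) as span:
    span["tags"]["tokens"] = 42  # tags can be added while the span is open

print(span["closed"])  # True
```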
LangfuseTracer.current_span
def current_span() -> Span
Return the currently active span.
Returns:
The currently active span.
LangfuseTracer.get_trace_url
def get_trace_url() -> str
Return the URL to the tracing data.
Returns:
The URL to the tracing data.