LangfuseConnector

LangfuseConnector connects the Haystack LLM framework with Langfuse to enable tracing of operations and data flow within the components of a pipeline.

Basic Information

  • Type: haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector

Inputs

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| invocation_context | Optional[Dict[str, Any]] | None | A dictionary with additional context for the invocation. Useful when you want to mark this particular invocation with additional information, such as a run ID from your own execution framework or a user ID. These key-value pairs are then visible in the Langfuse traces. |

Outputs

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | The name of the tracing component. |
| trace_url | str | The URL to the tracing data. |
| trace_id | str | The ID of the trace. |

Overview

Work in Progress

Bear with us while we're working on adding pipeline examples and the most common component connections.

LangfuseConnector connects Haystack LLM framework with Langfuse in order to enable the tracing of operations and data flow within various components of a pipeline.

To use LangfuseConnector, add it to your pipeline without connecting it to any other components. It will automatically trace all pipeline operations when tracing is enabled.

Environment Configuration:

  • LANGFUSE_SECRET_KEY and LANGFUSE_PUBLIC_KEY: Required Langfuse API credentials.
  • HAYSTACK_CONTENT_TRACING_ENABLED: Must be set to "true" to enable tracing.
  • HAYSTACK_LANGFUSE_ENFORCE_FLUSH: (Optional) If set to "false", disables flushing after each component. Be cautious: this may cause data loss on crashes unless you manually flush before shutdown. By default, the data is flushed after each component and blocks the thread until the data is sent to Langfuse.
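Putting the variables above together, the environment could be configured like this before starting the application (a sketch with placeholder key values):

```shell
# Required Langfuse credentials (placeholder values).
export LANGFUSE_SECRET_KEY="sk-lf-placeholder"
export LANGFUSE_PUBLIC_KEY="pk-lf-placeholder"

# Enable Haystack content tracing.
export HAYSTACK_CONTENT_TRACING_ENABLED="true"

# Optional: disable per-component flushing; remember to flush manually on shutdown.
export HAYSTACK_LANGFUSE_ENFORCE_FLUSH="false"
```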

If you disable flushing after each component, make sure to call langfuse.flush() explicitly before the program exits. For example:

from haystack.tracing import tracer

try:
    # your code here
    ...
finally:
    tracer.actual_tracer.flush()

or in FastAPI by defining a shutdown event handler:

from haystack.tracing import tracer

# ...

@app.on_event("shutdown")
async def shutdown_event():
    tracer.actual_tracer.flush()
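Outside of FastAPI, you can also register the flush with `atexit` so it runs on normal interpreter shutdown. Below is a minimal self-contained sketch; in a real application the registered function would call `tracer.actual_tracer.flush()` instead of updating the placeholder state:

```python
import atexit

# Stand-in for the real Langfuse flush call, so this sketch is self-contained.
state = {"flushed": False}

def flush_traces() -> None:
    # In a real app this body would be: tracer.actual_tracer.flush()
    state["flushed"] = True

# Runs automatically when the interpreter exits normally.
atexit.register(flush_traces)
```

Note that `atexit` handlers do not run if the process is killed with SIGKILL or crashes hard, so the try/finally pattern above is still the safer choice for critical flushes.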

Here is an example of how to use LangfuseConnector in a pipeline:

import os

os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.connectors.langfuse import (
    LangfuseConnector,
)

pipe = Pipeline()
pipe.add_component("tracer", LangfuseConnector("Chat example"))
pipe.add_component("prompt_builder", ChatPromptBuilder())
pipe.add_component("llm", OpenAIChatGenerator(model="gpt-4o-mini"))

pipe.connect("prompt_builder.prompt", "llm.messages")

messages = [
    ChatMessage.from_system(
        "Always respond in German even if some input data is in other languages."
    ),
    ChatMessage.from_user("Tell me about {{location}}"),
]

response = pipe.run(
    data={
        "prompt_builder": {
            "template_variables": {"location": "Berlin"},
            "template": messages,
        }
    }
)
print(response["llm"]["replies"][0])
print(response["tracer"]["trace_url"])
print(response["tracer"]["trace_id"])

For advanced use cases, you can also customize how spans are created and processed by providing a custom SpanHandler. This allows you to add custom metrics, set warning levels, or attach additional metadata to your Langfuse traces:

from haystack_integrations.tracing.langfuse import DefaultSpanHandler, LangfuseSpan
from typing import Optional

class CustomSpanHandler(DefaultSpanHandler):

    def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
        # Custom span handling logic; customize Langfuse spans however it fits you.
        # See DefaultSpanHandler for how spans are created and processed by default.
        pass

connector = LangfuseConnector(span_handler=CustomSpanHandler())

Usage Example

components:
  LangfuseConnector:
    type: langfuse.src.haystack_integrations.components.connectors.langfuse.langfuse_connector.LangfuseConnector
    init_parameters:

Parameters

Init Parameters

These are the parameters you can configure in Pipeline Builder:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str |  | The name for the trace. This name is used to identify the tracing run in the Langfuse dashboard. |
| public | bool | False | Whether the tracing data should be public or private. If True, the tracing data is publicly accessible to anyone with the tracing URL. If False, it is private and only accessible to the Langfuse account owner. The default is False. |
| public_key | Optional[Secret] | Secret.from_env_var('LANGFUSE_PUBLIC_KEY') | The Langfuse public key. Defaults to reading from the LANGFUSE_PUBLIC_KEY environment variable. |
| secret_key | Optional[Secret] | Secret.from_env_var('LANGFUSE_SECRET_KEY') | The Langfuse secret key. Defaults to reading from the LANGFUSE_SECRET_KEY environment variable. |
| httpx_client | Optional[httpx.Client] | None | Optional custom httpx.Client instance to use for Langfuse API calls. Note that when deserializing a pipeline from YAML, any custom client is discarded and Langfuse creates its own default client, since HTTPX clients cannot be serialized. |
| span_handler | Optional[SpanHandler] | None | Optional custom handler for processing spans. If None, uses DefaultSpanHandler. The span handler controls how spans are created and processed, allowing customization of span types based on component types and additional processing after spans are yielded. See the SpanHandler class for details on implementing custom handlers. |
| langfuse_client_kwargs | Optional[Dict[str, Any]] | None | Optional dictionary of additional configuration options for the Langfuse client. See the Langfuse documentation for more details on available configuration options. |
| host | Optional[str] | None | Host of the Langfuse API. Can also be set via the LANGFUSE_HOST environment variable. By default it is set to https://cloud.langfuse.com. |

Run Method Parameters

These are the parameters you can configure for the component's run() method. This means you can pass these parameters at query time through the API, in Playground, or when running a job. For details, see Modify Pipeline Parameters at Query Time.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| invocation_context | Optional[Dict[str, Any]] | None | A dictionary with additional context for the invocation. Useful when you want to mark this particular invocation with additional information, such as a run ID from your own execution framework or a user ID. These key-value pairs are then visible in the Langfuse traces. |
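For example, assuming the connector was registered in the pipeline under the name "tracer", extra context could be attached to a single run like this (the key names and values below are hypothetical, chosen only to illustrate the data shape):

```python
# Hypothetical metadata to attach to one pipeline invocation.
invocation_context = {
    "run_id": "batch-2024-06-01-17",  # id from your own execution framework
    "user_id": "user-42",
}

# The run() payload would then include the tracer's input alongside the
# inputs for the other components, e.g.:
# pipe.run(data={"tracer": {"invocation_context": invocation_context}, "prompt_builder": {...}})
data = {"tracer": {"invocation_context": invocation_context}}
```

These key-value pairs then show up on the resulting trace in the Langfuse dashboard, which makes it easy to correlate traces with runs of your own system.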