OTEL Observability with Langfuse for Strands Agents



This content originally appeared on DEV Community and was authored by Shakir

Hi 👋, in this post we shall see how to use OpenTelemetry and Langfuse Cloud to observe LLM calls made via Strands Agents.

UV

Let’s initialize our project with uv.

uv init strands-langfuse-demo

Copy the code shown in this video into the project, so that the contents look like below. The final code for this lab is available here.

$ cd strands-langfuse-demo/

$ ls -a
.  ..  .env  .python-version  README.md  k8s_mcp_agent.py  k8s_mcp_app.py  main.py  pyproject.toml

Let’s activate the virtual environment.

uv venv

source .venv/bin/activate

We can now install these dependencies: streamlit, strands-agents

uv add streamlit==1.49.1
uv add strands-agents[otel]==1.6.0

We are adding the otel extra here, as we need telemetry enabled.

K3S

I am using K3s to launch a single-node Kubernetes cluster.

curl -sfL https://get.k3s.io | sh - 

Let’s copy the kubeconfig, as we will be referring to it in our agent configuration.

sudo cp /etc/rancher/k3s/k3s.yaml .

Langfuse

Go to Langfuse Cloud and create a new organization (for ex. my-org), and inside it create a new project (for ex. my-llm-org). Then generate API keys for the project.
Generate API keys for Langfuse project

We need to add these variables to our env file, so that it looks like below.

cat .env
AWS_ACCESS_KEY_ID=""
AWS_SECRET_ACCESS_KEY=""
AWS_DEFAULT_REGION=us-west-2
LANGFUSE_PUBLIC_KEY="pk-lf-.."
LANGFUSE_SECRET_KEY="sk-lf-.."
OTEL_EXPORTER_OTLP_ENDPOINT="https://cloud.langfuse.com/api/public/otel"

Let’s use this code to load the env vars and set the telemetry with otlp headers.

$ cat set_telemetry.py 
import os
import base64

from dotenv import load_dotenv
from strands.telemetry import StrandsTelemetry


def set_telemetry():
    load_dotenv()

    # Build Basic Auth header.
    LANGFUSE_AUTH = base64.b64encode(
        f"{os.environ.get('LANGFUSE_PUBLIC_KEY')}:{os.environ.get('LANGFUSE_SECRET_KEY')}".encode()
    ).decode()

    # Configure OpenTelemetry endpoint & headers
    os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}"

    strands_telemetry = StrandsTelemetry()
    strands_telemetry.setup_otlp_exporter()
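To see what the resulting header looks like, here is a quick standalone check with made-up placeholder keys (the real pk-lf-/sk-lf- values come from your Langfuse project):

```python
import base64

# Hypothetical placeholder keys, for illustration only
public_key = "pk-lf-example"
secret_key = "sk-lf-example"

# Same construction as in set_telemetry(): base64("public:secret")
auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
header = f"Authorization=Basic {auth}"
print(header)
```

This is the standard HTTP Basic Auth scheme, which the Langfuse OTLP endpoint uses to authenticate the exporter.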

Since set_telemetry loads the env vars, we can remove that from the agent code. These lines can be removed:

$ cat k8s_mcp_agent.py | grep dotenv
from dotenv import load_dotenv
        load_dotenv()

And replace them with calls to the new file we added.

$ cat k8s_mcp_agent.py | grep telemetry
from set_telemetry import set_telemetry
        set_telemetry()

Streamlit

Let’s now run our app.

streamlit run k8s_mcp_app.py

I tried a prompt to get the list of namespaces.
Check the list of namespaces from streamlit

We can see the trace for this in Langfuse.
Trace on langfuse

We can add custom attributes to traces; let’s add one named session.id via the trace_attributes argument.

import uuid

self.agent = Agent(
                callback_handler=None,
                model="us.amazon.nova-micro-v1:0",
                tools=tools,
                trace_attributes={
                    "session.id": str(uuid.uuid4()),
                },
            )

Which we can now see in the attributes section of the trace.
Session id in trace
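Since uuid.uuid4() is called once in the agent’s constructor, all traces from one agent instance share the same session.id. A quick standalone check of what such an id looks like (no Strands needed):

```python
import uuid

# A random version-4 UUID, generated fresh on each call
session_id = str(uuid.uuid4())
print(session_id)       # random every run
print(len(session_id))  # 36 characters: 32 hex digits plus 4 hyphens
```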

The final agent code will be:

$ cat k8s_mcp_agent.py 
import re

from mcp import stdio_client, StdioServerParameters
from strands import Agent
from strands.tools.mcp import MCPClient
import streamlit as st

from set_telemetry import set_telemetry
import uuid

def remove_html_tags(text_with_html):
    text_with_out_html = re.sub(r"<[^>]+>", "", text_with_html)
    return text_with_out_html

async def stream_result(stream):
    result = ""
    placeholder = st.empty()
    async for chunk in stream:
        if 'data' in chunk:
            result += chunk['data']
            result = remove_html_tags(result)
            placeholder.write(result)


class KubernetesMCPAgent:
    def __init__(self):
        set_telemetry()
        server_params = {
            "command": "npx",
            "args": [
                "-y",
                "kubernetes-mcp-server@latest"
            ],
            "env": {
                "KUBECONFIG": "k3s.yaml"
            }
        }

        self.stdio_mcp_client = MCPClient(lambda: stdio_client(
            StdioServerParameters(
                **server_params
            )
        ))

        with self.stdio_mcp_client:
            # Get the tools from the MCP server
            tools = self.stdio_mcp_client.list_tools_sync()

            # Create an agent with these tools
            self.agent = Agent(
                callback_handler=None,
                model="us.amazon.nova-micro-v1:0",
                tools=tools,
                trace_attributes={
                    "session.id": str(uuid.uuid4()),
                },
            )


    async def send_prompt(self, prompt):
        with self.stdio_mcp_client:
            stream = self.agent.stream_async(prompt)
            await stream_result(stream)


kubernetes_mcp_agent = KubernetesMCPAgent()
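As a quick sanity check of the remove_html_tags helper used above, reproduced standalone:

```python
import re

def remove_html_tags(text_with_html):
    # Drop anything that looks like an HTML tag, keep the text between tags
    return re.sub(r"<[^>]+>", "", text_with_html)

print(remove_html_tags("<b>kube-system</b> and <i>default</i>"))  # → kube-system and default
```

This is what keeps stray HTML in the streamed model output from being rendered by Streamlit’s placeholder.write.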

That’s it for the post, I hope this gave some insight into LLM observability. Thank you for reading!

