Building a Realtime Phone Agent with ADK and Twilio



This content originally appeared on DEV Community and was authored by Julian Hecker

Nowadays, when you call a business, you’re rarely greeted by a human. Instead, you often get an IVR phone tree with layers of menus telling you to “press 1 for this, press 2 for that”.

We can do so much better than this using modern tools.

In this tutorial, we’ll walk through how to create a real-time phone agent using Google’s Agent Development Kit (ADK) and Twilio. We’ll focus on setting up the agent, handling audio, and responding in real-time.

1. Setting up a Google ADK Agent

Google’s Agent Development Kit is one of the newer tools to come out of the current AI wave. It is an open-source agent framework that natively handles bidirectional audio input and output using Gemini Live, so there is no need to set up a separate transcription service and absorb its added latency.

  1. Create a Python project and Virtual Environment

  2. Run pip install google-adk or add it to your requirements.txt file

  3. Get an API key from Google AI Studio. Create an environment variable called GOOGLE_API_KEY and set it to that key.

  4. Create a file called agent.py with the following code:

from google.adk.agents import Agent
from google.adk.tools import google_search

root_agent = Agent(
    name="google_search_agent",
    model="gemini-2.0-flash",
    description="Agent to answer questions using Google Search.",
    instruction="I can answer your questions by searching the internet. Just ask me anything!",
    tools=[google_search]
)

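That’s it, that’s the agent setup! Now you can test your agent by running adk web. One caveat for later: the live audio in the next section goes through the Gemini Live API, so if gemini-2.0-flash is rejected once you start streaming, swap in a model that supports Live.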
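
Before launching the agent, it can help to confirm that the key from step 3 is actually visible to Python. Here is a minimal sketch using only the standard library; the helper name require_api_key is made up for this example and is not part of ADK.

```python
import os


def require_api_key(name: str = "GOOGLE_API_KEY") -> str:
    """Return the API key from the environment, or fail with a clear message."""
    key = os.environ.get(name, "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set; export it before starting the agent")
    return key
```

Calling require_api_key() at the top of agent.py would surface a missing key immediately instead of on the first model request.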

2. Setting up a Live Runner

Sounds weird, but a Live Runner is what’s going to let your agent respond to you in real-time over the phone. We also need to handle events from the agent as well as inbound audio from Twilio.

Let’s create a live_messaging.py and put the following code in it:

"""Live messaging runtime and bridge for ADK agent."""

from typing import AsyncGenerator, Awaitable, Callable, Literal

from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.adk.agents.live_request_queue import LiveRequestQueue

from google.genai import types
from google.genai.types import Part, Blob, Content
from pydantic import BaseModel, Field

from .agent import root_agent


def text_to_content(text: str, role: Literal["user", "model"] = "user") -> Content:
    """Helper to create a Content object from text"""
    return Content(role=role, parts=[Part(text=text)])

APP_NAME = "THE VOICE AGENT"

LiveEvents = AsyncGenerator[Event, None]

async def start_agent_session(
    user_id: str, session_id: str
) -> tuple[LiveEvents, LiveRequestQueue]:
    """Starts an agent session"""

    # Create a Runner
    runner = InMemoryRunner(
        root_agent,
        app_name=APP_NAME,
    )

    # Create a Session
    session = await runner.session_service.create_session(
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )

    speech_config = types.SpeechConfig(
        voice_config=types.VoiceConfig(
            # https://ai.google.dev/gemini-api/docs/speech-generation#voices
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
        ),
        # https://ai.google.dev/gemini-api/docs/speech-generation#languages
        language_code="en-US",
    )

    automatic_activity_detection = types.AutomaticActivityDetection(
        disabled=False,
        start_of_speech_sensitivity=types.StartSensitivity.START_SENSITIVITY_HIGH,
        end_of_speech_sensitivity=types.EndSensitivity.END_SENSITIVITY_HIGH,
        prefix_padding_ms=150,
        silence_duration_ms=400,
    )
    realtime_input_config = types.RealtimeInputConfig(
        automatic_activity_detection=automatic_activity_detection
    )

    run_config = RunConfig(
        speech_config=speech_config,
        streaming_mode=StreamingMode.BIDI,
        session_resumption=types.SessionResumptionConfig(),
        input_audio_transcription=types.AudioTranscriptionConfig(),
        output_audio_transcription=types.AudioTranscriptionConfig(),
        realtime_input_config=realtime_input_config,
    )

    live_request_queue = LiveRequestQueue()

    live_events = runner.run_live(
        live_request_queue=live_request_queue,
        run_config=run_config,
        session=session,
    )
    return live_events, live_request_queue

class AgentInterruptedEvent(BaseModel):
    type: Literal["interrupted"] = "interrupted"
    timestamp: float = Field(description="Unix timestamp of interruption")

class AgentTurnCompleteEvent(BaseModel):
    type: Literal["complete"] = "complete"
    timestamp: float = Field(description="Unix timestamp of turn completion")

class AgentDataEvent(BaseModel):
    payload: bytes = Field(description="Output PCM bytes (16-bit, 24kHz)")
    type: Literal["data"] = "data"


AgentEvent = AgentInterruptedEvent | AgentTurnCompleteEvent | AgentDataEvent

OnAgentEvent = Callable[[AgentEvent], Awaitable[None]]


async def agent_to_client_messaging(
    on_agent_event: OnAgentEvent, live_events: LiveEvents
) -> None:
    """
    Agent to client communication.
    Sends events to the client via the on_agent_event callback.
    To be used in parallel with the websocket loop.

    Args:
        on_agent_event: Async callback invoked per AgentEvent.
        live_events: Async generator of ADK Event objects to send to client.
    """
    async for event in live_events:
        message: AgentEvent

        if event.turn_complete:
            message = AgentTurnCompleteEvent(timestamp=event.timestamp)
            await on_agent_event(message)
            continue

        if event.interrupted:
            message = AgentInterruptedEvent(timestamp=event.timestamp)
            await on_agent_event(message)
            continue

        if not event.content or not event.content.parts:
            print("Agent sent empty content", event)
            continue

        for part in event.content.parts:
            is_text = hasattr(part, "text") and part.text is not None
            is_audio = (
                part.inline_data
                and part.inline_data.mime_type
                and part.inline_data.mime_type.startswith("audio/pcm")
            )

            if is_audio:
                audio_data = part.inline_data and part.inline_data.data
                if not audio_data:
                    continue
                message = AgentDataEvent(payload=audio_data)
                await on_agent_event(message)
                continue

            elif is_text:
                # print(part.text, end="", flush=True)
                continue

            else:
                print("Unknown event content part", event)


def send_pcm_to_agent(pcm_audio: bytes, live_request_queue: LiveRequestQueue):
    """
    Sends audio data to the agent.

    Should be nested inside the websocket loop, which runs alongside agent_to_client_messaging.

    Args:
        pcm_audio: bytes - Input PCM bytes (16-bit, 16kHz)
        live_request_queue: LiveRequestQueue - The live request queue to send audio to
    """
    live_request_queue.send_realtime(
        Blob(data=pcm_audio, mime_type="audio/pcm;rate=16000")
    )

start_agent_session creates a runner and a session for the call. It returns the live event stream together with a live request queue, which we push messages to so the AI can respond in real time. It’s also where we can fine-tune our agent’s voice configuration.

agent_to_client_messaging handles sending events from the agent to the client. Whenever the agent speaks, calls a function, or transcribes audio, ADK emits an event; this loop forwards audio chunks, interruptions, and turn completions to a callback function that we’ll pass in from our API.

send_pcm_to_agent lets us send audio to the live queue, which lets the agent actually hear us.

Once we plug this into our Twilio Handler Server, our agent will be able to hear and speak on the phone.
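
To make the callback idea concrete, here is a rough sketch of what such a handler might produce for Twilio. It uses plain dataclasses as stand-ins for the AgentEvent models above so that it runs without ADK installed, and the function name to_twilio_message is hypothetical. The media and clear message shapes follow Twilio's Media Streams WebSocket format. The payload is passed through unconverted here, whereas a real handler must first convert the audio to μ-law.

```python
import base64
from dataclasses import dataclass
from typing import Optional


@dataclass
class DataEvent:  # stand-in for AgentDataEvent
    payload: bytes
    type: str = "data"


@dataclass
class InterruptedEvent:  # stand-in for AgentInterruptedEvent
    timestamp: float
    type: str = "interrupted"


def to_twilio_message(event, stream_sid: str) -> Optional[dict]:
    """Map an agent event to a Twilio Media Streams message, or None to send nothing."""
    if event.type == "interrupted":
        # Ask Twilio to drop any audio it has buffered but not yet played.
        return {"event": "clear", "streamSid": stream_sid}
    if event.type == "data":
        # Twilio expects base64-encoded audio in media.payload.
        encoded = base64.b64encode(event.payload).decode("ascii")
        return {"event": "media", "streamSid": stream_sid, "media": {"payload": encoded}}
    return None  # e.g. turn-complete events need no message
```

Keeping the mapping in a pure function like this makes it easy to test without a live WebSocket.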

3. Audio Encoding

There are lots of different audio formats: mp3, ogg, flac, aac, wav, and more. Luckily, both Twilio and our agent exchange raw, headerless audio samples, the kind of data a WAV file wraps. However, they use different encodings and sample rates.

Twilio uses 8-bit μ-law at 8kHz. ADK takes 16-bit PCM at 16kHz as input and produces 16-bit PCM at 24kHz as output. We need to convert and resample the audio in both directions for them to speak with each other.
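
As a quick sanity check on those numbers, the size of a chunk of raw audio is sample rate × duration × bytes per sample. The sketch below assumes mono audio and uses a 20 ms frame purely as an illustrative duration; frame_bytes is a name invented for this example.

```python
def frame_bytes(sample_rate_hz: int, bytes_per_sample: int, duration_ms: int) -> int:
    """Size in bytes of a mono audio frame of the given duration."""
    return sample_rate_hz * bytes_per_sample * duration_ms // 1000


twilio_frame = frame_bytes(8000, 1, 20)     # 8-bit μ-law at 8kHz
adk_in_frame = frame_bytes(16000, 2, 20)    # 16-bit PCM at 16kHz
adk_out_frame = frame_bytes(24000, 2, 20)   # 16-bit PCM at 24kHz
print(twilio_frame, adk_in_frame, adk_out_frame)  # 160 640 960
```

So the same slice of time takes 4x as many bytes on its way into the agent, and the agent's output shrinks 6x on its way back to Twilio.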

Run pip install numpy soxr for the resampling dependencies, then create a file audio.py and use the following code:

import audioop  # standard library through Python 3.12; removed in Python 3.13
import numpy as np
import soxr

# Inbound: Twilio 8-bit 8kHz μ-law -> 16-bit 16kHz PCM for ADK
def twilio_ulaw8k_to_adk_pcm16k(mulaw_bytes: bytes) -> bytes:
    pcm8 = audioop.ulaw2lin(mulaw_bytes, 2)  # μ-law -> 16-bit PCM @ 8kHz
    # resample: int16 <-> float32 for soxr
    x = np.frombuffer(pcm8, dtype=np.int16).astype(np.float32) / 32768.0
    y = soxr.resample(x, 8000, 16000)  # 8kHz -> 16kHz
    pcm16 = (np.clip(y, -1, 1) * 32767).astype(np.int16).tobytes()
    return pcm16

# Outbound: ADK 16-bit 24kHz PCM -> Twilio 8-bit 8kHz μ-law
def adk_pcm24k_to_twilio_ulaw8k(pcm24: bytes) -> bytes:
    x = np.frombuffer(pcm24, dtype=np.int16).astype(np.float32) / 32768.0
    y = soxr.resample(x, 24000, 8000)  # 24kHz -> 8kHz
    pcm8 = (np.clip(y, -1, 1) * 32767).astype(np.int16).tobytes()
    ulaw = audioop.lin2ulaw(pcm8, 2)  # PCM -> μ-law
    return ulaw
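
A note on audioop: the module was deprecated in Python 3.11 and removed in Python 3.13, so the import fails on newer interpreters. If you cannot pin an older Python, the decoding half is simple enough to write by hand. The following is a minimal pure-Python sketch of G.711 μ-law decoding, intended to mirror what audioop.ulaw2lin(data, 2) returns on a little-endian machine. The name ulaw_to_pcm16 is made up here, and a per-byte lookup like this is much slower than the C implementation, so treat it as an illustration rather than a drop-in replacement.

```python
import struct


def _decode_ulaw_byte(u: int) -> int:
    """Decode one G.711 mu-law byte to a signed 16-bit sample."""
    u = ~u & 0xFF                      # mu-law bytes are stored bit-inverted
    sign = u & 0x80
    exponent = (u >> 4) & 0x07
    mantissa = u & 0x0F
    magnitude = (((mantissa << 3) + 0x84) << exponent) - 0x84
    return -magnitude if sign else magnitude


# Precompute all 256 possible values once.
_ULAW_TABLE = [_decode_ulaw_byte(b) for b in range(256)]


def ulaw_to_pcm16(mulaw_bytes: bytes) -> bytes:
    """Convert mu-law bytes to little-endian 16-bit PCM bytes."""
    samples = [_ULAW_TABLE[b] for b in mulaw_bytes]
    return struct.pack(f"<{len(samples)}h", *samples)
```

Each input byte becomes one 16-bit sample, so the output is always twice as long as the input.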

4. Handling Twilio Requests

Finally, we need to create a server to handle phone calls and audio from Twilio. For this, we’re going to create a FastAPI server.

  1. Run pip install "fastapi[standard]" twilio or add them to your requirements.txt. (The quotes keep shells like zsh from interpreting the square brackets.)
  2. Create an account with Twilio. We’ll need a voice-capable phone number.
  3. Create a file main.py with the code below:

import asyncio
import base64
import logging
from uuid import uuid4

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from twilio.twiml.voice_response import Connect, Stream, VoiceResponse

from .live_messaging import AgentEvent, agent_to_client_messaging, send_pcm_to_agent, start_agent_session, text_to_content
from .audio import adk_pcm24k_to_twilio_ulaw8k, twilio_ulaw8k_to_adk_pcm16k

logger = logging.getLogger('uvicorn.error')

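# NOTE: the URLs used in this tutorial (/twilio/connect and /twilio/stream) assume
# this app is served under a /twilio prefix, e.g. mounted on a parent app. If you
# run it standalone, change the route paths below to include /twilio.
api = FastAPI()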


@api.post("/connect")
def create_call(req: Request):
    """Generate TwiML to connect a call to a Twilio Media Stream"""

    host = req.url.hostname
    scheme = req.url.scheme
    ws_protocol = "ws" if scheme == "http" else "wss"
    ws_url = f"{ws_protocol}://{host}/twilio/stream"

    stream = Stream(url=ws_url)
    connect = Connect()
    connect.append(stream)
    response = VoiceResponse()
    response.append(connect)

    logger.info(response)

    return HTMLResponse(content=str(response), media_type="application/xml")


@api.websocket("/stream")
async def twilio_websocket(ws: WebSocket):
    """Handle Twilio Media Stream WebSocket connection"""

    await ws.accept()
    await ws.receive_json()  # throw away `connected` event

    start_event = await ws.receive_json()
    assert start_event["event"] == "start"

    call_sid = start_event["start"]["callSid"]
    stream_sid = start_event["streamSid"]
    user_id = uuid4().hex  # Fake user ID for this example

    live_events, live_request_queue = await start_agent_session(user_id, call_sid)

    # Sending an initial message makes the agent speak first when the call starts.
    initial_message = text_to_content("Introduce yourself.", "user")
    live_request_queue.send_content(initial_message)

    async def handle_agent_event(event: AgentEvent):
        """Handle outgoing AgentEvent to Twilio WebSocket"""

        if event.type == "complete":
            logger.info(f"Agent turn complete at {event.timestamp}")
            return

        if event.type == "interrupted":
            logger.info(f"Agent interrupted at {event.timestamp}")
            # https://www.twilio.com/docs/voice/media-streams/websocket-messages#send-a-clear-message
            return await ws.send_json({"event": "clear", "streamSid": stream_sid})

        ulaw_bytes = adk_pcm24k_to_twilio_ulaw8k(event.payload)
        payload = base64.b64encode(ulaw_bytes).decode("ascii")

        await ws.send_json(
            {
                "event": "media",
                "streamSid": stream_sid,
                "media": {"payload": payload},
            }
        )

    async def websocket_loop():
        """
        Handle incoming WebSocket messages to Agent.
        """
        while True:
            event = await ws.receive_json()
            event_type = event["event"]

            if event_type == "stop":
                logger.debug(f"Call ended by Twilio. Stream SID: {stream_sid}")
                break

            if event_type == "start" or event_type == "connected":
                logger.warning(f"Unexpected Twilio Initialization event: {event}")
                continue

            elif event_type == "dtmf":
                digit = event["dtmf"]["digit"]
                logger.info(f"DTMF: {digit}")
                continue

            elif event_type == "mark":
                logger.info(f"Twilio sent a Mark Event: {event}")
                continue

            elif event_type == "media":
                payload = event["media"]["payload"]
                mulaw_bytes = base64.b64decode(payload)
                pcm_bytes = twilio_ulaw8k_to_adk_pcm16k(mulaw_bytes)
                send_pcm_to_agent(pcm_bytes, live_request_queue)

    try:
        websocket_coro = websocket_loop()
        websocket_task = asyncio.create_task(websocket_coro)
        messaging_coro = agent_to_client_messaging(handle_agent_event, live_events)
        messaging_task = asyncio.create_task(messaging_coro)
        tasks = [websocket_task, messaging_task]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for p in pending:
            p.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        for d in done:
            if d.cancelled():
                continue
            exception = d.exception()
            if exception:
                raise exception
    except (KeyboardInterrupt, asyncio.CancelledError, WebSocketDisconnect):
        logger.warning("Process interrupted, exiting...")
    except Exception as ex:
        logger.exception(f"Unexpected Error: {ex}")
    finally:
        live_request_queue.close()
        try:
            await ws.close()
        except Exception as ex:
            logger.warning(f"Error while closing WebSocket: {ex}")

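The gist: you have a /connect route that is hit when someone calls your Twilio number, and a /stream WebSocket route that handles bidirectional media from Twilio. Inside the WebSocket route, websocket_loop (inbound audio) and agent_to_client_messaging (outbound events, delivered through handle_agent_event) run concurrently as asyncio tasks. When either one finishes, the other is cancelled and the connection is closed.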
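
The shutdown logic at the end of the handler is worth isolating. This is a simplified sketch of the same first-task-wins pattern using only asyncio; short_job and long_job are placeholders standing in for the WebSocket loop and the agent event loop.

```python
import asyncio


async def short_job() -> str:
    await asyncio.sleep(0.01)   # e.g. Twilio sends "stop" and the loop exits
    return "short finished"


async def long_job() -> str:
    await asyncio.sleep(60)     # e.g. the agent event stream, still running
    return "long finished"


async def run_until_first_done() -> tuple[list[str], int]:
    tasks = [asyncio.create_task(short_job()), asyncio.create_task(long_job())]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()           # stop whichever side is still running
    # Wait for the cancellations to settle so no task is left dangling.
    await asyncio.gather(*pending, return_exceptions=True)
    results = [task.result() for task in done]  # re-raises if a task failed
    return results, len(pending)


results, cancelled = asyncio.run(run_until_first_done())
print(results, cancelled)  # ['short finished'] 1
```

Without the cancel step, the slower task would keep running after the call has ended and hold on to the session.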

NOTE: This code does not implement Twilio signature verification. This is an important security feature that you should implement to prevent hackers from abusing your endpoint.
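
For reference, Twilio signs each webhook request and sends the result in the X-Twilio-Signature header. The scheme described in Twilio's security documentation is: take the full request URL, append each POST parameter's name and value in alphabetical order of the names, compute an HMAC-SHA1 with your account's Auth Token as the key, and base64-encode the digest. The sketch below implements that with the standard library so the mechanics are visible. The function names are invented for this example, and in a real project the RequestValidator class shipped in the twilio package is the safer choice.

```python
import base64
import hashlib
import hmac


def compute_twilio_signature(auth_token: str, url: str, params: dict[str, str]) -> str:
    """Compute the expected X-Twilio-Signature value for a form-encoded POST."""
    # URL followed by each parameter name and value, sorted by name.
    data = url + "".join(key + params[key] for key in sorted(params))
    digest = hmac.new(auth_token.encode("utf-8"), data.encode("utf-8"), hashlib.sha1).digest()
    return base64.b64encode(digest).decode("ascii")


def is_valid_twilio_request(
    auth_token: str, url: str, params: dict[str, str], signature: str
) -> bool:
    """Compare signatures in constant time to avoid leaking timing information."""
    expected = compute_twilio_signature(auth_token, url, params)
    return hmac.compare_digest(expected, signature)
```

In the /connect handler this would mean reading the form body and the X-Twilio-Signature header, then returning a 403 when the check fails. The URL must match exactly what Twilio requested, which behind ngrok is the public https address rather than localhost.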

5. Running it Live

There’s only a little left to do until you can speak with your agent on the phone!

  1. Install ngrok. This lets Twilio hit your local web server.
  2. Use ngrok to forward your local server to the internet. Get the address.
  3. Configure Twilio to hit your local server. Go to your Phone Numbers, select your number, go to the Configure tab.
    • Under Configure with, select Webhook, TwiML Bin, Function, Studio Flow, Proxy Service.
    • Under A call comes in, select Webhook.
    • For HTTP, select HTTP POST.
    • For URL, use the URL you got from ngrok, appended with /twilio/connect.
    • The whole URL might look like https://<subdomain>.ngrok-free.app/twilio/connect.
  4. Call your number and listen to your agent!
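
If the call connects but you hear nothing, one thing worth checking is the WebSocket URL that the connect route hands back to Twilio. The sketch below reproduces that derivation with urllib.parse so you can try it against your ngrok address. stream_url_for is a hypothetical helper, and the /twilio/stream path is taken from the handler code in section 4.

```python
from urllib.parse import urlsplit


def stream_url_for(connect_url: str, stream_path: str = "/twilio/stream") -> str:
    """Derive the Media Stream WebSocket URL from the webhook URL."""
    parts = urlsplit(connect_url)
    ws_scheme = "ws" if parts.scheme == "http" else "wss"
    return f"{ws_scheme}://{parts.hostname}{stream_path}"


print(stream_url_for("https://example.ngrok-free.app/twilio/connect"))
# wss://example.ngrok-free.app/twilio/stream
```

The result should start with wss:// and use your public ngrok hostname; a ws:// or localhost URL in the logged TwiML is a sign that the request scheme or host is not being forwarded as expected.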

6. Conclusion

I hope you have fun setting up a voice agent you can actually speak with. I can’t wait to see what great things people build with this! Maybe one day we can forget about those “customer service” phone systems that never understand what you need. Maybe shouting “REPRESENTATIVE” into the phone will be a thing of the past!

I created this tutorial as part of a submission to the GKE Turns 10 Hackathon, where I built a Banking Phone Agent and deployed it using Google Kubernetes Engine.

