This content originally appeared on DEV Community and was authored by Julian Hecker
Nowadays, when you call a business, you’re not greeted by a human. You often get an IVR phone tree with layers of menus and options: “press 1 for this, press 2 for that”.
We can do so much better than this using modern tools.
In this tutorial, we’ll walk through how to create a real-time phone agent using Google’s Agent Development Kit (ADK) and Twilio. We’ll focus on setting up the agent, handling audio, and responding in real-time.
1. Setting up a Google ADK Agent
Google’s Agent Development Kit is one of the newest tools to come out of the current AI wave. It is an open-source agent framework that natively handles bidirectional audio input and output using Gemini Live, so there is no need to set up a separate transcription service with its added latency.
- Create a Python project and virtual environment.
- Run pip install google-adk or add it to your requirements.txt file.
- Get an API key from Google AI Studio. Create an environment variable called GOOGLE_API_KEY and set it to that key.
- Create a file called agent.py with the following code:
from google.adk.agents import Agent
from google.adk.tools import google_search

root_agent = Agent(
    name="google_search_agent",
    model="gemini-2.0-flash",
    description="Agent to answer questions using Google Search.",
    instruction="I can answer your questions by searching the internet. Just ask me anything!",
    tools=[google_search],
)
That’s it, that’s the agent setup! Now you can test your agent by running adk web.
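A missing API key is an easy thing to trip over at this step. The snippet below is a minimal sketch of a pre-flight check; the helper name api_key_configured is hypothetical and not part of ADK, and it only confirms that the variable is set, not that the key is valid.

```python
import os


def api_key_configured(env=None) -> bool:
    """Return True when GOOGLE_API_KEY is present and not blank.

    `env` defaults to the real process environment; passing a plain dict
    makes the check easy to try out without touching os.environ.
    """
    env = os.environ if env is None else env
    return bool(env.get("GOOGLE_API_KEY", "").strip())


if __name__ == "__main__":
    if api_key_configured():
        print("GOOGLE_API_KEY found")
    else:
        print("GOOGLE_API_KEY is missing; set it before starting the agent")
```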
2. Setting up a Live Runner
Sounds weird, but a Live Runner is what’s going to let your agent respond to you in real-time over the phone. We also need to handle events from the agent as well as inbound audio from Twilio.
Let’s create a live_messaging.py and put the following code in it:
"""Live messaging runtime and bridge for ADK agent."""

from typing import AsyncGenerator, Awaitable, Callable, Literal

from google.adk.agents.run_config import RunConfig, StreamingMode
from google.adk.events import Event
from google.adk.runners import InMemoryRunner
from google.adk.agents.live_request_queue import LiveRequestQueue
from google.genai import types
from google.genai.types import Part, Blob, Content
from pydantic import BaseModel, Field

from .agent import root_agent


def text_to_content(text: str, role: Literal["user", "model"] = "user") -> Content:
    """Helper to create a Content object from text"""
    return Content(role=role, parts=[Part(text=text)])


APP_NAME = "THE VOICE AGENT"
LiveEvents = AsyncGenerator[Event, None]


async def start_agent_session(
    user_id: str, session_id: str
) -> tuple[LiveEvents, LiveRequestQueue]:
    """Starts an agent session"""
    # Create a Runner
    runner = InMemoryRunner(
        root_agent,
        app_name=APP_NAME,
    )
    # Create a Session
    session = await runner.session_service.create_session(
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
    speech_config = types.SpeechConfig(
        voice_config=types.VoiceConfig(
            # https://ai.google.dev/gemini-api/docs/speech-generation#voices
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr")
        ),
        # https://ai.google.dev/gemini-api/docs/speech-generation#languages
        language_code="en-US",
    )
    # Voice activity detection: controls how quickly the agent decides
    # the caller has started or stopped speaking.
    automatic_activity_detection = types.AutomaticActivityDetection(
        disabled=False,
        start_of_speech_sensitivity=types.StartSensitivity.START_SENSITIVITY_HIGH,
        end_of_speech_sensitivity=types.EndSensitivity.END_SENSITIVITY_HIGH,
        prefix_padding_ms=150,
        silence_duration_ms=400,
    )
    realtime_input_config = types.RealtimeInputConfig(
        automatic_activity_detection=automatic_activity_detection
    )
    run_config = RunConfig(
        speech_config=speech_config,
        streaming_mode=StreamingMode.BIDI,
        session_resumption=types.SessionResumptionConfig(),
        input_audio_transcription=types.AudioTranscriptionConfig(),
        output_audio_transcription=types.AudioTranscriptionConfig(),
        realtime_input_config=realtime_input_config,
    )
    live_request_queue = LiveRequestQueue()
    live_events = runner.run_live(
        live_request_queue=live_request_queue,
        run_config=run_config,
        session=session,
    )
    return live_events, live_request_queue


class AgentInterruptedEvent(BaseModel):
    type: Literal["interrupted"] = "interrupted"
    timestamp: float = Field(description="Unix timestamp of interruption")


class AgentTurnCompleteEvent(BaseModel):
    type: Literal["complete"] = "complete"
    timestamp: float = Field(description="Unix timestamp of turn completion")


class AgentDataEvent(BaseModel):
    payload: bytes = Field(description="Output PCM bytes (16-bit, 24kHz)")
    type: Literal["data"] = "data"


AgentEvent = AgentInterruptedEvent | AgentTurnCompleteEvent | AgentDataEvent
OnAgentEvent = Callable[[AgentEvent], Awaitable[None]]


async def agent_to_client_messaging(
    on_agent_event: OnAgentEvent, live_events: LiveEvents
) -> None:
    """
    Agent to client communication.
    Sends events to the client via the on_agent_event callback.
    To be used in parallel with the websocket loop.

    Args:
        on_agent_event: Async callback invoked per AgentEvent.
        live_events: Async generator of ADK Event objects to send to client.
    """
    async for event in live_events:
        message: AgentEvent
        if event.turn_complete:
            message = AgentTurnCompleteEvent(timestamp=event.timestamp)
            await on_agent_event(message)
            continue
        if event.interrupted:
            message = AgentInterruptedEvent(timestamp=event.timestamp)
            await on_agent_event(message)
            continue
        if not event.content or not event.content.parts:
            print("Agent sent empty content", event)
            continue
        for part in event.content.parts:
            is_text = hasattr(part, "text") and part.text is not None
            is_audio = (
                part.inline_data
                and part.inline_data.mime_type
                and part.inline_data.mime_type.startswith("audio/pcm")
            )
            if is_audio:
                audio_data = part.inline_data and part.inline_data.data
                if not audio_data:
                    continue
                message = AgentDataEvent(payload=audio_data)
                await on_agent_event(message)
                continue
            elif is_text:
                # Text parts (e.g. transcriptions) are ignored in this example.
                # print(part.text, end="", flush=True)
                continue
            else:
                print("Unknown event content part", event)


def send_pcm_to_agent(pcm_audio: bytes, live_request_queue: LiveRequestQueue):
    """
    Sends audio data to the agent.
    Should be nested inside the websocket loop, which runs alongside agent_to_client_messaging.

    Args:
        pcm_audio: bytes - Input PCM bytes (16-bit, 16kHz)
        live_request_queue: LiveRequestQueue - The live request queue to send audio to
    """
    live_request_queue.send_realtime(
        Blob(data=pcm_audio, mime_type="audio/pcm;rate=16000")
    )

start_agent_session creates a runner for our session. This lets us push messages to the live request queue, which the AI will use to respond in real time. It’s also where we can fine-tune our agent’s voice configuration.
agent_to_client_messaging handles sending events from the agent to the client. It is triggered whenever the agent speaks, makes a function call, or transcribes audio. We’ll pass in a callback function from our API, which will handle each event.
send_pcm_to_agent lets us send audio to the live queue, which lets the agent actually hear us.
Once we plug this into our Twilio Handler Server, our agent will be able to hear and speak on the phone.
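To make the callback pattern concrete without needing an API key or a phone call, here is a minimal sketch that swaps the real ADK event stream for a fake one. FakeEvent, fake_live_events, forward_events, and collect are all hypothetical stand-ins invented for this illustration; only the overall shape (an async generator drained by a loop that awaits a callback per event) mirrors agent_to_client_messaging.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Awaitable, Callable, Optional


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK Event; only the fields used below."""
    audio: Optional[bytes] = None
    turn_complete: bool = False
    interrupted: bool = False


async def fake_live_events() -> AsyncGenerator[FakeEvent, None]:
    """Pretend agent: two audio chunks, then the end of the turn."""
    yield FakeEvent(audio=b"\x00\x01")
    yield FakeEvent(audio=b"\x02\x03")
    yield FakeEvent(turn_complete=True)


async def forward_events(
    on_event: Callable[[str, bytes], Awaitable[None]],
    events: AsyncGenerator[FakeEvent, None],
) -> None:
    """Classify each event and hand it to the callback, one at a time."""
    async for event in events:
        if event.turn_complete:
            await on_event("complete", b"")
        elif event.interrupted:
            await on_event("interrupted", b"")
        elif event.audio:
            await on_event("data", event.audio)


async def collect() -> list:
    """Run the loop with a callback that simply records what it receives."""
    received = []

    async def on_event(kind: str, payload: bytes) -> None:
        received.append((kind, payload))

    await forward_events(on_event, fake_live_events())
    return received


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

In the real server the callback would write to the Twilio WebSocket instead of appending to a list, but the control flow stays the same.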
3. Audio Encoding
There are lots of different audio formats: mp3, ogg, flac, aac, wav, and more. Luckily, both Twilio and our agent exchange raw audio samples, the kind of data you would find inside a WAV file. However, they use different encodings and sample rates.
Twilio uses 8-bit, 8kHz μ-law. ADK takes 16-bit 16kHz PCM and produces 16-bit 24kHz PCM. We need to convert the audio from one format to the other for them to speak with each other.
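The size changes implied by those formats can be worked out with plain arithmetic. The sketch below uses hypothetical helper names and nominal numbers; a real resampler may return a sample or two more or less at chunk boundaries, and the 160-byte frame is only an illustrative input, not something the conversion depends on.

```python
def inbound_pcm_bytes(ulaw_bytes: int) -> int:
    """Nominal bytes of 16-bit 16 kHz PCM produced from N bytes of 8 kHz mu-law."""
    samples_8k = ulaw_bytes        # mu-law stores one sample per byte
    samples_16k = samples_8k * 2   # 8 kHz -> 16 kHz doubles the sample count
    return samples_16k * 2         # 16-bit PCM uses two bytes per sample


def outbound_ulaw_bytes(pcm24_bytes: int) -> int:
    """Nominal bytes of 8 kHz mu-law produced from N bytes of 16-bit 24 kHz PCM."""
    samples_24k = pcm24_bytes // 2  # two bytes per 16-bit sample
    samples_8k = samples_24k // 3   # 24 kHz -> 8 kHz keeps one sample in three
    return samples_8k               # one byte per mu-law sample


def duration_ms(num_bytes: int, sample_rate: int, bytes_per_sample: int) -> float:
    """Playback time represented by a buffer of raw samples."""
    samples = num_bytes // bytes_per_sample
    return 1000 * samples / sample_rate


if __name__ == "__main__":
    print(inbound_pcm_bytes(160))      # inbound: 4x larger
    print(outbound_ulaw_bytes(960))    # outbound: 6x smaller
    print(duration_ms(160, 8000, 1))   # the same audio, measured in time
```

So audio grows by a factor of four on the way to the agent and shrinks by a factor of six on the way back, while the duration it represents stays the same.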
Create a file audio.py and use the following code. It depends on numpy and soxr, so run pip install numpy soxr first:
# Note: audioop was removed from the standard library in Python 3.13.
# On newer interpreters a backport such as the audioop-lts package is needed.
import audioop

import numpy as np
import soxr


# Inbound: Twilio 8-bit 8kHz μ-law -> 16-bit 16kHz PCM for ADK
def twilio_ulaw8k_to_adk_pcm16k(mulaw_bytes: bytes) -> bytes:
    pcm8 = audioop.ulaw2lin(mulaw_bytes, 2)  # μ-law -> 16-bit PCM @ 8kHz
    # resample: int16 <-> float32 for soxr
    x = np.frombuffer(pcm8, dtype=np.int16).astype(np.float32) / 32768.0
    y = soxr.resample(x, 8000, 16000)  # 8kHz -> 16kHz
    pcm16 = (np.clip(y, -1, 1) * 32767).astype(np.int16).tobytes()
    return pcm16


# Outbound: ADK 16-bit 24kHz PCM -> Twilio 8-bit 8kHz μ-law
def adk_pcm24k_to_twilio_ulaw8k(pcm24: bytes) -> bytes:
    x = np.frombuffer(pcm24, dtype=np.int16).astype(np.float32) / 32768.0
    y = soxr.resample(x, 24000, 8000)  # 24kHz -> 8kHz
    pcm8 = (np.clip(y, -1, 1) * 32767).astype(np.int16).tobytes()
    ulaw = audioop.lin2ulaw(pcm8, 2)  # PCM -> μ-law
    return ulaw
4. Handling Twilio Requests
Finally, we need to create a server to handle phone calls and audio from Twilio. For this, we’re going to create a FastAPI server.
- Run pip install "fastapi[standard]" twilio or add them to your requirements.txt.
- Create an account with Twilio. We’ll need a phone number that can make phone calls.
- Create a file main.py with the code below:
import asyncio
import base64
import logging
from uuid import uuid4

from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from twilio.twiml.voice_response import Connect, Stream, VoiceResponse

from .live_messaging import (
    AgentEvent,
    agent_to_client_messaging,
    send_pcm_to_agent,
    start_agent_session,
    text_to_content,
)
from .audio import adk_pcm24k_to_twilio_ulaw8k, twilio_ulaw8k_to_adk_pcm16k

logger = logging.getLogger('uvicorn.error')
api = FastAPI()


# The path includes the /twilio prefix so it matches the webhook URL
# configured in Twilio (.../twilio/connect).
@api.post("/twilio/connect")
def create_call(req: Request):
    """Generate TwiML to connect a call to a Twilio Media Stream"""
    host = req.url.hostname
    scheme = req.url.scheme
    ws_protocol = "ws" if scheme == "http" else "wss"
    ws_url = f"{ws_protocol}://{host}/twilio/stream"
    stream = Stream(url=ws_url)
    connect = Connect()
    connect.append(stream)
    response = VoiceResponse()
    response.append(connect)
    logger.info(response)
    return HTMLResponse(content=str(response), media_type="application/xml")


# The path includes the /twilio prefix so it matches the ws_url
# that the connect route hands to Twilio (.../twilio/stream).
@api.websocket("/twilio/stream")
async def twilio_websocket(ws: WebSocket):
    """Handle Twilio Media Stream WebSocket connection"""
    await ws.accept()
    await ws.receive_json()  # throw away `connected` event
    start_event = await ws.receive_json()
    assert start_event["event"] == "start"
    call_sid = start_event["start"]["callSid"]
    stream_sid = start_event["streamSid"]
    user_id = uuid4().hex  # Fake user ID for this example
    live_events, live_request_queue = await start_agent_session(user_id, call_sid)
    # Sending an initial message makes the agent speak first when the call starts.
    initial_message = text_to_content("Introduce yourself.", "user")
    live_request_queue.send_content(initial_message)

    async def handle_agent_event(event: AgentEvent):
        """Handle outgoing AgentEvent to Twilio WebSocket"""
        if event.type == "complete":
            logger.info(f"Agent turn complete at {event.timestamp}")
            return
        if event.type == "interrupted":
            logger.info(f"Agent interrupted at {event.timestamp}")
            # https://www.twilio.com/docs/voice/media-streams/websocket-messages#send-a-clear-message
            return await ws.send_json({"event": "clear", "streamSid": stream_sid})
        ulaw_bytes = adk_pcm24k_to_twilio_ulaw8k(event.payload)
        payload = base64.b64encode(ulaw_bytes).decode("ascii")
        await ws.send_json(
            {
                "event": "media",
                "streamSid": stream_sid,
                "media": {"payload": payload},
            }
        )

    async def websocket_loop():
        """
        Handle incoming WebSocket messages to Agent.
        """
        while True:
            event = await ws.receive_json()
            event_type = event["event"]
            if event_type == "stop":
                logger.debug(f"Call ended by Twilio. Stream SID: {stream_sid}")
                break
            if event_type == "start" or event_type == "connected":
                logger.warning(f"Unexpected Twilio Initialization event: {event}")
                continue
            elif event_type == "dtmf":
                digit = event["dtmf"]["digit"]
                logger.info(f"DTMF: {digit}")
                continue
            elif event_type == "mark":
                logger.info(f"Twilio sent a Mark Event: {event}")
                continue
            elif event_type == "media":
                payload = event["media"]["payload"]
                mulaw_bytes = base64.b64decode(payload)
                pcm_bytes = twilio_ulaw8k_to_adk_pcm16k(mulaw_bytes)
                send_pcm_to_agent(pcm_bytes, live_request_queue)

    try:
        # Run the inbound (Twilio -> agent) and outbound (agent -> Twilio)
        # loops side by side; stop both as soon as either one finishes.
        websocket_coro = websocket_loop()
        websocket_task = asyncio.create_task(websocket_coro)
        messaging_coro = agent_to_client_messaging(handle_agent_event, live_events)
        messaging_task = asyncio.create_task(messaging_coro)
        tasks = [websocket_task, messaging_task]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for p in pending:
            p.cancel()
        await asyncio.gather(*pending, return_exceptions=True)
        for d in done:
            if d.cancelled():
                continue
            exception = d.exception()
            if exception:
                raise exception
    except (KeyboardInterrupt, asyncio.CancelledError, WebSocketDisconnect):
        logger.warning("Process interrupted, exiting...")
    except Exception as ex:
        logger.exception(f"Unexpected Error: {ex}")
    finally:
        live_request_queue.close()
        try:
            await ws.close()
        except Exception as ex:
            logger.warning(f"Error while closing WebSocket: {ex}")
The gist: you have a /connect route that is hit when people call your Twilio number, and a /stream WebSocket route that handles bidirectional media from Twilio. In the WebSocket route, websocket_loop (inbound audio from Twilio) and agent_to_client_messaging (outbound agent events, delivered through handle_agent_event) run concurrently.
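The media messages exchanged on that WebSocket are JSON with base64-encoded audio inside. As a minimal, stdlib-only sketch of that envelope (the helper names and the stream SID value are hypothetical, and the field names are taken from the handler code above):

```python
import base64
import json
from typing import Optional


def build_media_message(stream_sid: str, ulaw_bytes: bytes) -> str:
    """Wrap outbound mu-law audio in the JSON shape sent back to Twilio."""
    payload = base64.b64encode(ulaw_bytes).decode("ascii")
    return json.dumps(
        {"event": "media", "streamSid": stream_sid, "media": {"payload": payload}}
    )


def extract_inbound_audio(raw_message: str) -> Optional[bytes]:
    """Return the mu-law bytes of a media message, or None for other events."""
    event = json.loads(raw_message)
    if event.get("event") != "media":
        return None
    return base64.b64decode(event["media"]["payload"])


if __name__ == "__main__":
    message = build_media_message("MZ-example", b"\xff\x7f\x00")
    print(message)
    print(extract_inbound_audio(message))
```

Keeping the envelope handling in small functions like these makes it possible to unit test the framing separately from the audio conversion.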
NOTE: This code does not implement Twilio signature verification. This is an important security feature that you should implement to prevent hackers from abusing your endpoint.
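For a sense of what that verification involves, here is a minimal sketch of the signing scheme Twilio documents for webhook requests: the full request URL, followed by the POST parameters sorted by name and concatenated as name plus value, is signed with HMAC-SHA1 using your auth token and then base64-encoded. The result is compared with the X-Twilio-Signature header. The function names and sample values below are hypothetical; in a real deployment the RequestValidator class shipped with the twilio package is probably the safer choice.

```python
import base64
import hashlib
import hmac


def compute_twilio_signature(auth_token: str, url: str, params: dict) -> str:
    """Compute the expected signature for a form-encoded webhook request."""
    # URL first, then each POST parameter (sorted by name) as name + value.
    data = url + "".join(f"{name}{params[name]}" for name in sorted(params))
    digest = hmac.new(
        auth_token.encode("utf-8"), data.encode("utf-8"), hashlib.sha1
    ).digest()
    return base64.b64encode(digest).decode("ascii")


def is_valid_twilio_request(
    auth_token: str, url: str, params: dict, signature: str
) -> bool:
    """Compare the header value against the expected one in constant time."""
    expected = compute_twilio_signature(auth_token, url, params)
    return hmac.compare_digest(expected, signature)


if __name__ == "__main__":
    # Placeholder values for illustration only.
    token = "example-auth-token"
    url = "https://example.ngrok-free.app/twilio/connect"
    form = {"CallSid": "CA-example", "From": "+15550100"}
    header = compute_twilio_signature(token, url, form)
    print(is_valid_twilio_request(token, url, form, header))
```

Note that the URL must be the public one Twilio actually called, which behind a tunnel or proxy can differ from the URL your server sees.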
5. Running it Live
There’s only a little left to do until you can speak with your agent on the phone!
- Install ngrok. This lets Twilio hit your local web server.
- Use ngrok to forward your local server to the internet. Get the address.
- Configure Twilio to hit your local server. Go to your Phone Numbers, select your number, go to the Configure tab.
  - Under Configure with, select Webhook, TwiML Bin, Function, Studio Flow, Proxy Service.
  - Under A call comes in, select Webhook.
  - For HTTP, select HTTP Post.
  - For URL, use the URL you got from ngrok, appended with /twilio/connect.
  - The whole URL might look like https://<subdomain>.ngrok-free.app/twilio/connect.
- Call your number and listen to your agent!
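When a call comes in, Twilio posts to that webhook URL and expects TwiML back that tells it where to open the media stream. The server above builds this with the twilio helper library; the stdlib-only sketch below just shows the shape of that response so you know what to look for when debugging. The function name and the example host are hypothetical.

```python
import xml.etree.ElementTree as ET


def stream_twiml(public_host: str, path: str = "/twilio/stream") -> str:
    """Build <Response><Connect><Stream url=.../></Connect></Response>."""
    response = ET.Element("Response")
    connect = ET.SubElement(response, "Connect")
    # A public HTTPS tunnel is assumed, so the stream uses a secure WebSocket URL.
    ET.SubElement(connect, "Stream", url=f"wss://{public_host}{path}")
    return ET.tostring(response, encoding="unicode")


if __name__ == "__main__":
    print(stream_twiml("example.ngrok-free.app"))
```

If the call connects but stays silent, comparing the Stream URL in the logged TwiML with your actual WebSocket route is a reasonable first check.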
6. Conclusion
I hope you have fun setting up a voice agent you can actually speak with. I can’t wait to see what great things people build with this! Maybe one day we can forget about those “customer service” phone systems that never understand what you need. Maybe shouting “REPRESENTATIVE” into the phone will be a thing of the past!
I created this tutorial as part of a submission to the GKE Turns 10 Hackathon, where I created a Banking Phone Agent. I deployed it using Google Kubernetes Engine.