Soniox

Build a voice agent with Pipecat and Soniox

Compose Soniox STT and TTS into a complete voice agent running on the Pipecat framework, from a minimal chat bot to a structured appointment-booking flow.

Overview

The STT and TTS integration pages already cover how Soniox APIs run with Pipecat. This page combines them into a voice agent and grows the bot from a chat-only setup to a structured booking assistant.

The walkthrough builds a dentist receptionist in three stages:

  1. The pipeline shape, then the small additions that make it actually run as a chat bot.
  2. Adding tools the LLM can call to look up and book appointments.
  3. Structuring with Pipecat Flows, where the conversation follows a deterministic node graph.

Why Soniox for voice agents

Reasons that matter when shipping a real voice agent:

  • One API key for both ends. Soniox covers STT and TTS through the unified speech platform. No second vendor to integrate, monitor, or scale.
  • Real multilingual support. STT supports 60+ languages with automatic language identification and handles code-switched speech. TTS speaks 60+ languages.
  • Names, numbers, and IDs. STT recognizes names, phone numbers, emails, and alphanumerics accurately, and TTS pronounces them back the same way. General-purpose providers usually mangle one or the other.
  • Low STT latency. Soniox leads the Pipecat STT benchmark on time-to-final-transcript, so the LLM picks up the moment the user stops talking.
  • Production scaling with good pricing. Soniox supports high-concurrency real-time workloads and regional endpoints.

Setup

Install Pipecat with the extras for Soniox STT and TTS, OpenAI as the LLM, the development runner, and the transports the examples support (browser WebRTC and Daily):

pip install "pipecat-ai[soniox,openai,runner,webrtc,daily]"

For the Pipecat Flows stage at the end, also install:

pip install pipecat-ai-flows

Set your API keys. The same Soniox key works for STT and TTS. Create one in the Soniox Console:

export SONIOX_API_KEY=...
export OPENAI_API_KEY=...

Or put them in a .env file next to your bot file. The examples below call load_dotenv() to pick that up automatically.
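The .env equivalent of the exports above (the examples load it via python-dotenv):

```shell
SONIOX_API_KEY=...
OPENAI_API_KEY=...
```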

Running the bot

Each bot file uses Pipecat's development runner, which picks the transport at startup. The fastest local test is the prebuilt WebRTC UI:

python bot.py -t webrtc

Open http://localhost:7860, click connect, and start talking. The prebuilt UI hands the bot your microphone and plays its replies through your speakers. Metrics are enabled in every example, so the same page shows per-stage latency, token usage, and function calls live as the conversation runs.

The same files also support -t daily and -t twilio for cloud rooms and telephony. See the Pipecat development runner guide for the credentials and tunneling those transports need.

The pipeline shape

A voice agent is a Pipecat pipeline with five stages in order:

  • A transport captures audio from the user.
  • An STT service turns that audio into text.
  • An LLM produces a reply.
  • A TTS service turns the reply back into audio.
  • The transport again plays the audio back to the user.

The transport is whatever you use to move audio between the user and the bot. Pipecat ships transports for telephony providers (Twilio, Telnyx, and others), web and WebRTC stacks (Daily, LiveKit, browser WebRTC), and local audio for development. The rest of the pipeline does not change when you swap one for another.

import os

from pipecat.pipeline.pipeline import Pipeline
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.stt import (
    SonioxContextObject,
    SonioxContextGeneralItem,
    SonioxSTTService,
)
from pipecat.services.soniox.tts import SonioxTTSService
from pipecat.transcriptions.language import Language

# ... transport and runner setup omitted, see the full code below ...

stt = SonioxSTTService(
    api_key=os.environ["SONIOX_API_KEY"],
    vad_force_turn_endpoint=False,
    settings=SonioxSTTService.Settings(
        language_hints=[Language.EN],
        language_hints_strict=True,
        context=SonioxContextObject(
            general=[
                SonioxContextGeneralItem(key="domain", value="Dental practice"),
                SonioxContextGeneralItem(key="topic", value="Booking an appointment"),
            ],
            terms=["Bright Smile Dental", "checkup", "cavity", "crown", "X-ray"],
        ),
    ),
)
llm = OpenAILLMService(
    api_key=os.environ["OPENAI_API_KEY"],
    settings=OpenAILLMService.Settings(
        system_instruction=(
            "You are a friendly receptionist at Bright Smile Dental. "
            "Keep replies short and natural. They will be spoken aloud."
        ),
    ),
)
tts = SonioxTTSService(api_key=os.environ["SONIOX_API_KEY"])

pipeline = Pipeline([
    transport.input(),
    stt,
    llm,
    tts,
    transport.output(),
])

vad_force_turn_endpoint=False tells Pipecat to use Soniox's built-in endpoint detection instead of running a separate local VAD. See endpoint detection for details.

The context field tunes STT to your domain. List the brand names, jargon, and identifiers your users will say. See STT context for examples and more details.

Adding turn-taking and history

The pipeline above runs, but it has no memory between turns. LLMContext solves that: it is the conversation history, and every LLM request includes it. That is how the bot remembers names, resolves pronouns, and answers follow-ups.

Two aggregators keep the context current. The user aggregator buffers STT fragments and appends one {"role": "user", "content": "..."} message when the user's turn ends. The assistant aggregator buffers the LLM's streaming tokens and appends one {"role": "assistant", "content": "..."} message when the response completes. They come as a pair because they share the same context.

After a few turns, that context looks like this:

[
  { "role": "developer", "content": "You are a friendly receptionist..." },
  { "role": "user", "content": "Hi, I need to book a cleaning." },
  { "role": "assistant", "content": "Of course. May I have your name?" },
  { "role": "user", "content": "Jordan Smith." },
  { "role": "assistant", "content": "Thanks, Jordan. What day works for you?" }
]

Wiring this into the pipeline takes two things: a fresh LLMContext and the aggregator pair built from it, slotted in around the LLM. The full bot looks like this:

# ... imports from the previous section ...
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair

# ... stt, llm, tts from the previous section ...

context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)

pipeline = Pipeline([
    transport.input(),
    stt,
    user_aggregator,
    llm,
    tts,
    transport.output(),
    assistant_aggregator,
])

task = PipelineTask(pipeline)

To start the conversation before the user speaks, append a message to the context and queue an LLMRunFrame:

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    context.add_message({
        "role": "developer",
        "content": "Greet the caller and ask how you can help.",
    })
    await task.queue_frames([LLMRunFrame()])

That is a complete voice agent, chat-only for now. Tools come next.

import os

from dotenv import load_dotenv

from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.stt import (
    SonioxContextObject,
    SonioxContextGeneralItem,
    SonioxSTTService,
)
from pipecat.services.soniox.tts import SonioxTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

load_dotenv(override=True)


transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    stt = SonioxSTTService(
        api_key=os.environ["SONIOX_API_KEY"],
        vad_force_turn_endpoint=False,
        settings=SonioxSTTService.Settings(
            language_hints=[Language.EN],
            language_hints_strict=True,
            context=SonioxContextObject(
                general=[
                    SonioxContextGeneralItem(key="domain", value="Dental practice"),
                    SonioxContextGeneralItem(key="topic", value="Booking an appointment"),
                ],
                terms=["Bright Smile Dental", "checkup", "cavity", "crown", "X-ray"],
            ),
        ),
    )
    tts = SonioxTTSService(api_key=os.environ["SONIOX_API_KEY"])
    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        settings=OpenAILLMService.Settings(
            system_instruction=(
                "You are a friendly receptionist at Bright Smile Dental. "
                "Keep replies short and natural. They will be spoken aloud."
            ),
        ),
    )

    context = LLMContext()
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_aggregator,
            llm,
            tts,
            transport.output(),
            assistant_aggregator,
        ]
    )

    task = PipelineTask(
        pipeline, params=PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        context.add_message(
            {
                "role": "developer",
                "content": "Greet the caller and ask how you can help.",
            }
        )
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()

Adding tools

Function calling lets the LLM trigger Python code during a conversation. Each tool has two parts:

  • A handler is the Python function that runs when the tool is called. It receives the LLM's parsed arguments and returns a result.
  • A schema is the declarative description the LLM sees: the tool's name, what it does, and what arguments it expects. The LLM uses the schema to decide when to call a tool and how to fill in its arguments.

For the schema part, Pipecat provides FunctionSchema. A single FunctionSchema is translated into the right wire format for whichever LLM provider you use, so the same tool definition works across OpenAI, Anthropic, Google, and others.

The dentist needs two tools to start: one to look up open slots, one to book an appointment.

# ... imports from the previous section ...
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import TTSSpeakFrame
from pipecat.services.llm_service import FunctionCallParams

# ... stt, tts, llm, aggregators, pipeline from the previous section ...


# Replace this with your real calendar or booking system.
class DentalCalendar:
    booked: set[str] = {"2026-05-12T10:30"}

    async def find_slots(self, date: str) -> list[str]:
        all_slots = [f"{date}T{t}" for t in ("09:00", "10:30", "14:00", "15:30")]
        return [s for s in all_slots if s not in self.booked]

    async def book(self, name: str, slot: str, reason: str) -> str:  # name and reason unused in mock
        self.booked.add(slot)
        return f"DENT-{abs(hash(slot)) % 10000:04d}"


calendar = DentalCalendar()


# Tool handlers can do anything Python can: API calls, database queries,
# RAG lookups, file I/O. Here they hit the mock calendar above.
async def check_availability(params: FunctionCallParams):
    slots = await calendar.find_slots(params.arguments["date"])
    await params.result_callback({"slots": slots})


async def book_appointment(params: FunctionCallParams):
    cid = await calendar.book(
        name=params.arguments["name"],
        slot=params.arguments["slot"],
        reason=params.arguments["reason"],
    )
    await params.result_callback({"confirmation_id": cid, "status": "booked"})


# Register handlers against their tool names.
llm.register_function("check_availability", check_availability)
llm.register_function("book_appointment", book_appointment)

# Describe the tools so the LLM knows when and how to call them.
tools = ToolsSchema(standard_tools=[
    FunctionSchema(
        name="check_availability",
        description=(
            "Look up open appointment slots. Call this whenever the patient "
            "asks about availability or mentions a preferred date."
        ),
        properties={
            "date": {"type": "string", "description": "ISO date, e.g. 2026-05-12"},
        },
        required=["date"],
    ),
    FunctionSchema(
        name="book_appointment",
        description=(
            "Book a confirmed appointment. Only call after the patient has "
            "explicitly confirmed the date, time, and reason."
        ),
        properties={
            "name": {"type": "string"},
            "slot": {"type": "string", "description": "ISO datetime, e.g. 2026-05-12T10:30"},
            "reason": {"type": "string"},
        },
        required=["name", "slot", "reason"],
    ),
])

# Construct the LLM context with tools (replaces the bare LLMContext
# from the previous section).
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)

How invocation works

The LLM decides when to call a tool, based on the conversation and the tool's description. There is no fixed user phrase that maps to a tool. A user asking "is there anything Tuesday?" causes the LLM to call check_availability with date="2026-05-12", just as one asking "what slots are open this week?" does.

If a required parameter is missing, the LLM should ask the user a clarifying question rather than fill a placeholder, and call the tool once the answer arrives. That is why required arguments are useful: they push the LLM to gather data conversationally before triggering the side effect.
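Under the hood, a completed tool call appends two messages to the context: the assistant's call and the tool's result, matched by ID. A sketch of that shape (field names follow the OpenAI tool-calling format; how LLMContext stores this internally is an implementation detail):

```python
import json

# Hypothetical context contents after one check_availability round trip.
messages = [
    {"role": "user", "content": "Is there anything Tuesday?"},
    {
        # The assistant's turn is a tool call instead of text.
        "role": "assistant",
        "tool_calls": [{
            "id": "call_1",
            "type": "function",
            "function": {
                "name": "check_availability",
                "arguments": json.dumps({"date": "2026-05-12"}),
            },
        }],
    },
    {
        # The handler's result, matched to its call by tool_call_id.
        "role": "tool",
        "tool_call_id": "call_1",
        "content": json.dumps({"slots": ["2026-05-12T09:00", "2026-05-12T14:00"]}),
    },
    # The LLM then phrases the result for the user.
    {"role": "assistant", "content": "We have 9 AM and 2 PM open on Tuesday."},
]
```

Both tool messages stay in the history, so later turns ("actually, make it 2 PM") can refer back to the result without calling the tool again.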

Useful patterns

Audible filler keeps the call from going silent during a tool's network round trip. The calls argument is a list of FunctionCallFromLLM entries with function_name, tool_call_id, and arguments, so you can pick the filler per tool:

@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, calls):
    for call in calls:
        if call.function_name == "check_availability":
            await tts.queue_frame(TTSSpeakFrame("Let me check the schedule."))
        elif call.function_name == "book_appointment":
            await tts.queue_frame(TTSSpeakFrame("Booking that for you now."))

When prompt rules stop being enough

This is enough for a small bot, but two real problems show up as the dentist flow grows:

  • Accidental writes. Tools that change real state are easy to fire by accident. A system-prompt rule like "always read back the date and time before calling book_appointment" helps, but the LLM follows prompt rules unevenly. One bad turn and the booking goes through unconfirmed.
  • More rules to enforce. Requirements like "collect insurance before booking" or "urgent cases skip slot proposal" pile into the system prompt as natural-language instructions. The longer that prompt, the more often the LLM ignores parts of it.

Pipecat Flows fixes both structurally. Each tool is scoped to a specific node, so book_appointment cannot be called until the conversation reaches the confirm node. Step ordering and branching live in Python instead of the prompt, so the LLM cannot skip them.

import os

from dotenv import load_dotenv

from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.stt import (
    SonioxContextObject,
    SonioxContextGeneralItem,
    SonioxSTTService,
)
from pipecat.services.soniox.tts import SonioxTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

load_dotenv(override=True)


# Replace this with your real calendar or booking system.
class DentalCalendar:
    booked: set[str] = {"2026-05-12T10:30"}

    async def find_slots(self, date: str) -> list[str]:
        all_slots = [f"{date}T{t}" for t in ("09:00", "10:30", "14:00", "15:30")]
        return [s for s in all_slots if s not in self.booked]

    async def book(self, name: str, slot: str, reason: str) -> str:  # name and reason unused in mock
        self.booked.add(slot)
        return f"DENT-{abs(hash(slot)) % 10000:04d}"


calendar = DentalCalendar()


# Tool handlers can do anything Python can: API calls, database queries,
# RAG lookups, file I/O. Here they hit the mock calendar above.
async def check_availability(params: FunctionCallParams):
    slots = await calendar.find_slots(params.arguments["date"])
    await params.result_callback({"slots": slots})


async def book_appointment(params: FunctionCallParams):
    cid = await calendar.book(
        name=params.arguments["name"],
        slot=params.arguments["slot"],
        reason=params.arguments["reason"],
    )
    await params.result_callback({"confirmation_id": cid, "status": "booked"})


tools = ToolsSchema(
    standard_tools=[
        FunctionSchema(
            name="check_availability",
            description=(
                "Look up open appointment slots. Call this whenever the patient "
                "asks about availability or mentions a preferred date."
            ),
            properties={
                "date": {"type": "string", "description": "ISO date, e.g. 2026-05-12"},
            },
            required=["date"],
        ),
        FunctionSchema(
            name="book_appointment",
            description=(
                "Book a confirmed appointment. Only call after the patient has "
                "explicitly confirmed the date, time, and reason."
            ),
            properties={
                "name": {"type": "string"},
                "slot": {"type": "string", "description": "ISO datetime, e.g. 2026-05-12T10:30"},
                "reason": {"type": "string"},
            },
            required=["name", "slot", "reason"],
        ),
    ]
)


transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    stt = SonioxSTTService(
        api_key=os.environ["SONIOX_API_KEY"],
        vad_force_turn_endpoint=False,
        settings=SonioxSTTService.Settings(
            language_hints=[Language.EN],
            language_hints_strict=True,
            context=SonioxContextObject(
                general=[
                    SonioxContextGeneralItem(key="domain", value="Dental practice"),
                    SonioxContextGeneralItem(key="topic", value="Booking an appointment"),
                ],
                terms=["Bright Smile Dental", "checkup", "cavity", "crown", "X-ray"],
            ),
        ),
    )
    tts = SonioxTTSService(api_key=os.environ["SONIOX_API_KEY"])
    llm = OpenAILLMService(
        api_key=os.environ["OPENAI_API_KEY"],
        settings=OpenAILLMService.Settings(
            system_instruction=(
                "You are a friendly receptionist at Bright Smile Dental. "
                "Keep replies short and natural. They will be spoken aloud."
            ),
        ),
    )

    llm.register_function("check_availability", check_availability)
    llm.register_function("book_appointment", book_appointment)

    @llm.event_handler("on_function_calls_started")
    async def on_function_calls_started(service, calls):
        for call in calls:
            if call.function_name == "check_availability":
                await tts.queue_frame(TTSSpeakFrame("Let me check the schedule."))
            elif call.function_name == "book_appointment":
                await tts.queue_frame(TTSSpeakFrame("Booking that for you now."))

    context = LLMContext(tools=tools)
    user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            user_aggregator,
            llm,
            tts,
            transport.output(),
            assistant_aggregator,
        ]
    )

    task = PipelineTask(
        pipeline, params=PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        context.add_message(
            {
                "role": "developer",
                "content": "Greet the caller and ask how you can help.",
            }
        )
        await task.queue_frames([LLMRunFrame()])

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()

Structuring with Pipecat Flows

Pipecat Flows is a separate package that models a conversation as a graph of nodes. Each node has its own focused prompt and its own subset of tools. Each handler decides which node comes next. The LLM still phrases everything, but it cannot skip steps.

pip install pipecat-ai-flows

The mental model

A flow is a graph of nodes. At any moment, the bot is in exactly one node, and that node defines two things: what the LLM should be doing (its prompt) and which tools it is allowed to call. The LLM never sees tools from other nodes, so it cannot accidentally jump ahead and book an appointment while it is still collecting the patient's name.

Transitions between nodes happen in code, not in the prompt. Each tool is wired to a Python handler, and that handler returns the next node along with the tool's result. The LLM phrases the conversation, but the handler decides where the conversation goes next. Branching becomes an if statement: send urgent cases to triage and everyone else to slot proposal.

Flows does not replace the pipeline. It is a layer on top of the LLM service that swaps the prompt and the tool list whenever the conversation moves to a new node. A FlowManager object hooks into the LLM and the context aggregators, and you start the conversation by handing it the initial node.

Handlers and schemas

Each tool still needs a handler and a schema, the same as in the previous stage. Two things change:

  • Handlers in Flows return tuple[result, next_node] instead of calling a result_callback. The first element is what the LLM sees; the second element is the node the conversation moves to next.
  • Schemas use FlowsFunctionSchema instead of FunctionSchema. The shape is identical (name, description, properties, required), with one extra field, handler, that ties the schema to its Python function.

Branching happens inside the handler. Pick the next node with normal Python, based on the LLM's arguments or external state:

# Branch on LLM args.
async def collect_reason(args: FlowArgs):
    urgent = args.get("urgent", False)
    next_node = create_triage_node() if urgent else create_propose_slots_node()
    return {"reason": args["reason"], "urgent": urgent}, next_node


# Branch on external state.
async def propose_slots(args: FlowArgs):
    slots = await calendar.find_slots(args["preferred_date"])
    if slots:
        return {"slots": slots}, create_confirm_node(slots)
    return {"slots": []}, create_no_availability_node(args["preferred_date"])

Cycles work the same way. A handler that returns a node the conversation has already visited just sends the user back to that step.
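A minimal sketch of that cycle, with stand-in node factories (the real ones return NodeConfig objects, as in the bot file at the end of this section; plain dicts here just make the control flow visible):

```python
import asyncio

# Stand-in node factories for illustration only.
def create_propose_slots_node():
    return {"node": "propose_slots"}

def create_end_node():
    return {"node": "end"}

# Hypothetical handler for the no_availability node: returning a node the
# conversation has already visited is all a cycle takes.
async def try_another_date(args):
    if args.get("give_up"):
        return None, create_end_node()
    # Back to slot proposal with the newly suggested date.
    return {"preferred_date": args["preferred_date"]}, create_propose_slots_node()

result, next_node = asyncio.run(try_another_date({"preferred_date": "2026-05-13"}))
```

The loop terminates the same way any branch does: one of the handler's paths returns a node that is not part of the cycle.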

A schema looks the same as a regular FunctionSchema plus a handler reference:

reason_schema = FlowsFunctionSchema(
    name="collect_reason",
    description="Record the reason for the visit.",
    properties={
        "reason": {"type": "string", "description": "e.g. cleaning, toothache, checkup"},
        "urgent": {"type": "boolean", "description": "True if pain, swelling, or trauma."},
    },
    required=["reason"],
    handler=collect_reason,
)

The dentist flow has five handlers and five schemas in total. The full set is in the bot file at the end of this section.

Nodes

Each node-creator function returns a NodeConfig for one step. Parameters such as name or slots flow from one step to the next through these factories. The initial node is plain:

def create_initial_node() -> NodeConfig:
    return NodeConfig(
        name="greet",
        role_message=(
            "You are a friendly receptionist at Bright Smile Dental. "
            "Keep replies short and natural. They will be spoken aloud."
        ),
        task_messages=[{
            "role": "developer",
            "content": "Greet the caller and ask for their full name.",
        }],
        functions=[name_schema],
    )

A later node accepts data from the previous step and bakes it into its prompt:

def create_reason_node(name: str) -> NodeConfig:
    return NodeConfig(
        name="reason",
        task_messages=[{
            "role": "developer",
            "content": (
                f"You now have the patient's name: {name}. Ask what brings "
                "them in. Set urgent=True if they mention pain, swelling, or "
                "an injury."
            ),
        }],
        functions=[reason_schema],
    )

The dentist flow has seven nodes (greet, reason, triage, propose_slots, confirm, no_availability, end). Each one follows the same shape. The full set is in the bot file below.

Wiring it into the pipeline

The pipeline is the same as in stage 1. The two new pieces are a FlowManager built on top of the aggregator pair, and an on_client_connected that hands it the initial node:

context = LLMContext()
context_aggregator = LLMContextAggregatorPair(context)
user_aggregator, assistant_aggregator = context_aggregator

flow_manager = FlowManager(
    task=task,
    llm=llm,
    context_aggregator=context_aggregator,
    transport=transport,
)

@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
    await flow_manager.initialize(create_initial_node())

There are no register_function calls and no tools argument on the context. Flows manages the tool surface per node, so the LLM only sees book_appointment once the conversation reaches the confirm node.

What this buys you

  • Tool isolation. book_appointment is not in the LLM's tool list during the greet node, so it cannot be invoked there. With a single prompt, that constraint depends on the LLM choosing to follow instructions.
  • Deterministic transitions. Step ordering is a Python expression in a handler, not a sentence in a prompt. The LLM still phrases the conversation, but it cannot skip steps.
  • Branchable logic. The urgent field routes to triage instead of propose_slots from the same handler. Adding a new branch is one extra if and one extra node.
  • Cycles. no_availability returns the patient to propose_slots until they pick an open day or end the call. Cycles are common in voice flows.

import os
from datetime import date
from typing import TypedDict

from dotenv import load_dotenv
from pipecat_flows import FlowArgs, FlowManager, FlowsFunctionSchema, NodeConfig

from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.soniox.stt import (
    SonioxContextObject,
    SonioxContextGeneralItem,
    SonioxSTTService,
)
from pipecat.services.soniox.tts import SonioxTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

load_dotenv(override=True)


# Replace this with your real calendar or booking system.
class DentalCalendar:
    booked: set[str] = {"2026-05-12T10:30"}

    async def find_slots(self, date: str) -> list[str]:
        all_slots = [f"{date}T{t}" for t in ("09:00", "10:30", "14:00", "15:30")]
        return [s for s in all_slots if s not in self.booked]

    async def book(self, name: str, slot: str, reason: str) -> str:  # name and reason unused in mock
        self.booked.add(slot)
        return f"DENT-{abs(hash(slot)) % 10000:04d}"


calendar = DentalCalendar()


class NameResult(TypedDict):
    name: str


class ReasonResult(TypedDict):
    reason: str
    urgent: bool


class SlotsResult(TypedDict):
    slots: list[str]


class BookingResult(TypedDict):
    confirmation_id: str


# Handlers: each returns (result, next_node).
async def collect_name(args: FlowArgs) -> tuple[NameResult, NodeConfig]:
    return {"name": args["name"]}, create_reason_node(args["name"])


async def collect_reason(args: FlowArgs) -> tuple[ReasonResult, NodeConfig]:
    urgent = args.get("urgent", False)
    next_node = create_triage_node() if urgent else create_propose_slots_node()
    return {"reason": args["reason"], "urgent": urgent}, next_node


async def propose_slots(args: FlowArgs) -> tuple[SlotsResult, NodeConfig]:
    slots = await calendar.find_slots(args["preferred_date"])
    if slots:
        return {"slots": slots}, create_confirm_node(slots)
    return {"slots": []}, create_no_availability_node(args["preferred_date"])


async def book_appointment(args: FlowArgs) -> tuple[BookingResult, NodeConfig]:
    cid = await calendar.book(args["name"], args["slot"], args["reason"])
    return {"confirmation_id": cid}, create_end_node(cid)


async def end_conversation(args: FlowArgs) -> tuple[None, NodeConfig]:
    return None, create_end_node(None)


# Schemas: same shape as FunctionSchema, plus a handler reference.
name_schema = FlowsFunctionSchema(
    name="collect_name",
    description="Record the patient's full name once provided.",
    properties={"name": {"type": "string"}},
    required=["name"],
    handler=collect_name,
)

reason_schema = FlowsFunctionSchema(
    name="collect_reason",
    description="Record the reason for the visit.",
    properties={
        "reason": {"type": "string", "description": "e.g. cleaning, toothache, checkup"},
        "urgent": {"type": "boolean", "description": "True if pain, swelling, or trauma."},
    },
    required=["reason"],
    handler=collect_reason,
)

propose_schema = FlowsFunctionSchema(
    name="propose_slots",
    description="Look up open appointment slots for the patient's preferred date.",
    properties={"preferred_date": {"type": "string", "description": "ISO date"}},
    required=["preferred_date"],
    handler=propose_slots,
)

book_schema = FlowsFunctionSchema(
    name="book_appointment",
    description="Book the confirmed appointment. Only after the patient has explicitly confirmed the slot.",
    properties={
        "name": {"type": "string"},
        "slot": {"type": "string"},
        "reason": {"type": "string"},
    },
    required=["name", "slot", "reason"],
    handler=book_appointment,
)

end_schema = FlowsFunctionSchema(
    name="end_conversation",
    description="End the call once the patient is satisfied.",
    properties={},
    required=[],
    handler=end_conversation,
)


# Nodes: one factory per step.
def create_initial_node() -> NodeConfig:
    return NodeConfig(
        name="greet",
        role_messages=[
            {
                "role": "system",
                "content": (
                    "You are a friendly receptionist at Bright Smile Dental. "
                    "Keep replies short and natural. They will be spoken aloud."
                ),
            }
        ],
        task_messages=[
            {
                "role": "developer",
                "content": "Greet the caller and ask for their full name.",
            }
        ],
        functions=[name_schema],
    )


def create_reason_node(name: str) -> NodeConfig:
    return NodeConfig(
        name="reason",
        task_messages=[
            {
                "role": "developer",
                "content": (
                    f"You now have the patient's name: {name}. Ask what brings "
                    "them in. Set urgent=True if they mention pain, swelling, or "
                    "an injury."
                ),
            }
        ],
        functions=[reason_schema],
    )


def create_triage_node() -> NodeConfig:
    today = date.today().isoformat()
    return NodeConfig(
        name="triage",
        task_messages=[
            {
                "role": "developer",
                "content": (
                    "This is urgent. Tell the patient you are flagging it as "
                    "urgent and will offer the earliest available slot. Then "
                    f"call propose_slots with preferred_date={today}."
                ),
            }
        ],
        functions=[propose_schema],
    )


def create_propose_slots_node() -> NodeConfig:
    return NodeConfig(
        name="propose_slots",
        task_messages=[
            {
                "role": "developer",
                "content": "Ask what date works best, then call propose_slots.",
            }
        ],
        functions=[propose_schema],
    )


def create_confirm_node(slots: list[str]) -> NodeConfig:
    slot_list = ", ".join(slots)
    return NodeConfig(
        name="confirm",
        task_messages=[
            {
                "role": "developer",
                "content": (
                    f"Available slots: {slot_list}. Read them aloud, ask the "
                    "patient to pick one, and read back the date and time before "
                    "calling book_appointment."
                ),
            }
        ],
        functions=[book_schema],
    )


def create_no_availability_node(date_str: str) -> NodeConfig:
    # Named date_str to avoid shadowing datetime.date, which create_triage_node uses.
    return NodeConfig(
        name="no_availability",
        task_messages=[
            {
                "role": "developer",
                "content": f"No slots are open on {date_str}. Apologize and ask for another date.",
            }
        ],
        functions=[propose_schema, end_schema],
    )


def create_end_node(confirmation_id: str | None) -> NodeConfig:
    msg = (
        f"Confirm booking ID {confirmation_id} and thank them."
        if confirmation_id
        else "Thank them politely and end the call."
    )
    return NodeConfig(
        name="end",
        task_messages=[{"role": "developer", "content": msg}],
        post_actions=[{"type": "end_conversation"}],
    )


transport_params = {
    "daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
    # Twilio additionally needs a frame serializer configured; see Pipecat's Twilio docs.
    "twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
    "webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}


async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    stt = SonioxSTTService(
        api_key=os.environ["SONIOX_API_KEY"],
        vad_force_turn_endpoint=False,
        settings=SonioxSTTService.Settings(
            language_hints=[Language.EN],
            language_hints_strict=True,
            context=SonioxContextObject(
                general=[
                    SonioxContextGeneralItem(key="domain", value="Dental practice"),
                    SonioxContextGeneralItem(key="topic", value="Booking an appointment"),
                ],
                terms=["Bright Smile Dental", "checkup", "cavity", "crown", "X-ray"],
            ),
        ),
    )
    tts = SonioxTTSService(api_key=os.environ["SONIOX_API_KEY"])
    llm = OpenAILLMService(api_key=os.environ["OPENAI_API_KEY"])

    context_aggregator = LLMContextAggregatorPair(LLMContext())

    pipeline = Pipeline(
        [
            transport.input(),
            stt,
            context_aggregator.user(),
            llm,
            tts,
            transport.output(),
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline, params=PipelineParams(enable_metrics=True, enable_usage_metrics=True)
    )

    flow_manager = FlowManager(
        task=task,
        llm=llm,
        context_aggregator=context_aggregator,
        transport=transport,
    )

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        await flow_manager.initialize(create_initial_node())

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        await task.cancel()

    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
    await runner.run(task)


async def bot(runner_args: RunnerArguments):
    transport = await create_transport(runner_args, transport_params)
    await run_bot(transport, runner_args)


if __name__ == "__main__":
    from pipecat.runner.run import main

    main()