Soniox
SDKs / Python / Text-to-Speech

Real-time speech generation with Python SDK

Convert text to speech with the Python SDK

The Soniox Python SDK supports real-time Text-to-Speech generation with low-latency streaming output. You send text chunks over a WebSocket and receive audio chunks as they are generated.

Connect to a real-time session

from uuid import uuid4
from soniox import SonioxClient
from soniox.types import RealtimeTTSConfig

# Sample input, already split into the pieces that are sent to the session.
TEXT_CHUNKS = [
    "This is a test of Soniox real-time text to speech. ",
    "Audio is streamed back as it is generated. ",
    "Each chunk is sent as soon as it is ready.",
]

config = RealtimeTTSConfig(
    # uuid4() gives every run a distinct stream_id.
    stream_id=f"sync-{uuid4()}",
    model="tts-rt-v1-preview",
    language="en",
    voice="Adrian",
    audio_format="wav",
)

client = SonioxClient()
try:
    with client.realtime.tts.connect(config=config) as session:
        # text_end=True marks the last chunk, so no separate finish() call is needed.
        session.send_text_chunks(TEXT_CHUNKS, text_end=True)
        audio_chunks: list[bytes] = list(session.receive_audio_chunks())

    audio = b"".join(audio_chunks)
    with open("tts_realtime_sync_output.wav", "wb") as f:
        f.write(audio)

    print(f"Wrote {len(audio)} bytes")
    print("Captured final message:", session.last_message)
finally:
    # Close the client on every exit path, including a failed session.
    client.close()

For configuration options, see the TTS WebSocket API and the RealtimeTTSConfig reference.

Async real-time session

import asyncio
from collections.abc import AsyncIterator
from uuid import uuid4
from soniox import AsyncSonioxClient
from soniox.types import RealtimeTTSConfig

# Sample input, already split into the pieces that are sent to the session.
TEXT_CHUNKS = [
    "This is a test of Soniox real-time text to speech. ",
    "Audio is streamed back as it is generated. ",
    "Each chunk is sent as soon as it is ready.",
]


async def iter_text_chunks(chunks: list[str]) -> AsyncIterator[str]:
    """Yield every string of ``chunks`` in order through an async iterator."""
    for text in chunks:
        yield text

async def main() -> None:
    """Synthesize ``TEXT_CHUNKS`` over one real-time session and save the audio.

    Sends the text through ``iter_text_chunks``, gathers every audio chunk the
    session yields, writes the joined bytes to
    ``tts_realtime_async_output.wav`` and prints a short summary.
    """
    client = AsyncSonioxClient()
    try:
        config = RealtimeTTSConfig(
            # uuid4() gives every run a distinct stream_id.
            stream_id=f"async-{uuid4()}",
            model="tts-rt-v1-preview",
            language="en",
            voice="Adrian",
            audio_format="wav",
        )

        async with client.realtime.tts.connect(config=config) as session:
            await session.send_text_chunks(iter_text_chunks(TEXT_CHUNKS), text_end=True)
            audio_chunks: list[bytes] = [
                chunk async for chunk in session.receive_audio_chunks()
            ]

        audio = b"".join(audio_chunks)
        with open("tts_realtime_async_output.wav", "wb") as f:
            f.write(audio)

        print(f"Wrote {len(audio)} bytes")
        print("Captured final message:", session.last_message)
    finally:
        # Close the client on every exit path, not only after a successful run.
        await client.aclose()


asyncio.run(main())

Send text incrementally

Use send_text_chunk when text arrives dynamically (for example from an LLM stream). Set text_end=True on the final chunk, or call finish().

with client.realtime.tts.connect(config=config) as session:
    pieces = ["Hello ", "from Soniox ", "real-time TTS."]
    final_index = len(pieces) - 1
    # Only the final piece is sent with text_end=True.
    for index, piece in enumerate(pieces):
        session.send_text_chunk(piece, text_end=index == final_index)

Equivalent explicit finalization:

with client.realtime.tts.connect(config=config) as session:
    text = "Hello from Soniox real-time TTS."
    # Send without text_end, then finalize explicitly with finish().
    session.send_text_chunk(text, text_end=False)
    session.finish()

Receive events vs audio chunks

receive_audio_chunks() yields decoded audio bytes directly and stops after finalization. Use receive_events() when you want access to raw event metadata like audio_end, terminated, and errors.

with client.realtime.tts.connect(config=config) as session:
    session.send_text_chunk("Hello!", text_end=True)

    # Read raw events so the audio_end and terminated flags are visible too.
    for evt in session.receive_events():
        has_audio = evt.audio is not None
        if has_audio:
            print("Audio chunk received")
        if evt.audio_end:
            print("Server marked final audio payload")
        if not evt.terminated:
            continue
        # Stop reading once the stream reports termination.
        print("Stream terminated")
        break

Multiple streams on one connection

A single WebSocket connection can carry up to 5 concurrent streams. Use connect_multi_stream() to open a multiplexed connection, then call open_stream() for each stream. Each stream has its own stream_id and operates independently — you can send text and receive audio on all streams in parallel.

Async multi-stream

import asyncio
from collections.abc import AsyncIterator
from uuid import uuid4
from soniox import AsyncSonioxClient
from soniox.types import RealtimeTTSConfig

# Text to synthesize, keyed by a short label for each stream.
STREAM_TEXTS = {
    "a": [
        "Hello from stream A. ",
        "Stream A shares a connection with B. ",
        "Goodbye from A.",
    ],
    "b": [
        "Hello from stream B. ",
        "Stream B shares a connection with A. ",
        "Goodbye from B.",
    ],
}


async def iter_text(chunks: list[str]) -> AsyncIterator[str]:
    """Yield every string of ``chunks`` in order through an async iterator."""
    for text in chunks:
        yield text

async def collect_audio(stream) -> bytes:
    """Drain ``stream.receive_audio_chunks()`` and return the chunks joined together."""
    return b"".join([piece async for piece in stream.receive_audio_chunks()])

async def main() -> None:
    """Synthesize every ``STREAM_TEXTS`` entry on one multiplexed connection.

    Opens one stream per entry, receives and sends on all streams
    concurrently, then writes each stream's audio to ``tts_multi_<key>.wav``.
    """
    client = AsyncSonioxClient()
    try:
        async with client.realtime.tts.connect_multi_stream() as connection:
            # Open two streams on the same WebSocket.
            streams = {}
            for key in STREAM_TEXTS:
                config = RealtimeTTSConfig(
                    stream_id=f"multi-{key}-{uuid4()}",
                    model="tts-rt-v1-preview",
                    language="en",
                    voice="Adrian",
                    audio_format="wav",
                )
                streams[key] = await connection.open_stream(config=config)

            # Start receiving audio from each stream concurrently.
            receiver_tasks = {
                key: asyncio.create_task(collect_audio(stream))
                for key, stream in streams.items()
            }

            # Send text to each stream concurrently.
            sender_tasks = [
                asyncio.create_task(
                    stream.send_text_chunks(iter_text(STREAM_TEXTS[key]), text_end=True)
                )
                for key, stream in streams.items()
            ]
            await asyncio.gather(*sender_tasks)

            # Wait for all audio to arrive.
            for key, task in receiver_tasks.items():
                audio = await task
                with open(f"tts_multi_{key}.wav", "wb") as f:
                    f.write(audio)
                print(f"Stream {key}: wrote {len(audio)} bytes")
    finally:
        # Close the client on every exit path, not only after a successful run.
        await client.aclose()


asyncio.run(main())

Sync multi-stream

In synchronous code, use threads to send text and receive audio from each stream concurrently.

import threading
from uuid import uuid4
from soniox import SonioxClient
from soniox.types import RealtimeTTSConfig

# Text to synthesize, keyed by a short label for each stream.
STREAM_TEXTS = {
    "a": [
        "Hello from stream A. ",
        "Stream A shares a connection with B. ",
        "Goodbye from A.",
    ],
    "b": [
        "Hello from stream B. ",
        "Stream B shares a connection with A. ",
        "Goodbye from B.",
    ],
}


def collect_audio(stream, results: dict, key: str) -> None:
    """Join every audio chunk from ``stream`` and store the bytes at ``results[key]``."""
    audio = b"".join(stream.receive_audio_chunks())
    results[key] = audio

client = SonioxClient()
try:
    with client.realtime.tts.connect_multi_stream() as connection:
        # Open two streams on the same WebSocket.
        streams = {}
        for key in STREAM_TEXTS:
            config = RealtimeTTSConfig(
                stream_id=f"multi-{key}-{uuid4()}",
                model="tts-rt-v1-preview",
                language="en",
                voice="Adrian",
                audio_format="wav",
            )
            streams[key] = connection.open_stream(config=config)

        # Each receiver thread stores its stream's audio here under the stream key.
        audio_results: dict[str, bytes] = {}

        # Start receiving audio from each stream in background threads.
        receivers = []
        for key, stream in streams.items():
            t = threading.Thread(target=collect_audio, args=(stream, audio_results, key))
            t.start()
            receivers.append(t)

        # Send text to each stream (threads handle concurrent receiving).
        for key, stream in streams.items():
            stream.send_text_chunks(STREAM_TEXTS[key], text_end=True)

        # Wait for all audio to arrive.
        for t in receivers:
            t.join()

        for key, audio in sorted(audio_results.items()):
            with open(f"tts_multi_{key}.wav", "wb") as f:
                f.write(audio)
            print(f"Stream {key}: wrote {len(audio)} bytes")
finally:
    # Close the client on every exit path, not only after a successful run.
    client.close()

Error handling

A failed stream does not close the whole WebSocket connection by default. Stream-level errors finalize only that stream (terminated=True for the same stream_id), while other streams on the same connection can continue. Connection-level failures end the whole connection and all streams.

from soniox.errors import SonioxRealtimeError

try:
    with client.realtime.tts.connect(config=config) as session:
        session.send_text_chunk("Hello!", text_end=True)
        # Drain the audio; the chunks themselves are not used in this example.
        for _ in session.receive_audio_chunks():
            continue
except SonioxRealtimeError as error:
    print("Realtime TTS error:", error)