Skip to content

Streaming Speech

Real-time audio streaming for low-latency voice applications.

Overview

Streaming enables real-time audio processing:

Streaming Transcription

Python

python
import base64
import json

import httpx

async def stream_transcription(audio_file):
    """Stream a transcription request and print partial results as they arrive.

    Args:
        audio_file: Path to the audio file to transcribe.
    """
    async with httpx.AsyncClient() as client:
        with open(audio_file, "rb") as f:
            async with client.stream(
                "POST",
                "https://api.gateflow.ai/v1/audio/transcriptions",
                headers={"Authorization": "Bearer gw_prod_..."},
                files={"file": f},
                data={"model": "whisper-1", "stream": "true"}
            ) as response:
                # The response is newline-delimited JSON (see "Response
                # Format" below). aiter_text() yields arbitrary text chunks
                # that can split a JSON object across chunk boundaries;
                # aiter_lines() yields one complete record per iteration.
                async for line in response.aiter_lines():
                    if not line.strip():
                        continue  # skip blank keep-alive lines
                    data = json.loads(line)
                    print(f"Partial: {data['text']}")

Response Format

json
{"type": "partial", "text": "Hello, how"}
{"type": "partial", "text": "Hello, how can I"}
{"type": "partial", "text": "Hello, how can I help"}
{"type": "final", "text": "Hello, how can I help you today?"}

Streaming Speech Synthesis

Python

python
async def stream_speech(text):
    """Request streaming TTS audio and play each chunk as soon as it arrives."""
    request_headers = {
        "Authorization": "Bearer gw_prod_...",
        "Content-Type": "application/json"
    }
    request_body = {
        "model": "eleven_turbo_v2_5",
        "input": text,
        "voice": "friendly",
        "stream": True
    }
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.gateflow.ai/v1/audio/speech",
            headers=request_headers,
            json=request_body
        ) as response:
            async for audio_chunk in response.aiter_bytes():
                # Hand each chunk to the player immediately for low latency.
                play_audio_chunk(audio_chunk)

Audio Format

Streamed audio is delivered as:

  • Format: MP3 or PCM (configurable)
  • Chunk size: ~100-500ms of audio
  • Sample rate: 24kHz (default)

Full Pipeline Streaming

Stream the entire voice pipeline:

python
async def voice_pipeline_stream(audio_input):
    """Run the full STT -> LLM -> TTS pipeline over one streaming request.

    Args:
        audio_input: Raw audio bytes; base64-encoded for the JSON payload.
    """
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.gateflow.ai/v1/audio/pipelines",
            headers={
                "Authorization": "Bearer gw_prod_...",
                "Content-Type": "application/json"
            },
            json={
                "template": "voice-agent-fast",
                "audio": base64.b64encode(audio_input).decode(),
                "stream": True
            }
        ) as response:
            async for line in response.aiter_lines():
                # Streams may interleave blank keep-alive lines;
                # json.loads("") would raise, so skip them.
                if not line.strip():
                    continue
                event = json.loads(line)

                if event["type"] == "transcription.partial":
                    print(f"User: {event['text']}")

                elif event["type"] == "transcription.final":
                    print(f"User (final): {event['text']}")

                elif event["type"] == "llm.token":
                    print(event["token"], end="", flush=True)

                elif event["type"] == "audio.chunk":
                    audio_bytes = base64.b64decode(event["data"])
                    play_audio_chunk(audio_bytes)

                elif event["type"] == "done":
                    print("\n--- Complete ---")

                elif event["type"] == "error":
                    # Surface error events instead of silently dropping them.
                    print(f"Error: {event.get('message', 'unknown error')}")

Event Types

| Event Type              | Description                 |
|-------------------------|-----------------------------|
| `transcription.partial` | Partial transcription text  |
| `transcription.final`   | Final transcription         |
| `llm.token`             | LLM response token          |
| `llm.done`              | LLM response complete       |
| `audio.chunk`           | TTS audio chunk (base64)    |
| `done`                  | Pipeline complete           |
| `error`                 | Error occurred              |

WebSocket Streaming

For bidirectional real-time audio:

python
import websockets
import asyncio

async def realtime_voice():
    """Open a bidirectional realtime voice session over a WebSocket."""
    uri = "wss://api.gateflow.ai/v1/audio/realtime"

    async with websockets.connect(
        uri,
        extra_headers={"Authorization": "Bearer gw_prod_..."}
    ) as ws:
        # Send configuration
        await ws.send(json.dumps({
            "type": "config",
            "template": "voice-agent-fast",
            "context": "You are a helpful assistant."
        }))

        # Keep a reference to the sender task: the event loop holds only
        # weak references to tasks, so an unreferenced task can be
        # garbage-collected before it finishes.
        sender = asyncio.create_task(send_audio(ws))
        try:
            # Receive responses
            async for message in ws:
                event = json.loads(message)

                if event["type"] == "audio":
                    play_audio(base64.b64decode(event["data"]))
                elif event["type"] == "transcript":
                    print(f"Assistant: {event['text']}")
        finally:
            # Stop streaming microphone audio once the receive loop ends.
            sender.cancel()

async def send_audio(ws):
    """Forward microphone audio to the server as base64-encoded JSON messages."""
    async for frame in microphone_stream():
        encoded = base64.b64encode(frame).decode()
        message = json.dumps({"type": "audio", "data": encoded})
        await ws.send(message)

Latency Optimization

Time to First Byte

| Component         | Typical Latency |
|-------------------|-----------------|
| STT (streaming)   | 200–500 ms      |
| LLM (first token) | 100–300 ms      |
| TTS (first chunk) | 100–200 ms      |
| Total             | 400–1000 ms     |

Optimization Tips

  1. Use streaming models:

    json
    {
      "stt": {"model": "voxtral-mini-latest", "streaming": true},
      "tts": {"model": "eleven_turbo_v2_5", "streaming": true}
    }
  2. Reduce LLM tokens:

    json
    {
      "llm": {"max_tokens": 100}
    }
  3. Use faster models:

    json
    {
      "llm": {"model": "gpt-5-mini"}
    }
  4. Enable voice activity detection:

    json
    {
      "vad": {"enabled": true, "silence_threshold_ms": 500}
    }

Error Handling

python
# Consume the event stream and react to error events.
# NOTE(review): assumes `response` is an open httpx streaming response from
# one of the pipeline calls above — this fragment is not standalone.
async for line in response.aiter_lines():
    event = json.loads(line)

    if event["type"] == "error":
        print(f"Error: {event['message']}")
        if event.get("recoverable"):
            # Recoverable: keep consuming the stream. `continue` does NOT
            # re-issue the request — a true retry must re-open the stream
            # at the caller level.
            continue
        else:
            # Fatal error, abort
            break

Next Steps

Built with reliability in mind.