Skip to content

Transport Configuration

Configure how agents connect to the GateFlow MCP server.

Transport Options

TransportProtocolBest For
HTTP/SSEHTTPSCloud agents, serverless
stdiostdin/stdoutLocal agents, CLI tools
WebSocketWSSReal-time, bidirectional

HTTP/SSE Transport

The default transport for cloud-based agents.

Configuration

python
from gateflow_mcp import MCPClient

client = MCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    transport="http",
    options={
        "base_url": "https://mcp.gateflow.ai",
        "timeout": 30,
        "retry_attempts": 3
    }
)

HTTP Options

OptionTypeDefaultDescription
base_urlstringhttps://mcp.gateflow.aiServer URL
timeoutnumber30Request timeout (seconds)
retry_attemptsnumber3Max retries
verify_sslbooleantrueVerify SSL certificates
proxystringnullHTTP proxy URL

Headers

python
client = MCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    options={
        "headers": {
            "X-Request-ID": "custom-id-123",
            "X-Tenant-ID": "tenant-456"
        }
    }
)

Streaming Responses

python
async for event in client.stream_tool(
    name="llm/chat",
    arguments={
        "messages": [{"role": "user", "content": "Hello"}],
        "stream": True
    }
):
    if event["type"] == "content":
        print(event["data"], end="", flush=True)
    elif event["type"] == "done":
        break

stdio Transport

For local development and CLI integrations.

Installation

bash
npm install -g @gateflow/mcp-server

Running the Server

bash
gateflow-mcp-server \
  --agent-id agent_abc123 \
  --api-key gf-agent-xyz789...

Environment Variables

bash
export GATEFLOW_AGENT_ID="agent_abc123"
export GATEFLOW_API_KEY="gf-agent-xyz789..."

gateflow-mcp-server

Python Integration

python
import subprocess
import json

class StdioMCPClient:
    def __init__(self, agent_id: str, api_key: str):
        self.process = subprocess.Popen(
            [
                "gateflow-mcp-server",
                "--agent-id", agent_id,
                "--api-key", api_key
            ],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True
        )

    def call_tool(self, name: str, arguments: dict):
        request = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": name,
                "arguments": arguments
            },
            "id": 1
        }

        self.process.stdin.write(json.dumps(request) + "\n")
        self.process.stdin.flush()

        response = json.loads(self.process.stdout.readline())
        return response.get("result")

    def close(self):
        self.process.terminate()

Claude Desktop Integration

Add to ~/.claude/config.json:

json
{
  "mcpServers": {
    "gateflow": {
      "command": "npx",
      "args": ["@gateflow/mcp-server"],
      "env": {
        "GATEFLOW_AGENT_ID": "agent_abc123",
        "GATEFLOW_API_KEY": "gf-agent-xyz789..."
      }
    }
  }
}

WebSocket Transport

For real-time, bidirectional communication.

Configuration

python
from gateflow_mcp import MCPClient

client = MCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    transport="websocket",
    options={
        "url": "wss://mcp.gateflow.ai/ws",
        "ping_interval": 30,
        "reconnect": True,
        "max_reconnect_attempts": 5
    }
)

WebSocket Options

OptionTypeDefaultDescription
urlstringwss://mcp.gateflow.ai/wsWebSocket URL
ping_intervalnumber30Ping interval (seconds)
reconnectbooleantrueAuto-reconnect
max_reconnect_attemptsnumber5Max reconnect tries

Event Handling

python
async def handle_events():
    async with client.connect() as ws:
        # Subscribe to events
        await ws.subscribe(["session.cost_warning", "agent.suspended"])

        async for event in ws.events():
            if event["type"] == "session.cost_warning":
                print(f"Cost warning: {event['data']['current']} / {event['data']['limit']}")
            elif event["type"] == "agent.suspended":
                print("Agent suspended!")
                break

Connection Pooling

HTTP Connection Pool

python
client = MCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    options={
        "pool_connections": 10,
        "pool_maxsize": 20,
        "max_retries": 3
    }
)

Async Connection Pool

python
from gateflow_mcp import AsyncMCPClient

async with AsyncMCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    options={
        "max_connections": 100,
        "keepalive_timeout": 30
    }
) as client:
    results = await asyncio.gather(
        client.call_tool("llm/chat", args1),
        client.call_tool("llm/chat", args2),
        client.call_tool("llm/chat", args3)
    )

Proxy Configuration

HTTP Proxy

python
client = MCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    options={
        "proxy": "http://proxy.company.com:8080"
    }
)

Environment-Based Proxy

bash
export HTTP_PROXY="http://proxy.company.com:8080"
export HTTPS_PROXY="http://proxy.company.com:8080"

TLS Configuration

Custom CA Certificate

python
client = MCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    options={
        "ca_cert": "/path/to/ca-bundle.crt"
    }
)

Client Certificate Authentication

python
client = MCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    options={
        "client_cert": "/path/to/client.crt",
        "client_key": "/path/to/client.key"
    }
)

Timeout Configuration

Per-Request Timeout

python
result = client.call_tool(
    name="voice/transcribe",
    arguments={"audio": large_audio_b64},
    timeout=120  # Override default timeout
)

Timeout Strategy

python
client = MCPClient(
    agent_id="agent_abc123",
    api_key="gf-agent-xyz789...",
    options={
        "connect_timeout": 5,    # Connection timeout
        "read_timeout": 30,      # Read timeout
        "write_timeout": 30      # Write timeout
    }
)

Health Checks

Check Connection

python
def check_health(client):
    try:
        result = client.call_tool("self_inspect/whoami", {})
        return {"status": "healthy", "agent_id": result["agent_id"]}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

Periodic Health Check

python
import asyncio

async def health_check_loop(client, interval=60):
    while True:
        health = check_health(client)
        if health["status"] == "unhealthy":
            logger.warning(f"MCP connection unhealthy: {health['error']}")
        await asyncio.sleep(interval)

Best Practices

  1. Use connection pooling - Reuse connections for performance
  2. Set appropriate timeouts - Match to expected operation duration
  3. Enable reconnection - For long-running agents
  4. Use health checks - Monitor connection status
  5. Configure retries - Handle transient failures
  6. Secure credentials - Use environment variables

Transport Comparison

FeatureHTTP/SSEstdioWebSocket
Cloud ready
Local dev
Bidirectional
Streaming
Firewall friendlyN/A
Low latencyMediumHighHigh

Next Steps

Built with reliability in mind.