Transport-level routing for MCP/ACP protocols
Integration Patterns
Complete guide to integrating stdio Bus workers into your applications, including Node.js, Unix sockets, and IDE integration examples.
Overview
stdio Bus provides multiple integration patterns for different use cases. This guide covers common integration scenarios with complete code examples.
Node.js Integration
TCP Connection
Connect to stdio Bus using TCP sockets from Node.js applications.
Basic TCP client:
import net from 'net';
import readline from 'readline';

/**
 * Minimal JSON-RPC 2.0 client that talks to stdio Bus over a TCP socket.
 *
 * Messages are newline-delimited JSON: every request is written as a single
 * line, and every incoming line is parsed and matched to its request by `id`.
 */
class StdioBusClient {
  /**
   * @param {string} [host='localhost'] - Host passed to `net.createConnection`.
   * @param {number} [port=9000] - Port passed to `net.createConnection`.
   */
  constructor(host = 'localhost', port = 9000) {
    this.host = host;
    this.port = port;
    this.socket = null;
    this.requestId = 0;
    // Request id -> { resolve, reject } of the promise returned by sendRequest().
    this.pendingRequests = new Map();
  }

  /**
   * Opens the TCP connection and starts reading responses.
   *
   * @returns {Promise<void>} Resolves once connected; rejects on a socket error.
   */
  connect() {
    return new Promise((resolve, reject) => {
      this.socket = net.createConnection(this.port, this.host, () => {
        console.log('Connected to stdio Bus');
        this.setupReadline();
        resolve();
      });

      this.socket.on('error', (err) => {
        console.error('Connection error:', err);
        // No-op if the promise already resolved (error after connecting).
        reject(err);
      });

      this.socket.on('close', () => {
        console.log('Connection closed');
        // Responses can no longer arrive, so do not leave callers waiting forever.
        this.rejectAllPending(new Error('Connection closed'));
      });
    });
  }

  /** Reads the socket line by line and dispatches each parsed message. */
  setupReadline() {
    const rl = readline.createInterface({
      input: this.socket,
      crlfDelay: Infinity,
    });

    rl.on('line', (line) => {
      if (line.trim() === '') {
        return;
      }
      try {
        const response = JSON.parse(line);
        this.handleResponse(response);
      } catch (err) {
        console.error('Parse error:', err);
      }
    });
  }

  /**
   * Settles the pending request that matches `response.id`.
   * Messages without a matching pending request are ignored.
   *
   * @param {object} response - Parsed JSON-RPC response.
   */
  handleResponse(response) {
    if (!response.id || !this.pendingRequests.has(response.id)) {
      return;
    }

    const { resolve, reject } = this.pendingRequests.get(response.id);
    this.pendingRequests.delete(response.id);

    if (response.error) {
      const error = new Error(response.error.message);
      // Keep the JSON-RPC error details so callers can branch on them.
      error.code = response.error.code;
      error.data = response.error.data;
      reject(error);
    } else {
      resolve(response.result);
    }
  }

  /**
   * Rejects every in-flight request with `error` and clears the pending map.
   *
   * @param {Error} error
   */
  rejectAllPending(error) {
    for (const { reject } of this.pendingRequests.values()) {
      reject(error);
    }
    this.pendingRequests.clear();
  }

  /**
   * Sends a JSON-RPC request and resolves with the `result` of its response.
   *
   * @param {string} method - JSON-RPC method name.
   * @param {object} [params={}] - Method parameters.
   * @param {?string} [sessionId=null] - Added to the request as `sessionId` when truthy.
   * @returns {Promise<*>} Rejects on an error response, when not connected,
   *   or when the connection closes before a response arrives.
   */
  sendRequest(method, params = {}, sessionId = null) {
    return new Promise((resolve, reject) => {
      if (!this.socket || this.socket.destroyed) {
        reject(new Error('Not connected to stdio Bus'));
        return;
      }

      const id = String(++this.requestId);
      const request = { jsonrpc: '2.0', id, method, params };
      if (sessionId) {
        request.sessionId = sessionId;
      }

      // Serialize before registering so a stringify failure leaves no stale entry.
      const payload = JSON.stringify(request);
      this.pendingRequests.set(id, { resolve, reject });
      this.socket.write(`${payload}\n`);
    });
  }

  /** Ends the socket if one was created. */
  disconnect() {
    if (this.socket) {
      this.socket.end();
    }
  }
}

// Usage example
async function main() {
  const client = new StdioBusClient('localhost', 9000);

  try {
    await client.connect();

    // Send initialize request
    const initResult = await client.sendRequest('initialize', {
      clientInfo: {
        name: 'my-app',
        version: '1.0.0',
      },
    });
    console.log('Initialize result:', initResult);

    // Send test request
    const testResult = await client.sendRequest('test', {
      foo: 'bar',
    });
    console.log('Test result:', testResult);
  } catch (err) {
    console.error('Error:', err);
  } finally {
    client.disconnect();
  }
}

main();
Unix Socket Connection
Connect using Unix domain sockets for better performance on the same machine.
Unix socket client:
import net from 'net';
import readline from 'readline';

/**
 * Minimal JSON-RPC 2.0 client that talks to stdio Bus over a Unix domain socket.
 *
 * Messages are newline-delimited JSON: every request is written as a single
 * line, and every incoming line is parsed and matched to its request by `id`.
 */
class UnixSocketClient {
  /**
   * @param {string} [socketPath='/tmp/stdiobus.sock'] - Path passed to `net.createConnection`.
   */
  constructor(socketPath = '/tmp/stdiobus.sock') {
    this.socketPath = socketPath;
    this.socket = null;
    this.requestId = 0;
    // Request id -> { resolve, reject } of the promise returned by sendRequest().
    this.pendingRequests = new Map();
  }

  /**
   * Opens the socket connection and starts reading responses.
   *
   * @returns {Promise<void>} Resolves once connected; rejects on a socket error.
   */
  connect() {
    return new Promise((resolve, reject) => {
      this.socket = net.createConnection(this.socketPath, () => {
        console.log('Connected to stdio Bus via Unix socket');
        this.setupReadline();
        resolve();
      });

      this.socket.on('error', (err) => {
        console.error('Connection error:', err);
        // No-op if the promise already resolved (error after connecting).
        reject(err);
      });

      this.socket.on('close', () => {
        // Responses can no longer arrive, so do not leave callers waiting forever.
        this.rejectAllPending(new Error('Connection closed'));
      });
    });
  }

  /** Reads the socket line by line and dispatches each parsed message. */
  setupReadline() {
    const rl = readline.createInterface({
      input: this.socket,
      crlfDelay: Infinity,
    });

    rl.on('line', (line) => {
      if (line.trim() === '') {
        return;
      }
      try {
        const response = JSON.parse(line);
        this.handleResponse(response);
      } catch (err) {
        console.error('Parse error:', err);
      }
    });
  }

  /**
   * Settles the pending request that matches `response.id`.
   * Messages without a matching pending request are ignored.
   *
   * @param {object} response - Parsed JSON-RPC response.
   */
  handleResponse(response) {
    if (!response.id || !this.pendingRequests.has(response.id)) {
      return;
    }

    const { resolve, reject } = this.pendingRequests.get(response.id);
    this.pendingRequests.delete(response.id);

    if (response.error) {
      const error = new Error(response.error.message);
      // Keep the JSON-RPC error details so callers can branch on them.
      error.code = response.error.code;
      error.data = response.error.data;
      reject(error);
    } else {
      resolve(response.result);
    }
  }

  /**
   * Rejects every in-flight request with `error` and clears the pending map.
   *
   * @param {Error} error
   */
  rejectAllPending(error) {
    for (const { reject } of this.pendingRequests.values()) {
      reject(error);
    }
    this.pendingRequests.clear();
  }

  /**
   * Sends a JSON-RPC request and resolves with the `result` of its response.
   *
   * @param {string} method - JSON-RPC method name.
   * @param {object} [params={}] - Method parameters.
   * @param {?string} [sessionId=null] - Added to the request as `sessionId` when truthy.
   * @returns {Promise<*>} Rejects on an error response, when not connected,
   *   or when the connection closes before a response arrives.
   */
  sendRequest(method, params = {}, sessionId = null) {
    return new Promise((resolve, reject) => {
      if (!this.socket || this.socket.destroyed) {
        reject(new Error('Not connected to stdio Bus'));
        return;
      }

      const id = String(++this.requestId);
      const request = { jsonrpc: '2.0', id, method, params };
      if (sessionId) {
        request.sessionId = sessionId;
      }

      // Serialize before registering so a stringify failure leaves no stale entry.
      const payload = JSON.stringify(request);
      this.pendingRequests.set(id, { resolve, reject });
      this.socket.write(`${payload}\n`);
    });
  }

  /** Ends the socket if one was created. */
  disconnect() {
    if (this.socket) {
      this.socket.end();
    }
  }
}

// Usage example
async function main() {
  const client = new UnixSocketClient('/tmp/stdiobus.sock');

  try {
    await client.connect();

    const result = await client.sendRequest('test', {
      message: 'Hello from Unix socket',
    });
    console.log('Result:', result);
  } catch (err) {
    console.error('Error:', err);
  } finally {
    client.disconnect();
  }
}

main();
Session Management
Maintain session state across multiple requests using sessionId.
Session-aware client:
import net from 'net';
import readline from 'readline';

/**
 * JSON-RPC 2.0 client over TCP that remembers the first `sessionId` it sees
 * in a response and attaches it to every later request.
 */
class SessionClient {
  /**
   * @param {string} [host='localhost'] - Host passed to `net.createConnection`.
   * @param {number} [port=9000] - Port passed to `net.createConnection`.
   */
  constructor(host = 'localhost', port = 9000) {
    this.host = host;
    this.port = port;
    this.socket = null;
    this.requestId = 0;
    // Request id -> { resolve, reject } of the promise returned by sendRequest().
    this.pendingRequests = new Map();
    // Set from the first response that carries a session id.
    this.sessionId = null;
  }

  /**
   * Opens the TCP connection and starts reading responses.
   *
   * @returns {Promise<void>} Resolves once connected; rejects on a socket error.
   */
  connect() {
    return new Promise((resolve, reject) => {
      this.socket = net.createConnection(this.port, this.host, () => {
        this.setupReadline();
        resolve();
      });

      this.socket.on('error', reject);

      this.socket.on('close', () => {
        // Responses can no longer arrive, so do not leave callers waiting forever.
        this.rejectAllPending(new Error('Connection closed'));
      });
    });
  }

  /** Reads the socket line by line and hands each line to handleLine(). */
  setupReadline() {
    const rl = readline.createInterface({
      input: this.socket,
      crlfDelay: Infinity,
    });

    rl.on('line', (line) => this.handleLine(line));
  }

  /**
   * Parses one line of input, captures the session id if none is set yet,
   * and settles the matching pending request. Invalid input is logged.
   *
   * @param {string} line - One newline-delimited JSON message.
   */
  handleLine(line) {
    if (line.trim() === '') {
      return;
    }
    try {
      const response = JSON.parse(line);

      // Extract sessionId from the response. It is read from the top level and,
      // as a fallback, from `result.sessionId`, which is where the
      // createSession workflow in this guide reads it.
      const sessionId =
        response.sessionId || (response.result && response.result.sessionId);
      if (sessionId && !this.sessionId) {
        this.sessionId = sessionId;
        console.log('Session established:', this.sessionId);
      }

      this.handleResponse(response);
    } catch (err) {
      console.error('Parse error:', err);
    }
  }

  /**
   * Settles the pending request that matches `response.id`.
   * Messages without a matching pending request are ignored.
   *
   * @param {object} response - Parsed JSON-RPC response.
   */
  handleResponse(response) {
    if (!response.id || !this.pendingRequests.has(response.id)) {
      return;
    }

    const { resolve, reject } = this.pendingRequests.get(response.id);
    this.pendingRequests.delete(response.id);

    if (response.error) {
      const error = new Error(response.error.message);
      // Keep the JSON-RPC error details so callers can branch on them.
      error.code = response.error.code;
      error.data = response.error.data;
      reject(error);
    } else {
      resolve(response.result);
    }
  }

  /**
   * Rejects every in-flight request with `error` and clears the pending map.
   *
   * @param {Error} error
   */
  rejectAllPending(error) {
    for (const { reject } of this.pendingRequests.values()) {
      reject(error);
    }
    this.pendingRequests.clear();
  }

  /**
   * Sends a JSON-RPC request, including the stored session id once established.
   *
   * @param {string} method - JSON-RPC method name.
   * @param {object} [params={}] - Method parameters.
   * @returns {Promise<*>} Resolves with the response `result`; rejects on an
   *   error response, when not connected, or when the connection closes first.
   */
  sendRequest(method, params = {}) {
    return new Promise((resolve, reject) => {
      if (!this.socket || this.socket.destroyed) {
        reject(new Error('Not connected to stdio Bus'));
        return;
      }

      const id = String(++this.requestId);
      const request = { jsonrpc: '2.0', id, method, params };

      // Include sessionId if established
      if (this.sessionId) {
        request.sessionId = this.sessionId;
      }

      // Serialize before registering so a stringify failure leaves no stale entry.
      const payload = JSON.stringify(request);
      this.pendingRequests.set(id, { resolve, reject });
      this.socket.write(`${payload}\n`);
    });
  }

  /** Ends the socket if one was created. */
  disconnect() {
    if (this.socket) {
      this.socket.end();
    }
  }
}

// Usage example
async function main() {
  const client = new SessionClient('localhost', 9000);

  try {
    await client.connect();

    // Initialize session
    await client.sendRequest('initialize', {
      clientInfo: { name: 'my-app', version: '1.0.0' },
    });

    // Subsequent requests use the same session
    await client.sendRequest('createSession', {});
    await client.sendRequest('sendPrompt', {
      prompt: 'Hello, world!',
    });
  } catch (err) {
    console.error('Error:', err);
  } finally {
    client.disconnect();
  }
}

main();
Agent Routing
Route messages to specific agents using the agentId parameter.
Registry Launcher Integration
Routing to specific agents:
import net from 'net';
import readline from 'readline';

/**
 * JSON-RPC 2.0 client over TCP that can tag a request with an `agentId`
 * inside its params so the request is routed to a specific agent.
 */
class AgentClient {
  /**
   * @param {string} [host='localhost'] - Host passed to `net.createConnection`.
   * @param {number} [port=9000] - Port passed to `net.createConnection`.
   */
  constructor(host = 'localhost', port = 9000) {
    this.host = host;
    this.port = port;
    this.socket = null;
    this.requestId = 0;
    // Request id -> { resolve, reject } of the promise returned by sendRequest().
    this.pendingRequests = new Map();
  }

  /**
   * Opens the TCP connection and starts reading responses.
   *
   * @returns {Promise<void>} Resolves once connected; rejects on a socket error.
   */
  connect() {
    return new Promise((resolve, reject) => {
      this.socket = net.createConnection(this.port, this.host, () => {
        this.setupReadline();
        resolve();
      });

      this.socket.on('error', reject);

      this.socket.on('close', () => {
        // Responses can no longer arrive, so do not leave callers waiting forever.
        this.rejectAllPending(new Error('Connection closed'));
      });
    });
  }

  /** Reads the socket line by line and dispatches each parsed message. */
  setupReadline() {
    const rl = readline.createInterface({
      input: this.socket,
      crlfDelay: Infinity,
    });

    rl.on('line', (line) => {
      if (line.trim() === '') {
        return;
      }
      try {
        const response = JSON.parse(line);
        this.handleResponse(response);
      } catch (err) {
        console.error('Parse error:', err);
      }
    });
  }

  /**
   * Settles the pending request that matches `response.id`.
   * Messages without a matching pending request are ignored.
   *
   * @param {object} response - Parsed JSON-RPC response.
   */
  handleResponse(response) {
    if (!response.id || !this.pendingRequests.has(response.id)) {
      return;
    }

    const { resolve, reject } = this.pendingRequests.get(response.id);
    this.pendingRequests.delete(response.id);

    if (response.error) {
      const error = new Error(response.error.message);
      // Keep the JSON-RPC error details so callers can branch on them.
      error.code = response.error.code;
      error.data = response.error.data;
      reject(error);
    } else {
      resolve(response.result);
    }
  }

  /**
   * Rejects every in-flight request with `error` and clears the pending map.
   *
   * @param {Error} error
   */
  rejectAllPending(error) {
    for (const { reject } of this.pendingRequests.values()) {
      reject(error);
    }
    this.pendingRequests.clear();
  }

  /**
   * Sends a JSON-RPC request, optionally routed to a specific agent.
   *
   * @param {string} method - JSON-RPC method name.
   * @param {object} [params={}] - Method parameters. Never modified.
   * @param {?string} [agentId=null] - When truthy, sent as `params.agentId`
   *   (overriding any `agentId` already present in `params`).
   * @returns {Promise<*>} Resolves with the response `result`; rejects on an
   *   error response, when not connected, or when the connection closes first.
   */
  sendRequest(method, params = {}, agentId = null) {
    return new Promise((resolve, reject) => {
      if (!this.socket || this.socket.destroyed) {
        reject(new Error('Not connected to stdio Bus'));
        return;
      }

      const id = String(++this.requestId);

      // Include agentId for routing. Copy instead of assigning onto `params`
      // so the caller's object is left untouched.
      const routedParams = agentId ? { ...params, agentId } : params;
      const request = { jsonrpc: '2.0', id, method, params: routedParams };

      // Serialize before registering so a stringify failure leaves no stale entry.
      const payload = JSON.stringify(request);
      this.pendingRequests.set(id, { resolve, reject });
      this.socket.write(`${payload}\n`);
    });
  }

  /** Ends the socket if one was created. */
  disconnect() {
    if (this.socket) {
      this.socket.end();
    }
  }
}

// Usage example
async function main() {
  const client = new AgentClient('localhost', 9000);
  const clientInfo = { name: 'my-app', version: '1.0.0' };

  try {
    await client.connect();

    // Route to Claude agent
    await client.sendRequest('initialize', { clientInfo }, 'claude-acp');

    // Route to Goose agent
    await client.sendRequest('initialize', { clientInfo }, 'goose');
  } catch (err) {
    console.error('Error:', err);
  } finally {
    client.disconnect();
  }
}

main();
IDE Integration
MCP-to-ACP Proxy
Integrate stdio Bus with IDEs using the MCP-to-ACP Proxy.
Kiro IDE configuration:
{
  "mcpServers": {
    "stdio-bus-acp": {
      "command": "npx",
      "args": [
        "@stdiobus/workers-registry",
        "mcp-to-acp-proxy"
      ],
      "env": {
        "ACP_HOST": "localhost",
        "ACP_PORT": "9000",
        "AGENT_ID": "claude-acp"
      }
    }
  }
}
VS Code configuration:
{
  "mcp.servers": {
    "stdio-bus": {
      "command": "npx",
      "args": [
        "@stdiobus/workers-registry",
        "mcp-to-acp-proxy"
      ],
      "env": {
        "ACP_HOST": "localhost",
        "ACP_PORT": "9000",
        "AGENT_ID": "claude-acp"
      }
    }
  }
}
Custom IDE Integration
Direct MCP integration:
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import {
  StdioClientTransport,
  getDefaultEnvironment,
} from '@modelcontextprotocol/sdk/client/stdio.js';

/**
 * Connects an IDE to stdio Bus by spawning the MCP-to-ACP proxy as a child
 * process and talking MCP to it over stdio.
 */
class IDEIntegration {
  constructor() {
    this.client = null;
  }

  /**
   * Spawns the proxy and completes the MCP connection.
   *
   * @returns {Promise<void>}
   */
  async connect() {
    const transport = new StdioClientTransport({
      // Launch through npx, exactly like the IDE configurations in this guide.
      // `node @stdiobus/workers-registry ...` would treat the package name as
      // a script path and fail to start.
      command: 'npx',
      args: ['@stdiobus/workers-registry', 'mcp-to-acp-proxy'],
      env: {
        // Keep PATH and the other default variables so npx can be located.
        ...getDefaultEnvironment(),
        ACP_HOST: 'localhost',
        ACP_PORT: '9000',
        AGENT_ID: 'claude-acp',
      },
    });

    this.client = new Client(
      {
        name: 'my-ide',
        version: '1.0.0',
      },
      {
        capabilities: {},
      },
    );

    await this.client.connect(transport);
    console.log('Connected to stdio Bus via MCP');
  }

  /**
   * Lists the tools exposed by the proxy.
   *
   * @returns {Promise<Array>} The `tools` array of the `tools/list` result.
   */
  async listTools() {
    // The SDK helper sends `tools/list` and validates the result; the low-level
    // client.request() needs an explicit result schema.
    const response = await this.client.listTools();
    return response.tools;
  }

  /**
   * Calls a tool by name.
   *
   * @param {string} name - Tool name.
   * @param {object} args - Tool arguments.
   * @returns {Promise<object>} The `tools/call` result.
   */
  async callTool(name, args) {
    return this.client.callTool({
      name,
      arguments: args,
    });
  }

  /** Closes the MCP client if one was created. */
  async disconnect() {
    if (this.client) {
      await this.client.close();
    }
  }
}

// Usage example
async function main() {
  const ide = new IDEIntegration();

  try {
    await ide.connect();

    // List available tools
    const tools = await ide.listTools();
    console.log('Available tools:', tools);

    // Call a tool
    const result = await ide.callTool('echo', {
      text: 'Hello from IDE',
    });
    console.log('Tool result:', result);
  } catch (err) {
    console.error('Error:', err);
  } finally {
    await ide.disconnect();
  }
}

main();
Common Workflows
Initialize and Send Prompt
Complete workflow for initializing a session and sending a prompt:
/**
 * End-to-end workflow: connect, initialize, create a session, send one prompt,
 * then close the session and disconnect.
 *
 * Uses the StdioBusClient class from the "TCP Connection" example.
 * Errors are logged, not rethrown.
 */
async function initializeAndPrompt() {
  const client = new StdioBusClient('localhost', 9000);

  try {
    await client.connect();

    // Step 1: Initialize
    const initResult = await client.sendRequest('initialize', {
      clientInfo: {
        name: 'my-app',
        version: '1.0.0',
      },
    });
    console.log('Initialized:', initResult);

    // Step 2: Create session
    const sessionResult = await client.sendRequest('createSession', {});
    const sessionId = sessionResult.sessionId;
    console.log('Session created:', sessionId);

    try {
      // Step 3: Send prompt
      const promptResult = await client.sendRequest(
        'sendPrompt',
        { prompt: 'What is the weather today?' },
        sessionId,
      );
      console.log('Prompt result:', promptResult);
    } finally {
      // Step 4: Close session, also when the prompt failed, so the session
      // is not left open.
      await client.sendRequest('closeSession', {}, sessionId);
      console.log('Session closed');
    }
  } catch (err) {
    console.error('Error:', err);
  } finally {
    client.disconnect();
  }
}

initializeAndPrompt();
Tool Execution
Execute tools through the ACP Worker:
/**
 * Tool workflow: connect, initialize, create a session, list the available
 * tools, execute the `echo` tool, then close the session and disconnect.
 *
 * Uses the StdioBusClient class from the "TCP Connection" example.
 * Errors are logged, not rethrown.
 */
async function executeTool() {
  const client = new StdioBusClient('localhost', 9000);

  try {
    await client.connect();

    // Initialize
    await client.sendRequest('initialize', {
      clientInfo: { name: 'my-app', version: '1.0.0' },
    });

    // Create session
    const sessionResult = await client.sendRequest('createSession', {});
    const sessionId = sessionResult.sessionId;

    try {
      // List available tools
      const toolsResult = await client.sendRequest('listTools', {}, sessionId);
      console.log('Available tools:', toolsResult.tools);

      // Execute tool
      const toolResult = await client.sendRequest(
        'executeTool',
        {
          toolName: 'echo',
          arguments: {
            text: 'Hello, tools!',
          },
        },
        sessionId,
      );
      console.log('Tool result:', toolResult);
    } finally {
      // Close the session explicitly, also when a tool request failed.
      await client.sendRequest('closeSession', {}, sessionId);
    }
  } catch (err) {
    console.error('Error:', err);
  } finally {
    client.disconnect();
  }
}

executeTool();
Error Handling
Robust error handling for production applications:
import net from 'net';
import readline from 'readline';

/**
 * JSON-RPC 2.0 client over TCP with per-request timeouts and automatic
 * reconnection after an unexpected disconnect.
 *
 * Reconnection waits `1000 ms * attempt` before each try and gives up after
 * `maxReconnectAttempts`. A deliberate disconnect() never reconnects.
 */
class RobustClient {
  /**
   * @param {string} [host='localhost'] - Host passed to `net.createConnection`.
   * @param {number} [port=9000] - Port passed to `net.createConnection`.
   */
  constructor(host = 'localhost', port = 9000) {
    this.host = host;
    this.port = port;
    this.socket = null;
    this.requestId = 0;
    // Request id -> { resolve, reject } wrappers that also clear the timeout.
    this.pendingRequests = new Map();
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    // True once disconnect() was called; suppresses automatic reconnection.
    this.closing = false;
    // Set while a reconnect backoff is pending; calling it ends the wait early.
    this.cancelReconnectWait = null;
  }

  /**
   * Opens the TCP connection and starts reading responses.
   *
   * @returns {Promise<void>} Resolves once connected; rejects on a socket error.
   */
  connect() {
    this.closing = false;

    return new Promise((resolve, reject) => {
      this.socket = net.createConnection(this.port, this.host, () => {
        console.log('Connected to stdio Bus');
        this.reconnectAttempts = 0;
        this.setupReadline();
        resolve();
      });

      this.socket.on('error', (err) => {
        console.error('Connection error:', err);
        // No-op if the promise already resolved (error after connecting).
        reject(err);
      });

      this.socket.on('close', () => {
        console.log('Connection closed');
        this.handleDisconnect();
      });
    });
  }

  /**
   * Reacts to a closed socket: reconnects with a growing delay, unless the
   * close was requested through disconnect() or the attempt limit is reached,
   * in which case all pending requests are rejected.
   *
   * @returns {Promise<void>}
   */
  async handleDisconnect() {
    if (this.closing) {
      this.rejectAllPending(new Error('Connection closed'));
      return;
    }

    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error('Max reconnection attempts reached');
      this.rejectAllPending(new Error('Connection lost'));
      return;
    }

    this.reconnectAttempts++;
    console.log(`Reconnecting (attempt ${this.reconnectAttempts})...`);

    await this.waitBeforeReconnect(1000 * this.reconnectAttempts);

    // disconnect() may have been called while waiting.
    if (this.closing) {
      this.rejectAllPending(new Error('Connection closed'));
      return;
    }

    try {
      await this.connect();
    } catch (err) {
      // The failed socket also emits 'close', which schedules the next attempt.
      console.error('Reconnection failed:', err);
    }
  }

  /**
   * Waits `delayMs` milliseconds, or until disconnect() cancels the wait.
   *
   * @param {number} delayMs
   * @returns {Promise<void>}
   */
  waitBeforeReconnect(delayMs) {
    return new Promise((resolve) => {
      const timer = setTimeout(() => {
        this.cancelReconnectWait = null;
        resolve();
      }, delayMs);

      this.cancelReconnectWait = () => {
        clearTimeout(timer);
        this.cancelReconnectWait = null;
        resolve();
      };
    });
  }

  /**
   * Rejects every in-flight request with `error` and clears the pending map.
   *
   * @param {Error} error
   */
  rejectAllPending(error) {
    for (const { reject } of this.pendingRequests.values()) {
      reject(error);
    }
    this.pendingRequests.clear();
  }

  /** Reads the socket line by line and dispatches each parsed message. */
  setupReadline() {
    const rl = readline.createInterface({
      input: this.socket,
      crlfDelay: Infinity,
    });

    rl.on('line', (line) => {
      if (line.trim() === '') {
        return;
      }
      try {
        const response = JSON.parse(line);
        this.handleResponse(response);
      } catch (err) {
        console.error('Parse error:', err);
      }
    });
  }

  /**
   * Settles the pending request that matches `response.id`.
   * Messages without a matching pending request are ignored.
   *
   * @param {object} response - Parsed JSON-RPC response.
   */
  handleResponse(response) {
    if (!response.id || !this.pendingRequests.has(response.id)) {
      return;
    }

    const { resolve, reject } = this.pendingRequests.get(response.id);
    this.pendingRequests.delete(response.id);

    if (response.error) {
      const error = new Error(response.error.message);
      // Keep the JSON-RPC error details so callers can branch on them.
      error.code = response.error.code;
      error.data = response.error.data;
      reject(error);
    } else {
      resolve(response.result);
    }
  }

  /**
   * Sends a JSON-RPC request and resolves with the `result` of its response.
   *
   * @param {string} method - JSON-RPC method name.
   * @param {object} [params={}] - Method parameters.
   * @param {?string} [sessionId=null] - Added to the request as `sessionId` when truthy.
   * @param {number} [timeout=30000] - Milliseconds to wait for the response.
   * @returns {Promise<*>} Rejects with 'Request timeout' when no response
   *   arrives in time, on an error response, or when not connected.
   */
  sendRequest(method, params = {}, sessionId = null, timeout = 30000) {
    return new Promise((resolve, reject) => {
      if (!this.socket || this.socket.destroyed) {
        reject(new Error('Not connected to stdio Bus'));
        return;
      }

      const id = String(++this.requestId);
      const request = { jsonrpc: '2.0', id, method, params };
      if (sessionId) {
        request.sessionId = sessionId;
      }

      // Set timeout
      const timeoutId = setTimeout(() => {
        if (this.pendingRequests.has(id)) {
          this.pendingRequests.delete(id);
          reject(new Error('Request timeout'));
        }
      }, timeout);

      // Wrap the settle functions so that any outcome cancels the timer.
      this.pendingRequests.set(id, {
        resolve: (result) => {
          clearTimeout(timeoutId);
          resolve(result);
        },
        reject: (error) => {
          clearTimeout(timeoutId);
          reject(error);
        },
      });

      try {
        this.socket.write(`${JSON.stringify(request)}\n`);
      } catch (err) {
        clearTimeout(timeoutId);
        this.pendingRequests.delete(id);
        reject(err);
      }
    });
  }

  /**
   * Closes the connection for good: stops any scheduled reconnect and ends
   * the socket. Call connect() again to reopen.
   */
  disconnect() {
    this.closing = true;
    if (this.cancelReconnectWait) {
      this.cancelReconnectWait();
    }
    if (this.socket) {
      this.socket.end();
    }
  }
}

// Usage example
async function main() {
  const client = new RobustClient('localhost', 9000);

  try {
    await client.connect();

    const result = await client.sendRequest('test', {}, null, 5000);
    console.log('Result:', result);
  } catch (err) {
    console.error('Error:', err);
  } finally {
    client.disconnect();
  }
}

main();
Best Practices
Connection Management
- Reuse connections - Keep connections open for multiple requests
- Handle disconnects - Implement reconnection logic
- Set timeouts - Prevent hanging requests
- Clean up - Always close connections when done
Session Management
- Preserve sessionId - Include sessionId in all session-related requests
- Handle session expiry - Detect and handle expired sessions
- Close sessions - Explicitly close sessions when done
- Session affinity - Ensure messages with the same sessionId go to the same worker
Error Handling
- Catch all errors - Handle connection, parse, and protocol errors
- Retry logic - Implement exponential backoff for retries
- Timeout handling - Set appropriate timeouts for requests
- Graceful degradation - Handle partial failures gracefully
Performance
- Connection pooling - Use connection pools for high-throughput applications
- Batch requests - Send multiple requests in parallel when possible
- Buffer management - Monitor buffer sizes and backpressure
- Resource cleanup - Clean up resources promptly
Next Steps
- Configuration - Configure stdio Bus for your use case
- Troubleshooting - Common issues and solutions
- API Reference - Complete API documentation
- ACP Worker - Learn about the ACP Worker
- Registry Launcher - Discover the Registry Launcher