Transport-level routing for MCP/ACP protocols
Platform Integration Guide
This guide describes how to embed stdio Bus in an IDE, platform, or infrastructure system. It covers operating mode selection, configuration, worker implementation, error handling, and graceful shutdown.
Deployment Options
stdio Bus can be embedded in your platform in several ways, depending on your architecture and requirements.
Subprocess
Spawn cos as a child process. Ideal for IDE extensions and desktop applications where stdio Bus runs alongside your main process.
Sidecar
Run cos alongside your service in containerized deployments. Connect via Unix socket or TCP for multi-process architectures.
Embedded (TBD)
Link stdio Bus as a library for deep integration. This mode is planned for future releases and not yet supported.
Subprocess Embedding
The most common approach is spawning stdio Bus as a subprocess. This provides process isolation while maintaining simple communication via stdin/stdout.
Node.js Example
subprocess-embedding.js
const { spawn } = require('child_process');const readline = require('readline');// Spawn stdio Bus as a subprocessconst kernel = spawn('./build/kernel', ['--config', './config.json','--stdio'], {stdio: ['pipe', 'pipe', 'inherit'] // stdin, stdout, stderr});// Send messages to stdio Bus via stdincos.stdin.write(JSON.stringify({jsonrpc: '2.0',id: '1',method: 'initialize',params: {},sessionId: 'session-001'}) + '\n');// Read responses from stdio Bus via stdoutconst rl = readline.createInterface({ input: cos.stdout });rl.on('line', (line) => {const response = JSON.parse(line);console.log('Response:', response);});
Key Points
When using subprocess embedding, stdio Bus reads from its stdin and writes to its stdout. Your platform sends NDJSON messages to stdio Bus's stdin and reads responses from its stdout. Diagnostic output goes to stderr.
Socket-Based Integration
For multi-client scenarios, use Unix socket or TCP mode. This allows multiple processes to connect to a single stdio Bus instance.
Unix Socket Connection
unix-socket-client.js
const net = require('net');// Connect to stdio Bus via Unix socketconst client = net.createConnection('/tmp/stdio_bus.sock', () => {console.log('Connected to stdio Bus');// Send a requestclient.write(JSON.stringify({jsonrpc: '2.0',id: '1',method: 'test',params: {},sessionId: 'session-001'}) + '\n');});// Handle responses with NDJSON parsinglet buffer = '';client.on('data', (data) => {buffer += data.toString();let newlineIndex;while ((newlineIndex = buffer.indexOf('\n')) !== -1) {const line = buffer.slice(0, newlineIndex);buffer = buffer.slice(newlineIndex + 1);const response = JSON.parse(line);console.log('Response:', response);}});
TCP Connection
tcp-client.js
const net = require('net');// Connect to stdio Bus via TCPconst client = net.createConnection({ host: '127.0.0.1', port: 9000 }, () => {console.log('Connected to stdio Bus via TCP');client.write(JSON.stringify({jsonrpc: '2.0',id: '1',method: 'process',params: { data: 'test' },sessionId: 'session-001'}) + '\n');});client.on('data', (data) => {// Handle NDJSON responsesconst lines = data.toString().split('\n').filter(Boolean);lines.forEach(line => {const response = JSON.parse(line);console.log('Response:', response);});});
Mode Selection Guide
stdio Bus supports three operating modes, each suited to different integration scenarios. Choose based on your deployment requirements.
| Mode | Flag | Use Case |
|---|---|---|
| stdio | --stdio | Subprocess embedding, single client, IDE extensions |
| Unix socket | --unix <path> | Local IPC, multiple clients, security-sensitive deployments |
| TCP | --tcp <host:port> | Network access, remote clients, distributed systems |
Mode Characteristics
| Requirement | Recommended Mode |
|---|---|
| Single client, subprocess embedding | stdio |
| Multiple local clients, security-sensitive | Unix socket |
| Multiple clients, network access needed | TCP |
| IDE extension | stdio |
| Containerized microservice | Unix socket or TCP |
| Development/testing | Any (TCP is easiest to debug) |
Configuration Options
stdio Bus reads configuration from a JSON file specified via --config <path>. Configure worker pools and operational limits based on your requirements.
Pool Configuration
| Field | Required | Description |
|---|---|---|
pools[].id | Yes | Unique identifier for the pool (used in logs) |
pools[].command | Yes | Path to the worker executable |
pools[].args | No | Array of command-line arguments |
pools[].instances | Yes | Number of worker processes to spawn (≥1) |
Limits Configuration
| Field | Default | Description |
|---|---|---|
max_input_buffer | 1 MB | Maximum bytes buffered per input connection |
max_output_queue | 4 MB | Maximum bytes queued per output connection |
max_restarts | 5 | Maximum worker restarts within time window |
restart_window_sec | 60 | Time window for counting restarts |
drain_timeout_sec | 30 | Graceful shutdown timeout before SIGKILL |
backpressure_timeout_sec | 60 | Backpressure timeout before closing connection |
Configuration Examples
Minimal Configuration — Single worker, default limits:
config-minimal.json
{"pools": [{"id": "worker","command": "/usr/bin/node","args": ["./worker.js"],"instances": 1}]}
High-Throughput Configuration — Multiple workers, increased buffers:
config-high-throughput.json
{"pools": [{"id": "worker","command": "/usr/bin/node","args": ["./worker.js"],"instances": 8}],"limits": {"max_input_buffer": 4194304,"max_output_queue": 16777216,"backpressure_timeout_sec": 120}}
Fault-Tolerant Configuration — Aggressive restart policy:
config-fault-tolerant.json
{"pools": [{"id": "worker","command": "/usr/bin/node","args": ["./worker.js"],"instances": 4}],"limits": {"max_restarts": 10,"restart_window_sec": 300,"drain_timeout_sec": 60}}
Worker Implementation Templates
Workers are child processes that communicate with stdio Bus via stdin/stdout pipes using NDJSON format. A compliant worker must:
- Read NDJSON from stdin (one JSON object per line)
- Write NDJSON responses to stdout
- Write diagnostics to stderr (never stdout)
- Handle SIGTERM for graceful shutdown
- Preserve
idandsessionIdin responses
JavaScript Worker Template
worker.js
#!/usr/bin/env nodeconst readline = require('readline');const rl = readline.createInterface({input: process.stdin,output: process.stdout,terminal: false});rl.on('line', (line) => {try {const msg = JSON.parse(line);// Only respond to requests (messages with id and method)if (msg.id !== undefined && msg.method !== undefined) {const response = {jsonrpc: '2.0',id: msg.id,result: handleMethod(msg.method, msg.params)};// Preserve sessionId if presentif (msg.sessionId) {response.sessionId = msg.sessionId;}console.log(JSON.stringify(response));}} catch (err) {console.error(`Parse error: ${err.message}`);}});function handleMethod(method, params) {switch (method) {case 'initialize':return { capabilities: {} };case 'echo':return { echo: params };default:return { error: `Unknown method: ${method}` };}}// Graceful shutdown on SIGTERMprocess.on('SIGTERM', () => {console.error('Received SIGTERM, shutting down');process.exit(0);});
Python Worker Template
worker.py
#!/usr/bin/env python3import sysimport jsonimport signaldef handle_method(method, params):"""Implement your method handlers here."""if method == 'initialize':return {'capabilities': {}}elif method == 'echo':return {'echo': params}else:return {'error': f'Unknown method: {method}'}def process_message(line):"""Process a single NDJSON message."""try:msg = json.loads(line)# Only respond to requests (messages with id and method)if 'id' in msg and 'method' in msg:response = {'jsonrpc': '2.0','id': msg['id'],'result': handle_method(msg['method'], msg.get('params', {}))}# Preserve sessionId if presentif 'sessionId' in msg:response['sessionId'] = msg['sessionId']print(json.dumps(response), flush=True)except json.JSONDecodeError as e:print(f'Parse error: {e}', file=sys.stderr)def main():# Graceful shutdown on SIGTERMsignal.signal(signal.SIGTERM, lambda sig, frame: sys.exit(0))for line in sys.stdin:process_message(line.strip())if __name__ == '__main__':main()
Error Handling Patterns
Platform integrators should handle connection errors, message errors, and worker failures gracefully.
Connection Errors
| Error | Cause | Platform Action |
|---|---|---|
| Connection refused | stdio Bus not running or wrong address | Retry with backoff, check stdio Bus status |
| Connection reset | stdio Bus closed connection | Check for malformed messages, reconnect |
| Timeout | stdio Bus or worker unresponsive | Implement client-side timeouts |
stdio Bus Exit Codes
| Exit Code | Meaning | Platform Action |
|---|---|---|
| 0 | Graceful shutdown | Normal termination |
| 1 | Configuration or startup error | Check config file and logs |
| Non-zero | Unexpected error | Check stderr, restart |
Request Timeout Pattern
error-handling.js
const pendingRequests = new Map();function sendRequest(client, request, timeoutMs = 30000) {return new Promise((resolve, reject) => {const timer = setTimeout(() => {pendingRequests.delete(request.id);reject(new Error('Request timeout'));}, timeoutMs);// Store pending requestpendingRequests.set(request.id, { resolve, reject, timer });// Send requestclient.write(JSON.stringify(request) + '\n');});}function handleResponse(response) {const pending = pendingRequests.get(response.id);if (pending) {clearTimeout(pending.timer);pendingRequests.delete(response.id);if (response.error) {pending.reject(new Error(response.error.message));} else {pending.resolve(response.result);}}}
Graceful Shutdown Procedures
Proper shutdown handling ensures no data loss and clean resource cleanup.
stdio Bus Shutdown Sequence
When stdio Bus receives SIGTERM or SIGINT:
- Stop accepting connections: Close listening socket (socket modes)
- Signal workers: Send SIGTERM to all running workers
- Drain period: Wait up to
drain_timeout_secfor workers to exit - Force termination: Send SIGKILL to remaining workers
- Cleanup: Close all file descriptors, free memory
- Exit: Exit with code 0
Platform Shutdown Procedure
graceful-shutdown.js
const { spawn } = require('child_process');const kernel = spawn('./build/kernel', ['--config', 'config.json', '--stdio']);// Graceful shutdown functionfunction shutdown() {return new Promise((resolve) => {cos.on('exit', (code) => {console.log(`stdio Bus exited with code ${code}`);resolve(code);});// Send SIGTERM for graceful shutdowncos.kill('SIGTERM');// Force kill after timeout (drain_timeout_sec + buffer)setTimeout(() => {if (!cos.killed) {console.warn('stdio Bus did not exit, sending SIGKILL');cos.kill('SIGKILL');}}, 35000);});}// Handle process signalsprocess.on('SIGTERM', async () => {await shutdown();process.exit(0);});
⏱️ Timeout Recommendation
Set your platform timeout slightly higher than drain_timeout_sec (default: 30s) to allow stdio Bus to complete its shutdown sequence. A platform timeout of 35 seconds is recommended.
Common Integration Patterns
The following code snippets demonstrate common patterns for integrating with stdio Bus in production environments.
Session Management
Sessions provide worker affinity for related messages. All messages with the same sessionId route to the same worker.
session-management.js
// Generate unique session IDsfunction generateSessionId() {return `session-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;}// Track sessions per user/contextconst userSessions = new Map();function getOrCreateSession(userId) {if (!userSessions.has(userId)) {userSessions.set(userId, generateSessionId());}return userSessions.get(userId);}// Send request with session affinityfunction sendRequest(client, userId, method, params) {const sessionId = getOrCreateSession(userId);const request = {jsonrpc: '2.0',id: generateRequestId(),method,params,sessionId};client.write(JSON.stringify(request) + '\n');}
Request-Response Correlation
Track pending requests for response matching using the request id field.
request-correlation.js
const pendingRequests = new Map();let requestCounter = 0;function generateRequestId() {return `req-${++requestCounter}`;}function sendRequest(client, method, params, sessionId) {return new Promise((resolve, reject) => {const id = generateRequestId();const request = { jsonrpc: '2.0', id, method, params };if (sessionId) request.sessionId = sessionId;pendingRequests.set(id, { resolve, reject });client.write(JSON.stringify(request) + '\n');});}function handleResponse(response) {const pending = pendingRequests.get(response.id);if (pending) {pendingRequests.delete(response.id);if (response.error) {pending.reject(new Error(response.error.message));} else {pending.resolve(response.result);}}}
Connection Pooling
Manage multiple connections for high throughput in TCP or Unix socket mode.
connection-pool.js
const net = require('net');class ConnectionPool {constructor(socketPath, poolSize = 4) {this.socketPath = socketPath;this.connections = [];this.nextIndex = 0;for (let i = 0; i < poolSize; i++) {this.connections.push(this.createConnection());}}createConnection() {const conn = net.createConnection(this.socketPath);conn.buffer = '';conn.on('data', (data) => {conn.buffer += data.toString();this.processBuffer(conn);});return conn;}processBuffer(conn) {let newlineIndex;while ((newlineIndex = conn.buffer.indexOf('\n')) !== -1) {const line = conn.buffer.slice(0, newlineIndex);conn.buffer = conn.buffer.slice(newlineIndex + 1);this.handleResponse(JSON.parse(line));}}getConnection() {const conn = this.connections[this.nextIndex];this.nextIndex = (this.nextIndex + 1) % this.connections.length;return conn;}send(message) {const conn = this.getConnection();conn.write(JSON.stringify(message) + '\n');}close() {this.connections.forEach(conn => conn.end());}}
Health Monitoring
Monitor stdio Bus process health and implement automatic restart on failure.
health-monitoring.js
const { spawn } = require('child_process');class COSManager {constructor(configPath, mode = 'stdio') {this.configPath = configPath;this.mode = mode;this.process = null;this.restartCount = 0;this.maxRestarts = 5;}start() {const args = ['--config', this.configPath, `--${this.mode}`];this.process = spawn('./build/kernel', args, {stdio: ['pipe', 'pipe', 'inherit']});this.process.on('exit', (code, signal) => {console.error(`stdio Bus exited: code=${code}, signal=${signal}`);this.handleExit(code, signal);});this.process.on('error', (err) => {console.error(`stdio Bus error: ${err.message}`);});return this.process;}handleExit(code, signal) {if (signal === 'SIGTERM' || signal === 'SIGINT') {return; // Intentional shutdown}// Unexpected exit - attempt restartif (this.restartCount < this.maxRestarts) {this.restartCount++;console.log(`Restarting stdio Bus (attempt ${this.restartCount})`);setTimeout(() => this.start(), 1000 * this.restartCount);} else {console.error('Max restarts exceeded, giving up');}}stop() {if (this.process) {this.process.kill('SIGTERM');}}}
Logging Integration
Capture stdio Bus stderr output and forward to your logging system.
logging-integration.js
const { spawn } = require('child_process');const readline = require('readline');const kernel = spawn('./build/kernel', ['--config', 'config.json', '--stdio'], {stdio: ['pipe', 'pipe', 'pipe'] // Capture stderr});// Forward stdio Bus logs to your logging systemconst stderrReader = readline.createInterface({ input: cos.stderr });stderrReader.on('line', (line) => {// Parse stdio Bus log format: [TIMESTAMP] [LEVEL] messageconst match = line.match(/^\[([^\]]+)\] \[([^\]]+)\] (.*)$/);if (match) {const [, timestamp, level, message] = match;yourLogger.log(level.toLowerCase(), message, {timestamp,source: 'stdio Bus'});} else {yourLogger.info(line, { source: 'stdio Bus' });}});