Transport-level routing for MCP/ACP protocols
Echo Worker
Simple reference implementation demonstrating the NDJSON worker protocol for stdio Bus kernel.
Overview
The Echo Worker is a minimal worker implementation that echoes back any JSON-RPC messages it receives. It serves as both a testing tool for stdio Bus kernel functionality and a reference implementation for developers creating custom workers.
Key Features:
- Simple NDJSON protocol implementation
- JSON-RPC 2.0 compliant
- Session affinity support
- Graceful shutdown handling
- Reference code for custom workers
Purpose
The Echo Worker serves multiple purposes:
- Testing: Verify stdio Bus kernel functionality and message routing
- Reference Implementation: Demonstrate the NDJSON worker protocol through clean, documented code
- Development: Quick testing during development without complex protocol implementations
- Learning: Understand how workers communicate with stdio Bus kernel
Usage
Standalone Testing
Run the Echo Worker directly to test the NDJSON protocol:
echo '{"jsonrpc":"2.0","id":"1","method":"test","params":{"foo":"bar"}}' | \node node_modules/@stdiobus/workers-registry/workers/echo-worker/echo-worker.js
Expected output:
{"jsonrpc":"2.0","id":"1","result":{"method":"test","params":{"foo":"bar"}}}
Using with stdio Bus
Configuration file (echo-worker-config.json):
{"pools": [{"id": "echo-worker","command": "npx","args": ["@stdiobus/workers-registry","echo-worker"],"instances": 1}]}
Run with Docker:
docker run \--name stdiobus-echo \-p 9000:9000 \-v $(pwd):/stdiobus:ro \-v $(pwd)/echo-worker-config.json:/config.json:ro \stdiobus/stdiobus:latest \--config /config.json --tcp 0.0.0.0:9000
Run with binary:
./stdio_bus --config echo-worker-config.json --tcp 0.0.0.0:9000
Testing the Connection
Send a test message to the Echo Worker:
echo '{"jsonrpc":"2.0","id":"1","method":"echo","params":{"test":true}}' | nc localhost 9000
Response:
{"jsonrpc":"2.0","id":"1","result":{"method":"echo","params":{"test":true}}}
Configuration
Basic Configuration
Minimal configuration for running a single Echo Worker instance:
{"pools": [{"id": "echo-worker","command": "npx","args": ["@stdiobus/workers-registry","echo-worker"],"instances": 1}]}
Multiple Instances
Run multiple Echo Worker instances for load testing:
{"pools": [{"id": "echo-worker","command": "npx","args": ["@stdiobus/workers-registry","echo-worker"],"instances": 4}]}
With Custom Limits
Configure resource limits for testing backpressure and error handling:
{"pools": [{"id": "echo-worker","command": "npx","args": ["@stdiobus/workers-registry","echo-worker"],"instances": 1}],"limits": {"max_input_buffer": 1048576,"max_output_queue": 4194304,"max_restarts": 5,"restart_window_sec": 60,"drain_timeout_sec": 30,"backpressure_timeout_sec": 60}}
Protocol Implementation
The Echo Worker demonstrates the core NDJSON protocol requirements:
Reading from stdin
import readline from 'readline';const rl = readline.createInterface({input: process.stdin,output: process.stdout,terminal: false});rl.on('line', (line) => {try {const msg = JSON.parse(line);handleMessage(msg);} catch (err) {console.error('Parse error:', err.message);}});
Writing to stdout
function sendResponse(id, result, sessionId) {const response = {jsonrpc: '2.0',id: id,result: result};if (sessionId) {response.sessionId = sessionId;}console.log(JSON.stringify(response));}
Handling SIGTERM
process.on('SIGTERM', () => {console.error('Shutting down...');rl.close();});rl.on('close', () => process.exit(0));
Testing Use Cases
Protocol Verification
Test that stdio Bus correctly routes messages to workers:
# Send requestecho '{"jsonrpc":"2.0","id":"1","method":"test","params":{}}' | nc localhost 9000# Verify response format# Should receive: {"jsonrpc":"2.0","id":"1","result":{"method":"test","params":{}}}
Session Affinity Testing
Test that messages with the same sessionId are routed to the same worker instance:
# Send multiple messages with same sessionIdecho '{"jsonrpc":"2.0","id":"1","method":"test","sessionId":"sess-123","params":{}}' | nc localhost 9000echo '{"jsonrpc":"2.0","id":"2","method":"test","sessionId":"sess-123","params":{}}' | nc localhost 9000echo '{"jsonrpc":"2.0","id":"3","method":"test","sessionId":"sess-123","params":{}}' | nc localhost 9000# All responses should come from the same worker instance
Load Testing
Test stdio Bus performance with multiple concurrent connections:
# Run multiple clients in parallelfor i in {1..100}; doecho '{"jsonrpc":"2.0","id":"'$i'","method":"test","params":{}}' | nc localhost 9000 &donewait
Error Handling
Test how stdio Bus handles malformed messages:
# Send invalid JSONecho 'not-json' | nc localhost 9000# Send JSON-RPC without required fieldsecho '{"method":"test"}' | nc localhost 9000
Development Reference
The Echo Worker source code serves as a reference for creating custom workers. Key implementation patterns:
Message Handling
function handleMessage(msg) {// Requests have an 'id' field and require a responseif (msg.id !== undefined) {const result = {method: msg.method,params: msg.params};sendResponse(msg.id, result, msg.sessionId);}// Notifications don't have an 'id' and don't require a responseelse {console.error('Received notification:', msg.method);}}
Error Responses
function sendError(id, code, message, sessionId) {const response = {jsonrpc: '2.0',id: id,error: {code: code,message: message}};if (sessionId) {response.sessionId = sessionId;}console.log(JSON.stringify(response));}
Logging
// Always log to stderr, never to stdoutconsole.error('Worker started');console.error('Processing message:', msg.method);console.error('Error occurred:', err.message);
Best Practices
When using the Echo Worker as a reference for custom workers:
- Never write non-JSON to stdout - All output to stdout must be valid NDJSON
- Log to stderr only - Use
console.error()for all logging and debugging - Preserve sessionId - Always include sessionId in responses when present in requests
- Handle SIGTERM - Implement graceful shutdown to avoid data loss
- Parse errors carefully - Catch JSON parse errors and log them to stderr
- Validate JSON-RPC - Check for required fields (jsonrpc, id, method)
- Use readline interface - Process stdin line-by-line for NDJSON protocol
Next Steps
- ACP Worker - Full ACP protocol implementation