Capstone

This final capstone assembles everything from the course into a complete MCP platform: a registry for server discovery, an API gateway for authentication and routing, a collection of domain-specific MCP servers, and a web interface where teams can explore available tools, run agent queries, and review audit logs. When you deploy this platform, you have the infrastructure that enterprise teams need to build and manage AI-powered workflows on MCP.

The complete MCP platform: registry, gateway, domain servers, and a management web interface.

Platform Architecture Overview

| Component | Purpose | Lesson Reference |
|---|---|---|
| MCP Registry | Server discovery and health tracking | Lesson 44 |
| API Gateway | Auth (OAuth), rate limiting, routing | Lessons 31, 41 |
| Domain MCP Servers | Business tools (CRM, docs, analytics) | Parts I-III |
| Multi-Provider Agent | Route queries to OpenAI/Claude/Gemini | Lessons 28-30 |
| Audit Service | Structured logs, compliance reporting | Lesson 35 |
| Observability Stack | Prometheus + Grafana + OpenTelemetry | Lesson 42 |
| Management UI | Tool explorer, query interface, logs | This lesson |

Platform Bootstrap Script

// platform/bootstrap.js
// Register all MCP servers with the registry on startup

const REGISTRY_URL = process.env.REGISTRY_URL ?? 'http://localhost:4000';

const MCP_SERVERS = [
  {
    id: 'products',
    name: 'Product Catalog Server',
    description: 'Search, browse, and manage product catalog',
    url: process.env.PRODUCTS_SERVER_URL,
    tags: ['products', 'catalog', 'inventory'],
    auth: { type: 'bearer' },
    healthUrl: `${process.env.PRODUCTS_SERVER_URL}/health`,
  },
  {
    id: 'analytics',
    name: 'Analytics Server',
    description: 'Business metrics, trends, and reports',
    url: process.env.ANALYTICS_SERVER_URL,
    tags: ['analytics', 'metrics', 'reports'],
    auth: { type: 'bearer' },
    healthUrl: `${process.env.ANALYTICS_SERVER_URL}/health`,
  },
  // ... more servers
];

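async function registerAll() {
  for (const server of MCP_SERVERS) {
    const res = await fetch(`${REGISTRY_URL}/servers`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(server),
    });
    // fetch only rejects on network errors, so check the HTTP status explicitly
    if (!res.ok) {
      throw new Error(`Failed to register ${server.name}: HTTP ${res.status}`);
    }
    console.log(`Registered: ${server.name}`);
  }
}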

await registerAll();
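
The entries above build `healthUrl` from environment variables, so an unset variable silently produces the string `undefined/health`. A minimal sketch of a check that could run before registration, assuming a hypothetical helper named `validateServerConfig` that is not part of the course code:

```javascript
// Hypothetical helper: returns a list of problems with one registry entry.
// An empty list means the entry looks safe to POST to the registry.
function validateServerConfig(server) {
  const problems = [];
  for (const field of ['id', 'name', 'url', 'healthUrl']) {
    const value = server[field];
    if (typeof value !== 'string' || value.trim() === '') {
      problems.push(`${field} is missing`);
    } else if (value.includes('undefined')) {
      // Template literals turn an unset env var into the text "undefined"
      problems.push(`${field} contains "undefined" (unset environment variable?)`);
    }
  }
  for (const field of ['url', 'healthUrl']) {
    if (typeof server[field] === 'string' && !/^https?:\/\//.test(server[field])) {
      problems.push(`${field} is not an http(s) URL`);
    }
  }
  return problems;
}

// Example: PRODUCTS_SERVER_URL was not set when the entry was built
const broken = {
  id: 'products',
  name: 'Product Catalog Server',
  url: undefined,
  healthUrl: 'undefined/health',
};
console.log(validateServerConfig(broken));
```

Failing fast here keeps a misconfigured entry out of the registry, where it would otherwise only surface later as a failing health check.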

Management API

// platform/management-api.js
// REST API for the management UI

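import express from 'express';
// McpDiscoveryClient, createAgent, getUserScope, and auditDb come from earlier lessons;
// import them from wherever your project defines them.

const REGISTRY_URL = process.env.REGISTRY_URL ?? 'http://localhost:4000';

const app = express();
app.use(express.json());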

// List all registered MCP servers with health
app.get('/api/platform/servers', async (req, res) => {
  const response = await fetch(`${REGISTRY_URL}/status`);
  res.json(await response.json());
});

// List all tools from all healthy servers
app.get('/api/platform/tools', async (req, res) => {
  const discovery = new McpDiscoveryClient(REGISTRY_URL);
  await discovery.connect();
  const tools = await discovery.getAllTools();
  res.json({ tools, count: tools.length });
});

// Execute an agent query
app.post('/api/platform/query', async (req, res) => {
  const { question, provider = 'auto', userId } = req.body;
  // Rate limit, auth check, then:
  let agent;
  try {
    agent = await createAgent({ scope: getUserScope(userId), preferredProvider: provider });
    const answer = await agent.run(question);
    res.json({ answer });
  } catch (err) {
    res.status(500).json({ error: 'Agent execution failed' });
  } finally {
    // Always release MCP connections, even when the run fails
    await agent?.close();
  }
});

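// Get audit logs for a user
app.get('/api/platform/audit', async (req, res) => {
  const { userId, from, to } = req.query;
  // Query-string values arrive as strings, so parse the limit before using it
  const limit = Number.parseInt(req.query.limit ?? '50', 10) || 50;
  const logs = await auditDb.query({ userId, from, to, limit });
  res.json({ logs });
});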

app.listen(5000, () => console.log('Management API on :5000'));

Component interaction: the discovery client queries the registry, builds the tool set, and routes through the agent.

Docker Compose – Full Platform

services:
  registry:
    build: ./registry
    ports: ["4000:4000"]
    depends_on: [redis]

  gateway:
    build: ./gateway
    ports: ["3000:3000"]
    environment:
      REGISTRY_URL: http://registry:4000
    depends_on: [registry, redis]

  management-api:
    build: ./platform
    ports: ["5000:5000"]
    environment:
      REGISTRY_URL: http://registry:4000
    depends_on: [gateway, registry]

  products-server:
    build: ./servers/products
    environment:
      DATABASE_URL: ${PRODUCTS_DB_URL}

  analytics-server:
    build: ./servers/analytics
    environment:
      DATABASE_URL: ${ANALYTICS_DB_URL}

  redis:
    image: redis:7-alpine

  prometheus:
    image: prom/prometheus:v2.50.0
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports: ["9090:9090"]

  grafana:
    image: grafana/grafana:10.3.0
    ports: ["3001:3000"]
    depends_on: [prometheus]

What You Have Built

Across all 53 lessons and 5 capstone projects you have built:

  • MCP servers using every primitive: tools, resources, prompts, sampling, elicitation, roots
  • Clients for all three major LLM providers: OpenAI, Claude, and Gemini
  • Production infrastructure: Docker, Kubernetes, Nginx, Redis
  • Security stack: OAuth 2.0, RBAC, input validation, audit logging, secrets management
  • Multi-agent systems: A2A delegation, LangGraph integration, state management
  • Observability: Prometheus metrics, OpenTelemetry tracing, structured logs
  • A complete enterprise platform: registry, gateway, domain servers, management UI

MCP is the connective tissue of the AI application stack. You now know it from protocol fundamentals to enterprise deployment. Go build something important.

nJoy 😉

Project 4

This capstone builds the most complete MCP application in the course: an enterprise AI assistant with OAuth 2.0 authentication, RBAC tool access control, full audit logging, rate limiting, and a multi-provider backend. It brings together patterns from every major part of the course into a single deployable system. Deploy it and you have a production-ready enterprise AI assistant that your security team can audit and your compliance team can sign off on.

Enterprise-grade: OAuth tokens + RBAC scope filtering + audit logs + rate limiting + multi-provider routing.

System Architecture

enterprise-assistant/
├── gateway/
│   ├── server.js          (HTTP API gateway with auth + rate limiting)
│   ├── auth.js            (OAuth 2.0 token validation, JWKS)
│   ├── rbac.js            (Role-to-scope mapping, tool filtering)
│   ├── audit.js           (Structured audit logging)
│   └── rate-limiter.js    (Per-user rate limiting with Redis)
├── agent/
│   ├── router.js          (Multi-provider routing: OpenAI/Claude/Gemini)
│   └── executor.js        (Tool loop with retry, timeout, token budget)
├── servers/
│   ├── knowledge-server.js (Knowledge base search)
│   └── actions-server.js   (Business action tools)
└── docker-compose.yml
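
The tree lists `agent/executor.js` as a tool loop with retry and timeout, but this lesson does not show the file. A minimal sketch of those two pieces, assuming hypothetical helpers `withTimeout` and `withRetry` (the names, defaults, and backoff schedule are illustrative, and the token budget is not covered):

```javascript
// Hypothetical helpers for agent/executor.js: a timeout wrapper and a retry wrapper.
function withTimeout(promise, ms, label = 'operation') {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; the timer is always cleared so the process can exit
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

async function withRetry(fn, { attempts = 3, baseDelayMs = 200 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn(attempt);
    } catch (err) {
      lastError = err;
      if (attempt < attempts) {
        // Exponential backoff: baseDelayMs, then 2x, then 4x, ...
        await new Promise(resolve => setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1)));
      }
    }
  }
  throw lastError;
}

// Example: a flaky call that succeeds on the third attempt
let calls = 0;
const flaky = async () => {
  if (++calls < 3) throw new Error('transient');
  return 'ok';
};
withRetry(() => withTimeout(flaky(), 1000, 'flaky'), { baseDelayMs: 10 }).then(console.log);
```

Wrapping each tool call as `withRetry(() => withTimeout(client.callTool(...), ms))` keeps a slow or briefly unavailable MCP server from stalling the whole request.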

The Gateway Server

// gateway/server.js
import express from 'express';
import { validateToken, getRolesFromToken } from './auth.js';
import { getScopeFromRoles, getAllowedTools } from './rbac.js';
import { AuditLogger } from './audit.js';
import { createRateLimiter } from './rate-limiter.js';
import { createAgent } from '../agent/router.js';

const app = express();
app.use(express.json());

const auditLog = new AuditLogger();
const rateLimiter = createRateLimiter(60);  // 60 req/min per user

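// Health check and metrics
// (getPrometheusMetrics is assumed to come from the observability module built in Lesson 42)
app.get('/health', (req, res) => res.json({ status: 'ok', uptime: process.uptime() }));
app.get('/metrics', (req, res) => res.end(getPrometheusMetrics()));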

// Main API endpoint
app.post('/api/ask', async (req, res) => {
  const requestId = crypto.randomUUID();

  // 1. Authenticate
  const authHeader = req.headers.authorization;
  if (!authHeader?.startsWith('Bearer ')) {
    return res.status(401).json({ error: 'Bearer token required' });
  }

  let claims;
  try {
    claims = await validateToken(authHeader.slice(7));
  } catch {
    return res.status(401).json({ error: 'Invalid token' });
  }

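  // 2. Rate limit
  try {
    await rateLimiter.consume(claims.sub);
  } catch (rl) {
    // A rejection without msBeforeNext is a limiter failure (for example Redis being down), not a limit hit
    if (typeof rl?.msBeforeNext !== 'number') {
      return res.status(503).json({ error: 'Rate limiter unavailable' });
    }
    res.setHeader('Retry-After', Math.ceil(rl.msBeforeNext / 1000));
    return res.status(429).json({ error: 'Rate limit exceeded' });
  }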

  // 3. Determine role and scope
  const roles = getRolesFromToken(claims);
  const scope = getScopeFromRoles(roles);

  // 4. Get question
  const { question, preferredProvider } = req.body;
  if (!question?.trim()) return res.status(400).json({ error: 'question is required' });

  // 5. Write the request to the audit log
  await auditLog.write({
    eventId: requestId,
    eventType: 'api_request',
    actor: { userId: claims.sub, roles },
    request: { question: question.slice(0, 100) },
    scope: scope.split(' '),
  });

  // 6. Build and run the agent
  // (the agent is created inside the try block so that a failed build is reported
  // as an error and any opened connections are still closed)
  let agent;
  try {
    agent = await createAgent({ scope, preferredProvider });
    const answer = await agent.run(question);

    await auditLog.write({
      eventId: requestId,
      eventType: 'api_response',
      actor: { userId: claims.sub },
      outcome: { success: true },
    });

    res.json({ answer, requestId });
  } catch (err) {
    await auditLog.write({
      eventId: requestId,
      eventType: 'api_error',
      actor: { userId: claims.sub },
      outcome: { success: false, error: err.message },
    });
    res.status(500).json({ error: 'Agent execution failed', requestId });
  } finally {
    await agent?.close();
  }
});

const PORT = process.env.PORT ?? 3000;
app.listen(PORT, () => console.log(`Enterprise assistant listening on :${PORT}`));

Request lifecycle: every request goes through 6 stages before the agent runs.
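
The gateway awaits `rateLimiter.consume(userId)` and reads `msBeforeNext` from the rejection. A minimal in-memory sketch with the same shape can be handy for local testing. It assumes a fixed one-minute window and a hypothetical factory named `createMemoryRateLimiter`. The course's `rate-limiter.js` uses Redis and may behave differently:

```javascript
// Hypothetical in-memory stand-in for gateway/rate-limiter.js (the course version uses Redis).
// consume() resolves while the caller is under the limit and rejects with
// { msBeforeNext } once the limit for the current window is exhausted.
function createMemoryRateLimiter(limitPerMinute, now = () => Date.now()) {
  const WINDOW_MS = 60_000;
  const windows = new Map(); // userId -> { start, count }

  return {
    async consume(userId) {
      const t = now();
      let w = windows.get(userId);
      // Start a fresh window for new users or when the previous window has expired
      if (!w || t - w.start >= WINDOW_MS) {
        w = { start: t, count: 0 };
        windows.set(userId, w);
      }
      if (w.count >= limitPerMinute) {
        throw { msBeforeNext: w.start + WINDOW_MS - t };
      }
      w.count += 1;
      return { remaining: limitPerMinute - w.count };
    },
  };
}

// Example: the third request inside one window is rejected
const limiter = createMemoryRateLimiter(2);
limiter.consume('alice')
  .then(() => limiter.consume('alice'))
  .then(() => limiter.consume('alice'))
  .catch(rl => console.log(`Retry after ${Math.ceil(rl.msBeforeNext / 1000)} s`));
```

State held in process memory is lost on restart and is not shared between gateway replicas, which is why a shared store such as Redis is the better fit for production.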

RBAC Configuration

// gateway/rbac.js
const ROLE_SCOPES = {
  employee: 'knowledge:read',
  manager: 'knowledge:read actions:read',
  admin: 'knowledge:read knowledge:write actions:read actions:write',
};

const SCOPE_TOOLS = {
  'knowledge:read': ['search_knowledge', 'get_article', 'list_categories'],
  'knowledge:write': ['create_article', 'update_article', 'publish_article'],
  'actions:read': ['get_ticket', 'list_tickets', 'get_report'],
  'actions:write': ['create_ticket', 'update_ticket', 'trigger_alert'],
};

export function getScopeFromRoles(roles) {
  return [...new Set(roles.flatMap(r => (ROLE_SCOPES[r] ?? '').split(' ')).filter(Boolean))].join(' ');
}

export function getAllowedTools(scope, allTools) {
  const allowed = new Set(
    scope.split(' ').flatMap(s => SCOPE_TOOLS[s] ?? [])
  );
  return allTools.filter(t => allowed.has(t.name));
}
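
A short worked example of the two functions, using a condensed copy of the mappings above so that it runs on its own. The `contractor` role and the list of discovered tools are made up for illustration:

```javascript
// Condensed copy of the mappings above, so the example is self-contained
const ROLE_SCOPES = {
  employee: 'knowledge:read',
  manager: 'knowledge:read actions:read',
};
const SCOPE_TOOLS = {
  'knowledge:read': ['search_knowledge', 'get_article', 'list_categories'],
  'actions:read': ['get_ticket', 'list_tickets', 'get_report'],
  'actions:write': ['create_ticket', 'update_ticket', 'trigger_alert'],
};

function getScopeFromRoles(roles) {
  const scopes = roles.flatMap(r => (ROLE_SCOPES[r] ?? '').split(' ')).filter(Boolean);
  return [...new Set(scopes)].join(' ');
}

function getAllowedTools(scope, allTools) {
  const allowed = new Set(scope.split(' ').flatMap(s => SCOPE_TOOLS[s] ?? []));
  return allTools.filter(t => allowed.has(t.name));
}

// A user holding two known roles plus one role the gateway does not recognise
const scope = getScopeFromRoles(['employee', 'manager', 'contractor']);
console.log(scope); // knowledge:read actions:read

// Tools discovered from the MCP servers (illustrative subset)
const discovered = [{ name: 'search_knowledge' }, { name: 'create_ticket' }, { name: 'get_report' }];
console.log(getAllowedTools(scope, discovered).map(t => t.name)); // [ 'search_knowledge', 'get_report' ]
```

Unknown roles contribute no scopes and duplicate scopes collapse into one, so the model never sees `create_ticket` for this user.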

Multi-Provider Agent Router

// agent/router.js - select provider based on question complexity
import { OpenAIProvider } from './providers/openai.js';
import { ClaudeProvider } from './providers/claude.js';
import { GeminiProvider } from './providers/gemini.js';
import { getAllowedTools } from '../gateway/rbac.js';

// connectMcpServers, aggregateTools, and selectProvider are assumed to be defined
// elsewhere in this module (they are not shown in this lesson)
const PROVIDERS = { openai: OpenAIProvider, claude: ClaudeProvider, gemini: GeminiProvider };

export async function createAgent({ scope, preferredProvider = 'auto' }) {
  // Load MCP servers
  const mcpClients = await connectMcpServers();
  const allTools = await aggregateTools(mcpClients);
  const scopedTools = getAllowedTools(scope, allTools);

  return {
    async run(question) {
      // Select the provider at query time, when the question is actually known
      const providerKey = preferredProvider === 'auto'
        ? selectProvider(question)
        : preferredProvider;

      const Provider = PROVIDERS[providerKey];
      if (!Provider) throw new Error(`Unknown provider: ${providerKey}`);

      const provider = new Provider({ maxTurns: 12, tokenBudget: 50_000 });
      return provider.run(question, scopedTools, mcpClients);
    },
    async close() {
      await Promise.all(mcpClients.map(c => c.close()));
    },
  };
}
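
The router calls `selectProvider`, which this lesson does not show. One possible shape is a plain heuristic over the question text. The thresholds, keywords, and the mapping from question type to provider in this sketch are illustrative assumptions, not values from the course:

```javascript
// Hypothetical heuristic: selectProvider is referenced above but not defined in this lesson.
// The word threshold, keyword lists, and provider assignments are illustrative only.
function selectProvider(question) {
  const text = question.toLowerCase();
  const words = text.split(/\s+/).filter(Boolean).length;

  // Long or analysis-heavy questions go to one provider
  if (words > 150 || /\b(analy[sz]e|compare|explain why|trade-?offs?)\b/.test(text)) {
    return 'claude';
  }
  // Questions that mention visual material go to another
  if (/\b(image|screenshot|video|chart)\b/.test(text)) {
    return 'gemini';
  }
  // Everything else uses the default
  return 'openai';
}

console.log(selectProvider('What is the status of ticket 42?'));
console.log(selectProvider('Compare Q1 and Q2 revenue and explain the gap'));
```

Whatever rule is used, it needs the real question as input, so it has to run when the query arrives rather than when the agent is constructed.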

Deployment

services:
  gateway:
    build: .
    ports: ["3000:3000"]
    environment:
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
      GEMINI_API_KEY: ${GEMINI_API_KEY}
      JWKS_URL: ${JWKS_URL}
      REDIS_URL: redis://redis:6379
    depends_on: [redis]
    healthcheck:
      test: ["CMD", "wget", "-qO-", "http://localhost:3000/health"]
      interval: 30s
      timeout: 5s
      retries: 3

  redis:
    image: redis:7-alpine
    volumes: ["redis-data:/data"]

volumes:
  redis-data:

nJoy 😉

Project 3

Real-world AI assistants need to integrate many APIs: a CRM for customer data, a ticketing system for support requests, a payment processor for billing status, a calendar for scheduling. Each of these becomes an MCP server, and the multi-provider abstraction layer from Lesson 29 routes queries to the right provider. This capstone builds a multi-API integration hub that unifies five real-world APIs behind a single MCP interface, with tool routing, error handling, and a unified context window.

Five MCP servers, one agent: the hub aggregates tools from all servers and routes calls automatically.

Project Architecture

mcp-api-hub/
├── servers/
│   ├── crm-server.js          (Customer data: search, get, update)
│   ├── tickets-server.js      (Support tickets: list, create, update)
│   ├── payments-server.js     (Billing: get_invoice, check_subscription)
│   ├── calendar-server.js     (Meetings: list, create, cancel)
│   └── analytics-server.js    (Metrics: get_report, get_trend)
├── agent/
│   └── hub-agent.js           (Multi-server MCP + OpenAI agent)
└── index.js

The Multi-Server Agent

// agent/hub-agent.js
import OpenAI from 'openai';
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';

const SERVER_CONFIGS = [
  { id: 'crm', command: 'node', args: ['./servers/crm-server.js'] },
  { id: 'tickets', command: 'node', args: ['./servers/tickets-server.js'] },
  { id: 'payments', command: 'node', args: ['./servers/payments-server.js'] },
  { id: 'calendar', command: 'node', args: ['./servers/calendar-server.js'] },
  { id: 'analytics', command: 'node', args: ['./servers/analytics-server.js'] },
];

export async function createHubAgent() {
  const openai = new OpenAI();
  const connections = new Map();
  const allTools = [];

  // Connect to all servers in parallel
  await Promise.all(SERVER_CONFIGS.map(async config => {
    const transport = new StdioClientTransport({ command: config.command, args: config.args, env: process.env });
    const client = new Client({ name: 'hub-agent', version: '1.0.0' });
    await client.connect(transport);
    connections.set(config.id, client);

    const { tools } = await client.listTools();
    for (const tool of tools) {
      allTools.push({
        serverId: config.id,
        tool,
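        openaiFormat: {
          type: 'function',
          // strict mode is left off: it requires every property to be listed in `required`
          // and additionalProperties: false, which schemas with optional or defaulted fields do not satisfy
          function: { name: tool.name, description: `[${config.id}] ${tool.description}`, parameters: tool.inputSchema },
        },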
      });
    }
  }));

  console.log(`Hub connected to ${connections.size} servers, ${allTools.length} tools total`);

  // Find which server owns a tool
  const toolIndex = new Map(allTools.map(t => [t.tool.name, t]));

  return {
    async query(userMessage) {
      const messages = [
        {
          role: 'system',
          content: `You are a comprehensive business assistant with access to CRM, ticketing, payments, calendar, and analytics systems.
Tool descriptions are prefixed with their system: [crm], [tickets], [payments], [calendar], [analytics].
When answering questions, use tools from multiple systems as needed to give a complete answer.
Always check multiple related systems when investigating customer issues.`,
        },
        { role: 'user', content: userMessage },
      ];

      const openaiTools = allTools.map(t => t.openaiFormat);
      let turns = 0;

      while (true) {
        const response = await openai.chat.completions.create({
          model: 'gpt-4o', messages, tools: openaiTools, tool_choice: 'auto',
          parallel_tool_calls: true,
        });
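        const choice = response.choices[0];
        const msg = choice.message;
        messages.push(msg);

        // finish_reason lives on the choice, not on the message
        if (choice.finish_reason !== 'tool_calls' || !msg.tool_calls?.length) return msg.content;
        if (++turns > 15) throw new Error('Max turns exceeded');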

        const results = await Promise.all(msg.tool_calls.map(async tc => {
          const entry = toolIndex.get(tc.function.name);
          if (!entry) {
            return { role: 'tool', tool_call_id: tc.id, content: `Tool '${tc.function.name}' not found` };
          }
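          try {
            const client = connections.get(entry.serverId);
            const args = JSON.parse(tc.function.arguments);
            const result = await client.callTool({ name: tc.function.name, arguments: args });
            const text = result.content.filter(c => c.type === 'text').map(c => c.text).join('\n');
            return { role: 'tool', tool_call_id: tc.id, content: text };
          } catch (err) {
            // Report the failure to the model instead of aborting the whole turn
            return { role: 'tool', tool_call_id: tc.id, content: `Tool call failed: ${err.message}` };
          }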
        }));
        messages.push(...results);
      }
    },

    async close() {
      await Promise.all([...connections.values()].map(c => c.close()));
    },
  };
}

Parallel tool calling: GPT-4o queries CRM, tickets, and payments simultaneously for a complete customer view.
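
The hub looks tools up by bare name, so two servers that both expose a tool called, say, `search` would overwrite each other in the index. A minimal sketch of a namespaced index, assuming a hypothetical helper `buildToolIndex` and a `__` separator (both are illustrative choices, not part of the course code):

```javascript
// Hypothetical helper: build a lookup that stays unambiguous when two servers
// expose a tool with the same name. The "__" separator is an assumption.
function buildToolIndex(toolsByServer) {
  const index = new Map();
  for (const [serverId, tools] of Object.entries(toolsByServer)) {
    for (const tool of tools) {
      const qualified = `${serverId}__${tool.name}`;
      if (index.has(qualified)) throw new Error(`Duplicate tool: ${qualified}`);
      // Keep the original name so the call can be forwarded to the owning server
      index.set(qualified, { serverId, toolName: tool.name });
    }
  }
  return index;
}

const index = buildToolIndex({
  crm: [{ name: 'search' }, { name: 'get_customer' }],
  tickets: [{ name: 'search' }, { name: 'list_tickets' }],
});
console.log(index.get('tickets__search')); // { serverId: 'tickets', toolName: 'search' }
console.log(index.size); // 4
```

With this approach the qualified name is what the model sees as the function name, and the hub strips the prefix again before calling `client.callTool`.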

Sample CRM Server (Condensed)

// servers/crm-server.js
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { z } from 'zod';

// crmApi is assumed to be a thin wrapper around your CRM's API (not shown here)
const server = new McpServer({ name: 'crm-server', version: '1.0.0' });

server.tool('search_customers', {
  query: z.string().min(1).max(100),
  limit: z.number().int().min(1).max(20).default(10),
}, async ({ query, limit }) => {
  const customers = await crmApi.search(query, limit);
  return { content: [{ type: 'text', text: JSON.stringify(customers) }] };
});

server.tool('get_customer', {
  id: z.string().uuid(),
}, async ({ id }) => {
  const customer = await crmApi.getById(id);
  if (!customer) return { content: [{ type: 'text', text: 'Customer not found' }], isError: true };
  return { content: [{ type: 'text', text: JSON.stringify(customer) }] };
});

const transport = new StdioServerTransport();
await server.connect(transport);

Example Usage

const agent = await createHubAgent();

const answer = await agent.query(
  'Customer john.smith@acme.com says their subscription renewal failed last week. ' +
  'What is their account status, do they have any open support tickets, ' +
  'and what does their payment history look like?'
);
// The agent is expected to look the customer up first (search_customers), then call
// tools such as check_subscription, list_tickets, and get_invoice in parallel
// before synthesizing a complete answer

console.log(answer);
await agent.close();

nJoy 😉

Project 2

This capstone builds a filesystem agent powered by Claude 3.7 Sonnet. The agent can read files, search codebases, analyze code structure, and refactor files under user supervision. It applies the security patterns from Part VIII: roots for filesystem boundaries, tool safety for path validation, and confirmation-based elicitation for destructive file writes. The result is a safe, auditable codebase assistant that you can trust with your actual project files.

Filesystem agent: Claude plans file operations, MCP server executes them within roots-defined boundaries.

The Filesystem MCP Server

// servers/fs-server.js
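import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { RootsListChangedNotificationSchema } from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
import fs from 'node:fs/promises';
import path from 'node:path';
import { fileURLToPath } from 'node:url';

const server = new McpServer({ name: 'fs-server', version: '1.0.0' });

// Get the allowed roots from the client (via the roots capability)
let allowedRoots = [];
async function refreshRoots() {
  const { roots } = await server.server.listRoots();
  allowedRoots = roots
    .filter(r => r.uri.startsWith('file://'))
    .map(r => path.resolve(fileURLToPath(r.uri)));
}
// Fetch roots once the client has finished initializing, and again whenever they change
server.server.oninitialized = () => { refreshRoots().catch(console.error); };
server.server.setNotificationHandler(RootsListChangedNotificationSchema, refreshRoots);

// Path safety: ensure the path is within an allowed root
// (symlinks are not resolved here; use fs.realpath first if that matters for your project)
function validatePath(filePath) {
  const resolved = path.resolve(filePath);
  if (allowedRoots.length === 0) {
    throw new Error('No filesystem roots configured');
  }
  // Compare whole path segments: a plain startsWith would accept /srv/app-secrets for root /srv/app
  const isAllowed = allowedRoots.some(root => {
    const rel = path.relative(root, resolved);
    return rel === '' || (rel !== '..' && !rel.startsWith(`..${path.sep}`) && !path.isAbsolute(rel));
  });
  if (!isAllowed) {
    throw new Error(`Path '${resolved}' is outside allowed roots: ${allowedRoots.join(', ')}`);
  }
  return resolved;
}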

// Tool: Read a file
server.tool('read_file', {
  path: z.string().min(1).max(512).refine(p => !p.includes('..'), 'Path traversal not allowed'),
}, async ({ path: filePath }) => {
  const safePath = validatePath(filePath);
  try {
    const content = await fs.readFile(safePath, 'utf8');
    const lines = content.split('\n').length;
    return { content: [{ type: 'text', text: `// ${safePath} (${lines} lines)\n${content}` }] };
  } catch (err) {
    return { content: [{ type: 'text', text: `Cannot read file: ${err.message}` }], isError: true };
  }
});

// Tool: List directory
server.tool('list_directory', {
  path: z.string().min(1).max(512),
  recursive: z.boolean().default(false),
}, async ({ path: dirPath, recursive }) => {
  const safePath = validatePath(dirPath);
  const entries = await listDir(safePath, recursive, 0, []);
  return { content: [{ type: 'text', text: entries.join('\n') }] };
});

async function listDir(dirPath, recursive, depth, results) {
  if (depth > 3) return results;  // Max 3 levels deep
  const entries = await fs.readdir(dirPath, { withFileTypes: true });
  for (const entry of entries) {
    if (entry.name.startsWith('.') || entry.name === 'node_modules') continue;
    const full = path.join(dirPath, entry.name);
    results.push(`${'  '.repeat(depth)}${entry.isDirectory() ? '[DIR] ' : ''}${entry.name}`);
    if (recursive && entry.isDirectory()) await listDir(full, recursive, depth + 1, results);
  }
  return results;
}

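// Tool: Search for text in files
server.tool('search_files', {
  directory: z.string(),
  pattern: z.string().max(200),
  file_extension: z.string().optional(),
}, async ({ directory, pattern, file_extension }) => {
  const safePath = validatePath(directory);
  // The pattern comes from the model, so an invalid regex must not crash the handler
  let regex;
  try {
    regex = new RegExp(pattern, 'i');
  } catch {
    return { content: [{ type: 'text', text: `Invalid pattern: ${pattern}` }], isError: true };
  }
  const matches = [];
  // searchFiles (not shown) is assumed to walk the tree like listDir and push one line per match
  await searchFiles(safePath, regex, file_extension, matches);
  return { content: [{ type: 'text', text: matches.slice(0, 50).join('\n') || 'No matches found' }] };
});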

// Tool: Write file (requires confirmation via elicitation)
server.tool('write_file', {
  path: z.string().min(1).max(512),
  content: z.string().max(100_000),
}, async ({ path: filePath, content }) => {
  const safePath = validatePath(filePath);

  // Check if file already exists
  const exists = await fs.access(safePath).then(() => true).catch(() => false);

  if (exists) {
    // Ask the user through the client; this requires the client to support elicitation
    const reply = await server.server.elicitInput({
      message: `This will overwrite '${safePath}'. Confirm?`,
      requestedSchema: {
        type: 'object',
        properties: { confirm: { type: 'boolean', title: 'Overwrite file' } },
        required: ['confirm'],
      },
    });
    // Treat anything other than an explicit, accepted "true" as a refusal
    if (reply.action !== 'accept' || !reply.content?.confirm) {
      return { content: [{ type: 'text', text: 'Write cancelled.' }] };
    }
  }

  await fs.mkdir(path.dirname(safePath), { recursive: true });
  await fs.writeFile(safePath, content, 'utf8');
  return { content: [{ type: 'text', text: `Written: ${safePath}` }] };
});

const transport = new StdioServerTransport();
await server.connect(transport);

Four filesystem tools with layered safety: roots validation, path sanitization, and elicitation for writes.

The Claude Filesystem Agent

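// agent/fs-agent.js
import Anthropic from '@anthropic-ai/sdk';
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { ListRootsRequestSchema } from '@modelcontextprotocol/sdk/types.js';
import { pathToFileURL } from 'node:url';

const anthropic = new Anthropic();

export async function createFilesystemAgent(projectRoot) {
  const transport = new StdioClientTransport({
    command: 'node',
    args: ['./servers/fs-server.js'],
    env: { ...process.env },
  });
  // Capabilities belong in the second (options) argument of the Client constructor
  const mcp = new Client(
    { name: 'fs-agent', version: '1.0.0' },
    { capabilities: { roots: { listChanged: true } } },  // Declare roots support
  );

  // Set the allowed root to the project directory
  // (roots are set by the client, enforced by the server)
  mcp.setRequestHandler(ListRootsRequestSchema, async () => ({
    roots: [{ uri: pathToFileURL(projectRoot).href, name: 'project' }],
  }));
  // Overwrite confirmation additionally needs an elicitation capability and handler (not shown)

  await mcp.connect(transport);
  await mcp.sendRootsListChanged();  // Prompt the server to fetch the roots
  console.log(`Filesystem agent initialized. Root: ${projectRoot}`);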

  const { tools: mcpTools } = await mcp.listTools();
  const tools = mcpTools.map(t => ({
    name: t.name, description: t.description, input_schema: t.inputSchema,
  }));

  return {
    async ask(question) {
      const messages = [{ role: 'user', content: question }];
      let turns = 0;

      while (true) {
        const response = await anthropic.messages.create({
          model: 'claude-3-7-sonnet-20250219',
          max_tokens: 4096,
          system: `You are a codebase assistant. The project root is ${projectRoot}.
Use read_file to examine files, list_directory to explore structure, search_files to find code.
Only use write_file when explicitly asked to modify files.`,
          messages,
          tools,
        });
        messages.push({ role: 'assistant', content: response.content });

        if (response.stop_reason !== 'tool_use') {
          return response.content.filter(b => b.type === 'text').map(b => b.text).join('');
        }

        if (++turns > 15) throw new Error('Max turns exceeded');

        const toolResults = await Promise.all(
          response.content.filter(b => b.type === 'tool_use').map(async block => {
            const result = await mcp.callTool({ name: block.name, arguments: block.input });
            const text = result.content.filter(c => c.type === 'text').map(c => c.text).join('\n');
            return { type: 'tool_result', tool_use_id: block.id, content: text };
          })
        );
        messages.push({ role: 'user', content: toolResults });
      }
    },
    async close() { await mcp.close(); },
  };
}

What to Extend

  • Add a run_tests tool that executes node --test and returns the output – the agent can then read failing test files and suggest fixes.
  • Add Claude’s extended thinking for architectural analysis queries (Lesson 21 pattern).
  • Add the prompt caching pattern from Lesson 23 to cache the system prompt for long analysis sessions.
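
The prompt caching extension can be sketched as a small request builder. `buildCachedRequest` is a hypothetical helper, not course code. It relies on the Messages API accepting the system prompt as a list of content blocks, one of which carries a `cache_control` marker:

```javascript
// Sketch: mark the system prompt as cacheable in a Messages API request.
// buildCachedRequest is a hypothetical helper; its result would be passed to
// anthropic.messages.create(...) inside the agent loop.
function buildCachedRequest({ model, systemPrompt, messages, tools, maxTokens = 4096 }) {
  return {
    model,
    max_tokens: maxTokens,
    // A system prompt given as content blocks can carry cache_control
    system: [{ type: 'text', text: systemPrompt, cache_control: { type: 'ephemeral' } }],
    messages,
    tools,
  };
}

const request = buildCachedRequest({
  model: 'claude-3-7-sonnet-20250219',
  systemPrompt: 'You are a codebase assistant. The project root is /path/to/project.',
  messages: [{ role: 'user', content: 'Summarize the module layout.' }],
  tools: [],
});
console.log(JSON.stringify(request.system, null, 2));
```

Caching pays off when the cached prefix is long and reused across many turns. Very short prompts may fall below the provider's minimum cacheable length, so check the current documentation before relying on it.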

nJoy 😉

Project 1

This capstone project builds a complete, production-ready PostgreSQL query agent using OpenAI GPT-4o and MCP. By the end you will have a fully functional system where a user can ask questions in natural language and the agent translates them to safe, parameterized SQL queries, executes them against a real PostgreSQL database, formats the results, and explains its reasoning. The project incorporates lessons from throughout the course: schema validation, tool safety, audit logging, retry logic, and graceful shutdown.

The database query agent: user asks a question, GPT-4o plans SQL queries, MCP tools execute them safely.

Project Structure

mcp-db-agent/
├── package.json         (type: module, node 22+)
├── .env                 (DATABASE_URL, OPENAI_API_KEY)
├── servers/
│   └── db-server.js     (MCP server with database tools)
├── agent/
│   └── query-agent.js   (OpenAI + MCP client loop)
├── lib/
│   ├── db.js            (PostgreSQL connection pool)
│   ├── audit.js         (Audit logger)
│   └── safety.js        (SQL safety checks)
└── index.js             (CLI entry point)
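
The structure lists `lib/safety.js` for SQL safety checks without showing it. A minimal sketch of what such a module might contain, assuming a hypothetical function `checkReadOnlySql` that classifies a query string before it reaches the database:

```javascript
// Hypothetical contents for lib/safety.js: a pure function that classifies a SQL string.
const FORBIDDEN = ['DROP', 'DELETE', 'UPDATE', 'INSERT', 'ALTER', 'CREATE', 'TRUNCATE', 'GRANT', 'REVOKE'];

function checkReadOnlySql(sql) {
  const trimmed = sql.trim().replace(/;\s*$/, ''); // tolerate one trailing semicolon
  if (!/^(SELECT|WITH)\b/i.test(trimmed)) {
    return { ok: false, reason: 'Only SELECT queries are allowed' };
  }
  if (trimmed.includes(';')) {
    return { ok: false, reason: 'Multiple statements are not allowed' };
  }
  // \b matches whole words only, so a column such as updated_at does not match UPDATE
  const hit = FORBIDDEN.find(kw => new RegExp(`\\b${kw}\\b`, 'i').test(trimmed));
  if (hit) return { ok: false, reason: `Forbidden keyword: ${hit}` };
  return { ok: true };
}

console.log(checkReadOnlySql('SELECT id, updated_at FROM orders WHERE total > $1'));
console.log(checkReadOnlySql('SELECT 1; DROP TABLE orders'));
console.log(checkReadOnlySql('WITH x AS (DELETE FROM orders RETURNING *) SELECT * FROM x'));
```

A keyword filter like this is a heuristic: it rejects harmless queries whose string literals contain a semicolon or a forbidden word. It is best paired with a database role that only has SELECT privileges.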

The MCP Database Server

// servers/db-server.js
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { z } from 'zod';
import pg from 'pg';

const pool = new pg.Pool({ connectionString: process.env.DATABASE_URL });
const server = new McpServer({ name: 'db-server', version: '1.0.0' });

// Tool 1: List available tables
server.tool('list_tables', {}, async () => {
  const { rows } = await pool.query(
    "SELECT table_name, table_type FROM information_schema.tables WHERE table_schema = 'public' ORDER BY table_name"
  );
  return { content: [{ type: 'text', text: JSON.stringify(rows) }] };
});

// Tool 2: Describe a table's columns
server.tool('describe_table', {
  table_name: z.string().regex(/^[a-zA-Z_][a-zA-Z0-9_]*$/, 'Invalid table name'),
}, async ({ table_name }) => {
  const { rows } = await pool.query(
    'SELECT column_name, data_type, is_nullable, column_default FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2 ORDER BY ordinal_position',
    ['public', table_name]
  );
  if (rows.length === 0) {
    return { content: [{ type: 'text', text: `Table '${table_name}' not found` }], isError: true };
  }
  return { content: [{ type: 'text', text: JSON.stringify(rows) }] };
});

// Shared safety helpers for the two query tools below.
// \b matches whole words only, so a column such as updated_at does not trip the UPDATE check.
const FORBIDDEN = /\b(DROP|DELETE|UPDATE|INSERT|ALTER|CREATE|TRUNCATE|GRANT|REVOKE)\b/i;

// Defence in depth: a READ ONLY transaction makes PostgreSQL itself reject writes
async function runReadOnly(sql, params = []) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN READ ONLY');
    return await client.query(sql, params);
  } finally {
    await client.query('ROLLBACK').catch(() => {});
    client.release();
  }
}

// Tool 3: Execute a read-only query (SELECT only)
server.tool('execute_query', {
  sql: z.string().max(2000),
  params: z.array(z.union([z.string(), z.number(), z.null()])).max(20).default([]),
}, async ({ sql, params }) => {
  // Safety check: only allow SELECT statements (optionally introduced by WITH)
  if (!/^\s*(SELECT|WITH)\b/i.test(sql)) {
    return { content: [{ type: 'text', text: 'Only SELECT queries are allowed' }], isError: true };
  }

  // Forbid dangerous keywords
  if (FORBIDDEN.test(sql)) {
    return { content: [{ type: 'text', text: 'Query contains forbidden keywords' }], isError: true };
  }

  try {
    const { rows, rowCount } = await runReadOnly(sql, params);
    return {
      content: [{ type: 'text', text: JSON.stringify({ rows: rows.slice(0, 100), total: rowCount, truncated: rowCount > 100 }) }],
    };
  } catch (err) {
    return { content: [{ type: 'text', text: `Query failed: ${err.message}` }], isError: true };
  }
});

// Tool 4: Get row count (for planning queries)
server.tool('count_rows', {
  table_name: z.string().regex(/^[a-zA-Z_][a-zA-Z0-9_]*$/),
  where_clause: z.string().max(500).optional(),
}, async ({ table_name, where_clause }) => {
  // where_clause is interpolated into the SQL text, so it gets the same keyword check
  if (where_clause && (where_clause.includes(';') || FORBIDDEN.test(where_clause))) {
    return { content: [{ type: 'text', text: 'where_clause contains forbidden content' }], isError: true };
  }
  const sql = where_clause
    ? `SELECT COUNT(*) as count FROM ${table_name} WHERE ${where_clause}`
    : `SELECT COUNT(*) as count FROM ${table_name}`;
  try {
    const { rows } = await runReadOnly(sql);
    return { content: [{ type: 'text', text: JSON.stringify(rows[0]) }] };
  } catch (err) {
    return { content: [{ type: 'text', text: `Count failed: ${err.message}` }], isError: true };
  }
});

const transport = new StdioServerTransport();
await server.connect(transport);

Four tools: schema discovery (list, describe), safe query execution, and row counting for query planning.

The OpenAI Query Agent

// agent/query-agent.js
import OpenAI from 'openai';
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';

const openai = new OpenAI();

export async function createQueryAgent() {
  const transport = new StdioClientTransport({
    command: 'node',
    args: ['./servers/db-server.js'],
    env: { ...process.env },
  });
  const mcp = new Client({ name: 'query-agent', version: '1.0.0' });
  await mcp.connect(transport);
  const { tools: mcpTools } = await mcp.listTools();

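  // strict mode is left off: it requires every property to be listed as required,
  // which MCP input schemas with optional fields do not satisfy
  const tools = mcpTools.map(t => ({
    type: 'function',
    function: { name: t.name, description: t.description, parameters: t.inputSchema },
  }));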

  const SYSTEM_PROMPT = `You are a precise database analyst.
You have access to a PostgreSQL database. To answer questions:
1. First call list_tables to see available tables
2. Call describe_table for tables relevant to the question
3. Plan a safe SELECT query (use parameters for any user values)
4. Call execute_query with the query and parameters
5. Present results clearly with a brief interpretation

Always use parameterized queries. Never build SQL by string concatenation.
If a question cannot be answered with a SELECT, say so clearly.`;

  return {
    async query(userQuestion) {
      const messages = [
        { role: 'system', content: SYSTEM_PROMPT },
        { role: 'user', content: userQuestion },
      ];
      let turns = 0;

      while (true) {
        const response = await openai.chat.completions.create({
          model: 'gpt-4o', messages, tools, tool_choice: 'auto',
        });
        const msg = response.choices[0].message;
        messages.push(msg);

        // finish_reason lives on the choice, not the message, so check tool_calls directly
        if (!msg.tool_calls?.length) {
          return msg.content;
        }

        if (++turns > 10) throw new Error('Max turns exceeded');

        const results = await Promise.all(msg.tool_calls.map(async tc => {
          const args = JSON.parse(tc.function.arguments);
          const result = await mcp.callTool({ name: tc.function.name, arguments: args });
          const text = result.content.filter(c => c.type === 'text').map(c => c.text).join('\n');
          return { role: 'tool', tool_call_id: tc.id, content: text };
        }));
        messages.push(...results);
      }
    },
    async close() { await mcp.close(); },
  };
}

Running the Agent

// index.js
import { createQueryAgent } from './agent/query-agent.js';
import readline from 'node:readline';

const agent = await createQueryAgent();
const rl = readline.createInterface({ input: process.stdin, output: process.stdout });

console.log('PostgreSQL Query Agent ready. Ask anything about your data.');
console.log('Type "exit" to quit.\n');

rl.on('line', async (line) => {
  if (line.trim() === 'exit') { await agent.close(); process.exit(0); }
  if (!line.trim()) return;
  try {
    const answer = await agent.query(line);
    console.log('\n' + answer + '\n');
  } catch (err) {
    console.error('Error:', err.message);
  }
});

What to Extend

  • Add the audit logging middleware from Lesson 35 to log every execute_query call with the SQL, user, and result count.
  • Add a sample_rows tool that returns 3 rows from any table – helps the model understand data format before writing queries.
  • Connect it to your real production database with a read-only service account.

nJoy 😉

Writing Custom Transports and Protocol Extensions

The MCP SDK ships with two built-in transports: stdio and Streamable HTTP. These cover the vast majority of use cases. But sometimes you need something different: an in-process transport for testing, a WebSocket transport for browser environments, an IPC transport for Electron apps, or a transport that encrypts the JSON-RPC stream at the application layer. The SDK’s transport interface is deliberately minimal, making it straightforward to implement your own. This lesson covers the interface, two reference implementations, and practical extension points.

The Transport interface is three methods: start, send, and close. Any communication channel can become an MCP transport.

The Transport Interface

// The MCP SDK Transport interface (TypeScript definition for reference)
// interface Transport {
//   start(): Promise<void>;
//   send(message: JSONRPCMessage): Promise<void>;
//   close(): Promise<void>;
//   onmessage?: (message: JSONRPCMessage) => void;
//   onerror?: (error: Error) => void;
//   onclose?: () => void;
// }

// In JavaScript, implement the same shape:
class CustomTransport {
  onmessage = null;   // Called when a message is received
  onerror = null;     // Called on transport errors
  onclose = null;     // Called when the transport closes

  async start() {
    // Initialize the underlying communication channel
  }

  async send(message) {
    // Send a JSONRPCMessage object
  }

  async close() {
    // Clean up the channel
  }
}
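
Because the interface is this small, a transport can also wrap another transport. The sketch below assumes a hypothetical withLogging decorator (the name and the log callback are illustrative, not SDK APIs) that records every message in both directions and otherwise delegates to the wrapped transport. It logs to stderr by default, since a stdio server must keep stdout free for protocol traffic.

```javascript
// Hypothetical decorator: wraps any object with the Transport shape and logs traffic.
function withLogging(inner, log = console.error) {
  const outer = {
    onmessage: null, onerror: null, onclose: null,
    async start() { await inner.start(); },
    async send(message) { log('send', message); await inner.send(message); },
    async close() { await inner.close(); },
  };
  // Forward events from the wrapped transport to whoever owns the outer one.
  // Extra arguments are passed through in case the SDK supplies any.
  inner.onmessage = (message, ...rest) => { log('recv', message); outer.onmessage?.(message, ...rest); };
  inner.onerror = (err) => outer.onerror?.(err);
  inner.onclose = () => outer.onclose?.();
  return outer;
}
```

The handlers on the outer object are looked up at call time, so it does not matter whether the owner assigns onmessage before or after the wrapper is created.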

In-Process Transport for Testing

An in-process transport connects a client directly to a server in the same Node.js process. Essential for integration tests without spawning subprocesses:

// in-process-transport.js

export function createInProcessTransport() {
  let clientTransport, serverTransport;

  clientTransport = {
    onmessage: null, onerror: null, onclose: null,
    async start() {},
    async send(msg) {
      // Route to server
      if (serverTransport.onmessage) serverTransport.onmessage(msg);
    },
    async close() {
      if (clientTransport.onclose) clientTransport.onclose();
      if (serverTransport.onclose) serverTransport.onclose();
    },
  };

  serverTransport = {
    onmessage: null, onerror: null, onclose: null,
    async start() {},
    async send(msg) {
      // Route to client
      if (clientTransport.onmessage) clientTransport.onmessage(msg);
    },
    async close() {
      if (clientTransport.onclose) clientTransport.onclose();
      if (serverTransport.onclose) serverTransport.onclose();
    },
  };

  return { clientTransport, serverTransport };
}

// Usage in tests:
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { createInProcessTransport } from './in-process-transport.js';
import { buildServer } from './server.js';  // Assumed: your own factory that returns a configured McpServer

test('in-process round trip', async (t) => {
  const { clientTransport, serverTransport } = createInProcessTransport();
  const server = buildServer();
  const client = new Client({ name: 'test', version: '1.0.0' });

  await server.connect(serverTransport);
  await client.connect(clientTransport);

  const { tools } = await client.listTools();
  assert.ok(tools.length > 0);

  await client.close();
});
In-process transport: no network, no subprocess, instant round trip – ideal for unit and integration testing.
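
The pair above delivers each message synchronously inside send(), so the receiver's handler runs before the sender's send() call has returned. That is usually fine in tests, but it differs from how a real channel behaves. A sketch of a variant that defers delivery to a microtask and makes close() idempotent is shown below; createLinkedPair is a hypothetical name, and the peer and closed fields are illustrative.

```javascript
// Hypothetical variant of createInProcessTransport with deferred delivery.
function createLinkedPair() {
  const make = () => ({
    onmessage: null, onerror: null, onclose: null, peer: null, closed: false,
    async start() {},
    async send(message) {
      if (this.closed) throw new Error('Transport closed');
      const peer = this.peer;
      // Deliver on a later microtask so the receiver never runs inside send()
      queueMicrotask(() => {
        try { peer.onmessage?.(structuredClone(message)); }
        catch (err) { peer.onerror?.(err); }
      });
    },
    async close() {
      if (this.closed) return;      // second call is a no-op
      this.closed = true;
      this.onclose?.();
      await this.peer.close();      // closing one side closes the other, once
    },
  });
  const clientTransport = make();
  const serverTransport = make();
  clientTransport.peer = serverTransport;
  serverTransport.peer = clientTransport;
  return { clientTransport, serverTransport };
}
```

Cloning the message keeps the two sides from sharing mutable objects, which is closer to what serialization over a real channel would give you.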

WebSocket Transport

npm install ws
// websocket-transport.js - client side
import WebSocket from 'ws';

export class WebSocketClientTransport {
  #url;
  #ws = null;
  onmessage = null;
  onerror = null;
  onclose = null;

  constructor(url) {
    this.#url = url;
  }

  async start() {
    return new Promise((resolve, reject) => {
      this.#ws = new WebSocket(this.#url);
      this.#ws.once('open', resolve);
      this.#ws.once('error', reject);
      this.#ws.on('message', (data) => {
        try {
          const msg = JSON.parse(data.toString());
          if (this.onmessage) this.onmessage(msg);
        } catch (err) {
          if (this.onerror) this.onerror(err);
        }
      });
      this.#ws.on('close', () => {
        if (this.onclose) this.onclose();
      });
      this.#ws.on('error', (err) => {
        if (this.onerror) this.onerror(err);
      });
    });
  }

  async send(message) {
    this.#ws.send(JSON.stringify(message));
  }

  async close() {
    this.#ws?.close();
  }
}

// WebSocket server transport
export class WebSocketServerTransport {
  #socket;
  onmessage = null;
  onerror = null;
  onclose = null;

  constructor(socket) {
    this.#socket = socket;
    socket.on('message', (data) => {
      try {
        const msg = JSON.parse(data.toString());
        if (this.onmessage) this.onmessage(msg);
      } catch (err) {
        if (this.onerror) this.onerror(err);
      }
    });
    socket.on('close', () => {
      if (this.onclose) this.onclose();
    });
  }

  async start() {}

  async send(message) {
    this.#socket.send(JSON.stringify(message));
  }

  async close() {
    this.#socket.close();
  }
}

// Server side: wrap ws.WebSocketServer
import { WebSocketServer } from 'ws';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';

const wss = new WebSocketServer({ port: 9000 });
wss.on('connection', async (socket) => {
  const transport = new WebSocketServerTransport(socket);
  const server = buildMcpServer();
  await server.connect(transport);
});

Protocol Extensions: Custom Methods

// MCP allows custom methods beyond the spec - they are prefixed with your namespace
// Use this for proprietary extensions that are specific to your deployment

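// Server side: handle a custom method
// setRequestHandler expects a Zod request schema whose method is a literal
import { z } from 'zod';

const GetServerMetricsRequestSchema = z.object({
  method: z.literal('com.mycompany/getServerMetrics'),
  params: z.object({}).passthrough().optional(),
});

server.server.setRequestHandler(GetServerMetricsRequestSchema, async (request) => {
  return {
    uptime: process.uptime(),
    activeSessions: sessionStore.size,
    memoryMB: Math.round(process.memoryUsage().heapUsed / 1024 / 1024),
  };
});

// Client side: call a custom method and validate the result against a schema
const ServerMetricsResultSchema = z.object({
  uptime: z.number(),
  activeSessions: z.number(),
  memoryMB: z.number(),
}).passthrough();

const metrics = await client.request(
  { method: 'com.mycompany/getServerMetrics', params: {} },
  ServerMetricsResultSchema
);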

What to Build Next

  • Replace subprocess spawning in your integration tests with the in-process transport. Measure the test speedup.
  • If you have a browser-based MCP client, implement the WebSocket transport and test it against your existing MCP server with a WebSocket adapter.

nJoy 😉

Protocol Versioning, Backwards Compatibility, and Migration

The MCP specification evolves. New capabilities are added; some older mechanisms are deprecated; breaking changes occasionally ship. Building MCP servers that handle protocol version negotiation correctly means your clients and servers can interoperate across version boundaries without hard dependencies on a single spec revision. This lesson covers how MCP versioning works, how to negotiate capabilities with older clients, how to write migration guides when your own server schema changes, and the stability guarantees you can rely on from Anthropic.

MCP version negotiation: the client proposes a protocol version and the server replies with the version it will use.

How MCP Protocol Versioning Works

MCP uses date-stamped version strings like 2024-11-05 or 2025-03-26. During initialization, the client sends the version it wants, and the server responds with the version it will use (typically the same or an older compatible one).

// Initialization exchange (JSON-RPC)
// Client sends:
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2025-06-18",
    "clientInfo": { "name": "my-client", "version": "2.0.0" },
    "capabilities": { "sampling": {}, "elicitation": {} }
  }
}

// Server responds with the version it accepts:
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "2025-06-18",
    "serverInfo": { "name": "my-server", "version": "1.5.0" },
    "capabilities": { "tools": {}, "resources": {}, "prompts": {} }
  }
}
// The @modelcontextprotocol/sdk handles version negotiation automatically
// You do not need to implement it manually

// To check the negotiated version in your server:
server.server.oninitialized = () => {
  const version = server.server.negotiatedProtocolVersion;
  console.log(`MCP session initialized with protocol version: ${version}`);
};
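
The rule the SDK applies on your behalf can be sketched in a few lines. This is an illustration of the negotiation described above, not the SDK's actual code; negotiateVersion, clientAccepts, and the SUPPORTED_VERSIONS list are hypothetical names chosen for the example.

```javascript
// Versions this hypothetical server implements, oldest to newest.
const SUPPORTED_VERSIONS = ['2024-11-05', '2025-03-26'];

function negotiateVersion(requested, supported = SUPPORTED_VERSIONS) {
  // Echo the client's version when the server implements it
  if (supported.includes(requested)) return requested;
  // Otherwise answer with the server's latest; the client decides whether to continue
  return supported[supported.length - 1];
}

function clientAccepts(serverVersion, clientSupported) {
  // A client should disconnect if it cannot speak the version the server chose
  return clientSupported.includes(serverVersion);
}
```

A client that asks for a version the server does not know gets the server's latest version back, and it is then the client's job to disconnect if it cannot speak that version.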

Feature Detection (Capability Negotiation)

// Check if the connected client supports a specific capability
// before using it in your server code

server.server.oninitialized = () => {
  const clientCaps = server.server.getClientCapabilities();

  const supportsElicitation = !!clientCaps?.elicitation;
  const supportsSampling = !!clientCaps?.sampling;
  const supportsRoots = !!clientCaps?.roots;

  console.log(`Client capabilities: elicitation=${supportsElicitation} sampling=${supportsSampling} roots=${supportsRoots}`);

  if (!supportsElicitation) {
    // Fall back to returning instructions in tool result instead of interactive elicitation
    console.warn('Client does not support elicitation - using text fallback');
  }
};
Always check client capabilities before using server-initiated features like elicitation or sampling.

Migrating Your Tool Schema

When you change a tool’s input schema, existing clients that have cached the old schema will break. Follow a compatibility-first migration process:

// Backwards-compatible schema evolution: add optional fields, never remove required ones

// Version 1 schema (existing clients use this)
// search_products: { query: z.string(), limit: z.number().optional().default(10) }

// Version 2: add optional 'category' filter without breaking v1 clients
server.tool('search_products', {
  query: z.string(),
  limit: z.number().optional().default(10),
  category: z.string().optional(),           // New optional field - backwards compatible
  // NEVER remove or rename 'query' or 'limit' - that breaks v1 clients
  // NEVER make an optional field required - that also breaks v1 clients
}, handler);
// Breaking change strategy: add a versioned tool name during transition
// Phase 1: add new tool alongside old one
server.tool('search_products_v2', {
  query: z.string(),
  limit: z.number().optional().default(10),
  filters: z.object({  // New required field - would break v1 if added to original
    category: z.string().optional(),
    priceMax: z.number().optional(),
    inStock: z.boolean().optional().default(true),
  }),
}, handler);

// Phase 2: deprecate old tool via description
// server.tool('search_products', ... 
//   description: 'DEPRECATED: use search_products_v2 instead'

// Phase 3 (after client migration window): remove old tool
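
The rules in the comments above (never remove a field, never make a field newly required) can be checked mechanically before a release. The following is a minimal sketch, assuming a hypothetical findBreakingChanges helper that compares the JSON Schema form of a tool's old and new input schemas. It only inspects top-level properties and is not a full schema differ.

```javascript
// Hypothetical helper: compares two JSON Schema objects for a tool's input.
function findBreakingChanges(oldSchema, newSchema) {
  const problems = [];
  const oldProps = oldSchema.properties ?? {};
  const newProps = newSchema.properties ?? {};
  const oldRequired = new Set(oldSchema.required ?? []);

  for (const name of Object.keys(oldProps)) {
    if (!(name in newProps)) {
      problems.push(`removed field: ${name}`);        // old clients may still send it
    } else if (oldProps[name].type !== newProps[name].type) {
      problems.push(`type changed: ${name}`);
    }
  }
  for (const name of newSchema.required ?? []) {
    // Old clients do not know this field, so their calls would fail validation
    if (!oldRequired.has(name)) problems.push(`newly required: ${name}`);
  }
  return problems;
}
```

An empty result means the change is additive and can ship under the existing tool name; anything else suggests the versioned-name strategy.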

Version Compatibility Matrix

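MCP Spec Version | SDK Version | Key Features Added
---------------- | ----------- | ------------------
2024-11-05 | 0.x | Initial release: tools, resources, prompts, sampling
2025-03-26 | 1.x | Streamable HTTP transport, OAuth-based authorization, tool annotations
2025-06-18 | 1.x | Elicitation, structured tool output
2025-11-25 | 1.x | Tasks (experimental)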

Stability Guarantees

  • JSON-RPC 2.0 wire format: Stable. Will not change between spec versions.
  • Core methods (initialize, tools/call, resources/read, prompts/get): Stable across all versions.
  • New capabilities: Always added as optional, never required for a functional server.
  • Deprecated features: Maintained for at least 2 spec revisions before removal.
  • SDK APIs: The TypeScript/JavaScript SDK minor versions maintain backwards compatibility; only major versions may include breaking changes.

What to Build Next

  • Add a server://version resource to your MCP server that returns the current protocol version, SDK version, and your tool schema versions. Update it on every release.
  • Review your most-used tools for any fields that are currently optional but should be made required. Use the v2 naming strategy to transition safely.

nJoy 😉

Cancellation, Progress, and Backpressure in MCP Streams

Streaming responses, long-running tools, and multi-step agent pipelines all share a common challenge: what happens when the client stops listening? Without proper cancellation propagation, cancelled client connections leave expensive operations running on the server indefinitely. This lesson covers three related mechanisms: request cancellation using AbortSignal, progress reporting with real-time updates, and backpressure strategies that prevent fast producers from overwhelming slow consumers.

Cancellation must propagate through the entire call chain: from client disconnect to every active resource.

AbortSignal in MCP Tool Handlers

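When a client disconnects or cancels a request, the MCP SDK aborts the AbortSignal it passes to the request handler, available as the signal property of the handler's second argument. Tool handlers should forward this signal to every expensive operation so that work stops promptly: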

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';

const server = new McpServer({ name: 'streaming-server', version: '1.0.0' });

server.tool('search_large_dataset', {
  query: z.string(),
  maxResults: z.number().default(100),
}, async ({ query, maxResults }, { signal }) => {
  // Pass signal to database query
  const results = await db.search(query, { maxResults, signal });

  // Pass signal to downstream HTTP calls
  const enriched = await Promise.all(
    results.map(r =>
      fetch(`https://enrichment.api/v1/${r.id}`, { signal })
        .then(res => res.json())
        .catch(err => {
          if (err.name === 'AbortError') throw err;  // Re-throw cancellation
          return r;  // Return unenriched on other errors
        })
    )
  );

  return { content: [{ type: 'text', text: JSON.stringify(enriched) }] };
});
// db.search: wiring an AbortSignal to PostgreSQL query cancellation
async function search(query, { maxResults, signal } = {}) {
  signal?.throwIfAborted();
  const client = await pool.connect();
  let cancel;

  try {
    // A busy connection cannot cancel itself, so record its backend PID up front
    // and issue the cancel from a separate pooled connection
    const { rows: [{ pid }] } = await client.query('SELECT pg_backend_pid() AS pid');
    cancel = () => {
      pool.query('SELECT pg_cancel_backend($1)', [pid]).catch(() => {});
    };
    signal?.addEventListener('abort', cancel, { once: true });

    const result = await client.query(
      'SELECT * FROM products WHERE to_tsvector(description) @@ plainto_tsquery($1) LIMIT $2',
      [query, maxResults]
    );
    return result.rows;
  } finally {
    if (cancel) signal?.removeEventListener('abort', cancel);
    client.release();  // Release exactly once, whether the query finished or was cancelled
  }
}

Progress Reporting via Streaming Tool Results

// MCP tools can report progress through the server's notification mechanism.
// This example uses logging notifications; the task polling pattern from Lesson 45
// is the alternative for work that outlives a single request.

server.tool('process_batch', {
  items: z.array(z.string()).max(1000),
}, async ({ items }, { signal }) => {
  const results = [];
  const total = items.length;

  for (let i = 0; i < items.length; i++) {
    if (signal?.aborted) {
      return {
        content: [{ type: 'text', text: JSON.stringify({
          status: 'cancelled',
          processed: i,
          total,
          results,
        }) }],
      };
    }

    const result = await processItem(items[i]);
    results.push(result);

    // Emit progress via logs/notification (visible in MCP Inspector)
    if (i % 50 === 0) {
      server.server.sendLoggingMessage({
        level: 'info',
        data: `Progress: ${i + 1}/${total} (${Math.round(((i + 1) / total) * 100)}%)`,
      });
    }
  }

  return { content: [{ type: 'text', text: JSON.stringify({ status: 'complete', results }) }] };
});
Batch tools check the AbortSignal on each iteration and emit progress via logging notifications.
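
Logging notifications are visible to humans, but the protocol also defines a dedicated notifications/progress message, tied to a progressToken that the client supplies in the request's _meta field. The sketch below builds those notifications; createProgressReporter is a hypothetical helper, and send stands for whatever function delivers a notification to the client (recent SDK versions pass one to the handler in its second argument, but treat that as an assumption and check your version).

```javascript
// Hypothetical helper. `send` is assumed to deliver one JSON-RPC notification to the client.
function createProgressReporter(send, progressToken, total) {
  let last = -1;
  return async function report(progress, message) {
    // No token means the client did not ask for progress updates.
    // Progress values must keep increasing, so repeats are dropped.
    if (progressToken === undefined || progress <= last) return false;
    last = progress;
    await send({
      method: 'notifications/progress',
      params: { progressToken, progress, total, message },
    });
    return true;
  };
}
```

Inside a batch loop, a handler would call report(i + 1) at whatever interval suits the workload, and the helper silently does nothing for clients that sent no token.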

Backpressure in Streaming Tool Results

// When a tool generates large amounts of streaming data,
// use an async generator that yields bounded batches and cap the total returned

server.tool('stream_logs', {
  service: z.string(),
  since: z.string(),
}, async ({ service, since }, { signal }) => {
  // Generator-based streaming with backpressure
  async function* generateLogs() {
    const logStream = await getLiveLogStream(service, since, { signal });
    let buffer = [];

    for await (const logLine of logStream) {
      if (signal?.aborted) break;
      buffer.push(logLine);

      // Yield batches of 50 lines to avoid overwhelming the response
      if (buffer.length >= 50) {
        yield buffer.join('\n');
        buffer = [];
        // Yield control to allow backpressure to work
        await new Promise(r => setImmediate(r));
      }
    }

    if (buffer.length > 0) yield buffer.join('\n');
  }

  // Collect all chunks (in practice, return first N lines for tool calls)
  const chunks = [];
  let totalLines = 0;

  for await (const chunk of generateLogs()) {
    chunks.push(chunk);
    totalLines += chunk.split('\n').length;
    if (totalLines > 500) {
      chunks.push('[...truncated, 500 line limit reached]');
      break;
    }
  }

  return { content: [{ type: 'text', text: chunks.join('\n') }] };
});
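
The tool above bounds its output, but it does not slow the producer down. Backpressure in the strict sense means the producer waits whenever the consumer falls behind. A minimal sketch of that idea is a bounded queue whose push() does not resolve until there is room; the BoundedQueue class below is hypothetical and is not part of the SDK.

```javascript
// Hypothetical bounded queue: push() waits while the buffer is full.
class BoundedQueue {
  #items = [];
  #capacity;
  #waitingProducers = [];
  #waitingConsumers = [];

  constructor(capacity) { this.#capacity = capacity; }

  get size() { return this.#items.length; }

  async push(item) {
    // Backpressure: block the producer until a consumer frees a slot
    while (this.#items.length >= this.#capacity) {
      await new Promise(resolve => this.#waitingProducers.push(resolve));
    }
    this.#items.push(item);
    this.#waitingConsumers.shift()?.();   // wake one waiting consumer, if any
  }

  async shift() {
    while (this.#items.length === 0) {
      await new Promise(resolve => this.#waitingConsumers.push(resolve));
    }
    const item = this.#items.shift();
    this.#waitingProducers.shift()?.();   // wake one waiting producer, if any
    return item;
  }
}
```

A log reader would await queue.push(line) for each line, so a slow consumer naturally throttles how fast lines are pulled from the source.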

Handling SSE Client Disconnections

// For Streamable HTTP servers, detect client disconnections via res.on('close')
app.post('/mcp', async (req, res) => {
  const transport = getOrCreateTransport(req);

  // Create an AbortController for this connection
  const controller = new AbortController();
  res.on('close', () => controller.abort());

  // Pass the signal to the MCP transport (SDK handles propagation to tool handlers)
  await transport.handleRequest(req, res, req.body, { signal: controller.signal });
});

What to Build Next

  • Add signal?.addEventListener('abort', cleanup) to your longest-running tool handler. Test it by disconnecting the client mid-execution and verify resources are released.
  • Add a per-tool timeout using AbortSignal.timeout(ms) to prevent any single tool call from running indefinitely.
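
The per-tool timeout from the last bullet can be sketched as a wrapper around any handler. This assumes a hypothetical withTimeout helper and a handler that actually honours the signal it receives; a handler that ignores the signal will not be interrupted. AbortSignal.any requires a recent Node.js release (20.3 or later).

```javascript
// Hypothetical wrapper: adds a deadline to any (args, extra) => result handler.
function withTimeout(handler, ms) {
  return async (args, extra = {}) => {
    const timeout = AbortSignal.timeout(ms);
    // Abort when either the client cancels or the deadline passes
    const signal = extra.signal ? AbortSignal.any([extra.signal, timeout]) : timeout;
    try {
      return await handler(args, { ...extra, signal });
    } catch (err) {
      if (timeout.aborted) {
        return { content: [{ type: 'text', text: `Tool timed out after ${ms} ms` }], isError: true };
      }
      throw err;  // Client cancellation and real errors propagate unchanged
    }
  };
}
```

Returning an isError result on timeout lets the model see what happened and decide whether to retry with a narrower request.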

nJoy 😉

Tasks API

Most MCP tool calls complete in under a second: query a database, call an API, read a file. But some operations take minutes or hours: training a model, processing a large dataset, running a batch export, triggering a CI/CD pipeline. For these, a synchronous request-response model breaks down. The MCP Tasks API provides an async task model: a client submits a task, the server accepts it immediately, the client polls for updates, and the server streams progress events until completion. This lesson covers the full Tasks API implementation.

Tasks API: submit a long-running operation, poll for progress via SSE, receive the result when done.

When to Use Tasks API vs Regular Tools

  • Use regular tools for operations that complete in under 30 seconds. Keep them synchronous – the LLM waits for the result before proceeding.
  • Use Tasks API for operations that take longer than 30 seconds, produce intermediate results the user or LLM can act on, or may fail partway through and need resumability.

Server-Side Task Implementation

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { z } from 'zod';
import crypto from 'node:crypto';

const server = new McpServer({ name: 'async-server', version: '1.0.0' });

// Task store (use Redis or PostgreSQL in production)
const tasks = new Map();

// Submit a long-running task - returns a task ID immediately
server.tool('start_data_export', {
  datasetId: z.string(),
  format: z.enum(['csv', 'json', 'parquet']),
  dateRange: z.object({
    from: z.string(),
    to: z.string(),
  }),
}, async ({ datasetId, format, dateRange }) => {
  const taskId = crypto.randomUUID();

  // Store task state
  tasks.set(taskId, {
    id: taskId,
    status: 'pending',
    progress: 0,
    createdAt: new Date().toISOString(),
    result: null,
    error: null,
  });

  // Start the long-running operation asynchronously
  runExportTask(taskId, datasetId, format, dateRange).catch(err => {
    const task = tasks.get(taskId);
    if (task) {
      task.status = 'failed';
      task.error = err.message;
    }
  });

  return {
    content: [{
      type: 'text',
      text: JSON.stringify({ taskId, status: 'pending', message: 'Export started. Use get_task_status to check progress.' }),
    }],
  };
});

// Poll task status
server.tool('get_task_status', {
  taskId: z.string().uuid(),
}, async ({ taskId }) => {
  const task = tasks.get(taskId);
  if (!task) {
    return {
      content: [{ type: 'text', text: JSON.stringify({ error: 'Task not found' }) }],
      isError: true,
    };
  }
  return { content: [{ type: 'text', text: JSON.stringify(task) }] };
});

// The actual long-running work
async function runExportTask(taskId, datasetId, format, dateRange) {
  const task = tasks.get(taskId);
  task.status = 'running';

  const totalRows = await db.countRows(datasetId, dateRange);
  const batchSize = 1000;
  const batches = Math.ceil(totalRows / batchSize);
  const results = [];

  for (let i = 0; i < batches; i++) {
    // Stop early if cancel_task flipped the status while this task was working
    if (task.status === 'cancelled') return;

    const batch = await db.fetchBatch(datasetId, dateRange, i * batchSize, batchSize);
    results.push(...batch);
    task.progress = Math.round(((i + 1) / batches) * 100);
    task.message = `Processed ${Math.min((i + 1) * batchSize, totalRows)} / ${totalRows} rows`;
    // Small delay to not hammer the DB
    await new Promise(r => setTimeout(r, 10));
  }

  const exportUrl = await uploadToStorage(results, format);
  if (task.status === 'cancelled') return;  // Cancelled during upload: keep the cancelled status
  task.status = 'completed';
  task.progress = 100;
  task.result = { url: exportUrl, rowCount: results.length };
}
LLM polling pattern: call start_task, then poll get_task_status with increasing intervals until status is ‘completed’.

Client-Side: LLM-Driven Task Polling

// System prompt that teaches the LLM how to handle async tasks
const ASYNC_SYSTEM_PROMPT = `When you call a tool that returns a taskId (like start_data_export), 
you must poll for the result using get_task_status. 
Poll every 5 seconds until status is 'completed' or 'failed'.
When completed, use the result URL to complete the user's request.
When failed, report the error message.`;

// Add this as part of the tool description to hint the LLM
server.tool('start_data_export', /* ... */);
// Tool description: "Starts a data export. Returns a taskId. Use get_task_status to check progress. 
//                   Poll until status is 'completed', then use the result.url."
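
The prompt above leaves the timing to the model, which has no way to sleep between calls. When the host application polls in code instead, the waiting can be done properly. The sketch below assumes a hypothetical pollTask helper and a getStatus callback that wraps a get_task_status call and returns the parsed task object; the option names are illustrative.

```javascript
// Hypothetical host-side poller. `getStatus` is any async function that returns
// an object with a `status` field, e.g. a wrapper around a get_task_status call.
async function pollTask(getStatus, { initialMs = 1000, maxMs = 30000, factor = 2, maxAttempts = 50 } = {}) {
  let delay = initialMs;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const task = await getStatus();
    if (['completed', 'failed', 'cancelled'].includes(task.status)) return task;
    await new Promise(resolve => setTimeout(resolve, delay));
    delay = Math.min(delay * factor, maxMs);   // back off, but never beyond maxMs
  }
  throw new Error(`Task still running after ${maxAttempts} polls`);
}
```

Growing the interval keeps short tasks responsive while sparing the server from a steady stream of status calls during long ones.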

Task Cancellation

server.tool('cancel_task', {
  taskId: z.string().uuid(),
}, async ({ taskId }) => {
  const task = tasks.get(taskId);
  if (!task) {
    return { content: [{ type: 'text', text: 'Task not found' }], isError: true };
  }
  if (task.status === 'completed' || task.status === 'failed') {
    return { content: [{ type: 'text', text: `Cannot cancel: task already ${task.status}` }], isError: true };
  }
  task.status = 'cancelled';
  task.cancelledAt = new Date().toISOString();
  // The running task checks for cancellation in its loop
  return { content: [{ type: 'text', text: `Task ${taskId} cancelled` }] };
});

Task Expiry and Cleanup

// Clean up completed/failed tasks older than 24 hours
setInterval(() => {
  const cutoff = Date.now() - 24 * 60 * 60 * 1000;
  for (const [id, task] of tasks) {
    if (['completed', 'failed', 'cancelled'].includes(task.status)) {
      const createdAtMs = new Date(task.createdAt).getTime();
      if (createdAtMs < cutoff) tasks.delete(id);
    }
  }
}, 60 * 60 * 1000);  // Run every hour

What to Build Next

  • Identify one tool in your MCP server that regularly takes longer than 10 seconds. Refactor it using the async task pattern from this lesson.
  • Add a list_my_tasks tool that returns all pending and running tasks for the authenticated user.

nJoy 😉

MCP Registry, Discovery, and Service Mesh Patterns

In large organizations, the number of MCP servers grows quickly. A payments MCP server, a customer data MCP server, a product catalog server, an analytics server – each maintained by different teams. Without a registry, every agent developer must manually configure each server’s URL, credentials, and capabilities. A registry solves this: publish once, discover everywhere. This lesson builds an MCP server registry, a discovery client, and covers service mesh integration patterns for enterprise deployments.

MCP registry: servers publish capabilities, agents query the registry to build their tool set dynamically.

Registry Data Model

// A registry entry describes one MCP server
/**
 * @typedef {Object} RegistryEntry
 * @property {string} id - Unique server identifier (slug)
 * @property {string} name - Human-readable name
 * @property {string} description - What this server does
 * @property {string} url - Base URL for Streamable HTTP transport
 * @property {string} version - Server version (semver)
 * @property {string[]} tags - Capability tags for discovery (e.g., ['products', 'inventory'])
 * @property {Object} auth - Authentication requirements
 * @property {string} auth.type - 'none' | 'bearer' | 'oauth2'
 * @property {string} [auth.tokenEndpoint] - OAuth token endpoint if auth.type === 'oauth2'
 * @property {string} healthUrl - Health check endpoint
 * @property {Date} lastSeen - Last successful health check
 * @property {'healthy' | 'degraded' | 'down'} status - Current health status
 */
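
A typedef documents the shape but does not enforce it. A registry that accepts registrations over HTTP would normally validate each entry before storing it. Here is a minimal sketch, assuming a hypothetical validateRegistryEntry helper; the slug rule for id is an assumption of this example, not something the data model above specifies.

```javascript
// Hypothetical validator for the RegistryEntry shape documented above.
const AUTH_TYPES = ['none', 'bearer', 'oauth2'];

function isHttpUrl(value) {
  try {
    return ['http:', 'https:'].includes(new URL(value).protocol);
  } catch {
    return false;   // not parseable as a URL
  }
}

function validateRegistryEntry(entry) {
  const errors = [];
  // Assumed slug rule: lowercase letters, digits, and hyphens
  if (!/^[a-z0-9][a-z0-9-]*$/.test(entry?.id ?? '')) errors.push('id must be a lowercase slug');
  if (typeof entry?.name !== 'string' || !entry.name.trim()) errors.push('name is required');
  if (!isHttpUrl(entry?.url)) errors.push('url must be an http(s) URL');
  if (!isHttpUrl(entry?.healthUrl)) errors.push('healthUrl must be an http(s) URL');
  if (!Array.isArray(entry?.tags) || entry.tags.some(t => typeof t !== 'string')) {
    errors.push('tags must be an array of strings');
  }
  if (!AUTH_TYPES.includes(entry?.auth?.type)) errors.push('auth.type must be none, bearer, or oauth2');
  if (entry?.auth?.type === 'oauth2' && !isHttpUrl(entry.auth.tokenEndpoint)) {
    errors.push('auth.tokenEndpoint is required for oauth2');
  }
  return errors;
}
```

A registration endpoint could respond with HTTP 400 and this error list whenever the returned array is non-empty.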

Simple Registry Server

// registry-server.js - A lightweight HTTP registry for MCP servers
import express from 'express';

const app = express();
app.use(express.json());

// In-memory store (use Redis or PostgreSQL in production)
const registry = new Map();

// Register a server
app.post('/servers', (req, res) => {
  const entry = {
    ...req.body,
    registeredAt: new Date().toISOString(),
    lastSeen: new Date().toISOString(),
    status: 'healthy',
  };
  registry.set(entry.id, entry);
  res.status(201).json({ id: entry.id });
});

// List all healthy servers (with optional tag filter)
app.get('/servers', (req, res) => {
  const { tags, status = 'healthy' } = req.query;
  let servers = [...registry.values()].filter(s => s.status === status);

  if (tags) {
    const filterTags = tags.split(',');
    servers = servers.filter(s => filterTags.some(t => s.tags?.includes(t)));
  }

  res.json({ servers });
});

// Health check runner: poll all registered servers every 30 seconds
setInterval(async () => {
  for (const [id, entry] of registry) {
    try {
      const res = await fetch(entry.healthUrl, { signal: AbortSignal.timeout(5000) });
      entry.status = res.ok ? 'healthy' : 'degraded';
      entry.lastSeen = new Date().toISOString();
    } catch {
      entry.status = 'down';
    }
    registry.set(id, entry);
  }
}, 30_000);

app.listen(4000, () => console.log('Registry listening on :4000'));

Discovery Client for Agents

// discovery-client.js - Used by agent hosts to discover MCP servers
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamable-http.js';

class McpDiscoveryClient {
  #registryUrl;
  #connections = new Map();

  constructor(registryUrl) {
    this.#registryUrl = registryUrl;
  }

  // Discover servers by tags and establish connections
  async connect(tags = []) {
    const query = tags.length ? `?tags=${tags.join(',')}` : '';
    const res = await fetch(`${this.#registryUrl}/servers${query}`);
    const { servers } = await res.json();

    const connected = [];
    for (const server of servers) {
      if (this.#connections.has(server.id)) {
        connected.push(server);
        continue;
      }

      try {
        const transport = new StreamableHTTPClientTransport(new URL(`${server.url}/mcp`));
        const client = new Client({ name: 'discovery-host', version: '1.0.0' });
        await client.connect(transport);
        this.#connections.set(server.id, { client, server });
        connected.push(server);
        console.log(`Connected to ${server.name} (${server.id})`);
      } catch (err) {
        console.error(`Failed to connect to ${server.name}: ${err.message}`);
      }
    }
    return connected;
  }

  // Get all tools from all connected servers
  async getAllTools() {
    const allTools = [];
    for (const [id, { client, server }] of this.#connections) {
      try {
        const { tools } = await client.listTools();
        allTools.push(...tools.map(t => ({ ...t, serverId: id })));
      } catch (err) {
        console.error(`Failed to list tools from ${id}: ${err.message}`);
      }
    }
    return allTools;
  }

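  // Route a tool call to the first connected server that exposes the tool.
  // Note: this re-lists tools on every call; cache the listings if call volume is high.
  async callTool(toolName, args) {
    for (const [id, { client }] of this.#connections) {
      let tools;
      try {
        ({ tools } = await client.listTools());
      } catch (err) {
        // Skip servers that cannot be listed instead of failing the whole lookup
        console.error(`Failed to list tools from ${id}: ${err.message}`);
        continue;
      }
      if (tools.some(t => t.name === toolName)) {
        return client.callTool({ name: toolName, arguments: args });
      }
    }
    throw new Error(`Tool '${toolName}' not found in any connected server`);
  }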
}

// Usage
const discovery = new McpDiscoveryClient('https://registry.internal');
await discovery.connect(['products', 'analytics']);
const allTools = await discovery.getAllTools();
console.log(`Discovered ${allTools.length} tools across all servers`);
Discovery flow: query registry by tags -> connect to relevant servers -> aggregate tools -> route calls.
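
The routing step in this flow can avoid re-listing tools on every call by building a lookup table once. The following is a minimal sketch, not part of the original client: `buildToolIndex` is a hypothetical helper that takes the array returned by `getAllTools()` (objects carrying `name` and `serverId`) and maps each tool name to the server that owns it. It assumes the first server to advertise a name wins and records later duplicates as collisions.

```javascript
// Hypothetical helper (not part of the MCP SDK): index tools by name so that
// routing becomes a Map lookup instead of a listTools() round trip per server.
function buildToolIndex(tools) {
  const index = new Map();   // toolName -> serverId
  const collisions = [];     // same name advertised by a different server
  for (const tool of tools) {
    const owner = index.get(tool.name);
    if (owner !== undefined && owner !== tool.serverId) {
      collisions.push({ name: tool.name, kept: owner, ignored: tool.serverId });
      continue;
    }
    index.set(tool.name, tool.serverId);
  }
  return { index, collisions };
}

// Example with made-up tool listings in the shape getAllTools() returns
const sampleTools = [
  { name: 'search_products', serverId: 'products' },
  { name: 'run_report', serverId: 'analytics' },
  { name: 'search_products', serverId: 'analytics' }, // name clash
];
const sample = buildToolIndex(sampleTools);
console.log(sample.index.get('run_report'));   // analytics
console.log(sample.collisions.length);         // 1
```

Tool lists can change when servers are added or redeployed, so an index like this would need rebuilding after each `connect()` call.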

Service Mesh Integration (Istio / Linkerd)

In Kubernetes environments, a service mesh handles mutual TLS, traffic routing, and observability for all service-to-service communication, including MCP connections:

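# With Istio sidecars injected, traffic between meshed MCP servers is upgraded to mTLS automatically
# No code changes required - the sidecar proxy handles it
# (the default mode is permissive; apply a PeerAuthentication policy with mode STRICT to reject plaintext)

# Example: VirtualService for traffic splitting during MCP server rollout
# The v1 and v2 subsets referenced below must be defined in a matching DestinationRule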
apiVersion: networking.istio.io/v1alpha3
kind: VirtualService
metadata:
  name: mcp-product-server
spec:
  hosts:
    - mcp-product-server
  http:
    - route:
        - destination:
            host: mcp-product-server
            subset: v2
          weight: 10  # 10% to new version
        - destination:
            host: mcp-product-server
            subset: v1
          weight: 90  # 90% to stable version

Server Health Aggregation

// Aggregate health status across all registered servers for a status page
app.get('/status', async (req, res) => {
  const servers = [...registry.values()];
  const healthy = servers.filter(s => s.status === 'healthy').length;
  const degraded = servers.filter(s => s.status === 'degraded').length;
  const down = servers.filter(s => s.status === 'down').length;

  // Any server that is down or degraded marks the whole platform as degraded
  const overall = (down > 0 || degraded > 0) ? 'degraded' : 'operational';

  res.json({
    status: overall,
    summary: { total: servers.length, healthy, degraded, down },
    servers: servers.map(s => ({
      id: s.id, name: s.name, status: s.status, lastSeen: s.lastSeen,
    })),
  });
});
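
The aggregation rule in the handler can also live in a plain function, which makes it possible to check without starting the HTTP server. This is a sketch under that assumption: `summarizeHealth` is a hypothetical name, and it expects the entry shape the registry stores, with `status` set to 'healthy', 'degraded', or 'down'.

```javascript
// Hypothetical helper: the same rule as the /status handler, as a pure function.
const KNOWN_STATUSES = ['healthy', 'degraded', 'down'];

function summarizeHealth(servers) {
  const summary = { total: servers.length, healthy: 0, degraded: 0, down: 0 };
  for (const s of servers) {
    // Entries with any other status are counted in total only
    if (KNOWN_STATUSES.includes(s.status)) summary[s.status] += 1;
  }
  const status = (summary.down > 0 || summary.degraded > 0) ? 'degraded' : 'operational';
  return { status, summary };
}

// Example with made-up registry entries
const healthReport = summarizeHealth([
  { id: 'products', status: 'healthy' },
  { id: 'analytics', status: 'down' },
]);
console.log(healthReport.status);        // degraded
console.log(healthReport.summary.down);  // 1
```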

What to Build Next

  • Deploy the registry server alongside your existing MCP servers. Register each server on startup using a POST to the registry.
  • Build a simple status page that reads from /status and shows which MCP servers are healthy.
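
For the status page in the second item, a plain-text rendering is enough to start with. The sketch below assumes the `/status` payload shape shown earlier in this section (`status`, `summary`, `servers`). `formatStatusReport` and `fetchStatusReport` are hypothetical names, and the registry URL is a placeholder for your own deployment.

```javascript
// Hypothetical formatter: turn the /status JSON into plain-text lines.
function formatStatusReport(payload) {
  const { status, summary, servers } = payload;
  const lines = [
    `Platform: ${status}`,
    `${summary.healthy}/${summary.total} healthy, ${summary.degraded} degraded, ${summary.down} down`,
  ];
  for (const s of servers) {
    // lastSeen may be absent if the server has never answered a health check
    lines.push(`- ${s.name} (${s.id}): ${s.status}, last seen ${s.lastSeen ?? 'never'}`);
  }
  return lines.join('\n');
}

// Fetch and format the report. registryUrl is an assumption, e.g. 'http://localhost:4000'.
async function fetchStatusReport(registryUrl) {
  const res = await fetch(`${registryUrl}/status`);
  if (!res.ok) throw new Error(`Status request failed: HTTP ${res.status}`);
  return formatStatusReport(await res.json());
}

// Example with a hand-written payload (no network needed)
const demoReport = formatStatusReport({
  status: 'degraded',
  summary: { total: 2, healthy: 1, degraded: 0, down: 1 },
  servers: [
    { id: 'products', name: 'Product Catalog Server', status: 'healthy', lastSeen: '2025-01-01T00:00:00.000Z' },
    { id: 'analytics', name: 'Analytics Server', status: 'down' },
  ],
});
console.log(demoReport);
```

Keeping the formatter separate from the fetch call means the same function can feed a terminal script, a cron alert, or a server-rendered page.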

nJoy 😉