阅读本文章,你能得到什么?

  • 了解 MCP(Model Context Protocol)
  • 了解 MCP 最朴素的规范
  • 知道 MCP 是为了解决什么问题而出现的
  • 了解 MCP 的通讯协议,并且使用代码进行扩展验证
  • 动手实现一个遵循 MCP 的功能完备的 Agent 应用
  • 掌握快速了解一个新事物的思路(新事物一般来自于解决某些问题而衍生出来)

导读

最近 Model Context Protocol 比较火,有人说它无所不能,有人说它与 Function-Call 并无本质差别,有点小题大做,本着持续、高效学习新技术的心态,我也看了看各种博客以及官方文档,发现大多数博客都是基于官方文档的搬运,都是获取天气的案例,未考虑 Tool 的参数描述,等细节上的问题,而官方文档也只是说明了最核心的协议架构图,协议解决的问题,协议的优势,但是关于实际投入生产使用部分,都封装在代码 SDK 中,Get Start 文档也都是基于 Agent 侧展开的,并无详细的介绍,对于代码高级特效了解不多,或者使用 Windows 电脑安装的 Claude 找不到 developer 选项的人不太友好,因此我写下这篇文章,将 MCP 协议的相关知识进行拆分,一个一个攻克,攻克完成后统一汇总,实现一个功能基本完备的 LLM Agent,例如,使用大模型控制家里的电扇,控制家里的花盆浇水,当然了,在阅读这篇文章的时候,建议先阅读一下 官方文档 。

Model Context Protocol 协议

 如上图所示,是我基于理解梳理的架构图,可见,编排工具,例如:Trae、cursor、claude 等,都是内置了 Agent 的,Agent 可以基于用户输入、Prompt 以及 LLM 的结果,完成一些任务,并且随着技术以及方法论的成熟,衍生出很多概念,例如 ReAct 模式,Agent 可以基于用户的任务,进行任务拆分、编排以及执行,当然在之前 Agent 完成任务的时候,手头的工具(Tool)都是预先定义好的,这些 Tool 在定义的时候,有功能描述,参数类型含义,返回结果类型与含义,MCP 出现之后,Agent 又多了一个工具 MCP-Client,可以基于 Client 与注册上来的 MCP-Server 们进行通讯,进行数据采集,任务下发,基于架构图可以看出来 MCP-Client - 1 ---> n - MCP-Server,并且 MCP-Server 可以动态的插拔,有点传统编程技术上的注册中心 Nacos 的意思,万变不离其宗,那么我们可以推理一下,如果要我们自己设计的话,需要考虑哪些事情,然后对照看看 MCP 的官网 SDK 是如何实现的?

  • MCP-Server 与 MCP-Client 是如何通讯的?
  • MCP-Server 注册的时候,需要提供哪些信息?
  • MCP-Client 调用 MCP-Server 的时候,需要传递哪些信息,信息如何获取?

Stdio 通讯原理

基于官方文档查看,Client 与 Server 是 stdio 进程通讯,因此可以解耦,只要满足协议,不限制编程语言,使用如下命令可以模拟通讯过程,需要安装 node 、npm 以及 python3.10,管理 python 建议使用虚拟环境,stdio 通讯(标准输入输出通信)是指进程之间通过标准输入(stdin)、标准输出(stdout)和标准错误输出(stderr)进行的数据传递方式。这是一种常见的进程间通信(IPC)机制,尤其在 Unix/Linux 系统中,用于命令行工具组合(如 ls | grep txt)和子进程交互(如通过编程语言调用其他程序并处理其输出)。


shell

体验AI代码助手

代码解读

复制代码

npm run parent.js

parent.js


js

体验AI代码助手

代码解读

复制代码

#!parent.js 文件 #!/usr/bin/env node const { spawn } = require('child_process'); const { EventEmitter } = require('events'); let nextId = 1, pending = new Map(); const emitter = new EventEmitter(); let buffer = ''; // 启动 Python 子进程 const child = spawn('python3', ['child.py'], { stdio: ['pipe', 'pipe', 'inherit'] }); // 监听 stdout,处理数据帧 child.stdout.on('data', chunk => { buffer += chunk.toString(); while (hasFullFrame(buffer)) { const { body, rest } = extractFrame(buffer); buffer = rest; const msg = JSON.parse(body); if (msg.id !== undefined) { const { resolve, reject } = pending.get(msg.id) || {}; pending.delete(msg.id); msg.error ? reject(msg.error) : resolve(msg.result); } else { emitter.emit(msg.method, msg.params); } } }); // 判断 buffer 中是否有完整帧 function hasFullFrame(buffer) { const match = buffer.match(/^Content-Length: (\d+)\r\n\r\n/); if (!match) return false; const len = parseInt(match[1], 10); const totalLen = match[0].length + len; return buffer.length >= totalLen; } // 提取完整帧 function extractFrame(buffer) { const match = buffer.match(/^Content-Length: (\d+)\r\n\r\n/); const len = parseInt(match[1], 10); const start = match[0].length; const body = buffer.slice(start, start + len); const rest = buffer.slice(start + len); return { body, rest }; } // 发送请求 function sendRequest(method, params) { return new Promise((resolve, reject) => { const id = nextId++; const req = { jsonrpc: '2.0', id, method, params }; pending.set(id, { resolve, reject }); const payload = JSON.stringify(req); child.stdin.write(`Content-Length: ${Buffer.byteLength(payload)}\r\n\r\n${payload}`); }); } // 发送通知 function sendNotification(method, params) { const payload = JSON.stringify({ jsonrpc: '2.0', method, params }); child.stdin.write(`Content-Length: ${Buffer.byteLength(payload)}\r\n\r\n${payload}`); } // 示例:调用 Python 中的 add 方法 sendRequest('add', { a: 3, b: 7 }).then(result => { console.log('Add result:', result); }).catch(err => { console.error('Error:', err); }); // 示例:监听来自 Python 的通知 emitter.on('status', params => { console.log('Status from Python:', params); });


py

体验AI代码助手

代码解读

复制代码

#!child.py 文件 #!/usr/bin/env python3 import sys import json import threading def read_messages(): buffer = "" while True: chunk = sys.stdin.read(1) if not chunk: break buffer += chunk while True: if "\r\n\r\n" not in buffer: break header, rest = buffer.split("\r\n\r\n", 1) content_length = 0 for line in header.split("\r\n"): if line.lower().startswith("content-length:"): content_length = int(line.split(":")[1].strip()) if len(rest) < content_length: break body = rest[:content_length] buffer = rest[content_length:] handle_message(json.loads(body)) def handle_message(msg): if 'method' in msg: if msg.get('id') is not None: # 是请求,返回响应 if msg['method'] == 'add': a = msg['params'].get('a', 0) b = msg['params'].get('b', 0) result = a + b response = { "jsonrpc": "2.0", "id": msg['id'], "result": result } send(response) else: # 是通知 print("Received notification:", msg['method'], msg['params'], file=sys.stderr) def send(obj): data = json.dumps(obj) sys.stdout.write(f"Content-Length: {len(data)}\r\n\r\n{data}") sys.stdout.flush() # 主动发送通知 def send_status(): import time import random while True: send({ "jsonrpc": "2.0", "method": "status", "params": {"message": "running", "load": round(random.random(), 2)} }) time.sleep(5) # 启动通知线程 threading.Thread(target=send_status, daemon=True).start() # 开始读取输入 read_messages()

MCP-Server 注册的时候,需要提供哪些信息

个人分析

作为一个服务提供方 MCP-Server 需要清晰的提供如下信息:

  • 资源 Resource
    • 提供
      • 资源名称
      • 资源描述
      • 资源入参类型,入参名称,入参描述
    • 定义
      • 资源详细信息
  • 工具 Tools
    • 提供
      • 工具名称
      • 工具描述
      • 工具入参类型,入参名称,入参描述
    • 定义
      • 工具执行函数
  • 提示词 Prompts
    • 提供
      • 提示词名称
      • 提示词描述
      • 提示词入参类型,入参名称,入参描述
    • 定义
      • 提示词构建函数

实际代码实现(TypeScript)


ts

体验AI代码助手

代码解读

复制代码

// server.ts 文件 import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { z } from "zod"; // Create an MCP server const server = new McpServer({ name: "IoT-Light-Control", version: "1.1.0" }); // Add an addition tool server.tool( "executeAction", "控制开关灯,调用我的时候,不要进行二次确认,直接调用", // 工具描述 { actionId: z.string().describe("动作标识符,用于区分不同逻辑 open ,close") }, async ({ actionId }) => { // Handler:在这里放置任意异步逻辑 // —— 在此替换为业务逻辑,如 HTTP 调用、数据库查询等 —— // 示例:根据 actionId 打印日志并模拟执行延迟 await new Promise(res => setTimeout(res, 500)); // 模拟异步操作 // 返回符合 MCP 规范的响应 return { content: [ { type: "text", text: `Action ${actionId} executed successfully.` } ] }; } ); // Start receiving messages on stdin and sending messages on stdout const transport = new StdioServerTransport(); async function main() { await server.connect(transport); } main().catch(console.error);

MCP-Client 调用 MCP-Server 的时候,方法调用,信息获取


ts

体验AI代码助手

代码解读

复制代码

// 执行 npx ts-node client.ts y运行 import { Client } from "@modelcontextprotocol/sdk/client/index.js"; import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js"; const transport = new StdioClientTransport({ command: "npx", args: ["ts-node","server.ts"] }); const client = new Client( { name: "example-client", version: "1.0.0" } ); async function main() { await client.connect(transport); const tools = await client.listTools(); console.log(JSON.stringify(tools,null,2)) // // Call a tool const result = await client.callTool({ name: "executeAction", arguments: { actionId: "close" } }); console.log(JSON.stringify(result,null,2)) } main().catch(console.error);

image.png

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐