Skip to content

通信协议

SSE

概念

SSE全称Server-Sent-Events,服务端推送事件,是HTML5提供的一种单工通信,由服务端客户端发起通信。

初步实现

tsx
// 服务端
import express from 'express'
import cors from 'cors'

const app = express()
app.use(cors())

app.get('/sse', (req, res) => {
	res.setHeaders('Content-Type', 'text/event-stream')
	res.setHeader('Connection', 'keep-alive') // 保持长连接
	res.flushHeaders()
	
	res.write(`data: ${Math.random()}\n\n`)
})

app.listen(3000, () => {
	console.log('server is running at http://localhost:3000')
})

// 客户端

const sse= new EventSource('http://localhost:3000/sse')

sse.addEventListener('message', (event) => {
	console.log(event.data)
})

post实现

  • 原生实现,不支持自动重连
tsx
// 服务端
import express from 'express'
import cors from 'cors'

const app = express()
app.use(cors())

app.post('/sse', (req, res) => {
	res.setHeaders('Content-Type', 'text/event-stream')
	res.setHeader('Connection', 'keep-alive') // 保持长连接
	res.flushHeaders()
	
	res.write(`data: ${Math.random()}\n\n`)
})

app.listen(3000, () => {
	console.log('server is running at http://localhost:3000')
})

// 客户端只能使用fetch
async function startSSE() {
	const res = await fetch('http://localhost:3000/sse', {
		method: 'POST',
		headers: {
			'Content-Type': 'application/json'
		},
		body: JSON.stringify({
			message: 'Hello, world!'
		})
	})
	// 获取到的数据格式为Unit8Array,可以使用String.fromCharCode()读取
	const reader = res.body?.getReader()
	const arr: string[] = []
	while (true) {
		const { done, value } = await reader!.read()
		if (done) break
		const decoder = new TextDecoder()
		const valueStr = decoder.decode(value)
		const data = valueStr
			.split('\n')
			.filter(item => item !== '')
			.map(item => item.startsWith('data:') && item.split(':')[1])
			.join('')
			.split('')
		arr.push(...data)
		app.innerHTML += arr.shift()
	}
}

startSSE()
  • @microsoft/fetch-event-source
tsx
import { fetchEventSource } from '@microsoft/fetch-event-source'
fetchEventSource('http://localhost:3000/sse', {
	method: 'POST',
	headers: {
		'Content-Type': 'application/json'
	},
	body: JSON.stringify({
		message: 'Hello, world!'
	}),
	onmessage(event) {
		console.log(event.data)
	}
})

应用场景

  • AI对话
tsx
import express from "express";
import { ChatOpenAI } from "@langchain/openai";
import { API_KEY } from "./config/env.js";
import { BufferMemory } from "langchain/memory";
import { PromptTemplate } from "@langchain/core/prompts";

const router = express.Router();

const model = new ChatOpenAI({
  model: "deepseek-chat",
  apiKey: API_KEY,
  temperature: 1.3,
  // 开启流式输出
  streaming: true,
  configuration: {
    baseURL: "https://api.deepseek.com",
  },
});
// 创建记忆能力
const memory = new BufferMemory({
  // 返回消息
  returnMessages: true,
  // 记忆键
  memoryKey: "chat_history",
  // 输入键(我们的问题)
  inputKey: "input",
});

// 创建提示词
const prompt = new PromptTemplate({
  template: `
        你是一个高级前端专业,精通各种框架源码,对前端工程化有大量实践
        对话内容:
        {chat_history}
        用户的问题:
        {input}
        你的回答:
    `,
  inputVariables: ["input", "chat_history"],
});

router.post("/chat", async (req, res) => {
  try {
    // 设置SSE响应头
    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");
    res.flushHeaders();

    const { message } = req.body;
    const history = await memory.loadMemoryVariables({});
    const chatHistory = history.chat_history || [];
    const formattedPrompt = await prompt.format({
      input: message,
      chat_history: chatHistory,
    });

    const result = await model.stream(formattedPrompt);

    for await (const chunk of result) {
      res.write(chunk.content);
    }

    // 保存完整的对话到内存
    await memory.saveContext({ input: message }, { output: formattedPrompt });
  } catch (error) {
    console.error("Stream error:", error);
  }
});

// 客户端
const startSSE = async (message) => {
  if (!message) return;
  try {
    const response = await fetch("http://localhost:3000/api/chat", {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ message }),
    });
    const reader = response.body.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      const chunk = new TextDecoder().decode(value);
      chat.innerHTML += chunk;
    }
  } catch (error) {
    console.log(error);
  }
};
sendButton.addEventListener("click", () => {
  const message = messageInput.value;
  startSSE(message);
});
  • RAG数据库检索(结合typeorm)
tsx
import express from "express";
import { ChatOpenAI } from "@langchain/openai";
import { API_KEY } from "./config/env.js";
import { PromptTemplate } from "@langchain/core/prompts";
import { SqlDatabase } from "langchain/sql_db";
import { dataSource } from "./sql.js";
import { createSqlQueryChain } from "langchain/chains/sql_db";
import { RunnableSequence } from "@langchain/core/runnables";
import { StringOutputParser } from "@langchain/core/output_parsers";

const router = express.Router();

await dataSource.initialize();
const db = await SqlDatabase.fromDataSourceParams({
  appDataSource: dataSource,
});

const model = new ChatOpenAI({
  model: "deepseek-chat",
  apiKey: API_KEY,
  temperature: 1.3,
  streaming: true,
  configuration: {
    baseURL: "https://api.deepseek.com",
  },
});

// 创建提示词
const prompt = new PromptTemplate({
  template: `
        你是一个sql查询专家,请根据用户的问题生成一个sql语句
		请校验sql语句是否正确,如果正确请返回sql语句,如果错误请返回错误原因
		- 使用 NOT IN 与 NULL 值
		- 标识符是否正确引用
		- 函数是否正确使用
		- 数据类型是否匹配
		- 避免使用子查询

		如果以上查询错误,请重写查询,如果没有错误,返回原始查询

		只返回sql语句,不要返回任何其他内容

		原始查询 {query}
    `,
  inputVariables: ["query"],
});

const chain = await createSqlQueryChain({
  llm: model,
  db,
  dialect: "mysql",
});

const validateChain = prompt.pipe(model).pipe(new StringOutputParser());

const fullChain = RunnableSequence.from([
  {
    query: async (input) => {
      const result = await chain.invoke(input);
      return result;
    },
  },
  validateChain,
]);

router.post("/sql", async (req, res) => {
  const { query } = req.body;
  const sql = await fullChain.invoke({ question: query });
  const result = await db.run(sql);
  res.json(result);
});

export default router;
最近更新