通信协议
SSE
概念
SSE全称Server-Sent-Events,服务端推送事件,是HTML5提供的一种单工通信,由服务端向客户端发起通信。
初步实现
tsx
// 服务端
import express from 'express'
import cors from 'cors'
const app = express()
app.use(cors())
app.get('/sse', (req, res) => {
res.setHeaders('Content-Type', 'text/event-stream')
res.setHeader('Connection', 'keep-alive') // 保持长连接
res.flushHeaders()
res.write(`data: ${Math.random()}\n\n`)
})
app.listen(3000, () => {
console.log('server is running at http://localhost:3000')
})
// 客户端
const sse= new EventSource('http://localhost:3000/sse')
sse.addEventListener('message', (event) => {
console.log(event.data)
})post实现
- 原生实现,不支持自动重连
tsx
// 服务端
import express from 'express'
import cors from 'cors'
const app = express()
app.use(cors())
app.post('/sse', (req, res) => {
res.setHeaders('Content-Type', 'text/event-stream')
res.setHeader('Connection', 'keep-alive') // 保持长连接
res.flushHeaders()
res.write(`data: ${Math.random()}\n\n`)
})
app.listen(3000, () => {
console.log('server is running at http://localhost:3000')
})
// 客户端只能使用fetch
async function startSSE() {
const res = await fetch('http://localhost:3000/sse', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
message: 'Hello, world!'
})
})
// 获取到的数据格式为Unit8Array,可以使用String.fromCharCode()读取
const reader = res.body?.getReader()
const arr: string[] = []
while (true) {
const { done, value } = await reader!.read()
if (done) break
const decoder = new TextDecoder()
const valueStr = decoder.decode(value)
const data = valueStr
.split('\n')
.filter(item => item !== '')
.map(item => item.startsWith('data:') && item.split(':')[1])
.join('')
.split('')
arr.push(...data)
app.innerHTML += arr.shift()
}
}
startSSE()@microsoft/fetch-event-source
tsx
import { fetchEventSource } from '@microsoft/fetch-event-source'
fetchEventSource('http://localhost:3000/sse', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
message: 'Hello, world!'
}),
onmessage(event) {
console.log(event.data)
}
})应用场景
- AI对话
tsx
import express from "express";
import { ChatOpenAI } from "@langchain/openai";
import { API_KEY } from "./config/env.js";
import { BufferMemory } from "langchain/memory";
import { PromptTemplate } from "@langchain/core/prompts";
const router = express.Router();
const model = new ChatOpenAI({
model: "deepseek-chat",
apiKey: API_KEY,
temperature: 1.3,
// 开启流式输出
streaming: true,
configuration: {
baseURL: "https://api.deepseek.com",
},
});
// 创建记忆能力
const memory = new BufferMemory({
// 返回消息
returnMessages: true,
// 记忆键
memoryKey: "chat_history",
// 输入键(我们的问题)
inputKey: "input",
});
// 创建提示词
const prompt = new PromptTemplate({
template: `
你是一个高级前端专业,精通各种框架源码,对前端工程化有大量实践
对话内容:
{chat_history}
用户的问题:
{input}
你的回答:
`,
inputVariables: ["input", "chat_history"],
});
router.post("/chat", async (req, res) => {
try {
// 设置SSE响应头
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.flushHeaders();
const { message } = req.body;
const history = await memory.loadMemoryVariables({});
const chatHistory = history.chat_history || [];
const formattedPrompt = await prompt.format({
input: message,
chat_history: chatHistory,
});
const result = await model.stream(formattedPrompt);
for await (const chunk of result) {
res.write(chunk.content);
}
// 保存完整的对话到内存
await memory.saveContext({ input: message }, { output: formattedPrompt });
} catch (error) {
console.error("Stream error:", error);
}
});
// 客户端
const startSSE = async (message) => {
if (!message) return;
try {
const response = await fetch("http://localhost:3000/api/chat", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ message }),
});
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = new TextDecoder().decode(value);
chat.innerHTML += chunk;
}
} catch (error) {
console.log(error);
}
};
sendButton.addEventListener("click", () => {
const message = messageInput.value;
startSSE(message);
});- RAG数据库检索(结合typeorm)
tsx
import express from "express";
import { ChatOpenAI } from "@langchain/openai";
import { API_KEY } from "./config/env.js";
import { PromptTemplate } from "@langchain/core/prompts";
import { SqlDatabase } from "langchain/sql_db";
import { dataSource } from "./sql.js";
import { createSqlQueryChain } from "langchain/chains/sql_db";
import { RunnableSequence } from "@langchain/core/runnables";
import { StringOutputParser } from "@langchain/core/output_parsers";
const router = express.Router();
await dataSource.initialize();
const db = await SqlDatabase.fromDataSourceParams({
appDataSource: dataSource,
});
const model = new ChatOpenAI({
model: "deepseek-chat",
apiKey: API_KEY,
temperature: 1.3,
streaming: true,
configuration: {
baseURL: "https://api.deepseek.com",
},
});
// 创建提示词
const prompt = new PromptTemplate({
template: `
你是一个sql查询专家,请根据用户的问题生成一个sql语句
请校验sql语句是否正确,如果正确请返回sql语句,如果错误请返回错误原因
- 使用 NOT IN 与 NULL 值
- 标识符是否正确引用
- 函数是否正确使用
- 数据类型是否匹配
- 避免使用子查询
如果以上查询错误,请重写查询,如果没有错误,返回原始查询
只返回sql语句,不要返回任何其他内容
原始查询 {query}
`,
inputVariables: ["query"],
});
const chain = await createSqlQueryChain({
llm: model,
db,
dialect: "mysql",
});
const validateChain = prompt.pipe(model).pipe(new StringOutputParser());
const fullChain = RunnableSequence.from([
{
query: async (input) => {
const result = await chain.invoke(input);
return result;
},
},
validateChain,
]);
router.post("/sql", async (req, res) => {
const { query } = req.body;
const sql = await fullChain.invoke({ question: query });
const result = await db.run(sql);
res.json(result);
});
export default router;