RAG 知识库搭建实战完全指南:从零构建企业级智能问答系统
RAG(检索增强生成)是 2026 年最热门的 AI 技术之一,它将外部知识库与大语言模型结合,让 AI 能够基于私有数据生成准确、可靠的回答。本教程从零开始,手把手教你搭建企业级 RAG 知识库系统。
一、RAG 核心概念与工作原理
RAG(Retrieval-Augmented Generation,检索增强生成)是一种将外部知识检索与大语言模型生成相结合的技术架构。它有效解决了大模型知识时效性差、容易产生"幻觉"以及无法处理私有数据等核心痛点。
1.1 为什么需要 RAG?
传统大语言模型存在以下局限性:
- 知识时效性差:模型训练完成后,无法获取新知识
- 幻觉问题:面对不熟悉的问题,模型可能编造虚假信息
- 私有数据盲区:无法访问企业内部文档、知识库等私有数据
- 不可追溯:无法提供答案的信息来源
RAG 通过引入外部知识库,让模型在生成答案前先"查阅资料",从而产生更准确、可追溯的回答。
1.2 RAG 系统的核心组件
一个完整的 RAG 系统包含三个核心组件:
- 知识库(Knowledge Base):存储外部知识的向量数据库,支持语义级别的相似度检索
- 检索器(Retriever):根据用户查询,从知识库中召回最相关的文档片段
- 生成器(Generator):大语言模型,基于检索结果生成最终答案
1.3 RAG 工作流程
# RAG 工作流程示意图 用户提问 → 向量化 → 检索知识库 → 召回Top-K文档 → 构建增强Prompt → LLM生成答案 # 具体步骤: # 1. 用户输入问题:"如何配置Docker容器?" # 2. 将问题转换为向量嵌入 # 3. 在向量数据库中检索最相关的文档片段 # 4. 将检索结果与原问题组合成增强提示词 # 5. LLM 基于增强上下文生成准确答案
二、搭建 RAG 知识库的完整流程
2.1 数据收集与预处理
高质量的知识库是 RAG 系统的基础。数据收集需要注意:
import os
from pathlib import Path
class DataLoader:
"""多格式文档加载器"""
def __init__(self, data_dir: str):
self.data_dir = Path(data_dir)
self.supported_formats = [".txt", ".pdf", ".md", ".docx"]
def load_documents(self):
"""加载目录下所有支持的文档"""
documents = []
for file_path in self.data_dir.rglob("*"):
if file_path.suffix.lower() in self.supported_formats:
try:
content = self._read_file(file_path)
documents.append({
"content": content,
"source": str(file_path),
"metadata": {
"filename": file_path.name,
"format": file_path.suffix
}
})
except Exception as e:
print(f"加载文件失败: {file_path}, 错误: {e}")
return documents
def _read_file(self, file_path: Path) -> str:
"""根据文件类型读取内容"""
if file_path.suffix == ".txt" or file_path.suffix == ".md":
return file_path.read_text(encoding="utf-8")
elif file_path.suffix == ".pdf":
# 使用 PyPDF2 或 pdfplumber 解析 PDF
import pdfplumber
with pdfplumber.open(file_path) as pdf:
return "\n".join([page.extract_text() or "" for page in pdf.pages])
# 其他格式的处理...
return ""
# 使用示例
loader = DataLoader("./knowledge_base")
docs = loader.load_documents()
print(f"成功加载 {len(docs)} 个文档")
2.2 文本分块策略
由于大模型的上下文窗口限制,需要将长文档分割成适当大小的块。分块策略直接影响检索效果。
from typing import List
import re
class TextChunker:
"""智能文本分块器"""
def __init__(
self,
chunk_size: int = 500, # 每块最大字符数
chunk_overlap: int = 50, # 块之间的重叠字符数
separators: List[str] = None
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# 按优先级尝试的分隔符
self.separators = separators or ["\n\n", "\n", "。", "!", "?", " "]
def split_text(self, text: str, metadata: dict = None) -> List[dict]:
"""将文本分割成多个块"""
chunks = []
# 尝试按段落分割
paragraphs = text.split("\n\n")
current_chunk = ""
current_length = 0
for para in paragraphs:
para = para.strip()
if not para:
continue
# 如果单个段落超过块大小,需要进一步分割
if len(para) > self.chunk_size:
# 先保存当前块
if current_chunk:
chunks.append(self._create_chunk(current_chunk, metadata))
current_chunk = ""
current_length = 0
# 按句子分割长段落
sentences = self._split_by_sentences(para)
for sentence in sentences:
if len(sentence) <= self.chunk_size:
chunks.append(self._create_chunk(sentence, metadata))
else:
# 如果句子仍然太长,按字符截断
for i in range(0, len(sentence), self.chunk_size - self.chunk_overlap):
chunk_text = sentence[i:i + self.chunk_size]
chunks.append(self._create_chunk(chunk_text, metadata))
else:
# 检查是否需要创建新块
if current_length + len(para) > self.chunk_size:
if current_chunk:
chunks.append(self._create_chunk(current_chunk, metadata))
# 保留重叠部分
overlap_text = current_chunk[-self.chunk_overlap:] if current_chunk else ""
current_chunk = overlap_text + para
current_length = len(current_chunk)
else:
current_chunk += "\n\n" + para if current_chunk else para
current_length = len(current_chunk)
# 保存最后一块
if current_chunk:
chunks.append(self._create_chunk(current_chunk, metadata))
return chunks
def _split_by_sentences(self, text: str) -> List[str]:
"""按句子分割文本"""
# 使用正则匹配中英文句子结束符
sentence_pattern = r"[。!?.!?]+"
sentences = re.split(sentence_pattern, text)
return [s.strip() for s in sentences if s.strip()]
def _create_chunk(self, text: str, metadata: dict = None) -> dict:
"""创建文档块对象"""
chunk = {
"content": text.strip(),
"char_count": len(text.strip())
}
if metadata:
chunk["metadata"] = metadata.copy()
return chunk
# 使用示例
chunker = TextChunker(chunk_size=500, chunk_overlap=50)
text_chunks = chunker.split_text(docs[0]["content"], docs[0]["metadata"])
print(f"文档被分割成 {len(text_chunks)} 个块")
2.3 向量嵌入生成
将文本块转换为向量表示,这是语义检索的基础。推荐使用 OpenAI 的 text-embedding-3 或本地的 bge-m3 模型。
import numpy as np
from typing import List
import requests
class EmbeddingGenerator:
"""向量嵌入生成器"""
def __init__(self, model: str = "text-embedding-3-small", api_key: str = None):
self.model = model
self.api_key = api_key
self.api_url = "https://api.openai.com/v1/embeddings"
def generate(self, texts: List[str]) -> np.ndarray:
"""批量生成文本向量"""
# 调用 OpenAI API 生成嵌入
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
response = requests.post(
self.api_url,
headers=headers,
json={
"model": self.model,
"input": texts,
"encoding_format": "float"
}
)
if response.status_code != 200:
raise Exception(f"API 调用失败: {response.text}")
# 提取向量并按原始顺序排列
embeddings_data = response.json()["data"]
embeddings = sorted(embeddings_data, key=lambda x: x["index"])
return np.array([e["embedding"] for e in embeddings])
def generate_single(self, text: str) -> np.ndarray:
"""生成单个文本的向量"""
return self.generate([text])[0]
# 使用本地模型的开源替代方案
class LocalEmbeddingGenerator:
"""本地嵌入模型(使用 sentence-transformers)"""
def __init__(self, model_name: str = "BAAI/bge-m3"):
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer(model_name)
def generate(self, texts: List[str]) -> np.ndarray:
"""批量生成文本向量"""
embeddings = self.model.encode(texts, normalize_embeddings=True)
return embeddings
def generate_single(self, text: str) -> np.ndarray:
"""生成单个文本的向量"""
return self.model.encode([text], normalize_embeddings=True)[0]
# 使用示例
# embedder = EmbeddingGenerator(api_key="your-api-key")
embedder = LocalEmbeddingGenerator(model_name="BAAI/bge-m3")
vectors = embedder.generate([chunk["content"] for chunk in text_chunks])
print(f"生成了 {len(vectors)} 个向量,维度: {vectors[0].shape}")
2.4 向量数据库存储
选择合适的向量数据库至关重要。主流选择包括 Milvus、Pinecone、Chroma、FAISS 等。
from typing import List, Optional
import chromadb
from chromadb.config import Settings
class VectorStore:
"""向量数据库管理器(基于 ChromaDB)"""
def __init__(self, persist_dir: str = "./chroma_db", collection_name: str = "knowledge_base"):
# 初始化 ChromaDB 客户端
self.client = chromadb.Client(Settings(
chroma_db_impl="duckdb+parquet",
persist_directory=persist_dir
))
# 获取或创建集合
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"} # 使用余弦相似度
)
self.embedder = LocalEmbeddingGenerator()
def add_documents(self, chunks: List[dict], batch_size: int = 100):
"""批量添加文档块到向量库"""
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
# 生成向量
texts = [chunk["content"] for chunk in batch]
embeddings = self.embedder.generate(texts)
# 生成唯一 ID
ids = [f"chunk_{i + j}" for j in range(len(batch))]
# 准备元数据
metadatas = [chunk.get("metadata", {}) for chunk in batch]
# 添加到集合
self.collection.add(
ids=ids,
embeddings=embeddings.tolist(),
documents=texts,
metadatas=metadatas
)
print(f"已添加 {i + len(batch)}/{len(chunks)} 个文档块")
def search(self, query: str, top_k: int = 5) -> List[dict]:
"""语义相似度搜索"""
# 生成查询向量
query_embedding = self.embedder.generate_single(query)
# 执行搜索
results = self.collection.query(
query_embeddings=[query_embedding.tolist()],
n_results=top_k,
include=["documents", "metadatas", "distances"]
)
# 格式化结果
search_results = []
for i in range(len(results["ids"][0])):
search_results.append({
"content": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"score": 1 - results["distances"][0][i] # 转换为相似度分数
})
return search_results
def delete_collection(self):
"""删除整个集合"""
self.client.delete_collection(self.collection.name)
# 使用示例
vector_store = VectorStore("./kb_chroma", "tech_docs")
vector_store.add_documents(text_chunks)
# 测试搜索
results = vector_store.search("如何配置 Docker 容器?", top_k=3)
for i, result in enumerate(results):
print(f"\n--- 结果 {i+1} (相似度: {result[\"score\"]:.3f}) ---")
print(result["content"][:200] + "...")
三、RAG 完整系统实现
3.1 构建 RAG Pipeline
from typing import List, Optional
import openai
class RAGPipeline:
"""完整的 RAG 管道"""
def __init__(
self,
vector_store: VectorStore,
llm_model: str = "gpt-4o-mini",
api_key: str = None,
top_k: int = 5
):
self.vector_store = vector_store
self.llm_model = llm_model
self.top_k = top_k
self.client = openai.OpenAI(api_key=api_key)
def query(
self,
question: str,
system_prompt: str = None,
temperature: float = 0.7
) -> dict:
"""执行 RAG 查询"""
# 1. 检索相关文档
search_results = self.vector_store.search(question, top_k=self.top_k)
if not search_results:
return {
"answer": "抱歉,我在知识库中没有找到相关信息。",
"sources": []
}
# 2. 构建增强提示词
context = self._build_context(search_results)
default_prompt = """你是一个专业的知识库助手。请基于提供的上下文信息回答用户问题。
要求:
1. 只使用上下文中的信息回答,不要编造内容
2. 如果上下文中没有相关信息,请明确告知用户
3. 回答要简洁、准确、有条理
4. 如果引用具体内容,请标注来源编号 [1]、[2] 等
上下文信息:
{context}
用户问题:{question}
请给出你的回答:"""
prompt = system_prompt or default_prompt
full_prompt = prompt.format(context=context, question=question)
# 3. 调用 LLM 生成答案
response = self.client.chat.completions.create(
model=self.llm_model,
messages=[
{"role": "system", "content": "你是一个专业的知识库助手。"},
{"role": "user", "content": full_prompt}
],
temperature=temperature
)
answer = response.choices[0].message.content
# 4. 返回结果
return {
"answer": answer,
"sources": [
{
"content": r["content"][:300] + "...",
"score": r["score"],
"metadata": r["metadata"]
}
for r in search_results
],
"model": self.llm_model
}
def _build_context(self, search_results: List[dict]) -> str:
"""构建上下文字符串"""
context_parts = []
for i, result in enumerate(search_results, 1):
context_parts.append(f"[文档 {i}]\n{result[\"content\"]}\n")
return "\n".join(context_parts)
# 使用示例
rag = RAGPipeline(
vector_store=vector_store,
llm_model="gpt-4o-mini",
api_key="your-openai-api-key",
top_k=5
)
# 执行查询
result = rag.query("如何优化向量检索的性能?")
print("回答:", result["answer"])
print("\n参考来源:")
for i, source in enumerate(result["sources"], 1):
print(f" [{i}] 相似度: {source[\"score\"]:.3f}")
3.2 完整示例:构建技术文档知识库
# main.py - 完整的 RAG 知识库系统
from pathlib import Path
def build_knowledge_base(data_dir: str, persist_dir: str):
"""构建知识库的完整流程"""
print("=" * 50)
print("开始构建 RAG 知识库")
print("=" * 50)
# 1. 加载文档
print("\n[步骤 1] 加载文档...")
loader = DataLoader(data_dir)
documents = loader.load_documents()
print(f"✓ 成功加载 {len(documents)} 个文档")
# 2. 文本分块
print("\n[步骤 2] 文本分块...")
chunker = TextChunker(chunk_size=500, chunk_overlap=50)
all_chunks = []
for doc in documents:
chunks = chunker.split_text(doc["content"], doc["metadata"])
all_chunks.extend(chunks)
print(f"✓ 生成了 {len(all_chunks)} 个文本块")
# 3. 创建向量存储
print("\n[步骤 3] 创建向量存储...")
vector_store = VectorStore(persist_dir, "kb_collection")
vector_store.add_documents(all_chunks)
print(f"✓ 向量存储创建完成")
return vector_store
def query_knowledge_base(vector_store: VectorStore, question: str):
"""查询知识库"""
print("\n" + "=" * 50)
print(f"查询: {question}")
print("=" * 50)
rag = RAGPipeline(vector_store)
result = rag.query(question)
print("\n【回答】")
print(result["answer"])
print("\n【参考来源】")
for i, source in enumerate(result["sources"], 1):
print(f" [{i}] 相似度: {source[\"score\"]:.3f}")
return result
# 主程序入口
if __name__ == "__main__":
# 构建知识库
kb = build_knowledge_base(
data_dir="./docs",
persist_dir="./kb_vector_db"
)
# 测试查询
query_knowledge_base(kb, "什么是向量数据库?它有哪些优势?")
四、RAG 系统优化策略
4.1 提升检索质量
- 混合检索:结合向量检索与关键词检索,提高召回率
- 重排序:使用 Cross-Encoder 对检索结果进行精细排序
- 查询扩展:将用户查询改写为多个相关查询,提高覆盖率
- 元数据过滤:利用文档元数据缩小检索范围
4.2 优化生成质量
- Prompt 工程:设计高质量的提示词模板,引导模型输出
- 上下文窗口管理:动态调整检索文档数量,避免超出限制
- 引用追踪:让模型标注答案的信息来源,提高可信度
4.3 系统性能优化
# 使用缓存提升响应速度
from functools import lru_cache
import hashlib
class CachedRAGPipeline(RAGPipeline):
"""带缓存的 RAG 管道"""
def __init__(self, *args, cache_size: int = 1000, **kwargs):
super().__init__(*args, **kwargs)
self.query_cache = {}
self.cache_size = cache_size
def _get_cache_key(self, question: str) -> str:
"""生成缓存键"""
return hashlib.md5(question.encode()).hexdigest()
def query(self, question: str, **kwargs) -> dict:
"""带缓存的查询"""
cache_key = self._get_cache_key(question)
# 检查缓存
if cache_key in self.query_cache:
print("✓ 命中缓存")
return self.query_cache[cache_key]
# 执行查询
result = super().query(question, **kwargs)
# 缓存结果
if len(self.query_cache) >= self.cache_size:
# 简单的 LRU:删除最早的缓存
oldest_key = next(iter(self.query_cache))
del self.query_cache[oldest_key]
self.query_cache[cache_key] = result
return result
五、总结
RAG 技术是当前 AI 应用开发的核心能力之一。通过本教程,你已经掌握了:
- RAG 核心原理:理解检索增强生成的工作机制
- 数据处理流程:从文档加载到文本分块的完整方案
- 向量数据库应用:使用 ChromaDB 构建语义检索系统
- 完整 Pipeline 实现:端到端的 RAG 系统代码
- 优化策略:提升检索和生成质量的方法
下一步可以探索的方向:高级分块策略、多模态 RAG、Agent 结合 RAG 等。掌握 RAG 技术,将让你在 AI 应用开发领域如虎添翼!
本文链接:https://www.kkkliao.cn/?id=942 转载需授权!
版权声明:本文由廖万里的博客发布,如需转载请注明出处。



手机流量卡
免费领卡
号卡合伙人
产品服务
关于本站
