当前位置:首页 > AI > 正文内容

RAG 知识库搭建实战完全指南:从零构建企业级智能问答系统

廖万里14小时前AI1

RAG(检索增强生成)是 2026 年最热门的 AI 技术之一,它将外部知识库与大语言模型结合,让 AI 能够基于私有数据生成准确、可靠的回答。本教程从零开始,手把手教你搭建企业级 RAG 知识库系统。

一、RAG 核心概念与工作原理

RAG(Retrieval-Augmented Generation,检索增强生成)是一种将外部知识检索大语言模型生成相结合的技术架构。它有效解决了大模型知识时效性差、容易产生"幻觉"以及无法处理私有数据等核心痛点。

1.1 为什么需要 RAG?

传统大语言模型存在以下局限性:

  • 知识时效性差:模型训练完成后,无法获取新知识
  • 幻觉问题:面对不熟悉的问题,模型可能编造虚假信息
  • 私有数据盲区:无法访问企业内部文档、知识库等私有数据
  • 不可追溯:无法提供答案的信息来源

RAG 通过引入外部知识库,让模型在生成答案前先"查阅资料",从而产生更准确、可追溯的回答。

1.2 RAG 系统的核心组件

一个完整的 RAG 系统包含三个核心组件:

  1. 知识库(Knowledge Base):存储外部知识的向量数据库,支持语义级别的相似度检索
  2. 检索器(Retriever):根据用户查询,从知识库中召回最相关的文档片段
  3. 生成器(Generator):大语言模型,基于检索结果生成最终答案

1.3 RAG 工作流程

# RAG 工作流程示意图
用户提问 → 向量化 → 检索知识库 → 召回Top-K文档 → 构建增强Prompt → LLM生成答案

# 具体步骤:
# 1. 用户输入问题:"如何配置Docker容器?"
# 2. 将问题转换为向量嵌入
# 3. 在向量数据库中检索最相关的文档片段
# 4. 将检索结果与原问题组合成增强提示词
# 5. LLM 基于增强上下文生成准确答案

二、搭建 RAG 知识库的完整流程

2.1 数据收集与预处理

高质量的知识库是 RAG 系统的基础。数据收集需要注意:

import os
from pathlib import Path

class DataLoader:
    """多格式文档加载器"""
    
    def __init__(self, data_dir: str):
        self.data_dir = Path(data_dir)
        self.supported_formats = [".txt", ".pdf", ".md", ".docx"]
    
    def load_documents(self):
        """加载目录下所有支持的文档"""
        documents = []
        
        for file_path in self.data_dir.rglob("*"):
            if file_path.suffix.lower() in self.supported_formats:
                try:
                    content = self._read_file(file_path)
                    documents.append({
                        "content": content,
                        "source": str(file_path),
                        "metadata": {
                            "filename": file_path.name,
                            "format": file_path.suffix
                        }
                    })
                except Exception as e:
                    print(f"加载文件失败: {file_path}, 错误: {e}")
        
        return documents
    
    def _read_file(self, file_path: Path) -> str:
        """根据文件类型读取内容"""
        if file_path.suffix == ".txt" or file_path.suffix == ".md":
            return file_path.read_text(encoding="utf-8")
        elif file_path.suffix == ".pdf":
            # 使用 PyPDF2 或 pdfplumber 解析 PDF
            import pdfplumber
            with pdfplumber.open(file_path) as pdf:
                return "\n".join([page.extract_text() or "" for page in pdf.pages])
        # 其他格式的处理...
        return ""

# 使用示例
loader = DataLoader("./knowledge_base")
docs = loader.load_documents()
print(f"成功加载 {len(docs)} 个文档")

2.2 文本分块策略

由于大模型的上下文窗口限制,需要将长文档分割成适当大小的块。分块策略直接影响检索效果。

from typing import List
import re

class TextChunker:
    """智能文本分块器"""
    
    def __init__(
        self,
        chunk_size: int = 500,      # 每块最大字符数
        chunk_overlap: int = 50,    # 块之间的重叠字符数
        separators: List[str] = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # 按优先级尝试的分隔符
        self.separators = separators or ["\n\n", "\n", "。", "!", "?", " "]
    
    def split_text(self, text: str, metadata: dict = None) -> List[dict]:
        """将文本分割成多个块"""
        chunks = []
        
        # 尝试按段落分割
        paragraphs = text.split("\n\n")
        
        current_chunk = ""
        current_length = 0
        
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            
            # 如果单个段落超过块大小,需要进一步分割
            if len(para) > self.chunk_size:
                # 先保存当前块
                if current_chunk:
                    chunks.append(self._create_chunk(current_chunk, metadata))
                    current_chunk = ""
                    current_length = 0
                
                # 按句子分割长段落
                sentences = self._split_by_sentences(para)
                for sentence in sentences:
                    if len(sentence) <= self.chunk_size:
                        chunks.append(self._create_chunk(sentence, metadata))
                    else:
                        # 如果句子仍然太长,按字符截断
                        for i in range(0, len(sentence), self.chunk_size - self.chunk_overlap):
                            chunk_text = sentence[i:i + self.chunk_size]
                            chunks.append(self._create_chunk(chunk_text, metadata))
            else:
                # 检查是否需要创建新块
                if current_length + len(para) > self.chunk_size:
                    if current_chunk:
                        chunks.append(self._create_chunk(current_chunk, metadata))
                    # 保留重叠部分
                    overlap_text = current_chunk[-self.chunk_overlap:] if current_chunk else ""
                    current_chunk = overlap_text + para
                    current_length = len(current_chunk)
                else:
                    current_chunk += "\n\n" + para if current_chunk else para
                    current_length = len(current_chunk)
        
        # 保存最后一块
        if current_chunk:
            chunks.append(self._create_chunk(current_chunk, metadata))
        
        return chunks
    
    def _split_by_sentences(self, text: str) -> List[str]:
        """按句子分割文本"""
        # 使用正则匹配中英文句子结束符
        sentence_pattern = r"[。!?.!?]+"
        sentences = re.split(sentence_pattern, text)
        return [s.strip() for s in sentences if s.strip()]
    
    def _create_chunk(self, text: str, metadata: dict = None) -> dict:
        """创建文档块对象"""
        chunk = {
            "content": text.strip(),
            "char_count": len(text.strip())
        }
        if metadata:
            chunk["metadata"] = metadata.copy()
        return chunk

# 使用示例
chunker = TextChunker(chunk_size=500, chunk_overlap=50)
text_chunks = chunker.split_text(docs[0]["content"], docs[0]["metadata"])
print(f"文档被分割成 {len(text_chunks)} 个块")

2.3 向量嵌入生成

将文本块转换为向量表示,这是语义检索的基础。推荐使用 OpenAI 的 text-embedding-3 或本地的 bge-m3 模型。

import numpy as np
from typing import List
import requests

class EmbeddingGenerator:
    """向量嵌入生成器"""
    
    def __init__(self, model: str = "text-embedding-3-small", api_key: str = None):
        self.model = model
        self.api_key = api_key
        self.api_url = "https://api.openai.com/v1/embeddings"
    
    def generate(self, texts: List[str]) -> np.ndarray:
        """批量生成文本向量"""
        # 调用 OpenAI API 生成嵌入
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        response = requests.post(
            self.api_url,
            headers=headers,
            json={
                "model": self.model,
                "input": texts,
                "encoding_format": "float"
            }
        )
        
        if response.status_code != 200:
            raise Exception(f"API 调用失败: {response.text}")
        
        # 提取向量并按原始顺序排列
        embeddings_data = response.json()["data"]
        embeddings = sorted(embeddings_data, key=lambda x: x["index"])
        
        return np.array([e["embedding"] for e in embeddings])
    
    def generate_single(self, text: str) -> np.ndarray:
        """生成单个文本的向量"""
        return self.generate([text])[0]

# 使用本地模型的开源替代方案
class LocalEmbeddingGenerator:
    """本地嵌入模型(使用 sentence-transformers)"""
    
    def __init__(self, model_name: str = "BAAI/bge-m3"):
        from sentence_transformers import SentenceTransformer
        self.model = SentenceTransformer(model_name)
    
    def generate(self, texts: List[str]) -> np.ndarray:
        """批量生成文本向量"""
        embeddings = self.model.encode(texts, normalize_embeddings=True)
        return embeddings
    
    def generate_single(self, text: str) -> np.ndarray:
        """生成单个文本的向量"""
        return self.model.encode([text], normalize_embeddings=True)[0]

# 使用示例
# embedder = EmbeddingGenerator(api_key="your-api-key")
embedder = LocalEmbeddingGenerator(model_name="BAAI/bge-m3")
vectors = embedder.generate([chunk["content"] for chunk in text_chunks])
print(f"生成了 {len(vectors)} 个向量,维度: {vectors[0].shape}")

2.4 向量数据库存储

选择合适的向量数据库至关重要。主流选择包括 Milvus、Pinecone、Chroma、FAISS 等。

from typing import List, Optional
import chromadb
from chromadb.config import Settings

class VectorStore:
    """向量数据库管理器(基于 ChromaDB)"""
    
    def __init__(self, persist_dir: str = "./chroma_db", collection_name: str = "knowledge_base"):
        # 初始化 ChromaDB 客户端
        self.client = chromadb.Client(Settings(
            chroma_db_impl="duckdb+parquet",
            persist_directory=persist_dir
        ))
        
        # 获取或创建集合
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}  # 使用余弦相似度
        )
        
        self.embedder = LocalEmbeddingGenerator()
    
    def add_documents(self, chunks: List[dict], batch_size: int = 100):
        """批量添加文档块到向量库"""
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i + batch_size]
            
            # 生成向量
            texts = [chunk["content"] for chunk in batch]
            embeddings = self.embedder.generate(texts)
            
            # 生成唯一 ID
            ids = [f"chunk_{i + j}" for j in range(len(batch))]
            
            # 准备元数据
            metadatas = [chunk.get("metadata", {}) for chunk in batch]
            
            # 添加到集合
            self.collection.add(
                ids=ids,
                embeddings=embeddings.tolist(),
                documents=texts,
                metadatas=metadatas
            )
            
            print(f"已添加 {i + len(batch)}/{len(chunks)} 个文档块")
    
    def search(self, query: str, top_k: int = 5) -> List[dict]:
        """语义相似度搜索"""
        # 生成查询向量
        query_embedding = self.embedder.generate_single(query)
        
        # 执行搜索
        results = self.collection.query(
            query_embeddings=[query_embedding.tolist()],
            n_results=top_k,
            include=["documents", "metadatas", "distances"]
        )
        
        # 格式化结果
        search_results = []
        for i in range(len(results["ids"][0])):
            search_results.append({
                "content": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                "score": 1 - results["distances"][0][i]  # 转换为相似度分数
            })
        
        return search_results
    
    def delete_collection(self):
        """删除整个集合"""
        self.client.delete_collection(self.collection.name)

# 使用示例
vector_store = VectorStore("./kb_chroma", "tech_docs")
vector_store.add_documents(text_chunks)

# 测试搜索
results = vector_store.search("如何配置 Docker 容器?", top_k=3)
for i, result in enumerate(results):
    print(f"\n--- 结果 {i+1} (相似度: {result[\"score\"]:.3f}) ---")
    print(result["content"][:200] + "...")

三、RAG 完整系统实现

3.1 构建 RAG Pipeline

from typing import List, Optional
import openai

class RAGPipeline:
    """完整的 RAG 管道"""
    
    def __init__(
        self,
        vector_store: VectorStore,
        llm_model: str = "gpt-4o-mini",
        api_key: str = None,
        top_k: int = 5
    ):
        self.vector_store = vector_store
        self.llm_model = llm_model
        self.top_k = top_k
        self.client = openai.OpenAI(api_key=api_key)
    
    def query(
        self,
        question: str,
        system_prompt: str = None,
        temperature: float = 0.7
    ) -> dict:
        """执行 RAG 查询"""
        
        # 1. 检索相关文档
        search_results = self.vector_store.search(question, top_k=self.top_k)
        
        if not search_results:
            return {
                "answer": "抱歉,我在知识库中没有找到相关信息。",
                "sources": []
            }
        
        # 2. 构建增强提示词
        context = self._build_context(search_results)
        
        default_prompt = """你是一个专业的知识库助手。请基于提供的上下文信息回答用户问题。

要求:
1. 只使用上下文中的信息回答,不要编造内容
2. 如果上下文中没有相关信息,请明确告知用户
3. 回答要简洁、准确、有条理
4. 如果引用具体内容,请标注来源编号 [1]、[2] 等

上下文信息:
{context}

用户问题:{question}

请给出你的回答:"""
        
        prompt = system_prompt or default_prompt
        full_prompt = prompt.format(context=context, question=question)
        
        # 3. 调用 LLM 生成答案
        response = self.client.chat.completions.create(
            model=self.llm_model,
            messages=[
                {"role": "system", "content": "你是一个专业的知识库助手。"},
                {"role": "user", "content": full_prompt}
            ],
            temperature=temperature
        )
        
        answer = response.choices[0].message.content
        
        # 4. 返回结果
        return {
            "answer": answer,
            "sources": [
                {
                    "content": r["content"][:300] + "...",
                    "score": r["score"],
                    "metadata": r["metadata"]
                }
                for r in search_results
            ],
            "model": self.llm_model
        }
    
    def _build_context(self, search_results: List[dict]) -> str:
        """构建上下文字符串"""
        context_parts = []
        for i, result in enumerate(search_results, 1):
            context_parts.append(f"[文档 {i}]\n{result[\"content\"]}\n")
        return "\n".join(context_parts)

# 使用示例
rag = RAGPipeline(
    vector_store=vector_store,
    llm_model="gpt-4o-mini",
    api_key="your-openai-api-key",
    top_k=5
)

# 执行查询
result = rag.query("如何优化向量检索的性能?")
print("回答:", result["answer"])
print("\n参考来源:")
for i, source in enumerate(result["sources"], 1):
    print(f"  [{i}] 相似度: {source[\"score\"]:.3f}")

3.2 完整示例:构建技术文档知识库

# main.py - 完整的 RAG 知识库系统

from pathlib import Path

def build_knowledge_base(data_dir: str, persist_dir: str):
    """构建知识库的完整流程"""
    
    print("=" * 50)
    print("开始构建 RAG 知识库")
    print("=" * 50)
    
    # 1. 加载文档
    print("\n[步骤 1] 加载文档...")
    loader = DataLoader(data_dir)
    documents = loader.load_documents()
    print(f"✓ 成功加载 {len(documents)} 个文档")
    
    # 2. 文本分块
    print("\n[步骤 2] 文本分块...")
    chunker = TextChunker(chunk_size=500, chunk_overlap=50)
    all_chunks = []
    for doc in documents:
        chunks = chunker.split_text(doc["content"], doc["metadata"])
        all_chunks.extend(chunks)
    print(f"✓ 生成了 {len(all_chunks)} 个文本块")
    
    # 3. 创建向量存储
    print("\n[步骤 3] 创建向量存储...")
    vector_store = VectorStore(persist_dir, "kb_collection")
    vector_store.add_documents(all_chunks)
    print(f"✓ 向量存储创建完成")
    
    return vector_store


def query_knowledge_base(vector_store: VectorStore, question: str):
    """查询知识库"""
    
    print("\n" + "=" * 50)
    print(f"查询: {question}")
    print("=" * 50)
    
    rag = RAGPipeline(vector_store)
    result = rag.query(question)
    
    print("\n【回答】")
    print(result["answer"])
    
    print("\n【参考来源】")
    for i, source in enumerate(result["sources"], 1):
        print(f"  [{i}] 相似度: {source[\"score\"]:.3f}")
    
    return result


# 主程序入口
if __name__ == "__main__":
    # 构建知识库
    kb = build_knowledge_base(
        data_dir="./docs",
        persist_dir="./kb_vector_db"
    )
    
    # 测试查询
    query_knowledge_base(kb, "什么是向量数据库?它有哪些优势?")

四、RAG 系统优化策略

4.1 提升检索质量

  • 混合检索:结合向量检索与关键词检索,提高召回率
  • 重排序:使用 Cross-Encoder 对检索结果进行精细排序
  • 查询扩展:将用户查询改写为多个相关查询,提高覆盖率
  • 元数据过滤:利用文档元数据缩小检索范围

4.2 优化生成质量

  • Prompt 工程:设计高质量的提示词模板,引导模型输出
  • 上下文窗口管理:动态调整检索文档数量,避免超出限制
  • 引用追踪:让模型标注答案的信息来源,提高可信度

4.3 系统性能优化

# 使用缓存提升响应速度
from functools import lru_cache
import hashlib

class CachedRAGPipeline(RAGPipeline):
    """带缓存的 RAG 管道"""
    
    def __init__(self, *args, cache_size: int = 1000, **kwargs):
        super().__init__(*args, **kwargs)
        self.query_cache = {}
        self.cache_size = cache_size
    
    def _get_cache_key(self, question: str) -> str:
        """生成缓存键"""
        return hashlib.md5(question.encode()).hexdigest()
    
    def query(self, question: str, **kwargs) -> dict:
        """带缓存的查询"""
        cache_key = self._get_cache_key(question)
        
        # 检查缓存
        if cache_key in self.query_cache:
            print("✓ 命中缓存")
            return self.query_cache[cache_key]
        
        # 执行查询
        result = super().query(question, **kwargs)
        
        # 缓存结果
        if len(self.query_cache) >= self.cache_size:
            # 简单的 LRU:删除最早的缓存
            oldest_key = next(iter(self.query_cache))
            del self.query_cache[oldest_key]
        
        self.query_cache[cache_key] = result
        return result

五、总结

RAG 技术是当前 AI 应用开发的核心能力之一。通过本教程,你已经掌握了:

  1. RAG 核心原理:理解检索增强生成的工作机制
  2. 数据处理流程:从文档加载到文本分块的完整方案
  3. 向量数据库应用:使用 ChromaDB 构建语义检索系统
  4. 完整 Pipeline 实现:端到端的 RAG 系统代码
  5. 优化策略:提升检索和生成质量的方法

下一步可以探索的方向:高级分块策略、多模态 RAG、Agent 结合 RAG 等。掌握 RAG 技术,将让你在 AI 应用开发领域如虎添翼!

本文链接:https://www.kkkliao.cn/?id=942 转载需授权!

分享到:

版权声明:本文由廖万里的博客发布,如需转载请注明出处。


发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。