Contents

使用LangChain与ChromaDB构建本地知识库RAG问答系统

前言

在大模型应用开发中,RAG(Retrieval-Augmented Generation,检索增强生成)是最核心的技术范式之一。它通过将外部知识库与大语言模型结合,解决了LLM的"幻觉"问题和知识时效性问题。

本文将从零开始,使用 LangChain 框架和 ChromaDB 向量数据库,构建一个完整的本地知识库问答系统。所有代码均可本地运行,无需依赖云端API。

为什么需要RAG?

直接使用大模型存在以下痛点:

  1. 知识截断:模型训练数据有截止日期,无法回答最新问题
  2. 幻觉问题:模型会"编造"不存在的信息
  3. 领域缺失:通用模型缺乏特定领域的专业知识
  4. 数据安全:企业内部数据不适合上传到第三方API

RAG的核心思路是:先检索,再生成。从知识库中检索相关文档片段,将其作为上下文注入Prompt,让大模型基于真实数据进行回答。

技术架构

1
2
3
4
5
6
7
用户提问
[文档加载] → [文本分割] → [向量化] → [ChromaDB存储]
用户提问 → [Embedding] → [向量检索] → [Top-K相关片段]
                           [Prompt组装] → [LLM生成回答]

环境准备

安装依赖

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# 创建虚拟环境
python -m venv rag_env
source rag_env/bin/activate  # Linux/Mac
# rag_env\Scripts\activate  # Windows

# 安装核心依赖
pip install langchain langchain-community langchain-chroma chromadb
pip install sentence-transformers  # 本地Embedding模型
pip install unstructured  # 文档加载器
pip install tiktoken  # Token计算

项目结构

1
2
3
4
5
6
7
8
9
rag_project/
├── knowledge_base/       # 知识库文档目录
│   ├── doc1.txt
│   ├── doc2.md
│   └── doc3.pdf
├── vector_db/           # ChromaDB持久化目录
├── rag_app.py           # 主程序
├── config.py            # 配置文件
└── requirements.txt

核心代码实现

1. 配置文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# config.py
import os

class Config:
    # 文档目录
    KNOWLEDGE_DIR = os.path.join(os.path.dirname(__file__), "knowledge_base")
    
    # ChromaDB持久化路径
    CHROMA_PERSIST_DIR = os.path.join(os.path.dirname(__file__), "vector_db")
    
    # 向量数据库集合名称
    COLLECTION_NAME = "knowledge_base"
    
    # Embedding模型(使用本地模型,无需API Key)
    EMBEDDING_MODEL = "shibing624/text2vec-base-chinese"
    
    # 文本分割参数
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 50
    
    # 检索参数
    TOP_K = 3
    
    # LLM配置(使用本地Ollama)
    LLM_MODEL = "qwen2.5:7b"
    LLM_BASE_URL = "http://localhost:11434"

2. 文档加载与分割

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# document_processor.py
from langchain_community.document_loaders import (
    TextLoader,
    DirectoryLoader,
    UnstructuredMarkdownLoader,
    PyPDFLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from config import Config


class DocumentProcessor:
    """文档处理器:负责加载和分割文档"""
    
    def __init__(self):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=Config.CHUNK_SIZE,
            chunk_overlap=Config.CHUNK_OVERLAP,
            length_function=len,
            separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?", " "],
        )
    
    def load_documents(self, directory: str = None):
        """加载目录下所有支持的文档"""
        directory = directory or Config.KNOWLEDGE_DIR
        all_docs = []
        
        # 加载TXT文件
        try:
            txt_loader = DirectoryLoader(
                directory,
                glob="**/*.txt",
                loader_cls=TextLoader,
                loader_kwargs={"encoding": "utf-8"},
            )
            all_docs.extend(txt_loader.load())
        except Exception as e:
            print(f"[WARN] 加载TXT文件失败: {e}")
        
        # 加载Markdown文件
        try:
            md_loader = DirectoryLoader(
                directory,
                glob="**/*.md",
                loader_cls=UnstructuredMarkdownLoader,
            )
            all_docs.extend(md_loader.load())
        except Exception as e:
            print(f"[WARN] 加载Markdown文件失败: {e}")
        
        # 加载PDF文件
        try:
            pdf_loader = DirectoryLoader(
                directory,
                glob="**/*.pdf",
                loader_cls=PyPDFLoader,
            )
            all_docs.extend(pdf_loader.load())
        except Exception as e:
            print(f"[WARN] 加载PDF文件失败: {e}")
        
        print(f"[INFO] 共加载 {len(all_docs)} 个文档片段")
        return all_docs
    
    def split_documents(self, documents):
        """将文档分割为小块"""
        chunks = self.text_splitter.split_documents(documents)
        print(f"[INFO] 分割后共 {len(chunks)} 个文本块")
        return chunks

3. 向量数据库管理

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# vector_store.py
from langchain_chroma import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from config import Config


class VectorStoreManager:
    """向量数据库管理器"""
    
    def __init__(self):
        # 初始化本地Embedding模型
        print("[INFO] 加载Embedding模型...")
        self.embeddings = HuggingFaceEmbeddings(
            model_name=Config.EMBEDDING_MODEL,
            model_kwargs={"device": "cpu"},
            encode_kwargs={"normalize_embeddings": True},
        )
        print("[INFO] Embedding模型加载完成")
        
        self.vector_store = None
    
    def create_or_load(self, chunks=None):
        """创建新的向量库或加载已有的"""
        if chunks:
            # 创建新的向量数据库
            self.vector_store = Chroma.from_documents(
                documents=chunks,
                embedding=self.embeddings,
                collection_name=Config.COLLECTION_NAME,
                persist_directory=Config.CHROMA_PERSIST_DIR,
            )
            print(f"[INFO] 向量数据库创建完成,共 {self.vector_store._collection.count()} 条记录")
        else:
            # 加载已有的向量数据库
            self.vector_store = Chroma(
                collection_name=Config.COLLECTION_NAME,
                embedding_function=self.embeddings,
                persist_directory=Config.CHROMA_PERSIST_DIR,
            )
            count = self.vector_store._collection.count()
            print(f"[INFO] 加载已有向量数据库,共 {count} 条记录")
        
        return self.vector_store
    
    def add_documents(self, chunks):
        """向已有数据库添加新文档"""
        if not self.vector_store:
            raise ValueError("请先调用 create_or_load 初始化数据库")
        
        self.vector_store.add_documents(chunks)
        print(f"[INFO] 新增 {len(chunks)} 条记录")
    
    def search(self, query: str, top_k: int = None):
        """相似度检索"""
        top_k = top_k or Config.TOP_K
        results = self.vector_store.similarity_search_with_score(query, k=top_k)
        
        print(f"\n[INFO] 检索 '{query}' 的 Top-{top_k} 结果:")
        for i, (doc, score) in enumerate(results):
            print(f"  {i+1}. [相似度: {score:.4f}] {doc.page_content[:80]}...")
        
        return results

4. RAG问答引擎

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# rag_engine.py
from langchain_community.llms import Ollama
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
from config import Config


# RAG提示词模板
RAG_PROMPT_TEMPLATE = """你是一个专业的技术助手。请根据以下提供的参考资料来回答用户的问题。

规则:
1. 只基于提供的参考资料回答问题
2. 如果参考资料中没有相关信息,请明确告知用户
3. 回答要准确、简洁、有条理
4. 在回答末尾标注信息来源

参考资料:
{context}

用户问题:{question}

请回答:"""


class RAGEngine:
    """RAG问答引擎"""
    
    def __init__(self, vector_store):
        self.vector_store = vector_store
        
        # 初始化本地大模型(使用Ollama)
        print("[INFO] 加载大模型...")
        self.llm = Ollama(
            model=Config.LLM_MODEL,
            base_url=Config.LLM_BASE_URL,
            temperature=0.3,
        )
        print("[INFO] 大模型加载完成")
        
        # 构建RAG链
        self.rag_chain = self._build_chain()
    
    def _build_chain(self):
        """构建RAG处理链"""
        prompt = ChatPromptTemplate.from_template(RAG_PROMPT_TEMPLATE)
        
        # 获取向量数据库的检索器
        retriever = self.vector_store.as_retriever(
            search_type="similarity",
            search_kwargs={"k": Config.TOP_K},
        )
        
        def format_docs(docs):
            """格式化检索到的文档"""
            formatted = []
            for i, doc in enumerate(docs):
                source = doc.metadata.get("source", "未知来源")
                formatted.append(f"[文档{i+1}] (来源: {source})\n{doc.page_content}")
            return "\n\n".join(formatted)
        
        # 组装完整的RAG链
        rag_chain = (
            {
                "context": retriever | format_docs,
                "question": RunnablePassthrough(),
            }
            | prompt
            | self.llm
            | StrOutputParser()
        )
        
        return rag_chain
    
    def query(self, question: str) -> str:
        """执行RAG问答"""
        print(f"\n{'='*60}")
        print(f"问题: {question}")
        print(f"{'='*60}")
        
        answer = self.rag_chain.invoke(question)
        return answer
    
    def query_with_sources(self, question: str):
        """带来源的RAG问答"""
        # 检索相关文档
        docs = self.vector_store.similarity_search(question, k=Config.TOP_K)
        
        # 生成回答
        answer = self.query(question)
        
        return {
            "question": question,
            "answer": answer,
            "sources": [
                {"content": doc.page_content[:200], "source": doc.metadata.get("source", "")}
                for doc in docs
            ],
        }

5. 主程序入口

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
# rag_app.py
import sys
from document_processor import DocumentProcessor
from vector_store import VectorStoreManager
from rag_engine import RAGEngine
from config import Config


def build_knowledge_base():
    """构建知识库(首次运行时执行)"""
    print("=" * 60)
    print("  知识库构建流程")
    print("=" * 60)
    
    # Step 1: 加载文档
    processor = DocumentProcessor()
    documents = processor.load_documents()
    
    if not documents:
        print("[ERROR] 未找到任何文档,请检查 knowledge_base 目录")
        return None
    
    # Step 2: 文本分割
    chunks = processor.split_documents(documents)
    
    # Step 3: 向量化并存储
    store_manager = VectorStoreManager()
    vector_store = store_manager.create_or_load(chunks=chunks)
    
    print(f"\n[SUCCESS] 知识库构建完成!")
    return vector_store


def interactive_qa(vector_store):
    """交互式问答"""
    print("=" * 60)
    print("  本地知识库问答系统")
    print(f"  模型: {Config.LLM_MODEL}")
    print(f"  知识库: {Config.COLLECTION_NAME}")
    print("  输入 'quit' 退出,'search:' 前缀可只检索不生成")
    print("=" * 60)
    
    engine = RAGEngine(vector_store)
    
    while True:
        try:
            question = input("\n🔍 你的问题: ").strip()
            
            if question.lower() in ("quit", "exit", "q"):
                print("👋 再见!")
                break
            
            if not question:
                continue
            
            # 纯检索模式
            if question.startswith("search:"):
                query = question[7:].strip()
                vector_store_manager = VectorStoreManager()
                vector_store_manager.vector_store = vector_store
                vector_store_manager.search(query)
                continue
            
            # RAG问答
            result = engine.query_with_sources(question)
            
            print(f"\n💡 回答:\n{result['answer']}")
            
            print(f"\n📚 参考来源:")
            for src in result["sources"]:
                print(f"  - [{src['source']}] {src['content'][:100]}...")
        
        except KeyboardInterrupt:
            print("\n👋 再见!")
            break
        except Exception as e:
            print(f"[ERROR] 处理失败: {e}")


def main():
    import argparse
    parser = argparse.ArgumentParser(description="本地知识库RAG问答系统")
    parser.add_argument("--build", action="store_true", help="构建/更新知识库")
    parser.add_argument("--chat", action="store_true", help="进入问答交互")
    args = parser.parse_args()
    
    if args.build:
        vector_store = build_knowledge_base()
        if vector_store and args.chat:
            interactive_qa(vector_store)
    elif args.chat:
        # 直接加载已有知识库
        store_manager = VectorStoreManager()
        vector_store = store_manager.create_or_load()
        interactive_qa(vector_store)
    else:
        print("用法:")
        print("  python rag_app.py --build          # 构建知识库")
        print("  python rag_app.py --build --chat   # 构建后进入问答")
        print("  python rag_app.py --chat           # 直接进入问答")


if __name__ == "__main__":
    main()

进阶优化

1. 混合检索策略

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# hybrid_search.py
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever


class HybridRetriever:
    """混合检索:向量检索 + 关键词检索"""
    
    def __init__(self, vector_store, chunks):
        # 向量检索器
        self.vector_retriever = vector_store.as_retriever(
            search_kwargs={"k": 5}
        )
        
        # BM25关键词检索器
        self.bm25_retriever = BM25Retriever.from_documents(chunks)
        self.bm25_retriever.k = 5
        
        # 混合检索:各占50%权重
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[self.vector_retriever, self.bm25_retriever],
            weights=[0.6, 0.4],
        )
    
    def search(self, query: str, k: int = 3):
        """混合检索"""
        results = self.ensemble_retriever.invoke(query)
        return results[:k]

2. 查询重写

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# query_rewriter.py
REWRITE_PROMPT = """请将以下用户问题改写为更适合检索的形式。
要求:
1. 补充可能缺失的关键信息
2. 使用更精确的技术术语
3. 保持问题的核心意图

原始问题:{question}
改写后的问题:"""


class QueryRewriter:
    """查询重写:提升检索效果"""
    
    def __init__(self, llm):
        self.llm = llm
    
    def rewrite(self, question: str) -> str:
        """重写用户查询"""
        prompt = REWRITE_PROMPT.format(question=question)
        rewritten = self.llm.invoke(prompt)
        print(f"[INFO] 原始问题: {question}")
        print(f"[INFO] 重写问题: {rewritten}")
        return rewritten

3. 流式输出支持

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
# stream_response.py
def stream_query(engine, question: str):
    """流式输出RAG回答"""
    retriever = engine.vector_store.as_retriever(search_kwargs={"k": 3})
    docs = retriever.invoke(question)
    context = "\n\n".join([doc.page_content for doc in docs])
    
    prompt = RAG_PROMPT_TEMPLATE.format(context=context, question=question)
    
    # 流式输出
    print("\n💡 回答: ", end="", flush=True)
    for chunk in engine.llm.stream(prompt):
        print(chunk, end="", flush=True)
    print()

性能优化建议

优化方向 具体措施 效果
Embedding加速 使用GPU + ONNX Runtime 推理速度提升5-10倍
向量检索优化 使用HNSW索引参数调优 检索延迟降低50%
缓存策略 对常见问题做语义缓存 减少重复计算
分块策略 根据文档类型调整chunk_size 提升检索精度
模型量化 使用GGUF量化模型 内存占用降低60%

常见问题排查

Q: ChromaDB报错 disk I/O error

1
2
3
# 清除损坏的数据库重新构建
rm -rf vector_db/
python rag_app.py --build

Q: Embedding模型下载慢

1
2
3
# 使用镜像源
import os
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

Q: 内存不足(OOM)

1
2
3
4
5
6
# 减小batch_size并启用内存映射
embeddings = HuggingFaceEmbeddings(
    model_name="shibing624/text2vec-base-chinese",
    model_kwargs={"device": "cpu"},
    encode_kwargs={"normalize_embeddings": True, "batch_size": 8},
)

总结

本文介绍了如何使用LangChain + ChromaDB构建一个完整的本地RAG问答系统。核心流程是:文档加载 → 文本分割 → 向量化存储 → 相似度检索 → LLM生成回答

在实际项目中,还需要关注:

  • 数据质量:清洗文档、去除噪音,直接影响回答质量
  • 检索精度:通过混合检索、查询重写、重排序等手段优化
  • Prompt工程:精心设计提示词,约束模型输出格式
  • 评估体系:使用RAGAS等工具量化评估系统效果

RAG是当前大模型落地最实用的技术路径之一,掌握它是构建企业级AI应用的基础能力。