前言
在大模型应用开发中,RAG(Retrieval-Augmented Generation,检索增强生成)是最核心的技术范式之一。它通过将外部知识库与大语言模型结合,解决了LLM的"幻觉"问题和知识时效性问题。
本文将从零开始,使用 LangChain 框架和 ChromaDB 向量数据库,构建一个完整的本地知识库问答系统。所有代码均可本地运行,无需依赖云端API。
为什么需要RAG?
直接使用大模型存在以下痛点:
- 知识截断:模型训练数据有截止日期,无法回答最新问题
- 幻觉问题:模型会"编造"不存在的信息
- 领域缺失:通用模型缺乏特定领域的专业知识
- 数据安全:企业内部数据不适合上传到第三方API
RAG的核心思路是:先检索,再生成。从知识库中检索相关文档片段,将其作为上下文注入Prompt,让大模型基于真实数据进行回答。
技术架构
1
2
3
4
5
6
7
|
用户提问
↓
[文档加载] → [文本分割] → [向量化] → [ChromaDB存储]
↓
用户提问 → [Embedding] → [向量检索] → [Top-K相关片段]
↓
[Prompt组装] → [LLM生成回答]
|
环境准备
安装依赖
1
2
3
4
5
6
7
8
9
10
|
# 创建虚拟环境
python -m venv rag_env
source rag_env/bin/activate # Linux/Mac
# rag_env\Scripts\activate # Windows
# 安装核心依赖
pip install langchain langchain-community langchain-chroma chromadb
pip install sentence-transformers # 本地Embedding模型
pip install unstructured # 文档加载器
pip install tiktoken # Token计算
|
项目结构
1
2
3
4
5
6
7
8
9
|
rag_project/
├── knowledge_base/ # 知识库文档目录
│ ├── doc1.txt
│ ├── doc2.md
│ └── doc3.pdf
├── vector_db/ # ChromaDB持久化目录
├── rag_app.py # 主程序
├── config.py # 配置文件
└── requirements.txt
|
核心代码实现
1. 配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# config.py
import os
class Config:
# 文档目录
KNOWLEDGE_DIR = os.path.join(os.path.dirname(__file__), "knowledge_base")
# ChromaDB持久化路径
CHROMA_PERSIST_DIR = os.path.join(os.path.dirname(__file__), "vector_db")
# 向量数据库集合名称
COLLECTION_NAME = "knowledge_base"
# Embedding模型(使用本地模型,无需API Key)
EMBEDDING_MODEL = "shibing624/text2vec-base-chinese"
# 文本分割参数
CHUNK_SIZE = 500
CHUNK_OVERLAP = 50
# 检索参数
TOP_K = 3
# LLM配置(使用本地Ollama)
LLM_MODEL = "qwen2.5:7b"
LLM_BASE_URL = "http://localhost:11434"
|
2. 文档加载与分割
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# document_processor.py
from langchain_community.document_loaders import (
TextLoader,
DirectoryLoader,
UnstructuredMarkdownLoader,
PyPDFLoader,
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from config import Config
class DocumentProcessor:
"""文档处理器:负责加载和分割文档"""
def __init__(self):
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=Config.CHUNK_SIZE,
chunk_overlap=Config.CHUNK_OVERLAP,
length_function=len,
separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?", " "],
)
def load_documents(self, directory: str = None):
"""加载目录下所有支持的文档"""
directory = directory or Config.KNOWLEDGE_DIR
all_docs = []
# 加载TXT文件
try:
txt_loader = DirectoryLoader(
directory,
glob="**/*.txt",
loader_cls=TextLoader,
loader_kwargs={"encoding": "utf-8"},
)
all_docs.extend(txt_loader.load())
except Exception as e:
print(f"[WARN] 加载TXT文件失败: {e}")
# 加载Markdown文件
try:
md_loader = DirectoryLoader(
directory,
glob="**/*.md",
loader_cls=UnstructuredMarkdownLoader,
)
all_docs.extend(md_loader.load())
except Exception as e:
print(f"[WARN] 加载Markdown文件失败: {e}")
# 加载PDF文件
try:
pdf_loader = DirectoryLoader(
directory,
glob="**/*.pdf",
loader_cls=PyPDFLoader,
)
all_docs.extend(pdf_loader.load())
except Exception as e:
print(f"[WARN] 加载PDF文件失败: {e}")
print(f"[INFO] 共加载 {len(all_docs)} 个文档片段")
return all_docs
def split_documents(self, documents):
"""将文档分割为小块"""
chunks = self.text_splitter.split_documents(documents)
print(f"[INFO] 分割后共 {len(chunks)} 个文本块")
return chunks
|
3. 向量数据库管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# vector_store.py
from langchain_chroma import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from config import Config
class VectorStoreManager:
"""向量数据库管理器"""
def __init__(self):
# 初始化本地Embedding模型
print("[INFO] 加载Embedding模型...")
self.embeddings = HuggingFaceEmbeddings(
model_name=Config.EMBEDDING_MODEL,
model_kwargs={"device": "cpu"},
encode_kwargs={"normalize_embeddings": True},
)
print("[INFO] Embedding模型加载完成")
self.vector_store = None
def create_or_load(self, chunks=None):
"""创建新的向量库或加载已有的"""
if chunks:
# 创建新的向量数据库
self.vector_store = Chroma.from_documents(
documents=chunks,
embedding=self.embeddings,
collection_name=Config.COLLECTION_NAME,
persist_directory=Config.CHROMA_PERSIST_DIR,
)
print(f"[INFO] 向量数据库创建完成,共 {self.vector_store._collection.count()} 条记录")
else:
# 加载已有的向量数据库
self.vector_store = Chroma(
collection_name=Config.COLLECTION_NAME,
embedding_function=self.embeddings,
persist_directory=Config.CHROMA_PERSIST_DIR,
)
count = self.vector_store._collection.count()
print(f"[INFO] 加载已有向量数据库,共 {count} 条记录")
return self.vector_store
def add_documents(self, chunks):
"""向已有数据库添加新文档"""
if not self.vector_store:
raise ValueError("请先调用 create_or_load 初始化数据库")
self.vector_store.add_documents(chunks)
print(f"[INFO] 新增 {len(chunks)} 条记录")
def search(self, query: str, top_k: int = None):
"""相似度检索"""
top_k = top_k or Config.TOP_K
results = self.vector_store.similarity_search_with_score(query, k=top_k)
print(f"\n[INFO] 检索 '{query}' 的 Top-{top_k} 结果:")
for i, (doc, score) in enumerate(results):
print(f" {i+1}. [相似度: {score:.4f}] {doc.page_content[:80]}...")
return results
|
4. RAG问答引擎
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# rag_engine.py
from langchain_community.llms import Ollama
from langchain.prompts import ChatPromptTemplate
from langchain.schema.runnable import RunnablePassthrough
from langchain.schema.output_parser import StrOutputParser
from config import Config
# RAG提示词模板
RAG_PROMPT_TEMPLATE = """你是一个专业的技术助手。请根据以下提供的参考资料来回答用户的问题。
规则:
1. 只基于提供的参考资料回答问题
2. 如果参考资料中没有相关信息,请明确告知用户
3. 回答要准确、简洁、有条理
4. 在回答末尾标注信息来源
参考资料:
{context}
用户问题:{question}
请回答:"""
class RAGEngine:
"""RAG问答引擎"""
def __init__(self, vector_store):
self.vector_store = vector_store
# 初始化本地大模型(使用Ollama)
print("[INFO] 加载大模型...")
self.llm = Ollama(
model=Config.LLM_MODEL,
base_url=Config.LLM_BASE_URL,
temperature=0.3,
)
print("[INFO] 大模型加载完成")
# 构建RAG链
self.rag_chain = self._build_chain()
def _build_chain(self):
"""构建RAG处理链"""
prompt = ChatPromptTemplate.from_template(RAG_PROMPT_TEMPLATE)
# 获取向量数据库的检索器
retriever = self.vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": Config.TOP_K},
)
def format_docs(docs):
"""格式化检索到的文档"""
formatted = []
for i, doc in enumerate(docs):
source = doc.metadata.get("source", "未知来源")
formatted.append(f"[文档{i+1}] (来源: {source})\n{doc.page_content}")
return "\n\n".join(formatted)
# 组装完整的RAG链
rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough(),
}
| prompt
| self.llm
| StrOutputParser()
)
return rag_chain
def query(self, question: str) -> str:
"""执行RAG问答"""
print(f"\n{'='*60}")
print(f"问题: {question}")
print(f"{'='*60}")
answer = self.rag_chain.invoke(question)
return answer
def query_with_sources(self, question: str):
"""带来源的RAG问答"""
# 检索相关文档
docs = self.vector_store.similarity_search(question, k=Config.TOP_K)
# 生成回答
answer = self.query(question)
return {
"question": question,
"answer": answer,
"sources": [
{"content": doc.page_content[:200], "source": doc.metadata.get("source", "")}
for doc in docs
],
}
|
5. 主程序入口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# rag_app.py
import sys
from document_processor import DocumentProcessor
from vector_store import VectorStoreManager
from rag_engine import RAGEngine
from config import Config
def build_knowledge_base():
"""构建知识库(首次运行时执行)"""
print("=" * 60)
print(" 知识库构建流程")
print("=" * 60)
# Step 1: 加载文档
processor = DocumentProcessor()
documents = processor.load_documents()
if not documents:
print("[ERROR] 未找到任何文档,请检查 knowledge_base 目录")
return None
# Step 2: 文本分割
chunks = processor.split_documents(documents)
# Step 3: 向量化并存储
store_manager = VectorStoreManager()
vector_store = store_manager.create_or_load(chunks=chunks)
print(f"\n[SUCCESS] 知识库构建完成!")
return vector_store
def interactive_qa(vector_store):
"""交互式问答"""
print("=" * 60)
print(" 本地知识库问答系统")
print(f" 模型: {Config.LLM_MODEL}")
print(f" 知识库: {Config.COLLECTION_NAME}")
print(" 输入 'quit' 退出,'search:' 前缀可只检索不生成")
print("=" * 60)
engine = RAGEngine(vector_store)
while True:
try:
question = input("\n🔍 你的问题: ").strip()
if question.lower() in ("quit", "exit", "q"):
print("👋 再见!")
break
if not question:
continue
# 纯检索模式
if question.startswith("search:"):
query = question[7:].strip()
vector_store_manager = VectorStoreManager()
vector_store_manager.vector_store = vector_store
vector_store_manager.search(query)
continue
# RAG问答
result = engine.query_with_sources(question)
print(f"\n💡 回答:\n{result['answer']}")
print(f"\n📚 参考来源:")
for src in result["sources"]:
print(f" - [{src['source']}] {src['content'][:100]}...")
except KeyboardInterrupt:
print("\n👋 再见!")
break
except Exception as e:
print(f"[ERROR] 处理失败: {e}")
def main():
import argparse
parser = argparse.ArgumentParser(description="本地知识库RAG问答系统")
parser.add_argument("--build", action="store_true", help="构建/更新知识库")
parser.add_argument("--chat", action="store_true", help="进入问答交互")
args = parser.parse_args()
if args.build:
vector_store = build_knowledge_base()
if vector_store and args.chat:
interactive_qa(vector_store)
elif args.chat:
# 直接加载已有知识库
store_manager = VectorStoreManager()
vector_store = store_manager.create_or_load()
interactive_qa(vector_store)
else:
print("用法:")
print(" python rag_app.py --build # 构建知识库")
print(" python rag_app.py --build --chat # 构建后进入问答")
print(" python rag_app.py --chat # 直接进入问答")
if __name__ == "__main__":
main()
|
进阶优化
1. 混合检索策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
# hybrid_search.py
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
class HybridRetriever:
"""混合检索:向量检索 + 关键词检索"""
def __init__(self, vector_store, chunks):
# 向量检索器
self.vector_retriever = vector_store.as_retriever(
search_kwargs={"k": 5}
)
# BM25关键词检索器
self.bm25_retriever = BM25Retriever.from_documents(chunks)
self.bm25_retriever.k = 5
# 混合检索:各占50%权重
self.ensemble_retriever = EnsembleRetriever(
retrievers=[self.vector_retriever, self.bm25_retriever],
weights=[0.6, 0.4],
)
def search(self, query: str, k: int = 3):
"""混合检索"""
results = self.ensemble_retriever.invoke(query)
return results[:k]
|
2. 查询重写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# query_rewriter.py
REWRITE_PROMPT = """请将以下用户问题改写为更适合检索的形式。
要求:
1. 补充可能缺失的关键信息
2. 使用更精确的技术术语
3. 保持问题的核心意图
原始问题:{question}
改写后的问题:"""
class QueryRewriter:
"""查询重写:提升检索效果"""
def __init__(self, llm):
self.llm = llm
def rewrite(self, question: str) -> str:
"""重写用户查询"""
prompt = REWRITE_PROMPT.format(question=question)
rewritten = self.llm.invoke(prompt)
print(f"[INFO] 原始问题: {question}")
print(f"[INFO] 重写问题: {rewritten}")
return rewritten
|
3. 流式输出支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
# stream_response.py
def stream_query(engine, question: str):
"""流式输出RAG回答"""
retriever = engine.vector_store.as_retriever(search_kwargs={"k": 3})
docs = retriever.invoke(question)
context = "\n\n".join([doc.page_content for doc in docs])
prompt = RAG_PROMPT_TEMPLATE.format(context=context, question=question)
# 流式输出
print("\n💡 回答: ", end="", flush=True)
for chunk in engine.llm.stream(prompt):
print(chunk, end="", flush=True)
print()
|
性能优化建议
| 优化方向 |
具体措施 |
效果 |
| Embedding加速 |
使用GPU + ONNX Runtime |
推理速度提升5-10倍 |
| 向量检索优化 |
使用HNSW索引参数调优 |
检索延迟降低50% |
| 缓存策略 |
对常见问题做语义缓存 |
减少重复计算 |
| 分块策略 |
根据文档类型调整chunk_size |
提升检索精度 |
| 模型量化 |
使用GGUF量化模型 |
内存占用降低60% |
常见问题排查
Q: ChromaDB报错 disk I/O error
1
2
3
|
# 清除损坏的数据库重新构建
rm -rf vector_db/
python rag_app.py --build
|
Q: Embedding模型下载慢
1
2
3
|
# 使用镜像源
import os
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"
|
Q: 内存不足(OOM)
1
2
3
4
5
6
|
# 减小batch_size并启用内存映射
embeddings = HuggingFaceEmbeddings(
model_name="shibing624/text2vec-base-chinese",
model_kwargs={"device": "cpu"},
encode_kwargs={"normalize_embeddings": True, "batch_size": 8},
)
|
总结
本文介绍了如何使用LangChain + ChromaDB构建一个完整的本地RAG问答系统。核心流程是:文档加载 → 文本分割 → 向量化存储 → 相似度检索 → LLM生成回答。
在实际项目中,还需要关注:
- 数据质量:清洗文档、去除噪音,直接影响回答质量
- 检索精度:通过混合检索、查询重写、重排序等手段优化
- Prompt工程:精心设计提示词,约束模型输出格式
- 评估体系:使用RAGAS等工具量化评估系统效果
RAG是当前大模型落地最实用的技术路径之一,掌握它是构建企业级AI应用的基础能力。