ailearn

AI项目实战 - 知识库问答系统

构建企业级知识库问答系统,支持多种文档格式

访问-- -- --

前置知识:需要先掌握 RAG基础

本文重点:企业级知识库系统实现


一、系统架构

知识库问答系统:
文档处理流程:
上传 → 解析 → 分割 → 向量化 → 存储
问答流程:
问题 → 向量化 → 检索 → 重排序 → 生成 → 引用
核心模块:
├── 文档管理:上传、解析、版本控制
├── 向量引擎:索引、检索、更新
├── 问答引擎:理解、检索、生成
└── 管理后台:知识库管理、效果评估

二、文档处理

2.1 多格式解析

# document_processor.py
from langchain.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os
class DocumentProcessor:
    """文档处理器"""
    
    def __init__(self, chunk_size=500, chunk_overlap=50):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""]
        )
    
    def load_document(self, file_path):
        ext = os.path.splitext(file_path)[1].lower()
        
        loaders = {
            '.pdf': PyPDFLoader,
            '.docx': Docx2txtLoader,
            '.txt': TextLoader
        }
        
        loader_class = loaders.get(ext, TextLoader)
        loader = loader_class(file_path)
        return loader.load()
    
    def process(self, file_path):
        documents = self.load_document(file_path)
        chunks = self.text_splitter.split_documents(documents)
        
        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = f"{os.path.basename(file_path)}_{i}"
            chunk.metadata["source"] = file_path
        
        return chunks

2.2 表格处理

import pandas as pd
def process_table(file_path):
    """处理Excel/CSV表格"""
    if file_path.endswith('.csv'):
        df = pd.read_csv(file_path)
    else:
        df = pd.read_excel(file_path)
    
    chunks = []
    
    # 表头描述
    header_desc = f"表格包含以下列:{', '.join(df.columns.tolist())}"
    chunks.append({"content": header_desc, "type": "header"})
    
    # 逐行转换
    for idx, row in df.iterrows():
        row_text = " | ".join([f"{col}: {val}" for col, val in row.items()])
        chunks.append({"content": row_text, "type": "row", "row_idx": idx})
    
    return chunks

三、问答引擎

# qa_engine.py
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
import chromadb
class KnowledgeQA:
    """知识库问答引擎"""
    
    def __init__(self, config):
        self.llm = ChatOpenAI(model=config["model"])
        self.embeddings = OpenAIEmbeddings()
        
        self.client = chromadb.PersistentClient(config["persist_dir"])
        self.collection = self.client.get_or_create_collection("knowledge")
    
    def index_documents(self, documents, doc_id):
        texts = [doc.page_content for doc in documents]
        metadatas = [doc.metadata for doc in documents]
        ids = [f"{doc_id}_{i}" for i in range(len(documents))]
        
        self.collection.add(
            documents=texts,
            metadatas=metadatas,
            ids=ids
        )
    
    def query(self, question, top_k=5):
        results = self.collection.query(
            query_texts=[question],
            n_results=top_k
        )
        
        context = "\n\n".join(results['documents'][0])
        
        prompt = f"""基于以下知识库内容回答问题。
如果知识库中没有相关信息,请明确说明。
知识库内容:
{context}
问题:{question}
请回答:"""
        
        response = self.llm.invoke(prompt)
        
        return {
            "answer": response.content,
            "sources": [
                {"content": doc[:200], "metadata": meta}
                for doc, meta in zip(results['documents'][0], results['metadatas'][0])
            ]
        }
    
    def delete_document(self, doc_id):
        all_ids = self.collection.get()["ids"]
        doc_ids = [id for id in all_ids if id.startswith(doc_id)]
        if doc_ids:
            self.collection.delete(ids=doc_ids)

四、管理API

# api.py
from fastapi import FastAPI, UploadFile, File
from pydantic import BaseModel
import uuid, os
app = FastAPI(title="知识库问答系统")
qa_engine = KnowledgeQA({"model": "gpt-3.5-turbo", "persist_dir": "./db"})
doc_processor = DocumentProcessor()
class QueryRequest(BaseModel):
    question: str
    top_k: int = 5
@app.post("/documents/upload")
async def upload_document(file: UploadFile = File(...)):
    doc_id = str(uuid.uuid4())
    file_path = f"./uploads/{doc_id}_{file.filename}"
    
    os.makedirs("./uploads", exist_ok=True)
    with open(file_path, "wb") as f:
        f.write(await file.read())
    
    chunks = doc_processor.process(file_path)
    qa_engine.index_documents(chunks, doc_id)
    
    return {"doc_id": doc_id, "filename": file.filename, "chunks": len(chunks)}
@app.post("/query")
async def query(request: QueryRequest):
    return qa_engine.query(request.question, request.top_k)
@app.delete("/documents/{doc_id}")
async def delete_document(doc_id: str):
    qa_engine.delete_document(doc_id)
    return {"status": "deleted"}

参考资源


返回AI项目实战 最后更新: 2026年4月20日

访问 --

讨论与反馈