我们面临一个日益严峻的挑战:内部技术文档库(数万份Markdown文件,托管在Git中)的检索效率已经降至冰点。传统的基于关键词的搜索(如grep或Elasticsearch的BM25)在处理概念相似但用词不同的查询时表现拙劣。“如何实现服务熔断?”的查询无法找到一篇详尽阐述“弹性工程中的断路器模式”的文章。显而易见,我们需要的是语义搜索。
初步的技术构想是引入向量数据库,但随之而来的是一系列生产环境必须面对的棘手问题:
- 安全性: 文档包含公司核心IP,整个索引和查询过程必须在严格隔离的网络环境中进行。
- 性能: 将数万份文档进行分块、调用Embedding模型生成向量,是一个计算密集且耗时的过程。在Gatsby的构建阶段(
gatsby build)同步执行此操作,会将构建时间从几分钟延长到数小时,这是不可接受的。 - 灵活性: 纯粹的向量相似度并不能完全满足业务需求。我们需要根据文档的元数据(如作者、更新时间、项目归属)对搜索结果进行动态的、可配置的业务逻辑重排。
这个需求最终催生了一个异构技术栈组合的解决方案:整个后端服务被部署在AWS VPC的私有子网中,利用Celery处理异步的向量化任务,将结果存入Pinecone,并通过一个嵌入了Lua脚本引擎的API层,实现搜索结果的动态排序,最终服务于一个Gatsby构建的静态前端。
架构概览:VPC内的隔离与协同
一切的基础是网络隔离。任何涉及内部数据处理的服务都不能暴露在公网上。我们设计的VPC架构如下:
graph TD
subgraph "AWS Cloud"
subgraph "VPC (10.0.0.0/16)"
subgraph "Public Subnet"
NAT[NAT Gateway]
ALB[Application Load Balancer]
end
subgraph "Private Subnet A"
API[FastAPI Service w/ Lua]
CeleryWorker1[Celery Worker]
end
subgraph "Private Subnet B"
Redis[ElastiCache for Redis]
CeleryWorker2[Celery Worker]
end
end
subgraph "External Services"
Pinecone[Pinecone Service]
OpenAI[OpenAI API]
GitRepo[Git Repository]
end
subgraph "Public Access"
User[User via Browser]
CDN[CDN/S3 for Gatsby Site]
end
end
User --> CDN
CDN -- Client-side API Call --> ALB
ALB -- Routes traffic to --> API
API -- Search Query --> Pinecone
API -- Enqueue Task --> Redis
Redis -- Delivers Task --> CeleryWorker1
Redis -- Delivers Task --> CeleryWorker2
CeleryWorker1 -- Generates Embeddings --> OpenAI
CeleryWorker1 -- Upserts Vectors --> Pinecone
CeleryWorker2 -- Generates Embeddings --> OpenAI
CeleryWorker2 -- Upserts Vectors --> Pinecone
API -- Needs egress --> NAT
CeleryWorker1 -- Needs egress --> NAT
CeleryWorker2 -- Needs egress --> NAT
NAT -- Allows outbound to --> OpenAI
NAT -- Allows outbound to --> Pinecone
GitRepo -- Webhook --> ALB --> API
- VPC: 提供一个逻辑隔离的网络环境。
- Public Subnet: 仅放置ALB(应用负载均衡器)和NAT网关。ALB是唯一面向公网的入口,用于接收来自Gatsby前端的API请求以及Git仓库的Webhook。
- Private Subnets: 我们的核心应用——FastAPI服务、Celery Workers、Redis Broker——全部部署于此。它们没有公网IP,无法从外部直接访问。
- NAT Gateway: 允许私有子网中的服务(如Celery Worker)发起对外部服务(如OpenAI API, Pinecone)的出站请求,但阻止外部主动发起的入站连接。
- 数据流:
- 索引触发: Git仓库更新时,Webhook触发ALB,将事件转发给FastAPI。
- 任务分发: FastAPI接收到Webhook后,将“处理文档”的任务放入Redis队列。
- 异步处理: Celery Workers从队列中获取任务,开始拉取文档、分块、调用Embedding模型、将向量写入Pinecone。
- 查询: 用户在Gatsby站点上搜索,客户端代码向ALB发起API请求。API查询Pinecone,获取初步结果,使用Lua脚本重排,最后返回给前端。
核心实现:Celery异步向量化管道
Celery是整个系统的发动机。它将耗时的IO和CPU密集型任务从主应用线程中解耦出去。
1. 项目结构
/semantic_search_backend
|-- app/
| |-- __init__.py
| |-- main.py # FastAPI 应用
| |-- api/
| | |-- routes.py # API 路由
| |-- services/
| | |-- lua_reranker.py # Lua 排序引擎
| |-- tasks/
| |-- __init__.py
| |-- celery_app.py # Celery 应用实例
| |-- config.py # Celery 配置
| |-- processing.py # 核心任务定义
|-- requirements.txt
|-- Dockerfile
|-- rerank.lua # Lua 排序脚本
2. Celery应用配置 (tasks/celery_app.py and tasks/config.py)
这里的关键是确保Celery能够通过VPC内的私有DNS或IP地址连接到Redis。
tasks/config.py:
import os
class CeleryConfig:
# 从环境变量获取VPC内Redis的endpoint
# 例如: "redis://redis-master.semantic-search.local:6379/0"
BROKER_URL = os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0")
RESULT_BACKEND = os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
# 任务序列化
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
TIMEZONE = 'UTC'
ENABLE_UTC = True
# 任务路由,可以根据任务类型分发到不同队列
TASK_ROUTES = {
'app.tasks.processing.process_document_batch': {'queue': 'indexing_queue'},
}
# 任务执行的超时与确认
TASK_ACKS_LATE = True
TASK_REJECT_ON_WORKER_LOST = True
WORKER_PREFETCH_MULTIPLIER = 1 # 每个worker一次只取一个任务,因为embedding很耗时
tasks/celery_app.py:
from celery import Celery
from app.tasks.config import CeleryConfig
# 创建Celery实例
# 'app.tasks' 是为了让Celery能够自动发现 'processing.py' 中的任务
celery_app = Celery('app.tasks')
# 加载配置
celery_app.config_from_object(CeleryConfig)
# 自动发现任务
celery_app.autodiscover_tasks(['app.tasks'])
if __name__ == '__main__':
celery_app.start()
3. 核心处理任务 (tasks/processing.py)
这是管道的核心逻辑,包含文档分块、向量生成和写入Pinecone的完整过程。
import os
import logging
import time
from typing import List, Dict, Any
import pinecone
from openai import OpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from .celery_app import celery_app
# --- 初始化客户端 ---
# 这里的初始化必须是线程安全的,Celery worker是多进程/多线程的。
# 最好在任务函数内部或使用线程局部存储来处理客户端实例。
def get_openai_client():
return OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
def get_pinecone_index():
pinecone.init(
api_key=os.environ.get("PINECONE_API_KEY"),
environment=os.environ.get("PINECONE_ENVIRONMENT")
)
index_name = "internal-docs"
if index_name not in pinecone.list_indexes():
# 在真实项目中,索引的创建应该是通过IaC工具(如Terraform)管理的
# 而不是在运行时代码中创建。
raise RuntimeError(f"Pinecone index '{index_name}' does not exist.")
return pinecone.Index(index_name)
# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- 文本分块器 ---
# 这是一个关键步骤,块的大小直接影响embedding的质量和搜索相关性
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
add_start_index=True,
)
@celery_app.task(
bind=True,
autoretry_for=(Exception,), # 对所有异常都自动重试
retry_kwargs={'max_retries': 3, 'countdown': 60}, # 最多重试3次,间隔60秒
name='app.tasks.processing.process_document_batch'
)
def process_document_batch(self, documents: List[Dict[str, Any]]):
"""
处理一批文档,生成向量并存入Pinecone。
`documents` 的格式: [{'content': '...', 'metadata': {'source': '...', ...}}]
"""
logger.info(f"Task {self.request.id}: Received a batch of {len(documents)} documents to process.")
openai_client = get_openai_client()
pinecone_index = get_pinecone_index()
all_chunks = []
all_metadatas = []
for doc in documents:
# 1. 分块
chunks = text_splitter.split_text(doc['content'])
for i, chunk in enumerate(chunks):
# 2. 为每个块构建丰富的元数据
# 这是提升搜索质量的关键,元数据可以用于过滤和排序
metadata = doc.get('metadata', {}).copy()
metadata.update({
'chunk_index': i,
'text': chunk, # 存储原文用于结果展示
})
all_chunks.append(chunk)
all_metadatas.append(metadata)
if not all_chunks:
logger.warning("No text chunks were generated from the documents.")
return "No content to process."
# 3. 批量生成Embeddings
# 减少API调用次数以提高效率
try:
logger.info(f"Generating embeddings for {len(all_chunks)} chunks...")
start_time = time.time()
res = openai_client.embeddings.create(
input=all_chunks,
model="text-embedding-ada-002" # 或者其他更先进的模型
)
embeddings = [record.embedding for record in res.data]
logger.info(f"Embedding generation took {time.time() - start_time:.2f} seconds.")
except Exception as e:
logger.error(f"Failed to generate embeddings: {e}")
# 异常会由Celery的autoretry机制捕获并重试
raise
# 4. 准备写入Pinecone的数据格式
vectors_to_upsert = []
for i, (chunk, embedding, metadata) in enumerate(zip(all_chunks, embeddings, all_metadatas)):
# Pinecone的ID必须是唯一的字符串
vector_id = f"{metadata.get('source', 'unknown')}_{metadata.get('commit_hash', 'HEAD')}_{i}"
vectors_to_upsert.append({
"id": vector_id,
"values": embedding,
"metadata": {k: v for k, v in metadata.items() if k != 'text'} # 不在metadata中存储原文,节省空间
})
# 5. 批量写入Pinecone
# Pinecone建议每次写入不超过100个向量
batch_size = 100
try:
logger.info(f"Upserting {len(vectors_to_upsert)} vectors to Pinecone...")
start_time = time.time()
for i in range(0, len(vectors_to_upsert), batch_size):
batch = vectors_to_upsert[i:i+batch_size]
pinecone_index.upsert(vectors=batch, namespace='internal-docs-ns')
logger.info(f"Pinecone upsert took {time.time() - start_time:.2f} seconds.")
except Exception as e:
logger.error(f"Failed to upsert to Pinecone: {e}")
raise
return f"Successfully processed and indexed {len(documents)} documents, resulting in {len(vectors_to_upsert)} vectors."
这段代码展示了生产级任务的几个要点:详细的日志、针对API调用的异常处理与重试机制、批量操作以提升效率,以及对元数据的精心处理。
动态排序:嵌入Lua脚本引擎
单纯的向量相似度分数有时会产生误导。一篇陈旧但匹配度高的文章可能不如一篇近期更新、匹配度稍低的文章有价值。我们需要一种动态、可热更新的业务逻辑来对结果进行重排。硬编码在Python代码中会使得每次逻辑变更都需要重新部署服务。
Lua是我们的选择。它轻量、快速、易于沙箱化,非常适合作为嵌入式脚本语言。我们使用lupa库在Python中执行Lua。
1. Lua排序脚本 (rerank.lua)
这个脚本定义了一个简单的契约:输入一个包含搜索结果的table,输出一个经过重排的table。
-- rerank.lua
-- 对Pinecone返回的搜索结果进行动态业务逻辑重排
--[[
输入 `results` table 的结构:
[
{
id = "doc1_chunk2",
score = 0.89,
metadata = {
author = "jdoe",
last_modified_timestamp = 1672531200, -- Unix timestamp
project = "ProjectA",
-- ... 其他元数据
}
},
...
]
--]]
function rerank(results)
-- 1. 定义一个用于排序的打分函数
local function calculate_business_score(item)
local vector_score = item.score
local metadata = item.metadata
-- 基础分是向量相似度分
local final_score = vector_score
-- 2. 时间衰减因子:越新的文档权重越高
-- 当前时间的Unix timestamp(由Python传入)
local current_timestamp = tonumber(metadata.current_timestamp)
local doc_timestamp = tonumber(metadata.last_modified_timestamp) or current_timestamp
if current_timestamp and doc_timestamp then
-- 计算文档距今的天数
local days_old = (current_timestamp - doc_timestamp) / (60 * 60 * 24)
-- 每过30天,分数衰减10%
local decay_factor = 0.9 ^ (days_old / 30)
final_score = final_score * decay_factor
end
-- 3. 作者权重:特定资深工程师的文档有更高权重
local author_boost = {
["senior_dev_1"] = 1.2,
["architect_bob"] = 1.5,
}
local boost = author_boost[metadata.author]
if boost then
final_score = final_score * boost
end
-- 4. 项目权重:核心项目的文档权重更高
if metadata.project == "CoreInfrastructure" then
final_score = final_score * 1.1
end
return final_score
end
-- 3. 为每个结果计算新的业务分数
for i, item in ipairs(results) do
item.business_score = calculate_business_score(item)
end
-- 4. 根据新的业务分数进行降序排序
table.sort(results, function(a, b)
return a.business_score > b.business_score
end)
return results
end
2. Python集成 (services/lua_reranker.py)
这个模块负责加载、执行Lua脚本,并处理Python与Lua之间的数据转换。
import time
import logging
from typing import List, Dict, Any
from lupa import LuaRuntime
logger = logging.getLogger(__name__)
class LuaReranker:
def __init__(self, script_path: str):
self.script_path = script_path
self.lua = LuaRuntime(unpack_returned_tuples=True)
self._load_script()
def _load_script(self):
"""加载Lua脚本并编译,只在初始化时执行一次。"""
try:
with open(self.script_path, 'r', encoding='utf-8') as f:
script_content = f.read()
# 在一个受保护的环境中执行脚本,防止污染全局命名空间
self.lua_rerank_func = self.lua.eval(f'function(results) {script_content} return rerank(results) end')
logger.info(f"Successfully loaded and compiled Lua script from {self.script_path}")
except Exception as e:
logger.error(f"Failed to load Lua script: {e}")
self.lua_rerank_func = None
def reload(self):
"""提供一个热重载脚本的接口,无需重启服务。"""
logger.info("Reloading Lua script...")
self._load_script()
def rerank(self, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
if not self.lua_rerank_func:
logger.error("Lua rerank function is not available. Returning original results.")
return results
if not results:
return []
# 在元数据中注入当前时间戳,供Lua脚本使用
current_timestamp = int(time.time())
for item in results:
if 'metadata' not in item:
item['metadata'] = {}
item['metadata']['current_timestamp'] = current_timestamp
try:
# lupa 自动处理 Python dict/list 到 Lua table 的转换
reranked_results = self.lua_rerank_func(results)
# 将 Lua table 转换回 Python list of dicts
return list(reranked_results.values())
except Exception as e:
logger.error(f"Error during Lua execution: {e}. Returning original results.")
# 发生错误时,优雅降级,返回原始结果
return results
# 在FastAPI应用中实例化
# reranker = LuaReranker(script_path="rerank.lua")
这种设计将易变的业务逻辑从核心服务中剥离,运维人员可以直接修改rerank.lua文件并通过一个API端点触发reranker.reload()方法,实现业务逻辑的动态更新,极大地提高了灵活性。
Gatsby前端与API的最终交互
Gatsby站点本身是静态的,托管在S3/CDN上。它的搜索功能通过客户端JavaScript实现,向部署在VPC内的ALB发起安全的HTTPS请求。
FastAPI应用 (app/api/routes.py) 中的搜索端点会执行以下操作:
- 接收用户查询字符串。
- 调用OpenAI API将查询字符串转换为向量。
- 使用该向量查询Pinecone,获取Top-K相似的文档块。
- 将Pinecone返回的结果(包含分数和元数据)传递给
LuaReranker实例。 - 获取Lua脚本重排后的结果列表。
- 将最终结果返回给Gatsby前端。
这个架构虽然技术栈跨度大,但每个组件都精确地解决了特定问题:VPC提供了必要的安全隔离;Celery解决了计算密集型任务的异步化,保证了系统的响应能力;Pinecone提供了核心的语义检索能力;Gatsby提供了高性能的浏览体验;而Lua则为整个系统注入了无与伦比的业务灵活性。
局限与展望
当前方案并非没有改进空间。首先,Lua脚本的执行是同步阻塞的,如果排序逻辑变得极其复杂,可能会影响API的响应延迟。一个可能的优化是,对于复杂的重排,可以将其也设计成一个Celery任务。其次,Embedding模型的选择是固定的,未来可以构建一套A/B测试框架,动态评估不同模型对搜索质量的影响。最后,对文档的更新处理目前是全量重新索引,对于大型文档库,引入基于文件内容哈希的增量索引机制将是降低计算资源消耗的关键。