基于 Axum 构建与 Prettier 和 NLP 集成的高性能代码分析与格式化服务


团队的 CI 流水线越来越慢,其中一个瓶颈竟是代码格式化和静态检查步骤。每个作业都要在独立的 Docker 容器里 npm install 然后运行 Prettier,这在 Rust 为主的技术栈中显得格格不入且效率低下。我们需要一个集中式的、高性能的服务来解决这个问题:一个接收代码片段,返回格式化结果和基础代码分析的内部 API。这个决策引出了一系列技术选型:后端框架用什么?如何与 Prettier 这个 Node.js 生态的工具高效交互?

我们最终选择了 Rust 和 Axum 框架。原因很简单:极致的性能、内存安全,以及与 Tokio 生态系统的无缝集成,这对于处理 I/O 密集型任务(如进程间通信)至关重要。本文记录了从零开始构建这个服务的完整过程,重点在于解决核心的跨语言进程通信挑战,并在此基础上,集成简单的 NLP 分析,让服务超越一个单纯的格式化工具。

技术痛点与架构构想

直接在 Rust 中通过 std::process::Command 为每个请求启动一个 prettier 子进程是最直接的想法,但这在生产环境中是完全不可接受的。每次调用的进程创建和销毁开销、Node.js 运行时的启动延迟,都会在高并发下迅速摧毁性能。

一个更稳健的方案是,在我们的 Axum 服务启动时,就创建一个长期运行的 Prettier 子进程。服务通过子进程的 stdin 发送待处理的代码,并通过 stdout 接收格式化后的结果。这种“守护进程”模式将进程启动的开销平摊到整个服务生命周期中,使得单次请求的处理延迟极大降低。

接下来,我们需要为这种通信设计一个简单的协议。JSON-RPC 是一个不错的选择,但为了极致的简洁,我们决定采用更基础的行分割 JSON 协议。Rust 服务向子进程的 stdin 写入一行 JSON 字符串,Node.js 脚本读取这一行,处理后,向 stdout 写回一行结果 JSON。

整个架构的请求生命周期如下:

sequenceDiagram
    participant Client
    participant Axum Service
    participant Prettier Worker (Node.js)

    Client->>+Axum Service: POST /format (code, options)
    Axum Service->>+Prettier Worker: Writes JSON to stdin
    Note right of Axum Service: { "id": "req-1", "code": "...", "parser": "typescript" }
    Prettier Worker-->>-Axum Service: Reads from stdin & formats
    Note left of Prettier Worker: Formatting logic executes
    Axum Service->>+Prettier Worker: Writes JSON to stdout
    Note right of Axum Service: { "id": "req-1", "result": "..." } or { "id": "req-1", "error": "..." }
    Prettier Worker-->>-Client: Returns formatted code & analysis

项目初始化与 Axum 基础搭建

首先是项目结构。我们需要一个 Rust 项目,以及一个存放 Node.js worker 脚本的地方。

# 创建 Rust 项目
cargo new code_formatter_service
cd code_formatter_service

# 创建 Node.js worker 目录
mkdir -p worker
touch worker/prettier-worker.js
touch worker/package.json

worker/package.json 中添加 Prettier 依赖:

{
  "name": "prettier-worker",
  "version": "1.0.0",
  "private": true,
  "dependencies": {
    "prettier": "^3.0.0"
  }
}

然后在 worker 目录运行 npm install

接下来是 Cargo.toml 的配置,我们需要 axum, tokio, serde 作为基础,以及 tracing 用于日志记录。

[package]
name = "code_formatter_service"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
anyhow = "1.0"
thiserror = "1.0"
uuid = { version = "1.6", features = ["v4"] }

我们的 main.rs 先从一个基础的 Axum 服务开始,包含日志和状态管理。服务的核心状态 AppState 将持有一个到 Prettier Worker 的通信句柄。

// src/main.rs

use axum::{
    routing::post,
    Router,
};
use std::net::SocketAddr;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

// 定义应用状态,稍后会填充
#[derive(Clone)]
struct AppState {}

#[tokio::main]
async fn main() {
    // 初始化日志
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "code_formatter_service=debug,tower_http=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let state = AppState {};

    let app = Router::new()
        // 稍后实现 format_handler
        // .route("/format", post(format_handler))
        .with_state(state);

    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tracing::debug!("listening on {}", addr);
    let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

核心:构建 Rust 与 Node.js 的通信桥梁

这是整个项目的技术核心。我们将创建一个 PrettierWorker 结构体来封装与子进程的所有交互。

首先,定义通信协议的数据结构。

// src/worker.rs (新文件)

use serde::{Deserialize, Serialize};
use uuid::Uuid;

#[derive(Serialize, Debug)]
pub struct WorkerRequest {
    pub id: Uuid,
    pub code: String,
    pub parser: String,
}

#[derive(Deserialize, Debug)]
#[serde(untagged)]
pub enum WorkerResponse {
    Success { id: Uuid, result: String },
    Error { id: Uuid, error: String },
}

接下来是 prettier-worker.js 的实现。它必须从 stdin 逐行读取 JSON,调用 Prettier,然后将结果写回 stdout

// worker/prettier-worker.js

const readline = require('readline');
const prettier = require('prettier');

const rl = readline.createInterface({
    input: process.stdin,
    output: process.stdout,
    terminal: false
});

rl.on('line', async (line) => {
    try {
        const request = JSON.parse(line);
        const { id, code, parser } = request;

        if (!id || typeof code !== 'string' || !parser) {
            throw new Error('Invalid request payload');
        }

        const formattedCode = await prettier.format(code, { parser, printWidth: 100 });
        
        const response = {
            id: id,
            result: formattedCode
        };
        // 关键:必须加换行符,以便 Rust 端可以按行读取
        process.stdout.write(JSON.stringify(response) + '\n');

    } catch (e) {
        // 捕获 Prettier 的格式化错误或 JSON 解析错误
        const request = JSON.parse(line); // 尝试再次解析以获取ID
        const response = {
            id: request.id || 'unknown',
            error: e.message
        };
        process.stdout.write(JSON.stringify(response) + '\n');
    }
});

现在,我们来实现 Rust 端的 PrettierWorker。它需要管理子进程的生命周期,并提供一个异步方法来发送请求和接收响应。这里的坑在于,多个 Axum 请求可能会并发地与同一个子进程交互,因此我们需要一个 Mutex 来保证每次只有一个请求在写入和等待响应。同时,stdinstdout 的读写必须是异步的。

// src/worker.rs (续)

use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use tokio::sync::{Mutex, oneshot};
use std::collections::HashMap;
use std::io;
use std::sync::Arc;

// 自定义错误类型
#[derive(thiserror::Error, Debug)]
pub enum WorkerError {
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    #[error("JSON serialization/deserialization error: {0}")]
    Json(#[from] serde_json::Error),
    #[error("Worker process terminated unexpectedly")]
    ProcessTerminated,
    #[error("Request timed out")]
    Timeout,
    #[error("Worker returned an error: {0}")]
    WorkerError(String),
}

// 负责与子进程通信的Actor
pub struct PrettierWorker {
    stdin: Arc<Mutex<ChildStdin>>,
    // 使用oneshot channel来匹配请求和响应
    pending_requests: Arc<Mutex<HashMap<Uuid, oneshot::Sender<WorkerResponse>>>>,
}

impl PrettierWorker {
    pub fn new() -> Result<Self, WorkerError> {
        let mut child = Command::new("node")
            .arg("worker/prettier-worker.js")
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::piped())
            .stderr(std::process::Stdio::inherit()) // 将stderr重定向到父进程,方便调试
            .spawn()?;

        let stdin = child.stdin.take().ok_or_else(|| WorkerError::Io(io::Error::new(io::ErrorKind::Other, "Failed to get stdin")))?;
        let stdout = child.stdout.take().ok_or_else(|| WorkerError::Io(io::Error::new(io::ErrorKind::Other, "Failed to get stdout")))?;

        let pending_requests = Arc::new(Mutex::new(HashMap::new()));
        let pending_requests_clone = Arc::clone(&pending_requests);

        // 启动一个后台任务来持续读取stdout
        tokio::spawn(async move {
            Self::read_output(child, stdout, pending_requests_clone).await;
        });

        Ok(Self {
            stdin: Arc::new(Mutex::new(stdin)),
            pending_requests,
        })
    }
    
    // 后台读取任务
    async fn read_output(
        mut child: Child,
        stdout: ChildStdout,
        pending_requests: Arc<Mutex<HashMap<Uuid, oneshot::Sender<WorkerResponse>>>>
    ) {
        let mut reader = BufReader::new(stdout).lines();
        loop {
            tokio::select! {
                // 等待子进程退出
                _ = child.wait() => {
                    tracing::error!("Prettier worker process terminated unexpectedly.");
                    // 通知所有等待的请求
                    let mut pending = pending_requests.lock().await;
                    pending.clear(); // 发送者被drop时,接收者会收到错误
                    break;
                },
                // 读取一行输出
                line_result = reader.next_line() => {
                    match line_result {
                        Ok(Some(line)) => {
                            if let Ok(response) = serde_json::from_str::<WorkerResponse>(&line) {
                                let id = match &response {
                                    WorkerResponse::Success { id, .. } => *id,
                                    WorkerResponse::Error { id, .. } => *id,
                                };
                                let mut pending = pending_requests.lock().await;
                                if let Some(tx) = pending.remove(&id) {
                                    let _ = tx.send(response);
                                }
                            } else {
                                tracing::warn!("Failed to parse worker response: {}", line);
                            }
                        },
                        Ok(None) => {
                            // stdout关闭,进程已退出
                             tracing::info!("Prettier worker stdout closed.");
                             break;
                        }
                        Err(e) => {
                            tracing::error!("Error reading from worker stdout: {}", e);
                            break;
                        }
                    }
                }
            }
        }
    }

    pub async fn format(&self, code: String, parser: String) -> Result<String, WorkerError> {
        let request = WorkerRequest {
            id: Uuid::new_v4(),
            code,
            parser,
        };

        let (tx, rx) = oneshot::channel();
        
        {
            let mut pending = self.pending_requests.lock().await;
            pending.insert(request.id, tx);
        }

        let mut request_json = serde_json::to_string(&request)? + "\n";
        
        {
            let mut stdin_guard = self.stdin.lock().await;
            stdin_guard.write_all(request_json.as_bytes()).await?;
            stdin_guard.flush().await?;
        }

        // 设置一个超时
        match tokio::time::timeout(tokio::time::Duration::from_secs(5), rx).await {
            Ok(Ok(response)) => match response {
                WorkerResponse::Success { result, .. } => Ok(result),
                WorkerResponse::Error { error, .. } => Err(WorkerError::WorkerError(error)),
            },
            Ok(Err(_)) => Err(WorkerError::ProcessTerminated), // channel被关闭
            Err(_) => Err(WorkerError::Timeout),
        }
    }
}

这段代码是系统的核心。我们不再为每个请求加锁整个进程,而是使用一个HashMap来追踪在途请求。每个请求在发送前,都会在map中注册一个oneshot::Sender。后台的读取任务在收到响应后,根据ID找到对应的Sender并发送结果。这种设计极大地提升了并发能力。

集成到 Axum Handler 并添加 NLP

现在,我们将 PrettierWorker 集成到 AppStateaxum 的 handler 中。

// src/main.rs (更新)

mod worker; // 引入worker模块

use axum::{
    extract::State,
    http::StatusCode,
    response::{IntoResponse, Response},
    routing::post,
    Json, Router,
};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use worker::PrettierWorker;

// 将Worker封装在Arc中,以便在多线程间共享
#[derive(Clone)]
struct AppState {
    worker: Arc<PrettierWorker>,
}

#[derive(Deserialize)]
struct FormatRequest {
    code: String,
    parser: String,
}

#[derive(Serialize)]
struct FormatResponse {
    formatted_code: String,
    analysis: CodeAnalysis,
}

#[derive(Serialize)]
struct CodeAnalysis {
    todo_comment_count: usize,
}

async fn format_handler(
    State(state): State<AppState>,
    Json(payload): Json<FormatRequest>,
) -> Result<Json<FormatResponse>, AppError> {
    let formatted_code = state
        .worker
        .format(payload.code, payload.parser)
        .await?;

    // 在这里集成NLP逻辑
    // 这是一个简单的例子:使用regex统计TODO注释的数量
    let todo_regex = regex::Regex::new(r"(?i)//\s*TODO").unwrap();
    let todo_count = todo_regex.find_iter(&formatted_code).count();

    let response = FormatResponse {
        formatted_code,
        analysis: CodeAnalysis {
            todo_comment_count: todo_count,
        },
    };

    Ok(Json(response))
}

// 统一的错误处理
struct AppError(worker::WorkerError);

impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        let (status, error_message) = match self.0 {
            worker::WorkerError::WorkerError(msg) => (StatusCode::BAD_REQUEST, format!("Formatting error: {}", msg)),
            worker::WorkerError::Timeout => (StatusCode::GATEWAY_TIMEOUT, "Request to worker timed out".to_string()),
            _ => (StatusCode::INTERNAL_SERVER_ERROR, "An internal error occurred".to_string()),
        };
        (status, Json(serde_json::json!({ "error": error_message }))).into_response()
    }
}

impl<E> From<E> for AppError where E: Into<worker::WorkerError> {
    fn from(err: E) -> Self {
        Self(err.into())
    }
}


#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "code_formatter_service=debug,tower_http=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();
    
    // 还需要添加 regex 依赖
    // cargo add regex
    
    let worker = PrettierWorker::new()?;
    let state = AppState {
        worker: Arc::new(worker),
    };

    let app = Router::new()
        .route("/format", post(format_handler))
        .with_state(state);

    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tracing::debug!("listening on {}", addr);
    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app).await?;
    
    Ok(())
}

别忘了添加 regex 依赖到 Cargo.toml: cargo add regex -F unistring

至此,一个功能完备的服务已经成型。它能接收代码,通过一个长期运行的 Node.js 进程高效地进行格式化,然后用 Rust 的原生性能进行额外的代码分析,最后将合并的结果返回给客户端。错误处理也得到了妥善的设计,无论是 Prettier 自身的格式化错误,还是进程通信的内部错误,都能以合适的 HTTP 状态码和信息返回。

生产环境考量与测试思路

在真实项目中,还有几点需要完善:

  1. 配置化: Prettier worker 的路径、服务器端口等应该通过配置文件或环境变量来管理,而不是硬编码。可以使用 config-rsdotenv 等库。
  2. 更健壮的 Worker 管理: 当前实现中,如果 Node.js 进程崩溃,整个服务将无法工作,并且不会自动重启。一个生产级的实现应该包含一个监控协程,使用 child.wait().await 来监听子进程的退出,并在必要时尝试重启它,这通常被称为“看门狗”(watchdog)模式。
  3. 单元测试: format_handler 可以通过 axum::body::to_byteshttp::Request 来进行单元测试。对于 PrettierWorker,可以编写一个模拟的 Node.js 脚本作为测试替身,该脚本仅回显请求或返回预设的错误,从而在不依赖真实 Prettier 的情况下测试通信逻辑。
  4. 集成测试: 编写一个测试用例,它实际启动整个 Axum 服务和子进程,然后通过 HTTP 客户端发送请求,断言响应是否符合预期。这能确保端到端的流程正确无误。

局限性与未来迭代方向

当前的设计虽然高效,但仍存在一个瓶颈:单一的 Prettier worker 进程。尽管内部通信是异步的,但在 Node.js 端,格式化任务本身是 CPU 密集型的,这意味着在同一时刻,只有一个格式化任务能在执行。

为了应对更高的并发量,未来的迭代可以探索以下路径:

  1. Worker 池: 启动一个固定大小的 Prettier worker 进程池。Axum 服务端维护一个队列,将请求分发给空闲的 worker。这需要更复杂的进程管理和请求路由逻辑,可以使用 tokio::sync::mpsc channel 来实现。
  2. WASM 替代方案: 对于某些语言的格式化器,存在或可以编译为 WebAssembly (WASM) 的版本。在 Rust 中使用像 Wasmer 或 Wasmtime 这样的运行时来执行 WASM 格式化器,可以完全消除跨语言进程通信的开销和 Node.js 依赖,实现纯 Rust 的解决方案,这将是性能上的终极形态。

尽管存在这些可优化的点,当前架构已经成功地解决的最初的问题,为团队提供了一个稳定、快速、且易于扩展的内部代码处理基础设施。它很好地展示了如何利用 Rust 和 Axum 的强大能力,去粘合和增强其他技术生态中的优秀工具。


  目录