在 Polyrepo 架构下利用 etcd 实现跨语言服务的动态配置与任务分发


定义复杂技术问题

在一个典型的成熟企业中,Polyrepo 是常态而非例外。技术栈的演进留下了不同时代的服务:一个稳定的 Java 服务使用 MyBatis 固守着核心的关系型数据库;一个用于数据科学和AI推理的 Python 集群利用 Ray 进行大规模并行计算;一个现代化的 TypeScript (Node.js) 服务作为 BFF (Backend for Frontend) 层,处理着前端的 API 请求。

这些服务各自独立部署、独立演进,但业务流程却需要它们协同工作。核心的挑战在于:当系统需要动态地将来自 TypeScript BFF 的请求分发给具备特定能力的 Ray Worker,并且这些 Worker 在执行任务时又需要访问由 Java/MyBatis 服务管理的数据库时,如何管理这种跨语言、跨仓库的动态配置与服务发现?

传统的配置文件分发或硬编码连接信息的方式在这种环境下会迅速成为瓶颈。任何 Ray Worker 的增减、数据库连接池参数的变更,都需要跨多个代码仓库进行修改、测试和部署,这与 Polyrepo 提倡的独立、敏捷背道而驰。我们需要一个统一的、动态的控制平面来协调这些异构的服务。

方案A:基于 API 网关和静态配置的中心化路由

一个直接的思路是引入一个智能 API 网关,所有请求都先经过网关。网关内部维护一份路由表和所有下游服务的配置信息,通常存储在一个 Git 仓库的 YAML 文件中。

  • 优势:

    • 逻辑集中,易于理解。
    • 权限、认证、限流等通用逻辑可以在网关层统一实现。
    • 配置有版本控制(GitOps)。
  • 劣势:

    • 动态性差: 这是此方案的致命弱点。当一个 Ray Worker 因负载增加而扩容,或一个新的数据源需要被 Java 服务接管时,都必须手动更新 YAML 文件,然后触发网关的配置重载或重启。这个过程存在延迟,并且在频繁变更的场景下,操作成本极高。
    • 网关过重: 网关不仅要处理路由,还要感知所有后端服务的具体能力(例如,哪个 Ray Worker 有 GPU)。这违反了单一职责原则,使得网关变得异常复杂和脆弱。
    • 服务发现滞后: 服务实例的健康状态和能力信息无法实时同步到网关,容易导致请求被转发到已经下线或不具备处理能力的实例上。

在真实项目中,这种方案仅适用于服务拓扑和配置变更频率极低的场景。对于需要弹性和动态调度的计算密集型应用,它会成为整个系统的瓶颈。

方案B:基于消息队列的任务广播与认领

另一个常见的模式是使用消息队列(如 RabbitMQ 或 Kafka)进行解耦。TypeScript BFF 将任务作为消息发布到特定主题(Topic)中,而 Ray Worker 则作为消费者订阅这些主题来获取任务。

  • 优势:

    • 高度解耦: 生产者和消费者完全分离,可以独立扩展。
    • 异步与削峰填谷: 对于耗时长的计算任务,异步处理能显著改善前端的用户体验。
    • 可靠性: 消息队列通常提供持久化和重试机制。
  • 劣势:

    • 并未解决配置管理问题: Worker 如何知道要连接哪个数据库?数据库的凭证、连接池大小等配置信息依然需要一个管理机制。将这些信息硬编码或放在环境变量中,又回到了静态配置的老路。
    • 复杂的任务路由: 如果任务需要被分发给具备特定能力的 Worker(例如,需要 GPU 的任务只能由带 GPU 的 Worker 处理),简单的 Topic 订阅模式就捉襟见肘了。我们可能需要为每种能力组合创建单独的队列,导致队列数量爆炸,管理变得复杂。
    • 控制平面缺失: 消息队列是优秀的数据平面,但它不提供一个观察和控制整个系统状态的控制平面。我们无法从一个统一的视图得知当前有多少可用 Worker、它们的健康状况以及系统的整体负载。

此方案解决了任务的异步解耦,但将更核心的动态配置和服务发现问题悬置了。

最终选择:以 etcd 作为分布式控制平面

我们的最终选择是引入 etcd,一个基于 Raft 协议的强一致性键值存储。它不作为数据传输的通道,而是作为一个高可用的、实时的“公告板”或“注册中心”,成为整个异构系统的控制平面。

选择理由:

  1. 强一致性: 配置信息、服务地址这类数据,绝不能出现读取到旧值或不一致的情况。etcd 提供的线性化读写保证了这一点。
  2. Watch 机制: 这是 etcd 的核心优势。服务可以“订阅”它们关心的键(Key)或前缀(Prefix)。当数据发生变化时,etcd 会立即将变更推送给所有监听的客户端。这使得服务能够实时响应系统状态的变化,无需低效的轮询。
  3. 租约(Lease)机制: 服务在注册时可以关联一个租约。服务需要定期为租约“续期”,如果服务宕机或网络分区导致无法续期,租约会自动过期,etcd 会自动删除与该租约关联的所有键。这提供了一种简单而可靠的服务健康检查和自动摘除机制。
  4. 跨语言支持: etcd 提供 gRPC 接口,并且拥有各种主流语言的成熟客户端库,完美契合我们跨语言技术栈的需求。

在这个架构中,etcd 存储的不是业务数据,而是系统的元数据:服务实例的位置、能力、健康状况,以及动态的配置参数。

核心实现概览

我们将通过 etcd 来协调 TypeScript、Java 和 Python/Ray 三个部分。

graph TD
    subgraph "etcd Cluster"
        A["/config/database/main"]
        B["/services/registry/java-mybatis-service/instance-01 (lease)"]
        C["/services/registry/ray-workers/worker-gpu-01 (lease)"]
        D["/services/registry/ray-workers/worker-cpu-01 (lease)"]
        E["/tasks/dispatch/task-123"]
    end

    subgraph "TypeScript BFF (Node.js)"
        TS_API[API Endpoint] -- 1. Submit Task --> TS_LOGIC{Dispatch Logic}
        TS_LOGIC -- 2. List Workers --> C & D
        TS_LOGIC -- 3. Write Dispatch Info --> E
    end

    subgraph "Java Service (MyBatis)"
        JAVA[MyBatis App] -- Watches --> A
        JAVA -- Registers --> B
    end

    subgraph "Python Cluster (Ray)"
        RAY_W1[Ray Worker GPU] -- Watches for tasks --> E
        RAY_W1 -- Registers --> C
        RAY_W1 -- Reads DB Config --> A

        RAY_W2[Ray Worker CPU] -- Watches for tasks --> E
        RAY_W2 -- Registers --> D
        RAY_W2 -- Reads DB Config --> A
    end

    User[Client] --> TS_API

    style JAVA fill:#f9f,stroke:#333,stroke-width:2px
    style RAY_W1 fill:#9cf,stroke:#333,stroke-width:2px
    style RAY_W2 fill:#9cf,stroke:#333,stroke-width:2px
    style TS_API fill:#9c9,stroke:#333,stroke-width:2px

etcd 中的数据结构设计:

  • /config/database/main: 存储主数据库的连接信息,格式为 JSON。例如:{"jdbcUrl": "...", "username": "...", "password": "...", "maxPoolSize": 20}
  • /services/registry/java-mybatis-service/{instance-id}: Java 服务的实例注册。值可以包含实例的 IP 和端口。这个键会关联一个租约。
  • /services/registry/ray-workers/{worker-id}: Ray Worker 的实例注册。值是一个描述其能力的 JSON,例如 {"host": "...", "port": ..., "capabilities": ["gpu", "large_memory"]}。同样关联租约。
  • /tasks/dispatch/{task-id}: 任务分发信息。当 TypeScript BFF 决定将一个任务分配给某个 Worker 时,会在这里写入一个值,例如 {"workerId": "worker-gpu-01", "payload": "..."}。每个 Worker 只需监听与自己 ID 相关的任务。

关键代码与原理解析

1. Java 服务: 动态 MyBatis DataSource 配置

Java 服务需要启动时注册自己,并监听数据库配置的变化,以便在不重启应用的情况下动态调整 SqlSessionFactory 使用的 DataSource。我们使用 jetcd 客户端。

// pom.xml dependency
// <dependency>
//     <groupId>io.etcd</groupId>
//     <artifactId>jetcd-core</artifactId>
//     <version>0.7.7</version>
// </dependency>

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.watch.WatchEvent;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.mybatis.spring.SqlSessionTemplate;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import org.json.JSONObject; // Using org.json for simplicity

public class DynamicMyBatisConfigurator {

    private static final String ETCD_ENDPOINTS = "http://etcd-node1:2379";
    private static final String DB_CONFIG_KEY = "/config/database/main";
    
    private final Client etcdClient;
    private final AtomicReference<SqlSessionFactory> sqlSessionFactoryRef = new AtomicReference<>();

    public DynamicMyBatisConfigurator() {
        this.etcdClient = Client.builder().endpoints(ETCD_ENDPOINTS).build();
    }

    public void initialize() {
        System.out.println("Initializing dynamic MyBatis configurator...");
        // 1. Initial fetch of config
        fetchAndApplyDbConfig();

        // 2. Setup a watch for future changes
        watchDbConfigChanges();
        
        // 3. Register this service instance with a lease (code omitted for brevity)
        // This would involve creating a lease, putting a key with the lease, and keeping it alive.
    }

    private void fetchAndApplyDbConfig() {
        try {
            etcdClient.getKVClient().get(ByteSequence.from(DB_CONFIG_KEY, StandardCharsets.UTF_8))
                .thenAccept(response -> {
                    if (response.getKvs().isEmpty()) {
                        System.err.println("FATAL: Database config key not found in etcd: " + DB_CONFIG_KEY);
                        // In a real app, this should prevent startup
                        return;
                    }
                    String configJson = response.getKvs().get(0).getValue().toString(StandardCharsets.UTF_8);
                    System.out.println("Fetched initial DB config: " + configJson);
                    updateSqlSessionFactory(configJson);
                }).join(); // Block for initial setup
        } catch (Exception e) {
            // Critical error, should handle properly
            throw new RuntimeException("Failed to fetch initial config from etcd", e);
        }
    }
    
    private void watchDbConfigChanges() {
        Watch.Watcher watcher = etcdClient.getWatchClient().watch(
            ByteSequence.from(DB_CONFIG_KEY, StandardCharsets.UTF_8)
        );
        
        // Run in a background thread
        Thread watchThread = new Thread(() -> {
            System.out.println("Started watching key: " + DB_CONFIG_KEY);
            try {
                while (true) {
                    for (WatchEvent event : watcher.listen().getEvents()) {
                        if (event.getEventType() == WatchEvent.EventType.PUT) {
                            String newConfigJson = event.getKeyValue().getValue().toString(StandardCharsets.UTF_8);
                            System.out.println("Detected DB config change. New config: " + newConfigJson);
                            updateSqlSessionFactory(newConfigJson);
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Watch thread interrupted.");
            } catch (Exception e) {
                System.err.println("Error in etcd watch loop: " + e.getMessage());
                // Implement reconnection logic here
            }
        });
        watchThread.setDaemon(true);
        watchThread.start();
    }

    private synchronized void updateSqlSessionFactory(String configJson) {
        System.out.println("Attempting to update SqlSessionFactory...");
        JSONObject config = new JSONObject(configJson);

        // Use a robust connection pool like HikariCP
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(config.getString("jdbcUrl"));
        hikariConfig.setUsername(config.getString("username"));
        hikariConfig.setPassword(config.getString("password"));
        hikariConfig.setMaximumPoolSize(config.getInt("maxPoolSize"));
        hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver"); // Example

        HikariDataSource newDataSource = new HikariDataSource(hikariConfig);
        
        // This part is crucial: create a new environment and factory
        org.apache.ibatis.mapping.Environment environment = 
            new org.apache.ibatis.mapping.Environment("production", new JdbcTransactionFactory(), newDataSource);
        
        org.apache.ibatis.session.Configuration mybatisConfig = new org.apache.ibatis.session.Configuration(environment);
        // It's critical to add mappers to the new configuration
        // mybatisConfig.addMapper(YourMapper.class);
        
        SqlSessionFactory newFactory = new SqlSessionFactoryBuilder().build(mybatisConfig);

        SqlSessionFactory oldFactory = sqlSessionFactoryRef.getAndSet(newFactory);

        // Gracefully shut down the old data source and its connection pool
        if (oldFactory != null) {
            HikariDataSource oldDataSource = (HikariDataSource) oldFactory.getConfiguration().getEnvironment().getDataSource();
            System.out.println("Closing old HikariCP data source...");
            oldDataSource.close();
        }
        System.out.println("SqlSessionFactory updated successfully.");
    }
    
    public SqlSessionTemplate getSqlSessionTemplate() {
        // The rest of the application uses this method to get the current session
        return new SqlSessionTemplate(sqlSessionFactoryRef.get());
    }
}
  • 设计要点: 这里的核心挑战在于安全地替换正在使用的 DataSource。我们通过 AtomicReference 来原子性地替换 SqlSessionFactory 实例。每次配置变更时,创建一个全新的 HikariDataSourceSqlSessionFactory,然后替换旧的。最关键的一步是安全地关闭旧的 DataSource 连接池,以释放数据库连接。这是一个生产级实现必须考虑的问题。

2. Python/Ray Worker: 注册能力并监听任务

Ray Worker 启动后,需要向 etcd 注册自己的能力(如是否有 GPU),并监听分配给自己的任务。

# requirements.txt
# ray
# etcd3

import etcd3
import time
import uuid
import json
import threading
import ray

ETCD_HOST = 'etcd-node1'
ETCD_PORT = 2379
SERVICE_PREFIX = '/services/registry/ray-workers/'
TASK_DISPATCH_PREFIX = '/tasks/dispatch/'

@ray.remote(num_gpus=1) # Example for a GPU worker
class GPUTaskActor:
    def __init__(self):
        self.worker_id = f"worker-gpu-{uuid.uuid4()}"
        self.capabilities = {"host": "127.0.0.1", "capabilities": ["gpu", "16gb_ram"]}
        self.etcd = etcd3.client(host=ETCD_HOST, port=ETCD_PORT)
        self.lease = None
        self.shutdown_event = threading.Event()
        
        print(f"[{self.worker_id}] Initializing...")
        self._register_with_lease()
        self._start_watching_tasks()
    
    def _register_with_lease(self):
        """Register the worker with a 10-second lease and start a keep-alive thread."""
        try:
            # Lease expires if not refreshed in 10 seconds
            self.lease = self.etcd.lease(10)
            key = f"{SERVICE_PREFIX}{self.worker_id}"
            value = json.dumps(self.capabilities)
            self.etcd.put(key, value, self.lease)
            
            # Start keep-alive in a background thread
            def keep_alive():
                while not self.shutdown_event.is_set():
                    try:
                        self.lease.refresh()
                        time.sleep(5) # Refresh every 5 seconds
                    except Exception as e:
                        print(f"[{self.worker_id}] Failed to refresh lease: {e}. Exiting.")
                        break
            
            threading.Thread(target=keep_alive, daemon=True).start()
            print(f"[{self.worker_id}] Registered successfully with key: {key}")
        except Exception as e:
            print(f"[{self.worker_id}] FATAL: Could not register with etcd: {e}")
            # Handle failure, maybe retry or exit
            raise

    def _start_watching_tasks(self):
        """Watch for tasks assigned specifically to this worker."""
        def watch_loop():
            watch_key_prefix = f"{TASK_DISPATCH_PREFIX}"
            print(f"[{self.worker_id}] Started watching for tasks on prefix: {watch_key_prefix}")
            
            try:
                events_iterator, cancel = self.etcd.watch_prefix(watch_key_prefix)
                for event in events_iterator:
                    # We are only interested in PUT events
                    if isinstance(event, etcd3.events.PutEvent):
                        try:
                            task_data = json.loads(event.value)
                            if task_data.get('workerId') == self.worker_id:
                                print(f"[{self.worker_id}] Received task: {event.key.decode()}")
                                self.process_task(task_data['payload'])
                                # Acknowledge by deleting the task key
                                self.etcd.delete(event.key)
                        except json.JSONDecodeError:
                            print(f"[{self.worker_id}] Invalid JSON for task {event.key.decode()}")
                        except Exception as proc_e:
                            print(f"[{self.worker_id}] Error processing task: {proc_e}")
            except Exception as e:
                print(f"[{self.worker_id}] Watch loop error: {e}")

        threading.Thread(target=watch_loop, daemon=True).start()

    def process_task(self, payload):
        """The actual work is done here."""
        # This is where the worker would read DB config from etcd,
        # connect to the database via MyBatis service, and run computation.
        print(f"[{self.worker_id}] Processing payload: {payload}. Simulating work...")
        time.sleep(2) # Simulate work
        print(f"[{self.worker_id}] Finished processing payload.")
        
    def stop(self):
        """Graceful shutdown."""
        print(f"[{self.worker_id}] Shutting down...")
        self.shutdown_event.set()
        if self.lease:
            self.lease.revoke()
        self.etcd.close()

# Main execution block
if __name__ == "__main__":
    ray.init()
    actor = GPUTaskActor.remote()
    try:
        # Keep the main thread alive to let the actor run
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        ray.get(actor.stop.remote())
  • 设计要点: 使用租约(Lease)是此处的关键。它确保了当 Worker 进程崩溃或失联时,其在 etcd 中的注册信息会自动被清理,避免了 TypeScript BFF 将任务分发给一个“僵尸”节点。任务监听逻辑通过 watch_prefix 来实现,每个 Worker 只处理 workerId 与自身匹配的任务,这是一种简单有效的任务定向分发机制。

3. TypeScript BFF: 发现服务并动态分发任务

BFF 层接收外部请求,然后查询 etcd 找到合适的 Worker,并将任务信息写入 etcd 中。

// package.json dependencies:
// "express": "^4.18.2",
// "etcd3": "^1.1.2"

import express, { Request, Response } from 'express';
import { Etcd3, IKeyValue } from 'etcd3';

const app = express();
app.use(express.json());

const ETCD_HOSTS = 'etcd-node1:2379';
const WORKER_REGISTRY_PREFIX = '/services/registry/ray-workers/';
const TASK_DISPATCH_PREFIX = '/tasks/dispatch/';

const etcd = new Etcd3({ hosts: ETCD_HOSTS });

interface WorkerInfo {
    id: string;
    host: string;
    capabilities: string[];
}

// A simple in-memory cache for available workers to reduce etcd lookups.
// In a real app, this should be populated and updated via a watch.
let availableWorkers: WorkerInfo[] = [];

// This function should be part of a background process that watches the worker registry
async function refreshWorkerCache() {
    console.log('Refreshing worker cache...');
    try {
        const workersKVs = await etcd.get(WORKER_REGISTRY_PREFIX).prefix();
        const workerList: WorkerInfo[] = [];

        for (const key in workersKVs) {
            const id = key.replace(WORKER_REGISTRY_PREFIX, '');
            try {
                const info = JSON.parse(workersKVs[key]);
                workerList.push({
                    id,
                    host: info.host,
                    capabilities: info.capabilities || [],
                });
            } catch (e) {
                console.error(`Failed to parse worker info for ${id}`, e);
            }
        }
        availableWorkers = workerList;
        console.log('Worker cache updated:', availableWorkers.map(w => w.id));
    } catch (error) {
        console.error('Failed to refresh worker cache from etcd:', error);
    }
}

// A simple scheduler: find a worker with the required capability.
function findCapableWorker(requiredCapability: string): WorkerInfo | null {
    const capableWorkers = availableWorkers.filter(w =>
        w.capabilities.includes(requiredCapability)
    );
    if (capableWorkers.length === 0) {
        return null;
    }
    // Simple round-robin or random selection for load balancing
    return capableWorkers[Math.floor(Math.random() * capableWorkers.length)];
}


app.post('/api/tasks', async (req: Request, res: Response) => {
    const { payload, requires } = req.body; // e.g., requires: "gpu"

    if (!payload || !requires) {
        return res.status(400).send({ error: 'Payload and requires field are mandatory.' });
    }

    const selectedWorker = findCapableWorker(requires);

    if (!selectedWorker) {
        return res.status(503).send({ error: `No available workers with capability: ${requires}` });
    }

    const taskId = `task-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
    const taskDispatchKey = `${TASK_DISPATCH_PREFIX}${taskId}`;
    const taskData = {
        workerId: selectedWorker.id,
        payload: payload,
        submittedAt: new Date().toISOString()
    };

    try {
        // Write the task to etcd for the selected worker to pick up
        await etcd.put(taskDispatchKey).value(JSON.stringify(taskData));
        console.log(`Dispatched task ${taskId} to worker ${selectedWorker.id}`);
        
        // A robust implementation would also watch for this task key to be deleted (acknowledged)
        // and handle timeouts if it's not processed within a certain SLO.
        
        return res.status(202).send({
            message: 'Task accepted and dispatched.',
            taskId: taskId,
            dispatchedTo: selectedWorker.id
        });
    } catch (error) {
        console.error(`Failed to dispatch task ${taskId} to etcd`, error);
        return res.status(500).send({ error: 'Failed to communicate with the control plane.' });
    }
});

const PORT = 3000;
app.listen(PORT, () => {
    console.log(`BFF server running on port ${PORT}`);
    // Periodically refresh the cache. For production, a watch is much better.
    setInterval(refreshWorkerCache, 15000);
    refreshWorkerCache();
});
  • 设计要点: TypeScript BFF 的核心职责是“调度”。它查询 etcd 获取当前可用的 Worker 及其能力,根据任务需求(例如 requires: "gpu")选择一个合适的 Worker,然后将任务信息写入一个该 Worker 正在监听的 etcd Key 下。为了提高性能,代码中实现了一个简单的 Worker 缓存。在生产环境中,这个缓存应该由一个持续的 etcd watch 机制来实时更新,而不是周期性轮询。

架构的扩展性与局限性

这种基于 etcd 的控制平面架构具有很高的扩展性。当需要引入一个新的 Go 服务时,只需为其实现相应的 etcd 注册和发现逻辑即可,现有服务无需任何改动。当需要增加一种新型的 Ray Worker(例如,有 TPU 的),只需让新 Worker 注册自己的 tpu 能力,BFF 的调度逻辑就能自动发现并使用它。

然而,该架构也存在一些局限性和需要注意的边界:

  1. etcd 不是消息总线: 必须严格遵守 etcd 作为控制平面元数据存储的定位。任何大的业务数据、任务载荷(payload)都不应该直接存储在 etcd 中。正确的做法是在 etcd 中传递数据的引用,例如一个对象存储的路径或数据库中的记录ID。etcd 对 value 大小有限制,且大量写入会影响其 Raft 集群的性能。
  2. 对 etcd 的强依赖: 整个系统的稳定性和实时性现在高度依赖 etcd 集群的健康。etcd 集群的部署、监控和灾备变得至关重要。任何到 etcd 的网络抖动都可能导致服务注册失败或配置更新延迟。
  3. 实现复杂性: 虽然概念清晰,但在每个服务中正确实现租约保活、watch 断线重连、数据解析的健壮性等逻辑,需要投入相当的工程精力。这比简单地读一个静态配置文件要复杂得多。
  4. 最终一致性窗口: 尽管 etcd 本身是强一致的,但从一个服务更新 etcd 到另一个服务通过 watch 收到更新之间,存在一个短暂的时间窗口。在设计业务逻辑时,必须考虑到这个短暂的异步延迟。

对于面临跨语言、动态环境、服务能力频繁变化的 Polyrepo 组织而言,构建这样一个控制平面所带来的灵活性和自动化水平,远超过其增加的实现复杂性。


  目录