构建基于 Presto 查询的 Kong 动态路由插件并集成 SkyWalking 全链路追踪


静态的路由规则已经无法满足我们日益复杂的金丝雀发布和 A/B 测试需求。业务方希望能够依据存储在数据湖中的用户画像数据(例如用户等级、所在城市、历史订单行为),实时地将流量动态切分到不同的上游服务版本。传统的基于 Header 或权重的路由方式,其决策信息源有限,而我们的核心决策数据全部沉淀在由 Presto (Trino) 统一查询的数仓中。这就带来了一个直接的技术挑战:如何在纳秒级响应的 API 网关层,去查询一个分钟级响应的大数据查询引擎,并以此作为动态路由的依据。

最初的构想是在 Kong 网关前置一个独立的路由决策服务,但这会引入额外的网络跳数和单点故障风险。最理想的方案是将决策逻辑内聚到 Kong 自身。Kong 的插件机制提供了这种可能。我们决定使用 Go 语言开发一个自定义插件,因为它在网络编程和并发处理上相比 Lua 更具优势,且拥有更成熟的生态来对接 Presto 和 SkyWalking。

项目的核心目标是创建一个名为 presto-router 的插件,它会在 access 阶段拦截请求,根据配置的模板从请求中(如 JWT Claim)提取关键信息,构造 Presto SQL 查询,获取目标上游地址,最后将请求动态代理到该地址。整个过程必须是高性能且完全可观测的。

第一阶段:插件骨架与 Presto 直连

我们首先搭建插件的基础结构。一个 Kong Go 插件需要实现几个核心方法,其中 Access 是我们逻辑的入口点。配置结构 PrestoRouterConfig 定义了插件所需的所有参数,包括 Presto 连接信息和查询模板。

// ./kong/plugins/presto-router/main.go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"strings"
	"text/template"

	"github.com/Kong/go-pdk"
	"github.com/prestodb/presto-go-client/presto"
	// 导入其他必要的包
)

// Config 定义了插件的配置项
type PrestoRouterConfig struct {
	PrestoHost       string `json:"presto_host"`
	PrestoUser       string `json:"presto_user"`
	PrestoCatalog    string `json:"presto_catalog"`
	PrestoSchema     string `json:"presto_schema"`
	QueryTemplate    string `json:"query_template"`
	TargetKeyField   string `json:"target_key_field"` // 从请求中提取决策因子的字段,例如 "jwt.user_id"
	CacheTTL         int    `json:"cache_ttl_seconds"`
	RequestTimeout   int    `json:"request_timeout_ms"`
}

// New 插件初始化函数
func New() interface{} {
	return &PrestoRouterConfig{}
}

// 全局 Presto 数据库连接实例
var db *sql.DB

// Access 插件的核心逻辑
func (conf PrestoRouterConfig) Access(kong *pdk.PDK) {
	// 首次调用时初始化数据库连接
	if db == nil {
		dsn := fmt.Sprintf("http://%s@%s?catalog=%s&schema=%s",
			conf.PrestoUser, conf.PrestoHost, conf.PrestoCatalog, conf.PrestoSchema)
		
		var err error
		db, err = sql.Open("presto", dsn)
		if err != nil {
			kong.Log.Err("Failed to connect to Presto: ", err.Error())
			kong.Response.Exit(500, []byte("Internal Server Error: Presto connection failed"), nil)
			return
		}
		// 在真实项目中,连接池参数需要精细化配置
		db.SetMaxOpenConns(20)
		db.SetMaxIdleConns(5)
	}

	// 1. 从请求中提取决策因子
	key, err := extractKeyFromRequest(kong, conf.TargetKeyField)
	if err != nil {
		kong.Log.Err("Failed to extract key: ", err.Error())
		kong.Response.Exit(400, []byte("Bad Request: Missing routing key"), nil)
		return
	}
	
	// 2. 构造 SQL 查询
	sql, err := buildSQL(conf.QueryTemplate, key)
	if err != nil {
		kong.Log.Err("Failed to build SQL query: ", err.Error())
		kong.Response.Exit(500, []byte("Internal Server Error: SQL template error"), nil)
		return
	}

	// 3. 执行查询
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(conf.RequestTimeout)*time.Millisecond)
	defer cancel()

	var targetUpstream string
	err = db.QueryRowContext(ctx, sql).Scan(&targetUpstream)
	if err != nil {
		// 如果查询无结果或出错,可能需要一个 fallback 机制,这里简化为报错
		if err == sql.ErrNoRows {
			kong.Log.Warn("No route found for key: ", key)
			kong.Response.Exit(404, []byte("Not Found: No route target determined"), nil)
			return
		}
		kong.Log.Err("Presto query failed: ", err.Error())
		kong.Response.Exit(503, []byte("Service Unavailable: Route decision failed"), nil)
		return
	}
	
	// 4. 设置动态上游
	host, port, err := parseHostPort(targetUpstream)
	if err != nil {
		kong.Log.Err("Invalid upstream format from Presto: ", err.Error())
		kong.Response.Exit(500, []byte("Internal Server Error: Invalid upstream format"), nil)
		return
	}
	
	kong.Service.SetTarget(host, port)
}


// extractKeyFromRequest 从请求的不同位置(header, jwt)提取决策因子
// 这是一个简化的实现
func extractKeyFromRequest(kong *pdk.PDK, field string) (string, error) {
	parts := strings.Split(field, ".")
	if len(parts) != 2 {
		return "", fmt.Errorf("invalid target_key_field format: %s", field)
	}

	source, key := parts[0], parts[1]
	switch source {
	case "header":
		return kong.Request.GetHeader(key)
	// 在真实项目中,需要引入JWT库来解析
	// case "jwt":
	// 	 ...
	default:
		return "", fmt.Errorf("unsupported key source: %s", source)
	}
}

// buildSQL 使用模板和 key 构建最终的 SQL
func buildSQL(queryTemplate, key string) (string, error) {
	tmpl, err := template.New("sql").Parse(queryTemplate)
	if err != nil {
		return "", err
	}
	var buf strings.Builder
	// 这里的 key 必须经过严格的清理,防止SQL注入,
	// 更好的方式是使用参数化查询,但Presto Go客户端对此支持不完善。
	// 在此场景下,我们假设 key 是内部可信的ID。
	data := map[string]string{"Key": key}
	err = tmpl.Execute(&buf, data)
	return buf.String(), err
}

// ... 其他辅助函数如 parseHostPort

这个初版实现了核心流程,但存在一个致命问题:每次请求都会同步查询 Presto。Presto 的查询通常是秒级甚至分钟级的,这会直接导致网关的延迟飙升到不可接受的程度。生产环境绝不能如此设计。

第二阶段:引入缓存与策略模式

为了解决性能瓶颈,引入缓存是必然选择。我们使用 go-cache 作为本地内存缓存。每次查询前先检查缓存,缓存命中则直接返回,否则再查询 Presto 并将结果写入缓存。

同时,我们注意到 extractKeyFromRequest 的逻辑可能会变得复杂。业务可能要求从 JWT Claim、请求体或 Header 中提取 key。这里是策略模式(Strategy Pattern)的典型应用场景。我们定义一个 KeyExtractor 接口,并为每种提取方式实现一个具体的策略。

// ./kong/plugins/presto-router/key_extractor.go
package main

import (
	"fmt"
	"strings"
	"github.com/Kong/go-pdk"
)

// KeyExtractor 定义了从请求中提取决策因子的策略接口
type KeyExtractor interface {
	Extract(kong *pdk.PDK) (string, error)
}

// NewKeyExtractor 是一个工厂函数,根据配置创建具体的策略实例
func NewKeyExtractor(configField string) (KeyExtractor, error) {
	parts := strings.Split(configField, ".")
	if len(parts) != 2 {
		return nil, fmt.Errorf("invalid target_key_field format: %s", configField)
	}
	source, key := parts[0], parts[1]
	
	switch source {
	case "header":
		return &HeaderExtractor{HeaderName: key}, nil
	// case "jwt":
	// 	return &JwtExtractor{ClaimName: key}, nil
	default:
		return nil, fmt.Errorf("unsupported key source: %s", source)
	}
}

// HeaderExtractor 从请求头提取 key
type HeaderExtractor struct {
	HeaderName string
}

func (e *HeaderExtractor) Extract(kong *pdk.PDK) (string, error) {
	return kong.Request.GetHeader(e.HeaderName)
}

// JwtExtractor 从JWT Claim中提取 key (示例结构)
// type JwtExtractor struct {
// 	ClaimName string
// }
// func (e *JwtExtractor) Extract(kong *pdk.PDK) (string, error) {
// 	// ... JWT解析逻辑 ...
// }

修改主逻辑,集成缓存和策略模式:

// ./kong/plugins/presto-router/main.go (修改后)
import (
	// ...
	"github.com/patrickmn/go-cache"
	"time"
)

// ... Config 结构体不变 ...

var (
	db    *sql.DB
	c     *cache.Cache
	extractor KeyExtractor
)


func (conf PrestoRouterConfig) Access(kong *pdk.PDK) {
	// 仅在首次请求时进行初始化
	once.Do(func() {
		initPluginState(conf, kong)
	})

	if db == nil || extractor == nil {
		// 初始化失败,直接退出
		kong.Response.Exit(500, []byte("Internal Server Error: Plugin not initialized"), nil)
		return
	}

	// 1. 使用策略模式提取 Key
	key, err := extractor.Extract(kong)
	if err != nil {
		kong.Log.Err("Failed to extract key: ", err.Error())
		kong.Response.Exit(400, []byte("Bad Request: Missing routing key"), nil)
		return
	}
	
	// 2. 检查缓存
	if upstream, found := c.Get(key); found {
		host, port, _ := parseHostPort(upstream.(string))
		kong.Service.SetTarget(host, port)
		// 在 header 中注入一个标志,方便调试
		kong.Response.SetHeader("X-Routing-Source", "cache")
		return
	}

	// 3. 缓存未命中,查询 Presto
	// ... (buildSQL 和 db.QueryRowContext 逻辑不变) ...
	
	// 查询成功后,将结果写入缓存
	c.Set(key, targetUpstream, cache.Default)
	kong.Response.SetHeader("X-Routing-Source", "presto")

	// ... (设置动态上游的逻辑不变) ...
}


var once sync.Once

func initPluginState(conf PrestoRouterConfig, kong *pdk.PDK) {
	dsn := fmt.Sprintf("http://%s@%s?catalog=%s&schema=%s",
		conf.PrestoUser, conf.PrestoHost, conf.PrestoCatalog, conf.PrestoSchema)
	
	var err error
	db, err = sql.Open("presto", dsn)
	if err != nil {
		kong.Log.Err("Failed to connect to Presto: ", err.Error())
		return
	}
	db.SetMaxOpenConns(20)
	db.SetMaxIdleConns(5)
	
	c = cache.New(time.Duration(conf.CacheTTL)*time.Second, 10*time.Minute)
	
	extractor, err = NewKeyExtractor(conf.TargetKeyField)
	if err != nil {
		kong.Log.Err("Failed to create key extractor: ", err.Error())
		return
	}
}

通过引入本地缓存和策略模式,我们解决了性能问题,并让插件的扩展性变得更好。但现在我们面临一个新的问题:当路由决策失败或 Presto 查询变慢时,我们如何快速定位问题?整个决策链路是一个黑盒。

第三阶段:集成 SkyWalking 实现全链路可观测

为了打开这个黑盒,我们引入 SkyWalking。通过 go2sky 客户端,我们可以在插件内部创建 Span,将路由决策过程完整地串联到全链路追踪中。这里的关键是,我们不仅要追踪插件本身的执行,更要为对 Presto 的查询创建一个专门的 ExitSpan,这样在 SkyWalking 的 UI 上就能清晰地看到这次 DB 查询的耗时、SQL 语句等关键信息。

这里可以用装饰器模式(Decorator Pattern)来无侵入地为 Presto 查询增加追踪能力。我们创建一个 TracingPrestoClient 来包装原有的 *sql.DB

// ./kong/plugins/presto-router/tracing.go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	"github.com/SkyAPM/go2sky"
	"github.com/SkyAPM/go2sky/reporter"
	v3 "github.com/SkyAPM/go2sky/reporter/grpc/native"
)

// 全局 Tracer
var tracer *go2sky.Tracer

// InitTracer 初始化 SkyWalking Tracer
func InitTracer(serviceName, oapServer string) error {
	r, err := reporter.NewGRPCReporter(oapServer)
	if err != nil {
		return fmt.Errorf("create skywalking reporter error: %w", err)
	}

	tracer, err = go2sky.NewTracer(serviceName, go2sky.WithReporter(r))
	if err != nil {
		return fmt.Errorf("create skywalking tracer error: %w", err)
	}
	return nil
}

// TracingPrestoClient 是一个装饰器,为 Presto 查询添加追踪能力
type TracingPrestoClient struct {
	db *sql.DB
	prestoHost string
}

// QueryRowContextWithTracing 执行带追踪的查询
func (c *TracingPrestoClient) QueryRowContextWithTracing(ctx context.Context, query string) (*sql.Row, error) {
	// 从上下文中提取父 Span,如果没有,则创建一个新的 EntrySpan
	span, ctx, err := tracer.CreateExitSpan(ctx, "Presto/Query", c.prestoHost, func(header string) error {
		// Presto Go Client 不支持注入 header, 这里是一个 conceptual point
		return nil
	})
	if err != nil {
		return c.db.QueryRowContext(ctx, query), err
	}
	defer span.End()

	span.SetComponent(20) // Component ID for PrestoDB
	span.SetSpanLayer(v3.SpanLayer_Database)
	span.Tag(go2sky.TagDBStatement, query)
	span.Tag(go2sky.TagDBType, "Presto")

	row := c.db.QueryRowContext(ctx, query)
	if err := row.Err(); err != nil {
		span.Error(time.Now(), err.Error())
	}
	
	return row, nil
}

现在,我们将追踪逻辑集成到主流程中:

// ./kong/plugins/presto-router/main.go (最终版)

// ...

// 定义一个新的全局变量
var tracingClient *TracingPrestoClient

func (conf PrestoRouterConfig) Access(kong *pdk.PDK) {
	// 初始化逻辑中增加 Tracer 和 TracingClient 的初始化
	once.Do(func() {
		// ... 其他初始化 ...
		err := InitTracer("kong-presto-router", conf.SkywalkingOAP) // 假设配置中有 SkywalkingOAP
		if err != nil {
			kong.Log.Err("Failed to init SkyWalking tracer: ", err.Error())
		}
		tracingClient = &TracingPrestoClient{db: db, prestoHost: conf.PrestoHost}
	})
	
	// 从请求头中提取 SkyWalking 的上下文
	carrier, _ := kong.Request.GetHeader("sw8")
	span, ctx, err := tracer.CreateEntrySpan(context.Background(), "/kong/presto-router", func() (string, error) {
		return carrier, nil
	})
	if err != nil {
		kong.Log.Warn("Failed to create entry span: ", err.Error())
	} else {
		defer span.End()
		span.SetComponent(3001) // Component ID for Kong
		span.SetSpanLayer(v3.SpanLayer_Http)
	}

	// ... 缓存检查逻辑不变 ...

	// 缓存未命中,使用带追踪的客户端查询 Presto
	row, err := tracingClient.QueryRowContextWithTracing(ctx, sql)
	if err != nil {
		kong.Log.Err("Presto query wrapper error: ", err.Error())
		// ... 错误处理 ...
		return
	}
	
	var targetUpstream string
	err = row.Scan(&targetUpstream)

	// ... 后续逻辑 ...
}

部署与配置

要让这个 Go 插件运行,需要构建一个自定义的 Kong Docker 镜像。

Dockerfile:

FROM kong:3.4

USER root

# 设置 Go 环境变量
ENV GO111MODULE=on
ENV GOPROXY=https://goproxy.cn,direct

# 安装 Go
RUN apt-get update && apt-get install -y --no-install-recommends \
    golang \
    && rm -rf /var/lib/apt/lists/*

# 将插件源码复制到镜像中
WORKDIR /usr/local/kong/go-plugins/presto-router
COPY . .

# 构建 Go 插件
RUN go mod tidy && go build -buildmode=plugin -o presto-router.so .

# 切换回 Kong 用户
USER kong

kong.conf:

...
go_plugin_dir = /usr/local/kong/go-plugins/presto-router
go_plugins_load = presto-router
...

在 Service 或 Route 上启用插件:

services:
  - name: my-dynamic-service
    url: http://default-upstream-service # 一个默认或备用上游
    plugins:
      - name: presto-router
        config:
          presto_host: "trino.data.svc:8080"
          presto_user: "service-account"
          presto_catalog: "hive"
          presto_schema: "user_profiles"
          query_template: "SELECT target_host FROM user_routing_table WHERE user_id = '{{.Key}}' LIMIT 1"
          target_key_field: "header.X-User-ID"
          cache_ttl_seconds: 300
          request_timeout_ms: 500
          skywalking_oap: "skywalking-oap.observe.svc:11800"

通过这套架构,我们最终实现了一个健壮、高性能且完全可观测的动态路由决策中心。

sequenceDiagram
    participant Client
    participant Kong
    participant PrestoRouterPlugin
    participant LocalCache
    participant Presto
    participant UpstreamA
    participant UpstreamB
    participant SkyWalking

    Client->>+Kong: Request (Header: X-User-ID=123)
    Kong->>+PrestoRouterPlugin: Access Phase
    Note over PrestoRouterPlugin: SkyWalking EntrySpan Start
    PrestoRouterPlugin->>LocalCache: Get("123")
    LocalCache-->>PrestoRouterPlugin: Not Found
    
    Note over PrestoRouterPlugin: SkyWalking ExitSpan for Presto Start
    PrestoRouterPlugin->>+Presto: SELECT target_host...WHERE user_id='123'
    Presto-->>-PrestoRouterPlugin: "service-a.prod:8080"
    Note over PrestoRouterPlugin: SkyWalking ExitSpan End
    
    PrestoRouterPlugin->>LocalCache: Set("123", "service-a.prod:8080")
    PrestoRouterPlugin-->>-Kong: Set Target to service-a.prod:8080
    
    Kong->>+UpstreamA: Proxy Request
    UpstreamA-->>-Kong: Response
    Note over PrestoRouterPlugin: SkyWalking EntrySpan End
    Kong-->>-Client: Response

    %% Subsequent Request %%
    Client->>+Kong: Request (Header: X-User-ID=123)
    Kong->>+PrestoRouterPlugin: Access Phase
    PrestoRouterPlugin->>LocalCache: Get("123")
    LocalCache-->>PrestoRouterPlugin: Found "service-a.prod:8080"
    PrestoRouterPlugin-->>-Kong: Set Target to service-a.prod:8080
    Kong->>+UpstreamA: Proxy Request
    UpstreamA-->>-Kong: Response
    Kong-->>-Client: Response

这个方案的局限性在于,本地缓存是 per-pod 的,在 Kong 节点扩缩容或重启时会丢失,且存在短暂的数据不一致性。未来的优化方向可以考虑引入一个集中的、高可用的缓存层,如 Redis,但这会增加系统的复杂度和延迟。此外,Presto 查询模板目前较为简单,对于更复杂的决策逻辑,可能需要设计一套更灵活的规则引擎。当前的实现依赖于 Presto 的稳定性和性能,一个健壮的熔断和快速失败机制,以及一个可靠的 fallback 路由策略,是生产环境不可或缺的加固措施。


  目录