在 Laravel Monorepo 中利用 AWS SNS 构建类型安全的跨服务事件总线


项目演进到一定阶段,拆分是必然的。我们的 PHP Monorepo 仓库里,原本的单个 Laravel 应用被逐步重构为多个逻辑上独立的 Package,例如 UserServiceOrderServicePaymentService。最初,它们之间的通信依赖于内部的 HTTP API 调用。这种方式简单直接,但在服务数量增多后,问题开始集中爆发:同步调用导致的服务强耦合、链式故障,以及每次新增服务间交互时都需要修改双方代码。

解耦的方案显而易见:引入事件驱动架构(EDA)。但一个真正生产可用的事件系统,远不止是发个消息那么简单。我们需要一个方案,它必须满足以下几个苛刻的条件:

  1. 强类型约束:在一个大型 Monorepo 中,几十个开发者共同协作,必须在代码层面保证事件结构的正确性,避免运行时因为某个字段拼写错误或类型不匹配导致生产故障。
  2. 低接入成本:新服务或新事件的接入应该是标准化的、低心智负担的。开发者应该聚焦于业务逻辑,而不是每次都去处理消息中间件的复杂配置和SDK。
  3. 可观测性与可靠性:消息的发送与消费过程必须有日志、有监控。失败的消息需要有明确的处理策略。
  4. 环境一致性:本地开发、测试环境和生产环境的行为应该尽量一致,同时要避免本地开发严重依赖云资源。

经过一番选型,我们决定围绕 AWS SNS 构建一套内部事件总线。选择 SNS 的原因是它天然的发布-订阅(Pub/Sub)和扇出(Fan-out)能力,一个事件可以被多个下游服务消费,且服务本身是完全托管的,运维成本低。而 Monorepo 的结构,则为我们实现“强类型约束”和“低接入成本”提供了天然的土壤。

整个构建过程的核心,是在 Monorepo 的 packages/ 目录下创建两个基础库:EventContractsEventBus。前者负责定义所有跨服务的事件结构,后者则封装与 AWS SNS 交互的所有细节。

第一步:Monorepo 结构与事件契约定义

我们的 Monorepo 根目录下的 composer.json 通过 repositories 字段来管理内部的 packages,这使得各个服务可以像引用外部依赖一样引用内部库。

// /composer.json
{
    "name": "my-company/monorepo",
    "private": true,
    "repositories": [
        {
            "type": "path",
            "url": "packages/*"
        }
    ],
    "require": {
        "my-company/user-service": "*@dev",
        "my-company/order-service": "*@dev",
        // ... other services
    }
}

目录结构如下:

.
├── packages/
│   ├── EventContracts/  // 共享的事件定义
│   │   ├── src/
│   │   │   └── Events/
│   │   │       └── User/
│   │   │           └── UserCreated.php
│   │   └── composer.json
│   ├── EventBus/        // 事件总线实现
│   │   ├── src/
│   │   └── composer.json
│   └── UserService/     // 用户服务 (事件发布方)
│       ├── src/
│       └── composer.json
└── OrderService/    // 订单服务 (事件消费方)
    ├── src/
    └── composer.json

关键在于 packages/EventContracts。这里是所有跨服务事件的唯一真实来源(Single Source of Truth)。所有事件都被定义为具体的、不可变的 PHP 类。

// /packages/EventContracts/src/Events/User/UserCreated.php

namespace MyCompany\EventContracts\Events\User;

use MyCompany\EventContracts\AbstractDomainEvent;

final class UserCreated extends AbstractDomainEvent
{
    /**
     * @param non-empty-string $userId
     * @param non-empty-string $email
     * @param non-empty-string $name
     * @param \DateTimeImmutable $createdAt
     */
    public function __construct(
        public readonly string $userId,
        public readonly string $email,
        public readonly string $name,
        public readonly \DateTimeImmutable $createdAt,
    ) {}

    /**
     * 定义事件的唯一标识符,用于消息路由
     * @return string
     */
    public static function getEventName(): string
    {
        return 'user.created';
    }

    /**
     * 将事件对象转换为可序列化的数组
     * @return array
     */
    public function toPayload(): array
    {
        return [
            'user_id' => $this->userId,
            'email' => $this->email,
            'name' => $this->name,
            'created_at' => $this->createdAt->format(\DateTimeInterface::RFC3339_EXTENDED),
        ];
    }

    /**
     * 从数组负载重构事件对象
     * @param array $payload
     * @return static
     * @throws \Exception
     */
    public static function fromPayload(array $payload): self
    {
        return new self(
            $payload['user_id'],
            $payload['email'],
            $payload['name'],
            new \DateTimeImmutable($payload['created_at'])
        );
    }
}

// /packages/EventContracts/src/AbstractDomainEvent.php
namespace MyCompany\EventContracts;

abstract class AbstractDomainEvent
{
    abstract public static function getEventName(): string;
    abstract public function toPayload(): array;
    abstract public static function fromPayload(array $payload): self;
}

通过这种方式,任何服务想要发布或消费 UserCreated 事件,都必须依赖 my-company/event-contracts 包。PHP 的类型系统保证了在编译时(或静态分析时)就能发现属性或方法的不匹配,彻底杜绝了因“魔法字符串”或手误导致的生产问题。

第二步:构建 EventBus - 封装发布逻辑

EventBus 包的核心职责是提供一个简单的接口,让任何服务都能发布一个 AbstractDomainEvent 对象,而无需关心背后是 SNS、Kafka 还是 RabbitMQ。

首先,我们定义一个 EventPublisher 接口和它的 SNS 实现。

// /packages/EventBus/src/Interfaces/EventPublisher.php
namespace MyCompany\EventBus\Interfaces;

use MyCompany\EventContracts\AbstractDomainEvent;

interface EventPublisher
{
    public function publish(AbstractDomainEvent $event): void;
}
// /packages/EventBus/src/Publishers/SnsEventPublisher.php
namespace MyCompany\EventBus\Publishers;

use Aws\Sns\SnsClient;
use MyCompany\EventContracts\AbstractDomainEvent;
use MyCompany\EventBus\Interfaces\EventPublisher;
use Psr\Log\LoggerInterface;
use Throwable;

class SnsEventPublisher implements EventPublisher
{
    public function __construct(
        private readonly SnsClient $snsClient,
        private readonly LoggerInterface $logger,
        private readonly string $topicArn // SNS Topic ARN 从配置注入
    ) {}

    public function publish(AbstractDomainEvent $event): void
    {
        try {
            $messageBody = json_encode($event->toPayload(), JSON_THROW_ON_ERROR);

            $this->snsClient->publish([
                'TopicArn' => $this->topicArn,
                'Message' => $messageBody,
                'MessageAttributes' => [
                    'EventType' => [
                        'DataType' => 'String',
                        'StringValue' => $event::getEventName(),
                    ],
                ],
            ]);

            $this->logger->info('Event published successfully.', [
                'event_type' => $event::getEventName(),
                'topic_arn' => $this->topicArn,
            ]);

        } catch (Throwable $e) {
            // 在真实项目中,这里应该有更健壮的错误处理
            // 例如:推送到死信队列、触发告警
            $this->logger->error('Failed to publish event to SNS.', [
                'event_type' => $event::getEventName(),
                'exception' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);

            // 决定是否抛出异常,这取决于业务是否允许消息丢失
            // throw new EventPublishingFailedException('...', 0, $e);
        }
    }
}

这里的关键点在于 MessageAttributes。我们将事件名 user.created 放入消息属性中。这对于消费方至关重要,因为它允许消费方(比如 SQS 订阅)设置过滤策略,只接收自己感兴趣的事件类型,避免不必要的网络流量和计算开销。

为了让这个实现无缝集成到 Laravel 应用中,我们创建了一个 ServiceProvider

// /packages/EventBus/src/EventBusServiceProvider.php
namespace MyCompany\EventBus;

use Aws\Sns\SnsClient;
use Illuminate\Contracts\Foundation\Application;
use Illuminate\Support\ServiceProvider;
use MyCompany\EventBus\Interfaces\EventPublisher;
use MyCompany\EventBus\Publishers\SnsEventPublisher;
use Psr\Log\LoggerInterface;

class EventBusServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->mergeConfigFrom(__DIR__.'/../config/event-bus.php', 'event-bus');

        $this->app->singleton(EventPublisher::class, function (Application $app) {
            $config = $app['config']['event-bus.sns'];

            $snsClient = new SnsClient([
                'version' => 'latest',
                'region' => $config['region'],
                'credentials' => [
                    'key' => $config['key'],
                    'secret' => $config['secret'],
                ],
                // 在生产环境中,推荐使用 IAM Role,而不是硬编码 key/secret
            ]);

            return new SnsEventPublisher(
                $snsClient,
                $app->make(LoggerInterface::class),
                $config['topic_arn']
            );
        });
    }

    public function boot(): void
    {
        $this->publishes([
            __DIR__.'/../config/event-bus.php' => config_path('event-bus.php'),
        ], 'config');
    }
}

现在,在 UserService 中发布一个事件就变得非常简单。首先,UserServicecomposer.json 需要依赖这两个内部包:

// /packages/UserService/composer.json
{
    "name": "my-company/user-service",
    "require": {
        "my-company/event-contracts": "*@dev",
        "my-company/event-bus": "*@dev"
    }
}

然后在业务代码中注入 EventPublisher 接口并使用它:

// /packages/UserService/src/Actions/CreateUserAction.php

use MyCompany\EventBus\Interfaces\EventPublisher;
use MyCompany\EventContracts\Events\User\UserCreated;

class CreateUserAction
{
    public function __construct(private readonly EventPublisher $publisher) {}

    public function execute(string $email, string $name): User
    {
        // ... 创建用户的数据库逻辑 ...
        $user = User::create([...]);

        // 发布领域事件
        $event = new UserCreated(
            $user->id,
            $user->email,
            $user->name,
            new \DateTimeImmutable()
        );
        $this->publisher->publish($event);

        return $user;
    }
}

第三步:消费事件 - 安全的 Webhook 与事件路由

事件消费是整个链条中最复杂也最容易出错的一环。SNS 支持将消息推送到 HTTPS 端点。我们需要在消费方服务(如 OrderService)中创建一个统一的、安全的 Webhook 来接收所有来自 SNS 的通知。

这个 Webhook 必须处理两件核心任务:

  1. 验证请求来源:确保请求确实来自 AWS SNS,而不是伪造的恶意请求。
  2. 消息路由:根据消息内容,将其分发给正确的业务逻辑处理器。

为此,我们在 EventBus 包中构建一个可复用的 SnsWebhookController

// /packages/EventBus/src/Http/Controllers/SnsWebhookController.php
namespace MyCompany\EventBus\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Http\Response;
use Illuminate\Routing\Controller;
use Illuminate\Support\Facades\Log;
use GuzzleHttp\Client;
use MyCompany\EventBus\Sns\Message;
use MyCompany\EventBus\Sns\MessageValidator;
use MyCompany\EventBus\Sns\Exceptions\InvalidMessageSignatureException;
use MyCompany\EventBus\Routing\EventRouter;

class SnsWebhookController extends Controller
{
    public function __construct(
        private readonly MessageValidator $validator,
        private readonly EventRouter $router
    ) {}

    public function handle(Request $request): Response
    {
        try {
            $message = Message::fromRequest($request);
            $this->validator->validate($message);
        } catch (InvalidMessageSignatureException $e) {
            Log::warning('Invalid SNS message signature received.', ['error' => $e->getMessage()]);
            return response('Invalid signature.', 400);
        } catch (\Throwable $e) {
            Log::error('Failed to parse SNS message.', ['error' => $e->getMessage()]);
            return response('Invalid message format.', 400);
        }

        // 处理订阅确认
        if ($message->isSubscriptionConfirmation()) {
            return $this->confirmSubscription($message);
        }

        // 处理通知消息
        if ($message->isNotification()) {
            try {
                $this->router->route($message);
                return response('OK', 200);
            } catch (\Throwable $e) {
                // 如果路由或处理失败,返回服务器错误
                // SNS会根据Topic的重试策略进行重试
                Log::error('Failed to process SNS notification.', [
                    'message_id' => $message->getId(),
                    'error' => $e->getMessage(),
                ]);
                return response('Processing failed.', 500);
            }
        }

        Log::info('Received unhandled SNS message type.', ['type' => $message->getType()]);
        return response('Message type not handled.', 200);
    }

    private function confirmSubscription(Message $message): Response
    {
        try {
            // 必须访问 SubscribeURL 来确认订阅
            (new Client())->get($message->getSubscriptionUrl());
            Log::info('SNS subscription confirmed.', ['topic' => $message->getTopicArn()]);
            return response('Subscription confirmed.', 200);
        } catch (\Throwable $e) {
            Log::error('Failed to confirm SNS subscription.', [
                'url' => $message->getSubscriptionUrl(),
                'error' => $e->getMessage(),
            ]);
            return response('Could not confirm subscription.', 500);
        }
    }
}

签名验证是安全性的基石,绝不能省略。MessageValidator 的实现细节较为繁琐,它需要下载 AWS 提供的公钥证书,并验证消息签名的有效性。这是一个常见的模式,但代码必须严谨。

sequenceDiagram
    participant AWS_SNS
    participant OrderService_Webhook
    participant MessageValidator
    participant EventRouter
    participant UserCreatedListener

    AWS_SNS ->> OrderService_Webhook: POST /sns-webhook (Notification)
    OrderService_Webhook ->> MessageValidator: validate(message)
    MessageValidator ->> AWS_SNS: Download public key certificate (cached)
    MessageValidator -->> OrderService_Webhook: Signature is valid
    OrderService_Webhook ->> EventRouter: route(message)
    EventRouter ->> EventRouter: Get 'EventType' attribute ("user.created")
    EventRouter ->> EventRouter: Find listener for "user.created"
    EventRouter ->> EventRouter: Deserialize message body into UserCreated object
    EventRouter ->> UserCreatedListener: handle(UserCreated event)
    UserCreatedListener -->> EventRouter: Logic executed successfully
    EventRouter -->> OrderService_Webhook: OK
    OrderService_Webhook -->> AWS_SNS: HTTP 200 OK

EventRouter 的作用是将 SNS 消息体转换为我们定义的强类型 AbstractDomainEvent 对象,然后分发给 Laravel 的事件系统。

// /packages/EventBus/src/Routing/EventRouter.php
namespace MyCompany\EventBus\Routing;

use Illuminate\Contracts\Events\Dispatcher;
use MyCompany\EventBus\Sns\Message;
use MyCompany\EventContracts\AbstractDomainEvent;

class EventRouter
{
    /** @var array<string, class-string<AbstractDomainEvent>> */
    private array $eventMap;

    public function __construct(
        private readonly Dispatcher $laravelDispatcher,
        array $eventMap
    ) {
        $this->eventMap = $eventMap;
    }

    public function route(Message $message): void
    {
        $eventType = $message->getEventType();
        if (!isset($this->eventMap[$eventType])) {
            // 在真实项目中,这可能是一个需要监控的指标
            // 表明有未知的事件类型被发送
            \Illuminate\Support\Facades\Log::warning('Unknown event type received.', ['type' => $eventType]);
            return;
        }

        $eventClass = $this->eventMap[$eventType];
        $payload = $message->getBody();

        /** @var AbstractDomainEvent $event */
        $event = $eventClass::fromPayload($payload);
        
        // 分发给 Laravel 的本地事件监听器
        $this->laravelDispatcher->dispatch($event);
    }
}

这个 eventMap 配置是连接外部消息和内部代码的桥梁。它在消费方服务的 ServiceProvider 中进行配置。

现在,在 OrderService 中,我们需要做以下几件事:

  1. 依赖 event-contractsevent-bus
  2. routes/api.php 中注册 webhook 路由。
  3. 创建一个 Laravel Listener 来处理 UserCreated 事件。
  4. 配置 EventRoutereventMap
// /packages/OrderService/src/Listeners/CreateCustomerProfileOnUserCreation.php
namespace MyCompany\OrderService\Listeners;

use MyCompany\EventContracts\Events\User\UserCreated;
use Illuminate\Contracts\Queue\ShouldQueue; // 实现 ShouldQueue 使监听器异步执行

class CreateCustomerProfileOnUserCreation implements ShouldQueue
{
    public function handle(UserCreated $event): void
    {
        // 这里是具体的业务逻辑
        // 例如,在订单服务的数据库中创建一个客户记录
        \Illuminate\Support\Facades\Log::info('Handling UserCreated event in OrderService', [
            'user_id' => $event->userId,
            'email' => $event->email,
        ]);
        
        // Customer::create([...]);
    }
}

// /packages/OrderService/src/Providers/EventServiceProvider.php
namespace MyCompany\OrderService\Providers;

use Illuminate\Foundation\Support\Providers\EventServiceProvider as ServiceProvider;
use MyCompany\EventContracts\Events\User\UserCreated;
use MyCompany\OrderService\Listeners\CreateCustomerProfileOnUserCreation;

class EventServiceProvider extends ServiceProvider
{
    protected $listen = [
        UserCreated::class => [
            CreateCustomerProfileOnUserCreation::class,
        ],
    ];
}

// /packages/OrderService/src/Providers/AppServiceProvider.php
...
use MyCompany\EventBus\Routing\EventRouter;

public function register()
{
    $this->app->singleton(EventRouter::class, function ($app) {
        return new EventRouter(
            $app['events'],
            config('event-bus.event_map')
        );
    });
}

config/event-bus.php 配置文件中则包含了事件名到类的映射:

// /packages/OrderService/config/event-bus.php
return [
    'event_map' => [
        'user.created' => \MyCompany\EventContracts\Events\User\UserCreated::class,
        // ... 其他事件映射
    ],
];

至此,整个链路就通了。当 UserService 创建一个用户时,它会发布一个 UserCreated 事件。SNS 接收到这个事件后,会通过 HTTPS 请求调用 OrderService 的 webhook。Webhook 控制器验证签名,将消息交给 EventRouter,后者反序列化消息体为 UserCreated 对象,并最终触发 CreateCustomerProfileOnUserCreation 监听器的执行。

测试策略思考

一个没有测试的系统是脆弱的。对于这套事件总线,我们的测试分为几个层面:

  • 单元测试CreateCustomerProfileOnUserCreation 监听器可以被独立测试,只需手动创建一个 UserCreated 事件对象作为输入即可。
  • **集成测试 (Publisher)**:在 UserService 的测试中,我们可以使用 Laravel 的 Event::fake() 来断言事件是否被发布。或者,更进一步,我们可以 mock SnsEventPublisher,并断言 publish 方法是否被以正确的参数调用。
  • **集成测试 (Consumer)**:SnsWebhookController 的测试是关键。我们可以构造一个模拟的 SNS 请求体,包括合法的签名和消息头,然后 POST 到测试路由,断言最终对应的 Listener 是否被执行。对于签名验证部分,可以编写专门的测试用例,使用一个固定的私钥生成签名,然后用公钥去验证,覆盖成功和失败的场景。

当前方案的局限性与未来迭代方向

这套方案解决了我们最初的几个核心痛点,但在生产环境中,它依然存在一些可以优化的地方。

首先,Webhook 的同步处理模式是它的阿喀琉斯之踵。如果 OrderService 的监听器执行时间过长,或者突发事件量增大,很容易导致 webhook 端点响应超时。SNS 会根据策略重试,但这可能引发更严重的服务雪崩。一个更健壮的架构是采用“SNS-to-SQS”模式。每个消费者服务都拥有一个自己的 SQS 队列,该队列订阅 SNS 主题。服务不再通过 HTTP Webhook 接收事件,而是通过一个常驻的 Laravel Queue Worker 从 SQS 队列中拉取消息。这种方式将事件的接收和处理彻底异步化,利用了 SQS 的持久化、重试和死信队列能力,极大提升了系统的韧性。

其次,本地开发体验有待提升。当前方案要求开发者在本地也配置 AWS 的凭证,并且需要一个公网可访问的地址来接收 SNS 的 webhook(通常通过 ngrok 等工具实现)。这增加了本地环境的搭建复杂度。未来的迭代可以为 EventBus 包增加一个 logsync 驱动。在本地开发时,可以将 EventPublisher 的实现切换为直接将事件写入日志文件,或者同步调用对应的 Listener,从而完全脱离对 AWS 和网络穿透工具的依赖。

最后,随着事件种类的增多,eventMap 的维护会成为一个新的认知负担。可以探索基于 PHP 的注解或属性(Attribute)进行自动注册,当一个 Listener 监听一个 AbstractDomainEvent 时,自动将其 getEventName() 和类名的映射关系注册到 EventRouter 中,减少手动配置。


  目录