项目演进到一定阶段,拆分是必然的。我们的 PHP Monorepo 仓库里,原本的单个 Laravel 应用被逐步重构为多个逻辑上独立的 Package,例如 UserService、OrderService 和 PaymentService。最初,它们之间的通信依赖于内部的 HTTP API 调用。这种方式简单直接,但在服务数量增多后,问题开始集中爆发:同步调用导致的服务强耦合、链式故障,以及每次新增服务间交互时都需要修改双方代码。
解耦的方案显而易见:引入事件驱动架构(EDA)。但一个真正生产可用的事件系统,远不止是发个消息那么简单。我们需要一个方案,它必须满足以下几个苛刻的条件:
- 强类型约束:在一个大型 Monorepo 中,几十个开发者共同协作,必须在代码层面保证事件结构的正确性,避免运行时因为某个字段拼写错误或类型不匹配导致生产故障。
- 低接入成本:新服务或新事件的接入应该是标准化的、低心智负担的。开发者应该聚焦于业务逻辑,而不是每次都去处理消息中间件的复杂配置和SDK。
- 可观测性与可靠性:消息的发送与消费过程必须有日志、有监控。失败的消息需要有明确的处理策略。
- 环境一致性:本地开发、测试环境和生产环境的行为应该尽量一致,同时要避免本地开发严重依赖云资源。
经过一番选型,我们决定围绕 AWS SNS 构建一套内部事件总线。选择 SNS 的原因是它天然的发布-订阅(Pub/Sub)和扇出(Fan-out)能力,一个事件可以被多个下游服务消费,且服务本身是完全托管的,运维成本低。而 Monorepo 的结构,则为我们实现“强类型约束”和“低接入成本”提供了天然的土壤。
整个构建过程的核心,是在 Monorepo 的 packages/ 目录下创建两个基础库:EventContracts 和 EventBus。前者负责定义所有跨服务的事件结构,后者则封装与 AWS SNS 交互的所有细节。
第一步:Monorepo 结构与事件契约定义
我们的 Monorepo 根目录下的 composer.json 通过 repositories 字段来管理内部的 packages,这使得各个服务可以像引用外部依赖一样引用内部库。
// /composer.json
{
"name": "my-company/monorepo",
"private": true,
"repositories": [
{
"type": "path",
"url": "packages/*"
}
],
"require": {
"my-company/user-service": "*@dev",
"my-company/order-service": "*@dev",
// ... other services
}
}
目录结构如下:
.
├── packages/
│ ├── EventContracts/ // 共享的事件定义
│ │ ├── src/
│ │ │ └── Events/
│ │ │ └── User/
│ │ │ └── UserCreated.php
│ │ └── composer.json
│ ├── EventBus/ // 事件总线实现
│ │ ├── src/
│ │ └── composer.json
│ └── UserService/ // 用户服务 (事件发布方)
│ ├── src/
│ └── composer.json
└── OrderService/ // 订单服务 (事件消费方)
├── src/
└── composer.json
关键在于 packages/EventContracts。这里是所有跨服务事件的唯一真实来源(Single Source of Truth)。所有事件都被定义为具体的、不可变的 PHP 类。
// /packages/EventContracts/src/Events/User/UserCreated.php
namespace MyCompany\EventContracts\Events\User;
use MyCompany\EventContracts\AbstractDomainEvent;
final class UserCreated extends AbstractDomainEvent
{
/**
* @param non-empty-string $userId
* @param non-empty-string $email
* @param non-empty-string $name
* @param \DateTimeImmutable $createdAt
*/
public function __construct(
public readonly string $userId,
public readonly string $email,
public readonly string $name,
public readonly \DateTimeImmutable $createdAt,
) {}
/**
* 定义事件的唯一标识符,用于消息路由
* @return string
*/
public static function getEventName(): string
{
return 'user.created';
}
/**
* 将事件对象转换为可序列化的数组
* @return array
*/
public function toPayload(): array
{
return [
'user_id' => $this->userId,
'email' => $this->email,
'name' => $this->name,
'created_at' => $this->createdAt->format(\DateTimeInterface::RFC3339_EXTENDED),
];
}
/**
* 从数组负载重构事件对象
* @param array $payload
* @return static
* @throws \Exception
*/
public static function fromPayload(array $payload): self
{
return new self(
$payload['user_id'],
$payload['email'],
$payload['name'],
new \DateTimeImmutable($payload['created_at'])
);
}
}
// /packages/EventContracts/src/AbstractDomainEvent.php
namespace MyCompany\EventContracts;
abstract class AbstractDomainEvent
{
abstract public static function getEventName(): string;
abstract public function toPayload(): array;
abstract public static function fromPayload(array $payload): self;
}
通过这种方式,任何服务想要发布或消费 UserCreated 事件,都必须依赖 my-company/event-contracts 包。PHP 的类型系统保证了在编译时(或静态分析时)就能发现属性或方法的不匹配,彻底杜绝了因“魔法字符串”或手误导致的生产问题。
第二步:构建 EventBus - 封装发布逻辑
EventBus 包的核心职责是提供一个简单的接口,让任何服务都能发布一个 AbstractDomainEvent 对象,而无需关心背后是 SNS、Kafka 还是 RabbitMQ。
首先,我们定义一个 EventPublisher 接口和它的 SNS 实现。
// /packages/EventBus/src/Interfaces/EventPublisher.php
namespace MyCompany\EventBus\Interfaces;
use MyCompany\EventContracts\AbstractDomainEvent;
interface EventPublisher
{
public function publish(AbstractDomainEvent $event): void;
}
// /packages/EventBus/src/Publishers/SnsEventPublisher.php
namespace MyCompany\EventBus\Publishers;
use Aws\Sns\SnsClient;
use MyCompany\EventContracts\AbstractDomainEvent;
use MyCompany\EventBus\Interfaces\EventPublisher;
use Psr\Log\LoggerInterface;
use Throwable;
class SnsEventPublisher implements EventPublisher
{
public function __construct(
private readonly SnsClient $snsClient,
private readonly LoggerInterface $logger,
private readonly string $topicArn // SNS Topic ARN 从配置注入
) {}
public function publish(AbstractDomainEvent $event): void
{
try {
$messageBody = json_encode($event->toPayload(), JSON_THROW_ON_ERROR);
$this->snsClient->publish([
'TopicArn' => $this->topicArn,
'Message' => $messageBody,
'MessageAttributes' => [
'EventType' => [
'DataType' => 'String',
'StringValue' => $event::getEventName(),
],
],
]);
$this->logger->info('Event published successfully.', [
'event_type' => $event::getEventName(),
'topic_arn' => $this->topicArn,
]);
} catch (Throwable $e) {
// 在真实项目中,这里应该有更健壮的错误处理
// 例如:推送到死信队列、触发告警
$this->logger->error('Failed to publish event to SNS.', [
'event_type' => $event::getEventName(),
'exception' => $e->getMessage(),
'trace' => $e->getTraceAsString(),
]);
// 决定是否抛出异常,这取决于业务是否允许消息丢失
// throw new EventPublishingFailedException('...', 0, $e);
}
}
}
这里的关键点在于 MessageAttributes。我们将事件名 user.created 放入消息属性中。这对于消费方至关重要,因为它允许消费方(比如 SQS 订阅)设置过滤策略,只接收自己感兴趣的事件类型,避免不必要的网络流量和计算开销。
为了让这个实现无缝集成到 Laravel 应用中,我们创建了一个 ServiceProvider。
// /packages/EventBus/src/EventBusServiceProvider.php
namespace MyCompany\EventBus;
use Aws\Sns\SnsClient;
use Illuminate\Contracts\Foundation\Application;
use Illuminate\Support\ServiceProvider;
use MyCompany\EventBus\Interfaces\EventPublisher;
use MyCompany\EventBus\Publishers\SnsEventPublisher;
use Psr\Log\LoggerInterface;
class EventBusServiceProvider extends ServiceProvider
{
public function register(): void
{
$this->mergeConfigFrom(__DIR__.'/../config/event-bus.php', 'event-bus');
$this->app->singleton(EventPublisher::class, function (Application $app) {
$config = $app['config']['event-bus.sns'];
$snsClient = new SnsClient([
'version' => 'latest',
'region' => $config['region'],
'credentials' => [
'key' => $config['key'],
'secret' => $config['secret'],
],
// 在生产环境中,推荐使用 IAM Role,而不是硬编码 key/secret
]);
return new SnsEventPublisher(
$snsClient,
$app->make(LoggerInterface::class),
$config['topic_arn']
);
});
}
public function boot(): void
{
$this->publishes([
__DIR__.'/../config/event-bus.php' => config_path('event-bus.php'),
], 'config');
}
}
现在,在 UserService 中发布一个事件就变得非常简单。首先,UserService 的 composer.json 需要依赖这两个内部包:
// /packages/UserService/composer.json
{
"name": "my-company/user-service",
"require": {
"my-company/event-contracts": "*@dev",
"my-company/event-bus": "*@dev"
}
}
然后在业务代码中注入 EventPublisher 接口并使用它:
// /packages/UserService/src/Actions/CreateUserAction.php
use MyCompany\EventBus\Interfaces\EventPublisher;
use MyCompany\EventContracts\Events\User\UserCreated;
class CreateUserAction
{
public function __construct(private readonly EventPublisher $publisher) {}
public function execute(string $email, string $name): User
{
// ... 创建用户的数据库逻辑 ...
$user = User::create([...]);
// 发布领域事件
$event = new UserCreated(
$user->id,
$user->email,
$user->name,
new \DateTimeImmutable()
);
$this->publisher->publish($event);
return $user;
}
}
第三步:消费事件 - 安全的 Webhook 与事件路由
事件消费是整个链条中最复杂也最容易出错的一环。SNS 支持将消息推送到 HTTPS 端点。我们需要在消费方服务(如 OrderService)中创建一个统一的、安全的 Webhook 来接收所有来自 SNS 的通知。
这个 Webhook 必须处理两件核心任务:
- 验证请求来源:确保请求确实来自 AWS SNS,而不是伪造的恶意请求。
- 消息路由:根据消息内容,将其分发给正确的业务逻辑处理器。
为此,我们在 EventBus 包中构建一个可复用的 SnsWebhookController。
// /packages/EventBus/src/Http/Controllers/SnsWebhookController.php
namespace MyCompany\EventBus\Http\Controllers;
use Illuminate\Http\Request;
use Illuminate\Http\Response;
use Illuminate\Routing\Controller;
use Illuminate\Support\Facades\Log;
use GuzzleHttp\Client;
use MyCompany\EventBus\Sns\Message;
use MyCompany\EventBus\Sns\MessageValidator;
use MyCompany\EventBus\Sns\Exceptions\InvalidMessageSignatureException;
use MyCompany\EventBus\Routing\EventRouter;
class SnsWebhookController extends Controller
{
public function __construct(
private readonly MessageValidator $validator,
private readonly EventRouter $router
) {}
public function handle(Request $request): Response
{
try {
$message = Message::fromRequest($request);
$this->validator->validate($message);
} catch (InvalidMessageSignatureException $e) {
Log::warning('Invalid SNS message signature received.', ['error' => $e->getMessage()]);
return response('Invalid signature.', 400);
} catch (\Throwable $e) {
Log::error('Failed to parse SNS message.', ['error' => $e->getMessage()]);
return response('Invalid message format.', 400);
}
// 处理订阅确认
if ($message->isSubscriptionConfirmation()) {
return $this->confirmSubscription($message);
}
// 处理通知消息
if ($message->isNotification()) {
try {
$this->router->route($message);
return response('OK', 200);
} catch (\Throwable $e) {
// 如果路由或处理失败,返回服务器错误
// SNS会根据Topic的重试策略进行重试
Log::error('Failed to process SNS notification.', [
'message_id' => $message->getId(),
'error' => $e->getMessage(),
]);
return response('Processing failed.', 500);
}
}
Log::info('Received unhandled SNS message type.', ['type' => $message->getType()]);
return response('Message type not handled.', 200);
}
private function confirmSubscription(Message $message): Response
{
try {
// 必须访问 SubscribeURL 来确认订阅
(new Client())->get($message->getSubscriptionUrl());
Log::info('SNS subscription confirmed.', ['topic' => $message->getTopicArn()]);
return response('Subscription confirmed.', 200);
} catch (\Throwable $e) {
Log::error('Failed to confirm SNS subscription.', [
'url' => $message->getSubscriptionUrl(),
'error' => $e->getMessage(),
]);
return response('Could not confirm subscription.', 500);
}
}
}
签名验证是安全性的基石,绝不能省略。MessageValidator 的实现细节较为繁琐,它需要下载 AWS 提供的公钥证书,并验证消息签名的有效性。这是一个常见的模式,但代码必须严谨。
sequenceDiagram
participant AWS_SNS
participant OrderService_Webhook
participant MessageValidator
participant EventRouter
participant UserCreatedListener
AWS_SNS ->> OrderService_Webhook: POST /sns-webhook (Notification)
OrderService_Webhook ->> MessageValidator: validate(message)
MessageValidator ->> AWS_SNS: Download public key certificate (cached)
MessageValidator -->> OrderService_Webhook: Signature is valid
OrderService_Webhook ->> EventRouter: route(message)
EventRouter ->> EventRouter: Get 'EventType' attribute ("user.created")
EventRouter ->> EventRouter: Find listener for "user.created"
EventRouter ->> EventRouter: Deserialize message body into UserCreated object
EventRouter ->> UserCreatedListener: handle(UserCreated event)
UserCreatedListener -->> EventRouter: Logic executed successfully
EventRouter -->> OrderService_Webhook: OK
OrderService_Webhook -->> AWS_SNS: HTTP 200 OK
EventRouter 的作用是将 SNS 消息体转换为我们定义的强类型 AbstractDomainEvent 对象,然后分发给 Laravel 的事件系统。
// /packages/EventBus/src/Routing/EventRouter.php
namespace MyCompany\EventBus\Routing;
use Illuminate\Contracts\Events\Dispatcher;
use MyCompany\EventBus\Sns\Message;
use MyCompany\EventContracts\AbstractDomainEvent;
class EventRouter
{
/** @var array<string, class-string<AbstractDomainEvent>> */
private array $eventMap;
public function __construct(
private readonly Dispatcher $laravelDispatcher,
array $eventMap
) {
$this->eventMap = $eventMap;
}
public function route(Message $message): void
{
$eventType = $message->getEventType();
if (!isset($this->eventMap[$eventType])) {
// 在真实项目中,这可能是一个需要监控的指标
// 表明有未知的事件类型被发送
\Illuminate\Support\Facades\Log::warning('Unknown event type received.', ['type' => $eventType]);
return;
}
$eventClass = $this->eventMap[$eventType];
$payload = $message->getBody();
/** @var AbstractDomainEvent $event */
$event = $eventClass::fromPayload($payload);
// 分发给 Laravel 的本地事件监听器
$this->laravelDispatcher->dispatch($event);
}
}
这个 eventMap 配置是连接外部消息和内部代码的桥梁。它在消费方服务的 ServiceProvider 中进行配置。
现在,在 OrderService 中,我们需要做以下几件事:
- 依赖
event-contracts和event-bus。 - 在
routes/api.php中注册 webhook 路由。 - 创建一个 Laravel Listener 来处理
UserCreated事件。 - 配置
EventRouter的eventMap。
// /packages/OrderService/src/Listeners/CreateCustomerProfileOnUserCreation.php
namespace MyCompany\OrderService\Listeners;
use MyCompany\EventContracts\Events\User\UserCreated;
use Illuminate\Contracts\Queue\ShouldQueue; // 实现 ShouldQueue 使监听器异步执行
class CreateCustomerProfileOnUserCreation implements ShouldQueue
{
public function handle(UserCreated $event): void
{
// 这里是具体的业务逻辑
// 例如,在订单服务的数据库中创建一个客户记录
\Illuminate\Support\Facades\Log::info('Handling UserCreated event in OrderService', [
'user_id' => $event->userId,
'email' => $event->email,
]);
// Customer::create([...]);
}
}
// /packages/OrderService/src/Providers/EventServiceProvider.php
namespace MyCompany\OrderService\Providers;
use Illuminate\Foundation\Support\Providers\EventServiceProvider as ServiceProvider;
use MyCompany\EventContracts\Events\User\UserCreated;
use MyCompany\OrderService\Listeners\CreateCustomerProfileOnUserCreation;
class EventServiceProvider extends ServiceProvider
{
protected $listen = [
UserCreated::class => [
CreateCustomerProfileOnUserCreation::class,
],
];
}
// /packages/OrderService/src/Providers/AppServiceProvider.php
...
use MyCompany\EventBus\Routing\EventRouter;
public function register()
{
$this->app->singleton(EventRouter::class, function ($app) {
return new EventRouter(
$app['events'],
config('event-bus.event_map')
);
});
}
config/event-bus.php 配置文件中则包含了事件名到类的映射:
// /packages/OrderService/config/event-bus.php
return [
'event_map' => [
'user.created' => \MyCompany\EventContracts\Events\User\UserCreated::class,
// ... 其他事件映射
],
];
至此,整个链路就通了。当 UserService 创建一个用户时,它会发布一个 UserCreated 事件。SNS 接收到这个事件后,会通过 HTTPS 请求调用 OrderService 的 webhook。Webhook 控制器验证签名,将消息交给 EventRouter,后者反序列化消息体为 UserCreated 对象,并最终触发 CreateCustomerProfileOnUserCreation 监听器的执行。
测试策略思考
一个没有测试的系统是脆弱的。对于这套事件总线,我们的测试分为几个层面:
- 单元测试:
CreateCustomerProfileOnUserCreation监听器可以被独立测试,只需手动创建一个UserCreated事件对象作为输入即可。 - **集成测试 (Publisher)**:在
UserService的测试中,我们可以使用 Laravel 的Event::fake()来断言事件是否被发布。或者,更进一步,我们可以 mockSnsEventPublisher,并断言publish方法是否被以正确的参数调用。 - **集成测试 (Consumer)**:
SnsWebhookController的测试是关键。我们可以构造一个模拟的 SNS 请求体,包括合法的签名和消息头,然后POST到测试路由,断言最终对应的 Listener 是否被执行。对于签名验证部分,可以编写专门的测试用例,使用一个固定的私钥生成签名,然后用公钥去验证,覆盖成功和失败的场景。
当前方案的局限性与未来迭代方向
这套方案解决了我们最初的几个核心痛点,但在生产环境中,它依然存在一些可以优化的地方。
首先,Webhook 的同步处理模式是它的阿喀琉斯之踵。如果 OrderService 的监听器执行时间过长,或者突发事件量增大,很容易导致 webhook 端点响应超时。SNS 会根据策略重试,但这可能引发更严重的服务雪崩。一个更健壮的架构是采用“SNS-to-SQS”模式。每个消费者服务都拥有一个自己的 SQS 队列,该队列订阅 SNS 主题。服务不再通过 HTTP Webhook 接收事件,而是通过一个常驻的 Laravel Queue Worker 从 SQS 队列中拉取消息。这种方式将事件的接收和处理彻底异步化,利用了 SQS 的持久化、重试和死信队列能力,极大提升了系统的韧性。
其次,本地开发体验有待提升。当前方案要求开发者在本地也配置 AWS 的凭证,并且需要一个公网可访问的地址来接收 SNS 的 webhook(通常通过 ngrok 等工具实现)。这增加了本地环境的搭建复杂度。未来的迭代可以为 EventBus 包增加一个 log 或 sync 驱动。在本地开发时,可以将 EventPublisher 的实现切换为直接将事件写入日志文件,或者同步调用对应的 Listener,从而完全脱离对 AWS 和网络穿透工具的依赖。
最后,随着事件种类的增多,eventMap 的维护会成为一个新的认知负担。可以探索基于 PHP 的注解或属性(Attribute)进行自动注册,当一个 Listener 监听一个 AbstractDomainEvent 时,自动将其 getEventName() 和类名的映射关系注册到 EventRouter 中,减少手动配置。