Skip to content

事件总线

This content is not available in your language yet.

smart-mqtt 的事件总线是整个框架的核心组件,采用经典的生产/消费模型实现。所有 MQTT 消息处理、连接管理、订阅管理等都通过事件总线驱动,这为开发者提供了极强的扩展能力。

事件总线架构图

事件总线采用 发布-订阅模式(Pub/Sub),由三个核心角色组成:

flowchart LR
    P[Publisher<br/>事件发布者] -->|publish| B[EventBus<br/>事件总线]
    B -->|notify| S[Subscriber<br/>事件订阅者]
  • Publisher(发布者):在特定时机触发事件,如连接建立、消息接收等
  • EventBus(事件总线):负责事件的注册、分发和路由
  • Subscriber(订阅者):监听感兴趣的事件并执行相应处理逻辑
sequenceDiagram
    participant Client as MQTT Client
    participant Broker as Broker
    participant Bus as EventBus
    participant Subs as Subscribers

    Client->>Broker: 发送 CONNECT
    Broker->>Bus: publish(CONNECT事件)
    Bus->>Subs: 异步通知所有订阅者
    Subs-->>Bus: 完成处理
    Bus-->>Broker: 继续后续流程
    Broker->>Client: 返回 CONNACK

事件总线支持同步异步两种发布模式:

模式特点适用场景
同步模式发布者阻塞等待所有订阅者处理完成简单、快速的逻辑处理
异步模式基于 CompletableFuture 链式处理耗时操作(如认证、日志记录)

smart-mqtt 内置了丰富的事件类型,覆盖 Broker 生命周期的各个阶段:

事件类型触发时机数据对象特性
BROKER_CONFIGURE_LOADEDBroker 配置加载完成时Options一次性事件
BROKER_STARTEDBroker 启动成功时BrokerContext一次性事件
BROKER_DESTROYBroker 停止服务时BrokerContext一次性事件

使用场景:插件初始化、资源准备、优雅关闭等。

事件类型触发时机数据对象说明
SESSION_CREATETCP 连接建立,Session 初始化完成MqttSession连接建立但尚未认证
CONNECT客户端 CONNECT 消息认证通过AsyncEventObject<MqttConnectMessage>异步事件,支持拦截
DISCONNECTTCP 连接断开时AbstractSession包含正常断开和异常断开

SESSION_CREATE vs CONNECT 的区别

  • SESSION_CREATE:仅表示 TCP 连接建立,MQTT Session 对象已创建
  • CONNECT:表示 MQTT 连接认证通过,客户端可以开始正常通信
事件类型触发时机数据对象说明
RECEIVE_MESSAGE接收到客户端任何消息时EventObject<MqttMessage>高性能优化,独立存储
WRITE_MESSAGE向客户端发送任何消息时EventObject<MqttMessage>高性能优化,独立存储
RECEIVE_CONN_ACK_MESSAGE接收到 CONNACK 消息时MqttConnAckMessage客户端模式使用

性能优化RECEIVE_MESSAGEWRITE_MESSAGE 是高频事件,框架使用独立的 CopyOnWriteArrayList 存储订阅者,并做了空检查优化,无订阅者时零开销。

事件类型触发时机数据对象说明
TOPIC_CREATE新 Topic 被创建时String(Topic名称)首次订阅触发
SUBSCRIBE_ACCEPT服务端接受订阅请求时EventObject<MqttTopicSubscription>验证通过后触发
UNSUBSCRIBE_ACCEPT服务端接受取消订阅时EventObject<MqttUnsubscribeMessage>验证通过后触发
SUBSCRIBE_TOPIC客户端订阅 Topic 成功时EventObject<MessageDeliver>建立订阅关系后触发
UNSUBSCRIBE_TOPIC客户端取消订阅 Topic 时MessageDeliver解除订阅关系时触发
SUBSCRIBE_REFRESH_TOPIC刷新 Topic 订阅时MessageDeliver会话恢复时触发

事件触发顺序

客户端发送 SUBSCRIBE
SUBSCRIBE_ACCEPT(验证通过)
TOPIC_CREATE(如果是新Topic)
SUBSCRIBE_TOPIC(建立订阅关系)

标记为 一次性事件once=true)的类型,其订阅者只会被执行一次,之后自动失效。这适用于初始化操作:

// BROKER_STARTED 只会在 Broker 启动完成后触发一次
brokerContext.getEventBus().subscribe(EventType.BROKER_STARTED, (eventType, context) -> {
// 执行一次性初始化操作
initializePlugin();
});

订阅客户端连接事件,统计在线连接数:

AtomicInteger connectionCount = new AtomicInteger(0);
// CONNECT 是异步事件,需要使用 syncConsumer 包装
brokerContext.getEventBus().subscribe(EventType.CONNECT,
AsyncEventObject.syncConsumer((eventType, event) -> {
MqttSession session = event.getSession();
int count = connectionCount.incrementAndGet();
System.out.println("Client " + session.getClientId() + " connected, total: " + count);
event.getFuture().complete(null); // 完成异步处理
})
);
// DISCONNECT 是同步事件
brokerContext.getEventBus().subscribe(EventType.DISCONNECT, (eventType, session) -> {
int count = connectionCount.decrementAndGet();
System.out.println("Client " + session.getClientId() + " disconnected, total: " + count);
});

记录所有收发的消息:

// 接收消息
brokerContext.getEventBus().subscribe(EventType.RECEIVE_MESSAGE, (eventType, event) -> {
MqttMessage message = event.getObject();
MqttSession session = event.getSession();
logger.info("[RECV] client={}, message={}", session.getClientId(), message);
});
// 发送消息
brokerContext.getEventBus().subscribe(EventType.WRITE_MESSAGE, (eventType, event) -> {
MqttMessage message = event.getObject();
MqttSession session = event.getSession();
logger.info("[SEND] client={}, message={}", session.getClientId(), message);
});

在 Broker 启动时执行插件初始化:

brokerContext.getEventBus().subscribe(EventType.BROKER_STARTED,
new DisposableEventBusSubscriber<BrokerContext>() {
@Override
public void consumer(EventType<BrokerContext> eventType, BrokerContext context) {
// 只执行一次:初始化数据库连接、启动后台线程等
plugin.initDatabase();
plugin.startBackgroundTask();
}
}
);

在 Broker 停止时释放资源:

brokerContext.getEventBus().subscribe(EventType.BROKER_DESTROY, (eventType, context) -> {
// 关闭线程池
executorService.shutdown();
// 关闭数据库连接
database.close();
// 清理临时文件
cleanupTempFiles();
});

插件可以定义自己的事件类型,用于模块间通信:

public class MyPlugin extends Plugin {
// 定义自定义事件类型
public static final EventType<MyCustomData> CUSTOM_EVENT =
new EventType<>("my_custom_event");
@Override
public void install(BrokerContext context) {
// 订阅自定义事件
context.getEventBus().subscribe(CUSTOM_EVENT, (eventType, data) -> {
handleCustomEvent(data);
});
}
private void someMethod() {
// 触发自定义事件
brokerContext.getEventBus().publish(CUSTOM_EVENT, myData);
}
}
  • 同步订阅:处理逻辑简单、耗时短(< 1ms)的场景
  • 异步订阅:涉及 I/O 操作(数据库、网络请求)的场景,避免阻塞 Broker

订阅者内部的异常不应影响其他订阅者和 Broker 运行:

brokerContext.getEventBus().subscribe(EventType.RECEIVE_MESSAGE, (eventType, event) -> {
try {
processMessage(event.getObject());
} catch (Exception e) {
logger.error("Process message failed", e);
// 不要抛出异常
}
});
  • 避免在 RECEIVE_MESSAGEWRITE_MESSAGE 事件中进行耗时操作
  • 如需处理,使用异步模式或将任务提交到独立的线程池
  • 高频事件的订阅者应尽量轻量化

事件可能在多线程环境下触发,确保订阅者逻辑线程安全:

// 使用线程安全的计数器
private final LongAdder messageCount = new LongAdder();
brokerContext.getEventBus().subscribe(EventType.RECEIVE_MESSAGE, (eventType, event) -> {
messageCount.increment(); // 线程安全
});
  • BROKER_STARTED 中初始化资源
  • BROKER_DESTROY 中释放资源
  • 避免内存泄漏:确保及时取消不再需要的事件订阅