AI 自动化插件开发规范
AI 自动化插件开发规范
Section titled “AI 自动化插件开发规范”本规范旨在为 AI 助手提供标准化的插件开发指导,使 AI 能够根据用户需求自动生成符合 smart-mqtt 架构的插件代码。
重要提示
本文档已基于实际代码验证,所有示例和配置均可直接用于生产环境。
- ✅ Maven 配置、ServiceLoader 文件路径等关键信息已与实项目保持一致
- ✅ 事件类型列表完整,包含所有可用的 EventType 常量
- ✅ 示例代码参考了 SimpleAuthPlugin 和 WebSocketPlugin 等实际插件
一、快速开始
Section titled “一、快速开始”1.1 项目结构
Section titled “1.1 项目结构”plugin-name/├── src/main/│ ├── java/tech/smartboot/mqtt/{module}/│ │ ├── {PluginName}Plugin.java # 插件主类(必需)│ │ ├── PluginConfig.java # 配置类(可选)│ │ └── ... # 其他业务类│ └── resources/│ ├── META-INF/services/│ │ └── tech.smartboot.mqtt.plugin.spec.Plugin # ServiceLoader 配置文件(必需)│ ├── plugin.yaml # 插件配置文件(可选)│ └── readme.md # 插件说明(必需)├── pom.xml # Maven 配置(必需)1.2 ServiceLoader 配置
Section titled “1.2 ServiceLoader 配置”在 resources/META-INF/services/ 目录下创建名为 tech.smartboot.mqtt.plugin.spec.Plugin 的文件,内容为插件实现类的全限定名:
tech.smartboot.mqtt.{module}.{PluginName}Plugin注意:此文件必须存在,否则插件无法被自动扫描加载。
1.3 Maven POM 配置模板
Section titled “1.3 Maven POM 配置模板”重要提示:插件项目必须继承 plugins 父 POM,并使用正确的 groupId。
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<!-- 必须继承 plugins 父 POM --> <parent> <groupId>tech.smartboot.mqtt</groupId> <artifactId>plugins</artifactId> <version>1.5.3</version> </parent>
<groupId>tech.smartboot.mqtt</groupId> <artifactId>{plugin-artifact-id}</artifactId> <packaging>jar</packaging>
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
<dependencies> <!-- 父 POM 已管理 smart-mqtt-plugin-spec 依赖,子模块无需重复声明 --> <!-- 如需其他依赖,请在此添加 --> <!-- 示例:WebSocket 插件需要 feat-core --> <!-- <dependency> <groupId>tech.smartboot.feat</groupId> <artifactId>feat-core</artifactId> </dependency> --> </dependencies>
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.10.1</version> <configuration> <source>8</source> <target>8</target> <debug>false</debug> </configuration> </plugin> <plugin> <artifactId>maven-shade-plugin</artifactId> <version>3.3.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <filters> <!-- 排除不需要打包的依赖 --> <filter> <artifact>com.alibaba.fastjson2:*</artifact> <excludes> <exclude>**/*</exclude> </excludes> </filter> <filter> <artifact>org.yaml:*</artifact> <excludes> <exclude>**/*</exclude> </excludes> </filter> <filter> <artifact>tech.smartboot.feat:*</artifact> <excludes> <exclude>**/*</exclude> </excludes> </filter> <filter> <artifact>io.github.smartboot.socket:*</artifact> <excludes> <exclude>**/*</exclude> </excludes> </filter> </filters> <transformers> <!-- 重要:追加方式合并 ServiceLoader 配置(注意有两个文件) --> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/services/tech.smartboot.mqtt.broker.plugin.Plugin</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/services/tech.smartboot.feat.cloud.CloudService</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build></project>关键点:
- ✅ GroupId 必须是
tech.smartboot.mqtt(不是org.smartboot.mqtt) - ✅ 必须继承
plugins父 POM,版本与 Broker 一致(如 1.5.3) - ✅
maven-shade-plugin需要配置两个AppendingTransformer,分别处理两个 ServiceLoader 文件 - ✅
createDependencyReducedPom必须设置为false - ✅ 通过
<filters>排除 fastjson2、yaml、feat、smart-socket 等依赖,避免打包进插件
二、插件开发核心规范
Section titled “二、插件开发核心规范”2.1 插件基类继承规则
Section titled “2.1 插件基类继承规则”AI 生成插件时必须遵循:
- 必须继承
tech.smartboot.mqtt.plugin.spec.Plugin抽象类 - 必须实现以下抽象方法:
String getVersion()- 返回版本号String getVendor()- 返回开发者信息
- 推荐重写以下方法:
void initPlugin(BrokerContext brokerContext)- 初始化逻辑void destroyPlugin()- 销毁逻辑String pluginName()- 插件名称(默认类名)int order()- 加载优先级(默认 0,值越小优先级越高)Schema schema()- 配置可视化表单(可选)
禁止行为:
- ❌ 不要重写
install()和uninstall()方法(它们是 final 的) - ❌ 不要在构造器中执行初始化逻辑
- ❌ 不要直接打印 System.out,使用
log(String)方法
2.2 标准插件模板
Section titled “2.2 标准插件模板”package tech.smartboot.mqtt.{module};
import tech.smartboot.mqtt.plugin.spec.BrokerContext;import tech.smartboot.mqtt.plugin.spec.Options;import tech.smartboot.mqtt.plugin.spec.Plugin;import tech.smartboot.mqtt.plugin.spec.schema.Schema;
/** * {插件名称} * * 功能描述:{简要描述插件功能} * * @author {作者} * @version {版本号} */public class {PluginName}Plugin extends Plugin {
// 成员变量区域 private BrokerContext brokerContext; private PluginConfig config; // 其他业务对象...
@Override protected void initPlugin(BrokerContext brokerContext) throws Throwable { this.brokerContext = brokerContext;
// 步骤 1: 日志记录(使用 log 方法) log("正在初始化{插件名称}...");
// 步骤 2: 加载配置(如有) loadConfiguration();
// 步骤 3: 注册事件订阅(如需要) registerEventListeners();
// 步骤 4: 初始化业务资源 initializeResources();
// 步骤 5: 完成日志 log("{插件名称}初始化完成"); }
@Override protected void destroyPlugin() { log("正在关闭{插件名称}...");
// 释放资源的逆序过程 releaseResources();
log("{插件名称}已关闭"); }
@Override public String getVersion() { return "1.0.0"; // 或使用 Options.VERSION }
@Override public String getVendor() { return "{开发者或组织名称}"; // 或使用 Options.VENDOR }
@Override public String pluginName() { return "{plugin-name}"; // 建议使用 artifactId }
/** * 定义插件配置的可视化 Schema(可选) */ @Override public Schema schema() { Schema schema = new Schema(); // 添加配置项,详见 2.4 节 return schema; }
// ========== 私有辅助方法 ==========
/** * 加载配置文件 */ private void loadConfiguration() { try { config = loadPluginConfig(PluginConfig.class); if (config == null) { config = createDefaultConfig(); log("配置文件不存在,使用默认配置"); } } catch (Exception e) { log("加载配置失败:" + e.getMessage()); config = createDefaultConfig(); } }
/** * 创建默认配置 */ private PluginConfig createDefaultConfig() { PluginConfig config = new PluginConfig(); // 设置默认值 return config; }
/** * 注册事件监听器 */ private void registerEventListeners() { // 示例:订阅 CONNECT 事件 subscribe(EventType.CONNECT, event -> { // 处理连接事件 }); }
/** * 初始化资源 */ private void initializeResources() { // 初始化业务逻辑 }
/** * 释放资源 */ private void releaseResources() { // 清理资源 }}2.3 配置类规范
Section titled “2.3 配置类规范”如果插件需要配置参数,必须创建对应的配置类:
package tech.smartboot.mqtt.{module};
/** * 插件配置类 * * 命名规范:PluginConfig * 字段规范:使用驼峰命名,提供 getter/setter */public class PluginConfig {
// 基础配置(推荐) private String host = "127.0.0.1"; private int port = 1883;
// 业务配置(按需) private int timeout = 30000; private boolean enabled = true; private String username; private String password;
// 嵌套配置(复杂场景) private AdvancedConfig advanced;
// getter 和 setter 方法 public String getHost() { return host; } public void setHost(String host) { this.host = host; }
public int getPort() { return port; } public void setPort(int port) { this.port = port; }
// ... 其他 getter/setter
/** * 嵌套配置类(可选) */ public static class AdvancedConfig { private int maxConnections = 100; private String strategy = "round-robin";
// getter/setter... }}配置文件位置:
- 默认:
plugins/{plugin-name}/plugin.yaml - 也可使用外部配置:
plugins/{plugin-name}.yaml
YAML 配置示例:
host: 192.168.1.100port: 1883timeout: 60000enabled: trueusername: adminpassword: secret123advanced: maxConnections: 200 strategy: "load-balance"2.4 Schema 可视化配置规范
Section titled “2.4 Schema 可视化配置规范”为了让插件配置在控制台可可视化编辑,需要实现 schema() 方法:
@Overridepublic Schema schema() { Schema schema = new Schema();
// 1. 字符串类型 schema.addItem(Item.String("host", "服务器地址") .tip("默认:127.0.0.1") .col(6)); // 占用半行宽度
// 2. 整数类型 schema.addItem(Item.Int("port", "端口号") .tip("默认:1883") .col(3)); // 占用 1/4 行宽度
// 3. 密码类型(隐藏输入) schema.addItem(Item.Password("password", "访问密码") .tip("建议定期更换"));
// 4. 文本域(多行文本) schema.addItem(Item.TextArea("description", "描述信息") .height(200)); // 高度 200px
// 5. 开关类型(布尔值) schema.addItem(Item.Switch("enabled", "启用服务") .tip("关闭后插件将不工作"));
// 6. 枚举类型(单选) schema.addItem(Item.String("strategy", "负载均衡策略") .addEnums( Enum.of("round-robin", "轮询"), Enum.of("least-conn", "最少连接"), Enum.of("hash", "哈希")) .tip("默认:round-robin"));
// 7. 多选枚举 schema.addItem(Item.MultiEnum("features", "功能特性") .addEnums( Enum.of("auth", "认证"), Enum.of("encrypt", "加密"), Enum.of("compress", "压缩")));
// 8. 对象类型(嵌套配置) Item advancedObj = Item.Object("advanced", "高级配置"); advancedObj.addItem(Item.Int("maxConnections", "最大连接数").col(6)); advancedObj.addItem(Item.Int("timeout", "超时时间 (ms)").col(6)); schema.addItem(advancedObj);
return schema;}支持的 Item 类型汇总:
| 类型 | 方法 | 用途 | 特殊属性 |
|---|---|---|---|
string | Item.String(name, desc) | 普通文本输入 | - |
int | Item.Int(name, desc) | 整数输入 | - |
password | Item.Password(name, desc) | 密码输入(隐藏) | - |
textarea | Item.TextArea(name, desc) | 多行文本 | .height(n) |
switch | Item.Switch(name, desc) | 布尔开关 | - |
enum | Item.String().addEnums(...) | 单选枚举 | .addEnums(...) |
multi_enum | Item.MultiEnum(name, desc) | 多选枚举 | .addEnums(...) |
object | Item.Object(name, desc) | 嵌套对象 | .addItem(...) |
布局控制:
.col(n):设置列宽(一行 12 列),如.col(6)占半行.height(n):设置文本域高度(像素).tip("提示"):添加配置项提示信息
三、事件总线使用规范
Section titled “三、事件总线使用规范”3.1 事件类型详解
Section titled “3.1 事件类型详解”完整的事件类型列表(基于实际代码):
连接相关事件
Section titled “连接相关事件”// 1. CONNECT - 客户端连接请求(异步事件)// 触发时机:收到客户端 CONNECT 报文时// 事件对象:AsyncEventObject<MqttConnectMessage>// 典型用途:认证、权限校验、连接限制subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((eventType, event) -> { MqttSession session = event.getSession(); MqttConnectMessage message = event.getObject();
// 认证逻辑 if (!authenticate(session, message)) { MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session); } else { session.setAuthorized(true); }}));
// 2. DISCONNECT - 客户端断开连接// 触发时机:客户端断开连接时// 事件对象:AbstractSession// 典型用途:清理资源、统计在线时长subscribe(EventType.DISCONNECT, session -> { log("客户端断开:" + session.getClientId());});
// 3. SESSION_CREATE - 创建会话// 触发时机:创建 MqttSession 对象时// 事件对象:MqttSession// 典型用途:会话初始化、绑定额外属性subscribe(EventType.SESSION_CREATE, session -> { session.setAttribute("createTime", System.currentTimeMillis());});消息相关事件
Section titled “消息相关事件”// 4. RECEIVE_MESSAGE - 接收到客户端消息// 触发时机:收到任何类型的 MQTT 报文// 事件对象:EventObject<MqttMessage>// 典型用途:消息审计、流量统计subscribe(EventType.RECEIVE_MESSAGE, event -> { MqttMessage message = event.getObject(); // 处理接收到的消息});
// 5. WRITE_MESSAGE - 向客户端发送消息// 触发时机:发送任何类型的 MQTT 报文// 事件对象:EventObject<MqttMessage>// 典型用途:消息过滤、响应拦截subscribe(EventType.WRITE_MESSAGE, event -> { MqttMessage message = event.getObject(); // 可以修改或阻止消息发送});
// 6. RECEIVE_CONN_ACK_MESSAGE - 接收到 CONNACK 报文// 触发时机:Broker 发送 CONNACK 报文后// 事件对象:MqttConnAckMessage// 典型用途:连接结果监控、认证结果记录subscribe(EventType.RECEIVE_CONN_ACK_MESSAGE, message -> { log("发送 CONNACK: " + message.getVariableHeader().getReturnCode());});
// 7. SUBSCRIBE_ACCEPT - 接受订阅请求// 触发时机:客户端订阅主题成功后// 事件对象:EventObject<MqttTopicSubscription>// 典型用途:订阅授权、订阅记录subscribe(EventType.SUBSCRIBE_ACCEPT, event -> { MqttTopicSubscription subscription = event.getObject(); log("客户端订阅:" + subscription.getTopicFilter());});
// 8. UNSUBSCRIBE_ACCEPT - 取消订阅// 触发时机:客户端取消订阅成功后// 事件对象:EventObject<MqttUnsubscribeMessage>// 典型用途:清理订阅关系subscribe(EventType.UNSUBSCRIBE_ACCEPT, event -> { // 处理取消订阅});Topic 相关事件
Section titled “Topic 相关事件”// 9. TOPIC_CREATE - 创建新主题// 触发时机:首次有消息发布到某主题时// 事件对象:String (topic 名称)// 典型用途:主题管理、权限控制subscribe(EventType.TOPIC_CREATE, topicName -> { log("新主题创建:" + topicName);});
// 10. SUBSCRIBE_TOPIC - 客户端订阅 Topic// 触发时机:客户端订阅某个主题时// 事件对象:EventObject<MessageDeliver>// 典型用途:动态授权、订阅拦截subscribe(EventType.SUBSCRIBE_TOPIC, event -> { MessageDeliver deliver = event.getObject(); // 可以拒绝订阅});
// 11. UNSUBSCRIBE_TOPIC - 客户端取消订阅// 触发时机:客户端取消订阅某个主题时// 事件对象:MessageDeliversubscribe(EventType.UNSUBSCRIBE_TOPIC, deliver -> { // 处理取消订阅});
// 12. SUBSCRIBE_REFRESH_TOPIC - 订阅刷新// 触发时机:订阅关系刷新时// 事件对象:MessageDeliver// 典型用途:动态更新订阅关系subscribe(EventType.SUBSCRIBE_REFRESH_TOPIC, deliver -> { // 处理订阅刷新});生命周期事件
Section titled “生命周期事件”// 13. BROKER_STARTED - Broker 启动完成// 触发时机:Broker 服务完全启动后// 事件对象:BrokerContext// 典型用途:启动定时任务、初始化全局资源// 特点:一次性事件(once=true),只在启动时触发一次subscribe(EventType.BROKER_STARTED, context -> { log("Broker 已启动,开始提供服务");
// 启动定时任务 timer().scheduleAtFixedRate(() -> { // 定期执行的任务 }, 0, 60, TimeUnit.SECONDS);});
// 14. BROKER_DESTROY - Broker 即将销毁// 触发时机:Broker 服务停止前// 事件对象:BrokerContext// 典型用途:资源清理、数据持久化// 特点:一次性事件subscribe(EventType.BROKER_DESTROY, context -> { log("Broker 正在关闭..."); // 保存状态、关闭连接等});
// 15. BROKER_CONFIGURE_LOADED - Broker 配置加载完成// 触发时机:Broker 配置加载完成后,服务启动前// 事件对象:Options// 典型用途:动态调整配置subscribe(EventType.BROKER_CONFIGURE_LOADED, options -> { // 可以在这里修改 Broker 配置});事件类型汇总表:
| 事件常量 | 事件对象类型 | 异步事件 | 触发时机 | 典型用途 |
|---|---|---|---|---|
CONNECT | AsyncEventObject<MqttConnectMessage> | ✅ | 客户端连接请求 | 认证、权限校验 |
DISCONNECT | AbstractSession | ❌ | 客户端断开连接 | 清理资源、统计 |
SESSION_CREATE | MqttSession | ❌ | 创建会话 | 会话初始化 |
RECEIVE_MESSAGE | EventObject<MqttMessage> | ❌ | 接收消息 | 消息审计、统计 |
WRITE_MESSAGE | EventObject<MqttMessage> | ❌ | 发送消息 | 消息过滤、拦截 |
RECEIVE_CONN_ACK_MESSAGE | MqttConnAckMessage | ❌ | 发送 CONNACK | 连接结果监控 |
SUBSCRIBE_ACCEPT | EventObject<MqttTopicSubscription> | ❌ | 接受订阅 | 订阅授权、记录 |
UNSUBSCRIBE_ACCEPT | EventObject<MqttUnsubscribeMessage> | ❌ | 取消订阅 | 清理订阅关系 |
TOPIC_CREATE | String | ❌ | 创建主题 | 主题管理 |
SUBSCRIBE_TOPIC | EventObject<MessageDeliver> | ❌ | 订阅 Topic | 动态授权 |
UNSUBSCRIBE_TOPIC | MessageDeliver | ❌ | 取消订阅 Topic | 清理订阅 |
SUBSCRIBE_REFRESH_TOPIC | MessageDeliver | ❌ | 刷新订阅 | 动态更新 |
BROKER_STARTED | BrokerContext | ❌ | Broker 启动 | 初始化资源 |
BROKER_DESTROY | BrokerContext | ❌ | Broker 销毁 | 资源清理 |
BROKER_CONFIGURE_LOADED | Options | ❌ | 配置加载完成 | 调整配置 |
3.2 事件订阅最佳实践
Section titled “3.2 事件订阅最佳实践”AI 必须遵循的规则:
-
异步事件处理:对于 CONNECT 等可能耗时的操作,使用
AsyncEventObjectsubscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((type, event) -> {// 同步处理,阻塞直到完成})); -
事件消费者 enable 检查:确保插件销毁后不再消费事件
subscribe(EventType.RECEIVE_MESSAGE, new EventBusConsumer<MqttMessage>() {@Overridepublic void consumer(EventType<MqttMessage> eventType, MqttMessage message) {// 处理消息}@Overridepublic boolean enable() {return !destroyed; // 插件销毁后禁用}}); -
异常处理:事件处理中的异常不应影响 Broker 运行
subscribe(EventType.CONNECT, event -> {try {// 业务逻辑} catch (Exception e) {log("事件处理异常:" + e.getMessage());// 不要抛出异常}}); -
资源释放:在
destroyPlugin()中清理事件订阅相关的资源@Overrideprotected void destroyPlugin() {// 停止定时器if (timer != null) {timer.shutdown();}// 关闭线程池if (executorService != null) {executorService.shutdown();}}
3.3 事件使用决策树
Section titled “3.3 事件使用决策树”AI 在选择事件类型时应遵循以下决策流程:
用户需求的触发点是什么?├─ 客户端连接 → EventType.CONNECT│ └─ 需要认证?→ 在 CONNECT 事件中校验并设置 session.setAuthorized(true)│ └─ 需要限流?→ 在 CONNECT 事件中检查当前连接数│├─ 客户端断开 → EventType.DISCONNECT│ └─ 清理资源、统计在线时长│├─ 收到消息 → EventType.RECEIVE_MESSAGE│ └─ 消息审计、流量统计、消息路由│├─ 发送消息 → EventType.WRITE_MESSAGE│ └─ 消息过滤、响应修改│├─ 订阅主题 → EventType.SUBSCRIBE_ACCEPT / SUBSCRIBE_TOPIC│ └─ 订阅授权、订阅记录│├─ 主题创建 → EventType.TOPIC_CREATE│ └─ 主题管理、权限初始化│├─ Broker 启动 → EventType.BROKER_STARTED│ └─ 启动定时任务、初始化全局资源│└─ Broker 关闭 → EventType.BROKER_DESTROY └─ 资源清理、数据持久化四、Provider 扩展点规范
Section titled “四、Provider 扩展点规范”Provider 是插件修改 Broker 核心行为的主要扩展点。AI 应理解不同 Provider 的用途和使用场景。
4.1 SessionStateProvider - 会话状态存储
Section titled “4.1 SessionStateProvider - 会话状态存储”用途:自定义会话状态的持久化方式(默认内存存储)
使用场景:
- 需要将会话状态持久化到数据库
- 集群环境下共享会话状态
- 会话状态备份恢复
实现示例:
@Overrideprotected void initPlugin(BrokerContext brokerContext) { // 自定义 SessionStateProvider brokerContext.getProviders().setSessionStateProvider(new SessionStateProvider() { private final Map<String, SessionState> cache = new ConcurrentHashMap<>();
@Override public void store(String clientId, SessionState sessionState) { // 将会话状态存储到数据库 cache.put(clientId, sessionState); log("存储会话:" + clientId);
// 示例:同时写入数据库 // jdbcTemplate.update("INSERT INTO session_state ...", ...); }
@Override public SessionState get(String clientId) { // 从数据库加载会话状态 log("获取会话:" + clientId); return cache.get(clientId); // 或:return jdbcTemplate.queryForObject("SELECT * FROM session_state WHERE client_id = ?", ...); }
@Override public void remove(String clientId) { // 从数据库删除会话状态 cache.remove(clientId); log("删除会话:" + clientId); } });
log("自定义会话状态存储已启用");}SessionState 包含的信息:
- 客户端 ID
- 订阅关系
- 未确认的消息
- 会话过期时间
4.2 SubscribeProvider - 订阅管理
Section titled “4.2 SubscribeProvider - 订阅管理”用途:自定义订阅关系的管理方式
使用场景:
- 订阅关系持久化
- 订阅关系同步(集群)
- 订阅鉴权
实现示例:
@Overrideprotected void initPlugin(BrokerContext brokerContext) { brokerContext.getProviders().setSubscribeProvider(new SubscribeProvider() { @Override public void addSubscription(String clientId, String topic, int qos) { // 添加订阅关系到数据库 log("添加订阅:clientId=" + clientId + ", topic=" + topic + ", qos=" + qos); // jdbcTemplate.update("INSERT INTO subscriptions ...", ...); }
@Override public void removeSubscription(String clientId, String topic) { // 删除订阅关系 log("删除订阅:clientId=" + clientId + ", topic=" + topic); }
@Override public List<Subscription> getSubscriptions(String clientId) { // 查询客户端的所有订阅 log("查询订阅:clientId=" + clientId); // return jdbcTemplate.query("SELECT * FROM subscriptions WHERE client_id = ?", ...); return Collections.emptyList(); } });
log("自定义订阅管理已启用");}4.3 认证实现方式
Section titled “4.3 认证实现方式”用途:通过 CONNECT 事件实现自定义认证逻辑
使用场景:
- 用户名密码认证
- Token 认证
- IP 白名单认证
- LDAP/AD 认证
- OAuth/JWT 认证
实现示例:
@Overrideprotected void initPlugin(BrokerContext brokerContext) { // 订阅 CONNECT 事件进行认证 subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((eventType, event) -> { MqttSession session = event.getSession(); MqttConnectMessage message = event.getObject();
// 获取用户名密码 String username = message.getPayload().userName(); byte[] password = message.getPayload().passwordInBytes();
// 自定义认证逻辑 boolean authenticated = authenticate(username, password);
if (authenticated) { session.setAuthorized(true); log("认证成功:" + username); } else { // 认证失败,返回错误码 MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session); log("认证失败:" + username); } }));
log("认证插件已启用");}
/** * 自定义认证逻辑 */private boolean authenticate(String username, byte[] password) { // 实现认证逻辑,例如: // 1. 查询数据库验证用户名密码 // 2. 调用 HTTP 接口认证 // 3. 验证 JWT Token // 4. LDAP 认证 return true;}注意:对于复杂认证场景(如需要异步调用外部服务),建议使用 AsyncEventObject 的同步订阅者模式,确保认证完成后再建立连接。
五、常用插件类型实现模板
Section titled “五、常用插件类型实现模板”提示:以下模板均基于实际插件代码整理,可直接参考使用。
5.1 认证插件
Section titled “5.1 认证插件”场景:自定义客户端认证逻辑
实现方式:使用 CONNECT 事件(标准方式)
参考实际插件:
SimpleAuthPlugin- 简单内存认证AdvancedAuthPlugin- 高级多认证器支持
public class CustomAuthPlugin extends Plugin {
@Override protected void initPlugin(BrokerContext brokerContext) throws Throwable { log("正在初始化自定义认证插件...");
// 订阅 CONNECT 事件 subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((eventType, event) -> { MqttSession session = event.getSession(); MqttConnectMessage message = event.getObject();
// 获取用户名密码 String username = message.getPayload().userName(); byte[] password = message.getPayload().passwordInBytes();
// 自定义认证逻辑 boolean authenticated = authenticate(username, password);
if (authenticated) { session.setAuthorized(true); log("认证成功:" + username); } else { // 认证失败,返回错误码 MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session); log("认证失败:" + username); } }));
log("自定义认证插件初始化完成"); }
private boolean authenticate(String username, byte[] password) { // TODO: 实现认证逻辑,例如: // 1. 查询数据库 // 2. 调用 HTTP 接口 // 3. 验证 JWT Token // 4. LDAP 认证 return true; }
@Override public String getVersion() { return "1.0.0"; }
@Override public String getVendor() { return "Your Company"; }}参考实际插件:SimpleAuthPlugin
5.2 消息桥接插件
Section titled “5.2 消息桥接插件”场景:将 MQTT 消息转发到其他系统(Redis、Kafka、HTTP 等)
public class MessageBridgePlugin extends Plugin {
private Jedis jedis; // Redis 客户端
@Override protected void initPlugin(BrokerContext brokerContext) throws Throwable { log("正在初始化消息桥接插件...");
// 初始化 Redis 连接 jedis = new Jedis("localhost", 6379);
// 订阅消息接收事件 subscribe(EventType.RECEIVE_MESSAGE, event -> { MqttMessage mqttMessage = event.getObject();
if (mqttMessage instanceof MqttPublishMessage) { MqttPublishMessage publishMessage = (MqttPublishMessage) mqttMessage; String topic = publishMessage.getTopicName(); byte[] payload = publishMessage.getPayload();
// 将消息发送到 Redis String key = "mqtt:" + topic; jedis.publish(key, new String(payload));
log("消息已桥接到 Redis: " + topic); } });
log("消息桥接插件初始化完成"); }
@Override protected void destroyPlugin() { log("正在关闭消息桥接插件..."); if (jedis != null) { jedis.close(); } log("消息桥接插件已关闭"); }
// ... 其他必要方法}5.3 监控插件
Section titled “5.3 监控插件”场景:收集和上报服务指标(连接数、消息量、TPS 等)
public class MetricsPlugin extends Plugin {
private ScheduledExecutorService scheduler; private AtomicLong messageCount = new AtomicLong(0); private AtomicLong connectionCount = new AtomicLong(0);
@Override protected void initPlugin(BrokerContext brokerContext) throws Throwable { log("正在初始化监控插件...");
// 统计连接数 subscribe(EventType.CONNECT, event -> { connectionCount.incrementAndGet(); });
// 统计消息量 subscribe(EventType.RECEIVE_MESSAGE, event -> { messageCount.incrementAndGet(); });
// 定时上报指标 scheduler = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "metrics-reporter"); t.setDaemon(true); return t; });
scheduler.scheduleAtFixedRate(() -> { long msgCount = messageCount.getAndSet(0); long connCount = connectionCount.getAndSet(0);
log(String.format("指标上报 - 消息数:%d, 连接数:%d, TPS: %d", msgCount, connCount, msgCount / 60));
// TODO: 发送到监控系统(Prometheus、Grafana 等) }, 0, 60, TimeUnit.SECONDS);
log("监控插件初始化完成"); }
@Override protected void destroyPlugin() { log("正在关闭监控插件..."); if (scheduler != null) { scheduler.shutdown(); } log("监控插件已关闭"); }
// ... 其他必要方法}5.4 协议转换插件
Section titled “5.4 协议转换插件”场景:WebSocket、HTTP 等其他协议转 MQTT
参考实际插件:
WebSocketPlugin- WebSocket 转 MQTTMqttsPlugin- MQTT over TLS/SSL
public class WebSocketPlugin extends Plugin {
private HttpServer httpServer;
@Override protected void initPlugin(BrokerContext brokerContext) throws Throwable { PluginConfig config = loadPluginConfig(PluginConfig.class);
// 注册使用的端口 addUsagePort(config.getPort(), "WebSocket 端口");
// 创建 HTTP 服务器 httpServer = Feat.httpServer(opts -> opts.debug(true)) .httpHandler(request -> { request.getResponse().setHeader("Sec-WebSocket-Protocol", "mqtt"); request.upgrade(new WebSocketUpgrade() { ProxySession proxySession;
@Override public void onHandShake(WebSocketRequest req, WebSocketResponse resp) { proxySession = new ProxySession(req.getAioSession(), resp); // 触发 NEW_SESSION 事件 brokerContext.Options().getProcessor() .stateEvent(proxySession, StateMachineEnum.NEW_SESSION, null); }
@Override public void handleBinaryMessage(WebSocketRequest req, WebSocketResponse resp, byte[] data) { // 解码 MQTT 消息并处理 // ... }
@Override public void destroy() { // 触发 SESSION_CLOSED 事件 brokerContext.Options().getProcessor() .stateEvent(proxySession, StateMachineEnum.SESSION_CLOSED, null); } }); });
httpServer.listen(config.getPort()); log("WebSocket 插件已启动,端口:" + config.getPort()); }
@Override protected void destroyPlugin() { if (httpServer != null) { httpServer.shutdown(); } }
// ... 其他必要方法}5.5 定时任务插件
Section titled “5.5 定时任务插件”场景:周期性执行任务(数据清理、定时推送等)
public class ScheduledTaskPlugin extends Plugin {
private Timer timer;
@Override protected void initPlugin(BrokerContext brokerContext) throws Throwable { log("正在初始化定时任务插件...");
// 使用插件自带的定时器 timer = timer();
// 每秒执行一次 timer.scheduleAtFixedRate(() -> { try { // 定时任务逻辑 log("执行定时任务...");
// 可以向特定主题推送消息 brokerContext.getOrCreateTopic("system/heartbeat") .publish("heartbeat".getBytes(), MqttQoS.AT_MOST_ONCE, false); } catch (Exception e) { log("定时任务执行异常:" + e.getMessage()); } }, 0, 1000); // 每 1000ms 执行一次
log("定时任务插件初始化完成"); }
@Override protected void destroyPlugin() { log("正在关闭定时任务插件..."); // 定时器会在 Plugin.uninstall() 中自动关闭 log("定时任务插件已关闭"); }
// ... 其他必要方法}5.6 企业级插件
Section titled “5.6 企业级插件”场景:提供完整的企业级功能(数据库持久化、监控指标、OpenAI 集成等)
参考实际插件:EnterprisePlugin - 企业级功能集成
六、插件开发最佳实践
Section titled “六、插件开发最佳实践”6.1 职责单一原则
Section titled “6.1 职责单一原则”✅ 正确示例:每个插件只负责一个明确的功能
❌ 错误:LargePlugin(什么都做)├─ 认证逻辑├─ 消息桥接├─ 监控统计└─ 定时任务
✅ 正确:拆分为多个插件├─ AuthPlugin(认证)├─ BridgePlugin(消息桥接)├─ MetricsPlugin(监控)└─ SchedulerPlugin(定时任务)6.2 异常处理规范
Section titled “6.2 异常处理规范”AI 必须遵循的规则:
-
插件内部异常不能影响 Broker 运行
subscribe(EventType.CONNECT, event -> {try {// 业务逻辑} catch (Exception e) {log("处理异常:" + e.getMessage());// 不要抛出异常}}); -
异步操作的异常处理
CompletableFuture.supplyAsync(() -> {// 异步逻辑}).exceptionally(throwable -> {log("异步任务异常:" + throwable.getMessage());return null;}); -
资源初始化失败的降级处理
try {resource = initResource();} catch (Exception e) {log("资源初始化失败,使用默认配置:" + e.getMessage());resource = createDefaultResource();}
6.3 资源管理
Section titled “6.3 资源管理”必须遵守的规则:
-
谁创建,谁销毁
private ScheduledExecutorService scheduler;@Overrideprotected void initPlugin() {scheduler = Executors.newScheduledThreadPool(4);}@Overrideprotected void destroyPlugin() {if (scheduler != null) {scheduler.shutdown(); // 必须关闭}} -
使用插件提供的定时器
// 推荐使用 plugin.timer()Timer timer = timer();timer.schedule(task, delay);// 插件卸载时会自动关闭 -
外部资源及时释放
// 数据库连接、HTTP 客户端、文件流等@Overrideprotected void destroyPlugin() {if (connection != null) connection.close();if (httpClient != null) httpClient.close();if (inputStream != null) inputStream.close();}
6.4 日志规范
Section titled “6.4 日志规范”AI 必须使用规范的日志方式:
-
使用插件提供的 log() 方法
log("插件初始化中...");log("配置加载成功");log("警告:XXX 可能存在问题", LogLevel.WARN); -
避免直接打印
// ❌ 错误System.out.println("消息");System.err.println("错误");// ✅ 正确log("消息"); -
日志内容规范
// 包含关键信息log("认证成功:userId=" + userId + ", ip=" + ipAddress);// 避免敏感信息// ❌ 不要打印完整密码log("密码:" + password);// ✅ 打印脱敏信息log("密码验证:" + (password != null ? "***" : "null"));
6.5 配置外部化
Section titled “6.5 配置外部化”可变参数必须提取到配置文件:
// ❌ 硬编码String host = "127.0.0.1";int port = 1883;
// ✅ 从配置加载PluginConfig config = loadPluginConfig(PluginConfig.class);String host = config.getHost();int port = config.getPort();配置文件结构:
host: 192.168.1.100port: 1883timeout: 30000credentials: username: admin password: encrypted_password6.6 版本管理
Section titled “6.6 版本管理”合理实现 getVersion() 方法:
@Overridepublic String getVersion() { // 方式 1: 使用常量 return "1.0.0";
// 方式 2: 使用 Broker 版本 return Options.VERSION;
// 方式 3: 从 MANIFEST.MF 读取 return getClass().getPackage().getImplementationVersion();}6.7 插件优先级
Section titled “6.7 插件优先级”通过 order() 方法控制加载顺序:
@Overridepublic int order() { // 数值越小,优先级越高
// 基础插件优先加载 if (this instanceof AuthPlugin) { return -100; // 高优先级 }
// 依赖其他插件的后加载 if (this instanceof BusinessPlugin) { return 100; // 低优先级 }
return 0; // 默认}常见优先级规划:
-100 ~ -1: 基础插件(认证、会话管理等)0 ~ 99: 核心功能插件(消息处理、协议转换等)100 ~ 199: 辅助功能插件(监控、日志等)200+: 业务插件(定制功能)七、AI 代码生成检查清单
Section titled “七、AI 代码生成检查清单”AI 在生成插件代码时,必须逐项检查以下内容:
7.1 结构检查
Section titled “7.1 结构检查”- 是否创建了正确的包路径:
tech.smartboot.mqtt.{module} - 是否创建了 ServiceLoader 配置文件:
META-INF/services/tech.smartboot.mqtt.plugin.spec.Plugin - ServiceLoader 文件内容是否为插件类的全限定名
- 是否创建了
readme.md文件 - pom.xml 是否配置了
maven-shade-plugin和AppendingTransformer
7.2 代码规范检查
Section titled “7.2 代码规范检查”- 插件类是否继承了
Plugin抽象类 - 是否实现了
getVersion()方法 - 是否实现了
getVendor()方法 - 是否重写了
initPlugin()方法 - 是否重写了
destroyPlugin()方法 - 是否没有重写
install()和uninstall()方法 - 是否使用
log()方法而非System.out
7.3 配置检查
Section titled “7.3 配置检查”- 是否需要配置类(PluginConfig)
- 配置类是否有 getter/setter
- 是否实现了
schema()方法(如需可视化配置) - Schema 中的字段是否与配置类对应
- 是否使用了
.col()控制布局
7.4 事件使用检查
Section titled “7.4 事件使用检查”- 事件类型选择是否正确
- 异步事件是否使用了
AsyncEventObject - 事件消费者是否实现了
enable()检查 - 事件处理中是否有异常捕获
- 是否在 destroyPlugin() 中清理事件相关资源
7.5 资源管理检查
Section titled “7.5 资源管理检查”- 创建的线程池是否在 destroyPlugin() 中关闭
- 打开的连接是否在 destroyPlugin() 中关闭
- 是否优先使用插件提供的
timer() - 是否有资源泄漏风险
7.6 异常处理检查
Section titled “7.6 异常处理检查”- 事件处理中是否捕获了所有异常
- 异步操作是否有
exceptionally()处理 - 初始化失败是否有降级方案
- 是否没有将异常抛给 Broker
7.7 部署检查
Section titled “7.7 部署检查”- 打包后 jar 文件名是否规范
- 是否说明了部署步骤
- 是否说明了配置方法
- 是否说明了端口占用情况(使用
addUsagePort())
八、完整示例:HTTP 认证插件
Section titled “八、完整示例:HTTP 认证插件”以下是完整的插件示例,AI 可以参考此模板生成类似的插件:
8.1 插件主类
Section titled “8.1 插件主类”package tech.smartboot.mqtt.auth.http;
import tech.smartboot.mqtt.common.enums.MqttConnectReturnCode;import tech.smartboot.mqtt.common.message.MqttConnectMessage;import tech.smartboot.mqtt.plugin.spec.BrokerContext;import tech.smartboot.mqtt.plugin.spec.MqttSession;import tech.smartboot.mqtt.plugin.spec.Options;import tech.smartboot.mqtt.plugin.spec.Plugin;import tech.smartboot.mqtt.plugin.spec.bus.AsyncEventObject;import tech.smartboot.mqtt.plugin.spec.bus.EventType;import tech.smartboot.mqtt.plugin.spec.schema.Enum;import tech.smartboot.mqtt.plugin.spec.schema.Item;import tech.smartboot.mqtt.plugin.spec.schema.Schema;
import java.io.BufferedReader;import java.io.InputStreamReader;import java.io.OutputStream;import java.net.HttpURLConnection;import java.net.URL;import java.nio.charset.StandardCharsets;
/** * HTTP 认证插件 * * 功能:通过 HTTP 接口验证客户端用户名密码 * * @author SmartBoot Team * @version 1.0.0 */public class HttpAuthPlugin extends Plugin {
private PluginConfig config; private BrokerContext brokerContext;
@Override protected void initPlugin(BrokerContext brokerContext) throws Throwable { this.brokerContext = brokerContext;
log("=============================================="); log("正在初始化 HTTP 认证插件...");
// 加载配置 config = loadPluginConfig(PluginConfig.class); if (config == null) { config = createDefaultConfig(); log("配置文件不存在,使用默认配置"); }
log("认证服务器地址:" + config.getAuthUrl()); log("超时时间:" + config.getTimeout() + "ms"); log("允许匿名:" + config.isAllowAnonymous());
// 订阅 CONNECT 事件 subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((eventType, event) -> { MqttSession session = event.getSession();
// 如果已经断开,直接返回 if (session.isDisconnect()) { return; }
MqttConnectMessage message = event.getObject(); String username = message.getPayload().userName(); byte[] password = message.getPayload().passwordInBytes();
// 检查是否匿名访问 if (isAnonymous(username, password)) { if (config.isAllowAnonymous()) { session.setAuthorized(true); log("匿名访问已授权:" + session.getClientId()); return; } else { log("匿名访问被拒绝:" + session.getClientId()); MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session); return; } }
// 执行 HTTP 认证 try { boolean authenticated = authenticateViaHttp(username, new String(password, StandardCharsets.UTF_8));
if (authenticated) { session.setAuthorized(true); log("认证成功:" + username); } else { log("认证失败:" + username); MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session); } } catch (Exception e) { log("HTTP 认证异常:" + e.getMessage()); // 认证服务不可用,拒绝连接 MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, session); } }));
log("=============================================="); log("HTTP 认证插件初始化完成"); log("=============================================="); }
@Override protected void destroyPlugin() { log("正在关闭 HTTP 认证插件..."); log("HTTP 认证插件已关闭"); }
@Override public String getVersion() { return "1.0.0"; }
@Override public String getVendor() { return Options.VENDOR; }
@Override public String pluginName() { return "http-auth-plugin"; }
@Override public Schema schema() { Schema schema = new Schema();
// 认证服务器地址 schema.addItem(Item.String("authUrl", "认证服务器 URL") .tip("例如:http://localhost:8080/api/auth") .col(12));
// 超时时间 schema.addItem(Item.Int("timeout", "HTTP 请求超时 (毫秒)") .tip("默认:5000") .col(6));
// 允许匿名 schema.addItem(Item.Switch("allowAnonymous", "允许匿名访问") .tip("开启后,不提供用户名密码也能连接") .col(6));
// 请求模板 schema.addItem(Item.TextArea("requestTemplate", "请求体模板") .tip("支持{username}和{password}占位符") .height(200) .col(12));
return schema; }
/** * 创建默认配置 */ private PluginConfig createDefaultConfig() { PluginConfig config = new PluginConfig(); config.setAuthUrl("http://localhost:8080/api/auth"); config.setTimeout(5000); config.setAllowAnonymous(false); config.setRequestTemplate("{\"username\":\"{username}\",\"password\":\"{password}\"}"); return config; }
/** * 通过 HTTP 接口认证 */ private boolean authenticateViaHttp(String username, String password) throws Exception { URL url = new URL(config.getAuthUrl()); HttpURLConnection conn = (HttpURLConnection) url.openConnection();
try { conn.setConnectTimeout(config.getTimeout()); conn.setReadTimeout(config.getTimeout()); conn.setRequestMethod("POST"); conn.setRequestProperty("Content-Type", "application/json"); conn.setDoOutput(true);
// 构建请求体 String requestBody = config.getRequestTemplate() .replace("{username}", username) .replace("{password}", password);
// 发送请求 try (OutputStream os = conn.getOutputStream()) { os.write(requestBody.getBytes(StandardCharsets.UTF_8)); os.flush(); }
// 读取响应 int responseCode = conn.getResponseCode(); if (responseCode == 200) { return true; } else { log("HTTP 认证返回错误码:" + responseCode); return false; } } finally { conn.disconnect(); } }
/** * 检查是否为匿名连接 */ private boolean isAnonymous(String username, byte[] password) { return username == null || username.isEmpty() || password == null || password.length == 0; }}8.2 配置类
Section titled “8.2 配置类”package tech.smartboot.mqtt.auth.http;
/** * HTTP 认证插件配置类 */public class PluginConfig {
/** * 认证服务器 URL */ private String authUrl = "http://localhost:8080/api/auth";
/** * HTTP 请求超时(毫秒) */ private int timeout = 5000;
/** * 是否允许匿名访问 */ private boolean allowAnonymous = false;
/** * 请求体模板,支持{username}和{password}占位符 */ private String requestTemplate = "{\"username\":\"{username}\",\"password\":\"{password}\"}";
// getter 和 setter
public String getAuthUrl() { return authUrl; }
public void setAuthUrl(String authUrl) { this.authUrl = authUrl; }
public int getTimeout() { return timeout; }
public void setTimeout(int timeout) { this.timeout = timeout; }
public boolean isAllowAnonymous() { return allowAnonymous; }
public void setAllowAnonymous(boolean allowAnonymous) { this.allowAnonymous = allowAnonymous; }
public String getRequestTemplate() { return requestTemplate; }
public void setRequestTemplate(String requestTemplate) { this.requestTemplate = requestTemplate; }}8.3 配置文件示例
Section titled “8.3 配置文件示例”authUrl: http://192.168.1.100:8080/api/mqtt/authtimeout: 10000allowAnonymous: falserequestTemplate: | { "username": "{username}", "password": "{password}", "clientId": "{clientId}", "timestamp": ${timestamp} }8.4 ServiceLoader 配置
Section titled “8.4 ServiceLoader 配置”tech.smartboot.mqtt.auth.http.HttpAuthPlugin8.5 README 示例
Section titled “8.5 README 示例”# HTTP 认证插件
## 功能说明
本插件通过 HTTP 接口验证 MQTT 客户端的用户名和密码,实现自定义认证逻辑。
## 配置参数
| 参数 | 类型 | 说明 | 默认值 ||------|------|------|--------|| authUrl | String | 认证服务器 URL | http://localhost:8080/api/auth || timeout | int | HTTP 请求超时 (ms) | 5000 || allowAnonymous | boolean | 是否允许匿名访问 | false || requestTemplate | String | 请求体模板 | `{"username":"{username}",...}` |
## 部署步骤
1. 将编译后的 jar 文件放入 `plugins` 目录2. 在 `plugins/http-auth-plugin/` 目录创建 `plugin.yaml` 配置文件3. 重启 smart-mqtt 服务
## 认证接口要求
- 请求方法:POST- Content-Type: application/json- 请求体:由 requestTemplate 配置- 响应码:200 表示认证成功,其他表示失败
## 示例
认证请求:```jsonPOST http://localhost:8080/api/auth{ "username": "admin", "password": "123456"}
响应:200 OK // 认证成功响应:401 Unauthorized // 认证失败---
## 九、常见问题解答(FAQ)
### Q1: 插件无法被加载怎么办?
**检查清单:**1. ✅ 确认 `META-INF/services/tech.smartboot.mqtt.plugin.spec.Plugin` 文件存在2. ✅ 确认文件内容为插件类的全限定名3. ✅ 确认 jar 文件放在 `plugins` 目录4. ✅ 查看启动日志是否有 "load plugin: xxx" 输出
### Q2: 插件初始化失败如何处理?
**调试步骤:**1. 查看日志输出,定位失败位置2. 检查配置文件是否正确3. 检查依赖的外部服务是否可用4. 在 `initPlugin()` 中添加更多日志
### Q3: 如何调试插件代码?
**方法 1:远程调试**```bash# 启动时添加调试参数java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -jar smart-mqtt.jar方法 2:增加日志
log("DEBUG: 执行到此处,config=" + config);Q4: 多个插件的执行顺序如何控制?
Section titled “Q4: 多个插件的执行顺序如何控制?”通过 order() 方法控制:
@Overridepublic int order() { return 100; // 值越小,优先级越高}Q5: 插件如何与外部系统通信?
Section titled “Q5: 插件如何与外部系统通信?”推荐方式:
- HTTP 客户端:OkHttp、HttpClient
- Redis:Jedis、Lettuce
- 数据库:JDBC、MyBatis
- 消息队列:Kafka、RabbitMQ 客户端
注意事项:
- 在
destroyPlugin()中关闭连接 - 设置合理的超时时间
- 处理网络异常
Q6: 如何实现热插拔?
Section titled “Q6: 如何实现热插拔?”目前 smart-mqtt 不支持运行时热插拔,需要重启服务。
变通方案:
- 在插件内部实现配置热更新
- 使用
BROKER_CONFIGURE_LOADED事件动态调整
Q7: 插件可以开启端口吗?
Section titled “Q7: 插件可以开启端口吗?”可以,需要使用 addUsagePort() 注册:
addUsagePort(8080, "HTTP 服务端口");这样可以在控制台显示插件使用的端口信息。
九、常用插件类型快速参考
Section titled “九、常用插件类型快速参考”| 插件类型 | 核心事件/扩展点 | 典型用途 | 参考插件 |
|---|---|---|---|
| 认证插件 | EventType.CONNECT | 用户名密码、Token、JWT、LDAP 认证 | SimpleAuthPlugin |
| 消息桥接插件 | EventType.RECEIVE_MESSAGE | 转发到 Redis、Kafka、HTTP 等 | RedisBridgePlugin |
| 监控插件 | EventType.CONNECT, RECEIVE_MESSAGE | 统计连接数、消息量、TPS | - |
| 协议转换插件 | 自定义 Server | WebSocket、HTTP、TLS 转 MQTT | WebSocketPlugin |
| 会话持久化插件 | SessionStateProvider | 将会话状态存储到数据库 | MemorySessionPlugin |
| 订阅管理插件 | SubscribeProvider | 自定义订阅关系管理 | - |
| 定时任务插件 | timer() | 定期推送、数据清理 | BenchPlugin |
| 集群插件 | BROKER_STARTED, 自定义通信 | 节点间数据同步 | ClusterPlugin |
| 企业级插件 | 多种组合 | 数据库、监控、AI 集成 | EnterprisePlugin |
选择指南:
- 需要认证?→ 使用
CONNECT事件 - 需要处理消息?→ 使用
RECEIVE_MESSAGE或WRITE_MESSAGE事件 - 需要持久化会话?→ 实现
SessionStateProvider - 需要自定义订阅?→ 实现
SubscribeProvider - 需要开启新端口?→ 创建 HTTP/TCP 服务器,使用
addUsagePort()注册 - 需要定时任务?→ 使用
timer()方法
本规范详细阐述了 smart-mqtt 插件开发的完整流程和技术细节,所有内容包括:
- ✅ 项目结构:正确的 Maven 配置、GroupId、父 POM 依赖
- ✅ ServiceLoader:两个配置文件的正确处理方式
- ✅ 插件基类:生命周期方法、事件订阅机制
- ✅ 配置管理:PluginConfig、YAML 文件、Schema 可视化
- ✅ 事件总线:15 种完整事件类型及其使用场景
- ✅ Provider 扩展:SessionStateProvider、SubscribeProvider
- ✅ 最佳实践:异常处理、资源管理、日志规范
- ✅ 实战模板:认证、桥接、监控、协议转换等常见插件类型
AI 开发指南
Section titled “AI 开发指南”AI 助手在生成插件代码时,应严格遵循本规范,确保生成的代码:
- ✅ 符合架构:遵循 smart-mqtt 的插件体系和设计规范
- ✅ 可编译运行:Maven 配置、依赖关系完全正确
- ✅ 可维护:代码结构清晰、职责单一、注释完善
- ✅ 健壮:正确处理异常、合理管理资源
- ✅ 可配置:提供 Schema 可视化和 YAML 配置支持
快速上手路径
Section titled “快速上手路径”新手推荐学习路径:
- 阅读 SimpleAuthPlugin - 理解基础认证流程
- 阅读 WebSocketPlugin - 学习如何开启端口和协议转换
- 参考本文档第 9 节「常用插件类型快速参考」- 选择合适的实现方式
- 遵循检查清单(第 8 章)- 确保代码质量
- 📚 smart-mqtt 官方文档
- 💻 GitHub 源码仓库
- 🔌 现有插件源码 - 学习实际实现
通过标准化、规范化的开发流程,可以大大提高插件开发效率和质量。