跳转到内容

AI 自动化插件开发规范

本规范旨在为 AI 助手提供标准化的插件开发指导,使 AI 能够根据用户需求自动生成符合 smart-mqtt 架构的插件代码。

重要提示

本文档已基于实际代码验证,所有示例和配置均可直接用于生产环境。

  • ✅ Maven 配置、ServiceLoader 文件路径等关键信息已与实项目保持一致
  • ✅ 事件类型列表完整,包含所有可用的 EventType 常量
  • ✅ 示例代码参考了 SimpleAuthPluginWebSocketPlugin 等实际插件
plugin-name/
├── src/main/
│ ├── java/tech/smartboot/mqtt/{module}/
│ │ ├── {PluginName}Plugin.java # 插件主类(必需)
│ │ ├── PluginConfig.java # 配置类(可选)
│ │ └── ... # 其他业务类
│ └── resources/
│ ├── META-INF/services/
│ │ └── tech.smartboot.mqtt.plugin.spec.Plugin # ServiceLoader 配置文件(必需)
│ ├── plugin.yaml # 插件配置文件(可选)
│ └── readme.md # 插件说明(必需)
├── pom.xml # Maven 配置(必需)

resources/META-INF/services/ 目录下创建名为 tech.smartboot.mqtt.plugin.spec.Plugin 的文件,内容为插件实现类的全限定名:

tech.smartboot.mqtt.{module}.{PluginName}Plugin

注意:此文件必须存在,否则插件无法被自动扫描加载。

重要提示:插件项目必须继承 plugins 父 POM,并使用正确的 groupId。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- 必须继承 plugins 父 POM -->
<parent>
<groupId>tech.smartboot.mqtt</groupId>
<artifactId>plugins</artifactId>
<version>1.5.3</version>
</parent>
<groupId>tech.smartboot.mqtt</groupId>
<artifactId>{plugin-artifact-id}</artifactId>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- 父 POM 已管理 smart-mqtt-plugin-spec 依赖,子模块无需重复声明 -->
<!-- 如需其他依赖,请在此添加 -->
<!-- 示例:WebSocket 插件需要 feat-core -->
<!--
<dependency>
<groupId>tech.smartboot.feat</groupId>
<artifactId>feat-core</artifactId>
</dependency>
-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<configuration>
<source>8</source>
<target>8</target>
<debug>false</debug>
</configuration>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<!-- 排除不需要打包的依赖 -->
<filter>
<artifact>com.alibaba.fastjson2:*</artifact>
<excludes>
<exclude>**/*</exclude>
</excludes>
</filter>
<filter>
<artifact>org.yaml:*</artifact>
<excludes>
<exclude>**/*</exclude>
</excludes>
</filter>
<filter>
<artifact>tech.smartboot.feat:*</artifact>
<excludes>
<exclude>**/*</exclude>
</excludes>
</filter>
<filter>
<artifact>io.github.smartboot.socket:*</artifact>
<excludes>
<exclude>**/*</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- 重要:追加方式合并 ServiceLoader 配置(注意有两个文件) -->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/tech.smartboot.mqtt.broker.plugin.Plugin</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/tech.smartboot.feat.cloud.CloudService</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

关键点

  • ✅ GroupId 必须是 tech.smartboot.mqtt(不是 org.smartboot.mqtt
  • ✅ 必须继承 plugins 父 POM,版本与 Broker 一致(如 1.5.3)
  • maven-shade-plugin 需要配置两个 AppendingTransformer,分别处理两个 ServiceLoader 文件
  • createDependencyReducedPom 必须设置为 false
  • ✅ 通过 <filters> 排除 fastjson2、yaml、feat、smart-socket 等依赖,避免打包进插件

AI 生成插件时必须遵循:

  1. 必须继承 tech.smartboot.mqtt.plugin.spec.Plugin 抽象类
  2. 必须实现以下抽象方法:
    • String getVersion() - 返回版本号
    • String getVendor() - 返回开发者信息
  3. 推荐重写以下方法:
    • void initPlugin(BrokerContext brokerContext) - 初始化逻辑
    • void destroyPlugin() - 销毁逻辑
    • String pluginName() - 插件名称(默认类名)
    • int order() - 加载优先级(默认 0,值越小优先级越高)
    • Schema schema() - 配置可视化表单(可选)

禁止行为

  • ❌ 不要重写 install()uninstall() 方法(它们是 final 的)
  • ❌ 不要在构造器中执行初始化逻辑
  • ❌ 不要直接打印 System.out,使用 log(String) 方法
package tech.smartboot.mqtt.{module};
import tech.smartboot.mqtt.plugin.spec.BrokerContext;
import tech.smartboot.mqtt.plugin.spec.Options;
import tech.smartboot.mqtt.plugin.spec.Plugin;
import tech.smartboot.mqtt.plugin.spec.schema.Schema;
/**
* {插件名称}
*
* 功能描述:{简要描述插件功能}
*
* @author {作者}
* @version {版本号}
*/
public class {PluginName}Plugin extends Plugin {
// 成员变量区域
private BrokerContext brokerContext;
private PluginConfig config;
// 其他业务对象...
@Override
protected void initPlugin(BrokerContext brokerContext) throws Throwable {
this.brokerContext = brokerContext;
// 步骤 1: 日志记录(使用 log 方法)
log("正在初始化{插件名称}...");
// 步骤 2: 加载配置(如有)
loadConfiguration();
// 步骤 3: 注册事件订阅(如需要)
registerEventListeners();
// 步骤 4: 初始化业务资源
initializeResources();
// 步骤 5: 完成日志
log("{插件名称}初始化完成");
}
@Override
protected void destroyPlugin() {
log("正在关闭{插件名称}...");
// 释放资源的逆序过程
releaseResources();
log("{插件名称}已关闭");
}
@Override
public String getVersion() {
return "1.0.0"; // 或使用 Options.VERSION
}
@Override
public String getVendor() {
return "{开发者或组织名称}"; // 或使用 Options.VENDOR
}
@Override
public String pluginName() {
return "{plugin-name}"; // 建议使用 artifactId
}
/**
* 定义插件配置的可视化 Schema(可选)
*/
@Override
public Schema schema() {
Schema schema = new Schema();
// 添加配置项,详见 2.4 节
return schema;
}
// ========== 私有辅助方法 ==========
/**
* 加载配置文件
*/
private void loadConfiguration() {
try {
config = loadPluginConfig(PluginConfig.class);
if (config == null) {
config = createDefaultConfig();
log("配置文件不存在,使用默认配置");
}
} catch (Exception e) {
log("加载配置失败:" + e.getMessage());
config = createDefaultConfig();
}
}
/**
* 创建默认配置
*/
private PluginConfig createDefaultConfig() {
PluginConfig config = new PluginConfig();
// 设置默认值
return config;
}
/**
* 注册事件监听器
*/
private void registerEventListeners() {
// 示例:订阅 CONNECT 事件
subscribe(EventType.CONNECT, event -> {
// 处理连接事件
});
}
/**
* 初始化资源
*/
private void initializeResources() {
// 初始化业务逻辑
}
/**
* 释放资源
*/
private void releaseResources() {
// 清理资源
}
}

如果插件需要配置参数,必须创建对应的配置类:

package tech.smartboot.mqtt.{module};
/**
* 插件配置类
*
* 命名规范:PluginConfig
* 字段规范:使用驼峰命名,提供 getter/setter
*/
public class PluginConfig {
// 基础配置(推荐)
private String host = "127.0.0.1";
private int port = 1883;
// 业务配置(按需)
private int timeout = 30000;
private boolean enabled = true;
private String username;
private String password;
// 嵌套配置(复杂场景)
private AdvancedConfig advanced;
// getter 和 setter 方法
public String getHost() { return host; }
public void setHost(String host) { this.host = host; }
public int getPort() { return port; }
public void setPort(int port) { this.port = port; }
// ... 其他 getter/setter
/**
* 嵌套配置类(可选)
*/
public static class AdvancedConfig {
private int maxConnections = 100;
private String strategy = "round-robin";
// getter/setter...
}
}

配置文件位置

  • 默认:plugins/{plugin-name}/plugin.yaml
  • 也可使用外部配置:plugins/{plugin-name}.yaml

YAML 配置示例

host: 192.168.1.100
port: 1883
timeout: 60000
enabled: true
username: admin
password: secret123
advanced:
maxConnections: 200
strategy: "load-balance"

为了让插件配置在控制台可可视化编辑,需要实现 schema() 方法:

@Override
public Schema schema() {
Schema schema = new Schema();
// 1. 字符串类型
schema.addItem(Item.String("host", "服务器地址")
.tip("默认:127.0.0.1")
.col(6)); // 占用半行宽度
// 2. 整数类型
schema.addItem(Item.Int("port", "端口号")
.tip("默认:1883")
.col(3)); // 占用 1/4 行宽度
// 3. 密码类型(隐藏输入)
schema.addItem(Item.Password("password", "访问密码")
.tip("建议定期更换"));
// 4. 文本域(多行文本)
schema.addItem(Item.TextArea("description", "描述信息")
.height(200)); // 高度 200px
// 5. 开关类型(布尔值)
schema.addItem(Item.Switch("enabled", "启用服务")
.tip("关闭后插件将不工作"));
// 6. 枚举类型(单选)
schema.addItem(Item.String("strategy", "负载均衡策略")
.addEnums(
Enum.of("round-robin", "轮询"),
Enum.of("least-conn", "最少连接"),
Enum.of("hash", "哈希"))
.tip("默认:round-robin"));
// 7. 多选枚举
schema.addItem(Item.MultiEnum("features", "功能特性")
.addEnums(
Enum.of("auth", "认证"),
Enum.of("encrypt", "加密"),
Enum.of("compress", "压缩")));
// 8. 对象类型(嵌套配置)
Item advancedObj = Item.Object("advanced", "高级配置");
advancedObj.addItem(Item.Int("maxConnections", "最大连接数").col(6));
advancedObj.addItem(Item.Int("timeout", "超时时间 (ms)").col(6));
schema.addItem(advancedObj);
return schema;
}

支持的 Item 类型汇总

类型方法用途特殊属性
stringItem.String(name, desc)普通文本输入-
intItem.Int(name, desc)整数输入-
passwordItem.Password(name, desc)密码输入(隐藏)-
textareaItem.TextArea(name, desc)多行文本.height(n)
switchItem.Switch(name, desc)布尔开关-
enumItem.String().addEnums(...)单选枚举.addEnums(...)
multi_enumItem.MultiEnum(name, desc)多选枚举.addEnums(...)
objectItem.Object(name, desc)嵌套对象.addItem(...)

布局控制

  • .col(n):设置列宽(一行 12 列),如 .col(6) 占半行
  • .height(n):设置文本域高度(像素)
  • .tip("提示"):添加配置项提示信息

完整的事件类型列表(基于实际代码):

// 1. CONNECT - 客户端连接请求(异步事件)
// 触发时机:收到客户端 CONNECT 报文时
// 事件对象:AsyncEventObject<MqttConnectMessage>
// 典型用途:认证、权限校验、连接限制
subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((eventType, event) -> {
MqttSession session = event.getSession();
MqttConnectMessage message = event.getObject();
// 认证逻辑
if (!authenticate(session, message)) {
MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session);
} else {
session.setAuthorized(true);
}
}));
// 2. DISCONNECT - 客户端断开连接
// 触发时机:客户端断开连接时
// 事件对象:AbstractSession
// 典型用途:清理资源、统计在线时长
subscribe(EventType.DISCONNECT, session -> {
log("客户端断开:" + session.getClientId());
});
// 3. SESSION_CREATE - 创建会话
// 触发时机:创建 MqttSession 对象时
// 事件对象:MqttSession
// 典型用途:会话初始化、绑定额外属性
subscribe(EventType.SESSION_CREATE, session -> {
session.setAttribute("createTime", System.currentTimeMillis());
});
// 4. RECEIVE_MESSAGE - 接收到客户端消息
// 触发时机:收到任何类型的 MQTT 报文
// 事件对象:EventObject<MqttMessage>
// 典型用途:消息审计、流量统计
subscribe(EventType.RECEIVE_MESSAGE, event -> {
MqttMessage message = event.getObject();
// 处理接收到的消息
});
// 5. WRITE_MESSAGE - 向客户端发送消息
// 触发时机:发送任何类型的 MQTT 报文
// 事件对象:EventObject<MqttMessage>
// 典型用途:消息过滤、响应拦截
subscribe(EventType.WRITE_MESSAGE, event -> {
MqttMessage message = event.getObject();
// 可以修改或阻止消息发送
});
// 6. RECEIVE_CONN_ACK_MESSAGE - 接收到 CONNACK 报文
// 触发时机:Broker 发送 CONNACK 报文后
// 事件对象:MqttConnAckMessage
// 典型用途:连接结果监控、认证结果记录
subscribe(EventType.RECEIVE_CONN_ACK_MESSAGE, message -> {
log("发送 CONNACK: " + message.getVariableHeader().getReturnCode());
});
// 7. SUBSCRIBE_ACCEPT - 接受订阅请求
// 触发时机:客户端订阅主题成功后
// 事件对象:EventObject<MqttTopicSubscription>
// 典型用途:订阅授权、订阅记录
subscribe(EventType.SUBSCRIBE_ACCEPT, event -> {
MqttTopicSubscription subscription = event.getObject();
log("客户端订阅:" + subscription.getTopicFilter());
});
// 8. UNSUBSCRIBE_ACCEPT - 取消订阅
// 触发时机:客户端取消订阅成功后
// 事件对象:EventObject<MqttUnsubscribeMessage>
// 典型用途:清理订阅关系
subscribe(EventType.UNSUBSCRIBE_ACCEPT, event -> {
// 处理取消订阅
});
// 9. TOPIC_CREATE - 创建新主题
// 触发时机:首次有消息发布到某主题时
// 事件对象:String (topic 名称)
// 典型用途:主题管理、权限控制
subscribe(EventType.TOPIC_CREATE, topicName -> {
log("新主题创建:" + topicName);
});
// 10. SUBSCRIBE_TOPIC - 客户端订阅 Topic
// 触发时机:客户端订阅某个主题时
// 事件对象:EventObject<MessageDeliver>
// 典型用途:动态授权、订阅拦截
subscribe(EventType.SUBSCRIBE_TOPIC, event -> {
MessageDeliver deliver = event.getObject();
// 可以拒绝订阅
});
// 11. UNSUBSCRIBE_TOPIC - 客户端取消订阅
// 触发时机:客户端取消订阅某个主题时
// 事件对象:MessageDeliver
subscribe(EventType.UNSUBSCRIBE_TOPIC, deliver -> {
// 处理取消订阅
});
// 12. SUBSCRIBE_REFRESH_TOPIC - 订阅刷新
// 触发时机:订阅关系刷新时
// 事件对象:MessageDeliver
// 典型用途:动态更新订阅关系
subscribe(EventType.SUBSCRIBE_REFRESH_TOPIC, deliver -> {
// 处理订阅刷新
});
// 13. BROKER_STARTED - Broker 启动完成
// 触发时机:Broker 服务完全启动后
// 事件对象:BrokerContext
// 典型用途:启动定时任务、初始化全局资源
// 特点:一次性事件(once=true),只在启动时触发一次
subscribe(EventType.BROKER_STARTED, context -> {
log("Broker 已启动,开始提供服务");
// 启动定时任务
timer().scheduleAtFixedRate(() -> {
// 定期执行的任务
}, 0, 60, TimeUnit.SECONDS);
});
// 14. BROKER_DESTROY - Broker 即将销毁
// 触发时机:Broker 服务停止前
// 事件对象:BrokerContext
// 典型用途:资源清理、数据持久化
// 特点:一次性事件
subscribe(EventType.BROKER_DESTROY, context -> {
log("Broker 正在关闭...");
// 保存状态、关闭连接等
});
// 15. BROKER_CONFIGURE_LOADED - Broker 配置加载完成
// 触发时机:Broker 配置加载完成后,服务启动前
// 事件对象:Options
// 典型用途:动态调整配置
subscribe(EventType.BROKER_CONFIGURE_LOADED, options -> {
// 可以在这里修改 Broker 配置
});

事件类型汇总表

事件常量事件对象类型异步事件触发时机典型用途
CONNECTAsyncEventObject<MqttConnectMessage>客户端连接请求认证、权限校验
DISCONNECTAbstractSession客户端断开连接清理资源、统计
SESSION_CREATEMqttSession创建会话会话初始化
RECEIVE_MESSAGEEventObject<MqttMessage>接收消息消息审计、统计
WRITE_MESSAGEEventObject<MqttMessage>发送消息消息过滤、拦截
RECEIVE_CONN_ACK_MESSAGEMqttConnAckMessage发送 CONNACK连接结果监控
SUBSCRIBE_ACCEPTEventObject<MqttTopicSubscription>接受订阅订阅授权、记录
UNSUBSCRIBE_ACCEPTEventObject<MqttUnsubscribeMessage>取消订阅清理订阅关系
TOPIC_CREATEString创建主题主题管理
SUBSCRIBE_TOPICEventObject<MessageDeliver>订阅 Topic动态授权
UNSUBSCRIBE_TOPICMessageDeliver取消订阅 Topic清理订阅
SUBSCRIBE_REFRESH_TOPICMessageDeliver刷新订阅动态更新
BROKER_STARTEDBrokerContextBroker 启动初始化资源
BROKER_DESTROYBrokerContextBroker 销毁资源清理
BROKER_CONFIGURE_LOADEDOptions配置加载完成调整配置

AI 必须遵循的规则:

  1. 异步事件处理:对于 CONNECT 等可能耗时的操作,使用 AsyncEventObject

    subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((type, event) -> {
    // 同步处理,阻塞直到完成
    }));
  2. 事件消费者 enable 检查:确保插件销毁后不再消费事件

    subscribe(EventType.RECEIVE_MESSAGE, new EventBusConsumer<MqttMessage>() {
    @Override
    public void consumer(EventType<MqttMessage> eventType, MqttMessage message) {
    // 处理消息
    }
    @Override
    public boolean enable() {
    return !destroyed; // 插件销毁后禁用
    }
    });
  3. 异常处理:事件处理中的异常不应影响 Broker 运行

    subscribe(EventType.CONNECT, event -> {
    try {
    // 业务逻辑
    } catch (Exception e) {
    log("事件处理异常:" + e.getMessage());
    // 不要抛出异常
    }
    });
  4. 资源释放:在 destroyPlugin() 中清理事件订阅相关的资源

    @Override
    protected void destroyPlugin() {
    // 停止定时器
    if (timer != null) {
    timer.shutdown();
    }
    // 关闭线程池
    if (executorService != null) {
    executorService.shutdown();
    }
    }

AI 在选择事件类型时应遵循以下决策流程:

用户需求的触发点是什么?
├─ 客户端连接 → EventType.CONNECT
│ └─ 需要认证?→ 在 CONNECT 事件中校验并设置 session.setAuthorized(true)
│ └─ 需要限流?→ 在 CONNECT 事件中检查当前连接数
├─ 客户端断开 → EventType.DISCONNECT
│ └─ 清理资源、统计在线时长
├─ 收到消息 → EventType.RECEIVE_MESSAGE
│ └─ 消息审计、流量统计、消息路由
├─ 发送消息 → EventType.WRITE_MESSAGE
│ └─ 消息过滤、响应修改
├─ 订阅主题 → EventType.SUBSCRIBE_ACCEPT / SUBSCRIBE_TOPIC
│ └─ 订阅授权、订阅记录
├─ 主题创建 → EventType.TOPIC_CREATE
│ └─ 主题管理、权限初始化
├─ Broker 启动 → EventType.BROKER_STARTED
│ └─ 启动定时任务、初始化全局资源
└─ Broker 关闭 → EventType.BROKER_DESTROY
└─ 资源清理、数据持久化

Provider 是插件修改 Broker 核心行为的主要扩展点。AI 应理解不同 Provider 的用途和使用场景。

4.1 SessionStateProvider - 会话状态存储

Section titled “4.1 SessionStateProvider - 会话状态存储”

用途:自定义会话状态的持久化方式(默认内存存储)

使用场景

  • 需要将会话状态持久化到数据库
  • 集群环境下共享会话状态
  • 会话状态备份恢复

实现示例

@Override
protected void initPlugin(BrokerContext brokerContext) {
// 自定义 SessionStateProvider
brokerContext.getProviders().setSessionStateProvider(new SessionStateProvider() {
private final Map<String, SessionState> cache = new ConcurrentHashMap<>();
@Override
public void store(String clientId, SessionState sessionState) {
// 将会话状态存储到数据库
cache.put(clientId, sessionState);
log("存储会话:" + clientId);
// 示例:同时写入数据库
// jdbcTemplate.update("INSERT INTO session_state ...", ...);
}
@Override
public SessionState get(String clientId) {
// 从数据库加载会话状态
log("获取会话:" + clientId);
return cache.get(clientId);
// 或:return jdbcTemplate.queryForObject("SELECT * FROM session_state WHERE client_id = ?", ...);
}
@Override
public void remove(String clientId) {
// 从数据库删除会话状态
cache.remove(clientId);
log("删除会话:" + clientId);
}
});
log("自定义会话状态存储已启用");
}

SessionState 包含的信息

  • 客户端 ID
  • 订阅关系
  • 未确认的消息
  • 会话过期时间

用途:自定义订阅关系的管理方式

使用场景

  • 订阅关系持久化
  • 订阅关系同步(集群)
  • 订阅鉴权

实现示例

@Override
protected void initPlugin(BrokerContext brokerContext) {
brokerContext.getProviders().setSubscribeProvider(new SubscribeProvider() {
@Override
public void addSubscription(String clientId, String topic, int qos) {
// 添加订阅关系到数据库
log("添加订阅:clientId=" + clientId + ", topic=" + topic + ", qos=" + qos);
// jdbcTemplate.update("INSERT INTO subscriptions ...", ...);
}
@Override
public void removeSubscription(String clientId, String topic) {
// 删除订阅关系
log("删除订阅:clientId=" + clientId + ", topic=" + topic);
}
@Override
public List<Subscription> getSubscriptions(String clientId) {
// 查询客户端的所有订阅
log("查询订阅:clientId=" + clientId);
// return jdbcTemplate.query("SELECT * FROM subscriptions WHERE client_id = ?", ...);
return Collections.emptyList();
}
});
log("自定义订阅管理已启用");
}

用途:通过 CONNECT 事件实现自定义认证逻辑

使用场景

  • 用户名密码认证
  • Token 认证
  • IP 白名单认证
  • LDAP/AD 认证
  • OAuth/JWT 认证

实现示例

@Override
protected void initPlugin(BrokerContext brokerContext) {
// 订阅 CONNECT 事件进行认证
subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((eventType, event) -> {
MqttSession session = event.getSession();
MqttConnectMessage message = event.getObject();
// 获取用户名密码
String username = message.getPayload().userName();
byte[] password = message.getPayload().passwordInBytes();
// 自定义认证逻辑
boolean authenticated = authenticate(username, password);
if (authenticated) {
session.setAuthorized(true);
log("认证成功:" + username);
} else {
// 认证失败,返回错误码
MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session);
log("认证失败:" + username);
}
}));
log("认证插件已启用");
}
/**
* 自定义认证逻辑
*/
private boolean authenticate(String username, byte[] password) {
// 实现认证逻辑,例如:
// 1. 查询数据库验证用户名密码
// 2. 调用 HTTP 接口认证
// 3. 验证 JWT Token
// 4. LDAP 认证
return true;
}

注意:对于复杂认证场景(如需要异步调用外部服务),建议使用 AsyncEventObject 的同步订阅者模式,确保认证完成后再建立连接。


提示:以下模板均基于实际插件代码整理,可直接参考使用。

场景:自定义客户端认证逻辑

实现方式:使用 CONNECT 事件(标准方式)

参考实际插件

public class CustomAuthPlugin extends Plugin {
@Override
protected void initPlugin(BrokerContext brokerContext) throws Throwable {
log("正在初始化自定义认证插件...");
// 订阅 CONNECT 事件
subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((eventType, event) -> {
MqttSession session = event.getSession();
MqttConnectMessage message = event.getObject();
// 获取用户名密码
String username = message.getPayload().userName();
byte[] password = message.getPayload().passwordInBytes();
// 自定义认证逻辑
boolean authenticated = authenticate(username, password);
if (authenticated) {
session.setAuthorized(true);
log("认证成功:" + username);
} else {
// 认证失败,返回错误码
MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session);
log("认证失败:" + username);
}
}));
log("自定义认证插件初始化完成");
}
private boolean authenticate(String username, byte[] password) {
// TODO: 实现认证逻辑,例如:
// 1. 查询数据库
// 2. 调用 HTTP 接口
// 3. 验证 JWT Token
// 4. LDAP 认证
return true;
}
@Override
public String getVersion() {
return "1.0.0";
}
@Override
public String getVendor() {
return "Your Company";
}
}

参考实际插件SimpleAuthPlugin

场景:将 MQTT 消息转发到其他系统(Redis、Kafka、HTTP 等)

public class MessageBridgePlugin extends Plugin {
private Jedis jedis; // Redis 客户端
@Override
protected void initPlugin(BrokerContext brokerContext) throws Throwable {
log("正在初始化消息桥接插件...");
// 初始化 Redis 连接
jedis = new Jedis("localhost", 6379);
// 订阅消息接收事件
subscribe(EventType.RECEIVE_MESSAGE, event -> {
MqttMessage mqttMessage = event.getObject();
if (mqttMessage instanceof MqttPublishMessage) {
MqttPublishMessage publishMessage = (MqttPublishMessage) mqttMessage;
String topic = publishMessage.getTopicName();
byte[] payload = publishMessage.getPayload();
// 将消息发送到 Redis
String key = "mqtt:" + topic;
jedis.publish(key, new String(payload));
log("消息已桥接到 Redis: " + topic);
}
});
log("消息桥接插件初始化完成");
}
@Override
protected void destroyPlugin() {
log("正在关闭消息桥接插件...");
if (jedis != null) {
jedis.close();
}
log("消息桥接插件已关闭");
}
// ... 其他必要方法
}

场景:收集和上报服务指标(连接数、消息量、TPS 等)

public class MetricsPlugin extends Plugin {
private ScheduledExecutorService scheduler;
private AtomicLong messageCount = new AtomicLong(0);
private AtomicLong connectionCount = new AtomicLong(0);
@Override
protected void initPlugin(BrokerContext brokerContext) throws Throwable {
log("正在初始化监控插件...");
// 统计连接数
subscribe(EventType.CONNECT, event -> {
connectionCount.incrementAndGet();
});
// 统计消息量
subscribe(EventType.RECEIVE_MESSAGE, event -> {
messageCount.incrementAndGet();
});
// 定时上报指标
scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "metrics-reporter");
t.setDaemon(true);
return t;
});
scheduler.scheduleAtFixedRate(() -> {
long msgCount = messageCount.getAndSet(0);
long connCount = connectionCount.getAndSet(0);
log(String.format("指标上报 - 消息数:%d, 连接数:%d, TPS: %d",
msgCount, connCount, msgCount / 60));
// TODO: 发送到监控系统(Prometheus、Grafana 等)
}, 0, 60, TimeUnit.SECONDS);
log("监控插件初始化完成");
}
@Override
protected void destroyPlugin() {
log("正在关闭监控插件...");
if (scheduler != null) {
scheduler.shutdown();
}
log("监控插件已关闭");
}
// ... 其他必要方法
}

场景:WebSocket、HTTP 等其他协议转 MQTT

参考实际插件

public class WebSocketPlugin extends Plugin {
private HttpServer httpServer;
@Override
protected void initPlugin(BrokerContext brokerContext) throws Throwable {
PluginConfig config = loadPluginConfig(PluginConfig.class);
// 注册使用的端口
addUsagePort(config.getPort(), "WebSocket 端口");
// 创建 HTTP 服务器
httpServer = Feat.httpServer(opts -> opts.debug(true))
.httpHandler(request -> {
request.getResponse().setHeader("Sec-WebSocket-Protocol", "mqtt");
request.upgrade(new WebSocketUpgrade() {
ProxySession proxySession;
@Override
public void onHandShake(WebSocketRequest req, WebSocketResponse resp) {
proxySession = new ProxySession(req.getAioSession(), resp);
// 触发 NEW_SESSION 事件
brokerContext.Options().getProcessor()
.stateEvent(proxySession, StateMachineEnum.NEW_SESSION, null);
}
@Override
public void handleBinaryMessage(WebSocketRequest req, WebSocketResponse resp, byte[] data) {
// 解码 MQTT 消息并处理
// ...
}
@Override
public void destroy() {
// 触发 SESSION_CLOSED 事件
brokerContext.Options().getProcessor()
.stateEvent(proxySession, StateMachineEnum.SESSION_CLOSED, null);
}
});
});
httpServer.listen(config.getPort());
log("WebSocket 插件已启动,端口:" + config.getPort());
}
@Override
protected void destroyPlugin() {
if (httpServer != null) {
httpServer.shutdown();
}
}
// ... 其他必要方法
}

场景:周期性执行任务(数据清理、定时推送等)

public class ScheduledTaskPlugin extends Plugin {
private Timer timer;
@Override
protected void initPlugin(BrokerContext brokerContext) throws Throwable {
log("正在初始化定时任务插件...");
// 使用插件自带的定时器
timer = timer();
// 每秒执行一次
timer.scheduleAtFixedRate(() -> {
try {
// 定时任务逻辑
log("执行定时任务...");
// 可以向特定主题推送消息
brokerContext.getOrCreateTopic("system/heartbeat")
.publish("heartbeat".getBytes(), MqttQoS.AT_MOST_ONCE, false);
} catch (Exception e) {
log("定时任务执行异常:" + e.getMessage());
}
}, 0, 1000); // 每 1000ms 执行一次
log("定时任务插件初始化完成");
}
@Override
protected void destroyPlugin() {
log("正在关闭定时任务插件...");
// 定时器会在 Plugin.uninstall() 中自动关闭
log("定时任务插件已关闭");
}
// ... 其他必要方法
}

场景:提供完整的企业级功能(数据库持久化、监控指标、OpenAI 集成等)

参考实际插件EnterprisePlugin - 企业级功能集成


✅ 正确示例:每个插件只负责一个明确的功能

❌ 错误:LargePlugin(什么都做)
├─ 认证逻辑
├─ 消息桥接
├─ 监控统计
└─ 定时任务
✅ 正确:拆分为多个插件
├─ AuthPlugin(认证)
├─ BridgePlugin(消息桥接)
├─ MetricsPlugin(监控)
└─ SchedulerPlugin(定时任务)

AI 必须遵循的规则:

  1. 插件内部异常不能影响 Broker 运行

    subscribe(EventType.CONNECT, event -> {
    try {
    // 业务逻辑
    } catch (Exception e) {
    log("处理异常:" + e.getMessage());
    // 不要抛出异常
    }
    });
  2. 异步操作的异常处理

    CompletableFuture.supplyAsync(() -> {
    // 异步逻辑
    }).exceptionally(throwable -> {
    log("异步任务异常:" + throwable.getMessage());
    return null;
    });
  3. 资源初始化失败的降级处理

    try {
    resource = initResource();
    } catch (Exception e) {
    log("资源初始化失败,使用默认配置:" + e.getMessage());
    resource = createDefaultResource();
    }

必须遵守的规则:

  1. 谁创建,谁销毁

    private ScheduledExecutorService scheduler;
    @Override
    protected void initPlugin() {
    scheduler = Executors.newScheduledThreadPool(4);
    }
    @Override
    protected void destroyPlugin() {
    if (scheduler != null) {
    scheduler.shutdown(); // 必须关闭
    }
    }
  2. 使用插件提供的定时器

    // 推荐使用 plugin.timer()
    Timer timer = timer();
    timer.schedule(task, delay);
    // 插件卸载时会自动关闭
  3. 外部资源及时释放

    // 数据库连接、HTTP 客户端、文件流等
    @Override
    protected void destroyPlugin() {
    if (connection != null) connection.close();
    if (httpClient != null) httpClient.close();
    if (inputStream != null) inputStream.close();
    }

AI 必须使用规范的日志方式:

  1. 使用插件提供的 log() 方法

    log("插件初始化中...");
    log("配置加载成功");
    log("警告:XXX 可能存在问题", LogLevel.WARN);
  2. 避免直接打印

    // ❌ 错误
    System.out.println("消息");
    System.err.println("错误");
    // ✅ 正确
    log("消息");
  3. 日志内容规范

    // 包含关键信息
    log("认证成功:userId=" + userId + ", ip=" + ipAddress);
    // 避免敏感信息
    // ❌ 不要打印完整密码
    log("密码:" + password);
    // ✅ 打印脱敏信息
    log("密码验证:" + (password != null ? "***" : "null"));

可变参数必须提取到配置文件:

// ❌ 硬编码
String host = "127.0.0.1";
int port = 1883;
// ✅ 从配置加载
PluginConfig config = loadPluginConfig(PluginConfig.class);
String host = config.getHost();
int port = config.getPort();

配置文件结构:

plugin.yaml
host: 192.168.1.100
port: 1883
timeout: 30000
credentials:
username: admin
password: encrypted_password

合理实现 getVersion() 方法:

@Override
public String getVersion() {
// 方式 1: 使用常量
return "1.0.0";
// 方式 2: 使用 Broker 版本
return Options.VERSION;
// 方式 3: 从 MANIFEST.MF 读取
return getClass().getPackage().getImplementationVersion();
}

通过 order() 方法控制加载顺序:

@Override
public int order() {
// 数值越小,优先级越高
// 基础插件优先加载
if (this instanceof AuthPlugin) {
return -100; // 高优先级
}
// 依赖其他插件的后加载
if (this instanceof BusinessPlugin) {
return 100; // 低优先级
}
return 0; // 默认
}

常见优先级规划:

-100 ~ -1: 基础插件(认证、会话管理等)
0 ~ 99: 核心功能插件(消息处理、协议转换等)
100 ~ 199: 辅助功能插件(监控、日志等)
200+: 业务插件(定制功能)

AI 在生成插件代码时,必须逐项检查以下内容:

  • 是否创建了正确的包路径:tech.smartboot.mqtt.{module}
  • 是否创建了 ServiceLoader 配置文件:META-INF/services/tech.smartboot.mqtt.plugin.spec.Plugin
  • ServiceLoader 文件内容是否为插件类的全限定名
  • 是否创建了 readme.md 文件
  • pom.xml 是否配置了 maven-shade-pluginAppendingTransformer
  • 插件类是否继承了 Plugin 抽象类
  • 是否实现了 getVersion() 方法
  • 是否实现了 getVendor() 方法
  • 是否重写了 initPlugin() 方法
  • 是否重写了 destroyPlugin() 方法
  • 是否没有重写 install()uninstall() 方法
  • 是否使用 log() 方法而非 System.out
  • 是否需要配置类(PluginConfig)
  • 配置类是否有 getter/setter
  • 是否实现了 schema() 方法(如需可视化配置)
  • Schema 中的字段是否与配置类对应
  • 是否使用了 .col() 控制布局
  • 事件类型选择是否正确
  • 异步事件是否使用了 AsyncEventObject
  • 事件消费者是否实现了 enable() 检查
  • 事件处理中是否有异常捕获
  • 是否在 destroyPlugin() 中清理事件相关资源
  • 创建的线程池是否在 destroyPlugin() 中关闭
  • 打开的连接是否在 destroyPlugin() 中关闭
  • 是否优先使用插件提供的 timer()
  • 是否有资源泄漏风险
  • 事件处理中是否捕获了所有异常
  • 异步操作是否有 exceptionally() 处理
  • 初始化失败是否有降级方案
  • 是否没有将异常抛给 Broker
  • 打包后 jar 文件名是否规范
  • 是否说明了部署步骤
  • 是否说明了配置方法
  • 是否说明了端口占用情况(使用 addUsagePort()

以下是完整的插件示例,AI 可以参考此模板生成类似的插件:

package tech.smartboot.mqtt.auth.http;
import tech.smartboot.mqtt.common.enums.MqttConnectReturnCode;
import tech.smartboot.mqtt.common.message.MqttConnectMessage;
import tech.smartboot.mqtt.plugin.spec.BrokerContext;
import tech.smartboot.mqtt.plugin.spec.MqttSession;
import tech.smartboot.mqtt.plugin.spec.Options;
import tech.smartboot.mqtt.plugin.spec.Plugin;
import tech.smartboot.mqtt.plugin.spec.bus.AsyncEventObject;
import tech.smartboot.mqtt.plugin.spec.bus.EventType;
import tech.smartboot.mqtt.plugin.spec.schema.Enum;
import tech.smartboot.mqtt.plugin.spec.schema.Item;
import tech.smartboot.mqtt.plugin.spec.schema.Schema;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
/**
* HTTP 认证插件
*
* 功能:通过 HTTP 接口验证客户端用户名密码
*
* @author SmartBoot Team
* @version 1.0.0
*/
public class HttpAuthPlugin extends Plugin {
private PluginConfig config;
private BrokerContext brokerContext;
@Override
protected void initPlugin(BrokerContext brokerContext) throws Throwable {
this.brokerContext = brokerContext;
log("==============================================");
log("正在初始化 HTTP 认证插件...");
// 加载配置
config = loadPluginConfig(PluginConfig.class);
if (config == null) {
config = createDefaultConfig();
log("配置文件不存在,使用默认配置");
}
log("认证服务器地址:" + config.getAuthUrl());
log("超时时间:" + config.getTimeout() + "ms");
log("允许匿名:" + config.isAllowAnonymous());
// 订阅 CONNECT 事件
subscribe(EventType.CONNECT, AsyncEventObject.syncSubscriber((eventType, event) -> {
MqttSession session = event.getSession();
// 如果已经断开,直接返回
if (session.isDisconnect()) {
return;
}
MqttConnectMessage message = event.getObject();
String username = message.getPayload().userName();
byte[] password = message.getPayload().passwordInBytes();
// 检查是否匿名访问
if (isAnonymous(username, password)) {
if (config.isAllowAnonymous()) {
session.setAuthorized(true);
log("匿名访问已授权:" + session.getClientId());
return;
} else {
log("匿名访问被拒绝:" + session.getClientId());
MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session);
return;
}
}
// 执行 HTTP 认证
try {
boolean authenticated = authenticateViaHttp(username, new String(password, StandardCharsets.UTF_8));
if (authenticated) {
session.setAuthorized(true);
log("认证成功:" + username);
} else {
log("认证失败:" + username);
MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED, session);
}
} catch (Exception e) {
log("HTTP 认证异常:" + e.getMessage());
// 认证服务不可用,拒绝连接
MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE, session);
}
}));
log("==============================================");
log("HTTP 认证插件初始化完成");
log("==============================================");
}
@Override
protected void destroyPlugin() {
log("正在关闭 HTTP 认证插件...");
log("HTTP 认证插件已关闭");
}
@Override
public String getVersion() {
return "1.0.0";
}
@Override
public String getVendor() {
return Options.VENDOR;
}
@Override
public String pluginName() {
return "http-auth-plugin";
}
@Override
public Schema schema() {
Schema schema = new Schema();
// 认证服务器地址
schema.addItem(Item.String("authUrl", "认证服务器 URL")
.tip("例如:http://localhost:8080/api/auth")
.col(12));
// 超时时间
schema.addItem(Item.Int("timeout", "HTTP 请求超时 (毫秒)")
.tip("默认:5000")
.col(6));
// 允许匿名
schema.addItem(Item.Switch("allowAnonymous", "允许匿名访问")
.tip("开启后,不提供用户名密码也能连接")
.col(6));
// 请求模板
schema.addItem(Item.TextArea("requestTemplate", "请求体模板")
.tip("支持{username}和{password}占位符")
.height(200)
.col(12));
return schema;
}
/**
* 创建默认配置
*/
private PluginConfig createDefaultConfig() {
PluginConfig config = new PluginConfig();
config.setAuthUrl("http://localhost:8080/api/auth");
config.setTimeout(5000);
config.setAllowAnonymous(false);
config.setRequestTemplate("{\"username\":\"{username}\",\"password\":\"{password}\"}");
return config;
}
/**
* 通过 HTTP 接口认证
*/
private boolean authenticateViaHttp(String username, String password) throws Exception {
URL url = new URL(config.getAuthUrl());
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
try {
conn.setConnectTimeout(config.getTimeout());
conn.setReadTimeout(config.getTimeout());
conn.setRequestMethod("POST");
conn.setRequestProperty("Content-Type", "application/json");
conn.setDoOutput(true);
// 构建请求体
String requestBody = config.getRequestTemplate()
.replace("{username}", username)
.replace("{password}", password);
// 发送请求
try (OutputStream os = conn.getOutputStream()) {
os.write(requestBody.getBytes(StandardCharsets.UTF_8));
os.flush();
}
// 读取响应
int responseCode = conn.getResponseCode();
if (responseCode == 200) {
return true;
} else {
log("HTTP 认证返回错误码:" + responseCode);
return false;
}
} finally {
conn.disconnect();
}
}
/**
* 检查是否为匿名连接
*/
private boolean isAnonymous(String username, byte[] password) {
return username == null || username.isEmpty() ||
password == null || password.length == 0;
}
}
package tech.smartboot.mqtt.auth.http;
/**
* HTTP 认证插件配置类
*/
public class PluginConfig {
/**
* 认证服务器 URL
*/
private String authUrl = "http://localhost:8080/api/auth";
/**
* HTTP 请求超时(毫秒)
*/
private int timeout = 5000;
/**
* 是否允许匿名访问
*/
private boolean allowAnonymous = false;
/**
* 请求体模板,支持{username}和{password}占位符
*/
private String requestTemplate = "{\"username\":\"{username}\",\"password\":\"{password}\"}";
// getter 和 setter
public String getAuthUrl() {
return authUrl;
}
public void setAuthUrl(String authUrl) {
this.authUrl = authUrl;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public boolean isAllowAnonymous() {
return allowAnonymous;
}
public void setAllowAnonymous(boolean allowAnonymous) {
this.allowAnonymous = allowAnonymous;
}
public String getRequestTemplate() {
return requestTemplate;
}
public void setRequestTemplate(String requestTemplate) {
this.requestTemplate = requestTemplate;
}
}
plugin.yaml
authUrl: http://192.168.1.100:8080/api/mqtt/auth
timeout: 10000
allowAnonymous: false
requestTemplate: |
{
"username": "{username}",
"password": "{password}",
"clientId": "{clientId}",
"timestamp": ${timestamp}
}
tech.smartboot.mqtt.auth.http.HttpAuthPlugin
# HTTP 认证插件
## 功能说明
本插件通过 HTTP 接口验证 MQTT 客户端的用户名和密码,实现自定义认证逻辑。
## 配置参数
| 参数 | 类型 | 说明 | 默认值 |
|------|------|------|--------|
| authUrl | String | 认证服务器 URL | http://localhost:8080/api/auth |
| timeout | int | HTTP 请求超时 (ms) | 5000 |
| allowAnonymous | boolean | 是否允许匿名访问 | false |
| requestTemplate | String | 请求体模板 | `{"username":"{username}",...}` |
## 部署步骤
1. 将编译后的 jar 文件放入 `plugins` 目录
2.`plugins/http-auth-plugin/` 目录创建 `plugin.yaml` 配置文件
3. 重启 smart-mqtt 服务
## 认证接口要求
- 请求方法:POST
- Content-Type: application/json
- 请求体:由 requestTemplate 配置
- 响应码:200 表示认证成功,其他表示失败
## 示例
认证请求:
```json
POST http://localhost:8080/api/auth
{
"username": "admin",
"password": "123456"
}
响应:200 OK // 认证成功
响应:401 Unauthorized // 认证失败
---
## 九、常见问题解答(FAQ)
### Q1: 插件无法被加载怎么办?
**检查清单:**
1. ✅ 确认 `META-INF/services/tech.smartboot.mqtt.plugin.spec.Plugin` 文件存在
2. ✅ 确认文件内容为插件类的全限定名
3. ✅ 确认 jar 文件放在 `plugins` 目录
4. ✅ 查看启动日志是否有 "load plugin: xxx" 输出
### Q2: 插件初始化失败如何处理?
**调试步骤:**
1. 查看日志输出,定位失败位置
2. 检查配置文件是否正确
3. 检查依赖的外部服务是否可用
4. 在 `initPlugin()` 中添加更多日志
### Q3: 如何调试插件代码?
**方法 1:远程调试**
```bash
# 启动时添加调试参数
java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -jar smart-mqtt.jar

方法 2:增加日志

log("DEBUG: 执行到此处,config=" + config);

Q4: 多个插件的执行顺序如何控制?

Section titled “Q4: 多个插件的执行顺序如何控制?”

通过 order() 方法控制:

@Override
public int order() {
return 100; // 值越小,优先级越高
}

推荐方式:

  • HTTP 客户端:OkHttp、HttpClient
  • Redis:Jedis、Lettuce
  • 数据库:JDBC、MyBatis
  • 消息队列:Kafka、RabbitMQ 客户端

注意事项:

  • destroyPlugin() 中关闭连接
  • 设置合理的超时时间
  • 处理网络异常

目前 smart-mqtt 不支持运行时热插拔,需要重启服务。

变通方案:

  • 在插件内部实现配置热更新
  • 使用 BROKER_CONFIGURE_LOADED 事件动态调整

可以,需要使用 addUsagePort() 注册:

addUsagePort(8080, "HTTP 服务端口");

这样可以在控制台显示插件使用的端口信息。


插件类型核心事件/扩展点典型用途参考插件
认证插件EventType.CONNECT用户名密码、Token、JWT、LDAP 认证SimpleAuthPlugin
消息桥接插件EventType.RECEIVE_MESSAGE转发到 Redis、Kafka、HTTP 等RedisBridgePlugin
监控插件EventType.CONNECT, RECEIVE_MESSAGE统计连接数、消息量、TPS-
协议转换插件自定义 ServerWebSocket、HTTP、TLS 转 MQTTWebSocketPlugin
会话持久化插件SessionStateProvider将会话状态存储到数据库MemorySessionPlugin
订阅管理插件SubscribeProvider自定义订阅关系管理-
定时任务插件timer()定期推送、数据清理BenchPlugin
集群插件BROKER_STARTED, 自定义通信节点间数据同步ClusterPlugin
企业级插件多种组合数据库、监控、AI 集成EnterprisePlugin

选择指南

  • 需要认证?→ 使用 CONNECT 事件
  • 需要处理消息?→ 使用 RECEIVE_MESSAGEWRITE_MESSAGE 事件
  • 需要持久化会话?→ 实现 SessionStateProvider
  • 需要自定义订阅?→ 实现 SubscribeProvider
  • 需要开启新端口?→ 创建 HTTP/TCP 服务器,使用 addUsagePort() 注册
  • 需要定时任务?→ 使用 timer() 方法

本规范详细阐述了 smart-mqtt 插件开发的完整流程和技术细节,所有内容包括:

  1. 项目结构:正确的 Maven 配置、GroupId、父 POM 依赖
  2. ServiceLoader:两个配置文件的正确处理方式
  3. 插件基类:生命周期方法、事件订阅机制
  4. 配置管理:PluginConfig、YAML 文件、Schema 可视化
  5. 事件总线:15 种完整事件类型及其使用场景
  6. Provider 扩展:SessionStateProvider、SubscribeProvider
  7. 最佳实践:异常处理、资源管理、日志规范
  8. 实战模板:认证、桥接、监控、协议转换等常见插件类型

AI 助手在生成插件代码时,应严格遵循本规范,确保生成的代码:

  • 符合架构:遵循 smart-mqtt 的插件体系和设计规范
  • 可编译运行:Maven 配置、依赖关系完全正确
  • 可维护:代码结构清晰、职责单一、注释完善
  • 健壮:正确处理异常、合理管理资源
  • 可配置:提供 Schema 可视化和 YAML 配置支持

新手推荐学习路径

  1. 阅读 SimpleAuthPlugin - 理解基础认证流程
  2. 阅读 WebSocketPlugin - 学习如何开启端口和协议转换
  3. 参考本文档第 9 节「常用插件类型快速参考」- 选择合适的实现方式
  4. 遵循检查清单(第 8 章)- 确保代码质量

通过标准化、规范化的开发流程,可以大大提高插件开发效率和质量。