Skip to content

集群插件

This content is not available in your language yet.

cluster-plugin 插件为 smart-mqtt broker 提供基于 HTTP 的集群协调能力,实现多节点集群部署、工作节点接入和集群状态同步。

  • 支持核心节点与工作节点的集群部署
  • 节点间通过 HTTP 通信进行状态同步与消息转发
  • 支持分布式消息路由与队列策略
  • 动态发现与管理集群节点
  • ClusterPlugin:插件入口,负责初始化集群核心服务或工作节点连接。
  • Coordinator:负责集群节点发现、状态维护、节点加入/退出事件处理及分布式消息路由。
  • Distributor:消息分发器,支持多种队列策略。
  • PluginConfig:插件配置管理,支持核心节点、监听地址、端口、队列长度、队列策略、集群节点列表等参数。

plugin.yaml 中配置,示例:

core: true # 是否为核心节点,true=核心节点,false=工作节点
host: 0.0.0.0 # 集群服务监听地址,仅当core为true时有效
port: 8884 # 集群服务监听端口,仅当core为true时有效
queueLength: 1024 # 消息队列长度
queuePolicy: 0 # 队列策略(0=丢弃最新,1=丢弃最旧)
clusters: # 集群节点地址列表
- http://core1:8884
- http://core2:8884
  1. 将插件及配置文件放置于 smart-mqtt 的 plugins 目录下
  2. 配置好 plugin.yaml,根据实际部署角色设置 core/host/port/clusters 等参数
  3. 启动 smart-mqtt 服务,插件会自动加载并初始化集群功能
  • 集群节点间需保证网络互通,端口配置需一致
  • 各节点 plugin.yaml 配置需根据实际角色(核心/工作)分别设置
  • 启动顺序建议先启动核心节点,再启动工作节点
sequenceDiagram
    autonumber
    participant WorkerN as 工作节点N
    participant Cluster as 集群插件
    participant Core as 核心节点<br/>(Coordinator)

    rect rgb(230, 245, 255)
        Note over WorkerN,Core: 节点注册阶段
        WorkerN->>Cluster: 1. 节点启动
        Cluster->>Cluster: 2. 加载配置<br/>(clusters地址列表)
        
        Cluster->>Core: 3. HTTP POST /register<br/>(节点信息+认证凭证)
        
        Core->>Core: 4. 验证节点身份
        alt 验证通过
            Core->>Core: 5a. 注册到节点列表<br/>分配NodeID
            Core-->>Cluster: 6a. HTTP 200 OK<br/>(集群配置+节点列表)
            Cluster->>Cluster: 7a. 更新本地状态表
            Note over WorkerN,Core: 节点加入集群成功
        else 验证失败
            Core-->>Cluster: 6b. HTTP 403 Forbidden
            Cluster->>WorkerN: 7b. 注册失败,服务退出
        end
    end

    rect rgb(255, 245, 230)
        Note over WorkerN,Core: 状态同步阶段
        Core->>Cluster: 8. 状态广播<br/>(节点加入事件)
        Cluster->>Cluster: 9. 更新集群拓扑
        
        WorkerN->>Core: 10. 心跳检测
        Core-->>WorkerN: 11. 心跳响应
    end
sequenceDiagram
    autonumber
    participant PubClient as 发布者(节点1)
    participant Worker1 as 工作节点1
    participant Cluster1 as 集群插件(节点1)
    participant Core as 核心节点
    participant Cluster2 as 集群插件(节点2)
    participant Worker2 as 工作节点2
    participant SubClient as 订阅者(节点2)

    rect rgb(230, 245, 255)
        Note over PubClient,SubClient: 消息发布阶段
        PubClient->>Worker1: 1. PUBLISH消息<br/>(topic: sensors/temp)
        Worker1->>Cluster1: 2. 消息路由请求
        
        Cluster1->>Cluster1: 3. 目标节点判断<br/>(根据topic路由表)
    end

    rect rgb(255, 245, 230)
        Note over Core,Worker2: 跨节点转发
        
        alt 目标在本地
            Cluster1->>Worker1: 4a. 本地转发
            Worker1->>PubClient: 4b. 确认接收
        else 目标在远程节点
            Cluster1->>Core: 4c. HTTP POST /forward<br/>(消息+目标节点)
            
            Core->>Core: 5. 路由决策
            Core->>Cluster2: 6. HTTP转发消息
            
            Cluster2->>Worker2: 7. 本地投递
            Worker2->>SubClient: 8. 推送消息
            
            SubClient-->>Worker2: 9. 确认接收
            Worker2-->>Cluster2: 10. 转发确认
            Cluster2-->>Core: 11. 返回确认
            Core-->>Cluster1: 12. 转发完成
        end
    end

    rect rgb(255, 255, 230)
        Note over PubClient,SubClient: 队列策略处理
        
        alt 队列策略=丢弃最新(queuePolicy=0)
            Cluster1->>Cluster1: 队列满时丢弃新消息
        else 队列策略=丢弃最旧(queuePolicy=1)
            Cluster1->>Cluster1: 队列满时丢弃旧消息
        end
    end
  1. 核心节点启动: 核心节点启动HTTP服务,等待工作节点连接
  2. 工作节点注册: 工作节点根据配置的clusters地址连接到核心节点并完成注册
  3. 消息路由:
    • 本地客户端直接转发
    • 远程客户端通过HTTP转发到对应节点
  4. 状态同步: Coordinator维护集群节点状态,处理节点加入/退出事件
  5. 分布式分发: Distributor根据队列策略管理消息分发