
Event Bus

The event bus is the core component of smart-mqtt and is implemented with the classic producer/consumer model. MQTT message processing, connection management, subscription management, and related work are all driven through the event bus, which gives developers a flexible extension point.

Event Bus Architecture Diagram

The event bus adopts the Publish-Subscribe Pattern (Pub/Sub), composed of three core roles:

flowchart LR
    P[Publisher<br/>Event Publisher] -->|publish| B[EventBus<br/>Event Bus]
    B -->|notify| S[Subscriber<br/>Event Subscriber]

  • Publisher: Triggers events at specific times, such as connection establishment or message reception
  • EventBus: Responsible for event registration, distribution, and routing
  • Subscriber: Listens to events of interest and executes the corresponding processing logic

sequenceDiagram
    participant Client as MQTT Client
    participant Broker as Broker
    participant Bus as EventBus
    participant Subs as Subscribers

    Client->>Broker: Send CONNECT
    Broker->>Bus: publish(CONNECT event)
    Bus->>Subs: Asynchronously notify all subscribers
    Subs-->>Bus: Processing complete
    Bus-->>Broker: Continue subsequent flow
    Broker->>Client: Return CONNACK
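
To make the three roles concrete, here is a minimal sketch of a publish/subscribe bus in plain Java. It is not smart-mqtt's implementation: the class `SketchEventBus`, the string event names, and the `Consumer<Object>` subscriber type are all assumptions chosen to keep the example self-contained.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of the three roles; not smart-mqtt's actual classes.
final class SketchEventBus {
    // One subscriber list per event name.
    private final Map<String, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    // Subscriber role: register interest in an event.
    void subscribe(String event, Consumer<Object> subscriber) {
        subscribers.computeIfAbsent(event, k -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    // EventBus role: route the payload to every subscriber of that event.
    // Returns the number of subscribers that were notified.
    int publish(String event, Object payload) {
        List<Consumer<Object>> list = subscribers.get(event);
        if (list == null) {
            return 0; // nobody is listening for this event
        }
        int notified = 0;
        for (Consumer<Object> s : list) {
            s.accept(payload);
            notified++;
        }
        return notified;
    }
}
```

The publisher role is simply whichever code calls `publish`, for example the Broker when it receives a CONNECT packet.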

The event bus supports both synchronous and asynchronous publishing modes:

| Mode | Characteristics | Applicable Scenarios |
| --- | --- | --- |
| Synchronous Mode | Publisher blocks waiting for all subscribers to complete processing | Simple, fast logic processing |
| Asynchronous Mode | Based on CompletableFuture chain processing | Time-consuming operations (such as authentication, log recording) |
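
The difference between the two modes can be sketched as follows. This is an illustration under assumptions, not the framework's code: `PublishModes`, `publishSync`, and `publishAsync` are hypothetical names, and subscribers are modeled with `java.util.function` types rather than smart-mqtt's own interfaces.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch contrasting the two modes; names are illustrative only.
final class PublishModes {
    // Synchronous: the caller returns only after every subscriber has run.
    static <T> void publishSync(List<Consumer<T>> subscribers, T event) {
        for (Consumer<T> s : subscribers) {
            s.accept(event);
        }
    }

    // Asynchronous: each subscriber returns a future, and the futures are chained
    // so the next subscriber starts only when the previous one has completed.
    static <T> CompletableFuture<Void> publishAsync(
            List<Function<T, CompletableFuture<Void>>> subscribers, T event) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Function<T, CompletableFuture<Void>> s : subscribers) {
            chain = chain.thenCompose(ignored -> s.apply(event));
        }
        return chain;
    }
}
```

With the asynchronous variant the publisher can attach its follow-up step (for example sending CONNACK) to the returned future instead of blocking a thread while authentication or logging runs.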

smart-mqtt ships with a rich set of built-in event types that cover each stage of the Broker lifecycle:

| Event Type | Trigger Timing | Data Object | Characteristics |
| --- | --- | --- | --- |
| BROKER_CONFIGURE_LOADED | When Broker configuration loading is complete | Options | One-time event |
| BROKER_STARTED | When Broker starts successfully | BrokerContext | One-time event |
| BROKER_DESTROY | When Broker stops service | BrokerContext | One-time event |

Usage Scenarios: Plugin initialization, resource preparation, graceful shutdown, etc.

| Event Type | Trigger Timing | Data Object | Description |
| --- | --- | --- | --- |
| SESSION_CREATE | TCP connection established, Session initialization complete | MqttSession | Connection established but not yet authenticated |
| CONNECT | Client CONNECT message authentication passed | AsyncEventObject<MqttConnectMessage> | Async event, supports interception |
| DISCONNECT | When TCP connection disconnects | AbstractSession | Includes normal and abnormal disconnections |

Difference between SESSION_CREATE and CONNECT:

  • SESSION_CREATE: Only indicates TCP connection established, MQTT Session object created
  • CONNECT: Indicates MQTT connection authentication passed, client can start normal communication

| Event Type | Trigger Timing | Data Object | Description |
| --- | --- | --- | --- |
| RECEIVE_MESSAGE | When any message is received from client | EventObject<MqttMessage> | High-performance optimization, independent storage |
| WRITE_MESSAGE | When any message is sent to client | EventObject<MqttMessage> | High-performance optimization, independent storage |
| RECEIVE_CONN_ACK_MESSAGE | When CONNACK message is received | MqttConnAckMessage | Used in client mode |

Performance Optimization: RECEIVE_MESSAGE and WRITE_MESSAGE are high-frequency events. The framework stores their subscribers in dedicated CopyOnWriteArrayList instances and checks for null before dispatching, so publishing costs almost nothing when there are no subscribers.
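
The idea behind this optimization can be sketched in plain Java. Everything here is an assumption for illustration: `HotPathBus` is a hypothetical class, events are plain strings, and an emptiness check stands in for the null check mentioned above. The framework's real data structures may look different.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a hot-path guard; not the framework's real code.
final class HotPathBus {
    // Dedicated list for one high-frequency event, so no map lookup is needed.
    private final List<Consumer<String>> receiveSubscribers = new CopyOnWriteArrayList<>();

    // Demo counter (not thread-safe): how often the event object was actually built.
    int eventsBuilt = 0;

    void subscribeReceive(Consumer<String> subscriber) {
        receiveSubscribers.add(subscriber);
    }

    // The event is supplied lazily: with no subscribers it is never built or dispatched.
    void publishReceive(Supplier<String> eventFactory) {
        if (receiveSubscribers.isEmpty()) {
            return; // fast path
        }
        String event = eventFactory.get();
        eventsBuilt++;
        for (Consumer<String> s : receiveSubscribers) {
            s.accept(event);
        }
    }
}
```

CopyOnWriteArrayList fits this pattern because reads (every message) vastly outnumber writes (a subscriber being added or removed), and iteration needs no locking.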

| Event Type | Trigger Timing | Data Object | Description |
| --- | --- | --- | --- |
| TOPIC_CREATE | When a new Topic is created | String (Topic name) | Triggered on first subscription |
| SUBSCRIBE_ACCEPT | When server accepts subscription request | EventObject<MqttTopicSubscription> | Triggered after validation passes |
| UNSUBSCRIBE_ACCEPT | When server accepts unsubscription | EventObject<MqttUnsubscribeMessage> | Triggered after validation passes |
| SUBSCRIBE_TOPIC | When client successfully subscribes to Topic | EventObject<MessageDeliver> | Triggered after subscription relationship established |
| UNSUBSCRIBE_TOPIC | When client unsubscribes from Topic | MessageDeliver | Triggered when subscription relationship removed |
| SUBSCRIBE_REFRESH_TOPIC | When refreshing Topic subscription | MessageDeliver | Triggered during session recovery |

Event Trigger Sequence:

1. Client sends SUBSCRIBE
2. SUBSCRIBE_ACCEPT (validation passed)
3. TOPIC_CREATE (if new Topic)
4. SUBSCRIBE_TOPIC (establish subscription relationship)

Subscribers of event types marked as one-time events (once=true) run only once and are then invalidated automatically. This suits initialization work:

// BROKER_STARTED will only trigger once after Broker startup completes
brokerContext.getEventBus().subscribe(EventType.BROKER_STARTED, (eventType, context) -> {
    // Execute one-time initialization operations
    initializePlugin();
});
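
One way to picture "run once, then become invalid" is a wrapper that unregisters itself on first delivery. The sketch below is a plain-Java illustration under assumptions; `OnceBus` and `subscribeOnce` are hypothetical names, and smart-mqtt's internal handling of once=true may differ.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of one-time delivery; smart-mqtt's internals may differ.
final class OnceBus<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // Wraps the subscriber so it runs at most once and then unregisters itself.
    void subscribeOnce(Consumer<T> subscriber) {
        AtomicBoolean fired = new AtomicBoolean(false);
        subscribers.add(new Consumer<T>() {
            @Override
            public void accept(T event) {
                // compareAndSet guards against two threads publishing at the same time.
                if (fired.compareAndSet(false, true)) {
                    subscribers.remove(this);
                    subscriber.accept(event);
                }
            }
        });
    }

    void publish(T event) {
        // CopyOnWriteArrayList iterates over a snapshot, so removal during dispatch is safe.
        for (Consumer<T> s : subscribers) {
            s.accept(event);
        }
    }

    int subscriberCount() {
        return subscribers.size();
    }
}
```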

Subscribe to client connection events, count online connections:

AtomicInteger connectionCount = new AtomicInteger(0);

// CONNECT is an async event, needs to be wrapped with syncConsumer
brokerContext.getEventBus().subscribe(EventType.CONNECT,
    AsyncEventObject.syncConsumer((eventType, event) -> {
        MqttSession session = event.getSession();
        int count = connectionCount.incrementAndGet();
        System.out.println("Client " + session.getClientId() + " connected, total: " + count);
        event.getFuture().complete(null); // Complete async processing
    })
);

// DISCONNECT is a sync event
brokerContext.getEventBus().subscribe(EventType.DISCONNECT, (eventType, session) -> {
    int count = connectionCount.decrementAndGet();
    System.out.println("Client " + session.getClientId() + " disconnected, total: " + count);
});

Record all sent and received messages:

// Receive message
brokerContext.getEventBus().subscribe(EventType.RECEIVE_MESSAGE, (eventType, event) -> {
    MqttMessage message = event.getObject();
    MqttSession session = event.getSession();
    logger.info("[RECV] client={}, message={}", session.getClientId(), message);
});

// Send message
brokerContext.getEventBus().subscribe(EventType.WRITE_MESSAGE, (eventType, event) -> {
    MqttMessage message = event.getObject();
    MqttSession session = event.getSession();
    logger.info("[SEND] client={}, message={}", session.getClientId(), message);
});

Execute plugin initialization when Broker starts:

brokerContext.getEventBus().subscribe(EventType.BROKER_STARTED,
    new DisposableEventBusSubscriber<BrokerContext>() {
        @Override
        public void consumer(EventType<BrokerContext> eventType, BrokerContext context) {
            // Execute only once: initialize database connection, start background threads, etc.
            plugin.initDatabase();
            plugin.startBackgroundTask();
        }
    }
);

Release resources when Broker stops:

brokerContext.getEventBus().subscribe(EventType.BROKER_DESTROY, (eventType, context) -> {
    // Close thread pool
    executorService.shutdown();
    // Close database connection
    database.close();
    // Clean up temporary files
    cleanupTempFiles();
});

Plugins can define their own event types for inter-module communication:

public class MyPlugin extends Plugin {
    // Define custom event type
    public static final EventType<MyCustomData> CUSTOM_EVENT =
            new EventType<>("my_custom_event");

    // Saved in install() so that other methods can reach the event bus
    private BrokerContext brokerContext;

    @Override
    public void install(BrokerContext context) {
        this.brokerContext = context;
        // Subscribe to custom event
        context.getEventBus().subscribe(CUSTOM_EVENT, (eventType, data) -> {
            handleCustomEvent(data);
        });
    }

    private void someMethod(MyCustomData myData) {
        // Trigger custom event
        brokerContext.getEventBus().publish(CUSTOM_EVENT, myData);
    }
}

1. Choose Appropriate Subscription Mode

  • Synchronous Subscription: for simple processing logic that completes quickly (< 1 ms)
  • Asynchronous Subscription: for logic that involves I/O operations (database, network requests), so that the Broker is not blocked

Exceptions inside subscribers should not affect other subscribers and Broker operation:

brokerContext.getEventBus().subscribe(EventType.RECEIVE_MESSAGE, (eventType, event) -> {
    try {
        processMessage(event.getObject());
    } catch (Exception e) {
        logger.error("Process message failed", e);
        // Do not throw exception
    }
});
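
When several subscribers need the same protection, the try/catch can be factored into a small wrapper. The helper below is a sketch under assumptions: `SafeSubscribers.guarded` is a hypothetical name, and `java.util.function.BiConsumer` stands in for the framework's own two-argument subscriber interface.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical helper; BiConsumer stands in for the framework's subscriber interface.
final class SafeSubscribers {
    // Returns a subscriber that reports failures to onError instead of rethrowing them.
    static <E, T> BiConsumer<E, T> guarded(BiConsumer<E, T> delegate, Consumer<Exception> onError) {
        return (eventType, event) -> {
            try {
                delegate.accept(eventType, event);
            } catch (Exception e) {
                // For example, log the failure; the exception never reaches the dispatcher.
                onError.accept(e);
            }
        };
    }
}
```

The wrapper deliberately catches Exception rather than Throwable, so fatal errors such as OutOfMemoryError still propagate.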

  • Avoid time-consuming operations in RECEIVE_MESSAGE and WRITE_MESSAGE events
  • If processing is needed, use async mode or submit tasks to an independent thread pool
  • Subscribers for high-frequency events should be as lightweight as possible
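
Submitting work to an independent thread pool might look like the sketch below. It is written in plain Java under assumptions: `OffloadingSubscriber` is a hypothetical class, and `Consumer<T>` stands in for the framework's subscriber interface.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: keep the subscriber cheap and move slow work elsewhere.
final class OffloadingSubscriber<T> implements Consumer<T>, AutoCloseable {
    private final ExecutorService workers;
    private final Consumer<T> slowHandler;

    OffloadingSubscriber(int threads, Consumer<T> slowHandler) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.slowHandler = slowHandler;
    }

    // Called on the event thread: only enqueues a task and returns immediately.
    @Override
    public void accept(T event) {
        workers.execute(() -> slowHandler.accept(event));
    }

    // Stops accepting work and waits briefly for queued tasks, e.g. on Broker shutdown.
    @Override
    public void close() {
        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```

Note that `Executors.newFixedThreadPool` uses an unbounded queue. Under sustained load, a `ThreadPoolExecutor` with a bounded queue and a rejection policy may be the safer choice.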

Events may be triggered from multiple threads, so make sure subscriber logic is thread-safe:

// Use thread-safe counter
private final LongAdder messageCount = new LongAdder();

brokerContext.getEventBus().subscribe(EventType.RECEIVE_MESSAGE, (eventType, event) -> {
    messageCount.increment(); // Thread-safe
});

  • Initialize resources in BROKER_STARTED
  • Release resources in BROKER_DESTROY
  • Avoid memory leaks: cancel event subscriptions promptly once they are no longer needed
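
Cancelling subscriptions is easiest when subscribing returns a handle. The sketch below shows that pattern in plain Java. `CancellableBus` and its `Runnable` handle are assumptions made for illustration; the document does not show how smart-mqtt's EventBus exposes unsubscription, so check the actual API before relying on this shape.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch of cancellable subscriptions; the real EventBus API may differ.
final class CancellableBus<T> {
    private final List<Consumer<T>> subscribers = new CopyOnWriteArrayList<>();

    // Returns a handle; running it removes the subscriber so the bus no longer references it.
    Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    void publish(T event) {
        for (Consumer<T> s : subscribers) {
            s.accept(event);
        }
    }

    int subscriberCount() {
        return subscribers.size();
    }
}
```

A plugin could collect these handles while installing and run them all when it is uninstalled or when BROKER_DESTROY fires.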