EMQX使用

Docker下安装

# Start EMQX detached, restart it automatically, and publish the MQTT, WebSocket and dashboard ports.
docker run -d \
  --name emqx \
  --restart always \
  -p 1883:1883 \
  -p 8083:8083 \
  -p 8084:8084 \
  -p 8883:8883 \
  -p 18083:18083 \
  emqx/emqx:latest

EMQX默认端口

  • 1883,MQTT over TCP
  • 8883,MQTT over SSL/TLS
  • 8083,MQTT over WebSocket
  • 8084,MQTT over WSS
  • 18083,HTTP

管理控制台

URL http://192.168.154.8:18083/ 用户名:admin 密码:public

MQTTX客户端

下载地址: https://mqttx.app/zh/downloads?os=windows

MQTT协议连接:mqtt://192.168.154.8:1883 WebSocket协议连接:ws://192.168.154.8:8083

Java客户端 Eclipse Paho

Maven依赖

<!-- Eclipse Paho MQTT v3 Java client, pinned to version 1.2.4 -->
<dependency>  
    <groupId>org.eclipse.paho</groupId>  
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>  
    <version>1.2.4</version>  
</dependency>

回调函数

/**
 * Paho callback that prints the topic and text payload of every received message
 * to standard output. The other two callbacks are left as extension points.
 */
public class MqttReceiver implements MqttCallback {
    /**
     * Invoked when the connection to the broker is lost.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        // Handle the lost connection here.
    }

    /**
     * Invoked for every message that arrives on a subscribed topic.
     *
     * @param topic       the topic the message was published to
     * @param mqttMessage the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        // Decode explicitly as UTF-8 so the result does not depend on the
        // platform default charset of the JVM running the client.
        byte[] payload = mqttMessage.getPayload();
        String payloadString = new String(payload, java.nio.charset.StandardCharsets.UTF_8);

        System.out.println(topic);
        System.out.println(payloadString);
    }

    /**
     * Invoked after an outgoing message has been delivered.
     *
     * @param iMqttDeliveryToken the token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // Handle successful delivery here; the original note says this only
        // applies to QoS 1 or 2 messages (TODO confirm against the Paho docs).
    }
}

建立连接

// Create a client with in-memory persistence, register the callback, then connect to the broker.
MemoryPersistence persistence = new MemoryPersistence();
MqttClient mqttClient = new MqttClient("tcp://192.168.154.8:1883", "test_client", persistence);
MqttCallback receiver = new MqttReceiver();
mqttClient.setCallback(receiver);
mqttClient.connect();

订阅消息

// Subscribe with the filter "topic/#" ('#' is the MQTT multi-level wildcard).
mqttClient.subscribe("topic/#");

发送消息

// Publish "Hello world!" to "topic/1" at QoS 1.
String content = "Hello world!";
// Encode explicitly as UTF-8 instead of relying on the platform default charset.
MqttMessage mqttMessage = new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
mqttMessage.setQos(1);
// Use the mqttClient instance created when the connection was established.
mqttClient.publish("topic/1", mqttMessage);

断开连接

// Disconnect from the broker, then close the client (the same mqttClient that was connected earlier).
mqttClient.disconnect();
mqttClient.close();

Spring整合

Maven依赖

<!-- Spring Integration MQTT module. No version is declared here, so it presumably comes from
     dependency management such as a Spring Boot parent or BOM (TODO confirm). -->
<dependency>  
    <groupId>org.springframework.integration</groupId>  
    <artifactId>spring-integration-mqtt</artifactId>  
</dependency>

建立连接

/**
 * Creates the Paho client factory, configured with the single broker URI
 * tcp://192.168.154.8:1883.
 *
 * @return the configured client factory
 */
@Bean
public MqttPahoClientFactory mqttPahoClientFactory() {
    MqttConnectOptions connectOptions = new MqttConnectOptions();
    connectOptions.setServerURIs(new String[] {"tcp://192.168.154.8:1883"});

    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
    clientFactory.setConnectionOptions(connectOptions);
    return clientFactory;
}

订阅消息

/**
 * Channel that carries messages received from the MQTT broker.
 *
 * @return a new direct channel
 */
@Bean
public MessageChannel mqttInboundChannel() {
    MessageChannel inboundChannel = new DirectChannel();
    return inboundChannel;
}

/**
 * Subscribes to the "test/1" topic at QoS 1 and puts every message received
 * from the MQTT broker onto the given channel.
 *
 * @param mqttPahoClientFactory factory used to create the underlying Paho client
 * @param messageChannel        channel that receives the inbound messages
 * @return the message-driven channel adapter
 */
@Bean
public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory, @Qualifier("mqttInboundChannel") MessageChannel messageChannel) {
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter("inClient", mqttPahoClientFactory, "test/1");
    adapter.setOutputChannel(messageChannel);
    adapter.setQos(1);
    adapter.setConverter(new DefaultPahoMessageConverter());
    return adapter;
}

/**
 * Handles messages taken from mqttInboundChannel by printing the received
 * topic header and the payload.
 *
 * @return the handler bound to mqttInboundChannel
 */
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler messageInboundHandler() {
    return message -> {
        Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");
        Object payload = message.getPayload();
        System.out.println(receivedTopic);
        System.out.println(payload);
    };
}

使用@MessageEndpoint和@ServiceActivator注解获取消息并处理

/**
 * Annotation-based endpoint that prints the received topic header and the
 * payload of every message arriving on mqttInboundChannel.
 */
@MessageEndpoint
public class TestHandler {
    /**
     * Prints the topic header and payload of the given message.
     *
     * @param message the message taken from mqttInboundChannel
     */
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public void handle(Message<?> message) {
        Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");
        Object payload = message.getPayload();
        System.out.println(receivedTopic);
        System.out.println(payload);
    }
}

传递处理:方法的返回值会发送到outputChannel,可再传给下一个@ServiceActivator或@Router

/**
 * Two-step endpoint: the first handler prints the incoming message and returns
 * a value that is sent to the "next" channel, where the second handler prints it.
 */
@MessageEndpoint
public class TestHandler {
    /**
     * Prints the topic header and payload, then passes "x" on to the "next" channel.
     *
     * @param message the message taken from mqttInboundChannel
     * @return the value forwarded to the "next" channel
     */
    @ServiceActivator(inputChannel = "mqttInboundChannel", outputChannel = "next")
    public String handle(Message<?> message) {
        Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");
        Object payload = message.getPayload();
        System.out.println(receivedTopic);
        System.out.println(payload);
        return "x";
    }

    /**
     * Prints the value received from the "next" channel.
     *
     * @param str the value received from the "next" channel
     */
    @ServiceActivator(inputChannel = "next")
    public void nextHandle(String str) {
        System.out.println(str);
    }
}

发送消息

/**
 * Channel that carries messages waiting to be sent to the MQTT broker.
 *
 * @return a new direct channel
 */
@Bean
public MessageChannel mqttOutboundChannel() {
    MessageChannel outboundChannel = new DirectChannel();
    return outboundChannel;
}
/** Spring Boot entry point for the MQTT integration example. */
@SpringBootApplication  
// Scan the com.oliverclio.mqtt package for the message-sender components
@IntegrationComponentScan("com.oliverclio.mqtt")  
public class MqttIntegrationApplication {  
    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {  
        SpringApplication.run(MqttIntegrationApplication.class, args);
    }
}
/**
 * Message-sending component: every call is sent to the mqttOutboundChannel channel.
 */
@Component  
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")  
public interface MqttSender {  
    // The message argument automatically becomes the message payload; headers are specified through annotations
    /** Sends a payload without setting any topic or QoS header. */
    void sendMessage(String message);  
    /** Sends a payload with the MqttHeaders.TOPIC header set to the given topic. */
    void sendMessage(String message, @Header(MqttHeaders.TOPIC) String topic);  
    /** Sends a payload with both the MqttHeaders.TOPIC and MqttHeaders.QOS headers set. */
    void sendMessage(String message, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos);  
}
/**
 * Takes messages from mqttOutboundChannel and sends them to the MQTT broker,
 * with "test/1" configured as the default topic.
 *
 * @param mqttPahoClientFactory factory used to create the underlying Paho client
 * @return the outbound handler bound to mqttOutboundChannel
 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler messageOutboundHandler(MqttPahoClientFactory mqttPahoClientFactory) {
    MqttPahoMessageHandler outboundHandler =
            new MqttPahoMessageHandler("outClient", mqttPahoClientFactory);
    outboundHandler.setDefaultTopic("test/1");
    outboundHandler.setConverter(new DefaultPahoMessageConverter());
    outboundHandler.setAsync(true);
    return outboundHandler;
}

消息路由

下例使用HeaderValueRouter,根据消息头的某一属性值做转发决定

/**
 * First channel that routed inbound messages can be forwarded to.
 *
 * @return a new direct channel
 */
@Bean
public MessageChannel firstSubInboundChannel() {
    MessageChannel firstChannel = new DirectChannel();
    return firstChannel;
}
  
/**
 * Second channel that routed inbound messages can be forwarded to.
 *
 * @return a new direct channel
 */
@Bean
public MessageChannel secondSubInboundChannel() {
    MessageChannel secondChannel = new DirectChannel();
    return secondChannel;
}

/**
 * Takes messages from mqttInboundChannel and forwards each one to another
 * channel chosen by the value of its mqtt_receivedTopic header.
 *
 * @return the router bound to mqttInboundChannel
 */
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public HeaderValueRouter headerValueRouter() {
    HeaderValueRouter topicRouter = new HeaderValueRouter("mqtt_receivedTopic");
    // test/1 and test/3 share the first channel; test/2 goes to the second.
    topicRouter.setChannelMapping("test/1", "firstSubInboundChannel");
    topicRouter.setChannelMapping("test/3", "firstSubInboundChannel");
    topicRouter.setChannelMapping("test/2", "secondSubInboundChannel");
    return topicRouter;
}

/**
 * Handles messages on firstSubInboundChannel by printing a "number 1" marker,
 * the received topic header and the payload.
 *
 * @return the handler bound to firstSubInboundChannel
 */
@Bean
@ServiceActivator(inputChannel = "firstSubInboundChannel")
public MessageHandler firstSubInboundHandler() {
    return message -> {
        System.out.println("number 1");
        Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");
        Object payload = message.getPayload();
        System.out.println(receivedTopic);
        System.out.println(payload);
    };
}
  
/**
 * Handles messages on secondSubInboundChannel by printing a "number 2" marker,
 * the received topic header and the payload.
 *
 * @return the handler bound to secondSubInboundChannel
 */
@Bean
@ServiceActivator(inputChannel = "secondSubInboundChannel")
public MessageHandler secondSubInboundHandler() {
    return message -> {
        System.out.println("number 2");
        Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");
        Object payload = message.getPayload();
        System.out.println(receivedTopic);
        System.out.println(payload);
    };
}

通过使用AbstractMessageRouter来自由实现转发逻辑

/**
 * Router with hand-written forwarding logic: messages whose mqtt_receivedTopic
 * header is "test/2" go to the second sub-channel, all others go to the first.
 * The input channel is declared with @Router on the overridden method.
 *
 * @return the custom router
 */
@Bean
public AbstractMessageRouter abstractMessageRouter() {
    return new AbstractMessageRouter() {
        @Override
        @Router(inputChannel = "mqttInboundChannel")
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            MessageChannel target = "test/2".equals(topic)
                    ? secondSubInboundChannel()
                    : firstSubInboundChannel();
            return Collections.singleton(target);
        }
    };
}

/**
 * Same hand-written router, bound to mqttInboundChannel with @ServiceActivator
 * on the bean method: messages whose mqtt_receivedTopic header is "test/2" go
 * to the second sub-channel, all others go to the first.
 *
 * @return the custom router bound to mqttInboundChannel
 */
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public AbstractMessageRouter abstractMessageRouter() {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
            MessageChannel target = "test/2".equals(topic)
                    ? secondSubInboundChannel()
                    : firstSubInboundChannel();
            return Collections.singleton(target);
        }
    };
}

使用@MessageEndpoint和@Router注解

/**
 * Annotation-based router: messages whose mqtt_receivedTopic header is "test/2"
 * go to the second sub-channel, all others go to the first.
 */
@MessageEndpoint
public class TestRouter {

    private final MessageChannel firstSubInboundChannel;
    private final MessageChannel secondSubInboundChannel;

    /**
     * @param firstSubInboundChannel  target channel for every topic except "test/2"
     * @param secondSubInboundChannel target channel for the "test/2" topic
     */
    public TestRouter(@Qualifier("firstSubInboundChannel") MessageChannel firstSubInboundChannel, @Qualifier("secondSubInboundChannel") MessageChannel secondSubInboundChannel) {
        this.firstSubInboundChannel = firstSubInboundChannel;
        this.secondSubInboundChannel = secondSubInboundChannel;
    }

    /**
     * Chooses the target channel from the received-topic header.
     *
     * @param message the message taken from mqttInboundChannel
     * @return a single-element collection holding the chosen channel
     */
    @Router(inputChannel = "mqttInboundChannel")
    public Collection<MessageChannel> resolveOutputChannel(Message<?> message) {
        String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
        MessageChannel target = "test/2".equals(topic)
                ? secondSubInboundChannel
                : firstSubInboundChannel;
        return Collections.singleton(target);
    }
}

@Router注解下的方法,传入参数可以是消息payload的包装对象,用于接收payload数据。或使用@Header注解,接收指定的消息头数据。返回数据可以改为String类型,返回channel的名称

/**
 * Chooses the target channel by name from the received-topic header.
 *
 * @param topic value of the mqtt_receivedTopic header
 * @return "secondSubInboundChannel" for the "test/2" topic, otherwise "firstSubInboundChannel"
 */
@Router(inputChannel = "mqttInboundChannel")
public String resolveOutputChannel(@Header("mqtt_receivedTopic") String topic) {
    return "test/2".equals(topic) ? "secondSubInboundChannel" : "firstSubInboundChannel";
}

JavaScript客户端

安装

# Install MQTT.js pinned to version 4.3.7 and save it as a project dependency.
npm install mqtt@4.3.7 --save

引入

import mqtt from 'mqtt/dist/mqtt'

建立连接

// Credentials sent to the broker when connecting (placeholders).
const options = {
  username: 'username',
  password: 'password'
}
// Connect over WebSocket (port 8083, path /mqtt) to the same EMQX host used in the rest of these notes.
const client = mqtt.connect('ws://192.168.154.8:8083/mqtt', options)

订阅消息

// Subscribe to 'test/2' at QoS 2 and log every incoming message.
client.subscribe('test/2', {qos: 2})
client.on('message', (topic, message) => {
  console.log(topic)
  // message is a Buffer; convert it so the payload text is logged instead of raw bytes.
  console.log(message.toString())
})

// Cancel the subscription to 'test/2'.
client.unsubscribe('test/2')

发送消息

// Publish the string 'message' to 'test/1' at QoS 2 without the retain flag.
client.publish('test/1', 'message', {qos: 2, retain: false})

如需发送二进制信息,可使用Uint8Array类型

参考文档