Docker下安装
# Start EMQX detached as a container named "emqx", publishing ports 1883, 8083, 8084,
# 8883 and 18083 to the host, and restart the container automatically.
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 --restart always emqx/emqx:latest
EMQX默认端口
- 1883,MQTT over TCP
- 8883,MQTT over SSL/TLS
- 8083,MQTT over WebSocket
- 8084,MQTT over WSS
- 18083,HTTP(管理控制台)
管理控制台
URL http://192.168.154.8:18083/ 用户名:admin 密码:public
MQTTX客户端
下载地址: https://mqttx.app/zh/downloads?os=windows
MQTT协议连接:mqtt://192.168.154.8:1883 WebSocket协议连接:ws://192.168.154.8:8083
Java客户端 Eclipse Paho
Maven依赖
<!-- Eclipse Paho MQTT v3 Java client, pinned to version 1.2.4 -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.4</version>
</dependency>
回调函数
/**
 * Paho callback that prints the topic and payload of every received message
 * to standard output.
 */
public class MqttReceiver implements MqttCallback {

    /**
     * Hook for reacting to a lost broker connection.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        // Placeholder: handle the lost connection here (this example does nothing).
    }

    /**
     * Prints the topic followed by the payload decoded as UTF-8 text.
     *
     * @param topic       the topic the message was published on
     * @param mqttMessage the received message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        // Decode explicitly as UTF-8 instead of relying on the JVM's platform default
        // charset, so non-ASCII payloads print the same on every machine.
        String payloadText =
                new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(topic);
        System.out.println(payloadText);
    }

    /**
     * Hook that runs once delivery of an outgoing message has completed.
     *
     * @param iMqttDeliveryToken the token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // Placeholder: handle a completed delivery here (this example does nothing).
        // Per the Paho documentation this fires for QoS 1 and 2 once the acknowledgments
        // are received, and for QoS 0 once the message is handed to the network.
    }
}
建立连接
// Broker address and the client id this connection identifies itself with.
String brokerUri = "tcp://192.168.154.8:1883";
String clientId = "test_client";

// Create the client with MemoryPersistence, register the receiving callback, then connect.
MqttClient mqttClient = new MqttClient(brokerUri, clientId, new MemoryPersistence());
mqttClient.setCallback(new MqttReceiver());
mqttClient.connect();
订阅消息
// Subscribe the connected client to the topic filter "topic/#".
mqttClient.subscribe("topic/#");
发送消息
// Build a QoS 1 message from text and publish it to "topic/1".
String content = "Hello world!";
// Encode explicitly as UTF-8 rather than with the platform default charset.
MqttMessage mqttMessage =
        new MqttMessage(content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
mqttMessage.setQos(1);
// The client was declared as "mqttClient" when the connection was created;
// no variable named "client" exists in this example.
mqttClient.publish("topic/1", mqttMessage);
断开连接
// Disconnect from the broker, then close the client. The client variable is
// "mqttClient" (as declared when connecting), not "client".
mqttClient.disconnect();
mqttClient.close();
Spring整合
Maven依赖
<!-- Spring Integration MQTT support. No <version> is declared here; presumably it is
     supplied by the project's dependency management (e.g. Spring Boot) - TODO confirm. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
建立连接
/**
 * Creates the Paho client factory, configured with a single broker URI.
 *
 * @return the configured client factory
 */
@Bean
public MqttPahoClientFactory mqttPahoClientFactory() {
    MqttConnectOptions connectOptions = new MqttConnectOptions();
    connectOptions.setServerURIs(new String[] {"tcp://192.168.154.8:1883"});

    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
    clientFactory.setConnectionOptions(connectOptions);
    return clientFactory;
}
订阅消息
/**
 * Channel used for receiving messages.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel mqttInboundChannel() {
    DirectChannel inboundChannel = new DirectChannel();
    return inboundChannel;
}
/**
 * Subscribes to the topic "test/1" as client "inClient" and places messages received
 * from the MQTT broker on the given channel.
 *
 * @param mqttPahoClientFactory factory used to create the underlying Paho client
 * @param messageChannel        the channel received messages are sent to
 * @return the configured message-driven channel adapter
 */
@Bean
public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory, @Qualifier("mqttInboundChannel") MessageChannel messageChannel) {
    MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter("inClient", mqttPahoClientFactory, "test/1");
    adapter.setOutputChannel(messageChannel);
    adapter.setQos(1);
    adapter.setConverter(new DefaultPahoMessageConverter());
    return adapter;
}
/**
 * Takes messages from "mqttInboundChannel" and prints the received topic header
 * followed by the payload.
 *
 * @return the handler bound to the inbound channel
 */
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public MessageHandler messageInboundHandler() {
    return inbound -> {
        Object receivedTopic = inbound.getHeaders().get("mqtt_receivedTopic");
        Object body = inbound.getPayload();
        System.out.println(receivedTopic);
        System.out.println(body);
    };
}
使用 @MessageEndpoint 和 @ServiceActivator 获取消息并处理
/**
 * Message endpoint that prints every message arriving on "mqttInboundChannel".
 */
@MessageEndpoint
public class TestHandler {

    /**
     * Prints the received topic header followed by the payload.
     *
     * @param message the message taken from the inbound channel
     */
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public void handle(Message<?> message) {
        Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");
        Object body = message.getPayload();
        System.out.println(receivedTopic);
        System.out.println(body);
    }
}
传递处理,可再传给下一个 @ServiceActivator 或 @Router
/**
 * Message endpoint that prints incoming messages and passes a result on to the
 * "next" channel, where a second activator prints it.
 */
@MessageEndpoint
public class TestHandler {

    /**
     * Prints the received topic header and the payload, then returns "x",
     * which is sent to the output channel "next".
     *
     * @param message the message taken from the inbound channel
     * @return the value forwarded to the "next" channel
     */
    @ServiceActivator(inputChannel = "mqttInboundChannel", outputChannel = "next")
    public String handle(Message<?> message) {
        Object receivedTopic = message.getHeaders().get("mqtt_receivedTopic");
        Object body = message.getPayload();
        System.out.println(receivedTopic);
        System.out.println(body);
        String forwarded = "x";
        return forwarded;
    }

    /**
     * Prints the value received on the "next" channel.
     *
     * @param str the value to print
     */
    @ServiceActivator(inputChannel = "next")
    public void nextHandle(String str) {
        System.out.println(str);
    }
}
发送消息
/**
 * Channel used for sending messages.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel mqttOutboundChannel() {
    DirectChannel outboundChannel = new DirectChannel();
    return outboundChannel;
}
/**
 * Application entry point for the MQTT integration example.
 */
@SpringBootApplication
// Scan the given package for the message-sender (gateway) components.
@IntegrationComponentScan("com.oliverclio.mqtt")
public class MqttIntegrationApplication {
/** Starts the application by handing this class and the arguments to SpringApplication.run. */
public static void main(String[] args) {
SpringApplication.run(MqttIntegrationApplication.class, args);
}
}
/**
 * Message-sending gateway; calls are sent to the channel "mqttOutboundChannel".
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {
// The "message" argument becomes the message payload; headers are specified via annotations.
/** Sends a message without setting a topic header. */
void sendMessage(String message);
/** Sends a message with the MqttHeaders.TOPIC header set to the given topic. */
void sendMessage(String message, @Header(MqttHeaders.TOPIC) String topic);
/** Sends a message with the MqttHeaders.TOPIC and MqttHeaders.QOS headers set. */
void sendMessage(String message, @Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer qos);
}
/**
 * Takes messages from "mqttOutboundChannel" and sends them to the MQTT broker as
 * client "outClient", using "test/1" as the default topic.
 *
 * @param mqttPahoClientFactory factory used to create the underlying Paho client
 * @return the configured outbound handler
 */
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler messageOutboundHandler(MqttPahoClientFactory mqttPahoClientFactory) {
    MqttPahoMessageHandler publisher = new MqttPahoMessageHandler("outClient", mqttPahoClientFactory);
    publisher.setDefaultTopic("test/1");
    publisher.setConverter(new DefaultPahoMessageConverter());
    publisher.setAsync(true);
    return publisher;
}
消息路由
下例使用 HeaderValueRouter,根据消息头的某一属性值做转发决定
/**
 * First channel that routed inbound messages can be forwarded to.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel firstSubInboundChannel() {
    DirectChannel firstChannel = new DirectChannel();
    return firstChannel;
}
/**
 * Second channel that routed inbound messages can be forwarded to.
 *
 * @return a new {@code DirectChannel}
 */
@Bean
public MessageChannel secondSubInboundChannel() {
    DirectChannel secondChannel = new DirectChannel();
    return secondChannel;
}
/**
 * Takes messages from "mqttInboundChannel" and forwards each one to another channel
 * chosen by the value of its "mqtt_receivedTopic" header.
 *
 * @return the configured header-value router
 */
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public HeaderValueRouter headerValueRouter() {
    HeaderValueRouter topicRouter = new HeaderValueRouter("mqtt_receivedTopic");
    // "test/1" and "test/3" share the first channel; "test/2" goes to the second.
    topicRouter.setChannelMapping("test/1", "firstSubInboundChannel");
    topicRouter.setChannelMapping("test/3", "firstSubInboundChannel");
    topicRouter.setChannelMapping("test/2", "secondSubInboundChannel");
    return topicRouter;
}
/**
 * Handles messages from "firstSubInboundChannel": prints "number 1", the received
 * topic header and the payload.
 *
 * @return the handler bound to the first sub-channel
 */
@Bean
@ServiceActivator(inputChannel = "firstSubInboundChannel")
public MessageHandler firstSubInboundHandler() {
    return routed -> {
        Object receivedTopic = routed.getHeaders().get("mqtt_receivedTopic");
        Object body = routed.getPayload();
        System.out.println("number 1");
        System.out.println(receivedTopic);
        System.out.println(body);
    };
}
/**
 * Handles messages from "secondSubInboundChannel": prints "number 2", the received
 * topic header and the payload.
 *
 * @return the handler bound to the second sub-channel
 */
@Bean
@ServiceActivator(inputChannel = "secondSubInboundChannel")
public MessageHandler secondSubInboundHandler() {
    return routed -> {
        Object receivedTopic = routed.getHeaders().get("mqtt_receivedTopic");
        Object body = routed.getPayload();
        System.out.println("number 2");
        System.out.println(receivedTopic);
        System.out.println(body);
    };
}
通过使用 AbstractMessageRouter 来自由实现转发逻辑
/**
 * Router with hand-written forwarding logic: messages whose "mqtt_receivedTopic"
 * header equals "test/2" go to the second sub-channel, all others to the first.
 *
 * @return the router
 */
@Bean
public AbstractMessageRouter abstractMessageRouter() {
    return new AbstractMessageRouter() {
        @Override
        @Router(inputChannel = "mqttInboundChannel")
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            // Null-check the header before converting it: calling toString() directly on a
            // missing header would throw a NullPointerException. A message without the
            // header now falls through to the default (first) channel.
            Object topicHeader = message.getHeaders().get("mqtt_receivedTopic");
            String topic = (topicHeader == null) ? null : topicHeader.toString();
            if ("test/2".equals(topic)) {
                return Collections.singleton(secondSubInboundChannel());
            }
            return Collections.singleton(firstSubInboundChannel());
        }
    };
}
或
/**
 * Router bound to "mqttInboundChannel" with hand-written forwarding logic: messages
 * whose "mqtt_receivedTopic" header equals "test/2" go to the second sub-channel,
 * all others to the first.
 *
 * @return the router
 */
@Bean
@ServiceActivator(inputChannel = "mqttInboundChannel")
public AbstractMessageRouter abstractMessageRouter() {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            // Null-check the header before converting it: calling toString() directly on a
            // missing header would throw a NullPointerException. A message without the
            // header now falls through to the default (first) channel.
            Object topicHeader = message.getHeaders().get("mqtt_receivedTopic");
            String topic = (topicHeader == null) ? null : topicHeader.toString();
            if ("test/2".equals(topic)) {
                return Collections.singleton(secondSubInboundChannel());
            }
            return Collections.singleton(firstSubInboundChannel());
        }
    };
}
使用 @MessageEndpoint 和 @Router 注解
/**
 * Message endpoint that routes messages from "mqttInboundChannel" to one of two
 * sub-channels based on the "mqtt_receivedTopic" header.
 */
@MessageEndpoint
public class TestRouter {

    private final MessageChannel firstSubInboundChannel;
    private final MessageChannel secondSubInboundChannel;

    /**
     * @param firstSubInboundChannel  the default target channel
     * @param secondSubInboundChannel the target channel for topic "test/2"
     */
    public TestRouter(@Qualifier("firstSubInboundChannel") MessageChannel firstSubInboundChannel, @Qualifier("secondSubInboundChannel") MessageChannel secondSubInboundChannel) {
        this.firstSubInboundChannel = firstSubInboundChannel;
        this.secondSubInboundChannel = secondSubInboundChannel;
    }

    /**
     * Chooses the target channel for a message.
     *
     * @param message the message taken from the inbound channel
     * @return a single-element collection holding the second sub-channel when the
     *         received topic is "test/2", otherwise the first sub-channel
     */
    @Router(inputChannel = "mqttInboundChannel")
    public Collection<MessageChannel> resolveOutputChannel(Message<?> message) {
        // Null-check the header before converting it: calling toString() directly on a
        // missing header would throw a NullPointerException. A message without the
        // header now falls through to the default (first) channel.
        Object topicHeader = message.getHeaders().get("mqtt_receivedTopic");
        String topic = (topicHeader == null) ? null : topicHeader.toString();
        if ("test/2".equals(topic)) {
            return Collections.singleton(secondSubInboundChannel);
        }
        return Collections.singleton(firstSubInboundChannel);
    }
}
@Router 注解下的方法,传入参数可以是消息payload的包装对象,用于接收payload数据;也可以使用 @Header 注解,接收指定的消息头数据。返回值可以改为 String 类型,即返回channel的名称
/**
 * Chooses the target channel by name from the received topic.
 *
 * @param topic value of the "mqtt_receivedTopic" header
 * @return "secondSubInboundChannel" for topic "test/2", otherwise "firstSubInboundChannel"
 */
@Router(inputChannel = "mqttInboundChannel")
public String resolveOutputChannel(@Header("mqtt_receivedTopic") String topic) {
    return "test/2".equals(topic) ? "secondSubInboundChannel" : "firstSubInboundChannel";
}
JavaScript客户端
安装
npm install mqtt@4.3.7 --save
引入
import mqtt from 'mqtt/dist/mqtt'
建立连接
// Credentials passed to the broker when connecting.
const options = { username: 'username', password: 'password' }
// WebSocket endpoint of the broker.
const brokerUrl = 'ws://192.168.88.1:8083/mqtt'
const client = mqtt.connect(brokerUrl, options)
订阅消息
// Subscribe to 'test/2' with QoS 2.
client.subscribe('test/2', { qos: 2 })
// Log the topic and payload of every received message.
client.on('message', (topic, message) => {
  console.log(topic)
  // MQTT.js delivers the payload as a Buffer; convert it so the text is logged
  // rather than the raw byte buffer.
  console.log(message.toString())
})
// Cancel the subscription to 'test/2'.
client.unsubscribe('test/2')
发送消息
// Publish the string 'message' to topic 'test/1' with QoS 2 and retain disabled.
client.publish('test/1', 'message', {qos: 2, retain: false})
如需发送二进制信息,可使用 Uint8Array 类型
参考文档
- https://www.emqx.io/docs/zh/v4.4/development/java.html#%E9%80%9A%E8%BF%87-maven-%E5%AE%89%E8%A3%85-paho-java
- https://docs.spring.io/spring-integration/docs/5.5.18/reference/html/mqtt.html#mqtt
- https://docs.spring.io/spring-integration/docs/5.5.18/reference/html/core.html#spring-integration-core-messaging
- https://docs.spring.io/spring-integration/docs/5.5.18/reference/html/message-routing.html#messaging-routing-chapter
- https://www.jianshu.com/p/da84fbcd4c5b
- https://www.emqx.com/zh/blog/mqtt-js-tutorial
PREVIOUS:WebSocket前后端搭建
NEXT:Vue响应式原理