MQTT

MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。是一种轻量级的发布、订阅信息传输协议。可在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间和空间的限制。物联网平台支持设备使用MQTT协议接入。

使用场景

无人机云平台项目中,云服务器需要与连接无人机遥控的app(安卓)在无人机飞行期间持续相互发送数据,能够在低带宽、高延迟、不稳定的网络等因素下保证高效稳定、轻量开销小、降低网络流量等特征,同时硬件及带宽和开发难度不能太高, 综合考量后,决定采用MQTT协议实现功能。

需要实现的功能有:

  1. 连续的会话控制;

  2. 移动端上报实时传输无人机的电量、经纬数据、高度角度数据,供云平台展示;

  3. 云平台下发控制指令,移动端接收后实时改变无人机飞行状态;

后期考虑

通过RocketMQ与Mosquitto相结合的方式,实现基于RocketMQ的MQTT消息推送服务器及其分布式部署。

具体实现

  • 配置MqttConfig.java

    @Configuration
    public class MqttConfig {

    /**
    * 1. 先创建连接
    */

    /**
    * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
    * @return factory
    */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();

    // 设置代理端的URL地址,可以是多个
    options.setServerURIs(new String[]{"tcp://127.0.0.1:1883"});

    factory.setConnectionOptions(options);
    return factory;
    }

    /**
    * 2、入站通道
    */
    @Bean
    public MessageChannel mqttInputChannel() {
    return new DirectChannel();
    }

    /**
    * 入站
    */
    @Bean
    public MessageProducer inbound() {
    // Paho客户端消息驱动通道适配器,主要用来订阅主题
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("consumerClient-paho",
    mqttClientFactory(), "boat", "collector", "battery", "+/sensor");
    adapter.setCompletionTimeout(5000);

    // Paho消息转换器
    DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
    // 按字节接收消息
    // defaultPahoMessageConverter.setPayloadAsBytes(true);
    adapter.setConverter(defaultPahoMessageConverter);
    adapter.setQos(1); // 设置QoS
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
    }


    /**
    * 3、消息转化,中间站
    */

    @Bean
    // ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
    return message -> {
    String payload = message.getPayload().toString();

    // byte[] bytes = (byte[]) message.getPayload(); // 收到的消息是字节格式
    String topic = message.getHeaders().get("mqtt_receivedTopic").toString();

    // 根据主题分别进行消息处理。
    if (topic.matches(".+/sensor")) { // 匹配:1/sensor
    String sensorSn = topic.split("/")[0];
    System.out.println("传感器" + sensorSn + ": 的消息: " + payload);
    } else if (topic.equals("collector")) {
    System.out.println("采集器的消息:" + payload);
    } else {
    System.out.println("丢弃消息:主题[" + topic + "],负载:" + payload);
    }

    };
    }


    /**
    * 4、发送消息
    */

    /**
    * 出站通道
    */
    @Bean
    public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
    }

    /**
    * 出站
    */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler outbound() {

    // 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("publishClient", mqttClientFactory());
    messageHandler.setAsync(true); // 如果设置成true,即异步,发送消息时将不会阻塞。
    messageHandler.setDefaultTopic("command");
    messageHandler.setDefaultQos(1); // 设置默认QoS

    // Paho消息转换器
    DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();

    // defaultPahoMessageConverter.setPayloadAsBytes(true); // 发送默认按字节类型发送消息
    messageHandler.setConverter(defaultPahoMessageConverter);
    return messageHandler;
    }

    }
  • MqttGateway接口

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
    // 定义重载方法,用于消息发送
    void sendToMqtt(String payload);
    // 指定topic进行消息发送
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
    }
  • MyMessage.java

    public class MyMessage implements Serializable {

    private String topic;
    private String content;

    public String getTopic() {
    return topic;
    }

    public void setTopic(String topic) {
    this.topic = topic;
    }

    public String getContent() {
    return content;
    }

    public void setContent(String content) {
    this.content = content;
    }
    }
  • MqttController.java

    /**
    * 对外暴露发送消息的controller
    */
    @RestController
    public class MqttController {
    @Resource
    private MqttGateway mqttGateway;

    @PostMapping("/send")
    public String send(@RequestBody MyMessage myMessage) {
    // 发送消息到指定主题
    mqttGateway.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());
    return "send topic: " + myMessage.getTopic()+ ", message : " + myMessage.getContent();
    }
    }

术语 Terminology

本规范中用到的关键字 必须 MUST,不能 MUST NOT,要求 REQUIRED,将会 SHALL,不会 SHALL NOT,应该 SHOULD,不应该 SHOULD NOT,推荐 RECOMMENDED,可以 MAY,可选 OPTIONAL 都是按照 IETF RFC 2119 [RFC2119] 中的描述解释。

参考:

  1. b站-迈向物联网第一步——MQTT理论知识详解
  2. 阿里云mqtt协议规范
  3. mqtt协议中文版