[toc]

前言

MQTT协议是一种轻量级的通信协议,可以用于物联网设备通信以及移动设备通信等场景。掌握MQTT协议的基本概念和工作原理是开发物联网应用程序的重要基础。

一、MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。

二、MQTT特点

轻量级:MQTT协议使用的是二进制协议,通信数据包很小,可以在网络带宽较小的情况下使用。
易于实现:MQTT协议的实现比较简单,使用方便。
可靠性高:MQTT协议支持数据包重传,可以保证消息的可靠性。
支持QoS服务质量等级:MQTT协议支持三种QoS服务质量等级,可以根据需求选择合适的服务质量等级。

三、MQTT原理

MQTT是一个基于客户端-服务器的协议

正因为基于客户端-服务器形式,MQTT协议中需要三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,发布者和订阅者都是客户端,代理是服务器,发布者可以同时是订阅者。

举个例子:

我是一个短视频app用户,我对游戏(这时“游戏”就是主题)比较感兴趣,于是我关注了游戏(订阅“游戏”主题),期间app平台每天就推送新的游戏的视频(这就是推送消息,视频则是有效载荷 ),这时我就是订阅者。而我平常也喜欢摄影,常常拍有趣的事情,于是我就向app平台上传视频分享(发布消息),定义标题为摄影(这时“摄影”就是主题),而这时我是发布者,同时我也可以关注其他主题的短视频(我是关注其他主题的其中一员,订阅者可以有很多个),所以我此时既是发布者也是订阅者,而app平台则是服务器。

四、MQTT数据格式

MQTT控制报文由三部分组成,MQTT所有收、发的数据都是按照这个格式的,因此简单是MQTT的一个特点:

其中固定头必须有,可变头和消息体(有效载荷)在固定头的部分消息类型是可以没有。

1.固定头


固定头(Fixed header):由消息类型、标志、剩余长度三部分组成。

(1)消息类型

消息类型定义了消息的种类和用途,通过使用不同的MQTT消息类型,可以实现消息的发布、订阅、确认、取消等功能,以满足不同场景的通信需求。

消息类型共16个,如下:

名字报文流动方向描述
Reserved0禁止保留
CONNECT1客户端到服务端客户端请求连接服务端
CONNACK2服务端到客户端连接报文确认
PUBLISH3两个方向都允许发布消息
PUBACK4两个方向都允许QoS 1消息发布收到确认
PUBREC5两个方向都允许发布收到(保证交付第一步)
PUBREL6两个方向都允许发布释放(保证交付第二步)
PUBCOMP7两个方向都允许QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE8客户端到服务端客户端订阅请求
SUBACK9服务端到客户端订阅请求报文确认
UNSUBSCRIBE10客户端到服务端客户端取消订阅请求
UNSUBACK11服务端到客户端取消订阅报文确认
PINGREQ12客户端到服务端心跳请求
PINGRESP13服务端到客户端心跳响应
DISCONNECT14客户端到服务端客户端断开连接
Reserved15禁止保留

(2)标志

标志(Flags)用于在MQTT协议中标识消息的不同属性和操作。DUPQoSRETAIN都属于标志,这三个标志下面会一一介绍。

标志位如下:

控制报文固定报头标志Bit 3Bit 2Bit 1Bit 0
CONNECTReserved0000
CONNACKReserved0000
PUBLISHUsed in MQTT 3.1.1DUP1QoS2QoS2RETAIN3
PUBACKReserved0000
PUBRECReserved0000
PUBRELReserved0010
PUBCOMPReserved0000
SUBSCRIBEReserved0010
SUBACKReserved0000
UNSUBSCRIBEReserved0010
UNSUBACKReserved0000
PINGREQReserved0000
PINGRESPReserved0000
DISCONNECTReserved0000
重发标志 DUP

DUP标志被设置为0,表示客户端或服务端第一次请求发送这个PUBLISH报文。DUP标志被设置为1,表示这可能是一个早前报文请求的重发。

客户端或服务端请求重发一个PUBLISH报文时,必须将DUP标志设置为1。对于QoS 0的消息,DUP标志必须设置为0 。

服务质量等级 QoS

服务质量(QoS)是MQTT的避免信息丢失及对数据保护的一个措施。

许多人知道,TCP/IP协议是可靠的,安全的,MQTT是基于TCP/IP协议的。为何还还要加QoS呢?其实TCP/IP协议是相对安全的,而并非绝对的安全,所以也会出现有数据丢失的可能,而QoS无疑是给MQTT添加了一层保障。

QoS是固定报头第一个字节中的第一第二位数据,这两位数据用来设置等级。

QoS值Bit 2Bit 1描述
000最多发一次
101至少发一次
210只发一次
-11保留位
保留标志 RETAIN

发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。

(3)剩余长度

剩余长度在协议中规定为1到4个字节长,表示当前报文剩余部分的字节数,包括可变报头和负载的数据。

剩余长度详情及计算可以看一下这篇:

(66条消息) MQTT 剩余长度_但是那是但是的博客-CSDN博客

2.可变头

可变头(Variable header):可变报头的内容根据消息类型的不同而不同,也可以说可变报头的内容根据固定头的不同而不同。

可变头有个比较需要注重的一点,这点比较容易混淆。以PUBLISH报文举个列子,如图:

描述76543210
Topic Name 主题名
byte 1Length MSB (0)00000000
byte 2Length LSB (3)00000011
byte 3‘a’ (0x61)01100001
byte 4‘/’ (0x2F)00101111
byte 5‘b’ (0x62)01100010
报文标识符
byte 6报文标识符 MSB (0)00000000
byte 7报文标识符 LSB (10)00001010

很多人可能会把Length MSB和报文标识符 MSB,Length LSB和报文标识符 LSB混淆,许多博客可能没有写,或者写了并没有着重讲解,导致知识混乱,实际上他们是不一样的。

(1)长度Length

Length MSB和Length LSB代表后面协议名(或主题名)的长度。Length MSB和Length LSB

是一起的。

主题名为 “a/b”,Length MSB和Length LSB加起来等于3,3代表着后面跟了3个字节,即“a/b”。

(2)报文标识符

报文标识符 MSB和报文标识符 LSB统称报文标识符,他俩也是一起的。

客户端每次发送一个新的控制报文类型时都必须分配一个当前未使用的报文标识符。如果一个客户端要重发这个特殊的控制报文,在随后重发那个报文时,它必须使用相同的标识符。当客户端处理完这个报文对应的确认后,这个报文标识符就释放可重用。

有些控制报文并不包含报文标识符。包含报文标识符的控制报文有PUBLISH(QoS > 0时), PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE, SUBACK,UNSUBSCRIBE,UNSUBACK。

3.有效载荷

有效载荷(Payload):发布或接收的数据。有效载荷可以是任何类型的数据,例如文本、二进制流或JSON格式的数据。

发布者将有效载荷发布到特定主题上,订阅者可以订阅该主题并接收有效载荷。有效载荷是MQTT协议传输数据的核心,也是其实现低延迟、高效率的重要因素之一。

下面是固定头不同消息类型是否需要有效载荷:

控制报文有效载荷
CONNECT需要
CONNACK不需要
PUBLISH可选
PUBACK不需要
PUBREC不需要
PUBREL不需要
PUBCOMP不需要
SUBSCRIBE需要
SUBACK需要
UNSUBSCRIBE需要
UNSUBACK不需要
PINGREQ不需要
PINGRESP不需要
DISCONNECT不需要

附录

使用官方 Docker 镜像快速安装和运行 EMQX,并使用 Docker Compose 实现集群搭建:
https://www.emqx.io/docs/zh/latest/deploy/install-docker.html

DEMO

go get github.com/eclipse/paho.mqtt.golang
package main

import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "log"
    "os"
    "strconv"
    "time"
)

func main() {
    options := mqtt.NewClientOptions()
    options.AddBroker("mqtt://192.168.2.140:1883")
    options.SetAutoReconnect(true)
    options.SetPassword("qwert")
    options.SetUsername("mqtt")
    client := mqtt.NewClient(options)
    token := client.Connect()
    if token.Wait() && token.Error() != nil {
        log.Println("error:", token.Error())
        os.Exit(0)
    }
    topic := "test_topic"
    i := 0

    // 监听遗嘱
    go func() {
        testWill := "test_will"
        client.Subscribe(testWill, 1, func(client mqtt.Client, message mqtt.Message) {
            log.Println("Subscribe topic:", message.Topic(), "	message:", string(message.Payload()))
        })
    }()

    // 发布消息
    for true {
        i++
        message := "{"msg":"test message","message_id":"" + strconv.Itoa(i) + ""}"
        token = client.Publish(topic, 2, true, message)
        log.Println("Publish message:", message)
        time.Sleep(1000 * time.Millisecond)
    }

}
package main

import (
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "log"
    "os"
)

func main() {
    options := mqtt.NewClientOptions()
    options.AddBroker("mqtt://192.168.2.140:1883")
    options.SetAutoReconnect(true)
    options.SetCleanSession(false)
    options.SetClientID("subscribe_001")

    // 设置遗嘱消息
    options.SetWill("test_will", "{"clientId":"subscribe_002","type":"1002"}", byte(2), true)
    options.SetKeepAlive(5)
    options.SetPassword("qwert")
    options.SetUsername("mqtt")
    client := mqtt.NewClient(options)
    token := client.Connect()
    if token.Wait() && token.Error() != nil {
        log.Println("error:", token.Error())
        os.Exit(0)
    }
    topic := "test_topic"
    client.Subscribe(topic, 2, func(client mqtt.Client, message mqtt.Message) {
        log.Println("Subscribe topic:", message.Topic(), "	message:", string(message.Payload()))
    })

    select {}
}