Paho MQTT Python Client 使用手册
介紹
paho.mqtt.python 是一個MQTT用戶端python庫,能夠讓應用程式簡單方便的連接到MQTT代理進行消息發佈、訂閱主題和消息接收。
目前 paho.mqtt.python-1.5.1 版本支持5.0、3.1.1和3.1 MQTT協定,同時支援Python 2.7.9+或3.5+。
環境
- MQTT代理:EMQ X Broker 4.2.6
- Python 3.9.0
- paho-mqtt 1.5.1
準備
- 參照 EMQ X Broker安裝啟動教程
成功啟動EMQ後,可通過流覽器訪問 http://localhost:18083 admin/public 進入EMQ控制台,在【工具 > Websocket】模組可方便進行用戶端連接、訂閱、消息接收、發佈等測試和調試工作
emqx_broker
- Python 安裝省略
- paho-mqtt 安裝
1. pip
install paho-mqtt
快速開始
Python快速實現MQTT主題訂閱和消息接收
1. import paho.mqtt.client
as mq_tt
2.
3.
4. def on_connect(client,
userdata, flags, rc):
5. """
6. 回呼函數:當MQTT代理回應用戶端連接請求時觸發
7. :param client: 回檔返回的用戶端實例
8. :param userdata: Client()或user_data_set()中設置的私有使用者資料
9. :param flags: MQTT代理發送的回應標識
10. :param rc: 連接結果
11. 0:連接成功
12.
1:連接被拒絕 - 協議版本
13.
2: 連接被拒絕 - 用戶端識別字無效
14.
3:連接被拒絕 - 伺服器不可用
15.
4:連接被拒絕 - 用戶名或密碼錯誤
16.
5:連接被拒絕 - 未授權6-255:當前未使用
17. :return:
18. """
19. print("Connected with
result code "+str(rc))
20. # 在on_connect()中進行消息訂閱,是因為如果丟失連接進行重連,主題也會重新被訂閱
21. client.subscribe("testTopic/#")
22.
23.
24. def on_message(client,
userdata, message):
25. """
26. 回呼函數:當接收到MQTT代理發佈的消息時觸發
27. :param client: 回檔返回的用戶端實例
28. :param userdata: Client()或user_data_set()中設置的私有使用者資料
29. :param message: MQTTMessage的一個實例,這是一個包含主題,有效負載,qos,retain的類
30. :return:
31. """
32. print(message.topic+"
"+str(message.payload))
33.
34.
35. mq_client
= mq_tt.Client(client_id='www.cooooder.com')
36. mq_client.on_connect
= on_connect
37. mq_client.on_message
= on_message
38. # 連接到EMQX Broker MQTT代理
39. mq_client.connect("127.0.0.1", 1883, 60)
40.
41. # 阻塞式自動處理收發資料、自動處理重新連接,所有的資料處理邏輯都在預先設定好的回呼函數中進行的
42. mq_client.loop_forever()
在 EMQ X Broker - 【用戶端】可以看到用戶端已連接
在 EMQ X Broker - 【Websocket】發佈testTopic主題消息
Python程式列印出接收到的消息
1. Connected
with result code 0
2. testTopic
b'{ "msg":
"Hello, World!" }'
3. testTopic
b'{
"msg": "Hello, World2!" }'
常用API
Client
Client類實例常規用法流程如下:
- 創建一個用戶端實例
- 使用任一 connect*() 方法連接到MQTT代理
- 調用任一 loop*() 方法保持與MQTT代理通訊
- 使用 subscribe() 方法訂閱一個主題並接收消息
- 使用 publish() 方法向MQTT代理發佈消息
- 使用 disconnect() 中斷與MQTT代理的連接
client()
1. # 構造方法
2. Client(client_id="",
clean_session=True,
userdata=None,
protocol=MQTTv311, transport="tcp")
- client_id
- 連接到MQTT代理時使用的唯一用戶端ID字串。如果為0或者為None,將隨機生成分配一個,這種情況下clean_session參數必須為True
- clean_session
- 布林數值型別,用來確定用戶端類型。如果為True,當斷開連接時,MQTT代理將移除該用戶端的所有資訊;如果為False,用戶端則為持久用戶端,當斷開連接時,訂閱資訊和訊息佇列將被MQTT保存
- 當斷開連接時,用戶端不會丟棄自己發送的消息。調用 connect() 或者 reconnect() 將導致重新發送消息,只有使用 reinitialise() 可以將用戶端重置為初始狀態
- userdata
- 使用者定義的任意類型資料作為 userdata 參數傳遞給回呼函數,可以通過調用user_data_set() 方法進行更新,不過會有點延遲
- protocol
- 用戶端使用的MQTT協定版本,可以是 MQTTv31 或 MQTTv311
- transport
- 傳輸形式,設置為websockets,則會通過websockets發送給MQTT,默認tcp
- 示例
1. import paho.mqtt.client
as mqtt
2.
3. mqttc =
mqtt.Client()
connect()
1. connect(host,
port=1883,
keepalive=60,
bind_address="")
用戶端連接MQTT代理,這是一個阻塞函數
- host
- 代理的主機名稱或者IP位址
- port
- 連接服務的埠,預設1883
- keepalive
- 心跳檢測時長
- bind_address
- 綁定此用戶端本地網路的IP位址
- 回呼函數
connect_async()
1. connect_async(host,
port=1883,
keepalive=60,
bind_address="")
與 loop_start() 結合使用以非阻塞的形式進行連接,在調用 loop_start() 之前,連接不會完成
disconnect()
1. disconnect()
徹底與MQTT代理斷開,使用該方法斷開連接不會讓代理發送遺囑消息
- 回呼函數
enable_logger()
1. enable_logger(logger=None)
使用標準的Python日誌包啟用日誌記錄,可以與on_log回檔方法同時使用
reconnect()
1. reconnect()
使用之前的資訊配置重新連接代理,在調用之前必須先調用 connect*() 方法
reinitialise()
重置用戶端為初始化狀態,參數與 client() 一致
- 示例
1. mqttc.reinitialise()
loop()
1. loop(timeout=1.0,
max_packets=1)
定期調用處理事件
- timeout
- 最大阻塞的秒數
- max_packets
- 已過期,不設置
- 示例
1. while True:
2. mqttc.loop()
loop_start() /
loop_stop()
1. loop_start()
2. loop_stop(force=False)
這些函數實現了網路迴圈的執行緒介面,在執行connect*()之前或者之後調用一次 loop_start() ,後臺會自動運行一個執行緒調用 loop() ,這樣就釋放了主執行緒去執行其它工作,避免發生阻塞,這個調用也處理重新連接到代理。調用 loop_stop() 停止後臺執行緒
1. mqttc.connect("127.0.0.1")
2. mqttc.loop_start()
3.
4. while True:
5. mqttc.publish("topicTest", 'test')
loop_forever()
阻塞式網路迴圈處理事件,直到用戶端調用 disconnect() 才會返回,它會自動重連
publish()
1. publish(topic,
payload=None, qos=0, retain=False)
用戶端向MQTT代理發送一條消息
- topic
- 消息發佈的主題,不能為None或者空字元
- payload
- 發送的消息內容,如果沒有賦值或者賦值為None,則將使用零長度的消息。傳遞int或者float將會被轉換為該數位的字串, 如果想發送真正的int或者float資料,使用 struct.pack() 去創建
- qos
- 消息的服務品質等級,必須為0 or 1 or 2
- retain
- 設置為True,MQTT代理保留最後一條消息,以便分發給消息發佈後的訂閱者
- 回呼函數
Return MQTTMessageInfo對象
reconnect_delay_set()
1. reconnect_delay_set(min_delay=1,
max_delay=120)
斷開連接後,用戶端將自動嘗試連接,每次嘗試間隔 [min_delay,
max_delay] 秒,從min_delay開始逐漸加倍至max_delay,連接成功後,延遲重置為min_delay
subscribe()
1. subscribe(topic,
qos=0)
訂閱一個或多個主題,該方法有三種不同的調用方式:
1. # 1. 字串和整數
2. subscribe("my/topic", 2)
3. # 2. 字串和整數元組
4. subscribe(("my/topic", 1))
5. # 3. 字串和整數元組的清單
6. # 單次調用多個主題,比多次調用subscribe更有效
7. subscribe([("my/topic", 0), ("another/topic", 2)])
Return 一個元組 (result, mid)
- result
- 成功:MQTT_ERR_SUCCESS
- 失敗:(MQTT_ERR_NO_CONN,
None)
- mid
- 消息ID
- 回呼函數
unsubscribe()
1. unsubscribe(topic)
取消一個或多個主題
- topic
- 主題字串或者字串清單
Return 一個元組 (result, mid) - result
- 成功:MQTT_ERR_SUCCESS
- 失敗:(MQTT_ERR_NO_CONN,
None)
- mid
- 消息ID
- 回呼函數
user_data_set()
1. user_data_set(userdata)
設置傳遞給回呼函數的使用者私有資料
username_pw_set()
1. username_pw_set(username,password = None)
設置用戶名和密碼(可選)供MQTT代理驗證,必須在 connect*() 之前調用
will_set()
1. will_set(topic,
payload=None, qos=0, retain=False)
設置遺囑發送給MQTT代理,如果用戶端在沒有調用 disconnect() 的情況下斷開連接,則MQTT代理將會代表它發送該消息
- topic
- 遺囑消息發佈的主題,不能為None或者空字元
- payload
- 遺囑發送的消息內容,如果沒有賦值或者賦值為None,則將使用零長度的消息作為遺囑。傳遞int或者float將會被轉換為該數位的字串, 如果想發送真正的int或者float資料,使用 struct.pack() 去創建
- qos
- 遺囑消息的服務品質等級,必須為0 or 1 or 2
- retain
- 設置為True,MQTT代理保留最後一條消息,以便分發給消息發佈後的訂閱者
回呼函數
on_connect()
1. on_connect(client,
userdata, flags, rc)
MQTT代理回應用戶端連接請求時( connect*() )調用
- client
- 回檔返回的用戶端實例
- userdata
- Client() 或 user_data_set() 中設置的私有使用者資料
- flags
- MQTT代理發送的回應標識
- rc
- 連接結果
- 0:連接成功
- 1:連接被拒絕 - 協議版本
- 2:連接被拒絕 - 用戶端識別字無效
- 3:連接被拒絕 - 伺服器不可用
- 4:連接被拒絕 - 用戶名或密碼錯誤
- 5:連接被拒絕 - 未授權6-255:當前未使用
- 示例
1. def on_connect(client,
userdata, flags, rc):
2. print("Connected with
result code "+str(rc))
3.
4. mqttc.on_connect
= on_connect
on_disconnect()
當用戶端與MQTT代理斷開連接時 (disconnect()) 調用
1. on_disconnect(client,
userdata, rc)
- client
- 回檔返回的用戶端實例
- userdata
- Client() 或 user_data_set() 中設置的私有使用者資料
- rc
- 斷開結果,如果是 MQTT_ERR_SUCCESS(0),則是回應disconnect()調用
- 如果是其它值,則是意外關閉
- 示例
1. def on_disconnect(client,
userdata, rc):
2. if rc != 0:
3. print("Unexpected
disconnection.")
4.
5. mqttc.on_disconnect
= on_disconnect
on_message()
1. on_message(client,
userdata, message)
在用戶端收到已訂閱主題的消息,並且該消息沒有被主題篩檢程式 message_callback_add() 匹配時調用
- client
- 回檔返回的用戶端實例
- userdata
- Client() 或 user_data_set() 中設置的私有使用者資料
- message
- MQTTMessage實例,包含 topic、payload、qos、retain
- 示例
1. def on_message(client,
userdata, message):
2. print("Received message
'" +
str(message.payload) + "' on topic '"
3. + message.topic + "'
with QoS " + str(message.qos))
4.
5. mqttc.on_message
= on_message
message_callback_add()
1. message_callback_add(sub,
callback)
定義特定訂閱主題傳入的消息回檔,包括萬用字元,比如:用戶端訂閱了 sensor/#主題,一個回檔處理 sensor/temperature,另一個回檔處理 sensor/humidity
- sub
- 待過濾的主題,只能定義一個回檔
- callback
- 回呼函數,與 on_message() 相同形式
- 示例
1. # 處理溫度消息回檔
2. def temperature_callback(client,
userdata, message):
3. print(message.topic+"
"+str(message.payload))
4.
5. # 處理濕度消息回檔
6. def humidity_callback(client,
userdata, message):
7. print(message.topic+"
"+str(message.payload))
8.
9.
10. mqttc.subscribe('sensor/#')
11. mqttc.message_callback_add('sensor/temperature',
temperature_callback)
12. mqttc.message_callback_add('sensor/humidity',
humidity_callback)
message_callback_remove()
1. message_callback_remove(sub)
刪除先前註冊的主題/訂閱特定回檔
on_publish()
1. on_publish(client,
userdata, mid)
當用戶端調用 publish() 發佈一條消息至MQTT代理後調用。Qos=1或2時,意味著用戶端和代理完成握手,Qos=0時,僅表示消息離開用戶端。
- mid
- mid變數與從相應的 publish() 返回的mid變數匹配,以允許跟蹤傳出的消息。
即使 publish() 調用返回,也不總意味著消息已發送
on_subscribe()
1. on_subscribe(client,
userdata, mid, granted_qos)
當MQTT代理回應訂閱請求時被調用
- mid
- mid變數匹配從相應的 subscribe() 返回的mid變數
- granted_qos
- 整數清單,它提供了代理為每個不同的訂閱請求授予的QoS級別
on_unsubscribe()
1. on_unsubscribe(client,
userdata, mid)
當代理回應取消訂閱請求時調用
- mid
- mid匹配從相應的 unsubscribe() 返回的mid變數
on_log()
1. on_log(client,
userdata, level, buf)
當用戶端有日誌資訊時調用
- level
- 消息嚴重性
- MQTT_LOG_INFO
- MQTT_LOG_NOTICE
- MQTT_LOG_WARNING
- MQTT_LOG_ERR
- MQTT_LOG_DEBUG
- buf
- 該消息本身就在buf裡
可以與標準的Python logging同時使用,通過enable_logger()方法啟用
示例
1. import paho.mqtt.client
as mq_tt
2. import time
3. import logging
4.
5.
6. logging.basicConfig(level='DEBUG', format='%(asctime)s
[%(name)s:%(lineno)d] [%(levelname)s]- %(message)s')
7.
8.
9. def on_connect(client,
userdata, flags, rc):
10. print("Connected with
result code "+str(rc))
11. client.subscribe("topicTest/#")
12.
13.
14. def topic_one_callback(client,
userdata, message):
15. print(message.topic+"
"+str(message.payload))
16.
17.
18. def topic_two_callback(client,
userdata, message):
19. print(message.topic+"
"+str(message.payload))
20.
21.
22. mq_client
= mq_tt.Client(client_id='www.cooooder.com')
23. mq_client.enable_logger()
24. mq_client.on_connect
= on_connect
25. mq_client.message_callback_add("topicTest/one",
topic_one_callback)
26. mq_client.message_callback_add("topicTest/two",
topic_two_callback)
27. # 連接到EMQX Broker MQTT代理
28. mq_client.connect("127.0.0.1", 1883, 60)
29.
30. mq_client.loop_start()
出處:https://www.cooooder.com/archives/20210303
沒有留言:
張貼留言