2022年12月28日 星期三

Paho MQTT Python Client 使用手册

Paho MQTT Python Client 使用手册

 

介紹

paho.mqtt.python 是一個MQTT用戶端python庫,能夠讓應用程式簡單方便的連接到MQTT代理進行消息發佈、訂閱主題和消息接收。

目前 paho.mqtt.python-1.5.1 版本支持5.03.1.13.1 MQTT協定,同時支援Python 2.7.9+3.5+

環境

  • MQTT代理EMQ X Broker 4.2.6
  • Python 3.9.0
  • paho-mqtt 1.5.1

準備

  1. 參照 EMQ X Broker安裝啟動教程
    成功啟動EMQ後,可通過流覽器訪問 http://localhost:18083 admin/public 進入EMQ控制台,在【工具 > Websocket】模組可方便進行用戶端連接、訂閱、消息接收、發佈等測試和調試工作
    emqx_broker

 

  1. Python 安裝省略
  2. paho-mqtt 安裝

1.  pip install paho-mqtt

快速開始

Python快速實現MQTT主題訂閱和消息接收

1.  import paho.mqtt.client as mq_tt

2.   

3.   

4.  def on_connect(client, userdata, flags, rc):

5.      """

6.      回呼函數:當MQTT代理回應用戶端連接請求時觸發

7.      :param client: 回檔返回的用戶端實例

8.      :param userdata: Client()user_data_set()中設置的私有使用者資料

9.      :param flags: MQTT代理發送的回應標識

10.     :param rc: 連接結果

11.             0:連接成功

12.             1:連接被拒絕 - 協議版本

13.             2: 連接被拒絕 - 用戶端識別字無效

14.             3:連接被拒絕 - 伺服器不可用

15.             4:連接被拒絕 - 用戶名或密碼錯誤

16.             5:連接被拒絕 - 未授權6-255:當前未使用

17.     :return:

18.     """

19.     print("Connected with result code "+str(rc))

20.     # on_connect()中進行消息訂閱,是因為如果丟失連接進行重連,主題也會重新被訂閱

21.     client.subscribe("testTopic/#")

22.  

23.  

24. def on_message(client, userdata, message):

25.     """

26.     回呼函數:當接收到MQTT代理發佈的消息時觸發

27.     :param client: 回檔返回的用戶端實例

28.     :param userdata: Client()user_data_set()中設置的私有使用者資料

29.     :param message: MQTTMessage的一個實例,這是一個包含主題,有效負載,qosretain的類

30.     :return:

31.     """

32.     print(message.topic+" "+str(message.payload))

33.  

34.  

35. mq_client = mq_tt.Client(client_id='www.cooooder.com')

36. mq_client.on_connect = on_connect

37. mq_client.on_message = on_message

38. # 連接到EMQX Broker MQTT代理

39. mq_client.connect("127.0.0.1", 1883, 60)

40.  

41. # 阻塞式自動處理收發資料、自動處理重新連接,所有的資料處理邏輯都在預先設定好的回呼函數中進行的

42. mq_client.loop_forever()

EMQ X Broker - 【用戶端】可以看到用戶端已連接

emqx_client

 

EMQ X Broker - Websocket】發佈testTopic主題消息

emqx_publish

 

Python程式列印出接收到的消息

1.  Connected with result code 0

2.  testTopic b'{ "msg": "Hello, World!" }'

3.  testTopic b'{ "msg": "Hello, World2!" }'

常用API

Client

Client類實例常規用法流程如下:

  • 創建一個用戶端實例
  • 使用任一 connect*() 方法連接到MQTT代理
  • 調用任一 loop*() 方法保持與MQTT代理通訊
  • 使用 subscribe() 方法訂閱一個主題並接收消息
  • 使用 publish() 方法向MQTT代理發佈消息
  • 使用 disconnect() 中斷與MQTT代理的連接

client()

1.  # 構造方法

2.  Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

  • client_id
    • 連接到MQTT代理時使用的唯一用戶端ID字串。如果為0或者為None,將隨機生成分配一個,這種情況下clean_session參數必須為True
  • clean_session
    • 布林數值型別,用來確定用戶端類型。如果為True,當斷開連接時,MQTT代理將移除該用戶端的所有資訊;如果為False,用戶端則為持久用戶端,當斷開連接時,訂閱資訊和訊息佇列將被MQTT保存
    • 當斷開連接時,用戶端不會丟棄自己發送的消息。調用 connect() 或者 reconnect() 將導致重新發送消息,只有使用 reinitialise() 可以將用戶端重置為初始狀態
  • userdata
    • 使用者定義的任意類型資料作為 userdata 參數傳遞給回呼函數,可以通過調用user_data_set() 方法進行更新,不過會有點延遲
  • protocol
    • 用戶端使用的MQTT協定版本,可以是 MQTTv31 MQTTv311
  • transport
    • 傳輸形式,設置為websockets,則會通過websockets發送給MQTT,默認tcp
  • 示例

1.  import paho.mqtt.client as mqtt

2.   

3.  mqttc = mqtt.Client()

connect()

1.  connect(host, port=1883, keepalive=60, bind_address="")

用戶端連接MQTT代理,這是一個阻塞函數

  • host
    • 代理的主機名稱或者IP位址
  • port
    • 連接服務的埠,預設1883
  • keepalive
    • 心跳檢測時長
  • bind_address
    • 綁定此用戶端本地網路的IP位址
  • 回呼函數

connect_async()

1.  connect_async(host, port=1883, keepalive=60, bind_address="")

 loop_start() 結合使用以非阻塞的形式進行連接,在調用 loop_start() 之前,連接不會完成

disconnect()

1.  disconnect()

徹底與MQTT代理斷開,使用該方法斷開連接不會讓代理發送遺囑消息

enable_logger()

1.  enable_logger(logger=None)

使用標準的Python日誌包啟用日誌記錄,可以與on_log回檔方法同時使用

reconnect()

1.  reconnect()

使用之前的資訊配置重新連接代理,在調用之前必須先調用 connect*() 方法

reinitialise()

重置用戶端為初始化狀態,參數與 client() 一致

  • 示例

1.  mqttc.reinitialise()

loop()

1.  loop(timeout=1.0, max_packets=1)

定期調用處理事件

  • timeout
    • 最大阻塞的秒數
  • max_packets
    • 已過期,不設置
  • 示例

1.  while True:

2.      mqttc.loop()

loop_start() / loop_stop()

1.  loop_start()

2.  loop_stop(force=False)

這些函數實現了網路迴圈的執行緒介面,在執行connect*()之前或者之後調用一次 loop_start() ,後臺會自動運行一個執行緒調用 loop() ,這樣就釋放了主執行緒去執行其它工作,避免發生阻塞,這個調用也處理重新連接到代理。調用 loop_stop() 停止後臺執行緒

1.  mqttc.connect("127.0.0.1")

2.  mqttc.loop_start()

3.   

4.  while True:

5.      mqttc.publish("topicTest", 'test')

loop_forever()

阻塞式網路迴圈處理事件,直到用戶端調用 disconnect() 才會返回,它會自動重連

publish()

1.  publish(topic, payload=None, qos=0, retain=False)

用戶端向MQTT代理發送一條消息

  • topic
    • 消息發佈的主題,不能為None或者空字元
  • payload
    • 發送的消息內容,如果沒有賦值或者賦值為None,則將使用零長度的消息。傳遞int或者float將會被轉換為該數位的字串, 如果想發送真正的int或者float資料,使用 struct.pack() 去創建
  • qos
    • 消息的服務品質等級,必須為0 or 1 or 2
  • retain
    • 設置為TrueMQTT代理保留最後一條消息,以便分發給消息發佈後的訂閱者
  • 回呼函數

Return MQTTMessageInfo對象

reconnect_delay_set()

1.  reconnect_delay_set(min_delay=1, max_delay=120)

斷開連接後,用戶端將自動嘗試連接,每次嘗試間隔 [min_delay, max_delay] 秒,從min_delay開始逐漸加倍至max_delay,連接成功後,延遲重置為min_delay

subscribe()

1.  subscribe(topic, qos=0)

訂閱一個或多個主題,該方法有三種不同的調用方式:

1.  # 1. 字串和整數

2.  subscribe("my/topic", 2)

3.  # 2. 字串和整數元組

4.  subscribe(("my/topic", 1))

5.  # 3. 字串和整數元組的清單

6.  # 單次調用多個主題,比多次調用subscribe更有效

7.  subscribe([("my/topic", 0), ("another/topic", 2)])

Return 一個元組 (result, mid)

  • result
    • 成功:MQTT_ERR_SUCCESS
    • 失敗:(MQTT_ERR_NO_CONN, None)
  • mid
    • 消息ID
  • 回呼函數

unsubscribe()

1.  unsubscribe(topic)

取消一個或多個主題

  • topic
    • 主題字串或者字串清單
      Return
      一個元組 (result, mid)
  • result
    • 成功:MQTT_ERR_SUCCESS
    • 失敗:(MQTT_ERR_NO_CONN, None)
  • mid
    • 消息ID
  • 回呼函數

user_data_set()

1.  user_data_set(userdata)

設置傳遞給回呼函數的使用者私有資料

username_pw_set()

1.  username_pw_set(usernamepassword = None)

設置用戶名和密碼(可選)供MQTT代理驗證,必須在 connect*() 之前調用

will_set()

1.  will_set(topic, payload=None, qos=0, retain=False)

設置遺囑發送給MQTT代理,如果用戶端在沒有調用 disconnect() 的情況下斷開連接,則MQTT代理將會代表它發送該消息

  • topic
    • 遺囑消息發佈的主題,不能為None或者空字元
  • payload
    • 遺囑發送的消息內容,如果沒有賦值或者賦值為None,則將使用零長度的消息作為遺囑。傳遞int或者float將會被轉換為該數位的字串, 如果想發送真正的int或者float資料,使用 struct.pack() 去創建
  • qos
    • 遺囑消息的服務品質等級,必須為0 or 1 or 2
  • retain
    • 設置為TrueMQTT代理保留最後一條消息,以便分發給消息發佈後的訂閱者

回呼函數

on_connect()

1.  on_connect(client, userdata, flags, rc)

MQTT代理回應用戶端連接請求時connect*() )調用

  • client
    • 回檔返回的用戶端實例
  • userdata
  • flags
    • MQTT代理發送的回應標識
  • rc
    • 連接結果
      • 0:連接成功
      • 1:連接被拒絕 - 協議版本
      • 2:連接被拒絕 - 用戶端識別字無效
      • 3:連接被拒絕 - 伺服器不可用
      • 4:連接被拒絕 - 用戶名或密碼錯誤
      • 5:連接被拒絕 - 未授權6-255:當前未使用
  • 示例

1.  def on_connect(client, userdata, flags, rc):

2.      print("Connected with result code "+str(rc))

3.   

4.  mqttc.on_connect = on_connect

on_disconnect()

當用戶端與MQTT代理斷開連接時 (disconnect()) 調用

1.  on_disconnect(client, userdata, rc)

  • client
    • 回檔返回的用戶端實例
  • userdata
  • rc
    • 斷開結果,如果是 MQTT_ERR_SUCCESS(0),則是回應disconnect()調用
    • 如果是其它值,則是意外關閉
  • 示例

1.  def on_disconnect(client, userdata, rc):

2.      if rc != 0:

3.          print("Unexpected disconnection.")

4.   

5.  mqttc.on_disconnect = on_disconnect

on_message()

1.  on_message(client, userdata, message)

在用戶端收到已訂閱主題的消息,並且該消息沒有被主題篩檢程式 message_callback_add() 匹配時調用

  • client
    • 回檔返回的用戶端實例
  • userdata
  • message
    • MQTTMessage實例,包含 topicpayloadqosretain
  • 示例

1.  def on_message(client, userdata, message):

2.      print("Received message '" + str(message.payload) + "' on topic '"

3.          + message.topic + "' with QoS " + str(message.qos))

4.   

5.  mqttc.on_message = on_message

message_callback_add()

1.  message_callback_add(sub, callback)

定義特定訂閱主題傳入的消息回檔,包括萬用字元,比如:用戶端訂閱了 sensor/#主題,一個回檔處理 sensor/temperature,另一個回檔處理 sensor/humidity

  • sub
    • 待過濾的主題,只能定義一個回檔
  • callback
  • 示例

1.  # 處理溫度消息回檔

2.  def temperature_callback(client, userdata, message):

3.      print(message.topic+" "+str(message.payload))

4.   

5.  # 處理濕度消息回檔

6.  def humidity_callback(client, userdata, message):

7.      print(message.topic+" "+str(message.payload))

8.   

9.   

10. mqttc.subscribe('sensor/#')

11. mqttc.message_callback_add('sensor/temperature', temperature_callback)

12. mqttc.message_callback_add('sensor/humidity', humidity_callback)

message_callback_remove()

1.  message_callback_remove(sub)

刪除先前註冊的主題/訂閱特定回檔

on_publish()

1.  on_publish(client, userdata, mid)

當用戶端調用 publish() 發佈一條消息至MQTT代理後調用。Qos=12時,意味著用戶端和代理完成握手,Qos=0時,僅表示消息離開用戶端。

  • mid
    • mid變數與從相應的 publish() 返回的mid變數匹配,以允許跟蹤傳出的消息。

即使 publish() 調用返回,也不總意味著消息已發送

on_subscribe()

1.  on_subscribe(client, userdata, mid, granted_qos)

MQTT代理回應訂閱請求時被調用

  • mid
    • mid變數匹配從相應的 subscribe() 返回的mid變數
  • granted_qos
    • 整數清單,它提供了代理為每個不同的訂閱請求授予的QoS級別

on_unsubscribe()

1.  on_unsubscribe(client, userdata, mid)

當代理回應取消訂閱請求時調用

on_log()

1.  on_log(client, userdata, level, buf)

當用戶端有日誌資訊時調用

  • level
    • 消息嚴重性
      • MQTT_LOG_INFO
      • MQTT_LOG_NOTICE
      • MQTT_LOG_WARNING
      • MQTT_LOG_ERR
      • MQTT_LOG_DEBUG
  • buf
    • 該消息本身就在buf
      可以與標準的Python logging同時使用,通過enable_logger()方法啟用

示例

1.  import paho.mqtt.client as mq_tt

2.  import time

3.  import logging

4.   

5.   

6.  logging.basicConfig(level='DEBUG', format='%(asctime)s [%(name)s:%(lineno)d] [%(levelname)s]- %(message)s')

7.   

8.   

9.  def on_connect(client, userdata, flags, rc):

10.     print("Connected with result code "+str(rc))

11.     client.subscribe("topicTest/#")

12.  

13.  

14. def topic_one_callback(client, userdata, message):

15.     print(message.topic+" "+str(message.payload))

16.  

17.  

18. def topic_two_callback(client, userdata, message):

19.     print(message.topic+" "+str(message.payload))

20.  

21.  

22. mq_client = mq_tt.Client(client_id='www.cooooder.com')

23. mq_client.enable_logger()

24. mq_client.on_connect = on_connect

25. mq_client.message_callback_add("topicTest/one", topic_one_callback)

26. mq_client.message_callback_add("topicTest/two", topic_two_callback)

27. # 連接到EMQX Broker MQTT代理

28. mq_client.connect("127.0.0.1", 1883, 60)

29.  

30. mq_client.loop_start()

出處:https://www.cooooder.com/archives/20210303

 

沒有留言:

張貼留言

Messaging API作為替代方案

  LINE超好用功能要沒了!LINE Notify明年3月底終止服務,有什麼替代方案? LINE Notify將於2025年3月31日結束服務,官方建議改用Messaging API作為替代方案。 //CHANNEL_ACCESS_TOKEN = 'Messaging ...