ESP32 雙核心控制 LED 與 DHT22 溫濕度感測器 (Wokwi 模擬) EX9 -- Python GUI
ESP32 與 Python GUI 應用程式透過 MQTT 進行通訊
專案概述
這個專案的目標是建立一個簡單的物聯網 (IoT) 系統,讓你的 ESP32 (在 Wokwi 模擬器上) 能發送溫度和濕度數據,並接收控制 LED 的指令。而你的電腦上則運行一個 Python 應用程式,它提供一個圖形使用者介面 (GUI) 來顯示這些數據,並發送控制指令。兩者之間透過 MQTT 協定 進行通訊。
核心組件
Wokwi ESP32 模擬器 (Arduino 程式碼):
模擬一個 ESP32 開發板。
包含一個 DHT22 溫濕度感測器和一個 LED。
連接到 Wi-Fi。
作為 MQTT 客戶端,將溫濕度數據發布到特定主題,並訂閱 LED 控制主題。
MQTT Broker (MQTT 伺服器):
一個中央訊息伺服器,負責接收和分發 MQTT 訊息。
ESP32 和 Python 應用程式都連接到它。
我們將使用一個免費的公共 MQTT Broker,例如
broker.mqttgo.io。
Python GUI 應用程式 (Python 程式碼):
在你的電腦上運行。
提供一個 Tkinter 圖形介面。
作為 MQTT 客戶端,訂閱溫濕度數據主題以顯示即時數據。
提供按鈕來發布 LED 控制指令到 MQTT 主題。
程式碼說明與設定
1. Arduino (Wokwi ESP32) 程式碼
這是運行在 Wokwi ESP32 模擬器上的程式碼。
好的,我們來從頭梳理一下 Arduino (ESP32) 和 Python 程式以及它們的操作步驟。這次我們專注於 ESP32 與 Python GUI 應用程式透過 MQTT 進行通訊,不包含 LINE Bot 功能,讓流程更清晰。
專案概述
這個專案的目標是建立一個簡單的物聯網 (IoT) 系統,讓你的 ESP32 (在 Wokwi 模擬器上) 能發送溫度和濕度數據,並接收控制 LED 的指令。而你的電腦上則運行一個 Python 應用程式,它提供一個圖形使用者介面 (GUI) 來顯示這些數據,並發送控制指令。兩者之間透過 MQTT 協定 進行通訊。
核心組件
Wokwi ESP32 模擬器 (Arduino 程式碼):
模擬一個 ESP32 開發板。
包含一個 DHT22 溫濕度感測器和一個 LED。
連接到 Wi-Fi。
作為 MQTT 客戶端,將溫濕度數據發布到特定主題,並訂閱 LED 控制主題。
MQTT Broker (MQTT 伺服器):
一個中央訊息伺服器,負責接收和分發 MQTT 訊息。
ESP32 和 Python 應用程式都連接到它。
我們將使用一個免費的公共 MQTT Broker,例如
broker.mqttgo.io。
Python GUI 應用程式 (Python 程式碼):
在你的電腦上運行。
提供一個 Tkinter 圖形介面。
作為 MQTT 客戶端,訂閱溫濕度數據主題以顯示即時數據。
提供按鈕來發布 LED 控制指令到 MQTT 主題。
程式碼說明與設定
1. Arduino (Wokwi ESP32) 程式碼
這是運行在 Wokwi ESP32 模擬器上的程式碼。
#include <WiFi.h>
#include <PubSubClient.h> // MQTT library
#include "DHT.h" // DHT sensor library
// --- WiFi & MQTT Settings ---
const char* ssid = "Wokwi-GUEST"; // Wokwi's virtual WiFi SSID
const char* password = ""; // Wokwi's virtual WiFi password (no password needed)
// Choose a public MQTT broker
// You can use "broker.mqttgo.io" or "mqtt.eclipseprojects.io"
const char* mqtt_server = "broker.mqttgo.io";
const int mqtt_port = 1883; // Default MQTT port
// Unique Client ID for MQTT (IMPORTANT: change this to something unique for your Wokwi project)
const char* mqtt_client_id = "ESP32-Wokwi-DHT-LED-YourName"; // <<-- 更改為你的獨特 ID
// --- MQTT Topics (MUST match Python app) ---
const char* mqtt_led_control_topic = "wokwi/esp32/led/control"; // 接收 LED 控制指令
const char* mqtt_temp_topic = "wokwi/esp32/dht/temperature"; // 發布溫度
const char* mqtt_humid_topic = "wokwi/esp32/dht/humidity"; // 發布濕度
const char* mqtt_status_topic = "wokwi/esp32/status"; // 發布上線狀態
// --- Hardware Settings ---
#define DHTPIN 4 // DHT22 sensor connected to ESP32 Pin 4
#define DHTTYPE DHT22 // DHT 22 (AM2302)
#define LED_PIN 2 // Built-in LED on ESP32 (or external LED on Pin 2)
DHT dht(DHTPIN, DHTTYPE);
WiFiClient espClient;
PubSubClient client(espClient);
long lastMsg = 0; // For timing sensor readings
unsigned long lastLedCommandTime = 0; // For LED timer function
bool ledTimerActive = false;
unsigned long ledTimerDuration = 10000; // 10 seconds for timer
// --- Function Prototypes ---
void setup_wifi();
void reconnect_mqtt();
void callback(char* topic, byte* payload, unsigned int length);
// --- Setup ---
void setup() {
Serial.begin(115200);
pinMode(LED_PIN, OUTPUT);
digitalWrite(LED_PIN, LOW); // Ensure LED is off initially
setup_wifi();
client.setServer(mqtt_server, mqtt_port);
client.setCallback(callback); // Set the function to call when a message is received
dht.begin();
}
// --- Main Loop ---
void loop() {
if (!client.connected()) {
reconnect_mqtt(); // Reconnect if disconnected
}
client.loop(); // Keep MQTT client alive and process incoming messages
long now = millis();
// Read sensor every 5 seconds
if (now - lastMsg > 5000) {
lastMsg = now;
float h = dht.readHumidity();
float t = dht.readTemperature(); // Read temperature as Celsius
// Check if any reads failed and exit early (to try again next cycle)
if (isnan(h) || isnan(t)) {
Serial.println("Failed to read from DHT sensor!");
} else {
Serial.print("Temperature: ");
Serial.print(t);
Serial.print(" °C, Humidity: ");
Serial.print(h);
Serial.println(" %");
// Publish sensor readings
char tempString[8];
dtostrf(t, 1, 2, tempString); // Convert float to string
client.publish(mqtt_temp_topic, tempString);
char humidString[8];
dtostrf(h, 1, 2, humidString);
client.publish(mqtt_humid_topic, humidString);
}
}
// Handle LED timer function
if (ledTimerActive && (millis() - lastLedCommandTime >= ledTimerDuration)) {
digitalWrite(LED_PIN, LOW); // Turn LED off after timer
Serial.println("LED timer ended. LED OFF.");
client.publish(mqtt_led_control_topic, "timer_off"); // Optional: inform Python app
ledTimerActive = false;
}
}
// --- WiFi Setup Function ---
void setup_wifi() {
delay(10);
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("");
Serial.println("WiFi connected");
Serial.print("IP address: ");
Serial.println(WiFi.localIP());
}
// --- MQTT Reconnect Function ---
void reconnect_mqtt() {
while (!client.connected()) {
Serial.print("Attempting MQTT connection...");
// Attempt to connect
if (client.connect(mqtt_client_id, mqtt_status_topic, 1, true, "Offline")) { // Last Will and Testament
Serial.println("connected");
// Once connected, publish an announcement and subscribe to topics
client.publish(mqtt_status_topic, "Online"); // Announce status
client.subscribe(mqtt_led_control_topic); // Subscribe to control topic
Serial.print("Subscribed to ");
Serial.println(mqtt_led_control_topic);
} else {
Serial.print("failed, rc=");
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
// Wait 5 seconds before retrying
delay(5000);
}
}
}
// --- MQTT Message Callback Function (when message is received) ---
void callback(char* topic, byte* payload, unsigned int length) {
Serial.print("Message arrived [");
Serial.print(topic);
Serial.print("] ");
String message;
for (int i = 0; i < length; i++) {
Serial.print((char)payload[i]);
message += (char)payload[i];
}
Serial.println();
// Handle LED control commands
if (String(topic) == mqtt_led_control_topic) {
if (message == "on") {
digitalWrite(LED_PIN, HIGH);
Serial.println("LED ON");
ledTimerActive = false; // Cancel any active timer
} else if (message == "off") {
digitalWrite(LED_PIN, LOW);
Serial.println("LED OFF");
ledTimerActive = false; // Cancel any active timer
} else if (message == "flash") {
// Simple flash for demonstration, you can implement more complex patterns
digitalWrite(LED_PIN, HIGH);
delay(200);
digitalWrite(LED_PIN, LOW);
delay(200);
digitalWrite(LED_PIN, HIGH);
delay(200);
digitalWrite(LED_PIN, LOW);
Serial.println("LED FLASH command received.");
ledTimerActive = false;
} else if (message == "timer") {
digitalWrite(LED_PIN, HIGH); // Turn LED on
lastLedCommandTime = millis();
ledTimerActive = true;
Serial.println("LED Timer ON for 10 seconds.");
}
}
}
Wokwi 設定步驟:
打開 Wokwi 網站,點擊 "New Project" -> "ESP32 Devkit"。
在右側的 "DIAGRAM" 視窗中:
點擊 "+" 號添加一個 DHT22 感測器,將其數據引腳連接到 ESP32 的 GPIO4。
點擊 "+" 號添加一個 LED,將其正極 (長腳) 連接到 ESP32 的 GPIO2 (或任何你喜歡的 GPIO,但要記得在程式碼中修改
LED_PIN),負極 (短腳) 連接到 GND。建議在 LED 和 GPIO 之間串聯一個 220 歐姆的電阻來保護 LED。
在
sketch.ino檔案中,用上面的程式碼替換所有內容。重要: 修改
const char* mqtt_client_id = "ESP32-Wokwi-DHT-LED-YourName";中的YourName,讓它成為一個獨特的客戶端 ID,例如ESP32-Wokwi-DHT-LED-John。點擊綠色的 "Start Simulation" 按鈕。觀察下方串列埠 (Serial Monitor) 的輸出,確認 Wi-Fi 和 MQTT 連接成功,並開始發送溫濕度數據。
2. Python GUI 應用程式
這是運行在你電腦上的 Python 程式碼,提供圖形介面。
<Python tkinter 程式>
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import paho.mqtt.client as mqtt
import threading
import time
import os
import sys
# Flask and LINE Bot SDK imports are REMOVED as LINE functionality is not needed.
# --- Configuration Section ---
# MQTT Broker Settings
MQTT_BROKER = "broker.mqttgo.io" # 或 "mqtt.eclipseprojects.io"
MQTT_PORT = 1883
# MQTT Topics (MUST match Wokwi ESP32 code)
MQTT_LED_CONTROL_TOPIC = "wokwi/esp32/led/control"
MQTT_TEMP_TOPIC = "wokwi/esp32/dht/temperature"
MQTT_HUMID_TOPIC = "wokwi/esp32/dht/humidity"
MQTT_STATUS_TOPIC = "wokwi/esp32/status" # ESP32 發布上線狀態
# --- Global variables for latest sensor data ---
latest_temperature = "N/A"
latest_humidity = "N/A"
# --- Tkinter Main Application Class ---
class IoTApp:
def __init__(self, master):
self.master = master
master.title("Wokwi ESP32 IoT Control & Monitor (MQTT GUI Only)")
master.geometry("700x550") # 調整視窗大小
self.mqtt_client = None
self.mqtt_reconnect_timer = None # 用於延遲 MQTT 重連嘗試的計時器
self.create_widgets()
self.setup_mqtt()
# No Flask app to start if LINE functionality is removed.
def create_widgets(self):
"""Creates all widgets in the Tkinter GUI."""
# --- LED Control Section ---
led_frame = ttk.LabelFrame(self.master, text="LED Control (Local)", padding="10")
led_frame.pack(pady=10, padx=10, fill="x")
ttk.Button(led_frame, text="LED ON", command=lambda: self.publish_mqtt(MQTT_LED_CONTROL_TOPIC, "on")).pack(side="left", padx=5, pady=5)
ttk.Button(led_frame, text="LED OFF", command=lambda: self.publish_mqtt(MQTT_LED_CONTROL_TOPIC, "off")).pack(side="left", padx=5, pady=5)
ttk.Button(led_frame, text="LED FLASH", command=lambda: self.publish_mqtt(MQTT_LED_CONTROL_TOPIC, "flash")).pack(side="left", padx=5, pady=5)
ttk.Button(led_frame, text="LED TIMER (10s)", command=lambda: self.publish_mqtt(MQTT_LED_CONTROL_TOPIC, "timer")).pack(side="left", padx=5, pady=5)
# --- DHT22 Sensor Data Display Section ---
dht_frame = ttk.LabelFrame(self.master, text="DHT22 Sensor Data", padding="10")
dht_frame.pack(pady=10, padx=10, fill="x")
self.temp_label = ttk.Label(dht_frame, text=f"Temperature: {latest_temperature}°C", font=("Arial", 14))
self.temp_label.pack(pady=5, anchor="w")
self.humid_label = ttk.Label(dht_frame, text=f"Humidity: {latest_humidity}%", font=("Arial", 14))
self.humid_label.pack(pady=5, anchor="w")
# --- Status Display Section ---
status_frame = ttk.LabelFrame(self.master, text="Connection Status", padding="10")
status_frame.pack(pady=10, padx=10, fill="x")
self.mqtt_status_label = ttk.Label(status_frame, text="MQTT: Connecting...", font=("Arial", 10), foreground="blue")
self.mqtt_status_label.pack(pady=2, anchor="w")
# Removed Flask/LINE status labels
# --- Message Log Section ---
log_frame = ttk.LabelFrame(self.master, text="Message Log", padding="10")
log_frame.pack(pady=10, padx=10, fill="both", expand=True)
self.log_text = scrolledtext.ScrolledText(log_frame, width=80, height=15, wrap=tk.WORD, font=("Consolas", 10))
self.log_text.pack(expand=True, fill="both")
self.log_text.config(state=tk.DISABLED) # Make read-only
def log_message(self, message, tag=None):
"""Appends a message to the log text area."""
self.master.after(0, self._append_log_message, message, tag)
def _append_log_message(self, message, tag):
"""Internal function to append log message (thread-safe)."""
self.log_text.config(state=tk.NORMAL)
self.log_text.insert(tk.END, message + "\n", tag)
self.log_text.see(tk.END) # Scroll to the end
self.log_text.config(state=tk.DISABLED)
# --- MQTT Related Functions ---
def setup_mqtt(self):
"""Sets up MQTT client callbacks and attempts to connect."""
self.mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.mqtt_client.on_connect = self.on_mqtt_connect
self.mqtt_client.on_message = self.on_mqtt_message
self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
# Initial connection attempt
self.attempt_mqtt_connection()
def attempt_mqtt_connection(self):
"""Attempts to connect to the MQTT broker."""
try:
self.update_mqtt_status("Attempting to connect to MQTT...", "blue")
self.log_message("MQTT: Attempting connection to broker...", "info")
self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
# Run MQTT loop in a separate thread to avoid blocking the GUI
if not hasattr(self, 'mqtt_thread') or not self.mqtt_thread.is_alive():
self.mqtt_thread = threading.Thread(target=self.mqtt_client.loop_forever, daemon=True)
self.mqtt_thread.start()
except Exception as e:
self.update_mqtt_status(f"Error: {e}", "red")
self.log_message(f"MQTT: Initial Connection Error: {e}", "error")
self.schedule_mqtt_reconnect()
def on_mqtt_connect(self, client, userdata, flags, rc, properties): # Added 'properties'
"""Callback function when MQTT connects successfully."""
if rc == 0:
self.update_mqtt_status("Connected to MQTT Broker!", "green")
self.log_message("MQTT: Connected to Broker!", "success")
client.subscribe(MQTT_TEMP_TOPIC)
client.subscribe(MQTT_HUMID_TOPIC)
client.subscribe(MQTT_STATUS_TOPIC) # 訂閱 ESP32 狀態
self.log_message(f"MQTT: Subscribed to {MQTT_TEMP_TOPIC}, {MQTT_HUMID_TOPIC}, {MQTT_STATUS_TOPIC}", "info")
if self.mqtt_reconnect_timer:
self.master.after_cancel(self.mqtt_reconnect_timer)
self.mqtt_reconnect_timer = None
else:
self.update_mqtt_status(f"Failed to connect (code {rc}). Retrying...", "orange")
self.log_message(f"MQTT: Connection failed, return code {rc}. Retrying...", "warning")
self.schedule_mqtt_reconnect()
def on_mqtt_disconnect(self, client, userdata, rc):
"""Callback function when MQTT disconnects."""
self.update_mqtt_status(f"Disconnected from MQTT (code {rc}). Reconnecting...", "red")
self.log_message(f"MQTT: Disconnected with result code {rc}. Reconnecting...", "error")
# Ensure that if the client disconnects, we re-attempt connection
self.schedule_mqtt_reconnect()
def schedule_mqtt_reconnect(self):
"""Schedules an MQTT reconnection attempt after a delay."""
# Only schedule if not already scheduled
if self.mqtt_reconnect_timer is None:
self.mqtt_reconnect_timer = self.master.after(5000, self.perform_mqtt_reconnect)
self.log_message("MQTT: Scheduled reconnect in 5 seconds.", "info")
else:
self.log_message("MQTT: Reconnect already scheduled.", "info")
def perform_mqtt_reconnect(self):
"""Performs the actual MQTT reconnection attempt."""
self.mqtt_reconnect_timer = None # Clear the timer as we are about to attempt connection
if not self.mqtt_client.is_connected():
self.log_message("MQTT: Attempting reconnect...", "info")
try:
self.mqtt_client.reconnect() # This will trigger on_mqtt_connect if successful
except Exception as e:
self.update_mqtt_status(f"MQTT reconnect failed: {e}", "red")
self.log_message(f"MQTT: Reconnect failed: {e}", "error")
self.schedule_mqtt_reconnect() # Schedule another reconnect if this one failed
else:
self.log_message("MQTT: Already connected, no reconnect needed.", "info")
def on_mqtt_message(self, client, userdata, msg):
"""Callback function when an MQTT message is received, updates sensor data."""
global latest_temperature, latest_humidity
payload = msg.payload.decode()
self.log_message(f"MQTT Rx: Topic='{msg.topic}', Payload='{payload}'", "mqtt_rx")
if msg.topic == MQTT_TEMP_TOPIC:
latest_temperature = payload
self.master.after(0, self.update_dht_labels)
elif msg.topic == MQTT_HUMID_TOPIC:
latest_humidity = payload
self.master.after(0, self.update_dht_labels)
elif msg.topic == MQTT_STATUS_TOPIC:
self.log_message(f"ESP32 Status: {payload}", "esp32_status")
def update_dht_labels(self):
"""Updates temperature and humidity display on Tkinter GUI."""
self.temp_label.config(text=f"Temperature: {latest_temperature}°C")
self.humid_label.config(text=f"Humidity: {latest_humidity}%")
def update_mqtt_status(self, text, color):
"""Updates MQTT status display on Tkinter GUI."""
self.master.after(0, lambda: self.mqtt_status_label.config(text=f"MQTT: {text}", foreground=color))
def publish_mqtt(self, topic, payload):
"""Publishes an MQTT message to the specified topic."""
if self.mqtt_client and self.mqtt_client.is_connected():
try:
self.mqtt_client.publish(topic, payload)
self.log_message(f"MQTT Tx: Topic='{topic}', Payload='{payload}'", "mqtt_tx")
self.update_mqtt_status(f"Sent command: {payload}", "blue")
except Exception as e:
self.update_mqtt_status(f"Publish Error: {e}", "red")
self.log_message(f"MQTT: Publish Error: {e}", "error")
else:
self.update_mqtt_status("MQTT Not Connected! Command not sent.", "red")
self.log_message("MQTT: Client not connected, cannot publish. Command not sent.", "error")
self.schedule_mqtt_reconnect() # Attempt to reconnect if not connected
def on_closing(self):
"""
Cleanup actions when the Tkinter window is closed.
Gracefully stops the MQTT client.
"""
self.log_message("Application: Closing...", "info")
if self.mqtt_client:
self.log_message("MQTT: Disconnecting client...", "info")
self.mqtt_client.disconnect()
if self.mqtt_reconnect_timer:
self.master.after_cancel(self.mqtt_reconnect_timer)
self.master.destroy()
sys.exit(0) # Use sys.exit(0) for a clean exit, as there are no other threads we are explicitly managing.
# --- Main Program Entry Point ---
if __name__ == "__main__":
root = tk.Tk()
app_instance = IoTApp(root)
# Configure tags for log messages
app_instance.log_text.tag_config("info", foreground="blue")
app_instance.log_text.tag_config("success", foreground="green")
app_instance.log_text.tag_config("warning", foreground="orange")
app_instance.log_text.tag_config("error", foreground="red")
app_instance.log_text.tag_config("mqtt_tx", foreground="purple")
app_instance.log_text.tag_config("mqtt_rx", foreground="darkgreen")
app_instance.log_text.tag_config("esp32_status", foreground="darkorange")
root.protocol("WM_DELETE_WINDOW", app_instance.on_closing)
root.mainloop()
<<操作步驟>>
步驟 1: 準備 Python 環境
安裝 Python (如果尚未安裝): 從
下載並安裝最新版本的 Python。安裝時記得勾選 "Add Python to PATH"。 或 使用 thonny (https://thonny.org/)python.org
Download version 4.1.7 for Windows•Mac•LinuxOfficial downloads for Windows
Installer with 64-bit Python 3.10, requires 64-bit Windows 8.1 / 10 / 11
thonny-4.1.7.exe (21 MB) ⇐ recommended for you
Installer with 32-bit Python 3.8, suitable for all Windows versions since 7
Portable variant with 64-bit Python 3.10
Portable variant with 32-bit Python 3.8
Re-using an existing Python installation (for advanced users)
pip install thonny
安裝必要的 Python 庫: 打開你的命令提示字元 (CMD) 或終端機,執行以下指令:
Bashpip install paho-mqtttkinter是 Python 標準庫的一部分,通常無需額外安裝。
步驟 2: 設置和運行 Wokwi ESP32 模擬器
打開 Wokwi 專案:
前往
,點擊 "New Project" -> "ESP32 Devkit"。https://wokwi.com/ 在左側
sketch.ino編輯器中,將上面提供的 Arduino (Wokwi ESP32) 程式碼 完整複製並貼上。
配置硬體 (DIAGRAM 視窗):
在右側的 "DIAGRAM" 視窗,點擊元件庫 (加號圖示)。
搜尋並添加一個 DHT22 感測器。將其數據引腳 (
DATA) 連接到 ESP32 的 GPIO4。搜尋並添加一個 LED。將其正極 (通常較長的引腳) 連接到 ESP32 的 GPIO2。負極 (較短引腳) 連接到
GND。為了保護 LED,你可以在 LED 和 GPIO2 之間串聯一個 220 歐姆的電阻 (在元件庫中搜尋Resistor,設置為220歐姆)。
自訂客戶端 ID:
在 Arduino 程式碼中找到這行:
const char* mqtt_client_id = "ESP32-Wokwi-DHT-LED-YourName";將
"YourName"部分修改為一個對你而言獨特的名稱,例如"ESP32-Wokwi-DHT-LED-Alice"。這有助於 MQTT Broker 區分不同的客戶端。
啟動模擬:
點擊 Wokwi 介面頂部的綠色 "Start Simulation" 按鈕。
觀察下方 "Serial Monitor" 視窗的輸出。你應該會看到 Wi-Fi 連接成功、MQTT 連接成功,並且每隔幾秒就會顯示溫度和濕度讀數。這表示 ESP32 正在正常發布數據。
步驟 3: 運行 Python GUI 應用程式
保存 Python 程式碼:
將上面提供的 Python GUI 應用程式程式碼 複製並保存到你的電腦上,例如命名為
mqtt_gui_app.py。
運行程式:
打開你的命令提示字元 (CMD) 或終端機。
使用
cd命令導航到你保存mqtt_gui_app.py檔案的目錄。執行指令:
Bashpython mqtt_gui_app.py
觀察 Tkinter 視窗:
一個圖形使用者介面視窗將會彈出。
"連接狀態" 區塊: 觀察 "MQTT" 的狀態。如果一切正常,它會顯示為 "MQTT: 已連接到 MQTT Broker!" 並呈綠色。
"DHT22 感測器數據" 區塊: 大約每 5 秒,你應該會看到 "溫度" 和 "濕度" 數據更新,這些數據來自 Wokwi 模擬器。
"訊息日誌" 區塊: 會顯示 MQTT 訊息的發送 (Tx) 和接收 (Rx) 記錄,以及其他系統訊息。
步驟 4: 互動測試
點擊 LED 控制按鈕:
在 Python GUI 視窗中,點擊 "LED 開啟"、"LED 關閉"、"LED 閃爍" 或 "LED 定時 (10秒)"。
觀察 Wokwi 模擬器中的 LED 燈是否響應你的指令。
同時,查看 Python GUI 的 "訊息日誌" 和 Wokwi 的 "Serial Monitor" 輸出,確認指令是否成功發布和接收。
常見問題與故障排除
Python GUI 中的 MQTT 狀態顯示 "連接中..." 或 "連接失敗":
檢查網路連接: 確保你的電腦可以正常上網。
檢查 MQTT Broker 地址和埠: 確保 Python 程式碼中的
MQTT_BROKER和MQTT_PORT與 Wokwi 程式碼中設定的mqtt_server和mqtt_port完全一致。防火牆問題: 你的電腦防火牆可能阻止了 Python 應用程式連接到 MQTT Broker。暫時關閉防火牆進行測試,如果可以,則為 Python 和 MQTT 埠添加例外規則。
Broker 線上狀態: 有時公共 Broker 可能會暫時離線。可以嘗試更換另一個公共 Broker,例如將
broker.mqttgo.io改為mqtt.eclipseprojects.io(兩邊程式碼都要改)。
Wokwi 模擬器中的 Serial Monitor 顯示 "Failed to read from DHT sensor!":
檢查 DHT22 感測器的接線,確保
DATA引腳連接到 GPIO4,並且連接正確。
Wokwi 模擬器中的 Serial Monitor 顯示 "WiFi connected" 但沒有 "MQTT connected":
檢查 MQTT Broker 地址和埠是否正確。
確保
mqtt_client_id是獨特的。如果有多個客戶端使用相同的 ID,可能會有連接問題。
Python GUI 顯示 "MQTT 已連接",但沒有溫濕度數據更新,或 LED 控制無效:
主題不匹配: 檢查 Python 程式碼中的
MQTT_TEMP_TOPIC,MQTT_HUMID_TOPIC,MQTT_LED_CONTROL_TOPIC是否與 Wokwi 程式碼中的主題名稱完全一致(包括大小寫和斜線)。Wokwi 是否正常發布? 檢查 Wokwi 的 Serial Monitor,確認它是否正在成功發布溫濕度數據。
Wokwi 是否訂閱了控制主題? 確認 Wokwi 的 Serial Monitor 顯示它已經訂閱了
wokwi/esp32/led/control主題。
沒有留言:
張貼留言