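ESP32 Dual-Core Control of an LED and a DHT22 Temperature/Humidity Sensor (Wokwi Simulation) EX5 -- Tkinter + SQLite
This exercise uses a Python Tkinter application to control the ESP32's LED and receive temperature and humidity readings from the DHT22, while logging these events and readings to a SQLite database (esp32.db). The architecture is as follows:
Wokwi ESP32 (Arduino code):
- Connects to Wi-Fi and the MQTT broker.
- Subscribes to the LED control commands sent by the Tkinter application.
- Publishes DHT22 temperature and humidity data.
Python Tkinter application:
- Provides a graphical user interface (GUI) for sending LED control commands.
- Connects to the same MQTT broker as an MQTT client.
- Publishes LED control commands to the MQTT broker.
- Subscribes to the DHT22 data.
- Whenever DHT22 data is received or an LED command is sent, logs the event and data (id, date, time, event) to the esp32.db SQLite database.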
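As a rough illustration of the message flow described above, the sketch below maps an incoming MQTT topic and payload to the event text that gets logged. The topic names and event wording are taken from the Python program later in this post; the function name `describe_message` is hypothetical and exists only for illustration.

```python
from typing import Optional

# Topic names as used by the Tkinter program in this post
TOPIC_TEMPERATURE = "esp32/dht/temperature"
TOPIC_HUMIDITY = "esp32/dht/humidity"
TOPIC_STATUS = "esp32/status"


def describe_message(topic: str, payload: str) -> Optional[str]:
    """Return the event text to log for a message, or None for an unknown topic.

    Hypothetical helper: it mirrors the branching in on_message but has no
    side effects, so it can be tried without a broker or a GUI.
    """
    if topic == TOPIC_TEMPERATURE:
        return f"Received Temperature: {payload}°C"
    if topic == TOPIC_HUMIDITY:
        return f"Received Humidity: {payload}%"
    if topic == TOPIC_STATUS:
        return f"ESP32 Status: {payload}"
    return None


if __name__ == "__main__":
    print(describe_message(TOPIC_TEMPERATURE, "25.5"))
    print(describe_message("esp32/unknown", "x"))
```

Keeping this mapping free of GUI and database calls makes it easy to check the routing logic on its own before wiring it to a live broker.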
1. Wokwi ESP32 Arduino Code
This ESP32 code is kept lean: it handles only MQTT communication, LED control, and DHT22 reading.
Paste the following code into your Wokwi sketch.ino file.
Wokwi ESP32 Arduino program
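2. Python Tkinter Application with SQLite Logging
This Python program builds a Tkinter GUI, uses the paho-mqtt library to communicate with the MQTT broker, and uses the sqlite3 library to log events to esp32.db.
Before running:
Install the Python library: open your terminal or command prompt and run:
pip install paho-mqtt
Python code: save the following code as tkinter_controller.py.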
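Before running the program, it can help to confirm that the install step worked. The snippet below is a small sketch using the standard-library `importlib.metadata` module; the helper name `installed_version` is hypothetical. Knowing the version is also useful because paho-mqtt 2.x added a callback API version argument to the client constructor, so the constructor call may need adjusting depending on what is installed.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_version("paho-mqtt")
    if version is None:
        print("paho-mqtt is not installed; run: pip install paho-mqtt")
    else:
        print(f"paho-mqtt {version} is installed")
```

sqlite3 is part of the Python standard library, so it needs no separate install.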
Python program
import tkinter as tk
from tkinter import ttk, scrolledtext
import paho.mqtt.client as mqtt
import sqlite3
import datetime
import threading
import time
# --- MQTT Configuration (MUST match ESP32 code) ---
MQTT_BROKER = "broker.mqttgo.io"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "Tkinter_Controller_YourName_001" # <<< Change to your unique ID!
# MQTT Topics (MUST match ESP32 code)
MQTT_TOPIC_LED_CONTROL = "esp32/led/control"
MQTT_TOPIC_TEMPERATURE = "esp32/dht/temperature"
MQTT_TOPIC_HUMIDITY = "esp32/dht/humidity"
MQTT_TOPIC_STATUS = "esp32/status"
# --- SQLite Database Configuration ---
DB_NAME = "esp32.db"
# --- Global Variables for Sensor Data ---
latest_temperature = "N/A"
latest_humidity = "N/A"
esp32_connection_status = "Disconnected"

# --- Database Functions ---
def create_table():
    """Create the events table if it does not exist yet."""
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT NOT NULL,
            time TEXT NOT NULL,
            event TEXT NOT NULL
        )
    """)
    conn.commit()
    conn.close()


def log_event(event_description):
    """Insert one timestamped event row, then echo it to the console and GUI log."""
    now = datetime.datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    time_str = now.strftime("%H:%M:%S")
    # A fresh connection per call keeps this usable from the MQTT thread
    conn = sqlite3.connect(DB_NAME)
    cursor = conn.cursor()
    cursor.execute("INSERT INTO events (date, time, event) VALUES (?, ?, ?)",
                   (date_str, time_str, event_description))
    conn.commit()
    conn.close()
    print(f"Logged to DB: {date_str} {time_str} - {event_description}")
    app.update_log_display(f"{date_str} {time_str} - {event_description}")

# --- MQTT Callbacks ---
def on_connect(client, userdata, flags, rc):
    global esp32_connection_status
    if rc == 0:
        print("Connected to MQTT Broker!")
        client.subscribe(MQTT_TOPIC_TEMPERATURE)
        client.subscribe(MQTT_TOPIC_HUMIDITY)
        client.subscribe(MQTT_TOPIC_STATUS)
        app.update_status("MQTT Connected")
        esp32_connection_status = "Connected"
        log_event("MQTT Broker Connected")
    else:
        print(f"Failed to connect, return code {rc}\n")
        app.update_status(f"MQTT Disconnected (Code: {rc})")
        esp32_connection_status = "Disconnected"


def on_disconnect(client, userdata, rc):
    global esp32_connection_status
    print(f"Disconnected with result code {rc}\n")
    app.update_status("MQTT Disconnected")
    esp32_connection_status = "Disconnected"
    log_event(f"MQTT Broker Disconnected (Code: {rc})")
    # Attempt to reconnect automatically (loop_forever() also retries on its own).
    # daemon=True so this thread cannot keep the program alive after the window closes.
    threading.Thread(target=reconnect_loop, args=(client,), daemon=True).start()


def reconnect_loop(client):
    while not client.is_connected():
        try:
            print("Attempting to reconnect to MQTT...")
            client.reconnect()
            time.sleep(2)  # Wait a bit before next attempt
        except Exception as e:
            print(f"Reconnect failed: {e}")
            time.sleep(5)  # Wait longer if there's an exception


def on_message(client, userdata, msg):
    global latest_temperature, latest_humidity, esp32_connection_status
    topic = msg.topic
    payload = msg.payload.decode()
    print(f"Received message - Topic: '{topic}', Payload: '{payload}'")
    if topic == MQTT_TOPIC_TEMPERATURE:
        latest_temperature = payload
        app.update_dht_labels(latest_temperature, latest_humidity)
        log_event(f"Received Temperature: {latest_temperature}°C")
    elif topic == MQTT_TOPIC_HUMIDITY:
        latest_humidity = payload
        app.update_dht_labels(latest_temperature, latest_humidity)
        log_event(f"Received Humidity: {latest_humidity}%")
    elif topic == MQTT_TOPIC_STATUS:
        esp32_connection_status = payload
        app.update_esp32_status(esp32_connection_status)
        log_event(f"ESP32 Status: {esp32_connection_status}")

# --- Tkinter Application ---
class IoTControllerApp:
    def __init__(self, master):
        self.master = master
        master.title("ESP32 IoT Controller (Tkinter)")
        master.geometry("600x650")  # Larger window to accommodate logs

        # Configure styles
        self.style = ttk.Style()
        self.style.configure("TFrame", padding=10, relief="groove")
        self.style.configure("TButton", font=("Arial", 12), padding=8)
        self.style.configure("TLabel", font=("Arial", 10))
        self.style.configure("Header.TLabel", font=("Arial", 14, "bold"))
        self.style.configure("Status.TLabel", font=("Arial", 10, "bold"))

        # MQTT Client setup. paho-mqtt 2.x expects a callback API version;
        # the callbacks in this file use the 1.x signatures, so request VERSION1
        # and fall back to the plain constructor on paho-mqtt 1.x.
        try:
            self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1,
                                      client_id=MQTT_CLIENT_ID)
        except AttributeError:
            self.client = mqtt.Client(client_id=MQTT_CLIENT_ID)
        self.client.on_connect = on_connect
        self.client.on_disconnect = on_disconnect
        self.client.on_message = on_message

        # Run MQTT in a separate thread so it does not block the GUI
        self.mqtt_thread = threading.Thread(target=self._start_mqtt_client)
        self.mqtt_thread.daemon = True  # Allow thread to exit when main program exits

        # --- GUI Elements ---
        # Status Frame
        self.status_frame = ttk.Frame(master, style="TFrame")
        self.status_frame.pack(pady=10, fill="x", padx=10)
        self.mqtt_status_label = ttk.Label(
            self.status_frame, text="MQTT Status: Disconnected",
            style="Status.TLabel", foreground="red")
        self.mqtt_status_label.grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.esp32_status_label = ttk.Label(
            self.status_frame, text=f"ESP32 Status: {esp32_connection_status}",
            style="Status.TLabel", foreground="orange")
        self.esp32_status_label.grid(row=0, column=1, padx=5, pady=5, sticky="e")
        self.status_frame.columnconfigure(1, weight=1)  # Make ESP32 status push to right

        # LED Control Frame
        self.led_frame = ttk.Frame(master, style="TFrame")
        self.led_frame.pack(pady=10, fill="x", padx=10)
        ttk.Label(self.led_frame, text="LED Control",
                  style="Header.TLabel").grid(row=0, column=0, columnspan=2, pady=5)
        self.on_button = ttk.Button(self.led_frame, text="LED ON",
                                    command=lambda: self.send_command("on"))
        self.on_button.grid(row=1, column=0, padx=5, pady=5, sticky="ew")
        self.off_button = ttk.Button(self.led_frame, text="LED OFF",
                                     command=lambda: self.send_command("off"))
        self.off_button.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
        self.flash_button = ttk.Button(self.led_frame, text="LED FLASH",
                                       command=lambda: self.send_command("flash"))
        self.flash_button.grid(row=2, column=0, padx=5, pady=5, sticky="ew")
        self.timer_button = ttk.Button(self.led_frame, text="LED TIMER (10s)",
                                       command=lambda: self.send_command("timer"))
        self.timer_button.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
        self.led_frame.columnconfigure(0, weight=1)
        self.led_frame.columnconfigure(1, weight=1)

        # DHT22 Data Frame
        self.dht_frame = ttk.Frame(master, style="TFrame")
        self.dht_frame.pack(pady=10, fill="x", padx=10)
        ttk.Label(self.dht_frame, text="DHT22 Sensor Data",
                  style="Header.TLabel").grid(row=0, column=0, columnspan=2, pady=5)
        self.temp_label = ttk.Label(self.dht_frame, text=f"Temperature: {latest_temperature}°C")
        self.temp_label.grid(row=1, column=0, padx=5, pady=5, sticky="w")
        self.hum_label = ttk.Label(self.dht_frame, text=f"Humidity: {latest_humidity}%")
        self.hum_label.grid(row=1, column=1, padx=5, pady=5, sticky="w")
        self.dht_frame.columnconfigure(0, weight=1)
        self.dht_frame.columnconfigure(1, weight=1)

        # Log Display Frame
        self.log_frame = ttk.Frame(master, style="TFrame")
        self.log_frame.pack(pady=10, fill="both", expand=True, padx=10)
        ttk.Label(self.log_frame, text="Event Log", style="Header.TLabel").pack(pady=5)
        self.log_display = scrolledtext.ScrolledText(
            self.log_frame, wrap=tk.WORD, width=60, height=15, font=("Consolas", 9))
        self.log_display.pack(expand=True, fill="both", padx=5, pady=5)
        self.log_display.config(state=tk.DISABLED)  # Make it read-only

        # Start MQTT only once the widgets exist and the main loop is running,
        # so callbacks never reach the GUI (or the global `app`) too early
        master.after(100, self.mqtt_thread.start)

    def _start_mqtt_client(self):
        try:
            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
            self.client.loop_forever()  # Blocks, so run in a separate thread
        except Exception as e:
            print(f"MQTT client connection failed in thread: {e}")
            self.update_status("MQTT Connection Error!")
            log_event(f"MQTT Connection Error: {e}")

    def send_command(self, command):
        self.client.publish(MQTT_TOPIC_LED_CONTROL, command)
        print(f"Sent command: {command}")
        log_event(f"Sent LED command: {command}")

    def update_dht_labels(self, temp, hum):
        self.temp_label.config(text=f"Temperature: {temp}°C")
        self.hum_label.config(text=f"Humidity: {hum}%")

    def update_status(self, status):
        color = "green" if "Connected" in status else "red"
        self.mqtt_status_label.config(text=f"MQTT Status: {status}", foreground=color)

    def update_esp32_status(self, status):
        color = "green" if "online" in status else "red"
        self.esp32_status_label.config(text=f"ESP32 Status: {status}", foreground=color)

    def update_log_display(self, message):
        self.log_display.config(state=tk.NORMAL)  # Enable writing
        self.log_display.insert(tk.END, message + "\n")
        self.log_display.yview(tk.END)  # Auto-scroll to bottom
        self.log_display.config(state=tk.DISABLED)  # Disable writing

# --- Main Application Execution ---
if __name__ == "__main__":
    create_table()  # Ensure database table exists
    root = tk.Tk()
    app = IoTControllerApp(root)
    root.mainloop()
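
The MQTT callbacks in the program above run on the network thread but end up calling Tkinter widget methods. A commonly recommended alternative is to have the network thread put updates on a `queue.Queue` and let the GUI thread drain it periodically with `after`. The following is a minimal sketch of that pattern under stated assumptions, not the program's actual code: `ui_queue`, `post_update`, `drain_updates`, and `apply_to_widgets` are hypothetical names, and the Tkinter wiring is shown only as a comment so the sketch runs without a display.

```python
import queue
import threading
from typing import Callable

# Hypothetical queue shared by the MQTT thread (producer) and GUI thread (consumer)
ui_queue = queue.Queue()


def post_update(kind: str, value: str) -> None:
    """Called from the MQTT thread: enqueue an update instead of touching widgets."""
    ui_queue.put((kind, value))


def drain_updates(handler: Callable[[str, str], None]) -> int:
    """Called from the GUI thread: apply every pending update, return how many ran."""
    handled = 0
    while True:
        try:
            kind, value = ui_queue.get_nowait()
        except queue.Empty:
            return handled
        handler(kind, value)
        handled += 1


# In the Tkinter app this would be polled from the main loop, for example:
#     def poll():
#         drain_updates(apply_to_widgets)   # apply_to_widgets is hypothetical
#         root.after(100, poll)
#     root.after(100, poll)

if __name__ == "__main__":
    seen = []
    worker = threading.Thread(target=post_update, args=("temperature", "25.5"))
    worker.start()
    worker.join()
    drain_updates(lambda kind, value: seen.append((kind, value)))
    print(seen)
```

With this arrangement only the main thread ever touches the widgets, at the cost of a small polling delay (100 ms in the commented example).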
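Operating Steps
Start the Wokwi ESP32 simulator:
- Load and run the ESP32 Arduino code above in Wokwi.
- Confirm that diagram.json is configured correctly.
- Check Wokwi's Serial Monitor to make sure the ESP32 has connected to Wi-Fi and the MQTT broker and has started publishing temperature and humidity data. You should see it print "WiFi Connected!" and an MQTT connection success message.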
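Run the Python Tkinter application:
- Make sure you have installed the required library with pip install paho-mqtt.
- Save the Python Tkinter code above as tkinter_controller.py.
- Important: to avoid MQTT client ID conflicts, change MQTT_CLIENT_ID = "Tkinter_Controller_YourName_001" in the Python code to a name unique to you.
- Open your terminal or command prompt and navigate to the directory containing the file.
- Run: python tkinter_controller.py. A Tkinter window will pop up, and your terminal will also show the MQTT connection and message logs.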
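If you would rather not pick a unique client ID by hand, one option is to generate it. The sketch below appends a short random suffix using the standard-library `uuid` module; the helper name `make_client_id` and the default prefix are assumptions for illustration, not part of the original program.

```python
import uuid


def make_client_id(prefix: str = "Tkinter_Controller") -> str:
    """Build a client ID that is very unlikely to collide on a shared broker."""
    # 8 hex characters of a random UUID keep the ID short but distinct
    return f"{prefix}_{uuid.uuid4().hex[:8]}"


if __name__ == "__main__":
    # Hypothetical usage: MQTT_CLIENT_ID = make_client_id()
    print(make_client_id())
```

Note that a generated ID changes on every run, which suits this program because it subscribes again each time it connects.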
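Control the LED and view the records through the Tkinter interface:
- In the Tkinter window, click the "LED ON", "LED OFF", "LED FLASH", and "LED TIMER (10s)" buttons, and watch whether the LED in the Wokwi simulator responds accordingly.
- In the "DHT22 Sensor Data" area, the temperature and humidity values update continuously (every 10 seconds).
- In the "Event Log" area, every LED command sent and every temperature or humidity reading received is logged in real time.
- View the database: after tkinter_controller.py has run, an esp32.db file is created in the same directory. You can open it with any SQLite browser (such as DB Browser for SQLite) and inspect all records in the events table.
Column descriptions for the esp32.db database:
- id: unique identifier of each event, auto-incremented.
- date: date of the event, in YYYY-MM-DD format.
- time: time of the event, in HH:MM:SS format.
- event: detailed description of the event, for example "Sent LED command: on" or "Received Temperature: 25.5°C".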
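Besides a graphical SQLite browser, the events table can also be read from Python. The sketch below pulls the temperature readings back out of the event text, assuming the schema and the "Received Temperature: ...°C" wording used by the program above. The function name `temperature_history` is hypothetical, and the demo rows are made-up sample data in an in-memory database, not real readings.

```python
import re
import sqlite3
from typing import List, Tuple


def temperature_history(conn: sqlite3.Connection) -> List[Tuple[str, str, float]]:
    """Return (date, time, temperature) for every logged temperature event."""
    rows = conn.execute(
        "SELECT date, time, event FROM events "
        "WHERE event LIKE 'Received Temperature:%' ORDER BY id"
    ).fetchall()
    history = []
    for date, time_, event in rows:
        match = re.search(r"-?\d+(?:\.\d+)?", event)
        if match:  # skip payloads that are not numeric, e.g. "nan"
            history.append((date, time_, float(match.group(0))))
    return history


if __name__ == "__main__":
    # Demo on an in-memory database with made-up sample rows
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "date TEXT NOT NULL, time TEXT NOT NULL, event TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO events (date, time, event) VALUES (?, ?, ?)",
        [
            ("2025-01-01", "12:00:00", "Sent LED command: on"),
            ("2025-01-01", "12:00:10", "Received Temperature: 25.5°C"),
        ],
    )
    print(temperature_history(conn))
    conn.close()
```

To run it against the real log, replace the in-memory connection with sqlite3.connect("esp32.db") from the directory where the file was created.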
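This setup provides a complete IoT solution: hardware control and data acquisition on the ESP32, command sending and data display in the Python GUI, and historical record storage in the SQLite database.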
Downloads
DB Browser for SQLite, latest release listed (3.13.1):
- Windows: standard installer or .zip (no installer), each for 32-bit and 64-bit Windows. A PortableApp build is also listed.
- If the standard Windows release does not work (e.g. gives an error), try a nightly build. Nightly builds often fix bugs reported after the last release.
- macOS: available as a regular download, or via Homebrew:
brew install db-browser-for-sqlite





