ESP32 雙核心控制 LED 與 DHT22 溫濕度感測器 (Wokwi 模擬) EX51 -- Tkinter + SQLite
Python程式
import tkinter as tk
from tkinter import ttk, scrolledtext
import paho.mqtt.client as mqtt
import sqlite3
import datetime
import threading
import time
# --- MQTT Configuration (MUST match ESP32 code) ---
# Host name and TCP port passed to client.connect() when the MQTT thread starts.
MQTT_BROKER = "broker.mqttgo.io"
MQTT_PORT = 1883
# Client ID handed to mqtt.Client(); change it to a value unique to you.
MQTT_CLIENT_ID = "Tkinter_Controller_YourName_001" # <<< Change to your unique ID!
# MQTT Topics (MUST match ESP32 code)
# LED commands are published to this topic by IoTControllerApp.send_command().
MQTT_TOPIC_LED_CONTROL = "esp32/led/control"
# The three topics below are subscribed to in on_connect() and handled in on_message().
MQTT_TOPIC_TEMPERATURE = "esp32/dht/temperature"
MQTT_TOPIC_HUMIDITY = "esp32/dht/humidity"
MQTT_TOPIC_STATUS = "esp32/status"
# --- SQLite Database Configuration ---
# Database file opened by create_table(), log_event() and refresh_db_display().
DB_NAME = "esp32.db"
# --- Global Variables for Sensor Data ---
# Most recent payloads received on the temperature / humidity topics;
# "N/A" is shown in the GUI until the first message arrives.
latest_temperature = "N/A"
latest_humidity = "N/A"
# Written by on_connect(), on_disconnect() and on_message() (status topic).
esp32_connection_status = "Disconnected"
# --- Database Functions ---
def create_table():
    """Create the ``events`` table in the SQLite database if it does not exist.

    The table has an auto-incrementing ``id`` plus non-null ``date``, ``time``
    and ``event`` text columns. The database file is the one named by
    ``DB_NAME``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement fails.
    """
    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                date TEXT NOT NULL,
                time TEXT NOT NULL,
                event TEXT NOT NULL
            )
        """)
        conn.commit()
    finally:
        # Close the connection even when the statement raises, so the
        # database file handle is never leaked.
        conn.close()
def log_event(event_description):
    """Record an event in the database and mirror it in the GUI.

    The event is stored in the ``events`` table with the current local date
    and time, echoed to stdout, appended to the GUI log view and the database
    view is refreshed.

    Args:
        event_description: Human-readable text describing the event.

    Raises:
        sqlite3.Error: If the row cannot be written.
    """
    now = datetime.datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    time_str = now.strftime("%H:%M:%S")

    conn = sqlite3.connect(DB_NAME)
    try:
        conn.execute(
            "INSERT INTO events (date, time, event) VALUES (?, ?, ?)",
            (date_str, time_str, event_description),
        )
        conn.commit()
    finally:
        # Always release the connection, even if the INSERT fails.
        conn.close()

    print(f"Logged to DB: {date_str} {time_str} - {event_description}")

    # This function is also called from the MQTT thread, which may run before
    # the module-level ``app`` has been assigned. The event is already stored
    # in the database, so skip the GUI update instead of raising NameError.
    gui = globals().get("app")
    if gui is None:
        return
    gui.update_log_display(f"{date_str} {time_str} - {event_description}")
    gui.refresh_db_display()  # Show the new row in the database view
# --- MQTT Callbacks ---
def on_connect(client, userdata, flags, rc):
    """Handle the broker's reply to a connection attempt.

    On success (``rc == 0``) the sensor and status topics are subscribed, the
    GUI status is set to connected and the event is logged. On failure only
    the GUI status and the global connection state are updated.

    Args:
        client: The MQTT client that connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means success.
    """
    global esp32_connection_status

    # Failure path first so the success path reads straight through.
    if rc != 0:
        print(f"Failed to connect, return code {rc}\n")
        app.update_status(f"MQTT Disconnected (Code: {rc})")
        esp32_connection_status = "Disconnected"
        return

    print("Connected to MQTT Broker!")
    for topic in (MQTT_TOPIC_TEMPERATURE, MQTT_TOPIC_HUMIDITY, MQTT_TOPIC_STATUS):
        client.subscribe(topic)
    app.update_status("MQTT Connected")
    esp32_connection_status = "Connected"
    log_event("MQTT Broker Connected")
def on_disconnect(client, userdata, rc):
    """Handle loss of the broker connection.

    Updates the GUI status and the global connection state, logs the event
    and, for an unexpected disconnect, starts a background reconnect loop.

    Args:
        client: The MQTT client that was disconnected.
        userdata: Unused.
        rc: Disconnect result code.
    """
    global esp32_connection_status
    print(f"Disconnected with result code {rc}\n")
    app.update_status("MQTT Disconnected")
    esp32_connection_status = "Disconnected"
    log_event(f"MQTT Broker Disconnected (Code: {rc})")

    # paho-mqtt reports rc == 0 when the disconnect was requested through
    # disconnect(); reconnecting then would undo a deliberate shutdown.
    if rc == 0:
        return

    # Daemon thread: a reconnect loop against an unreachable broker must not
    # keep the process alive after the GUI window has been closed.
    threading.Thread(
        target=reconnect_loop, args=(client,), daemon=True
    ).start()
def reconnect_loop(client):
    """Keep trying to reconnect to the MQTT broker until the client is connected.

    Args:
        client: Object exposing ``is_connected()`` and ``reconnect()``.
    """
    while True:
        if client.is_connected():
            return
        try:
            print("Attempting to reconnect to MQTT...")
            client.reconnect()
            time.sleep(2)  # Short pause before checking the connection again
        except Exception as err:
            print(f"Reconnect failed: {err}")
            time.sleep(5)  # Back off longer after a failed attempt
def on_message(client, userdata, msg):
    """Dispatch an incoming MQTT message to the GUI and the event log.

    Temperature and humidity payloads update the module-level latest values
    and the sensor labels; status payloads update the ESP32 status label.
    Messages on any other topic are only printed.

    Args:
        client: The MQTT client that received the message.
        userdata: Unused.
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    global latest_temperature, latest_humidity, esp32_connection_status
    topic = msg.topic
    # Anyone can publish to these topics on a public broker. Replace invalid
    # bytes instead of letting UnicodeDecodeError escape from the callback.
    payload = msg.payload.decode("utf-8", errors="replace")
    print(f"Received message - Topic: '{topic}', Payload: '{payload}'")

    if topic == MQTT_TOPIC_TEMPERATURE:
        latest_temperature = payload
        app.update_dht_labels(latest_temperature, latest_humidity)
        log_event(f"Received Temperature: {latest_temperature}°C")
    elif topic == MQTT_TOPIC_HUMIDITY:
        latest_humidity = payload
        app.update_dht_labels(latest_temperature, latest_humidity)
        log_event(f"Received Humidity: {latest_humidity}%")
    elif topic == MQTT_TOPIC_STATUS:
        esp32_connection_status = payload
        app.update_esp32_status(esp32_connection_status)
        log_event(f"ESP32 Status: {esp32_connection_status}")
# --- Tkinter Application ---
class IoTControllerApp:
    """Tkinter front end for controlling an ESP32 LED and viewing DHT22 data.

    The window has a status bar and two tabs: "Control & Log" (LED buttons,
    latest sensor readings, event log) and "Database View" (contents of the
    ``events`` table). MQTT traffic is handled on a background thread.
    """

    def __init__(self, master):
        """Build the GUI, create the MQTT client and schedule its start.

        Args:
            master: The Tk root window that hosts the application.
        """
        self.master = master
        master.title("ESP32 IoT Controller (Tkinter)")
        master.geometry("700x750")  # Larger window for database view

        # Configure styles
        self.style = ttk.Style()
        self.style.configure("TFrame", padding=10, relief="groove")
        self.style.configure("TButton", font=("Arial", 12), padding=8)
        self.style.configure("TLabel", font=("Arial", 10))
        self.style.configure("Header.TLabel", font=("Arial", 14, "bold"))
        self.style.configure("Status.TLabel", font=("Arial", 10, "bold"))

        # MQTT client setup. paho-mqtt 2.x requires an explicit callback API
        # version; VERSION1 keeps the (client, userdata, flags, rc) callback
        # signatures used in this file. Older releases lack the enum.
        callback_api = getattr(mqtt, "CallbackAPIVersion", None)
        if callback_api is not None:
            self.client = mqtt.Client(
                callback_api.VERSION1, client_id=MQTT_CLIENT_ID
            )
        else:
            self.client = mqtt.Client(client_id=MQTT_CLIENT_ID)
        self.client.on_connect = on_connect
        self.client.on_disconnect = on_disconnect
        self.client.on_message = on_message

        # The MQTT loop blocks, so it runs on its own daemon thread. The
        # thread is only created here; it is started at the end of __init__.
        self.mqtt_thread = threading.Thread(
            target=self._start_mqtt_client, daemon=True
        )

        # --- GUI Elements ---
        # Status Frame
        self.status_frame = ttk.Frame(master, style="TFrame")
        self.status_frame.pack(pady=10, fill="x", padx=10)
        self.mqtt_status_label = ttk.Label(
            self.status_frame,
            text="MQTT Status: Disconnected",
            style="Status.TLabel",
            foreground="red",
        )
        self.mqtt_status_label.grid(row=0, column=0, padx=5, pady=5, sticky="w")
        self.esp32_status_label = ttk.Label(
            self.status_frame,
            text=f"ESP32 Status: {esp32_connection_status}",
            style="Status.TLabel",
            foreground="orange",
        )
        self.esp32_status_label.grid(row=0, column=1, padx=5, pady=5, sticky="e")
        self.status_frame.columnconfigure(1, weight=1)  # Push ESP32 status to the right

        # Notebook (Tabs)
        self.notebook = ttk.Notebook(master)
        self.notebook.pack(pady=10, padx=10, expand=True, fill="both")

        # Tab 1: Control & Log
        self.control_log_frame = ttk.Frame(self.notebook, style="TFrame")
        self.notebook.add(self.control_log_frame, text="Control & Log")

        # LED Control Frame (inside Control & Log tab)
        self.led_frame = ttk.Frame(self.control_log_frame, style="TFrame")
        self.led_frame.pack(pady=10, fill="x", padx=10)
        ttk.Label(self.led_frame, text="LED Control", style="Header.TLabel").grid(
            row=0, column=0, columnspan=2, pady=5
        )
        self.on_button = ttk.Button(
            self.led_frame, text="LED ON", command=lambda: self.send_command("on")
        )
        self.on_button.grid(row=1, column=0, padx=5, pady=5, sticky="ew")
        self.off_button = ttk.Button(
            self.led_frame, text="LED OFF", command=lambda: self.send_command("off")
        )
        self.off_button.grid(row=1, column=1, padx=5, pady=5, sticky="ew")
        self.flash_button = ttk.Button(
            self.led_frame, text="LED FLASH", command=lambda: self.send_command("flash")
        )
        self.flash_button.grid(row=2, column=0, padx=5, pady=5, sticky="ew")
        self.timer_button = ttk.Button(
            self.led_frame,
            text="LED TIMER (10s)",
            command=lambda: self.send_command("timer"),
        )
        self.timer_button.grid(row=2, column=1, padx=5, pady=5, sticky="ew")
        self.led_frame.columnconfigure(0, weight=1)
        self.led_frame.columnconfigure(1, weight=1)

        # DHT22 Data Frame (inside Control & Log tab)
        self.dht_frame = ttk.Frame(self.control_log_frame, style="TFrame")
        self.dht_frame.pack(pady=10, fill="x", padx=10)
        ttk.Label(
            self.dht_frame, text="DHT22 Sensor Data", style="Header.TLabel"
        ).grid(row=0, column=0, columnspan=2, pady=5)
        self.temp_label = ttk.Label(
            self.dht_frame, text=f"Temperature: {latest_temperature}°C"
        )
        self.temp_label.grid(row=1, column=0, padx=5, pady=5, sticky="w")
        self.hum_label = ttk.Label(
            self.dht_frame, text=f"Humidity: {latest_humidity}%"
        )
        self.hum_label.grid(row=1, column=1, padx=5, pady=5, sticky="w")
        self.dht_frame.columnconfigure(0, weight=1)
        self.dht_frame.columnconfigure(1, weight=1)

        # Log Display Frame (inside Control & Log tab)
        self.log_frame = ttk.Frame(self.control_log_frame, style="TFrame")
        self.log_frame.pack(pady=10, fill="both", expand=True, padx=10)
        ttk.Label(self.log_frame, text="Event Log", style="Header.TLabel").pack(pady=5)
        self.log_display = scrolledtext.ScrolledText(
            self.log_frame, wrap=tk.WORD, width=60, height=10, font=("Consolas", 9)
        )
        self.log_display.pack(expand=True, fill="both", padx=5, pady=5)
        self.log_display.config(state=tk.DISABLED)  # Make it read-only

        # Tab 2: Database View
        self.db_frame = ttk.Frame(self.notebook, style="TFrame")
        self.notebook.add(self.db_frame, text="Database View")
        self.db_tree = ttk.Treeview(
            self.db_frame, columns=("ID", "Date", "Time", "Event"), show="headings"
        )
        self.db_tree.heading("ID", text="ID")
        self.db_tree.heading("Date", text="Date")
        self.db_tree.heading("Time", text="Time")
        self.db_tree.heading("Event", text="Event")
        # Set column widths
        self.db_tree.column("ID", width=50, anchor="center")
        self.db_tree.column("Date", width=100, anchor="center")
        self.db_tree.column("Time", width=90, anchor="center")
        self.db_tree.column("Event", width=350, anchor="w")  # Wider for event description
        self.db_tree.pack(fill="both", expand=True, padx=5, pady=5)
        # Scrollbar for Treeview
        vsb = ttk.Scrollbar(self.db_tree, orient="vertical", command=self.db_tree.yview)
        vsb.pack(side="right", fill="y")
        self.db_tree.configure(yscrollcommand=vsb.set)
        self.refresh_db_button = ttk.Button(
            self.db_frame, text="Refresh Database", command=self.refresh_db_display
        )
        self.refresh_db_button.pack(pady=5)

        # Initial database display refresh
        self.refresh_db_display()
        # Bind tab change event to refresh database view
        self.notebook.bind("<<NotebookTabChanged>>", self._on_tab_change)

        # Start the MQTT thread from the event loop rather than immediately.
        # Its callbacks touch the widgets and the module-level ``app``, so
        # they must not run before __init__ has returned and the main loop
        # is processing events.
        master.after(0, self.mqtt_thread.start)

    def _start_mqtt_client(self):
        """Connect to the broker and run the blocking MQTT network loop.

        Runs on the MQTT thread. Any exception is reported in the status
        label and written to the event log.
        """
        try:
            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
            self.client.loop_forever()  # Blocks, so run in a separate thread
        except Exception as e:
            print(f"MQTT client connection failed in thread: {e}")
            self.update_status("MQTT Connection Error!")
            log_event(f"MQTT Connection Error: {e}")

    def send_command(self, command):
        """Publish an LED command to the control topic and log it.

        Args:
            command: Command string sent as the message payload.
        """
        self.client.publish(MQTT_TOPIC_LED_CONTROL, command)
        print(f"Sent command: {command}")
        log_event(f"Sent LED command: {command}")

    def update_dht_labels(self, temp, hum):
        """Show the given temperature and humidity values in the sensor labels."""
        self.temp_label.config(text=f"Temperature: {temp}°C")
        self.hum_label.config(text=f"Humidity: {hum}%")

    def update_status(self, status):
        """Show the MQTT status text, green if it contains "Connected", else red."""
        color = "green" if "Connected" in status else "red"
        self.mqtt_status_label.config(text=f"MQTT Status: {status}", foreground=color)

    def update_esp32_status(self, status):
        """Show the ESP32 status text, green if it contains "online", else red."""
        color = "green" if "online" in status else "red"
        self.esp32_status_label.config(text=f"ESP32 Status: {status}", foreground=color)

    def update_log_display(self, message):
        """Append a line to the read-only event log and scroll to the end."""
        self.log_display.config(state=tk.NORMAL)  # Enable writing
        self.log_display.insert(tk.END, message + "\n")
        self.log_display.yview(tk.END)  # Auto-scroll to bottom
        self.log_display.config(state=tk.DISABLED)  # Disable writing

    def refresh_db_display(self):
        """Reload every row of the ``events`` table into the Treeview.

        Rows are shown newest first (descending ``id``).

        Raises:
            sqlite3.Error: If the database cannot be read.
        """
        # Clear the rows currently shown
        for item in self.db_tree.get_children():
            self.db_tree.delete(item)

        conn = sqlite3.connect(DB_NAME)
        try:
            rows = conn.execute(
                "SELECT id, date, time, event FROM events ORDER BY id DESC"
            ).fetchall()
        finally:
            # Close the connection even if the query fails.
            conn.close()

        for row in rows:
            self.db_tree.insert("", tk.END, values=row)
        print("Database display refreshed.")

    def _on_tab_change(self, event):
        """Refresh the database view when its tab becomes the selected one."""
        selected_tab = self.notebook.tab(self.notebook.select(), "text")
        if selected_tab == "Database View":
            self.refresh_db_display()
# --- Main Application Execution ---
if __name__ == "__main__":
    # The events table must exist before the GUI runs its first query.
    create_table()

    root = tk.Tk()
    # Module-level name: the MQTT callbacks and log_event() reach the GUI
    # through ``app``.
    app = IoTControllerApp(root)
    root.mainloop()
操作步驟
啟動 Wokwi ESP32 模擬器:
在 Wokwi 上載入並運行上面的 ESP32 Arduino 程式碼。
確認 電路圖 設定正確。
檢查 Wokwi 的 Serial Monitor,確保 ESP32 成功連接 Wi-Fi 和 MQTT Broker。
運行 Python Tkinter 應用程式:
確保你已經用 `pip install paho-mqtt` 安裝了必要的函式庫。
將上面的 Python Tkinter 程式碼儲存為 `tkinter_db_browser.py`。
重要: 為了避免 MQTT 客戶端 ID 衝突,請修改 Python 程式碼中的 `MQTT_CLIENT_ID = "Tkinter_Controller_YourName_001"`,換成你獨特的名稱。
打開你的終端機或命令提示字元,導航到該檔案所在目錄。
執行: `python tkinter_db_browser.py`。
一個 Tkinter 視窗將會彈出。你的終端機也會顯示 MQTT 連線和訊息日誌。
透過 Tkinter 介面控制與查看記錄:
在 Tkinter 視窗中,你會看到兩個分頁:"Control & Log" 和 "Database View"。
在 "Control & Log" 分頁:
點擊 "LED ON"、"LED OFF" 等按鈕來控制 Wokwi 中的 LED。
"DHT22 Sensor Data" 會顯示即時的溫濕度數據。
"Event Log" 會顯示所有發送的命令和接收的數據日誌。
切換到 "Database View" 分頁:
你會看到一個表格 (`ttk.Treeview`),其中包含了 `esp32.db` 中 `events` 表格的所有記錄。
每次你點擊按鈕發送指令,或 ESP32 發送新的溫濕度數據,Event Log 和 Database View (在你切換到該分頁時會自動更新,你也可以手動點擊 Refresh Database 按鈕) 都會顯示最新的記錄。
資料會以 ID 遞減 的方式排序,最新發生的事件會顯示在表格的上方。



沒有留言:
張貼留言