Wokwi ESP32 8 Port Input/Output + MQTT + Python 控制
P3-P8 (有內建拉電阻的腳位): 如果您的按鈕是連接到 VCC (3.3V),按下按鈕時,腳位會保持 HIGH,因此狀態永遠不會改變。
P1/P2 (無內建拉電阻的腳位,Pin 34/35): 這些腳位缺乏內建電阻,它們的讀數處於 浮動 狀態,雖然可能偶爾跳變,但不可靠。
WOKWI程式
PYTHON TKINTER程式
import tkinter as tk
from tkinter import ttk, messagebox
import paho.mqtt.client as mqtt
import json # 確保有導入 json 庫
# --- MQTT 設定 ---
MQTT_BROKER = "broker.mqtt-dashboard.com"
MQTT_PORT = 1883
# 輸出 (Relay/LED) Topic: 由 Python 發送
MQTT_TOPIC_OUTPUT = "alex9ufo/8port/LED"
# 輸入 (Input State) Topic: 由 ESP32 發送,Python 接收
MQTT_TOPIC_INPUT = "alex9ufo/8port/INPUT"
CLIENT_ID = "Tkinter_Controller_Monitor_Final"
# --- 輸出 (Relay) 與 ESP32 腳位對應 (原圖片) ---
RELAY_MAP = {
1: 23, 2: 22, 3: 21, 4: 19,
5: 18, 6: 5, 7: 17, 8: 16
}
# --- 輸入 (Input) 與 ESP32 腳位對應 (新圖片) ---
INPUT_MAP = {
1: 34, 2: 35, 3: 32, 4: 33,
5: 25, 6: 26, 7: 27, 8: 14
}
# --- 顏色與文字定義 ---
COLOR_ON = "green" # 輸出 ON 時的顏色
COLOR_OFF = "SystemButtonFace" # 預設按鈕背景色 (用於輸出 OFF)
COLOR_INPUT_HIGH = "green" # 輸入 HIGH 時的 LED 顏色
COLOR_INPUT_LOW = "lightgray" # 輸入 LOW 時的 LED 顏色
TEXT_HIGH = "HIGH"
TEXT_LOW = "LOW"
# --- Tkinter 主視窗實例 (需要先宣告) ---
root = tk.Tk()
status_label = None
input_leds = {} # 用來儲存輸入狀態指示燈的字典
# --- MQTT 客戶端回調函式 ---
def on_connect(client, userdata, flags, rc, properties=None):
"""當客戶端連接到 Broker 時被調用。"""
if rc == 0:
print("💡 MQTT 連接成功!")
root.after(0, lambda: status_label.config(text=f"已連接到 {MQTT_BROKER}", foreground="green"))
# 成功連接後,訂閱輸入 Topic
client.subscribe(MQTT_TOPIC_INPUT, qos=0)
print(f"✅ 訂閱 Topic: {MQTT_TOPIC_INPUT}")
else:
print(f"❌ MQTT 連接失敗,返回碼: {rc}")
root.after(0, lambda: status_label.config(text=f"連接失敗 (Code {rc})", foreground="red"))
def on_disconnect(client, userdata, rc, properties=None):
"""當客戶端從 Broker 斷開連接時被調用。"""
print(f"🔌 MQTT 斷開連接,返回碼: {rc}")
root.after(0, lambda: status_label.config(text="已斷開連接", foreground="gray"))
def on_message_input(client, userdata, msg):
"""處理來自 ESP32 的輸入狀態訊息 (alex9ufo/8port/INPUT)。"""
payload = msg.payload.decode()
print(f"📥 收到輸入狀態: {payload}")
# 預期的 Payload 格式為 JSON 字符串,例如: {"P1":1,"P2":0,...,"P8":1}
try:
states = json.loads(payload)
# 在主線程中更新 GUI
root.after(0, lambda: update_input_leds(states))
except Exception as e:
print(f"處理輸入訊息錯誤: {e}")
def update_input_leds(states):
"""根據收到的 JSON 數據更新 GUI 上的輸入狀態指示燈的顏色和文字。"""
for port, state in states.items():
# port 是 'P1', 'P2', ...
if port in input_leds:
if state == 1:
color = COLOR_INPUT_HIGH
text = TEXT_HIGH
else:
color = COLOR_INPUT_LOW
text = TEXT_LOW
input_leds[port].config(bg=color, text=text)
# --- GUI 輸出控制邏輯 (保持不變) ---
def toggle_relay(port, action):
"""根據按鈕點擊發送 MQTT 訊息並更新顏色。"""
payload = f"P{port}{action}"
if not client.is_connected():
messagebox.showwarning("連線狀態", "MQTT 客戶端未連接,請檢查網路。")
return
try:
result = client.publish(MQTT_TOPIC_OUTPUT, payload, qos=0)
print(f"📤 發送: Topic='{MQTT_TOPIC_OUTPUT}', Payload='{payload}', 狀態: {result.rc}")
if result.rc == mqtt.MQTT_ERR_SUCCESS:
if action == "ON":
buttons[port]['on'].config(relief=tk.SUNKEN, bg=COLOR_ON)
buttons[port]['off'].config(relief=tk.RAISED, bg=COLOR_OFF)
else: # OFF
buttons[port]['on'].config(relief=tk.RAISED, bg=COLOR_OFF)
buttons[port]['off'].config(relief=tk.SUNKEN, bg=COLOR_OFF)
except Exception as e:
print(f"發送錯誤: {e}")
messagebox.showerror("MQTT 錯誤", f"無法發送訊息: {e}")
# --- 主程式設定 ---
root.title("MQTT 8-Port Control & Monitor")
# 初始化 MQTT 客戶端
client = mqtt.Client(client_id=CLIENT_ID, protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
# 設置 on_message 回調
client.message_callback_add(MQTT_TOPIC_INPUT, on_message_input)
# --- GUI 佈局 ---
main_frame = ttk.Frame(root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 狀態標籤
status_label = tk.Label(main_frame, text="初始化中...", font=("Arial", 10), foreground="gray")
status_label.grid(row=0, column=0, columnspan=5, pady=5)
# --- 輸出控制區塊 ---
output_frame = ttk.LabelFrame(main_frame, text="OUTPUT Control (Relay/LED)", padding="10")
output_frame.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)
buttons = {}
for port in range(1, 9):
row_num = port
pin_num = RELAY_MAP[port]
ttk.Label(output_frame, text=f"Port P{port} (Pin {pin_num}):", width=15, anchor="w").grid(row=row_num, column=0, sticky=tk.W, padx=5, pady=2)
on_btn = tk.Button(output_frame, text="ON", command=lambda p=port: toggle_relay(p, "ON"), width=8, bg=COLOR_OFF)
on_btn.grid(row=row_num, column=1, padx=5, pady=2)
off_btn = tk.Button(output_frame, text="OFF", command=lambda p=port: toggle_relay(p, "OFF"), width=8, relief=tk.SUNKEN, bg=COLOR_OFF)
off_btn.grid(row=row_num, column=2, padx=5, pady=2)
buttons[port] = {'on': on_btn, 'off': off_btn}
# --- 輸入狀態監控區塊 ---
input_frame = ttk.LabelFrame(main_frame, text="INPUT Monitor (Pin State)", padding="10")
input_frame.grid(row=1, column=1, padx=10, pady=10, sticky=tk.W)
for port in range(1, 9):
port_key = f"P{port}"
pin_num = INPUT_MAP[port]
row_num = port
ttk.Label(input_frame, text=f"Input {port_key} (Pin {pin_num}):", width=15, anchor="w").grid(row=row_num, column=0, sticky=tk.W, padx=5, pady=2)
# LED 指示燈 (初始狀態為 LOW)
led = tk.Label(input_frame, text=TEXT_LOW, width=4, relief=tk.RAISED, bg=COLOR_INPUT_LOW)
led.grid(row=row_num, column=1, padx=5, pady=2)
input_leds[port_key] = led # 儲存引用
# --- 嘗試連接 MQTT ---
try:
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
except Exception as e:
messagebox.showerror("連線錯誤", f"無法連接到 MQTT Broker: {e}")
if status_label:
status_label.config(text="連線錯誤,請檢查網路", foreground="red")
# 運行主循環並確保程式退出時關閉 MQTT 連接
def on_closing():
print("👋 關閉應用程式...")
client.loop_stop()
client.disconnect()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()





沒有留言:
張貼留言