Wokwi ESP32 8 Port Input/Output + MQTT + Python + Telegram 控制
P3-P8 (有內建拉電阻的腳位): 如果您的按鈕是連接到 VCC (3.3V),按下按鈕時,腳位會保持 HIGH,因此狀態永遠不會改變。P1/P2 (無內建拉電阻的腳位,Pin 34/35): 這些腳位缺乏內建電阻,它們的讀數處於 浮動 狀態,雖然可能偶爾跳變,但不可靠。
WOKWI ESP32程式
PYTHON程式
import tkinter as tk
from tkinter import ttk, messagebox
import paho.mqtt.client as mqtt
import json
import threading
import time
# --- Telegram 設定 (!!!請替換為您的資訊!!!) ---
BOT_TOKEN = "8102227020986:AAGymymK9_d1HcTGJWl3mtqHmilxB64_5Zw" #"YOUR_BOT_TOKEN" # 替換為您的 Bot Token
CHAT_ID = 79651218469 #"YOUR_CHAT_ID" # 替換為您的目標 Chat ID (接收通知的對象)
BOT_USERNAME = "@alextest999_bot"#"YOUR_TELEGRAM_BOT_USERNAME" # 您的 Bot 使用者名稱
TELEGRAM_ENABLED = True
try:
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
except ImportError:
print("❌ Telegram 庫未安裝。請執行 'pip install python-telegram-bot'。將禁用 Telegram 功能。")
TELEGRAM_ENABLED = False
# --- MQTT 設定 ---
MQTT_BROKER = "broker.mqtt-dashboard.com"
MQTT_PORT = 1883
MQTT_TOPIC_OUTPUT = "alex9ufo/8port/LED"
MQTT_TOPIC_INPUT = "alex9ufo/8port/INPUT"
CLIENT_ID = "Tkinter_Controller_Monitor_Final"
# --- 腳位對應 ---
RELAY_MAP = {1: 23, 2: 22, 3: 21, 4: 19, 5: 18, 6: 5, 7: 17, 8: 16}
INPUT_MAP = {1: 34, 2: 35, 3: 32, 4: 33, 5: 25, 6: 26, 7: 27, 8: 14}
# --- 狀態追蹤與顏色/文字定義 ---
# 使用字典追蹤當前狀態 (P1: 0/1)
CURRENT_LED_STATE = {f'P{i}': 0 for i in range(1, 9)}
CURRENT_INPUT_STATE = {f'P{i}': 0 for i in range(1, 9)}
COLOR_ON = "green"
COLOR_OFF = "SystemButtonFace"
COLOR_INPUT_HIGH = "green"
COLOR_INPUT_LOW = "lightgray"
TEXT_HIGH = "HIGH"
TEXT_LOW = "LOW"
# --- 全域 Tkinter 變數 ---
root = tk.Tk()
status_label = None
input_leds = {}
buttons = {}
application = None # Telegram Bot Application
# --- Telegram 處理函式 ---
def format_status_message(title, state_dict):
"""格式化 LED 或 INPUT 狀態訊息."""
# 建立一個包含 Port/Pin/State 的列表
lines = [f"--- {title} 狀態 ---"]
for i in range(1, 9):
port_key = f'P{i}'
pin_num = RELAY_MAP[i] if 'LED' in title else INPUT_MAP[i]
# 根據狀態決定顯示文字
state_value = state_dict.get(port_key, 0)
state_text = TEXT_HIGH if state_value == 1 else TEXT_LOW
lines.append(f"{port_key} (Pin {pin_num}): {state_text}")
return "\n".join(lines)
async def send_telegram_message(message: str):
"""將訊息發送到 Telegram Bot."""
if TELEGRAM_ENABLED and application:
try:
# 確保 Chat ID 是一個字串
await application.bot.send_message(chat_id=str(CHAT_ID), text=message)
print(f"📧 Telegram 訊息已發送: {message.splitlines()[0]}...")
except Exception as e:
print(f"❌ Telegram 發送失敗: {e}")
async def led_status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理 /led 命令,回覆 LED 狀態."""
msg = format_status_message("當前 LED (輸出)", CURRENT_LED_STATE)
await update.message.reply_text(msg)
async def input_status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""處理 /input 命令,回覆 INPUT 狀態."""
msg = format_status_message("當前 INPUT (輸入)", CURRENT_INPUT_STATE)
await update.message.reply_text(msg)
def run_telegram_bot():
"""在獨立線程中運行 Telegram Bot 服務."""
global application
if not TELEGRAM_ENABLED:
return
try:
application = Application.builder().token(BOT_TOKEN).build()
application.add_handler(CommandHandler("led", led_status_command))
application.add_handler(CommandHandler("input", input_status_command))
# 以非阻塞方式運行 Bot
application.run_polling(poll_interval=1.0)
except Exception as e:
print(f"❌ Telegram Bot 啟動失敗: {e}")
# --- 狀態改變檢查與通知 ---
def check_and_notify(new_state: dict, old_state: dict, title: str):
"""檢查狀態變化並發送 Telegram 通知."""
changed = False
for port, new_value in new_state.items():
if port in old_state and new_value != old_state[port]:
changed = True
break
if changed:
message = format_status_message(f"{title} 狀態更新", new_state)
# 在新的線程中發送 Telegram 訊息,避免阻塞
threading.Thread(target=lambda: application.loop.run_until_complete(send_telegram_message(message))).start()
old_state.update(new_state) # 更新狀態
# --- MQTT 客戶端回調函式 ---
def on_connect(client, userdata, flags, rc, properties=None):
"""當客戶端連接到 Broker 時被調用。"""
if rc == 0:
print("💡 MQTT 連接成功!")
root.after(0, lambda: status_label.config(text=f"已連接到 {MQTT_BROKER}", foreground="green"))
client.subscribe(MQTT_TOPIC_INPUT, qos=0)
print(f"✅ 訂閱 Topic: {MQTT_TOPIC_INPUT}")
else:
print(f"❌ MQTT 連接失敗,返回碼: {rc}")
root.after(0, lambda: status_label.config(text=f"連接失敗 (Code {rc})", foreground="red"))
def on_disconnect(client, userdata, rc, properties=None):
"""當客戶端從 Broker 斷開連接時被調用。"""
print(f"🔌 MQTT 斷開連接,返回碼: {rc}")
root.after(0, lambda: status_label.config(text="已斷開連接", foreground="gray"))
def on_message_input(client, userdata, msg):
"""處理來自 ESP32 的輸入狀態訊息 (alex9ufo/8port/INPUT)。"""
payload = msg.payload.decode()
try:
states = json.loads(payload)
# 1. 檢查並發送 Telegram 通知
check_and_notify(states, CURRENT_INPUT_STATE, "INPUT (輸入)")
# 2. 在主線程中更新 GUI
root.after(0, lambda: update_input_leds_gui(states))
except Exception as e:
print(f"處理輸入訊息錯誤: {e}")
def update_input_leds_gui(states):
"""根據收到的 JSON 數據更新 GUI 上的輸入狀態指示燈的顏色和文字。"""
for port, state in states.items():
if port in input_leds:
if state == 1:
color = COLOR_INPUT_HIGH
text = TEXT_HIGH
else:
color = COLOR_INPUT_LOW
text = TEXT_LOW
input_leds[port].config(bg=color, text=text)
# --- GUI 輸出控制邏輯 ---
def toggle_relay(port, action):
"""根據按鈕點擊發送 MQTT 訊息並更新顏色。"""
payload = f"P{port}{action}"
if not client.is_connected():
messagebox.showwarning("連線狀態", "MQTT 客戶端未連接,請檢查網路。")
return
try:
result = client.publish(MQTT_TOPIC_OUTPUT, payload, qos=0)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
new_state_value = 1 if action == "ON" else 0
port_key = f'P{port}'
# 1. 檢查並發送 Telegram 通知
temp_new_state = CURRENT_LED_STATE.copy()
temp_new_state[port_key] = new_state_value
check_and_notify(temp_new_state, CURRENT_LED_STATE, "LED (輸出)")
# 2. 更新 GUI
if action == "ON":
buttons[port]['on'].config(relief=tk.SUNKEN, bg=COLOR_ON)
buttons[port]['off'].config(relief=tk.RAISED, bg=COLOR_OFF)
else: # OFF
buttons[port]['on'].config(relief=tk.RAISED, bg=COLOR_OFF)
buttons[port]['off'].config(relief=tk.SUNKEN, bg=COLOR_OFF)
except Exception as e:
messagebox.showerror("MQTT 錯誤", f"無法發送訊息: {e}")
# --- 主程式設定與 GUI 佈局 ---
root.title("MQTT 8-Port Control & Monitor")
# 初始化 MQTT 客戶端
client = mqtt.Client(client_id=CLIENT_ID, protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.message_callback_add(MQTT_TOPIC_INPUT, on_message_input)
# GUI 佈局
main_frame = ttk.Frame(root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 狀態標籤
status_label = tk.Label(main_frame, text="初始化中...", font=("Arial", 10), foreground="gray")
status_label.grid(row=0, column=0, columnspan=5, pady=5)
# --- 輸出控制區塊 (LED/Relay) ---
output_frame = ttk.LabelFrame(main_frame, text="OUTPUT Control (Relay/LED)", padding="10")
output_frame.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)
for port in range(1, 9):
row_num = port
pin_num = RELAY_MAP[port]
ttk.Label(output_frame, text=f"Port P{port} (Pin {pin_num}):", width=15, anchor="w").grid(row=row_num, column=0, sticky=tk.W, padx=5, pady=2)
on_btn = tk.Button(output_frame, text="ON", command=lambda p=port: toggle_relay(p, "ON"), width=8, bg=COLOR_OFF)
on_btn.grid(row=row_num, column=1, padx=5, pady=2)
off_btn = tk.Button(output_frame, text="OFF", command=lambda p=port: toggle_relay(p, "OFF"), width=8, relief=tk.SUNKEN, bg=COLOR_OFF)
off_btn.grid(row=row_num, column=2, padx=5, pady=2)
buttons[port] = {'on': on_btn, 'off': off_btn}
# --- 輸入狀態監控區塊 ---
input_frame = ttk.LabelFrame(main_frame, text="INPUT Monitor (Pin State)", padding="10")
input_frame.grid(row=1, column=1, padx=10, pady=10, sticky=tk.W)
for port in range(1, 9):
port_key = f"P{port}"
pin_num = INPUT_MAP[port]
row_num = port
ttk.Label(input_frame, text=f"Input {port_key} (Pin {pin_num}):", width=15, anchor="w").grid(row=row_num, column=0, sticky=tk.W, padx=5, pady=2)
led = tk.Label(input_frame, text=TEXT_LOW, width=4, relief=tk.RAISED, bg=COLOR_INPUT_LOW)
led.grid(row=row_num, column=1, padx=5, pady=2)
input_leds[port_key] = led
# --- 啟動 MQTT 和 Telegram ---
try:
# 啟動 Telegram Bot (在獨立線程中)
if TELEGRAM_ENABLED:
telegram_thread = threading.Thread(target=run_telegram_bot)
telegram_thread.daemon = True # 確保主程式退出時 Bot 線程也會退出
telegram_thread.start()
# 啟動 MQTT
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start()
except Exception as e:
messagebox.showerror("連線錯誤", f"無法連接到 MQTT Broker 或啟動 Bot: {e}")
if status_label:
status_label.config(text="連線錯誤,請檢查網路", foreground="red")
# 運行主循環並確保程式退出時關閉所有線程
def on_closing():
print("👋 關閉應用程式...")
client.loop_stop()
client.disconnect()
if TELEGRAM_ENABLED and application:
# 停止 Telegram Bot 的 polling
application.stop_running()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()





沒有留言:
張貼留言