2025年10月23日 星期四

Python TKinter + MQTT + WOKWI ESP32 + Telegram

 Python TKinter + MQTT + WOKWI ESP32 + Telegram ---2



  • 程式啟動時主動發送「可用命令」列表給 Telegram:這需要在 MqttTkinterApp.__init__ 之後,使用 self.telegram_bot.send_help_to_telegram() 來執行。

  • ESP32 溫/濕度變化時主動通知 Telegram:這需要在 MqttTkinterApp.update_temperatureMqttTkinterApp.update_humidity 中新增邏輯,比較新值和舊值,如果變化超過閾值(例如 0.5),則發送通知。

  • import tkinter as tk

    from tkinter import messagebox

    import paho.mqtt.client as mqtt

    import threading

    import time

    import asyncio

    from telegram import Update

    from telegram.ext import Application, CommandHandler, ContextTypes


    # --- 設定 ---

    MQTT_BROKER = "broker.mqttgo.io"

    MQTT_PORT = 1883

    TOPIC_CONTROL = "wokwi/led/control"

    TOPIC_TEMP = "wokwi/dht/temperature"

    TOPIC_HUMIDITY = "wokwi/dht/humidity"


    # Telegram 設定 (⚠️ 請替換為您的 Bot Token 和 Chat ID)

    TELEGRAM_BOT_TOKEN = "7738940254:AAHbrWu9ovb1BKPQyWsbNSjNxfCGCrEWU-o"  

    TARGET_CHAT_ID = 7965218469

    # --- 應用程式狀態 ---

    app_status = {

        "led_state": "off",

        "temperature": "N/A",

        "humidity": "N/A",

        "status_request_pending": False, # 新增標誌來處理 /status


        # 新增用於主動通知的狀態變數

        "last_notified_temp": None,

        "last_notified_humidity": None,

        "notify_threshold": 0.5  # 溫度或濕度變化超過此值時發送通知 (例如 0.5°C/%)

    }


    # --- Tkinter GUI 類別 ---

    class MqttTkinterApp:

        def __init__(self, master):

            self.master = master

            master.title("MQTT Tkinter 控制台")


            # 狀態變數

            self.temp_var = tk.StringVar(value=f"溫度: {app_status['temperature']}")

            self.humidity_var = tk.StringVar(value=f"濕度: {app_status['humidity']}")

            self.led_status_var = tk.StringVar(value=f"LED 狀態: {app_status['led_state']}")

            self.flash_id = None  # 用於儲存閃爍的 after ID

            self.timer_id = None  # 用於儲存計時器的 after ID

            self.timer_end_time = None # 用於儲存計時結束時間


            # Tkinter 元件

            self.canvas = tk.Canvas(master, width=100, height=100)

            self.canvas.pack(pady=10)

            self.led_indicator = self.canvas.create_oval(10, 10, 90, 90, fill="gray")


            tk.Label(master, textvariable=self.temp_var).pack()

            tk.Label(master, textvariable=self.humidity_var).pack()

            tk.Label(master, textvariable=self.led_status_var).pack()


            # Telegram Command 提示

            tk.Label(master, text="Telegram Commands: /on, /off, /flash, /timer, /status").pack(pady=5)

            

            # 建立 MQTT 客戶端

            self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)

            self.client.on_connect = self.on_connect

            self.client.on_message = self.on_message

            self.client.on_publish = self.on_publish

            self.client.user_data_set(self) # 傳遞 Tkinter 實例到 MQTT callback


            # 啟動 Telegram Bot

            self.telegram_bot = TelegramBotHandler(self.client, self, TELEGRAM_BOT_TOKEN)

            self.telegram_bot.start_bot()

            

            # 啟動 MQTT 執行緒

            self.mqtt_thread = threading.Thread(target=self.mqtt_loop_start, daemon=True)

            self.mqtt_thread.start()



            # 處理 Tkinter 關閉事件

            master.protocol("WM_DELETE_WINDOW", self.on_closing)

            

            # === 需求 1: 程式啟動時發送 Help 命令給 Telegram ===

            # 在 MQTT 連線成功後,主動發送 Help 訊息

            self.master.after(2000, self.send_initial_help) # 延遲 2 秒確保 Bot 已啟動


        def send_initial_help(self):

            """程式啟動後,主動呼叫 Telegram Bot 發送 Help 訊息"""

            if self.telegram_bot.application and TARGET_CHAT_ID != 0:

                self.telegram_bot.send_help_to_telegram()


        def mqtt_loop_start(self):

            """在單獨的執行緒中啟動 MQTT 客戶端迴圈"""

            try:

                self.client.connect(MQTT_BROKER, MQTT_PORT, 60)

                self.client.loop_forever()

            except Exception as error: 

                print(f"MQTT 連線錯誤: {error}")

                self.master.after(0, lambda: messagebox.showerror("MQTT 錯誤", f"無法連接到 MQTT Broker: {error}")) 



        def on_connect(self, client, userdata, flags, rc):

            """連線成功的回呼函式 (在背景執行緒中執行)"""

            if rc == 0:

                print("MQTT 連線成功")

                client.subscribe(TOPIC_CONTROL)

                client.subscribe(TOPIC_TEMP)

                client.subscribe(TOPIC_HUMIDITY)

                self.master.after(0, self.update_gui_log, "MQTT 連線成功並已訂閱")

            else:

                print(f"MQTT 連線失敗, 返回碼 {rc}")

                self.master.after(0, lambda: messagebox.showerror("MQTT 錯誤", f"MQTT 連線失敗, 返回碼 {rc}"))



        def on_message(self, client, userdata, msg):

            """接收到 MQTT 訊息的回呼函式 (在背景執行緒中執行)"""

            payload = msg.payload.decode()

            app = userdata # 取得 Tkinter 實例

            

            print(f"收到訊息: {msg.topic} - {payload}")

            

            if msg.topic == TOPIC_CONTROL:

                app.update_led_state_external(payload)

            

            elif msg.topic == TOPIC_TEMP:

                app.update_temperature(payload)

                if app_status.get("status_request_pending") and app_status['humidity'] != 'N/A':

                     app.telegram_bot.send_temp_humidity_to_telegram(

                        f"當前溫濕度: 溫度 {app_status['temperature']}°C, 濕度 {app_status['humidity']}%"

                     )

                     app_status["status_request_pending"] = False


            elif msg.topic == TOPIC_HUMIDITY:

                app.update_humidity(payload)

                if app_status.get("status_request_pending") and app_status['temperature'] != 'N/A':

                     app.telegram_bot.send_temp_humidity_to_telegram(

                        f"當前溫濕度: 溫度 {app_status['temperature']}°C, 濕度 {app_status['humidity']}%"

                     )

                     app_status["status_request_pending"] = False


        def on_publish(self, client, userdata, mid):

            """發佈訊息成功的回呼函式 (可選)"""

            print(f"訊息 {mid} 已發佈")


        # --- GUI 更新方法 (在主執行緒中安全執行) ---


        def update_gui_log(self, message):

            """通用日誌/狀態更新 (如果需要)"""

            print(message)


        def update_led_state_external(self, state):

            """根據 MQTT 接收到的 LED 狀態更新 GUI"""

            self.master.after(0, self._set_led_indicator, state)


        def update_temperature(self, temp_str):

            """更新溫度顯示並檢查是否需要主動通知 Telegram (需求 2)"""

            try:

                temp = float(temp_str)

            except ValueError:

                app_status["temperature"] = temp_str

                self.master.after(0, self.temp_var.set, f"溫度: {temp_str}°C")

                return


            # 檢查是否需要主動通知 Telegram

            last_temp = app_status['last_notified_temp']

            if last_temp is None:

                # 第一次接收,設置初始值

                app_status['last_notified_temp'] = temp

            elif abs(temp - last_temp) >= app_status['notify_threshold']:

                # 溫度變化超過閾值,發送通知

                self.telegram_bot.send_notification_to_telegram(

                    f"🌡️ 溫度更新: {temp:.2f}°C (原值: {last_temp:.2f}°C)"

                )

                app_status['last_notified_temp'] = temp


            app_status["temperature"] = temp

            self.master.after(0, self.temp_var.set, f"溫度: {temp}°C")

            

        def update_humidity(self, humidity_str):

            """更新濕度顯示並檢查是否需要主動通知 Telegram (需求 2)"""

            try:

                humidity = float(humidity_str)

            except ValueError:

                app_status["humidity"] = humidity_str

                self.master.after(0, self.humidity_var.set, f"濕度: {humidity_str}%")

                return


            # 檢查是否需要主動通知 Telegram

            last_humidity = app_status['last_notified_humidity']

            if last_humidity is None:

                # 第一次接收,設置初始值

                app_status['last_notified_humidity'] = humidity

            elif abs(humidity - last_humidity) >= app_status['notify_threshold']:

                # 濕度變化超過閾值,發送通知

                self.telegram_bot.send_notification_to_telegram(

                    f"💧 濕度更新: {humidity:.2f}% (原值: {last_humidity:.2f}%)"

                )

                app_status['last_notified_humidity'] = humidity


            app_status["humidity"] = humidity

            self.master.after(0, self.humidity_var.set, f"濕度: {humidity}%")


        # --- LED 控制邏輯 (在主執行緒中安全執行) ---

        # ... (此處省略與 LED 相關的 _cancel_timers, _set_led_indicator, _flash_toggle, _timer_countdown 函數,它們保持不變) ...

        def _cancel_timers(self):

            """取消所有計時器和閃爍"""

            if self.flash_id:

                self.master.after_cancel(self.flash_id)

                self.flash_id = None

            if self.timer_id:

                self.master.after_cancel(self.timer_id)

                self.timer_id = None

            self.timer_end_time = None

            

        def _set_led_indicator(self, state):

            """實際改變 GUI 中的圓形 LED 顏色和狀態文字"""

            self._cancel_timers() # 先取消舊的動畫

            

            app_status["led_state"] = state

            self.led_status_var.set(f"LED 狀態: {state}")

            

            if state == "on":

                self.canvas.itemconfig(self.led_indicator, fill="green")

            elif state == "off":

                self.canvas.itemconfig(self.led_indicator, fill="red")

            elif state == "flash":

                self._flash_toggle(True)

            elif state == "timer":

                self.timer_end_time = time.time() + 10 # 10 秒後結束

                self.canvas.itemconfig(self.led_indicator, fill="green")

                self._timer_countdown()

            elif state == "green": # 閃爍用狀態

                self.canvas.itemconfig(self.led_indicator, fill="green")

            elif state == "red": # 閃爍用狀態

                self.canvas.itemconfig(self.led_indicator, fill="red")

            else:

                self.canvas.itemconfig(self.led_indicator, fill="gray")


        def _flash_toggle(self, is_green):

            """閃爍動畫邏輯 (綠色/紅色交替)"""

            if app_status["led_state"] != "flash":

                return # 確保只在 flash 狀態時運行


            color = "green" if is_green else "red"

            self.canvas.itemconfig(self.led_indicator, fill=color)


            # 0.5 秒後呼叫自己,切換顏色

            self.flash_id = self.master.after(500, self._flash_toggle, not is_green)


        def _timer_countdown(self):

            """計時器動畫邏輯 (10 秒後變紅)"""

            if self.timer_end_time is None or time.time() >= self.timer_end_time:

                # 時間到,轉為紅色 LED (off 狀態)

                self.publish_command("off", from_telegram=False) # 發佈 off 訊息

                self.canvas.itemconfig(self.led_indicator, fill="red")

                app_status["led_state"] = "off"

                self.led_status_var.set("LED 狀態: off (Timer End)")

                self.timer_id = None

                return


            # 每 100 毫秒更新一次

            self.timer_id = self.master.after(100, self._timer_countdown)

            

        # --- MQTT 發佈方法 ---


        def publish_command(self, command, from_telegram=True):

            """發佈 LED 控制命令並更新 GUI"""

            

            # 1. 更新 GUI 

            self.master.after(0, self._set_led_indicator, command)

            

            # 2. 發佈 MQTT 訊息

            self.client.publish(TOPIC_CONTROL, command, qos=0)

            

            # 3. 回傳狀態給 Telegram

            if from_telegram and TARGET_CHAT_ID != 0 and self.telegram_bot.application:

                self.telegram_bot.send_status_to_telegram(f"命令已執行: /{command}. LED 狀態: {app_status['led_state']}")



        def on_closing(self):

            """應用程式關閉時的清理工作"""

            print("正在關閉應用程式...")

            self._cancel_timers() 

            

            if hasattr(self, 'telegram_bot') and self.telegram_bot:

                 self.telegram_bot.stop_bot() 

            

            if hasattr(self, 'client') and self.client:

                self.client.loop_stop()


            self.master.destroy()



    # --- Telegram Bot 類別 ---


    class TelegramBotHandler:

        # ... (tg_loop 修正保持不變) ...

        def __init__(self, mqtt_client, app, token):

            self.mqtt_client = mqtt_client

            self.app = app # Tkinter 應用實例

            self.token = token

            # 新增變數來儲存 asyncio loop (針對跨執行緒呼叫修正)

            self.tg_loop = None 

            

            # 只有在 Token 非預設值時才初始化 Bot

            if token == "YOUR_TELEGRAM_BOT_TOKEN":

                print("警告: Telegram Bot Token 未設定,Bot 功能將被跳過。")

                self.application = None

                return


            self.application = Application.builder().token(token).build()


            # 註冊命令處理器

            self.application.add_handler(CommandHandler("on", self.handle_command))

            self.application.add_handler(CommandHandler("off", self.handle_command))

            self.application.add_handler(CommandHandler("flash", self.handle_command))

            self.application.add_handler(CommandHandler("timer", self.handle_command))

            self.application.add_handler(CommandHandler("status", self.handle_status))

            self.application.add_handler(CommandHandler("help", self.handle_help))


            # 啟動 Bot 的執行緒 (非阻塞)

            self.bot_thread = threading.Thread(target=self._run_bot, daemon=True)


        def start_bot(self):

            """啟動 Telegram Bot 執行緒"""

            if self.application:

                self.bot_thread.start()

                print("Telegram Bot 已啟動")


        def stop_bot(self):

            """停止 Telegram Bot"""

            if self.application and self.application.running:

                try:

                    self.application.stop()

                    print("Telegram Bot 已停止")

                except Exception as e:

                    print(f"停止 Telegram Bot 錯誤: {e}")


        def _run_bot(self):

            """Bot 輪詢迴圈 (已修正: 手動設定和儲存 loop)"""

            if self.application:

                # 1. 取得或創建一個新的 asyncio loop 給這個執行緒

                try:

                    loop = asyncio.get_event_loop()

                except RuntimeError:

                    loop = asyncio.new_event_loop()

                    asyncio.set_event_loop(loop)

                

                # 2. 儲存 loop 供其他執行緒存取

                self.tg_loop = loop

                

                # 3. 運行 poller (它會使用當前執行緒的 loop)

                self.application.run_polling(poll_interval=0.5, timeout=10)



        async def handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

            """處理 /on, /off, /flash, /timer 命令"""

            

            if TARGET_CHAT_ID == 0:

                 print(f"DEBUG: 收到來自 Chat ID: {update.message.chat_id} 的命令") 

                 

            if update.message.chat_id != TARGET_CHAT_ID:

                await update.message.reply_text("未授權的用戶。")

                return

                

            command = update.message.text[1:] 

            self.app.publish_command(command, from_telegram=True)

            


        async def handle_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

            """處理 /status 命令"""

            if update.message.chat_id != TARGET_CHAT_ID:

                await update.message.reply_text("未授權的用戶。")

                return

                

            app_status["status_request_pending"] = True

            app_status["temperature"] = "N/A"

            app_status["humidity"] = "N/A"

            

            await update.message.reply_text("正在擷取溫濕度數據,請稍候...")


        async def handle_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

            """處理 /help 命令"""

            help_message = self._get_help_message()

            await update.message.reply_text(help_message)

            

        def _get_help_message(self):

            """生成 Help 訊息字串"""

            return (

                "可用命令:\n"

                "/on - 開啟 LED (綠色)\n"

                "/off - 關閉 LED (紅色)\n"

                "/flash - LED 閃爍 (紅綠交替)\n"

                "/timer - LED 開啟 10 秒後自動關閉\n"

                "/status - 取得當前溫度和濕度"

            )


        # --- Telegram 訊息發送器 (已修正: 使用 self.tg_loop) ---

        

        def send_notification_to_telegram(self, text):

            """發送主動通知給 Telegram (需求 2)"""

            if self.application and self.application.running and TARGET_CHAT_ID != 0 and self.tg_loop: 

                asyncio.run_coroutine_threadsafe(

                    self.application.bot.send_message(chat_id=TARGET_CHAT_ID, text=text), 

                    self.tg_loop

                )


        def send_status_to_telegram(self, text):

            """發送 LED 狀態給 Telegram"""

            if self.application and self.application.running and TARGET_CHAT_ID != 0 and self.tg_loop: 

                asyncio.run_coroutine_threadsafe(

                    self.application.bot.send_message(chat_id=TARGET_CHAT_ID, text=text), 

                    self.tg_loop

                )


        def send_temp_humidity_to_telegram(self, text):

            """發送溫濕度數據給 Telegram"""

            if self.application and self.application.running and TARGET_CHAT_ID != 0 and self.tg_loop:

                asyncio.run_coroutine_threadsafe(

                    self.application.bot.send_message(chat_id=TARGET_CHAT_ID, text=text), 

                    self.tg_loop

                )


        def send_help_to_telegram(self):

            """發送初始 Help 訊息給 Telegram (需求 1)"""

            help_message = self._get_help_message()

            if self.application and self.application.running and TARGET_CHAT_ID != 0 and self.tg_loop:

                asyncio.run_coroutine_threadsafe(

                    self.application.bot.send_message(chat_id=TARGET_CHAT_ID, text=f"🎉 程式已啟動!\n\n{help_message}"), 

                    self.tg_loop

                )


    # --- 主程式 ---

    if __name__ == '__main__':

        try:

            if threading.current_thread() is threading.main_thread():

                asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

        except AttributeError:

            pass 


        root = tk.Tk()

        app = MqttTkinterApp(root)

        root.mainloop()


    沒有留言:

    張貼留言

    ESP32 (ESP-IDF in VS Code) MFRC522 + MQTT + PYTHON TKinter +SQLite

     ESP32 (ESP-IDF in VS Code) MFRC522 + MQTT + PYTHON TKinter +SQLite  ESP32 VS Code 程式 ; PlatformIO Project Configuration File ; ;   Build op...