2025年10月23日 星期四

Python TKinter + MQTT + WOKWI ESP32 + Telegram

Python TKinter + MQTT + WOKWI ESP32 + Telegram 









Python + TKinter 程式

import tkinter as tk

from tkinter import messagebox

import paho.mqtt.client as mqtt

import threading

import time

import asyncio

from telegram import Update

from telegram.ext import Application, CommandHandler, ContextTypes


# --- 設定 ---

MQTT_BROKER = "broker.mqttgo.io"

MQTT_PORT = 1883

TOPIC_CONTROL = "wokwi/led/control"

TOPIC_TEMP = "wokwi/dht/temperature"

TOPIC_HUMIDITY = "wokwi/dht/humidity"


# Telegram 設定 (⚠️ 請替換為您的 Bot Token 和 Chat ID)

TELEGRAM_BOT_TOKEN = "773289410254:AAHbrWu9ovb1BKPQyWsb1NSjNxfCGCrEWU-o"  

TARGET_CHAT_ID = 7192652181469

# --- 應用程式狀態 ---

app_status = {

    "led_state": "off",

    "temperature": "N/A",

    "humidity": "N/A",

    "status_request_pending": False # 新增標誌來處理 /status

}


# --- Tkinter GUI 類別 ---

class MqttTkinterApp:

    def __init__(self, master):

        self.master = master

        master.title("MQTT Tkinter 控制台")


        # 狀態變數

        self.temp_var = tk.StringVar(value=f"溫度: {app_status['temperature']}")

        self.humidity_var = tk.StringVar(value=f"濕度: {app_status['humidity']}")

        self.led_status_var = tk.StringVar(value=f"LED 狀態: {app_status['led_state']}")

        self.flash_id = None  # 用於儲存閃爍的 after ID

        self.timer_id = None  # 用於儲存計時器的 after ID

        self.timer_end_time = None # 用於儲存計時結束時間


        # Tkinter 元件

        self.canvas = tk.Canvas(master, width=100, height=100)

        self.canvas.pack(pady=10)

        self.led_indicator = self.canvas.create_oval(10, 10, 90, 90, fill="gray")


        tk.Label(master, textvariable=self.temp_var).pack()

        tk.Label(master, textvariable=self.humidity_var).pack()

        tk.Label(master, textvariable=self.led_status_var).pack()


        # Telegram Command 提示

        tk.Label(master, text="Telegram Commands: /on, /off, /flash, /timer, /status").pack(pady=5)

        

        # 建立 MQTT 客戶端

        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)

        self.client.on_connect = self.on_connect

        self.client.on_message = self.on_message

        self.client.on_publish = self.on_publish

        self.client.user_data_set(self) # 傳遞 Tkinter 實例到 MQTT callback


        # 啟動 Telegram Bot

        self.telegram_bot = TelegramBotHandler(self.client, self, TELEGRAM_BOT_TOKEN)

        self.telegram_bot.start_bot()

        

        # 啟動 MQTT 執行緒

        self.mqtt_thread = threading.Thread(target=self.mqtt_loop_start, daemon=True)

        self.mqtt_thread.start()



        # 處理 Tkinter 關閉事件

        master.protocol("WM_DELETE_WINDOW", self.on_closing)


    def mqtt_loop_start(self):

        """在單獨的執行緒中啟動 MQTT 客戶端迴圈 (已修正 NameError)"""

        try:

            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)

            self.client.loop_forever()

        except Exception as error: # 將錯誤捕獲為 error

            print(f"MQTT 連線錯誤: {error}")

            # 修正:將 Tkinter 操作延遲到主執行緒執行,使用 'error' 變數

            self.master.after(0, lambda: messagebox.showerror("MQTT 錯誤", f"無法連接到 MQTT Broker: {error}")) 



    def on_connect(self, client, userdata, flags, rc):

        """連線成功的回呼函式 (在背景執行緒中執行)"""

        if rc == 0:

            print("MQTT 連線成功")

            client.subscribe(TOPIC_CONTROL)

            client.subscribe(TOPIC_TEMP)

            client.subscribe(TOPIC_HUMIDITY)

            # 修正:將 Tkinter 操作延遲到主執行緒執行

            self.master.after(0, self.update_gui_log, "MQTT 連線成功並已訂閱")

        else:

            print(f"MQTT 連線失敗, 返回碼 {rc}")

            # 修正:將 Tkinter 操作延遲到主執行緒執行

            self.master.after(0, lambda: messagebox.showerror("MQTT 錯誤", f"MQTT 連線失敗, 返回碼 {rc}"))



    def on_message(self, client, userdata, msg):

        """接收到 MQTT 訊息的回呼函式 (在背景執行緒中執行)"""

        payload = msg.payload.decode()

        app = userdata # 取得 Tkinter 實例

        

        print(f"收到訊息: {msg.topic} - {payload}")


        # 所有 GUI 更新都必須使用 self.master.after(0, ...)

        

        if msg.topic == TOPIC_CONTROL:

            # 接收來自外部(如 ESP32)的 LED 狀態

            app.update_led_state_external(payload)

        

        elif msg.topic == TOPIC_TEMP:

            app.update_temperature(payload)

            # 如果是 /status 要求的,則檢查是否已收到濕度數據

            if app_status.get("status_request_pending") and app_status['humidity'] != 'N/A':

                 app.telegram_bot.send_temp_humidity_to_telegram(

                    f"當前溫濕度: 溫度 {app_status['temperature']}°C, 濕度 {app_status['humidity']}%"

                 )

                 app_status["status_request_pending"] = False # 重置標誌


        elif msg.topic == TOPIC_HUMIDITY:

            app.update_humidity(payload)

            # 如果是 /status 要求的,則檢查是否已收到溫度數據

            if app_status.get("status_request_pending") and app_status['temperature'] != 'N/A':

                 app.telegram_bot.send_temp_humidity_to_telegram(

                    f"當前溫濕度: 溫度 {app_status['temperature']}°C, 濕度 {app_status['humidity']}%"

                 )

                 app_status["status_request_pending"] = False # 重置標誌


    def on_publish(self, client, userdata, mid):

        """發佈訊息成功的回呼函式 (可選)"""

        print(f"訊息 {mid} 已發佈")


    # --- GUI 更新方法 (在主執行緒中安全執行) ---


    def update_gui_log(self, message):

        """通用日誌/狀態更新 (如果需要)"""

        print(message)


    def update_led_state_external(self, state):

        """根據 MQTT 接收到的 LED 狀態更新 GUI"""

        # 這裡已經使用了 self.master.after(0, ...) 確保在主執行緒中執行

        self.master.after(0, self._set_led_indicator, state)


    def update_temperature(self, temp):

        """更新溫度顯示"""

        app_status["temperature"] = temp

        self.master.after(0, self.temp_var.set, f"溫度: {temp}°C")

        

    def update_humidity(self, humidity):

        """更新濕度顯示"""

        app_status["humidity"] = humidity

        self.master.after(0, self.humidity_var.set, f"濕度: {humidity}%")


    # --- LED 控制邏輯 (在主執行緒中安全執行) ---


    def _cancel_timers(self):

        """取消所有計時器和閃爍"""

        if self.flash_id:

            self.master.after_cancel(self.flash_id)

            self.flash_id = None

        if self.timer_id:

            self.master.after_cancel(self.timer_id)

            self.timer_id = None

        self.timer_end_time = None

        

    def _set_led_indicator(self, state):

        """實際改變 GUI 中的圓形 LED 顏色和狀態文字"""

        self._cancel_timers() # 先取消舊的動畫

        

        app_status["led_state"] = state

        self.led_status_var.set(f"LED 狀態: {state}")

        

        if state == "on":

            self.canvas.itemconfig(self.led_indicator, fill="green")

        elif state == "off":

            self.canvas.itemconfig(self.led_indicator, fill="red")

        elif state == "flash":

            self._flash_toggle(True)

        elif state == "timer":

            self.timer_end_time = time.time() + 10 # 10 秒後結束

            self.canvas.itemconfig(self.led_indicator, fill="green")

            self._timer_countdown()

        elif state == "green": # 閃爍用狀態

            self.canvas.itemconfig(self.led_indicator, fill="green")

        elif state == "red": # 閃爍用狀態

            self.canvas.itemconfig(self.led_indicator, fill="red")

        else:

            self.canvas.itemconfig(self.led_indicator, fill="gray")


    def _flash_toggle(self, is_green):

        """閃爍動畫邏輯 (綠色/紅色交替)"""

        if app_status["led_state"] != "flash":

            return # 確保只在 flash 狀態時運行


        color = "green" if is_green else "red"

        self.canvas.itemconfig(self.led_indicator, fill=color)


        # 0.5 秒後呼叫自己,切換顏色

        self.flash_id = self.master.after(500, self._flash_toggle, not is_green)


    def _timer_countdown(self):

        """計時器動畫邏輯 (10 秒後變紅)"""

        if self.timer_end_time is None or time.time() >= self.timer_end_time:

            # 時間到,轉為紅色 LED (off 狀態)

            self.publish_command("off", from_telegram=False) # 發佈 off 訊息

            self.canvas.itemconfig(self.led_indicator, fill="red")

            app_status["led_state"] = "off"

            self.led_status_var.set("LED 狀態: off (Timer End)")

            self.timer_id = None

            return


        # 每 100 毫秒更新一次

        self.timer_id = self.master.after(100, self._timer_countdown)

        

    # --- MQTT 發佈方法 ---


    def publish_command(self, command, from_telegram=True):

        """發佈 LED 控制命令並更新 GUI"""

        

        # 1. 更新 GUI (安全地調用主執行緒中的方法)

        self.master.after(0, self._set_led_indicator, command)

        

        # 2. 發佈 MQTT 訊息 (可以在任何執行緒中安全執行)

        self.client.publish(TOPIC_CONTROL, command, qos=0)

        

        # 3. 回傳狀態給 Telegram (如果命令來自 Telegram)

        if from_telegram and TARGET_CHAT_ID != 0 and self.telegram_bot.application:

            self.telegram_bot.send_status_to_telegram(f"命令已執行: /{command}. LED 狀態: {app_status['led_state']}")



    def on_closing(self):

        """應用程式關閉時的清理工作"""

        print("正在關閉應用程式...")

        self._cancel_timers() # 停止所有 Tkinter 計時器

        

        if hasattr(self, 'telegram_bot') and self.telegram_bot:

             self.telegram_bot.stop_bot() # 停止 Telegram Bot

        

        if hasattr(self, 'client') and self.client:

            self.client.loop_stop() # 停止 MQTT 迴圈


        self.master.destroy()



# --- Telegram Bot 類別 ---


class TelegramBotHandler:

    def __init__(self, mqtt_client, app, token):

        self.mqtt_client = mqtt_client

        self.app = app # Tkinter 應用實例

        self.token = token

        # 新增變數來儲存 asyncio loop (針對跨執行緒呼叫修正)

        self.tg_loop = None 

        

        # 只有在 Token 非預設值時才初始化 Bot

        if token == "YOUR_TELEGRAM_BOT_TOKEN":

            print("警告: Telegram Bot Token 未設定,Bot 功能將被跳過。")

            self.application = None

            return


        self.application = Application.builder().token(token).build()


        # 註冊命令處理器

        self.application.add_handler(CommandHandler("on", self.handle_command))

        self.application.add_handler(CommandHandler("off", self.handle_command))

        self.application.add_handler(CommandHandler("flash", self.handle_command))

        self.application.add_handler(CommandHandler("timer", self.handle_command))

        self.application.add_handler(CommandHandler("status", self.handle_status))

        self.application.add_handler(CommandHandler("help", self.handle_help))


        # 啟動 Bot 的執行緒 (非阻塞)

        self.bot_thread = threading.Thread(target=self._run_bot, daemon=True)


    def start_bot(self):

        """啟動 Telegram Bot 執行緒"""

        if self.application:

            self.bot_thread.start()

            print("Telegram Bot 已啟動")


    def stop_bot(self):

        """停止 Telegram Bot"""

        if self.application and self.application.running:

            try:

                self.application.stop()

                print("Telegram Bot 已停止")

            except Exception as e:

                print(f"停止 Telegram Bot 錯誤: {e}")


    def _run_bot(self):

        """Bot 輪詢迴圈 (已修正: 手動設定和儲存 loop)"""

        if self.application:

            # 1. 取得或創建一個新的 asyncio loop 給這個執行緒

            try:

                loop = asyncio.get_event_loop()

            except RuntimeError:

                loop = asyncio.new_event_loop()

                asyncio.set_event_loop(loop)

            

            # 2. 儲存 loop 供其他執行緒存取

            self.tg_loop = loop

            

            # 3. 運行 poller (它會使用當前執行緒的 loop)

            self.application.run_polling(poll_interval=0.5, timeout=10)



    async def handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

        """處理 /on, /off, /flash, /timer 命令"""

        

        # 偵錯用,確認 Chat ID 已修正

        if TARGET_CHAT_ID == 0:

             print(f"DEBUG: 收到來自 Chat ID: {update.message.chat_id} 的命令") 

             

        # 檢查授權用戶

        if update.message.chat_id != TARGET_CHAT_ID:

            await update.message.reply_text("未授權的用戶。")

            return

            

        command = update.message.text[1:] # 移除 '/'

        

        # 呼叫 Tkinter 應用程式的發佈方法 (在 Bot 執行緒中執行)

        self.app.publish_command(command, from_telegram=True)

        


    async def handle_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

        """處理 /status 命令"""

        if update.message.chat_id != TARGET_CHAT_ID:

            await update.message.reply_text("未授權的用戶。")

            return

            

        # 1. 重置數據和設置標誌,指示等待溫濕度數據

        app_status["status_request_pending"] = True

        app_status["temperature"] = "N/A"

        app_status["humidity"] = "N/A"

        

        # 2. 回覆訊息

        await update.message.reply_text("正在擷取溫濕度數據,請稍候...")

        # 3. 溫濕度數據將由 MQTT 的 on_message 接收並發送給 Telegram


    async def handle_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

        """處理 /help 命令"""

        help_message = (

            "可用命令:\n"

            "/on - 開啟 LED (綠色)\n"

            "/off - 關閉 LED (紅色)\n"

            "/flash - LED 閃爍 (紅綠交替)\n"

            "/timer - LED 開啟 10 秒後自動關閉\n"

            "/status - 取得當前溫度和濕度"

        )

        await update.message.reply_text(help_message)



    # --- Telegram 訊息發送器 (已修正: 使用 self.tg_loop) ---


    def send_status_to_telegram(self, text):

        """發送 LED 狀態給 Telegram"""

        # 檢查 self.tg_loop 是否存在且應用程式正在運行

        if self.application and self.application.running and TARGET_CHAT_ID != 0 and self.tg_loop: 

            asyncio.run_coroutine_threadsafe(

                self.application.bot.send_message(chat_id=TARGET_CHAT_ID, text=text), 

                self.tg_loop # <-- 修正: 使用儲存的 loop

            )


    def send_temp_humidity_to_telegram(self, text):

        """發送溫濕度數據給 Telegram"""

        # 檢查 self.tg_loop 是否存在且應用程式正在運行

        if self.application and self.application.running and TARGET_CHAT_ID != 0 and self.tg_loop:

            asyncio.run_coroutine_threadsafe(

                self.application.bot.send_message(chat_id=TARGET_CHAT_ID, text=text), 

                self.tg_loop # <-- 修正: 使用儲存的 loop

            )



# --- 主程式 ---

if __name__ == '__main__':

    # 設置 Python 3.10+ 的 asyncio 相容性 (僅在 Windows 上可能需要)

    try:

        # 僅在主執行緒中設置,防止影響 Tkinter

        if threading.current_thread() is threading.main_thread():

            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    except AttributeError:

        pass # 其他作業系統上忽略


    root = tk.Tk()

    app = MqttTkinterApp(root)

    root.mainloop()

 

WOKWI程式

#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <DHT_U.h> // 需要同時包含 DHT_U.h

// --- Wi-Fi 設定 ---
const char* ssid = "Wokwi-GUEST";
const char* password = "";

// --- MQTT 設定 ---
const char* mqtt_server = "broker.mqttgo.io"; // 或 "mqtt.eclipseprojects.io"
const int mqtt_port = 1883;
const char* mqtt_client_id = "ESP32_Wokwi_Client";

// MQTT 主題
const char* mqtt_topic_led_control = "wokwi/led/control";
const char* mqtt_topic_temperature = "wokwi/dht/temperature";
const char* mqtt_topic_humidity = "wokwi/dht/humidity";

WiFiClient espClient;
PubSubClient client(espClient);

// --- LED 設定 ---
const int ledPin = 2; // 連接到 GPIO 2
enum LedMode { ON, OFF, FLASH, TIMER };
volatile LedMode currentLedMode = OFF;
volatile unsigned long timerStartTime = 0;
volatile bool ledState = false; // 用於閃爍模式

// --- DHT22 設定 ---
#define DHTPIN 4      // 連接到 GPIO 4
#define DHTTYPE DHT22 // DHT 22  (AM2302), AM2321
DHT dht(DHTPIN, DHTTYPE);

// --- 任務句柄 ---
TaskHandle_t TaskLEDControl = NULL;
TaskHandle_t TaskDHTSensor = NULL;

// --- 函式宣告 ---
void setup_wifi();
void reconnect_mqtt();
void callback(char* topic, byte* payload, unsigned int length);

void ledControlTask(void *pvParameters);
void dhtSensorTask(void *pvParameters);

void setup() {
  Serial.begin(115200);
  pinMode(ledPin, OUTPUT);
  digitalWrite(ledPin, LOW); // 確保初始是關閉的

  setup_wifi();
  client.setServer(mqtt_server, mqtt_port);
  client.setCallback(callback);

  dht.begin(); // 初始化 DHT 感測器

  // 創建 LED 控制任務,運行在 Core 0
  xTaskCreatePinnedToCore(
    ledControlTask,   /* 任務函式 */
    "LED Control",    /* 任務名稱 */
    2048,             /* 堆疊大小 (字節) */
    NULL,             /* 任務參數 */
    1,                /* 任務優先級 */
    &TaskLEDControl,  /* 任務句柄 */
    0                 /* 運行在 Core 0 */
  );

  // 創建 DHT 感測器任務,運行在 Core 1
  xTaskCreatePinnedToCore(
    dhtSensorTask,    /* 任務函式 */
    "DHT Sensor",     /* 任務名稱 */
    4096,             /* 堆疊大小 (字節) */
    NULL,             /* 任務參數 */
    1,                /* 任務優先級 */
    &TaskDHTSensor,   /* 任務句柄 */
    1                 /* 運行在 Core 1 */
  );
}

void loop() {
  // 主循環中只負責維持 MQTT 連線
  if (!client.connected()) {
    reconnect_mqtt();
  }
  client.loop(); // 處理 MQTT 訊息
  delay(10); // 短暫延遲,避免佔用太多 CPU
}

// --- Wi-Fi 連線函式 ---
void setup_wifi() {
  delay(10);
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);

  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }

  Serial.println("");
  Serial.println("WiFi connected");
  Serial.print("IP address: ");
  Serial.println(WiFi.localIP());
}

// --- MQTT 重連函式 ---
void reconnect_mqtt() {
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // 嘗試連線
    if (client.connect(mqtt_client_id)) {
      Serial.println("connected");
      // 訂閱 LED 控制主題
      client.subscribe(mqtt_topic_led_control);
      Serial.print("Subscribed to: ");
      Serial.println(mqtt_topic_led_control);
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // 等待 5 秒後重試
      delay(5000);
    }
  }
}

// --- MQTT 訊息回調函式 ---
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  String message = "";
  for (int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.println(message);

  if (String(topic) == mqtt_topic_led_control) {
    if (message == "on") {
      currentLedMode = ON;
      digitalWrite(ledPin, HIGH);
      Serial.println("LED Mode: ON");
    } else if (message == "off") {
      currentLedMode = OFF;
      digitalWrite(ledPin, LOW);
      Serial.println("LED Mode: OFF");
    } else if (message == "flash") {
      currentLedMode = FLASH;
      Serial.println("LED Mode: FLASH");
    } else if (message == "timer") {
      currentLedMode = TIMER;
      digitalWrite(ledPin, HIGH); // 定時模式開始時先開啟 LED
      timerStartTime = millis();
      Serial.println("LED Mode: TIMER (10s)");
    } else {
      Serial.println("Unknown LED command.");
    }
  }
}

// --- LED 控制任務 (運行在 Core 0) ---
void ledControlTask(void *pvParameters) {
  (void) pvParameters; // 避免編譯器警告

  for (;;) { // 無限循環
    switch (currentLedMode) {
      case ON:
        // LED 保持亮著,由 callback 函式設置
        break;
      case OFF:
        // LED 保持熄滅,由 callback 函式設置
        break;
      case FLASH:
        digitalWrite(ledPin, ledState);
        ledState = !ledState;
        vTaskDelay(pdMS_TO_TICKS(500)); // 每 500ms 改變一次狀態
        break;
      case TIMER:
        if (millis() - timerStartTime >= 10000) { // 10 秒
          digitalWrite(ledPin, LOW);
          currentLedMode = OFF; // 定時結束後轉為 OFF 模式
          Serial.println("LED Timer finished. LED OFF.");
        }
        vTaskDelay(pdMS_TO_TICKS(10)); // 短暫延遲
        break;
      default:
        digitalWrite(ledPin, LOW); // 預設為關閉
        break;
    }
    vTaskDelay(pdMS_TO_TICKS(10)); // 短暫延遲,讓其他任務有機會執行
  }
}

// --- DHT 感測器任務 (運行在 Core 1) ---
void dhtSensorTask(void *pvParameters) {
  (void) pvParameters; // 避免編譯器警告

  for (;;) { // 無限循環
    delay(2000); // 每 2 秒讀取一次數據,避免頻繁讀取導致錯誤

    float h = dht.readHumidity();
    float t = dht.readTemperature(); // 讀取攝氏溫度

    // 檢查是否讀取失敗,如果是則嘗試重讀
    if (isnan(h) || isnan(t)) {
      Serial.println(F("Failed to read from DHT sensor!"));
    } else {
      Serial.print(F("Humidity: "));
      Serial.print(h);
      Serial.print(F("%  Temperature: "));
      Serial.print(t);
      Serial.println(F("°C"));

      // 發布溫度
      char tempString[8];
      dtostrf(t, 4, 2, tempString); // 浮點數轉字串
      client.publish(mqtt_topic_temperature, tempString);

      // 發布濕度
      char humString[8];
      dtostrf(h, 4, 2, humString); // 浮點數轉字串
      delay(250);
      client.publish(mqtt_topic_humidity, humString);
    }
    vTaskDelay(pdMS_TO_TICKS(5000)); // 每 5 秒執行一次,避免 MQTT 發布過於頻繁
  }
}

沒有留言:

張貼留言

ESP32 (ESP-IDF in VS Code) MFRC522 + MQTT + PYTHON TKinter +SQLite

 ESP32 (ESP-IDF in VS Code) MFRC522 + MQTT + PYTHON TKinter +SQLite  ESP32 VS Code 程式 ; PlatformIO Project Configuration File ; ;   Build op...