2025年11月13日 星期四

Wokwi ESP32 8 Port Input/Output + MQTT + Python + Telegram 控制

  • Wokwi ESP32  8 Port Input/Output + MQTT + Python + Telegram 控制







  • P3-P8 (有內建拉電阻的腳位):
    如果您的按鈕是連接到 VCC (3.3V),按下按鈕時,腳位會保持 HIGH,因此狀態永遠不會改變。P1/P2 (無內建拉電阻的腳位,Pin 34/35): 這些腳位缺乏內建電阻,它們的讀數處於 浮動 狀態,雖然可能偶爾跳變,但不可靠。

  •  

    WOKWI ESP32程式

    #include <WiFi.h>
    #include <PubSubClient.h>
    #include <ArduinoJson.h> // 需要安裝此庫

    // --- Wi-Fi 設定 ---
    const char* ssid = "Wokwi-GUEST";      
    const char* password = "";            

    // --- MQTT 設定 ---
    const char* mqttServer = "broker.mqtt-dashboard.com";
    const int mqttPort = 1883;
    // 輸出 Topic (ESP32 接收)
    const char* mqttTopicOutput = "alex9ufo/8port/LED";
    // 輸入 Topic (ESP32 發送)
    const char* mqttTopicInput = "alex9ufo/8port/INPUT";

    // --- 腳位與 Topic 定義 ---

    // 輸出 (Relay) 腳位對應 (原功能)
    struct RelayPin {
      int port;
      int pin;
    };
    RelayPin relayMap[] = {
      {1, 23}, {2, 22}, {3, 21}, {4, 19},
      {5, 18}, {6, 5},  {7, 17}, {8, 16}
    };
    const int NUM_RELAYS = sizeof(relayMap) / sizeof(relayMap[0]);

    // 輸入 (Input) 腳位對應 (新功能)
    struct InputPin {
      int port;
      int pin;
    };
    InputPin inputMap[] = {
      {1, 34}, {2, 35}, {3, 32}, {4, 33},
      {5, 25}, {6, 26}, {7, 27}, {8, 14}
    };
    const int NUM_INPUTS = sizeof(inputMap) / sizeof(inputMap[0]);

    // 用於儲存上一個輸入狀態,以便只在狀態改變時發送訊息
    int lastInputState[NUM_INPUTS];

    // --- 物件實例化 ---
    WiFiClient espClient;
    PubSubClient client(espClient);

    // --- 函式聲明 ---
    void setup_wifi();
    void reconnect();
    void callback(char* topic, byte* payload, unsigned int length);
    void publishInputStates();

    // --- Setup ---
    void setup() {
      Serial.begin(115200);

      // 初始化所有輸出腳位
      for (int i = 0; i < NUM_RELAYS; i++) {
        pinMode(relayMap[i].pin, OUTPUT);
        digitalWrite(relayMap[i].pin, LOW);
      }
     
      // 初始化所有輸入腳位並開啟上拉電阻 (如果需要)
      for (int i = 0; i < NUM_INPUTS; i++) {
        // 數位輸入腳位 (例如連接按鈕)
        pinMode(inputMap[i].pin, INPUT_PULLUP); // 設為 INPUT_PULLUP,未按下時讀取 HIGH
        lastInputState[i] = digitalRead(inputMap[i].pin); // 記錄初始狀態
      }
     
      setup_wifi();
      client.setServer(mqttServer, mqttPort);
      client.setCallback(callback);
    }

    // --- Loop ---
    void loop() {
      if (!client.connected()) {
        reconnect();
      }
      client.loop();
     
      // **新功能:監測輸入狀態並發送 MQTT 訊息**
      publishInputStates();

      // 短暫延遲以避免 CPU 佔用過高
      delay(50);
    }

    // --- MQTT 訊息接收與處理 (處理 alex9ufo/8port/LED) ---
    void callback(char* topic, byte* payload, unsigned int length) {
      Serial.print("Message arrived [");
      Serial.print(topic);
      Serial.print("] ");
     
      // 處理 Payload ... (與前一個版本相同)
      char message[length + 1];
      strncpy(message, (char*)payload, length);
      message[length] = '\0';
      Serial.println(message);

      if (message[0] == 'P' && (length == 4 || length == 5)) {
        int port_num = message[1] - '0';
       
        if (port_num >= 1 && port_num <= 8) {
          int target_pin = -1;
          for (int i = 0; i < NUM_RELAYS; i++) {
            if (relayMap[i].port == port_num) {
              target_pin = relayMap[i].pin;
              break;
            }
          }

          if (target_pin != -1) {
            int state = -1;
            if (length == 4 && strcmp(&message[2], "ON") == 0) {
              state = HIGH;
            } else if (length == 5 && strcmp(&message[2], "OFF") == 0) {
              state = LOW;
            }

            if (state != -1) {
              digitalWrite(target_pin, state);
              Serial.printf("Output P%d (Pin %d) set to %s\n", port_num, target_pin, (state == HIGH ? "HIGH/ON" : "LOW/OFF"));
            }
          }
        }
      }
    }

    // --- 發送輸入狀態到 MQTT ---
    void publishInputStates() {
      static unsigned long lastSendTime = 0;
      const long debounceDelay = 100; // 消除按鈕抖動的延遲
     
      bool stateChanged = false;
     
      // 1. 讀取並檢查狀態是否改變
      for (int i = 0; i < NUM_INPUTS; i++) {
        int current_state = digitalRead(inputMap[i].pin);
       
        // 檢查狀態是否改變 (同時處理按鈕抖動)
        if (current_state != lastInputState[i] && (millis() - lastSendTime > debounceDelay)) {
          lastInputState[i] = current_state;
          stateChanged = true;
        }
      }

      // 2. 如果狀態改變且 MQTT 已連接,則發送訊息
      if (stateChanged && client.connected()) {
        lastSendTime = millis();
       
        // 使用 ArduinoJson 建立 JSON 格式訊息
        const size_t capacity = JSON_OBJECT_SIZE(NUM_INPUTS);
        StaticJsonDocument<capacity> doc;

        for (int i = 0; i < NUM_INPUTS; i++) {
          // 腳位讀取到 HIGH/LOW。我們將其轉為 1/0
          int state = digitalRead(inputMap[i].pin);
          // 由於使用了 INPUT_PULLUP,
          // 按鈕按下時讀取 LOW (0),釋放時讀取 HIGH (1)。
          // 這裡直接發送讀取到的值。
         
          char key[4];
          sprintf(key, "P%d", inputMap[i].port);
          doc[key] = state;
        }
       
        char output[256];
        serializeJson(doc, output);

        client.publish(mqttTopicInput, output);
        Serial.printf("📢 發送輸入狀態: %s\n", output);
      }
    }

    // --- Wi-Fi 連接 ---
    void setup_wifi() {
      // ... (保持不變)
      delay(10);
      Serial.println("\nConnecting to WiFi...");
      WiFi.begin(ssid, password);

      while (WiFi.status() != WL_CONNECTED) {
        delay(500);
        Serial.print(".");
      }

      Serial.println("\nWiFi connected.");
      Serial.print("IP address: ");
      Serial.println(WiFi.localIP());
    }

    // --- MQTT 重連 ---
    void reconnect() {
      // ... (保持不變)
      while (!client.connected()) {
        Serial.print("Attempting MQTT connection...");
        if (client.connect("ESP32_Monitor_Client")) {
          Serial.println("connected");
          // 訂閱輸出 Topic
          client.subscribe(mqttTopicOutput);
          Serial.print("Subscribed to Output: ");
          Serial.println(mqttTopicOutput);
         
          // 重新連線後,立即發送當前輸入狀態
          publishInputStates();
        } else {
          Serial.print("failed, rc=");
          Serial.print(client.state());
          Serial.println(" try again in 5 seconds");
          delay(5000);
        }
      }
    }

    PYTHON程式

    import tkinter as tk

    from tkinter import ttk, messagebox

    import paho.mqtt.client as mqtt

    import json

    import threading

    import time


    # --- Telegram 設定 (!!!請替換為您的資訊!!!) ---

    BOT_TOKEN = "8102227020986:AAGymymK9_d1HcTGJWl3mtqHmilxB64_5Zw" #"YOUR_BOT_TOKEN"  # 替換為您的 Bot Token

    CHAT_ID = 79651218469  #"YOUR_CHAT_ID"      # 替換為您的目標 Chat ID (接收通知的對象)

    BOT_USERNAME = "@alextest999_bot"#"YOUR_TELEGRAM_BOT_USERNAME" # 您的 Bot 使用者名稱



    TELEGRAM_ENABLED = True


    try:

        from telegram import Update

        from telegram.ext import Application, CommandHandler, ContextTypes

    except ImportError:

        print("❌ Telegram 庫未安裝。請執行 'pip install python-telegram-bot'。將禁用 Telegram 功能。")

        TELEGRAM_ENABLED = False



    # --- MQTT 設定 ---

    MQTT_BROKER = "broker.mqtt-dashboard.com"

    MQTT_PORT = 1883

    MQTT_TOPIC_OUTPUT = "alex9ufo/8port/LED"

    MQTT_TOPIC_INPUT = "alex9ufo/8port/INPUT"

    CLIENT_ID = "Tkinter_Controller_Monitor_Final"


    # --- 腳位對應 ---

    RELAY_MAP = {1: 23, 2: 22, 3: 21, 4: 19, 5: 18, 6: 5, 7: 17, 8: 16}

    INPUT_MAP = {1: 34, 2: 35, 3: 32, 4: 33, 5: 25, 6: 26, 7: 27, 8: 14}



    # --- 狀態追蹤與顏色/文字定義 ---

    # 使用字典追蹤當前狀態 (P1: 0/1)

    CURRENT_LED_STATE = {f'P{i}': 0 for i in range(1, 9)} 

    CURRENT_INPUT_STATE = {f'P{i}': 0 for i in range(1, 9)} 


    COLOR_ON = "green"           

    COLOR_OFF = "SystemButtonFace" 

    COLOR_INPUT_HIGH = "green"   

    COLOR_INPUT_LOW = "lightgray" 

    TEXT_HIGH = "HIGH"

    TEXT_LOW = "LOW"


    # --- 全域 Tkinter 變數 ---

    root = tk.Tk()

    status_label = None 

    input_leds = {} 

    buttons = {}

    application = None # Telegram Bot Application



    # --- Telegram 處理函式 ---


    def format_status_message(title, state_dict):

        """格式化 LED 或 INPUT 狀態訊息."""

        

        # 建立一個包含 Port/Pin/State 的列表

        lines = [f"--- {title} 狀態 ---"]

        for i in range(1, 9):

            port_key = f'P{i}'

            pin_num = RELAY_MAP[i] if 'LED' in title else INPUT_MAP[i]

            

            # 根據狀態決定顯示文字

            state_value = state_dict.get(port_key, 0)

            state_text = TEXT_HIGH if state_value == 1 else TEXT_LOW

            

            lines.append(f"{port_key} (Pin {pin_num}): {state_text}")

        

        return "\n".join(lines)



    async def send_telegram_message(message: str):

        """將訊息發送到 Telegram Bot."""

        if TELEGRAM_ENABLED and application:

            try:

                # 確保 Chat ID 是一個字串

                await application.bot.send_message(chat_id=str(CHAT_ID), text=message)

                print(f"📧 Telegram 訊息已發送: {message.splitlines()[0]}...")

            except Exception as e:

                print(f"❌ Telegram 發送失敗: {e}")



    async def led_status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

        """處理 /led 命令,回覆 LED 狀態."""

        msg = format_status_message("當前 LED (輸出)", CURRENT_LED_STATE)

        await update.message.reply_text(msg)


    async def input_status_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

        """處理 /input 命令,回覆 INPUT 狀態."""

        msg = format_status_message("當前 INPUT (輸入)", CURRENT_INPUT_STATE)

        await update.message.reply_text(msg)



    def run_telegram_bot():

        """在獨立線程中運行 Telegram Bot 服務."""

        global application

        if not TELEGRAM_ENABLED:

            return


        try:

            application = Application.builder().token(BOT_TOKEN).build()


            application.add_handler(CommandHandler("led", led_status_command))

            application.add_handler(CommandHandler("input", input_status_command))


            # 以非阻塞方式運行 Bot

            application.run_polling(poll_interval=1.0)

        except Exception as e:

            print(f"❌ Telegram Bot 啟動失敗: {e}")



    # --- 狀態改變檢查與通知 ---


    def check_and_notify(new_state: dict, old_state: dict, title: str):

        """檢查狀態變化並發送 Telegram 通知."""

        changed = False

        for port, new_value in new_state.items():

            if port in old_state and new_value != old_state[port]:

                changed = True

                break

                

        if changed:

            message = format_status_message(f"{title} 狀態更新", new_state)

            # 在新的線程中發送 Telegram 訊息,避免阻塞

            threading.Thread(target=lambda: application.loop.run_until_complete(send_telegram_message(message))).start()

            

        old_state.update(new_state) # 更新狀態



    # --- MQTT 客戶端回調函式 ---


    def on_connect(client, userdata, flags, rc, properties=None):

        """當客戶端連接到 Broker 時被調用。"""

        if rc == 0:

            print("💡 MQTT 連接成功!")

            root.after(0, lambda: status_label.config(text=f"已連接到 {MQTT_BROKER}", foreground="green"))

            

            client.subscribe(MQTT_TOPIC_INPUT, qos=0)

            print(f"✅ 訂閱 Topic: {MQTT_TOPIC_INPUT}")

            

        else:

            print(f"❌ MQTT 連接失敗,返回碼: {rc}")

            root.after(0, lambda: status_label.config(text=f"連接失敗 (Code {rc})", foreground="red"))


    def on_disconnect(client, userdata, rc, properties=None):

        """當客戶端從 Broker 斷開連接時被調用。"""

        print(f"🔌 MQTT 斷開連接,返回碼: {rc}")

        root.after(0, lambda: status_label.config(text="已斷開連接", foreground="gray"))


    def on_message_input(client, userdata, msg):

        """處理來自 ESP32 的輸入狀態訊息 (alex9ufo/8port/INPUT)。"""

        payload = msg.payload.decode()

        

        try:

            states = json.loads(payload)

            

            # 1. 檢查並發送 Telegram 通知

            check_and_notify(states, CURRENT_INPUT_STATE, "INPUT (輸入)")

            

            # 2. 在主線程中更新 GUI

            root.after(0, lambda: update_input_leds_gui(states))

            

        except Exception as e:

            print(f"處理輸入訊息錯誤: {e}")



    def update_input_leds_gui(states):

        """根據收到的 JSON 數據更新 GUI 上的輸入狀態指示燈的顏色和文字。"""

        for port, state in states.items():

            if port in input_leds:

                if state == 1:

                    color = COLOR_INPUT_HIGH

                    text = TEXT_HIGH

                else:

                    color = COLOR_INPUT_LOW

                    text = TEXT_LOW

                    

                input_leds[port].config(bg=color, text=text)



    # --- GUI 輸出控制邏輯 ---


    def toggle_relay(port, action):

        """根據按鈕點擊發送 MQTT 訊息並更新顏色。"""

        payload = f"P{port}{action}"

        

        if not client.is_connected():

            messagebox.showwarning("連線狀態", "MQTT 客戶端未連接,請檢查網路。")

            return

            

        try:

            result = client.publish(MQTT_TOPIC_OUTPUT, payload, qos=0)

            

            if result.rc == mqtt.MQTT_ERR_SUCCESS:

                new_state_value = 1 if action == "ON" else 0

                port_key = f'P{port}'


                # 1. 檢查並發送 Telegram 通知

                temp_new_state = CURRENT_LED_STATE.copy()

                temp_new_state[port_key] = new_state_value

                check_and_notify(temp_new_state, CURRENT_LED_STATE, "LED (輸出)")


                # 2. 更新 GUI

                if action == "ON":

                    buttons[port]['on'].config(relief=tk.SUNKEN, bg=COLOR_ON)

                    buttons[port]['off'].config(relief=tk.RAISED, bg=COLOR_OFF) 

                else: # OFF

                    buttons[port]['on'].config(relief=tk.RAISED, bg=COLOR_OFF)

                    buttons[port]['off'].config(relief=tk.SUNKEN, bg=COLOR_OFF) 

                

        except Exception as e:

            messagebox.showerror("MQTT 錯誤", f"無法發送訊息: {e}")



    # --- 主程式設定與 GUI 佈局 ---

    root.title("MQTT 8-Port Control & Monitor")


    # 初始化 MQTT 客戶端

    client = mqtt.Client(client_id=CLIENT_ID, protocol=mqtt.MQTTv5) 

    client.on_connect = on_connect

    client.on_disconnect = on_disconnect

    client.message_callback_add(MQTT_TOPIC_INPUT, on_message_input)


    # GUI 佈局

    main_frame = ttk.Frame(root, padding="10")

    main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))


    # 狀態標籤

    status_label = tk.Label(main_frame, text="初始化中...", font=("Arial", 10), foreground="gray")

    status_label.grid(row=0, column=0, columnspan=5, pady=5)


    # --- 輸出控制區塊 (LED/Relay) ---

    output_frame = ttk.LabelFrame(main_frame, text="OUTPUT Control (Relay/LED)", padding="10")

    output_frame.grid(row=1, column=0, padx=10, pady=10, sticky=tk.W)


    for port in range(1, 9):

        row_num = port

        pin_num = RELAY_MAP[port]

        ttk.Label(output_frame, text=f"Port P{port} (Pin {pin_num}):", width=15, anchor="w").grid(row=row_num, column=0, sticky=tk.W, padx=5, pady=2)

        

        on_btn = tk.Button(output_frame, text="ON", command=lambda p=port: toggle_relay(p, "ON"), width=8, bg=COLOR_OFF)

        on_btn.grid(row=row_num, column=1, padx=5, pady=2)

        

        off_btn = tk.Button(output_frame, text="OFF", command=lambda p=port: toggle_relay(p, "OFF"), width=8, relief=tk.SUNKEN, bg=COLOR_OFF) 

        off_btn.grid(row=row_num, column=2, padx=5, pady=2)

        

        buttons[port] = {'on': on_btn, 'off': off_btn}


    # --- 輸入狀態監控區塊 ---

    input_frame = ttk.LabelFrame(main_frame, text="INPUT Monitor (Pin State)", padding="10")

    input_frame.grid(row=1, column=1, padx=10, pady=10, sticky=tk.W)


    for port in range(1, 9):

        port_key = f"P{port}"

        pin_num = INPUT_MAP[port]

        row_num = port

        

        ttk.Label(input_frame, text=f"Input {port_key} (Pin {pin_num}):", width=15, anchor="w").grid(row=row_num, column=0, sticky=tk.W, padx=5, pady=2)

        

        led = tk.Label(input_frame, text=TEXT_LOW, width=4, relief=tk.RAISED, bg=COLOR_INPUT_LOW)

        led.grid(row=row_num, column=1, padx=5, pady=2)

        

        input_leds[port_key] = led 


    # --- 啟動 MQTT 和 Telegram ---

    try:

        # 啟動 Telegram Bot (在獨立線程中)

        if TELEGRAM_ENABLED:

            telegram_thread = threading.Thread(target=run_telegram_bot)

            telegram_thread.daemon = True # 確保主程式退出時 Bot 線程也會退出

            telegram_thread.start()

            

        # 啟動 MQTT

        client.connect(MQTT_BROKER, MQTT_PORT, 60)

        client.loop_start() 

        

    except Exception as e:

        messagebox.showerror("連線錯誤", f"無法連接到 MQTT Broker 或啟動 Bot: {e}")

        if status_label:

            status_label.config(text="連線錯誤,請檢查網路", foreground="red")



    # 運行主循環並確保程式退出時關閉所有線程

    def on_closing():

        print("👋 關閉應用程式...")

        client.loop_stop()

        client.disconnect()

        

        if TELEGRAM_ENABLED and application:

            # 停止 Telegram Bot 的 polling

            application.stop_running() 

            

        root.destroy()


    root.protocol("WM_DELETE_WINDOW", on_closing)

    root.mainloop()


    沒有留言:

    張貼留言

    ESP32 (ESP-IDF in VS Code) MFRC522 + MQTT + PYTHON TKinter +SQLite

     ESP32 (ESP-IDF in VS Code) MFRC522 + MQTT + PYTHON TKinter +SQLite  ESP32 VS Code 程式 ; PlatformIO Project Configuration File ; ;   Build op...