2025年7月8日 星期二

ESP32 雙核心控制 LED 與 DHT22 溫濕度感測器 (Wokwi 模擬) EX3 --Python + Telegram

 Telegram Bot (Python)MQTT Broker 搭配 Wokwi ESP32 (Arduino)

提供的 ESP32 程式碼已經是為了這種架構設計的,它透過 MQTT Broker 與外部通訊。而 Telegram Bot (用 Python aiogram 寫的) 則負責將 Telegram 訊息轉換成 MQTT 訊息,並將來自 ESP32 的 MQTT 數據回傳給 Telegram。

主要的「控制」邏輯是在 Telegram Bot 的 Python 程式中實現的,因為它負責解析你的 Telegram 指令 (/led_on 等) 並將其發送到 MQTT Broker,以及接收來自 ESP32 的數據並發送回 Telegram。


 Wokwi ESP32 程式碼 

請確保你的 Wokwi sketch.ino 檔案中是以下內容。這個程式碼會讓 ESP32 連接到 MQTT Broker,訂閱 LED 控制指令,並發布溫濕度數據。

C++
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <DHT_U.h> // 需要同時包含 DHT_U.h

// --- Wi-Fi 設定 ---
const char* ssid = "Wokwi-GUEST"; // 如果在 Wokwi 上測試,請使用 "Wokwi-GUEST"
const char* password = "";        // 對於 Wokwi-GUEST,密碼為空

// --- MQTT 設定 ---
const char* mqtt_server = "broker.mqttgo.io"; //"broker.hivemq.com"; 或 "mqtt.eclipseprojects.io"
const int mqtt_port = 1883;
// 重要的:為你的 ESP32 指定一個獨特的 MQTT 客戶端 ID,以避免衝突
const char* mqtt_client_id = "ESP32_Wokwi_Telegram_IoT_YourName_001"; // <<< 請更換為你的唯一 ID!

// --- MQTT 主題定義 ---
// 這些主題名稱必須與你的 Python Telegram Bot 程式碼完全一致!
const char* TELEGRAM_MQTT_TOPIC_LED_CONTROL = "telegram_iot/esp32/led_control";   // Telegram Bot 發布 LED 指令到這裡
const char* TELEGRAM_MQTT_TOPIC_TEMPERATURE = "telegram_iot/esp32/temperature";   // ESP32 發布溫度到這裡
const char* TELEGRAM_MQTT_TOPIC_HUMIDITY = "telegram_iot/esp32/humidity";     // ESP32 發布濕度到這裡
const char* TELEGRAM_MQTT_TOPIC_STATUS = "telegram_iot/esp32/status";       // ESP32 發布上線狀態到這裡 (可選)

// --- WiFi 和 MQTT 客戶端物件 ---
WiFiClient espClient;
PubSubClient client(espClient);

// --- LED 設定 ---
const int ledPin = 2; // 連接到 ESP32 的 GPIO 2
enum LedMode { OFF, ON, FLASH, TIMER }; // OFF 放在最前面,作為預設安全值
volatile LedMode currentLedMode = OFF;
volatile unsigned long timerStartTime = 0;
volatile bool ledState = false; // 用於閃爍模式

// --- DHT22 感測器設定 ---
#define DHTPIN 4      // 連接到 ESP32 的 GPIO 4
#define DHTTYPE DHT22 // DHT 22 (AM2302)
DHT dht(DHTPIN, DHTTYPE);

// --- FreeRTOS 任務句柄 ---
TaskHandle_t TaskLEDControl = NULL;
TaskHandle_t TaskDHTSensor = NULL;

// --- 函式宣告 ---
void setup_wifi();
void reconnect_mqtt();
void mqtt_callback(char* topic, byte* payload, unsigned int length);
void ledControlTask(void *pvParameters);
void dhtSensorTask(void *pvParameters);

void setup() {
  Serial.begin(115200);
  pinMode(ledPin, OUTPUT);
  digitalWrite(ledPin, LOW); // 確保 LED 初始是關閉的

  Serial.println("\n--- ESP32 啟動中 ---");
  Serial.println("連接 Wi-Fi...");
  setup_wifi(); // 連接 Wi-Fi

  client.setServer(mqtt_server, mqtt_port); // 設定 MQTT Broker
  client.setCallback(mqtt_callback);       // 設定 MQTT 訊息回調函式

  dht.begin(); // 初始化 DHT 感測器

  // 創建 LED 控制任務,運行在 Core 0
  xTaskCreatePinnedToCore(
    ledControlTask,    /* 任務函式 */
    "LED Control",     /* 任務名稱 */
    2048,              /* 堆疊大小 (字節) */
    NULL,              /* 任務參數 */
    1,                 /* 任務優先級 */
    &TaskLEDControl,   /* 任務句柄 */
    0                  /* 運行在 Core 0 */
  );
  Serial.println("LED 控制任務已在 Core 0 創建。");

  // 創建 DHT 感測器任務,運行在 Core 1
  xTaskCreatePinnedToCore(
    dhtSensorTask,     /* 任務函式 */
    "DHT Sensor",      /* 任務名稱 */
    4096,              /* 堆疊大小 (字節) */
    NULL,              /* 任務參數 */
    1,                 /* 任務優先級 */
    &TaskDHTSensor,    /* 任務句柄 */
    1                  /* 運行在 Core 1 */
  );
  Serial.println("DHT 感測器任務已在 Core 1 創建。");
  Serial.println("--- 設定完成 ---");
  Serial.println("等待 MQTT 連線...");
}

void loop() {
  // 主循環中只負責維持 MQTT 連線和處理 MQTT 訊息
  if (!client.connected()) {
    reconnect_mqtt();
  }
  client.loop(); // 處理所有傳入和傳出的 MQTT 訊息
  delay(10); // 短暫延遲,避免佔用過多 CPU 資源
}

// --- Wi-Fi 連線函式 ---
void setup_wifi() {
  delay(10);
  WiFi.begin(ssid, password);

  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("\nWiFi 已連線!");
  Serial.print("IP 位址: ");
  Serial.println(WiFi.localIP());
}

// --- MQTT 重連函式 ---
void reconnect_mqtt() {
  while (!client.connected()) {
    Serial.print("嘗試 MQTT 連線...");
    // 嘗試連線
    if (client.connect(mqtt_client_id)) {
      Serial.println("已連線!");
      // 連線成功後,訂閱 LED 控制主題,接收來自 Telegram Bot 的指令
      client.subscribe(TELEGRAM_MQTT_TOPIC_LED_CONTROL);
      Serial.print("已訂閱: ");
      Serial.println(TELEGRAM_MQTT_TOPIC_LED_CONTROL);

      // 可選:發布上線狀態,讓 Telegram Bot 知道 ESP32 已啟動
      client.publish(TELEGRAM_MQTT_TOPIC_STATUS, "ESP32_online");
      Serial.println("已發布 ESP32 上線狀態。");
      Serial.println("\n--- Telegram 控制指令提示 ---");
      Serial.println("你可以使用以下 Telegram 指令:");
      Serial.println("  /led_on - 開啟 LED");
      Serial.println("  /led_off - 關閉 LED");
      Serial.println("  /led_flash - 讓 LED 閃爍");
      Serial.println("  /led_timer - LED 亮 10 秒後關閉");
      Serial.println("  /status - 查詢溫度/濕度/狀態");
      Serial.println("----------------------------------");
    } else {
      Serial.print("連線失敗, 返回碼=");
      Serial.print(client.state());
      Serial.println(" 5 秒後重試...");
      delay(5000); // 等待 5 秒後重試
    }
  }
}

// --- MQTT 訊息回調函式 ---
// 當收到訂閱主題的訊息時,此函式會被調用
void mqtt_callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("\n收到訊息 [");
  Serial.print(topic);
  Serial.print("] ");
  String message = "";
  for (int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  Serial.println(message);

  // 檢查是否為 LED 控制指令
  if (String(topic) == TELEGRAM_MQTT_TOPIC_LED_CONTROL) {
    if (message == "on") {
      currentLedMode = ON;
      digitalWrite(ledPin, HIGH);
      Serial.println("LED 控制: 開啟 (來自 Telegram)");
    } else if (message == "off") {
      currentLedMode = OFF;
      digitalWrite(ledPin, LOW);
      Serial.println("LED 控制: 關閉 (來自 Telegram)");
    } else if (message == "flash") {
      currentLedMode = FLASH;
      Serial.println("LED 控制: 閃爍 (來自 Telegram)");
    } else if (message == "timer") {
      currentLedMode = TIMER;
      digitalWrite(ledPin, HIGH); // 定時模式開始時先開啟 LED
      timerStartTime = millis();
      Serial.println("LED 控制: 定時 (10 秒,來自 Telegram)");
    } else {
      Serial.println("收到來自 Telegram 的未知 LED 指令。");
    }
  }
}

// --- LED 控制任務 (運行在 Core 0) ---
void ledControlTask(void *pvParameters) {
  (void) pvParameters; // 避免編譯器警告

  for (;;) { // 無限循環
    switch (currentLedMode) {
      case ON:
        // LED 保持亮著,狀態由 mqtt_callback 設置
        break;
      case OFF:
        // LED 保持熄滅,狀態由 mqtt_callback 設置
        break;
      case FLASH:
        digitalWrite(ledPin, ledState); // 切換 LED 狀態
        ledState = !ledState;           // 反轉狀態,用於下次切換
        vTaskDelay(pdMS_TO_TICKS(500)); // 延遲 500ms
        break;
      case TIMER:
        if (millis() - timerStartTime >= 10000) { // 檢查是否已過 10 秒
          digitalWrite(ledPin, LOW);
          currentLedMode = OFF; // 定時結束後轉為 OFF 模式
          Serial.println("LED 定時結束。LED 關閉。");
        }
        vTaskDelay(pdMS_TO_TICKS(10)); // 短暫延遲,讓出 CPU
        break;
      default:
        digitalWrite(ledPin, LOW); // 預設為關閉
        break;
    }
    vTaskDelay(pdMS_TO_TICKS(10)); // 短暫延遲,讓其他任務有機會執行
  }
}

// --- DHT 感測器任務 (運行在 Core 1) ---
void dhtSensorTask(void *pvParameters) {
  (void) pvParameters; // 避免編譯器警告

  for (;;) { // 無限循環
    // 定期讀取並發布
    float h = dht.readHumidity();
    float t = dht.readTemperature(); // 讀取攝氏溫度

    // 檢查是否讀取失敗
    if (isnan(h) || isnan(t)) {
      Serial.println(F("從 DHT 感測器讀取失敗!重試中..."));
    } else {
      Serial.print(F("DHT 讀取: 濕度: "));
      Serial.print(h);
      Serial.print(F("%  溫度: "));
      Serial.print(t);
      Serial.println(F("°C"));

      // 發布溫度
      char tempString[8];
      dtostrf(t, 4, 2, tempString); // 浮點數轉字串 (總共 4 位,小數點後 2 位)
      client.publish(TELEGRAM_MQTT_TOPIC_TEMPERATURE, tempString);

      // 發布濕度
      char humString[8];
      dtostrf(h, 4, 2, humString);
      client.publish(TELEGRAM_MQTT_TOPIC_HUMIDITY, humString);
    }
    vTaskDelay(pdMS_TO_TICKS(10000)); // 每 10 秒發布一次數據 (可調整)
  }
}

Telegram Bot 後端程式碼 (Python aiogram) - 這個是關鍵!

這個 Python 程式碼就是負責讓你的 Telegram 訊息與 ESP32 溝通的「大腦」。它需要運行在一個可以連網的環境中。

運行前準備:

  1. 取得你的 Telegram Bot Token

    • 在 Telegram App 裡,搜尋並找到 BotFather

    • 傳送 /newbot 指令,按照步驟建立你的 Bot,你會得到一個 HTTP API Token (一串像 123456789:ABCDE... 的字串)。請務必記下這個 Token。

  2. 安裝 Python 函式庫: 打開你的終端機或命令提示字元,執行:

    Bash
    pip install aiogram paho-mqtt
    
  3. Python 程式碼:將以下程式碼儲存為 telegram_bot_controller.py請務必將 YOUR_TELEGRAM_BOT_TOKEN 替換為你實際的 Token!

Python
import asyncio
import logging
import os
import paho.mqtt.client as mqtt
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties # aiogram 3.7.0+ 需要
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.utils.markdown import hbold

# --- Log 設定 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Telegram Bot 設定 ---
# 從環境變數獲取 Bot Token,若無則使用預設值 (在正式環境中,強烈建議使用環境變數)
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "YOUR_TELEGRAM_BOT_TOKEN") # <<< 請替換成你的 Telegram Bot Token!

# 初始化 Bot,並設定預設的解析模式為 HTML
bot = Bot(token=TELEGRAM_BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher() # 初始化 Dispatcher

# --- MQTT 設定 (必須與 ESP32 程式碼中的主題名稱一致) ---
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "Telegram_Bot_Backend_Client_YourName_001" # <<< 這個 Bot 也需要一個獨特的 MQTT 客戶端 ID!

# MQTT 主題 (與 ESP32 程式碼一致)
TELEGRAM_MQTT_TOPIC_LED_CONTROL = "telegram_iot/esp32/led_control"
TELEGRAM_MQTT_TOPIC_TEMPERATURE = "telegram_iot/esp32/temperature"
TELEGRAM_MQTT_TOPIC_HUMIDITY = "telegram_iot/esp32/humidity"
TELEGRAM_MQTT_TOPIC_STATUS = "telegram_iot/esp32/status"

# --- 全域變數,用於儲存最新的感測器數據和設備狀態 ---
latest_temperature = "N/A"
latest_humidity = "N/A"
esp32_status = "Offline" # 初始狀態

# --- MQTT 回調函式 ---
def on_connect(client, userdata, flags, rc):
    """當 MQTT 客戶端連接到 Broker 時調用"""
    if rc == 0:
        logging.info(f"成功連接到 MQTT Broker: {MQTT_BROKER}")
        # 連接成功後,訂閱溫濕度數據主題和設備狀態主題
        client.subscribe(TELEGRAM_MQTT_TOPIC_TEMPERATURE)
        client.subscribe(TELEGRAM_MQTT_TOPIC_HUMIDITY)
        client.subscribe(TELEGRAM_MQTT_TOPIC_STATUS)
        logging.info(f"已訂閱主題: {TELEGRAM_MQTT_TOPIC_TEMPERATURE}, {TELEGRAM_MQTT_TOPIC_HUMIDITY}, {TELEGRAM_MQTT_TOPIC_STATUS}")
    else:
        logging.error(f"無法連接到 MQTT Broker, 返回碼: {rc}")

def on_message(client, userdata, msg):
    """當收到訂閱主題的訊息時調用"""
    global latest_temperature, latest_humidity, esp32_status
    topic = msg.topic
    payload = msg.payload.decode()
    logging.info(f"MQTT 訊息已接收 - 主題: '{topic}', 內容: '{payload}'")

    if topic == TELEGRAM_MQTT_TOPIC_TEMPERATURE:
        latest_temperature = payload
    elif topic == TELEGRAM_MQTT_TOPIC_HUMIDITY:
        latest_humidity = payload
    elif topic == TELEGRAM_MQTT_TOPIC_STATUS:
        esp32_status = payload
        logging.info(f"ESP32 狀態已更新: {esp32_status}")

# --- 初始化 MQTT 客戶端 ---
mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

try:
    mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
    mqtt_client.loop_start() # 在背景執行緒中處理 MQTT 訊息,以免阻塞 Bot
except Exception as e:
    logging.error(f"連接 MQTT Broker 失敗: {e}")

# --- Telegram Bot 命令處理函式 ---

@dp.message(CommandStart())
async def command_start_handler(message: types.Message) -> None:
    """處理 /start 命令"""
    await message.answer(f"哈囉,{hbold(message.from_user.full_name)}!\n"
                         "我是你的物聯網助理。\n"
                         "你可以輸入以下指令來控制設備或查詢狀態:\n"
                         "/led_on - 開啟 LED\n"
                         "/led_off - 關閉 LED\n"
                         "/led_flash - 讓 LED 閃爍\n"
                         "/led_timer - LED 亮 10 秒後自動關閉\n"
                         "/status - 查詢溫濕度與設備狀態\n"
                         "/help - 查看所有指令")

@dp.message(Command("help"))
async def command_help_handler(message: types.Message) -> None:
    """處理 /help 命令"""
    await message.answer("你可以輸入以下指令來控制設備或查詢狀態:\n"
                         "/led_on - 開啟 LED\n"
                         "/led_off - 關閉 LED\n"
                         "/led_flash - 讓 LED 閃爍\n"
                         "/led_timer - LED 亮 10 秒後自動關閉\n"
                         "/status - 查詢溫濕度與設備狀態\n")

@dp.message(Command("led_on"))
async def command_led_on_handler(message: types.Message) -> None:
    """處理 /led_on 命令,開啟 LED"""
    mqtt_client.publish(TELEGRAM_MQTT_TOPIC_LED_CONTROL, "on")
    await message.answer("💡 已送出開燈指令!")

@dp.message(Command("led_off"))
async def command_led_off_handler(message: types.Message) -> None:
    """處理 /led_off 命令,關閉 LED"""
    mqtt_client.publish(TELEGRAM_MQTT_TOPIC_LED_CONTROL, "off")
    await message.answer("💡 已送出關燈指令!")

@dp.message(Command("led_flash"))
async def command_led_flash_handler(message: types.Message) -> None:
    """處理 /led_flash 命令,讓 LED 閃爍"""
    mqtt_client.publish(TELEGRAM_MQTT_TOPIC_LED_CONTROL, "flash")
    await message.answer("💡 已送出閃爍指令!")

@dp.message(Command("led_timer"))
async def command_led_timer_handler(message: types.Message) -> None:
    """處理 /led_timer 命令,LED 亮 10 秒後自動關閉"""
    mqtt_client.publish(TELEGRAM_MQTT_TOPIC_LED_CONTROL, "timer")
    await message.answer("💡 已送出定時關燈指令 (10秒後自動關閉)。")

@dp.message(Command("status"))
async def command_status_handler(message: types.Message) -> None:
    """處理 /status 命令,查詢溫濕度與設備狀態"""
    # 從全域變數獲取最新的溫濕度數據和設備狀態
    response_text = (
        f"🏡 環境數據:\n"
        f"目前溫度: {latest_temperature}°C\n"
        f"目前濕度: {latest_humidity}%\n"
        f"設備狀態: {esp32_status}"
    )
    await message.answer(response_text)

@dp.message()
async def echo_handler(message: types.Message) -> None:
    """處理所有其他非命令的文字訊息"""
    await message.answer("抱歉,我不太明白您的意思。\n"
                         "請使用指令,例如 /led_on 或 /status。")

# --- 啟動 Bot ---
async def main() -> None:
    logging.info("啟動 Telegram Bot...")
    # 運行 Bot,處理所有傳入更新
    await dp.start_polling(bot)

if __name__ == "__main__":
    asyncio.run(main())

操作步驟

  1. 啟動 Wokwi ESP32

    • 在 Wokwi 上載入並運行上面的 ESP32 Arduino 程式碼

    • 確認 電路圖  正確。

    • 檢查 Wokwi 的 Serial Monitor,確保 ESP32 成功連接 Wi-Fi 和 MQTT Broker,並開始發布溫濕度數據。你也會看到它印出 Telegram 控制的提示指令。

  2. 運行 Telegram Bot 後端 (Python)

    • 確保你已經用 pip install aiogram paho-mqtt 安裝了必要的函式庫。

    • 將上面的 Python Telegram Bot 後端程式碼 儲存為 telegram_bot_controller.py

    • 非常重要:替換程式碼中的 YOUR_TELEGRAM_BOT_TOKEN

    • 打開終端機或命令提示字元,導航到該檔案所在目錄。

    • 執行:python telegram_bot_controller.py

    • 你的終端機應該會顯示 MQTT 連線成功的訊息和 Bot 啟動的日誌。

  3. 透過 Telegram 應用程式控制

    • 在你的 Telegram App 中,搜尋你用 BotFather 建立的 Bot 的使用者名稱 (例如 @MyESP32_IoT_bot)。

    • 點擊它並開始對話。

    • 傳送 /start:Bot 會回覆歡迎訊息和可用指令列表。

    • 控制 LED

      • 傳送 /led_on,觀察 Wokwi 中的 LED 是否亮起。

      • 傳送 /led_off,觀察 LED 是否熄滅。

      • 傳送 /led_flash,觀察 LED 是否開始閃爍。

      • 傳送 /led_timer,觀察 LED 亮起 10 秒後是否自動熄滅。

    • 獲取感測器數據

      • 傳送 /status,Bot 會回覆最新的溫度、濕度和 ESP32 的連線狀態。


這樣,你就實現了直接透過 Telegram 程式控制 Wokwi ESP32 的功能,而不需要另外的 Tkinter GUI 應用程式。這個 Python Bot 程式就是你的主要控制介面。












沒有留言:

張貼留言

ESP32 (ESP-IDF in VS Code) MFRC522 + MQTT + PYTHON TKinter +SQLite

 ESP32 (ESP-IDF in VS Code) MFRC522 + MQTT + PYTHON TKinter +SQLite  ESP32 VS Code 程式 ; PlatformIO Project Configuration File ; ;   Build op...