Telegram Bot (Python) 與 MQTT Broker 搭配 Wokwi ESP32 (Arduino)。
提供的 ESP32 程式碼已經是為了這種架構設計的,它透過 MQTT Broker 與外部通訊。而 Telegram Bot (用 Python aiogram 寫的) 則負責將 Telegram 訊息轉換成 MQTT 訊息,並將來自 ESP32 的 MQTT 數據回傳給 Telegram。
主要的「控制」邏輯是在 Telegram Bot 的 Python 程式中實現的,因為它負責解析你的 Telegram 指令 (/led_on 等) 並將其發送到 MQTT Broker,以及接收來自 ESP32 的數據並發送回 Telegram。
Wokwi ESP32 程式碼
請確保你的 Wokwi sketch.ino 檔案中是以下內容。這個程式碼會讓 ESP32 連接到 MQTT Broker,訂閱 LED 控制指令,並發布溫濕度數據。
#include <WiFi.h>
#include <PubSubClient.h>
#include <DHT.h>
#include <DHT_U.h> // 需要同時包含 DHT_U.h
// --- Wi-Fi 設定 ---
const char* ssid = "Wokwi-GUEST"; // 如果在 Wokwi 上測試,請使用 "Wokwi-GUEST"
const char* password = ""; // 對於 Wokwi-GUEST,密碼為空
// --- MQTT 設定 ---
const char* mqtt_server = "broker.mqttgo.io"; //"broker.hivemq.com"; 或 "mqtt.eclipseprojects.io"const int mqtt_port = 1883;
// 重要的:為你的 ESP32 指定一個獨特的 MQTT 客戶端 ID,以避免衝突
const char* mqtt_client_id = "ESP32_Wokwi_Telegram_IoT_YourName_001"; // <<< 請更換為你的唯一 ID!
// --- MQTT 主題定義 ---
// 這些主題名稱必須與你的 Python Telegram Bot 程式碼完全一致!
const char* TELEGRAM_MQTT_TOPIC_LED_CONTROL = "telegram_iot/esp32/led_control"; // Telegram Bot 發布 LED 指令到這裡
const char* TELEGRAM_MQTT_TOPIC_TEMPERATURE = "telegram_iot/esp32/temperature"; // ESP32 發布溫度到這裡
const char* TELEGRAM_MQTT_TOPIC_HUMIDITY = "telegram_iot/esp32/humidity"; // ESP32 發布濕度到這裡
const char* TELEGRAM_MQTT_TOPIC_STATUS = "telegram_iot/esp32/status"; // ESP32 發布上線狀態到這裡 (可選)
// --- WiFi 和 MQTT 客戶端物件 ---
WiFiClient espClient;
PubSubClient client(espClient);
// --- LED 設定 ---
const int ledPin = 2; // 連接到 ESP32 的 GPIO 2
enum LedMode { OFF, ON, FLASH, TIMER }; // OFF 放在最前面,作為預設安全值
volatile LedMode currentLedMode = OFF;
volatile unsigned long timerStartTime = 0;
volatile bool ledState = false; // 用於閃爍模式
// --- DHT22 感測器設定 ---
#define DHTPIN 4 // 連接到 ESP32 的 GPIO 4
#define DHTTYPE DHT22 // DHT 22 (AM2302)
DHT dht(DHTPIN, DHTTYPE);
// --- FreeRTOS 任務句柄 ---
TaskHandle_t TaskLEDControl = NULL;
TaskHandle_t TaskDHTSensor = NULL;
// --- 函式宣告 ---
void setup_wifi();
void reconnect_mqtt();
void mqtt_callback(char* topic, byte* payload, unsigned int length);
void ledControlTask(void *pvParameters);
void dhtSensorTask(void *pvParameters);
void setup() {
Serial.begin(115200);
pinMode(ledPin, OUTPUT);
digitalWrite(ledPin, LOW); // 確保 LED 初始是關閉的
Serial.println("\n--- ESP32 啟動中 ---");
Serial.println("連接 Wi-Fi...");
setup_wifi(); // 連接 Wi-Fi
client.setServer(mqtt_server, mqtt_port); // 設定 MQTT Broker
client.setCallback(mqtt_callback); // 設定 MQTT 訊息回調函式
dht.begin(); // 初始化 DHT 感測器
// 創建 LED 控制任務,運行在 Core 0
xTaskCreatePinnedToCore(
ledControlTask, /* 任務函式 */
"LED Control", /* 任務名稱 */
2048, /* 堆疊大小 (字節) */
NULL, /* 任務參數 */
1, /* 任務優先級 */
&TaskLEDControl, /* 任務句柄 */
0 /* 運行在 Core 0 */
);
Serial.println("LED 控制任務已在 Core 0 創建。");
// 創建 DHT 感測器任務,運行在 Core 1
xTaskCreatePinnedToCore(
dhtSensorTask, /* 任務函式 */
"DHT Sensor", /* 任務名稱 */
4096, /* 堆疊大小 (字節) */
NULL, /* 任務參數 */
1, /* 任務優先級 */
&TaskDHTSensor, /* 任務句柄 */
1 /* 運行在 Core 1 */
);
Serial.println("DHT 感測器任務已在 Core 1 創建。");
Serial.println("--- 設定完成 ---");
Serial.println("等待 MQTT 連線...");
}
void loop() {
// 主循環中只負責維持 MQTT 連線和處理 MQTT 訊息
if (!client.connected()) {
reconnect_mqtt();
}
client.loop(); // 處理所有傳入和傳出的 MQTT 訊息
delay(10); // 短暫延遲,避免佔用過多 CPU 資源
}
// --- Wi-Fi 連線函式 ---
void setup_wifi() {
delay(10);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("\nWiFi 已連線!");
Serial.print("IP 位址: ");
Serial.println(WiFi.localIP());
}
// --- MQTT 重連函式 ---
void reconnect_mqtt() {
while (!client.connected()) {
Serial.print("嘗試 MQTT 連線...");
// 嘗試連線
if (client.connect(mqtt_client_id)) {
Serial.println("已連線!");
// 連線成功後,訂閱 LED 控制主題,接收來自 Telegram Bot 的指令
client.subscribe(TELEGRAM_MQTT_TOPIC_LED_CONTROL);
Serial.print("已訂閱: ");
Serial.println(TELEGRAM_MQTT_TOPIC_LED_CONTROL);
// 可選:發布上線狀態,讓 Telegram Bot 知道 ESP32 已啟動
client.publish(TELEGRAM_MQTT_TOPIC_STATUS, "ESP32_online");
Serial.println("已發布 ESP32 上線狀態。");
Serial.println("\n--- Telegram 控制指令提示 ---");
Serial.println("你可以使用以下 Telegram 指令:");
Serial.println(" /led_on - 開啟 LED");
Serial.println(" /led_off - 關閉 LED");
Serial.println(" /led_flash - 讓 LED 閃爍");
Serial.println(" /led_timer - LED 亮 10 秒後關閉");
Serial.println(" /status - 查詢溫度/濕度/狀態");
Serial.println("----------------------------------");
} else {
Serial.print("連線失敗, 返回碼=");
Serial.print(client.state());
Serial.println(" 5 秒後重試...");
delay(5000); // 等待 5 秒後重試
}
}
}
// --- MQTT 訊息回調函式 ---
// 當收到訂閱主題的訊息時,此函式會被調用
void mqtt_callback(char* topic, byte* payload, unsigned int length) {
Serial.print("\n收到訊息 [");
Serial.print(topic);
Serial.print("] ");
String message = "";
for (int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.println(message);
// 檢查是否為 LED 控制指令
if (String(topic) == TELEGRAM_MQTT_TOPIC_LED_CONTROL) {
if (message == "on") {
currentLedMode = ON;
digitalWrite(ledPin, HIGH);
Serial.println("LED 控制: 開啟 (來自 Telegram)");
} else if (message == "off") {
currentLedMode = OFF;
digitalWrite(ledPin, LOW);
Serial.println("LED 控制: 關閉 (來自 Telegram)");
} else if (message == "flash") {
currentLedMode = FLASH;
Serial.println("LED 控制: 閃爍 (來自 Telegram)");
} else if (message == "timer") {
currentLedMode = TIMER;
digitalWrite(ledPin, HIGH); // 定時模式開始時先開啟 LED
timerStartTime = millis();
Serial.println("LED 控制: 定時 (10 秒,來自 Telegram)");
} else {
Serial.println("收到來自 Telegram 的未知 LED 指令。");
}
}
}
// --- LED 控制任務 (運行在 Core 0) ---
void ledControlTask(void *pvParameters) {
(void) pvParameters; // 避免編譯器警告
for (;;) { // 無限循環
switch (currentLedMode) {
case ON:
// LED 保持亮著,狀態由 mqtt_callback 設置
break;
case OFF:
// LED 保持熄滅,狀態由 mqtt_callback 設置
break;
case FLASH:
digitalWrite(ledPin, ledState); // 切換 LED 狀態
ledState = !ledState; // 反轉狀態,用於下次切換
vTaskDelay(pdMS_TO_TICKS(500)); // 延遲 500ms
break;
case TIMER:
if (millis() - timerStartTime >= 10000) { // 檢查是否已過 10 秒
digitalWrite(ledPin, LOW);
currentLedMode = OFF; // 定時結束後轉為 OFF 模式
Serial.println("LED 定時結束。LED 關閉。");
}
vTaskDelay(pdMS_TO_TICKS(10)); // 短暫延遲,讓出 CPU
break;
default:
digitalWrite(ledPin, LOW); // 預設為關閉
break;
}
vTaskDelay(pdMS_TO_TICKS(10)); // 短暫延遲,讓其他任務有機會執行
}
}
// --- DHT 感測器任務 (運行在 Core 1) ---
void dhtSensorTask(void *pvParameters) {
(void) pvParameters; // 避免編譯器警告
for (;;) { // 無限循環
// 定期讀取並發布
float h = dht.readHumidity();
float t = dht.readTemperature(); // 讀取攝氏溫度
// 檢查是否讀取失敗
if (isnan(h) || isnan(t)) {
Serial.println(F("從 DHT 感測器讀取失敗!重試中..."));
} else {
Serial.print(F("DHT 讀取: 濕度: "));
Serial.print(h);
Serial.print(F("% 溫度: "));
Serial.print(t);
Serial.println(F("°C"));
// 發布溫度
char tempString[8];
dtostrf(t, 4, 2, tempString); // 浮點數轉字串 (總共 4 位,小數點後 2 位)
client.publish(TELEGRAM_MQTT_TOPIC_TEMPERATURE, tempString);
// 發布濕度
char humString[8];
dtostrf(h, 4, 2, humString);
client.publish(TELEGRAM_MQTT_TOPIC_HUMIDITY, humString);
}
vTaskDelay(pdMS_TO_TICKS(10000)); // 每 10 秒發布一次數據 (可調整)
}
}
Telegram Bot 後端程式碼 (Python aiogram) - 這個是關鍵!
這個 Python 程式碼就是負責讓你的 Telegram 訊息與 ESP32 溝通的「大腦」。它需要運行在一個可以連網的環境中。
運行前準備:
取得你的 Telegram Bot Token:
在 Telegram App 裡,搜尋並找到
BotFather。傳送
/newbot指令,按照步驟建立你的 Bot,你會得到一個 HTTP API Token (一串像123456789:ABCDE...的字串)。請務必記下這個 Token。
安裝 Python 函式庫: 打開你的終端機或命令提示字元,執行:
Bashpip install aiogram paho-mqttPython 程式碼:將以下程式碼儲存為
telegram_bot_controller.py。請務必將YOUR_TELEGRAM_BOT_TOKEN替換為你實際的 Token!
import asyncio
import logging
import os
import paho.mqtt.client as mqtt
from aiogram import Bot, Dispatcher, types
from aiogram.client.default import DefaultBotProperties # aiogram 3.7.0+ 需要
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart, Command
from aiogram.utils.markdown import hbold
# --- Log 設定 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- Telegram Bot 設定 ---
# 從環境變數獲取 Bot Token,若無則使用預設值 (在正式環境中,強烈建議使用環境變數)
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "YOUR_TELEGRAM_BOT_TOKEN") # <<< 請替換成你的 Telegram Bot Token!
# 初始化 Bot,並設定預設的解析模式為 HTML
bot = Bot(token=TELEGRAM_BOT_TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher() # 初始化 Dispatcher
# --- MQTT 設定 (必須與 ESP32 程式碼中的主題名稱一致) ---
MQTT_BROKER = "broker.hivemq.com"
MQTT_PORT = 1883
MQTT_CLIENT_ID = "Telegram_Bot_Backend_Client_YourName_001" # <<< 這個 Bot 也需要一個獨特的 MQTT 客戶端 ID!
# MQTT 主題 (與 ESP32 程式碼一致)
TELEGRAM_MQTT_TOPIC_LED_CONTROL = "telegram_iot/esp32/led_control"
TELEGRAM_MQTT_TOPIC_TEMPERATURE = "telegram_iot/esp32/temperature"
TELEGRAM_MQTT_TOPIC_HUMIDITY = "telegram_iot/esp32/humidity"
TELEGRAM_MQTT_TOPIC_STATUS = "telegram_iot/esp32/status"
# --- 全域變數,用於儲存最新的感測器數據和設備狀態 ---
latest_temperature = "N/A"
latest_humidity = "N/A"
esp32_status = "Offline" # 初始狀態
# --- MQTT 回調函式 ---
def on_connect(client, userdata, flags, rc):
"""當 MQTT 客戶端連接到 Broker 時調用"""
if rc == 0:
logging.info(f"成功連接到 MQTT Broker: {MQTT_BROKER}")
# 連接成功後,訂閱溫濕度數據主題和設備狀態主題
client.subscribe(TELEGRAM_MQTT_TOPIC_TEMPERATURE)
client.subscribe(TELEGRAM_MQTT_TOPIC_HUMIDITY)
client.subscribe(TELEGRAM_MQTT_TOPIC_STATUS)
logging.info(f"已訂閱主題: {TELEGRAM_MQTT_TOPIC_TEMPERATURE}, {TELEGRAM_MQTT_TOPIC_HUMIDITY}, {TELEGRAM_MQTT_TOPIC_STATUS}")
else:
logging.error(f"無法連接到 MQTT Broker, 返回碼: {rc}")
def on_message(client, userdata, msg):
"""當收到訂閱主題的訊息時調用"""
global latest_temperature, latest_humidity, esp32_status
topic = msg.topic
payload = msg.payload.decode()
logging.info(f"MQTT 訊息已接收 - 主題: '{topic}', 內容: '{payload}'")
if topic == TELEGRAM_MQTT_TOPIC_TEMPERATURE:
latest_temperature = payload
elif topic == TELEGRAM_MQTT_TOPIC_HUMIDITY:
latest_humidity = payload
elif topic == TELEGRAM_MQTT_TOPIC_STATUS:
esp32_status = payload
logging.info(f"ESP32 狀態已更新: {esp32_status}")
# --- 初始化 MQTT 客戶端 ---
mqtt_client = mqtt.Client(client_id=MQTT_CLIENT_ID)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
try:
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
mqtt_client.loop_start() # 在背景執行緒中處理 MQTT 訊息,以免阻塞 Bot
except Exception as e:
logging.error(f"連接 MQTT Broker 失敗: {e}")
# --- Telegram Bot 命令處理函式 ---
@dp.message(CommandStart())
async def command_start_handler(message: types.Message) -> None:
"""處理 /start 命令"""
await message.answer(f"哈囉,{hbold(message.from_user.full_name)}!\n"
"我是你的物聯網助理。\n"
"你可以輸入以下指令來控制設備或查詢狀態:\n"
"/led_on - 開啟 LED\n"
"/led_off - 關閉 LED\n"
"/led_flash - 讓 LED 閃爍\n"
"/led_timer - LED 亮 10 秒後自動關閉\n"
"/status - 查詢溫濕度與設備狀態\n"
"/help - 查看所有指令")
@dp.message(Command("help"))
async def command_help_handler(message: types.Message) -> None:
"""處理 /help 命令"""
await message.answer("你可以輸入以下指令來控制設備或查詢狀態:\n"
"/led_on - 開啟 LED\n"
"/led_off - 關閉 LED\n"
"/led_flash - 讓 LED 閃爍\n"
"/led_timer - LED 亮 10 秒後自動關閉\n"
"/status - 查詢溫濕度與設備狀態\n")
@dp.message(Command("led_on"))
async def command_led_on_handler(message: types.Message) -> None:
"""處理 /led_on 命令,開啟 LED"""
mqtt_client.publish(TELEGRAM_MQTT_TOPIC_LED_CONTROL, "on")
await message.answer("💡 已送出開燈指令!")
@dp.message(Command("led_off"))
async def command_led_off_handler(message: types.Message) -> None:
"""處理 /led_off 命令,關閉 LED"""
mqtt_client.publish(TELEGRAM_MQTT_TOPIC_LED_CONTROL, "off")
await message.answer("💡 已送出關燈指令!")
@dp.message(Command("led_flash"))
async def command_led_flash_handler(message: types.Message) -> None:
"""處理 /led_flash 命令,讓 LED 閃爍"""
mqtt_client.publish(TELEGRAM_MQTT_TOPIC_LED_CONTROL, "flash")
await message.answer("💡 已送出閃爍指令!")
@dp.message(Command("led_timer"))
async def command_led_timer_handler(message: types.Message) -> None:
"""處理 /led_timer 命令,LED 亮 10 秒後自動關閉"""
mqtt_client.publish(TELEGRAM_MQTT_TOPIC_LED_CONTROL, "timer")
await message.answer("💡 已送出定時關燈指令 (10秒後自動關閉)。")
@dp.message(Command("status"))
async def command_status_handler(message: types.Message) -> None:
"""處理 /status 命令,查詢溫濕度與設備狀態"""
# 從全域變數獲取最新的溫濕度數據和設備狀態
response_text = (
f"🏡 環境數據:\n"
f"目前溫度: {latest_temperature}°C\n"
f"目前濕度: {latest_humidity}%\n"
f"設備狀態: {esp32_status}"
)
await message.answer(response_text)
@dp.message()
async def echo_handler(message: types.Message) -> None:
"""處理所有其他非命令的文字訊息"""
await message.answer("抱歉,我不太明白您的意思。\n"
"請使用指令,例如 /led_on 或 /status。")
# --- 啟動 Bot ---
async def main() -> None:
logging.info("啟動 Telegram Bot...")
# 運行 Bot,處理所有傳入更新
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())
操作步驟
啟動 Wokwi ESP32:
在 Wokwi 上載入並運行上面的 ESP32 Arduino 程式碼。
確認 電路圖 正確。
檢查 Wokwi 的 Serial Monitor,確保 ESP32 成功連接 Wi-Fi 和 MQTT Broker,並開始發布溫濕度數據。你也會看到它印出 Telegram 控制的提示指令。
運行 Telegram Bot 後端 (Python):
確保你已經用
pip install aiogram paho-mqtt安裝了必要的函式庫。將上面的 Python Telegram Bot 後端程式碼 儲存為
telegram_bot_controller.py。非常重要:替換程式碼中的
YOUR_TELEGRAM_BOT_TOKEN!打開終端機或命令提示字元,導航到該檔案所在目錄。
執行:
python telegram_bot_controller.py。你的終端機應該會顯示 MQTT 連線成功的訊息和 Bot 啟動的日誌。
透過 Telegram 應用程式控制:
在你的 Telegram App 中,搜尋你用 BotFather 建立的 Bot 的使用者名稱 (例如
@MyESP32_IoT_bot)。點擊它並開始對話。
傳送
/start:Bot 會回覆歡迎訊息和可用指令列表。控制 LED:
傳送
/led_on,觀察 Wokwi 中的 LED 是否亮起。傳送
/led_off,觀察 LED 是否熄滅。傳送
/led_flash,觀察 LED 是否開始閃爍。傳送
/led_timer,觀察 LED 亮起 10 秒後是否自動熄滅。
獲取感測器數據:
傳送
/status,Bot 會回覆最新的溫度、濕度和 ESP32 的連線狀態。
這樣,你就實現了直接透過 Telegram 程式控制 Wokwi ESP32 的功能,而不需要另外的 Tkinter GUI 應用程式。這個 Python Bot 程式就是你的主要控制介面。










沒有留言:
張貼留言