2026 作業3 RFID+ Telegram + Python Tkinter 練習 (Wokwi mfrc55 元件)
1) Telegram 電腦版 手機版 安裝
Wokwi 是一個完全在雲端(瀏覽器)執行的微處理器模擬器,它的組譯與編譯(Compile)工作都是在後端的雲端伺服器上即時處理。
原則上,Wokwi 的伺服器架構相當彈性,隨時都可以使用。但如果從伺服器負載與網路延遲的角度來看,以下幾個時間點通常反應最快、最不容易因為伺服器忙碌而出現超時(Timeout)或組譯失敗:
1. 避開歐美日間尖峰(台灣時間:深夜至清晨)
Wokwi 的使用者遍布全球,特別是歐美的學校、創客與工程師。
最佳時間: 台灣時間 上午 06:00 到 中午 12:00,或者 深夜 23:00 以後。
原因: 這段時間剛好是美國的深夜/清晨,以及歐洲的凌晨,全球整體的伺服器使用量處於低谷,系統資源最充裕。
2. 避開台灣與亞洲的教學熱門時段
許多大專院校和高職的電子、資訊科系(例如物聯網、微處理器專題課程)會使用 Wokwi 進行線上教學與實作。
建議避開: 週一至週五的 上午 09:00 - 12:00、下午 13:30 - 16:30。
原因: 當多個班級同時上課、幾百名學生一起按下「執行/編譯」時,短時間內會造成伺服器排隊現象。
💡 排除組譯失敗的工程實用建議
如果您在編譯 ESP32 或 Arduino 專案時經常遇到失敗,通常不一定是時間問題,更常見的是以下幾個隱性原因。可以透過以下方式大幅提升組譯成功率:
檢查第三方函式庫(Libraries): Wokwi 載入過多或不相容的函式庫有時會導致編譯器卡死。建議檢查
diagram.json或pip相關設定,確認函式庫版本是否衝突。避免過大的全域陣列: 如果在程式碼中宣告了過大的靜態陣列(特別是記憶體較小的晶片),容易觸發編譯器的記憶體配置錯誤。
瀏覽器快取殘留: 有時是前端瀏覽器與後端 WebSocket 連線斷開。如果發現一直卡在 "Compiling...",直接 重新整理網頁(F5) 或複製程式碼開一個新的 Wokwi 專案視窗,通常就能立刻解決。
- Android: 開啟 Google Play 商店,搜尋「Telegram」並點擊安裝。
- iOS (iPhone): 開啟 App Store,搜尋「Telegram」並下載。
- 安裝完成: 點擊「Start Messaging」,輸入手機號碼(若為台灣號碼,例如 0912-345-678,輸入 912345678 即可,不需加前綴 0)。
- 驗證與註冊: 輸入簡訊接收到的驗證碼,輸入名字後即完成。
- 官方網站: 訪問 telegram.org/dl 下載對應版本。
- 應用商店:Windows 用戶可在 Microsoft Store 直接搜尋下載。
- 安裝與登入: 執行安裝檔,輸入手機號碼,並使用手機端 Telegram 接收到的驗證碼登入。
- 繁體中文語言包: 在 Telegram 中開啟此連結 https://t.me/setlanguage/taiwan 並點擊「Change」即可。
- 隱私與安全: Telegram 使用電話號碼註冊,無需密碼。
- 網頁版: 若不便安裝應用程式,可使用 Web 網頁版。
(Wokwi 與 Telegram 二者溝通訊息反映比較慢)
📋 【系統使用指令手冊】
🔹 /on : 💡 開啟 LED (恆亮)
🔹 /off : 🌑 關閉 LED
🔹 /flash : ⚡ 進入 LED 閃爍模式
🔹 /timer : ⏳ 開啟 LED 持續 5 秒
🔹 /rfid : 🔍 讀取查詢最後一筆 UID
請點擊上方藍色指令字體直接進行遠端控制。
如何下載及安裝 telegram(電報) app 、如何中文化它的操作介面
https://alex9ufoexploer.blogspot.com/2025/04/telegram-app.html
- 下載: Android 開啟 Google Play,iOS 開啟 App Store,搜尋 "Telegram" 並安裝。
- 註冊: 開啟 APP,點擊 "Start Messaging",輸入手機號碼。
- 驗證: 輸入收到的簡訊驗證碼 (SMS) 完成登入。
- 下載: 前往 Telegram 官網 或 Microsoft Store 下載 Telegram Desktop。
- 安裝:執行下載的安裝檔,按指示完成安裝。
- 登入: 開啟程式,使用手機版 Telegram 掃描 QR Code 或輸入手機號碼驗證登入。
- 繁體中文(台灣)語言包: 點擊此連結切換。
- 切換方法: 點擊連結後,在彈出視窗點選 "Change" 或 "Apply Language"。
- 驗證碼: 若收不到簡訊,請檢查手機號碼是否正確,或嘗試使用語音驗證。
- 同步: 只要使用相同手機號碼,手機和電腦的對話紀錄會自動同步。
Telegram基本設定:帳號註冊、介面中文化、建立使用者、隱藏電話號碼
- 打開 Telegram,在搜尋欄輸入
@BotFather(請認明有藍色勾勾的官方帳號)並開始對話。 - 輸入指令
/newbot送出。 - 依序設定機器人資料:
- Name:設定機器人的顯示名稱(中英文皆可,後續可更改)。
- Username:設定機器人的帳號 ID(必須是英文,且結尾必須包含
bot,例如mytest_bot,此名稱具唯一性,註冊後無法輕易更改)。
- 建立成功後,BotFather 會回傳一長串訊息,其中包含一組 HTTP API Token(類似
123456789:ABCdefGhIJKlmNoPQRsTUVwxyZ)。請將它複製並妥善保存,這是串接程式的鑰匙。
- 在 Telegram 搜尋你剛剛設定的 Username(例如
@mytest_bot)並進入對話視窗。 - 點擊下方的 「Start」(開始)按鈕來啟動機器人。
- 回到 @BotFather 對話視窗,輸入並傳送
/mybots。 - 點選你剛剛建立的機器人名稱。
- 依序點擊 Bot Settings Group PrivacyTurn off(關閉隱私模式,若畫面顯示 Turn on 則代表已成功關閉)。
- 將你的機器人 (Bot) 加入該群組或頻道,並給予管理員權限。
- 在群組或頻道中隨意發送一則訊息(例如
hello)。 - 最快方式(電腦版限定):在該則訊息上點擊右鍵並選擇 Copy Message Link (複製訊息連結)。
- 連結格式會類似:
https://t.me - 其中的
-123456789(包含負號) 就是此群組的 Chat ID。
- 連結格式會類似:
- API 查詢方式:打開瀏覽器,輸入網址:
https://api.telegram.org/bot<你的_BOT_TOKEN>/getUpdates。在回傳的 JSON 原始碼中,尋找"chat":{"id":...}欄位,即可看到該群組或頻道的完整 Chat ID。
- 搜尋並加入 @RawDataBot 或 @getidsbot。
- 點擊「Start」。
- 機器人會直接回傳你目前的 Chat ID 及該次對話的相關資訊。若將它加入群組,也可以直接在群組內查詢該群組的 ID。
//=====================================================================
這份針對 Wokwi ESP32 平台開發的韌體,採用了 FreeRTOS 多工作業系統架構,將核心功能切分成多個獨立任務(Task),並分配到 ESP32 的雙核心(Core 0 與 Core 1)平行處理。這種設計能有效防止通訊網路(WiFi/MQTT/Telegram)延遲時導致硬體(RFID 掃描、LED 閃爍)產生卡頓。
以下為您進行全支程式碼的逐行詳細拆解說明:
一、 標頭檔導入 (Header Includes)
#include <SPI.h> // 用於與 MFRC522 RFID 模組進行 SPI 界面通訊
#include <MFRC522.h> // MFRC522 RFID 讀卡機驅動函式庫
#include <WiFi.h> // ESP32 基礎 Wi-Fi 連線功能
#include <WiFiClientSecure.h> // 安全加密的網路客戶端,為 Telegram HTTPS 連線必備
#include <UniversalTelegramBot.h> // Telegram Bot API 控制函式庫
#include <ArduinoJson.h> // Telegram 函式庫底層解析 JSON 封包必備
#include <Wire.h> // I2C 界面通訊函式庫(用於 LCD 螢幕)
#include <LiquidCrystal_I2C.h> // I2C 介面的 1602 LCD 驅動函式庫
#include <PubSubClient.h> // MQTT 協定通訊函式庫,用於與 Python 控制端連線
二、 硬體腳位與基本參數定義
// --- 1. 硬體與網路設定 ---
#define SS_PIN 5 // RFID 模組的 SPI 選擇腳位 (Slave Select)
#define RST_PIN 22 // RFID 模組的重設腳位 (Reset)
#define LED_PIN 2 // ESP32 板載開發燈腳位 (GPIO2)
#define I2C_SDA 17 // LCD 螢幕 I2C 資料腳位 (SDA)
#define I2C_SCL 16 // LCD 螢幕 I2C 時脈腳位 (SCL)
const char* ssid = "Wokwi-GUEST"; // Wokwi 虛擬環境專用的免密碼 Wi-Fi 熱點名稱
const char* password = ""; // Wokwi 虛擬 Wi-Fi 無密碼
三、 雲端服務與通訊主題設定 (Cloud & MQTT Configuration)
// Telegram 帳號資訊
#define BOTtoken "7738940254:AAHbrWu9ovb1BKPQyWsbNSjNxfCGCrEWU-o" // 專屬的 Telegram 機器人金鑰
#define CHAT_ID "7965218469" // 接收通知的個人或群組 ID
// MQTT 資訊
const char* mqtt_server = "broker.emqx.io"; // 使用 EMQX 公共測試伺服器
const char* topic_cmd = "esp32/alex/control"; // ESP32 訂閱此主題,接收 Python 傳來的控制字串
const char* topic_data = "esp32/rfid/data"; // ESP32 將讀取到的 RFID 卡號推送給 Python
const char* topic_status = "esp32/alex/status"; // ESP32 回報當前 LED 狀態給 Python
四、 系統物件初始化
// --- 2. 物件初始化 ---
WiFiClient wifiClient; // 建立基礎不加密的 Wi-Fi 客戶端實例(供 MQTT 使用)
WiFiClientSecure secured_client; // 建立 SSL 加密的 Wi-Fi 客戶端實例(供 Telegram 使用)
PubSubClient mqttClient(wifiClient); // 以基礎 Wi-Fi 實例初始化 MQTT 客戶端
UniversalTelegramBot bot(BOTtoken, secured_client); // 以加密客戶端與 Token 初始化 Telegram 機器人
LiquidCrystal_I2C lcd(0x27, 16, 2); // 初始化 LCD,設定 I2C 位址為 0x27,大小為 16行x2列
MFRC522 mfrc522(SS_PIN, RST_PIN); // 初始化 MFRC522 RFID 實例並綁定腳位
五、 全域變數、自由即時作業系統隊列與列舉 (FreeRTOS Queue & Variables)
// --- 3. 全域變數 ---
QueueHandle_t rfidQueue; // 定義 FreeRTOS 的隊列控制代碼,用於跨核心任務傳遞資料
struct RfidMsg { char uid[20]; }; // 定義隊列傳遞的資料結構:內含一個 20 字元的字元陣列儲存卡號
enum LedMode { LED_OFF, LED_ON, LED_FLASH, LED_TIMER }; // 定義 LED 四種工作狀態的列舉型態
volatile LedMode currentMode = LED_OFF; // 用 volatile 宣告當前模式,確保跨核心讀寫時資料同步不卡死
unsigned long timerStartTime = 0; // 紀錄 Timer 模式啟動時的系統時間毫秒數 (millis)
bool statusNeedsUpdate = false; // 旗標:當 5 秒計時自動結束時,轉為 true 通知通訊任務發送回報
String lastUid = "None"; // 儲存上一筆成功發行的 UID 卡號,用於過濾重複刷卡
六、 輔助顯示與回報函式 (Helper Functions)
// --- 4. 顯示輔補函式 ---
void displayLog(String line1, String line2) {
// 同步輸出除錯訊息到電腦的序列埠監視器 (Serial Monitor)
Serial.println("\n========================================");
Serial.println("LCD L1: " + line1);
Serial.println("LCD L2: " + line2);
Serial.println("========================================");
lcd.clear(); // 清除 LCD 螢幕快取
lcd.setCursor(0, 0); lcd.print(line1.substring(0, 16)); // 在第一行顯示前 16 個字元
lcd.setCursor(0, 1); lcd.print(line2.substring(0, 16)); // 在第二行顯示前 16 個字元
}
// 狀態同步回報
void reportStatus(String state, String source) {
String msg = "📢 LED: " + state + " (來自 " + source + ")";
// 透過路徑一:向 Telegram 視窗推播即時文字訊息
Serial.printf("[TX -> TG] 發送通知: %s\n", msg.c_str());
bot.sendMessage(CHAT_ID, msg, "");
// 透過路徑二:發布最新狀態到 MQTT,讓 Python 控制端可以Polling或動態更新介面
Serial.printf("[TX -> MQTT] 發送狀態至 [%s]: %s\n", topic_status, state.c_str());
mqttClient.publish(topic_status, state.c_str());
// 本地端同步:改寫 LCD 螢幕顯示
displayLog("LED -> " + state, "Src: " + source);
}
七、 指令解析核心 (Command Parser)
// --- 5. 核心控制解析 ---
void executeCommand(String cmd, String source) {
cmd.toLowerCase(); // 將接收到的字串強制轉為小寫,避免因大小寫不符造成無法識別
cmd.trim(); // 清除字串前後多餘的空白或換行符號
if (cmd.length() == 0) return; // 空字串不處理
Serial.printf("[CORE] 解析指令 [%s] | 來源: %s\n", cmd.c_str(), source.c_str());
// 解析 "on" 或 "/on":切換為恆亮
if (cmd == "on" || cmd == "/on") {
currentMode = LED_ON;
reportStatus("ON", source);
}
// 解析 "off" 或 "/off":關閉
else if (cmd == "off" || cmd == "/off") {
currentMode = LED_OFF;
reportStatus("OFF", source);
}
// 解析 "flash" 或 "/flash":切換至閃爍狀態
else if (cmd == "flash" || cmd == "/flash") {
currentMode = LED_FLASH;
reportStatus("FLASHING", source);
}
// 解析 "timer" 或 "/timer":啟動 5 秒定時
else if (cmd == "timer" || cmd == "/timer") {
currentMode = LED_TIMER;
timerStartTime = millis(); // 記錄目前的起算時間
reportStatus("TIMER_5S", source);
}
// 解析 "rfid" 或 "/rfid":響應來自 Python 端的每 2 秒定期輪詢 (Polling)
else if (cmd == "rfid" || cmd == "/rfid") {
Serial.printf("[TX -> MQTT] 響應輪詢, 推送當前 UID [%s] 至 [%s]\n", lastUid.c_str(), topic_data);
mqttClient.publish(topic_data, lastUid.c_str()); // 主動將快取的 UID 回傳給 Python
displayLog("M-Tx Polling", "UID: " + lastUid);
} else {
Serial.printf("[CORE] 未知指令: %s\n", cmd.c_str());
}
}
八、 MQTT 接收回呼機制 (MQTT Callback)
// --- 6. MQTT 接收處理 ---
void mqttCallback(char* topic, byte* payload, unsigned int length) {
String message = "";
for (int i = 0; i < length; i++) message += (char)payload[i]; // 將位元組陣列重組為 String 字串
Serial.printf("\n[RX <- MQTT] 收到主題: %s | 內容: %s\n", topic, message.c_str());
// 如果確定收到的主題是控制路徑
if (String(topic) == topic_cmd) {
displayLog("[M-Rx] Cmd In", "Msg: " + message);
executeCommand(message, "MQTT-Python"); // 丟入核心解析器,標記來源為 MQTT-Python
}
}
九、 FreeRTOS 多工任務實作 (FreeRTOS Tasks)
1. LED 硬體控制任務 (由 Core 1 負責)
// [任務] LED 硬體行為
void ledTask(void *pvParameters) {
pinMode(LED_PIN, OUTPUT); // 初始化硬體腳位為輸出模式
while (true) {
switch (currentMode) {
case LED_ON:
digitalWrite(LED_PIN, HIGH);
break;
case LED_OFF:
digitalWrite(LED_PIN, LOW);
break;
case LED_FLASH:
digitalWrite(LED_PIN, !digitalRead(LED_PIN)); // 反轉當前電位狀態(達成閃爍)
vTaskDelay(300 / portTICK_PERIOD_MS); // 閃爍間隔為 300 毫秒
continue; // 跳過底下預設延遲,直接進入下一次切換
case LED_TIMER:
digitalWrite(LED_PIN, HIGH); // 點亮 LED
if (millis() - timerStartTime > 5000) { // 當現在時間減去起算時間超過 5000 毫秒
currentMode = LED_OFF; // 強制切換為關閉
statusNeedsUpdate = true; // 啟動回報旗標
}
break;
}
vTaskDelay(50 / portTICK_PERIOD_MS); // 基礎狀態機掃描延遲 50 毫秒,釋放 CPU 資源
}
}
2. 核心雙路徑通訊管理任務 (由 Core 0 專職網絡運算)
// [任務] 雙路徑通訊管理 (Telegram & MQTT)
void commTask(void *pvParameters) {
WiFi.begin(ssid, password); // 啟動 Wi-Fi 連線程序
Serial.print("[SYS] 正在連線至 Wi-Fi");
displayLog("WiFi Connecting", ssid);
while (WiFi.status() != WL_CONNECTED) { // 阻塞等待直到 Wi-Fi 成功分配 IP
vTaskDelay(500 / portTICK_PERIOD_MS);
Serial.print(".");
}
Serial.printf("\n[SYS] Wi-Fi 已連線! IP: %s\n", WiFi.localIP().toString().c_str());
displayLog("WiFi Connected", WiFi.localIP().toString());
secured_client.setInsecure(); // 跳過對 Telegram 伺服器 SSL 憑證的嚴格本地鏈檢驗(省記憶體)
mqttClient.setServer(mqtt_server, 1883); // 配置 MQTT 伺服器 IP 與通訊埠
mqttClient.setCallback(mqttCallback); // 綁定回呼函式
unsigned long lastBotRun = 0; // 紀錄上一次檢索 Telegram 的系統時間
RfidMsg rMsg; // 宣告本地的 RFID 資料結構容器
bool welcomeSent = false; // 用來標記開機手冊是否已發送
while (true) {
// [路徑 A] MQTT 連線狀態維護
if (!mqttClient.connected()) {
Serial.println("[MQTT] 嘗試連接至 Broker...");
// 使用隨機亂數衍生 Client ID,防範與 Python 控制端因名稱重複而被迫踢下線
String clientId = "ESP32_Alex_" + String(random(1000, 9999));
if (mqttClient.connect(clientId.c_str())) {
Serial.println("[MQTT] 伺服器連接成功!");
mqttClient.subscribe(topic_cmd); // 重新連線後必須重新訂閱控制主題
Serial.printf("[MQTT] 成功訂閱主題: %s\n", topic_cmd);
displayLog("MQTT Connected", "Subscribed OK");
} else {
Serial.printf("[MQTT] 連線失敗, 錯誤代碼 = %d (5秒後重試)\n", mqttClient.state());
vTaskDelay(5000 / portTICK_PERIOD_MS); // 連線失敗後,延遲 5 秒再重試
continue;
}
}
mqttClient.loop(); // 保持 MQTT 底層 Socket 封包的即時心跳與收發處理
// 🚀 開機手冊推播邏輯:當網路皆就緒後,僅執行一次
if (!welcomeSent) {
String manual = "🤖 RFID 控制系統已開機上線!\n";
manual += "📋 【系統使用指令手冊】\n\n";
manual += "🔹 /on : 💡 開啟 LED (恆亮)\n";
manual += "🔹 /off : 🌑 關閉 LED\n";
manual += "🔹 /flash : ⚡ 進入 LED 閃爍模式\n";
manual += "🔹 /timer : ⏳ 開啟 LED 持續 5 秒\n";
manual += "🔹 /rfid : 🔍 讀取查詢最後一筆 UID\n\n";
manual += "請點擊上方藍色指令字體直接進行遠端控制。";
Serial.println("[SYS] 正在發送開機手冊至 Telegram...");
if (bot.sendMessage(CHAT_ID, manual, "")) {
Serial.println("[SYS] 手冊發送成功!");
welcomeSent = true; // 成功發送後將旗標設為 true,從此不再洗版
} else {
Serial.println("[SYS] 手冊發送失敗,將於下次循環重試。");
}
}
// [路徑 B] Telegram 輪詢監聽 (每 1.5 秒執行一次發包查詢,優化頻寬)
if (millis() - lastBotRun > 1500) {
int num = bot.getUpdates(bot.last_message_received + 1); // 向伺服器調取最新一期未讀訊息
if (num > 0) {
Serial.printf("[RX <- TG] 收到 %d 條新訊息\n", num);
for (int i = 0; i < num; i++) {
displayLog("[TG-Rx] From: " + bot.messages[i].from_name, "Cmd: " + bot.messages[i].text);
executeCommand(bot.messages[i].text, "Telegram"); // 將文字內容交付解碼
}
}
lastBotRun = millis(); // 更新檢索時間
}
// 當 LED 計時終了時的被動回報程序
if (statusNeedsUpdate) {
reportStatus("OFF (Timer End)", "SYSTEM");
statusNeedsUpdate = false; // 清除旗標
}
// 🚀 RFID 跨核心隊列解算核心:過濾同卡連續刷入
if (xQueueReceive(rfidQueue, &rMsg, 0) == pdPASS) { // 非阻塞檢索 Queue(有卡片進來才成立)
String newUid = String(rMsg.uid);
// 關鍵安全檢查:比對本次刷入之卡號是否與系統內記錄的上一筆完完全全一致
if (newUid == lastUid) {
// 如果一致,代表是卡片放在感應區沒拿開,直接攔截拋棄不處置
Serial.printf("[RFID FILTER] 偵測到重複 UID [%s],捨棄不發行。\n", newUid.c_str());
}
else {
// 如果是全新刷入的不同卡片,更新系統內部變數,並觸發雙路徑通訊發行
lastUid = newUid;
Serial.printf("\n[RFID CORE] 偵測到新卡片! UID: %s\n", lastUid.c_str());
displayLog("New RFID In", "UID: " + lastUid);
// 發行路徑一:向 Telegram 推播感應警報
Serial.println("[TX -> TG] 主動推播新刷卡通知...");
bot.sendMessage(CHAT_ID, "🔔 RFID 新感應通知\n卡號: " + lastUid, "");
// 發行路徑二:向 MQTT 的 data 主題發布最新卡號,使 Python 端對應的 UI 得以同步
Serial.printf("[TX -> MQTT] 主動發布新 UID 至 [%s]...\n", topic_data);
mqttClient.publish(topic_data, lastUid.c_str());
}
}
vTaskDelay(30 / portTICK_PERIOD_MS); // 通訊主要任務循環時間:基礎延遲 30 毫秒
}
}
3. 實體 RFID 卡片高頻硬體掃描任務 (由 Core 1 負責)
// [任務] RFID 實體晶片掃描
void rfidTask(void *pvParameters) {
SPI.begin(); // 初始化 SPI 匯流排
mfrc522.PCD_Init(); // 初始化 MFRC522 射頻晶片
RfidMsg rMsg; // 建立區域的結構資料結構暫存體
while (true) {
// 偵測是否有新卡片貼近天線,且是否成功解讀出該卡片之階層序列號
if (mfrc522.PICC_IsNewCardPresent() && mfrc522.PICC_ReadCardSerial()) {
String uid = "";
for (byte i = 0; i < mfrc522.uid.size; i++) { // 迴圈重組多個 Byte 的 HEX 16進位十六進制數值
uid += (mfrc522.uid.uidByte[i] < 0x10 ? "0" : "");
uid += String(mfrc522.uid.uidByte[i], HEX);
}
uid.toUpperCase(); // 卡號強制轉化為標準大寫英文字母
memset(rMsg.uid, 0, sizeof(rMsg.uid)); // 清空記憶體緩衝區
uid.toCharArray(rMsg.uid, 20); // 將字串塞入要跨核心傳遞的字元陣列中
// 核心機制:透過 FreeRTOS Queue 機制,將本地讀到的卡號「跨核心」安全投遞給 Core 0 的通訊核心
xQueueSend(rfidQueue, &rMsg, portMAX_DELAY); // 如果 Queue 滿了,則掛起任務等待直到寫入成功
mfrc522.PICC_HaltA(); // 停止該卡片的射頻感應,防止其持續發出反向散射載波
mfrc522.PCD_StopCrypto1(); // 結束暫存的加密通訊階段
}
vTaskDelay(200 / portTICK_PERIOD_MS); // 讀卡器硬體掃描頻率:每 200 毫秒掃描一次
}
}
十、 系統硬體啟動配置段 (System Setup & Loop)
void setup() {
Serial.begin(115200); // 啟動主硬體序列埠通訊,鮑率設定為常用的 115200
delay(1000); // 預留穩定系統電源的緩衝 1 秒鐘
Wire.begin(I2C_SDA, I2C_SCL); // 啟動 I2C 控制匯流排並綁定指定腳位
lcd.init(); // 初始化 LCD
lcd.backlight(); // 點亮 LCD 的 LED 背光燈板
displayLog("System Loading", "Manual Ver 4.0"); // 開機初段畫面輸出
// 初始化建立安全互鎖的 FreeRTOS 隊列,長度為 10 個 RfidMsg 單元
rfidQueue = xQueueCreate(10, sizeof(RfidMsg));
// 🚀 多工核心任務指派與負載平衡分工
// 指派任務 1 (通訊任務) 固定在 Core 0 核心運行,分配 10240 Bytes 的獨立堆疊空間,優先級為 1
xTaskCreatePinnedToCore(commTask, "Comm", 10240, NULL, 1, NULL, 0);
// 指派任務 2 (LED硬體動作任務) 固定在 Core 1 核心運行,優先級為 2 (高於讀卡,防閃爍時脈抖動)
xTaskCreatePinnedToCore(ledTask, "LED", 2048, NULL, 2, NULL, 1);
// 指派任務 3 (RFID硬體讀取任務) 固定在 Core 1 核心運行,優先級為 1
xTaskCreatePinnedToCore(rfidTask, "RFID", 4096, NULL, 1, NULL, 1);
}
void loop() {
// 由於採用了全功能 FreeRTOS 子任務架構,Arduino 原生的 loop() 段已不需要使用
vTaskDelete(NULL); // 系統啟動完成後,主動刪除 loop 任務本身,將剩餘的主核心資源全面回收到記憶體中
}import tkinter as tk from tkinter import messagebox import paho.mqtt.client as mqtt import time # --- 設定區 --- MQTT_BROKER = "broker.emqx.io" MQTT_TOPIC = "esp32/alex/control" RFID_TOPIC = "esp32/rfid/data" class RfidControlApp: def __init__(self, root): self.root = root self.root.title("Alex RFID 控制系統 (自動輪詢版)") self.root.geometry("300x420") # 內部變數:記錄最後接收到的資料與時間 self.current_uid = "None" self.last_update_time = "無" # MQTT 連線初始化 (相容新舊版本 paho-mqtt) try: self.client = mqtt.Client(mqtt.CallbackType.VERSION_1) except AttributeError: self.client = mqtt.Client() self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.connect(MQTT_BROKER, 1883, 60) self.client.loop_start() # UI 介面設計 tk.Label(root, text="RFID 系統控制台", font=("Arial", 16, "bold")).pack(pady=10) self.btn_on = tk.Button(root, text="開啟 LED (/on)", command=lambda: self.send_cmd("on"), width=20, bg="#e1f5fe") self.btn_on.pack(pady=5) self.btn_off = tk.Button(root, text="關閉 LED (/off)", command=lambda: self.send_cmd("off"), width=20) self.btn_off.pack(pady=5) self.btn_flash = tk.Button(root, text="閃爍模式 (/flash)", command=lambda: self.send_cmd("flash"), width=20) self.btn_flash.pack(pady=5) self.btn_timer = tk.Button(root, text="開啟 5 秒 (/timer)", command=lambda: self.send_cmd("timer"), width=20) self.btn_timer.pack(pady=5) tk.Frame(root, height=2, bd=1, relief=tk.SUNKEN).pack(fill=tk.X, padx=5, pady=10) # 狀態顯示區 tk.Label(root, text="[ 自動輪詢狀態 ]", font=("Arial", 10, "bold"), fg="gray").pack() self.label_uid = tk.Label(root, text="最後讀取: None", font=("Arial", 11, "bold"), fg="blue") self.label_uid.pack(pady=5) self.label_time = tk.Label(root, text="更新時間: 無", font=("Arial", 9), fg="green") self.label_time.pack(pady=2) # 🚀 核心:啟動每 2 秒定期 Polling 的循環任務 self.start_polling_loop() def on_connect(self, client, userdata, flags, rc): print(f"MQTT 已連線, 代碼: {rc}") client.subscribe(RFID_TOPIC) def on_message(self, client, userdata, msg): # 當 Wokwi ESP32 或模擬端將卡片 UID 丟上 RFID_TOPIC 時觸發 payload = msg.payload.decode() self.current_uid = payload self.last_update_time = time.strftime("%H:%M:%S") print(f"[MQTT 接收] 收到新 UID: {payload}") def send_cmd(self, cmd): self.client.publish(MQTT_TOPIC, cmd) print(f"發送指令: {cmd}") # 🚀 定期 Polling 的核心函式 def start_polling_loop(self): """每 2 秒執行一次的非阻塞循環""" # 1. 執行輪詢動作:向 MQTT 發送 rfid 請求指令(通知 ESP32 上傳最新狀態) print("[Polling] 每 2 秒定期請求 UID 狀態...") self.send_cmd("rfid") # 2. 更新 UI 介面(顯示當前最新的資料) self.label_uid.config(text=f"最後讀取: {self.current_uid}") self.label_time.config(text=f"更新時間: {self.last_update_time}") # 3. 設定 2000 毫秒(2秒)後,再次呼叫自己,形成無限定期循環 self.root.after(2000, self.start_polling_loop) if __name__ == "__main__": root = tk.Tk() app = RfidControlApp(root) root.mainloop()
//=====================================================================
這份 Python 程式是利用 Tkinter 函式庫 製作的圖形介面(GUI)控制台,並透過 MQTT 協定(paho-mqtt) 與 Wokwi 上的 ESP32 進行雙向通訊。它最大的特點是利用非阻塞的定時器,每 2 秒自動向 ESP32 輪詢(Polling)最新的 RFID 卡號狀態。
以下為您進行全支程式碼的逐行詳細拆解說明:
一、 匯入函式庫 (Imports)
import tkinter as tk # 匯入 Python 內建的圖形介面(GUI)函式庫,並簡寫為 tk
from tkinter import messagebox # 從 tkinter 匯入彈出式對話框模組(如提示、警告視窗)
import paho.mqtt.client as mqtt # 匯入 MQTT 客戶端通訊函式庫,並簡寫為 mqtt
import time # 匯入時間函式庫,用於取得當前系統時間與格式化時間
二、 基礎參數設定區 (Configuration)
# --- 設定區 ---
MQTT_BROKER = "broker.emqx.io" # 指定 MQTT Broker(伺服器)位址,使用 EMQX 公共測試伺服器
MQTT_TOPIC = "esp32/alex/control" # 發送控制指令(如 on, off, rfid)給 ESP32 的主題
RFID_TOPIC = "esp32/rfid/data" # 接收從 ESP32 傳回的 RFID 卡號(UID)的主題
三、 類別初始化與 MQTT 連線 (Class & MQTT Init)
class RfidControlApp:
def __init__(self, root):
self.root = root # 將傳入的 Tkinter 主視窗實例存入類別變數 self.root
self.root.title("Alex RFID 控制系統 (自動輪詢版)") # 設定 GUI 視窗的最上方標題
self.root.geometry("300x420") # 設定 GUI 視窗的寬度為 300 像素,高度為 420 像素
# 內部變數:記錄最後接收到的資料與時間
self.current_uid = "None" # 初始化儲存 RFID 卡號的變數,預設為 "None"
self.last_update_time = "無" # 初始化記錄最新更新時間的變數,預設為 "無"
# MQTT 連線初始化 (相容新舊版本 paho-mqtt)
try:
# 嘗試使用新版 paho-mqtt (v2.x) 的語法初始化,指定回呼機制採用舊版 v1 的相容架構
self.client = mqtt.Client(mqtt.CallbackType.VERSION_1)
except AttributeError:
# 如果發生錯誤,代表本地端安裝的是舊版 paho-mqtt (v1.x),則直接使用傳統語法初始化
self.client = mqtt.Client()
self.client.on_connect = self.on_connect # 綁定連線成功的事件處理函式(Callback)
self.client.on_message = self.on_message # 綁定接收到訂閱訊息的事件處理函式(Callback)
self.client.connect(MQTT_BROKER, 1883, 60) # 開始連線到 MQTT 伺服器(預設通訊埠為 1883,保持連線心跳為 60 秒)
self.client.loop_start() # 啟動一個獨立的背景執行緒來處理 MQTT 封包收發,防範卡住 GUI 畫面
四、 圖形介面設計 (UI Layout)
# UI 介面設計
# 建立主標題文字標籤,字型大小 16、粗體,並透過 pack 放置到視窗中(上下外距各 10 像素)
tk.Label(root, text="RFID 系統控制台", font=("Arial", 16, "bold")).pack(pady=10)
# 建立「開啟 LED」按鈕,點擊時觸發 send_cmd("on"),背景色為淺藍色 (#e1f5fe),加入視窗並設定上下外距
self.btn_on = tk.Button(root, text="開啟 LED (/on)", command=lambda: self.send_cmd("on"), width=20, bg="#e1f5fe")
self.btn_on.pack(pady=5)
# 建立「關閉 LED」按鈕,點擊時觸發 send_cmd("off")
self.btn_off = tk.Button(root, text="關閉 LED (/off)", command=lambda: self.send_cmd("off"), width=20)
self.btn_off.pack(pady=5)
# 建立「閃爍模式」按鈕,點擊時觸發 send_cmd("flash")
self.btn_flash = tk.Button(root, text="閃爍模式 (/flash)", command=lambda: self.send_cmd("flash"), width=20)
self.btn_flash.pack(pady=5)
# 建立「開啟 5 秒」按鈕,點擊時觸發 send_cmd("timer")
self.btn_timer = tk.Button(root, text="開啟 5 秒 (/timer)", command=lambda: self.send_cmd("timer"), width=20)
self.btn_timer.pack(pady=5)
# 建立一條水平分割線(高度 2 像素、內縮下陷樣式),用來區隔控制按鈕與下方的狀態顯示區
tk.Frame(root, height=2, bd=1, relief=tk.SUNKEN).pack(fill=tk.X, padx=5, pady=10)
# 狀態顯示區
# 顯示提示文字標籤 "[ 自動輪詢狀態 ]",顏色設定為灰色
tk.Label(root, text="[ 自動輪詢狀態 ]", font=("Arial", 10, "bold"), fg="gray").pack()
# 建立顯示最新 RFID 卡號的文字標籤,預設文字為 "最後讀取: None",顏色為藍色、粗體
self.label_uid = tk.Label(root, text="最後讀取: None", font=("Arial", 11, "bold"), fg="blue")
self.label_uid.pack(pady=5)
# 建立顯示最新更新時間的文字標籤,預設文字為 "更新時間: 無",顏色為綠色
self.label_time = tk.Label(root, text="更新時間: 無", font=("Arial", 9), fg="green")
self.label_time.pack(pady=2)
# 🚀 核心:啟動每 2 秒定期 Polling 的循環任務
self.start_polling_loop() # 呼叫自動輪詢函式,開啟定時任務循環
五、 MQTT 事件處理與發送功能 (MQTT Callbacks & Publish)
def on_connect(self, client, userdata, flags, rc):
# 當成功連線到 MQTT Broker 時,伺服器會回應 rc(連線代碼,0 代表成功)
print(f"MQTT 已連線, 代碼: {rc}")
client.subscribe(RFID_TOPIC) # 連線成功後,立刻訂閱 RFID 卡號主題,準備接收 ESP32 上傳的資料
def on_message(self, client, userdata, msg):
# 當 Wokwi ESP32 或模擬端將卡片 UID 丟上 RFID_TOPIC 時,此函式會自動被背景執行緒觸發
payload = msg.payload.decode() # 將接收到的原始二進位位元組資料解碼為字串
self.current_uid = payload # 將解碼後的卡號暫存到類別變數 self.current_uid 中
self.last_update_time = time.strftime("%H:%M:%S") # 取得當前的系統時間,格式化為「時:分:秒」並存入變數
print(f"[MQTT 接收] 收到新 UID: {payload}") # 在電腦終端機印出接收除錯訊息
def send_cmd(self, cmd):
# 通用的 MQTT 指令發送函式
self.client.publish(MQTT_TOPIC, cmd) # 將指令字串(如 on, off, rfid)發布到控制主題
print(f"發送指令: {cmd}") # 在電腦終端機印出已發送的指令
def query_rfid(self):
# 這是原本手動查詢按鈕保留下來的函式(目前 UI 刻意沒綁定它,由自動輪詢取代)
self.send_cmd("rfid") # 發送 "rfid" 指令要求 ESP32 回傳資料
messagebox.showinfo("查詢", "已請求 ESP32 更新 UID 狀態") # 跳出提示對話框通知使用者
六、 核心計時輪詢迴圈 (Polling Loop via after)
# 🚀 定期 Polling 的核心函式
def start_polling_loop(self):
"""每 2 秒執行一次的非阻塞循環"""
# 1. 執行輪詢動作:向 MQTT 發送 rfid 請求指令(通知 ESP32 上傳最新狀態)
print("[Polling] 每 2 秒定期請求 UID 狀態...")
self.send_cmd("rfid") # 主動向 ESP32 發送獲取 UID 的請求
# 2. 更新 UI 介面(將背景儲存的最新資料即時刷新到視窗畫面上)
self.label_uid.config(text=f"最後讀取: {self.current_uid}") # 更新卡號標籤文字
self.label_time.config(text=f"更新時間: {self.last_update_time}") # 更新時間標籤文字
# 3. 設定 2000 毫秒(2秒)後,再次呼叫自己,形成無限定期循環
# 這是 Tkinter 處理定時任務的核心關鍵。它註冊了一個 2 秒後的鬧鐘,
# 在等待的 2 秒期間內「完全不佔用主執行緒」,因此視窗按鈕依然可以點擊,畫面絕對不會卡死。
self.root.after(2000, self.start_polling_loop)
七、 主程式進入點 (Main Entry)
if __name__ == "__main__":
root = tk.Tk() # 建立 Tkinter 的根視窗實體物件
app = RfidControlApp(root) # 將根視窗傳入,初始化我們的應用程式類別
root.mainloop() # 啟動 Tkinter 的主事件循環(Event Loop),開始監聽視窗上的滑鼠點擊與畫面繪製













沒有留言:
張貼留言