Telegram 程式 的指令
🎉 程式已啟動!
可用命令:
/on - 開啟 LED (綠色)
/off - 關閉 LED (紅色)
/flash - LED 閃爍 (紅綠交替)
/timer - LED 開啟 10 秒後自動關閉
/mode - 切換模式 (新增/比對)
/status - 顯示當前模式及最新記錄
Python TKinter程式
import tkinter as tk
from tkinter import messagebox
import paho.mqtt.client as mqtt
import threading
import time
import asyncio
import sqlite3
from datetime import datetime
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
import sys
# --- 1. 全域設定與常數 ---
DB_NAME = "rfid114.db"
MQTT_BROKER = "broker.mqtt-dashboard.com"
MQTT_PORT = 1883
TOPIC_RFID_UID = "alex9ufo/rfid/UID"
TOPIC_LED_CONTROL = "alex9ufo/rfid/led"
TOPIC_LED_STATUS = "alex9ufo/rfid/ledStatus"
# Telegram 設定 (已使用您提供的 Token 和 Chat ID)
TELEGRAM_BOT_TOKEN = "80227200986:AAGymymK9_d1HcTGJ2Wl3mtqHmilxB64_5Zw"
TARGET_CHAT_ID = 791652182469
# 應用程式模式
MODE_ADD = "新增模式"
MODE_COMPARE = "比對模式"
# --- 2. 資料庫操作類別 (修正為 5 欄位: id, date, time, LEDorRFID, memo) ---
class DatabaseManager:
def __init__(self, db_name):
self.db_name = db_name
self.conn = None
self.cursor = None
def connect(self):
self.conn = sqlite3.connect(self.db_name)
self.cursor = self.conn.cursor()
def close(self):
if self.conn:
self.conn.close()
# a. 建立資料表
def create_table(self):
self.connect()
try:
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS rfid_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
time TEXT NOT NULL,
LEDorRFID TEXT,
memo TEXT
)
""")
self.conn.commit()
return True, "資料表 rfid_logs 建立成功或已存在 (5 欄位)。"
except Exception as e:
return False, f"建立資料表失敗: {e}"
finally:
self.close()
# f. 刪除資料表
def delete_table(self):
self.connect()
try:
self.cursor.execute("DROP TABLE IF EXISTS rfid_logs")
self.conn.commit()
return True, "資料表 rfid_logs 刪除成功。"
except Exception as e:
return False, f"刪除資料表失敗: {e}"
finally:
self.close()
# b. 新增一筆資料
def add_log(self, led_or_rfid, memo):
self.connect()
try:
now = datetime.now()
date_str = now.strftime("%Y-%m-%d")
time_str = now.strftime("%H:%M:%S")
self.cursor.execute(
"INSERT INTO rfid_logs (date, time, LEDorRFID, memo) VALUES (?, ?, ?, ?)",
(date_str, time_str, led_or_rfid, memo)
)
self.conn.commit()
last_id = self.cursor.lastrowid
return True, f"新增記錄成功,ID: {last_id}"
except Exception as e:
return False, f"新增記錄失敗: {e}"
finally:
self.close()
# c. 刪除一筆資料
def delete_log_by_id(self, log_id):
self.connect()
try:
self.cursor.execute("DELETE FROM rfid_logs WHERE id=?", (log_id,))
rows_affected = self.cursor.rowcount
self.conn.commit()
if rows_affected > 0:
return True, f"ID {log_id} 的記錄刪除成功。"
else:
return False, f"未找到 ID {log_id} 的記錄。"
except Exception as e:
return False, f"刪除記錄失敗: {e}"
finally:
self.close()
# d. 查詢一筆資料
def query_log_by_id(self, log_id):
self.connect()
try:
self.cursor.execute("SELECT id, date, time, LEDorRFID, memo FROM rfid_logs WHERE id=?", (log_id,))
result = self.cursor.fetchone()
return True, result
except Exception as e:
return False, f"查詢記錄失敗: {e}"
finally:
self.close()
# e. 顯示所有資料最新50筆
def get_latest_logs(self):
self.connect()
try:
self.cursor.execute("SELECT id, date, time, LEDorRFID, memo FROM rfid_logs ORDER BY id DESC LIMIT 50")
results = self.cursor.fetchall()
return True, results
except Exception as e:
return False, f"查詢最新記錄失敗: {e}"
finally:
self.close()
# g. 獲取所有儲存的 UID (僅用於比對模式)
def get_all_rfid_uids(self):
self.connect()
try:
# 查詢所有 memo 欄位符合 '[新增] 卡號號碼:%' 的記錄的 LEDorRFID 欄位值
self.cursor.execute("SELECT DISTINCT LEDorRFID FROM rfid_logs WHERE memo LIKE '[新增] 卡號號碼:%'")
# 獲取結果並轉換為 UID 列表 (確保不是 None)
results = [row[0] for row in self.cursor.fetchall() if row[0] is not None]
return True, results
except Exception as e:
return False, f"獲取所有 UID 失敗: {e}"
finally:
self.close()
# --- 3. Telegram Bot 處理類別 ---
class TelegramBotHandler:
def __init__(self, app, token):
self.app = app
self.token = token
self.tg_loop = None
if token == "YOUR_TELEGRAM_BOT_TOKEN":
print("警告: Telegram Bot Token 未設定,Bot 功能將被跳過。")
self.application = None
return
self.application = Application.builder().token(token).build()
self.application.add_handler(CommandHandler("on", self.handle_command))
self.application.add_handler(CommandHandler("off", self.handle_command))
self.application.add_handler(CommandHandler("flash", self.handle_command))
self.application.add_handler(CommandHandler("timer", self.handle_command))
self.application.add_handler(CommandHandler("status", self.handle_status))
self.application.add_handler(CommandHandler("help", self.handle_help))
self.application.add_handler(CommandHandler("mode", self.handle_mode_switch))
self.bot_thread = threading.Thread(target=self._run_bot, daemon=True)
def start_bot(self):
if self.application:
self.bot_thread.start()
print("Telegram Bot 已啟動")
def _run_bot(self):
if self.application:
try:
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.tg_loop = loop
# 運行輪詢,如果出現 Conflict 錯誤,請檢查是否有其他實例運行
self.application.run_polling(poll_interval=0.5, timeout=10)
except Exception as e:
print(f"Telegram Bot 運行錯誤: {e}")
async def handle_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if update.message.chat_id != TARGET_CHAT_ID:
await update.message.reply_text("未授權的用戶。")
return
command = update.message.text[1:]
self.app.master.after(0, self.app.handle_telegram_command, command)
async def handle_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if update.message.chat_id != TARGET_CHAT_ID:
await update.message.reply_text("未授權的用戶。")
return
self.app.master.after(0, self.app.show_logs_in_telegram)
async def handle_mode_switch(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if update.message.chat_id != TARGET_CHAT_ID:
await update.message.reply_text("未授權的用戶。")
return
current_mode = self.app.mode_var.get()
new_mode = MODE_COMPARE if current_mode == MODE_ADD else MODE_ADD
self.app.master.after(0, self.app.set_mode, new_mode)
await update.message.reply_text(f"模式已切換為: {new_mode}")
async def handle_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
if update.message.chat_id != TARGET_CHAT_ID:
await update.message.reply_text("未授權的用戶。")
return
await update.message.reply_text(self._get_help_message())
def _get_help_message(self):
"""4. 啟動時發送 Help 訊息給 Telegram"""
return (
"🎉 程式已啟動!\n\n"
"可用命令:\n"
"/on - 開啟 LED (綠色)\n"
"/off - 關閉 LED (紅色)\n"
"/flash - LED 閃爍 (紅綠交替)\n"
"/timer - LED 開啟 10 秒後自動關閉\n"
"/mode - 切換模式 (新增/比對)\n"
"/status - 顯示當前模式及最新記錄"
)
def send_message(self, text):
"""通用發送訊息函數,用於 RFID 結果、LED 狀態及啟動訊息"""
if self.application and self.tg_loop and self.application.running and TARGET_CHAT_ID != 0:
asyncio.run_coroutine_threadsafe(
self.application.bot.send_message(chat_id=TARGET_CHAT_ID, text=text),
self.tg_loop
)
# --- 4. Tkinter 主應用程式類別 ---
class RfidControlApp:
def __init__(self, master):
self.master = master
master.title("RFID & MQTT 控制台")
self.db = DatabaseManager(DB_NAME)
self.status_var = tk.StringVar(value="等待 MQTT 連線...")
self.mode_var = tk.StringVar(value=MODE_ADD)
self.flash_id = None
self.timer_id = None
self.timer_end_time = None
self._setup_gui()
# *** 修正點 1/2: 確保使用最新的 VERSION2 客戶端 ***
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.user_data_set(self)
self.telegram_bot = TelegramBotHandler(self, TELEGRAM_BOT_TOKEN)
if self.telegram_bot.application:
self.telegram_bot.start_bot()
# 延遲發送 Help 訊息,確保 Bot 啟動完成
self.master.after(3000, self._send_initial_help)
self.mqtt_thread = threading.Thread(target=self._mqtt_loop_start, daemon=True)
self.mqtt_thread.start()
master.protocol("WM_DELETE_WINDOW", self.on_closing)
def _setup_gui(self):
# 頂部狀態及模式
mode_frame = tk.Frame(self.master)
mode_frame.pack(pady=10)
# 放大模式標籤字體
tk.Label(mode_frame, text="當前模式:", font=("Arial", 16)).pack(side=tk.LEFT, padx=10)
tk.Label(mode_frame, textvariable=self.mode_var, font=("Arial", 16, "bold")).pack(side=tk.LEFT, padx=10)
# 放大切換模式按鈕
tk.Button(mode_frame, text="切換模式", command=self._toggle_mode, font=("Arial", 12), padx=10, pady=5).pack(side=tk.LEFT, padx=20)
# 圓形 LED 指示燈
self.canvas = tk.Canvas(self.master, width=120, height=120)
self.canvas.pack(pady=15)
self.led_indicator = self.canvas.create_oval(10, 10, 110, 110, fill="gray")
# 狀態顯示 (卡號/LED 動作) - 放大字體
tk.Label(self.master, textvariable=self.status_var, font=("Arial", 16, "bold")).pack(pady=15)
# 資料庫操作按鈕
db_frame = tk.Frame(self.master)
db_frame.pack(pady=10)
# 放大按鈕字體
button_font = ("Arial", 12)
button_padding = {'padx': 8, 'pady': 5}
tk.Button(db_frame, text="a.建表", command=self._handle_db_create, font=button_font, **button_padding).pack(side=tk.LEFT, padx=5)
tk.Button(db_frame, text="e.最新50筆", command=self._handle_db_show, font=button_font, **button_padding).pack(side=tk.LEFT, padx=5)
tk.Button(db_frame, text="f.刪表", command=self._handle_db_delete, font=button_font, **button_padding).pack(side=tk.LEFT, padx=5)
# 查詢/刪除 ID 輸入框及按鈕
query_frame = tk.Frame(self.master)
query_frame.pack(pady=5)
self.id_entry = tk.Entry(query_frame, width=15, font=("Arial", 12))
self.id_entry.pack(side=tk.LEFT, padx=5)
tk.Button(query_frame, text="c.刪除ID", command=self._handle_db_delete_id, font=button_font, **button_padding).pack(side=tk.LEFT, padx=5)
tk.Button(query_frame, text="d.查詢ID", command=self._handle_db_query_id, font=button_font, **button_padding).pack(side=tk.LEFT, padx=5)
# 資料顯示區域標題
tk.Label(self.master, text="--- 最新記錄 ---", font=("Arial", 12, "underline")).pack(pady=10)
# 資料顯示 Text Widget - 增加寬度,使用等寬字體
self.log_display = tk.Text(self.master, height=10, width=80, font=("Courier New", 10))
self.log_display.pack(pady=5, padx=10)
self._update_log_display("等待資料庫查詢...")
def _update_log_display(self, text):
"""用於在主執行緒中更新資料顯示區域 (Text Widget) 的內容"""
self.log_display.delete(1.0, tk.END)
self.log_display.insert(tk.END, text)
def _show_mqtt_error_gui(self, error_message):
"""用於在主執行緒中顯示 MQTT 連線錯誤訊息"""
# 注意: 這裡會顯示錯誤,但真正的連線錯誤是函式簽名不匹配
messagebox.showerror("MQTT 錯誤", f"無法連接到 MQTT Broker: {error_message}")
def on_closing(self):
"""應用程式關閉時的清理工作"""
print("正在關閉應用程式...")
self._cancel_timers()
if hasattr(self, 'telegram_bot') and self.telegram_bot.application:
try:
# 嘗試停止 Telegram 應用程式
self.telegram_bot.application.stop()
print("Telegram Bot 已停止")
except Exception:
pass
if hasattr(self, 'client') and self.client:
self.client.loop_stop()
self.master.destroy()
sys.exit()
# --- 資料庫操作實現 (1.a, c, d, e, f) ---
def _handle_db_create(self):
success, msg = self.db.create_table()
messagebox.showinfo("資料庫", msg)
self._update_log_display(msg)
def _handle_db_delete(self):
if messagebox.askyesno("確認刪除", "確定要刪除整個資料表嗎?資料將無法恢復!"):
success, msg = self.db.delete_table()
messagebox.showinfo("資料庫", msg)
self._update_log_display(msg)
def _handle_db_delete_id(self):
log_id = self.id_entry.get()
if not log_id.isdigit():
messagebox.showerror("錯誤", "請輸入有效的 ID 數字。")
return
success, msg = self.db.delete_log_by_id(int(log_id))
messagebox.showinfo("資料庫", msg)
self._handle_db_show()
def _handle_db_query_id(self):
log_id = self.id_entry.get()
if not log_id.isdigit():
messagebox.showerror("錯誤", "請輸入有效的 ID 數字。")
return
success, result = self.db.query_log_by_id(int(log_id))
if success and result:
msg = (f"查詢結果 (ID {result[0]}):\n"
f"日期: {result[1]}\n"
f"時間: {result[2]}\n"
f"LED/RFID: {result[3]}\n"
f"備註: {result[4]}")
elif success:
msg = f"未找到 ID {log_id} 的記錄。"
else:
msg = f"查詢失敗: {result}"
messagebox.showinfo("查詢結果", msg)
self._update_log_display(msg)
def _handle_db_show(self):
"""從資料庫讀取最新記錄並顯示在 Text Widget (調整格式以顯示 5 欄位)"""
success, results = self.db.get_latest_logs()
if success:
# 調整標題欄位寬度以適應 5 欄位
# 欄位: ID | 日期 | 時間 | LED/RFID | 備註
# 注意: 您的截圖中只有 ID, 日期, 時間, 備註,這裡保持 5 欄位結構
header = f"{'ID':<4} | {'日期':<10} | {'時間':<8} | {'LED/RFID':<15} | 備註\n"
separator = "-"*75 + "\n"
output = header + separator
# 調整資料欄位寬度
for row in results:
# 限制 memo 長度
memo_display = row[4][:30]
output += (
f"{str(row[0]):<4} | " # ID
f"{row[1]:<10} | " # 日期
f"{row[2]:<8} | " # 時間
f"{row[3]:<15} | " # LEDorRFID
f"{memo_display}\n" # 備註
)
self._update_log_display(output)
else:
self._update_log_display(f"顯示資料失敗: {results}")
def show_logs_in_telegram(self):
"""將模式和最新記錄發送到 Telegram (/status 命令)"""
mode_info = f"當前模式: {self.mode_var.get()}\n"
success, results = self.db.get_latest_logs()
if success and results:
# 僅顯示最新的 5 筆記錄,顯示 LEDorRFID 和 memo
log_list = [f"ID {row[0]} ({row[3]}): {row[4][:30]}" for row in results[:5]]
log_info = "--- 最新 5 筆記錄 ---\n" + "\n".join(log_list)
elif success:
log_info = "資料庫中沒有記錄。"
else:
log_info = f"資料庫查詢失敗: {results}"
self.telegram_bot.send_message(f"{mode_info}\n{log_info}")
def _toggle_mode(self):
current_mode = self.mode_var.get()
new_mode = MODE_COMPARE if current_mode == MODE_ADD else MODE_ADD
self.set_mode(new_mode)
def set_mode(self, new_mode):
self.mode_var.set(new_mode)
self.status_var.set(f"模式切換為: {new_mode}")
# --- MQTT 相關 ---
def _mqtt_loop_start(self):
"""在單獨的執行緒中啟動 MQTT 客戶端迴圈"""
try:
self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
self.client.loop_forever()
except Exception as error:
error_message = str(error)
print(f"MQTT 連線錯誤: {error_message}")
self.master.after(0, self._show_mqtt_error_gui, error_message)
# *** 修正點 2/2: 將 _on_connect 簽名更新為 V2 格式 (6 參數) ***
def _on_connect(self, client, userdata, flags, reasonCode, properties):
if reasonCode == 0: # reasonCode 是 V2 中取代 rc 的連線結果
client.subscribe(TOPIC_RFID_UID)
client.subscribe(TOPIC_LED_STATUS)
self.master.after(0, self.status_var.set, f"MQTT 連線成功 | {self.mode_var.get()}")
self.master.after(0, self._handle_db_show)
else:
# 使用 reasonCode 檢查連線失敗原因
self.master.after(0, self.status_var.set, f"MQTT 連線失敗, 碼 {reasonCode}")
def _on_message(self, client, userdata, msg):
payload = msg.payload.decode()
if msg.topic == TOPIC_LED_STATUS:
self._handle_led_status_update(payload)
return
if msg.topic == TOPIC_RFID_UID:
self._handle_rfid_uid(payload)
def _handle_rfid_uid(self, uid):
"""7) 處理 RFID 卡號 & 8) 比對模式邏輯"""
mode = self.mode_var.get()
if mode == MODE_ADD:
# 7) 新增模式: 寫入資料庫
led_or_rfid_value = uid
memo_value = f"[新增] 卡號號碼: {uid}"
self.master.after(0, self.db.add_log, led_or_rfid_value, memo_value)
self.master.after(0, self.status_var.set, memo_value)
self.master.after(0, self._handle_db_show)
self.telegram_bot.send_message(memo_value)
elif mode == MODE_COMPARE:
# 8) 比對模式: 與資料庫的 LEDorRFID 欄位中所有 UID 碼比對
success, authorized_uids = self.db.get_all_rfid_uids()
if success:
if uid in authorized_uids:
memo = "[比對] 卡號正確"
# 比對成功時,Telegram 訊息包含 UID 碼
telegram_msg = f"{memo} ({uid})"
else:
memo = "[比對] 卡號錯誤"
# 比對失敗時,Telegram 訊息不包含 UID 碼
telegram_msg = memo
else:
memo = f"[比對] 錯誤: 無法讀取資料庫 ({authorized_uids})"
telegram_msg = memo
# GUI/Telegram 操作
self.master.after(0, self.status_var.set, f"{memo} (UID: {uid})")
self.telegram_bot.send_message(telegram_msg)
def _handle_led_status_update(self, status):
"""6) 處理來自 ESP32 的 LED 狀態回傳 (修正欄位值)"""
# 6) 欄位修正
status_upper = status.upper()
led_or_rfid_value = status_upper
memo_value = f"LED Status: {status_upper}"
telegram_msg = f"裝置確認 LED 狀態: {status_upper}"
# 寫入資料庫, 刷新 GUI, 送給 Telegram
self.master.after(0, self.db.add_log, led_or_rfid_value, memo_value)
self.master.after(0, self._handle_db_show)
self.telegram_bot.send_message(telegram_msg)
# --- Telegram 命令處理 ---
def _send_initial_help(self):
self.telegram_bot.send_message(self.telegram_bot._get_help_message())
def handle_telegram_command(self, command):
command = command.lower()
if command in ["on", "off", "flash", "timer"]:
self._set_led_indicator(command)
self.client.publish(TOPIC_LED_CONTROL, command, qos=0)
self.status_var.set(f"Telegram 命令: {command.upper()}")
self.telegram_bot.send_message(f"已發送命令 /{command} 到裝置。等待裝置確認...")
else:
self.telegram_bot.send_message(f"無效命令: /{command}")
# --- LED GUI 邏輯 ---
def _cancel_timers(self):
if self.flash_id:
self.master.after_cancel(self.flash_id)
self.flash_id = None
if self.timer_id:
self.master.after_cancel(self.timer_id)
self.timer_id = None
self.timer_end_time = None
def _set_led_indicator(self, state):
self._cancel_timers()
if state == "on":
self.canvas.itemconfig(self.led_indicator, fill="green")
elif state == "off":
self.canvas.itemconfig(self.led_indicator, fill="red")
elif state == "flash":
self._flash_toggle(True)
elif state == "timer":
self.timer_end_time = time.time() + 10
self.canvas.itemconfig(self.led_indicator, fill="green")
self._timer_countdown()
else:
self.canvas.itemconfig(self.led_indicator, fill="gray")
def _flash_toggle(self, is_green):
color = "green" if is_green else "red"
self.canvas.itemconfig(self.led_indicator, fill=color)
self.flash_id = self.master.after(500, self._flash_toggle, not is_green)
def _timer_countdown(self):
if self.timer_end_time is None:
return
if time.time() >= self.timer_end_time:
self.canvas.itemconfig(self.led_indicator, fill="red")
self.timer_id = None
return
self.timer_id = self.master.after(100, self._timer_countdown)
# --- 主程式 ---
if __name__ == '__main__':
if sys.platform == "win32":
try:
# 解決 Windows 上的 asyncio RuntimeError: Event loop is closed
if threading.current_thread() is threading.main_thread():
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
except AttributeError:
pass
root = tk.Tk()
app = RfidControlApp(root)
root.mainloop()
作業3/4 Arduino ESP32+MFRC522V2 + LED程式
/***
ESP32 (WROOM-32) MFRC522 連接腳位參考:
Signal MFRC522 WROOM-32
RST/Reset RST 21
SPI SS SDA 5
SPI MOSI MOSI 23
SPI MISO MISO 19
SPI SCK SCK 18
注意: LED_PIN 13 假設為 LOW active (低電位點亮)。
這個程式碼使用 FreeRTOS 雙核心任務來分離 MQTT 通訊 (Core 0) 和 RFID/LED 控制 (Core 1)。
使用 Mutex (互斥鎖) 保護 Core 0 和 Core 1 之間的共享變數 (UID 和 LED 狀態)。
***/
// 引入 FreeRTOS 函式庫
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <freertos/semphr.h>
#include <freertos/queue.h>
// Wifi 與 PubSubClient 程式庫
#include <PubSubClient.h>
#include <WiFi.h>
// Task Watchdog Timer 相關,用於禁用 TWDT
#include "esp_task_wdt.h"
// --- MFRC522v2 程式庫 ---
#include <MFRC522v2.h>
#include <MFRC522DriverSPI.h>
#include <MFRC522DriverPinSimple.h>
// ---------------------------------------------
// --- Wi-Fi 和 MQTT 設定 ---
const char* ssid = "alex9ufo";
const char* password = "alex9981";
const char* mqttBroker = "broker.mqtt-dashboard.com";
int mqttPort = 1883;
WiFiClient wifiClient;
PubSubClient mqttClient(wifiClient);
// --- MQTT 主題設定 ---
const char* topic_UID_publish = "alex9ufo/rfid/UID"; // 發佈 RFID UID
const char* topic_LED_subscribe = "alex9ufo/rfid/led"; // 訂閱 LED 控制命令
const char* topic_LED_status_publish = "alex9ufo/rfid/ledStatus"; // 發佈 LED 狀態
// --- 硬體腳位設定 ---
#define RST_PIN 21
#define SS_PIN 5
#define LED_PIN 13 // LED 控制 GPIO 腳位
MFRC522DriverPinSimple ss_pin_v2(SS_PIN);
MFRC522DriverSPI driver{ss_pin_v2};
MFRC522 mfrc522{driver};
// --- 雙核心通訊變數 (使用 FreeRTOS) ---
String new_uid_to_publish = ""; // Core 1 寫入,Core 0 讀取並發佈
SemaphoreHandle_t xUidMutex; // 保護 new_uid_to_publish
QueueHandle_t xLedQueue; // Core 0 (MQTT) 傳給 Core 1 (LED 控制) 的命令
String current_led_status = "OFF"; // Core 1 寫入狀態,Core 0 讀取並發佈
SemaphoreHandle_t xStatusMutex; // 保護 current_led_status
// --- MFRC522 狀態變數 (Core 1 使用) ---
byte last_UID[10] = {0};
byte last_UID_Size = 0;
unsigned long last_publish_time = 0;
const unsigned long REPEAT_DELAY_MS = 5000; // 重複發佈延遲 (5秒)
// --- 函數宣告 ---
void connectWifi();
void connectMqtt();
void onMqttMessage(char* topic, byte* payload, unsigned int length);
void ledMqttTask(void *pvParameters);
void rfidLedControlTask(void *pvParameters);
String uidToHexString(byte *buffer, byte bufferSize);
// ---------------------------------------------------------------------------------
// ------------------------------ CORE 0: MQTT/控制中心任務 -------------------------
// ---------------------------------------------------------------------------------
/**
* @brief MQTT 訊息處理回呼函式 (在 Core 0 執行)
* 負責將接收到的 LED 命令放入佇列 (Queue) 傳遞給 Core 1。
*/
void onMqttMessage(char* topic, byte* payload, unsigned int length) {
String topicStr(topic);
if (topicStr == topic_LED_subscribe) {
String command = "";
for (int i = 0; i < length; i++) {
command += (char)payload[i];
}
command.toUpperCase();
Serial.print("Core 0: Received LED command: ");
Serial.println(command);
// 將命令傳送到 Core 1 的佇列
// 使用 xQueueSend 確保在 Core 1 忙碌時不會丟失命令
if (xQueueSend(xLedQueue, &command, (TickType_t)10) != pdPASS) {
Serial.println("Core 0: Failed to send LED command to Core 1.");
}
}
}
/**
* @brief MQTT 發佈和連線維持任務 (在 Core 0 執行)
* 負責 Wi-Fi/MQTT 連線、UID 發佈、狀態發佈。
*/
void ledMqttTask(void *pvParameters) {
connectWifi();
vTaskDelay(pdMS_TO_TICKS(1000));
mqttClient.setServer(mqttBroker, mqttPort);
mqttClient.setCallback(onMqttMessage);
connectMqtt();
mqttClient.subscribe(topic_LED_subscribe);
Serial.print("Core 0: Subscribed to ");
Serial.println(topic_LED_subscribe);
Serial.println("Core 0: MQTT UID Publish & LED Command Task Running.");
while (1) {
// 維持 Wi-Fi 和 MQTT 連線
if (WiFi.status() != WL_CONNECTED) {
connectWifi();
}
if (!mqttClient.connected()) {
connectMqtt();
mqttClient.subscribe(topic_LED_subscribe);
}
// 處理 MQTT 接收和心跳
mqttClient.loop();
// 1. 處理 MFRC522 UID 發佈 (從 Core 1 接收)
// 嘗試取得 UID 互斥鎖 (等待 10 ticks)
if (xSemaphoreTake(xUidMutex, (TickType_t)10) == pdTRUE) {
// 檢查是否有新的 UID 需要發佈
if (new_uid_to_publish.length() > 0) {
char uid_payload[new_uid_to_publish.length() + 1];
new_uid_to_publish.toCharArray(uid_payload, sizeof(uid_payload));
if (mqttClient.publish(topic_UID_publish, uid_payload)) {
Serial.print("Core 0: Published UID: ");
Serial.println(new_uid_to_publish);
new_uid_to_publish = ""; // 清空,等待下一筆資料
} else {
Serial.println("Core 0: Failed to publish UID.");
}
}
// *** 修正: 確保無論發佈與否,都必須釋放鎖,防止死鎖 ***
xSemaphoreGive(xUidMutex);
}
// 2. 處理 LED 狀態發佈 (從 Core 1 接收並發佈)
// 嘗試取得狀態互斥鎖 (等待 10 ticks)
if (xSemaphoreTake(xStatusMutex, (TickType_t)10) == pdTRUE) {
// 檢查狀態是否已更新但尚未發佈 ("SENT" 表示已發佈)
if (current_led_status != "SENT") {
char status_payload[current_led_status.length() + 1];
current_led_status.toCharArray(status_payload, sizeof(status_payload));
if (mqttClient.publish(topic_LED_status_publish, status_payload)) {
Serial.print("Core 0: Published LED Status: ");
Serial.println(current_led_status);
current_led_status = "SENT"; // 標記為已發佈
} else {
Serial.println("Core 0: Failed to publish LED Status.");
}
}
// *** 修正: 確保無論發佈與否,都必須釋放鎖,防止死鎖 ***
xSemaphoreGive(xStatusMutex);
}
vTaskDelay(pdMS_TO_TICKS(50)); // 短暫延遲,讓 Core 1 有執行時間
}
}
/**
* @brief 連接 Wi-Fi
*/
void connectWifi() {
Serial.print("Core 0: Connecting to Wi-Fi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
vTaskDelay(pdMS_TO_TICKS(500));
Serial.print(".");
}
Serial.println("\nCore 0: Wi-Fi connected.");
Serial.print("IP Address: ");
Serial.println(WiFi.localIP());
}
/**
* @brief 連接 MQTT Broker
*/
void connectMqtt() {
Serial.print("Core 0: Connecting to MQTT Broker: ");
Serial.println(mqttBroker);
// 使用固定 Client ID
String clientId = "ESP32_MFRC522_LED_Client";
if (!mqttClient.connect(clientId.c_str())) {
Serial.print("Core 0: MQTT connection failed! PubSubClient state: ");
Serial.println(mqttClient.state());
vTaskDelay(pdMS_TO_TICKS(4000));
return;
}
Serial.println("Core 0: MQTT connected.");
}
// ---------------------------------------------------------------------------------
// ------------------------------ CORE 1: RFID/LED 控制任務 -----------------------
// ---------------------------------------------------------------------------------
/**
* @brief RFID 讀取和 LED 狀態控制任務 (在 Core 1 執行)
* 負責初始化 MFRC522、讀取 RFID 卡、處理 LED 命令、更新共享 UID 變數。
*/
void rfidLedControlTask(void *pvParameters) {
pinMode(LED_PIN, OUTPUT);
digitalWrite(LED_PIN, HIGH); // 初始化為 OFF (LOW active)
vTaskDelay(pdMS_TO_TICKS(500));
mfrc522.PCD_Init();
Serial.println("Core 1: RFID/LED Control Task Running.");
String led_command;
unsigned long flash_timer = 0;
bool led_state = false;
while (1) {
// 1. 處理來自 Core 0 的 LED 命令 (Queue)
if (xQueueReceive(xLedQueue, &led_command, (TickType_t)0) == pdPASS) {
flash_timer = 0; // 收到新命令時,停止任何閃爍或定時器
if (led_command == "ON") {
digitalWrite(LED_PIN,LOW); // 點亮
} else if (led_command == "OFF") {
digitalWrite(LED_PIN, HIGH); // 熄滅
} else if (led_command == "FLASH") {
flash_timer = millis();
} else if (led_command == "TIMER") {
flash_timer = millis() + 10000; // 10 秒定時
digitalWrite(LED_PIN,LOW);
}
// 更新狀態變數,通知 Core 0 發佈新的 LED 狀態
if (xSemaphoreTake(xStatusMutex, (TickType_t)10) == pdTRUE) {
current_led_status = led_command;
xSemaphoreGive(xStatusMutex);
}
}
// 2. 處理 LED 邏輯 (閃爍/定時)
if (flash_timer > 0) {
if (led_command == "FLASH") {
// 每 500ms 閃爍一次
if (millis() - flash_timer >= 500) {
led_state = !led_state;
digitalWrite(LED_PIN, led_state ? LOW : HIGH);
flash_timer = millis();
}
} else if (led_command == "TIMER") {
// 10 秒定時結束
if (millis() >= flash_timer) {
digitalWrite(LED_PIN, HIGH); // 熄滅 LED
flash_timer = 0;
led_command = "OFF";
// 定時結束後,將狀態更新為 OFF
if (xSemaphoreTake(xStatusMutex, (TickType_t)10) == pdTRUE) {
current_led_status = "OFF";
xSemaphoreGive(xStatusMutex);
}
}
}
}
// 3. 處理 RFID 讀取
if (mfrc522.PICC_IsNewCardPresent() && mfrc522.PICC_ReadCardSerial()) {
byte* currentUID = mfrc522.uid.uidByte;
byte currentUIDSize = mfrc522.uid.size;
String hexUID = uidToHexString(currentUID, currentUIDSize);
// 檢查是否為同一張卡片
bool is_same_card = (currentUIDSize == last_UID_Size) && (memcmp(currentUID, last_UID, currentUIDSize) == 0);
bool should_publish = false;
if (!is_same_card) {
Serial.println("Core 1: New card detected. Publishing immediately.");
should_publish = true;
} else if (millis() - last_publish_time >= REPEAT_DELAY_MS) {
// 檢查是否超過重複發佈延遲
Serial.println("Core 1: Same card detected after 5s. Publishing.");
should_publish = true;
}
if (should_publish) {
// 嘗試取得 UID 互斥鎖 (等待 10 ticks)
if (xSemaphoreTake(xUidMutex, (TickType_t)10) == pdTRUE) {
new_uid_to_publish = hexUID; // 更新共享變數
xSemaphoreGive(xUidMutex);
} else {
// 如果 Core 0 鎖住,這裡會失敗
Serial.println("Core 1: FAILED to take UID mutex. Core 0 might be delayed.");
}
last_publish_time = millis();
memcpy(last_UID, currentUID, currentUIDSize);
last_UID_Size = currentUIDSize;
}
mfrc522.PICC_HaltA(); // 停止卡片通訊
}
vTaskDelay(pdMS_TO_TICKS(10)); // 任務延遲
}
}
/**
* @brief 將 UID 轉換為十六進位字串 (例如 04C926442A2780)
*/
String uidToHexString(byte *buffer, byte bufferSize) {
String output = "";
for (byte i = 0; i < bufferSize; i++) {
if (buffer[i] < 0x10) {
output += "0";
}
output += String(buffer[i], HEX);
}
output.toUpperCase();
return output;
}
// ---------------------------------------------------------------------------------
// ------------------------------------ SETUP & LOOP -------------------------------
// ---------------------------------------------------------------------------------
void setup() {
Serial.begin(115200);
// *** 禁用 Task Watchdog Timer (TWDT) ***
// 解決長時間任務可能被重啟的問題
esp_task_wdt_deinit();
Serial.println("Task Watchdog Timer (TWDT) completely disabled.");
SPI.begin();
// 初始化 Mutex 和 Queue
xUidMutex = xSemaphoreCreateMutex();
xStatusMutex = xSemaphoreCreateMutex();
xLedQueue = xQueueCreate(5, sizeof(String));
if (xUidMutex == NULL || xStatusMutex == NULL || xLedQueue == NULL) {
Serial.println("FreeRTOS resource creation failed!");
while (1); // 停止運行
}
// 啟動 Core 0 上的 MQTT 任務 (優先級 1)
xTaskCreatePinnedToCore(
ledMqttTask,
"MQTT_PUB_Task",
10000,
NULL,
1,
NULL,
0 // 核心 ID (Core 0)
);
// 啟動 Core 1 上的 RFID/LED 控制任務 (優先級 1)
xTaskCreatePinnedToCore(
rfidLedControlTask,
"RFID_LED_Task",
12288,
NULL,
1,
NULL,
1 // 核心 ID (Core 1)
);
}
void loop() {
// 讓 FreeRTOS 完全掌控,loop() 僅用於延遲
vTaskDelay(1);
}







沒有留言:
張貼留言