WOKWI ESP32 模擬RFID ,LED + Python TKinter SQLite
WOKWI ESP32程式
#include <SPI.h>
#include <MFRC522.h>
#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <LiquidCrystal_I2C.h>
// --- 硬體腳位 (您的指定) ---
#define SS_PIN 5
#define RST_PIN 22
#define LED_PIN 2
#define I2C_SDA 17
#define I2C_SCL 16
// --- 設定區 ---
const char* ssid = "Wokwi-GUEST";
const char* password = "";
// MQTT 設定 (與您的 Python Tkinter 程式對接)
const char* mqtt_server = "broker.mqtt-dashboard.com";
const char* TOPIC_RFID_UID = "alex9ufo/rfid/UID";
const char* TOPIC_LED_CONTROL = "alex9ufo/led/control";
const char* TOPIC_LED_STATUS = "alex9ufo/led/status";
// 硬體物件
LiquidCrystal_I2C lcd(0x27, 16, 2);
MFRC522 mfrc522(SS_PIN, RST_PIN);
WiFiClient espClient;
PubSubClient mqttClient(espClient);
// --- FreeRTOS 隊列 ---
QueueHandle_t rfidQueue;
struct RfidMsg { char uid[20]; };
// 全域變數
bool isFlashing = false;
// --- MQTT 接收處理 (來自 Python 控制台) ---
void mqttCallback(char* topic, byte* payload, unsigned int length) {
String message = "";
for (int i = 0; i < length; i++) message += (char)payload[i];
Serial.printf("\n[MQTT CMD]: %s\n", message.c_str());
lcd.clear();
lcd.setCursor(0, 0);
lcd.print("MQTT Command:");
lcd.setCursor(0, 1);
isFlashing = false;
if (message == "on") {
digitalWrite(LED_PIN, HIGH);
lcd.print("LED: ON");
mqttClient.publish(TOPIC_LED_STATUS, "ON");
}
else if (message == "off") {
digitalWrite(LED_PIN, LOW);
lcd.print("LED: OFF");
mqttClient.publish(TOPIC_LED_STATUS, "OFF");
}
else if (message == "flash") {
isFlashing = true;
lcd.print("MODE: FLASHING");
mqttClient.publish(TOPIC_LED_STATUS, "FLASHING");
}
else if (message == "timer") {
digitalWrite(LED_PIN, HIGH);
lcd.print("TIMER: 5 SEC");
mqttClient.publish(TOPIC_LED_STATUS, "TIMER_START");
vTaskDelay(5000 / portTICK_PERIOD_MS); // 在 Task 中使用 vTaskDelay 不會卡死整個系統
digitalWrite(LED_PIN, LOW);
mqttClient.publish(TOPIC_LED_STATUS, "OFF");
}
}
// --- Core 0: 負責 WiFi 與 MQTT 通訊 ---
void mqttTask(void *pvParameters) {
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) { vTaskDelay(500 / portTICK_PERIOD_MS); }
mqttClient.setServer(mqtt_server, 1883);
mqttClient.setCallback(mqttCallback);
RfidMsg rMsg;
while (true) {
// 維護 MQTT 連線
if (!mqttClient.connected()) {
Serial.print("Connecting to MQTT...");
if (mqttClient.connect("ESP32_RFID_Gate_NoTG")) {
Serial.println("Connected");
mqttClient.subscribe(TOPIC_LED_CONTROL);
} else {
vTaskDelay(5000 / portTICK_PERIOD_MS);
}
}
mqttClient.loop();
// 接收來自 Core 1 的 RFID 訊息並發送到 MQTT
if (xQueueReceive(rfidQueue, &rMsg, 0) == pdPASS) {
mqttClient.publish(TOPIC_RFID_UID, rMsg.uid);
Serial.printf("Sent UID to Python: %s\n", rMsg.uid);
}
// 處理閃爍邏輯
if (isFlashing) {
digitalWrite(LED_PIN, !digitalRead(LED_PIN));
vTaskDelay(300 / portTICK_PERIOD_MS);
}
vTaskDelay(10 / portTICK_PERIOD_MS);
}
}
// --- Core 1: 專門負責 RFID 掃描 (不處理網路) ---
void rfidTask(void *pvParameters) {
SPI.begin();
mfrc522.PCD_Init();
RfidMsg rMsg;
while (true) {
if (mfrc522.PICC_IsNewCardPresent() && mfrc522.PICC_ReadCardSerial()) {
String uidStr = "";
for (byte i = 0; i < mfrc522.uid.size; i++) {
uidStr += (mfrc522.uid.uidByte[i] < 0x10 ? "0" : "");
uidStr += String(mfrc522.uid.uidByte[i], HEX);
}
uidStr.toUpperCase();
// 更新 LCD 顯示
lcd.clear();
lcd.setCursor(0, 0); lcd.print("RFID Detected!");
lcd.setCursor(0, 1); lcd.print("ID: " + uidStr);
// 將卡號打包送入隊列,交給 Core 0 發送
uidStr.toCharArray(rMsg.uid, 20);
xQueueSend(rfidQueue, &rMsg, portMAX_DELAY);
mfrc522.PICC_HaltA();
mfrc522.PCD_StopCrypto1();
}
vTaskDelay(200 / portTICK_PERIOD_MS);
}
}
void setup() {
Serial.begin(115200);
pinMode(LED_PIN, OUTPUT);
// 初始化 I2C LCD
Wire.begin(I2C_SDA, I2C_SCL);
lcd.init();
lcd.backlight();
lcd.print("MQTT Connecting...");
// 建立隊列
rfidQueue = xQueueCreate(10, sizeof(RfidMsg));
if (rfidQueue != NULL) {
// 建立雙核心任務
xTaskCreatePinnedToCore(mqttTask, "MQTT_Task", 8192, NULL, 1, NULL, 0);
xTaskCreatePinnedToCore(rfidTask, "RFID_Task", 4096, NULL, 1, NULL, 1);
}
}
void loop() {
// FreeRTOS 架構下 loop 不需執行內容
vTaskDelay(portMAX_DELAY);
}
Python + TKinter 程式
import tkinter as tk
from tkinter import messagebox, ttk
import paho.mqtt.client as mqtt
import sqlite3
import os
from datetime import datetime
import winsound
# --- 系統參數設定 ---
MQTT_BROKER = "broker.mqtt-dashboard.com"
TOPIC_CONTROL = "alex9ufo/led/control"
TOPIC_STATUS = "alex9ufo/led/status"
TOPIC_RFID = "alex9ufo/rfid/UID"
DB_NAME = "rfid2026.db"
class RFIDSystemApp:
def __init__(self, root):
self.root = root
self.root.title("WOWKI ESP32 RFID 系統軟體")
# 1. 初始化變數與帳密載入
self.admin_user = "Admin"
self.admin_pass = "123456"
self.flash_job = None
self.mode = tk.StringVar(value="ADD")
self.mqtt_connected = False
self.mqtt_canvas = None
self.tree = None
self.init_db_and_load_auth()
self.setup_mqtt()
self.login_ui()
def init_db_and_load_auth(self):
"""初始化資料庫並從中讀取帳號密碼"""
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS rfid_history
(id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT, time TEXT, uid TEXT, note TEXT)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS system_config
(key TEXT PRIMARY KEY, value TEXT)''')
# 讀取帳號
cursor.execute("SELECT value FROM system_config WHERE key='user'")
res_u = cursor.fetchone()
if res_u: self.admin_user = res_u[0]
else: cursor.execute("INSERT INTO system_config VALUES ('user', ?)", (self.admin_user,))
# 讀取密碼
cursor.execute("SELECT value FROM system_config WHERE key='pass'")
res_p = cursor.fetchone()
if res_p: self.admin_pass = res_p[0]
else: cursor.execute("INSERT INTO system_config VALUES ('pass', ?)", (self.admin_pass,))
conn.commit()
conn.close()
def setup_mqtt(self):
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
try:
self.client.connect_async(MQTT_BROKER, 1883, 60)
self.client.loop_start()
except Exception as e:
print(f"MQTT 初始化失敗: {e}")
def on_connect(self, client, userdata, flags, rc, properties=None):
self.client.subscribe([(TOPIC_STATUS, 0), (TOPIC_RFID, 0)])
self.mqtt_connected = True
self.root.after(0, self.update_mqtt_status_ui)
def on_disconnect(self, client, userdata, rc, properties=None):
self.mqtt_connected = False
self.root.after(0, self.update_mqtt_status_ui)
def update_mqtt_status_ui(self):
if self.mqtt_canvas:
color = "green" if self.mqtt_connected else "red"
text = f"MQTT 狀態: {'已連線' if self.mqtt_connected else '斷開'}"
self.mqtt_canvas.itemconfig(self.mqtt_light, fill=color)
self.lbl_mqtt_info.config(text=text, fg=color)
def on_message(self, client, userdata, msg):
payload = msg.payload.decode().strip().upper()
if msg.topic == TOPIC_STATUS:
self.root.after(0, lambda: self.update_led_display(payload))
elif msg.topic == TOPIC_RFID:
self.root.after(0, lambda: self.process_rfid(payload))
def process_rfid(self, uid):
if self.mode.get() == "ADD":
self.save_rfid_data(uid, "新增")
self.lbl_result.config(text=f"已新增卡號: {uid}", fg="black", font=("微軟正黑體", 16))
else:
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute("SELECT * FROM rfid_history WHERE uid = ? AND note = '新增'", (uid,))
found = cursor.fetchone()
conn.close()
if found:
self.lbl_result.config(text="比對相同", fg="green", font=("微軟正黑體", 32, "bold"))
self.save_rfid_data(uid, "比對相同")
else:
self.lbl_result.config(text="比對錯誤", fg="red", font=("微軟正黑體", 32, "bold"))
self.root.after(100, lambda: winsound.Beep(880, 3000))
self.save_rfid_data(uid, "比對錯誤")
def save_rfid_data(self, uid, note):
now = datetime.now()
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute("INSERT INTO rfid_history (date, time, uid, note) VALUES (?, ?, ?, ?)",
(now.strftime("%Y/%m/%d"), now.strftime("%H:%M:%S"), uid, note))
conn.commit()
conn.close()
if self.tree: self.update_table()
def main_ui(self):
self.clear_window()
# 狀態列
m_frame = tk.Frame(self.root, relief="groove", borderwidth=2); m_frame.pack(fill="x", padx=10, pady=5)
self.mqtt_canvas = tk.Canvas(m_frame, width=20, height=20); self.mqtt_canvas.pack(side="left", padx=5)
self.mqtt_light = self.mqtt_canvas.create_oval(2, 2, 18, 18, fill="gray")
self.lbl_mqtt_info = tk.Label(m_frame, text="MQTT 狀態: 檢查中..."); self.lbl_mqtt_info.pack(side="left")
self.update_mqtt_status_ui()
# 管理按鈕區
db_f = tk.Frame(self.root); db_f.pack(pady=5)
tk.Button(db_f, text="建立資料庫", command=self.init_db_with_msg).pack(side="left", padx=5)
tk.Button(db_f, text="刪除選定", bg="#ffcccc", command=self.delete_selected).pack(side="left", padx=5)
tk.Button(db_f, text="清空資料庫", bg="orange", command=self.clear_all_db).pack(side="left", padx=5)
# --- 新增結束按鈕 ---
tk.Button(db_f, text="結束程式", bg="red", fg="white", command=self.exit_app).pack(side="left", padx=5)
# 模式與大字顯示
mode_f = tk.LabelFrame(self.root, text="RFID 工作模式"); mode_f.pack(fill="x", padx=10, pady=5)
tk.Radiobutton(mode_f, text="新增模式", variable=self.mode, value="ADD").pack(side="left", padx=20)
tk.Radiobutton(mode_f, text="比對模式", variable=self.mode, value="CHECK").pack(side="left", padx=20)
self.lbl_result = tk.Label(self.root, text="等待感應...", font=("微軟正黑體", 16)); self.lbl_result.pack(pady=10)
# LED 控制
led_f = tk.Frame(self.root); led_f.pack()
self.canvas = tk.Canvas(led_f, width=40, height=40); self.canvas.pack(side="left")
self.led_circle = self.canvas.create_oval(5, 5, 35, 35, fill="red")
self.lbl_led_status = tk.Label(led_f, text="LED: OFF"); self.lbl_led_status.pack(side="left", padx=10)
btn_f = tk.Frame(self.root); btn_f.pack(pady=5)
for t, c in [("ON", "on"), ("OFF", "off"), ("FLASH", "flash"), ("TIMER(5S)", "timer")]:
tk.Button(btn_f, text=t, width=10, command=lambda cmd=c: self.client.publish(TOPIC_CONTROL, cmd)).pack(side="left", padx=2)
# 資料表
self.tree = ttk.Treeview(self.root, columns=("ID", "日期", "時間", "卡號", "備註"), show="headings")
for col in ("ID", "日期", "時間", "卡號", "備註"):
self.tree.heading(col, text=col)
self.tree.column(col, width=95, anchor="center")
self.tree.pack(padx=10, pady=10)
self.update_table()
def exit_app(self):
"""安全關閉連線並退出"""
if messagebox.askyesno("結束", "確定要結束程式嗎?"):
self.client.loop_stop()
self.client.disconnect()
self.root.destroy()
# --- 其他輔助功能 ---
def update_table(self):
if not self.tree: return
for i in self.tree.get_children(): self.tree.delete(i)
conn = sqlite3.connect(DB_NAME); cursor = conn.cursor()
cursor.execute("SELECT * FROM rfid_history ORDER BY id DESC")
for row in cursor.fetchall(): self.tree.insert("", "end", values=row)
conn.close()
def init_db_with_msg(self):
self.init_db_and_load_auth()
messagebox.showinfo("資料庫", "資料庫結構已重整")
self.update_table()
def delete_selected(self):
selected = self.tree.selection()
if not selected: return
db_id = self.tree.item(selected)['values'][0]
conn = sqlite3.connect(DB_NAME); cursor = conn.cursor()
cursor.execute("DELETE FROM rfid_history WHERE id = ?", (db_id,))
conn.commit(); conn.close()
self.update_table()
def clear_all_db(self):
if messagebox.askyesno("警告", "清空所有歷史資料?"):
conn = sqlite3.connect(DB_NAME); cursor = conn.cursor()
cursor.execute("DELETE FROM rfid_history")
cursor.execute("DELETE FROM sqlite_sequence WHERE name='rfid_history'")
conn.commit(); conn.close()
self.update_table()
def update_led_display(self, status):
if self.flash_job: self.root.after_cancel(self.flash_job); self.flash_job = None
if status == "ON": self.canvas.itemconfig(self.led_circle, fill="green")
elif status == "OFF": self.canvas.itemconfig(self.led_circle, fill="red")
elif "FLASH" in status: self.run_flash_logic()
elif "TIMER" in status:
self.canvas.itemconfig(self.led_circle, fill="green")
self.root.after(5000, lambda: self.canvas.itemconfig(self.led_circle, fill="red"))
self.lbl_led_status.config(text=f"ESP32 LED 狀態: {status}")
def run_flash_logic(self):
self.flash_state = getattr(self, 'flash_state', False)
self.flash_state = not self.flash_state
color = "green" if self.flash_state else "red"
self.canvas.itemconfig(self.led_circle, fill=color)
self.flash_job = self.root.after(500, self.run_flash_logic)
def login_ui(self):
self.clear_window(); f = tk.Frame(self.root); f.pack(pady=20)
tk.Label(f, text="登入帳號").grid(row=0, column=0)
self.e_u = tk.Entry(f); self.e_u.grid(row=0, column=1)
tk.Label(f, text="登入密碼").grid(row=1, column=0)
self.e_p = tk.Entry(f, show="*"); self.e_p.grid(row=1, column=1)
tk.Button(f, text="登入", command=self.handle_login).grid(row=2, columnspan=2)
def handle_login(self):
if self.e_u.get() == self.admin_user and self.e_p.get() == self.admin_pass:
if messagebox.askyesno("詢問", "是否變更帳號或密碼?"): self.change_auth_ui()
else: self.main_ui()
else: messagebox.showerror("錯誤", "帳號或密碼不正確")
def change_auth_ui(self):
self.clear_window()
tk.Label(self.root, text="變更帳號").pack(); self.n_u = tk.Entry(self.root); self.n_u.pack()
tk.Label(self.root, text="變更密碼").pack(); self.n_p = tk.Entry(self.root); self.n_p.pack()
tk.Button(self.root, text="存檔並登入", command=self.save_new_auth).pack()
def save_new_auth(self):
new_u, new_p = self.n_u.get(), self.n_p.get()
conn = sqlite3.connect(DB_NAME); cursor = conn.cursor()
cursor.execute("UPDATE system_config SET value=? WHERE key='user'", (new_u,))
cursor.execute("UPDATE system_config SET value=? WHERE key='pass'", (new_p,))
conn.commit(); conn.close()
self.admin_user, self.admin_pass = new_u, new_p
messagebox.showinfo("成功", "帳密已儲存")
self.main_ui()
def clear_window(self):
for w in self.root.winfo_children(): w.destroy()
if __name__ == "__main__":
root = tk.Tk()
app = RFIDSystemApp(root)
root.mainloop()






沒有留言:
張貼留言