2026年1月7日 星期三

WOKWI ESP32 模擬 RFID、LED + Python Tkinter SQLite

 WOKWI ESP32 模擬RFID ,LED + Python TKinter SQLite 









WOKWI ESP32程式

#include <SPI.h>
#include <MFRC522.h>
#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <LiquidCrystal_I2C.h>

// --- Hardware pin assignments (as specified for this build) ---
// SS_PIN / RST_PIN are passed to the MFRC522 constructor below;
// LED_PIN drives the status LED; I2C_SDA / I2C_SCL are passed to Wire.begin() in setup().
#define SS_PIN    5
#define RST_PIN   22
#define LED_PIN   2
#define I2C_SDA   17
#define I2C_SCL   16

// --- Configuration ---
// WiFi credentials (empty password).
const char* ssid = "Wokwi-GUEST";
const char* password = "";

// MQTT settings (must match the topics used by the Python Tkinter program)
const char* mqtt_server = "broker.mqtt-dashboard.com";
const char* TOPIC_RFID_UID = "alex9ufo/rfid/UID";        // published: scanned card UID
const char* TOPIC_LED_CONTROL = "alex9ufo/led/control";  // subscribed: LED commands
const char* TOPIC_LED_STATUS = "alex9ufo/led/status";    // published: LED state reports

// Hardware objects
LiquidCrystal_I2C lcd(0x27, 16, 2);
MFRC522 mfrc522(SS_PIN, RST_PIN);
WiFiClient espClient;
PubSubClient mqttClient(espClient);

// --- FreeRTOS queue ---
// Carries RfidMsg items from rfidTask (producer) to mqttTask (consumer).
QueueHandle_t rfidQueue;
// Card UID as a NUL-terminated string (at most 19 characters plus the terminator).
struct RfidMsg { char uid[20]; };

// Globals
// Set by mqttCallback; read by mqttTask to toggle the LED while true.
bool isFlashing = false;

// --- MQTT 接收處理 (來自 Python 控制台) ---
void mqttCallback(char* topic, byte* payload, unsigned int length) {
  String message = "";
  for (int i = 0; i < length; i++) message += (char)payload[i];
 
  Serial.printf("\n[MQTT CMD]: %s\n", message.c_str());
 
  lcd.clear();
  lcd.setCursor(0, 0);
  lcd.print("MQTT Command:");
  lcd.setCursor(0, 1);

  isFlashing = false;
  if (message == "on") {
    digitalWrite(LED_PIN, HIGH);
    lcd.print("LED: ON");
    mqttClient.publish(TOPIC_LED_STATUS, "ON");
  }
  else if (message == "off") {
    digitalWrite(LED_PIN, LOW);
    lcd.print("LED: OFF");
    mqttClient.publish(TOPIC_LED_STATUS, "OFF");
  }
  else if (message == "flash") {
    isFlashing = true;
    lcd.print("MODE: FLASHING");
    mqttClient.publish(TOPIC_LED_STATUS, "FLASHING");
  }
  else if (message == "timer") {
    digitalWrite(LED_PIN, HIGH);
    lcd.print("TIMER: 5 SEC");
    mqttClient.publish(TOPIC_LED_STATUS, "TIMER_START");
    vTaskDelay(5000 / portTICK_PERIOD_MS); // 在 Task 中使用 vTaskDelay 不會卡死整個系統
    digitalWrite(LED_PIN, LOW);
    mqttClient.publish(TOPIC_LED_STATUS, "OFF");
  }
}

// --- Core 0: 負責 WiFi 與 MQTT 通訊 ---
void mqttTask(void *pvParameters) {
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) { vTaskDelay(500 / portTICK_PERIOD_MS); }
 
  mqttClient.setServer(mqtt_server, 1883);
  mqttClient.setCallback(mqttCallback);

  RfidMsg rMsg;
  while (true) {
    // 維護 MQTT 連線
    if (!mqttClient.connected()) {
      Serial.print("Connecting to MQTT...");
      if (mqttClient.connect("ESP32_RFID_Gate_NoTG")) {
        Serial.println("Connected");
        mqttClient.subscribe(TOPIC_LED_CONTROL);
      } else {
        vTaskDelay(5000 / portTICK_PERIOD_MS);
      }
    }
    mqttClient.loop();

    // 接收來自 Core 1 的 RFID 訊息並發送到 MQTT
    if (xQueueReceive(rfidQueue, &rMsg, 0) == pdPASS) {
      mqttClient.publish(TOPIC_RFID_UID, rMsg.uid);
      Serial.printf("Sent UID to Python: %s\n", rMsg.uid);
    }

    // 處理閃爍邏輯
    if (isFlashing) {
      digitalWrite(LED_PIN, !digitalRead(LED_PIN));
      vTaskDelay(300 / portTICK_PERIOD_MS);
    }
   
    vTaskDelay(10 / portTICK_PERIOD_MS);
  }
}

// --- Core 1: 專門負責 RFID 掃描 (不處理網路) ---
void rfidTask(void *pvParameters) {
  SPI.begin();
  mfrc522.PCD_Init();
  RfidMsg rMsg;
  while (true) {
    if (mfrc522.PICC_IsNewCardPresent() && mfrc522.PICC_ReadCardSerial()) {
      String uidStr = "";
      for (byte i = 0; i < mfrc522.uid.size; i++) {
        uidStr += (mfrc522.uid.uidByte[i] < 0x10 ? "0" : "");
        uidStr += String(mfrc522.uid.uidByte[i], HEX);
      }
      uidStr.toUpperCase();
     
      // 更新 LCD 顯示
      lcd.clear();
      lcd.setCursor(0, 0); lcd.print("RFID Detected!");
      lcd.setCursor(0, 1); lcd.print("ID: " + uidStr);
     
      // 將卡號打包送入隊列,交給 Core 0 發送
      uidStr.toCharArray(rMsg.uid, 20);
      xQueueSend(rfidQueue, &rMsg, portMAX_DELAY);

      mfrc522.PICC_HaltA();
      mfrc522.PCD_StopCrypto1();
    }
    vTaskDelay(200 / portTICK_PERIOD_MS);
  }
}

void setup() {
  Serial.begin(115200);
  pinMode(LED_PIN, OUTPUT);
 
  // 初始化 I2C LCD
  Wire.begin(I2C_SDA, I2C_SCL);
  lcd.init();
  lcd.backlight();
  lcd.print("MQTT Connecting...");

  // 建立隊列
  rfidQueue = xQueueCreate(10, sizeof(RfidMsg));

  if (rfidQueue != NULL) {
    // 建立雙核心任務
    xTaskCreatePinnedToCore(mqttTask, "MQTT_Task", 8192, NULL, 1, NULL, 0);
    xTaskCreatePinnedToCore(rfidTask, "RFID_Task", 4096, NULL, 1, NULL, 1);
  }
}

void loop() {
  // Nothing to do here: all work runs in the FreeRTOS tasks created in setup().
  // Each call just sleeps for the maximum delay so this task stays out of the way.
  vTaskDelay(portMAX_DELAY);
}


Python + Tkinter 程式

"""Tkinter control panel for the Wokwi ESP32 RFID / LED demo.

The application talks to the ESP32 over MQTT (LED commands out, LED status
and card UIDs in) and stores scanned cards plus the login credentials in a
local SQLite database.
"""
import tkinter as tk
from tkinter import messagebox, ttk
import paho.mqtt.client as mqtt
import sqlite3
import os
import threading
from contextlib import closing
from datetime import datetime

try:
    import winsound  # Windows-only; used for the mismatch alarm tone.
except ImportError:  # Other platforms fall back to the Tk bell.
    winsound = None

# --- System parameters ---
MQTT_BROKER = "broker.mqtt-dashboard.com"
TOPIC_CONTROL = "alex9ufo/led/control"
TOPIC_STATUS = "alex9ufo/led/status"
TOPIC_RFID = "alex9ufo/rfid/UID"
DB_NAME = "rfid2026.db"


class RFIDSystemApp:
    """Login-protected GUI that records RFID scans and controls the LED."""

    def __init__(self, root):
        """Load credentials, start the MQTT client and show the login form.

        Args:
            root: The Tk root window that hosts every screen of the app.
        """
        self.root = root
        self.root.title("WOWKI ESP32 RFID 系統軟體")

        # Default credentials; replaced by the stored ones if present.
        self.admin_user = "Admin"
        self.admin_pass = "123456"

        # Pending Tk "after" jobs for the on-screen LED animation.
        self.flash_job = None
        self.timer_job = None
        self.flash_state = False

        self.mode = tk.StringVar(value="ADD")
        self.mqtt_connected = False

        # Widgets that only exist once main_ui() has run. MQTT messages can
        # arrive while the login form is still shown, so handlers check
        # these for None before touching them.
        self.mqtt_canvas = None
        self.tree = None
        self.canvas = None
        self.lbl_result = None

        self.init_db_and_load_auth()
        self.setup_mqtt()
        self.login_ui()

    # --- Database helpers ---

    def init_db_and_load_auth(self):
        """Create the tables if needed and load the stored credentials.

        Missing credential rows are seeded with the current in-memory values.
        """
        # NOTE(review): credentials are stored and compared in plain text.
        with closing(sqlite3.connect(DB_NAME)) as conn, conn:
            cursor = conn.cursor()
            cursor.execute('''CREATE TABLE IF NOT EXISTS rfid_history
                              (id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT, time TEXT, uid TEXT, note TEXT)''')
            cursor.execute('''CREATE TABLE IF NOT EXISTS system_config
                              (key TEXT PRIMARY KEY, value TEXT)''')

            # Account name
            cursor.execute("SELECT value FROM system_config WHERE key='user'")
            res_u = cursor.fetchone()
            if res_u:
                self.admin_user = res_u[0]
            else:
                cursor.execute("INSERT INTO system_config VALUES ('user', ?)", (self.admin_user,))

            # Password
            cursor.execute("SELECT value FROM system_config WHERE key='pass'")
            res_p = cursor.fetchone()
            if res_p:
                self.admin_pass = res_p[0]
            else:
                cursor.execute("INSERT INTO system_config VALUES ('pass', ?)", (self.admin_pass,))

    def save_rfid_data(self, uid, note):
        """Append one scan to the history table and refresh the table view.

        Args:
            uid: Card UID string.
            note: Remark stored with the record.
        """
        now = datetime.now()
        with closing(sqlite3.connect(DB_NAME)) as conn, conn:
            conn.execute(
                "INSERT INTO rfid_history (date, time, uid, note) VALUES (?, ?, ?, ?)",
                (now.strftime("%Y/%m/%d"), now.strftime("%H:%M:%S"), uid, note),
            )
        if self.tree:
            self.update_table()

    # --- MQTT ---

    def setup_mqtt(self):
        """Create the MQTT client and start its background network loop."""
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        try:
            self.client.connect_async(MQTT_BROKER, 1883, 60)
            self.client.loop_start()
        except (OSError, ValueError) as e:
            print(f"MQTT 初始化失敗: {e}")

    def on_connect(self, client, userdata, flags, rc, properties=None):
        """Subscribe to the status and UID topics once the broker accepts us."""
        # With callback API v2 `rc` is a ReasonCode; a refused connection
        # must not be reported as "connected".
        if getattr(rc, "is_failure", False):
            self.mqtt_connected = False
        else:
            self.client.subscribe([(TOPIC_STATUS, 0), (TOPIC_RFID, 0)])
            self.mqtt_connected = True
        self.root.after(0, self.update_mqtt_status_ui)

    def on_disconnect(self, client, userdata, disconnect_flags=None,
                      reason_code=None, properties=None):
        """Mark the connection as lost and refresh the indicator.

        Callback API v2 passes (client, userdata, disconnect_flags,
        reason_code, properties); the defaults keep shorter call forms
        working as well.
        """
        self.mqtt_connected = False
        self.root.after(0, self.update_mqtt_status_ui)

    def update_mqtt_status_ui(self):
        """Show the connection state on the indicator light and label."""
        if self.mqtt_canvas:
            color = "green" if self.mqtt_connected else "red"
            text = f"MQTT 狀態: {'已連線' if self.mqtt_connected else '斷開'}"
            self.mqtt_canvas.itemconfig(self.mqtt_light, fill=color)
            self.lbl_mqtt_info.config(text=text, fg=color)

    def on_message(self, client, userdata, msg):
        """Dispatch an incoming MQTT message to the Tk thread."""
        # The broker is public, so tolerate payloads that are not valid UTF-8.
        payload = msg.payload.decode(errors="replace").strip().upper()
        if msg.topic == TOPIC_STATUS:
            self.root.after(0, self.update_led_display, payload)
        elif msg.topic == TOPIC_RFID:
            self.root.after(0, self.process_rfid, payload)

    # --- RFID handling ---

    def process_rfid(self, uid):
        """Handle a scanned card according to the selected mode.

        In ADD mode the UID is stored; otherwise it is looked up among the
        previously added cards and the outcome is shown and logged.

        Args:
            uid: Card UID string received from the ESP32.
        """
        if not uid:
            return  # Ignore empty payloads.

        if self.mode.get() == "ADD":
            self.save_rfid_data(uid, "新增")
            self._show_result(f"已新增卡號: {uid}", "black", ("微軟正黑體", 16))
            return

        with closing(sqlite3.connect(DB_NAME)) as conn:
            found = conn.execute(
                "SELECT * FROM rfid_history WHERE uid = ? AND note = '新增'", (uid,)
            ).fetchone()

        if found:
            self._show_result("比對相同", "green", ("微軟正黑體", 32, "bold"))
            self.save_rfid_data(uid, "比對相同")
        else:
            self._show_result("比對錯誤", "red", ("微軟正黑體", 32, "bold"))
            self._sound_alarm()
            self.save_rfid_data(uid, "比對錯誤")

    def _show_result(self, text, fg, font):
        """Update the result label if the main screen has been built."""
        if self.lbl_result is not None:
            self.lbl_result.config(text=text, fg=fg, font=font)

    def _sound_alarm(self):
        """Play the mismatch alarm without blocking the Tk event loop."""
        if winsound is None:
            self.root.bell()
            return
        # Beep() blocks for its whole duration, so run it off the UI thread.
        threading.Thread(target=winsound.Beep, args=(880, 3000), daemon=True).start()

    # --- Main screen ---

    def main_ui(self):
        """Build the main screen: status bar, controls, LED panel and table."""
        self.clear_window()

        # Status bar
        m_frame = tk.Frame(self.root, relief="groove", borderwidth=2)
        m_frame.pack(fill="x", padx=10, pady=5)
        self.mqtt_canvas = tk.Canvas(m_frame, width=20, height=20)
        self.mqtt_canvas.pack(side="left", padx=5)
        self.mqtt_light = self.mqtt_canvas.create_oval(2, 2, 18, 18, fill="gray")
        self.lbl_mqtt_info = tk.Label(m_frame, text="MQTT 狀態: 檢查中...")
        self.lbl_mqtt_info.pack(side="left")
        self.update_mqtt_status_ui()

        # Database management buttons
        db_f = tk.Frame(self.root)
        db_f.pack(pady=5)
        tk.Button(db_f, text="建立資料庫", command=self.init_db_with_msg).pack(side="left", padx=5)
        tk.Button(db_f, text="刪除選定", bg="#ffcccc", command=self.delete_selected).pack(side="left", padx=5)
        tk.Button(db_f, text="清空資料庫", bg="orange", command=self.clear_all_db).pack(side="left", padx=5)
        # Exit button
        tk.Button(db_f, text="結束程式", bg="red", fg="white", command=self.exit_app).pack(side="left", padx=5)

        # Mode selection and large result label
        mode_f = tk.LabelFrame(self.root, text="RFID 工作模式")
        mode_f.pack(fill="x", padx=10, pady=5)
        tk.Radiobutton(mode_f, text="新增模式", variable=self.mode, value="ADD").pack(side="left", padx=20)
        tk.Radiobutton(mode_f, text="比對模式", variable=self.mode, value="CHECK").pack(side="left", padx=20)
        self.lbl_result = tk.Label(self.root, text="等待感應...", font=("微軟正黑體", 16))
        self.lbl_result.pack(pady=10)

        # LED control
        led_f = tk.Frame(self.root)
        led_f.pack()
        self.canvas = tk.Canvas(led_f, width=40, height=40)
        self.canvas.pack(side="left")
        self.led_circle = self.canvas.create_oval(5, 5, 35, 35, fill="red")
        self.lbl_led_status = tk.Label(led_f, text="LED: OFF")
        self.lbl_led_status.pack(side="left", padx=10)

        btn_f = tk.Frame(self.root)
        btn_f.pack(pady=5)
        for t, c in [("ON", "on"), ("OFF", "off"), ("FLASH", "flash"), ("TIMER(5S)", "timer")]:
            # Bind the command as a default argument so each button keeps its own value.
            tk.Button(btn_f, text=t, width=10,
                      command=lambda cmd=c: self.client.publish(TOPIC_CONTROL, cmd)).pack(side="left", padx=2)

        # History table
        self.tree = ttk.Treeview(self.root, columns=("ID", "日期", "時間", "卡號", "備註"), show="headings")
        for col in ("ID", "日期", "時間", "卡號", "備註"):
            self.tree.heading(col, text=col)
            self.tree.column(col, width=95, anchor="center")
        self.tree.pack(padx=10, pady=10)
        self.update_table()

    def exit_app(self):
        """Ask for confirmation, then stop MQTT and close the window."""
        if messagebox.askyesno("結束", "確定要結束程式嗎?"):
            self.client.loop_stop()
            self.client.disconnect()
            self.root.destroy()

    # --- Other helpers ---

    def update_table(self):
        """Reload the table view with all history rows, newest first."""
        if not self.tree:
            return
        for i in self.tree.get_children():
            self.tree.delete(i)
        with closing(sqlite3.connect(DB_NAME)) as conn:
            rows = conn.execute("SELECT * FROM rfid_history ORDER BY id DESC").fetchall()
        for row in rows:
            self.tree.insert("", "end", values=row)

    def init_db_with_msg(self):
        """Re-run the schema setup and confirm it to the user."""
        self.init_db_and_load_auth()
        messagebox.showinfo("資料庫", "資料庫結構已重整")
        self.update_table()

    def delete_selected(self):
        """Delete every history row currently selected in the table."""
        selected = self.tree.selection()
        if not selected:
            return
        # selection() returns a tuple of item ids; handle each one so a
        # multi-row selection works too.
        db_ids = [(self.tree.item(item)['values'][0],) for item in selected]
        with closing(sqlite3.connect(DB_NAME)) as conn, conn:
            conn.executemany("DELETE FROM rfid_history WHERE id = ?", db_ids)
        self.update_table()

    def clear_all_db(self):
        """After confirmation, remove all history rows and reset the id counter."""
        if messagebox.askyesno("警告", "清空所有歷史資料?"):
            with closing(sqlite3.connect(DB_NAME)) as conn, conn:
                conn.execute("DELETE FROM rfid_history")
                conn.execute("DELETE FROM sqlite_sequence WHERE name='rfid_history'")
            self.update_table()

    def update_led_display(self, status):
        """Mirror the LED state reported by the ESP32 on the on-screen LED.

        Args:
            status: Upper-cased status text, e.g. "ON", "OFF", "FLASHING"
                or "TIMER_START".
        """
        if self.canvas is None:
            return  # Main screen not built yet (still on the login form).

        # A new status supersedes any running animation or countdown.
        if self.flash_job:
            self.root.after_cancel(self.flash_job)
            self.flash_job = None
        if self.timer_job:
            self.root.after_cancel(self.timer_job)
            self.timer_job = None

        if status == "ON":
            self.canvas.itemconfig(self.led_circle, fill="green")
        elif status == "OFF":
            self.canvas.itemconfig(self.led_circle, fill="red")
        elif "FLASH" in status:
            self.run_flash_logic()
        elif "TIMER" in status:
            self.canvas.itemconfig(self.led_circle, fill="green")
            self.timer_job = self.root.after(5000, self._timer_expired)
        self.lbl_led_status.config(text=f"ESP32 LED 狀態: {status}")

    def _timer_expired(self):
        """Turn the on-screen LED red when the 5-second countdown ends."""
        self.timer_job = None
        self.canvas.itemconfig(self.led_circle, fill="red")

    def run_flash_logic(self):
        """Toggle the on-screen LED and reschedule itself every 500 ms."""
        self.flash_state = not self.flash_state
        color = "green" if self.flash_state else "red"
        self.canvas.itemconfig(self.led_circle, fill=color)
        self.flash_job = self.root.after(500, self.run_flash_logic)

    # --- Login and credential screens ---

    def login_ui(self):
        """Show the login form."""
        self.clear_window()
        f = tk.Frame(self.root)
        f.pack(pady=20)
        tk.Label(f, text="登入帳號").grid(row=0, column=0)
        self.e_u = tk.Entry(f)
        self.e_u.grid(row=0, column=1)
        tk.Label(f, text="登入密碼").grid(row=1, column=0)
        self.e_p = tk.Entry(f, show="*")
        self.e_p.grid(row=1, column=1)
        tk.Button(f, text="登入", command=self.handle_login).grid(row=2, columnspan=2)

    def handle_login(self):
        """Check the entered credentials and open the next screen."""
        if self.e_u.get() == self.admin_user and self.e_p.get() == self.admin_pass:
            if messagebox.askyesno("詢問", "是否變更帳號或密碼?"):
                self.change_auth_ui()
            else:
                self.main_ui()
        else:
            messagebox.showerror("錯誤", "帳號或密碼不正確")

    def change_auth_ui(self):
        """Show the form for entering a new account name and password."""
        self.clear_window()
        tk.Label(self.root, text="變更帳號").pack()
        self.n_u = tk.Entry(self.root)
        self.n_u.pack()
        tk.Label(self.root, text="變更密碼").pack()
        self.n_p = tk.Entry(self.root)
        self.n_p.pack()
        tk.Button(self.root, text="存檔並登入", command=self.save_new_auth).pack()

    def save_new_auth(self):
        """Persist the new credentials and continue to the main screen."""
        new_u, new_p = self.n_u.get(), self.n_p.get()
        if not new_u or not new_p:
            # Saving a blank account or password would leave the app
            # protected by an empty login.
            messagebox.showwarning("錯誤", "帳號與密碼不可為空白")
            return
        with closing(sqlite3.connect(DB_NAME)) as conn, conn:
            # INSERT OR REPLACE also works if a row is unexpectedly missing.
            conn.execute("INSERT OR REPLACE INTO system_config (key, value) VALUES ('user', ?)", (new_u,))
            conn.execute("INSERT OR REPLACE INTO system_config (key, value) VALUES ('pass', ?)", (new_p,))
        self.admin_user, self.admin_pass = new_u, new_p
        messagebox.showinfo("成功", "帳密已儲存")
        self.main_ui()

    def clear_window(self):
        """Destroy every widget in the root window."""
        for w in self.root.winfo_children():
            w.destroy()
        # Drop references to destroyed widgets so MQTT handlers skip them.
        self.mqtt_canvas = None
        self.tree = None
        self.canvas = None
        self.lbl_result = None


if __name__ == "__main__":
    root = tk.Tk()
    app = RFIDSystemApp(root)
    root.mainloop()

沒有留言:

張貼留言

WOKWI ESP32 模擬RFID ,LED + Python TKinter SQLite

 WOKWI ESP32 模擬RFID ,LED + Python TKinter SQLite  WOKWI ESP32程式 #include <SPI.h> #include <MFRC522.h> #include <WiFi.h> #i...