WOKWI RFID + MQTT +Python Sqlite
Wokwi 程式
#include <SPI.h>
#include <MFRC522.h>
#include <WiFi.h>
#include <PubSubClient.h>
#include <Wire.h>
#include <LiquidCrystal_I2C.h>
// --- 硬體腳位 ---
#define SS_PIN 5
#define RST_PIN 22
#define LED_PIN 2
#define I2C_SDA 17
#define I2C_SCL 16
// --- MQTT 設定 ---
const char* ssid = "Wokwi-GUEST";
const char* password = "";
const char* mqtt_server = "mqttgo.io";
const char* topic_uid = "alex9ufo/rfidUID";
const char* topic_led_cmd = "alex9ufo/ledControl";
const char* topic_led_status = "alex9ufo/ledStatus";
LiquidCrystal_I2C lcd(0x27, 16, 2);
MFRC522 mfrc522(SS_PIN, RST_PIN);
WiFiClient espClient;
PubSubClient client(espClient);
// --- 全域變數 ---
QueueHandle_t uidQueue;
struct Message { char uid[20]; };
bool isFlashing = false; // 控制 Flash 模式是否持續
// --- MQTT 接收指令 ---
void callback(char* topic, byte* payload, unsigned int length) {
String message = "";
for (int i = 0; i < length; i++) message += (char)payload[i];
Serial.print("\n[MQTT Command] Received: ");
Serial.println(message);
// 每次收到新指令先停止持續閃爍
isFlashing = false;
if (message == "ON") {
digitalWrite(LED_PIN, HIGH);
client.publish(topic_led_status, "ON");
}
else if (message == "OFF") {
digitalWrite(LED_PIN, LOW);
client.publish(topic_led_status, "OFF");
}
else if (message == "FLASH") {
isFlashing = true; // 啟動持續閃爍
client.publish(topic_led_status, "FLASH");
}
else if (message == "TIMER") {
client.publish(topic_led_status, "TIMER");
digitalWrite(LED_PIN, HIGH);
vTaskDelay(5000 / portTICK_PERIOD_MS);
digitalWrite(LED_PIN, LOW);
client.publish(topic_led_status, "OFF");
}
}
// --- Core 0: MQTT 與 LED 狀態維護 ---
void mqttTask(void *pvParameters) {
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
vTaskDelay(500 / portTICK_PERIOD_MS);
Serial.print(".");
}
Serial.println("\nWiFi Connected");
client.setServer(mqtt_server, 1883);
client.setCallback(callback);
Message msg;
while (true) {
if (!client.connected()) {
String clientId = "ESP32-" + String(random(0xffff), HEX);
if (client.connect(clientId.c_str())) {
client.subscribe(topic_led_cmd);
} else {
vTaskDelay(5000 / portTICK_PERIOD_MS);
continue;
}
}
client.loop();
// 處理持續閃爍邏輯
if (isFlashing) {
digitalWrite(LED_PIN, !digitalRead(LED_PIN));
vTaskDelay(300 / portTICK_PERIOD_MS);
}
// 傳送 RFID UID
if (xQueueReceive(uidQueue, &msg, 1) == pdPASS) {
Serial.print("[MQTT Publish] Sending UID to Broker: ");
Serial.println(msg.uid);
client.publish(topic_uid, msg.uid);
}
vTaskDelay(10 / portTICK_PERIOD_MS);
}
}
// --- Core 1: RFID 掃描 ---
void rfidTask(void *pvParameters) {
SPI.begin();
mfrc522.PCD_Init();
Message msg;
while (true) {
if (mfrc522.PICC_IsNewCardPresent() && mfrc522.PICC_ReadCardSerial()) {
String currentUID = "";
for (byte i = 0; i < mfrc522.uid.size; i++) {
currentUID.concat(String(mfrc522.uid.uidByte[i] < 0x10 ? "0" : ""));
currentUID.concat(String(mfrc522.uid.uidByte[i], HEX));
if (i < mfrc522.uid.size - 1) currentUID.concat(" ");
}
currentUID.toUpperCase();
// --- Serial Monitor 顯示 ---
Serial.println("\n------------------------------");
Serial.print("RFID Card Detected! UID: ");
Serial.println(currentUID);
Serial.println("------------------------------");
lcd.setCursor(0, 1);
lcd.print("ID: " + currentUID + " ");
currentUID.toCharArray(msg.uid, 20);
xQueueSend(uidQueue, &msg, portMAX_DELAY);
mfrc522.PICC_HaltA();
mfrc522.PCD_StopCrypto1();
}
vTaskDelay(200 / portTICK_PERIOD_MS);
}
}
void setup() {
Serial.begin(115200);
pinMode(LED_PIN, OUTPUT);
Wire.begin(I2C_SDA, I2C_SCL);
lcd.init();
lcd.backlight();
lcd.setCursor(0, 0);
lcd.print("RFID MQTT System");
uidQueue = xQueueCreate(10, sizeof(Message));
if (uidQueue != NULL) {
xTaskCreatePinnedToCore(mqttTask, "MQTT_Task", 8192, NULL, 1, NULL, 0);
xTaskCreatePinnedToCore(rfidTask, "RFID_Task", 8192, NULL, 1, NULL, 1);
}
}
void loop() { vTaskDelay(portMAX_DELAY); }
Python 程式
import tkinter as tk
from tkinter import ttk
import paho.mqtt.client as mqtt
import sqlite3
from datetime import datetime
import threading
# --- MQTT 設定 ---
MQTT_BROKER = "mqttgo.io"
TOPIC_UID = "alex9ufo/rfidUID"
TOPIC_LED_CMD = "alex9ufo/ledControl"
TOPIC_LED_STATUS = "alex9ufo/ledStatus"
class SmartHomeApp:
def __init__(self, root):
self.root = root
self.root.title("MQTT RFID & LED 管理監控系統")
self.root.geometry("800x600")
self.init_db()
self.setup_ui()
self.setup_mqtt()
def init_db(self):
# 建立資料庫
self.conn = sqlite3.connect("iot_system.db", check_same_thread=False)
self.cursor = self.conn.cursor()
self.cursor.execute('''CREATE TABLE IF NOT EXISTS logs
(id INTEGER PRIMARY KEY AUTOINCREMENT, date TEXT, time TEXT, status TEXT)''')
self.conn.commit()
def setup_ui(self):
# --- 頂部:MQTT 連線狀態區 ---
status_bar = tk.Frame(self.root, bd=1, relief=tk.SUNKEN)
status_bar.pack(side="top", fill="x", padx=10, pady=5)
self.mqtt_status_label = tk.Label(status_bar, text="MQTT 狀態: 嘗試連線中...", fg="orange", font=("Arial", 10, "bold"))
self.mqtt_status_label.pack(side="left", padx=5)
self.broker_label = tk.Label(status_bar, text=f"Broker: {MQTT_BROKER}", fg="gray")
self.broker_label.pack(side="right", padx=5)
# --- 控制區 ---
ctrl_frame = tk.LabelFrame(self.root, text="遠端 LED 控制指令", font=("Arial", 10, "bold"))
ctrl_frame.pack(fill="x", padx=15, pady=10)
cmds = [("開燈 (ON)", "ON", "green"), ("關燈 (OFF)", "OFF", "red"),
("持續閃爍 (FLASH)", "FLASH", "orange"), ("計時5秒 (TIMER)", "TIMER", "blue")]
for text, cmd, color in cmds:
btn = tk.Button(ctrl_frame, text=text, width=15, bg="#f0f0f0",
command=lambda c=cmd: self.send_command(c))
btn.pack(side="left", padx=10, pady=15)
# --- 硬體回饋顯示區 ---
feedback_frame = tk.Frame(self.root)
feedback_frame.pack(fill="x", padx=15, pady=5)
# 模擬 LED 圓燈
self.canvas = tk.Canvas(feedback_frame, width=40, height=40)
self.led_circle = self.canvas.create_oval(5, 5, 35, 35, fill="red") # 預設關閉(紅)
self.canvas.pack(side="left")
self.hw_status_text = tk.Label(feedback_frame, text="硬體 LED 狀態: OFF", font=("Arial", 12, "bold"))
self.hw_status_text.pack(side="left", padx=10)
# --- 下方:資料表格 ---
table_frame = tk.LabelFrame(self.root, text="系統事件紀錄 (SQLite)", font=("Arial", 10, "bold"))
table_frame.pack(fill="both", expand=True, padx=15, pady=15)
self.tree = ttk.Treeview(table_frame, columns=("ID", "Date", "Time", "Status"), show="headings")
self.tree.heading("ID", text="ID"); self.tree.column("ID", width=50, anchor="center")
self.tree.heading("Date", text="日期"); self.tree.column("Date", width=120, anchor="center")
self.tree.heading("Time", text="時間"); self.tree.column("Time", width=120, anchor="center")
self.tree.heading("Status", text="事件內容 / RFID UID"); self.tree.column("Status", width=350)
# 滾動條
scrollbar = ttk.Scrollbar(table_frame, orient="vertical", command=self.tree.yview)
self.tree.configure(yscroll=scrollbar.set)
self.tree.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
self.update_table()
def setup_mqtt(self):
# 初始化 MQTT Client
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
self.client.on_message = self.on_message
try:
self.client.connect(MQTT_BROKER, 1883, 60)
# 使用 loop_start 確保不會阻塞 GUI
self.client.loop_start()
except Exception as e:
self.mqtt_status_label.config(text=f"MQTT 狀態: 連線失敗 ({e})", fg="red")
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.mqtt_status_label.config(text="MQTT 狀態: 已連接至 mqttgo.io", fg="green")
self.client.subscribe([(TOPIC_UID, 0), (TOPIC_LED_STATUS, 0)])
else:
self.mqtt_status_label.config(text=f"MQTT 狀態: 連線錯誤({rc})", fg="red")
def on_disconnect(self, client, userdata, rc):
self.mqtt_status_label.config(text="MQTT 狀態: 已斷開連線", fg="red")
def on_message(self, client, userdata, msg):
payload = msg.payload.decode()
topic = msg.topic
if topic == TOPIC_LED_STATUS:
# 更新介面 LED 圓燈與文字
self.root.after(0, self.update_led_ui, payload)
self.log_event(f"LED 狀態變更 -> {payload}")
elif topic == TOPIC_UID:
# 接收 RFID 卡號
print(f"收到 RFID 卡號: {payload}")
self.log_event(f"RFID 感應卡號: {payload}")
def send_command(self, cmd):
if self.client.is_connected():
self.client.publish(TOPIC_LED_CMD, cmd)
print(f"發送指令: {cmd}")
else:
tk.messagebox.showwarning("連線錯誤", "目前未連接到 MQTT Broker,無法發送指令")
def update_led_ui(self, status):
self.hw_status_text.config(text=f"硬體 LED 狀態: {status}")
# 如果狀態是 ON, FLASH 或 TIMER,顯示綠燈;OFF 顯示紅燈
if status in ["ON", "FLASH", "TIMER"]:
self.canvas.itemconfig(self.led_circle, fill="green")
else:
self.canvas.itemconfig(self.led_circle, fill="red")
def log_event(self, event_str):
now = datetime.now()
date_s = now.strftime("%Y-%m-%d")
time_s = now.strftime("%H:%M:%S")
# 寫入資料庫
self.cursor.execute("INSERT INTO logs (date, time, status) VALUES (?, ?, ?)",
(date_s, time_s, event_str))
self.conn.commit()
# 更新表格顯示
self.root.after(0, self.update_table)
def update_table(self):
# 清除舊數據
for row in self.tree.get_children():
self.tree.delete(row)
# 從資料庫讀取最新 20 筆
self.cursor.execute("SELECT * FROM logs ORDER BY id DESC LIMIT 20")
for row in self.cursor.fetchall():
self.tree.insert("", "end", values=row)
if __name__ == "__main__":
root = tk.Tk()
app = SmartHomeApp(root)
root.mainloop()





沒有留言:
張貼留言