Telegram 送出命令 --> Python + Tkinter --> MQTT --> Wokwi ESP32 RFID + LED
-- Schema of the "rfid_logs" table that the Python program below reads and writes.
-- One row is stored per logged event.
CREATE TABLE "rfid_logs" (
-- Row identifier; auto-incremented through the PRIMARY KEY clause below.
"id" INTEGER,
-- Date of the event; the Python code writes it as "%Y-%m-%d".
"date" TEXT NOT NULL,
-- Time of the event; the Python code writes it as "%H:%M:%S".
"time" TEXT NOT NULL,
-- Either a card UID or an event tag (the Python code uses "CMD_IN" and "MQTT_STATUS").
"LEDorRFID" TEXT,
-- Free-text note describing the event.
"memo" TEXT,
PRIMARY KEY("id" AUTOINCREMENT)
);
wokwi 程式
Python 程式
import asyncio
import os
import sqlite3
import sys
import threading
import time
import tkinter as tk
import winsound
from datetime import datetime
from tkinter import messagebox

import paho.mqtt.client as mqtt
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
# --- 1. Global settings ---
DB_NAME = "rfid115.db"
MQTT_BROKER = "broker.mqtt-dashboard.com"
MQTT_PORT = 1883
TOPIC_RFID_UID = "alex9ufo/rfid/UID"
TOPIC_LED_CONTROL = "alex9ufo/rfid/led"
TOPIC_LED_STATUS = "alex9ufo/rfid/ledStatus"

# Telegram settings.
# SECURITY: credentials should not live in source code. Set the environment
# variables RFID_TELEGRAM_BOT_TOKEN / RFID_TELEGRAM_CHAT_ID to override the
# values below. The literals are kept only as a backward-compatible fallback;
# a token that has been published like this one should be revoked and reissued.
TELEGRAM_BOT_TOKEN = os.environ.get(
    "RFID_TELEGRAM_BOT_TOKEN",
    "8022700986:AAGymymK9_d1HcTGJWl3mtqHmilxB64_5Zw",
)
# The chat id is compared against an integer chat_id, so convert it here once.
TARGET_CHAT_ID = int(os.environ.get("RFID_TELEGRAM_CHAT_ID", "7965218469"))

# Operating modes shown in the GUI; the strings double as the display labels.
MODE_ADD = "新增模式"
MODE_COMPARE = "比對模式"
# --- 2. 資料庫操作 ---
class DatabaseManager:
    """SQLite-backed store for the ``rfid_logs`` event table.

    Every operation opens its own connection and closes it before returning
    (also when the statement raises), so no connection object is shared
    between calls.
    """

    def __init__(self, db_name):
        """Remember the database path and make sure the log table exists.

        Args:
            db_name: Path of the SQLite database file.
        """
        self.db_name = db_name
        self.create_table()

    def connect(self):
        """Open and return a new ``sqlite3`` connection to ``self.db_name``."""
        return sqlite3.connect(self.db_name)

    def create_table(self):
        """Create the ``rfid_logs`` table if it does not exist yet."""
        conn = self.connect()
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS rfid_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    date TEXT NOT NULL,
                    time TEXT NOT NULL,
                    LEDorRFID TEXT,
                    memo TEXT
                )
            """)
            conn.commit()
        finally:
            # Close even if the DDL fails so the file handle is not leaked.
            conn.close()

    def add_log(self, led_or_rfid, memo):
        """Insert one log row stamped with the current local date and time.

        Logging is best-effort: a database error is printed and swallowed so
        that a failed insert does not interrupt the caller.

        Args:
            led_or_rfid: Value for the ``LEDorRFID`` column.
            memo: Value for the ``memo`` column.
        """
        conn = self.connect()
        try:
            now = datetime.now()
            conn.execute(
                "INSERT INTO rfid_logs (date, time, LEDorRFID, memo) VALUES (?, ?, ?, ?)",
                (now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S"), led_or_rfid, memo),
            )
            conn.commit()
        except sqlite3.Error as e:
            print(f"資料庫錯誤: {e}")
        finally:
            conn.close()

    def get_latest_logs(self, limit=50):
        """Return the newest log rows, most recent first.

        Args:
            limit: Maximum number of rows to return (defaults to 50).

        Returns:
            A list of ``(id, date, time, LEDorRFID, memo)`` tuples.
        """
        conn = self.connect()
        try:
            return conn.execute(
                "SELECT * FROM rfid_logs ORDER BY id DESC LIMIT ?", (limit,)
            ).fetchall()
        finally:
            conn.close()

    def get_all_rfid_uids(self):
        """Return the distinct ``LEDorRFID`` values of rows whose memo starts with ``[新增]``.

        Empty and NULL values are filtered out.
        """
        conn = self.connect()
        try:
            rows = conn.execute(
                "SELECT DISTINCT LEDorRFID FROM rfid_logs WHERE memo LIKE '[新增]%'"
            ).fetchall()
        finally:
            conn.close()
        return [row[0] for row in rows if row[0]]
# --- 3. Telegram Bot 處理 ---
class TelegramBotHandler:
    """Run a python-telegram-bot ``Application`` on a background thread.

    Incoming ``/on``, ``/off``, ``/flash`` and ``/timer`` commands from the
    configured chat are forwarded to the Tk application; ``send_message``
    lets other threads push a text message to that chat.
    """

    def __init__(self, app, token):
        """Build the bot application and register the command handlers.

        Args:
            app: The owning application object; its ``master.after`` and
                ``process_incoming_command`` are used to hand commands over.
            token: Telegram bot token.
        """
        self.app = app
        self.token = token
        # Event loop of the bot thread; set once the thread has started.
        self.tg_loop = None
        self.application = Application.builder().token(token).build()
        for cmd in ("on", "off", "flash", "timer"):
            self.application.add_handler(CommandHandler(cmd, self.handle_commands))
        self.bot_thread = threading.Thread(target=self._run_bot, daemon=True)

    def start_bot(self):
        """Start the background thread that polls Telegram."""
        self.bot_thread.start()

    def _run_bot(self):
        """Thread target: create a private event loop and poll until stopped."""
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.tg_loop = loop
        # Signal handlers can only be installed from the main thread, so tell
        # run_polling not to register any when running in this worker thread.
        self.application.run_polling(poll_interval=0.5, stop_signals=None)

    async def handle_commands(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Forward a command from the authorised chat to the Tk application.

        Updates from any other chat are ignored.
        """
        # effective_message also covers edited messages, for which
        # update.message is None.
        message = update.effective_message
        if message is None or not message.text:
            return
        if message.chat_id != TARGET_CHAT_ID:
            return
        cmd = message.text.replace("/", "")
        # Drop a "@BotName" suffix on the command word ("/on@MyBot" -> "on")
        # while leaving any following arguments untouched.
        name, sep, rest = cmd.partition(" ")
        cmd = name.partition("@")[0] + sep + rest
        if not cmd:
            return
        self.app.master.after(0, self.app.process_incoming_command, f"Telegram:{cmd}", cmd)

    def send_message(self, text):
        """Schedule ``text`` to be sent to the configured chat.

        Does nothing when the bot loop has not been created yet or is already
        closed. Delivery happens asynchronously on the bot thread; a failure
        is printed instead of being silently discarded.
        """
        loop = self.tg_loop
        if loop is None or loop.is_closed():
            return
        future = asyncio.run_coroutine_threadsafe(
            self.application.bot.send_message(chat_id=TARGET_CHAT_ID, text=text), loop
        )
        future.add_done_callback(self._report_send_failure)

    @staticmethod
    def _report_send_failure(future):
        """Print the exception of a finished send future, if it has one."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            print(f"Telegram 傳送失敗: {exc}")
# --- 4. Tkinter 主應用 ---
class RfidControlApp:
    """Tkinter front end tying together the database, MQTT and Telegram.

    RFID UIDs and LED status reports arrive over MQTT, LED commands arrive
    from Telegram and are published to MQTT, and every event is written to
    the log table and shown in the text widget.
    """

    def __init__(self, master):
        """Build the GUI and start the Telegram and MQTT background threads.

        Args:
            master: The Tk root window.
        """
        self.master = master
        master.title("RFID & MQTT 監控系統")
        self.db = DatabaseManager(DB_NAME)
        self.status_var = tk.StringVar(value="系統連線中...")
        self.mode_var = tk.StringVar(value=MODE_ADD)
        self._setup_gui()

        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message

        self.telegram_bot = TelegramBotHandler(self, TELEGRAM_BOT_TOKEN)
        self.telegram_bot.start_bot()
        threading.Thread(target=self._mqtt_thread, daemon=True).start()
        # Send the command hint to Telegram 3 seconds after start-up.
        self.master.after(3000, self._send_welcome_message)

    def _setup_gui(self):
        """Create the mode bar, status label, log view and reset button."""
        f_top = tk.Frame(self.master)
        f_top.pack(pady=10)
        tk.Label(f_top, text="當前模式:", font=("Arial", 12)).pack(side=tk.LEFT)
        tk.Label(
            f_top, textvariable=self.mode_var, font=("Arial", 12, "bold"), fg="blue"
        ).pack(side=tk.LEFT, padx=10)
        tk.Button(f_top, text="切換模式", command=self._toggle_mode).pack(side=tk.LEFT)

        self.info_label = tk.Label(
            self.master,
            textvariable=self.status_var,
            font=("Microsoft JhengHei", 14, "bold"),
            bg="#FFFFE0",
            relief="sunken",
            width=45,
        )
        self.info_label.pack(pady=10, padx=20)

        self.log_display = tk.Text(self.master, height=15, width=80, font=("Consolas", 10))
        self.log_display.pack(pady=10, padx=10)

        tk.Button(
            self.master, text="清空資料庫(Reset)", command=self._db_reset, fg="red"
        ).pack(pady=5)
        self._handle_db_show()

    def _send_welcome_message(self):
        """Send the list of supported commands to the Telegram chat."""
        welcome_text = (
            "🚀 RFID 監控系統已啟動!\n\n"
            "請使用以下指令控制 LED:\n"
            "/on - 開啟 LED\n"
            "/off - 關閉 LED\n"
            "/timer - 開啟 5 秒後關閉\n"
            "/flash - 進入閃爍模式"
        )
        self.telegram_bot.send_message(welcome_text)
        self.status_var.set("已向 Telegram 送出指令提示")

    def _play_beep(self, freq, duration, repeat=1):
        """Play ``repeat`` beeps on a background thread so the GUI stays responsive.

        Args:
            freq: Tone frequency passed to ``winsound.Beep``.
            duration: Tone length passed to ``winsound.Beep``.
            repeat: Number of beeps; a 0.1 s pause follows each one when > 1.
        """
        def run():
            for _ in range(repeat):
                winsound.Beep(freq, duration)
                if repeat > 1:
                    time.sleep(0.1)

        threading.Thread(target=run, daemon=True).start()

    def _toggle_mode(self):
        """Switch between the add mode and the compare mode."""
        new_m = MODE_COMPARE if self.mode_var.get() == MODE_ADD else MODE_ADD
        self.mode_var.set(new_m)
        self.status_var.set(f"模式切換: {new_m}")

    def process_incoming_command(self, display_text, mqtt_cmd):
        """Publish a command to the LED control topic and log it.

        Args:
            display_text: Text shown in the status bar and stored in the log.
            mqtt_cmd: Payload published to ``TOPIC_LED_CONTROL``.
        """
        self.status_var.set(f"收到指令: {display_text}")
        self.client.publish(TOPIC_LED_CONTROL, mqtt_cmd)
        self.db.add_log("CMD_IN", display_text)
        self._handle_db_show()

    def _mqtt_thread(self):
        """Thread target: connect to the broker and run the network loop."""
        try:
            self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
            self.client.loop_forever()
        except Exception as e:
            print(f"MQTT 連線失敗: {e}")

    def _on_connect(self, client, userdata, flags, rc, properties):
        """Subscribe to the UID and LED-status topics once the broker accepts us."""
        # Only report success and subscribe when the reason code says so;
        # getattr keeps this tolerant of a reason code that lacks the attribute.
        if getattr(rc, "is_failure", False):
            self.master.after(0, self.status_var.set, f"MQTT 連線失敗: {rc}")
            return
        client.subscribe(TOPIC_RFID_UID)
        client.subscribe(TOPIC_LED_STATUS)
        self.master.after(0, self.status_var.set, "MQTT 已連線,系統就緒")

    def _on_message(self, client, userdata, msg):
        """Dispatch an incoming MQTT message to the RFID or LED-status handling."""
        # The broker is public, so payloads are untrusted: replace undecodable
        # bytes instead of letting UnicodeDecodeError escape from the callback.
        payload = msg.payload.decode("utf-8", errors="replace")
        if msg.topic == TOPIC_RFID_UID:
            self.master.after(0, self._handle_rfid, payload)
        elif msg.topic == TOPIC_LED_STATUS:
            info = f"ESP32回報狀態: {payload.upper()}"
            self.master.after(0, self.status_var.set, info)
            self.db.add_log("MQTT_STATUS", info)
            self.master.after(0, self._handle_db_show)

    def _handle_rfid(self, uid):
        """Register ``uid`` in add mode, or check it against stored UIDs in compare mode."""
        mode = self.mode_var.get()
        if mode == MODE_ADD:
            self.db.add_log(uid, f"[新增] 卡號: {uid}")
            self._play_beep(880, 1000, 3)
            msg = f"RFID 已註冊: {uid}"
            self.telegram_bot.send_message(f"✅ 已存入卡片: {uid}")
        else:
            uids = self.db.get_all_rfid_uids()
            if uid in uids:
                msg = f"比對正確: {uid}"
                self._play_beep(2000, 200)
                self.telegram_bot.send_message(f"🔓 通過: {uid}")
            else:
                msg = f"非法卡片: {uid}"
                self._play_beep(1200, 5000)
                self.telegram_bot.send_message(f"⚠️ 警報! 未授權卡片: {uid}")
        self.status_var.set(msg)
        self._handle_db_show()

    def _handle_db_show(self):
        """Reload the latest log rows from the database into the text widget."""
        logs = self.db.get_latest_logs()
        self.log_display.delete("1.0", tk.END)
        header = f"{'ID':<4} | {'日期':<10} | {'時間':<8} | {'LEDorRFID':<15} | 備註\n"
        self.log_display.insert(tk.END, header + "-"*75 + "\n")
        for r in logs:
            self.log_display.insert(
                tk.END,
                f"{r[0]:<4} | {r[1]:<10} | {r[2]:<8} | {str(r[3]):<15} | {r[4]}\n",
            )

    def _db_reset(self):
        """After confirmation, drop and recreate the log table and refresh the view."""
        if not messagebox.askyesno("確認", "將刪除所有記錄?"):
            return
        # Go through the manager's own connection factory so the reset always
        # targets the same database file the rest of the app uses.
        conn = self.db.connect()
        try:
            conn.execute("DROP TABLE IF EXISTS rfid_logs")
            conn.commit()
        finally:
            conn.close()
        self.db.create_table()
        self._handle_db_show()
        self.status_var.set("資料庫已重置")
def main():
    """Start the GUI application and block in the Tk main loop until it closes."""
    # On Windows, switch asyncio to the selector event loop policy before any
    # event loop is created.
    running_on_windows = sys.platform == "win32"
    if running_on_windows:
        selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(selector_policy)

    window = tk.Tk()
    # Keep a reference to the application for the lifetime of the main loop.
    application = RfidControlApp(window)
    window.mainloop()


if __name__ == '__main__':
    main()







沒有留言:
張貼留言