ALOHA 協議的 RFID 標籤讀取模擬程式
模擬多個標籤同時向讀取器發送資料時發生的「碰撞」(Collision)。
Pure ALOHA 的基本原理是:標籤隨時發送資料,如果發生碰撞(多個標籤同時發送),則等待隨機時間後重試。
併發處理 (Threading):
RFID 標籤是獨立運作的。我們為每個標籤建立一個
Thread(執行緒),模擬它們自主決定何時發送資料。
碰撞檢測機制:
使用一個全域串列
active_transmissions。當標籤開始發送時,將自己加入串列;結束時移除。
如果在發送期間,該串列長度大於 1,代表有多個標籤同時佔用通道,發生 Collision(碰撞)。
狀態視覺化:
灰色:空閒 (Idle)。
黃色:嘗試發送中。
綠色:發送成功(通道內僅有該標籤)。
紅色:發送失敗/碰撞(通道內有多個標籤)。
ALOHA 的核心概念
在真正的 RFID 系統中,Pure ALOHA 的最大吞吐量約為 18.4%。這是因為標籤完全不理會通道是否繁忙就直接發送。為了優化這個問題,通常會使用 Slotted ALOHA(劃分時間槽,標籤只能在時間槽開始時發送),這可以將效率提升到 36.8%。
import tkinter as tk
import random
import time
from threading import Thread, Event
class RFIDSimulator:
def __init__(self, root):
self.root = root
self.root.title("RFID ALOHA 模擬器 (自動/手動模式)")
# 參數設置
self.num_tags = 5
self.is_running = False
self.manual_mode = tk.BooleanVar(value=False) # 模式變數
self.step_event = Event() # 用於手動模式的同步信號
self.success_count = 0
self.collision_count = 0
self.active_transmissions = []
self.setup_ui()
def setup_ui(self):
# 畫布區
self.canvas = tk.Canvas(self.root, width=600, height=250, bg="white")
self.canvas.pack(pady=10)
# 繪製 Reader & Tags
self.reader = self.canvas.create_rectangle(250, 20, 350, 70, fill="lightblue")
self.canvas.create_text(300, 45, text="RFID Reader")
self.tags = []
for i in range(self.num_tags):
x = 100 + (i * 100)
tag = self.canvas.create_oval(x-20, 180, x+20, 220, fill="lightgrey")
self.canvas.create_text(x, 235, text=f"Tag {i+1}")
self.tags.append(tag)
# 控制面板
control_frame = tk.Frame(self.root)
control_frame.pack(pady=10)
self.btn_start = tk.Button(control_frame, text="開始模擬", command=self.start_simulation, width=10)
self.btn_start.pack(side=tk.LEFT, padx=5)
self.btn_stop = tk.Button(control_frame, text="停止", command=self.stop_simulation, state=tk.DISABLED, width=10)
self.btn_stop.pack(side=tk.LEFT, padx=5)
# 模式切換
tk.Checkbutton(control_frame, text="手動模式", variable=self.manual_mode).pack(side=tk.LEFT, padx=10)
# 下一步按鈕
self.btn_next = tk.Button(control_frame, text="下一步 >>", command=self.next_step, state=tk.DISABLED, bg="#e1f5fe")
self.btn_next.pack(side=tk.LEFT, padx=5)
# 狀態顯示
self.status_label = tk.Label(self.root, text="準備就緒", font=("Arial", 12))
self.status_label.pack()
self.stat_label = tk.Label(self.root, text="成功: 0 | 碰撞: 0", font=("Arial", 10))
self.stat_label.pack(pady=5)
def simulate_tag_behavior(self, tag_index):
while self.is_running:
# 1. 等待發送時機
if self.manual_mode.get():
# 手動模式:等待按鈕觸發事件
self.step_event.wait()
# 每個標籤隨機決定這次是否要發送 (模擬隨機碰撞)
if random.random() > 0.5: # 50% 機率發送
self.perform_transmission(tag_index)
else:
# 自動模式:隨機冷卻後發送
time.sleep(random.uniform(1, 4))
if self.is_running:
self.perform_transmission(tag_index)
# 為了避免手動模式下 Event 沒被清除導致無限循環
if tag_index == self.num_tags - 1 and self.manual_mode.get():
time.sleep(0.1) # 確保所有標籤都跑過判斷
self.step_event.clear()
def perform_transmission(self, tag_index):
"""執行發送動作與碰撞檢測"""
self.canvas.itemconfig(self.tags[tag_index], fill="yellow")
self.active_transmissions.append(tag_index)
time.sleep(0.8) # 模擬傳輸時間
if len(self.active_transmissions) > 1:
self.collision_count += 1
self.canvas.itemconfig(self.tags[tag_index], fill="red")
self.update_status("發生碰撞!", "red")
else:
self.success_count += 1
self.canvas.itemconfig(self.tags[tag_index], fill="#4CAF50") # 綠色
self.update_status(f"標籤 {tag_index+1} 讀取成功", "green")
time.sleep(0.5)
if tag_index in self.active_transmissions:
self.active_transmissions.remove(tag_index)
self.canvas.itemconfig(self.tags[tag_index], fill="lightgrey")
self.update_stats()
def next_step(self):
"""手動模式觸發按鈕"""
self.update_status("檢測通道中...", "blue")
self.step_event.set() # 喚醒所有正在 wait 的標籤執行緒
def update_status(self, msg, color):
self.root.after(0, lambda: self.status_label.config(text=msg, fg=color))
def update_stats(self):
self.root.after(0, lambda: self.stat_label.config(text=f"成功: {self.success_count} | 碰撞: {self.collision_count}"))
def start_simulation(self):
self.is_running = True
self.success_count = 0
self.collision_count = 0
self.update_stats()
self.btn_start.config(state=tk.DISABLED)
self.btn_stop.config(state=tk.NORMAL)
self.btn_next.config(state=tk.NORMAL)
for i in range(self.num_tags):
Thread(target=self.simulate_tag_behavior, args=(i,), daemon=True).start()
def stop_simulation(self):
self.is_running = False
self.step_event.set() # 確保所有執行緒能跳出循環
self.btn_start.config(state=tk.NORMAL)
self.btn_stop.config(state=tk.DISABLED)
self.btn_next.config(state=tk.DISABLED)
self.update_status("模擬停止", "black")
if __name__ == "__main__":
root = tk.Tk()
app = RFIDSimulator(root)
root.mainloop()
沒有留言:
張貼留言