2026年1月15日 星期四

ALOHA 協議的 RFID 標籤讀取模擬程式

 ALOHA 協議的 RFID 標籤讀取模擬程式

模擬多個標籤同時向讀取器發送資料時發生的「碰撞」(Collision)。

Pure ALOHA 的基本原理是:標籤隨時發送資料,如果發生碰撞(多個標籤同時發送),則等待隨機時間後重試。

  1. 併發處理 (Threading)

    • RFID 標籤是獨立運作的。我們為每個標籤建立一個 Thread(執行緒),模擬它們自主決定何時發送資料。

  2. 碰撞檢測機制

    • 使用一個全域串列 active_transmissions

    • 當標籤開始發送時,將自己加入串列;結束時移除。

    • 如果在發送期間,該串列長度大於 1,代表有多個標籤同時佔用通道,發生 Collision(碰撞)

  3. 狀態視覺化

    • 灰色:空閒 (Idle)。

    • 黃色:嘗試發送中。

    • 綠色:發送成功(通道內僅有該標籤)。

    • 紅色:發送失敗/碰撞(通道內有多個標籤)。

ALOHA 的核心概念

在真正的 RFID 系統中,Pure ALOHA 的最大吞吐量約為 18.4%。這是因為標籤完全不理會通道是否繁忙就直接發送。為了優化這個問題,通常會使用 Slotted ALOHA(劃分時間槽,標籤只能在時間槽開始時發送),這可以將效率提升到 36.8%


import tkinter as tk

import random

import time

from threading import Thread, Event


class RFIDSimulator:

    def __init__(self, root):

        self.root = root

        self.root.title("RFID ALOHA 模擬器 (自動/手動模式)")

        

        # 參數設置

        self.num_tags = 5

        self.is_running = False

        self.manual_mode = tk.BooleanVar(value=False) # 模式變數

        self.step_event = Event() # 用於手動模式的同步信號

        

        self.success_count = 0

        self.collision_count = 0

        self.active_transmissions = []

        

        self.setup_ui()

        

    def setup_ui(self):

        # 畫布區

        self.canvas = tk.Canvas(self.root, width=600, height=250, bg="white")

        self.canvas.pack(pady=10)

        

        # 繪製 Reader & Tags

        self.reader = self.canvas.create_rectangle(250, 20, 350, 70, fill="lightblue")

        self.canvas.create_text(300, 45, text="RFID Reader")

        

        self.tags = []

        for i in range(self.num_tags):

            x = 100 + (i * 100)

            tag = self.canvas.create_oval(x-20, 180, x+20, 220, fill="lightgrey")

            self.canvas.create_text(x, 235, text=f"Tag {i+1}")

            self.tags.append(tag)


        # 控制面板

        control_frame = tk.Frame(self.root)

        control_frame.pack(pady=10)


        self.btn_start = tk.Button(control_frame, text="開始模擬", command=self.start_simulation, width=10)

        self.btn_start.pack(side=tk.LEFT, padx=5)

        

        self.btn_stop = tk.Button(control_frame, text="停止", command=self.stop_simulation, state=tk.DISABLED, width=10)

        self.btn_stop.pack(side=tk.LEFT, padx=5)


        # 模式切換

        tk.Checkbutton(control_frame, text="手動模式", variable=self.manual_mode).pack(side=tk.LEFT, padx=10)


        # 下一步按鈕

        self.btn_next = tk.Button(control_frame, text="下一步 >>", command=self.next_step, state=tk.DISABLED, bg="#e1f5fe")

        self.btn_next.pack(side=tk.LEFT, padx=5)


        # 狀態顯示

        self.status_label = tk.Label(self.root, text="準備就緒", font=("Arial", 12))

        self.status_label.pack()

        

        self.stat_label = tk.Label(self.root, text="成功: 0 | 碰撞: 0", font=("Arial", 10))

        self.stat_label.pack(pady=5)


    def simulate_tag_behavior(self, tag_index):

        while self.is_running:

            # 1. 等待發送時機

            if self.manual_mode.get():

                # 手動模式:等待按鈕觸發事件

                self.step_event.wait() 

                # 每個標籤隨機決定這次是否要發送 (模擬隨機碰撞)

                if random.random() > 0.5: # 50% 機率發送

                    self.perform_transmission(tag_index)

            else:

                # 自動模式:隨機冷卻後發送

                time.sleep(random.uniform(1, 4))

                if self.is_running:

                    self.perform_transmission(tag_index)

            

            # 為了避免手動模式下 Event 沒被清除導致無限循環

            if tag_index == self.num_tags - 1 and self.manual_mode.get():

                time.sleep(0.1) # 確保所有標籤都跑過判斷

                self.step_event.clear()


    def perform_transmission(self, tag_index):

        """執行發送動作與碰撞檢測"""

        self.canvas.itemconfig(self.tags[tag_index], fill="yellow")

        self.active_transmissions.append(tag_index)

        

        time.sleep(0.8) # 模擬傳輸時間

        

        if len(self.active_transmissions) > 1:

            self.collision_count += 1

            self.canvas.itemconfig(self.tags[tag_index], fill="red")

            self.update_status("發生碰撞!", "red")

        else:

            self.success_count += 1

            self.canvas.itemconfig(self.tags[tag_index], fill="#4CAF50") # 綠色

            self.update_status(f"標籤 {tag_index+1} 讀取成功", "green")

        

        time.sleep(0.5)

        if tag_index in self.active_transmissions:

            self.active_transmissions.remove(tag_index)

        self.canvas.itemconfig(self.tags[tag_index], fill="lightgrey")

        self.update_stats()


    def next_step(self):

        """手動模式觸發按鈕"""

        self.update_status("檢測通道中...", "blue")

        self.step_event.set() # 喚醒所有正在 wait 的標籤執行緒


    def update_status(self, msg, color):

        self.root.after(0, lambda: self.status_label.config(text=msg, fg=color))


    def update_stats(self):

        self.root.after(0, lambda: self.stat_label.config(text=f"成功: {self.success_count} | 碰撞: {self.collision_count}"))


    def start_simulation(self):

        self.is_running = True

        self.success_count = 0

        self.collision_count = 0

        self.update_stats()

        

        self.btn_start.config(state=tk.DISABLED)

        self.btn_stop.config(state=tk.NORMAL)

        self.btn_next.config(state=tk.NORMAL)

        

        for i in range(self.num_tags):

            Thread(target=self.simulate_tag_behavior, args=(i,), daemon=True).start()


    def stop_simulation(self):

        self.is_running = False

        self.step_event.set() # 確保所有執行緒能跳出循環

        self.btn_start.config(state=tk.NORMAL)

        self.btn_stop.config(state=tk.DISABLED)

        self.btn_next.config(state=tk.DISABLED)

        self.update_status("模擬停止", "black")


if __name__ == "__main__":

    root = tk.Tk()

    app = RFIDSimulator(root)

    root.mainloop()


沒有留言:

張貼留言

RFID reader-driven I-code 協議

RFID reader-driven I-code  協議   I-Code 協議(特別是 NXP 的 I-Code SLI 系列)是典型的 Reader-driven 協議。它結合了 Polling 與 Slotted ALOHA 的混合機制。讀取器發送 Inventor...