python tkinter 程式
1) 畫面顯示 連接到 broker.mqtt-dashboard.com (連上綠色)
2) 畫面顯示 主題: alex9ufo/Token 內定 payload:8022701986:AAGymymK9_d12HcTGJWle3mtqHmilxB64_5Zw 可以變更
3) 畫面顯示 主題: alex9ufo/chatID 內定 payload:749652818469 可以變更
4) 畫面顯示 主題: alex9ufo/relay1 內定 payload: 輸入<1> 被觸動 可以變更
5) 畫面顯示 主題: alex9ufo/relay2 內定 payload: 輸入<2> 被觸動 可以變更
6) 畫面顯示 主題: alex9ufo/relay3 內定 payload: 輸入<3> 被觸動 可以變更
7) 畫面顯示 主題: alex9ufo/relay4 內定 payload: 輸入<4> 被觸動 可以變更
8) 畫面顯示 主題: alex9ufo/relay5 內定 payload: 輸入<5> 被觸動 可以變更
9) 畫面顯示 主題: alex9ufo/relay6 內定 payload: 輸入<6> 被觸動 可以變更
input phtot coupler pin 34,35,32,33,25,26
output relay pin 23,22,21,19,18,5
1) input phtot coupler pin 34,35,32,33,25,26 , output relay pin 23,22,21,19,18,5 LED pin 2
2) 透過 mqtt broker.mqtt-dashboard.com 接收主題 : alex9ufo/Token的內容 為telegram bot token
接收主題 : alex9ufo/chatID的內容 為telegram chat id 若成功 則 pin 2 輸出 hi LED亮
3) 檢查 input phtot coupler pin 34,35,32,33,25,26 若是輸入為 hi 則對應 output relay pin 23,22,21,19,18,5 輸出Hi 並 透過 telegram Token chatID , 輸出對應的內容 輸入<1> .....<6> 被觸動 到 telegram 上通知使用者
#include <WiFi.h>
#include <PubSubClient.h>
#include <WiFiClientSecure.h>
#include <UniversalTelegramBot.h>
// WiFi & MQTT 設定
const char* ssid = "Wokwi-GUEST";
const char* password = "";
const char* mqtt_server = "broker.mqtt-dashboard.com";
// 引腳定義
const int inputPins[] = {34, 35, 32, 33, 25, 26};
const int relayPins[] = {23, 22, 21, 19, 18, 5};
const int ledPin = 2; // 系統狀態燈 (MQTT/Telegram 配置成功)
// 設定變數
String telegramToken = "";
String telegramChatID = "";
String relayMessages[6] = {"輸入<1> 被觸動", "輸入<2> 被觸動", "輸入<3> 被觸動",
"輸入<4> 被觸動", "輸入<5> 被觸動", "輸入<6> 被觸動"};
bool botInitialized = false;
WiFiClient espClient;
PubSubClient mqttClient(espClient);
WiFiClientSecure secured_client;
UniversalTelegramBot *bot = nullptr;
unsigned long lastCheckTime = 0;
const unsigned long checkInterval = 5000; // 5秒檢查一次
void setup() {
Serial.begin(115200);
pinMode(ledPin, OUTPUT);
digitalWrite(ledPin, LOW);
for (int i = 0; i < 6; i++) {
pinMode(inputPins[i], INPUT_PULLDOWN);
pinMode(relayPins[i], OUTPUT);
digitalWrite(relayPins[i], LOW);
}
setup_wifi();
mqttClient.setServer(mqtt_server, 1883);
mqttClient.setCallback(callback);
}
void setup_wifi() {
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); }
Serial.println("\nWiFi Connected");
}
void callback(char* topic, byte* payload, unsigned int length) {
String msg = "";
for (int i = 0; i < length; i++) msg += (char)payload[i];
String strTopic = String(topic);
if (strTopic == "alex9ufo/Token") telegramToken = msg;
else if (strTopic == "alex9ufo/chatID") telegramChatID = msg;
else {
for (int i = 1; i <= 6; i++) {
if (strTopic == "alex9ufo/relay" + String(i)) relayMessages[i-1] = msg;
}
}
// 配置成功檢查
if (telegramToken.length() > 20 && telegramChatID.length() > 5) {
digitalWrite(ledPin, HIGH);
secured_client.setInsecure();
if (bot != nullptr) delete bot;
bot = new UniversalTelegramBot(telegramToken, secured_client);
botInitialized = true;
Serial.println("Telegram Bot Initialized");
}
}
void reconnect() {
while (!mqttClient.connected()) {
if (mqttClient.connect("ESP32_Alex_Monitor")) {
mqttClient.subscribe("alex9ufo/Token");
mqttClient.subscribe("alex9ufo/chatID");
for(int i=1; i<=6; i++) mqttClient.subscribe(("alex9ufo/relay" + String(i)).c_str());
} else { delay(5000); }
}
}
void loop() {
if (!mqttClient.connected()) reconnect();
mqttClient.loop();
// 每 5 秒執行一次檢查
if (millis() - lastCheckTime > checkInterval) {
lastCheckTime = millis();
Serial.println("--- 5秒自動檢查中 ---");
for (int i = 0; i < 6; i++) {
if (digitalRead(inputPins[i]) == HIGH) {
// 1. 點亮對應 Relay/LED
digitalWrite(relayPins[i], HIGH);
Serial.printf("Pin %d is HIGH. Relay %d ON.\n", inputPins[i], i+1);
// 2. 如果 Telegram 已配置,發送對應訊息
if (botInitialized) {
if (bot->sendMessage(telegramChatID, relayMessages[i], "")) {
Serial.println("Telegram sent: " + relayMessages[i]);
} else {
Serial.println("Telegram send failed.");
}
}
} else {
// 如果輸入為 LOW,則關閉 Relay
digitalWrite(relayPins[i], LOW);
}
}
}
}
import tkinter as tk
from tkinter import messagebox
import paho.mqtt.client as mqtt
# MQTT 設定
MQTT_BROKER = "broker.mqtt-dashboard.com"
MQTT_PORT = 1883
class MqttGui:
def __init__(self, root):
self.root = root
self.root.title("MQTT Telegram 遠端配置工具")
self.root.geometry("550x680")
# --- 連線狀態區域 ---
self.status_frame = tk.Frame(root)
self.status_frame.pack(pady=10)
self.status_label = tk.Label(self.status_frame, text="MQTT 狀態: 嘗試中...", fg="black")
self.status_label.pack(side=tk.LEFT)
self.canvas = tk.Canvas(self.status_frame, width=20, height=20)
self.status_light = self.canvas.create_oval(5, 5, 15, 15, fill="red")
self.canvas.pack(side=tk.LEFT)
# --- Telegram 基本設定 ---
tk.Label(root, text="[Step 1] 設定 Telegram 憑證", font=('Arial', 10, 'bold')).pack(pady=5)
tk.Label(root, text="主題: alex9ufo/Token").pack()
self.token_entry = tk.Entry(root, width=60)
self.token_entry.insert(0, "80122700986:AAGym1ymK9_rd1eHcTGJWl3mtqHmilxB64_5Zw")
self.token_entry.pack()
tk.Label(root, text="主題: alex9ufo/chatID").pack(pady=(5,0))
self.chatid_entry = tk.Entry(root, width=60)
self.chatid_entry.insert(0, "7916522184369")
self.chatid_entry.pack()
# 修正後的分隔線 (使用 Frame 代替 Separator)
line = tk.Frame(root, height=2, bd=1, relief=tk.SUNKEN)
line.pack(fill='x', padx=10, pady=15)
# --- Relay 訊息設定 ---
tk.Label(root, text="[Step 2] 設定觸發訊息 (可變更)", font=('Arial', 10, 'bold')).pack(pady=5)
self.relay_entries = []
for i in range(1, 7):
frame = tk.Frame(root)
frame.pack(pady=2, anchor="w", padx=40)
tk.Label(frame, text=f"Relay {i} 訊息:", width=12, anchor="e").pack(side=tk.LEFT)
entry = tk.Entry(frame, width=35)
entry.insert(0, f"輸入<{i}> 被觸動")
entry.pack(side=tk.LEFT, padx=5)
self.relay_entries.append(entry)
# --- 按鈕 ---
self.send_btn = tk.Button(root, text="發送所有設定到 ESP32",
bg="#4CAF50", fg="white", font=('Arial', 12, 'bold'),
command=self.publish_all_config)
self.send_btn.pack(pady=25)
# 初始化 MQTT
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
try:
# 這裡增加了一個 try-except 避免沒網路時程式崩潰
self.client.connect(MQTT_BROKER, MQTT_PORT, 60)
self.client.loop_start()
except Exception as e:
print(f"Connect error: {e}")
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.status_label.config(text="MQTT 狀態: 已連線", fg="green")
self.canvas.itemconfig(self.status_light, fill="green")
else:
self.status_label.config(text="MQTT 狀態: 連線失敗", fg="red")
def publish_all_config(self):
try:
# 發送 Token 與 ID (保留 retain 讓 ESP32 斷線回來也能抓到)
self.client.publish("alex9ufo/Token", self.token_entry.get(), retain=True)
self.client.publish("alex9ufo/chatID", self.chatid_entry.get(), retain=True)
# 發送 6 個 Relay 的自定義訊息
for i, entry in enumerate(self.relay_entries):
topic = f"alex9ufo/relay{i+1}"
self.client.publish(topic, entry.get(), retain=True)
messagebox.showinfo("發送成功", "所有設定已透過 MQTT 更新!")
except Exception as e:
messagebox.showerror("錯誤", f"發送失敗: {e}")
if __name__ == "__main__":
root = tk.Tk()
app = MqttGui(root)
root.mainloop()
沒有留言:
張貼留言