物联网(IoT)设备的远程监控与控制是现代智能系统的基础需求。本文将介绍一个基于MQTT协议的设备监控与控制系统,该系统由两部分组成:模拟单片机设备和PyQt客户端。我们将详细讨论系统的设计思路、代码实现以及实际应用场景。
该系统基于发布-订阅模式,采用MQTT协议实现设备与客户端之间的双向通信。系统结构如下:
我在之前已经介绍了如何使用EMQT,这里不再重复,有关信息查看这里:
https://www.dong-blog.fun/post/1963
https://www.dong-blog.fun/post/2049
系统使用两个主题进行通信:
设备数据主题(device/gps):设备向此主题发布GPS数据
json展开代码{
  "device_id": "device_001",
  "timestamp": 1692345678,
  "latitude": 30.056789,
  "longitude": 120.123456
}
控制指令主题(device/control):客户端向此主题发布控制命令
json展开代码{
  "command": "UP",
  "timestamp": 1692345700
}
json展开代码{
  "text": "自定义指令内容",
  "timestamp": 1692345720
}
通过区分command和text键,系统能够清晰地区分不同类型的控制信号。
设备模拟器(device_simulator.py)主要功能是:
python展开代码import paho.mqtt.client as mqtt
import json
import time
import random
# MQTT broker configuration
broker = "10.100.80.98"
port = 1883
topic = "device/gps"
control_topic = "device/control"
# Create MQTT client with Version 2 API
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
def on_connect(client, userdata, flags, rc, properties=None):
    print(f"Connected with result code {rc}")
    # Subscribe to control topic
    client.subscribe(control_topic)
    print(f"已订阅控制主题: {control_topic}")
def on_message(client, userdata, msg, properties=None):
    try:
        data = json.loads(msg.payload.decode())
        print(f"收到控制数据: {data}")
        
        # 处理不同类型的消息
        if "command" in data:
            command = data["command"]
            print(f"执行命令: {command}")
            
            # 这里可以添加对命令的具体响应逻辑
            if command == "UP":
                print("设备向上移动")
            elif command == "DOWN":
                print("设备向下移动")
            elif command == "LEFT":
                print("设备向左移动")
            elif command == "RIGHT":
                print("设备向右移动")
            elif command == "CONFIRM":
                print("设备确认操作")
            elif command == "CANCEL":
                print("设备取消操作")
        # 处理自定义文本消息
        elif "text" in data:
            text = data["text"]
            print(f"收到自定义文本: {text}")
            # 处理文本命令的逻辑
            print(f"设备执行文本命令: {text}")
    except Exception as e:
        print(f"处理消息时出错: {e}")
# Set callback functions
client.on_connect = on_connect
client.on_message = on_message
# Connect to broker
try:
    client.connect(broker, port, 60)
    # Start the network loop
    client.loop_start()
    print(f"已连接到MQTT服务器: {broker}:{port}")
except Exception as e:
    print(f"连接MQTT服务器失败: {e}")
    exit(1)
try:
    device_id = "device_001"
    print(f"设备 {device_id} 模拟器开始运行")
    print("按Ctrl+C停止程序")
    
    while True:
        # Generate random GPS coordinates around a specific location
        # Base coordinates: 30.0°N, 120.0°E
        latitude = 30.0 + random.uniform(-0.1, 0.1)
        longitude = 120.0 + random.uniform(-0.1, 0.1)
        
        # Create message payload
        message = {
            "device_id": device_id,
            "timestamp": int(time.time()),
            "latitude": round(latitude, 6),
            "longitude": round(longitude, 6)
        }
        
        # Publish message
        client.publish(topic, json.dumps(message))
        print(f"已发送GPS数据: 纬度={message['latitude']}, 经度={message['longitude']}")
        
        # Wait for 2 seconds before sending next message
        time.sleep(2)
except KeyboardInterrupt:
    print("\n接收到停止信号,正在停止设备模拟器...")
    client.loop_stop()
    client.disconnect()
    print("设备模拟器已停止")
except Exception as e:
    print(f"运行时错误: {e}")
    client.loop_stop()
    client.disconnect() 
设备模拟器使用随机生成的GPS坐标数据,模拟设备移动的场景,同时能够对不同类型的命令做出响应。
日志信息:

PyQt客户端(gps_monitor.py)提供了图形界面,主要功能包括:
客户端的核心在于MQTT与PyQt的集成,我们使用信号-槽机制来解决多线程问题:
python展开代码import sys
import json
import time
import paho.mqtt.client as mqtt
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, 
                            QHBoxLayout, QPushButton, QTextEdit, QLabel)
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject
# 创建一个MQTT信号处理类
class MQTTSignals(QObject):
    message_received = pyqtSignal(dict)
class GPSMonitor(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("GPS Monitor and Controller")
        self.setGeometry(100, 100, 800, 600)
        
        # MQTT setup
        self.broker = "10.100.80.98"
        self.port = 1883
        self.gps_topic = "device/gps"
        self.control_topic = "device/control"
        
        # 创建信号对象
        self.mqtt_signals = MQTTSignals()
        self.mqtt_signals.message_received.connect(self.update_display)
        
        self.setup_mqtt()
        self.setup_ui()
        
    def setup_mqtt(self):
        # 使用MQTT V5客户端
        self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        
        try:
            self.client.connect(self.broker, self.port, 60)
            self.client.loop_start()
        except Exception as e:
            print(f"MQTT连接错误: {e}")
        
    def on_connect(self, client, userdata, flags, rc, properties=None):
        print(f"Connected with result code {rc}")
        self.client.subscribe(self.gps_topic)
        
    def on_message(self, client, userdata, msg, properties=None):
        try:
            data = json.loads(msg.payload.decode())
            # 使用信号发送数据,不直接更新UI
            self.mqtt_signals.message_received.emit(data)
        except Exception as e:
            print(f"Error processing message: {e}")
            
    def setup_ui(self):
        # Main widget and layout
        main_widget = QWidget()
        self.setCentralWidget(main_widget)
        layout = QVBoxLayout(main_widget)
        
        # GPS display area
        self.gps_display = QTextEdit()
        self.gps_display.setReadOnly(True)
        layout.addWidget(self.gps_display)
        
        # Control buttons layout
        button_layout = QHBoxLayout()
        
        # Direction buttons
        self.btn_up = QPushButton("上")
        self.btn_down = QPushButton("下")
        self.btn_left = QPushButton("左")
        self.btn_right = QPushButton("右")
        self.btn_confirm = QPushButton("确认")
        self.btn_cancel = QPushButton("取消")
        
        # Add buttons to layout
        button_layout.addWidget(self.btn_up)
        button_layout.addWidget(self.btn_down)
        button_layout.addWidget(self.btn_left)
        button_layout.addWidget(self.btn_right)
        button_layout.addWidget(self.btn_confirm)
        button_layout.addWidget(self.btn_cancel)
        
        # Command input
        self.command_input = QTextEdit()
        self.command_input.setMaximumHeight(100)
        layout.addWidget(QLabel("命令输入:"))
        layout.addWidget(self.command_input)
        
        # Send button
        self.btn_send = QPushButton("发送")
        layout.addWidget(self.btn_send)
        
        # Add button layout to main layout
        layout.addLayout(button_layout)
        
        # Connect button signals
        self.btn_up.clicked.connect(lambda: self.send_command("UP"))
        self.btn_down.clicked.connect(lambda: self.send_command("DOWN"))
        self.btn_left.clicked.connect(lambda: self.send_command("LEFT"))
        self.btn_right.clicked.connect(lambda: self.send_command("RIGHT"))
        self.btn_confirm.clicked.connect(lambda: self.send_command("CONFIRM"))
        self.btn_cancel.clicked.connect(lambda: self.send_command("CANCEL"))
        self.btn_send.clicked.connect(self.send_custom_command)
        
    def update_display(self, data):
        display_text = f"时间戳: {data['timestamp']}\n"
        display_text += f"设备ID: {data['device_id']}\n"
        display_text += f"纬度: {data['latitude']}\n"
        display_text += f"经度: {data['longitude']}\n"
        display_text += "-" * 50 + "\n"
        
        current_text = self.gps_display.toPlainText()
        self.gps_display.setText(display_text + current_text)
        
    def send_command(self, command):
        message = {
            "command": command,
            "timestamp": int(time.time())
        }
        self.client.publish(self.control_topic, json.dumps(message))
        print(f"已发送命令: {command}")
        
    def send_custom_command(self):
        command = self.command_input.toPlainText().strip()
        if command:
            # 使用"text"作为键名而不是"command"
            message = {
                "text": command,
                "timestamp": int(time.time())
            }
            self.client.publish(self.control_topic, json.dumps(message))
            self.command_input.clear()
            print(f"已发送自定义文本: {command}")
    
    def closeEvent(self, event):
        """窗口关闭时的处理"""
        self.client.loop_stop()
        self.client.disconnect()
        event.accept()
if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = GPSMonitor()
    window.show()
    sys.exit(app.exec_()) 
这种设计解决了MQTT回调线程与GUI线程的冲突问题,确保了应用程序的稳定性。
PyQt运行界面:

MQTT客户端的回调函数在非GUI线程中执行,而PyQt应用程序要求UI更新必须在主线程中进行。为了解决这个问题,我们采用了Qt的信号-槽机制:
MQTTSignals类,定义message_received信号这种方式确保了线程安全,避免了直接在回调函数中更新UI导致的崩溃问题。
该系统具有良好的扩展性:
系统运行后,设备模拟器会每2秒发送一次GPS数据,PyQt客户端会实时显示这些数据,并可通过按钮或自定义文本发送控制命令。设备模拟器收到命令后会打印相应的响应信息。
使用前需要安装必要的依赖:
bash展开代码pip install paho-mqtt PyQt5
要运行系统,请先启动设备模拟器,然后运行PyQt客户端:
bash展开代码python device_simulator.py python gps_monitor.py


本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!