编辑
2025-04-28
单片机
00

目录

基于MQTT的设备监控与控制系统设计
引言
系统架构
通信协议设计
设备模拟器实现
PyQt客户端实现
线程安全设计
系统扩展性
系统运行效果
源码与使用方法

基于MQTT的设备监控与控制系统设计

引言

物联网(IoT)设备的远程监控与控制是现代智能系统的基础需求。本文将介绍一个基于MQTT协议的设备监控与控制系统,该系统由两部分组成:模拟单片机设备和PyQt客户端。我们将详细讨论系统的设计思路、代码实现以及实际应用场景。

系统架构

该系统基于发布-订阅模式,采用MQTT协议实现设备与客户端之间的双向通信。系统结构如下:

  1. 设备模拟器:模拟单片机设备,定期发送GPS位置数据,并接收控制命令
  2. PyQt客户端:显示设备GPS数据,并提供用户界面发送控制命令
  3. MQTT服务器:充当消息代理,负责转发消息

我在之前已经介绍了如何使用EMQT,这里不再重复,有关信息查看这里:

https://www.dong-blog.fun/post/1963

https://www.dong-blog.fun/post/2049

通信协议设计

系统使用两个主题进行通信:

  1. 设备数据主题(device/gps):设备向此主题发布GPS数据

    json
    { "device_id": "device_001", "timestamp": 1692345678, "latitude": 30.056789, "longitude": 120.123456 }
  2. 控制指令主题(device/control):客户端向此主题发布控制命令

    • 按钮控制命令:
      json
      { "command": "UP", "timestamp": 1692345700 }
    • 自定义文本命令:
      json
      { "text": "自定义指令内容", "timestamp": 1692345720 }

通过区分commandtext键,系统能够清晰地区分不同类型的控制信号。

设备模拟器实现

设备模拟器(device_simulator.py)主要功能是:

  1. 模拟GPS数据采集并发布到MQTT服务器
  2. 接收并处理控制命令
python
import paho.mqtt.client as mqtt import json import time import random # MQTT broker configuration broker = "10.100.80.98" port = 1883 topic = "device/gps" control_topic = "device/control" # Create MQTT client with Version 2 API client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) def on_connect(client, userdata, flags, rc, properties=None): print(f"Connected with result code {rc}") # Subscribe to control topic client.subscribe(control_topic) print(f"已订阅控制主题: {control_topic}") def on_message(client, userdata, msg, properties=None): try: data = json.loads(msg.payload.decode()) print(f"收到控制数据: {data}") # 处理不同类型的消息 if "command" in data: command = data["command"] print(f"执行命令: {command}") # 这里可以添加对命令的具体响应逻辑 if command == "UP": print("设备向上移动") elif command == "DOWN": print("设备向下移动") elif command == "LEFT": print("设备向左移动") elif command == "RIGHT": print("设备向右移动") elif command == "CONFIRM": print("设备确认操作") elif command == "CANCEL": print("设备取消操作") # 处理自定义文本消息 elif "text" in data: text = data["text"] print(f"收到自定义文本: {text}") # 处理文本命令的逻辑 print(f"设备执行文本命令: {text}") except Exception as e: print(f"处理消息时出错: {e}") # Set callback functions client.on_connect = on_connect client.on_message = on_message # Connect to broker try: client.connect(broker, port, 60) # Start the network loop client.loop_start() print(f"已连接到MQTT服务器: {broker}:{port}") except Exception as e: print(f"连接MQTT服务器失败: {e}") exit(1) try: device_id = "device_001" print(f"设备 {device_id} 模拟器开始运行") print("按Ctrl+C停止程序") while True: # Generate random GPS coordinates around a specific location # Base coordinates: 30.0°N, 120.0°E latitude = 30.0 + random.uniform(-0.1, 0.1) longitude = 120.0 + random.uniform(-0.1, 0.1) # Create message payload message = { "device_id": device_id, "timestamp": int(time.time()), "latitude": round(latitude, 6), "longitude": round(longitude, 6) } # Publish message client.publish(topic, json.dumps(message)) print(f"已发送GPS数据: 纬度={message['latitude']}, 经度={message['longitude']}") # Wait for 2 seconds before sending next message time.sleep(2) except KeyboardInterrupt: print("\n接收到停止信号,正在停止设备模拟器...") client.loop_stop() client.disconnect() print("设备模拟器已停止") except Exception as e: print(f"运行时错误: {e}") client.loop_stop() client.disconnect()

设备模拟器使用随机生成的GPS坐标数据,模拟设备移动的场景,同时能够对不同类型的命令做出响应。

日志信息:

image.png

PyQt客户端实现

PyQt客户端(gps_monitor.py)提供了图形界面,主要功能包括:

  1. 显示设备GPS数据
  2. 提供方向控制按钮
  3. 支持自定义文本命令输入和发送

客户端的核心在于MQTT与PyQt的集成,我们使用信号-槽机制来解决多线程问题:

python
import sys import json import time import paho.mqtt.client as mqtt from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QTextEdit, QLabel) from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject # 创建一个MQTT信号处理类 class MQTTSignals(QObject): message_received = pyqtSignal(dict) class GPSMonitor(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("GPS Monitor and Controller") self.setGeometry(100, 100, 800, 600) # MQTT setup self.broker = "10.100.80.98" self.port = 1883 self.gps_topic = "device/gps" self.control_topic = "device/control" # 创建信号对象 self.mqtt_signals = MQTTSignals() self.mqtt_signals.message_received.connect(self.update_display) self.setup_mqtt() self.setup_ui() def setup_mqtt(self): # 使用MQTT V5客户端 self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) self.client.on_connect = self.on_connect self.client.on_message = self.on_message try: self.client.connect(self.broker, self.port, 60) self.client.loop_start() except Exception as e: print(f"MQTT连接错误: {e}") def on_connect(self, client, userdata, flags, rc, properties=None): print(f"Connected with result code {rc}") self.client.subscribe(self.gps_topic) def on_message(self, client, userdata, msg, properties=None): try: data = json.loads(msg.payload.decode()) # 使用信号发送数据,不直接更新UI self.mqtt_signals.message_received.emit(data) except Exception as e: print(f"Error processing message: {e}") def setup_ui(self): # Main widget and layout main_widget = QWidget() self.setCentralWidget(main_widget) layout = QVBoxLayout(main_widget) # GPS display area self.gps_display = QTextEdit() self.gps_display.setReadOnly(True) layout.addWidget(self.gps_display) # Control buttons layout button_layout = QHBoxLayout() # Direction buttons self.btn_up = QPushButton("上") self.btn_down = QPushButton("下") self.btn_left = QPushButton("左") self.btn_right = QPushButton("右") self.btn_confirm = QPushButton("确认") self.btn_cancel = QPushButton("取消") # Add buttons to layout button_layout.addWidget(self.btn_up) button_layout.addWidget(self.btn_down) button_layout.addWidget(self.btn_left) button_layout.addWidget(self.btn_right) button_layout.addWidget(self.btn_confirm) button_layout.addWidget(self.btn_cancel) # Command input self.command_input = QTextEdit() self.command_input.setMaximumHeight(100) layout.addWidget(QLabel("命令输入:")) layout.addWidget(self.command_input) # Send button self.btn_send = QPushButton("发送") layout.addWidget(self.btn_send) # Add button layout to main layout layout.addLayout(button_layout) # Connect button signals self.btn_up.clicked.connect(lambda: self.send_command("UP")) self.btn_down.clicked.connect(lambda: self.send_command("DOWN")) self.btn_left.clicked.connect(lambda: self.send_command("LEFT")) self.btn_right.clicked.connect(lambda: self.send_command("RIGHT")) self.btn_confirm.clicked.connect(lambda: self.send_command("CONFIRM")) self.btn_cancel.clicked.connect(lambda: self.send_command("CANCEL")) self.btn_send.clicked.connect(self.send_custom_command) def update_display(self, data): display_text = f"时间戳: {data['timestamp']}\n" display_text += f"设备ID: {data['device_id']}\n" display_text += f"纬度: {data['latitude']}\n" display_text += f"经度: {data['longitude']}\n" display_text += "-" * 50 + "\n" current_text = self.gps_display.toPlainText() self.gps_display.setText(display_text + current_text) def send_command(self, command): message = { "command": command, "timestamp": int(time.time()) } self.client.publish(self.control_topic, json.dumps(message)) print(f"已发送命令: {command}") def send_custom_command(self): command = self.command_input.toPlainText().strip() if command: # 使用"text"作为键名而不是"command" message = { "text": command, "timestamp": int(time.time()) } self.client.publish(self.control_topic, json.dumps(message)) self.command_input.clear() print(f"已发送自定义文本: {command}") def closeEvent(self, event): """窗口关闭时的处理""" self.client.loop_stop() self.client.disconnect() event.accept() if __name__ == "__main__": app = QApplication(sys.argv) window = GPSMonitor() window.show() sys.exit(app.exec_())

这种设计解决了MQTT回调线程与GUI线程的冲突问题,确保了应用程序的稳定性。

PyQt运行界面:

image.png

线程安全设计

MQTT客户端的回调函数在非GUI线程中执行,而PyQt应用程序要求UI更新必须在主线程中进行。为了解决这个问题,我们采用了Qt的信号-槽机制:

  1. 创建一个MQTTSignals类,定义message_received信号
  2. 在MQTT回调函数中发射信号
  3. 将信号连接到GUI线程中的槽函数

这种方式确保了线程安全,避免了直接在回调函数中更新UI导致的崩溃问题。

系统扩展性

该系统具有良好的扩展性:

  1. 多设备支持:通过增加设备ID识别,可轻松扩展为多设备系统
  2. 协议扩展:可以增加新的消息类型和命令
  3. 功能增强:可以添加历史数据记录、轨迹显示、报警功能等

系统运行效果

系统运行后,设备模拟器会每2秒发送一次GPS数据,PyQt客户端会实时显示这些数据,并可通过按钮或自定义文本发送控制命令。设备模拟器收到命令后会打印相应的响应信息。

源码与使用方法

使用前需要安装必要的依赖:

bash
pip install paho-mqtt PyQt5

要运行系统,请先启动设备模拟器,然后运行PyQt客户端:

bash
python device_simulator.py python gps_monitor.py
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:Dong

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!