编辑
2025-04-28
单片机
00

目录

一、准备工作
二、通过 EMQX Dashboard 配置(推荐)
三、调试

之前博客介绍了如何搭建EMQX:https://www.dong-blog.fun/post/1963

本博客在 EMQX 中配置规则将数据写入 MySQL,可以通过 规则引擎 + 数据桥接 实现。以下是详细步骤:

一、准备工作

  1. 确保 MySQL 已就绪:

docker 启动一个mysql数据库:

bash
mkdir mysql cd mysql docker run -d \ --name mysql_for_emqx \ -p 3306:3306 \ -e MYSQL_ROOT_PASSWORD=yourpassword \ -e MYSQL_DATABASE=emqx_data \ -v $PWD/mysql_data:/var/lib/mysql \ --restart unless-stopped \ docker.1ms.run/mysql:8.0 \ --character-set-server=utf8mb4 \ --collation-server=utf8mb4_unicode_ci

进入mysql数据库交互终端:

# 进入MySQL交互终端 docker exec -it mysql_for_emqx mysql -h 127.0.0.1 -uroot -pyourpassword

• 创建数据库和表(示例表结构):

sql
CREATE DATABASE emqx_data; # 这句不用执行,docker run的时候已经创建了此数据库 USE emqx_data; SHOW TABLES; CREATE TABLE mqtt_messages ( id INT AUTO_INCREMENT PRIMARY KEY, topic VARCHAR(255), payload TEXT, qos TINYINT, client_id VARCHAR(255), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); SHOW TABLES;

看到这个就是表已经创建了:

mysql> SHOW TABLES; +---------------------+ | Tables_in_emqx_data | +---------------------+ | mqtt_messages | +---------------------+ 1 row in set (0.00 sec)
  1. 获取 MySQL 连接信息: • 主机:mysql_host(如 127.0.0.1

    • 端口:3306

    • 用户名:root

    • 密码:yourpassword

    • 数据库名:emqx_data


二、通过 EMQX Dashboard 配置(推荐)

  1. 登录 Dashboard: • 访问 http://10.100.80.98:18083,输入账号密码(默认 admin/public)。

  2. 创建规则:

    • 进入 规则引擎 → 规则 → 创建。

    • SQL 示例(匹配所有test消息):

sql
SELECT topic, payload, qos, clientid AS client_id FROM "test/#"

• 点击 测试 验证规则是否能匹配到 MQTT 消息。

如何测试自己的SQL是不是写对了呢,这样搞,先把启用调试打开,然后设置好主题,然后点击运行调试,在下图这个设置里,相当于我用客户端发了消息到主题(test/haha)。可以看到右边有结果{"client_id":"c_emqx","payload":"{\"msg\": \"hello\"}","qos":1,"topic":"test/haha"} ,说明规则生效了。然后点保存即可。

image.png

  1. 添加 MySQL 数据桥接:

本来应该这么做:

• 在规则详情页点击 添加动作 → 数据桥接 → MySQL。 • 填写 MySQL 连接信息: 名称:mysql_bridge 模式:异步(高性能) MySQL 服务器:mysql_host:3306 数据库:emqx_data 用户名:root 密码:yourpassword SQL 模板:INSERT INTO mqtt_messages(topic, payload, qos, client_id) VALUES(${topic}, ${payload}, ${qos}, ${client_id}) • 关键参数说明: ◦ `${topic}`、`${payload}` 等是规则 SQL 输出的字段。 ◦ 如果字段名冲突,可用 `${payload.key}` 提取 JSON 中的嵌套字段。

但是在免费的EMQX里面,没有数据桥接的功能,数据桥接的功能是企业版要收费的。

所以只能这么做》》》》

首先点击添加动作:

image.png

然后写个python flask代码,启动一个http api接口,用于接收消息并把数据写数据库里去,我在本地直接启动下面这个代码。这个代码会开启一个端口5000的http api接口服务,路由是 /mqtt 。当有请求携带数据POST访问 /mqtt 接口后,这个接口会解析传入的data里的数据topic, payload, qos, client_id,并写入到数据库里。

python
# 安装依赖:pip install flask pymysql from flask import Flask, request import pymysql from datetime import datetime app = Flask(__name__) @app.route('/mqtt', methods=['POST']) def handle_mqtt(): try: data = request.json print("Received data:", data) # 调试用 # 连接MySQL(参数需与你的MySQL容器匹配) conn = pymysql.connect( host='127.0.0.1', # 如果是Docker容器间通信,改用容器名如'mysql_for_emqx' port=3306, user='root', password='yourpassword', database='emqx_data' ) # 写入数据 with conn.cursor() as cursor: sql = """ INSERT INTO mqtt_messages (topic, payload, qos, client_id) VALUES (%s, %s, %s, %s) """ cursor.execute(sql, ( data['topic'], str(data['payload']), data['qos'], data['client_id'] )) conn.commit() return {"status": "success"}, 200 except Exception as e: print("Error:", e) return {"status": "error", "message": str(e)}, 500 if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)

在本地测试这个flask服务是否成功:

bash
curl -X POST http://127.0.0.1:5000/mqtt -H "Content-Type: application/json" -d '{ "topic": "test/123", "payload": "hello", "qos": 1, "client_id": "client_1" }'

看来是ok的:

image.png

curl访问接口,并把数据传入了flask,flask代码里写入到了数据库。我们可以查看mysql的表里是不是多了数据:

# 进入MySQL交互终端 docker exec -it mysql_for_emqx mysql -h 127.0.0.1 -uroot -pyourpassword USE emqx_data; SELECT * FROM emqx_data.mqtt_messages;

看到这个信息就是可以了,可以看到表里面有个数据:

bash
mysql> USE emqx_data; Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Database changed mysql> SELECT * FROM emqx_data.mqtt_messages; +----+----------+---------+------+-----------+---------------------+ | id | topic | payload | qos | client_id | created_at | +----+----------+---------+------+-----------+---------------------+ | 1 | test/123 | hello | 1 | client_1 | 2025-04-28 08:00:05 | +----+----------+---------+------+-----------+---------------------+ 1 row in set (0.00 sec)

添加 HTTP 的连接器,URL写flask的ip和接口,测试连接显示连接成功,则可以保存。在下图里连接器名称我写了mysql_server,这是随便自己写的。URL我写了http://10.100.80.98:5000 ,这是我的ip和flask的服务端口。

image.png

添加动作。名称可以随便写。URL路径写/mqtt , 连接器选刚才创建的连接器。

请求体这么写:

{ "topic": "${topic}", "payload": "${payload}", "qos": ${qos}, "client_id": "${client_id}" }

点测试连接,OK的就没问题,保存好(要么保存要么更新,别忘记保存,优先选更新)。如下图:

image.png

三、调试

简单一点,直接模拟个mqtt客户端,往broker主题上发数据。装好工具:

sudo apt-get update sudo apt-get install mosquitto-clients # 只安装客户端工具(不安装服务端)

直接发一条数据:

bash
mosquitto_pub \ -h 10.100.80.98 \ -p 1883 \ -t "test/abc" \ -m "xxxxxddddd" \ -q 1

这个客户端往broker的主题"test/abc" 发了消息,消息是"xxxxxddddd" 。发了之后,由broker的规则接收到,并通过动作访问了flask,flask代码收到数据会存到mysql里。在emqx里可以进入这里看数据面板:

image.png

也可以看看当前数据库是否有这条数据,有这条数据说明一切ok:

image.png

如果你不想装mosquitto-clients ,也可以用这个代码去直接发布消息,也会在数据库看到此消息。

import paho.mqtt.client as mqtt # 定义MQTT服务器地址和端口 broker = "10.100.80.98" port = 1883 # 创建MQTT客户端 client = mqtt.Client() # 连接到MQTT服务器 client.connect(broker, port, 60) # 发布消息 topic = "test/abc" message = "Hello, EMQX!" client.publish(topic, message) # 断开连接 client.disconnect()
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:Dong

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!