之前博客介绍了如何搭建EMQX:https://www.dong-blog.fun/post/1963
本博客在 EMQX 中配置规则将数据写入 MySQL,可以通过 规则引擎 + 数据桥接 实现。以下是详细步骤:
docker 启动一个mysql数据库:
bashmkdir mysql
cd mysql
docker run -d \
--name mysql_for_emqx \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=yourpassword \
-e MYSQL_DATABASE=emqx_data \
-v $PWD/mysql_data:/var/lib/mysql \
--restart unless-stopped \
docker.1ms.run/mysql:8.0 \
--character-set-server=utf8mb4 \
--collation-server=utf8mb4_unicode_ci
进入mysql数据库交互终端:
# 进入MySQL交互终端 docker exec -it mysql_for_emqx mysql -h 127.0.0.1 -uroot -pyourpassword
• 创建数据库和表(示例表结构):
sqlCREATE DATABASE emqx_data; # 这句不用执行,docker run的时候已经创建了此数据库
USE emqx_data;
SHOW TABLES;
CREATE TABLE mqtt_messages (
id INT AUTO_INCREMENT PRIMARY KEY,
topic VARCHAR(255),
payload TEXT,
qos TINYINT,
client_id VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
SHOW TABLES;
看到这个就是表已经创建了:
mysql> SHOW TABLES; +---------------------+ | Tables_in_emqx_data | +---------------------+ | mqtt_messages | +---------------------+ 1 row in set (0.00 sec)
获取 MySQL 连接信息:
• 主机:mysql_host
(如 127.0.0.1
)
• 端口:3306
• 用户名:root
• 密码:yourpassword
• 数据库名:emqx_data
登录 Dashboard:
• 访问 http://10.100.80.98:18083
,输入账号密码(默认 admin/public
)。
创建规则:
• 进入 规则引擎 → 规则 → 创建。
• SQL 示例(匹配所有test消息):
sqlSELECT
topic,
payload,
qos,
clientid AS client_id
FROM
"test/#"
• 点击 测试 验证规则是否能匹配到 MQTT 消息。
如何测试自己的SQL是不是写对了呢,这样搞,先把启用调试打开,然后设置好主题,然后点击运行调试,在下图这个设置里,相当于我用客户端发了消息到主题(test/haha)。可以看到右边有结果{"client_id":"c_emqx","payload":"{\"msg\": \"hello\"}","qos":1,"topic":"test/haha"}
,说明规则生效了。然后点保存即可。
本来应该这么做:
• 在规则详情页点击 添加动作 → 数据桥接 → MySQL。 • 填写 MySQL 连接信息: 名称:mysql_bridge 模式:异步(高性能) MySQL 服务器:mysql_host:3306 数据库:emqx_data 用户名:root 密码:yourpassword SQL 模板:INSERT INTO mqtt_messages(topic, payload, qos, client_id) VALUES(${topic}, ${payload}, ${qos}, ${client_id}) • 关键参数说明: ◦ `${topic}`、`${payload}` 等是规则 SQL 输出的字段。 ◦ 如果字段名冲突,可用 `${payload.key}` 提取 JSON 中的嵌套字段。
但是在免费的EMQX里面,没有数据桥接的功能,数据桥接的功能是企业版要收费的。
所以只能这么做》》》》
首先点击添加动作:
然后写个python flask代码,启动一个http api接口,用于接收消息并把数据写数据库里去,我在本地直接启动下面这个代码。这个代码会开启一个端口5000的http api接口服务,路由是 /mqtt 。当有请求携带数据POST访问 /mqtt 接口后,这个接口会解析传入的data里的数据topic, payload, qos, client_id
,并写入到数据库里。
python# 安装依赖:pip install flask pymysql
from flask import Flask, request
import pymysql
from datetime import datetime
app = Flask(__name__)
@app.route('/mqtt', methods=['POST'])
def handle_mqtt():
try:
data = request.json
print("Received data:", data) # 调试用
# 连接MySQL(参数需与你的MySQL容器匹配)
conn = pymysql.connect(
host='127.0.0.1', # 如果是Docker容器间通信,改用容器名如'mysql_for_emqx'
port=3306,
user='root',
password='yourpassword',
database='emqx_data'
)
# 写入数据
with conn.cursor() as cursor:
sql = """
INSERT INTO mqtt_messages (topic, payload, qos, client_id)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(sql, (
data['topic'],
str(data['payload']),
data['qos'],
data['client_id']
))
conn.commit()
return {"status": "success"}, 200
except Exception as e:
print("Error:", e)
return {"status": "error", "message": str(e)}, 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
在本地测试这个flask服务是否成功:
bashcurl -X POST http://127.0.0.1:5000/mqtt -H "Content-Type: application/json" -d '{
"topic": "test/123",
"payload": "hello",
"qos": 1,
"client_id": "client_1"
}'
看来是ok的:
curl访问接口,并把数据传入了flask,flask代码里写入到了数据库。我们可以查看mysql的表里是不是多了数据:
# 进入MySQL交互终端 docker exec -it mysql_for_emqx mysql -h 127.0.0.1 -uroot -pyourpassword USE emqx_data; SELECT * FROM emqx_data.mqtt_messages;
看到这个信息就是可以了,可以看到表里面有个数据:
bashmysql> USE emqx_data;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> SELECT * FROM emqx_data.mqtt_messages;
+----+----------+---------+------+-----------+---------------------+
| id | topic | payload | qos | client_id | created_at |
+----+----------+---------+------+-----------+---------------------+
| 1 | test/123 | hello | 1 | client_1 | 2025-04-28 08:00:05 |
+----+----------+---------+------+-----------+---------------------+
1 row in set (0.00 sec)
添加 HTTP 的连接器,URL写flask的ip和接口,测试连接显示连接成功,则可以保存。在下图里连接器名称我写了mysql_server,这是随便自己写的。URL我写了http://10.100.80.98:5000
,这是我的ip和flask的服务端口。
添加动作。名称可以随便写。URL路径写/mqtt
, 连接器选刚才创建的连接器。
请求体这么写:
{ "topic": "${topic}", "payload": "${payload}", "qos": ${qos}, "client_id": "${client_id}" }
点测试连接,OK的就没问题,保存好(要么保存要么更新,别忘记保存,优先选更新)。如下图:
简单一点,直接模拟个mqtt客户端,往broker主题上发数据。装好工具:
sudo apt-get update sudo apt-get install mosquitto-clients # 只安装客户端工具(不安装服务端)
直接发一条数据:
bashmosquitto_pub \
-h 10.100.80.98 \
-p 1883 \
-t "test/abc" \
-m "xxxxxddddd" \
-q 1
这个客户端往broker的主题"test/abc"
发了消息,消息是"xxxxxddddd"
。发了之后,由broker的规则接收到,并通过动作访问了flask,flask代码收到数据会存到mysql里。在emqx里可以进入这里看数据面板:
也可以看看当前数据库是否有这条数据,有这条数据说明一切ok:
如果你不想装mosquitto-clients
,也可以用这个代码去直接发布消息,也会在数据库看到此消息。
import paho.mqtt.client as mqtt # 定义MQTT服务器地址和端口 broker = "10.100.80.98" port = 1883 # 创建MQTT客户端 client = mqtt.Client() # 连接到MQTT服务器 client.connect(broker, port, 60) # 发布消息 topic = "test/abc" message = "Hello, EMQX!" client.publish(topic, message) # 断开连接 client.disconnect()
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!