官网地址下载
自建mqtt服务器,使用的EMQX搭建,官网地址:
EMQX: 大规模分布式物联网 MQTT 消息服务器
下载地址:选择需要下载服务器版本
这里下载的是window版本的
下载完成解压后,在文件夹打开cmd窗口
开启服务
输入:./emqx start开启服务
查看状态:输入./emqx_ctl status
mqtt服务器就运行起来了
下一步就是发布订阅了,官方文档
本地服务地址
浏览器输入地址http://127.0.0.1:18083 就可以打开mqtt本地的服务后台
账号admin 密码public 之后会提示你修改密码
这个就是打开的首页了
![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c5be459eed624393acf11cddaf631696~tplv-k3u1fbpfcp-zoom-in-crop-mark:3024:0:0:0.image)
配置发布订阅测试
之后开始配置客户端进行发布订阅测试
官网描述
以下是 EMQX 中一条消息从发布到发送到订阅者所需的流程及注意事项:
发布者、订阅者连接到 EMQX,需要提供正确的连接地址与认证信息,并确保两者连接到同一个 EMQX;
订阅者订阅主题,默认情况下没有匹配订阅者的消息将被立即丢弃;
发布者发布消息,请确保消息主题能够被订阅者匹配,且订阅者已经订阅对应主题;
订阅者收到消息。
第一步:需要先在这里创建数据源
选择Password-Based
账号类型选择client_id,创建
选择对应的数据库,这里选的是built-in Database
![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ebd6c43616f44dbf96ddc1c5e347171b~tplv-k3u1fbpfcp-zoom-in-crop-mark:3024:0:0:0.image)
创建用户/客户端clentId
创建完成后,点击用户管理,需要添加两个用户,用来测试发送和接收
点击添加按钮
客户端ID和密码按照自己的需要填写即可,需要记住,等会连接需要使用
创建完成后这里可以看到连接状态
新建完成后,下一步使用mqttx进行连接测试
mqttx连接测试
填写对应的信息,需要创建两个,用于接收和发送
本地的地址写localhost,端口1883,client_id 填写上边创建的,用户名随便写了
填写完成点击连接,绿色表示连接成功
然后添加订阅
设置订阅,两个分别订阅对方的主题,点击发送,就可以接受到了
测试的mqtt服务器就好了
利用Android连接mqtt
1、需要导入
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
复制代码
2、权限需要新增
复制代码
3、开始调用
MqttAndroidClient mqttAndroidClient;
MqttConnectOptions mqttConnectOptions;
private String mqttUrl;
private String mqttClientId;
private String mqttUser;
private String mqttPassword;
private void initMqtt(){
mqttUrl = "tcp://172.26.21.137:1883";
mqttClientId = "mqttx_00969225";
mqttUser = "mqttx_00969225";
mqttPassword = "mqttx_00969225";
mqttAndroidClient = new MqttAndroidClient(getApplicationContext(), mqttUrl, mqttClientId);
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setAutomaticReconnect(true);//自动重连
mqttConnectOptions.setCleanSession(true);//清楚会话,如果是false,qos设置为1,可以获取到断线消息
mqttConnectOptions.setUserName(mqttUser);
mqttConnectOptions.setPassword(mqttPassword.toCharArray());
mqttConnectOptions.setKeepAliveInterval(60); //保活时间间隔60s
mqttConnectOptions.setConnectionTimeout(30); //连接超时30s
try {
//连接失败,可以使用这种带回调的方式,查看失败的原因
/** mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
MyLogger.getLogger().e("--onSuccess 连接完成---");
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
MyLogger.getLogger().e("--onFailure ---"+throwable.getMessage());
}
});*/
//连接成功后,如果出现断线重连监听接收不到,需要将这里的连接更换为以下这种方式,就可以收到了
mqttAndroidClient.connect(mqttConnectOptions)
} catch (MqttException e) {
e.printStackTrace();
}
mqttAndroidClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean isReconnect, String s) {
//isReconnect true为重连,默认false
MyLogger.getLogger().e("--connectComplete 连接完成---"+isReconnect);
// mqttAndroidClient.setBufferOpts();
if(isReconnect){
MyLogger.getLogger().e("--connectComplete 重新连接");
}
subscribeToTopic();
}
@Override
public void connectionLost(Throwable throwable) {
MyLogger.getLogger().e("--connectionLost 连接丢失---");
}
//消息收到回调的缺省回调
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
MyLogger.getLogger().e("--messageArrived topic---"+topic);
String json = new String(mqttMessage.getPayload());
MyLogger.getLogger().e("--messageArrived json---"+json);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
MyLogger.getLogger().e("--deliveryComplete--");
}
});
}
/**
* 订阅
*/
private void subscribeToTopic() {
try {
if (mqttAndroidClient != null) {
mqttAndroidClient.subscribe(topic, new int[]{1,1}, this, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken iMqttToken) {
MyLogger.getLogger().i("订阅成功onSuccess=");
}
@Override
public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
MyLogger.getLogger().i("订阅失败onFailure=");
}
});
}
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
复制代码
就可以利用mqttx进行发送消息测试了
|