前言
最近各种原因导致一个月里很是颓废。昨天突然想写一个聊天的APP,这样可以让我学习到新东西,还能复习安卓开发的知识,太棒了!
因此衍生的一个编写这个APP的思路,觉得很是有趣。就是借用 MQ 之中队列功能进行发送信息。每当注册一个用户之后就为其创建一个消息队列,不管用户在不在线,好友向他发送消息,然后当该用户在线的时候客户端就会去读取对应队列的信息。原本使用 WebSocket 只能在线聊天,这样一个思路完全就可以解决离线接收信息了!
不过用户数量如果过多,从而导致队列过多,这个就是以后考虑的事情了~ 正在试着实现这个思路嘛。
接下来便是实际去阅读文章看看这样可不可行,因此接触到了MQTT协议。简单的来说,这个协议比 RabbitMQ 中默认的 AMQP 更为轻量级,并且完全可以适应我这个思路。
开始
接下来就开始先学习一下这个协议~
启动
默认情况下 RabbitMQ 是不启动的,所以需要我们去手动开启一下。到 RabbitMQ 根目录下的 /sbin/ ,然后输入以下指令。
rabbitmq-plugins enable rabbitmq_mqtt
这样在 mq 的web端 —— Overrview —— Ports and contexts 中就可以看到,它启动在1883端口了。
测试
简单的来说,当我们以用户ID为名称创建一个 “通道” ,之后再创建一个消费者绑定到这个 “通道”,这样就算完成了!
接下来我们使用一个叫 MQTTBox 的软件进行测试。
消息正常接收,并且经过测试,没有消费者的情况下会保留至用户上线进行接收~
在SpringBoot中实现Demo
pom包
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置
首先引入相对应的配置。
@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
private String UserName;
private String PassWord;
private String defaultTopic;
private String url;
}
消息订阅者相关配置类。
@Slf4j
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig;
//接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//设置消息质量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//处理订阅消息
log.info("接收到信息:{}",message.getPayload());
}
};
}
}
消息消费者相关配置类。
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttConfig.getUrl()});
options.setUserName(mqttConfig.getUserName());
options.setPassword(mqttConfig.getPassWord().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
消息推送接口,该接口会被自动实现。
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 发送消息到默认topic
*/
void sendToMqtt(String payload);
/**
* 发送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 发送消息到指定topic并设置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
测试
最后
完成!接下来的思路就是在Android上实现MQTT信息消费即可!
本页的评论功能已关闭