尝试使用RabbitMQ中集成的MQTT协议来做IM的基础

尝试使用RabbitMQ中集成的MQTT协议来做IM的基础

前言

最近各种原因导致一个月里很是颓废。昨天突然想写一个聊天的APP,这样可以让我学习到新东西,还能复习安卓开发的知识,太棒了!

因此衍生的一个编写这个APP的思路,觉得很是有趣。就是借用 MQ 之中队列功能进行发送信息。每当注册一个用户之后就为其创建一个消息队列,不管用户在不在线,好友向他发送消息,然后当该用户在线的时候客户端就会去读取对应队列的信息。原本使用 WebSocket 只能在线聊天,这样一个思路完全就可以解决离线接收信息了!

不过用户数量如果过多,从而导致队列过多,这个就是以后考虑的事情了~ 正在试着实现这个思路嘛。

接下来便是实际去阅读文章看看这样可不可行,因此接触到了MQTT协议。简单的来说,这个协议比 RabbitMQ 中默认的 AMQP 更为轻量级,并且完全可以适应我这个思路。

开始

接下来就开始先学习一下这个协议~

启动

默认情况下 RabbitMQ 是不启动的,所以需要我们去手动开启一下。到 RabbitMQ 根目录下的 /sbin/ ,然后输入以下指令。

rabbitmq-plugins enable rabbitmq_mqtt

这样在 mq 的web端 —— Overrview —— Ports and contexts 中就可以看到,它启动在1883端口了。

测试

e3d8abc183f74ca1a6f10e90119cc7e9_tplv-k3u1fbpfcp-zoom-1.png

简单的来说,当我们以用户ID为名称创建一个 “通道” ,之后再创建一个消费者绑定到这个 “通道”,这样就算完成了!

接下来我们使用一个叫 MQTTBox 的软件进行测试。

Snipaste_2020-11-01_18-56-25.png

消息正常接收,并且经过测试,没有消费者的情况下会保留至用户上线进行接收~

在SpringBoot中实现Demo

pom包

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
    </dependency>

配置

首先引入相对应的配置。

@Data
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
    private String UserName;
    private String PassWord;
    private String defaultTopic;
    private String url;

}

消息订阅者相关配置类。

@Slf4j
@Configuration
public class MqttInboundConfig {

    @Autowired
    private MqttConfig mqttConfig;

    //接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
                        mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //设置消息质量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //处理订阅消息
                log.info("接收到信息:{}",message.getPayload());
            }

        };
    }

}

消息消费者相关配置类。

@Configuration
public class MqttOutboundConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttConfig.getUrl()});
        options.setUserName(mqttConfig.getUserName());
        options.setPassword(mqttConfig.getPassWord().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

}

消息推送接口,该接口会被自动实现。

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 发送消息到默认topic
     */
    void sendToMqtt(String payload);

    /**
     * 发送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * 发送消息到指定topic并设置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

测试

Snipaste_2020-11-01_20-22-14.png

最后

完成!接下来的思路就是在Android上实现MQTT信息消费即可!