RabbitMQ的初步认识与简单使用

时间:2021-6-19 作者:qvyue

概念及理解

RabbitMQ的特点

可靠性:

RabbitMQ 使用如持久化、传输确认、发布确认等机制来保证可靠性。

高可用性:

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

多语言客户端:

RabbitMQ 几乎支持所有的常用语言,比如 Java、Python、Go、PHP、C#…。

灵活的路由:

消息进入队列之前先通过 Exchange 来路由消息。RabbitMQ 提供了一些内置的 Exchange 。针对更多复杂的路由功能,可以将多个 Exchange 绑定在一起,也可以通过插件机制实现自定义的 Exchange 。

消息集群:

多个 RabbitMQ 服务器可以组成一个集群

多种协议:

RabbitMQ 支持多种消息队列协议,比如AMQP(RabbitMQ最早开发就是为了支持AMQP)、 STOMP、MQTT
也包括HTTP

(HTTP当然不是一个消息协议。RabbitMQ可以通过以下三种方式来传输消息:

  • 管理插件支持一个简单的HTTP API用于发送和接收消息。主要用于测试诊断的目的,但是针对少量的消息来说还是可靠的。
  • Web-STOMP插件使得,在浏览器上可使用基于WebSockets、或者SockJS来控制消息。
  • JSON-RPC插件使浏览器通过JSON-RPC和基于AMQP 0-9-1协议的消息进行通信。注意JSON RPC是一个同步的协议,基于异步传输的AMQP的一些功能将使用polling方式进行模拟。

)。

管理界面:

RabbitMQ 提供了一个易用的用户界面,使得用户可以直观的监控和管理exchange、queue等。

跟踪机制:

如果消息异常,RabbitMQ 提供了消息跟踪机制。

插件机制:

RabbitMQ 提供了许多插件,易于扩展,也可以编写自己的插件。

RabbitMQ的几个概念

Exchange:

交换器,管理所有队列。生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中或者丢弃(消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃)。

四种类型:Direct、Fanout、Topic、Headers

(direct类型的交换机,如图)

RabbitMQ的初步认识与简单使用
Direct类型交换器实例图.jpg
Queue:

数据队列,是RabbitMQ的内部对象,用于存储消息,正常情况下消息一直在队列里面,等待消费者连接到这个队列将其取走。

RoutingKey:

一个路由规则,如果你现在所有队列的RoutingKey都一样,那么就属于广播消息,而如果不一样,则属于点对点

Message:

消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routingKey(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

Binding:

绑定,用于消息队列和交换器之间的关联。在绑定Exchange与Queue的同时,一般会指定一个bindingKey(指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中);消费者将消息发送给Exchange时,一般会指定一个routingKey;当bindingKey与routingKey相匹配时,消息将会被路由到对应的Queue中(如图)。

RabbitMQ的初步认识与简单使用
Binding示意图.png
Connection:

网络连接,比如一个TCP连接。

Channel:

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

Virtual Host:

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机可以理解为不同的用户空间,也就是说各个空间可以有自己的队列信息,有自己的操作用户。

简单应用实例(Springboot)

gradle依赖

 compile('org.springframework.boot:spring-boot-starter-amqp')

配置类,加载及初始化mq链接

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

@Component
public class RabbitmqConfig {

 @Autowired
 private Environment env;

 @Autowired
 private CachingConnectionFactory connectionFactory;

 @Autowired
 private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

 /**
 * 单一消费者
 * @return
 */
 @Bean(name = "singleListenerContainer")
 public SimpleRabbitListenerContainerFactory listenerContainer(){
 SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
 factory.setConnectionFactory(connectionFactory);
 factory.setMessageConverter(new Jackson2JsonMessageConverter());
 factory.setConcurrentConsumers(1);
 factory.setMaxConcurrentConsumers(1);
 factory.setPrefetchCount(1);
 factory.setTxSize(1);
 //手动应答确认
 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return factory;
 }

 //多消费者此处没用到


}

mq操作自定义工具类

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQUtils {

 private static RabbitAdmin rabbitAdmin;
 @Autowired
 private ConnectionFactory connectionFactory;
 @Autowired
 private ObjectMapper objectMapper;
 @Autowired
 private RabbitTemplate rabbitTemplate;

 /**
 * 设置RabbitAdmin链接
 *
 * @return
 */
 public RabbitAdmin getRabbitAdmin() {
 if (rabbitAdmin == null) {
 rabbitAdmin = new RabbitAdmin(connectionFactory);
 }
 return rabbitAdmin;
 }

 /**
 * 初始化Queue基本信息(主要设置 queueName)
 * @param queueName
 * @return
 */
 private Queue creatQueueByName(String queueName){
 return QueueBuilder.durable(queueName).build();
 }

 /**
 * 初始化Exchange 信息(主要设置 exchangeName)
 * @param exchangeName
 * @return
 */
 private Exchange creatTopicExchange(String exchangeName){
 return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();
 }

 /**
 * 创建Queue以及Exchange,并且进行绑定;
 * 用户注册时使用,用于绑定用户Appkey 信息;
 * 绑定Queue: queueName 以及 Exchange:exchangeName ,并且设置 routingKey;
 * @param queueName
 * @param exchangeName
 * @param routingKey
 */
 public void bindingExchange(String queueName,String exchangeName, String routingKey){

 Queue  queue = this.creatQueueByName(queueName);
 Exchange exchange = this.creatTopicExchange(exchangeName);
 Binding binding = BindingBuilder
 .bind(queue)
 .to(exchange)
 .with(routingKey)
 .noargs();
 this.getRabbitAdmin().declareQueue(queue);
 this.getRabbitAdmin().declareExchange(exchange);
 this.getRabbitAdmin().declareBinding(binding);
 }

 public  void convertAndSend(String exchangeName, String routingKey, T entity) throws JsonProcessingException {
 Message message =  MessageBuilder
 .withBody(objectMapper.writeValueAsBytes(entity))
 .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
 .build();
 message.getMessageProperties()
 .setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,
 MessageProperties.CONTENT_TYPE_JSON);
 rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
 }


}

mq发送消息测试类

@Test
 public void testAddToMQ() {
 String orderId = "12345";
 PayOrder payOrder = new PayOrder();
 payOrder.setOrderId(orderId);
 String exchangeName = "callback.exchange";
 String queueName = "callback.queue";
 String routingKey = orderId;
 try {
 //创建Queue以及Exchange,并且进行绑定;
 rabbitMQUtils.bindingExchange(queueName, exchangeName, routingKey);
 //发送消息
 rabbitMQUtils.convertAndSend(exchangeName, routingKey, payOrder);
 } catch (JsonProcessingException e) {
 e.printStackTrace();
 }
 System.out.println("通知已抵达");
 }

mq消息消费测试类

import com.rabbitmq.client.Channel;
import lombok.AllArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.PropertySource;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * 消息消费者
 *
 * @author jyswm
 * @date 2021/4/13 17:00
 */
@Component
@AllArgsConstructor
@PropertySource(value = "classpath:rabbit-listener.properties")
public class RabbitConsumer {

 /**
 * 监听接受消息
 */
 @RabbitListener(
 bindings = {@QueueBinding(
 value = @Queue(
 value = "${order.callback.queue}", durable = "true"),
 exchange = @Exchange(
 value = "${order.callback.queue.exchange}", type = ExchangeTypes.TOPIC)
 )},containerFactory = "singleListenerContainer")
 public void consumeTouristQueueCalendar(@Payload Message message,
 @Headers Map headers,
 Channel channel) {
 Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
 try {
 String val = new String(message.getBody());
 System.out.println("消费到的消息:"+val);
 channel.basicAck(deliveryTag, false);
 } catch (IOException e) {
 e.printStackTrace();
 }
 }

}

rabbit-listener.properties

#mq通道监听的配置文件
order.callback.queue.exchange=callback.exchange
order.callback.queue=callback.queue

application.yml

rabbitmq:
 host: 192.168.1.2
 port: 5672
 username: admin
 password: admin
声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:qvyue@qq.com 进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。