概念及理解
RabbitMQ的特点
可靠性:
RabbitMQ 使用如持久化、传输确认、发布确认等机制来保证可靠性。
高可用性:
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
多语言客户端:
RabbitMQ 几乎支持所有的常用语言,比如 Java、Python、Go、PHP、C#…。
灵活的路由:
消息进入队列之前先通过 Exchange 来路由消息。RabbitMQ 提供了一些内置的 Exchange 。针对更多复杂的路由功能,可以将多个 Exchange 绑定在一起,也可以通过插件机制实现自定义的 Exchange 。
消息集群:
多个 RabbitMQ 服务器可以组成一个集群
多种协议:
RabbitMQ 支持多种消息队列协议,比如AMQP(RabbitMQ最早开发就是为了支持AMQP)、 STOMP、MQTT
也包括HTTP
(HTTP当然不是一个消息协议。RabbitMQ可以通过以下三种方式来传输消息:
- 管理插件支持一个简单的HTTP API用于发送和接收消息。主要用于测试诊断的目的,但是针对少量的消息来说还是可靠的。
- Web-STOMP插件使得,在浏览器上可使用基于WebSockets、或者SockJS来控制消息。
- JSON-RPC插件使浏览器通过JSON-RPC和基于AMQP 0-9-1协议的消息进行通信。注意JSON RPC是一个同步的协议,基于异步传输的AMQP的一些功能将使用polling方式进行模拟。
)。
管理界面:
RabbitMQ 提供了一个易用的用户界面,使得用户可以直观的监控和管理exchange、queue等。
跟踪机制:
如果消息异常,RabbitMQ 提供了消息跟踪机制。
插件机制:
RabbitMQ 提供了许多插件,易于扩展,也可以编写自己的插件。
RabbitMQ的几个概念
Exchange:
交换器,管理所有队列。生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中或者丢弃(消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃)。
四种类型:Direct、Fanout、Topic、Headers
(direct类型的交换机,如图)

Queue:
数据队列,是RabbitMQ的内部对象,用于存储消息,正常情况下消息一直在队列里面,等待消费者连接到这个队列将其取走。
RoutingKey:
一个路由规则,如果你现在所有队列的RoutingKey都一样,那么就属于广播消息,而如果不一样,则属于点对点
Message:
消息,由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routingKey(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
Binding:
绑定,用于消息队列和交换器之间的关联。在绑定Exchange与Queue的同时,一般会指定一个bindingKey(指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中);消费者将消息发送给Exchange时,一般会指定一个routingKey;当bindingKey与routingKey相匹配时,消息将会被路由到对应的Queue中(如图)。

Connection:
网络连接,比如一个TCP连接。
Channel:
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Virtual Host:
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机可以理解为不同的用户空间,也就是说各个空间可以有自己的队列信息,有自己的操作用户。
简单应用实例(Springboot)
gradle依赖
compile('org.springframework.boot:spring-boot-starter-amqp')
配置类,加载及初始化mq链接
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Component
public class RabbitmqConfig {
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
//手动应答确认
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
//多消费者此处没用到
}
mq操作自定义工具类
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQUtils {
private static RabbitAdmin rabbitAdmin;
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 设置RabbitAdmin链接
*
* @return
*/
public RabbitAdmin getRabbitAdmin() {
if (rabbitAdmin == null) {
rabbitAdmin = new RabbitAdmin(connectionFactory);
}
return rabbitAdmin;
}
/**
* 初始化Queue基本信息(主要设置 queueName)
* @param queueName
* @return
*/
private Queue creatQueueByName(String queueName){
return QueueBuilder.durable(queueName).build();
}
/**
* 初始化Exchange 信息(主要设置 exchangeName)
* @param exchangeName
* @return
*/
private Exchange creatTopicExchange(String exchangeName){
return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();
}
/**
* 创建Queue以及Exchange,并且进行绑定;
* 用户注册时使用,用于绑定用户Appkey 信息;
* 绑定Queue: queueName 以及 Exchange:exchangeName ,并且设置 routingKey;
* @param queueName
* @param exchangeName
* @param routingKey
*/
public void bindingExchange(String queueName,String exchangeName, String routingKey){
Queue queue = this.creatQueueByName(queueName);
Exchange exchange = this.creatTopicExchange(exchangeName);
Binding binding = BindingBuilder
.bind(queue)
.to(exchange)
.with(routingKey)
.noargs();
this.getRabbitAdmin().declareQueue(queue);
this.getRabbitAdmin().declareExchange(exchange);
this.getRabbitAdmin().declareBinding(binding);
}
public void convertAndSend(String exchangeName, String routingKey, T entity) throws JsonProcessingException {
Message message = MessageBuilder
.withBody(objectMapper.writeValueAsBytes(entity))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
message.getMessageProperties()
.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,
MessageProperties.CONTENT_TYPE_JSON);
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}
}
mq发送消息测试类
@Test
public void testAddToMQ() {
String orderId = "12345";
PayOrder payOrder = new PayOrder();
payOrder.setOrderId(orderId);
String exchangeName = "callback.exchange";
String queueName = "callback.queue";
String routingKey = orderId;
try {
//创建Queue以及Exchange,并且进行绑定;
rabbitMQUtils.bindingExchange(queueName, exchangeName, routingKey);
//发送消息
rabbitMQUtils.convertAndSend(exchangeName, routingKey, payOrder);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
System.out.println("通知已抵达");
}
mq消息消费测试类
import com.rabbitmq.client.Channel;
import lombok.AllArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.PropertySource;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 消息消费者
*
* @author jyswm
* @date 2021/4/13 17:00
*/
@Component
@AllArgsConstructor
@PropertySource(value = "classpath:rabbit-listener.properties")
public class RabbitConsumer {
/**
* 监听接受消息
*/
@RabbitListener(
bindings = {@QueueBinding(
value = @Queue(
value = "${order.callback.queue}", durable = "true"),
exchange = @Exchange(
value = "${order.callback.queue.exchange}", type = ExchangeTypes.TOPIC)
)},containerFactory = "singleListenerContainer")
public void consumeTouristQueueCalendar(@Payload Message message,
@Headers Map headers,
Channel channel) {
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
try {
String val = new String(message.getBody());
System.out.println("消费到的消息:"+val);
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
rabbit-listener.properties
#mq通道监听的配置文件
order.callback.queue.exchange=callback.exchange
order.callback.queue=callback.queue
application.yml
rabbitmq:
host: 192.168.1.2
port: 5672
username: admin
password: admin