SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

时间:2021-7-3 作者:qvyue

代码github地址:https://github.com/tzb1017432592/springcloud-lean
SpringCloud高可用集群的搭建在我之前的博客已经写有,我这里启动了三个服务中心、三个服务网关、两个服务提供客户端、一个消费端、一个nginx服务,在此集群之上我们整合rabbitmq,这里我会启动一个mq的生产者端和mq消费者端

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

引入依赖

     org.springframework.bootspring-boot-starter-amqporg.springframework.bootspring-boot-starter-weborg.springframework.cloudspring-cloud-starter-netflix-eureka-clientorg.springframework.amqpspring-rabbit-testtest

yml配置,eureka-rabbitmq-producer是服务的生产者端,eureka-rabbitmq-consumer是服务的消费者端,

server:
  port: 8951

eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-rabbitmq-producer
  profiles: mq01
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: cloudtest

---
server:
  port: 8952

eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-rabbitmq-consumer
  profiles: mq02
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: cloudtest

配置好idea启动的参数,根据不同的参数启动不同的环境

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

代码创建服务生产者端的exchange、queue、binding、routingkey

@Configuration
public class RabbitmqConfig {
    public final static String EXCHANGE_TEST = "exchange_test";
    public final static String QUEUE_TEST = "queue_test";
    public final static String ROUTINGKEY_TEST = "cloudtest.*";

    @Bean(EXCHANGE_TEST)
    @Profile(value = "mq01")
    public Exchange exchange() {
        return ExchangeBuilder
                .topicExchange(EXCHANGE_TEST)
                .durable(true)
                .build();
    }

    @Bean(QUEUE_TEST)
    @Profile(value = "mq01")
    public Queue queue() {
        return new Queue(QUEUE_TEST);
    }

    @Bean
    @Profile(value = "mq01")
    public Binding binding(
            @Qualifier(EXCHANGE_TEST) Exchange exchange
            , @Qualifier(QUEUE_TEST) Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTINGKEY_TEST)
                .noargs();
    }
}

发送消息

@RestController
@RequestMapping("/rabbitmq/producer/")
@Profile(value = "mq01")
public class TestController {

    public final static String ROUTING_KEY1 = "cloudtest.test1";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("producer1/{message}")
    public String producer1(@PathVariable("message") String message) {
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TEST, ROUTING_KEY1, message);
        return "success";
    }
}

服务消费者端监听队列

@Component
@Profile("mq02")
@Slf4j
public class RabbitmqConsumer {

    @RabbitListener(queues = RabbitmqConfig.QUEUE_TEST)
    public void Consumer1(String payload, Message message){
        log.info("payload:【{}】,message:【{}】",payload,message.getMessageProperties().toString());
    }
}

配置好网关

server:
  port: 8851

eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-gateway
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
        - id: eureka-client01
          uri: lb://eureka-client01
          predicates:
            - Path=/client01/**

        - id: eureka-consumer
          uri: lb://eureka-consumer
          predicates:
            - Path=/consumer/**

        - id: eureka-rabbitmq-producer
          uri: lb://eureka-rabbitmq-producer
          predicates:
            - Path=/rabbitmq/producer/**

        - id: eureka-rabbitmq-consumer
          uri: lb://eureka-rabbitmq-consumer
          predicates:
            - Path=/rabbitmq/**

  profiles: gw01

启动服务,服务已经启动好了

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

发送消息

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战
SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

延迟队列

延迟队列有许多应用场景如:自动收货、自动取消订单、自动发布文章等,实现延迟队列需要下载插件
下载地址:https://www.rabbitmq.com/community-plugins.html

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

将下载好的插件放到rabbitmq安装目录的plugins目录中执行命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

代码

@Configuration
public class RabbitmqDelayConfig {
    public final static String DELAY_EXCHANGE_TEST = "delay_exchange_test";
    public final static String DELAY_QUEUE_TEST = "delay_queue_test";
    public final static String ROUTINGKEY_DELAY = "delay_cloudtest.*";

    @Bean(DELAY_EXCHANGE_TEST)
    @Profile(value = "mq01")
    public Exchange exchange(){
        return ExchangeBuilder
                .topicExchange(DELAY_EXCHANGE_TEST)
                .delayed()          // 开启支持延迟消息
                .durable(true)
                .build();
    }

    // 创建队列
    @Bean(DELAY_QUEUE_TEST)
    @Profile(value = "mq01")
    public Queue queue(){
        return new Queue(DELAY_QUEUE_TEST);
    }

    // 队列绑定交换机
    @Bean
    @Profile(value = "mq01")
    public Binding delayBinding(
            @Qualifier(DELAY_QUEUE_TEST) Queue queue,
            @Qualifier(DELAY_EXCHANGE_TEST) Exchange exchange){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTINGKEY_DELAY)
                .noargs();      
    }
}

发送延迟消息

@RestController
@RequestMapping("/rabbitmq/producer")
@Profile("mq01")
@Slf4j
public class TestController {

    public final static String ROUTING_KEY1 = "cloudtest.test1";
    public final static String DELAY_ROUTING_KEY1 = "delay_cloudtest.test1";

    @Resource
    private RabbitTemplate rabbitTemplate;


    @GetMapping("delayproducer1/{message}")
    public String delayproducer1(@PathVariable("message") String message) {
        rabbitTemplate.convertAndSend(
                RabbitmqDelayConfig.DELAY_EXCHANGE_TEST
                , DELAY_ROUTING_KEY1
                , message
                , m -> {
                    // 设置消息的持久
                    m.getMessageProperties()
                            .setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    // 设置消息延迟的时间,单位ms毫秒
                    m.getMessageProperties()
                            .setDelay(5000);
                    return m;
                });
        log.info("发送延迟消息成功【{}】",new Date());
        return "success";
    }
}

接收延迟消息

@Component
@Profile("mq02")
@Slf4j
public class RabbitmqConsumer {
    @RabbitListener(queues = RabbitmqDelayConfig.DELAY_QUEUE_TEST)
    public void Consumer2(String payload, Message message){
        log.info("payload:【{}】,message:【{}】",payload,message.getMessageProperties().toString());
        log.info("接收延迟消息成功【{}】",new Date());
    }
}

访问延迟队列发送接口

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

我们代码中设置的延迟队列是5秒,时间正好相差5秒

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战
SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

SpringStream

引入依赖

        org.springframework.cloudspring-cloud-starter-stream-rabbit
生产者端

生产者端 yml

server:
  port: 8951

eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-rabbitmq-producer
  profiles: mq01
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: cloudtest
  cloud:
    stream:
      bindings:
        #定义生产者通道
        myOutput:
          #定义交换机名称
          destination:  stream_exchange

代码
定义生产者通道

@Component
@Profile("mq01")
public interface OutputStreamChannel {

    String OUTPUT="myOutput";

    @Output(OUTPUT)
    MessageChannel output();

}

定义生产者生产消息逻辑

@Service
@EnableBinding(OutputStreamChannel.class)
@Profile("mq01")
@Slf4j
public class ProducerStreamServiceImpl implements ProducerStreamService {

    @Autowired
    private OutputStreamChannel outputStreamChannel;

    @Override
    public boolean rabbitMQSend(String msg) {
        return outputStreamChannel
                .output()
                .send(MessageBuilder.withPayload(msg).build());
    }
}

生产者控制类

@RestController
@RequestMapping("/rabbitmq/producer")
@Profile("mq01")
@Slf4j
public class TestController {
    @Resource
    private ProducerStreamService producerStreamService;

    @GetMapping("streamproducer1/{message}")
    public String streamproducer1(@PathVariable("message") String message) {
        return producerStreamService.rabbitMQSend(message)?"success":"fail";
    }

}
消费者端

yml

server:
  port: 8952

eureka:
  client:
    service-url:
      defaultZone: http://eureka-cluster01:8761/eureka/,http://eureka-cluster02:8762/eureka/
spring:
  application:
    name: eureka-rabbitmq-consumer
  profiles: mq02
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: cloudtest
  cloud:
    stream:
      bindings:
        #定义消费者通道
        myInput:
          #定义交换机名称
          destination:  stream_exchange

消费者通道

@Component
@Profile("mq02")
public interface InputStreamChannel {
    String INPUT="myInput";

    @Input(INPUT)
    SubscribableChannel input();
}

定义消费者消费消息逻辑

@Service
@EnableBinding(InputStreamChannel.class)
@Profile("mq02")
@Slf4j
public class ConsumerStreamServiceImpl implements ConsumerStreamService {

    @Override
    @StreamListener(InputStreamChannel.INPUT)
    public void rabbitMQreceive(String msg) {
        log.info("stream消费到的消息:【{}】",msg);
    }
}

启动项目发送消息

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战

消息成功发送,并且已经被消费到

SpringCloud集群整合Rabbitmq、延迟队列、Stream实战
声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:qvyue@qq.com 进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。