RabbitMQ基本使用

使用方式

直接使用AMQP客户端

Maven依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.8.0</version>
</dependency>

生产者

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
String meassage = "hello world";
channel.basicPublish("", "hello", null, message.getBytes());
channel.close();
connection.close();

消费者

// Consumer: connect to the broker and subscribe to the "hello" queue.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Second argument true = automatic acknowledgement. The first lambda handles each
// delivery; the second runs when the consumer is cancelled.
channel.basicConsume("hello", true, (consumerTag, message) -> {
    // Print the message body, decoded with the platform default charset.
    System.out.println(new String(message.getBody()));
}, consumerTag -> {
    System.out.println("被取消");
});

Spring整合

Maven依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.8.RELEASE</version>
</dependency>

生产者

<!-- Connection to the RabbitMQ broker. -->
<rabbit:connection-factory id="connectionFactory" host="172.16.98.133" port="5672" username="admin" password="123" virtual-host="/"/>
<!-- RabbitAdmin declares the queue, exchanges and bindings below on the broker. -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- Template bean backing the rabbitTemplate.convertAndSend(...) calls; plain Spring XML does not create one automatically. -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<rabbit:queue id="queue" name="queue" auto-declare="true"/>
<!-- Fanout exchange bound to "queue" without a routing pattern. -->
<rabbit:fanout-exchange id="fanoutExchange" name="fanoutExchange" auto-declare="true">
    <rabbit:bindings>
        <rabbit:binding queue="queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>
<!-- Topic exchange bound to "queue" with the pattern "lazy.*". -->
<rabbit:topic-exchange id="topicExchange" name="topicExchange" auto-declare="true">
    <rabbit:bindings>
        <rabbit:binding pattern="lazy.*" queue="queue"/>
    </rabbit:bindings>
</rabbit:topic-exchange>
// Two-argument form: "queue" is the routing key; no exchange is named, so the template's default exchange is used.
rabbitTemplate.convertAndSend("queue", "hello");
// Publish to the fanout exchange with an empty routing key.
rabbitTemplate.convertAndSend("fanoutExchange", "", "hello");
// Publish to the topic exchange; "lazy.xxx" matches the "lazy.*" binding pattern.
rabbitTemplate.convertAndSend("topicExchange", "lazy.xxx", "hello");

消费者

<!-- Listener bean that receives the deliveries. -->
<bean id="springQueueListener" class="com.oliverclio.SpringQueueListener"/>

<!-- Container that subscribes springQueueListener to the queue named "queue". -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="springQueueListener" queue-names="queue"/>
</rabbit:listener-container>
/**
 * Message listener that prints the body of every message it receives.
 */
public class SpringQueueListener implements MessageListener {
    /**
     * Decodes the message body with the platform default charset and writes it to standard output.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        byte[] payload = message.getBody();
        String text = new String(payload);
        System.out.println(text);
    }
}

SpringBoot整合

Maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

# Spring Boot connection settings for the RabbitMQ broker.
spring:
  rabbitmq:
    host: 172.16.98.133
    port: 5672
    username: admin
    password: 123
    virtual-host: /
    listener:
      simple:
        # Manual mode: listener code acknowledges deliveries itself.
        acknowledge-mode: manual

生产者

/**
 * Declares the durable topic exchange named "topicExchange".
 *
 * @return the exchange definition
 */
@Bean
public Exchange exchange() {
    ExchangeBuilder topicBuilder = ExchangeBuilder.topicExchange("topicExchange").durable(true);
    return topicBuilder.build();
}

/**
 * Declares the durable queue named "queue".
 *
 * @return the queue definition
 */
@Bean
public Queue queue() {
    QueueBuilder durableBuilder = QueueBuilder.durable("queue");
    return durableBuilder.build();
}

/**
 * Binds the queue to the exchange with the routing pattern "lazy.#".
 *
 * @param queue    the bean named "queue"
 * @param exchange the bean named "exchange"
 * @return the binding definition, without extra arguments
 */
@Bean
public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") Exchange exchange) {
    return BindingBuilder
            .bind(queue)
            .to(exchange)
            .with("lazy.#")
            .noargs();
}

消费者

@Component
public class RabbitListener {
    @RabbitListener(queues="queue")
    public void listenerQueue(Message message) {
        // 获取消息体
        String body = new String(message.getBody());
        // 获取delivery tag
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
    }

    @SneakyThrows  
    @RabbitListener(queues = "canalQueue")  
    public void movieItemsConsumer(@Payload String body,
        @Header(value = AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {  
        System.out.println(body);  
        System.out.println(deliveryTag);  
        // 消费者手动确认
        channel.basicAck(deliveryTag, true);  
    }
}

工作模式

普通模式

使用默认交换机,以队列名称为路由键。一个队列对应一个生产者和一个消费者

生产者

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
String meassage = "hello world";
channel.basicPublish("", "hello", null, message.getBytes());
channel.close();
connection.close();

消费者

// Simple mode consumer: connect to the broker and subscribe to the "hello" queue.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Second argument true = automatic acknowledgement. The first lambda handles each
// delivery; the second runs when the consumer is cancelled.
channel.basicConsume("hello", true, (consumerTag, message) -> {
    // Print the message body, decoded with the platform default charset.
    System.out.println(new String(message.getBody()));
}, consumerTag -> {
    System.out.println("被取消");
});

工作队列模式

与普通模式相似,不同之处是一个队列对应多个消费者,一个消息只能被处理一次,多个消费者默认轮询消费消息

订阅模式

使用fanout类型的交换机,与该交换机绑定的所有队列都会接收到消息

生产者

// Publish/subscribe producer: every queue bound to the fanout exchange receives the message.
// Arguments after the exchange type: durable, autoDelete, internal, extra arguments.
channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT, true, false, false, null);
// Durable, non-exclusive, non-auto-delete queues.
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
// Both queues are bound with an empty routing key.
channel.queueBind("queue1", "fanoutExchange", "");
channel.queueBind("queue2", "fanoutExchange", "");
String message = "hello";
channel.basicPublish("fanoutExchange", "", null, message.getBytes());
channel.close();
connection.close();

路由模式

使用direct类型的交换机,根据路由键路由到对应的队列,路由键需精确匹配

生产者

// Routing producer: the direct exchange delivers to queues whose binding key equals the routing key.
// Arguments after the exchange type: durable, autoDelete, internal, extra arguments.
channel.exchangeDeclare("directExchange", BuiltinExchangeType.DIRECT, true, false, false, null);
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
// queue1 is bound with "orange"; queue2 is bound with "black" and "green".
channel.queueBind("queue1", "directExchange", "orange");
channel.queueBind("queue2", "directExchange", "black");
channel.queueBind("queue2", "directExchange", "green");
String message = "hello";
// Routing key "orange" equals only queue1's binding key.
channel.basicPublish("directExchange", "orange", null, message.getBytes());
channel.close();
connection.close();

通配符模式

使用topic类型的交换机,根据路由键路由到对应的队列,路由键可使用通配符匹配

  • * 精确匹配一个单词
  • # 匹配0个或多个单词

生产者

// Topic producer: the topic exchange matches the routing key against wildcard binding patterns.
// Arguments after the exchange type: durable, autoDelete, internal, extra arguments.
channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC, true, false, false, null);
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
// "*" stands for exactly one word, "#" for zero or more words.
channel.queueBind("queue1", "topicExchange", "*.orange.*");
channel.queueBind("queue2", "topicExchange", "*.*.rabbit");
channel.queueBind("queue2", "topicExchange", "lazy.#");
String message = "hello";
// Routing key "lazy.xxx" matches the "lazy.#" pattern bound to queue2.
channel.basicPublish("topicExchange", "lazy.xxx", null, message.getBytes());
channel.close();
connection.close();

消息可靠投递

生产者回调

<!-- publisher-confirms and publisher-returns switch on the producer-side confirm and return callbacks. -->
<rabbit:connection-factory id="connectionFactory" host="172.16.98.133" port="5672" username="admin" password="123" virtual-host="/" publisher-confirms="true" publisher-returns="true" />

正常回调,ack表示交换机是否成功接收到消息

// Confirm callback: ack reports whether the exchange received the message. The body is left empty in this example.
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    
});

交换机无法投递到队列时的回调

// Enable the mandatory flag before registering the return callback.
rabbitTemplate.setMandatory(true);
// Return callback: invoked when the exchange cannot route the message to a queue. The body is left empty in this example.
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

});

消费者确认

<!-- Listener bean that receives the deliveries. -->
<bean id="springQueueListener" class="com.oliverclio.SpringQueueListener"/>

<!-- acknowledge="manual": the listener acknowledges or rejects each delivery itself. -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    <rabbit:listener ref="springQueueListener" queue-names="queue"/>
</rabbit:listener-container>
/**
 * Channel-aware listener that acknowledges a delivery after the business logic succeeds
 * and negatively acknowledges it, with requeue, when the business logic throws.
 */
public class SpringQueueListener implements ChannelAwareMessageListener {
    /**
     * Prints the message body, runs the business logic and confirms the outcome on the channel.
     *
     * @param message the delivered message
     * @param channel the channel the delivery arrived on, used for ack/nack
     * @throws Exception if the negative acknowledgement itself fails; {@code basicNack} throws a
     *         checked exception that would otherwise escape the catch block undeclared
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println(new String(message.getBody()));
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            handleBusiness();
            // Second argument is the "multiple" flag.
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // Arguments after the tag: multiple, requeue.
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

消费者限流

消费者拉取数控制

前面拉取的消息被确认后,再继续后面的拉取

<!-- Listener bean that receives the deliveries. -->
<bean id="springQueueListener" class="com.oliverclio.SpringQueueListener"/>

<!-- Manual acknowledgement with prefetch 1: the next message is fetched only after the previous one is acknowledged. -->
<rabbit:listener-container
        connection-factory="connectionFactory"
        acknowledge="manual"
        prefetch="1">
    <rabbit:listener ref="springQueueListener" queue-names="queue"/>
</rabbit:listener-container>

消息存活时间

队列统一设置

<!-- Queue-wide message TTL: x-message-ttl is set to the integer 10000 for every message in this queue. -->
<rabbit:queue id="queue" name="queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
    </rabbit:queue-arguments>
</rabbit:queue>

消息单一设置

// Per-message TTL: the post-processor sets the expiration on this one message before it is sent.
rabbitTemplate.convertAndSend("exchange", "ttl.test", "hello", message -> {
    message.getMessageProperties().setExpiration("5000");
    // A MessagePostProcessor must return the message that will be sent.
    return message;
});

死信队列

消息成为死信

  • 队列已满,消息被丢弃
  • 消费者否定确认,且不重新进入原有队列,即requeue = false
  • 消息在队列中过期

队列绑定死信交换机

<!-- Dead letters from this queue go to the exchange "dlxExchange" with the routing key "dlx.test". -->
<rabbit:queue id="queue" name="queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dlxExchange"></entry>
        <entry key="x-dead-letter-routing-key" value="dlx.test"></entry>
    </rabbit:queue-arguments>
</rabbit:queue>

延迟队列

TTL + 死信队列实现,消息发送到一个无消费者的队列,经过TTL定义的时间长度后,进入死信队列,然后被消费

持久化

队列持久化

// Second argument true declares the queue as durable.
channel.queueDeclare("hello", true, false, false, null);

消息持久化

// MessageProperties.PERSISTENT_TEXT_PLAIN marks the published message as persistent.
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());