使用方式
直接使用AMQP客户端
Maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
String meassage = "hello world";
channel.basicPublish("", "hello", null, message.getBytes());
channel.close();
connection.close();
消费者
// Consumer: subscribes to the "hello" queue and prints every message body it receives.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(
        "hello",
        true, // autoAck
        // Delivery callback: print the body as text.
        (tag, delivery) -> System.out.println(new String(delivery.getBody())),
        // Cancel callback: report that the consumer was cancelled.
        tag -> System.out.println("被取消"));
Spring整合
Maven依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
生产者
<!-- Connection to the broker at 172.16.98.133:5672, virtual host "/". -->
<rabbit:connection-factory
    id="connectionFactory"
    host="172.16.98.133"
    port="5672"
    username="admin"
    password="123"
    virtual-host="/"/>

<!-- Admin component attached to the connection factory above. -->
<rabbit:admin connection-factory="connectionFactory"/>

<!-- Queue "queue"; both exchanges below bind to it. -->
<rabbit:queue id="queue" name="queue" auto-declare="true"/>

<!-- Fanout exchange with a single binding to "queue". -->
<rabbit:fanout-exchange id="fanoutExchange" name="fanoutExchange" auto-declare="true">
    <rabbit:bindings>
        <rabbit:binding queue="queue"/>
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Topic exchange bound to "queue" with the pattern "lazy.*". -->
<rabbit:topic-exchange id="topicExchange" name="topicExchange" auto-declare="true">
    <rabbit:bindings>
        <rabbit:binding pattern="lazy.*" queue="queue"/>
    </rabbit:bindings>
</rabbit:topic-exchange>
rabbitTemplate.convertAndSend("queue", "hello");
rabbitTemplate.convertAndSend("fanoutExchange", "", "hello");
rabbitTemplate.convertAndSend("topicExchange", "lazy.xxx", "hello");
消费者
<!-- Listener bean that handles the messages. -->
<bean id="springQueueListener" class="com.oliverclio.SpringQueueListener"/>

<!-- Container that attaches the listener bean to the queue named "queue". -->
<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="springQueueListener" queue-names="queue"/>
</rabbit:listener-container>
/**
 * Message listener that prints the body of every message it receives.
 */
public class SpringQueueListener implements MessageListener {

    /**
     * Prints the body of the received message to standard output.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        byte[] payload = message.getBody();
        String text = new String(payload);
        System.out.println(text);
    }
}
SpringBoot整合
Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
# RabbitMQ connection and listener settings for Spring Boot.
spring:
  rabbitmq:
    host: 172.16.98.133
    port: 5672
    username: admin
    password: 123
    virtual-host: /
    listener:
      simple:
        # Listener code acknowledges messages itself (see channel.basicAck in the consumer).
        acknowledge-mode: manual
生产者
/**
 * Defines the durable topic exchange named "topicExchange".
 *
 * @return the exchange bean
 */
@Bean
public Exchange exchange() {
    Exchange topicExchange = ExchangeBuilder
            .topicExchange("topicExchange")
            .durable(true)
            .build();
    return topicExchange;
}
/**
 * Defines the durable queue named "queue".
 *
 * @return the queue bean
 */
@Bean
public Queue queue() {
    Queue durableQueue = QueueBuilder
            .durable("queue")
            .build();
    return durableQueue;
}
/**
 * Binds the "queue" bean to the "exchange" bean with the routing pattern "lazy.#".
 *
 * @param queue    the bean qualified as "queue"
 * @param exchange the bean qualified as "exchange"
 * @return the binding bean
 */
@Bean
public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") Exchange exchange) {
    Binding lazyBinding = BindingBuilder
            .bind(queue)
            .to(exchange)
            .with("lazy.#")
            .noargs();
    return lazyBinding;
}
消费者
@Component
public class RabbitListener {
@RabbitListener(queues="queue")
public void listenerQueue(Message message) {
// 获取消息体
String body = new String(message.getBody());
// 获取delivery tag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
}
@SneakyThrows
@RabbitListener(queues = "canalQueue")
public void movieItemsConsumer(@Payload String body,
@Header(value = AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) {
System.out.println(body);
System.out.println(deliveryTag);
// 消费者手动确认
channel.basicAck(deliveryTag, true);
}
}
工作模式
普通模式
使用默认交换机,以队列名称为路由键。一个队列对应一个生产者和一个消费者
生产者
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
String meassage = "hello world";
channel.basicPublish("", "hello", null, message.getBytes());
channel.close();
connection.close();
消费者
// Simple mode consumer: reads from the "hello" queue with auto-ack and prints each body.
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.200.129");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(
        "hello",
        true, // autoAck
        // Delivery callback: print the body as text.
        (tag, delivery) -> System.out.println(new String(delivery.getBody())),
        // Cancel callback: report that the consumer was cancelled.
        tag -> System.out.println("被取消"));
工作队列模式
与普通模式相似,不同之处是一个队列对应多个消费者,一个消息只能被处理一次,多个消费者默认轮询消费消息
订阅模式
使用fanout类型的交换机,与该交换机绑定的队列全部都接收到信息
生产者
// Publish/subscribe: every queue bound to the fanout exchange receives the message.
// Arguments: exchange name, type, durable, autoDelete, internal, extra arguments.
channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT, true, false, false, null);
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
// Both queues are bound with an empty routing key.
channel.queueBind("queue1", "fanoutExchange", "");
channel.queueBind("queue2", "fanoutExchange", "");
String message = "hello";
channel.basicPublish("fanoutExchange", "", null, message.getBytes());
channel.close();
connection.close();
路由模式
使用direct类型的交换机,根据路由键路由到对应的队列,路由键需精确匹配
生产者
// Routing mode: a direct exchange delivers to the queues whose binding key equals the routing key.
// Arguments: exchange name, type, durable, autoDelete, internal, extra arguments.
channel.exchangeDeclare("directExchange", BuiltinExchangeType.DIRECT, true, false, false, null);
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
// queue1 is bound with "orange"; queue2 is bound with "black" and "green".
channel.queueBind("queue1", "directExchange", "orange");
channel.queueBind("queue2", "directExchange", "black");
channel.queueBind("queue2", "directExchange", "green");
String message = "hello";
// Published with routing key "orange", the binding key of queue1.
channel.basicPublish("directExchange", "orange", null, message.getBytes());
channel.close();
connection.close();
通配符模式
使用topic类型的交换机,根据路由键路由到对应的队列,路由键可使用通配符匹配
* 精确匹配一个单词
# 匹配0个或多个单词
生产者
// Topic mode: binding keys may contain the wildcards "*" and "#".
// Arguments: exchange name, type, durable, autoDelete, internal, extra arguments.
channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC, true, false, false, null);
channel.queueDeclare("queue1", true, false, false, null);
channel.queueDeclare("queue2", true, false, false, null);
channel.queueBind("queue1", "topicExchange", "*.orange.*");
channel.queueBind("queue2", "topicExchange", "*.*.rabbit");
channel.queueBind("queue2", "topicExchange", "lazy.#");
String message = "hello";
// Routing key "lazy.xxx" corresponds to the "lazy.#" binding of queue2.
channel.basicPublish("topicExchange", "lazy.xxx", null, message.getBytes());
channel.close();
connection.close();
消息可靠投递
生产者回调
<!-- Connection factory with publisher confirms and publisher returns switched on. -->
<rabbit:connection-factory
    id="connectionFactory"
    host="172.16.98.133"
    port="5672"
    username="admin"
    password="123"
    virtual-host="/"
    publisher-confirms="true"
    publisher-returns="true"/>
正常回调,ack 表示交换机是否成功接收到消息
// Confirm callback; the handler body is left empty in this example.
rabbitTemplate.setConfirmCallback((correlation, acked, reason) -> {
    // Intentionally empty.
});
交换机无法投递到队列时的回调
// Mandatory flag is switched on before the return callback is registered.
rabbitTemplate.setMandatory(true);
// Return callback; the handler body is left empty in this example.
rabbitTemplate.setReturnCallback((returned, code, text, exchangeName, key) -> {
    // Intentionally empty.
});
消费者确认
<!-- Listener bean that handles the messages. -->
<bean id="springQueueListener" class="com.oliverclio.SpringQueueListener"/>

<!-- Container with manual acknowledgement: the listener acks or nacks each message itself. -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
    <rabbit:listener ref="springQueueListener" queue-names="queue"/>
</rabbit:listener-container>
/**
 * Listener that acknowledges each message manually on the channel it is given.
 */
public class SpringQueueListener implements ChannelAwareMessageListener {

    /**
     * Prints the message body, runs the business logic, and then acks the message;
     * if anything in that step fails the message is nacked and requeued.
     *
     * @param message the delivered message
     * @param channel the channel used to ack or nack the delivery
     * @throws Exception if the negative acknowledgement itself fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println(new String(message.getBody()));
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            handleBusiness();
            // Second argument is the "multiple" flag.
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // Arguments: delivery tag, multiple, requeue.
            // basicNack throws a checked IOException, hence the throws clause on this method.
            channel.basicNack(deliveryTag, true, true);
        }
    }
}
消费者限流
消费者拉取数控制
前面拉取的消息被确认后,再继续后面的拉取
<!-- Listener bean that handles the messages. -->
<bean id="springQueueListener" class="com.oliverclio.SpringQueueListener"/>

<!-- Manual acknowledgement combined with a prefetch of 1. -->
<rabbit:listener-container
    connection-factory="connectionFactory"
    acknowledge="manual"
    prefetch="1">
    <rabbit:listener ref="springQueueListener" queue-names="queue"/>
</rabbit:listener-container>
消息存活时间
队列统一设置
<!-- Queue-wide message TTL set through the x-message-ttl argument (value 10000). -->
<rabbit:queue id="queue" name="queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
    </rabbit:queue-arguments>
</rabbit:queue>
消息单一设置
// Per-message TTL: the post-processor sets the expiration property before the message is sent.
rabbitTemplate.convertAndSend("exchange", "ttl.test", "hello", message -> {
    message.getMessageProperties().setExpiration("5000");
    // A MessagePostProcessor must return the message that is to be sent.
    return message;
});
死信队列
消息成为死信
- 队列已满,信息被丢弃
- 消费者否定确认,且不重新进入原有队列,即 requeue = false
- 消息在队列中过期
队列绑定死信交换机
<!-- Queue whose dead letters go to "dlxExchange" with routing key "dlx.test". -->
<rabbit:queue id="queue" name="queue" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="dlxExchange"/>
        <entry key="x-dead-letter-routing-key" value="dlx.test"/>
    </rabbit:queue-arguments>
</rabbit:queue>
延迟队列
TTL + 死信队列实现,消息发送到一个无消费者的队列,经过TTL定义的时间长度后,进入死信队列,然后被消费
持久化
队列持久化
// Queue persistence: the second argument (durable) is true here.
channel.queueDeclare("hello", true, false, false, null);
消息持久化
// Message persistence: publish with the PERSISTENT_TEXT_PLAIN message properties.
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
PREVIOUS: Spring Boot项目起步配置
NEXT: classpath相关的几点总结