+ *
+ * 此处为模版类定义 Jackson消息转换器 + * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack + * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack + * + * @return the amqp template + */ + // @Primary + @Bean + public AmqpTemplate amqpTemplate() { + Logger log = LoggerFactory.getLogger(RabbitTemplate.class); + // 使用jackson 消息转换器 + rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); + rabbitTemplate.setEncoding("UTF-8"); + // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true + rabbitTemplate.setMandatory(true); + rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { + String correlationId = message.getMessageProperties().getCorrelationIdString(); + log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); + }); + // 消息确认,yml需要配置 publisher-confirms: true + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { + if (ack) { + log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); + } else { + log.debug("消息发送到exchange失败,原因: {}", cause); + } + }); + return rabbitTemplate; + } + + /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */ + + /** + * 声明Direct交换机 支持持久化. + * + * @return the exchange + */ + @Bean("directExchange") + public Exchange directExchange() { + return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); + } + + /** + * 声明一个队列 支持持久化. + * + * @return the queue + */ + @Bean("directQueue") + public Queue directQueue() { + return QueueBuilder.durable("DIRECT_QUEUE").build(); + } + + /** + * 通过绑定键 将指定队列绑定到一个指定的交换机 . + * + * @param queue the queue + * @param exchange the exchange + * @return the binding + */ + @Bean + public Binding directBinding(@Qualifier("directQueue") Queue queue, + @Qualifier("directExchange") Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); + } + + /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */ + + /** + * 声明 fanout 交换机. + * + * @return the exchange + */ + @Bean("fanoutExchange") + public FanoutExchange fanoutExchange() { + return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build(); + } + + /** + * Fanout queue A. + * + * @return the queue + */ + @Bean("fanoutQueueA") + public Queue fanoutQueueA() { + return QueueBuilder.durable("FANOUT_QUEUE_A").build(); + } + + /** + * Fanout queue B . + * + * @return the queue + */ + @Bean("fanoutQueueB") + public Queue fanoutQueueB() { + return QueueBuilder.durable("FANOUT_QUEUE_B").build(); + } + + /** + * 绑定队列A 到Fanout 交换机. + * + * @param queue the queue + * @param fanoutExchange the fanout exchange + * @return the binding + */ + @Bean + public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, + @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { + return BindingBuilder.bind(queue).to(fanoutExchange); + } + + /** + * 绑定队列B 到Fanout 交换机. + * + * @param queue the queue + * @param fanoutExchange the fanout exchange + * @return the binding + */ + @Bean + public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, + @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { + return BindingBuilder.bind(queue).to(fanoutExchange); + } +} diff --git a/springboot-rabbitmq/src/main/java/com/xncoding/pos/mq/Receiver.java b/springboot-rabbitmq/src/main/java/com/xncoding/pos/mq/Receiver.java new file mode 100644 index 0000000..5e3ed28 --- /dev/null +++ b/springboot-rabbitmq/src/main/java/com/xncoding/pos/mq/Receiver.java @@ -0,0 +1,61 @@ +package com.xncoding.pos.mq; + +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * 消息监听器 + * + * @author XiongNeng + * @version 1.0 + * @since 2018/3/1 + */ +@Component +public class Receiver { + private static final Logger log = LoggerFactory.getLogger(Receiver.class); + + /** + * FANOUT广播队列监听一. + * + * @param message the message + * @param channel the channel + * @throws IOException the io exception 这里异常需要处理 + */ + @RabbitListener(queues = {"FANOUT_QUEUE_A"}) + public void on(Message message, Channel channel) throws IOException { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + log.debug("FANOUT_QUEUE_A " + new String(message.getBody())); + } + + /** + * FANOUT广播队列监听二. + * + * @param message the message + * @param channel the channel + * @throws IOException the io exception 这里异常需要处理 + */ + @RabbitListener(queues = {"FANOUT_QUEUE_B"}) + public void t(Message message, Channel channel) throws IOException { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + log.debug("FANOUT_QUEUE_B " + new String(message.getBody())); + } + + /** + * DIRECT模式. + * + * @param message the message + * @param channel the channel + * @throws IOException the io exception 这里异常需要处理 + */ + @RabbitListener(queues = {"DIRECT_QUEUE"}) + public void message(Message message, Channel channel) throws IOException { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + log.debug("DIRECT " + new String(message.getBody())); + } +} diff --git a/springboot-rabbitmq/src/main/java/com/xncoding/pos/service/SenderService.java b/springboot-rabbitmq/src/main/java/com/xncoding/pos/service/SenderService.java new file mode 100644 index 0000000..c2f5461 --- /dev/null +++ b/springboot-rabbitmq/src/main/java/com/xncoding/pos/service/SenderService.java @@ -0,0 +1,43 @@ +package com.xncoding.pos.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.support.CorrelationData; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.UUID; + +/** + * 消息发送服务 + */ +@Service +public class SenderService { + private Logger logger = LoggerFactory.getLogger(this.getClass()); + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 测试广播模式. + * + * @param p the p + * @return the response entity + */ + public void broadcast(String p) { + CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); + rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData); + } + + /** + * 测试Direct模式. + * + * @param p the p + * @return the response entity + */ + public void direct(String p) { + CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); + rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData); + } + +} diff --git a/springboot-rabbitmq/src/main/resources/application.yml b/springboot-rabbitmq/src/main/resources/application.yml new file mode 100644 index 0000000..6ac166a --- /dev/null +++ b/springboot-rabbitmq/src/main/resources/application.yml @@ -0,0 +1,38 @@ +########################################################## +################## 所有profile共有的配置 ################# +########################################################## + +################### spring配置 ################### +spring: + profiles: + active: dev + +--- + +##################################################################### +######################## 开发环境profile ########################## +##################################################################### +spring: + profiles: dev + rabbitmq: + host: 123.207.66.156 + port: 5672 + username: spring + password: 123456 + publisher-confirms: true #支持发布确认 + publisher-returns: true #支持发布返回 + listener: + simple: + acknowledge-mode: manual #采用手动应答 + concurrency: 1 #指定最小的消费者数量 + max-concurrency: 1 #指定最大的消费者数量 + retry: + enabled: true #是否支持重试 + +logging: + level: + ROOT: INFO + com: + xncoding: DEBUG + file: E:/logs/app.log + diff --git a/springboot-rabbitmq/src/test/java/com/xncoding/service/SenderServiceTest.java b/springboot-rabbitmq/src/test/java/com/xncoding/service/SenderServiceTest.java new file mode 100644 index 0000000..5a25ff8 --- /dev/null +++ b/springboot-rabbitmq/src/test/java/com/xncoding/service/SenderServiceTest.java @@ -0,0 +1,35 @@ +package com.xncoding.service; + +import com.xncoding.pos.Application; +import com.xncoding.pos.service.SenderService; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +/** + * UserServiceTest + * + * @author XiongNeng + * @version 1.0 + * @since 2018/2/2 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = Application.class) +public class SenderServiceTest { + @Autowired + private SenderService senderService; + + @Test + public void testCache() { + // 测试广播模式 + senderService.broadcast("同学们集合啦!"); + // 测试Direct模式 + senderService.direct("定点消息"); + } +}