From dab9c8968f08c54d9e738c2ce33334ba56a5ba3d Mon Sep 17 00:00:00 2001 From: yidao620 Date: Thu, 1 Mar 2018 10:59:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BB=A3=E7=A0=81=E7=A4=BA?= =?UTF-8?q?=E4=BE=8Bspringboot-rabbitmq?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 +- springboot-rabbitmq/.gitignore | 14 ++ springboot-rabbitmq/LICENSE | 20 +++ springboot-rabbitmq/README.md | 11 ++ springboot-rabbitmq/pom.xml | 87 ++++++++++ springboot-rabbitmq/run.sh | 72 +++++++++ .../java/com/xncoding/pos/Application.java | 12 ++ .../com/xncoding/pos/config/RabbitConfig.java | 152 ++++++++++++++++++ .../java/com/xncoding/pos/mq/Receiver.java | 61 +++++++ .../xncoding/pos/service/SenderService.java | 43 +++++ .../src/main/resources/application.yml | 38 +++++ .../xncoding/service/SenderServiceTest.java | 35 ++++ 12 files changed, 547 insertions(+), 2 deletions(-) create mode 100644 springboot-rabbitmq/.gitignore create mode 100644 springboot-rabbitmq/LICENSE create mode 100644 springboot-rabbitmq/README.md create mode 100644 springboot-rabbitmq/pom.xml create mode 100644 springboot-rabbitmq/run.sh create mode 100644 springboot-rabbitmq/src/main/java/com/xncoding/pos/Application.java create mode 100644 springboot-rabbitmq/src/main/java/com/xncoding/pos/config/RabbitConfig.java create mode 100644 springboot-rabbitmq/src/main/java/com/xncoding/pos/mq/Receiver.java create mode 100644 springboot-rabbitmq/src/main/java/com/xncoding/pos/service/SenderService.java create mode 100644 springboot-rabbitmq/src/main/resources/application.yml create mode 100644 springboot-rabbitmq/src/test/java/com/xncoding/service/SenderServiceTest.java diff --git a/README.md b/README.md index 3dd9c32..5ab333e 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ app-manage-api | [实现需要认证授权访问的RESTful API接 每个子项目都可以单独运行,都是打包成jar包后,通过使用内置jetty容器执行,有3种方式运行。:point_right: 1. 在IDEA里面直接运行Application.java的main函数。 -2. 另一种方式是执行`mvn clean package`命令后传到linux服务器上面,通过命令`java -jar xxx.jar`方式运行 +2. 另一种方式是执行`mvn clean package`命令后传到linux服务器上面,通过命令`java -Xms64m -Xmx1024m -jar xxx.jar`方式运行 3. 在linux服务器上面,配置好jdk、maven、git命令后,通过`git clone sb-xxx`拉取工程后,执行`./run.sh start test`命令来执行 注:每个子项目有自己的README.md文件,告诉你该怎么初始化环境,比如准备好数据库SQL文件等。 @@ -66,7 +66,7 @@ app-manage-api | [实现需要认证授权访问的RESTful API接 ``` xml 4.0.0 -sb-cache +springboot-cache war ``` diff --git a/springboot-rabbitmq/.gitignore b/springboot-rabbitmq/.gitignore new file mode 100644 index 0000000..1e3c5bb --- /dev/null +++ b/springboot-rabbitmq/.gitignore @@ -0,0 +1,14 @@ +# 此为注释– 将被Git 忽略 +# /结尾表示是目录,忽略目录和目录下的所有件 +# /开头表示根目录,否则是.gitignore的相对目录 +# !开头表示反选 +.idea/ +target/ +*.iml +*.ipr +*.iws +*.log +.svn/ +.project +rebel.xml +.rebel-remote.xml.* diff --git a/springboot-rabbitmq/LICENSE b/springboot-rabbitmq/LICENSE new file mode 100644 index 0000000..83cd47d --- /dev/null +++ b/springboot-rabbitmq/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2018 Xiong Neng + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/springboot-rabbitmq/README.md b/springboot-rabbitmq/README.md new file mode 100644 index 0000000..3a6af79 --- /dev/null +++ b/springboot-rabbitmq/README.md @@ -0,0 +1,11 @@ +## 集成消息队列RabbitMQ + +消息队列RabbitMQ的使用例子,演示Direct模式和广播发送模式。 + +测试用例:`com.xncoding.service.SenderServiceTest.java` + +## 许可证 + +Copyright (c) 2018 Xiong Neng + +基于 MIT 协议发布: diff --git a/springboot-rabbitmq/pom.xml b/springboot-rabbitmq/pom.xml new file mode 100644 index 0000000..5cba70e --- /dev/null +++ b/springboot-rabbitmq/pom.xml @@ -0,0 +1,87 @@ + + + 4.0.0 + + com.xncoding + springboot-rabbitmq + 1.0.0-SNAPSHOT + jar + + springboot-rabbitmq + 集成消息队列RabbitMQ + + + org.springframework.boot + spring-boot-starter-parent + 1.5.9.RELEASE + + + + + UTF-8 + UTF-8 + 1.8 + + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.boot + spring-boot-starter-test + + + com.vaadin.external.google + android-json + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.20 + + true + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + + src/main/resources + + + src/main/java + + **/*.xml + + + + + + diff --git a/springboot-rabbitmq/run.sh b/springboot-rabbitmq/run.sh new file mode 100644 index 0000000..955efb1 --- /dev/null +++ b/springboot-rabbitmq/run.sh @@ -0,0 +1,72 @@ +#!/bin/bash +# 项目自动更新脚本 +# 先clone相应的分支下来: +# git clone ssh://git@120.24.173.142:7999/xxx.git +# 远程调试启动: +# nohup java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 -Xms512m -Xmx1024m -jar -Dspring.profiles.active=${profile} ${jarfile} >/dev/null 2>&1 & + +function start { + profile="$1" + echo "启动环境profile=${profile}" + jarfile=$(ls target/*.jar) + if [[ "$?" == "0" ]]; then + stop $profile $jarfile + fi + branch=$(git branch |awk '{print $2}') + git pull origin ${branch} + echo "更新完代码开始重新打包" + mvn clean && mvn clean && mvn package -DskipTests=true + if [[ "$?" != "0" ]]; then + echo "编译出错,退出!" + exit 1 + fi + echo "nohup java -Xms512m -Xmx1024m -jar -Dspring.profiles.active=${profile} ${jarfile} >/dev/null 2>&1 &" + nohup java -Xms512m -Xmx1024m -jar -Dspring.profiles.active=${profile} ${jarfile} >/dev/null 2>&1 & + echo "启动应用中,请查看日志文件..." +} + +function stop { + profile="$1" + jarfile="$2" + ps aux | grep "${jarfile}" | grep "spring.profiles.active=${profile}" | grep -v grep > /dev/null + if [[ "$?" == "0" ]]; then + echo "该应用还在跑,我先停了它" + pid=$(ps aux | grep "${jarfile}" | grep "spring.profiles.active=${profile}" | grep -v grep |awk '{print $2}') + if [[ "$pid" != "" ]]; then + kill -9 $pid + fi + echo "停止应用成功..." + fi +} + +if [[ "$1" == "start" ]]; then + if [[ "$#" < 2 ]]; then + echo "请输入正确参数:./epay.sh start {profile}" + exit 1 + fi + profile="$2" + if [[ "$profile" != "dev" && "$profile" != "test" && "$profile" != "show" && "$profile" != "production" ]]; then + echo "参数错误,请输入正确的profile参数,使用方法:" + echo "./epay.sh start {profile} ==> 启动应用,{profile}取值:dev|test|show|production" + exit 1 + fi + start "${profile}" +elif [[ "$1" == "stop" ]]; then + if [[ "$#" < 2 ]]; then + echo "请输入正确参数:./epay.sh stop {profile}" + exit 1 + fi + profile="$2" + if [[ "$profile" != "dev" && "$profile" != "test" && "$profile" != "show" && "$profile" != "production" ]]; then + echo "参数错误,请输入正确的profile参数,使用方法:" + echo "./epay.sh stop {profile} ==> 停止应用,{profile}取值:dev|test|show|production" + exit 1 + fi + jarfile=$(ls target/*.jar) + stop $profile $jarfile +else + echo "参数错误,使用方法:{}参数是必填的,[]参数可选" + echo "./epay.sh start {profile} ==> 启动应用,{profile}取值:dev|test|show|production" + echo "./epay.sh stop {profile} ==> 停止应用,{profile}取值:dev|test|show|production" + exit 1 +fi diff --git a/springboot-rabbitmq/src/main/java/com/xncoding/pos/Application.java b/springboot-rabbitmq/src/main/java/com/xncoding/pos/Application.java new file mode 100644 index 0000000..adc4c48 --- /dev/null +++ b/springboot-rabbitmq/src/main/java/com/xncoding/pos/Application.java @@ -0,0 +1,12 @@ +package com.xncoding.pos; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/springboot-rabbitmq/src/main/java/com/xncoding/pos/config/RabbitConfig.java b/springboot-rabbitmq/src/main/java/com/xncoding/pos/config/RabbitConfig.java new file mode 100644 index 0000000..2d28f1b --- /dev/null +++ b/springboot-rabbitmq/src/main/java/com/xncoding/pos/config/RabbitConfig.java @@ -0,0 +1,152 @@ +package com.xncoding.pos.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.annotation.Resource; + +/** + * RabbitConfig + * + * @author XiongNeng + * @version 1.0 + * @since 2018/3/1 + */ +@Configuration +public class RabbitConfig { + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 定制化amqp模版 可根据需要定制多个 + *

+ *

+ * 此处为模版类定义 Jackson消息转换器 + * ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack + * ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack + * + * @return the amqp template + */ + // @Primary + @Bean + public AmqpTemplate amqpTemplate() { + Logger log = LoggerFactory.getLogger(RabbitTemplate.class); + // 使用jackson 消息转换器 + rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); + rabbitTemplate.setEncoding("UTF-8"); + // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true + rabbitTemplate.setMandatory(true); + rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { + String correlationId = message.getMessageProperties().getCorrelationIdString(); + log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); + }); + // 消息确认,yml需要配置 publisher-confirms: true + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { + if (ack) { + log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); + } else { + log.debug("消息发送到exchange失败,原因: {}", cause); + } + }); + return rabbitTemplate; + } + + /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */ + + /** + * 声明Direct交换机 支持持久化. + * + * @return the exchange + */ + @Bean("directExchange") + public Exchange directExchange() { + return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); + } + + /** + * 声明一个队列 支持持久化. + * + * @return the queue + */ + @Bean("directQueue") + public Queue directQueue() { + return QueueBuilder.durable("DIRECT_QUEUE").build(); + } + + /** + * 通过绑定键 将指定队列绑定到一个指定的交换机 . + * + * @param queue the queue + * @param exchange the exchange + * @return the binding + */ + @Bean + public Binding directBinding(@Qualifier("directQueue") Queue queue, + @Qualifier("directExchange") Exchange exchange) { + return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); + } + + /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */ + + /** + * 声明 fanout 交换机. + * + * @return the exchange + */ + @Bean("fanoutExchange") + public FanoutExchange fanoutExchange() { + return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build(); + } + + /** + * Fanout queue A. + * + * @return the queue + */ + @Bean("fanoutQueueA") + public Queue fanoutQueueA() { + return QueueBuilder.durable("FANOUT_QUEUE_A").build(); + } + + /** + * Fanout queue B . + * + * @return the queue + */ + @Bean("fanoutQueueB") + public Queue fanoutQueueB() { + return QueueBuilder.durable("FANOUT_QUEUE_B").build(); + } + + /** + * 绑定队列A 到Fanout 交换机. + * + * @param queue the queue + * @param fanoutExchange the fanout exchange + * @return the binding + */ + @Bean + public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, + @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { + return BindingBuilder.bind(queue).to(fanoutExchange); + } + + /** + * 绑定队列B 到Fanout 交换机. + * + * @param queue the queue + * @param fanoutExchange the fanout exchange + * @return the binding + */ + @Bean + public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, + @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { + return BindingBuilder.bind(queue).to(fanoutExchange); + } +} diff --git a/springboot-rabbitmq/src/main/java/com/xncoding/pos/mq/Receiver.java b/springboot-rabbitmq/src/main/java/com/xncoding/pos/mq/Receiver.java new file mode 100644 index 0000000..5e3ed28 --- /dev/null +++ b/springboot-rabbitmq/src/main/java/com/xncoding/pos/mq/Receiver.java @@ -0,0 +1,61 @@ +package com.xncoding.pos.mq; + +import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import java.io.IOException; + +/** + * 消息监听器 + * + * @author XiongNeng + * @version 1.0 + * @since 2018/3/1 + */ +@Component +public class Receiver { + private static final Logger log = LoggerFactory.getLogger(Receiver.class); + + /** + * FANOUT广播队列监听一. + * + * @param message the message + * @param channel the channel + * @throws IOException the io exception 这里异常需要处理 + */ + @RabbitListener(queues = {"FANOUT_QUEUE_A"}) + public void on(Message message, Channel channel) throws IOException { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + log.debug("FANOUT_QUEUE_A " + new String(message.getBody())); + } + + /** + * FANOUT广播队列监听二. + * + * @param message the message + * @param channel the channel + * @throws IOException the io exception 这里异常需要处理 + */ + @RabbitListener(queues = {"FANOUT_QUEUE_B"}) + public void t(Message message, Channel channel) throws IOException { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + log.debug("FANOUT_QUEUE_B " + new String(message.getBody())); + } + + /** + * DIRECT模式. + * + * @param message the message + * @param channel the channel + * @throws IOException the io exception 这里异常需要处理 + */ + @RabbitListener(queues = {"DIRECT_QUEUE"}) + public void message(Message message, Channel channel) throws IOException { + channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); + log.debug("DIRECT " + new String(message.getBody())); + } +} diff --git a/springboot-rabbitmq/src/main/java/com/xncoding/pos/service/SenderService.java b/springboot-rabbitmq/src/main/java/com/xncoding/pos/service/SenderService.java new file mode 100644 index 0000000..c2f5461 --- /dev/null +++ b/springboot-rabbitmq/src/main/java/com/xncoding/pos/service/SenderService.java @@ -0,0 +1,43 @@ +package com.xncoding.pos.service; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.support.CorrelationData; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.UUID; + +/** + * 消息发送服务 + */ +@Service +public class SenderService { + private Logger logger = LoggerFactory.getLogger(this.getClass()); + @Resource + private RabbitTemplate rabbitTemplate; + + /** + * 测试广播模式. + * + * @param p the p + * @return the response entity + */ + public void broadcast(String p) { + CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); + rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData); + } + + /** + * 测试Direct模式. + * + * @param p the p + * @return the response entity + */ + public void direct(String p) { + CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); + rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData); + } + +} diff --git a/springboot-rabbitmq/src/main/resources/application.yml b/springboot-rabbitmq/src/main/resources/application.yml new file mode 100644 index 0000000..6ac166a --- /dev/null +++ b/springboot-rabbitmq/src/main/resources/application.yml @@ -0,0 +1,38 @@ +########################################################## +################## 所有profile共有的配置 ################# +########################################################## + +################### spring配置 ################### +spring: + profiles: + active: dev + +--- + +##################################################################### +######################## 开发环境profile ########################## +##################################################################### +spring: + profiles: dev + rabbitmq: + host: 123.207.66.156 + port: 5672 + username: spring + password: 123456 + publisher-confirms: true #支持发布确认 + publisher-returns: true #支持发布返回 + listener: + simple: + acknowledge-mode: manual #采用手动应答 + concurrency: 1 #指定最小的消费者数量 + max-concurrency: 1 #指定最大的消费者数量 + retry: + enabled: true #是否支持重试 + +logging: + level: + ROOT: INFO + com: + xncoding: DEBUG + file: E:/logs/app.log + diff --git a/springboot-rabbitmq/src/test/java/com/xncoding/service/SenderServiceTest.java b/springboot-rabbitmq/src/test/java/com/xncoding/service/SenderServiceTest.java new file mode 100644 index 0000000..5a25ff8 --- /dev/null +++ b/springboot-rabbitmq/src/test/java/com/xncoding/service/SenderServiceTest.java @@ -0,0 +1,35 @@ +package com.xncoding.service; + +import com.xncoding.pos.Application; +import com.xncoding.pos.service.SenderService; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +/** + * UserServiceTest + * + * @author XiongNeng + * @version 1.0 + * @since 2018/2/2 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = Application.class) +public class SenderServiceTest { + @Autowired + private SenderService senderService; + + @Test + public void testCache() { + // 测试广播模式 + senderService.broadcast("同学们集合啦!"); + // 测试Direct模式 + senderService.direct("定点消息"); + } +}