diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/LICENSE b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/LICENSE new file mode 100644 index 0000000..83cd47d --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2018 Xiong Neng + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/README.md b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/README.md new file mode 100644 index 0000000..368f94d --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/README.md @@ -0,0 +1,9 @@ +## RabbitMQ实现RPC调用客户端 + +消息队列RabbitMQ的使用例子,演示了RPC调用的客户端例子。 + +## 许可证 + +Copyright (c) 2018 Xiong Neng + +基于 MIT 协议发布: diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/pom.xml b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/pom.xml new file mode 100644 index 0000000..e542231 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/pom.xml @@ -0,0 +1,87 @@ + + + 4.0.0 + + com.xncoding + springboot-rabbitmq-rpc-client + 1.0.0-SNAPSHOT + jar + + springboot-rabbitmq-rpc-client + 集成消息队列RabbitMQ RPC调用 - 客户端 + + + org.springframework.boot + spring-boot-starter-parent + 1.5.10.RELEASE + + + + + UTF-8 + UTF-8 + 1.8 + + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.boot + spring-boot-starter-test + + + com.vaadin.external.google + android-json + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.20 + + true + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + + src/main/resources + + + src/main/java + + **/*.xml + + + + + + diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/Application.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/Application.java new file mode 100644 index 0000000..adc4c48 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/Application.java @@ -0,0 +1,12 @@ +package com.xncoding.pos; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/config/RabbitConfig.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/config/RabbitConfig.java new file mode 100644 index 0000000..20c44c7 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/config/RabbitConfig.java @@ -0,0 +1,89 @@ +package com.xncoding.pos.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.core.AnonymousQueue; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.AsyncRabbitTemplate; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.annotation.EnableAsync; + +import javax.annotation.Resource; + +/** + * RabbitConfig + * + * @author XiongNeng + * @version 1.0 + * @since 2018/3/1 + */ +@Configuration +public class RabbitConfig { + /** + * 同步RPC队列 + */ + public static final String QUEUE_SYNC_RPC = "rpc.sync"; + + /** + * 异步RPC队列,使用临时回复队列,或者使用“Direct reply-to”特性 + */ + public static final String QUEUE_ASYNC_RPC = "rpc.async"; + + /** + * 异步RPC队列,每个客户端使用不同的固定回复队列,需要额外提供correlationId以关联请求和响应 + */ + public static final String QUEUE_ASYNC_RPC_WITH_FIXED_REPLY = "rpc.with.fixed.reply"; + + @Bean + public Queue syncRPCQueue() { + return new Queue(QUEUE_SYNC_RPC); + } + + @Bean + public Queue asyncRPCQueue() { + return new Queue(QUEUE_ASYNC_RPC); + } + + @Bean + public Queue fixedReplyRPCQueue() { + return new Queue(QUEUE_ASYNC_RPC_WITH_FIXED_REPLY); + } + + @Bean + public Queue repliesQueue() { + return new AnonymousQueue(); + } + + @Bean + public MessageConverter jsonMessageConverter(){ + return new Jackson2JsonMessageConverter(); + } + + @Bean + @Primary + public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setQueueNames(repliesQueue().getName()); + return container; + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMessageConverter(jsonMessageConverter()); + return rabbitTemplate; + } + + @Bean + public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate template, SimpleMessageListenerContainer container) { + return new AsyncRabbitTemplate(template, container); + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/model/User.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/model/User.java new file mode 100644 index 0000000..407537c --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/model/User.java @@ -0,0 +1,29 @@ +package com.xncoding.pos.model; + +/** + * User + * + * @author XiongNeng + * @version 1.0 + * @since 2018/5/17 + */ +public class User { + private String name; + private Integer age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/service/SenderService.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/service/SenderService.java new file mode 100644 index 0000000..db9e503 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/service/SenderService.java @@ -0,0 +1,51 @@ +package com.xncoding.pos.service; + +import com.xncoding.pos.model.User; +import com.xncoding.pos.util.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.rabbit.AsyncRabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.AsyncResult; +import org.springframework.stereotype.Service; +import org.springframework.util.concurrent.ListenableFuture; + +import java.util.concurrent.Future; + +import static com.xncoding.pos.config.RabbitConfig.QUEUE_ASYNC_RPC; +import static com.xncoding.pos.config.RabbitConfig.QUEUE_ASYNC_RPC_WITH_FIXED_REPLY; + +/** + * 消息发送服务 + */ +@Service +public class SenderService { + private Logger logger = LoggerFactory.getLogger(this.getClass()); + @Autowired + AsyncRabbitTemplate asyncRabbitTemplate; + + @Autowired + AmqpTemplate amqpTemplate; + + @Async + public Future sendAsync(User message) { +// String result = (String) amqpTemplate.convertSendAndReceive(QUEUE_ASYNC_RPC, message, m -> { +// m.getMessageProperties().setCorrelationIdString(StringUtil.generateUUId()); +// return m; +// }); + String result = (String) amqpTemplate.convertSendAndReceive(QUEUE_ASYNC_RPC, message); + return new AsyncResult<>(result); + } + + public Future sendWithFixedReplay(User message) { +// ListenableFuture future = asyncRabbitTemplate.convertSendAndReceive(QUEUE_ASYNC_RPC_WITH_FIXED_REPLY, message, m -> { +// m.getMessageProperties().setCorrelationIdString(StringUtil.generateUUId()); +// return m; +// }); + return asyncRabbitTemplate.convertSendAndReceive(QUEUE_ASYNC_RPC_WITH_FIXED_REPLY, message); + } + +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/util/StringUtil.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/util/StringUtil.java new file mode 100644 index 0000000..9ec72ba --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/java/com/xncoding/pos/util/StringUtil.java @@ -0,0 +1,16 @@ +package com.xncoding.pos.util; + +import java.util.UUID; + +/** + * StringUtil + * + * @author XiongNeng + * @version 1.0 + * @since 2018/5/17 + */ +public class StringUtil { + public static String generateUUId() { + return UUID.randomUUID().toString(); + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/resources/application.yml b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/resources/application.yml new file mode 100644 index 0000000..43d1946 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/main/resources/application.yml @@ -0,0 +1,29 @@ +########################################################## +################## 所有profile共有的配置 ################# +########################################################## + +################### spring配置 ################### +spring: + profiles: + active: dev + +--- + +##################################################################### +######################## 开发环境profile ########################## +##################################################################### +spring: + profiles: dev + rabbitmq: + host: 119.29.12.177 + port: 5672 + username: guest + password: guest + +logging: + level: + ROOT: INFO + com: + xncoding: DEBUG + file: D:/logs/rabbitmq-rpc-client.log + diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/test/java/com/xncoding/service/SenderServiceTest.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/test/java/com/xncoding/service/SenderServiceTest.java new file mode 100644 index 0000000..610c857 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-client/src/test/java/com/xncoding/service/SenderServiceTest.java @@ -0,0 +1,90 @@ +package com.xncoding.service; + +import com.xncoding.pos.Application; +import com.xncoding.pos.model.User; +import com.xncoding.pos.service.SenderService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +/** + * SenderServiceTest + * + * @author XiongNeng + * @version 1.0 + * @since 2018/2/2 + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = Application.class) +public class SenderServiceTest { + + Logger logger = LoggerFactory.getLogger(getClass()); + @Autowired + private SenderService senderService; + + private List users; + + @Before + public void prepare() { + users = new ArrayList<>(); + User user1 = new User(); + user1.setName("用户1"); + user1.setAge(19); + users.add(user1); + + User user2 = new User(); + user2.setName("用户2"); + user2.setAge(20); + users.add(user2); + } + + @Test + public void testSendAsync() throws InterruptedException, ExecutionException { + + List> results = new ArrayList<>(); + for (User user : users) { + Future result = senderService.sendAsync(user); + results.add(result); + } + for (Future future : results) { + String result = future.get(); + if (result == null) { + Assert.fail("message will not timeout"); + } else { + logger.info("tttttttttttt=" + result); + } + } + } + + @Test + public void testSendWithFixedReplay() throws InterruptedException, ExecutionException{ + List> results = new ArrayList<>(); + for (User user : users) { + Future result = senderService.sendWithFixedReplay(user); + results.add(result); + } + for (Future future : results) { + String result = future.get(); + if (result == null) { + Assert.fail("message will not timeout"); + } else { + logger.info("tttttttttttt=" + result); + } + } + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/LICENSE b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/LICENSE new file mode 100644 index 0000000..83cd47d --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2018 Xiong Neng + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/README.md b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/README.md new file mode 100644 index 0000000..505cfa0 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/README.md @@ -0,0 +1,9 @@ +## RabbitMQ实现RPC调用服务端 + +消息队列RabbitMQ的使用例子,演示了RPC调用的服务端例子。 + +## 许可证 + +Copyright (c) 2018 Xiong Neng + +基于 MIT 协议发布: diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/pom.xml b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/pom.xml new file mode 100644 index 0000000..3ebd027 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/pom.xml @@ -0,0 +1,87 @@ + + + 4.0.0 + + com.xncoding + springboot-rabbitmq-rpc-server + 1.0.0-SNAPSHOT + jar + + springboot-rabbitmq-rpc-server + 集成消息队列RabbitMQ RPC - 服务端 + + + org.springframework.boot + spring-boot-starter-parent + 1.5.10.RELEASE + + + + + UTF-8 + UTF-8 + 1.8 + + + + + org.springframework.boot + spring-boot-starter-amqp + + + + org.springframework.boot + spring-boot-starter-test + + + com.vaadin.external.google + android-json + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.20 + + true + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + + + + src/main/resources + + + src/main/java + + **/*.xml + + + + + + diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/Application.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/Application.java new file mode 100644 index 0000000..adc4c48 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/Application.java @@ -0,0 +1,12 @@ +package com.xncoding.pos; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/config/ExecutorConfig.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/config/ExecutorConfig.java new file mode 100644 index 0000000..5ba2b69 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/config/ExecutorConfig.java @@ -0,0 +1,59 @@ +package com.xncoding.pos.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * ExecutorConfig + * + * @author XiongNeng + * @version 1.0 + * @since 2018/5/17 + */ +@Configuration +@EnableAsync +public class ExecutorConfig { + /** + * Set the ThreadPoolExecutor's core pool size. + */ + private int corePoolSize = 10; + /** + * Set the ThreadPoolExecutor's maximum pool size. + */ + private int maxPoolSize = 200; + /** + * Set the capacity for the ThreadPoolExecutor's BlockingQueue. + */ + private int queueCapacity = 10; + +// @Bean +// public Executor mySimpleAsync() { +// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); +// executor.setCorePoolSize(corePoolSize); +// executor.setMaxPoolSize(maxPoolSize); +// executor.setQueueCapacity(queueCapacity); +// executor.setThreadNamePrefix("MySimpleExecutor-"); +// executor.initialize(); +// return executor; +// } + + @Bean + public Executor myAsync() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(maxPoolSize); + executor.setQueueCapacity(queueCapacity); + executor.setThreadNamePrefix("MyExecutor-"); + + // rejection-policy:当pool已经达到max size的时候,如何处理新任务 + // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.initialize(); + return executor; + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/config/RabbitConfig.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/config/RabbitConfig.java new file mode 100644 index 0000000..d5a6996 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/config/RabbitConfig.java @@ -0,0 +1,89 @@ +package com.xncoding.pos.config; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.AsyncRabbitTemplate; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; +import org.springframework.scheduling.annotation.EnableAsync; + +import javax.annotation.Resource; + +/** + * RabbitConfig + * + * @author XiongNeng + * @version 1.0 + * @since 2018/3/1 + */ +@Configuration +public class RabbitConfig { + /** + * 同步RPC队列 + */ + public static final String QUEUE_SYNC_RPC = "rpc.sync"; + + /** + * 异步RPC队列,使用临时回复队列,或者使用“Direct reply-to”特性 + */ + public static final String QUEUE_ASYNC_RPC = "rpc.async"; + + /** + * 异步RPC队列,每个客户端使用不同的固定回复队列,需要额外提供correlationId以关联请求和响应 + */ + public static final String QUEUE_ASYNC_RPC_WITH_FIXED_REPLY = "rpc.with.fixed.reply"; + + @Bean + public Queue syncRPCQueue() { + return new Queue(QUEUE_SYNC_RPC); + } + + @Bean + public Queue asyncRPCQueue() { + return new Queue(QUEUE_ASYNC_RPC); + } + + @Bean + public Queue fixedReplyRPCQueue() { + return new Queue(QUEUE_ASYNC_RPC_WITH_FIXED_REPLY); + } + + @Bean + public Queue repliesQueue() { + return new AnonymousQueue(); + } + + @Bean + public MessageConverter jsonMessageConverter(){ + return new Jackson2JsonMessageConverter(); + } + + @Bean + @Primary + public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setQueueNames(repliesQueue().getName()); + return container; + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMessageConverter(jsonMessageConverter()); + return rabbitTemplate; + } + + @Bean + public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate template, SimpleMessageListenerContainer container) { + return new AsyncRabbitTemplate(template, container); + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/model/User.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/model/User.java new file mode 100644 index 0000000..407537c --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/model/User.java @@ -0,0 +1,29 @@ +package com.xncoding.pos.model; + +/** + * User + * + * @author XiongNeng + * @version 1.0 + * @since 2018/5/17 + */ +public class User { + private String name; + private Integer age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/server/AsyncRPCServer.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/server/AsyncRPCServer.java new file mode 100644 index 0000000..3056787 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/server/AsyncRPCServer.java @@ -0,0 +1,87 @@ +package com.xncoding.pos.server; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.xncoding.pos.model.User; +import com.xncoding.pos.util.JacksonUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; + +import java.nio.charset.Charset; + +import static com.xncoding.pos.config.RabbitConfig.QUEUE_ASYNC_RPC; +import static com.xncoding.pos.config.RabbitConfig.QUEUE_ASYNC_RPC_WITH_FIXED_REPLY; + + +@Component +public class AsyncRPCServer { + + @Autowired + AmqpTemplate amqpTemplate; + + @Autowired + AsyncTask asyncTask; + + Logger logger = LoggerFactory.getLogger(getClass()); + + @RabbitListener(queues = QUEUE_ASYNC_RPC) + public void processAsyncRpc(Message message, @Header(AmqpHeaders.REPLY_TO) String replyTo) { + String body = new String(message.getBody(), Charset.forName("UTF-8")); + User user = JacksonUtil.json2Bean(body, new TypeReference(){}); + logger.info("recevie message {} and reply to {}, user.name={}", body, replyTo, user.getName()); + if (replyTo.startsWith("amq.rabbitmq.reply-to")) { + logger.debug("starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to"); + } else { + logger.info("fall back to using a temporary reply queue"); + } + ListenableFuture asyncResult = asyncTask.expensiveOperation(body); + asyncResult.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + amqpTemplate.convertAndSend(replyTo, result); + } + + @Override + public void onFailure(Throwable ex) { + logger.error("接受到QUEUE_ASYNC_RPC失败", ex); + } + }); + } + + @RabbitListener(queues = QUEUE_ASYNC_RPC_WITH_FIXED_REPLY) + public void processAsyncRpcFixed(User user, + @Header(AmqpHeaders.REPLY_TO) String replyTo, + @Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) { +// String body = new String(message.getBody(), Charset.forName("UTF-8")); +// User user = JacksonUtil.json2Bean(body, new TypeReference(){}); + logger.info("user.name={}", user.getName()); + logger.info("use a fixed reply queue={}, correlationId={}", replyTo, new String(correlationId)); + ListenableFuture asyncResult = asyncTask.expensiveOperation(user.getName()); + asyncResult.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + amqpTemplate.convertAndSend(replyTo, (Object) result, m -> { + //https://stackoverflow.com/questions/42382307/messageproperties-setcorrelationidstring-is-not-working + m.getMessageProperties().setCorrelationId(correlationId); + return m; + }); + } + + @Override + public void onFailure(Throwable ex) { + logger.error("接受到QUEUE_ASYNC_RPC_WITH_FIXED_REPLY失败", ex); + } + }); + } + +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/server/AsyncTask.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/server/AsyncTask.java new file mode 100644 index 0000000..1ba742a --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/server/AsyncTask.java @@ -0,0 +1,26 @@ +package com.xncoding.pos.server; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.AsyncResult; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; + +@Component +public class AsyncTask { + + Logger logger = LoggerFactory.getLogger(getClass()); + + @Async + public ListenableFuture expensiveOperation(String message) { + int millis = (int) (Math.random() * 5 * 1000); + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + } + String result = message + " executed by " + Thread.currentThread().getName() + " for " + millis + " ms"; + logger.info("task result {}", result); + return new AsyncResult(result); + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/util/JacksonUtil.java b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/util/JacksonUtil.java new file mode 100644 index 0000000..89b7e30 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/java/com/xncoding/pos/util/JacksonUtil.java @@ -0,0 +1,45 @@ +package com.xncoding.pos.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +/** + * JacksonUtil + * + * @author XiongNeng + * @version 1.0 + * @since 2018/3/4 + */ +public class JacksonUtil { + private static ObjectMapper mapper = new ObjectMapper(); + + public static String bean2Json(Object obj) { + try { + return mapper.writeValueAsString(obj); + } catch (JsonProcessingException e) { + e.printStackTrace(); + return null; + } + } + +// public static T json2Bean(String jsonStr, Class objClass) { +// try { +// return mapper.readValue(jsonStr, objClass); +// } catch (IOException e) { +// e.printStackTrace(); +// return null; +// } +// } + + public static T json2Bean(String jsonStr, TypeReference typeReference) { + try { + return mapper.readValue(jsonStr, typeReference); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/resources/application.yml b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/resources/application.yml new file mode 100644 index 0000000..8826148 --- /dev/null +++ b/springboot-rabbitmq-rpc/springboot-rabbitmq-rpc-server/src/main/resources/application.yml @@ -0,0 +1,38 @@ +########################################################## +################## 所有profile共有的配置 ################# +########################################################## + +################### spring配置 ################### +spring: + profiles: + active: dev + +--- + +##################################################################### +######################## 开发环境profile ########################## +##################################################################### +spring: + profiles: dev + rabbitmq: + host: 119.29.12.177 + port: 5672 + username: guest + password: guest +# publisher-confirms: true #支持发布确认 +# publisher-returns: true #支持发布返回 +# listener: +# simple: +# acknowledge-mode: manual #采用手动应答 +# concurrency: 1 #指定最小的消费者数量 +# max-concurrency: 20 #指定最大的消费者数量 +# retry: +# enabled: true #是否支持重试 + +logging: + level: + ROOT: INFO + com: + xncoding: DEBUG + file: D:/logs/rabbitmq-rpc-server.log +