add rabbitmq rpc sample

This commit is contained in:
Xiong Neng
2018-05-17 15:11:08 +08:00
parent 66c4d7ae39
commit f8e958fa71
21 changed files with 933 additions and 0 deletions

View File

@ -0,0 +1,12 @@
package com.xncoding.pos;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

View File

@ -0,0 +1,59 @@
package com.xncoding.pos.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* ExecutorConfig
*
* @author XiongNeng
* @version 1.0
* @since 2018/5/17
*/
@Configuration
@EnableAsync
public class ExecutorConfig {
/**
* Set the ThreadPoolExecutor's core pool size.
*/
private int corePoolSize = 10;
/**
* Set the ThreadPoolExecutor's maximum pool size.
*/
private int maxPoolSize = 200;
/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
*/
private int queueCapacity = 10;
// @Bean
// public Executor mySimpleAsync() {
// ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// executor.setCorePoolSize(corePoolSize);
// executor.setMaxPoolSize(maxPoolSize);
// executor.setQueueCapacity(queueCapacity);
// executor.setThreadNamePrefix("MySimpleExecutor-");
// executor.initialize();
// return executor;
// }
@Bean
public Executor myAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("MyExecutor-");
// rejection-policy当pool已经达到max size的时候如何处理新任务
// CALLER_RUNS不在新线程中执行任务而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}

View File

@ -0,0 +1,89 @@
package com.xncoding.pos.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import javax.annotation.Resource;
/**
* RabbitConfig
*
* @author XiongNeng
* @version 1.0
* @since 2018/3/1
*/
@Configuration
public class RabbitConfig {
/**
* 同步RPC队列
*/
public static final String QUEUE_SYNC_RPC = "rpc.sync";
/**
* 异步RPC队列使用临时回复队列或者使用“Direct reply-to”特性
*/
public static final String QUEUE_ASYNC_RPC = "rpc.async";
/**
* 异步RPC队列每个客户端使用不同的固定回复队列需要额外提供correlationId以关联请求和响应
*/
public static final String QUEUE_ASYNC_RPC_WITH_FIXED_REPLY = "rpc.with.fixed.reply";
@Bean
public Queue syncRPCQueue() {
return new Queue(QUEUE_SYNC_RPC);
}
@Bean
public Queue asyncRPCQueue() {
return new Queue(QUEUE_ASYNC_RPC);
}
@Bean
public Queue fixedReplyRPCQueue() {
return new Queue(QUEUE_ASYNC_RPC_WITH_FIXED_REPLY);
}
@Bean
public Queue repliesQueue() {
return new AnonymousQueue();
}
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
@Primary
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(repliesQueue().getName());
return container;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate template, SimpleMessageListenerContainer container) {
return new AsyncRabbitTemplate(template, container);
}
}

View File

@ -0,0 +1,29 @@
package com.xncoding.pos.model;
/**
* User
*
* @author XiongNeng
* @version 1.0
* @since 2018/5/17
*/
public class User {
private String name;
private Integer age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}

View File

@ -0,0 +1,87 @@
package com.xncoding.pos.server;
import com.fasterxml.jackson.core.type.TypeReference;
import com.xncoding.pos.model.User;
import com.xncoding.pos.util.JacksonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.nio.charset.Charset;
import static com.xncoding.pos.config.RabbitConfig.QUEUE_ASYNC_RPC;
import static com.xncoding.pos.config.RabbitConfig.QUEUE_ASYNC_RPC_WITH_FIXED_REPLY;
@Component
public class AsyncRPCServer {
@Autowired
AmqpTemplate amqpTemplate;
@Autowired
AsyncTask asyncTask;
Logger logger = LoggerFactory.getLogger(getClass());
@RabbitListener(queues = QUEUE_ASYNC_RPC)
public void processAsyncRpc(Message message, @Header(AmqpHeaders.REPLY_TO) String replyTo) {
String body = new String(message.getBody(), Charset.forName("UTF-8"));
User user = JacksonUtil.json2Bean(body, new TypeReference<User>(){});
logger.info("recevie message {} and reply to {}, user.name={}", body, replyTo, user.getName());
if (replyTo.startsWith("amq.rabbitmq.reply-to")) {
logger.debug("starting with version 3.4.0, the RabbitMQ server now supports Direct reply-to");
} else {
logger.info("fall back to using a temporary reply queue");
}
ListenableFuture<String> asyncResult = asyncTask.expensiveOperation(body);
asyncResult.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
amqpTemplate.convertAndSend(replyTo, result);
}
@Override
public void onFailure(Throwable ex) {
logger.error("接受到QUEUE_ASYNC_RPC失败", ex);
}
});
}
@RabbitListener(queues = QUEUE_ASYNC_RPC_WITH_FIXED_REPLY)
public void processAsyncRpcFixed(User user,
@Header(AmqpHeaders.REPLY_TO) String replyTo,
@Header(AmqpHeaders.CORRELATION_ID) byte[] correlationId) {
// String body = new String(message.getBody(), Charset.forName("UTF-8"));
// User user = JacksonUtil.json2Bean(body, new TypeReference<User>(){});
logger.info("user.name={}", user.getName());
logger.info("use a fixed reply queue={}, correlationId={}", replyTo, new String(correlationId));
ListenableFuture<String> asyncResult = asyncTask.expensiveOperation(user.getName());
asyncResult.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
amqpTemplate.convertAndSend(replyTo, (Object) result, m -> {
//https://stackoverflow.com/questions/42382307/messageproperties-setcorrelationidstring-is-not-working
m.getMessageProperties().setCorrelationId(correlationId);
return m;
});
}
@Override
public void onFailure(Throwable ex) {
logger.error("接受到QUEUE_ASYNC_RPC_WITH_FIXED_REPLY失败", ex);
}
});
}
}

View File

@ -0,0 +1,26 @@
package com.xncoding.pos.server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
@Component
public class AsyncTask {
Logger logger = LoggerFactory.getLogger(getClass());
@Async
public ListenableFuture<String> expensiveOperation(String message) {
int millis = (int) (Math.random() * 5 * 1000);
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
}
String result = message + " executed by " + Thread.currentThread().getName() + " for " + millis + " ms";
logger.info("task result {}", result);
return new AsyncResult<String>(result);
}
}

View File

@ -0,0 +1,45 @@
package com.xncoding.pos.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
* JacksonUtil
*
* @author XiongNeng
* @version 1.0
* @since 2018/3/4
*/
public class JacksonUtil {
private static ObjectMapper mapper = new ObjectMapper();
public static String bean2Json(Object obj) {
try {
return mapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
e.printStackTrace();
return null;
}
}
// public static <T> T json2Bean(String jsonStr, Class<T> objClass) {
// try {
// return mapper.readValue(jsonStr, objClass);
// } catch (IOException e) {
// e.printStackTrace();
// return null;
// }
// }
public static <T> T json2Bean(String jsonStr, TypeReference<T> typeReference) {
try {
return mapper.readValue(jsonStr, typeReference);
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}

View File

@ -0,0 +1,38 @@
##########################################################
################## 所有profile共有的配置 #################
##########################################################
################### spring配置 ###################
spring:
profiles:
active: dev
---
#####################################################################
######################## 开发环境profile ##########################
#####################################################################
spring:
profiles: dev
rabbitmq:
host: 119.29.12.177
port: 5672
username: guest
password: guest
# publisher-confirms: true #支持发布确认
# publisher-returns: true #支持发布返回
# listener:
# simple:
# acknowledge-mode: manual #采用手动应答
# concurrency: 1 #指定最小的消费者数量
# max-concurrency: 20 #指定最大的消费者数量
# retry:
# enabled: true #是否支持重试
logging:
level:
ROOT: INFO
com:
xncoding: DEBUG
file: D:/logs/rabbitmq-rpc-server.log