1首先写配置监听文件
@Configuration
@EnableCaching
public class RestRedisConfig extends CachingConfigurerSupport {
@Value("${redis.server-addr}")
private String host;
@Value("${redis.server-port}")
private int port;
@Value("${redis.server-pass}")
private String password;
private int timeout = 10000;//毫秒
@Bean
public RedisConnectionFactory redisConnectionFactory() {
JedisConnectionFactory fac = new JedisConnectionFactory();
fac.setHostName(host);
fac.setPort(port);
fac.setPassword(password);
fac.setTimeout(timeout);
fac.getPoolConfig().setMaxIdle(300);
fac.getPoolConfig().setMaxTotal(1000);
fac.getPoolConfig().setMaxWaitMillis(1000);
fac.getPoolConfig().setMinEvictableIdleTimeMillis(
300000);
fac.getPoolConfig()
.setNumTestsPerEvictionRun(1024);
fac.getPoolConfig().setTimeBetweenEvictionRunsMillis(
30000);
fac.getPoolConfig().setTestOnBorrow(true);
fac.getPoolConfig().setTestWhileIdle(true);
return fac;
}
/**
* RedisTemplate配置
*/
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 如需监听多个 在 MessageListenerAdapter listenerAdapter后面加参数
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//可以添加多个 messageListener
container.addMessageListener(listenerAdapter, new PatternTopic(RedisConst.TASKWORKORDER));
return container;
}
/**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 ,如需监听多个复制下面的方法
* @param redisReceiver
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
return new MessageListenerAdapter(redisReceiver, "TaskWorkOrder");
}
2 写订阅redis消息的类
/**
* 接收订阅的redis消息
*/
@Service
public class RedisReceiver {
public static final Logger logger = LoggerFactory.getLogger(RedisReceiver.class);
@Autowired
PushAndroid pushAndroid;
//接收订单的服务推送
public void TaskWorkOrder(String message) {
try{
logger.info("redis消息来了:"+message);
//去掉前后多的引号
message = message.substring(1);
message = message.substring(0,message.length()-1);
String sendMessage = message.substring(0, message.indexOf("|"));
String targetValue = message.substring(message.indexOf("|")+1);
pushAndroid.sendMessage("01","mobile","MESSAGE","ACCOUNT",targetValue,sendMessage,sendMessage);
logger.info("redis消息来了处理后:"+message);
}catch (Exception e){
logger.error("接收redis消息失败,error={}",e);
}
}
}
3 消息发布的测试类
/**
* 接警发布消息服务
*/
@RestController
@RequestMapping("/abeyance/sendMessageTest")
public class sendMessageService {
Logger logger = LoggerFactory.getLogger(sendMessageService.class);
@Autowired
RedisTemplate<String, String> redisTemplate;
ObjectMapper objectMapper = new ObjectMapper();
@PostMapping(value = "/test")
public String resetCache(String token) throws Exception {
CommonResult commonResult = new CommonResult();
sendMessage(RedisConst.TASKWORKORDER,"这是个测试消息");//发布消息,
return objectMapper.writeValueAsString(commonResult);
}
/**
* redis 发布消息
* @param key
* @param message
*/
public void sendMessage(String key, String message) {
redisTemplate.convertAndSend(key,message);
}
}