您现在的位置是:首页 >学无止境 >redis 过期消息订阅实现(java实现)网站首页学无止境

redis 过期消息订阅实现(java实现)

持键写浮生 2023-06-17 12:00:02
简介redis 过期消息订阅实现(java实现)

一、开启redis消息通知功能

方法1: 修改conf文件

编辑/etc/redis/redis.conf文件,添加或启用以下内容(key过期通知):

notify-keyspace-events Ex

方法2: 使用命令

  1. 登陆redis-cli
  2. 输入下列命令
config set notify-keyspace-events Ex

关键字介绍:

上面Ex就是其中的关键字之一
  • K:keyspace事件,事件以__keyspace@__为前缀进行发布
  • E:keyevent事件,事件以__keyevent@__为前缀进行发布
  • g:一般性的,非特定类型的命令,比如del,expire,rename等
  • $:字符串特定命令
  • l:列表特定命令
  • s:集合特定命令
  • h:哈希特定命令
  • z:有序集合特定命令
  • x:过期事件,当某个键过期并删除时会产生该事件
  • e:驱逐事件,当某个键因maxmemore策略而被删除时,产生该事件
  • A:g$lshzxe的别名,因此AKE意味着所有事件

订阅者介绍

  • onMessage: 收到消息回调
  • onSubscribe: 订阅频道(channel)
  • onUnsubscribe: 取消订阅频道(channel)
  • onPMessage: 收到消息回调-p模式
  • onPSubscribe: 订阅频道(channel)p模式
  • onPUnsubscribe: 取消订阅频道(channel)p模式

带P的就是可以在订阅的时候支持表达式, 一次性订阅多个频道,

例如:

__keyevent@*__:expired ```

其中的*标识订阅所有db的key过期事件

二、在pom文件中引入需要的redis依赖

        <!--添加redis依赖-->
         <dependency>
           <groupId>org.springframework.data</groupId>
           <artifactId>spring-data-redis</artifactId>
           <version>1.8.4.RELEASE</version>
         </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

三、编写基本配置文件 redis.properties

redis.hostName=192.168.6.138
redis.port=6379
redis.password=123321
# 连接超时时间
redis.timeout=10000

#最大空闲数
redis.maxIdle=300
#控制一个pool可分配多少个jedis实例,用来替换上面的redis.maxActive,如果是jedis 2.4以后用该属性
redis.maxTotal=1000
#最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。
redis.maxWaitMillis=1000
#连接的最小空闲时间 默认1800000毫秒(30分钟)
redis.minEvictableIdleTimeMillis=300000
#每次释放连接的最大数目,默认3
redis.numTestsPerEvictionRun=1024
#逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
redis.timeBetweenEvictionRunsMillis=30000
#是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个
redis.testOnBorrow=true
#在空闲时检查有效性, 默认false
redis.testWhileIdle=true

四、编写配置类 RedisConfig

package com.lanqiaobei.ssm.yjk.config;


import com.lanqiaobei.ssm.yjk.util.RedisKeyExpirationListener;
//import com.liuyanzhao.ssm.blog.util.RedisLockUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.jcache.config.JCacheConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.nio.charset.StandardCharsets;

@Configuration
@PropertySource("classpath:redis.properties")
public class RedisConfig extends JCacheConfigurerSupport {
    @Autowired
    private Environment environment;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        JedisConnectionFactory fac = new JedisConnectionFactory();
        fac.setHostName(environment.getProperty("redis.hostName"));
        fac.setPort(Integer.parseInt(environment.getProperty("redis.port")));
        fac.setPassword(environment.getProperty("redis.password"));
        fac.setTimeout(Integer.parseInt(environment.getProperty("redis.timeout")));
        fac.getPoolConfig().setMaxIdle(Integer.parseInt(environment.getProperty("redis.maxIdle")));
        fac.getPoolConfig().setMaxTotal(Integer.parseInt(environment.getProperty("redis.maxTotal")));
        fac.getPoolConfig().setMaxWaitMillis(Integer.parseInt(environment.getProperty("redis.maxWaitMillis")));
        fac.getPoolConfig().setMinEvictableIdleTimeMillis(
                Integer.parseInt(environment.getProperty("redis.minEvictableIdleTimeMillis")));
        fac.getPoolConfig()
                .setNumTestsPerEvictionRun(Integer.parseInt(environment.getProperty("redis.numTestsPerEvictionRun")));
        fac.getPoolConfig().setTimeBetweenEvictionRunsMillis(
                Integer.parseInt(environment.getProperty("redis.timeBetweenEvictionRunsMillis")));
        fac.getPoolConfig().setTestOnBorrow(Boolean.parseBoolean(environment.getProperty("redis.testOnBorrow")));
        fac.getPoolConfig().setTestWhileIdle(Boolean.parseBoolean(environment.getProperty("redis.testWhileIdle")));
        return fac;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory){
        // 创建RedisTemplate对象
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 设置连接工厂
        template.setConnectionFactory(connectionFactory);
        // 创建JSON序列化工具
        GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
        // 设置Key的序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(StandardCharsets.UTF_8);
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        // 设置Value的序列化
        template.setValueSerializer(jsonRedisSerializer);
        template.setHashValueSerializer(jsonRedisSerializer);
        template.setDefaultSerializer(stringRedisSerializer);
        // 返回
        return template;
    }
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory redisConnectionFactory,
            RedisKeyExpirationListener redisKeyExpirationListener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);

        // 监听 __keyevent@0__:expired 频道,这里的0指数据库编号为 0;
        container.addMessageListener(redisKeyExpirationListener,
                new PatternTopic("__keyevent@0__:expired"));

        return container;
    }

    @Bean
    public RedisKeyExpirationListener redisKeyExpirationListener() {
        return new RedisKeyExpirationListener();
    }
    // 其他 Bean 定义
}

五、实现监听类 RedisKeyExpirationListener

package com.lanqiaobei.ssm.yjk.util;

import cn.hutool.core.util.StrUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static cn.hutool.core.util.IdUtil.randomUUID;
@Component
public class RedisKeyExpirationListener implements MessageListener {
    @Autowired
    private RedisTemplate<String,Object> redisTemplate;
    @Autowired
    private PublisherMQ publisherMQ;

    //分布式锁过期时间 s  可以根据业务自己调节
    private static final Long LOCK_REDIS_TIMEOUT = 2000L;

    @Override
    public void onMessage(Message message, byte[] pattern) {

        // 获取过期的 Key,需要利用byte[]录入和接收,不然会出现中文乱码
        byte[] body = message.getBody();
        String allKey = redisTemplate.getStringSerializer().deserialize(body);
        String expiredKey = StrUtil.removePrefix(allKey, "todo:");//hutool工具里面去掉首位字符
//        System.out.println(expiredKey);

        // 处理相应的业务逻辑
//————————————————————————————————————————————————————————
        String key = "todolock:"+expiredKey;
        String value = randomUUID();
        //redis尝试获取锁,加锁
        Boolean getLock = getLock(key,value);

        if(getLock){
            publisherMQ.sendMessage("lqb.direct","queueLQBKey",expiredKey);
            releaseLock(key,value);
        }
    }

    /**
     *  加锁
     **/
    public Boolean getLock(String key,String value){
        Boolean lockStatus = redisTemplate.opsForValue().setIfAbsent(key,value);
        if (lockStatus) {
            System.out.println("Set key-value successfully!");
            redisTemplate.expire(key, LOCK_REDIS_TIMEOUT, TimeUnit.MILLISECONDS);//毫秒级
        } else {
            System.out.println("Key already exists!");
        }
        return lockStatus;
    }
    /**
     *  释放锁
     **/
    public Long releaseLock(String key,String value){
        String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        RedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript,Long.class);
        Long releaseStatus = (Long)this.redisTemplate.execute(redisScript, Collections.singletonList(key),value);
        return releaseStatus;
    }
}

监听redis过期消息提醒,同一个数据(键)过期会有多次通知提醒。原因是:可能是由于 Redis 的主从复制或者分片集群等机制导致的。在主从复制或者分片集群中,可能会发生多个节点同时订阅了相同的键空间通知,从而导致同一个键空间事件被多次触发。
我的解决方法是:给键过期后提醒的回调函数加锁,收到多个通知提醒,回调函数加锁后最终只会有一个执行,其他没有获得锁的回调不会执行,这样就避免了重复执行任务代码。

这里的实现方法在另一个文章:
https://blog.csdn.net/m0_46652188/article/details/130394484

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。