欢迎访问悦橙教程(wld5.com),关注java教程。悦橙教程  java问答|  每日更新
页面导航 : > > 文章正文

Redis 事件订阅在 Spring 中的使用,

来源: javaer 分享于  点击 18772 次 点评:31

Redis 事件订阅在 Spring 中的使用,


Redis 事件订阅功能

Redis 的 key 事件机制允许客户端订阅接收 Redis 数据事件. 可以接收的事件有

  • 所有影响指定 KEY 的命令
  • 所有 KEY 的 LPUSH 操作
  • 所有 KEY 的过期事件

事件是通过 Redis 的 Pub/Sub 层传递的, 因此实现 Pub/Sub 的客户端都可以使用此功能. Redis 的 Pub/Sub 是 即发即弃(fire and forget)模式, 也就是说, 如果 Pub/Sub 客户端断开再重新连接, 那么在断开期间的所有事件都会丢失. 所以这个功能不适合需要可靠的事件通知的场景.

Redis 事件的命令实现

Redis 开启键空间通知

两种方式

  • redis-client 登录后通过命令设置开启 config set notify-keyspace-events Ex, 服务重启后失效
  • 编辑配置文件 redis.conf, 设置 notify-keyspace-events "Ex", 之后重启服务生效

客户端订阅键空间通知事件

注意 keyevent@0 前后是两个下划线

  • psubscribe __keyevent@*__:*: @后面的第一个*位置表示 Redis 的 0-15个DB, *表示所有DB, 第二个 * 的位置表示要监听的事件, *表示所有事件, 如果要指定过期事件, 可以改为 expired.
  • psubscribe __keyevent@0__:expired: 订阅 DB0 的所有key过期事件

订阅

./redis-cli -h 192.168.50.128
192.168.50.128:6379> psubscribe __keyevent@0__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@0__:expired"
3) (integer) 1

设置一个key和超时时间

192.168.50.128:6379> setex name 10 Kitty

基于key过期事件实现延时队列

Redis 过期事件传的是 key, 是不包含value的, 如果使用redis过期事件机制实现延时队列, key和value 需要另存一份

  • 存 key value
  • 存 key, 设置过期时间, 相当于过期队列
  • 客户端订阅键过期通知
  • 客户端收到 key 过期事件后, 根据 key 读取 value, 处理业务逻辑

注意

  • key 过期通知事件是在 key 被删除时触发的
  • 在 key 过期前如果主动删除该 key 不会触发过期通知事件.

spring-data-redis 的事件机制

在分布式系统中, 因为 redis 一般会用于跨模块的缓存和临时数据, 因此可以通过 redis 实现分布式的消息传递

spring-data-redis 内建两个 listener, 分别是

  • KeyExpirationEventMessageListener 监听key的过期事件
  • KeyspaceEventMessageListener 监听key的操作事件

监听过期事件

KeyExpirationEventMessageListener 默认监听的是所有key的expired事件

private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");

protected void doRegister(RedisMessageListenerContainer listenerContainer) {
    listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
}
  • 通过扩展这个类自定义监听器可以省掉将监听器添加到 RedisMessageListenerContainer 的步骤, 因为在 doRegister() 中已经做了添加到容器的处理
  • addMessageListener 时需要指定topic, __keyevent@*__:expired表示监听所有数据库中所有key的 expired 事件, 可以自定义自己需要监听的事件, 例如
    • __keyevent@0__:expired 仅监听 db0 的过期事件
    • __keyevent@0__:del 仅监听 db0 的删除事件

可以自定义扩展类中覆盖这个方法, 监听特定key, 以及对消息中的key进行业务处理

@Component
public class CustomKeyExpirationListener extends KeyExpirationEventMessageListener {
 
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    
    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, new PatternTopic("__keyevent@0__:expired"));
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 得到过期key
        String expiredKey = message.toString();
        // 业务处理
    }

监听key操作

KeyspaceEventMessageListener 在初始化时, 会对发布配置进行调整, 并将自己注册到容器中

private String keyspaceNotificationsConfigParameter = "EA";

public void init() {
    if (StringUtils.hasText(this.keyspaceNotificationsConfigParameter)) {
        RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();

        try {
            Properties config = connection.getConfig("notify-keyspace-events");
            if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
                connection.setConfig("notify-keyspace-events", this.keyspaceNotificationsConfigParameter);
            }
        } finally {
            connection.close();
        }
    }
    this.doRegister(this.listenerContainer);
}

默认监听所有key的所有事件

private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");

protected void doRegister(RedisMessageListenerContainer container) {
    this.listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
}

可以在扩展这个监听器时, 可以通过重写 doRegister 自定义需要监听的事件类型

public class KeyDeleteEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
    private static final Topic KEYEVENT_DELETE_TOPIC = new PatternTopic("__keyevent@*__:del");

    public KeyDeleteEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
 
    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, KEYEVENT_DELETE_TOPIC);
    }
 
    protected void doHandleMessage(Message message) {
        // 业务处理
    }
}
相关栏目:

用户点评