Redis 事件订阅在 Spring 中的使用,
分享于 点击 18772 次 点评:31
Redis 事件订阅在 Spring 中的使用,
Redis 事件订阅功能
Redis 的 key 事件机制允许客户端订阅接收 Redis 数据事件. 可以接收的事件有
- 所有影响指定 KEY 的命令
- 所有 KEY 的 LPUSH 操作
- 所有 KEY 的过期事件
事件是通过 Redis 的 Pub/Sub 层传递的, 因此实现 Pub/Sub 的客户端都可以使用此功能. Redis 的 Pub/Sub 是 即发即弃(fire and forget)模式, 也就是说, 如果 Pub/Sub 客户端断开再重新连接, 那么在断开期间的所有事件都会丢失. 所以这个功能不适合需要可靠的事件通知的场景.
Redis 事件的命令实现
Redis 开启键空间通知
两种方式
- redis-client 登录后通过命令设置开启
config set notify-keyspace-events Ex
, 服务重启后失效 - 编辑配置文件 redis.conf, 设置
notify-keyspace-events "Ex"
, 之后重启服务生效
客户端订阅键空间通知事件
注意 keyevent@0
前后是两个下划线
psubscribe __keyevent@*__:*
:@
后面的第一个*
位置表示 Redis 的 0-15个DB,*
表示所有DB, 第二个*
的位置表示要监听的事件,*
表示所有事件, 如果要指定过期事件, 可以改为expired
.psubscribe __keyevent@0__:expired
: 订阅 DB0 的所有key过期事件
订阅
./redis-cli -h 192.168.50.128
192.168.50.128:6379> psubscribe __keyevent@0__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@0__:expired"
3) (integer) 1
设置一个key和超时时间
192.168.50.128:6379> setex name 10 Kitty
基于key过期事件实现延时队列
Redis 过期事件传的是 key, 是不包含value的, 如果使用redis过期事件机制实现延时队列, key和value 需要另存一份
- 存 key value
- 存 key, 设置过期时间, 相当于过期队列
- 客户端订阅键过期通知
- 客户端收到 key 过期事件后, 根据 key 读取 value, 处理业务逻辑
注意
- key 过期通知事件是在 key 被删除时触发的
- 在 key 过期前如果主动删除该 key 不会触发过期通知事件.
spring-data-redis 的事件机制
在分布式系统中, 因为 redis 一般会用于跨模块的缓存和临时数据, 因此可以通过 redis 实现分布式的消息传递
spring-data-redis 内建两个 listener, 分别是
- KeyExpirationEventMessageListener 监听key的过期事件
- KeyspaceEventMessageListener 监听key的操作事件
监听过期事件
KeyExpirationEventMessageListener 默认监听的是所有key的expired事件
private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
}
- 通过扩展这个类自定义监听器可以省掉将监听器添加到 RedisMessageListenerContainer 的步骤, 因为在 doRegister() 中已经做了添加到容器的处理
- addMessageListener 时需要指定topic,
__keyevent@*__:expired
表示监听所有数据库中所有key的 expired 事件, 可以自定义自己需要监听的事件, 例如__keyevent@0__:expired
仅监听 db0 的过期事件__keyevent@0__:del
仅监听 db0 的删除事件
可以自定义扩展类中覆盖这个方法, 监听特定key, 以及对消息中的key进行业务处理
@Component
public class CustomKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, new PatternTopic("__keyevent@0__:expired"));
}
@Override
public void onMessage(Message message, byte[] pattern) {
// 得到过期key
String expiredKey = message.toString();
// 业务处理
}
监听key操作
KeyspaceEventMessageListener 在初始化时, 会对发布配置进行调整, 并将自己注册到容器中
private String keyspaceNotificationsConfigParameter = "EA";
public void init() {
if (StringUtils.hasText(this.keyspaceNotificationsConfigParameter)) {
RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();
try {
Properties config = connection.getConfig("notify-keyspace-events");
if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
connection.setConfig("notify-keyspace-events", this.keyspaceNotificationsConfigParameter);
}
} finally {
connection.close();
}
}
this.doRegister(this.listenerContainer);
}
默认监听所有key的所有事件
private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");
protected void doRegister(RedisMessageListenerContainer container) {
this.listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
}
可以在扩展这个监听器时, 可以通过重写 doRegister 自定义需要监听的事件类型
public class KeyDeleteEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
private static final Topic KEYEVENT_DELETE_TOPIC = new PatternTopic("__keyevent@*__:del");
public KeyDeleteEventMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
protected void doRegister(RedisMessageListenerContainer listenerContainer) {
listenerContainer.addMessageListener(this, KEYEVENT_DELETE_TOPIC);
}
protected void doHandleMessage(Message message) {
// 业务处理
}
}
用户点评