关于使用Redisson订阅数问题

 更新时间:2022年01月14日 11:11:05   作者:玉树临枫  
本文主要介绍了关于使用Redisson订阅数问题,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

一、前提

最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize 的大小不够,需要提高配置才能解决。

二、源码分析

下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:

1、RedissonLock#lock() 方法

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
? ? ? ? long threadId = Thread.currentThread().getId();
? ? ? ? // 尝试获取,如果ttl == null,则表示获取锁成功
? ? ? ? Long ttl = tryAcquire(leaseTime, unit, threadId);
? ? ? ? // lock acquired
? ? ? ? if (ttl == null) {
? ? ? ? ? ? return;
? ? ? ? }

? ? ? ? // 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
? ? ? ? RFuture<RedissonLockEntry> future = subscribe(threadId);
? ? ? ? if (interruptibly) {
? ? ? ? ? ? commandExecutor.syncSubscriptionInterrupted(future);
? ? ? ? } else {
? ? ? ? ? ? commandExecutor.syncSubscription(future);
? ? ? ? }

? ? ? ? // 后面代码忽略
? ? ? ? try {
? ? ? ? ? ? // 无限循环获取锁,直到获取锁成功
? ? ? ? ? ? // ...
? ? ? ? } finally {
? ? ? ? ? ? // 取消订阅锁释放事件
? ? ? ? ? ? unsubscribe(future, threadId);
? ? ? ? }
}

总结下主要逻辑:

  • 获取当前线程的线程id;
  • tryAquire尝试获取锁,并返回ttl
  • 如果ttl为空,则结束流程;否则进入后续逻辑;
  • this.subscribe(threadId)订阅当前线程,返回一个RFuture;
  • 如果在指定时间没有监听到,则会产生如上异常。
  • 订阅成功后, 通过while(true)循环,一直尝试获取锁
  • fially代码块,会解除订阅

所以上述这情况问题应该出现在subscribe()方法中

2、详细看下subscribe()方法

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
? ? // entryName 格式:“id:name”;
? ? // channelName 格式:“redisson_lock__channel:name”;
? ? return pubSub.subscribe(getEntryName(), getChannelName());
}

RedissonLock#pubSub 是在RedissonLock构造函数中初始化的:

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
? ? // ....
? ? this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

而subscribeService在MasterSlaveConnectionManager的实现中又是通过如下方式构造的

public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
? ? this(config, id);
? ? this.config = cfg;

? ? // 初始化
? ? initTimer(cfg);
? ? initSingleEntry();
}

protected void initTimer(MasterSlaveServersConfig config) {
? ? int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
? ? Arrays.sort(timeouts);
? ? int minTimeout = timeouts[0];
? ? if (minTimeout % 100 != 0) {
? ? ? ? minTimeout = (minTimeout % 100) / 2;
? ? } else if (minTimeout == 100) {
? ? ? ? minTimeout = 50;
? ? } else {
? ? ? ? minTimeout = 100;
? ? }

? ? timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);

? ? connectionWatcher = new IdleConnectionWatcher(this, config);

? ? // 初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:
? ? subscribeService = new PublishSubscribeService(this, config);
}

PublishSubscribeService构造函数

private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
? ? super();
? ? this.connectionManager = connectionManager;
? ? this.config = config;
? ? for (int i = 0; i < locks.length; i++) {
? ? ? ? // 这里初始化了一组信号量,每个信号量的初始值为1
? ? ? ? locks[i] = new AsyncSemaphore(1);
? ? }
}

3、回到subscribe()方法主要逻辑还是交给了 LockPubSub#subscribe()里面

private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();

public RFuture<E> subscribe(String entryName, String channelName) {
? ? ? // 从PublishSubscribeService获取对应的信号量。 相同的channelName获取的是同一个信号量
? ? ?// public AsyncSemaphore getSemaphore(ChannelName channelName) {
? ? // ? ?return locks[Math.abs(channelName.hashCode() % locks.length)];
? ? // }
? ? AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));

? ? AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); ? ?
? ? RPromise<E> newPromise = new RedissonPromise<E>() {
? ? ? ? @Override
? ? ? ? public boolean cancel(boolean mayInterruptIfRunning) {
? ? ? ? ? ? return semaphore.remove(listenerHolder.get());
? ? ? ? }
? ? };

? ? Runnable listener = new Runnable() {

? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? // ?如果存在RedissonLockEntry, 则直接利用已有的监听
? ? ? ? ? ? E entry = entries.get(entryName);
? ? ? ? ? ? if (entry != null) {
? ? ? ? ? ? ? ? entry.acquire();
? ? ? ? ? ? ? ? semaphore.release();
? ? ? ? ? ? ? ? entry.getPromise().onComplete(new TransferListener<E>(newPromise));
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }

? ? ? ? ? ? E value = createEntry(newPromise);
? ? ? ? ? ? value.acquire();

? ? ? ? ? ? E oldValue = entries.putIfAbsent(entryName, value);
? ? ? ? ? ? if (oldValue != null) {
? ? ? ? ? ? ? ? oldValue.acquire();
? ? ? ? ? ? ? ? semaphore.release();
? ? ? ? ? ? ? ? oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }

? ? ? ? ? ? // 创建监听,
? ? ? ? ? ? RedisPubSubListener<Object> listener = createListener(channelName, value);
? ? ? ? ? ? // 订阅监听
? ? ? ? ? ? service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
? ? ? ? }
? ? };

? ? // 最终会执行listener.run方法
? ? semaphore.acquire(listener);
? ? listenerHolder.set(listener);

? ? return newPromise;
}

AsyncSemaphore#acquire()方法

public void acquire(Runnable listener) {
? ? acquire(listener, 1);
}

public void acquire(Runnable listener, int permits) {
? ? boolean run = false;

? ? synchronized (this) {
? ? ? ? // counter初始化值为1
? ? ? ? if (counter < permits) {
? ? ? ? ? ? // 如果不是第一次执行,则将listener加入到listeners集合中
? ? ? ? ? ? listeners.add(new Entry(listener, permits));
? ? ? ? ? ? return;
? ? ? ? } else {
? ? ? ? ? ? counter -= permits;
? ? ? ? ? ? run = true;
? ? ? ? }
? ? }

? ? // 第一次执行acquire, 才会执行listener.run()方法
? ? if (run) {
? ? ? ? listener.run();
? ? }
}

梳理上述逻辑:

1、从PublishSubscribeService获取对应的信号量, 相同的channelName获取的是同一个信号量
2、如果是第一次请求,则会立马执行listener.run()方法, 否则需要等上个线程获取到该信号量执行完方能执行;
3、如果已经存在RedissonLockEntry, 则利用已经订阅就行
4、如果不存在RedissonLockEntry, 则会创建新的RedissonLockEntry,然后进行。

从上面代码看,主要逻辑是交给了PublishSubscribeService#subscribe方法

4、PublishSubscribeService#subscribe逻辑如下:

private final ConcurrentMap<ChannelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();

public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
? ? RPromise<PubSubConnectionEntry> promise = new RedissonPromise<PubSubConnectionEntry>();
? ? // 主要逻辑入口, 这里要主要channelName每次都是新对象, 但内部覆写hashCode+equals。
? ? subscribe(codec, new ChannelName(channelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
? ? return promise;
}

private void subscribe(Codec codec, ChannelName channelName, ?RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {

? ? PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
? ? if (connEntry != null) {
? ? ? ? // 从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中
? ? ? ? addListeners(channelName, promise, type, lock, connEntry, listeners);
? ? ? ? return;
? ? }

? ? // 没有时,才是最重要的逻辑
? ? freePubSubLock.acquire(new Runnable() {

? ? ? ? @Override
? ? ? ? public void run() {
? ? ? ? ? ? if (promise.isDone()) {
? ? ? ? ? ? ? ? lock.release();
? ? ? ? ? ? ? ? freePubSubLock.release();
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }

? ? ? ? ? ? // 从队列中取头部元素
? ? ? ? ? ? PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
? ? ? ? ? ? if (freeEntry == null) {
? ? ? ? ? ? ? ? // 第一次肯定是没有的需要建立
? ? ? ? ? ? ? ? connect(codec, channelName, promise, type, lock, listeners);
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }

? ? ? ? ? ? // 如果存在则尝试获取,如果remainFreeAmount小于0则抛出异常终止了。
? ? ? ? ? ? int remainFreeAmount = freeEntry.tryAcquire();
? ? ? ? ? ? if (remainFreeAmount == -1) {
? ? ? ? ? ? ? ? throw new IllegalStateException();
? ? ? ? ? ? }

? ? ? ? ? ? PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
? ? ? ? ? ? if (oldEntry != null) {
? ? ? ? ? ? ? ? freeEntry.release();
? ? ? ? ? ? ? ? freePubSubLock.release();

? ? ? ? ? ? ? ? addListeners(channelName, promise, type, lock, oldEntry, listeners);
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }

? ? ? ? ? ? // 如果remainFreeAmount=0, 则从队列中移除
? ? ? ? ? ? if (remainFreeAmount == 0) {
? ? ? ? ? ? ? ? freePubSubConnections.poll();
? ? ? ? ? ? }
? ? ? ? ? ? freePubSubLock.release();

? ? ? ? ? ? // 增加监听
? ? ? ? ? ? RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, freeEntry, listeners);

? ? ? ? ? ? ChannelFuture future;
? ? ? ? ? ? if (PubSubType.PSUBSCRIBE == type) {
? ? ? ? ? ? ? ? future = freeEntry.psubscribe(codec, channelName);
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? future = freeEntry.subscribe(codec, channelName);
? ? ? ? ? ? }

? ? ? ? ? ? future.addListener(new ChannelFutureListener() {
? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? public void operationComplete(ChannelFuture future) throws Exception {
? ? ? ? ? ? ? ? ? ? if (!future.isSuccess()) {
? ? ? ? ? ? ? ? ? ? ? ? if (!promise.isDone()) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? subscribeFuture.cancel(false);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? ? ? connectionManager.newTimeout(new TimerTask() {
? ? ? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? ? ? public void run(Timeout timeout) throws Exception {
? ? ? ? ? ? ? ? ? ? ? ? ? ? subscribeFuture.cancel(false);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }, config.getTimeout(), TimeUnit.MILLISECONDS);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? }

? ? });
}


private void connect(Codec codec, ChannelName channelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
? ? // 根据channelName计算出slot获取PubSubConnection
? ? int slot = connectionManager.calcSlot(channelName.getName());
? ? RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
? ? promise.onComplete((res, e) -> {
? ? ? ? if (e != null) {
? ? ? ? ? ? ((RPromise<RedisPubSubConnection>) connFuture).tryFailure(e);
? ? ? ? }
? ? });


? ? connFuture.onComplete((conn, e) -> {
? ? ? ? if (e != null) {
? ? ? ? ? ? freePubSubLock.release();
? ? ? ? ? ? lock.release();
? ? ? ? ? ? promise.tryFailure(e);
? ? ? ? ? ? return;
? ? ? ? }

? ? ? ? // 这里会从配置中读取subscriptionsPerConnection
? ? ? ? PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
? ? ? ? // 每获取一次,subscriptionsPerConnection就会减直到为0
? ? ? ? int remainFreeAmount = entry.tryAcquire();

? ? ? ? // 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中
? ? ? ? PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
? ? ? ? if (oldEntry != null) {
? ? ? ? ? ? releaseSubscribeConnection(slot, entry);

? ? ? ? ? ? freePubSubLock.release();

? ? ? ? ? ? addListeners(channelName, promise, type, lock, oldEntry, listeners);
? ? ? ? ? ? return;
? ? ? ? }


? ? ? ? if (remainFreeAmount > 0) {
? ? ? ? ? ? // 加入到队列中
? ? ? ? ? ? freePubSubConnections.add(entry);
? ? ? ? }
? ? ? ? freePubSubLock.release();

? ? ? ? RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);

? ? ? ? // 这里真正的进行订阅(底层与redis交互)
? ? ? ? ChannelFuture future;
? ? ? ? if (PubSubType.PSUBSCRIBE == type) {
? ? ? ? ? ? future = entry.psubscribe(codec, channelName);
? ? ? ? } else {
? ? ? ? ? ? future = entry.subscribe(codec, channelName);
? ? ? ? }

? ? ? ? future.addListener(new ChannelFutureListener() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void operationComplete(ChannelFuture future) throws Exception {
? ? ? ? ? ? ? ? if (!future.isSuccess()) {
? ? ? ? ? ? ? ? ? ? if (!promise.isDone()) {
? ? ? ? ? ? ? ? ? ? ? ? subscribeFuture.cancel(false);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? connectionManager.newTimeout(new TimerTask() {
? ? ? ? ? ? ? ? ? ? @Override
? ? ? ? ? ? ? ? ? ? public void run(Timeout timeout) throws Exception {
? ? ? ? ? ? ? ? ? ? ? ? subscribeFuture.cancel(false);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }, config.getTimeout(), TimeUnit.MILLISECONDS);
? ? ? ? ? ? }
? ? ? ? });
? ? });
}

PubSubConnectionEntry#tryAcquire方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:

?public int tryAcquire() {
????while (true) {
????????int value = subscribedChannelsAmount.get();
????????if (value == 0) {
????????????return -1;
????????}

????????if (subscribedChannelsAmount.compareAndSet(value, value - 1)) {
????????????return value - 1;
????????}
????}
}

梳理上述逻辑:

1、还是进行重复判断, 根据channelName从name2PubSubConnection中获取,看是否存在已经订阅:PubSubConnectionEntry; 如果存在直接把新的listener加入到PubSubConnectionEntry。
2、从队列freePubSubConnections中取公用的PubSubConnectionEntry, 如果没有就进入connect()方法

2.1 会根据subscriptionsPerConnection创建PubSubConnectionEntry, 然后调用其tryAcquire()方法 - 每调用一次就会减1
2.2 将新的PubSubConnectionEntry放入全局的name2PubSubConnection, 方便后续重复使用;
2.3 同时也将PubSubConnectionEntry放入队列freePubSubConnections中。- remainFreeAmount > 0
2.4 后面就是进行底层的subscribe和addListener

3、如果已经存在PubSubConnectionEntry,则利用已有的PubSubConnectionEntry进行tryAcquire;
4、如果remainFreeAmount < 0 会抛出IllegalStateException异常;如果remainFreeAmount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接
5、最后也是进行底层的subscribe和addListener;

三 总结

根因: 从上面代码分析, 导致问题的根因是因为PublishSubscribeService 会使用公共队列中的freePubSubConnections, 如果同一个key一次性请求超过subscriptionsPerConnection它的默认值5时,remainFreeAmount就可能出现-1的情况, 那么就会导致commandExecutor.syncSubscription(future)中等待超时,也就抛出如上异常Subscribe timeout: (7500ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.

解决方法: 在初始化Redisson可以可指定这个配置项的值。

相关参数的解释以及默认值请参考官网:https://github.com/redisson/redisson/wiki/2.-Configuration#23-common-settings

到此这篇关于关于使用Redisson订阅数问题的文章就介绍到这了,更多相关Redisson 订阅数 内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • RedisDesktopManager无法远程连接Redis的完美解决方法

    RedisDesktopManager无法远程连接Redis的完美解决方法

    下载RedisDesktopManager客户端,输入服务器IP地址,端口(缺省值:6379);点击Test Connection按钮测试连接,连接失败,怎么回事呢?下面小编给大家带来了RedisDesktopManager无法远程连接Redis的完美解决方法,一起看看吧
    2018-03-03
  • Redis源码环境构建过程详解

    Redis源码环境构建过程详解

    这篇文章主要介绍了Redis源码环境构建过程,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-07-07
  • redis数据类型_动力节点Java学院整理

    redis数据类型_动力节点Java学院整理

    这篇文章主要介绍了redis数据类型,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-08-08
  • redis中数据类型命令整理

    redis中数据类型命令整理

    在本篇文章里小编给大家整理的是关于redis中5种数据类型基本命令介绍,需要的朋友们可以学习下。
    2020-03-03
  • 基于Redis实现分布式单号及分布式ID(自定义规则生成)

    基于Redis实现分布式单号及分布式ID(自定义规则生成)

    一些业务背景下,业务要求单号需要有区分不同的前缀,那么在分布式的架构下如何自定义单号而且还能保证唯一呢?本文就来详细的介绍一下
    2021-09-09
  • 关于Redis未授权访问漏洞利用的介绍与修复建议

    关于Redis未授权访问漏洞利用的介绍与修复建议

    Redis是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API,下面这篇文章主要给大家介绍了关于Redis未授权访问漏洞利用的介绍和修复建议,文中介绍的非常详细,需要的朋友可以参考下。
    2017-07-07
  • redis禁止几个危险命令的方法

    redis禁止几个危险命令的方法

    今天小编就为大家分享一篇redis禁止几个危险命令的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-05-05
  • Redis教程(二):String数据类型

    Redis教程(二):String数据类型

    这篇文章主要介绍了Redis教程(二):String数据类型,本文讲解了String数据类型概述、相关命令列表、命令使用示例三部分内容,需要的朋友可以参考下
    2015-04-04
  • Windows下Redis的安装使用教程

    Windows下Redis的安装使用教程

    这篇文章主要以图文结合的方式为大家详细介绍了Windows下Redis的安装使用,Redis的出现,很大程度补偿了memcached这类key/value存储的不足,在部分场合可以对关系数据库起到很好的补充作用,对Redis感兴趣的小伙伴们可以参考一下
    2016-05-05
  • redis中的数据结构和编码详解

    redis中的数据结构和编码详解

    本文主要和大家分享几种Redis数据结构详解,希望文中的案例和代码,能帮助到大家。
    2020-03-03

最新评论