redisson中连接对象创建及断线重连 – i flym

您所在的位置:网站首页 redission连接池不释放 redisson中连接对象创建及断线重连 – i flym

redisson中连接对象创建及断线重连 – i flym

#redisson中连接对象创建及断线重连 – i flym| 来源: 网络整理| 查看: 265

在redisson中,由于使用了netty来封装对redis的协议访问,因此对于连接对象的创建和释放,也借由相应的connection来实现。由篇由masterSlave的角度,描述整个redisson中对于client的封装,以及在网络中断情况下,客户端会得到什么样的反馈,如何实现重连的情况。

本篇主要介绍master slave情况下各个对象的关系图信息,以及在具体创建时的一些处理问题.Redisson版本:2.1.1

1 对象关系图

整个关系如上所示,由下图进行描述

由MasterSlaveServersConfig负责配置连接的各项参数,比如master地址,slave地址,连接数大小等,这个对象是在redision创建时,由Config对象负责创建的 在调用Redission.create时,创建起相应的connectionManager对象,其持有相应的master连接信息,以及相应slave的连接信息 相应的connectionManager负责创建卢相应masterEntry以及slaveEntry信息,并且保存相应的映射信息 connectionManager负责当前客户端对于具体服务器端的各项配置以下,比如转码器,连接池,使用的各项协议等,其根据这些信息,使用netty创建起相应的redisClient对象 entry将已经创建好的redisClient交由connectionEntry负责持有,因此这里的client仅表示一个特有的客户端连接信息。在初始化时并不自动创建相应的连接 entry因此持有相应的client,因此也自然根据当前的需要创建出相应的connection对象,这里entry的创建工作交由client负责,同时将创建好的connection管理起来

2 代码的创建过程

这里通过一个具体的API调用来描述创建过程,选择的API为 RedissonList.size.先列出相应的调用代码

1 //size方法,发起sizeAsync异步调用 public int size() { return get(sizeAsync()); } 2 //sizeAsync,使用命令执行器,发起 LLEN的异步调用 public Future sizeAsync() { return commandExecutor.readAsync(getName(), LLEN, getName()); } 3 //异步调用,先计算具体用于连接的桶(用于集群调用),最终落到具体的客户端中 public Future readAsync(String key, Codec codec, RedisCommand command, Object ... params) { Promise mainPromise = connectionManager.newPromise(); int slot = connectionManager.calcSlot(key); async(true, slot, null, codec, command, params, mainPromise, 0); ...... } 4 //执行具体的异步调用逻辑 protected void async(final boolean readOnlyMode, final int slot, final MultiDecoder messageDecoder, final Codec codec, final RedisCommand command, final Object[] params, final Promise mainPromise, final int attempt) { ...... org.redisson.client.RedisConnection connection; //创建起相应的对象 connection = connectionManager.connectionWriteOp(slot); ChannelFuture future = connection.send(new CommandData(attemptPromise, messageDecoder, codec, command, params)); ...... //注册调用成功回调,以释放连接 attemptPromise.addListener(connectionManager.createReleaseWriteListener(slot, connection, timeout)); ...... attemptPromise.addListener(new FutureListener() { public void operationComplete(Future future) throws Exception { //成功回调,设置相应的值 mainPromise.setSuccess(future.getNow()); }); } 5 //这里回到创建写连接的过程,通过定位到具体的entry,返回相应的连接对象, //这里即connection维护了相应entry对象,反向由entry来获取相应的连接 public RedisConnection connectionWriteOp(int slot) { MasterSlaveEntry e = getEntry(slot); return e.connectionWriteOp(); } 6 //获取写连接,先尝试从池中获取,不然就创建新的(这里受连接数限制,由Semaphore信号量控制) public RedisConnection connectionWriteOp() { ..... RedisConnection conn = masterEntry.getConnections().poll(); //这里的masterEntry即,connectionEntry,转由connectionEntry来创建新连接 return masterEntry.connect(config); } 7 //connectionEntry创建connection对象,其实这里就将相应的工作转由client来进行,同时负责持有相应connection的引用信息,以形成连接池 public RedisConnection connect(MasterSlaveServersConfig config) { RedisConnection conn = client.connect(); ......//负责绑定database,验证等 return conn; } 8 //具体client创建connection的过程,即将相应的工作交由bootstrap来进行。而bootstrap即netty的一个简单客户端连接工具类 public RedisConnection connect() { try { ChannelFuture future = bootstrap.connect(); future.syncUninterruptibly(); return new RedisConnection(this, future.channel()); } catch (Exception e) { throw new RedisConnectionException("unable to connect", e); } }

经过以上的步骤,整个连接对象的创建即创建起来,并且最终交由connectionEntry持有,其内部的结构如下所示:

//相应的底层连接client private final RedisClient client; //缓存的连接信息 private final Queue connections = new ConcurrentLinkedQueue(); //信号量,用于控制并发连接数信息 private final Semaphore connectionsSemaphore;

可以看出,通过这个类,即可完成连接信息的存储以及创建,获取等过程

3 封装底层netty处理

在上面的第2部分的第8个步骤当中,可以看出。redisson将创建好的通道注册入collection中,那么一个connection通过即持有相应的IO通道信息,通过通道,即可达到发送命令以及接收数据的目的。相应的调用方法如下所示:

//等待,直到获取相应的结果信息 public R await(Future cmd) { if (!cmd.awaitUninterruptibly(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) { throw ex; } return cmd.getNow(); } //一个同步调用,通过在相应的异步方法作作await操作,以获取相应的调用 public R sync(Codec encoder, RedisCommand command, Object ... params) { Future r = async(encoder, command, params); return await(r); } //一个异步调用,典型的promise调用 public Future async(Codec encoder, RedisCommand command, Object ... params) { Promise promise = redisClient.getBootstrap().group().next().newPromise(); send(new CommandData(promise, encoder, command, params)); return promise; } //发送命令,即直接往通道里写相应的数据信息 public ChannelFuture send(CommandData data) { return channel.writeAndFlush(data); }

4    数据处理器

在创建connection时,并没有看到相应的数据处理器的注册操作,原因是redisson已经将整个注册工作提前。即在创建connection之前已经设置了相应的处理器,如下所示:

public RedisClient(EventLoopGroup group, Class


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3