首页 技术 正文
技术 2022年11月12日
0 收藏 611 点赞 4,625 浏览 9904 个字

前言

  前几天在学习Redis Cluster 模式的时候,突然想到如果把它的集群模式应用在T-io上也是挺有意思的一件事情。

Redis 集群简介

   Redis Cluster 中有 N 台实例,每个实例负责部分 Slot,总共有 16384 个Slot,然后客户端连接的时候,需要根据操作的Key计算出所在的Slot和服务实例地址,然后直接执行或者返回MOVE命令等。实例之间的元数据更新使用Gossip协议。简单一张图了解一下:

Tio集群

   我设计的很简单,就是一个多实例集群,没有主从关系,集群之间通过 伪Gossip(因为我也不知道咋实现,┭┮﹏┭┮)协议通讯。我的设计思路是这样的。

从图中可以看出,每个实例会和集群中的其他的某几个实例相连,通过信息扩散的方式达到最终集群的完整性。比如A只知道B的存在,但是它最终会根据B知道C的存在。或者C 根据B知道A的存在。

实现原理

每个实例在启动之后会加载集群的其他某几个节点。然后上线之后通过 ClientChannelContext 与另外的集群节点进行通讯。模仿Redis设计了如下几个命令:

MEET

在一个实例上线之后,会发送 MEET 命令给其他已知实例。其他实例收到命令之后回复PONG命令。

PING

实例会定期在实例列表中选择若干实例发送PING命令,对方实例回复PONG命令,交换信息

PONG

在接收到 MEET,PING命令之后回复PONG命令

FAIL

在创建 ClientChannelContext对象之后,可以通过ClientAioListener获取连接回调,如果连接失败,那么实例将向其他在线实例发送FAIL命令,当本节点收到的FAILE命令数大于节点总数的一半时,认为该节点确实已经宕掉了,那么在执行UNAVAILABLE命令

UNAVAILABLE

如果说FAIL命令是病危通知书,那么 UNAVAILABLE 就是 确认死亡。

代码设计实现

TioClusterServer

继承自 TioServer,新增start方法,增加其他集群业务,例如初始化集群上下文信息

ClusterServerTioConfig

继承自 ServerTioConfig,为了约束客户端传参类型。

TioClusterNode

集群节点,包含 IP,Port,创建时间,状态 等信息,同时负责创建自己的ClientChannelContext

TioClusterContext

集群上下文,负责集群节点的创建,集群数据更新工作,同时负责调度各个节点的事件处理。

ClusterCommandExecutor

集群命令发送执行器,负责将各个命令发送出去

ClusterAioHandler,ClusterAioListener,ClusterServerAioHandler,ClusterServerAioListener

这几个类大家就很熟悉了,就是用于编解码的处理类。

集群启动流程

使用Gossip协议有个缺点,集群上线之后,可能会有很大的延迟发现。实例越多,延迟越大。更何况我自己实现的伪 Gossip 协议,暂且不提。

代码解析

在集群上下文中,维护了一个节点的集合,节点的增删改查都是通过它来实现。

  /**
* 当前集群的所有节点
*/
private ConcurrentHashMap<String, TioClusterNode> clusterNodes = new ConcurrentHashMap<>(10);

初始化集群

单例,集群实例中只有一个大管家就是它,全局可用。

 /**
* 初始化集群
*/
public TioClusterContext(ClusterCommandExecutor clusterCommandExecutor) {
if (tioClusterContext == null) {
synchronized (this) {
if (tioClusterContext == null) {
this.startTime = SystemTimer.currentTimeMillis();
this.clusterCommandExecutor = clusterCommandExecutor;
this.clusterProperty = TioClusterProperty.getClusterProperty(); initCurrentNode();
initClusterNodes();
tioClusterContext = this;
}
}
}
}

通知节点选择

根据配置文件选择要发送消息的N个节点,原则如下,距离上次更新时间超过配置文件中的超时时间的节点优先。然后轮询选择下一个节点,防止某个节点可能较长时间未被选中(也可以通过时上次更新时间排序解决),达到配置文件的节点数,停止。

 public List<TioClusterNode> selectNotifyNodes(boolean containsCurrent,boolean containsUnavailable) {
List<TioClusterNode> nodes = new LinkedList<>();
TioClusterNode[] nodesInArray = getClusterNodesInArray(containsCurrent);
//根据时间间隔查找下一个节点
for (TioClusterNode node : nodesInArray) {
if ((SystemTimer.currentTimeMillis() - node.getUpdateTime()) >= clusterProperty.getNotifyTimeInterval()
&& nodes.size() < clusterProperty.getNotifyNodesMaxCount()
&& (containsCurrent || !isCurrentNode(node))
&& !nodes.contains(node)
&& (containsUnavailable || !node.isUnAvailable())) {
nodes.add(node);
}
} int maxCount = nodesInArray.length;
int lastCount = (clusterProperty.getNotifyNodesMaxCount() > maxCount ? maxCount : clusterProperty.getNotifyNodesMaxCount()) - nodes.size(); int tryMaxCount = lastCount;
int tryCount = 0;
//很容易出现死循环,做好边界
while (lastCount > 0 && tryCount < tryMaxCount) {
TioClusterNode nextNode = RoundRibbonNodeSelector.nextPingNode(nodesInArray);
if (nextNode != null && !nodes.contains(nextNode)) {
if (containsUnavailable || !nextNode.isUnAvailable()) {
nodes.add(nextNode);
}
lastCount--;
}
tryCount++;
}
return nodes;
}

命令发送

发送命令很简单,就是调用 Tio 的Send方法。

 private void send(TioClusterNode node, Consumer<ChannelContext> consumer){
ClientChannelContext channelContext = node.getClientChannelContext();
if (node.connectionActivated()) {
consumer.accept(channelContext);
}
} @Override
public void meet(TioClusterNode other) {
send(other, channelContext -> Tio.send(channelContext, ClusterPacketBuilder.buildMeetPacket()));
}

命令编码

由于格式固定,所以命令的内容就不直接用 JSON 格式传输,根据命令的传输内容不同,分别组装不同的命令。

  public ByteBuffer build( final ClusterPacket clusterPacket) {
this.clusterPacket = clusterPacket;
byte[] body = body();
ByteBuffer buffer = ByteBuffer.allocate(bufferLength());
//command
buffer.put(clusterPacket.command().getType());
//fromServerLength
buffer.putShort(fromServerLength());
//bodyLength
buffer.putInt(body.length);
//fromServer
buffer.put(fromServerBytes);
buffer.put(body);
return buffer;
}
//此方法由具体的命令类实现,每个命令所传输的内容不同,格式也不同
protected abstract byte[] body();

AioHandler 解耦

由于我们在内部处理集群消息的时候需要用到AioHandler,所以这个是避免不了的,但是用户又有自己的消息要处理,所以,我又增加了一层,将ClusterServerAioHandler作为抽象类,增加若干抽象方法,然后具体的用户消息的处理放到新增的抽象方法里,其他类同理,不在阐述。

public abstract class ClusterServerAioHandler implements ServerAioHandler {    private static final Logger logger = LoggerFactory.getLogger(ClusterServerAioHandler.class);    /**
* 用户自定义解码
*/
public abstract Packet clusterDecode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException; /**
* 用户自定义编码
*/
public abstract ByteBuffer clusterEncode(Packet packet, TioConfig tioConfig, ChannelContext channelContext); /**
* 用户自定义消息处理
*/
public abstract void clusterHandler(Packet packet, String packetJsonString, ChannelContext channelContext) throws Exception; /**
* 处理集群消息解码
* */
@Override
public final Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext channelContext) throws AioDecodeException {
Packet clusterPacket = ClusterPacketDecoder.decode(buffer, limit, position, readableLength, channelContext);
if (clusterPacket != null) {
return clusterPacket;
}
//reset buffer because cluster has already read a byte for cluster command
buffer.position(position);
return clusterDecode(buffer, limit, position, readableLength, channelContext);
} /**
* 处理集群消息编码
* */
@Override
public final ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
ByteBuffer buffer = ClusterPacketEncoder.encode(packet, tioConfig, channelContext);
if (buffer != null) {
return buffer;
}
//当集群解析的Buffer为NULL时候,说明是非集群内部消息,交给用户自定义处理
return clusterEncode(packet, tioConfig, channelContext);
} /**
*
* 处理集群消息*/
@Override
public final void handler(Packet packet, ChannelContext channelContext) throws Exception {
//这里留空,后边讲解
}
}

集群消息的转发

暂时还没有将 slot 的概念派上用场,先用了普通的取模方式。此方法是根据业务主键获取所在节点的方法。比如 ID为1 的用户连接的服务节点可能为 127.0.0.1:7002,ID为2的用户连接的服务节点为127.0.0.1:7001.至于细节可以屏蔽,比如通过NGINX让用户无感知到底连接了哪台服务器。

 /**
* 根据 hashCode 获取该值所绑定的节点,如果集群机器增加或者删除可能会导致节点错误,发消息失败(暂不考虑)
* */
public static TioClusterNode getClusterServerNode(Object object) {
int hashCode = object.hashCode();
//获取集群活跃节点
List<TioClusterNode> nodes = TioClusterContext.currentContext().getActivateNodes();
int length = nodes.size();
//取模
int index = hashCode % length;
return nodes.get(index);
}

发送消息

 private static boolean sendToUserInternal(TioConfig tioConfig,String userId, Packet packet) {
TioClusterNode node = getClusterServerNode(userId);
//如果是本机,直接发送到本机即可
if (TioClusterContext.currentContext().currentServer().equals(node) && tioConfig != null) {
return Tio.sendToUser(tioConfig, userId, packet);
} else {
//这里需要包装成为ClusterPacket,因为集群内部的消息是通过ClusterPacket实现的
ClusterPacket userPacket = ClusterPacketBuilder.buildUserPacket(packet);
return Tio.send(node.getClientChannelContext(), userPacket);
}
}

上文中的代码很简单,如果获取到的节点是本机节点,那么很幸运,直接执行发送即可。否则,需要将Packet包装到ClusterPacket中,因为集群内部的消息都是通过它来流通的。新增 USER 命令。它的包编码很简单,转JSON。

    @Override
protected byte[] body() {
Packet userPacket = ((ClusterUserPacket) clusterPacket).getPacketBody().getUserPacket();
String json = Json.toJson(userPacket);
return SafeStringEncoder.stringToBytes(json);
}

在上文中留空的地方代码如下:

 @Override
public final void handler(Packet packet, ChannelContext channelContext) throws Exception {
//第一种情况,内部消息转发
if (packet instanceof ClusterPacket) {
ClusterPacket clusterPacket = (ClusterPacket) packet;
logger.info("[{}] RECEIVED [{}] FROM [{}] ", TioClusterContext.currentContext().currentServer().serverName(),clusterPacket.command().getCmd() ,clusterPacket.getPacketBody().getFromNode().serverName());
//当命令为USER时,说明此消息是从其他节点过来的用户消息
if (clusterPacket.command() == ClusterCommand.USER) {
ClusterUserPacket clusterUserPacket = (ClusterUserPacket) packet;
//由于框架并不知道用户使用了具体的Packet类型,所以,这里提供了 JSON 和 对象的两种传递方式,在这里 getuserPakcet 是为NULL的。
clusterHandler(clusterUserPacket.getPacketBody().getUserPacket(), clusterUserPacket.getPacketBody().getUserPacketString(), channelContext);
} else {
ClusterObjectFactory.getPacketHandler(clusterPacket.command()).handle(clusterPacket, channelContext);
}
} else {
//第二种情况,直接走用户消息处理
clusterHandler(packet, null, channelContext);
}
}

测试环节

  • 1 编写 ServerAioHandler,ServerAioListener,正如上文所讲,需要继承ClusterServerAioHandler
public class MyClusterServerAioHandler extends ClusterServerAioHandler {}
public class MyClusterServerAioListener extends ClusterServerAioListener{}
  • 2 编写starter
  public static void main(String[] args) throws IOException {
start();
} private static void start() throws IOException {
//这里要实例化 ClusterSververTioConfig
tioConfig = new ClusterServerTioConfig(new MyClusterServerAioHandler(), new MyClusterServerAioListener());
TioClusterServer tioServer = new TioClusterServer(tioConfig);
tioServer.start();
}
  • 3 编写配置文件
tio.server.ip=127.0.0.1
tio.server.port=7001
tio.cluster.server.nodes=127.0.0.1:7002
tio.cluster.server.notify.max.count=4
tio.cluster.server.notify.interval=20000
tio.cluster.server.notify.retry.count=1
tio.cluster.server.notify.timeout=20000
tio.server.ip=127.0.0.1
tio.server.port=7002
tio.cluster.server.nodes=127.0.0.1:7001
tio.cluster.server.notify.max.count=4
tio.cluster.server.notify.interval=20000
tio.cluster.server.notify.retry.count=1
tio.cluster.server.notify.timeout=20000
  • 4 启动服务,查看日志

    日志中可以看出,刚开始一个节点是连不通的,但是该节点上线之后发送了MEET命令。

    现在两个节点都上线了,集群目前正常。

  • 5 启动两个客户端,一个用户1,一个用户2 ,用户1 连接 7002,用户2 连接 7001

  public static void main(String[] args) throws Exception {        String name = System.getenv("TIO-CLUSTER-FILE-NAME");
Prop prop = new Prop(name + ".properties"); ClientTioConfig config = new ClientTioConfig(new MyClientAioHandler(), new MyClientAioListener(), null);
config.setHeartbeatTimeout(0);
config.setName("testClient");
TioClient tioClient = new TioClient(config);
ClientChannelContext channelContext = tioClient.connect(new Node(prop.get("tio.server.ip"), prop.getInt("tio.server.port")), 20000); String userId = prop.get("tio.userid");
Tio.send(channelContext, MyPacket.handshakePacket(userId)); while (true) {
Thread.sleep(5000);
Tio.send(channelContext, MyPacket.helloPacket(prop.get("tio.touserid"), "hello there ,I'm user " + userId + ",I come from " + prop.get("tio.server.port")));
}
}

总结

本文通过参考Redis集群模式实现了一个简单的集群功能。开发过程中遇到的问题:

  • 集群节点的发现功能,Gossip协议的实现,需要优化,优化节点算法
  • 细节的处理,写着写着可能发现隐藏着死循环!!!
  • 并发的处理,在这方面由于能力原因处理较少,应该还有优化空间
  • 消息类的解耦和封装,还有很多优化空间

此集群实现可能出现的问题:

  • 集群节点过多的时候,一个实例可能维护多个 client ,再加上各种命令的发送,会比较浪费系统资源。可以通过合理的设置超时时间,通知节点的个数等来优化,我觉得也可以采用 RocketMQ的NameServer的方式去实现

  • 思路虽然不算错误,但是个人感觉,依赖第三方组件也是一种稳妥的做法,Redis发布订阅他不香吗?,Zookeeper管理集群节点不好吗?

  • 其实这都不是问题,最主要的是享受思考和开发的过程,通常会几个小时最后发现是自己犯了一个小错误导致的。不过还好,结局还是在接受范围内。

最后本文也花费了我接近两个小时的时间,分享出我的想法和开发历程。希望能给各位带来收获,另外也希望各位大牛提出更好的意见和方案。谢谢大家,回家喽~~

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,484
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,899
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,732
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,485
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:8,125
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:5,286