网易首页 > 网易号 > 正文 申请入驻

zookeeper的Leader选举源码解析

0
分享至

Tech

导读

zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。本文通过集群的配置,对leader选举源进行解析,让读者们了解如何利用BIO通信机制,多线程多层队列实现高性能架构。

01

Leader选举机制

在今年的敏捷团队建设中,我通过Suite执行器实现了一键自动化单元测试。Juint除了Suite执行器还有哪些执行器呢?由此我的Runner探索之旅开始了!

Leader选举机制采用半数选举算法。

每一个zookeeper服务端称之为一个节点,每个节点都有投票权,把其选票投向每一个有选举权的节点,当其中一个节点选举出票数过半,这个节点就会成为Leader,其它节点成为Follower。

02

Leader选举集群配置

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

1. 重命名zoo_sample.cfg文件为zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg

2. 修改zoo.cfg文件,修改值如下:


【plain】 zoo1.cfg文件内容: dataDir=/export/data/zookeeper-1 clientPort=2181 server.1=127.0.0.1:2001:3001 server.2=127.0.0.1:2002:3002:participant server.3=127.0.0.1:2003:3003:participant server.4=127.0.0.1:2004:3004:observer
zoo2.cfg文件内容: dataDir=/export/data/zookeeper-2 clientPort=2182 server.1=127.0.0.1:2001:3001 server.2=127.0.0.1:2002:3002:participant server.3=127.0.0.1:2003:3003:participant server.4=127.0.0.1:2004:3004:observer
zoo3.cfg文件内容: dataDir=/export/data/zookeeper-3 clientPort=2183 server.1=127.0.0.1:2001:3001 server.2=127.0.0.1:2002:3002:participant server.3=127.0.0.1:2003:3003:participant server.4=127.0.0.1:2004:3004:observer
zoo4.cfg文件内容: dataDir=/export/data/zookeeper-4 clientPort=2184 server.1=127.0.0.1:2001:3001 server.2=127.0.0.1:2002:3002:participant server.3=127.0.0.1:2003:3003:participant server.4=127.0.0.1:2004:3004:observer

3. server.第几号服务器(对应myid文件内容)=ip:数据同步端口:选举端口:选举标识

  • participant默认参与选举标识,可不写. observer不参与选举

4.在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目录下创建myid文件,文件内容分别写1 ,2,3,4,用于标识sid(全称:Server ID)赋值。

5. 启动三个zookeeper实例:

  • bin/zkServer.sh start conf/zoo1.cfg

  • bin/zkServer.sh start conf/zoo2.cfg

  • bin/zkServer.sh start conf/zoo3.cfg

6. 每启动一个实例,都会读取启动参数配置zoo.cfg文件,这样实例就可以知道其作为服务端身份信息sid以及集群中有多少个实例参与选举。

03

Leader选举流程

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

图1 第一轮到第二轮投票流程

前提:

设定票据数据格式vote(sid,zxid,epoch)

  • sid是Server ID每台服务的唯一标识,是myid文件内容;

  • zxid是数据事务id号;

  • epoch为选举周期,为方便理解下面讲解内容暂定为1初次选举,不写入下面内容里。

按照顺序启动sid=1,sid=2节点

第一轮投票:

1. sid=1节点:初始选票为自己,将选票vote(1,0)发送给sid=2节点;

2. sid=2节点:初始选票为自己,将选票vote(2,0)发送给sid=1节点;

3. sid=1节点:收到sid=2节点选票vote(2,0)和当前自己的选票vote(1,0),首先比对zxid值,zxid越大代表数据最新,优先选择zxid最大的选票,如果zxid相同,选举最大sid。当前投票选举结果为vote(2,0),sid=1节点的选票变为vote(2,0);

4. sid=2节点:收到sid=1节点选票vote(1,0)和当前自己的选票vote(2,0),参照上述选举方式,选举结果为vote(2,0),sid=2节点的选票不变;

5. 第一轮投票选举结束。

第二轮投票:

1. sid=1节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=2节点;

2. sid=2节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=1节点;

3. sid=1节点:收到sid=2节点选票vote(2,0)和自己的选票vote(2,0), 按照半数选举算法,总共3个节点参与选举,已有2个节点选举出相同选票,推举sid=2节点为Leader,自己角色变为Follower;

4. sid=2节点:收到sid=1节点选票vote(2,0)和自己的选票vote(2,0),按照半数选举算法推举sid=2节点为Leader,自己角色变为Leader。

这时启动sid=3节点后,集群里已经选举出leader,sid=1和sid=2节点会将自己的leader选票发回给sid=3节点,通过半数选举结果还是sid=2节点为leader。

3.1 Leader选举采用多层队列架构

zookeeper选举底层主要分为选举应用层和消息传输队列层,第一层应用层队列统一接收和发送选票,而第二层传输层队列,是按照服务端sid分成了多个队列,是为了避免给每台服务端发送消息互相影响。比如对某台机器发送不成功不会影响正常服务端的发送。

图2 多层队列上下关系交互流程图

04

解析代码入口类

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

通过查看zkServer.sh文件内容找到服务启动类:

org.apache.zookeeper.server.quorum.QuorumPeerMain

05

选举流程代码解析

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

图3 选举代码实现流程图

1. 加载配置文件QuorumPeerConfig.parse(path);

针对 Leader选举关键配置信息如下:

  • 读取dataDir目录找到myid文件内容,设置当前应用sid标识,做为投票人身份信息。下面遇到myid变量为当前节点自己sid标识。

    • 设置peerType当前应用是否参与选举

  • new QuorumMaj()解析server.前缀加载集群成员信息,加载allMembers所有成员,votingMembers参与选举成员,observingMembers观察者成员,设置half值votingMembers.size()/2.



【Java】 public QuorumMaj(Properties props) throws ConfigException { for (Entry entry : props.entrySet()) { String key = entry.getKey().toString(); String value = entry.getValue().toString(); //读取集群配置文件中的server.开头的应用实例配置信息 if (key.startsWith("server.")) { int dot = key.indexOf('.'); long sid = Long.parseLong(key.substring(dot + 1)); QuorumServer qs = new QuorumServer(sid, value); allMembers.put(Long.valueOf(sid), qs); if (qs.type == LearnerType.PARTICIPANT) //应用实例绑定的角色为PARTICIPANT意为参与选举 votingMembers.put(Long.valueOf(sid), qs); else { //观察者成员 observingMembers.put(Long.valueOf(sid), qs); } } else if (key.equals("version")) { version = Long.parseLong(value, 16); } } //过半基数 half = votingMembers.size() / 2; }

2. QuorumPeerMain.runFromConfig(config) 启动服务;

3. QuorumPeer.startLeaderElection() 开启选举服务;

  • 设置当前选票new Vote(sid,zxid,epoch)



【plain】 synchronized public void startLeaderElection(){ try { if (getPeerState() == ServerState.LOOKING) { //首轮:当前节点默认投票对象为自己 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; } //........ }
  • 创建选举管理类:QuorumCnxnManager;

  • 初始化recvQueue接收投票队列(第二层传输队列);

  • 初始化queueSendMap按sid发送投票队列(第二层传输队列);

  • 初始化senderWorkerMap发送投票工作线程容器,表示着与sid投票节点已连接;

  • 初始化选举监听线程类QuorumCnxnManager.Listener。


【Java】 //QuorumPeer.createCnxnManager() public QuorumCnxManager(QuorumPeer self, final long mySid, Map view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) { //接收投票队列(第二层传输队列) this.recvQueue = new ArrayBlockingQueue(RECV_CAPACITY); //按sid发送投票队列(第二层传输队列) this.queueSendMap = new ConcurrentHashMap>(); //发送投票工作线程容器,表示着与sid投票节点已连接 this.senderWorkerMap = new ConcurrentHashMap(); this.lastMessageSent = new ConcurrentHashMap();
String cnxToValue = System.getProperty("zookeeper.cnxTimeout"); if(cnxToValue != null){ this.cnxTO = Integer.parseInt(cnxToValue); }
this.self = self;
this.mySid = mySid; this.socketTimeout = socketTimeout; this.view = view; this.listenOnAllIPs = listenOnAllIPs;
initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize, quorumSaslAuthEnabled); // Starts listener thread that waits for connection requests //创建选举监听线程 接收选举投票请求 listener = new Listener(); listener.setName("QuorumPeerListener"); } //QuorumPeer.createElectionAlgorithm protected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: qcm = createCnxnManager();// new QuorumCnxManager(... new Listener()) QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start();//启动选举监听线程 FastLeaderElection fle = new FastLeaderElection(this, qcm); fle.start(); le = fle; } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return le;}

4. 开启选举监听线程QuorumCnxnManager.Listener;

  • 创建ServerSockket等待大于自己sid节点连接,连接信息存储到senderWorkerMap;

  • sid>self.sid才可以连接过来。



【Java】 //上面的listener.start()执行后,选择此方法 public void run() { int numRetries = 0; InetSocketAddress addr; Socket client = null; while((!shutdown) && (numRetries < 3)){ try { ss = new ServerSocket(); ss.setReuseAddress(true); if (self.getQuorumListenOnAllIPs()) { int port = self.getElectionAddress().getPort(); addr = new InetSocketAddress(port); } else { // Resolve hostname for this server in case the // underlying ip address has changed. self.recreateSocketAddresses(self.getId()); addr = self.getElectionAddress(); } LOG.info("My election bind port: " + addr.toString()); setName(addr.toString()); ss.bind(addr); while (!shutdown) { client = ss.accept(); setSockOpts(client); LOG.info("Received connection request " + client.getRemoteSocketAddress()); // Receive and handle the connection request // asynchronously if the quorum sasl authentication is // enabled. This is required because sasl server // authentication process may take few seconds to finish, // this may delay next peer connection requests. if (quorumSaslAuthEnabled) { receiveConnectionAsync(client); } else { //接收连接信息 receiveConnection(client); } numRetries = 0; } } catch (IOException e) { if (shutdown) { break; } LOG.error("Exception while listening", e); numRetries++; try { ss.close(); Thread.sleep(1000); } catch (IOException ie) { LOG.error("Error closing server socket", ie); } catch (InterruptedException ie) { LOG.error("Interrupted while sleeping. " + "Ignoring exception", ie); } closeSocket(client); } } LOG.info("Leaving listener"); if (!shutdown) { LOG.error("As I'm leaving the listener thread, " + "I won't be able to participate in leader " + "election any longer: " + self.getElectionAddress()); } else if (ss != null) { // Clean up for shutdown. try { ss.close(); } catch (IOException ie) { // Don't log an error for shutdown. LOG.debug("Error closing server socket", ie); } } }
//代码执行路径:receiveConnection()->handleConnection(...) private void handleConnection(Socket sock, DataInputStream din) throws IOException { //...省略 if (sid < self.getId()) { /* * This replica might still believe that the connection to sid is * up, so we have to shut down the workers before trying to open a * new connection. */ SendWorker sw = senderWorkerMap.get(sid); if (sw != null) { sw.finish(); }
/* * Now we start a new connection */ LOG.debug("Create new connection to server: {}", sid); closeSocket(sock);
if (electionAddr != null) { connectOne(sid, electionAddr); } else { connectOne(sid); }
} else { // Otherwise start worker threads to receive data. SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) { vsw.finish(); } //存储连接信息 senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(SEND_CAPACITY));
sw.start(); rw.start(); } }

5. 创建FastLeaderElection快速选举服务;

  • 初始选票发送队列sendqueue(第一层队列)

  • 初始选票接收队列recvqueue(第一层队列)

  • 创建线程WorkerSender

  • 创建线程WorkerReceiver



【Java】 //FastLeaderElection.starter private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1; //发送队列sendqueue(第一层队列) sendqueue = new LinkedBlockingQueue(); //接收队列recvqueue(第一层队列) recvqueue = new LinkedBlockingQueue(); this.messenger = new Messenger(manager); } //new Messenger(manager) Messenger(QuorumCnxManager manager) { //创建线程WorkerSender this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); this.wsThread.setDaemon(true); //创建线程WorkerReceiver this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); this.wrThread.setDaemon(true); }

6. 开启WorkerSender和WorkerReceiver线程。

WorkerSender线程自旋获取sendqueue第一层队列元素

  • sendqueue队列元素内容为相关选票信息详见ToSend类;

  • 首先判断选票sid是否和自己sid值相同,相等直接放入到recvQueue队列中;

  • 不相同将sendqueue队列元素转储到queueSendMap第二层传输队列中。



【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{ //... public void run() { while (!stop) { try { ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; //将投票信息发送出去 process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } } //QuorumCnxManager#toSend public void toSend(Long sid, ByteBuffer b) { /* * If sending message to myself, then simply enqueue it (loopback). */ if (this.mySid == sid) { b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); /* * Otherwise send to the corresponding thread to send. */ } else { /* * Start a new connection if doesn't have one already. */ ArrayBlockingQueue bq = new ArrayBlockingQueue( SEND_CAPACITY); ArrayBlockingQueue oldq = queueSendMap.putIfAbsent(sid, bq); //转储到queueSendMap第二层传输队列中 if (oldq != null) { addToSendQueue(oldq, b); } else { addToSendQueue(bq, b); } connectOne(sid); } }

WorkerReceiver线程自旋获取recvQueue第二层传输队列元素转存到recvqueue第一层队列中。


【Java】 //WorkerReceiver public void run() { Message response; while (!stop) { // Sleeps on receive try { //自旋获取recvQueue第二层传输队列元素 response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); if(response == null) continue; // The current protocol and two previous generations all send at least 28 bytes if (response.buffer.capacity() < 28) { LOG.error("Got a short response: " + response.buffer.capacity()); continue; } //... if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ //第二层传输队列元素转存到recvqueue第一层队列中 recvqueue.offer(n); //... } } //... }

06

选举核心逻辑

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

1. 启动线程QuorumPeer

开始Leader选举投票makeLEStrategy().lookForLeader();

sendNotifications()向其它节点发送选票信息,选票信息存储到sendqueue队列中。sendqueue队列由WorkerSender线程处理。


【plain】 //QuorunPeer.run //... try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } //makeLEStrategy().lookForLeader() 发送投票 setCurrentVote(makeLEStrategy().lookForLeader()); } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); } //... //FastLeaderElection.lookLeader public Vote lookForLeader() throws InterruptedException { //... //向其他应用发送投票 sendNotifications(); //... }
private void sendNotifications() { //获取应用节点 for (long sid : self.getCurrentAndNextConfigVoters()) { QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, qv.toString().getBytes()); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } //储存投票信息 sendqueue.offer(notmsg); } }
class WorkerSender extends ZooKeeperThread { //... public void run() { while (!stop) { try { //提取已储存的投票信息 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue;
process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } //... }

自旋recvqueue队列元素获取投票过来的选票信息:


【Java】 public Vote lookForLeader() throws InterruptedException { //... /* * Loop in which we exchange notifications until we find a leader */ while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time */ //提取投递过来的选票信息 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS); /* * Sends more notifications if haven't received enough. * Otherwise processes new notification. */ if(n == null){ if(manager.haveDelivered()){ //已全部连接成功,并且前一轮投票都完成,需要再次发起投票 sendNotifications(); } else { //如果未收到选票信息,manager.contentAll()自动连接其它socket节点 manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } //.... } //... }

【Java】 //manager.connectAll()->connectOne(sid)->initiateConnection(...)->startConnection(...)
private boolean startConnection(Socket sock, Long sid) throws IOException { DataOutputStream dout = null; DataInputStream din = null; try { // Use BufferedOutputStream to reduce the number of IP packets. This is // important for x-DC scenarios. BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream()); dout = new DataOutputStream(buf);
// Sending id and challenge // represents protocol version (in other words - message type) dout.writeLong(PROTOCOL_VERSION); dout.writeLong(self.getId()); String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort(); byte[] addr_bytes = addr.getBytes(); dout.writeInt(addr_bytes.length); dout.write(addr_bytes); dout.flush();
din = new DataInputStream( new BufferedInputStream(sock.getInputStream())); } catch (IOException e) { LOG.warn("Ignoring exception reading or writing challenge: ", e); closeSocket(sock); return false; }
// authenticate learner QuorumPeer.QuorumServer qps = self.getVotingView().get(sid); if (qps != null) { // TODO - investigate why reconfig makes qps null. authLearner.authenticate(sock, qps.hostname); }
// If lost the challenge, then drop the new connection //保证集群中所有节点之间只有一个通道连接 if (sid > self.getId()) { LOG.info("Have smaller server identifier, so dropping the " + "connection: (" + sid + ", " + self.getId() + ")"); closeSocket(sock); // Otherwise proceed with the connection } else { SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null) vsw.finish();
senderWorkerMap.put(sid, sw); queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue( SEND_CAPACITY));
sw.start(); rw.start();
return true;
} return false; }

如上述代码中所示,sid>self.sid才可以创建连接Socket和SendWorker,RecvWorker线程,存储到senderWorkerMap中。对应第2步中的sid

图4 节点之间连接方式


【Java】
public Vote lookForLeader() throws InterruptedException { //... if (n.electionEpoch > logicalclock.get()) { //当前选举周期小于选票周期,重置recvset选票池 //大于当前周期更新当前选票信息,再次发送投票 logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock.get())); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {//相同选举周期 //接收的选票与当前选票PK成功后,替换当前选票 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); } //...
}

在上代码中,自旋从recvqueue队列中获取到选票信息。开始进行选举:

  • 判断当前选票和接收过来的选票周期是否一致

  • 大于当前周期更新当前选票信息,再次发送投票

  • 周期相等:当前选票信息和接收的选票信息进行PK


【Java】 //接收的选票与当前选票PK protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; }
/* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId))))); }

在上述代码中的totalOrderPredicate方法逻辑如下:

  • 竞选周期大于当前周期为true

  • 竞选周期相等,竞选zxid大于当前zxid为true

  • 竞选周期相等,竞选zxid等于当前zxid,竞选sid大于当前sid为true

  • 经过上述条件判断为true将当前选票信息替换为竞选成功的选票,同时再次将新的选票投出去。



【Java】 public Vote lookForLeader() throws InterruptedException { //... //存储节点对应的选票信息 // key:选票来源sid value:选票推举的Leader sid recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//半数选举开始 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } /*WorkerSender * This predicate is true once we don't read any new * relevant message from the reception queue */ if (n == null) { //已选举出leader 更新当前节点是否为leader self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch); leaveInstance(endVote); return endVote; } } //... } /** * Termination predicate. Given a set of votes, determines if have * sufficient to declare the end of the election round. * * @param votes * Set of votes * @param vote * Identifier of the vote received last PK后的选票 */ private boolean termPredicate(HashMap votes, Vote vote) { SyncedLearnerTracker voteSet = new SyncedLearnerTracker(); voteSet.addQuorumVerifier(self.getQuorumVerifier()); if (self.getLastSeenQuorumVerifier() != null && self.getLastSeenQuorumVerifier().getVersion() > self .getQuorumVerifier().getVersion()) { voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier()); } /* * First make the views consistent. Sometimes peers will have different * zxids for a server depending on timing. */ //votes 来源于recvset 存储各个节点推举出来的选票信息 for (Map.Entry entry : votes.entrySet()) { //选举出的sid和其它节点选择的sid相同存储到voteSet变量中。 if (vote.equals(entry.getValue())) { //保存推举出来的sid voteSet.addAck(entry.getKey()); } } //判断选举出来的选票数量是否过半 return voteSet.hasAllQuorums(); } //QuorumMaj#containsQuorum public boolean containsQuorum(Set ackSet) { return (ackSet.size() > half); }

在上述代码中:recvset是存储每个sid推举的选票信息。

第一轮 sid1:vote(1,0,1) ,sid2:vote(2,0,1);

第二轮 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。

最终经过选举信息vote(2,0,1)为推荐leader,并用推荐leader在recvset选票池里比对持相同票数量为2个。因为总共有3个节点参与选举,sid1和sid2都选举sid2为leader,满足票数过半要求,故确认sid2为leader。

  • setPeerState更新当前节点角色;

  • proposedLeader选举出来的sid和自己sid相等,设置为Leader;

  • 上述条件不相等,设置为Follower或Observing;

  • 更新currentVote当前选票为Leader的选票vote(2,0,1)。

07

总结

理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将目标页面展示到屏幕。

通过对Leader选举源码的解析,可以了解到:

1. 多个应用节点之间网络通信采用BIO方式进行相互投票,同时保证每个节点之间只使用一个通道,减少网络资源的消耗,足以见得在BIO分布式中间件开发中的技术重要性。

2. 基于BIO的基础上,灵活运用多线程和内存消息队列完好实现多层队列架构,每层队列由不同的线程分工协作,提高快速选举性能目的。

3. 为BIO在多线程技术上的实践带来了宝贵的经验。

求分享

求点赞

求在看

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
《玫瑰》剧组太过分?刘亦菲抱的娃才出生20天,就用强光怼脸拍!

《玫瑰》剧组太过分?刘亦菲抱的娃才出生20天,就用强光怼脸拍!

每日漓说
2024-06-19 15:45:49
聪明!把洗衣液倒进菲律宾快艇油箱真是制约菲军的最好新办法

聪明!把洗衣液倒进菲律宾快艇油箱真是制约菲军的最好新办法

橘色数码
2024-06-20 18:03:15
儿媳照顾50岁农村公公,酒后公公行夫妻之事,公公:儿媳经验丰富

儿媳照顾50岁农村公公,酒后公公行夫妻之事,公公:儿媳经验丰富

魅老八足球
2024-05-13 13:49:37
山东肥城一体育老师被指猥亵九年级女生 涉事老师已被警方控制

山东肥城一体育老师被指猥亵九年级女生 涉事老师已被警方控制

奔流新闻
2024-06-21 00:08:10
店员泼顾客一脸咖啡粉,大喊“你投诉呀”!知名品牌回应,有员工称8小时内要做500杯咖啡

店员泼顾客一脸咖啡粉,大喊“你投诉呀”!知名品牌回应,有员工称8小时内要做500杯咖啡

21世纪经济报道
2024-06-20 19:32:21
输日本后,女排奥运12人浮现!3年主力被弃,21岁新星或压哨入选

输日本后,女排奥运12人浮现!3年主力被弃,21岁新星或压哨入选

我爱英超
2024-06-20 20:54:44
西班牙1-0意大利,赛后评分:不是莫拉塔第1,而是切尔西悍将第1

西班牙1-0意大利,赛后评分:不是莫拉塔第1,而是切尔西悍将第1

侧身凌空斩
2024-06-21 04:55:28
奖杯变筹码?布朗带着FMVP奖杯现身赌场

奖杯变筹码?布朗带着FMVP奖杯现身赌场

懂球帝
2024-06-21 10:05:49
陈妍希口碑崩塌!外网账号内容全曝光,婚后仍和多位男星暧昧

陈妍希口碑崩塌!外网账号内容全曝光,婚后仍和多位男星暧昧

古希腊掌管月桂的神
2024-06-20 22:02:23
全面撤退,民营企业都不投资了!

全面撤退,民营企业都不投资了!

蓝色海边
2024-06-20 15:22:22
土媒:费内巴切在与穆里尼奥会谈后,决定不与巴舒亚伊续约

土媒:费内巴切在与穆里尼奥会谈后,决定不与巴舒亚伊续约

直播吧
2024-06-21 10:16:17
喝茶对心脏到底是好是坏?医生苦劝:4种茶,一口都不要喝

喝茶对心脏到底是好是坏?医生苦劝:4种茶,一口都不要喝

宋若讲故事
2023-01-18 21:38:26
一直以为监狱里踩缝纫机是开玩笑,没想到里面的工种这么丰富

一直以为监狱里踩缝纫机是开玩笑,没想到里面的工种这么丰富

开玩笑的水母
2024-06-20 19:09:59
国家社科基金项目成果:男人阴茎越短,智商越高

国家社科基金项目成果:男人阴茎越短,智商越高

必记本
2024-06-19 01:09:57
4-1终于拿到总冠军!林书豪,哭了!简直比打CBA都艰难……

4-1终于拿到总冠军!林书豪,哭了!简直比打CBA都艰难……

篮球实战宝典
2024-06-21 00:05:34
国产女主终于放弃傻白甜?刘亦菲变疯批手撕渣男,网友:男权假象

国产女主终于放弃傻白甜?刘亦菲变疯批手撕渣男,网友:男权假象

毒舌电影
2024-06-20 19:00:02
雷迪克聘请谁加入他的教练组?自己菜,可教练组却是8冠组合

雷迪克聘请谁加入他的教练组?自己菜,可教练组却是8冠组合

阿雄侃篮球
2024-06-21 11:38:44
团结的潘帕斯!阿根廷全队没有庆祝,赶去查看拼抢倒地的麦卡

团结的潘帕斯!阿根廷全队没有庆祝,赶去查看拼抢倒地的麦卡

直播吧
2024-06-21 09:33:22
法国名宿提醒姆巴佩:我曾太关注面具而忘了做动作,导致韧带受伤

法国名宿提醒姆巴佩:我曾太关注面具而忘了做动作,导致韧带受伤

直播吧
2024-06-21 10:05:50
国外电视台为了收视率,女主播们露球播报,观众每天都会准时收看

国外电视台为了收视率,女主播们露球播报,观众每天都会准时收看

影孖看世界
2024-05-11 20:23:58
2024-06-21 12:00:49
君伟说
君伟说
分享职场故事
230文章数 48关注度
往期回顾 全部

科技要闻

王仲远:GPT4不是国内大模型的尽头

头条要闻

普京一天见了四位越南领导人 河内市委书记没有出现

头条要闻

普京一天见了四位越南领导人 河内市委书记没有出现

体育要闻

1-0"吊打"意大利 西班牙这就叫冠军相?

娱乐要闻

陈晓惹争议!被曝婚变离家出走冷暴力

财经要闻

普华永道,引火烧身

汽车要闻

领克纯电,来得不晚

态度原创

教育
数码
健康
游戏
公开课

教育要闻

2024年辽宁高考专科招生院校及计划变化,招生院校新增46所!

数码要闻

数百款Intel CPU PC受影响!Phoenix UEFI固件被曝安全漏洞

晚餐不吃or吃七分饱,哪种更减肥?

别担心 《艾尔登法环:黄金树幽影》不会淘汰旧武器

公开课

近视只是视力差?小心并发症

无障碍浏览 进入关怀版