这篇文章主要介绍“zk中learner的作用是什么”,在日常操作中,相信很多人在zk中learner的作用是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”zk中learner的作用是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

创新互联是一家专业提供忠县企业网站建设,专注与网站设计、成都网站建设、成都h5网站建设、小程序制作等业务。10年已为忠县众多企业、政府机构等服务。创新互联专业的建站公司优惠进行中。
learner时observer,follower的父类,定义了公共属性和方法
子类 Follower 和Observer
内部类:
PacketInFlight表示在提议中还没有commit的消息
static class PacketInFlight { TxnHeader hdr; Record rec; }
属性:
| QuorumPeer | 服务器节点 | 
| LearnerZooKeeperServer | learner的服务节点 | 
| BufferedOutputStream | 输出流 | 
| Socket | 端口套接字 | 
| InetSocketAddress | 地址信息 | 
| InputArchive | 输入存档 | 
| OutputArchive | 输出存档 | 
| leaderProtocolVersion | leader协议版本 | 
| BUFFERED_MESSAGE_SIZE | 缓存信息大小 | 
| MessageTracker | 顺序接收和发送信息 | 
方法
| validateSession(ServerCnxn cnxn, long clientId, int timeout) | 验证session有效性 | 
| writePacket(QuorumPacket pp, boolean flush) | 发送包给leader | 
| readPacket(QuorumPacket pp) | 从leader读取message | 
| request(Request request) | 发送request给leader | 
| findLeader | 查找认为是leader的地址信息 | 
| createSocket() | 创建socket对象 | 
| registerWithLeader(int pktType) | 执行handshake protocal建立follower/observer连接 | 
到服务器验证session有效性
void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
    LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeLong(clientId);
    dos.writeInt(timeout);
    dos.close();
    QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
    pendingRevalidations.put(clientId, cnxn);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(
            LOG,
            ZooTrace.SESSION_TRACE_MASK,
            "To validate session 0x" + Long.toHexString(clientId));
    }
    writePacket(qp, true);
}
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
    synchronized (leaderOs) {
        if (pp != null) {
            messageTracker.trackSent(pp.getType());
            leaderOs.writeRecord(pp, "packet");
        }
        if (flush) {
            bufferedOutput.flush();
        }
    }
}
void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        int len = request.request.remaining();
        byte[] b = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
    writePacket(qp, true);
}
查找当前的leader信息
protected QuorumServer findLeader() {
    QuorumServer leaderServer = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            // Ensure we have the leader's correct IP address before
            // attempting to connect.
            s.recreateSocketAddresses();
            leaderServer = s;
            break;
        }
    }
    if (leaderServer == null) {
        LOG.warn("Couldn't find the leader with id = " + current.getId());
    }
    return leaderServer;
}
连接套接字
sockConnect(Socket sock, InetSocketAddress addr, int timeout) 
建立和leader的连接
/**
 * Establish a connection with the LearnerMaster found by findLearnerMaster.
 * Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
 * Retries until either initLimit time has elapsed or 5 tries have happened.
 * @param addr - the address of the Peer to connect to.
 * @throws IOException - if the socket connection fails on the 5th attempt
 * if there is an authentication failure while connecting to leader
 * @throws X509Exception
 * @throws InterruptedException
 */
protected void connectToLeader(InetSocketAddress addr, String hostname) throws IOException, InterruptedException, X509Exception {
    this.sock = createSocket();
    this.leaderAddr = addr;
    // leader connection timeout defaults to tickTime * initLimit
    int connectTimeout = self.tickTime * self.initLimit;
    // but if connectToLearnerMasterLimit is specified, use that value to calculate
    // timeout instead of using the initLimit value
    if (self.connectToLearnerMasterLimit > 0) {
        connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
    }
    int remainingTimeout;
    long startNanoTime = nanoTime();
    for (int tries = 0; tries < 5; tries++) {
        try {
            // recalculate the init limit time because retries sleep for 1000 milliseconds
            remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000);
            if (remainingTimeout <= 0) {
                LOG.error("connectToLeader exceeded on retries.");
                throw new IOException("connectToLeader exceeded on retries.");
            }
            sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout));
            if (self.isSslQuorum()) {
                //开始握手
                ((SSLSocket) sock).startHandshake();
            }
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            //出现异常
            remainingTimeout = connectTimeout - (int) ((nanoTime() - startNanoTime) / 1000000);
            //剩余超时时间
            if (remainingTimeout <= 1000) {
                //打印错误日志
                LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries
                          + ", remaining init limit=" + remainingTimeout
                          + ", connecting to " + addr, e);
                throw e;
                //尝试次数大于4
            } else if (tries >= 4) {
                //打印错误日志
                LOG.error("Unexpected exception, retries exceeded. tries=" + tries
                          + ", remaining init limit=" + remainingTimeout
                          + ", connecting to " + addr, e);
                throw e;
            } else {
                //发出警告
                LOG.warn("Unexpected exception, tries=" + tries
                         + ", remaining init limit=" + remainingTimeout
                         + ", connecting to " + addr, e);
                //重新尝试建立socket连接
                this.sock = createSocket();
            }
        }
        //读取配置延时时间,默认100ns
        Thread.sleep(leaderConnectDelayDuringRetryMs);
    }
    self.authLearner.authenticate(sock, hostname);
    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}到此,关于“zk中learner的作用是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!
本文名称:zk中learner的作用是什么
当前URL:http://www.scyingshan.cn/article/gcjcoe.html

 建站
建站
 咨询
咨询 售后
售后
 建站咨询
建站咨询 
 