zookeeper配置启动及配置文件解析 (一)

您所在的位置:网站首页 hadoop提供的自定义配置时编辑的配置文件中包含 zookeeper配置启动及配置文件解析 (一)

zookeeper配置启动及配置文件解析 (一)

#zookeeper配置启动及配置文件解析 (一)| 来源: 网络整理| 查看: 265

zookeeper配置启动及配置文件解析 (一)

本文基于zookeeper 3.7.0

单机部署 1、由于zookeeper集群的运行需要Java运行环境,所以需要首先安装 JDK2、官问下载zookeeper安装包,并上传服务器 https://zookeeper.apache.org/releases.html3、解压zookeeper tar -zxvf zookeeper-xxx.tar.gz4、进入zookeeper根目录5、修改配置文件 复制配置文件 cp conf/zoo_sample.cfg conf/zoo.cfg 编辑配置文件 vim conf/zoo.cfg 修改dataDir数据目录,/data/zookeeper是我的目录,你需要改成你的目录,其他默认# 心跳的间隔时间 tickTime=2000 # 允许follower(相对于 leader 而言的“客户端”)连接并同步到leader的初始化连接时间,它以tickTime的倍数来表示。当超过设置倍数的tickTime时间,则连接失败 initLimit=10 # leader与follower之间发送消息,请求和应答时间长度。如果follower在设置的时间内不能与leader进行通信,那么此follower将被丢弃 syncLimit=5 # 数据目录 dataDir=/data/zookeeper # 端口号 clientPort=2181 启动zookeeper服务 ./bin/zkServer.sh start 到这里单机部署已经完成 集群部署 1、找3台或以上机器(最好是奇数),每台机器重复上面步骤,启动成功后都停止。2、确定3台机器IP,并在zoo.cfg文件中追加配置#cluster server.1=172.16.6.121:2888:3888 server.2=172.16.6.122:2888:3888 server.3=172.16.6.123:2888:3888

这里2888、3888必须是不被占用的端口,2888表示follower和leader交换消息所使用的端口,3888表示选举leader所使用的端口,这两个端口可以根据你的实际情况修改

3、分别在3台机器的/data/zookeeper目录下创建myid文件,并分别写入1、2、3 vim /data/zookeeper/myid4、依次启动3台机器 集群部署已经完成 zookeeper启动入口

在zookeeper启动文件bin/zkServer.sh中会看到启动入口类 org.apache.zookeeper.server.quorum.QuorumPeerMain

QuorumPeerMain

在QuorumPeerMain类中可以直接找到main方法

main方法 public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerMain(); try { main.initializeAndRun(args); } ... { ... } ... ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue()); }

省略的都是些日志,这里不做过多分析,主要分析initializeAndRun方法

initializeAndRun方法

initializeAndRun方法是启动参数组装入口,并且这里确定了是单机启动,还是集群启动

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { // 解析启动中带的参数 QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { //args[0]是配置文件zoo.cfg路径,从配置文件加载参数 config.parse(args[0]); } // 定时清除任务参数组装,定时任务的功能就是清除旧的快照信息。 DatadirCleanupManager purgeMgr = new DatadirCleanupManager( config.getDataDir(),//zk节点数据目录 config.getDataLogDir(),//配置服务器存储事物日志文件 config.getSnapRetainCount(),//相邻两次数据快照之前事物操作次数 config.getPurgeInterval());//定期清理事物日志和快照文件的间隔时间 // 启动定时清除任务 purgeMgr.start(); if (args.length == 1 && config.isDistributed()) { // 集群模式 runFromConfig(config); } else { // 单节点模式 ZooKeeperServerMain.main(args); } } zookeeper启动参数组装 QuorumPeerConfig属性 // 当设置为false时,服务器在复制模式下启动 private static boolean standaloneEnabled = true; // 在这个静态配置文件中,不允许出现以server , group,weight开头的配置项 private static boolean reconfigEnabled = false; // 对于多网卡的机器,可以为每个IP指定不同的监听端口,默认情况是所有IP都监听clientPort拽定的端口 protected InetSocketAddress clientPortAddress; // SSL安全端口地址 protected InetSocketAddress secureClientPortAddress; // 集群间是否使用SSL通信 protected boolean sslQuorum = false; // 是否使用端口统一 protected boolean shouldUsePortUnification = false; // 侦听观察者连接的端口;即观察者尝试连接的端口。如果设置了该属性,则除了在领导模式下外,服务器还将在跟随者模式下承载观察者连接,并在观察者模式下相应地尝试连接到任何投票对等方。 protected int observerMasterPort; // 启用密钥更新时自动加载 protected boolean sslQuorumReloadCertFiles = false; // 存储snapshot的目录,默认情况下,事务日志也会存储在这里 protected File dataDir; // 事务日志输出目录,尽量给事务日志的输出配置单独的磁盘或挂载点 protected File dataLogDir; // 动态变容配置文件路径 protected String dynamicConfigFileStr = null; // 配置文件路径 protected String configFileStr = null; // 客户端会话检查间隔、服务端之间心跳间隔,ZK基本上所有的时间都是这个时间的倍数 protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME; // 一个客户端能够连接到同一个服务器上的最大连接数,根据IP来区分,默认值为0,如果设置为0,表示没有任何限制,设置该值是为了防止Dos攻击 protected int maxClientCnxns = 60; // 客户端的超时时间最小值,默认是2个tickTime protected int minSessionTimeout = -1; // 客户端的超时时间最大值,默认值是20个tickTime protected int maxSessionTimeout = -1; // 监控处理类名 protected String metricsProviderClassName = DefaultMetricsProvider.class.getName(); // 监控配置 protected Properties metricsProviderConfiguration = new Properties(); // 启用本地会话 protected boolean localSessionsEnabled = false; // 本地会话可以升级成全局会话 protected boolean localSessionsUpgradingEnabled = false; // TCP 服务端用于临时存放已完成三次握手的请求的队列的最大长度 protected int clientPortListenBacklog = -1; // tickTime * initLimit 决定了ACK的超时时间 protected int initLimit; // tickTime * syncLimit 决定了服务端心跳超时时间 protected int syncLimit; // 决定了 Follower 连接 Leader 的超时时间 protected int connectToLearnerMasterLimit; // 选举算法(1,2 已被废弃) protected int electionAlg = 3; // 选举leader所使用的端口 protected int electionPort = 2182; // 服务端是否接受来自任意 IP 地址的请求 protected boolean quorumListenOnAllIPs = false; // 服务ID protected long serverId = UNSET_SERVERID; // 验证器 protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null; // 保留多少个最新的snapshot文件 protected int snapRetainCount = 3; // 间隔多久进行一次 snapshot 的清理 protected int purgeInterval = 0; // Observer是否需要本地归档 protected boolean syncEnabled = true; // 初始化配置 protected String initialConfig; // 学习类型(参与者或者观察者) protected LearnerType peerType = LearnerType.PARTICIPANT; // 各个角色认证相关配置 protected boolean quorumServerRequireSasl = false; protected boolean quorumLearnerRequireSasl = false; protected boolean quorumEnableSasl = false; protected String quorumServicePrincipal = QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL_DEFAULT_VALUE; protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE; protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE; protected int quorumCnxnThreadsSize; // 多地址相关配置 private boolean multiAddressEnabled = Boolean.parseBoolean( System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_ENABLED, QuorumPeer.CONFIG_DEFAULT_MULTI_ADDRESS_ENABLED)); private boolean multiAddressReachabilityCheckEnabled = Boolean.parseBoolean(System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_ENABLED, "true")); private int multiAddressReachabilityCheckTimeoutMs = Integer.parseInt(System.getProperty(QuorumPeer.CONFIG_KEY_MULTI_ADDRESS_REACHABILITY_CHECK_TIMEOUT_MS, String.valueOf(MultipleAddresses.DEFAULT_TIMEOUT.toMillis()))); protected String oraclePath; // 最小快照保留计数 private final int MIN_SNAP_RETAIN_COUNT = 3; // JVM暂停监视器功能开关 protected boolean jvmPauseMonitorToRun = false; // JVM Pause Monitor warn threshold in ms protected long jvmPauseWarnThresholdMs = JvmPauseMonitor.WARN_THRESHOLD_DEFAULT; // JVM Pause Monitor info threshold in ms protected long jvmPauseInfoThresholdMs = JvmPauseMonitor.INFO_THRESHOLD_DEFAULT; // JVM暂停监视器睡眠时间(毫秒) protected long jvmPauseSleepTimeMs = JvmPauseMonitor.SLEEP_TIME_MS_DEFAULT; parse方法

parse方法是加载及解析配置文件的入口方法

public void parse(String path) throws ConfigException { try { // 加载zoo.cfg配置文件 File configFile = (new VerifyingFileFactory.Builder(LOG) .warnForRelativePath() .failForNonExistingPath() .build()).create(path); // 把zoo.cfg构建成Properties对象 Properties cfg = new Properties(); try (FileInputStream in = new FileInputStream(configFile)) { cfg.load(in); // 设置配置文件路径 configFileStr = path; } // 读取整个配置文件作为初始配置(直接把配置文件内容变成字符串) initialConfig = new String(Files.readAllBytes(configFile.toPath())); // 解析配置属性 parseProperties(cfg); } catch (IOException e) { throw new ConfigException("Error processing " + path, e); } catch (IllegalArgumentException e) { throw new ConfigException("Error processing " + path, e); } // 这里忽略动态配置相关内容 if (dynamicConfigFileStr != null) { ... } }

配置文件夹解析入口方法还是比较好理解的

parseProperties方法

正真的解析属性并赋值方法

public void parseProperties(Properties zkProp) throws IOException, ConfigException { ... for (Entry entry : zkProp.entrySet()) { ... if (...) { ... = ...; } else if (...) { ...; } else if (key.equals("electionAlg")) { // 选举算法(1,2 已被废弃),不是3就直接异常了 electionAlg = Integer.parseInt(value); if (electionAlg != 3) { throw new ConfigException("Invalid electionAlg value. Only 3 is supported."); } } else if (...) { ...; } else { System.setProperty("zookeeper." + key, value); } } if (...) { ....; } // tickTime这个如果为空或者等于0就会抛异常 if (tickTime == 0) { throw new IllegalArgumentException("tickTime is not set"); } // tickTime几乎是zookeeper超时及心跳基本单位,原因就在这儿 minSessionTimeout = minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout; maxSessionTimeout = maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout; ... if (dynamicConfigFileStr == null) { // 这也是一个重点方法 setupQuorumPeerConfig(zkProp, true); ... } }

这里忽略了几百行代码,主要都是些读取配置文件赋值变量的代码,这些并不重要,留下来的都是有意思的代码。

setupQuorumPeerConfig方法

无论是动态配置还是静态配置集群启动时都会执行setupQuorumPeerConfig方法

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { // 加载集群服务器地址配置 quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode, oraclePath); // 加载myId setupMyId(); // 加载端口号 setupClientPort(); // 设置初始状态 setupPeerType(); // 验证集群配置(时间、serverId) checkValidity(); }

我们在/data/zookeeper目录下配置了myid文件,它是怎么被加载的?

private void setupMyId() throws IOException { // 加载myid文件,从这可以看出它是大小写敏感的 File myIdFile = new File(dataDir, "myid"); if (!myIdFile.isFile()) { return; } // 读取myid文件中内容 BufferedReader br = new BufferedReader(new FileReader(myIdFile)); String myIdString; try { myIdString = br.readLine(); } finally { br.close(); } try { // myid文件中的数字就是serverId serverId = Long.parseLong(myIdString); // 存储在当前线程ThreadLocal中 MDC.put("myid", myIdString); } catch (NumberFormatException e) { throw new IllegalArgumentException("serverid " + myIdString + " is not a number"); } }

集群配置时,我们zoo.cnf文件后面追加了集群服务配置,setupClientPort的作用主要就是验证自身服务器ip及端口

private void setupClientPort() throws ConfigException { if (serverId == UNSET_SERVERID) { return; } // 获取此服务器ip端口 QuorumServer qs = quorumVerifier.getAllMembers().get(serverId); if (clientPortAddress != null && qs != null && qs.clientAddr != null) { if ((!clientPortAddress.getAddress().isAnyLocalAddress() && !clientPortAddress.equals(qs.clientAddr)) || ( clientPortAddress.getAddress().isAnyLocalAddress() && clientPortAddress.getPort() != qs.clientAddr.getPort())) { throw new ConfigException("client address for this server (id = " + serverId + ") in static config file is " + clientPortAddress + " is different from client address found in dynamic file: " + qs.clientAddr); } } if (qs != null && qs.clientAddr != null) { clientPortAddress = qs.clientAddr; } if (qs != null && qs.clientAddr == null) { qs.clientAddr = clientPortAddress; qs.isClientAddrFromStatic = true; } }

setupPeerType负责验证当前角色类型

private void setupPeerType() { // Warn about inconsistent peer type LearnerType roleByServersList = quorumVerifier.getObservingMembers().containsKey(serverId) ? LearnerType.OBSERVER : LearnerType.PARTICIPANT; if (roleByServersList != peerType) { LOG.warn( "Peer type from servers list ({}) doesn't match peerType ({}). Defaulting to servers list.", roleByServersList, peerType); peerType = roleByServersList; } }

checkValidity负责验证serverId、initLimit、syncLimit合法性

public void checkValidity() throws IOException, ConfigException { if (isDistributed()) { if (initLimit == 0) { throw new IllegalArgumentException("initLimit is not set"); } if (syncLimit == 0) { throw new IllegalArgumentException("syncLimit is not set"); } if (serverId == UNSET_SERVERID) { throw new IllegalArgumentException("myid file is missing"); } } }

看到这里唯一有疑惑的就是,集群服务器地址什么时候加载的,其实在setupQuorumPeerConfig方法中调用parseDynamicConfig方法

void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode) throws IOException, ConfigException { // 加载集群服务器地址配置(这里参数根据上面逻辑已经做了替换) quorumVerifier = parseDynamicConfig(prop, 3, true, true, null); ... }

看看parseDynamicConfig是怎么解析集群服务器地址的

public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings, boolean configBackwardCompatibilityMode, String oraclePath) throws IOException, ConfigException { // dynamicConfigProp为解析后的配置文件,eAlg为3,warnings为true,configBackwardCompatibilityMode为true,oraclePath为null ... // 主要解析方法 QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical, oraclePath); ... return qv; }

这里调用createQuorumVerifier,其中isHierarchical为false,oraclePath为null

private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical, String oraclePath) throws ConfigException { if (oraclePath == null) { // 调用createQuorumVerifier return createQuorumVerifier(dynamicConfigProp, isHierarchical); } else { return new QuorumOracleMaj(dynamicConfigProp, oraclePath); } }

继续往下看,调用createQuorumVerifier

private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException { if (isHierarchical) { return new QuorumHierarchical(dynamicConfigProp); } else { // 创建解析 return new QuorumMaj(dynamicConfigProp); } }

主要解析在QuorumMaj中

public QuorumMaj(Properties props) throws ConfigException { // 遍历属性 for (Entry entry : props.entrySet()) { String key = entry.getKey().toString(); String value = entry.getValue().toString(); // 找到server.开头的配置(例如:server.1=172.16.6.121:2888:3888) if (key.startsWith("server.")) { int dot = key.indexOf('.'); // 获取服务ID即sid long sid = Long.parseLong(key.substring(dot + 1)); // 构建对象new QuorumServer(1, 172.16.6.121:2888:3888) QuorumServer qs = new QuorumServer(sid, value); // allMembers赋值,键为sid,值为QuorumServer对象 allMembers.put(Long.valueOf(sid), qs); // 根据当前角色加入对应集合 if (qs.type == LearnerType.PARTICIPANT) { votingMembers.put(Long.valueOf(sid), qs); } else { observingMembers.put(Long.valueOf(sid), qs); } } else if (key.equals("version")) { // 这里截取版本 version = Long.parseLong(value, 16); } } half = votingMembers.size() / 2; }

zookeeper 解析配置相对来说还是比较容易的。

启动及准备阶段赋值

加载完配置文件,然后调用runFromConfig给具体业务赋值(就是为选举做准备)

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer, myid=" + config.getServerId()); final MetricsProvider metricsProvider; try { metricsProvider = MetricsProviderBootstrap.startMetricsProvider( config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration()); } catch (MetricsProviderLifeCycleException error) { throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error); } try { ServerMetrics.metricsProviderInitialized(metricsProvider); ProviderRegistry.initialize(); ServerCnxnFactory cnxnFactory = null; ServerCnxnFactory secureCnxnFactory = null; if (config.getClientPortAddress() != null) { cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false); } if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true); } // 构造QuorumPeer对象 quorumPeer = getQuorumPeer(); // 初始化日志及数据文件目录结构 quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled()); quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); // 设置选举算法 quorumPeer.setElectionType(config.getElectionAlg()); // 需要在data目录下创建一个myid文件,标识myid quorumPeer.setMyid(config.getServerId()); // 设计基本时间 quorumPeer.setTickTime(config.getTickTime()); // 客户端的超时时间最小值 quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); // 客户端的超时时间最大值 quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); // tickTime * initLimit 决定了ACK的超时时间 quorumPeer.setInitLimit(config.getInitLimit()); // tickTime * syncLimit 决定了服务端心跳超时时间 quorumPeer.setSyncLimit(config.getSyncLimit()); // 决定了 Follower 连接 Leader 的超时时间 quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit()); // 侦听观察者连接的端口;即观察者尝试连接的端口 quorumPeer.setObserverMasterPort(config.getObserverMasterPort()); // 设置配置文件 quorumPeer.setConfigFileName(config.getConfigFilename()); // TCP 服务端用于临时存放已完成三次握手的请求的队列的最大长度 quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog()); // 设置zookeeper的DB quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); // 集群验证器,主要完成判断一组server在已给定的配置的server列表中,是否能够构成集群 quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); if (config.getLastSeenQuorumVerifier() != null) { quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); } // 初始化DB quorumPeer.initConfigInZKDatabase(); quorumPeer.setCnxnFactory(cnxnFactory); quorumPeer.setSecureCnxnFactory(secureCnxnFactory); // 集群间是否使用SSL通信 quorumPeer.setSslQuorum(config.isSslQuorum()); // 是否使用端口统一 quorumPeer.setUsePortUnification(config.shouldUsePortUnification()); // 学习类型(参与者或者观察者) quorumPeer.setLearnerType(config.getPeerType()); // Observer是否需要本地归档 quorumPeer.setSyncEnabled(config.getSyncEnabled()); // 服务端是否接受来自任意IP地址的请求 quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); if (config.sslQuorumReloadCertFiles) { quorumPeer.getX509Util().enableCertFileReloading(); } // 设置多地址相关配置 quorumPeer.setMultiAddressEnabled(config.isMultiAddressEnabled()); quorumPeer.setMultiAddressReachabilityCheckEnabled(config.isMultiAddressReachabilityCheckEnabled()); quorumPeer.setMultiAddressReachabilityCheckTimeoutMs(config.getMultiAddressReachabilityCheckTimeoutMs()); // sets quorum sasl authentication configurations quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if (quorumPeer.isQuorumSaslAuthEnabled()) { quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); quorumPeer.initialize(); if (config.jvmPauseMonitorToRun) { quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config)); } // 调用start方法(这里就是实际选举或者同步) quorumPeer.start(); ZKAuditProvider.addZKStartStopAuditLog(); quorumPeer.join(); } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Quorum Peer interrupted", e); } finally { try { metricsProvider.stop(); } catch (Throwable error) { LOG.warn("Error while stopping metrics", error); } } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3