背景
最近打算要重启DataNode,之前有试过重启过程中导致业务任务失败的情况。所以想了解DataNode什么时候才算启动完成,以及能否检测DataNode是否已经准备好了。
本文分析的源码版本是hadoop cdh5.4.0
源码分析
从Datanode.java main方法开始
public static void main(String args[]) {
    if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
      System.exit(0);
    }
    secureMain(args, null);
  }进入secureMain方法,这里调用了createDataNode方法
public static DataNode createDataNode(String args[], Configuration conf,
      SecureResources resources) throws IOException {
    // 实例化datanode
    DataNode dn = instantiateDataNode(args, conf, resources);
    if (dn != null) {
      // 启动BPOfferService、dataXceiverServer、ipcServer等
      dn.runDatanodeDaemon();
    }
    return dn;
  }public static DataNode instantiateDataNode(String args [], Configuration conf,
      SecureResources resources) throws IOException {
    if (conf == null)
      conf = new HdfsConfiguration();
    
    if (args != null) {
      // parse generic hadoop options
      GenericOptionsParser hParser = new GenericOptionsParser(conf, args);
      args = hParser.getRemainingArgs();
    }
    
    if (!parseArguments(args, conf)) {
      printUsage(System.err);
      return null;
    }
    // 获取实际的存储路径(根据配置dfs.datanode.data.dir)
    Collection<StorageLocation> dataLocations = getStorageLocations(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
        DFS_DATANODE_KERBEROS_PRINCIPAL_KEY);
    return makeInstance(dataLocations, conf, resources);
  }static DataNode makeInstance(Collection<StorageLocation> dataDirs,
      Configuration conf, SecureResources resources) throws IOException {
    LocalFileSystem localFS = FileSystem.getLocal(conf);
    FsPermission permission = new FsPermission(
        conf.get(DFS_DATANODE_DATA_DIR_PERMISSION_KEY,
                 DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
    // DataNode磁盘检查类,检查目录权限以及是否能够创建目录
    DataNodeDiskChecker dataNodeDiskChecker =
        new DataNodeDiskChecker(permission);
    // 找出能够正常读写的路径
    List<StorageLocation> locations =
        checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
    DefaultMetricsSystem.initialize("DataNode");
    assert locations.size() > 0 : "number of data directories should be > 0";
    return new DataNode(conf, locations, resources);
  }进入DataNode构造方法,主要看startDataNode(conf, dataDirs, resources)方法
void startDataNode(Configuration conf, 
                     List<StorageLocation> dataDirs,
                     SecureResources resources
                     ) throws IOException {
    // settings global for all BPs in the Data Node
    this.secureResources = resources;
    synchronized (this) {
      this.dataDirs = dataDirs;
    }
    this.conf = conf;
    this.dnConf = new DNConf(conf);
    checkSecureConfig(dnConf, conf, resources);
    this.spanReceiverHost = SpanReceiverHost.getInstance(conf);
    if (dnConf.maxLockedMemory > 0) {
      if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
        throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) is greater than zero and native code is not available.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
      }
      if (Path.WINDOWS) {
        NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory);
      } else {
        long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit();
        if (dnConf.maxLockedMemory > ulimit) {
          throw new RuntimeException(String.format(
            "Cannot start datanode because the configured max locked memory" +
            " size (%s) of %d bytes is more than the datanode's available" +
            " RLIMIT_MEMLOCK ulimit of %d bytes.",
            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            dnConf.maxLockedMemory,
            ulimit));
        }
      }
    }
    LOG.info("Starting DataNode with maxLockedMemory = " +
        dnConf.maxLockedMemory);
    storage = new DataStorage();
    
    // global DN settings
    registerMXBean();
    // 初始化xceiverServer一些对象
    initDataXceiver(conf);
    // 启动http web hdfs
    startInfoServer(conf);
    pauseMonitor = new JvmPauseMonitor(conf);
    pauseMonitor.start();
  
    // BlockPoolTokenSecretManager is required to create ipc server.
    this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();
    // Login is done by now. Set the DN user name.
    dnUserName = UserGroupInformation.getCurrentUser().getShortUserName();
    LOG.info("dnUserName = " + dnUserName);
    LOG.info("supergroup = " + supergroup);
    // 初始化ipc服务
    initIpcServer(conf);
    metrics = DataNodeMetrics.create(conf, getDisplayName());
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
    
    // 重要,构造BlockPoolManager
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(conf);
    // Create the ReadaheadPool from the DataNode context so we can
    // exit without having to explicitly shutdown its thread pool.
    readaheadPool = ReadaheadPool.getInstance();
    saslClient = new SaslDataTransferClient(dnConf.conf, 
        dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
    saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  }在blockPoolManager.refreshNamenodes里面调用了doRefreshNamenodes()方法
private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
    assert Thread.holdsLock(refreshNamenodesLock);
    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
    
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }
      
      // Step 2. Any nameservices we currently have but are no longer present
      // need to be removed.
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));
      
      assert toRefresh.size() + toAdd.size() ==
        addrMap.size() :
          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
          "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
          "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);
      
       // 由于是重启,所以都是在toAdd里面
      // Step 3. Start new nameservices
      if (!toAdd.isEmpty()) {
        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));
        // 这里遍历的是nameServices,如果用了hdfs federation就会有多个
        for (String nsToAdd : toAdd) {
          ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
          // 创建BPOfferService
          BPOfferService bpos = createBPOS(addrs);
          // 加入bpByNameserviceId里面,下次再执行这个方法的话,就是加入toRefresh里面了
          bpByNameserviceId.put(nsToAdd, bpos);
          offerServices.add(bpos);
        }
      }
      // 启动所有offerServices的BPOfferService
      startAll();
    }关注一下createBPOS方法,里面会包含多个BPServiceActor,存储在 BPOfferService.bpServices 变量内
protected BPOfferService createBPOS(List<InetSocketAddress> nnAddrs) {
    return new BPOfferService(nnAddrs, dn);
  }
  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    this.dn = dn;
    // 如果有standby和active两个namenode,就会创建两个BPServiceActor
    for (InetSocketAddress addr : nnAddrs) {
      this.bpServices.add(new BPServiceActor(addr, this));
    }
  }上面的startAll()把所有的BPOfferService都启动了,实际是调用下面的BPServiceActor.start(),BPServiceActor的作用在这文件的最上面注释写着:1.和namenode预注册,2.和namenode注册,3.周期性发送心跳到namenode,4.处理来自于namenode的命令。继续看这个类的run()方法
public void run() {
    LOG.info(this + " starting to offer service");
    try {
      while (true) {
        // init stuff
        try {
          // 向namenode注册以及初始化blockPool
          // setup storage
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          // Initial handshake, storage recovery or registration failed
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            // Retry until all namenode's of BPOS failed initialization
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }
      runningState = RunningState.RUNNING;
      while (shouldRun()) {
        try {
          // 更新当前那个是active actor,发送blockReport给namenode等
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }关注connectToNNAndHandshake()方法,这里初始化了blockPool
private void connectToNNAndHandshake() throws IOException {
    // get NN proxy
    bpNamenode = dn.connectToNN(nnAddr);
     // 从namenode获取一些版本等信息用于校验
    // First phase of the handshake with NN - get the namespace
    // info.
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
    
    // Verify that this matches the other NN in this HA pair.
    // This also initializes our block pool in the DN if we are
    // the first NN connection for this BP.
    bpos.verifyAndSetNamespaceInfo(nsInfo);
    
     // 向namenode注册,并且设定了一下blockReport的时间(当前时间 - (blockReportInterval - delay)),默认是马上
    // Second phase of the handshake with the NN.
    register();
  }在verifyAndSetNamespaceInfo()方法里,主要看dn.initBlockPool(this);
void initBlockPool(BPOfferService bpos) throws IOException {
    NamespaceInfo nsInfo = bpos.getNamespaceInfo();
    if (nsInfo == null) {
      throw new IOException("NamespaceInfo not found: Block pool " + bpos
          + " should have retrieved namespace info before initBlockPool.");
    }
    
    setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID());
    // 把BlockPoolId和BPOfferService关联起来,存放在bpByBlockPoolId
    // Register the new block pool with the BP manager.
    blockPoolManager.addBlockPool(bpos);
    
    // 初始化data变量,创建出FsDatasetImpl
    // In the case that this is the first block pool to connect, initialize
    // the dataset, block scanners, etc.
    initStorage(nsInfo);
    // 去掉坏了的磁盘
    // Exclude failed disks before initializing the block pools to avoid startup
    // failures.
    checkDiskError();
    // 开启DirectoryScanner,定期运行,用于处理内存中的对象和实际存储文件的差异
    initDirectoryScanner(conf);
    // 添加blockPool
    data.addBlockPool(nsInfo.getBlockPoolID(), conf);
    blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
  }这里面关注的是data.addBlockPool方法
public void addBlockPool(String bpid, Configuration conf)
      throws IOException {
    LOG.info("Adding block pool " + bpid);
    synchronized(this) {
      // 创建BlockPoolSlice对象,加入bpSlices变量内
      volumes.addBlockPool(bpid, conf);
      // 初始化ReplicaMap对象(ReplicaMap:Maintains the replica map)
      volumeMap.initBlockPool(bpid);
    }
    // 获取所有磁盘的副本map
    volumes.getAllVolumesMap(bpid, volumeMap, ramDiskReplicaTracker);
  }volumes.addBlockPool方法里,是创建BlockPoolSlice。BlockPoolSlice的介绍是说这是BlockPool存储在一个磁盘的一部分,里面主要是记录一些目录,还有使用大小等。注意这个有个计算磁盘使用大小非常耗时,这里使用了缓存,每600秒更新一次,在datanode退出的时候会写把数值写到文件里。
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
    long totalStartTime = Time.monotonicNow();
    
    final List<IOException> exceptions = Collections.synchronizedList(
        new ArrayList<IOException>());
    List<Thread> blockPoolAddingThreads = new ArrayList<Thread>();
    for (final FsVolumeImpl v : volumes.get()) {
      Thread t = new Thread() {
        public void run() {
          try (FsVolumeReference ref = v.obtainReference()) {
            FsDatasetImpl.LOG.info("Scanning block pool " + bpid +
                " on volume " + v + "...");
            long startTime = Time.monotonicNow();
            // 创建BlockPoolSlice
            v.addBlockPool(bpid, conf);
            long timeTaken = Time.monotonicNow() - startTime;
            FsDatasetImpl.LOG.info("Time taken to scan block pool " + bpid +
                " on " + v + ": " + timeTaken + "ms");
          } catch (ClosedChannelException e) {
            // ignore.
          } catch (IOException ioe) {
            FsDatasetImpl.LOG.info("Caught exception while scanning " + v +
                ". Will throw later.", ioe);
            exceptions.add(ioe);
          }
        }
      };volumes.getAllVolumesMap()方法,实际是调用每一个BlockPoolSlice.getVolumeMap(),我们当前版本在这里没有cache,因为要遍历目录,所以这里的操作比较耗时。添加cache具体见HDFS-7928补丁。
void getVolumeMap(ReplicaMap volumeMap,
                    final RamDiskReplicaTracker lazyWriteReplicaMap)
      throws IOException {
    // Recover lazy persist replicas, they will be added to the volumeMap
    // when we scan the finalized directory.
    if (lazypersistDir.exists()) {
      int numRecovered = moveLazyPersistReplicasToFinalized(lazypersistDir);
      FsDatasetImpl.LOG.info(
          "Recovered " + numRecovered + " replicas from " + lazypersistDir);
    }
    // 遍历底下所有目录,构造副本map
    // add finalized replicas
    addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
    // add rbw replicas
    addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
  }回到最上面,执行完bpos.verifyAndSetNamespaceInfo(nsInfo);后,执行register()方法,向namenode注册。执行完后,再往上就是调用offerService(),发送心跳,blockMap等。
结论
namenode启动过程中会有多个线程,http和rpc的端口会先启动,但是这个时候还是不能提供服务的。当前版本主要耗时在构建volumeMap上,需要遍历磁盘目录。
初始化完volumeMap会向namenode注册,注册成功后会在日志打印successfully registered with NN(有两条,active和standby namenode)
之后datanode向namenode汇报块信息,发送完成后在日志打印Successfully sent block report(同样有两条)
只有在blockReport完成后才算真的启动完成。
在重启的过程中,如果时间不算太长,在namenode中还没有判定这个datanode为dead node,里面的block map信息还保留着,所以在完成磁盘扫描 volumeMap 之后也算完成了。
其他
代码比较多,限于个人能力,可能理解也有误,如果发现错误以后再更新。同时也没找出可编程的方法来判断datanode是否启动完成,除非修改datanode代码自己加标志位。
update(2017-07-18):判断datanode是否启动完成核心在判断两次blockreport是否完成(分别向active和standby namenode发送),所以可以通过访问datanode jmx地址http://xxx:50075/jmx,检查参数”BlockReportsNumOps” : 大于等于2来判断。
 
参考
第七章:小朱笔记hadoop之源码分析-hdfs分析 第五节:Datanode 分析 版本差异比较大
记一次DataNode慢启动问题