hdfs选择datanode策略

前几天报错如下:

Error: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/xxx/part-r-00002 could only be replicated to 0 nodes instead of minReplication (=1).  There are 11 datanode(s) running and no node(s) are excluded in this operation.
	at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget(BlockManager.java:1327)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2278)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:480)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:297)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:44080)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:453)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1002)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1695)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1691)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1689)

阅读一下代码了解hdfs选择块的策略,可能理解有误,如果有错误请指出,代码来自cdh-4.2.1版本 在 FSNamesystem.getAdditionalBlock()里,有个blockManager.chooseTarget(src, replication, clientNode, excludedNodes, blockSize);
这个方法用来决定在那个节点写数据 代码文件:hadoop-2.0.0-cdh4.2.1\src\hadoop-hdfs-project\hadoop-hdfs\src\main\java\org\apache\hadoop\hdfs\server\blockmanagement\BlockPlacementPolicyDefault.java chooseTarget方法调用了几次转换,看最终调用的方法

/* choose <i>numOfReplicas</i> from all data nodes */
  private DatanodeDescriptor chooseTarget(int numOfReplicas,
                                          DatanodeDescriptor writer,
                                          HashMap<Node, Node> excludedNodes,
                                          long blocksize,
                                          int maxNodesPerRack,
                                          List<DatanodeDescriptor> results) {
      
    if (numOfReplicas == 0 ¦¦ clusterMap.getNumOfLeaves()==0) {
      return writer;
    }
    int totalReplicasExpected = numOfReplicas; // 总共需要的副本数
      
// results是当前已分配的节点
    int numOfResults = results.size();
    boolean newBlock = (numOfResults==0);
    if (writer == null && !newBlock) {
      writer = results.get(0);
    }
      
    try {
// 如果还没分配过,先选择本地节点
      if (numOfResults == 0) {
        writer = chooseLocalNode(writer, excludedNodes, 
                                 blocksize, maxNodesPerRack, results);
        if (--numOfReplicas == 0) {
          return writer;
        }
      }
// 如果之前已分配一个或零个,在其他机架的一台机器选择
      if (numOfResults <= 1) {
        chooseRemoteRack(1, results.get(0), excludedNodes, 
                         blocksize, maxNodesPerRack, results);
        if (--numOfReplicas == 0) {
          return writer;
        }
      }
// 如果前面还没分配完,已有小于或等于两个副本
      if (numOfResults <= 2) {
        // 如果前两个在同一个机架上,选择一个其他机架的
        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
          chooseRemoteRack(1, results.get(0), excludedNodes,
                           blocksize, maxNodesPerRack, results);
        } else if (newBlock){ 
          chooseLocalRack(results.get(1), excludedNodes, blocksize, 
                          maxNodesPerRack, results);
        } else {
          chooseLocalRack(writer, excludedNodes, blocksize,
                          maxNodesPerRack, results);
        }
        if (--numOfReplicas == 0) {
          return writer;
        }
      }
// 如果还需要副本,就随机选择
      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
                   blocksize, maxNodesPerRack, results);
    } catch (NotEnoughReplicasException e) {
      LOG.warn("Not able to place enough replicas, still in need of "
               + numOfReplicas + " to reach " + totalReplicasExpected + "\n"
               + e.getMessage());
    }
    return writer;
  }

看一下 chooseLocalNode

/* choose <i>localMachine</i> as the target.
   * if <i>localMachine</i> is not available, 
   * choose a node on the same rack
   * @return the chosen node
   */
  private DatanodeDescriptor chooseLocalNode(
                                             DatanodeDescriptor localMachine,
                                             HashMap<Node, Node> excludedNodes,
                                             long blocksize,
                                             int maxNodesPerRack,
                                             List<DatanodeDescriptor> results)
    throws NotEnoughReplicasException {
    // if no local machine, randomly choose one node 如果没有本地机器,随机选择一个
    if (localMachine == null)
      return chooseRandom(NodeBase.ROOT, excludedNodes, 
                          blocksize, maxNodesPerRack, results);
    if (preferLocalNode) { // 这个值写死为true
      // otherwise try local machine first
      Node oldNode = excludedNodes.put(localMachine, localMachine); // 把当前节点先加入排除列表
      if (oldNode == null) { // was not in the excluded list 如果返回null表示之前没有在排除列表里
        if (isGoodTarget(localMachine, blocksize,
                         maxNodesPerRack, false, results)) {//判断是否好的目标,后面再看这个方法
          results.add(localMachine);
          return localMachine;
        }
      } 
    }      
    // try a node on local rack
    return chooseLocalRack(localMachine, excludedNodes, 
                           blocksize, maxNodesPerRack, results);// 如果本机不适合,选择同机架的其他机器
  }

继续看chooseLocalRack

/* choose one node from the rack that <i>localMachine</i> is on.
   * if no such node is available, choose one node from the rack where
   * a second replica is on.
   * if still no such node is available, choose a random node 
   * in the cluster.
   * @return the chosen node
   */
  private DatanodeDescriptor chooseLocalRack(
                                             DatanodeDescriptor localMachine,
                                             HashMap<Node, Node> excludedNodes,
                                             long blocksize,
                                             int maxNodesPerRack,
                                             List<DatanodeDescriptor> results)
    throws NotEnoughReplicasException {
    // no local machine, so choose a random machine
    if (localMachine == null) {
      return chooseRandom(NodeBase.ROOT, excludedNodes, 
                          blocksize, maxNodesPerRack, results);
    }
      
    // choose one from the local rack 选择同一个机架的
    try {
      return chooseRandom(
                          localMachine.getNetworkLocation(),
                          excludedNodes, blocksize, maxNodesPerRack, results);
    } catch (NotEnoughReplicasException e1) {
      // find the second replica 从已有副本里面找同一个机架的
      DatanodeDescriptor newLocal=null;
      for(Iterator<DatanodeDescriptor> iter=results.iterator();
          iter.hasNext();) {
        DatanodeDescriptor nextNode = iter.next();
        if (nextNode != localMachine) {
          newLocal = nextNode;
          break;
        }
      }
      if (newLocal != null) {
        try {
          return chooseRandom(
                              newLocal.getNetworkLocation(),
                              excludedNodes, blocksize, maxNodesPerRack, results);
        } catch(NotEnoughReplicasException e2) {
          //otherwise randomly choose one from the network 如果还是找不到,随机选择
          return chooseRandom(NodeBase.ROOT, excludedNodes,
                              blocksize, maxNodesPerRack, results);
        }
      } else {
        //otherwise randomly choose one from the network
        return chooseRandom(NodeBase.ROOT, excludedNodes,
                            blocksize, maxNodesPerRack, results);
      }
    }
  }

继续chooseRemoteRack

/* choose <i>numOfReplicas</i> nodes from the racks 
   * that <i>localMachine</i> is NOT on.
   * if not enough nodes are available, choose the remaining ones 
   * from the local rack
   */
    
  private void chooseRemoteRack(int numOfReplicas,
                                DatanodeDescriptor localMachine,
                                HashMap<Node, Node> excludedNodes,
                                long blocksize,
                                int maxReplicasPerRack,
                                List<DatanodeDescriptor> results)
    throws NotEnoughReplicasException {
    int oldNumOfReplicas = results.size();
    // randomly choose one node from remote racks 选择其他机架的节点,注意这里的"~"是表示排除本机架,如果找不到就只有在本机架找
    try {
      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
                   excludedNodes, blocksize, maxReplicasPerRack, results);
    } catch (NotEnoughReplicasException e) {
      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
                   maxReplicasPerRack, results);
    }
  }

最后一个 chooseRandom

/* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
   */
  private void chooseRandom(int numOfReplicas,
                            String nodes,
                            HashMap<Node, Node> excludedNodes,
                            long blocksize,
                            int maxNodesPerRack,
                            List<DatanodeDescriptor> results)
    throws NotEnoughReplicasException {
      
    int numOfAvailableNodes =
      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet()); // 当前可用节点数
    StringBuilder builder = null;
    if (LOG.isDebugEnabled()) {
      builder = threadLocalBuilder.get();
      builder.setLength(0);
      builder.append("[");
    }
    boolean badTarget = false;
    while(numOfReplicas > 0 && numOfAvailableNodes > 0) { // 循环直到所有可用副本数为0
      DatanodeDescriptor chosenNode = 
        (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
      if (oldNode == null) {
        numOfAvailableNodes--;

        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
          numOfReplicas--;
          results.add(chosenNode);
        } else {
          badTarget = true;
        }
      }
    }
      
    if (numOfReplicas>0) {
      String detail = enableDebugLogging;
      if (LOG.isDebugEnabled()) {
        if (badTarget && builder != null) {
          detail = builder.append("]").toString();
          builder.setLength(0);
        } else detail = "";
      }
      throw new NotEnoughReplicasException(detail);
    }
  }

最后来看如何判断节点是否可用,isGoodTarget

/* judge if a node is a good target.
   * return true if <i>node</i> has enough space, 
   * does not have too much load, and the rack does not have too many nodes
   */
  private boolean isGoodTarget(DatanodeDescriptor node,
                               long blockSize, int maxTargetPerLoc,
                               List<DatanodeDescriptor> results) {
    return isGoodTarget(node, blockSize, maxTargetPerLoc,
                        this.considerLoad, results);
  }
    
  private boolean isGoodTarget(DatanodeDescriptor node,
                               long blockSize, int maxTargetPerLoc,
                               boolean considerLoad,
                               List<DatanodeDescriptor> results) {
    // check if the node is (being) decommissed 判断是否退役了
    if (node.isDecommissionInProgress() ¦¦ node.isDecommissioned()) {
      if(LOG.isDebugEnabled()) {
        threadLocalBuilder.get().append(node.toString()).append(": ")
          .append("Node ").append(NodeBase.getPath(node))
          .append(" is not chosen because the node is (being) decommissioned ");
      }
      return false;
    }

    long remaining = node.getRemaining() - 
                     (node.getBlocksScheduled() * blockSize); 
    // check the remaining capacity of the target machine 判断剩余的磁盘空间, 块大小 * 5 > 剩余大小 表示不可用
    if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
      if(LOG.isDebugEnabled()) {
        threadLocalBuilder.get().append(node.toString()).append(": ")
          .append("Node ").append(NodeBase.getPath(node))
          .append(" is not chosen because the node does not have enough space ");
      }
      return false;
    }
      
    // check the communication traffic of the target machine 判断各个datanode的负载情况,这里的负载不是系统负载,而是根据xceiver线程数,这个线程好像是负责读写的。
    if (considerLoad) { // 默认这个为true,配置项 dfs.namenode.replication.considerLoad
      double avgLoad = 0;
      int size = clusterMap.getNumOfLeaves(); // 所有datanode数
      if (size != 0 && stats != null) {
        avgLoad = (double)stats.getTotalLoad()/size; // 获取总的xceiver线程数除以所有datanode数,算出平均负载
      }
      if (node.getXceiverCount() > (2.0 * avgLoad)) { // 如果当前线程数 大于 平均值的两倍,则不可使用
        if(LOG.isDebugEnabled()) {
          threadLocalBuilder.get().append(node.toString()).append(": ")
            .append("Node ").append(NodeBase.getPath(node))
            .append(" is not chosen because the node is too busy ");
        }
        return false;
      }
    }
      
    // check if the target rack has chosen too many nodes 检查是否有太多节点在当前机架上
    String rackname = node.getNetworkLocation();
    int counter=1;
    for(Iterator<DatanodeDescriptor> iter = results.iterator();
        iter.hasNext();) {
      Node result = iter.next();
      if (rackname.equals(result.getNetworkLocation())) {
        counter++;
      }
    }

//maxNodesPerRack的算法,没看懂,可能是每个机架的平均数+2,不知道为什么定一个这样的值 int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;

    if (counter>maxTargetPerLoc) {
      if(LOG.isDebugEnabled()) {
        threadLocalBuilder.get().append(node.toString()).append(": ")
          .append("Node ").append(NodeBase.getPath(node))
          .append(" is not chosen because the rack has too many chosen nodes ");
      }
      return false;
    }
    return true;
  }

我们当前集群状态,总共11台datanode,没有退役机器。其中7台机器磁盘已满或者基本满,剩下4台机器可写。没有划分机架,所以整个集群认为只有一个机架。所以应该不会触发“同一个机架不能有太多个节点的问题”(具体原因看上面代码,这里表述可能不是很准确) 怀疑检查负载的逻辑,7台满的机器datanode负载可能很低,但因为磁盘满不可以使用,导致剩下的4台机器线程数超过平均值的两倍。
当前只能加入日志,等下次出错后检查。

updatedupdated2023-12-062023-12-06