从日志分析impala查询慢的原因(2)

上一篇文章看到 requestTblLoadAndWait 速度缓慢,继续分析。先上日志

# impalad日志
I0103 15:09:35.503356 27319 impala-beeswax-server.cc:170] query(): query=select distinct(disp_id) from default_impala.kpi_disp_user_info_stat_tb where pt='2014-04-08'
I0103 15:11:36.109915 27319 Frontend.java:779] Missing tables were not received in 120000ms. Load request will be retried.
I0103 15:11:36.110705 27319 Frontend.java:709] Requesting prioritized load of table(s): default_impala.kpi_disp_user_info_stat_tb
I0103 15:11:50.778054 27319 Frontend.java:833] create plan
I0103 15:11:50.806649 27319 HdfsScanNode.java:571] collecting partitions for table kpi_disp_user_info_stat_tb
I0103 15:11:51.070816 27319 impala-server.cc:590] Execution request: TExecRequest {



# catalog日志
I0103 15:09:35.892182 29165 rpc-trace.cc:133] RPC call: CatalogService.PrioritizeLoad(from ::ffff:127.0.0.1:44323)
I0103 15:09:39.563268 29185 HdfsTable.java:916] load table: default_impala.kpi_disp_user_info_stat_tb
I0103 15:10:48.357046 29185 HdfsTable.java:234] load block md for kpi_disp_user_info_stat_tb
I0103 15:11:36.110947 29165 rpc-trace.cc:133] RPC call: CatalogService.PrioritizeLoad(from ::ffff:127.0.0.1:44323)
I0103 15:11:48.948714 29185 HdfsTable.java:1056] table #rows=-1


# hive metastore日志
2015-01-03 15:09:38,991 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/IP地址 get_table : db=default_impala tbl=kpi_disp_user_info_stat_tb
2015-01-03 15:09:39,848 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/IP地址 get_partition_names : db=default_impala tbl=kpi_disp_user_info_stat_tb
2015-01-03 15:09:39,902 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/IP地址 get_partitions_by_names : db=default_impala tbl=kpi_disp_user_info_stat_tb
2015-01-03 15:09:52,839 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/IP地址 get_partitions_by_names : db=default_impala tbl=kpi_disp_user_info_stat_tb
2015-01-03 15:10:05,383 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/IP地址 get_partitions_by_names : db=default_impala tbl=kpi_disp_user_info_stat_tb

这次加多了metastore的日志进来 在15:09:35提交了查询语句,可以看到impalad和catalog都有一条日志,metastore在3秒后有一条日志(3秒的原因是hive metastore一个bug,我在测试时没修复,但不影响分析)。 catalogd开始获取这个表的元素据以及文件block信息,15:09:39至15:10:48从metastore获取完元素据,15:10:48开始获取文件block信息。直到15:11:48完成,期间因为impalad获取元素据超过120秒,15:11:36重新获取了一次。

// Frontend.java
  private boolean requestTblLoadAndWait(Set<TableName> requestedTbls, long timeoutMs)
      throws InternalException {
// impala本地缓存了元素据,每个表有一个版本号。本地的缓存一开始只缓存表名,分区信息等都没有缓存
    Set<TableName> missingTbls = getMissingTbls(requestedTbls);
    // There are no missing tables, return and avoid making an RPC to the CatalogServer.
    if (missingTbls.isEmpty()) return true;

    // Call into the CatalogServer and request the required tables be loaded.
    LOG.info(String.format("Requesting prioritized load of table(s): %s",
        Joiner.on(", ").join(missingTbls)));
// 这里通过jni的方式调用c++写的代码,与catalogd通信获取元素据
    TStatus status = FeSupport.PrioritizeLoad(missingTbls);
    if (status.getStatus_code() != TStatusCode.OK) {
      throw new InternalException("Error requesting prioritized load: " +
          Joiner.on("\n").join(status.getError_msgs()));
    }

    long startTimeMs = System.currentTimeMillis();
    // Wait until all the required tables are loaded in the Impalad's catalog cache.
    // 检查是不是在本地缓存里了
    while (!missingTbls.isEmpty()) {
      // Check if the timeout has been reached.
      if (timeoutMs > 0 && System.currentTimeMillis() - startTimeMs > timeoutMs) {
        return false;
      }

      LOG.trace(String.format("Waiting for table(s) to complete loading: %s",
          Joiner.on(", ").join(missingTbls)));
      getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
      missingTbls = getMissingTbls(missingTbls);
      // TODO: Check for query cancellation here.
    }
    return true;
  }

继续看FeSupport.PrioritizeLoad的代码,这里实际是调用c++的代码,在fe-support.cc里,直接看

extern "C"
JNIEXPORT jbyteArray JNICALL
Java_com_cloudera_impala_service_FeSupport_NativePrioritizeLoad(
    JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
  TPrioritizeLoadRequest request;
  DeserializeThriftMsg(env, thrift_struct, &request);

  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), NULL);
  TPrioritizeLoadResponse result;
  Status status = catalog_op_executor.PrioritizeLoad(request, &result);
  if (!status.ok()) {
    LOG(ERROR) << status.GetErrorMsg();
    // Create a new Status, copy in this error, then update the result.
    Status catalog_service_status(result.status);
    catalog_service_status.AddError(status);
    status.ToThrift(&result.status);
  }

  jbyteArray result_bytes = NULL;
  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
                     JniUtil::internal_exc_class(), result_bytes);
  return result_bytes;
}

c++代码没看懂,不知道怎么调用到了catalogd的代码,不管了,从日志看执行了这行代码HdfsTable.java:916

public void load(Table cachedEntry, HiveMetaStoreClient client,
      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
    numHdfsFiles_ = 0;
    totalHdfsBytes_ = 0;
    LOG.debug("load table: " + db_.getName() + "." + name_);
...
loadColumns(fieldSchemas, client);
//这里有一些判断是否使用缓存的代码,如果是完全没有缓存,会通过以下方法获取
        msPartitions.addAll(MetaStoreUtil.fetchAllPartitions(
            client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES));
//如果部分分区已经获取过了,会只获取未知分区的元素据
        LOG.info(String.format("Incrementally refreshing %d/%d partitions.",
            modifiedPartitionNames.size(), totalPartitions));
        // No need to make the metastore call if no partitions are to be updated.
        if (modifiedPartitionNames.size() > 0) {
          // Now reload the the remaining partitions.
          msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
              Lists.newArrayList(modifiedPartitionNames), db_.getName(), name_));
        }
...
// 加载元素据和获取block信息
      loadPartitions(msPartitions, msTbl, oldFileDescMap);
      // load table stats
      numRows_ = getRowCount(msTbl.getParameters());
      LOG.debug("table #rows=" + Long.toString(numRows_));
}

在MetaStoreUtil.fetchAllPartitions方法里会先get_partition_names获取全部的分区名,然后再分批get_partitions_by_names来获取分区的全部信息。每次RPC获取的分区数量由参数HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX指定,默认1000个分区一次。 在loadPartitions方法中,会创建一些分区对象和搜集分区的行数(这个应该是要用comput stat才有值,默认搜集不到),然后再调用loadBlockMd(fileDescsToLoad);方法

/**
   * Loads the file block metadata for the given collection of FileDescriptors.
   * The FileDescriptors are passed as a Map of partition location to list of
   * files that exist under that directory.
   */
  private void loadBlockMd(Map<String, List<FileDescriptor>> fileDescriptors)
      throws RuntimeException {
    Preconditions.checkNotNull(fileDescriptors);
    LOG.debug("load block md for " + name_);

    // Store all BlockLocations so they can be reused when loading the disk IDs.
    List<BlockLocation> blockLocations = Lists.newArrayList();

    // loop over all files and record their block metadata, minus volume ids
    for (String parentPath: fileDescriptors.keySet()) {
      for (FileDescriptor fileDescriptor: fileDescriptors.get(parentPath)) {
        Path p = new Path(parentPath, fileDescriptor.getFileName());
        BlockLocation[] locations = null;
        try {
          // 这里可能耗时,每个分区都获取文件信息(已缓存的不获取,但没看懂缓存的机制)
          FileStatus fileStatus = DFS.getFileStatus(p);
          // fileDescriptors should not contain directories.
          Preconditions.checkArgument(!fileStatus.isDirectory());
          // RPC操作
          locations = DFS.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
          Preconditions.checkNotNull(locations);
          blockLocations.addAll(Arrays.asList(locations));

          // Loop over all blocks in the file.
          for (BlockLocation block: locations) {
            String[] blockHostPorts = block.getNames();
            try {
              blockHostPorts = block.getNames();
            } catch (IOException e) {
              // this shouldn't happen, getNames() doesn't throw anything
              String errorMsg = "BlockLocation.getNames() failed:\n" + e.getMessage();
              LOG.error(errorMsg);
              throw new IllegalStateException(errorMsg);
            }
            // Now enumerate all replicas of the block, adding any unknown hosts
            // to hostIndex_ and the index for that host to replicaHostIdxs.
            List<Integer> replicaHostIdxs = new ArrayList<Integer>(blockHostPorts.length);
            for (int i = 0; i < blockHostPorts.length; ++i) {
              String[] ip_port = blockHostPorts[i].split(":");
              Preconditions.checkState(ip_port.length == 2);
              TNetworkAddress network_address = new TNetworkAddress(ip_port[0],
                  Integer.parseInt(ip_port[1]));
              replicaHostIdxs.add(hostIndex_.getIndex(network_address));
            }
            fileDescriptor.addFileBlock(
                new FileBlock(block.getOffset(), block.getLength(), replicaHostIdxs));
          }
        } catch (IOException e) {
          throw new RuntimeException("couldn't determine block locations for path '"
              + p + "':\n" + e.getMessage(), e);
        }
      }
    }

    if (SUPPORTS_VOLUME_ID) {
      LOG.trace("loading disk ids for: " + getFullName() +
          ". nodes: " + getNumNodes());
      loadDiskIds(blockLocations, fileDescriptors);
      LOG.trace("completed load of disk ids for: " + getFullName());
    }
  }

  另外找到一个有一点关系的issue, IMPALA-1480 Slow DDL statements for tables with large number of partitions google group上的讨论 https://groups.google.com/a/cloudera.org/forum/#!topic/impala-user/Xv8d2jndzZ0

Dimitris:
It looks like an issue I am currently working on (https://issues.cloudera.org/browse/IMPALA-1480). I have a patch in flight that significantly improves DDL and INSERT statements for partitioned tables. Currently, the issue is that we force-reload the entire table metadata even though only few partitions have been modified.
不过他说的是DDL语句,会强制加载整个元数据,尽管只改几个分区,和这里的查询类似。
## 结论

对于很多分区的表,获取全部元素据以及block信息花费很多时间导致查询变慢。如果impala能够针对查询只获取需要用到的分区信息将会加快很多。  

updatedupdated2024-12-222024-12-22