In the previous post we saw that requestTblLoadAndWait was slow; let's keep digging. First, the logs:
# impalad log
I0103 15:09:35.503356 27319 impala-beeswax-server.cc:170] query(): query=select distinct(disp_id) from default_impala.kpi_disp_user_info_stat_tb where pt='2014-04-08'
I0103 15:11:36.109915 27319 Frontend.java:779] Missing tables were not received in 120000ms. Load request will be retried.
I0103 15:11:36.110705 27319 Frontend.java:709] Requesting prioritized load of table(s): default_impala.kpi_disp_user_info_stat_tb
I0103 15:11:50.778054 27319 Frontend.java:833] create plan
I0103 15:11:50.806649 27319 HdfsScanNode.java:571] collecting partitions for table kpi_disp_user_info_stat_tb
I0103 15:11:51.070816 27319 impala-server.cc:590] Execution request: TExecRequest {
# catalogd log
I0103 15:09:35.892182 29165 rpc-trace.cc:133] RPC call: CatalogService.PrioritizeLoad(from ::ffff:127.0.0.1:44323)
I0103 15:09:39.563268 29185 HdfsTable.java:916] load table: default_impala.kpi_disp_user_info_stat_tb
I0103 15:10:48.357046 29185 HdfsTable.java:234] load block md for kpi_disp_user_info_stat_tb
I0103 15:11:36.110947 29165 rpc-trace.cc:133] RPC call: CatalogService.PrioritizeLoad(from ::ffff:127.0.0.1:44323)
I0103 15:11:48.948714 29185 HdfsTable.java:1056] table #rows=-1
# hive metastore log
2015-01-03 15:09:38,991 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/<IP address> get_table : db=default_impala tbl=kpi_disp_user_info_stat_tb
2015-01-03 15:09:39,848 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/<IP address> get_partition_names : db=default_impala tbl=kpi_disp_user_info_stat_tb
2015-01-03 15:09:39,902 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/<IP address> get_partitions_by_names : db=default_impala tbl=kpi_disp_user_info_stat_tb
2015-01-03 15:09:52,839 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/<IP address> get_partitions_by_names : db=default_impala tbl=kpi_disp_user_info_stat_tb
2015-01-03 15:10:05,383 INFO org.apache.hadoop.hive.metastore.HiveMetaStore: 54032: source:/<IP address> get_partitions_by_names : db=default_impala tbl=kpi_disp_user_info_stat_tb
This time I added the metastore logs as well. The query was submitted at 15:09:35, and both impalad and catalogd log an entry at that point; the metastore logs its first entry 3 seconds later (the 3-second delay is caused by a Hive metastore bug that I hadn't fixed in my test environment, but it doesn't affect the analysis). catalogd then starts fetching the table's metadata and file block info: from 15:09:39 to 15:10:48 it pulls the metadata from the metastore, and at 15:10:48 it starts fetching file block info, finishing at 15:11:48. Meanwhile, because impalad had waited more than 120 seconds for the metadata, it re-requested the load at 15:11:36.
// Frontend.java
private boolean requestTblLoadAndWait(Set<TableName> requestedTbls, long timeoutMs)
    throws InternalException {
  // impalad caches metadata locally, with a version number per table. Initially
  // the cache only holds table names; partition info etc. is not loaded yet.
  Set<TableName> missingTbls = getMissingTbls(requestedTbls);
  // There are no missing tables, return and avoid making an RPC to the CatalogServer.
  if (missingTbls.isEmpty()) return true;
  // Call into the CatalogServer and request the required tables be loaded.
  LOG.info(String.format("Requesting prioritized load of table(s): %s",
      Joiner.on(", ").join(missingTbls)));
  // Calls C++ code via JNI, which talks to catalogd to request the metadata load.
  TStatus status = FeSupport.PrioritizeLoad(missingTbls);
  if (status.getStatus_code() != TStatusCode.OK) {
    throw new InternalException("Error requesting prioritized load: " +
        Joiner.on("\n").join(status.getError_msgs()));
  }
  long startTimeMs = System.currentTimeMillis();
  // Wait until all the required tables are loaded in the Impalad's catalog cache.
  // Poll: check whether the tables have arrived in the local cache yet.
  while (!missingTbls.isEmpty()) {
    // Check if the timeout has been reached.
    if (timeoutMs > 0 && System.currentTimeMillis() - startTimeMs > timeoutMs) {
      return false;
    }
    LOG.trace(String.format("Waiting for table(s) to complete loading: %s",
        Joiner.on(", ").join(missingTbls)));
    getCatalog().waitForCatalogUpdate(MAX_CATALOG_UPDATE_WAIT_TIME_MS);
    missingTbls = getMissingTbls(missingTbls);
    // TODO: Check for query cancellation here.
  }
  return true;
}
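The control flow above — fire an asynchronous load request, then poll the local cache until the tables appear or the timeout expires — can be sketched like this. All class and field names below are hypothetical stand-ins; the real code blocks in waitForCatalogUpdate and measures wall-clock time, while here catalogd is simulated as "delivers the tables after N polling rounds" to keep the sketch deterministic:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the request-then-poll pattern in requestTblLoadAndWait.
// Everything here is illustrative, not Impala's real implementation.
public class LoadAndWaitSketch {
  // Simulated impalad-local catalog cache: tables whose metadata is loaded.
  final Set<String> loaded = new HashSet<>();
  // Simulated catalogd: how many polling rounds until it delivers the tables.
  int roundsUntilDelivered;

  public LoadAndWaitSketch(int roundsUntilDelivered) {
    this.roundsUntilDelivered = roundsUntilDelivered;
  }

  Set<String> missingOf(Set<String> requested) {
    Set<String> m = new HashSet<>(requested);
    m.removeAll(loaded);
    return m;
  }

  // Stands in for waitForCatalogUpdate(): block until the next catalog update;
  // here we just count rounds and "deliver" the pending tables when due.
  void waitForUpdate(Set<String> pending) {
    if (--roundsUntilDelivered <= 0) loaded.addAll(pending);
  }

  // Returns true if all requested tables were loaded within maxRounds polls
  // (maxRounds plays the role of the 120s timeoutMs in Frontend.java).
  public boolean requestAndWait(Set<String> requested, int maxRounds) {
    Set<String> missing = missingOf(requested);
    if (missing.isEmpty()) return true;  // cache hit, no RPC needed
    // Real code issues FeSupport.PrioritizeLoad(missing) here (JNI -> catalogd).
    for (int round = 0; round < maxRounds; round++) {
      waitForUpdate(missing);
      missing = missingOf(missing);
      if (missing.isEmpty()) return true;
    }
    // Timed out; the caller logs "Missing tables were not received" and retries,
    // exactly the 15:11:36 log line we saw above.
    return false;
  }

  public static void main(String[] args) {
    Set<String> req = new HashSet<>();
    req.add("default_impala.kpi_disp_user_info_stat_tb");
    // catalogd needs 3 rounds, budget is 10 -> loads in time
    System.out.println(new LoadAndWaitSketch(3).requestAndWait(req, 10));  // prints true
    // catalogd needs 50 rounds, budget is 10 -> times out, caller retries
    System.out.println(new LoadAndWaitSketch(50).requestAndWait(req, 10)); // prints false
  }
}
```

This is why a slow catalogd load shows up as a repeated "Requesting prioritized load" line: the waiter gives up after the timeout and the whole request is re-issued.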
Next, FeSupport.PrioritizeLoad. This actually calls into C++ code, in fe-support.cc:
extern "C"
JNIEXPORT jbyteArray JNICALL
Java_com_cloudera_impala_service_FeSupport_NativePrioritizeLoad(
    JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
  TPrioritizeLoadRequest request;
  DeserializeThriftMsg(env, thrift_struct, &request);
  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), NULL);
  TPrioritizeLoadResponse result;
  Status status = catalog_op_executor.PrioritizeLoad(request, &result);
  if (!status.ok()) {
    LOG(ERROR) << status.GetErrorMsg();
    // Create a new Status, copy in this error, then update the result.
    Status catalog_service_status(result.status);
    catalog_service_status.AddError(status);
    status.ToThrift(&result.status);
  }
  jbyteArray result_bytes = NULL;
  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
                     JniUtil::internal_exc_class(), result_bytes);
  return result_bytes;
}
I didn't fully follow the C++ side, but the catalogd log above shows the request arriving as a CatalogService.PrioritizeLoad RPC, so CatalogOpExecutor::PrioritizeLoad presumably issues that Thrift call. Either way, the log shows execution then reaches HdfsTable.java:916:
public void load(Table cachedEntry, HiveMetaStoreClient client,
    org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
  numHdfsFiles_ = 0;
  totalHdfsBytes_ = 0;
  LOG.debug("load table: " + db_.getName() + "." + name_);
  ...
  loadColumns(fieldSchemas, client);
  // Some logic here decides whether the cached entry can be reused. With a
  // completely cold cache, all partitions are fetched via:
  msPartitions.addAll(MetaStoreUtil.fetchAllPartitions(
      client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES));
  // If some partitions were fetched before, only the metadata of the
  // unknown partitions is fetched:
  LOG.info(String.format("Incrementally refreshing %d/%d partitions.",
      modifiedPartitionNames.size(), totalPartitions));
  // No need to make the metastore call if no partitions are to be updated.
  if (modifiedPartitionNames.size() > 0) {
    // Now reload the remaining partitions.
    msPartitions.addAll(MetaStoreUtil.fetchPartitionsByName(client,
        Lists.newArrayList(modifiedPartitionNames), db_.getName(), name_));
  }
  ...
  // Load partition metadata and fetch file block info.
  loadPartitions(msPartitions, msTbl, oldFileDescMap);
  // load table stats
  numRows_ = getRowCount(msTbl.getParameters());
  LOG.debug("table #rows=" + Long.toString(numRows_));
}
MetaStoreUtil.fetchAllPartitions first calls get_partition_names to list all partition names, then calls get_partitions_by_names in batches to fetch the full partition info. The number of partitions per RPC is controlled by HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX, 1000 partitions per call by default. In loadPartitions, partition objects are created and per-partition row counts are collected (these only have values after COMPUTE STATS; by default nothing is collected), and then loadBlockMd(fileDescsToLoad) is called.
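The batch splitting can be illustrated with a small sketch. The class and method names are made up; only the batching arithmetic mirrors what fetchAllPartitions does (each batch becomes one get_partitions_by_names RPC):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how partition names are split into fixed-size batches,
// one get_partitions_by_names RPC per batch. Hypothetical names.
public class PartitionBatchFetch {
  // Default of METASTORE_BATCH_RETRIEVE_TABLE_PARTITION_MAX.
  static final int BATCH_SIZE = 1000;

  // Returns the list of batches; the real code issues one
  // get_partitions_by_names(db, tbl, batch) call per entry.
  public static List<List<String>> toBatches(List<String> partNames, int batchSize) {
    List<List<String>> batches = new ArrayList<>();
    for (int start = 0; start < partNames.size(); start += batchSize) {
      int end = Math.min(start + batchSize, partNames.size());
      batches.add(new ArrayList<>(partNames.subList(start, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    // e.g. 2500 daily partitions like pt=2014-04-08
    List<String> names = new ArrayList<>();
    for (int i = 0; i < 2500; i++) names.add("pt=day-" + i);
    List<List<String>> batches = toBatches(names, BATCH_SIZE);
    System.out.println(batches.size());        // prints 3 (i.e. 3 RPCs)
    System.out.println(batches.get(2).size()); // prints 500 (last, partial batch)
  }
}
```

This matches the metastore log above: one get_partition_names call followed by several get_partitions_by_names calls, each taking ~13 seconds for this table.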
/**
 * Loads the file block metadata for the given collection of FileDescriptors.
 * The FileDescriptors are passed as a Map of partition location to list of
 * files that exist under that directory.
 */
private void loadBlockMd(Map<String, List<FileDescriptor>> fileDescriptors)
    throws RuntimeException {
  Preconditions.checkNotNull(fileDescriptors);
  LOG.debug("load block md for " + name_);
  // Store all BlockLocations so they can be reused when loading the disk IDs.
  List<BlockLocation> blockLocations = Lists.newArrayList();
  // loop over all files and record their block metadata, minus volume ids
  for (String parentPath: fileDescriptors.keySet()) {
    for (FileDescriptor fileDescriptor: fileDescriptors.get(parentPath)) {
      Path p = new Path(parentPath, fileDescriptor.getFileName());
      BlockLocation[] locations = null;
      try {
        // This is where the time goes: file info is fetched for every file in
        // every partition (already-cached entries are skipped, though the
        // caching mechanism isn't entirely clear to me).
        FileStatus fileStatus = DFS.getFileStatus(p);
        // fileDescriptors should not contain directories.
        Preconditions.checkArgument(!fileStatus.isDirectory());
        // Another RPC to the NameNode, per file.
        locations = DFS.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
        Preconditions.checkNotNull(locations);
        blockLocations.addAll(Arrays.asList(locations));
        // Loop over all blocks in the file.
        for (BlockLocation block: locations) {
          String[] blockHostPorts;
          try {
            blockHostPorts = block.getNames();
          } catch (IOException e) {
            // this shouldn't happen, getNames() doesn't throw anything
            String errorMsg = "BlockLocation.getNames() failed:\n" + e.getMessage();
            LOG.error(errorMsg);
            throw new IllegalStateException(errorMsg);
          }
          // Now enumerate all replicas of the block, adding any unknown hosts
          // to hostIndex_ and the index for that host to replicaHostIdxs.
          List<Integer> replicaHostIdxs = new ArrayList<Integer>(blockHostPorts.length);
          for (int i = 0; i < blockHostPorts.length; ++i) {
            String[] ip_port = blockHostPorts[i].split(":");
            Preconditions.checkState(ip_port.length == 2);
            TNetworkAddress network_address = new TNetworkAddress(ip_port[0],
                Integer.parseInt(ip_port[1]));
            replicaHostIdxs.add(hostIndex_.getIndex(network_address));
          }
          fileDescriptor.addFileBlock(
              new FileBlock(block.getOffset(), block.getLength(), replicaHostIdxs));
        }
      } catch (IOException e) {
        throw new RuntimeException("couldn't determine block locations for path '"
            + p + "':\n" + e.getMessage(), e);
      }
    }
  }
  if (SUPPORTS_VOLUME_ID) {
    LOG.trace("loading disk ids for: " + getFullName() +
        ". nodes: " + getNumNodes());
    loadDiskIds(blockLocations, fileDescriptors);
    LOG.trace("completed load of disk ids for: " + getFullName());
  }
}
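One detail worth calling out in the replica loop above is the hostIndex_ interning: each distinct host:port is mapped to a small integer once, so every block only stores a list of integer indexes instead of repeating address strings. A minimal sketch of that pattern (class and method names here are made up, using plain strings instead of TNetworkAddress):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the hostIndex_ interning used in loadBlockMd: each distinct
// host:port gets a stable small id, and blocks store lists of ids.
public class HostIndexSketch {
  private final Map<String, Integer> idByHost = new HashMap<>();
  private final List<String> hosts = new ArrayList<>();

  // Returns a stable index for the address, adding it on first sight
  // (mirrors hostIndex_.getIndex(network_address)).
  public int getIndex(String hostPort) {
    Integer id = idByHost.get(hostPort);
    if (id == null) {
      id = hosts.size();
      idByHost.put(hostPort, id);
      hosts.add(hostPort);
    }
    return id;
  }

  // Converts one block's replica list ("host:port" strings, the shape
  // BlockLocation.getNames() returns) into compact host indexes.
  public List<Integer> replicaIdxs(String[] names) {
    List<Integer> idxs = new ArrayList<>(names.length);
    for (String name : names) {
      String[] ipPort = name.split(":");
      if (ipPort.length != 2) throw new IllegalStateException("bad address: " + name);
      idxs.add(getIndex(name));
    }
    return idxs;
  }

  public static void main(String[] args) {
    HostIndexSketch idx = new HostIndexSketch();
    System.out.println(idx.replicaIdxs(new String[] {"10.0.0.1:50010", "10.0.0.2:50010"}));
    // a second block sharing the first replica reuses index 0
    System.out.println(idx.replicaIdxs(new String[] {"10.0.0.1:50010", "10.0.0.3:50010"}));
  }
}
```

For a table with millions of blocks on a few hundred datanodes, this keeps the per-block metadata small; the cost is dominated by the per-file getFileStatus/getFileBlockLocations RPCs, not this bookkeeping.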
I also found a somewhat related issue, IMPALA-1480 (Slow DDL statements for tables with large number of partitions), and a discussion on the Google group: https://groups.google.com/a/cloudera.org/forum/#!topic/impala-user/Xv8d2jndzZ0
For tables with many partitions, fetching all the metadata plus block info takes a long time and makes queries slow. If Impala could fetch only the partition info a given query actually needs, this would be much faster.
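As a rough illustration of that idea (a hypothetical sketch, not how this version of Impala works): the query above only touches pt='2014-04-08', so filtering the partition names before the get_partitions_by_names batches would cut a multi-thousand-partition fetch down to one:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of predicate-based partition pruning before the
// metastore fetch: only names matching the query's partition predicate
// would be passed on to get_partitions_by_names.
public class PrunedFetchSketch {
  public static List<String> prune(List<String> allPartNames, Predicate<String> pred) {
    List<String> needed = new ArrayList<>();
    for (String name : allPartNames) {
      if (pred.test(name)) needed.add(name);
    }
    return needed;
  }

  public static void main(String[] args) {
    List<String> names = new ArrayList<>();
    for (int d = 1; d <= 30; d++) names.add(String.format("pt=2014-04-%02d", d));
    // select ... where pt='2014-04-08' needs exactly one partition
    List<String> needed = prune(names, n -> n.equals("pt=2014-04-08"));
    System.out.println(needed.size()); // prints 1 (instead of fetching all 30)
  }
}
```

The block-metadata fetch would shrink proportionally, since loadBlockMd only walks the files of the partitions it is handed.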