I have always been curious how Parquet and Hive do column pruning (skipping certain columns), so today I traced through the code. The glue code between Parquet and Hive has already been merged into Hive itself, so we can read the Hive source directly.
The input format is org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.
Data preparation
CREATE TABLE parquet (x INT, y STRING) STORED AS PARQUET;
CREATE TABLE parquet2 (x INT, y STRING) STORED AS PARQUET;
CREATE TABLE test (x INT, y STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '`';
-- load some data into the test table by hand
INSERT OVERWRITE TABLE parquet SELECT x,y FROM test;
INSERT OVERWRITE TABLE parquet2 SELECT x,y FROM test;
Test query
SET mapred.job.tracker=local;
SET hive.exec.mode.local.auto=true;
SET fs.defaultFS=file:///;
-- avoid converting the join into a map join
SET hive.auto.convert.join=false;
select a.x, b.y from parquet a left join parquet2 b on (a.x = b.x);
Code analysis
When a map task reads data, it first has to obtain a record reader. The call chain is:
MapTask (job.getInputFormat().getRecordReader) -> CombineHiveInputFormat.getRecordReader -> HiveInputFormat.pushFilters
public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
  // construct column name list and types for reference by filter push down
  Utilities.setColumnNameList(jobConf, tableScan); // sets "columns" in the conf
  Utilities.setColumnTypeList(jobConf, tableScan); // sets "columns.types" in the conf
}
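As a rough illustration of what these two calls leave behind for a scan of the parquet (x INT, y STRING) table, the sketch below writes the values by hand; the real strings and separators are whatever Utilities.setColumnNameList / setColumnTypeList produce in your Hive version, so treat the literals as assumptions.

import org.apache.hadoop.mapred.JobConf;

public class ColumnsConfSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Simulated output of Utilities.setColumnNameList / setColumnTypeList
    // for a table with columns (x INT, y STRING); illustrative values only.
    jobConf.set("columns", "x,y");              // serdeConstants.LIST_COLUMNS
    jobConf.set("columns.types", "int,string"); // serdeConstants.LIST_COLUMN_TYPES
    System.out.println(jobConf.get("columns") + " | " + jobConf.get("columns.types"));
  }
}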
The table aliases are looked up from this.mrwork.getPathToAliases, and the corresponding operator from this.mrwork.getAliasToWork; if it is a TableScanOperator, then ts.getNeededColumnIDs() and ts.getNeededColumns() give the column ids and names this table scan actually needs.
protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass,
    String splitPath, String splitPathWithNoSchema, boolean nonNative) {
  if (this.mrwork == null) {
    init(job);
  }

  if (this.mrwork.getPathToAliases() == null) {
    return;
  }

  ArrayList<String> aliases = new ArrayList<String>();
  Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
      .getPathToAliases().entrySet().iterator();

  while (iterator.hasNext()) {
    Entry<String, ArrayList<String>> entry = iterator.next();
    String key = entry.getKey();
    boolean match;
    if (nonNative) {
      // For non-native tables, we need to do an exact match to avoid
      // HIVE-1903. (The table location contains no files, and the string
      // representation of its path does not have a trailing slash.)
      match =
          splitPath.equals(key) || splitPathWithNoSchema.equals(key);
    } else {
      // But for native tables, we need to do a prefix match for
      // subdirectories. (Unlike non-native tables, prefix mixups don't seem
      // to be a potential problem here since we are always dealing with the
      // path to something deeper than the table location.)
      match =
          splitPath.startsWith(key) || splitPathWithNoSchema.startsWith(key);
    }
    if (match) {
      ArrayList<String> list = entry.getValue();
      for (String val : list) {
        aliases.add(val);
      }
    }
  }

  for (String alias : aliases) {
    Operator<? extends OperatorDesc> op = this.mrwork.getAliasToWork().get(
        alias);
    if (op instanceof TableScanOperator) {
      TableScanOperator ts = (TableScanOperator) op;
      // push down projections.
      ColumnProjectionUtils.appendReadColumns(
          jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns());
      // push down filters
      pushFilters(jobConf, ts);
    }
  }
}
Note the method ColumnProjectionUtils.appendReadColumns: it collects the columns this query actually uses and stores them in two configuration properties:
hive.io.file.readcolumn.ids: the ids of the columns to read
hive.io.file.readcolumn.names: the names of the columns to read
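A minimal sketch of what lands in these two properties, using the three-argument appendReadColumns overload seen in the code above. The ids and names are what alias b would need in the test query (x for the join key, y for the projection), but here they are written by hand rather than taken from a TableScanOperator, and the printed values are what I would expect, not captured output.

import java.util.Arrays;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapred.JobConf;

public class ReadColumnsSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Alias b in the test query needs both x (id 0) and y (id 1); in the real code
    // these lists come from ts.getNeededColumnIDs() / ts.getNeededColumns().
    ColumnProjectionUtils.appendReadColumns(
        jobConf, Arrays.asList(0, 1), Arrays.asList("x", "y"));
    System.out.println(jobConf.get("hive.io.file.readcolumn.ids"));   // expected: "0,1"
    System.out.println(jobConf.get("hive.io.file.readcolumn.names")); // expected: "x,y"
  }
}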
There is also a call in ParquetRecordReaderWrapper.getSplit:
jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
This seems to overlap with what HiveInputFormat already does, and the code is nearly identical; possibly it is there for compatibility with older versions? In any case, appending the same columns twice does no harm.
These properties are consumed in DataWritableReadSupport.init: it fetches the full column list via final String columns = configuration.get(IOConstants.COLUMNS);, builds a schema from it, and then combines that schema with the hive.io.file.readcolumn.ids above to construct the new, pruned schema.
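A minimal sketch of that pruning step, assuming a full Parquet MessageType has already been built from the columns/columns.types properties. The real DataWritableReadSupport.init does more bookkeeping, and newer Hive versions use the org.apache.parquet.schema package instead of parquet.schema.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import parquet.schema.MessageType;
import parquet.schema.Type;

public class PrunedSchemaSketch {
  // Keep only the fields whose positions are listed in hive.io.file.readcolumn.ids.
  public static MessageType prune(Configuration configuration, MessageType fullSchema) {
    List<Integer> readColumnIds = ColumnProjectionUtils.getReadColumnIDs(configuration);
    List<Type> prunedFields = new ArrayList<Type>();
    for (Integer id : readColumnIds) {
      if (id >= 0 && id < fullSchema.getFieldCount()) {
        prunedFields.add(fullSchema.getType(id)); // pick the field by position
      }
    }
    // This becomes the "requested schema" handed back to Parquet, so the reader
    // simply never materializes the other columns.
    return new MessageType(fullSchema.getName(), prunedFields);
  }
}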
Filter pushdown works in a similar way.
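For reference, a sketch of where the pushed-down filter ends up; the conf keys are the constants defined on TableScanDesc, and a reader that supports predicate pushdown deserializes the expression from there (how it gets converted into a Parquet filter is version-specific and not shown here).

import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.JobConf;

public class FilterConfSketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // HiveInputFormat.pushFilters serializes the scan's residual filter into the conf;
    // these keys resolve to "hive.io.filter.text" and "hive.io.filter.expr.serialized".
    String filterText = jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR);
    String filterExpr = jobConf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
    // Both are null here, since the test query has no WHERE clause.
    System.out.println(filterText + " | " + filterExpr);
  }
}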