Parquet-Hive Column Pruning and Predicate Pushdown

I have always been curious how Parquet and Hive do column pruning (skipping unneeded columns), so today I traced through the code. The Parquet-Hive integration code has already been merged into Hive itself, so we can read the Hive source directly.

The input format is org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.

Data preparation

CREATE TABLE parquet (x INT, y STRING) STORED AS PARQUET;
CREATE TABLE parquet2 (x INT, y STRING) STORED AS PARQUET;
CREATE TABLE test (x INT, y STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '`';
-- manually load some data into the test table
INSERT OVERWRITE TABLE parquet SELECT x,y FROM test;
INSERT OVERWRITE TABLE parquet2 SELECT x,y FROM test;


Test query

SET mapred.job.tracker=local;
SET hive.exec.mode.local.auto=true;
SET fs.defaultFS=file:///; 
-- avoid converting to a map join
SET hive.auto.convert.join=false; 
SELECT a.x, b.y FROM parquet a LEFT JOIN parquet2 b ON (a.x = b.x);

Code analysis

When a map task reads data, it first needs to obtain a record reader. The call chain is roughly:
MapTask (job.getInputFormat().getRecordReader) -> CombineHiveInputFormat.getRecordReader -> HiveInputFormat.pushProjectionsAndFilters -> HiveInputFormat.pushFilters
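
As a rough sketch of where this happens (a simplified paraphrase, not the verbatim Hive source; signatures and the exact reader wrapping differ between versions), HiveInputFormat.getRecordReader pushes the needed columns and filters into the job conf before delegating to the concrete input format recorded in the split:

// Simplified paraphrase of HiveInputFormat.getRecordReader (not the verbatim source)
public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter)
    throws IOException {
  HiveInputSplit hsplit = (HiveInputSplit) split;

  // the split records which concrete input format to delegate to,
  // e.g. MapredParquetInputFormat for a Parquet table
  Class inputFormatClass;
  try {
    inputFormatClass = job.getClassByName(hsplit.inputFormatClassName());
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);

  // push the needed columns and the filter expression into the job conf
  // before handing control to the real input format
  pushProjectionsAndFilters(job, inputFormatClass,
      hsplit.getPath().toString(), hsplit.getPath().toUri().getPath());

  return inputFormat.getRecordReader(hsplit.getInputSplit(), job, reporter);
}

pushFilters itself (abridged) looks like this: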

public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
    // construct column name list and types for reference by filter push down
    Utilities.setColumnNameList(jobConf, tableScan); // sets "columns" in the conf
    Utilities.setColumnTypeList(jobConf, tableScan); // sets "columns.types" in the conf
}
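
The snippet above keeps only the column-list part of the method. In the Hive source, pushFilters goes on to serialize the table scan's filter expression into the job conf so that readers which support predicate pushdown can pick it up. A hedged sketch of that omitted portion (the TableScanDesc constants are real; the exact serialization helper differs between Hive versions, e.g. Utilities.serializeExpression in older releases):

// Hedged sketch of the filter-pushdown part omitted above (details vary by Hive version)
ExprNodeGenericFuncDesc filterExpr = tableScan.getConf().getFilterExpr();
if (filterExpr == null) {
  return; // no residual predicate to push down
}
// "hive.io.filter.text": human-readable form of the predicate
jobConf.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterExpr.getExprString());
// "hive.io.filter.expr.serialized": serialized expression the reader can deserialize
jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR,
    Utilities.serializeExpression(filterExpr));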

One level up, in pushProjectionsAndFilters, the table aliases are looked up in this.mrwork.getPathToAliases() by matching the split path, and each alias is then resolved to its operator via this.mrwork.getAliasToWork(). For a TableScanOperator, ts.getNeededColumnIDs() and ts.getNeededColumns() give exactly the columns this table scan needs.

protected void pushProjectionsAndFilters(JobConf jobConf, Class inputFormatClass,
      String splitPath, String splitPathWithNoSchema, boolean nonNative) {
    if (this.mrwork == null) {
      init(job);
    }

    if(this.mrwork.getPathToAliases() == null) {
      return;
    }

    ArrayList<String> aliases = new ArrayList<String>();
    Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
        .getPathToAliases().entrySet().iterator();

    while (iterator.hasNext()) {
      Entry<String, ArrayList<String>> entry = iterator.next();
      String key = entry.getKey();
      boolean match;
      if (nonNative) {
        // For non-native tables, we need to do an exact match to avoid
        // HIVE-1903.  (The table location contains no files, and the string
        // representation of its path does not have a trailing slash.)
        match =
          splitPath.equals(key) || splitPathWithNoSchema.equals(key);
      } else {
        // But for native tables, we need to do a prefix match for
        // subdirectories.  (Unlike non-native tables, prefix mixups don't seem
        // to be a potential problem here since we are always dealing with the
        // path to something deeper than the table location.)
        match =
          splitPath.startsWith(key) || splitPathWithNoSchema.startsWith(key);
      }
      if (match) {
        ArrayList<String> list = entry.getValue();
        for (String val : list) {
          aliases.add(val);
        }
      }
    }

    for (String alias : aliases) {
      Operator<? extends OperatorDesc> op = this.mrwork.getAliasToWork().get(
        alias);
      if (op instanceof TableScanOperator) {
        TableScanOperator ts = (TableScanOperator) op;
        // push down projections.
        ColumnProjectionUtils.appendReadColumns(
            jobConf, ts.getNeededColumnIDs(), ts.getNeededColumns());
        // push down filters
        pushFilters(jobConf, ts);
      }
    }
  }

Note the call to ColumnProjectionUtils.appendReadColumns: it collects the columns this scan actually uses and stores them in two configuration properties:

hive.io.file.readcolumn.ids -- the IDs of the columns to read
hive.io.file.readcolumn.names -- the names of the columns to read

In the test query above, for example, alias a only needs column x, while alias b needs both x and y.
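
To make the effect concrete, here is a minimal, hedged sketch of what appending read columns to the conf boils down to (a simplified stand-in, not the real ColumnProjectionUtils, which also maintains a read-all-columns flag):

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;

// Simplified stand-in for ColumnProjectionUtils.appendReadColumns:
// accumulate column IDs and names as comma-separated conf values.
public class AppendReadColumnsSketch {

  static final String READ_COLUMN_IDS = "hive.io.file.readcolumn.ids";
  static final String READ_COLUMN_NAMES = "hive.io.file.readcolumn.names";

  static void appendReadColumns(Configuration conf, List<Integer> ids, List<String> names) {
    append(conf, READ_COLUMN_IDS, join(ids));
    append(conf, READ_COLUMN_NAMES, join(names));
  }

  private static void append(Configuration conf, String key, String value) {
    String old = conf.get(key, "");
    conf.set(key, old.isEmpty() ? value : old + "," + value); // appended, not overwritten
  }

  private static String join(List<?> items) {
    StringBuilder sb = new StringBuilder();
    for (Object item : items) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(item);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // e.g. the scan of alias b in the test query needs x (id 0) and y (id 1)
    appendReadColumns(conf, Arrays.asList(0, 1), Arrays.asList("x", "y"));
    System.out.println(conf.get(READ_COLUMN_IDS));   // 0,1
    System.out.println(conf.get(READ_COLUMN_NAMES)); // x,y
  }
}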
ParquetRecordReaderWrapper.getSplit also contains a similar call:

jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());

This looks like it overlaps with what HiveInputFormat already does (the code is nearly identical), possibly kept for compatibility with older code paths; in any case, appending the same columns twice does no harm.

The properties are consumed in DataWritableReadSupport.init: all of the table's columns are taken from final String columns = configuration.get(IOConstants.COLUMNS); and turned into the full schema, which is then combined with hive.io.file.readcolumn.ids to build the pruned schema that is actually requested from Parquet. Predicate (filter) pushdown works along the same lines.
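
The pruning step itself boils down to keeping only the columns whose indexes appear in hive.io.file.readcolumn.ids. A minimal sketch of that idea (my own simplified helper working on column names; the real DataWritableReadSupport.init builds a Parquet MessageType rather than a name list):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified illustration of schema pruning: keep only the columns whose
// indexes are listed in hive.io.file.readcolumn.ids.
public class PruneColumnsSketch {

  static List<String> prunedColumns(String allColumns, String readColumnIds) {
    List<String> columns = Arrays.asList(allColumns.split(","));   // from the "columns" property
    List<String> pruned = new ArrayList<>();
    if (readColumnIds == null || readColumnIds.isEmpty()) {
      return pruned;
    }
    for (String id : readColumnIds.split(",")) {
      pruned.add(columns.get(Integer.parseInt(id.trim())));
    }
    return pruned;
  }

  public static void main(String[] args) {
    // e.g. alias a in the test query only reads x, alias b reads x and y
    System.out.println(prunedColumns("x,y", "0"));   // [x]
    System.out.println(prunedColumns("x,y", "0,1")); // [x, y]
  }
}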
