hive简单查询的过程

查询以下语句的过程

select fr from xxxxxx where substr(fr, 1, 1) = 'a';

Hive查询在编译的阶段把语句转换成一个个有层级关系的Operator,然后执行下去。 ExecMapper中mo是一个MapOperator对象,里面有个childOperators存储了一个树形的操作步骤(Operator),如本次查询的Operator结构如下:

TableScanOperator  表扫描
  FilterOperator         过滤条件
    SelectOperator       列裁剪,只查询fr出来
      FileSinkOperator   输出到文件

首先第一步是mo.process(),传入的value是整行内容

public void map(Object key, Object value, OutputCollector output,
      Reporter reporter) throws IOException {
...
        mo.process((Writable)value);
...
}

没做什么处理,只是做了个反序列化,放入rowWithPart[0]中

rowWithPart[0] = deserializer.deserialize((Writable) value);
...
forward(rowWithPart, rowObjectInspector); // 传递给下一个Operator

遍历它的所有子Operator,一个个做

for (int i = 0; i < childOperatorsArray.length; i++) {
      Operator<? extends OperatorDesc> o = childOperatorsArray[i];
      if (o.getDone()) {
        childrenDone++;
      } else {
        o.process(row, childOperatorsTag[i]);
      }
    }

下一个Operator是TableScan,这个只是单纯传给下一个Operator

public void processOp(Object row, int tag) throws HiveException {
    if (conf != null && conf.isGatherStats()) {
      gatherStats(row);
    }
    forward(row, inputObjInspectors[tag]);
  }

下一个Operator是FilterOperator,执行udf和判断条件是否成立,如果成立才传给下一个
这里有个计数器 PASSED记录通过的, FILTERED记录被过滤掉的。

Object condition = conditionEvaluator.evaluate(row);

从FilterOperator可以看到conditionEvaluator的expr值有

class org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual(class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[fr], Const int 1, Const int 1(), Const string a()

这里面有一个substr的udf,来看一下他是怎么处理的。substr(fr, 1, 1) = ‘a’ ,这里很多地方都调用了get()方法,用来获取值。
udf是通过反射调用的, udf function看起来只实例化一次

@Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    assert (arguments.length == realArguments.length);

    // Calculate all the arguments
    for (int i = 0; i < realArguments.length; i++) {
      realArguments[i] = arguments[i].get();
    }

    // Call the function
    Object result = FunctionRegistry.invoke(udfMethod, udf, conversionHelper
        .convertIfNecessary(realArguments));

    return result;
  }

下一个Operator是SelectOperator,只输出某些列,如果是select *,就原样输出

if (conf.isSelStarNoCompute()) {
      forward(row, inputObjInspectors[tag]);
      return;
    }

我们是只取出fr的,所以有个eval[0],取出的结果放到output里面,这里输出的是output=[‘android’]

for (int i = 0; i < eval.length; i++) {
      try {
        output[i] = eval[i].evaluate(row);
      } catch (HiveException e) {
        throw e;
      } catch (RuntimeException e) {
        throw new HiveException("Error evaluating "
            + conf.getColList().get(i).getExprString(), e);
      }
    }
    forward(output, outputObjInspector);

最后一步FileSinkOperator,序列化结果写到磁盘

recordValue = serializer.serialize(row, inputObjInspectors[0]);
...
rowOutWriters[0].write(recordValue);

重复以上步骤,直到所有数据全部读完。

updatedupdated2024-12-222024-12-22