查询以下语句的过程
select fr from xxxxxx where substr(fr, 1, 1) = 'a';
Hive查询在编译的阶段把语句转换成一个个有层级关系的Operator,然后执行下去。 ExecMapper中mo是一个MapOperator对象,里面有个childOperators存储了一个树形的操作步骤(Operator),如本次查询的Operator结构如下:
TableScanOperator 表扫描
FilterOperator 过滤条件
SelectOperator 列裁剪,只查询fr出来
FileSinkOperator 输出到文件
首先第一步是mo.process(),传入的value是整行内容
public void map(Object key, Object value, OutputCollector output,
Reporter reporter) throws IOException {
...
mo.process((Writable)value);
...
}
没做什么处理,只是做了个反序列化,放入rowWithPart[0]中
rowWithPart[0] = deserializer.deserialize((Writable) value);
...
forward(rowWithPart, rowObjectInspector); // 传递给下一个Operator
遍历它的所有子Operator,一个个做
for (int i = 0; i < childOperatorsArray.length; i++) {
Operator<? extends OperatorDesc> o = childOperatorsArray[i];
if (o.getDone()) {
childrenDone++;
} else {
o.process(row, childOperatorsTag[i]);
}
}
下一个Operator是TableScan,这个只是单纯传给下一个Operator
public void processOp(Object row, int tag) throws HiveException {
if (conf != null && conf.isGatherStats()) {
gatherStats(row);
}
forward(row, inputObjInspectors[tag]);
}
下一个Operator是FilterOperator,执行udf和判断条件是否成立,如果成立才传给下一个
这里有个计数器 PASSED记录通过的, FILTERED记录被过滤掉的。
Object condition = conditionEvaluator.evaluate(row);
从FilterOperator可以看到conditionEvaluator的expr值有
class org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual(class org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge(Column[fr], Const int 1, Const int 1(), Const string a()
这里面有一个substr的udf,来看一下他是怎么处理的。substr(fr, 1, 1) = ‘a’ ,这里很多地方都调用了get()方法,用来获取值。
udf是通过反射调用的, udf function看起来只实例化一次
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
assert (arguments.length == realArguments.length);
// Calculate all the arguments
for (int i = 0; i < realArguments.length; i++) {
realArguments[i] = arguments[i].get();
}
// Call the function
Object result = FunctionRegistry.invoke(udfMethod, udf, conversionHelper
.convertIfNecessary(realArguments));
return result;
}
下一个Operator是SelectOperator,只输出某些列,如果是select *,就原样输出
if (conf.isSelStarNoCompute()) {
forward(row, inputObjInspectors[tag]);
return;
}
我们是只取出fr的,所以有个eval[0],取出的结果放到output里面,这里输出的是output=[‘android’]
for (int i = 0; i < eval.length; i++) {
try {
output[i] = eval[i].evaluate(row);
} catch (HiveException e) {
throw e;
} catch (RuntimeException e) {
throw new HiveException("Error evaluating "
+ conf.getColList().get(i).getExprString(), e);
}
}
forward(output, outputObjInspector);
最后一步FileSinkOperator,序列化结果写到磁盘
recordValue = serializer.serialize(row, inputObjInspectors[0]);
...
rowOutWriters[0].write(recordValue);
重复以上步骤,直到所有数据全部读完。