Hive – Distinct 的实现

准备数据

语句

select count, count(distinct uid) from logs group by count;
hive> select * from logs;
OK
a	苹果	3
a	橙子	3
a	烧鸡	1
b	烧鸡	3

hive> select count, count(distinct uid) from logs group by count;
1	1
3	2

根据count分组,计算独立用户数。

计算过程

hive-distinct-cal

  1. 第一步先在mapper计算部分值,会以count和uid作为key,如果是distinct并且之前已经出现过,则忽略这条计算。第一步是以<count, uid>组合为key,第二步是以count为key.
  2. ReduceSink是在mapper.close()时才执行的,在GroupByOperator.close()时,把结果输出。注意这里虽然key是count和uid,但是在reduce时分区是按count来的!
  3. 第一步的distinct计算的值没用,要留到reduce计算的才准确。这里只是减少了key组合相同的行。不过如果是普通的count,后面是会合并起来的。
  4. distinct通过比较lastInvoke判断要不要+1(因为在reduce是排序过了的,所以判断distict的字段变了没有,如果没变,则不+1)

Operator

hive-distinct-op

Explain

hive> explain select count, count(distinct uid) from logs group by count;
OK
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME logs))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL count)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL uid)))) (TOK_GROUPBY (TOK_TABLE_OR_COL count))))

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree:
        logs 
          TableScan //表扫描
            alias: logs
            Select Operator//列裁剪,取出uid,count字段就够了
              expressions:
                    expr: count
                    type: int
                    expr: uid
                    type: string
              outputColumnNames: count, uid
              Group By Operator //先来map聚集
                aggregations:
                      expr: count(DISTINCT uid) //聚集表达式
                bucketGroup: false
                keys:
                      expr: count
                      type: int
                      expr: uid
                      type: string
                mode: hash //hash方式
                outputColumnNames: _col0, _col1, _col2
                Reduce Output Operator
                  key expressions: //输出的键
                        expr: _col0 //count
                        type: int
                        expr: _col1 //uid
                        type: string
                  sort order: ++
                  Map-reduce partition columns: //这里是按group by的字段分区的
                        expr: _col0 //这里表示count
                        type: int
                  tag: -1
                  value expressions:
                        expr: _col2
                        type: bigint
      Reduce Operator Tree:
        Group By Operator //第二次聚集
          aggregations:
                expr: count(DISTINCT KEY._col1:0._col0) //uid:count
          bucketGroup: false
          keys:
                expr: KEY._col0 //count
                type: int
          mode: mergepartial //合并
          outputColumnNames: _col0, _col1
          Select Operator //列裁剪
            expressions:
                  expr: _col0
                  type: int
                  expr: _col1
                  type: bigint
            outputColumnNames: _col0, _col1
            File Output Operator //输出结果到文件
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1
updatedupdated2024-08-302024-08-30