Hive – JOIN实现过程

准备数据

语句
SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
我们希望的结果是把users表join进来获取age字段。

hive> select * from logs;
OK
a	苹果	5
a	橙子	3
b	烧鸡	1

hive> select * from users;
OK
a	23
b	21

hive> SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
a	苹果	23
a	橙子	23
b	烧鸡	21

计算过程

hive-join-cal

  1. key这里后面的数字是tag,后面在reduce阶段用来区分来自于那个表的数据。tag是附属在key后面的。那为什么会把a(0)和a(1)汇集在一起了呢,是因为对先对a求了hashcode,设在了HiveKey上,所以同一个key还是在一起的。
  2. Map阶段只是拆分key和value。
  3. reduce阶段主要看它是如何把它合并起来了,从图上可以直观的看到,其实就是把tag=1的内容,都加到tag=0的后面,就是这么简单。
  4. 代码实现上,就是先临时用个变量把值存储起来在storage里面, storage(0) = [{a, 苹果}, {a, 橙子}] storage(1) = [{23}],当key变化(如a变为b)或全部结束时,会调用endGroup()方法,把内容合并起来。变成[{a,苹果,23}, {a, 橙子,23}]

Operator

hive-join-op

Explain

hive> explain SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
OK

//语法树
ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME logs) a) (TOK_TABREF (TOK_TABNAME users) b) (= (. (TOK_TABLE_OR_COL a) uid) (. (TOK_TABLE_OR_COL b) uid)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) uid)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) name)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) age)))))

//阶段
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Alias -> Map Operator Tree: //mapper阶段
        a 
          TableScan //扫描表, 就只是一行一行的传递下去而已
            alias: a
            Reduce Output Operator //输出给reduce的内容
              key expressions: // key啦,这里的key是uid,就是我们写在ON子句那个,你可以试试加多几个条件
                    expr: uid
                    type: string
              sort order: + //排序
              Map-reduce partition columns://分区字段,貌似是和key一样的
                    expr: uid
                    type: string
              tag: 0 //用来区分这个key是来自哪个表的
              value expressions: //reduce用到的value字段
                    expr: uid
                    type: string
                    expr: name
                    type: string
        b 
          TableScan //扫描表, 就只是一行一行的传递下去而已
            alias: b
            Reduce Output Operator //输出给reduce的内容
              key expressions: //key
                    expr: uid
                    type: string
              sort order: +
              Map-reduce partition columns: //分区字段
                    expr: uid
                    type: string
              tag: 1 //用来区分这个key是来自哪个表的
              value expressions: //值
                    expr: age
                    type: int
      Reduce Operator Tree: // reduce阶段
        Join Operator // JOIN的Operator
          condition map:
               Inner Join 0 to 1 // 内连接0和1表
          condition expressions: // 第0个表有两个字段,分别是uid和name, 第1个表有一个字段age
            0 {VALUE._col0} {VALUE._col1}
            1 {VALUE._col1}
          handleSkewJoin: false //是否处理倾斜join,如果是,会分为两个MR任务
          outputColumnNames: _col0, _col1, _col6 //输出字段
          Select Operator //列裁剪(我们sql写的select字段)
            expressions:
                  expr: _col0
                  type: string
                  expr: _col1
                  type: string
                  expr: _col6
                  type: int
            outputColumnNames: _col0, _col1, _col2
            File Output Operator //把结果输出到文件
              compressed: false
              GlobalTableId: 0
              table:
                  input format: org.apache.hadoop.mapred.TextInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

  Stage: Stage-0
    Fetch Operator
      limit: -1

可以看到里面都是一个个Operator顺序的执行下来

updatedupdated2024-08-302024-08-30