Heloowird

Hive UDF/UDAF 的Python 实现

之前习惯使用Hadoop Streaming 处理行文本,而目前主要使用Hive SQL 或者Spark 处理大数据。Hive 大部分数据格式为ORC 格式,不适合直接使用Hadoop Streaming。

一般情况下,Hive SQL 可以满足日常统计和特征提取,然而涉及复杂的数据逻辑处理,Hive 內置函数无法胜任。这时,需要用到UDF 或者UDAF。实际上Hive SQL 会被转化成map 和reduce,并最终在Hadoop 集群上执行。所以Hive SQL 不过披上SQL 解释器这件“外衣”的Hadoop Streaming。

什么是UDF 和UDAF ?UDF 为用户自定义函数(User Defined Function),可以自定义其中的处理逻辑,运行在map 或者reduce 阶段。UDAF 和UDF 差了一个A,这个A 代表Aggregation,即用户自定义聚合函数,和 内置的SUM()、 COUNT()函数类似,用于处理聚簇数据,运行在reduce 阶段。那么接下来怎么实现UDF/UDAF ?

Hive 支持Java、Python、Scale等语言进行UDF 的编写。这里主要介绍Python 实现方式。首先,看看在Hive SQL 中如何使用UDF 命令:

1
2
3
4
5
6
7
8
9
10
......
add file udf.py;
......
select
transform(id, addr_and_name)
using 'python udf.py'
as (id, addr, poi_name)
from
test.test_udf
......

其中第2行,添加udf.py 到Hive Resource,即分发到各个运行Hive 任务的机器上。第5-7行,transform(…) using ‘xx xxx’ as (……)为Python UDF/UDAF 的固有格式。另外,有的会使用map 或者 reduce 代替transform,这里需特殊说明:map 和reduce 并不会强制对应的UDF 代码在map 阶段或reduce 阶段执行,仅便于人工阅读,作为transform 的别名(alias)。

UDF 和UDAF 的输入需用户Hive SQL中指定字段,在自定义代码中通过标准输入sys.stdin按行读取,字段之间使用tab 分隔;按行输出,列之间也使用tab 分隔:

1
2
3
4
5
6
7
8
9
10
# udf.py
import sys
def main():
for line in sys.stdin:
id, addr_and_name = line.strip('\r\n').split('\t')
addr, poi_name = addr_and_name.split('|')
print '%s\t%s\t%s' % (id, addr, poi_name)
if __name__ == "__main__":
main()

如果udf.py 需要用到第三方包时,可以使用conda 或者virtualenv 打包自定义Python环境。建议将自定义Python 环境进行压缩,然后添加到Hive Resource;使用时,为自定义Pyhton 添加可执行权限,并使用一个额外的shell 脚本进行包装:

1
2
3
4
# udf.sh
chmod -R 777 custom_python.tgz
./custom_python.tgz/custom_python/bin/python udf.py

相应地,Hive SQL 改为如下形式:

1
2
3
4
5
6
7
8
9
10
11
......
add file udf.py;
add file udf.sh;
add archive custom_python.tgz;
......
select
transform(id, addr_and_name)
using 'sh udf.sh'
as (id, addr, poi_name)
from test.test_udf
......

通过add archive 添加的文件会自动解压到和udf.py、udf.sh 同级的目录custom_python.tgz 下,注意:这里目录的名字就是custom_python.tgz,而不再是 压缩包。当然可以在.sh 文件中输出日志来确定自定义Python 环境的目录结构。以上UDF 仍按上述方法编写和使用即可。

UDAF 的编写和使用方式,与UDF 一致。只是在Hive SQL 上需要配合distribute by 或者cluster by 使用。建议在UDAF 的Python 执行文件中输出运行日志,方便确定UDAF 在reduce 阶段执行。