动态注册hive udf

目标

可以不重启hive服务端动态增加或更新udf,本文用到动态加载以及hive hook实现。

看看原来的代码是怎样写的

我看的是cdh4.3.0代码,目前有两个地方注册udf, 一个是在org.apache.hadoop.hive.ql.exec.FunctionRegistry类里,大部分的内置udf都是在这里注册的,很多网站都是通过修改这里代码重新打包来注册udf。 第二个是最近新增了一个hive-builtins*.jar文件,注册函数是在

public SessionState(HiveConf conf) {
...
      Class<?> pluginClass = Utilities.getBuiltinUtilsClass();
      URL jarLocation = pluginClass.getProtectionDomain().getCodeSource().getLocation();
      add_builtin_resource(ResourceType.JAR, jarLocation.toString());
      FunctionRegistry.registerFunctionsFromPluginJar(jarLocation, pluginClass.getClassLoader());
...
  }

通过找到org.apache.hive.builtins.BuiltinUtils类定位jar的位置,把jar包加入hive.added.jars.path配置里,然后通过FunctionRegistry.registerFunctionsFromPluginJar来注册udf

//我们可以用这两个方法来注册udf
public static void registerTemporaryUDF(String functionName, Class<? extends UDF> UDFClass, boolean isOperator)
public static void registerFunctionsFromPluginJar(URL jarLocation, ClassLoader classLoader)

存在的问题

当我更新jar包时,发现执行的代码没有更新,发生什么问题了?原因是jar是动态加载的,udf.jar都是通过aux.path加入到当前线程里的,当一直都是同一个线程时,只是把jar放到classpath的末端,程序还是找到之前已经加载的代码。代码不能更新怎么办?使用新的类名,当找不到这个类名的时候,会到jar文件里面找。注意新jar包不要直接替换,否则jvm会奔溃。

最终解决方法

使用一个hook,通过registerFunctionsFromPluginJar来注册udf,hive hook的介绍可以看这个PPT udf的代码可以参考buildin udf那个jar包时怎么做的 hook 代码,在解析hive语句之前注册udf

public class AddUdfHook extends AbstractSemanticAnalyzerHook {
	static final private Log LOG = LogFactory
			.getLog(AddUdfHook.class.getName());

	@Override
	public void postAnalyze(HiveSemanticAnalyzerHookContext context,
			List<Task<? extends Serializable>> rootTasks)
			throws SemanticException {
		super.postAnalyze(context, rootTasks);
	}

	@Override
	public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context,
			ASTNode ast) throws SemanticException {
		LOG.warn("udf register hook");

		try {
			Class<?> pluginClass = Class.forName("cn.uc.udf.CustomUtils");
			URL jarLocation = pluginClass.getProtectionDomain().getCodeSource()
					.getLocation();
			FunctionRegistry.registerFunctionsFromPluginJar(jarLocation,
					pluginClass.getClassLoader());
		} catch (Exception ex) {
			LOG.error("load udf err", ex);
		}
		return super.preAnalyze(context, ast);
	}

编译好了需要在hive-site.xml加上hook的配置

<property>
    <name>hive.semantic.analyzer.hook</name>
    <value>cn.uc.hook.AddUdfHook</value>
  </property>
 <property>
    <name>hive.aux.jars.path</name>
    <value>file:///home/xxx/myudf.jar</value>
  </property>

全部代码下载

updatedupdated2023-12-062023-12-06