一个hiveserver2 python客户端的例子,大部分代码来自于hue。
忽略了一些必要的判断,只是做一个简单的例子。
需要先安装thrift的Python库,并把hive-0.10.0-cdh4.3.0/src/service/src/gen/thrift/gen-py目录拷贝到本脚本所在的目录中(脚本会把同目录下的gen-py加入sys.path)。
thrift文件在hive-0.10.0-cdh4.3.0/src/service/if/TCLIService.thrift
HiveServer2默认要求客户端通过SASL方式连接。如果像本例这样使用普通(非SASL)的传输去连接,客户端的表现是连接上之后没有任何响应,而服务端HiveServer2的日志中会出现如下错误:
java.lang.RuntimeException: org.apache.thrift.transport.TTransportException
at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219)
如果不想使用SASL客户端,可以在HiveServer2的配置文件hive-site.xml中加入下面的参数(参考Stack Overflow上的回答):
<property><name>hive.server2.authentication</name><value>NOSASL</value></property>
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
# Make the "gen-py" directory that sits next to this script importable, so the
# TCLIService imports that follow can be resolved.
cur_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(cur_dir, "gen-py"))
from TCLIService import TCLIService
from TCLIService.ttypes import TOpenSessionReq, TGetTablesReq, TFetchResultsReq,\
TStatusCode, TGetResultSetMetadataReq, TGetColumnsReq, TType,\
TExecuteStatementReq, TGetOperationStatusReq, TFetchOrientation,\
TCloseSessionReq, TGetSchemasReq, TGetLogReq, TCancelOperationReq
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
class HiveServerTColumnValue(object):
    """Wrap a column value object and expose the single value it carries.

    The wrapped object (``tcolumn_value``; presumably a Thrift
    ``TColumnValue`` taken from a row's ``colVals``) must expose the
    attributes ``boolVal``, ``byteVal``, ``i16Val``, ``i32Val``, ``i64Val``,
    ``doubleVal`` and ``stringVal``.  Each attribute is either None or an
    object with a ``value`` attribute.
    """

    # Typed slots probed by ``val``, in priority order.
    _TYPED_FIELDS = (
        'boolVal',
        'byteVal',
        'i16Val',
        'i32Val',
        'i64Val',
        'doubleVal',
        'stringVal',
    )

    def __init__(self, tcolumn_value):
        """Store the column value object to be inspected by ``val``."""
        self.column_value = tcolumn_value

    @property
    def val(self):
        """Return ``.value`` of the first typed slot that is not None.

        Slots are checked in the order listed in ``_TYPED_FIELDS``.  Returns
        None when every slot is None.  The check is ``is not None`` so that
        a populated slot is always honoured, whatever its truthiness.
        """
        # TODO get index from schema
        for field in self._TYPED_FIELDS:
            holder = getattr(self.column_value, field)
            if holder is not None:
                return holder.value
        return None
class HiveServerClient(object):
    """Minimal HiveServer2 client over a buffered socket and binary protocol.

    Typical use: ``connect()``, then ``execute_statement(...)``, and finally
    ``close()``.  A session is opened lazily by ``call`` on the first request,
    using the class-level ``user`` name.
    """

    # User name sent when ``call`` opens the session implicitly.
    user = 'fatkun'
    # Handle of the current session; None until the first ``call``.
    session_handle = None
    # Transport opened by ``connect``; kept so that ``close`` can release it.
    _transport = None

    def connect(self, host='localhost', port=10000):
        """Open the transport to the server and create the Thrift client.

        Args:
            host: Server host name; defaults to ``'localhost'``.
            port: Server port; defaults to ``10000``.
        """
        socket = TSocket.TSocket(host, port)
        transport = TTransport.TBufferedTransport(socket)
        protocol = TBinaryProtocol.TBinaryProtocol(transport)
        client = TCLIService.Client(protocol)
        transport.open()
        # Only remember the transport/client once the connection is open.
        self._transport = transport
        self._client = client

    def close(self):
        """Close the session (if one was opened) and then the transport.

        Safe to call when ``connect`` was never called or when no session
        has been opened yet.  The transport is closed even if closing the
        session raises.
        """
        try:
            if self.session_handle is not None:
                req = TCloseSessionReq(sessionHandle=self.session_handle)
                self._client.CloseSession(req)
                self.session_handle = None
        finally:
            if self._transport is not None:
                self._transport.close()
                self._transport = None

    def open_session(self, username):
        """Open a session for ``username`` and return its session handle.

        The raw OpenSession response is printed to stdout as a debugging aid.
        """
        req = TOpenSessionReq(username=username, configuration={})
        res = self._client.OpenSession(req)
        session_handle = res.sessionHandle
        # Single-argument form prints the same text on Python 2 and 3.
        print(res)
        return session_handle

    def call(self, fn, req, status=TStatusCode.SUCCESS_STATUS):
        """Invoke the Thrift method ``fn`` with ``req`` and return its response.

        Opens a session first if none exists yet, and fills in
        ``req.sessionHandle`` when the request has that attribute and it is
        still None.

        NOTE(review): ``status`` is accepted but never compared with the
        response; a failed request is returned to the caller unchanged.
        """
        if self.session_handle is None:
            self.session_handle = self.open_session(self.user)
        if hasattr(req, 'sessionHandle') and req.sessionHandle is None:
            req.sessionHandle = self.session_handle
        res = fn(req)
        return res

    def execute_statement(self, statement, max_rows=100):
        """Execute ``statement`` and return ``fetch_result``'s output for it.

        Args:
            statement: Statement text sent in a ``TExecuteStatementReq``.
            max_rows: Upper bound passed through to ``fetch_result``.

        Returns:
            The ``(results_response, schema)`` tuple from ``fetch_result``.
        """
        req = TExecuteStatementReq(statement=statement, confOverlay={})
        res = self.call(self._client.ExecuteStatement, req)
        return self.fetch_result(res.operationHandle, max_rows=max_rows)

    def fetch_result(self, operation_handle,
                     orientation=TFetchOrientation.FETCH_NEXT, max_rows=100):
        """Fetch rows for ``operation_handle`` plus, if available, its schema.

        FetchResults is always issued.  GetResultSetMetadata is issued only
        when ``operation_handle.hasResultSet`` is true; otherwise the schema
        is None.

        Returns:
            A ``(results_response, schema)`` tuple.
        """
        fetch_req = TFetchResultsReq(operationHandle=operation_handle,
                                     orientation=orientation,
                                     maxRows=max_rows)
        res = self.call(self._client.FetchResults, fetch_req)
        if operation_handle.hasResultSet:
            meta_req = TGetResultSetMetadataReq(operationHandle=operation_handle)
            schema = self.call(self._client.GetResultSetMetadata, meta_req)
        else:
            schema = None
        return res, schema
def main():
    """Connect, run a sample query and print every returned column value."""
    hive = HiveServerClient()
    hive.connect()

    # Session setting sent before the query; judging by its name it presumably
    # makes statement execution block until the query has finished.
    hive.execute_statement(statement='SET hive.server2.blocking.query=true')

    query = 'select name from test'
    result, _schema = hive.execute_statement(query)

    # Walk the rows lazily, one cell at a time, printing each unwrapped value.
    cells = (cell for record in result.results.rows for cell in record.colVals)
    for cell in cells:
        print(HiveServerTColumnValue(cell).val)
# Run the demo only when this file is executed directly, not when imported.
if __name__ == '__main__':
    main()