我目前正在使用 PyHive (Python3.6) 将数据读取到 Hive 集群之外的服务器,然后使用 Python 执行分析。
执行分析后,我想将数据写回 Hive 服务器。在寻找解决方案时,大多数帖子都使用 PySpark。从长远来看,我们将设置我们的系统以使用 PySpark。但是,在短期内,有没有一种方法可以轻松地使用 Python 从集群外部的服务器直接将数据写入 Hive 表?
谢谢你的帮助!
我目前正在使用 PyHive (Python3.6) 将数据读取到 Hive 集群之外的服务器,然后使用 Python 执行分析。
执行分析后,我想将数据写回 Hive 服务器。在寻找解决方案时,大多数帖子都使用 PySpark。从长远来看,我们将设置我们的系统以使用 PySpark。但是,在短期内,有没有一种方法可以轻松地使用 Python 从集群外部的服务器直接将数据写入 Hive 表?
谢谢你的帮助!
您想以哪种格式将数据写入配置单元?Parquet/Avro/Binary 还是简单的 csv/文本格式?根据您在创建 hive 表时使用的 serde 选择,可以使用不同的 python 库首先将您的数据帧转换为相应的 serde,将文件存储在本地,然后您可以使用 save_to_hdfs 之类的东西(如下面的@Jared Wilber 回答)将该文件移动到 hdfs 配置单元表位置路径。
创建 hive 表(默认或外部表)时,它会从特定的 HDFS 位置(默认或提供的位置)读取/存储其数据。而这个hdfs位置可以直接访问修改数据。如果手动更新 hive 表中的数据 - SERDE、PARTITIONS、ROW FORMAT DELIMITED 等,请记住一些事情。
python中一些有用的serde库:
它需要一些挖掘,但我能够找到一种使用 sqlalchemy 直接从 pandas 数据框创建配置单元表的方法。
from sqlalchemy import create_engine
#Input Information
host = 'username@local-host'
port = 10000
schema = 'hive_schema'
table = 'new_table'
#Execution
engine = create_engine(f'hive://{host}:{port}/{schema}')
engine.execute('CREATE TABLE ' + table + ' (col1 col1-type, col2 col2-type)')
Data.to_sql(name=table, con=engine, if_exists='append')
你可以回信。将 df 的数据转换为这样的格式,就像您一次将多行插入表中一样,例如.. insert into table values (first row of dataframe comma separated ), (second row), (third row)
.... 等等;因此您可以插入。
bundle=df.assign(col='('+df[df.col[0]] + ','+df[df.col[1]] +...+df[df.col[n]]+')'+',').col.str.cat(' ')[:-1]
con.cursor().execute('insert into table table_name values'+ bundle)
你就完成了。
您可以使用该subprocess
模块。
以下功能适用于您已在本地保存的数据。例如,如果您将数据框保存到 csv,则将 csv 的名称传递给save_to_hdfs
,它将把它扔到 hdfs 中。我确定有一种方法可以直接抛出数据框,但这应该可以帮助您入门。
output
这是一个将本地对象 , 保存到user/<your_name>/<output_name>
hdfs的示例函数。
import os
from subprocess import PIPE, Popen
def save_to_hdfs(output):
"""
Save a file in local scope to hdfs.
Note, this performs a forced put - any file with the same name will be
overwritten.
"""
hdfs_path = os.path.join(os.sep, 'user', '<your_name>', output)
put = Popen(["hadoop", "fs", "-put", "-f", output, hdfs_path], stdin=PIPE, bufsize=-1)
put.communicate()
# example
df = pd.DataFrame(...)
output_file = 'yourdata.csv'
dataframe.to_csv(output_file)
save_to_hdfs(output_file)
# remove locally created file (so it doesn't pollute nodes)
os.remove(output_file)