0

我目前正在使用 PyHive (Python3.6) 将数据读取到 Hive 集群之外的服务器,然后使用 Python 执行分析。

执行分析后,我想将数据写回 Hive 服务器。在寻找解决方案时,大多数帖子都使用 PySpark。从长远来看,我们将设置我们的系统以使用 PySpark。但是,在短期内,有没有一种方法可以轻松地使用 Python 从集群外部的服务器直接将数据写入 Hive 表?

谢谢你的帮助!

4

4 回答 4

0

您想以哪种格式将数据写入配置单元?Parquet/Avro/Binary 还是简单的 csv/文本格式?根据您在创建 hive 表时使用的 serde 选择,可以使用不同的 python 库首先将您的数据帧转换为相应的 serde,将文件存储在本地,然后您可以使用 save_to_hdfs 之类的东西(如下面的@Jared Wilber 回答)将该文件移动到 hdfs 配置单元表位置路径。

创建 hive 表(默认或外部表)时,它会从特定的 HDFS 位置(默认或提供的位置)读取/存储其数据。而这个hdfs位置可以直接访问修改数据。如果手动更新 hive 表中的数据 - SERDE、PARTITIONS、ROW FORMAT DELIMITED 等,请记住一些事情。

python中一些有用的serde库:

于 2018-12-03T21:52:32.430 回答
0

它需要一些挖掘,但我能够找到一种使用 sqlalchemy 直接从 pandas 数据框创建配置单元表的方法。

from sqlalchemy import create_engine

#Input Information
host = 'username@local-host'
port = 10000
schema = 'hive_schema'
table = 'new_table'


#Execution
engine = create_engine(f'hive://{host}:{port}/{schema}')
engine.execute('CREATE TABLE ' + table + ' (col1 col1-type, col2 col2-type)')
Data.to_sql(name=table, con=engine, if_exists='append')
于 2018-12-22T05:30:06.297 回答
0

你可以回信。将 df 的数据转换为这样的格式,就像您一次将多行插入表中一样,例如.. insert into table values (first row of dataframe comma separated ), (second row), (third row).... 等等;因此您可以插入。

bundle=df.assign(col='('+df[df.col[0]] + ','+df[df.col[1]] +...+df[df.col[n]]+')'+',').col.str.cat(' ')[:-1]

con.cursor().execute('insert into table table_name values'+ bundle)

你就完成了。

于 2019-06-14T11:56:08.453 回答
0

您可以使用该subprocess模块。

以下功能适用于您已在本地保存的数据。例如,如果您将数据框保存到 csv,则将 csv 的名称传递给save_to_hdfs,它将把它扔到 hdfs 中。我确定有一种方法可以直接抛出数据框,但这应该可以帮助您入门。

output这是一个将本地对象 , 保存到user/<your_name>/<output_name>hdfs的示例函数。

  import os
  from subprocess import PIPE, Popen

  def save_to_hdfs(output):
      """
      Save a file in local scope to hdfs.
      Note, this performs a forced put - any file with the same name will be 
      overwritten.
      """
      hdfs_path = os.path.join(os.sep, 'user', '<your_name>', output)
      put = Popen(["hadoop", "fs", "-put", "-f", output, hdfs_path], stdin=PIPE, bufsize=-1)
      put.communicate()

  # example
  df = pd.DataFrame(...)
  output_file = 'yourdata.csv'
  dataframe.to_csv(output_file)
  save_to_hdfs(output_file)
  # remove locally created file (so it doesn't pollute nodes)
  os.remove(output_file)
于 2018-12-03T17:11:31.603 回答